diff --git a/.env.example b/.env.example index 8aa184b..d72c0f8 100644 --- a/.env.example +++ b/.env.example @@ -39,3 +39,12 @@ BASE_URL=http://localhost:8000 # Link token expiry in minutes (default 10) # AUTH_TOKEN_EXPIRY=10 + +# --------------------------------------------------------------------------- +# JellyStat — PostgreSQL watch-history database +# --------------------------------------------------------------------------- +JELLYSTAT_DB_HOST=localhost +JELLYSTAT_DB_PORT=5432 +JELLYSTAT_DB_USER=postgres +JELLYSTAT_DB_PASSWORD= +JELLYSTAT_DB_NAME=jfstat diff --git a/agents/skills/__init__.py b/agents/skills/__init__.py index 1e37b4d..bdd3a83 100644 --- a/agents/skills/__init__.py +++ b/agents/skills/__init__.py @@ -118,8 +118,8 @@ async def execute_tool( if t.get("function", {}).get("name") == tool_name: # --- Auth gate --- if s.requires_auth and discord_user_id is not None: - from core import auth_store - from auth import get_auth_service + from src import auth_store + from gateway.auth import get_auth_service missing: list[str] = [] for svc in s.requires_auth: if not auth_store.is_authenticated(discord_user_id, svc): @@ -134,6 +134,9 @@ async def execute_tool( + " ".join(f"Send `/login {m}` in a DM to get started." for m in missing) ) # --- End auth gate --- + # Inject discord_user_id so skills can resolve external user IDs + if discord_user_id is not None: + args = {**args, "_discord_user_id": discord_user_id} try: result = await s.execute(tool_name, args) if not result.success: diff --git a/agents/skills/watch_history.py b/agents/skills/watch_history.py index ff414fb..a7c75c8 100644 --- a/agents/skills/watch_history.py +++ b/agents/skills/watch_history.py @@ -1,14 +1,29 @@ """ -Watch History skill — fetch the user's Jellyfin watch history. +Watch History skill — fetch the user's Jellyfin watch history via JellyStat API. -Currently a placeholder — returns a "coming soon" message. -The auth gate (`requires_auth=["jellyfin"]`) is already active: -users who haven't linked Jellyfin will be prompted to /login first. +Requires the user to have linked Jellyfin via `/login jellyfin` in Discord. +The auth gate (`requires_auth=["jellyfin"]`) is already active — users who +haven't linked Jellyfin will be prompted to /login first. + +Architecture +------------ +This skill calls the JellyStat REST API (same FastAPI process, via HTTP) +rather than accessing the PostgreSQL database directly. This keeps the +bot isolated from database credentials. """ from __future__ import annotations +import httpx + from agents.skills import Skill, register, ToolResult +from src import auth_store +from src.config import get_config + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- +BASE_URL = (get_config("BASE_URL") or "http://localhost:8000").rstrip("/") # --------------------------------------------------------------------------- # Tool definitions @@ -20,59 +35,237 @@ TOOLS = [ "function": { "name": "watch_history", "description": ( - "Get the user's recent Jellyfin watch history — movies and TV " - "episodes they have watched, sorted by most recent. " - "Call this when a user asks about their watching activity." + "Get the user's Jellyfin watch history — titles grouped by total " + "watch time in a configurable time window. Use this when a user " + "asks what they've watched, what they've been watching recently, " + "or wants to see their viewing activity." ), "parameters": { "type": "object", "properties": { "limit": { "type": "integer", - "description": "How many items to return (default 10, max 20)", - } + "description": "How many titles to return (default 10, max 20).", + }, + "minutes": { + "type": "integer", + "description": ( + "Time window in minutes. Default 10080 (7 days). " + "Use a large number like 525600 for 'all time' (1 year)." + ), + }, }, }, }, - } + }, + { + "type": "function", + "function": { + "name": "watch_genres", + "description": ( + "Get the user's most-watched genres from Jellyfin, ranked by " + "total watch time. Use this when a user asks what kinds of " + "content they watch most, their favourite genres, or what " + "categories dominate their viewing." + ), + "parameters": { + "type": "object", + "properties": { + "minutes": { + "type": "integer", + "description": ( + "Time window in minutes. Default 10080 (7 days). " + "Use a large number like 525600 for 'all time'." + ), + }, + }, + }, + }, + }, + { + "type": "function", + "function": { + "name": "watch_summary", + "description": ( + "Get an all-time Jellyfin watch summary — total watch time, " + "most-watched series, most-watched movie, 30-day and 7-day " + "activity, and top 3 genres. Use this when a user asks for " + "their overall stats, a dashboard, or 'how much have I watched?'." + ), + "parameters": {"type": "object", "properties": {}}, + }, + }, ] # --------------------------------------------------------------------------- -# Executor (placeholder) +# Helpers # --------------------------------------------------------------------------- + +def _resolve_jellyfin_id(args: dict) -> str | None: + """Extract the Jellyfin user ID from auth_store using the injected Discord ID.""" + discord_user_id = args.pop("_discord_user_id", None) + if discord_user_id is None: + return None # not called from Discord — shouldn't happen with auth gate + + auth = auth_store.get_auth(discord_user_id, "jellyfin") + if auth is None or not auth.get("external_user_id"): + return None + + return auth["external_user_id"] + + +async def _fetch_json(url: str) -> dict: + """GET *url* and return the parsed JSON body, or {} on failure.""" + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(url) + resp.raise_for_status() + return resp.json() + + +def _format_seconds(total: float) -> str: + """Convert seconds to a human-friendly string.""" + total = max(total, 0) + hours = int(total // 3600) + minutes = int((total % 3600) // 60) + if hours and minutes: + return f"{hours}h {minutes}m" + if hours: + return f"{hours}h" + if minutes: + return f"{minutes}m" + return f"{int(total)}s" + + +def _format_history(data: dict, limit: int) -> ToolResult: + """Format a watch-history API response for the LLM.""" + items = data.get("items", [])[:limit] + if not items: + return ToolResult.ok("You haven't watched anything in this time window.") + + lines = [f"**Watch History** (last {data.get('window_minutes', '?')} minutes):"] + for i, item in enumerate(items, 1): + duration = _format_seconds(item["watch_time_sec"]) + icon = "📺" if item["media_type"] == "series" else "🎬" + lines.append(f"{i}. {icon} **{item['title']}** — {duration}") + + return ToolResult.ok("\n".join(lines)) + + +def _format_genres(data: dict) -> ToolResult: + """Format a genre-summary API response for the LLM.""" + genres = data.get("genres", []) + if not genres: + return ToolResult.ok("No genre data available for this time window.") + + lines = [f"**Top Genres** (last {data.get('window_minutes', '?')} minutes):"] + for i, g in enumerate(genres, 1): + duration = _format_seconds(g["watch_time_sec"]) + lines.append(f"{i}. **{g['genre']}** — {duration}") + + return ToolResult.ok("\n".join(lines)) + + +def _format_summary(data: dict) -> ToolResult: + """Format a user-summary API response for the LLM.""" + total = _format_seconds(data.get("total_watch_time_sec", 0)) + last_30 = _format_seconds(data.get("total_last_30d_sec", 0)) + last_7 = _format_seconds(data.get("total_last_7d_sec", 0)) + + top_series = data.get("most_watched_series") or "—" + top_movie = data.get("most_watched_movie") or "—" + top_genres = data.get("top_genres", []) + genres_str = ", ".join(top_genres) if top_genres else "—" + + lines = [ + "**Your Jellyfin Summary** (all time):", + f"⏱️ Total watch time: **{total}**", + f"📺 Most-watched series: **{top_series}**", + f"🎬 Most-watched movie: **{top_movie}**", + f"📅 Last 30 days: **{last_30}**", + f"📅 Last 7 days: **{last_7}**", + f"🏷️ Top genres: {genres_str}", + ] + return ToolResult.ok("\n".join(lines)) + + +# --------------------------------------------------------------------------- +# Executor +# --------------------------------------------------------------------------- + + async def _execute(tool_name: str, args: dict) -> ToolResult: - if tool_name == "watch_history": - return ToolResult.ok( - "👷 **Watch History — Coming Soon!**\n\n" - "This feature is currently being built. Soon you'll be able to " - "see your recently watched movies and TV episodes right here.\n\n" - "In the meantime, you can check your watch history directly in Jellyfin." + # 1. Resolve Jellyfin user ID + jellyfin_id = _resolve_jellyfin_id(args) + if jellyfin_id is None: + return ToolResult.fail( + "Your Jellyfin account is not linked. Use `/login jellyfin` in a DM to connect." + ) + + # 2. Route to the right JellyStat endpoint + try: + match tool_name: + case "watch_history": + limit = args.get("limit", 10) + minutes = args.get("minutes", 10080) + url = f"{BASE_URL}/jellystat/history/{jellyfin_id}?minutes={minutes}" + data = await _fetch_json(url) + return _format_history(data, limit) + + case "watch_genres": + minutes = args.get("minutes", 10080) + url = f"{BASE_URL}/jellystat/genres/{jellyfin_id}?minutes={minutes}" + data = await _fetch_json(url) + return _format_genres(data) + + case "watch_summary": + url = f"{BASE_URL}/jellystat/summary/{jellyfin_id}" + data = await _fetch_json(url) + return _format_summary(data) + + case _: + return ToolResult.fail(f"Unknown tool: {tool_name}") + + except httpx.HTTPError: + return ToolResult.fail( + "Could not reach the watch-history service right now. " + "Please try again in a moment." ) - return ToolResult.fail(f"Unknown tool: {tool_name}") # --------------------------------------------------------------------------- # Skill registration # --------------------------------------------------------------------------- +_PROMPT = ( + "## Watch History\n" + "\n" + "You have THREE tools to answer questions about the user's Jellyfin watch activity:\n" + "\n" + "1. **`watch_history`** — per-title watch time in a time window (default: 7 days).\n" + " Use when a user asks what they've watched, to show their history,\n" + " or what they watched this week or yesterday.\n" + "\n" + "2. **`watch_genres`** — watch time broken down by genre.\n" + " Use when a user asks what genres they watch, whether they watch more\n" + " comedy than drama, or what their most-watched genre is.\n" + "\n" + "3. **`watch_summary`** — all-time dashboard: total watch time, most-watched\n" + " series and movie, 30-day and 7-day activity, and top 3 genres.\n" + " Use when a user asks for their stats, how much they've watched in\n" + " total, or what their favourites are.\n" + "\n" + "Always call the appropriate tool before answering — NEVER guess at watch data.\n" + "Format watch times in a human-readable way (hours and minutes), but keep the\n" + "raw data visible too." +) + watch_history_skill = Skill( name="watch_history", - description="User's Jellyfin watch history (coming soon)", + description="User's Jellyfin watch history, genres, and summary stats", requires_auth=["jellyfin"], - prompt_fragment="""## Watch History - -You can fetch the user's Jellyfin watch history with the `watch_history` tool. -Call it when users ask things like: -- "what have I watched?" -- "show my watch history" -- "what did I watch recently?" -- "what was the last movie I saw?" -- "what TV shows have I been watching?" - -The tool is currently a **placeholder** — it returns a "coming soon" message. -Tell the user this feature is being worked on and will be available soon.""", + prompt_fragment=_PROMPT, tools=TOOLS, execute=_execute, ) diff --git a/gateway/jellystat/__init__.py b/gateway/jellystat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gateway/jellystat/api.py b/gateway/jellystat/api.py new file mode 100644 index 0000000..dd60310 --- /dev/null +++ b/gateway/jellystat/api.py @@ -0,0 +1,106 @@ +"""JellyStat REST API — watch history, genre summary, and user summary.""" + +from __future__ import annotations + +import asyncpg +from fastapi import APIRouter, Depends, Query + +from gateway.jellystat.db import get_pool +from gateway.jellystat.models import ( + GenreSummaryResponse, + UserSummaryResponse, + WatchHistoryResponse, +) + +router = APIRouter(prefix="/jellystat", tags=["jellystat"]) + +DEFAULT_WINDOW_MINUTES = 10080 # 7 days + + +# --------------------------------------------------------------------------- +# GET /jellystat/history/{user_id} +# --------------------------------------------------------------------------- + + +@router.get("/history/{user_id}", response_model=WatchHistoryResponse) +async def get_watch_history( + user_id: str, + minutes: int = Query( + default=DEFAULT_WINDOW_MINUTES, ge=1, description="Time window in minutes" + ), + pool: asyncpg.Pool = Depends(get_pool), +): + """Return watch history grouped by title, ordered by most-watched first.""" + rows = await pool.fetch( + "SELECT * FROM fn_user_watch_history($1, $2)", user_id, minutes + ) + return WatchHistoryResponse( + user_id=user_id, + window_minutes=minutes, + items=[ + { + "title": r["title"], + "watch_time_sec": float(r["watch_time_sec"]), + "media_type": r["media_type"], + } + for r in rows + ], + ) + + +# --------------------------------------------------------------------------- +# GET /jellystat/genres/{user_id} +# --------------------------------------------------------------------------- + + +@router.get("/genres/{user_id}", response_model=GenreSummaryResponse) +async def get_genre_summary( + user_id: str, + minutes: int = Query( + default=DEFAULT_WINDOW_MINUTES, ge=1, description="Time window in minutes" + ), + pool: asyncpg.Pool = Depends(get_pool), +): + """Return total watch time per genre, ordered by most-watched first.""" + rows = await pool.fetch( + "SELECT * FROM fn_user_genre_summary($1, $2)", user_id, minutes + ) + return GenreSummaryResponse( + user_id=user_id, + window_minutes=minutes, + genres=[ + {"genre": r["genre"], "watch_time_sec": float(r["watch_time_sec"])} + for r in rows + ], + ) + + +# --------------------------------------------------------------------------- +# GET /jellystat/summary/{user_id} +# --------------------------------------------------------------------------- + + +@router.get("/summary/{user_id}", response_model=UserSummaryResponse) +async def get_user_summary( + user_id: str, + pool: asyncpg.Pool = Depends(get_pool), +): + """Return all-time summary: total watch time, most-watched titles, top genres.""" + rows = await pool.fetch("SELECT * FROM fn_user_summary($1)", user_id) + + # fn_user_summary returns key-value rows — build a dict + # asyncpg already deserialises JSONB → Python objects + metrics: dict[str, object] = {r["metric"]: r["value"] for r in rows} + + top_genres_raw = metrics.get("top_genres", []) + top_genres: list[str] = top_genres_raw if isinstance(top_genres_raw, list) else [] + + return UserSummaryResponse( + user_id=user_id, + total_watch_time_sec=float(metrics.get("total_watch_time", 0)), + most_watched_series=metrics.get("most_watched_series"), + most_watched_movie=metrics.get("most_watched_movie"), + total_last_30d_sec=float(metrics.get("total_last_30d", 0)), + total_last_7d_sec=float(metrics.get("total_last_7d", 0)), + top_genres=top_genres, + ) diff --git a/gateway/jellystat/db.py b/gateway/jellystat/db.py new file mode 100644 index 0000000..09b5b7a --- /dev/null +++ b/gateway/jellystat/db.py @@ -0,0 +1,130 @@ +"""PostgreSQL connection pool for the JellyStat database.""" + +from __future__ import annotations + +import logging +from pathlib import Path + +import asyncpg +from fastapi import FastAPI, Request + +from src.config import get_config + +logger = logging.getLogger("gateway.jellystat") + +# --------------------------------------------------------------------------- +# DSN builder +# --------------------------------------------------------------------------- + + +def _build_dsn() -> str: + """Build a PostgreSQL DSN from individual environment variables.""" + host = get_config("JELLYSTAT_DB_HOST", "localhost") + port = get_config("JELLYSTAT_DB_PORT", "5432") + user = get_config("JELLYSTAT_DB_USER", "postgres") + password = get_config("JELLYSTAT_DB_PASSWORD", "") + dbname = get_config("JELLYSTAT_DB_NAME", "jfstat") + return f"postgresql://{user}:{password}@{host}:{port}/{dbname}" + + +# --------------------------------------------------------------------------- +# Pool lifecycle (called from main.py lifespan) +# --------------------------------------------------------------------------- + + +async def init_pool(app: FastAPI) -> None: + """Create the connection pool and store it on app.state.""" + dsn = _build_dsn() + safe = dsn.split("@")[1] if "@" in dsn else dsn + logger.info("Connecting to JellyStat database at %s", safe) + + pool = await asyncpg.create_pool(dsn, min_size=1, max_size=5) + app.state.jellystat_pool = pool + + # Deploy functions on every startup (CREATE OR REPLACE is idempotent) + await _ensure_functions(pool) + + +async def close_pool(app: FastAPI) -> None: + """Close the pool on shutdown.""" + pool: asyncpg.Pool | None = getattr(app.state, "jellystat_pool", None) + if pool: + await pool.close() + logger.info("JellyStat pool closed") + + +# --------------------------------------------------------------------------- +# FastAPI dependency +# --------------------------------------------------------------------------- + + +async def get_pool(request: Request) -> asyncpg.Pool: + """Return the JellyStat connection pool from app state.""" + return request.app.state.jellystat_pool + + +# --------------------------------------------------------------------------- +# Function deployment +# --------------------------------------------------------------------------- + + +async def _ensure_functions(pool: asyncpg.Pool) -> None: + """Run startup-functions.sql to create or replace all JellyStat functions.""" + sql_path = Path(__file__).parent / "startup-functions.sql" + if not sql_path.exists(): + logger.warning("startup-functions.sql not found — skipping function deployment") + return + + sql = sql_path.read_text() + statements = _split_sql(sql) + + async with pool.acquire() as conn: + for stmt in statements: + try: + await conn.execute(stmt) + except Exception: + # Log but don't crash — functions might already exist + logger.exception("Failed to deploy SQL statement — continuing") + + logger.info("JellyStat functions deployed (%d statements)", len(statements)) + + +def _split_sql(sql: str) -> list[str]: + """ + Split a multi-statement SQL string into individual statements. + + Respects $$ dollar-quoting so that semicolons inside function bodies + don't cause premature splits. Pure comment lines (starting with ``--``) + outside dollar-quoted blocks are stripped. + """ + statements: list[str] = [] + current: list[str] = [] + in_dollar_quote = False + + for line in sql.split("\n"): + stripped = line.strip() + + # Skip pure comment lines outside of dollar-quoted blocks + if not in_dollar_quote and stripped.startswith("--"): + continue + + # Toggle dollar-quote state whenever we see $$ + if "$$" in line: + in_dollar_quote = not in_dollar_quote + + current.append(line) + + # Statement terminator: semicolon at end of line, outside $$ block + if not in_dollar_quote and line.rstrip().endswith(";"): + stmt = "\n".join(current).strip() + if stmt: + statements.append(stmt) + current = [] + + # Catch any trailing statement that wasn't terminated by a semicolon + if current: + stmt = "\n".join(current).strip() + if stmt: + statements.append(stmt) + + return statements diff --git a/gateway/jellystat/models.py b/gateway/jellystat/models.py new file mode 100644 index 0000000..f59d7f0 --- /dev/null +++ b/gateway/jellystat/models.py @@ -0,0 +1,36 @@ +"""Pydantic response models for the JellyStat API.""" + +from pydantic import BaseModel + + +class WatchHistoryItem(BaseModel): + title: str + watch_time_sec: float + media_type: str + + +class WatchHistoryResponse(BaseModel): + user_id: str + window_minutes: int + items: list[WatchHistoryItem] + + +class GenreSummaryItem(BaseModel): + genre: str + watch_time_sec: float + + +class GenreSummaryResponse(BaseModel): + user_id: str + window_minutes: int + genres: list[GenreSummaryItem] + + +class UserSummaryResponse(BaseModel): + user_id: str + total_watch_time_sec: float + most_watched_series: str | None + most_watched_movie: str | None + total_last_30d_sec: float + total_last_7d_sec: float + top_genres: list[str] diff --git a/gateway/jellystat/startup-functions.sql b/gateway/jellystat/startup-functions.sql new file mode 100644 index 0000000..f741482 --- /dev/null +++ b/gateway/jellystat/startup-functions.sql @@ -0,0 +1,224 @@ +-- ============================================================================ +-- JellyStat API Functions +-- Parameterized database functions callable by the API layer as: +-- SELECT * FROM fn_user_watch_history('user_id_here', 10080); +-- SELECT * FROM fn_user_genre_summary('user_id_here', 10080); +-- SELECT * FROM fn_user_summary('user_id_here'); +-- ============================================================================ + +-- ---------------------------------------------------------------------------- +-- 1. User Watch History +-- Returns every distinct title watched in the last N minutes, +-- grouped and summed by title, ordered by most-watched first. +-- ---------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION public.fn_user_watch_history( + p_user_id TEXT, + p_minutes INTEGER DEFAULT 10080 -- 7 days in minutes +) +RETURNS TABLE( + title TEXT, + watch_time_sec NUMERIC, + media_type TEXT +) +LANGUAGE sql +STABLE +AS $$ + SELECT + COALESCE(a."SeriesName", a."NowPlayingItemName") AS title, + SUM(a."PlaybackDuration")::NUMERIC AS watch_time_sec, + CASE + WHEN a."SeriesName" IS NOT NULL THEN 'series' + ELSE 'movie' + END AS media_type + FROM jf_playback_activity a + WHERE a."UserId" = p_user_id + AND a."ActivityDateInserted" + >= NOW() - (p_minutes * INTERVAL '1 minute') + GROUP BY + COALESCE(a."SeriesName", a."NowPlayingItemName"), + CASE WHEN a."SeriesName" IS NOT NULL THEN 'series' ELSE 'movie' END + ORDER BY watch_time_sec DESC; +$$; + +-- ---------------------------------------------------------------------------- +-- 2. Genre Summary +-- Returns total watch time per genre for a user over the last N minutes. +-- Resolves genres for both movies (directly on the item) and series +-- episodes (via jf_library_episodes → jf_library_items chain). +-- ---------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION public.fn_user_genre_summary( + p_user_id TEXT, + p_minutes INTEGER DEFAULT 10080 +) +RETURNS TABLE( + genre TEXT, + watch_time_sec NUMERIC +) +LANGUAGE sql +STABLE +AS $$ + WITH movie_genres AS ( + -- Movies: join playback directly to library_items on NowPlayingItemId + SELECT + genre_item.value AS genre, + SUM(a."PlaybackDuration") AS watch_time_sec + FROM jf_playback_activity a + JOIN jf_library_items i + ON i."Id" = a."NowPlayingItemId" + CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value) + WHERE a."UserId" = p_user_id + AND a."SeriesName" IS NULL -- movies only + AND a."ActivityDateInserted" + >= NOW() - (p_minutes * INTERVAL '1 minute') + AND i."Genres" IS NOT NULL + AND jsonb_array_length(i."Genres") > 0 + GROUP BY genre_item.value + ), + series_genres AS ( + -- Series: playback → episodes → series item → genres + SELECT + genre_item.value AS genre, + SUM(a."PlaybackDuration") AS watch_time_sec + FROM jf_playback_activity a + JOIN jf_library_episodes e + ON e."EpisodeId" = a."EpisodeId" + JOIN jf_library_items i + ON i."Id" = e."SeriesId" + CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value) + WHERE a."UserId" = p_user_id + AND a."SeriesName" IS NOT NULL -- TV episodes only + AND a."ActivityDateInserted" + >= NOW() - (p_minutes * INTERVAL '1 minute') + AND i."Genres" IS NOT NULL + AND jsonb_array_length(i."Genres") > 0 + GROUP BY genre_item.value + ), + combined AS ( + SELECT genre, watch_time_sec FROM movie_genres + UNION ALL + SELECT genre, watch_time_sec FROM series_genres + ) + SELECT + genre, + SUM(watch_time_sec)::NUMERIC AS watch_time_sec + FROM combined + GROUP BY genre + ORDER BY watch_time_sec DESC; +$$; + +-- ---------------------------------------------------------------------------- +-- 3. User Summary +-- One-shot dashboard: all-time stats + recent windows + top genres. +-- Returns key-value rows that the API trivially converts to a JSON object +-- with Object.fromEntries() or similar. +-- ---------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION public.fn_user_summary( + p_user_id TEXT +) +RETURNS TABLE( + metric TEXT, + value JSONB +) +LANGUAGE sql +STABLE +AS $$ + -- total_watch_time (all time) + SELECT 'total_watch_time'::TEXT AS metric, + to_jsonb(COALESCE(SUM("PlaybackDuration"), 0)::NUMERIC) AS value + FROM jf_playback_activity + WHERE "UserId" = p_user_id + + UNION ALL + + -- most_watched_series (by total watch time) + SELECT 'most_watched_series'::TEXT AS metric, + COALESCE( + (SELECT to_jsonb("SeriesName") + FROM jf_playback_activity + WHERE "UserId" = p_user_id + AND "SeriesName" IS NOT NULL + GROUP BY "SeriesName" + ORDER BY SUM("PlaybackDuration") DESC + LIMIT 1), + 'null'::JSONB + ) AS value + + UNION ALL + + -- most_watched_movie (by total watch time) + SELECT 'most_watched_movie'::TEXT AS metric, + COALESCE( + (SELECT to_jsonb("NowPlayingItemName") + FROM jf_playback_activity + WHERE "UserId" = p_user_id + AND "SeriesName" IS NULL + GROUP BY "NowPlayingItemName" + ORDER BY SUM("PlaybackDuration") DESC + LIMIT 1), + 'null'::JSONB + ) AS value + + UNION ALL + + -- total_watch_time_last_month (last 30 days) + SELECT 'total_last_30d'::TEXT AS metric, + to_jsonb(COALESCE(SUM("PlaybackDuration"), 0)::NUMERIC) AS value + FROM jf_playback_activity + WHERE "UserId" = p_user_id + AND "ActivityDateInserted" >= NOW() - INTERVAL '30 days' + + UNION ALL + + -- total_watch_time_last_week (last 7 days) + SELECT 'total_last_7d'::TEXT AS metric, + to_jsonb(COALESCE(SUM("PlaybackDuration"), 0)::NUMERIC) AS value + FROM jf_playback_activity + WHERE "UserId" = p_user_id + AND "ActivityDateInserted" >= NOW() - INTERVAL '7 days' + + UNION ALL + + -- top_genres (top 3 all-time, as a JSON array) + SELECT 'top_genres'::TEXT AS metric, + COALESCE( + (SELECT jsonb_agg(genre ORDER BY watch_time_sec DESC) + FROM ( + SELECT genre, SUM(watch_time_sec) AS watch_time_sec + FROM ( + -- movies + SELECT + genre_item.value AS genre, + SUM(a."PlaybackDuration") AS watch_time_sec + FROM jf_playback_activity a + JOIN jf_library_items i ON i."Id" = a."NowPlayingItemId" + CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value) + WHERE a."UserId" = p_user_id + AND a."SeriesName" IS NULL + AND i."Genres" IS NOT NULL + AND jsonb_array_length(i."Genres") > 0 + GROUP BY genre_item.value + + UNION ALL + + -- series + SELECT + genre_item.value AS genre, + SUM(a."PlaybackDuration") AS watch_time_sec + FROM jf_playback_activity a + JOIN jf_library_episodes e ON e."EpisodeId" = a."EpisodeId" + JOIN jf_library_items i ON i."Id" = e."SeriesId" + CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value) + WHERE a."UserId" = p_user_id + AND a."SeriesName" IS NOT NULL + AND i."Genres" IS NOT NULL + AND jsonb_array_length(i."Genres") > 0 + GROUP BY genre_item.value + ) combined + GROUP BY genre + ORDER BY SUM(watch_time_sec) DESC + LIMIT 3 + ) top3 + ), + '[]'::JSONB + ) AS value; +$$; \ No newline at end of file diff --git a/main.py b/main.py index 586ca45..67a4de9 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from fastapi.middleware.cors import CORSMiddleware from gateway.v1.auth import router as auth_router from gateway.v1.chat import router as v1_router +from gateway.jellystat.api import router as jellystat_router from src.config import DEEPSEEK_API_KEY, get_config from src.llm import create_client @@ -33,11 +34,15 @@ import gateway.auth.jellyfin # noqa: E402 — self-registers JellyfinAuth @asynccontextmanager async def lifespan(app: FastAPI): from gateway.discord.bot import start_in_background # noqa: E402 + from gateway.jellystat.db import init_pool, close_pool # noqa: E402 + await init_pool(app) start_in_background() yield + await close_pool(app) + # --------------------------------------------------------------------------- # App @@ -64,4 +69,5 @@ app.state.agent_graphs: dict = {} # Routers # --------------------------------------------------------------------------- app.include_router(v1_router, prefix="/v1") -app.include_router(auth_router) \ No newline at end of file +app.include_router(auth_router) +app.include_router(jellystat_router) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b62cbf6..f1afa9f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ httpx langgraph langgraph-checkpoint discord.py -python-multipart \ No newline at end of file +python-multipart +asyncpg \ No newline at end of file