implement JellyStat API for watch history, genre summary, and user summary; add PostgreSQL connection pool and update requirements
This commit is contained in:
@@ -0,0 +1,106 @@
|
||||
"""JellyStat REST API — watch history, genre summary, and user summary."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncpg
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
|
||||
from gateway.jellystat.db import get_pool
|
||||
from gateway.jellystat.models import (
|
||||
GenreSummaryResponse,
|
||||
UserSummaryResponse,
|
||||
WatchHistoryResponse,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/jellystat", tags=["jellystat"])
|
||||
|
||||
DEFAULT_WINDOW_MINUTES = 10080 # 7 days
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /jellystat/history/{user_id}
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/history/{user_id}", response_model=WatchHistoryResponse)
|
||||
async def get_watch_history(
|
||||
user_id: str,
|
||||
minutes: int = Query(
|
||||
default=DEFAULT_WINDOW_MINUTES, ge=1, description="Time window in minutes"
|
||||
),
|
||||
pool: asyncpg.Pool = Depends(get_pool),
|
||||
):
|
||||
"""Return watch history grouped by title, ordered by most-watched first."""
|
||||
rows = await pool.fetch(
|
||||
"SELECT * FROM fn_user_watch_history($1, $2)", user_id, minutes
|
||||
)
|
||||
return WatchHistoryResponse(
|
||||
user_id=user_id,
|
||||
window_minutes=minutes,
|
||||
items=[
|
||||
{
|
||||
"title": r["title"],
|
||||
"watch_time_sec": float(r["watch_time_sec"]),
|
||||
"media_type": r["media_type"],
|
||||
}
|
||||
for r in rows
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /jellystat/genres/{user_id}
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/genres/{user_id}", response_model=GenreSummaryResponse)
|
||||
async def get_genre_summary(
|
||||
user_id: str,
|
||||
minutes: int = Query(
|
||||
default=DEFAULT_WINDOW_MINUTES, ge=1, description="Time window in minutes"
|
||||
),
|
||||
pool: asyncpg.Pool = Depends(get_pool),
|
||||
):
|
||||
"""Return total watch time per genre, ordered by most-watched first."""
|
||||
rows = await pool.fetch(
|
||||
"SELECT * FROM fn_user_genre_summary($1, $2)", user_id, minutes
|
||||
)
|
||||
return GenreSummaryResponse(
|
||||
user_id=user_id,
|
||||
window_minutes=minutes,
|
||||
genres=[
|
||||
{"genre": r["genre"], "watch_time_sec": float(r["watch_time_sec"])}
|
||||
for r in rows
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /jellystat/summary/{user_id}
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/summary/{user_id}", response_model=UserSummaryResponse)
|
||||
async def get_user_summary(
|
||||
user_id: str,
|
||||
pool: asyncpg.Pool = Depends(get_pool),
|
||||
):
|
||||
"""Return all-time summary: total watch time, most-watched titles, top genres."""
|
||||
rows = await pool.fetch("SELECT * FROM fn_user_summary($1)", user_id)
|
||||
|
||||
# fn_user_summary returns key-value rows — build a dict
|
||||
# asyncpg already deserialises JSONB → Python objects
|
||||
metrics: dict[str, object] = {r["metric"]: r["value"] for r in rows}
|
||||
|
||||
top_genres_raw = metrics.get("top_genres", [])
|
||||
top_genres: list[str] = top_genres_raw if isinstance(top_genres_raw, list) else []
|
||||
|
||||
return UserSummaryResponse(
|
||||
user_id=user_id,
|
||||
total_watch_time_sec=float(metrics.get("total_watch_time", 0)),
|
||||
most_watched_series=metrics.get("most_watched_series"),
|
||||
most_watched_movie=metrics.get("most_watched_movie"),
|
||||
total_last_30d_sec=float(metrics.get("total_last_30d", 0)),
|
||||
total_last_7d_sec=float(metrics.get("total_last_7d", 0)),
|
||||
top_genres=top_genres,
|
||||
)
|
||||
@@ -0,0 +1,130 @@
|
||||
"""PostgreSQL connection pool for the JellyStat database."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import asyncpg
|
||||
from fastapi import FastAPI, Request
|
||||
|
||||
from src.config import get_config
|
||||
|
||||
logger = logging.getLogger("gateway.jellystat")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DSN builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _build_dsn() -> str:
|
||||
"""Build a PostgreSQL DSN from individual environment variables."""
|
||||
host = get_config("JELLYSTAT_DB_HOST", "localhost")
|
||||
port = get_config("JELLYSTAT_DB_PORT", "5432")
|
||||
user = get_config("JELLYSTAT_DB_USER", "postgres")
|
||||
password = get_config("JELLYSTAT_DB_PASSWORD", "")
|
||||
dbname = get_config("JELLYSTAT_DB_NAME", "jfstat")
|
||||
return f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pool lifecycle (called from main.py lifespan)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def init_pool(app: FastAPI) -> None:
|
||||
"""Create the connection pool and store it on app.state."""
|
||||
dsn = _build_dsn()
|
||||
safe = dsn.split("@")[1] if "@" in dsn else dsn
|
||||
logger.info("Connecting to JellyStat database at %s", safe)
|
||||
|
||||
pool = await asyncpg.create_pool(dsn, min_size=1, max_size=5)
|
||||
app.state.jellystat_pool = pool
|
||||
|
||||
# Deploy functions on every startup (CREATE OR REPLACE is idempotent)
|
||||
await _ensure_functions(pool)
|
||||
|
||||
|
||||
async def close_pool(app: FastAPI) -> None:
|
||||
"""Close the pool on shutdown."""
|
||||
pool: asyncpg.Pool | None = getattr(app.state, "jellystat_pool", None)
|
||||
if pool:
|
||||
await pool.close()
|
||||
logger.info("JellyStat pool closed")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FastAPI dependency
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def get_pool(request: Request) -> asyncpg.Pool:
|
||||
"""Return the JellyStat connection pool from app state."""
|
||||
return request.app.state.jellystat_pool
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Function deployment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _ensure_functions(pool: asyncpg.Pool) -> None:
|
||||
"""Run startup-functions.sql to create or replace all JellyStat functions."""
|
||||
sql_path = Path(__file__).parent / "startup-functions.sql"
|
||||
if not sql_path.exists():
|
||||
logger.warning("startup-functions.sql not found — skipping function deployment")
|
||||
return
|
||||
|
||||
sql = sql_path.read_text()
|
||||
statements = _split_sql(sql)
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
for stmt in statements:
|
||||
try:
|
||||
await conn.execute(stmt)
|
||||
except Exception:
|
||||
# Log but don't crash — functions might already exist
|
||||
logger.exception("Failed to deploy SQL statement — continuing")
|
||||
|
||||
logger.info("JellyStat functions deployed (%d statements)", len(statements))
|
||||
|
||||
|
||||
def _split_sql(sql: str) -> list[str]:
|
||||
"""
|
||||
Split a multi-statement SQL string into individual statements.
|
||||
|
||||
Respects $$ dollar-quoting so that semicolons inside function bodies
|
||||
don't cause premature splits. Pure comment lines (starting with ``--``)
|
||||
outside dollar-quoted blocks are stripped.
|
||||
"""
|
||||
statements: list[str] = []
|
||||
current: list[str] = []
|
||||
in_dollar_quote = False
|
||||
|
||||
for line in sql.split("\n"):
|
||||
stripped = line.strip()
|
||||
|
||||
# Skip pure comment lines outside of dollar-quoted blocks
|
||||
if not in_dollar_quote and stripped.startswith("--"):
|
||||
continue
|
||||
|
||||
# Toggle dollar-quote state whenever we see $$
|
||||
if "$$" in line:
|
||||
in_dollar_quote = not in_dollar_quote
|
||||
|
||||
current.append(line)
|
||||
|
||||
# Statement terminator: semicolon at end of line, outside $$ block
|
||||
if not in_dollar_quote and line.rstrip().endswith(";"):
|
||||
stmt = "\n".join(current).strip()
|
||||
if stmt:
|
||||
statements.append(stmt)
|
||||
current = []
|
||||
|
||||
# Catch any trailing statement that wasn't terminated by a semicolon
|
||||
if current:
|
||||
stmt = "\n".join(current).strip()
|
||||
if stmt:
|
||||
statements.append(stmt)
|
||||
|
||||
return statements
|
||||
@@ -0,0 +1,36 @@
|
||||
"""Pydantic response models for the JellyStat API."""
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class WatchHistoryItem(BaseModel):
|
||||
title: str
|
||||
watch_time_sec: float
|
||||
media_type: str
|
||||
|
||||
|
||||
class WatchHistoryResponse(BaseModel):
|
||||
user_id: str
|
||||
window_minutes: int
|
||||
items: list[WatchHistoryItem]
|
||||
|
||||
|
||||
class GenreSummaryItem(BaseModel):
|
||||
genre: str
|
||||
watch_time_sec: float
|
||||
|
||||
|
||||
class GenreSummaryResponse(BaseModel):
|
||||
user_id: str
|
||||
window_minutes: int
|
||||
genres: list[GenreSummaryItem]
|
||||
|
||||
|
||||
class UserSummaryResponse(BaseModel):
|
||||
user_id: str
|
||||
total_watch_time_sec: float
|
||||
most_watched_series: str | None
|
||||
most_watched_movie: str | None
|
||||
total_last_30d_sec: float
|
||||
total_last_7d_sec: float
|
||||
top_genres: list[str]
|
||||
@@ -0,0 +1,224 @@
|
||||
-- ============================================================================
|
||||
-- JellyStat API Functions
|
||||
-- Parameterized database functions callable by the API layer as:
|
||||
-- SELECT * FROM fn_user_watch_history('user_id_here', 10080);
|
||||
-- SELECT * FROM fn_user_genre_summary('user_id_here', 10080);
|
||||
-- SELECT * FROM fn_user_summary('user_id_here');
|
||||
-- ============================================================================
|
||||
|
||||
-- ----------------------------------------------------------------------------
|
||||
-- 1. User Watch History
|
||||
-- Returns every distinct title watched in the last N minutes,
|
||||
-- grouped and summed by title, ordered by most-watched first.
|
||||
-- ----------------------------------------------------------------------------
|
||||
CREATE OR REPLACE FUNCTION public.fn_user_watch_history(
|
||||
p_user_id TEXT,
|
||||
p_minutes INTEGER DEFAULT 10080 -- 7 days in minutes
|
||||
)
|
||||
RETURNS TABLE(
|
||||
title TEXT,
|
||||
watch_time_sec NUMERIC,
|
||||
media_type TEXT
|
||||
)
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $$
|
||||
SELECT
|
||||
COALESCE(a."SeriesName", a."NowPlayingItemName") AS title,
|
||||
SUM(a."PlaybackDuration")::NUMERIC AS watch_time_sec,
|
||||
CASE
|
||||
WHEN a."SeriesName" IS NOT NULL THEN 'series'
|
||||
ELSE 'movie'
|
||||
END AS media_type
|
||||
FROM jf_playback_activity a
|
||||
WHERE a."UserId" = p_user_id
|
||||
AND a."ActivityDateInserted"
|
||||
>= NOW() - (p_minutes * INTERVAL '1 minute')
|
||||
GROUP BY
|
||||
COALESCE(a."SeriesName", a."NowPlayingItemName"),
|
||||
CASE WHEN a."SeriesName" IS NOT NULL THEN 'series' ELSE 'movie' END
|
||||
ORDER BY watch_time_sec DESC;
|
||||
$$;
|
||||
|
||||
-- ----------------------------------------------------------------------------
|
||||
-- 2. Genre Summary
|
||||
-- Returns total watch time per genre for a user over the last N minutes.
|
||||
-- Resolves genres for both movies (directly on the item) and series
|
||||
-- episodes (via jf_library_episodes → jf_library_items chain).
|
||||
-- ----------------------------------------------------------------------------
|
||||
CREATE OR REPLACE FUNCTION public.fn_user_genre_summary(
|
||||
p_user_id TEXT,
|
||||
p_minutes INTEGER DEFAULT 10080
|
||||
)
|
||||
RETURNS TABLE(
|
||||
genre TEXT,
|
||||
watch_time_sec NUMERIC
|
||||
)
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $$
|
||||
WITH movie_genres AS (
|
||||
-- Movies: join playback directly to library_items on NowPlayingItemId
|
||||
SELECT
|
||||
genre_item.value AS genre,
|
||||
SUM(a."PlaybackDuration") AS watch_time_sec
|
||||
FROM jf_playback_activity a
|
||||
JOIN jf_library_items i
|
||||
ON i."Id" = a."NowPlayingItemId"
|
||||
CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value)
|
||||
WHERE a."UserId" = p_user_id
|
||||
AND a."SeriesName" IS NULL -- movies only
|
||||
AND a."ActivityDateInserted"
|
||||
>= NOW() - (p_minutes * INTERVAL '1 minute')
|
||||
AND i."Genres" IS NOT NULL
|
||||
AND jsonb_array_length(i."Genres") > 0
|
||||
GROUP BY genre_item.value
|
||||
),
|
||||
series_genres AS (
|
||||
-- Series: playback → episodes → series item → genres
|
||||
SELECT
|
||||
genre_item.value AS genre,
|
||||
SUM(a."PlaybackDuration") AS watch_time_sec
|
||||
FROM jf_playback_activity a
|
||||
JOIN jf_library_episodes e
|
||||
ON e."EpisodeId" = a."EpisodeId"
|
||||
JOIN jf_library_items i
|
||||
ON i."Id" = e."SeriesId"
|
||||
CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value)
|
||||
WHERE a."UserId" = p_user_id
|
||||
AND a."SeriesName" IS NOT NULL -- TV episodes only
|
||||
AND a."ActivityDateInserted"
|
||||
>= NOW() - (p_minutes * INTERVAL '1 minute')
|
||||
AND i."Genres" IS NOT NULL
|
||||
AND jsonb_array_length(i."Genres") > 0
|
||||
GROUP BY genre_item.value
|
||||
),
|
||||
combined AS (
|
||||
SELECT genre, watch_time_sec FROM movie_genres
|
||||
UNION ALL
|
||||
SELECT genre, watch_time_sec FROM series_genres
|
||||
)
|
||||
SELECT
|
||||
genre,
|
||||
SUM(watch_time_sec)::NUMERIC AS watch_time_sec
|
||||
FROM combined
|
||||
GROUP BY genre
|
||||
ORDER BY watch_time_sec DESC;
|
||||
$$;
|
||||
|
||||
-- ----------------------------------------------------------------------------
|
||||
-- 3. User Summary
|
||||
-- One-shot dashboard: all-time stats + recent windows + top genres.
|
||||
-- Returns key-value rows that the API trivially converts to a JSON object
|
||||
-- with Object.fromEntries() or similar.
|
||||
-- ----------------------------------------------------------------------------
|
||||
CREATE OR REPLACE FUNCTION public.fn_user_summary(
|
||||
p_user_id TEXT
|
||||
)
|
||||
RETURNS TABLE(
|
||||
metric TEXT,
|
||||
value JSONB
|
||||
)
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $$
|
||||
-- total_watch_time (all time)
|
||||
SELECT 'total_watch_time'::TEXT AS metric,
|
||||
to_jsonb(COALESCE(SUM("PlaybackDuration"), 0)::NUMERIC) AS value
|
||||
FROM jf_playback_activity
|
||||
WHERE "UserId" = p_user_id
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- most_watched_series (by total watch time)
|
||||
SELECT 'most_watched_series'::TEXT AS metric,
|
||||
COALESCE(
|
||||
(SELECT to_jsonb("SeriesName")
|
||||
FROM jf_playback_activity
|
||||
WHERE "UserId" = p_user_id
|
||||
AND "SeriesName" IS NOT NULL
|
||||
GROUP BY "SeriesName"
|
||||
ORDER BY SUM("PlaybackDuration") DESC
|
||||
LIMIT 1),
|
||||
'null'::JSONB
|
||||
) AS value
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- most_watched_movie (by total watch time)
|
||||
SELECT 'most_watched_movie'::TEXT AS metric,
|
||||
COALESCE(
|
||||
(SELECT to_jsonb("NowPlayingItemName")
|
||||
FROM jf_playback_activity
|
||||
WHERE "UserId" = p_user_id
|
||||
AND "SeriesName" IS NULL
|
||||
GROUP BY "NowPlayingItemName"
|
||||
ORDER BY SUM("PlaybackDuration") DESC
|
||||
LIMIT 1),
|
||||
'null'::JSONB
|
||||
) AS value
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- total_watch_time_last_month (last 30 days)
|
||||
SELECT 'total_last_30d'::TEXT AS metric,
|
||||
to_jsonb(COALESCE(SUM("PlaybackDuration"), 0)::NUMERIC) AS value
|
||||
FROM jf_playback_activity
|
||||
WHERE "UserId" = p_user_id
|
||||
AND "ActivityDateInserted" >= NOW() - INTERVAL '30 days'
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- total_watch_time_last_week (last 7 days)
|
||||
SELECT 'total_last_7d'::TEXT AS metric,
|
||||
to_jsonb(COALESCE(SUM("PlaybackDuration"), 0)::NUMERIC) AS value
|
||||
FROM jf_playback_activity
|
||||
WHERE "UserId" = p_user_id
|
||||
AND "ActivityDateInserted" >= NOW() - INTERVAL '7 days'
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- top_genres (top 3 all-time, as a JSON array)
|
||||
SELECT 'top_genres'::TEXT AS metric,
|
||||
COALESCE(
|
||||
(SELECT jsonb_agg(genre ORDER BY watch_time_sec DESC)
|
||||
FROM (
|
||||
SELECT genre, SUM(watch_time_sec) AS watch_time_sec
|
||||
FROM (
|
||||
-- movies
|
||||
SELECT
|
||||
genre_item.value AS genre,
|
||||
SUM(a."PlaybackDuration") AS watch_time_sec
|
||||
FROM jf_playback_activity a
|
||||
JOIN jf_library_items i ON i."Id" = a."NowPlayingItemId"
|
||||
CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value)
|
||||
WHERE a."UserId" = p_user_id
|
||||
AND a."SeriesName" IS NULL
|
||||
AND i."Genres" IS NOT NULL
|
||||
AND jsonb_array_length(i."Genres") > 0
|
||||
GROUP BY genre_item.value
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- series
|
||||
SELECT
|
||||
genre_item.value AS genre,
|
||||
SUM(a."PlaybackDuration") AS watch_time_sec
|
||||
FROM jf_playback_activity a
|
||||
JOIN jf_library_episodes e ON e."EpisodeId" = a."EpisodeId"
|
||||
JOIN jf_library_items i ON i."Id" = e."SeriesId"
|
||||
CROSS JOIN LATERAL jsonb_array_elements_text(i."Genres") AS genre_item(value)
|
||||
WHERE a."UserId" = p_user_id
|
||||
AND a."SeriesName" IS NOT NULL
|
||||
AND i."Genres" IS NOT NULL
|
||||
AND jsonb_array_length(i."Genres") > 0
|
||||
GROUP BY genre_item.value
|
||||
) combined
|
||||
GROUP BY genre
|
||||
ORDER BY SUM(watch_time_sec) DESC
|
||||
LIMIT 3
|
||||
) top3
|
||||
),
|
||||
'[]'::JSONB
|
||||
) AS value;
|
||||
$$;
|
||||
Reference in New Issue
Block a user