diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..9aa7aa5 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +.env +.venv/ +__pycache__/ +.git/ +.gitea/ +*.pyc +*.pyo +data/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..f753546 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,78 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Commands + +```bash +# Run the server (reads .env for config) +uvicorn main:app --host 0.0.0.0 --port 8000 + +# Verify code compiles and imports cleanly +python -c "import main; print('OK')" + +# Syntax check a specific file +python -m py_compile path/to/file.py + +# Docker build +docker build -t agents-api -f docker/Dockerfile . +``` + +There is no test suite or linting setup yet. + +## Architecture + +This is a **Discord bot** powered by a LangGraph agent with a pluggable skill system. The only interaction surface is Discord DMs — there is no web chat UI. + +``` +Discord DM → bot.py → LangGraph StateGraph → skills (tools) → external APIs + │ + REST API (auth status, JellyStat) +``` + +### Agent / skill system (`agents/`) + +- **Agents** (`agents/__init__.py`) are thin wrappers pairing a system prompt with a list of skill names. +- **Skills** (`agents/skills/__init__.py`) provide prompt fragments, OpenAI tool definitions, and an async `execute` callback. Each skill self-registers via `register()` at import time. +- Agents and skills are loaded at startup by `load_all_agents()` in `main.py`, which triggers side-effecting imports of all agent/skill modules. + +The media-agent (`agents/media_agent.py`) is the primary agent. Skills attached: +- `seerr` — search, trending, discover, request, submit issues via Seerr API +- `watch_history` — Jellyfin watch stats via the internal JellyStat API +- `media_info` — base persona (prompt-only) +- `triage` — fallback rules for unsupported actions (prompt-only) +- `easter_eggs` — theme-aware persona flavours (prompt-only) + +### LangGraph graph (`src/graph.py`) + +Two-node StateGraph: `agent_node → tool_node → agent_node`. The agent node calls DeepSeek (OpenAI-compatible) with system prompt + tool defs. The tool node executes tool calls through the skill system via `execute_tool()`. A custom tool node is used — not LangChain's ToolNode. + +State is `AgentState` (`src/state.py`): a TypedDict with `messages` (LangGraph `add_messages` reducer) and `discord_user_id`. + +### Discord bot (`gateway/discord/bot.py`) + +Runs in a background daemon thread with its own asyncio event loop (separate from FastAPI's). DMs only — no server/channel messages. Requires users to share a guild with the bot. Maintains per-user conversation history via `ConversationStore` (in-memory dict, last N exchanges). Supports `/login ` (Quick Connect) and `/logout `. + +### Auth system (`src/auth_store.py`, `gateway/auth/`) + +SQLite-backed (WAL mode) with a single `user_auth` table keyed on `(discord_user_id, service)`. Stores opaque service tokens, never passwords. `AuthService` is a minimal ABC with `name`/`display_name` properties; `JellyfinAuth` is the only implementation, using Jellyfin Quick Connect (initiate → poll → exchange secret for token). The auth gate in `execute_tool()` checks `skill.requires_auth` before executing tools. + +### REST API (`main.py`) + +Minimal — only two routers remain: +- `gateway/v1/auth.py` — `GET /api/v1/auth/Discord/status` (linked services lookup) and `POST /api/v1/auth/reset` (dev only) +- `gateway/jellystat/api.py` — `GET /jellystat/{history,genres,summary}/{user_id}` called internally by the `watch_history` skill + +### JellyStat (`gateway/jellystat/`) + +PostgreSQL connection pool stored on `app.state.jellystat_pool`. Database functions (`startup-functions.sql`) are deployed on startup via `CREATE OR REPLACE FUNCTION`. The watch_history skill calls these endpoints over HTTP (localhost) rather than querying the DB directly, keeping DB credentials isolated from the skill layer. + +### Seerr session caching (`agents/skills/seerr.py`) + +`httpx.AsyncClient` instances are cached per event loop (the Discord bot thread has its own loop separate from FastAPI). Cookie-based auth for Seerr is obtained once at startup via a sync login (thread-safe with double-check locking), then reused across all event-loop-specific clients. + +## Key patterns + +- **Self-registration**: agents, skills, and auth services all register at import time via module-level function calls. New modules just need to be imported once (see `load_all_agents()` and `import gateway.auth.jellyfin` in `main.py`). +- **Auth gate**: skills declare `requires_auth=["jellyfin"]` and the framework checks credentials before tool execution. Tools receive `_discord_user_id` injected into their args dict. +- **TMDb IDs as source of truth**: media tools display `[tmdb:123456]` tags and prefer IDs over title matching. The system prompt instructs the LLM to always show and use these IDs. diff --git a/agents/skills/easter_eggs.py b/agents/skills/easter_eggs.py index 5900c18..7aff077 100644 --- a/agents/skills/easter_eggs.py +++ b/agents/skills/easter_eggs.py @@ -84,9 +84,9 @@ THEMES = { "airplane", "plane", "flight", "pilot", "cockpit", "turbulence" ], "persona": ( - "mention airplane noises, and say that Erwin will be terrified " - "mention something airplane related, and hope Erwin is not around to see this reference." - "Stay fully functional — carry out all requested actions normally, but only if something is requested" + "Mention airplane noises, and say that Erwin will be terrified. " + "Mention something airplane related, and hope Erwin is not around to see this reference. " + "Stay fully functional — carry out all requested actions normally, but only if something is requested." ) }, @@ -95,9 +95,9 @@ THEMES = { "unable to read", "dyslexia", "dislexia", "dislexic", "dyslexic", "typo", "trouble reading", "misspelled", "misspelling" ], "persona": ( - "mention that tim is dyslexic and has trouble reading in the response " - "mention it is sometimes difficult to read, since the creator (which is Tim) of this agent is dyslexic. " - "Stay fully functional — carry out all requested actions normally, but only if something is requested" + "Mention that Tim is dyslexic and has trouble reading in the response. " + "Mention it is sometimes difficult to read, since the creator (Tim) of this agent is dyslexic. " + "Stay fully functional — carry out all requested actions normally, but only if something is requested." ) } } diff --git a/agents/skills/seerr.py b/agents/skills/seerr.py index 46269c2..84bf5d8 100644 --- a/agents/skills/seerr.py +++ b/agents/skills/seerr.py @@ -565,26 +565,12 @@ async def _discover(args: dict) -> ToolResult: r.raise_for_status() results = r.json().get("results", []) desc = genre - elif studio: - # /discover/studio/{studioId} requires a numeric TMDB studio ID. - # Fall back to searching by name via /search. - desc = studio - search_query = studio + elif studio or keyword: + # Both use /search with free-text query (studio needs a numeric TMDB + # studio ID not available here, so fall back to name search). + desc = studio or keyword endpoint = "/api/v1/search" - params["query"] = search_query - if language: - params["language"] = language - async with _client() as c: - r = await c.get(endpoint, params=params) - r.raise_for_status() - results = r.json().get("results", []) - # Filter to requested media type - results = [item for item in results if item.get("mediaType") == kind] - elif keyword: - # Free-text keyword → use /search, filtered by mediaType - desc = keyword - endpoint = "/api/v1/search" - params["query"] = keyword + params["query"] = desc if language: params["language"] = language async with _client() as c: @@ -616,7 +602,6 @@ async def _request_media(args: dict) -> ToolResult: async with _client() as c: # --- Fast-path: TMDb ID known — confirm the title and request directly --- if tmdb_id: - # Quick lookup to get the correct title for the confirmation message detail_r = await c.get(f"/api/v1/{kind}/{tmdb_id}") if detail_r.status_code == 200: detail = detail_r.json() @@ -626,11 +611,6 @@ async def _request_media(args: dict) -> ToolResult: or detail.get("firstAirDate", "")[:4] or "?" ) - else: - # Detail lookup failed — fall back to title search - pass - - if detail_r.status_code == 200: # Submit directly with the known TMDb ID request_body: dict = {"mediaType": kind, "mediaId": tmdb_id} if kind == "tv": @@ -651,6 +631,7 @@ async def _request_media(args: dict) -> ToolResult: f"❌ Failed to request **{media_title}** ({media_year}). " f"Seerr responded with status {req_r.status_code}: {req_r.text[:500]}" ) + # Detail lookup failed — fall through to slow-path title search # --- Slow-path: search by title --- r = await c.get("/api/v1/search", params={"query": quote(title), "page": 1}) diff --git a/gateway/auth/__init__.py b/gateway/auth/__init__.py index 2344e47..37ea6e4 100644 --- a/gateway/auth/__init__.py +++ b/gateway/auth/__init__.py @@ -10,17 +10,17 @@ Add a new service (Plex, Seerr, etc.) by: from __future__ import annotations from abc import ABC, abstractmethod -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import Optional # --------------------------------------------------------------------------- -# AuthResult — returned by AuthService.authenticate() +# AuthResult — returned by AuthService authentication # --------------------------------------------------------------------------- @dataclass class AuthResult: - """Outcome of a credential validation attempt.""" + """Outcome of an authentication attempt.""" success: bool external_user_id: Optional[str] = None external_name: Optional[str] = None @@ -38,8 +38,6 @@ class AuthService(ABC): Subclasses must implement: - name : unique identifier used in URLs and DB keys - display_name : human-readable label shown in Discord - - render_login_form(token, discord_id) → HTML string - - authenticate(form_data) → AuthResult """ @property @@ -54,22 +52,6 @@ class AuthService(ABC): """Human-readable: "Jellyfin", "Seerr", "Plex" """ ... - @abstractmethod - def render_login_form(self, token: str, discord_id: int) -> str: - """Return HTML string with a login form for this service. - - The form MUST include these hidden fields: - - - - """ - ... - - @abstractmethod - async def authenticate(self, form_data: dict) -> AuthResult: - """Validate credentials against the service. Return AuthResult.""" - ... - # --------------------------------------------------------------------------- # Global registry diff --git a/gateway/auth/jellyfin.py b/gateway/auth/jellyfin.py index 6a49efd..b3c9ef7 100644 --- a/gateway/auth/jellyfin.py +++ b/gateway/auth/jellyfin.py @@ -1,15 +1,10 @@ """ -Jellyfin AuthService — validates Jellyfin credentials and stores the session token. +Jellyfin AuthService — authenticates users via Jellyfin Quick Connect. -Two authentication flows: - 1. Quick Connect (primary): user enters a short code on their Jellyfin app. - - initiate_quick_connect() → {code, secret} - - poll_quick_connect(secret) → "Active" | "Authorized" | "Expired" - - authenticate_quick_connect(secret) → AuthResult with token - - 2. Username/password (legacy): renders an HTML form, called via the REST API. - - render_login_form(token, discord_id) → HTML string - - authenticate(form_data) → AuthResult +Flow: + 1. initiate_quick_connect() → {code, secret} + 2. poll_quick_connect(secret) → "Active" | "Authorized" | "Expired" + 3. authenticate_quick_connect(secret) → AuthResult with token """ from __future__ import annotations @@ -25,14 +20,6 @@ from src.config import get_config logger = logging.getLogger("auth.jellyfin") -# Emby-style authorization header required by Jellyfin's AuthenticateByName -_EMBY_HEADER = ( - 'MediaBrowser Client="AgentBot",' - 'Device="DiscordBot",' - 'DeviceId="agent-bot",' - 'Version="1.0"' -) - @dataclass class QuickConnectResult: @@ -64,9 +51,7 @@ class JellyfinAuth(AuthService): async def _resolve_url(self) -> str | None: """ - Resolve the Jellyfin server URL. - 1. Check JELLYFIN_URL env var (used in deployment). - 2. Check if user already has a stored auth with a URL (from legacy login). + Resolve the Jellyfin server URL from the JELLYFIN_URL env var. Returns None if no URL is configured. """ # First: explicit env var @@ -272,129 +257,6 @@ class JellyfinAuth(AuthService): error_message="An unexpected error occurred during authentication.", ) - # ------------------------------------------------------------------ - # Login form (legacy — used by the REST API) - # ------------------------------------------------------------------ - - def render_login_form(self, token: str, discord_id: int) -> str: - return f""" - - - - -Link Jellyfin - - - -

🔗 Link Jellyfin to Discord

-

Enter your Jellyfin server URL and credentials to link your account.

- -
- - - - - - - - - - - - - - -
- -""" - - # ------------------------------------------------------------------ - # Authentication - # ------------------------------------------------------------------ - - async def authenticate(self, form_data: dict) -> AuthResult: - url = form_data.get("jellyfin_url", "").strip().rstrip("/") - username = form_data.get("username", "").strip() - password = form_data.get("password", "").strip() - - if not url or not username or not password: - return AuthResult( - success=False, - error_message="All fields are required (URL, username, password).", - ) - - logger.info("Attempting Jellyfin login for '%s' on %s", username, url) - - async with httpx.AsyncClient(timeout=10) as client: - try: - resp = await client.post( - f"{url}/Users/AuthenticateByName", - json={"Username": username, "Pw": password}, - headers={"X-Emby-Authorization": _EMBY_HEADER}, - ) - if resp.status_code != 200: - logger.warning( - "Jellyfin login failed for '%s': HTTP %s", username, resp.status_code - ) - return AuthResult( - success=False, - error_message=f"Login failed — check your server URL and credentials.", - ) - - data = resp.json() - user = data.get("User", {}) - token = data.get("AccessToken", "") - - if not token: - return AuthResult( - success=False, - error_message="Jellyfin returned an unexpected response.", - ) - - logger.info( - "Jellyfin login OK: user=%s (%s)", - user.get("Name", "?"), - user.get("Id", "?"), - ) - - return AuthResult( - success=True, - external_user_id=user.get("Id", ""), - external_name=user.get("Name", username), - credentials={ - "token": token, - "url": url, - "user_id": user.get("Id", ""), - }, - ) - - except httpx.TimeoutException: - return AuthResult( - success=False, - error_message=f"Could not reach {url} — connection timed out. Check the URL.", - ) - except httpx.ConnectError: - return AuthResult( - success=False, - error_message=f"Could not connect to {url}. Is the server running?", - ) - except Exception as exc: - logger.exception("Unexpected error during Jellyfin login") - return AuthResult( - success=False, - error_message=f"An unexpected error occurred. Please try again.", - ) # Self-register at import time diff --git a/gateway/dependencies.py b/gateway/dependencies.py deleted file mode 100644 index 6dfaba8..0000000 --- a/gateway/dependencies.py +++ /dev/null @@ -1,36 +0,0 @@ -from fastapi import Request -from openai import OpenAI - -from src.graph import create_agent_graph - - -def get_llm_client(request: Request) -> OpenAI: - """FastAPI dependency — returns the singleton OpenAI client from app.state.""" - return request.app.state.llm_client - - -def get_agent_graph(agent_id: str, request: Request): - """ - FastAPI dependency — returns the compiled LangGraph graph for *agent_id*. - - Graphs are lazily compiled on first use and cached on app.state so each - agent's graph is only built once per process lifetime. - """ - cache: dict = request.app.state.agent_graphs - - if agent_id not in cache: - from agents import get as get_agent - - agent = get_agent(agent_id) - if agent is None: - # Fall back to the naked agent if the requested one doesn't exist - agent_id = "naked" - agent = get_agent(agent_id) - - cache[agent_id] = create_agent_graph( - client=request.app.state.llm_client, - agent_skills=agent.skills, - system_prompt=agent.build_system_prompt(), - ) - - return cache[agent_id] diff --git a/gateway/discord/bot.py b/gateway/discord/bot.py index 05da455..d35ff6b 100644 --- a/gateway/discord/bot.py +++ b/gateway/discord/bot.py @@ -207,9 +207,6 @@ class AgentBot(discord.Client): # --- Quick Connect flow --- svc = get_auth_service(service) - if svc is None: - await message.channel.send(f"Unknown service: {service}") - return True await message.channel.send(f"🔑 Starting **{svc.display_name}** Quick Connect…") diff --git a/gateway/v1/auth.py b/gateway/v1/auth.py index 0ca93b2..1402d28 100644 --- a/gateway/v1/auth.py +++ b/gateway/v1/auth.py @@ -1,156 +1,27 @@ """ -Auth API — generic endpoints for linking Discord users to external services. +Auth API — endpoints for checking linked services and dev operations. -GET /api/v1/auth/login?service=X&token=Y&discord_id=Z - Validates the link token and serves a service-specific login form. - -POST /api/v1/auth/login - Accepts the form submission, validates credentials against the service, - stores the session, and returns a result page. - -GET /api/v1/auth/status?discord_id=Z +GET /api/v1/auth/Discord/status?discord_id=Z Returns which services are linked for this Discord user. + +POST /api/v1/auth/reset + Wipes the auth store (dev only — requires ALLOW_AUTH_RESET=true). """ from __future__ import annotations import logging -from fastapi import APIRouter, Form, HTTPException, Request -from fastapi.responses import HTMLResponse +from fastapi import APIRouter, HTTPException -from gateway.auth import get_auth_service, list_auth_services from src import auth_store +from src.config import get_config logger = logging.getLogger("gateway.auth") router = APIRouter(prefix="/api/v1/auth", tags=["auth"]) -# --------------------------------------------------------------------------- -# GET /auth/login — serve the login form -# --------------------------------------------------------------------------- - -@router.get("/login") -async def login_form( - service: str, - token: str, - discord_id: int, -): - """Validate the one-time link token and return a service-specific login form.""" - - # Validate the token WITHOUT consuming it (the POST will consume it) - result = auth_store.validate_token(token) - if result is None: - raise HTTPException(status_code=400, detail="Invalid or expired link token.") - - uid, svc = result - if uid != discord_id or svc != service: - raise HTTPException(status_code=400, detail="Token does not match the request.") - - # Look up the AuthService - svc_obj = get_auth_service(service) - if svc_obj is None: - raise HTTPException(status_code=404, detail=f"Unknown service: {service}") - - logger.info("Serving login form: user=%s service=%s", discord_id, service) - return HTMLResponse(svc_obj.render_login_form(token, discord_id)) - - -# --------------------------------------------------------------------------- -# POST /auth/login — handle form submission -# --------------------------------------------------------------------------- - -@router.post("/login") -async def login_submit(request: Request): - """Handle the login form POST: validate credentials, store auth, show result.""" - - # Parse form data - form = await request.form() - token = form.get("token", "") - discord_id_str = form.get("discord_id", "") - service = form.get("service", "") - - if not token or not discord_id_str or not service: - raise HTTPException(status_code=400, detail="Missing required fields.") - - try: - discord_id = int(discord_id_str) - except (ValueError, TypeError): - raise HTTPException(status_code=400, detail="Invalid discord_id.") - - # Consume the token on POST (the GET only validated, didn't consume) - result = auth_store.consume_token(token) - if result is None: - raise HTTPException(status_code=400, detail="Invalid or expired link token.") - - # Look up the AuthService - svc_obj = get_auth_service(service) - if svc_obj is None: - raise HTTPException(status_code=404, detail=f"Unknown service: {service}") - - # Collect service-specific form fields (everything except token, discord_id, service) - form_data: dict[str, str] = {} - for key, value in form.items(): - if key not in ("token", "discord_id", "service"): - form_data[key] = str(value) - - # Authenticate against the service - auth_result = await svc_obj.authenticate(form_data) - - if not auth_result.success: - return HTMLResponse( - status_code=401, - content=f""" -Login Failed - -

❌ Login Failed

-

{auth_result.error_message or "Authentication failed. Please try again."}

-

← Go back and try again

-""", - ) - - # Store the successful auth - auth_store.store_auth( - discord_user_id=discord_id, - service=service, - external_user_id=auth_result.external_user_id or "", - external_name=auth_result.external_name or "", - credentials=auth_result.credentials, - ) - - logger.info( - "Auth linked: discord=%s → %s (%s)", - discord_id, - service, - auth_result.external_name, - ) - - return HTMLResponse(f""" - - - - -Account Linked - - - -

✅ Account Linked!

-

Logged in as {auth_result.external_name} on {svc_obj.display_name}.

-

You can close this page and return to Discord.

- -""") - - # --------------------------------------------------------------------------- # GET /auth/status — get all linked services for a Discord user # --------------------------------------------------------------------------- @@ -199,12 +70,10 @@ async def auth_status(discord_id: int): # POST /auth/reset — wipe auth store (DEV ONLY) # --------------------------------------------------------------------------- -from src.config import get_config # noqa: E402 - @router.post("/reset") async def reset_auth(): """ - Reset the entire auth store — clears all link tokens and user auth records. + Reset the entire auth store — clears all user auth records. Only enabled when ALLOW_AUTH_RESET=true in the environment. Returns 403 in production. @@ -217,4 +86,4 @@ async def reset_auth(): auth_store.reset_all() logger.warning("Auth store reset via API endpoint.") - return {"status": "ok", "message": "Auth store cleared — all tokens and auth records removed."} + return {"status": "ok", "message": "Auth store cleared — all auth records removed."} diff --git a/gateway/v1/chat.py b/gateway/v1/chat.py deleted file mode 100644 index 68cfd95..0000000 --- a/gateway/v1/chat.py +++ /dev/null @@ -1,241 +0,0 @@ -from fastapi import APIRouter, Depends, Request -from fastapi.responses import StreamingResponse -from openai import OpenAI -from pydantic import BaseModel -import json - -from gateway.dependencies import get_llm_client, get_agent_graph -from agents import get as get_agent, list_all as list_all_agents -from src.state import AgentState - -router = APIRouter() - - -class ChatRequest(BaseModel): - message: str - session_id: str | None = None - agent_id: str | None = None - - -class ChatCompletionRequest(BaseModel): - messages: list[dict] - stream: bool = False - model: str = "deepseek-chat" - - -# --------------------------------------------------------------------------- -# Agent resolution -# --------------------------------------------------------------------------- - -def _resolve_agent(agent_id: str | None = None, model: str | None = None): - """ - 1. explicit agent_id - 2. model field (OpenWebUI sends this — maps to agent_id if registered) - 3. fallback to "naked" - """ - lookup = agent_id or model - if lookup is None: - return get_agent("naked") - agent = get_agent(lookup) - return agent if agent else get_agent("naked") - - -# --------------------------------------------------------------------------- -# LangGraph helpers -# --------------------------------------------------------------------------- - -async def _invoke_graph(graph, messages: list[dict]) -> str: - """Run the graph synchronously (non-streaming) and return the final text.""" - state: AgentState = {"messages": messages} - result = await graph.ainvoke(state) - last_msg = result["messages"][-1] - return last_msg.content or "" - - -async def _stream_graph(graph, messages: list[dict]): - """ - Run the graph and stream the final response token-by-token. - - LangGraph's astream_events would require langchain-openai's ChatOpenAI - to intercept LLM chunks. Instead we run the graph to completion (tools - execute silently) and then stream the final text content character by - character — this gives the client a real SSE stream without adding new - dependencies. - """ - state: AgentState = {"messages": messages} - result = await graph.ainvoke(state) - content = result["messages"][-1].content or "" - # Yield token-by-token so the SSE client sees incremental output - for token in content: - yield token - - -# --------------------------------------------------------------------------- -# Non-streaming run (kept for /chat/sync and sync completions) -# --------------------------------------------------------------------------- - -async def run_agent_with_tools( - request: Request, - messages: list[dict], - agent_id: str | None = None, - model: str | None = None, -) -> str: - """Send messages through the agent's LangGraph. Non-streaming.""" - agent = _resolve_agent(agent_id, model) - graph = get_agent_graph(agent.agent_id, request) - return await _invoke_graph(graph, messages) - - -# --------------------------------------------------------------------------- -# Streaming generator (kept for /chat and stream completions) -# --------------------------------------------------------------------------- - -async def run_agent_stream( - request: Request, - messages: list[dict], - agent_id: str | None = None, - model: str | None = None, -): - """Async generator — yields tokens via the agent's LangGraph.""" - agent = _resolve_agent(agent_id, model) - graph = get_agent_graph(agent.agent_id, request) - async for token in _stream_graph(graph, messages): - yield token - - -# --------------------------------------------------------------------------- -# Endpoints -# --------------------------------------------------------------------------- - -@router.get("/") -def root(): - return {"status": "ok"} - - -@router.post("/chat") -async def chat( - req: ChatRequest, - request: Request, - client: OpenAI = Depends(get_llm_client), -): - """Streaming chat — single message, no history.""" - messages = [{"role": "user", "content": req.message}] - - async def event_stream(): - async for token in run_agent_stream(request, messages, req.agent_id): - payload = json.dumps({"token": token, "session_id": req.session_id}) - yield f"data: {payload}\n\n" - yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n" - - return StreamingResponse( - event_stream(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no", - }, - ) - - -@router.post("/chat/sync") -async def chat_sync( - req: ChatRequest, - request: Request, - client: OpenAI = Depends(get_llm_client), -): - """Non-streaming chat — single message.""" - messages = [{"role": "user", "content": req.message}] - response = await run_agent_with_tools(request, messages, req.agent_id) - return {"response": response, "session_id": req.session_id} - - -@router.get("/agents") -def list_agents(): - """Return all registered agents.""" - return { - "agents": [ - { - "agent_id": a.agent_id, - "description": a.description, - "skills": a.skills, - } - for a in list_all_agents().values() - ] - } - - -@router.get("/models") -def list_models(): - """Return agents as selectable models for OpenWebUI.""" - return { - "object": "list", - "data": [ - { - "id": a.agent_id, - "object": "model", - "created": 0, - "owned_by": "local-agent", - } - for a in list_all_agents().values() - ], - } - - -@router.post("/chat/completions") -async def chat_completions( - req: ChatCompletionRequest, - request: Request, - client: OpenAI = Depends(get_llm_client), -): - """OpenAI-compatible /chat/completions — supports stream=True. - Multi-turn: req.messages contains the FULL conversation history. - Agent resolved from the model field (OpenWebUI sends this). - """ - agent = _resolve_agent(model=req.model) - - if req.stream: - async def sse_stream(): - async for token in run_agent_stream( - request, req.messages, agent_id=agent.agent_id, - ): - chunk = { - "id": "chatcmpl-local", - "object": "chat.completion.chunk", - "choices": [ - {"index": 0, "delta": {"content": token}, "finish_reason": None} - ], - } - yield f"data: {json.dumps(chunk)}\n\n" - final_chunk = { - "id": "chatcmpl-local", - "object": "chat.completion.chunk", - "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], - } - yield f"data: {json.dumps(final_chunk)}\n\n" - yield "data: [DONE]\n\n" - - return StreamingResponse( - sse_stream(), - media_type="text/event-stream", - headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, - ) - - # Non-streaming — full history, LangGraph agent - response = await run_agent_with_tools( - request, req.messages, agent_id=agent.agent_id, - ) - - return { - "id": "chatcmpl-local", - "object": "chat.completion", - "created": 0, - "model": req.model, - "choices": [ - { - "index": 0, - "message": {"role": "assistant", "content": response}, - "finish_reason": "stop", - } - ], - } diff --git a/main.py b/main.py index 67a4de9..8dc7b04 100644 --- a/main.py +++ b/main.py @@ -5,10 +5,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from gateway.v1.auth import router as auth_router -from gateway.v1.chat import router as v1_router from gateway.jellystat.api import router as jellystat_router -from src.config import DEEPSEEK_API_KEY, get_config -from src.llm import create_client # --------------------------------------------------------------------------- # Logging — tool calls will appear in the uvicorn console @@ -57,17 +54,8 @@ app.add_middleware( allow_headers=["*"], ) -# --------------------------------------------------------------------------- -# Singletons (stored on app.state so every module can reach them via Depends) -# --------------------------------------------------------------------------- -app.state.llm_client = create_client(DEEPSEEK_API_KEY) - -# Lazy-compiled LangGraph graphs — populated on first use per agent -app.state.agent_graphs: dict = {} - # --------------------------------------------------------------------------- # Routers # --------------------------------------------------------------------------- -app.include_router(v1_router, prefix="/v1") app.include_router(auth_router) app.include_router(jellystat_router) \ No newline at end of file diff --git a/src/auth_store.py b/src/auth_store.py index f14864f..f33ba87 100644 --- a/src/auth_store.py +++ b/src/auth_store.py @@ -1,23 +1,17 @@ """ Auth Store — SQLite-backed persistence for Discord-to-service authentication. -Two tables: - - link_tokens : one-time tokens sent via Discord DM to initiate login - - user_auth : per-user, per-service credentials (Jellyfin token, etc.) - +Stores per-user, per-service credentials (Jellyfin access tokens, etc.). Thread-safe via WAL mode and a shared lock. No passwords are ever stored -— only opaque service tokens (e.g. Jellyfin AccessToken). +— only opaque service tokens. """ from __future__ import annotations import logging -import secrets import sqlite3 import threading -from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Optional from src.config import get_config @@ -27,7 +21,6 @@ logger = logging.getLogger("auth_store") # Config # --------------------------------------------------------------------------- AUTH_DB_PATH = get_config("AUTH_DB_PATH", "data/auth.db") -TOKEN_EXPIRY_MINUTES = int(get_config("AUTH_TOKEN_EXPIRY", "10")) # --------------------------------------------------------------------------- # Singleton handle @@ -54,8 +47,6 @@ def _resolve_path() -> Path: def _get_conn() -> sqlite3.Connection: """Return a thread-local connection to the auth database.""" - import sqlite3 - conn = sqlite3.connect(str(_resolve_path()), check_same_thread=False) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") @@ -68,15 +59,6 @@ def _get_conn() -> sqlite3.Connection: # --------------------------------------------------------------------------- _SCHEMA = """ -CREATE TABLE IF NOT EXISTS link_tokens ( - token TEXT PRIMARY KEY, - discord_user_id INTEGER NOT NULL, - service TEXT NOT NULL, - expires_at TEXT NOT NULL, - used INTEGER DEFAULT 0, - created_at TEXT DEFAULT (datetime('now')) -); - CREATE TABLE IF NOT EXISTS user_auth ( discord_user_id INTEGER NOT NULL, service TEXT NOT NULL, @@ -107,97 +89,6 @@ def _ensure_schema() -> None: logger.info("Auth store initialized at %s", _resolve_path()) -# --------------------------------------------------------------------------- -# Public API — Link Tokens -# --------------------------------------------------------------------------- - -def create_token(discord_user_id: int, service: str) -> str: - """Generate a one-time link token. Expires after TOKEN_EXPIRY_MINUTES.""" - _ensure_schema() - token = secrets.token_urlsafe(32) - expires = (datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRY_MINUTES)).isoformat() - - with _db_lock: - conn = _get_conn() - conn.execute( - "INSERT INTO link_tokens (token, discord_user_id, service, expires_at) VALUES (?, ?, ?, ?)", - (token, discord_user_id, service, expires), - ) - conn.commit() - conn.close() - - logger.info("Created link token for user %s / service %s", discord_user_id, service) - return token - - -def validate_token(token: str) -> tuple[int, str] | None: - """Read-only validation — does NOT consume the token. - - Returns (discord_user_id, service) if the token exists, is unused, - and has not expired. Returns None otherwise. - """ - _ensure_schema() - - with _db_lock: - conn = _get_conn() - row = conn.execute( - "SELECT discord_user_id, service, used, expires_at FROM link_tokens WHERE token = ?", - (token,), - ).fetchone() - conn.close() - - if row is None: - return None - if row["used"]: - return None - - expires = datetime.fromisoformat(row["expires_at"]) - if datetime.now(timezone.utc) > expires: - return None - - return (row["discord_user_id"], row["service"]) - - -def consume_token(token: str) -> tuple[int, str] | None: - """Validate and consume a link token. Returns (discord_user_id, service) or None. - - A token is valid if: - - It exists - - It has not been used - - It has not expired - """ - _ensure_schema() - - with _db_lock: - conn = _get_conn() - row = conn.execute( - "SELECT discord_user_id, service, used, expires_at FROM link_tokens WHERE token = ?", - (token,), - ).fetchone() - - if row is None: - conn.close() - return None - - if row["used"]: - conn.close() - logger.warning("Token already used: %s…", token[:8]) - return None - - expires = datetime.fromisoformat(row["expires_at"]) - if datetime.now(timezone.utc) > expires: - conn.close() - logger.warning("Token expired: %s…", token[:8]) - return None - - conn.execute("UPDATE link_tokens SET used = 1 WHERE token = ?", (token,)) - conn.commit() - result = (row["discord_user_id"], row["service"]) - conn.close() - logger.info("Token consumed: %s… → user=%s service=%s", token[:8], result[0], result[1]) - return result - - # --------------------------------------------------------------------------- # Public API — User Auth # --------------------------------------------------------------------------- @@ -350,9 +241,8 @@ def reset_all() -> None: with _db_lock: conn = _get_conn() - conn.execute("DELETE FROM link_tokens") conn.execute("DELETE FROM user_auth") conn.commit() conn.close() - logger.warning("Auth store RESET — all tokens and auth records cleared.") + logger.warning("Auth store RESET — all auth records cleared.") diff --git a/src/tools_adapter.py b/src/tools_adapter.py deleted file mode 100644 index ee393b4..0000000 --- a/src/tools_adapter.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -Tools adapter — bridges the existing skill/tool system with LangGraph's ToolNode. - -LangGraph's ToolNode expects callable tools (typically @tool-decorated functions). -This module wraps our skill-based tool definitions and async executors so -ToolNode can invoke them without any changes to the skills/ layer. -""" - -from __future__ import annotations - -import json -from typing import Any - -from langchain_core.tools import tool - -from agents.skills import get_all_tools, execute_tool - - -def build_langgraph_tools(skill_names: list[str]) -> list: - """ - Convert the registered skill tool definitions into LangChain-compatible - @tool-decorated functions that ToolNode can call. - - Each tool wraps the existing `execute_tool()` pipeline, so the skill - system's ToolResult + httpx session handling is fully preserved. - """ - tool_defs = get_all_tools(skill_names) - wrapped: list = [] - - for td in tool_defs: - fn_def = td.get("function", {}) - fn_name = fn_def.get("name", "") - fn_desc = fn_def.get("description", "") - - # Create a unique factory so each closure captures the right fn_name - def _make_tool(name: str, desc: str, skills: list[str]): - @tool(name, description=desc) - async def _wrapped(**kwargs: Any) -> str: - """Execute the tool via the skill system and return its content.""" - result = await execute_tool(skills, name, kwargs) - if result is None: - return f"Tool '{name}' is not available." - return result.content - - # Stash the original OpenAI schema so LangGraph can use it - _wrapped.metadata = fn_def - return _wrapped - - wrapped.append(_make_tool(fn_name, fn_desc, skill_names)) - - return wrapped