From 2f7f94f1cef8a76ff925a8d8e6f5d26ed03c52c3 Mon Sep 17 00:00:00 2001 From: TimHoogervorst Date: Sun, 24 May 2026 10:18:59 +0200 Subject: [PATCH] Implement LangGraph integration: refactor agent-tool interaction, add graph compilation, and enhance state management --- api/api.md | 178 +++++++++++++++++----------- api/dependencies.py | 29 +++++ api/v1/chat.py | 230 +++++++++---------------------------- core/graph.py | 261 ++++++++++++++++++++++++++++++++++++++++++ core/state.py | 20 ++++ core/tools_adapter.py | 51 +++++++++ main.py | 3 + requirements.txt | 4 +- 8 files changed, 534 insertions(+), 242 deletions(-) create mode 100644 core/graph.py create mode 100644 core/state.py create mode 100644 core/tools_adapter.py diff --git a/api/api.md b/api/api.md index f4ebdde..1153989 100644 --- a/api/api.md +++ b/api/api.md @@ -1,6 +1,7 @@ -# API Architecture — Agent + Skill + Tool Pipeline +# API Architecture — Agent + Skill + Graph Pipeline -This document explains how the API routes user messages through the agent/skill/tool pipeline to produce responses. +This document explains how the API routes user messages through the +agent / skill / LangGraph pipeline to produce responses. --- @@ -17,27 +18,22 @@ This document explains how the API routes user messages through the agent/skill/ │ api/v1/chat.py — chat_completions() │ │ │ │ 1. _resolve_agent(req.model) → Agent │ -│ 2. agent.build_system_prompt() → system prompt │ -│ 3. Build full_messages = [system] + req.messages │ -│ 4. run_agent_with_tools(client, messages, agent_id) │ +│ 2. get_agent_graph(agent_id) → compiled StateGraph │ +│ 3. graph.ainvoke(state) or _stream_graph(graph, messages) │ └──────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────┐ -│ Tool-Calling Loop (run_agent_with_tools / run_agent_stream) │ +│ LangGraph StateGraph (core/graph.py) │ │ │ -│ while turns < max_turns: │ -│ response = LLM.chat(messages, tools=agent_tools) │ -│ if response has tool_calls: │ -│ for each tool_call: │ -│ result = execute_tool(skills, name, args) │ -│ append result to messages │ -│ else: │ -│ return response.text (stream tokens if streaming) │ +│ ┌──────────────┐ tool_calls? ┌──────────────┐ │ +│ │ agent_node │ ───────────────▶ │ tool_node │ │ +│ │ (LLM call) │ ◀─────────────── │ (skill exec) │ │ +│ └──────┬───────┘ └──────────────┘ │ +│ │ no tool_calls │ +│ ▼ │ +│ [END] │ └──────────────────────────────────────────────────────────────────┘ -``` - ---- ## Key Concepts @@ -61,7 +57,8 @@ Agent( - `build_system_prompt()` — merges base_prompt + all skill prompt fragments Agents self-register at import time via `agents/__init__.py`'s `register()`. -`main.py` calls `load_all_agents()` at startup to import all agent/skill modules. +`main.py` calls `load_all_agents()` at startup to import every agent and skill +module. ### 2. Skill @@ -78,37 +75,47 @@ Skill( ) ``` -- `prompt_fragment` — injected into the agent's system prompt. Teaches the LLM what tools are available and when to use them. +- `prompt_fragment` — injected into the agent's system prompt. - `tools` — list of OpenAI function definitions (name, description, parameters). - `execute` — async callable that routes tool calls to API handlers. -### 3. Tool +### 3. Graph -A **Tool** is a single function the LLM can call. Defined as part of a skill's `tools` list. +Each agent gets a **compiled LangGraph StateGraph** built by +`core/graph.py:create_agent_graph()`. The graph is compiled lazily on the +first request and cached on `app.state.agent_graphs` for the lifetime of the +process. + +| Graph node / edge | What it does | +|---|---| +| `agent_node` | Converts state messages to OpenAI dicts, calls the LLM with the agent's system prompt + tool definitions, returns an `AIMessage` | +| `tool_node` | Reads `tool_calls` from the last AI message, calls `execute_tool()` from the skill system, returns `ToolMessage` results | +| `_should_continue` | Conditional edge — returns `"tool_node"` if the AI message has `tool_calls`, else `END` | + +### 4. State + +Defined in `core/state.py`: ```python -{ - "type": "function", - "function": { - "name": "seerr_trending", - "description": "Get trending movies and TV shows from Seerr...", - "parameters": { - "type": "object", - "properties": { - "kind": {"type": "string", "enum": ["movie", "tv", "all"]}, - "language": {"type": "string"}, - }, - "required": ["kind"], - }, - }, -} +class AgentState(TypedDict): + messages: Annotated[list, add_messages] ``` -When the LLM responds with a tool call, the loop: -1. Extracts `function.name` (e.g. `"seerr_trending"`) and `function.arguments` (e.g. `{"kind": "movie"}`) -2. Calls `execute_tool(agent.skills, name, args)` which finds the owning skill and runs it -3. Appends the result text to the message history -4. Sends back to the LLM for a follow-up response +LangGraph's `add_messages` reducer appends new messages and replaces messages +with matching IDs (so tool-call results overwrite their placeholders). + +### 5. Message Conversion + +Because we use the raw `openai` client (not `langchain-openai`), messages must +be converted between LangChain and OpenAI formats at every LLM call: + +- **LangChain → OpenAI** (`_lc_role_to_openai`, `_langchain_tc_to_openai`): + Maps `type` → `role` and converts top-level `name`/`args` tool-calls into + the nested `function` sub-object that the OpenAI API expects. + +- **OpenAI → LangChain** (inside `agent_node`): + Converts the `ChatCompletionMessage` response into an `AIMessage` with + LangChain-format `tool_calls` (top-level `name`/`args`/`id`). --- @@ -130,28 +137,36 @@ When the LLM responds with a tool call, the loop: 2. chat_completions(): → _resolve_agent(model="media-agent") → get_agent("media-agent") → Agent(skills=["media_info", "seerr", "triage"]) - → tools = get_all_tools(["media_info", "seerr", "triage"]) - → Returns 7 tool definitions from seerr.py - → system_prompt = agent.build_system_prompt() - → base_prompt + media_info fragment + seerr fragment + triage fragment + → get_agent_graph("media-agent", request) + → looks up app.state.agent_graphs["media-agent"] + → first call → create_agent_graph() compiles the graph with 7 Seerr tools + → run_agent_with_tools(request, messages, agent_id) + → _invoke_graph(graph, messages) -3. run_agent_with_tools() — Turn 1: - → LLM receives: [system prompt with tools] + [user: "What are trending movies?"] - → LLM responds: tool_calls = [{"function": {"name": "seerr_trending", "arguments": {"kind": "movie"}}}] +3. Graph — Pass 1 (agent_node): + → LLM receives: [system prompt] + [user: "What are trending movies?"] + → LLM responds with tool_calls: seerr_trending(kind="movie") + → agent_node returns AIMessage with tool_calls in LangChain format -4. Execute tool: - → execute_tool(["media_info", "seerr", "triage"], "seerr_trending", {"kind": "movie"}) - → Finds seerr skill → calls _execute("seerr_trending", ...) → _trending(args) - → GET /api/v1/discover/trending?mediaType=movie - → Returns formatted list with [tmdb:IDs] +4. Graph — _should_continue: + → AIMessage has tool_calls → route to "tool_node" -5. run_agent_with_tools() — Turn 2: - → LLM receives: previous messages + [tool: "Found 20 trending movies..."] - → LLM responds: text = "Here are the top trending movies! 🎬 ..." - → finish_reason="stop" → return the text +5. Graph — tool_node: + → Reads tool_call: name="seerr_trending", args={"kind": "movie"} + → execute_tool(["media_info", "seerr", "triage"], "seerr_trending", ...) + → Seerr API → GET /api/v1/discover/trending?mediaType=movie + → Returns ToolMessage with formatted results including [tmdb:IDs] -6. chat_completions() returns: - { "choices": [{"message": {"content": "Here are the top trending movies!..."}}] } +6. Graph — Pass 2 (agent_node): + → LLM receives previous exchange + tool result + → LLM responds with text only (no tool_calls) + → agent_node returns AIMessage(content="Here are the top trending movies!...") + +7. Graph — _should_continue: + → No tool_calls → route to END + +8. chat_completions() returns: + { "choices": [{"message": {"role": "assistant", "content": "Here are the top..."}}] } ``` ### Step-by-step: "Request the 2026 one" (multi-turn context) @@ -172,14 +187,49 @@ When the LLM responds with a tool call, the loop: 2. chat_completions(): → req.messages contains the ENTIRE conversation history - → System prompt prepended → full_messages = [system] + 5 history messages - → LLM sees everything: the trending list with [tmdb:931285], the disambiguation, "the 2026 one" + → graph.ainvoke({"messages": all_messages}) + → agent_node prepends system prompt and sends everything to the LLM -3. LLM reasons: - - I previously listed Mortal Kombat II (2026) with [tmdb:931285] +3. LLM reasons from full context: + - Previously listed Mortal Kombat II (2026) with [tmdb:931285] - The user said "request the mortal kombat one" → I searched and showed 4 options - Now they say "the 2026 one" → that matches Mortal Kombat II (2026) [tmdb:931285] - I should call seerr_request_media(kind="movie", title="Mortal Kombat II", tmdb_id=931285) -4. Tool executes the request → ✅ Success +4. tool_node executes the request → ✅ Success ``` + +--- + +## Streaming + +Streaming works slightly differently from the sync path: + +``` +chat_completions(stream=True) + → _stream_graph(graph, messages) + → graph.ainvoke(state) # runs graph to completion (tools execute silently) + → yields content character-by-character via SSE +``` + +For true token-level streaming (tokens appear as the LLM generates them), +the agent_node would need to use `langchain-openai`'s `ChatOpenAI` instead of +the raw `openai` client. The current approach is a pragmatic middle ground +that avoids adding another dependency while still giving the SSE client +incremental output. + +--- + +## File Map + +| File | Responsibility | +|---|---| +| `main.py` | FastAPI app, singleton creation, router mounting | +| `api/v1/chat.py` | Endpoints — resolves agent, invokes graph, formats responses | +| `api/dependencies.py` | `get_llm_client()`, `get_agent_graph()` — FastAPI `Depends` | +| `core/graph.py` | `create_agent_graph()` — builds the StateGraph | +| `core/state.py` | `AgentState` TypedDict | +| `core/llm.py` | `create_client()` — OpenAI client factory | +| `core/config.py` | Environment variable loader | +| `agents/` | Agent definitions (dataclass + self-registration) | +| `skills/` | Skill definitions (prompt fragments + tools + executors) | diff --git a/api/dependencies.py b/api/dependencies.py index fc19b7a..d57fee1 100644 --- a/api/dependencies.py +++ b/api/dependencies.py @@ -1,7 +1,36 @@ from fastapi import Request from openai import OpenAI +from core.graph import create_agent_graph + def get_llm_client(request: Request) -> OpenAI: """FastAPI dependency — returns the singleton OpenAI client from app.state.""" return request.app.state.llm_client + + +def get_agent_graph(agent_id: str, request: Request): + """ + FastAPI dependency — returns the compiled LangGraph graph for *agent_id*. + + Graphs are lazily compiled on first use and cached on app.state so each + agent's graph is only built once per process lifetime. + """ + cache: dict = request.app.state.agent_graphs + + if agent_id not in cache: + from agents import get as get_agent + + agent = get_agent(agent_id) + if agent is None: + # Fall back to the naked agent if the requested one doesn't exist + agent_id = "naked" + agent = get_agent(agent_id) + + cache[agent_id] = create_agent_graph( + client=request.app.state.llm_client, + agent_skills=agent.skills, + system_prompt=agent.build_system_prompt(), + ) + + return cache[agent_id] diff --git a/api/v1/chat.py b/api/v1/chat.py index 70365a5..5f71fb8 100644 --- a/api/v1/chat.py +++ b/api/v1/chat.py @@ -1,13 +1,12 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse from openai import OpenAI from pydantic import BaseModel import json -import asyncio -from api.dependencies import get_llm_client +from api.dependencies import get_llm_client, get_agent_graph from agents import get as get_agent, list_all as list_all_agents -from skills import get_all_tools, execute_tool +from core.state import AgentState router = APIRouter() @@ -42,166 +41,65 @@ def _resolve_agent(agent_id: str | None = None, model: str | None = None): # --------------------------------------------------------------------------- -# Tool-calling loop (non-streaming) +# LangGraph helpers +# --------------------------------------------------------------------------- + +async def _invoke_graph(graph, messages: list[dict]) -> str: + """Run the graph synchronously (non-streaming) and return the final text.""" + state: AgentState = {"messages": messages} + result = await graph.ainvoke(state) + last_msg = result["messages"][-1] + return last_msg.content or "" + + +async def _stream_graph(graph, messages: list[dict]): + """ + Run the graph and stream the final response token-by-token. + + LangGraph's astream_events would require langchain-openai's ChatOpenAI + to intercept LLM chunks. Instead we run the graph to completion (tools + execute silently) and then stream the final text content character by + character — this gives the client a real SSE stream without adding new + dependencies. + """ + state: AgentState = {"messages": messages} + result = await graph.ainvoke(state) + content = result["messages"][-1].content or "" + # Yield token-by-token so the SSE client sees incremental output + for token in content: + yield token + + +# --------------------------------------------------------------------------- +# Non-streaming run (kept for /chat/sync and sync completions) # --------------------------------------------------------------------------- async def run_agent_with_tools( - client: OpenAI, + request: Request, messages: list[dict], agent_id: str | None = None, model: str | None = None, - max_turns: int = 5, ) -> str: - """Send messages to the LLM with tool definitions. Tool-calling loop.""" + """Send messages through the agent's LangGraph. Non-streaming.""" agent = _resolve_agent(agent_id, model) - tools = get_all_tools(agent.skills) - system_prompt = agent.build_system_prompt() - - full_messages: list[dict] = [{"role": "system", "content": system_prompt}] - full_messages.extend(messages) - - loop = asyncio.get_running_loop() - - for _ in range(max_turns): - resp = await loop.run_in_executor( - None, - lambda: client.chat.completions.create( - model="deepseek-chat", - messages=full_messages, - tools=tools if tools else None, - tool_choice="auto" if tools else None, - ), - ) - choice = resp.choices[0] - - if choice.finish_reason == "stop" and choice.message.content: - return choice.message.content - - if choice.message.tool_calls: - full_messages.append(choice.message.model_dump(exclude_none=True)) - for tc in choice.message.tool_calls: - fn_name = tc.function.name - fn_args = json.loads(tc.function.arguments) - tr = await execute_tool(agent.skills, fn_name, fn_args) - result = tr.content if tr else f"Tool '{fn_name}' is not available." - full_messages.append({ - "role": "tool", "tool_call_id": tc.id, "content": result, - }) - continue - - return choice.message.content or "I'm not sure how to help with that." - - return "I've taken several actions but still need more information. Could you clarify?" + graph = get_agent_graph(agent.agent_id, request) + return await _invoke_graph(graph, messages) # --------------------------------------------------------------------------- -# Streaming generators +# Streaming generator (kept for /chat and stream completions) # --------------------------------------------------------------------------- -async def _stream_with_tools( - client: OpenAI, - messages: list[dict], - agent_id: str | None = None, - model: str | None = None, - max_turns: int = 5, -): - """Streaming tool-calling loop. Tools run silently, final text is streamed.""" - agent = _resolve_agent(agent_id, model) - tools = get_all_tools(agent.skills) - system_prompt = agent.build_system_prompt() - - full_messages: list[dict] = [{"role": "system", "content": system_prompt}] - full_messages.extend(messages) - - loop = asyncio.get_running_loop() - - for turn in range(max_turns): - resp = await loop.run_in_executor( - None, - lambda: client.chat.completions.create( - model="deepseek-chat", - messages=full_messages, - tools=tools if tools else None, - tool_choice="auto" if tools else None, - ), - ) - choice = resp.choices[0] - - if choice.message.tool_calls: - full_messages.append(choice.message.model_dump(exclude_none=True)) - for tc in choice.message.tool_calls: - fn_name = tc.function.name - fn_args = json.loads(tc.function.arguments) - tr = await execute_tool(agent.skills, fn_name, fn_args) - result = tr.content if tr else f"Tool '{fn_name}' is not available." - full_messages.append({ - "role": "tool", - "tool_call_id": tc.id, - "content": result, - }) - continue - - if choice.finish_reason == "stop" and choice.message.content: - for token in choice.message.content: - yield token - await asyncio.sleep(0) - return - - def _sync_stream(): - stream = client.chat.completions.create( - model="deepseek-chat", messages=full_messages, stream=True, - ) - for chunk in stream: - delta = chunk.choices[0].delta - if delta and delta.content: - yield delta.content - - gen = _sync_stream() - while True: - token = await loop.run_in_executor(None, next, gen, None) - if token is None: - return - yield token - - yield "\u2026" - - async def run_agent_stream( - client: OpenAI, + request: Request, messages: list[dict], agent_id: str | None = None, model: str | None = None, ): - """Async generator — yields tokens. Uses tool-loop when skills have tools.""" + """Async generator — yields tokens via the agent's LangGraph.""" agent = _resolve_agent(agent_id, model) - tools = get_all_tools(agent.skills) - - if tools: - async for token in _stream_with_tools(client, messages, agent_id, model): - yield token - return - - # No tools — simple streaming - system_prompt = agent.build_system_prompt() - full_messages: list[dict] = [{"role": "system", "content": system_prompt}] - full_messages.extend(messages) - - loop = asyncio.get_running_loop() - - def _sync_stream(): - stream = client.chat.completions.create( - model="deepseek-chat", messages=full_messages, stream=True, - ) - for chunk in stream: - delta = chunk.choices[0].delta - if delta and delta.content: - yield delta.content - - gen = _sync_stream() - while True: - token = await loop.run_in_executor(None, next, gen, None) - if token is None: - break + graph = get_agent_graph(agent.agent_id, request) + async for token in _stream_graph(graph, messages): yield token @@ -217,13 +115,14 @@ def root(): @router.post("/chat") async def chat( req: ChatRequest, + request: Request, client: OpenAI = Depends(get_llm_client), ): """Streaming chat — single message, no history.""" messages = [{"role": "user", "content": req.message}] async def event_stream(): - async for token in run_agent_stream(client, messages, req.agent_id): + async for token in run_agent_stream(request, messages, req.agent_id): payload = json.dumps({"token": token, "session_id": req.session_id}) yield f"data: {payload}\n\n" yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n" @@ -242,26 +141,12 @@ async def chat( @router.post("/chat/sync") async def chat_sync( req: ChatRequest, + request: Request, client: OpenAI = Depends(get_llm_client), ): """Non-streaming chat — single message.""" - agent = _resolve_agent(req.agent_id) - tools = get_all_tools(agent.skills) messages = [{"role": "user", "content": req.message}] - - if tools: - response = await run_agent_with_tools(client, messages, req.agent_id) - else: - agent_obj = _resolve_agent(req.agent_id) - resp = client.chat.completions.create( - model="deepseek-chat", - messages=[ - {"role": "system", "content": agent_obj.build_system_prompt()}, - {"role": "user", "content": req.message}, - ], - ) - response = resp.choices[0].message.content - + response = await run_agent_with_tools(request, messages, req.agent_id) return {"response": response, "session_id": req.session_id} @@ -300,6 +185,7 @@ def list_models(): @router.post("/chat/completions") async def chat_completions( req: ChatCompletionRequest, + request: Request, client: OpenAI = Depends(get_llm_client), ): """OpenAI-compatible /chat/completions — supports stream=True. @@ -311,7 +197,7 @@ async def chat_completions( if req.stream: async def sse_stream(): async for token in run_agent_stream( - client, req.messages, agent_id=agent.agent_id, + request, req.messages, agent_id=agent.agent_id, ): chunk = { "id": "chatcmpl-local", @@ -335,20 +221,10 @@ async def chat_completions( headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, ) - # Non-streaming — full history, tool-calling - tools = get_all_tools(agent.skills) - if tools: - response = await run_agent_with_tools( - client, req.messages, agent_id=agent.agent_id, - ) - else: - system_prompt = agent.build_system_prompt() - full_msgs: list[dict] = [{"role": "system", "content": system_prompt}] - full_msgs.extend(req.messages) - resp = client.chat.completions.create( - model="deepseek-chat", messages=full_msgs, - ) - response = resp.choices[0].message.content + # Non-streaming — full history, LangGraph agent + response = await run_agent_with_tools( + request, req.messages, agent_id=agent.agent_id, + ) return { "id": "chatcmpl-local", diff --git a/core/graph.py b/core/graph.py new file mode 100644 index 0000000..70f755a --- /dev/null +++ b/core/graph.py @@ -0,0 +1,261 @@ +""" +LangGraph agent graph factory. + +Builds a StateGraph that replaces the manual tool-calling loop in api/v1/chat.py. +The graph has two nodes: + - agent_node : calls the LLM (with system prompt + tool definitions) + - tool_node : executes tool calls via the existing skill system + +A conditional edge routes tool_calls back to the agent, or ends the run. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Literal + +from langchain_core.messages import AIMessage, ToolMessage +from langgraph.graph import END, StateGraph +from openai import OpenAI + +from core.state import AgentState +from skills import get_all_tools, execute_tool + +logger = logging.getLogger("graph") + + +# --------------------------------------------------------------------------- +# Helper — map LangChain message type → OpenAI role +# --------------------------------------------------------------------------- + +def _lc_role_to_openai(msg_type: str) -> str: + """Convert a LangChain message type string to an OpenAI role.""" + mapping = {"human": "user", "ai": "assistant", "tool": "tool", "system": "system"} + return mapping.get(msg_type, "user") + + +def _langchain_tc_to_openai(tool_calls: list) -> list[dict[str, Any]]: + """ + Convert LangChain-format tool_calls (with `name`/`args` at top level) + back to OpenAI format (with a nested `function` sub-object). + """ + result: list[dict[str, Any]] = [] + for tc in tool_calls: + if isinstance(tc, dict): + if "function" in tc: + result.append(tc) + else: + # LangChain format: {"name": ..., "args": ..., "id": ...} + result.append({ + "id": tc.get("id", ""), + "type": "function", + "function": { + "name": tc.get("name", ""), + "arguments": json.dumps(tc.get("args", {})), + }, + }) + else: + # Pydantic model — dump to dict + d = tc.model_dump() if hasattr(tc, "model_dump") else {} + if "function" in d: + result.append(d) + else: + result.append({ + "id": d.get("id", ""), + "type": "function", + "function": { + "name": d.get("name", ""), + "arguments": json.dumps(d.get("args", {})), + }, + }) + return result + + +# --------------------------------------------------------------------------- +# Agent node — calls the LLM +# --------------------------------------------------------------------------- + +def _make_agent_node( + client: OpenAI, + system_prompt: str, + tool_defs: list[dict[str, Any]], + model_name: str = "deepseek-chat", +): + """ + Return a callable suitable as a LangGraph node. + + The node reads the current message list from state, prepends the system + prompt, and calls the LLM. If tool_defs is non-empty the LLM may return + tool_calls; ToolNode (or our custom tool node) will handle them. + """ + + def agent_node(state: AgentState) -> dict[str, list]: + messages = state["messages"] + + # Convert LangChain message objects to plain dicts for the OpenAI client. + full: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}] + for m in messages: + if isinstance(m, dict): + # Already a plain dict — pass through. + # But fix tool_calls if they're in LangChain format. + d = dict(m) + tc = d.get("tool_calls") + if tc and isinstance(tc, list) and tc and isinstance(tc[0], dict) and "function" not in tc[0]: + d["tool_calls"] = _langchain_tc_to_openai(tc) + full.append(d) + else: + # LangChain message object → OpenAI-compatible dict + role = _lc_role_to_openai(getattr(m, "type", "user")) + d: dict[str, Any] = {"role": role, "content": getattr(m, "content", "")} + # Serialize tool_calls back to OpenAI format (if this is an AI msg) + tc = getattr(m, "tool_calls", None) + if tc: + d["tool_calls"] = _langchain_tc_to_openai(tc) + tc_id = getattr(m, "tool_call_id", None) + if tc_id: + d["tool_call_id"] = tc_id + full.append(d) + + resp = client.chat.completions.create( + model=model_name, + messages=full, + tools=tool_defs if tool_defs else None, + tool_choice="auto" if tool_defs else None, + ) + choice = resp.choices[0] + + # Convert OpenAI tool_calls to the dict format LangChain expects. + raw_tool_calls = list(choice.message.tool_calls) if choice.message.tool_calls else [] + tool_calls: list[dict[str, Any]] = [] + for tc in raw_tool_calls: + fn = tc.function + tool_calls.append({ + "name": fn.name, + "args": json.loads(fn.arguments), + "id": tc.id, + }) + ai_msg = AIMessage( + content=choice.message.content or "", + tool_calls=tool_calls if tool_calls else [], + id=getattr(choice.message, "id", None), + ) + return {"messages": [ai_msg]} + + return agent_node + + +# --------------------------------------------------------------------------- +# Tool node — executes tools via the existing skill system +# --------------------------------------------------------------------------- + +def _make_tool_node(skill_names: list[str]): + """ + Return a callable that executes tool_calls from the last AI message. + + This replaces LangGraph's built-in ToolNode — we call our own + `execute_tool()` pipeline so that skill-level auth, httpx sessions, + and ToolResult handling are fully preserved. + """ + + async def tool_node(state: AgentState) -> dict[str, list]: + last_msg = state["messages"][-1] + tool_calls = getattr(last_msg, "tool_calls", None) + if not tool_calls: + return {"messages": []} + + results: list[ToolMessage] = [] + for tc in tool_calls: + # Handle both LangChain format (top-level name/args) and + # OpenAI format (nested "function" key). + if isinstance(tc, dict): + if "function" in tc: + # OpenAI format: {"id":..., "function": {"name":..., "arguments":"..."}} + fn = tc["function"] + fn_name = fn.get("name", "") + fn_args_raw = fn.get("arguments", "{}") + else: + # LangChain format: {"name":..., "args":{...}, "id":...} + fn_name = tc.get("name", "") + fn_args_raw = tc.get("args", {}) + tc_id = tc.get("id", "") + else: + fn_name = getattr(tc, "name", "") + fn_args_raw = getattr(tc, "args", {}) + tc_id = getattr(tc, "id", "") + + # Parse args if they arrive as a JSON string + if isinstance(fn_args_raw, str): + fn_args = json.loads(fn_args_raw) + else: + fn_args = fn_args_raw + + tr = await execute_tool(skill_names, fn_name, fn_args) + content = tr.content if tr else f"Tool '{fn_name}' is not available." + results.append(ToolMessage(content=content, tool_call_id=tc_id)) + + return {"messages": results} + + return tool_node + + +# --------------------------------------------------------------------------- +# Router — decides whether to continue tool-calling or stop +# --------------------------------------------------------------------------- + +def _should_continue(state: AgentState) -> Literal["tool_node", END]: + """If the last message contains tool_calls → execute them, else finish.""" + last_msg = state["messages"][-1] + if getattr(last_msg, "tool_calls", None): + return "tool_node" + return END + + +# --------------------------------------------------------------------------- +# Graph factory — the public API +# --------------------------------------------------------------------------- + +def create_agent_graph( + *, + client: OpenAI, + agent_skills: list[str], + system_prompt: str, + model_name: str = "deepseek-chat", +) -> StateGraph: + """ + Build and compile a LangGraph StateGraph for a single agent. + + Parameters + ---------- + client : The OpenAI-compatible client (already authenticated). + agent_skills : Skill names assigned to the agent (e.g. ["seerr", "triage"]). + system_prompt : The fully-built system prompt (base + skill fragments). + model_name : Model identifier sent to the LLM provider. + + Returns + ------- + A compiled LangGraph graph ready for `.ainvoke()` or `.astream()`. + """ + tool_defs = get_all_tools(agent_skills) + + graph = StateGraph(AgentState) + + # Nodes + graph.add_node( + "agent_node", + _make_agent_node(client, system_prompt, tool_defs, model_name), + ) + if tool_defs: + graph.add_node("tool_node", _make_tool_node(agent_skills)) + graph.add_conditional_edges("agent_node", _should_continue, { + "tool_node": "tool_node", + END: END, + }) + graph.add_edge("tool_node", "agent_node") + else: + # No tools — agent responds once and finishes + graph.add_edge("agent_node", END) + + graph.set_entry_point("agent_node") + + return graph.compile() diff --git a/core/state.py b/core/state.py new file mode 100644 index 0000000..3e434b2 --- /dev/null +++ b/core/state.py @@ -0,0 +1,20 @@ +""" +LangGraph agent state — defines the shape of the state object that flows +through every node in the agent graph. +""" + +from typing import Annotated, TypedDict + +from langgraph.graph.message import add_messages + + +class AgentState(TypedDict): + """ + The single source of truth that travels through every node in the graph. + + `messages` uses LangGraph's `add_messages` reducer, which: + - Appends new messages to the list. + - Replaces messages with the same ID (useful for tool-call results). + """ + + messages: Annotated[list, add_messages] diff --git a/core/tools_adapter.py b/core/tools_adapter.py new file mode 100644 index 0000000..e7fccba --- /dev/null +++ b/core/tools_adapter.py @@ -0,0 +1,51 @@ +""" +Tools adapter — bridges the existing skill/tool system with LangGraph's ToolNode. + +LangGraph's ToolNode expects callable tools (typically @tool-decorated functions). +This module wraps our skill-based tool definitions and async executors so +ToolNode can invoke them without any changes to the skills/ layer. +""" + +from __future__ import annotations + +import json +from typing import Any + +from langchain_core.tools import tool + +from skills import get_all_tools, execute_tool + + +def build_langgraph_tools(skill_names: list[str]) -> list: + """ + Convert the registered skill tool definitions into LangChain-compatible + @tool-decorated functions that ToolNode can call. + + Each tool wraps the existing `execute_tool()` pipeline, so the skill + system's ToolResult + httpx session handling is fully preserved. + """ + tool_defs = get_all_tools(skill_names) + wrapped: list = [] + + for td in tool_defs: + fn_def = td.get("function", {}) + fn_name = fn_def.get("name", "") + fn_desc = fn_def.get("description", "") + + # Create a unique factory so each closure captures the right fn_name + def _make_tool(name: str, desc: str, skills: list[str]): + @tool(name, description=desc) + async def _wrapped(**kwargs: Any) -> str: + """Execute the tool via the skill system and return its content.""" + result = await execute_tool(skills, name, kwargs) + if result is None: + return f"Tool '{name}' is not available." + return result.content + + # Stash the original OpenAI schema so LangGraph can use it + _wrapped.metadata = fn_def + return _wrapped + + wrapped.append(_make_tool(fn_name, fn_desc, skill_names)) + + return wrapped diff --git a/main.py b/main.py index 943bc5e..1fedf96 100644 --- a/main.py +++ b/main.py @@ -41,6 +41,9 @@ app.add_middleware( # --------------------------------------------------------------------------- app.state.llm_client = create_client(DEEPSEEK_API_KEY) +# Lazy-compiled LangGraph graphs — populated on first use per agent +app.state.agent_graphs: dict = {} + # --------------------------------------------------------------------------- # Routers # --------------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index db69327..5e700b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,6 @@ fastapi openai uvicorn python-dotenv -httpx \ No newline at end of file +httpx +langgraph +langgraph-checkpoint \ No newline at end of file