Implement LangGraph integration: refactor agent-tool interaction, add graph compilation, and enhance state management

2026-05-24 10:18:59 +02:00
parent 1d821d18fe
commit 2f7f94f1ce
8 changed files with 534 additions and 242 deletions
+114 -64
@@ -1,6 +1,7 @@
# API Architecture — Agent + Skill + Tool Pipeline
# API Architecture — Agent + Skill + Graph Pipeline
This document explains how the API routes user messages through the agent/skill/tool pipeline to produce responses.
This document explains how the API routes user messages through the
agent / skill / LangGraph pipeline to produce responses.
---
@@ -17,27 +18,22 @@ This document explains how the API routes user messages through the agent/skill/
│ api/v1/chat.py — chat_completions() │
│ │
│ 1. _resolve_agent(req.model) → Agent │
│ 2. agent.build_system_prompt() → system prompt
│ 3. Build full_messages = [system] + req.messages
│ 4. run_agent_with_tools(client, messages, agent_id) │
│ 2. get_agent_graph(agent_id) → compiled StateGraph
│ 3. graph.ainvoke(state) or _stream_graph(graph, messages)
└──────────────────────────────┬───────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
Tool-Calling Loop (run_agent_with_tools / run_agent_stream)
LangGraph StateGraph (core/graph.py)
│ │
while turns < max_turns:
response = LLM.chat(messages, tools=agent_tools)
if response has tool_calls:
for each tool_call:
result = execute_tool(skills, name, args)
append result to messages
else:
│ return response.text (stream tokens if streaming) │
┌──────────────┐ tool_calls? ┌──────────────┐
│ agent_node │ ───────────────▶ │ tool_node │
│ (LLM call) │ ◀─────────────── │ (skill exec) │
└──────┬───────┘ └──────────────┘
│ no tool_calls
[END]
└──────────────────────────────────────────────────────────────────┘
```
---
## Key Concepts
@@ -61,7 +57,8 @@ Agent(
- `build_system_prompt()` — merges base_prompt + all skill prompt fragments
Agents self-register at import time via `agents/__init__.py`'s `register()`.
`main.py` calls `load_all_agents()` at startup to import all agent/skill modules.
`main.py` calls `load_all_agents()` at startup to import every agent and skill
module.
### 2. Skill
@@ -78,37 +75,47 @@ Skill(
)
```
- `prompt_fragment` — injected into the agent's system prompt. Teaches the LLM what tools are available and when to use them.
- `prompt_fragment` — injected into the agent's system prompt.
- `tools` — list of OpenAI function definitions (name, description, parameters).
- `execute` — async callable that routes tool calls to API handlers.
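As a rough illustration of how a skill's `tools` list and `execute` callable fit together, the sketch below dispatches a tool call to the skill that declares it. It is not the project's implementation: the `Skill` dataclass fields beyond those named above, the `SKILLS` registry, the `_seerr_execute` stub, and the string return type are all assumptions made for the example.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional


@dataclass
class Skill:
    """Hypothetical stand-in for the project's Skill type."""
    name: str
    prompt_fragment: str
    tools: list[dict] = field(default_factory=list)
    execute: Optional[Callable[[str, dict], Awaitable[str]]] = None


async def _seerr_execute(tool_name: str, args: dict) -> str:
    # Stub executor: a real skill would call the Seerr API here.
    return f"{tool_name} -> kind={args.get('kind')}"


# Hypothetical registry keyed by skill name.
SKILLS: dict[str, Skill] = {
    "seerr": Skill(
        name="seerr",
        prompt_fragment="Use seerr_trending to list trending titles.",
        tools=[{
            "type": "function",
            "function": {
                "name": "seerr_trending",
                "description": "Get trending movies and TV shows.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "kind": {"type": "string", "enum": ["movie", "tv", "all"]},
                    },
                    "required": ["kind"],
                },
            },
        }],
        execute=_seerr_execute,
    ),
}


async def execute_tool(skill_names: list[str], tool_name: str, args: dict) -> Optional[str]:
    """Find the skill that declares tool_name and run its executor."""
    for skill_name in skill_names:
        skill = SKILLS.get(skill_name)
        if skill is None or skill.execute is None:
            continue  # unknown skill, or a prompt-only skill without tools
        if any(t["function"]["name"] == tool_name for t in skill.tools):
            return await skill.execute(tool_name, args)
    return None  # no skill owns this tool


result = asyncio.run(
    execute_tool(["media_info", "seerr"], "seerr_trending", {"kind": "movie"})
)
print(result)
```

Skill names that are missing from the registry are skipped rather than raising, so a caller can treat `None` as "tool not available".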
### 3. Tool
### 3. Graph
A **Tool** is a single function the LLM can call. Defined as part of a skill's `tools` list.
Each agent gets a **compiled LangGraph StateGraph** built by
`core/graph.py:create_agent_graph()`. The graph is compiled lazily on the
first request and cached on `app.state.agent_graphs` for the lifetime of the
process.
| Graph node / edge | What it does |
|---|---|
| `agent_node` | Converts state messages to OpenAI dicts, calls the LLM with the agent's system prompt + tool definitions, returns an `AIMessage` |
| `tool_node` | Reads `tool_calls` from the last AI message, calls `execute_tool()` from the skill system, returns `ToolMessage` results |
| `_should_continue` | Conditional edge — returns `"tool_node"` if the AI message has `tool_calls`, else `END` |
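The control flow in this table can be sketched without LangGraph as a plain loop. This is an illustrative stand-in, not the code in `core/graph.py`: `fake_llm`, `fake_tool`, and `run_graph` are hypothetical, and messages are simple dicts rather than LangChain message objects.

```python
END = "__end__"


def should_continue(state: dict) -> str:
    """Conditional edge: go to tool_node if the last message requests tools."""
    last = state["messages"][-1]
    return "tool_node" if last.get("tool_calls") else END


def fake_llm(messages: list[dict]) -> dict:
    """Hypothetical LLM stub: asks for one tool call, then answers in text."""
    if any(m["role"] == "tool" for m in messages):
        return {"role": "assistant", "content": "Here are the trending movies."}
    call = {"id": "call_1", "name": "seerr_trending", "args": {"kind": "movie"}}
    return {"role": "assistant", "content": "", "tool_calls": [call]}


def fake_tool(name: str, args: dict) -> str:
    """Hypothetical tool executor standing in for the skill system."""
    return f"{name} called with {args}"


def agent_node(state: dict) -> dict:
    """Call the (stubbed) LLM and append its reply to the history."""
    return {"messages": state["messages"] + [fake_llm(state["messages"])]}


def tool_node(state: dict) -> dict:
    """Run every tool call in the last assistant message and append the results."""
    results = [
        {
            "role": "tool",
            "tool_call_id": tc["id"],
            "content": fake_tool(tc["name"], tc["args"]),
        }
        for tc in state["messages"][-1]["tool_calls"]
    ]
    return {"messages": state["messages"] + results}


def run_graph(state: dict, max_steps: int = 10) -> dict:
    """Alternate agent_node and tool_node until the edge routes to END."""
    for _ in range(max_steps):
        state = agent_node(state)
        if should_continue(state) == END:
            return state
        state = tool_node(state)
    raise RuntimeError("step limit reached without a final answer")


final = run_graph({"messages": [{"role": "user", "content": "What are trending movies?"}]})
print([m["role"] for m in final["messages"]])
# ['user', 'assistant', 'tool', 'assistant']
```

The `max_steps` guard is an addition for the sketch; it plays the role that `max_turns` played in the old hand-written loop.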
### 4. State
Defined in `core/state.py`:
```python
{
"type": "function",
"function": {
"name": "seerr_trending",
"description": "Get trending movies and TV shows from Seerr...",
"parameters": {
"type": "object",
"properties": {
"kind": {"type": "string", "enum": ["movie", "tv", "all"]},
"language": {"type": "string"},
},
"required": ["kind"],
},
},
}
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
```
When the LLM responds with a tool call, the loop:
1. Extracts `function.name` (e.g. `"seerr_trending"`) and `function.arguments` (e.g. `{"kind": "movie"}`)
2. Calls `execute_tool(agent.skills, name, args)` which finds the owning skill and runs it
3. Appends the result text to the message history
4. Sends back to the LLM for a follow-up response
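LangGraph's `add_messages` reducer appends new messages to the list and
replaces an existing message only when an incoming message carries the same ID.
Tool results arrive as new `ToolMessage` entries, linked to their call by
`tool_call_id`, so they are appended rather than overwriting anything.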
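The append-or-replace behaviour can be modelled in a few lines of plain Python. This is a simplified model of the reducer's semantics, not LangGraph's implementation: `merge_messages` is a hypothetical name, and the real `add_messages` does more (for example, it works on LangChain message objects and assigns IDs to messages that lack one).

```python
def merge_messages(existing: list[dict], incoming: list[dict]) -> list[dict]:
    """Append incoming messages; replace any existing message with the same id."""
    merged = list(existing)
    index_by_id = {
        m["id"]: i for i, m in enumerate(merged) if m.get("id") is not None
    }
    for msg in incoming:
        msg_id = msg.get("id")
        if msg_id is not None and msg_id in index_by_id:
            merged[index_by_id[msg_id]] = msg  # same id: replace in place
        else:
            if msg_id is not None:
                index_by_id[msg_id] = len(merged)
            merged.append(msg)  # new id (or no id): append
    return merged


history = [{"id": "u1", "role": "user", "content": "hi"}]
step1 = merge_messages(history, [{"id": "a1", "role": "assistant", "content": "draft"}])
step2 = merge_messages(step1, [{"id": "a1", "role": "assistant", "content": "final"}])
print(len(step1), len(step2), step2[-1]["content"])
# 2 2 final
```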
### 5. Message Conversion
Because we use the raw `openai` client (not `langchain-openai`), messages must
be converted between LangChain and OpenAI formats at every LLM call:
- **LangChain → OpenAI** (`_lc_role_to_openai`, `_langchain_tc_to_openai`):
Maps `type` → `role` and converts top-level `name`/`args` tool-calls into
the nested `function` sub-object that the OpenAI API expects.
- **OpenAI → LangChain** (inside `agent_node`):
Converts the `ChatCompletionMessage` response into an `AIMessage` with
LangChain-format `tool_calls` (top-level `name`/`args`/`id`).
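A minimal sketch of the two conversions, using plain dicts in place of LangChain message objects. The function names `lc_tool_call_to_openai` and `openai_tool_call_to_lc` and the `ROLE_BY_TYPE` table are illustrative and are not the helpers in `core/graph.py`; the dict shapes follow the OpenAI Chat Completions tool-call format and LangChain's `name`/`args`/`id` tool-call format.

```python
import json

# LangChain message "type" -> OpenAI "role"
ROLE_BY_TYPE = {"human": "user", "ai": "assistant", "system": "system", "tool": "tool"}


def lc_tool_call_to_openai(tc: dict) -> dict:
    """Nest name/args under 'function'; OpenAI expects arguments as a JSON string."""
    return {
        "id": tc["id"],
        "type": "function",
        "function": {"name": tc["name"], "arguments": json.dumps(tc["args"])},
    }


def openai_tool_call_to_lc(tc: dict) -> dict:
    """Flatten the nested 'function' object and parse the JSON arguments."""
    fn = tc["function"]
    return {
        "id": tc["id"],
        "name": fn["name"],
        "args": json.loads(fn["arguments"] or "{}"),
    }


lc_call = {"id": "call_1", "name": "seerr_trending", "args": {"kind": "movie"}}
openai_call = lc_tool_call_to_openai(lc_call)
print(openai_call["function"]["arguments"])
# {"kind": "movie"}
print(openai_tool_call_to_lc(openai_call) == lc_call)
# True
```

The detail that is easiest to miss is that `arguments` is a JSON-encoded string on the OpenAI side but a parsed dict on the LangChain side.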
---
@@ -130,28 +137,36 @@ When the LLM responds with a tool call, the loop:
2. chat_completions():
→ _resolve_agent(model="media-agent")
→ get_agent("media-agent") → Agent(skills=["media_info", "seerr", "triage"])
tools = get_all_tools(["media_info", "seerr", "triage"])
Returns 7 tool definitions from seerr.py
→ system_prompt = agent.build_system_prompt()
→ base_prompt + media_info fragment + seerr fragment + triage fragment
get_agent_graph("media-agent", request)
looks up app.state.agent_graphs["media-agent"]
→ first call → create_agent_graph() compiles the graph with 7 Seerr tools
→ run_agent_with_tools(request, messages, agent_id)
→ _invoke_graph(graph, messages)
3. run_agent_with_tools() — Turn 1:
→ LLM receives: [system prompt with tools] + [user: "What are trending movies?"]
→ LLM responds: tool_calls = [{"function": {"name": "seerr_trending", "arguments": {"kind": "movie"}}}]
3. Graph — Pass 1 (agent_node):
→ LLM receives: [system prompt] + [user: "What are trending movies?"]
→ LLM responds with tool_calls: seerr_trending(kind="movie")
→ agent_node returns AIMessage with tool_calls in LangChain format
4. Execute tool:
execute_tool(["media_info", "seerr", "triage"], "seerr_trending", {"kind": "movie"})
→ Finds seerr skill → calls _execute("seerr_trending", ...) → _trending(args)
→ GET /api/v1/discover/trending?mediaType=movie
→ Returns formatted list with [tmdb:IDs]
4. Graph — _should_continue:
AIMessage has tool_calls → route to "tool_node"
5. run_agent_with_tools() — Turn 2:
LLM receives: previous messages + [tool: "Found 20 trending movies..."]
LLM responds: text = "Here are the top trending movies! 🎬 ..."
finish_reason="stop" → return the text
5. Graph — tool_node:
Reads tool_call: name="seerr_trending", args={"kind": "movie"}
execute_tool(["media_info", "seerr", "triage"], "seerr_trending", ...)
Seerr API → GET /api/v1/discover/trending?mediaType=movie
→ Returns ToolMessage with formatted results including [tmdb:IDs]
6. chat_completions() returns:
{ "choices": [{"message": {"content": "Here are the top trending movies!..."}}] }
6. Graph — Pass 2 (agent_node):
→ LLM receives previous exchange + tool result
→ LLM responds with text only (no tool_calls)
→ agent_node returns AIMessage(content="Here are the top trending movies!...")
7. Graph — _should_continue:
→ No tool_calls → route to END
8. chat_completions() returns:
{ "choices": [{"message": {"role": "assistant", "content": "Here are the top..."}}] }
```
### Step-by-step: "Request the 2026 one" (multi-turn context)
@@ -172,14 +187,49 @@ When the LLM responds with a tool call, the loop:
2. chat_completions():
→ req.messages contains the ENTIRE conversation history
System prompt prepended → full_messages = [system] + 5 history messages
LLM sees everything: the trending list with [tmdb:931285], the disambiguation, "the 2026 one"
graph.ainvoke({"messages": all_messages})
agent_node prepends system prompt and sends everything to the LLM
3. LLM reasons:
- I previously listed Mortal Kombat II (2026) with [tmdb:931285]
3. LLM reasons from full context:
- Previously listed Mortal Kombat II (2026) with [tmdb:931285]
- The user said "request the mortal kombat one" → I searched and showed 4 options
- Now they say "the 2026 one" → that matches Mortal Kombat II (2026) [tmdb:931285]
- I should call seerr_request_media(kind="movie", title="Mortal Kombat II", tmdb_id=931285)
4. Tool executes the request → ✅ Success
4. tool_node executes the request → ✅ Success
```
---
## Streaming
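Unlike the non-streaming path, which returns the final text in one response,
the streaming path runs the graph to completion and then emits that text one
character at a time: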
```
chat_completions(stream=True)
→ _stream_graph(graph, messages)
→ graph.ainvoke(state) # runs graph to completion (tools execute silently)
→ yields content character-by-character via SSE
```
For true token-level streaming (tokens appear as the LLM generates them),
the agent_node would need to use `langchain-openai`'s `ChatOpenAI` instead of
the raw `openai` client. The current approach is a pragmatic middle ground
that avoids adding another dependency while still giving the SSE client
incremental output.
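The "run to completion, then stream" approach can be sketched as two small async generators. The SSE payload shape mirrors the `/chat` endpoint's `token` / `done` events; `stream_text`, `sse_events`, and `collect` are hypothetical names used only for this example, and the final text is passed in directly instead of coming from a graph run.

```python
import asyncio
import json
from typing import AsyncIterator


async def stream_text(content: str) -> AsyncIterator[str]:
    """Yield an already-complete response one character at a time."""
    for ch in content:
        yield ch
        await asyncio.sleep(0)  # let other tasks run between characters


async def sse_events(content: str, session_id: str) -> AsyncIterator[str]:
    """Wrap each character, then a final 'done' marker, as an SSE data frame."""
    async for token in stream_text(content):
        payload = json.dumps({"token": token, "session_id": session_id})
        yield f"data: {payload}\n\n"
    done = json.dumps({"done": True, "session_id": session_id})
    yield f"data: {done}\n\n"


async def collect(content: str, session_id: str) -> list[str]:
    return [event async for event in sse_events(content, session_id)]


events = asyncio.run(collect("Hi!", "s1"))
print(len(events))
# 4
```

Because the full text exists before the first frame is sent, time-to-first-token equals the total graph run time; the client sees incremental output but no latency benefit.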
---
## File Map
| File | Responsibility |
|---|---|
| `main.py` | FastAPI app, singleton creation, router mounting |
| `api/v1/chat.py` | Endpoints — resolves agent, invokes graph, formats responses |
| `api/dependencies.py` | `get_llm_client()` (FastAPI `Depends`) and `get_agent_graph()` (called directly with the `Request`) |
| `core/graph.py` | `create_agent_graph()` — builds the StateGraph |
| `core/state.py` | `AgentState` TypedDict |
| `core/llm.py` | `create_client()` — OpenAI client factory |
| `core/config.py` | Environment variable loader |
| `agents/` | Agent definitions (dataclass + self-registration) |
| `skills/` | Skill definitions (prompt fragments + tools + executors) |
+29
@@ -1,7 +1,36 @@
from fastapi import Request
from openai import OpenAI

from core.graph import create_agent_graph


def get_llm_client(request: Request) -> OpenAI:
    """FastAPI dependency — returns the singleton OpenAI client from app.state."""
    return request.app.state.llm_client


def get_agent_graph(agent_id: str, request: Request):
    """
    FastAPI dependency — returns the compiled LangGraph graph for *agent_id*.

    Graphs are lazily compiled on first use and cached on app.state so each
    agent's graph is only built once per process lifetime.
    """
    cache: dict = request.app.state.agent_graphs
    if agent_id not in cache:
        from agents import get as get_agent

        agent = get_agent(agent_id)
        if agent is None:
            # Fall back to the naked agent if the requested one doesn't exist
            agent_id = "naked"
            agent = get_agent(agent_id)
        cache[agent_id] = create_agent_graph(
            client=request.app.state.llm_client,
            agent_skills=agent.skills,
            system_prompt=agent.build_system_prompt(),
        )
    return cache[agent_id]
+53 -177
@@ -1,13 +1,12 @@
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel
import json
import asyncio
from api.dependencies import get_llm_client
from api.dependencies import get_llm_client, get_agent_graph
from agents import get as get_agent, list_all as list_all_agents
from skills import get_all_tools, execute_tool
from core.state import AgentState
router = APIRouter()
@@ -42,166 +41,65 @@ def _resolve_agent(agent_id: str | None = None, model: str | None = None):
# ---------------------------------------------------------------------------
# Tool-calling loop (non-streaming)
# LangGraph helpers
# ---------------------------------------------------------------------------
async def _invoke_graph(graph, messages: list[dict]) -> str:
    """Run the graph to completion (non-streaming) and return the final text."""
    state: AgentState = {"messages": messages}
    result = await graph.ainvoke(state)
    last_msg = result["messages"][-1]
    return last_msg.content or ""


async def _stream_graph(graph, messages: list[dict]):
    """
    Run the graph, then stream the final response one character at a time.

    LangGraph's astream_events would require langchain-openai's ChatOpenAI
    to intercept LLM chunks. Instead we run the graph to completion (tools
    execute silently) and then stream the final text content character by
    character — this gives the client a real SSE stream without adding new
    dependencies.
    """
    state: AgentState = {"messages": messages}
    result = await graph.ainvoke(state)
    content = result["messages"][-1].content or ""
    # Iterating a str yields single characters, not LLM tokens
    for token in content:
        yield token
# ---------------------------------------------------------------------------
# Non-streaming run (kept for /chat/sync and sync completions)
# ---------------------------------------------------------------------------
async def run_agent_with_tools(
client: OpenAI,
request: Request,
messages: list[dict],
agent_id: str | None = None,
model: str | None = None,
max_turns: int = 5,
) -> str:
"""Send messages to the LLM with tool definitions. Tool-calling loop."""
"""Send messages through the agent's LangGraph. Non-streaming."""
agent = _resolve_agent(agent_id, model)
tools = get_all_tools(agent.skills)
system_prompt = agent.build_system_prompt()
full_messages: list[dict] = [{"role": "system", "content": system_prompt}]
full_messages.extend(messages)
loop = asyncio.get_running_loop()
for _ in range(max_turns):
resp = await loop.run_in_executor(
None,
lambda: client.chat.completions.create(
model="deepseek-chat",
messages=full_messages,
tools=tools if tools else None,
tool_choice="auto" if tools else None,
),
)
choice = resp.choices[0]
if choice.finish_reason == "stop" and choice.message.content:
return choice.message.content
if choice.message.tool_calls:
full_messages.append(choice.message.model_dump(exclude_none=True))
for tc in choice.message.tool_calls:
fn_name = tc.function.name
fn_args = json.loads(tc.function.arguments)
tr = await execute_tool(agent.skills, fn_name, fn_args)
result = tr.content if tr else f"Tool '{fn_name}' is not available."
full_messages.append({
"role": "tool", "tool_call_id": tc.id, "content": result,
})
continue
return choice.message.content or "I'm not sure how to help with that."
return "I've taken several actions but still need more information. Could you clarify?"
graph = get_agent_graph(agent.agent_id, request)
return await _invoke_graph(graph, messages)
# ---------------------------------------------------------------------------
# Streaming generators
# Streaming generator (kept for /chat and stream completions)
# ---------------------------------------------------------------------------
async def _stream_with_tools(
client: OpenAI,
messages: list[dict],
agent_id: str | None = None,
model: str | None = None,
max_turns: int = 5,
):
"""Streaming tool-calling loop. Tools run silently, final text is streamed."""
agent = _resolve_agent(agent_id, model)
tools = get_all_tools(agent.skills)
system_prompt = agent.build_system_prompt()
full_messages: list[dict] = [{"role": "system", "content": system_prompt}]
full_messages.extend(messages)
loop = asyncio.get_running_loop()
for turn in range(max_turns):
resp = await loop.run_in_executor(
None,
lambda: client.chat.completions.create(
model="deepseek-chat",
messages=full_messages,
tools=tools if tools else None,
tool_choice="auto" if tools else None,
),
)
choice = resp.choices[0]
if choice.message.tool_calls:
full_messages.append(choice.message.model_dump(exclude_none=True))
for tc in choice.message.tool_calls:
fn_name = tc.function.name
fn_args = json.loads(tc.function.arguments)
tr = await execute_tool(agent.skills, fn_name, fn_args)
result = tr.content if tr else f"Tool '{fn_name}' is not available."
full_messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
continue
if choice.finish_reason == "stop" and choice.message.content:
for token in choice.message.content:
yield token
await asyncio.sleep(0)
return
def _sync_stream():
stream = client.chat.completions.create(
model="deepseek-chat", messages=full_messages, stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta and delta.content:
yield delta.content
gen = _sync_stream()
while True:
token = await loop.run_in_executor(None, next, gen, None)
if token is None:
return
yield token
yield "\u2026"
async def run_agent_stream(
client: OpenAI,
request: Request,
messages: list[dict],
agent_id: str | None = None,
model: str | None = None,
):
"""Async generator — yields tokens. Uses tool-loop when skills have tools."""
"""Async generator — yields tokens via the agent's LangGraph."""
agent = _resolve_agent(agent_id, model)
tools = get_all_tools(agent.skills)
if tools:
async for token in _stream_with_tools(client, messages, agent_id, model):
yield token
return
# No tools — simple streaming
system_prompt = agent.build_system_prompt()
full_messages: list[dict] = [{"role": "system", "content": system_prompt}]
full_messages.extend(messages)
loop = asyncio.get_running_loop()
def _sync_stream():
stream = client.chat.completions.create(
model="deepseek-chat", messages=full_messages, stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta and delta.content:
yield delta.content
gen = _sync_stream()
while True:
token = await loop.run_in_executor(None, next, gen, None)
if token is None:
break
graph = get_agent_graph(agent.agent_id, request)
async for token in _stream_graph(graph, messages):
yield token
@@ -217,13 +115,14 @@ def root():
@router.post("/chat")
async def chat(
req: ChatRequest,
request: Request,
client: OpenAI = Depends(get_llm_client),
):
"""Streaming chat — single message, no history."""
messages = [{"role": "user", "content": req.message}]
async def event_stream():
async for token in run_agent_stream(client, messages, req.agent_id):
async for token in run_agent_stream(request, messages, req.agent_id):
payload = json.dumps({"token": token, "session_id": req.session_id})
yield f"data: {payload}\n\n"
yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n"
@@ -242,26 +141,12 @@ async def chat(
@router.post("/chat/sync")
async def chat_sync(
req: ChatRequest,
request: Request,
client: OpenAI = Depends(get_llm_client),
):
"""Non-streaming chat — single message."""
agent = _resolve_agent(req.agent_id)
tools = get_all_tools(agent.skills)
messages = [{"role": "user", "content": req.message}]
if tools:
response = await run_agent_with_tools(client, messages, req.agent_id)
else:
agent_obj = _resolve_agent(req.agent_id)
resp = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": agent_obj.build_system_prompt()},
{"role": "user", "content": req.message},
],
)
response = resp.choices[0].message.content
response = await run_agent_with_tools(request, messages, req.agent_id)
return {"response": response, "session_id": req.session_id}
@@ -300,6 +185,7 @@ def list_models():
@router.post("/chat/completions")
async def chat_completions(
req: ChatCompletionRequest,
request: Request,
client: OpenAI = Depends(get_llm_client),
):
"""OpenAI-compatible /chat/completions — supports stream=True.
@@ -311,7 +197,7 @@ async def chat_completions(
if req.stream:
async def sse_stream():
async for token in run_agent_stream(
client, req.messages, agent_id=agent.agent_id,
request, req.messages, agent_id=agent.agent_id,
):
chunk = {
"id": "chatcmpl-local",
@@ -335,20 +221,10 @@ async def chat_completions(
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
# Non-streaming — full history, tool-calling
tools = get_all_tools(agent.skills)
if tools:
response = await run_agent_with_tools(
client, req.messages, agent_id=agent.agent_id,
)
else:
system_prompt = agent.build_system_prompt()
full_msgs: list[dict] = [{"role": "system", "content": system_prompt}]
full_msgs.extend(req.messages)
resp = client.chat.completions.create(
model="deepseek-chat", messages=full_msgs,
)
response = resp.choices[0].message.content
# Non-streaming — full history, LangGraph agent
response = await run_agent_with_tools(
request, req.messages, agent_id=agent.agent_id,
)
return {
"id": "chatcmpl-local",