import json
import logging
import re

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

from api.dependencies import get_llm_client, get_agent_graph
from agents import get as get_agent, list_all as list_all_agents
from core.state import AgentState

logger = logging.getLogger(__name__)

router = APIRouter()

# Headers shared by every SSE endpoint. "X-Accel-Buffering: no" asks an nginx
# reverse proxy not to buffer the response, which would defeat streaming.
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}

# Fixed id used for every OpenAI-compatible completion payload.
_COMPLETION_ID = "chatcmpl-local"

# Splits text into word-sized pieces: a run of non-whitespace plus its trailing
# whitespace, or (only at the very start) a run of leading whitespace. Every
# character is matched exactly once, so "".join(pieces) == text.
_STREAM_PIECE_RE = re.compile(r"\S+\s*|\s+")

# Sent to clients when the agent fails mid-stream. Exception details are logged
# server-side instead of being exposed over the wire.
_STREAM_ERROR_MESSAGE = "Agent execution failed."


class ChatRequest(BaseModel):
    """Body for the single-message ``/chat`` and ``/chat/sync`` endpoints."""

    message: str
    session_id: str | None = None  # echoed back to the client unchanged
    agent_id: str | None = None  # registered agent id; unknown/None -> "naked"


class ChatCompletionRequest(BaseModel):
    """Subset of the OpenAI ``/chat/completions`` request body that is used here."""

    messages: list[dict]  # full conversation history, passed to the graph as-is
    stream: bool = False
    model: str = "deepseek-chat"  # also used as an agent_id when one is registered


# ---------------------------------------------------------------------------
# Agent resolution
# ---------------------------------------------------------------------------

def _resolve_agent(agent_id: str | None = None, model: str | None = None):
    """Return the registered agent that should handle a request.

    Lookup order:
      1. explicit ``agent_id``
      2. ``model`` field (OpenWebUI sends this — maps to an agent_id if registered)
      3. fallback to the "naked" agent

    Empty or unregistered candidates are skipped, so an unknown ``agent_id``
    still falls through to ``model`` before the final fallback.
    """
    for candidate in (agent_id, model):
        if candidate:
            agent = get_agent(candidate)
            if agent:
                return agent
    return get_agent("naked")


# ---------------------------------------------------------------------------
# LangGraph helpers
# ---------------------------------------------------------------------------

async def _invoke_graph(graph, messages: list[dict]) -> str:
    """Run the graph to completion (non-streaming) and return the final text.

    Returns the ``content`` of the last message in the graph's result state,
    or ``""`` when that content is empty/None.
    """
    state: AgentState = {"messages": messages}
    result = await graph.ainvoke(state)
    last_msg = result["messages"][-1]
    return last_msg.content or ""


async def _stream_graph(graph, messages: list[dict]):
    """Run the graph, then stream its final response in word-sized pieces.

    LangGraph's astream_events would require langchain-openai's ChatOpenAI to
    intercept LLM chunks. Instead the graph runs to completion (tools execute
    silently) and the final text is re-emitted incrementally, which gives the
    client a real SSE stream without new dependencies.

    Pieces are words with their trailing whitespace rather than single
    characters. This keeps per-frame overhead reasonable, and the pieces still
    concatenate back to exactly the original text.
    """
    content = await _invoke_graph(graph, messages)
    for match in _STREAM_PIECE_RE.finditer(content):
        yield match.group()


def _sse(payload) -> str:
    """Frame a JSON-serialisable payload as a single SSE ``data:`` event."""
    return f"data: {json.dumps(payload)}\n\n"


def _completion_chunk(model: str, delta: dict, finish_reason: str | None) -> dict:
    """Build an OpenAI-style ``chat.completion.chunk`` object.

    Carries the same ``id``/``created``/``model`` values as the non-streaming
    response of ``/chat/completions`` so both modes look alike to clients.
    """
    return {
        "id": _COMPLETION_ID,
        "object": "chat.completion.chunk",
        "created": 0,
        "model": model,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }


# ---------------------------------------------------------------------------
# Non-streaming run (kept for /chat/sync and sync completions)
# ---------------------------------------------------------------------------

async def run_agent_with_tools(
    request: Request,
    messages: list[dict],
    agent_id: str | None = None,
    model: str | None = None,
) -> str:
    """Send messages through the resolved agent's LangGraph. Non-streaming."""
    agent = _resolve_agent(agent_id, model)
    graph = get_agent_graph(agent.agent_id, request)
    return await _invoke_graph(graph, messages)


# ---------------------------------------------------------------------------
# Streaming generator (kept for /chat and stream completions)
# ---------------------------------------------------------------------------

async def run_agent_stream(
    request: Request,
    messages: list[dict],
    agent_id: str | None = None,
    model: str | None = None,
):
    """Async generator yielding text pieces from the resolved agent's LangGraph."""
    agent = _resolve_agent(agent_id, model)
    graph = get_agent_graph(agent.agent_id, request)
    async for token in _stream_graph(graph, messages):
        yield token


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@router.get("/")
def root():
    """Liveness probe."""
    return {"status": "ok"}


@router.post("/chat")
async def chat(
    req: ChatRequest,
    request: Request,
    client: OpenAI = Depends(get_llm_client),  # not used here; kept for the existing dependency contract
):
    """Streaming chat — single message, no history.

    Emits ``{"token", "session_id"}`` events, then a final
    ``{"done": true, "session_id"}`` event. If the agent fails, an
    ``{"error", "session_id"}`` event is sent before the final done event.
    """
    messages = [{"role": "user", "content": req.message}]

    async def event_stream():
        try:
            async for token in run_agent_stream(request, messages, req.agent_id):
                yield _sse({"token": token, "session_id": req.session_id})
        except Exception:
            # Response headers are already sent, so the failure cannot become
            # an HTTP error. Report it in-band and still terminate the stream.
            logger.exception("Agent stream failed on /chat")
            yield _sse({"error": _STREAM_ERROR_MESSAGE, "session_id": req.session_id})
        yield _sse({"done": True, "session_id": req.session_id})

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS),
    )


@router.post("/chat/sync")
async def chat_sync(
    req: ChatRequest,
    request: Request,
    client: OpenAI = Depends(get_llm_client),  # not used here; kept for the existing dependency contract
):
    """Non-streaming chat — single message."""
    messages = [{"role": "user", "content": req.message}]
    response = await run_agent_with_tools(request, messages, req.agent_id)
    return {"response": response, "session_id": req.session_id}


@router.get("/agents")
def list_agents():
    """Return all registered agents."""
    return {
        "agents": [
            {
                "agent_id": a.agent_id,
                "description": a.description,
                "skills": a.skills,
            }
            for a in list_all_agents().values()
        ]
    }


@router.get("/models")
def list_models():
    """Return agents as selectable models for OpenWebUI."""
    return {
        "object": "list",
        "data": [
            {
                "id": a.agent_id,
                "object": "model",
                "created": 0,
                "owned_by": "local-agent",
            }
            for a in list_all_agents().values()
        ],
    }


@router.post("/chat/completions")
async def chat_completions(
    req: ChatCompletionRequest,
    request: Request,
    client: OpenAI = Depends(get_llm_client),  # not used here; kept for the existing dependency contract
):
    """OpenAI-compatible /chat/completions — supports stream=True.

    Multi-turn: req.messages contains the FULL conversation history.
    Agent resolved from the model field (OpenWebUI sends this).
    """
    agent = _resolve_agent(model=req.model)

    if req.stream:
        async def sse_stream():
            try:
                async for token in run_agent_stream(
                    request, req.messages, agent_id=agent.agent_id,
                ):
                    yield _sse(_completion_chunk(req.model, {"content": token}, None))
            except Exception:
                # Headers are already sent; report the failure in-band in the
                # OpenAI error shape and skip the "stop" chunk.
                logger.exception("Agent stream failed on /chat/completions")
                yield _sse(
                    {"error": {"message": _STREAM_ERROR_MESSAGE, "type": "server_error"}}
                )
            else:
                yield _sse(_completion_chunk(req.model, {}, "stop"))
            # Always terminate the stream so clients stop waiting.
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            sse_stream(),
            media_type="text/event-stream",
            headers=dict(_SSE_HEADERS),
        )

    # Non-streaming — full history, LangGraph agent
    response = await run_agent_with_tools(
        request, req.messages, agent_id=agent.agent_id,
    )
    return {
        "id": _COMPLETION_ID,
        "object": "chat.completion",
        "created": 0,
        "model": req.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": response},
                "finish_reason": "stop",
            }
        ],
    }