import asyncio
import json
from collections.abc import AsyncIterator, Iterator

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

from api.dependencies import get_llm_client
from agents import get as get_agent, list_all as list_all_agents
from skills import get_all_tools, execute_tool, ToolResult

router = APIRouter()

# Upstream LLM model used for every completion request. The request-level
# ``model`` field is NOT forwarded here; it is only used to pick an agent.
LLM_MODEL = "deepseek-chat"

# Headers shared by every Server-Sent-Events response. X-Accel-Buffering
# asks an nginx reverse proxy not to buffer the stream.
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}


class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None
    agent_id: str | None = None  # which agent to use ("naked", "media-agent", …)


class ChatCompletionRequest(BaseModel):
    messages: list[dict]
    stream: bool = False
    model: str = "deepseek-chat"


# ---------------------------------------------------------------------------
# Agent resolution
# ---------------------------------------------------------------------------
def _resolve_agent(agent_id: str | None = None, model: str | None = None):
    """
    Resolution order:
    1. explicit agent_id
    2. model field (OpenWebUI sends this — maps to agent_id if registered)
    3. fallback to "naked"
    """
    lookup = agent_id or model
    if lookup is None:
        return get_agent("naked")
    agent = get_agent(lookup)
    return agent if agent else get_agent("naked")


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _initial_messages(agent, message: str) -> list[dict]:
    """Build the starting conversation: the agent's system prompt plus one user turn."""
    return [
        {"role": "system", "content": agent.build_system_prompt()},
        {"role": "user", "content": message},
    ]


async def _create_completion(client: OpenAI, messages: list[dict], tools=None):
    """Run one non-streaming chat completion in a worker thread.

    The OpenAI client used here is synchronous, so the call is offloaded with
    ``asyncio.to_thread`` to keep the event loop free. ``tools`` and
    ``tool_choice`` are only included when *tools* is non-empty, rather than
    being sent as explicit nulls.

    Returns the raw completion response object from the client.
    """
    kwargs: dict = {"model": LLM_MODEL, "messages": messages}
    if tools:
        kwargs["tools"] = tools
        kwargs["tool_choice"] = "auto"
    return await asyncio.to_thread(client.chat.completions.create, **kwargs)


def _parse_tool_arguments(raw: str | None) -> dict | None:
    """Decode a tool call's JSON ``arguments`` string.

    Returns ``{}`` for a missing/blank string, the decoded dict on success, and
    ``None`` when the string is not valid JSON or does not decode to an object
    (LLMs occasionally emit malformed arguments; that must not crash the request).
    """
    if not raw or not raw.strip():
        return {}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


async def _run_tool_calls(agent, assistant_message, messages: list[dict]) -> None:
    """Execute every tool call in *assistant_message*, appending results to *messages*.

    Appends the assistant turn itself first, then one ``role: "tool"`` message
    per call, so the next completion sees the full exchange. Problems (bad
    arguments, unavailable tool) are reported back to the model as tool
    output instead of raising.
    """
    messages.append(assistant_message.model_dump(exclude_none=True))
    for tc in assistant_message.tool_calls:
        fn_name = tc.function.name
        fn_args = _parse_tool_arguments(tc.function.arguments)
        if fn_args is None:
            result = f"Tool '{fn_name}' was called with invalid JSON arguments."
        else:
            tr = await execute_tool(agent.skills, fn_name, fn_args)
            result = tr.content if tr else f"Tool '{fn_name}' is not available right now."
        messages.append({
            "role": "tool",
            "tool_call_id": tc.id,
            "content": result,
        })


def _stream_text_deltas(client: OpenAI, messages: list[dict]) -> Iterator[str]:
    """Blocking generator yielding the non-empty content deltas of a streamed completion."""
    stream = client.chat.completions.create(
        model=LLM_MODEL,
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        # Some providers send chunks with an empty ``choices`` list (e.g. usage-only).
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta and delta.content:
            yield delta.content


async def _aiter_in_thread(sync_gen: Iterator[str]) -> AsyncIterator[str]:
    """Drive a blocking generator from worker threads, yielding its items asynchronously."""
    exhausted = object()  # private sentinel; cannot collide with any yielded value
    while True:
        item = await asyncio.to_thread(next, sync_gen, exhausted)
        if item is exhausted:
            return
        yield item


# ---------------------------------------------------------------------------
# Tool-calling loop (non-streaming)
# ---------------------------------------------------------------------------
async def run_agent_with_tools(
    client: OpenAI,
    message: str,
    agent_id: str | None = None,
    model: str | None = None,
    max_turns: int = 5,
) -> str:
    """Answer *message* with the resolved agent, letting the LLM call tools.

    Each turn sends the conversation with tool definitions. If the LLM
    responds with tool_calls they are executed and their results fed back;
    otherwise the LLM's text is returned. After *max_turns* rounds without a
    final answer, a clarification request is returned instead.
    """
    agent = _resolve_agent(agent_id, model)
    tools = get_all_tools(agent.skills)
    messages = _initial_messages(agent, message)

    for _ in range(max_turns):
        resp = await _create_completion(client, messages, tools)
        choice = resp.choices[0]

        # Tool calls take precedence (same order as the streaming loop).
        if choice.message.tool_calls:
            await _run_tool_calls(agent, choice.message, messages)
            continue

        # No tool calls: this is the final answer.
        return choice.message.content or "I'm not sure how to help with that."

    return "I've taken several actions but still need more information. Could you clarify?"


# ---------------------------------------------------------------------------
# Non-streaming helper (no tools — used by sync endpoint if tools are absent)
# ---------------------------------------------------------------------------
def run_agent_simple(
    client: OpenAI,
    message: str,
    agent_id: str | None = None,
    model: str | None = None,
) -> str:
    """Plain LLM call — no tools. Used when the agent has no tool-enabled skills.

    This function is synchronous and blocks on network I/O; async callers
    should run it in a worker thread (e.g. ``asyncio.to_thread``).
    Returns the content of the first completion choice.
    """
    agent = _resolve_agent(agent_id, model)
    response = client.chat.completions.create(
        model=LLM_MODEL,
        messages=_initial_messages(agent, message),
    )
    return response.choices[0].message.content


# ---------------------------------------------------------------------------
# Streaming generators
# ---------------------------------------------------------------------------
async def _stream_with_tools(
    client: OpenAI,
    message: str,
    agent_id: str | None = None,
    model: str | None = None,
    max_turns: int = 5,
):
    """Streaming version of the tool-calling loop.

    Tool rounds run silently via non-streaming calls. A complete final answer
    is re-emitted one character at a time; if a turn ends without tool calls
    and without a clean "stop" answer, the conversation is re-requested in
    streaming mode and its deltas are forwarded. Yields "…" if *max_turns*
    rounds pass without an answer.
    """
    agent = _resolve_agent(agent_id, model)
    tools = get_all_tools(agent.skills)
    messages = _initial_messages(agent, message)

    for _ in range(max_turns):
        resp = await _create_completion(client, messages, tools)
        choice = resp.choices[0]

        # Tool calls? Execute them and loop
        if choice.message.tool_calls:
            await _run_tool_calls(agent, choice.message, messages)
            continue

        # Final text answer already in hand — emit it character by character,
        # yielding control to the event loop between characters.
        if choice.finish_reason == "stop" and choice.message.content:
            for token in choice.message.content:
                yield token
                await asyncio.sleep(0)
            return

        # Last resort: stream a fresh response for the same conversation.
        async for token in _aiter_in_thread(_stream_text_deltas(client, messages)):
            yield token
        return

    yield "…"


async def run_agent_stream(
    client: OpenAI,
    message: str,
    agent_id: str | None = None,
    model: str | None = None,
):
    """Async generator — yields tokens. Uses the tool loop when the agent's skills have tools."""
    agent = _resolve_agent(agent_id, model)
    if get_all_tools(agent.skills):
        async for token in _stream_with_tools(client, message, agent_id, model):
            yield token
        return

    # No tools — stream the completion directly.
    messages = _initial_messages(agent, message)
    async for token in _aiter_in_thread(_stream_text_deltas(client, messages)):
        yield token


# ---------------------------------------------------------------------------
# Request parsing helpers
# ---------------------------------------------------------------------------
def _content_to_text(content) -> str:
    """Flatten an OpenAI-style message ``content`` into plain text.

    Accepts a string, a list of content parts (only ``type == "text"`` parts
    are kept, joined by newlines), or ``None`` (→ ``""``).
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "\n".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
    return str(content)


def _last_user_message(messages: list[dict]) -> str:
    """Return the text of the most recent user turn in an OpenAI-style history.

    Falls back to the last message when no message has ``role == "user"``.

    Raises:
        HTTPException: 400 when *messages* is empty.
    """
    if not messages:
        raise HTTPException(status_code=400, detail="'messages' must contain at least one message.")
    for msg in reversed(messages):
        if msg.get("role") == "user":
            return _content_to_text(msg.get("content"))
    return _content_to_text(messages[-1].get("content"))


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/")
def root():
    return {"status": "ok"}


@router.post("/chat")
async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
    """Streaming chat endpoint — returns Server-Sent Events.

    Each event carries ``{"token", "session_id"}``; a final
    ``{"done": true, "session_id"}`` event closes the stream.
    """

    async def event_stream():
        async for token in run_agent_stream(client, req.message, req.agent_id):
            payload = json.dumps({"token": token, "session_id": req.session_id})
            yield f"data: {payload}\n\n"
        yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=_SSE_HEADERS,
    )


@router.post("/chat/sync")
async def chat_sync(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
    """Non-streaming endpoint — uses tool-calling when the agent has tools."""
    agent = _resolve_agent(req.agent_id)
    if get_all_tools(agent.skills):
        response = await run_agent_with_tools(client, req.message, req.agent_id)
    else:
        # run_agent_simple blocks on network I/O; keep it off the event loop.
        response = await asyncio.to_thread(run_agent_simple, client, req.message, req.agent_id)
    return {"response": response, "session_id": req.session_id}


@router.get("/agents")
def list_agents():
    """Return all registered agents with their ids, descriptions, and skills."""
    return {
        "agents": [
            {
                "agent_id": a.agent_id,
                "description": a.description,
                "skills": a.skills,
            }
            for a in list_all_agents().values()
        ]
    }


@router.get("/models")
def list_models():
    """Return all registered agents as selectable models for OpenWebUI."""
    return {
        "object": "list",
        "data": [
            {
                "id": a.agent_id,
                "object": "model",
                "created": 0,
                "owned_by": "local-agent",
            }
            for a in list_all_agents().values()
        ],
    }


@router.post("/chat/completions")
async def chat_completions(
    req: ChatCompletionRequest,
    client: OpenAI = Depends(get_llm_client),
):
    """OpenAI-compatible /chat/completions — supports stream=True.

    Resolves the agent from the model field (OpenWebUI sends this) and answers
    the most recent user message. Responds 400 when ``messages`` is empty.
    """
    # Validate before any streaming starts so errors surface as a proper HTTP status.
    user_message = _last_user_message(req.messages)
    agent = _resolve_agent(model=req.model)

    if req.stream:

        def _chunk(delta: dict, finish_reason: str | None) -> str:
            """Serialize one ``chat.completion.chunk`` as an SSE data line."""
            chunk = {
                "id": "chatcmpl-local",
                "object": "chat.completion.chunk",
                "choices": [
                    {
                        "index": 0,
                        "delta": delta,
                        "finish_reason": finish_reason,
                    }
                ],
            }
            return f"data: {json.dumps(chunk)}\n\n"

        async def sse_stream():
            async for token in run_agent_stream(client, user_message, agent_id=agent.agent_id):
                yield _chunk({"content": token}, None)
            yield _chunk({}, "stop")
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            sse_stream(),
            media_type="text/event-stream",
            headers=_SSE_HEADERS,
        )

    # Non-streaming path
    if get_all_tools(agent.skills):
        response = await run_agent_with_tools(client, user_message, agent_id=agent.agent_id)
    else:
        # run_agent_simple blocks on network I/O; keep it off the event loop.
        response = await asyncio.to_thread(
            run_agent_simple, client, user_message, agent_id=agent.agent_id
        )

    return {
        "id": "chatcmpl-local",
        "object": "chat.completion",
        "created": 0,
        "model": req.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": response},
                "finish_reason": "stop",
            }
        ],
    }