366 lines
11 KiB
Python
366 lines
11 KiB
Python
from fastapi import APIRouter, Depends
|
|
from fastapi.responses import StreamingResponse
|
|
from openai import OpenAI
|
|
from pydantic import BaseModel
|
|
import json
|
|
import asyncio
|
|
|
|
from api.dependencies import get_llm_client
|
|
from agents import get as get_agent, list_all as list_all_agents
|
|
from skills import get_all_tools, execute_tool
|
|
|
|
# Router holding every endpoint defined in this module; presumably included
# into the FastAPI app elsewhere — TODO confirm mount prefix.
router = APIRouter()
|
|
|
|
|
|
class ChatRequest(BaseModel):
    """Request body for the single-message ``/chat`` and ``/chat/sync`` endpoints."""

    # The user's message text; sent to the agent as the only user turn (no history).
    message: str

    # Client-supplied session id; in this module it is only echoed back in responses.
    session_id: str | None = None

    # Optional agent to route to; missing or unregistered ids fall back to "naked".
    agent_id: str | None = None
|
|
|
|
|
|
class ChatCompletionRequest(BaseModel):
    """Request body for the OpenAI-compatible ``/chat/completions`` endpoint."""

    # Full conversation history as OpenAI-style message dicts; forwarded as-is
    # after the agent's system prompt.
    messages: list[dict]

    # When true, the response is streamed as SSE ``chat.completion.chunk`` events.
    stream: bool = False

    # Selects the agent (not the upstream LLM, which is always "deepseek-chat");
    # unregistered values fall back to the "naked" agent. Echoed in the response.
    model: str = "deepseek-chat"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agent resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _resolve_agent(agent_id: str | None = None, model: str | None = None):
    """Pick the agent that should handle a request.

    Lookup order:
      1. explicit ``agent_id``
      2. ``model`` field (OpenWebUI sends this — maps to agent_id if registered)
      3. fallback to the "naked" agent

    Only the first truthy value of ``agent_id`` / ``model`` is looked up; if it
    is not a registered agent the "naked" agent is returned instead.
    """
    candidate = agent_id if agent_id else model
    found = get_agent(candidate) if candidate is not None else None
    return found or get_agent("naked")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool-calling loop (non-streaming)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def run_agent_with_tools(
    client: OpenAI,
    messages: list[dict],
    agent_id: str | None = None,
    model: str | None = None,
    max_turns: int = 5,
) -> str:
    """Send messages to the LLM with tool definitions. Tool-calling loop.

    The resolved agent's system prompt is prepended to a copy of ``messages``
    (the caller's list is not mutated). Each turn calls the LLM with the
    agent's tools; any requested tool calls are executed and their results
    appended to the conversation before the next turn.

    Args:
        client: Synchronous OpenAI-compatible client. Calls are run in the
            default thread-pool executor so the event loop is not blocked.
        messages: Conversation history as OpenAI chat message dicts.
        agent_id: Explicit agent to use (see ``_resolve_agent``).
        model: Fallback agent lookup key (e.g. OpenWebUI's model field).
        max_turns: Maximum number of LLM calls before giving up.

    Returns:
        The assistant's reply text, or a canned fallback message when the
        model produced no content or the turn budget was exhausted.
    """
    agent = _resolve_agent(agent_id, model)
    tools = get_all_tools(agent.skills)
    system_prompt = agent.build_system_prompt()

    full_messages: list[dict] = [{"role": "system", "content": system_prompt}]
    full_messages.extend(messages)

    # Only send tool parameters when the agent actually has tools, instead of
    # passing explicit nulls to the API.
    tool_kwargs: dict = {"tools": tools, "tool_choice": "auto"} if tools else {}

    loop = asyncio.get_running_loop()

    for _ in range(max_turns):
        resp = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model="deepseek-chat",
                messages=full_messages,
                **tool_kwargs,
            ),
        )
        choice = resp.choices[0]

        if choice.finish_reason == "stop" and choice.message.content:
            return choice.message.content

        if choice.message.tool_calls:
            full_messages.append(choice.message.model_dump(exclude_none=True))
            for tc in choice.message.tool_calls:
                fn_name = tc.function.name
                try:
                    # Arguments are model-generated: they may be empty or not
                    # valid JSON. Report that back instead of failing the request.
                    fn_args = json.loads(tc.function.arguments or "{}")
                except json.JSONDecodeError as exc:
                    result = f"Invalid JSON arguments for tool '{fn_name}': {exc}"
                else:
                    tr = await execute_tool(agent.skills, fn_name, fn_args)
                    result = tr.content if tr else f"Tool '{fn_name}' is not available."
                # Every tool_call_id must be answered by a tool message, even on error.
                full_messages.append(
                    {"role": "tool", "tool_call_id": tc.id, "content": result}
                )
            continue

        return choice.message.content or "I'm not sure how to help with that."

    return "I've taken several actions but still need more information. Could you clarify?"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming generators
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _stream_with_tools(
    client: OpenAI,
    messages: list[dict],
    agent_id: str | None = None,
    model: str | None = None,
    max_turns: int = 5,
):
    """Streaming tool-calling loop. Tools run silently, final text is streamed.

    Async generator yielding text fragments. Each turn makes a non-streaming
    LLM call with the agent's tools; requested tool calls are executed and fed
    back into the conversation.

    - If the model stops with content, that content is yielded one character
      at a time.
    - If a turn ends with neither tool calls nor a "stop" + content result, a
      fresh streaming request (without tools) is made and its deltas are
      yielded.
    - If ``max_turns`` is exhausted, a single "\u2026" is yielded.
    """
    agent = _resolve_agent(agent_id, model)
    tools = get_all_tools(agent.skills)
    system_prompt = agent.build_system_prompt()

    full_messages: list[dict] = [{"role": "system", "content": system_prompt}]
    full_messages.extend(messages)

    # Only send tool parameters when the agent actually has tools.
    tool_kwargs: dict = {"tools": tools, "tool_choice": "auto"} if tools else {}

    loop = asyncio.get_running_loop()

    for _ in range(max_turns):
        resp = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model="deepseek-chat",
                messages=full_messages,
                **tool_kwargs,
            ),
        )
        choice = resp.choices[0]

        if choice.message.tool_calls:
            full_messages.append(choice.message.model_dump(exclude_none=True))
            for tc in choice.message.tool_calls:
                fn_name = tc.function.name
                try:
                    # Model-generated arguments may be empty or malformed JSON.
                    fn_args = json.loads(tc.function.arguments or "{}")
                except json.JSONDecodeError as exc:
                    result = f"Invalid JSON arguments for tool '{fn_name}': {exc}"
                else:
                    tr = await execute_tool(agent.skills, fn_name, fn_args)
                    result = tr.content if tr else f"Tool '{fn_name}' is not available."
                full_messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result,
                })
            continue

        if choice.finish_reason == "stop" and choice.message.content:
            # Replay the already-complete answer character by character,
            # yielding control to the event loop between characters.
            for token in choice.message.content:
                yield token
                await asyncio.sleep(0)
            return

        def _sync_stream():
            stream = client.chat.completions.create(
                model="deepseek-chat", messages=full_messages, stream=True,
            )
            try:
                for chunk in stream:
                    # Skip chunks without choices (some OpenAI-compatible
                    # servers send e.g. a usage-only chunk) instead of IndexError.
                    if not chunk.choices:
                        continue
                    delta = chunk.choices[0].delta
                    if delta and delta.content:
                        yield delta.content
            finally:
                # Release the HTTP connection when the generator finishes or is closed.
                stream.close()

        gen = _sync_stream()
        while True:
            # Pull each delta in a worker thread (the client is blocking);
            # None is the exhaustion sentinel — real deltas are non-empty strings.
            token = await loop.run_in_executor(None, next, gen, None)
            if token is None:
                return
            yield token

    yield "\u2026"
|
|
|
|
|
|
async def run_agent_stream(
    client: OpenAI,
    messages: list[dict],
    agent_id: str | None = None,
    model: str | None = None,
):
    """Async generator — yields tokens. Uses tool-loop when skills have tools.

    If the resolved agent exposes tools, delegates to ``_stream_with_tools``.
    Otherwise streams a single completion (agent system prompt + ``messages``)
    and yields each non-empty content delta.
    """
    agent = _resolve_agent(agent_id, model)
    tools = get_all_tools(agent.skills)

    if tools:
        async for token in _stream_with_tools(client, messages, agent_id, model):
            yield token
        return

    # No tools — simple streaming
    system_prompt = agent.build_system_prompt()
    full_messages: list[dict] = [{"role": "system", "content": system_prompt}]
    full_messages.extend(messages)

    loop = asyncio.get_running_loop()

    def _sync_stream():
        stream = client.chat.completions.create(
            model="deepseek-chat", messages=full_messages, stream=True,
        )
        try:
            for chunk in stream:
                # Skip chunks without choices (some OpenAI-compatible servers
                # send e.g. a usage-only chunk) instead of raising IndexError.
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                if delta and delta.content:
                    yield delta.content
        finally:
            # Release the HTTP connection when the generator finishes or is closed.
            stream.close()

    gen = _sync_stream()
    while True:
        # Pull each delta in a worker thread (the client is blocking);
        # None is the exhaustion sentinel — real deltas are non-empty strings.
        token = await loop.run_in_executor(None, next, gen, None)
        if token is None:
            break
        yield token
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/")
def root():
    """Health check: report that the service is up."""
    health = {"status": "ok"}
    return health
|
|
|
|
|
|
@router.post("/chat")
async def chat(
    req: ChatRequest,
    client: OpenAI = Depends(get_llm_client),
):
    """Streaming chat — single message, no history.

    Emits Server-Sent Events: one ``{"token", "session_id"}`` event per
    fragment produced by ``run_agent_stream``, followed by a final
    ``{"done": true, "session_id"}`` event.
    """
    sid = req.session_id
    history = [{"role": "user", "content": req.message}]
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # disable nginx proxy buffering for SSE
    }

    def _sse(event: dict) -> str:
        # One SSE frame: "data: <json>" terminated by a blank line.
        return "data: " + json.dumps(event) + "\n\n"

    async def event_stream():
        async for piece in run_agent_stream(client, history, req.agent_id):
            yield _sse({"token": piece, "session_id": sid})
        yield _sse({"done": True, "session_id": sid})

    return StreamingResponse(
        event_stream(), media_type="text/event-stream", headers=sse_headers,
    )
|
|
|
|
|
|
@router.post("/chat/sync")
async def chat_sync(
    req: ChatRequest,
    client: OpenAI = Depends(get_llm_client),
):
    """Non-streaming chat — single message.

    Uses the tool-calling loop when the resolved agent has tools; otherwise
    makes one completion request with the agent's system prompt.
    Returns ``{"response": <text>, "session_id": <echoed id>}``.
    """
    agent = _resolve_agent(req.agent_id)
    tools = get_all_tools(agent.skills)
    messages = [{"role": "user", "content": req.message}]

    if tools:
        response = await run_agent_with_tools(client, messages, req.agent_id)
    else:
        full_messages = [
            {"role": "system", "content": agent.build_system_prompt()},
            *messages,
        ]
        # The OpenAI client is synchronous; run it in a worker thread so this
        # async endpoint does not block the event loop.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model="deepseek-chat",
                messages=full_messages,
            ),
        )
        response = resp.choices[0].message.content

    return {"response": response, "session_id": req.session_id}
|
|
|
|
|
|
@router.get("/agents")
def list_agents():
    """List every registered agent with its id, description and skills."""
    summaries = []
    for agent in list_all_agents().values():
        summaries.append(
            {
                "agent_id": agent.agent_id,
                "description": agent.description,
                "skills": agent.skills,
            }
        )
    return {"agents": summaries}
|
|
|
|
|
|
@router.get("/models")
def list_models():
    """Return agents as selectable models for OpenWebUI.

    Each registered agent becomes an OpenAI-style ``model`` entry whose
    ``id`` is the agent id; that id comes back in ``/chat/completions``'
    ``model`` field and is mapped to the agent.
    """
    def _as_model(agent) -> dict:
        return {
            "id": agent.agent_id,
            "object": "model",
            "created": 0,
            "owned_by": "local-agent",
        }

    entries = list(map(_as_model, list_all_agents().values()))
    return {"object": "list", "data": entries}
|
|
|
|
|
|
@router.post("/chat/completions")
async def chat_completions(
    req: ChatCompletionRequest,
    client: OpenAI = Depends(get_llm_client),
):
    """OpenAI-compatible /chat/completions — supports stream=True.

    Multi-turn: req.messages contains the FULL conversation history.
    Agent resolved from the model field (OpenWebUI sends this).

    Streaming responses are SSE ``chat.completion.chunk`` events, ending with a
    ``finish_reason="stop"`` chunk and ``data: [DONE]``. Non-streaming
    responses are a single ``chat.completion`` object. The upstream LLM is
    always "deepseek-chat"; ``req.model`` only selects the agent and is echoed.
    """
    agent = _resolve_agent(model=req.model)

    if req.stream:
        def _chunk(delta: dict, finish_reason: str | None) -> str:
            # One SSE frame holding an OpenAI-style chat.completion.chunk.
            payload = {
                "id": "chatcmpl-local",
                "object": "chat.completion.chunk",
                "created": 0,
                "model": req.model,
                "choices": [
                    {"index": 0, "delta": delta, "finish_reason": finish_reason}
                ],
            }
            return f"data: {json.dumps(payload)}\n\n"

        async def sse_stream():
            async for token in run_agent_stream(
                client, req.messages, agent_id=agent.agent_id,
            ):
                yield _chunk({"content": token}, None)
            yield _chunk({}, "stop")
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            sse_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                # Match /chat: disable nginx proxy buffering for SSE.
                "X-Accel-Buffering": "no",
            },
        )

    # Non-streaming — full history, tool-calling
    tools = get_all_tools(agent.skills)
    if tools:
        response = await run_agent_with_tools(
            client, req.messages, agent_id=agent.agent_id,
        )
    else:
        full_msgs: list[dict] = [
            {"role": "system", "content": agent.build_system_prompt()},
            *req.messages,
        ]
        # The OpenAI client is synchronous; run it in a worker thread so this
        # async endpoint does not block the event loop.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model="deepseek-chat", messages=full_msgs,
            ),
        )
        response = resp.choices[0].message.content

    return {
        "id": "chatcmpl-local",
        "object": "chat.completion",
        "created": 0,
        "model": req.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": response},
                "finish_reason": "stop",
            }
        ],
    }
|