Implement streaming support for chat endpoint and add async chat completions
Build and Push Agent API / build (push) Successful in 5s
Build and Push Agent API / build (push) Successful in 5s
This commit is contained in:
+119
-7
@@ -1,6 +1,9 @@
|
|||||||
from fastapi import APIRouter, Body, Depends
|
from fastapi import APIRouter, Body, Depends
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
from openai import OpenAI
|
from openai import OpenAI
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
import json
|
||||||
|
import asyncio
|
||||||
|
|
||||||
from api.dependencies import get_llm_client
|
from api.dependencies import get_llm_client
|
||||||
|
|
||||||
@@ -12,7 +15,18 @@ class ChatRequest(BaseModel):
|
|||||||
session_id: str | None = None
|
session_id: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class ChatCompletionRequest(BaseModel):
|
||||||
|
messages: list[dict]
|
||||||
|
stream: bool = False
|
||||||
|
model: str = "deepseek-chat"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Core helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def run_agent(client: OpenAI, message: str, session_id: str | None = None) -> str:
|
def run_agent(client: OpenAI, message: str, session_id: str | None = None) -> str:
|
||||||
|
"""Non-streaming: returns the full response as a single string."""
|
||||||
response = client.chat.completions.create(
|
response = client.chat.completions.create(
|
||||||
model="deepseek-chat",
|
model="deepseek-chat",
|
||||||
messages=[
|
messages=[
|
||||||
@@ -23,13 +37,68 @@ def run_agent(client: OpenAI, message: str, session_id: str | None = None) -> st
|
|||||||
return response.choices[0].message.content
|
return response.choices[0].message.content
|
||||||
|
|
||||||
|
|
||||||
|
async def run_agent_stream(client: OpenAI, message: str, session_id: str | None = None):
|
||||||
|
"""Async generator that yields text tokens as they arrive from the LLM."""
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
# OpenAI's sync streaming iterator must run in a thread so it doesn't block the event loop
|
||||||
|
def _sync_stream():
|
||||||
|
stream = client.chat.completions.create(
|
||||||
|
model="deepseek-chat",
|
||||||
|
messages=[
|
||||||
|
{"role": "system", "content": "You are a helpful agent."},
|
||||||
|
{"role": "user", "content": message},
|
||||||
|
],
|
||||||
|
stream=True,
|
||||||
|
)
|
||||||
|
for chunk in stream:
|
||||||
|
delta = chunk.choices[0].delta
|
||||||
|
if delta and delta.content:
|
||||||
|
yield delta.content
|
||||||
|
|
||||||
|
# Run the sync generator in a thread, yield results back to the async world
|
||||||
|
gen = _sync_stream()
|
||||||
|
while True:
|
||||||
|
token = await loop.run_in_executor(None, next, gen, None)
|
||||||
|
if token is None:
|
||||||
|
break
|
||||||
|
yield token
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Endpoints
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
@router.get("/")
|
@router.get("/")
|
||||||
def root():
|
def root():
|
||||||
return {"status": "ok"}
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
@router.post("/chat")
|
@router.post("/chat")
|
||||||
def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
|
async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
|
||||||
|
"""Streaming chat endpoint — returns Server-Sent Events."""
|
||||||
|
async def event_stream():
|
||||||
|
async for token in run_agent_stream(client, req.message, req.session_id):
|
||||||
|
payload = json.dumps({"token": token, "session_id": req.session_id})
|
||||||
|
yield f"data: {payload}\n\n"
|
||||||
|
|
||||||
|
# Signal completion
|
||||||
|
yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_stream(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no", # Disable nginx buffering if behind a proxy
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/chat/sync")
|
||||||
|
def chat_sync(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
|
||||||
|
"""Non-streaming fallback — returns the full response at once."""
|
||||||
response = run_agent(client, req.message, req.session_id)
|
response = run_agent(client, req.message, req.session_id)
|
||||||
return {"response": response, "session_id": req.session_id}
|
return {"response": response, "session_id": req.session_id}
|
||||||
|
|
||||||
@@ -44,27 +113,70 @@ def list_models():
|
|||||||
"object": "model",
|
"object": "model",
|
||||||
"created": 0,
|
"created": 0,
|
||||||
"owned_by": "local-agent",
|
"owned_by": "local-agent",
|
||||||
}
|
},
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@router.post("/chat/completions")
|
@router.post("/chat/completions")
|
||||||
def chat_completions(
|
async def chat_completions(
|
||||||
payload: dict = Body(...),
|
req: ChatCompletionRequest,
|
||||||
client: OpenAI = Depends(get_llm_client),
|
client: OpenAI = Depends(get_llm_client),
|
||||||
):
|
):
|
||||||
messages = payload["messages"]
|
"""OpenAI-compatible /chat/completions — supports stream=True."""
|
||||||
user_message = messages[-1]["content"]
|
user_message = req.messages[-1]["content"]
|
||||||
response = run_agent(client, user_message)
|
|
||||||
|
|
||||||
|
if req.stream:
|
||||||
|
async def sse_stream():
|
||||||
|
async for token in run_agent_stream(client, user_message):
|
||||||
|
chunk = {
|
||||||
|
"id": "chatcmpl-local",
|
||||||
|
"object": "chat.completion.chunk",
|
||||||
|
"choices": [
|
||||||
|
{
|
||||||
|
"index": 0,
|
||||||
|
"delta": {"content": token},
|
||||||
|
"finish_reason": None,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
yield f"data: {json.dumps(chunk)}\n\n"
|
||||||
|
# Final chunk with finish_reason
|
||||||
|
final_chunk = {
|
||||||
|
"id": "chatcmpl-local",
|
||||||
|
"object": "chat.completion.chunk",
|
||||||
|
"choices": [
|
||||||
|
{
|
||||||
|
"index": 0,
|
||||||
|
"delta": {},
|
||||||
|
"finish_reason": "stop",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
yield f"data: {json.dumps(final_chunk)}\n\n"
|
||||||
|
yield "data: [DONE]\n\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
sse_stream(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Non-streaming path
|
||||||
|
response = run_agent(client, user_message)
|
||||||
return {
|
return {
|
||||||
"id": "chatcmpl-local",
|
"id": "chatcmpl-local",
|
||||||
"object": "chat.completion",
|
"object": "chat.completion",
|
||||||
|
"created": 0,
|
||||||
|
"model": req.model,
|
||||||
"choices": [
|
"choices": [
|
||||||
{
|
{
|
||||||
"index": 0,
|
"index": 0,
|
||||||
"message": {"role": "assistant", "content": response},
|
"message": {"role": "assistant", "content": response},
|
||||||
|
"finish_reason": "stop",
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user