From 54ac77ab511a6f4b0bc378ebf9364a9cfa35f0f7 Mon Sep 17 00:00:00 2001 From: TimHoogervorst Date: Sun, 10 May 2026 18:54:46 +0200 Subject: [PATCH] Implement streaming support for chat endpoint and add async chat completions --- api/v1/chat.py | 126 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 119 insertions(+), 7 deletions(-) diff --git a/api/v1/chat.py b/api/v1/chat.py index b470095..0bdacee 100644 --- a/api/v1/chat.py +++ b/api/v1/chat.py @@ -1,6 +1,9 @@ from fastapi import APIRouter, Body, Depends +from fastapi.responses import StreamingResponse from openai import OpenAI from pydantic import BaseModel +import json +import asyncio from api.dependencies import get_llm_client @@ -12,7 +15,18 @@ class ChatRequest(BaseModel): session_id: str | None = None +class ChatCompletionRequest(BaseModel): + messages: list[dict] + stream: bool = False + model: str = "deepseek-chat" + + +# --------------------------------------------------------------------------- +# Core helpers +# --------------------------------------------------------------------------- + def run_agent(client: OpenAI, message: str, session_id: str | None = None) -> str: + """Non-streaming: returns the full response as a single string.""" response = client.chat.completions.create( model="deepseek-chat", messages=[ @@ -23,13 +37,68 @@ def run_agent(client: OpenAI, message: str, session_id: str | None = None) -> st return response.choices[0].message.content +async def run_agent_stream(client: OpenAI, message: str, session_id: str | None = None): + """Async generator that yields text tokens as they arrive from the LLM.""" + loop = asyncio.get_running_loop() + + # OpenAI's sync streaming iterator must run in a thread so it doesn't block the event loop + def _sync_stream(): + stream = client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "You are a helpful agent."}, + {"role": "user", "content": message}, + ], + stream=True, + ) + for chunk in stream: + delta = chunk.choices[0].delta + if delta and delta.content: + yield delta.content + + # Run the sync generator in a thread, yield results back to the async world + gen = _sync_stream() + while True: + token = await loop.run_in_executor(None, next, gen, None) + if token is None: + break + yield token + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + @router.get("/") def root(): return {"status": "ok"} @router.post("/chat") -def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)): +async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)): + """Streaming chat endpoint — returns Server-Sent Events.""" + async def event_stream(): + async for token in run_agent_stream(client, req.message, req.session_id): + payload = json.dumps({"token": token, "session_id": req.session_id}) + yield f"data: {payload}\n\n" + + # Signal completion + yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n" + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering if behind a proxy + }, + ) + + +@router.post("/chat/sync") +def chat_sync(req: ChatRequest, client: OpenAI = Depends(get_llm_client)): + """Non-streaming fallback — returns the full response at once.""" response = run_agent(client, req.message, req.session_id) return {"response": response, "session_id": req.session_id} @@ -44,27 +113,70 @@ def list_models(): "object": "model", "created": 0, "owned_by": "local-agent", - } + }, ], } @router.post("/chat/completions") -def chat_completions( - payload: dict = Body(...), +async def chat_completions( + req: ChatCompletionRequest, client: OpenAI = Depends(get_llm_client), ): - messages = payload["messages"] - user_message = messages[-1]["content"] - response = run_agent(client, user_message) + """OpenAI-compatible /chat/completions — supports stream=True.""" + user_message = req.messages[-1]["content"] + if req.stream: + async def sse_stream(): + async for token in run_agent_stream(client, user_message): + chunk = { + "id": "chatcmpl-local", + "object": "chat.completion.chunk", + "choices": [ + { + "index": 0, + "delta": {"content": token}, + "finish_reason": None, + } + ], + } + yield f"data: {json.dumps(chunk)}\n\n" + # Final chunk with finish_reason + final_chunk = { + "id": "chatcmpl-local", + "object": "chat.completion.chunk", + "choices": [ + { + "index": 0, + "delta": {}, + "finish_reason": "stop", + } + ], + } + yield f"data: {json.dumps(final_chunk)}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse( + sse_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + # Non-streaming path + response = run_agent(client, user_message) return { "id": "chatcmpl-local", "object": "chat.completion", + "created": 0, + "model": req.model, "choices": [ { "index": 0, "message": {"role": "assistant", "content": response}, + "finish_reason": "stop", } ], } \ No newline at end of file