Add agent and skill system: implement Agent and Skill classes, register media and naked agents, and create media_info demo skill
Build and Push Agent API / build (push) Successful in 5s

This commit is contained in:
2026-05-10 19:24:44 +02:00
parent 54ac77ab51
commit cb4ebfa43e
7 changed files with 276 additions and 22 deletions
+64
View File
@@ -0,0 +1,64 @@
"""
Agent system — each agent combines a base LLM with optional skills
to produce tailored system prompts and behavior.
An Agent is a lightweight wrapper:
- agent_id : unique name (e.g. "naked", "media-agent")
- description : human-readable summary
- skills : list of skill names to load
- base_prompt : default system prompt (optional — falls back to generic)
"""
from dataclasses import dataclass, field
from typing import Dict, List
from skills import Skill, get_combined_prompt, list_all as list_all_skills
@dataclass
class Agent:
agent_id: str
description: str = ""
skills: List[str] = field(default_factory=list)
base_prompt: str = "You are a helpful agent."
def build_system_prompt(self) -> str:
"""Combine base_prompt with all registered skills' prompt fragments."""
return get_combined_prompt(self.skills, base_prompt=self.base_prompt)
def __repr__(self) -> str:
sk = ", ".join(self.skills) if self.skills else "none"
return f"Agent(id={self.agent_id!r}, skills=[{sk}])"
# ---------------------------------------------------------------------------
# Global agent registry
# ---------------------------------------------------------------------------
_agent_registry: Dict[str, Agent] = {}
def register(agent: Agent) -> None:
"""Register an agent so it can be looked up by agent_id."""
_agent_registry[agent.agent_id] = agent
def get(agent_id: str) -> Agent | None:
"""Return a registered agent by id, or None."""
return _agent_registry.get(agent_id)
def list_all() -> Dict[str, Agent]:
"""Return a shallow copy of the registry."""
return dict(_agent_registry)
def load_all_agents() -> None:
"""
Import all agent modules so they self-register.
Call this once at startup.
"""
import agents.naked # noqa: F401
import agents.media_agent # noqa: F401
# Also import skill modules so they self-register
import skills.media_info # noqa: F401
+19
View File
@@ -0,0 +1,19 @@
"""
media-agent — an agent that knows how to handle media queries
(Jellyfin / Sonarr / Seerr / subtitle requests).
For now it only loads the *media_info* demo skill which teaches it
a structured response format. Later you'll add real API-calling skills.
"""
from agents import Agent, register
media_agent = Agent(
agent_id="media-agent",
description="Media assistant — handles movie/TV/subtitle/ticket requests. "
"Will eventually connect to Seerr, Sonarr, Jellyfin, etc.",
skills=["media_info"],
base_prompt="You are a media assistant. Help users with their media library.",
)
register(media_agent)
+15
View File
@@ -0,0 +1,15 @@
"""
naked agent — a barebone LLM with no extra skills attached.
Just a thin wrapper that instructs the LLM to be a general helpful assistant.
"""
from agents import Agent, register
naked_agent = Agent(
agent_id="naked",
description="A plain LLM — no extra skills, just a helpful assistant.",
skills=[], # no skills
base_prompt="You are a helpful, general-purpose assistant.",
)
register(naked_agent)
+74 -19
View File
@@ -6,6 +6,7 @@ import json
import asyncio import asyncio
from api.dependencies import get_llm_client from api.dependencies import get_llm_client
from agents import get as get_agent, list_all as list_all_agents
router = APIRouter() router = APIRouter()
@@ -13,6 +14,7 @@ router = APIRouter()
class ChatRequest(BaseModel): class ChatRequest(BaseModel):
message: str message: str
session_id: str | None = None session_id: str | None = None
agent_id: str | None = None # which agent to use ("naked", "media-agent", …)
class ChatCompletionRequest(BaseModel): class ChatCompletionRequest(BaseModel):
@@ -25,28 +27,59 @@ class ChatCompletionRequest(BaseModel):
# Core helpers # Core helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def run_agent(client: OpenAI, message: str, session_id: str | None = None) -> str: def _resolve_agent(agent_id: str | None = None, model: str | None = None):
"""Non-streaming: returns the full response as a single string.""" """
Look up the agent. Resolution order:
1. explicit agent_id
2. model name (OpenWebUI sends this — maps to agent_id if registered)
3. fallback to "naked"
"""
lookup = agent_id or model
if lookup is None:
agent = get_agent("naked")
else:
agent = get_agent(lookup)
if agent is None:
agent = get_agent("naked")
return agent
def run_agent(
client: OpenAI,
message: str,
session_id: str | None = None,
agent_id: str | None = None,
model: str | None = None,
) -> str:
"""Non-streaming: uses the chosen agent's system prompt."""
agent = _resolve_agent(agent_id, model)
response = client.chat.completions.create( response = client.chat.completions.create(
model="deepseek-chat", model="deepseek-chat",
messages=[ messages=[
{"role": "system", "content": "You are a helpful agent."}, {"role": "system", "content": agent.build_system_prompt()},
{"role": "user", "content": message}, {"role": "user", "content": message},
], ],
) )
return response.choices[0].message.content return response.choices[0].message.content
async def run_agent_stream(client: OpenAI, message: str, session_id: str | None = None): async def run_agent_stream(
"""Async generator that yields text tokens as they arrive from the LLM.""" client: OpenAI,
message: str,
session_id: str | None = None,
agent_id: str | None = None,
model: str | None = None,
):
"""Async generator — yields tokens using the chosen agent's system prompt."""
agent = _resolve_agent(agent_id, model)
system_prompt = agent.build_system_prompt()
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
# OpenAI's sync streaming iterator must run in a thread so it doesn't block the event loop
def _sync_stream(): def _sync_stream():
stream = client.chat.completions.create( stream = client.chat.completions.create(
model="deepseek-chat", model="deepseek-chat",
messages=[ messages=[
{"role": "system", "content": "You are a helpful agent."}, {"role": "system", "content": system_prompt},
{"role": "user", "content": message}, {"role": "user", "content": message},
], ],
stream=True, stream=True,
@@ -56,7 +89,6 @@ async def run_agent_stream(client: OpenAI, message: str, session_id: str | None
if delta and delta.content: if delta and delta.content:
yield delta.content yield delta.content
# Run the sync generator in a thread, yield results back to the async world
gen = _sync_stream() gen = _sync_stream()
while True: while True:
token = await loop.run_in_executor(None, next, gen, None) token = await loop.run_in_executor(None, next, gen, None)
@@ -78,11 +110,12 @@ def root():
async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)): async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
"""Streaming chat endpoint — returns Server-Sent Events.""" """Streaming chat endpoint — returns Server-Sent Events."""
async def event_stream(): async def event_stream():
async for token in run_agent_stream(client, req.message, req.session_id): async for token in run_agent_stream(
client, req.message, req.session_id, req.agent_id,
):
payload = json.dumps({"token": token, "session_id": req.session_id}) payload = json.dumps({"token": token, "session_id": req.session_id})
yield f"data: {payload}\n\n" yield f"data: {payload}\n\n"
# Signal completion
yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n" yield f"data: {json.dumps({'done': True, 'session_id': req.session_id})}\n\n"
return StreamingResponse( return StreamingResponse(
@@ -91,7 +124,7 @@ async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
headers={ headers={
"Cache-Control": "no-cache", "Cache-Control": "no-cache",
"Connection": "keep-alive", "Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering if behind a proxy "X-Accel-Buffering": "no",
}, },
) )
@@ -99,21 +132,38 @@ async def chat(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
@router.post("/chat/sync") @router.post("/chat/sync")
def chat_sync(req: ChatRequest, client: OpenAI = Depends(get_llm_client)): def chat_sync(req: ChatRequest, client: OpenAI = Depends(get_llm_client)):
"""Non-streaming fallback — returns the full response at once.""" """Non-streaming fallback — returns the full response at once."""
response = run_agent(client, req.message, req.session_id) response = run_agent(client, req.message, req.session_id, req.agent_id)
return {"response": response, "session_id": req.session_id} return {"response": response, "session_id": req.session_id}
@router.get("/agents")
def list_agents():
"""Return all registered agents with their ids, descriptions, and skills."""
return {
"agents": [
{
"agent_id": a.agent_id,
"description": a.description,
"skills": a.skills,
}
for a in list_all_agents().values()
]
}
@router.get("/models") @router.get("/models")
def list_models(): def list_models():
"""Return all registered agents as selectable models for OpenWebUI."""
return { return {
"object": "list", "object": "list",
"data": [ "data": [
{ {
"id": "agent-model", "id": a.agent_id,
"object": "model", "object": "model",
"created": 0, "created": 0,
"owned_by": "local-agent", "owned_by": "local-agent",
}, }
for a in list_all_agents().values()
], ],
} }
@@ -123,12 +173,17 @@ async def chat_completions(
req: ChatCompletionRequest, req: ChatCompletionRequest,
client: OpenAI = Depends(get_llm_client), client: OpenAI = Depends(get_llm_client),
): ):
"""OpenAI-compatible /chat/completions — supports stream=True.""" """OpenAI-compatible /chat/completions — supports stream=True.
The last message's content is used as the user prompt; defaults to 'naked' agent.
"""
user_message = req.messages[-1]["content"] user_message = req.messages[-1]["content"]
# Resolve agent from the model field (OpenWebUI sends this)
agent = _resolve_agent(model=req.model)
if req.stream: if req.stream:
async def sse_stream(): async def sse_stream():
async for token in run_agent_stream(client, user_message): async for token in run_agent_stream(client, user_message, agent_id=agent.agent_id):
chunk = { chunk = {
"id": "chatcmpl-local", "id": "chatcmpl-local",
"object": "chat.completion.chunk", "object": "chat.completion.chunk",
@@ -141,7 +196,6 @@ async def chat_completions(
], ],
} }
yield f"data: {json.dumps(chunk)}\n\n" yield f"data: {json.dumps(chunk)}\n\n"
# Final chunk with finish_reason
final_chunk = { final_chunk = {
"id": "chatcmpl-local", "id": "chatcmpl-local",
"object": "chat.completion.chunk", "object": "chat.completion.chunk",
@@ -165,8 +219,9 @@ async def chat_completions(
}, },
) )
# Non-streaming path # Non-streaming path — resolve agent from model field
response = run_agent(client, user_message) agent = _resolve_agent(model=req.model)
response = run_agent(client, user_message, agent_id=agent.agent_id)
return { return {
"id": "chatcmpl-local", "id": "chatcmpl-local",
"object": "chat.completion", "object": "chat.completion",
+9 -3
View File
@@ -5,12 +5,18 @@ from api.v1.chat import router as v1_router
from core.config import DEEPSEEK_API_KEY from core.config import DEEPSEEK_API_KEY
from core.llm import create_client from core.llm import create_client
# ---------------------------------------------------------------------------
# Load all agents & skills so they self-register at startup
# ---------------------------------------------------------------------------
from agents import load_all_agents # noqa: E402
load_all_agents()
# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------
app = FastAPI() app = FastAPI()
# ---------------------------------------------------------------------------
# Middleware
# ---------------------------------------------------------------------------
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], allow_origins=["*"],
+50
View File
@@ -0,0 +1,50 @@
"""
Skill system — each skill is a piece of domain knowledge or a capability
that can be attached to an agent to shape its behavior and system prompt.
A Skill is a lightweight object with:
- name : short identifier (e.g. "media_info")
- description : human-readable summary
- prompt_fragment : extra text injected into the agent's system prompt
"""
from dataclasses import dataclass, field
from typing import Dict
@dataclass
class Skill:
name: str
description: str
prompt_fragment: str = ""
# ---------------------------------------------------------------------------
# Global skill registry — populated at startup / import time
# ---------------------------------------------------------------------------
_skill_registry: Dict[str, Skill] = {}
def register(skill: Skill) -> None:
"""Register a skill so agents can look it up by name."""
_skill_registry[skill.name] = skill
def get(name: str) -> Skill | None:
"""Return a registered skill by name, or None."""
return _skill_registry.get(name)
def list_all() -> Dict[str, Skill]:
"""Return a shallow copy of the registry."""
return dict(_skill_registry)
def get_combined_prompt(skill_names: list[str], base_prompt: str = "") -> str:
"""Build a system prompt from a base prompt + requested skill fragments."""
parts = [base_prompt] if base_prompt else []
for name in skill_names:
s = get(name)
if s and s.prompt_fragment:
parts.append(s.prompt_fragment)
return "\n\n".join(parts)
+45
View File
@@ -0,0 +1,45 @@
"""
Demo skill: media_info
Gives the agent knowledge about how to respond to media-related queries
(movie / TV / subtitle requests). This is intentionally simple — in the future
you would add real API-calling skills here (Sonarr / Jellyfin / Seerr / etc.).
"""
from skills import Skill, register
media_info_skill = Skill(
name="media_info",
description="Respond to media queries with a structured format "
"(movie / TV show requests, subtitles, tickets).",
prompt_fragment="""## Media Agent Instructions
You are a media assistant. When users ask about movies, TV shows, subtitles,
or media library requests, follow these rules:
- If a user wants to **request** a movie or show, respond with a clear
confirmation using this format:
```
[MEDIA REQUEST]
Title: <title>
Type: <movie | show>
Status: PENDING — this would be submitted to Seerr
```
- If a user asks about **subtitles**, acknowledge the request and respond with:
```
[SUBTITLE REQUEST]
Media: <title>
Language: <language>
Status: PENDING — Bazarr would process this
```
- Otherwise, answer normally but always remind the user that media-backend
integrations (Seerr, Sonarr, Jellyfin) are not yet connected.
This is a **demo** skill. Real API calls will be added later.""",
)
register(media_info_skill)