PydanticAI Streaming Server (FastAPI)
Verified against `pydantic-ai==1.101.0`. Source modules: `pydantic_ai.agent`, `pydantic_ai.result`, `pydantic_ai.messages`.
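PydanticAI’s `agent.run_stream()` (an async context manager whose result exposes async iterators such as `stream_text()`) and `agent.run_stream_events()` (an async iterator over run events) map directly onto Server-Sent Events (SSE). This guide shows five patterns: plain text streaming, structured output streaming, raw event streaming, a multi-turn chat endpoint, and usage and error reporting. It then covers PydanticAI’s built-in AG-UI adapter, a complete application, and deployment.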
Installation
```bash
pip install "pydantic-ai[openai]" fastapi uvicorn
```

Pattern 1: Plain text streaming
The simplest useful server: it streams model tokens as they arrive.
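The handler below emits `data:` frames separated by blank lines and finishes with a `done` event. As a rough illustration of what a consumer does with that stream, this stdlib-only parser splits frames, decodes the JSON payloads, and stops at `done`. It is a simplified sketch (it ignores `id:`, `retry:`, and comment lines), and `parse_sse` is a hypothetical name; browser code would normally use `EventSource` or `fetch` instead.

```python
import json
from collections.abc import Iterator


def parse_sse(raw: str) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, payload) pairs from a complete SSE body (simplified)."""
    for frame in raw.split('\n\n'):
        if not frame.strip():
            continue
        event, data_lines = 'message', []  # 'message' is the SSE default event type
        for line in frame.split('\n'):
            if line.startswith('event:'):
                event = line[len('event:'):].strip()
            elif line.startswith('data:'):
                data_lines.append(line[len('data:'):].strip())
        payload = json.loads('\n'.join(data_lines)) if data_lines else {}
        yield event, payload
        if event == 'done':
            return  # the server signalled completion; stop reading


body = 'data: {"text": "Hel"}\n\ndata: {"text": "lo"}\n\nevent: done\ndata: {}\n\n'
text = ''.join(p['text'] for e, p in parse_sse(body) if e == 'message')
print(text)  # Hello
```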
```python
import json

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent

app = FastAPI()

agent = Agent(
    'openai:gpt-4o',
    system_prompt='You are a helpful assistant. Be concise.',
)


@app.get('/stream/text')
async def stream_text(q: str = Query(..., min_length=1)):
    async def event_generator():
        async with agent.run_stream(q) as stream:
            async for delta in stream.stream_text(delta=True):
                # JSON-encode so newlines inside a delta cannot break SSE framing
                yield f'data: {json.dumps({"text": delta})}\n\n'
        # Signal completion; the client should close the connection on this
        # event (a browser EventSource otherwise reconnects automatically)
        yield 'event: done\ndata: {}\n\n'

    return StreamingResponse(event_generator(), media_type='text/event-stream')
```

Pattern 2: Structured output streaming
Streams partial Pydantic objects as the model fills in each field. Ideal for progress indicators on long structured outputs.
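`stream_output(debounce_by=0.1)` in the handler below limits how often partial objects are emitted. To illustrate the idea only (this is not PydanticAI’s implementation), here is a stdlib throttle that forwards at most one update per interval and always flushes the last one, so the client never misses the final state. `throttle_latest` and `fake_partials` are hypothetical names.

```python
import asyncio
import time
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar('T')


async def throttle_latest(source: AsyncIterator[T], interval: float) -> AsyncIterator[T]:
    """Forward at most one item per `interval` seconds, plus the final item."""
    last_emit = float('-inf')
    pending = None
    has_pending = False
    async for item in source:
        now = time.monotonic()
        if now - last_emit >= interval:
            last_emit = now
            has_pending = False
            yield item
        else:
            # Too soon: remember it; a newer item may replace it
            pending, has_pending = item, True
    if has_pending:
        # Flush the latest suppressed item so the final state is never lost
        yield pending


async def fake_partials():
    # Stand-in for a model stream: five snapshots arriving almost at once
    for n in range(1, 6):
        yield {'key_findings': [f'finding {i}' for i in range(n)]}


async def main() -> list[dict]:
    return [p async for p in throttle_latest(fake_partials(), interval=10.0)]


results = asyncio.run(main())
print(len(results))  # 2: the first snapshot and the last one
```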
```python
import json

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from pydantic_ai import Agent

app = FastAPI()


class ResearchReport(BaseModel):
    # Defaults let incomplete objects validate mid-stream; required fields
    # would fail partial validation until the model had emitted them all
    title: str | None = None
    summary: str | None = None
    key_findings: list[str] = Field(default_factory=list)
    confidence_score: float | None = None


agent = Agent('openai:gpt-4o', output_type=ResearchReport)


@app.get('/stream/structured')
async def stream_structured(q: str = Query(..., min_length=1)):
    async def event_generator():
        async with agent.run_stream(q) as stream:
            # stream_output() yields partial ResearchReport instances as the
            # model fills in fields, debounced to roughly one per 0.1 s
            async for partial in stream.stream_output(debounce_by=0.1):
                # exclude_none drops fields the model has not produced yet
                payload = partial.model_dump(exclude_none=True)
                yield f'data: {json.dumps(payload)}\n\n'

            # Final validated output
            final = await stream.get_output()
            yield f'event: result\ndata: {final.model_dump_json()}\n\n'

        yield 'event: done\ndata: {}\n\n'

    return StreamingResponse(event_generator(), media_type='text/event-stream')
```

Pattern 3: Raw event streaming
Exposes every event of the run (tool calls, tool results, retries), giving the client full visibility into what the agent is doing.
```python
import json

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent, AgentRunResultEvent, RunContext
from pydantic_ai.messages import (
    FunctionToolCallEvent,
    FunctionToolResultEvent,
    PartDeltaEvent,
    PartStartEvent,
    TextPartDelta,
)

app = FastAPI()

agent = Agent(
    'openai:gpt-4o',
    system_prompt='Use the tools available to answer accurately.',
)


@agent.tool
async def lookup_price(ctx: RunContext[None], product: str) -> dict:
    # Replace with a real DB / API call
    return {'product': product, 'price': 9.99, 'currency': 'USD'}


@app.get('/stream/events')
async def stream_events(q: str = Query(..., min_length=1)):
    async def event_generator():
        async for event in agent.run_stream_events(q):
            if isinstance(event, PartStartEvent):
                data = {'type': 'part_start', 'kind': event.part.part_kind}
                yield f'data: {json.dumps(data)}\n\n'

            elif isinstance(event, PartDeltaEvent):
                # Forward text deltas only; thinking deltas also carry
                # content_delta, so check the delta type explicitly
                if isinstance(event.delta, TextPartDelta) and event.delta.content_delta:
                    data = {'type': 'text_delta', 'delta': event.delta.content_delta}
                    yield f'data: {json.dumps(data)}\n\n'

            elif isinstance(event, FunctionToolCallEvent):
                data = {
                    'type': 'tool_call',
                    'tool': event.part.tool_name,
                    'args': event.part.args_as_dict(),
                }
                yield f'data: {json.dumps(data)}\n\n'

            elif isinstance(event, FunctionToolResultEvent):
                # event.result is a ToolReturnPart, or a RetryPromptPart on retries
                data = {
                    'type': 'tool_result',
                    'tool': event.result.tool_name,
                    'content': str(event.result.content)[:500],  # cap payload size
                }
                yield f'data: {json.dumps(data)}\n\n'

            elif isinstance(event, AgentRunResultEvent):
                data = {'type': 'final', 'output': str(event.result.output)}
                yield f'event: result\ndata: {json.dumps(data)}\n\n'

        yield 'event: done\ndata: {}\n\n'

    return StreamingResponse(event_generator(), media_type='text/event-stream')
```

Pattern 4: Multi-turn chat endpoint
Maintains conversation history across requests using PydanticAI’s `message_history`.
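The in-memory `_sessions` dict in the code below has a race: two concurrent requests for the same session both load the same history, and whichever finishes last overwrites the other’s turn. One way to avoid that, sketched here with a hypothetical `SessionStore` class, is to hold a per-session `asyncio.Lock` across the whole load, run, save cycle. This only serialises requests within one process; with several replicas (as in the Kubernetes deployment later on) the lock would have to live in the shared store instead.

```python
import asyncio
import json
from collections import defaultdict


class SessionStore:
    """Hypothetical in-process store that serialises turns per session."""

    def __init__(self) -> None:
        self._data: dict[str, bytes] = {}
        # One lock per session id (never evicted in this sketch)
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def lock(self, session_id: str) -> asyncio.Lock:
        return self._locks[session_id]

    def load(self, session_id: str) -> bytes:
        return self._data.get(session_id, b'[]')

    def save(self, session_id: str, history_json: bytes) -> None:
        self._data[session_id] = history_json


async def fake_turn(store: SessionStore, session_id: str, message: str) -> None:
    # Stand-in for: load history -> run the agent -> save all_messages()
    async with store.lock(session_id):
        history = json.loads(store.load(session_id))
        await asyncio.sleep(0.01)  # simulated model latency
        history.append(message)
        store.save(session_id, json.dumps(history).encode())


async def main() -> list[str]:
    store = SessionStore()
    # Three overlapping requests for the same session
    await asyncio.gather(*(fake_turn(store, 's1', m) for m in ('a', 'b', 'c')))
    return json.loads(store.load('s1'))


result = asyncio.run(main())
print(result)
```

With the lock, all three turns end up in the history; without the `async with`, each turn reads the empty history and only one survives. In the FastAPI handler this means loading the history inside the event generator, under the lock, rather than before it.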
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessagesTypeAdapter

app = FastAPI()
agent = Agent('openai:gpt-4o', system_prompt='You are a helpful assistant.')

# In-memory demo only; use Redis / Postgres in production
_sessions: dict[str, bytes] = {}


class ChatRequest(BaseModel):
    session_id: str
    message: str


@app.post('/chat')
async def chat(req: ChatRequest):
    # Load prior history from the session store
    prior_json = _sessions.get(req.session_id, b'[]')
    history = ModelMessagesTypeAdapter.validate_json(prior_json)

    async def event_generator():
        async with agent.run_stream(
            req.message,
            message_history=history,
        ) as stream:
            async for chunk in stream.stream_text(delta=True):
                yield f'data: {json.dumps({"text": chunk})}\n\n'

            # Persist the updated history once the stream is fully consumed
            await stream.get_output()
            updated = ModelMessagesTypeAdapter.dump_json(stream.all_messages())
            _sessions[req.session_id] = updated

        yield 'event: done\ndata: {}\n\n'

    return StreamingResponse(event_generator(), media_type='text/event-stream')
```

Pattern 5: Usage and error reporting in the SSE envelope
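Report token usage and surface errors through the SSE stream itself. `StreamingResponse` sends the status line and headers before it starts iterating the generator, so an exception raised mid-stream cannot become a 500 response; unhandled, it simply drops the connection and the client gets no explanation.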
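The envelope used by the handler below can also be factored into a generic wrapper around any SSE generator. This sketch, with the hypothetical name `with_error_envelope`, turns exceptions into an `event: error` frame and emits `done` after the `try` block rather than inside `finally`, so nothing is yielded during generator cleanup. `asyncio.CancelledError` derives from `BaseException`, so `except Exception` deliberately lets cancellation propagate.

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def with_error_envelope(frames: AsyncIterator[str]) -> AsyncIterator[str]:
    """Re-yield SSE frames; report failures in-band instead of dropping the stream."""
    try:
        async for frame in frames:
            yield frame
    except Exception as exc:  # CancelledError is a BaseException and passes through
        # Send only the exception type; keep details out of the client stream
        yield f'event: error\ndata: {json.dumps({"kind": type(exc).__name__})}\n\n'
    # Not in `finally`: a yield there raises RuntimeError if the generator is closed early
    yield 'event: done\ndata: {}\n\n'


async def failing_stream() -> AsyncIterator[str]:
    # Stand-in for a model run that fails halfway through
    yield 'data: {"text": "partial"}\n\n'
    raise ValueError('model returned garbage')


async def main() -> list[str]:
    return [frame async for frame in with_error_envelope(failing_stream())]


frames = asyncio.run(main())
for frame in frames:
    print(repr(frame))
```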
```python
import json

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent, UsageLimits
from pydantic_ai.exceptions import UnexpectedModelBehavior, UsageLimitExceeded

app = FastAPI()
agent = Agent('openai:gpt-4o')


@app.get('/stream/safe')
async def stream_safe(q: str = Query(..., min_length=1)):
    async def event_generator():
        try:
            async with agent.run_stream(
                q,
                usage_limits=UsageLimits(output_tokens_limit=1000),
            ) as stream:
                async for delta in stream.stream_text(delta=True):
                    yield f'data: {json.dumps({"text": delta})}\n\n'

                await stream.get_output()
                usage = stream.usage()
                yield f'event: usage\ndata: {json.dumps({"tokens": usage.total_tokens})}\n\n'

        except UsageLimitExceeded as e:
            yield f'event: error\ndata: {json.dumps({"kind": "budget", "msg": str(e)})}\n\n'
        except UnexpectedModelBehavior as e:
            yield f'event: error\ndata: {json.dumps({"kind": "model", "msg": str(e)})}\n\n'

        # Emitted after try/except rather than in `finally`: a yield inside
        # `finally` raises RuntimeError when the generator is closed early,
        # e.g. after a client disconnect
        yield 'event: done\ndata: {}\n\n'

    return StreamingResponse(event_generator(), media_type='text/event-stream')
```

Using the AG-UI protocol adapter
For AG-UI-compatible frontends such as CopilotKit, use PydanticAI’s built-in adapter instead of rolling your own SSE:
```python
from fastapi import FastAPI
from pydantic_ai import Agent

app = FastAPI()
agent = Agent('openai:gpt-4o')

# to_ag_ui() returns an ASGI app that can be mounted directly
app.mount('/ai', agent.to_ag_ui())
```

The AG-UI adapter handles the protocol framing, message (de)serialisation, and error mapping automatically.
Full application structure
```python
import json
import logging
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent, UsageLimits
from pydantic_ai.exceptions import UnexpectedModelBehavior, UsageLimitExceeded

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info('Starting up PydanticAI streaming server')
    yield
    logger.info('Shutting down')


app = FastAPI(title='PydanticAI Stream API', lifespan=lifespan)

agent = Agent(
    os.getenv('MODEL', 'openai:gpt-4o'),
    system_prompt='You are a concise, helpful assistant.',
)


@app.get('/health')
async def health():
    return {'status': 'ok'}


@app.get('/stream')
async def stream(
    q: str = Query(..., min_length=1, max_length=2000),
    max_tokens: int = Query(default=500, ge=1, le=4000),
):
    async def gen():
        try:
            async with agent.run_stream(
                q,
                usage_limits=UsageLimits(output_tokens_limit=max_tokens),
            ) as s:
                async for chunk in s.stream_text(delta=True):
                    yield f'data: {json.dumps({"text": chunk})}\n\n'
                await s.get_output()
                usage = s.usage()
                yield f'event: usage\ndata: {json.dumps({"total_tokens": usage.total_tokens})}\n\n'
        except UsageLimitExceeded as e:
            yield f'event: error\ndata: {json.dumps({"error": "budget_exceeded", "detail": str(e)})}\n\n'
        except UnexpectedModelBehavior as e:
            logger.exception('model_error')
            yield f'event: error\ndata: {json.dumps({"error": "model_error", "detail": str(e)})}\n\n'
        except Exception:
            # Log details server-side; send only a generic message to the client
            logger.exception('unexpected_error')
            yield f'event: error\ndata: {json.dumps({"error": "internal", "detail": "Internal server error"})}\n\n'
        # Outside `finally` so nothing is yielded while the generator is being closed
        yield 'event: done\ndata: {}\n\n'

    return StreamingResponse(gen(), media_type='text/event-stream')
```

Running the server
```bash
# --reload is for local development only
uvicorn app:app --host 0.0.0.0 --port 8000 --reload

# Test with curl (-N disables output buffering so events print as they arrive)
curl -N "http://localhost:8000/stream?q=Hello+World"
```

Deployment
Dockerfile
```dockerfile
FROM python:3.12-slim
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
```

requirements.txt
```text
pydantic-ai[openai]>=1.99.0
fastapi>=0.115
uvicorn[standard]>=0.34
```

Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pydanticai-stream
spec:
  replicas: 2
  selector:
    matchLabels:
      app: pydanticai-stream
  template:
    metadata:
      labels:
        app: pydanticai-stream
    spec:
      containers:
        - name: app
          image: ghcr.io/yourorg/pydanticai-stream:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: openai-secrets
                  key: apiKey
            - name: MODEL
              value: openai:gpt-4o
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: pydanticai-stream
spec:
  selector:
    app: pydanticai-stream
  ports:
    - port: 80
      targetPort: 8000
```

Security checklist
- Auth: Put API-key or JWT validation in a FastAPI `Depends` dependency so it runs before the streaming endpoint executes.
- Input validation: Use `Query(min_length=1, max_length=N)` to reject oversized prompts early.
- Budget: Always set `usage_limits=UsageLimits(output_tokens_limit=...)` to prevent runaway costs.
- PII: Never log raw prompts or model responses; use structured logging with explicit fields.
- CORS: Configure `fastapi.middleware.cors.CORSMiddleware` with an explicit `allow_origins` list.
- Rate limiting: Add `slowapi` or a reverse-proxy rate limit; streaming endpoints hold connections open and are easy to DoS.
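Because each open stream ties up a connection and a model call, a per-client cap on concurrent streams complements request-rate limiting. Below is a minimal in-process sketch, assuming a hypothetical `StreamSlots` class keyed by something like an API key. It does not coordinate across replicas, so multi-replica deployments would need a shared counter (for example in Redis) or a proxy-level connection limit.

```python
import asyncio
from collections import Counter
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class TooManyStreams(Exception):
    """Raised when a client already has the maximum number of open streams."""


class StreamSlots:
    """Hypothetical per-client limit on simultaneously open streams."""

    def __init__(self, max_per_client: int) -> None:
        self.max_per_client = max_per_client
        self._open: Counter[str] = Counter()

    @asynccontextmanager
    async def acquire(self, client_id: str) -> AsyncIterator[None]:
        # No await between the check and the increment, so this is atomic
        # within a single asyncio event loop
        if self._open[client_id] >= self.max_per_client:
            raise TooManyStreams(client_id)
        self._open[client_id] += 1
        try:
            yield
        finally:
            # Runs on normal completion, errors, and cancellation
            self._open[client_id] -= 1

    def open_count(self, client_id: str) -> int:
        return self._open[client_id]


async def demo() -> list[str]:
    slots = StreamSlots(max_per_client=2)
    outcomes = []
    async with slots.acquire('key-1'), slots.acquire('key-1'):
        try:
            async with slots.acquire('key-1'):
                outcomes.append('third stream opened')
        except TooManyStreams:
            outcomes.append('third stream rejected')
    outcomes.append(f'open after exit: {slots.open_count("key-1")}')
    return outcomes


print(asyncio.run(demo()))
```

Since the status code cannot change once a `StreamingResponse` has started, a rejection is best turned into an HTTP 429 in the endpoint itself, before the response object is returned.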
Reference
- `Agent.run_stream(...)`: `pydantic_ai/agent/abstract.py`
- `Agent.run_stream_events(...)`: `pydantic_ai/agent/abstract.py`
- `StreamedRunResult.stream_text(delta=True)`: `pydantic_ai/result.py`
- `StreamedRunResult.stream_output(debounce_by=...)`: `pydantic_ai/result.py`
- `StreamedRunResult.get_output()`: `pydantic_ai/result.py`
- `StreamedRunResult.usage()`: `pydantic_ai/result.py`
- `ModelMessagesTypeAdapter`: `pydantic_ai/messages.py`
- `Agent.to_ag_ui()`: `pydantic_ai/agent` (returns a `pydantic_ai.ui.ag_ui` ASGI app)
- `UsageLimits`: `pydantic_ai/usage.py`; `UsageLimitExceeded`: `pydantic_ai/exceptions.py`