# PydanticAI: Streaming Output & Events

## Streaming

Verified against pydantic-ai==1.85.1 — source modules: `pydantic_ai.result`, `pydantic_ai.agent.abstract`, `pydantic_ai.run`.
PydanticAI streams at three levels:

- **Text / output tokens** — `agent.run_stream(...)` → `StreamedRunResult.stream_text()` / `.stream_output()`.
- **Model protocol events** — `AgentStream` events (`PartStartEvent`, `PartDeltaEvent`, `PartEndEvent`, `FunctionToolCallEvent`, …).
- **Graph nodes** — `agent.iter(...)` yields `UserPromptNode`, `ModelRequestNode`, `CallToolsNode`, `End`.

Pick the right one for your use case.
## Minimal runnable example

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-5.2')

async def main():
    async with agent.run_stream('Tell me a joke.') as stream:
        async for chunk in stream.stream_text(delta=True):
            print(chunk, end='', flush=True)
        print('\n---')
        print('final:', await stream.get_output())
```

`run_stream` is an `@asynccontextmanager`. You must enter it (`async with`) and iterate to the end — exiting early cancels the underlying request.
## The three streaming APIs

| API | Returns | When to use |
|---|---|---|
| `agent.run_stream(...)` | `StreamedRunResult` | You want validated output streamed as it arrives. Stops at first final output. |
| `agent.run_stream_events(...)` | `AsyncIterator[AgentStreamEvent \| AgentRunResultEvent]` | You want raw protocol events for the full run (including tool calls, retries). |
| `async with agent.iter(...) as run:` | `AgentRun` (iterate nodes) | You want to inspect / interleave nodes, drive the graph manually, or branch. |
| `agent.run_stream_sync(...)` | `StreamedRunResultSync` | Sync code path (CLI, notebook). Wraps the async version. |
| `event_stream_handler=` on run | side-channel only | You want a fire-and-forget tap into events without changing the return type. |
## run_stream — validated output, token by token

```python
from pydantic import BaseModel
from pydantic_ai import Agent

class City(BaseModel):
    name: str
    population: int

agent = Agent('openai:gpt-5.2', output_type=City)

async def main():
    async with agent.run_stream('Tell me about Paris.') as stream:
        async for partial in stream.stream_output():
            print(partial)  # City with progressively-filled fields
        final: City = await stream.get_output()
```

Key methods on `StreamedRunResult` (result.py:328):

- `stream_output(debounce_by=0.1)` — yields the output type repeatedly as Pydantic partial-validates. Useful for structured progress bars.
- `stream_text(delta=False, debounce_by=0.1)` — text-only streams. `delta=True` yields token deltas; `False` (default) yields cumulative strings.
- `stream_responses(debounce_by=0.1)` — yields raw `ModelResponse` snapshots (all parts so far).
- `get_output()` — awaits the full response, runs output validators, returns the final typed value.
- `all_messages()` / `new_messages()` / `*_json()` — available once the stream is drained.
- `is_complete: bool` — set after any terminal stream method.
`debounce_by` groups deltas into a window of that many seconds before yielding. Keep it ≥ 50 ms for structured output (which re-parses on every chunk); set it to `None` to receive every single token.
### Why get_output() after streaming text?

`stream_text(delta=True)` skips validators. `get_output()` runs the full output pipeline (schema + validators) on the assembled content — call it to assert the final value is well-formed before persisting.
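A minimal, model-free illustration of why validation has to wait for the assembled content (plain `json` stands in for the Pydantic output pipeline here):

```python
import json

# Token deltas as a model might emit them; no early prefix is valid JSON on its own.
deltas = ['{"name": "Par', 'is", "popul', 'ation": 2048472}']

assembled = ''
for d in deltas:
    assembled += d
    try:
        json.loads(assembled)  # mid-stream "validation" fails on partial prefixes
    except json.JSONDecodeError:
        pass  # expected while the content is incomplete

final = json.loads(assembled)  # validate once, on the complete text
```

This is the same reason `get_output()` exists: deltas are opaque fragments, and only the assembled whole can be checked against the schema.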
## run_stream_events — raw protocol events

Use this when you want every tool call, retry, and delta observed by the graph, not just the final output:

```python
from pydantic_ai.messages import (
    PartStartEvent,
    PartDeltaEvent,
    FunctionToolCallEvent,
    FunctionToolResultEvent,
)

async for event in agent.run_stream_events('Search docs for X.'):
    if isinstance(event, PartStartEvent):
        print(f'[start part kind={event.part.part_kind}]')
    elif isinstance(event, PartDeltaEvent):
        print('Δ', event.delta)
    elif isinstance(event, FunctionToolCallEvent):
        print(f'→ {event.part.tool_name}({event.part.args_as_json_str()})')
    elif isinstance(event, FunctionToolResultEvent):
        print(f'← {event.result.tool_name} = {event.result.content!r}')
```

Event types (messages.py):

- `PartStartEvent` — a new part (text, thinking, tool call, …) began.
- `PartDeltaEvent` — incremental update to the current part. `event.delta` is a `TextPartDelta` / `ThinkingPartDelta` / `ToolCallPartDelta`.
- `PartEndEvent` — the part finished.
- `FunctionToolCallEvent` — an agent-defined tool is about to be called.
- `FunctionToolResultEvent` — that tool finished.
- `BuiltinToolCallEvent` / `BuiltinToolResultEvent` — built-in tools (web search, etc.).
- `FinalResultEvent` — the step that produced the final output (appears once).
- `AgentRunResultEvent` — the terminating event for `run_stream_events`; carries the `AgentRunResult`.
## agent.iter(...) — drive the graph yourself

```python
async with agent.iter('What is 2 + 2?') as run:
    async for node in run:
        match node.__class__.__name__:
            case 'UserPromptNode':
                print('> user prompt')
            case 'ModelRequestNode':
                print('> model request')
            case 'CallToolsNode':
                print('> tool call/return step')
            case 'End':
                print('> done', node.data)

# After iteration, the result is available:
result = run.result  # AgentRunResult[OutputDataT] | None
```

You can also await `run.next(my_node)` to drive the graph step by step, or inspect `run.ctx.state` / `run.ctx.deps`.
## event_stream_handler= — tap events without changing API shape

Any of `run`, `run_sync`, or `iter` accepts an `event_stream_handler: EventStreamHandler[Deps]` (agent/abstract.py:68) — a callable `(RunContext, AsyncIterable[AgentStreamEvent]) -> Awaitable[None]` that is invoked with the event stream. The run's return type is unchanged.

```python
async def tap(ctx, events):
    async for e in events:
        metrics.observe(e.__class__.__name__)

result = await agent.run('hi', event_stream_handler=tap)
```

## Sync streaming — run_stream_sync
`agent.run_stream_sync(...)` returns a `StreamedRunResultSync` (result.py:637). All stream methods yield sync iterators — handy for CLI tools.

```python
with agent.run_stream_sync('hi') as stream:
    for chunk in stream.stream_text(delta=True):
        print(chunk, end='')
    final = stream.get_output()
```

Under the hood this wraps the async streamer in `sync_async_iterator`; don't call it from within an already-running event loop.
## Debouncing and partial validation

- **Structured output + partial validation:** Pydantic's experimental partial validator is enabled on each `stream_output` yield. Required fields that haven't arrived are left unset.
- **Text streaming:** no validation during delta streaming. Validators only run inside `get_output()`.
- **Debounce default** (`0.1` s) is chosen to balance perceived latency vs. per-chunk parsing cost. Set `None` for token-by-token; set larger values (0.25–0.5) for long structured outputs.
## Streaming over HTTP (FastAPI)

The event stream slots straight into Server-Sent Events or NDJSON:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent

app = FastAPI()
agent = Agent('openai:gpt-5.2')

@app.get('/stream')
async def stream_endpoint(q: str):
    async def gen():
        async with agent.run_stream(q) as s:
            async for chunk in s.stream_text(delta=True):
                yield f'data: {chunk}\n\n'
        yield 'event: done\ndata: {}\n\n'
    return StreamingResponse(gen(), media_type='text/event-stream')
```

For the AG UI / Vercel AI SDK protocols, use the dedicated adapters (`pydantic_ai.ui.ag_ui.AGUIAdapter`, `pydantic_ai.ui.vercel_ai`) rather than rolling your own SSE. See the integrations guide.
## Patterns

### 1. Cumulative text that the UI can overwrite

```python
async with agent.run_stream(q) as s:
    async for full in s.stream_text(delta=False):
        ui.replace_area(full)  # overwrites; always shows cumulative text
```

### 2. Progressive structured output with fallback

```python
last: City | None = None
async with agent.run_stream(q) as s:
    try:
        async for partial in s.stream_output(debounce_by=0.25):
            last = partial
            ui.render(partial)
    except Exception:
        ui.render(last)  # show best-effort if streaming breaks
        raise
    final = await s.get_output()
```

### 3. Track token usage while streaming

```python
async with agent.run_stream(q) as s:
    async for _ in s.stream_text(delta=True):
        pass
    await s.get_output()
    used = s.usage()  # RunUsage available via the public API once streaming is done
print(used)
```

### 4. Tool-call tracing via run_stream_events

```python
async for e in agent.run_stream_events(q):
    if isinstance(e, FunctionToolCallEvent):
        logger.info('tool_call', name=e.part.tool_name, args=e.part.args)
    elif isinstance(e, FunctionToolResultEvent):
        logger.info('tool_result', name=e.result.tool_name)
```

### 5. Cancel mid-stream

```python
stop = False
async with agent.run_stream(q) as s:
    async for chunk in s.stream_text(delta=True):
        ui.append(chunk)
        if ui.cancelled():
            stop = True
            break
# context exit sends the cancellation downstream; no need to await get_output()
```

Do not break out of `run_stream` and then try to reuse the stream — close the context and call `run_stream` again.
## Gotchas

- `run_stream` stops at the first final output. Tool calls after that are skipped. Use `iter` or `run` + `event_stream_handler` if you want the full graph to execute.
- Validators only run once (in `get_output()` or on the final yield of `stream_output`). Partial yields are best-effort.
- Don't mix `stream_text(delta=True)` and `stream_output()` on the same result — both drain the underlying iterator.
- `run_stream_sync` from an async caller will raise; use `run_stream` directly.
- OpenAI strict-mode structured output streams schemas that Pydantic's partial validator cannot always consume cleanly. If you see `ValidationError` spam during partial yields, raise `debounce_by` or use text streaming + one-shot `get_output()`.
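The "both drain the underlying iterator" gotcha is ordinary async-iterator behavior, sketched here with a plain generator in place of the model response stream:

```python
import asyncio

async def tokens():
    for t in ['a', 'b', 'c']:
        yield t

async def main():
    stream = tokens()  # one underlying iterator, like a single StreamedRunResult
    first = [t async for t in stream]
    second = [t async for t in stream]  # already exhausted: yields nothing
    return first, second

first, second = asyncio.run(main())
```

The second consumer silently sees an empty stream rather than raising, which is why mixing `stream_text` and `stream_output` on one result produces confusing, not loud, failures.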
## Reference

- `Agent.run_stream(...)` — agent/abstract.py:518
- `Agent.run_stream_events(...)` — agent/abstract.py:934
- `Agent.run_stream_sync(...)` — agent/abstract.py:794
- `Agent.iter(...)` — agent/__init__.py:952
- `StreamedRunResult` — result.py:328
- `AgentStream` — result.py:48
- `AgentStreamEvent` union — messages.py (`PartStart`, `PartDelta`, `PartEnd`, `FunctionToolCall`, `FunctionToolResult`, `BuiltinToolCall`, `BuiltinToolResult`, `FinalResult`)
- `EventStreamHandler` type alias — agent/abstract.py:68