
PydanticAI: Streaming Output & Events

Verified against pydantic-ai==1.85.1 — source modules: pydantic_ai.result, pydantic_ai.agent.abstract, pydantic_ai.run.

PydanticAI streams at three levels:

  1. Text / output tokens: agent.run_stream(...) → StreamedRunResult.stream_text() / .stream_output().
  2. Model protocol events: AgentStream events (PartStartEvent, PartDeltaEvent, PartEndEvent, FunctionToolCallEvent, …).
  3. Graph nodes: agent.iter(...) yields UserPromptNode, ModelRequestNode, CallToolsNode, End.

Pick the right one for your use case.

from pydantic_ai import Agent

agent = Agent('openai:gpt-5.2')

async def main():
    async with agent.run_stream('Tell me a joke.') as stream:
        async for chunk in stream.stream_text(delta=True):
            print(chunk, end='', flush=True)
        print('\n---')
        print('final:', await stream.get_output())

run_stream is an @asynccontextmanager. You must enter it (async with) and iterate to the end — exiting early cancels the underlying request.
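The cleanup contract can be pictured with a stdlib-only toy (an @asynccontextmanager standing in for run_stream; none of this is pydantic-ai code): breaking out of the async with body early still triggers the cleanup, which is where the real implementation cancels the request.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def fake_run_stream():
    # Stand-in for run_stream: 'opens' a request, yields a stream of chunks.
    events.append('request started')
    try:
        yield iter(['a', 'b', 'c'])
    finally:
        # Runs whether you drained the stream or broke out early; the real
        # run_stream cancels the underlying model request here.
        events.append('request cancelled/closed')

async def main():
    async with fake_run_stream() as chunks:
        for c in chunks:
            if c == 'b':
                break  # leaving early: cleanup still runs on context exit
    return events

print(asyncio.run(main()))
```

The finally block is the load-bearing part: it runs on every exit path out of the async with, early or not.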

| API | Returns | When to use |
| --- | --- | --- |
| agent.run_stream(...) | StreamedRunResult | You want validated output streamed as it arrives. Stops at the first final output. |
| agent.run_stream_events(...) | AsyncIterator[AgentStreamEvent \| AgentRunResultEvent] | You want raw protocol events for the full run (including tool calls, retries). |
| async with agent.iter(...) as run: | AgentRun (iterate nodes) | You want to inspect / interleave nodes, drive the graph manually, or branch. |
| agent.run_stream_sync(...) | StreamedRunResultSync | Sync code path (CLI, notebook). Wraps the async version. |
| event_stream_handler= on run | Side-channel only | You want a fire-and-forget tap into events without changing the return type. |

run_stream — validated output, token by token

from pydantic import BaseModel

from pydantic_ai import Agent

class City(BaseModel):
    name: str
    population: int

agent = Agent('openai:gpt-5.2', output_type=City)

async def main():
    async with agent.run_stream('Tell me about Paris.') as stream:
        async for partial in stream.stream_output():
            print(partial)  # City with progressively-filled fields
        final: City = await stream.get_output()

Key methods on StreamedRunResult (result.py:328):

  • stream_output(debounce_by=0.1) — yields the output type repeatedly as Pydantic partial-validates. Useful for structured progress bars.
  • stream_text(delta=False, debounce_by=0.1) — text-only streaming. delta=True yields token deltas; False (the default) yields cumulative strings.
  • stream_responses(debounce_by=0.1) — yields raw ModelResponse snapshots (all parts so far).
  • get_output() — awaits the full response, runs output validators, returns the final typed value.
  • all_messages() / new_messages() / *_json() — once the stream is drained.
  • is_complete: bool — set after any terminal stream method.

debounce_by groups deltas into a debounce_by-second window before yielding. Keep it ≥ 50 ms for structured output (each chunk is re-parsed), or set it to None if you want every single token.
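What debouncing does can be sketched with a stdlib-only toy (flush-on-quiescence grouping; the library's exact windowing may differ, and nothing below is pydantic-ai code):

```python
import asyncio

async def token_source():
    # Toy upstream: emits one token every 10 ms.
    for tok in ['Hel', 'lo', ', ', 'wor', 'ld', '!']:
        await asyncio.sleep(0.01)
        yield tok

async def debounced(source, debounce_by: float):
    """Group tokens whose arrival gaps fit inside a debounce_by-second window."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()

    async def pump():
        async for tok in source:
            await queue.put(tok)
        await queue.put(done)

    task = asyncio.create_task(pump())
    buffer = []
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=debounce_by)
        except asyncio.TimeoutError:
            # Window elapsed with no new token: flush the group.
            if buffer:
                yield ''.join(buffer)
                buffer = []
            continue
        if item is done:
            break
        buffer.append(item)
    if buffer:
        yield ''.join(buffer)
    await task

async def main():
    # 10 ms-apart tokens against a 50 ms window: everything lands in one group.
    chunks = [c async for c in debounced(token_source(), 0.05)]
    print(chunks)
    return chunks

asyncio.run(main())
```

Shrink debounce_by below the inter-token gap and each token comes through on its own, which is the None / token-by-token end of the trade-off.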

stream_text(delta=True) skips validators. get_output() runs the full output pipeline (schema + validators) on the assembled content — call it to assert the final value is well-formed before persisting.

run_stream_events — raw events for the full run

Use it when you want every tool call, retry, and delta observed by the graph, not just the final output:

from pydantic_ai.messages import (
    PartStartEvent, PartDeltaEvent, FunctionToolCallEvent, FunctionToolResultEvent,
)

async for event in agent.run_stream_events('Search docs for X.'):
    if isinstance(event, PartStartEvent):
        print(f'[start part kind={event.part.part_kind}]')
    elif isinstance(event, PartDeltaEvent):
        print('Δ', event.delta)
    elif isinstance(event, FunctionToolCallEvent):
        print(f'→ {event.part.tool_name}({event.part.args_as_json_str()})')
    elif isinstance(event, FunctionToolResultEvent):
        print(f'← {event.result.tool_name} = {event.result.content!r}')

Event types (messages.py):

  • PartStartEvent — a new part (text, thinking, tool call, …) began.
  • PartDeltaEvent — incremental update to the current part. event.delta is a TextPartDelta / ThinkingPartDelta / ToolCallPartDelta.
  • PartEndEvent — the part finished.
  • FunctionToolCallEvent — an agent-defined tool is about to be called.
  • FunctionToolResultEvent — that tool finished.
  • BuiltinToolCallEvent / BuiltinToolResultEvent — built-in tools (web search, etc.).
  • FinalResultEvent — the step that produced the final output (appears once).
  • AgentRunResultEvent — the terminating event for run_stream_events; carries the AgentRunResult.

agent.iter(...) — drive the graph yourself

async with agent.iter('What is 2 + 2?') as run:
    async for node in run:
        match node.__class__.__name__:
            case 'UserPromptNode': print('> user prompt')
            case 'ModelRequestNode': print('> model request')
            case 'CallToolsNode': print('> tool call/return step')
            case 'End': print('> done', node.data)
    # After iteration, the result is available:
    result = run.result  # AgentRunResult[OutputDataT] | None

You can also await run.next(my_node) to drive the graph step by step, or inspect run.ctx.state / run.ctx.deps.
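The shape of that node-driving loop can be sketched with hypothetical stand-in node classes (not pydantic-ai's real node types): each step computes the next node until End.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for the real graph nodes, just to show the loop shape.
@dataclass
class End:
    data: str

@dataclass
class ModelRequestNode:
    prompt: str
    def next(self):
        # Pretend the model answered; a real run may loop through tool calls.
        return End(data=f'answer to: {self.prompt}')

@dataclass
class UserPromptNode:
    prompt: str
    def next(self):
        return ModelRequestNode(prompt=self.prompt)

node = UserPromptNode('What is 2 + 2?')
seen = []
while not isinstance(node, End):
    seen.append(type(node).__name__)
    node = node.next()   # in the real API: node = await run.next(node)
print(seen, node.data)
```

The value of owning this loop is the break points: you can stop, log, or substitute a node between any two steps.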

event_stream_handler= — tap events without changing API shape


run, run_sync, and iter all accept an event_stream_handler: EventStreamHandler[Deps] (agent/abstract.py:68) — a callable (RunContext, AsyncIterable[AgentStreamEvent]) -> Awaitable[None] that is invoked with the live event stream. The run’s return type is unchanged.

async def tap(ctx, events):
    async for e in events:
        metrics.observe(e.__class__.__name__)

result = await agent.run('hi', event_stream_handler=tap)

run_stream_sync — the sync code path

agent.run_stream_sync(...) returns a StreamedRunResultSync (result.py:637). All stream methods yield sync iterators — handy for CLI tools.

with agent.run_stream_sync('hi') as stream:
    for chunk in stream.stream_text(delta=True):
        print(chunk, end='')
    final = stream.get_output()

Under the hood this wraps the async streamer in sync_async_iterator; don’t call it from within an already-running event loop.
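The general trick can be sketched in plain Python (a simplified stand-in, assuming nothing about the real sync_async_iterator helper): own a private event loop and pull items from the async iterator one at a time. It also shows why the sync wrapper fails inside async code: run_until_complete raises if a loop is already running in the thread.

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar('T')

def sync_iter(ait: AsyncIterator[T]) -> Iterator[T]:
    # Own a private event loop and advance the async iterator step by step.
    # run_until_complete raises if another loop is already running in this
    # thread, which is why this must not be called from async code.
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(ait.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()

async def letters():
    for ch in 'abc':
        yield ch

print(list(sync_iter(letters())))  # → ['a', 'b', 'c']
```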

  • Structured output + partial validation: Pydantic’s experimental partial validator is enabled on each stream_output yield. Required fields that haven’t arrived are left unset.
  • Text streaming: no validation during delta streaming. Validators only run inside get_output().
  • Debounce default (0.1 s) is chosen to balance perceived latency vs. per-chunk parsing cost. Set None for token-by-token; set larger values (0.25–0.5 s) for long structured outputs.
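What partial validation buys can be imitated with a stdlib-only toy (mechanical JSON repair, not Pydantic's actual partial validator): try each growing prefix of the response, keep whichever snapshot parses.

```python
import json

def best_effort_parse(prefix: str):
    # Try the raw prefix, then a couple of mechanical repairs: close an
    # open string, or close the object. Crude, but enough for a demo.
    for suffix in ('', '"}', '}'):
        try:
            return json.loads(prefix + suffix)
        except json.JSONDecodeError:
            continue
    return None

stream = '{"name": "Paris", "population": 2148000}'
snapshots = []
for i in range(1, len(stream) + 1):
    parsed = best_effort_parse(stream[:i])
    if parsed is not None and parsed not in snapshots:
        snapshots.append(parsed)

print(snapshots[0], snapshots[-1])
```

Early snapshots are empty or half-filled dicts; only the last one carries every field, which is why the final validators run in get_output() rather than on each yield.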

The event stream slots straight into Server-Sent Events or NDJSON:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from pydantic_ai import Agent

app = FastAPI()
agent = Agent('openai:gpt-5.2')

@app.get('/stream')
async def stream_endpoint(q: str):
    async def gen():
        async with agent.run_stream(q) as s:
            async for chunk in s.stream_text(delta=True):
                yield f'data: {chunk}\n\n'
        yield 'event: done\ndata: {}\n\n'
    return StreamingResponse(gen(), media_type='text/event-stream')

For the AG UI / Vercel AI SDK protocols, use the dedicated adapters (pydantic_ai.ui.ag_ui.AGUIAdapter, pydantic_ai.ui.vercel_ai) rather than rolling your own SSE. See the integrations guide.
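If you do hand-roll NDJSON instead of SSE, the framing is just one JSON object per line; a minimal stdlib sketch with hypothetical event shapes:

```python
import json

def ndjson_line(kind: str, payload: dict) -> str:
    # One self-delimiting JSON object per line; clients split on '\n'
    # and json.loads each line.
    return json.dumps({'kind': kind, **payload}, separators=(',', ':')) + '\n'

# A delta/delta/done sequence as it would appear on the wire:
wire = (
    ndjson_line('delta', {'text': 'Hel'})
    + ndjson_line('delta', {'text': 'lo'})
    + ndjson_line('done', {})
)
print(wire)
```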

1. Cumulative text that the UI can overwrite

async with agent.run_stream(q) as s:
    async for full in s.stream_text(delta=False):
        ui.replace_area(full)  # overwrites, always shows cumulative text

2. Progressive structured output with fallback

last: City | None = None

async with agent.run_stream(q) as s:
    try:
        async for partial in s.stream_output(debounce_by=0.25):
            last = partial
            ui.render(partial)
    except Exception:
        ui.render(last)  # show best-effort if streaming breaks
        raise
    final = await s.get_output()
3. Usage stats after draining the stream

async with agent.run_stream(q) as s:
    async for _ in s.stream_text(delta=True):
        pass
    await s.get_output()
    used = s.usage()  # RunUsage available via the public API once streaming is done
    print(used)

4. Tool-call tracing via run_stream_events

async for e in agent.run_stream_events(q):
    if isinstance(e, FunctionToolCallEvent):
        logger.info('tool_call', name=e.part.tool_name, args=e.part.args)
    elif isinstance(e, FunctionToolResultEvent):
        logger.info('tool_result', name=e.result.tool_name)

5. Cancellation mid-stream

stop = False

async with agent.run_stream(q) as s:
    async for chunk in s.stream_text(delta=True):
        ui.append(chunk)
        if ui.cancelled():
            stop = True
            break
# context exit sends the cancellation downstream; no need to await get_output()

Do not break out of run_stream and then try to reuse the stream — close the context and call run_stream again.

  • run_stream stops at the first final output. Tool calls after that are skipped. Use iter or run + event_stream_handler if you want the full graph to execute.
  • Validators only run once (in get_output() or on the final yield of stream_output). Partial yields are best-effort.
  • Don’t mix stream_text(delta=True) and stream_output() on the same result — both drain the underlying iterator.
  • run_stream_sync from an async caller will raise; use run_stream directly.
  • OpenAI strict-mode structured output streams schemas that Pydantic’s partial validator cannot always consume cleanly. If you see ValidationError spam during partial yields, raise debounce_by or use text streaming + one-shot get_output().
  • Agent.run_stream(...) — agent/abstract.py:518
  • Agent.run_stream_events(...) — agent/abstract.py:934
  • Agent.run_stream_sync(...) — agent/abstract.py:794
  • Agent.iter(...) — agent/__init__.py:952
  • StreamedRunResult — result.py:328
  • AgentStream — result.py:48
  • AgentStreamEvent union — messages.py (PartStart, PartDelta, PartEnd, FunctionToolCall, FunctionToolResult, BuiltinToolCall, BuiltinToolResult, FinalResult)
  • EventStreamHandler type alias — agent/abstract.py:68