Skip to content

Pydantic AI: Recipes & Real-World Examples

Pydantic AI: Recipes & Real-World Examples

Section titled “Pydantic AI: Recipes & Real-World Examples”

Version: 1.107.0 (June 2026) Purpose: Practical, production-tested code examples for common scenarios


Recipe 1: Customer Support Chatbot with Database Integration

Section titled “Recipe 1: Customer Support Chatbot with Database Integration”
"""
Production-ready customer support agent that:
- Accesses customer database
- Tracks conversation history
- Validates inputs and outputs
- Handles errors gracefully
"""
from dataclasses import dataclass
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext, ModelRetry
import asyncio
import sqlite3
from datetime import datetime
# Database setup
def init_database():
"""Initialize SQLite database."""
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE customers (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
account_status TEXT,
issue_history TEXT
)
''')
cursor.execute('INSERT INTO customers VALUES (1, "John Doe", "john@example.com", "active", "None")')
cursor.execute('INSERT INTO customers VALUES (2, "Jane Smith", "jane@example.com", "premium", "Payment issue")')
conn.commit()
return conn
# Dependencies
@dataclass
class SupportDependencies:
db_connection: sqlite3.Connection
customer_id: int
conversation_history: list[dict]
# Output models
class SupportResponse(BaseModel):
"""Structured support response."""
solution: str = Field(..., description="Detailed solution to customer's problem")
category: str = Field(..., regex='^(technical|billing|general|escalation)$')
confidence: float = Field(..., ge=0.0, le=1.0)
next_steps: list[str] = Field(..., description="Recommended next steps")
# Create agent
support_agent = Agent(
'openai:gpt-4o',
deps_type=SupportDependencies,
output_type=SupportResponse,
name='CustomerSupportAgent'
)
# System prompt with dynamic context
@support_agent.system_prompt
async def customer_context(ctx: RunContext[SupportDependencies]) -> str:
"""Fetch customer context and build system prompt."""
cursor = ctx.deps.db_connection.cursor()
cursor.execute(
'SELECT name, email, account_status FROM customers WHERE id = ?',
(ctx.deps.customer_id,)
)
customer = cursor.fetchone()
if not customer:
return "You are a helpful customer support agent."
name, email, status = customer
return f"""You are a professional customer support representative.
Customer Profile:
- Name: {name}
- Email: {email}
- Account Status: {status}
- Conversation History: {len(ctx.deps.conversation_history)} messages
Guidelines:
1. Be empathetic and professional
2. Provide clear, actionable solutions
3. For premium customers, prioritise faster resolution
4. Escalate if the issue is outside your scope
"""
# Tools for agents
@support_agent.tool
async def get_customer_history(
ctx: RunContext[SupportDependencies],
limit: int = 5
) -> str:
"""Retrieve customer's issue history."""
cursor = ctx.deps.db_connection.cursor()
cursor.execute(
'SELECT issue_history FROM customers WHERE id = ?',
(ctx.deps.customer_id,)
)
result = cursor.fetchone()
return result[0] if result else "No history available"
@support_agent.tool
async def create_support_ticket(
ctx: RunContext[SupportDependencies],
issue_summary: str,
priority: str = 'normal'
) -> str:
"""Create a support ticket for escalation."""
ticket_id = f"TICKET_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return f"Created ticket {ticket_id} with priority {priority}"
# Output validator
@support_agent.output_validator
async def validate_response(
ctx: RunContext[SupportDependencies],
output: SupportResponse
) -> SupportResponse:
"""Validate support response quality."""
if len(output.solution) < 50:
raise ModelRetry(
"Please provide a more detailed solution (at least 50 characters)."
)
if output.confidence < 0.5:
raise ModelRetry(
"Your confidence is low. Please reconsider your response or escalate."
)
return output
# Main execution
async def handle_customer_issue(customer_id: int, issue: str):
"""Handle a customer support issue."""
conn = init_database()
deps = SupportDependencies(
db_connection=conn,
customer_id=customer_id,
conversation_history=[]
)
try:
result = await support_agent.run(
issue,
deps=deps
)
# Store in conversation history
deps.conversation_history.append({
'customer_id': customer_id,
'issue': issue,
'response': result.output,
'timestamp': datetime.now().isoformat()
})
return result.output
finally:
conn.close()
# Usage
if __name__ == '__main__':
response = asyncio.run(handle_customer_issue(
customer_id=1,
issue="I was charged twice for my last purchase!"
))
print(f"Solution: {response.solution}")
print(f"Category: {response.category}")

Recipe 2: Multi-Agent Workflow - Research & Writing Pipeline

Section titled “Recipe 2: Multi-Agent Workflow - Research & Writing Pipeline”
"""
Multi-agent system where agents specialise in different tasks:
1. ResearchAgent: Gathers information
2. WriterAgent: Structures and writes content
3. EditorAgent: Reviews and refines
"""
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from pydantic import BaseModel
@dataclass
class SharedContext:
"""Shared context between agents."""
topic: str
research_notes: str = ""
draft_content: str = ""
feedback: str = ""
# Research Agent
research_agent = Agent(
'openai:gpt-4o',
name='ResearchAgent',
instructions='You are a research specialist. Gather comprehensive information on topics.'
)
@research_agent.tool_plain
def search_knowledge_base(query: str) -> str:
"""Search internal knowledge base."""
# Simulate knowledge base search
return f"Found articles about '{query}' in the knowledge base."
async def research_phase(context: SharedContext) -> str:
"""Research phase of the pipeline."""
research_prompt = f"""
Research the following topic comprehensively:
Topic: {context.topic}
Provide:
1. Key facts and findings
2. Recent developments
3. Expert perspectives
4. Credible sources
"""
result = await research_agent.run(research_prompt)
context.research_notes = result.output
return result.output
# Writer Agent
class Article(BaseModel):
title: str
introduction: str
sections: list[dict] # {'heading': str, 'content': str}
conclusion: str
writer_agent = Agent(
'openai:gpt-4o',
output_type=Article,
name='WriterAgent',
instructions='You are a technical writer. Structure information into clear, engaging articles.'
)
async def writing_phase(context: SharedContext) -> Article:
"""Writing phase using research notes."""
writing_prompt = f"""
Based on this research:
{context.research_notes}
Write a well-structured article on: {context.topic}
Structure:
- Title (catchy and descriptive)
- Introduction (hook the reader)
- 3-4 main sections with clear headings
- Conclusion (key takeaways)
"""
result = await writer_agent.run(writing_prompt)
context.draft_content = str(result.output)
return result.output
# Editor Agent
class EditedArticle(BaseModel):
original: Article
suggestions: list[str]
grammar_issues: list[str]
improvements: str
editor_agent = Agent(
'openai:gpt-4o',
output_type=EditedArticle,
name='EditorAgent',
instructions='You are a professional editor. Review content for clarity, grammar, and impact.'
)
async def editing_phase(context: SharedContext, article: Article) -> EditedArticle:
"""Editing phase for quality assurance."""
editing_prompt = f"""
Review this article:
{context.draft_content}
Provide:
1. Improvement suggestions for clarity
2. Grammar and spelling issues
3. Overall quality assessment
4. Recommendation for publication
"""
result = await editor_agent.run(editing_prompt)
return result.output
# Orchestrate pipeline
async def research_and_write_pipeline(topic: str) -> Article:
"""Complete research-to-publication pipeline."""
context = SharedContext(topic=topic)
print("🔍 Research phase...")
research = await research_phase(context)
print(f"Research complete. Found: {len(research)} characters")
print("✍️ Writing phase...")
article = await writing_phase(context)
print(f"Draft complete: {article.title}")
print("✏️ Editing phase...")
edited = await editing_phase(context, article)
print(f"Feedback: {edited.improvements}")
return article
# Usage
if __name__ == '__main__':
article = asyncio.run(
research_and_write_pipeline("Type Safety in Python")
)
print(f"\n✅ Final Article: {article.title}")

Section titled “Recipe 3: RAG (Retrieval-Augmented Generation) with Vector Search”
"""
RAG pattern combining semantic search with LLM generation.
Requires: pgvector, asyncpg, openai (for embeddings)
"""
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
import asyncpg
from openai import AsyncOpenAI
from pydantic import BaseModel
@dataclass
class RAGDependencies:
"""RAG system dependencies."""
db_pool: asyncpg.Pool
openai_client: AsyncOpenAI
embedding_model: str = 'text-embedding-3-small'
class RAGResponse(BaseModel):
answer: str
sources: list[str]
confidence: float
rag_agent = Agent(
'openai:gpt-4o',
deps_type=RAGDependencies,
output_type=RAGResponse,
name='RAGAgent'
)
@rag_agent.tool
async def retrieve_documents(
ctx: RunContext[RAGDependencies],
query: str,
top_k: int = 5
) -> str:
"""Retrieve relevant documents using vector search."""
# Create embedding for query
embedding_response = await ctx.deps.openai_client.embeddings.create(
input=query,
model=ctx.deps.embedding_model
)
query_embedding = embedding_response.data[0].embedding
# Search for similar documents using pgvector
async with ctx.deps.db_pool.acquire() as conn:
rows = await conn.fetch('''
SELECT id, title, content, 1 - (embedding <=> $1) as similarity
FROM documents
ORDER BY similarity DESC
LIMIT $2
''', query_embedding, top_k)
# Format retrieved documents
formatted_docs = []
for row in rows:
formatted_docs.append(
f"[{row['title']}]\n{row['content']}\n"
)
return "\n\n".join(formatted_docs)
async def rag_query(
query: str,
db_pool: asyncpg.Pool,
openai_client: AsyncOpenAI
) -> RAGResponse:
"""Execute RAG query."""
deps = RAGDependencies(
db_pool=db_pool,
openai_client=openai_client
)
result = await rag_agent.run(query, deps=deps)
return result.output
# Usage
async def main():
# Initialize database pool
pool = await asyncpg.create_pool('postgresql://localhost/documents')
openai_client = AsyncOpenAI()
try:
response = await rag_query(
"How do I implement type safety in Python?",
pool,
openai_client
)
print(f"Answer: {response.answer}")
print(f"Sources: {response.sources}")
print(f"Confidence: {response.confidence}")
finally:
await pool.close()

Recipe 4: Streaming Agent with Real-Time Response

Section titled “Recipe 4: Streaming Agent with Real-Time Response”
"""
Real-time streaming agent for web frontends.
Returns tokens as they arrive for immediate UI updates.
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent
import asyncio
app = FastAPI()
agent = Agent('openai:gpt-4o')
@app.post('/api/chat/stream')
async def chat_stream(message: str):
"""Stream agent response in real-time."""
async def stream_generator():
async with agent.run_stream(message) as response:
# Stream text as it arrives
async for text in response.stream_text():
yield text.encode()
yield b'\n' # Newline for client parsing
return StreamingResponse(
stream_generator(),
media_type='text/event-stream'
)
@app.post('/api/chat/structured-stream')
async def structured_stream(message: str):
"""Stream structured output with events."""
from pydantic import BaseModel
import json
class Response(BaseModel):
answer: str
metadata: dict
structured_agent = Agent(
'openai:gpt-4o',
output_type=Response
)
async def event_stream():
async with structured_agent.run_stream(message) as response:
# Stream partial text
async for text in response.stream_text():
event = {
'type': 'text_delta',
'data': text
}
yield f"data: {json.dumps(event)}\n\n".encode()
# Stream final structured output
result = await response.result()
final_event = {
'type': 'final_result',
'data': result.output.model_dump()
}
yield f"data: {json.dumps(final_event)}\n\n".encode()
return StreamingResponse(
event_stream(),
media_type='text/event-stream'
)
# Frontend JavaScript
javascript_example = """
// Connect to streaming endpoint
const eventSource = new EventSource('/api/chat/stream?message=Hello');
eventSource.onmessage = (event) => {
const text = event.data;
document.getElementById('response').textContent += text;
};
eventSource.onerror = () => {
console.error('Stream error');
eventSource.close();
};
"""

"""
Agent that maintains conversation memory across sessions.
Uses PostgreSQL for persistence.
"""
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
import asyncpg
from datetime import datetime
@dataclass
class MemoryDependencies:
db_pool: asyncpg.Pool
user_id: int
session_id: str
memory_agent = Agent(
'openai:gpt-4o',
deps_type=MemoryDependencies,
name='MemoryAgent'
)
@memory_agent.tool
async def store_memory(
ctx: RunContext[MemoryDependencies],
key: str,
value: str,
ttl: int = 86400
) -> bool:
"""Store information in persistent memory."""
async with ctx.deps.db_pool.acquire() as conn:
await conn.execute('''
INSERT INTO user_memory (user_id, session_id, key, value, expires_at)
VALUES ($1, $2, $3, $4, NOW() + INTERVAL '1 second' * $5)
ON CONFLICT (user_id, key) DO UPDATE
SET value = $3, updated_at = NOW()
''', ctx.deps.user_id, ctx.deps.session_id, key, value, ttl)
return True
@memory_agent.tool
async def recall_memory(
ctx: RunContext[MemoryDependencies],
key: str | None = None
) -> dict:
"""Recall stored information."""
async with ctx.deps.db_pool.acquire() as conn:
if key:
row = await conn.fetchrow('''
SELECT key, value FROM user_memory
WHERE user_id = $1 AND key = $2
AND expires_at > NOW()
''', ctx.deps.user_id, key)
return {key: row['value']} if row else {}
else:
rows = await conn.fetch('''
SELECT key, value FROM user_memory
WHERE user_id = $1 AND expires_at > NOW()
''', ctx.deps.user_id)
return {row['key']: row['value'] for row in rows}
@memory_agent.system_prompt
async def memory_aware_prompt(ctx: RunContext[MemoryDependencies]) -> str:
"""Include user's memory in system prompt."""
memories = await recall_memory(ctx)
memory_str = "\n".join([f"- {k}: {v}" for k, v in memories.items()])
return f"""
You have access to the user's persistent memory:
{memory_str if memory_str else "No stored memories yet"}
Feel free to reference or update this memory during conversation.
"""
async def conversational_agent_with_memory(
user_id: int,
session_id: str,
message: str,
db_pool: asyncpg.Pool
) -> str:
"""Run agent with memory capabilities."""
deps = MemoryDependencies(
db_pool=db_pool,
user_id=user_id,
session_id=session_id
)
result = await memory_agent.run(message, deps=deps)
return result.output

Recipe 6: Error Recovery with Retry Strategies

Section titled “Recipe 6: Error Recovery with Retry Strategies”
"""
Production agent with sophisticated error handling and recovery.
"""
from pydantic_ai import Agent, ModelRetry, RunContext
from typing import Optional
import asyncio
from enum import Enum
class ErrorRecoveryStrategy(str, Enum):
EXPONENTIAL_BACKOFF = 'exponential'
LINEAR_BACKOFF = 'linear'
IMMEDIATE_RETRY = 'immediate'
class ResilientAgent:
def __init__(
self,
model: str,
max_retries: int = 3,
strategy: ErrorRecoveryStrategy = ErrorRecoveryStrategy.EXPONENTIAL_BACKOFF
):
self.agent = Agent(model)
self.max_retries = max_retries
self.strategy = strategy
def _calculate_backoff(self, attempt: int) -> float:
"""Calculate backoff time based on strategy."""
if self.strategy == ErrorRecoveryStrategy.EXPONENTIAL_BACKOFF:
return 2 ** attempt # 1, 2, 4, 8, ...
elif self.strategy == ErrorRecoveryStrategy.LINEAR_BACKOFF:
return attempt # 1, 2, 3, 4, ...
else:
return 0 # Immediate retry
async def run_with_recovery(self, prompt: str) -> Optional[str]:
"""Run agent with automatic recovery."""
last_error = None
for attempt in range(self.max_retries):
try:
result = await self.agent.run(prompt)
return result.output
except Exception as e:
last_error = e
if attempt < self.max_retries - 1:
backoff = self._calculate_backoff(attempt)
print(f"Attempt {attempt + 1} failed. Retrying in {backoff}s...")
await asyncio.sleep(backoff)
# All retries exhausted
print(f"All retries failed. Last error: {last_error}")
return None
# Usage
resilient = ResilientAgent(
'openai:gpt-4o',
max_retries=3,
strategy=ErrorRecoveryStrategy.EXPONENTIAL_BACKOFF
)
result = asyncio.run(
resilient.run_with_recovery("Analyse this data: ...")
)

Recipe 7: Research Agent with common_tools (DuckDuckGo + web fetch)

Section titled “Recipe 7: Research Agent with common_tools (DuckDuckGo + web fetch)”
"""
Research agent that searches the web then fetches and summarises the top result.
Requires: pip install "pydantic-ai-slim[duckduckgo,web-fetch]"
"""
import asyncio
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool
from pydantic_ai.common_tools.web_fetch import web_fetch_tool
class ResearchSummary(BaseModel):
topic: str
key_findings: list[str] = Field(..., min_length=2, description='At least 2 bullet findings')
sources_consulted: list[str] = Field(..., description='URLs fetched during research')
confidence: float = Field(..., ge=0.0, le=1.0)
research_agent = Agent(
'openai:gpt-4o',
output_type=ResearchSummary,
tools=[
duckduckgo_search_tool(max_results=3),
web_fetch_tool(max_content_length=4000, timeout=15),
],
instructions=(
'You are a research assistant. '
'1. Use duckduckgo_search to find relevant URLs for the topic. '
'2. Use web_fetch on the top 1-2 results to read the actual content. '
'3. Synthesise findings into the structured output.'
),
)
async def research(topic: str) -> ResearchSummary:
result = await research_agent.run(f'Research: {topic}')
return result.output
if __name__ == '__main__':
summary = asyncio.run(research('PydanticAI latest features 2026'))
print(f'Topic: {summary.topic}')
for finding in summary.key_findings:
print(f' • {finding}')
print(f'Sources: {summary.sources_consulted}')
print(f'Confidence: {summary.confidence:.0%}')

Recipe 8: Concurrent batch processing with ConcurrencyLimiter

Section titled “Recipe 8: Concurrent batch processing with ConcurrencyLimiter”
"""
Process a large batch of items in parallel while capping model concurrency.
Useful when you have many tasks but want to avoid hitting rate limits.
"""
import asyncio
from pydantic import BaseModel
from pydantic_ai import Agent, ConcurrencyLimiter, limit_model_concurrency
from pydantic_ai.exceptions import ConcurrencyLimitExceeded
class ItemAnalysis(BaseModel):
item_id: str
category: str
sentiment: str # positive | negative | neutral
score: float
# Shared limiter: max 5 parallel model calls, queue up to 15 more
shared_limiter = ConcurrencyLimiter(max_running=5, max_queued=15, name='batch-pool')
model = limit_model_concurrency('openai:gpt-4o', limiter=shared_limiter)
agent = Agent(model, output_type=ItemAnalysis)
async def analyse_item(item_id: str, text: str) -> ItemAnalysis | None:
try:
result = await agent.run(
f'Analyse this customer feedback (item_id={item_id}): {text}'
)
return result.output
except ConcurrencyLimitExceeded:
print(f'[{item_id}] Dropped — queue full')
return None
async def process_batch(items: list[dict]) -> list[ItemAnalysis]:
tasks = [analyse_item(item['id'], item['text']) for item in items]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
async def main():
items = [
{'id': f'item-{i}', 'text': f'Sample feedback number {i}. Great product!'}
for i in range(30)
]
analyses = await process_batch(items)
positives = [a for a in analyses if a.sentiment == 'positive']
print(f'Processed {len(analyses)}/30 items, {len(positives)} positive')
print(f'Pool status — running: {shared_limiter.running_count}, waiting: {shared_limiter.waiting_count}')
if __name__ == '__main__':
asyncio.run(main())

Recipe 9: Node-level inspection with AgentRun.iter()

Section titled “Recipe 9: Node-level inspection with AgentRun.iter()”
"""
Use agent.iter() to record every graph node, measure per-step latency,
and inspect tool call arguments before they execute.
"""
import asyncio
import time
from pydantic_ai import Agent, RunContext
from pydantic_graph import End
from pydantic_ai.run import AgentRun
agent = Agent('openai:gpt-4o')
@agent.tool_plain
def get_temperature(city: str) -> float:
"""Return the current temperature for a city (simulated)."""
return 22.0
async def run_with_inspection(prompt: str) -> dict:
"""Run the agent and collect execution telemetry."""
telemetry = {
'nodes': [],
'step_times_ms': [],
'run_id': None,
'output': None,
}
async with agent.iter(prompt) as run:
telemetry['run_id'] = run.run_id
node = run.next_node
while not isinstance(node, End):
node_name = type(node).__name__
t0 = time.monotonic()
# Inspect the node before it executes
if hasattr(node, 'request'):
telemetry['nodes'].append({'type': node_name, 'has_request': True})
else:
telemetry['nodes'].append({'type': node_name})
node = await run.next(node)
elapsed_ms = (time.monotonic() - t0) * 1000
telemetry['step_times_ms'].append(round(elapsed_ms, 1))
telemetry['output'] = run.result.output
telemetry['total_messages'] = len(run.all_messages())
return telemetry
async def main():
telemetry = await run_with_inspection('What is the temperature in London?')
print('run_id:', telemetry['run_id'])
for i, (node, ms) in enumerate(zip(telemetry['nodes'], telemetry['step_times_ms'])):
print(f' step {i+1}: {node["type"]}{ms} ms')
print('output:', telemetry['output'])
print('messages:', telemetry['total_messages'])
if __name__ == '__main__':
asyncio.run(main())

Recipe 10: Prompt enrichment with format_as_xml

Section titled “Recipe 10: Prompt enrichment with format_as_xml”
"""
Use format_as_xml to inject structured context (product catalogue, user profile)
into the system prompt so the model can reason over it without JSON parsing issues.
"""
import asyncio
from dataclasses import dataclass, field
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext, format_as_xml
@dataclass
class Product:
id: str
name: str
price: float
category: str
in_stock: bool
@dataclass
class ShoppingContext:
user_name: str
budget: float
products: list[Product] = field(default_factory=list)
class Recommendation(BaseModel):
recommended_products: list[str] # product IDs
reasoning: str
total_cost: float
agent = Agent(
'openai:gpt-4o',
deps_type=ShoppingContext,
output_type=Recommendation,
)
@agent.system_prompt
async def inject_catalogue(ctx: RunContext[ShoppingContext]) -> str:
catalogue_xml = format_as_xml(
ctx.deps.products,
root_tag='catalogue',
item_tag='product',
)
return (
f'You are a shopping assistant for {ctx.deps.user_name}. '
f'Their budget is ${ctx.deps.budget:.2f}.\n\n'
f'Available products:\n{catalogue_xml}\n\n'
'Recommend products within the budget and explain your choices.'
)
async def main():
context = ShoppingContext(
user_name='Alice',
budget=50.0,
products=[
Product('p1', 'Python Book', 35.0, 'books', True),
Product('p2', 'Keyboard', 85.0, 'electronics', True),
Product('p3', 'USB Hub', 25.0, 'electronics', True),
Product('p4', 'Mouse Pad', 12.0, 'accessories', True),
],
)
result = await agent.run(
'What should I buy today?',
deps=context,
)
rec = result.output
print('Recommended:', rec.recommended_products)
print('Total cost:', rec.total_cost)
print('Reasoning:', rec.reasoning)
if __name__ == '__main__':
asyncio.run(main())

Recipe 11: Capability Bundle — scoped tool set per user role

Section titled “Recipe 11: Capability Bundle — scoped tool set per user role”
"""
Demonstrates the `Capability` convenience class: bundle tools, toolsets, and
instructions under a single named capability that can be toggled per run.
Pattern:
- Build a 'read-only' and an 'admin' Capability
- Inject only the appropriate one based on the caller's role
"""
import asyncio
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
from pydantic_ai.capabilities import Capability
from pydantic_ai.models.test import TestModel
# --- Capabilities ---
read_cap: Capability = Capability(
id='read',
description='Read-only data access tools',
instructions='You may ONLY read data. Never modify anything.',
)
admin_cap: Capability = Capability(
id='admin',
description='Full CRUD data access tools',
instructions='You have full read/write access. Confirm destructive actions.',
)
@read_cap.tool_plain
def list_users() -> list[str]:
"""List all user names."""
return ['alice', 'bob', 'carol']
@read_cap.tool_plain
def get_user(name: str) -> dict:
"""Get details for a specific user."""
return {'name': name, 'active': True}
@admin_cap.tool_plain
def list_users_admin() -> list[str]:
"""List all users including suspended accounts."""
return ['alice', 'bob', 'carol', 'suspended_dave']
@admin_cap.tool_plain
def delete_user(name: str) -> str:
"""Permanently delete a user account."""
return f'User {name!r} deleted.'
@dataclass
class Deps:
role: str # 'read' or 'admin'
agent = Agent(TestModel(custom_result_text='Done.'), deps_type=Deps)
async def run_as(role: str, query: str) -> str:
cap = admin_cap if role == 'admin' else read_cap
result = await agent.run(query, deps=Deps(role=role), capabilities=[cap])
return result.output
async def main() -> None:
print(await run_as('read', 'List users'))
print(await run_as('admin', 'Delete user bob'))
if __name__ == '__main__':
asyncio.run(main())

Recipe 12: Web-research agent with WebSearch + WebFetch (native / local fallback)

Section titled “Recipe 12: Web-research agent with WebSearch + WebFetch (native / local fallback)”
"""
Research agent that combines WebSearch (for discovery) and WebFetch (for full content).
Uses native=True on both capabilities so the agent picks the fastest path on each
provider; falls back to DuckDuckGo / markdownify-based fetch on providers that don't
support native tools.
Requirements (for the local fallback paths):
pip install "pydantic-ai-slim[web-fetch,duckduckgo]"
"""
import asyncio
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.capabilities import WebSearch, WebFetch
from pydantic_ai.models.test import TestModel
class ResearchSummary(BaseModel):
topic: str
key_facts: list[str]
sources: list[str]
agent = Agent(
TestModel(custom_result_text='{"topic":"AI","key_facts":["fact1"],"sources":["https://example.com"]}'),
output_type=ResearchSummary,
system_prompt=(
'You are a research assistant. Use web_search to discover relevant pages, '
'then web_fetch to read them in full. Summarise the findings.'
),
capabilities=[
WebSearch(
native=True,
local='duckduckgo',
search_context_size='high',
blocked_domains=['reddit.com', 'twitter.com'],
),
WebFetch(
native=True,
local=True,
enable_citations=True,
max_content_tokens=4096,
),
],
)
async def main() -> None:
result = await agent.run('Research: What are the latest advances in diffusion models?')
summary = result.output
print(f'Topic: {summary.topic}')
for fact in summary.key_facts:
print(f' • {fact}')
print('Sources:', summary.sources)
if __name__ == '__main__':
asyncio.run(main())

Recipe 13: Inline HITL with HandleDeferredToolCalls

Section titled “Recipe 13: Inline HITL with HandleDeferredToolCalls”
"""
Demonstrates `HandleDeferredToolCalls`: intercept deferred tool calls inline,
automatically approve low-risk operations, reject high-risk ones with an
explanation, and let the agent continue without a second invocation.
"""
import asyncio
from pydantic_ai import Agent, RunContext
from pydantic_ai.capabilities import HandleDeferredToolCalls
from pydantic_ai.models.test import TestModel
from pydantic_ai.toolsets import FunctionToolset
from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults, ToolApproved
LOW_RISK_TOOLS = {'send_notification', 'log_event'}
HIGH_RISK_TOOLS = {'delete_record', 'send_email_blast'}
async def approval_handler(
ctx: RunContext[None],
requests: DeferredToolRequests,
) -> DeferredToolResults | None:
results: dict[str, ToolApproved | str] = {}
for call in requests.tool_calls:
if call.tool_name in LOW_RISK_TOOLS:
results[call.tool_call_id] = ToolApproved()
elif call.tool_name in HIGH_RISK_TOOLS:
results[call.tool_call_id] = (
f'Rejected: {call.tool_name} requires manual review'
)
if not results:
return None
return requests.build_results(results)
toolset = FunctionToolset()
@toolset.tool_plain(requires_approval=True)
def send_notification(message: str) -> str:
"""Send a push notification."""
return f'Notification sent: {message}'
@toolset.tool_plain(requires_approval=True)
def delete_record(record_id: str) -> str:
"""Permanently delete a database record."""
return f'Record {record_id} deleted'
agent = Agent(
TestModel(custom_result_text='All actions processed.'),
toolsets=[toolset],
capabilities=[HandleDeferredToolCalls(handler=approval_handler)],
)
async def main() -> None:
result = await agent.run('Send a notification about the deploy, then delete record #42.')
print(result.output)
if __name__ == '__main__':
asyncio.run(main())

Recipe 14: ProcessEventStream — latency observer + thinking-strip processor

Section titled “Recipe 14: ProcessEventStream — latency observer + thinking-strip processor”
"""
Shows both forms of ProcessEventStream:
1. Observer (async def → None): records first-token latency.
2. Processor (async generator): strips ThinkingPart events so downstream
consumers never see extended-thinking content.
"""
import asyncio
import time
from collections.abc import AsyncIterable, AsyncIterator
from pydantic_ai import Agent, RunContext
from pydantic_ai.capabilities import ProcessEventStream
from pydantic_ai.messages import AgentStreamEvent, PartStartEvent, ThinkingPart
from pydantic_ai.models.test import TestModel
class LatencyTracker:
first_token_ms: float | None = None
_start: float = 0.0
def reset(self) -> None:
self._start = time.monotonic()
tracker = LatencyTracker()
async def latency_observer(
ctx: RunContext[None],
stream: AsyncIterable[AgentStreamEvent],
) -> None:
tracker.reset()
async for event in stream:
if tracker.first_token_ms is None and isinstance(event, PartStartEvent):
tracker.first_token_ms = (time.monotonic() - tracker._start) * 1000
async def strip_thinking(
ctx: RunContext[None],
stream: AsyncIterable[AgentStreamEvent],
) -> AsyncIterator[AgentStreamEvent]:
async for event in stream:
if isinstance(event, PartStartEvent) and isinstance(event.part, ThinkingPart):
continue
yield event
agent = Agent(
TestModel(custom_result_text='42 is the answer.'),
capabilities=[
ProcessEventStream(handler=latency_observer),
ProcessEventStream(handler=strip_thinking),
],
)
async def main() -> None:
result = await agent.run('What is the meaning of life?')
print('Output:', result.output)
if tracker.first_token_ms is not None:
print(f'First-token latency: {tracker.first_token_ms:.1f} ms')
else:
print('First-token latency: n/a (TestModel)')
if __name__ == '__main__':
asyncio.run(main())