# Pydantic AI: Durable Execution Guide

**Preserve Progress Across Failures & Restarts**

**Version:** 1.0.0 (2025)
**Framework:** Pydantic AI with Durable Execution
**Focus:** Fault-tolerant agent systems with checkpoint/resume capabilities
## Table of Contents

- Overview
- Core Concepts
- Prefect Integration
- DBOS Integration
- Temporal Integration
- Custom Durable Backends
- State Persistence Patterns
- Fault Tolerance
- Production Examples
## Overview

### What is Durable Execution?

Durable execution ensures your AI agents can:

- **Resume from interruptions**: API failures, application crashes, or deployments
- **Preserve progress**: don't lose expensive LLM calls or partial results
- **Recover automatically**: continue exactly where you left off
- **Guarantee completion**: execute long-running workflows reliably
### Why Durable Execution?

```python
# ❌ WITHOUT durable execution:
# API fails at step 3 of 10 → start over, waste tokens, lose progress

# ✅ WITH durable execution:
# API fails at step 3 of 10 → resume from step 3, preserve all prior work
```

**Use Cases:**

- Long-running multi-agent workflows
- Expensive research or analysis tasks
- Critical business processes requiring guarantees
- Production systems with high reliability requirements
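To make this concrete before introducing any backend, here is a minimal, framework-free sketch of the checkpoint/resume loop using plain Python and a JSON file. `expensive_step` and the file-based store are illustrative stand-ins, not part of Pydantic AI:

```python
import json
import os

CHECKPOINT_FILE = "workflow_checkpoint.json"  # illustrative file-based store


def expensive_step(step: int) -> str:
    """Stand-in for an expensive LLM call."""
    return f"result-of-step-{step}"


def run_workflow(total_steps: int = 10) -> list[str]:
    # Load prior progress if a checkpoint exists
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            state = json.load(f)
    else:
        state = {"next_step": 0, "results": []}

    # Resume from where we left off; completed steps are never re-run
    for step in range(state["next_step"], total_steps):
        state["results"].append(expensive_step(step))
        state["next_step"] = step + 1

        # Persist after every step so a crash loses at most one step
        with open(CHECKPOINT_FILE, "w") as f:
            json.dump(state, f)

    # Clean up on successful completion
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
    return state["results"]
```

Kill the process mid-run and call `run_workflow()` again: it picks up at `next_step` instead of step 0. The durable backends below apply the same idea with real storage, distributed workers, and automatic retries.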
## Core Concepts

### Durable Execution Primitives

```python
from pydantic_ai import Agent
from pydantic_ai.durable import DurableAgent, Checkpoint

# 1. Checkpoints - save state at key points
checkpoint = Checkpoint(
    agent_state={'step': 3, 'results': [...]},
    message_history=[...],
    tool_results={...},
)

# 2. Resume tokens - resume from any checkpoint
resume_token = checkpoint.create_resume_token()

# 3. Durable agent - automatic checkpointing
agent = DurableAgent(
    'openai:gpt-4o',
    checkpoint_every_n_steps=1,  # Checkpoint after each step
    backend='prefect',           # or 'dbos', 'temporal', 'custom'
)
```

### Execution Guarantees

| Feature | Standard Agent | Durable Agent |
|---|---|---|
| Resume after crash | ❌ No | ✅ Yes |
| Preserve token usage | ❌ Restart = new tokens | ✅ Only new work uses tokens |
| Long-running workflows | ⚠️ Risk of failure | ✅ Guaranteed completion |
| Exactly-once execution | ❌ May retry entire flow | ✅ Each step executes once (see note below) |
| State persistence | ❌ Memory only | ✅ Durable storage |
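A note on the exactly-once row: frameworks like these typically achieve it by recording each completed step under a stable key, so a retried workflow replays recorded results instead of re-executing work. A minimal illustration of that idea, with an in-memory dict standing in for durable storage:

```python
completed: dict[str, str] = {}  # step_key -> recorded output (stand-in for durable storage)


def run_step(step_key: str, fn) -> str:
    """Execute fn at most once per key; replays return the recorded output."""
    if step_key in completed:
        return completed[step_key]  # replay: no side effects, no new tokens
    output = fn()
    completed[step_key] = output
    return output


# First execution runs fn; a retry of the same workflow replays the record
first = run_step("research", lambda: "expensive LLM output")
replay = run_step("research", lambda: "would never run")
assert first == replay
```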
## Prefect Integration

```bash
# Install Pydantic AI with Prefect
pip install "pydantic-ai[prefect]"

# Configure Prefect
prefect cloud login
# or for self-hosted
prefect server start
```

### Basic Durable Agent with Prefect
```python
from pydantic_ai import Agent
from pydantic_ai.durable.prefect import PrefectDurableAgent
from pydantic import BaseModel
from prefect import flow, task
import asyncio


class ResearchResult(BaseModel):
    topic: str
    findings: list[str]
    summary: str
    sources: list[str]


@flow(name="durable-research-agent", retries=3)
async def research_workflow(topic: str) -> ResearchResult:
    """Durable research workflow that survives failures."""

    # Create durable agent with automatic checkpointing
    agent = PrefectDurableAgent(
        'openai:gpt-4o',
        output_type=ResearchResult,
        instructions='Conduct thorough research on the topic',
        checkpoint_every_n_steps=1,  # Checkpoint after each tool call
        checkpoint_on_error=True,    # Checkpoint before raising errors
    )

    # Add research tools
    @agent.tool
    async def search_web(ctx, query: str) -> list[str]:
        """Search the web for information."""
        # Simulate web search
        await asyncio.sleep(2)
        return [f"Result for {query}"]

    @agent.tool
    async def analyze_sources(ctx, sources: list[str]) -> dict:
        """Analyze and summarize sources."""
        await asyncio.sleep(3)
        return {'analysis': 'Detailed analysis'}

    # Run with automatic durability
    result = await agent.run(
        f"Research the topic: {topic}",
        flow_run_name=f"research-{topic}",  # Track in Prefect UI
    )

    return result.output


# Usage
if __name__ == '__main__':
    result = asyncio.run(research_workflow("Quantum Computing"))
    print(f"Research completed: {result.topic}")
    print(f"Findings: {len(result.findings)}")
```

### Advanced Prefect Patterns
```python
from pydantic_ai.durable.prefect import PrefectDurableAgent, FlowContext
from prefect import flow, task, get_run_logger
from prefect.artifacts import create_markdown_artifact
from datetime import timedelta


@flow(
    name="multi-stage-agent-pipeline",
    retries=3,
    retry_delay_seconds=60,
    timeout_seconds=3600,
)
async def multi_stage_pipeline(input_data: dict) -> dict:
    """Multi-stage durable pipeline with checkpoints."""

    logger = get_run_logger()

    # Stage 1: Data Collection
    collector_agent = PrefectDurableAgent(
        'openai:gpt-4o',
        checkpoint_every_n_steps=1,
        name='DataCollector',
    )

    logger.info("Starting data collection stage")
    collection_result = await collector_agent.run(
        f"Collect data about: {input_data['topic']}",
        flow_run_name="stage-1-collection",
    )

    # Create artifact in Prefect UI
    await create_markdown_artifact(
        key="collection-results",
        markdown=f"# Collection Results\n{collection_result.output}",
    )

    # Stage 2: Analysis
    analyst_agent = PrefectDurableAgent(
        'anthropic:claude-3-5-sonnet-latest',
        checkpoint_every_n_steps=1,
        name='DataAnalyst',
    )

    logger.info("Starting analysis stage")
    analysis_result = await analyst_agent.run(
        f"Analyze this data: {collection_result.output}",
        flow_run_name="stage-2-analysis",
    )

    # Stage 3: Report Generation
    writer_agent = PrefectDurableAgent(
        'openai:gpt-4o',
        checkpoint_every_n_steps=2,
        name='ReportWriter',
    )

    logger.info("Starting report generation")
    report_result = await writer_agent.run(
        f"Generate report from analysis: {analysis_result.output}",
        flow_run_name="stage-3-report",
    )

    return {
        'collection': collection_result.output,
        'analysis': analysis_result.output,
        'report': report_result.output,
    }


# Manual checkpoint and resume
@flow(name="manual-checkpoint-example")
async def manual_checkpoint_workflow():
    """Demonstrate manual checkpoint control."""

    agent = PrefectDurableAgent(
        'openai:gpt-4o',
        checkpoint_every_n_steps=0,  # Disable automatic checkpointing
    )

    # Manual checkpoint creation
    checkpoint_id = await agent.create_checkpoint({
        'step': 1,
        'data': 'Initial state',
    })

    try:
        result = await agent.run("Complex query")

        # Checkpoint successful result
        await agent.create_checkpoint({
            'step': 2,
            'result': result.output,
        })

    except Exception as e:
        # Resume from last checkpoint
        checkpoint = await agent.get_checkpoint(checkpoint_id)
        print(f"Resuming from: {checkpoint.state}")

        # Retry from checkpoint
        result = await agent.run_from_checkpoint(checkpoint_id)

    return result
```

### Monitoring Durable Flows
```python
from prefect import flow, get_run_logger
from prefect.context import get_run_context
from pydantic_ai.durable.prefect import PrefectDurableAgent


@flow(name="monitored-durable-agent")
async def monitored_workflow(query: str):
    """Workflow with comprehensive monitoring."""

    logger = get_run_logger()

    agent = PrefectDurableAgent(
        'openai:gpt-4o',
        checkpoint_every_n_steps=1,
        on_checkpoint=lambda cp: logger.info(f"Checkpoint created: {cp.id}"),
        on_resume=lambda cp: logger.info(f"Resuming from: {cp.id}"),
    )

    # Read the current flow run from the Prefect run context
    flow_run = get_run_context().flow_run
    logger.info(f"Flow run: {flow_run.name}")

    result = await agent.run(query)

    # Log metrics
    logger.info(f"Tokens used: {result.usage().total_tokens}")
    logger.info(f"Checkpoints created: {agent.checkpoint_count}")

    return result.output
```

## DBOS Integration
```bash
# Install Pydantic AI with DBOS
pip install "pydantic-ai[dbos]"

# Initialize DBOS project
dbos init my-agent-project
cd my-agent-project
```

### DBOS Durable Agent
```python
from pydantic_ai import Agent
from pydantic_ai.durable.dbos import DBOSDurableAgent, workflow, transaction
from dbos import DBOS
import asyncio

# Initialize DBOS
dbos = DBOS()


@workflow()
async def durable_agent_workflow(topic: str) -> dict:
    """
    DBOS workflow with automatic durability.

    DBOS guarantees:
    - Exactly-once execution of each step
    - Automatic recovery from any failure
    - Complete audit trail
    - Time-travel debugging
    """

    # Create DBOS durable agent
    agent = DBOSDurableAgent(
        'openai:gpt-4o',
        instructions='You are a research assistant',
        workflow_id=DBOS.workflow_id,  # Link to DBOS workflow
    )

    # Step 1: Research (durable)
    research_result = await agent.run(
        f"Research {topic}",
        step_name="research",  # Named step for checkpointing
    )

    # Step 2: Save to database (transactional)
    await save_research(research_result.output)

    # Step 3: Analyze (durable)
    analysis_result = await agent.run(
        f"Analyze: {research_result.output}",
        step_name="analysis",
    )

    return {
        'research': research_result.output,
        'analysis': analysis_result.output,
    }


@transaction()
async def save_research(data: str):
    """Transactional database operation."""
    # DBOS ensures this executes exactly once
    async with DBOS.pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO research (data, timestamp) VALUES ($1, NOW())",
            data,
        )


# Advanced: Multi-agent DBOS workflow
@workflow()
async def multi_agent_dbos_workflow(task: str) -> dict:
    """Coordinate multiple durable agents."""

    # Agent 1: Planner
    planner = DBOSDurableAgent(
        'openai:gpt-4o',
        name='Planner',
        workflow_id=DBOS.workflow_id,
    )

    plan = await planner.run(
        f"Create a plan for: {task}",
        step_name="planning",
    )

    # Agent 2: Executor
    executor = DBOSDurableAgent(
        'anthropic:claude-3-5-sonnet-latest',
        name='Executor',
        workflow_id=DBOS.workflow_id,
    )

    execution = await executor.run(
        f"Execute this plan: {plan.output}",
        step_name="execution",
    )

    # Agent 3: Reviewer
    reviewer = DBOSDurableAgent(
        'openai:gpt-4o',
        name='Reviewer',
        workflow_id=DBOS.workflow_id,
    )

    review = await reviewer.run(
        f"Review execution: {execution.output}",
        step_name="review",
    )

    return {
        'plan': plan.output,
        'execution': execution.output,
        'review': review.output,
    }


# Time-travel debugging
async def debug_workflow(workflow_id: str):
    """Debug workflow by replaying from any checkpoint."""

    # Get workflow history
    history = await DBOS.get_workflow_history(workflow_id)

    print(f"Workflow steps: {len(history.steps)}")

    for step in history.steps:
        print(f"Step: {step.name}, Status: {step.status}")
        print(f"  Input: {step.input}")
        print(f"  Output: {step.output}")
        print(f"  Tokens: {step.metadata.get('tokens_used')}")

    # Replay from specific step
    if history.status == 'failed':
        print(f"Replaying from step: {history.failed_step}")
        await DBOS.replay_workflow(workflow_id, from_step=history.failed_step)
```

### DBOS State Management
```python
from pydantic_ai.durable.dbos import DBOSDurableAgent, workflow, state
from dbos import DBOS
from pydantic import BaseModel
from typing import Any


class WorkflowState(BaseModel):
    """Type-safe workflow state."""
    current_step: int
    results: dict[str, Any]
    tokens_used: int
    errors: list[str]


@workflow()
async def stateful_workflow(query: str) -> dict:
    """Workflow with managed state."""

    # Initialize state
    wf_state = WorkflowState(
        current_step=0,
        results={},
        tokens_used=0,
        errors=[],
    )

    agent = DBOSDurableAgent('openai:gpt-4o')

    try:
        # Step 1
        wf_state.current_step = 1
        result1 = await agent.run("Step 1", step_name="step_1")
        wf_state.results['step_1'] = result1.output
        wf_state.tokens_used += result1.usage().total_tokens

        # Persist state
        await DBOS.set_state('workflow_state', wf_state.model_dump())

        # Step 2
        wf_state.current_step = 2
        result2 = await agent.run("Step 2", step_name="step_2")
        wf_state.results['step_2'] = result2.output
        wf_state.tokens_used += result2.usage().total_tokens

        await DBOS.set_state('workflow_state', wf_state.model_dump())

    except Exception as e:
        wf_state.errors.append(str(e))
        await DBOS.set_state('workflow_state', wf_state.model_dump())

        # Workflow will automatically retry from last checkpoint
        raise

    return wf_state.results


# Resume workflow from failure
async def resume_failed_workflow(workflow_id: str):
    """Resume a failed workflow."""

    # DBOS automatically resumes from last successful checkpoint
    result = await DBOS.resume_workflow(workflow_id)

    # Get final state
    final_state = await DBOS.get_state(workflow_id, 'workflow_state')

    return WorkflowState(**final_state)
```

## Temporal Integration
```bash
# Install Pydantic AI with Temporal
pip install "pydantic-ai[temporal]"

# Start Temporal server (Docker)
docker run -d -p 7233:7233 temporalio/auto-setup:latest
```

### Temporal Durable Agent
```python
from pydantic_ai import Agent
from pydantic_ai.durable.temporal import TemporalDurableAgent
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from datetime import timedelta
import asyncio


@workflow.defn
class ResearchWorkflow:
    """Temporal workflow with durable agents."""

    def __init__(self) -> None:
        self.approved = False  # Set to True by the `approve` signal

    @workflow.run
    async def run(self, topic: str) -> dict:
        """Main workflow execution."""

        # Create durable agent
        agent = TemporalDurableAgent(
            'openai:gpt-4o',
            workflow_context=workflow.info(),  # Link to Temporal workflow
            instructions='Conduct thorough research',
        )

        # Step 1: Initial research (automatically durable)
        research = await agent.run(
            f"Research: {topic}",
            activity_id="research_step",
        )

        # Step 2: Wait for human approval (Temporal signal)
        await workflow.wait_condition(lambda: self.approved)

        # Step 3: Continue after approval
        analysis = await agent.run(
            f"Analyze: {research.output}",
            activity_id="analysis_step",
        )

        return {
            'research': research.output,
            'analysis': analysis.output,
        }

    @workflow.signal
    def approve(self):
        """Signal to approve and continue."""
        self.approved = True


# Long-running workflow with timeouts
@workflow.defn
class LongRunningAgentWorkflow:
    """Handle long-running agent tasks."""

    @workflow.run
    async def run(self, complex_task: str) -> str:
        """Execute long-running task with checkpoints."""

        agent = TemporalDurableAgent(
            'openai:gpt-4o',
            checkpoint_interval=timedelta(minutes=5),  # Checkpoint every 5 min
        )

        # Execute with activity heartbeating
        result = await agent.run(
            complex_task,
            activity_id="long_task",
            heartbeat_interval=timedelta(seconds=30),  # Heartbeat every 30s
            timeout=timedelta(hours=2),                # Max 2 hour timeout
        )

        return result.output


# Run workflow
async def run_temporal_workflow():
    """Execute Temporal workflow."""

    # Connect to Temporal
    client = await Client.connect("localhost:7233")

    # Start workflow
    handle = await client.start_workflow(
        ResearchWorkflow.run,
        "Quantum Computing",
        id="research-workflow-1",
        task_queue="agent-tasks",
    )

    print(f"Started workflow: {handle.id}")

    # Signal approval after 10 seconds
    await asyncio.sleep(10)
    await handle.signal(ResearchWorkflow.approve)

    # Wait for result
    result = await handle.result()
    print(f"Workflow completed: {result}")


# Worker to process workflows
async def run_worker():
    """Start Temporal worker."""

    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="agent-tasks",
        workflows=[ResearchWorkflow, LongRunningAgentWorkflow],
    )

    await worker.run()
```

## Custom Durable Backends
### Implement Custom Backend

```python
from pydantic_ai.durable import DurableBackend, Checkpoint
from typing import Optional, Any
import json
import asyncio
from datetime import datetime


class PostgreSQLDurableBackend(DurableBackend):
    """Custom durable backend using PostgreSQL."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        """Initialize database connection pool."""
        import asyncpg

        self.pool = await asyncpg.create_pool(self.connection_string)

        # Create checkpoints table; PostgreSQL does not allow INDEX clauses
        # inside CREATE TABLE, so indexes are created separately
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS agent_checkpoints (
                    id SERIAL PRIMARY KEY,
                    workflow_id VARCHAR(255) NOT NULL,
                    checkpoint_id VARCHAR(255) UNIQUE NOT NULL,
                    step_number INTEGER NOT NULL,
                    agent_state JSONB NOT NULL,
                    message_history JSONB NOT NULL,
                    tool_results JSONB,
                    metadata JSONB,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_workflow ON agent_checkpoints (workflow_id)"
            )
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_checkpoint ON agent_checkpoints (checkpoint_id)"
            )

    async def save_checkpoint(
        self,
        workflow_id: str,
        checkpoint: Checkpoint,
    ) -> str:
        """Save checkpoint to database."""

        async with self.pool.acquire() as conn:
            checkpoint_id = f"{workflow_id}-{checkpoint.step_number}"

            await conn.execute("""
                INSERT INTO agent_checkpoints (
                    workflow_id, checkpoint_id, step_number,
                    agent_state, message_history, tool_results, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (checkpoint_id) DO UPDATE SET
                    agent_state = EXCLUDED.agent_state,
                    message_history = EXCLUDED.message_history,
                    tool_results = EXCLUDED.tool_results,
                    metadata = EXCLUDED.metadata
            """,
                workflow_id,
                checkpoint_id,
                checkpoint.step_number,
                json.dumps(checkpoint.agent_state),
                json.dumps(checkpoint.message_history),
                json.dumps(checkpoint.tool_results or {}),
                json.dumps(checkpoint.metadata or {}),
            )

        return checkpoint_id

    async def load_checkpoint(
        self,
        checkpoint_id: str,
    ) -> Optional[Checkpoint]:
        """Load checkpoint from database."""

        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT step_number, agent_state, message_history,
                       tool_results, metadata
                FROM agent_checkpoints
                WHERE checkpoint_id = $1
            """, checkpoint_id)

            if not row:
                return None

            return Checkpoint(
                step_number=row['step_number'],
                agent_state=json.loads(row['agent_state']),
                message_history=json.loads(row['message_history']),
                tool_results=json.loads(row['tool_results']),
                metadata=json.loads(row['metadata']),
            )

    async def list_checkpoints(
        self,
        workflow_id: str,
    ) -> list[str]:
        """List all checkpoints for a workflow."""

        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT checkpoint_id
                FROM agent_checkpoints
                WHERE workflow_id = $1
                ORDER BY step_number ASC
            """, workflow_id)

            return [row['checkpoint_id'] for row in rows]

    async def delete_checkpoints(
        self,
        workflow_id: str,
        before_step: Optional[int] = None,
    ):
        """Delete old checkpoints."""

        async with self.pool.acquire() as conn:
            if before_step:
                await conn.execute("""
                    DELETE FROM agent_checkpoints
                    WHERE workflow_id = $1 AND step_number < $2
                """, workflow_id, before_step)
            else:
                await conn.execute("""
                    DELETE FROM agent_checkpoints
                    WHERE workflow_id = $1
                """, workflow_id)


# Use custom backend
async def use_custom_backend():
    """Use custom PostgreSQL backend."""

    from pydantic_ai import Agent
    from pydantic_ai.durable import DurableAgent

    # Initialize backend
    backend = PostgreSQLDurableBackend(
        "postgresql://user:pass@localhost/agents"
    )
    await backend.initialize()

    # Create durable agent with custom backend
    agent = DurableAgent(
        'openai:gpt-4o',
        backend=backend,
        workflow_id='custom-workflow-123',
        checkpoint_every_n_steps=1,
    )

    # Run workflow
    result = await agent.run("Complex multi-step query")

    # List checkpoints
    checkpoints = await backend.list_checkpoints('custom-workflow-123')
    print(f"Created {len(checkpoints)} checkpoints")

    # Resume from specific checkpoint
    if checkpoints:
        checkpoint = await backend.load_checkpoint(checkpoints[-1])
        resumed_agent = DurableAgent.from_checkpoint(
            checkpoint,
            backend=backend,
        )

        continued_result = await resumed_agent.run("Continue from checkpoint")
```

## State Persistence Patterns
### Stateful Multi-Step Workflows

```python
from pydantic_ai import Agent
from pydantic_ai.durable import (
    Checkpoint,
    DurableAgent,
    DurableBackend,
    StatefulWorkflow,
)
from pydantic import BaseModel
from typing import Any, Optional
from enum import Enum


class WorkflowPhase(str, Enum):
    PLANNING = 'planning'
    EXECUTION = 'execution'
    VALIDATION = 'validation'
    COMPLETION = 'completion'


class WorkflowContext(BaseModel):
    """Persistent workflow context."""
    phase: WorkflowPhase
    plan: Optional[str] = None
    execution_results: list[str] = []
    validation_errors: list[str] = []
    attempt_count: int = 0
    total_tokens_used: int = 0


class StatefulAgentWorkflow(StatefulWorkflow):
    """Multi-phase stateful workflow."""

    def __init__(self, backend):
        super().__init__(backend)
        self.context = WorkflowContext(phase=WorkflowPhase.PLANNING)

    async def run(self, task: str) -> dict:
        """Execute stateful workflow with automatic persistence."""

        # Phase 1: Planning
        self.context.phase = WorkflowPhase.PLANNING
        await self.save_state()

        planner = DurableAgent(
            'openai:gpt-4o',
            backend=self.backend,
            workflow_id=self.workflow_id,
        )

        plan_result = await planner.run(f"Create a plan for: {task}")
        self.context.plan = plan_result.output
        self.context.total_tokens_used += plan_result.usage().total_tokens
        await self.save_state()

        # Phase 2: Execution
        self.context.phase = WorkflowPhase.EXECUTION
        await self.save_state()

        executor = DurableAgent(
            'anthropic:claude-3-5-sonnet-latest',
            backend=self.backend,
            workflow_id=self.workflow_id,
        )

        exec_result = await executor.run(f"Execute: {self.context.plan}")
        self.context.execution_results.append(exec_result.output)
        self.context.total_tokens_used += exec_result.usage().total_tokens
        await self.save_state()

        # Phase 3: Validation
        self.context.phase = WorkflowPhase.VALIDATION
        await self.save_state()

        validator = DurableAgent(
            'openai:gpt-4o',
            backend=self.backend,
            workflow_id=self.workflow_id,
        )

        validation_result = await validator.run(
            f"Validate: {self.context.execution_results[-1]}"
        )

        if "error" in validation_result.output.lower():
            self.context.validation_errors.append(validation_result.output)
            self.context.attempt_count += 1

            if self.context.attempt_count < 3:
                # Retry by re-entering the workflow; attempt_count is preserved
                # on self.context, so this terminates after 3 attempts (note
                # that the planning phase is re-run as well)
                self.context.phase = WorkflowPhase.EXECUTION
                await self.save_state()
                return await self.run(task)

        # Phase 4: Completion
        self.context.phase = WorkflowPhase.COMPLETION
        await self.save_state()

        return {
            'plan': self.context.plan,
            'results': self.context.execution_results,
            'tokens_used': self.context.total_tokens_used,
            'attempts': self.context.attempt_count,
        }

    async def save_state(self):
        """Persist current context."""
        await self.backend.save_checkpoint(
            self.workflow_id,
            Checkpoint(
                step_number=len(self.context.execution_results),
                agent_state=self.context.model_dump(),
                message_history=[],
                metadata={'phase': self.context.phase.value},
            ),
        )

    @classmethod
    async def resume_from_checkpoint(
        cls,
        workflow_id: str,
        backend: DurableBackend,
    ) -> 'StatefulAgentWorkflow':
        """Resume workflow from saved state."""

        checkpoints = await backend.list_checkpoints(workflow_id)

        if not checkpoints:
            raise ValueError(f"No checkpoints found for {workflow_id}")

        # Load latest checkpoint
        latest = await backend.load_checkpoint(checkpoints[-1])

        workflow = cls(backend)
        workflow.workflow_id = workflow_id
        workflow.context = WorkflowContext(**latest.agent_state)

        return workflow
```

## Fault Tolerance
### Automatic Recovery Patterns

```python
from pydantic_ai.durable import DurableAgent, DurableBackend, RetryPolicy
from typing import Optional
import asyncio


class ResilientDurableAgent:
    """Durable agent with comprehensive fault tolerance."""

    def __init__(
        self,
        model: str,
        backend: DurableBackend,
        max_retries: int = 3,
        retry_delay: float = 1.0,
    ):
        self.agent = DurableAgent(
            model,
            backend=backend,
            checkpoint_every_n_steps=1,
            checkpoint_on_error=True,
        )

        self.retry_policy = RetryPolicy(
            max_retries=max_retries,
            retry_delay=retry_delay,
            exponential_backoff=True,
            retry_on_errors=['APIError', 'TimeoutError', 'RateLimitError'],
        )

    async def run_with_recovery(
        self,
        query: str,
        workflow_id: str,
    ) -> Optional[str]:
        """Run agent with automatic recovery."""

        attempt = 0
        last_error = None

        while attempt < self.retry_policy.max_retries:
            try:
                # Try to run agent
                result = await self.agent.run(query, workflow_id=workflow_id)
                return result.output

            except Exception as e:
                last_error = e
                error_type = type(e).__name__

                # Check if error is retryable
                if error_type not in self.retry_policy.retry_on_errors:
                    raise

                attempt += 1

                # Calculate backoff delay
                if self.retry_policy.exponential_backoff:
                    delay = self.retry_policy.retry_delay * (2 ** attempt)
                else:
                    delay = self.retry_policy.retry_delay

                print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                await asyncio.sleep(delay)

                # Try to resume from last checkpoint
                checkpoints = await self.agent.backend.list_checkpoints(workflow_id)

                if checkpoints:
                    print(f"Resuming from checkpoint: {checkpoints[-1]}")
                    checkpoint = await self.agent.backend.load_checkpoint(
                        checkpoints[-1]
                    )

                    # Resume execution
                    self.agent = DurableAgent.from_checkpoint(
                        checkpoint,
                        backend=self.agent.backend,
                    )

        # All retries exhausted
        raise last_error


# Circuit breaker pattern
from enum import Enum
from datetime import datetime, timedelta


class CircuitState(str, Enum):
    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Failing, reject requests
    HALF_OPEN = 'half_open'  # Testing recovery


class CircuitBreaker:
    """Circuit breaker for durable agents."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: timedelta = timedelta(minutes=1),
    ):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None

    async def call(self, agent_func):
        """Execute agent function through circuit breaker."""

        if self.state == CircuitState.OPEN:
            # Check if timeout has elapsed
            if datetime.now() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await agent_func()

            # Success - reset if in half-open state
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0

            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise


# Usage
async def fault_tolerant_workflow():
    """Complete fault-tolerant workflow."""

    # PostgreSQLDurableBackend is defined in the Custom Durable Backends section
    backend = PostgreSQLDurableBackend("postgresql://localhost/agents")
    await backend.initialize()

    # Create resilient agent
    resilient_agent = ResilientDurableAgent(
        'openai:gpt-4o',
        backend=backend,
        max_retries=3,
    )

    # Wrap in circuit breaker
    circuit_breaker = CircuitBreaker(failure_threshold=5)

    try:
        result = await circuit_breaker.call(
            lambda: resilient_agent.run_with_recovery(
                "Complex query that might fail",
                workflow_id="fault-tolerant-workflow-1",
            )
        )

        print(f"Success: {result}")

    except Exception as e:
        print(f"All recovery attempts failed: {e}")
```

## Production Examples
### Complete Production Workflow

```python
from pydantic_ai.durable.prefect import PrefectDurableAgent
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect.context import get_run_context
from pydantic import BaseModel
from typing import List, Optional
import asyncio


class DocumentAnalysis(BaseModel):
    """Analysis result schema."""
    summary: str
    key_points: List[str]
    sentiment: str
    action_items: List[str]
    confidence: float


@flow(
    name="production-document-analysis",
    retries=3,
    retry_delay_seconds=[60, 300, 900],  # Exponential backoff
    timeout_seconds=3600,
    persist_result=True,
)
async def production_analysis_workflow(
    document_url: str,
    user_id: str,
) -> DocumentAnalysis:
    """
    Production-grade document analysis with full durability.

    Features:
    - Automatic checkpointing
    - Retry with exponential backoff
    - State persistence
    - Comprehensive error handling
    - Observability integration
    """

    logger = get_run_logger()
    logger.info(f"Starting analysis for user {user_id}")

    # Fetch API key from Prefect secret
    api_key_block = await Secret.load("openai-api-key")
    api_key = api_key_block.get()

    # Create durable agent with production configuration
    agent = PrefectDurableAgent(
        'openai:gpt-4o',
        output_type=DocumentAnalysis,
        instructions="""
        Analyze documents thoroughly and provide:
        1. Concise summary
        2. 3-5 key points
        3. Overall sentiment
        4. Actionable items
        5. Confidence score
        """,
        checkpoint_every_n_steps=1,
        checkpoint_on_error=True,
        name=f"DocumentAnalyzer-{user_id}",
    )

    # Add document fetching tool
    @agent.tool
    async def fetch_document(ctx, url: str) -> str:
        """Fetch document content from URL."""
        import httpx

        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=30)
            response.raise_for_status()
            return response.text[:10000]  # Limit to 10k chars

    # Execute analysis with full durability
    try:
        result = await agent.run(
            f"Analyze the document at: {document_url}",
            flow_run_name=f"analysis-{user_id}-{document_url}",
            metadata={
                'user_id': user_id,
                'document_url': document_url,
            },
        )

        logger.info(f"Analysis completed: {result.usage().total_tokens} tokens used")

        # Validate output
        analysis = result.output

        if analysis.confidence < 0.7:
            logger.warning(f"Low confidence analysis: {analysis.confidence}")

        return analysis

    except Exception as e:
        logger.error(f"Analysis failed: {e}")

        # Log failure for monitoring
        from prefect.events import emit_event

        flow_run_id = get_run_context().flow_run.id
        emit_event(
            event="document-analysis.failed",
            resource={"prefect.resource.id": f"prefect.flow-run.{flow_run_id}"},
            payload={'error': str(e), 'user_id': user_id},
        )

        raise


# Deploy to Prefect Cloud
if __name__ == '__main__':
    # Deploy workflow
    from prefect.deployments import Deployment

    deployment = Deployment.build_from_flow(
        flow=production_analysis_workflow,
        name="production-document-analysis",
        work_queue_name="agent-queue",
        parameters={
            "document_url": "https://example.com/doc.pdf",
            "user_id": "default",
        },
        description="Durable document analysis workflow",
        tags=["production", "agents", "durable"],
    )

    deployment.apply()
```

## Best Practices
### 1. Checkpoint Frequency

```python
# ✅ GOOD: Checkpoint after expensive operations
agent = DurableAgent(
    'openai:gpt-4o',
    checkpoint_every_n_steps=1,  # After each LLM call
)

# ❌ BAD: Too infrequent (lose progress)
agent = DurableAgent(
    'openai:gpt-4o',
    checkpoint_every_n_steps=100,  # Might lose 99 steps
)
```

### 2. State Size Management
```python
# ✅ GOOD: Compact state
class CompactState(BaseModel):
    step: int
    result_ids: list[str]  # Store IDs, not full data (see sketch below)


# ❌ BAD: Large state
class BloatedState(BaseModel):
    full_results: list[dict]  # Megabytes of data
```
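One way to apply this, sketched below, is to persist large payloads out of band and checkpoint only their IDs. The `blob_store` dict here is a stand-in for S3, a database, or any external object store; the helper names are hypothetical:

```python
import hashlib
import json

blob_store: dict[str, str] = {}  # stand-in for S3/database/etc.


def store_result(payload: str) -> str:
    """Persist a large payload externally; return a compact ID."""
    result_id = hashlib.sha256(payload.encode()).hexdigest()[:16]
    blob_store[result_id] = payload
    return result_id


def load_result(result_id: str) -> str:
    """Rehydrate the payload on resume."""
    return blob_store[result_id]


# The checkpoint stays small: a step counter plus a list of IDs
checkpoint_state = {
    "step": 3,
    "result_ids": [store_result("...large model output...")],
}
print(json.dumps(checkpoint_state))  # compact, cheap to persist
```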
### 3. Error Categorization

```python
# ✅ GOOD: Distinguish retryable vs permanent errors
if isinstance(error, RateLimitError):
    # Retryable - checkpoint and retry
    await checkpoint()
    await retry_with_backoff()
elif isinstance(error, ValidationError):
    # Permanent - fail fast
    raise
```
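For a self-contained version of that pattern, the sketch below defines stand-in exception types (your provider SDK's real errors would replace them) and retries only the retryable category with exponential backoff:

```python
import asyncio


class RateLimitError(Exception):
    """Stand-in for a provider rate-limit error (retryable)."""


class ValidationError(Exception):
    """Stand-in for a schema/validation error (permanent)."""


RETRYABLE = (RateLimitError, TimeoutError)


async def run_step_with_categorized_errors(step, max_retries: int = 3):
    """Retry retryable errors with backoff; fail fast on permanent ones."""
    for attempt in range(max_retries):
        try:
            return await step()
        except RETRYABLE as e:
            delay = 2 ** attempt
            print(f"Retryable error ({e!r}); retrying in {delay}s")
            await asyncio.sleep(delay)
        except ValidationError:
            raise  # Permanent: no retry will fix a malformed request
    raise RuntimeError(f"Gave up after {max_retries} retries")
```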
## Summary

Durable Execution enables:
- ✅ Fault-tolerant AI agent workflows
- ✅ Resume from any point after failures
- ✅ Cost savings (avoid re-running expensive LLM calls)
- ✅ Production-grade reliability
- ✅ Long-running workflows (hours/days)
- ✅ Exactly-once execution guarantees
**Choose your backend:**

- **Prefect**: Best for Python-native workflows, great UI
- **DBOS**: Best for database-centric apps, time-travel debugging
- **Temporal**: Best for polyglot systems, mature ecosystem
- **Custom**: Best for specific infrastructure requirements
**Next Steps:**
- Choose a durable backend (Prefect, DBOS, or Temporal)
- Add checkpointing to critical workflows
- Implement retry and recovery logic
- Monitor checkpoint creation and resume events
- Test failure scenarios thoroughly (see the test sketch below)
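As a starting point for that last item, here is a pytest sketch that simulates a crash mid-workflow and asserts that completed steps are not re-executed on resume. The in-memory `store` and `run_with_checkpoints` helper are illustrative stand-ins for a real backend's checkpoint API:

```python
import pytest


def run_with_checkpoints(store: dict, step_fn, total_steps: int) -> list[str]:
    """Resume from store['next_step'], persisting progress after each step."""
    store.setdefault("next_step", 0)
    store.setdefault("results", [])
    for step in range(store["next_step"], total_steps):
        store["results"].append(step_fn(step))
        store["next_step"] = step + 1
    return store["results"]


def test_resume_skips_completed_steps():
    calls: list[int] = []

    def step_fn(step: int) -> str:
        calls.append(step)
        if step == 2 and calls.count(2) == 1:
            raise RuntimeError("simulated API failure at step 2")
        return f"result-{step}"

    store: dict = {}

    # First run crashes at step 2; steps 0 and 1 are checkpointed
    with pytest.raises(RuntimeError):
        run_with_checkpoints(store, step_fn, total_steps=5)
    assert store["next_step"] == 2

    # Second run resumes from step 2; steps 0 and 1 are never re-executed
    results = run_with_checkpoints(store, step_fn, total_steps=5)
    assert results == [f"result-{i}" for i in range(5)]
    assert calls.count(0) == 1 and calls.count(1) == 1
```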
**See Also:**

- `pydantic_ai_graph_support.md` - Graph-based workflows
- `pydantic_ai_production_guide.md` - Production deployment
- `pydantic_ai_comprehensive_guide.md` - Core concepts