Pydantic AI: Production Deployment & Architecture Guide
Version: 1.0.0
Focus: Enterprise-grade deployment, scaling, monitoring, and architectural patterns
Production Architecture Patterns
Multi-Tier Agent Architecture
Section titled “Multi-Tier Agent Architecture”"""Enterprise-grade agent system with three tiers:1. API Layer - FastAPI endpoints2. Agent Layer - Business logic agents3. Infrastructure Layer - Databases, caches, external services"""
from fastapi import FastAPI, HTTPException, Dependsfrom pydantic import BaseModelfrom pydantic_ai import Agentfrom dataclasses import dataclassimport logging
# Configure structured logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)
# ============ INFRASTRUCTURE LAYER ============

@dataclass
class InfrastructureDependencies:
    """All infrastructure dependencies shared across agents.

    Passed to every agent run so tools can reach the database, cache,
    external APIs, and a structured logger.
    """
    database_url: str
    redis_url: str
    api_keys: dict
    logger: logging.Logger
    environment: str  # 'development', 'staging', 'production'


async def get_infrastructure() -> InfrastructureDependencies:
    """Factory for infrastructure dependencies, read from environment variables.

    Defaults target a local development setup; production values come from
    the container environment (see the docker-compose / K8s manifests below).
    """
    import os
    return InfrastructureDependencies(
        database_url=os.getenv('DATABASE_URL', 'postgresql://localhost/agent_db'),
        redis_url=os.getenv('REDIS_URL', 'redis://localhost:6379'),
        # NOTE(review): OPENAI_API_KEY may be unset, yielding {'openai': None} —
        # callers should validate before use.
        api_keys={'openai': os.getenv('OPENAI_API_KEY')},
        logger=logger,
        environment=os.getenv('ENVIRONMENT', 'development'),
    )
# ============ AGENT LAYER ============

class AgentProvider:
    """Factory for creating configured agents."""

    @staticmethod
    def create_support_agent(deps: InfrastructureDependencies) -> Agent:
        """Create a support / customer-service agent."""
        return Agent(
            'openai:gpt-4o',
            deps_type=InfrastructureDependencies,
            instructions='''
            You are a professional customer support agent.
            - Always be polite and professional
            - Prioritize customer satisfaction
            - Escalate complex issues appropriately
            - Log all interactions for quality assurance
            ''',
            name='SupportAgent',
        )

    @staticmethod
    def create_data_analyst_agent(deps: InfrastructureDependencies) -> Agent:
        """Create a data-analysis agent."""
        return Agent(
            'openai:gpt-4o',
            deps_type=InfrastructureDependencies,
            instructions='''
            You are a data analysis expert.
            - Provide accurate insights from data
            - Explain findings clearly
            - Suggest actionable recommendations
            - Use visualizations when helpful
            ''',
            name='DataAnalystAgent',
        )
# ============ API LAYER ============
app = FastAPI( title='Pydantic AI Enterprise', version='1.0.0', docs_url='/api/docs')
class SupportRequest(BaseModel): """Type-safe request model.""" customer_id: int issue: str priority: str = 'normal' # normal, high, critical
class AgentResponse(BaseModel): """Type-safe response model.""" response: str suggested_actions: list[str] escalated: bool tokens_used: int
@app.post('/api/support/query', response_model=AgentResponse)async def support_query( request: SupportRequest, deps: InfrastructureDependencies = Depends(get_infrastructure)) -> AgentResponse: """ Handle customer support queries through agent.
Type-safe request/response with full validation. """ try: agent = AgentProvider.create_support_agent(deps)
result = await agent.run( f"Customer {request.customer_id}: {request.issue}", deps=deps )
return AgentResponse( response=result.output, suggested_actions=['escalate', 'provide_docs'], escalated=request.priority == 'critical', tokens_used=result.usage().total_tokens )
except Exception as e: deps.logger.error(f"Support query failed: {e}") raise HTTPException(status_code=500, detail=str(e))Containerized Deployment
Section titled “Containerized Deployment”

# Dockerfile for Pydantic AI agent service
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Health check — raise_for_status() makes the check FAIL on non-2xx responses;
# a bare requests.get() would exit 0 even when the app returns HTTP 500.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

# Run with gunicorn
CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", \
     "-b", "0.0.0.0:8000", "main:app"]

# docker-compose.yml for full stack
version: '3.8'
services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # NOTE(review): credentials are hard-coded for the demo — use secrets
      # or an env file in real deployments.
      DATABASE_URL: postgresql://user:password@postgres:5432/agents
      REDIS_URL: redis://redis:6379
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      ENVIRONMENT: production
      LOGFIRE_TOKEN: ${LOGFIRE_TOKEN}
    depends_on:
      - postgres
      - redis
    restart: unless-stopped

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: agents
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

Kubernetes Deployment
Section titled “Kubernetes Deployment”

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pydantic-ai-agent
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: pydantic-ai-agent
  template:
    metadata:
      labels:
        app: pydantic-ai-agent
    spec:
      containers:
        - name: agent
          image: your-registry/pydantic-ai-agent:1.0.0
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: pydantic-ai-agent-service
  namespace: production
spec:
  type: LoadBalancer
  selector:
    app: pydantic-ai-agent
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000

Observability & Monitoring
Section titled “Observability & Monitoring”Logfire Integration
Section titled “Logfire Integration”"""Production-grade observability with Pydantic Logfire.Provides automatic tracing, logging, and performance monitoring."""
import logfirefrom pydantic_ai import Agent
# Configure Logfirelogfire.configure( service_name='pydantic-ai-agent', service_version='1.0.0', environment='production', send_to_logfire='always' # 'always', 'if-token-present', 'never')
# Instrument Pydantic AI automaticallylogfire.instrument_pydantic_ai()
# Instrument other librarieslogfire.instrument_httpx()logfire.instrument_sqlalchemy()logfire.instrument_asyncpg()
# Create agentagent = Agent('openai:gpt-4o')
# Custom spans for business logic@logfire.span('process_customer_request')async def handle_request(customer_id: int, query: str): """Trace entire customer request flow."""
# Add context to trace logfire.set_attribute('customer_id', customer_id) logfire.set_attribute('query_length', len(query))
# Run agent (automatically traced) result = await agent.run(query)
# Log custom metrics logfire.info( 'agent_response_generated', response_length=len(result.output), tokens_used=result.usage().total_tokens )
return result
# Performance monitoring@logfire.span('expensive_operation')async def monitor_performance(): """Monitor performance with automatic timing.""" import time start = time.time()
# Your operation here await agent.run('Complex query')
duration = time.time() - start logfire.warn(f'slow_operation', duration=duration)Metrics & Dashboards
Section titled “Metrics & Dashboards”"""Prometheus metrics for Pydantic AI agents.Integrates with Grafana for visualization."""
from prometheus_client import Counter, Histogram, Gaugeimport time
# Define metricsagent_requests = Counter( 'pydantic_ai_requests_total', 'Total agent requests', ['agent_name', 'status'])
agent_latency = Histogram( 'pydantic_ai_request_duration_seconds', 'Request latency in seconds', ['agent_name'], buckets=(0.1, 0.5, 1.0, 2.0, 5.0))
tokens_used = Counter( 'pydantic_ai_tokens_used_total', 'Total tokens consumed', ['agent_name', 'token_type'])
active_requests = Gauge( 'pydantic_ai_active_requests', 'Currently active requests', ['agent_name'])
# Middleware to track metricsasync def metrics_middleware(agent_name: str): """Track metrics for agent calls."""
async def wrapper(agent_run): active_requests.labels(agent_name=agent_name).inc() start_time = time.time()
try: result = await agent_run() agent_requests.labels(agent_name=agent_name, status='success').inc()
# Track token usage usage = result.usage() tokens_used.labels(agent_name=agent_name, token_type='input').inc( usage.input_tokens ) tokens_used.labels(agent_name=agent_name, token_type='output').inc( usage.output_tokens )
return result
except Exception as e: agent_requests.labels(agent_name=agent_name, status='error').inc() raise
finally: duration = time.time() - start_time agent_latency.labels(agent_name=agent_name).observe(duration) active_requests.labels(agent_name=agent_name).dec()
return wrapperError Handling & Resilience
Section titled “Error Handling & Resilience”Comprehensive Error Strategy
Section titled “Comprehensive Error Strategy”"""Production error handling with:- Graceful degradation- Automatic retries- Error categorization- User-friendly messages"""
from pydantic_ai import Agent, ModelRetryfrom enum import Enumimport asynciofrom typing import Optional
class ErrorCategory(str, Enum):
    """Error classification for monitoring."""
    RATE_LIMIT = 'rate_limit'
    TIMEOUT = 'timeout'
    VALIDATION = 'validation'
    EXTERNAL_SERVICE = 'external_service'
    INTERNAL = 'internal'


class AgentError(Exception):
    """Base agent error carrying a category and a retryability hint."""

    def __init__(self, message: str, category: ErrorCategory, retryable: bool = False):
        self.message = message
        self.category = category
        self.retryable = retryable
        super().__init__(message)


async def retry_with_backoff(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    """Run *func* with an exponential-backoff retry strategy.

    Retries only AgentErrors flagged retryable; any other exception is
    wrapped as an INTERNAL AgentError and raised immediately.

    Raises:
        AgentError: the last error after exhausting retries, a non-retryable
            error, or a wrapped unexpected exception.
    """
    import random  # hoisted out of the loop (was re-imported per attempt)

    for attempt in range(max_retries):
        try:
            return await func()

        except AgentError as e:
            if not e.retryable or attempt == max_retries - 1:
                raise

            # Exponential backoff with jitter, capped at max_delay
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay + jitter)

        except Exception as e:
            # Unexpected error: classify as internal, preserve the cause chain
            raise AgentError(
                f"Unexpected error: {str(e)}",
                ErrorCategory.INTERNAL,
            ) from e
# Agent with comprehensive error handlingagent = Agent('openai:gpt-4o')
@agent.output_validatorasync def validate_and_retry(ctx, output): """Validate output and request retry if needed."""
if not output or len(str(output)) == 0: raise ModelRetry('Empty response. Please provide a complete answer.')
return output
async def safe_agent_run(query: str) -> Optional[str]: """Safe agent execution with error handling."""
try: async def run_agent(): return await agent.run(query)
result = await retry_with_backoff(run_agent) return result.output
except AgentError as e: logger.error(f"{e.category.value}: {e.message}")
# Return user-friendly fallback fallback_messages = { ErrorCategory.RATE_LIMIT: "Service is busy. Please try again in a moment.", ErrorCategory.TIMEOUT: "Request timed out. Please try again.", ErrorCategory.VALIDATION: "Invalid input. Please check your request.", ErrorCategory.EXTERNAL_SERVICE: "External service unavailable. Retrying...", ErrorCategory.INTERNAL: "An internal error occurred. Please contact support.", }
return fallback_messages.get(e.category, "An error occurred.")
except Exception as e: logger.exception(f"Unexpected error in agent run: {e}") return NoneToken Management & Cost Control
Section titled “Token Management & Cost Control”"""Production token tracking for cost control and optimization."""
from pydantic_ai import Agent, UsageLimits, RunUsagefrom dataclasses import dataclassfrom datetime import datetime, timedelta
@dataclass
class TokenBudget:
    """Token usage limits, pricing, and running counters."""
    daily_limit: int
    monthly_limit: int
    cost_per_1k_input: float
    cost_per_1k_output: float
    tokens_used_today: int = 0
    tokens_used_month: int = 0


class TokenTracker:
    """Track and enforce token budgets."""

    def __init__(self, budget: TokenBudget):
        self.budget = budget
        self.last_reset = datetime.now()

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True when *estimated_tokens* fits both daily and monthly budgets."""
        # Reset daily count if needed.
        # NOTE(review): this is a rolling 24h window (24h after last reset),
        # not a calendar day, and the monthly counter is never reset here —
        # confirm that is the intent.
        if (datetime.now() - self.last_reset).days >= 1:
            self.budget.tokens_used_today = 0
            self.last_reset = datetime.now()

        can_use_today = (
            self.budget.tokens_used_today + estimated_tokens
            <= self.budget.daily_limit
        )
        can_use_month = (
            self.budget.tokens_used_month + estimated_tokens
            <= self.budget.monthly_limit
        )
        return can_use_today and can_use_month

    # 'RunUsage' annotations are strings so this class is importable without
    # pydantic_ai; any object with input_tokens/output_tokens works.
    def record_usage(self, usage: 'RunUsage'):
        """Record actual token usage against both counters."""
        total = usage.input_tokens + usage.output_tokens
        self.budget.tokens_used_today += total
        self.budget.tokens_used_month += total

    def estimate_cost(self, usage: 'RunUsage') -> float:
        """Calculate the dollar cost of a request from recorded usage."""
        input_cost = (usage.input_tokens / 1000) * self.budget.cost_per_1k_input
        output_cost = (usage.output_tokens / 1000) * self.budget.cost_per_1k_output
        return input_cost + output_cost
# Usage
tracker = TokenTracker(TokenBudget(
    daily_limit=100_000,
    monthly_limit=5_000_000,
    cost_per_1k_input=0.005,
    cost_per_1k_output=0.015,
))

agent = Agent('openai:gpt-4o')


async def cost_controlled_run(query: str) -> Optional[str]:
    """Run the agent with pre-flight budget checks and post-hoc cost logging."""
    # Estimate tokens (rough heuristic: ~2 tokens per word + prompt overhead)
    estimated_tokens = len(query.split()) * 2 + 200

    if not tracker.check_budget(estimated_tokens):
        return "Budget limit reached. Please try again later."

    # Run with a hard token cap slightly above the estimate
    result = await agent.run(
        query,
        usage_limits=UsageLimits(total_tokens_limit=estimated_tokens + 100),
    )

    # Record actual usage and cost
    tracker.record_usage(result.usage())
    cost = tracker.estimate_cost(result.usage())

    logger.info(f"Request cost: ${cost:.4f}")

    return result.output

Caching & Performance
Section titled “Caching & Performance”"""Production caching strategies for agent responses.Reduces latency and costs significantly."""
from redis import Redisimport hashlibimport jsonfrom pydantic_ai import Agent
redis_client = Redis.from_url('redis://localhost:6379')
class AgentCacheManager:
    """Manage Redis-backed caching for agent responses."""

    def __init__(self, ttl: int = 3600):
        self.ttl = ttl  # Cache TTL in seconds

    def _generate_cache_key(self, query: str, agent_name: str) -> str:
        """Generate a deterministic cache key from agent name and query.

        md5 is used purely for key derivation, not security.
        """
        key_data = f"{agent_name}:{query}"
        return hashlib.md5(key_data.encode()).hexdigest()

    async def get_cached_response(
        self,
        query: str,
        agent_name: str,
    ) -> Optional[str]:
        """Return the cached response for *query*, or None on a miss."""
        key = self._generate_cache_key(query, agent_name)
        cached = redis_client.get(key)

        if cached:
            logger.info(f"Cache hit for query: {query[:50]}...")
            return cached.decode('utf-8')

        return None

    async def cache_response(
        self,
        query: str,
        response: str,
        agent_name: str,
    ):
        """Store *response* in Redis with the configured TTL."""
        key = self._generate_cache_key(query, agent_name)
        redis_client.setex(key, self.ttl, response)

        logger.info(f"Cached response for: {query[:50]}...")

    def invalidate(self, pattern: str):
        """Invalidate cache entries whose keys match *pattern*."""
        for key in redis_client.scan_iter(match=pattern):
            redis_client.delete(key)
# Usage with cachecache_manager = AgentCacheManager(ttl=3600)agent = Agent('openai:gpt-4o')
async def run_with_caching(query: str, agent_name: str = 'default') -> str: """Run agent with caching."""
# Try cache first cached_response = await cache_manager.get_cached_response(query, agent_name) if cached_response: return cached_response
# Run agent if not cached result = await agent.run(query) response = result.output
# Cache for future use await cache_manager.cache_response(query, response, agent_name)
return responseScaling Strategies
Section titled “Scaling Strategies”Horizontal Scaling with Load Balancing
Section titled “Horizontal Scaling with Load Balancing”

"""Load balancing configuration for multiple agent instances."""

# nginx.conf - Reverse proxy and load balancer
upstream pydantic_agents {
    least_conn;  # Least-connection load balancing
    server agent1:8000 weight=1;
    server agent2:8000 weight=1;
    server agent3:8000 weight=1;

    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;

    location /api/agent/ {
        proxy_pass http://pydantic_agents;
        proxy_http_version 1.1;
        # Clear Connection header so upstream keepalive works
        proxy_set_header Connection "";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;

        # Timeout settings
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # Health check endpoint
    location /health {
        proxy_pass http://pydantic_agents/health;
    }
}

Queue-Based Processing for Long-Running Tasks
Section titled “Queue-Based Processing for Long-Running Tasks”"""Use message queues (Celery, RabbitMQ) for long-running agent tasks."""
from celery import Celeryfrom pydantic_ai import Agent
app = Celery('agent_tasks', broker='redis://localhost:6379')
@app.task( bind=True, max_retries=3, default_retry_delay=60)def run_analysis_agent(self, data: str): """Long-running agent task with Celery."""
try: agent = Agent('openai:gpt-4o') result = agent.run_sync(data)
# Store result store_result(self.request.id, result.output)
return result.output
except Exception as e: # Retry with exponential backoff raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
# FastAPI endpointfrom fastapi import BackgroundTasks
@app.post('/api/analyze-async')async def analyze_async(data: str, background_tasks: BackgroundTasks): """Submit long task to queue."""
task = run_analysis_agent.delay(data)
return { 'task_id': task.id, 'status': 'queued', 'check_url': f'/api/task/{task.id}' }
@app.get('/api/task/{task_id}')async def get_task_result(task_id: str): """Poll for task result."""
from celery.result import AsyncResult
result = AsyncResult(task_id, app=app)
return { 'task_id': task_id, 'status': result.state, 'result': result.result if result.ready() else None }(Continues with more production patterns, best practices, and operational guidance)