CrewAI Production Guide
Deploying, Testing, Optimising, and Maintaining Multi-Agent Systems

Table of Contents
- Production Fundamentals
- Testing Strategies
- Deployment Patterns
- Monitoring and Logging
- Performance Optimisation
- Cost Optimisation
- Error Handling and Recovery
- Security Considerations
- Scaling Strategies
- Integration Patterns
Production Fundamentals
Development vs. Production Configuration
Development Environment:
```python
crew = Crew(
    agents=agents,
    tasks=tasks,
    verbose=True,   # Enable detailed logging
    memory=True,    # Enable memory for debugging
    max_rpm=100     # Higher limit for testing
)
```

Production Environment:
```python
crew = Crew(
    agents=agents,
    tasks=tasks,
    verbose=False,  # Minimal output
    memory=True,    # Maintain history for analysis
    max_rpm=10,     # Respect API rate limits
    cache=True      # Cache tool results for reliability
)
```
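One way to keep the two profiles from drifting apart is a single factory keyed off an environment flag; a minimal sketch (`APP_ENV` and `make_crew` are illustrative conventions, not CrewAI APIs):

```python
import os
from crewai import Crew

def make_crew(agents, tasks) -> Crew:
    """Build a crew whose settings follow the APP_ENV environment variable."""
    is_prod = os.getenv('APP_ENV', 'development') == 'production'
    return Crew(
        agents=agents,
        tasks=tasks,
        verbose=not is_prod,            # Detailed logs only in development
        memory=True,
        max_rpm=10 if is_prod else 100  # Respect rate limits in production
    )
```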
Configuration Management

Store configuration in environment variables:
```python
import os
from dotenv import load_dotenv
from crewai import LLM

load_dotenv()

# API Configuration
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')

# Model Configuration
PRODUCTION_MODEL = os.getenv('PRODUCTION_MODEL', 'openai/gpt-4-turbo')
FALLBACK_MODEL = os.getenv('FALLBACK_MODEL', 'openai/gpt-3.5-turbo')

# Execution Configuration
MAX_RPM = int(os.getenv('MAX_RPM', '10'))
VERBOSE = os.getenv('VERBOSE', 'False').lower() == 'true'

# Create LLM based on configuration
llm = LLM(
    model=PRODUCTION_MODEL,
    api_key=OPENAI_API_KEY,
    temperature=0.5
)
```
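FALLBACK_MODEL is defined above but never wired in; one hedged approach (the `call_with_fallback` helper is illustrative, not a CrewAI API) is to retry a failed call on the cheaper model:

```python
def call_with_fallback(prompt: str) -> str:
    """Try the production model first, then fall back to the cheaper model."""
    try:
        return llm.call(prompt)
    except Exception:
        fallback_llm = LLM(
            model=FALLBACK_MODEL,
            api_key=OPENAI_API_KEY,
            temperature=0.5
        )
        return fallback_llm.call(prompt)
```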
Project Structure for Production

```
my_crewai_project/
├── .env                      # Environment variables
├── .env.example              # Template for env vars
├── .gitignore
├── README.md
├── requirements.txt
├── setup.py
├── pyproject.toml
│
├── src/
│   └── my_project/
│       ├── __init__.py
│       ├── main.py               # Entry point
│       ├── config.py             # Configuration management
│       ├── logging_config.py     # Logging setup
│       │
│       ├── agents/
│       │   ├── __init__.py
│       │   ├── base_agent.py
│       │   └── specialized_agents.py
│       │
│       ├── tasks/
│       │   ├── __init__.py
│       │   └── task_definitions.py
│       │
│       ├── crews/
│       │   ├── __init__.py
│       │   └── crew_factory.py
│       │
│       ├── tools/
│       │   ├── __init__.py
│       │   └── custom_tools.py
│       │
│       └── utils/
│           ├── __init__.py
│           ├── error_handling.py
│           ├── monitoring.py
│           └── caching.py
│
├── tests/
│   ├── unit/
│   │   ├── test_agents.py
│   │   ├── test_tasks.py
│   │   └── test_tools.py
│   │
│   ├── integration/
│   │   ├── test_crew_execution.py
│   │   └── test_end_to_end.py
│   │
│   └── performance/
│       ├── test_latency.py
│       └── test_throughput.py
│
└── docs/
    ├── deployment.md
    ├── troubleshooting.md
    └── api_reference.md
```
Testing Strategies

Unit Testing Agents
Section titled “Unit Testing Agents”import pytestfrom crewai import Agent, LLM
def test_agent_creation(): """Test agent initialisation.""" agent = Agent( role="Test Agent", goal="Test goal", backstory="Test backstory" )
assert agent.role == "Test Agent" assert agent.goal == "Test goal" assert agent.backstory == "Test backstory"
def test_agent_with_tools(): """Test agent with tool assignment.""" from crewai_tools import FileReadTool
reader = FileReadTool() agent = Agent( role="File Reader", goal="Read files", tools=[reader] )
assert len(agent.tools) == 1 assert agent.tools[0].name == "read_file"
@pytest.mark.asyncioasync def test_agent_async_execution(): """Test async agent execution.""" agent = Agent(role="Async Agent", goal="Test async") # Test async functionality passUnit Testing Tasks
Unit Testing Tasks

```python
def test_task_creation():
    """Test task initialisation."""
    agent = Agent(role="Test", goal="Test", backstory="Test")

    task = Task(
        description="Test task",
        expected_output="Test output",
        agent=agent
    )

    assert task.description == "Test task"
    assert task.agent == agent

def test_task_with_async_execution():
    """Test async task configuration."""
    agent = Agent(role="Test", goal="Test", backstory="Test")

    task = Task(
        description="Async task",
        expected_output="Output",
        agent=agent,
        async_execution=True
    )

    assert task.async_execution is True
```
Integration Testing Crews

```python
import pytest
from crewai import Agent, Task, Crew, Process, LLM

@pytest.fixture
def test_crew():
    """Create a test crew."""
    agent1 = Agent(
        role="Test Agent 1",
        goal="Test goal 1",
        backstory="Test"
    )
    agent2 = Agent(
        role="Test Agent 2",
        goal="Test goal 2",
        backstory="Test"
    )

    task1 = Task(
        description="Test task 1",
        expected_output="Output 1",
        agent=agent1
    )

    task2 = Task(
        description="Test task 2",
        expected_output="Output 2",
        agent=agent2
    )

    crew = Crew(
        agents=[agent1, agent2],
        tasks=[task1, task2],
        process=Process.sequential
    )

    return crew

def test_crew_execution(test_crew):
    """Test crew execution."""
    result = test_crew.kickoff()
    assert result is not None
    # kickoff() returns a CrewOutput in recent CrewAI versions;
    # its string form carries the final output
    assert str(result)

def test_crew_sequential_process(test_crew):
    """Test sequential process execution."""
    assert test_crew.process == Process.sequential
    # Verify tasks execute in order
```
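Because `kickoff()` makes live LLM calls, teams often tag these tests so routine CI runs can skip them; a sketch using a custom pytest marker (the `integration` name is a project convention that must be registered in pytest configuration):

```python
# pytest.ini (marker registration):
# [pytest]
# markers =
#     integration: tests that call live LLM APIs

@pytest.mark.integration
def test_crew_execution_live(test_crew):
    """Runs only when requested explicitly: pytest -m integration"""
    result = test_crew.kickoff()
    assert result is not None
```

CI can then run `pytest -m "not integration"` to exercise everything else without incurring API costs.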
Performance Testing

```python
import time
import pytest

def test_crew_execution_time():
    """Test crew execution latency."""
    crew = setup_test_crew()

    start_time = time.time()
    result = crew.kickoff()
    execution_time = time.time() - start_time

    # Assert execution completes within threshold
    assert execution_time < 30, f"Execution took {execution_time}s, threshold is 30s"

def test_crew_throughput():
    """Test crew throughput."""
    crew = setup_test_crew()

    start_time = time.time()

    for _ in range(10):
        crew.kickoff()

    total_time = time.time() - start_time
    avg_time_per_exec = total_time / 10

    assert avg_time_per_exec < 5, f"Average execution time {avg_time_per_exec}s exceeds threshold"

@pytest.mark.performance
def test_memory_usage():
    """Test memory consumption."""
    import psutil

    crew = setup_test_crew()

    process = psutil.Process()
    memory_before = process.memory_info().rss / 1024 / 1024  # MB

    crew.kickoff()

    memory_after = process.memory_info().rss / 1024 / 1024
    memory_used = memory_after - memory_before

    assert memory_used < 500, f"Memory usage {memory_used}MB exceeds threshold"
```
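These performance tests reference a `setup_test_crew()` helper that is never defined; a minimal sketch consistent with the fixtures used earlier:

```python
from crewai import Agent, Task, Crew, Process

def setup_test_crew() -> Crew:
    """Build a small single-agent crew for performance measurements."""
    agent = Agent(role="Perf Agent", goal="Answer quickly", backstory="Test")
    task = Task(
        description="Reply with a one-line acknowledgement",
        expected_output="A single line of text",
        agent=agent
    )
    return Crew(agents=[agent], tasks=[task], process=Process.sequential)
```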
Deployment Patterns

Docker Deployment
Dockerfile:
```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
# Note: do not bake .env into the image; inject secrets at runtime instead
COPY src/ ./src/

# Set environment
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# Run application
CMD ["python", "-m", "src.my_project.main"]
```

Docker Compose:
```yaml
version: '3.8'

services:
  crewai-app:
    build: .
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - LOG_LEVEL=INFO
    volumes:
      - ./data:/app/data
    ports:
      - "8000:8000"
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  redis_data:
```
FastAPI Integration

```python
import uuid
from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="CrewAI Service")

class TaskRequest(BaseModel):
    topic: str
    depth: str = "comprehensive"
    preferences: Optional[dict] = None

class TaskResponse(BaseModel):
    task_id: str
    status: str
    result: Optional[str] = None

@app.post("/execute", response_model=TaskResponse)
async def execute_crew(request: TaskRequest):
    """Execute crew task via API."""
    try:
        crew = create_crew_for_task(request.topic, request.depth)
        result = crew.kickoff()

        return TaskResponse(
            task_id=str(uuid.uuid4()),
            status="completed",
            result=str(result)  # CrewOutput -> string for the response model
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}
```
Kubernetes Deployment

deployment.yaml:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: crewai-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: crewai
  template:
    metadata:
      labels:
        app: crewai
    spec:
      containers:
        - name: crewai
          image: my-registry/crewai:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: openai
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
```
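The manifest pulls OPENAI_API_KEY from a Secret named `api-keys`, which must exist in the cluster before the pods can start; one way to create it from the shell:

```bash
kubectl create secret generic api-keys \
  --from-literal=openai="$OPENAI_API_KEY"
```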
Monitoring and Logging

Comprehensive Logging Setup
Section titled “Comprehensive Logging Setup”import loggingimport jsonfrom datetime import datetime
# Configure logginglogging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('crew_execution.log'), logging.StreamHandler() ])
class CrewLogger: """Custom logger for CrewAI operations."""
def __init__(self, name): self.logger = logging.getLogger(name)
def log_agent_action(self, agent_name, action, details): """Log agent actions.""" self.logger.info(json.dumps({ 'timestamp': datetime.now().isoformat(), 'type': 'agent_action', 'agent': agent_name, 'action': action, 'details': details }))
def log_task_start(self, task_name): """Log task start.""" self.logger.info(f"Task started: {task_name}")
def log_task_complete(self, task_name, duration, success=True): """Log task completion.""" status = "SUCCESS" if success else "FAILED" self.logger.info(f"Task {task_name} {status} (Duration: {duration}s)")
def log_error(self, component, error): """Log errors.""" self.logger.error(f"{component} error: {str(error)}", exc_info=True)
# Usagelogger = CrewLogger(__name__)logger.log_task_start("research_task")Monitoring Metrics
Section titled “Monitoring Metrics”from dataclasses import dataclassfrom typing import Dict, Listimport time
@dataclassclass ExecutionMetrics: """Metrics for crew execution.""" crew_name: str start_time: float end_time: float total_agents: int total_tasks: int successful_tasks: int failed_tasks: int total_tokens_used: int total_cost: float
@property def execution_time(self) -> float: return self.end_time - self.start_time
@property def success_rate(self) -> float: total = self.successful_tasks + self.failed_tasks return (self.successful_tasks / total * 100) if total > 0 else 0
def to_dict(self) -> Dict: return { 'crew_name': self.crew_name, 'execution_time': self.execution_time, 'success_rate': self.success_rate, 'total_agents': self.total_agents, 'total_tasks': self.total_tasks, 'total_tokens_used': self.total_tokens_used, 'total_cost': self.total_cost }
class MetricsCollector: """Collect and track execution metrics."""
def __init__(self): self.metrics: List[ExecutionMetrics] = []
def add_metrics(self, metrics: ExecutionMetrics): """Add metrics record.""" self.metrics.append(metrics)
def get_average_execution_time(self) -> float: """Get average execution time.""" if not self.metrics: return 0 total_time = sum(m.execution_time for m in self.metrics) return total_time / len(self.metrics)
def get_overall_success_rate(self) -> float: """Get overall success rate.""" if not self.metrics: return 0 total_successful = sum(m.successful_tasks for m in self.metrics) total_tasks = sum(m.successful_tasks + m.failed_tasks for m in self.metrics) return (total_successful / total_tasks * 100) if total_tasks > 0 else 0Performance Optimisation
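A hedged sketch of wiring these classes around a crew run, assuming a configured `crew`; the token and cost figures are placeholders to be filled from your own usage tracking:

```python
import time

collector = MetricsCollector()

start = time.time()
result = crew.kickoff()
end = time.time()

collector.add_metrics(ExecutionMetrics(
    crew_name="research_crew",
    start_time=start,
    end_time=end,
    total_agents=len(crew.agents),
    total_tasks=len(crew.tasks),
    successful_tasks=len(crew.tasks),  # Placeholder: derive from task outputs in practice
    failed_tasks=0,
    total_tokens_used=0,               # Placeholder: populate from usage metrics
    total_cost=0.0
))

print(f"Avg execution time: {collector.get_average_execution_time():.1f}s")
```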
Performance Optimisation

LLM Caching Strategy
Section titled “LLM Caching Strategy”from functools import lru_cachefrom typing import Tuple
class LLMCache: """Cache LLM responses to reduce API calls."""
def __init__(self, cache_size=128): self.cache = {} self.cache_size = cache_size
def get_cache_key(self, prompt: str, model: str) -> str: """Generate cache key from prompt and model.""" import hashlib combined = f"{model}:{prompt}" return hashlib.sha256(combined.encode()).hexdigest()
def get(self, prompt: str, model: str) -> str | None: """Retrieve cached response.""" key = self.get_cache_key(prompt, model) return self.cache.get(key)
def set(self, prompt: str, model: str, response: str): """Cache response.""" if len(self.cache) >= self.cache_size: # Remove oldest entry oldest_key = next(iter(self.cache)) del self.cache[oldest_key]
key = self.get_cache_key(prompt, model) self.cache[key] = response
def clear(self): """Clear cache.""" self.cache.clear()
# Usagecache = LLMCache(cache_size=256)
# Before API callcached_response = cache.get(prompt, model_name)if cached_response: return cached_response
# After API callresponse = llm.call(prompt)cache.set(prompt, model_name, response)Parallel Task Execution
Section titled “Parallel Task Execution”import asynciofrom typing import List
async def execute_tasks_concurrently(tasks: List[Task], agents: List[Agent]) -> List[str]: """Execute multiple tasks concurrently."""
async def run_task(task, agent): # Convert synchronous task execution to async loop = asyncio.get_event_loop() return await loop.run_in_executor(None, task.execute, agent)
# Run all tasks concurrently results = await asyncio.gather( *[run_task(task, agent) for task, agent in zip(tasks, agents)] )
return results
# Usageasync def main(): tasks = [task1, task2, task3] agents = [agent1, agent2, agent3]
results = await execute_tasks_concurrently(tasks, agents) print(f"All tasks completed: {len(results)} results")
# Run async functionasyncio.run(main())Memory Optimisation
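CrewAI also ships native async entry points for whole-crew runs, which avoid the executor wrapping above: `kickoff_async` awaits a single run, and `kickoff_for_each_async` fans one crew out over several inputs. A short sketch, assuming a configured `crew`:

```python
import asyncio

async def run_crews():
    # Await a single crew run without blocking the event loop
    single = await crew.kickoff_async(inputs={"topic": "AI safety"})

    # Fan one crew out across several inputs concurrently
    many = await crew.kickoff_for_each_async(
        inputs=[{"topic": "AI safety"}, {"topic": "AI alignment"}]
    )
    return single, many

asyncio.run(run_crews())
```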
Section titled “Memory Optimisation”class OptimisedMemoryManager: """Manage memory efficiently."""
def __init__(self, max_memory_mb=512): self.max_memory = max_memory_mb * 1024 * 1024 # Convert to bytes self.memory_usage = {}
def should_cleanup(self) -> bool: """Check if cleanup is needed.""" import psutil process = psutil.Process() current_memory = process.memory_info().rss return current_memory > self.max_memory
def cleanup_old_entries(self): """Remove oldest memory entries.""" # Remove least recently used entries sorted_entries = sorted( self.memory_usage.items(), key=lambda x: x[1]['timestamp'] )
# Remove oldest 25% entries_to_remove = len(sorted_entries) // 4 for key, _ in sorted_entries[:entries_to_remove]: del self.memory_usage[key]
def add_memory_entry(self, key: str, value: dict): """Add memory entry with cleanup.""" if self.should_cleanup(): self.cleanup_old_entries()
from datetime import datetime self.memory_usage[key] = { **value, 'timestamp': datetime.now() }Cost Optimisation
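A short usage sketch, assuming entries are keyed by task or conversation ID:

```python
manager = OptimisedMemoryManager(max_memory_mb=512)

# Store an intermediate result; cleanup runs automatically when over budget
manager.add_memory_entry(
    "task_42",
    {"summary": "Research findings on topic X"}
)
```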
Cost Optimisation

Token Cost Tracking
Section titled “Token Cost Tracking”class CostTracker: """Track and optimise API costs."""
# Pricing per 1K tokens (as of 2024) PRICING = { 'gpt-4-turbo': {'input': 0.01, 'output': 0.03}, 'gpt-3.5-turbo': {'input': 0.0005, 'output': 0.0015}, 'claude-3-opus': {'input': 0.015, 'output': 0.075}, 'claude-3-sonnet': {'input': 0.003, 'output': 0.015} }
def __init__(self): self.total_cost = 0.0 self.token_usage = {}
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float: """Calculate cost for API call.""" if model not in self.PRICING: return 0.0
pricing = self.PRICING[model] input_cost = (input_tokens / 1000) * pricing['input'] output_cost = (output_tokens / 1000) * pricing['output']
total = input_cost + output_cost self.total_cost += total
# Track usage if model not in self.token_usage: self.token_usage[model] = {'input': 0, 'output': 0}
self.token_usage[model]['input'] += input_tokens self.token_usage[model]['output'] += output_tokens
return total
def get_cost_report(self) -> dict: """Get comprehensive cost report.""" return { 'total_cost': f"${self.total_cost:.4f}", 'by_model': { model: { 'input_tokens': self.token_usage[model]['input'], 'output_tokens': self.token_usage[model]['output'], 'cost': f"${self.calculate_model_cost(model):.4f}" } for model in self.token_usage } }
def calculate_model_cost(self, model: str) -> float: """Calculate cost for specific model.""" pricing = self.PRICING.get(model, {'input': 0, 'output': 0}) tokens = self.token_usage.get(model, {'input': 0, 'output': 0})
input_cost = (tokens['input'] / 1000) * pricing['input'] output_cost = (tokens['output'] / 1000) * pricing['output']
return input_cost + output_cost
# Usagetracker = CostTracker()
# After each LLM callcost = tracker.calculate_cost('gpt-4-turbo', input_tokens=500, output_tokens=200)print(f"Cost for this call: ${cost:.4f}")
# Get overall reportreport = tracker.get_cost_report()print(f"Total cost: {report['total_cost']}")Error Handling and Recovery
Error Handling and Recovery

Robust Error Handling
Section titled “Robust Error Handling”from typing import Optional, Callableimport time
class TaskExecutor: """Execute tasks with error handling and retry logic."""
def __init__(self, max_retries=3, backoff_factor=2): self.max_retries = max_retries self.backoff_factor = backoff_factor
def execute_with_retry( self, task: Task, on_error: Optional[Callable] = None ) -> str: """Execute task with automatic retry."""
for attempt in range(self.max_retries): try: return self.execute_task(task)
except Exception as e: if attempt < self.max_retries - 1: # Calculate backoff time wait_time = self.backoff_factor ** attempt logger.warning( f"Task failed (attempt {attempt + 1}/{self.max_retries}). " f"Retrying in {wait_time}s. Error: {str(e)}" ) time.sleep(wait_time) else: # All retries exhausted if on_error: on_error(e) raise
def execute_task(self, task: Task) -> str: """Execute single task.""" # Implementation pass
# Usageexecutor = TaskExecutor(max_retries=3)
try: result = executor.execute_with_retry( task, on_error=lambda e: logger.error(f"Task failed: {e}") )except Exception as e: logger.critical(f"Task execution failed after retries: {e}")Security Considerations
Security Considerations

API Key Management
Section titled “API Key Management”import osfrom cryptography.fernet import Fernet
class SecureKeyManager: """Manage API keys securely."""
def __init__(self, master_key: Optional[str] = None): if not master_key: master_key = os.getenv('MASTER_KEY')
self.cipher = Fernet(master_key.encode())
def encrypt_key(self, api_key: str) -> str: """Encrypt API key.""" return self.cipher.encrypt(api_key.encode()).decode()
def decrypt_key(self, encrypted_key: str) -> str: """Decrypt API key.""" return self.cipher.decrypt(encrypted_key.encode()).decode()
@staticmethod def load_from_env() -> dict: """Load keys from environment.""" return { 'openai': os.getenv('OPENAI_API_KEY'), 'anthropic': os.getenv('ANTHROPIC_API_KEY'), 'serper': os.getenv('SERPER_API_KEY') }
# Usagemanager = SecureKeyManager()
# Encrypt keys for storageencrypted = manager.encrypt_key(api_key)
# Decrypt keys for useapi_key = manager.decrypt_key(encrypted)Scaling Strategies
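The MASTER_KEY environment variable must hold a valid Fernet key; the standard way to generate one (run once, then store the output in your secret manager, never in source control):

```python
from cryptography.fernet import Fernet

# Generates a URL-safe base64-encoded 32-byte key
print(Fernet.generate_key().decode())
```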
Scaling Strategies

Horizontal Scaling with Load Balancing
Section titled “Horizontal Scaling with Load Balancing”from typing import Listimport random
class CrewLoadBalancer: """Distribute tasks across multiple crew instances."""
def __init__(self, crews: List[Crew]): self.crews = crews
def execute_balanced(self, task: Task) -> str: """Execute task on least loaded crew.""" # Select crew with lowest recent task count selected_crew = min(self.crews, key=lambda c: c.task_count) return selected_crew.execute_task(task)
def execute_parallel(self, tasks: List[Task]) -> List[str]: """Execute tasks in parallel across crews.""" results = [] for task, crew in zip(tasks, self.crews): result = crew.execute_task(task) results.append(result) return results
# Usagecrews = [create_crew() for _ in range(3)]balancer = CrewLoadBalancer(crews)
results = balancer.execute_parallel([task1, task2, task3])Integration Patterns
Integration Patterns

Database Integration
Section titled “Database Integration”from sqlalchemy import create_engine, Column, String, DateTime, Integerfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmakerfrom datetime import datetime
Base = declarative_base()
class ExecutionRecord(Base): """Store crew execution records.""" __tablename__ = 'executions'
id = Column(Integer, primary_key=True) crew_name = Column(String) task_name = Column(String) status = Column(String) # success, failed, pending result = Column(String) created_at = Column(DateTime, default=datetime.utcnow) completed_at = Column(DateTime)
# Create database sessionengine = create_engine('sqlite:///crew_execution.db')Base.metadata.create_all(engine)Session = sessionmaker(bind=engine)
class ExecutionDatabase: """Manage execution records."""
def __init__(self): self.session = Session()
def record_execution(self, crew_name: str, task_name: str, result: str, status: str): """Record execution.""" record = ExecutionRecord( crew_name=crew_name, task_name=task_name, result=result, status=status, completed_at=datetime.utcnow() ) self.session.add(record) self.session.commit()
def get_execution_history(self, crew_name: str) -> List[ExecutionRecord]: """Get execution history.""" return self.session.query(ExecutionRecord).filter( ExecutionRecord.crew_name == crew_name ).order_by(ExecutionRecord.created_at.desc()).all()This production guide provides comprehensive strategies for deploying, monitoring, and optimising CrewAI systems in production environments.