Chapter 3 — Multi-Agent Systems
Section titled “Chapter 3 — Multi-Agent Systems”

What you’ll learn: three canonical multi-agent topologies — a supervisor routing to specialists, parallel workers with fan-out/fan-in, and direct hand-off between agents.
Time: ~25 minutes.
Prereqs: Chapter 2 — Your first agent.
Multi-Agent Systems
Section titled “Multi-Agent Systems”

Example 1: Supervisor Pattern

Section titled “Example 1: Supervisor Pattern”

One coordinator agent routing to specialists:
# Note: AgentExecutor and create_tool_calling_agent require `pip install langchain langchain-anthropic`
# from langchain.agents import AgentExecutor, create_tool_calling_agent
# from langchain_anthropic import ChatAnthropic
from typing import Annotated, List, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.types import Send
# Tools available to the specialist agents.
@tool
def research_tool(query: str) -> str:
    """Search the web for information."""
    # Stub implementation: echoes the query back as a canned result.
    canned = f"Research results for: {query}"
    return canned
@tool
def calculator_tool(expression: str) -> str:
    """Evaluate a simple arithmetic expression.

    SECURITY: never call `eval()` on model/user-provided input — it
    executes arbitrary Python. We restrict the AST to a small set of
    arithmetic nodes. For anything beyond `+ - * / % ** ()`, use a
    dedicated library such as `simpleeval` or a CAS like SymPy.
    """
    # (Docstring fix: `%` was missing from the supported-operator list even
    # though `ast.Mod` is whitelisted below; this docstring is the tool
    # description the LLM sees, so it must match the real behavior.)
    import ast
    import operator as op

    # Whitelist of AST operator node types -> the plain function implementing
    # them. Anything not listed here is rejected outright by _eval.
    allowed_binops = {
        ast.Add: op.add,
        ast.Sub: op.sub,
        ast.Mult: op.mul,
        ast.Div: op.truediv,
        ast.Pow: op.pow,
        ast.Mod: op.mod,
    }
    allowed_unaryops = {ast.UAdd: op.pos, ast.USub: op.neg}

    def _eval(node):
        # Numeric literals only — rejects strings, names, calls, attribute
        # access, and every other node type.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in allowed_binops:
            return allowed_binops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in allowed_unaryops:
            return allowed_unaryops[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {ast.dump(node)}")

    # mode="eval" restricts the parse to a single expression (no statements).
    tree = ast.parse(expression, mode="eval")
    return str(_eval(tree.body))
# Helper function to create a specialist agent
def create_agent(llm, tools: list, system_prompt: str) -> AgentExecutor:
    """Wrap an LLM plus a tool list into a ready-to-run AgentExecutor."""
    message_layout = [
        ("system", system_prompt),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
    chat_prompt = ChatPromptTemplate.from_messages(message_layout)
    tool_agent = create_tool_calling_agent(llm, tools, chat_prompt)
    return AgentExecutor(agent=tool_agent, tools=tools)
# Create agent runner function
def agent_node(state: dict, agent: AgentExecutor, name: str) -> dict:
    """Invoke one specialist agent and report its answer into the graph state.

    The answer is wrapped in a HumanMessage tagged with the agent's `name`
    so the supervisor can tell which specialist replied. (Instantiating
    BaseMessage directly with type="human", as before, is not the supported
    message API — the concrete subclasses are.)
    """
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}
# Create specialized agentsmodel = ChatAnthropic(model="claude-3-5-sonnet-20240620")research_agent = create_agent(model, [research_tool], "You are a research specialist. Find accurate information.")math_agent = create_agent(model, [calculator_tool], "You are a math specialist. Solve problems step-by-step.")
# Supervisor stateclass SupervisorState(TypedDict): messages: Annotated[list, add_messages] next: str
# Supervisor logic
def supervisor_node(state: SupervisorState) -> dict:
    """Analyze request and pick best agent.

    Returns {"next": <route>} where <route> must be one of the keys of the
    conditional-edge mapping: "research_agent", "math_agent", or "FINISH".
    """
    last_message = state["messages"][-1]

    # If the last message came back from a specialist it carries a non-None
    # `name` (set in agent_node), so the run is complete.
    # NOTE: `hasattr(msg, "name")` is always True for LangChain message
    # objects (the attribute exists with default None), so we must check the
    # value, not the attribute's existence.
    if getattr(last_message, "name", None) is not None:
        # "FINISH" (not "END") is the key the conditional edge maps to END.
        return {"next": "FINISH"}

    prompt = f"""You manage two specialist agents:
- research_agent: For web searches, fact-finding, current info
- math_agent: For calculations and equations

Request: {last_message.content}

Which agent should handle this? Reply with ONLY the agent name or FINISH."""

    response = model.invoke(prompt)
    next_agent = response.content.strip()

    return {"next": next_agent}
# Build supervisor graph
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor_node)
# Each specialist node closes over its AgentExecutor and its display name.
builder.add_node("research_agent", lambda state: agent_node(state, research_agent, "research_agent"))
builder.add_node("math_agent", lambda state: agent_node(state, math_agent, "math_agent"))

builder.add_edge(START, "supervisor")
# Route on the "next" field; supervisor_node must return one of these exact keys.
builder.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {
        "research_agent": "research_agent",
        "math_agent": "math_agent",
        "FINISH": END,
    })

# Agents return to supervisor
builder.add_edge("research_agent", "supervisor")
builder.add_edge("math_agent", "supervisor")

# Checkpointer keeps per-thread state so repeated invokes share history.
supervisor_graph = builder.compile(checkpointer=InMemorySaver())
# Test itconfig = {"configurable": {"thread_id": "supervisor-test"}}
result = supervisor_graph.invoke( {"messages": [{"role": "user", "content": "Research AI trends and calculate 25% of 1000"}]}, config=config)
print("Final response:", result["messages"][-1].content)

Example 2: Parallel Worker Pattern
Section titled “Example 2: Parallel Worker Pattern”

Fan-out to multiple workers, collect results:
from langgraph.types import Send
# Shared graph state — only fields the main graph sees.class WorkflowState(TypedDict): tasks: list[dict] results: Annotated[dict, lambda x, y: {**x, **y}] # reducer merges dicts
# Per-worker payload — the shape delivered by each `Send`. Workers see# exactly these fields, not the whole WorkflowState.class WorkerPayload(TypedDict): task_id: str task_data: str
def dispatch(state: WorkflowState) -> list[Send]:
    """Fan-out: one Send per task.

    Returning a list of Sends from a conditional-edge function tells
    LangGraph to launch that many parallel copies of the target node,
    each with its own payload.
    """
    sends = []
    for task in state["tasks"]:
        payload = {"task_id": task["id"], "task_data": task["data"]}
        sends.append(Send("worker", payload))
    return sends
def worker_node(payload: WorkerPayload) -> dict:
    """Process one task. Receives the per-worker payload from `Send`."""
    key = payload["task_id"]
    outcome = f"Processed: {payload['task_data']}"
    # The `results` reducer on WorkflowState merges this single-entry dict
    # into the shared aggregate.
    return {"results": {key: outcome}}
def collect_results(state: WorkflowState) -> dict:
    """Fan-in step: runs once after every parallel worker has finished."""
    update = {"summary": f"Completed {len(state['results'])} tasks"}
    # The reducer folds the summary entry into the shared results dict.
    return {"results": update}
# Build parallel graphbuilder = StateGraph(WorkflowState)builder.add_node("worker", worker_node)builder.add_node("collect", collect_results)
# Fan-out: a conditional edge whose function returns list[Send] launches# N parallel workers. `["worker"]` is the list of allowed targets.builder.add_conditional_edges(START, dispatch, ["worker"])
# Fan-in: every worker edge lands on collect; LangGraph waits until all# parallel branches from a fan-out converge before running the next node.builder.add_edge("worker", "collect")builder.add_edge("collect", END)
parallel_graph = builder.compile()
# Testresult = parallel_graph.invoke({ "tasks": [ {"id": "task-1", "data": "data-a"}, {"id": "task-2", "data": "data-b"}, {"id": "task-3", "data": "data-c"} ]})
print("Results:", result["results"])
# Output: {'task-1': 'Processed: data-a', 'task-2': 'Processed: data-b', ...}

Example 3: Handoff Pattern
Section titled “Example 3: Handoff Pattern”

Agents handing off to each other mid-conversation:
class HandoffState(TypedDict):
    # Conversation history; `add_messages` appends new entries.
    messages: Annotated[list, add_messages]
    # Which agent currently owns the conversation ("agent_a" or "agent_b").
    current_agent: str
    # Human-readable explanation recorded when a handoff occurs.
    handoff_reason: str
def agent_a(state: HandoffState) -> dict:
    """First agent - handles initial request."""
    text = state["messages"][-1].content

    # The word "transfer" anywhere in the message triggers a handoff.
    wants_transfer = "transfer" in text.lower()
    if wants_transfer:
        handoff_note = {"role": "assistant", "content": "Transferring to agent B..."}
        return {
            "current_agent": "agent_b",
            "handoff_reason": "User requested transfer",
            "messages": [handoff_note],
        }

    # Normal response: keep ownership and reply directly.
    reply = {"role": "assistant", "content": f"Agent A responds to: {text}"}
    return {"current_agent": "agent_a", "messages": [reply]}
def agent_b(state: HandoffState) -> dict:
    """Second agent - takes over."""
    incoming = state["messages"][-1].content
    reply = {"role": "assistant", "content": f"Agent B (now handling): {incoming}"}
    # Claim ownership of the conversation going forward.
    return {"current_agent": "agent_b", "messages": [reply]}
def route_agent(state: HandoffState) -> str:
    """Return the name of the agent that currently owns the conversation."""
    # Default to agent_a before any handoff has been recorded.
    return state.get("current_agent", "agent_a")
# Build handoff graph
builder = StateGraph(HandoffState)
builder.add_node("agent_a", agent_a)
builder.add_node("agent_b", agent_b)

builder.add_edge(START, "agent_a")
# After agent_a runs, either hand off to agent_b or finish the turn.
# BUG FIX: the original routed back to "agent_a" when no handoff occurred,
# which re-ran agent_a on the same unchanged state until the recursion
# limit errored out. Ending the turn at END is the intended behavior — the
# test below expects step 1 to return agent A's single reply.
builder.add_conditional_edges(
    "agent_a",
    lambda state: "agent_b" if state.get("current_agent") == "agent_b" else END)
builder.add_edge("agent_b", END)

# Checkpointer lets successive invokes on the same thread resume state.
handoff_graph = builder.compile(checkpointer=InMemorySaver())
# Test handoff
config = {"configurable": {"thread_id": "handoff-test"}}

# Step 1: no "transfer" keyword, so agent A answers and the turn ends.
result = handoff_graph.invoke(
    {"messages": [{"role": "user", "content": "Help me"}], "current_agent": "agent_a"},
    config=config)
print("Step 1:", result["messages"][-1].content)

# Step 2: same thread_id, so the checkpointer restores prior state; the
# word "Transfer" triggers the handoff branch in agent_a.
result = handoff_graph.invoke(
    {"messages": [{"role": "user", "content": "Transfer me to another agent"}]},
    config=config)
print("Step 2:", result["messages"][-1].content)
print("Current agent:", result["current_agent"])