LangGraph: Advanced Recipes & Real-World Patterns
LangGraph: Advanced Recipes & Real-World Patterns
Updated for LangGraph 1.2.4 (June 2026)
This guide includes recipes demonstrating the latest v1.2.4 features:
- Node Caching for performance
- Deferred Nodes for fan-in patterns
- Pre/Post Model Hooks for LLM customization
- Cross-Thread Memory for persistent context
- Tools State Updates for dynamic behavior
- Command Tool for edgeless flows
- InjectedState + InjectedStore for context-aware tools (Recipe 9)
- Overwrite for resetting accumulated channels (Recipe 10)
- CheckpointTuple for checkpoint history browsing and time-travel (Recipe 11)
- update_state / StateUpdate for human-in-the-loop approval flows (Recipe 12)
Recipe 1: RAG System with Quality Control
Retrieval-Augmented Generation with automatic re-retrieval and refinement:
import json
from typing import Annotated

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Send
from typing_extensions import TypedDict
class RAGState(TypedDict):
    # Current user question; refine_query may overwrite it with a rephrasing.
    question: str
    # Message log; add_messages appends updates instead of replacing the list.
    messages: Annotated[list, add_messages]
    retrieved_docs: list[dict]
    # Fraction of retrieved documents graded relevant.
    relevance_score: float
    # Number of query refinements performed so far.
    generation_attempt: int
    final_answer: str
    source_citations: list[str]
def retrieve_documents(state: RAGState) -> dict:
    """Fetch the top 5 knowledge-base documents for the current question.

    Also appends a system message noting how many documents came back.
    """
    # Use semantic search
    found = semantic_search(query=state["question"], index="knowledge_base", top_k=5)
    status_note = {"role": "system", "content": f"Retrieved {len(found)} documents"}
    return {"retrieved_docs": found, "messages": [status_note]}
def grade_documents(state: RAGState) -> dict:
    """Keep only the retrieved documents that the LLM grades as relevant.

    Returns:
        ``retrieved_docs``: the documents that passed.
        ``relevance_score``: the fraction that passed (0 when nothing was
        retrieved).
    """
    docs = state["retrieved_docs"]
    question = state["question"]

    # Use LLM to grade
    graded = []
    for doc in docs:
        grade_prompt = f""" Question: {question} Document: {doc['content']}
Is this document relevant to the question? Start your answer with "yes" or "no", then explain your reasoning. """

        response = model.invoke(grade_prompt)
        # Judge only the leading verdict. A substring test ("yes" in text)
        # also fires on "yes" inside the explanation of a "no" answer.
        # Strip markdown/quote characters the model may put before the verdict.
        verdict = response.content.strip().lstrip("*_#`\"' ").lower()
        if verdict.startswith("yes"):
            graded.append(doc)

    avg_relevance = len(graded) / len(docs) if docs else 0

    return {"retrieved_docs": graded, "relevance_score": avg_relevance}
def decide_strategy(state: RAGState) -> str:
    """Choose the next step after grading.

    Returns "generate" when more than 70% of the documents were relevant.
    Otherwise returns "refine_query" while fewer than two refinements have
    been tried, and "escalate" after that.
    """
    score, attempts = state["relevance_score"], state["generation_attempt"]

    if score > 0.7:
        return "generate"
    if attempts < 2:
        return "refine_query"
    return "escalate"
def refine_query(state: RAGState) -> dict:
    """Ask the LLM to rephrase the question after an unsuccessful retrieval.

    Returns:
        ``question``: the rephrased text.
        ``generation_attempt``: incremented; decide_strategy uses it to stop
        looping.
        ``messages``: an assistant message logging the refinement.
    """
    original_query = state["question"]

    refine_prompt = f""" Original question: {original_query}
The retrieval wasn't successful. Rephrase the question to: 1. Be more specific 2. Include key terms 3. Clarify the intent
Provide only the refined question. """

    response = model.invoke(refine_prompt)
    # Trim surrounding whitespace/newlines so the next retrieval gets a clean query.
    refined_query = response.content.strip()

    return {
        "question": refined_query,
        # Default to 0 so a caller that omits the counter does not hit KeyError.
        "generation_attempt": state.get("generation_attempt", 0) + 1,
        "messages": [{"role": "assistant", "content": f"Query refined: {refined_query}"}],
    }
def generate_answer(state: RAGState) -> dict:
    """Answer the question from the graded documents and report citations.

    Documents are numbered "Source N" in the prompt. A document is cited when
    its ``[N]`` marker appears in the model's reply.
    """
    question = state["question"]
    docs = state["retrieved_docs"]

    sections = []
    for number, doc in enumerate(docs, start=1):
        sections.append(f"Source {number} ({doc.get('title', 'Unknown')}):\n{doc['content']}")
    context = "\n\n".join(sections)

    prompt = f""" Question: {question}
Context from documents: {context}
Provide a comprehensive answer based on the context. Cite sources by referencing the source numbers (e.g., [1], [2]). """

    answer = model.invoke(prompt).content

    # Extract citations
    citations = [
        f"{number}. {doc.get('title', 'Unknown')}"
        for number, doc in enumerate(docs, start=1)
        if f"[{number}]" in answer
    ]

    return {
        "final_answer": answer,
        "source_citations": citations,
        "messages": [{"role": "assistant", "content": answer}],
    }
def escalate(state: RAGState) -> dict:
    """Hand the question to a human after retrieval keeps failing.

    Sets ``source_citations`` to an empty list so every terminal path writes
    it. Otherwise ``result['source_citations']`` raises KeyError on this path.
    """
    return {
        "final_answer": "Unable to find relevant information. Escalating to human support.",
        "source_citations": [],
        "messages": [{"role": "assistant", "content": "This question requires human expert review."}],
    }
# Build RAG graph
rag_builder = StateGraph(RAGState)
rag_builder.add_node("retrieve", retrieve_documents)
rag_builder.add_node("grade", grade_documents)
rag_builder.add_node("generate", generate_answer)
rag_builder.add_node("refine", refine_query)
rag_builder.add_node("escalate", escalate)

# retrieve -> grade, then branch on decide_strategy's verdict
rag_builder.add_edge(START, "retrieve")
rag_builder.add_edge("retrieve", "grade")
rag_builder.add_conditional_edges(
    "grade",
    decide_strategy,
    {
        "generate": "generate",
        "refine_query": "refine",  # router key differs from the node name
        "escalate": "escalate",
    },
)

# Terminal branches finish the run; a refinement loops back to retrieval.
rag_builder.add_edge("generate", END)
rag_builder.add_edge("escalate", END)
rag_builder.add_edge("refine", "retrieve")  # Loop back

rag_graph = rag_builder.compile(checkpointer=InMemorySaver())
# Usageresult = rag_graph.invoke({ "question": "How do I set up LangGraph?", "generation_attempt": 0})
print(f"Answer: {result['final_answer']}")print(f"Sources: {result['source_citations']}")Recipe 2: Customer Support Ticket Classifier & Router
Classify support tickets and route them to the appropriate handler:
from enum import Enum
class TicketPriority(Enum):
    """Urgency level assigned to a ticket; CRITICAL tickets are escalated."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class TicketCategory(Enum):
    """Topic of a ticket; selects which handler answers it."""

    BILLING = "billing"
    TECHNICAL = "technical"
    FEATURE_REQUEST = "feature_request"
    BUG = "bug"
    OTHER = "other"
class SupportTicketState(TypedDict):
    # Ticket details supplied by the caller.
    ticket_id: str
    customer_email: str
    subject: str
    description: str
    # Filled in by classify_ticket.
    priority: TicketPriority
    category: TicketCategory
    # Filled in by whichever handler answers the ticket.
    assigned_to: str
    response: str
    needs_escalation: bool
def classify_ticket(state: SupportTicketState) -> dict:
    """Classify a ticket's priority and category with the LLM.

    The model is asked for a JSON object. If the reply cannot be parsed, or
    names an unknown priority or category, the ticket falls back to
    MEDIUM / OTHER.

    Returns:
        dict with ``priority`` (TicketPriority) and ``category`` (TicketCategory).
    """
    ticket_text = f"{state['subject']}\n{state['description']}"

    classification_prompt = f""" Classify this support ticket:
{ticket_text}
Respond with JSON: {{ "priority": "low|medium|high|critical", "category": "billing|technical|feature_request|bug|other", "summary": "one line summary" }} """

    response = model.invoke(classification_prompt)

    raw = response.content.strip()
    if raw.startswith("```"):
        # Models often wrap JSON in a ```json ... ``` fence; unwrap it first.
        raw = raw.strip("`").strip()
        if raw.lower().startswith("json"):
            raw = raw[len("json"):]

    # Parse JSON response (json is the module-level import)
    try:
        result = json.loads(raw)
        priority = TicketPriority[result["priority"].upper()]
        category = TicketCategory[result["category"].upper()]
    except (ValueError, KeyError, TypeError, AttributeError):
        # ValueError: not JSON. KeyError: missing field or unknown enum name.
        # TypeError/AttributeError: unexpected JSON shape or non-string value.
        # Catching these specifically (not a bare except) keeps real bugs such
        # as NameError or KeyboardInterrupt visible.
        priority = TicketPriority.MEDIUM
        category = TicketCategory.OTHER

    return {"priority": priority, "category": category}
def route_ticket(state: SupportTicketState) -> str:
    """Return the handler key for a classified ticket.

    Critical tickets escalate regardless of category. Otherwise the category
    selects the handler, and unmapped categories go to the general handler.
    """
    if state["priority"] == TicketPriority.CRITICAL:
        return "escalate"

    handler_for = {
        TicketCategory.BILLING: "billing_handler",
        TicketCategory.TECHNICAL: "tech_handler",
        TicketCategory.BUG: "bug_handler",
    }
    return handler_for.get(state["category"], "general_handler")
def _answer_with_model(prompt: str, team: str) -> dict:
    """Send *prompt* to the LLM and assign the ticket to *team*.

    Shared by the category handlers below, which differ only in prompt and
    owning team.
    """
    response = model.invoke(prompt)
    return {"response": response.content, "assigned_to": team}


def billing_handler(state: SupportTicketState) -> dict:
    """Handle billing issues."""
    prompt = f""" Customer billing inquiry: {state['description']}
Provide helpful guidance on billing. """
    return _answer_with_model(prompt, "billing-team")


def tech_handler(state: SupportTicketState) -> dict:
    """Handle technical issues."""
    prompt = f""" Technical support request: {state['description']}
Provide step-by-step technical guidance. Suggest debugging steps. """
    return _answer_with_model(prompt, "technical-support")


def bug_handler(state: SupportTicketState) -> dict:
    """Handle bug reports."""
    prompt = f""" Bug report: {state['description']}
Acknowledge the bug. Request additional information if needed. Provide workaround if available. """
    return _answer_with_model(prompt, "engineering")


def general_handler(state: SupportTicketState) -> dict:
    """Handle general inquiries."""
    prompt = f""" Customer inquiry: {state['description']}
Provide helpful response. """
    return _answer_with_model(prompt, "general-support")
def escalate(state: SupportTicketState) -> dict:
    """Draft a management escalation brief for a critical ticket.

    Assigns the ticket to management and flags it as escalated.
    """
    priority_label = state['priority'].value
    category_label = state['category'].value
    escalation_prompt = f""" Critical support ticket needs immediate attention:
Priority: {priority_label} Category: {category_label} Description: {state['description']}
Prepare escalation brief for management. """

    brief = model.invoke(escalation_prompt).content
    return {"response": brief, "assigned_to": "management", "needs_escalation": True}
# Build support graph
support_builder = StateGraph(SupportTicketState)
support_builder.add_node("classify", classify_ticket)
support_builder.add_node("billing", billing_handler)
support_builder.add_node("technical", tech_handler)
support_builder.add_node("bug", bug_handler)
support_builder.add_node("general", general_handler)
support_builder.add_node("escalate", escalate)

# classify first, then route_ticket's key picks exactly one handler node
support_builder.add_edge(START, "classify")
support_builder.add_conditional_edges(
    "classify",
    route_ticket,
    {
        "billing_handler": "billing",
        "tech_handler": "technical",
        "bug_handler": "bug",
        "general_handler": "general",
        "escalate": "escalate",
    },
)

# Every handler is terminal.
support_builder.add_edge("billing", END)
support_builder.add_edge("technical", END)
support_builder.add_edge("bug", END)
support_builder.add_edge("general", END)
support_builder.add_edge("escalate", END)

support_graph = support_builder.compile(checkpointer=InMemorySaver())
# Usageresult = support_graph.invoke({ "ticket_id": "TKT-001", "customer_email": "user@example.com", "subject": "Billing charge issue", "description": "I was charged twice this month. Please help!"})
print(f"Priority: {result['priority'].value}")print(f"Category: {result['category'].value}")print(f"Response: {result['response']}")print(f"Assigned to: {result['assigned_to']}")Recipe 3: Research Agent with Parallel Data Sources
Gather information from multiple sources in parallel:
from langgraph.types import Sendfrom datetime import datetime
import operator


class ResearchState(TypedDict):
    """State for the parallel research graph.

    parallel_search starts one search task per (source, query) pair, and all
    of those tasks run in the same step. LangGraph rejects several writes to
    a reducer-less key within one step. The three ``*_results`` channels
    therefore use ``operator.add`` to concatenate every task's list.
    """

    topic: str
    research_queries: list[str]
    web_results: Annotated[list[dict], operator.add]
    academic_results: Annotated[list[dict], operator.add]
    news_results: Annotated[list[dict], operator.add]
    synthesized_report: str
    citations: list[dict]


def _query_of(state: dict) -> str:
    """Return the Send payload's query, or the research topic if there is none.

    The fallback is looked up lazily. Send payloads carry only ``query``, and
    ``state.get("query", state["topic"])`` evaluates ``state["topic"]`` first,
    which raises KeyError.
    """
    return state.get("query") or state["topic"]


def generate_queries(state: ResearchState) -> dict:
    """Ask the LLM for search queries covering the topic.

    If the reply is not a JSON list of non-empty strings, falls back to
    searching the raw topic, so the fan-out always has at least one query.
    """
    prompt = f""" Topic: {state['topic']}
Generate 3 different search queries to thoroughly research this topic: 1. General overview 2. Recent developments 3. Expert perspectives
Return as JSON array of queries. """

    response = model.invoke(prompt)

    try:
        parsed = json.loads(response.content)
    except ValueError:
        parsed = []
    queries = [q for q in parsed if isinstance(q, str) and q.strip()] if isinstance(parsed, list) else []

    return {"research_queries": queries or [state["topic"]]}


def parallel_search(state: ResearchState) -> list[Send]:
    """Create parallel search tasks: every query is sent to every source."""
    return [
        Send("web_search", {"query": q}) for q in state["research_queries"]
    ] + [
        Send("academic_search", {"query": q}) for q in state["research_queries"]
    ] + [
        Send("news_search", {"query": q}) for q in state["research_queries"]
    ]


def web_search(state: ResearchState) -> dict:
    """Search the general web via Tavily; returns up to 5 results."""
    import os

    # The tavily-python package exports TavilyClient (there is no ``Client``).
    from tavily import TavilyClient

    client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    results = client.search(query=_query_of(state), include_answer=True)

    return {"web_results": results["results"][:5]}  # Top 5


def academic_search(state: ResearchState) -> dict:
    """Search academic sources; returns up to 5 results."""
    # Use academic API or service
    results = semantic_scholar_search(query=_query_of(state), limit=5)

    return {"academic_results": results}


def news_search(state: ResearchState) -> dict:
    """Search recent news via NewsAPI; returns up to 5 articles."""
    import os

    # Use news API
    from newsapi import NewsApiClient

    newsapi = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))

    # NewsAPI accepts sort_by in {"relevancy", "popularity", "publishedAt"};
    # "publishedAt" gives newest first.
    results = newsapi.get_everything(q=_query_of(state), sort_by="publishedAt", language="en")

    return {"news_results": results["articles"][:5]}


def synthesize_report(state: ResearchState) -> dict:
    """Synthesize all research into a report and collect citations.

    Missing fields are tolerated: Tavily results carry ``content`` (not
    ``snippet``), and NewsAPI ``publishedAt`` may be null.
    """
    web = state.get("web_results", [])
    academic = state.get("academic_results", [])
    news = state.get("news_results", [])

    web_summary = "\n".join(f"- {r.get('title', 'Unknown')}: {r.get('content', '')}" for r in web[:3])
    academic_summary = "\n".join(f"- {r.get('title', 'Unknown')}" for r in academic[:3])
    news_summary = "\n".join(
        f"- {r.get('title', 'Unknown')} ({(r.get('publishedAt') or '')[:10]})" for r in news[:3]
    )

    synthesis_prompt = f""" Topic: {state['topic']}
Web search results: {web_summary}
Academic research: {academic_summary}
Recent news: {news_summary}
Write a comprehensive research report synthesizing these sources. Include: 1. Overview 2. Key findings 3. Recent developments 4. Expert insights 5. Implications """

    response = model.invoke(synthesis_prompt)

    # Compile citations
    citations = [
        {
            "title": source.get("title", "Unknown"),
            "url": source.get("link") or source.get("url"),
            "date": (source.get("publishedAt") or "Unknown")[:10],
        }
        for source_list in (web, academic, news)
        for source in source_list[:3]
    ]

    return {"synthesized_report": response.content, "citations": citations}


# Build research graph
research_builder = StateGraph(ResearchState)
research_builder.add_node("generate_queries", generate_queries)
research_builder.add_node("web_search", web_search)
research_builder.add_node("academic_search", academic_search)
research_builder.add_node("news_search", news_search)
research_builder.add_node("synthesize", synthesize_report)

research_builder.add_edge(START, "generate_queries")

# Parallel search: parallel_search returns one Send per (source, query) pair.
# The list names the possible destinations.
research_builder.add_conditional_edges(
    "generate_queries",
    parallel_search,
    ["web_search", "academic_search", "news_search"],
)

# Gather and synthesize: all search tasks run in the same step, so
# "synthesize" runs once, after every one of them has finished.
research_builder.add_edge("web_search", "synthesize")
research_builder.add_edge("academic_search", "synthesize")
research_builder.add_edge("news_search", "synthesize")
research_builder.add_edge("synthesize", END)

research_graph = research_builder.compile()
# Usageresult = research_graph.invoke({"topic": "AI safety 2024"})
print(result["synthesized_report"])print("\nCitations:")for i, cite in enumerate(result["citations"][:5], 1): print(f"{i}. {cite['title']} ({cite['date']})")Recipe 4: Agentic Loop with Tool Calling
An autonomous agent that reasons and acts:
from langgraph.prebuilt import ToolNode, tools_conditionfrom langchain_core.tools import tool
# Define specialized tools
# NOTE: with @tool, each docstring becomes the tool description the model sees.
@tool
def search_knowledge_base(query: str) -> str:
    """Search internal knowledge base."""
    hits = db.search_documents(query, limit=3)
    # One hit's content per line, top 3 only.
    return "\n".join(hit["content"] for hit in hits)
@tool
def check_inventory(product_id: str) -> dict:
    """Check product inventory."""
    # Stub: always reports 50 units in stock for the requested product.
    return dict(product_id=product_id, in_stock=True, quantity=50)
@tool
def submit_order(user_id: str, items: list[dict]) -> dict:
    """Submit an order."""
    # uuid is not imported anywhere in this recipe, so import it here.
    import uuid

    # Stub: order id is "ORD-" plus 8 random uppercase hex chars. Status and
    # total are fixed placeholders; user_id and items are not used yet.
    order_id = f"ORD-{uuid.uuid4().hex[:8].upper()}"
    return {"order_id": order_id, "status": "confirmed", "total": 99.99}
@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send email notification."""
    # Stub: no email is actually sent; subject and body are ignored.
    confirmation = f"Email sent to {to}"
    return confirmation
tools = [
    search_knowledge_base,
    check_inventory,
    submit_order,
    send_email,
]

# Create agent model. ChatAnthropic comes from the langchain-anthropic
# package and is not imported anywhere else in this recipe.
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-20241022")
# Build custom agent for more control
class AgentState(TypedDict):
    # Conversation transcript; add_messages appends new messages.
    messages: Annotated[list, add_messages]
    user_request: str
    # Text content of the most recent model response.
    reasoning: str
    # Whether the most recent model response requested tool calls.
    action_taken: bool
def agent_reasoning_node(state: AgentState) -> dict:
    """Let the model reason over the conversation and optionally call tools.

    The system prompt is prepended for this call only; it is not stored in
    state. The tools are bound to the model so it can emit tool calls, which
    tools_condition routes to the ToolNode. Without bind_tools the model never
    sees the tool schemas, and the tools branch is unreachable.

    Returns:
        ``messages``: the model response, appended to the transcript.
        ``reasoning``: the response text.
        ``action_taken``: whether the response requested tool calls.
    """
    system_prompt = """You are a helpful shopping assistant. Analyze user requests and decide what actions to take. Use available tools to help the customer."""

    llm_with_tools = model.bind_tools(tools)
    response = llm_with_tools.invoke([{"role": "system", "content": system_prompt}] + list(state["messages"]))

    # Capture reasoning
    has_tool_calls = bool(getattr(response, "tool_calls", None))

    return {"messages": [response], "reasoning": response.content, "action_taken": has_tool_calls}
# Build agentic graph
agent_builder = StateGraph(AgentState)
agent_builder.add_node("reasoning", agent_reasoning_node)
agent_builder.add_node("tools", ToolNode(tools))

agent_builder.add_edge(START, "reasoning")

# Use tools_condition for automatic routing: to "tools" when the last
# message requested tool calls, otherwise finish.
agent_builder.add_conditional_edges(
    "reasoning",
    tools_condition,
    {"tools": "tools", END: END},
)

# After tools run, hand their results back to the model.
agent_builder.add_edge("tools", "reasoning")  # Loop back

agentic_graph = agent_builder.compile(checkpointer=InMemorySaver())
# Usageconfig = {"configurable": {"thread_id": "customer-123"}}
result = agentic_graph.invoke({ "messages": [{ "role": "user", "content": "I want to buy 2 units of product ABC123" }], "user_request": "Purchase items", "reasoning": ""}, config=config)
print("Final response:", result["messages"][-1].content)Recipe 5: Document Processing Pipeline
Multi-stage document processing with quality checks:
from enum import Enum
class DocumentType(Enum):
    """Input format; PDF and DOCX get a text-extraction step first."""

    PDF = "pdf"
    DOCX = "docx"
    JSON = "json"
    TEXT = "text"
class ProcessingState(TypedDict):
    # Input document.
    document_id: str
    document_content: str
    document_type: DocumentType
    # Output of each pipeline stage (extract -> validate -> enrich).
    extraction_result: dict
    validation_result: dict
    enrichment_result: dict
    # Stage status string, e.g. "extracted", "extraction_failed", "complete".
    processing_status: str
    error_message: str
def extract_content(state: ProcessingState) -> dict:
    """Extract structured content from the document with the LLM.

    PDF and DOCX inputs are converted to text first; other types are used
    as-is. Only the first 2000 characters are sent, to keep the prompt small.

    Returns ``extraction_result`` and ``processing_status="extracted"``. On any
    failure, returns ``error_message`` and ``processing_status="extraction_failed"``.
    """
    try:
        if state["document_type"] == DocumentType.PDF:
            content = extract_text_from_pdf(state["document_content"])
        elif state["document_type"] == DocumentType.DOCX:
            content = extract_text_from_docx(state["document_content"])
        else:
            content = state["document_content"]

        # The "first 2000 chars" note lives here rather than inside the
        # f-string, where it would be sent to the model as prompt text.
        extraction_prompt = f""" Extract structured information from this document:
{content[:2000]}
Extract: 1. Title 2. Key sections 3. Main topics 4. Metadata (author, date, etc)
Return as JSON. """

        response = model.invoke(extraction_prompt)
        extracted = json.loads(response.content)

        return {"extraction_result": extracted, "processing_status": "extracted"}

    except Exception as e:
        # Deliberate best-effort boundary: record the failure in state so
        # validate_content can short-circuit, instead of aborting the run.
        return {"error_message": str(e), "processing_status": "extraction_failed"}
def validate_content(state: ProcessingState) -> dict:
    """Ask the LLM whether the extracted content is complete and well-formed.

    Returns ``validation_result``, requested from the model as
    ``{"valid": bool, "issues": [...]}``. The LLM call is skipped when
    extraction already failed. When the content is judged invalid, or the
    reply is not a JSON object, also sets
    ``processing_status="validation_failed"`` and an ``error_message`` so the
    caller can report why processing stopped.
    """
    if state["processing_status"] == "extraction_failed":
        return {"validation_result": {"valid": False}}

    extracted = state["extraction_result"]

    # Uses the module-level json. A function-local `import json` placed
    # below this line would make `json` local to the whole function and
    # raise UnboundLocalError here.
    validation_prompt = f""" Validate this extracted content:
{json.dumps(extracted, indent=2)}
Check: 1. Completeness - all expected fields present 2. Accuracy - information makes sense 3. Format - proper structure
Return: {{"valid": true/false, "issues": ["list of issues"]}} """

    response = model.invoke(validation_prompt)

    try:
        validation = json.loads(response.content)
    except ValueError:
        validation = None
    if not isinstance(validation, dict):
        validation = {"valid": False, "issues": ["validator reply was not a JSON object"]}

    update = {"validation_result": validation}
    if not validation.get("valid"):
        issues = validation.get("issues") or []
        if isinstance(issues, str):
            issues = [issues]
        update["processing_status"] = "validation_failed"
        update["error_message"] = "; ".join(map(str, issues)) or "validation failed"
    return update
def enrich_content(state: ProcessingState) -> dict:
    """Add LLM-generated insights: summary, entities, topics, actions and risks.

    Returns an empty ``enrichment_result`` (status untouched) unless
    validation passed. On success sets ``processing_status="complete"``. If
    the reply is not valid JSON, reports ``enrichment_failed`` with an
    ``error_message`` instead of crashing the run.
    """
    if not state["validation_result"].get("valid"):
        return {"enrichment_result": {}}

    extracted = state["extraction_result"]

    # Uses the module-level json; a local `import json` after this line would
    # raise UnboundLocalError at the json.dumps call.
    enrichment_prompt = f""" Enrich this document with: 1. Summary 2. Key entities (people, organizations, concepts) 3. Related topics 4. Action items 5. Risk assessment (if applicable)
Content: {json.dumps(extracted, indent=2)}
Return as JSON. """

    response = model.invoke(enrichment_prompt)

    try:
        enrichment = json.loads(response.content)
    except ValueError as e:
        return {
            "enrichment_result": {},
            "processing_status": "enrichment_failed",
            "error_message": f"Could not parse enrichment JSON: {e}",
        }

    return {"enrichment_result": enrichment, "processing_status": "complete"}
# Build document processing pipeline
doc_builder = StateGraph(ProcessingState)
doc_builder.add_node("extract", extract_content)
doc_builder.add_node("validate", validate_content)
doc_builder.add_node("enrich", enrich_content)

# Strictly linear: each stage checks the previous stage's status itself.
doc_builder.add_edge(START, "extract")
doc_builder.add_edge("extract", "validate")
doc_builder.add_edge("validate", "enrich")
doc_builder.add_edge("enrich", END)

doc_pipeline = doc_builder.compile()
# Usageresult = doc_pipeline.invoke({ "document_id": "doc-001", "document_content": "Your PDF/document content here", "document_type": DocumentType.PDF})
if result["processing_status"] == "complete": print("Extraction:", result["extraction_result"]) print("Enrichment:", result["enrichment_result"])else: print("Error:", result["error_message"])Recipe 6: Conversation with Long-term Memory
Maintain user context across multiple conversations:
from datetime import datetime
class ConversationState(TypedDict):
    # Inputs for this turn.
    user_id: str
    message: str
    # Running transcript; add_messages appends rather than replaces.
    conversation_history: Annotated[list, add_messages]
    # Loaded from the long-term store before answering.
    user_profile: dict
    relevant_memories: list[dict]
    # The assistant's reply for this turn.
    response: str
async def fetch_user_profile(
    state: ConversationState,
    store: Annotated[AsyncPostgresStore, InjectedStore],
) -> dict:
    """Load the user's profile from ("users", user_id) / "profile".

    Users without a stored profile get a default one.
    """
    stored = await store.aget(("users", state["user_id"]), "profile")
    if stored:
        return {"user_profile": stored.value}

    fallback = {
        "name": "User",
        "preferences": {},
        "interests": [],
        "conversation_count": 0,
    }
    return {"user_profile": fallback}
async def retrieve_memories(
    state: ConversationState,
    store: Annotated[AsyncPostgresStore, InjectedStore],
) -> dict:
    """Retrieve up to 5 memories semantically related to the current message.

    Searches under ("users", user_id, "memories") and returns the stored
    values as ``relevant_memories``.
    """
    user_id = state["user_id"]
    current_message = state["message"]

    # Semantic search for relevant memories
    namespace = ("users", user_id, "memories")

    # BaseStore.asearch declares namespace_prefix as positional-only, so
    # passing it as a keyword raises TypeError.
    memories = await store.asearch(namespace, query=current_message, limit=5)

    return {"relevant_memories": [m.value for m in memories]}
def build_context(state: ConversationState) -> str:
    """Render the user's profile and relevant memories as a text block.

    Not a graph node: chat_node calls it to build its system prompt, so it
    returns a plain string (the previous ``-> dict`` annotation was wrong).
    """
    profile = state["user_profile"]
    memories = state["relevant_memories"]

    context_parts = [
        f"User: {profile.get('name', 'User')}",
        f"Interests: {', '.join(profile.get('interests', []))}",
    ]

    if memories:
        context_parts.append("Relevant context from past conversations:")
        for mem in memories:
            context_parts.append(f"- {mem.get('content', '')}")

    return "\n".join(context_parts)
def chat_node(state: ConversationState) -> dict:
    """Generate a personalised reply using the profile/memory context.

    The system prompt is passed as the first message: chat-model ``invoke``
    has no ``system_prompt`` parameter. Both the user's message and the reply
    are appended to ``conversation_history`` so later turns see the full
    exchange.
    """
    context = build_context(state)

    system_prompt = f""" You are a helpful assistant with knowledge of the user's history and preferences.
User Context: {context}
Be personalized and reference relevant past context when appropriate. """

    user_message = {"role": "user", "content": state["message"]}
    messages = (
        [{"role": "system", "content": system_prompt}]
        + list(state.get("conversation_history", []))
        + [user_message]
    )

    response = model.invoke(messages)

    return {
        "response": response.content,
        "conversation_history": [user_message, {"role": "assistant", "content": response.content}],
    }
async def save_memory(
    state: ConversationState,
    store: Annotated[AsyncPostgresStore, InjectedStore],
) -> dict:
    """Persist this exchange as a searchable memory and bump profile counters.

    Writes the memory under ("users", user_id, "memories"), with its
    "content" field indexed for semantic search. Then increments
    ``conversation_count`` and sets ``last_conversation`` on the profile at
    ("users", user_id) / "profile". Returns no state update.
    """
    # uuid is not imported anywhere in this recipe, so import it here.
    import uuid

    user_id = state["user_id"]
    namespace = ("users", user_id, "memories")

    memory_entry = {
        "timestamp": datetime.now().isoformat(),
        "user_message": state["message"],
        "assistant_response": state["response"],
        "content": f"User: {state['message']}\nAssistant: {state['response']}",
    }

    memory_id = f"mem-{uuid.uuid4().hex[:8]}"

    await store.aput(namespace, memory_id, memory_entry, index=["content"])  # Index for semantic search

    # Update conversation count
    profile_namespace = ("users", user_id)
    profile = await store.aget(profile_namespace, "profile")
    profile_data = profile.value if profile else {}
    profile_data["conversation_count"] = profile_data.get("conversation_count", 0) + 1
    profile_data["last_conversation"] = datetime.now().isoformat()

    await store.aput(profile_namespace, "profile", profile_data)

    return {}
# Build conversational graph with memory
conv_builder = StateGraph(ConversationState)
conv_builder.add_node("fetch_profile", fetch_user_profile)
conv_builder.add_node("retrieve_memories", retrieve_memories)
conv_builder.add_node("chat", chat_node)
conv_builder.add_node("save_memory", save_memory)

# Linear flow: profile -> memories -> reply -> persist
conv_builder.add_edge(START, "fetch_profile")
conv_builder.add_edge("fetch_profile", "retrieve_memories")
conv_builder.add_edge("retrieve_memories", "chat")
conv_builder.add_edge("chat", "save_memory")
conv_builder.add_edge("save_memory", END)

# NOTE(review): `store` (the long-term store instance, presumably an
# AsyncPostgresStore) is not created in this recipe; define it before this line.
conversation_graph = conv_builder.compile(
    store=store,  # Pass long-term store
)
# Usageconfig = {"configurable": {"thread_id": "user-alice"}}
result = conversation_graph.invoke({ "user_id": "alice", "message": "What was I asking about last time?"}, config=config)
print(result["response"])Performance Optimization Tips
Tip 1: Lazy Evaluation
Section titled “Tip 1: Lazy Evaluation”# Bad - eager evaluationdef slow_node(state): all_results = [expensive_operation(i) for i in range(1000)] return {"results": all_results}
# Good - lazy evaluationdef fast_node(state): def results_generator(): for i in range(1000): yield expensive_operation(i)
return {"results": results_generator()}Tip 2: Efficient State Updates
Section titled “Tip 2: Efficient State Updates”# Bad - rebuilds entire listreturn {"items": state["items"] + [new_item]}
# Good - append reducerclass State(TypedDict): items: Annotated[list, lambda x, y: x + y]
return {"items": [new_item]} # Automatically appendedTip 3: Streaming for Real-Time Feedback
Section titled “Tip 3: Streaming for Real-Time Feedback”# Stream intermediate results to clientasync def stream_processing(): async for event in graph.astream( {"query": "Process this"}, stream_mode="updates" ): # Send each update to client via WebSocket await websocket.send_json(event) yield eventRecipe 7: Cached Multi-Agent Research System (v1.2.1)
Uses: CachePolicy, InMemoryCache, InMemoryStore, parallel Send fan-out
# Correct imports — all verified against langgraph==1.2.1import operatorfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send, RetryPolicy, CachePolicyfrom langgraph.store.memory import InMemoryStorefrom langgraph.cache.memory import InMemoryCachefrom langgraph.checkpoint.memory import InMemorySaverfrom langgraph.runtime import Runtime
# ---------------------------------------------------------------------------# State schema# ---------------------------------------------------------------------------
class ResearchState(TypedDict):
    user_id: str
    topic: str
    search_queries: list[str]
    # Parallel researcher tasks each contribute a one-item list; operator.add
    # concatenates all of them into this channel.
    research_results: Annotated[list[dict], operator.add]
    final_report: str


class WorkerState(TypedDict):
    """Narrow input for one parallel researcher task (a Send payload)."""

    query: str
# ---------------------------------------------------------------------------# Nodes# ---------------------------------------------------------------------------
def load_prefs_and_plan(state: ResearchState, runtime: Runtime) -> dict:
    """Plan search queries for the topic using the user's stored preferences.

    Reads ("users", user_id) / "research_prefs" from the runtime store when
    one is configured. Otherwise uses defaults (3 queries).
    """
    store = runtime.store
    user_id = state["user_id"]

    # Read stored preferences (or use defaults)
    saved = store.get(("users", user_id), "research_prefs") if store else None
    if saved:
        prefs = saved.value
    else:
        prefs = {"depth": "standard", "max_queries": 3}  # depth: or "comprehensive"

    topic = state["topic"]
    how_many = prefs.get("max_queries", 3)
    # In production: call an LLM to generate queries
    return {"search_queries": [f"{topic} - aspect {k}" for k in range(1, how_many + 1)]}
def fan_out(state: ResearchState) -> list:
    """Conditional edge: launch one researcher per query in parallel.

    Returns a list of Send objects, one per query. With no queries (e.g. a
    stored ``max_queries`` of 0) there would be nothing to send and the run
    would stop before synthesise, leaving no report. In that case it routes
    straight to "synthesise" instead.
    """
    queries = state["search_queries"]
    if not queries:
        return ["synthesise"]
    return [Send("researcher", {"query": q}) for q in queries]
def researcher(state: WorkerState) -> dict:
    """Research a single query; runs in parallel, one task per Send.

    Results are merged into ``research_results`` via the operator.add reducer.
    The stub source URL percent-encodes the query (quote_plus), so characters
    such as '&', '=' or '#' cannot corrupt the query string. Replacing only
    spaces with '+' would leave them unescaped.
    """
    from urllib.parse import quote_plus

    query = state["query"]
    # In production: call a search API or LLM here
    result = {
        "query": query,
        "summary": f"[stub] findings for '{query}'",
        "sources": [f"https://example.com/search?q={quote_plus(query)}"],
    }
    return {"research_results": [result]}
def synthesise(state: ResearchState, runtime: Runtime) -> dict:
    """Merge all parallel findings into a markdown report.

    When a store is configured, also saves the report under
    ("users", user_id, "reports") keyed by topic.
    """
    store = runtime.store
    findings = state["research_results"]

    bullet_lines = [f"- {item['query']}: {item['summary']}" for item in findings]
    report = f"## Research Report: {state['topic']}\n\n" + "\n".join(bullet_lines)

    if store:
        # Persist for future look-up
        reports_ns = ("users", state["user_id"], "reports")
        store.put(reports_ns, state["topic"], {"report": report, "query_count": len(findings)})

    return {"final_report": report}
# ---------------------------------------------------------------------------# Graph construction# ---------------------------------------------------------------------------
builder = StateGraph(ResearchState)

builder.add_node("plan", load_prefs_and_plan)

# Retry transient errors; cache results for 10 minutes per unique query
researcher_retry = RetryPolicy(max_attempts=3, initial_interval=0.5)
researcher_cache = CachePolicy(ttl=600)
builder.add_node(
    "researcher",
    researcher,
    retry_policy=researcher_retry,
    cache_policy=researcher_cache,
)

builder.add_node("synthesise", synthesise)

builder.add_edge(START, "plan")
builder.add_conditional_edges("plan", fan_out)  # dynamic fan-out
builder.add_edge("researcher", "synthesise")
builder.add_edge("synthesise", END)

# Pass both store (long-term memory) and cache (node result caching)
store = InMemoryStore()
cache = InMemoryCache()
checkpointer = InMemorySaver()

research_graph = builder.compile(checkpointer=checkpointer, store=store, cache=cache)
# ---------------------------------------------------------------------------# Usage# ---------------------------------------------------------------------------
cfg = {"configurable": {"thread_id": "research-session-1"}}
result = research_graph.invoke( {"user_id": "alice", "topic": "AI in healthcare", "search_queries": [], "research_results": [], "final_report": ""}, cfg,)print(result["final_report"])
# Second run: identical queries hit the cache (CachePolicy ttl=600)result2 = research_graph.invoke( {"user_id": "alice", "topic": "AI in healthcare", "search_queries": [], "research_results": [], "final_report": ""}, cfg,)print(result2["final_report"]) # same content, returned from cacheRecipe 8: Smart Shopping Assistant with Pre/Post Model Hooks (v1.2.1)
Uses: create_react_agent with pre_model_hook / post_model_hook, Command-returning tools, custom state schema
# Correct imports — verified against langgraph==1.2.1 / langgraph-prebuilt==1.1.0import operatorfrom typing import Annotated, Anyfrom typing_extensions import TypedDictfrom langchain_core.messages import AnyMessage, SystemMessagefrom langchain_core.tools import toolfrom langgraph.graph.message import add_messagesfrom langgraph.prebuilt import create_react_agent, InjectedStatefrom langgraph.checkpoint.memory import InMemorySaverfrom langgraph.types import Command
# ---------------------------------------------------------------------------# State schema — extends the default MessagesState with shopping fields# ---------------------------------------------------------------------------
class ShoppingState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_id: str
    # cart and total use additive reducers. When the model calls add_to_cart
    # several times in one turn, ToolNode runs those calls in the same step;
    # a reducer-less key would reject the second write. Each call computing
    # a full cart from the same stale state would also lose items.
    cart: Annotated[list[dict], operator.add]
    total: Annotated[float, operator.add]
    tokens_used: int


# ---------------------------------------------------------------------------
# Tools — use InjectedState to read the graph state directly
# Return Command to update state alongside the tool message
# ---------------------------------------------------------------------------
from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId


@tool
def add_to_cart(
    product_id: str,
    product_name: str,
    price: float,
    quantity: int,
    state: Annotated[ShoppingState, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
    """Add a product to the shopping cart and update the total."""
    # `state` and `tool_call_id` are injected by ToolNode and hidden from the
    # model's tool schema. `state` is kept so the signature stays compatible.
    item = {"product_id": product_id, "name": product_name, "price": price, "qty": quantity}
    line_total = price * quantity
    return Command(
        update={
            # Deltas only: the reducers on ShoppingState append/add them.
            "cart": [item],
            "total": line_total,
            # A tool that returns Command must itself answer its tool call
            # with a ToolMessage; otherwise ToolNode raises an error.
            "messages": [
                ToolMessage(
                    content=f"Added {quantity} x {product_name} (${line_total:.2f}) to the cart.",
                    tool_call_id=tool_call_id,
                )
            ],
        },
        goto="agent",  # return control to the agent node
    )
@tool
def view_cart(state: Annotated[ShoppingState, InjectedState]) -> str:
    """Return a human-readable summary of the current cart."""
    # The docstring above is the tool description shown to the model.
    entries = state.get("cart", [])
    if not entries:
        return "Your cart is empty."

    listing = "\n".join(
        f"- {entry['name']} x{entry['qty']} ${entry['price'] * entry['qty']:.2f}"
        for entry in entries
    )
    grand_total = state.get("total", 0.0)
    return f"Cart:\n{listing}\n\nTotal: ${grand_total:.2f}"
@tool
def search_products(category: str) -> list[dict]:
    """Search available products in a category (stub)."""
    # Case-insensitive lookup; unknown categories yield an empty list.
    key = category.lower()
    if key == "laptop":
        return [
            {"id": "mbp-16", "name": "MacBook Pro 16", "price": 2499.0},
            {"id": "xps-15", "name": "Dell XPS 15", "price": 1799.0},
        ]
    if key == "monitor":
        return [{"id": "lg-27", "name": "LG 27\" 4K", "price": 499.0}]
    return []
# ---------------------------------------------------------------------------# Pre-model hook — inject cart context as a system message before every LLM call# ---------------------------------------------------------------------------
def shopping_context_hook(state: ShoppingState) -> dict:
    """Build this turn's LLM input with a cart-summary system message in front.

    pre_model_hook runs before every LLM call. Returning ``llm_input_messages``
    sends these messages to the model for this call only and leaves the stored
    history untouched. Returning ``{"messages": [...]}`` instead would make
    add_messages *append* a new SystemMessage to the end of the persisted
    history on every call, after the user's message.
    """
    cart = state.get("cart", [])
    total = state.get("total", 0.0)
    summary = (
        f"User {state.get('user_id', 'unknown')} | "
        f"Cart: {len(cart)} item(s), total ${total:.2f}. "
        "Help them find and add the best products for their needs."
    )
    return {"llm_input_messages": [SystemMessage(content=summary), *state["messages"]]}
# ---------------------------------------------------------------------------# Post-model hook — accumulate token usage after every LLM call# ---------------------------------------------------------------------------
def track_tokens(state: ShoppingState) -> dict:
    """Add the newest message's input+output token counts to ``tokens_used``.

    Intended as a post_model_hook, where the newest message is the model's
    reply. Messages without ``usage_metadata`` contribute zero.
    """
    latest = state["messages"][-1]
    usage = getattr(latest, "usage_metadata", None) or {}
    spent = sum(usage.get(field, 0) for field in ("input_tokens", "output_tokens"))
    return {"tokens_used": state.get("tokens_used", 0) + spent}
# ---------------------------------------------------------------------------# Build the agent# ---------------------------------------------------------------------------
tools = [add_to_cart, view_cart, search_products]

# Replace with a real LLM in production:
# from langchain_anthropic import ChatAnthropic
# llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
# NOTE(review): a bare MagicMock is not a langchain Runnable, so
# create_react_agent may call the mock object itself (as a callable / dynamic
# model) rather than llm.invoke, in which case the AIMessage configured below is
# never returned — confirm this stub actually runs, or use a real chat model.
from unittest.mock import MagicMock
from langchain_core.messages import AIMessage
llm = MagicMock()
llm.bind_tools.return_value = llm
llm.invoke.return_value = AIMessage(content="Here are some laptop options for you.", tool_calls=[])

# Prebuilt ReAct loop over the shopping tools, with cart context injected
# before each model call and token usage recorded after it.
shopping_agent = create_react_agent(
    model=llm,
    tools=tools,
    state_schema=ShoppingState,
    pre_model_hook=shopping_context_hook,  # inject cart context
    post_model_hook=track_tokens,  # record token usage
    checkpointer=InMemorySaver(),
)
# ---------------------------------------------------------------------------# Usage# ---------------------------------------------------------------------------
# thread_id selects the conversation thread stored by the InMemorySaver checkpointer.
cfg = {"configurable": {"thread_id": "shopper-123"}}

# Seed every custom state key explicitly so the tools and hooks start from known values.
result = shopping_agent.invoke(
    {
        "messages": [("user", "I need a good laptop for programming. Show me options and add the best one.")],
        "user_id": "shopper-123",
        "cart": [],
        "total": 0.0,
        "tokens_used": 0,
    },
    cfg,
)

print(f"Cart: {result['cart']}")
print(f"Total: ${result['total']:.2f}")
print(f"Tokens used: {result['tokens_used']}")
print(f"Last reply: {result['messages'][-1].content}")
Recipe 9: Intelligent Workflow Coordinator (v1.2.1)
Section titled “Recipe 9: Intelligent Workflow Coordinator (v1.2.1)”Uses: Command routing, Send fan-out, defer=True node, RetryPolicy
# Correct imports — all verified against langgraph==1.2.1import operatorfrom typing import Annotated, Literalfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Command, Send, RetryPolicyfrom langgraph.checkpoint.memory import InMemorySaverfrom langgraph.cache.memory import InMemoryCache
# ---------------------------------------------------------------------------# State schema# ---------------------------------------------------------------------------
# Task categories matching the example tasks; not referenced by the graph code in this snippet.
TASK_TYPES = ["data", "training", "validation"]
class WorkflowState(TypedDict):
    """Shared state for the workflow-coordinator graph."""

    workflow_id: str
    tasks: list[dict]  # list of {"id":..., "type":..., "priority":...}
    completed: Annotated[list[str], operator.add]  # reducer: accumulate completed task ids
    failed: Annotated[list[str], operator.add]  # reducer: accumulate failed task ids
    workflow_status: str  # human-readable status written by coordinator / aggregate
class TaskState(TypedDict):
    """Narrow state injected into each worker by Send."""
    task_id: str  # id of the task this run_task instance executes
    task_type: str  # e.g. "data" / "training" / "validation" in the example input
# ---------------------------------------------------------------------------# Nodes# ---------------------------------------------------------------------------
def coordinator(state: WorkflowState) -> Command[Literal["fan_out", "__end__"]]:
    """Route to ``fan_out`` while tasks remain unfinished; otherwise finish.

    A task is finished once its id appears in either ``completed`` or
    ``failed``. When none are left, the run ends with a summary status.
    """
    completed = state.get("completed", [])
    failed = state.get("failed", [])
    finished = completed + failed
    pending = [task for task in state["tasks"] if task["id"] not in finished]

    if pending:
        return Command(goto="fan_out")

    status = f"done — {len(completed)} ok, {len(failed)} failed"
    return Command(goto=END, update={"workflow_status": status})
def fan_out(state: WorkflowState) -> list[Send]:
    """Emit one Send per unfinished task so run_task executes them in parallel."""
    finished = set(state.get("completed", []) + state.get("failed", []))
    pending = (task for task in state["tasks"] if task["id"] not in finished)
    return [
        Send("run_task", {"task_id": task["id"], "task_type": task["type"]})
        for task in pending
    ]
def run_task(state: TaskState) -> dict:
    """Execute one task (one parallel instance per Send).

    The returned id lands in ``completed`` or ``failed``; WorkflowState's
    operator.add reducers concatenate the results of all instances.
    """
    task_id, task_type = state["task_id"], state["task_type"]

    # Simulate work — replace with real logic. Failures are *returned*, not
    # raised, so the node's RetryPolicy never kicks in for them.
    import random

    if random.random() > 0.1:  # 10% artificial failure rate
        print(f" [task] {task_id} ({task_type}) OK")
        return {"completed": [task_id]}

    print(f" [task] {task_id} ({task_type}) FAILED")
    return {"failed": [task_id]}
# ---------------------------------------------------------------------------
# Graph construction
#
# Key features demonstrated:
# - coordinator uses Command for dynamic routing (no static edges from it)
# - fan_out returns list[Send] for variable-width parallelism
# - run_task uses retry_policy, so exceptions raised inside it are retried
#   automatically (the simulated failures are returned, not raised, so they
#   are recorded in 'failed' instead of being retried)
# - defer=True on the aggregate node means it runs AFTER all parallel run_task
#   instances in the same super-step finish
# ---------------------------------------------------------------------------
def aggregate(state: WorkflowState) -> dict:
    """Write a completed/failed tally into ``workflow_status``.

    Registered with defer=True, so it runs only once every parallel run_task
    instance has finished.
    """
    n_completed = len(state.get("completed", []))
    n_failed = len(state.get("failed", []))
    return {"workflow_status": f"aggregated: {n_completed} completed, {n_failed} failed"}
builder = StateGraph(WorkflowState)
builder.add_node("coordinator", coordinator)
# A node must return a state update (dict / Command), not list[Send]. So
# "fan_out" is a no-op node, and fan_out() is used below as the routing
# function that emits the Sends.
builder.add_node("fan_out", lambda state: {})
builder.add_node(
    "run_task",
    run_task,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=0.1),
)
builder.add_node(
    "aggregate",
    aggregate,
    defer=True,  # waits for all non-deferred nodes in the same super-step
)

builder.add_edge(START, "coordinator")
# coordinator reaches "fan_out" via its Command; fan_out()'s Sends then start
# one run_task per pending task with its own payload. A static
# fan_out -> run_task edge would start run_task without that payload.
builder.add_conditional_edges("fan_out", fan_out, ["run_task"])
builder.add_edge("run_task", "aggregate")
builder.add_edge("aggregate", "coordinator")  # loop back — coordinator will exit via END
# ---------------------------------------------------------------------------# Usage# ---------------------------------------------------------------------------
cache = InMemoryCache()graph = builder.compile( checkpointer=InMemorySaver(), cache=cache,)
cfg = {"configurable": {"thread_id": "workflow-001"}}result = graph.invoke( { "workflow_id": "ml-pipeline-001", "tasks": [ {"id": "data_task", "type": "data", "priority": 1}, {"id": "training_task", "type": "training", "priority": 2}, {"id": "validation_task", "type": "validation", "priority": 3}, ], "completed": [], "failed": [], "workflow_status": "pending", }, cfg, {"recursion_limit": 50},)
print(f"Workflow Status: {result['workflow_status']}")print(f"Completed tasks: {result['completed']}")print(f"Failed tasks: {result['failed']}")This collection of recipes covers real-world patterns you’ll encounter building production AI systems with LangGraph 1.2.1.
Adapt and combine them for your specific use cases!
Recipe 10: Long-Term Memory with Vector Search and InMemoryStore (v1.2.1)
Section titled “Recipe 10: Long-Term Memory with Vector Search and InMemoryStore (v1.2.1)”Uses: InMemoryStore with vector index, Runtime context injection, per-user memory namespaces
A complete chatbot that persists and retrieves user preferences using InMemoryStore with optional vector search. The Runtime object carries both the store and a typed context object, so nodes receive everything they need as a parameter instead of reading globals.
from langgraph.store.memory import InMemoryStorefrom langgraph.graph.message import MessagesState, add_messagesfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.runtime import Runtimefrom langchain_core.messages import HumanMessage, AIMessage, SystemMessagefrom typing import Annotatedfrom typing_extensions import TypedDictfrom dataclasses import dataclass
# Store with optional vector searchstore = InMemoryStore( index={ "dims": 1536, "embed": embed_fn, # your embedding function "fields": ["content"] })
# Typed context carried through the run@dataclassclass UserContext: user_id: str
class ChatState(MessagesState):
    """Graph state: only the message list inherited from MessagesState."""

    pass
def recall_memories(state: ChatState, runtime: Runtime[UserContext]) -> dict:
    """Load up to three memories relevant to the latest message.

    Runs a semantic search over the user's ``("memories", user_id)`` namespace,
    using the newest message's text as the query. Matches are returned as a
    SystemMessage, which add_messages appends to the history. Returns an empty
    update when nothing matches.
    """
    # Same fallback as save_memory: runtime.context is None when the caller
    # does not pass context=..., and reading .user_id on None would crash.
    user_id = runtime.context.user_id if runtime.context else "anon"

    # Get last user message for search
    messages = state["messages"]
    last_msg = messages[-1].content if messages else ""

    memories = runtime.store.search(
        ("memories", user_id),
        query=last_msg,
        limit=3
    )
    if not memories:
        return {}

    mem_text = "\n".join(f"- {m.value['content']}" for m in memories)
    return {"messages": [SystemMessage(content=f"User memories:\n{mem_text}")]}
def save_memory(state: ChatState, runtime: Runtime[UserContext]) -> dict:
    """Store the newest message's content (first 500 chars) as a memory.

    Writes under ``("memories", user_id)``, falling back to "anon" when no run
    context was supplied. The node itself never changes graph state.
    """
    user_id = "anon"
    if runtime.context:
        user_id = runtime.context.user_id

    history = state["messages"]
    if not history:
        return {}
    newest = history[-1]

    import uuid

    runtime.store.put(
        ("memories", user_id),
        str(uuid.uuid4()),
        {"content": newest.content[:500]}
    )
    return {}
def call_model(state: ChatState) -> dict:
    """Stand-in for the model call: always replies with a fixed AIMessage."""
    # model invocation — swap in a real chat model here
    placeholder = AIMessage(content="Response...")
    return {"messages": [placeholder]}
# context_schema declares the type of runtime.context that the nodes receive.
builder = StateGraph(ChatState, context_schema=UserContext)
builder.add_node("recall", recall_memories)
builder.add_node("agent", call_model)
builder.add_node("save", save_memory)

# Linear flow: recall memories -> call model -> save the reply -> end.
builder.add_edge(START, "recall")
builder.add_edge("recall", "agent")
builder.add_edge("agent", "save")
builder.add_edge("save", END)

# Attaching the store at compile time is what exposes it as runtime.store in nodes.
graph = builder.compile(store=store)
# Pass the run-scoped context with the ``context=`` argument; a value placed
# under config["configurable"]["context"] does not become runtime.context.
result = graph.invoke(
    {"messages": [HumanMessage("I prefer dark mode")]},
    context=UserContext(user_id="alice"),
)

print(result["messages"][-1].content)
# On subsequent runs the recalled memories are injected as a SystemMessage# before the agent node, so the model always has the user's preferences in# its context window.Key points:
- `InMemoryStore` accepts an `index` dict to enable semantic (vector) search via `store.search(..., query=...)`. Omit it for key-only lookup.
- `Runtime[UserContext]` is injected automatically by LangGraph when declared as a node parameter; it exposes `.context`, `.store`, and `.stream_writer`.
- `context_schema=UserContext` on `StateGraph` declares the type of the run-scoped context. Supply the value with `graph.invoke(..., context=UserContext(...))`; it is not read from `config["configurable"]`.
Recipe 11: Parallel Map-Reduce with Send and BinaryOperatorAggregate (v1.2.1)
Section titled “Recipe 11: Parallel Map-Reduce with Send and BinaryOperatorAggregate (v1.2.1)”Uses: Send for fan-out, Annotated[list, operator.add] reducer for fan-in, ToolRuntime inside a tool
A document-analysis pipeline that dispatches each document to a parallel analyze_doc node, then reduces all results in a single reduce_results node.
import operatorfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Sendfrom langgraph.prebuilt.tool_node import ToolRuntimefrom langgraph.store.base import BaseStorefrom langchain_core.tools import tool
# ── State ────────────────────────────────────────────────────────────────────
class PipelineState(TypedDict):
    """Top-level state for the document map-reduce pipeline."""

    documents: list[str]  # raw document texts to process
    # BinaryOperatorAggregate: each parallel branch appends its slice
    analysis_results: Annotated[list[dict], operator.add]
    summary: str  # written once by reduce_results
# ── Parallel branch state (sent via Send) ────────────────────────────────────
class DocState(TypedDict):
    """Per-branch input for analyze_doc, built from fan_out's Send payload."""

    doc_text: str  # the single document this branch analyses
    analysis_results: Annotated[list[dict], operator.add]  # same reducer as PipelineState
# ── Tool that uses ToolRuntime to access the store ───────────────────────────
@tool
def save_insight(
    doc_id: str,
    insight: str,
    runtime: ToolRuntime,
) -> str:
    """Persist an insight from a document into the shared store."""
    # Both the store and the stream writer are optional: skip whichever is absent.
    store = runtime.store
    if store:
        import uuid

        store.put(("insights",), str(uuid.uuid4()), {"doc_id": doc_id, "insight": insight})

    emit = runtime.stream_writer
    if emit:
        emit({"saved_insight": insight})

    return f"Saved insight for {doc_id}"
# ── Nodes ────────────────────────────────────────────────────────────────────
def fan_out(state: PipelineState) -> list[Send]:
    """Map step: one Send per document, each handled by analyze_doc in parallel."""
    documents = state["documents"]
    return [
        Send("analyze_doc", {"doc_text": text, "analysis_results": []})
        for text in documents
    ]
def analyze_doc(state: DocState) -> dict:
    """Analyse one document and emit a single-item result list.

    The one-element list is concatenated into ``analysis_results`` by the
    operator.add reducer.
    """
    text = state["doc_text"]
    # Simulated analysis — replace with real LLM / extraction logic
    sentiment = "positive" if "good" in text.lower() else "neutral"
    return {
        "analysis_results": [
            {"length": len(text), "preview": text[:80], "sentiment": sentiment}
        ]
    }
def reduce_results(state: PipelineState) -> dict:
    """Reduce step: summarise every per-document analysis.

    Runs after all analyze_doc branches have finished (fan-in), so
    ``analysis_results`` holds one entry per document.
    """
    results = state["analysis_results"]

    # max(..., 1) avoids ZeroDivisionError when no documents were supplied.
    avg_length = sum(r["length"] for r in results) / max(len(results), 1)
    # dict.fromkeys de-duplicates while keeping first-seen order. A set() would
    # make the printed order depend on per-process string-hash randomisation.
    sentiments = list(dict.fromkeys(r["sentiment"] for r in results))

    summary = (
        f"Processed {len(results)} documents. "
        f"Avg length: {avg_length:.0f} chars. "
        f"Sentiments: {', '.join(sentiments)}."
    )
    return {"summary": summary}
# ── Graph ────────────────────────────────────────────────────────────────────
builder = StateGraph(PipelineState)
builder.add_node("analyze_doc", analyze_doc)
builder.add_node("reduce_results", reduce_results)

# fan_out returns a list[Send] — LangGraph dispatches each in parallel
builder.add_conditional_edges(START, fan_out, ["analyze_doc"])

# All analyze_doc branches converge here
builder.add_edge("analyze_doc", "reduce_results")
builder.add_edge("reduce_results", END)
pipeline = builder.compile()
result = pipeline.invoke({ "documents": [ "This is a good product review.", "The service was okay.", "Great experience overall — good work!" ], "analysis_results": [], "summary": ""})
print(result["summary"])# Processed 3 documents. Avg length: 37 chars. Sentiments: positive, neutral.Key points:
- `Annotated[list, operator.add]` is a BinaryOperatorAggregate channel. Each parallel branch writes `{"analysis_results": [single_item]}` and LangGraph concatenates them automatically; no explicit merge node is needed.
- `Send("node_name", arg)` lets you dynamically create parallel branches at runtime. The `arg` payload becomes that branch's node input as-is (it is not merged with the parent graph's state), so include every key the node reads.
- `ToolRuntime` is injected by `ToolNode` when a `@tool` declares it as a parameter. It exposes `.store`, `.state`, `.stream_writer`, and `.tool_call_id` — giving tools read/write access to cross-thread memory without coupling them to a specific state schema.
Recipe 12: ToolRuntime All-In-One (v1.2.1)
Section titled “Recipe 12: ToolRuntime All-In-One (v1.2.1)”Uses: ToolRuntime for store access, streaming progress events, and tool_call_id correlation
A single tool that demonstrates every capability exposed by ToolRuntime: reading graph state, writing to the long-term store, emitting streaming progress, and tagging output with the originating tool call ID.
from langgraph.prebuilt.tool_node import ToolRuntimefrom langgraph.store.base import BaseStorefrom langgraph.prebuilt import ToolNode, create_react_agentfrom langchain_core.tools import toolfrom typing import Annotatedimport uuid
@tool
def research_and_remember(
    topic: str,
    runtime: ToolRuntime,
) -> str:
    """Research a topic and save findings to memory.

    Demonstrates all four ToolRuntime capabilities:
    1. runtime.state — read current graph state
    2. runtime.store — write to long-term cross-thread store
    3. runtime.stream_writer — emit custom streaming events
    4. runtime.tool_call_id — correlate results to the originating call
    """
    # The docstring above is the LLM-facing tool description; keep it stable.

    # Resolve the user from graph state; fall back to "anon" when there is no
    # state or it carries no user_id.
    graph_state = runtime.state
    user_id = graph_state.get("user_id", "anon") if graph_state else "anon"

    emit = runtime.stream_writer
    if emit:
        emit({"status": "researching", "topic": topic})

    # Simulated work — replace with real API calls
    findings = f"Key findings about {topic}: [placeholder — add real research logic]"

    # Persist under a per-user namespace when a store is attached
    store = runtime.store
    if store:
        store.put(
            ("research", user_id),
            str(uuid.uuid4()),
            {"topic": topic, "findings": findings}
        )

    # Tag the completion event with the tool_call_id so clients can match it
    # to the specific invocation.
    if emit:
        emit({
            "status": "saved",
            "tool_call_id": runtime.tool_call_id
        })

    return findings
# ── Wire the tool into a ReAct agent ─────────────────────────────────────────
from langgraph.store.memory import InMemoryStorefrom typing_extensions import TypedDictfrom langgraph.graph.message import add_messages
# In-memory long-term store intended for research_and_remember's runtime.store
# (contents live only as long as this process).
store = InMemoryStore()
from langgraph.managed import RemainingSteps


class AgentState(TypedDict):
    """Agent state: chat history plus the user id read by research_and_remember."""

    messages: Annotated[list, add_messages]
    user_id: str  # namespaces research findings in the store
    # create_react_agent refuses a custom state_schema that does not declare this
    # key (ValueError "Missing required key(s)"); LangGraph manages its value.
    remaining_steps: RemainingSteps
agent = create_react_agent( model=model, # your ChatAnthropic / ChatOpenAI etc. tools=[research_and_remember], state_schema=AgentState,)
# Stream the run — custom events emitted by stream_writer appear in the# "custom" stream mode alongside the standard message events.for event in agent.stream( { "messages": [{"role": "user", "content": "Research quantum computing"}], "user_id": "alice", }, stream_mode=["updates", "custom"], config={"store": store},): print(event)Key points:
- `ToolRuntime` is automatically injected by `ToolNode` / `create_react_agent` — declare it as a parameter typed `ToolRuntime` and LangGraph wires it up; never pass it manually from your own code.
- `runtime.stream_writer` accepts any JSON-serialisable dict. These events are surfaced when the caller uses `stream_mode="custom"` (or a list that includes `"custom"`).
- `runtime.store` is the store instance attached to the graph via `compile(store=...)` or `create_react_agent(store=...)`, so tools share the same persistent memory as graph nodes. Putting a store in the invoke `config` dict does not set it.
- `runtime.tool_call_id` matches the `id` field on the `ToolCall` that triggered this invocation — useful for correlating streaming progress events to a specific call when multiple tool calls fire in parallel.
Recipe 9: State-Aware Shopping Agent with InjectedState + InjectedStore
Section titled “Recipe 9: State-Aware Shopping Agent with InjectedState + InjectedStore”This recipe demonstrates how InjectedState and InjectedStore let tools access graph state and persistent storage without exposing internal details to the LLM.
from typing import Any, Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, END, MessagesStatefrom langgraph.prebuilt import ToolNode, InjectedState, InjectedStore, tools_condition, create_react_agentfrom langgraph.store.memory import InMemoryStorefrom langchain_core.tools import toolfrom langchain_anthropic import ChatAnthropic
# Extend MessagesState with domain-specific fieldsclass ShopState(MessagesState): user_id: str cart: list[dict] # [{"id": str, "name": str, "price": float}] user_tier: str # "standard" | "premium"
# ── Tools ────────────────────────────────────────────────────────────────────
from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId
from langgraph.types import Command


@tool
def add_to_cart(
    product_id: str,
    product_name: str,
    price: float,
    # Injected from state — invisible to LLM
    cart: Annotated[list, InjectedState("cart")],
    user_tier: Annotated[str, InjectedState("user_tier")],
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
    """Add a product to the shopping cart."""
    discount = 0.20 if user_tier == "premium" else 0.0
    final_price = price * (1 - discount)
    # Actually record the item. Returning only a string would report success
    # while ShopState.cart stayed unchanged.
    new_cart = cart + [{"id": product_id, "name": product_name, "price": final_price}]
    summary = (
        f"Added {product_name!r} at ${final_price:.2f}"
        f"{' (20% premium discount applied)' if discount else ''}. "
        f"Cart now has {len(new_cart)} item(s)."
    )
    # NOTE(review): ShopState.cart has no reducer, so two add_to_cart calls in the
    # same model turn would both write "cart" in one step, which LangGraph rejects.
    # Give cart a reducer if parallel tool calls are expected.
    return Command(
        update={
            "cart": new_cart,
            # A tool returning Command must include the ToolMessage answering its call.
            "messages": [ToolMessage(summary, tool_call_id=tool_call_id)],
        }
    )
@tool
def view_cart(
    state: Annotated[dict, InjectedState()],
) -> str:
    """View the current cart contents."""
    # The whole graph state is injected (not exposed to the LLM).
    tier = state.get("user_tier", "standard")
    cart = state.get("cart", [])
    if not cart:
        return f"Cart is empty ({tier} account)."
    rows = "\n".join(f" • {entry['name']}: ${entry['price']:.2f}" for entry in cart)
    return f"Cart ({tier}, {len(cart)} items):\n{rows}"
@tool
def save_preference(
    key: str,
    value: str,
    user_id: Annotated[str, InjectedState("user_id")],
    store: Annotated[Any, InjectedStore()],
) -> str:
    """Save a shopping preference (e.g. favourite brand, size)."""
    # One namespace per user, so preferences never leak across shoppers.
    namespace = ("preferences", user_id)
    store.put(namespace, key, {"value": value})
    return f"Saved preference: {key} = {value}"
@tool
def get_preference(
    key: str,
    user_id: Annotated[str, InjectedState("user_id")],
    store: Annotated[Any, InjectedStore()],
) -> str:
    """Retrieve a previously saved shopping preference."""
    found = store.get(("preferences", user_id), key)
    if found is not None:
        return f"Your {key}: {found.value['value']}"
    return f"No preference found for '{key}'."
# ── Agent setup ───────────────────────────────────────────────────────────────
tools = [add_to_cart, view_cart, save_preference, get_preference]
# bind_tools advertises the tool schemas to the model; InjectedState /
# InjectedStore parameters are left out of those schemas.
model = ChatAnthropic(model="claude-3-5-haiku-20241022").bind_tools(tools)
persistent_store = InMemoryStore()
def agent_node(state: ShopState) -> dict:
    """Call the tool-bound model on the full message history."""
    ai_reply = model.invoke(state["messages"])
    return {"messages": [ai_reply]}
builder = StateGraph(ShopState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
# tools_condition: go to "tools" when the model requested tool calls, else END.
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")

# The store given to compile() is what ToolNode injects into InjectedStore() parameters.
graph = builder.compile(store=persistent_store)
# ── Usage ─────────────────────────────────────────────────────────────────────
from langchain_core.messages import HumanMessage
result = graph.invoke({
    "messages": [HumanMessage("Add some Sony headphones for $299 to my cart")],
    "user_id": "user-42",
    "cart": [],
    "user_tier": "premium",
})
print(result["messages"][-1].content)
# The last message is the model's final reply (tools -> agent edge), typically
# relaying the tool result, e.g.:
# "Added 'Sony headphones' at $239.20 (20% premium discount applied). Cart now has 1 item(s)."
Recipe 10: Resetting Accumulated State with Overwrite
Section titled “Recipe 10: Resetting Accumulated State with Overwrite”When a node needs to replace an accumulator channel rather than append to it, use Overwrite to bypass the reducer entirely.
import operatorfrom typing import Annotated, Literalfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Overwritefrom langgraph.checkpoint.memory import InMemorySaver
class PipelineState(TypedDict):
    """Batch-pipeline state with two append-only (operator.add) channels."""

    batch_id: str
    events: Annotated[list[str], operator.add]  # accumulates across nodes
    errors: Annotated[list[str], operator.add]  # accumulates errors; cleared only via Overwrite
    phase: str  # last phase written: "processed" | "errored" | "reset" (or the initial input)
# Normal accumulation — appends to events
def process_batch(state: PipelineState) -> dict:
    """Append a "processed:<batch>" event and mark the phase as processed."""
    return {"events": [f"processed:{state['batch_id']}"], "phase": "processed"}
# Appends error detail
def handle_error(state: PipelineState) -> dict:
    """Record an error entry plus a matching event; phase becomes "errored"."""
    batch_id = state["batch_id"]
    error_entry = f"error in batch {batch_id}"
    return {"errors": [error_entry], "events": [f"error:{batch_id}"], "phase": "errored"}
# Clears both lists — hard reset before re-processing
def reset_state(state: PipelineState) -> dict:
    """Replace (not append to) events and errors via Overwrite, bypassing operator.add."""
    batch_id = state["batch_id"]
    fresh_events = Overwrite(value=[f"reset:{batch_id}"])
    cleared_errors = Overwrite(value=[])
    return {"events": fresh_events, "errors": cleared_errors, "phase": "reset"}
def route(state: PipelineState) -> Literal["handle_error", "__end__"]:
    """Pick the node that runs after "process".

    Batches with recorded errors go to handle_error, which flows into
    reset_state through a static edge. Every other batch ends the run.
    """
    # reset_state is deliberately not a direct target here. Listing it in the
    # Literal would make add_conditional_edges declare a process -> reset_state
    # branch that is never taken.
    if state["errors"]:
        return "handle_error"
    return "__end__"
builder = StateGraph(PipelineState)
builder.add_node("process", process_batch)
builder.add_node("handle_error", handle_error)
builder.add_node("reset_state", reset_state)
builder.add_edge(START, "process")
# route() picks the node after "process"; its Literal return type lists the candidate targets.
builder.add_conditional_edges("process", route)
builder.add_edge("handle_error", "reset_state")
builder.add_edge("reset_state", END)

# With a checkpointer, every invoke/stream call must supply a thread_id in its config.
graph = builder.compile(checkpointer=InMemorySaver())
# Run with a batch that will have an error, then reset# In practice you'd trigger the error condition via routing logicresult = graph.invoke({ "batch_id": "batch-007", "events": ["initial"], "errors": [], "phase": "pending",})print(result["events"]) # depends on routing; after reset → ["reset:batch-007"]print(result["errors"]) # [] — overwritten by reset_stateKey rule for Overwrite
Section titled “Key rule for Overwrite”Only one node may Overwrite a given channel per super-step. If two concurrent nodes both return Overwrite(...) for the same channel, LangGraph raises InvalidUpdateError.
Recipe 11: Checkpoint History Browser with CheckpointTuple
Section titled “Recipe 11: Checkpoint History Browser with CheckpointTuple”Use CheckpointTuple to build a debugging tool that inspects every state the graph passed through, and supports rewinding to any historical step.
from langgraph.checkpoint.memory import InMemorySaverfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import StateUpdatefrom typing_extensions import TypedDict
class WorkflowState(TypedDict):
    """State for the single-node drafting graph used to demo checkpoint history."""

    input: str  # topic to draft about
    draft: str  # latest draft text written by draft_step
    score: float  # simulated quality score
    revision: int  # incremented by draft_step on each run
def draft_step(state: WorkflowState) -> dict:
    """Produce the next draft, bumping the revision and a simulated score."""
    rev = state["revision"]
    next_rev = rev + 1
    return {
        "draft": f"Draft #{next_rev} for: {state['input']}",
        "revision": next_rev,
        "score": 0.5 + rev * 0.1,
    }
saver = InMemorySaver()
builder = StateGraph(WorkflowState)
builder.add_node("draft", draft_step)
builder.add_edge(START, "draft")
builder.add_edge("draft", END)
graph = builder.compile(checkpointer=saver)

# All runs below share this thread, so their checkpoints accumulate in one history.
config = {"configurable": {"thread_id": "audit-demo"}}

# Run three times to build up history. Each run supplies fresh input with
# revision=0 (the channels have no reducers), so every run ends at revision 1.
for _ in range(3):
    graph.invoke({"input": "AI trends", "draft": "", "score": 0.0, "revision": 0}, config)
# ── Browse checkpoint history ─────────────────────────────────────────────────
print("=== Checkpoint History ===")
# saver.list() yields CheckpointTuple objects for this thread, newest first.
checkpoints = list(saver.list(config))
for i, cp in enumerate(checkpoints):
    meta = cp.metadata
    vals = cp.checkpoint.get("channel_values", {})
    print(
        f"[{i}] source={meta.get('source')!r:8} "
        f"step={meta.get('step'):3} "
        f"revision={vals.get('revision', '?')} "
        f"score={vals.get('score', '?')}"
    )

# ── Time-travel: rewind to an earlier step ──────────────────────────────────

# Pick the second-oldest checkpoint (the list is newest-first, so index -1 is the oldest)
target_cp = checkpoints[-2]
past_state = graph.get_state(target_cp.config)
print(f"\nRewound to revision={past_state.values['revision']}, score={past_state.values['score']}")

# Continue from that historical point (forks the thread)
resumed = graph.invoke(None, target_cp.config)
print(f"After resume: revision={resumed['revision']}, score={resumed['score']}")

# ── Filter by source ─────────────────────────────────────────────────────────

# Only checkpoints written during execution steps (metadata source == "loop").
loop_checkpoints = list(saver.list(config, filter={"source": "loop"}))
print(f"\nLoop checkpoints: {len(loop_checkpoints)}")
Recipe 12: Human-in-the-Loop Approval with update_state and StateUpdate
Section titled “Recipe 12: Human-in-the-Loop Approval with update_state and StateUpdate”Pause the graph at a sensitive step, let a human review and modify state, then resume:
from typing import Literalfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.checkpoint.memory import InMemorySaverfrom langgraph.types import StateUpdate
class ApprovalState(TypedDict):
    """State for the generate -> (human review) -> execute approval flow."""

    request: str  # what the action is for
    draft_action: str  # proposal written by generate_action
    approved: bool  # set True by the human reviewer via update_state
    reviewer_note: str  # free-text note from the reviewer
    result: str  # outcome written by execute_action
def generate_action(state: ApprovalState) -> dict:
    """Generate a proposed action (requires human approval before executing)."""
    proposal = f"Transfer $10,000 for: {state['request']}"
    # Always start unapproved; only the human reviewer flips this flag.
    return {"draft_action": proposal, "approved": False}
def execute_action(state: ApprovalState) -> dict:
    """Execute the approved action, or report cancellation when not approved."""
    if state["approved"]:
        return {
            "result": f"Executed: {state['draft_action']}. Note: {state['reviewer_note']}"
        }
    return {"result": "Cancelled — not approved."}
saver = InMemorySaver()
builder = StateGraph(ApprovalState)
builder.add_node("generate", generate_action)
builder.add_node("execute", execute_action)
builder.add_edge(START, "generate")
builder.add_edge("generate", "execute")  # interrupted here in practice
builder.add_edge("execute", END)

# Interrupt AFTER generate so the human sees the draft before execute runs.
# The checkpointer is required so the paused run can be resumed later.
graph = builder.compile(checkpointer=saver, interrupt_after=["generate"])
config = {"configurable": {"thread_id": "approval-thread"}}
# Step 1: Start the graph — it pauses after "generate"graph.invoke( {"request": "vendor invoice #1234", "draft_action": "", "approved": False, "reviewer_note": "", "result": ""}, config,)
# Step 2: Human reviews draft_action via get_statestate = graph.get_state(config)print("Draft action:", state.values["draft_action"])# → "Transfer $10,000 for: vendor invoice #1234"
# Step 3: Human approves (and optionally edits)graph.update_state( config, { "approved": True, "reviewer_note": "Verified invoice matches PO-5678", # Human can also change draft_action here if needed }, as_node="generate", # treat this as if generate emitted the update)
# Step 4: Resume — execute now runsfinal = graph.invoke(None, config)print("Result:", final["result"])# → "Executed: Transfer $10,000 for: vendor invoice #1234. Note: Verified invoice matches PO-5678"
# ── Bulk update: apply multiple edits atomically ─────────────────────────────
# For multi-field updates you want transactional, use bulk_update_state:graph.bulk_update_state( config, [ [StateUpdate({"approved": True}, as_node="generate")], [StateUpdate({"reviewer_note": "All clear"}, as_node="generate")], ],)Recipe 13: ToolRuntime — unified runtime injection
Section titled “Recipe 13: ToolRuntime — unified runtime injection”`ToolRuntime` is the single dataclass injected into any tool that declares `runtime: ToolRuntime`. It bundles what the older `InjectedState` / `InjectedStore` annotations provide (`state`, `store`) into one parameter, and additionally exposes `tool_call_id`, `config`, `context`, and `stream_writer`.
import datetimefrom typing import Annotatedfrom typing_extensions import TypedDictfrom langchain_core.messages import AnyMessagefrom langchain_core.tools import toolfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.graph.message import add_messagesfrom langgraph.prebuilt import ToolNode, tools_conditionfrom langgraph.prebuilt.tool_node import ToolRuntimefrom langgraph.store.memory import InMemoryStorefrom langgraph.checkpoint.memory import InMemorySaverfrom langchain_anthropic import ChatAnthropic
class WorkflowState(TypedDict):
    """Agent state for the research-notes example."""

    messages: Annotated[list[AnyMessage], add_messages]  # chat history merged by add_messages
    user_id: str  # namespaces notes in the store; tools read it via runtime.state
    session_start: str  # ISO timestamp supplied by the caller at invoke time
@tool
def save_research_note(topic: str, content: str, runtime: ToolRuntime) -> str:
    """Save a research note to long-term memory for the current user.

    Args:
        topic: Short topic label for the note.
        content: The note content to persist.
    """
    user_id = runtime.state["user_id"]  # read from graph state
    call_id = runtime.tool_call_id  # unique per invocation; reused as the store key

    # Persist to the store — survives across threads
    runtime.store.put(
        ("research", user_id),
        call_id,
        {
            "topic": topic,
            "content": content,
            # datetime.utcnow() is deprecated since Python 3.12 and returns a
            # naive datetime; record an explicit timezone-aware UTC timestamp.
            "saved_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        },
    )

    # Stream a custom event visible to any stream consumer
    runtime.stream_writer({"note_saved": {"topic": topic, "call_id": call_id}})

    return f"Note on '{topic}' saved (id={call_id[:8]})"
@tool
def list_notes(runtime: ToolRuntime) -> str:
    """List all research notes saved for the current user."""
    # Notes are stored under ("research", <user_id>) by save_research_note.
    user_id = runtime.state["user_id"]
    if runtime.store is None:
        return "Long-term store is not configured; no notes available."

    # BaseStore.search returns at most `limit` items (default 10). Page through
    # with `offset` so the tool really lists *all* notes, as documented.
    page_size = 100
    items = []
    while True:
        page = runtime.store.search(
            ("research", user_id), limit=page_size, offset=len(items)
        )
        items.extend(page)
        if len(page) < page_size:
            break

    if not items:
        return "No notes saved yet."
    # Truncate each body to 60 chars so the tool result stays compact.
    return "\n".join(
        f"- {item.value['topic']}: {item.value['content'][:60]}" for item in items
    )
# Long-term store handed to every tool as runtime.store (wired in at compile time).
store = InMemoryStore()
# Tools the model may call, and the chat model bound to their schemas.
tools = [save_research_note, list_notes]
model = ChatAnthropic(model="claude-opus-4-8").bind_tools(tools)
def call_model(state: WorkflowState) -> dict:
    """Ask the tool-bound model for its next message given the history so far."""
    reply = model.invoke(state["messages"])
    return {"messages": [reply]}
graph = StateGraph(WorkflowState)graph.add_node("agent", call_model)graph.add_node("tools", ToolNode(tools))graph.add_edge(START, "agent")graph.add_conditional_edges("agent", tools_condition)graph.add_edge("tools", "agent")
app = graph.compile(checkpointer=InMemorySaver(), store=store)
result = app.invoke( { "messages": [{"role": "user", "content": "Save a note: LangGraph uses Pregel-style supersteps"}], "user_id": "alice", "session_start": datetime.datetime.utcnow().isoformat(), }, config={"configurable": {"thread_id": "alice-session-1"}},)Recipe 14: ToolNode.wrap_tool_call — interceptor for retries, caching, and auditing
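wrap_tool_call is a synchronous interceptor that wraps every tool execution inside a ToolNode. It receives the ToolCallRequest and an execute callable. Call execute(request) to run the tool, possibly more than once for retries. Alternatively, return a ToolMessage yourself to short-circuit the call, as the cache and rate-limit paths below do. Use it for cross-cutting concerns (rate-limiting, audit logging, result caching, retries) without modifying each tool individually.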
import hashlibimport jsonimport timefrom collections import defaultdictfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langchain_core.messages import AnyMessage, ToolMessagefrom langchain_core.tools import toolfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.graph.message import add_messagesfrom langgraph.prebuilt import ToolNode, tools_conditionfrom langgraph.prebuilt.tool_node import ToolCallRequestfrom langchain_anthropic import ChatAnthropic
# ── Simple in-process cache, rate limiter, and audit log ─────────────────────

# Tunables: max live calls per tool per rolling minute, and cache lifetime.
RATE_LIMIT_PER_MINUTE = 10
CACHE_TTL_SECONDS = 120

# cache key -> (tool result content, time it was cached)
_result_cache: dict[str, tuple[str, float]] = {}
# One dict per intercepted tool call, appended by the wrapper.
_call_log: list[dict] = []
# tool name -> timestamps of recent live calls (sliding rate-limit window)
_call_counts: dict[str, list[float]] = defaultdict(list)
def observability_wrapper(request: ToolCallRequest, execute) -> ToolMessage:
    """Layer caching, rate-limiting, and audit logging onto every tool call.

    Args:
        request: The intercepted call; ``request.tool_call`` carries the
            ``name``, ``args`` and ``id`` chosen by the model.
        execute: Continuation supplied by ToolNode. It must be called with the
            (possibly modified) request to actually run the tool.

    Returns:
        A cached ToolMessage, a rate-limit ToolMessage, or whatever
        ``execute(request)`` returned.
    """
    tool_name = request.tool_call["name"]
    args = request.tool_call["args"]
    call_id = request.tool_call["id"]
    now = time.time()

    def _log(source: str) -> None:
        # One uniform record shape for every path, so the audit log is easy to query.
        _call_log.append(
            {"call_id": call_id, "tool": tool_name, "args": args, "source": source, "timestamp": now}
        )

    # ── 1. Cache lookup ──────────────────────────────────────────────────────
    # Checked before rate limiting: a cache hit never reaches the underlying
    # tool, so it should not consume that tool's rate-limit budget.
    cache_key = hashlib.md5(
        json.dumps({"tool": tool_name, "args": args}, sort_keys=True).encode()
    ).hexdigest()
    cached = _result_cache.get(cache_key)
    if cached is not None:
        cached_content, cached_at = cached
        if now - cached_at < CACHE_TTL_SECONDS:
            _log("cache")
            return ToolMessage(content=cached_content, tool_call_id=call_id, name=tool_name)
        # Expired: evict so the cache does not grow without bound.
        del _result_cache[cache_key]

    # ── 2. Rate limiting (rolling 60 s window per tool) ──────────────────────
    window = [t for t in _call_counts[tool_name] if now - t < 60]
    _call_counts[tool_name] = window
    if len(window) >= RATE_LIMIT_PER_MINUTE:
        _log("rate_limited")
        return ToolMessage(
            content=f"Rate limit hit for '{tool_name}'. Retry in {60 - (now - window[0]):.0f}s.",
            tool_call_id=call_id,
            name=tool_name,
            status="error",
        )
    _call_counts[tool_name].append(now)

    # ── 3. Execute and cache result ──────────────────────────────────────────
    # ToolNode's handler takes the request as its argument.
    result = execute(request)
    # Cache only successful ToolMessages: a Command result has no `.content`,
    # and caching an error would replay the failure for the whole TTL.
    if isinstance(result, ToolMessage) and getattr(result, "status", "success") != "error":
        _result_cache[cache_key] = (result.content, now)
    _log("live")
    return result
# ── Tools ────────────────────────────────────────────────────────────────────
@tool
def get_exchange_rate(from_currency: str, to_currency: str) -> str:
    """Get the exchange rate between two currencies."""
    # Replace with a real FX API in production
    rates = {"USD_EUR": 0.92, "EUR_USD": 1.09, "GBP_USD": 1.27}
    # Normalise codes so "usd" or " USD " still match the table keys.
    src = from_currency.strip().upper()
    dst = to_currency.strip().upper()
    if src == dst:
        return f"1 {src} = 1 {dst}"
    rate = rates.get(f"{src}_{dst}")
    if rate is None:
        # Say so explicitly; "1 X = unknown Y" reads like a real quote to the model.
        return f"No exchange rate available for {src} -> {dst}"
    return f"1 {src} = {rate} {dst}"
@tool
def get_country_info(country: str) -> str:
    """Get basic information about a country."""
    # Static demo data keyed by exact (case-sensitive) country name.
    known_countries = {
        "France": "Capital: Paris, Currency: EUR, Population: 68M",
        "UK": "Capital: London, Currency: GBP, Population: 67M",
    }
    if country in known_countries:
        return known_countries[country]
    return f"No data for {country}"
class AgentState(TypedDict):
    """Minimal agent state: just the running conversation."""

    # User, model and tool messages; the add_messages reducer merges each
    # node's returned messages into this list instead of overwriting it.
    messages: Annotated[list[AnyMessage], add_messages]
# Tools the model may call.
tools = [get_exchange_rate, get_country_info]
# Chat model bound to the tool schemas so it can emit tool calls.
model = ChatAnthropic(model="claude-opus-4-8").bind_tools(tools)
# Every call executed by this node is routed through observability_wrapper.
tool_node = ToolNode(
    tools,
    wrap_tool_call=observability_wrapper,
)
def call_model(state: AgentState) -> dict:
    """Invoke the tool-bound model on the conversation and append its reply."""
    history = state["messages"]
    ai_message = model.invoke(history)
    return {"messages": [ai_message]}
graph = StateGraph(AgentState)graph.add_node("agent", call_model)graph.add_node("tools", tool_node)graph.add_edge(START, "agent")graph.add_conditional_edges("agent", tools_condition)graph.add_edge("tools", "agent")
app = graph.compile()result = app.invoke({"messages": [{"role": "user", "content": "USD to EUR rate?"}]})
# Second call uses cacheresult2 = app.invoke({"messages": [{"role": "user", "content": "USD to EUR exchange rate again?"}]})print(f"Audit log entries: {len(_call_log)}")Recipe 15: Custom channels — BinaryOperatorAggregate, Topic, and EphemeralValue
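Custom channels give you precise control over how state fields merge when multiple nodes write to them in the same step, and over how long a written value stays visible. BinaryOperatorAggregate folds concurrent writes with a reducer. Topic collects every write from a step into a list. EphemeralValue holds a value only for the step after it was written.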
import operatorfrom typing import Annotated, Sequencefrom typing_extensions import TypedDictfrom langgraph.channels.binop import BinaryOperatorAggregatefrom langgraph.channels.topic import Topicfrom langgraph.channels.ephemeral_value import EphemeralValuefrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send
# ── Reducer functions ─────────────────────────────────────────────────────────
def keep_max_confidence(a: float, b: float) -> float:
    """Reducer for best_confidence: fold two writes into the larger score.

    The existing value ``a`` wins ties (and a NaN on the right), exactly as
    ``max(a, b)`` would.
    """
    return b if b > a else a
# ── State definition with three custom channel types ─────────────────────────
class EnsembleState(TypedDict):
    """State for the ensemble-classification recipe, one field per channel type."""

    document: str

    # BinaryOperatorAggregate: each parallel classifier writes a score and
    # keep_max_confidence folds the writes, so only the maximum survives.
    best_confidence: Annotated[float, keep_max_confidence]

    # Topic (step-scoped): every label written in one step is collected into a
    # list that the following step can read (aggregate reads it here); the list
    # is reset at the next step boundary instead of growing across steps.
    # NOTE(review): exact reset timing per langgraph.channels.topic — confirm.
    this_step_labels: Annotated[Sequence[str], Topic(str)]

    # EphemeralValue: the value tokenize writes is readable by the NEXT step
    # only and is cleared afterwards unless rewritten. In this graph the
    # classifiers actually receive token_count through the Send payload built
    # by fan_out, and it is gone by the time aggregate runs.
    # NOTE(review): do not rely on this field for checkpoint persistence.
    token_count: Annotated[int, EphemeralValue(int)]

    # Standard list accumulation using operator.add
    all_labels: Annotated[list[str], operator.add]
# ── Nodes ────────────────────────────────────────────────────────────────────
def tokenize(state: EnsembleState) -> dict:
    """Write the whitespace-delimited word count of the document to token_count."""
    word_count = len(state["document"].split())
    return {"token_count": word_count}
def classifier_a(state: EnsembleState) -> dict:
    """Fast shallow classifier: confidence grows with document length."""
    n_tokens = state.get("token_count", 0)
    # 0.5 base plus 0.01 per token, capped at 0.85.
    score = 0.5 + n_tokens * 0.01
    if score > 0.85:
        score = 0.85
    return {
        "best_confidence": score,
        "this_step_labels": "invoice",  # a single value; Topic collects it
        "all_labels": ["invoice"],
    }
def classifier_b(state: EnsembleState) -> dict:
    """Slow deep classifier stub: always reports a financial document at 0.93."""
    label = "financial_document"
    return {"best_confidence": 0.93, "this_step_labels": label, "all_labels": [label]}
def classifier_c(state: EnsembleState) -> dict:
    """Rule-based classifier: keyword match on the lower-cased document."""
    lowered = state["document"].lower()
    is_invoice = any(keyword in lowered for keyword in ("invoice", "amount due"))
    if is_invoice:
        label, score = "invoice", 0.70
    else:
        label, score = "unknown", 0.30
    return {
        "best_confidence": score,
        "this_step_labels": label,
        "all_labels": [label],
    }
def aggregate(state: EnsembleState) -> dict:
    """Report what the parallel classifiers produced in the previous step.

    Returns an empty update; this node only prints diagnostics.
    """
    # Topic channel: labels from all three classifiers written in the previous
    # step, e.g. ["invoice", "financial_document", "invoice"]. An empty Topic is
    # presumably absent from state, so default to [] rather than KeyError.
    labels = list(state.get("this_step_labels", []))
    # best_confidence is the max across all three classifiers (reducer-folded).
    print(f"Labels this step: {labels}")
    print(f"Best confidence : {state['best_confidence']}")
    print(f"token_count (ephemeral, should be 0 or missing here): {state.get('token_count')}")
    return {}
def fan_out(state: EnsembleState):
    """Dispatch the same state snapshot to every classifier so they run in parallel."""
    targets = ("classifier_a", "classifier_b", "classifier_c")
    return [Send(target, state) for target in targets]
# ── Graph ─────────────────────────────────────────────────────────────────────
graph = StateGraph(EnsembleState)graph.add_node("tokenize", tokenize)graph.add_node("classifier_a", classifier_a)graph.add_node("classifier_b", classifier_b)graph.add_node("classifier_c", classifier_c)graph.add_node("aggregate", aggregate)
graph.add_edge(START, "tokenize")graph.add_conditional_edges("tokenize", fan_out, ["classifier_a", "classifier_b", "classifier_c"])graph.add_edge("classifier_a", "aggregate")graph.add_edge("classifier_b", "aggregate")graph.add_edge("classifier_c", "aggregate")graph.add_edge("aggregate", END)
app = graph.compile()result = app.invoke({ "document": "Invoice #1234 — Amount due: $4,500", "best_confidence": 0.0, "this_step_labels": [], "token_count": 0, "all_labels": [],})print(f"All labels accumulated: {result['all_labels']}")Recipe 16: NamedBarrierValue — fan-in that waits for named contributors
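NamedBarrierValue is the channel behind LangGraph's waiting fan-in: it becomes readable only after every name in its set has been written. StateGraph creates one automatically when you pass a list of sources to add_edge([...], target). That list-source edge is what holds target back until all sources have finished. Declaring a NamedBarrierValue field in the state schema additionally records completion in state. However, a state field alone does not schedule or block nodes; graph edges still decide when a node runs.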
from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.channels.named_barrier_value import NamedBarrierValuefrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send
# Names each validator writes into the barrier channel; the barrier is
# complete only once every one of them has been written.
REQUIRED_VALIDATORS = {
    "auth_check",
    "quota_check",
    "schema_check",
}
class APIGatewayState(TypedDict):
    """State for the API-gateway recipe: the incoming request plus the response."""

    request_body: dict  # parsed payload; check_schema requires "action" and "payload"
    user_id: str  # caller identity, checked by check_auth and check_quota

    # Barrier channel: each validator writes its own name here, and the field
    # only becomes readable once every name in REQUIRED_VALIDATORS has been
    # written. NOTE(review): a channel in the schema does not by itself delay
    # any node — scheduling comes from edges. Writing a name outside the set
    # presumably raises (per langgraph NamedBarrierValue) — confirm.
    validation_barrier: Annotated[str, NamedBarrierValue(str, REQUIRED_VALIDATORS)]

    response_status: int  # status code set by process_request
    response_body: str  # human-readable result set by process_request
def check_schema(state: APIGatewayState) -> dict:
    """Validate request body schema.

    Returns:
        ``{"validation_barrier": "schema_check"}`` to mark this validator done.

    Raises:
        ValueError: if the body is missing, not a dict, or lacks required fields.
    """
    required_fields = {"action", "payload"}
    body = state.get("request_body")
    # A missing or non-dict body is a schema error, not a KeyError/AttributeError.
    if not isinstance(body, dict):
        raise ValueError("request_body must be a JSON object")
    missing = required_fields - set(body)
    if missing:
        # Sorted so the message is stable: set iteration order varies between runs.
        raise ValueError(f"Missing fields: {sorted(missing)}")
    return {"validation_barrier": "schema_check"}
def check_auth(state: APIGatewayState) -> dict:
    """Validate that user_id is present and non-empty.

    Returns:
        ``{"validation_barrier": "auth_check"}`` to mark this validator done.

    Raises:
        PermissionError: if user_id is absent, empty, or whitespace-only.
    """
    # .get(): an absent key must fail as an auth error, not a KeyError.
    # strip(): a whitespace-only id is as empty as "".
    user_id = state.get("user_id")
    if not str(user_id or "").strip():
        raise PermissionError("Missing user_id")
    return {"validation_barrier": "auth_check"}
def check_quota(state: APIGatewayState) -> dict:
    """Reject users outside the demo allow-list; otherwise mark quota_check done."""
    # In production, query a quota service here
    user = state["user_id"]
    if user in {"alice", "bob", "charlie"}:
        return {"validation_barrier": "quota_check"}
    raise PermissionError(f"Quota exceeded for {user}")
def process_request(state: APIGatewayState) -> dict:
    """Build the success response once all three validators have run."""
    action = state["request_body"].get("action")
    user = state["user_id"]
    body = f"Action '{action}' processed for user {user}"
    return {"response_status": 200, "response_body": body}
def dispatch_validators(state: APIGatewayState):
    """Send the full state to each validator so all three run in parallel."""
    return [Send(node, state) for node in ("schema_check", "auth_check", "quota_check")]
graph = StateGraph(APIGatewayState)
graph.add_node("schema_check", check_schema)
graph.add_node("auth_check", check_auth)
graph.add_node("quota_check", check_quota)
graph.add_node("process", process_request)

graph.add_conditional_edges(
    START, dispatch_validators, ["schema_check", "auth_check", "quota_check"]
)
# A LIST of sources makes "process" wait until ALL three validators have
# finished; LangGraph backs this edge with a NamedBarrierValue. Three separate
# single-source edges only appear to wait here because all three validators
# happen to run in the same superstep.
graph.add_edge(["schema_check", "auth_check", "quota_check"], "process")
graph.add_edge("process", END)

app = graph.compile()
# validation_barrier is deliberately NOT seeded: the barrier accepts only the
# names in REQUIRED_VALIDATORS, so an initial "" would be rejected as an
# invalid update before any node runs.
result = app.invoke({
    "request_body": {"action": "create_order", "payload": {"item": "widget"}},
    "user_id": "alice",
    "response_status": 0,
    "response_body": "",
})
print(result["response_status"], result["response_body"])
# 200 Action 'create_order' processed for user alice