Chapter 10 — Production & Troubleshooting
What you’ll learn: the smallest viable deployment path — a Docker image, a langgraph.json CLI config, and calling a deployed graph via the LangGraph SDK. Plus troubleshooting for the five errors you’ll hit most often.
Time: ~20 minutes.
For the full deployment playbook (Kubernetes, cost optimization, disaster recovery, observability), continue to the Production Guide after this chapter.
Production Deployment
Docker Setup

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Start LangGraph server
CMD ["langgraph", "run", "--host", "0.0.0.0", "--port", "8000"]
```

```bash
# Build and run
docker build -t my-agent:v1 .
docker run -p 8000:8000 \
  -e ANTHROPIC_API_KEY=$ANTHROPIC_API_KEY \
  my-agent:v1
```

CLI Configuration
Save the following as langgraph.json in your project root:

```json
{
  "dependencies": [
    "langchain_anthropic",
    "langchain_tavily",
    "./agents"
  ],
  "graphs": {
    "main_agent": "./agents.py:graph",
    "research_agent": "./agents.py:research_graph"
  },
  "env": "./.env",
  "python_version": "3.11"
}
```

Remote Execution via SDK
```python
from langgraph_sdk import get_client
import asyncio

async def main():
    client = get_client(url="https://my-deployment.langgraph.app")

    # List available assistants (from langgraph.json graphs)
    assistants = await client.assistants.search()
    assistant_id = assistants[0]["assistant_id"]

    # Create conversation thread
    thread = await client.threads.create()

    # Stream execution
    async for chunk in client.runs.stream(
        thread_id=thread["thread_id"],
        assistant_id=assistant_id,
        input={"query": "Research AI trends"},
    ):
        if chunk.event == "messages/partial":
            print(chunk.data[0]["content"], end="", flush=True)

    # Get final state
    final_state = await client.threads.get_state(thread["thread_id"])
    print(f"\nFinal: {final_state}")

asyncio.run(main())
```

Common Patterns Summary
| Pattern | Use Case | Key Idea |
|---|---|---|
| Linear | Simple pipelines | Node A → B → C → END |
| Conditional | Decision trees | Routes based on state |
| Looping | Iterations | Self-referencing edges with exit condition |
| Supervisor | Multi-agent | Central router to specialists |
| Parallel | Concurrent work | Fan-out with Send, fan-in with collection |
| ReAct | Autonomous agent | Reason → Action → Observe loop |
| Tree-of-Thoughts | Complex reasoning | Multiple parallel thought paths |
| Reflection | Quality improvement | Self-critique → Refine loop |
| Interrupt | Human approval | Pause, wait, resume with Command |
| Caching | Performance | Store expensive results |
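The conditional pattern in the table can be sketched framework-free: a router is just a function from state to the next node’s name. The node names and dispatch table below are illustrative only, not LangGraph API:

```python
def route(state: dict) -> str:
    # Conditional pattern: pick the next node based on state
    if state.get("needs_research"):
        return "research"
    return "answer"

# Dispatch table standing in for graph nodes
nodes = {
    "research": lambda s: {**s, "notes": "found sources", "needs_research": False},
    "answer":   lambda s: {**s, "answer": "done"},
}

state = {"needs_research": True}
while "answer" not in state:
    state = nodes[route(state)](state)
```

In LangGraph proper, the same routing logic would be wired in with `add_conditional_edges`, but the control flow is identical: inspect state, return the name of the next node.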
Troubleshooting
Issue: “Checkpointer must be provided for interrupts”
Cause: Trying to use interrupt() without a checkpointer
Fix: Always compile with a checkpointer when using interrupts:
```python
from langgraph.checkpoint.memory import InMemorySaver

graph = builder.compile(checkpointer=InMemorySaver())
```

Issue: State not persisting across invocations
Cause: Missing thread_id in config
Fix: Always provide consistent thread_id:
```python
config = {"configurable": {"thread_id": "unique-id"}}
result = graph.invoke(input, config=config)  # Same config each time
```

Issue: Reducer functions not working
Cause: Not using Annotated with reducer function
Fix: Proper state schema:
```python
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages

# Wrong — each update overwrites the list
class State(TypedDict):
    messages: list

# Correct — updates are merged via the add_messages reducer
class State(TypedDict):
    messages: Annotated[list, add_messages]
```
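To see why the reducer matters, here is a toy, framework-free sketch of how a graph applies state updates: without a reducer, the new value replaces the old one; with one, the two are merged. `apply_update` is a made-up helper for illustration, not LangGraph code:

```python
import operator

def apply_update(current, update, reducer=None):
    # No reducer: the update overwrites the channel.
    # With a reducer: old and new values are combined.
    return reducer(current, update) if reducer else update

plain = apply_update(["hi"], ["there"])                 # → ["there"]
merged = apply_update(["hi"], ["there"], operator.add)  # → ["hi", "there"]
```

`Annotated[list, add_messages]` is how you tell LangGraph which reducer to use for a channel; a bare `list` means “overwrite.”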
Issue: Tools not being called

Cause: Model not properly bound to tools
Fix: Use .bind_tools():
```python
model_with_tools = model.bind_tools(tools)
response = model_with_tools.invoke(messages)  # Works
```

Issue: Infinite loops
Cause: Conditional edge always returns to the same node
Fix: Add iteration counter or state check:
```python
def should_continue(state) -> str:
    # The "process" node must increment state["iterations"] each pass
    if state.get("iterations", 0) >= MAX_ITERATIONS:
        return END
    return "process"
```

Performance Tips
- Use async when possible: `ainvoke()` and `astream()` for I/O-bound tasks
- Batch processing: `graph.batch()` for multiple inputs
- Streaming: use `stream_mode="updates"` to reduce data transfer
- Checkpointer selection: PostgreSQL > SQLite > in-memory, based on scale
- Cache expensive operations: store results in the long-term Store
- Limit iterations: always set `MAX_ITERATIONS` to prevent runaway loops
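The first tip can be illustrated with plain asyncio: when runs are independent, awaiting them concurrently overlaps their I/O time instead of serializing it. `fake_ainvoke` below is a stand-in for a compiled graph’s `ainvoke()`, not real API:

```python
import asyncio

async def fake_ainvoke(query: str) -> str:
    # Simulates an I/O-bound graph run (LLM call, tool call, etc.)
    await asyncio.sleep(0.01)
    return f"answer: {query}"

async def main() -> list[str]:
    # Three runs proceed concurrently instead of back-to-back
    return await asyncio.gather(
        fake_ainvoke("a"), fake_ainvoke("b"), fake_ainvoke("c")
    )

results = asyncio.run(main())
```

The same shape applies to a real graph: `await asyncio.gather(graph.ainvoke(x, cfg1), graph.ainvoke(y, cfg2))`, provided each run uses its own thread_id.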
Next steps
You’ve finished the Zero → Hero path. 🎉 Where to go from here:
- Build something real — pick a recipe from the Recipes collection (RAG, support router, research agent, doc pipeline, long-term memory chat).
- Ship it — read the full Production Guide covering Kubernetes, observability, cost tracking, and disaster recovery.
- Scale it — see Performance Optimization and Observability.
- Stream it — the FastAPI streaming server example shows token-level SSE from a compiled graph.
Good luck, and welcome to durable, stateful agent systems.