
Chapter 1 — Setup & Core Concepts

What you’ll learn: install LangGraph, understand the mental model, and learn the four primitives — state, nodes, edges, compilation — that every graph builds on.

Time: ~15 minutes.

This is the first chapter of the Zero → Hero path. The next chapter builds your first real agent on top of these primitives.

LangGraph is a low-level orchestration framework for building stateful, long-running agent systems. Unlike high-level abstractions that hide complexity, LangGraph gives you full control over:

  • Agent behaviour through explicit state management
  • Conditional logic with fine-grained routing
  • Persistence with durable execution across failures
  • Memory both short-term (checkpoints) and long-term (stores)
  • Human oversight through interrupts and approvals

Built by LangChain Inc. and inspired by Google’s Pregel and Apache Beam, it provides production-grade infrastructure trusted by companies like Klarna, Replit, and Elastic.

Think of LangGraph as a state machine expressed as a graph:

```
Initial State → Node A → Condition → [Node B or Node C] → Final State
                (a checkpoint is saved along the way)
```

Each node is a Python function. State flows through edges. Conditions route based on logic. Checkpoints persist progress.


```sh
# Core LangGraph
pip install langgraph langchain-core

# Async support
pip install aiosqlite

# For database checkpointing
pip install "langgraph[postgres]"  # PostgreSQL support
pip install psycopg2-binary        # PostgreSQL adapter

# LLM providers (example with Anthropic)
pip install langchain-anthropic

# Development & debugging
pip install langgraph-cli  # CLI tools
```
A typical project structure:

```
my-agent-project/
├── agent.py          # Main agent definitions
├── states.py         # State schemas
├── nodes.py          # Node implementations
├── tools.py          # Custom tools
├── checkpointer.py   # Persistence setup
├── langgraph.json    # CLI config
└── requirements.txt
```
agent.py

```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing_extensions import TypedDict

class State(TypedDict):
    message: str
    response: str

def process_node(state: State) -> dict:
    return {"response": f"Processed: {state['message']}"}

# Build graph
builder = StateGraph(State)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

# Compile with memory
graph = builder.compile(checkpointer=InMemorySaver())

# Execute
result = graph.invoke(
    {"message": "Hello"},
    config={"configurable": {"thread_id": "user-1"}},
)
print(result)
```

State is the single source of truth for your graph. Define it with TypedDict or Pydantic:

```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

class ChatState(TypedDict):
    messages: Annotated[list, add_messages]  # Merges new + old messages
    user_id: str
    context: dict
    should_continue: bool

# The add_messages reducer automatically appends new messages:
# if you pass {"messages": [new_msg]}, it merges with the existing list
```

Key insight: The reducer function (like add_messages) defines how state updates combine with existing state.

Custom reducer example:

```python
from operator import add
from typing import Annotated
from typing_extensions import TypedDict

class CounterState(TypedDict):
    count: Annotated[int, add]  # 5 + 3 = 8 (combined, not replaced)
    last_update: str

class AppendListState(TypedDict):
    items: Annotated[list, lambda x, y: x + y]  # Custom append logic
```
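To make the reducer semantics concrete, here is a small pure-Python sketch that mimics how the engine merges a node's update into state: channels that have a reducer combine the old and new values, while the rest are simply replaced. The `apply_update` helper and `reducers` mapping are illustrative only, not part of the LangGraph API.

```python
from operator import add

def apply_update(state: dict, update: dict, reducers: dict) -> dict:
    """Mimic merging a node's partial update into state: channels with
    a reducer combine old + new; all other channels are replaced."""
    merged = dict(state)
    for key, new_value in update.items():
        reducer = reducers.get(key)
        if reducer is not None and key in state:
            merged[key] = reducer(state[key], new_value)
        else:
            merged[key] = new_value
    return merged

reducers = {"count": add, "items": lambda old, new: old + new}

state = {"count": 5, "items": ["a"], "last_update": "t0"}
update = {"count": 3, "items": ["b"], "last_update": "t1"}
print(apply_update(state, update, reducers))
# {'count': 8, 'items': ['a', 'b'], 'last_update': 't1'}
```

Note how `count` is added (5 + 3 = 8) and `items` is appended, while `last_update`, which has no reducer, is overwritten.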

Nodes are Python functions that receive state and return updates:

```python
def my_node(state: State) -> dict:
    """Process state and return updates."""
    processed = transform(state["data"])
    return {
        "data": processed,
        "step_count": state.get("step_count", 0) + 1,
    }

# Async nodes
async def async_node(state: State) -> dict:
    result = await expensive_operation(state["data"])
    return {"result": result}
```

Critical: Return only the fields you’re updating. Other fields merge automatically.

Edges connect nodes and define control flow:

```python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(State)

# Fixed edge: A → B always
builder.add_edge("node_a", "node_b")

# START/END pseudo-nodes
builder.add_edge(START, "node_a")  # Entry point
builder.add_edge("node_b", END)    # Exit point

# Conditional edge: choose the next node based on state
def should_continue(state: State) -> str:
    if state["counter"] > 5:
        return "finish"
    return "loop"

builder.add_conditional_edges(
    "decision",
    should_continue,
    {
        "finish": END,
        "loop": "decision",
    },
)
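To see how a conditional edge drives a loop to termination, here is a hedged pure-Python emulation of the routing step. The `increment` node, the driver loop, and the `END` sentinel are illustrative stand-ins, not LangGraph internals:

```python
END = "__end__"  # stand-in for langgraph.graph.END

def increment(state: dict) -> dict:
    """A node that returns a partial update."""
    return {"counter": state["counter"] + 1}

def should_continue(state: dict) -> str:
    """Routing function: maps the current state to a branch label."""
    return "finish" if state["counter"] > 5 else "loop"

# Same shape as the add_conditional_edges path map,
# but looping back to the increment node
path_map = {"finish": END, "loop": "increment"}

state = {"counter": 0}
while True:
    state = {**state, **increment(state)}  # run the node, merge its update
    if path_map[should_continue(state)] == END:
        break
print(state["counter"])  # 6, the first value where counter > 5
```

The engine does the same thing: after the node runs, the routing function inspects the merged state and the path map translates its return value into the next node (or END).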

The .compile() method turns your graph into an executable Pregel engine:

```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Compile with persistence
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
graph = builder.compile(checkpointer=checkpointer)

# Without persistence (in-memory only)
graph = builder.compile()
```
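The point of compiling with a checkpointer is that state survives across invocations of the same thread. As a rough mental model (this toy class is illustrative; the real checkpointer interface stores full checkpoint histories, not just the latest state):

```python
class ToyCheckpointer:
    """Illustrative stand-in: persist the latest state per thread_id
    so a later invocation can resume where the last one stopped."""

    def __init__(self) -> None:
        self._store: dict[str, dict] = {}

    def save(self, thread_id: str, state: dict) -> None:
        self._store[thread_id] = dict(state)

    def load(self, thread_id: str) -> dict:
        return dict(self._store.get(thread_id, {}))

saver = ToyCheckpointer()

# First "invocation" on thread user-1
state = saver.load("user-1")
state["turns"] = state.get("turns", 0) + 1
saver.save("user-1", state)

# Second "invocation" on the same thread resumes the saved state
state = saver.load("user-1")
state["turns"] = state.get("turns", 0) + 1
saver.save("user-1", state)

print(saver.load("user-1")["turns"])  # 2
print(saver.load("user-2"))           # {} (other threads are isolated)
```

This is why every invoke/stream call in the examples passes a `thread_id`: it selects which saved conversation the run continues.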

Multiple ways to run your graph:

```python
# Synchronous - blocking
result = graph.invoke(
    {"message": "Hello"},
    config={"configurable": {"thread_id": "user-1"}},
)

# Streaming - get updates as they happen
for event in graph.stream(
    {"message": "Hello"},
    config={"configurable": {"thread_id": "user-1"}},
    stream_mode="values",  # or "updates" or "debug"
):
    print(event)

# Batch - process multiple inputs (config takes a list, one per input)
results = graph.batch(
    [{"message": "A"}, {"message": "B"}],
    config=[
        {"configurable": {"thread_id": f"user-{i}"}}
        for i in range(2)
    ],
)

# Asynchronous (inside an async function)
async_result = await graph.ainvoke({"message": "Hello"}, config={...})

# Streaming async
async for event in graph.astream(...):
    print(event)
```
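The difference between `stream_mode="values"` and `"updates"` is easy to see in a toy emulation ( `run_stream` below is illustrative, not the LangGraph engine): "values" yields the full merged state after every step, while "updates" yields only each node's delta, keyed by node name.

```python
def run_stream(nodes, state, stream_mode="values"):
    """Toy emulation: run nodes in sequence, yielding either the full
    merged state ("values") or each node's partial update ("updates")."""
    for name, node in nodes:
        delta = node(state)
        state = {**state, **delta}
        yield dict(state) if stream_mode == "values" else {name: delta}

nodes = [
    ("greet", lambda s: {"response": f"Hello, {s['name']}!"}),
    ("count", lambda s: {"length": len(s["response"])}),
]

for event in run_stream(nodes, {"name": "Ada"}, stream_mode="updates"):
    print(event)
# {'greet': {'response': 'Hello, Ada!'}}
# {'count': {'length': 11}}
```

With `stream_mode="values"` the same run would instead yield the growing state dict after each node, ending with all three keys present.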