Class deep-dives — 10 core LangGraph types
Verified against langgraph==1.2.2 / langgraph-prebuilt==1.1.0 / langgraph-checkpoint==4.1.1.
Each section below was written by inspecting the installed package source directly. All signatures and behaviours are drawn from the actual implementation, not documentation.
1 · StateGraph
Module: langgraph.graph.state
Re-exported from: langgraph.graph
StateGraph is the declarative builder for a stateful graph. You declare a state schema, add nodes and edges, and call .compile() to get a runnable.
Constructor
```python
StateGraph(
    state_schema: type[StateT],
    context_schema: type[ContextT] | None = None,
    *,
    input_schema: type[InputT] | None = None,
    output_schema: type[OutputT] | None = None,
)
```

state_schema must be a TypedDict, dataclass, or Pydantic BaseModel. Each field maps to a channel:

- Plain field → LastValue channel (one write per super-step; concurrent writes raise InvalidUpdateError)
- Annotated[T, reducer] → BinaryOperatorAggregate channel (concurrent writes merged by reducer)
- Annotated[list[T], Topic(T)] → Topic channel (concurrent writes collected into a list)
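The difference between a plain field and a reducer field can be illustrated without LangGraph. The sketch below is a toy model, not the library's channel classes: `merge_writes` and `InvalidUpdate` are hypothetical names, and the only rule it encodes is the one stated in the list (plain fields accept one write per step, reducer fields fold every write).

```python
import operator
from typing import Any, Callable


class InvalidUpdate(Exception):
    """Hypothetical stand-in for LangGraph's InvalidUpdateError."""


def merge_writes(
    current: dict[str, Any],
    writes: list[tuple[str, Any]],
    reducers: dict[str, Callable[[Any, Any], Any]],
) -> dict[str, Any]:
    """Apply one super-step's writes to a copy of `current`."""
    new_state = dict(current)
    written_plain: set[str] = set()
    for key, value in writes:
        if key in reducers:
            # Reducer field: fold each concurrent write into the running value
            new_state[key] = reducers[key](new_state[key], value)
        else:
            # Plain field: a second write in the same step is an error
            if key in written_plain:
                raise InvalidUpdate(f"{key!r} received more than one write in a step")
            written_plain.add(key)
            new_state[key] = value
    return new_state


state = {"query": "q", "results": []}
reducers = {"results": operator.add}
merged = merge_writes(state, [("results", ["web"]), ("results", ["db"])], reducers)
print(merged["results"])  # ['web', 'db']
```

Two writes to `query` in the same call would raise `InvalidUpdate`, which mirrors why parallel nodes should only write to fields that declare a reducer.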
add_node — full signature
```python
builder.add_node(
    node: str | callable,
    action: callable | None = None,
    *,
    defer: bool = False,
    metadata: dict | None = None,
    input_schema: type | None = None,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy | None = None,
    error_handler: callable | None = None,
    destinations: dict[str, str] | tuple[str, ...] | None = None,
    timeout: float | timedelta | TimeoutPolicy | None = None,
)
```

Key kwargs you may not know about:
| kwarg | What it does |
|---|---|
| retry_policy | Retry on failure. RetryPolicy(max_attempts=3, backoff_factor=2.0). |
| cache_policy | Cache node output. CachePolicy(ttl=300). |
| input_schema | Narrow the state keys the node sees: only the keys in the schema are passed. |
| error_handler | A fallback node called when this node raises. Receives the full state + __error__ key. |
| destinations | Declares where the node may route when it returns a Command: a tuple of target names, or a dict of {target: edge label}. Used for graph rendering; it does not change execution. |
| defer | If True, the node is deferred until the run is about to end, i.e. until no other work is pending. |
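To get a feel for what a retry policy costs in wall-clock time, the following sketch computes the wait before each retry. It is not library code: `backoff_delays` is a hypothetical helper, and it assumes the delay grows as `initial_interval * backoff_factor ** n`, capped at a `max_interval`, with jitter ignored. The default values shown are assumptions for illustration.

```python
def backoff_delays(
    max_attempts: int,
    initial_interval: float = 0.5,   # assumed default, for illustration
    backoff_factor: float = 2.0,
    max_interval: float = 128.0,     # assumed cap, for illustration
) -> list[float]:
    """Seconds slept before each retry; max_attempts includes the first try."""
    return [
        min(max_interval, initial_interval * backoff_factor**n)
        for n in range(max_attempts - 1)
    ]


print(backoff_delays(4, initial_interval=0.2))  # [0.2, 0.4, 0.8]
```

With four attempts there are three waits, so under these assumptions a node that always fails gives up after roughly 1.4 seconds of sleeping plus its own run time.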
Full example: retry, cache, input schema
```python
import operator
import random
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.cache.memory import InMemoryCache
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import CachePolicy, RetryPolicy


class PipelineState(TypedDict):
    query: str
    results: Annotated[list[str], operator.add]  # reducer: accumulate across parallel nodes
    cost: Annotated[float, operator.add]


class SearchInput(TypedDict):
    query: str  # node only sees this key from state


def web_search(state: SearchInput) -> dict:
    """Simulates a flaky web search that may need retrying."""
    if random.random() < 0.3:
        raise ConnectionError("Transient network error")
    return {"results": [f"web: {state['query']}"], "cost": 0.001}


def db_search(state: SearchInput) -> dict:
    return {"results": [f"db: {state['query']}"], "cost": 0.0001}


def error_handler(state: PipelineState) -> dict:
    """Called when web_search exhausts its retries."""
    return {"results": [f"[fallback] {state['query']}"], "cost": 0.0}


builder = StateGraph(PipelineState)

# add_node with retry, cache, narrow input, and error fallback
builder.add_node(
    "web_search",
    web_search,
    input_schema=SearchInput,
    retry_policy=RetryPolicy(max_attempts=4, initial_interval=0.2, backoff_factor=2.0),
    cache_policy=CachePolicy(ttl=300),  # cache for 5 minutes
    error_handler=error_handler,
)
builder.add_node(
    "db_search",
    db_search,
    input_schema=SearchInput,
    retry_policy=RetryPolicy(max_attempts=2),
)

# Fan out from START to both nodes, then converge at END
builder.add_edge(START, "web_search")
builder.add_edge(START, "db_search")
builder.add_edge(["web_search", "db_search"], END)

cache = InMemoryCache()
graph = builder.compile(checkpointer=InMemorySaver(), cache=cache)

# A compiled graph with a checkpointer needs a thread_id in its config
cfg = {"configurable": {"thread_id": "pipeline-1"}}
result = graph.invoke({"query": "langgraph docs", "results": [], "cost": 0.0}, cfg)
print(result["results"])  # e.g. ['web: langgraph docs', 'db: langgraph docs']
print(result["cost"])     # sum of both nodes' costs
```

add_sequence
Shortcut to register a linear chain of nodes and the edges between them in one call. It does not connect the chain to START or END; add those edges yourself:

```python
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class S(TypedDict):
    value: int


builder = StateGraph(S)

# add_sequence adds the nodes themselves plus the edges a -> b -> c,
# so the nodes must not be registered with add_node beforehand
builder.add_sequence([
    ("a", lambda s: {"value": s["value"] + 1}),
    ("b", lambda s: {"value": s["value"] * 2}),
    ("c", lambda s: {"value": s["value"] - 3}),
])
builder.add_edge(START, "a")
builder.add_edge("c", END)

graph = builder.compile()
print(graph.invoke({"value": 5}))  # {'value': 9}, i.e. (5 + 1) * 2 - 3
```

2 · CompiledStateGraph
Module: langgraph.graph.state
The object returned by StateGraph.compile(). Implements the full LangChain Runnable protocol plus LangGraph-specific methods.
Invoke & stream
```python
from typing import Annotated

from typing_extensions import TypedDict
from langchain_core.messages import AIMessage, HumanMessage

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


class ChatState(TypedDict):
    messages: Annotated[list, add_messages]


def chat_node(state: ChatState) -> dict:
    last = state["messages"][-1].content
    return {"messages": [AIMessage(content=f"Echo: {last}")]}


builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

graph = builder.compile(checkpointer=InMemorySaver())
cfg = {"configurable": {"thread_id": "demo"}}

# --- invoke: returns final state ---
result = graph.invoke({"messages": [HumanMessage(content="hi")]}, cfg)
print(result["messages"][-1].content)  # Echo: hi

# --- stream: yields per-node or per-step dicts ---
for chunk in graph.stream(
    {"messages": [HumanMessage(content="hello")]},
    cfg,
    stream_mode="updates",  # "values" | "updates" | "messages" | "custom" | "debug"
):
    print(chunk)
# {'chat': {'messages': [AIMessage(content='Echo: hello')]}}
```

get_state / update_state
```python
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class S(TypedDict):
    step: int
    notes: str


def step_node(state: S) -> dict:
    val = interrupt(f"Approve step {state['step']}?")  # pause for human
    return {"step": state["step"] + 1, "notes": val}


builder = StateGraph(S)
builder.add_node("step", step_node)
builder.add_edge(START, "step")
builder.add_edge("step", END)

graph = builder.compile(checkpointer=InMemorySaver())
cfg = {"configurable": {"thread_id": "t1"}}

# First run: pauses at interrupt
for ev in graph.stream({"step": 0, "notes": ""}, cfg):
    print(ev)

# Inspect the paused state
snap = graph.get_state(cfg)
print(snap.values)      # {'step': 0, 'notes': ''}
print(snap.interrupts)  # (Interrupt(value='Approve step 0?', id='...'),)
print(snap.next)        # ('step',)

# Manually patch state before resuming (e.g. override a value)
graph.update_state(cfg, {"notes": "pre-approved"})

# Resume: passes the answer to interrupt()
for ev in graph.stream(Command(resume="approved"), cfg):
    print(ev)

final = graph.get_state(cfg)
print(final.values)  # {'step': 1, 'notes': 'approved'}
```

get_state_history — time-travel
```python
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END


class Counter(TypedDict):
    n: int


builder = StateGraph(Counter)
builder.add_node("inc", lambda s: {"n": s["n"] + 1})
builder.add_edge(START, "inc")
builder.add_edge("inc", END)

graph = builder.compile(checkpointer=InMemorySaver())
cfg = {"configurable": {"thread_id": "history"}}

# Run 3 times on the same thread
for _ in range(3):
    graph.invoke({"n": 0}, cfg)

# Walk the checkpoint history newest → oldest
for snap in graph.get_state_history(cfg):
    print(f"step={snap.metadata.get('step')} n={snap.values.get('n')}")

# Re-run from a specific checkpoint (time-travel)
history = list(graph.get_state_history(cfg))
old_cfg = history[-1].config  # oldest snapshot
graph.invoke(None, old_cfg)   # replay from there
```

bulk_update_state
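Apply a sequence of state updates in one call. Updates are grouped into supersteps, which are applied in order; two writes to the same LastValue channel must therefore sit in different supersteps:

```python
from langgraph.types import StateUpdate

graph.bulk_update_state(
    cfg,
    [
        [StateUpdate(values={"n": 99}, as_node="inc")],   # superstep 1
        [StateUpdate(values={"n": 100}, as_node="inc")],  # superstep 2: this value wins
    ],
)
```

3 · InMemorySaver (BaseCheckpointSaver)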
Module: langgraph.checkpoint.memory
Alias: MemorySaver (backward-compat)
What it stores
```text
storage: thread_id → namespace → checkpoint_id → (checkpoint_bytes, metadata_bytes, parent_id)
writes:  (thread_id, ns, checkpoint_id) → {(task_id, idx): (task_id, channel, value_bytes, path)}
blobs:   (thread_id, ns, channel, version) → (encoding, bytes)
```

All lookups go through get_tuple (the latest checkpoint, or a specific one by ID) and list (history). The async variants simply call the corresponding sync methods.
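The nested layout of the storage mapping is easier to see in a few lines of plain Python. This is a toy model of the shape described above, not InMemorySaver itself: `put`, `get_latest`, and `history` are hypothetical helpers, and the sketch assumes checkpoint IDs sort lexicographically in creation order.

```python
from collections import defaultdict

# thread_id -> namespace -> checkpoint_id -> (checkpoint_bytes, metadata_bytes, parent_id)
storage: defaultdict = defaultdict(lambda: defaultdict(dict))


def put(thread_id, ns, checkpoint_id, checkpoint: bytes, metadata: bytes, parent_id):
    """Store one checkpoint record under its thread and namespace."""
    storage[thread_id][ns][checkpoint_id] = (checkpoint, metadata, parent_id)


def get_latest(thread_id, ns=""):
    """Return (checkpoint_id, record) for the greatest ID, or None if empty."""
    checkpoints = storage[thread_id][ns]
    if not checkpoints:
        return None
    latest = max(checkpoints)
    return latest, checkpoints[latest]


def history(thread_id, ns=""):
    """Checkpoint IDs for a thread, newest first."""
    return sorted(storage[thread_id][ns], reverse=True)


put("t1", "", "0001", b"cp-1", b"{}", None)
put("t1", "", "0002", b"cp-2", b"{}", "0001")
print(get_latest("t1")[0])  # 0002
print(history("t1"))        # ['0002', '0001']
```

Because each record carries its parent's ID, walking `parent_id` links from the latest checkpoint reconstructs the same history that sorting the IDs gives here.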
Custom checkpointer pattern
Extend BaseCheckpointSaver to build your own backend:

```python
import base64
import json
from collections.abc import Iterator, Sequence
from typing import Any

import redis
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    ChannelVersions,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
    get_checkpoint_id,
)


class RedisCheckpointSaver(BaseCheckpointSaver[str]):
    """Minimal Redis-backed checkpointer (no listing, no pending writes)."""

    def __init__(self, redis_client: redis.Redis):
        super().__init__()
        self.r = redis_client

    def _meta_key(self, thread_id: str, ns: str) -> str:
        return f"lg:meta:{thread_id}:{ns}"

    def _pack(self, obj: Any) -> list[str]:
        # dumps_typed returns (type_name, bytes); base64 makes the bytes JSON-safe
        kind, payload = self.serde.dumps_typed(obj)
        return [kind, base64.b64encode(payload).decode("ascii")]

    def _unpack(self, packed: list[str]) -> Any:
        kind, payload = packed
        return self.serde.loads_typed((kind, base64.b64decode(payload)))

    def get_next_version(self, current: str | None, channel: Any) -> str:
        v = int(current.split(".")[0]) if current else 0
        return f"{v + 1:032}.0"

    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        tid = config["configurable"]["thread_id"]
        ns = config["configurable"].get("checkpoint_ns", "")
        key = self._meta_key(tid, ns)
        cp_id = get_checkpoint_id(config)
        if not cp_id:
            # Latest: take the greatest checkpoint ID stored for this thread + namespace
            all_ids = [k.decode() if isinstance(k, bytes) else k for k in self.r.hkeys(key)]
            if not all_ids:
                return None
            cp_id = max(all_ids)
        raw = self.r.hget(key, cp_id)
        if not raw:
            return None
        data = json.loads(raw)
        base = {"thread_id": tid, "checkpoint_ns": ns}
        return CheckpointTuple(
            config={"configurable": {**base, "checkpoint_id": cp_id}},
            checkpoint=self._unpack(data["checkpoint"]),
            metadata=self._unpack(data["metadata"]),
            parent_config=(
                {"configurable": {**base, "checkpoint_id": data["parent"]}}
                if data.get("parent")
                else None
            ),
        )

    def list(self, config: RunnableConfig | None, *, filter=None, before=None,
             limit=None) -> Iterator[CheckpointTuple]:
        # Simplified: yield nothing (a production implementation would iterate Redis)
        return iter([])

    def put(self, config: RunnableConfig, checkpoint: Checkpoint,
            metadata: CheckpointMetadata, new_versions: ChannelVersions) -> RunnableConfig:
        tid = config["configurable"]["thread_id"]
        ns = config["configurable"].get("checkpoint_ns", "")
        cp_id = checkpoint["id"]
        parent_id = config["configurable"].get("checkpoint_id")
        # Store the whole checkpoint, channel_values included, so it can be restored
        self.r.hset(self._meta_key(tid, ns), cp_id, json.dumps({
            "checkpoint": self._pack(checkpoint),
            "metadata": self._pack(metadata),
            "parent": parent_id,
        }))
        return {"configurable": {"thread_id": tid, "checkpoint_ns": ns, "checkpoint_id": cp_id}}

    def put_writes(self, config: RunnableConfig, writes: Sequence[tuple[str, Any]],
                   task_id: str, task_path: str = "") -> None:
        pass  # simplified: pending writes are dropped, so interrupts will not resume


# Usage:
# r = redis.Redis(host="localhost", port=6379, db=0)
# graph = builder.compile(checkpointer=RedisCheckpointSaver(r))
```

Listing and filtering checkpoints
```python
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END


class S(TypedDict):
    x: int


builder = StateGraph(S)
builder.add_node("inc", lambda s: {"x": s["x"] + 1})
builder.add_edge(START, "inc")
builder.add_edge("inc", END)

saver = InMemorySaver()
graph = builder.compile(checkpointer=saver)
cfg = {"configurable": {"thread_id": "inspect"}}

graph.invoke({"x": 1}, cfg)
graph.invoke({"x": 1}, cfg)
graph.invoke({"x": 1}, cfg)

# List all checkpoints for this thread, newest first
for tup in saver.list(cfg):
    print(tup.metadata.get("step"), tup.checkpoint["id"][:8])

# Filter by metadata
for tup in saver.list(cfg, filter={"source": "loop"}):
    print(tup.checkpoint["id"][:8], "is a loop checkpoint")

# Delete the entire thread
saver.delete_thread("inspect")
print(list(saver.list(cfg)))  # []
```

4 · ToolNode
Module: langgraph.prebuilt.tool_node
Re-exported from: langgraph.prebuilt
State injection
Tools can read the full graph state by declaring a parameter annotated with InjectedState:

```python
from typing import Annotated

from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage, ToolMessage
from langchain_core.tools import InjectedToolCallId, tool

from langgraph.graph.message import add_messages
from langgraph.prebuilt import InjectedState, ToolNode
from langgraph.types import Command


class AppState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_id: str
    balance: float


@tool
def get_balance(state: Annotated[AppState, InjectedState]) -> str:
    """Return the current account balance."""
    return f"Balance for user {state['user_id']}: ${state['balance']:.2f}"


@tool
def charge(
    amount: float,
    state: Annotated[AppState, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> str | Command:
    """Simulate charging the user's account."""
    new_balance = state["balance"] - amount
    if new_balance < 0:
        return f"Insufficient funds. Balance: ${state['balance']:.2f}"
    # InjectedState is read-only: return a Command to update state
    return Command(
        update={
            "balance": new_balance,
            # A Command returned from a tool must include the ToolMessage for this call
            "messages": [ToolMessage(f"Charged ${amount:.2f}", tool_call_id=tool_call_id)],
        },
        goto="agent",  # send control back to agent
    )


tool_node = ToolNode([get_balance, charge])
```

Store injection
```python
from typing import Annotated

from langchain_core.tools import tool

from langgraph.prebuilt import InjectedStore, ToolNode
from langgraph.store.base import BaseStore


@tool
def remember_fact(
    fact: str,
    user_id: str,
    store: Annotated[BaseStore, InjectedStore],
) -> str:
    """Save a fact about the user for future reference."""
    existing = store.get(("users", user_id), "facts")
    # The stored value is a dict of the form {"items": [...]}
    facts = list(existing.value["items"]) if existing else []
    facts.append(fact)
    store.put(("users", user_id), "facts", {"items": facts})
    return f"Remembered: {fact}"


@tool
def recall_facts(
    user_id: str,
    store: Annotated[BaseStore, InjectedStore],
) -> str:
    """Retrieve all stored facts about a user."""
    item = store.get(("users", user_id), "facts")
    if not item:
        return "No facts stored yet."
    return f"Facts about {user_id}: {item.value['items']}"
```

wrap_tool_call — intercepting calls before execution
```python
import re
import time
from typing import Any

from langchain_core.tools import tool

from langgraph.prebuilt import ToolNode
from langgraph.prebuilt.tool_node import ToolCallRequest


@tool
def slow_tool(query: str) -> str:
    """A tool that might take a while."""
    return f"Result for {query}"


def audit_interceptor(request: ToolCallRequest, execute) -> Any:
    """Log every tool call and measure duration."""
    t0 = time.perf_counter()
    name = request.tool_call["name"]
    print(f"[audit] calling {name} args={request.tool_call['args']}")
    try:
        result = execute(request)
        elapsed = time.perf_counter() - t0
        print(f"[audit] {name} OK in {elapsed:.3f}s")
        return result
    except Exception as exc:
        print(f"[audit] {name} FAILED: {exc}")
        raise


# Modify tool call args before execution
def sanitize_interceptor(request: ToolCallRequest, execute) -> Any:
    """Mask card-number-like strings in tool args before they hit the tool."""
    pattern = r"\d{4}-\d{4}-\d{4}-\d{4}"
    clean_args = {
        # Only string values are rewritten; other types pass through unchanged
        k: re.sub(pattern, "****-****-****-****", v) if isinstance(v, str) else v
        for k, v in request.tool_call["args"].items()
    }
    modified = {**request.tool_call, "args": clean_args}
    return execute(request.override(tool_call=modified))


tool_node = ToolNode(
    [slow_tool],
    wrap_tool_call=audit_interceptor,
)
```

handle_tool_errors
```python
from langchain_core.tools import tool

from langgraph.prebuilt import ToolNode


@tool
def risky_tool(x: int) -> int:
    """Might divide by zero."""
    return 100 // x


# True: format any exception as a ToolMessage and continue
node_catch_all = ToolNode([risky_tool], handle_tool_errors=True)

# Custom message
node_msg = ToolNode(
    [risky_tool],
    handle_tool_errors="Something went wrong. Please try different args.",
)


# Custom formatter
def my_formatter(exc: Exception) -> str:
    return f"Tool error [{type(exc).__name__}]: {exc}"


node_custom = ToolNode([risky_tool], handle_tool_errors=my_formatter)

# Specific exception types only
node_typed = ToolNode([risky_tool], handle_tool_errors=(ZeroDivisionError, ValueError))

# False: exceptions bubble up and can crash the graph
node_strict = ToolNode([risky_tool], handle_tool_errors=False)
```

5 · create_react_agent
Module: langgraph.prebuilt.chat_agent_executor
Re-exported from: langgraph.prebuilt
⚠ Deprecated in langgraph-prebuilt==1.1.0 (LangGraph v1.0). create_react_agent has moved to langchain.agents.create_agent. The old function continues to work (it transparently delegates to the new implementation) but emits a DeprecationWarning on each call. All new code should use langchain.agents.create_agent.

Migration is a one-line import change for most projects:

```python
# Before
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(model, tools)

# After
from langchain.agents import create_agent
agent = create_agent(model, tools)
```

The basic call shape and the return type (CompiledStateGraph) carry over, but some keyword arguments differ: for example, prompt becomes system_prompt, and pre_model_hook / post_model_hook are replaced by middleware. AgentState, AgentStatePydantic, HumanInterruptConfig, HumanInterrupt, and ValidationNode also moved to langchain.agents.
Full parameter reference
```python
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    model=...,              # LLM or callable(state, runtime) → LLM
    tools=[...],            # list[BaseTool | callable] or ToolNode
    prompt=None,            # str | SystemMessage | callable | Runnable
    response_format=None,   # Pydantic model, TypedDict, JSON schema, or (prompt, schema) tuple
    pre_model_hook=None,    # callable | Runnable, runs before every LLM call
    post_model_hook=None,   # callable | Runnable, runs after every LLM call
    state_schema=None,      # custom state TypedDict (default: MessagesState)
    context_schema=None,    # read-only context type
    checkpointer=None,      # any BaseCheckpointSaver
    store=None,             # any BaseStore
    interrupt_before=None,  # list of node names to pause before
    interrupt_after=None,   # list of node names to pause after
    debug=False,
    version="v2",           # "v1" or "v2"
    name=None,              # graph name
)
```

Basic: prompt and tools
```python
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage
from langchain_core.tools import tool

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent


@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b


@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b


# Use a stub so this example runs without an API key. A real chat-model
# subclass is used rather than MagicMock, because the agent treats any
# plain callable as a dynamic model selector.
class StubChatModel(FakeMessagesListChatModel):
    """Fake chat model whose bind_tools is a no-op."""

    def bind_tools(self, tools, **kwargs):
        return self


mock_llm = StubChatModel(responses=[AIMessage(content="Done")])

agent = create_react_agent(
    model=mock_llm,
    tools=[add, multiply],
    prompt="You are a math assistant. Always show your working.",
    checkpointer=InMemorySaver(),
)

cfg = {"configurable": {"thread_id": "math-1"}}
result = agent.invoke({"messages": [("user", "What is 6 * 7?")]}, cfg)
print(result["messages"][-1].content)
```

pre_model_hook — message trimming
pre_model_hook is a callable/Runnable that receives the full agent state and returns a state update that is applied before the LLM call. The update must contain either messages (to change the stored history) or llm_input_messages (to change only what the model sees). The most common use-case is message trimming or summarisation:

```python
from langchain_core.messages import trim_messages

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent


def trim_hook(state: dict) -> dict:
    """Send only the last 10 messages to the LLM to avoid context-window overflow."""
    kept = trim_messages(
        state["messages"],
        max_tokens=10,      # with token_counter=len, every message counts as 1
        token_counter=len,  # replace with a real token counter in production
        strategy="last",
        allow_partial=False,
    )
    # llm_input_messages leaves the stored history intact; returning the trimmed
    # list under "messages" would be merged by add_messages and remove nothing
    return {"llm_input_messages": kept}


# mock_llm and add come from the basic example above
agent = create_react_agent(
    model=mock_llm,
    tools=[add],
    pre_model_hook=trim_hook,
    checkpointer=InMemorySaver(),
)
```

post_model_hook — tracking cost / usage
post_model_hook receives the state after the LLM has responded. Use it to record token usage or update counters:

```python
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState


# Extend AgentState so the schema keeps the keys the agent requires
# (messages and remaining_steps) alongside the custom counter
class TrackedState(AgentState):
    total_tokens: int


def track_usage(state: TrackedState) -> dict:
    """Accumulate token usage from the last AI message."""
    last = state["messages"][-1]
    usage = getattr(last, "usage_metadata", None) or {}
    new_tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
    return {"total_tokens": state.get("total_tokens", 0) + new_tokens}


# mock_llm and add come from the basic example above
agent = create_react_agent(
    model=mock_llm,
    tools=[add],
    post_model_hook=track_usage,
    state_schema=TrackedState,
)
```

response_format — structured output
When you provide response_format, the agent makes a separate LLM call at the end using .with_structured_output() and returns the result in state["structured_response"]:

```python
from pydantic import BaseModel

from langgraph.prebuilt import create_react_agent


class MathResult(BaseModel):
    answer: float
    explanation: str
    confidence: float


agent = create_react_agent(
    model=mock_llm,
    tools=[add, multiply],
    response_format=MathResult,
)

# The structured response is in state["structured_response"]
result = agent.invoke({"messages": [("user", "What is 6 * 7?")]})
# result["structured_response"] is a MathResult instance (or None if the model fails)
```

You can also pass a (prompt, schema) tuple to use a custom extraction prompt:

```python
agent = create_react_agent(
    model=mock_llm,
    tools=[add],
    response_format=(
        "Extract the numerical answer and confidence from the conversation.",
        MathResult,
    ),
)
```

Dynamic model selection
Pass a callable (state, runtime) → BaseChatModel to swap the model at run-time based on context:

```python
from dataclasses import dataclass

from langgraph.runtime import Runtime


@dataclass
class ModelCtx:
    tier: str = "standard"  # "standard" or "premium"


# Pre-instantiate models (don't recreate on every call)
# gpt4_model = ChatOpenAI(model="gpt-4o").bind_tools(tools)
# gpt35_model = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)


def select_model(state, runtime: Runtime[ModelCtx]):
    """Use the premium model for premium-tier requests."""
    if runtime.context and runtime.context.tier == "premium":
        return gpt4_model
    return gpt35_model


# agent = create_react_agent(
#     model=select_model,
#     tools=[add, multiply],
#     context_schema=ModelCtx,
# )
# result = agent.invoke(
#     {"messages": [("user", "What is 6 * 7?")]},
#     context=ModelCtx(tier="premium"),
# )
```

interrupt_before / interrupt_after
Pause execution at specific nodes for human review without modifying node code:

```python
agent = create_react_agent(
    model=mock_llm,
    tools=[add, multiply],
    checkpointer=InMemorySaver(),
    interrupt_before=["tools"],  # pause before every tool execution
)

cfg = {"configurable": {"thread_id": "review"}}

# With a model that emits a tool call, the first run pauses before "tools"
for ev in agent.stream({"messages": [("user", "What is 6 * 7?")]}, cfg):
    print(ev)

# Inspect
snap = agent.get_state(cfg)
print(snap.next)        # ('tools',)
print(snap.interrupts)  # empty: interrupt_before pauses at the node boundary without recording an Interrupt

# Resume execution: pass None as input to continue from the saved checkpoint
for ev in agent.stream(None, cfg):
    print(ev)
```

6 · Command
Module: langgraph.types
Command is the universal return type for nodes that need to both update state and control the next node. Returning a Command from a node replaces a static edge.
Full signature
```python
@dataclass
class Command:
    graph: str | None = None          # None = this graph, Command.PARENT = parent graph
    update: Any | None = None         # state update (dict, list of (key, val) tuples, BaseModel)
    resume: dict | Any | None = None  # value(s) to resume interrupt() with
    goto: Send | Sequence[Send | str] | str = ()
```

Supervisor routing pattern
```python
from typing import Annotated

from typing_extensions import Literal, TypedDict
from langchain_core.messages import AIMessage, AnyMessage, HumanMessage

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command


class SupervState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    route: str


def supervisor(state: SupervState) -> Command[Literal["researcher", "writer", END]]:
    # Route on the original request and skip work that is already done.
    # Routing on the last message would loop forever, because the
    # researcher's own reply contains the word "research".
    request = state["messages"][0].content.lower()
    replies = [m.content for m in state["messages"][1:]]
    researched = any(r.startswith("Research complete") for r in replies)
    drafted = any(r.startswith("Draft complete") for r in replies)
    if "research" in request and not researched:
        target = "researcher"
    elif ("write" in request or "draft" in request) and not drafted:
        target = "writer"
    else:
        target = END
    return Command(goto=target, update={"route": target})


def researcher(state: SupervState) -> Command[Literal["supervisor"]]:
    response = AIMessage(content="Research complete: found 5 sources.")
    return Command(goto="supervisor", update={"messages": [response]})


def writer(state: SupervState) -> Command[Literal["supervisor"]]:
    response = AIMessage(content="Draft complete: 500 words written.")
    return Command(goto="supervisor", update={"messages": [response]})


builder = StateGraph(SupervState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_edge(START, "supervisor")

graph = builder.compile()
result = graph.invoke({
    "messages": [HumanMessage(content="Please research LangGraph then write a summary.")],
    "route": "",
})
for m in result["messages"]:
    print(m.content)
```

Command.PARENT — escaping a subgraph
A node inside a subgraph can send a Command up to the parent graph using graph=Command.PARENT:

```python
from langgraph.types import Command


def escalate_node(state: dict) -> Command:
    """From inside a subgraph, update the parent state and route to a parent node."""
    return Command(
        graph=Command.PARENT,
        update={"escalation_reason": "budget exceeded"},
        goto="approval_node",  # a node in the PARENT graph
    )
```

Resume by interrupt ID
When a node contains multiple interrupt() calls, resume each by its unique ID:

```python
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class MultiState(TypedDict):
    a: str
    b: str


def dual_interrupt(state: MultiState) -> dict:
    answer_a = interrupt({"q": "First question?"})
    answer_b = interrupt({"q": "Second question?"})
    return {"a": answer_a, "b": answer_b}


builder = StateGraph(MultiState)
builder.add_node("dual", dual_interrupt)
builder.add_edge(START, "dual")
builder.add_edge("dual", END)

graph = builder.compile(checkpointer=InMemorySaver())
cfg = {"configurable": {"thread_id": "multi"}}

# First run: pauses at the first interrupt
list(graph.stream({"a": "", "b": ""}, cfg))

snap = graph.get_state(cfg)
first_id = snap.interrupts[0].id

# Resume the first interrupt by its ID, leaving the second pending
list(graph.stream(Command(resume={first_id: "answer-to-first"}), cfg))

snap2 = graph.get_state(cfg)
second_id = snap2.interrupts[0].id

# Resume the second interrupt
list(graph.stream(Command(resume={second_id: "answer-to-second"}), cfg))
```

goto with Send objects
```python
from langgraph.types import Command, Send


def fan_out_node(state: dict) -> Command:
    """Send each item in a list to a worker node in parallel."""
    items = state.get("items", [])
    return Command(
        goto=[Send("worker", {"item": item, "idx": i}) for i, item in enumerate(items)],
    )
```

7 · Send
Module: langgraph.types
Send routes execution to a named node with a specific input, bypassing the shared state. This enables dynamic fan-out (map-reduce) where the number of parallel branches isn’t known at graph-build time.
Map-reduce pattern
```python
import operator
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send


class MapState(TypedDict):
    urls: list[str]                              # input: list of URLs to scrape
    results: Annotated[list[str], operator.add]  # reduced: all results combined


class WorkerState(TypedDict):
    url: str
    result: str


def distribute(state: MapState) -> list[Send]:
    """Conditional edge: return one Send per URL."""
    return [Send("scrape", {"url": u, "result": ""}) for u in state["urls"]]


def scrape(state: WorkerState) -> dict:
    """Worker: scrape one URL. This runs in parallel across all Sends."""
    return {"results": [f"content of {state['url']}"]}


builder = StateGraph(MapState)
builder.add_node("scrape", scrape)
builder.add_conditional_edges(START, distribute)
builder.add_edge("scrape", END)

graph = builder.compile()
result = graph.invoke({
    "urls": ["a.com", "b.com", "c.com"],
    "results": [],
})
print(result["results"])  # ['content of a.com', 'content of b.com', 'content of c.com']
```

Variable-depth tree traversal
```python
import operator
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send


class TreeState(TypedDict):
    nodes: list[dict]  # remaining nodes to process
    visited: Annotated[list[str], operator.add]


class NodeState(TypedDict):
    node_id: str
    children: list[str]


def branch(state: TreeState) -> list[Send] | str:
    """Fan out to all unvisited nodes."""
    if not state["nodes"]:
        return END
    return [Send("process", n) for n in state["nodes"]]


def process(state: NodeState) -> dict:
    """Process a single tree node; report its id."""
    return {"visited": [state["node_id"]]}


builder = StateGraph(TreeState)
builder.add_node("process", process)
builder.add_conditional_edges(START, branch)
builder.add_edge("process", END)

graph = builder.compile()
result = graph.invoke({
    "nodes": [
        {"node_id": "A", "children": []},
        {"node_id": "B", "children": []},
        {"node_id": "C", "children": []},
    ],
    "visited": [],
})
print(sorted(result["visited"]))  # ['A', 'B', 'C']
```

8 · @task and @entrypoint (Functional API)
Module: langgraph.func
The Functional API lets you write a workflow as a normal Python function instead of building a StateGraph. The decorated function gets the same checkpointing, streaming, interrupts, and time-travel.
Parallel tasks with retry and caching

    from langgraph.func import entrypoint, task
    from langgraph.types import RetryPolicy, CachePolicy
    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.cache.memory import InMemoryCache

    @task(
        retry_policy=RetryPolicy(max_attempts=3, initial_interval=0.1),
        cache_policy=CachePolicy(ttl=600),  # cache results for 10 minutes
        name="fetch_url",
    )
    def fetch(url: str) -> str:
        """Fetch a URL — retried up to 3 times, result cached for 10 min."""
        import urllib.request
        try:
            with urllib.request.urlopen(url, timeout=5) as r:
                return r.read(500).decode("utf-8", errors="replace")
        except Exception as e:
            raise RuntimeError(f"fetch failed: {e}") from e

    @task(name="summarise")
    def summarise(text: str) -> str:
        """Produce a one-line summary (stub)."""
        return text[:80].replace("\n", " ") + "..."

    cache = InMemoryCache()
    saver = InMemorySaver()

    @entrypoint(checkpointer=saver, cache=cache)
    def research_pipeline(urls: list[str]) -> list[str]:
        """Fetch all URLs in parallel, then summarise each."""
        # All fetches start immediately (futures)
        fetch_futures = [fetch(u) for u in urls]
        # Each summarise call is scheduled as soon as its fetch result is ready
        summarise_futures = [summarise(f.result()) for f in fetch_futures]
        return [sf.result() for sf in summarise_futures]

    cfg = {"configurable": {"thread_id": "research-1"}}
    # This first run hits the network; a repeat run within the TTL can read from the cache
    result = research_pipeline.invoke(["https://example.com", "https://python.org"], cfg)
    print(result)

entrypoint.final — save a different value than what you return
Sometimes you want to return one value to the caller but checkpoint a different value, which the next run receives as previous:

    from langgraph.func import entrypoint
    from langgraph.checkpoint.memory import InMemorySaver
    from typing import TypedDict

    class ConvHistory(TypedDict):
        messages: list[str]

    saver = InMemorySaver()

    @entrypoint(checkpointer=saver)
    def conversation(
        user_msg: str,
        *,
        previous: ConvHistory | None = None,
    ) -> entrypoint.final[str, ConvHistory]:
        """Return a response string but checkpoint the full history."""
        history = previous or ConvHistory(messages=[])
        response = f"You said: {user_msg}"

        updated_history = ConvHistory(
            messages=history["messages"] + [f"user: {user_msg}", f"bot: {response}"]
        )

        # Return the response to the caller, save the full history to checkpoint
        return entrypoint.final(value=response, save=updated_history)

    cfg = {"configurable": {"thread_id": "chat-1"}}
    print(conversation.invoke("hello", cfg))  # "You said: hello"
    print(conversation.invoke("world", cfg))  # "You said: world" (previous has full history)

Interrupts inside tasks
    from langgraph.func import entrypoint, task
    from langgraph.types import interrupt, Command
    from langgraph.checkpoint.memory import InMemorySaver

    @task
    def draft_email(to: str, subject: str) -> str:
        return f"To: {to}\nSubject: {subject}\n\nDear {to}, ..."

    @entrypoint(checkpointer=InMemorySaver())
    def send_email_workflow(params: dict) -> dict:
        draft = draft_email(params["to"], params["subject"]).result()

        # Pause for human approval — the task has already run (results cached)
        approved = interrupt({
            "draft": draft,
            "question": "Send this email?",
        })

        if approved == "yes":
            return {"status": "sent", "draft": draft}
        return {"status": "cancelled", "draft": draft}

    cfg = {"configurable": {"thread_id": "email-1"}}
    list(send_email_workflow.stream({"to": "alice@example.com", "subject": "Hello"}, cfg))

    # Inspect and resume
    snap = send_email_workflow.get_state(cfg)
    print(snap.interrupts[0].value["question"])  # 'Send this email?'

    list(send_email_workflow.stream(Command(resume="yes"), cfg))

Async tasks
    import asyncio
    from langgraph.func import entrypoint, task
    from langgraph.checkpoint.memory import InMemorySaver

    @task
    async def async_fetch(url: str) -> str:
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as r:
                return await r.text()

    @entrypoint(checkpointer=InMemorySaver())
    async def async_pipeline(urls: list[str]) -> list[str]:
        futures = [async_fetch(u) for u in urls]
        return await asyncio.gather(*futures)

    async def main():
        cfg = {"configurable": {"thread_id": "async-1"}}
        result = await async_pipeline.ainvoke(["https://example.com"], cfg)
        print(result)

    # asyncio.run(main())

9 · BinaryOperatorAggregate and Topic
Modules: langgraph.channels.binop, langgraph.channels.topic
Both channels handle concurrent writes in the same super-step — situations where two or more parallel nodes write to the same state key. The default LastValue channel raises InvalidUpdateError in that case.
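The three behaviours can be contrasted with a toy model in plain Python. This is a sketch of the semantics described in this guide, not the library's implementation: `ToyInvalidUpdateError`, `last_value_step`, `reducer_step`, and `topic_step` are hypothetical names, and `writes` stands for the values that parallel nodes produced for one key in one super-step.

```python
import operator
from typing import Any, Callable


class ToyInvalidUpdateError(Exception):
    """Hypothetical stand-in for the error raised on conflicting writes."""


def last_value_step(current: Any, writes: list[Any]) -> Any:
    """Plain field: at most one write per step is accepted."""
    if len(writes) > 1:
        raise ToyInvalidUpdateError(f"{len(writes)} concurrent writes to a plain field")
    return writes[0] if writes else current


def reducer_step(current: Any, writes: list[Any],
                 reducer: Callable[[Any, Any], Any]) -> Any:
    """Reducer field: fold every write into the current value."""
    for write in writes:
        current = reducer(current, write)
    return current


def topic_step(writes: list[Any]) -> list[Any]:
    """Topic field: collect the step's writes into a list."""
    return list(writes)


print(reducer_step(0, [10, 5, 3], operator.add))  # 18
print(topic_step(["a_ran", "b_ran"]))             # ['a_ran', 'b_ran']
try:
    last_value_step("old", ["from_a", "from_b"])
except ToyInvalidUpdateError as exc:
    print("rejected:", exc)
```

The only difference between the three functions is what happens when `writes` has more than one element, which is exactly the case that arises when nodes run in parallel.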
BinaryOperatorAggregate — reducer merging

    import operator
    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END

    class AggState(TypedDict):
        total: Annotated[int, operator.add]  # accumulate integers
        # keep the maximum; wrapped in a lambda so the reducer has an
        # inspectable two-argument signature (the builtin max may not)
        max_score: Annotated[float, lambda a, b: max(a, b)]
        tags: Annotated[list[str], operator.add]  # concatenate lists
        # custom reducer; skips the separator while the left side is still empty
        combined: Annotated[str, lambda a, b: f"{a}|{b}" if a else b]

    def node_a(state: AggState) -> dict:
        return {"total": 10, "max_score": 0.8, "tags": ["fast"], "combined": "A"}

    def node_b(state: AggState) -> dict:
        return {"total": 5, "max_score": 0.95, "tags": ["accurate"], "combined": "B"}

    def node_c(state: AggState) -> dict:
        return {"total": 3, "max_score": 0.7, "tags": ["cheap"], "combined": "C"}

    builder = StateGraph(AggState)
    builder.add_node("a", node_a)
    builder.add_node("b", node_b)
    builder.add_node("c", node_c)
    # All three run in parallel from START
    builder.add_edge(START, "a")
    builder.add_edge(START, "b")
    builder.add_edge(START, "c")
    builder.add_edge(["a", "b", "c"], END)

    graph = builder.compile()
    result = graph.invoke({"total": 0, "max_score": 0.0, "tags": [], "combined": ""})
    print(result["total"])      # 18 (10+5+3)
    print(result["max_score"])  # 0.95
    print(result["tags"])       # ['fast', 'accurate', 'cheap']
    print(result["combined"])   # 'A|B|C' (parts appear in the order the writes are applied)

Overwrite — force-reset a reducing channel
Normally a reducer merges each new value into the existing one. Overwrite skips the merge and replaces the channel value instead:

    from langgraph.types import Overwrite

    def reset_node(state: AggState) -> dict:
        """Forcibly reset the total even though it has a reducer."""
        return {"total": Overwrite(0)}

Topic — PubSub event buffer
Topic collects all writes in a super-step into a list, then clears it for the next step (unless accumulate=True):
    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langgraph.channels import Topic

    class EventState(TypedDict):
        events: Annotated[list[str], Topic(str)]  # cleared each step
        history: Annotated[list[str], Topic(str, accumulate=True)]  # never cleared

    def producer_a(state: EventState) -> dict:
        return {"events": "a_ran", "history": "a_ran"}

    def producer_b(state: EventState) -> dict:
        return {"events": "b_ran", "history": "b_ran"}

    def consumer(state: EventState) -> dict:
        print(f"events this step: {state['events']}")
        print(f"all history: {state['history']}")
        return {}

    builder = StateGraph(EventState)
    builder.add_node("a", producer_a)
    builder.add_node("b", producer_b)
    builder.add_node("consume", consumer)
    builder.add_edge(START, "a")
    builder.add_edge(START, "b")
    builder.add_edge(["a", "b"], "consume")
    builder.add_edge("consume", END)

    graph = builder.compile()
    graph.invoke({"events": [], "history": []})
    # events this step: ['a_ran', 'b_ran']
    # all history: ['a_ran', 'b_ran']

Choosing the right channel
| Scenario | Channel | Declaration |
|---|---|---|
| Single writer per step | LastValue (default) | value: str |
| Sum / count across parallel nodes | BinaryOperatorAggregate | Annotated[int, operator.add] |
| Keep max/min from parallel nodes | BinaryOperatorAggregate | Annotated[float, lambda a, b: max(a, b)] |
| Collect all events in a step | Topic | Annotated[list[str], Topic(str)] |
| Accumulate events forever | Topic(accumulate=True) | Annotated[list[str], Topic(str, accumulate=True)] |
| Message history (smart merge) | add_messages | Annotated[list[AnyMessage], add_messages] |
| Force-reset a reducing channel | Overwrite | Return {"key": Overwrite(new_value)} |
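Two rows of the table are easy to confuse with their neighbours: a Topic with and without accumulate, and Overwrite on a reducing channel. The toy model below illustrates the difference across two consecutive super-steps. It is a sketch under stated assumptions rather than the library's code: `ToyTopic`, `ToyOverwrite`, and `reduce_writes` are hypothetical names.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ToyOverwrite:
    """Hypothetical marker: replace the value instead of reducing."""
    value: Any


def reduce_writes(current: Any, writes: list[Any],
                  reducer: Callable[[Any, Any], Any]) -> Any:
    for write in writes:
        if isinstance(write, ToyOverwrite):
            current = write.value  # bypass the reducer entirely
        else:
            current = reducer(current, write)
    return current


class ToyTopic:
    def __init__(self, accumulate: bool = False) -> None:
        self.accumulate = accumulate
        self.values: list[Any] = []

    def step(self, writes: list[Any]) -> list[Any]:
        if not self.accumulate:
            self.values = []  # forget what the previous step collected
        self.values.extend(writes)
        return list(self.values)


events, history = ToyTopic(), ToyTopic(accumulate=True)
for writes in (["a_ran", "b_ran"], ["c_ran"]):  # two super-steps
    seen_now, seen_ever = events.step(writes), history.step(writes)

print(seen_now)   # ['c_ran']
print(seen_ever)  # ['a_ran', 'b_ran', 'c_ran']
print(reduce_writes(18, [2], operator.add))                # 20
print(reduce_writes(18, [ToyOverwrite(0)], operator.add))  # 0
```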
10 · InMemoryStore
Module: langgraph.store.memory
Base class: langgraph.store.base.BaseStore
InMemoryStore provides long-term, cross-thread memory — a key-value store whose data lives outside checkpoints and survives across multiple conversations. Optionally add a vector index for semantic search.
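As a mental model, such a store behaves like a dictionary keyed by a namespace tuple plus an item key, with search restricted to a namespace prefix. The sketch below is a deliberately simplified, hypothetical `ToyStore`; it ignores timestamps, vector indexes, and TTL, and its method shapes are assumptions chosen for illustration.

```python
from typing import Any, Optional

Namespace = tuple[str, ...]


class ToyStore:
    """Hypothetical namespaced key-value store (illustration only)."""

    def __init__(self) -> None:
        self._items: dict[tuple[Namespace, str], dict[str, Any]] = {}

    def put(self, namespace: Namespace, key: str, value: dict[str, Any]) -> None:
        self._items[(namespace, key)] = value  # upsert: same key overwrites

    def get(self, namespace: Namespace, key: str) -> Optional[dict[str, Any]]:
        return self._items.get((namespace, key))

    def search(self, prefix: Namespace,
               filter: Optional[dict[str, Any]] = None
               ) -> list[tuple[Namespace, str, dict[str, Any]]]:
        hits = []
        for (namespace, key), value in self._items.items():
            if namespace[:len(prefix)] != prefix:
                continue  # outside the requested namespace subtree
            if filter and any(value.get(k) != v for k, v in filter.items()):
                continue  # an exact-match filter field differs
            hits.append((namespace, key, value))
        return hits


toy = ToyStore()
toy.put(("users", "alice"), "preferences", {"theme": "light"})
toy.put(("users", "bob"), "preferences", {"theme": "dark"})
toy.put(("teams", "eng"), "config", {"sprint": 42})

dark_users = toy.search(("users",), filter={"theme": "dark"})
print(dark_users)  # [(('users', 'bob'), 'preferences', {'theme': 'dark'})]
```

Because the data is keyed by namespace rather than by thread, any conversation that knows the namespace can read what another conversation wrote.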
Basic CRUD

    from langgraph.store.memory import InMemoryStore

    store = InMemoryStore()

    # --- put ---
    store.put(("users", "alice"), "preferences", {"theme": "dark", "lang": "en"})
    store.put(("users", "alice"), "notes", {"items": ["remember to call Bob"]})

    # --- get ---
    item = store.get(("users", "alice"), "preferences")
    print(item.value)       # {'theme': 'dark', 'lang': 'en'}
    print(item.namespace)   # ('users', 'alice')
    print(item.key)         # 'preferences'
    print(item.created_at)  # datetime

    # --- update: put again (upserts) ---
    store.put(("users", "alice"), "preferences", {"theme": "light", "lang": "en"})

    # --- delete ---
    store.delete(("users", "alice"), "notes")
    print(store.get(("users", "alice"), "notes"))  # None

Searching and listing namespaces
    store.put(("users", "bob"), "preferences", {"theme": "dark"})
    store.put(("users", "carol"), "preferences", {"theme": "light"})
    store.put(("teams", "eng"), "config", {"sprint": 42})

    # search by metadata filter
    results = store.search(("users",), filter={"theme": "dark"})
    for hit in results:
        print(hit.namespace, hit.key, hit.value)
    # ('users', 'bob') preferences {'theme': 'dark'}
    # alice is not returned: her theme was updated to 'light' in the CRUD example

    # list_namespaces
    namespaces = store.list_namespaces(prefix=("users",))
    print(namespaces)
    # [('users', 'alice'), ('users', 'bob'), ('users', 'carol')]

    # max_depth=1: truncate every namespace to its first segment
    print(store.list_namespaces(max_depth=1))
    # the two one-segment namespaces: ('teams',) and ('users',)

Batch operations
batch() submits multiple operations in one call, which matters for backends like Postgres where each call is a network round-trip:

    from langgraph.store.base import GetOp, PutOp, SearchOp

    results = store.batch([
        GetOp(namespace=("users", "alice"), key="preferences"),
        PutOp(namespace=("users", "dan"), key="prefs", value={"theme": "dark"}),
        SearchOp(namespace_prefix=("users",), filter={"theme": "dark"}, limit=5),
    ])

    # One result per operation, in submission order
    get_result, put_result, search_result = results
    print(get_result.value if get_result else "not found")
    print(search_result)  # list[SearchItem]

Semantic / vector search
Pass an index config to InMemoryStore to enable vector similarity search. The embed entry can be any function that maps list[str] → list[list[float]]. Installing numpy (pip install numpy) improves search performance:

    from langgraph.store.memory import InMemoryStore

    def embed(texts: list[str]) -> list[list[float]]:
        """Stub embedder: returns the character count as a 1-D vector."""
        return [[float(len(t))] for t in texts]

    store = InMemoryStore(
        index={
            "dims": 1,
            "embed": embed,
            "fields": ["text"],  # which value keys to embed; default is entire value
        }
    )

    store.put(("docs",), "python-guide", {"text": "Python programming guide"})
    store.put(("docs",), "typescript-guide", {"text": "TypeScript programming guide"})
    store.put(("docs",), "docker-intro", {"text": "Docker container introduction"})

    # Semantic search
    hits = store.search(("docs",), query="Python scripting tutorial", limit=2)
    for h in hits:
        print(h.key, h.score, h.value["text"])

Using the store inside a graph
Wire store into .compile() and access it from nodes via Runtime:

    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langgraph.store.memory import InMemoryStore
    from langgraph.runtime import Runtime

    class ConvState(TypedDict):
        user_id: str
        message: str
        reply: str

    def recall_and_respond(state: ConvState, runtime: Runtime) -> dict:
        """Reads from store, produces a reply, and writes back."""
        user_id = state["user_id"]
        store = runtime.store

        # Load prior messages
        history_item = store.get(("chats", user_id), "history")
        history = history_item.value["msgs"] if history_item else []

        # Produce a reply (stub)
        reply = f"[based on {len(history)} prior messages] {state['message']!r}"

        # Save updated history
        store.put(("chats", user_id), "history", {"msgs": history + [state["message"]]})

        return {"reply": reply}

    store = InMemoryStore()
    builder = StateGraph(ConvState)
    builder.add_node("respond", recall_and_respond)
    builder.add_edge(START, "respond")
    builder.add_edge("respond", END)
    graph = builder.compile(store=store)

    cfg1 = {"configurable": {"thread_id": "t1"}}
    print(graph.invoke({"user_id": "alice", "message": "hello", "reply": ""}, cfg1)["reply"])
    print(graph.invoke({"user_id": "alice", "message": "how are you?", "reply": ""}, cfg1)["reply"])
    # Second call shows 1 prior message in history

TTL support
InMemoryStore accepts a ttl kwarg on put() (seconds until expiry). The store auto-expires entries on read:

    import time

    store = InMemoryStore()
    store.put(("sessions",), "tok123", {"data": "session data"}, ttl=2)  # expires in 2 s
    item = store.get(("sessions",), "tok123")
    print(item is not None)  # True

    time.sleep(3)
    item = store.get(("sessions",), "tok123")
    print(item)  # None — expired

Quick reference: which feature lives where
| Feature | Class/Function | Module |
|---|---|---|
| Build a graph | StateGraph | langgraph.graph.state |
| Run a graph | CompiledStateGraph | langgraph.graph.state |
| Short-term memory (in-process) | InMemorySaver | langgraph.checkpoint.memory |
| Custom checkpointer | BaseCheckpointSaver | langgraph.checkpoint.base |
| Tool execution node | ToolNode | langgraph.prebuilt.tool_node |
| Quick ReAct agent (deprecated) | create_react_agent | langgraph.prebuilt — use langchain.agents.create_agent |
| Routing + state updates | Command | langgraph.types |
| Dynamic fan-out | Send | langgraph.types |
| Pause for human input | interrupt() | langgraph.types |
| Function-based workflow | @entrypoint / @task | langgraph.func |
| Concurrent write merging | BinaryOperatorAggregate | langgraph.channels.binop |
| PubSub event channel | Topic | langgraph.channels.topic |
| Long-term / cross-thread memory | InMemoryStore | langgraph.store.memory |
| Node retry on failure | RetryPolicy | langgraph.types |
| Node result caching | CachePolicy + InMemoryCache | langgraph.types, langgraph.cache.memory |
| Recursion limit detection | IsLastStep / RemainingSteps | langgraph.managed.is_last_step |