StateGraph — API reference
Verified against langgraph==1.2.4 (modules: langgraph.graph.state, langgraph.types).
StateGraph is the primary graph builder. You declare a state schema, add nodes and edges, then call .compile() to get a CompiledStateGraph that implements the LangChain Runnable protocol (invoke / stream / ainvoke / astream).
Minimal runnable example
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver


class State(TypedDict):
    counter: int


def increment(state: State) -> dict:
    return {"counter": state["counter"] + 1}


builder: StateGraph[State, None, State, State] = StateGraph(State)
builder.add_node("increment", increment)
builder.add_edge(START, "increment")
builder.add_edge("increment", END)

graph = builder.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "1"}}
print(graph.invoke({"counter": 0}, config))  # {'counter': 1}
```
Imports at a glance
All symbols below come from the exact module path in the installed package.
| Symbol | Import path |
|---|---|
| StateGraph, CompiledStateGraph | langgraph.graph.state (StateGraph is also re-exported from langgraph.graph) |
| START, END | langgraph.graph (re-exported from langgraph.constants) |
| add_messages, MessagesState, REMOVE_ALL_MESSAGES | langgraph.graph.message |
| Command, Send, interrupt, StateSnapshot, Interrupt, Overwrite, RetryPolicy, CachePolicy, TimeoutPolicy, StreamWriter, Durability, GraphOutput | langgraph.types |
| Runtime, RunControl, ExecutionInfo, ServerInfo, get_runtime | langgraph.runtime |
| InMemorySaver | langgraph.checkpoint.memory |
| InMemoryCache | langgraph.cache.memory |
| BaseStore, InMemoryStore | langgraph.store.base, langgraph.store.memory (respectively) |
The top-level langgraph.graph.__init__ only re-exports START, END, StateGraph, add_messages, MessagesState, MessageGraph — everything else must be imported from its real module.
Constructor
```python
StateGraph(
    state_schema: type[StateT],
    context_schema: type[ContextT] | None = None,
    *,
    input_schema: type[InputT] | None = None,
    output_schema: type[OutputT] | None = None,
)
```
- state_schema: a TypedDict, dataclass, or Pydantic BaseModel. Each field defines a channel; annotating a field with Annotated[T, reducer] turns it into a reducing channel.
- context_schema: run-scoped, read-only context (e.g. user_id, db_conn). Injected via Runtime[ContextT] (see below).
- input_schema / output_schema: optional narrower schemas that differ from the main state.
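To make "each field defines a channel" concrete, here is a stdlib-only sketch that reads a TypedDict schema the way a graph builder could: plain fields become last-value channels, and Annotated[T, reducer] fields become reducing channels. The helper channel_specs and the "last_value"/"reducer" labels are hypothetical; LangGraph's own schema parsing and channel classes are more involved.

```python
import operator
from typing import Annotated, Any, Callable, Optional, TypedDict, get_args, get_origin, get_type_hints


def channel_specs(schema: type) -> dict[str, tuple[str, Optional[Callable[[Any, Any], Any]]]]:
    """Map each schema field to a (channel kind, reducer) pair (hypothetical helper)."""
    specs = {}
    # include_extras=True keeps the Annotated[...] metadata instead of stripping it
    for name, hint in get_type_hints(schema, include_extras=True).items():
        reducer = None
        if get_origin(hint) is Annotated:
            _base, *metadata = get_args(hint)
            # Treat the first callable metadata entry as the reducer
            reducer = next((m for m in metadata if callable(m)), None)
        specs[name] = ("reducer" if reducer else "last_value", reducer)
    return specs


class State(TypedDict):
    counter: int
    visited: Annotated[list[str], operator.add]


print(channel_specs(State))
```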
Deprecated kwargs that still work but warn:
- config_schema → use context_schema (deprecated since v0.6).
- input, output → use input_schema, output_schema (deprecated since v0.5).
Reducers and add_messages
A reducer is a function (current, update) -> new_value attached to a state key with Annotated[...].
```python
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage


class ChatState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]  # merged by message id
    visited: Annotated[list[str], operator.add]  # list concatenation
```
- add_messages merges two message lists by id: same-id messages overwrite, new-id messages append. Pass format="langchain-openai" to coerce to OpenAI-format blocks (requires langchain-core>=0.3.11).
- REMOVE_ALL_MESSAGES (from langgraph.graph.message) is a sentinel id: writing RemoveMessage(id=REMOVE_ALL_MESSAGES) wipes the history.
- Without a reducer, a channel uses LastValue semantics: the latest write wins, and two concurrent writes in one super-step raise InvalidUpdateError.
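The update rules above fit in a few lines of plain Python. This is a toy sketch, not LangGraph's implementation: messages are plain dicts with an "id" key, InvalidUpdate stands in for InvalidUpdateError, and the real add_messages additionally assigns missing ids and handles RemoveMessage.

```python
import operator
from typing import Any, Callable, Optional


class InvalidUpdate(Exception):
    """Stand-in for LangGraph's InvalidUpdateError in this sketch."""


def merge_by_id(current: list[dict], update: list[dict]) -> list[dict]:
    """Simplified add_messages: same id replaces in place, new id appends."""
    merged = list(current)
    position = {msg["id"]: i for i, msg in enumerate(merged)}
    for msg in update:
        if msg["id"] in position:
            merged[position[msg["id"]]] = msg  # overwrite, keeping the original order
        else:
            position[msg["id"]] = len(merged)
            merged.append(msg)
    return merged


def apply_step(value: Any, writes: list[Any], reducer: Optional[Callable[[Any, Any], Any]]) -> Any:
    """Apply every write one super-step made to a single channel."""
    if reducer is None:
        # LastValue semantics: more than one write in the same step is ambiguous
        if len(writes) > 1:
            raise InvalidUpdate(f"{len(writes)} concurrent writes to a channel without a reducer")
        return writes[0] if writes else value
    for write in writes:
        value = reducer(value, write)
    return value


history = [{"id": "1", "text": "hi"}]
history = apply_step(history, [[{"id": "1", "text": "hello"}, {"id": "2", "text": "new"}]], merge_by_id)
print(history)  # [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'new'}]
print(apply_step([], [["a"], ["b"]], operator.add))  # ['a', 'b']
```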
Bypass a reducer for a single write with Overwrite:
```python
from langgraph.types import Overwrite


def replace_messages(state: ChatState) -> dict:
    # Bypasses add_messages for this one write: the channel becomes exactly []
    return {"messages": Overwrite(value=[])}
```
add_node
Four overloads, all returning Self for chaining:
```python
builder.add_node(fn)                                     # name = fn.__name__
builder.add_node("my_node", fn)                          # explicit name
builder.add_node(fn, input_schema=NodeInput)             # per-node input schema
builder.add_node("my_node", fn, input_schema=NodeInput)
```
All overloads accept the same keyword options:
| Option | Type | Effect |
|---|---|---|
| defer | bool | Run this node only when the graph is about to finish (after all other tasks drain). Useful for summarization/finalization. |
| metadata | dict | Attached to the node; surfaces in tracing/streaming metadata. |
| input_schema | type | Node receives a narrower shape. Channels outside this schema are not visible. |
| retry_policy | RetryPolicy \| Sequence[RetryPolicy] | Controls retries on exceptions. First matching policy in a sequence wins. |
| cache_policy | CachePolicy | Cache the node’s output by input hash. Requires a cache= backend on .compile(). |
| timeout | float \| timedelta \| TimeoutPolicy \| None | Per-attempt timeout, async nodes only. A plain float/timedelta is the wall-clock limit; TimeoutPolicy adds idle-timeout and heartbeat support. |
| destinations | dict[str, str] \| tuple[str, ...] | Visualization hint for edgeless nodes that return Command(goto=...). Does not affect execution. |
A node’s callable signature can be any of:
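```python
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
from langgraph.types import StreamWriter

# Alternative signatures (pick one per node); State and Context are your own schemas.
def node(state: State): ...
def node(state: State, config: RunnableConfig): ...
def node(state: State, runtime: Runtime[Context]): ...
def node(state: State, *, writer: StreamWriter): ...  # opt-in custom stream
async def node(state: State, runtime: Runtime[Context]): ...
```
Return types: dict, the state schema instance, None, or a Command. Returning None is a no-op on all channels.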
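One way to picture how a single add_node call can accept all of these shapes: inspect the callable's parameters and pass only the extras it asks for, by name. The sketch below does that with inspect.signature; call_node and the extras dict are hypothetical, and LangGraph's real injection logic lives in its own runnable wrappers.

```python
import inspect
from typing import Any, Callable, Optional


def call_node(fn: Callable[..., Optional[dict]], state: dict, extras: dict[str, Any]) -> dict:
    """Call fn(state, ...) passing only the extras its signature names (toy dispatcher)."""
    params = inspect.signature(fn).parameters
    kwargs = {name: value for name, value in extras.items() if name in params}
    update = fn(state, **kwargs)
    return {} if update is None else update  # None writes nothing


def plain(state: dict) -> dict:
    return {"counter": state["counter"] + 1}


def with_writer(state: dict, *, writer: Callable[[Any], None]) -> None:
    writer({"seen": state["counter"]})  # custom stream event, no state update


events: list = []
extras = {"config": {"configurable": {}}, "writer": events.append}
state = {"counter": 0}
state = {**state, **call_node(plain, state, extras)}
print(call_node(with_writer, state, extras), state, events)
```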
RetryPolicy
```python
from langgraph.types import RetryPolicy

builder.add_node(
    "risky",
    risky_fn,
    retry_policy=RetryPolicy(
        initial_interval=0.5,  # seconds before first retry
        backoff_factor=2.0,  # exponential multiplier
        max_interval=128.0,  # cap on the delay between attempts (seconds)
        max_attempts=3,  # total attempts, including the first call
        jitter=True,
        retry_on=ConnectionError,  # type, tuple, or Callable[[Exception], bool]
    ),
)
```
retry_on accepts an exception type, a tuple of types, or a predicate. Default is langgraph._internal._retry.default_retry_on (retries on httpx.HTTPStatusError 5xx, httpx.TransportError, ConnectionError, and request timeouts).
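The retry loop these parameters configure can be sketched with the standard library. Assumptions not confirmed by this page: the delay before retry n is initial_interval * backoff_factor**(n-1) capped at max_interval, jitter adds up to one extra second, and a sleep function is injected so the schedule can be checked without waiting. retry_with_policy and should_retry are hypothetical names, not LangGraph APIs.

```python
import random
import time
from typing import Callable, TypeVar, Union

T = TypeVar("T")
RetryOn = Union[type, tuple, Callable[[Exception], bool]]


def should_retry(retry_on: RetryOn, exc: Exception) -> bool:
    """retry_on may be an exception type, a tuple of types, or a predicate."""
    if isinstance(retry_on, (type, tuple)):
        return isinstance(exc, retry_on)
    return bool(retry_on(exc))


def retry_with_policy(
    fn: Callable[[int], T],
    *,
    max_attempts: int = 3,
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    max_interval: float = 128.0,
    jitter: bool = False,
    retry_on: RetryOn = ConnectionError,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(attempt) until it succeeds, retry_on rejects the error, or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):  # max_attempts counts the first call too
        try:
            return fn(attempt)
        except Exception as exc:
            if attempt == max_attempts or not should_retry(retry_on, exc):
                raise
            delay = min(interval, max_interval)
            if jitter:
                delay += random.uniform(0, 1)  # jitter size is an assumption
            sleep(delay)
            interval *= backoff_factor
    raise RuntimeError("unreachable: the last attempt either returns or raises")


delays: list[float] = []


def flaky(attempt: int) -> str:
    if attempt < 3:
        raise ConnectionError(f"attempt {attempt} failed")
    return "ok"


print(retry_with_policy(flaky, sleep=delays.append), delays)  # ok [0.5, 1.0]
```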
CachePolicy
```python
from langgraph.types import CachePolicy
from langgraph.cache.memory import InMemoryCache

builder.add_node("lookup", lookup, cache_policy=CachePolicy(ttl=300))  # ttl in seconds
graph = builder.compile(cache=InMemoryCache())
```
key_func defaults to pickle-hashing the input. Pass a custom (input) -> str | bytes for deterministic cache keys.
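A rough model of what cache_policy plus a cache backend do: hash the node input into a key, reuse a stored result until its TTL expires, otherwise recompute. This stdlib sketch uses hypothetical names (TTLNodeCache, pickle_key); the SHA-256 hash and the injectable clock are choices made here, not details taken from LangGraph.

```python
import hashlib
import pickle
import time
from typing import Any, Callable, Optional


def pickle_key(node_input: Any) -> bytes:
    """Key derived from the pickled input (SHA-256 chosen for this sketch)."""
    return hashlib.sha256(pickle.dumps(node_input)).digest()


class TTLNodeCache:
    """In-memory cache of node results with per-entry expiry (hypothetical)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: dict = {}  # key -> (expires_at or None, value)

    def get_or_compute(
        self,
        fn: Callable[[Any], Any],
        node_input: Any,
        *,
        ttl: Optional[float],
        key_func: Callable[[Any], Any] = pickle_key,
    ) -> Any:
        key = key_func(node_input)
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and (entry[0] is None or entry[0] > now):
            return entry[1]  # fresh hit: skip the node
        value = fn(node_input)
        expires_at = None if ttl is None else now + ttl  # ttl=None never expires
        self._entries[key] = (expires_at, value)
        return value


fake_now = [0.0]
calls: list = []


def embed(node_input: dict) -> int:
    calls.append(node_input["text"])
    return len(node_input["text"])


cache = TTLNodeCache(clock=lambda: fake_now[0])
cache.get_or_compute(embed, {"text": "hello"}, ttl=300)  # miss: computes
cache.get_or_compute(embed, {"text": "hello"}, ttl=300)  # hit: reuses the stored result
fake_now[0] = 301.0
cache.get_or_compute(embed, {"text": "hello"}, ttl=300)  # expired: computes again
print(calls)  # ['hello', 'hello']
```

Pickled bytes follow dict insertion order, so {"a": 1, "b": 2} and {"b": 2, "a": 1} hash to different keys; that is one practical reason to supply your own key_func.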
TimeoutPolicy
TimeoutPolicy (from langgraph.types) controls per-attempt cancellation for async nodes. A plain float or timedelta on the timeout= kwarg is a shorthand for TimeoutPolicy(run_timeout=...).
The dataclass as declared in langgraph.types (shown for reference; _DC_KWARGS holds internal dataclass options):
```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Literal


@dataclass(**_DC_KWARGS)
class TimeoutPolicy:
    run_timeout: float | timedelta | None = None  # hard wall-clock cap
    idle_timeout: float | timedelta | None = None  # max time between progress signals
    refresh_on: Literal["auto", "heartbeat"] = "auto"
```
run_timeout: hard cap for a single attempt. Never refreshed, even by heartbeats.
idle_timeout: cap on how long the attempt may sit without a progress signal. Progress signals under "auto" mode include:
- Any LangChain callback event inside the node or its descendants.
- Explicit runtime.heartbeat() calls.
- Stream writer writes.
Under "heartbeat" mode, only runtime.heartbeat() resets the clock.
When the timeout fires, NodeTimeoutError is raised. The node’s retry_policy (if set) then decides whether to retry.
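The run_timeout / idle_timeout split can be modelled with asyncio: a watchdog polls the node's task and cancels it when either budget is exhausted, while a heartbeat callback moves only the idle deadline forward. This sketch models refresh_on="heartbeat" only; NodeTimeout, run_with_timeouts, and the polling interval are hypothetical, and LangGraph's executor is not claimed to work this way.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class NodeTimeout(Exception):
    """Stand-in for NodeTimeoutError in this sketch."""


async def run_with_timeouts(
    make_node: Callable[[Callable[[], None]], Awaitable[Any]],
    *,
    run_timeout: Optional[float] = None,
    idle_timeout: Optional[float] = None,
    poll_interval: float = 0.01,
) -> Any:
    loop = asyncio.get_running_loop()
    started = last_progress = loop.time()

    def heartbeat() -> None:
        nonlocal last_progress
        last_progress = loop.time()  # only the idle clock is refreshed

    task = asyncio.ensure_future(make_node(heartbeat))
    try:
        while not task.done():
            now = loop.time()
            if run_timeout is not None and now - started > run_timeout:
                raise NodeTimeout(f"run_timeout of {run_timeout}s exceeded")
            if idle_timeout is not None and now - last_progress > idle_timeout:
                raise NodeTimeout(f"no progress for {idle_timeout}s")
            # Wake up when the task finishes or after poll_interval, whichever is first
            await asyncio.wait({task}, timeout=poll_interval)
    except BaseException:
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)  # let cancellation finish
        raise
    return task.result()


async def chatty_node(heartbeat: Callable[[], None]) -> str:
    for _ in range(5):
        await asyncio.sleep(0.05)  # some progress...
        heartbeat()  # ...reported to the watchdog
    return "done"


async def stalled_node(heartbeat: Callable[[], None]) -> str:
    await asyncio.sleep(10)  # never reports progress
    return "unreachable"


print(asyncio.run(run_with_timeouts(chatty_node, idle_timeout=0.2)))  # done
try:
    asyncio.run(run_with_timeouts(stalled_node, idle_timeout=0.1))
except NodeTimeout as exc:
    print("timed out:", exc)
```

The watchdog can only act while the node is suspended at an await, so a coroutine that blocks the event loop cannot be interrupted this way, which is also why these timeouts only make sense for async nodes.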
```python
from langgraph.runtime import Runtime
from langgraph.types import TimeoutPolicy, RetryPolicy

# Hard cap: abort after 30 seconds regardless of progress (llm_node must be async)
builder.add_node(
    "llm_call",
    llm_node,
    timeout=30.0,  # same as TimeoutPolicy(run_timeout=30.0)
)

# Idle cap: reset on every LLM callback token, abort if silent for 10 s
builder.add_node(
    "streaming_llm",
    streaming_node,
    timeout=TimeoutPolicy(idle_timeout=10.0),
)


# Explicit heartbeat mode: the node must call runtime.heartbeat() at least every 60 s
async def long_download(state: State, runtime: Runtime) -> dict:
    # download_chunks / process are placeholders; download_chunks is assumed to be an
    # async iterator so the node yields to the event loop and can be cancelled.
    async for chunk in download_chunks(state["url"]):
        runtime.heartbeat()  # prevents idle-timeout eviction
        await process(chunk)
    return {"done": True}


builder.add_node(
    "download",
    long_download,
    timeout=TimeoutPolicy(idle_timeout=60.0, refresh_on="heartbeat"),
    retry_policy=RetryPolicy(max_attempts=3),
)
```
Sync nodes are not supported: timeout= on a sync node raises ValueError before the graph runs. If you need a timeout on CPU-bound code, make the node async and offload the blocking work with asyncio.to_thread. Cancelling that await does not stop the worker thread, which keeps running in the background until it finishes.
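The advice to offload CPU-bound work with asyncio.to_thread has one subtlety worth checking: cancelling the await does not stop the thread. In this stdlib sketch asyncio.wait_for stands in for the node timeout, and a threading.Event records whether the offloaded function kept running after the timeout fired. blocking_work and node_with_timeout are hypothetical names.

```python
import asyncio
import threading
import time


def blocking_work(finished: threading.Event) -> str:
    time.sleep(0.3)  # blocking work that never yields to the event loop
    finished.set()
    return "result"


async def node_with_timeout(finished: threading.Event) -> str:
    try:
        # The await is cancelled after 50 ms, but the worker thread is not
        return await asyncio.wait_for(asyncio.to_thread(blocking_work, finished), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"


finished = threading.Event()
print(asyncio.run(node_with_timeout(finished)))  # timed out
# asyncio.run waits for the default executor on shutdown, so the thread has completed by now
print(finished.is_set())  # True
```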
add_edge
```python
builder.add_edge("a", "b")         # single
builder.add_edge(["a", "b"], "c")  # waits for ALL of a, b (barrier edge)
builder.add_edge(START, "a")       # entry point
builder.add_edge("last", END)      # finish point
```
Raises ValueError if the start is END or the end is START. An edge that references a node that was never added also raises ValueError, at the latest when you call compile().
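To see why the list form matters, here is a toy super-step scheduler (stdlib only, hypothetical names, not LangGraph's Pregel engine). With plain edges, a target runs once in every super-step in which any predecessor finished; with a barrier edge it waits until all listed sources have finished.

```python
from collections import defaultdict


def run_supersteps(
    edges: dict[str, list[str]],
    barriers: dict[str, set[str]],
    entry: list[str],
    max_steps: int = 10,
) -> list[list[str]]:
    """Return the nodes that run in each super-step.

    edges:    plain edges, source -> [targets]
    barriers: barrier edges, target -> {sources that must all finish first}
    """
    arrived: dict[str, set[str]] = defaultdict(set)  # barrier target -> sources seen so far
    active = sorted(entry)
    steps: list[list[str]] = []
    while active and len(steps) < max_steps:
        steps.append(active)
        triggered: set[str] = set()
        for node in active:
            triggered.update(edges.get(node, []))
            for target, sources in barriers.items():
                if node in sources:
                    arrived[target].add(node)
                    if arrived[target] == sources:  # every source has reported in
                        triggered.add(target)
                        arrived[target] = set()
        active = sorted(triggered)
    return steps


# Branch "a" takes two steps (a -> a2), branch "b" takes one.
plain = run_supersteps({"a": ["a2"], "a2": ["c"], "b": ["c"]}, {}, ["a", "b"])
barrier = run_supersteps({"a": ["a2"]}, {"c": {"a2", "b"}}, ["a", "b"])
print(plain)    # [['a', 'b'], ['a2', 'c'], ['c']]
print(barrier)  # [['a', 'b'], ['a2'], ['c']]
```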
add_conditional_edges
```python
builder.add_conditional_edges(
    source: str,
    path: Callable[..., Hashable | Sequence[Hashable]],
    path_map: dict[Hashable, str] | list[str] | None = None,
)
```
path is called with the state (and optionally config/runtime) and returns:
- a single node name → routes there,
- a list of node names → fan-out to all,
- one or more Send(node, arg) instances → map-reduce with custom per-destination state,
- END (the constant, whose value is "__end__", not the literal string "END") → stop.
If your path returns arbitrary labels, map them to node names with path_map. Adding a Literal[...] return annotation or passing path_map keeps the Mermaid diagram accurate — without either, the visualizer assumes every node is reachable.
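The accepted return values can be summarised as a small resolver. This stdlib sketch uses a stand-in Send dataclass and a hypothetical resolve_route function; the only value borrowed from LangGraph is END's string value, "__end__".

```python
from dataclasses import dataclass
from typing import Any, Hashable, Optional, Union

END = "__end__"  # the value of langgraph's END constant


@dataclass
class Send:
    """Stand-in for langgraph.types.Send: a target node plus its own input."""

    node: str
    arg: Any


def resolve_route(
    result: Any, path_map: Optional[Union[dict[Hashable, str], list[str]]] = None
) -> list[tuple[str, Any]]:
    """Turn a path function's return value into (node, input) tasks.

    An input of None means "the node receives the shared graph state".
    """
    items = result if isinstance(result, (list, tuple)) else [result]
    tasks = []
    for item in items:
        if isinstance(item, Send):
            tasks.append((item.node, item.arg))
            continue
        node = path_map[item] if isinstance(path_map, dict) else item
        if node != END:  # routing to END schedules nothing
            tasks.append((node, None))
    return tasks


print(resolve_route("yes", {"yes": "a", "no": "b"}))          # [('a', None)]
print(resolve_route(["a", "b"]))                              # [('a', None), ('b', None)]
print(resolve_route([Send("worker", 1), Send("worker", 2)]))  # [('worker', 1), ('worker', 2)]
print(resolve_route(END))                                     # []
print(resolve_route("END"))                                   # [('END', None)]: just a node name
```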
add_sequence
```python
builder.add_sequence([
    load_docs,
    ("retrieve", retrieve_fn),  # tuple = (name, callable)
    rerank,
])
```
Wires the nodes in order with auto-generated edges and uses each callable’s __name__ if no explicit name is given. Raises on empty input or duplicate names.
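What add_sequence does to the builder can be approximated by a pure function that derives node names and the edges between consecutive nodes. expand_sequence is hypothetical and only models the naming, ordering, and error rules stated above; it does not touch a real StateGraph.

```python
from typing import Callable, Sequence, Union

Step = Union[Callable, tuple[str, Callable]]


def expand_sequence(steps: Sequence[Step]) -> tuple[list[str], list[tuple[str, str]]]:
    """Return (node names, edges) for a linear chain of steps."""
    if not steps:
        raise ValueError("sequence must contain at least one node")
    names: list[str] = []
    for step in steps:
        name = step[0] if isinstance(step, tuple) else step.__name__  # explicit name wins
        if name in names:
            raise ValueError(f"duplicate node name: {name!r}")
        names.append(name)
    edges = list(zip(names, names[1:]))  # each node feeds the next one
    return names, edges


def load_docs(state: dict) -> dict:
    return {}


def rerank(state: dict) -> dict:
    return {}


print(expand_sequence([load_docs, ("retrieve", lambda state: {}), rerank]))
# (['load_docs', 'retrieve', 'rerank'], [('load_docs', 'retrieve'), ('retrieve', 'rerank')])
```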
Entry / exit helpers
```python
builder.set_entry_point("planner")  # == add_edge(START, "planner")
builder.set_finish_point("writer")  # == add_edge("writer", END)

builder.set_conditional_entry_point(
    router, path_map={"yes": "a", "no": "b"}
)
```
set_node_defaults
```python
builder.set_node_defaults(
    *,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy | None = None,
    error_handler: StateNode | None = None,
    timeout: float | timedelta | TimeoutPolicy | None = None,
) -> Self
```
Set default policies that apply to every node in the graph. Per-node values on add_node(...) always override these defaults. Defaults are resolved at compile time and are not inherited by subgraphs.
| Parameter | Applies to | Notes |
|---|---|---|
| retry_policy | All nodes (including error-handler nodes) | Same as add_node(..., retry_policy=...) |
| cache_policy | Regular nodes only | Caching error-handler results is unsafe, so they are skipped automatically |
| error_handler | Regular nodes (not error-handler nodes) | Default error handler when a regular node raises and no per-node handler is set |
| timeout | All nodes (including error-handler nodes) | Accepts float (seconds), timedelta, or TimeoutPolicy |
```python
import asyncio

from langgraph.graph import StateGraph, START, END
from langgraph.types import RetryPolicy
from langgraph.checkpoint.memory import InMemorySaver
from typing_extensions import TypedDict


class State(TypedDict):
    count: int
    last_error: str


# Graph-wide fallback handler for unhandled exceptions.
# Async because the default timeout below also applies to error-handler nodes.
async def global_error_handler(state: State, error: Exception) -> dict:
    return {"last_error": f"{type(error).__name__}: {error}"}


async def flaky_node(state: State) -> dict:
    if state["count"] % 3 == 0:
        raise ConnectionError("simulated transient failure")
    return {"count": state["count"] + 1}


async def finalize(state: State) -> dict:
    return {}


builder = (
    StateGraph(State)
    # Defaults for every node: up to 3 attempts (the default retry_on predicate
    # retries ConnectionError), a 10 s timeout, and a global error handler.
    .set_node_defaults(
        retry_policy=RetryPolicy(max_attempts=3),
        timeout=10.0,
        error_handler=global_error_handler,
    )
    .add_node("flaky", flaky_node)
    .add_node("finalize", finalize, retry_policy=RetryPolicy(max_attempts=1))  # overrides the graph default
    .add_edge(START, "flaky")
    .add_edge("flaky", "finalize")
    .add_edge("finalize", END)
)

graph = builder.compile(checkpointer=InMemorySaver())
# Async nodes are run through the async API
result = asyncio.run(
    graph.ainvoke({"count": 0, "last_error": ""}, {"configurable": {"thread_id": "t1"}})
)
print(result)
```
Per-node retry_policy=RetryPolicy(max_attempts=1) on "finalize" overrides the graph-level max_attempts=3. The error_handler and timeout from set_node_defaults still apply to "finalize" because no per-node override was given for those.
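The override rules from the table and the example above can be restated as a small pure function. resolve_node_options and the string placeholders for policies are hypothetical; the sketch assumes a per-node value of None means "not set", which this page does not state explicitly.

```python
from typing import Any, Optional

OPTION_KEYS = ("retry_policy", "cache_policy", "error_handler", "timeout")
# Defaults that the table says are never applied to error-handler nodes
SKIPPED_FOR_ERROR_HANDLERS = {"cache_policy", "error_handler"}


def resolve_node_options(
    defaults: dict[str, Any], per_node: dict[str, Any], *, is_error_handler: bool = False
) -> dict[str, Optional[Any]]:
    resolved: dict[str, Optional[Any]] = {}
    for key in OPTION_KEYS:
        if per_node.get(key) is not None:
            resolved[key] = per_node[key]  # an explicit per-node value always wins
        elif is_error_handler and key in SKIPPED_FOR_ERROR_HANDLERS:
            resolved[key] = None
        else:
            resolved[key] = defaults.get(key)
    return resolved


defaults = {
    "retry_policy": "3 attempts",
    "cache_policy": "ttl=60",
    "error_handler": "global_error_handler",
    "timeout": 10.0,
}
print(resolve_node_options(defaults, {"retry_policy": "1 attempt"}))  # like "finalize"
print(resolve_node_options(defaults, {}, is_error_handler=True))
```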
set_node_defaults returns Self for method chaining:
```python
builder = (
    StateGraph(State)
    .set_node_defaults(
        retry_policy=RetryPolicy(max_attempts=3, retry_on=ConnectionError),
        error_handler=my_fallback,
    )
    .add_node("a", node_a)
    .add_node("b", node_b, retry_policy=RetryPolicy(max_attempts=1))  # overrides
    .add_edge(START, "a")
    .add_edge("a", "b")
    .add_edge("b", END)
)
graph = builder.compile()
```
compile(...)
```python
graph = builder.compile(
    checkpointer=None,      # BaseCheckpointSaver | True | False | None
    # the remaining parameters are keyword-only
    cache=None,             # BaseCache, needed for CachePolicy
    store=None,             # BaseStore for long-term memory
    interrupt_before=None,  # list[str] | "*" | None
    interrupt_after=None,   # list[str] | "*" | None
    debug=False,
    name=None,
)
```
- checkpointer=True is only valid when the graph is used as a subgraph: it inherits the parent’s checkpointer. On a root graph, True raises RuntimeError.
- checkpointer=False explicitly disables checkpointing even when the parent has one.
- interrupt_before / interrupt_after accept "*" (all nodes) or a list of node names.
- store is required whenever any tool/node uses InjectedStore or expects runtime.store to be set.
Returns a CompiledStateGraph, which exposes (all inherited from Pregel):
| Method | Purpose |
|---|---|
| invoke(input, config=None, *, context=None, stream_mode=None, interrupt_before=None, interrupt_after=None, durability=None, version="v1") | Run to completion, return final state. |
| stream(...) | Yield per-step events (see the Streaming modes reference). |
| ainvoke / astream | Async variants. |
| get_state(config, *, subgraphs=False) | Return the current StateSnapshot for a thread. Requires a checkpointer. |
| get_state_history(config, *, filter=None, before=None, limit=None) | Iterate historical snapshots (newest first). |
| update_state(config, values, as_node=None, task_id=None) | Write an update as if it came from as_node. |
| bulk_update_state(config, supersteps) | Apply multiple StateUpdate groups as distinct super-steps. |
| get_subgraphs(namespace=None, recurse=False) | Iterate nested compiled graphs. |
| get_graph(...) | Visualization: returns a drawable graph; call draw_mermaid() or draw_png() on the result. |
Durability modes
Pass durability="sync" | "async" | "exit" on invoke/stream. Semantics:
| Mode | When checkpoints are persisted |
|---|---|
| "sync" | Before the next step begins. Strongest guarantee, slowest. |
| "async" | Written asynchronously while the next step runs. Default. |
| "exit" | Only at graph exit. Cheapest, no mid-run time-travel. |
checkpoint_during=False is deprecated and maps to durability="exit".
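A back-of-the-envelope way to compare the modes is to ask which step checkpoints are guaranteed to exist if the process dies. The function below encodes one reading of the table as a toy model with a hypothetical name: "sync" has every finished step, "async" may still be writing the latest one (the sketch assumes pending writes are flushed when a run completes), and "exit" writes a single checkpoint once the run completes.

```python
def guaranteed_checkpoints(finished_steps: int, durability: str, *, run_completed: bool) -> list[int]:
    """Indices of step checkpoints guaranteed to be persisted (toy model)."""
    steps = list(range(finished_steps))
    if durability == "sync":
        return steps  # each write completes before the next step starts
    if durability == "async":
        # the latest write may still be in flight if the process dies mid-run
        return steps if run_completed else steps[:-1]
    if durability == "exit":
        # nothing mid-run; a single checkpoint when the graph exits
        return steps[-1:] if run_completed else []
    raise ValueError(f"unknown durability mode: {durability!r}")


for mode in ("sync", "async", "exit"):
    print(mode, guaranteed_checkpoints(4, mode, run_completed=False))
# sync [0, 1, 2, 3]
# async [0, 1, 2]
# exit []
```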
Runtime context (Runtime[Context])
Runtime bundles per-run data separate from state. Added in v0.6.
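```python
from dataclasses import dataclass

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.runtime import Runtime


@dataclass
class Ctx:
    user_id: str


class State(TypedDict):
    greeting: str


def node(state: State, runtime: Runtime[Ctx]) -> dict:
    uid = runtime.context.user_id
    if runtime.store:  # None unless compile(store=...) was given
        memory = runtime.store.get(("users",), uid)  # stored item, or None
    return {"greeting": f"hello, {uid}"}


graph = (
    StateGraph(State, context_schema=Ctx)
    .add_node(node)
    .add_edge(START, "node")  # a graph needs an entry point before compile()
    .compile()
)
graph.invoke({"greeting": ""}, context=Ctx(user_id="alice"))
```
Runtime fields: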
| Field | Type | Description |
|---|---|---|
| context | ContextT | What you passed in context=. |
| store | BaseStore \| None | What you passed to compile(store=...). |
| stream_writer | (Any) -> None | Writes a value to stream_mode="custom". |
| heartbeat | () -> None | Signals progress to reset an idle timeout (see TimeoutPolicy). No-op outside an idle-timed attempt. |
| previous | Any | Functional API only: the last saved return value for this thread. |
| execution_info | ExecutionInfo \| None | Read-only metadata for the current node run (see below). |
| server_info | ServerInfo \| None | Set by LangGraph Platform only; None when running open-source. |
| control | RunControl \| None | Run-scoped cooperative draining handle (see below). |
To get the config instead, add config: RunnableConfig as a parameter or call get_config() from langgraph.config.
ExecutionInfo
```python
from langgraph.runtime import ExecutionInfo
```
runtime.execution_info is a frozen dataclass with read-only per-run metadata. It is populated after task preparation, so it is None only in very early lifecycle hooks.
| Field | Type | Description |
|---|---|---|
| checkpoint_id | str | The ULID-style checkpoint ID written by this step. |
| checkpoint_ns | str | The checkpoint namespace (empty string for root; "parent:task_id" for subgraphs). |
| task_id | str | The Pregel task ID for the current node invocation. |
| thread_id | str \| None | The thread ID from config["configurable"]["thread_id"]; None when the config carries no thread_id (typical when running without a checkpointer). |
| run_id | str \| None | The LangSmith run ID, if run_id was set in RunnableConfig. |
| node_attempt | int | Current attempt number (1-indexed). Increments on each retry. |
| node_first_attempt_time | float \| None | Unix timestamp for when the first attempt of this node started. |
```python
from langgraph.runtime import Runtime


def audit_node(state: State, runtime: Runtime) -> dict:
    info = runtime.execution_info
    if info:
        print(f"thread={info.thread_id} ns={info.checkpoint_ns} attempt={info.node_attempt}")
    return {}
```
execution_info also has a patch(**overrides) helper that returns a new ExecutionInfo with selected fields replaced. You will not normally need this outside testing.
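Because ExecutionInfo is frozen, a patch-style helper is the way to derive a variant, for example a fake "second attempt" in a unit test. The stand-in below mirrors the documented fields with a stdlib frozen dataclass and implements patch with dataclasses.replace; FakeExecutionInfo is a hypothetical model, not the library class.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class FakeExecutionInfo:
    checkpoint_id: str
    checkpoint_ns: str
    task_id: str
    thread_id: Optional[str] = None
    run_id: Optional[str] = None
    node_attempt: int = 1
    node_first_attempt_time: Optional[float] = None

    def patch(self, **overrides) -> "FakeExecutionInfo":
        """Return a copy with selected fields replaced; the original is unchanged."""
        return replace(self, **overrides)


first = FakeExecutionInfo(checkpoint_id="ckpt-1", checkpoint_ns="", task_id="task-1", thread_id="t1")
retry = first.patch(node_attempt=2)
print(first.node_attempt, retry.node_attempt)  # 1 2

try:
    first.node_attempt = 3  # frozen: attribute assignment is rejected
except FrozenInstanceError:
    print("immutable")
```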
Runtime.heartbeat() — resetting idle timeouts
When a node is registered with timeout=TimeoutPolicy(idle_timeout=..., refresh_on="heartbeat"), the idle clock only resets on explicit runtime.heartbeat() calls. This is useful for long-running loops that do not naturally emit LangChain callback events.
```python
from langgraph.runtime import Runtime
from langgraph.types import TimeoutPolicy


async def batch_processor(state: State, runtime: Runtime) -> dict:
    results = []
    for item in state["items"]:
        result = await process_item(item)
        results.append(result)
        # Signal that we're still alive; prevents idle-timeout eviction
        runtime.heartbeat()
    return {"results": results}


builder.add_node(
    "batch",
    batch_processor,
    timeout=TimeoutPolicy(idle_timeout=30.0, refresh_on="heartbeat"),
)
```
Outside an idle-timed attempt (e.g., when timeout=None or the node is sync), runtime.heartbeat() is a no-op.
RunControl — cooperative draining
runtime.control is a RunControl instance that lets external code signal a graceful shutdown to a running node. The node cooperates by checking runtime.drain_requested and returning early when set.
```python
from langgraph.runtime import Runtime


async def interruptible_worker(state: State, runtime: Runtime) -> dict:
    results = []
    for item in state["items"]:
        if runtime.drain_requested:
            # Graceful shutdown: save progress and exit early
            return {"results": results, "partial": True, "reason": runtime.drain_reason}
        result = await process(item)
        results.append(result)
    return {"results": results, "partial": False}
```
RunControl is populated automatically by the Pregel executor; you never create one yourself. Key properties:
| Property / Method | Description |
|---|---|
| drain_requested: bool | True after request_drain() has been called. |
| drain_reason: str \| None | The string passed to request_drain() (e.g. "shutdown"). |
| request_drain(reason="shutdown") | Called externally to signal the node to exit. |
The runtime.drain_requested and runtime.drain_reason convenience properties forward to runtime.control (or return False / None if control is None).
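The cooperative-draining contract can be exercised without LangGraph. In this asyncio sketch, ToyRunControl and ToyRuntime are hypothetical stand-ins that mirror the documented properties (including the False/None fallback when control is None), and main() plays the role of the external code that requests a drain.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyRunControl:
    drain_requested: bool = False
    drain_reason: Optional[str] = None

    def request_drain(self, reason: str = "shutdown") -> None:
        self.drain_reason = reason
        self.drain_requested = True


@dataclass
class ToyRuntime:
    control: Optional[ToyRunControl] = None

    @property
    def drain_requested(self) -> bool:
        return self.control.drain_requested if self.control else False

    @property
    def drain_reason(self) -> Optional[str]:
        return self.control.drain_reason if self.control else None


async def interruptible_worker(items: list[int], runtime: ToyRuntime) -> dict:
    results = []
    for item in items:
        if runtime.drain_requested:  # checked between items, never mid-item
            return {"results": results, "partial": True, "reason": runtime.drain_reason}
        await asyncio.sleep(0.01)  # stand-in for real async work
        results.append(item * 2)
    return {"results": results, "partial": False}


async def main() -> dict:
    control = ToyRunControl()
    task = asyncio.create_task(interruptible_worker(list(range(100)), ToyRuntime(control)))
    await asyncio.sleep(0.05)  # let a few items finish
    control.request_drain("deploy")  # external shutdown signal
    return await task


outcome = asyncio.run(main())
print(outcome["partial"], outcome["reason"], len(outcome["results"]))
```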
State schema: TypedDict vs Pydantic vs dataclass
All three work as state_schema. Since v1.1, invoke() coerces input dicts into the declared type before calling nodes.
```python
from pydantic import BaseModel


class State(BaseModel):
    counter: int = 0


graph.invoke({"counter": 0})
# Nodes receive State(counter=0), a real Pydantic instance.
```
Annotated[..., reducer] works the same across all three schema styles. For Pydantic, use Field(default_factory=...) if the default depends on call time.
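For the dataclass flavour, the coercion step amounts to building the declared type from the input dict before any node sees it. coerce_input is a hypothetical stdlib sketch; it raises TypeError on unknown keys (ordinary dataclass behaviour), which may differ from what LangGraph does with extra keys.

```python
from dataclasses import dataclass, field, is_dataclass
from typing import Any


@dataclass
class State:
    counter: int = 0
    tags: list[str] = field(default_factory=list)  # fresh list per instance


def coerce_input(schema: type, data: Any) -> Any:
    """Build the declared dataclass from a dict input; pass anything else through."""
    if is_dataclass(schema) and isinstance(data, dict):
        return schema(**data)  # missing keys fall back to defaults
    return data


state = coerce_input(State, {"counter": 0})
print(state)  # State(counter=0, tags=[])
```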
Patterns
1. Fan-out / map-reduce with Send
```python
from langgraph.types import Send


def dispatch(state: State) -> list[Send]:
    # One worker task per item, each with its own input payload
    return [Send("worker", {"item": i}) for i in state["items"]]


builder.add_conditional_edges("planner", dispatch, ["worker"])  # path_map list: possible targets, for the diagram
builder.add_node("worker", worker_fn)
# All Send tasks run in the same super-step, so "aggregate" runs once, after every
# worker has finished. Workers writing to the same key need a reducer (e.g. operator.add).
builder.add_edge("worker", "aggregate")
```
2. Deferred finalization
```python
builder.add_node("summarize", summarize_fn, defer=True)
builder.add_edge(START, "research")
builder.add_edge("research", "write")
builder.add_edge("write", "summarize")
# `summarize` runs last, after every other task in the run has drained. In a straight
# chain it would run last anyway; defer matters when parallel branches of different
# lengths all lead into the deferred node.
```
3. Per-node retry on transient HTTP errors
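```python
import httpx
from langgraph.types import RetryPolicy


def is_transient(exc: Exception) -> bool:
    # Retry network failures and 5xx responses; 4xx responses are not transient
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code >= 500
    return isinstance(exc, httpx.TransportError)


builder.add_node(
    "fetch",
    fetch_fn,
    retry_policy=RetryPolicy(
        max_attempts=5,
        retry_on=is_transient,
    ),
)
```
4. Cached expensive step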
```python
from langgraph.cache.memory import InMemoryCache
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import CachePolicy

builder.add_node("embed", embed_fn, cache_policy=CachePolicy(ttl=3600))
graph = builder.compile(cache=InMemoryCache(), checkpointer=InMemorySaver())
```
5. Narrow node input with input_schema
```python
from typing_extensions import TypedDict


class QueryOnly(TypedDict):
    query: str


def classify(state: QueryOnly) -> dict:
    return {"category": "billing" if "bill" in state["query"] else "other"}


builder.add_node("classify", classify, input_schema=QueryOnly)
```
The node cannot read unrelated channels and stays cheap to trace.
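Conceptually, input_schema filters the full state down to the keys the node declared before calling it. The helper below sketches that projection for TypedDict schemas with typing.get_type_hints; project_state is hypothetical and ignores details such as reducers.

```python
from typing import Any, TypedDict, get_type_hints


class QueryOnly(TypedDict):
    query: str


def project_state(state: dict[str, Any], schema: type) -> dict[str, Any]:
    """Keep only the keys the node's input schema declares."""
    wanted = get_type_hints(schema)
    return {key: value for key, value in state.items() if key in wanted}


def classify(state: QueryOnly) -> dict:
    return {"category": "billing" if "bill" in state["query"] else "other"}


full_state = {"query": "Why is my bill so high?", "history": ["..."], "category": None}
node_view = project_state(full_state, QueryOnly)
print(node_view)            # {'query': 'Why is my bill so high?'}
print(classify(node_view))  # {'category': 'billing'}
```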
6. Async node with idle timeout + heartbeat
For nodes that stream from an LLM or process long lists, use idle_timeout so stalled attempts are cancelled before the hard wall-clock cap fires:
```python
import asyncio
from langgraph.types import TimeoutPolicy, RetryPolicy
from langgraph.runtime import Runtime
from typing_extensions import TypedDict


class BatchState(TypedDict):
    items: list[str]
    results: list[str]


async def process_batch(state: BatchState, runtime: Runtime) -> dict:
    results = []
    for item in state["items"]:
        # Expensive per-item work, offloaded so the event loop stays responsive
        result = await asyncio.to_thread(expensive_cpu_work, item)
        results.append(result)
        # Reset the idle clock after each item
        runtime.heartbeat()
    return {"results": results}


builder.add_node(
    "process",
    process_batch,
    # Abort if no progress for 20 s; up to 3 attempts in total
    timeout=TimeoutPolicy(idle_timeout=20.0, refresh_on="heartbeat"),
    # Timeouts raise NodeTimeoutError; it is matched by name here so no import path is assumed
    retry_policy=RetryPolicy(
        max_attempts=3,
        retry_on=lambda exc: type(exc).__name__ == "NodeTimeoutError",
    ),
)
```
7. Reading ExecutionInfo for per-node tracing
Correlate a node’s LangSmith run with your own observability system using execution_info:
```python
import logging
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

logger = logging.getLogger(__name__)


class State(TypedDict):
    query: str
    answer: str


def traced_node(state: State, runtime: Runtime) -> dict:
    info = runtime.execution_info
    span_attrs = {
        "thread_id": info.thread_id if info else None,
        "checkpoint_id": info.checkpoint_id if info else None,
        "attempt": info.node_attempt if info else 1,
    }
    logger.info("node start", extra=span_attrs)
    answer = call_llm(state["query"])  # call_llm: your own LLM helper
    logger.info("node done", extra={**span_attrs, "answer_chars": len(answer)})
    return {"answer": answer}
```
8. Pydantic state schema
StateGraph accepts a Pydantic BaseModel as its state schema. Since v1.1, invoke() coerces dict inputs to the model before nodes run, so nodes receive a typed instance.
```python
import operator
from typing import Annotated
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END


class DocState(BaseModel):
    text: str = ""
    word_count: int = 0
    tags: Annotated[list[str], operator.add] = Field(default_factory=list)


def count_words(state: DocState) -> dict:
    # `state` is a real DocState instance: full type safety + validation
    count = len(state.text.split())
    return {"word_count": count}


def tag_document(state: DocState) -> dict:
    tag = "long" if state.word_count > 50 else "short"
    return {"tags": [tag]}  # operator.add appends to the existing list


builder = StateGraph(DocState)
builder.add_node("count", count_words)
builder.add_node("tag", tag_document)
builder.add_edge(START, "count")
builder.add_edge("count", "tag")
builder.add_edge("tag", END)

graph = builder.compile()

# Dict input is coerced to DocState before nodes run
result = graph.invoke({"text": "LangGraph builds stateful multi-actor apps with LLMs."})
print(result)  # {'text': '...', 'word_count': 7, 'tags': ['short']}
```
9. Narrowing I/O with input_schema and output_schema
input_schema restricts what callers must pass; output_schema restricts what the graph returns. Internal channels that are not in output_schema are stripped from the final result.
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class FullState(TypedDict):
    raw_text: str
    cleaned_text: str
    word_count: int
    _internal_flag: bool  # should never be visible externally


class UserInput(TypedDict):
    raw_text: str  # callers only need to provide raw text


class UserOutput(TypedDict):
    cleaned_text: str  # callers only see the cleaned result
    word_count: int


def clean(state: FullState) -> dict:
    return {
        "cleaned_text": state["raw_text"].strip().lower(),
        "_internal_flag": True,
    }


def count(state: FullState) -> dict:
    return {"word_count": len(state["cleaned_text"].split())}


builder = StateGraph(FullState, input_schema=UserInput, output_schema=UserOutput)
builder.add_node("clean", clean)
builder.add_node("count", count)
builder.add_edge(START, "clean")
builder.add_edge("clean", "count")
builder.add_edge("count", END)

graph = builder.compile()

# Caller provides only UserInput fields
result = graph.invoke({"raw_text": " Hello World "})
print(result)  # {'cleaned_text': 'hello world', 'word_count': 2}
# '_internal_flag' is NOT present: stripped by output_schema
```
10. Chaining nodes with add_sequence
add_sequence wires a list of nodes sequentially: node[0] → node[1] → … → node[n]. It uses each callable’s __name__ as the node name; pass (name, fn) tuples for explicit names.
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class Pipeline(TypedDict):
    text: str
    tokens: list[str]
    normalized: list[str]
    result: str


def tokenize(state: Pipeline) -> dict:
    return {"tokens": state["text"].split()}


def normalize(state: Pipeline) -> dict:
    return {"normalized": [t.lower() for t in state["tokens"]]}


def join_result(state: Pipeline) -> dict:
    return {"result": " ".join(state["normalized"])}


builder = StateGraph(Pipeline)
# add_sequence registers the three nodes and connects them in order;
# the START and END edges are still added explicitly
builder.add_sequence([tokenize, normalize, join_result])
builder.add_edge(START, "tokenize")
builder.add_edge("join_result", END)

graph = builder.compile()
print(graph.invoke({"text": "Hello World from LangGraph", "tokens": [], "normalized": [], "result": ""}))
# {'text': 'Hello World from LangGraph', 'tokens': [...], 'normalized': [...], 'result': 'hello world from langgraph'}
```
You can also mix plain callables and (name, fn) tuples:
```python
builder.add_sequence([
    ("load", load_fn),   # explicit name
    transform_fn,        # uses transform_fn.__name__
    ("save", save_fn),   # explicit name
])
```
11. Streaming custom events with StreamWriter
Nodes can push arbitrary values to stream_mode="custom" by declaring a writer: StreamWriter parameter:
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import StreamWriter
from langgraph.checkpoint.memory import InMemorySaver


class State(TypedDict):
    items: list[str]
    processed: list[str]


def batch_process(state: State, writer: StreamWriter) -> dict:
    """Process items one at a time, streaming progress to the caller."""
    results = []
    for i, item in enumerate(state["items"]):
        # Emit a progress event; only visible with stream_mode="custom"
        writer({"progress": i + 1, "total": len(state["items"]), "item": item})
        results.append(item.upper())
    return {"processed": results}


builder = StateGraph(State)
builder.add_node("process", batch_process)
builder.add_edge(START, "process")
builder.add_edge("process", END)

graph = builder.compile(checkpointer=InMemorySaver())
cfg = {"configurable": {"thread_id": "stream-1"}}

# Receive both state updates and custom events
for mode, data in graph.stream(
    {"items": ["a", "b", "c"], "processed": []},
    cfg,
    stream_mode=["updates", "custom"],
):
    print(f"{mode}: {data}")
# custom: {'progress': 1, 'total': 3, 'item': 'a'}
# custom: {'progress': 2, 'total': 3, 'item': 'b'}
# custom: {'progress': 3, 'total': 3, 'item': 'c'}
# updates: {'process': {'processed': ['A', 'B', 'C']}}
```
Gotchas
- Two writes, no reducer, one super-step → InvalidUpdateError. Either add a reducer or stagger the writes with edges.
- checkpointer=None disables every feature that depends on persistence: interrupt(), get_state, update_state, get_state_history, time travel, thread-scoped memory. Use InMemorySaver() while developing.
- config_schema= is deprecated, but still accepted. Rename to context_schema= before v2.0.
- AgentState / AgentStatePydantic in langgraph.prebuilt are deprecated in v1.0; they now live in langchain.agents.
- create_react_agent in langgraph.prebuilt is deprecated in v1.0; migrate to langchain.agents.create_agent. Existing create_react_agent calls still work; the deprecation only emits a runtime warning.
- Root graphs cannot have checkpointer=True. That value is only for subgraphs inheriting from the parent.
- destinations= does not route; it only labels edges in the rendered diagram for nodes that return Command(goto=...).
- TimeoutPolicy only works on async nodes. Setting timeout= on a synchronous node raises ValueError before the graph runs. Convert the node to async, offloading blocking work with asyncio.to_thread if needed.
- runtime.heartbeat() is only required under refresh_on="heartbeat". Under refresh_on="auto" (the default), callbacks and stream writes already count as progress; explicit heartbeat() calls also count, so they are valid but usually redundant.
- runtime.execution_info is None briefly during startup. Don't access it in lifecycle hooks that run before task preparation.
- RunControl is populated automatically. You cannot construct or inject your own RunControl; it is owned by the executor and forwarded through Runtime.control.
Breaking changes
| Version | Change |
|---|---|
| 1.2 | TimeoutPolicy dataclass introduced with run_timeout, idle_timeout, refresh_on. Runtime.heartbeat() added. RunControl and cooperative draining added (runtime.control, runtime.drain_requested, runtime.drain_reason). ExecutionInfo extended with checkpoint_ns and task_id fields. |
| 1.1 | invoke()/stream() coerce input dicts into the declared state schema for Pydantic/dataclass. V2 stream mode emits typed StreamPart dicts. Python 3.9 dropped. |
| 1.0 | AgentState, AgentStatePydantic, create_react_agent deprecated in favor of langchain.agents.create_agent. ns, when, resumable, interrupt_id removed from Interrupt (in v0.6). |
| 0.6 | config_schema on StateGraph deprecated; use context_schema. Runtime[Ctx] replaces ad-hoc config["configurable"] usage for run context. |
| 0.5 | input / output kwargs on StateGraph.__init__ deprecated; use input_schema / output_schema. |