Class deep-dives Vol. 15 — Runtime, Store, Streaming & Error APIs
Verified against langgraph==1.2.5 / langgraph-checkpoint==4.1.1 / langgraph-prebuilt==1.1.0.
Every section was written by inspecting the installed package source directly. All signatures and behaviours are drawn from the actual implementation, not documentation.
Classes covered
| # | Class / symbol | Module |
|---|---|---|
| 1 | Runtime + ExecutionInfo + RunControl + ServerInfo | langgraph.runtime |
| 2 | BaseStore + Item + SearchItem | langgraph.store.base |
| 3 | GetOp + SearchOp + PutOp + ListNamespacesOp + MatchCondition | langgraph.store.base |
| 4 | IndexConfig + TTLConfig | langgraph.store.base |
| 5 | UIMessage + push_ui_message + delete_ui_message | langgraph.graph.ui |
| 6 | StreamTransformer + ProtocolEvent | langgraph.stream._types |
| 7 | RemoteGraph | langgraph.pregel.remote |
| 8 | NodeError + NodeTimeoutError + NodeCancelledError + GraphDrained | langgraph.errors |
| 9 | IsLastStep + RemainingSteps | langgraph.managed.is_last_step |
| 10 | HumanResponse | langgraph.prebuilt.interrupt |
1 · Runtime + ExecutionInfo + RunControl + ServerInfo
Module: langgraph.runtime
Import:

```python
from langgraph.runtime import Runtime, ExecutionInfo, RunControl, ServerInfo, get_runtime
```

Added in v0.6.0, Runtime is the unified injection point for everything a node needs beyond its state slice: typed context, a cross-thread store, a stream writer, heartbeat ticks, per-attempt metadata, and cooperative drain control. Declare it as a parameter on any node function and LangGraph injects it automatically.
Source signature (1.2.5)

```python
@dataclass(**_DC_KWARGS)
class Runtime(Generic[ContextT]):
    context: ContextT = field(default=None)
    store: BaseStore | None = field(default=None)
    stream_writer: StreamWriter = field(default=_no_op_stream_writer)
    heartbeat: Callable[[], None] = field(default=_no_op_heartbeat)
    previous: Any = field(default=None)
    execution_info: ExecutionInfo | None = field(default=None)
    server_info: ServerInfo | None = field(default=None)
    control: RunControl | None = field(default=None)
```

ExecutionInfo is a frozen dataclass injected into runtime.execution_info:

```python
@dataclass(frozen=True, slots=True)
class ExecutionInfo:
    checkpoint_id: str
    checkpoint_ns: str
    task_id: str
    thread_id: str | None = None
    run_id: str | None = None
    node_attempt: int = 1  # 1-indexed retry counter
    node_first_attempt_time: float | None = None
```

RunControl is a cooperative drain signal:

```python
class RunControl:
    def request_drain(self, reason: str = "shutdown") -> None: ...

    @property
    def drain_requested(self) -> bool: ...

    @property
    def drain_reason(self) -> str | None: ...
```

Example 1: Typed context + store access via Runtime
```python
from dataclasses import dataclass
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore

@dataclass
class AppContext:
    user_id: str
    locale: str = "en"

class State(TypedDict):
    query: str
    answer: str

store = InMemoryStore()
# Pre-populate some user data
store.put(("users",), "alice", {"name": "Alice", "plan": "pro"})
store.put(("users",), "bob", {"name": "Bob", "plan": "free"})

def lookup_and_answer(state: State, runtime: Runtime[AppContext]) -> dict:
    user_id = runtime.context.user_id
    locale = runtime.context.locale

    user_item = runtime.store.get(("users",), user_id) if runtime.store else None
    name = user_item.value["name"] if user_item else "stranger"
    plan = user_item.value["plan"] if user_item else "unknown"

    return {"answer": f"[{locale}] Hello {name} ({plan} plan): {state['query']}"}

graph = (
    StateGraph(State, context_schema=AppContext)
    .add_node("lookup", lookup_and_answer)
    .add_edge(START, "lookup")
    .add_edge("lookup", END)
    .compile(store=store)
)

result = graph.invoke(
    {"query": "What is my plan?", "answer": ""},
    context=AppContext(user_id="alice", locale="en-GB"),
)
print(result["answer"])
# [en-GB] Hello Alice (pro plan): What is my plan?
```

Example 2: ExecutionInfo — retry-aware node logic
```python
import asyncio
from dataclasses import dataclass
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.types import RetryPolicy

class State(TypedDict):
    result: str

attempt_log: list[int] = []

def flaky_node(state: State, runtime: Runtime) -> dict:
    info = runtime.execution_info
    attempt = info.node_attempt if info else 1
    attempt_log.append(attempt)

    if attempt < 3:
        raise ValueError(f"Simulated failure on attempt {attempt}")

    return {"result": f"succeeded on attempt {attempt}"}

graph = (
    StateGraph(State)
    .add_node(
        "flaky",
        flaky_node,
        retry=RetryPolicy(max_attempts=5),
    )
    .add_edge(START, "flaky")
    .add_edge("flaky", END)
    .compile()
)

output = graph.invoke({"result": ""})
print(output["result"])  # succeeded on attempt 3
print(attempt_log)       # [1, 2, 3]
```

Example 3: heartbeat — keeping alive during long computation
```python
import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.types import TimeoutPolicy

class State(TypedDict):
    items_processed: int

def slow_batch(state: State, runtime: Runtime) -> dict:
    for i in range(10):
        time.sleep(0.05)       # simulate work
        runtime.heartbeat()    # resets the idle-timeout timer
    return {"items_processed": 10}

graph = (
    StateGraph(State)
    .add_node(
        "batch",
        slow_batch,
        # Without heartbeats, the idle timeout would fire after 0.2 s of silence
        timeout=TimeoutPolicy(idle_timeout=0.2),
    )
    .add_edge(START, "batch")
    .add_edge("batch", END)
    .compile()
)

result = graph.invoke({"items_processed": 0})
print(result["items_processed"])  # 10
```

Example 4: RunControl — cooperative drain on SIGTERM
```python
import signal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime, get_runtime
from langgraph.types import Command

class State(TypedDict):
    step: int
    done: bool

def step_node(state: State, runtime: Runtime) -> Command:
    # Check whether a drain was requested (e.g. on SIGTERM).
    # drain_requested lives on RunControl, and runtime.control may be None.
    control = runtime.control
    if control is not None and control.drain_requested:
        return Command(update={"done": True}, goto=END)
    return Command(update={"step": state["step"] + 1})

# In production you'd wire this to SIGTERM:
# signal.signal(signal.SIGTERM, lambda *_: get_runtime().control.request_drain())
```

Example 5: get_runtime() — accessing runtime outside the node signature
```python
from langgraph.runtime import get_runtime

def side_effect_node(state: dict) -> dict:
    runtime = get_runtime()
    if runtime and runtime.execution_info:
        print(f"Thread: {runtime.execution_info.thread_id}")
        print(f"Attempt: {runtime.execution_info.node_attempt}")
    return state
```

2 · BaseStore + Item + SearchItem
Module: langgraph.store.base
Import:

```python
from langgraph.store.base import BaseStore, Item, SearchItem
from langgraph.store.memory import InMemoryStore  # concrete implementation
```

BaseStore is the abstract base for cross-thread, cross-run memory. Unlike checkpoint state (which is scoped to a single thread), a store is shared across all threads. Items live in hierarchical namespaces: tuples of strings that act like a folder path.
Item fields
```
Item.namespace  : tuple[str, ...]  (e.g. ("users", "alice"))
Item.key        : str              (unique within namespace)
Item.value      : dict[str, Any]   (arbitrary JSON-serialisable data)
Item.created_at : datetime
Item.updated_at : datetime
```

SearchItem extends Item with a score: float | None field set by vector-search implementations.
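To make the folder-path analogy concrete, the following is a minimal, library-free sketch. `ToyItem` and `under_prefix` are hypothetical names introduced only for illustration; they mirror the fields listed above but are not the library's classes.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


def _now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class ToyItem:
    """Hypothetical stand-in mirroring the Item fields listed above."""
    namespace: tuple[str, ...]
    key: str
    value: dict[str, Any]
    created_at: datetime = field(default_factory=_now)
    updated_at: datetime = field(default_factory=_now)


def under_prefix(items: list[ToyItem], prefix: tuple[str, ...]) -> list[ToyItem]:
    """Return items whose namespace starts with `prefix`, like listing a folder."""
    return [it for it in items if it.namespace[: len(prefix)] == prefix]


items = [
    ToyItem(("users", "alice"), "profile", {"plan": "pro"}),
    ToyItem(("users", "bob"), "profile", {"plan": "free"}),
    ToyItem(("docs", "wiki"), "home", {"title": "Home"}),
]

# Everything "inside" the ("users",) folder, at any depth
user_keys = [(it.namespace, it.key) for it in under_prefix(items, ("users",))]
print(user_keys)
```

The same key ("profile") can appear under different namespaces without clashing, which is why keys only need to be unique within a namespace.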
Example 1: Basic CRUD with InMemoryStore
```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# PUT: namespace must be non-empty, no dots in labels
store.put(("profiles", "users"), "alice", {"name": "Alice", "role": "admin"})
store.put(("profiles", "users"), "bob", {"name": "Bob", "role": "viewer"})

# GET
item = store.get(("profiles", "users"), "alice")
print(item.value)      # {"name": "Alice", "role": "admin"}
print(item.namespace)  # ("profiles", "users")
print(item.key)        # "alice"

# SEARCH with filter
results = store.search(("profiles", "users"), filter={"role": "admin"})
print([r.key for r in results])  # ["alice"]

# DELETE: implemented as put(value=None)
store.delete(("profiles", "users"), "bob")
print(store.get(("profiles", "users"), "bob"))  # None
```

Example 2: Namespaced memory across threads
```python
import hashlib
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.messages import HumanMessage, AIMessage

store = InMemoryStore()
checkpointer = InMemorySaver()

class ChatState(TypedDict):
    messages: Annotated[list, add_messages]
    user_id: str

# The injected parameter is annotated with the abstract BaseStore
# (assumption: injection matches on this base type rather than on a concrete subclass).
def remember(state: ChatState, store: BaseStore) -> dict:
    user_id = state["user_id"]
    ns = ("memories", user_id)

    # recall previous facts
    memories = store.search(ns)
    context = "; ".join(m.value.get("fact", "") for m in memories)

    last_user = next(
        (m.content for m in reversed(state["messages"]) if isinstance(m, HumanMessage)),
        "",
    )

    # store a new fact about the user's last message
    key = hashlib.md5(last_user.encode()).hexdigest()[:8]
    store.put(ns, key, {"fact": f"user said: {last_user}"})

    response = f"(Remembered {len(memories)} facts). Context: {context!r}"
    return {"messages": [AIMessage(content=response)]}

graph = (
    StateGraph(ChatState)
    .add_node("remember", remember)
    .add_edge(START, "remember")
    .add_edge("remember", END)
    .compile(checkpointer=checkpointer, store=store)
)

cfg = {"configurable": {"thread_id": "t1"}}
graph.invoke({"messages": [HumanMessage("Hello!")], "user_id": "alice"}, cfg)
result = graph.invoke({"messages": [HumanMessage("I like coffee")], "user_id": "alice"}, cfg)
print(result["messages"][-1].content)
# (Remembered 1 facts). Context: 'user said: Hello!'
```

Example 3: Listing namespaces
```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
store.put(("docs", "reports", "2024"), "q1", {"title": "Q1 Report"})
store.put(("docs", "reports", "2024"), "q2", {"title": "Q2 Report"})
store.put(("docs", "wiki"), "home", {"title": "Home"})
store.put(("cache", "embeddings"), "emb1", {"vec": [0.1, 0.2]})

# All namespaces up to depth 2
namespaces = store.list_namespaces(max_depth=2)
print(namespaces)
# [("cache", "embeddings"), ("docs", "reports"), ("docs", "wiki")]

# Only under "docs"
docs_ns = store.list_namespaces(prefix=("docs",))
print(docs_ns)
# [("docs", "reports", "2024"), ("docs", "wiki")]
```

Example 4: Async store operations
```python
import asyncio
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

async def main():
    await store.aput(("async_ns",), "key1", {"value": 42})
    item = await store.aget(("async_ns",), "key1")
    print(item.value)  # {"value": 42}

    results = await store.asearch(("async_ns",), filter={"value": {"$gte": 40}})
    print(results[0].key)  # key1

    await store.adelete(("async_ns",), "key1")
    print(await store.aget(("async_ns",), "key1"))  # None

asyncio.run(main())
```

3 · GetOp + SearchOp + PutOp + ListNamespacesOp + MatchCondition
Module: langgraph.store.base
Import:

```python
from langgraph.store.base import (
    GetOp,
    SearchOp,
    PutOp,
    ListNamespacesOp,
    MatchCondition,
    Op,
    Result,
)
```

BaseStore has two abstract methods: batch() (sync) and abatch() (async). All convenience methods (get, put, search, delete, list_namespaces and their async counterparts) delegate to one of these. Custom store implementations must override both. Understanding the op types lets you build high-throughput pipelines that issue many operations in a single round-trip.
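The delegation pattern described above can be sketched without the library. Everything here (`ToyGetOp`, `ToyPutOp`, `ToyStore`, `DictStore`) is a hypothetical stand-in written for illustration, not the real BaseStore; it only shows how convenience methods can funnel through a single `batch()` entry point.

```python
from abc import ABC, abstractmethod
from typing import Any, NamedTuple, Optional


class ToyGetOp(NamedTuple):
    namespace: tuple
    key: str


class ToyPutOp(NamedTuple):
    namespace: tuple
    key: str
    value: Optional[dict]  # None means "delete"


class ToyStore(ABC):
    """Hypothetical base: subclasses implement batch(); helpers delegate to it."""

    @abstractmethod
    def batch(self, ops: list) -> list: ...

    def get(self, namespace: tuple, key: str) -> Any:
        return self.batch([ToyGetOp(namespace, key)])[0]

    def put(self, namespace: tuple, key: str, value: dict) -> None:
        self.batch([ToyPutOp(namespace, key, value)])

    def delete(self, namespace: tuple, key: str) -> None:
        self.batch([ToyPutOp(namespace, key, None)])


class DictStore(ToyStore):
    def __init__(self) -> None:
        self._data: dict = {}
        self.batch_calls = 0

    def batch(self, ops: list) -> list:
        # Simplification: ops are applied strictly in order. Real stores may
        # order reads and writes differently within one batch.
        self.batch_calls += 1
        results = []
        for op in ops:
            slot = (op.namespace, op.key)
            if isinstance(op, ToyGetOp):
                results.append(self._data.get(slot))
            elif op.value is None:
                self._data.pop(slot, None)
                results.append(None)
            else:
                self._data[slot] = op.value
                results.append(None)
        return results


store = DictStore()
store.put(("inventory",), "item_a", {"qty": 10})
fetched = store.get(("inventory",), "item_a")
store.delete(("inventory",), "item_a")
after_delete = store.get(("inventory",), "item_a")
print(fetched, after_delete, store.batch_calls)
```

Because every helper goes through `batch()`, a custom backend in this style only has to get one method right, and callers who need throughput can bypass the helpers and submit many ops at once.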
Op types at a glance
| NamedTuple | Fields | Returned Result type |
|---|---|---|
| GetOp | namespace, key, refresh_ttl | Item or None |
| PutOp | namespace, key, value, index, ttl | None |
| SearchOp | namespace_prefix, filter, limit, offset, query, refresh_ttl | list[SearchItem] |
| ListNamespacesOp | match_conditions, max_depth, limit, offset | list[tuple[str, ...]] |
MatchCondition(match_type, path) is used inside ListNamespacesOp to filter returned namespaces by prefix or suffix. Wildcards ("*") are allowed in path.
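One plausible reading of the prefix/suffix semantics, written as a standalone sketch: `namespace_matches` is a hypothetical helper, not the library's implementation, and it assumes a "*" in `path` matches exactly one namespace label.

```python
def namespace_matches(namespace: tuple, match_type: str, path: tuple) -> bool:
    """Check one condition against one namespace; "*" matches any single label."""
    if len(path) > len(namespace):
        return False
    if match_type == "prefix":
        window = namespace[: len(path)]
    elif match_type == "suffix":
        window = namespace[len(namespace) - len(path):]
    else:
        raise ValueError(f"unknown match_type: {match_type!r}")
    return all(p == "*" or p == label for p, label in zip(path, window))


namespaces = [
    ("users", "alice", "v1"),
    ("users", "alice", "v2"),
    ("users", "bob", "v2"),
    ("docs", "wiki"),
]

# All conditions must hold: starts with ("users", <anything>) AND ends with ("v2",)
conditions = [("prefix", ("users", "*")), ("suffix", ("v2",))]
selected = [
    ns for ns in namespaces
    if all(namespace_matches(ns, kind, path) for kind, path in conditions)
]
print(selected)
```

Combining a prefix and a suffix condition narrows the result from both ends, which is the pattern used for versioned namespaces such as ("users", user, "v2").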
Example 1: Batching multiple ops in one call
```python
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import GetOp, PutOp, SearchOp

store = InMemoryStore()

# Batch puts together in one call for efficiency
put_results = store.batch([
    PutOp(("inventory",), "item_a", {"qty": 10, "category": "tools"}),
    PutOp(("inventory",), "item_b", {"qty": 5, "category": "consumables"}),
])
print(put_results)  # [None, None] (PutOp results are always None)

# Reads must be in a separate batch: same-batch GetOps see the state
# *before* the PutOps in that batch have been committed.
read_results = store.batch([
    GetOp(("inventory",), "item_a"),
    GetOp(("inventory",), "item_b"),
])
print(read_results[0].value)  # {"qty": 10, "category": "tools"}
print(read_results[1].value)  # {"qty": 5, "category": "consumables"}
```

Example 2: Compound search + list in one batch
```python
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import SearchOp, ListNamespacesOp, MatchCondition

store = InMemoryStore()
for i in range(5):
    store.put(("products", "electronics"), f"prod_{i}", {"price": i * 10, "in_stock": i % 2 == 0})

results = store.batch([
    SearchOp(
        namespace_prefix=("products",),
        filter={"in_stock": True},
        limit=10,
    ),
    ListNamespacesOp(
        match_conditions=(MatchCondition(match_type="prefix", path=("products",)),),
        max_depth=2,
    ),
])

items, namespaces = results
print([r.key for r in items])  # ["prod_0", "prod_2", "prod_4"]
print(namespaces)              # [("products", "electronics")]
```

Example 3: PutOp with index=False (skip vector indexing)
```python
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import PutOp, GetOp

store = InMemoryStore()

store.batch([
    # This item will be stored but NOT indexed for semantic search
    PutOp(("secrets",), "api_key", {"key": "sk-xxx"}, index=False),
    # This item uses default indexing
    PutOp(("docs",), "readme", {"text": "Getting started guide"}, index=None),
])

# Direct fetch still works regardless of indexing
item = store.batch([GetOp(("secrets",), "api_key")])[0]
print(item.value["key"])  # sk-xxx
```

Example 4: MatchCondition wildcards for namespace discovery
```python
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import ListNamespacesOp, MatchCondition

store = InMemoryStore()
for user in ["alice", "bob", "carol"]:
    store.put(("users", user, "v1"), "profile", {"name": user})
    store.put(("users", user, "v2"), "profile", {"name": user, "extended": True})

# Find all namespaces ending with "v2" under any user
results = store.batch([
    ListNamespacesOp(
        match_conditions=(
            MatchCondition(match_type="prefix", path=("users",)),
            MatchCondition(match_type="suffix", path=("v2",)),
        ),
    )
])[0]

print(results)
# [("users","alice","v2"), ("users","bob","v2"), ("users","carol","v2")]
```

4 · IndexConfig + TTLConfig
Module: langgraph.store.base
Import:

```python
from langgraph.store.base import IndexConfig, TTLConfig
from langgraph.store.memory import InMemoryStore
```

These two TypedDicts are the constructor-level knobs on any store implementation that supports vector search and automatic expiry.
IndexConfig fields
| Field | Type | Description |
|---|---|---|
| dims | int | Embedding vector dimension |
| embed | Embeddings, EmbeddingsFunc, AEmbeddingsFunc, or str | How to embed text |
| fields | list[str] | JSON-path fields to embed. Defaults to ["$"] (the whole value) when the key is omitted. |
TTLConfig fields
| Field | Type | Description |
|---|---|---|
| refresh_on_read | bool | Whether get/search resets the expiry clock (default True) |
| default_ttl | float or None | Minutes until expiry for new items. None = no expiry |
| sweep_interval_minutes | int or None | How often the store purges expired items |
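How `default_ttl` and `refresh_on_read` interact is easier to see in a toy model. The sketch below is an assumption-laden illustration, not any adapter's implementation: `ToyTTLStore`, its manual `now` clock (in minutes), and the `_USE_DEFAULT` sentinel are all hypothetical, expiry is checked lazily on read, and the background sweeper (`sweep_interval_minutes`) is not modelled.

```python
from dataclasses import dataclass
from typing import Any, Optional

_USE_DEFAULT = object()  # hypothetical sentinel: "caller did not pass ttl"


@dataclass
class Entry:
    value: dict
    ttl: Optional[float]         # minutes; None = never expires
    expires_at: Optional[float]  # absolute toy-clock time in minutes


class ToyTTLStore:
    def __init__(self, default_ttl: Optional[float] = None, refresh_on_read: bool = True):
        self.default_ttl = default_ttl
        self.refresh_on_read = refresh_on_read
        self.now = 0.0  # manual clock so the example is deterministic
        self._data: dict[str, Entry] = {}

    def put(self, key: str, value: dict, ttl: Any = _USE_DEFAULT) -> None:
        minutes = self.default_ttl if ttl is _USE_DEFAULT else ttl
        expires_at = None if minutes is None else self.now + minutes
        self._data[key] = Entry(value, minutes, expires_at)

    def get(self, key: str) -> Optional[dict]:
        entry = self._data.get(key)
        if entry is None:
            return None
        if entry.expires_at is not None and self.now >= entry.expires_at:
            del self._data[key]  # lazy expiry on read
            return None
        if self.refresh_on_read and entry.ttl is not None:
            entry.expires_at = self.now + entry.ttl  # reading resets the clock
        return entry.value


store = ToyTTLStore(default_ttl=60.0, refresh_on_read=True)
store.put("sess_2", {"user": "bob"}, ttl=5.0)      # expires at minute 5
store.put("sess_3", {"user": "carol"}, ttl=None)   # never expires

store.now = 4.0
read_at_4 = store.get("sess_2")    # alive; expiry pushed to minute 9
store.now = 8.0
read_at_8 = store.get("sess_2")    # still alive thanks to the refresh
store.now = 20.0
read_at_20 = store.get("sess_2")   # no read for 12 minutes: expired
carol_at_20 = store.get("sess_3")
print(read_at_4, read_at_8, read_at_20, carol_at_20)
```

With `refresh_on_read=False` the second read at minute 8 would already have missed, because the item would keep its original expiry at minute 5.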
Example 1: Semantic search with IndexConfig
```python
from langgraph.store.memory import InMemoryStore

# Fake embedding function for illustration; replace with a real one
def embed_fn(texts: list[str]) -> list[list[float]]:
    # Real usage: call an embedding API here
    return [[float(ord(c)) / 1000 for c in text[:4].ljust(4)] for text in texts]

store = InMemoryStore(
    index={
        "dims": 4,
        "embed": embed_fn,
        "fields": ["content"],  # only embed the "content" field
    }
)

store.put(("docs",), "intro", {"content": "Getting started with LangGraph"})
store.put(("docs",), "adv", {"content": "Advanced graph patterns"})
store.put(("docs",), "mem", {"content": "Memory and persistence"})

# Semantic search: needs a real embed_fn to rank meaningfully
results = store.search(("docs",), query="how to start")
for r in results:
    print(r.key, r.score)
```

Example 2: Field-level indexing on put
Override the store’s default fields per item using PutOp(index=[...]) or store.put(..., index=[...]):
```python
from langgraph.store.memory import InMemoryStore

def embed_fn(texts):
    return [[float(ord(c)) / 1000 for c in t[:4].ljust(4)] for t in texts]

store = InMemoryStore(index={"dims": 4, "embed": embed_fn})

# Index only the summary field for this item
store.put(
    ("articles",),
    "article_1",
    {"title": "LangGraph Guide", "summary": "Covers graph basics", "body": "...very long..."},
    index=["summary"],  # override: only embed "summary"
)

# Disable indexing entirely for secrets
store.put(
    ("internal",),
    "secret",
    {"token": "abc123"},
    index=False,
)
```

Example 3: TTLConfig — automatic expiry
TTL support is adapter-specific: InMemoryStore does not support it
(supports_ttl = False). Production adapters such as AsyncPostgresStore
(from langgraph-checkpoint-postgres) do. The constructor argument is ttl
(not index) and the per-item ttl kwarg on put / aput is gated behind
supports_ttl.

```python
# TTL requires a store adapter that sets supports_ttl = True.
# The example is shown commented out with AsyncPostgresStore; replace with your adapter.
# The import path below is assumed from the langgraph-checkpoint-postgres package layout.
#
# from langgraph.store.postgres import AsyncPostgresStore
#
# async with AsyncPostgresStore.from_conn_string(
#     "postgresql://user:pass@localhost/db",
#     ttl={
#         "default_ttl": 60.0,           # items expire after 60 minutes by default
#         "refresh_on_read": True,       # reset timer on every get/search
#         "sweep_interval_minutes": 10,  # background sweeper interval
#     },
# ) as store:
#     # Use store default TTL (60 min)
#     await store.aput(("sessions",), "sess_1", {"user": "alice"})
#
#     # Override to 5 minutes for this item
#     await store.aput(("sessions",), "sess_2", {"user": "bob"}, ttl=5.0)
#
#     # This item never expires
#     await store.aput(("sessions",), "sess_3", {"user": "carol"}, ttl=None)

# For InMemoryStore, TTL arguments are rejected at runtime:
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()    # no ttl= parameter accepted
print(store.supports_ttl)  # False
```

Example 4: Nested field indexing with JSON-path syntax
```python
from langgraph.store.memory import InMemoryStore

def embed_fn(texts):
    return [[float(ord(c)) / 1000 for c in t[:4].ljust(4)] for t in texts]

store = InMemoryStore(
    index={
        "dims": 4,
        "embed": embed_fn,
        "fields": ["$"],  # embed whole document by default
    }
)

# For this item, index each message's content separately
store.put(
    ("convs",),
    "conv_1",
    {
        "messages": [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there!"},
        ]
    },
    index=["messages[*].content"],  # creates one vector per message
)
```

5 · UIMessage + push_ui_message + delete_ui_message
Module: langgraph.graph.ui
Import:

```python
from langgraph.graph.ui import UIMessage, RemoveUIMessage, push_ui_message, delete_ui_message
```

UIMessage lets nodes stream structured UI updates to a frontend in real time. The pattern works alongside stream_mode="custom" or alongside the ui state key. The helper push_ui_message() both writes to the stream (for real-time delivery) and appends to graph state (for replay).
UIMessage TypedDict fields
| Field | Type | Description |
|---|---|---|
| type | Literal["ui"] | Discriminator |
| id | str | Unique component ID (auto-generated if not provided) |
| name | str | Frontend component name |
| props | dict[str, Any] | Props to pass to the component |
| metadata | dict[str, Any] | Framework metadata (run_id, merge flag, etc.) |
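To show how the `id` and the merge flag in `metadata` could drive state updates, here is a simplified, library-free model of a UI-message reducer. `reduce_ui` is a hypothetical function, and both the `"remove-ui"` type string and the `metadata["merge"]` key are assumptions made for this sketch; the real reducer may differ in detail.

```python
from typing import Any


def reduce_ui(existing: list[dict[str, Any]], incoming: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Fold incoming UI messages into the existing list, keyed by message id."""
    by_id = {msg["id"]: msg for msg in existing}  # dicts keep insertion order
    for msg in incoming:
        if msg["type"] == "remove-ui":  # assumed discriminator for removals
            by_id.pop(msg["id"], None)
        elif msg["id"] in by_id and msg.get("metadata", {}).get("merge"):
            # Same id + merge flag: overlay new props on top of the old ones
            old = by_id[msg["id"]]
            by_id[msg["id"]] = {**msg, "props": {**old["props"], **msg["props"]}}
        else:
            # New id, or same id without merge: replace wholesale
            by_id[msg["id"]] = msg
    return list(by_id.values())


first = {
    "type": "ui", "id": "bar-1", "name": "progress-bar",
    "props": {"label": "Export", "progress": 0}, "metadata": {},
}
update = {
    "type": "ui", "id": "bar-1", "name": "progress-bar",
    "props": {"progress": 50}, "metadata": {"merge": True},
}
removal = {"type": "remove-ui", "id": "bar-1"}

state = reduce_ui([], [first])
state = reduce_ui(state, [update])
merged_props = state[0]["props"]
state_after_removal = reduce_ui(state, [removal])
print(merged_props, state_after_removal)
```

In this model, a merge update only needs to carry the props that changed; without the merge flag the label would be lost when the progress value is replaced.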
Source signature of push_ui_message
```python
def push_ui_message(
    name: str,
    props: dict[str, Any],
    *,
    id: str | None = None,
    metadata: dict[str, Any] | None = None,
    message: AnyMessage | None = None,
    state_key: str | None = "ui",
    merge: bool = False,
) -> UIMessage: ...
```

Example 1: Streaming a progress bar to the UI
```python
import asyncio
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.ui import UIMessage, push_ui_message, ui_message_reducer

class State(TypedDict):
    task: str
    ui: Annotated[list[UIMessage], ui_message_reducer]

def run_task(state: State) -> dict:
    # Push a "loading" component, visible immediately via stream
    msg = push_ui_message(
        name="progress-bar",
        props={"label": f"Processing: {state['task']}", "progress": 0},
    )

    # Simulate work in stages
    for pct in (25, 50, 75, 100):
        push_ui_message(
            name="progress-bar",
            props={"label": f"Processing: {state['task']}", "progress": pct},
            id=msg["id"],  # same ID: update the existing component
            merge=True,    # merge props instead of replacing
        )

    return {}

graph = (
    StateGraph(State)
    .add_node("task", run_task)
    .add_edge(START, "task")
    .add_edge("task", END)
    .compile()
)

for chunk in graph.stream({"task": "data export", "ui": []}, stream_mode="custom"):
    print(chunk)  # UIMessage dicts arrive as they're pushed
```

Example 2: Associating a UIMessage with an LLM message
```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.graph.ui import UIMessage, push_ui_message, ui_message_reducer
from langchain_core.messages import AIMessage

class State(TypedDict):
    messages: Annotated[list, add_messages]
    ui: Annotated[list[UIMessage], ui_message_reducer]

def agent_with_card(state: State) -> dict:
    response = AIMessage(content="Here is your weather forecast.")

    # Associate the UI card with the AI message by passing message=response
    push_ui_message(
        name="weather-card",
        props={"city": "London", "temp": "15°C", "condition": "cloudy"},
        message=response,  # links card to this message via message_id in metadata
    )

    return {"messages": [response]}

graph = (
    StateGraph(State)
    .add_node("agent", agent_with_card)
    .add_edge(START, "agent")
    .add_edge("agent", END)
    .compile()
)
```

Example 3: Removing a UI component
```python
from langgraph.graph.ui import delete_ui_message

def cleanup_node(state: dict) -> dict:
    # Remove a previously pushed component by its ID
    delete_ui_message("component-uuid-1234")
    return {}
```

6 · StreamTransformer + ProtocolEvent
Module: langgraph.stream._types
Import:

```python
from langgraph.stream._types import StreamTransformer, ProtocolEvent
```

StreamTransformer is the extension point for the v3 streaming API. Transformers register on a StreamMux and receive every ProtocolEvent (a uniform envelope around raw stream parts) before it reaches the caller. Use them to build custom projections, PII redaction, cost tracking, or moderation pipelines.
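The "every event passes through the transformers before reaching the caller" flow can be modelled in a few lines of plain Python. This is a conceptual sketch only: `run_pipeline`, `count_by_method`, and `drop_debug` are hypothetical names, the events are bare dicts with a `method` and `params` key in the spirit of the ProtocolEvent envelope documented in this section, and the rule "an event is delivered only if every transformer returns True" is an assumption about how suppression combines.

```python
from typing import Any, Callable

Event = dict[str, Any]
Processor = Callable[[Event], bool]


def run_pipeline(events: list[Event], transformers: list[Processor]) -> list[Event]:
    """Show every event to every transformer; deliver it only if none vetoes it."""
    delivered = []
    for event in events:
        # Build the full list first so each transformer sees each event,
        # even when an earlier one has already returned False.
        verdicts = [process(event) for process in transformers]
        if all(verdicts):
            delivered.append(event)
    return delivered


counts: dict[str, int] = {}


def count_by_method(event: Event) -> bool:
    counts[event["method"]] = counts.get(event["method"], 0) + 1
    return True  # observers never suppress


def drop_debug(event: Event) -> bool:
    return event["method"] != "debug"  # False suppresses the event


events = [
    {"type": "event", "method": "values", "params": {"namespace": [], "data": {"x": 1}}},
    {"type": "event", "method": "debug", "params": {"namespace": [], "data": "noise"}},
    {"type": "event", "method": "custom", "params": {"namespace": [], "data": "hi"}},
]

delivered = run_pipeline(events, [count_by_method, drop_debug])
delivered_methods = [e["method"] for e in delivered]
print(delivered_methods, counts)
```

Separating observers (always return True) from filters (may return False) keeps side projections such as counters accurate even for events the caller never sees.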
ProtocolEvent structure
```python
class ProtocolEvent(TypedDict):
    type: Literal["event"]
    event_id: NotRequired[str]
    seq: NotRequired[int]  # monotonic; use for ordering, not timestamp
    method: str            # StreamMode: "values", "messages", "custom", etc.
    params: _ProtocolEventParams

class _ProtocolEventParams(TypedDict):
    namespace: list[str]
    timestamp: int  # wall-clock ms, not monotonic
    data: Any
    interrupts: NotRequired[tuple[Any, ...]]
```

StreamTransformer interface

```python
class StreamTransformer(ABC):
    requires_async: ClassVar[bool] = False
    supports_sync: ClassVar[bool] = False
    required_stream_modes: ClassVar[tuple[str, ...]] = ()
    before_builtins: ClassVar[bool] = False

    def init(self) -> dict[str, Any]: ...                  # return the projection dict
    def process(self, event: ProtocolEvent) -> bool: ...   # return False to suppress
    async def aprocess(self, event: ProtocolEvent) -> bool: ...
    def finalize(self) -> None: ...                        # run ends normally
    async def afinalize(self) -> None: ...
    def fail(self, err: BaseException) -> None: ...
    async def afail(self, err: BaseException) -> None: ...
    def schedule(self, coro, *, on_error="log") -> asyncio.Task: ...
```

Example 1: Counting stream events by mode
```python
from collections import defaultdict
from typing import Any
from langgraph.stream._types import StreamTransformer, ProtocolEvent

class EventCounter(StreamTransformer):
    """Count how many events arrive per stream mode."""

    required_stream_modes = ()  # compatible with any modes

    def init(self) -> dict[str, Any]:
        self._counts: dict[str, int] = defaultdict(int)
        return {"event_counts": self._counts}

    def process(self, event: ProtocolEvent) -> bool:
        self._counts[event["method"]] += 1
        return True  # keep the event in the main log

    def finalize(self) -> None:
        print("Stream event counts:", dict(self._counts))
```

Example 2: Filtering sensitive keys from values events
```python
from typing import Any
from langgraph.stream._types import StreamTransformer, ProtocolEvent

SENSITIVE_KEYS = frozenset({"api_key", "password", "token"})

class RedactSensitiveFields(StreamTransformer):
    """Remove sensitive fields from 'values' stream events before they reach callers."""

    before_builtins = True  # must run before built-in transformers snapshot values
    required_stream_modes = ("values",)

    def init(self) -> dict[str, Any]:
        return {}

    def process(self, event: ProtocolEvent) -> bool:
        if event["method"] == "values":
            data = event["params"].get("data")
            if isinstance(data, dict):
                for key in SENSITIVE_KEYS:
                    data.pop(key, None)
        return True
```

Example 3: Async transformer with schedule()
```python
import asyncio
from typing import Any
from langgraph.stream._types import StreamTransformer, ProtocolEvent

class AsyncCostTracker(StreamTransformer):
    """Log token usage to an external system after each messages event."""

    requires_async = True
    required_stream_modes = ("messages",)

    def init(self) -> dict[str, Any]:
        self._token_total = 0
        return {}

    async def aprocess(self, event: ProtocolEvent) -> bool:
        if event["method"] == "messages":
            delta = event["params"].get("data", {})
            if isinstance(delta, dict):
                usage = delta.get("usage_metadata") or {}
                self._token_total += usage.get("total_tokens", 0)
        return True

    async def afinalize(self) -> None:
        # Fire-and-forget log to external system
        self.schedule(self._log_tokens(self._token_total))

    async def _log_tokens(self, total: int) -> None:
        await asyncio.sleep(0)  # replace with real async API call
        print(f"Total tokens used: {total}")
```

7 · RemoteGraph
Module: langgraph.pregel.remote
Import:

```python
from langgraph.pregel.remote import RemoteGraph
```

RemoteGraph wraps the LangGraph Server API: it exposes the same runnable interface (invoke, stream, ainvoke, astream) as a local CompiledStateGraph but delegates all execution to a remote deployment. You can use it as a standalone runnable or embed it as a subgraph node in a local graph.
Constructor signature
```python
RemoteGraph(
    assistant_id: str,  # graph_id or assistant name on the server
    /,
    *,
    url: str | None = None,
    api_key: str | None = None,
    headers: dict[str, str] | None = None,
    client: LangGraphClient | None = None,
    sync_client: SyncLangGraphClient | None = None,
    config: RunnableConfig | None = None,
    name: str | None = None,
    distributed_tracing: bool = False,
)
```

Example 1: Calling a remote graph synchronously
```python
from langgraph.pregel.remote import RemoteGraph

# Replace with your LangGraph Server deployment URL
remote = RemoteGraph(
    "my-agent",
    url="https://my-deployment.langsmith.app",
    api_key="lsv2_...",
)

# invoke / stream / ainvoke / astream all work exactly like a local graph
result = remote.invoke(
    {"messages": [{"role": "user", "content": "Hello!"}]},
    config={"configurable": {"thread_id": "thread-1"}},
)
print(result)
```

Example 2: RemoteGraph as a subgraph node
```python
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.pregel.remote import RemoteGraph
from langchain_core.messages import HumanMessage

class OrchestratorState(TypedDict):
    messages: Annotated[list, add_messages]
    delegated_result: str

# The remote specialised agent
specialist = RemoteGraph(
    "specialist-agent",
    url="https://my-deployment.langsmith.app",
)

def delegate(state: OrchestratorState) -> dict:
    result = specialist.invoke(
        {"messages": state["messages"]},
        config={"configurable": {"thread_id": "specialist-1"}},
    )
    last_msg = result["messages"][-1]
    # Assumption: remote results may arrive as plain JSON dicts rather than
    # message objects, so handle both shapes.
    content = last_msg["content"] if isinstance(last_msg, dict) else last_msg.content
    return {"delegated_result": content}

orchestrator = (
    StateGraph(OrchestratorState)
    .add_node("delegate", delegate)
    .add_edge(START, "delegate")
    .add_edge("delegate", END)
    .compile()
)
```

Example 3: Streaming from a remote graph
```python
import asyncio
from langgraph.pregel.remote import RemoteGraph

remote = RemoteGraph(
    "summariser",
    url="https://my-deployment.langsmith.app",
)

async def stream_remote() -> None:
    async for chunk in remote.astream(
        {"text": "Summarise the history of computing"},
        stream_mode="messages",
        config={"configurable": {"thread_id": "t-42"}},
    ):
        print(chunk)

asyncio.run(stream_remote())
```

Example 4: Passing thread state from parent to remote
```python
from langgraph.pregel.remote import RemoteGraph
from langgraph.types import Command

remote = RemoteGraph(
    "tool-executor",
    url="https://my-deployment.langsmith.app",
)

# Resume an interrupted run with `Command`
result = remote.invoke(
    Command(resume={"approved": True}),
    config={"configurable": {"thread_id": "thread-with-interrupt"}},
)
```

8 · NodeError + NodeTimeoutError + NodeCancelledError + GraphDrained
Module: langgraph.errors
Import:

```python
from langgraph.errors import (
    NodeError,
    NodeTimeoutError,
    NodeCancelledError,
    GraphDrained,
    GraphRecursionError,
    InvalidUpdateError,
    EmptyInputError,
    TaskNotFound,
)
```

Error hierarchy
Section titled “Error hierarchy”Exception├── GraphBubbleUp # internal signalling base│ ├── GraphDrained # cooperative SIGTERM drain completed│ └── GraphInterrupt # interrupt() — suppressed by root│ └── [deprecated] NodeInterrupt├── GraphRecursionError(RecursionError) # recursion_limit exceeded├── InvalidUpdateError # concurrent LastValue write / bad return value├── EmptyInputError # graph received empty input├── TaskNotFound # distributed-mode task lookup failure├── NodeCancelledError # user node raised asyncio.CancelledError└── NodeTimeoutError # idle_timeout or run_timeout exceededNodeError is a dataclass (not an Exception) injected into error handler functions:
@dataclass(frozen=True, slots=True)class NodeError: node: str # name of the failed node error: BaseException # the original exceptionExample 1: Per-node error handler with NodeError
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.errors import NodeError
from langgraph.types import Command


class State(TypedDict):
    value: int
    status: str


def risky_node(state: State) -> dict:
    if state["value"] < 0:
        raise ValueError(f"negative value: {state['value']}")
    return {"status": "ok"}


def handle_risky_error(state: State, error: NodeError) -> Command:
    # error.node  → "risky"
    # error.error → ValueError("negative value: -1")
    print(f"Node '{error.node}' failed: {error.error}")
    return Command(
        update={"status": f"recovered: {error.error}"},
        goto=END,
    )


graph = (
    StateGraph(State)
    .add_node("risky", risky_node, error_handler=handle_risky_error)
    .add_edge(START, "risky")
    .add_edge("risky", END)
    .compile()
)

result = graph.invoke({"value": -1, "status": ""})
print(result["status"])  # recovered: negative value: -1
```

Example 2: Catching NodeTimeoutError
TimeoutPolicy relies on asyncio cancellation, so it fires reliably only when the node is async (or uses cooperative await points). A sync node that blocks in time.sleep() cannot be preempted by the event loop.
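
The difference between a cooperative and a blocking wait can be seen with plain asyncio, independent of LangGraph. The sketch below is only an illustration of that general mechanism: it uses `asyncio.wait_for` as a stand-in for a timeout and does not reproduce how TimeoutPolicy is implemented. The names `cooperative`, `blocking`, and `run_with_timeout` are hypothetical.

```python
import asyncio
import time


async def cooperative() -> str:
    # asyncio.sleep yields to the event loop, so a timeout can cancel it.
    await asyncio.sleep(0.5)
    return "done"


async def blocking() -> str:
    # time.sleep never yields, so the event loop cannot interrupt it.
    time.sleep(0.2)
    return "done"


async def run_with_timeout(coro_fn, timeout: float) -> tuple[str, float]:
    """Run coro_fn under a timeout and report the result and elapsed time."""
    start = time.perf_counter()
    try:
        result = await asyncio.wait_for(coro_fn(), timeout=timeout)
    except asyncio.TimeoutError:
        result = "timed out"
    return result, time.perf_counter() - start


coop_result, coop_elapsed = asyncio.run(run_with_timeout(cooperative, 0.05))
block_result, block_elapsed = asyncio.run(run_with_timeout(blocking, 0.05))

print(coop_result, f"{coop_elapsed:.2f}s")
print(block_result, f"{block_elapsed:.2f}s")
```

The cooperative coroutine is cancelled shortly after the 0.05 s deadline, while the blocking one runs for its full 0.2 s and completes normally because the loop never gets a chance to deliver the cancellation.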
```python
import asyncio
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.errors import NodeError, NodeTimeoutError
from langgraph.types import Command, TimeoutPolicy


class State(TypedDict):
    result: str


async def slow_node(state: State) -> dict:
    await asyncio.sleep(10)  # cooperative: can be cancelled by the timeout
    return {"result": "done"}


def timeout_handler(state: State, error: NodeError) -> Command:
    if isinstance(error.error, NodeTimeoutError):
        nte: NodeTimeoutError = error.error
        print(f"Timeout! kind={nte.kind}, elapsed={nte.elapsed:.2f}s")
    return Command(update={"result": "timed out"}, goto=END)


graph = (
    StateGraph(State)
    .add_node(
        "slow",
        slow_node,
        error_handler=timeout_handler,
        timeout=TimeoutPolicy(run_timeout=0.1),
    )
    .add_edge(START, "slow")
    .add_edge("slow", END)
    .compile()
)

result = asyncio.run(graph.ainvoke({"result": ""}))
print(result["result"])  # timed out
```

Example 3: GraphDrained — graceful SIGTERM handling
```python
import signal
import threading
import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.errors import GraphDrained
from langgraph.runtime import Runtime
from langgraph.types import Command

_control_ref = None  # global reference so the signal handler can reach it


class State(TypedDict):
    step: int


def step_node(state: State, runtime: Runtime) -> Command:
    global _control_ref
    _control_ref = runtime.control

    if runtime.drain_requested:
        return Command(update={}, goto=END)
    return Command(update={"step": state["step"] + 1})


checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("step", step_node)
    .add_edge(START, "step")
    .add_conditional_edges("step", lambda s: "step" if s["step"] < 100 else END)
    .compile(checkpointer=checkpointer)
)


def _sigterm_handler(signum, frame):
    if _control_ref is not None:
        _control_ref.request_drain("SIGTERM received")


signal.signal(signal.SIGTERM, _sigterm_handler)


# Simulate a SIGTERM by calling request_drain() directly after a short delay
def _drain_after_delay():
    time.sleep(0.01)
    if _control_ref:
        _control_ref.request_drain("simulated SIGTERM")


threading.Thread(target=_drain_after_delay, daemon=True).start()

cfg = {"configurable": {"thread_id": "drain-demo"}}
try:
    graph.invoke({"step": 0}, cfg)
except GraphDrained as e:
    print(f"Graph drained gracefully: {e.reason}")
    # Checkpoint was saved; resume with the same thread_id later.
    # Passing None as input continues from the saved checkpoint.
    print("Resume by calling graph.invoke(None, cfg) again")
```

Example 4: GraphRecursionError — adjusting the recursion limit
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.errors import GraphRecursionError


class State(TypedDict):
    counter: int


def count_up(state: State) -> dict:
    return {"counter": state["counter"] + 1}


# Unconditional self-loop: will always hit the recursion limit
looping_graph = (
    StateGraph(State)
    .add_node("count", count_up)
    .add_edge(START, "count")
    .add_edge("count", "count")  # loops forever
    .compile()
)

try:
    looping_graph.invoke({"counter": 0}, config={"recursion_limit": 5})
except GraphRecursionError:
    print("Hit the recursion limit — increase it or add a termination condition")

# To raise the limit:
# looping_graph.invoke({"counter": 0}, config={"recursion_limit": 100})
```

9 · IsLastStep + RemainingSteps
Module: langgraph.managed.is_last_step
Import:

```python
from langgraph.managed.is_last_step import IsLastStep, RemainingSteps
```

These are type aliases backed by ManagedValue subclasses. Declare a state field with one of these types and LangGraph injects the current loop position. They are the idiomatic way to prevent a node from exceeding the graph’s recursion_limit.
Source (1.2.5)
```python
class IsLastStepManager(ManagedValue[bool]):
    @staticmethod
    def get(scratchpad: PregelScratchpad) -> bool:
        return scratchpad.step == scratchpad.stop - 1


IsLastStep = Annotated[bool, IsLastStepManager]


class RemainingStepsManager(ManagedValue[int]):
    @staticmethod
    def get(scratchpad: PregelScratchpad) -> int:
        return scratchpad.stop - scratchpad.step


RemainingSteps = Annotated[int, RemainingStepsManager]
```

Both are Annotated aliases: annotating a state field with them causes LangGraph to inject the value rather than reading it from the state dict.
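
To make the arithmetic in the two `get` methods concrete, here is a small stdlib-only sketch. `FakeScratchpad` is a hypothetical stand-in that carries only the `step` and `stop` fields read above, and the value `stop = 4` is an arbitrary assumption; how LangGraph derives `stop` from recursion_limit is not modelled here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeScratchpad:
    """Hypothetical stand-in exposing only the two fields the managers read."""
    step: int
    stop: int


def is_last_step(sp: FakeScratchpad) -> bool:
    # Same expression as IsLastStepManager.get
    return sp.step == sp.stop - 1


def remaining_steps(sp: FakeScratchpad) -> int:
    # Same expression as RemainingStepsManager.get
    return sp.stop - sp.step


stop = 4  # assumed value, for illustration only
trace = [
    (step, remaining_steps(FakeScratchpad(step, stop)), is_last_step(FakeScratchpad(step, stop)))
    for step in range(stop)
]

for step, remaining, last in trace:
    print(f"step={step} remaining_steps={remaining} is_last_step={last}")
```

Under these definitions the two values are linked: is_last_step is True exactly when remaining_steps equals 1.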
Example 1: IsLastStep — forced termination on final step
```python
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import IsLastStep
from langchain_core.messages import HumanMessage, AIMessage


class AgentState(TypedDict):
    messages: Annotated[list, lambda a, b: a + b]
    is_last_step: IsLastStep  # injected by the framework


def agent_node(state: AgentState) -> dict:
    if state["is_last_step"]:
        # Forced exit before hitting GraphRecursionError
        return {
            "messages": [AIMessage(content="I've run out of steps. Final answer: unknown.")]
        }

    # Normal agent logic
    last_msg = state["messages"][-1].content
    return {
        "messages": [AIMessage(content=f"Thinking about: {last_msg}")]
    }


def should_continue(state: AgentState) -> str:
    last = state["messages"][-1]
    if isinstance(last, AIMessage) and "Final answer" in last.content:
        return END
    return "agent"


graph = (
    StateGraph(AgentState)
    .add_node("agent", agent_node)
    .add_edge(START, "agent")
    .add_conditional_edges("agent", should_continue)
    .compile()
)

result = graph.invoke(
    {"messages": [HumanMessage(content="Keep going")]},
    config={"recursion_limit": 4},
)
print(result["messages"][-1].content)
# I've run out of steps. Final answer: unknown.
```

Example 2: RemainingSteps — proportional work
```python
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps
from langgraph.types import Command


class PlanState(TypedDict):
    plan: list[str]
    done: list[str]
    remaining_steps: RemainingSteps  # injected


def execute_node(state: PlanState) -> Command:
    steps_left = state["remaining_steps"]

    if not state["plan"] or steps_left <= 1:
        return Command(update={}, goto=END)

    # Take only what fits in the remaining budget
    safe_batch = state["plan"][: max(1, steps_left - 1)]
    return Command(
        update={
            "plan": state["plan"][len(safe_batch):],
            "done": state["done"] + safe_batch,
        }
    )


graph = (
    StateGraph(PlanState)
    .add_node("execute", execute_node)
    .add_edge(START, "execute")
    .add_conditional_edges(
        "execute",
        lambda s: END if not s["plan"] else "execute",
    )
    .compile()
)

result = graph.invoke(
    {"plan": ["a", "b", "c", "d", "e"], "done": []},
    config={"recursion_limit": 4},
)
print(result["done"])  # at most recursion_limit-1 items
```

Example 3: Combining IsLastStep with RetryPolicy
```python
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import IsLastStep
from langgraph.types import Command, RetryPolicy


class State(TypedDict):
    counter: int
    is_last_step: IsLastStep


def loop_or_stop(state: State) -> dict | Command:
    if state["is_last_step"]:
        print(f"Stopping at counter={state['counter']} (last step)")
        # Must use Command(goto=END): returning {} still routes through
        # keep_going, which would loop again since counter < 100.
        return Command(goto=END)
    return {"counter": state["counter"] + 1}


def keep_going(state: State) -> str:
    return "node" if state["counter"] < 100 else END


graph = (
    StateGraph(State)
    .add_node("node", loop_or_stop, retry=RetryPolicy(max_attempts=2))
    .add_edge(START, "node")
    .add_conditional_edges("node", keep_going)
    .compile()
)

result = graph.invoke({"counter": 0}, config={"recursion_limit": 5})
print(result["counter"])  # stops before hitting GraphRecursionError
```

10 · HumanResponse
Module: langgraph.prebuilt.interrupt
Import:

```python
from langgraph.prebuilt.interrupt import HumanResponse
```

HumanResponse is the structured reply that flows back into a graph when a human-in-the-loop interrupt is resumed. It lives alongside interrupt() (from langgraph.types), which is the modern replacement for the deprecated NodeInterrupt. The response type field tells your node exactly what the operator chose to do.
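
As a rough illustration of branching on the type field, the sketch below dispatches on the four response types (accept, ignore, response, edit). So that it runs without LangGraph installed, it defines local stand-in TypedDicts that mirror the HumanResponse and ActionRequest shapes used in this section; the `describe` function and its return strings are hypothetical and not part of the library.

```python
from typing import Literal, TypedDict, Union


class ActionRequest(TypedDict):
    """Local stand-in: an action name plus its keyword arguments."""
    action: str
    args: dict


class HumanResponse(TypedDict):
    """Local stand-in with the same two keys as the real TypedDict."""
    type: Literal["accept", "ignore", "response", "edit"]
    args: Union[None, str, ActionRequest]


def describe(response: HumanResponse) -> str:
    """Turn an operator's response into a one-line description of the next step."""
    kind = response["type"]
    args = response["args"]
    if kind == "accept":
        return "run the proposed action unchanged"
    if kind == "ignore":
        return "skip the proposed action"
    if kind == "response" and isinstance(args, str):
        return f"send feedback to the agent: {args}"
    if kind == "edit" and isinstance(args, dict):
        return f"run {args['action']} with {args['args']}"
    raise ValueError(f"unexpected response: {response!r}")


print(describe({"type": "accept", "args": None}))
print(describe({"type": "response", "args": "use a dry run first"}))
print(describe({"type": "edit", "args": {"action": "delete_user", "args": {"user_id": 99}}}))
```

Checking the shape of args alongside type keeps a malformed resume payload from being silently treated as an approval.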
HumanResponse TypedDict
```python
class HumanResponse(TypedDict):
    type: Literal["accept", "ignore", "response", "edit"]
    args: None | str | ActionRequest
```

| type | args | Meaning |
|---|---|---|
| "accept" | None | User approved as-is |
| "ignore" | None | User skipped this step |
| "response" | str | User provided free-text feedback |
| "edit" | ActionRequest | User modified the payload |
Example 1: Tool-approval flow
```python
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import interrupt, Command
from langgraph.prebuilt.interrupt import HumanResponse
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.messages import HumanMessage, AIMessage


class State(TypedDict):
    messages: Annotated[list, add_messages]
    approved: bool


def agent(state: State) -> dict:
    return {
        "messages": [AIMessage(content="I want to call delete_user(user_id=42)")]
    }


def approval_gate(state: State) -> Command:
    last = state["messages"][-1]
    response: HumanResponse = interrupt(
        {
            "question": "Approve this action?",
            "action": last.content,
        }
    )

    if response["type"] == "accept":
        return Command(update={"approved": True}, goto="execute")
    elif response["type"] == "ignore":
        return Command(update={"approved": False}, goto=END)
    elif response["type"] == "response":
        # User gave feedback: add it to messages and loop back to agent
        feedback = response["args"]
        return Command(
            update={
                "messages": [HumanMessage(content=f"Feedback: {feedback}")],
                "approved": False,
            },
            goto="agent",
        )
    else:  # "edit"
        edited = response["args"]  # ActionRequest with updated args
        return Command(update={"approved": True}, goto="execute")


def execute(state: State) -> dict:
    return {"messages": [AIMessage(content="Action executed.")]}


graph = (
    StateGraph(State)
    .add_node("agent", agent)
    .add_node("approval_gate", approval_gate)
    .add_node("execute", execute)
    .add_edge(START, "agent")
    .add_edge("agent", "approval_gate")
    .add_edge("execute", END)
    .compile(checkpointer=InMemorySaver())
)

# First invocation: graph pauses at interrupt
thread_cfg = {"configurable": {"thread_id": "approval-1"}}
graph.invoke({"messages": [], "approved": False}, thread_cfg)

# Resume with "accept"
result = graph.invoke(
    Command(resume=HumanResponse(type="accept", args=None)),
    thread_cfg,
)
print(result["approved"])  # True
```

Example 2: "edit" response — user modifies the action
```python
from langgraph.prebuilt.interrupt import HumanResponse, ActionRequest
from langgraph.types import Command

# When the user edits the payload, resume with an ActionRequest
edited_request = ActionRequest(
    action="delete_user",
    args={"user_id": 99},  # changed from 42 to 99
)

# Reuses `graph` from Example 1. The "approval-2" thread must already be
# paused at the approval_gate interrupt before this resume call.
result = graph.invoke(
    Command(
        resume=HumanResponse(
            type="edit",
            args=edited_request,
        )
    ),
    {"configurable": {"thread_id": "approval-2"}},
)
```

Example 3: Multi-interrupt loop — review every output
```python
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import interrupt, Command
from langgraph.prebuilt.interrupt import HumanResponse
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.messages import AIMessage, HumanMessage


class State(TypedDict):
    messages: Annotated[list, add_messages]
    accepted_count: int


def generate(state: State) -> dict:
    draft = f"Draft #{len(state['messages']) + 1}: some AI output"
    return {"messages": [AIMessage(content=draft)]}


def review(state: State) -> Command:
    last = state["messages"][-1]
    resp: HumanResponse = interrupt({"draft": last.content})

    if resp["type"] == "accept":
        if state["accepted_count"] + 1 >= 3:
            return Command(update={"accepted_count": state["accepted_count"] + 1}, goto=END)
        return Command(update={"accepted_count": state["accepted_count"] + 1}, goto="generate")
    elif resp["type"] == "response":
        return Command(
            update={"messages": [HumanMessage(content=resp["args"])]},
            goto="generate",
        )
    return Command(goto="generate")


graph = (
    StateGraph(State)
    .add_node("generate", generate)
    .add_node("review", review)
    .add_edge(START, "generate")
    .add_edge("generate", "review")
    .compile(checkpointer=InMemorySaver())
)
```

Summary
| Class | Module | Use case |
|---|---|---|
| Runtime | langgraph.runtime | Unified injection of context, store, heartbeat, drain control |
| ExecutionInfo | langgraph.runtime | Per-attempt metadata (thread_id, checkpoint_id, attempt number) |
| RunControl | langgraph.runtime | Cooperative SIGTERM drain signalling |
| BaseStore | langgraph.store.base | Cross-thread, cross-run persistent key-value memory |
| Item / SearchItem | langgraph.store.base | Retrieved store items (with optional similarity score) |
| GetOp / PutOp / SearchOp / ListNamespacesOp | langgraph.store.base | Batch store operations for high-throughput scenarios |
| IndexConfig / TTLConfig | langgraph.store.base | Store-level vector search and expiry configuration |
| UIMessage + push_ui_message | langgraph.graph.ui | Stream real-time UI component updates from nodes |
| StreamTransformer | langgraph.stream._types | Custom stream projection, redaction, or side-effects |
| RemoteGraph | langgraph.pregel.remote | Embed a remote LangGraph Server deployment as a subgraph |
| NodeError / NodeTimeoutError / GraphDrained | langgraph.errors | Per-node error handlers and graceful drain |
| IsLastStep / RemainingSteps | langgraph.managed.is_last_step | Prevent GraphRecursionError in looping agents |
| HumanResponse | langgraph.prebuilt.interrupt | Structured accept/ignore/edit/response from human operators |