Skip to content

Streaming modes — API reference

Verified against langgraph==1.2.2 (modules: langgraph.types, langgraph.pregel.main, langgraph.config).

Every compiled graph (both StateGraph and @entrypoint workflows) exposes:

graph.stream(input, config=None, *, stream_mode=..., version="v1" | "v2", ...)
graph.astream(input, config=None, *, stream_mode=..., version="v1" | "v2", ...)
graph.invoke(input, config=None, *, version="v1" | "v2", ...)
graph.ainvoke(input, config=None, *, version="v1" | "v2", ...)

stream_mode controls what is yielded. version controls how it is typed. The v2 API yields structured StreamPart dicts from langgraph.types; v1 yields raw values.

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
class S(TypedDict):
x: int
def step(state: S) -> dict:
return {"x": state["x"] + 1}
graph = (
StateGraph(S)
.add_node("step", step)
.add_edge(START, "step")
.add_edge("step", END)
.compile(checkpointer=InMemorySaver())
)
cfg = {"configurable": {"thread_id": "t"}}
for chunk in graph.stream({"x": 0}, cfg, stream_mode="updates"):
print(chunk)
# {'step': {'x': 1}}
for part in graph.stream({"x": 0}, cfg, stream_mode="updates", version="v2"):
# part is a typed StreamPart dict
print(part["type"], part["ns"], part["data"])
# updates () {'step': {'x': 1}}

StreamMode is a Literal union exported from langgraph.types:

StreamMode = Literal["values", "updates", "checkpoints", "tasks", "debug", "messages", "custom", "tools"]
ModeYieldsTypical use
"values"Full state after each step.Show the current state as the graph runs.
"updates"{node_name: state_update} per node per step. Interrupts come through as {"__interrupt__": (Interrupt,...)}.Activity feed; detecting __interrupt__.
"messages"(message, metadata) tuples for every LLM token emitted inside any node.Token-by-token chat UIs.
"custom"Whatever you passed to StreamWriter / get_stream_writer().Domain-specific progress events.
"checkpoints"Checkpoint payloads (config, values, metadata, next, parent_config, tasks).Audit logs, progress DBs.
"tasks"Task start / result events (id, name, input, triggers / id, name, error, interrupts, result).Observability dashboards.
"debug"All checkpoint + task events wrapped in a DebugPayload with step number and timestamp.Replacing prints while developing.
"tools"Per-tool-call start/delta/finish events.Real-time streaming of tool execution output via ToolCallTransformer.

You can also pass a list of modes. With version="v1" the iterator yields (mode, data) tuples; with version="v2" every chunk is a StreamPart and you discriminate by chunk["type"]:

# v1 list: yields (mode, data) tuples
for mode, data in graph.stream(inp, cfg, stream_mode=["updates", "messages"]):
if mode == "updates":
...
elif mode == "messages":
token, meta = data
# v2 list: yields StreamPart dicts — use chunk["type"] to discriminate
for chunk in graph.stream(inp, cfg, stream_mode=["updates", "custom"], version="v2"):
if chunk["type"] == "updates":
for node_name, state in chunk["data"].items():
print(f"Node `{node_name}` updated: {state}")
elif chunk["type"] == "custom":
print(f"Custom event: {chunk['data']}")

Set subgraphs=True to see events from inside child graphs. The leading element of each yielded tuple becomes the namespace path:

for ns, data in graph.stream(inp, cfg, stream_mode="updates", subgraphs=True):
# ns = ('parent_node:<task_id>', 'child_node:<task_id>')
...
for ns, mode, data in graph.stream(inp, cfg, stream_mode=["updates", "messages"], subgraphs=True):
...

With version="v2", ns is already a tuple on every StreamPart regardless of subgraphs=.

graph.stream(input, cfg, stream_mode="updates") # v1 (default)
graph.stream(input, cfg, stream_mode="updates", version="v2") # v2
  • v1: yields raw values. Simple to consume, but you often have to sniff types (isinstance(chunk, tuple), "__interrupt__" in chunk, etc.).
  • v2: yields StreamPart TypedDicts with type, ns, data fields. Interrupts are pulled out into ValuesStreamPart.interrupts for stream_mode="values".

The StreamPart union (from langgraph.types) is the sum of all per-mode TypedDicts:

StreamPart = (
ValuesStreamPart
| UpdatesStreamPart
| MessagesStreamPart
| CustomStreamPart
| CheckpointStreamPart
| TasksStreamPart
| DebugStreamPart
)

Each StreamPart has three guaranteed fields:

FieldTypeMeaning
typestr (mode name)Discriminator — narrow to the concrete TypedDict.
nstuple[str, ...]Namespace path (empty tuple for root graph events).
datavaries per typeThe actual payload — see per-mode sections below.
class ValuesStreamPart(TypedDict):
type: Literal["values"]
ns: tuple[str, ...]
data: OutputT # full state (dict / Pydantic / dataclass)
interrupts: tuple[Interrupt, ...]
class UpdatesStreamPart(TypedDict):
type: Literal["updates"]
ns: tuple[str, ...]
data: dict[str, Any] # {node_name: node_output}
class MessagesStreamPart(TypedDict):
type: Literal["messages"]
ns: tuple[str, ...]
data: tuple[AnyMessage, dict[str, Any]] # (message_chunk, metadata)
class CustomStreamPart(TypedDict):
type: Literal["custom"]
ns: tuple[str, ...]
data: Any # whatever StreamWriter emitted
class CheckpointStreamPart(TypedDict):
type: Literal["checkpoints"]
ns: tuple[str, ...]
data: CheckpointPayload # see "checkpoints" section
class TasksStreamPart(TypedDict):
type: Literal["tasks"]
ns: tuple[str, ...]
data: TaskPayload | TaskResultPayload
class DebugStreamPart(TypedDict):
type: Literal["debug"]
ns: tuple[str, ...]
data: DebugPayload # _DebugCheckpointPayload | _DebugTaskPayload | _DebugTaskResultPayload
async for part in graph.astream(inp, cfg, version="v2"):
match part["type"]:
case "values":
state = part["data"]
pending = part["interrupts"] # tuple[Interrupt, ...]
case "updates":
updates: dict[str, Any] = part["data"] # {node_name: output}
case "messages":
msg, meta = part["data"] # (BaseMessage, metadata dict)
case "custom":
payload = part["data"] # whatever StreamWriter wrote
case "checkpoints":
cp = part["data"] # CheckpointPayload
case "tasks":
ev = part["data"] # TaskPayload or TaskResultPayload
case "debug":
dbg = part["data"] # DebugPayload — discriminate on dbg["type"]

With v2, invoke returns a typed container instead of a dict:

from langgraph.types import GraphOutput
result: GraphOutput = graph.invoke({"x": 0}, cfg, version="v2")
print(result.value) # final state — dict / Pydantic / dataclass per state_schema
print(result.interrupts) # tuple[Interrupt, ...]

For back-compat, result["key"] still works on a GraphOutput but emits DeprecationWarning; prefer result.value["key"].

Emits the entire state after each step. For the functional API, emits exactly once at the end.

for s in graph.stream(inp, cfg, stream_mode="values"):
# v1: s is the state dict (or your state_schema instance)
print(s)

v2 shape (ValuesStreamPart):

{"type": "values", "ns": (), "data": <state>, "interrupts": (Interrupt(...),)}

Full v2 example:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
class State(TypedDict):
count: int
message: str
def increment(state: State) -> dict:
return {"count": state["count"] + 1, "message": f"step {state['count'] + 1}"}
graph = (
StateGraph(State)
.add_node("increment", increment)
.add_edge(START, "increment")
.add_edge("increment", END)
.compile(checkpointer=InMemorySaver())
)
cfg = {"configurable": {"thread_id": "demo"}}
for part in graph.stream({"count": 0, "message": ""}, cfg, stream_mode="values", version="v2"):
print(part["type"], part["ns"])
print("state:", part["data"])
if part["interrupts"]:
print("pending interrupts:", part["interrupts"])
# values ()
# state: {'count': 1, 'message': 'step 1'}

Emits one event per node per step, keyed by node name:

{"planner": {"messages": [...], "next": "writer"}}

Interrupts show up as a sibling key "__interrupt__" whose value is a tuple of Interrupt dataclasses. Parallel nodes in the same super-step produce separate events.

Full v2 example:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
x: int
y: int
def node_a(state: State) -> dict:
return {"x": state["x"] * 2}
def node_b(state: State) -> dict:
return {"y": state["y"] + 10}
graph = (
StateGraph(State)
.add_node("node_a", node_a)
.add_node("node_b", node_b)
.add_edge(START, "node_a")
.add_edge("node_a", "node_b")
.add_edge("node_b", END)
.compile()
)
for part in graph.stream({"x": 3, "y": 0}, stream_mode="updates", version="v2"):
# part["type"] == "updates"
# part["data"] == {"node_a": {"x": 6}} then {"node_b": {"y": 10}}
for node_name, node_output in part["data"].items():
print(f"Node `{node_name}` returned: {node_output}")
# Detect interrupts in v1 updates mode
for chunk in graph.stream(inp, cfg, stream_mode="updates"):
if "__interrupt__" in chunk:
for interrupt in chunk["__interrupt__"]:
print("Interrupt:", interrupt.value)

Yields tuples of (message, metadata) for every LLM invocation inside any node.

  • message — usually an AIMessageChunk; see langchain_core.messages. Concatenate .content for the full text.
  • metadata — dict with the following keys:
Metadata keyTypeDescription
langgraph_stepintExecution step number within the current run.
langgraph_nodestrName of the node that produced this token.
langgraph_triggerslist[str]Channel writes that caused this node to execute.
langgraph_pathtuple[str, ...]Full namespace path, including subgraph nesting.
langgraph_checkpoint_nsstrCheckpoint namespace string for this execution context.
ls_model_namestrLangSmith model name tag (if set on the LLM).
ls_providerstrLangSmith provider tag (if set on the LLM).

Wire an LLM normally and let LangGraph’s callbacks do the work:

from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
def draft(state: dict) -> dict:
return {"text": llm.invoke(state["prompt"]).content}
for msg, meta in graph.stream({"prompt": "hi"}, cfg, stream_mode="messages"):
if meta["langgraph_node"] == "draft":
print(msg.content, end="", flush=True)

Full v2 example filtering by node name:

from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
class State(TypedDict):
topic: str
joke: str
poem: str
def write_joke(state: State):
joke_response = model.invoke(
[{"role": "user", "content": f"Write a joke about {state['topic']}"}]
)
return {"joke": joke_response.content}
def write_poem(state: State):
poem_response = model.invoke(
[{"role": "user", "content": f"Write a short poem about {state['topic']}"}]
)
return {"poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(write_joke)
.add_node(write_poem)
# run both concurrently from START
.add_edge(START, "write_joke")
.add_edge(START, "write_poem")
.compile()
)
# v2: use chunk["type"] to identify messages chunks, then filter by node
for chunk in graph.stream(
{"topic": "cats"},
stream_mode="messages",
version="v2",
):
if chunk["type"] == "messages":
msg, metadata = chunk["data"]
# Only print tokens from the poem node
if msg.content and metadata["langgraph_node"] == "write_poem":
print(msg.content, end="|", flush=True)

Accessing all available metadata fields:

for chunk in graph.stream(inputs, stream_mode="messages", version="v2"):
if chunk["type"] == "messages":
msg, meta = chunk["data"]
print(f"step={meta['langgraph_step']}")
print(f"node={meta['langgraph_node']}")
print(f"triggers={meta['langgraph_triggers']}")
print(f"path={meta['langgraph_path']}")
print(f"checkpoint_ns={meta['langgraph_checkpoint_ns']}")

Write arbitrary values from inside a node using get_stream_writer() (preferred, context-var based) or by declaring stream_writer: StreamWriter as a node parameter (injection-based).

# from langgraph.types
StreamWriter = Callable[[Any], None]

LangGraph always injects a StreamWriter into nodes that declare it as a parameter, but the callable is a no-op unless stream_mode="custom" (or a list containing "custom") is active. This means you can safely leave stream_writer calls in production code without performance impact when not streaming.

Section titled “Method 1: get_stream_writer() (recommended)”

get_stream_writer() retrieves the StreamWriter via a context variable — no parameter declaration needed. Works in both StateGraph nodes and @task decorators. Requires Python 3.11+ in async contexts.

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer
class State(TypedDict):
topic: str
joke: str
def generate_joke(state: State):
writer = get_stream_writer()
writer({"status": "thinking of a joke..."})
result = f"Why did the {state['topic']} go to school? To get a sundae education!"
writer({"status": "done"})
return {"joke": result}
graph = (
StateGraph(State)
.add_node(generate_joke)
.add_edge(START, "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode=["updates", "custom"],
version="v2",
):
if chunk["type"] == "updates":
for node_name, state in chunk["data"].items():
print(f"Node {node_name} updated: {state}")
elif chunk["type"] == "custom":
print(f"Status: {chunk['data']['status']}")

Output:

Status: thinking of a joke...
Status: done
Node generate_joke updated: {'joke': 'Why did the ice cream go to school? To get a sundae education!'}

Method 2: StreamWriter as a node parameter (injection)

Section titled “Method 2: StreamWriter as a node parameter (injection)”
from langgraph.types import StreamWriter
def my_node(state: State, stream_writer: StreamWriter) -> dict:
stream_writer({"progress": "Step 1 complete"})
# ... do work ...
stream_writer({"progress": "Step 2 complete"})
return {"result": "done"}
from langchain.tools import tool
from langgraph.config import get_stream_writer
@tool
def query_database(query: str) -> str:
"""Query the database."""
writer = get_stream_writer()
writer({"data": "Retrieved 0/100 records", "type": "progress"})
# ... perform query ...
writer({"data": "Retrieved 100/100 records", "type": "progress"})
return "some-answer"
# Consume the custom events
for chunk in graph.stream(inputs, stream_mode="custom", version="v2"):
if chunk["type"] == "custom":
print(f"{chunk['data']['type']}: {chunk['data']['data']}")
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
writer = get_stream_writer()
writer("Started processing")
result = inputs["x"] * 2
writer(f"Result is {result}")
return result
config = {"configurable": {"thread_id": "abc"}}
# v1 list form: yields (mode, chunk) tuples
for mode, chunk in main.stream({"x": 5}, stream_mode=["custom", "updates"], config=config):
print(f"{mode}: {chunk}")

Output:

custom: Started processing
custom: Result is 10
updates: {'main': 10}

Outside stream_mode="custom", calls to the StreamWriter are no-ops — it’s safe to leave them in production code.

Emits a CheckpointPayload each time a checkpoint is created:

# CheckpointPayload shape
{
"config": {...}, # RunnableConfig pointer to this checkpoint
"metadata": {
"source": "loop", # "input" | "loop" | "update"
"step": 1,
"parents": {},
"run_id": "...",
"writes": {...}, # channel writes at this step
},
"values": {<state>}, # full state at this checkpoint
"next": ["writer"], # nodes scheduled to run next
"parent_config": {...}, # pointer to parent checkpoint (or None)
"tasks": [
{
"id": "...",
"name": "planner",
"result": {...},
"state": None,
"interrupts": [],
}
],
}

Requires a checkpointer; otherwise the mode yields nothing.

Full v2 example:

from langgraph.checkpoint.memory import InMemorySaver
graph = (
StateGraph(State)
# ... nodes / edges ...
.compile(checkpointer=InMemorySaver())
)
cfg = {"configurable": {"thread_id": "audit-run"}}
async for part in graph.astream(inp, cfg, stream_mode="checkpoints", version="v2"):
# part["type"] == "checkpoints"
cp = part["data"] # CheckpointPayload
print(f"step={cp['metadata']['step']}")
print(f"next={cp['next']}")
print(f"state={cp['values']}")

Two payload shapes interleaved on one stream — a TaskPayload when a task starts and a TaskResultPayload when it finishes:

# TaskPayload — emitted when a node begins executing
{
"id": "abc123", # unique task identifier
"name": "planner", # node name
"input": {...}, # task input data
"triggers": ["branch:to:planner"], # channel writes that triggered this task
}
# TaskResultPayload — emitted when a node finishes
{
"id": "abc123", # same id as the corresponding TaskPayload
"name": "planner", # node name
"error": None, # error message string, or None on success
"interrupts": [], # list of interrupt dicts, if any
"result": {"x": 1}, # channel writes returned by this node
}

Full v2 example:

for part in graph.stream(inp, cfg, stream_mode="tasks", version="v2"):
ev = part["data"] # TaskPayload | TaskResultPayload
if "triggers" in ev:
# it's a TaskPayload (task start)
print(f"[START] {ev['name']} (id={ev['id']}) triggered by {ev['triggers']}")
else:
# it's a TaskResultPayload (task end)
if ev["error"]:
print(f"[ERROR] {ev['name']}: {ev['error']}")
else:
print(f"[DONE ] {ev['name']}: result={ev['result']}")

Pair with "messages" to annotate token events with the owning task:

for chunk in graph.stream(inp, cfg, stream_mode=["tasks", "messages"], version="v2"):
if chunk["type"] == "tasks" and "triggers" not in chunk["data"]:
# TaskResultPayload
print(f"task finished: {chunk['data']['name']}")
elif chunk["type"] == "messages":
msg, meta = chunk["data"]
print(f" token from {meta['langgraph_node']}: {msg.content!r}")

Emits DebugPayload wrappers — a discriminated union of three event types, all sharing a step (int) and timestamp (ISO 8601 str):

# _DebugCheckpointPayload
{"type": "checkpoint", "step": 1, "timestamp": "2026-05-24T12:00:00Z", "payload": <CheckpointPayload>}
# _DebugTaskPayload
{"type": "task", "step": 1, "timestamp": "2026-05-24T12:00:01Z", "payload": <TaskPayload>}
# _DebugTaskResultPayload
{"type": "task_result", "step": 1, "timestamp": "2026-05-24T12:00:02Z", "payload": <TaskResultPayload>}

Full v2 example:

for part in graph.stream(inp, cfg, stream_mode="debug", version="v2"):
dbg = part["data"] # DebugPayload
match dbg["type"]:
case "checkpoint":
print(f"[step {dbg['step']}] checkpoint — next: {dbg['payload']['next']}")
case "task":
print(f"[step {dbg['step']}] task start — {dbg['payload']['name']}")
case "task_result":
r = dbg["payload"]
status = "ERROR" if r["error"] else "OK"
print(f"[step {dbg['step']}] task result — {r['name']} [{status}]")

Useful for replacing print() during development — controlled by a flag:

DEBUG = True
for part in graph.stream(
inp, cfg,
stream_mode="debug" if DEBUG else "updates",
version="v2",
):
...

stream_mode="tools" — per-tool-call streaming

Section titled “stream_mode="tools" — per-tool-call streaming”

The "tools" mode emits low-level protocol events as each tool call executes inside a ToolNode. Three event types flow through:

Event typeWhen it firesData fields
tool-startedA tool call beginstool_call_id, tool_name, input
tool-output-deltaA tool emits a partial output deltatool_call_id, delta
tool-finishedA tool call completestool_call_id, output
tool-errorA tool call raisestool_call_id, message
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START
from langgraph.graph.message import MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers."""
return a * b
tools = [multiply]
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
def agent(state: MessagesState) -> dict:
return {"messages": [llm.invoke(state["messages"])]}
builder = StateGraph(MessagesState)
builder.add_node("agent", agent)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")
graph = builder.compile()
for event in graph.stream(
{"messages": [("user", "What is 6 times 7?")]},
stream_mode="tools",
):
print(event)
# Example output:
# {'method': 'tools', 'params': {'namespace': (), 'data': {'event': 'tool-started', 'tool_call_id': 'tc_01', 'tool_name': 'multiply', 'input': {'a': 6, 'b': 7}}}}
# {'method': 'tools', 'params': {'namespace': (), 'data': {'event': 'tool-finished', 'tool_call_id': 'tc_01', 'output': 42}}}

Structured streaming with ToolCallTransformer

Section titled “Structured streaming with ToolCallTransformer”

ToolCallTransformer (from langgraph.prebuilt._tool_call_transformer) is a stream transformer that projects raw tools events into ToolCallStream handles — one per tool invocation. Each handle exposes the tool’s tool_call_id, tool_name, input, output_deltas (a channel of incremental results), and output (terminal).

Add it at compile time:

from langgraph.prebuilt._tool_call_transformer import ToolCallTransformer
graph = builder.compile(transformers=[ToolCallTransformer])

Then iterate run.tool_calls to get ToolCallStream objects as tools start:

# Sync iteration
with graph.stream(
{"messages": [("user", "What is 6 times 7?")]},
stream_mode="tools",
) as run:
for tool_call_stream in run.tool_calls:
print(f"Tool started: {tool_call_stream.tool_name}, id={tool_call_stream.tool_call_id}")
print(f"Input: {tool_call_stream.input}")
# Iterate output deltas in real time
for delta in tool_call_stream.output_deltas:
print(f" delta: {delta}")
print(f"Final output: {tool_call_stream.output}")
FieldTypeDescription
tool_call_idstrThe ID from the AI message’s tool call.
tool_namestrName of the tool being executed.
inputdict | NoneThe tool’s input arguments (from the tool-started event).
output_deltasStreamChannel[Any]Channel of incremental output chunks. Iterate sync or async.
outputAnyFinal output from the tool-finished event. None until complete.
errorstr | NoneError message from tool-error. None on success.
completedboolTrue once a terminal event has been seen.
import asyncio
from langgraph.prebuilt._tool_call_transformer import ToolCallTransformer
graph = builder.compile(transformers=[ToolCallTransformer])
async def stream_tools():
async with graph.astream(
{"messages": [("user", "What is 6 times 7?")]},
stream_mode="tools",
) as run:
async for tool_call_stream in run.tool_calls:
print(f"Tool: {tool_call_stream.tool_name} ({tool_call_stream.tool_call_id})")
async for delta in tool_call_stream.output_deltas:
print(f" delta: {delta}")
if tool_call_stream.error:
print(f" ERROR: {tool_call_stream.error}")
else:
print(f" output: {tool_call_stream.output}")
asyncio.run(stream_tools())
for mode, data in graph.stream(
{"messages": [("user", "Multiply 6 by 7")]},
stream_mode=["updates", "tools"],
):
if mode == "updates":
print("Node update:", data)
elif mode == "tools":
# Raw tools event dict
event_data = data["params"]["data"]
if event_data.get("event") == "tool-finished":
print("Tool finished, output:", event_data.get("output"))

On invoke / stream, set durability="sync" | "async" | "exit" to trade checkpoint-write timing against speed:

graph.stream(inp, cfg, stream_mode="updates", durability="sync")

With durability="exit" you will not see "checkpoints" events per step — only at the very end.

Same signatures, awaitable. v2 typing works the same:

async for part in graph.astream({"x": 0}, cfg, version="v2"):
if part["type"] == "messages":
msg, meta = part["data"]
print(msg.content, end="", flush=True)
async for msg, meta in graph.astream(inp, cfg, stream_mode="messages"):
if msg.content and meta["langgraph_node"] == "writer":
print(msg.content, end="", flush=True)
import json
async for mode, data in graph.astream(inp, cfg, stream_mode=["updates", "messages"]):
if mode == "updates" and "__interrupt__" in data:
yield f"event: interrupt\ndata: {json.dumps([i.value for i in data['__interrupt__']])}\n\n"
elif mode == "messages":
tok, _ = data
if tok.content:
yield f"event: token\ndata: {tok.content}\n\n"

With v2 for cleaner type discrimination:

import json
async for chunk in graph.astream(inp, cfg, stream_mode=["updates", "messages"], version="v2"):
if chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:
interrupts = [i.value for i in chunk["data"]["__interrupt__"]]
yield f"event: interrupt\ndata: {json.dumps(interrupts)}\n\n"
elif chunk["type"] == "messages":
msg, _ = chunk["data"]
if msg.content:
yield f"event: token\ndata: {msg.content}\n\n"
from langgraph.config import get_stream_writer
def download(state: State) -> dict:
writer = get_stream_writer()
urls = state["urls"]
for i, url in enumerate(urls, start=1):
fetch(url)
writer({"pct": int(100 * i / len(urls)), "url": url})
return {"done": True}
# Consume progress on the caller side
for chunk in graph.stream(inp, cfg, stream_mode="custom", version="v2"):
if chunk["type"] == "custom":
pct = chunk["data"]["pct"]
print(f"\r[{'#' * (pct // 10):<10}] {pct}%", end="", flush=True)
from langgraph.types import GraphOutput
out: GraphOutput = await graph.ainvoke(inp, cfg, version="v2")
if out.interrupts:
return {"status": "awaiting_input", "prompts": [i.value for i in out.interrupts]}
return {"status": "done", "state": out.value}
async for part in graph.astream(inp, cfg, stream_mode="checkpoints", version="v2"):
cp = part["data"]
audit.write({
"run_id": cp["metadata"]["run_id"],
"step": cp["metadata"]["step"],
"next": cp["next"],
"updated": cp["metadata"].get("writes"),
})

6. Full StreamPart dispatch with all seven modes

Section titled “6. Full StreamPart dispatch with all seven modes”
from langgraph.types import (
ValuesStreamPart,
UpdatesStreamPart,
MessagesStreamPart,
CustomStreamPart,
CheckpointStreamPart,
TasksStreamPart,
DebugStreamPart,
)
ALL_MODES = ["values", "updates", "messages", "custom", "checkpoints", "tasks", "debug"]
async for part in graph.astream(inp, cfg, stream_mode=ALL_MODES, version="v2"):
match part["type"]:
case "values":
print("STATE:", part["data"], "INTERRUPTS:", part["interrupts"])
case "updates":
print("UPDATES:", part["data"])
case "messages":
msg, meta = part["data"]
print(f"TOKEN [{meta['langgraph_node']}]:", repr(msg.content))
case "custom":
print("CUSTOM:", part["data"])
case "checkpoints":
cp = part["data"]
print(f"CHECKPOINT step={cp['metadata']['step']} next={cp['next']}")
case "tasks":
ev = part["data"]
if "triggers" in ev:
print(f"TASK START: {ev['name']}")
else:
print(f"TASK END: {ev['name']} error={ev['error']}")
case "debug":
dbg = part["data"]
print(f"DEBUG [{dbg['type']}] step={dbg['step']}")

7. StreamWriter injection for library code

Section titled “7. StreamWriter injection for library code”

When building reusable graph components that should not depend on get_stream_writer()’s context-var mechanism, declare the writer as a parameter instead:

from langgraph.types import StreamWriter
def reusable_node(state: State, stream_writer: StreamWriter) -> dict:
"""Works whether called inside a graph stream or directly in tests."""
stream_writer({"event": "started"})
result = do_work(state)
stream_writer({"event": "finished", "output": result})
return {"output": result}

In unit tests, pass a no-op or a list-appending collector:

events: list = []
reusable_node({"x": 1}, stream_writer=events.append)
assert events == [{"event": "started"}, {"event": "finished", "output": ...}]
  • Default stream mode is "updates". Passing stream_mode=None inherits from the graph’s own default (which is "updates" for root graphs and "values" when invoked as a subgraph step).
  • "checkpoints" needs a checkpointer. Without one you get no events, not an error.
  • stream_mode="messages" requires callbacks. If you construct LLMs outside LangGraph and hand back messages manually, you won’t see tokens. Use the LangChain ChatModel interface inside a node so callbacks fire.
  • v2 is opt-in per call. There is no global switch. Always pass version="v2" if you want typed output; otherwise you get the legacy shape.
  • print_mode= is separate from stream_mode=. print_mode prints to stdout for debugging and does not change what stream() yields.
  • subgraphs=True changes the tuple shape. With a single mode you get (ns, data); with a list of modes you get (ns, mode, data). With version="v2" this collapses because ns is always part of the StreamPart.
  • The "__interrupt__" key only appears in "updates" v1 mode. For "values" v2, interrupts live in part["interrupts"].
  • stream_writer calls are lost outside "custom" mode. That’s intentional — but if you expected to see them, add "custom" to stream_mode.
  • print_mode is additive. Passing print_mode="updates" both prints updates and keeps whatever your stream_mode emits to the iterator.
  • get_stream_writer() requires Python 3.11+ in async contexts. In async nodes on Python 3.10 and below, declare stream_writer: StreamWriter as a parameter instead.
  • v2 multi-mode no longer yields (mode, data) tuples. When using a list of modes with version="v2", every chunk is a StreamPart dict — use chunk["type"] instead of unpacking a tuple.
  • "debug" is a superset of "checkpoints" + "tasks". Using "debug" alone in development is equivalent to receiving both, wrapped with step/timestamp context.
VersionChange
1.2.1get_stream_writer() added to langgraph.config as the preferred context-var-based alternative to parameter injection. StreamWriter injection as a node keyword argument remains supported.
1.1version="v2" on stream/astream yields typed StreamPart dicts; invoke/ainvoke with version="v2" return GraphOutput. GraphOutput[key] indexing raises DeprecationWarning. With v2, multi-mode lists yield StreamPart dicts (not (mode, data) tuples).
1.0stream_mode="tasks" split from "debug"; "checkpoints" added as its own mode. StreamMode literal type ("values" | "updates" | "checkpoints" | "tasks" | "debug" | "messages" | "custom") stabilized in langgraph.types.
0.6interrupt_before / interrupt_after on invoke/stream accept "*" for all nodes.
0.5checkpoint_during=False deprecated in favor of durability="exit".