Command, Send & control flow — API reference
Command, Send & control flow — API reference
Section titled “Command, Send & control flow — API reference”Verified against langgraph==1.2.4 (module: langgraph.types).
LangGraph’s control flow primitives live in langgraph.types:
| Symbol | Purpose |
|---|---|
Command(update, goto, resume, graph) | Update state and/or jump to another node and/or resume an interrupt — all in one return value from a node. |
Send(node, arg) | Dispatch a node with custom state; used from conditional edges for fan-out and from Command.goto for dynamic routing. |
interrupt(value) | Pause the current task and surface value to the client (resume with Command(resume=...)). |
Overwrite(value) | Write directly to a reducing channel, bypassing the reducer. |
Interrupt(value, id) | The dataclass surfaced inside StateSnapshot.interrupts (v1.1: value and id only; older attributes ns, when, resumable were removed in v0.6). |
Minimal runnable example
Section titled “Minimal runnable example”from typing import Literalfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Command
class State(TypedDict): messages: list[str] next: str
def planner(state: State) -> Command[Literal["writer", "critic", "__end__"]]: if len(state["messages"]) >= 3: return Command(goto=END) if state["messages"] and state["messages"][-1].startswith("draft"): return Command(update={"next": "critique"}, goto="critic") return Command(update={"next": "write"}, goto="writer")
def writer(state: State) -> Command[Literal["planner"]]: return Command(update={"messages": state["messages"] + ["draft v1"]}, goto="planner")
def critic(state: State) -> Command[Literal["planner"]]: return Command(update={"messages": state["messages"] + ["critique"]}, goto="planner")
builder = StateGraph(State)builder.add_node("planner", planner)builder.add_node("writer", writer)builder.add_node("critic", critic)builder.add_edge(START, "planner")
graph = builder.compile()print(graph.invoke({"messages": [], "next": ""}))Notes:
- No
add_edgefromplannertowriter/critic/END— the node returnsCommand(goto=...). Declaredestinations={"writer", "critic"}onadd_nodeonly for diagram purposes. - Type-hinting the return as
Command[Literal["writer", "critic", "__end__"]]keeps the Mermaid visualization accurate.
Command in full
Section titled “Command in full”@dataclass(frozen=True, kw_only=True, slots=True)class Command(Generic[N], ToolOutputMixin): graph: str | None = None # target graph ("__parent__" for Command.PARENT) update: Any | None = None # state update (dict, dataclass, Pydantic, tuple list, scalar) resume: dict[str, Any] | Any | None = None goto: Send | Sequence[Send | N] | N = () PARENT: ClassVar[Literal["__parent__"]] = "__parent__"Any subset of update, resume, goto, graph can be set. When a node returns Command(update={...}) without goto, it behaves like returning a dict — the graph’s edges decide where to go next.
update
Section titled “update”update accepts the same shapes as a normal node return:
dict— keys are channel names.- A list of
(channel, value)tuples. - A Pydantic model / dataclass matching the state schema.
- A scalar — written to the
__root__channel when the state has a root channel.
Reducers apply as usual; wrap a value in Overwrite(...) to bypass them.
Command(goto="next_node") # singleCommand(goto=["fan_out_a", "fan_out_b"]) # multiple (unrelated to Send fan-out)Command(goto=Send("worker", {"item": x})) # dispatch a node with custom inputCommand(goto=[Send("w", {"i": i}) for i in xs]) # fan-out with SendsSpecial values:
END→ terminate this execution path.- A node name not in the graph raises
ValueErrorat runtime. - Mixing
Sendand plain names in the same list is allowed.
resume
Section titled “resume”Used to resume from an interrupt(). Two shapes:
Command(resume="a single value") # next interrupt gets this valueCommand(resume={"interrupt-id-1": "v1", "interrupt-id-2": "v2"})# address by interrupt idSee the interrupt() section below.
graph / Command.PARENT
Section titled “graph / Command.PARENT”Command(graph=Command.PARENT, goto="retry", update={"reason": "timeout"})From inside a subgraph node, this routes the command to the parent graph — useful for bubbling an error or a handoff signal up to a supervisor.
class Send: node: str arg: Any timeout: TimeoutPolicy | None # added in 1.2.x
def __init__( self, /, node: str, arg: Any, *, timeout: float | timedelta | TimeoutPolicy | None = None, ) -> None: self.node = node self.arg = arg self.timeout = TimeoutPolicy.coerce(timeout) # normalised to TimeoutPolicy | NoneSend packages a node name and a custom state payload. Two places accept it:
- Conditional edges: return one or more
Sends from thepathcallable. Command.goto: returnCommand(goto=Send("worker", {...}))from a node.
The receiving node runs with the provided arg as its state snapshot for this task. The node is a concrete named node; the sent state can be any subset of the node’s input schema.
Equality is structural (node + arg + timeout), and Send is hashable.
Per-task timeout on Send
Section titled “Per-task timeout on Send”The timeout parameter overrides the target node’s default timeout for this specific dispatched task. Pass a float (seconds), timedelta, or TimeoutPolicy directly:
import operatorfrom datetime import timedeltafrom typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send, TimeoutPolicy
class BatchState(TypedDict): jobs: list[dict] results: Annotated[list[str], operator.add]
class JobState(TypedDict): id: str payload: str priority: str
def dispatch(state: BatchState) -> list[Send]: """Fan-out jobs; high-priority jobs get a 10-second hard cap, normal gets 60.""" sends = [] for job in state["jobs"]: timeout = ( 10.0 # high-priority: 10 s hard cap if job["priority"] == "high" else timedelta(seconds=60) # normal: 60 s hard cap ) sends.append(Send("run_job", job, timeout=timeout)) return sends
def run_job(state: JobState) -> dict: return {"results": [f"done:{state['id']}"]}
builder = StateGraph(BatchState)builder.add_node("run_job", run_job)builder.add_conditional_edges(START, dispatch)builder.add_edge("run_job", END)graph = builder.compile()For fine-grained control (both run and idle caps), pass a TimeoutPolicy:
sends = [ Send( "expensive_node", {"item": item}, timeout=TimeoutPolicy(run_timeout=60.0, idle_timeout=10.0), ) for item in items]interrupt()
Section titled “interrupt()”from langgraph.types import interrupt, Command
def ask(state: State) -> dict: answer = interrupt({"question": "How old are you?"}) return {"age": int(answer)}Semantics:
- First execution inside a node raises a
GraphInterruptcontaining anInterrupt(value, id). The graph pauses; theInterruptshows up inStateSnapshot.interruptsand in the__interrupt__key emitted onstream_mode="updates". - The client resumes with
graph.invoke(Command(resume="42"), cfg). The node re-runs from the top, this timeinterrupt(...)returns"42". - Multiple
interrupt()calls in one node are matched by order in the current task. Resume values scope to the task, not the graph. - A checkpointer is required. Without one,
interrupt()raises with no way to resume.
Resume by id when a node has several interrupts:
from langgraph.types import Commandcfg = {"configurable": {"thread_id": "t"}}# From the streaming output, you saw:# __interrupt__ = (Interrupt(value=..., id='abc'), Interrupt(value=..., id='def'))graph.invoke(Command(resume={"abc": "yes", "def": "no"}), cfg)Interrupt dataclass
Section titled “Interrupt dataclass”@final@dataclass(init=False, slots=True)class Interrupt: value: Any id: strOnly value and id are supported. The deprecated interrupt_id property still exists but warns. ns, when, and resumable were removed in v0.6 — use StateSnapshot.interrupts for structural info.
Overwrite
Section titled “Overwrite”Writes a value to a reducing channel without applying the reducer:
import operatorfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.types import Overwrite
class S(TypedDict): items: Annotated[list[str], operator.add]
def reset(state: S) -> dict: return {"items": Overwrite(["start-over"])}Two Overwrites for the same channel in one super-step raise InvalidUpdateError.
Patterns
Section titled “Patterns”1. Map-reduce with Send
Section titled “1. Map-reduce with Send”from langgraph.types import Send
def dispatch(state: dict) -> list[Send]: return [Send("score", {"item": x}) for x in state["items"]]
builder.add_node("score", score_fn)builder.add_conditional_edges("dispatch", dispatch)builder.add_edge("score", "aggregate")builder.add_edge(["dispatch", "score"], "aggregate") # barrier waitscore runs once per item with its own state snapshot. Use a reducer on the downstream channel (e.g., Annotated[list, operator.add]) so results concatenate.
2. Supervisor routing without edges
Section titled “2. Supervisor routing without edges”from typing import Literalfrom langgraph.types import Command
def supervisor(state: dict) -> Command[Literal["researcher", "writer", "__end__"]]: if not state.get("notes"): return Command(goto="researcher") if not state.get("draft"): return Command(goto="writer", update={"phase": "drafting"}) return Command(goto=END)
builder.add_node("supervisor", supervisor, destinations=("researcher", "writer", END))destinations= feeds the diagram only; the supervisor’s typed return drives execution.
3. Subgraph bubbling to parent
Section titled “3. Subgraph bubbling to parent”def worker(state: dict) -> Command: if state["escalate"]: return Command( graph=Command.PARENT, goto="human_review", update={"reason": state["reason"]}, ) return Command(update={"done": True})Inside a compiled subgraph worker can hand control back to the parent graph’s human_review node while carrying state.
4. Tool-authored commands
Section titled “4. Tool-authored commands”Any @tool that returns a Command is treated as control flow by ToolNode. Example:
from langchain_core.tools import toolfrom langgraph.types import Command
@tooldef transfer_to_refunds(reason: str) -> Command: """Hand this conversation to the refunds agent.""" return Command(goto="refunds_agent", update={"transfer_reason": reason})ToolNode unpacks the Command into a state update plus goto.
5. Interrupt + resume + update
Section titled “5. Interrupt + resume + update”from langgraph.types import interrupt, Command
def approve(state): decision = interrupt({"approve?": state["proposal"]}) if decision == "yes": return Command(goto="execute", update={"approved_by": "human"}) return Command(goto="cancel")
# Client:graph.stream(initial, cfg) # emits __interrupt__graph.invoke(Command(resume="yes"), cfg) # continues into "execute"6. Send with a per-task timeout
Section titled “6. Send with a per-task timeout”Pass a timeout= to Send to cap how long an individual parallel task may run. A plain float is a hard wall-clock limit (run_timeout); pass a TimeoutPolicy for idle-based cancellation.
import operatorfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send, TimeoutPolicy, RetryPolicy
class Scrape(TypedDict): urls: list[str] results: Annotated[list[str], operator.add]
def scrape_page(state: dict) -> dict: """Scrape one URL — runs per-Send with its own timeout.""" url = state["url"] # ... real HTTP fetch here ... return {"results": [f"content:{url}"]}
def dispatch(state: Scrape) -> list[Send]: return [ Send( "scrape_page", {"url": url, "results": []}, # Each individual task gets 10 s wall-clock; retry up to 2 extra times timeout=10.0, ) for url in state["urls"] ]
builder = StateGraph(Scrape)builder.add_node( "scrape_page", scrape_page, retry_policy=RetryPolicy(max_attempts=3, retry_on=TimeoutError),)builder.add_conditional_edges(START, dispatch)builder.add_edge("scrape_page", END)
graph = builder.compile()result = graph.invoke({"urls": ["https://a.com", "https://b.com"], "results": []})print(result["results"])7. Complete multi-agent handoff with Command.PARENT
Section titled “7. Complete multi-agent handoff with Command.PARENT”A supervisor graph runs two specialised subgraphs. Each subgraph can escalate back to the supervisor using Command(graph=Command.PARENT, goto="supervisor").
import operatorfrom typing import Annotated, Literalfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Commandfrom langgraph.checkpoint.memory import InMemorySaver
# ── Shared state used by both the parent and subgraphs ──────────────────────
class SharedState(TypedDict): task: str output: str escalated: bool
# ── Subgraph A: researcher ──────────────────────────────────────────────────
def researcher_node(state: SharedState) -> Command[Literal["__end__"]]: # Simulate research; escalate if topic is too complex if "complex" in state["task"]: return Command( graph=Command.PARENT, # send to the parent supervisor goto="supervisor", update={"escalated": True, "output": "Research: escalated — topic too complex"}, ) return Command( update={"output": f"Research done: {state['task']}"}, goto=END, )
researcher = ( StateGraph(SharedState) .add_node("researcher_node", researcher_node, destinations=["__end__"]) .add_edge(START, "researcher_node") .compile())
# ── Subgraph B: writer ──────────────────────────────────────────────────────
def writer_node(state: SharedState) -> dict: return {"output": f"Draft written for: {state['task']}"}
writer = ( StateGraph(SharedState) .add_node("writer_node", writer_node) .add_edge(START, "writer_node") .add_edge("writer_node", END) .compile())
# ── Parent supervisor ───────────────────────────────────────────────────────
class SupervisorState(SharedState): phase: str
def supervisor(state: SupervisorState) -> Command[Literal["researcher", "writer", "__end__"]]: if state.get("escalated"): # Researcher escalated — handle manually and finish return Command( update={"output": "Supervisor resolved escalation.", "phase": "done"}, goto=END, ) if state["phase"] == "start": return Command(update={"phase": "research"}, goto="researcher") if state["phase"] == "research": return Command(update={"phase": "write"}, goto="writer") return Command(goto=END)
parent = StateGraph(SupervisorState)parent.add_node("supervisor", supervisor, destinations=["researcher", "writer", "__end__"])parent.add_node("researcher", researcher)parent.add_node("writer", writer)parent.add_edge(START, "supervisor")
graph = parent.compile(checkpointer=InMemorySaver())cfg = {"configurable": {"thread_id": "multi-1"}}
# Normal taskresult = graph.invoke({"task": "Write about LangGraph", "output": "", "escalated": False, "phase": "start"}, cfg)print(result["output"]) # "Draft written for: Write about LangGraph"
# Complex task triggers escalation from researcher → parent supervisorcfg2 = {"configurable": {"thread_id": "multi-2"}}result2 = graph.invoke({"task": "complex quantum theory", "output": "", "escalated": False, "phase": "start"}, cfg2)print(result2["output"]) # "Supervisor resolved escalation."8. Fan-out then fan-in with Send and a barrier edge
Section titled “8. Fan-out then fan-in with Send and a barrier edge”Send from a conditional edge fans out to N parallel tasks. A barrier edge (add_edge(["source1", "source2"], "target")) waits for all of them before running the aggregation node.
import operatorfrom typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import Send
class Pipeline(TypedDict): documents: list[str] scores: Annotated[list[dict], operator.add] # accumulates results from all workers
def score_document(state: dict) -> dict: """Score one document — runs once per Send.""" doc = state["document"] score = len(doc) / 100.0 # replace with a real scoring call return {"scores": [{"doc": doc[:30], "score": score}]}
def aggregate(state: Pipeline) -> dict: avg = sum(s["score"] for s in state["scores"]) / len(state["scores"]) best = max(state["scores"], key=lambda s: s["score"]) print(f"Scored {len(state['scores'])} documents. avg={avg:.2f}, best={best['doc']!r}") return {}
builder = StateGraph(Pipeline)builder.add_node("score_document", score_document)builder.add_node("aggregate", aggregate)
# Fan out: one Send per document, all run in parallelbuilder.add_conditional_edges( START, lambda s: [Send("score_document", {"document": d, "scores": []}) for d in s["documents"]],)# Fan in: barrier waits for all score_document tasksbuilder.add_edge("score_document", "aggregate")builder.add_edge("aggregate", END)
graph = builder.compile()graph.invoke({ "documents": ["Short doc", "A much longer document with more content", "Medium length document here"], "scores": [],})Gotchas
Section titled “Gotchas”Command(goto="name")bypasses explicit edges. A node that returns a Command will follow the command’s goto even if you calledadd_edge("node", "next"). Pick one style per node.Command.gotodoes not acceptstrfor subgraph namespaces. Always use a plain node name at the current graph level; cross-graph jumps usegraph=Command.PARENT.- The type parameter on
Command[Literal[...]]is for the visualizer. It doesn’t narrow to runtime errors. Send(node, arg)ignores the main state.argis the snapshot for the target node’s run. If you need context, stuff it intoarg.- Equality compares
argtoo. TwoSend("x", {...})with unhashable dicts are hashable at theSendlevel but raise if you stick them in a set without care — dict compares structurally, hash uses tuple of(node, arg). - A node that returns
Command(graph=Command.PARENT)outside a subgraph raises. Only valid when the node runs inside a compiled subgraph used by a parent. update=in aCommandstill goes through reducers. UseOverwrite(...)in theupdatevalues if you need to replace a reducing channel.- Resuming an interrupt re-runs the node from the top. Make side effects idempotent or put them in
@tasks.
Breaking changes
Section titled “Breaking changes”| Version | Change |
|---|---|
| 1.0 | Command is the canonical way for a node/tool to return control-flow intent. Returning a dict still works for pure state updates. |
| 0.6 | Interrupt.ns, Interrupt.when, Interrupt.resumable removed. Interrupt.interrupt_id deprecated in favor of Interrupt.id. |
| 0.4 | Interrupt.id introduced as a property, supporting resume-by-id via Command(resume={id: value}). |
| 0.2.24 | RetryPolicy, CachePolicy, Interrupt first exported from langgraph.types. |