RemoteGraph — API reference
Verified against langgraph==1.2.4 / langgraph-sdk==0.4.2 (module: langgraph.pregel.remote).
RemoteGraph is a drop-in replacement for a compiled StateGraph that calls a LangGraph Platform deployment (LangSmith, self-hosted server, or any API that speaks the LangGraph Server spec) over HTTP. It exposes the same invoke, stream, get_state, update_state, get_state_history surface as a local graph, so you can swap a local graph for a remote one without changing the calling code.
RemoteGraph is also a valid LangGraph node: you can add it to a parent StateGraph with add_node(remote_graph), enabling local-remote hybrid architectures.
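To make the "swap without changing the calling code" idea concrete, here is a minimal sketch that uses only the standard library. `GraphLike`, `EchoGraph`, and `answer` are hypothetical names introduced for illustration; `EchoGraph` stands in for either a compiled local graph or a `RemoteGraph`, and only `invoke` is modelled.

```python
from __future__ import annotations

from typing import Any, Protocol


class GraphLike(Protocol):
    """The slice of the shared graph surface that this caller relies on."""

    def invoke(
        self, input: dict[str, Any], config: dict[str, Any] | None = None
    ) -> dict[str, Any]: ...


class EchoGraph:
    """Hypothetical stand-in for a compiled local graph or a RemoteGraph."""

    def __init__(self, label: str) -> None:
        self.label = label

    def invoke(
        self, input: dict[str, Any], config: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        thread_id = (config or {}).get("configurable", {}).get("thread_id")
        return {"handled_by": self.label, "thread_id": thread_id, "echo": input}


def answer(graph: GraphLike, question: str, thread_id: str) -> dict[str, Any]:
    # The caller depends only on invoke(), so it does not care where the graph runs.
    return graph.invoke(
        {"messages": [("user", question)]},
        {"configurable": {"thread_id": thread_id}},
    )


local_result = answer(EchoGraph("local"), "Hello", "thread-1")
remote_result = answer(EchoGraph("remote"), "Hello", "thread-1")
```

Because `answer` is typed against the protocol rather than a concrete class, replacing the local object with a remote one is a one-line change at the call site.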
Minimal runnable example

    from langchain_core.runnables import RunnableConfig
    from langgraph.pregel.remote import RemoteGraph

    # Point at a deployment (set LANGGRAPH_API_KEY in your env, or pass api_key=)
    remote = RemoteGraph(
        "my_agent",  # graph ID or assistant ID on the server
        url="https://my-deployment.langsmith.com",
    )

    cfg: RunnableConfig = {"configurable": {"thread_id": "thread-1"}}
    result = remote.invoke({"messages": [("user", "Hello")]}, cfg)
    # Over HTTP, messages may arrive as plain dicts; if so, use ["content"] instead.
    print(result["messages"][-1].content)

Requires `langgraph-sdk`, which is installed automatically with `langgraph`:

    pip install langgraph

Imports

    from langgraph.pregel.remote import RemoteGraph, RemoteException

Both are also exported from `langgraph.pregel`:

    from langgraph.pregel import RemoteGraph

Constructor

    RemoteGraph(
        assistant_id: str,  # graph_id or assistant UUID on the server
        /,
        *,
        url: str | None = None,
        api_key: str | None = None,
        headers: dict[str, str] | None = None,
        client: LangGraphClient | None = None,
        sync_client: SyncLangGraphClient | None = None,
        config: RunnableConfig | None = None,
        name: str | None = None,
        distributed_tracing: bool = False,
    )

| Parameter | Description |
|---|---|
| `assistant_id` | The graph name or assistant UUID registered on the server. Positional-only. |
| `url` | HTTP base URL of the deployment. When set, the sync and async clients are created automatically. |
| `api_key` | API key for the deployment. If not passed, it is read from `LANGGRAPH_API_KEY` / `LANGSMITH_API_KEY` / `LANGCHAIN_API_KEY` in the environment. |
| `headers` | Extra HTTP headers forwarded on every request. |
| `client` | Pre-built async `LangGraphClient` (from `langgraph_sdk`). Use when you need full control over connection pooling. |
| `sync_client` | Pre-built sync `SyncLangGraphClient`. |
| `config` | A `RunnableConfig` merged into every call. Useful for attaching a persistent `thread_id` or tags at construction time. |
| `name` | Human-readable name shown in diagrams and traces. Defaults to `assistant_id`. |
| `distributed_tracing` | Forward LangSmith tracing headers so server-side spans appear under the local trace. |

Provide at least one of url, client, or sync_client.
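The API-key lookup described in the parameter table can be sketched as follows. This is an illustration of the documented fallback, not the SDK's own code: `resolve_api_key` is a hypothetical helper, and checking the variables in the order they are listed is an assumption.

```python
from __future__ import annotations

import os

# Assumed precedence: the order in which the parameter table lists the variables.
API_KEY_ENV_VARS = ("LANGGRAPH_API_KEY", "LANGSMITH_API_KEY", "LANGCHAIN_API_KEY")


def resolve_api_key(api_key: str | None = None) -> str | None:
    """Return the explicit key if given, else the first non-empty env variable."""
    if api_key:
        return api_key
    for name in API_KEY_ENV_VARS:
        value = os.environ.get(name)
        if value:
            return value
    return None  # no key configured anywhere
```

An explicit `api_key=` always wins in this sketch, which matches the table's statement that the argument overrides the environment.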
Methods
All methods mirror the `CompiledStateGraph` surface:

| Method | Returns | Notes |
|---|---|---|
| `invoke(input, config, *, context, interrupt_before, interrupt_after, headers, params, version)` | `dict` or `GraphOutput` | Blocking. `version="v2"` returns `GraphOutput`. |
| `ainvoke(...)` | `dict` or `GraphOutput` | Async variant. |
| `stream(input, config, *, stream_mode, interrupt_before, interrupt_after, subgraphs, headers, params, version)` | `Iterator` | Sync generator. |
| `astream(...)` | `AsyncIterator` | Async generator. |
| `get_state(config, *, subgraphs, headers, params)` | `StateSnapshot` | Requires checkpointer on server. |
| `aget_state(...)` | `StateSnapshot` | Async variant. |
| `get_state_history(config, *, filter, before, limit, headers, params)` | `Iterator[StateSnapshot]` | Newest-first. |
| `aget_state_history(...)` | `AsyncIterator[StateSnapshot]` | Async variant. |
| `update_state(config, values, as_node, *, headers, params)` | `RunnableConfig` | Write state without running the graph. |
| `aupdate_state(...)` | `RunnableConfig` | Async variant. |
| `bulk_update_state(config, updates)` | `RunnableConfig` | Apply multiple super-steps at once. |
| `abulk_update_state(...)` | `RunnableConfig` | Async variant. |
| `get_graph(config, *, xray, headers, params)` | `DrawableGraph` | Fetch the server-side graph topology for visualization. |
| `aget_graph(...)` | `DrawableGraph` | Async variant. |
| `with_config(config, **kwargs)` | `Self` | Return a copy with merged config. |

invoke and ainvoke

    result = remote.invoke(
        {"messages": [("user", "Summarise this document")]},
        {"configurable": {"thread_id": "t-1"}},
        interrupt_before=["human_review"],  # pause before this node
        version="v2",  # returns GraphOutput instead of dict
    )
    # result.value      → final state dict
    # result.interrupts → tuple[Interrupt, ...]

    # Async version:
    result = await remote.ainvoke(
        {"messages": [("user", "hello")]},
        {"configurable": {"thread_id": "t-1"}},
    )

stream and astream

    for chunk in remote.stream(
        {"messages": [("user", "hello")]},
        {"configurable": {"thread_id": "t-1"}},
        stream_mode="updates",
    ):
        print(chunk)

    # Multiple modes, async:
    async for mode, data in remote.astream(
        {"messages": [("user", "hi")]},
        {"configurable": {"thread_id": "t-1"}},
        stream_mode=["updates", "messages"],
    ):
        if mode == "messages":
            msg, meta = data
            print(msg.content, end="", flush=True)

All seven stream modes work: `"values"`, `"updates"`, `"messages"`, `"custom"`, `"checkpoints"`, `"tasks"`, `"debug"`.
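When several modes are requested, each item is a `(mode, data)` pair, so consumers usually dispatch on the mode. The sketch below shows that dispatch offline. `fake_stream` is a hypothetical stand-in for `remote.stream(..., stream_mode=["updates", "messages"])`, and it yields plain strings where a real stream would yield message chunks.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import Any


def fake_stream() -> Iterator[tuple[str, Any]]:
    """Hypothetical stand-in for a multi-mode stream; no server is contacted."""
    yield "messages", ("Hel", {"langgraph_node": "chat"})
    yield "messages", ("lo", {"langgraph_node": "chat"})
    yield "updates", {"chat": {"messages": ["Hello"]}}


def collect(stream: Iterable[tuple[str, Any]]) -> tuple[str, list[dict]]:
    """Split a multi-mode stream into concatenated tokens and node updates."""
    tokens: list[str] = []
    updates: list[dict] = []
    for mode, data in stream:
        if mode == "messages":
            chunk, _metadata = data  # "messages" items are (chunk, metadata) pairs
            tokens.append(chunk)
        elif mode == "updates":
            updates.append(data)
    return "".join(tokens), updates


text, updates = collect(fake_stream())
```

Keeping the dispatch in one function makes it easy to add a branch for another mode later without touching the calling code.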
State inspection and time-travel

    from langgraph.types import Command

    cfg = {"configurable": {"thread_id": "t-1"}}

    # Current state
    snap = remote.get_state(cfg)
    print(snap.values)
    print(snap.next)        # which nodes will run next
    print(snap.interrupts)  # pending Interrupt objects

    # Full history (newest first)
    for snap in remote.get_state_history(cfg, limit=10):
        print(snap.metadata["step"], snap.values)

    # Time-travel: resume from an older checkpoint
    old_cfg = snap.config  # config pointer from history (here, the last snapshot iterated)
    result = remote.invoke(
        Command(resume="yes"),  # assumes that checkpoint is paused at an interrupt
        old_cfg,
    )

Interrupt / resume over a remote graph
The pattern is identical to a local graph: `interrupt()` on the server pauses execution, and the `Interrupt` surfaces in `StateSnapshot.interrupts` and under the `__interrupt__` key of `stream_mode="updates"`.

    from langgraph.types import Command

    cfg = {"configurable": {"thread_id": "t-human"}}

    # First call: triggers an interrupt on the server
    for chunk in remote.stream(
        {"query": "Analyse financials"}, cfg, stream_mode="updates"
    ):
        if "__interrupt__" in chunk:
            question = chunk["__interrupt__"][0].value
            print("Server asks:", question)

    # Resume with the human's answer
    result = remote.invoke(Command(resume="Approve"), cfg)
    print(result)

Using RemoteGraph as a subgraph node
Add a `RemoteGraph` directly to a parent `StateGraph` just like any compiled subgraph:

    from typing import Annotated

    from langchain_core.messages import AnyMessage
    from langgraph.graph import StateGraph, START, END
    from langgraph.graph.message import add_messages, MessagesState
    from langgraph.pregel.remote import RemoteGraph
    from typing_extensions import TypedDict

    # Remote specialist
    summariser = RemoteGraph(
        "summariser_agent",
        url="https://my-deployment.langsmith.com",
        name="summariser",  # appears as node label in diagrams
    )

    class OrchestratorState(TypedDict):
        messages: Annotated[list[AnyMessage], add_messages]

    def prepare(state: OrchestratorState) -> dict:
        return {}

    # Wire the remote graph as a node
    builder = StateGraph(OrchestratorState)
    builder.add_node("prepare", prepare)
    builder.add_node("summariser", summariser)  # RemoteGraph as node
    builder.add_edge(START, "prepare")
    builder.add_edge("prepare", "summariser")
    builder.add_edge("summariser", END)

    graph = builder.compile()
    result = graph.invoke({"messages": [("user", "Summarise: ...")]})

When used as a node, `RemoteGraph` forwards the parent's config (including `thread_id`) automatically. The subgraph uses `checkpointer=True` semantics, meaning it inherits the parent's checkpointing scope.
Distributed tracing with LangSmith
Set `distributed_tracing=True` to propagate the LangSmith trace from your local process into the remote deployment, so all spans appear under a single trace:

    remote = RemoteGraph(
        "my_agent",
        url="https://my-deployment.langsmith.com",
        distributed_tracing=True,
    )

Under the hood, `RemoteGraph` adds LangSmith trace-context headers to every request when a LangSmith `RunTree` is active (in current LangSmith SDKs these are the `langsmith-trace` and `baggage` headers produced by `RunTree.to_headers()`).
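As a rough illustration of "headers are added only when a trace is active", the sketch below merges an optional set of tracing headers into the per-request headers. `merge_tracing_headers` and the `x-example-trace` header name are hypothetical, this is not `RemoteGraph`'s internal code, and letting tracing headers win on a name clash is an assumption.

```python
from __future__ import annotations


def merge_tracing_headers(
    request_headers: dict[str, str] | None,
    trace_headers: dict[str, str] | None,
) -> dict[str, str]:
    """Return a new header dict, adding trace headers only when a trace is active."""
    merged = dict(request_headers or {})
    if trace_headers:  # None or {} means no active trace: add nothing, raise nothing
        merged.update(trace_headers)
    return merged


# No active trace: caller headers pass through unchanged.
plain = merge_tracing_headers({"X-User-Token": "abc"}, None)

# Active trace: the (hypothetical) trace header rides along with caller headers.
traced = merge_tracing_headers(
    {"X-User-Token": "abc"}, {"x-example-trace": "trace-123"}
)
```

The silent pass-through in the first call is the behaviour to watch for in practice: a missing trace context produces no error, only unlinked spans.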
with_config — partial application
Bind defaults once to avoid repeating config on every call:

    base = RemoteGraph("my_agent", url="https://my-deployment.langsmith.com")

    # Create a per-user handle that always routes to the same thread
    alice_graph = base.with_config({"configurable": {"thread_id": "alice"}})

    # All calls go to alice's thread
    alice_graph.invoke({"messages": [("user", "hi")]})
    alice_graph.invoke({"messages": [("user", "follow up")]})

RemoteException
When the server returns an error, `RemoteGraph` wraps it in `RemoteException`:

    from langgraph.pregel.remote import RemoteException

    try:
        remote.invoke({"messages": []}, cfg)
    except RemoteException as e:
        print("Server error:", e)
    except Exception as e:
        print("Network or auth error:", e)

Patterns
1. Async streaming with token-level output

    import asyncio

    from langgraph.pregel.remote import RemoteGraph

    remote = RemoteGraph("chat_agent", url="https://my-deployment.langsmith.com")

    async def chat(user_input: str, thread_id: str) -> None:
        cfg = {"configurable": {"thread_id": thread_id}}
        async for mode, data in remote.astream(
            {"messages": [("user", user_input)]},
            cfg,
            stream_mode=["updates", "messages"],
        ):
            if mode == "messages":
                msg, meta = data
                if msg.content:
                    print(msg.content, end="", flush=True)
            elif mode == "updates" and "__interrupt__" in data:
                print("\n[Waiting for approval]")

    asyncio.run(chat("Tell me about LangGraph", "u-123"))

2. Parallel remote calls with asyncio.gather

    import asyncio

    from langgraph.pregel.remote import RemoteGraph

    remote = RemoteGraph("analyser", url="https://my-deployment.langsmith.com")

    async def analyse_all(documents: list[str]) -> list[dict]:
        # One thread per document so the runs do not share state
        tasks = [
            remote.ainvoke(
                {"document": doc},
                {"configurable": {"thread_id": f"doc-{i}"}},
            )
            for i, doc in enumerate(documents)
        ]
        return await asyncio.gather(*tasks)

    results = asyncio.run(analyse_all(["doc1...", "doc2...", "doc3..."]))

3. Human-in-the-loop orchestrator over a remote graph

    from langgraph.pregel.remote import RemoteGraph
    from langgraph.types import Command

    remote = RemoteGraph("approval_flow", url="https://my-deployment.langsmith.com")
    cfg = {"configurable": {"thread_id": "approval-1"}}

    def run_with_approval(initial_input: dict) -> dict:
        state = remote.invoke(initial_input, cfg)

        # Keep resuming until no interrupts are pending
        snap = remote.get_state(cfg)
        while snap.interrupts:
            for pending in snap.interrupts:
                print("Approval required:", pending.value)
            decision = input("Approve? (y/n): ")
            state = remote.invoke(Command(resume=decision), cfg)
            snap = remote.get_state(cfg)

        return state

    result = run_with_approval({"proposal": "Deploy to prod"})

4. State inspection and manual update

    from langgraph.pregel.remote import RemoteGraph

    remote = RemoteGraph("pipeline", url="https://my-deployment.langsmith.com")
    cfg = {"configurable": {"thread_id": "pipe-42"}}

    # Inspect current state
    snap = remote.get_state(cfg)
    print("Current values:", snap.values)
    print("Next nodes:", snap.next)

    # Manually inject a value as if it came from a node
    new_cfg = remote.update_state(
        cfg,
        {"status": "approved", "approver": "alice"},
        as_node="review_step",
    )

    # Continue from the updated state
    result = remote.invoke(None, new_cfg)

5. Custom headers for per-request auth

    from langgraph.pregel.remote import RemoteGraph

    remote = RemoteGraph("secure_agent", url="https://my-deployment.langsmith.com")

    def call_for_user(user_token: str, thread_id: str, query: str) -> dict:
        # Keep the secret in the header only; a token used as thread_id
        # would be stored server-side with the thread.
        return remote.invoke(
            {"query": query},
            {"configurable": {"thread_id": thread_id}},
            headers={"X-User-Token": user_token},  # forwarded per-request
        )

6. Wrapping RemoteGraph with fallback to a local graph

    from langgraph.graph import StateGraph, START, END
    from langgraph.pregel.remote import RemoteGraph

    # Build a lightweight local fallback
    local_graph = ...  # a compiled StateGraph

    remote = RemoteGraph("main_agent", url="https://my-deployment.langsmith.com")

    def invoke_with_fallback(input_: dict, cfg: dict) -> dict:
        try:
            return remote.invoke(input_, cfg)
        except Exception:
            # Catches RemoteException (server errors) as well as network and
            # auth failures; fall back to the local graph in either case.
            return local_graph.invoke(input_, cfg)

Gotchas
- `url` is required for auto-created clients. If you pass neither `url` nor an explicit `client`/`sync_client`, every method call raises `ValueError`.
- `assistant_id` is positional-only. You cannot write `RemoteGraph(assistant_id="x", url=...)`; the first argument has no keyword form.
- Thread ID must come from the caller. Unlike a local graph compiled with `InMemorySaver`, the remote server has its own checkpointer. Always set `config["configurable"]["thread_id"]`.
- `checkpointer=True` semantics when used as a subgraph. When wired into a parent `StateGraph`, the `RemoteGraph` uses the parent's `thread_id` to isolate its checkpoint namespace. The server-side checkpointer must be enabled on the deployment.
- `name` defaults to `assistant_id`. Always pass `name=` when adding as a node; the Mermaid diagram shows `assistant_id` otherwise, which is usually a UUID.
- `astream_events` raises `NotImplementedError`. Use `astream` with the appropriate `stream_mode` instead.
- `distributed_tracing=True` requires an active LangSmith session. Without a `LANGSMITH_API_KEY` and an active `RunTree`, the tracing headers are not added, and no error is raised.
- `params=` are passed as HTTP query parameters. Useful for server-side feature flags or route selection, but their interpretation is deployment-specific.
- `subgraphs=True` on `stream` exposes inner namespace paths. The namespace tuples include the server-side task IDs, which change on each run. Filter by node name prefix, not the full tuple.

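The last gotcha, filtering by node name rather than by the full namespace tuple, can be sketched like this. The helper names are hypothetical, and the segment layout `"<node_name>:<task_id>"` is an assumption about how namespace entries are formatted. The sample task IDs are made up.

```python
from __future__ import annotations


def node_path(namespace: tuple[str, ...]) -> tuple[str, ...]:
    """Strip the run-specific ':<task_id>' suffix from every namespace segment."""
    return tuple(segment.split(":", 1)[0] for segment in namespace)


def is_from_node(namespace: tuple[str, ...], node_name: str) -> bool:
    """True when the outermost namespace segment belongs to node_name."""
    path = node_path(namespace)
    return bool(path) and path[0] == node_name


# Two runs of the same nodes: different task IDs, identical node paths.
run_a = ("summariser:1f0c", "tools:9a2b")
run_b = ("summariser:77de", "tools:03c1")
```

Comparing `node_path(...)` values gives a key that stays stable across runs, whereas comparing the raw tuples would never match twice.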
Breaking changes
| Version | Change |
|---|---|
| 0.3.14 (sdk) | RemoteGraph uses the LangGraph SDK SyncLangGraphClient / LangGraphClient internally; url auto-creates both. |
| 1.2.1 | bulk_update_state / abulk_update_state added for multi-step state injection. name= kwarg respects custom labels in diagram. |
| 1.0 | RemoteGraph first stabilised; importable from langgraph.pregel as well as langgraph.pregel.remote. |