Skip to content

LangGraph Observability and Monitoring (Python)

LangGraph Observability and Monitoring (Python)

Section titled “LangGraph Observability and Monitoring (Python)”

Verified against langgraph==1.2.2 (modules: langgraph.runtime, langgraph.types, langgraph.graph.state).

LangGraph provides several built-in mechanisms for observing what your graph is doing — without external instrumentation libraries:

MechanismAPIWhat you see
Custom stream eventsruntime.stream_writer(...)Arbitrary events from inside any node
Execution metadataruntime.execution_infoThread ID, task ID, attempt number, retry timing
State snapshotsgraph.get_state()Current state at any point
Full state historygraph.get_state_history()Every checkpoint in the thread
Debug streamingstream_mode="debug"Node start/end events with timing
LangSmith tracingEnvironment variablesAutomatic traces in LangSmith

1. runtime.stream_writer — emit custom events from nodes

Section titled “1. runtime.stream_writer — emit custom events from nodes”

runtime.stream_writer is a callable that emits any JSON-serialisable value on the "custom" stream channel. Clients reading the stream with stream_mode="custom" (or the combined ["values", "custom"]) receive these events in real time.

from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage, AIMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.runtime import Runtime
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
query: str
model = ChatAnthropic(model="claude-3-5-sonnet-20241022")
def research_node(state: State, runtime: Runtime) -> dict:
"""A multi-step node that emits progress events."""
# Emit start event
runtime.stream_writer({"event": "research_start", "query": state["query"]})
# Step 1 — search
runtime.stream_writer({"event": "step", "name": "web_search", "status": "running"})
search_results = web_search(state["query"])
runtime.stream_writer({"event": "step", "name": "web_search", "status": "done",
"result_count": len(search_results)})
# Step 2 — summarize
runtime.stream_writer({"event": "step", "name": "summarize", "status": "running"})
response = model.invoke([
HumanMessage(f"Summarize these results: {search_results}")
])
runtime.stream_writer({"event": "step", "name": "summarize", "status": "done"})
# Emit completion event
runtime.stream_writer({"event": "research_done", "query": state["query"]})
return {"messages": [response]}
builder = StateGraph(State)
builder.add_node("research", research_node)
builder.add_edge(START, "research")
builder.add_edge("research", END)
graph = builder.compile(checkpointer=InMemorySaver())
# Consume custom events alongside state updates
config = {"configurable": {"thread_id": "obs-1"}}
for event in graph.stream(
{"query": "Latest AI breakthroughs"},
config,
stream_mode=["values", "custom"], # receive both state values AND custom events
):
mode, data = event # tuple: ("values", state_dict) or ("custom", your_payload)
if mode == "custom":
print(f"[custom] {data}")
elif mode == "values":
print(f"[state] messages: {len(data.get('messages', []))}")

1.2 Streaming with get_runtime() — when you can’t annotate the function

Section titled “1.2 Streaming with get_runtime() — when you can’t annotate the function”

If you’re wrapping third-party code that has a fixed signature, use get_runtime() instead of parameter injection:

from langgraph.runtime import get_runtime
def third_party_node(state: State) -> dict:
"""Fixed signature — can't add a runtime parameter."""
runtime = get_runtime()
runtime.stream_writer({"event": "third_party_start"})
result = run_third_party_lib(state["query"])
runtime.stream_writer({"event": "third_party_done"})
return {"result": result}

get_runtime() raises RuntimeError if called outside a Pregel task context (i.e. outside an active graph execution).


2. ExecutionInfo — structured execution metadata

Section titled “2. ExecutionInfo — structured execution metadata”

runtime.execution_info provides structured access to IDs and counters that are otherwise scattered across RunnableConfig and the Pregel scratchpad. Use it for correlation IDs in logs and metrics.

# Source-verified fields (langgraph.runtime.ExecutionInfo, langgraph 1.2.2):
@dataclass(frozen=True, slots=True)
class ExecutionInfo:
checkpoint_id: str
checkpoint_ns: str
task_id: str
thread_id: str | None # None if no checkpointer
run_id: str | None # None if not in RunnableConfig
node_attempt: int = 1 # 1-indexed; increments on retries
node_first_attempt_time: float | None = None # unix timestamp of first attempt

2.1 Structured logging with correlation IDs

Section titled “2.1 Structured logging with correlation IDs”
import logging
import time
from langgraph.runtime import Runtime
logger = logging.getLogger(__name__)
def observable_node(state: State, runtime: Runtime) -> dict:
info = runtime.execution_info
logger.info(
"Node started",
extra={
"thread_id": info.thread_id,
"task_id": info.task_id,
"run_id": info.run_id,
"attempt": info.node_attempt,
},
)
if info.node_attempt > 1:
elapsed = time.time() - (info.node_first_attempt_time or time.time())
logger.warning(
"Node retry",
extra={"attempt": info.node_attempt, "elapsed_since_first": elapsed},
)
# Emit a retry event to the custom stream
runtime.stream_writer({"event": "retry", "attempt": info.node_attempt})
result = do_work(state["query"])
logger.info("Node completed", extra={"task_id": info.task_id})
return {"result": result}
from langgraph.runtime import Runtime
import time
def metered_node(state: State, runtime: Runtime) -> dict:
info = runtime.execution_info
start = time.monotonic()
try:
result = do_expensive_work(state["query"])
status = "success"
except Exception:
status = "error"
raise
finally:
duration = time.monotonic() - start
# Emit a metric event — consumed by your monitoring pipeline
runtime.stream_writer({
"metric": "node_duration_seconds",
"value": duration,
"labels": {
"thread_id": info.thread_id,
"status": status,
"attempt": info.node_attempt,
},
})
return {"result": result}

3. State history inspection — get_state and get_state_history

Section titled “3. State history inspection — get_state and get_state_history”

CompiledStateGraph provides methods to inspect the state at any checkpoint:

from langgraph.checkpoint.memory import InMemorySaver
graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "inspect-1"}}
# Run the graph
graph.invoke({"query": "test"}, config)
# Get the current state
snapshot = graph.get_state(config)
print(f"Values: {snapshot.values}") # current state dict
print(f"Next: {snapshot.next}") # which nodes run next (empty = done)
print(f"Created: {snapshot.created_at}") # ISO timestamp of this checkpoint
print(f"Metadata: {snapshot.metadata}") # step number, source, writes
# List every checkpoint for a thread in reverse chronological order
history = list(graph.get_state_history(config))
print(f"Total checkpoints: {len(history)}")
for i, snapshot in enumerate(history):
cp_id = snapshot.config["configurable"]["checkpoint_id"]
print(f"\nCheckpoint {i}: {cp_id}")
print(f" Next node: {snapshot.next}")
print(f" State keys: {list(snapshot.values.keys())}")
print(f" Step: {snapshot.metadata.get('step', '?')}")

3.3 Replay from a specific checkpoint (time-travel)

Section titled “3.3 Replay from a specific checkpoint (time-travel)”
# Get the state from 2 checkpoints ago
old_snapshot = history[2]
old_config = old_snapshot.config # includes checkpoint_id
# Re-invoke from that historical checkpoint
result = graph.invoke(None, old_config)
print(f"Replayed from checkpoint: {result}")

3.4 get_subgraphs — inspect nested graph state

Section titled “3.4 get_subgraphs — inspect nested graph state”

When your graph contains subgraphs, aget_subgraphs() (async) traverses the nested execution:

async def inspect_subgraphs():
config = {"configurable": {"thread_id": "sub-1"}}
async for name, subgraph in graph.aget_subgraphs(recurse=True):
state = await subgraph.aget_state(config)
print(f"Subgraph '{name}': {list(state.values.keys())}")

4. Debug streaming — built-in node event tracing

Section titled “4. Debug streaming — built-in node event tracing”

stream_mode="debug" emits structured events for every node start, node end, and checkpoint write — no extra instrumentation required.

config = {"configurable": {"thread_id": "debug-1"}}
for event in graph.stream({"query": "test"}, config, stream_mode="debug"):
event_type = event.get("type")
if event_type == "task":
# A node task is about to start
print(f"→ Starting: {event['payload']['name']} (step {event['step']})")
elif event_type == "task_result":
# A node task completed
node = event["payload"]["name"]
error = event["payload"].get("error")
if error:
print(f"✗ Failed: {node}{error}")
else:
print(f"✓ Done: {node}")
elif event_type == "checkpoint":
print(f"⬛ Checkpoint written (step {event['step']})")

5. LangSmith tracing — automatic, zero-code

Section titled “5. LangSmith tracing — automatic, zero-code”

Set two environment variables and every graph run is automatically traced in LangSmith — node-by-node latency, inputs/outputs, errors, retry counts.

Terminal window
export LANGCHAIN_API_KEY="ls__..."
export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_PROJECT="my-agent" # optional; defaults to "default"

No code changes needed. Every graph.invoke() and graph.stream() call will appear in the LangSmith UI with:

  • Node-by-node execution timeline and latency
  • Full input/output state at each node
  • Error details and retry counts
  • Thread and run IDs for correlation
from langchain_core.runnables import RunnableConfig
config: RunnableConfig = {
"configurable": {"thread_id": "user-123"},
"tags": ["production", "user-query"], # filter runs by tag in LangSmith
"metadata": {"user_id": "user-123", # attach arbitrary metadata
"env": "prod"},
"run_name": "customer_support_query", # human-readable name in LangSmith
}
result = graph.invoke({"query": "help me"}, config)

For distributed tracing across services, use OpenTelemetry. Instead of wrapping node functions manually (which hides the true node boundary), use stream_mode="debug" events as the trigger for span creation:

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer("langgraph-app")
async def run_with_otel_tracing(query: str) -> dict:
config = {"configurable": {"thread_id": "otel-1"}}
spans: dict[str, trace.Span] = {}
result = {}
async for event in graph.astream(
{"query": query}, config, stream_mode="debug"
):
event_type = event.get("type")
if event_type == "task":
node_name = event["payload"]["name"]
span = tracer.start_span(
f"langgraph.node.{node_name}",
attributes={
"graph.node": node_name,
"graph.step": event["step"],
"graph.thread": config["configurable"]["thread_id"],
},
)
spans[node_name] = span
elif event_type == "task_result":
node_name = event["payload"]["name"]
span = spans.pop(node_name, None)
if span:
error = event["payload"].get("error")
if error:
span.set_status(Status(StatusCode.ERROR, error))
else:
span.set_status(Status(StatusCode.OK))
span.end()
elif event_type == "checkpoint":
result = event["payload"].get("values", {})
return result

What to monitorSignalHow to get it
Node latencyDuration between task and task_result debug eventsstream_mode="debug"
Retry rateruntime.execution_info.node_attempt > 1runtime.stream_writer({"metric": "retry"})
Error ratetask_result events with error setstream_mode="debug"
Active threadsThread IDs in useTrack in your own DB keyed by thread_id
State sizelen(json.dumps(state.values))After each get_state() call
Token usageLangSmith traces OR usage_metadata on AI messagesLangSmith or parse responses
Cache hit rateCustom event from node before/after cache checkruntime.stream_writer({"cache": "hit"})