Checkpointers — API reference
Verified against langgraph==1.2.1, langgraph-checkpoint==4.1.0, langgraph-checkpoint-sqlite==3.0.3, langgraph-checkpoint-postgres==3.0.5 (modules: langgraph.checkpoint.{base,memory,sqlite,postgres}).
A checkpointer is a BaseCheckpointSaver subclass. It persists the per-thread history of Checkpoint/CheckpointTuple objects so the graph can pause (interrupt), resume (Command(resume=...)), replay (get_state_history), time-travel, and keep short-term memory across invocations.
Pick the right backend:
| Backend | Import | Best for | Persists? | History? | Async | TTL | Pipeline |
|---|---|---|---|---|---|---|---|
| InMemorySaver | langgraph.checkpoint.memory | Unit tests, demos, single-process dev | No | Yes | Yes (same class) | No | n/a |
| SqliteSaver | langgraph.checkpoint.sqlite | Small single-process apps, CLI tools, on-disk scratch | Yes (file) | Yes | No | No | n/a |
| AsyncSqliteSaver | langgraph.checkpoint.sqlite.aio | Async single-process apps (uses aiosqlite) | Yes (file) | Yes | Yes | No | n/a |
| PostgresSaver | langgraph.checkpoint.postgres | Sync production deployments | Yes | Yes | No | No | Yes |
| AsyncPostgresSaver | langgraph.checkpoint.postgres.aio | Async production deployments | Yes | Yes | Yes | No | Yes |
| ShallowPostgresSaver / AsyncShallowPostgresSaver | langgraph.checkpoint.postgres.shallow | Latest-only row, no time travel | Yes | No | (both) | No | Yes |

`ShallowPostgresSaver` is deprecated since 2.0.20 and will be removed in a future release (its own `DeprecationWarning` still names 3.0.0, but as of 3.0.5 the class is retained for compatibility). Use `PostgresSaver` with `durability="exit"` on `invoke`/`stream` instead.

The SQLite and Postgres packages install separately:

    pip install langgraph-checkpoint-sqlite
    pip install langgraph-checkpoint-postgres

Minimal runnable example

    import operator
    import sqlite3
    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langgraph.checkpoint.sqlite import SqliteSaver

    class S(TypedDict):
        count: Annotated[int, operator.add]  # reducer: add new values to existing

    def bump(state: S) -> dict:
        return {"count": 1}  # adds 1 to the persisted value via the reducer

    builder = StateGraph(S).add_node("bump", bump)
    builder.add_edge(START, "bump").add_edge("bump", END)

    conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
    checkpointer = SqliteSaver(conn)
    graph = builder.compile(checkpointer=checkpointer)

    cfg = {"configurable": {"thread_id": "t-1"}}
    print(graph.invoke({"count": 0}, cfg))  # {'count': 1}
    print(graph.invoke({"count": 0}, cfg))  # {'count': 2}, accumulated from the checkpoint
    print(graph.invoke({"count": 0}, cfg))  # {'count': 3}

Without the `operator.add` reducer, `"count"` uses default `LastValue` semantics: each call's input would overwrite the saved value with `0`, so the result would stay at `1` on every call. A reducer (or a `MessagesState`-style append-only channel) is what makes state grow across runs; the checkpointer only persists it.
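
The difference between a reducer channel and a last-value channel can be sketched without LangGraph at all. The snippet below is a toy model only: `apply_update` and `simulate_runs` are hypothetical helpers written for this illustration, not LangGraph internals. It replays the three `invoke` calls from the example (input `{"count": 0}`, then the node output `{"count": 1}`) against a saved state.

```python
import operator
from typing import Any, Callable, Dict, List


def apply_update(
    saved: Dict[str, Any],
    update: Dict[str, Any],
    reducers: Dict[str, Callable[[Any, Any], Any]],
) -> Dict[str, Any]:
    """Merge one update into the saved state, channel by channel (toy model)."""
    merged = dict(saved)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # reducer: combine old and new
        else:
            merged[key] = value  # last-value semantics: overwrite
    return merged


def simulate_runs(reducers: Dict[str, Callable[[Any, Any], Any]], runs: int) -> List[int]:
    """Replay `runs` invocations against one persisted state and record the count."""
    saved: Dict[str, Any] = {}
    results = []
    for _ in range(runs):
        saved = apply_update(saved, {"count": 0}, reducers)  # the invoke() input
        saved = apply_update(saved, {"count": 1}, reducers)  # the node's output
        results.append(saved["count"])
    return results


with_reducer = simulate_runs({"count": operator.add}, 3)
without_reducer = simulate_runs({}, 3)
print(with_reducer)     # [1, 2, 3]
print(without_reducer)  # [1, 1, 1]
```

In both cases the state is persisted between runs; only the merge rule decides whether it grows.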
InMemorySaver

    from langgraph.checkpoint.memory import InMemorySaver

    saver = InMemorySaver()
    # Or, with a custom serializer:
    # saver = InMemorySaver(serde=my_serde)

Full constructor: `InMemorySaver(*, serde=None, factory=defaultdict)`. `factory` swaps the underlying mapping type (e.g., a `PersistentDict` for on-disk simulation in tests); most callers leave it at the default.

Stores checkpoints in a nested `defaultdict`. Lost at process exit. Implements both sync and async methods (`aget`, `aput`, etc.), so it is fine to use under asyncio.

No `from_conn_string`, no `setup()`. It is a context manager if you want explicit lifetime (`with InMemorySaver() as saver: ...`).

Import note: `InMemorySaver` must be imported from `langgraph.checkpoint.memory`. The name `MemorySaver` exists in the same module as a backward-compatible alias, but `InMemorySaver` is the primary name. Do not import from the top-level `langgraph.checkpoint` package.

Checkpointer type alias — subgraph usage
When composing graphs, the `Checkpointer` type alias controls whether a subgraph participates in checkpointing:

    from langgraph.types import Checkpointer
    # Checkpointer = None | bool | BaseCheckpointSaver

| Value | Effect on the subgraph |
|---|---|
| `None` (default) | Inherit the parent graph's checkpointer. |
| `True` | Enable persistent checkpointing for this subgraph, using the parent's backend. |
| `False` | Disable checkpointing for this subgraph even when the parent has one. |
| A `BaseCheckpointSaver` instance | Use this specific saver for the subgraph. |

    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.graph import StateGraph, START, END

    # SubState, ParentState, and my_step are placeholders defined elsewhere.

    # Subgraph that opts out of checkpointing
    sub_builder = StateGraph(SubState)
    sub_builder.add_node("step", my_step)
    sub_builder.add_edge(START, "step").add_edge("step", END)
    subgraph = sub_builder.compile(checkpointer=False)  # no checkpointing

    # Parent graph uses a checkpointer; subgraph does not
    parent_builder = StateGraph(ParentState)
    parent_builder.add_node("sub", subgraph)
    parent_builder.add_edge(START, "sub").add_edge("sub", END)  # the parent needs an entry point to compile
    graph = parent_builder.compile(checkpointer=InMemorySaver())

The `ensure_valid_checkpointer()` utility validates a value before it reaches the graph compiler:

    from langgraph.types import ensure_valid_checkpointer

    checkpointer = ensure_valid_checkpointer(my_checkpointer)  # raises if invalid type

SqliteSaver / AsyncSqliteSaver

    import sqlite3
    from langgraph.checkpoint.sqlite import SqliteSaver

    # Direct: own the connection yourself.
    conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
    saver = SqliteSaver(conn)  # __init__(conn, *, serde=None)

    # Managed: connection opened and closed for you.
    with SqliteSaver.from_conn_string("checkpoints.db") as saver:
        graph = builder.compile(checkpointer=saver)
        graph.invoke(...)

- `from_conn_string(conn_string)` is a context manager (it uses `@contextmanager`). You must use `with`; assigning the result to a variable and using it as a saver will not work.
- `setup()` is called automatically on first use; you don't need to invoke it.
- The connection is opened with `check_same_thread=False`; internal locking makes it safe across threads.
- Use `":memory:"` as the conn string for an ephemeral in-process DB.

Async variant:

    from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

    async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as saver:
        graph = builder.compile(checkpointer=saver)
        await graph.ainvoke(..., cfg)

Backed by `aiosqlite`. The same `from_conn_string`-as-context-manager rule applies (an async context manager in this case).

PostgresSaver / AsyncPostgresSaver

    from langgraph.checkpoint.postgres import PostgresSaver

    DB_URI = "postgres://user:pass@localhost:5432/db?sslmode=disable"

    with PostgresSaver.from_conn_string(DB_URI) as saver:
        saver.setup()  # REQUIRED on first use: runs schema migrations
        graph = builder.compile(checkpointer=saver)
        graph.invoke(inputs, {"configurable": {"thread_id": "t-1"}})

- You must call `setup()` explicitly the first time, unlike `SqliteSaver`. It runs the embedded `MIGRATIONS` and creates the tables `checkpoints`, `checkpoint_blobs`, `checkpoint_writes`, and `checkpoint_migrations`.
- `from_conn_string(conn_string, *, pipeline=False)` opens a single `psycopg.Connection` with `autocommit=True`, `prepare_threshold=0`, `row_factory=dict_row`. `pipeline=True` wraps it in a `psycopg` pipeline for fewer round-trips (single-connection only).
- Direct constructor: `PostgresSaver(conn, pipe=None, serde=None)`. `conn` may be a `psycopg.Connection` or a `psycopg_pool.ConnectionPool` (in which case `pipe` must be `None`).

Async:

    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

    async with AsyncPostgresSaver.from_conn_string(DB_URI, pipeline=False) as saver:
        await saver.setup()
        graph = builder.compile(checkpointer=saver)
        await graph.ainvoke(..., cfg)

Uses `psycopg.AsyncConnection` and optionally `AsyncConnectionPool`.

Connection pooling
For long-lived pools (web servers), construct directly with a pool:

    from psycopg_pool import ConnectionPool
    from psycopg.rows import dict_row
    from langgraph.checkpoint.postgres import PostgresSaver

    pool = ConnectionPool(
        DB_URI,
        max_size=20,
        kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
    )
    saver = PostgresSaver(pool)
    saver.setup()

The `autocommit=True`, `prepare_threshold=0`, `row_factory=dict_row` combination is required; the saver depends on all three.

What gets stored per thread
A `Checkpoint` (TypedDict, `langgraph.checkpoint.base`):

    {
        "v": 1,                       # format version
        "id": "01HY...",              # monotonically increasing ULID-ish
        "ts": "2026-04-22T12:34:56Z",
        "channel_values": {...},      # current value of every channel
        "channel_versions": {...},    # per-channel monotonic version
        "versions_seen": {...},       # per-node last seen versions
        "updated_channels": [...],    # channels changed in this step
    }

`CheckpointMetadata` tags each checkpoint with:

- `source`: `"input" | "loop" | "update" | "fork"`, i.e. how the checkpoint was created.
- `step`: `-1` for the initial input, `0` for the first loop step, then `1, 2, ...`.
- `writes`: mapping of node name → output written in this step.
- `parents`: mapping of checkpoint namespace → parent checkpoint id (subgraphs).

StateSnapshot — fields
`graph.get_state(config)` returns a `StateSnapshot` namedtuple. All fields:

    from langgraph.types import StateSnapshot  # defined in langgraph.types, not langgraph.checkpoint.base

    snapshot: StateSnapshot = graph.get_state(config)

    snapshot.values         # dict[str, Any]: current channel values
    snapshot.next           # tuple[str, ...]: names of nodes queued to run next
    snapshot.config         # RunnableConfig: config that identifies this checkpoint
    snapshot.metadata       # CheckpointMetadata | None: source, step, writes, parents
    snapshot.created_at     # str | None: ISO-8601 timestamp of this checkpoint
    snapshot.parent_config  # RunnableConfig | None: config of the preceding checkpoint
    snapshot.tasks          # tuple[PregelTask, ...]: pending task descriptors
    snapshot.interrupts     # tuple[Interrupt, ...]: interrupts raised in the current step (new in 1.2+)

interrupts field (new in 1.2+)
`snapshot.interrupts` exposes the `Interrupt` objects raised during the most recent step, which is useful for inspecting why a graph paused without re-running it:

    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.types import interrupt

    def approval_node(state):
        answer = interrupt("Approve this action?")  # pauses execution
        return {"approved": answer}

    # After the graph pauses, inspect without re-running:
    snapshot = graph.get_state(cfg)
    for intr in snapshot.interrupts:
        print(intr.value)  # "Approve this action?"

Iterating state history
`get_state_history()` returns an iterator of `StateSnapshot` objects, newest first:

    for snapshot in graph.get_state_history(config, limit=10):
        print(snapshot.config["configurable"]["checkpoint_id"])
        print(snapshot.created_at)
        print(snapshot.values)
        print(snapshot.metadata)

Patching state manually
`update_state()` writes a new checkpoint as if the named node had produced the given output:

    graph.update_state(
        config,
        {"counter": 42},    # channel values to patch
        as_node="my_node",  # treat this update as if emitted by "my_node"
    )
    # Returns a RunnableConfig pointing at the newly created checkpoint.

Durability setting
`Durability` controls when each checkpoint is flushed to the backend. Import the type or pass the literal string directly:

    from langgraph.types import Durability
    # Durability = Literal["sync", "async", "exit"]

| Value | Behavior |
|---|---|
| "sync" | Checkpoint is persisted synchronously before the next step begins. Safest, slowest. |
| "async" (default) | Checkpoint is persisted in the background while the next step executes. Best throughput. |
| "exit" | Checkpoint is persisted only when the graph exits. No mid-run time-travel. Minimal I/O. |

`durability` is a per-call option on `invoke` / `stream` / `ainvoke` / `astream`; it is not a `compile()` argument. Compile the graph with a checkpointer as usual:

    from langgraph.checkpoint.memory import InMemorySaver

    graph = builder.compile(checkpointer=InMemorySaver())

Then choose the policy for each run:

    graph.invoke(inputs, cfg, durability="async")  # background writes, better throughput
    graph.stream(inputs, cfg, durability="sync")   # wait for write before next step
    graph.invoke(inputs, cfg, durability="exit")   # write only when graph exits

The legacy `checkpoint_during=False` kwarg is still accepted and maps to `durability="exit"`, but emits a `DeprecationWarning`. Migrate to the explicit `durability=` spelling.

Use `durability="exit"` as a lightweight replacement for the deprecated `ShallowPostgresSaver`: each run writes a single checkpoint at exit instead of one per step, and you keep `PostgresSaver`'s full API.

Required config keys
Every call that touches a checkpointer needs at least:

    {"configurable": {"thread_id": "some-unique-id"}}

Optionally also:

- `checkpoint_ns`: subgraph namespace (set automatically by parents).
- `checkpoint_id`: fetch/resume from a specific checkpoint (time travel).

Calling `graph.invoke(input, {"configurable": {}})` on a graph with a checkpointer raises `ValueError: Checkpointer requires one or more of the following 'configurable' keys: thread_id, checkpoint_ns, checkpoint_id`.
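
One way to avoid that `ValueError` is to build every config through a single helper that rejects a missing thread id up front. The sketch below is plain Python; `make_config` is a hypothetical helper name, not part of LangGraph, and it only assembles the dict shape shown above.

```python
from typing import Any, Dict, Optional


def make_config(
    thread_id: str,
    checkpoint_id: Optional[str] = None,
    checkpoint_ns: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a checkpointer config, failing early when thread_id is empty."""
    if not thread_id:
        raise ValueError("thread_id must be a non-empty string")
    configurable: Dict[str, Any] = {"thread_id": thread_id}
    if checkpoint_ns is not None:
        configurable["checkpoint_ns"] = checkpoint_ns  # subgraph namespace
    if checkpoint_id is not None:
        configurable["checkpoint_id"] = checkpoint_id  # time travel target
    return {"configurable": configurable}


latest = make_config("t-1")
pinned = make_config("t-1", checkpoint_id="some-checkpoint-id")
print(latest)  # {'configurable': {'thread_id': 't-1'}}
print(pinned)
```

The returned dict can be passed wherever the examples in this page pass `cfg`.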
BaseCheckpointSaver methods you’ll actually use

    saver.get_tuple(config) -> CheckpointTuple | None
    saver.list(config, *, filter=None, before=None, limit=None) -> Iterator[CheckpointTuple]
    saver.put(config, checkpoint, metadata, new_versions) -> RunnableConfig
    saver.put_writes(config, writes, task_id, task_path="") -> None
    saver.delete_thread(thread_id) -> None
    # All have async twins: aget_tuple, alist, aput, aput_writes, adelete_thread

Typical app code uses the graph-level helpers instead:

    graph.get_state(cfg)                    # uses saver.get_tuple
    list(graph.get_state_history(cfg))      # uses saver.list
    graph.update_state(cfg, {"count": 42})  # uses saver.put + saver.put_writes
    await graph.adelete_thread(thread_id)   # routes to the checkpointer

Thread lifecycle management: delete_thread / adelete_thread
Every checkpoint, write, and blob for a thread is permanently removed when you call `delete_thread`. Use this for GDPR right-to-erasure flows, session cleanup, or bounded-memory test teardown.

    import operator
    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.graph import StateGraph, START, END

    class S(TypedDict):
        count: Annotated[int, operator.add]  # reducer, so count accumulates across runs

    builder = StateGraph(S).add_node("bump", lambda s: {"count": 1})
    builder.add_edge(START, "bump").add_edge("bump", END)

    saver = InMemorySaver()
    graph = builder.compile(checkpointer=saver)

    cfg = {"configurable": {"thread_id": "user-42"}}
    graph.invoke({"count": 0}, cfg)
    graph.invoke({"count": 0}, cfg)  # count == 2 after two calls

    # Full GDPR erasure for user-42
    saver.delete_thread("user-42")
    # or via the graph: graph.delete_thread("user-42")

    # The thread no longer exists, so the next invoke starts fresh
    graph.invoke({"count": 0}, cfg)  # count == 1 again

Async variant:

    await saver.adelete_thread("user-42")
    # or:
    await graph.adelete_thread("user-42")

`InMemorySaver.delete_thread` removes entries from all three internal dicts (`storage`, `writes`, `blobs`) in a single call. `PostgresSaver` / `AsyncPostgresSaver` issue `DELETE` statements targeting all three backing tables (`checkpoints`, `checkpoint_blobs`, `checkpoint_writes`) for the given thread ID.

Deleting a thread is permanent. There is no soft-delete or recycle bin; the data is gone immediately. Call `get_state_history` to archive thread content before deletion if you need an audit trail.
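
An archive-then-delete flow might look like the sketch below. It relies only on the calls already shown on this page (`graph.get_state_history(cfg)`, the `StateSnapshot` fields, and `saver.delete_thread(thread_id)`); the helper name `archive_then_delete`, the JSON layout, and the `default=str` fallback for values that are not JSON-serializable are assumptions made for illustration.

```python
import json
from typing import Any


def archive_then_delete(graph: Any, saver: Any, thread_id: str, path: str) -> int:
    """Dump a thread's history to a JSON file, then erase the thread.

    `graph` is expected to expose get_state_history(config) and `saver`
    delete_thread(thread_id). Returns the number of archived snapshots.
    """
    cfg = {"configurable": {"thread_id": thread_id}}
    records = [
        {
            "checkpoint_id": snap.config["configurable"].get("checkpoint_id"),
            "created_at": snap.created_at,
            "values": snap.values,
            "metadata": snap.metadata,
        }
        for snap in graph.get_state_history(cfg)  # newest first
    ]
    # Write the archive first: if this raises, the thread is left untouched.
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(records, fh, default=str)  # default=str is a lossy fallback
    saver.delete_thread(thread_id)
    return len(records)
```

Writing the file before calling `delete_thread` is the important ordering: a failed archive should never be followed by an irreversible delete.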
Serializers
Default: `JsonPlusSerializer` from `langgraph.checkpoint.serde.jsonplus`. It handles Pydantic models, dataclasses, `datetime`, `uuid`, `Decimal`, LangChain `BaseMessage`, and plain JSON.

For confidentiality at rest, wrap it with `EncryptedSerializer`:

    # pip install pycryptodome
    import os
    from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
    from langgraph.checkpoint.memory import InMemorySaver

    # AES key: must be exactly 16, 24, or 32 bytes (16 bytes selects AES-128)
    key = os.urandom(16)

    encrypted_serde = EncryptedSerializer.from_pycryptodome_aes(key=key)
    saver = InMemorySaver(serde=encrypted_serde)

    # All data stored in the checkpointer is AES-EAX encrypted.
    # PostgresSaver and SqliteSaver also accept serde=:
    # saver = PostgresSaver(conn, serde=encrypted_serde)

Alternatively, set `LANGGRAPH_AES_KEY` in your environment (a 16, 24, or 32 character string) and omit the `key=` argument. In the shell:

    export LANGGRAPH_AES_KEY="your-16-char-key"

Then in Python:

    # key is read from LANGGRAPH_AES_KEY automatically
    encrypted_serde = EncryptedSerializer.from_pycryptodome_aes()

All savers accept `serde=...` in their constructor. `InMemorySaver` accepts it too, via kwarg only.
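
Because a key of the wrong length only fails once the serializer is built, it can help to validate the environment variable at startup. The following is a minimal sketch using only the standard library; `load_aes_key` is a hypothetical helper, and encoding the string as UTF-8 before measuring it is an assumption of this sketch rather than documented library behavior.

```python
import os

VALID_AES_KEY_LENGTHS = (16, 24, 32)  # AES-128, AES-192, AES-256


def load_aes_key(env_var: str = "LANGGRAPH_AES_KEY") -> bytes:
    """Read an AES key from the environment and check its length in bytes."""
    raw = os.environ.get(env_var)
    if raw is None:
        raise RuntimeError(f"{env_var} is not set")
    key = raw.encode("utf-8")  # assumption: the key string is UTF-8 text
    if len(key) not in VALID_AES_KEY_LENGTHS:
        raise ValueError(
            f"{env_var} must be 16, 24, or 32 bytes long, got {len(key)}"
        )
    return key


os.environ["LANGGRAPH_AES_KEY"] = "your-16-char-key"  # demo value only
key = load_aes_key()
print(len(key))  # 16
```

The validated bytes can then be passed explicitly as `EncryptedSerializer.from_pycryptodome_aes(key=key)`. Note that the check counts bytes, so a key containing non-ASCII characters may be longer than its character count suggests.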
Patterns
1. Conversation memory (short-term)

    from langchain_core.messages import HumanMessage
    from langgraph.checkpoint.memory import InMemorySaver

    graph = builder.compile(checkpointer=InMemorySaver())

    cfg = {"configurable": {"thread_id": "alice"}}
    graph.invoke({"messages": [HumanMessage("Hi")]}, cfg)
    graph.invoke({"messages": [HumanMessage("What did I say?")]}, cfg)
    # Second call sees the full message history for thread 'alice'.

2. Time-travel / replay

    history = list(graph.get_state_history(cfg))
    # history[0] is latest; history[-1] is the initial input.
    earlier = history[3].config
    graph.invoke(None, earlier)  # replays from that checkpoint

Passing `None` as input means "continue from the saved state"; the initial state is already there.

3. Fork a branch

    # Edit state at a past checkpoint, creating a new branch.
    new_cfg = graph.update_state(earlier, {"plan": "take-a-different-route"})
    graph.invoke(None, new_cfg)

`update_state` returns a config pointing at the new checkpoint; passing it to `invoke` continues from there.

4. Time-travel with full snapshot inspection
Use `StateSnapshot` fields to pick a fork point programmatically:

    from langgraph.checkpoint.memory import InMemorySaver

    saver = InMemorySaver()
    graph = builder.compile(checkpointer=saver)

    cfg = {"configurable": {"thread_id": "replay-demo"}}

    # Run the graph several times to build up history
    for _ in range(5):
        graph.invoke({"count": 0}, cfg)

    # Retrieve full history (newest first)
    history = list(graph.get_state_history(cfg))

    # Inspect each snapshot
    for snap in history:
        print(snap.created_at, snap.values, snap.metadata["step"])

    # Pick a specific past snapshot by index (index 2 = 3rd-most-recent)
    old_snapshot = history[2]
    print("Rewinding to step:", old_snapshot.metadata["step"])
    print("State at that point:", old_snapshot.values)
    print("Interrupts at that point:", old_snapshot.interrupts)  # new in 1.2+

    # Fork: replay from that checkpoint with no new input
    forked_config = old_snapshot.config
    result = graph.invoke(None, forked_config)  # resumes from the saved state

    # Or patch state before replaying (creates a new branch)
    patched_config = graph.update_state(
        forked_config,
        {"count": 99},   # override a channel value
        as_node="bump",  # attribute the patch to the "bump" node
    )
    result = graph.invoke(None, patched_config)

5. Production Postgres with pooling

    from contextlib import asynccontextmanager
    from psycopg_pool import AsyncConnectionPool
    from psycopg.rows import dict_row
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

    # DB_URI is the connection string defined in the PostgresSaver section.
    pool = AsyncConnectionPool(
        DB_URI,
        max_size=32,
        kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
        open=False,  # opened explicitly in lifespan()
    )
    saver = AsyncPostgresSaver(pool)

    @asynccontextmanager
    async def lifespan(app):
        await pool.open()
        try:
            await saver.setup()
            yield
        finally:
            await pool.close()  # close the pool even if startup or the app fails

6. Per-user thread IDs
Namespace thread ids by user so a leak cannot cross accounts:

    cfg = {"configurable": {"thread_id": f"user:{user_id}:conv:{conv_id}"}}
    graph.invoke({"messages": msgs}, cfg)

For cross-thread (long-term) memory, pair with a Store; see the Store reference.
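
The namespacing scheme only isolates accounts if the ids cannot be forged, so it is worth building and parsing them in one place. The sketch below is plain Python; `make_thread_id` and `owner_of` are hypothetical helper names, and rejecting `:` inside either part is a design choice of this sketch (it prevents a user id such as `alice:conv:x` from producing an ambiguous thread id).

```python
def make_thread_id(user_id: str, conv_id: str) -> str:
    """Build a thread id of the form user:<user_id>:conv:<conv_id>."""
    for name, part in (("user_id", user_id), ("conv_id", conv_id)):
        if not part:
            raise ValueError(f"{name} must be non-empty")
        if ":" in part:
            raise ValueError(f"{name} must not contain ':'")
    return f"user:{user_id}:conv:{conv_id}"


def owner_of(thread_id: str) -> str:
    """Return the user id embedded in a thread id, or raise if malformed."""
    parts = thread_id.split(":")
    if len(parts) != 4 or parts[0] != "user" or parts[2] != "conv":
        raise ValueError(f"unexpected thread id format: {thread_id!r}")
    return parts[1]


thread_id = make_thread_id("alice", "1")
print(thread_id)            # user:alice:conv:1
print(owner_of(thread_id))  # alice
```

A request handler can then compare `owner_of(thread_id)` with the authenticated user before touching the checkpointer.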
7. Listing all threads for a user (audit / GDPR)
`saver.list(config=None)` iterates all checkpoints across all threads. Filter by metadata or walk the `storage` dict (for `InMemorySaver`) to enumerate threads for a given user prefix:

    from langchain_core.messages import HumanMessage
    from langgraph.checkpoint.memory import InMemorySaver

    saver = InMemorySaver()
    graph = builder.compile(checkpointer=saver)  # the graph must use this saver

    def list_user_threads(saver: InMemorySaver, user_prefix: str) -> list[str]:
        # storage is keyed by thread id
        return [tid for tid in saver.storage if tid.startswith(user_prefix)]

    def purge_user(saver: InMemorySaver, user_id: str) -> int:
        threads = list_user_threads(saver, f"user:{user_id}:")
        for tid in threads:
            saver.delete_thread(tid)
        return len(threads)

    # Example:
    cfg = {"configurable": {"thread_id": "user:alice:conv:1"}}
    graph.invoke({"messages": [HumanMessage("hi")]}, cfg)

    purge_user(saver, "alice")  # removes all threads for alice

For `PostgresSaver`, query `SELECT DISTINCT thread_id FROM checkpoints WHERE thread_id LIKE 'user:alice:%'` and then call `saver.delete_thread(tid)` for each row.
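
When the user id comes from request data, the `LIKE` pattern should be bound as a parameter and its wildcards escaped, since `_` and `%` inside an id would otherwise match other users. The sketch below demonstrates the query shape against an in-memory SQLite table that merely stands in for the Postgres `checkpoints` table; the two-column schema, `escape_like`, and `thread_ids_for_user` are assumptions for illustration. With psycopg the placeholder would be `%s` rather than `?`.

```python
import sqlite3
from typing import List


def escape_like(text: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so the text matches literally."""
    return (
        text.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def thread_ids_for_user(conn: sqlite3.Connection, user_id: str) -> List[str]:
    """Return the distinct thread ids that start with user:<user_id>:."""
    pattern = escape_like(f"user:{user_id}:") + "%"
    rows = conn.execute(
        "SELECT DISTINCT thread_id FROM checkpoints "
        "WHERE thread_id LIKE ? ESCAPE '\\' ORDER BY thread_id",
        (pattern,),
    ).fetchall()
    return [row[0] for row in rows]


# Stand-in table with only the columns this sketch needs.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, checkpoint_id TEXT)")
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?)",
    [
        ("user:alice:conv:1", "c1"),
        ("user:alice:conv:1", "c2"),
        ("user:alice:conv:2", "c3"),
        ("user:bob:conv:1", "c4"),
    ],
)

print(thread_ids_for_user(conn, "alice"))  # ['user:alice:conv:1', 'user:alice:conv:2']
print(thread_ids_for_user(conn, "a_ice"))  # []
```

Without the escaping step, the id `a_ice` would match `alice`, because `_` matches any single character in a `LIKE` pattern.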
8. filter= on list() — metadata-scoped history
Use `filter` to restrict `list()` to checkpoints from a specific source or step:

    # Only checkpoints written by the execution loop (source == "loop")
    checkpoints = list(
        saver.list(cfg, filter={"source": "loop"}, limit=5)
    )

    # Checkpoints at a specific step:
    checkpoints = list(
        saver.list(cfg, filter={"step": 2})
    )

`filter` is a dict of `CheckpointMetadata` key/value pairs. All pairs must match (AND semantics).
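
The AND semantics can be stated precisely as a small predicate. The function below is a plain-Python model of the matching rule described above, not the savers' implementation; `matches` and the sample metadata dicts are made up for the illustration.

```python
from typing import Any, Dict, List


def matches(metadata: Dict[str, Any], wanted: Dict[str, Any]) -> bool:
    """True when every key/value pair in `wanted` is present in `metadata`."""
    return all(
        key in metadata and metadata[key] == value
        for key, value in wanted.items()
    )


sample_metadata: List[Dict[str, Any]] = [
    {"source": "input", "step": -1},
    {"source": "loop", "step": 0},
    {"source": "loop", "step": 1},
    {"source": "update", "step": 2},
]

loop_only = [m for m in sample_metadata if matches(m, {"source": "loop"})]
loop_step_1 = [m for m in sample_metadata if matches(m, {"source": "loop", "step": 1})]
loop_step_2 = [m for m in sample_metadata if matches(m, {"source": "loop", "step": 2})]

print(len(loop_only))    # 2
print(loop_step_1)       # [{'source': 'loop', 'step': 1}]
print(loop_step_2)       # []
```

The last query returns nothing because step 2 exists only with `source == "update"`; both pairs have to match the same checkpoint.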
Gotchas
- `from_conn_string` is a context manager, not a factory. `saver = SqliteSaver.from_conn_string("x.db")` yields a context manager object, not a saver. Always use `with`.
- Postgres needs `setup()` once. Don't skip it on first deploy; the migration table is bootstrapped from this call.
- `ShallowPostgresSaver` only keeps the latest checkpoint. No `get_state_history`, no forking, no time travel. Deprecated; prefer `PostgresSaver` with `durability="exit"`.
- `thread_id` is required. A checkpointed graph called with an empty configurable dict raises `ValueError`.
- `InMemorySaver` is not persistent. Restarting the process loses all threads. Not suitable for Platform-hosted agents (the managed checkpointer is injected automatically there; don't pass one at all).
- Don't share a raw `psycopg.Connection` across threads without the saver. The saver holds a `threading.Lock`; bypassing it breaks the `autocommit` contract.
- Deleting a thread is `delete_thread(thread_id)`. On older versions this is a checkpointer method only, not a graph method. In v1.x, `graph.delete_thread` / `graph.adelete_thread` forward to the checkpointer.
- `InMemorySaver` import path. Always import from `langgraph.checkpoint.memory`. `MemorySaver` is an alias in the same module, but `InMemorySaver` is the canonical name.
- `checkpointer=False` on subgraphs. Passing `checkpointer=False` when compiling a subgraph disables checkpointing for that subgraph even when the parent has one. This is intentional; use `None` (the default) to inherit the parent's backend.

Breaking changes
| Version | Change |
|---|---|
| langgraph 1.2.1 | `StateSnapshot.interrupts` field added; `Checkpointer` type alias (`None \| bool \| BaseCheckpointSaver`) formalised in `langgraph.types`; `ensure_valid_checkpointer()` utility added. |
| checkpoint 4.0 | `Checkpoint.v == 1` is the supported format; checkpoints with `v < 4` from the old pending-sends schema are auto-migrated on read. |
| checkpoint 3.x | `checkpoint_during` kwarg deprecated; migrate to `durability="sync" \| "async" \| "exit"`. |
| postgres 3.0 / shallow 2.0.20 | `ShallowPostgresSaver` / `AsyncShallowPostgresSaver` deprecated; prefer `PostgresSaver` with `durability="exit"`. |
| langgraph 0.4 | `Interrupt.id` introduced; `interrupt_id` deprecated. |
| langgraph 0.6 | `Interrupt.ns`, `when`, `resumable`, `interrupt_id` removed. |