PostgresSaver is a checkpoint saver that stores LangGraph checkpoints in a PostgreSQL database. It is intended for production use, with support for connection pooling, pipelined writes, and JSONB-backed metadata filtering.

Overview

PostgresSaver is designed for:
  • Production workloads
  • High concurrency applications
  • Multi-threaded environments
  • Distributed systems
  • Applications requiring advanced querying capabilities

Class Definition

from langgraph.checkpoint.postgres import PostgresSaver

class PostgresSaver(BasePostgresSaver):
    """Checkpointer that stores checkpoints in a Postgres database."""
Source: langgraph.checkpoint.postgres.__init__:32

Installation

Install the PostgreSQL checkpoint package:
pip install langgraph-checkpoint-postgres
This package requires psycopg (version 3+) and psycopg-pool for connection pooling.

Constructor

def __init__(
    self,
    conn: Connection | ConnectionPool,
    pipe: Pipeline | None = None,
    serde: SerializerProtocol | None = None,
) -> None

Parameters

  • conn (Connection | ConnectionPool): PostgreSQL connection or connection pool
  • pipe (Pipeline | None): Optional psycopg Pipeline for batching operations
  • serde (SerializerProtocol | None): Serializer for encoding/decoding checkpoints. Defaults to JsonPlusSerializer
Pipeline should only be used with a single Connection, not ConnectionPool.
Source: langgraph.checkpoint.postgres.__init__:37

Usage

Basic Setup

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph

DB_URI = "postgres://user:password@localhost:5432/mydatabase"

# Create a graph
builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")

# Use PostgresSaver
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    # Setup database (must be called once)
    checkpointer.setup()
    
    # Compile graph with checkpointer
    graph = builder.compile(checkpointer=checkpointer)
    
    # Use the graph
    config = {"configurable": {"thread_id": "user-123"}}
    result = graph.invoke(3, config)
    print(result)  # Output: 4

Using with Pipeline

Pipeline mode enables batching of database operations for better performance:
with PostgresSaver.from_conn_string(DB_URI, pipeline=True) as checkpointer:
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "user-123"}}
    result = graph.invoke(3, config)
Source: langgraph.checkpoint.postgres.__init__:54

Using Connection Pool

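from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

# For connections you create yourself, the package README asks for
# autocommit=True (so setup() can commit its tables) and row_factory=dict_row.
connection_kwargs = {"autocommit": True, "row_factory": dict_row}

with ConnectionPool(DB_URI, kwargs=connection_kwargs) as pool:
    checkpointer = PostgresSaver(pool)
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)
    # Use graph...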

Class Methods

from_conn_string

@classmethod
@contextmanager
def from_conn_string(
    cls, 
    conn_string: str, 
    *, 
    pipeline: bool = False
) -> Iterator[PostgresSaver]
Create a new PostgresSaver instance from a connection string.
Parameters:
  • conn_string (str): PostgreSQL connection string (e.g., postgres://user:pass@host:port/db)
  • pipeline (bool): Whether to use Pipeline for batching operations (default: False)
Returns:
  • Iterator[PostgresSaver]: A context manager yielding a PostgresSaver instance
Example:
DB_URI = "postgres://postgres:postgres@localhost:5432/checkpoints?sslmode=disable"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    # Use checkpointer...
Source: langgraph.checkpoint.postgres.__init__:54
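Credentials that contain reserved characters such as "@", ":" or "/" have to be percent-encoded before they go into a connection URI. The sketch below assembles such a URI with the standard library only. `build_db_uri` and its parameters are hypothetical names used for illustration; they are not part of langgraph-checkpoint-postgres.

```python
from urllib.parse import quote, urlencode


def build_db_uri(user, password, host, port, dbname, **options):
    """Assemble a PostgreSQL connection URI (hypothetical helper).

    User, password, and database name are percent-encoded so that reserved
    characters such as "@" or "/" do not corrupt the URI.
    """
    uri = (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )
    if options:
        # Extra connection parameters (for example sslmode) go in the query string.
        uri += "?" + urlencode(options)
    return uri


DB_URI = build_db_uri(
    "postgres", "p@ss/word", "localhost", 5432, "checkpoints", sslmode="disable"
)
print(DB_URI)
```

The resulting string can be passed to `PostgresSaver.from_conn_string` in the same way as the literal URI in the example above.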

Instance Methods

setup

def setup(self) -> None
Set up the checkpoint database. Creates the necessary tables and runs any pending migrations.
Important: Call this method yourself the first time the checkpointer is used; it is not invoked automatically.
Example:
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Required on first use
    graph = builder.compile(checkpointer=checkpointer)
Source: langgraph.checkpoint.postgres.__init__:77

get_tuple

def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None
Get a checkpoint tuple from the database.
Parameters:
  • config (RunnableConfig): Configuration containing thread_id and optionally checkpoint_id
Returns:
  • CheckpointTuple | None: The checkpoint tuple, or None if not found
Example:
# Get latest checkpoint
config = {"configurable": {"thread_id": "user-123"}}
checkpoint_tuple = checkpointer.get_tuple(config)

# Get specific checkpoint by ID
config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_ns": "",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    }
}
checkpoint_tuple = checkpointer.get_tuple(config)
Source: langgraph.checkpoint.postgres.__init__:184
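The two config shapes differ only in whether `checkpoint_id` is present. A small helper can make that explicit; `checkpoint_config` is a hypothetical name for this sketch and is not provided by the library.

```python
def checkpoint_config(thread_id, checkpoint_id=None, checkpoint_ns=""):
    """Build a config dict for get_tuple (hypothetical helper).

    Without checkpoint_id the config addresses the latest checkpoint of the
    thread; with it, the config addresses one specific checkpoint.
    """
    configurable = {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}
    if checkpoint_id is not None:
        configurable["checkpoint_id"] = checkpoint_id
    return {"configurable": configurable}


latest = checkpoint_config("user-123")
specific = checkpoint_config("user-123", "1ef4f797-8335-6428-8001-8a1503f9b875")
```

Either dict can then be passed to `checkpointer.get_tuple(...)`.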

list

def list(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[CheckpointTuple]
List checkpoints from the database.
Parameters:
  • config (RunnableConfig | None): Base configuration for filtering
  • filter (dict[str, Any] | None): Additional metadata filtering criteria
  • before (RunnableConfig | None): Only return checkpoints before this checkpoint ID
  • limit (int | None): Maximum number of checkpoints to return
Returns:
  • Iterator[CheckpointTuple]: Iterator of checkpoint tuples, ordered by checkpoint ID (newest first)
Example:
# List all checkpoints for a thread
config = {"configurable": {"thread_id": "user-123"}}
checkpoints = list(checkpointer.list(config, limit=10))

# List with metadata filter
filter_criteria = {"source": "input"}
checkpoints = list(checkpointer.list(config, filter=filter_criteria))

# List checkpoints before a specific checkpoint
before_config = {
    "configurable": {
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"
    }
}
checkpoints = list(checkpointer.list(config, before=before_config, limit=5))
Source: langgraph.checkpoint.postgres.__init__:104
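Because results are ordered newest first, `before` and `limit` can be combined to page through a long history. The following is a minimal sketch of that pattern. `iter_pages` is a hypothetical helper, and `FakeSaver` is an in-memory stand-in written for this example so the code runs without a database; with a real PostgresSaver only `iter_pages` would be needed.

```python
from types import SimpleNamespace


def iter_pages(checkpointer, config, page_size=2):
    """Yield lists of checkpoint tuples, newest first, page_size at a time."""
    before = None
    while True:
        page = list(checkpointer.list(config, before=before, limit=page_size))
        if not page:
            return
        yield page
        # The oldest tuple of this page becomes the cursor for the next page.
        before = page[-1].config


class FakeSaver:
    """In-memory stand-in that mimics the ordering and `before` semantics."""

    def __init__(self, ids):
        self.ids = sorted(ids, reverse=True)  # newest first

    def list(self, config, *, filter=None, before=None, limit=None):
        ids = self.ids
        if before is not None:
            cutoff = before["configurable"]["checkpoint_id"]
            ids = [i for i in ids if i < cutoff]
        if limit is not None:
            ids = ids[:limit]
        thread_id = config["configurable"]["thread_id"]
        for i in ids:
            yield SimpleNamespace(
                config={"configurable": {"thread_id": thread_id, "checkpoint_id": i}}
            )


saver = FakeSaver(["c1", "c2", "c3", "c4", "c5"])
config = {"configurable": {"thread_id": "user-123"}}
pages = [
    [t.config["configurable"]["checkpoint_id"] for t in page]
    for page in iter_pages(saver, config)
]
print(pages)  # [['c5', 'c4'], ['c3', 'c2'], ['c1']]
```

Passing the `config` of the last tuple on a page as `before` avoids building the cursor dict by hand.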

put

def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig
Save a checkpoint to the database.
Parameters:
  • config (RunnableConfig): Configuration for the checkpoint
  • checkpoint (Checkpoint): The checkpoint to save
  • metadata (CheckpointMetadata): Additional metadata
  • new_versions (ChannelVersions): New channel versions
Returns:
  • RunnableConfig: Updated configuration with the new checkpoint ID
Example:
config = {"configurable": {"thread_id": "user-123", "checkpoint_ns": ""}}
checkpoint = {
    "v": 1,
    "ts": "2024-05-04T06:32:42.235444+00:00",
    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    "channel_values": {"messages": [], "state": "active"},
    "channel_versions": {},
    "versions_seen": {},
}
metadata = {"source": "input", "step": 1, "run_id": "abc123"}
saved_config = checkpointer.put(config, checkpoint, metadata, {})
Source: langgraph.checkpoint.postgres.__init__:255

put_writes

def put_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None
Store intermediate writes linked to a checkpoint.
Parameters:
  • config (RunnableConfig): Configuration of the related checkpoint
  • writes (Sequence[tuple[str, Any]]): List of (channel, value) pairs to store
  • task_id (str): Identifier for the task creating the writes
  • task_path (str): Path of the task (default: "")
Source: langgraph.checkpoint.postgres.__init__:336
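Since the writes are linked to a checkpoint, the config passed to put_writes needs to identify one, for instance the config returned by put. The sketch below shows the call shape under that assumption. `record_writes` is a hypothetical wrapper, and `RecordingSaver` is a stand-in that records calls so the example runs without PostgreSQL.

```python
def record_writes(checkpointer, saved_config, writes, task_id):
    """Attach intermediate writes to a saved checkpoint (hypothetical wrapper)."""
    configurable = saved_config["configurable"]
    # The writes are linked to one checkpoint, so the config has to name it.
    if "checkpoint_id" not in configurable:
        raise ValueError("config must contain a checkpoint_id")
    checkpointer.put_writes(saved_config, list(writes), task_id)


class RecordingSaver:
    """Stand-in that records calls instead of writing to PostgreSQL."""

    def __init__(self):
        self.calls = []

    def put_writes(self, config, writes, task_id, task_path=""):
        self.calls.append((config["configurable"]["checkpoint_id"], writes, task_id))


saver = RecordingSaver()
saved_config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_ns": "",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    }
}
record_writes(saver, saved_config, [("messages", "hello"), ("state", "active")], "task-1")
```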

delete_thread

def delete_thread(self, thread_id: str) -> None
Delete all checkpoints and writes associated with a thread ID.
Parameters:
  • thread_id (str): The thread ID to delete
Example:
checkpointer.delete_thread("user-123")
Source: langgraph.checkpoint.postgres.__init__:370

Database Schema

PostgresSaver creates three tables:

checkpoints table

CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

checkpoint_blobs table

CREATE TABLE checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);

checkpoint_writes table

CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    blob BYTEA NOT NULL,
    task_path TEXT NOT NULL DEFAULT '',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
Indices are automatically created on thread_id columns for performance.

AsyncPostgresSaver

For async applications, use AsyncPostgresSaver:
import asyncio
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph

DB_URI = "postgres://user:password@localhost:5432/mydatabase"

async def main():
    builder = StateGraph(int)
    builder.add_node("add_one", lambda x: x + 1)
    builder.set_entry_point("add_one")
    builder.set_finish_point("add_one")
    
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()
        graph = builder.compile(checkpointer=checkpointer)
        
        config = {"configurable": {"thread_id": "user-123"}}
        result = await graph.ainvoke(3, config)
        print(result)  # Output: 4

asyncio.run(main())
Source: langgraph.checkpoint.postgres.aio:32

Advanced Features

Connection Pooling

Use ConnectionPool for better resource management:
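from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    DB_URI,
    min_size=1,    # connections kept open while idle
    max_size=10,   # upper bound on open connections
    timeout=30,    # seconds to wait for a free connection
    kwargs={"autocommit": True, "row_factory": dict_row},
)

checkpointer = PostgresSaver(pool)
checkpointer.setup()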

Pipeline Mode

Pipeline mode batches database operations for improved performance:
with PostgresSaver.from_conn_string(DB_URI, pipeline=True) as checkpointer:
    checkpointer.setup()
    # Operations are automatically batched

JSONB Storage

PostgresSaver stores checkpoints and their metadata as JSONB, enabling:
  • Efficient querying of checkpoint data
  • Native JSON operators in SQL queries
  • Indexing on specific JSON fields
  • A pre-parsed binary representation, so values are not reparsed on every query
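
As an illustration of native JSON operators, the sketch below builds a parameterized query that uses the JSONB containment operator `@>` against the metadata column of the checkpoints table. `metadata_query` is a hypothetical helper, and running the query directly against the tables is an assumption of this example rather than a documented PostgresSaver API.

```python
import json


def metadata_query(thread_id, metadata_filter, limit=10):
    """Return (sql, params) selecting checkpoints whose metadata contains the filter."""
    sql = (
        "SELECT checkpoint_id, metadata "
        "FROM checkpoints "
        "WHERE thread_id = %s AND metadata @> %s::jsonb "
        "ORDER BY checkpoint_id DESC "
        "LIMIT %s"
    )
    # The filter is sent as JSON text and cast to jsonb on the server.
    return sql, (thread_id, json.dumps(metadata_filter), limit)


sql, params = metadata_query("user-123", {"source": "input"})
# With an open psycopg connection `conn` (assumed): rows = conn.execute(sql, params).fetchall()
```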

Blob Storage

Large channel values are stored separately in the checkpoint_blobs table, which provides:
  • Optimized storage of binary data
  • Reduced checkpoint table size
  • Better query performance

Performance Considerations

  • Use connection pooling for multi-threaded applications
  • Enable pipeline mode to batch operations when working with a single connection
  • Set pool sizes according to your concurrency requirements
  • Add indices on frequently queried metadata fields
  • Run VACUUM regularly to maintain performance
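
For metadata lookups, one option is a GIN index on the metadata column, which PostgreSQL can use for JSONB containment (`@>`) queries. The sketch below only generates the DDL. `metadata_gin_index_ddl` and the default index name are hypothetical, and whether such an index pays off depends on the workload.

```python
import re


def metadata_gin_index_ddl(index_name="checkpoints_metadata_gin_idx"):
    """Return DDL for a GIN index on checkpoints.metadata (hypothetical helper)."""
    # Identifiers cannot be bound as query parameters, so validate before interpolating.
    if not re.fullmatch(r"[a-z_][a-z0-9_]*", index_name):
        raise ValueError(f"unsafe index name: {index_name!r}")
    return (
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS {index_name} "
        "ON checkpoints USING GIN (metadata jsonb_path_ops)"
    )


ddl = metadata_gin_index_ddl()
# CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so execute
# the statement on a connection opened with autocommit=True.
```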

ShallowPostgresSaver

For specialized use cases requiring minimal checkpoint storage:
from langgraph.checkpoint.postgres.shallow import ShallowPostgresSaver

with ShallowPostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()
    # Uses optimized storage strategy
ShallowPostgresSaver keeps only the most recent checkpoint for each thread and does not retain checkpoint history, which reduces storage requirements.

Migrations

The checkpointer automatically runs database migrations on setup(). The migration system:
  • Tracks applied migrations in the checkpoint_migrations table
  • Applies only migrations that have not yet been recorded
  • Uses CREATE INDEX CONCURRENTLY for index creation to avoid blocking writes
  • Orders migrations by version number
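
The incremental, version-ordered behavior described above can be sketched in a few lines. This is an illustration of the general technique, not the library's code: it uses sqlite3 from the standard library as a stand-in for PostgreSQL, and the `MIGRATIONS` list and `run_migrations` function are hypothetical.

```python
import sqlite3

# Hypothetical migration list; the position in the list is the version number.
MIGRATIONS = [
    "CREATE TABLE IF NOT EXISTS checkpoint_migrations (v INTEGER PRIMARY KEY)",
    "CREATE TABLE IF NOT EXISTS checkpoint_writes (thread_id TEXT NOT NULL, task_id TEXT NOT NULL)",
    "ALTER TABLE checkpoint_writes ADD COLUMN task_path TEXT NOT NULL DEFAULT ''",
]


def run_migrations(conn, migrations):
    """Apply migrations newer than the highest recorded version; return the versions applied."""
    # Migration 0 creates the tracking table and is written to be safe to repeat.
    conn.execute(migrations[0])
    row = conn.execute("SELECT MAX(v) FROM checkpoint_migrations").fetchone()
    current = row[0] if row[0] is not None else -1
    applied = []
    for version in range(current + 1, len(migrations)):
        conn.execute(migrations[version])
        conn.execute("INSERT INTO checkpoint_migrations (v) VALUES (?)", (version,))
        applied.append(version)
    conn.commit()
    return applied


conn = sqlite3.connect(":memory:")
first = run_migrations(conn, MIGRATIONS)
second = run_migrations(conn, MIGRATIONS)
print(first, second)  # [0, 1, 2] []
```

The second call applies nothing because every version is already recorded, which is why calling a setup routine of this kind repeatedly is harmless.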
