LangGraph 狀態持久化深度解析:Checkpointer 與 Memory Store 完整指南


LangGraph 狀態持久化深度解析:Checkpointer 與 Memory Store 完整指南

建構生產級 AI Agent 時,狀態持久化是不可或缺的核心能力。LangGraph 提供了兩套互補的持久化機制:Checkpointer(短期記憶)和 Memory Store(長期記憶),讓你的 Agent 能夠記住對話上下文、支援 Human-in-the-loop 工作流程、實現跨對話的知識累積。

本文將深入剖析這兩套系統的架構設計、Backend 選型策略,以及生產環境部署的實戰經驗。


核心概念:短期記憶 vs 長期記憶

在深入技術細節之前,先釐清 LangGraph 的記憶模型:

特性Checkpointer(短期記憶)Store(長期記憶)
保存範圍單一對話線程(thread)跨對話、自定義 namespace
保存內容Graph 狀態快照任意結構化數據
主要用途對話上下文、中斷恢復、Time-travel用戶偏好、組織知識、學習記錄
生命週期隨線程存在持久化直到明確刪除

關鍵理解:Checkpointer 讓你的 Agent 在單次對話中保持上下文;Store 讓你的 Agent 能夠「記住」用戶並在多次交互中學習成長。


Part 1: Checkpointer 深度解析

1.1 核心架構:BaseCheckpointSaver

所有 Checkpointer 都實現 BaseCheckpointSaver 接口,提供四個核心方法:

class BaseCheckpointSaver:
    def put(self, config, checkpoint, metadata) -> None:
        """儲存 checkpoint 及其元數據"""

    def put_writes(self, config, writes, task_id) -> None:
        """儲存中間寫入(pending writes)"""

    def get_tuple(self, config) -> Optional[CheckpointTuple]:
        """根據配置獲取 checkpoint tuple"""

    def list(self, config, options) -> Iterator[CheckpointTuple]:
        """列出符合條件的 checkpoints(按時間降序)"""

1.2 Checkpoint 數據結構

每個 checkpoint 包含完整的 Graph 狀態快照:

{
    "v": 1,                                              # 版本號
    "ts": "2024-07-31T20:14:19.804150+00:00",            # 時間戳
    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",        # 唯一 ID
    "channel_values": {                                  # 通道值(實際狀態)
        "messages": [...],
        "current_step": "agent"
    },
    "channel_versions": {...},                           # 通道版本追蹤
    "versions_seen": {...},                              # 已見版本
    "pending_sends": []                                  # 待發送訊息
}

關鍵標識符

  • thread_id:標識一系列相關的 checkpoints,實現多租戶對話
  • checkpoint_id:唯一標識特定的狀態快照,用於 Time-travel

1.3 Backend 選型指南

MemorySaver:開發與測試

from langgraph.checkpoint.memory import InMemorySaver

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

# 使用 thread_id 調用
config = {"configurable": {"thread_id": "user-123"}}
response = graph.invoke({"messages": [...]}, config=config)

適用場景

  • 本地開發和調試
  • 單元測試
  • 快速原型驗證

限制:進程重啟後數據丟失,不適合生產環境。

SqliteSaver:輕量級持久化

pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# 同步版本
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

# 異步版本
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

適用場景

  • 單機部署的小型應用
  • 實驗性項目
  • 對寫入性能要求不高的場景

限制:SQLite 的寫入鎖會限制並發性能。

PostgresSaver:生產環境首選

pip install langgraph-checkpoint-postgres "psycopg[binary,pool]"
from psycopg_pool import ConnectionPool
from psycopg.rows import dict_row
from langgraph.checkpoint.postgres import PostgresSaver

# 連接池配置(關鍵參數)
connection_kwargs = {
    "autocommit": True,        # 必須啟用
    "prepare_threshold": 0,
    "row_factory": dict_row,   # 必須設置
}

pool = ConnectionPool(
    conninfo="postgresql://user:password@localhost:5432/dbname",
    max_size=20,
    kwargs=connection_kwargs
)

checkpointer = PostgresSaver(pool)
checkpointer.setup()  # 首次使用時創建表結構

graph = builder.compile(checkpointer=checkpointer)

生產環境要點

  1. autocommit=True:確保 .setup() 正確提交表結構
  2. row_factory=dict_row:checkpointer 使用字典式訪問
  3. 連接池大小:根據並發量調整,建議 max_size=20

ShallowPostgresSaver:存儲優化版

如果不需要歷史記錄,可使用 ShallowPostgresSaver 僅保留最新 checkpoint:

from langgraph.checkpoint.postgres.shallow import ShallowPostgresSaver

shallow_checkpointer = ShallowPostgresSaver.from_conn_string(
    "postgresql://user:password@localhost:5432/dbname"
)

適用場景:不需要 Time-travel、僅需最新狀態的應用,可大幅降低存儲成本。


Part 2: Time-travel 與狀態回溯

Time-travel 是 LangGraph 最強大的功能之一,允許你回溯到歷史狀態並探索替代執行路徑。

2.1 獲取執行歷史

# 方法一:使用 graph.get_state_history()
for state in graph.get_state_history(config):
    print(f"Checkpoint: {state.config['configurable']['checkpoint_id']}")
    print(f"State: {state.values}")
    print(f"Next: {state.next}")  # 下一個要執行的節點
    print("---")

# 方法二:使用 LangGraph SDK(適用於 LangGraph Platform)
from langgraph_sdk import get_client
client = get_client()
history = await client.threads.get_history(thread_id="thread_1")

2.2 回溯到特定 Checkpoint

# 指定 checkpoint_id 回溯
config = {
    "configurable": {
        "thread_id": "thread_1",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"
    }
}

# 從該 checkpoint 繼續執行(會創建新的分支)
result = graph.invoke({"messages": [new_message]}, config=config)

重要:所有回溯操作都會在歷史中創建新的分支(fork),不會覆蓋原有歷史。

2.3 修改狀態並重放

# 獲取當前狀態
current_state = graph.get_state(config)

# 修改狀態(會創建新的 checkpoint)
graph.update_state(
    config,
    {"messages": [AIMessage(content="修正後的回覆")]},
    as_node="agent"  # 以特定節點身份更新
)

# 從修改後的狀態繼續執行
result = graph.invoke(None, config=config)

Part 3: Human-in-the-Loop 工作流程

3.1 使用 interrupt() 實現人工介入

LangGraph 0.2.31+ 推薦使用 interrupt() 函數:

from langgraph.types import interrupt, Command

def approval_node(state):
    # 最佳實踐:在節點開始處中斷
    user_decision = interrupt({
        "question": "是否批准此操作?",
        "options": ["approve", "reject", "modify"]
    })

    if user_decision == "approve":
        return {"status": "approved", "proceed": True}
    elif user_decision == "reject":
        return {"status": "rejected", "proceed": False}
    else:
        # 處理修改請求
        return {"status": "pending_modification"}

3.2 恢復中斷的執行

# 用戶做出決定後,恢復執行
result = graph.invoke(
    Command(resume="approve"),
    config={"configurable": {"thread_id": "approval-123"}}
)

3.3 設置斷點

# 在特定節點前自動暫停
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["sensitive_action", "final_confirmation"]
)

關鍵理解:中斷的線程不佔用運行時資源,僅佔用存儲空間。可以在數月後、不同機器上恢復執行。


Part 4: Memory Store 長期記憶系統

4.1 BaseStore 接口

from langgraph.store.base import BaseStore

class BaseStore:
    def put(self, namespace: tuple, key: str, value: dict) -> None:
        """存儲數據"""

    def get(self, namespace: tuple, key: str) -> Optional[Item]:
        """檢索特定數據"""

    def search(self, namespace: tuple, query: str, limit: int = 10) -> List[Item]:
        """語義搜索(需配置 embedding)"""

    def delete(self, namespace: tuple, key: str) -> None:
        """刪除數據"""

    def list_namespaces(self, prefix: tuple = None) -> List[tuple]:
        """列出命名空間"""

4.2 Namespace 概念

Namespace 是元組形式的層級路徑,類似文件夾結構:

# 按用戶組織
namespace = ("users", "user_123")
store.put(namespace, "preferences", {"theme": "dark", "language": "zh"})

# 按組織組織
namespace = ("orgs", "company_456", "knowledge")
store.put(namespace, "policy_v1", {"content": "..."})

# 混合組織
namespace = ("orgs", "company_456", "users", "user_123")

4.3 InMemoryStore 與 PostgresStore

InMemoryStore:開發用

from langgraph.store.memory import InMemoryStore
from langchain.embeddings import init_embeddings

# 基礎用法
store = InMemoryStore()

# 支持語義搜索
store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small")
    }
)

PostgresStore:生產用

pip install langgraph-checkpoint-postgres
from langgraph.store.postgres import PostgresStore

store = PostgresStore.from_conn_string(
    "postgresql://user:password@localhost:5432/dbname"
)
store.setup()

# 配置語義搜索
store = PostgresStore(
    connection_string="postgresql://...",
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "fields": ["text"],  # 指定要嵌入的字段
    }
)

4.4 Cross-thread 記憶共享

Store 的核心價值在於跨對話共享記憶:

from langgraph.graph import StateGraph
from langgraph.store.postgres import AsyncPostgresStore
from langgraph.checkpoint.postgres import AsyncPostgresSaver

# 同時使用 Checkpointer 和 Store
graph = builder.compile(
    checkpointer=AsyncPostgresSaver(...),  # 短期記憶
    store=AsyncPostgresStore(...)           # 長期記憶
)

# 在節點中使用 store
def personalized_response(state, *, store):
    user_id = state["user_id"]
    namespace = ("users", user_id)

    # 讀取用戶歷史偏好
    prefs = store.get(namespace, "preferences")
    history = store.search(namespace, query=state["query"], limit=5)

    # 更新用戶記錄
    store.put(namespace, "last_interaction", {
        "timestamp": datetime.now().isoformat(),
        "topic": state["topic"]
    })

    return {"personalized_context": prefs, "relevant_history": history}

實際應用場景

  1. 用戶偏好管理:記住語言、主題、溝通風格
  2. 組織知識庫:共享公司政策、產品信息
  3. 學習記錄:追蹤用戶反饋,持續改進回覆品質

Part 5: 生產環境最佳實踐

5.1 連接池配置

from psycopg_pool import ConnectionPool, AsyncConnectionPool

# 同步連接池
sync_pool = ConnectionPool(
    conninfo="postgresql://user:password@localhost:5432/dbname",
    max_size=20,
    min_size=5,
    kwargs={
        "autocommit": True,
        "prepare_threshold": 0,
        "row_factory": dict_row,
    }
)

# 異步連接池
async_pool = AsyncConnectionPool(
    conninfo="postgresql://user:password@localhost:5432/dbname",
    max_size=20,
    min_size=5,
    kwargs={
        "autocommit": True,
        "prepare_threshold": 0,
        "row_factory": dict_row,
    }
)

5.2 錯誤處理與重試策略

from langgraph.pregel import RetryPolicy

# 配置重試策略
retry_policy = RetryPolicy(
    max_attempts=3,
    # LangGraph 默認使用指數退避
)

# 對特定節點應用重試
builder.add_node("llm_call", call_llm, retry=retry_policy)
builder.add_node("tool_call", execute_tool, retry=retry_policy)

最佳實踐

  • 僅對瞬態錯誤重試(5xx、429 Too Many Requests)
  • 保持重試範圍最小化
  • 重試耗盡後,錯誤會持久化到 checkpoint 供後續檢查

5.3 Serde 選擇

格式優點缺點
JSON(默認)穩定、可讀、易調試體積較大
MessagePack更小、更快存在線程安全問題

建議:生產環境優先使用 JSON,除非有明確的性能瓶頸且經過充分測試。

5.4 監控與可觀測性

import logging

# 啟用 LangGraph 日誌
logging.getLogger("langgraph").setLevel(logging.INFO)

# 追蹤 checkpoint 操作
class MonitoredCheckpointer(PostgresSaver):
    def put(self, config, checkpoint, metadata):
        logger.info(f"Saving checkpoint: {checkpoint['id']}")
        start = time.time()
        result = super().put(config, checkpoint, metadata)
        logger.info(f"Checkpoint saved in {time.time() - start:.2f}s")
        return result

總結:選型決策樹

需要跨對話記憶?
├── 是 → 使用 Store(PostgresStore for 生產)
└── 否 → 僅使用 Checkpointer

需要 Time-travel?
├── 是 → 使用 PostgresSaver
└── 否 → 考慮 ShallowPostgresSaver 降低存儲

部署環境?
├── 開發/測試 → InMemorySaver + InMemoryStore
├── 單機生產 → SqliteSaver(輕量)或 PostgresSaver
└── 分佈式生產 → PostgresSaver + PostgresStore + 連接池

LangGraph 的持久化系統設計精良,提供了從開發到生產的完整解決方案。理解 Checkpointer(短期記憶)與 Store(長期記憶)的區別,選擇適合的 Backend,配置好連接池和錯誤處理,你就能構建出真正健壯的生產級 AI Agent。


參考資源