LangGraph 狀態持久化深度解析:Checkpointer 與 Memory Store 完整指南
LangGraph 狀態持久化深度解析:Checkpointer 與 Memory Store 完整指南
建構生產級 AI Agent 時,狀態持久化是不可或缺的核心能力。LangGraph 提供了兩套互補的持久化機制:Checkpointer(短期記憶)和 Memory Store(長期記憶),讓你的 Agent 能夠記住對話上下文、支援 Human-in-the-loop 工作流程、實現跨對話的知識累積。
本文將深入剖析這兩套系統的架構設計、Backend 選型策略,以及生產環境部署的實戰經驗。
核心概念:短期記憶 vs 長期記憶
在深入技術細節之前,先釐清 LangGraph 的記憶模型:
| 特性 | Checkpointer(短期記憶) | Store(長期記憶) |
|---|---|---|
| 保存範圍 | 單一對話線程(thread) | 跨對話、自定義 namespace |
| 保存內容 | Graph 狀態快照 | 任意結構化數據 |
| 主要用途 | 對話上下文、中斷恢復、Time-travel | 用戶偏好、組織知識、學習記錄 |
| 生命週期 | 隨線程存在 | 持久化直到明確刪除 |
關鍵理解:Checkpointer 讓你的 Agent 在單次對話中保持上下文;Store 讓你的 Agent 能夠「記住」用戶並在多次交互中學習成長。
Part 1: Checkpointer 深度解析
1.1 核心架構:BaseCheckpointSaver
所有 Checkpointer 都實現 BaseCheckpointSaver 接口,提供四個核心方法:
class BaseCheckpointSaver:
def put(self, config, checkpoint, metadata) -> None:
"""儲存 checkpoint 及其元數據"""
def put_writes(self, config, writes, task_id) -> None:
"""儲存中間寫入(pending writes)"""
def get_tuple(self, config) -> Optional[CheckpointTuple]:
"""根據配置獲取 checkpoint tuple"""
def list(self, config, options) -> Iterator[CheckpointTuple]:
"""列出符合條件的 checkpoints(按時間降序)"""
1.2 Checkpoint 數據結構
每個 checkpoint 包含完整的 Graph 狀態快照:
{
"v": 1, # 版本號
"ts": "2024-07-31T20:14:19.804150+00:00", # 時間戳
"id": "1ef4f797-8335-6428-8001-8a1503f9b875", # 唯一 ID
"channel_values": { # 通道值(實際狀態)
"messages": [...],
"current_step": "agent"
},
"channel_versions": {...}, # 通道版本追蹤
"versions_seen": {...}, # 已見版本
"pending_sends": [] # 待發送訊息
}
關鍵標識符:
- thread_id:標識一系列相關的 checkpoints,實現多租戶對話
- checkpoint_id:唯一標識特定的狀態快照,用於 Time-travel
1.3 Backend 選型指南
MemorySaver:開發與測試
from langgraph.checkpoint.memory import InMemorySaver
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
# 使用 thread_id 調用
config = {"configurable": {"thread_id": "user-123"}}
response = graph.invoke({"messages": [...]}, config=config)
適用場景:
- 本地開發和調試
- 單元測試
- 快速原型驗證
限制:進程重啟後數據丟失,不適合生產環境。
SqliteSaver:輕量級持久化
pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
# 同步版本
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
# 異步版本
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
適用場景:
- 單機部署的小型應用
- 實驗性項目
- 對寫入性能要求不高的場景
限制:SQLite 的寫入鎖會限制並發性能。
PostgresSaver:生產環境首選
pip install langgraph-checkpoint-postgres "psycopg[binary,pool]"
from psycopg_pool import ConnectionPool
from psycopg.rows import dict_row
from langgraph.checkpoint.postgres import PostgresSaver
# 連接池配置(關鍵參數)
connection_kwargs = {
"autocommit": True, # 必須啟用
"prepare_threshold": 0,
"row_factory": dict_row, # 必須設置
}
pool = ConnectionPool(
conninfo="postgresql://user:password@localhost:5432/dbname",
max_size=20,
kwargs=connection_kwargs
)
checkpointer = PostgresSaver(pool)
checkpointer.setup() # 首次使用時創建表結構
graph = builder.compile(checkpointer=checkpointer)
生產環境要點:
- autocommit=True:確保
.setup()正確提交表結構 - row_factory=dict_row:checkpointer 使用字典式訪問
- 連接池大小:根據並發量調整,建議 max_size=20
ShallowPostgresSaver:存儲優化版
如果不需要歷史記錄,可使用 ShallowPostgresSaver 僅保留最新 checkpoint:
from langgraph.checkpoint.postgres.shallow import ShallowPostgresSaver
shallow_checkpointer = ShallowPostgresSaver.from_conn_string(
"postgresql://user:password@localhost:5432/dbname"
)
適用場景:不需要 Time-travel、僅需最新狀態的應用,可大幅降低存儲成本。
Part 2: Time-travel 與狀態回溯
Time-travel 是 LangGraph 最強大的功能之一,允許你回溯到歷史狀態並探索替代執行路徑。
2.1 獲取執行歷史
# 方法一:使用 graph.get_state_history()
for state in graph.get_state_history(config):
print(f"Checkpoint: {state.config['configurable']['checkpoint_id']}")
print(f"State: {state.values}")
print(f"Next: {state.next}") # 下一個要執行的節點
print("---")
# 方法二:使用 LangGraph SDK(適用於 LangGraph Platform)
from langgraph_sdk import get_client
client = get_client()
history = await client.threads.get_history(thread_id="thread_1")
2.2 回溯到特定 Checkpoint
# 指定 checkpoint_id 回溯
config = {
"configurable": {
"thread_id": "thread_1",
"checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"
}
}
# 從該 checkpoint 繼續執行(會創建新的分支)
result = graph.invoke({"messages": [new_message]}, config=config)
重要:所有回溯操作都會在歷史中創建新的分支(fork),不會覆蓋原有歷史。
2.3 修改狀態並重放
# 獲取當前狀態
current_state = graph.get_state(config)
# 修改狀態(會創建新的 checkpoint)
graph.update_state(
config,
{"messages": [AIMessage(content="修正後的回覆")]},
as_node="agent" # 以特定節點身份更新
)
# 從修改後的狀態繼續執行
result = graph.invoke(None, config=config)
Part 3: Human-in-the-Loop 工作流程
3.1 使用 interrupt() 實現人工介入
LangGraph 0.2.31+ 推薦使用 interrupt() 函數:
from langgraph.types import interrupt, Command
def approval_node(state):
# 最佳實踐:在節點開始處中斷
user_decision = interrupt({
"question": "是否批准此操作?",
"options": ["approve", "reject", "modify"]
})
if user_decision == "approve":
return {"status": "approved", "proceed": True}
elif user_decision == "reject":
return {"status": "rejected", "proceed": False}
else:
# 處理修改請求
return {"status": "pending_modification"}
3.2 恢復中斷的執行
# 用戶做出決定後,恢復執行
result = graph.invoke(
Command(resume="approve"),
config={"configurable": {"thread_id": "approval-123"}}
)
3.3 設置斷點
# 在特定節點前自動暫停
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["sensitive_action", "final_confirmation"]
)
關鍵理解:中斷的線程不佔用運行時資源,僅佔用存儲空間。可以在數月後、不同機器上恢復執行。
Part 4: Memory Store 長期記憶系統
4.1 BaseStore 接口
from langgraph.store.base import BaseStore
class BaseStore:
def put(self, namespace: tuple, key: str, value: dict) -> None:
"""存儲數據"""
def get(self, namespace: tuple, key: str) -> Optional[Item]:
"""檢索特定數據"""
def search(self, namespace: tuple, query: str, limit: int = 10) -> List[Item]:
"""語義搜索(需配置 embedding)"""
def delete(self, namespace: tuple, key: str) -> None:
"""刪除數據"""
def list_namespaces(self, prefix: tuple = None) -> List[tuple]:
"""列出命名空間"""
4.2 Namespace 概念
Namespace 是元組形式的層級路徑,類似文件夾結構:
# 按用戶組織
namespace = ("users", "user_123")
store.put(namespace, "preferences", {"theme": "dark", "language": "zh"})
# 按組織組織
namespace = ("orgs", "company_456", "knowledge")
store.put(namespace, "policy_v1", {"content": "..."})
# 混合組織
namespace = ("orgs", "company_456", "users", "user_123")
4.3 InMemoryStore 與 PostgresStore
InMemoryStore:開發用
from langgraph.store.memory import InMemoryStore
from langchain.embeddings import init_embeddings
# 基礎用法
store = InMemoryStore()
# 支持語義搜索
store = InMemoryStore(
index={
"dims": 1536,
"embed": init_embeddings("openai:text-embedding-3-small")
}
)
PostgresStore:生產用
pip install langgraph-checkpoint-postgres
from langgraph.store.postgres import PostgresStore
store = PostgresStore.from_conn_string(
"postgresql://user:password@localhost:5432/dbname"
)
store.setup()
# 配置語義搜索
store = PostgresStore(
connection_string="postgresql://...",
index={
"dims": 1536,
"embed": init_embeddings("openai:text-embedding-3-small"),
"fields": ["text"], # 指定要嵌入的字段
}
)
4.4 Cross-thread 記憶共享
Store 的核心價值在於跨對話共享記憶:
from langgraph.graph import StateGraph
from langgraph.store.postgres import AsyncPostgresStore
from langgraph.checkpoint.postgres import AsyncPostgresSaver
# 同時使用 Checkpointer 和 Store
graph = builder.compile(
checkpointer=AsyncPostgresSaver(...), # 短期記憶
store=AsyncPostgresStore(...) # 長期記憶
)
# 在節點中使用 store
def personalized_response(state, *, store):
user_id = state["user_id"]
namespace = ("users", user_id)
# 讀取用戶歷史偏好
prefs = store.get(namespace, "preferences")
history = store.search(namespace, query=state["query"], limit=5)
# 更新用戶記錄
store.put(namespace, "last_interaction", {
"timestamp": datetime.now().isoformat(),
"topic": state["topic"]
})
return {"personalized_context": prefs, "relevant_history": history}
實際應用場景:
- 用戶偏好管理:記住語言、主題、溝通風格
- 組織知識庫:共享公司政策、產品信息
- 學習記錄:追蹤用戶反饋,持續改進回覆品質
Part 5: 生產環境最佳實踐
5.1 連接池配置
from psycopg_pool import ConnectionPool, AsyncConnectionPool
# 同步連接池
sync_pool = ConnectionPool(
conninfo="postgresql://user:password@localhost:5432/dbname",
max_size=20,
min_size=5,
kwargs={
"autocommit": True,
"prepare_threshold": 0,
"row_factory": dict_row,
}
)
# 異步連接池
async_pool = AsyncConnectionPool(
conninfo="postgresql://user:password@localhost:5432/dbname",
max_size=20,
min_size=5,
kwargs={
"autocommit": True,
"prepare_threshold": 0,
"row_factory": dict_row,
}
)
5.2 錯誤處理與重試策略
from langgraph.pregel import RetryPolicy
# 配置重試策略
retry_policy = RetryPolicy(
max_attempts=3,
# LangGraph 默認使用指數退避
)
# 對特定節點應用重試
builder.add_node("llm_call", call_llm, retry=retry_policy)
builder.add_node("tool_call", execute_tool, retry=retry_policy)
最佳實踐:
- 僅對瞬態錯誤重試(5xx、429 Too Many Requests)
- 保持重試範圍最小化
- 重試耗盡後,錯誤會持久化到 checkpoint 供後續檢查
5.3 Serde 選擇
| 格式 | 優點 | 缺點 |
|---|---|---|
| JSON(默認) | 穩定、可讀、易調試 | 體積較大 |
| MessagePack | 更小、更快 | 存在線程安全問題 |
建議:生產環境優先使用 JSON,除非有明確的性能瓶頸且經過充分測試。
5.4 監控與可觀測性
import logging
# 啟用 LangGraph 日誌
logging.getLogger("langgraph").setLevel(logging.INFO)
# 追蹤 checkpoint 操作
class MonitoredCheckpointer(PostgresSaver):
def put(self, config, checkpoint, metadata):
logger.info(f"Saving checkpoint: {checkpoint['id']}")
start = time.time()
result = super().put(config, checkpoint, metadata)
logger.info(f"Checkpoint saved in {time.time() - start:.2f}s")
return result
總結:選型決策樹
需要跨對話記憶?
├── 是 → 使用 Store(PostgresStore for 生產)
└── 否 → 僅使用 Checkpointer
需要 Time-travel?
├── 是 → 使用 PostgresSaver
└── 否 → 考慮 ShallowPostgresSaver 降低存儲
部署環境?
├── 開發/測試 → InMemorySaver + InMemoryStore
├── 單機生產 → SqliteSaver(輕量)或 PostgresSaver
└── 分佈式生產 → PostgresSaver + PostgresStore + 連接池
LangGraph 的持久化系統設計精良,提供了從開發到生產的完整解決方案。理解 Checkpointer(短期記憶)與 Store(長期記憶)的區別,選擇適合的 Backend,配置好連接池和錯誤處理,你就能構建出真正健壯的生產級 AI Agent。