LangGraph Checkpoint 深度解析:狀態持久化、Time-travel 與 Human-in-the-Loop 實踐
本文深度探索 LangGraph 的 Checkpoint 機制,涵蓋資料結構、儲存後端、恢復策略、以及與 Store 的互補關係。
為什麼需要 Checkpoint?
在 AI Agent 系統中,狀態管理是核心挑戰之一。當一個複雜的對話或工作流程執行到一半時,可能會遇到:
- 用戶中斷對話,稍後繼續
- 系統需要審核敏感操作
- 執行失敗需要從特定狀態恢復
- 需要「回到過去」查看歷史決策
LangGraph 的 Checkpoint 機制 就是為了解決這些問題而生。它讓你的 Graph 狀態可以被持久化、恢復、回溯。
一、存什麼內容 (What Gets Stored)
1.1 Checkpoint 資料結構
每個 Checkpoint 本質上是一個完整的 Graph 狀態快照:
{
"v": 1, # 版本號
"ts": "2024-07-31T20:14:19.804150+00:00", # 時間戳
"id": "1ef4f797-8335-6428-8001-8a1503f9b875", # 唯一 ID
"channel_values": { # 通道值(實際狀態)
"messages": [...],
"current_step": "agent",
"context": {...}
},
"channel_versions": {...}, # 通道版本追蹤
"versions_seen": {...}, # 已見版本
"pending_sends": [] # 待發送訊息
}
1.2 核心欄位說明
| 欄位 | 用途 |
|---|---|
thread_id | 標識一系列相關的 Checkpoint,實現多租戶對話隔離 |
checkpoint_id | 唯一標識特定的狀態快照,用於 Time-travel |
channel_values | Graph 中所有通道的當前值(狀態) |
channel_versions | 每個通道的版本號,用於追蹤變化 |
pending_sends | 待處理的訊息/事件,確保不會遺漏 |
1.3 Checkpointer vs Store:短期與長期記憶
LangGraph 有兩套互補的持久化機制:
| 特性 | Checkpointer(短期記憶) | Store(長期記憶) |
|---|---|---|
| 保存範圍 | 單一對話線程(thread) | 跨對話、自定義 namespace |
| 保存內容 | Graph 狀態快照 | 任意結構化數據 |
| 主要用途 | 對話上下文、中斷恢復、Time-travel | 用戶偏好、組織知識、學習記錄 |
| 生命週期 | 隨線程存在 | 持久化直到明確刪除 |
| 查詢方式 | 依賴 thread_id | 支持語義搜索 |
簡單來說:
- Checkpointer 記住「對話進行到哪裡」
- Store 記住「用戶喜歡什麼」、「公司政策是什麼」
二、怎麼存 (How Checkpoints Are Written)
2.1 CheckpointSaver 抽象層
所有 Checkpointer 都實現 BaseCheckpointSaver 接口:
class BaseCheckpointSaver:
def put(self, config, checkpoint, metadata) -> None:
"""儲存 checkpoint 及其元數據"""
def put_writes(self, config, writes, task_id) -> None:
"""儲存中間寫入(pending writes)"""
def get_tuple(self, config) -> Optional[CheckpointTuple]:
"""根據配置獲取 checkpoint tuple"""
def list(self, config, options) -> Iterator[CheckpointTuple]:
"""列出符合條件的 checkpoints(按時間降序)"""
2.2 後端實現選擇
MemorySaver:開發與測試
from langgraph.checkpoint.memory import InMemorySaver
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "user-123"}}
response = graph.invoke({"messages": [...]}, config=config)
適用場景:本地開發、調試、單元測試 限制:進程重啟後數據丟失
SqliteSaver:輕量級持久化
pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
# 同步版本
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
# 異步版本
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
適用場景:單機部署的小型應用、實驗性項目 限制:SQLite 的寫入鎖會限制並發性能
PostgresSaver:生產環境首選
pip install langgraph-checkpoint-postgres "psycopg[binary,pool]"
from psycopg_pool import ConnectionPool
from psycopg.rows import dict_row
from langgraph.checkpoint.postgres import PostgresSaver
# 連接池配置(關鍵參數)
connection_kwargs = {
"autocommit": True, # 必須啟用
"prepare_threshold": 0,
"row_factory": dict_row, # 必須設置
}
pool = ConnectionPool(
conninfo="postgresql://user:password@localhost:5432/dbname",
max_size=20,
kwargs=connection_kwargs
)
checkpointer = PostgresSaver(pool)
checkpointer.setup() # 首次使用時創建表結構
graph = builder.compile(checkpointer=checkpointer)
生產環境要點:
autocommit=True:確保.setup()正確提交表結構row_factory=dict_row:checkpointer 使用字典式訪問- 連接池大小:根據並發量調整,建議
max_size=20
ShallowPostgresSaver:存儲優化版
如果不需要歷史記錄,可使用 ShallowPostgresSaver 僅保留最新 checkpoint:
from langgraph.checkpoint.postgres.shallow import ShallowPostgresSaver
shallow_checkpointer = ShallowPostgresSaver.from_conn_string(
"postgresql://user:password@localhost:5432/dbname"
)
適用場景:不需要 Time-travel、僅需最新狀態的應用,可大幅降低存儲成本
三、怎麼 load 出來 (How Checkpoints Are Loaded)
3.1 獲取執行歷史
# 方法一:使用 graph.get_state_history()
for state in graph.get_state_history(config):
print(f"Checkpoint: {state.config['configurable']['checkpoint_id']}")
print(f"State: {state.values}")
print(f"Next: {state.next}") # 下一個要執行的節點
print("---")
3.2 Time-travel:回溯到特定 Checkpoint
# 指定 checkpoint_id 回溯
config = {
"configurable": {
"thread_id": "thread-1",
"checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"
}
}
# 從該 checkpoint 繼續執行(會創建新的分支)
result = graph.invoke({"messages": [new_message]}, config=config)
重要:所有回溯操作都會在歷史中創建新的分支(fork),不會覆蓋原有歷史。
3.3 修改狀態並重放
# 獲取當前狀態
current_state = graph.get_state(config)
# 修改狀態(會創建新的 checkpoint)
graph.update_state(
config,
{"messages": [AIMessage(content="修正後的回覆")]},
as_node="agent" # 以特定節點身份更新
)
# 從修改後的狀態繼續執行
result = graph.invoke(None, config=config)
四、怎麼去檢視、管理 (How to Inspect and Manage)
4.1 Human-in-the-Loop 工作流程
LangGraph 0.2.31+ 推薦使用 interrupt() 函數實現人工介入:
from langgraph.types import interrupt, Command
def approval_node(state):
# 最佳實踐:在節點開始處中斷
user_decision = interrupt({
"question": "是否批准此操作?",
"options": ["approve", "reject", "modify"],
"current_data": state
})
if user_decision == "approve":
return {"status": "approved", "proceed": True}
elif user_decision == "reject":
return {"status": "rejected", "proceed": False}
else:
return {"status": "pending_modification"}
恢復中斷的執行
# 用戶做出決定後,恢復執行
result = graph.invoke(
Command(resume="approve"),
config={"configurable": {"thread_id": "approval-123"}}
)
關鍵理解:中斷的線程不佔用運行時資源,僅佔用存儲空間。可以在數月後、不同機器上恢復執行。
4.2 設置斷點
# 在特定節點前自動暫停
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["sensitive_action", "final_confirmation"]
)
# 或在特定節點後暫停
graph = builder.compile(
checkpointer=checkpointer,
interrupt_after=["review_step"]
)
4.3 Store:長期記憶系統
當你需要跨對話記住某些信息時,可以使用 Store:
from langgraph.store.postgres import PostgresStore
store = PostgresStore.from_conn_string(
"postgresql://user:password@localhost:5432/dbname"
)
store.setup()
# 按用戶組織數據
namespace = ("users", "user_123")
store.put(namespace, "preferences", {"theme": "dark", "language": "zh"})
# 語義搜索
results = store.search(
("users", "user_123"),
query="用戶之前問過關於訂單的問題",
limit=5
)
五、實際應用場景
5.1 場景一:對話恢復
用戶在使用 AI 助手時中斷,稍後回來繼續對話:
# 用戶回來了,恢復對話
config = {"configurable": {"thread_id": "user-session-456"}}
# 獲取最近狀態
current_state = graph.get_state(config)
print(f"上次聊到:{current_state.values['messages'][-1].content}")
# 繼續對話
result = graph.invoke(
{"messages": [HumanMessage(content="我之前說到哪裡了?")]},
config=config
)
5.2 場景二:人工審核敏感操作
金融交易、刪除操作等需要人工確認:
def transfer_money(state):
amount = state["amount"]
recipient = state["recipient"]
# 中斷等待審核
approval = interrupt({
"type": "transfer_approval",
"amount": amount,
"recipient": recipient,
"risk_level": "high" if amount > 10000 else "low"
})
if approval["action"] == "approve":
return {"transfer_result": execute_transfer(amount, recipient)}
else:
return {"transfer_result": "cancelled", "reason": approval["reason"]}
5.3 場景三:錯誤恢復與調試
當工作流程失敗時,可以回到失敗前的狀態修改後重試:
# 查看所有 checkpoint
for checkpoint in graph.get_state_history(config):
print(f"ID: {checkpoint.config['configurable']['checkpoint_id']}")
print(f"Next: {checkpoint.next}")
print(f"Status: {'failed' if checkpoint.metadata.get('failed') else 'success'}")
# 回到失敗前的狀態,修改後重試
recovery_config = {
"configurable": {
"thread_id": "workflow-123",
"checkpoint_id": "correct-checkpoint-id"
}
}
graph.update_state(recovery_config, {"corrected_input": ...})
result = graph.invoke(None, config=recovery_config)
六、選型決策樹
需要跨對話記憶?
├── 是 → 使用 Store(PostgresStore for 生產)
└── 否 → 僅使用 Checkpointer
需要 Time-travel?
├── 是 → 使用 PostgresSaver
└── 否 → 考慮 ShallowPostgresSaver 降低存儲
部署環境?
├── 開發/測試 → InMemorySaver + InMemoryStore
├── 單機生產 → SqliteSaver(輕量)或 PostgresSaver
└── 分佈式生產 → PostgresSaver + PostgresStore + 連接池
七、監控與可觀測性
import logging
import time
# 啟用 LangGraph 日誌
logging.getLogger("langgraph").setLevel(logging.INFO)
# 追蹤 checkpoint 操作
class MonitoredCheckpointer(PostgresSaver):
def put(self, config, checkpoint, metadata):
logger.info(f"Saving checkpoint: {checkpoint['id']}")
start = time.time()
result = super().put(config, checkpoint, metadata)
logger.info(f"Checkpoint saved in {time.time() - start:.2f}s")
return result
def get_tuple(self, config):
start = time.time()
result = super().get_tuple(config)
logger.info(f"Checkpoint loaded in {time.time() - start:.2f}s")
return result
八、結語:Checkpoint 的核心價值
LangGraph 的 Checkpoint 機制為 AI Agent 系統帶來了:
- 可恢復性:任何中斷都可以在任意時間、任意機器恢復
- Time-travel:完整執行歷史支援審計和錯誤回溯
- Human-in-the-Loop:安全地引入人工審核
- 多租戶隔離:thread_id 實現乾淨的狀態隔離
- 靈活的 Backend:從記憶體到 Postgres,支援不同部署場景
掌握 Checkpoint 機制,是構建生產級 AI Agent 系統的關鍵一步。