LangGraph Checkpoint 深度解析:狀態持久化、Time-travel 與 Human-in-the-Loop 實踐


本文深度探索 LangGraph 的 Checkpoint 機制,涵蓋資料結構、儲存後端、恢復策略、以及與 Store 的互補關係。


為什麼需要 Checkpoint?

在 AI Agent 系統中,狀態管理是核心挑戰之一。當一個複雜的對話或工作流程執行到一半時,可能會遇到:

  • 用戶中斷對話,稍後繼續
  • 系統需要審核敏感操作
  • 執行失敗需要從特定狀態恢復
  • 需要「回到過去」查看歷史決策

LangGraph 的 Checkpoint 機制 就是為了解決這些問題而生。它讓你的 Graph 狀態可以被持久化、恢復、回溯。


一、存什麼內容 (What Gets Stored)

1.1 Checkpoint 資料結構

每個 Checkpoint 本質上是一個完整的 Graph 狀態快照:

{
    "v": 1,                                              # 版本號
    "ts": "2024-07-31T20:14:19.804150+00:00",            # 時間戳
    "id": "1ef4f797-8335-6428-8001-8a1503f9b875",        # 唯一 ID
    "channel_values": {                                  # 通道值(實際狀態)
        "messages": [...],
        "current_step": "agent",
        "context": {...}
    },
    "channel_versions": {...},                           # 通道版本追蹤
    "versions_seen": {...},                              # 已見版本
    "pending_sends": []                                  # 待發送訊息
}

1.2 核心欄位說明

欄位用途
thread_id標識一系列相關的 Checkpoint,實現多租戶對話隔離
checkpoint_id唯一標識特定的狀態快照,用於 Time-travel
channel_valuesGraph 中所有通道的當前值(狀態)
channel_versions每個通道的版本號,用於追蹤變化
pending_sends待處理的訊息/事件,確保不會遺漏

1.3 Checkpointer vs Store:短期與長期記憶

LangGraph 有兩套互補的持久化機制:

特性Checkpointer(短期記憶)Store(長期記憶)
保存範圍單一對話線程(thread)跨對話、自定義 namespace
保存內容Graph 狀態快照任意結構化數據
主要用途對話上下文、中斷恢復、Time-travel用戶偏好、組織知識、學習記錄
生命週期隨線程存在持久化直到明確刪除
查詢方式依賴 thread_id支持語義搜索

簡單來說:

  • Checkpointer 記住「對話進行到哪裡」
  • Store 記住「用戶喜歡什麼」、「公司政策是什麼」

二、怎麼存 (How Checkpoints Are Written)

2.1 CheckpointSaver 抽象層

所有 Checkpointer 都實現 BaseCheckpointSaver 接口:

class BaseCheckpointSaver:
    def put(self, config, checkpoint, metadata) -> None:
        """儲存 checkpoint 及其元數據"""

    def put_writes(self, config, writes, task_id) -> None:
        """儲存中間寫入(pending writes)"""

    def get_tuple(self, config) -> Optional[CheckpointTuple]:
        """根據配置獲取 checkpoint tuple"""

    def list(self, config, options) -> Iterator[CheckpointTuple]:
        """列出符合條件的 checkpoints(按時間降序)"""

2.2 後端實現選擇

MemorySaver:開發與測試

from langgraph.checkpoint.memory import InMemorySaver

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "user-123"}}
response = graph.invoke({"messages": [...]}, config=config)

適用場景:本地開發、調試、單元測試 限制:進程重啟後數據丟失

SqliteSaver:輕量級持久化

pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# 同步版本
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

# 異步版本
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

適用場景:單機部署的小型應用、實驗性項目 限制:SQLite 的寫入鎖會限制並發性能

PostgresSaver:生產環境首選

pip install langgraph-checkpoint-postgres "psycopg[binary,pool]"
from psycopg_pool import ConnectionPool
from psycopg.rows import dict_row
from langgraph.checkpoint.postgres import PostgresSaver

# 連接池配置(關鍵參數)
connection_kwargs = {
    "autocommit": True,        # 必須啟用
    "prepare_threshold": 0,
    "row_factory": dict_row,   # 必須設置
}

pool = ConnectionPool(
    conninfo="postgresql://user:password@localhost:5432/dbname",
    max_size=20,
    kwargs=connection_kwargs
)

checkpointer = PostgresSaver(pool)
checkpointer.setup()  # 首次使用時創建表結構

graph = builder.compile(checkpointer=checkpointer)

生產環境要點

  1. autocommit=True:確保 .setup() 正確提交表結構
  2. row_factory=dict_row:checkpointer 使用字典式訪問
  3. 連接池大小:根據並發量調整,建議 max_size=20

ShallowPostgresSaver:存儲優化版

如果不需要歷史記錄,可使用 ShallowPostgresSaver 僅保留最新 checkpoint:

from langgraph.checkpoint.postgres.shallow import ShallowPostgresSaver

shallow_checkpointer = ShallowPostgresSaver.from_conn_string(
    "postgresql://user:password@localhost:5432/dbname"
)

適用場景:不需要 Time-travel、僅需最新狀態的應用,可大幅降低存儲成本


三、怎麼 load 出來 (How Checkpoints Are Loaded)

3.1 獲取執行歷史

# 方法一:使用 graph.get_state_history()
for state in graph.get_state_history(config):
    print(f"Checkpoint: {state.config['configurable']['checkpoint_id']}")
    print(f"State: {state.values}")
    print(f"Next: {state.next}")  # 下一個要執行的節點
    print("---")

3.2 Time-travel:回溯到特定 Checkpoint

# 指定 checkpoint_id 回溯
config = {
    "configurable": {
        "thread_id": "thread-1",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875"
    }
}

# 從該 checkpoint 繼續執行(會創建新的分支)
result = graph.invoke({"messages": [new_message]}, config=config)

重要:所有回溯操作都會在歷史中創建新的分支(fork),不會覆蓋原有歷史。

3.3 修改狀態並重放

# 獲取當前狀態
current_state = graph.get_state(config)

# 修改狀態(會創建新的 checkpoint)
graph.update_state(
    config,
    {"messages": [AIMessage(content="修正後的回覆")]},
    as_node="agent"  # 以特定節點身份更新
)

# 從修改後的狀態繼續執行
result = graph.invoke(None, config=config)

四、怎麼去檢視、管理 (How to Inspect and Manage)

4.1 Human-in-the-Loop 工作流程

LangGraph 0.2.31+ 推薦使用 interrupt() 函數實現人工介入:

from langgraph.types import interrupt, Command

def approval_node(state):
    # 最佳實踐:在節點開始處中斷
    user_decision = interrupt({
        "question": "是否批准此操作?",
        "options": ["approve", "reject", "modify"],
        "current_data": state
    })

    if user_decision == "approve":
        return {"status": "approved", "proceed": True}
    elif user_decision == "reject":
        return {"status": "rejected", "proceed": False}
    else:
        return {"status": "pending_modification"}

恢復中斷的執行

# 用戶做出決定後,恢復執行
result = graph.invoke(
    Command(resume="approve"),
    config={"configurable": {"thread_id": "approval-123"}}
)

關鍵理解:中斷的線程不佔用運行時資源,僅佔用存儲空間。可以在數月後、不同機器上恢復執行。

4.2 設置斷點

# 在特定節點前自動暫停
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["sensitive_action", "final_confirmation"]
)

# 或在特定節點後暫停
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_after=["review_step"]
)

4.3 Store:長期記憶系統

當你需要跨對話記住某些信息時,可以使用 Store:

from langgraph.store.postgres import PostgresStore

store = PostgresStore.from_conn_string(
    "postgresql://user:password@localhost:5432/dbname"
)
store.setup()

# 按用戶組織數據
namespace = ("users", "user_123")
store.put(namespace, "preferences", {"theme": "dark", "language": "zh"})

# 語義搜索
results = store.search(
    ("users", "user_123"),
    query="用戶之前問過關於訂單的問題",
    limit=5
)

五、實際應用場景

5.1 場景一:對話恢復

用戶在使用 AI 助手時中斷,稍後回來繼續對話:

# 用戶回來了,恢復對話
config = {"configurable": {"thread_id": "user-session-456"}}

# 獲取最近狀態
current_state = graph.get_state(config)
print(f"上次聊到:{current_state.values['messages'][-1].content}")

# 繼續對話
result = graph.invoke(
    {"messages": [HumanMessage(content="我之前說到哪裡了?")]},
    config=config
)

5.2 場景二:人工審核敏感操作

金融交易、刪除操作等需要人工確認:

def transfer_money(state):
    amount = state["amount"]
    recipient = state["recipient"]

    # 中斷等待審核
    approval = interrupt({
        "type": "transfer_approval",
        "amount": amount,
        "recipient": recipient,
        "risk_level": "high" if amount > 10000 else "low"
    })

    if approval["action"] == "approve":
        return {"transfer_result": execute_transfer(amount, recipient)}
    else:
        return {"transfer_result": "cancelled", "reason": approval["reason"]}

5.3 場景三:錯誤恢復與調試

當工作流程失敗時,可以回到失敗前的狀態修改後重試:

# 查看所有 checkpoint
for checkpoint in graph.get_state_history(config):
    print(f"ID: {checkpoint.config['configurable']['checkpoint_id']}")
    print(f"Next: {checkpoint.next}")
    print(f"Status: {'failed' if checkpoint.metadata.get('failed') else 'success'}")

# 回到失敗前的狀態,修改後重試
recovery_config = {
    "configurable": {
        "thread_id": "workflow-123",
        "checkpoint_id": "correct-checkpoint-id"
    }
}
graph.update_state(recovery_config, {"corrected_input": ...})
result = graph.invoke(None, config=recovery_config)

六、選型決策樹

需要跨對話記憶?
├── 是 → 使用 Store(PostgresStore for 生產)
└── 否 → 僅使用 Checkpointer

需要 Time-travel?
├── 是 → 使用 PostgresSaver
└── 否 → 考慮 ShallowPostgresSaver 降低存儲

部署環境?
├── 開發/測試 → InMemorySaver + InMemoryStore
├── 單機生產 → SqliteSaver(輕量)或 PostgresSaver
└── 分佈式生產 → PostgresSaver + PostgresStore + 連接池

七、監控與可觀測性

import logging
import time

# 啟用 LangGraph 日誌
logging.getLogger("langgraph").setLevel(logging.INFO)

# 追蹤 checkpoint 操作
class MonitoredCheckpointer(PostgresSaver):
    def put(self, config, checkpoint, metadata):
        logger.info(f"Saving checkpoint: {checkpoint['id']}")
        start = time.time()
        result = super().put(config, checkpoint, metadata)
        logger.info(f"Checkpoint saved in {time.time() - start:.2f}s")
        return result

    def get_tuple(self, config):
        start = time.time()
        result = super().get_tuple(config)
        logger.info(f"Checkpoint loaded in {time.time() - start:.2f}s")
        return result

八、結語:Checkpoint 的核心價值

LangGraph 的 Checkpoint 機制為 AI Agent 系統帶來了:

  1. 可恢復性:任何中斷都可以在任意時間、任意機器恢復
  2. Time-travel:完整執行歷史支援審計和錯誤回溯
  3. Human-in-the-Loop:安全地引入人工審核
  4. 多租戶隔離:thread_id 實現乾淨的狀態隔離
  5. 靈活的 Backend:從記憶體到 Postgres,支援不同部署場景

掌握 Checkpoint 機制,是構建生產級 AI Agent 系統的關鍵一步。


參考資源