LangGraph 深度解析:以 Pregel 計算模型驅動的 Agent 編排框架


LangGraph 是 LangChain 團隊開發的低階 Agent 編排框架,專為建構長時間運行、有狀態的 Agent 工作流而設計。它受 Google Pregel 論文與 Apache Beam 啟發,將 Agent 的執行過程建模為有向圖上的狀態流動。本文從原始碼角度,深入剖析其核心架構與關鍵設計決策。


一、定位與差異化

LangGraph 雖然出自 LangChain 團隊,但並非 LangChain 的一部分,可以獨立使用。它的核心定位是一個低階編排框架:不抽象 prompt 或架構模式,而是提供狀態管理、持久化、中斷/恢復等基礎設施。

其差異化的核心價值在於四個方面:

  • Durable Execution:透過 Checkpoint 實現持久執行,即使在中途失敗也能恢復
  • Human-in-the-loop:內建 interrupt/resume 機制,支援人機協作
  • Comprehensive Memory:Channel 系統提供細粒度的狀態管理
  • Debug with LangSmith:整合 LangSmith 實現 Time-travel debugging

在 Monorepo 結構上,LangGraph 將核心框架(StateGraph、Pregel 引擎、Channels)、高階預建 API(create_react_agent、ToolNode)、Checkpoint 實作(SQLite、PostgreSQL)以及 SDK 與 CLI 分別組織在 libs/ 目錄下的獨立套件中。


二、Pregel 計算模型(BSP)

LangGraph 最獨特的設計選擇,是採用 Pregel 論文中的 BSP(Bulk Synchronous Parallel) 模型作為執行引擎。這在 AI Agent 框架中是極為罕見的。

執行步驟

每一步(step)包含三個階段:

  1. Plan(規劃):決定本步要執行哪些節點。第一步選擇訂閱 input channel 的節點;後續步選擇訂閱上一步更新過的 channel 的節點。
  2. Execute(執行):並行執行所有選中的節點。節點執行期間,channel 的更新對其他節點不可見。
  3. Update(更新):將所有節點的寫入統一套用到 channel。

這個迴圈持續進行,直到沒有節點被選中(圖完成),或達到最大步數限制(recursion_limit,預設 25)。

兩大抽象:Actor 與 Channel

整個 Pregel 模型建立在兩個核心抽象之上:

Actor(PregelNode) 是計算單元,訂閱一個或多個 channel,讀取 channel 資料作為輸入,執行邏輯後寫入 channel。它實作了 LangChain 的 Runnable 介面。

Channel(BaseChannel) 是 Actor 之間通訊的媒介,每個 channel 定義了三個型別:Value(儲存型別)、Update(更新型別)與 Checkpoint(序列化型別)。核心方法包含 update(values)get()checkpoint()from_checkpoint()

BSP 模型帶來的關鍵優勢是確定性:步內寫入互不可見,消除了競態條件;透過 checkpoint 可以精確重現任何一步的狀態;天然支援並行執行。


三、Channel 系統

Channel 是 LangGraph 狀態管理的基石,定義在 langgraph/channels/ 目錄下。框架提供了多種 channel 型別,每種承擔不同的語意角色。

LastValue(預設 channel)

class LastValue(BaseChannel[Value, Value, Value]):
    """儲存最後收到的值,每步只能收到一個值。"""

這是最基本的 channel,StateGraph 中沒有指定 reducer 的欄位預設使用。關鍵限制是每步只接受一個更新,如果多個節點同時寫入同一個 LastValue channel,會拋出 InvalidUpdateError

BinaryOperatorAggregate(Reducer channel)

class BinaryOperatorAggregate(BaseChannel[Value, Value, Value]):
    """透過二元運算子將新值與當前值聚合。"""

當 State 欄位使用 Annotated[type, reducer_func] 時建立。reducer 的簽名為 (current_value, new_value) -> updated_value,支援 Overwrite 類型直接繞過 reducer 覆寫值。初始化時會嘗試以型別的零值建立預設值,例如 list() 產生 []int() 產生 0

EphemeralValue(暫時值 channel)

用於觸發節點執行的一次性訊號,步後清除。每個 StateGraph 節點的 branch:to:{node_name} channel 使用此型別,這正是邊觸發的內部實作機制。

其他 channel 型別

  • Topic:PubSub 風格的 channel,支援跨步累積或每步重新收集,用於 Send/TASKS 機制中的任務分發
  • AnyValue:儲存最後收到的值,假設多個值都相等,不做唯一性檢查
  • NamedBarrierValue:等待多個命名來源都寫入後才觸發,用於多源等待邊 add_edge([start1, start2], end) 的實現

四、StateGraph:圖的建構

StateGraph 是使用者最常接觸的 API,定義在 langgraph/graph/state.py

class StateGraph(Generic[StateT, ContextT, InputT, OutputT]):
    """節點透過讀寫共享狀態進行通訊的圖。
    每個節點的簽名是 State -> Partial<State>。
    """

Schema 到 Channel 的映射

StateGraph 的精巧之處在於自動從 TypedDict 或 Pydantic BaseModel 的型別提示推導 channel 配置:

class State(TypedDict):
    messages: Annotated[list, add_messages]  # -> BinaryOperatorAggregate
    count: int                                # -> LastValue

_get_channels(schema) 函式解析型別提示,將 Annotated[list, operator.add] 映射為 BinaryOperatorAggregate,將無 Annotated 的欄位映射為 LastValue。這種型別驅動的設計讓使用者無需手動管理 channel。

節點與邊

節點定義支援多種簽名,最基本的形式是接受 state 並返回部分更新的函式:

def my_node(state: State) -> dict:
    return {"count": state["count"] + 1}

也支援帶 config、StreamWriter、Store 或 Runtime 的進階簽名。

邊的類型包括:

# 直接邊
graph.add_edge("node_a", "node_b")

# 條件邊
graph.add_conditional_edges(
    "source_node",
    path_function,          # (state) -> str | list[str]
    path_map={"key": "node_name"}
)

# 多源等待邊(透過 NamedBarrierValue channel 實現)
graph.add_edge(["node_a", "node_b"], "node_c")

編譯過程

compile() 方法將 StateGraph 轉換為可執行的 Pregel 圖:

compiled = graph.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["human_node"],
    interrupt_after=["tools"],
)

編譯過程依序完成驗證、建立 channel(包括每個節點的 branch:to:{node_name} EphemeralValue channel 和 TASKS channel)、將節點包裝為 PregelNode(設定 triggers、channels、writers),最終返回 CompiledStateGraph 物件。


五、Pregel 執行引擎

定義在 langgraph/pregel/main.py 的 Pregel 類別與 _loop.py 的 PregelLoop 構成了實際的執行引擎。

執行流程

Pregel.invoke() / stream()
  -> PregelLoop
    -> 載入 checkpoint
    -> channels_from_checkpoint()
    -> loop:
      -> prepare_next_tasks()     # 決定要執行的 task
      -> should_interrupt()       # 檢查是否需要中斷
      -> PregelRunner.tick()      # 並行執行 task
      -> apply_writes()           # 套用寫入到 channel
      -> create_checkpoint()      # 建立新的 checkpoint
    -> 輸出結果

版本追蹤機制

這是 Pregel 引擎最精妙的部分。Checkpoint 中維護兩個版本映射:

checkpoint["channel_versions"]  # {channel_name: version}
checkpoint["versions_seen"]     # {node_name: {channel_name: version}}

每次 channel 更新,其版本遞增。每個節點記錄它已「看過」的 channel 版本。prepare_next_tasks 透過比對 versions_seenchannel_versions,精確判定哪些節點需要重新執行。

apply_writes

寫入套用的流程嚴謹且具確定性:按 task path 排序確保穩定順序、更新 versions_seen、消費已讀的 channel、按 channel 分組寫入、呼叫每個 channel 的 update(values) 方法、更新 channel_versions,最後回傳被更新的 channel 集合以供下一步的 prepare_next_tasks 使用。


六、Checkpoint 系統

Checkpoint 是 LangGraph 實現 Durable Execution 的核心機制。

結構與介面

class Checkpoint(TypedDict):
    v: int                    # 版本號(目前 v4)
    id: str                   # UUID(單調遞增,可排序)
    ts: str                   # ISO 8601 時間戳
    channel_values: dict      # channel 序列化值
    channel_versions: dict    # channel 版本號
    versions_seen: dict       # 節點已見版本

框架提供三種 Checkpoint Saver 實作:InMemorySaver(測試用)、SqliteSaver 和 PostgresSaver(生產環境)。序列化使用 JsonPlusSerializer,支援基本 JSON 型別、datetime、UUID、Pydantic models、LangChain messages 等,也支援加密序列化。

Pending Writes 與 Durable Execution

CheckpointTuple 中的 pending_writes 記錄尚未被 checkpoint 消化的寫入。這是實現持久執行的關鍵:即使在節點執行中途失敗,已完成的寫入不會丟失,重啟後可以從失敗點繼續。

Time-Travel Debugging

透過 get_state_history() 可以遍歷圖的所有歷史 checkpoint,並透過指定特定 checkpoint_id 回到任意歷史狀態重新執行:

# 遍歷歷史
for state in graph.get_state_history(config):
    print(state.values, state.metadata["step"])

# 回溯到特定 checkpoint
config = {"configurable": {"thread_id": "1", "checkpoint_id": "some-old-id"}}
graph.invoke(None, config)  # 從該 checkpoint 恢復執行

七、Human-in-the-loop(人機協作)

LangGraph 提供了完整的人機協作機制,這是其相較於其他 Agent 框架最突出的能力之一。

interrupt() 函式

def human_review_node(state):
    answer = interrupt("你確定要執行此操作嗎?")
    if answer == "yes":
        return {"approved": True}
    return {"approved": False}

其工作原理值得深入理解:

  1. 節點呼叫 interrupt("請確認"),第一次呼叫時拋出 GraphInterrupt 例外
  2. 中斷值被序列化到 checkpoint 的 pending writes 中
  3. 客戶端收到中斷資訊後,使用 Command(resume="yes") 恢復
  4. 節點從頭重新執行,但 interrupt() 這次從 scratchpad 的 resume 列表中取得恢復值
  5. 返回恢復值,繼續正常執行

這裡有一個重要的設計細節:resume 時節點會完全重新執行,因此節點函式中 interrupt() 之前的邏輯必須是冪等的。

interrupt_before / interrupt_after

除了程式化的 interrupt() 呼叫,編譯時也可以宣告式地指定中斷點:

graph.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["tools"],    # 在 tools 節點執行前暫停
    interrupt_after=["agent"],     # 在 agent 節點執行後暫停
)

框架透過 should_interrupt() 函式檢查是否有 channel 自上次中斷以來被更新,以及被觸發的節點是否在 interrupt 列表中,藉此決定是否暫停。


八、Streaming 系統

LangGraph 支援多種串流模式,透過 StreamMode 控制:

StreamMode = Literal[
    "values",       # 每步後發出完整狀態
    "updates",      # 每步後只發出變更
    "messages",     # LLM token 逐一發出
    "custom",       # 自訂 stream(透過 StreamWriter)
    "debug",        # checkpoints + tasks
]

stream_mode="messages" 透過 StreamMessagesHandler 攔截 LangChain callback 中的 LLM token 輸出,實現即時串流。stream_mode="custom" 則允許節點透過注入的 StreamWriter 手動寫入自訂資料。


九、ToolNode 與 create_react_agent

ToolNode

ToolNode 定義在 langgraph/prebuilt/tool_node.py,是預建的工具執行節點:

class ToolNode(RunnableCallable):
    """在 LangGraph 工作流中執行工具的節點。"""

核心功能包括:多個 tool call 的平行執行、多種錯誤處理策略(bool/str/Callable/Exception type)、透過 InjectedState 將圖狀態注入工具、支援 Command 回傳值控制圖的流程,以及 wrap_tool_call 中介層模式。

@tool
def my_tool(query: str, state: Annotated[dict, InjectedState]) -> str:
    """需要存取圖狀態的工具。"""
    return f"Messages count: {len(state['messages'])}"

create_react_agent

create_react_agent 定義在 langgraph/prebuilt/chat_agent_executor.py,是理解 LangGraph 實作模式的最佳範例。其圖結構為:

START -> agent -> should_continue -> tools -> agent -> ... -> END

核心路由邏輯判斷 AI 回覆中是否包含 tool calls:

def should_continue(state):
    last_message = state["messages"][-1]
    if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
        return END
    else:
        # v2: 每個 tool call 透過 Send 獨立分發
        return [
            Send("tools", ToolCallWithContext(
                tool_call=call, state=state,
            ))
            for call in last_message.tool_calls
        ]

v2 版本使用 Send API 將每個 tool call 分發到獨立的 ToolNode 實例,相較 v1 的批次處理,每個 tool call 可以獨立被中斷/恢復,實現更精細的 Human-in-the-loop 控制。


十、Functional API

除了圖形化的 StateGraph API,LangGraph 還提供了更接近一般程式設計風格的 Functional API。

@task 與 @entrypoint

@task
def add_one(a: int) -> int:
    return a + 1

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(topic: str, *, previous: str | None = None) -> str:
    essay = compose_essay(topic).result()
    review = interrupt({"question": "Review this?", "essay": essay})
    return {"essay": essay, "review": review}

@task 裝飾的函式呼叫後返回 SyncAsyncFuture,支援並行執行,也支援 retry_policy 和 cache_policy。@entrypoint 定義工作流入口,支援 previous 參數存取同一 thread 上次的返回值,以及 entrypoint.final() 分離返回值和持久化值。

Functional API 在內部同樣編譯為 Pregel 圖,享有完整的 checkpoint 和 interrupt/resume 支援,但以更直覺的函式風格撰寫。


十一、Send 機制(Map-Reduce)

Send 是實現 map-reduce 模式的關鍵機制:

class Send:
    node: str  # 目標節點名稱
    arg: Any   # 傳給節點的輸入

def continue_to_jokes(state):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

graph.add_conditional_edges(START, continue_to_jokes)

Send 允許同一節點被多次並行調用,每次使用不同的輸入。Send 物件被寫入 TASKS channel(Topic(Send, accumulate=False)),在下一步由 prepare_next_tasks 處理為獨立的 task。


十二、技術亮點

Pregel 模型的選擇

以圖計算領域的 BSP 模型作為 Agent 執行引擎,在 AI Agent 框架中是獨一無二的選擇。這帶來了確定性(步內寫入互不可見)、可重放性(精確重現任何步的狀態)和可擴展性(天然並行)。

Channel 作為一等公民

Channel 不僅是資料容器,更是控制流的核心。trigger 機制透過 channel 更新觸發節點執行,版本追蹤精確知道每個節點「看過」哪些資料,而不同 channel 型別提供了不同的聚合語意(覆寫、聚合、暫存、屏障)。

Command 統一控制流

Command 類型統一了狀態更新(update)、節點導航(goto)、中斷恢復(resume)和跨圖通訊(graph=Command.PARENT),是 v0.2+ 引入的重要抽象,取代了先前 conditional edges 在某些場景下的使用。

型別驅動的 Schema 推導

Annotated[type, reducer] 的設計讓使用者在定義 TypedDict 時就完成了 channel 的配置,既直覺又強大。框架自動處理 channel 建立、預設值初始化和序列化邏輯。


十三、限制與觀察

學習曲線

Pregel 計算模型對不熟悉圖計算的開發者而言抽象度較高。Channel 系統的內部運作需要深入理解才能有效除錯。StateGraph API 雖然簡化了表面使用,但底層 Pregel 的行為(版本追蹤、step 概念、BSP 的可見性規則)在遇到問題時仍需理解。

LangChain 依賴

核心依賴 langchain-core(RunnableConfig、Messages、Callbacks)。雖然可以不使用 LangChain 的 LLM 抽象,但 message 型別系統深度綁定。create_react_agent 需要 BaseChatModel 介面,這使得與非 LangChain 生態的 LLM 客戶端整合需要額外的適配工作。

效能考量

每步都需要序列化/反序列化 checkpoint(如果啟用),channel 版本比對是 O(channels * nodes) 的操作,Send 機制的 map-reduce 會產生大量 task。在高頻呼叫或大規模狀態的場景下,這些開銷可能值得關注。

快速演進中的 API

框架仍在快速演進,多個 API 經歷了 deprecation 周期:MessageGraphStateGraph + messages key 取代、config_schemacontext_schema 取代、create_react_agent 從 prebuilt 移至 langchain.agents。checkpoint 格式也已經迭代到 v4,每次升級都伴隨遷移需求。

除錯挑戰

錯誤堆疊中包含大量框架內部幀(Pregel loop、runner、executor),InvalidUpdateError 的錯誤訊息有時不夠清晰,加上 BSP 模型中 channel 狀態在步內的不可見性,都可能讓開發者在除錯時感到困惑。Time-travel debugging 和 LangSmith 整合在一定程度上緩解了這個問題,但前提是要正確配置 checkpointer。


十四、總結

LangGraph 以 Pregel 計算模型為基礎,建構了一套在 Agent 框架中獨樹一幟的架構。Channel 系統提供了精確的狀態管理,Checkpoint 機制實現了真正的持久執行,而 interrupt/resume 則讓人機協作成為一等能力。

對於需要建構複雜、長時間運行、需要人類介入的 Agent 工作流的團隊而言,LangGraph 提供了其他框架難以匹敵的基礎設施。但其學習曲線和快速演進的 API 也意味著團隊需要投入時間深入理解底層機制,才能真正發揮這套框架的全部潛力。

選擇 LangGraph 的關鍵考量在於:你的 Agent 系統是否真正需要持久執行、人機協作和精細的狀態管理?如果答案是肯定的,那麼理解 Pregel 模型和 Channel 系統的投資將會得到豐厚回報。