LangGraph 深度解析:以 Pregel 計算模型驅動的 Agent 編排框架
LangGraph 是 LangChain 團隊開發的低階 Agent 編排框架,專為建構長時間運行、有狀態的 Agent 工作流而設計。它受 Google Pregel 論文與 Apache Beam 啟發,將 Agent 的執行過程建模為有向圖上的狀態流動。本文從原始碼角度,深入剖析其核心架構與關鍵設計決策。
一、定位與差異化
LangGraph 雖然出自 LangChain 團隊,但並非 LangChain 的一部分,可以獨立使用。它的核心定位是一個低階編排框架:不抽象 prompt 或架構模式,而是提供狀態管理、持久化、中斷/恢復等基礎設施。
其差異化的核心價值在於四個方面:
- Durable Execution:透過 Checkpoint 實現持久執行,即使在中途失敗也能恢復
- Human-in-the-loop:內建 interrupt/resume 機制,支援人機協作
- Comprehensive Memory:Channel 系統提供細粒度的狀態管理
- Debug with LangSmith:整合 LangSmith 實現 Time-travel debugging
在 Monorepo 結構上,LangGraph 將核心框架(StateGraph、Pregel 引擎、Channels)、高階預建 API(create_react_agent、ToolNode)、Checkpoint 實作(SQLite、PostgreSQL)以及 SDK 與 CLI 分別組織在 libs/ 目錄下的獨立套件中。
二、Pregel 計算模型(BSP)
LangGraph 最獨特的設計選擇,是採用 Pregel 論文中的 BSP(Bulk Synchronous Parallel) 模型作為執行引擎。這在 AI Agent 框架中是極為罕見的。
執行步驟
每一步(step)包含三個階段:
- Plan(規劃):決定本步要執行哪些節點。第一步選擇訂閱 input channel 的節點;後續步選擇訂閱上一步更新過的 channel 的節點。
- Execute(執行):並行執行所有選中的節點。節點執行期間,channel 的更新對其他節點不可見。
- Update(更新):將所有節點的寫入統一套用到 channel。
這個迴圈持續進行,直到沒有節點被選中(圖完成),或達到最大步數限制(recursion_limit,預設 25)。
兩大抽象:Actor 與 Channel
整個 Pregel 模型建立在兩個核心抽象之上:
Actor(PregelNode) 是計算單元,訂閱一個或多個 channel,讀取 channel 資料作為輸入,執行邏輯後寫入 channel。它實作了 LangChain 的 Runnable 介面。
Channel(BaseChannel) 是 Actor 之間通訊的媒介,每個 channel 定義了三個型別:Value(儲存型別)、Update(更新型別)與 Checkpoint(序列化型別)。核心方法包含 update(values)、get()、checkpoint() 和 from_checkpoint()。
BSP 模型帶來的關鍵優勢是確定性:步內寫入互不可見,消除了競態條件;透過 checkpoint 可以精確重現任何一步的狀態;天然支援並行執行。
三、Channel 系統
Channel 是 LangGraph 狀態管理的基石,定義在 langgraph/channels/ 目錄下。框架提供了多種 channel 型別,每種承擔不同的語意角色。
LastValue(預設 channel)
class LastValue(BaseChannel[Value, Value, Value]):
"""儲存最後收到的值,每步只能收到一個值。"""
這是最基本的 channel,StateGraph 中沒有指定 reducer 的欄位預設使用。關鍵限制是每步只接受一個更新,如果多個節點同時寫入同一個 LastValue channel,會拋出 InvalidUpdateError。
BinaryOperatorAggregate(Reducer channel)
class BinaryOperatorAggregate(BaseChannel[Value, Value, Value]):
"""透過二元運算子將新值與當前值聚合。"""
當 State 欄位使用 Annotated[type, reducer_func] 時建立。reducer 的簽名為 (current_value, new_value) -> updated_value,支援 Overwrite 類型直接繞過 reducer 覆寫值。初始化時會嘗試以型別的零值建立預設值,例如 list() 產生 []、int() 產生 0。
EphemeralValue(暫時值 channel)
用於觸發節點執行的一次性訊號,步後清除。每個 StateGraph 節點的 branch:to:{node_name} channel 使用此型別,這正是邊觸發的內部實作機制。
其他 channel 型別
- Topic:PubSub 風格的 channel,支援跨步累積或每步重新收集,用於 Send/TASKS 機制中的任務分發
- AnyValue:儲存最後收到的值,假設多個值都相等,不做唯一性檢查
- NamedBarrierValue:等待多個命名來源都寫入後才觸發,用於多源等待邊
add_edge([start1, start2], end)的實現
四、StateGraph:圖的建構
StateGraph 是使用者最常接觸的 API,定義在 langgraph/graph/state.py:
class StateGraph(Generic[StateT, ContextT, InputT, OutputT]):
"""節點透過讀寫共享狀態進行通訊的圖。
每個節點的簽名是 State -> Partial<State>。
"""
Schema 到 Channel 的映射
StateGraph 的精巧之處在於自動從 TypedDict 或 Pydantic BaseModel 的型別提示推導 channel 配置:
class State(TypedDict):
messages: Annotated[list, add_messages] # -> BinaryOperatorAggregate
count: int # -> LastValue
_get_channels(schema) 函式解析型別提示,將 Annotated[list, operator.add] 映射為 BinaryOperatorAggregate,將無 Annotated 的欄位映射為 LastValue。這種型別驅動的設計讓使用者無需手動管理 channel。
節點與邊
節點定義支援多種簽名,最基本的形式是接受 state 並返回部分更新的函式:
def my_node(state: State) -> dict:
return {"count": state["count"] + 1}
也支援帶 config、StreamWriter、Store 或 Runtime 的進階簽名。
邊的類型包括:
# 直接邊
graph.add_edge("node_a", "node_b")
# 條件邊
graph.add_conditional_edges(
"source_node",
path_function, # (state) -> str | list[str]
path_map={"key": "node_name"}
)
# 多源等待邊(透過 NamedBarrierValue channel 實現)
graph.add_edge(["node_a", "node_b"], "node_c")
編譯過程
compile() 方法將 StateGraph 轉換為可執行的 Pregel 圖:
compiled = graph.compile(
checkpointer=InMemorySaver(),
interrupt_before=["human_node"],
interrupt_after=["tools"],
)
編譯過程依序完成驗證、建立 channel(包括每個節點的 branch:to:{node_name} EphemeralValue channel 和 TASKS channel)、將節點包裝為 PregelNode(設定 triggers、channels、writers),最終返回 CompiledStateGraph 物件。
五、Pregel 執行引擎
定義在 langgraph/pregel/main.py 的 Pregel 類別與 _loop.py 的 PregelLoop 構成了實際的執行引擎。
執行流程
Pregel.invoke() / stream()
-> PregelLoop
-> 載入 checkpoint
-> channels_from_checkpoint()
-> loop:
-> prepare_next_tasks() # 決定要執行的 task
-> should_interrupt() # 檢查是否需要中斷
-> PregelRunner.tick() # 並行執行 task
-> apply_writes() # 套用寫入到 channel
-> create_checkpoint() # 建立新的 checkpoint
-> 輸出結果
版本追蹤機制
這是 Pregel 引擎最精妙的部分。Checkpoint 中維護兩個版本映射:
checkpoint["channel_versions"] # {channel_name: version}
checkpoint["versions_seen"] # {node_name: {channel_name: version}}
每次 channel 更新,其版本遞增。每個節點記錄它已「看過」的 channel 版本。prepare_next_tasks 透過比對 versions_seen 與 channel_versions,精確判定哪些節點需要重新執行。
apply_writes
寫入套用的流程嚴謹且具確定性:按 task path 排序確保穩定順序、更新 versions_seen、消費已讀的 channel、按 channel 分組寫入、呼叫每個 channel 的 update(values) 方法、更新 channel_versions,最後回傳被更新的 channel 集合以供下一步的 prepare_next_tasks 使用。
六、Checkpoint 系統
Checkpoint 是 LangGraph 實現 Durable Execution 的核心機制。
結構與介面
class Checkpoint(TypedDict):
v: int # 版本號(目前 v4)
id: str # UUID(單調遞增,可排序)
ts: str # ISO 8601 時間戳
channel_values: dict # channel 序列化值
channel_versions: dict # channel 版本號
versions_seen: dict # 節點已見版本
框架提供三種 Checkpoint Saver 實作:InMemorySaver(測試用)、SqliteSaver 和 PostgresSaver(生產環境)。序列化使用 JsonPlusSerializer,支援基本 JSON 型別、datetime、UUID、Pydantic models、LangChain messages 等,也支援加密序列化。
Pending Writes 與 Durable Execution
CheckpointTuple 中的 pending_writes 記錄尚未被 checkpoint 消化的寫入。這是實現持久執行的關鍵:即使在節點執行中途失敗,已完成的寫入不會丟失,重啟後可以從失敗點繼續。
Time-Travel Debugging
透過 get_state_history() 可以遍歷圖的所有歷史 checkpoint,並透過指定特定 checkpoint_id 回到任意歷史狀態重新執行:
# 遍歷歷史
for state in graph.get_state_history(config):
print(state.values, state.metadata["step"])
# 回溯到特定 checkpoint
config = {"configurable": {"thread_id": "1", "checkpoint_id": "some-old-id"}}
graph.invoke(None, config) # 從該 checkpoint 恢復執行
七、Human-in-the-loop(人機協作)
LangGraph 提供了完整的人機協作機制,這是其相較於其他 Agent 框架最突出的能力之一。
interrupt() 函式
def human_review_node(state):
answer = interrupt("你確定要執行此操作嗎?")
if answer == "yes":
return {"approved": True}
return {"approved": False}
其工作原理值得深入理解:
- 節點呼叫
interrupt("請確認"),第一次呼叫時拋出GraphInterrupt例外 - 中斷值被序列化到 checkpoint 的 pending writes 中
- 客戶端收到中斷資訊後,使用
Command(resume="yes")恢復 - 節點從頭重新執行,但
interrupt()這次從 scratchpad 的 resume 列表中取得恢復值 - 返回恢復值,繼續正常執行
這裡有一個重要的設計細節:resume 時節點會完全重新執行,因此節點函式中 interrupt() 之前的邏輯必須是冪等的。
interrupt_before / interrupt_after
除了程式化的 interrupt() 呼叫,編譯時也可以宣告式地指定中斷點:
graph.compile(
checkpointer=InMemorySaver(),
interrupt_before=["tools"], # 在 tools 節點執行前暫停
interrupt_after=["agent"], # 在 agent 節點執行後暫停
)
框架透過 should_interrupt() 函式檢查是否有 channel 自上次中斷以來被更新,以及被觸發的節點是否在 interrupt 列表中,藉此決定是否暫停。
八、Streaming 系統
LangGraph 支援多種串流模式,透過 StreamMode 控制:
StreamMode = Literal[
"values", # 每步後發出完整狀態
"updates", # 每步後只發出變更
"messages", # LLM token 逐一發出
"custom", # 自訂 stream(透過 StreamWriter)
"debug", # checkpoints + tasks
]
stream_mode="messages" 透過 StreamMessagesHandler 攔截 LangChain callback 中的 LLM token 輸出,實現即時串流。stream_mode="custom" 則允許節點透過注入的 StreamWriter 手動寫入自訂資料。
九、ToolNode 與 create_react_agent
ToolNode
ToolNode 定義在 langgraph/prebuilt/tool_node.py,是預建的工具執行節點:
class ToolNode(RunnableCallable):
"""在 LangGraph 工作流中執行工具的節點。"""
核心功能包括:多個 tool call 的平行執行、多種錯誤處理策略(bool/str/Callable/Exception type)、透過 InjectedState 將圖狀態注入工具、支援 Command 回傳值控制圖的流程,以及 wrap_tool_call 中介層模式。
@tool
def my_tool(query: str, state: Annotated[dict, InjectedState]) -> str:
"""需要存取圖狀態的工具。"""
return f"Messages count: {len(state['messages'])}"
create_react_agent
create_react_agent 定義在 langgraph/prebuilt/chat_agent_executor.py,是理解 LangGraph 實作模式的最佳範例。其圖結構為:
START -> agent -> should_continue -> tools -> agent -> ... -> END
核心路由邏輯判斷 AI 回覆中是否包含 tool calls:
def should_continue(state):
last_message = state["messages"][-1]
if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
return END
else:
# v2: 每個 tool call 透過 Send 獨立分發
return [
Send("tools", ToolCallWithContext(
tool_call=call, state=state,
))
for call in last_message.tool_calls
]
v2 版本使用 Send API 將每個 tool call 分發到獨立的 ToolNode 實例,相較 v1 的批次處理,每個 tool call 可以獨立被中斷/恢復,實現更精細的 Human-in-the-loop 控制。
十、Functional API
除了圖形化的 StateGraph API,LangGraph 還提供了更接近一般程式設計風格的 Functional API。
@task 與 @entrypoint
@task
def add_one(a: int) -> int:
return a + 1
@entrypoint(checkpointer=InMemorySaver())
def my_workflow(topic: str, *, previous: str | None = None) -> str:
essay = compose_essay(topic).result()
review = interrupt({"question": "Review this?", "essay": essay})
return {"essay": essay, "review": review}
@task 裝飾的函式呼叫後返回 SyncAsyncFuture,支援並行執行,也支援 retry_policy 和 cache_policy。@entrypoint 定義工作流入口,支援 previous 參數存取同一 thread 上次的返回值,以及 entrypoint.final() 分離返回值和持久化值。
Functional API 在內部同樣編譯為 Pregel 圖,享有完整的 checkpoint 和 interrupt/resume 支援,但以更直覺的函式風格撰寫。
十一、Send 機制(Map-Reduce)
Send 是實現 map-reduce 模式的關鍵機制:
class Send:
node: str # 目標節點名稱
arg: Any # 傳給節點的輸入
def continue_to_jokes(state):
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
graph.add_conditional_edges(START, continue_to_jokes)
Send 允許同一節點被多次並行調用,每次使用不同的輸入。Send 物件被寫入 TASKS channel(Topic(Send, accumulate=False)),在下一步由 prepare_next_tasks 處理為獨立的 task。
十二、技術亮點
Pregel 模型的選擇
以圖計算領域的 BSP 模型作為 Agent 執行引擎,在 AI Agent 框架中是獨一無二的選擇。這帶來了確定性(步內寫入互不可見)、可重放性(精確重現任何步的狀態)和可擴展性(天然並行)。
Channel 作為一等公民
Channel 不僅是資料容器,更是控制流的核心。trigger 機制透過 channel 更新觸發節點執行,版本追蹤精確知道每個節點「看過」哪些資料,而不同 channel 型別提供了不同的聚合語意(覆寫、聚合、暫存、屏障)。
Command 統一控制流
Command 類型統一了狀態更新(update)、節點導航(goto)、中斷恢復(resume)和跨圖通訊(graph=Command.PARENT),是 v0.2+ 引入的重要抽象,取代了先前 conditional edges 在某些場景下的使用。
型別驅動的 Schema 推導
Annotated[type, reducer] 的設計讓使用者在定義 TypedDict 時就完成了 channel 的配置,既直覺又強大。框架自動處理 channel 建立、預設值初始化和序列化邏輯。
十三、限制與觀察
學習曲線
Pregel 計算模型對不熟悉圖計算的開發者而言抽象度較高。Channel 系統的內部運作需要深入理解才能有效除錯。StateGraph API 雖然簡化了表面使用,但底層 Pregel 的行為(版本追蹤、step 概念、BSP 的可見性規則)在遇到問題時仍需理解。
LangChain 依賴
核心依賴 langchain-core(RunnableConfig、Messages、Callbacks)。雖然可以不使用 LangChain 的 LLM 抽象,但 message 型別系統深度綁定。create_react_agent 需要 BaseChatModel 介面,這使得與非 LangChain 生態的 LLM 客戶端整合需要額外的適配工作。
效能考量
每步都需要序列化/反序列化 checkpoint(如果啟用),channel 版本比對是 O(channels * nodes) 的操作,Send 機制的 map-reduce 會產生大量 task。在高頻呼叫或大規模狀態的場景下,這些開銷可能值得關注。
快速演進中的 API
框架仍在快速演進,多個 API 經歷了 deprecation 周期:MessageGraph 被 StateGraph + messages key 取代、config_schema 被 context_schema 取代、create_react_agent 從 prebuilt 移至 langchain.agents。checkpoint 格式也已經迭代到 v4,每次升級都伴隨遷移需求。
除錯挑戰
錯誤堆疊中包含大量框架內部幀(Pregel loop、runner、executor),InvalidUpdateError 的錯誤訊息有時不夠清晰,加上 BSP 模型中 channel 狀態在步內的不可見性,都可能讓開發者在除錯時感到困惑。Time-travel debugging 和 LangSmith 整合在一定程度上緩解了這個問題,但前提是要正確配置 checkpointer。
十四、總結
LangGraph 以 Pregel 計算模型為基礎,建構了一套在 Agent 框架中獨樹一幟的架構。Channel 系統提供了精確的狀態管理,Checkpoint 機制實現了真正的持久執行,而 interrupt/resume 則讓人機協作成為一等能力。
對於需要建構複雜、長時間運行、需要人類介入的 Agent 工作流的團隊而言,LangGraph 提供了其他框架難以匹敵的基礎設施。但其學習曲線和快速演進的 API 也意味著團隊需要投入時間深入理解底層機制,才能真正發揮這套框架的全部潛力。
選擇 LangGraph 的關鍵考量在於:你的 Agent 系統是否真正需要持久執行、人機協作和精細的狀態管理?如果答案是肯定的,那麼理解 Pregel 模型和 Channel 系統的投資將會得到豐厚回報。