LangGraph 架構深度解析:從 BSP 執行模型到有狀態 AI 工作流
0. 為什麼 LangGraph 值得深入研究?
LangGraph 不是另一個「LangChain 插件」,而是一個獨立的低級編排框架,其設計借鑒了 Google 的 Pregel 分布式圖處理算法。如果你想理解現代 AI Agent 工作流的底層機制,LangGraph 的設計是極佳的學習對象。
LangGraph 的核心價值:
- 基於 Pregel 的 BSP 執行模型:確定性、可恢復、無競態條件
- Channel-based 狀態管理:靈活的狀態聚合策略
- 原生持久化支持:自動 Checkpoint,支持時間旅行
- 人在迴圈:靈活的中斷/恢復機制
- 多代理支持:Supervisor、Network、Hierarchical 等模式
整體定位可以用一張圖表示:
flowchart TD
classDef pad fill:transparent,stroke:transparent,color:transparent;
subgraph LangChainCore["langchain-core"]
direction TB
core_pad[" "]:::pad
LLM["BaseChatModel"]
Tools["BaseTool"]
Messages["BaseMessage"]
end
subgraph LangGraph["LangGraph (編排層)"]
direction TB
lg_pad[" "]:::pad
StateGraph["StateGraph / MessageGraph"]
Pregel["Pregel Engine"]
Channels["Channels"]
Checkpoint["Checkpointer"]
end
subgraph UserApp["用戶應用"]
direction TB
user_pad[" "]:::pad
Agent["AI Agent"]
Workflow["Workflow"]
end
LangChainCore --> LangGraph
LangGraph --> UserApp
接下來深入探討 LangGraph 的核心架構。
1. 專案結構與規模
LangGraph 的核心代碼約有 15,000 行,集中在 Pregel 執行引擎:
langgraph/
├── libs/
│ ├── langgraph/langgraph/ # 核心庫 (~15K 行)
│ │ ├── graph/ # 圖定義 API
│ │ │ ├── state.py # StateGraph 類 (1,483 行)
│ │ │ ├── message.py # 消息處理和 add_messages
│ │ │ └── _branch.py # 條件邊邏輯
│ │ ├── pregel/ # 執行引擎 (~10.5K 行)
│ │ │ ├── main.py # Pregel 主類 (3,322 行)
│ │ │ ├── _loop.py # 執行循環 (1,328 行)
│ │ │ ├── _algo.py # 圖算法 (1,233 行)
│ │ │ ├── _runner.py # 任務執行器
│ │ │ ├── _read.py # 通道讀取
│ │ │ ├── _write.py # 通道寫入
│ │ │ └── _retry.py # 重試機制
│ │ ├── channels/ # 狀態通道
│ │ │ ├── base.py # 基礎通道接口
│ │ │ ├── binop.py # BinaryOperatorAggregate
│ │ │ ├── last_value.py # LastValue 通道
│ │ │ ├── topic.py # Topic 通道
│ │ │ └── ephemeral.py # 臨時通道
│ │ ├── managed/ # 受管理的值
│ │ ├── func/ # 函數式 API
│ │ ├── types.py # 核心類型定義 (584 行)
│ │ └── config.py # 配置管理
│ ├── checkpoint/ # 檢查點基礎接口
│ ├── checkpoint-sqlite/ # SQLite 實現
│ ├── checkpoint-postgres/ # PostgreSQL 實現
│ └── prebuilt/ # 預構建組件
├── docs/ # 文檔
└── tests/ # 測試套件
版本資訊:
- 許可證:MIT
- Python 要求:>= 3.9
- 核心依賴:langchain-core >= 0.3.0
2. Pregel / BSP 執行模型
2.1 什麼是 BSP?
Bulk Synchronous Parallel (BSP) 是 LangGraph 的核心執行模型,源自 Google Pregel 論文。其核心思想是:
- 執行分為多個超步 (Super-step)
- 每個超步內,節點並行執行
- 超步之間有全局同步點 (Barrier)
- 節點只能讀取上一超步的狀態快照
這個設計帶來的優勢:
- 確定性:相同輸入、相同路徑、相同結果
- 無競態條件:節點執行階段不會互相干擾
- 易恢復:任意超步都可作為檢查點恢復
2.2 超步執行流程
每個超步包含四個階段:
flowchart TD
Start([開始])
Plan["1. Plan<br/>決定活躍節點"]
Exec["2. Execute<br/>並行執行節點"]
Update["3. Update<br/>套用通道更新"]
Checkpoint["4. Checkpoint<br/>保存狀態"]
Loop{還有活躍節點?}
End([結束])
Start --> Plan --> Exec --> Update --> Checkpoint --> Loop
Loop -->|是| Plan
Loop -->|否| End
詳細說明:
1. Plan 階段
根據通道版本決定哪些節點需要執行:
# 節點激活條件:
# 存在某個通道 c,使得 channel_versions[c] > versions_seen[node][c]
checkpoint = {
"channel_versions": {
"input": 2,
"messages": 5,
"state": 3,
},
"versions_seen": {
"chatbot": {"input": 2, "messages": 4}, # chatbot 需要執行
"tools": {"messages": 5}, # tools 不需要執行
}
}
這種設計使得活躍節點判斷只需要 O(1) 時間,不需要掃描完整狀態。
2. Execute 階段
使用 ThreadPoolExecutor(同步)或原生 asyncio(異步)並行執行活躍節點:
# 關鍵代碼示意
class AsyncBackgroundExecutor:
def __init__(self, config):
self.loop = asyncio.get_running_loop()
self.semaphore = asyncio.Semaphore(max_concurrency)
async def submit(self, coro):
async with self.semaphore:
return await coro
關鍵設計:節點執行中只能讀取「上一超步」的通道狀態,所有寫入都緩衝到本地。
3. Update 階段
收集所有節點的寫入,通過 Channel 的聚合策略合併:
def apply_writes(checkpoint, channels, tasks):
for task in tasks:
for channel_name, value in task.writes:
channels[channel_name].update([value])
return updated_channels
4. Checkpoint 階段
保存當前狀態到持久化存儲(可選同步或異步):
checkpoint = {
"v": 1, # 版本
"id": "checkpoint_001", # 唯一 ID
"ts": "2025-12-17T10:00:00Z", # 時間戳
"channel_values": {...}, # 通道值
"channel_versions": {...}, # 通道版本
"versions_seen": {...}, # 節點已見版本
}
2.3 BSP 如何避免競態條件
讓我用一個具體例子說明:
假設場景:三個節點同時執行,都要讀寫同一個 channel counter
# 初始值: counter = 10
def node_A(state):
val = state["counter"] # 讀
return {"counter": val + 1} # 寫 +1
def node_B(state):
val = state["counter"] # 讀
return {"counter": val + 2} # 寫 +2
def node_C(state):
val = state["counter"] # 讀
return {"counter": val + 3} # 寫 +3
傳統共享內存方式(有競態條件):
時間軸 → T1 T2 T3 T4
Node A: 讀=10 寫=11
Node B: 讀=11 寫=13
Node C: 讀=13 寫=16
結果: counter = 16 (依賴執行順序)
問題:
- 執行順序不同 → 結果不同(不確定性)
- 需要加鎖 → 效能差
- A 和 B 同時讀到 10,都寫回去,一個會被覆蓋
BSP 模型(無競態條件):
flowchart TD
subgraph SuperStep["超步 N"]
Snapshot["快照: counter = 10"]
subgraph Parallel["並行執行"]
A["Node A: 讀=10 → 緩衝 +1"]
B["Node B: 讀=10 → 緩衝 +2"]
C["Node C: 讀=10 → 緩衝 +3"]
end
Barrier["Barrier (等待完成)"]
Apply["apply_writes()<br/>合併: 10 + 1 + 2 + 3 = 16"]
end
NextStep["超步 N+1: counter = 16"]
Snapshot --> Parallel --> Barrier --> Apply --> NextStep
關鍵差異:
| 傳統共享內存 | BSP 模型 |
|---|---|
| 讀到「當前」值(可能被改過) | 讀到「超步開始時的快照」 |
| 寫立即生效 | 寫入本地緩衝,超步結束後批量合併 |
| 需要鎖/原子操作 | 無需鎖,天然並行安全 |
| 結果依賴執行順序 | 結果確定性(與執行順序無關) |
3. Channel 狀態系統
3.1 Channel 的設計理念
LangGraph 不直接操作「變數」,而是操作「通道(Channel)」。每個 State 欄位背後,是一個對應的 Channel,負責定義「如何合併多個節點的更新」。
3.2 Channel 類型層次
BaseChannel[Value, Update, Checkpoint]
├── LastValue[V] # 保留最新值(默認)
├── BinaryOperatorAggregate[V] # 二元運算聚合
├── Topic[V] # 發佈-訂閱
├── NamedBarrierValue[V] # 同步屏障
├── EphemeralValue[V] # 臨時值(不持久化)
└── AnyValue[V] # 接受任何更新
3.3 使用 Annotated 定義 Reducer
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add
class State(TypedDict):
messages: Annotated[list, add] # 使用 add 作為 reducer → 追加
count: int # 無 reducer → 覆蓋 (LastValue)
items: Annotated[list, lambda a, b: a + b] # 自定義 reducer
builder = StateGraph(State)
3.4 BinaryOperatorAggregate 實現
class BinaryOperatorAggregate(BaseChannel[Value, Value, Value]):
"""使用二元運算符聚合多個更新"""
def __init__(self, operator: Callable[[Value, Value], Value]):
self.operator = operator
self.value = None
def update(self, values: Sequence[Value]) -> bool:
"""按順序套用所有更新"""
for value in values:
if self.value is None:
self.value = value
else:
self.value = self.operator(self.value, value)
return True
def get(self) -> Value:
return self.value
3.5 Channel 與 State 的對應關係
flowchart LR
subgraph State["State (TypedDict)"]
M["messages<br/>Annotated[list, add]"]
C["count<br/>int"]
end
subgraph Channels["Channels"]
MC["BinaryOperatorAggregate(add)"]
CC["LastValue"]
end
M --> MC
C --> CC
4. StateGraph:圖定義 API
4.1 基本用法
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
def chatbot(state: State):
response = llm.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state: State):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# 構建圖
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("tools", tool_node)
# 定義邊
builder.add_edge(START, "chatbot")
builder.add_conditional_edges("chatbot", should_continue, {
"tools": "tools",
END: END
})
builder.add_edge("tools", "chatbot")
# 編譯
graph = builder.compile(checkpointer=checkpointer)
4.2 關鍵方法
| 方法 | 用途 | 示例 |
|---|---|---|
add_node(name, func) | 添加節點 | builder.add_node("chatbot", chatbot_func) |
add_edge(from, to) | 添加直接邊 | builder.add_edge("tools", "chatbot") |
add_conditional_edges(from, condition, mapping) | 添加條件路由 | 見上例 |
compile(checkpointer, interrupt_before, ...) | 編譯為可執行圖 | graph = builder.compile() |
4.3 圖的可視化
# 生成 Mermaid 圖
print(graph.get_graph().draw_mermaid())
# 生成 PNG 圖片
graph.get_graph().draw_mermaid_png(output_file_path="graph.png")
5. Checkpoint 持久化系統
5.1 Checkpoint 數據結構
class Checkpoint(TypedDict):
v: int # Checkpoint 版本
id: str # 單調遞增 ID
ts: str # ISO 時間戳
channel_values: dict[str, Any] # 通道值
channel_versions: dict[str, int] # 每個通道的版本
versions_seen: dict[str, dict[str, int]] # 節點已見版本
5.2 Checkpointer 實現
LangGraph 提供多種 Checkpointer:
# 內存(開發環境)
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
# SQLite(單機持久化)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("./checkpoints.db")
# PostgreSQL(生產環境)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)
5.3 時間旅行:回滾到任意檢查點
# 列出所有檢查點
config = {"configurable": {"thread_id": "user_123"}}
for checkpoint in graph.get_state_history(config):
print(f"ID: {checkpoint.config['configurable']['checkpoint_id']}")
print(f"State: {checkpoint.values}")
# 回滾到特定檢查點
old_config = {
"configurable": {
"thread_id": "user_123",
"checkpoint_id": "checkpoint_001"
}
}
state = graph.get_state(old_config)
# 從該檢查點繼續執行
for event in graph.stream(None, old_config):
print(event)
6. 人在迴圈 (Human-in-the-Loop)
6.1 靜態中斷點
在編譯時指定中斷位置:
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["tools"], # 在 tools 節點前中斷
interrupt_after=["chatbot"], # 在 chatbot 節點後中斷
)
6.2 動態中斷
在節點內動態決定是否中斷:
from langgraph.types import interrupt
def review_node(state: State):
# 動態中斷,等待人工輸入
approval = interrupt({
"question": "Approve this action?",
"data": state["pending_action"]
})
if approval == "yes":
return {"approved": True}
return {"approved": False}
6.3 中斷與恢復流程
flowchart TD
Start([開始]) --> Run["執行圖"]
Run --> Check{遇到中斷點?}
Check -->|否| Continue["繼續執行"]
Check -->|是| Save["保存 Checkpoint"]
Save --> Wait["等待人工輸入"]
Wait --> Resume["恢復執行"]
Resume --> Run
Continue --> End([結束])
6.4 恢復執行
config = {"configurable": {"thread_id": "user_123"}}
# 第一次執行 - 在中斷點停止
for event in graph.stream({"messages": [user_input]}, config):
print(event)
# 查看當前狀態
state = graph.get_state(config)
print(f"Next node: {state.next}")
# 方式一:直接恢復
for event in graph.stream(None, config):
print(event)
# 方式二:帶人工輸入恢復
from langgraph.types import Command
for event in graph.stream(Command(resume="approved"), config):
print(event)
# 方式三:修改狀態後恢復
graph.update_state(config, {"approved": True})
for event in graph.stream(None, config):
print(event)
7. 多代理架構
7.1 四種主要模式
flowchart LR
subgraph Network["Network 模式"]
N1["Agent 1"] <--> N2["Agent 2"]
N2 <--> N3["Agent 3"]
N1 <--> N3
end
subgraph Supervisor["Supervisor 模式"]
S["Supervisor"]
S --> A1["Agent 1"]
S --> A2["Agent 2"]
S --> A3["Agent 3"]
end
| 模式 | 描述 | 適用場景 |
|---|---|---|
| Network | 全連接,任意通信 | 協作討論、腦力激盪 |
| Supervisor | 中央監督者路由 | 任務分派、專家選擇 |
| Hierarchical | 多層監督者 | 複雜組織結構 |
| Handoffs | 直接切換 | 專家系統、狀態機 |
7.2 Supervisor 模式實現
from langgraph.types import Command
def supervisor(state: State):
"""監督者決定下一個執行的代理"""
response = llm.invoke(
f"Given the task: {state['task']}, "
f"which agent should handle it? "
f"Options: researcher, writer, coder"
)
return Command(goto=response.agent_name)
def researcher(state: State):
"""研究員代理"""
result = search_web(state["task"])
return {"research": result}
def writer(state: State):
"""寫手代理"""
article = llm.invoke(f"Write about: {state['research']}")
return {"article": article}
# 構建圖
builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", lambda x: x.goto)
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", END)
7.3 子圖支持
# 創建子圖
subgraph = create_research_subgraph()
# 將子圖作為節點
builder.add_node("research_workflow", subgraph)
# 子圖內可以跳轉到父圖
def subgraph_node(state: SubState):
return Command(
graph=Command.PARENT, # 跳轉到父圖
update={"result": state["result"]}
)
7.4 Send:動態任務分派
from langgraph.types import Send
def router(state: State):
"""動態創建多個並行任務"""
return [
Send("process", {"item": item})
for item in state["items"]
]
builder.add_conditional_edges("router", router)
8. 流式處理
8.1 流模式
| 模式 | 內容 | 用途 |
|---|---|---|
values | 每步完整狀態 | 調試、狀態檢視 |
updates | 增量更新 | 實時 UI 更新 |
messages | LLM token | 打字效果 |
custom | 自定義信號 | 進度條、狀態指示 |
debug | 詳細追蹤信息 | 開發調試 |
8.2 使用示例
# values 模式 - 完整狀態
for chunk in graph.stream(input, stream_mode="values"):
print(f"Full state: {chunk}")
# updates 模式 - 增量更新
for chunk in graph.stream(input, stream_mode="updates"):
print(f"Node: {chunk.keys()}, Update: {chunk}")
# messages 模式 - LLM token
for chunk in graph.stream(input, stream_mode="messages"):
if hasattr(chunk, 'content'):
print(chunk.content, end="", flush=True)
# 多模式組合
for chunk in graph.stream(input, stream_mode=["updates", "messages"]):
print(chunk)
8.3 自定義流信號
from langgraph.config import get_stream_writer
def processing_node(state: State):
writer = get_stream_writer()
# 發送進度信號
writer({"type": "progress", "value": 0})
for i, item in enumerate(state["items"]):
process(item)
writer({"type": "progress", "value": (i + 1) / len(state["items"]) * 100})
writer({"type": "progress", "value": 100})
return {"processed": True}
# 接收自定義信號
for chunk in graph.stream(input, stream_mode="custom"):
if chunk.get("type") == "progress":
print(f"Progress: {chunk['value']}%")
9. 函數式 API
除了 Graph API,LangGraph 還提供更 Pythonic 的函數式 API:
9.1 @task 和 @entrypoint
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
@task
def process_item(item: str) -> str:
"""可被追蹤和檢查點的任務"""
return expensive_operation(item)
@task
def analyze(data: list[str]) -> dict:
"""分析數據"""
return {"summary": summarize(data)}
@entrypoint(checkpointer=InMemorySaver())
def workflow(items: list[str]) -> dict:
# 並行執行所有任務
futures = [process_item(item) for item in items]
results = [f.result() for f in futures]
# 中斷等待人工審核
from langgraph.types import interrupt
approval = interrupt({
"question": "Review results?",
"results": results
})
if approval == "yes":
analysis = analyze(results).result()
return {"status": "completed", "analysis": analysis}
return {"status": "rejected"}
9.2 優勢
- Pythonic 風格:像寫普通函數一樣
- 自動狀態管理:無需手動定義 State
- 自動持久化:@task 的結果自動保存
- 支持中斷:可以在任意位置中斷和恢復
10. 重試機制
10.1 全局重試策略
from langgraph.types import RetryPolicy
graph = builder.compile(
retry_policy=RetryPolicy(
initial_interval=0.5, # 首次重試等待(秒)
backoff_factor=2.0, # 指數退避因子
max_interval=128.0, # 最大等待時間
max_attempts=3, # 最多重試次數
jitter=True, # 隨機抖動(避免驚群)
retry_on=[ # 重試的異常類型
ConnectionError,
TimeoutError,
]
)
)
10.2 節點級重試
builder.add_node(
"flaky_api_call",
api_call_func,
retry_policy=RetryPolicy(
max_attempts=5,
initial_interval=1.0,
retry_on=[APIError]
)
)
11. 可擴展性設計
11.1 自定義 Channel
from langgraph.channels.base import BaseChannel
from typing import Sequence
class SumChannel(BaseChannel[int, int, int]):
"""累加所有更新的 Channel"""
def __init__(self):
self.value = 0
def update(self, values: Sequence[int]) -> bool:
for v in values:
self.value += v
return True
def get(self) -> int:
return self.value
def from_checkpoint(self, checkpoint: int) -> "SumChannel":
channel = SumChannel()
channel.value = checkpoint
return channel
def checkpoint(self) -> int:
return self.value
11.2 自定義 Checkpointer
from langgraph.checkpoint.base import BaseCheckpointSaver
from typing import Optional
class RedisCheckpointer(BaseCheckpointSaver):
"""Redis 實現的 Checkpointer"""
def __init__(self, redis_client):
self.redis = redis_client
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
thread_id = config["configurable"]["thread_id"]
data = self.redis.get(f"checkpoint:{thread_id}")
if data:
return CheckpointTuple(**json.loads(data))
return None
def put(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
) -> RunnableConfig:
thread_id = config["configurable"]["thread_id"]
self.redis.set(
f"checkpoint:{thread_id}",
json.dumps({
"checkpoint": checkpoint,
"metadata": metadata,
})
)
return config
11.3 Runtime 注入
節點可以接收多種運行時注入的參數:
from langgraph.types import StreamWriter
from langgraph.store.base import BaseStore
def my_node(
state: State,
config: RunnableConfig, # 注入:運行配置
stream_writer: StreamWriter, # 注入:流寫入器
store: BaseStore, # 注入:跨線程存儲
) -> dict:
# 使用 config
user_id = config["configurable"].get("user_id")
# 使用 store(跨會話持久化)
user_prefs = store.get(("users", user_id), "preferences")
# 使用 stream_writer
stream_writer({"status": "processing"})
return {"result": "done"}
12. 完整示例
12.1 基本 ReAct Agent
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Annotated
# 定義狀態
class State(TypedDict):
messages: Annotated[list, add_messages]
# 初始化 LLM 和工具
llm = ChatAnthropic(model="claude-sonnet-4-20250514")
tools = [search_tool, calculator_tool]
llm_with_tools = llm.bind_tools(tools)
# 定義節點
def chatbot(state: State):
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state: State):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# 構建圖
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "chatbot")
builder.add_conditional_edges("chatbot", should_continue)
builder.add_edge("tools", "chatbot")
graph = builder.compile()
# 執行
result = graph.invoke({
"messages": [("user", "What is 25 * 4?")]
})
print(result["messages"][-1].content)
12.2 帶持久化和人在迴圈
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["tools"] # 在工具執行前中斷
)
config = {"configurable": {"thread_id": "user_123"}}
# 第一次執行 - 在 tools 前中斷
print("=== First run ===")
for event in graph.stream(
{"messages": [("user", "Search for AI news")]},
config,
stream_mode="updates"
):
print(event)
# 查看待執行的工具
state = graph.get_state(config)
print(f"\nNext: {state.next}")
print(f"Pending tool calls: {state.values['messages'][-1].tool_calls}")
# 人工審核後恢復執行
print("\n=== Resume after approval ===")
for event in graph.stream(None, config, stream_mode="updates"):
print(event)
12.3 使用預構建的 ReAct Agent
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver
agent = create_react_agent(
model="anthropic:claude-sonnet-4-20250514",
tools=[search_tool, calculator_tool],
prompt="You are a helpful research assistant.",
checkpointer=InMemorySaver(),
)
result = agent.invoke(
{"messages": [("user", "Research the latest AI developments")]},
config={"configurable": {"thread_id": "research_1"}}
)
13. 設計特點總結
| 特性 | 描述 | 優勢 |
|---|---|---|
| BSP 執行模型 | 基於 Google Pregel | 確定性、可恢復、無競態 |
| 版本追蹤 | O(1) 節點激活判斷 | 高效執行 |
| Channel 多態 | 靈活的狀態聚合 | 適應各種場景 |
| 原生持久化 | 自動 Checkpoint | 時間旅行、故障恢復 |
| 人在迴圈 | 靈活中斷/恢復 | 人工審核、修正 |
| 多代理支持 | 多種協作模式 | 複雜系統構建 |
| LangChain 整合 | Runnable 協議 | 生態兼容 |
| 函數式 API | @task/@entrypoint | Pythonic 風格 |
| 豐富流模式 | values/updates/messages/custom | 靈活輸出 |
14. 架構權衡
優勢
- 確定性:BSP 保證相同輸入產生相同結果
- 可恢復性:細粒度檢查點支持任意中斷恢復
- 可觀測性:多種流模式支持完整執行跟蹤
- 並行友好:節點天然並行,無競態條件
- LangChain 整合:與 LangChain 生態無縫對接
權衡
- 編譯開銷:每次 compile() 需要驗證圖結構
- 版本追蹤開銷:channel_versions 增加記憶體使用
- 同步點:每個超步是全局同步點,可能成為瓶頸
- 狀態序列化:完整狀態檢查點可能很大
15. 結語:LangGraph 的定位
LangGraph 是 AI Agent 工作流的底層引擎:
- 如果你需要簡單的 ReAct Agent:使用
create_react_agent()預構建組件 - 如果你需要自定義工作流:使用 StateGraph API
- 如果你需要複雜多代理系統:使用子圖 + Supervisor/Network 模式
- 如果你需要人工審核流程:使用 interrupt + checkpoint
LangGraph 的設計使其成為構建可靠、可觀測、可恢復的 AI 工作流的理想選擇。