LangGraph 架構深度解析:從 BSP 執行模型到有狀態 AI 工作流


0. 為什麼 LangGraph 值得深入研究?

LangGraph 不是另一個「LangChain 插件」,而是一個獨立的低級編排框架,其設計借鑒了 Google 的 Pregel 分布式圖處理算法。如果你想理解現代 AI Agent 工作流的底層機制,LangGraph 的設計是極佳的學習對象。

LangGraph 的核心價值:

  1. 基於 Pregel 的 BSP 執行模型:確定性、可恢復、無競態條件
  2. Channel-based 狀態管理:靈活的狀態聚合策略
  3. 原生持久化支持:自動 Checkpoint,支持時間旅行
  4. 人在迴圈:靈活的中斷/恢復機制
  5. 多代理支持:Supervisor、Network、Hierarchical 等模式

整體定位可以用一張圖表示:

flowchart TD
  classDef pad fill:transparent,stroke:transparent,color:transparent;

  subgraph LangChainCore["langchain-core"]
    direction TB
    core_pad[" "]:::pad
    LLM["BaseChatModel"]
    Tools["BaseTool"]
    Messages["BaseMessage"]
  end

  subgraph LangGraph["LangGraph (編排層)"]
    direction TB
    lg_pad[" "]:::pad
    StateGraph["StateGraph / MessageGraph"]
    Pregel["Pregel Engine"]
    Channels["Channels"]
    Checkpoint["Checkpointer"]
  end

  subgraph UserApp["用戶應用"]
    direction TB
    user_pad[" "]:::pad
    Agent["AI Agent"]
    Workflow["Workflow"]
  end

  LangChainCore --> LangGraph
  LangGraph --> UserApp

接下來深入探討 LangGraph 的核心架構。


1. 專案結構與規模

LangGraph 的核心代碼約有 15,000 行,集中在 Pregel 執行引擎:

langgraph/
├── libs/
│   ├── langgraph/langgraph/        # 核心庫 (~15K 行)
│   │   ├── graph/                  # 圖定義 API
│   │   │   ├── state.py            # StateGraph 類 (1,483 行)
│   │   │   ├── message.py          # 消息處理和 add_messages
│   │   │   └── _branch.py          # 條件邊邏輯
│   │   ├── pregel/                 # 執行引擎 (~10.5K 行)
│   │   │   ├── main.py             # Pregel 主類 (3,322 行)
│   │   │   ├── _loop.py            # 執行循環 (1,328 行)
│   │   │   ├── _algo.py            # 圖算法 (1,233 行)
│   │   │   ├── _runner.py          # 任務執行器
│   │   │   ├── _read.py            # 通道讀取
│   │   │   ├── _write.py           # 通道寫入
│   │   │   └── _retry.py           # 重試機制
│   │   ├── channels/               # 狀態通道
│   │   │   ├── base.py             # 基礎通道接口
│   │   │   ├── binop.py            # BinaryOperatorAggregate
│   │   │   ├── last_value.py       # LastValue 通道
│   │   │   ├── topic.py            # Topic 通道
│   │   │   └── ephemeral.py        # 臨時通道
│   │   ├── managed/                # 受管理的值
│   │   ├── func/                   # 函數式 API
│   │   ├── types.py                # 核心類型定義 (584 行)
│   │   └── config.py               # 配置管理
│   ├── checkpoint/                 # 檢查點基礎接口
│   ├── checkpoint-sqlite/          # SQLite 實現
│   ├── checkpoint-postgres/        # PostgreSQL 實現
│   └── prebuilt/                   # 預構建組件
├── docs/                           # 文檔
└── tests/                          # 測試套件

版本資訊

  • 許可證:MIT
  • Python 要求:>= 3.9
  • 核心依賴:langchain-core >= 0.3.0

2. Pregel / BSP 執行模型

2.1 什麼是 BSP?

Bulk Synchronous Parallel (BSP) 是 LangGraph 的核心執行模型,源自 Google Pregel 論文。其核心思想是:

  • 執行分為多個超步 (Super-step)
  • 每個超步內,節點並行執行
  • 超步之間有全局同步點 (Barrier)
  • 節點只能讀取上一超步的狀態快照

這個設計帶來的優勢:

  • 確定性:相同輸入、相同路徑、相同結果
  • 無競態條件:節點執行階段不會互相干擾
  • 易恢復:任意超步都可作為檢查點恢復

2.2 超步執行流程

每個超步包含四個階段:

flowchart TD
  Start([開始])
  Plan["1. Plan<br/>決定活躍節點"]
  Exec["2. Execute<br/>並行執行節點"]
  Update["3. Update<br/>套用通道更新"]
  Checkpoint["4. Checkpoint<br/>保存狀態"]
  Loop{還有活躍節點?}
  End([結束])

  Start --> Plan --> Exec --> Update --> Checkpoint --> Loop
  Loop -->|是| Plan
  Loop -->|否| End

詳細說明

1. Plan 階段

根據通道版本決定哪些節點需要執行:

# 節點激活條件:
# 存在某個通道 c,使得 channel_versions[c] > versions_seen[node][c]

checkpoint = {
    "channel_versions": {
        "input": 2,
        "messages": 5,
        "state": 3,
    },
    "versions_seen": {
        "chatbot": {"input": 2, "messages": 4},  # chatbot 需要執行
        "tools": {"messages": 5},                 # tools 不需要執行
    }
}

這種設計使得活躍節點判斷只需要 O(1) 時間,不需要掃描完整狀態。

2. Execute 階段

使用 ThreadPoolExecutor(同步)或原生 asyncio(異步)並行執行活躍節點:

# 關鍵代碼示意
class AsyncBackgroundExecutor:
    def __init__(self, config):
        self.loop = asyncio.get_running_loop()
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def submit(self, coro):
        async with self.semaphore:
            return await coro

關鍵設計:節點執行中只能讀取「上一超步」的通道狀態,所有寫入都緩衝到本地。

3. Update 階段

收集所有節點的寫入,通過 Channel 的聚合策略合併:

def apply_writes(checkpoint, channels, tasks):
    for task in tasks:
        for channel_name, value in task.writes:
            channels[channel_name].update([value])
    return updated_channels

4. Checkpoint 階段

保存當前狀態到持久化存儲(可選同步或異步):

checkpoint = {
    "v": 1,                          # 版本
    "id": "checkpoint_001",          # 唯一 ID
    "ts": "2025-12-17T10:00:00Z",    # 時間戳
    "channel_values": {...},         # 通道值
    "channel_versions": {...},       # 通道版本
    "versions_seen": {...},          # 節點已見版本
}

2.3 BSP 如何避免競態條件

讓我用一個具體例子說明:

假設場景:三個節點同時執行,都要讀寫同一個 channel counter

# 初始值: counter = 10

def node_A(state):
    val = state["counter"]  # 讀
    return {"counter": val + 1}  # 寫 +1

def node_B(state):
    val = state["counter"]  # 讀
    return {"counter": val + 2}  # 寫 +2

def node_C(state):
    val = state["counter"]  # 讀
    return {"counter": val + 3}  # 寫 +3

傳統共享內存方式(有競態條件)

時間軸 →    T1      T2      T3      T4
Node A:   讀=10           寫=11
Node B:         讀=11           寫=13
Node C:                讀=13           寫=16

結果: counter = 16 (依賴執行順序)

問題:

  • 執行順序不同 → 結果不同(不確定性)
  • 需要加鎖 → 效能差
  • A 和 B 同時讀到 10,都寫回去,一個會被覆蓋

BSP 模型(無競態條件)

flowchart TD
  subgraph SuperStep["超步 N"]
    Snapshot["快照: counter = 10"]
    subgraph Parallel["並行執行"]
      A["Node A: 讀=10 → 緩衝 +1"]
      B["Node B: 讀=10 → 緩衝 +2"]
      C["Node C: 讀=10 → 緩衝 +3"]
    end
    Barrier["Barrier (等待完成)"]
    Apply["apply_writes()<br/>合併: 10 + 1 + 2 + 3 = 16"]
  end
  NextStep["超步 N+1: counter = 16"]

  Snapshot --> Parallel --> Barrier --> Apply --> NextStep

關鍵差異

傳統共享內存BSP 模型
讀到「當前」值(可能被改過)讀到「超步開始時的快照」
寫立即生效寫入本地緩衝,超步結束後批量合併
需要鎖/原子操作無需鎖,天然並行安全
結果依賴執行順序結果確定性(與執行順序無關)

3. Channel 狀態系統

3.1 Channel 的設計理念

LangGraph 不直接操作「變數」,而是操作「通道(Channel)」。每個 State 欄位背後,是一個對應的 Channel,負責定義「如何合併多個節點的更新」。

3.2 Channel 類型層次

BaseChannel[Value, Update, Checkpoint]
├── LastValue[V]                # 保留最新值(默認)
├── BinaryOperatorAggregate[V]  # 二元運算聚合
├── Topic[V]                    # 發佈-訂閱
├── NamedBarrierValue[V]        # 同步屏障
├── EphemeralValue[V]           # 臨時值(不持久化)
└── AnyValue[V]                 # 接受任何更新

3.3 使用 Annotated 定義 Reducer

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    messages: Annotated[list, add]  # 使用 add 作為 reducer → 追加
    count: int                       # 無 reducer → 覆蓋 (LastValue)
    items: Annotated[list, lambda a, b: a + b]  # 自定義 reducer

builder = StateGraph(State)

3.4 BinaryOperatorAggregate 實現

class BinaryOperatorAggregate(BaseChannel[Value, Value, Value]):
    """使用二元運算符聚合多個更新"""

    def __init__(self, operator: Callable[[Value, Value], Value]):
        self.operator = operator
        self.value = None

    def update(self, values: Sequence[Value]) -> bool:
        """按順序套用所有更新"""
        for value in values:
            if self.value is None:
                self.value = value
            else:
                self.value = self.operator(self.value, value)
        return True

    def get(self) -> Value:
        return self.value

3.5 Channel 與 State 的對應關係

flowchart LR
  subgraph State["State (TypedDict)"]
    M["messages<br/>Annotated[list, add]"]
    C["count<br/>int"]
  end

  subgraph Channels["Channels"]
    MC["BinaryOperatorAggregate(add)"]
    CC["LastValue"]
  end

  M --> MC
  C --> CC

4. StateGraph:圖定義 API

4.1 基本用法

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

def chatbot(state: State):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: State):
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

# 構建圖
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("tools", tool_node)

# 定義邊
builder.add_edge(START, "chatbot")
builder.add_conditional_edges("chatbot", should_continue, {
    "tools": "tools",
    END: END
})
builder.add_edge("tools", "chatbot")

# 編譯
graph = builder.compile(checkpointer=checkpointer)

4.2 關鍵方法

方法用途示例
add_node(name, func)添加節點builder.add_node("chatbot", chatbot_func)
add_edge(from, to)添加直接邊builder.add_edge("tools", "chatbot")
add_conditional_edges(from, condition, mapping)添加條件路由見上例
compile(checkpointer, interrupt_before, ...)編譯為可執行圖graph = builder.compile()

4.3 圖的可視化

# 生成 Mermaid 圖
print(graph.get_graph().draw_mermaid())

# 生成 PNG 圖片
graph.get_graph().draw_mermaid_png(output_file_path="graph.png")

5. Checkpoint 持久化系統

5.1 Checkpoint 數據結構

class Checkpoint(TypedDict):
    v: int                              # Checkpoint 版本
    id: str                             # 單調遞增 ID
    ts: str                             # ISO 時間戳
    channel_values: dict[str, Any]      # 通道值
    channel_versions: dict[str, int]    # 每個通道的版本
    versions_seen: dict[str, dict[str, int]]  # 節點已見版本

5.2 Checkpointer 實現

LangGraph 提供多種 Checkpointer:

# 內存(開發環境)
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()

# SQLite(單機持久化)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("./checkpoints.db")

# PostgreSQL(生產環境)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
)

5.3 時間旅行:回滾到任意檢查點

# 列出所有檢查點
config = {"configurable": {"thread_id": "user_123"}}
for checkpoint in graph.get_state_history(config):
    print(f"ID: {checkpoint.config['configurable']['checkpoint_id']}")
    print(f"State: {checkpoint.values}")

# 回滾到特定檢查點
old_config = {
    "configurable": {
        "thread_id": "user_123",
        "checkpoint_id": "checkpoint_001"
    }
}
state = graph.get_state(old_config)

# 從該檢查點繼續執行
for event in graph.stream(None, old_config):
    print(event)

6. 人在迴圈 (Human-in-the-Loop)

6.1 靜態中斷點

在編譯時指定中斷位置:

graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["tools"],   # 在 tools 節點前中斷
    interrupt_after=["chatbot"],  # 在 chatbot 節點後中斷
)

6.2 動態中斷

在節點內動態決定是否中斷:

from langgraph.types import interrupt

def review_node(state: State):
    # 動態中斷,等待人工輸入
    approval = interrupt({
        "question": "Approve this action?",
        "data": state["pending_action"]
    })

    if approval == "yes":
        return {"approved": True}
    return {"approved": False}

6.3 中斷與恢復流程

flowchart TD
  Start([開始]) --> Run["執行圖"]
  Run --> Check{遇到中斷點?}
  Check -->|否| Continue["繼續執行"]
  Check -->|是| Save["保存 Checkpoint"]
  Save --> Wait["等待人工輸入"]
  Wait --> Resume["恢復執行"]
  Resume --> Run
  Continue --> End([結束])

6.4 恢復執行

config = {"configurable": {"thread_id": "user_123"}}

# 第一次執行 - 在中斷點停止
for event in graph.stream({"messages": [user_input]}, config):
    print(event)

# 查看當前狀態
state = graph.get_state(config)
print(f"Next node: {state.next}")

# 方式一:直接恢復
for event in graph.stream(None, config):
    print(event)

# 方式二:帶人工輸入恢復
from langgraph.types import Command
for event in graph.stream(Command(resume="approved"), config):
    print(event)

# 方式三:修改狀態後恢復
graph.update_state(config, {"approved": True})
for event in graph.stream(None, config):
    print(event)

7. 多代理架構

7.1 四種主要模式

flowchart LR
  subgraph Network["Network 模式"]
    N1["Agent 1"] <--> N2["Agent 2"]
    N2 <--> N3["Agent 3"]
    N1 <--> N3
  end

  subgraph Supervisor["Supervisor 模式"]
    S["Supervisor"]
    S --> A1["Agent 1"]
    S --> A2["Agent 2"]
    S --> A3["Agent 3"]
  end
模式描述適用場景
Network全連接,任意通信協作討論、腦力激盪
Supervisor中央監督者路由任務分派、專家選擇
Hierarchical多層監督者複雜組織結構
Handoffs直接切換專家系統、狀態機

7.2 Supervisor 模式實現

from langgraph.types import Command

def supervisor(state: State):
    """監督者決定下一個執行的代理"""
    response = llm.invoke(
        f"Given the task: {state['task']}, "
        f"which agent should handle it? "
        f"Options: researcher, writer, coder"
    )
    return Command(goto=response.agent_name)

def researcher(state: State):
    """研究員代理"""
    result = search_web(state["task"])
    return {"research": result}

def writer(state: State):
    """寫手代理"""
    article = llm.invoke(f"Write about: {state['research']}")
    return {"article": article}

# 構建圖
builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)

builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", lambda x: x.goto)
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", END)

7.3 子圖支持

# 創建子圖
subgraph = create_research_subgraph()

# 將子圖作為節點
builder.add_node("research_workflow", subgraph)

# 子圖內可以跳轉到父圖
def subgraph_node(state: SubState):
    return Command(
        graph=Command.PARENT,  # 跳轉到父圖
        update={"result": state["result"]}
    )

7.4 Send:動態任務分派

from langgraph.types import Send

def router(state: State):
    """動態創建多個並行任務"""
    return [
        Send("process", {"item": item})
        for item in state["items"]
    ]

builder.add_conditional_edges("router", router)

8. 流式處理

8.1 流模式

模式內容用途
values每步完整狀態調試、狀態檢視
updates增量更新實時 UI 更新
messagesLLM token打字效果
custom自定義信號進度條、狀態指示
debug詳細追蹤信息開發調試

8.2 使用示例

# values 模式 - 完整狀態
for chunk in graph.stream(input, stream_mode="values"):
    print(f"Full state: {chunk}")

# updates 模式 - 增量更新
for chunk in graph.stream(input, stream_mode="updates"):
    print(f"Node: {chunk.keys()}, Update: {chunk}")

# messages 模式 - LLM token
for chunk in graph.stream(input, stream_mode="messages"):
    if hasattr(chunk, 'content'):
        print(chunk.content, end="", flush=True)

# 多模式組合
for chunk in graph.stream(input, stream_mode=["updates", "messages"]):
    print(chunk)

8.3 自定義流信號

from langgraph.config import get_stream_writer

def processing_node(state: State):
    writer = get_stream_writer()

    # 發送進度信號
    writer({"type": "progress", "value": 0})

    for i, item in enumerate(state["items"]):
        process(item)
        writer({"type": "progress", "value": (i + 1) / len(state["items"]) * 100})

    writer({"type": "progress", "value": 100})
    return {"processed": True}

# 接收自定義信號
for chunk in graph.stream(input, stream_mode="custom"):
    if chunk.get("type") == "progress":
        print(f"Progress: {chunk['value']}%")

9. 函數式 API

除了 Graph API,LangGraph 還提供更 Pythonic 的函數式 API:

9.1 @task 和 @entrypoint

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def process_item(item: str) -> str:
    """可被追蹤和檢查點的任務"""
    return expensive_operation(item)

@task
def analyze(data: list[str]) -> dict:
    """分析數據"""
    return {"summary": summarize(data)}

@entrypoint(checkpointer=InMemorySaver())
def workflow(items: list[str]) -> dict:
    # 並行執行所有任務
    futures = [process_item(item) for item in items]
    results = [f.result() for f in futures]

    # 中斷等待人工審核
    from langgraph.types import interrupt
    approval = interrupt({
        "question": "Review results?",
        "results": results
    })

    if approval == "yes":
        analysis = analyze(results).result()
        return {"status": "completed", "analysis": analysis}

    return {"status": "rejected"}

9.2 優勢

  • Pythonic 風格:像寫普通函數一樣
  • 自動狀態管理:無需手動定義 State
  • 自動持久化:@task 的結果自動保存
  • 支持中斷:可以在任意位置中斷和恢復

10. 重試機制

10.1 全局重試策略

from langgraph.types import RetryPolicy

graph = builder.compile(
    retry_policy=RetryPolicy(
        initial_interval=0.5,      # 首次重試等待(秒)
        backoff_factor=2.0,        # 指數退避因子
        max_interval=128.0,        # 最大等待時間
        max_attempts=3,            # 最多重試次數
        jitter=True,               # 隨機抖動(避免驚群)
        retry_on=[                 # 重試的異常類型
            ConnectionError,
            TimeoutError,
        ]
    )
)

10.2 節點級重試

builder.add_node(
    "flaky_api_call",
    api_call_func,
    retry_policy=RetryPolicy(
        max_attempts=5,
        initial_interval=1.0,
        retry_on=[APIError]
    )
)

11. 可擴展性設計

11.1 自定義 Channel

from langgraph.channels.base import BaseChannel
from typing import Sequence

class SumChannel(BaseChannel[int, int, int]):
    """累加所有更新的 Channel"""

    def __init__(self):
        self.value = 0

    def update(self, values: Sequence[int]) -> bool:
        for v in values:
            self.value += v
        return True

    def get(self) -> int:
        return self.value

    def from_checkpoint(self, checkpoint: int) -> "SumChannel":
        channel = SumChannel()
        channel.value = checkpoint
        return channel

    def checkpoint(self) -> int:
        return self.value

11.2 自定義 Checkpointer

from langgraph.checkpoint.base import BaseCheckpointSaver
from typing import Optional

class RedisCheckpointer(BaseCheckpointSaver):
    """Redis 實現的 Checkpointer"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        thread_id = config["configurable"]["thread_id"]
        data = self.redis.get(f"checkpoint:{thread_id}")
        if data:
            return CheckpointTuple(**json.loads(data))
        return None

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        thread_id = config["configurable"]["thread_id"]
        self.redis.set(
            f"checkpoint:{thread_id}",
            json.dumps({
                "checkpoint": checkpoint,
                "metadata": metadata,
            })
        )
        return config

11.3 Runtime 注入

節點可以接收多種運行時注入的參數:

from langgraph.types import StreamWriter
from langgraph.store.base import BaseStore

def my_node(
    state: State,
    config: RunnableConfig,           # 注入:運行配置
    stream_writer: StreamWriter,      # 注入:流寫入器
    store: BaseStore,                 # 注入:跨線程存儲
) -> dict:
    # 使用 config
    user_id = config["configurable"].get("user_id")

    # 使用 store(跨會話持久化)
    user_prefs = store.get(("users", user_id), "preferences")

    # 使用 stream_writer
    stream_writer({"status": "processing"})

    return {"result": "done"}

12. 完整示例

12.1 基本 ReAct Agent

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Annotated

# 定義狀態
class State(TypedDict):
    messages: Annotated[list, add_messages]

# 初始化 LLM 和工具
llm = ChatAnthropic(model="claude-sonnet-4-20250514")
tools = [search_tool, calculator_tool]
llm_with_tools = llm.bind_tools(tools)

# 定義節點
def chatbot(state: State):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: State):
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

# 構建圖
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("tools", ToolNode(tools))

builder.add_edge(START, "chatbot")
builder.add_conditional_edges("chatbot", should_continue)
builder.add_edge("tools", "chatbot")

graph = builder.compile()

# 執行
result = graph.invoke({
    "messages": [("user", "What is 25 * 4?")]
})
print(result["messages"][-1].content)

12.2 帶持久化和人在迴圈

from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["tools"]  # 在工具執行前中斷
)

config = {"configurable": {"thread_id": "user_123"}}

# 第一次執行 - 在 tools 前中斷
print("=== First run ===")
for event in graph.stream(
    {"messages": [("user", "Search for AI news")]},
    config,
    stream_mode="updates"
):
    print(event)

# 查看待執行的工具
state = graph.get_state(config)
print(f"\nNext: {state.next}")
print(f"Pending tool calls: {state.values['messages'][-1].tool_calls}")

# 人工審核後恢復執行
print("\n=== Resume after approval ===")
for event in graph.stream(None, config, stream_mode="updates"):
    print(event)

12.3 使用預構建的 ReAct Agent

from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver

agent = create_react_agent(
    model="anthropic:claude-sonnet-4-20250514",
    tools=[search_tool, calculator_tool],
    prompt="You are a helpful research assistant.",
    checkpointer=InMemorySaver(),
)

result = agent.invoke(
    {"messages": [("user", "Research the latest AI developments")]},
    config={"configurable": {"thread_id": "research_1"}}
)

13. 設計特點總結

特性描述優勢
BSP 執行模型基於 Google Pregel確定性、可恢復、無競態
版本追蹤O(1) 節點激活判斷高效執行
Channel 多態靈活的狀態聚合適應各種場景
原生持久化自動 Checkpoint時間旅行、故障恢復
人在迴圈靈活中斷/恢復人工審核、修正
多代理支持多種協作模式複雜系統構建
LangChain 整合Runnable 協議生態兼容
函數式 API@task/@entrypointPythonic 風格
豐富流模式values/updates/messages/custom靈活輸出

14. 架構權衡

優勢

  1. 確定性:BSP 保證相同輸入產生相同結果
  2. 可恢復性:細粒度檢查點支持任意中斷恢復
  3. 可觀測性:多種流模式支持完整執行跟蹤
  4. 並行友好:節點天然並行,無競態條件
  5. LangChain 整合:與 LangChain 生態無縫對接

權衡

  1. 編譯開銷:每次 compile() 需要驗證圖結構
  2. 版本追蹤開銷:channel_versions 增加記憶體使用
  3. 同步點:每個超步是全局同步點,可能成為瓶頸
  4. 狀態序列化:完整狀態檢查點可能很大

15. 結語:LangGraph 的定位

LangGraph 是 AI Agent 工作流的底層引擎

  • 如果你需要簡單的 ReAct Agent:使用 create_react_agent() 預構建組件
  • 如果你需要自定義工作流:使用 StateGraph API
  • 如果你需要複雜多代理系統:使用子圖 + Supervisor/Network 模式
  • 如果你需要人工審核流程:使用 interrupt + checkpoint

LangGraph 的設計使其成為構建可靠、可觀測、可恢復的 AI 工作流的理想選擇。