Agno Deep Dive: Architecture and Implementation of a Next-Generation Multi-Agent Framework


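Introduction: The Evolution of AI Agent Frameworks

As large language models (LLMs) have matured, AI agents have moved from proofs of concept to production applications. Building a production-grade multi-agent system, however, raises several challenges:

  • State management: how are conversation history and agent state persisted?
  • Tool integration: how can an agent use external tools safely?
  • Collaboration: how do multiple agents cooperate efficiently?
  • Production deployment: how is an agent system deployed to production?

Agno is a modern multi-agent framework built to address these problems. It provides:

  1. Framework: a complete toolchain for building Agents, Teams, and Workflows
  2. AgentOS Runtime: a production-grade runtime environment
  3. Control Plane: a visual interface for monitoring and management

This article examines Agno's core architecture at the source-code level. The listings are simplified for readability.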


Chapter 1: Agno's Overall Architecture

1.1 Architecture Overview

graph TB
    subgraph "Application Layer"
        APP[Your Application]
    end

    subgraph "Agno Framework"
        AGENT[Agent]
        TEAM[Team]
        WORKFLOW[Workflow]

        subgraph "Core Modules"
            MODEL[Models]
            TOOLS[Tools/Toolkits]
            MEMORY[Memory]
            KNOWLEDGE[Knowledge]
            GUARD[Guardrails]
        end

        subgraph "Protocols"
            MCP[MCP Protocol]
            A2A[A2A Protocol]
        end
    end

    subgraph "AgentOS Runtime"
        FASTAPI[FastAPI Server]
        SSE[SSE Endpoints]
        STATE[State Management]
    end

    subgraph "Storage"
        DB[(Database<br/>SQLite/PostgreSQL)]
        VECTOR[(Vector Store)]
    end

    subgraph "LLM Providers"
        OPENAI[OpenAI]
        ANTHROPIC[Anthropic]
        GOOGLE[Google]
        OLLAMA[Ollama]
    end

    APP --> AGENT
    APP --> TEAM
    APP --> WORKFLOW

    AGENT --> MODEL
    AGENT --> TOOLS
    AGENT --> MEMORY
    AGENT --> KNOWLEDGE
    AGENT --> GUARD
    AGENT --> MCP

    TEAM --> AGENT
    WORKFLOW --> AGENT
    WORKFLOW --> TEAM

    FASTAPI --> AGENT
    FASTAPI --> TEAM
    FASTAPI --> WORKFLOW

    MODEL --> OPENAI
    MODEL --> ANTHROPIC
    MODEL --> GOOGLE
    MODEL --> OLLAMA

    MEMORY --> DB
    KNOWLEDGE --> VECTOR

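1.2 Core Design Principles

graph LR
    subgraph "Agno Design Principles"
        PERF[Performance<br/>μs-level instantiation]
        TYPE[Type safety<br/>Pydantic Schema]
        ASYNC[Async-first<br/>High throughput]
        PROD[Production-ready<br/>Works out of the box]
        PRIV[Privacy-first<br/>Runs locally]
    end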

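Performance comparison (official benchmarks):

Metric               Agno      LangGraph      CrewAI        PydanticAI
Instantiation time   ~3μs      ~1.6ms         ~210μs        ~171μs
Memory footprint     ~6.6KB    ~160KB         ~66KB         ~26KB
Relative speed       1x        529x slower    70x slower    57x slower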
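
Numbers like these depend on hardware, Python version, and library versions, so it can be worth reproducing the measurement locally. The following is a minimal sketch using only the standard library. `TinyAgent` is a hypothetical stand-in, not Agno's `Agent`; to measure a real framework, pass its constructor as `factory` instead.

```python
import timeit
import tracemalloc
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class TinyAgent:
    """Hypothetical lightweight agent object (not Agno's Agent)."""
    name: Optional[str] = None
    instructions: Optional[List[str]] = None
    markdown: bool = False


def instantiation_time_us(factory: Callable[[], object], runs: int = 10_000) -> float:
    """Average wall-clock time per construction, in microseconds."""
    total_seconds = timeit.timeit(factory, number=runs)
    return total_seconds / runs * 1e6


def memory_per_instance_bytes(factory: Callable[[], object], runs: int = 1_000) -> float:
    """Average traced allocation per instance while all instances stay alive."""
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    instances = [factory() for _ in range(runs)]
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    assert len(instances) == runs  # keeps the list alive until measurement ends
    return (after - before) / runs


if __name__ == "__main__":
    make = lambda: TinyAgent(name="demo")
    print(f"instantiation: {instantiation_time_us(make):.2f} us")
    print(f"memory: {memory_per_instance_bytes(make):.0f} bytes per instance")
```

The memory figure includes the list that holds the instances, so treat it as an upper bound per object rather than an exact size.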

Chapter 2: Agent Core Architecture

2.1 Agent Class Structure

# agno/agent/agent.py
@dataclass(init=False)
class Agent:
    """Core Agno Agent class."""

    # === Basic settings ===
    model: Optional[Model] = None          # LLM model
    name: Optional[str] = None             # Agent name
    id: Optional[str] = None               # Unique identifier

    # === User and session ===
    user_id: Optional[str] = None
    session_id: Optional[str] = None

    # === Prompt settings ===
    system_message: Optional[str] = None   # System prompt
    instructions: Optional[List[str]] = None  # List of instructions
    description: Optional[str] = None      # Agent description

    # === Tools and capabilities ===
    tools: Optional[List[Toolkit]] = None  # Toolkits
    knowledge: Optional[Knowledge] = None  # Knowledge base
    memory: Optional[MemoryManager] = None # Memory management

    # === Input and output schemas ===
    input_schema: Optional[Type[BaseModel]] = None
    output_schema: Optional[Type[BaseModel]] = None

    # === Safety and control ===
    guardrails: Optional[List[BaseGuardrail]] = None
    pre_hooks: Optional[List[Callable]] = None
    post_hooks: Optional[List[Callable]] = None

    # === Data persistence ===
    db: Optional[BaseDb] = None            # Database connection

    # === Advanced features ===
    reasoning: bool = False                # Reasoning mode
    add_history_to_context: bool = False   # Include history in the context
    markdown: bool = False                 # Markdown output

2.2 Agent Lifecycle

sequenceDiagram
    participant User
    participant Agent
    participant PreHook
    participant Model
    participant Tools
    participant PostHook
    participant DB

    User->>Agent: run(message)
    Agent->>Agent: Create RunContext

    Agent->>PreHook: Run pre-hooks
    PreHook-->>Agent: Validate/transform input

    Agent->>Agent: Build System Message
    Agent->>Agent: Load history context
    Agent->>Agent: Query Knowledge

    Agent->>Model: Send request
    Model-->>Agent: Response/tool calls

    loop Tool call loop
        Agent->>Tools: Execute tools
        Tools-->>Agent: Tool results
        Agent->>Model: Continue conversation
        Model-->>Agent: Response
    end

    Agent->>PostHook: Run post-hooks
    PostHook-->>Agent: Process output

    Agent->>DB: Save session

    Agent-->>User: RunOutput
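
The hook stages in this lifecycle can be illustrated with a toy pipeline. This is a sketch under stated assumptions rather than Agno's implementation: hooks are modeled as plain async functions from string to string, and `fake_model`, `strip_whitespace`, `reject_empty`, and `add_signature` are hypothetical names invented for the example.

```python
import asyncio
from typing import Awaitable, Callable, List

Hook = Callable[[str], Awaitable[str]]


async def strip_whitespace(text: str) -> str:
    """Pre-hook: normalize the input."""
    return text.strip()


async def reject_empty(text: str) -> str:
    """Pre-hook: validate the input and abort the run if it is empty."""
    if not text:
        raise ValueError("empty input")
    return text


async def add_signature(text: str) -> str:
    """Post-hook: transform the model output."""
    return f"{text}\n-- sent by demo agent"


async def fake_model(prompt: str) -> str:
    """Stand-in for an LLM call: echoes the prompt in upper case."""
    return prompt.upper()


async def run_with_hooks(message: str, pre_hooks: List[Hook], post_hooks: List[Hook]) -> str:
    # Pre-hooks run in order; each receives the previous hook's output
    for hook in pre_hooks:
        message = await hook(message)
    output = await fake_model(message)
    # Post-hooks run in order on the model output
    for hook in post_hooks:
        output = await hook(output)
    return output


if __name__ == "__main__":
    print(asyncio.run(run_with_hooks("  hi  ", [strip_whitespace, reject_empty], [add_signature])))
```

Because a pre-hook that raises stops the run before the model is called, validation failures cost no tokens.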

2.3 Core Run Logic

# agno/agent/agent.py (simplified)
class Agent:
    async def arun(
        self,
        message: Optional[str] = None,
        *,
        messages: Optional[List[Message]] = None,
        images: Optional[List[Image]] = None,
        audio: Optional[List[Audio]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: bool = False,  # off by default so `await agent.arun(...)` returns a RunOutput
        **kwargs
    ) -> Union[RunOutput, AsyncIterator[RunOutputEvent]]:
        """Run the Agent asynchronously."""

        # 1. Create the run context
        run_context = RunContext(
            agent=self,
            user_id=self.user_id,
            session_id=self.session_id or str(uuid4()),
        )

        # 2. Run the pre-hooks
        if self.pre_hooks:
            message = await self._execute_pre_hooks(message, run_context)

        # 3. Validate the input
        if self.input_schema:
            message = await self._validate_input(message)

        # 4. Build the message list
        run_messages = await self._build_messages(
            message=message,
            messages=messages,
            images=images,
            audio=audio,
            run_context=run_context,
        )

        # 5. Query the knowledge base (if configured)
        if self.knowledge:
            knowledge_context = await self._search_knowledge(message)
            run_messages = self._inject_knowledge(run_messages, knowledge_context)

        # 6. Call the model
        if stream:
            # Returns an async iterator of events; callers consume it with `async for`
            return self._stream_response(run_messages, run_context)
        else:
            return await self._get_response(run_messages, run_context)

    async def _get_response(
        self,
        messages: RunMessages,
        run_context: RunContext,
    ) -> RunOutput:
        """Get the model response."""

        # Call the model
        model_response = await self.model.ainvoke(
            messages=messages.to_model_messages(),
            tools=self._get_tools_for_model(),
        )

        # Handle tool calls until the model stops requesting them
        while model_response.tool_calls:
            tool_results = await self._execute_tool_calls(
                model_response.tool_calls,
                run_context,
            )

            # Append the tool results to the messages
            messages.add_tool_results(tool_results)

            # Call the model again
            model_response = await self.model.ainvoke(
                messages=messages.to_model_messages(),
                tools=self._get_tools_for_model(),
            )

        # Validate the output
        if self.output_schema:
            output = await self._validate_output(model_response.content)
        else:
            output = model_response.content

        # Run the post-hooks
        if self.post_hooks:
            output = await self._execute_post_hooks(output, run_context)

        # Save the session
        if self.db:
            await self._save_session(messages, output, run_context)

        return RunOutput(
            content=output,
            messages=messages,
            metrics=model_response.metrics,
        )

    async def _execute_tool_calls(
        self,
        tool_calls: List[ToolCall],
        run_context: RunContext,
    ) -> List[ToolResult]:
        """Execute tool calls concurrently."""

        async def execute_single(tool_call: ToolCall) -> ToolResult:
            tool = self._get_tool(tool_call.name)
            if tool is None:
                return ToolResult(
                    tool_call_id=tool_call.id,
                    error=f"Tool {tool_call.name} not found",
                )

            try:
                # Execute the tool
                result = await tool.aexecute(**tool_call.arguments)
                return ToolResult(
                    tool_call_id=tool_call.id,
                    content=result,
                )
            except Exception as e:
                return ToolResult(
                    tool_call_id=tool_call.id,
                    error=str(e),
                )

        # Run all tool calls concurrently; gather preserves input order
        results = await asyncio.gather(
            *[execute_single(tc) for tc in tool_calls]
        )

        return list(results)
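
The concurrent tool-execution pattern above can be tried in isolation. Below is a self-contained sketch: the `ToolCall` and `ToolResult` dataclasses mirror the fields used in the listing, while the `TOOLS` registry and the `add` and `fail` tools are hypothetical, introduced only for the demonstration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class ToolCall:
    id: str
    name: str
    arguments: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ToolResult:
    tool_call_id: str
    content: Any = None
    error: Optional[str] = None


async def add(a: int, b: int) -> int:
    return a + b


async def fail() -> None:
    raise ValueError("boom")


# Hypothetical registry mapping tool names to async callables
TOOLS: Dict[str, Callable[..., Awaitable[Any]]] = {"add": add, "fail": fail}


async def execute_tool_calls(tool_calls: List[ToolCall]) -> List[ToolResult]:
    """Run all tool calls concurrently; failures become error results."""

    async def execute_single(call: ToolCall) -> ToolResult:
        tool = TOOLS.get(call.name)
        if tool is None:
            return ToolResult(call.id, error=f"Tool {call.name} not found")
        try:
            return ToolResult(call.id, content=await tool(**call.arguments))
        except Exception as exc:
            # Report the failure to the caller instead of aborting the batch
            return ToolResult(call.id, error=str(exc))

    # gather returns results in the same order as the input calls
    return list(await asyncio.gather(*(execute_single(c) for c in tool_calls)))


def demo() -> List[ToolResult]:
    calls = [
        ToolCall("1", "add", {"a": 2, "b": 3}),
        ToolCall("2", "fail"),
        ToolCall("3", "missing"),
    ]
    return asyncio.run(execute_tool_calls(calls))
```

Catching exceptions inside `execute_single` means one failing tool never cancels its siblings, and the error text can be fed back to the model as an ordinary tool result.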

2.4 Model Abstraction Layer

# agno/models/base.py
class Model(ABC):
    """Abstract base class for models."""

    id: str
    name: str
    provider: str

    @abstractmethod
    async def ainvoke(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> ModelResponse:
        """Invoke the model asynchronously."""
        pass

    @abstractmethod
    async def astream(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> AsyncIterator[ModelResponseEvent]:
        """Invoke the model asynchronously with streaming."""
        pass


# agno/models/anthropic/claude.py
class Claude(Model):
    """Claude model implementation (astream omitted here for brevity)."""

    def __init__(
        self,
        id: str = "claude-sonnet-4-5",
        api_key: Optional[str] = None,
        **kwargs
    ):
        self.id = id
        self.client = anthropic.AsyncAnthropic(api_key=api_key)

    async def ainvoke(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> ModelResponse:
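        # Convert the message format
        anthropic_messages = self._convert_messages(messages)

        # Convert the tool format
        anthropic_tools = self._convert_tools(tools) if tools else None

        # Call the API. `tools` is included only when tools exist: the SDK
        # expects the argument to be omitted rather than passed as None
        request = {
            "model": self.id,
            "messages": anthropic_messages,
            "max_tokens": kwargs.get("max_tokens", 4096),
        }
        if anthropic_tools:
            request["tools"] = anthropic_tools
        response = await self.client.messages.create(**request)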

        return self._convert_response(response)


# agno/models/openai/gpt.py
class OpenAIChat(Model):
    """OpenAI GPT model implementation (astream omitted here for brevity)."""

    def __init__(
        self,
        id: str = "gpt-4o",
        api_key: Optional[str] = None,
        **kwargs
    ):
        self.id = id
        self.client = openai.AsyncOpenAI(api_key=api_key)

    async def ainvoke(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> ModelResponse:
        response = await self.client.chat.completions.create(
            model=self.id,
            messages=self._convert_messages(messages),
            tools=self._convert_tools(tools) if tools else None,
        )

        return self._convert_response(response)
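
The `_convert_messages` helpers are not shown in the listings, but the main difference they have to bridge is well known: OpenAI's Chat Completions API accepts system prompts inline as `role="system"` messages, whereas Anthropic's Messages API takes the system prompt as a separate top-level `system` parameter. A minimal sketch of that conversion follows; the `Message` dataclass and both function names are assumptions made for illustration, not Agno's actual code.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Message:
    role: str      # "system", "user", or "assistant"
    content: str


def to_openai_messages(messages: List[Message]) -> List[Dict[str, str]]:
    """Chat Completions keeps system prompts inline as role="system" entries."""
    return [{"role": m.role, "content": m.content} for m in messages]


def to_anthropic_messages(
    messages: List[Message],
) -> Tuple[Optional[str], List[Dict[str, str]]]:
    """Split out system prompts, which the Messages API takes as `system`."""
    system_parts = [m.content for m in messages if m.role == "system"]
    chat = [
        {"role": m.role, "content": m.content}
        for m in messages
        if m.role != "system"
    ]
    system = "\n\n".join(system_parts) if system_parts else None
    return system, chat


if __name__ == "__main__":
    history = [Message("system", "Be brief."), Message("user", "Hi")]
    print(to_openai_messages(history))
    print(to_anthropic_messages(history))
```

Keeping this translation inside each provider class is what lets the Agent work with a single provider-neutral `Message` type.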

Chapter 3: The Tool System

3.1 Toolkit Architecture

graph TB
    subgraph "Tool System"
        TOOLKIT[Toolkit Base]

        subgraph "Built-in Toolkits"
            WEB[WebTools]
            FILE[FileTools]
            CODE[CodeTools]
            DB[DatabaseTools]
            API[APITools]
        end

        subgraph "Protocol Tools"
            MCP_T[MCPTools]
            A2A_T[A2ATools]
        end

        CUSTOM[Custom Tools]
    end

    TOOLKIT --> WEB
    TOOLKIT --> FILE
    TOOLKIT --> CODE
    TOOLKIT --> DB
    TOOLKIT --> API
    TOOLKIT --> MCP_T
    TOOLKIT --> A2A_T
    TOOLKIT --> CUSTOM

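3.2 Toolkit Implementation

# agno/tools/toolkit.py
class Toolkit(ABC):
    """Base class for toolkits."""

    name: str
    description: str
    enabled: bool = True

    @abstractmethod
    def get_tools(self) -> List[Function]:
        """Return the list of tools."""
        pass


# agno/tools/function.py
import asyncio
from inspect import iscoroutinefunction

class Function:
    """Wrapper that exposes a function as a tool."""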

    def __init__(
        self,
        name: str,
        description: str,
        parameters: Dict[str, Any],
        func: Callable,
        requires_confirmation: bool = False,
    ):
        self.name = name
        self.description = description
        self.parameters = parameters
        self.func = func
        self.requires_confirmation = requires_confirmation

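    async def aexecute(self, **kwargs) -> Any:
        """Execute the function asynchronously."""
        if iscoroutinefunction(self.func):
            return await self.func(**kwargs)
        else:
            # Run blocking functions in a worker thread so the event loop stays responsive
            return await asyncio.to_thread(self.func, **kwargs)

    def to_openai_tool(self) -> Dict[str, Any]:
        """Convert to the OpenAI tool format."""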
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            }
        }


# Define tools with the decorator
import httpx  # used by fetch_data below

from agno.tools import tool

class MyToolkit(Toolkit):
    name = "my_toolkit"
    description = "My custom tools"

    @tool
    def search_web(self, query: str) -> str:
        """Search the web for information.

        Args:
            query: The search query
        """
        # Implement the search logic here
        return f"Results for: {query}"

    @tool
    async def fetch_data(self, url: str) -> dict:
        """Fetch data from a URL.

        Args:
            url: The URL to fetch
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

    def get_tools(self) -> List[Function]:
        return [
            self.search_web,
            self.fetch_data,
        ]
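
The `parameters` field of a `Function` has to come from somewhere. One plausible approach, sketched here as an assumption rather than as what Agno's `@tool` decorator actually does, is to derive a JSON Schema from the function's signature and type hints. `build_parameters` and the `_JSON_TYPES` mapping are hypothetical helpers.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints

# Hypothetical mapping from Python annotations to JSON Schema type names
_JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    dict: "object",
    list: "array",
}


def build_parameters(func: Callable[..., Any]) -> Dict[str, Any]:
    """Derive a JSON Schema object from a function signature."""
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        if name == "self":
            continue
        # Unannotated or unknown types fall back to "string"
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        # Parameters without a default value are required
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}


def to_openai_tool(func: Callable[..., Any]) -> Dict[str, Any]:
    """Wrap a function in the OpenAI tool format shown above."""
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": inspect.getdoc(func) or "",
            "parameters": build_parameters(func),
        },
    }


def search_web(query: str, max_results: int = 5) -> str:
    """Search the web for information."""
    return f"Results for: {query}"


if __name__ == "__main__":
    print(to_openai_tool(search_web))
```

A production version would also parse the `Args:` section of the docstring to fill per-parameter descriptions, which this sketch leaves out.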

3.3 MCP Integration

# agno/tools/mcp.py
class MCPTools(Toolkit):
    """Model Context Protocol toolkit."""

    def __init__(
        self,
        transport: Literal["stdio", "streamable-http"] = "stdio",
        url: Optional[str] = None,
        command: Optional[str] = None,
        **kwargs
    ):
        self.transport = transport
        self.url = url
        self.command = command
        self._client = None
        self._tools = None

    async def _connect(self):
        """Establish the MCP connection."""
        if self.transport == "streamable-http":
            self._client = MCPHttpClient(self.url)
        else:
            self._client = MCPStdioClient(self.command)

        await self._client.connect()
        self._tools = await self._client.list_tools()

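    def get_tools(self) -> List[Function]:
        """Return the tools exposed by the MCP server."""
        if self._tools is None:
            # asyncio.run() raises RuntimeError inside a running event loop, so
            # this lazy connect only works from synchronous code; in async code,
            # await self._connect() before the agent first runs
            asyncio.run(self._connect())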

        functions = []
        for tool in self._tools:
            func = Function(
                name=tool.name,
                description=tool.description,
                parameters=tool.input_schema,
                func=self._create_executor(tool.name),
            )
            functions.append(func)

        return functions

    def _create_executor(self, tool_name: str) -> Callable:
        """Create the executor for a tool."""
        async def executor(**kwargs) -> Any:
            return await self._client.call_tool(tool_name, kwargs)
        return executor


# Usage example
agent = Agent(
    model=Claude(),
    tools=[
        MCPTools(
            transport="streamable-http",
            url="https://example.com/mcp"
        )
    ]
)
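
One caveat with the listing above: `get_tools` connects lazily through `asyncio.run`, which cannot be called while an event loop is already running. A common way around this is an explicit async `connect()` step. The sketch below shows the pattern with a hypothetical `FakeMCPClient` that returns canned tool names; it does not speak the real MCP protocol, and `LazyTools` is an illustrative class, not part of Agno.

```python
import asyncio
from typing import List, Optional


class FakeMCPClient:
    """Hypothetical stand-in for an MCP client; returns canned tool names."""

    async def connect(self) -> None:
        await asyncio.sleep(0)  # simulate an I/O round trip

    async def list_tools(self) -> List[str]:
        return ["search", "fetch"]


class LazyTools:
    """Toolkit whose connection is established explicitly, never via asyncio.run."""

    def __init__(self) -> None:
        self._client = FakeMCPClient()
        self._tools: Optional[List[str]] = None

    async def connect(self) -> "LazyTools":
        await self._client.connect()
        self._tools = await self._client.list_tools()
        return self

    def get_tools(self) -> List[str]:
        # Fail loudly instead of trying to start a nested event loop
        if self._tools is None:
            raise RuntimeError("call `await connect()` before get_tools()")
        return self._tools


async def main() -> List[str]:
    tools = await LazyTools().connect()
    return tools.get_tools()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The synchronous `get_tools` stays cheap and side-effect free, and all network I/O happens at a point the caller controls.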

Chapter 4: Team Multi-Agent Collaboration

4.1 Team Architecture

graph TB
    subgraph "Team Structure"
        TEAM[Team]
        LEADER[Team Leader]
        MEMBER1[Agent 1]
        MEMBER2[Agent 2]
        MEMBER3[Agent 3]

        TEAM --> LEADER
        LEADER --> MEMBER1
        LEADER --> MEMBER2
        LEADER --> MEMBER3
    end

    subgraph "Collaboration Modes"
        ROUTE[Route Mode]
        COLLAB[Collaborate Mode]
        COORD[Coordinate Mode]
    end

    subgraph "Communication"
        SHARE[Shared Memory]
        MSG[Message Passing]
        STATE[Shared State]
    end

4.2 Team Implementation

# agno/team/team.py
@dataclass(init=False)
class Team:
    """A multi-agent team."""

    name: str
    agents: List[Agent]
    leader: Optional[Agent] = None
    mode: Literal["route", "collaborate", "coordinate"] = "route"

    # Shared resources
    shared_memory: Optional[MemoryManager] = None
    shared_knowledge: Optional[Knowledge] = None

    # Team settings
    max_rounds: int = 10
    enable_agentic_context: bool = True
    async def arun(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Run the team."""

        if self.mode == "route":
            return await self._run_route_mode(message, **kwargs)
        elif self.mode == "collaborate":
            return await self._run_collaborate_mode(message, **kwargs)
        elif self.mode == "coordinate":
            return await self._run_coordinate_mode(message, **kwargs)

    async def _run_route_mode(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Route mode: the leader picks the most suitable Agent for the task."""

        # 1. The leader analyzes the task and selects an Agent
        selection_prompt = self._build_selection_prompt(message)
        leader_response = await self.leader.arun(selection_prompt)

        selected_agent_name = self._parse_agent_selection(leader_response)
        selected_agent = self._get_agent_by_name(selected_agent_name)

        # 2. The selected Agent handles the task
        agent_response = await selected_agent.arun(message, **kwargs)

        return TeamRunOutput(
            content=agent_response.content,
            selected_agent=selected_agent_name,
            messages=agent_response.messages,
        )

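    async def _run_collaborate_mode(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Collaborate mode: several Agents work concurrently and their results are merged."""

        # 1. Decompose the task
        subtasks = await self._decompose_task(message)

        # 2. Assign the subtasks to different Agents concurrently
        #    (zip stops at the shorter list, so surplus subtasks or Agents are dropped)
        tasks = []
        for agent, subtask in zip(self.agents, subtasks):
            tasks.append(agent.arun(subtask, **kwargs))

        responses = await asyncio.gather(*tasks)

        # 3. Merge the results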
        merged_result = await self._merge_responses(responses)

        return TeamRunOutput(
            content=merged_result,
            agent_responses=responses,
        )

    async def _run_coordinate_mode(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Coordinate mode: Agents take turns and pass context along."""

        context = {"original_message": message}
        current_message = message

        for round_num in range(self.max_rounds):
            for agent in self.agents:
                # Build a message that includes the context
                augmented_message = self._build_context_message(
                    current_message, context
                )

                # Let the Agent handle it
                response = await agent.arun(augmented_message, **kwargs)

                # Update the context
                context[f"{agent.name}_response"] = response.content

                # Check whether the task is complete
                if self._is_task_complete(response):
                    return TeamRunOutput(
                        content=response.content,
                        rounds=round_num + 1,
                        context=context,
                    )

                current_message = response.content

        return TeamRunOutput(
            content=current_message,
            rounds=self.max_rounds,
            context=context,
        )

    def _build_selection_prompt(self, message: str) -> str:
        """Build the Agent selection prompt."""
        agents_info = "\n".join([
            f"- {agent.name}: {agent.description}"
            for agent in self.agents
        ])

        return f"""
        Based on the following task, select the most appropriate agent:

        Task: {message}

        Available Agents:
        {agents_info}

        Respond with just the agent name.
        """


# Usage example
research_agent = Agent(
    name="Researcher",
    description="Expert at finding and analyzing information",
    model=Claude(),
    tools=[WebSearchTools()],
)

writer_agent = Agent(
    name="Writer",
    description="Expert at writing clear and engaging content",
    model=Claude(),
)

editor_agent = Agent(
    name="Editor",
    description="Expert at reviewing and improving content",
    model=Claude(),
)

content_team = Team(
    name="Content Team",
    agents=[research_agent, writer_agent, editor_agent],
    leader=Agent(
        name="Team Leader",
        description="Coordinates the content creation process",
        model=Claude(),
    ),
    mode="coordinate",
)

# Run the team
result = await content_team.arun("Write an article about AI trends in 2024")
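
Route mode depends on `_parse_agent_selection`, which the listing does not show. LLM replies rarely match an agent name character for character, so some tolerance is useful. Below is one possible implementation, offered as a sketch: the function takes the reply text directly, and its matching rules (exact match first, then earliest mention) are an assumption, not Agno's documented behavior.

```python
from typing import List, Optional


def parse_agent_selection(reply: str, agent_names: List[str]) -> Optional[str]:
    """Map a free-form leader reply onto one of the known agent names."""
    by_lower = {name.lower(): name for name in agent_names}
    lowered = reply.lower()

    # 1. Exact match after stripping whitespace, quotes, backticks, and periods
    cleaned = reply.strip().strip("\"'`.").lower()
    if cleaned in by_lower:
        return by_lower[cleaned]

    # 2. Otherwise pick the known name that is mentioned earliest in the reply
    positions = [
        (lowered.find(lower), name)
        for lower, name in by_lower.items()
        if lower in lowered
    ]
    return min(positions)[1] if positions else None


if __name__ == "__main__":
    names = ["Researcher", "Writer", "Editor"]
    print(parse_agent_selection(" Writer.\n", names))
    print(parse_agent_selection("I would pick the Editor, not the Writer", names))
    print(parse_agent_selection("nobody", names))
```

Returning `None` for an unrecognized reply lets the caller decide on a fallback, such as re-prompting the leader or defaulting to the first agent.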

Chapter 5: Workflows

5.1 Workflow Architecture

graph TB
    subgraph "Workflow Structure"
        START((Start))
        STEP1[Step 1<br/>Agent A]
        STEP2[Step 2<br/>Agent B]
        COND{Condition}
        STEP3A[Step 3A]
        STEP3B[Step 3B]
        END((End))

        START --> STEP1
        STEP1 --> STEP2
        STEP2 --> COND
        COND --> |Yes| STEP3A
        COND --> |No| STEP3B
        STEP3A --> END
        STEP3B --> END
    end

5.2 Workflow Implementation

# agno/workflow/workflow.py
@dataclass(init=False)
class Workflow:
    """Workflow definition."""

    name: str
    description: str

    # Workflow steps
    steps: List[WorkflowStep]

    # State management
    state: Dict[str, Any] = field(default_factory=dict)

    # Persistence
    db: Optional[BaseDb] = None

    # Error handling
    on_error: Optional[Callable] = None
    max_retries: int = 3

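    async def arun(
        self,
        input_data: Dict[str, Any],
        **kwargs
    ) -> WorkflowRunOutput:
        """Run the workflow."""

        self.state = {"input": input_data}
        current_step = self.steps[0]
        result = None
        retries = 0

        while current_step is not None:
            try:
                # Execute the step
                result = await self._execute_step(current_step)

                # Update the state
                self.state[current_step.name] = result

                # Save a checkpoint
                if self.db:
                    await self._save_checkpoint()

                # Move to the next step and reset the retry counter
                current_step = self._get_next_step(current_step, result)
                retries = 0

            except Exception as e:
                # Retry the same step at most max_retries times; without this
                # cap, a step that always fails would loop forever
                retries += 1
                if self.on_error is None or retries > self.max_retries:
                    raise
                await self.on_error(e, current_step, self.state)

        return WorkflowRunOutput(
            state=self.state,
            # Use an explicit "output" entry if a step wrote one, else the last result
            final_output=self.state.get("output", result),
        )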

    async def _execute_step(self, step: WorkflowStep) -> Any:
        """Execute a single step."""

        # Prepare the input
        step_input = step.input_mapper(self.state) if step.input_mapper else self.state

        if isinstance(step.executor, (Agent, Team)):
            # Agent or Team step. "message" is passed explicitly, so drop it from
            # the extra kwargs; passing it twice would raise a TypeError
            extra = {k: v for k, v in step_input.items() if k != "message"}
            result = await step.executor.arun(
                message=step_input.get("message", ""),
                **extra
            )
            return result.content

        elif callable(step.executor):
            # Custom function step
            if iscoroutinefunction(step.executor):
                return await step.executor(**step_input)
            else:
                return step.executor(**step_input)

    def _get_next_step(
        self,
        current_step: WorkflowStep,
        result: Any
    ) -> Optional[WorkflowStep]:
        """Return the next step."""

        if current_step.next_step:
            # Fixed next step
            return self._get_step_by_name(current_step.next_step)

        elif current_step.condition:
            # Conditional branch
            next_step_name = current_step.condition(result, self.state)
            return self._get_step_by_name(next_step_name)

        else:
            # Sequential execution
            current_index = self.steps.index(current_step)
            if current_index + 1 < len(self.steps):
                return self.steps[current_index + 1]

        return None


@dataclass
class WorkflowStep:
    """A workflow step."""

    name: str
    executor: Union[Agent, Team, Callable]
    input_mapper: Optional[Callable] = None
    next_step: Optional[str] = None
    condition: Optional[Callable] = None


# Usage example
data_pipeline = Workflow(
    name="Data Analysis Pipeline",
    steps=[
        WorkflowStep(
            name="collect_data",
            executor=data_collector_agent,
            input_mapper=lambda state: {"message": f"Collect data about {state['input']['topic']}"}
        ),
        WorkflowStep(
            name="analyze_data",
            executor=analyst_agent,
            input_mapper=lambda state: {"message": f"Analyze: {state['collect_data']}"}
        ),
        WorkflowStep(
            name="check_quality",
            executor=check_data_quality,  # custom function
            condition=lambda result, state: "generate_report" if result["quality"] > 0.8 else "collect_more_data"
        ),
        WorkflowStep(
            name="collect_more_data",
            executor=data_collector_agent,
            next_step="analyze_data"
        ),
        WorkflowStep(
            name="generate_report",
            executor=report_writer_agent,
        ),
    ]
)

result = await data_pipeline.arun({"topic": "AI market trends"})
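
The branching rules (fixed `next_step`, then `condition`, then sequential order) can be exercised without any LLM. Here is a self-contained sketch of the same control flow in which plain async functions stand in for Agents. `Step`, `run_workflow`, the `max_steps` cycle guard, and the three demo steps are all illustrative names that do not come from Agno.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional

State = Dict[str, Any]


@dataclass
class Step:
    name: str
    run: Callable[[State], Awaitable[Any]]
    next_step: Optional[str] = None                          # fixed jump
    condition: Optional[Callable[[Any, State], str]] = None  # computed jump


async def run_workflow(steps: List[Step], input_data: Any, max_steps: int = 20) -> State:
    by_name = {step.name: step for step in steps}
    state: State = {"input": input_data, "trace": []}
    current: Optional[Step] = steps[0]
    for _ in range(max_steps):  # guard against endless cycles
        if current is None:
            break
        result = await current.run(state)
        state[current.name] = result
        state["trace"].append(current.name)
        if current.next_step:
            current = by_name[current.next_step]
        elif current.condition:
            current = by_name[current.condition(result, state)]
        else:
            index = steps.index(current)
            current = steps[index + 1] if index + 1 < len(steps) else None
    return state


async def collect(state: State) -> int:
    state["samples"] = state.get("samples", 0) + 1
    return state["samples"]


async def check(state: State) -> Dict[str, float]:
    # Toy quality score: each collected sample adds 0.5
    return {"quality": 0.5 * state["samples"]}


async def report(state: State) -> str:
    return f"report on {state['input']} from {state['samples']} samples"


STEPS = [
    Step("collect", collect),
    Step("check", check,
         condition=lambda result, state: "report" if result["quality"] > 0.8 else "collect"),
    Step("report", report),
]

if __name__ == "__main__":
    final_state = asyncio.run(run_workflow(STEPS, "AI"))
    print(final_state["trace"], final_state["report"])
```

The first quality check fails, so the run loops back to `collect` once before reaching `report`; the `trace` list records that path for inspection.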

Chapter 6: Memory and Knowledge

6.1 The Memory System

graph TB
    subgraph "Memory Types"
        SHORT[Short-term Memory<br/>Ephemeral Context]
        LONG[Long-term Memory<br/>Persistent Storage]
        USER[User Memory<br/>Per-user Context]
        CULTURE[Culture<br/>Shared Knowledge]
    end

    subgraph "Storage"
        SQLITE[(SQLite)]
        POSTGRES[(PostgreSQL)]
        REDIS[(Redis)]
    end

    SHORT --> REDIS
    LONG --> SQLITE
    LONG --> POSTGRES
    USER --> SQLITE
    CULTURE --> POSTGRES

6.2 Memory Implementation

# agno/memory/manager.py
class MemoryManager:
    """Memory manager."""

    def __init__(
        self,
        db: BaseDb,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
    ):
        self.db = db
        self.user_id = user_id
        self.session_id = session_id

    async def add_memory(
        self,
        content: str,
        metadata: Optional[Dict] = None,
        memory_type: str = "general"
    ) -> str:
        """Add a memory."""
        memory_id = str(uuid4())

        await self.db.insert(
            table="memories",
            data={
                "id": memory_id,
                "user_id": self.user_id,
                "session_id": self.session_id,
                "content": content,
                "memory_type": memory_type,
                "metadata": json.dumps(metadata or {}),
                # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated
                # since Python 3.12); needs `from datetime import datetime, timezone`
                "created_at": datetime.now(timezone.utc),
            }
        )

        return memory_id

    async def search_memories(
        self,
        query: str,
        limit: int = 10,
        memory_type: Optional[str] = None,
    ) -> List[Memory]:
        """Search for relevant memories."""

        # Search by embedding vector
        query_embedding = await self._get_embedding(query)

        results = await self.db.vector_search(
            table="memories",
            vector=query_embedding,
            limit=limit,
            filters={
                "user_id": self.user_id,
                "memory_type": memory_type,
            } if memory_type else {"user_id": self.user_id}
        )

        return [Memory(**r) for r in results]

    async def get_user_memory(self) -> UserMemory:
        """Return the user-level memory summary."""

        memories = await self.db.query(
            table="user_memories",
            filters={"user_id": self.user_id}
        )

        return UserMemory(
            user_id=self.user_id,
            preferences=memories.get("preferences", {}),
            facts=memories.get("facts", []),
            interaction_style=memories.get("interaction_style"),
        )


# agno/knowledge/knowledge.py
class Knowledge:
    """Knowledge base management."""

    def __init__(
        self,
        vector_store: VectorStore,
        embedder: Embedder,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
    ):
        self.vector_store = vector_store
        self.embedder = embedder
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    async def add_document(
        self,
        content: str,
        metadata: Optional[Dict] = None,
        source: Optional[str] = None,
    ) -> List[str]:
        """Add a document to the knowledge base."""

        # Split into chunks
        chunks = self._chunk_text(content)

        # Generate embeddings
        embeddings = await self.embedder.embed_batch([c.text for c in chunks])

        # Store
        ids = []
        for chunk, embedding in zip(chunks, embeddings):
            doc_id = await self.vector_store.add(
                text=chunk.text,
                embedding=embedding,
                metadata={
                    **(metadata or {}),
                    "source": source,
                    "chunk_index": chunk.index,
                }
            )
            ids.append(doc_id)

        return ids

    async def search(
        self,
        query: str,
        limit: int = 5,
        filters: Optional[KnowledgeFilter] = None,
    ) -> List[KnowledgeResult]:
        """Search the knowledge base."""

        # Embed the query
        query_embedding = await self.embedder.embed(query)

        # Vector search
        results = await self.vector_store.search(
            embedding=query_embedding,
            limit=limit,
            filters=filters,
        )

        return results

    async def agentic_search(
        self,
        query: str,
        agent: Agent,
        max_iterations: int = 3,
    ) -> List[KnowledgeResult]:
        """Agent-driven iterative search."""

        all_results = []
        search_queries = [query]

        for _ in range(max_iterations):
            # Run the searches
            for q in search_queries:
                results = await self.search(q)
                all_results.extend(results)

            # Ask the Agent to evaluate the results and propose new queries
            evaluation = await agent.arun(
                f"""
                Original query: {query}
                Current results: {all_results}

                If these results are sufficient, reply with the single word SUFFICIENT.
                Otherwise, suggest refined search queries.
                """
            )

            # Match the exact verdict: a substring test for "sufficient" would
            # also match "insufficient" and "not sufficient"
            if evaluation.content.strip().upper().startswith("SUFFICIENT"):
                break

            # Extract the new search queries
            search_queries = self._extract_queries(evaluation.content)

        return self._deduplicate(all_results)
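
`add_document` relies on `_chunk_text`, which is not shown. A simple sliding-window version is sketched below. It assumes `chunk_size` and `chunk_overlap` are measured in characters (the source does not say whether the unit is characters or tokens), and the `Chunk` dataclass only mirrors the `text` and `index` attributes used in the listing.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    index: int
    text: str


def chunk_text(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> List[Chunk]:
    """Split text into fixed-size windows; neighbors share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks: List[Chunk] = []
    for index, start in enumerate(range(0, len(text), step)):
        chunks.append(Chunk(index=index, text=text[start:start + chunk_size]))
        # Stop once the window has reached the end of the text, which avoids
        # emitting a trailing chunk made up purely of overlap
        if start + chunk_size >= len(text):
            break
    return chunks


if __name__ == "__main__":
    for chunk in chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1):
        print(chunk.index, chunk.text)
```

The overlap keeps a sentence that straddles a boundary retrievable from at least one chunk, at the cost of storing some text twice.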

Chapter 7: The AgentOS Runtime

7.1 AgentOS Architecture

graph TB
    subgraph "Client"
        UI[AgentOS UI<br/>Browser]
    end

    subgraph "AgentOS Runtime"
        FASTAPI[FastAPI App]
        SSE[SSE Endpoints]
        REST[REST Endpoints]
        WS[WebSocket]

        subgraph "Endpoints"
            CHAT["/v1/chat"]
            RUNS["/v1/runs"]
            AGENTS["/v1/agents"]
            SESSIONS["/v1/sessions"]
        end
    end

    subgraph "Your Agents"
        A1[Agent 1]
        A2[Agent 2]
        TEAM[Team]
        WF[Workflow]
    end

    UI --> SSE
    UI --> REST

    FASTAPI --> CHAT
    FASTAPI --> RUNS
    FASTAPI --> AGENTS
    FASTAPI --> SESSIONS

    CHAT --> A1
    CHAT --> A2
    RUNS --> TEAM
    RUNS --> WF

7.2 AgentOS Implementation

# agno/os/agent_os.py
class AgentOS:
    """AgentOS production runtime."""

    def __init__(
        self,
        agents: Optional[List[Agent]] = None,
        teams: Optional[List[Team]] = None,
        workflows: Optional[List[Workflow]] = None,
        title: str = "AgentOS",
        version: str = "1.0.0",
    ):
        self.agents = {a.name: a for a in (agents or [])}
        self.teams = {t.name: t for t in (teams or [])}
        self.workflows = {w.name: w for w in (workflows or [])}
        self.title = title
        self.version = version

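    def get_app(self) -> FastAPI:
        """Return the FastAPI application."""

        app = FastAPI(
            title=self.title,
            version=self.version,
        )

        # Register the routes
        self._register_routes(app)

        # Configure CORS (wildcards suit local development; restrict
        # allow_origins to known front-end hosts in production)
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_methods=["*"],
            allow_headers=["*"],
        )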

        return app

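    def _register_routes(self, app: FastAPI):
        """Register the API routes."""

        # Strong references to in-flight runs: the event loop keeps only weak
        # references to tasks, so an unreferenced task can be garbage-collected
        background_tasks: set = set()

        @app.get("/v1/agents")
        async def list_agents():
            return {
                "agents": [
                    {"name": a.name, "description": a.description}
                    for a in self.agents.values()
                ]
            }

        @app.post("/v1/agents/{agent_name}/runs")
        async def create_run(
            agent_name: str,
            request: RunRequest,
        ):
            agent = self.agents.get(agent_name)
            if not agent:
                raise HTTPException(404, f"Agent {agent_name} not found")

            # Create the run
            run_id = str(uuid4())

            # Execute in the background
            task = asyncio.create_task(
                self._execute_run(run_id, agent, request)
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

            return {"run_id": run_id}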

        @app.get("/v1/runs/{run_id}/stream")
        async def stream_run(run_id: str):
            """Stream the run's events over SSE."""
            async def event_generator():
                async for event in self._get_run_events(run_id):
                    yield {
                        "event": event.type,
                        "data": json.dumps(event.data),
                    }

            return EventSourceResponse(event_generator())

        @app.post("/v1/chat")
        async def chat(request: ChatRequest):
            """Unified chat endpoint."""
            agent = self.agents.get(request.agent_name)
            if not agent:
                raise HTTPException(404, f"Agent {request.agent_name} not found")

            response = await agent.arun(
                message=request.message,
                session_id=request.session_id,
                user_id=request.user_id,
            )

            return {
                "content": response.content,
                "session_id": response.session_id,
            }

    async def _execute_run(
        self,
        run_id: str,
        agent: Agent,
        request: RunRequest
    ):
        """Execute the run and emit its events."""

        try:
            async for event in agent.arun(
                message=request.message,
                stream=True,
                session_id=request.session_id,
            ):
                await self._emit_event(run_id, event)

        except Exception as e:
            await self._emit_event(run_id, RunEvent(
                type="error",
                data={"error": str(e)}
            ))

    def serve(
        self,
        app: str = "main:app",
        host: str = "0.0.0.0",
        port: int = 8000,
        reload: bool = False,
    ):
        """Start the server."""
        import uvicorn
        uvicorn.run(app, host=host, port=port, reload=reload)
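The listing calls `self._emit_event` and `self._get_run_events` without showing them. One plausible way to connect a background run to its SSE stream is a per-run `asyncio.Queue`. The sketch below is an assumption made for illustration, not Agno's actual implementation: `RunEventBus`, its methods, and the local `RunEvent` dataclass are hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, Set


@dataclass
class RunEvent:
    """Hypothetical event shape: a type tag plus a JSON-serialisable payload."""
    type: str
    data: Dict[str, Any] = field(default_factory=dict)


class RunEventBus:
    """Hypothetical per-run event buffer (illustration only)."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}
        self._tasks: Set[asyncio.Task] = set()

    def _queue(self, run_id: str) -> asyncio.Queue:
        # Created lazily so producer and consumer may arrive in either order.
        return self._queues.setdefault(run_id, asyncio.Queue())

    async def emit(self, run_id: str, event: RunEvent) -> None:
        await self._queue(run_id).put(event)

    async def close(self, run_id: str) -> None:
        # None acts as the end-of-stream sentinel.
        await self._queue(run_id).put(None)

    async def events(self, run_id: str) -> AsyncIterator[RunEvent]:
        queue = self._queue(run_id)
        try:
            while True:
                event = await queue.get()
                if event is None:
                    break
                yield event
        finally:
            # Drop the queue once the stream has been consumed.
            self._queues.pop(run_id, None)

    def spawn(self, coro) -> asyncio.Task:
        # Hold a strong reference until the task finishes, because the
        # event loop itself only keeps weak references to tasks.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task


async def demo() -> list:
    bus = RunEventBus()

    async def fake_run(run_id: str) -> None:
        # Stand-in for streaming an agent response.
        for chunk in ("Hello", "world"):
            await bus.emit(run_id, RunEvent(type="content", data={"text": chunk}))
        await bus.close(run_id)

    bus.spawn(fake_run("run-1"))
    return [(e.type, e.data["text"]) async for e in bus.events("run-1")]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An in-memory queue like this only works within a single process; a deployment with several workers would need a shared broker or database-backed event log instead.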


# Usage example
from agno.agent import Agent
from agno.models.anthropic import Claude
from agno.os import AgentOS

# Create the Agent
assistant = Agent(
    name="Assistant",
    description="A helpful AI assistant",
    model=Claude(),
    markdown=True,
)

# Create the AgentOS
agent_os = AgentOS(agents=[assistant])

# Get the FastAPI app
app = agent_os.get_app()

# Run
if __name__ == "__main__":
    agent_os.serve(app="main:app", reload=True)
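Once the server is running, a client reading `/v1/runs/{run_id}/stream` receives each yielded `{"event": ..., "data": ...}` dict as a Server-Sent Events frame: one `event:` line, one `data:` line, then a blank line. The sketch below illustrates that framing with standard-library code only. `format_sse` and `parse_sse` are hypothetical helpers, and a real SSE library may use different line endings (the SSE format allows CRLF as well as LF).

```python
import json
from typing import Any, Dict, Iterator, Tuple


def format_sse(event: str, data: Dict[str, Any]) -> str:
    """Serialise one event as an SSE frame: field lines ended by a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_sse(stream: str) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Parse frames produced by format_sse (single-line data fields only)."""
    for frame in stream.split("\n\n"):
        if not frame.strip():
            continue
        # Each line is "field: value"; split on the first separator only.
        fields = dict(line.split(": ", 1) for line in frame.split("\n"))
        yield fields["event"], json.loads(fields["data"])


wire = format_sse("content", {"text": "Hi"}) + format_sse("done", {})
events = list(parse_sse(wire))
print(events)
```

Because `json.dumps` escapes newlines inside strings, each payload stays on a single `data:` line, which is what keeps this simple parser valid.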

Chapter 8: Guardrails Safety Mechanisms

8.1 Guardrails Architecture

graph TB
    subgraph "Guardrails Pipeline"
        INPUT[User Input]
        PRE[Pre-Guardrails]
        AGENT[Agent Execution]
        POST[Post-Guardrails]
        OUTPUT[Output]

        INPUT --> PRE
        PRE --> |Pass| AGENT
        PRE --> |Block| BLOCKED[Blocked Response]
        AGENT --> POST
        POST --> |Pass| OUTPUT
        POST --> |Block| MODIFIED[Modified Response]
    end

    subgraph "Guardrail Types"
        CONTENT[Content Filter]
        PII[PII Detection]
        TOXIC[Toxicity Check]
        INJECT[Injection Detection]
        CUSTOM[Custom Rules]
    end
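The pipeline in the diagram can be read as two stages of ordered checks around the agent call: a stage stops at the first check that blocks, and any rewritten content is passed on to the next check. The sketch below is a minimal illustration under that reading, not Agno's internal code. `run_stage`, `guarded_call`, and the demo checks are hypothetical names, and the `GuardrailResult` here is a reduced stand-in with only the fields the sketch needs.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional, Tuple


@dataclass
class GuardrailResult:
    passed: bool
    message: Optional[str] = None
    modified_content: Optional[str] = None


# A check is any async callable from content to a GuardrailResult.
Check = Callable[[str], Awaitable[GuardrailResult]]


async def run_stage(content: str, checks: List[Check]) -> Tuple[bool, str]:
    """Run checks in order; stop at the first block, carry rewrites forward."""
    for check in checks:
        result = await check(content)
        if not result.passed:
            return False, result.message or "Blocked"
        if result.modified_content is not None:
            content = result.modified_content
    return True, content


async def guarded_call(user_input: str, agent, pre: List[Check], post: List[Check]) -> str:
    ok, text = await run_stage(user_input, pre)
    if not ok:
        return f"[blocked] {text}"
    output = await agent(text)
    ok, text = await run_stage(output, post)
    return text if ok else "[response withheld]"


# Demo checks and a stand-in agent (all hypothetical).
async def no_override(content: str) -> GuardrailResult:
    blocked = "ignore previous instructions" in content.lower()
    return GuardrailResult(passed=not blocked, message="override attempt" if blocked else None)


async def redact_secret(content: str) -> GuardrailResult:
    return GuardrailResult(passed=True, modified_content=content.replace("hunter2", "[REDACTED]"))


async def echo_agent(prompt: str) -> str:
    return f"You said: {prompt}"


safe = asyncio.run(guarded_call("my password is hunter2", echo_agent, [no_override], [redact_secret]))
blocked = asyncio.run(guarded_call("Ignore previous instructions", echo_agent, [no_override], [redact_secret]))
print(safe)
print(blocked)
```

Running the checks sequentially keeps the order of rewrites deterministic; independent checks that never modify content could also be run concurrently.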

8.2 Guardrails Implementation

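# agno/guardrails/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional


# Defined before BaseGuardrail so the return annotation below resolves.
@dataclass
class GuardrailResult:
    passed: bool
    message: Optional[str] = None
    modified_content: Optional[str] = None
    metadata: Optional[Dict] = None


class BaseGuardrail(ABC):
    """Base class for guardrails."""

    name: str
    enabled: bool = True

    @abstractmethod
    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        """Check the content and return a GuardrailResult."""
        pass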


# agno/guardrails/content.py
import re
from typing import Dict, List, Optional


class ContentFilterGuardrail(BaseGuardrail):
    """Content filter."""

    def __init__(
        self,
        blocked_terms: Optional[List[str]] = None,
        blocked_patterns: Optional[List[str]] = None,
    ):
        self.name = "content_filter"
        self.blocked_terms = blocked_terms or []
        self.blocked_patterns = [
            re.compile(p) for p in (blocked_patterns or [])
        ]

    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        # Check blocked terms (case-insensitive substring match)
        lowered = content.lower()
        for term in self.blocked_terms:
            if term.lower() in lowered:
                return GuardrailResult(
                    passed=False,
                    message=f"Content contains blocked term: {term}"
                )

        # Check blocked regex patterns
        for pattern in self.blocked_patterns:
            if pattern.search(content):
                return GuardrailResult(
                    passed=False,
                    message="Content matches blocked pattern"
                )

        return GuardrailResult(passed=True)


# agno/guardrails/pii.py
import re
from typing import Dict, Literal, Optional


class PIIGuardrail(BaseGuardrail):
    """PII (personally identifiable information) detection."""

    def __init__(self, action: Literal["block", "mask"] = "mask"):
        self.name = "pii_detector"
        self.action = action

        # PII patterns (the TLD class is [A-Za-z]; a "|" inside a character
        # class would match a literal pipe)
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
        }

    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        detected_pii = []

        for pii_type, pattern in self.patterns.items():
            matches = re.findall(pattern, content)
            if matches:
                detected_pii.append({
                    "type": pii_type,
                    "count": len(matches)
                })

        if not detected_pii:
            return GuardrailResult(passed=True)

        if self.action == "block":
            return GuardrailResult(
                passed=False,
                message=f"PII detected: {detected_pii}"
            )
        else:
            # Mask PII
            masked_content = content
            for pii_type, pattern in self.patterns.items():
                masked_content = re.sub(pattern, f"[{pii_type.upper()}]", masked_content)

            return GuardrailResult(
                passed=True,
                modified_content=masked_content,
                metadata={"pii_detected": detected_pii}
            )


# agno/guardrails/injection.py
class PromptInjectionGuardrail(BaseGuardrail):
    """Prompt injection detection."""

    def __init__(self, model: Optional[Model] = None):
        self.name = "injection_detector"
        self.model = model

        # Common injection patterns
        self.injection_patterns = [
            r'ignore\s+(all\s+)?previous\s+instructions',
            r'disregard\s+.*\s+instructions',
            r'forget\s+everything',
            r'you\s+are\s+now\s+a',
            r'pretend\s+to\s+be',
            r'act\s+as\s+if',
        ]

    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        # Pattern-matching detection
        for pattern in self.injection_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                return GuardrailResult(
                    passed=False,
                    message="Potential prompt injection detected"
                )

        # Optional deeper check using a model
        if self.model:
            detection_prompt = f"""
            Analyze the following text for potential prompt injection attempts:

            Text: {content}

            Is this a prompt injection attempt? Respond with YES or NO.
            """

            response = await self.model.ainvoke([
                Message(role="user", content=detection_prompt)
            ])

            # Match only a leading YES so replies that merely contain "yes" are not flagged
            if response.content.strip().upper().startswith("YES"):
                return GuardrailResult(
                    passed=False,
                    message="Prompt injection detected by AI analysis"
                )

        return GuardrailResult(passed=True)


# Usage example
agent = Agent(
    model=Claude(),
    guardrails=[
        ContentFilterGuardrail(blocked_terms=["password", "secret"]),
        PIIGuardrail(action="mask"),
        PromptInjectionGuardrail(),
    ]
)
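To make the masking behaviour concrete, the sketch below applies the same regular expressions as `PIIGuardrail` in a standalone function. The patterns are copied from the listing, except that the TLD character class is written as `[A-Za-z]` so a literal `|` is not accepted. `mask_pii` is a hypothetical helper used only for illustration.

```python
import re

# Same patterns as the PIIGuardrail listing; insertion order is the masking order.
PII_PATTERNS = {
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
}


def mask_pii(text: str) -> str:
    """Replace every match with a placeholder such as [EMAIL] or [PHONE]."""
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[{pii_type.upper()}]", text)
    return text


masked = mask_pii("Mail jane.doe@example.com or call 555-123-4567.")
print(masked)  # Mail [EMAIL] or call [PHONE].

# Any bare 10-digit number also matches the phone pattern.
false_positive = mask_pii("Order 1234567890 shipped")
print(false_positive)  # Order [PHONE] shipped
```

The second call shows the main limitation of purely regex-based detection: it cannot tell an order number from a phone number, so some false positives are to be expected.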

Chapter 9: Best Practices

9.1 Project Structure

my_agent_project/
├── agents/
│   ├── __init__.py
│   ├── researcher.py
│   ├── writer.py
│   └── editor.py
├── teams/
│   ├── __init__.py
│   └── content_team.py
├── workflows/
│   ├── __init__.py
│   └── article_pipeline.py
├── tools/
│   ├── __init__.py
│   ├── search.py
│   └── database.py
├── knowledge/
│   ├── __init__.py
│   └── setup.py
├── config/
│   ├── __init__.py
│   └── settings.py
├── main.py
├── pyproject.toml
└── README.md

9.2 Configuration Management

# config/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Load values from a local .env file (pydantic-settings v2 style)
    model_config = SettingsConfigDict(env_file=".env")

    # API Keys
    openai_api_key: str
    anthropic_api_key: str

    # Database
    database_url: str = "sqlite:///agno.db"

    # Vector Store
    vector_store_url: str = "http://localhost:6333"

    # AgentOS
    agent_os_host: str = "0.0.0.0"
    agent_os_port: int = 8000

settings = Settings()

9.3 Error Handling

# Custom exception handling
import logging

from agno.exceptions import AgnoException

logger = logging.getLogger(__name__)

class AgentExecutionError(AgnoException):
    """Agent execution error."""
    pass

class ToolExecutionError(AgnoException):
    """Tool execution error."""
    pass

# Usage in an Agent
agent = Agent(
    model=Claude(),
    on_error=lambda e, ctx: logger.error(f"Agent error: {e}"),
    max_retries=3,
    retry_delay=1.0,
)
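The `max_retries`, `retry_delay`, and `on_error` options suggest a retry loop around the agent call. The sketch below shows one plausible set of semantics (retry up to `max_retries` more times after the first failure, with a fixed delay, reporting each error to a callback). It is an assumption made for illustration, not Agno's implementation, and `run_with_retries` is a hypothetical helper.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def run_with_retries(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    on_error: Optional[Callable[[Exception, int], None]] = None,
) -> T:
    """Call operation; after a failure, retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return await operation()
        except Exception as exc:
            if on_error is not None:
                on_error(exc, attempt)
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


# Demo: an operation that fails twice, then succeeds.
calls = {"n": 0}
errors = []


async def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError(f"transient failure {calls['n']}")
    return "ok"


result = asyncio.run(
    run_with_retries(
        flaky,
        max_retries=3,
        retry_delay=0.01,
        on_error=lambda exc, attempt: errors.append(str(exc)),
    )
)
print(result, calls["n"], errors)
```

A fixed delay is the simplest policy; for rate-limited model APIs, exponential backoff with jitter is a common alternative.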

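Summary

Agno is a well-designed multi-agent framework. Its core strengths are:

graph TB
    subgraph "Agno Core Strengths"
        PERF[Extreme performance<br/>instantiation 529x faster than LangGraph]
        TYPE[Type safety<br/>Pydantic]
        PROD[Production-ready<br/>AgentOS]
        PRIV[Privacy first<br/>runs locally]
        FLEX[Flexible architecture<br/>Agent/Team/Workflow]
    end

Key takeaways:

  1. Agent design: a lightweight, dataclass-based design with full lifecycle management
  2. Tool system: a unified tool abstraction with support for the MCP and A2A protocols
  3. Multi-agent collaboration: Team offers several collaboration modes (route, collaborate, coordinate)
  4. Workflow orchestration: Workflow supports complex task flows and conditional branching
  5. Production runtime: AgentOS provides a ready-to-use FastAPI service
  6. Safety: the Guardrails system screens inputs and outputs to protect the application

Agno represents a new generation of AI agent framework design and is worth studying and putting into practice.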

