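Agno Deep Dive: Architecture and Implementation of a Next-Generation Multi-Agent Framework
Introduction: The Evolution of AI Agent Frameworks
As large language models (LLMs) have matured, AI agents have moved from proof of concept to production use. Building a production-grade multi-agent system, however, raises several challenges:
- State management: how are conversation history and agent state persisted?
- Tool integration: how can an agent use external tools safely?
- Collaboration: how do multiple agents work together efficiently?
- Production deployment: how is an agent system deployed to production?
Agno is a modern multi-agent framework built to address these problems. It provides:
- Framework: a complete toolchain for building Agents, Teams, and Workflows
- AgentOS Runtime: a production-grade runtime environment
- Control Plane: a visual interface for monitoring and management
This article examines Agno's core architecture at the source-code level.
Chapter 1: Overall Architecture of Agno
1.1 Architecture Overview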
graph TB
subgraph "Application Layer"
APP[Your Application]
end
subgraph "Agno Framework"
AGENT[Agent]
TEAM[Team]
WORKFLOW[Workflow]
subgraph "Core Modules"
MODEL[Models]
TOOLS[Tools/Toolkits]
MEMORY[Memory]
KNOWLEDGE[Knowledge]
GUARD[Guardrails]
end
subgraph "Protocols"
MCP[MCP Protocol]
A2A[A2A Protocol]
end
end
subgraph "AgentOS Runtime"
FASTAPI[FastAPI Server]
SSE[SSE Endpoints]
STATE[State Management]
end
subgraph "Storage"
DB[(Database<br/>SQLite/PostgreSQL)]
VECTOR[(Vector Store)]
end
subgraph "LLM Providers"
OPENAI[OpenAI]
ANTHROPIC[Anthropic]
GOOGLE[Google]
OLLAMA[Ollama]
end
APP --> AGENT
APP --> TEAM
APP --> WORKFLOW
AGENT --> MODEL
AGENT --> TOOLS
AGENT --> MEMORY
AGENT --> KNOWLEDGE
AGENT --> GUARD
AGENT --> MCP
TEAM --> AGENT
WORKFLOW --> AGENT
WORKFLOW --> TEAM
FASTAPI --> AGENT
FASTAPI --> TEAM
FASTAPI --> WORKFLOW
MODEL --> OPENAI
MODEL --> ANTHROPIC
MODEL --> GOOGLE
MODEL --> OLLAMA
MEMORY --> DB
KNOWLEDGE --> VECTOR
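1.2 Core Design Principles
graph LR
subgraph "Agno Design Principles"
PERF[Performance<br/>μs-scale instantiation]
TYPE[Type safety<br/>Pydantic Schema]
ASYNC[Async-first<br/>High throughput]
PROD[Production-ready<br/>Works out of the box]
PRIV[Privacy-first<br/>Runs locally]
end
Performance comparison (figures from the official benchmarks):
| Metric | Agno | LangGraph | CrewAI | PydanticAI |
|---|---|---|---|---|
| Instantiation time | ~3μs | ~1.6ms | ~210μs | ~171μs |
| Memory footprint | ~6.6KB | ~160KB | ~66KB | ~26KB |
| Instantiation time relative to Agno | 1x | 529x slower | 70x slower | 57x slower |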
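Figures like the ones in the table can be measured with the standard library alone. The following is a minimal sketch, assuming a hypothetical `TinyAgent` dataclass as a stand-in for a framework class. It is not Agno's benchmark harness, and the absolute numbers depend on the machine and Python version.

```python
import timeit
import tracemalloc
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class TinyAgent:
    """Hypothetical stand-in for a framework agent class."""
    name: Optional[str] = None
    instructions: Optional[List[str]] = None
    markdown: bool = False


def instantiation_time_us(factory: Callable[[], object], runs: int = 10_000) -> float:
    """Average wall-clock time per call to `factory`, in microseconds."""
    total_seconds = timeit.timeit(factory, number=runs)
    return total_seconds / runs * 1e6


def memory_per_instance_bytes(factory: Callable[[], object], count: int = 1_000) -> float:
    """Average traced allocation per live instance, in bytes."""
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    instances = [factory() for _ in range(count)]  # keep references alive while measuring
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return (after - before) / len(instances)


if __name__ == "__main__":
    make = lambda: TinyAgent(name="demo")
    print(f"instantiation: {instantiation_time_us(make):.2f} µs")
    print(f"memory: {memory_per_instance_bytes(make):.0f} bytes per instance")
```

Swapping `make` for a factory that builds a real agent gives numbers comparable in kind to the table, provided every framework is measured the same way.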
Chapter 2: Agent Core Architecture
2.1 Agent Class Structure
# agno/agent/agent.py
@dataclass(init=False)
class Agent:
    """Core Agno Agent class."""
    # === Basics ===
    model: Optional[Model] = None  # LLM model
    name: Optional[str] = None  # Agent name
    id: Optional[str] = None  # Unique identifier
    # === User and session ===
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    # === Prompt settings ===
    system_message: Optional[str] = None  # System prompt
    instructions: Optional[List[str]] = None  # List of instructions
    description: Optional[str] = None  # Agent description
    # === Tools and capabilities ===
    tools: Optional[List[Toolkit]] = None  # Toolkits
    knowledge: Optional[Knowledge] = None  # Knowledge base
    memory: Optional[MemoryManager] = None  # Memory management
    # === Input and output schemas ===
    input_schema: Optional[Type[BaseModel]] = None
    output_schema: Optional[Type[BaseModel]] = None
    # === Safety and control ===
    guardrails: Optional[List[BaseGuardrail]] = None
    pre_hooks: Optional[List[Callable]] = None
    post_hooks: Optional[List[Callable]] = None
    # === Persistence ===
    db: Optional[BaseDb] = None  # Database connection
    # === Advanced features ===
    reasoning: bool = False  # Reasoning mode
    add_history_to_context: bool = False  # Add history to the context
    markdown: bool = False  # Markdown output
2.2 Agent Lifecycle
sequenceDiagram
participant User
participant Agent
participant PreHook
participant Model
participant Tools
participant PostHook
participant DB
User->>Agent: run(message)
Agent->>Agent: Create RunContext
Agent->>PreHook: Run pre-hooks
PreHook-->>Agent: Validate/transform input
Agent->>Agent: Build system message
Agent->>Agent: Load history context
Agent->>Agent: Query Knowledge
Agent->>Model: Send request
Model-->>Agent: Response/tool calls
loop Tool-call loop
Agent->>Tools: Execute tool
Tools-->>Agent: Tool result
Agent->>Model: Continue conversation
Model-->>Agent: Response
end
Agent->>PostHook: Run post-hooks
PostHook-->>Agent: Process output
Agent->>DB: Save session
Agent-->>User: RunOutput
2.3 Core Run Logic
# agno/agent/agent.py (simplified)
class Agent:
    async def arun(
        self,
        message: Optional[str] = None,
        *,
        messages: Optional[List[Message]] = None,
        images: Optional[List[Image]] = None,
        audio: Optional[List[Audio]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: bool = False,  # off by default, so `await agent.arun(...)` returns a RunOutput
        **kwargs
    ) -> Union[RunOutput, AsyncIterator[RunOutputEvent]]:
        """Run the Agent asynchronously."""
        # 1. Create the run context (per-call user_id/session_id override the Agent's defaults)
        run_context = RunContext(
            agent=self,
            user_id=kwargs.get("user_id") or self.user_id,
            session_id=kwargs.get("session_id") or self.session_id or str(uuid4()),
        )
        # 2. Run pre-hooks
        if self.pre_hooks:
            message = await self._execute_pre_hooks(message, run_context)
        # 3. Validate the input
        if self.input_schema:
            message = await self._validate_input(message)
        # 4. Build the message list, including every attached media type
        run_messages = await self._build_messages(
            message=message,
            messages=messages,
            images=images,
            audio=audio,
            videos=videos,
            files=files,
            run_context=run_context,
        )
        # 5. Query the knowledge base (if configured)
        if self.knowledge:
            knowledge_context = await self._search_knowledge(message)
            run_messages = self._inject_knowledge(run_messages, knowledge_context)
        # 6. Call the model
        if stream:
            # Returns an async iterator; callers use `async for event in await agent.arun(...)`
            return self._stream_response(run_messages, run_context)
        else:
            return await self._get_response(run_messages, run_context)
    async def _get_response(
        self,
        messages: RunMessages,
        run_context: RunContext,
    ) -> RunOutput:
        """Get the model response."""
        # Call the model
        model_response = await self.model.ainvoke(
            messages=messages.to_model_messages(),
            tools=self._get_tools_for_model(),
        )
        # Handle tool calls until the model stops requesting them
        while model_response.tool_calls:
            tool_results = await self._execute_tool_calls(
                model_response.tool_calls,
                run_context,
            )
            # Append the tool results to the messages
            messages.add_tool_results(tool_results)
            # Call the model again with the results
            model_response = await self.model.ainvoke(
                messages=messages.to_model_messages(),
                tools=self._get_tools_for_model(),
            )
        # Validate the output
        if self.output_schema:
            output = await self._validate_output(model_response.content)
        else:
            output = model_response.content
        # Run post-hooks
        if self.post_hooks:
            output = await self._execute_post_hooks(output, run_context)
        # Save the session
        if self.db:
            await self._save_session(messages, output, run_context)
        return RunOutput(
            content=output,
            messages=messages,
            metrics=model_response.metrics,
        )
    async def _execute_tool_calls(
        self,
        tool_calls: List[ToolCall],
        run_context: RunContext,
    ) -> List[ToolResult]:
        """Execute tool calls in parallel."""
        async def execute_single(tool_call: ToolCall) -> ToolResult:
            tool = self._get_tool(tool_call.name)
            if tool is None:
                return ToolResult(
                    tool_call_id=tool_call.id,
                    error=f"Tool {tool_call.name} not found",
                )
            try:
                # Execute the tool
                result = await tool.aexecute(**tool_call.arguments)
                return ToolResult(
                    tool_call_id=tool_call.id,
                    content=result,
                )
            except Exception as e:
                # Report the failure to the model instead of aborting the run
                return ToolResult(
                    tool_call_id=tool_call.id,
                    error=str(e),
                )
        # Run all tool calls concurrently; gather() preserves the input order
        results = await asyncio.gather(
            *[execute_single(tc) for tc in tool_calls]
        )
        return list(results)
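The parallel tool execution described above can be tried in isolation. The following is a minimal, self-contained sketch, assuming a hypothetical `REGISTRY` of async tools and simplified `ToolCall` and `ToolResult` types. It shows the two properties the listing relies on: `asyncio.gather` returns results in call order, and a failing or missing tool becomes an error result rather than an exception.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class ToolCall:
    id: str
    name: str
    arguments: Dict[str, Any]


@dataclass
class ToolResult:
    tool_call_id: str
    content: Any = None
    error: Optional[str] = None


async def add(a: int, b: int) -> int:
    return a + b


async def fail() -> None:
    raise ValueError("boom")


# Hypothetical tool registry, keyed by tool name
REGISTRY: Dict[str, Callable[..., Awaitable[Any]]] = {"add": add, "fail": fail}


async def execute_tool_calls(calls: List[ToolCall]) -> List[ToolResult]:
    """Run every call concurrently and convert failures into error results."""
    async def run_one(call: ToolCall) -> ToolResult:
        tool = REGISTRY.get(call.name)
        if tool is None:
            return ToolResult(call.id, error=f"Tool {call.name} not found")
        try:
            return ToolResult(call.id, content=await tool(**call.arguments))
        except Exception as exc:
            return ToolResult(call.id, error=str(exc))

    return list(await asyncio.gather(*(run_one(c) for c in calls)))


if __name__ == "__main__":
    demo = [
        ToolCall("1", "add", {"a": 1, "b": 2}),
        ToolCall("2", "fail", {}),
        ToolCall("3", "missing", {}),
    ]
    for result in asyncio.run(execute_tool_calls(demo)):
        print(result)
```

Catching the exception inside `run_one` matters: one failing tool then cannot cancel its siblings, and the model receives an error message it can react to.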
2.4 Model Abstraction Layer
# agno/models/base.py
class Model(ABC):
    """Abstract base class for models."""
    id: str
    name: str
    provider: str
    @abstractmethod
    async def ainvoke(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> ModelResponse:
        """Invoke the model asynchronously."""
        pass
    @abstractmethod
    async def astream(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> AsyncIterator[ModelResponseEvent]:
        """Invoke the model asynchronously with streaming."""
        pass
# agno/models/anthropic/claude.py
class Claude(Model):
    """Claude model implementation."""
    def __init__(
        self,
        id: str = "claude-sonnet-4-5",
        api_key: Optional[str] = None,
        **kwargs
    ):
        self.id = id
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
    async def ainvoke(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> ModelResponse:
        # Convert the messages to Anthropic's format
        request = {
            "model": self.id,
            "messages": self._convert_messages(messages),
            "max_tokens": kwargs.get("max_tokens", 4096),
        }
        # Convert the tools, and only send `tools` when there are any
        # (omitting the key is safer than passing an explicit None)
        if tools:
            request["tools"] = self._convert_tools(tools)
        # Call the API
        response = await self.client.messages.create(**request)
        return self._convert_response(response)
# agno/models/openai/chat.py
class OpenAIChat(Model):
    """OpenAI GPT model implementation."""
    def __init__(
        self,
        id: str = "gpt-4o",
        api_key: Optional[str] = None,
        **kwargs
    ):
        self.id = id
        self.client = openai.AsyncOpenAI(api_key=api_key)
    async def ainvoke(
        self,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> ModelResponse:
        request = {
            "model": self.id,
            "messages": self._convert_messages(messages),
        }
        # Only send `tools` when there are any
        if tools:
            request["tools"] = self._convert_tools(tools)
        response = await self.client.chat.completions.create(**request)
        return self._convert_response(response)
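One practical benefit of a provider-agnostic base class is that agent logic can be exercised without any network call. The following is a minimal sketch, assuming a pared-down `Model` interface and a hypothetical `ScriptedModel` test double. Neither is part of Agno; they only mirror the shape of the listings above.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ModelResponse:
    content: str
    tool_calls: List[Dict[str, Any]] = field(default_factory=list)


class Model(ABC):
    """Pared-down version of the abstract interface, for illustration only."""

    @abstractmethod
    async def ainvoke(self, messages: List[Dict[str, str]]) -> ModelResponse:
        ...


class ScriptedModel(Model):
    """Hypothetical test double: returns canned replies in order and records its inputs."""

    def __init__(self, replies: List[str]) -> None:
        self._replies = list(replies)
        self.seen: List[List[Dict[str, str]]] = []

    async def ainvoke(self, messages: List[Dict[str, str]]) -> ModelResponse:
        self.seen.append(list(messages))
        return ModelResponse(content=self._replies.pop(0))


async def ask(model: Model, question: str) -> str:
    """Caller code depends only on the abstract interface, not on a provider."""
    response = await model.ainvoke([{"role": "user", "content": question}])
    return response.content


if __name__ == "__main__":
    print(asyncio.run(ask(ScriptedModel(["hello"]), "hi")))
```

Because `ask` is typed against `Model`, the same function works unchanged with any concrete provider class that implements `ainvoke`.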
Chapter 3: The Tool System
3.1 Toolkit Architecture
graph TB
subgraph "Tool System"
TOOLKIT[Toolkit Base]
subgraph "Built-in Toolkits"
WEB[WebTools]
FILE[FileTools]
CODE[CodeTools]
DB[DatabaseTools]
API[APITools]
end
subgraph "Protocol Tools"
MCP_T[MCPTools]
A2A_T[A2ATools]
end
CUSTOM[Custom Tools]
end
TOOLKIT --> WEB
TOOLKIT --> FILE
TOOLKIT --> CODE
TOOLKIT --> DB
TOOLKIT --> API
TOOLKIT --> MCP_T
TOOLKIT --> A2A_T
TOOLKIT --> CUSTOM
3.2 Toolkit Implementation
# agno/tools/toolkit.py
class Toolkit(ABC):
    """Base class for toolkits."""
    name: str
    description: str
    enabled: bool = True
    @abstractmethod
    def get_tools(self) -> List[Function]:
        """Return the list of tools."""
        pass
# agno/tools/function.py
import asyncio
from inspect import iscoroutinefunction
class Function:
    """Wrapper that exposes a function as a tool."""
    def __init__(
        self,
        name: str,
        description: str,
        parameters: Dict[str, Any],
        func: Callable,
        requires_confirmation: bool = False,
    ):
        self.name = name
        self.description = description
        self.parameters = parameters
        self.func = func
        self.requires_confirmation = requires_confirmation
    async def aexecute(self, **kwargs) -> Any:
        """Execute the function asynchronously."""
        if iscoroutinefunction(self.func):
            return await self.func(**kwargs)
        else:
            # Run blocking functions in a worker thread so the event loop stays free
            return await asyncio.to_thread(self.func, **kwargs)
    def to_openai_tool(self) -> Dict[str, Any]:
        """Convert to the OpenAI tool format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            }
        }
# Define tools with the decorator
import httpx
from agno.tools import tool
class MyToolkit(Toolkit):
    name = "my_toolkit"
    description = "My custom tools"
    @tool
    def search_web(self, query: str) -> str:
        """Search the web for information.
        Args:
            query: The search query
        """
        # Implement the search logic here
        return f"Results for: {query}"
    @tool
    async def fetch_data(self, url: str) -> dict:
        """Fetch data from a URL.
        Args:
            url: The URL to fetch
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()
    def get_tools(self) -> List[Function]:
        return [
            self.search_web,
            self.fetch_data,
        ]
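The `parameters` dictionary a `Function` carries is a JSON Schema, and it can be derived from an ordinary Python signature. The following is a minimal sketch of that idea using only `inspect` and `typing`; `function_to_tool_schema` and its type mapping are hypothetical helpers for illustration, not how Agno's `@tool` decorator is implemented.

```python
import inspect
from typing import Any, Callable, Dict, List, get_type_hints

# Assumed mapping from Python annotations to JSON Schema type names
_JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    dict: "object",
    list: "array",
}


def function_to_tool_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build an OpenAI-style tool description from a function signature."""
    hints = get_type_hints(func)
    properties: Dict[str, Dict[str, str]] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        if name == "self":
            continue
        # Unknown or missing annotations fall back to "string"
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    doc = inspect.getdoc(func) or ""
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": doc.splitlines()[0] if doc else "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def search_web(query: str, limit: int = 5) -> str:
    """Search the web for information."""
    return f"Results for: {query}"


if __name__ == "__main__":
    print(function_to_tool_schema(search_web))
```

Parameters without a default are marked as required, which lets the model know that `query` must be supplied while `limit` is optional.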
3.3 MCP Integration
# agno/tools/mcp.py
class MCPTools(Toolkit):
    """Model Context Protocol toolkit."""
    def __init__(
        self,
        transport: Literal["stdio", "streamable-http"] = "stdio",
        url: Optional[str] = None,
        command: Optional[str] = None,
        **kwargs
    ):
        self.transport = transport
        self.url = url
        self.command = command
        self._client = None
        self._tools = None
    async def _connect(self):
        """Establish the MCP connection."""
        if self.transport == "streamable-http":
            self._client = MCPHttpClient(self.url)
        else:
            self._client = MCPStdioClient(self.command)
        await self._client.connect()
        self._tools = await self._client.list_tools()
    def get_tools(self) -> List[Function]:
        """Get the tools exposed by the MCP server."""
        if self._tools is None:
            # asyncio.run() raises RuntimeError inside a running event loop,
            # so this lazy connect only works when called from synchronous code
            asyncio.run(self._connect())
        functions = []
        for tool in self._tools:
            func = Function(
                name=tool.name,
                description=tool.description,
                parameters=tool.input_schema,
                func=self._create_executor(tool.name),
            )
            functions.append(func)
        return functions
    def _create_executor(self, tool_name: str) -> Callable:
        """Create the executor for a tool."""
        async def executor(**kwargs) -> Any:
            return await self._client.call_tool(tool_name, kwargs)
        return executor
# Usage example
agent = Agent(
    model=Claude(),
    tools=[
        MCPTools(
            transport="streamable-http",
            url="https://example.com/mcp"
        )
    ]
)
Chapter 4: Multi-Agent Collaboration with Team
4.1 Team Architecture
graph TB
subgraph "Team Structure"
TEAM[Team]
LEADER[Team Leader]
MEMBER1[Agent 1]
MEMBER2[Agent 2]
MEMBER3[Agent 3]
TEAM --> LEADER
LEADER --> MEMBER1
LEADER --> MEMBER2
LEADER --> MEMBER3
end
subgraph "Collaboration Modes"
ROUTE[Route Mode]
COLLAB[Collaborate Mode]
COORD[Coordinate Mode]
end
subgraph "Communication"
SHARE[Shared Memory]
MSG[Message Passing]
STATE[Shared State]
end
4.2 Team Implementation
# agno/team/team.py
@dataclass(init=False)
class Team:
    """A team of agents."""
    name: str
    agents: List[Agent]
    leader: Optional[Agent] = None
    mode: Literal["route", "collaborate", "coordinate"] = "route"
    # Shared resources
    shared_memory: Optional[MemoryManager] = None
    shared_knowledge: Optional[Knowledge] = None
    # Team settings
    max_rounds: int = 10
    enable_agentic_context: bool = True
    async def arun(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Run the team."""
        if self.mode == "route":
            return await self._run_route_mode(message, **kwargs)
        elif self.mode == "collaborate":
            return await self._run_collaborate_mode(message, **kwargs)
        elif self.mode == "coordinate":
            return await self._run_coordinate_mode(message, **kwargs)
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unknown team mode: {self.mode}")
    async def _run_route_mode(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Route mode: the leader picks the best-suited Agent for the task."""
        if self.leader is None:
            raise ValueError("Route mode requires a leader agent")
        # 1. The leader analyzes the task and selects an Agent
        selection_prompt = self._build_selection_prompt(message)
        leader_response = await self.leader.arun(selection_prompt)
        selected_agent_name = self._parse_agent_selection(leader_response)
        selected_agent = self._get_agent_by_name(selected_agent_name)
        # 2. The selected Agent handles the task
        agent_response = await selected_agent.arun(message, **kwargs)
        return TeamRunOutput(
            content=agent_response.content,
            selected_agent=selected_agent_name,
            messages=agent_response.messages,
        )
    async def _run_collaborate_mode(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Collaborate mode: several Agents work in parallel and the results are merged."""
        # 1. Decompose the task
        subtasks = await self._decompose_task(message)
        # 2. Assign subtasks to Agents in parallel
        #    (zip() stops at the shorter list, so surplus subtasks are not assigned)
        tasks = []
        for agent, subtask in zip(self.agents, subtasks):
            tasks.append(agent.arun(subtask, **kwargs))
        responses = await asyncio.gather(*tasks)
        # 3. Merge the results
        merged_result = await self._merge_responses(responses)
        return TeamRunOutput(
            content=merged_result,
            agent_responses=responses,
        )
    async def _run_coordinate_mode(
        self,
        message: str,
        **kwargs
    ) -> TeamRunOutput:
        """Coordinate mode: Agents take turns and pass context along."""
        context = {"original_message": message}
        current_message = message
        for round_num in range(self.max_rounds):
            for agent in self.agents:
                # Build a message that includes the accumulated context
                augmented_message = self._build_context_message(
                    current_message, context
                )
                # Let the Agent process it
                response = await agent.arun(augmented_message, **kwargs)
                # Update the context
                context[f"{agent.name}_response"] = response.content
                # Check whether the task is complete
                if self._is_task_complete(response):
                    return TeamRunOutput(
                        content=response.content,
                        rounds=round_num + 1,
                        context=context,
                    )
                current_message = response.content
        # The round budget ran out: return the latest response
        return TeamRunOutput(
            content=current_message,
            rounds=self.max_rounds,
            context=context,
        )
    def _build_selection_prompt(self, message: str) -> str:
        """Build the Agent selection prompt."""
        agents_info = "\n".join([
            f"- {agent.name}: {agent.description}"
            for agent in self.agents
        ])
        return f"""
        Based on the following task, select the most appropriate agent:
        Task: {message}
        Available Agents:
        {agents_info}
        Respond with just the agent name.
        """
# Usage example
research_agent = Agent(
    name="Researcher",
    description="Expert at finding and analyzing information",
    model=Claude(),
    tools=[WebSearchTools()],
)
writer_agent = Agent(
    name="Writer",
    description="Expert at writing clear and engaging content",
    model=Claude(),
)
editor_agent = Agent(
    name="Editor",
    description="Expert at reviewing and improving content",
    model=Claude(),
)
content_team = Team(
    name="Content Team",
    agents=[research_agent, writer_agent, editor_agent],
    leader=Agent(
        name="Team Leader",
        description="Coordinates the content creation process",
        model=Claude(),
    ),
    mode="coordinate",
)
# Run the team (`await` is only valid inside a coroutine, so wrap the call)
async def main():
    result = await content_team.arun("Write an article about AI trends in 2024")
    print(result.content)
asyncio.run(main())
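Route mode depends on turning the leader's free-text reply into a concrete team member, a step the listing leaves to `_parse_agent_selection`. The following is a minimal sketch of one way to do that, assuming hypothetical `StubAgent` objects in place of real agents and a keyword-based stub in place of the LLM leader. The fallback to the first member is a design choice made here, not something the original specifies.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Tuple


@dataclass
class StubAgent:
    """Hypothetical stand-in for an Agent: a name, a description, and an async handler."""
    name: str
    description: str
    handler: Callable[[str], Awaitable[str]]

    async def arun(self, message: str) -> str:
        return await self.handler(message)


def build_selection_prompt(message: str, agents: List[StubAgent]) -> str:
    roster = "\n".join(f"- {a.name}: {a.description}" for a in agents)
    return f"Task: {message}\nAvailable agents:\n{roster}\nRespond with just the agent name."


def parse_selection(reply: str, agents: List[StubAgent]) -> StubAgent:
    """Match the reply to a member name, ignoring case and surrounding whitespace.

    Falls back to the first member when the reply names nobody on the team.
    """
    by_name = {a.name.lower(): a for a in agents}
    return by_name.get(reply.strip().lower(), agents[0])


async def route(message: str, leader: StubAgent, agents: List[StubAgent]) -> Tuple[str, str]:
    reply = await leader.arun(build_selection_prompt(message, agents))
    selected = parse_selection(reply, agents)
    return selected.name, await selected.arun(message)


async def _keyword_leader(prompt: str) -> str:
    # A real leader would be an LLM call; this stub routes on a keyword in the task line
    task_line = prompt.splitlines()[0].lower()
    return " writer \n" if "write" in task_line else "Researcher"


async def _research(message: str) -> str:
    return f"notes: {message}"


async def _write(message: str) -> str:
    return f"draft: {message}"


LEADER = StubAgent("Leader", "Picks a member", _keyword_leader)
MEMBERS = [
    StubAgent("Researcher", "Finds information", _research),
    StubAgent("Writer", "Writes content", _write),
]

if __name__ == "__main__":
    print(asyncio.run(route("Write a haiku", LEADER, MEMBERS)))
```

Normalizing the reply before the lookup matters in practice, because model output often carries stray whitespace or different casing.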
Chapter 5: Workflow
5.1 Workflow Architecture
graph TB
subgraph "Workflow Structure"
START((Start))
STEP1[Step 1<br/>Agent A]
STEP2[Step 2<br/>Agent B]
COND{Condition}
STEP3A[Step 3A]
STEP3B[Step 3B]
END((End))
START --> STEP1
STEP1 --> STEP2
STEP2 --> COND
COND --> |Yes| STEP3A
COND --> |No| STEP3B
STEP3A --> END
STEP3B --> END
end
5.2 Workflow Implementation
# agno/workflow/workflow.py
@dataclass(init=False)
class Workflow:
    """Workflow definition."""
    name: str
    description: str
    # Workflow steps
    steps: List[WorkflowStep]
    # State management
    state: Dict[str, Any] = field(default_factory=dict)
    # Persistence
    db: Optional[BaseDb] = None
    # Error handling
    on_error: Optional[Callable] = None
    max_retries: int = 3
    async def arun(
        self,
        input_data: Dict[str, Any],
        **kwargs
    ) -> WorkflowRunOutput:
        """Run the workflow."""
        self.state = {"input": input_data}
        current_step = self.steps[0] if self.steps else None
        failures = 0  # consecutive failures of the current step
        while current_step is not None:
            try:
                # Execute the step
                result = await self._execute_step(current_step)
            except Exception as e:
                failures += 1
                # Without a handler, or once the retries are used up, propagate the error
                if self.on_error is None or failures > self.max_retries:
                    raise
                await self.on_error(e, current_step, self.state)
                continue  # retry the same step
            failures = 0
            # Update the state
            self.state[current_step.name] = result
            # Save a checkpoint
            if self.db:
                await self._save_checkpoint()
            # Pick the next step
            current_step = self._get_next_step(current_step, result)
        return WorkflowRunOutput(
            state=self.state,
            # Only populated if a step named "output" has run
            final_output=self.state.get("output"),
        )
    async def _execute_step(self, step: WorkflowStep) -> Any:
        """Execute a single step."""
        # Prepare the input (copied, so popping keys does not mutate the workflow state)
        step_input = dict(step.input_mapper(self.state) if step.input_mapper else self.state)
        if isinstance(step.executor, (Agent, Team)):
            # Agent or Team step: pass `message` once, not again through **step_input
            message = step_input.pop("message", "")
            result = await step.executor.arun(message=message, **step_input)
            return result.content
        elif callable(step.executor):
            # Custom function step
            if iscoroutinefunction(step.executor):
                return await step.executor(**step_input)
            else:
                return step.executor(**step_input)
        raise TypeError(f"Unsupported executor for step {step.name!r}")
    def _get_next_step(
        self,
        current_step: WorkflowStep,
        result: Any
    ) -> Optional[WorkflowStep]:
        """Get the next step."""
        if current_step.next_step:
            # Fixed next step
            return self._get_step_by_name(current_step.next_step)
        elif current_step.condition:
            # Conditional branch
            next_step_name = current_step.condition(result, self.state)
            return self._get_step_by_name(next_step_name)
        else:
            # Sequential execution
            current_index = self.steps.index(current_step)
            if current_index + 1 < len(self.steps):
                return self.steps[current_index + 1]
            return None
# Note: in a real module, define WorkflowStep before Workflow, which references it
@dataclass
class WorkflowStep:
    """A workflow step."""
    name: str
    executor: Union[Agent, Team, Callable]
    input_mapper: Optional[Callable] = None
    next_step: Optional[str] = None
    condition: Optional[Callable] = None
# Usage example
data_pipeline = Workflow(
    name="Data Analysis Pipeline",
    steps=[
        WorkflowStep(
            name="collect_data",
            executor=data_collector_agent,
            input_mapper=lambda state: {"message": f"Collect data about {state['input']['topic']}"}
        ),
        WorkflowStep(
            name="analyze_data",
            executor=analyst_agent,
            input_mapper=lambda state: {"message": f"Analyze: {state['collect_data']}"}
        ),
        WorkflowStep(
            name="check_quality",
            executor=check_data_quality,  # custom function
            condition=lambda result, state: "generate_report" if result["quality"] > 0.8 else "collect_more_data"
        ),
        WorkflowStep(
            name="collect_more_data",
            executor=data_collector_agent,
            next_step="analyze_data"
        ),
        WorkflowStep(
            name="generate_report",
            executor=report_writer_agent,
        ),
    ]
)
# `await` is only valid inside a coroutine, so wrap the call
async def main():
    result = await data_pipeline.arun({"topic": "AI market trends"})
    print(result.state)
asyncio.run(main())
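The pipeline above contains a cycle (quality check, collect more data, analyze again), so a runner needs some bound to guarantee termination. The following is a minimal, self-contained sketch of a step runner with conditional branching and a step budget. `Step`, `run_workflow`, and the demo steps are hypothetical names for illustration, not Agno's workflow API.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional

State = Dict[str, Any]


@dataclass
class Step:
    name: str
    run: Callable[[State], Awaitable[Any]]
    # Returns the name of the next step, or None to continue in list order
    branch: Optional[Callable[[Any, State], Optional[str]]] = None


async def run_workflow(steps: List[Step], state: State, max_steps: int = 20) -> State:
    """Execute steps until the list is exhausted, storing each result under the step name."""
    by_name = {s.name: s for s in steps}
    position = {s.name: i for i, s in enumerate(steps)}
    current: Optional[Step] = steps[0] if steps else None
    executed = 0
    while current is not None:
        if executed >= max_steps:
            raise RuntimeError("step budget exhausted; the workflow may contain a cycle")
        result = await current.run(state)
        state[current.name] = result
        executed += 1
        target = current.branch(result, state) if current.branch else None
        if target is not None:
            current = by_name[target]
        else:
            nxt = position[current.name] + 1
            current = steps[nxt] if nxt < len(steps) else None
    return state


async def collect(state: State) -> Dict[str, float]:
    # Demo data source: the first attempt is low quality, later attempts are good
    state["attempts"] = state.get("attempts", 0) + 1
    return {"quality": 0.5 if state["attempts"] == 1 else 0.9}


async def check(state: State) -> float:
    return state["collect"]["quality"]


async def report(state: State) -> str:
    return f"report after {state['attempts']} collection(s)"


STEPS = [
    Step("collect", collect),
    Step("check", check, branch=lambda quality, s: None if quality > 0.8 else "collect"),
    Step("report", report),
]

if __name__ == "__main__":
    print(asyncio.run(run_workflow(STEPS, {}))["report"])
```

With the demo steps, the first quality check fails and sends execution back to `collect`; the second passes and falls through to `report`.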
Chapter 6: Memory and Knowledge
6.1 The Memory System
graph TB
subgraph "Memory Types"
SHORT[Short-term Memory<br/>Ephemeral Context]
LONG[Long-term Memory<br/>Persistent Storage]
USER[User Memory<br/>Per-user Context]
CULTURE[Culture<br/>Shared Knowledge]
end
subgraph "Storage"
SQLITE[(SQLite)]
POSTGRES[(PostgreSQL)]
REDIS[(Redis)]
end
SHORT --> REDIS
LONG --> SQLITE
LONG --> POSTGRES
USER --> SQLITE
CULTURE --> POSTGRES
6.2 Memory Implementation
# agno/memory/manager.py
import json
from datetime import datetime, timezone
from uuid import uuid4
class MemoryManager:
    """Memory manager."""
    def __init__(
        self,
        db: BaseDb,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
    ):
        self.db = db
        self.user_id = user_id
        self.session_id = session_id
    async def add_memory(
        self,
        content: str,
        metadata: Optional[Dict] = None,
        memory_type: str = "general"
    ) -> str:
        """Add a memory."""
        memory_id = str(uuid4())
        await self.db.insert(
            table="memories",
            data={
                "id": memory_id,
                "user_id": self.user_id,
                "session_id": self.session_id,
                "content": content,
                "memory_type": memory_type,
                "metadata": json.dumps(metadata or {}),
                # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
                "created_at": datetime.now(timezone.utc),
            }
        )
        return memory_id
    async def search_memories(
        self,
        query: str,
        limit: int = 10,
        memory_type: Optional[str] = None,
    ) -> List[Memory]:
        """Search for relevant memories."""
        # Search by embedding vector
        query_embedding = await self._get_embedding(query)
        # Always filter by user; filter by type only when one is given
        filters = {"user_id": self.user_id}
        if memory_type:
            filters["memory_type"] = memory_type
        results = await self.db.vector_search(
            table="memories",
            vector=query_embedding,
            limit=limit,
            filters=filters,
        )
        return [Memory(**r) for r in results]
    async def get_user_memory(self) -> UserMemory:
        """Get the user-level memory summary."""
        memories = await self.db.query(
            table="user_memories",
            filters={"user_id": self.user_id}
        )
        return UserMemory(
            user_id=self.user_id,
            preferences=memories.get("preferences", {}),
            facts=memories.get("facts", []),
            interaction_style=memories.get("interaction_style"),
        )
# agno/knowledge/knowledge.py
class Knowledge:
    """Knowledge base management."""
    def __init__(
        self,
        vector_store: VectorStore,
        embedder: Embedder,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
    ):
        self.vector_store = vector_store
        self.embedder = embedder
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    async def add_document(
        self,
        content: str,
        metadata: Optional[Dict] = None,
        source: Optional[str] = None,
    ) -> List[str]:
        """Add a document to the knowledge base."""
        # Split into chunks
        chunks = self._chunk_text(content)
        # Generate embeddings
        embeddings = await self.embedder.embed_batch([c.text for c in chunks])
        # Store each chunk with its embedding
        ids = []
        for chunk, embedding in zip(chunks, embeddings):
            doc_id = await self.vector_store.add(
                text=chunk.text,
                embedding=embedding,
                metadata={
                    **(metadata or {}),
                    "source": source,
                    "chunk_index": chunk.index,
                }
            )
            ids.append(doc_id)
        return ids
    async def search(
        self,
        query: str,
        limit: int = 5,
        filters: Optional[KnowledgeFilter] = None,
    ) -> List[KnowledgeResult]:
        """Search the knowledge base."""
        # Generate the query embedding
        query_embedding = await self.embedder.embed(query)
        # Vector search
        results = await self.vector_store.search(
            embedding=query_embedding,
            limit=limit,
            filters=filters,
        )
        return results
    async def agentic_search(
        self,
        query: str,
        agent: Agent,
        max_iterations: int = 3,
    ) -> List[KnowledgeResult]:
        """Agent-driven iterative search."""
        all_results = []
        search_queries = [query]
        for _ in range(max_iterations):
            # Run the searches
            for q in search_queries:
                results = await self.search(q)
                all_results.extend(results)
            # Let the Agent evaluate the results and propose new queries
            evaluation = await agent.arun(
                f"""
                Original query: {query}
                Current results: {all_results}
                If these results are sufficient, reply with the single word SUFFICIENT.
                Otherwise, suggest refined search queries.
                """
            )
            # Compare the whole verdict: a substring test would also match "insufficient"
            if evaluation.content.strip().upper() == "SUFFICIENT":
                break
            # Extract the new search queries
            search_queries = self._extract_queries(evaluation.content)
        return self._deduplicate(all_results)
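The `chunk_size` and `chunk_overlap` parameters control how `add_document` splits text, but `_chunk_text` itself is not shown. The following is a minimal sketch of a sliding-window chunker, assuming sizes are measured in characters (the original does not say whether the unit is characters or tokens). `chunk_text` and `Chunk` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    index: int
    text: str


def chunk_text(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> List[Chunk]:
    """Split text into windows of `chunk_size` characters that share `chunk_overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    stride = chunk_size - chunk_overlap  # how far the window advances each step
    chunks: List[Chunk] = []
    start = 0
    while start < len(text):
        chunks.append(Chunk(index=len(chunks), text=text[start:start + chunk_size]))
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
        start += stride
    return chunks


if __name__ == "__main__":
    for chunk in chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1):
        print(chunk.index, chunk.text)
```

The overlap means a sentence cut at a chunk boundary still appears intact in at least one neighbor, at the cost of storing some text twice.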
Chapter 7: The AgentOS Runtime
7.1 AgentOS Architecture
graph TB
subgraph "Client"
UI[AgentOS UI<br/>Browser]
end
subgraph "AgentOS Runtime"
FASTAPI[FastAPI App]
SSE[SSE Endpoints]
REST[REST Endpoints]
WS[WebSocket]
subgraph "Endpoints"
CHAT["/v1/chat"]
RUNS["/v1/runs"]
AGENTS["/v1/agents"]
SESSIONS["/v1/sessions"]
end
end
subgraph "Your Agents"
A1[Agent 1]
A2[Agent 2]
TEAM[Team]
WF[Workflow]
end
UI --> SSE
UI --> REST
FASTAPI --> CHAT
FASTAPI --> RUNS
FASTAPI --> AGENTS
FASTAPI --> SESSIONS
CHAT --> A1
CHAT --> A2
RUNS --> TEAM
RUNS --> WF
7.2 AgentOS Implementation
# agno/os/agent_os.py
class AgentOS:
    """AgentOS production runtime."""
    def __init__(
        self,
        agents: Optional[List[Agent]] = None,
        teams: Optional[List[Team]] = None,
        workflows: Optional[List[Workflow]] = None,
        title: str = "AgentOS",
        version: str = "1.0.0",
    ):
        self.agents = {a.name: a for a in (agents or [])}
        self.teams = {t.name: t for t in (teams or [])}
        self.workflows = {w.name: w for w in (workflows or [])}
        self.title = title
        self.version = version
    def get_app(self) -> FastAPI:
        """Get the FastAPI application."""
        app = FastAPI(
            title=self.title,
            version=self.version,
        )
        # Register the routes
        self._register_routes(app)
        # Configure CORS (wide open here; restrict allow_origins in production)
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_methods=["*"],
            allow_headers=["*"],
        )
        return app
    def _register_routes(self, app: FastAPI):
        """Register the API routes."""
        # Strong references to in-flight runs, so the tasks are not garbage-collected
        background_tasks = set()
        @app.get("/v1/agents")
        async def list_agents():
            return {
                "agents": [
                    {"name": a.name, "description": a.description}
                    for a in self.agents.values()
                ]
            }
        @app.post("/v1/agents/{agent_name}/runs")
        async def create_run(
            agent_name: str,
            request: RunRequest,
        ):
            agent = self.agents.get(agent_name)
            if not agent:
                raise HTTPException(404, f"Agent {agent_name} not found")
            # Create the run
            run_id = str(uuid4())
            # Execute it in the background
            task = asyncio.create_task(
                self._execute_run(run_id, agent, request)
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)
            return {"run_id": run_id}
        @app.get("/v1/runs/{run_id}/stream")
        async def stream_run(run_id: str):
            """SSE streaming response."""
            async def event_generator():
                async for event in self._get_run_events(run_id):
                    yield {
                        "event": event.type,
                        "data": json.dumps(event.data),
                    }
            return EventSourceResponse(event_generator())
        @app.post("/v1/chat")
        async def chat(request: ChatRequest):
            """Unified chat endpoint."""
            agent = self.agents.get(request.agent_name)
            if not agent:
                raise HTTPException(404, f"Agent {request.agent_name} not found")
            response = await agent.arun(
                message=request.message,
                stream=False,  # this endpoint needs the complete RunOutput
                session_id=request.session_id,
                user_id=request.user_id,
            )
            return {
                "content": response.content,
                "session_id": response.session_id,
            }
    async def _execute_run(
        self,
        run_id: str,
        agent: Agent,
        request: RunRequest
    ):
        """Execute a run and emit its events."""
        try:
            # arun() is a coroutine that returns the event iterator, so await it first
            async for event in await agent.arun(
                message=request.message,
                stream=True,
                session_id=request.session_id,
            ):
                await self._emit_event(run_id, event)
        except Exception as e:
            await self._emit_event(run_id, RunEvent(
                type="error",
                data={"error": str(e)}
            ))
    def serve(
        self,
        app: str = "main:app",
        host: str = "0.0.0.0",
        port: int = 8000,
        reload: bool = False,
    ):
        """Start the server."""
        import uvicorn
        uvicorn.run(app, host=host, port=port, reload=reload)
# Usage example
from agno.agent import Agent
from agno.models.anthropic import Claude
from agno.os import AgentOS
# Create the Agent
assistant = Agent(
    name="Assistant",
    description="A helpful AI assistant",
    model=Claude(),
    markdown=True,
)
# Create the AgentOS
agent_os = AgentOS(agents=[assistant])
# Get the FastAPI app
app = agent_os.get_app()
# Run
if __name__ == "__main__":
    agent_os.serve(app="main:app", reload=True)
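The listing calls `_emit_event` and `_get_run_events` without showing how events travel from the background run to the SSE endpoint. One simple way to connect them is an in-memory queue per run. The following is a minimal sketch under that assumption; `RunEventBus` and `format_sse` are hypothetical names, and a multi-process deployment would need a shared broker instead.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict


class RunEventBus:
    """In-memory event channel: one asyncio.Queue per run_id (single process only)."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}

    def _queue(self, run_id: str) -> asyncio.Queue:
        return self._queues.setdefault(run_id, asyncio.Queue())

    async def emit(self, run_id: str, event_type: str, data: Any) -> None:
        await self._queue(run_id).put({"event": event_type, "data": json.dumps(data)})

    async def close(self, run_id: str) -> None:
        await self._queue(run_id).put(None)  # sentinel marking the end of the run

    async def events(self, run_id: str) -> AsyncIterator[Dict[str, str]]:
        queue = self._queue(run_id)
        while True:
            item = await queue.get()
            if item is None:
                self._queues.pop(run_id, None)  # free the queue once the run is over
                return
            yield item


def format_sse(event: Dict[str, str]) -> str:
    """Serialize one event in the text/event-stream wire format."""
    return f"event: {event['event']}\ndata: {event['data']}\n\n"


async def demo() -> None:
    bus = RunEventBus()
    await bus.emit("run-1", "token", {"text": "hi"})
    await bus.close("run-1")
    async for event in bus.events("run-1"):
        print(format_sse(event), end="")


if __name__ == "__main__":
    asyncio.run(demo())
```

Because `json.dumps` escapes newlines, each payload stays on a single `data:` line, which keeps the SSE framing valid.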
Chapter 8: Guardrails
8.1 Guardrails Architecture
graph TB
subgraph "Guardrails Pipeline"
INPUT[User Input]
PRE[Pre-Guardrails]
AGENT[Agent Execution]
POST[Post-Guardrails]
OUTPUT[Output]
INPUT --> PRE
PRE --> |Pass| AGENT
PRE --> |Block| BLOCKED[Blocked Response]
AGENT --> POST
POST --> |Pass| OUTPUT
POST --> |Block| MODIFIED[Modified Response]
end
subgraph "Guardrail Types"
CONTENT[Content Filter]
PII[PII Detection]
TOXIC[Toxicity Check]
INJECT[Injection Detection]
CUSTOM[Custom Rules]
end
8.2 Guardrails Implementation
# agno/guardrails/base.py
class BaseGuardrail(ABC):
    """Base class for guardrails."""
    name: str
    enabled: bool = True
    @abstractmethod
    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        """Check the content."""
        pass
@dataclass
class GuardrailResult:
    passed: bool
    message: Optional[str] = None
    modified_content: Optional[str] = None
    metadata: Optional[Dict] = None
# agno/guardrails/content.py
class ContentFilterGuardrail(BaseGuardrail):
    """Content filter."""
    def __init__(
        self,
        blocked_terms: Optional[List[str]] = None,
        blocked_patterns: Optional[List[str]] = None,
    ):
        self.name = "content_filter"
        self.blocked_terms = blocked_terms or []
        self.blocked_patterns = [
            re.compile(p) for p in (blocked_patterns or [])
        ]
    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        # Check blocked terms (case-insensitive)
        lowered = content.lower()
        for term in self.blocked_terms:
            if term.lower() in lowered:
                return GuardrailResult(
                    passed=False,
                    message=f"Content contains blocked term: {term}"
                )
        # Check blocked patterns
        for pattern in self.blocked_patterns:
            if pattern.search(content):
                return GuardrailResult(
                    passed=False,
                    message="Content matches blocked pattern"
                )
        return GuardrailResult(passed=True)
# agno/guardrails/pii.py
class PIIGuardrail(BaseGuardrail):
    """PII (personally identifiable information) detection."""
    def __init__(self, action: Literal["block", "mask"] = "mask"):
        self.name = "pii_detector"
        self.action = action
        # PII patterns (the TLD class is [A-Za-z]; a "|" inside [...] would match a literal pipe)
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
        }
    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        detected_pii = []
        for pii_type, pattern in self.patterns.items():
            matches = re.findall(pattern, content)
            if matches:
                detected_pii.append({
                    "type": pii_type,
                    "count": len(matches)
                })
        if not detected_pii:
            return GuardrailResult(passed=True)
        if self.action == "block":
            return GuardrailResult(
                passed=False,
                message=f"PII detected: {detected_pii}"
            )
        else:
            # Mask the PII
            masked_content = content
            for pii_type, pattern in self.patterns.items():
                masked_content = re.sub(pattern, f"[{pii_type.upper()}]", masked_content)
            return GuardrailResult(
                passed=True,
                modified_content=masked_content,
                metadata={"pii_detected": detected_pii}
            )
# agno/guardrails/injection.py
class PromptInjectionGuardrail(BaseGuardrail):
    """Prompt injection detection."""
    def __init__(self, model: Optional[Model] = None):
        self.name = "injection_detector"
        self.model = model
        # Common injection patterns
        self.injection_patterns = [
            r'ignore\s+(all\s+)?previous\s+instructions',
            r'disregard\s+.*\s+instructions',
            r'forget\s+everything',
            r'you\s+are\s+now\s+a',
            r'pretend\s+to\s+be',
            r'act\s+as\s+if',
        ]
    async def check(
        self,
        content: str,
        context: Optional[Dict] = None
    ) -> GuardrailResult:
        # Pattern-based detection
        for pattern in self.injection_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                return GuardrailResult(
                    passed=False,
                    message="Potential prompt injection detected"
                )
        # Deeper detection with a model (optional)
        if self.model:
            detection_prompt = f"""
            Analyze the following text for potential prompt injection attempts:
            Text: {content}
            Is this a prompt injection attempt? Respond with YES or NO.
            """
            response = await self.model.ainvoke([
                Message(role="user", content=detection_prompt)
            ])
            # Check the start of the verdict; a substring test would also match words like "EYES"
            if response.content.strip().upper().startswith("YES"):
                return GuardrailResult(
                    passed=False,
                    message="Prompt injection detected by AI analysis"
                )
        return GuardrailResult(passed=True)
# Usage example
agent = Agent(
    model=Claude(),
    guardrails=[
        ContentFilterGuardrail(blocked_terms=["password", "secret"]),
        PIIGuardrail(action="mask"),
        PromptInjectionGuardrail(),
    ]
)
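The masking branch of the PII guardrail can be checked on its own with the standard library. The following is a minimal sketch, assuming a reduced pattern set and a hypothetical `mask_pii` helper. The regular expressions are simple illustrations and will miss many real-world formats.

```python
import re
from typing import Dict, List, Tuple

# Reduced, illustrative pattern set (not exhaustive)
PII_PATTERNS: Dict[str, "re.Pattern[str]"] = {
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "phone": re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
}


def mask_pii(text: str) -> Tuple[str, List[Dict[str, object]]]:
    """Replace each match with a [TYPE] placeholder and report what was found."""
    detected: List[Dict[str, object]] = []
    for pii_type, pattern in PII_PATTERNS.items():
        # subn() returns the rewritten text and the number of replacements made
        text, count = pattern.subn(f"[{pii_type.upper()}]", text)
        if count:
            detected.append({"type": pii_type, "count": count})
    return text, detected


if __name__ == "__main__":
    masked, found = mask_pii("Mail bob@example.com or call 555-123-4567.")
    print(masked)
    print(found)
```

Using `subn` detects and masks in a single pass per pattern, so the reported counts always agree with what was actually replaced.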
Chapter 9: Best Practices
9.1 Project Structure
my_agent_project/
├── agents/
│ ├── __init__.py
│ ├── researcher.py
│ ├── writer.py
│ └── editor.py
│
├── teams/
│ ├── __init__.py
│ └── content_team.py
│
├── workflows/
│ ├── __init__.py
│ └── article_pipeline.py
│
├── tools/
│ ├── __init__.py
│ ├── search.py
│ └── database.py
│
├── knowledge/
│ ├── __init__.py
│ └── setup.py
│
├── config/
│ ├── __init__.py
│ └── settings.py
│
├── main.py
├── pyproject.toml
└── README.md
9.2 Configuration Management
# config/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    # Load values from a local .env file (pydantic-settings v2 style)
    model_config = SettingsConfigDict(env_file=".env")
    # API Keys
    openai_api_key: str
    anthropic_api_key: str
    # Database
    database_url: str = "sqlite:///agno.db"
    # Vector Store
    vector_store_url: str = "http://localhost:6333"
    # AgentOS
    agent_os_host: str = "0.0.0.0"
    agent_os_port: int = 8000
settings = Settings()
9.3 Error Handling
# Custom exception handling
from agno.exceptions import AgnoException
class AgentExecutionError(AgnoException):
    """Agent execution error."""
    pass
class ToolExecutionError(AgnoException):
    """Tool execution error."""
    pass
# Use in an Agent
agent = Agent(
    model=Claude(),
    on_error=lambda e, ctx: logger.error(f"Agent error: {e}"),
    max_retries=3,
    retry_delay=1.0,
)
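Summary
Agno is a well-designed multi-agent framework. Its core strengths are:
graph TB
subgraph "Agno Core Strengths"
PERF[Performance<br/>~529x faster instantiation than LangGraph]
TYPE[Type safety<br/>Pydantic]
PROD[Production-ready<br/>AgentOS]
PRIV[Privacy-first<br/>Runs locally]
FLEX[Flexible architecture<br/>Agent/Team/Workflow]
end
Key takeaways:
- Agent design: a lightweight, dataclass-based design with full lifecycle management
- Tool system: a unified tool abstraction with support for the MCP and A2A protocols
- Multi-agent collaboration: Team offers several collaboration modes (route, collaborate, coordinate)
- Workflow orchestration: Workflow supports complex task flows and conditional branching
- Production runtime: AgentOS provides a FastAPI service that works out of the box
- Safety: a Guardrails system that screens both inputs and outputs
Agno represents a new generation of design thinking for AI agent frameworks and is worth studying and putting into practice.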