5.6 Agent 架构设计
从单 Agent 到多 Agent 协作——选对架构模式决定了 Agent 系统的可靠性、可扩展性和可维护性。
学习时长:2-3 周
📋 6 大 Agent 架构模式总览
| # | 架构模式 | 核心思想 | 通信方式 | 典型场景 | 章节 |
|---|---|---|---|---|---|
| 1 | 单 Agent(Router) | 一个 LLM + 多工具,路由选择 | — | 聊天助手、简单问答 | §六大架构 1 |
| 2 | Supervisor(监督者) | 中心 Agent 分发任务,专家执行 | 中心化 | 内容创作、研究报告 | §六大架构 2 |
| 3 | Hierarchical(层级) | 多层 Supervisor,树状分发 | 树状 | 软件开发、企业流程 | §六大架构 3 |
| 4 | Swarm(群体智能) | Agent 间平等协作,Handoff 传递 | P2P | 客服系统、流程审批 | §六大架构 4 |
| 5 | Map-Reduce(并行) | 拆分→并行处理→合并 | 扇出扇入 | 批量分析、多文档处理 | §六大架构 5 |
| 6 | Reflection(反思) | 生成→评估→改进循环 | 循环 | 高质量写作、代码审查 | §六大架构 6 |
Agent 架构演进
Level 1: 简单链式调用
用户 → LLM → 输出
(没有工具,纯对话)
Level 2: ReAct Agent
用户 → LLM → 工具调用 → 观察 → LLM → 输出
(单 Agent + 工具)
Level 3: 多步规划 Agent
用户 → 规划器 → [子任务1, 子任务2, ...] → 各自执行 → 整合输出
(Plan-and-Execute)
Level 4: 多 Agent 协作
用户 → Supervisor → [Agent A, Agent B, Agent C] → 整合输出
(Multi-Agent System)
Level 5: 自主 Agent 网络
用户 → Agent 自主发现同伴 → 动态协作 → 自我改进
(Agentic AI + A2A 协议)六大经典架构模式
1. 单 Agent(Router)
最简单的模式:一个 Agent 负责所有事情。
┌──────────────┐
│ 用户输入 │
└──────┬───────┘
▼
┌──────────────┐
│ 单 Agent │
│ (LLM+工具) │──→ 工具A / 工具B / 工具C
└──────┬───────┘
▼
┌──────────────┐
│ 输出结果 │
└──────────────┘python
# LangGraph 实现
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=[search_tool, calculator_tool, db_tool]
)
result = await agent.ainvoke({"messages": [("user", "查询上周销售额并生成分析报告")]})适用:简单任务、工具数量 < 10 局限:工具太多时 LLM 容易选错、单点瓶颈
2. Supervisor(监督者模式)⭐
一个 Supervisor Agent 负责任务分发,多个专家 Agent 负责执行。
┌──────────────┐
│ Supervisor │
│ (分发+整合) │
└──┬───┬───┬──┘
│ │ │
┌────────┘ │ └────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 研究 Agent│ │ 写作 Agent│ │ 审核 Agent│
│ (搜索工具)│ │ (文档工具)│ │ (评估工具)│
└──────────┘ └──────────┘ └──────────┘python
from langgraph.graph import StateGraph, START, END
from typing import Literal
# Supervisor 路由决策
def supervisor(state: State) -> dict:
"""决定下一步交给哪个 Agent"""
response = llm.invoke([
{"role": "system", "content": """你是任务管理者。根据当前状态决定下一步:
- "researcher": 需要搜索信息
- "writer": 需要撰写内容
- "reviewer": 需要审核质量
- "FINISH": 任务完成"""},
{"role": "user", "content": f"当前状态: {state['messages'][-1].content}"}
])
return {"next": response.content.strip()}
# 构建图
graph = StateGraph(State)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("writer", writer_agent)
graph.add_node("reviewer", reviewer_agent)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", lambda s: s["next"], {
"researcher": "researcher",
"writer": "writer",
"reviewer": "reviewer",
"FINISH": END
})
# 每个 Agent 执行完都回到 Supervisor
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")
app = graph.compile()适用:需要多种专业能力的复杂任务 优势:职责清晰、可独立优化每个 Agent 注意:Supervisor 是瓶颈,需要足够强的模型
3. Hierarchical(层级模式)
多层 Supervisor,适合超复杂任务。
┌────────────────┐
│ Top Supervisor │
└───┬────────┬──┘
│ │
┌────────▼──┐ ┌──▼────────┐
│ 研究主管 │ │ 开发主管 │
└──┬─────┬──┘ └──┬─────┬──┘
│ │ │ │
┌──▼──┐┌─▼──┐ ┌──▼──┐┌─▼──┐
│搜索 ││分析 │ │前端 ││后端 │
└─────┘└────┘ └─────┘└────┘适用:企业级复杂系统(如自动化软件开发)
4. Swarm(群体智能模式)
Agent 之间平等协作,通过 Handoff 动态传递控制权。
┌──────────┐ handoff ┌──────────┐ handoff ┌──────────┐
│ Agent A │──────────→│ Agent B │──────────→│ Agent C │
│ (客服入口)│ │ (技术支持)│ │ (退款处理)│
└──────────┘ └──────────┘ └──────────┘python
# OpenAI Swarm 风格(概念示例)
from swarm import Swarm, Agent
client = Swarm()
# 定义 Agent
triage_agent = Agent(
name="分诊 Agent",
instructions="判断用户需求,转接到合适的 Agent",
functions=[transfer_to_sales, transfer_to_support]
)
sales_agent = Agent(
name="销售 Agent",
instructions="处理购买咨询、推荐产品",
functions=[check_inventory, create_order]
)
support_agent = Agent(
name="技术支持 Agent",
instructions="处理技术问题和故障排查",
functions=[search_docs, create_ticket]
)
def transfer_to_sales():
"""转接到销售 Agent"""
return sales_agent
def transfer_to_support():
"""转接到技术支持 Agent"""
return support_agent
# 运行:自动在 Agent 之间流转
response = client.run(agent=triage_agent, messages=[
{"role": "user", "content": "我的订单出了问题"}
])适用:客服系统、流程化任务 优势:无中心瓶颈、Agent 可独立扩展
5. Map-Reduce(并行处理模式)
将大任务拆分,多个 Agent 并行处理,最后合并结果。
┌──── Agent 1 → 结果1 ────┐
│ │
用户输入 → 拆分 ──── Agent 2 → 结果2 ──── 合并 → 最终输出
│ │
└──── Agent 3 → 结果3 ────┘python
import asyncio
async def map_reduce_agent(query: str, documents: list[str]) -> str:
"""Map-Reduce:并行处理多个文档,合并分析"""
# Map:每个文档分配一个 Agent 并行分析
tasks = [
analyze_document(doc, query)
for doc in documents
]
results = await asyncio.gather(*tasks)
# Reduce:合并所有分析结果
combined = "\n\n".join([
f"文档 {i+1} 分析:\n{r}"
for i, r in enumerate(results)
])
final = await llm.ainvoke([
{"role": "system", "content": "综合以下分析结果,生成统一的结论报告"},
{"role": "user", "content": combined}
])
return final.content适用:批量数据处理、多文档分析、并行搜索
6. Reflection(反思模式)
Agent 生成输出后,由另一个 Agent(或自身)评估质量,不合格则重新生成。
用户输入 → 生成 Agent → 输出草稿
│
▼
评估 Agent
│ │
不合格│ │合格
▼ ▼
反馈修改 最终输出python
async def reflection_loop(query: str, max_iterations: int = 3) -> str:
"""反思循环:生成 → 评估 → 改进"""
draft = await generator.ainvoke(query)
for i in range(max_iterations):
# 评估
evaluation = await evaluator.ainvoke(
f"评估以下回答的质量(1-10分)并指出改进点:\n\n{draft}"
)
score = extract_score(evaluation)
if score >= 8:
return draft # 质量达标,直接返回
# 改进
draft = await generator.ainvoke(
f"原始问题:{query}\n\n上次回答:{draft}\n\n评审反馈:{evaluation}\n\n请改进:"
)
return draft适用:高质量内容生成、代码编写、学术报告
架构选型指南
你的任务是什么?
简单问答 + 几个工具?
→ 单 Agent(Router)
需要多种专业能力协作?
→ Supervisor 模式
任务层级很深、团队庞大?
→ Hierarchical 模式
流程化、需要动态转接?
→ Swarm 模式
可以并行处理的批量任务?
→ Map-Reduce 模式
需要高质量输出、允许多轮迭代?
→ Reflection 模式
不确定?
→ 从 Supervisor 开始,按需演进| 模式 | 复杂度 | Agent 数量 | 通信 | 典型场景 |
|---|---|---|---|---|
| 单 Agent | ⭐ | 1 | — | 聊天助手 |
| Supervisor | ⭐⭐ | 3-5 | 中心化 | 内容创作 |
| Hierarchical | ⭐⭐⭐ | 5+ | 树状 | 软件开发 |
| Swarm | ⭐⭐ | 3-10 | P2P | 客服系统 |
| Map-Reduce | ⭐⭐ | N | 扇出扇入 | 数据分析 |
| Reflection | ⭐⭐ | 2 | 循环 | 高质量生成 |
Agent 可靠性工程
1. 工具调用防护
python
# 工具调用超时 + 重试 + 降级
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def safe_tool_call(tool_fn, args, timeout: int = 30):
"""安全的工具调用:超时 + 重试 + 异常捕获"""
try:
result = await asyncio.wait_for(tool_fn(**args), timeout=timeout)
return result
except asyncio.TimeoutError:
return f"⚠️ 工具调用超时({timeout}s),请尝试简化请求"
except Exception as e:
return f"⚠️ 工具调用失败:{str(e)}"2. Agent 循环保护
python
MAX_ITERATIONS = 15 # 最大循环次数
MAX_TOKENS_BUDGET = 50000 # Token 预算
async def run_agent_safely(agent, input_msg: str) -> str:
"""防止 Agent 无限循环"""
total_tokens = 0
iterations = 0
async for event in agent.astream({"messages": [("user", input_msg)]}):
iterations += 1
total_tokens += event.get("token_count", 0)
if iterations > MAX_ITERATIONS:
return "⚠️ Agent 执行超过最大迭代次数,已强制终止"
if total_tokens > MAX_TOKENS_BUDGET:
return "⚠️ Token 消耗超出预算,已强制终止"
return event["messages"][-1].content3. Human-in-the-Loop
python
from langgraph.checkpoint.memory import MemorySaver
# LangGraph 中断点:敏感操作前等待人工确认
graph = StateGraph(State)
# 标记需要人工审核的节点
graph.add_node("execute_action", execute_action)
app = graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["execute_action"] # 执行前暂停
)
# 运行到断点自动暂停
result = await app.ainvoke(input, config={"configurable": {"thread_id": "1"}})
# → 暂停,等待人工确认
# 人工确认后继续
result = await app.ainvoke(None, config={"configurable": {"thread_id": "1"}})Agentic RAG(Agent 驱动的检索)
传统 RAG 是固定 Pipeline,Agentic RAG 让 Agent 自主决定何时检索、检索什么。
传统 RAG(固定流程):
用户提问 → 检索 → 生成 → 输出
问题:不是所有问题都需要检索
Agentic RAG(智能决策):
用户提问 → Agent 判断 →
需要检索? → 检索 → 评估质量 → 不够?→ 重新检索/改写查询
不需要? → 直接回答
需要多个来源? → 并行检索 → 合并python
def agentic_rag_router(query: str) -> str:
"""Agent 自主决定检索策略"""
decision = llm.invoke(f"""
判断以下问题应该如何处理:
问题:{query}
选项:
A. 直接回答(常识/简单问题)
B. 检索知识库(需要专业知识)
C. 搜索互联网(需要最新信息)
D. 混合检索(知识库 + 互联网)
只输出选项字母。
""")
return decision.strip()学习资源