第八章 子图与多智能体协作
8.1 子图(Subgraph)—— 图中嵌套图
前面七章我们构建的都是"单图"——所有节点平铺在同一个 StateGraph 中。当 Agent 逻辑变复杂时(十几个节点、多种职责混在一起),单图会变得难以维护。
解决方案:子图(Subgraph)——把一个大图拆成多个小图,像搭积木一样组合。
什么是子图
单图(所有逻辑挤在一起):
START → classify → search → evaluate → rewrite →
generate → format → review → END
→ 8 个节点,逻辑耦合,难以复用
子图(拆分成模块):
┌── 主图 ────────────────────────────┐
│ │
│ START → classify → ┌────────────┐ │
│ │ 搜索子图 │ │
│ │ search → │ │
│ │ evaluate → │ │
│ │ rewrite │ │
│ └─────┬──────┘ │
│ ↓ │
│ ┌────────────┐ │
│ │ 写作子图 │ │
│ │ generate → │ │
│ │ format → │ │
│ │ review │ │
│ └─────┬──────┘ │
│ ↓ │
│ END │
└─────────────────────────────────────┘创建子图
子图就是一个普通的 StateGraph,编译后作为节点添加到主图中:
from langgraph.graph import StateGraph, MessagesState, START, END
# ====== 子图:搜索模块 ======
def search(state: MessagesState):
return {"messages": [AIMessage(content="搜索结果:LangGraph 是...")]}
def evaluate(state: MessagesState):
return {"messages": [AIMessage(content="搜索结果评分:0.95")]}
search_graph = StateGraph(MessagesState)
search_graph.add_node("search", search)
search_graph.add_node("evaluate", evaluate)
search_graph.add_edge(START, "search")
search_graph.add_edge("search", "evaluate")
search_graph.add_edge("evaluate", END)
# 编译子图
search_app = search_graph.compile()
# ====== 主图 ======
main_graph = StateGraph(MessagesState)
main_graph.add_node("search_module", search_app) # 把子图当节点!
main_graph.add_node("respond", respond_fn)
main_graph.add_edge(START, "search_module")
main_graph.add_edge("search_module", "respond")
main_graph.add_edge("respond", END)
app = main_graph.compile()子图的执行流程
用户调用 app.invoke(...)
│
↓ 进入主图
START
│
↓ 进入子图节点 "search_module"
┌──────────────────────┐
│ search_app 内部执行: │
│ START → search → │
│ evaluate → END │
└──────────┬───────────┘
│ 子图返回,State 合并回主图
↓
respond → END子图的优势
| 优势 | 说明 |
|---|---|
| 模块化 | 每个子图负责一个独立功能 |
| 可复用 | 同一个子图可以在多个主图中使用 |
| 独立测试 | 子图可以单独编译和测试 |
| 团队协作 | 不同人负责不同子图 |
类比:子图就像编程中的函数。你不会把所有逻辑写在一个函数里,同理也不应该把所有 Agent 逻辑放在一个图里。
8.2 Supervisor 模式 —— 一个"经理"调度多个"员工"
当你有多个专业 Agent 时,需要有人来协调它们。Supervisor 模式就是让一个"经理" Agent 来分配任务。
架构
Supervisor 模式:
用户提问
│
↓
┌──────────┐
│ Supervisor│ ← "经理":决定交给谁处理
└────┬─────┘
│
├── "需要搜索" → ┌────────────┐
│ │ 搜索 Agent │ → 返回结果给 Supervisor
│ └────────────┘
│
├── "需要写作" → ┌────────────┐
│ │ 写作 Agent │ → 返回结果给 Supervisor
│ └────────────┘
│
├── "需要计算" → ┌────────────┐
│ │ 计算 Agent │ → 返回结果给 Supervisor
│ └────────────┘
│
└── "任务完成" → END实现方式
from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
llm = ChatDeepSeek(model="deepseek-chat", temperature=0)
# ====== 专业 Agent(员工)======
def researcher(state: MessagesState):
"""搜索研究员"""
response = llm.invoke([
SystemMessage(content="你是一个研究员,擅长搜索和分析信息。"),
*state["messages"]
])
return {"messages": [response]}
def writer(state: MessagesState):
"""写作助手"""
response = llm.invoke([
SystemMessage(content="你是一个写作高手,擅长将研究结果整理成文章。"),
*state["messages"]
])
return {"messages": [response]}
# ====== Supervisor(经理)======
def supervisor(state: MessagesState):
"""经理:决定下一步交给谁"""
response = llm.invoke([
SystemMessage(content="""你是一个任务调度经理。
根据对话内容决定下一步应该交给谁处理。
只返回以下选项之一:
- researcher:需要搜索或分析信息
- writer:需要撰写或整理内容
- FINISH:任务已完成"""),
*state["messages"]
])
return {"messages": [response]}
def route_supervisor(state: MessagesState) -> str:
"""根据 Supervisor 的决策路由"""
last_msg = state["messages"][-1].content.strip().lower()
if "researcher" in last_msg:
return "researcher"
elif "writer" in last_msg:
return "writer"
return END
# ====== 构建图 ======
graph = StateGraph(MessagesState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_supervisor)
graph.add_edge("researcher", "supervisor") # 员工干完活回到经理
graph.add_edge("writer", "supervisor") # 员工干完活回到经理
app = graph.compile()执行流程
用户: "帮我调研 LangGraph 的优势,然后写一篇总结"
第 1 轮:Supervisor → "researcher"(需要先调研)
第 2 轮:Researcher → 返回调研结果 → Supervisor
第 3 轮:Supervisor → "writer"(有了素材,可以写了)
第 4 轮:Writer → 返回文章 → Supervisor
第 5 轮:Supervisor → "FINISH"(任务完成)→ ENDSupervisor 的核心思想:"经理"不做具体工作,只负责分配任务和判断完成度。就像一个项目经理,把任务分给研究员和写手,然后检查结果。
8.3 Handoff 模式 —— Agent 之间的任务交接
Supervisor 模式中,所有通信都经过"经理"中转。而 Handoff 模式不同——Agent 之间直接交接,类似接力赛。
Supervisor vs Handoff
Supervisor 模式(中心化):
┌──────────┐
┌────→│ 搜索Agent │────┐
│ └──────────┘ │
┌─────────┴────┐ ┌────┴─────────┐
│ Supervisor │←─────────│ Supervisor │
└─────────┬────┘ └────┬─────────┘
│ ┌──────────┐ │
└────→│ 写作Agent │────┘
└──────────┘
→ 所有沟通都经过 Supervisor
Handoff 模式(去中心化):
┌──────────┐ ┌──────────┐
│ 搜索Agent │───────→│ 写作Agent │
└──────────┘ └──────────┘
↑ │
│ ↓
┌──────────┐ ┌──────────┐
│ 审核Agent │←───────│ 格式Agent │
└──────────┘ └──────────┘
→ Agent 之间直接"交棒"用 LangGraph 实现 Handoff
Handoff 的核心思想是让每个 Agent 自己决定"下一步交给谁":
from typing import TypedDict, Annotated
import operator
class TeamState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str # 由当前 Agent 设置,指定下一个接手的 Agent
def research_agent(state: TeamState):
"""研究 Agent:完成搜索后交给写作 Agent"""
response = llm.invoke([
SystemMessage(content="你是研究员。搜索相关信息并总结要点。"),
*state["messages"]
])
return {
"messages": [response],
"next_agent": "writer" # 交接给写作 Agent
}
def writer_agent(state: TeamState):
"""写作 Agent:完成写作后交给审核 Agent"""
response = llm.invoke([
SystemMessage(content="你是写作专家。根据研究结果撰写文章。"),
*state["messages"]
])
return {
"messages": [response],
"next_agent": "reviewer" # 交接给审核 Agent
}
def reviewer_agent(state: TeamState):
"""审核 Agent:审核通过则结束,不通过则交回写作"""
response = llm.invoke([
SystemMessage(content="你是审核编辑。评审文章质量,回复 APPROVE 或 REVISE。"),
*state["messages"]
])
if "APPROVE" in response.content.upper():
next_step = "end"
else:
next_step = "writer" # 打回给写作 Agent 修改
return {
"messages": [response],
"next_agent": next_step
}
def route_handoff(state: TeamState) -> str:
"""根据 next_agent 字段路由"""
if state["next_agent"] == "end":
return END
return state["next_agent"]
# 构建图
graph = StateGraph(TeamState)
graph.add_node("researcher", research_agent)
graph.add_node("writer", writer_agent)
graph.add_node("reviewer", reviewer_agent)
graph.add_edge(START, "researcher")
graph.add_conditional_edges("researcher", route_handoff)
graph.add_conditional_edges("writer", route_handoff)
graph.add_conditional_edges("reviewer", route_handoff)
app = graph.compile()两种模式对比
| 特性 | Supervisor | Handoff |
|---|---|---|
| 结构 | 中心化(星形) | 去中心化(链/网状) |
| 控制 | 经理统一调度 | Agent 自主决定下一步 |
| 灵活性 | 适合任务不固定的场景 | 适合流程固定的场景 |
| 瓶颈 | Supervisor 成为瓶颈 | 无单点瓶颈 |
| 复杂度 | 简单(一个路由中心) | 较高(每个 Agent 负责路由) |
| 适用场景 | 客服/通用助手 | 流水线/内容生产 |
选择建议:任务类型多变、需要动态调度 → Supervisor。流程相对固定、Agent 之间有明确的先后关系 → Handoff。两种模式也可以混合使用。
8.4 实战:构建一个研究 + 写作的双 Agent 系统
用 Supervisor 模式构建一个完整的"调研 → 写作"双 Agent 系统。
完整代码
from typing import TypedDict, Annotated
import operator
from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
# ====== State ======
class TeamState(TypedDict):
messages: Annotated[list, operator.add]
research_notes: str # 研究员的调研笔记
draft: str # 写手的文章草稿
iteration: int # 迭代次数
# ====== LLM ======
llm = ChatDeepSeek(model="deepseek-chat", temperature=0.7)
# ====== Supervisor ======
def supervisor(state: TeamState) -> dict:
"""调度经理:决定分配给谁"""
system_prompt = """你是一个项目经理,管理一个研究员和一个写手。
根据当前进度决定下一步:
- 如果还没有调研笔记 → 返回 "researcher"
- 如果有笔记但没有草稿 → 返回 "writer"
- 如果有草稿了 → 返回 "FINISH"
只返回一个单词:researcher、writer 或 FINISH"""
context = f"""当前状态:
- 调研笔记:{"有" if state.get("research_notes") else "无"}
- 文章草稿:{"有" if state.get("draft") else "无"}
- 迭代次数:{state.get("iteration", 0)}"""
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=context),
*state["messages"]
])
return {"messages": [AIMessage(content=f"[Supervisor] {response.content}")]}
def route_supervisor(state: TeamState) -> str:
last_msg = state["messages"][-1].content
if "researcher" in last_msg.lower():
return "researcher"
elif "writer" in last_msg.lower():
return "writer"
return END
# ====== 研究员 Agent ======
def researcher(state: TeamState) -> dict:
"""研究员:调研并输出笔记"""
user_request = state["messages"][0].content
response = llm.invoke([
SystemMessage(content="你是一个高级研究员。请对以下主题进行调研,输出结构化的要点笔记。"),
HumanMessage(content=f"调研主题:{user_request}")
])
return {
"messages": [AIMessage(content=f"[Researcher] 调研完成")],
"research_notes": response.content
}
# ====== 写手 Agent ======
def writer(state: TeamState) -> dict:
"""写手:根据笔记撰写文章"""
response = llm.invoke([
SystemMessage(content="你是一个专业技术写手。根据以下调研笔记撰写一篇短文(200字左右)。"),
HumanMessage(content=f"调研笔记:\n{state['research_notes']}")
])
return {
"messages": [AIMessage(content=f"[Writer] 文章撰写完成")],
"draft": response.content,
"iteration": state.get("iteration", 0) + 1
}
# ====== 构建图 ======
graph = StateGraph(TeamState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_supervisor)
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
app = graph.compile(checkpointer=MemorySaver())
# ====== 运行 ======
config = {"configurable": {"thread_id": "team_work_001"}}
result = app.invoke(
{
"messages": [HumanMessage(content="请调研 LangGraph 的核心优势并写一篇总结")],
"research_notes": "",
"draft": "",
"iteration": 0
},
config=config,
)
print("=== 调研笔记 ===")
print(result["research_notes"][:200] + "...")
print("\n=== 最终文章 ===")
print(result["draft"][:300] + "...")执行流程
用户: "请调研 LangGraph 的核心优势并写一篇总结"
Supervisor → "researcher"(没有笔记,先调研)
Researcher → 输出调研笔记 → 回到 Supervisor
Supervisor → "writer"(有笔记了,开始写)
Writer → 输出文章草稿 → 回到 Supervisor
Supervisor → "FINISH"(有草稿了,完成)→ END
每个角色只做自己擅长的事,Supervisor 负责协调。8.5 多智能体的 State 共享与隔离
多 Agent 协作时,State 管理变得更复杂。核心问题是:哪些数据该共享、哪些该隔离。
共享 State(所有 Agent 看同一个 State)
# 最简单的方式:所有 Agent 共享同一个 State
class SharedState(TypedDict):
messages: Annotated[list, operator.add] # 所有 Agent 共享消息列表
current_task: str # 共享的任务上下文
results: dict # 共享的结果集共享 State:
┌──────────────────────────────────────┐
│ SharedState │
│ messages: [...] │
│ current_task: "..." │
│ results: {...} │
└───────┬──────────┬──────────┬────────┘
│ │ │
Agent A Agent B Agent C
(都能读写所有字段)优点:简单、所有 Agent 信息透明。 缺点:Agent 之间可能互相干扰。
隔离 State(子图有自己的 State)
# 子图有自己的 State 类型
class ResearchState(TypedDict):
messages: Annotated[list, operator.add]
search_results: list # 只有研究子图用
class WritingState(TypedDict):
messages: Annotated[list, operator.add]
outline: str # 只有写作子图用
draft: str
# 子图有独立的 State
research_graph = StateGraph(ResearchState)
writing_graph = StateGraph(WritingState)隔离 State:
┌── 主图 State ──────────────────────┐
│ messages: [...] │
│ final_result: "..." │
│ │
│ ┌── 研究子图 State ─┐ │
│ │ search_results: []│ │
│ └──────────────────┘ │
│ │
│ ┌── 写作子图 State ─┐ │
│ │ outline: "..." │ │
│ │ draft: "..." │ │
│ └──────────────────┘ │
└─────────────────────────────────────┘
子图的私有字段在子图外不可见。
只有 messages(共同字段)会自动在主图和子图之间同步。State 共享的原则
| 数据类型 | 建议 | 原因 |
|---|---|---|
| 对话消息 (messages) | 共享 | 所有 Agent 需要了解上下文 |
| 中间结果 | 看情况 | 下游 Agent 需要时共享 |
| Agent 私有数据 | 隔离 | 避免干扰,用子图 State |
| 全局配置 | 共享 | 如 user_id、session_id |
设计原则:默认共享 messages,其他字段按需决定。如果发现 State 字段太多、Agent 之间干扰不断,就该考虑用子图隔离了。
本章小结
| 知识点 | 要点 |
|---|---|
| 子图 | 图中嵌套图,模块化 + 可复用 |
| Supervisor 模式 | 一个经理调度多个员工,中心化控制 |
| Handoff 模式 | Agent 之间直接交接,去中心化协作 |
| Supervisor vs Handoff | 动态任务用 Supervisor,固定流程用 Handoff |
| State 共享 | 所有 Agent 读写同一个 State |
| State 隔离 | 子图有独立 State,私有字段不可见 |
下一章预告:Streaming —— 实时响应。流式输出 Token、实时追踪节点执行进度、SSE 推送到前端。让你的 Agent 不再"闷头想半天才说话"。