Skip to content

第八章 子图与多智能体协作


8.1 子图(Subgraph)—— 图中嵌套图

前面七章我们构建的都是"单图"——所有节点平铺在同一个 StateGraph 中。当 Agent 逻辑变复杂时(十几个节点、多种职责混在一起),单图会变得难以维护。

解决方案:子图(Subgraph)——把一个大图拆成多个小图,像搭积木一样组合。

什么是子图

单图(所有逻辑挤在一起):

  START → classify → search → evaluate → rewrite →
          generate → format → review → END
  → 8 个节点,逻辑耦合,难以复用

子图(拆分成模块):

  ┌── 主图 ────────────────────────────┐
  │                                     │
  │ START → classify →  ┌────────────┐  │
  │                     │ 搜索子图    │  │
  │                     │ search →   │  │
  │                     │ evaluate → │  │
  │                     │ rewrite    │  │
  │                     └─────┬──────┘  │
  │                           ↓         │
  │                     ┌────────────┐  │
  │                     │ 写作子图    │  │
  │                     │ generate → │  │
  │                     │ format →   │  │
  │                     │ review     │  │
  │                     └─────┬──────┘  │
  │                           ↓         │
  │                          END        │
  └─────────────────────────────────────┘

创建子图

子图就是一个普通的 StateGraph,编译后作为节点添加到主图中:

python
from langgraph.graph import StateGraph, MessagesState, START, END

# ====== 子图:搜索模块 ======
def search(state: MessagesState):
    return {"messages": [AIMessage(content="搜索结果:LangGraph 是...")]}

def evaluate(state: MessagesState):
    return {"messages": [AIMessage(content="搜索结果评分:0.95")]}

search_graph = StateGraph(MessagesState)
search_graph.add_node("search", search)
search_graph.add_node("evaluate", evaluate)
search_graph.add_edge(START, "search")
search_graph.add_edge("search", "evaluate")
search_graph.add_edge("evaluate", END)

# 编译子图
search_app = search_graph.compile()

# ====== 主图 ======
main_graph = StateGraph(MessagesState)
main_graph.add_node("search_module", search_app)  # 把子图当节点!
main_graph.add_node("respond", respond_fn)

main_graph.add_edge(START, "search_module")
main_graph.add_edge("search_module", "respond")
main_graph.add_edge("respond", END)

app = main_graph.compile()

子图的执行流程

用户调用 app.invoke(...)

  ↓ 进入主图
  START

  ↓ 进入子图节点 "search_module"
  ┌──────────────────────┐
  │ search_app 内部执行:  │
  │ START → search →     │
  │ evaluate → END       │
  └──────────┬───────────┘
             │ 子图返回,State 合并回主图

  respond → END

子图的优势

优势说明
模块化每个子图负责一个独立功能
可复用同一个子图可以在多个主图中使用
独立测试子图可以单独编译和测试
团队协作不同人负责不同子图

类比:子图就像编程中的函数。你不会把所有逻辑写在一个函数里,同理也不应该把所有 Agent 逻辑放在一个图里。


8.2 Supervisor 模式 —— 一个"经理"调度多个"员工"

当你有多个专业 Agent 时,需要有人来协调它们。Supervisor 模式就是让一个"经理" Agent 来分配任务。

架构

Supervisor 模式:

  用户提问


  ┌──────────┐
  │ Supervisor│ ← "经理":决定交给谁处理
  └────┬─────┘

       ├── "需要搜索" → ┌────────────┐
       │                 │ 搜索 Agent  │ → 返回结果给 Supervisor
       │                 └────────────┘

       ├── "需要写作" → ┌────────────┐
       │                 │ 写作 Agent  │ → 返回结果给 Supervisor
       │                 └────────────┘

       ├── "需要计算" → ┌────────────┐
       │                 │ 计算 Agent  │ → 返回结果给 Supervisor
       │                 └────────────┘

       └── "任务完成" → END

实现方式

python
from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END

llm = ChatDeepSeek(model="deepseek-chat", temperature=0)

# ====== 专业 Agent(员工)======
def researcher(state: MessagesState):
    """搜索研究员"""
    response = llm.invoke([
        SystemMessage(content="你是一个研究员,擅长搜索和分析信息。"),
        *state["messages"]
    ])
    return {"messages": [response]}

def writer(state: MessagesState):
    """写作助手"""
    response = llm.invoke([
        SystemMessage(content="你是一个写作高手,擅长将研究结果整理成文章。"),
        *state["messages"]
    ])
    return {"messages": [response]}

# ====== Supervisor(经理)======
def supervisor(state: MessagesState):
    """经理:决定下一步交给谁"""
    response = llm.invoke([
        SystemMessage(content="""你是一个任务调度经理。
根据对话内容决定下一步应该交给谁处理。
只返回以下选项之一:
- researcher:需要搜索或分析信息
- writer:需要撰写或整理内容
- FINISH:任务已完成"""),
        *state["messages"]
    ])
    return {"messages": [response]}

def route_supervisor(state: MessagesState) -> str:
    """根据 Supervisor 的决策路由"""
    last_msg = state["messages"][-1].content.strip().lower()
    if "researcher" in last_msg:
        return "researcher"
    elif "writer" in last_msg:
        return "writer"
    return END

# ====== 构建图 ======
graph = StateGraph(MessagesState)

graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_supervisor)
graph.add_edge("researcher", "supervisor")  # 员工干完活回到经理
graph.add_edge("writer", "supervisor")      # 员工干完活回到经理

app = graph.compile()

执行流程

用户: "帮我调研 LangGraph 的优势,然后写一篇总结"

第 1 轮:Supervisor → "researcher"(需要先调研)
第 2 轮:Researcher → 返回调研结果 → Supervisor
第 3 轮:Supervisor → "writer"(有了素材,可以写了)
第 4 轮:Writer → 返回文章 → Supervisor
第 5 轮:Supervisor → "FINISH"(任务完成)→ END

Supervisor 的核心思想:"经理"不做具体工作,只负责分配任务和判断完成度。就像一个项目经理,把任务分给研究员和写手,然后检查结果。


8.3 Handoff 模式 —— Agent 之间的任务交接

Supervisor 模式中,所有通信都经过"经理"中转。而 Handoff 模式不同——Agent 之间直接交接,类似接力赛。

Supervisor vs Handoff

Supervisor 模式(中心化):
                  ┌──────────┐
            ┌────→│ 搜索Agent │────┐
            │     └──────────┘    │
  ┌─────────┴────┐          ┌────┴─────────┐
  │  Supervisor   │←─────────│  Supervisor   │
  └─────────┬────┘          └────┬─────────┘
            │     ┌──────────┐    │
            └────→│ 写作Agent │────┘
                  └──────────┘
  → 所有沟通都经过 Supervisor

Handoff 模式(去中心化):
  ┌──────────┐        ┌──────────┐
  │ 搜索Agent │───────→│ 写作Agent │
  └──────────┘        └──────────┘
       ↑                    │
       │                    ↓
  ┌──────────┐        ┌──────────┐
  │ 审核Agent │←───────│ 格式Agent │
  └──────────┘        └──────────┘
  → Agent 之间直接"交棒"

用 LangGraph 实现 Handoff

Handoff 的核心思想是让每个 Agent 自己决定"下一步交给谁":

python
from typing import TypedDict, Annotated
import operator

class TeamState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str  # 由当前 Agent 设置,指定下一个接手的 Agent

def research_agent(state: TeamState):
    """研究 Agent:完成搜索后交给写作 Agent"""
    response = llm.invoke([
        SystemMessage(content="你是研究员。搜索相关信息并总结要点。"),
        *state["messages"]
    ])
    return {
        "messages": [response],
        "next_agent": "writer"  # 交接给写作 Agent
    }

def writer_agent(state: TeamState):
    """写作 Agent:完成写作后交给审核 Agent"""
    response = llm.invoke([
        SystemMessage(content="你是写作专家。根据研究结果撰写文章。"),
        *state["messages"]
    ])
    return {
        "messages": [response],
        "next_agent": "reviewer"  # 交接给审核 Agent
    }

def reviewer_agent(state: TeamState):
    """审核 Agent:审核通过则结束,不通过则交回写作"""
    response = llm.invoke([
        SystemMessage(content="你是审核编辑。评审文章质量,回复 APPROVE 或 REVISE。"),
        *state["messages"]
    ])
    if "APPROVE" in response.content.upper():
        next_step = "end"
    else:
        next_step = "writer"  # 打回给写作 Agent 修改
    return {
        "messages": [response],
        "next_agent": next_step
    }

def route_handoff(state: TeamState) -> str:
    """根据 next_agent 字段路由"""
    if state["next_agent"] == "end":
        return END
    return state["next_agent"]

# 构建图
graph = StateGraph(TeamState)
graph.add_node("researcher", research_agent)
graph.add_node("writer", writer_agent)
graph.add_node("reviewer", reviewer_agent)

graph.add_edge(START, "researcher")
graph.add_conditional_edges("researcher", route_handoff)
graph.add_conditional_edges("writer", route_handoff)
graph.add_conditional_edges("reviewer", route_handoff)

app = graph.compile()

两种模式对比

特性SupervisorHandoff
结构中心化(星形)去中心化(链/网状)
控制经理统一调度Agent 自主决定下一步
灵活性适合任务不固定的场景适合流程固定的场景
瓶颈Supervisor 成为瓶颈无单点瓶颈
复杂度简单(一个路由中心)较高(每个 Agent 负责路由)
适用场景客服/通用助手流水线/内容生产

选择建议:任务类型多变、需要动态调度 → Supervisor。流程相对固定、Agent 之间有明确的先后关系 → Handoff。两种模式也可以混合使用。


8.4 实战:构建一个研究 + 写作的双 Agent 系统

用 Supervisor 模式构建一个完整的"调研 → 写作"双 Agent 系统。

完整代码

python
from typing import TypedDict, Annotated
import operator
from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

# ====== State ======
class TeamState(TypedDict):
    messages: Annotated[list, operator.add]
    research_notes: str     # 研究员的调研笔记
    draft: str              # 写手的文章草稿
    iteration: int          # 迭代次数

# ====== LLM ======
llm = ChatDeepSeek(model="deepseek-chat", temperature=0.7)

# ====== Supervisor ======
def supervisor(state: TeamState) -> dict:
    """调度经理:决定分配给谁"""
    system_prompt = """你是一个项目经理,管理一个研究员和一个写手。
根据当前进度决定下一步:
- 如果还没有调研笔记 → 返回 "researcher"
- 如果有笔记但没有草稿 → 返回 "writer"
- 如果有草稿了 → 返回 "FINISH"
只返回一个单词:researcher、writer 或 FINISH"""

    context = f"""当前状态:
- 调研笔记:{"有" if state.get("research_notes") else "无"}
- 文章草稿:{"有" if state.get("draft") else "无"}
- 迭代次数:{state.get("iteration", 0)}"""

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=context),
        *state["messages"]
    ])
    return {"messages": [AIMessage(content=f"[Supervisor] {response.content}")]}

def route_supervisor(state: TeamState) -> str:
    last_msg = state["messages"][-1].content
    if "researcher" in last_msg.lower():
        return "researcher"
    elif "writer" in last_msg.lower():
        return "writer"
    return END

# ====== 研究员 Agent ======
def researcher(state: TeamState) -> dict:
    """研究员:调研并输出笔记"""
    user_request = state["messages"][0].content
    response = llm.invoke([
        SystemMessage(content="你是一个高级研究员。请对以下主题进行调研,输出结构化的要点笔记。"),
        HumanMessage(content=f"调研主题:{user_request}")
    ])
    return {
        "messages": [AIMessage(content=f"[Researcher] 调研完成")],
        "research_notes": response.content
    }

# ====== 写手 Agent ======
def writer(state: TeamState) -> dict:
    """写手:根据笔记撰写文章"""
    response = llm.invoke([
        SystemMessage(content="你是一个专业技术写手。根据以下调研笔记撰写一篇短文(200字左右)。"),
        HumanMessage(content=f"调研笔记:\n{state['research_notes']}")
    ])
    return {
        "messages": [AIMessage(content=f"[Writer] 文章撰写完成")],
        "draft": response.content,
        "iteration": state.get("iteration", 0) + 1
    }

# ====== 构建图 ======
graph = StateGraph(TeamState)

graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_supervisor)
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile(checkpointer=MemorySaver())

# ====== 运行 ======
config = {"configurable": {"thread_id": "team_work_001"}}

result = app.invoke(
    {
        "messages": [HumanMessage(content="请调研 LangGraph 的核心优势并写一篇总结")],
        "research_notes": "",
        "draft": "",
        "iteration": 0
    },
    config=config,
)

print("=== 调研笔记 ===")
print(result["research_notes"][:200] + "...")
print("\n=== 最终文章 ===")
print(result["draft"][:300] + "...")

执行流程

用户: "请调研 LangGraph 的核心优势并写一篇总结"

  Supervisor → "researcher"(没有笔记,先调研)
  Researcher → 输出调研笔记 → 回到 Supervisor
  Supervisor → "writer"(有笔记了,开始写)
  Writer → 输出文章草稿 → 回到 Supervisor
  Supervisor → "FINISH"(有草稿了,完成)→ END

  每个角色只做自己擅长的事,Supervisor 负责协调。

8.5 多智能体的 State 共享与隔离

多 Agent 协作时,State 管理变得更复杂。核心问题是:哪些数据该共享、哪些该隔离

共享 State(所有 Agent 看同一个 State)

python
# 最简单的方式:所有 Agent 共享同一个 State
class SharedState(TypedDict):
    messages: Annotated[list, operator.add]  # 所有 Agent 共享消息列表
    current_task: str                         # 共享的任务上下文
    results: dict                             # 共享的结果集
共享 State:

  ┌──────────────────────────────────────┐
  │            SharedState               │
  │ messages: [...]                      │
  │ current_task: "..."                  │
  │ results: {...}                       │
  └───────┬──────────┬──────────┬────────┘
          │          │          │
       Agent A    Agent B    Agent C
       (都能读写所有字段)

优点:简单、所有 Agent 信息透明。 缺点:Agent 之间可能互相干扰。

隔离 State(子图有自己的 State)

python
# 子图有自己的 State 类型
class ResearchState(TypedDict):
    messages: Annotated[list, operator.add]
    search_results: list   # 只有研究子图用

class WritingState(TypedDict):
    messages: Annotated[list, operator.add]
    outline: str           # 只有写作子图用
    draft: str

# 子图有独立的 State
research_graph = StateGraph(ResearchState)
writing_graph = StateGraph(WritingState)
隔离 State:

  ┌── 主图 State ──────────────────────┐
  │ messages: [...]                     │
  │ final_result: "..."                 │
  │                                     │
  │  ┌── 研究子图 State ─┐              │
  │  │ search_results: []│              │
  │  └──────────────────┘              │
  │                                     │
  │  ┌── 写作子图 State ─┐              │
  │  │ outline: "..."    │              │
  │  │ draft: "..."      │              │
  │  └──────────────────┘              │
  └─────────────────────────────────────┘

  子图的私有字段在子图外不可见。
  只有 messages(共同字段)会自动在主图和子图之间同步。

State 共享的原则

数据类型建议原因
对话消息 (messages)共享所有 Agent 需要了解上下文
中间结果看情况下游 Agent 需要时共享
Agent 私有数据隔离避免干扰,用子图 State
全局配置共享如 user_id、session_id

设计原则:默认共享 messages,其他字段按需决定。如果发现 State 字段太多、Agent 之间干扰不断,就该考虑用子图隔离了。


本章小结

知识点要点
子图图中嵌套图,模块化 + 可复用
Supervisor 模式一个经理调度多个员工,中心化控制
Handoff 模式Agent 之间直接交接,去中心化协作
Supervisor vs Handoff动态任务用 Supervisor,固定流程用 Handoff
State 共享所有 Agent 读写同一个 State
State 隔离子图有独立 State,私有字段不可见

下一章预告:Streaming —— 实时响应。流式输出 Token、实时追踪节点执行进度、SSE 推送到前端。让你的 Agent 不再"闷头想半天才说话"。

坚持是一种品格