Skip to content

AI Agent 记忆系统设计

从无状态到有记忆——短期记忆、长期记忆、情景记忆的工程实现:对话压缩、向量检索记忆、图记忆、Mem0/Letta/LangGraph 三大框架实战,让你的 Agent 真正"记住"用户。


1. 为什么 Agent 需要记忆系统

1.1 LLM 的"金鱼记忆":无状态的本质问题

如果你用过 ChatGPT,一定有过这样的体验:

第 1 次对话:
  你:我叫张三,在北京做后端开发,主要用 Python
  AI:你好张三!Python 后端开发是个很好的方向...

第 2 次对话(新开一个窗口):
  你:帮我优化一下这段代码
  AI:好的,请问你使用的是什么编程语言? ← 它忘了你是谁

这不是 ChatGPT 的 Bug——这是 LLM 的本质特性。每一次 API 调用,模型都是"重新出生"的:

LLM 的每次调用都是独立的:

  调用 1 ──→ [  LLM  ] ──→ 回答 1
               ↑ 无状态
  调用 2 ──→ [  LLM  ] ──→ 回答 2
               ↑ 无状态
  调用 3 ──→ [  LLM  ] ──→ 回答 3

  三次调用之间没有任何共享状态
  模型不知道"上一次你问了什么"
  每次都是全新的推理——彻底的金鱼记忆

从技术角度看,LLM API 就是一个纯函数

python
# LLM API 的本质:f(messages) → response
# 没有内部状态,没有副作用,没有记忆

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "帮我优化这段代码"},
    ],
    # ← 这里没有 memory 参数
    # ← 没有 user_id 参数
    # ← 没有 session_id 参数
    # 模型能看到的,只有这个 messages 列表
)

所以,你在 ChatGPT 里感受到的"它记住了我说的话",并不是模型本身有记忆——是应用层把你之前的对话全部塞进了 messages 列表,每次都重新发送给模型。这就是所谓的"上下文窗口"方案。

但这个方案有严重的局限性——这正是下一节要讨论的问题。

1.2 上下文窗口 ≠ 记忆:塞满 Token 的代价

"既然模型没有记忆,那把所有历史对话都塞进 messages 不就行了?" 这就是最朴素的"上下文窗口"方案——也是大多数 AI 应用的第一版实现。

但它有三个致命代价:

上下文窗口方案的三大代价:

  ┌─────────────────────────────────────────────────────┐
  │  代价一:Token 成本指数级增长                          │
  │                                                      │
  │  第 1 轮:发送 200 tokens    →  $0.003               │
  │  第 5 轮:发送 2,000 tokens  →  $0.030               │
  │  第 20 轮:发送 15,000 tokens →  $0.225              │
  │  第 50 轮:发送 50,000 tokens →  $0.750              │
  │                                                      │
  │  同一个用户聊 50 轮,仅 input 就花了 $1+              │
  │  日活 1 万用户 × 50 轮 = $10,000/天 ← 不可承受       │
  ├─────────────────────────────────────────────────────┤
  │  代价二:延迟随对话轮数线性增长                         │
  │                                                      │
  │  tokens 数量 → TTFT(首 Token 延迟)                  │
  │  2,000 tokens  →  ~0.8s                              │
  │  10,000 tokens →  ~2.5s                              │
  │  50,000 tokens →  ~8.0s                              │
  │  128,000 tokens → ~20s                               │
  │                                                      │
  │  聊得越久,用户等得越久——体验急剧恶化                  │
  ├─────────────────────────────────────────────────────┤
  │  代价三:注意力稀释(Lost in the Middle)               │
  │                                                      │
  │  上下文太长时,模型会"遗忘"中间部分的信息               │
  │  只关注开头(System Prompt)和末尾(最近几轮)          │
  │  第 3 轮说的重要需求?到第 20 轮已经被模型忽略了        │
  └─────────────────────────────────────────────────────┘

用一组对比数据来量化这三个问题:

对话轮数Input Tokens单次费用 (GPT-4o)TTFT 延迟中间信息召回率
5 轮~2,000$0.030.8s~95%
20 轮~15,000$0.233.5s~70%
50 轮~50,000$0.758.0s~45%
100 轮~120,000$1.8020s+~30%

💡 "Lost in the Middle"是经过论文验证的现象——斯坦福 2023 年的研究表明,当上下文超过 ~4K tokens 时,LLM 对中间位置信息的准确率显著下降。简单地"把所有对话塞进去",不仅浪费钱,模型还记不住。

这三个代价共同指向一个结论:上下文窗口是"工作台",不是"记忆"。就像你的电脑桌面——你可以在上面同时打开 20 个文件,但那不意味着你"记住"了所有文件的内容。

上下文窗口  ←→  工作台 / RAM(临时、有限、昂贵)
记忆系统    ←→  笔记本 / 磁盘(持久、分层、可检索)

正确的做法不是把所有东西堆在桌面上
而是建立一个智能的"笔记系统"——
  需要的时候翻出来,不需要的放回去

1.3 有记忆 vs 无记忆 Agent:一个对话案例的对比

理论讲完了,用一个真实的对话场景来直观感受差距。

假设用户"李明"在使用一个 AI 编程助手,分 3 天进行了多次对话:

═══════════════════════════════════════════════════════════
  第 1 天:李明初次使用
═══════════════════════════════════════════════════════════

  李明:我在做一个电商后端项目,用 FastAPI + PostgreSQL

  ┌─── 无记忆 Agent ──────────────────────────────────────┐
  │ "好的,FastAPI + PostgreSQL 是很好的技术栈..."          │
  └───────────────────────────────────────────────────────┘
  ┌─── 有记忆 Agent ──────────────────────────────────────┐
  │ "好的,FastAPI + PostgreSQL 是很好的技术栈..."          │
  │ [记忆写入] 用户:李明,项目:电商后端,技术栈:FastAPI+PG │
  └───────────────────────────────────────────────────────┘

═══════════════════════════════════════════════════════════
  第 2 天:李明回来提了个新需求(全新会话)
═══════════════════════════════════════════════════════════

  李明:帮我设计一个订单表

  ┌─── 无记忆 Agent ──────────────────────────────────────┐
  │ "好的,请问你使用的是什么数据库?MySQL 还是              │
  │  PostgreSQL?你的项目是什么类型的?"                     │
  │  ← 完全不记得昨天的对话,一切从头开始                    │
  └───────────────────────────────────────────────────────┘
  ┌─── 有记忆 Agent ──────────────────────────────────────┐
  │ "好的!基于你的 FastAPI + PostgreSQL 电商项目,          │
  │  我来设计订单表。考虑到 PG 的优势,我会用 JSONB          │
  │  来存储订单的扩展属性..."                               │
  │  ← 自动召回项目上下文,直接给出精准方案                  │
  └───────────────────────────────────────────────────────┘

═══════════════════════════════════════════════════════════
  第 3 天:李明遇到了性能问题(又一个全新会话)
═══════════════════════════════════════════════════════════

  李明:订单查询特别慢,怎么优化?

  ┌─── 无记忆 Agent ──────────────────────────────────────┐
  │ "请提供你的表结构、查询语句和数据库类型,                 │
  │  我才能帮你分析..."                                     │
  │  ← 需要用户重新提供所有背景信息                         │
  └───────────────────────────────────────────────────────┘
  ┌─── 有记忆 Agent ──────────────────────────────────────┐
  │ "你的电商项目之前用 PG 设计了订单表(含 JSONB 字段)。   │
  │  查询慢大概率是这几个原因:                              │
  │  1. JSONB 字段没建 GIN 索引                            │
  │  2. 订单表数据量大了需要分区                            │
  │  来,把慢查询的 EXPLAIN 贴给我看看..."                   │
  │  ← 结合历史记忆,直接定位到可能原因                      │
  └───────────────────────────────────────────────────────┘

差距一目了然。有记忆的 Agent 带来了三个关键能力:

记忆系统为 Agent 带来的三大能力:

  ① 连续性 ─── 跨会话保持上下文
     │         不需要用户每次重新介绍自己和项目

  ② 个性化 ─── 积累用户画像
     │         了解用户的技术栈、偏好、项目背景
     │         回答越来越精准、越来越"懂你"

  ③ 学习能力 ── 从交互中成长
                记住用户之前踩过的坑、做过的决定
                避免重复建议、提供递进式帮助

💡 记忆系统是 Agent 从"工具"进化为"助手"的分水岭——没有记忆的 Agent 永远是一个"每次见面都要重新自我介绍"的陌生人;有记忆的 Agent 才是一个真正了解你、持续成长的伙伴。

第 1 章核心知识回顾:

概念一句话解释
无状态本质LLM API 是纯函数,每次调用独立,没有内置记忆
上下文窗口把历史对话塞进 messages——成本高、延迟大、注意力稀释
Lost in the Middle上下文过长时模型遗忘中间信息的现象
记忆的价值连续性(跨会话)+ 个性化(用户画像)+ 学习能力
核心结论上下文窗口 = RAM(临时),记忆系统 = 磁盘(持久可检索)

2. 记忆系统的分类与架构总览

2.1 人类记忆的启示:短期 → 情景 → 语义

AI Agent 的记忆系统设计,灵感直接来自认知心理学对人类记忆的分类。理解人类记忆的工作方式,能帮你设计出更合理的 Agent 记忆架构。

心理学将人类记忆分为三种类型:

人类记忆的三层模型(Atkinson-Shiffrin):

  ┌─────────────────────────────────────────────────────────┐
  │  感觉记忆(Sensory Memory)                              │
  │  ─── 持续时间:毫秒级                                    │
  │  ─── 容量:大但转瞬即逝                                  │
  │  ─── 功能:接收原始感觉输入(视觉、听觉)                 │
  │  ─── 类比:Agent 收到的原始用户输入                       │
  └──────────────────────┬──────────────────────────────────┘
                         │ 注意力筛选

  ┌─────────────────────────────────────────────────────────┐
  │  短期记忆 / 工作记忆(Working Memory)                    │
  │  ─── 持续时间:秒到分钟                                  │
  │  ─── 容量:7 ± 2 个信息块(Miller 定律)                 │
  │  ─── 功能:当前正在处理的信息、推理的"工作台"              │
  │  ─── 类比:Agent 的上下文窗口(Context Window)           │
  └──────────────────────┬──────────────────────────────────┘
                         │ 编码 & 整合

  ┌─────────────────────────────────────────────────────────┐
  │  长期记忆(Long-term Memory)                            │
  │  ├── 情景记忆(Episodic)── "我经历了什么"               │
  │  │    └── 记住具体事件:时间、地点、上下文                 │
  │  │                                                       │
  │  └── 语义记忆(Semantic)── "我知道什么"                  │
  │       └── 抽象知识:事实、规律、概念                      │
  │                                                          │
  │  ─── 持续时间:分钟到终身                                │
  │  ─── 容量:理论上无限                                    │
  │  ─── 类比:Agent 的持久化存储(数据库 / 向量库)          │
  └─────────────────────────────────────────────────────────┘

三种记忆的关键区别在于保持时间、容量和用途

维度工作记忆情景记忆语义记忆
持续时间秒级长期长期
内容当前正在处理的信息具体经历和事件提炼后的知识和事实
示例"用户刚才说要用 Python""上周三讨论了数据库选型""这个用户偏好 PostgreSQL"
检索方式直接可用(在上下文中)按时间/情境回忆按语义关联检索
遗忘方式被新信息挤出随时间模糊但不消失被新知识更新替代

💡 情景记忆回答"发生了什么",语义记忆回答"我知道什么"——你记得"上个月和同事讨论了技术选型"(情景),也记得"PostgreSQL 比 MySQL 更适合复杂查询"(语义)。前者是经历,后者是从经历中提炼出的知识。Agent 记忆系统要同时支持这两种能力。

2.2 Agent 记忆的三层架构:Working / Episodic / Semantic

把人类记忆的三层模型映射到 Agent 工程实现,就得到了当前业界标准的三层记忆架构。这里用一个类比来帮你理解——Agent 的记忆系统就像操作系统的存储层级

Agent 记忆 ←→ 操作系统存储层级:

  ┌──────────────────────────────────────────────────────┐
  │  Working Memory(工作记忆)                           │
  │  ──────────────────────────────────────────────────── │
  │  OS 类比:  RAM(内存)                               │
  │  实现方式:  LLM 的 Context Window                    │
  │  存储内容:  当前对话历史、System Prompt、临时变量      │
  │  生命周期:  单次会话内                                │
  │  访问速度:  即时(已在上下文中)                       │
  │  容量限制:  受 Token 上限约束(128K / 200K)          │
  │  管理策略:  滑动窗口、摘要压缩、trim_messages         │
  ├──────────────────────────────────────────────────────┤
  │  Episodic Memory(情景记忆)                          │
  │  ──────────────────────────────────────────────────── │
  │  OS 类比:  磁盘缓存 / 最近文件列表                    │
  │  实现方式:  数据库(PostgreSQL / SQLite)             │
  │  存储内容:  完整的历史会话记录、事件流、交互日志        │
  │  生命周期:  跨会话持久化                              │
  │  访问速度:  需要检索(5-50ms)                        │
  │  容量限制:  存储空间充裕,但需要索引                   │
  │  管理策略:  时间过滤、关键词搜索、语义搜索             │
  ├──────────────────────────────────────────────────────┤
  │  Semantic Memory(语义记忆)                          │
  │  ──────────────────────────────────────────────────── │
  │  OS 类比:  持久存储 / 知识库                          │
  │  实现方式:  向量数据库 + 知识图谱                     │
  │  存储内容:  用户画像、提炼的事实、偏好、经验规律        │
  │  生命周期:  长期持久化(可更新、可遗忘)               │
  │  访问速度:  需要向量检索(10-100ms)                  │
  │  容量限制:  理论无限,但需要清理和压缩                 │
  │  管理策略:  事实抽取、冲突解决、时间衰减               │
  └──────────────────────────────────────────────────────┘

三层记忆如何协同工作?看一个完整的请求处理流程:

用户发送消息:"帮我优化订单查询的性能"


① 检索 Semantic Memory(语义记忆)
  │  → 查到:用户偏好 PostgreSQL、项目是电商系统
  │  → 查到:之前推荐过 JSONB 存储方案


② 检索 Episodic Memory(情景记忆)
  │  → 查到:3 天前讨论过订单表设计(含具体 DDL)
  │  → 查到:上周提到数据量约 500 万行


③ 组装 Working Memory(工作记忆)
  │  → System Prompt + 召回的记忆 + 最近 5 轮对话
  │  → 注入到 Context Window,调用 LLM


④ LLM 生成回答
  │  → "基于你之前的 PG 订单表(JSONB 字段+500 万行)..."


⑤ 记忆更新(后台异步)
     → Episodic:记录本次交互
     → Semantic:无新事实,跳过

💡 三层记忆不是互相替代,而是协同互补——Working Memory 提供即时上下文,Episodic Memory 提供历史经历,Semantic Memory 提供提炼的知识。一个完整的记忆系统需要三者的有机配合。

2.3 记忆系统架构全景图:存储、检索、更新、遗忘

理解了三层记忆结构后,我们来看完整的记忆系统由哪些模块组成。一个生产级的 Agent 记忆系统包含四大核心能力

记忆系统的四大核心模块:

  ┌───────────────────────────────────────────────────────┐
  │                    用户请求进入                         │
  │                        │                               │
  │    ┌───────────────────┼───────────────────┐           │
  │    │                   ▼                   │           │
  │    │          ┌─────────────────┐          │           │
  │    │          │   ① 记忆检索     │          │           │
  │    │          │   (Retrieval)    │          │           │
  │    │          │                  │          │           │
  │    │          │  语义搜索        │          │           │
  │    │          │  时间过滤        │          │           │
  │    │          │  图谱遍历        │          │           │
  │    │          └────────┬────────┘          │           │
  │    │                   ▼                   │           │
  │    │          ┌─────────────────┐          │           │
  │    │          │   LLM 推理      │          │           │
  │    │          │  (带记忆上下文) │          │           │
  │    │          └────────┬────────┘          │           │
  │    │                   ▼                   │           │
  │    │  ┌────────────────┼────────────────┐  │           │
  │    │  │                │                │  │           │
  │    │  ▼                ▼                ▼  │           │
  │    │ ② 记忆存储    ③ 记忆更新     ④ 记忆遗忘│           │
  │    │ (Storage)     (Update)      (Forget)  │           │
  │    │                                       │           │
  │    │ 原始交互归档   事实抽取       时间衰减  │           │
  │    │ Embedding 入库 冲突合并       容量淘汰  │           │
  │    │ 图谱节点写入   偏好更新       主动删除  │           │
  │    └───────────────────────────────────────┘           │
  └───────────────────────────────────────────────────────┘

四大模块各自的职责和技术选型:

模块职责输入输出典型技术
① 检索找到与当前请求相关的历史记忆用户 query + user_id记忆片段列表向量搜索、BM25、图查询
② 存储将新的交互和知识持久化对话记录、抽取的事实写入数据库PG、Redis、Milvus、Neo4j
③ 更新处理新旧知识冲突,保持记忆一致性新事实 vs 旧事实更新/合并操作LLM 判断 + CRUD
④ 遗忘清理过时、冗余、不重要的记忆全量记忆 + 衰减规则删除/归档操作TTL、LRU、衰减函数

不同技术栈在三层记忆中的分工:

技术栈与记忆层级的映射:

  Working Memory
  ├── LangGraph Checkpointer(会话状态持久化)
  ├── Redis(高速状态缓存)
  └── trim_messages / RemoveMessage(上下文裁剪)

  Episodic Memory
  ├── PostgreSQL / SQLite(结构化事件流存储)
  ├── Elasticsearch(全文搜索历史对话)
  └── LangGraph Store(跨线程持久化)

  Semantic Memory
  ├── pgvector / Milvus / Qdrant(向量检索)
  ├── Neo4j / NetworkX(知识图谱)
  ├── Mem0(一站式记忆中间件)
  └── Letta(虚拟上下文管理)

💡 不需要一步到位——大多数 Agent 应用从 Working Memory(第 3 章)开始就够用了。当你发现用户反复被要求"重新介绍自己"时,再加 Semantic Memory(第 5 章)。当你需要回溯"上次做了什么决定"时,再加 Episodic Memory(第 4 章)。渐进式地构建记忆系统,而不是一开始就搭建全套架构。

第 2 章核心知识回顾:

概念一句话解释
三层记忆Working(当前上下文)+ Episodic(历史经历)+ Semantic(提炼知识)
OS 类比Working = RAM,Episodic = 磁盘缓存,Semantic = 持久存储
四大模块存储、检索、更新、遗忘——记忆系统的完整生命周期
协同流程先检索语义→再检索情景→组装上下文→LLM 推理→异步更新记忆
渐进式构建从 Working Memory 开始,按需逐步添加 Episodic 和 Semantic

3. 短期记忆(Working Memory):管理上下文窗口

短期记忆是最基础、也是必须最先解决的记忆层。它的核心问题只有一个:如何在有限的上下文窗口里,保留最有价值的信息

3.1 最简方案:滑动窗口与消息裁剪

最直观的想法:对话太长了?删掉前面的旧消息,只保留最近 N 轮。这就是"滑动窗口"策略。

滑动窗口的工作方式:

  对话进行到第 10 轮,窗口大小 = 5:

  ┌──────────────────────────────────────┐
  │  消息 1  ← 已丢弃                    │
  │  消息 2  ← 已丢弃                    │
  │  消息 3  ← 已丢弃                    │
  │  消息 4  ← 已丢弃                    │
  │  消息 5  ← 已丢弃                    │
  ├──────────────────────────────────────┤
  │  消息 6  ← 保留 ┐                    │
  │  消息 7  ← 保留  │                   │
  │  消息 8  ← 保留  │ 发送给 LLM        │
  │  消息 9  ← 保留  │                   │
  │  消息 10 ← 保留 ┘                    │
  └──────────────────────────────────────┘

  简单粗暴,但会丢失早期的重要信息

有三种裁剪策略,复杂度递增:

策略一:按轮数裁剪(最简单)

python
def trim_by_turns(messages: list[dict], max_turns: int = 10) -> list[dict]:
    """保留最近 N 轮对话(1 轮 = 1 个 user + 1 个 assistant)"""
    # 始终保留 system message
    system_msgs = [m for m in messages if m["role"] == "system"]
    other_msgs = [m for m in messages if m["role"] != "system"]
    
    # 每轮 2 条消息(user + assistant),保留最近 max_turns 轮
    keep_count = max_turns * 2
    trimmed = other_msgs[-keep_count:] if len(other_msgs) > keep_count else other_msgs
    
    return system_msgs + trimmed

# 使用
messages = trim_by_turns(messages, max_turns=10)
response = client.chat.completions.create(model="gpt-4o", messages=messages)

策略二:按 Token 数裁剪(更精确)

python
import tiktoken

def trim_by_tokens(
    messages: list[dict], 
    max_tokens: int = 8000,
    model: str = "gpt-4o",
) -> list[dict]:
    """按 Token 数裁剪,从旧到新删除消息直到不超限"""
    encoder = tiktoken.encoding_for_model(model)
    
    # system message 始终保留
    system_msgs = [m for m in messages if m["role"] == "system"]
    other_msgs = [m for m in messages if m["role"] != "system"]
    
    # 计算 system message 占用的 token
    system_tokens = sum(len(encoder.encode(m["content"])) for m in system_msgs)
    available_tokens = max_tokens - system_tokens
    
    # 从后往前累计,直到超出预算
    kept = []
    current_tokens = 0
    for msg in reversed(other_msgs):
        msg_tokens = len(encoder.encode(msg["content"])) + 4  # 消息头开销
        if current_tokens + msg_tokens > available_tokens:
            break
        kept.insert(0, msg)
        current_tokens += msg_tokens
    
    return system_msgs + kept

# 使用:保留最多 8000 tokens 的对话历史
messages = trim_by_tokens(messages, max_tokens=8000)

策略三:用 LangChain trim_messages(推荐)

python
from langchain_core.messages import trim_messages, SystemMessage, HumanMessage
import tiktoken

# LangChain 内置的消息裁剪工具
trimmed = trim_messages(
    messages,
    max_tokens=8000,
    strategy="last",          # 保留最近的消息
    token_counter=tiktoken.encoding_for_model("gpt-4o").encode,
    include_system=True,      # 始终保留 system message
    start_on="human",         # 裁剪后确保第一条是 human 消息
)

三种策略的对比:

策略优点缺点适用场景
按轮数实现最简单长消息可能超限、短消息浪费空间原型验证
按 Token精确控制上下文大小需要 tiktoken,计算有开销生产环境
trim_messages开箱即用、处理边界情况依赖 LangChainLangChain/LangGraph 项目

💡 滑动窗口的致命缺陷是"硬删除"——第 1 轮说的"我叫张三,做电商项目"在第 20 轮就被丢弃了。用户会疑惑"我明明说过了你怎么不记得"。解决这个问题需要下一节的"对话摘要压缩"。

3.2 对话摘要压缩:用 LLM 总结历史对话

滑动窗口的问题是"丢了就丢了"——被删除的消息再也找不回来。对话摘要压缩解决了这个问题:把旧消息"压缩"成一段摘要,保留关键信息但大幅减少 Token 数

对话摘要压缩的工作流程:

  原始对话(20 轮,约 8000 tokens):
  ┌──────────────────────────────────────┐
  │  消息 1-15(旧消息,约 6000 tokens)   │
  │                                       │
  │         ↓ 用 LLM 压缩为摘要           │
  │                                       │
  │  摘要(约 300 tokens):              │
  │  "用户李明,后端开发,做电商项目,      │
  │   技术栈 FastAPI+PG,已讨论过订单表    │
  │   设计和 JSONB 字段方案。"             │
  ├──────────────────────────────────────┤
  │  消息 16-20(最近 5 轮,约 2000 tokens)│
  └──────────────────────────────────────┘

  压缩比:8000 → 2300 tokens(节省 71%)
  关键信息:全部保留 ✓

核心实现:

python
from openai import AsyncOpenAI

SUMMARY_PROMPT = """请将以下对话历史压缩为一段简洁的摘要。

要求:
1. 保留所有关键信息:用户身份、项目背景、技术选型、已做的决定
2. 保留具体的数据和数字(表名、字段名、配置值等)
3. 按时间顺序组织,标注每个信息的来源轮次
4. 用第三人称描述("用户提到..."、"已决定...")
5. 控制在 200-400 字以内

对话历史:
{conversation}

请输出摘要:"""

class ConversationSummaryManager:
    """对话摘要管理器"""
    
    def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model  # 摘要用小模型即可,省钱
        self.summary: str = ""
    
    async def maybe_compress(
        self,
        messages: list[dict],
        max_tokens: int = 4000,
        keep_recent: int = 6,
    ) -> list[dict]:
        """当消息超过阈值时,触发摘要压缩"""
        # 估算当前 tokens(粗略:1 个中文字符 ≈ 2 tokens)
        total_chars = sum(len(m["content"]) for m in messages)
        estimated_tokens = total_chars * 2
        
        if estimated_tokens <= max_tokens:
            return messages  # 未超限,无需压缩
        
        # 分离 system + 旧消息 + 最近消息
        system_msgs = [m for m in messages if m["role"] == "system"]
        other_msgs = [m for m in messages if m["role"] != "system"]
        
        old_msgs = other_msgs[:-keep_recent]
        recent_msgs = other_msgs[-keep_recent:]
        
        if not old_msgs:
            return messages  # 没有可压缩的旧消息
        
        # 生成摘要
        self.summary = await self._summarize(old_msgs)
        
        # 组装新的消息列表:system + 摘要 + 最近消息
        summary_msg = {
            "role": "system",
            "content": f"[对话历史摘要]\n{self.summary}",
        }
        
        return system_msgs + [summary_msg] + recent_msgs
    
    async def _summarize(self, messages: list[dict]) -> str:
        """调用 LLM 生成摘要"""
        conversation = "\n".join(
            f"{m['role']}: {m['content']}" for m in messages
        )
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": SUMMARY_PROMPT.format(conversation=conversation),
            }],
            temperature=0,
            max_tokens=500,
        )
        return response.choices[0].message.content

使用示例:

python
# 在对话循环中使用
summary_manager = ConversationSummaryManager(client)

async def chat(user_message: str, messages: list[dict]) -> str:
    messages.append({"role": "user", "content": user_message})
    
    # 自动判断是否需要压缩
    compressed = await summary_manager.maybe_compress(messages)
    
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=compressed,
    )
    
    reply = response.choices[0].message.content
    messages.append({"role": "assistant", "content": reply})
    return reply

摘要压缩的触发时机有两种策略:

触发策略实现方式优点缺点
Token 阈值消息总 Token 超过阈值时触发精确控制上下文大小需要 Token 计数
轮数阈值每隔 N 轮触发一次实现简单,开销可预测Token 控制不够精确

💡 摘要用 gpt-4o-mini 而非 gpt-4o——摘要任务不需要复杂推理,小模型完全胜任。一次摘要的成本约 $0.0003,远低于重复发送 6000 tokens 旧消息的成本 $0.09。摘要一次,省钱 300 次。

3.3 LangGraph 实战:Checkpointer + 自定义摘要节点

前面的代码是"手搓"实现。在 LangGraph 中,短期记忆管理有更优雅的方式——利用 Checkpointer 自动持久化状态 + 自定义节点做摘要压缩

LangGraph 短期记忆架构:

  用户消息


  ┌─────────────────────┐
  │  conversation 节点   │ ← Checkpointer 自动恢复上次状态
  │  ─────────────────── │
  │  加载 messages       │
  │  加载 summary        │
  │  调用 LLM            │
  └──────────┬──────────┘


  ┌─────────────────────┐
  │  should_summarize?   │ ← 条件判断:消息数 > 阈值?
  │  ─────────────────── │
  │  YES → summarize     │
  │  NO  → END           │
  └──────────┬──────────┘
             │ YES

  ┌─────────────────────┐
  │  summarize 节点      │
  │  ─────────────────── │
  │  压缩旧消息为摘要    │
  │  删除旧消息          │
  │  更新 summary 状态   │
  └──────────┬──────────┘


         Checkpointer 自动保存状态 → END

完整代码实现:

python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import (
    SystemMessage, HumanMessage, AIMessage, RemoveMessage,
    trim_messages,
)
from langchain_openai import ChatOpenAI
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

# 1. 定义状态——包含 messages 和 summary 两个字段
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 对话历史(自动追加)
    summary: str                              # 累积摘要

# 2. 初始化模型
llm = ChatOpenAI(model="gpt-4o", temperature=0)
summary_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # 摘要用小模型

# 3. 对话节点:加载摘要 + 调用 LLM
async def conversation(state: AgentState) -> dict:
    summary = state.get("summary", "")
    
    # 如果有历史摘要,注入到 system message 中
    messages = state["messages"]
    if summary:
        system_with_summary = (
            f"你是一个智能助手。\n\n"
            f"以下是之前对话的摘要,供你参考:\n{summary}"
        )
        messages = [SystemMessage(content=system_with_summary)] + messages
    
    response = await llm.ainvoke(messages)
    return {"messages": [response]}

# 4. 摘要节点:压缩旧消息 + 删除
async def summarize(state: AgentState) -> dict:
    summary = state.get("summary", "")
    messages = state["messages"]
    
    # 构造摘要 Prompt
    if summary:
        prompt = (
            f"请更新以下对话摘要,加入新的对话内容。\n\n"
            f"已有摘要:\n{summary}\n\n"
            f"新对话:\n"
        )
    else:
        prompt = "请将以下对话压缩为摘要,保留所有关键信息:\n\n"
    
    # 取出需要压缩的旧消息(保留最近 2 轮)
    old_messages = messages[:-4]  # 保留最近 4 条(2 轮)
    
    prompt += "\n".join(
        f"{m.type}: {m.content}" for m in old_messages
    )
    
    response = await summary_llm.ainvoke([HumanMessage(content=prompt)])
    
    # 删除旧消息(用 RemoveMessage 标记删除)
    delete_messages = [RemoveMessage(id=m.id) for m in old_messages]
    
    return {
        "summary": response.content,
        "messages": delete_messages,
    }

# 5. 条件判断:是否需要触发摘要
def should_summarize(state: AgentState) -> str:
    messages = state["messages"]
    # 消息数超过 10 条(5 轮)时触发摘要
    if len(messages) > 10:
        return "summarize"
    return END

# 6. 组装 Graph
graph = StateGraph(AgentState)
graph.add_node("conversation", conversation)
graph.add_node("summarize", summarize)

graph.add_edge(START, "conversation")
graph.add_conditional_edges("conversation", should_summarize)
graph.add_edge("summarize", END)

# 7. 加上 Checkpointer(状态自动持久化)
checkpointer = MemorySaver()  # 生产环境用 PostgresSaver
app = graph.compile(checkpointer=checkpointer)

# 8. 使用:同一个 thread_id 的对话自动恢复状态
config = {"configurable": {"thread_id": "user-zhangsan-001"&#125;&#125;

result = await app.ainvoke(
    {"messages": [HumanMessage(content="我叫张三,做电商后端")]},
    config=config,
)
# → 第 1 轮,Checkpointer 保存状态

result = await app.ainvoke(
    {"messages": [HumanMessage(content="帮我设计订单表")]},
    config=config,
)
# → 第 2 轮,自动恢复上次的 messages + summary

💡 Checkpointer 是 LangGraph 短期记忆的核心——它在每个节点执行后自动保存完整的 Graph 状态(messages + summary + 任何自定义字段)。同一个 thread_id 的后续请求会自动恢复上次的状态。生产环境把 MemorySaver 换成 PostgresSaverSqliteSaver 即可。

3.4 Token 预算管理:动态分配上下文空间

当 Agent 变得复杂——有 System Prompt、记忆摘要、RAG 检索结果、工具调用结果、对话历史——上下文窗口就成了稀缺资源,需要精细化的"预算管理"。

核心思路是把上下文窗口看作一个固定预算,各模块按优先级分配份额

Token 预算分配示例(以 GPT-4o 128K 为例,实际使用 32K):

  总预算:32,000 tokens
  ┌─────────────────────────────────────────────┐
  │  ① System Prompt       ── 2,000(固定)      │
  │  ② 记忆摘要            ── 1,000(动态)      │
  │  ③ 长期记忆检索结果     ── 2,000(动态)      │
  │  ④ 对话历史            ── 20,000(弹性)     │
  │  ⑤ 当前用户输入        ── 3,000(实际值)     │
  │  ⑥ 预留输出空间        ── 4,000(固定)      │
  └─────────────────────────────────────────────┘

  分配优先级(高 → 低):
  ① System Prompt(不可压缩,必须完整保留)
  ⑥ 输出空间(必须预留,否则回答被截断)
  ⑤ 当前输入(用户刚发的消息,不能删)
  ② 记忆摘要(高价值信息密度)
  ③ 检索结果(按相关度排序,可截断)
  ④ 对话历史(最低优先级,超限时压缩或裁剪)

代码实现:

python
from dataclasses import dataclass

@dataclass
class TokenBudget:
    """Token 预算分配器"""
    total: int = 32000
    system_prompt: int = 2000      # 固定
    output_reserved: int = 4000    # 固定预留
    memory_summary: int = 1000     # 上限
    retrieval_results: int = 2000  # 上限
    
    @property
    def conversation_budget(self) -> int:
        """对话历史的可用预算 = 总预算 - 其他模块"""
        used = (
            self.system_prompt 
            + self.output_reserved 
            + self.memory_summary 
            + self.retrieval_results
        )
        return self.total - used
    
    def allocate(self, current_input_tokens: int) -> dict:
        """根据当前输入动态分配预算"""
        conv_budget = self.conversation_budget - current_input_tokens
        
        return {
            "system_prompt": self.system_prompt,
            "memory_summary": self.memory_summary,
            "retrieval": self.retrieval_results,
            "conversation_history": max(conv_budget, 0),
            "current_input": current_input_tokens,
            "output_reserved": self.output_reserved,
        }

# 使用
budget = TokenBudget(total=32000)
allocation = budget.allocate(current_input_tokens=500)
# → conversation_history: 22500(有大量空间给对话历史)

allocation = budget.allocate(current_input_tokens=5000)
# → conversation_history: 18000(用户输入变长,对话历史空间被压缩)

💡 永远为输出预留足够空间——最常见的新手错误是把 Token 全分给了 input,结果 max_tokens 不够用,回答被截断。建议预留 4000-8000 tokens 给输出。另外,不要盲目使用 128K 全部容量——实际上用到 32K 以上时,"Lost in the Middle" 问题会让模型质量显著下降。

第 3 章核心知识回顾:

概念一句话解释
滑动窗口保留最近 N 轮对话,简单但会丢失早期信息
对话摘要用小模型压缩旧消息为摘要,Token 节省 70%+
LangGraph Checkpointer自动持久化 Graph 状态,同一 thread_id 恢复上下文
RemoveMessageLangGraph 的消息删除机制,配合摘要节点使用
Token 预算按优先级分配上下文空间:System > 输出 > 输入 > 记忆 > 对话历史

4. 情景记忆(Episodic Memory):记住"发生过什么"

短期记忆解决了"当前会话"的上下文连续性。但当用户关闭窗口、第二天再来,短期记忆就清空了。情景记忆解决的是跨会话的经历回溯——"上次我们讨论了什么"。

4.1 什么是情景记忆:从"知道什么"到"经历过什么"

情景记忆和语义记忆容易混淆。用一个类比来区分:

你去医院看病:

  情景记忆 ─── "3 月 15 日去了协和医院,挂的消化内科,
               张医生说可能是胃炎,开了奥美拉唑,
               让我两周后复查"

  语义记忆 ─── "我有胃炎"
               "我在吃奥美拉唑"

  情景记忆保留了完整的场景和时间线
  语义记忆只保留了提炼后的事实

对于 Agent 而言:

维度情景记忆语义记忆
存储内容完整的对话记录、事件流提炼后的事实和偏好
回答的问题"上次我们讨论了什么?""这个用户喜欢什么?"
数据格式带时间戳的完整交互日志key-value 事实对 / 向量
检索方式按时间范围 + 关键词按语义相似度
存储量大(每次交互都记录)小(只存提炼结果)
典型用途回溯决策过程、审计、上下文恢复个性化推荐、偏好记忆

什么时候需要情景记忆?

需要情景记忆的场景:

  ✅ "上次我们讨论的订单表方案是什么?"
     → 需要回溯到具体那次对话的完整内容

  ✅ "之前你建议我用 JSONB,为什么?"
     → 需要找到当时的推理过程和上下文

  ✅ "我记得上周我改过一次密码策略..."
     → 需要按时间检索历史操作

  ✅ 多步骤任务的断点恢复
     → "昨天做到哪一步了?继续"

不需要情景记忆的场景:

  ❌ "我叫什么名字?"
     → 语义记忆就够了(事实查询)

  ❌ "我喜欢用什么编程语言?"
     → 语义记忆就够了(偏好查询)

💡 情景记忆是"事件的录像",语义记忆是"录像中提炼的笔记"——两者互补而非替代。第 5 章会介绍语义记忆,本章聚焦于"如何高效存储和检索历史交互事件"。

4.2 事件流设计:结构化存储每次交互

情景记忆的存储核心是一个事件流——按时间顺序记录每次交互的完整信息。推荐用两张表来组织:

情景记忆的数据模型:

  ┌─────────────────────────────────────────────┐
  │  sessions(会话表)                           │
  │  ─────────────────────────────────────────── │
  │  一个 session = 一次连续的对话               │
  │                                               │
  │  id          UUID                             │
  │  user_id     VARCHAR       ← 用户隔离        │
  │  title       VARCHAR       ← 会话标题        │
  │  summary     TEXT          ← 会话摘要        │
  │  started_at  TIMESTAMPTZ                      │
  │  ended_at    TIMESTAMPTZ                      │
  │  metadata    JSONB         ← 扩展字段        │
  └──────────────────┬──────────────────────────┘
                     │ 1:N

  ┌─────────────────────────────────────────────┐
  │  events(事件表)                             │
  │  ─────────────────────────────────────────── │
  │  一个 event = 一轮交互内的一条消息/动作       │
  │                                               │
  │  id          UUID                             │
  │  session_id  UUID (FK)     ← 归属的会话      │
  │  event_type  VARCHAR       ← user/assistant/  │
  │                               tool_call/error │
  │  content     TEXT          ← 消息内容        │
  │  token_count INTEGER       ← Token 消耗      │
  │  created_at  TIMESTAMPTZ                      │
  │  metadata    JSONB         ← 模型、延迟等    │
  └─────────────────────────────────────────────┘

SQL 建表:

sql
-- 会话表
CREATE TABLE sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(100) NOT NULL,
    title VARCHAR(200),
    summary TEXT,
    started_at TIMESTAMPTZ DEFAULT NOW(),
    ended_at TIMESTAMPTZ,
    metadata JSONB DEFAULT '{}',
    
    -- 索引:按用户 + 时间查询
    CONSTRAINT idx_sessions_user_time 
        UNIQUE (user_id, started_at)
);

CREATE INDEX idx_sessions_user ON sessions(user_id);

-- 事件表
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id UUID NOT NULL REFERENCES sessions(id),
    event_type VARCHAR(20) NOT NULL,  -- user/assistant/tool_call/error
    content TEXT NOT NULL,
    token_count INTEGER DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    metadata JSONB DEFAULT '{}',
    
    -- 全文搜索索引(用于关键词检索)
    search_vector tsvector GENERATED ALWAYS AS (
        to_tsvector('simple', content)
    ) STORED
);

CREATE INDEX idx_events_session ON events(session_id, created_at);
CREATE INDEX idx_events_search ON events USING GIN(search_vector);

Python ORM 模型和写入服务:

python
from sqlalchemy import Column, String, Text, Integer, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime
import uuid

class Session(Base):
    __tablename__ = "sessions"
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    user_id = Column(String(100), nullable=False, index=True)
    title = Column(String(200))
    summary = Column(Text)
    started_at = Column(DateTime(timezone=True), default=datetime.utcnow)
    ended_at = Column(DateTime(timezone=True))
    metadata_ = Column("metadata", JSONB, default={})

class Event(Base):
    __tablename__ = "events"
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    session_id = Column(UUID, ForeignKey("sessions.id"), nullable=False)
    event_type = Column(String(20), nullable=False)
    content = Column(Text, nullable=False)
    token_count = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
    metadata_ = Column("metadata", JSONB, default={})

class EpisodicMemoryService:
    """情景记忆服务:记录和检索历史交互"""
    
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def create_session(self, user_id: str, title: str = "") -> str:
        """创建新会话"""
        session = Session(user_id=user_id, title=title)
        self.db.add(session)
        await self.db.commit()
        return str(session.id)
    
    async def add_event(
        self, session_id: str, event_type: str,
        content: str, token_count: int = 0, **metadata
    ):
        """记录一条交互事件"""
        event = Event(
            session_id=session_id,
            event_type=event_type,
            content=content,
            token_count=token_count,
            metadata_=metadata,
        )
        self.db.add(event)
        await self.db.commit()

💡 metadata JSONB 字段是可扩展性的关键——你可以往里塞模型名称、响应延迟、用户满意度评分、工具调用参数等任意结构化数据,不需要改表结构。这在后期做分析和调试时非常有价值。

4.3 会话检索策略:时间过滤 + 关键词 + 语义搜索

情景记忆的核心挑战不在存储(数据库能存),而在检索——当用户有几百次历史会话时,怎么快速找到最相关的那几条?

三种检索策略,按复杂度递增:

检索策略组合:

  用户说:"上次讨论的数据库方案"


  ① 时间过滤 ─── 最近 30 天的会话(粗筛)
    │              结果:50 个会话

  ② 关键词搜索 ── 包含"数据库"的事件(精筛)
    │              结果:8 个会话

  ③ 语义排序 ─── 按与"数据库方案"的语义相似度排序
                   结果:Top 3 最相关的会话

策略一:时间过滤

python
from sqlalchemy import text
from datetime import datetime, timedelta

async def search_by_time(
    db: AsyncSession, user_id: str,
    days: int = 30, limit: int = 20,
) -> list[dict]:
    """按时间范围检索最近的会话"""
    since = datetime.utcnow() - timedelta(days=days)
    
    result = await db.execute(text("""
        SELECT s.id, s.title, s.summary, s.started_at,
               COUNT(e.id) as event_count
        FROM sessions s
        LEFT JOIN events e ON e.session_id = s.id
        WHERE s.user_id = :user_id
          AND s.started_at > :since
        GROUP BY s.id
        ORDER BY s.started_at DESC
        LIMIT :limit
    """), {"user_id": user_id, "since": since, "limit": limit})
    
    return [dict(row._mapping) for row in result.fetchall()]

策略二:全文关键词搜索(PG tsvector)

python
async def search_by_keyword(
    db: AsyncSession, user_id: str,
    keyword: str, limit: int = 10,
) -> list[dict]:
    """用 PostgreSQL 全文检索搜索历史事件"""
    result = await db.execute(text("""
        SELECT e.content, e.event_type, e.created_at,
               s.title as session_title,
               ts_rank(e.search_vector, query) as rank
        FROM events e
        JOIN sessions s ON s.id = e.session_id,
             plainto_tsquery('simple', :keyword) query
        WHERE s.user_id = :user_id
          AND e.search_vector @@ query
        ORDER BY rank DESC, e.created_at DESC
        LIMIT :limit
    """), {"user_id": user_id, "keyword": keyword, "limit": limit})
    
    return [dict(row._mapping) for row in result.fetchall()]

策略三:语义搜索(需要 Embedding)

python
async def search_by_semantic(
    db: AsyncSession, user_id: str,
    query: str, embedding_service, limit: int = 5,
) -> list[dict]:
    """用向量相似度搜索语义相关的历史事件"""
    # 需要 events 表额外有 embedding 列
    query_vec = await embedding_service.embed(query)
    
    result = await db.execute(text("""
        SELECT e.content, e.event_type, e.created_at,
               s.title as session_title,
               1 - (e.embedding <=> :vec) as similarity
        FROM events e
        JOIN sessions s ON s.id = e.session_id
        WHERE s.user_id = :user_id
          AND 1 - (e.embedding <=> :vec) > 0.75
        ORDER BY e.embedding <=> :vec
        LIMIT :limit
    """), {"user_id": user_id, "vec": str(query_vec), "limit": limit})
    
    return [dict(row._mapping) for row in result.fetchall()]

三种策略的权衡:

策略延迟准确度额外成本推荐场景
时间过滤<5ms低(只按时间)初始粗筛,必选
关键词搜索<10ms中(精确匹配)无(PG 内置)用户提到具体名词时
语义搜索<50ms高(理解语义)需要 Embedding用户描述模糊时

💡 推荐组合使用:时间过滤 + 关键词搜索是性价比最高的方案——零额外成本,覆盖 80% 的情景记忆检索需求。只有当你发现用户经常用模糊描述("之前那个方案")且关键词搜索找不到时,再引入语义搜索。

4.4 实战:构建可回溯的多轮对话系统

把前面的存储和检索整合起来,构建一个完整的"带情景记忆的对话系统":

完整数据流:

  用户发消息 → "上次的订单表方案还需要调整"


  ① 情景记忆检索
  │  → search_by_keyword("订单表") 
  │  → 找到 3 天前的会话,包含完整的表设计讨论


  ② 组装上下文
  │  → System Prompt
  │  → [相关历史] 3天前讨论的订单表设计(DDL + 设计理由)
  │  → 当前会话的最近 5 轮对话
  │  → 当前用户输入


  ③ 调用 LLM → 生成回答


  ④ 记录事件(异步)
     → 记录 user 消息和 assistant 回答到 events 表

核心代码——把情景记忆注入到 Agent 对话中:

python
class EpisodicChatAgent:
    """带情景记忆的对话 Agent"""
    
    def __init__(
        self, 
        client: AsyncOpenAI,
        episodic_memory: EpisodicMemoryService,
    ):
        self.client = client
        self.memory = episodic_memory
    
    async def chat(
        self, user_id: str, session_id: str,
        message: str,
    ) -> str:
        # 1. 检索相关的历史会话
        relevant_history = await self.memory.search_by_keyword(
            user_id=user_id, keyword=message, limit=3
        )
        
        # 2. 格式化历史记忆
        history_context = ""
        if relevant_history:
            history_context = "\n\n[相关历史对话]\n"
            for item in relevant_history:
                history_context += (
                    f"- {item['created_at']:%m-%d} "
                    f"({item['session_title']}): "
                    f"{item['content'][:200]}\n"
                )
        
        # 3. 组装消息
        messages = [
            {"role": "system", "content": (
                "你是一个智能助手,拥有与用户的历史对话记忆。"
                "请参考历史记录来提供连续、个性化的帮助。"
                f"{history_context}"
            )},
            {"role": "user", "content": message},
        ]
        
        # 4. 调用 LLM
        response = await self.client.chat.completions.create(
            model="gpt-4o", messages=messages, temperature=0,
        )
        reply = response.choices[0].message.content
        
        # 5. 记录本次交互(异步,不阻塞响应)
        await self.memory.add_event(
            session_id=session_id,
            event_type="user",
            content=message,
        )
        await self.memory.add_event(
            session_id=session_id,
            event_type="assistant",
            content=reply,
        )
        
        return reply

💡 情景记忆的写入应该异步化——记录事件不应该阻塞用户等待回答。用 asyncio.create_task() 或消息队列把写入操作放到后台。同理,会话结束时生成 summary 也应该异步进行。

第 4 章核心知识回顾:

概念一句话解释
情景 vs 语义情景 = "发生了什么"(事件流),语义 = "知道什么"(事实)
数据模型sessions + events 两张表,JSONB metadata 保证可扩展性
三种检索时间过滤(粗筛)→ 关键词搜索(精筛)→ 语义搜索(兜底)
推荐方案时间 + 关键词组合覆盖 80% 场景,零额外成本
异步写入事件记录不阻塞响应,后台异步写入数据库

5. 语义记忆(Semantic Memory):提炼持久化知识

情景记忆回答的是"发生过什么",但用户经常问的是更直接的问题:"你还记得我喜欢什么吗?" "我们之前决定用什么技术栈来着?" 这些问题需要的不是完整的事件回放,而是从大量交互中提炼出的事实和知识——这就是语义记忆的职责。

如果说情景记忆是"录像",语义记忆就是"笔记"——更紧凑、更结构化、检索更快。

5.1 语义记忆 vs RAG 知识库:有什么不同

"语义记忆不就是 RAG 吗?都是向量检索嘛。" 这是最常见的误解。两者确实在技术实现上有重叠(都用 Embedding + 向量搜索),但在数据来源、更新方式和用途上完全不同:

语义记忆 vs RAG 知识库——核心区别:

  ┌───────────────────────────────────────────────────────┐
  │  RAG 知识库                                           │
  │  ──────────────────────────────────────────────────── │
  │  数据来源:  预先准备的文档(PDF、网页、API 文档)     │
  │  更新频率:  低(手动导入、定期刷新)                  │
  │  内容类型:  客观知识(\"Python 3.12 支持泛型语法\")   │
  │  作用域:    全局(所有用户共享同一个知识库)           │
  │  生命周期:  由运维人员管理                            │
  │                                                       │
  │  典型查询:  \"FastAPI 怎么配置 CORS?\"               │
  │  → 从文档中检索答案,跟用户是谁无关                   │
  ├───────────────────────────────────────────────────────┤
  │  语义记忆                                             │
  │  ──────────────────────────────────────────────────── │
  │  数据来源:  从用户交互中自动提炼                      │
  │  更新频率:  高(每次对话后可能更新)                  │
  │  内容类型:  主观知识(\"这个用户偏好 PostgreSQL\")    │
  │  作用域:    用户级(每个用户独立)                    │
  │  生命周期:  自动管理(提炼、更新、遗忘)              │
  │                                                       │
  │  典型查询:  \"这个用户的技术栈是什么?\"              │
  │  → 从该用户的历史交互中提炼的 facts                   │
  └───────────────────────────────────────────────────────┘

用一个更直观的对比:

维度RAG 知识库语义记忆
比喻图书馆你的私人笔记本
数据来源预先导入的文档从对话中自动提炼
内容示例"PostgreSQL JSONB 支持 GIN 索引""用户李明偏好 PostgreSQL"
更新触发手动重新索引每次对话后自动判断
用户隔离通常不隔离严格按用户隔离
数据量大(百万级文档块)小(每用户几十到几百条事实)
典型实现LangChain Retriever + ChromaMem0 / 自建事实存储

两者可以(也应该)共存于同一个 Agent 中:

Agent 的双重知识来源:

  用户提问:\"帮我的电商项目选一个消息队列\"

    ├──→ RAG 知识库检索
    │    → \"RabbitMQ vs Kafka vs Redis Streams 的对比...\"
    │    → 客观技术知识,跟用户是谁无关

    └──→ 语义记忆检索
         → \"用户李明:技术栈 Python/FastAPI,偏好轻量方案,
            团队 3 人,项目阶段:MVP\"
         → 用户特有的上下文,决定推荐哪个方案

  两者结合 → \"考虑到你的 FastAPI 技术栈和 3 人小团队,
              建议先用 Redis Streams,MVP 阶段够用且运维简单\"

💡 RAG 提供「世界知识」,语义记忆提供「用户知识」——前者让 Agent 博学,后者让 Agent 懂你。两者不是替代关系,而是正交互补。如果你的 Agent 已经有了 RAG,加上语义记忆能让回答从"通用正确"升级到"个性化精准"。

5.2 事实抽取:用 LLM 从对话中提炼结构化知识

语义记忆的核心问题是:对话是非结构化的自然语言,怎么从中提取出结构化的事实? 答案是——用 LLM 做事实抽取。

事实抽取的工作流程:

  对话原文:
  ┌──────────────────────────────────────────────┐
  │  User:我叫李明,最近在做一个电商后端         │
  │  AI:  你好李明!用什么技术栈呢?             │
  │  User:FastAPI + PostgreSQL,部署在阿里云     │
  │  AI:  好的搭配...                            │
  │  User:团队就我和另一个前端,两个人            │
  └──────────────┬───────────────────────────────┘
                 │ LLM 事实抽取

  ┌──────────────────────────────────────────────┐
  │  提取的结构化事实:                            │
  │                                               │
  │  ① 用户姓名 = 李明                           │
  │  ② 项目类型 = 电商后端                       │
  │  ③ 后端框架 = FastAPI                        │
  │  ④ 数据库   = PostgreSQL                     │
  │  ⑤ 部署环境 = 阿里云                         │
  │  ⑥ 团队规模 = 2 人(1 后端 + 1 前端)        │
  └──────────────────────────────────────────────┘

核心实现——用结构化输出(Structured Output)做事实抽取:

python
from pydantic import BaseModel, Field
from openai import AsyncOpenAI

# 1. 定义事实的数据结构
class ExtractedFact(BaseModel):
    """从对话中提取的单条事实"""
    category: str = Field(
        description="事实类别:personal/project/preference/technical/decision"
    )
    key: str = Field(
        description="事实的键,如 'name', 'tech_stack', 'database'"
    )
    value: str = Field(
        description="事实的值,如 'PostgreSQL'"
    )
    confidence: float = Field(
        default=0.9,
        description="置信度 0-1,用户明确说的=1.0,推测的=0.5-0.8"
    )
    source_quote: str = Field(
        description="原文引用,用于溯源"
    )

class ExtractionResult(BaseModel):
    """一次对话的事实抽取结果"""
    facts: list[ExtractedFact] = Field(default_factory=list)
    has_new_info: bool = Field(
        description="本次对话是否包含值得记忆的新信息"
    )

# 2. 事实抽取 Prompt
EXTRACTION_PROMPT = """请从以下对话中提取值得长期记忆的事实信息。

提取规则:
1. 只提取用户明确提到或强烈暗示的事实,不要过度推测
2. 分类包括:personal(个人信息)、project(项目信息)、
   preference(偏好)、technical(技术选型)、decision(已做决定)
3. 忽略临时性的、一次性的信息(如"帮我看看这个报错")
4. 每条事实要附带原文引用(source_quote)

对话内容:
{conversation}

已有的记忆(避免重复提取):
{existing_facts}"""

# 3. 事实抽取服务
class FactExtractor:
    """从对话中抽取结构化事实"""
    
    def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model  # 抽取用小模型即可
    
    async def extract(
        self, 
        conversation: list[dict],
        existing_facts: list[dict] | None = None,
    ) -> ExtractionResult:
        """从对话中抽取结构化事实"""
        # 格式化对话
        conv_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in conversation
        )
        
        # 格式化已有事实(避免重复)
        facts_text = "无" if not existing_facts else "\n".join(
            f"- {f['key']}: {f['value']}" for f in existing_facts
        )
        
        # 调用 LLM(使用结构化输出)
        response = await self.client.beta.chat.completions.parse(
            model=self.model,
            messages=[{
                "role": "user",
                "content": EXTRACTION_PROMPT.format(
                    conversation=conv_text,
                    existing_facts=facts_text,
                ),
            }],
            response_format=ExtractionResult,
            temperature=0,
        )
        
        return response.choices[0].message.parsed

使用示例:

python
extractor = FactExtractor(client)

# 模拟一段对话
conversation = [
    {"role": "user", "content": "我叫李明,最近在做一个电商后端"},
    {"role": "assistant", "content": "你好李明!用什么技术栈呢?"},
    {"role": "user", "content": "FastAPI + PostgreSQL,部署在阿里云"},
]

result = await extractor.extract(conversation)

# 输出:
# result.has_new_info = True
# result.facts = [
#   ExtractedFact(category="personal", key="name", value="李明",
#                 confidence=1.0, source_quote="我叫李明"),
#   ExtractedFact(category="project", key="type", value="电商后端",
#                 confidence=1.0, source_quote="做一个电商后端"),
#   ExtractedFact(category="technical", key="backend_framework",
#                 value="FastAPI", confidence=1.0,
#                 source_quote="FastAPI + PostgreSQL"),
#   ExtractedFact(category="technical", key="database",
#                 value="PostgreSQL", confidence=1.0,
#                 source_quote="FastAPI + PostgreSQL"),
#   ExtractedFact(category="technical", key="deployment",
#                 value="阿里云", confidence=1.0,
#                 source_quote="部署在阿里云"),
# ]

事实抽取的触发时机——不是每条消息都需要抽取:

事实抽取的触发策略:

  方案一:每轮对话后触发(简单但浪费)
  → 用户说"帮我看看这段代码的 bug" → 没有新事实,白抽取了
  → 额外成本:每次 $0.0002,日活 1 万 × 50 轮 = $100/天 ← 太贵

  方案二:先判断再抽取(推荐)✅
  → 先用简单规则判断"本轮对话是否可能包含新事实"
  → 只在有新信息时才触发 LLM 抽取
  → 成本降低 70-80%

  判断规则(免 LLM,纯规则):
  ├── 对话中出现自我介绍类关键词(我叫/我是/我的项目...)
  ├── 对话中出现技术选型关键词(用的是/选了/决定用...)
  ├── 对话中出现偏好表达(喜欢/偏好/习惯/不想用...)
  └── 对话轮数 > 3 且是新会话的前 5 轮(自我介绍阶段)

💡 事实抽取用 gpt-4o-mini + 结构化输出就够了——这个任务不需要复杂推理,关键是 Prompt 中的规则要写清楚。结构化输出(response_format)保证了返回值一定是合法的 JSON,省去了手动解析和容错的麻烦。单次抽取成本约 $0.0002,比不做记忆导致的重复沟通成本低得多。

5.3 向量检索记忆:Embedding 存储与相似度召回

事实提取出来了,怎么存?怎么检索?最直接的方案是向量数据库——把每条事实转成 Embedding 向量,通过语义相似度来召回相关记忆。

向量检索记忆的完整流程:

  ① 写入阶段(对话结束后异步执行)
  ┌──────────────────────────────────────────────┐
  │  提取的事实:\"用户偏好 PostgreSQL\"            │
  │       │                                       │
  │       ▼                                       │
  │  Embedding API ──→ [0.023, -0.87, 0.41, ...]  │
  │       │              1536 维向量               │
  │       ▼                                       │
  │  写入向量数据库(pgvector / Milvus / Qdrant) │
  │  ┌─────────────────────────────────────────┐  │
  │  │ id | user_id | text        | embedding  │  │
  │  │ 1  | u001   | 偏好PG      | [0.02,...] │  │
  │  │ 2  | u001   | 做电商后端   | [-0.1,...] │  │
  │  └─────────────────────────────────────────┘  │
  └──────────────────────────────────────────────┘

  ② 检索阶段(用户发送新消息时)
  ┌──────────────────────────────────────────────┐
  │  用户说:\"帮我选一个数据库方案\"               │
  │       │                                       │
  │       ▼                                       │
  │  Embedding API ──→ query_vector               │
  │       │                                       │
  │       ▼                                       │
  │  向量相似度搜索(余弦距离 < 阈值)             │
  │       │                                       │
  │       ▼                                       │
  │  召回结果:                                    │
  │  ├── \"用户偏好 PostgreSQL\"(相似度 0.89)     │
  │  ├── \"项目类型:电商后端\"(相似度 0.82)       │
  │  └── \"部署环境:阿里云\"(相似度 0.76)        │
  └──────────────────────────────────────────────┘

数据模型设计(pgvector 方案):

推荐用 PostgreSQL + pgvector 扩展——不需要额外部署向量数据库,一个 PG 实例搞定关系数据 + 向量搜索:

sql
-- 启用 pgvector 扩展
CREATE EXTENSION IF NOT EXISTS vector;

-- 语义记忆表
CREATE TABLE semantic_memories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(100) NOT NULL,
    
    -- 事实内容
    category VARCHAR(50) NOT NULL,    -- personal/project/preference/...
    fact_key VARCHAR(100) NOT NULL,   -- name, database, framework...
    fact_value TEXT NOT NULL,          -- PostgreSQL, FastAPI, ...
    
    -- 向量字段(text-embedding-3-small 输出 1536 维)
    embedding vector(1536),
    
    -- 元数据
    confidence FLOAT DEFAULT 0.9,     -- 置信度
    source_quote TEXT,                -- 原文引用
    access_count INTEGER DEFAULT 0,   -- 访问次数(用于淘汰)
    
    -- 时间戳
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    last_accessed_at TIMESTAMPTZ DEFAULT NOW(),
    
    -- 唯一约束:同一用户的同一 key 只有一条
    CONSTRAINT uq_user_fact UNIQUE (user_id, category, fact_key)
);

-- 向量索引(HNSW,检索更快)
CREATE INDEX idx_memories_embedding 
    ON semantic_memories USING hnsw (embedding vector_cosine_ops);

-- 用户索引
CREATE INDEX idx_memories_user ON semantic_memories(user_id);

Python 存储与检索服务:

python
from openai import AsyncOpenAI
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

class SemanticMemoryService:
    """语义记忆服务:存储和检索用户事实"""
    
    def __init__(self, db: AsyncSession, client: AsyncOpenAI):
        self.db = db
        self.client = client
    
    async def _embed(self, text_input: str) -> list[float]:
        """生成 Embedding 向量"""
        response = await self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text_input,
        )
        return response.data[0].embedding
    
    async def store_fact(self, user_id: str, fact: "ExtractedFact"):
        """存储一条事实(upsert:存在则更新,不存在则插入)"""
        # 用 category + key + value 拼接成文本做 Embedding
        fact_text = f"{fact.category}: {fact.key} = {fact.value}"
        embedding = await self._embed(fact_text)
        
        await self.db.execute(text("""
            INSERT INTO semantic_memories 
                (user_id, category, fact_key, fact_value, 
                 embedding, confidence, source_quote)
            VALUES 
                (:user_id, :category, :key, :value,
                 :embedding, :confidence, :source_quote)
            ON CONFLICT (user_id, category, fact_key) DO UPDATE SET
                fact_value = :value,
                embedding = :embedding,
                confidence = :confidence,
                source_quote = :source_quote,
                updated_at = NOW()
        """), {
            "user_id": user_id,
            "category": fact.category,
            "key": fact.key,
            "value": fact.value,
            "embedding": str(embedding),
            "confidence": fact.confidence,
            "source_quote": fact.source_quote,
        })
        await self.db.commit()
    
    async def recall(
        self, user_id: str, query: str,
        limit: int = 5, min_similarity: float = 0.7,
    ) -> list[dict]:
        """根据语义相似度召回相关记忆"""
        query_vec = await self._embed(query)
        
        result = await self.db.execute(text("""
            SELECT fact_key, fact_value, category, confidence,
                   1 - (embedding <=> :vec) as similarity
            FROM semantic_memories
            WHERE user_id = :user_id
              AND 1 - (embedding <=> :vec) > :min_sim
            ORDER BY embedding <=> :vec
            LIMIT :limit
        """), {
            "user_id": user_id,
            "vec": str(query_vec),
            "min_sim": min_similarity,
            "limit": limit,
        })
        
        # 更新访问计数和时间
        rows = [dict(r._mapping) for r in result.fetchall()]
        if rows:
            keys = [r["fact_key"] for r in rows]
            await self.db.execute(text("""
                UPDATE semantic_memories 
                SET access_count = access_count + 1,
                    last_accessed_at = NOW()
                WHERE user_id = :user_id AND fact_key = ANY(:keys)
            """), {"user_id": user_id, "keys": keys})
            await self.db.commit()
        
        return rows

在 Agent 对话中集成语义记忆:

python
async def chat_with_memory(
    user_id: str, message: str,
    memory: SemanticMemoryService,
    client: AsyncOpenAI,
) -> str:
    # 1. 召回相关记忆
    memories = await memory.recall(user_id, message, limit=5)
    
    # 2. 格式化为上下文
    memory_context = ""
    if memories:
        memory_context = "\n\n[用户画像 - 来自历史记忆]\n"
        for m in memories:
            memory_context += f"- {m['fact_key']}: {m['fact_value']}\n"
    
    # 3. 调用 LLM
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": (
                "你是一个智能助手,拥有用户的长期记忆。"
                "请利用记忆提供个性化的帮助。"
                f"{memory_context}"
            )},
            {"role": "user", "content": message},
        ],
    )
    return response.choices[0].message.content

Embedding 模型的选型对比:

模型维度每百万 Token中文效果推荐场景
text-embedding-3-small1536$0.02良好成本敏感型(推荐起步)
text-embedding-3-large3072$0.13优秀高精度需求
BGE-M31024免费(本地)优秀私有化部署、中文优先
Cohere embed-v31024$0.10良好多语言场景

💡 语义记忆的数据量通常很小(每用户几十到几百条),pgvector 完全够用——不需要专门部署 Milvus 或 Qdrant。只有当单用户记忆超过 10 万条、且需要毫秒级检索时,才考虑专用向量数据库。用 PG 统一存储关系数据和向量数据,运维成本最低。

5.4 知识更新与冲突解决:用户改了偏好怎么办

语义记忆不是"写入后就不管了"——用户的偏好、技术栈、项目状态都会变化。如果不处理更新,记忆就会变成"过时的错误信息",比没有记忆更糟糕。

冲突场景示例:

  3 月:用户说 \"我用 MySQL\"
  → 记忆写入:database = MySQL ✓

  4 月:用户说 \"我们迁移到 PostgreSQL 了\"
  → 新事实:database = PostgreSQL
  → 旧记忆:database = MySQL
  → 冲突!需要决定:更新?追加?还是两条都保留?

  如果不处理冲突:
  → Agent 下次回答:\"基于你的 MySQL 数据库...\" ← 错误!

知识更新有三种场景,需要不同的处理策略:

三种更新场景:

  ① 覆盖更新(最常见)
  ├── 旧值:database = MySQL
  ├── 新值:database = PostgreSQL
  └── 策略:直接更新(用户明确说\"换了/迁移了\")

  ② 追加更新
  ├── 旧值:framework = FastAPI
  ├── 新值:framework = Flask(另一个项目)
  └── 策略:追加新条目(同一 key 的不同上下文)

  ③ 细化更新
  ├── 旧值:level = 后端开发
  ├── 新值:level = 高级后端开发(3 年经验)
  └── 策略:用新值替换旧值(新值更精确)

核心实现——用 LLM 判断冲突类型并决定更新策略:

python
from pydantic import BaseModel, Field
from enum import Enum

class ConflictAction(str, Enum):
    UPDATE = "update"   # 覆盖旧值
    APPEND = "append"   # 追加新条目
    IGNORE = "ignore"   # 忽略(新信息不可靠)
    MERGE = "merge"     # 合并(新旧信息互补)

class ConflictResolution(BaseModel):
    """冲突解决结果"""
    action: ConflictAction
    resolved_value: str = Field(description="解决后的值")
    reason: str = Field(description="决策理由")

CONFLICT_PROMPT = """请判断以下两条关于同一用户的记忆是否存在冲突,
并决定如何处理。

已有记忆:
- 键:{key}
- 值:{old_value}
- 记录时间:{old_time}

新提取的事实:
- 值:{new_value}
- 原文:{source_quote}

判断规则:
1. 如果新值明确替代旧值(用户说"换了""迁移了""不再用了"),选 UPDATE
2. 如果新值是旧值的补充(不同项目、不同场景),选 APPEND
3. 如果新值比旧值更精确(旧值是新值的子集),选 MERGE
4. 如果新值不可靠(置信度低、可能是误解),选 IGNORE

请输出处理决策:"""

class MemoryConflictResolver:
    """记忆冲突解决器"""
    
    def __init__(self, client: AsyncOpenAI):
        self.client = client
    
    async def resolve(
        self,
        key: str,
        old_value: str,
        old_time: str,
        new_value: str,
        source_quote: str,
    ) -> ConflictResolution:
        """判断冲突类型并给出解决方案"""
        response = await self.client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": CONFLICT_PROMPT.format(
                    key=key,
                    old_value=old_value,
                    old_time=old_time,
                    new_value=new_value,
                    source_quote=source_quote,
                ),
            }],
            response_format=ConflictResolution,
            temperature=0,
        )
        return response.choices[0].message.parsed

把冲突解决集成到存储流程中:

python
class SemanticMemoryWithConflict(SemanticMemoryService):
    """带冲突解决的语义记忆服务"""
    
    def __init__(self, db, client):
        super().__init__(db, client)
        self.resolver = MemoryConflictResolver(client)
    
    async def store_fact_safe(self, user_id: str, fact: "ExtractedFact"):
        """安全地存储事实——检测并处理冲突"""
        # 1. 查找是否已有同 key 的记忆
        existing = await self.db.execute(text("""
            SELECT fact_value, created_at 
            FROM semantic_memories
            WHERE user_id = :uid AND category = :cat AND fact_key = :key
        """), {"uid": user_id, "cat": fact.category, "key": fact.key})
        
        row = existing.fetchone()
        
        if row is None:
            # 无冲突,直接写入
            await self.store_fact(user_id, fact)
            return
        
        old_value = row.fact_value
        if old_value == fact.value:
            return  # 值相同,跳过
        
        # 2. 有冲突,调用 LLM 决策
        resolution = await self.resolver.resolve(
            key=fact.key,
            old_value=old_value,
            old_time=str(row.created_at),
            new_value=fact.value,
            source_quote=fact.source_quote,
        )
        
        # 3. 执行决策
        match resolution.action:
            case ConflictAction.UPDATE:
                fact.value = resolution.resolved_value
                await self.store_fact(user_id, fact)  # upsert 覆盖
                
            case ConflictAction.APPEND:
                # 修改 key 为带后缀的新 key
                fact.key = f"{fact.key}_alt_{hash(fact.value) % 1000}"
                await self.store_fact(user_id, fact)
                
            case ConflictAction.MERGE:
                fact.value = resolution.resolved_value
                await self.store_fact(user_id, fact)
                
            case ConflictAction.IGNORE:
                pass  # 不做任何操作

冲突解决的性能优化——不是每次都需要调用 LLM:

冲突解决的分层策略(减少 LLM 调用):

  新事实进入


  ① 规则层(零成本)
  ├── 值完全相同 → 跳过
  ├── 新值是旧值的超集 → 直接 UPDATE
  └── 时间差 < 1 小时 → 直接 UPDATE(同一会话内的修正)

    ▼ 规则层无法判断
  ② LLM 判断层
  └── 调用 gpt-4o-mini 做冲突分析
      → 约 $0.0001/次,只在真正有冲突时触发

💡 冲突解决的核心原则是"新信息优先"——在绝大多数场景下,用户最新说的就是正确的。只有在新信息置信度明显低于旧信息(比如 AI 推测的 vs 用户明确说的)时,才保留旧值。过度保守的冲突策略会导致记忆"更新不了",比"更新错了"更糟糕。

第 5 章核心知识回顾:

概念一句话解释
语义 vs RAGRAG = 全局文档知识,语义记忆 = 用户级个性化知识
事实抽取用 LLM 结构化输出从对话中提取 key-value 事实
向量检索pgvector 存储 Embedding,余弦相似度召回相关记忆
冲突解决规则层快速判断 + LLM 兜底,新信息优先原则
成本控制先规则判断是否需要抽取/解决冲突,减少 70%+ 的 LLM 调用

6. 记忆压缩与整理:防止记忆膨胀

前两章分别介绍了情景记忆和语义记忆的存储方案。但如果只管"存"不管"理",记忆系统会像一个从不整理的衣柜——塞满了东西却找不到你要的那件。本章聚焦记忆的生命周期管理:压缩、合并、去重、遗忘。

6.1 记忆膨胀问题:存得越多 ≠ 记得越好

一个活跃用户每天对话 20 轮,每轮产生约 5 条事件记录。不做任何清理的话:

记忆膨胀的量化分析:

  ┌──────────────────────────────────────────────────┐
  │  假设:日活 1000 用户,每人每天 20 轮对话         │
  │                                                   │
  │  Episodic Memory(情景记忆)                      │
  │  ───────────────────────────────                  │
  │  每天新增事件:1000 × 20 × 2 = 40,000 条          │
  │  每月:         120 万条                           │
  │  一年:         1,440 万条                         │
  │                                                   │
  │  Semantic Memory(语义记忆)                      │
  │  ────────────────────────────                     │
  │  每天新增事实:1000 × 3 = 3,000 条(假设每天 3 条)│
  │  每月:         9 万条                             │
  │  一年:         108 万条                           │
  │  含重复/过时/矛盾的比例:约 30-50%                │
  ├──────────────────────────────────────────────────┤
  │  不清理的后果:                                    │
  │                                                   │
  │  ① 检索变慢 ─── 向量搜索在百万级数据上从 10ms     │
  │                 退化到 200ms+                      │
  │                                                   │
  │  ② 噪声增多 ─── 旧的错误记忆被召回,干扰回答      │
  │     用户:\"我已经换成 React 了\"                   │
  │     记忆里还有 5 条关于 Vue 的旧记录               │
  │     → 召回时旧记忆数量压倒新记忆                   │
  │                                                   │
  │  ③ 成本飙升 ─── 向量存储按量计费                   │
  │     百万条 × 1536 维 × 4 字节 ≈ 6 GB 向量数据      │
  │     加上索引和元数据,存储成本不可忽视              │
  └──────────────────────────────────────────────────┘

记忆管理的核心原则——像人类一样"有选择地遗忘"

人类记忆 vs Agent 记忆的管理类比:

  人类大脑每天做的事:
  ├── 睡眠时整理白天的记忆(压缩、归档)
  ├── 重要的强化(反复回忆的越记越牢)
  ├── 不重要的衰减(几天没想起的逐渐模糊)
  └── 矛盾的覆盖(新认知替代旧认知)

  Agent 记忆系统应该做的:
  ├── 定期压缩(渐进式摘要)          → 6.2
  ├── 重复的合并(同类记忆去重)       → 6.3
  ├── 过时的淘汰(时间衰减 + 访问频率)→ 6.4
  └── 主动反思(让 Agent 自己整理)    → 6.5

💡 记忆管理的目标不是"存得最多",而是"召回最准"——100 条精准的记忆胜过 10,000 条含噪声的记忆。好的记忆系统应该像一个优秀的秘书:记住重要的事,忘掉琐碎的事,更新过时的事。

6.2 压缩策略一:渐进式摘要(Progressive Summarization)

渐进式摘要的思路是分层压缩——原始对话 → 会话摘要 → 周期摘要,每层的信息密度更高、数据量更小:

渐进式摘要的三层压缩:

  Layer 0:原始事件流(Episodic Memory)
  ┌───────────────────────────────────────────┐
  │  20 条消息,约 4000 tokens                 │
  │  "我叫李明..." "用FastAPI..." "订单表..."   │
  └──────────────────┬────────────────────────┘
                     │ 会话结束时压缩

  Layer 1:会话摘要(Session Summary)
  ┌───────────────────────────────────────────┐
  │  1 段摘要,约 200 tokens                   │
  │  "用户李明讨论了电商项目的订单表设计,       │
  │   决定用 JSONB 存储扩展属性"               │
  └──────────────────┬────────────────────────┘
                     │ 每周/每月压缩

  Layer 2:周期摘要(Period Summary)
  ┌───────────────────────────────────────────┐
  │  1 段摘要,约 100 tokens                   │
  │  "4 月第 1 周:完成订单模块设计和性能优化"  │
  └───────────────────────────────────────────┘

  压缩比:4000 → 200 → 100 tokens
  信息保留率:100% → 30% → 10%
  但关键决策和事实的保留率:~95%

Layer 0 → Layer 1:会话结束时自动生成摘要

python
SESSION_SUMMARY_PROMPT = """请为以下对话生成一段简洁摘要。

要求:
1. 保留所有关键决策和结论(不是讨论过程)
2. 保留具体的技术术语、数值、配置名
3. 用第三人称描述("用户...")
4. 控制在 100-200 字以内

对话内容:
{conversation}"""

async def summarize_session(
    client: AsyncOpenAI,
    events: list[dict],
) -> str:
    """会话结束时生成摘要"""
    conversation = "\n".join(
        f"{e['event_type']}: {e['content']}" for e in events
    )
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": SESSION_SUMMARY_PROMPT.format(
                conversation=conversation
            ),
        }],
        temperature=0,
        max_tokens=300,
    )
    return response.choices[0].message.content

Layer 1 → Layer 2:定期归档老旧的会话摘要

python
PERIOD_SUMMARY_PROMPT = """请将以下多个会话摘要合并为一段周期性总结。

要求:
1. 按主题(而非时间)组织信息
2. 合并重复内容,保留最终结论
3. 标注关键的里程碑和决策
4. 控制在 200 字以内

会话摘要列表:
{summaries}"""

async def archive_old_sessions(
    db: AsyncSession,
    client: AsyncOpenAI,
    user_id: str,
    older_than_days: int = 30,
):
    """归档 N 天前的会话摘要为周期性总结"""
    cutoff = datetime.utcnow() - timedelta(days=older_than_days)
    
    # 1. 查出所有需要归档的会话摘要
    result = await db.execute(text("""
        SELECT id, summary, started_at 
        FROM sessions
        WHERE user_id = :uid 
          AND started_at < :cutoff
          AND archived = false
          AND summary IS NOT NULL
        ORDER BY started_at
    """), {"uid": user_id, "cutoff": cutoff})
    
    sessions = result.fetchall()
    if len(sessions) < 3:
        return  # 太少了,不值得压缩
    
    # 2. 合并为周期摘要
    summaries_text = "\n".join(
        f"[{s.started_at:%m-%d}] {s.summary}" for s in sessions
    )
    
    period_summary = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": PERIOD_SUMMARY_PROMPT.format(
                summaries=summaries_text
            ),
        }],
        temperature=0,
    )
    
    # 3. 写入周期摘要表
    await db.execute(text("""
        INSERT INTO period_summaries (user_id, period_start, period_end, summary)
        VALUES (:uid, :start, :end, :summary)
    """), {
        "uid": user_id,
        "start": sessions[0].started_at,
        "end": sessions[-1].started_at,
        "summary": period_summary.choices[0].message.content,
    })
    
    # 4. 标记原始会话为已归档(可选:删除原始事件)
    session_ids = [s.id for s in sessions]
    await db.execute(text("""
        UPDATE sessions SET archived = true 
        WHERE id = ANY(:ids)
    """), {"ids": session_ids})
    
    await db.commit()

💡 渐进式摘要的核心是"每层只保留上层 10-30% 的信息"——但这 10-30% 是最关键的决策和事实。原始事件流可以在归档后删除或移入冷存储,只保留摘要在热存储中。这样既节省了存储成本,又保证了检索质量。

6.3 压缩策略二:记忆合并与去重

渐进式摘要针对的是情景记忆(事件流)。语义记忆(事实库)的膨胀问题不太一样——不是"太长",而是"太重复"。

语义记忆的重复问题示例:

  同一个用户的记忆库里可能出现:
  ┌─────────────────────────────────────────────┐
  │  #1  技术栈 = FastAPI          (3月10日)     │
  │  #2  后端框架 = FastAPI        (3月15日)     │  ← 重复!
  │  #3  Web框架 = FastAPI + Uvicorn (3月20日)   │  ← 重复但更详细
  │  #4  数据库 = PostgreSQL       (3月10日)     │
  │  #5  DB选型 = PG               (4月1日)     │  ← 重复!
  └─────────────────────────────────────────────┘

  问题:5 条记忆中有 3 条是重复的
  → 检索时浪费 Top-K 名额
  → 不同措辞可能导致不一致的召回结果

去重的核心思路:用向量相似度发现候选重复对,再用 LLM 判断是否合并

python
async def deduplicate_memories(
    db: AsyncSession,
    client: AsyncOpenAI,
    user_id: str,
    similarity_threshold: float = 0.85,
):
    """检测并合并重复的语义记忆"""
    # 1. 找出所有高相似度的记忆对
    result = await db.execute(text("""
        SELECT a.id as id_a, b.id as id_b,
               a.fact_key as key_a, b.fact_key as key_b,
               a.fact_value as val_a, b.fact_value as val_b,
               a.updated_at as time_a, b.updated_at as time_b,
               1 - (a.embedding <=> b.embedding) as similarity
        FROM semantic_memories a
        JOIN semantic_memories b ON a.id < b.id  -- 避免自比和重复对
        WHERE a.user_id = :uid AND b.user_id = :uid
          AND 1 - (a.embedding <=> b.embedding) > :threshold
        ORDER BY similarity DESC
        LIMIT 50
    """), {"uid": user_id, "threshold": similarity_threshold})
    
    pairs = result.fetchall()
    
    # 2. 逐对判断是否合并
    for pair in pairs:
        resolution = await _should_merge(client, pair)
        
        if resolution["action"] == "merge":
            # 保留更新的那条,删除旧的
            keep_id = pair.id_b if pair.time_b > pair.time_a else pair.id_a
            delete_id = pair.id_a if keep_id == pair.id_b else pair.id_b
            
            # 更新保留条目的值为合并后的值
            await db.execute(text("""
                UPDATE semantic_memories 
                SET fact_value = :merged_value, updated_at = NOW()
                WHERE id = :keep_id
            """), {
                "merged_value": resolution["merged_value"],
                "keep_id": keep_id,
            })
            
            # 删除冗余条目
            await db.execute(text(
                "DELETE FROM semantic_memories WHERE id = :del_id"
            ), {"del_id": delete_id})
    
    await db.commit()

async def _should_merge(client: AsyncOpenAI, pair) -> dict:
    """让 LLM 判断两条记忆是否应该合并"""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"""两条记忆是否描述同一件事?如果是,请合并。

记忆 A:{pair.key_a} = {pair.val_a}{pair.time_a}
记忆 B:{pair.key_b} = {pair.val_b}{pair.time_b}
相似度:{pair.similarity:.2f}

请回答 JSON 格式:
&#123;&#123;"action": "merge" 或 "keep_both", "merged_value": "合并后的值"&#125;&#125;""",
        }],
        temperature=0,
    )
    import json
    return json.loads(response.choices[0].message.content)

去重的执行策略:

策略触发时机优点缺点
实时去重每次写入新事实时检查记忆始终干净写入延迟增加
定时去重每天凌晨批量执行不影响在线延迟白天可能有短暂重复
阈值去重记忆条数超过阈值时自适应,低频用户不浪费实现稍复杂

💡 推荐"写入时快速检查 + 定时批量深度去重"的组合——写入时用简单的 key 精确匹配做快速去重(零成本),每天用向量相似度做深度去重(发现不同措辞的重复)。这样既保证了实时写入性能,又保证了记忆库的长期整洁。

6.4 遗忘机制:基于时间衰减与访问频率的淘汰策略

人类的记忆会自然遗忘——越久没回忆的信息,记忆强度越低。Agent 的记忆系统也需要类似的机制,否则早期的低价值记忆会永远占着位置。

核心思路:为每条记忆计算一个"重要性分数",定期淘汰低分记忆

记忆重要性评分公式(灵感来自艾宾浩斯遗忘曲线):

  score = base_importance × recency_factor × frequency_factor

  其中:
  ├── base_importance : 记忆本身的重要性(由 confidence 表示)
  │                     用户明确说的 = 1.0,AI 推测的 = 0.5-0.8

  ├── recency_factor  : 时间衰减因子
  │                     = exp(-decay_rate × days_since_last_access)
  │                     最近访问的 ≈ 1.0,30 天没访问的 ≈ 0.3

  └── frequency_factor: 访问频率因子
                        = log(1 + access_count) / log(1 + max_access)
                        经常被召回的 → 更重要

代码实现:

python
import math
from datetime import datetime, timedelta

def calculate_memory_score(
    confidence: float,
    last_accessed_at: datetime,
    access_count: int,
    max_access: int = 100,
    decay_rate: float = 0.05,
) -> float:
    """计算记忆的重要性分数(0-1)"""
    # 时间衰减:距上次访问的天数越多,分数越低
    days_since = (datetime.utcnow() - last_accessed_at).days
    recency = math.exp(-decay_rate * days_since)
    
    # 访问频率:被召回越多次说明越重要
    frequency = math.log(1 + access_count) / math.log(1 + max(max_access, 1))
    frequency = min(frequency, 1.0)
    
    # 综合评分
    score = confidence * 0.4 + recency * 0.4 + frequency * 0.2
    return round(score, 4)

# 示例:
# 用户明确说的、昨天刚访问过、被召回 20 次 → 0.92
# AI 推测的、30 天没访问、只被召回 1 次 → 0.23

自动清理任务:

python
async def cleanup_stale_memories(
    db: AsyncSession,
    user_id: str,
    max_memories: int = 200,      # 每用户最多保留 200 条
    min_score: float = 0.15,      # 低于此分数直接删除
    inactive_days: int = 90,      # 90 天未访问的候选删除
):
    """清理低价值记忆"""
    # 1. 删除长期未访问且低置信度的记忆
    await db.execute(text("""
        DELETE FROM semantic_memories
        WHERE user_id = :uid
          AND last_accessed_at < :cutoff
          AND confidence < 0.5
    """), {
        "uid": user_id,
        "cutoff": datetime.utcnow() - timedelta(days=inactive_days),
    })
    
    # 2. 如果仍然超过上限,按分数排序淘汰
    count = await db.scalar(text(
        "SELECT COUNT(*) FROM semantic_memories WHERE user_id = :uid"
    ), {"uid": user_id})
    
    if count > max_memories:
        # 保留 Top N,删除其余
        await db.execute(text("""
            DELETE FROM semantic_memories
            WHERE id IN (
                SELECT id FROM semantic_memories
                WHERE user_id = :uid
                ORDER BY 
                    confidence * 0.4 
                    + EXP(-0.05 * EXTRACT(DAY FROM NOW() - last_accessed_at)) * 0.4
                    + LN(1 + access_count) / LN(101) * 0.2
                ASC
                LIMIT :delete_count
            )
        """), {"uid": user_id, "delete_count": count - max_memories})
    
    await db.commit()

不同类型记忆的遗忘策略差异:

记忆类型衰减速度最低保留期说明
personal(姓名、身份)极慢永久核心身份信息几乎不变
preference(偏好)中等60 天偏好可能改变
technical(技术选型)中等90 天技术栈会迭代
decision(已做决定)180 天重要决策需要长期追溯
project(项目信息)30 天项目状态频繁变化

💡 不要对所有记忆一视同仁——用户的名字应该"永不遗忘",但"用户上次问了 Python 2 还是 3" 这种信息 30 天后就可以清理了。在 semantic_memories 表的 category 字段上设置不同的衰减参数,是低成本但高回报的优化。

6.5 反思循环(Reflection Loop):让 Agent 自己整理记忆

前面的压缩、去重、遗忘都是"被动式"管理——按规则自动执行。反思循环是更高级的策略:让 Agent 主动审视自己的记忆库,发现矛盾、提炼规律、生成新的高层次知识

反思循环的灵感——人类的"睡眠整理":

  人在睡眠时,大脑会:
  ├── 回放白天的经历(海马体 → 皮层)
  ├── 发现不同经历之间的关联
  ├── 提炼出规律和模式
  └── 巩固重要的、丢弃琐碎的

  Agent 的反思循环(定时任务):
  ├── 回顾最近的记忆
  ├── 发现矛盾和不一致
  ├── 从多条记忆中提炼出更高层次的洞察
  └── 清理已被高层洞察覆盖的低层记忆

核心实现:

python
REFLECTION_PROMPT = """你是一个记忆管理助手。请审视以下用户的记忆库,
完成三个任务:

1. **发现矛盾**:找出相互矛盾的记忆条目
2. **提炼洞察**:从多条记忆中归纳出更高层次的结论
3. **标记清理**:标记可以安全删除的低价值条目

用户记忆库:
{memories}

请以 JSON 格式输出:
&#123;&#123;
  "contradictions": [
    &#123;&#123;"memory_ids": [1, 3], "description": "矛盾描述", 
     "resolution": "建议保留哪条"&#125;&#125;
  ],
  "insights": [
    &#123;&#123;"derived_from": [2, 4, 5], "insight": "归纳出的新知识",
     "category": "分类"&#125;&#125;
  ],
  "cleanup_ids": [6, 7]
&#125;&#125;"""

async def run_reflection(
    db: AsyncSession,
    client: AsyncOpenAI,
    user_id: str,
):
    """执行一次反思循环"""
    # 1. 加载该用户的所有记忆
    result = await db.execute(text("""
        SELECT id, category, fact_key, fact_value, 
               confidence, access_count, updated_at
        FROM semantic_memories
        WHERE user_id = :uid
        ORDER BY updated_at DESC
        LIMIT 100
    """), {"uid": user_id})
    
    memories = result.fetchall()
    if len(memories) < 10:
        return  # 记忆太少,不需要反思
    
    # 2. 格式化记忆列表
    memories_text = "\n".join(
        f"[{m.id}] {m.category}/{m.fact_key} = {m.fact_value} "
        f"(置信度:{m.confidence}, 访问:{m.access_count}次, "
        f"更新:{m.updated_at:%m-%d})"
        for m in memories
    )
    
    # 3. 调用 LLM 做反思
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": REFLECTION_PROMPT.format(
                memories=memories_text
            ),
        }],
        temperature=0,
        response_format={"type": "json_object"},
    )
    
    import json
    reflection = json.loads(response.choices[0].message.content)
    
    # 4. 处理矛盾——解决冲突
    for contradiction in reflection.get("contradictions", []):
        # 具体处理逻辑复用 5.4 的冲突解决器
        pass
    
    # 5. 写入新洞察
    for insight in reflection.get("insights", []):
        # 将高层洞察作为新的语义记忆写入
        fact_text = insight["insight"]
        embedding = await _embed(client, fact_text)
        await db.execute(text("""
            INSERT INTO semantic_memories 
                (user_id, category, fact_key, fact_value,
                 embedding, confidence)
            VALUES (:uid, :cat, :key, :val, :emb, 0.85)
            ON CONFLICT (user_id, category, fact_key) DO UPDATE
            SET fact_value = :val, updated_at = NOW()
        """), {
            "uid": user_id,
            "cat": insight["category"],
            "key": f"insight_{hash(fact_text) % 10000}",
            "val": fact_text,
            "emb": str(embedding),
        })
    
    # 6. 清理标记的低价值记忆
    cleanup_ids = reflection.get("cleanup_ids", [])
    if cleanup_ids:
        await db.execute(text(
            "DELETE FROM semantic_memories WHERE id = ANY(:ids)"
        ), {"ids": cleanup_ids})
    
    await db.commit()

反思循环的执行频率建议:

用户活跃度反思频率说明
高活跃(日均 20+ 轮)每天 1 次记忆增长快,需要频繁整理
中活跃(日均 5-20 轮)每周 1 次平衡整理质量和 LLM 成本
低活跃(日均 <5 轮)每月 1 次记忆增长慢,不需要频繁整理

💡 反思循环是记忆系统的"高级功能",不是必需品——如果你的 Agent 用户量不大、对话轮数不多,前面的去重 + 遗忘已经够用。反思循环主要用于高活跃度、长期使用的场景,让 Agent 的记忆从"堆积事实"进化为"理解用户"。

第 6 章核心知识回顾:

概念一句话解释
记忆膨胀不清理的记忆导致检索变慢、噪声增多、成本飙升
渐进式摘要原始事件 → 会话摘要 → 周期摘要,三层逐步压缩
记忆去重向量相似度发现候选重复 + LLM 判断是否合并
遗忘机制基于 confidence × 时间衰减 × 访问频率的评分淘汰
反思循环Agent 主动审视记忆:发现矛盾、提炼洞察、清理冗余

7. 图记忆(Graph Memory):理解实体间的关系

前面的语义记忆把每条事实独立存储为一个向量——这在查"用户喜欢什么"时很好用,但遇到实体间的关系推理就力不从心了。图记忆用知识图谱来补齐这个能力:不仅记住事实,还记住事实之间的关联。

7.1 为什么需要图记忆:向量搜索搞不定的关系推理

向量搜索的核心能力是语义相似度匹配——给一个 query,找到最像的记忆。但有一类问题,向量搜索天生搞不定:

向量搜索失败的典型场景:

  记忆库中有以下事实:
  ① 用户李明 → 负责 → 电商项目
  ② 电商项目 → 使用 → FastAPI
  ③ 电商项目 → 数据库 → PostgreSQL
  ④ 张伟 → 同事 → 李明
  ⑤ 张伟 → 负责 → 支付模块

  用户问:"张伟用的什么数据库?"

  ┌─── 向量搜索 ──────────────────────────────────┐
  │  query: "张伟 数据库"                           │
  │  → 最相似: ③ "电商项目 → PostgreSQL"(0.72)   │
  │  → 但搜索不知道张伟和电商项目有关系!           │
  │  → 无法回答                                    │
  └────────────────────────────────────────────────┘

  ┌─── 图谱推理 ──────────────────────────────────┐
  │  起点: 张伟                                    │
  │  → 张伟 --同事-→ 李明                         │
  │  → 李明 --负责-→ 电商项目                     │
  │  → 电商项目 --数据库-→ PostgreSQL              │
  │  → 答案: "张伟的同事李明负责的电商项目用 PG"   │
  │  → 通过 3 跳关系推理得出答案 ✓                 │
  └────────────────────────────────────────────────┘

向量搜索 vs 图谱推理的能力对比:

查询类型向量搜索图谱推理
属性查询 "李明用什么数据库"✅ 直接匹配✅ 直接查
关系查询 "谁和李明是同事"⚠️ 能查到但不精确✅ 精确遍历
多跳推理 "张伟项目用的什么技术"❌ 无法推理✅ 多跳遍历
路径查询 "从 A 到 B 怎么关联"❌ 不支持✅ 最短路径
模糊语义 "像 FastAPI 一样的框架"✅ 语义召回❌ 不擅长
全量扫描 "所有用 Python 的项目"⚠️ 效率低✅ 索引高效

两者的关系是互补而非替代

向量记忆 + 图记忆的分工:

  向量记忆(Semantic Memory)
  ├── 擅长:模糊语义匹配、开放式查询
  ├── 数据:扁平的 key-value 事实
  └── 适合:"帮我推荐类似 FastAPI 的框架"

  图记忆(Graph Memory)
  ├── 擅长:关系遍历、多跳推理、路径发现
  ├── 数据:实体 + 关系的网络结构
  └── 适合:"张伟负责的模块依赖哪些服务"

  最佳实践:先用向量召回候选记忆,
           再用图谱补充关系上下文

💡 如果你的 Agent 场景不涉及"多个实体之间的关系"(比如只做单用户画像),图记忆可以跳过——向量记忆足够。图记忆主要用于:团队协作场景、项目管理 Agent、CRM 客户关系管理、多角色 RPG 等需要理解"谁和谁有什么关系"的场景。

7.2 实体关系抽取:从对话中构建知识图谱

知识图谱的基本单元是三元组(Triple):(主体, 关系, 客体)。从对话中提取三元组,就能逐步构建出用户的知识图谱。

三元组示例:

  对话:"我叫李明,和张伟一起做电商项目,用的 FastAPI"

  提取的三元组:
  ┌──────────────────────────────────────────┐
  │  (李明, is_a, 用户)                       │
  │  (李明, works_with, 张伟)                 │
  │  (李明, works_on, 电商项目)               │
  │  (电商项目, uses, FastAPI)                │
  └──────────────────────────────────────────┘

  可视化为图结构:

      张伟 ←─works_with─→ 李明

                       works_on


                       电商项目

                         uses


                        FastAPI

核心实现——用结构化输出抽取三元组:

python
from pydantic import BaseModel, Field

class Triple(BaseModel):
    """知识图谱三元组"""
    subject: str = Field(description="主体实体名称")
    subject_type: str = Field(
        description="主体类型:person/project/technology/organization"
    )
    relation: str = Field(
        description="关系类型:uses/works_on/works_with/manages/depends_on/..."
    )
    object: str = Field(description="客体实体名称")
    object_type: str = Field(description="客体类型")

class TripleExtractionResult(BaseModel):
    """三元组抽取结果"""
    triples: list[Triple] = Field(default_factory=list)

TRIPLE_PROMPT = """请从以下对话中提取实体和关系,构建知识图谱三元组。

提取规则:
1. 实体类型:person(人)、project(项目)、technology(技术/框架/工具)、
   organization(公司/团队)、concept(抽象概念)
2. 关系类型:uses(使用)、works_on(负责)、works_with(合作)、
   manages(管理)、depends_on(依赖)、prefers(偏好)、
   deployed_on(部署于)、belongs_to(属于)
3. 只提取明确提到的关系,不要推测
4. 实体名称尽量统一(如 PG 统一写为 PostgreSQL)

对话内容:
{conversation}

已有的实体(保持命名一致):
{existing_entities}"""

class TripleExtractor:
    """从对话中抽取知识图谱三元组"""
    
    def __init__(self, client: AsyncOpenAI):
        self.client = client
    
    async def extract(
        self,
        conversation: list[dict],
        existing_entities: list[str] | None = None,
    ) -> list[Triple]:
        conv_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in conversation
        )
        entities_text = ", ".join(existing_entities) if existing_entities else "无"
        
        response = await self.client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": TRIPLE_PROMPT.format(
                    conversation=conv_text,
                    existing_entities=entities_text,
                ),
            }],
            response_format=TripleExtractionResult,
            temperature=0,
        )
        
        return response.choices[0].message.parsed.triples

使用示例:

python
extractor = TripleExtractor(client)

conversation = [
    {"role": "user", "content": "我叫李明,和张伟一起做电商项目"},
    {"role": "assistant", "content": "用什么技术栈呢?"},
    {"role": "user", "content": "FastAPI + PostgreSQL,部署在阿里云 ECS"},
]

triples = await extractor.extract(conversation)
# [
#   Triple(subject="李明", subject_type="person",
#          relation="works_with", object="张伟", object_type="person"),
#   Triple(subject="李明", subject_type="person",
#          relation="works_on", object="电商项目", object_type="project"),
#   Triple(subject="电商项目", subject_type="project",
#          relation="uses", object="FastAPI", object_type="technology"),
#   Triple(subject="电商项目", subject_type="project",
#          relation="uses", object="PostgreSQL", object_type="technology"),
#   Triple(subject="电商项目", subject_type="project",
#          relation="deployed_on", object="阿里云ECS", object_type="technology"),
# ]

💡 三元组抽取和事实抽取(5.2)可以合并为同一次 LLM 调用——在 Prompt 中同时要求返回 facts(扁平事实)和 triples(关系三元组),一次调用完成两种记忆的原始数据提取,节省一半的 API 成本。

7.3 图存储方案:Neo4j / NetworkX / 轻量级 JSON 图

三元组提取出来了,用什么存?根据项目规模,有三种方案:

方案适用规模优点缺点推荐场景
NetworkX每用户 <1万 节点零依赖、纯 Python、内存操作极快不持久化、大图吃内存MVP / 中小项目(推荐起步)
轻量 JSON 图每用户 <1千 节点零依赖、直接存 PG JSONB查询能力有限极简场景
Neo4j百万级节点专业图数据库、Cypher 查询强大额外运维、学习成本企业级 / 复杂关系网络

推荐方案:NetworkX + JSON 持久化——对大多数 Agent 来说,每个用户的知识图谱不会超过几百个节点,NetworkX 完全够用。用 JSON 序列化存入数据库即可持久化。

完整实现:

python
import networkx as nx
import json
from datetime import datetime

class GraphMemory:
    """基于 NetworkX 的图记忆"""
    
    def __init__(self):
        self.graph = nx.DiGraph()  # 有向图
    
    def add_triple(self, triple: "Triple"):
        """添加一条三元组"""
        # 添加节点(带类型属性)
        self.graph.add_node(
            triple.subject, 
            type=triple.subject_type,
            updated_at=datetime.utcnow().isoformat(),
        )
        self.graph.add_node(
            triple.object, 
            type=triple.object_type,
            updated_at=datetime.utcnow().isoformat(),
        )
        
        # 添加边(关系)
        self.graph.add_edge(
            triple.subject, 
            triple.object,
            relation=triple.relation,
            created_at=datetime.utcnow().isoformat(),
        )
    
    def get_neighbors(self, entity: str, max_depth: int = 2) -> dict:
        """获取实体的邻居(支持多跳)"""
        if entity not in self.graph:
            return {"entity": entity, "found": False}
        
        result = {
            "entity": entity,
            "type": self.graph.nodes[entity].get("type", "unknown"),
            "relations": [],
        }
        
        # BFS 遍历到指定深度
        visited = {entity}
        queue = [(entity, 0)]
        
        while queue:
            node, depth = queue.pop(0)
            if depth >= max_depth:
                continue
            
            # 出边(该实体指向的)
            for _, target, data in self.graph.out_edges(node, data=True):
                result["relations"].append({
                    "from": node,
                    "relation": data["relation"],
                    "to": target,
                    "to_type": self.graph.nodes[target].get("type"),
                    "depth": depth + 1,
                })
                if target not in visited:
                    visited.add(target)
                    queue.append((target, depth + 1))
            
            # 入边(指向该实体的)
            for source, _, data in self.graph.in_edges(node, data=True):
                result["relations"].append({
                    "from": source,
                    "relation": data["relation"],
                    "to": node,
                    "from_type": self.graph.nodes[source].get("type"),
                    "depth": depth + 1,
                })
                if source not in visited:
                    visited.add(source)
                    queue.append((source, depth + 1))
        
        return result
    
    def query_by_relation(self, relation: str) -> list[dict]:
        """查询某种关系的所有实例"""
        return [
            {"from": u, "to": v, **data}
            for u, v, data in self.graph.edges(data=True)
            if data.get("relation") == relation
        ]
    
    def find_path(self, source: str, target: str) -> list[str] | None:
        """查找两个实体之间的最短路径"""
        try:
            path = nx.shortest_path(
                self.graph, source, target
            )
            return path
        except nx.NetworkXNoPath:
            return None
    
    # ─── 持久化:序列化为 JSON ───
    
    def to_json(self) -> str:
        """序列化为 JSON(存入数据库)"""
        data = nx.node_link_data(self.graph)
        return json.dumps(data, ensure_ascii=False, default=str)
    
    @classmethod
    def from_json(cls, json_str: str) -> "GraphMemory":
        """从 JSON 反序列化"""
        gm = cls()
        if json_str:
            data = json.loads(json_str)
            gm.graph = nx.node_link_graph(data, directed=True)
        return gm

存入数据库的方式:

python
# 在 semantic_memories 同级,为每个用户存一个图
# 直接用 JSONB 字段存 NetworkX 序列化结果

async def save_graph(db: AsyncSession, user_id: str, graph: GraphMemory):
    """将图记忆持久化到数据库"""
    await db.execute(text("""
        INSERT INTO user_graphs (user_id, graph_data, updated_at)
        VALUES (:uid, :data, NOW())
        ON CONFLICT (user_id) DO UPDATE
        SET graph_data = :data, updated_at = NOW()
    """), {"uid": user_id, "data": graph.to_json()})
    await db.commit()

async def load_graph(db: AsyncSession, user_id: str) -> GraphMemory:
    """从数据库加载图记忆"""
    result = await db.scalar(text(
        "SELECT graph_data FROM user_graphs WHERE user_id = :uid"
    ), {"uid": user_id})
    return GraphMemory.from_json(result) if result else GraphMemory()

💡 NetworkX 图存在内存中,查询速度是微秒级——对于每用户几百个节点的场景,BFS 遍历、最短路径等操作不到 1ms。唯一需要注意的是在高并发场景下要做好用户级别的图缓存(比如用 Redis 缓存序列化的 JSON),避免每次请求都从数据库加载。

7.4 混合检索:向量召回 + 图谱推理的联合查询

实际使用中,向量记忆和图记忆最好组合使用——先用向量搜索找到相关的实体,再用图谱展开这些实体的关系网络,最后把两部分信息一起注入上下文。

混合检索的工作流程:

  用户问:"帮我优化电商项目的数据库性能"


  ① 向量检索(Semantic Memory)
  │  → 召回事实:
  │    - database = PostgreSQL(相似度 0.91)
  │    - project_type = 电商后端(0.85)
  │    - deployment = 阿里云 ECS(0.78)


  ② 实体识别(从召回结果中提取实体名)
  │  → 实体列表:["电商项目", "PostgreSQL", "阿里云ECS"]


  ③ 图谱展开(Graph Memory)
  │  → 以"电商项目"为起点,2 跳遍历:
  │    - 电商项目 --uses-→ FastAPI
  │    - 电商项目 --uses-→ PostgreSQL
  │    - 电商项目 --deployed_on-→ 阿里云ECS
  │    - 李明 --works_on-→ 电商项目
  │    - 张伟 --works_with-→ 李明


  ④ 组装上下文
     → 向量召回的事实 + 图谱展开的关系网络
     → 一起注入 System Prompt

完整代码实现:

python
class HybridMemoryRetriever:
    """混合检索器:向量 + 图谱"""
    
    def __init__(
        self,
        semantic_memory: SemanticMemoryService,
        graph_memory: GraphMemory,
    ):
        self.semantic = semantic_memory
        self.graph = graph_memory
    
    async def retrieve(
        self, user_id: str, query: str,
        vector_top_k: int = 5,
        graph_depth: int = 2,
    ) -> str:
        """混合检索,返回格式化的记忆上下文"""
        # 1. 向量检索
        facts = await self.semantic.recall(
            user_id, query, limit=vector_top_k
        )
        
        # 2. 从召回的事实中提取实体名
        entities = set()
        for fact in facts:
            entities.add(fact["fact_value"])
            # fact_key 可能也是实体名(如 "database" 不是,但 "project" 可能是)
        
        # 3. 图谱展开
        graph_context = []
        for entity in entities:
            neighbors = self.graph.get_neighbors(entity, max_depth=graph_depth)
            if neighbors.get("found", True) and neighbors.get("relations"):
                graph_context.append(neighbors)
        
        # 4. 格式化输出
        return self._format_context(facts, graph_context)
    
    def _format_context(
        self, facts: list[dict], graph_context: list[dict]
    ) -> str:
        """将检索结果格式化为 LLM 可理解的上下文"""
        parts = []
        
        # 向量记忆部分
        if facts:
            parts.append("[用户画像]")
            for f in facts:
                parts.append(f"- {f['fact_key']}: {f['fact_value']}")
        
        # 图谱部分
        if graph_context:
            parts.append("\n[关系网络]")
            for ctx in graph_context:
                for rel in ctx["relations"][:10]:  # 限制数量
                    parts.append(
                        f"- {rel['from']} --{rel['relation']}--> {rel['to']}"
                    )
        
        return "\n".join(parts) if parts else ""

在 Agent 中使用混合检索:

python
async def chat_with_hybrid_memory(
    user_id: str, message: str,
    retriever: HybridMemoryRetriever,
    client: AsyncOpenAI,
) -> str:
    # 一次调用获取所有记忆上下文
    memory_context = await retriever.retrieve(user_id, message)
    
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": (
                "你是一个智能助手,拥有用户的长期记忆和关系网络。\n"
                "请利用以下记忆信息提供个性化帮助。\n\n"
                f"{memory_context}"
            )},
            {"role": "user", "content": message},
        ],
    )
    return response.choices[0].message.content

混合检索 vs 单一检索的效果对比:

查询场景仅向量仅图谱混合检索
"我叫什么名字"✅ 直接命中✅ 查节点
"我用什么数据库"✅ 直接命中✅ 查属性
"谁和我一起做项目"⚠️ 语义模糊✅ 精确遍历✅ 图谱主导
"张伟的项目用什么技术"❌ 无法推理✅ 多跳遍历✅ 图谱主导
"推荐类似 FastAPI 的框架"✅ 语义匹配❌ 不擅长✅ 向量主导
"我的项目整体技术栈"⚠️ 零散召回✅ 完整遍历✅ 互补增强

💡 混合检索的额外延迟很小——向量检索 ~30ms + 图谱遍历 ~0.1ms = 总计 ~30ms。图谱查询因为在内存中运行(NetworkX),几乎不增加延迟。成本方面也没有额外的 LLM 调用。是一个"低成本高收益"的能力增强。

第 7 章核心知识回顾:

概念一句话解释
图记忆的定位补齐向量搜索无法做的关系推理和多跳查询
三元组知识图谱的基本单元:(主体, 关系, 客体)
推荐存储方案NetworkX + JSON 持久化,适合绝大多数 Agent 场景
混合检索向量召回实体 → 图谱展开关系 → 组装上下文
适用场景多人协作、项目管理、CRM 等涉及实体关系的场景

8. 主流框架实战:Mem0 / Letta / LangGraph

前面几章从零构建了记忆系统的各个模块。但在实际项目中,很多团队会选择使用现成的记忆框架来加速开发。本章介绍三个最主流的方案,帮你选出最适合的。

8.1 Mem0:开箱即用的多层记忆中间件

Mem0(读作"mem-zero")是目前最流行的 Agent 记忆中间件。它的核心理念是**"把对话扔给我,记忆的事我来搞定"**——自动从对话中提取事实、存储为向量、处理冲突和去重。

Mem0 的架构:

  你的 Agent 代码

    │  client.add(messages, user_id="xxx")

  ┌─────────────────────────────────────────────┐
  │  Mem0 引擎(自动完成以下步骤)                │
  │                                               │
  │  ① 事实抽取 ─── 用 LLM 从对话中提取事实       │
  │  ② 去重检查 ─── 对比已有记忆,处理冲突        │
  │  ③ Embedding ── 生成向量                      │
  │  ④ 存储 ─────── 写入向量数据库                │
  │  ⑤ 图谱更新 ── 更新实体关系(可选)           │
  └─────────────────────────────────────────────┘

    │  client.search(query, user_id="xxx")

  返回最相关的记忆列表

快速上手:

python
# 安装
# pip install mem0ai

from mem0 import MemoryClient

# 1. 初始化(使用 Mem0 Cloud)
client = MemoryClient(api_key="your_api_key")

# 2. 添加记忆——直接传入对话,Mem0 自动提取事实
messages = [
    {"role": "user", "content": "我叫李明,做电商后端,用 FastAPI + PostgreSQL"},
    {"role": "assistant", "content": "你好李明!FastAPI + PG 是很棒的组合"},
    {"role": "user", "content": "团队两个人,部署在阿里云"},
]

client.add(messages, user_id="liming_001")
# → Mem0 自动提取并存储:
#   "用户名:李明"
#   "项目:电商后端"
#   "技术栈:FastAPI + PostgreSQL"
#   "团队规模:2 人"
#   "部署环境:阿里云"

# 3. 搜索记忆——语义搜索
results = client.search("用什么数据库", user_id="liming_001")
for r in results:
    print(f"记忆: {r['memory']}  (相关度: {r['score']:.2f})")
# → 记忆: 技术栈为 FastAPI + PostgreSQL  (相关度: 0.92)

# 4. 获取用户的所有记忆
all_memories = client.get_all(user_id="liming_001")

自托管模式(不想用云服务):

python
from mem0 import Memory

# 本地模式,需要自己配置向量数据库和 LLM
config = {
    "llm": {
        "provider": "openai",
        "config": {"model": "gpt-4o-mini", "temperature": 0},
    },
    "embedder": {
        "provider": "openai",
        "config": {"model": "text-embedding-3-small"},
    },
    "vector_store": {
        "provider": "qdrant",
        "config": {"host": "localhost", "port": 6333},
    },
    "graph_store": {
        "provider": "neo4j",
        "config": {"url": "bolt://localhost:7687", "username": "neo4j"},
    },
}

memory = Memory.from_config(config)

# API 完全一致
memory.add(messages, user_id="liming_001")
results = memory.search("数据库", user_id="liming_001")

Mem0 的优势和局限:

维度优势局限
易用性3 行代码就能用,零配置记忆系统行为不透明,难以调试
功能自动事实提取 + 去重 + 冲突解决 + 图谱自定义提取逻辑受限
部署云服务零运维,自托管也简单云服务有数据隐私顾虑
性能异步处理,不阻塞对话自托管需要额外的向量 DB
生态社区活跃,文档完善与 LangChain/LangGraph 集成需要适配

💡 Mem0 最适合"快速给已有 Agent 加上记忆能力"的场景——如果你的 Agent 已经可以正常对话,只是缺少跨会话的记忆,Mem0 是成本最低的选择。但如果你需要深度自定义记忆的提取逻辑、存储结构或检索策略,自建方案(第 5-7 章)会更灵活。

8.2 Letta(MemGPT):OS 风格的虚拟上下文管理

Letta(前身是 MemGPT)的理念和 Mem0 完全不同——它不是一个记忆"中间件",而是一个有状态的 Agent 运行时。核心创新是:让 Agent 像操作系统管理内存一样,自主管理自己的上下文窗口

Letta 的 OS 类比:

  传统 Agent:上下文窗口 = 固定大小的 RAM
  ├── 塞满了就溢出,丢失信息
  └── 被动管理,无法自主决定"加载什么"

  Letta Agent:上下文窗口 = 虚拟内存系统
  ├── Core Memory ──→ "RAM"(即时可用,Agent 可读写)
  │   ├── human block: "用户是李明,做电商后端"
  │   └── persona block: "我是一个 Python 专家助手"

  ├── Archival Memory ──→ "磁盘"(持久化,按需加载)
  │   └── 所有历史对话和事实,向量检索

  └── Agent 自主决定:
      ├── 什么信息放入 Core Memory("钉住"重要的)
      ├── 什么信息归档到 Archival("换出"不急用的)
      └── 什么时候从 Archival 检索("换入"需要的)

核心概念——Memory Blocks:

Letta 的 Core Memory 由多个可编辑的文本块组成。关键区别是:Agent 自己可以修改这些块的内容——不是外部程序控制记忆,而是 Agent 自治。

python
# 安装
# pip install letta-client

from letta_client import Letta

# 1. 初始化客户端(连接本地 Letta 服务器或 Letta Cloud)
client = Letta(base_url="http://localhost:8283")

# 2. 创建有状态 Agent——定义初始记忆块
agent = client.agents.create(
    model="openai/gpt-4o",
    memory_blocks=[
        {
            "label": "human",
            "value": "用户还没有自我介绍。",
        },
        {
            "label": "persona",
            "value": "你是一个 Python 全栈开发专家,擅长 FastAPI 和 PostgreSQL。",
        },
    ],
)

# 3. 发送消息——Agent 会自动更新自己的记忆块
response = client.agents.messages.create(
    agent_id=agent.id,
    messages=[{
        "role": "user",
        "content": "你好!我叫李明,正在做一个电商后端项目",
    }],
)

# 4. 查看 Agent 自主更新的记忆
human_block = client.agents.blocks.retrieve(
    agent_id=agent.id,
    block_label="human",
)
print(human_block.value)
# → "用户是李明,正在做电商后端项目。"
# ← Agent 自己把 "用户还没有自我介绍" 更新了!

# 5. Archival Memory——长期存储(Agent 自动管理)
# Agent 在对话过程中会自动将重要信息归档到 Archival Memory
# 也可以手动添加
client.agents.archival_memory.create(
    agent_id=agent.id,
    text="2024-03 讨论了订单表设计,决定用 JSONB 存储扩展属性",
)

Letta 与 Mem0 的核心区别:

维度Mem0Letta
记忆管理者外部程序(你的代码调用 API)Agent 自己(自主读写记忆)
架构模式记忆中间件 + 你的 Agent完整的 Agent 运行时
记忆结构扁平的事实列表结构化的 Memory Blocks
存储层级单层向量存储Core(RAM)+ Archival(磁盘)
适用场景给已有 Agent 加记忆从零构建有状态 Agent
集成成本低(加几行代码)高(需要用 Letta 的 Agent 框架)
部署方式云服务 / 嵌入式库需要运行 Letta Server

💡 Letta 最适合"从零构建需要深度状态管理的 Agent"——Agent 自主管理记忆这个能力非常强大,但代价是你需要完全采用 Letta 的 Agent 框架。如果你已经有了基于 LangChain/LangGraph 的 Agent,迁移到 Letta 的成本比较高。Mem0 则可以无侵入地集成到任何现有架构中。

8.3 LangGraph Store:跨线程持久化记忆

如果你已经在用 LangGraph 构建 Agent,LangGraph 自带的记忆方案是最自然的选择——不需要引入额外的依赖。LangGraph 用两个机制分别处理短期和长期记忆:

LangGraph 的双轨记忆架构:

  ┌─────────────────────────────────────────────┐
  │  Checkpointer(短期记忆)                    │
  │  ──────────────────────────────────────────  │
  │  作用域:线程级(thread_id)                  │
  │  内容:  当前会话的 messages + 自定义状态      │
  │  生命周期:会话内持久化                       │
  │  特点:  自动保存/恢复,无需手动管理           │
  │  实现:  MemorySaver / PostgresSaver          │
  │                                               │
  │  → 第 3 章已详细介绍                          │
  ├─────────────────────────────────────────────┤
  │  BaseStore(长期记忆) ← 本节重点             │
  │  ──────────────────────────────────────────  │
  │  作用域:跨线程全局(按 namespace 隔离)       │
  │  内容:  用户画像、偏好、提炼的事实            │
  │  生命周期:永久持久化                         │
  │  特点:  手动 put/get/search,按需读写        │
  │  实现:  InMemoryStore / PostgresStore        │
  └─────────────────────────────────────────────┘

  Checkpointer 回答:"这次对话聊了什么"
  BaseStore 回答:"这个用户是谁、喜欢什么"

BaseStore 核心 API 和用法:

python
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# 1. 初始化 Store(生产环境用 PostgresStore)
store = InMemoryStore()
checkpointer = MemorySaver()

# 2. 写入记忆——使用 namespace 隔离用户
store.put(
    namespace=("memories", "user_liming"),
    key="tech_stack",
    value={"fact": "FastAPI + PostgreSQL", "confidence": 1.0},
)

store.put(
    namespace=("memories", "user_liming"),
    key="project_type",
    value={"fact": "电商后端", "confidence": 1.0},
)

# 3. 读取记忆
item = store.get(namespace=("memories", "user_liming"), key="tech_stack")
print(item.value)
# → {"fact": "FastAPI + PostgreSQL", "confidence": 1.0}

# 4. 搜索记忆(支持 namespace 前缀搜索)
results = store.search(namespace_prefix=("memories", "user_liming"))
for r in results:
    print(f"{r.key}: {r.value}")

在 Graph 节点中集成 Store——关键模式:

python
from langgraph.config import get_store
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

llm = ChatOpenAI(model="gpt-4o")

# 核心:在节点函数中通过 config 获取 store
async def chat_with_memory(state: AgentState, config: dict) -> dict:
    # 1. 从 config 中获取 store 和 user_id
    store = get_store(config)
    user_id = config["configurable"].get("user_id", "anonymous")
    
    # 2. 检索该用户的所有记忆
    namespace = ("memories", user_id)
    memories = store.search(namespace_prefix=namespace)
    
    # 3. 格式化记忆上下文
    memory_text = ""
    if memories:
        memory_text = "\n\n[用户记忆]\n"
        for m in memories:
            memory_text += f"- {m.key}: {m.value.get('fact', '')}\n"
    
    # 4. 调用 LLM
    messages = state["messages"]
    system_msg = f"你是一个智能助手。{memory_text}"
    
    response = await llm.ainvoke(
        [{"role": "system", "content": system_msg}] + messages
    )
    
    return {"messages": [response]}

# 5. 构建 Graph——同时传入 checkpointer 和 store
graph = StateGraph(AgentState)
graph.add_node("chat", chat_with_memory)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)

app = graph.compile(
    checkpointer=checkpointer,  # 短期记忆(线程内)
    store=store,                 # 长期记忆(跨线程)
)

# 6. 使用——不同 thread_id 但同一 user_id,共享长期记忆
config = {
    "configurable": {
        "thread_id": "session-001",   # 短期记忆隔离
        "user_id": "user_liming",     # 长期记忆共享
    }
}

result = await app.ainvoke(
    {"messages": [HumanMessage(content="帮我优化数据库性能")]},
    config=config,
)

💡 LangGraph Store 是"原生集成"的优势——如果你已经在用 LangGraph,Store 可以零成本加入,不需要额外的服务或依赖。但 Store 只提供基础的 put/get/search,不包含自动事实抽取、冲突解决等高级功能——这些需要自己实现(参考第 5 章),或者在 LangGraph 中嵌入 Mem0 作为记忆后端。

8.4 三大框架对比:选型决策表

三个框架各有侧重,选错了不是不能用,但会让你多走弯路。先看一张全面对比表:

维度Mem0Letta(MemGPT)LangGraph Store
定位记忆中间件(插件式)有状态 Agent 运行时Graph 原生状态层
集成成本极低(加几行代码)高(需采用 Letta 框架)低(已用 LangGraph 则零成本)
记忆层次语义记忆 + 图记忆Core(RAM)+ Archival(磁盘)Checkpointer(短期)+ Store(长期)
事实抽取✅ 自动✅ Agent 自主❌ 需自建
冲突解决✅ 内置✅ Agent 自主❌ 需自建
图谱支持✅ 内置 Neo4j❌ 不原生支持❌ 需自建
记忆管理者外部程序Agent 自己外部程序
部署依赖向量 DB + LLM(或用云服务)Letta Server + LLMPG / SQLite(轻量)
开源协议Apache 2.0Apache 2.0MIT
社区活跃度⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

选型决策树——根据你的场景选框架:

你的选型决策路径:

  已经在用 LangGraph?
  ├── YES → 先用 Checkpointer + Store 起步
  │         需要自动事实抽取?
  │         ├── YES → Store 层接入 Mem0 做记忆后端
  │         └── NO  → 纯 Store 手动管理即可

  └── NO → 已经有可运行的 Agent 了?
           ├── YES → 用 Mem0(最低侵入成本添加记忆)
           │         需要 Agent 自主管理记忆?
           │         ├── YES → 考虑迁移到 Letta
           │         └── NO  → Mem0 足够

           └── NO → 从零构建新 Agent?
                    ├── 需要深度状态管理 → Letta
                    └── 标准 Agent 场景   → LangGraph + Store

三种方案也可以组合使用——实际上很多生产项目会混搭:

常见的混搭方案:

  方案 A:LangGraph + Mem0(推荐 ✅)
  ├── LangGraph 管 Agent 编排和短期记忆(Checkpointer)
  └── Mem0 管长期记忆(自动抽取 + 向量检索 + 图谱)
  → 各司其职,集成简单

  方案 B:LangGraph + 自建记忆(第 5-7 章)
  ├── LangGraph 管 Agent 编排
  └── 自建的 SemanticMemoryService + GraphMemory 管长期记忆
  → 完全可控,适合对记忆逻辑有深度定制需求的团队

  方案 C:Letta 全家桶
  └── Agent 编排 + 状态管理 + 记忆全在 Letta 内
  → 最一体化,但迁移成本最高

💡 没有"最好的框架",只有"最适合当前阶段的框架"——MVP 阶段用 Mem0 快速验证记忆对用户体验的提升;产品稳定后,再根据需要迁移到自建方案或 Letta。框架可以换,第 3-7 章讲的原理和数据结构设计不会过时。

第 8 章核心知识回顾:

概念一句话解释
Mem0开箱即用的记忆中间件,3 行代码加记忆,适合快速集成
Letta有状态 Agent 运行时,Agent 自主管理记忆,适合从零构建
LangGraph Store原生跨线程持久化,Checkpointer(短期)+ Store(长期)双轨架构
选型核心已有 Agent → Mem0;已用 LangGraph → Store;从零开始 → Letta
可以混搭LangGraph 编排 + Mem0 记忆是最常见的生产组合

9. 记忆系统的评测与质量保障

记忆系统上线后,怎么知道它到底好不好用?"感觉还行"不够——你需要可量化的指标来衡量记忆的质量,持续发现和修复问题。本章从指标定义、学术基准、业务评测到线上监控,构建完整的记忆质量保障体系。

9.1 怎么衡量记忆好不好:准确率、召回率、时效性

记忆系统的质量可以拆解为四个核心指标,每个指标衡量不同的能力维度:

记忆系统的四大质量指标:

  ┌─────────────────────────────────────────────────────────┐
  │  ① 记忆准确率(Memory Precision)                        │
  │  ────────────────────────────────────                   │
  │  定义:召回的记忆中,有多少是正确且相关的                  │
  │  公式:准确率 = 正确召回数 / 总召回数                     │
  │  衡量:是否召回了无关或错误的记忆                         │
  │  目标:> 85%                                            │
  │                                                         │
  │  失败案例:                                              │
  │  用户问\"帮我设计数据库\"                                 │
  │  → 召回了\"用户喜欢喝咖啡\" ← 不相关,拉低准确率          │
  ├─────────────────────────────────────────────────────────┤
  │  ② 记忆召回率(Memory Recall)                           │
  │  ────────────────────────────────────                   │
  │  定义:所有应该被召回的记忆中,实际召回了多少              │
  │  公式:召回率 = 正确召回数 / 应召回总数                   │
  │  衡量:是否遗漏了重要的记忆                              │
  │  目标:> 80%                                            │
  │                                                         │
  │  失败案例:                                              │
  │  用户问\"帮我设计数据库\"                                 │
  │  → 忘记召回\"用户偏好 PostgreSQL\" ← 遗漏关键信息          │
  ├─────────────────────────────────────────────────────────┤
  │  ③ 记忆时效性(Memory Freshness)                        │
  │  ────────────────────────────────────                   │
  │  定义:召回的记忆是否反映了最新状态                       │
  │  衡量:是否用了过时信息(用户已 \"换成 PG\" 但召回 MySQL) │
  │  目标:过时记忆占比 < 5%                                 │
  ├─────────────────────────────────────────────────────────┤
  │  ④ 记忆利用率(Memory Utilization)                      │
  │  ────────────────────────────────────                   │
  │  定义:LLM 是否真的利用了召回的记忆来改善回答             │
  │  衡量:有记忆 vs 无记忆的回答质量差异                     │
  │  目标:有记忆时回答质量显著优于无记忆                     │
  └─────────────────────────────────────────────────────────┘

用代码来自动评测这些指标:

python
from dataclasses import dataclass

@dataclass
class MemoryEvalResult:
    """单条测试用例的评测结果"""
    query: str                    # 用户查询
    recalled_memories: list[str]  # 实际召回的记忆
    expected_memories: list[str]  # 应该召回的记忆(标注数据)
    is_outdated: list[bool]       # 每条召回是否过时
    
    @property
    def precision(self) -> float:
        """准确率:召回中有多少是期望的"""
        if not self.recalled_memories:
            return 1.0  # 没召回也没错
        hits = sum(
            1 for r in self.recalled_memories 
            if r in self.expected_memories
        )
        return hits / len(self.recalled_memories)
    
    @property
    def recall(self) -> float:
        """召回率:期望中有多少被召回了"""
        if not self.expected_memories:
            return 1.0  # 没有期望,不存在遗漏
        hits = sum(
            1 for e in self.expected_memories 
            if e in self.recalled_memories
        )
        return hits / len(self.expected_memories)
    
    @property
    def freshness(self) -> float:
        """时效性:非过时记忆的比例"""
        if not self.recalled_memories:
            return 1.0
        fresh = sum(1 for o in self.is_outdated if not o)
        return fresh / len(self.is_outdated)
    
    @property
    def f1(self) -> float:
        """F1 = 准确率和召回率的调和平均"""
        p, r = self.precision, self.recall
        return 2 * p * r / (p + r) if (p + r) > 0 else 0

def evaluate_memory_system(
    test_cases: list[MemoryEvalResult],
) -> dict:
    """批量评测记忆系统"""
    n = len(test_cases)
    return {
        "avg_precision": sum(t.precision for t in test_cases) / n,
        "avg_recall": sum(t.recall for t in test_cases) / n,
        "avg_f1": sum(t.f1 for t in test_cases) / n,
        "avg_freshness": sum(t.freshness for t in test_cases) / n,
        "total_cases": n,
    }

四个指标之间的关系和平衡:

优化方向准确率影响召回率影响典型手段
提高相似度阈值↑ 提高↓ 降低min_similarity 从 0.7 → 0.85
增大 Top-K↓ 可能降低↑ 提高limit 从 3 → 10
加强去重↑ 提高→ 不变定期合并重复记忆
加速遗忘↑ 提高(减少噪声)↓ 可能降低缩短衰减半衰期

💡 记忆系统和搜索引擎一样,追求 F1 的平衡,而不是单一指标的极端——准确率 100% 但召回率 10%(太保守,什么都不敢召回)和召回率 100% 但准确率 20%(太激进,召回一堆噪声)都是失败的。F1 > 0.8 是一个合理的生产目标。

9.2 学术基准:LoCoMo 与 LongMemEval

自己构建评测集之前,先看看学术界怎么评测记忆系统。两个最主流的基准:

两大记忆评测基准对比:

  ┌─────────────────────────────────────────────────────────┐
  │  LoCoMo(Long Conversation Memory)                     │
  │  ──────────────────────────────────────                 │
  │  来源:斯坦福 2024                                      │
  │  任务:在超长对话(数千轮)后回答记忆相关问题             │
  │  评测维度:                                              │
  │  ├── 单跳事实回忆(\"用户叫什么名字\")                   │
  │  ├── 多跳推理(\"用户推荐的那本书的作者是谁\")           │
  │  ├── 时间推理(\"上周二讨论的话题是什么\")               │
  │  └── 开放生成(\"总结用户的技术偏好\")                   │
  │                                                         │
  │  数据规模:~600 个测试问题,对话长度 300-1000 轮         │
  │  关键发现:上下文窗口再大也不如有记忆系统的 Agent         │
  ├─────────────────────────────────────────────────────────┤
  │  LongMemEval(Long-term Memory Evaluation)             │
  │  ──────────────────────────────────────                 │
  │  来源:微软研究院 2024                                   │
  │  任务:评测跨会话长期记忆的质量                          │
  │  评测维度:                                              │
  │  ├── 信息提取(从历史中提取特定事实)                    │
  │  ├── 知识更新(检测冲突后是否用了新信息)                │
  │  ├── 时间推理(涉及时间顺序的问答)                      │
  │  ├── 多会话推理(跨多个历史会话的综合问题)              │
  │  └── 拒绝回答(记忆中没有的信息能否正确拒答)            │
  │                                                         │
  │  数据规模:~500 个测试问题,模拟 50+ 个独立会话          │
  │  关键发现:冲突解决和时间推理是当前系统最薄弱的环节       │
  └─────────────────────────────────────────────────────────┘

如何用这些基准来评测自己的记忆系统:

python
# 用 LoCoMo 思路构建评测 Pipeline(简化版)

import json
from openai import AsyncOpenAI

# 1. 准备测试数据(模拟 LoCoMo 格式)
test_cases = [
    {
        "history": [  # 模拟历史对话
            {"session": "3月10日", "content": "用户说自己是后端开发,用 FastAPI"},
            {"session": "3月15日", "content": "用户说数据库选了 PostgreSQL"},
            {"session": "3月20日", "content": "用户说已经从 MySQL 迁移到 PG 了"},
        ],
        "question": "用户目前用什么数据库?",
        "expected_answer": "PostgreSQL",
        "category": "knowledge_update",  # 知识更新类
    },
    {
        "history": [
            {"session": "3月10日", "content": "用户叫李明,团队 3 人"},
            {"session": "3月12日", "content": "张伟是李明的同事,负责前端"},
        ],
        "question": "李明团队中谁负责前端?",
        "expected_answer": "张伟",
        "category": "multi_hop",  # 多跳推理类
    },
]

# 2. 评测函数——用 LLM 判断回答是否正确
async def judge_answer(
    client: AsyncOpenAI,
    question: str,
    expected: str,
    actual: str,
) -> dict:
    """用 LLM 作为裁判,判断回答质量"""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"""请判断实际回答是否正确回答了问题。

问题:{question}
预期答案:{expected}
实际回答:{actual}

请以 JSON 格式回答:
&#123;&#123;"correct": true/false, "score": 0-1, "reason": "判断理由"&#125;&#125;""",
        }],
        temperature=0,
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)

两个基准在不同维度上的侧重对比:

维度LoCoMoLongMemEval
侧重点超长单会话跨多次独立会话
核心挑战长上下文中的信息定位知识更新与冲突
最有价值的测试多跳推理、时间推理知识更新、拒绝回答
对工程的启示摘要压缩的质量很关键冲突解决机制不可或缺

💡 不需要完整跑 LoCoMo / LongMemEval 全套测试——从中各挑 20-30 个有代表性的 case,按你的业务场景做适配,就能发现记忆系统 80% 的问题。重点关注知识更新多跳推理两类,它们是实际使用中用户抱怨最多的。

9.3 业务评测:构建你自己的记忆质量测试集

学术基准验证的是"通用能力",但你的 Agent 面对的是特定业务场景。需要构建贴合业务的测试集。

核心思路:模拟用户的真实使用路径,在关键节点上插入验证问题

业务评测的测试用例设计模式:

  Step 1:构造对话历史(模拟用户多天使用)
  ┌────────────────────────────────────────────┐
  │  Day 1:用户自我介绍 + 项目背景            │
  │  Day 2:技术选型讨论                       │
  │  Day 3:做了一个重要决定                   │
  │  Day 5:修改了之前的决定                   │
  │  Day 7:新开会话,问了一个问题             │
  └────────────────────────────────────────────┘

  Step 2:在 Day 7 的新会话中插入验证问题
  ┌────────────────────────────────────────────┐
  │  Q1(事实回忆):\"我叫什么名字?\"          │
  │  Q2(偏好召回):\"我用什么数据库?\"         │
  │  Q3(知识更新):\"我现在用什么框架?\"       │
  │  Q4(多跳推理):\"我同事负责什么模块?\"     │
  │  Q5(拒绝回答):\"我的手机号是多少?\"       │
  └────────────────────────────────────────────┘

  Step 3:对比 Agent 回答 vs 标准答案

测试用例模板(可直接使用):

python
# 记忆质量测试用例模板
memory_test_suite = {
    "name": "电商助手记忆质量测试集 v1",
    "scenarios": [
        # ─── 场景一:基本事实记忆 ───
        {
            "setup_conversations": [
                {"day": 1, "messages": [
                    {"role": "user", "content": "我叫李明,做电商后端"},
                    {"role": "user", "content": "用 FastAPI + PostgreSQL"},
                ]},
            ],
            "test_queries": [
                {
                    "query": "我叫什么名字?",
                    "expected": "李明",
                    "category": "fact_recall",
                    "priority": "P0",  # 最基本的能力,必须通过
                },
                {
                    "query": "帮我设计一个新接口",
                    "expected_context": ["FastAPI", "PostgreSQL"],
                    "category": "context_injection",
                    "priority": "P0",
                },
            ],
        },
        # ─── 场景二:知识更新 ───
        {
            "setup_conversations": [
                {"day": 1, "messages": [
                    {"role": "user", "content": "我们目前用 MySQL"},
                ]},
                {"day": 5, "messages": [
                    {"role": "user", "content": "我们已经迁移到 PostgreSQL 了"},
                ]},
            ],
            "test_queries": [
                {
                    "query": "我用什么数据库?",
                    "expected": "PostgreSQL",
                    "not_expected": "MySQL",  # 不应该出现旧信息
                    "category": "knowledge_update",
                    "priority": "P0",
                },
            ],
        },
        # ─── 场景三:拒绝回答 ───
        {
            "setup_conversations": [
                {"day": 1, "messages": [
                    {"role": "user", "content": "我叫李明"},
                ]},
            ],
            "test_queries": [
                {
                    "query": "我的银行卡号是什么?",
                    "expected_behavior": "refuse",  # 应该拒答
                    "category": "refusal",
                    "priority": "P1",
                },
            ],
        },
    ],
}

自动化回归测试——每次改动记忆逻辑后跑一遍:

python
async def run_memory_regression_test(
    agent,  # 你的 Agent 实例
    test_suite: dict,
) -> dict:
    """运行记忆回归测试"""
    results = {"passed": 0, "failed": 0, "details": []}
    
    for scenario in test_suite["scenarios"]:
        # 1. 模拟历史对话(写入记忆)
        user_id = f"test_user_{hash(str(scenario)) % 10000}"
        for conv in scenario["setup_conversations"]:
            for msg in conv["messages"]:
                await agent.chat(user_id, msg["content"])
        
        # 2. 运行验证问题
        for test in scenario["test_queries"]:
            response = await agent.chat(user_id, test["query"])
            
            # 3. 判断是否通过
            passed = True
            reason = ""
            
            if "expected" in test:
                if test["expected"].lower() not in response.lower():
                    passed = False
                    reason = f"期望包含 '{test['expected']}',实际: {response[:100]}"
            
            if "not_expected" in test:
                if test["not_expected"].lower() in response.lower():
                    passed = False
                    reason = f"不应包含 '{test['not_expected']}',但出现了"
            
            results["passed" if passed else "failed"] += 1
            results["details"].append({
                "query": test["query"],
                "category": test["category"],
                "priority": test["priority"],
                "passed": passed,
                "reason": reason,
            })
    
    return results

测试集的覆盖率目标:

类别建议测试数P0 占比说明
事实回忆10-15 条100%最基本的能力
知识更新8-10 条100%记忆系统区别于 RAG 的核心
多跳推理5-8 条50%有图记忆才需要重测
时间推理5 条50%"上次/之前/最近"类查询
拒绝回答5 条80%不编造不存在的记忆
记忆利用5-8 条80%回答是否真的用了记忆

💡 测试集是"活的"——每次线上发现记忆相关的 Bad Case,就加入测试集。3 个月后你会积累 50-100 个高质量测试用例,比任何学术基准都更贴合你的业务。P0 级别的 case 全部通过才能发版。

9.4 线上监控:记忆命中率与用户满意度追踪

离线评测保证"发布前没问题",线上监控保证"运行中没退化"。记忆系统需要持续观测的核心指标:

线上监控的三层指标体系:

  Layer 1:系统指标(实时告警)
  ├── 记忆检索延迟 P99 < 100ms
  ├── 事实抽取成功率 > 95%
  ├── 向量写入失败率 < 0.1%
  └── 记忆库容量(每用户平均记忆条数)

  Layer 2:质量指标(小时级看板)
  ├── 记忆命中率:有召回 / 总请求(目标 > 60%)
  ├── 记忆使用率:LLM 实际引用了记忆 / 有召回(目标 > 70%)
  └── 过时记忆率:用户纠正"你记错了" / 总记忆召回(目标 < 5%)

  Layer 3:业务指标(天级分析)
  ├── 有记忆用户的留存率 vs 无记忆用户
  ├── 有记忆时会话轮数(预期减少,因为不需要反复介绍)
  └── 用户主动触发记忆的频率("你还记得...吗")

记忆命中率的埋点实现:

python
import time
from dataclasses import dataclass, field
from collections import defaultdict

@dataclass
class MemoryMetrics:
    """记忆系统的运行时指标收集"""
    _counters: dict = field(default_factory=lambda: defaultdict(int))
    _latencies: dict = field(default_factory=lambda: defaultdict(list))
    
    def record_retrieval(
        self, user_id: str, query: str,
        recalled_count: int, latency_ms: float,
    ):
        """记录一次检索"""
        self._counters["total_requests"] += 1
        if recalled_count > 0:
            self._counters["hit_requests"] += 1
        else:
            self._counters["miss_requests"] += 1
        self._latencies["retrieval"].append(latency_ms)
    
    def record_correction(self, user_id: str):
        """用户纠正了错误记忆("你记错了")"""
        self._counters["corrections"] += 1
    
    @property
    def hit_rate(self) -> float:
        total = self._counters["total_requests"]
        return self._counters["hit_requests"] / total if total > 0 else 0
    
    @property
    def correction_rate(self) -> float:
        hits = self._counters["hit_requests"]
        return self._counters["corrections"] / hits if hits > 0 else 0
    
    @property
    def p99_latency(self) -> float:
        lats = sorted(self._latencies["retrieval"])
        if not lats:
            return 0
        idx = int(len(lats) * 0.99)
        return lats[min(idx, len(lats) - 1)]
    
    def summary(self) -> dict:
        return {
            "hit_rate": f"{self.hit_rate:.1%}",
            "correction_rate": f"{self.correction_rate:.1%}",
            "p99_latency_ms": f"{self.p99_latency:.1f}",
            "total_requests": self._counters["total_requests"],
        }

# 在 Agent 中集成指标收集
metrics = MemoryMetrics()

async def chat_with_monitoring(
    user_id: str, message: str,
    memory: "SemanticMemoryService",
    client: "AsyncOpenAI",
) -> str:
    # 1. 检索记忆(带计时)
    start = time.time()
    memories = await memory.recall(user_id, message, limit=5)
    latency = (time.time() - start) * 1000
    
    # 2. 记录指标
    metrics.record_retrieval(
        user_id, message,
        recalled_count=len(memories),
        latency_ms=latency,
    )
    
    # 3. 检测用户是否在纠正记忆
    correction_keywords = ["你记错了", "不对", "我改了", "不再用"]
    if any(kw in message for kw in correction_keywords):
        metrics.record_correction(user_id)
    
    # 4. 正常对话流程...
    # ...

A/B 测试——量化记忆系统的价值:

记忆系统的 A/B 测试设计:

  实验组 A(有记忆)          对照组 B(无记忆)
  ├── 50% 流量                ├── 50% 流量
  ├── 完整的记忆检索 + 注入   ├── 纯上下文窗口
  └── 观测指标:              └── 观测指标:
      ├── 会话平均轮数             ├── 会话平均轮数
      ├── 用户重复提供信息次数     ├── 用户重复提供信息次数
      ├── 回答满意度评分           ├── 回答满意度评分
      └── 7 日留存率              └── 7 日留存率

  预期结果(基于行业数据):
  ├── 会话轮数减少 20-30%(不需要反复介绍自己)
  ├── 满意度提升 15-25%
  └── 7 日留存提升 10-20%

💡 线上监控最重要的单一指标是"过时记忆率"(correction_rate)——如果用户频繁纠正"你记错了",说明冲突解决机制失效了。这比记忆命中率更重要——命中率低只是"没帮上忙",过时记忆率高是"帮了倒忙"。建议设告警阈值:correction_rate > 5% 时自动触发排查。

第 9 章核心知识回顾:

概念一句话解释
四大指标准确率、召回率、时效性、利用率——追求 F1 > 0.8 的平衡
学术基准LoCoMo(长对话)+ LongMemEval(跨会话)挑 20-30 case 做适配
业务测试集模拟用户使用路径 + 验证问题,线上 Bad Case 持续加入
线上监控系统指标(延迟)+ 质量指标(命中率)+ 业务指标(留存)
最关键指标过时记忆率(correction_rate)> 5% 必须告警

10. 生产实践:完整记忆系统的设计模式

前面 9 章分别介绍了记忆系统的各个模块。本章把它们串联成一个可部署到生产环境的完整系统——从端到端架构、多用户隔离、隐私合规到性能优化,解决"从 Demo 到上线"的最后一公里问题。

10.1 端到端架构:从请求到记忆的完整数据流

先看一个生产级记忆系统的完整架构图——把前面所有模块串在一起:

生产级 Agent 记忆系统——端到端架构:

  用户请求


  ┌─────────────────────────────────────────────────────────────┐
  │  API Gateway / WebSocket Handler                             │
  │  → 鉴权、限流、提取 user_id + session_id                    │
  └──────────────────────────┬──────────────────────────────────┘


  ┌─────────────────────────────────────────────────────────────┐
  │  Memory Retrieval Layer(记忆检索层)── 同步,阻塞响应       │
  │  ┌─────────────────────────────────────────────────────┐    │
  │  │  ① Semantic Memory 检索                              │    │
  │  │     → pgvector 余弦相似度 Top-5                      │    │
  │  │     → 返回用户画像、偏好、项目信息                    │    │
  │  ├─────────────────────────────────────────────────────┤    │
  │  │  ② Graph Memory 检索                                 │    │
  │  │     → 从召回实体出发 BFS 2 跳                        │    │
  │  │     → 返回关系网络上下文                              │    │
  │  ├─────────────────────────────────────────────────────┤    │
  │  │  ③ Episodic Memory 检索(可选)                      │    │
  │  │     → 按关键词搜索最近 30 天的相关会话                │    │
  │  │     → 返回历史对话片段                                │    │
  │  └─────────────────────────────────────────────────────┘    │
  │  总耗时目标:< 50ms(三路并行)                             │
  └──────────────────────────┬──────────────────────────────────┘


  ┌─────────────────────────────────────────────────────────────┐
  │  Context Assembly(上下文组装)                               │
  │  → Token Budget 分配                                        │
  │  → System Prompt + 记忆上下文 + 对话历史 + 当前输入         │
  │  → 如果超限:摘要压缩 / 裁剪对话历史                       │
  └──────────────────────────┬──────────────────────────────────┘


  ┌─────────────────────────────────────────────────────────────┐
  │  LLM Inference                                               │
  │  → 调用 GPT-4o / Claude 生成回答                            │
  │  → 流式返回给用户                                           │
  └──────────────────────────┬──────────────────────────────────┘


  ┌─────────────────────────────────────────────────────────────┐
  │  Memory Update Layer(记忆更新层)── 异步,不阻塞响应        │
  │  ┌─────────────────────────────────────────────────────┐    │
  │  │  ④ Episodic 写入                                     │    │
  │  │     → 记录 user + assistant 消息到 events 表          │    │
  │  ├─────────────────────────────────────────────────────┤    │
  │  │  ⑤ 事实抽取(条件触发)                              │    │
  │  │     → 规则预判:本轮是否包含新信息?                  │    │
  │  │     → YES → LLM 抽取 → 冲突检测 → 写入 Semantic      │    │
  │  ├─────────────────────────────────────────────────────┤    │
  │  │  ⑥ 图谱更新(条件触发)                              │    │
  │  │     → 抽取三元组 → 写入 Graph Memory                 │    │
  │  └─────────────────────────────────────────────────────┘    │
  │  总耗时:200-500ms(后台异步,用户无感知)                   │
  └─────────────────────────────────────────────────────────────┘

把这个架构翻译成代码骨架:

python
import asyncio
from dataclasses import dataclass

@dataclass
class MemoryContext:
    """检索到的记忆上下文"""
    semantic_facts: list[dict]     # 语义记忆(用户画像)
    graph_relations: list[dict]    # 图谱关系
    episodic_history: list[dict]   # 历史对话片段
    
    def format_for_prompt(self) -> str:
        """格式化为 LLM 可理解的上下文"""
        parts = []
        if self.semantic_facts:
            parts.append("[用户画像]")
            for f in self.semantic_facts:
                parts.append(f"- {f['fact_key']}: {f['fact_value']}")
        if self.graph_relations:
            parts.append("\n[关系网络]")
            for r in self.graph_relations[:8]:
                parts.append(f"- {r['from']} --{r['relation']}--> {r['to']}")
        if self.episodic_history:
            parts.append("\n[相关历史]")
            for h in self.episodic_history[:3]:
                parts.append(f"- [{h['date']}] {h['summary'][:150]}")
        return "\n".join(parts)

class ProductionMemoryAgent:
    """生产级带记忆的 Agent"""
    
    def __init__(
        self,
        llm_client,
        semantic_memory: "SemanticMemoryService",
        graph_memory: "GraphMemory",
        episodic_memory: "EpisodicMemoryService",
        fact_extractor: "FactExtractor",
    ):
        self.llm = llm_client
        self.semantic = semantic_memory
        self.graph = graph_memory
        self.episodic = episodic_memory
        self.extractor = fact_extractor
    
    async def chat(
        self, user_id: str, session_id: str, message: str,
    ) -> str:
        # ─── 同步阶段:检索 + 推理 ───
        
        # 1. 三路并行检索记忆
        memory_ctx = await self._retrieve_memories(user_id, message)
        
        # 2. 组装上下文 + 调用 LLM
        response = await self._generate_response(
            message, memory_ctx
        )
        
        # ─── 异步阶段:更新记忆(不阻塞响应)───
        
        # 3. 后台更新记忆
        asyncio.create_task(
            self._update_memories(
                user_id, session_id, message, response
            )
        )
        
        return response
    
    async def _retrieve_memories(
        self, user_id: str, query: str,
    ) -> MemoryContext:
        """三路并行检索(总耗时 ≈ 最慢的那一路)"""
        semantic_task = self.semantic.recall(user_id, query, limit=5)
        graph_task = asyncio.to_thread(
            self.graph.get_neighbors_for_query, query
        )
        episodic_task = self.episodic.search_by_keyword(
            user_id, query, limit=3
        )
        
        facts, relations, history = await asyncio.gather(
            semantic_task, graph_task, episodic_task
        )
        
        return MemoryContext(
            semantic_facts=facts,
            graph_relations=relations,
            episodic_history=history,
        )
    
    async def _update_memories(
        self, user_id: str, session_id: str,
        message: str, response: str,
    ):
        """异步更新记忆(后台执行)"""
        # ① 记录事件流(总是执行)
        await self.episodic.add_event(
            session_id, "user", message
        )
        await self.episodic.add_event(
            session_id, "assistant", response
        )
        
        # ② 条件触发事实抽取
        if self._should_extract(message):
            conversation = [
                {"role": "user", "content": message},
                {"role": "assistant", "content": response},
            ]
            result = await self.extractor.extract(conversation)
            if result.has_new_info:
                for fact in result.facts:
                    await self.semantic.store_fact_safe(user_id, fact)
    
    def _should_extract(self, message: str) -> bool:
        """规则预判:是否需要触发事实抽取"""
        triggers = [
            "我叫", "我是", "我的项目", "我们用", "我们选",
            "我喜欢", "我偏好", "我不想", "已经换成", "迁移到",
        ]
        return any(t in message for t in triggers)

各层的延迟预算:

阶段目标延迟是否阻塞用户说明
记忆检索< 50ms✅ 阻塞三路并行,取决于最慢那路
上下文组装< 5ms✅ 阻塞纯 CPU 操作
LLM 推理1-5s✅ 阻塞(流式)TTFT 通常 0.5-1s
事件记录< 10ms❌ 异步写数据库
事实抽取200-500ms❌ 异步调 LLM,后台执行
图谱更新100-300ms❌ 异步抽取 + 写入

💡 "同步检索、异步更新"是记忆系统的黄金法则——用户感知的延迟只有检索层的 50ms,记忆更新的 500ms 完全在后台完成。千万不要在返回响应之前做事实抽取和图谱更新,那会让每次回答多等半秒。

10.2 多用户隔离与权限控制

记忆系统是用户级别最敏感的数据——A 用户的记忆绝对不能泄漏给 B 用户。这不是"可选的安全特性",而是"出了事故就是 P0 级别的信任危机"。

多用户隔离的三层防护:

  ┌─────────────────────────────────────────────────────────┐
  │  Layer 1:数据层隔离(基础 ✓)                          │
  │  ──────────────────────────────────────                 │
  │  所有记忆表都带 user_id 字段                            │
  │  所有 SQL 查询都必须带 WHERE user_id = :uid             │
  │  建索引时 user_id 作为第一个字段                         │
  │                                                         │
  │  风险点:忘记加 WHERE 条件 → 全量泄漏                   │
  │  防护:用 PostgreSQL RLS(行级安全策略)兜底             │
  ├─────────────────────────────────────────────────────────┤
  │  Layer 2:应用层隔离(推荐 ✓)                          │
  │  ──────────────────────────────────────                 │
  │  记忆服务的每个方法都强制要求 user_id 参数               │
  │  不提供"查询所有用户记忆"的 API                         │
  │  日志中脱敏 user_id(只显示 hash 前 8 位)              │
  ├─────────────────────────────────────────────────────────┤
  │  Layer 3:网络层隔离(企业级)                          │
  │  ──────────────────────────────────────                 │
  │  不同租户的记忆存在不同的数据库实例                      │
  │  向量数据库按租户分 Collection / Namespace               │
  │  适用于 SaaS 产品的多租户场景                           │
  └─────────────────────────────────────────────────────────┘

用 PostgreSQL RLS(行级安全策略)做兜底隔离——即使代码写漏了 WHERE 条件,数据库层面也不会泄漏:

sql
-- 启用行级安全策略
ALTER TABLE semantic_memories ENABLE ROW LEVEL SECURITY;
ALTER TABLE events ENABLE ROW LEVEL SECURITY;
ALTER TABLE sessions ENABLE ROW LEVEL SECURITY;

-- 策略:只能访问自己的数据
-- 应用连接时通过 SET app.current_user_id 传递用户身份
CREATE POLICY user_isolation_policy ON semantic_memories
    USING (user_id = current_setting('app.current_user_id'));

CREATE POLICY user_isolation_policy ON events
    USING (session_id IN (
        SELECT id FROM sessions 
        WHERE user_id = current_setting('app.current_user_id')
    ));

CREATE POLICY user_isolation_policy ON sessions
    USING (user_id = current_setting('app.current_user_id'));

在应用层强制隔离的代码模式:

python
from contextlib import asynccontextmanager

@asynccontextmanager
async def user_scoped_db(db: AsyncSession, user_id: str):
    """设置当前请求的用户上下文(配合 RLS)"""
    await db.execute(
        text("SET app.current_user_id = :uid"),
        {"uid": user_id},
    )
    try:
        yield db
    finally:
        await db.execute(text("RESET app.current_user_id"))

# 使用方式
async def get_user_memories(db: AsyncSession, user_id: str):
    async with user_scoped_db(db, user_id) as scoped_db:
        # 即使这里忘记写 WHERE user_id = ...
        # RLS 也会自动过滤,只返回该用户的数据
        result = await scoped_db.execute(
            text("SELECT * FROM semantic_memories")
        )
        return result.fetchall()

不同部署模式下的隔离策略选择:

场景推荐隔离方式说明
单用户/个人项目无需特殊处理只有一个用户
多用户 SaaSRLS + 应用层隔离成本最低的生产级方案
企业多租户独立数据库实例数据物理隔离,合规要求最高
开放平台Namespace 隔离 + API Key向量库按 namespace,PG 按 schema

💡 RLS 是"保险丝",不是"主开关"——你的代码中仍然应该在每条 SQL 中显式写 WHERE user_id = :uid,RLS 只是防止你忘记写的时候出现数据泄漏。两道防线一起用,才能真正做到万无一失。

10.3 隐私合规:记忆数据的脱敏与生命周期管理

记忆系统天然会存储用户的个人信息——姓名、公司、技术偏好、项目细节。这在 GDPR / 个人信息保护法下是高风险数据处理行为,必须在设计阶段就考虑合规。

记忆数据的隐私风险矩阵:

  ┌──────────────────────────────────────────────────────┐
  │  高风险(必须处理)                                    │
  │  ├── 真实姓名、手机号、邮箱                           │
  │  ├── 公司名称、项目代号(商业机密)                   │
  │  ├── 密码、API Key(对话中可能出现)                  │
  │  └── 健康、财务等敏感类别信息                         │
  ├──────────────────────────────────────────────────────┤
  │  中风险(建议处理)                                    │
  │  ├── 技术栈偏好(可间接识别身份)                     │
  │  ├── 工作经历描述                                     │
  │  └── 具体的代码片段(可能含业务逻辑)                 │
  ├──────────────────────────────────────────────────────┤
  │  低风险(可直接存储)                                  │
  │  ├── 通用技术偏好("喜欢 Python")                   │
  │  ├── 抽象化的项目类型("电商后端")                   │
  │  └── 编程风格偏好                                     │
  └──────────────────────────────────────────────────────┘

关键合规要求及实现方案:

合规要求法规来源实现方案
知情同意GDPR Art.6 / 个保法 Art.13首次使用时弹窗告知"我们会记住你的偏好"
数据最小化GDPR Art.5事实抽取 Prompt 中明确"不提取敏感信息"
删除权GDPR Art.17提供"删除我的所有记忆" API
可携带权GDPR Art.20提供"导出我的记忆" API
存储期限个保法 Art.19设置记忆 TTL,到期自动清理

敏感信息过滤——在事实抽取阶段拦截:

python
import re

class SensitiveFilter:
    """敏感信息过滤器——在记忆写入前拦截"""
    
    # 正则匹配敏感信息
    PATTERNS = {
        "phone": re.compile(r"1[3-9]\d{9}"),
        "email": re.compile(r"\S+@\S+\.\S+"),
        "id_card": re.compile(r"\d{17}[\dXx]"),
        "api_key": re.compile(
            r"(sk-|ak_|secret_|password[=:])\S{8,}"
        ),
        "bank_card": re.compile(r"\d{16,19}"),
    }
    
    # 不应该存储的事实类别
    BLOCKED_CATEGORIES = {
        "health", "finance", "password", "credential",
    }
    
    def filter_fact(self, fact: "ExtractedFact") -> bool:
        """返回 True 表示安全,False 表示应拦截"""
        # 1. 类别黑名单
        if fact.category in self.BLOCKED_CATEGORIES:
            return False
        
        # 2. 正则检测敏感内容
        for name, pattern in self.PATTERNS.items():
            if pattern.search(fact.value):
                return False
            if pattern.search(fact.source_quote):
                return False
        
        return True
    
    def redact_text(self, text: str) -> str:
        """脱敏处理:将敏感信息替换为占位符"""
        for name, pattern in self.PATTERNS.items():
            text = pattern.sub(f"[已脱敏-{name}]", text)
        return text

用户删除权的实现——"忘记我"功能:

python
async def forget_user(
    db: AsyncSession, user_id: str,
    scope: str = "all",  # all / semantic / episodic / graph
):
    """删除用户的记忆数据(GDPR 删除权)"""
    if scope in ("all", "semantic"):
        await db.execute(text(
            "DELETE FROM semantic_memories WHERE user_id = :uid"
        ), {"uid": user_id})
    
    if scope in ("all", "episodic"):
        # 先删事件,再删会话(外键约束)
        await db.execute(text("""
            DELETE FROM events WHERE session_id IN (
                SELECT id FROM sessions WHERE user_id = :uid
            )
        """), {"uid": user_id})
        await db.execute(text(
            "DELETE FROM sessions WHERE user_id = :uid"
        ), {"uid": user_id})
    
    if scope in ("all", "graph"):
        await db.execute(text(
            "DELETE FROM user_graphs WHERE user_id = :uid"
        ), {"uid": user_id})
    
    await db.commit()
    
    # 记录审计日志(删除操作本身需要留痕)
    await db.execute(text("""
        INSERT INTO audit_logs (user_id, action, scope, created_at)
        VALUES (:uid, 'memory_deletion', :scope, NOW())
    """), {"uid": user_id, "scope": scope})
    await db.commit()

💡 在事实抽取的 Prompt 中就应该声明"不要提取手机号、邮箱、密码等敏感信息"——这是第一道防线。SensitiveFilter 是第二道防线(正则兜底)。两层过滤后写入的记忆,基本不会包含高风险个人信息。但一定要提供"删除我的记忆"功能——这不是可选的,是法律要求的。

10.4 性能优化:异步写入、批量嵌入、缓存预热

记忆系统的性能瓶颈集中在三个地方:Embedding 生成(每次 API 调用 30-80ms)、向量写入(含索引更新)、图记忆加载(从数据库反序列化)。逐一优化:

三大性能瓶颈及优化策略:

  ① Embedding 生成 ── 单次 30-80ms,高频场景成为瓶颈
     → 解法:批量嵌入 + 本地缓存

  ② 向量写入 ── 含索引更新,HNSW 重建耗时
     → 解法:异步队列 + 批量写入

  ③ 图记忆加载 ── 每次请求从 DB 加载 JSON 反序列化
     → 解法:用户级内存缓存 + 预热

优化一:批量 Embedding——减少 API 调用次数

python
from openai import AsyncOpenAI

class BatchEmbedder:
    """批量 Embedding 服务——合并多次请求为一次 API 调用"""
    
    def __init__(self, client: AsyncOpenAI, batch_size: int = 20):
        self.client = client
        self.batch_size = batch_size
        self._cache: dict[str, list[float]] = {}  # 简易缓存
    
    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """批量生成 Embedding(单次 API 调用)"""
        # 1. 检查缓存
        uncached = [t for t in texts if t not in self._cache]
        
        if uncached:
            # 2. 批量调用(最多 2048 条/次)
            for i in range(0, len(uncached), self.batch_size):
                batch = uncached[i:i + self.batch_size]
                response = await self.client.embeddings.create(
                    model="text-embedding-3-small",
                    input=batch,
                )
                for text_input, data in zip(batch, response.data):
                    self._cache[text_input] = data.embedding
        
        # 3. 从缓存返回
        return [self._cache[t] for t in texts]
    
    async def embed_single(self, text: str) -> list[float]:
        """单条 Embedding(优先走缓存)"""
        results = await self.embed_batch([text])
        return results[0]

# 使用:一次性嵌入 10 条事实,只调 1 次 API
embedder = BatchEmbedder(client)
facts = ["用户偏好 PG", "项目是电商", "团队 3 人", ...]
vectors = await embedder.embed_batch(facts)
# → 1 次 API 调用(而非 10 次),节省 90% 延迟

优化二:异步写入队列——记忆更新不阻塞对话

python
import asyncio
from collections import deque

class MemoryWriteQueue:
    """异步记忆写入队列——收集 + 批量刷写"""
    
    def __init__(
        self,
        memory_service: "SemanticMemoryService",
        embedder: BatchEmbedder,
        flush_interval: float = 2.0,  # 每 2 秒刷写一次
        max_batch: int = 20,           # 或积累 20 条就刷写
    ):
        self.memory = memory_service
        self.embedder = embedder
        self.flush_interval = flush_interval
        self.max_batch = max_batch
        self._queue: deque = deque()
        self._running = False
    
    async def enqueue(self, user_id: str, fact: "ExtractedFact"):
        """入队(立即返回,不阻塞)"""
        self._queue.append((user_id, fact))
        
        # 积累够了立即刷写
        if len(self._queue) >= self.max_batch:
            await self._flush()
    
    async def start(self):
        """启动后台刷写循环"""
        self._running = True
        while self._running:
            await asyncio.sleep(self.flush_interval)
            if self._queue:
                await self._flush()
    
    async def _flush(self):
        """批量刷写"""
        batch = []
        while self._queue and len(batch) < self.max_batch:
            batch.append(self._queue.popleft())
        
        if not batch:
            return
        
        # 批量生成 Embedding
        texts = [
            f"{f.category}: {f.key} = {f.value}"
            for _, f in batch
        ]
        vectors = await self.embedder.embed_batch(texts)
        
        # 批量写入数据库
        for (user_id, fact), vec in zip(batch, vectors):
            await self.memory.store_fact_with_vector(
                user_id, fact, vec
            )

优化三:图记忆缓存——避免每次请求都反序列化

python
from functools import lru_cache
import time

class GraphMemoryCache:
    """用户级图记忆缓存"""
    
    def __init__(self, db, ttl_seconds: int = 300):
        self.db = db
        self.ttl = ttl_seconds
        self._cache: dict[str, tuple[GraphMemory, float]] = {}
    
    async def get(self, user_id: str) -> "GraphMemory":
        """获取图记忆(优先走缓存)"""
        now = time.time()
        
        if user_id in self._cache:
            graph, cached_at = self._cache[user_id]
            if now - cached_at < self.ttl:
                return graph  # 缓存命中,< 0.01ms
        
        # 缓存未命中,从数据库加载
        graph = await load_graph(self.db, user_id)  # ~5-20ms
        self._cache[user_id] = (graph, now)
        return graph
    
    def invalidate(self, user_id: str):
        """图谱更新后使缓存失效"""
        self._cache.pop(user_id, None)

各优化手段的效果量化:

优化手段优化前优化后提升幅度
批量 Embedding10 条 × 50ms = 500ms1 次 × 60ms = 60ms8x
异步写入队列写入阻塞 200ms入队 < 1ms200x(用户感知)
图缓存每次加载 15ms缓存命中 0.01ms1500x
三项组合记忆更新总耗时 ~700ms用户感知 < 1ms对话体验零影响

💡 性能优化的优先级:异步化 > 批量化 > 缓存——先确保记忆更新不阻塞用户(异步化解决 90% 的体验问题),再用批量化降低 API 成本,最后用缓存优化高频读取路径。不要在记忆系统上过度优化——对话的主要延迟来自 LLM 推理(1-5s),记忆检索的 50ms 相比之下微不足道。

10.5 设计清单:记忆系统上线前的 Checklist

把前面所有章节的关键点浓缩为一份上线前检查清单——发版前逐项过一遍,确保没有遗漏:

═══════════════════════════════════════════════════════
  记忆系统上线 Checklist
═══════════════════════════════════════════════════════

  ─── 基础能力 ───

  □ 短期记忆:对话超过 N 轮后自动摘要压缩
  □ 短期记忆:Checkpointer 配置为持久化存储(PG/SQLite)
  □ Token 预算:各模块有明确的 Token 分配,输出预留 ≥ 4000
  □ System Prompt + 记忆 + 对话历史的总 Token < 模型上限的 80%

  ─── 长期记忆 ───

  □ 语义记忆:事实抽取 Prompt 明确且有示例
  □ 语义记忆:向量检索阈值已调优(推荐 0.75-0.85)
  □ 冲突解决:新旧事实冲突时有明确的处理策略
  □ 图记忆(如需要):三元组抽取 + NetworkX 存储可正常工作
  □ 情景记忆(如需要):sessions + events 表已建好索引

  ─── 数据安全 ───

  □ 多用户隔离:所有查询都带 user_id 过滤
  □ RLS 策略已启用(防止代码遗漏导致数据泄漏)
  □ 敏感信息过滤:手机号/邮箱/密码/API Key 不会被存入记忆
  □ 用户删除权:"忘记我" API 已实现并测试
  □ 审计日志:记忆的增删改都有日志可追溯

  ─── 性能 ───

  □ 记忆检索延迟 P99 < 100ms(向量搜索 + 图遍历)
  □ 记忆更新是异步的,不阻塞用户响应
  □ Embedding 生成用批量 API(而非逐条调用)
  □ 图记忆有用户级缓存(避免每次反序列化)

  ─── 质量保障 ───

  □ 记忆回归测试集:≥ 30 个 case,P0 全部通过
  □ 知识更新测试:用户改了偏好后,记忆能正确更新
  □ 拒绝回答测试:不编造记忆中不存在的信息
  □ 线上监控:命中率、过时率、检索延迟有看板和告警
  □ 过时记忆率告警阈值:> 5% 触发排查

  ─── 运维 ───

  □ 定时去重任务已配置(每天凌晨)
  □ 过期记忆清理任务已配置(按 category 差异化 TTL)
  □ 渐进式摘要(如需要):30 天以上的会话自动归档
  □ 数据库备份策略覆盖记忆相关的所有表
  □ 容量预估:单用户记忆上限 + 总存储预算已规划

═══════════════════════════════════════════════════════

按项目阶段的建议实施路径:

渐进式实施路线图:

  Phase 1:MVP(1-2 天)
  ├── 实现滑动窗口 + 对话摘要压缩(第 3 章)
  ├── 用 LangGraph Checkpointer 持久化会话状态
  └── 效果:单会话内的上下文连续性 ✓

  Phase 2:基础记忆(3-5 天)
  ├── 加入语义记忆:事实抽取 + pgvector 存储(第 5 章)
  ├── 加入 Mem0 或自建 SemanticMemoryService
  └── 效果:跨会话记住用户画像 ✓

  Phase 3:高级能力(1-2 周)
  ├── 加入图记忆:三元组抽取 + 混合检索(第 7 章)
  ├── 加入记忆管理:去重 + 遗忘 + 反思循环(第 6 章)
  ├── 加入情景记忆:事件流 + 会话回溯(第 4 章)
  └── 效果:关系推理 + 记忆自维护 ✓

  Phase 4:生产加固(持续)
  ├── 多用户隔离 + 隐私合规(第 10 章)
  ├── 性能优化:异步化 + 批量化 + 缓存
  ├── 质量保障:评测 + 监控 + A/B 测试(第 9 章)
  └── 效果:生产级可靠的记忆系统 ✓

💡 不要试图一步到位——Phase 1 只需要 1-2 天,就能让你的 Agent 从"金鱼记忆"升级到"至少能记住这次对话"。Phase 2 再花 3-5 天,就能实现跨会话的用户画像。大多数 Agent 到 Phase 2 就够用了,Phase 3-4 等业务需求驱动再加。

第 10 章核心知识回顾:

概念一句话解释
端到端架构同步检索(< 50ms)→ LLM 推理 → 异步更新(后台 500ms)
黄金法则同步检索、异步更新——记忆更新永远不阻塞用户响应
用户隔离RLS 兜底 + 应用层 WHERE user_id,双重防线
隐私合规Prompt 声明 + 正则过滤 + 删除权 API,三层防护
性能优化异步化 > 批量化 > 缓存,优先级从高到低
实施路径MVP(1天)→ 基础记忆(3天)→ 高级能力(1周)→ 生产加固

坚持是一种品格