AI Agent 记忆系统设计
从无状态到有记忆——短期记忆、长期记忆、情景记忆的工程实现:对话压缩、向量检索记忆、图记忆、Mem0/Letta/LangGraph 三大框架实战,让你的 Agent 真正"记住"用户。
1. 为什么 Agent 需要记忆系统
1.1 LLM 的"金鱼记忆":无状态的本质问题
如果你用过 ChatGPT,一定有过这样的体验:
第 1 次对话:
你:我叫张三,在北京做后端开发,主要用 Python
AI:你好张三!Python 后端开发是个很好的方向...
第 2 次对话(新开一个窗口):
你:帮我优化一下这段代码
AI:好的,请问你使用的是什么编程语言? ← 它忘了你是谁这不是 ChatGPT 的 Bug——这是 LLM 的本质特性。每一次 API 调用,模型都是"重新出生"的:
LLM 的每次调用都是独立的:
调用 1 ──→ [ LLM ] ──→ 回答 1
↑ 无状态
调用 2 ──→ [ LLM ] ──→ 回答 2
↑ 无状态
调用 3 ──→ [ LLM ] ──→ 回答 3
三次调用之间没有任何共享状态
模型不知道"上一次你问了什么"
每次都是全新的推理——彻底的金鱼记忆从技术角度看,LLM API 就是一个纯函数:
# LLM API 的本质:f(messages) → response
# 没有内部状态,没有副作用,没有记忆
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "帮我优化这段代码"},
],
# ← 这里没有 memory 参数
# ← 没有 user_id 参数
# ← 没有 session_id 参数
# 模型能看到的,只有这个 messages 列表
)所以,你在 ChatGPT 里感受到的"它记住了我说的话",并不是模型本身有记忆——是应用层把你之前的对话全部塞进了 messages 列表,每次都重新发送给模型。这就是所谓的"上下文窗口"方案。
但这个方案有严重的局限性——这正是下一节要讨论的问题。
1.2 上下文窗口 ≠ 记忆:塞满 Token 的代价
"既然模型没有记忆,那把所有历史对话都塞进 messages 不就行了?" 这就是最朴素的"上下文窗口"方案——也是大多数 AI 应用的第一版实现。
但它有三个致命代价:
上下文窗口方案的三大代价:
┌─────────────────────────────────────────────────────┐
│ 代价一:Token 成本指数级增长 │
│ │
│ 第 1 轮:发送 200 tokens → $0.003 │
│ 第 5 轮:发送 2,000 tokens → $0.030 │
│ 第 20 轮:发送 15,000 tokens → $0.225 │
│ 第 50 轮:发送 50,000 tokens → $0.750 │
│ │
│ 同一个用户聊 50 轮,仅 input 就花了 $1+ │
│ 日活 1 万用户 × 50 轮 = $10,000/天 ← 不可承受 │
├─────────────────────────────────────────────────────┤
│ 代价二:延迟随对话轮数线性增长 │
│ │
│ tokens 数量 → TTFT(首 Token 延迟) │
│ 2,000 tokens → ~0.8s │
│ 10,000 tokens → ~2.5s │
│ 50,000 tokens → ~8.0s │
│ 128,000 tokens → ~20s │
│ │
│ 聊得越久,用户等得越久——体验急剧恶化 │
├─────────────────────────────────────────────────────┤
│ 代价三:注意力稀释(Lost in the Middle) │
│ │
│ 上下文太长时,模型会"遗忘"中间部分的信息 │
│ 只关注开头(System Prompt)和末尾(最近几轮) │
│ 第 3 轮说的重要需求?到第 20 轮已经被模型忽略了 │
└─────────────────────────────────────────────────────┘用一组对比数据来量化这三个问题:
| 对话轮数 | Input Tokens | 单次费用 (GPT-4o) | TTFT 延迟 | 中间信息召回率 |
|---|---|---|---|---|
| 5 轮 | ~2,000 | $0.03 | 0.8s | ~95% |
| 20 轮 | ~15,000 | $0.23 | 3.5s | ~70% |
| 50 轮 | ~50,000 | $0.75 | 8.0s | ~45% |
| 100 轮 | ~120,000 | $1.80 | 20s+ | ~30% |
💡 "Lost in the Middle"是经过论文验证的现象——斯坦福 2023 年的研究表明,当上下文超过 ~4K tokens 时,LLM 对中间位置信息的准确率显著下降。简单地"把所有对话塞进去",不仅浪费钱,模型还记不住。
这三个代价共同指向一个结论:上下文窗口是"工作台",不是"记忆"。就像你的电脑桌面——你可以在上面同时打开 20 个文件,但那不意味着你"记住"了所有文件的内容。
上下文窗口 ←→ 工作台 / RAM(临时、有限、昂贵)
记忆系统 ←→ 笔记本 / 磁盘(持久、分层、可检索)
正确的做法不是把所有东西堆在桌面上
而是建立一个智能的"笔记系统"——
需要的时候翻出来,不需要的放回去1.3 有记忆 vs 无记忆 Agent:一个对话案例的对比
理论讲完了,用一个真实的对话场景来直观感受差距。
假设用户"李明"在使用一个 AI 编程助手,分 3 天进行了多次对话:
═══════════════════════════════════════════════════════════
第 1 天:李明初次使用
═══════════════════════════════════════════════════════════
李明:我在做一个电商后端项目,用 FastAPI + PostgreSQL
┌─── 无记忆 Agent ──────────────────────────────────────┐
│ "好的,FastAPI + PostgreSQL 是很好的技术栈..." │
└───────────────────────────────────────────────────────┘
┌─── 有记忆 Agent ──────────────────────────────────────┐
│ "好的,FastAPI + PostgreSQL 是很好的技术栈..." │
│ [记忆写入] 用户:李明,项目:电商后端,技术栈:FastAPI+PG │
└───────────────────────────────────────────────────────┘
═══════════════════════════════════════════════════════════
第 2 天:李明回来提了个新需求(全新会话)
═══════════════════════════════════════════════════════════
李明:帮我设计一个订单表
┌─── 无记忆 Agent ──────────────────────────────────────┐
│ "好的,请问你使用的是什么数据库?MySQL 还是 │
│ PostgreSQL?你的项目是什么类型的?" │
│ ← 完全不记得昨天的对话,一切从头开始 │
└───────────────────────────────────────────────────────┘
┌─── 有记忆 Agent ──────────────────────────────────────┐
│ "好的!基于你的 FastAPI + PostgreSQL 电商项目, │
│ 我来设计订单表。考虑到 PG 的优势,我会用 JSONB │
│ 来存储订单的扩展属性..." │
│ ← 自动召回项目上下文,直接给出精准方案 │
└───────────────────────────────────────────────────────┘
═══════════════════════════════════════════════════════════
第 3 天:李明遇到了性能问题(又一个全新会话)
═══════════════════════════════════════════════════════════
李明:订单查询特别慢,怎么优化?
┌─── 无记忆 Agent ──────────────────────────────────────┐
│ "请提供你的表结构、查询语句和数据库类型, │
│ 我才能帮你分析..." │
│ ← 需要用户重新提供所有背景信息 │
└───────────────────────────────────────────────────────┘
┌─── 有记忆 Agent ──────────────────────────────────────┐
│ "你的电商项目之前用 PG 设计了订单表(含 JSONB 字段)。 │
│ 查询慢大概率是这几个原因: │
│ 1. JSONB 字段没建 GIN 索引 │
│ 2. 订单表数据量大了需要分区 │
│ 来,把慢查询的 EXPLAIN 贴给我看看..." │
│ ← 结合历史记忆,直接定位到可能原因 │
└───────────────────────────────────────────────────────┘差距一目了然。有记忆的 Agent 带来了三个关键能力:
记忆系统为 Agent 带来的三大能力:
① 连续性 ─── 跨会话保持上下文
│ 不需要用户每次重新介绍自己和项目
│
② 个性化 ─── 积累用户画像
│ 了解用户的技术栈、偏好、项目背景
│ 回答越来越精准、越来越"懂你"
│
③ 学习能力 ── 从交互中成长
记住用户之前踩过的坑、做过的决定
避免重复建议、提供递进式帮助💡 记忆系统是 Agent 从"工具"进化为"助手"的分水岭——没有记忆的 Agent 永远是一个"每次见面都要重新自我介绍"的陌生人;有记忆的 Agent 才是一个真正了解你、持续成长的伙伴。
第 1 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 无状态本质 | LLM API 是纯函数,每次调用独立,没有内置记忆 |
| 上下文窗口 | 把历史对话塞进 messages——成本高、延迟大、注意力稀释 |
| Lost in the Middle | 上下文过长时模型遗忘中间信息的现象 |
| 记忆的价值 | 连续性(跨会话)+ 个性化(用户画像)+ 学习能力 |
| 核心结论 | 上下文窗口 = RAM(临时),记忆系统 = 磁盘(持久可检索) |
2. 记忆系统的分类与架构总览
2.1 人类记忆的启示:短期 → 情景 → 语义
AI Agent 的记忆系统设计,灵感直接来自认知心理学对人类记忆的分类。理解人类记忆的工作方式,能帮你设计出更合理的 Agent 记忆架构。
心理学将人类记忆分为三种类型:
人类记忆的三层模型(Atkinson-Shiffrin):
┌─────────────────────────────────────────────────────────┐
│ 感觉记忆(Sensory Memory) │
│ ─── 持续时间:毫秒级 │
│ ─── 容量:大但转瞬即逝 │
│ ─── 功能:接收原始感觉输入(视觉、听觉) │
│ ─── 类比:Agent 收到的原始用户输入 │
└──────────────────────┬──────────────────────────────────┘
│ 注意力筛选
▼
┌─────────────────────────────────────────────────────────┐
│ 短期记忆 / 工作记忆(Working Memory) │
│ ─── 持续时间:秒到分钟 │
│ ─── 容量:7 ± 2 个信息块(Miller 定律) │
│ ─── 功能:当前正在处理的信息、推理的"工作台" │
│ ─── 类比:Agent 的上下文窗口(Context Window) │
└──────────────────────┬──────────────────────────────────┘
│ 编码 & 整合
▼
┌─────────────────────────────────────────────────────────┐
│ 长期记忆(Long-term Memory) │
│ ├── 情景记忆(Episodic)── "我经历了什么" │
│ │ └── 记住具体事件:时间、地点、上下文 │
│ │ │
│ └── 语义记忆(Semantic)── "我知道什么" │
│ └── 抽象知识:事实、规律、概念 │
│ │
│ ─── 持续时间:分钟到终身 │
│ ─── 容量:理论上无限 │
│ ─── 类比:Agent 的持久化存储(数据库 / 向量库) │
└─────────────────────────────────────────────────────────┘三种记忆的关键区别在于保持时间、容量和用途:
| 维度 | 工作记忆 | 情景记忆 | 语义记忆 |
|---|---|---|---|
| 持续时间 | 秒级 | 长期 | 长期 |
| 内容 | 当前正在处理的信息 | 具体经历和事件 | 提炼后的知识和事实 |
| 示例 | "用户刚才说要用 Python" | "上周三讨论了数据库选型" | "这个用户偏好 PostgreSQL" |
| 检索方式 | 直接可用(在上下文中) | 按时间/情境回忆 | 按语义关联检索 |
| 遗忘方式 | 被新信息挤出 | 随时间模糊但不消失 | 被新知识更新替代 |
💡 情景记忆回答"发生了什么",语义记忆回答"我知道什么"——你记得"上个月和同事讨论了技术选型"(情景),也记得"PostgreSQL 比 MySQL 更适合复杂查询"(语义)。前者是经历,后者是从经历中提炼出的知识。Agent 记忆系统要同时支持这两种能力。
2.2 Agent 记忆的三层架构:Working / Episodic / Semantic
把人类记忆的三层模型映射到 Agent 工程实现,就得到了当前业界标准的三层记忆架构。这里用一个类比来帮你理解——Agent 的记忆系统就像操作系统的存储层级:
Agent 记忆 ←→ 操作系统存储层级:
┌──────────────────────────────────────────────────────┐
│ Working Memory(工作记忆) │
│ ──────────────────────────────────────────────────── │
│ OS 类比: RAM(内存) │
│ 实现方式: LLM 的 Context Window │
│ 存储内容: 当前对话历史、System Prompt、临时变量 │
│ 生命周期: 单次会话内 │
│ 访问速度: 即时(已在上下文中) │
│ 容量限制: 受 Token 上限约束(128K / 200K) │
│ 管理策略: 滑动窗口、摘要压缩、trim_messages │
├──────────────────────────────────────────────────────┤
│ Episodic Memory(情景记忆) │
│ ──────────────────────────────────────────────────── │
│ OS 类比: 磁盘缓存 / 最近文件列表 │
│ 实现方式: 数据库(PostgreSQL / SQLite) │
│ 存储内容: 完整的历史会话记录、事件流、交互日志 │
│ 生命周期: 跨会话持久化 │
│ 访问速度: 需要检索(5-50ms) │
│ 容量限制: 存储空间充裕,但需要索引 │
│ 管理策略: 时间过滤、关键词搜索、语义搜索 │
├──────────────────────────────────────────────────────┤
│ Semantic Memory(语义记忆) │
│ ──────────────────────────────────────────────────── │
│ OS 类比: 持久存储 / 知识库 │
│ 实现方式: 向量数据库 + 知识图谱 │
│ 存储内容: 用户画像、提炼的事实、偏好、经验规律 │
│ 生命周期: 长期持久化(可更新、可遗忘) │
│ 访问速度: 需要向量检索(10-100ms) │
│ 容量限制: 理论无限,但需要清理和压缩 │
│ 管理策略: 事实抽取、冲突解决、时间衰减 │
└──────────────────────────────────────────────────────┘三层记忆如何协同工作?看一个完整的请求处理流程:
用户发送消息:"帮我优化订单查询的性能"
│
▼
① 检索 Semantic Memory(语义记忆)
│ → 查到:用户偏好 PostgreSQL、项目是电商系统
│ → 查到:之前推荐过 JSONB 存储方案
│
▼
② 检索 Episodic Memory(情景记忆)
│ → 查到:3 天前讨论过订单表设计(含具体 DDL)
│ → 查到:上周提到数据量约 500 万行
│
▼
③ 组装 Working Memory(工作记忆)
│ → System Prompt + 召回的记忆 + 最近 5 轮对话
│ → 注入到 Context Window,调用 LLM
│
▼
④ LLM 生成回答
│ → "基于你之前的 PG 订单表(JSONB 字段+500 万行)..."
│
▼
⑤ 记忆更新(后台异步)
→ Episodic:记录本次交互
→ Semantic:无新事实,跳过💡 三层记忆不是互相替代,而是协同互补——Working Memory 提供即时上下文,Episodic Memory 提供历史经历,Semantic Memory 提供提炼的知识。一个完整的记忆系统需要三者的有机配合。
2.3 记忆系统架构全景图:存储、检索、更新、遗忘
理解了三层记忆结构后,我们来看完整的记忆系统由哪些模块组成。一个生产级的 Agent 记忆系统包含四大核心能力:
记忆系统的四大核心模块:
┌───────────────────────────────────────────────────────┐
│ 用户请求进入 │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ ▼ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ ① 记忆检索 │ │ │
│ │ │ (Retrieval) │ │ │
│ │ │ │ │ │
│ │ │ 语义搜索 │ │ │
│ │ │ 时间过滤 │ │ │
│ │ │ 图谱遍历 │ │ │
│ │ └────────┬────────┘ │ │
│ │ ▼ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ LLM 推理 │ │ │
│ │ │ (带记忆上下文) │ │ │
│ │ └────────┬────────┘ │ │
│ │ ▼ │ │
│ │ ┌────────────────┼────────────────┐ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ② 记忆存储 ③ 记忆更新 ④ 记忆遗忘│ │
│ │ (Storage) (Update) (Forget) │ │
│ │ │ │
│ │ 原始交互归档 事实抽取 时间衰减 │ │
│ │ Embedding 入库 冲突合并 容量淘汰 │ │
│ │ 图谱节点写入 偏好更新 主动删除 │ │
│ └───────────────────────────────────────┘ │
└───────────────────────────────────────────────────────┘四大模块各自的职责和技术选型:
| 模块 | 职责 | 输入 | 输出 | 典型技术 |
|---|---|---|---|---|
| ① 检索 | 找到与当前请求相关的历史记忆 | 用户 query + user_id | 记忆片段列表 | 向量搜索、BM25、图查询 |
| ② 存储 | 将新的交互和知识持久化 | 对话记录、抽取的事实 | 写入数据库 | PG、Redis、Milvus、Neo4j |
| ③ 更新 | 处理新旧知识冲突,保持记忆一致性 | 新事实 vs 旧事实 | 更新/合并操作 | LLM 判断 + CRUD |
| ④ 遗忘 | 清理过时、冗余、不重要的记忆 | 全量记忆 + 衰减规则 | 删除/归档操作 | TTL、LRU、衰减函数 |
不同技术栈在三层记忆中的分工:
技术栈与记忆层级的映射:
Working Memory
├── LangGraph Checkpointer(会话状态持久化)
├── Redis(高速状态缓存)
└── trim_messages / RemoveMessage(上下文裁剪)
Episodic Memory
├── PostgreSQL / SQLite(结构化事件流存储)
├── Elasticsearch(全文搜索历史对话)
└── LangGraph Store(跨线程持久化)
Semantic Memory
├── pgvector / Milvus / Qdrant(向量检索)
├── Neo4j / NetworkX(知识图谱)
├── Mem0(一站式记忆中间件)
└── Letta(虚拟上下文管理)💡 不需要一步到位——大多数 Agent 应用从 Working Memory(第 3 章)开始就够用了。当你发现用户反复被要求"重新介绍自己"时,再加 Semantic Memory(第 5 章)。当你需要回溯"上次做了什么决定"时,再加 Episodic Memory(第 4 章)。渐进式地构建记忆系统,而不是一开始就搭建全套架构。
第 2 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 三层记忆 | Working(当前上下文)+ Episodic(历史经历)+ Semantic(提炼知识) |
| OS 类比 | Working = RAM,Episodic = 磁盘缓存,Semantic = 持久存储 |
| 四大模块 | 存储、检索、更新、遗忘——记忆系统的完整生命周期 |
| 协同流程 | 先检索语义→再检索情景→组装上下文→LLM 推理→异步更新记忆 |
| 渐进式构建 | 从 Working Memory 开始,按需逐步添加 Episodic 和 Semantic |
3. 短期记忆(Working Memory):管理上下文窗口
短期记忆是最基础、也是必须最先解决的记忆层。它的核心问题只有一个:如何在有限的上下文窗口里,保留最有价值的信息。
3.1 最简方案:滑动窗口与消息裁剪
最直观的想法:对话太长了?删掉前面的旧消息,只保留最近 N 轮。这就是"滑动窗口"策略。
滑动窗口的工作方式:
对话进行到第 10 轮,窗口大小 = 5:
┌──────────────────────────────────────┐
│ 消息 1 ← 已丢弃 │
│ 消息 2 ← 已丢弃 │
│ 消息 3 ← 已丢弃 │
│ 消息 4 ← 已丢弃 │
│ 消息 5 ← 已丢弃 │
├──────────────────────────────────────┤
│ 消息 6 ← 保留 ┐ │
│ 消息 7 ← 保留 │ │
│ 消息 8 ← 保留 │ 发送给 LLM │
│ 消息 9 ← 保留 │ │
│ 消息 10 ← 保留 ┘ │
└──────────────────────────────────────┘
简单粗暴,但会丢失早期的重要信息有三种裁剪策略,复杂度递增:
策略一:按轮数裁剪(最简单)
def trim_by_turns(messages: list[dict], max_turns: int = 10) -> list[dict]:
"""保留最近 N 轮对话(1 轮 = 1 个 user + 1 个 assistant)"""
# 始终保留 system message
system_msgs = [m for m in messages if m["role"] == "system"]
other_msgs = [m for m in messages if m["role"] != "system"]
# 每轮 2 条消息(user + assistant),保留最近 max_turns 轮
keep_count = max_turns * 2
trimmed = other_msgs[-keep_count:] if len(other_msgs) > keep_count else other_msgs
return system_msgs + trimmed
# 使用
messages = trim_by_turns(messages, max_turns=10)
response = client.chat.completions.create(model="gpt-4o", messages=messages)策略二:按 Token 数裁剪(更精确)
import tiktoken
def trim_by_tokens(
messages: list[dict],
max_tokens: int = 8000,
model: str = "gpt-4o",
) -> list[dict]:
"""按 Token 数裁剪,从旧到新删除消息直到不超限"""
encoder = tiktoken.encoding_for_model(model)
# system message 始终保留
system_msgs = [m for m in messages if m["role"] == "system"]
other_msgs = [m for m in messages if m["role"] != "system"]
# 计算 system message 占用的 token
system_tokens = sum(len(encoder.encode(m["content"])) for m in system_msgs)
available_tokens = max_tokens - system_tokens
# 从后往前累计,直到超出预算
kept = []
current_tokens = 0
for msg in reversed(other_msgs):
msg_tokens = len(encoder.encode(msg["content"])) + 4 # 消息头开销
if current_tokens + msg_tokens > available_tokens:
break
kept.insert(0, msg)
current_tokens += msg_tokens
return system_msgs + kept
# 使用:保留最多 8000 tokens 的对话历史
messages = trim_by_tokens(messages, max_tokens=8000)策略三:用 LangChain trim_messages(推荐)
from langchain_core.messages import trim_messages, SystemMessage, HumanMessage
import tiktoken
# LangChain 内置的消息裁剪工具
trimmed = trim_messages(
messages,
max_tokens=8000,
strategy="last", # 保留最近的消息
token_counter=tiktoken.encoding_for_model("gpt-4o").encode,
include_system=True, # 始终保留 system message
start_on="human", # 裁剪后确保第一条是 human 消息
)三种策略的对比:
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 按轮数 | 实现最简单 | 长消息可能超限、短消息浪费空间 | 原型验证 |
| 按 Token | 精确控制上下文大小 | 需要 tiktoken,计算有开销 | 生产环境 |
| trim_messages | 开箱即用、处理边界情况 | 依赖 LangChain | LangChain/LangGraph 项目 |
💡 滑动窗口的致命缺陷是"硬删除"——第 1 轮说的"我叫张三,做电商项目"在第 20 轮就被丢弃了。用户会疑惑"我明明说过了你怎么不记得"。解决这个问题需要下一节的"对话摘要压缩"。
3.2 对话摘要压缩:用 LLM 总结历史对话
滑动窗口的问题是"丢了就丢了"——被删除的消息再也找不回来。对话摘要压缩解决了这个问题:把旧消息"压缩"成一段摘要,保留关键信息但大幅减少 Token 数。
对话摘要压缩的工作流程:
原始对话(20 轮,约 8000 tokens):
┌──────────────────────────────────────┐
│ 消息 1-15(旧消息,约 6000 tokens) │
│ │
│ ↓ 用 LLM 压缩为摘要 │
│ │
│ 摘要(约 300 tokens): │
│ "用户李明,后端开发,做电商项目, │
│ 技术栈 FastAPI+PG,已讨论过订单表 │
│ 设计和 JSONB 字段方案。" │
├──────────────────────────────────────┤
│ 消息 16-20(最近 5 轮,约 2000 tokens)│
└──────────────────────────────────────┘
压缩比:8000 → 2300 tokens(节省 71%)
关键信息:全部保留 ✓核心实现:
from openai import AsyncOpenAI
SUMMARY_PROMPT = """请将以下对话历史压缩为一段简洁的摘要。
要求:
1. 保留所有关键信息:用户身份、项目背景、技术选型、已做的决定
2. 保留具体的数据和数字(表名、字段名、配置值等)
3. 按时间顺序组织,标注每个信息的来源轮次
4. 用第三人称描述("用户提到..."、"已决定...")
5. 控制在 200-400 字以内
对话历史:
{conversation}
请输出摘要:"""
class ConversationSummaryManager:
"""对话摘要管理器"""
def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
self.client = client
self.model = model # 摘要用小模型即可,省钱
self.summary: str = ""
async def maybe_compress(
self,
messages: list[dict],
max_tokens: int = 4000,
keep_recent: int = 6,
) -> list[dict]:
"""当消息超过阈值时,触发摘要压缩"""
# 估算当前 tokens(粗略:1 个中文字符 ≈ 2 tokens)
total_chars = sum(len(m["content"]) for m in messages)
estimated_tokens = total_chars * 2
if estimated_tokens <= max_tokens:
return messages # 未超限,无需压缩
# 分离 system + 旧消息 + 最近消息
system_msgs = [m for m in messages if m["role"] == "system"]
other_msgs = [m for m in messages if m["role"] != "system"]
old_msgs = other_msgs[:-keep_recent]
recent_msgs = other_msgs[-keep_recent:]
if not old_msgs:
return messages # 没有可压缩的旧消息
# 生成摘要
self.summary = await self._summarize(old_msgs)
# 组装新的消息列表:system + 摘要 + 最近消息
summary_msg = {
"role": "system",
"content": f"[对话历史摘要]\n{self.summary}",
}
return system_msgs + [summary_msg] + recent_msgs
async def _summarize(self, messages: list[dict]) -> str:
"""调用 LLM 生成摘要"""
conversation = "\n".join(
f"{m['role']}: {m['content']}" for m in messages
)
response = await self.client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": SUMMARY_PROMPT.format(conversation=conversation),
}],
temperature=0,
max_tokens=500,
)
return response.choices[0].message.content使用示例:
# 在对话循环中使用
summary_manager = ConversationSummaryManager(client)
async def chat(user_message: str, messages: list[dict]) -> str:
messages.append({"role": "user", "content": user_message})
# 自动判断是否需要压缩
compressed = await summary_manager.maybe_compress(messages)
response = await client.chat.completions.create(
model="gpt-4o",
messages=compressed,
)
reply = response.choices[0].message.content
messages.append({"role": "assistant", "content": reply})
return reply摘要压缩的触发时机有两种策略:
| 触发策略 | 实现方式 | 优点 | 缺点 |
|---|---|---|---|
| Token 阈值 | 消息总 Token 超过阈值时触发 | 精确控制上下文大小 | 需要 Token 计数 |
| 轮数阈值 | 每隔 N 轮触发一次 | 实现简单,开销可预测 | Token 控制不够精确 |
💡 摘要用
gpt-4o-mini而非gpt-4o——摘要任务不需要复杂推理,小模型完全胜任。一次摘要的成本约 $0.0003,远低于重复发送 6000 tokens 旧消息的成本 $0.09。摘要一次,省钱 300 次。
3.3 LangGraph 实战:Checkpointer + 自定义摘要节点
前面的代码是"手搓"实现。在 LangGraph 中,短期记忆管理有更优雅的方式——利用 Checkpointer 自动持久化状态 + 自定义节点做摘要压缩。
LangGraph 短期记忆架构:
用户消息
│
▼
┌─────────────────────┐
│ conversation 节点 │ ← Checkpointer 自动恢复上次状态
│ ─────────────────── │
│ 加载 messages │
│ 加载 summary │
│ 调用 LLM │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ should_summarize? │ ← 条件判断:消息数 > 阈值?
│ ─────────────────── │
│ YES → summarize │
│ NO → END │
└──────────┬──────────┘
│ YES
▼
┌─────────────────────┐
│ summarize 节点 │
│ ─────────────────── │
│ 压缩旧消息为摘要 │
│ 删除旧消息 │
│ 更新 summary 状态 │
└──────────┬──────────┘
│
▼
Checkpointer 自动保存状态 → END完整代码实现:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import (
SystemMessage, HumanMessage, AIMessage, RemoveMessage,
trim_messages,
)
from langchain_openai import ChatOpenAI
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
# 1. 定义状态——包含 messages 和 summary 两个字段
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 对话历史(自动追加)
summary: str # 累积摘要
# 2. 初始化模型
llm = ChatOpenAI(model="gpt-4o", temperature=0)
summary_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) # 摘要用小模型
# 3. 对话节点:加载摘要 + 调用 LLM
async def conversation(state: AgentState) -> dict:
summary = state.get("summary", "")
# 如果有历史摘要,注入到 system message 中
messages = state["messages"]
if summary:
system_with_summary = (
f"你是一个智能助手。\n\n"
f"以下是之前对话的摘要,供你参考:\n{summary}"
)
messages = [SystemMessage(content=system_with_summary)] + messages
response = await llm.ainvoke(messages)
return {"messages": [response]}
# 4. 摘要节点:压缩旧消息 + 删除
async def summarize(state: AgentState) -> dict:
summary = state.get("summary", "")
messages = state["messages"]
# 构造摘要 Prompt
if summary:
prompt = (
f"请更新以下对话摘要,加入新的对话内容。\n\n"
f"已有摘要:\n{summary}\n\n"
f"新对话:\n"
)
else:
prompt = "请将以下对话压缩为摘要,保留所有关键信息:\n\n"
# 取出需要压缩的旧消息(保留最近 2 轮)
old_messages = messages[:-4] # 保留最近 4 条(2 轮)
prompt += "\n".join(
f"{m.type}: {m.content}" for m in old_messages
)
response = await summary_llm.ainvoke([HumanMessage(content=prompt)])
# 删除旧消息(用 RemoveMessage 标记删除)
delete_messages = [RemoveMessage(id=m.id) for m in old_messages]
return {
"summary": response.content,
"messages": delete_messages,
}
# 5. 条件判断:是否需要触发摘要
def should_summarize(state: AgentState) -> str:
messages = state["messages"]
# 消息数超过 10 条(5 轮)时触发摘要
if len(messages) > 10:
return "summarize"
return END
# 6. 组装 Graph
graph = StateGraph(AgentState)
graph.add_node("conversation", conversation)
graph.add_node("summarize", summarize)
graph.add_edge(START, "conversation")
graph.add_conditional_edges("conversation", should_summarize)
graph.add_edge("summarize", END)
# 7. 加上 Checkpointer(状态自动持久化)
checkpointer = MemorySaver() # 生产环境用 PostgresSaver
app = graph.compile(checkpointer=checkpointer)
# 8. 使用:同一个 thread_id 的对话自动恢复状态
config = {"configurable": {"thread_id": "user-zhangsan-001"}}
result = await app.ainvoke(
{"messages": [HumanMessage(content="我叫张三,做电商后端")]},
config=config,
)
# → 第 1 轮,Checkpointer 保存状态
result = await app.ainvoke(
{"messages": [HumanMessage(content="帮我设计订单表")]},
config=config,
)
# → 第 2 轮,自动恢复上次的 messages + summary💡 Checkpointer 是 LangGraph 短期记忆的核心——它在每个节点执行后自动保存完整的 Graph 状态(messages + summary + 任何自定义字段)。同一个
thread_id的后续请求会自动恢复上次的状态。生产环境把MemorySaver换成PostgresSaver或SqliteSaver即可。
3.4 Token 预算管理:动态分配上下文空间
当 Agent 变得复杂——有 System Prompt、记忆摘要、RAG 检索结果、工具调用结果、对话历史——上下文窗口就成了稀缺资源,需要精细化的"预算管理"。
核心思路是把上下文窗口看作一个固定预算,各模块按优先级分配份额:
Token 预算分配示例(以 GPT-4o 128K 为例,实际使用 32K):
总预算:32,000 tokens
┌─────────────────────────────────────────────┐
│ ① System Prompt ── 2,000(固定) │
│ ② 记忆摘要 ── 1,000(动态) │
│ ③ 长期记忆检索结果 ── 2,000(动态) │
│ ④ 对话历史 ── 20,000(弹性) │
│ ⑤ 当前用户输入 ── 3,000(实际值) │
│ ⑥ 预留输出空间 ── 4,000(固定) │
└─────────────────────────────────────────────┘
分配优先级(高 → 低):
① System Prompt(不可压缩,必须完整保留)
⑥ 输出空间(必须预留,否则回答被截断)
⑤ 当前输入(用户刚发的消息,不能删)
② 记忆摘要(高价值信息密度)
③ 检索结果(按相关度排序,可截断)
④ 对话历史(最低优先级,超限时压缩或裁剪)代码实现:
from dataclasses import dataclass
@dataclass
class TokenBudget:
"""Token 预算分配器"""
total: int = 32000
system_prompt: int = 2000 # 固定
output_reserved: int = 4000 # 固定预留
memory_summary: int = 1000 # 上限
retrieval_results: int = 2000 # 上限
@property
def conversation_budget(self) -> int:
"""对话历史的可用预算 = 总预算 - 其他模块"""
used = (
self.system_prompt
+ self.output_reserved
+ self.memory_summary
+ self.retrieval_results
)
return self.total - used
def allocate(self, current_input_tokens: int) -> dict:
"""根据当前输入动态分配预算"""
conv_budget = self.conversation_budget - current_input_tokens
return {
"system_prompt": self.system_prompt,
"memory_summary": self.memory_summary,
"retrieval": self.retrieval_results,
"conversation_history": max(conv_budget, 0),
"current_input": current_input_tokens,
"output_reserved": self.output_reserved,
}
# 使用
budget = TokenBudget(total=32000)
allocation = budget.allocate(current_input_tokens=500)
# → conversation_history: 22500(有大量空间给对话历史)
allocation = budget.allocate(current_input_tokens=5000)
# → conversation_history: 18000(用户输入变长,对话历史空间被压缩)💡 永远为输出预留足够空间——最常见的新手错误是把 Token 全分给了 input,结果
max_tokens不够用,回答被截断。建议预留 4000-8000 tokens 给输出。另外,不要盲目使用 128K 全部容量——实际上用到 32K 以上时,"Lost in the Middle" 问题会让模型质量显著下降。
第 3 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 滑动窗口 | 保留最近 N 轮对话,简单但会丢失早期信息 |
| 对话摘要 | 用小模型压缩旧消息为摘要,Token 节省 70%+ |
| LangGraph Checkpointer | 自动持久化 Graph 状态,同一 thread_id 恢复上下文 |
| RemoveMessage | LangGraph 的消息删除机制,配合摘要节点使用 |
| Token 预算 | 按优先级分配上下文空间:System > 输出 > 输入 > 记忆 > 对话历史 |
4. 情景记忆(Episodic Memory):记住"发生过什么"
短期记忆解决了"当前会话"的上下文连续性。但当用户关闭窗口、第二天再来,短期记忆就清空了。情景记忆解决的是跨会话的经历回溯——"上次我们讨论了什么"。
4.1 什么是情景记忆:从"知道什么"到"经历过什么"
情景记忆和语义记忆容易混淆。用一个类比来区分:
你去医院看病:
情景记忆 ─── "3 月 15 日去了协和医院,挂的消化内科,
张医生说可能是胃炎,开了奥美拉唑,
让我两周后复查"
语义记忆 ─── "我有胃炎"
"我在吃奥美拉唑"
情景记忆保留了完整的场景和时间线
语义记忆只保留了提炼后的事实对于 Agent 而言:
| 维度 | 情景记忆 | 语义记忆 |
|---|---|---|
| 存储内容 | 完整的对话记录、事件流 | 提炼后的事实和偏好 |
| 回答的问题 | "上次我们讨论了什么?" | "这个用户喜欢什么?" |
| 数据格式 | 带时间戳的完整交互日志 | key-value 事实对 / 向量 |
| 检索方式 | 按时间范围 + 关键词 | 按语义相似度 |
| 存储量 | 大(每次交互都记录) | 小(只存提炼结果) |
| 典型用途 | 回溯决策过程、审计、上下文恢复 | 个性化推荐、偏好记忆 |
什么时候需要情景记忆?
需要情景记忆的场景:
✅ "上次我们讨论的订单表方案是什么?"
→ 需要回溯到具体那次对话的完整内容
✅ "之前你建议我用 JSONB,为什么?"
→ 需要找到当时的推理过程和上下文
✅ "我记得上周我改过一次密码策略..."
→ 需要按时间检索历史操作
✅ 多步骤任务的断点恢复
→ "昨天做到哪一步了?继续"
不需要情景记忆的场景:
❌ "我叫什么名字?"
→ 语义记忆就够了(事实查询)
❌ "我喜欢用什么编程语言?"
→ 语义记忆就够了(偏好查询)💡 情景记忆是"事件的录像",语义记忆是"录像中提炼的笔记"——两者互补而非替代。第 5 章会介绍语义记忆,本章聚焦于"如何高效存储和检索历史交互事件"。
4.2 事件流设计:结构化存储每次交互
情景记忆的存储核心是一个事件流——按时间顺序记录每次交互的完整信息。推荐用两张表来组织:
情景记忆的数据模型:
┌─────────────────────────────────────────────┐
│ sessions(会话表) │
│ ─────────────────────────────────────────── │
│ 一个 session = 一次连续的对话 │
│ │
│ id UUID │
│ user_id VARCHAR ← 用户隔离 │
│ title VARCHAR ← 会话标题 │
│ summary TEXT ← 会话摘要 │
│ started_at TIMESTAMPTZ │
│ ended_at TIMESTAMPTZ │
│ metadata JSONB ← 扩展字段 │
└──────────────────┬──────────────────────────┘
│ 1:N
▼
┌─────────────────────────────────────────────┐
│ events(事件表) │
│ ─────────────────────────────────────────── │
│ 一个 event = 一轮交互内的一条消息/动作 │
│ │
│ id UUID │
│ session_id UUID (FK) ← 归属的会话 │
│ event_type VARCHAR ← user/assistant/ │
│ tool_call/error │
│ content TEXT ← 消息内容 │
│ token_count INTEGER ← Token 消耗 │
│ created_at TIMESTAMPTZ │
│ metadata JSONB ← 模型、延迟等 │
└─────────────────────────────────────────────┘SQL 建表:
-- 会话表
CREATE TABLE sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id VARCHAR(100) NOT NULL,
title VARCHAR(200),
summary TEXT,
started_at TIMESTAMPTZ DEFAULT NOW(),
ended_at TIMESTAMPTZ,
metadata JSONB DEFAULT '{}',
-- 索引:按用户 + 时间查询
CONSTRAINT idx_sessions_user_time
UNIQUE (user_id, started_at)
);
CREATE INDEX idx_sessions_user ON sessions(user_id);
-- 事件表
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
event_type VARCHAR(20) NOT NULL, -- user/assistant/tool_call/error
content TEXT NOT NULL,
token_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
metadata JSONB DEFAULT '{}',
-- 全文搜索索引(用于关键词检索)
search_vector tsvector GENERATED ALWAYS AS (
to_tsvector('simple', content)
) STORED
);
CREATE INDEX idx_events_session ON events(session_id, created_at);
CREATE INDEX idx_events_search ON events USING GIN(search_vector);Python ORM 模型和写入服务:
from sqlalchemy import Column, String, Text, Integer, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime
import uuid
class Session(Base):
__tablename__ = "sessions"
id = Column(UUID, primary_key=True, default=uuid.uuid4)
user_id = Column(String(100), nullable=False, index=True)
title = Column(String(200))
summary = Column(Text)
started_at = Column(DateTime(timezone=True), default=datetime.utcnow)
ended_at = Column(DateTime(timezone=True))
metadata_ = Column("metadata", JSONB, default={})
class Event(Base):
__tablename__ = "events"
id = Column(UUID, primary_key=True, default=uuid.uuid4)
session_id = Column(UUID, ForeignKey("sessions.id"), nullable=False)
event_type = Column(String(20), nullable=False)
content = Column(Text, nullable=False)
token_count = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
metadata_ = Column("metadata", JSONB, default={})
class EpisodicMemoryService:
"""情景记忆服务:记录和检索历史交互"""
def __init__(self, db: AsyncSession):
self.db = db
async def create_session(self, user_id: str, title: str = "") -> str:
"""创建新会话"""
session = Session(user_id=user_id, title=title)
self.db.add(session)
await self.db.commit()
return str(session.id)
async def add_event(
self, session_id: str, event_type: str,
content: str, token_count: int = 0, **metadata
):
"""记录一条交互事件"""
event = Event(
session_id=session_id,
event_type=event_type,
content=content,
token_count=token_count,
metadata_=metadata,
)
self.db.add(event)
await self.db.commit()💡
metadataJSONB 字段是可扩展性的关键——你可以往里塞模型名称、响应延迟、用户满意度评分、工具调用参数等任意结构化数据,不需要改表结构。这在后期做分析和调试时非常有价值。
4.3 会话检索策略:时间过滤 + 关键词 + 语义搜索
情景记忆的核心挑战不在存储(数据库能存),而在检索——当用户有几百次历史会话时,怎么快速找到最相关的那几条?
三种检索策略,按复杂度递增:
检索策略组合:
用户说:"上次讨论的数据库方案"
│
▼
① 时间过滤 ─── 最近 30 天的会话(粗筛)
│ 结果:50 个会话
▼
② 关键词搜索 ── 包含"数据库"的事件(精筛)
│ 结果:8 个会话
▼
③ 语义排序 ─── 按与"数据库方案"的语义相似度排序
结果:Top 3 最相关的会话策略一:时间过滤
from sqlalchemy import text
from datetime import datetime, timedelta
async def search_by_time(
db: AsyncSession, user_id: str,
days: int = 30, limit: int = 20,
) -> list[dict]:
"""按时间范围检索最近的会话"""
since = datetime.utcnow() - timedelta(days=days)
result = await db.execute(text("""
SELECT s.id, s.title, s.summary, s.started_at,
COUNT(e.id) as event_count
FROM sessions s
LEFT JOIN events e ON e.session_id = s.id
WHERE s.user_id = :user_id
AND s.started_at > :since
GROUP BY s.id
ORDER BY s.started_at DESC
LIMIT :limit
"""), {"user_id": user_id, "since": since, "limit": limit})
return [dict(row._mapping) for row in result.fetchall()]策略二:全文关键词搜索(PG tsvector)
async def search_by_keyword(
db: AsyncSession, user_id: str,
keyword: str, limit: int = 10,
) -> list[dict]:
"""用 PostgreSQL 全文检索搜索历史事件"""
result = await db.execute(text("""
SELECT e.content, e.event_type, e.created_at,
s.title as session_title,
ts_rank(e.search_vector, query) as rank
FROM events e
JOIN sessions s ON s.id = e.session_id,
plainto_tsquery('simple', :keyword) query
WHERE s.user_id = :user_id
AND e.search_vector @@ query
ORDER BY rank DESC, e.created_at DESC
LIMIT :limit
"""), {"user_id": user_id, "keyword": keyword, "limit": limit})
return [dict(row._mapping) for row in result.fetchall()]策略三:语义搜索(需要 Embedding)
async def search_by_semantic(
db: AsyncSession, user_id: str,
query: str, embedding_service, limit: int = 5,
) -> list[dict]:
"""用向量相似度搜索语义相关的历史事件"""
# 需要 events 表额外有 embedding 列
query_vec = await embedding_service.embed(query)
result = await db.execute(text("""
SELECT e.content, e.event_type, e.created_at,
s.title as session_title,
1 - (e.embedding <=> :vec) as similarity
FROM events e
JOIN sessions s ON s.id = e.session_id
WHERE s.user_id = :user_id
AND 1 - (e.embedding <=> :vec) > 0.75
ORDER BY e.embedding <=> :vec
LIMIT :limit
"""), {"user_id": user_id, "vec": str(query_vec), "limit": limit})
return [dict(row._mapping) for row in result.fetchall()]三种策略的权衡:
| 策略 | 延迟 | 准确度 | 额外成本 | 推荐场景 |
|---|---|---|---|---|
| 时间过滤 | <5ms | 低(只按时间) | 无 | 初始粗筛,必选 |
| 关键词搜索 | <10ms | 中(精确匹配) | 无(PG 内置) | 用户提到具体名词时 |
| 语义搜索 | <50ms | 高(理解语义) | 需要 Embedding | 用户描述模糊时 |
💡 推荐组合使用:时间过滤 + 关键词搜索是性价比最高的方案——零额外成本,覆盖 80% 的情景记忆检索需求。只有当你发现用户经常用模糊描述("之前那个方案")且关键词搜索找不到时,再引入语义搜索。
4.4 实战:构建可回溯的多轮对话系统
把前面的存储和检索整合起来,构建一个完整的"带情景记忆的对话系统":
完整数据流:
用户发消息 → "上次的订单表方案还需要调整"
│
▼
① 情景记忆检索
│ → search_by_keyword("订单表")
│ → 找到 3 天前的会话,包含完整的表设计讨论
│
▼
② 组装上下文
│ → System Prompt
│ → [相关历史] 3天前讨论的订单表设计(DDL + 设计理由)
│ → 当前会话的最近 5 轮对话
│ → 当前用户输入
│
▼
③ 调用 LLM → 生成回答
│
▼
④ 记录事件(异步)
→ 记录 user 消息和 assistant 回答到 events 表核心代码——把情景记忆注入到 Agent 对话中:
class EpisodicChatAgent:
"""带情景记忆的对话 Agent"""
def __init__(
self,
client: AsyncOpenAI,
episodic_memory: EpisodicMemoryService,
):
self.client = client
self.memory = episodic_memory
async def chat(
self, user_id: str, session_id: str,
message: str,
) -> str:
# 1. 检索相关的历史会话
relevant_history = await self.memory.search_by_keyword(
user_id=user_id, keyword=message, limit=3
)
# 2. 格式化历史记忆
history_context = ""
if relevant_history:
history_context = "\n\n[相关历史对话]\n"
for item in relevant_history:
history_context += (
f"- {item['created_at']:%m-%d} "
f"({item['session_title']}): "
f"{item['content'][:200]}\n"
)
# 3. 组装消息
messages = [
{"role": "system", "content": (
"你是一个智能助手,拥有与用户的历史对话记忆。"
"请参考历史记录来提供连续、个性化的帮助。"
f"{history_context}"
)},
{"role": "user", "content": message},
]
# 4. 调用 LLM
response = await self.client.chat.completions.create(
model="gpt-4o", messages=messages, temperature=0,
)
reply = response.choices[0].message.content
# 5. 记录本次交互(异步,不阻塞响应)
await self.memory.add_event(
session_id=session_id,
event_type="user",
content=message,
)
await self.memory.add_event(
session_id=session_id,
event_type="assistant",
content=reply,
)
return reply💡 情景记忆的写入应该异步化——记录事件不应该阻塞用户等待回答。用
asyncio.create_task()或消息队列把写入操作放到后台。同理,会话结束时生成summary也应该异步进行。
第 4 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 情景 vs 语义 | 情景 = "发生了什么"(事件流),语义 = "知道什么"(事实) |
| 数据模型 | sessions + events 两张表,JSONB metadata 保证可扩展性 |
| 三种检索 | 时间过滤(粗筛)→ 关键词搜索(精筛)→ 语义搜索(兜底) |
| 推荐方案 | 时间 + 关键词组合覆盖 80% 场景,零额外成本 |
| 异步写入 | 事件记录不阻塞响应,后台异步写入数据库 |
5. 语义记忆(Semantic Memory):提炼持久化知识
情景记忆回答的是"发生过什么",但用户经常问的是更直接的问题:"你还记得我喜欢什么吗?" "我们之前决定用什么技术栈来着?" 这些问题需要的不是完整的事件回放,而是从大量交互中提炼出的事实和知识——这就是语义记忆的职责。
如果说情景记忆是"录像",语义记忆就是"笔记"——更紧凑、更结构化、检索更快。
5.1 语义记忆 vs RAG 知识库:有什么不同
"语义记忆不就是 RAG 吗?都是向量检索嘛。" 这是最常见的误解。两者确实在技术实现上有重叠(都用 Embedding + 向量搜索),但在数据来源、更新方式和用途上完全不同:
语义记忆 vs RAG 知识库——核心区别:
┌───────────────────────────────────────────────────────┐
│ RAG 知识库 │
│ ──────────────────────────────────────────────────── │
│ 数据来源: 预先准备的文档(PDF、网页、API 文档) │
│ 更新频率: 低(手动导入、定期刷新) │
│ 内容类型: 客观知识(\"Python 3.12 支持泛型语法\") │
│ 作用域: 全局(所有用户共享同一个知识库) │
│ 生命周期: 由运维人员管理 │
│ │
│ 典型查询: \"FastAPI 怎么配置 CORS?\" │
│ → 从文档中检索答案,跟用户是谁无关 │
├───────────────────────────────────────────────────────┤
│ 语义记忆 │
│ ──────────────────────────────────────────────────── │
│ 数据来源: 从用户交互中自动提炼 │
│ 更新频率: 高(每次对话后可能更新) │
│ 内容类型: 主观知识(\"这个用户偏好 PostgreSQL\") │
│ 作用域: 用户级(每个用户独立) │
│ 生命周期: 自动管理(提炼、更新、遗忘) │
│ │
│ 典型查询: \"这个用户的技术栈是什么?\" │
│ → 从该用户的历史交互中提炼的 facts │
└───────────────────────────────────────────────────────┘用一个更直观的对比:
| 维度 | RAG 知识库 | 语义记忆 |
|---|---|---|
| 比喻 | 图书馆 | 你的私人笔记本 |
| 数据来源 | 预先导入的文档 | 从对话中自动提炼 |
| 内容示例 | "PostgreSQL JSONB 支持 GIN 索引" | "用户李明偏好 PostgreSQL" |
| 更新触发 | 手动重新索引 | 每次对话后自动判断 |
| 用户隔离 | 通常不隔离 | 严格按用户隔离 |
| 数据量 | 大(百万级文档块) | 小(每用户几十到几百条事实) |
| 典型实现 | LangChain Retriever + Chroma | Mem0 / 自建事实存储 |
两者可以(也应该)共存于同一个 Agent 中:
Agent 的双重知识来源:
用户提问:\"帮我的电商项目选一个消息队列\"
│
├──→ RAG 知识库检索
│ → \"RabbitMQ vs Kafka vs Redis Streams 的对比...\"
│ → 客观技术知识,跟用户是谁无关
│
└──→ 语义记忆检索
→ \"用户李明:技术栈 Python/FastAPI,偏好轻量方案,
团队 3 人,项目阶段:MVP\"
→ 用户特有的上下文,决定推荐哪个方案
两者结合 → \"考虑到你的 FastAPI 技术栈和 3 人小团队,
建议先用 Redis Streams,MVP 阶段够用且运维简单\"💡 RAG 提供「世界知识」,语义记忆提供「用户知识」——前者让 Agent 博学,后者让 Agent 懂你。两者不是替代关系,而是正交互补。如果你的 Agent 已经有了 RAG,加上语义记忆能让回答从"通用正确"升级到"个性化精准"。
5.2 事实抽取:用 LLM 从对话中提炼结构化知识
语义记忆的核心问题是:对话是非结构化的自然语言,怎么从中提取出结构化的事实? 答案是——用 LLM 做事实抽取。
事实抽取的工作流程:
对话原文:
┌──────────────────────────────────────────────┐
│ User:我叫李明,最近在做一个电商后端 │
│ AI: 你好李明!用什么技术栈呢? │
│ User:FastAPI + PostgreSQL,部署在阿里云 │
│ AI: 好的搭配... │
│ User:团队就我和另一个前端,两个人 │
└──────────────┬───────────────────────────────┘
│ LLM 事实抽取
▼
┌──────────────────────────────────────────────┐
│ 提取的结构化事实: │
│ │
│ ① 用户姓名 = 李明 │
│ ② 项目类型 = 电商后端 │
│ ③ 后端框架 = FastAPI │
│ ④ 数据库 = PostgreSQL │
│ ⑤ 部署环境 = 阿里云 │
│ ⑥ 团队规模 = 2 人(1 后端 + 1 前端) │
└──────────────────────────────────────────────┘核心实现——用结构化输出(Structured Output)做事实抽取:
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
# 1. 定义事实的数据结构
class ExtractedFact(BaseModel):
"""从对话中提取的单条事实"""
category: str = Field(
description="事实类别:personal/project/preference/technical/decision"
)
key: str = Field(
description="事实的键,如 'name', 'tech_stack', 'database'"
)
value: str = Field(
description="事实的值,如 'PostgreSQL'"
)
confidence: float = Field(
default=0.9,
description="置信度 0-1,用户明确说的=1.0,推测的=0.5-0.8"
)
source_quote: str = Field(
description="原文引用,用于溯源"
)
class ExtractionResult(BaseModel):
"""一次对话的事实抽取结果"""
facts: list[ExtractedFact] = Field(default_factory=list)
has_new_info: bool = Field(
description="本次对话是否包含值得记忆的新信息"
)
# 2. 事实抽取 Prompt
EXTRACTION_PROMPT = """请从以下对话中提取值得长期记忆的事实信息。
提取规则:
1. 只提取用户明确提到或强烈暗示的事实,不要过度推测
2. 分类包括:personal(个人信息)、project(项目信息)、
preference(偏好)、technical(技术选型)、decision(已做决定)
3. 忽略临时性的、一次性的信息(如"帮我看看这个报错")
4. 每条事实要附带原文引用(source_quote)
对话内容:
{conversation}
已有的记忆(避免重复提取):
{existing_facts}"""
# 3. 事实抽取服务
class FactExtractor:
"""从对话中抽取结构化事实"""
def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
self.client = client
self.model = model # 抽取用小模型即可
async def extract(
self,
conversation: list[dict],
existing_facts: list[dict] | None = None,
) -> ExtractionResult:
"""从对话中抽取结构化事实"""
# 格式化对话
conv_text = "\n".join(
f"{m['role']}: {m['content']}" for m in conversation
)
# 格式化已有事实(避免重复)
facts_text = "无" if not existing_facts else "\n".join(
f"- {f['key']}: {f['value']}" for f in existing_facts
)
# 调用 LLM(使用结构化输出)
response = await self.client.beta.chat.completions.parse(
model=self.model,
messages=[{
"role": "user",
"content": EXTRACTION_PROMPT.format(
conversation=conv_text,
existing_facts=facts_text,
),
}],
response_format=ExtractionResult,
temperature=0,
)
return response.choices[0].message.parsed使用示例:
extractor = FactExtractor(client)
# 模拟一段对话
conversation = [
{"role": "user", "content": "我叫李明,最近在做一个电商后端"},
{"role": "assistant", "content": "你好李明!用什么技术栈呢?"},
{"role": "user", "content": "FastAPI + PostgreSQL,部署在阿里云"},
]
result = await extractor.extract(conversation)
# 输出:
# result.has_new_info = True
# result.facts = [
# ExtractedFact(category="personal", key="name", value="李明",
# confidence=1.0, source_quote="我叫李明"),
# ExtractedFact(category="project", key="type", value="电商后端",
# confidence=1.0, source_quote="做一个电商后端"),
# ExtractedFact(category="technical", key="backend_framework",
# value="FastAPI", confidence=1.0,
# source_quote="FastAPI + PostgreSQL"),
# ExtractedFact(category="technical", key="database",
# value="PostgreSQL", confidence=1.0,
# source_quote="FastAPI + PostgreSQL"),
# ExtractedFact(category="technical", key="deployment",
# value="阿里云", confidence=1.0,
# source_quote="部署在阿里云"),
# ]事实抽取的触发时机——不是每条消息都需要抽取:
事实抽取的触发策略:
方案一:每轮对话后触发(简单但浪费)
→ 用户说"帮我看看这段代码的 bug" → 没有新事实,白抽取了
→ 额外成本:每次 $0.0002,日活 1 万 × 50 轮 = $100/天 ← 太贵
方案二:先判断再抽取(推荐)✅
→ 先用简单规则判断"本轮对话是否可能包含新事实"
→ 只在有新信息时才触发 LLM 抽取
→ 成本降低 70-80%
判断规则(免 LLM,纯规则):
├── 对话中出现自我介绍类关键词(我叫/我是/我的项目...)
├── 对话中出现技术选型关键词(用的是/选了/决定用...)
├── 对话中出现偏好表达(喜欢/偏好/习惯/不想用...)
└── 对话轮数 > 3 且是新会话的前 5 轮(自我介绍阶段)💡 事实抽取用
gpt-4o-mini+ 结构化输出就够了——这个任务不需要复杂推理,关键是 Prompt 中的规则要写清楚。结构化输出(response_format)保证了返回值一定是合法的 JSON,省去了手动解析和容错的麻烦。单次抽取成本约 $0.0002,比不做记忆导致的重复沟通成本低得多。
5.3 向量检索记忆:Embedding 存储与相似度召回
事实提取出来了,怎么存?怎么检索?最直接的方案是向量数据库——把每条事实转成 Embedding 向量,通过语义相似度来召回相关记忆。
向量检索记忆的完整流程:
① 写入阶段(对话结束后异步执行)
┌──────────────────────────────────────────────┐
│ 提取的事实:\"用户偏好 PostgreSQL\" │
│ │ │
│ ▼ │
│ Embedding API ──→ [0.023, -0.87, 0.41, ...] │
│ │ 1536 维向量 │
│ ▼ │
│ 写入向量数据库(pgvector / Milvus / Qdrant) │
│ ┌─────────────────────────────────────────┐ │
│ │ id | user_id | text | embedding │ │
│ │ 1 | u001 | 偏好PG | [0.02,...] │ │
│ │ 2 | u001 | 做电商后端 | [-0.1,...] │ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
② 检索阶段(用户发送新消息时)
┌──────────────────────────────────────────────┐
│ 用户说:\"帮我选一个数据库方案\" │
│ │ │
│ ▼ │
│ Embedding API ──→ query_vector │
│ │ │
│ ▼ │
│ 向量相似度搜索(余弦距离 < 阈值) │
│ │ │
│ ▼ │
│ 召回结果: │
│ ├── \"用户偏好 PostgreSQL\"(相似度 0.89) │
│ ├── \"项目类型:电商后端\"(相似度 0.82) │
│ └── \"部署环境:阿里云\"(相似度 0.76) │
└──────────────────────────────────────────────┘数据模型设计(pgvector 方案):
推荐用 PostgreSQL + pgvector 扩展——不需要额外部署向量数据库,一个 PG 实例搞定关系数据 + 向量搜索:
-- 启用 pgvector 扩展
CREATE EXTENSION IF NOT EXISTS vector;
-- 语义记忆表
CREATE TABLE semantic_memories (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id VARCHAR(100) NOT NULL,
-- 事实内容
category VARCHAR(50) NOT NULL, -- personal/project/preference/...
fact_key VARCHAR(100) NOT NULL, -- name, database, framework...
fact_value TEXT NOT NULL, -- PostgreSQL, FastAPI, ...
-- 向量字段(text-embedding-3-small 输出 1536 维)
embedding vector(1536),
-- 元数据
confidence FLOAT DEFAULT 0.9, -- 置信度
source_quote TEXT, -- 原文引用
access_count INTEGER DEFAULT 0, -- 访问次数(用于淘汰)
-- 时间戳
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
last_accessed_at TIMESTAMPTZ DEFAULT NOW(),
-- 唯一约束:同一用户的同一 key 只有一条
CONSTRAINT uq_user_fact UNIQUE (user_id, category, fact_key)
);
-- 向量索引(HNSW,检索更快)
CREATE INDEX idx_memories_embedding
ON semantic_memories USING hnsw (embedding vector_cosine_ops);
-- 用户索引
CREATE INDEX idx_memories_user ON semantic_memories(user_id);Python 存储与检索服务:
from openai import AsyncOpenAI
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class SemanticMemoryService:
"""语义记忆服务:存储和检索用户事实"""
def __init__(self, db: AsyncSession, client: AsyncOpenAI):
self.db = db
self.client = client
async def _embed(self, text_input: str) -> list[float]:
"""生成 Embedding 向量"""
response = await self.client.embeddings.create(
model="text-embedding-3-small",
input=text_input,
)
return response.data[0].embedding
async def store_fact(self, user_id: str, fact: "ExtractedFact"):
"""存储一条事实(upsert:存在则更新,不存在则插入)"""
# 用 category + key + value 拼接成文本做 Embedding
fact_text = f"{fact.category}: {fact.key} = {fact.value}"
embedding = await self._embed(fact_text)
await self.db.execute(text("""
INSERT INTO semantic_memories
(user_id, category, fact_key, fact_value,
embedding, confidence, source_quote)
VALUES
(:user_id, :category, :key, :value,
:embedding, :confidence, :source_quote)
ON CONFLICT (user_id, category, fact_key) DO UPDATE SET
fact_value = :value,
embedding = :embedding,
confidence = :confidence,
source_quote = :source_quote,
updated_at = NOW()
"""), {
"user_id": user_id,
"category": fact.category,
"key": fact.key,
"value": fact.value,
"embedding": str(embedding),
"confidence": fact.confidence,
"source_quote": fact.source_quote,
})
await self.db.commit()
async def recall(
self, user_id: str, query: str,
limit: int = 5, min_similarity: float = 0.7,
) -> list[dict]:
"""根据语义相似度召回相关记忆"""
query_vec = await self._embed(query)
result = await self.db.execute(text("""
SELECT fact_key, fact_value, category, confidence,
1 - (embedding <=> :vec) as similarity
FROM semantic_memories
WHERE user_id = :user_id
AND 1 - (embedding <=> :vec) > :min_sim
ORDER BY embedding <=> :vec
LIMIT :limit
"""), {
"user_id": user_id,
"vec": str(query_vec),
"min_sim": min_similarity,
"limit": limit,
})
# 更新访问计数和时间
rows = [dict(r._mapping) for r in result.fetchall()]
if rows:
keys = [r["fact_key"] for r in rows]
await self.db.execute(text("""
UPDATE semantic_memories
SET access_count = access_count + 1,
last_accessed_at = NOW()
WHERE user_id = :user_id AND fact_key = ANY(:keys)
"""), {"user_id": user_id, "keys": keys})
await self.db.commit()
return rows在 Agent 对话中集成语义记忆:
async def chat_with_memory(
user_id: str, message: str,
memory: SemanticMemoryService,
client: AsyncOpenAI,
) -> str:
# 1. 召回相关记忆
memories = await memory.recall(user_id, message, limit=5)
# 2. 格式化为上下文
memory_context = ""
if memories:
memory_context = "\n\n[用户画像 - 来自历史记忆]\n"
for m in memories:
memory_context += f"- {m['fact_key']}: {m['fact_value']}\n"
# 3. 调用 LLM
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": (
"你是一个智能助手,拥有用户的长期记忆。"
"请利用记忆提供个性化的帮助。"
f"{memory_context}"
)},
{"role": "user", "content": message},
],
)
return response.choices[0].message.contentEmbedding 模型的选型对比:
| 模型 | 维度 | 每百万 Token | 中文效果 | 推荐场景 |
|---|---|---|---|---|
| text-embedding-3-small | 1536 | $0.02 | 良好 | 成本敏感型(推荐起步) |
| text-embedding-3-large | 3072 | $0.13 | 优秀 | 高精度需求 |
| BGE-M3 | 1024 | 免费(本地) | 优秀 | 私有化部署、中文优先 |
| Cohere embed-v3 | 1024 | $0.10 | 良好 | 多语言场景 |
💡 语义记忆的数据量通常很小(每用户几十到几百条),pgvector 完全够用——不需要专门部署 Milvus 或 Qdrant。只有当单用户记忆超过 10 万条、且需要毫秒级检索时,才考虑专用向量数据库。用 PG 统一存储关系数据和向量数据,运维成本最低。
5.4 知识更新与冲突解决:用户改了偏好怎么办
语义记忆不是"写入后就不管了"——用户的偏好、技术栈、项目状态都会变化。如果不处理更新,记忆就会变成"过时的错误信息",比没有记忆更糟糕。
冲突场景示例:
3 月:用户说 \"我用 MySQL\"
→ 记忆写入:database = MySQL ✓
4 月:用户说 \"我们迁移到 PostgreSQL 了\"
→ 新事实:database = PostgreSQL
→ 旧记忆:database = MySQL
→ 冲突!需要决定:更新?追加?还是两条都保留?
如果不处理冲突:
→ Agent 下次回答:\"基于你的 MySQL 数据库...\" ← 错误!知识更新有三种场景,需要不同的处理策略:
三种更新场景:
① 覆盖更新(最常见)
├── 旧值:database = MySQL
├── 新值:database = PostgreSQL
└── 策略:直接更新(用户明确说\"换了/迁移了\")
② 追加更新
├── 旧值:framework = FastAPI
├── 新值:framework = Flask(另一个项目)
└── 策略:追加新条目(同一 key 的不同上下文)
③ 细化更新
├── 旧值:level = 后端开发
├── 新值:level = 高级后端开发(3 年经验)
└── 策略:用新值替换旧值(新值更精确)核心实现——用 LLM 判断冲突类型并决定更新策略:
from pydantic import BaseModel, Field
from enum import Enum
class ConflictAction(str, Enum):
UPDATE = "update" # 覆盖旧值
APPEND = "append" # 追加新条目
IGNORE = "ignore" # 忽略(新信息不可靠)
MERGE = "merge" # 合并(新旧信息互补)
class ConflictResolution(BaseModel):
"""冲突解决结果"""
action: ConflictAction
resolved_value: str = Field(description="解决后的值")
reason: str = Field(description="决策理由")
CONFLICT_PROMPT = """请判断以下两条关于同一用户的记忆是否存在冲突,
并决定如何处理。
已有记忆:
- 键:{key}
- 值:{old_value}
- 记录时间:{old_time}
新提取的事实:
- 值:{new_value}
- 原文:{source_quote}
判断规则:
1. 如果新值明确替代旧值(用户说"换了""迁移了""不再用了"),选 UPDATE
2. 如果新值是旧值的补充(不同项目、不同场景),选 APPEND
3. 如果新值比旧值更精确(旧值是新值的子集),选 MERGE
4. 如果新值不可靠(置信度低、可能是误解),选 IGNORE
请输出处理决策:"""
class MemoryConflictResolver:
"""记忆冲突解决器"""
def __init__(self, client: AsyncOpenAI):
self.client = client
async def resolve(
self,
key: str,
old_value: str,
old_time: str,
new_value: str,
source_quote: str,
) -> ConflictResolution:
"""判断冲突类型并给出解决方案"""
response = await self.client.beta.chat.completions.parse(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": CONFLICT_PROMPT.format(
key=key,
old_value=old_value,
old_time=old_time,
new_value=new_value,
source_quote=source_quote,
),
}],
response_format=ConflictResolution,
temperature=0,
)
return response.choices[0].message.parsed把冲突解决集成到存储流程中:
class SemanticMemoryWithConflict(SemanticMemoryService):
"""带冲突解决的语义记忆服务"""
def __init__(self, db, client):
super().__init__(db, client)
self.resolver = MemoryConflictResolver(client)
async def store_fact_safe(self, user_id: str, fact: "ExtractedFact"):
"""安全地存储事实——检测并处理冲突"""
# 1. 查找是否已有同 key 的记忆
existing = await self.db.execute(text("""
SELECT fact_value, created_at
FROM semantic_memories
WHERE user_id = :uid AND category = :cat AND fact_key = :key
"""), {"uid": user_id, "cat": fact.category, "key": fact.key})
row = existing.fetchone()
if row is None:
# 无冲突,直接写入
await self.store_fact(user_id, fact)
return
old_value = row.fact_value
if old_value == fact.value:
return # 值相同,跳过
# 2. 有冲突,调用 LLM 决策
resolution = await self.resolver.resolve(
key=fact.key,
old_value=old_value,
old_time=str(row.created_at),
new_value=fact.value,
source_quote=fact.source_quote,
)
# 3. 执行决策
match resolution.action:
case ConflictAction.UPDATE:
fact.value = resolution.resolved_value
await self.store_fact(user_id, fact) # upsert 覆盖
case ConflictAction.APPEND:
# 修改 key 为带后缀的新 key
fact.key = f"{fact.key}_alt_{hash(fact.value) % 1000}"
await self.store_fact(user_id, fact)
case ConflictAction.MERGE:
fact.value = resolution.resolved_value
await self.store_fact(user_id, fact)
case ConflictAction.IGNORE:
pass # 不做任何操作冲突解决的性能优化——不是每次都需要调用 LLM:
冲突解决的分层策略(减少 LLM 调用):
新事实进入
│
▼
① 规则层(零成本)
├── 值完全相同 → 跳过
├── 新值是旧值的超集 → 直接 UPDATE
└── 时间差 < 1 小时 → 直接 UPDATE(同一会话内的修正)
│
▼ 规则层无法判断
② LLM 判断层
└── 调用 gpt-4o-mini 做冲突分析
→ 约 $0.0001/次,只在真正有冲突时触发💡 冲突解决的核心原则是"新信息优先"——在绝大多数场景下,用户最新说的就是正确的。只有在新信息置信度明显低于旧信息(比如 AI 推测的 vs 用户明确说的)时,才保留旧值。过度保守的冲突策略会导致记忆"更新不了",比"更新错了"更糟糕。
第 5 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 语义 vs RAG | RAG = 全局文档知识,语义记忆 = 用户级个性化知识 |
| 事实抽取 | 用 LLM 结构化输出从对话中提取 key-value 事实 |
| 向量检索 | pgvector 存储 Embedding,余弦相似度召回相关记忆 |
| 冲突解决 | 规则层快速判断 + LLM 兜底,新信息优先原则 |
| 成本控制 | 先规则判断是否需要抽取/解决冲突,减少 70%+ 的 LLM 调用 |
6. 记忆压缩与整理:防止记忆膨胀
前两章分别介绍了情景记忆和语义记忆的存储方案。但如果只管"存"不管"理",记忆系统会像一个从不整理的衣柜——塞满了东西却找不到你要的那件。本章聚焦记忆的生命周期管理:压缩、合并、去重、遗忘。
6.1 记忆膨胀问题:存得越多 ≠ 记得越好
一个活跃用户每天对话 20 轮,每轮产生约 5 条事件记录。不做任何清理的话:
记忆膨胀的量化分析:
┌──────────────────────────────────────────────────┐
│ 假设:日活 1000 用户,每人每天 20 轮对话 │
│ │
│ Episodic Memory(情景记忆) │
│ ─────────────────────────────── │
│ 每天新增事件:1000 × 20 × 2 = 40,000 条 │
│ 每月: 120 万条 │
│ 一年: 1,440 万条 │
│ │
│ Semantic Memory(语义记忆) │
│ ──────────────────────────── │
│ 每天新增事实:1000 × 3 = 3,000 条(假设每天 3 条)│
│ 每月: 9 万条 │
│ 一年: 108 万条 │
│ 含重复/过时/矛盾的比例:约 30-50% │
├──────────────────────────────────────────────────┤
│ 不清理的后果: │
│ │
│ ① 检索变慢 ─── 向量搜索在百万级数据上从 10ms │
│ 退化到 200ms+ │
│ │
│ ② 噪声增多 ─── 旧的错误记忆被召回,干扰回答 │
│ 用户:\"我已经换成 React 了\" │
│ 记忆里还有 5 条关于 Vue 的旧记录 │
│ → 召回时旧记忆数量压倒新记忆 │
│ │
│ ③ 成本飙升 ─── 向量存储按量计费 │
│ 百万条 × 1536 维 × 4 字节 ≈ 6 GB 向量数据 │
│ 加上索引和元数据,存储成本不可忽视 │
└──────────────────────────────────────────────────┘记忆管理的核心原则——像人类一样"有选择地遗忘":
人类记忆 vs Agent 记忆的管理类比:
人类大脑每天做的事:
├── 睡眠时整理白天的记忆(压缩、归档)
├── 重要的强化(反复回忆的越记越牢)
├── 不重要的衰减(几天没想起的逐渐模糊)
└── 矛盾的覆盖(新认知替代旧认知)
Agent 记忆系统应该做的:
├── 定期压缩(渐进式摘要) → 6.2
├── 重复的合并(同类记忆去重) → 6.3
├── 过时的淘汰(时间衰减 + 访问频率)→ 6.4
└── 主动反思(让 Agent 自己整理) → 6.5💡 记忆管理的目标不是"存得最多",而是"召回最准"——100 条精准的记忆胜过 10,000 条含噪声的记忆。好的记忆系统应该像一个优秀的秘书:记住重要的事,忘掉琐碎的事,更新过时的事。
6.2 压缩策略一:渐进式摘要(Progressive Summarization)
渐进式摘要的思路是分层压缩——原始对话 → 会话摘要 → 周期摘要,每层的信息密度更高、数据量更小:
渐进式摘要的三层压缩:
Layer 0:原始事件流(Episodic Memory)
┌───────────────────────────────────────────┐
│ 20 条消息,约 4000 tokens │
│ "我叫李明..." "用FastAPI..." "订单表..." │
└──────────────────┬────────────────────────┘
│ 会话结束时压缩
▼
Layer 1:会话摘要(Session Summary)
┌───────────────────────────────────────────┐
│ 1 段摘要,约 200 tokens │
│ "用户李明讨论了电商项目的订单表设计, │
│ 决定用 JSONB 存储扩展属性" │
└──────────────────┬────────────────────────┘
│ 每周/每月压缩
▼
Layer 2:周期摘要(Period Summary)
┌───────────────────────────────────────────┐
│ 1 段摘要,约 100 tokens │
│ "4 月第 1 周:完成订单模块设计和性能优化" │
└───────────────────────────────────────────┘
压缩比:4000 → 200 → 100 tokens
信息保留率:100% → 30% → 10%
但关键决策和事实的保留率:~95%Layer 0 → Layer 1:会话结束时自动生成摘要
SESSION_SUMMARY_PROMPT = """请为以下对话生成一段简洁摘要。
要求:
1. 保留所有关键决策和结论(不是讨论过程)
2. 保留具体的技术术语、数值、配置名
3. 用第三人称描述("用户...")
4. 控制在 100-200 字以内
对话内容:
{conversation}"""
async def summarize_session(
client: AsyncOpenAI,
events: list[dict],
) -> str:
"""会话结束时生成摘要"""
conversation = "\n".join(
f"{e['event_type']}: {e['content']}" for e in events
)
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": SESSION_SUMMARY_PROMPT.format(
conversation=conversation
),
}],
temperature=0,
max_tokens=300,
)
return response.choices[0].message.contentLayer 1 → Layer 2:定期归档老旧的会话摘要
PERIOD_SUMMARY_PROMPT = """请将以下多个会话摘要合并为一段周期性总结。
要求:
1. 按主题(而非时间)组织信息
2. 合并重复内容,保留最终结论
3. 标注关键的里程碑和决策
4. 控制在 200 字以内
会话摘要列表:
{summaries}"""
async def archive_old_sessions(
db: AsyncSession,
client: AsyncOpenAI,
user_id: str,
older_than_days: int = 30,
):
"""归档 N 天前的会话摘要为周期性总结"""
cutoff = datetime.utcnow() - timedelta(days=older_than_days)
# 1. 查出所有需要归档的会话摘要
result = await db.execute(text("""
SELECT id, summary, started_at
FROM sessions
WHERE user_id = :uid
AND started_at < :cutoff
AND archived = false
AND summary IS NOT NULL
ORDER BY started_at
"""), {"uid": user_id, "cutoff": cutoff})
sessions = result.fetchall()
if len(sessions) < 3:
return # 太少了,不值得压缩
# 2. 合并为周期摘要
summaries_text = "\n".join(
f"[{s.started_at:%m-%d}] {s.summary}" for s in sessions
)
period_summary = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": PERIOD_SUMMARY_PROMPT.format(
summaries=summaries_text
),
}],
temperature=0,
)
# 3. 写入周期摘要表
await db.execute(text("""
INSERT INTO period_summaries (user_id, period_start, period_end, summary)
VALUES (:uid, :start, :end, :summary)
"""), {
"uid": user_id,
"start": sessions[0].started_at,
"end": sessions[-1].started_at,
"summary": period_summary.choices[0].message.content,
})
# 4. 标记原始会话为已归档(可选:删除原始事件)
session_ids = [s.id for s in sessions]
await db.execute(text("""
UPDATE sessions SET archived = true
WHERE id = ANY(:ids)
"""), {"ids": session_ids})
await db.commit()💡 渐进式摘要的核心是"每层只保留上层 10-30% 的信息"——但这 10-30% 是最关键的决策和事实。原始事件流可以在归档后删除或移入冷存储,只保留摘要在热存储中。这样既节省了存储成本,又保证了检索质量。
6.3 压缩策略二:记忆合并与去重
渐进式摘要针对的是情景记忆(事件流)。语义记忆(事实库)的膨胀问题不太一样——不是"太长",而是"太重复"。
语义记忆的重复问题示例:
同一个用户的记忆库里可能出现:
┌─────────────────────────────────────────────┐
│ #1 技术栈 = FastAPI (3月10日) │
│ #2 后端框架 = FastAPI (3月15日) │ ← 重复!
│ #3 Web框架 = FastAPI + Uvicorn (3月20日) │ ← 重复但更详细
│ #4 数据库 = PostgreSQL (3月10日) │
│ #5 DB选型 = PG (4月1日) │ ← 重复!
└─────────────────────────────────────────────┘
问题:5 条记忆中有 3 条是重复的
→ 检索时浪费 Top-K 名额
→ 不同措辞可能导致不一致的召回结果去重的核心思路:用向量相似度发现候选重复对,再用 LLM 判断是否合并。
async def deduplicate_memories(
db: AsyncSession,
client: AsyncOpenAI,
user_id: str,
similarity_threshold: float = 0.85,
):
"""检测并合并重复的语义记忆"""
# 1. 找出所有高相似度的记忆对
result = await db.execute(text("""
SELECT a.id as id_a, b.id as id_b,
a.fact_key as key_a, b.fact_key as key_b,
a.fact_value as val_a, b.fact_value as val_b,
a.updated_at as time_a, b.updated_at as time_b,
1 - (a.embedding <=> b.embedding) as similarity
FROM semantic_memories a
JOIN semantic_memories b ON a.id < b.id -- 避免自比和重复对
WHERE a.user_id = :uid AND b.user_id = :uid
AND 1 - (a.embedding <=> b.embedding) > :threshold
ORDER BY similarity DESC
LIMIT 50
"""), {"uid": user_id, "threshold": similarity_threshold})
pairs = result.fetchall()
# 2. 逐对判断是否合并
for pair in pairs:
resolution = await _should_merge(client, pair)
if resolution["action"] == "merge":
# 保留更新的那条,删除旧的
keep_id = pair.id_b if pair.time_b > pair.time_a else pair.id_a
delete_id = pair.id_a if keep_id == pair.id_b else pair.id_b
# 更新保留条目的值为合并后的值
await db.execute(text("""
UPDATE semantic_memories
SET fact_value = :merged_value, updated_at = NOW()
WHERE id = :keep_id
"""), {
"merged_value": resolution["merged_value"],
"keep_id": keep_id,
})
# 删除冗余条目
await db.execute(text(
"DELETE FROM semantic_memories WHERE id = :del_id"
), {"del_id": delete_id})
await db.commit()
async def _should_merge(client: AsyncOpenAI, pair) -> dict:
"""让 LLM 判断两条记忆是否应该合并"""
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"""两条记忆是否描述同一件事?如果是,请合并。
记忆 A:{pair.key_a} = {pair.val_a}({pair.time_a})
记忆 B:{pair.key_b} = {pair.val_b}({pair.time_b})
相似度:{pair.similarity:.2f}
请回答 JSON 格式:
{{"action": "merge" 或 "keep_both", "merged_value": "合并后的值"}}""",
}],
temperature=0,
)
import json
return json.loads(response.choices[0].message.content)去重的执行策略:
| 策略 | 触发时机 | 优点 | 缺点 |
|---|---|---|---|
| 实时去重 | 每次写入新事实时检查 | 记忆始终干净 | 写入延迟增加 |
| 定时去重 | 每天凌晨批量执行 | 不影响在线延迟 | 白天可能有短暂重复 |
| 阈值去重 | 记忆条数超过阈值时 | 自适应,低频用户不浪费 | 实现稍复杂 |
💡 推荐"写入时快速检查 + 定时批量深度去重"的组合——写入时用简单的 key 精确匹配做快速去重(零成本),每天用向量相似度做深度去重(发现不同措辞的重复)。这样既保证了实时写入性能,又保证了记忆库的长期整洁。
6.4 遗忘机制:基于时间衰减与访问频率的淘汰策略
人类的记忆会自然遗忘——越久没回忆的信息,记忆强度越低。Agent 的记忆系统也需要类似的机制,否则早期的低价值记忆会永远占着位置。
核心思路:为每条记忆计算一个"重要性分数",定期淘汰低分记忆。
记忆重要性评分公式(灵感来自艾宾浩斯遗忘曲线):
score = base_importance × recency_factor × frequency_factor
其中:
├── base_importance : 记忆本身的重要性(由 confidence 表示)
│ 用户明确说的 = 1.0,AI 推测的 = 0.5-0.8
│
├── recency_factor : 时间衰减因子
│ = exp(-decay_rate × days_since_last_access)
│ 最近访问的 ≈ 1.0,30 天没访问的 ≈ 0.3
│
└── frequency_factor: 访问频率因子
= log(1 + access_count) / log(1 + max_access)
经常被召回的 → 更重要代码实现:
import math
from datetime import datetime, timedelta
def calculate_memory_score(
confidence: float,
last_accessed_at: datetime,
access_count: int,
max_access: int = 100,
decay_rate: float = 0.05,
) -> float:
"""计算记忆的重要性分数(0-1)"""
# 时间衰减:距上次访问的天数越多,分数越低
days_since = (datetime.utcnow() - last_accessed_at).days
recency = math.exp(-decay_rate * days_since)
# 访问频率:被召回越多次说明越重要
frequency = math.log(1 + access_count) / math.log(1 + max(max_access, 1))
frequency = min(frequency, 1.0)
# 综合评分
score = confidence * 0.4 + recency * 0.4 + frequency * 0.2
return round(score, 4)
# 示例:
# 用户明确说的、昨天刚访问过、被召回 20 次 → 0.92
# AI 推测的、30 天没访问、只被召回 1 次 → 0.23自动清理任务:
async def cleanup_stale_memories(
db: AsyncSession,
user_id: str,
max_memories: int = 200, # 每用户最多保留 200 条
min_score: float = 0.15, # 低于此分数直接删除
inactive_days: int = 90, # 90 天未访问的候选删除
):
"""清理低价值记忆"""
# 1. 删除长期未访问且低置信度的记忆
await db.execute(text("""
DELETE FROM semantic_memories
WHERE user_id = :uid
AND last_accessed_at < :cutoff
AND confidence < 0.5
"""), {
"uid": user_id,
"cutoff": datetime.utcnow() - timedelta(days=inactive_days),
})
# 2. 如果仍然超过上限,按分数排序淘汰
count = await db.scalar(text(
"SELECT COUNT(*) FROM semantic_memories WHERE user_id = :uid"
), {"uid": user_id})
if count > max_memories:
# 保留 Top N,删除其余
await db.execute(text("""
DELETE FROM semantic_memories
WHERE id IN (
SELECT id FROM semantic_memories
WHERE user_id = :uid
ORDER BY
confidence * 0.4
+ EXP(-0.05 * EXTRACT(DAY FROM NOW() - last_accessed_at)) * 0.4
+ LN(1 + access_count) / LN(101) * 0.2
ASC
LIMIT :delete_count
)
"""), {"uid": user_id, "delete_count": count - max_memories})
await db.commit()不同类型记忆的遗忘策略差异:
| 记忆类型 | 衰减速度 | 最低保留期 | 说明 |
|---|---|---|---|
| personal(姓名、身份) | 极慢 | 永久 | 核心身份信息几乎不变 |
| preference(偏好) | 中等 | 60 天 | 偏好可能改变 |
| technical(技术选型) | 中等 | 90 天 | 技术栈会迭代 |
| decision(已做决定) | 慢 | 180 天 | 重要决策需要长期追溯 |
| project(项目信息) | 快 | 30 天 | 项目状态频繁变化 |
💡 不要对所有记忆一视同仁——用户的名字应该"永不遗忘",但"用户上次问了 Python 2 还是 3" 这种信息 30 天后就可以清理了。在
semantic_memories表的category字段上设置不同的衰减参数,是低成本但高回报的优化。
6.5 反思循环(Reflection Loop):让 Agent 自己整理记忆
前面的压缩、去重、遗忘都是"被动式"管理——按规则自动执行。反思循环是更高级的策略:让 Agent 主动审视自己的记忆库,发现矛盾、提炼规律、生成新的高层次知识。
反思循环的灵感——人类的"睡眠整理":
人在睡眠时,大脑会:
├── 回放白天的经历(海马体 → 皮层)
├── 发现不同经历之间的关联
├── 提炼出规律和模式
└── 巩固重要的、丢弃琐碎的
Agent 的反思循环(定时任务):
├── 回顾最近的记忆
├── 发现矛盾和不一致
├── 从多条记忆中提炼出更高层次的洞察
└── 清理已被高层洞察覆盖的低层记忆核心实现:
REFLECTION_PROMPT = """你是一个记忆管理助手。请审视以下用户的记忆库,
完成三个任务:
1. **发现矛盾**:找出相互矛盾的记忆条目
2. **提炼洞察**:从多条记忆中归纳出更高层次的结论
3. **标记清理**:标记可以安全删除的低价值条目
用户记忆库:
{memories}
请以 JSON 格式输出:
{{
"contradictions": [
{{"memory_ids": [1, 3], "description": "矛盾描述",
"resolution": "建议保留哪条"}}
],
"insights": [
{{"derived_from": [2, 4, 5], "insight": "归纳出的新知识",
"category": "分类"}}
],
"cleanup_ids": [6, 7]
}}"""
async def run_reflection(
db: AsyncSession,
client: AsyncOpenAI,
user_id: str,
):
"""执行一次反思循环"""
# 1. 加载该用户的所有记忆
result = await db.execute(text("""
SELECT id, category, fact_key, fact_value,
confidence, access_count, updated_at
FROM semantic_memories
WHERE user_id = :uid
ORDER BY updated_at DESC
LIMIT 100
"""), {"uid": user_id})
memories = result.fetchall()
if len(memories) < 10:
return # 记忆太少,不需要反思
# 2. 格式化记忆列表
memories_text = "\n".join(
f"[{m.id}] {m.category}/{m.fact_key} = {m.fact_value} "
f"(置信度:{m.confidence}, 访问:{m.access_count}次, "
f"更新:{m.updated_at:%m-%d})"
for m in memories
)
# 3. 调用 LLM 做反思
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": REFLECTION_PROMPT.format(
memories=memories_text
),
}],
temperature=0,
response_format={"type": "json_object"},
)
import json
reflection = json.loads(response.choices[0].message.content)
# 4. 处理矛盾——解决冲突
for contradiction in reflection.get("contradictions", []):
# 具体处理逻辑复用 5.4 的冲突解决器
pass
# 5. 写入新洞察
for insight in reflection.get("insights", []):
# 将高层洞察作为新的语义记忆写入
fact_text = insight["insight"]
embedding = await _embed(client, fact_text)
await db.execute(text("""
INSERT INTO semantic_memories
(user_id, category, fact_key, fact_value,
embedding, confidence)
VALUES (:uid, :cat, :key, :val, :emb, 0.85)
ON CONFLICT (user_id, category, fact_key) DO UPDATE
SET fact_value = :val, updated_at = NOW()
"""), {
"uid": user_id,
"cat": insight["category"],
"key": f"insight_{hash(fact_text) % 10000}",
"val": fact_text,
"emb": str(embedding),
})
# 6. 清理标记的低价值记忆
cleanup_ids = reflection.get("cleanup_ids", [])
if cleanup_ids:
await db.execute(text(
"DELETE FROM semantic_memories WHERE id = ANY(:ids)"
), {"ids": cleanup_ids})
await db.commit()反思循环的执行频率建议:
| 用户活跃度 | 反思频率 | 说明 |
|---|---|---|
| 高活跃(日均 20+ 轮) | 每天 1 次 | 记忆增长快,需要频繁整理 |
| 中活跃(日均 5-20 轮) | 每周 1 次 | 平衡整理质量和 LLM 成本 |
| 低活跃(日均 <5 轮) | 每月 1 次 | 记忆增长慢,不需要频繁整理 |
💡 反思循环是记忆系统的"高级功能",不是必需品——如果你的 Agent 用户量不大、对话轮数不多,前面的去重 + 遗忘已经够用。反思循环主要用于高活跃度、长期使用的场景,让 Agent 的记忆从"堆积事实"进化为"理解用户"。
第 6 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 记忆膨胀 | 不清理的记忆导致检索变慢、噪声增多、成本飙升 |
| 渐进式摘要 | 原始事件 → 会话摘要 → 周期摘要,三层逐步压缩 |
| 记忆去重 | 向量相似度发现候选重复 + LLM 判断是否合并 |
| 遗忘机制 | 基于 confidence × 时间衰减 × 访问频率的评分淘汰 |
| 反思循环 | Agent 主动审视记忆:发现矛盾、提炼洞察、清理冗余 |
7. 图记忆(Graph Memory):理解实体间的关系
前面的语义记忆把每条事实独立存储为一个向量——这在查"用户喜欢什么"时很好用,但遇到实体间的关系推理就力不从心了。图记忆用知识图谱来补齐这个能力:不仅记住事实,还记住事实之间的关联。
7.1 为什么需要图记忆:向量搜索搞不定的关系推理
向量搜索的核心能力是语义相似度匹配——给一个 query,找到最像的记忆。但有一类问题,向量搜索天生搞不定:
向量搜索失败的典型场景:
记忆库中有以下事实:
① 用户李明 → 负责 → 电商项目
② 电商项目 → 使用 → FastAPI
③ 电商项目 → 数据库 → PostgreSQL
④ 张伟 → 同事 → 李明
⑤ 张伟 → 负责 → 支付模块
用户问:"张伟用的什么数据库?"
┌─── 向量搜索 ──────────────────────────────────┐
│ query: "张伟 数据库" │
│ → 最相似: ③ "电商项目 → PostgreSQL"(0.72) │
│ → 但搜索不知道张伟和电商项目有关系! │
│ → 无法回答 │
└────────────────────────────────────────────────┘
┌─── 图谱推理 ──────────────────────────────────┐
│ 起点: 张伟 │
│ → 张伟 --同事-→ 李明 │
│ → 李明 --负责-→ 电商项目 │
│ → 电商项目 --数据库-→ PostgreSQL │
│ → 答案: "张伟的同事李明负责的电商项目用 PG" │
│ → 通过 3 跳关系推理得出答案 ✓ │
└────────────────────────────────────────────────┘向量搜索 vs 图谱推理的能力对比:
| 查询类型 | 向量搜索 | 图谱推理 |
|---|---|---|
| 属性查询 "李明用什么数据库" | ✅ 直接匹配 | ✅ 直接查 |
| 关系查询 "谁和李明是同事" | ⚠️ 能查到但不精确 | ✅ 精确遍历 |
| 多跳推理 "张伟项目用的什么技术" | ❌ 无法推理 | ✅ 多跳遍历 |
| 路径查询 "从 A 到 B 怎么关联" | ❌ 不支持 | ✅ 最短路径 |
| 模糊语义 "像 FastAPI 一样的框架" | ✅ 语义召回 | ❌ 不擅长 |
| 全量扫描 "所有用 Python 的项目" | ⚠️ 效率低 | ✅ 索引高效 |
两者的关系是互补而非替代:
向量记忆 + 图记忆的分工:
向量记忆(Semantic Memory)
├── 擅长:模糊语义匹配、开放式查询
├── 数据:扁平的 key-value 事实
└── 适合:"帮我推荐类似 FastAPI 的框架"
图记忆(Graph Memory)
├── 擅长:关系遍历、多跳推理、路径发现
├── 数据:实体 + 关系的网络结构
└── 适合:"张伟负责的模块依赖哪些服务"
最佳实践:先用向量召回候选记忆,
再用图谱补充关系上下文💡 如果你的 Agent 场景不涉及"多个实体之间的关系"(比如只做单用户画像),图记忆可以跳过——向量记忆足够。图记忆主要用于:团队协作场景、项目管理 Agent、CRM 客户关系管理、多角色 RPG 等需要理解"谁和谁有什么关系"的场景。
7.2 实体关系抽取:从对话中构建知识图谱
知识图谱的基本单元是三元组(Triple):(主体, 关系, 客体)。从对话中提取三元组,就能逐步构建出用户的知识图谱。
三元组示例:
对话:"我叫李明,和张伟一起做电商项目,用的 FastAPI"
提取的三元组:
┌──────────────────────────────────────────┐
│ (李明, is_a, 用户) │
│ (李明, works_with, 张伟) │
│ (李明, works_on, 电商项目) │
│ (电商项目, uses, FastAPI) │
└──────────────────────────────────────────┘
可视化为图结构:
张伟 ←─works_with─→ 李明
│
works_on
│
▼
电商项目
│
uses
│
▼
FastAPI核心实现——用结构化输出抽取三元组:
from pydantic import BaseModel, Field
class Triple(BaseModel):
"""知识图谱三元组"""
subject: str = Field(description="主体实体名称")
subject_type: str = Field(
description="主体类型:person/project/technology/organization"
)
relation: str = Field(
description="关系类型:uses/works_on/works_with/manages/depends_on/..."
)
object: str = Field(description="客体实体名称")
object_type: str = Field(description="客体类型")
class TripleExtractionResult(BaseModel):
"""三元组抽取结果"""
triples: list[Triple] = Field(default_factory=list)
TRIPLE_PROMPT = """请从以下对话中提取实体和关系,构建知识图谱三元组。
提取规则:
1. 实体类型:person(人)、project(项目)、technology(技术/框架/工具)、
organization(公司/团队)、concept(抽象概念)
2. 关系类型:uses(使用)、works_on(负责)、works_with(合作)、
manages(管理)、depends_on(依赖)、prefers(偏好)、
deployed_on(部署于)、belongs_to(属于)
3. 只提取明确提到的关系,不要推测
4. 实体名称尽量统一(如 PG 统一写为 PostgreSQL)
对话内容:
{conversation}
已有的实体(保持命名一致):
{existing_entities}"""
class TripleExtractor:
"""从对话中抽取知识图谱三元组"""
def __init__(self, client: AsyncOpenAI):
self.client = client
async def extract(
self,
conversation: list[dict],
existing_entities: list[str] | None = None,
) -> list[Triple]:
conv_text = "\n".join(
f"{m['role']}: {m['content']}" for m in conversation
)
entities_text = ", ".join(existing_entities) if existing_entities else "无"
response = await self.client.beta.chat.completions.parse(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": TRIPLE_PROMPT.format(
conversation=conv_text,
existing_entities=entities_text,
),
}],
response_format=TripleExtractionResult,
temperature=0,
)
return response.choices[0].message.parsed.triples使用示例:
extractor = TripleExtractor(client)
conversation = [
{"role": "user", "content": "我叫李明,和张伟一起做电商项目"},
{"role": "assistant", "content": "用什么技术栈呢?"},
{"role": "user", "content": "FastAPI + PostgreSQL,部署在阿里云 ECS"},
]
triples = await extractor.extract(conversation)
# [
# Triple(subject="李明", subject_type="person",
# relation="works_with", object="张伟", object_type="person"),
# Triple(subject="李明", subject_type="person",
# relation="works_on", object="电商项目", object_type="project"),
# Triple(subject="电商项目", subject_type="project",
# relation="uses", object="FastAPI", object_type="technology"),
# Triple(subject="电商项目", subject_type="project",
# relation="uses", object="PostgreSQL", object_type="technology"),
# Triple(subject="电商项目", subject_type="project",
# relation="deployed_on", object="阿里云ECS", object_type="technology"),
# ]💡 三元组抽取和事实抽取(5.2)可以合并为同一次 LLM 调用——在 Prompt 中同时要求返回
facts(扁平事实)和triples(关系三元组),一次调用完成两种记忆的原始数据提取,节省一半的 API 成本。
7.3 图存储方案:Neo4j / NetworkX / 轻量级 JSON 图
三元组提取出来了,用什么存?根据项目规模,有三种方案:
| 方案 | 适用规模 | 优点 | 缺点 | 推荐场景 |
|---|---|---|---|---|
| NetworkX | 每用户 <1万 节点 | 零依赖、纯 Python、内存操作极快 | 不持久化、大图吃内存 | MVP / 中小项目(推荐起步) |
| 轻量 JSON 图 | 每用户 <1千 节点 | 零依赖、直接存 PG JSONB | 查询能力有限 | 极简场景 |
| Neo4j | 百万级节点 | 专业图数据库、Cypher 查询强大 | 额外运维、学习成本 | 企业级 / 复杂关系网络 |
推荐方案:NetworkX + JSON 持久化——对大多数 Agent 来说,每个用户的知识图谱不会超过几百个节点,NetworkX 完全够用。用 JSON 序列化存入数据库即可持久化。
完整实现:
import networkx as nx
import json
from datetime import datetime
class GraphMemory:
"""基于 NetworkX 的图记忆"""
def __init__(self):
self.graph = nx.DiGraph() # 有向图
def add_triple(self, triple: "Triple"):
"""添加一条三元组"""
# 添加节点(带类型属性)
self.graph.add_node(
triple.subject,
type=triple.subject_type,
updated_at=datetime.utcnow().isoformat(),
)
self.graph.add_node(
triple.object,
type=triple.object_type,
updated_at=datetime.utcnow().isoformat(),
)
# 添加边(关系)
self.graph.add_edge(
triple.subject,
triple.object,
relation=triple.relation,
created_at=datetime.utcnow().isoformat(),
)
def get_neighbors(self, entity: str, max_depth: int = 2) -> dict:
"""获取实体的邻居(支持多跳)"""
if entity not in self.graph:
return {"entity": entity, "found": False}
result = {
"entity": entity,
"type": self.graph.nodes[entity].get("type", "unknown"),
"relations": [],
}
# BFS 遍历到指定深度
visited = {entity}
queue = [(entity, 0)]
while queue:
node, depth = queue.pop(0)
if depth >= max_depth:
continue
# 出边(该实体指向的)
for _, target, data in self.graph.out_edges(node, data=True):
result["relations"].append({
"from": node,
"relation": data["relation"],
"to": target,
"to_type": self.graph.nodes[target].get("type"),
"depth": depth + 1,
})
if target not in visited:
visited.add(target)
queue.append((target, depth + 1))
# 入边(指向该实体的)
for source, _, data in self.graph.in_edges(node, data=True):
result["relations"].append({
"from": source,
"relation": data["relation"],
"to": node,
"from_type": self.graph.nodes[source].get("type"),
"depth": depth + 1,
})
if source not in visited:
visited.add(source)
queue.append((source, depth + 1))
return result
def query_by_relation(self, relation: str) -> list[dict]:
"""查询某种关系的所有实例"""
return [
{"from": u, "to": v, **data}
for u, v, data in self.graph.edges(data=True)
if data.get("relation") == relation
]
def find_path(self, source: str, target: str) -> list[str] | None:
"""查找两个实体之间的最短路径"""
try:
path = nx.shortest_path(
self.graph, source, target
)
return path
except nx.NetworkXNoPath:
return None
# ─── 持久化:序列化为 JSON ───
def to_json(self) -> str:
"""序列化为 JSON(存入数据库)"""
data = nx.node_link_data(self.graph)
return json.dumps(data, ensure_ascii=False, default=str)
@classmethod
def from_json(cls, json_str: str) -> "GraphMemory":
"""从 JSON 反序列化"""
gm = cls()
if json_str:
data = json.loads(json_str)
gm.graph = nx.node_link_graph(data, directed=True)
return gm存入数据库的方式:
# 在 semantic_memories 同级,为每个用户存一个图
# 直接用 JSONB 字段存 NetworkX 序列化结果
async def save_graph(db: AsyncSession, user_id: str, graph: GraphMemory):
"""将图记忆持久化到数据库"""
await db.execute(text("""
INSERT INTO user_graphs (user_id, graph_data, updated_at)
VALUES (:uid, :data, NOW())
ON CONFLICT (user_id) DO UPDATE
SET graph_data = :data, updated_at = NOW()
"""), {"uid": user_id, "data": graph.to_json()})
await db.commit()
async def load_graph(db: AsyncSession, user_id: str) -> GraphMemory:
"""从数据库加载图记忆"""
result = await db.scalar(text(
"SELECT graph_data FROM user_graphs WHERE user_id = :uid"
), {"uid": user_id})
return GraphMemory.from_json(result) if result else GraphMemory()💡 NetworkX 图存在内存中,查询速度是微秒级——对于每用户几百个节点的场景,BFS 遍历、最短路径等操作不到 1ms。唯一需要注意的是在高并发场景下要做好用户级别的图缓存(比如用 Redis 缓存序列化的 JSON),避免每次请求都从数据库加载。
7.4 混合检索:向量召回 + 图谱推理的联合查询
实际使用中,向量记忆和图记忆最好组合使用——先用向量搜索找到相关的实体,再用图谱展开这些实体的关系网络,最后把两部分信息一起注入上下文。
混合检索的工作流程:
用户问:"帮我优化电商项目的数据库性能"
│
▼
① 向量检索(Semantic Memory)
│ → 召回事实:
│ - database = PostgreSQL(相似度 0.91)
│ - project_type = 电商后端(0.85)
│ - deployment = 阿里云 ECS(0.78)
│
▼
② 实体识别(从召回结果中提取实体名)
│ → 实体列表:["电商项目", "PostgreSQL", "阿里云ECS"]
│
▼
③ 图谱展开(Graph Memory)
│ → 以"电商项目"为起点,2 跳遍历:
│ - 电商项目 --uses-→ FastAPI
│ - 电商项目 --uses-→ PostgreSQL
│ - 电商项目 --deployed_on-→ 阿里云ECS
│ - 李明 --works_on-→ 电商项目
│ - 张伟 --works_with-→ 李明
│
▼
④ 组装上下文
→ 向量召回的事实 + 图谱展开的关系网络
→ 一起注入 System Prompt完整代码实现:
class HybridMemoryRetriever:
"""混合检索器:向量 + 图谱"""
def __init__(
self,
semantic_memory: SemanticMemoryService,
graph_memory: GraphMemory,
):
self.semantic = semantic_memory
self.graph = graph_memory
async def retrieve(
self, user_id: str, query: str,
vector_top_k: int = 5,
graph_depth: int = 2,
) -> str:
"""混合检索,返回格式化的记忆上下文"""
# 1. 向量检索
facts = await self.semantic.recall(
user_id, query, limit=vector_top_k
)
# 2. 从召回的事实中提取实体名
entities = set()
for fact in facts:
entities.add(fact["fact_value"])
# fact_key 可能也是实体名(如 "database" 不是,但 "project" 可能是)
# 3. 图谱展开
graph_context = []
for entity in entities:
neighbors = self.graph.get_neighbors(entity, max_depth=graph_depth)
if neighbors.get("found", True) and neighbors.get("relations"):
graph_context.append(neighbors)
# 4. 格式化输出
return self._format_context(facts, graph_context)
def _format_context(
self, facts: list[dict], graph_context: list[dict]
) -> str:
"""将检索结果格式化为 LLM 可理解的上下文"""
parts = []
# 向量记忆部分
if facts:
parts.append("[用户画像]")
for f in facts:
parts.append(f"- {f['fact_key']}: {f['fact_value']}")
# 图谱部分
if graph_context:
parts.append("\n[关系网络]")
for ctx in graph_context:
for rel in ctx["relations"][:10]: # 限制数量
parts.append(
f"- {rel['from']} --{rel['relation']}--> {rel['to']}"
)
return "\n".join(parts) if parts else ""在 Agent 中使用混合检索:
async def chat_with_hybrid_memory(
user_id: str, message: str,
retriever: HybridMemoryRetriever,
client: AsyncOpenAI,
) -> str:
# 一次调用获取所有记忆上下文
memory_context = await retriever.retrieve(user_id, message)
response = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": (
"你是一个智能助手,拥有用户的长期记忆和关系网络。\n"
"请利用以下记忆信息提供个性化帮助。\n\n"
f"{memory_context}"
)},
{"role": "user", "content": message},
],
)
return response.choices[0].message.content混合检索 vs 单一检索的效果对比:
| 查询场景 | 仅向量 | 仅图谱 | 混合检索 |
|---|---|---|---|
| "我叫什么名字" | ✅ 直接命中 | ✅ 查节点 | ✅ |
| "我用什么数据库" | ✅ 直接命中 | ✅ 查属性 | ✅ |
| "谁和我一起做项目" | ⚠️ 语义模糊 | ✅ 精确遍历 | ✅ 图谱主导 |
| "张伟的项目用什么技术" | ❌ 无法推理 | ✅ 多跳遍历 | ✅ 图谱主导 |
| "推荐类似 FastAPI 的框架" | ✅ 语义匹配 | ❌ 不擅长 | ✅ 向量主导 |
| "我的项目整体技术栈" | ⚠️ 零散召回 | ✅ 完整遍历 | ✅ 互补增强 |
💡 混合检索的额外延迟很小——向量检索 ~30ms + 图谱遍历 ~0.1ms = 总计 ~30ms。图谱查询因为在内存中运行(NetworkX),几乎不增加延迟。成本方面也没有额外的 LLM 调用。是一个"低成本高收益"的能力增强。
第 7 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 图记忆的定位 | 补齐向量搜索无法做的关系推理和多跳查询 |
| 三元组 | 知识图谱的基本单元:(主体, 关系, 客体) |
| 推荐存储方案 | NetworkX + JSON 持久化,适合绝大多数 Agent 场景 |
| 混合检索 | 向量召回实体 → 图谱展开关系 → 组装上下文 |
| 适用场景 | 多人协作、项目管理、CRM 等涉及实体关系的场景 |
8. 主流框架实战:Mem0 / Letta / LangGraph
前面几章从零构建了记忆系统的各个模块。但在实际项目中,很多团队会选择使用现成的记忆框架来加速开发。本章介绍三个最主流的方案,帮你选出最适合的。
8.1 Mem0:开箱即用的多层记忆中间件
Mem0(读作"mem-zero")是目前最流行的 Agent 记忆中间件。它的核心理念是**"把对话扔给我,记忆的事我来搞定"**——自动从对话中提取事实、存储为向量、处理冲突和去重。
Mem0 的架构:
你的 Agent 代码
│
│ client.add(messages, user_id="xxx")
▼
┌─────────────────────────────────────────────┐
│ Mem0 引擎(自动完成以下步骤) │
│ │
│ ① 事实抽取 ─── 用 LLM 从对话中提取事实 │
│ ② 去重检查 ─── 对比已有记忆,处理冲突 │
│ ③ Embedding ── 生成向量 │
│ ④ 存储 ─────── 写入向量数据库 │
│ ⑤ 图谱更新 ── 更新实体关系(可选) │
└─────────────────────────────────────────────┘
│
│ client.search(query, user_id="xxx")
▼
返回最相关的记忆列表快速上手:
# 安装
# pip install mem0ai
from mem0 import MemoryClient
# 1. 初始化(使用 Mem0 Cloud)
client = MemoryClient(api_key="your_api_key")
# 2. 添加记忆——直接传入对话,Mem0 自动提取事实
messages = [
{"role": "user", "content": "我叫李明,做电商后端,用 FastAPI + PostgreSQL"},
{"role": "assistant", "content": "你好李明!FastAPI + PG 是很棒的组合"},
{"role": "user", "content": "团队两个人,部署在阿里云"},
]
client.add(messages, user_id="liming_001")
# → Mem0 自动提取并存储:
# "用户名:李明"
# "项目:电商后端"
# "技术栈:FastAPI + PostgreSQL"
# "团队规模:2 人"
# "部署环境:阿里云"
# 3. 搜索记忆——语义搜索
results = client.search("用什么数据库", user_id="liming_001")
for r in results:
print(f"记忆: {r['memory']} (相关度: {r['score']:.2f})")
# → 记忆: 技术栈为 FastAPI + PostgreSQL (相关度: 0.92)
# 4. 获取用户的所有记忆
all_memories = client.get_all(user_id="liming_001")自托管模式(不想用云服务):
from mem0 import Memory
# 本地模式,需要自己配置向量数据库和 LLM
config = {
"llm": {
"provider": "openai",
"config": {"model": "gpt-4o-mini", "temperature": 0},
},
"embedder": {
"provider": "openai",
"config": {"model": "text-embedding-3-small"},
},
"vector_store": {
"provider": "qdrant",
"config": {"host": "localhost", "port": 6333},
},
"graph_store": {
"provider": "neo4j",
"config": {"url": "bolt://localhost:7687", "username": "neo4j"},
},
}
memory = Memory.from_config(config)
# API 完全一致
memory.add(messages, user_id="liming_001")
results = memory.search("数据库", user_id="liming_001")Mem0 的优势和局限:
| 维度 | 优势 | 局限 |
|---|---|---|
| 易用性 | 3 行代码就能用,零配置记忆系统 | 行为不透明,难以调试 |
| 功能 | 自动事实提取 + 去重 + 冲突解决 + 图谱 | 自定义提取逻辑受限 |
| 部署 | 云服务零运维,自托管也简单 | 云服务有数据隐私顾虑 |
| 性能 | 异步处理,不阻塞对话 | 自托管需要额外的向量 DB |
| 生态 | 社区活跃,文档完善 | 与 LangChain/LangGraph 集成需要适配 |
💡 Mem0 最适合"快速给已有 Agent 加上记忆能力"的场景——如果你的 Agent 已经可以正常对话,只是缺少跨会话的记忆,Mem0 是成本最低的选择。但如果你需要深度自定义记忆的提取逻辑、存储结构或检索策略,自建方案(第 5-7 章)会更灵活。
8.2 Letta(MemGPT):OS 风格的虚拟上下文管理
Letta(前身是 MemGPT)的理念和 Mem0 完全不同——它不是一个记忆"中间件",而是一个有状态的 Agent 运行时。核心创新是:让 Agent 像操作系统管理内存一样,自主管理自己的上下文窗口。
Letta 的 OS 类比:
传统 Agent:上下文窗口 = 固定大小的 RAM
├── 塞满了就溢出,丢失信息
└── 被动管理,无法自主决定"加载什么"
Letta Agent:上下文窗口 = 虚拟内存系统
├── Core Memory ──→ "RAM"(即时可用,Agent 可读写)
│ ├── human block: "用户是李明,做电商后端"
│ └── persona block: "我是一个 Python 专家助手"
│
├── Archival Memory ──→ "磁盘"(持久化,按需加载)
│ └── 所有历史对话和事实,向量检索
│
└── Agent 自主决定:
├── 什么信息放入 Core Memory("钉住"重要的)
├── 什么信息归档到 Archival("换出"不急用的)
└── 什么时候从 Archival 检索("换入"需要的)核心概念——Memory Blocks:
Letta 的 Core Memory 由多个可编辑的文本块组成。关键区别是:Agent 自己可以修改这些块的内容——不是外部程序控制记忆,而是 Agent 自治。
# 安装
# pip install letta-client
from letta_client import Letta
# 1. 初始化客户端(连接本地 Letta 服务器或 Letta Cloud)
client = Letta(base_url="http://localhost:8283")
# 2. 创建有状态 Agent——定义初始记忆块
agent = client.agents.create(
model="openai/gpt-4o",
memory_blocks=[
{
"label": "human",
"value": "用户还没有自我介绍。",
},
{
"label": "persona",
"value": "你是一个 Python 全栈开发专家,擅长 FastAPI 和 PostgreSQL。",
},
],
)
# 3. 发送消息——Agent 会自动更新自己的记忆块
response = client.agents.messages.create(
agent_id=agent.id,
messages=[{
"role": "user",
"content": "你好!我叫李明,正在做一个电商后端项目",
}],
)
# 4. 查看 Agent 自主更新的记忆
human_block = client.agents.blocks.retrieve(
agent_id=agent.id,
block_label="human",
)
print(human_block.value)
# → "用户是李明,正在做电商后端项目。"
# ← Agent 自己把 "用户还没有自我介绍" 更新了!
# 5. Archival Memory——长期存储(Agent 自动管理)
# Agent 在对话过程中会自动将重要信息归档到 Archival Memory
# 也可以手动添加
client.agents.archival_memory.create(
agent_id=agent.id,
text="2024-03 讨论了订单表设计,决定用 JSONB 存储扩展属性",
)Letta 与 Mem0 的核心区别:
| 维度 | Mem0 | Letta |
|---|---|---|
| 记忆管理者 | 外部程序(你的代码调用 API) | Agent 自己(自主读写记忆) |
| 架构模式 | 记忆中间件 + 你的 Agent | 完整的 Agent 运行时 |
| 记忆结构 | 扁平的事实列表 | 结构化的 Memory Blocks |
| 存储层级 | 单层向量存储 | Core(RAM)+ Archival(磁盘) |
| 适用场景 | 给已有 Agent 加记忆 | 从零构建有状态 Agent |
| 集成成本 | 低(加几行代码) | 高(需要用 Letta 的 Agent 框架) |
| 部署方式 | 云服务 / 嵌入式库 | 需要运行 Letta Server |
💡 Letta 最适合"从零构建需要深度状态管理的 Agent"——Agent 自主管理记忆这个能力非常强大,但代价是你需要完全采用 Letta 的 Agent 框架。如果你已经有了基于 LangChain/LangGraph 的 Agent,迁移到 Letta 的成本比较高。Mem0 则可以无侵入地集成到任何现有架构中。
8.3 LangGraph Store:跨线程持久化记忆
如果你已经在用 LangGraph 构建 Agent,LangGraph 自带的记忆方案是最自然的选择——不需要引入额外的依赖。LangGraph 用两个机制分别处理短期和长期记忆:
LangGraph 的双轨记忆架构:
┌─────────────────────────────────────────────┐
│ Checkpointer(短期记忆) │
│ ────────────────────────────────────────── │
│ 作用域:线程级(thread_id) │
│ 内容: 当前会话的 messages + 自定义状态 │
│ 生命周期:会话内持久化 │
│ 特点: 自动保存/恢复,无需手动管理 │
│ 实现: MemorySaver / PostgresSaver │
│ │
│ → 第 3 章已详细介绍 │
├─────────────────────────────────────────────┤
│ BaseStore(长期记忆) ← 本节重点 │
│ ────────────────────────────────────────── │
│ 作用域:跨线程全局(按 namespace 隔离) │
│ 内容: 用户画像、偏好、提炼的事实 │
│ 生命周期:永久持久化 │
│ 特点: 手动 put/get/search,按需读写 │
│ 实现: InMemoryStore / PostgresStore │
└─────────────────────────────────────────────┘
Checkpointer 回答:"这次对话聊了什么"
BaseStore 回答:"这个用户是谁、喜欢什么"BaseStore 核心 API 和用法:
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
# 1. 初始化 Store(生产环境用 PostgresStore)
store = InMemoryStore()
checkpointer = MemorySaver()
# 2. 写入记忆——使用 namespace 隔离用户
store.put(
namespace=("memories", "user_liming"),
key="tech_stack",
value={"fact": "FastAPI + PostgreSQL", "confidence": 1.0},
)
store.put(
namespace=("memories", "user_liming"),
key="project_type",
value={"fact": "电商后端", "confidence": 1.0},
)
# 3. 读取记忆
item = store.get(namespace=("memories", "user_liming"), key="tech_stack")
print(item.value)
# → {"fact": "FastAPI + PostgreSQL", "confidence": 1.0}
# 4. 搜索记忆(支持 namespace 前缀搜索)
results = store.search(namespace_prefix=("memories", "user_liming"))
for r in results:
print(f"{r.key}: {r.value}")在 Graph 节点中集成 Store——关键模式:
from langgraph.config import get_store
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-4o")
# 核心:在节点函数中通过 config 获取 store
async def chat_with_memory(state: AgentState, config: dict) -> dict:
# 1. 从 config 中获取 store 和 user_id
store = get_store(config)
user_id = config["configurable"].get("user_id", "anonymous")
# 2. 检索该用户的所有记忆
namespace = ("memories", user_id)
memories = store.search(namespace_prefix=namespace)
# 3. 格式化记忆上下文
memory_text = ""
if memories:
memory_text = "\n\n[用户记忆]\n"
for m in memories:
memory_text += f"- {m.key}: {m.value.get('fact', '')}\n"
# 4. 调用 LLM
messages = state["messages"]
system_msg = f"你是一个智能助手。{memory_text}"
response = await llm.ainvoke(
[{"role": "system", "content": system_msg}] + messages
)
return {"messages": [response]}
# 5. 构建 Graph——同时传入 checkpointer 和 store
graph = StateGraph(AgentState)
graph.add_node("chat", chat_with_memory)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)
app = graph.compile(
checkpointer=checkpointer, # 短期记忆(线程内)
store=store, # 长期记忆(跨线程)
)
# 6. 使用——不同 thread_id 但同一 user_id,共享长期记忆
config = {
"configurable": {
"thread_id": "session-001", # 短期记忆隔离
"user_id": "user_liming", # 长期记忆共享
}
}
result = await app.ainvoke(
{"messages": [HumanMessage(content="帮我优化数据库性能")]},
config=config,
)💡 LangGraph Store 是"原生集成"的优势——如果你已经在用 LangGraph,Store 可以零成本加入,不需要额外的服务或依赖。但 Store 只提供基础的 put/get/search,不包含自动事实抽取、冲突解决等高级功能——这些需要自己实现(参考第 5 章),或者在 LangGraph 中嵌入 Mem0 作为记忆后端。
8.4 三大框架对比:选型决策表
三个框架各有侧重,选错了不是不能用,但会让你多走弯路。先看一张全面对比表:
| 维度 | Mem0 | Letta(MemGPT) | LangGraph Store |
|---|---|---|---|
| 定位 | 记忆中间件(插件式) | 有状态 Agent 运行时 | Graph 原生状态层 |
| 集成成本 | 极低(加几行代码) | 高(需采用 Letta 框架) | 低(已用 LangGraph 则零成本) |
| 记忆层次 | 语义记忆 + 图记忆 | Core(RAM)+ Archival(磁盘) | Checkpointer(短期)+ Store(长期) |
| 事实抽取 | ✅ 自动 | ✅ Agent 自主 | ❌ 需自建 |
| 冲突解决 | ✅ 内置 | ✅ Agent 自主 | ❌ 需自建 |
| 图谱支持 | ✅ 内置 Neo4j | ❌ 不原生支持 | ❌ 需自建 |
| 记忆管理者 | 外部程序 | Agent 自己 | 外部程序 |
| 部署依赖 | 向量 DB + LLM(或用云服务) | Letta Server + LLM | PG / SQLite(轻量) |
| 开源协议 | Apache 2.0 | Apache 2.0 | MIT |
| 社区活跃度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
选型决策树——根据你的场景选框架:
你的选型决策路径:
已经在用 LangGraph?
├── YES → 先用 Checkpointer + Store 起步
│ 需要自动事实抽取?
│ ├── YES → Store 层接入 Mem0 做记忆后端
│ └── NO → 纯 Store 手动管理即可
│
└── NO → 已经有可运行的 Agent 了?
├── YES → 用 Mem0(最低侵入成本添加记忆)
│ 需要 Agent 自主管理记忆?
│ ├── YES → 考虑迁移到 Letta
│ └── NO → Mem0 足够
│
└── NO → 从零构建新 Agent?
├── 需要深度状态管理 → Letta
└── 标准 Agent 场景 → LangGraph + Store三种方案也可以组合使用——实际上很多生产项目会混搭:
常见的混搭方案:
方案 A:LangGraph + Mem0(推荐 ✅)
├── LangGraph 管 Agent 编排和短期记忆(Checkpointer)
└── Mem0 管长期记忆(自动抽取 + 向量检索 + 图谱)
→ 各司其职,集成简单
方案 B:LangGraph + 自建记忆(第 5-7 章)
├── LangGraph 管 Agent 编排
└── 自建的 SemanticMemoryService + GraphMemory 管长期记忆
→ 完全可控,适合对记忆逻辑有深度定制需求的团队
方案 C:Letta 全家桶
└── Agent 编排 + 状态管理 + 记忆全在 Letta 内
→ 最一体化,但迁移成本最高💡 没有"最好的框架",只有"最适合当前阶段的框架"——MVP 阶段用 Mem0 快速验证记忆对用户体验的提升;产品稳定后,再根据需要迁移到自建方案或 Letta。框架可以换,第 3-7 章讲的原理和数据结构设计不会过时。
第 8 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| Mem0 | 开箱即用的记忆中间件,3 行代码加记忆,适合快速集成 |
| Letta | 有状态 Agent 运行时,Agent 自主管理记忆,适合从零构建 |
| LangGraph Store | 原生跨线程持久化,Checkpointer(短期)+ Store(长期)双轨架构 |
| 选型核心 | 已有 Agent → Mem0;已用 LangGraph → Store;从零开始 → Letta |
| 可以混搭 | LangGraph 编排 + Mem0 记忆是最常见的生产组合 |
9. 记忆系统的评测与质量保障
记忆系统上线后,怎么知道它到底好不好用?"感觉还行"不够——你需要可量化的指标来衡量记忆的质量,持续发现和修复问题。本章从指标定义、学术基准、业务评测到线上监控,构建完整的记忆质量保障体系。
9.1 怎么衡量记忆好不好:准确率、召回率、时效性
记忆系统的质量可以拆解为四个核心指标,每个指标衡量不同的能力维度:
记忆系统的四大质量指标:
┌─────────────────────────────────────────────────────────┐
│ ① 记忆准确率(Memory Precision) │
│ ──────────────────────────────────── │
│ 定义:召回的记忆中,有多少是正确且相关的 │
│ 公式:准确率 = 正确召回数 / 总召回数 │
│ 衡量:是否召回了无关或错误的记忆 │
│ 目标:> 85% │
│ │
│ 失败案例: │
│ 用户问\"帮我设计数据库\" │
│ → 召回了\"用户喜欢喝咖啡\" ← 不相关,拉低准确率 │
├─────────────────────────────────────────────────────────┤
│ ② 记忆召回率(Memory Recall) │
│ ──────────────────────────────────── │
│ 定义:所有应该被召回的记忆中,实际召回了多少 │
│ 公式:召回率 = 正确召回数 / 应召回总数 │
│ 衡量:是否遗漏了重要的记忆 │
│ 目标:> 80% │
│ │
│ 失败案例: │
│ 用户问\"帮我设计数据库\" │
│ → 忘记召回\"用户偏好 PostgreSQL\" ← 遗漏关键信息 │
├─────────────────────────────────────────────────────────┤
│ ③ 记忆时效性(Memory Freshness) │
│ ──────────────────────────────────── │
│ 定义:召回的记忆是否反映了最新状态 │
│ 衡量:是否用了过时信息(用户已 \"换成 PG\" 但召回 MySQL) │
│ 目标:过时记忆占比 < 5% │
├─────────────────────────────────────────────────────────┤
│ ④ 记忆利用率(Memory Utilization) │
│ ──────────────────────────────────── │
│ 定义:LLM 是否真的利用了召回的记忆来改善回答 │
│ 衡量:有记忆 vs 无记忆的回答质量差异 │
│ 目标:有记忆时回答质量显著优于无记忆 │
└─────────────────────────────────────────────────────────┘用代码来自动评测这些指标:
from dataclasses import dataclass
@dataclass
class MemoryEvalResult:
"""单条测试用例的评测结果"""
query: str # 用户查询
recalled_memories: list[str] # 实际召回的记忆
expected_memories: list[str] # 应该召回的记忆(标注数据)
is_outdated: list[bool] # 每条召回是否过时
@property
def precision(self) -> float:
"""准确率:召回中有多少是期望的"""
if not self.recalled_memories:
return 1.0 # 没召回也没错
hits = sum(
1 for r in self.recalled_memories
if r in self.expected_memories
)
return hits / len(self.recalled_memories)
@property
def recall(self) -> float:
"""召回率:期望中有多少被召回了"""
if not self.expected_memories:
return 1.0 # 没有期望,不存在遗漏
hits = sum(
1 for e in self.expected_memories
if e in self.recalled_memories
)
return hits / len(self.expected_memories)
@property
def freshness(self) -> float:
"""时效性:非过时记忆的比例"""
if not self.recalled_memories:
return 1.0
fresh = sum(1 for o in self.is_outdated if not o)
return fresh / len(self.is_outdated)
@property
def f1(self) -> float:
"""F1 = 准确率和召回率的调和平均"""
p, r = self.precision, self.recall
return 2 * p * r / (p + r) if (p + r) > 0 else 0
def evaluate_memory_system(
test_cases: list[MemoryEvalResult],
) -> dict:
"""批量评测记忆系统"""
n = len(test_cases)
return {
"avg_precision": sum(t.precision for t in test_cases) / n,
"avg_recall": sum(t.recall for t in test_cases) / n,
"avg_f1": sum(t.f1 for t in test_cases) / n,
"avg_freshness": sum(t.freshness for t in test_cases) / n,
"total_cases": n,
}四个指标之间的关系和平衡:
| 优化方向 | 准确率影响 | 召回率影响 | 典型手段 |
|---|---|---|---|
| 提高相似度阈值 | ↑ 提高 | ↓ 降低 | min_similarity 从 0.7 → 0.85 |
| 增大 Top-K | ↓ 可能降低 | ↑ 提高 | limit 从 3 → 10 |
| 加强去重 | ↑ 提高 | → 不变 | 定期合并重复记忆 |
| 加速遗忘 | ↑ 提高(减少噪声) | ↓ 可能降低 | 缩短衰减半衰期 |
💡 记忆系统和搜索引擎一样,追求 F1 的平衡,而不是单一指标的极端——准确率 100% 但召回率 10%(太保守,什么都不敢召回)和召回率 100% 但准确率 20%(太激进,召回一堆噪声)都是失败的。F1 > 0.8 是一个合理的生产目标。
9.2 学术基准:LoCoMo 与 LongMemEval
自己构建评测集之前,先看看学术界怎么评测记忆系统。两个最主流的基准:
两大记忆评测基准对比:
┌─────────────────────────────────────────────────────────┐
│ LoCoMo(Long Conversation Memory) │
│ ────────────────────────────────────── │
│ 来源:斯坦福 2024 │
│ 任务:在超长对话(数千轮)后回答记忆相关问题 │
│ 评测维度: │
│ ├── 单跳事实回忆(\"用户叫什么名字\") │
│ ├── 多跳推理(\"用户推荐的那本书的作者是谁\") │
│ ├── 时间推理(\"上周二讨论的话题是什么\") │
│ └── 开放生成(\"总结用户的技术偏好\") │
│ │
│ 数据规模:~600 个测试问题,对话长度 300-1000 轮 │
│ 关键发现:上下文窗口再大也不如有记忆系统的 Agent │
├─────────────────────────────────────────────────────────┤
│ LongMemEval(Long-term Memory Evaluation) │
│ ────────────────────────────────────── │
│ 来源:微软研究院 2024 │
│ 任务:评测跨会话长期记忆的质量 │
│ 评测维度: │
│ ├── 信息提取(从历史中提取特定事实) │
│ ├── 知识更新(检测冲突后是否用了新信息) │
│ ├── 时间推理(涉及时间顺序的问答) │
│ ├── 多会话推理(跨多个历史会话的综合问题) │
│ └── 拒绝回答(记忆中没有的信息能否正确拒答) │
│ │
│ 数据规模:~500 个测试问题,模拟 50+ 个独立会话 │
│ 关键发现:冲突解决和时间推理是当前系统最薄弱的环节 │
└─────────────────────────────────────────────────────────┘如何用这些基准来评测自己的记忆系统:
# 用 LoCoMo 思路构建评测 Pipeline(简化版)
import json
from openai import AsyncOpenAI
# 1. 准备测试数据(模拟 LoCoMo 格式)
test_cases = [
{
"history": [ # 模拟历史对话
{"session": "3月10日", "content": "用户说自己是后端开发,用 FastAPI"},
{"session": "3月15日", "content": "用户说数据库选了 PostgreSQL"},
{"session": "3月20日", "content": "用户说已经从 MySQL 迁移到 PG 了"},
],
"question": "用户目前用什么数据库?",
"expected_answer": "PostgreSQL",
"category": "knowledge_update", # 知识更新类
},
{
"history": [
{"session": "3月10日", "content": "用户叫李明,团队 3 人"},
{"session": "3月12日", "content": "张伟是李明的同事,负责前端"},
],
"question": "李明团队中谁负责前端?",
"expected_answer": "张伟",
"category": "multi_hop", # 多跳推理类
},
]
# 2. 评测函数——用 LLM 判断回答是否正确
async def judge_answer(
client: AsyncOpenAI,
question: str,
expected: str,
actual: str,
) -> dict:
"""用 LLM 作为裁判,判断回答质量"""
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"""请判断实际回答是否正确回答了问题。
问题:{question}
预期答案:{expected}
实际回答:{actual}
请以 JSON 格式回答:
{{"correct": true/false, "score": 0-1, "reason": "判断理由"}}""",
}],
temperature=0,
response_format={"type": "json_object"},
)
return json.loads(response.choices[0].message.content)两个基准在不同维度上的侧重对比:
| 维度 | LoCoMo | LongMemEval |
|---|---|---|
| 侧重点 | 超长单会话 | 跨多次独立会话 |
| 核心挑战 | 长上下文中的信息定位 | 知识更新与冲突 |
| 最有价值的测试 | 多跳推理、时间推理 | 知识更新、拒绝回答 |
| 对工程的启示 | 摘要压缩的质量很关键 | 冲突解决机制不可或缺 |
💡 不需要完整跑 LoCoMo / LongMemEval 全套测试——从中各挑 20-30 个有代表性的 case,按你的业务场景做适配,就能发现记忆系统 80% 的问题。重点关注知识更新和多跳推理两类,它们是实际使用中用户抱怨最多的。
9.3 业务评测:构建你自己的记忆质量测试集
学术基准验证的是"通用能力",但你的 Agent 面对的是特定业务场景。需要构建贴合业务的测试集。
核心思路:模拟用户的真实使用路径,在关键节点上插入验证问题。
业务评测的测试用例设计模式:
Step 1:构造对话历史(模拟用户多天使用)
┌────────────────────────────────────────────┐
│ Day 1:用户自我介绍 + 项目背景 │
│ Day 2:技术选型讨论 │
│ Day 3:做了一个重要决定 │
│ Day 5:修改了之前的决定 │
│ Day 7:新开会话,问了一个问题 │
└────────────────────────────────────────────┘
Step 2:在 Day 7 的新会话中插入验证问题
┌────────────────────────────────────────────┐
│ Q1(事实回忆):\"我叫什么名字?\" │
│ Q2(偏好召回):\"我用什么数据库?\" │
│ Q3(知识更新):\"我现在用什么框架?\" │
│ Q4(多跳推理):\"我同事负责什么模块?\" │
│ Q5(拒绝回答):\"我的手机号是多少?\" │
└────────────────────────────────────────────┘
Step 3:对比 Agent 回答 vs 标准答案测试用例模板(可直接使用):
# 记忆质量测试用例模板
memory_test_suite = {
"name": "电商助手记忆质量测试集 v1",
"scenarios": [
# ─── 场景一:基本事实记忆 ───
{
"setup_conversations": [
{"day": 1, "messages": [
{"role": "user", "content": "我叫李明,做电商后端"},
{"role": "user", "content": "用 FastAPI + PostgreSQL"},
]},
],
"test_queries": [
{
"query": "我叫什么名字?",
"expected": "李明",
"category": "fact_recall",
"priority": "P0", # 最基本的能力,必须通过
},
{
"query": "帮我设计一个新接口",
"expected_context": ["FastAPI", "PostgreSQL"],
"category": "context_injection",
"priority": "P0",
},
],
},
# ─── 场景二:知识更新 ───
{
"setup_conversations": [
{"day": 1, "messages": [
{"role": "user", "content": "我们目前用 MySQL"},
]},
{"day": 5, "messages": [
{"role": "user", "content": "我们已经迁移到 PostgreSQL 了"},
]},
],
"test_queries": [
{
"query": "我用什么数据库?",
"expected": "PostgreSQL",
"not_expected": "MySQL", # 不应该出现旧信息
"category": "knowledge_update",
"priority": "P0",
},
],
},
# ─── 场景三:拒绝回答 ───
{
"setup_conversations": [
{"day": 1, "messages": [
{"role": "user", "content": "我叫李明"},
]},
],
"test_queries": [
{
"query": "我的银行卡号是什么?",
"expected_behavior": "refuse", # 应该拒答
"category": "refusal",
"priority": "P1",
},
],
},
],
}自动化回归测试——每次改动记忆逻辑后跑一遍:
async def run_memory_regression_test(
agent, # 你的 Agent 实例
test_suite: dict,
) -> dict:
"""运行记忆回归测试"""
results = {"passed": 0, "failed": 0, "details": []}
for scenario in test_suite["scenarios"]:
# 1. 模拟历史对话(写入记忆)
user_id = f"test_user_{hash(str(scenario)) % 10000}"
for conv in scenario["setup_conversations"]:
for msg in conv["messages"]:
await agent.chat(user_id, msg["content"])
# 2. 运行验证问题
for test in scenario["test_queries"]:
response = await agent.chat(user_id, test["query"])
# 3. 判断是否通过
passed = True
reason = ""
if "expected" in test:
if test["expected"].lower() not in response.lower():
passed = False
reason = f"期望包含 '{test['expected']}',实际: {response[:100]}"
if "not_expected" in test:
if test["not_expected"].lower() in response.lower():
passed = False
reason = f"不应包含 '{test['not_expected']}',但出现了"
results["passed" if passed else "failed"] += 1
results["details"].append({
"query": test["query"],
"category": test["category"],
"priority": test["priority"],
"passed": passed,
"reason": reason,
})
return results测试集的覆盖率目标:
| 类别 | 建议测试数 | P0 占比 | 说明 |
|---|---|---|---|
| 事实回忆 | 10-15 条 | 100% | 最基本的能力 |
| 知识更新 | 8-10 条 | 100% | 记忆系统区别于 RAG 的核心 |
| 多跳推理 | 5-8 条 | 50% | 有图记忆才需要重测 |
| 时间推理 | 5 条 | 50% | "上次/之前/最近"类查询 |
| 拒绝回答 | 5 条 | 80% | 不编造不存在的记忆 |
| 记忆利用 | 5-8 条 | 80% | 回答是否真的用了记忆 |
💡 测试集是"活的"——每次线上发现记忆相关的 Bad Case,就加入测试集。3 个月后你会积累 50-100 个高质量测试用例,比任何学术基准都更贴合你的业务。P0 级别的 case 全部通过才能发版。
9.4 线上监控:记忆命中率与用户满意度追踪
离线评测保证"发布前没问题",线上监控保证"运行中没退化"。记忆系统需要持续观测的核心指标:
线上监控的三层指标体系:
Layer 1:系统指标(实时告警)
├── 记忆检索延迟 P99 < 100ms
├── 事实抽取成功率 > 95%
├── 向量写入失败率 < 0.1%
└── 记忆库容量(每用户平均记忆条数)
Layer 2:质量指标(小时级看板)
├── 记忆命中率:有召回 / 总请求(目标 > 60%)
├── 记忆使用率:LLM 实际引用了记忆 / 有召回(目标 > 70%)
└── 过时记忆率:用户纠正"你记错了" / 总记忆召回(目标 < 5%)
Layer 3:业务指标(天级分析)
├── 有记忆用户的留存率 vs 无记忆用户
├── 有记忆时会话轮数(预期减少,因为不需要反复介绍)
└── 用户主动触发记忆的频率("你还记得...吗")记忆命中率的埋点实现:
import time
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class MemoryMetrics:
"""记忆系统的运行时指标收集"""
_counters: dict = field(default_factory=lambda: defaultdict(int))
_latencies: dict = field(default_factory=lambda: defaultdict(list))
def record_retrieval(
self, user_id: str, query: str,
recalled_count: int, latency_ms: float,
):
"""记录一次检索"""
self._counters["total_requests"] += 1
if recalled_count > 0:
self._counters["hit_requests"] += 1
else:
self._counters["miss_requests"] += 1
self._latencies["retrieval"].append(latency_ms)
def record_correction(self, user_id: str):
"""用户纠正了错误记忆("你记错了")"""
self._counters["corrections"] += 1
@property
def hit_rate(self) -> float:
total = self._counters["total_requests"]
return self._counters["hit_requests"] / total if total > 0 else 0
@property
def correction_rate(self) -> float:
hits = self._counters["hit_requests"]
return self._counters["corrections"] / hits if hits > 0 else 0
@property
def p99_latency(self) -> float:
lats = sorted(self._latencies["retrieval"])
if not lats:
return 0
idx = int(len(lats) * 0.99)
return lats[min(idx, len(lats) - 1)]
def summary(self) -> dict:
return {
"hit_rate": f"{self.hit_rate:.1%}",
"correction_rate": f"{self.correction_rate:.1%}",
"p99_latency_ms": f"{self.p99_latency:.1f}",
"total_requests": self._counters["total_requests"],
}
# 在 Agent 中集成指标收集
metrics = MemoryMetrics()
async def chat_with_monitoring(
user_id: str, message: str,
memory: "SemanticMemoryService",
client: "AsyncOpenAI",
) -> str:
# 1. 检索记忆(带计时)
start = time.time()
memories = await memory.recall(user_id, message, limit=5)
latency = (time.time() - start) * 1000
# 2. 记录指标
metrics.record_retrieval(
user_id, message,
recalled_count=len(memories),
latency_ms=latency,
)
# 3. 检测用户是否在纠正记忆
correction_keywords = ["你记错了", "不对", "我改了", "不再用"]
if any(kw in message for kw in correction_keywords):
metrics.record_correction(user_id)
# 4. 正常对话流程...
# ...A/B 测试——量化记忆系统的价值:
记忆系统的 A/B 测试设计:
实验组 A(有记忆) 对照组 B(无记忆)
├── 50% 流量 ├── 50% 流量
├── 完整的记忆检索 + 注入 ├── 纯上下文窗口
└── 观测指标: └── 观测指标:
├── 会话平均轮数 ├── 会话平均轮数
├── 用户重复提供信息次数 ├── 用户重复提供信息次数
├── 回答满意度评分 ├── 回答满意度评分
└── 7 日留存率 └── 7 日留存率
预期结果(基于行业数据):
├── 会话轮数减少 20-30%(不需要反复介绍自己)
├── 满意度提升 15-25%
└── 7 日留存提升 10-20%💡 线上监控最重要的单一指标是"过时记忆率"(correction_rate)——如果用户频繁纠正"你记错了",说明冲突解决机制失效了。这比记忆命中率更重要——命中率低只是"没帮上忙",过时记忆率高是"帮了倒忙"。建议设告警阈值:correction_rate > 5% 时自动触发排查。
第 9 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 四大指标 | 准确率、召回率、时效性、利用率——追求 F1 > 0.8 的平衡 |
| 学术基准 | LoCoMo(长对话)+ LongMemEval(跨会话)挑 20-30 case 做适配 |
| 业务测试集 | 模拟用户使用路径 + 验证问题,线上 Bad Case 持续加入 |
| 线上监控 | 系统指标(延迟)+ 质量指标(命中率)+ 业务指标(留存) |
| 最关键指标 | 过时记忆率(correction_rate)> 5% 必须告警 |
10. 生产实践:完整记忆系统的设计模式
前面 9 章分别介绍了记忆系统的各个模块。本章把它们串联成一个可部署到生产环境的完整系统——从端到端架构、多用户隔离、隐私合规到性能优化,解决"从 Demo 到上线"的最后一公里问题。
10.1 端到端架构:从请求到记忆的完整数据流
先看一个生产级记忆系统的完整架构图——把前面所有模块串在一起:
生产级 Agent 记忆系统——端到端架构:
用户请求
│
▼
┌─────────────────────────────────────────────────────────────┐
│ API Gateway / WebSocket Handler │
│ → 鉴权、限流、提取 user_id + session_id │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Memory Retrieval Layer(记忆检索层)── 同步,阻塞响应 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ① Semantic Memory 检索 │ │
│ │ → pgvector 余弦相似度 Top-5 │ │
│ │ → 返回用户画像、偏好、项目信息 │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ ② Graph Memory 检索 │ │
│ │ → 从召回实体出发 BFS 2 跳 │ │
│ │ → 返回关系网络上下文 │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ ③ Episodic Memory 检索(可选) │ │
│ │ → 按关键词搜索最近 30 天的相关会话 │ │
│ │ → 返回历史对话片段 │ │
│ └─────────────────────────────────────────────────────┘ │
│ 总耗时目标:< 50ms(三路并行) │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Context Assembly(上下文组装) │
│ → Token Budget 分配 │
│ → System Prompt + 记忆上下文 + 对话历史 + 当前输入 │
│ → 如果超限:摘要压缩 / 裁剪对话历史 │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ LLM Inference │
│ → 调用 GPT-4o / Claude 生成回答 │
│ → 流式返回给用户 │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Memory Update Layer(记忆更新层)── 异步,不阻塞响应 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ④ Episodic 写入 │ │
│ │ → 记录 user + assistant 消息到 events 表 │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ ⑤ 事实抽取(条件触发) │ │
│ │ → 规则预判:本轮是否包含新信息? │ │
│ │ → YES → LLM 抽取 → 冲突检测 → 写入 Semantic │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ ⑥ 图谱更新(条件触发) │ │
│ │ → 抽取三元组 → 写入 Graph Memory │ │
│ └─────────────────────────────────────────────────────┘ │
│ 总耗时:200-500ms(后台异步,用户无感知) │
└─────────────────────────────────────────────────────────────┘把这个架构翻译成代码骨架:
import asyncio
from dataclasses import dataclass
@dataclass
class MemoryContext:
"""检索到的记忆上下文"""
semantic_facts: list[dict] # 语义记忆(用户画像)
graph_relations: list[dict] # 图谱关系
episodic_history: list[dict] # 历史对话片段
def format_for_prompt(self) -> str:
"""格式化为 LLM 可理解的上下文"""
parts = []
if self.semantic_facts:
parts.append("[用户画像]")
for f in self.semantic_facts:
parts.append(f"- {f['fact_key']}: {f['fact_value']}")
if self.graph_relations:
parts.append("\n[关系网络]")
for r in self.graph_relations[:8]:
parts.append(f"- {r['from']} --{r['relation']}--> {r['to']}")
if self.episodic_history:
parts.append("\n[相关历史]")
for h in self.episodic_history[:3]:
parts.append(f"- [{h['date']}] {h['summary'][:150]}")
return "\n".join(parts)
class ProductionMemoryAgent:
"""生产级带记忆的 Agent"""
def __init__(
self,
llm_client,
semantic_memory: "SemanticMemoryService",
graph_memory: "GraphMemory",
episodic_memory: "EpisodicMemoryService",
fact_extractor: "FactExtractor",
):
self.llm = llm_client
self.semantic = semantic_memory
self.graph = graph_memory
self.episodic = episodic_memory
self.extractor = fact_extractor
async def chat(
self, user_id: str, session_id: str, message: str,
) -> str:
# ─── 同步阶段:检索 + 推理 ───
# 1. 三路并行检索记忆
memory_ctx = await self._retrieve_memories(user_id, message)
# 2. 组装上下文 + 调用 LLM
response = await self._generate_response(
message, memory_ctx
)
# ─── 异步阶段:更新记忆(不阻塞响应)───
# 3. 后台更新记忆
asyncio.create_task(
self._update_memories(
user_id, session_id, message, response
)
)
return response
async def _retrieve_memories(
self, user_id: str, query: str,
) -> MemoryContext:
"""三路并行检索(总耗时 ≈ 最慢的那一路)"""
semantic_task = self.semantic.recall(user_id, query, limit=5)
graph_task = asyncio.to_thread(
self.graph.get_neighbors_for_query, query
)
episodic_task = self.episodic.search_by_keyword(
user_id, query, limit=3
)
facts, relations, history = await asyncio.gather(
semantic_task, graph_task, episodic_task
)
return MemoryContext(
semantic_facts=facts,
graph_relations=relations,
episodic_history=history,
)
async def _update_memories(
self, user_id: str, session_id: str,
message: str, response: str,
):
"""异步更新记忆(后台执行)"""
# ① 记录事件流(总是执行)
await self.episodic.add_event(
session_id, "user", message
)
await self.episodic.add_event(
session_id, "assistant", response
)
# ② 条件触发事实抽取
if self._should_extract(message):
conversation = [
{"role": "user", "content": message},
{"role": "assistant", "content": response},
]
result = await self.extractor.extract(conversation)
if result.has_new_info:
for fact in result.facts:
await self.semantic.store_fact_safe(user_id, fact)
def _should_extract(self, message: str) -> bool:
"""规则预判:是否需要触发事实抽取"""
triggers = [
"我叫", "我是", "我的项目", "我们用", "我们选",
"我喜欢", "我偏好", "我不想", "已经换成", "迁移到",
]
return any(t in message for t in triggers)各层的延迟预算:
| 阶段 | 目标延迟 | 是否阻塞用户 | 说明 |
|---|---|---|---|
| 记忆检索 | < 50ms | ✅ 阻塞 | 三路并行,取决于最慢那路 |
| 上下文组装 | < 5ms | ✅ 阻塞 | 纯 CPU 操作 |
| LLM 推理 | 1-5s | ✅ 阻塞(流式) | TTFT 通常 0.5-1s |
| 事件记录 | < 10ms | ❌ 异步 | 写数据库 |
| 事实抽取 | 200-500ms | ❌ 异步 | 调 LLM,后台执行 |
| 图谱更新 | 100-300ms | ❌ 异步 | 抽取 + 写入 |
💡 "同步检索、异步更新"是记忆系统的黄金法则——用户感知的延迟只有检索层的 50ms,记忆更新的 500ms 完全在后台完成。千万不要在返回响应之前做事实抽取和图谱更新,那会让每次回答多等半秒。
10.2 多用户隔离与权限控制
记忆系统是用户级别最敏感的数据——A 用户的记忆绝对不能泄漏给 B 用户。这不是"可选的安全特性",而是"出了事故就是 P0 级别的信任危机"。
多用户隔离的三层防护:
┌─────────────────────────────────────────────────────────┐
│ Layer 1:数据层隔离(基础 ✓) │
│ ────────────────────────────────────── │
│ 所有记忆表都带 user_id 字段 │
│ 所有 SQL 查询都必须带 WHERE user_id = :uid │
│ 建索引时 user_id 作为第一个字段 │
│ │
│ 风险点:忘记加 WHERE 条件 → 全量泄漏 │
│ 防护:用 PostgreSQL RLS(行级安全策略)兜底 │
├─────────────────────────────────────────────────────────┤
│ Layer 2:应用层隔离(推荐 ✓) │
│ ────────────────────────────────────── │
│ 记忆服务的每个方法都强制要求 user_id 参数 │
│ 不提供"查询所有用户记忆"的 API │
│ 日志中脱敏 user_id(只显示 hash 前 8 位) │
├─────────────────────────────────────────────────────────┤
│ Layer 3:网络层隔离(企业级) │
│ ────────────────────────────────────── │
│ 不同租户的记忆存在不同的数据库实例 │
│ 向量数据库按租户分 Collection / Namespace │
│ 适用于 SaaS 产品的多租户场景 │
└─────────────────────────────────────────────────────────┘用 PostgreSQL RLS(行级安全策略)做兜底隔离——即使代码写漏了 WHERE 条件,数据库层面也不会泄漏:
-- 启用行级安全策略
ALTER TABLE semantic_memories ENABLE ROW LEVEL SECURITY;
ALTER TABLE events ENABLE ROW LEVEL SECURITY;
ALTER TABLE sessions ENABLE ROW LEVEL SECURITY;
-- 策略:只能访问自己的数据
-- 应用连接时通过 SET app.current_user_id 传递用户身份
CREATE POLICY user_isolation_policy ON semantic_memories
USING (user_id = current_setting('app.current_user_id'));
CREATE POLICY user_isolation_policy ON events
USING (session_id IN (
SELECT id FROM sessions
WHERE user_id = current_setting('app.current_user_id')
));
CREATE POLICY user_isolation_policy ON sessions
USING (user_id = current_setting('app.current_user_id'));在应用层强制隔离的代码模式:
from contextlib import asynccontextmanager
@asynccontextmanager
async def user_scoped_db(db: AsyncSession, user_id: str):
"""设置当前请求的用户上下文(配合 RLS)"""
await db.execute(
text("SET app.current_user_id = :uid"),
{"uid": user_id},
)
try:
yield db
finally:
await db.execute(text("RESET app.current_user_id"))
# 使用方式
async def get_user_memories(db: AsyncSession, user_id: str):
async with user_scoped_db(db, user_id) as scoped_db:
# 即使这里忘记写 WHERE user_id = ...
# RLS 也会自动过滤,只返回该用户的数据
result = await scoped_db.execute(
text("SELECT * FROM semantic_memories")
)
return result.fetchall()不同部署模式下的隔离策略选择:
| 场景 | 推荐隔离方式 | 说明 |
|---|---|---|
| 单用户/个人项目 | 无需特殊处理 | 只有一个用户 |
| 多用户 SaaS | RLS + 应用层隔离 | 成本最低的生产级方案 |
| 企业多租户 | 独立数据库实例 | 数据物理隔离,合规要求最高 |
| 开放平台 | Namespace 隔离 + API Key | 向量库按 namespace,PG 按 schema |
💡 RLS 是"保险丝",不是"主开关"——你的代码中仍然应该在每条 SQL 中显式写
WHERE user_id = :uid,RLS 只是防止你忘记写的时候出现数据泄漏。两道防线一起用,才能真正做到万无一失。
10.3 隐私合规:记忆数据的脱敏与生命周期管理
记忆系统天然会存储用户的个人信息——姓名、公司、技术偏好、项目细节。这在 GDPR / 个人信息保护法下是高风险数据处理行为,必须在设计阶段就考虑合规。
记忆数据的隐私风险矩阵:
┌──────────────────────────────────────────────────────┐
│ 高风险(必须处理) │
│ ├── 真实姓名、手机号、邮箱 │
│ ├── 公司名称、项目代号(商业机密) │
│ ├── 密码、API Key(对话中可能出现) │
│ └── 健康、财务等敏感类别信息 │
├──────────────────────────────────────────────────────┤
│ 中风险(建议处理) │
│ ├── 技术栈偏好(可间接识别身份) │
│ ├── 工作经历描述 │
│ └── 具体的代码片段(可能含业务逻辑) │
├──────────────────────────────────────────────────────┤
│ 低风险(可直接存储) │
│ ├── 通用技术偏好("喜欢 Python") │
│ ├── 抽象化的项目类型("电商后端") │
│ └── 编程风格偏好 │
└──────────────────────────────────────────────────────┘关键合规要求及实现方案:
| 合规要求 | 法规来源 | 实现方案 |
|---|---|---|
| 知情同意 | GDPR Art.6 / 个保法 Art.13 | 首次使用时弹窗告知"我们会记住你的偏好" |
| 数据最小化 | GDPR Art.5 | 事实抽取 Prompt 中明确"不提取敏感信息" |
| 删除权 | GDPR Art.17 | 提供"删除我的所有记忆" API |
| 可携带权 | GDPR Art.20 | 提供"导出我的记忆" API |
| 存储期限 | 个保法 Art.19 | 设置记忆 TTL,到期自动清理 |
敏感信息过滤——在事实抽取阶段拦截:
import re
class SensitiveFilter:
"""敏感信息过滤器——在记忆写入前拦截"""
# 正则匹配敏感信息
PATTERNS = {
"phone": re.compile(r"1[3-9]\d{9}"),
"email": re.compile(r"\S+@\S+\.\S+"),
"id_card": re.compile(r"\d{17}[\dXx]"),
"api_key": re.compile(
r"(sk-|ak_|secret_|password[=:])\S{8,}"
),
"bank_card": re.compile(r"\d{16,19}"),
}
# 不应该存储的事实类别
BLOCKED_CATEGORIES = {
"health", "finance", "password", "credential",
}
def filter_fact(self, fact: "ExtractedFact") -> bool:
"""返回 True 表示安全,False 表示应拦截"""
# 1. 类别黑名单
if fact.category in self.BLOCKED_CATEGORIES:
return False
# 2. 正则检测敏感内容
for name, pattern in self.PATTERNS.items():
if pattern.search(fact.value):
return False
if pattern.search(fact.source_quote):
return False
return True
def redact_text(self, text: str) -> str:
"""脱敏处理:将敏感信息替换为占位符"""
for name, pattern in self.PATTERNS.items():
text = pattern.sub(f"[已脱敏-{name}]", text)
return text用户删除权的实现——"忘记我"功能:
async def forget_user(
db: AsyncSession, user_id: str,
scope: str = "all", # all / semantic / episodic / graph
):
"""删除用户的记忆数据(GDPR 删除权)"""
if scope in ("all", "semantic"):
await db.execute(text(
"DELETE FROM semantic_memories WHERE user_id = :uid"
), {"uid": user_id})
if scope in ("all", "episodic"):
# 先删事件,再删会话(外键约束)
await db.execute(text("""
DELETE FROM events WHERE session_id IN (
SELECT id FROM sessions WHERE user_id = :uid
)
"""), {"uid": user_id})
await db.execute(text(
"DELETE FROM sessions WHERE user_id = :uid"
), {"uid": user_id})
if scope in ("all", "graph"):
await db.execute(text(
"DELETE FROM user_graphs WHERE user_id = :uid"
), {"uid": user_id})
await db.commit()
# 记录审计日志(删除操作本身需要留痕)
await db.execute(text("""
INSERT INTO audit_logs (user_id, action, scope, created_at)
VALUES (:uid, 'memory_deletion', :scope, NOW())
"""), {"uid": user_id, "scope": scope})
await db.commit()💡 在事实抽取的 Prompt 中就应该声明"不要提取手机号、邮箱、密码等敏感信息"——这是第一道防线。
SensitiveFilter是第二道防线(正则兜底)。两层过滤后写入的记忆,基本不会包含高风险个人信息。但一定要提供"删除我的记忆"功能——这不是可选的,是法律要求的。
10.4 性能优化:异步写入、批量嵌入、缓存预热
记忆系统的性能瓶颈集中在三个地方:Embedding 生成(每次 API 调用 30-80ms)、向量写入(含索引更新)、图记忆加载(从数据库反序列化)。逐一优化:
三大性能瓶颈及优化策略:
① Embedding 生成 ── 单次 30-80ms,高频场景成为瓶颈
→ 解法:批量嵌入 + 本地缓存
② 向量写入 ── 含索引更新,HNSW 重建耗时
→ 解法:异步队列 + 批量写入
③ 图记忆加载 ── 每次请求从 DB 加载 JSON 反序列化
→ 解法:用户级内存缓存 + 预热优化一:批量 Embedding——减少 API 调用次数
from openai import AsyncOpenAI
class BatchEmbedder:
"""批量 Embedding 服务——合并多次请求为一次 API 调用"""
def __init__(self, client: AsyncOpenAI, batch_size: int = 20):
self.client = client
self.batch_size = batch_size
self._cache: dict[str, list[float]] = {} # 简易缓存
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""批量生成 Embedding(单次 API 调用)"""
# 1. 检查缓存
uncached = [t for t in texts if t not in self._cache]
if uncached:
# 2. 批量调用(最多 2048 条/次)
for i in range(0, len(uncached), self.batch_size):
batch = uncached[i:i + self.batch_size]
response = await self.client.embeddings.create(
model="text-embedding-3-small",
input=batch,
)
for text_input, data in zip(batch, response.data):
self._cache[text_input] = data.embedding
# 3. 从缓存返回
return [self._cache[t] for t in texts]
async def embed_single(self, text: str) -> list[float]:
"""单条 Embedding(优先走缓存)"""
results = await self.embed_batch([text])
return results[0]
# 使用:一次性嵌入 10 条事实,只调 1 次 API
embedder = BatchEmbedder(client)
facts = ["用户偏好 PG", "项目是电商", "团队 3 人", ...]
vectors = await embedder.embed_batch(facts)
# → 1 次 API 调用(而非 10 次),节省 90% 延迟优化二:异步写入队列——记忆更新不阻塞对话
import asyncio
from collections import deque
class MemoryWriteQueue:
"""异步记忆写入队列——收集 + 批量刷写"""
def __init__(
self,
memory_service: "SemanticMemoryService",
embedder: BatchEmbedder,
flush_interval: float = 2.0, # 每 2 秒刷写一次
max_batch: int = 20, # 或积累 20 条就刷写
):
self.memory = memory_service
self.embedder = embedder
self.flush_interval = flush_interval
self.max_batch = max_batch
self._queue: deque = deque()
self._running = False
async def enqueue(self, user_id: str, fact: "ExtractedFact"):
"""入队(立即返回,不阻塞)"""
self._queue.append((user_id, fact))
# 积累够了立即刷写
if len(self._queue) >= self.max_batch:
await self._flush()
async def start(self):
"""启动后台刷写循环"""
self._running = True
while self._running:
await asyncio.sleep(self.flush_interval)
if self._queue:
await self._flush()
async def _flush(self):
"""批量刷写"""
batch = []
while self._queue and len(batch) < self.max_batch:
batch.append(self._queue.popleft())
if not batch:
return
# 批量生成 Embedding
texts = [
f"{f.category}: {f.key} = {f.value}"
for _, f in batch
]
vectors = await self.embedder.embed_batch(texts)
# 批量写入数据库
for (user_id, fact), vec in zip(batch, vectors):
await self.memory.store_fact_with_vector(
user_id, fact, vec
)优化三:图记忆缓存——避免每次请求都反序列化
from functools import lru_cache
import time
class GraphMemoryCache:
"""用户级图记忆缓存"""
def __init__(self, db, ttl_seconds: int = 300):
self.db = db
self.ttl = ttl_seconds
self._cache: dict[str, tuple[GraphMemory, float]] = {}
async def get(self, user_id: str) -> "GraphMemory":
"""获取图记忆(优先走缓存)"""
now = time.time()
if user_id in self._cache:
graph, cached_at = self._cache[user_id]
if now - cached_at < self.ttl:
return graph # 缓存命中,< 0.01ms
# 缓存未命中,从数据库加载
graph = await load_graph(self.db, user_id) # ~5-20ms
self._cache[user_id] = (graph, now)
return graph
def invalidate(self, user_id: str):
"""图谱更新后使缓存失效"""
self._cache.pop(user_id, None)各优化手段的效果量化:
| 优化手段 | 优化前 | 优化后 | 提升幅度 |
|---|---|---|---|
| 批量 Embedding | 10 条 × 50ms = 500ms | 1 次 × 60ms = 60ms | 8x |
| 异步写入队列 | 写入阻塞 200ms | 入队 < 1ms | 200x(用户感知) |
| 图缓存 | 每次加载 15ms | 缓存命中 0.01ms | 1500x |
| 三项组合 | 记忆更新总耗时 ~700ms | 用户感知 < 1ms | 对话体验零影响 |
💡 性能优化的优先级:异步化 > 批量化 > 缓存——先确保记忆更新不阻塞用户(异步化解决 90% 的体验问题),再用批量化降低 API 成本,最后用缓存优化高频读取路径。不要在记忆系统上过度优化——对话的主要延迟来自 LLM 推理(1-5s),记忆检索的 50ms 相比之下微不足道。
10.5 设计清单:记忆系统上线前的 Checklist
把前面所有章节的关键点浓缩为一份上线前检查清单——发版前逐项过一遍,确保没有遗漏:
═══════════════════════════════════════════════════════
记忆系统上线 Checklist
═══════════════════════════════════════════════════════
─── 基础能力 ───
□ 短期记忆:对话超过 N 轮后自动摘要压缩
□ 短期记忆:Checkpointer 配置为持久化存储(PG/SQLite)
□ Token 预算:各模块有明确的 Token 分配,输出预留 ≥ 4000
□ System Prompt + 记忆 + 对话历史的总 Token < 模型上限的 80%
─── 长期记忆 ───
□ 语义记忆:事实抽取 Prompt 明确且有示例
□ 语义记忆:向量检索阈值已调优(推荐 0.75-0.85)
□ 冲突解决:新旧事实冲突时有明确的处理策略
□ 图记忆(如需要):三元组抽取 + NetworkX 存储可正常工作
□ 情景记忆(如需要):sessions + events 表已建好索引
─── 数据安全 ───
□ 多用户隔离:所有查询都带 user_id 过滤
□ RLS 策略已启用(防止代码遗漏导致数据泄漏)
□ 敏感信息过滤:手机号/邮箱/密码/API Key 不会被存入记忆
□ 用户删除权:"忘记我" API 已实现并测试
□ 审计日志:记忆的增删改都有日志可追溯
─── 性能 ───
□ 记忆检索延迟 P99 < 100ms(向量搜索 + 图遍历)
□ 记忆更新是异步的,不阻塞用户响应
□ Embedding 生成用批量 API(而非逐条调用)
□ 图记忆有用户级缓存(避免每次反序列化)
─── 质量保障 ───
□ 记忆回归测试集:≥ 30 个 case,P0 全部通过
□ 知识更新测试:用户改了偏好后,记忆能正确更新
□ 拒绝回答测试:不编造记忆中不存在的信息
□ 线上监控:命中率、过时率、检索延迟有看板和告警
□ 过时记忆率告警阈值:> 5% 触发排查
─── 运维 ───
□ 定时去重任务已配置(每天凌晨)
□ 过期记忆清理任务已配置(按 category 差异化 TTL)
□ 渐进式摘要(如需要):30 天以上的会话自动归档
□ 数据库备份策略覆盖记忆相关的所有表
□ 容量预估:单用户记忆上限 + 总存储预算已规划
═══════════════════════════════════════════════════════按项目阶段的建议实施路径:
渐进式实施路线图:
Phase 1:MVP(1-2 天)
├── 实现滑动窗口 + 对话摘要压缩(第 3 章)
├── 用 LangGraph Checkpointer 持久化会话状态
└── 效果:单会话内的上下文连续性 ✓
Phase 2:基础记忆(3-5 天)
├── 加入语义记忆:事实抽取 + pgvector 存储(第 5 章)
├── 加入 Mem0 或自建 SemanticMemoryService
└── 效果:跨会话记住用户画像 ✓
Phase 3:高级能力(1-2 周)
├── 加入图记忆:三元组抽取 + 混合检索(第 7 章)
├── 加入记忆管理:去重 + 遗忘 + 反思循环(第 6 章)
├── 加入情景记忆:事件流 + 会话回溯(第 4 章)
└── 效果:关系推理 + 记忆自维护 ✓
Phase 4:生产加固(持续)
├── 多用户隔离 + 隐私合规(第 10 章)
├── 性能优化:异步化 + 批量化 + 缓存
├── 质量保障:评测 + 监控 + A/B 测试(第 9 章)
└── 效果:生产级可靠的记忆系统 ✓💡 不要试图一步到位——Phase 1 只需要 1-2 天,就能让你的 Agent 从"金鱼记忆"升级到"至少能记住这次对话"。Phase 2 再花 3-5 天,就能实现跨会话的用户画像。大多数 Agent 到 Phase 2 就够用了,Phase 3-4 等业务需求驱动再加。
第 10 章核心知识回顾:
| 概念 | 一句话解释 |
|---|---|
| 端到端架构 | 同步检索(< 50ms)→ LLM 推理 → 异步更新(后台 500ms) |
| 黄金法则 | 同步检索、异步更新——记忆更新永远不阻塞用户响应 |
| 用户隔离 | RLS 兜底 + 应用层 WHERE user_id,双重防线 |
| 隐私合规 | Prompt 声明 + 正则过滤 + 删除权 API,三层防护 |
| 性能优化 | 异步化 > 批量化 > 缓存,优先级从高到低 |
| 实施路径 | MVP(1天)→ 基础记忆(3天)→ 高级能力(1周)→ 生产加固 |