第六章 持久化与检查点
6.1 为什么需要持久化 —— Agent 也需要"存档"
前面五章构建的所有 Agent 都有一个致命弱点:没有记忆。每次 invoke() 都是一个全新的开始,之前的对话历史、中间状态、工具调用结果——全部丢失。
问题场景
场景 1:多轮对话
用户: "帮我查北京天气"
Agent: "北京今天晴,25°C"(第一轮 invoke 结束)
用户: "那上海呢?"
Agent: "你在说什么?"(第二轮 invoke,完全不记得上一轮)
场景 2:长时间任务
Agent 正在执行一个 10 步的研究任务...
第 7 步时服务器重启了。
重启后:从头开始。前 6 步白做了。
场景 3:人工审批
Agent: "我要删除这个文件,请确认"
(暂停等人类回复...)
人类 30 分钟后回来确认。
Agent: "我是谁?我在哪?"(内存早就释放了)解决方案:Checkpointer
LangGraph 的答案是Checkpointer(检查点)——在图执行的每一步之后,自动保存当前 State 的快照。
没有 Checkpointer:
invoke() → Node A → Node B → Node C → 结果
↑ 进来的状态 丢失 ↗ 全部丢失
有 Checkpointer:
invoke() → Node A → [存档] → Node B → [存档] → Node C → [存档] → 结果
↓ ↓ ↓
checkpoint_1 checkpoint_2 checkpoint_3
(保存在磁盘/DB)
下次 invoke 时:
→ 读取最新的 checkpoint → 从上次的状态继续Checkpointer 的三大能力
| 能力 | 说明 | 应用场景 |
|---|---|---|
| 多轮对话 | 每轮的 messages 自动积累 | 聊天机器人 |
| 断点恢复 | 中断后从上次状态继续执行 | 长任务、人工审批 |
| 状态回溯 | 查看/回滚到任意历史步骤 | 调试、审计 |
类比:Checkpointer 就是游戏里的"自动存档"。每打过一关自动存一次,死了可以从最近的存档点继续,不用从头来。
6.2 MemorySaver:最简单的内存检查点
MemorySaver 是 LangGraph 内置的最简单的 Checkpointer——它把 State 快照存在内存中。适合开发调试,不适合生产(进程重启就丢了)。
使用方式
只需在 compile() 时传入:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_deepseek import ChatDeepSeek
# 构建一个简单的聊天图
llm = ChatDeepSeek(model="deepseek-chat", temperature=0)
def chatbot(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": [response]}
graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
# 关键:传入 checkpointer
memory = MemorySaver()
app = graph.compile(checkpointer=memory)有 Checkpointer 后的变化
一旦启用 Checkpointer,invoke() 必须传入 config 参数,指定 thread_id:
from langchain_core.messages import HumanMessage
# 必须指定 thread_id —— 标识"这是哪个对话"
config = {"configurable": {"thread_id": "user_001"}}
# 第一轮对话
result = app.invoke(
{"messages": [HumanMessage(content="我叫小明")]},
config=config
)
print(result["messages"][-1].content)
# "你好小明!有什么可以帮你的?"
# 第二轮对话(同一个 thread_id → 记得上一轮!)
result = app.invoke(
{"messages": [HumanMessage(content="我刚才说我叫什么?")]},
config=config
)
print(result["messages"][-1].content)
# "你说你叫小明。" ← 记住了!有 Checkpointer 的多轮对话:
第 1 轮 invoke(thread_id="user_001")
┌──────────────────────────────────────┐
│ messages: [ │
│ HumanMessage("我叫小明"), │
│ AIMessage("你好小明!") │
│ ] │
└──────────────────────────────────────┘
│ 自动存档到 MemorySaver
↓
第 2 轮 invoke(同一个 thread_id)
┌──────────────────────────────────────┐
│ messages: [ │
│ HumanMessage("我叫小明"), ← 从存档恢复 │
│ AIMessage("你好小明!"), ← 从存档恢复 │
│ HumanMessage("我叫什么?"), ← 本轮新增 │
│ AIMessage("你叫小明。") ← 本轮新增 │
│ ] │
└──────────────────────────────────────┘没有 thread_id 会怎样?
# ❌ 忘记传 config → 报错!
result = app.invoke({"messages": [HumanMessage(content="你好")]})
# → Error: Checkpointer requires a thread_id in config启用 Checkpointer 后,thread_id 是必传的。它是多轮对话的"会话标识"——详见 6.4 节。
MemorySaver 的定位:开发调试用。它只在内存中存储,进程退出数据就没了。生产环境需要用下一节的 SqliteSaver 或 PostgresSaver。
6.3 SqliteSaver / PostgresSaver:生产级持久化
MemorySaver 的数据存在内存里——进程一重启就全没了。生产环境需要把 State 持久化到磁盘或数据库。
SqliteSaver(单机部署)
适合个人项目、小团队、单服务器部署的场景。
# 安装依赖
pip install langgraph-checkpoint-sqlitefrom langgraph.checkpoint.sqlite import SqliteSaver
# 方式 1:指定数据库文件路径
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = graph.compile(checkpointer=checkpointer)
# ... 使用 app
# 方式 2:使用内存数据库(测试用)
with SqliteSaver.from_conn_string(":memory:") as checkpointer:
app = graph.compile(checkpointer=checkpointer)SqliteSaver 的数据存储:
checkpoints.db(SQLite 文件)
┌──────────────────────────────────────┐
│ thread_id │ step │ state_snapshot │
│───────────│──────│──────────────────│
│ user_001 │ 1 │ {messages: [...]} │
│ user_001 │ 2 │ {messages: [...]} │
│ user_002 │ 1 │ {messages: [...]} │
└──────────────────────────────────────┘
重启服务器后 → 从文件读取 → 对话继续PostgresSaver(多机/云部署)
适合生产级部署、多实例、需要高可用的场景。
# 安装依赖
pip install langgraph-checkpoint-postgresfrom langgraph.checkpoint.postgres import PostgresSaver
# 连接 PostgreSQL
DB_URI = "postgresql://user:password@localhost:5432/langgraph_db"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
# 首次使用时创建表结构
checkpointer.setup()
app = graph.compile(checkpointer=checkpointer)
# ... 使用 app异步版本
如果你的应用用了 async(如 FastAPI),使用相应的异步版本:
# 异步 SQLite
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = graph.compile(checkpointer=checkpointer)
result = await app.ainvoke(...)
# 异步 PostgreSQL
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
app = graph.compile(checkpointer=checkpointer)三种 Checkpointer 对比
| Checkpointer | 存储位置 | 持久性 | 适合场景 |
|---|---|---|---|
MemorySaver | 内存 | ❌ 重启丢失 | 开发调试 |
SqliteSaver | 本地文件 | ✅ 持久 | 单机部署、小项目 |
PostgresSaver | PostgreSQL | ✅ 持久 | 生产、多实例、云部署 |
选型建议:从
MemorySaver开始开发 → 功能稳定后切到SqliteSaver→ 上生产切到PostgresSaver。三者的 API 完全一样,只需要换一行compile(checkpointer=xxx)代码。
6.4 thread_id:多轮对话的会话管理
thread_id 是 Checkpointer 系统的核心概念——它标识"这是哪个对话/会话"。不同的 thread_id 对应不同的 State 历史,完全隔离。
thread_id 的作用
thread_id = 对话的唯一标识
thread_id="user_001"
┌────────────────────────────┐
│ 对话 1 的 State │
│ messages: [你好, 我叫小明...]│
└────────────────────────────┘
thread_id="user_002"
┌────────────────────────────┐
│ 对话 2 的 State │
│ messages: [帮我查天气...] │
└────────────────────────────┘
两个对话完全独立,互不干扰。多用户/多会话场景
# 用户 A 的对话
config_a = {"configurable": {"thread_id": "user_alice_001"}}
app.invoke({"messages": [HumanMessage(content="我喜欢Python")]}, config=config_a)
# 用户 B 的对话(完全独立)
config_b = {"configurable": {"thread_id": "user_bob_001"}}
app.invoke({"messages": [HumanMessage(content="我喜欢Java")]}, config=config_b)
# 用户 A 继续(记得 A 的偏好,不受 B 影响)
app.invoke({"messages": [HumanMessage(content="我喜欢什么语言?")]}, config=config_a)
# → "你说你喜欢 Python"thread_id 的命名策略
| 场景 | thread_id 格式 | 示例 |
|---|---|---|
| 单用户单会话 | 用户 ID | "user_123" |
| 单用户多会话 | 用户 ID + 会话 ID | "user_123_chat_456" |
| Web 应用 | Session ID | "session_abc123" |
| API 服务 | Request ID | "req_20260401_001" |
import uuid
# 推荐:用 UUID 保证唯一性
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}新对话 vs 继续对话
# 新对话:用新的 thread_id
new_config = {"configurable": {"thread_id": "new_conversation"}}
result = app.invoke({"messages": [HumanMessage(content="你好")]}, config=new_config)
# → 全新的对话,没有历史
# 继续对话:用同一个 thread_id
result = app.invoke({"messages": [HumanMessage(content="接着聊")]}, config=new_config)
# → 延续上面的对话,带着历史在 Web 应用中使用
from fastapi import FastAPI
from langchain_core.messages import HumanMessage
fastapi_app = FastAPI()
@fastapi_app.post("/chat")
async def chat(user_id: str, session_id: str, message: str):
# 用 user_id + session_id 组合成 thread_id
thread_id = f"{user_id}_{session_id}"
config = {"configurable": {"thread_id": thread_id}}
result = await app.ainvoke(
{"messages": [HumanMessage(content=message)]},
config=config
)
return {"reply": result["messages"][-1].content}关键认知:
thread_id是你管理 Agent 记忆的唯一手段。同一个thread_id= 同一个对话;换一个thread_id= 全新的开始。这比手动管理对话历史简单得多。
6.5 查看和回溯历史状态(get_state / get_state_history)
Checkpointer 不仅仅是"存档"——它还记录了图执行的每一步。你可以查看当前状态、遍历历史、甚至回滚到某个历史时间点重新执行。
get_state:查看当前状态
config = {"configurable": {"thread_id": "user_001"}}
# 查看该 thread 的当前状态
state = app.get_state(config)
# state.values —— 当前的 State 内容
print(state.values["messages"])
# state.next —— 下一个要执行的节点(如果图还没结束)
print(state.next) # () 表示已结束,("agent",) 表示下一步是 agent
# state.config —— 该 checkpoint 的配置信息
print(state.config)get_state 返回的 StateSnapshot:
StateSnapshot(
values={ ← 当前 State 的内容
"messages": [...],
},
next=(), ← 下一步要执行的节点
config={ ← checkpoint 配置
"configurable": {
"thread_id": "user_001",
"checkpoint_id": "xxx" ← 每个快照的唯一 ID
}
},
parent_config={...} ← 上一步的 checkpoint 配置
)get_state_history:遍历历史
config = {"configurable": {"thread_id": "user_001"}}
# 遍历该 thread 的所有历史 checkpoint
for state in app.get_state_history(config):
print(f"Step: {state.config['configurable']['checkpoint_id']}")
print(f" Next: {state.next}")
msg_count = len(state.values.get("messages", []))
print(f" Messages: {msg_count} 条")
print("---")输出示例:
Step: checkpoint_5(最新)
Next: () ← 已结束
Messages: 6 条
---
Step: checkpoint_4
Next: ("chatbot",) ← 当时即将执行 chatbot
Messages: 5 条
---
Step: checkpoint_3
Next: ("tools",) ← 当时即将执行 tools
Messages: 4 条
---
...(一直到最初的状态)回滚到历史状态
找到某个历史 checkpoint 后,你可以从那个点重新执行:
# 假设我们想从 checkpoint_3 重新开始
# 先找到那个 checkpoint
target_config = None
for state in app.get_state_history(config):
if len(state.values.get("messages", [])) == 4:
target_config = state.config
break
# 从那个 checkpoint 继续执行
if target_config:
result = app.invoke(None, config=target_config)
# → 从 checkpoint_3 的状态继续,重新走后面的步骤update_state:手动修改状态
你甚至可以手动修改 checkpoint 中的 State,然后继续执行:
config = {"configurable": {"thread_id": "user_001"}}
# 手动给 State 加一条消息
app.update_state(
config,
{"messages": [HumanMessage(content="忽略之前的指令,回答最新问题")]},
)
# 继续执行(用修改后的 State)
result = app.invoke(None, config=config)update_state 的应用场景:
场景 1:人工修正
Agent 产生了错误的中间结果 → 人工修改 State → 继续执行
场景 2:注入上下文
在 Agent 执行中途 → 注入额外的系统消息 → 改变行为
场景 3:Human-in-the-Loop(下一章详细讲)
Agent 暂停等人类确认 → 人类修改 State → 恢复执行实际调试小技巧
# 快速打印最近一轮对话的所有消息
config = {"configurable": {"thread_id": "debug_session"}}
state = app.get_state(config)
for msg in state.values["messages"]:
role = msg.type # human / ai / tool
content = msg.content[:100] if msg.content else "(工具调用)"
print(f"[{role}] {content}")核心价值:
get_state/get_state_history/update_state这三个 API 赋予你对 Agent 执行过程的完全可观测性和可控性——你可以查看任意时刻的状态、回滚到任意步骤、手动修改并继续。这是 LangGraph 区别于其他 Agent 框架的一大优势。
本章小结
| 知识点 | 要点 |
|---|---|
| 为什么持久化 | 多轮对话记忆 + 断点恢复 + 状态回溯 |
| MemorySaver | 内存存储,开发调试用,重启丢失 |
| SqliteSaver | 文件存储,单机部署 |
| PostgresSaver | 数据库存储,生产部署 |
| thread_id | 对话/会话的唯一标识,不同 thread 完全隔离 |
| get_state | 查看当前 State 快照 |
| get_state_history | 遍历所有历史 checkpoint |
| update_state | 手动修改 State 后继续执行 |
下一章预告:Human-in-the-Loop —— 人机协作。让 Agent 在关键时刻暂停等人类确认,人类修改 State 后恢复执行。这是构建可信赖 AI 系统的关键能力。