学习时长:4-5 周
基础 RAG 系统(查询→检索→生成)在简单场景下表现良好,但面对复杂查询、多跳推理、长文档理解等真实业务需求时,往往会遇到检索不准、上下文丢失、答案幻觉等瓶颈。高级 RAG 技术通过优化检索前、检索中、检索后三个阶段,将系统准确率从 70% 提升至 90% 以上。
🧭 技术全景图
用户查询
↓
【检索前优化】
├── 查询改写(Query Rewriting)
├── 查询分解(Query Decomposition)
├── HyDE 假设文档嵌入
└── 查询路由(Query Routing)
↓
【检索中优化】
├── 混合检索(Hybrid Search)
├── 多路召回(Multi-Route Retrieval)
├── 父子文档检索(Parent-Child Retrieval)
└── 自适应检索(Adaptive Retrieval)
↓
【检索后优化】
├── 重排序(Reranking)
├── 上下文压缩(Context Compression)
├── 去重与融合(Dedup & Fusion)
└── 引用溯源(Citation Tracking)
↓
【高级架构】
├── Graph RAG(知识图谱增强)
├── Agentic RAG(Agent 驱动)
├── Corrective RAG(自我纠错)
├── Self-RAG(自适应检索生成)
└── Modular RAG(模块化 RAG)
↓
最终答案4.4.1 检索前优化(Pre-Retrieval)
用户的原始查询往往模糊、口语化或包含多个子问题,直接用于检索效果不佳。检索前优化的核心目标是将用户意图转化为高质量的检索查询。
1. 查询改写(Query Rewriting)
将用户的口语化、模糊查询改写为更适合语义检索的表述。
python
from openai import OpenAI
client = OpenAI()
def rewrite_query(original_query: str, conversation_history: list[dict] = None) -> list[str]:
"""
查询改写:生成多个语义等价但表述不同的查询
策略:
1. 扩展同义词和相关术语
2. 补充上下文(基于对话历史)
3. 生成多个角度的查询(提升召回率)
"""
history_context = ""
if conversation_history:
recent = conversation_history[-3:] # 最近3轮对话
history_context = "\n".join(
f"{msg['role']}: {msg['content']}" for msg in recent
)
prompt = f"""你是一个查询优化专家。请将用户的原始查询改写为 3 个更适合语义检索的版本。
要求:
1. 保持原始查询的核心意图不变
2. 使用不同的表述方式和关键词
3. 补充隐含的上下文信息
4. 如果有对话历史,结合上下文消除歧义
{f"对话历史:{chr(10)}{history_context}" if history_context else ""}
原始查询:{original_query}
请以 JSON 数组格式输出 3 个改写后的查询,不要其他解释:
["查询1", "查询2", "查询3"]
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
import json
try:
queries = json.loads(response.choices[0].message.content)
return [original_query] + queries # 保留原始查询 + 3个改写
except json.JSONDecodeError:
return [original_query]
# 示例
original = "Python 怎么连数据库"
rewritten = rewrite_query(original)
for i, q in enumerate(rewritten):
print(f"{i}. {q}")
# 输出:
# 0. Python 怎么连数据库
# 1. Python 连接数据库的方法和常用库(如 SQLAlchemy、psycopg2)
# 2. 如何使用 Python 操作 MySQL/PostgreSQL 数据库
# 3. Python 数据库连接教程:ORM 与原生 SQL 驱动对比2. 查询分解(Query Decomposition)
将复杂的多跳问题拆解为多个简单子问题,分别检索后合并答案。
python
def decompose_query(complex_query: str) -> list[dict]:
"""
查询分解:将复杂问题拆解为可独立检索的子问题
适用场景:
- 多跳推理:"A 公司的 CEO 毕业于哪所大学?"
- 比较类问题:"React 和 Vue 哪个更适合大型项目?"
- 多条件问题:"2024年营收超过100亿且员工超过1万的科技公司有哪些?"
"""
prompt = f"""将以下复杂问题分解为可以独立回答的子问题。
要求:
1. 每个子问题应该足够简单,可以通过单次检索回答
2. 子问题之间标注依赖关系(如果有)
3. 按照回答顺序排列
复杂问题:{complex_query}
以 JSON 格式输出:
{{
"sub_questions": [
{{
"id": 1,
"question": "子问题1",
"depends_on": [],
"purpose": "这个子问题解决什么"
}}
],
"reasoning": "分解思路说明"
}}
"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0,
response_format={"type": "json_object"}
)
import json
return json.loads(response.choices[0].message.content)
def answer_with_decomposition(complex_query: str, retriever) -> str:
"""
分解-检索-合成 完整流程
"""
# Step 1: 分解问题
decomposed = decompose_query(complex_query)
sub_questions = decomposed["sub_questions"]
print(f"分解为 {len(sub_questions)} 个子问题")
# Step 2: 逐个检索并回答子问题
sub_answers = {}
for sq in sub_questions:
# 检查依赖
context_from_deps = ""
for dep_id in sq.get("depends_on", []):
if dep_id in sub_answers:
context_from_deps += f"\n已知信息:{sub_answers[dep_id]}"
# 检索相关文档
query = sq["question"] + context_from_deps
docs = retriever.search(query, top_k=3)
context = "\n".join(docs)
# 回答子问题
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "根据提供的资料简洁回答问题。如果资料不足,说明无法确定。"},
{"role": "user", "content": f"资料:\n{context}\n\n问题:{sq['question']}"}
],
temperature=0
)
sub_answers[sq["id"]] = response.choices[0].message.content
print(f" 子问题 {sq['id']}: {sq['question']}")
print(f" 答案: {sub_answers[sq['id']][:100]}...")
# Step 3: 合成最终答案
synthesis_prompt = f"""根据以下子问题的答案,综合回答原始问题。
原始问题:{complex_query}
子问题与答案:
"""
for sq in sub_questions:
synthesis_prompt += f"\nQ: {sq['question']}\nA: {sub_answers[sq['id']]}\n"
synthesis_prompt += "\n请综合以上信息,给出完整、连贯的最终答案:"
final_response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": synthesis_prompt}],
temperature=0.3
)
return final_response.choices[0].message.content
# 示例
result = decompose_query("对比 LLaMA 3.1 和 Qwen 2.5 在中文任务上的表现,哪个更适合做中文客服机器人?")
print(json.dumps(result, ensure_ascii=False, indent=2))3. HyDE(假设文档嵌入)
先让 LLM 生成一个"假设性答案",再用这个答案的向量去检索真实文档。核心思想:答案和答案之间的语义距离,比问题和答案之间的距离更近。
python
def hyde_retrieval(query: str, retriever, top_k: int = 5) -> list[str]:
"""
HyDE(Hypothetical Document Embeddings)
流程:
1. LLM 生成假设性答案(不需要准确,只需语义相关)
2. 用假设答案的向量检索真实文档
3. 用真实文档生成最终答案
优势:
- 解决"问题-文档"语义鸿沟问题
- 对短查询效果提升显著(30%+)
劣势:
- 增加一次 LLM 调用(延迟 +500ms)
- 假设答案方向错误时可能误导检索
"""
# Step 1: 生成假设性答案
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "请直接回答以下问题。不需要完全准确,但要包含相关的关键术语和概念。"
},
{"role": "user", "content": query}
],
temperature=0.7,
max_tokens=300
)
hypothetical_answer = response.choices[0].message.content
print(f"假设答案:{hypothetical_answer[:150]}...")
# Step 2: 用假设答案检索(而非原始查询)
docs_from_hyde = retriever.search(hypothetical_answer, top_k=top_k)
# Step 3: 同时用原始查询检索(双路召回)
docs_from_query = retriever.search(query, top_k=top_k)
# Step 4: 合并去重
all_docs = list(dict.fromkeys(docs_from_hyde + docs_from_query)) # 保序去重
return all_docs[:top_k]
# 进阶:多假设 HyDE(生成多个假设答案,提升召回多样性)
def multi_hyde_retrieval(query: str, retriever, num_hypotheses: int = 3, top_k: int = 5) -> list[str]:
"""生成多个假设答案,分别检索后融合"""
all_docs = []
for i in range(num_hypotheses):
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": f"请从第 {i+1} 个不同角度回答以下问题,包含相关术语。"},
{"role": "user", "content": query}
],
temperature=0.9 # 高温度增加多样性
)
hypothesis = response.choices[0].message.content
docs = retriever.search(hypothesis, top_k=3)
all_docs.extend(docs)
# 去重并返回
return list(dict.fromkeys(all_docs))[:top_k]4. 查询路由(Query Routing)
根据查询类型自动选择不同的检索策略或知识库。
python
from enum import Enum
from pydantic import BaseModel
class QueryType(str, Enum):
FACTUAL = "factual" # 事实查询 → 精确检索
ANALYTICAL = "analytical" # 分析比较 → 多路检索 + 长上下文
PROCEDURAL = "procedural" # 操作步骤 → 文档检索
CONVERSATIONAL = "conversational" # 闲聊 → 直接 LLM 回答
CODE = "code" # 代码相关 → 代码库检索
REALTIME = "realtime" # 实时信息 → 联网搜索
class QueryRoute(BaseModel):
query_type: QueryType
confidence: float
suggested_sources: list[str]
needs_retrieval: bool
def route_query(query: str) -> QueryRoute:
"""
查询路由:分类查询类型,决定检索策略
"""
prompt = f"""分析以下查询的类型,返回 JSON 格式:
查询:{query}
类型定义:
- factual: 事实性问题(有明确答案),如"Python 3.12 发布日期"
- analytical: 分析比较类,如"React vs Vue 的优缺点"
- procedural: 操作步骤类,如"如何部署 Docker 容器"
- conversational: 闲聊/问候,如"你好"、"谢谢"
- code: 代码生成/调试,如"写一个排序算法"
- realtime: 需要实时信息,如"今天的天气"、"最新的股价"
输出格式:
{{
"query_type": "类型",
"confidence": 0.95,
"suggested_sources": ["knowledge_base", "code_repo", "web_search"],
"needs_retrieval": true
}}
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0,
response_format={"type": "json_object"}
)
import json
data = json.loads(response.choices[0].message.content)
return QueryRoute(**data)
class RAGRouter:
"""
RAG 路由器:根据查询类型分发到不同处理管道
"""
def __init__(self, retrievers: dict, llm_client):
self.retrievers = retrievers # {"knowledge_base": ..., "code_repo": ..., ...}
self.client = llm_client
def process(self, query: str) -> dict:
route = route_query(query)
print(f"路由结果:{route.query_type}(置信度 {route.confidence:.0%})")
if not route.needs_retrieval:
# 不需要检索,直接 LLM 回答
return self._direct_answer(query)
if route.query_type == QueryType.FACTUAL:
return self._factual_pipeline(query)
elif route.query_type == QueryType.ANALYTICAL:
return self._analytical_pipeline(query)
elif route.query_type == QueryType.PROCEDURAL:
return self._procedural_pipeline(query)
elif route.query_type == QueryType.CODE:
return self._code_pipeline(query)
elif route.query_type == QueryType.REALTIME:
return self._realtime_pipeline(query)
else:
return self._direct_answer(query)
def _factual_pipeline(self, query: str) -> dict:
"""事实查询:精确检索 + 低温度生成"""
docs = self.retrievers["knowledge_base"].search(query, top_k=3)
answer = self._generate(query, docs, temperature=0)
return {"answer": answer, "sources": docs, "pipeline": "factual"}
def _analytical_pipeline(self, query: str) -> dict:
"""分析比较:多路检索 + 查询分解"""
decomposed = decompose_query(query)
all_docs = []
for sq in decomposed["sub_questions"]:
docs = self.retrievers["knowledge_base"].search(sq["question"], top_k=3)
all_docs.extend(docs)
all_docs = list(dict.fromkeys(all_docs))[:8] # 去重,保留更多上下文
answer = self._generate(query, all_docs, temperature=0.3)
return {"answer": answer, "sources": all_docs, "pipeline": "analytical"}
def _procedural_pipeline(self, query: str) -> dict:
"""操作步骤:检索 + 步骤化输出"""
docs = self.retrievers["knowledge_base"].search(query, top_k=5)
answer = self._generate(
query, docs, temperature=0.3,
system_extra="请以清晰的步骤格式回答,使用编号列表。"
)
return {"answer": answer, "sources": docs, "pipeline": "procedural"}
def _code_pipeline(self, query: str) -> dict:
"""代码查询:代码库检索 + 代码生成"""
docs = self.retrievers.get("code_repo", self.retrievers["knowledge_base"]).search(query, top_k=5)
answer = self._generate(
query, docs, temperature=0.2,
system_extra="请提供完整可运行的代码示例,包含注释和错误处理。"
)
return {"answer": answer, "sources": docs, "pipeline": "code"}
def _realtime_pipeline(self, query: str) -> dict:
"""实时信息:联网搜索(此处为示意)"""
# 实际应用中接入搜索 API(如 Tavily、SerpAPI)
return {
"answer": "此问题需要实时信息,建议接入搜索 API。",
"sources": [],
"pipeline": "realtime"
}
def _direct_answer(self, query: str) -> dict:
"""直接回答,不检索"""
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
temperature=0.7
)
return {
"answer": response.choices[0].message.content,
"sources": [],
"pipeline": "direct"
}
def _generate(self, query: str, docs: list, temperature: float = 0.3,
system_extra: str = "") -> str:
context = "\n\n---\n\n".join(docs)
system_msg = f"根据以下参考资料回答问题。如果资料不足,请明确说明。{system_extra}"
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_msg},
{"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
],
temperature=temperature
)
return response.choices[0].message.content4.4.2 检索中优化(Retrieval Enhancement)
1. 混合检索(Hybrid Search)
将语义检索(向量相似度)与关键词检索(BM25)结合,兼顾语义理解和精确匹配。
python
# pip install rank_bm25 numpy
import numpy as np
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import jieba
class HybridRetriever:
"""
混合检索器:语义检索 + BM25 关键词检索
优势:
- 语义检索擅长:同义词、换一种说法的查询
- BM25 擅长:精确术语匹配、专有名词、代码片段
- 两者互补,综合效果 > 单一方法
"""
def __init__(
self,
embedding_model: str = "BAAI/bge-large-zh-v1.5",
semantic_weight: float = 0.6,
bm25_weight: float = 0.4
):
self.embed_model = SentenceTransformer(embedding_model)
self.semantic_weight = semantic_weight
self.bm25_weight = bm25_weight
self.documents: list[str] = []
self.doc_embeddings: np.ndarray | None = None
self.bm25: BM25Okapi | None = None
def index(self, documents: list[str]):
"""建立索引"""
self.documents = documents
# 语义索引
print("构建语义索引...")
self.doc_embeddings = self.embed_model.encode(documents, show_progress_bar=True)
# BM25 索引(中文需要分词)
print("构建 BM25 索引...")
tokenized_docs = [list(jieba.cut(doc)) for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
print(f"索引完成:{len(documents)} 个文档")
def search(self, query: str, top_k: int = 5) -> list[dict]:
"""
混合检索
Returns:
[{"text": "...", "score": 0.85, "semantic_score": 0.9, "bm25_score": 0.7, "rank": 1}, ...]
"""
# 1. 语义检索
query_embedding = self.embed_model.encode([query])
semantic_scores = cosine_similarity(query_embedding, self.doc_embeddings)[0]
# 2. BM25 检索
tokenized_query = list(jieba.cut(query))
bm25_scores = self.bm25.get_scores(tokenized_query)
# 3. 归一化分数到 [0, 1]
def normalize(scores):
min_s, max_s = scores.min(), scores.max()
if max_s == min_s:
return np.zeros_like(scores)
return (scores - min_s) / (max_s - min_s)
semantic_norm = normalize(semantic_scores)
bm25_norm = normalize(bm25_scores)
# 4. 加权融合
combined_scores = (
self.semantic_weight * semantic_norm +
self.bm25_weight * bm25_norm
)
# 5. 排序并返回 Top-K
top_indices = np.argsort(combined_scores)[::-1][:top_k]
results = []
for rank, idx in enumerate(top_indices, 1):
results.append({
"text": self.documents[idx],
"score": float(combined_scores[idx]),
"semantic_score": float(semantic_scores[idx]),
"bm25_score": float(bm25_scores[idx]),
"rank": rank,
"index": int(idx)
})
return results
# 使用示例
retriever = HybridRetriever(semantic_weight=0.6, bm25_weight=0.4)
documents = [
"FastAPI 是一个现代、快速的 Python Web 框架,基于 Starlette 和 Pydantic。",
"Django 是 Python 最流行的全栈 Web 框架,内置 ORM、Admin 和模板引擎。",
"Flask 是一个轻量级的 Python Web 微框架,适合小型项目和 API 开发。",
"SQLAlchemy 是 Python 最强大的 ORM 库,支持多种数据库后端。",
"使用 pip install fastapi 安装 FastAPI,然后用 uvicorn 启动服务。",
"ASGI 是异步服务器网关接口,FastAPI 和 Starlette 都基于 ASGI 标准。",
]
retriever.index(documents)
results = retriever.search("如何安装和使用 FastAPI", top_k=3)
for r in results:
print(f"[排名 {r['rank']}] 综合: {r['score']:.3f} | "
f"语义: {r['semantic_score']:.3f} | BM25: {r['bm25_score']:.3f}")
print(f" {r['text'][:80]}...\n")2. 父子文档检索(Parent-Child Retrieval)
核心思想:用小块(Child)做精确检索,返回大块(Parent)做上下文。解决"分块太小丢失上下文,分块太大检索不精确"的矛盾。
python
import uuid
from dataclasses import dataclass, field
@dataclass
class DocumentChunk:
chunk_id: str
parent_id: str | None
text: str
level: str # "parent" 或 "child"
metadata: dict = field(default_factory=dict)
class ParentChildRetriever:
"""
父子文档检索器
架构:
- Parent(大块,800-1500字符):保留完整上下文,用于生成
- Child(小块,200-400字符):精确语义匹配,用于检索
流程:
1. 文档 → 切分为 Parent 块
2. 每个 Parent → 进一步切分为多个 Child 块
3. 检索时:用 Child 块做向量检索
4. 返回时:返回匹配 Child 对应的 Parent 块
"""
def __init__(self, embedding_model, vector_store):
self.embed_model = embedding_model
self.vector_store = vector_store
self.parent_store: dict[str, DocumentChunk] = {} # parent_id → chunk
self.child_to_parent: dict[str, str] = {} # child_id → parent_id
def index_document(
self,
text: str,
parent_chunk_size: int = 1000,
parent_overlap: int = 100,
child_chunk_size: int = 300,
child_overlap: int = 50,
metadata: dict = None
):
"""建立父子索引"""
metadata = metadata or {}
# Step 1: 切分为 Parent 块
parent_chunks = self._split_text(text, parent_chunk_size, parent_overlap)
for parent_text in parent_chunks:
parent_id = str(uuid.uuid4())
parent_chunk = DocumentChunk(
chunk_id=parent_id,
parent_id=None,
text=parent_text,
level="parent",
metadata=metadata
)
self.parent_store[parent_id] = parent_chunk
# Step 2: 每个 Parent 切分为 Child 块
child_chunks = self._split_text(parent_text, child_chunk_size, child_overlap)
for child_text in child_chunks:
child_id = str(uuid.uuid4())
self.child_to_parent[child_id] = parent_id
# Step 3: 只将 Child 块存入向量数据库
child_embedding = self.embed_model.encode(child_text)
self.vector_store.add(
ids=[child_id],
embeddings=[child_embedding.tolist()],
documents=[child_text],
metadatas=[{"parent_id": parent_id, **metadata}]
)
print(f"索引完成:{len(parent_chunks)} 个 Parent 块")
def search(self, query: str, top_k: int = 3) -> list[dict]:
"""
检索:用 Child 匹配,返回 Parent 上下文
"""
query_embedding = self.embed_model.encode(query)
# 检索 Child 块(多检索一些,因为多个 Child 可能属于同一 Parent)
child_results = self.vector_store.query(
query_embeddings=[query_embedding.tolist()],
n_results=top_k * 3
)
# 去重:同一 Parent 只返回一次
seen_parents = set()
results = []
for i, child_id in enumerate(child_results["ids"][0]):
parent_id = child_results["metadatas"][0][i].get("parent_id")
if parent_id and parent_id not in seen_parents:
seen_parents.add(parent_id)
parent_chunk = self.parent_store.get(parent_id)
if parent_chunk:
results.append({
"parent_text": parent_chunk.text,
"matched_child": child_results["documents"][0][i],
"distance": child_results["distances"][0][i] if "distances" in child_results else None,
"metadata": parent_chunk.metadata
})
if len(results) >= top_k:
break
return results
@staticmethod
def _split_text(text: str, chunk_size: int, overlap: int) -> list[str]:
"""简单的固定大小分块"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start = end - overlap
return chunks
# 使用示例
from sentence_transformers import SentenceTransformer
import chromadb
embed_model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
chroma_client = chromadb.Client()
collection = chroma_client.create_collection("parent_child_demo")
retriever = ParentChildRetriever(embed_model, collection)
long_document = """
# FastAPI 完整教程
## 第一章:安装与配置
FastAPI 是一个现代、高性能的 Python Web 框架。它基于 Python 3.7+ 的类型提示,
使用 Starlette 作为 Web 框架核心,Pydantic 作为数据验证库。
安装 FastAPI 非常简单,只需要运行 pip install fastapi uvicorn 即可。
uvicorn 是推荐的 ASGI 服务器,用于运行 FastAPI 应用。
创建第一个应用只需要几行代码:
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
使用 uvicorn main:app --reload 启动开发服务器。
## 第二章:路由与请求处理
FastAPI 支持所有标准的 HTTP 方法:GET、POST、PUT、DELETE、PATCH。
路由定义使用装饰器语法,非常直观。
路径参数通过函数参数自动解析,类型会自动验证。
查询参数也是通过函数参数定义,FastAPI 会自动区分路径参数和查询参数。
请求体使用 Pydantic 模型定义,自动完成数据验证和序列化。
这是 FastAPI 最强大的特性之一,可以大幅减少手动验证代码。
"""
retriever.index_document(
long_document,
parent_chunk_size=500,
child_chunk_size=150,
metadata={"source": "fastapi_tutorial.md"}
)
results = retriever.search("如何安装 FastAPI", top_k=2)
for r in results:
print(f"匹配的 Child:{r['matched_child'][:80]}...")
print(f"返回的 Parent(完整上下文):{r['parent_text'][:200]}...")
print()3. 多路召回与 Reciprocal Rank Fusion(RRF)
从多个检索源获取结果,使用 RRF 算法融合排名。
python
def reciprocal_rank_fusion(
result_lists: list[list[dict]],
k: int = 60,
top_n: int = 10
) -> list[dict]:
"""
Reciprocal Rank Fusion(RRF)
将多个排名列表融合为一个统一排名。
公式:RRF_score(d) = Σ 1 / (k + rank_i(d))
Args:
result_lists: 多个检索结果列表,每个元素包含 "id" 和 "text"
k: 平滑参数(默认 60,论文推荐值)
top_n: 返回前 N 个结果
Returns:
融合后的排序结果
"""
rrf_scores: dict[str, float] = {}
doc_map: dict[str, dict] = {}
for result_list in result_lists:
for rank, doc in enumerate(result_list, start=1):
doc_id = doc.get("id") or doc.get("text")[:50] # 用文本前50字符作为 ID
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1.0 / (k + rank)
doc_map[doc_id] = doc
# 按 RRF 分数排序
sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
results = []
for doc_id in sorted_ids[:top_n]:
doc = doc_map[doc_id]
doc["rrf_score"] = rrf_scores[doc_id]
results.append(doc)
return results
# 使用示例:融合语义检索、BM25、HyDE 三路结果
def multi_route_retrieval(query: str, retriever, top_k: int = 5) -> list[dict]:
"""多路召回 + RRF 融合"""
# 路线 1:原始查询语义检索
route1 = retriever.semantic_search(query, top_k=10)
# 路线 2:BM25 关键词检索
route2 = retriever.bm25_search(query, top_k=10)
# 路线 3:查询改写后检索
rewritten = rewrite_query(query)
route3 = []
for rq in rewritten[1:]: # 跳过原始查询
route3.extend(retriever.semantic_search(rq, top_k=5))
# RRF 融合
fused = reciprocal_rank_fusion([route1, route2, route3], top_n=top_k)
return fused4.4.3 检索后优化(Post-Retrieval)
1. 重排序(Reranking)
使用专门的 Cross-Encoder 模型对检索结果重新打分排序,显著提升精度。
python
# pip install sentence-transformers
from sentence_transformers import CrossEncoder
class Reranker:
"""
重排序器:使用 Cross-Encoder 对检索结果精排
原理:
- Bi-Encoder(Embedding):分别编码 query 和 doc,速度快但精度有限
- Cross-Encoder(Reranker):同时编码 query+doc,精度高但速度慢
最佳实践:
- 先用 Bi-Encoder 粗排(Top-20~50)
- 再用 Cross-Encoder 精排(Top-3~5)
"""
def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
"""
推荐模型:
- BAAI/bge-reranker-v2-m3:多语言,效果最好
- BAAI/bge-reranker-large:中文优化
- cross-encoder/ms-marco-MiniLM-L-12-v2:英文,速度快
"""
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
documents: list[str],
top_k: int = 5,
score_threshold: float = 0.0
) -> list[dict]:
"""
重排序
Args:
query: 用户查询
documents: 待排序的文档列表
top_k: 返回前 K 个
score_threshold: 最低分数阈值(过滤低相关文档)
Returns:
[{"text": "...", "score": 0.95, "original_rank": 3}, ...]
"""
# 构建 query-doc 对
pairs = [(query, doc) for doc in documents]
# Cross-Encoder 打分
scores = self.model.predict(pairs)
# 排序
ranked = sorted(
[
{"text": doc, "score": float(score), "original_rank": i + 1}
for i, (doc, score) in enumerate(zip(documents, scores))
],
key=lambda x: x["score"],
reverse=True
)
# 过滤低分 + 截断
ranked = [r for r in ranked if r["score"] >= score_threshold]
return ranked[:top_k]
# 使用示例
reranker = Reranker("BAAI/bge-reranker-v2-m3")
query = "FastAPI 如何实现流式响应"
# 假设粗排返回了 10 个文档
coarse_results = [
"FastAPI 支持 StreamingResponse,可以实现流式输出。",
"Django 是一个全栈 Web 框架。",
"使用 SSE(Server-Sent Events)可以实现服务端推送。",
"FastAPI 的依赖注入系统非常强大。",
"流式响应在 AI 应用中非常重要,可以实现打字效果。",
"Flask 也可以通过 Response 生成器实现流式输出。",
"Pydantic 用于数据验证和序列化。",
"ASGI 服务器支持异步流式处理。",
"uvicorn 是推荐的 ASGI 服务器。",
"WebSocket 也可以用于实时双向通信。",
]
reranked = reranker.rerank(query, coarse_results, top_k=5, score_threshold=0.1)
print(f"查询:{query}\n")
for r in reranked:
print(f" [分数 {r['score']:.3f}] (原排名 {r['original_rank']}) {r['text']}")使用 Cohere Rerank API(云端方案):
python
# pip install cohere
import cohere
co = cohere.Client("your-cohere-api-key")
def cohere_rerank(query: str, documents: list[str], top_k: int = 5) -> list[dict]:
"""使用 Cohere Rerank API(无需本地 GPU)"""
response = co.rerank(
model="rerank-multilingual-v3.0", # 多语言模型
query=query,
documents=documents,
top_n=top_k,
return_documents=True
)
return [
{
"text": r.document.text,
"score": r.relevance_score,
"original_rank": r.index + 1
}
for r in response.results
]2. 上下文压缩(Context Compression)
检索到的文档可能包含大量无关信息。上下文压缩只保留与查询相关的部分,减少 token 消耗并提升答案质量。
python
def compress_context(
query: str,
documents: list[str],
max_total_tokens: int = 2000
) -> list[str]:
"""
上下文压缩:从检索文档中提取与查询最相关的段落
策略:
1. 对每个文档,提取与查询相关的关键句子
2. 控制总 token 数不超过预算
3. 保留文档来源标记(便于引用溯源)
"""
compressed_docs = []
for i, doc in enumerate(documents):
prompt = f"""从以下文档中提取与查询最相关的内容。
要求:
1. 只保留直接相关的句子和段落
2. 保持原文表述,不要改写
3. 如果整个文档都不相关,返回 "NOT_RELEVANT"
4. 压缩后的内容不超过原文的 50%
查询:{query}
文档 {i + 1}:
{doc}
提取的相关内容:
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0,
max_tokens=500
)
compressed = response.choices[0].message.content.strip()
if compressed != "NOT_RELEVANT" and len(compressed) > 20:
compressed_docs.append(f"[来源 {i + 1}] {compressed}")
return compressed_docs
# 轻量级方案:基于句子相似度的压缩(无需 LLM 调用)
def compress_by_similarity(
query: str,
document: str,
embed_model,
top_sentences: int = 5,
min_similarity: float = 0.3
) -> str:
"""基于句子级语义相似度的压缩(不依赖 LLM,速度快)"""
import re
sentences = re.split(r'[。!?\n]+', document)
sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
if not sentences:
return document
query_emb = embed_model.encode([query])
sent_embs = embed_model.encode(sentences)
similarities = cosine_similarity(query_emb, sent_embs)[0]
# 按相似度排序,保留 top_sentences 个
ranked = sorted(
zip(sentences, similarities, range(len(sentences))),
key=lambda x: x[1],
reverse=True
)
# 过滤低相似度 + 截断
selected = [
(sent, sim, idx) for sent, sim, idx in ranked
if sim >= min_similarity
][:top_sentences]
# 按原文顺序排列(保持连贯性)
selected.sort(key=lambda x: x[2])
return "。".join([s[0] for s in selected]) + "。"3. 引用溯源(Citation Tracking)
让 LLM 在回答中标注信息来源,提升可信度并方便用户验证。
python
def generate_with_citations(
query: str,
documents: list[dict], # [{"text": "...", "source": "文件名", "page": 3}, ...]
) -> dict:
"""
带引用的回答生成
Returns:
{
"answer": "根据资料[1],FastAPI 支持流式响应...[2]",
"citations": [
{"id": 1, "source": "fastapi_docs.pdf", "page": 15, "excerpt": "..."},
{"id": 2, "source": "tutorial.md", "excerpt": "..."}
]
}
"""
# 构建带编号的上下文
numbered_context = ""
for i, doc in enumerate(documents, 1):
source_info = doc.get("source", "未知来源")
page_info = f",第{doc['page']}页" if doc.get("page") else ""
numbered_context += f"\n[{i}] ({source_info}{page_info})\n{doc['text']}\n"
prompt = f"""根据以下编号参考资料回答问题。
要求:
1. 在回答中使用 [编号] 标注信息来源,如 [1]、[2]
2. 每个关键事实都必须标注来源
3. 如果多个来源支持同一观点,标注所有来源,如 [1][3]
4. 如果参考资料不足以回答,明确说明并标注哪些部分缺乏依据
参考资料:
{numbered_context}
问题:{query}
带引用的回答:
"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
answer = response.choices[0].message.content
# 提取引用编号
import re
cited_ids = set(int(m) for m in re.findall(r'\[(\d+)\]', answer))
citations = [
{
"id": i,
"source": doc.get("source", "未知"),
"page": doc.get("page"),
"excerpt": doc["text"][:200] + "..."
}
for i, doc in enumerate(documents, 1)
if i in cited_ids
]
return {
"answer": answer,
"citations": citations,
"uncited_sources": [i for i in range(1, len(documents) + 1) if i not in cited_ids]
}4.4.4 高级 RAG 架构
1. Corrective RAG(自我纠错 RAG)
检索后评估文档相关性,不相关时自动触发补充检索或联网搜索。
python
class CorrectiveRAG:
"""
Corrective RAG(CRAG)
流程:
1. 检索文档
2. 评估每个文档的相关性(Correct / Ambiguous / Incorrect)
3. 根据评估结果决定下一步:
- 全部 Correct → 直接生成
- 部分 Ambiguous → 知识精炼后生成
- 全部 Incorrect → 触发 Web 搜索补充
论文:Corrective Retrieval Augmented Generation (2024)
"""
def __init__(self, retriever, reranker=None):
self.retriever = retriever
self.reranker = reranker
def evaluate_relevance(self, query: str, document: str) -> dict:
"""评估单个文档与查询的相关性"""
prompt = f"""评估以下文档与查询的相关性。
查询:{query}
文档:{document}
评估标准:
- correct:文档直接包含回答查询所需的信息
- ambiguous:文档部分相关,可能有用但不确定
- incorrect:文档与查询完全无关
以 JSON 格式输出:
{{"relevance": "correct/ambiguous/incorrect", "confidence": 0.95, "reason": "简要说明"}}
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0,
response_format={"type": "json_object"}
)
import json
return json.loads(response.choices[0].message.content)
def refine_knowledge(self, query: str, document: str) -> str:
"""知识精炼:从 Ambiguous 文档中提取有用信息"""
prompt = f"""从以下文档中提取与查询相关的信息片段。
只保留直接有用的部分,去除无关内容。如果没有任何有用信息,返回空字符串。
查询:{query}
文档:{document}
提取的相关信息:
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0
)
return response.choices[0].message.content.strip()
def web_search(self, query: str) -> list[str]:
"""
联网搜索补充(需接入搜索 API)
实际应用中可使用 Tavily、SerpAPI、Bing Search 等
"""
# 示意实现
print(f" 触发联网搜索:{query}")
# results = tavily_client.search(query, max_results=3)
# return [r["content"] for r in results["results"]]
return [f"[联网搜索结果] 关于 '{query}' 的最新信息..."]
def query(self, query: str, top_k: int = 5) -> dict:
"""CRAG 完整查询流程"""
print(f"查询:{query}")
# Step 1: 检索
raw_docs = self.retriever.search(query, top_k=top_k)
doc_texts = [d["text"] if isinstance(d, dict) else d for d in raw_docs]
print(f" 检索到 {len(doc_texts)} 个文档")
# Step 2: 评估每个文档
evaluations = []
for doc in doc_texts:
eval_result = self.evaluate_relevance(query, doc)
evaluations.append(eval_result)
correct_docs = [doc for doc, ev in zip(doc_texts, evaluations) if ev["relevance"] == "correct"]
ambiguous_docs = [doc for doc, ev in zip(doc_texts, evaluations) if ev["relevance"] == "ambiguous"]
incorrect_count = sum(1 for ev in evaluations if ev["relevance"] == "incorrect")
print(f" 评估结果:{len(correct_docs)} correct, {len(ambiguous_docs)} ambiguous, {incorrect_count} incorrect")
# Step 3: 根据评估结果决定策略
final_context = []
action_taken = "direct"
if correct_docs:
# 有正确文档 → 直接使用
final_context.extend(correct_docs)
action_taken = "direct"
if ambiguous_docs:
# 模糊文档 → 精炼后使用
for doc in ambiguous_docs:
refined = self.refine_knowledge(query, doc)
if refined:
final_context.append(refined)
action_taken = "refined"
if not final_context:
# 全部不相关 → 联网搜索
web_results = self.web_search(query)
final_context.extend(web_results)
action_taken = "web_search"
# Step 4: 生成最终答案
context = "\n\n---\n\n".join(final_context)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "根据参考资料准确回答问题。如果信息不足,请明确说明。"},
{"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
],
temperature=0.3
)
return {
"answer": response.choices[0].message.content,
"action": action_taken,
"num_correct": len(correct_docs),
"num_ambiguous": len(ambiguous_docs),
"num_incorrect": incorrect_count,
"context_used": final_context
}2. Self-RAG(自适应检索生成)
模型自主决定是否需要检索、检索结果是否有用、生成的答案是否需要引用支持。
python
class SelfRAG:
"""
Self-RAG:让 LLM 自主控制检索行为
核心 Token(论文定义的特殊标记):
- [Retrieve]:是否需要检索(Yes/No)
- [IsRel]:检索结果是否相关(Relevant/Irrelevant)
- [IsSup]:生成内容是否有检索支持(Fully/Partially/No)
- [IsUse]:最终答案是否有用(1-5分)
简化实现:用 LLM 判断代替特殊 Token
"""
def __init__(self, retriever):
self.retriever = retriever
def should_retrieve(self, query: str, partial_answer: str = "") -> bool:
"""判断是否需要检索"""
prompt = f"""判断回答以下问题是否需要检索外部知识库。
问题:{query}
{f"已有部分回答:{partial_answer}" if partial_answer else ""}
判断标准:
- 需要检索:涉及具体事实、数据、最新信息、专业知识
- 不需要检索:通用常识、简单计算、创意写作、闲聊
只回答 "yes" 或 "no":
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0,
max_tokens=5
)
return "yes" in response.choices[0].message.content.lower()
def check_support(self, claim: str, evidence: str) -> dict:
"""检查生成的内容是否有证据支持"""
prompt = f"""判断以下声明是否被证据支持。
声明:{claim}
证据:{evidence}
以 JSON 输出:
{{"support_level": "fully/partially/no", "explanation": "简要说明"}}
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0,
response_format={"type": "json_object"}
)
import json
return json.loads(response.choices[0].message.content)
def query(self, query: str) -> dict:
"""Self-RAG 完整流程"""
# Step 1: 判断是否需要检索
needs_retrieval = self.should_retrieve(query)
print(f"需要检索:{needs_retrieval}")
if not needs_retrieval:
# 直接生成
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}],
temperature=0.7
)
return {
"answer": response.choices[0].message.content,
"retrieved": False,
"sources": []
}
# Step 2: 检索
docs = self.retriever.search(query, top_k=5)
doc_texts = [d["text"] if isinstance(d, dict) else d for d in docs]
# Step 3: 生成答案
context = "\n\n".join(doc_texts)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "根据参考资料回答问题,确保每个关键事实都有资料支持。"},
{"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
],
temperature=0.3
)
answer = response.choices[0].message.content
# Step 4: 自我验证(检查答案是否有证据支持)
support_check = self.check_support(answer, context)
print(f"证据支持度:{support_check['support_level']}")
if support_check["support_level"] == "no":
# 答案缺乏支持 → 重新生成,更保守
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "严格根据参考资料回答。没有依据的内容请明确标注'资料未提及'。"},
{"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
],
temperature=0
)
answer = response.choices[0].message.content
return {
"answer": answer,
"retrieved": True,
"sources": doc_texts,
"support_level": support_check["support_level"]
}3. Graph RAG(知识图谱增强 RAG)
将文档构建为知识图谱,利用图结构进行多跳推理和关系发现。
python
# pip install networkx
import networkx as nx
from dataclasses import dataclass
@dataclass
class Entity:
name: str
type: str # "person", "organization", "technology", "concept"
description: str = ""
@dataclass
class Relation:
source: str
target: str
relation_type: str # "uses", "belongs_to", "created_by", "compared_with"
description: str = ""
class GraphRAG:
"""
Graph RAG:知识图谱增强的 RAG
优势(相比纯向量检索):
1. 多跳推理:"A 的创始人的母校是?" → A→创始人→母校
2. 关系发现:"哪些技术与 FastAPI 相关?" → 图遍历
3. 全局摘要: