Skip to content

学习时长:4-5 周

基础 RAG 系统(查询→检索→生成)在简单场景下表现良好,但面对复杂查询、多跳推理、长文档理解等真实业务需求时,往往会遇到检索不准、上下文丢失、答案幻觉等瓶颈。高级 RAG 技术通过优化检索前、检索中、检索后三个阶段,将系统准确率从 70% 提升至 90% 以上。


🧭 技术全景图

用户查询

【检索前优化】
├── 查询改写(Query Rewriting)
├── 查询分解(Query Decomposition)
├── HyDE 假设文档嵌入
└── 查询路由(Query Routing)

【检索中优化】
├── 混合检索(Hybrid Search)
├── 多路召回(Multi-Route Retrieval)
├── 父子文档检索(Parent-Child Retrieval)
└── 自适应检索(Adaptive Retrieval)

【检索后优化】
├── 重排序(Reranking)
├── 上下文压缩(Context Compression)
├── 去重与融合(Dedup & Fusion)
└── 引用溯源(Citation Tracking)

【高级架构】
├── Graph RAG(知识图谱增强)
├── Agentic RAG(Agent 驱动)
├── Corrective RAG(自我纠错)
├── Self-RAG(自适应检索生成)
└── Modular RAG(模块化 RAG)

最终答案

4.4.1 检索前优化(Pre-Retrieval)

用户的原始查询往往模糊、口语化或包含多个子问题,直接用于检索效果不佳。检索前优化的核心目标是将用户意图转化为高质量的检索查询


1. 查询改写(Query Rewriting)

将用户的口语化、模糊查询改写为更适合语义检索的表述。

python
from openai import OpenAI

client = OpenAI()

def rewrite_query(original_query: str, conversation_history: list[dict] = None) -> list[str]:
    """
    查询改写:生成多个语义等价但表述不同的查询
    
    策略:
    1. 扩展同义词和相关术语
    2. 补充上下文(基于对话历史)
    3. 生成多个角度的查询(提升召回率)
    """
    history_context = ""
    if conversation_history:
        recent = conversation_history[-3:]  # 最近3轮对话
        history_context = "\n".join(
            f"{msg['role']}: {msg['content']}" for msg in recent
        )

    prompt = f"""你是一个查询优化专家。请将用户的原始查询改写为 3 个更适合语义检索的版本。

要求:
1. 保持原始查询的核心意图不变
2. 使用不同的表述方式和关键词
3. 补充隐含的上下文信息
4. 如果有对话历史,结合上下文消除歧义

{f"对话历史:{chr(10)}{history_context}" if history_context else ""}

原始查询:{original_query}

请以 JSON 数组格式输出 3 个改写后的查询,不要其他解释:
["查询1", "查询2", "查询3"]
"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3
    )

    import json
    try:
        queries = json.loads(response.choices[0].message.content)
        return [original_query] + queries  # 保留原始查询 + 3个改写
    except json.JSONDecodeError:
        return [original_query]


# 示例
original = "Python 怎么连数据库"
rewritten = rewrite_query(original)
for i, q in enumerate(rewritten):
    print(f"{i}. {q}")

# 输出:
# 0. Python 怎么连数据库
# 1. Python 连接数据库的方法和常用库(如 SQLAlchemy、psycopg2)
# 2. 如何使用 Python 操作 MySQL/PostgreSQL 数据库
# 3. Python 数据库连接教程:ORM 与原生 SQL 驱动对比

2. 查询分解(Query Decomposition)

将复杂的多跳问题拆解为多个简单子问题,分别检索后合并答案。

python
def decompose_query(complex_query: str) -> list[dict]:
    """
    查询分解:将复杂问题拆解为可独立检索的子问题
    
    适用场景:
    - 多跳推理:"A 公司的 CEO 毕业于哪所大学?"
    - 比较类问题:"React 和 Vue 哪个更适合大型项目?"
    - 多条件问题:"2024年营收超过100亿且员工超过1万的科技公司有哪些?"
    """
    prompt = f"""将以下复杂问题分解为可以独立回答的子问题。

要求:
1. 每个子问题应该足够简单,可以通过单次检索回答
2. 子问题之间标注依赖关系(如果有)
3. 按照回答顺序排列

复杂问题:{complex_query}

以 JSON 格式输出:
{{
  "sub_questions": [
    {{
      "id": 1,
      "question": "子问题1",
      "depends_on": [],
      "purpose": "这个子问题解决什么"
    }}
  ],
  "reasoning": "分解思路说明"
}}
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        response_format={"type": "json_object"}
    )

    import json
    return json.loads(response.choices[0].message.content)


def answer_with_decomposition(complex_query: str, retriever) -> str:
    """
    分解-检索-合成 完整流程
    """
    # Step 1: 分解问题
    decomposed = decompose_query(complex_query)
    sub_questions = decomposed["sub_questions"]
    print(f"分解为 {len(sub_questions)} 个子问题")

    # Step 2: 逐个检索并回答子问题
    sub_answers = {}
    for sq in sub_questions:
        # 检查依赖
        context_from_deps = ""
        for dep_id in sq.get("depends_on", []):
            if dep_id in sub_answers:
                context_from_deps += f"\n已知信息:{sub_answers[dep_id]}"

        # 检索相关文档
        query = sq["question"] + context_from_deps
        docs = retriever.search(query, top_k=3)
        context = "\n".join(docs)

        # 回答子问题
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "根据提供的资料简洁回答问题。如果资料不足,说明无法确定。"},
                {"role": "user", "content": f"资料:\n{context}\n\n问题:{sq['question']}"}
            ],
            temperature=0
        )
        sub_answers[sq["id"]] = response.choices[0].message.content
        print(f"  子问题 {sq['id']}: {sq['question']}")
        print(f"  答案: {sub_answers[sq['id']][:100]}...")

    # Step 3: 合成最终答案
    synthesis_prompt = f"""根据以下子问题的答案,综合回答原始问题。

原始问题:{complex_query}

子问题与答案:
"""
    for sq in sub_questions:
        synthesis_prompt += f"\nQ: {sq['question']}\nA: {sub_answers[sq['id']]}\n"

    synthesis_prompt += "\n请综合以上信息,给出完整、连贯的最终答案:"

    final_response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": synthesis_prompt}],
        temperature=0.3
    )

    return final_response.choices[0].message.content


# 示例
result = decompose_query("对比 LLaMA 3.1 和 Qwen 2.5 在中文任务上的表现,哪个更适合做中文客服机器人?")
print(json.dumps(result, ensure_ascii=False, indent=2))

3. HyDE(假设文档嵌入)

先让 LLM 生成一个"假设性答案",再用这个答案的向量去检索真实文档。核心思想:答案和答案之间的语义距离,比问题和答案之间的距离更近

python
def hyde_retrieval(query: str, retriever, top_k: int = 5) -> list[str]:
    """
    HyDE(Hypothetical Document Embeddings)
    
    流程:
    1. LLM 生成假设性答案(不需要准确,只需语义相关)
    2. 用假设答案的向量检索真实文档
    3. 用真实文档生成最终答案
    
    优势:
    - 解决"问题-文档"语义鸿沟问题
    - 对短查询效果提升显著(30%+)
    
    劣势:
    - 增加一次 LLM 调用(延迟 +500ms)
    - 假设答案方向错误时可能误导检索
    """
    # Step 1: 生成假设性答案
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "请直接回答以下问题。不需要完全准确,但要包含相关的关键术语和概念。"
            },
            {"role": "user", "content": query}
        ],
        temperature=0.7,
        max_tokens=300
    )
    hypothetical_answer = response.choices[0].message.content

    print(f"假设答案:{hypothetical_answer[:150]}...")

    # Step 2: 用假设答案检索(而非原始查询)
    docs_from_hyde = retriever.search(hypothetical_answer, top_k=top_k)

    # Step 3: 同时用原始查询检索(双路召回)
    docs_from_query = retriever.search(query, top_k=top_k)

    # Step 4: 合并去重
    all_docs = list(dict.fromkeys(docs_from_hyde + docs_from_query))  # 保序去重

    return all_docs[:top_k]


# 进阶:多假设 HyDE(生成多个假设答案,提升召回多样性)
def multi_hyde_retrieval(query: str, retriever, num_hypotheses: int = 3, top_k: int = 5) -> list[str]:
    """生成多个假设答案,分别检索后融合"""
    all_docs = []

    for i in range(num_hypotheses):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": f"请从第 {i+1} 个不同角度回答以下问题,包含相关术语。"},
                {"role": "user", "content": query}
            ],
            temperature=0.9  # 高温度增加多样性
        )
        hypothesis = response.choices[0].message.content
        docs = retriever.search(hypothesis, top_k=3)
        all_docs.extend(docs)

    # 去重并返回
    return list(dict.fromkeys(all_docs))[:top_k]

4. 查询路由(Query Routing)

根据查询类型自动选择不同的检索策略或知识库。

python
from enum import Enum
from pydantic import BaseModel


class QueryType(str, Enum):
    FACTUAL = "factual"           # 事实查询 → 精确检索
    ANALYTICAL = "analytical"     # 分析比较 → 多路检索 + 长上下文
    PROCEDURAL = "procedural"     # 操作步骤 → 文档检索
    CONVERSATIONAL = "conversational"  # 闲聊 → 直接 LLM 回答
    CODE = "code"                 # 代码相关 → 代码库检索
    REALTIME = "realtime"         # 实时信息 → 联网搜索


class QueryRoute(BaseModel):
    query_type: QueryType
    confidence: float
    suggested_sources: list[str]
    needs_retrieval: bool


def route_query(query: str) -> QueryRoute:
    """
    查询路由:分类查询类型,决定检索策略
    """
    prompt = f"""分析以下查询的类型,返回 JSON 格式:

查询:{query}

类型定义:
- factual: 事实性问题(有明确答案),如"Python 3.12 发布日期"
- analytical: 分析比较类,如"React vs Vue 的优缺点"
- procedural: 操作步骤类,如"如何部署 Docker 容器"
- conversational: 闲聊/问候,如"你好"、"谢谢"
- code: 代码生成/调试,如"写一个排序算法"
- realtime: 需要实时信息,如"今天的天气"、"最新的股价"

输出格式:
{{
  "query_type": "类型",
  "confidence": 0.95,
  "suggested_sources": ["knowledge_base", "code_repo", "web_search"],
  "needs_retrieval": true
}}
"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        response_format={"type": "json_object"}
    )

    import json
    data = json.loads(response.choices[0].message.content)
    return QueryRoute(**data)


class RAGRouter:
    """
    RAG 路由器:根据查询类型分发到不同处理管道
    """

    def __init__(self, retrievers: dict, llm_client):
        self.retrievers = retrievers  # {"knowledge_base": ..., "code_repo": ..., ...}
        self.client = llm_client

    def process(self, query: str) -> dict:
        route = route_query(query)
        print(f"路由结果:{route.query_type}(置信度 {route.confidence:.0%})")

        if not route.needs_retrieval:
            # 不需要检索,直接 LLM 回答
            return self._direct_answer(query)

        if route.query_type == QueryType.FACTUAL:
            return self._factual_pipeline(query)
        elif route.query_type == QueryType.ANALYTICAL:
            return self._analytical_pipeline(query)
        elif route.query_type == QueryType.PROCEDURAL:
            return self._procedural_pipeline(query)
        elif route.query_type == QueryType.CODE:
            return self._code_pipeline(query)
        elif route.query_type == QueryType.REALTIME:
            return self._realtime_pipeline(query)
        else:
            return self._direct_answer(query)

    def _factual_pipeline(self, query: str) -> dict:
        """事实查询:精确检索 + 低温度生成"""
        docs = self.retrievers["knowledge_base"].search(query, top_k=3)
        answer = self._generate(query, docs, temperature=0)
        return {"answer": answer, "sources": docs, "pipeline": "factual"}

    def _analytical_pipeline(self, query: str) -> dict:
        """分析比较:多路检索 + 查询分解"""
        decomposed = decompose_query(query)
        all_docs = []
        for sq in decomposed["sub_questions"]:
            docs = self.retrievers["knowledge_base"].search(sq["question"], top_k=3)
            all_docs.extend(docs)
        all_docs = list(dict.fromkeys(all_docs))[:8]  # 去重,保留更多上下文
        answer = self._generate(query, all_docs, temperature=0.3)
        return {"answer": answer, "sources": all_docs, "pipeline": "analytical"}

    def _procedural_pipeline(self, query: str) -> dict:
        """操作步骤:检索 + 步骤化输出"""
        docs = self.retrievers["knowledge_base"].search(query, top_k=5)
        answer = self._generate(
            query, docs, temperature=0.3,
            system_extra="请以清晰的步骤格式回答,使用编号列表。"
        )
        return {"answer": answer, "sources": docs, "pipeline": "procedural"}

    def _code_pipeline(self, query: str) -> dict:
        """代码查询:代码库检索 + 代码生成"""
        docs = self.retrievers.get("code_repo", self.retrievers["knowledge_base"]).search(query, top_k=5)
        answer = self._generate(
            query, docs, temperature=0.2,
            system_extra="请提供完整可运行的代码示例,包含注释和错误处理。"
        )
        return {"answer": answer, "sources": docs, "pipeline": "code"}

    def _realtime_pipeline(self, query: str) -> dict:
        """实时信息:联网搜索(此处为示意)"""
        # 实际应用中接入搜索 API(如 Tavily、SerpAPI)
        return {
            "answer": "此问题需要实时信息,建议接入搜索 API。",
            "sources": [],
            "pipeline": "realtime"
        }

    def _direct_answer(self, query: str) -> dict:
        """直接回答,不检索"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": query}],
            temperature=0.7
        )
        return {
            "answer": response.choices[0].message.content,
            "sources": [],
            "pipeline": "direct"
        }

    def _generate(self, query: str, docs: list, temperature: float = 0.3,
                  system_extra: str = "") -> str:
        context = "\n\n---\n\n".join(docs)
        system_msg = f"根据以下参考资料回答问题。如果资料不足,请明确说明。{system_extra}"
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_msg},
                {"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
            ],
            temperature=temperature
        )
        return response.choices[0].message.content

4.4.2 检索中优化(Retrieval Enhancement)

语义检索(向量相似度)与关键词检索(BM25)结合,兼顾语义理解和精确匹配。

python
# pip install rank_bm25 numpy

import numpy as np
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import jieba


class HybridRetriever:
    """
    混合检索器:语义检索 + BM25 关键词检索
    
    优势:
    - 语义检索擅长:同义词、换一种说法的查询
    - BM25 擅长:精确术语匹配、专有名词、代码片段
    - 两者互补,综合效果 > 单一方法
    """

    def __init__(
        self,
        embedding_model: str = "BAAI/bge-large-zh-v1.5",
        semantic_weight: float = 0.6,
        bm25_weight: float = 0.4
    ):
        self.embed_model = SentenceTransformer(embedding_model)
        self.semantic_weight = semantic_weight
        self.bm25_weight = bm25_weight
        self.documents: list[str] = []
        self.doc_embeddings: np.ndarray | None = None
        self.bm25: BM25Okapi | None = None

    def index(self, documents: list[str]):
        """建立索引"""
        self.documents = documents

        # 语义索引
        print("构建语义索引...")
        self.doc_embeddings = self.embed_model.encode(documents, show_progress_bar=True)

        # BM25 索引(中文需要分词)
        print("构建 BM25 索引...")
        tokenized_docs = [list(jieba.cut(doc)) for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

        print(f"索引完成:{len(documents)} 个文档")

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """
        混合检索
        
        Returns:
            [{"text": "...", "score": 0.85, "semantic_score": 0.9, "bm25_score": 0.7, "rank": 1}, ...]
        """
        # 1. 语义检索
        query_embedding = self.embed_model.encode([query])
        semantic_scores = cosine_similarity(query_embedding, self.doc_embeddings)[0]

        # 2. BM25 检索
        tokenized_query = list(jieba.cut(query))
        bm25_scores = self.bm25.get_scores(tokenized_query)

        # 3. 归一化分数到 [0, 1]
        def normalize(scores):
            min_s, max_s = scores.min(), scores.max()
            if max_s == min_s:
                return np.zeros_like(scores)
            return (scores - min_s) / (max_s - min_s)

        semantic_norm = normalize(semantic_scores)
        bm25_norm = normalize(bm25_scores)

        # 4. 加权融合
        combined_scores = (
            self.semantic_weight * semantic_norm +
            self.bm25_weight * bm25_norm
        )

        # 5. 排序并返回 Top-K
        top_indices = np.argsort(combined_scores)[::-1][:top_k]

        results = []
        for rank, idx in enumerate(top_indices, 1):
            results.append({
                "text": self.documents[idx],
                "score": float(combined_scores[idx]),
                "semantic_score": float(semantic_scores[idx]),
                "bm25_score": float(bm25_scores[idx]),
                "rank": rank,
                "index": int(idx)
            })

        return results


# 使用示例
retriever = HybridRetriever(semantic_weight=0.6, bm25_weight=0.4)

documents = [
    "FastAPI 是一个现代、快速的 Python Web 框架,基于 Starlette 和 Pydantic。",
    "Django 是 Python 最流行的全栈 Web 框架,内置 ORM、Admin 和模板引擎。",
    "Flask 是一个轻量级的 Python Web 微框架,适合小型项目和 API 开发。",
    "SQLAlchemy 是 Python 最强大的 ORM 库,支持多种数据库后端。",
    "使用 pip install fastapi 安装 FastAPI,然后用 uvicorn 启动服务。",
    "ASGI 是异步服务器网关接口,FastAPI 和 Starlette 都基于 ASGI 标准。",
]

retriever.index(documents)

results = retriever.search("如何安装和使用 FastAPI", top_k=3)
for r in results:
    print(f"[排名 {r['rank']}] 综合: {r['score']:.3f} | "
          f"语义: {r['semantic_score']:.3f} | BM25: {r['bm25_score']:.3f}")
    print(f"  {r['text'][:80]}...\n")

2. 父子文档检索(Parent-Child Retrieval)

核心思想:用小块(Child)做精确检索,返回大块(Parent)做上下文。解决"分块太小丢失上下文,分块太大检索不精确"的矛盾。

python
import uuid
from dataclasses import dataclass, field


@dataclass
class DocumentChunk:
    chunk_id: str
    parent_id: str | None
    text: str
    level: str  # "parent" 或 "child"
    metadata: dict = field(default_factory=dict)


class ParentChildRetriever:
    """
    父子文档检索器
    
    架构:
    - Parent(大块,800-1500字符):保留完整上下文,用于生成
    - Child(小块,200-400字符):精确语义匹配,用于检索
    
    流程:
    1. 文档 → 切分为 Parent 块
    2. 每个 Parent → 进一步切分为多个 Child 块
    3. 检索时:用 Child 块做向量检索
    4. 返回时:返回匹配 Child 对应的 Parent 块
    """

    def __init__(self, embedding_model, vector_store):
        self.embed_model = embedding_model
        self.vector_store = vector_store
        self.parent_store: dict[str, DocumentChunk] = {}  # parent_id → chunk
        self.child_to_parent: dict[str, str] = {}  # child_id → parent_id

    def index_document(
        self,
        text: str,
        parent_chunk_size: int = 1000,
        parent_overlap: int = 100,
        child_chunk_size: int = 300,
        child_overlap: int = 50,
        metadata: dict = None
    ):
        """建立父子索引"""
        metadata = metadata or {}

        # Step 1: 切分为 Parent 块
        parent_chunks = self._split_text(text, parent_chunk_size, parent_overlap)

        for parent_text in parent_chunks:
            parent_id = str(uuid.uuid4())
            parent_chunk = DocumentChunk(
                chunk_id=parent_id,
                parent_id=None,
                text=parent_text,
                level="parent",
                metadata=metadata
            )
            self.parent_store[parent_id] = parent_chunk

            # Step 2: 每个 Parent 切分为 Child 块
            child_chunks = self._split_text(parent_text, child_chunk_size, child_overlap)

            for child_text in child_chunks:
                child_id = str(uuid.uuid4())
                self.child_to_parent[child_id] = parent_id

                # Step 3: 只将 Child 块存入向量数据库
                child_embedding = self.embed_model.encode(child_text)
                self.vector_store.add(
                    ids=[child_id],
                    embeddings=[child_embedding.tolist()],
                    documents=[child_text],
                    metadatas=[{"parent_id": parent_id, **metadata}]
                )

        print(f"索引完成:{len(parent_chunks)} 个 Parent 块")

    def search(self, query: str, top_k: int = 3) -> list[dict]:
        """
        检索:用 Child 匹配,返回 Parent 上下文
        """
        query_embedding = self.embed_model.encode(query)

        # 检索 Child 块(多检索一些,因为多个 Child 可能属于同一 Parent)
        child_results = self.vector_store.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=top_k * 3
        )

        # 去重:同一 Parent 只返回一次
        seen_parents = set()
        results = []

        for i, child_id in enumerate(child_results["ids"][0]):
            parent_id = child_results["metadatas"][0][i].get("parent_id")
            if parent_id and parent_id not in seen_parents:
                seen_parents.add(parent_id)
                parent_chunk = self.parent_store.get(parent_id)
                if parent_chunk:
                    results.append({
                        "parent_text": parent_chunk.text,
                        "matched_child": child_results["documents"][0][i],
                        "distance": child_results["distances"][0][i] if "distances" in child_results else None,
                        "metadata": parent_chunk.metadata
                    })

                if len(results) >= top_k:
                    break

        return results

    @staticmethod
    def _split_text(text: str, chunk_size: int, overlap: int) -> list[str]:
        """简单的固定大小分块"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            start = end - overlap
        return chunks


# 使用示例
from sentence_transformers import SentenceTransformer
import chromadb

embed_model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
chroma_client = chromadb.Client()
collection = chroma_client.create_collection("parent_child_demo")

retriever = ParentChildRetriever(embed_model, collection)

long_document = """
# FastAPI 完整教程

## 第一章:安装与配置

FastAPI 是一个现代、高性能的 Python Web 框架。它基于 Python 3.7+ 的类型提示,
使用 Starlette 作为 Web 框架核心,Pydantic 作为数据验证库。

安装 FastAPI 非常简单,只需要运行 pip install fastapi uvicorn 即可。
uvicorn 是推荐的 ASGI 服务器,用于运行 FastAPI 应用。

创建第一个应用只需要几行代码:
from fastapi import FastAPI
app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}

使用 uvicorn main:app --reload 启动开发服务器。

## 第二章:路由与请求处理

FastAPI 支持所有标准的 HTTP 方法:GET、POST、PUT、DELETE、PATCH。
路由定义使用装饰器语法,非常直观。

路径参数通过函数参数自动解析,类型会自动验证。
查询参数也是通过函数参数定义,FastAPI 会自动区分路径参数和查询参数。

请求体使用 Pydantic 模型定义,自动完成数据验证和序列化。
这是 FastAPI 最强大的特性之一,可以大幅减少手动验证代码。
"""

retriever.index_document(
    long_document,
    parent_chunk_size=500,
    child_chunk_size=150,
    metadata={"source": "fastapi_tutorial.md"}
)

results = retriever.search("如何安装 FastAPI", top_k=2)
for r in results:
    print(f"匹配的 Child:{r['matched_child'][:80]}...")
    print(f"返回的 Parent(完整上下文):{r['parent_text'][:200]}...")
    print()

3. 多路召回与 Reciprocal Rank Fusion(RRF)

从多个检索源获取结果,使用 RRF 算法融合排名。

python
def reciprocal_rank_fusion(
    result_lists: list[list[dict]],
    k: int = 60,
    top_n: int = 10
) -> list[dict]:
    """
    Reciprocal Rank Fusion(RRF)
    
    将多个排名列表融合为一个统一排名。
    
    公式:RRF_score(d) = Σ 1 / (k + rank_i(d))
    
    Args:
        result_lists: 多个检索结果列表,每个元素包含 "id" 和 "text"
        k: 平滑参数(默认 60,论文推荐值)
        top_n: 返回前 N 个结果
    
    Returns:
        融合后的排序结果
    """
    rrf_scores: dict[str, float] = {}
    doc_map: dict[str, dict] = {}

    for result_list in result_lists:
        for rank, doc in enumerate(result_list, start=1):
            doc_id = doc.get("id") or doc.get("text")[:50]  # 用文本前50字符作为 ID
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1.0 / (k + rank)
            doc_map[doc_id] = doc

    # 按 RRF 分数排序
    sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)

    results = []
    for doc_id in sorted_ids[:top_n]:
        doc = doc_map[doc_id]
        doc["rrf_score"] = rrf_scores[doc_id]
        results.append(doc)

    return results


# 使用示例:融合语义检索、BM25、HyDE 三路结果
def multi_route_retrieval(query: str, retriever, top_k: int = 5) -> list[dict]:
    """多路召回 + RRF 融合"""

    # 路线 1:原始查询语义检索
    route1 = retriever.semantic_search(query, top_k=10)

    # 路线 2:BM25 关键词检索
    route2 = retriever.bm25_search(query, top_k=10)

    # 路线 3:查询改写后检索
    rewritten = rewrite_query(query)
    route3 = []
    for rq in rewritten[1:]:  # 跳过原始查询
        route3.extend(retriever.semantic_search(rq, top_k=5))

    # RRF 融合
    fused = reciprocal_rank_fusion([route1, route2, route3], top_n=top_k)

    return fused

4.4.3 检索后优化(Post-Retrieval)

1. 重排序(Reranking)

使用专门的 Cross-Encoder 模型对检索结果重新打分排序,显著提升精度。

python
# pip install sentence-transformers

from sentence_transformers import CrossEncoder


class Reranker:
    """
    重排序器:使用 Cross-Encoder 对检索结果精排
    
    原理:
    - Bi-Encoder(Embedding):分别编码 query 和 doc,速度快但精度有限
    - Cross-Encoder(Reranker):同时编码 query+doc,精度高但速度慢
    
    最佳实践:
    - 先用 Bi-Encoder 粗排(Top-20~50)
    - 再用 Cross-Encoder 精排(Top-3~5)
    """

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        """
        推荐模型:
        - BAAI/bge-reranker-v2-m3:多语言,效果最好
        - BAAI/bge-reranker-large:中文优化
        - cross-encoder/ms-marco-MiniLM-L-12-v2:英文,速度快
        """
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: list[str],
        top_k: int = 5,
        score_threshold: float = 0.0
    ) -> list[dict]:
        """
        重排序
        
        Args:
            query: 用户查询
            documents: 待排序的文档列表
            top_k: 返回前 K 个
            score_threshold: 最低分数阈值(过滤低相关文档)
        
        Returns:
            [{"text": "...", "score": 0.95, "original_rank": 3}, ...]
        """
        # 构建 query-doc 对
        pairs = [(query, doc) for doc in documents]

        # Cross-Encoder 打分
        scores = self.model.predict(pairs)

        # 排序
        ranked = sorted(
            [
                {"text": doc, "score": float(score), "original_rank": i + 1}
                for i, (doc, score) in enumerate(zip(documents, scores))
            ],
            key=lambda x: x["score"],
            reverse=True
        )

        # 过滤低分 + 截断
        ranked = [r for r in ranked if r["score"] >= score_threshold]

        return ranked[:top_k]


# 使用示例
reranker = Reranker("BAAI/bge-reranker-v2-m3")

query = "FastAPI 如何实现流式响应"
# 假设粗排返回了 10 个文档
coarse_results = [
    "FastAPI 支持 StreamingResponse,可以实现流式输出。",
    "Django 是一个全栈 Web 框架。",
    "使用 SSE(Server-Sent Events)可以实现服务端推送。",
    "FastAPI 的依赖注入系统非常强大。",
    "流式响应在 AI 应用中非常重要,可以实现打字效果。",
    "Flask 也可以通过 Response 生成器实现流式输出。",
    "Pydantic 用于数据验证和序列化。",
    "ASGI 服务器支持异步流式处理。",
    "uvicorn 是推荐的 ASGI 服务器。",
    "WebSocket 也可以用于实时双向通信。",
]

reranked = reranker.rerank(query, coarse_results, top_k=5, score_threshold=0.1)

print(f"查询:{query}\n")
for r in reranked:
    print(f"  [分数 {r['score']:.3f}] (原排名 {r['original_rank']}) {r['text']}")

使用 Cohere Rerank API(云端方案):

python
# pip install cohere

import cohere

co = cohere.Client("your-cohere-api-key")

def cohere_rerank(query: str, documents: list[str], top_k: int = 5) -> list[dict]:
    """使用 Cohere Rerank API(无需本地 GPU)"""
    response = co.rerank(
        model="rerank-multilingual-v3.0",  # 多语言模型
        query=query,
        documents=documents,
        top_n=top_k,
        return_documents=True
    )

    return [
        {
            "text": r.document.text,
            "score": r.relevance_score,
            "original_rank": r.index + 1
        }
        for r in response.results
    ]

2. 上下文压缩(Context Compression)

检索到的文档可能包含大量无关信息。上下文压缩只保留与查询相关的部分,减少 token 消耗并提升答案质量。

python
def compress_context(
    query: str,
    documents: list[str],
    max_total_tokens: int = 2000
) -> list[str]:
    """
    上下文压缩:从检索文档中提取与查询最相关的段落
    
    策略:
    1. 对每个文档,提取与查询相关的关键句子
    2. 控制总 token 数不超过预算
    3. 保留文档来源标记(便于引用溯源)
    """
    compressed_docs = []

    for i, doc in enumerate(documents):
        prompt = f"""从以下文档中提取与查询最相关的内容。

要求:
1. 只保留直接相关的句子和段落
2. 保持原文表述,不要改写
3. 如果整个文档都不相关,返回 "NOT_RELEVANT"
4. 压缩后的内容不超过原文的 50%

查询:{query}

文档 {i + 1}
{doc}

提取的相关内容:
"""

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=500
        )

        compressed = response.choices[0].message.content.strip()
        if compressed != "NOT_RELEVANT" and len(compressed) > 20:
            compressed_docs.append(f"[来源 {i + 1}] {compressed}")

    return compressed_docs


# 轻量级方案:基于句子相似度的压缩(无需 LLM 调用)
def compress_by_similarity(
    query: str,
    document: str,
    embed_model,
    top_sentences: int = 5,
    min_similarity: float = 0.3
) -> str:
    """基于句子级语义相似度的压缩(不依赖 LLM,速度快)"""
    import re
    sentences = re.split(r'[。!?\n]+', document)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 10]

    if not sentences:
        return document

    query_emb = embed_model.encode([query])
    sent_embs = embed_model.encode(sentences)
    similarities = cosine_similarity(query_emb, sent_embs)[0]

    # 按相似度排序,保留 top_sentences 个
    ranked = sorted(
        zip(sentences, similarities, range(len(sentences))),
        key=lambda x: x[1],
        reverse=True
    )

    # 过滤低相似度 + 截断
    selected = [
        (sent, sim, idx) for sent, sim, idx in ranked
        if sim >= min_similarity
    ][:top_sentences]

    # 按原文顺序排列(保持连贯性)
    selected.sort(key=lambda x: x[2])

    return "。".join([s[0] for s in selected]) + "。"

3. 引用溯源(Citation Tracking)

让 LLM 在回答中标注信息来源,提升可信度并方便用户验证。

python
def generate_with_citations(
    query: str,
    documents: list[dict],  # [{"text": "...", "source": "文件名", "page": 3}, ...]
) -> dict:
    """
    带引用的回答生成
    
    Returns:
        {
            "answer": "根据资料[1],FastAPI 支持流式响应...[2]",
            "citations": [
                {"id": 1, "source": "fastapi_docs.pdf", "page": 15, "excerpt": "..."},
                {"id": 2, "source": "tutorial.md", "excerpt": "..."}
            ]
        }
    """
    # 构建带编号的上下文
    numbered_context = ""
    for i, doc in enumerate(documents, 1):
        source_info = doc.get("source", "未知来源")
        page_info = f",第{doc['page']}页" if doc.get("page") else ""
        numbered_context += f"\n[{i}] ({source_info}{page_info})\n{doc['text']}\n"

    prompt = f"""根据以下编号参考资料回答问题。

要求:
1. 在回答中使用 [编号] 标注信息来源,如 [1]、[2]
2. 每个关键事实都必须标注来源
3. 如果多个来源支持同一观点,标注所有来源,如 [1][3]
4. 如果参考资料不足以回答,明确说明并标注哪些部分缺乏依据

参考资料:
{numbered_context}

问题:{query}

带引用的回答:
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3
    )

    answer = response.choices[0].message.content

    # 提取引用编号
    import re
    cited_ids = set(int(m) for m in re.findall(r'\[(\d+)\]', answer))

    citations = [
        {
            "id": i,
            "source": doc.get("source", "未知"),
            "page": doc.get("page"),
            "excerpt": doc["text"][:200] + "..."
        }
        for i, doc in enumerate(documents, 1)
        if i in cited_ids
    ]

    return {
        "answer": answer,
        "citations": citations,
        "uncited_sources": [i for i in range(1, len(documents) + 1) if i not in cited_ids]
    }

4.4.4 高级 RAG 架构

1. Corrective RAG(自我纠错 RAG)

检索后评估文档相关性,不相关时自动触发补充检索或联网搜索。

python
class CorrectiveRAG:
    """
    Corrective RAG(CRAG)
    
    流程:
    1. 检索文档
    2. 评估每个文档的相关性(Correct / Ambiguous / Incorrect)
    3. 根据评估结果决定下一步:
       - 全部 Correct → 直接生成
       - 部分 Ambiguous → 知识精炼后生成
       - 全部 Incorrect → 触发 Web 搜索补充
    
    论文:Corrective Retrieval Augmented Generation (2024)
    """

    def __init__(self, retriever, reranker=None):
        self.retriever = retriever
        self.reranker = reranker

    def evaluate_relevance(self, query: str, document: str) -> dict:
        """评估单个文档与查询的相关性"""
        prompt = f"""评估以下文档与查询的相关性。

查询:{query}

文档:{document}

评估标准:
- correct:文档直接包含回答查询所需的信息
- ambiguous:文档部分相关,可能有用但不确定
- incorrect:文档与查询完全无关

以 JSON 格式输出:
&#123;&#123;"relevance": "correct/ambiguous/incorrect", "confidence": 0.95, "reason": "简要说明"&#125;&#125;
"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            response_format={"type": "json_object"}
        )

        import json
        return json.loads(response.choices[0].message.content)

    def refine_knowledge(self, query: str, document: str) -> str:
        """知识精炼:从 Ambiguous 文档中提取有用信息"""
        prompt = f"""从以下文档中提取与查询相关的信息片段。
只保留直接有用的部分,去除无关内容。如果没有任何有用信息,返回空字符串。

查询:{query}
文档:{document}

提取的相关信息:
"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return response.choices[0].message.content.strip()

    def web_search(self, query: str) -> list[str]:
        """
        联网搜索补充(需接入搜索 API)
        实际应用中可使用 Tavily、SerpAPI、Bing Search 等
        """
        # 示意实现
        print(f"  触发联网搜索:{query}")
        # results = tavily_client.search(query, max_results=3)
        # return [r["content"] for r in results["results"]]
        return [f"[联网搜索结果] 关于 '{query}' 的最新信息..."]

    def query(self, query: str, top_k: int = 5) -> dict:
        """CRAG 完整查询流程"""
        print(f"查询:{query}")

        # Step 1: 检索
        raw_docs = self.retriever.search(query, top_k=top_k)
        doc_texts = [d["text"] if isinstance(d, dict) else d for d in raw_docs]
        print(f"  检索到 {len(doc_texts)} 个文档")

        # Step 2: 评估每个文档
        evaluations = []
        for doc in doc_texts:
            eval_result = self.evaluate_relevance(query, doc)
            evaluations.append(eval_result)

        correct_docs = [doc for doc, ev in zip(doc_texts, evaluations) if ev["relevance"] == "correct"]
        ambiguous_docs = [doc for doc, ev in zip(doc_texts, evaluations) if ev["relevance"] == "ambiguous"]
        incorrect_count = sum(1 for ev in evaluations if ev["relevance"] == "incorrect")

        print(f"  评估结果:{len(correct_docs)} correct, {len(ambiguous_docs)} ambiguous, {incorrect_count} incorrect")

        # Step 3: 根据评估结果决定策略
        final_context = []
        action_taken = "direct"

        if correct_docs:
            # 有正确文档 → 直接使用
            final_context.extend(correct_docs)
            action_taken = "direct"

        if ambiguous_docs:
            # 模糊文档 → 精炼后使用
            for doc in ambiguous_docs:
                refined = self.refine_knowledge(query, doc)
                if refined:
                    final_context.append(refined)
            action_taken = "refined"

        if not final_context:
            # 全部不相关 → 联网搜索
            web_results = self.web_search(query)
            final_context.extend(web_results)
            action_taken = "web_search"

        # Step 4: 生成最终答案
        context = "\n\n---\n\n".join(final_context)
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "根据参考资料准确回答问题。如果信息不足,请明确说明。"},
                {"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
            ],
            temperature=0.3
        )

        return {
            "answer": response.choices[0].message.content,
            "action": action_taken,
            "num_correct": len(correct_docs),
            "num_ambiguous": len(ambiguous_docs),
            "num_incorrect": incorrect_count,
            "context_used": final_context
        }

2. Self-RAG(自适应检索生成)

模型自主决定是否需要检索检索结果是否有用生成的答案是否需要引用支持

python
class SelfRAG:
    """
    Self-RAG:让 LLM 自主控制检索行为
    
    核心 Token(论文定义的特殊标记):
    - [Retrieve]:是否需要检索(Yes/No)
    - [IsRel]:检索结果是否相关(Relevant/Irrelevant)
    - [IsSup]:生成内容是否有检索支持(Fully/Partially/No)
    - [IsUse]:最终答案是否有用(1-5分)
    
    简化实现:用 LLM 判断代替特殊 Token
    """

    def __init__(self, retriever):
        self.retriever = retriever

    def should_retrieve(self, query: str, partial_answer: str = "") -> bool:
        """判断是否需要检索"""
        prompt = f"""判断回答以下问题是否需要检索外部知识库。

问题:{query}
{f"已有部分回答:{partial_answer}" if partial_answer else ""}

判断标准:
- 需要检索:涉及具体事实、数据、最新信息、专业知识
- 不需要检索:通用常识、简单计算、创意写作、闲聊

只回答 "yes" 或 "no":
"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=5
        )
        return "yes" in response.choices[0].message.content.lower()

    def check_support(self, claim: str, evidence: str) -> dict:
        """检查生成的内容是否有证据支持"""
        prompt = f"""判断以下声明是否被证据支持。

声明:{claim}
证据:{evidence}

以 JSON 输出:
&#123;&#123;"support_level": "fully/partially/no", "explanation": "简要说明"&#125;&#125;
"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            response_format={"type": "json_object"}
        )
        import json
        return json.loads(response.choices[0].message.content)

    def query(self, query: str) -> dict:
        """Self-RAG 完整流程"""
        # Step 1: 判断是否需要检索
        needs_retrieval = self.should_retrieve(query)
        print(f"需要检索:{needs_retrieval}")

        if not needs_retrieval:
            # 直接生成
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": query}],
                temperature=0.7
            )
            return {
                "answer": response.choices[0].message.content,
                "retrieved": False,
                "sources": []
            }

        # Step 2: 检索
        docs = self.retriever.search(query, top_k=5)
        doc_texts = [d["text"] if isinstance(d, dict) else d for d in docs]

        # Step 3: 生成答案
        context = "\n\n".join(doc_texts)
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "根据参考资料回答问题,确保每个关键事实都有资料支持。"},
                {"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
            ],
            temperature=0.3
        )
        answer = response.choices[0].message.content

        # Step 4: 自我验证(检查答案是否有证据支持)
        support_check = self.check_support(answer, context)
        print(f"证据支持度:{support_check['support_level']}")

        if support_check["support_level"] == "no":
            # 答案缺乏支持 → 重新生成,更保守
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "严格根据参考资料回答。没有依据的内容请明确标注'资料未提及'。"},
                    {"role": "user", "content": f"参考资料:\n{context}\n\n问题:{query}"}
                ],
                temperature=0
            )
            answer = response.choices[0].message.content

        return {
            "answer": answer,
            "retrieved": True,
            "sources": doc_texts,
            "support_level": support_check["support_level"]
        }

3. Graph RAG(知识图谱增强 RAG)

将文档构建为知识图谱,利用图结构进行多跳推理和关系发现。

python
# pip install networkx

import networkx as nx
from dataclasses import dataclass


@dataclass
class Entity:
    name: str
    type: str  # "person", "organization", "technology", "concept"
    description: str = ""


@dataclass
class Relation:
    source: str
    target: str
    relation_type: str  # "uses", "belongs_to", "created_by", "compared_with"
    description: str = ""


class GraphRAG:
    """
    Graph RAG:知识图谱增强的 RAG
    
    优势(相比纯向量检索):
    1. 多跳推理:"A 的创始人的母校是?" → A→创始人→母校
    2. 关系发现:"哪些技术与 FastAPI 相关?" → 图遍历
    3. 全局摘要:

坚持是一种品格