学习时长:3-4 周
文档处理是 RAG 系统的数据准备阶段,直接决定了检索质量。正确的文档解析和分块策略能够提升检索准确率 30% 以上。
目录
4.3.1 文档解析(多格式支持)
主流文档格式解析库对比
| 库 | 支持格式 | 易用性 | 质量 | 推荐场景 |
|---|---|---|---|---|
| PyPDF2 | ⭐⭐⭐ | ⭐⭐ | 简单 PDF,无复杂布局 | |
| pdfplumber | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 表格提取,布局分析 | |
| PyMuPDF (fitz) | PDF, XPS, EPUB | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 高质量 PDF 解析(推荐) |
| python-docx | Word (.docx) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Word 文档解析 |
| python-pptx | PowerPoint | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | PPT 演示文稿 |
| beautifulsoup4 | HTML | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 网页抓取 |
| Unstructured | 20+ 格式 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 一站式解决方案 |
1. PDF 解析(PyMuPDF 推荐)
python
import fitz # PyMuPDF
def extract_pdf_text(pdf_path: str) -> str:
"""
提取 PDF 文本内容
Args:
pdf_path: PDF 文件路径
Returns:
提取的文本内容
"""
doc = fitz.open(pdf_path)
full_text = []
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text()
# 添加页码标记(方便后续追溯来源)
full_text.append(f"## 第 {page_num + 1} 页\n\n{text}")
doc.close()
return "\n\n".join(full_text)
# 使用
pdf_text = extract_pdf_text("technical_manual.pdf")
print(f"提取文本长度:{len(pdf_text)} 字符")提取 PDF 表格(pdfplumber):
python
import pdfplumber
def extract_pdf_tables(pdf_path: str):
"""提取 PDF 中的表格"""
tables = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
page_tables = page.extract_tables()
for table in page_tables:
# 转换为 Markdown 格式
if table:
headers = table[0]
rows = table[1:]
md_table = "| " + " | ".join(headers) + " |\n"
md_table += "| " + " | ".join(["---"] * len(headers)) + " |\n"
for row in rows:
md_table += "| " + " | ".join(row) + " |\n"
tables.append(md_table)
return tables
# 使用
tables = extract_pdf_tables("report.pdf")
for i, table in enumerate(tables):
print(f"表格 {i+1}:\n{table}\n")带图片描述的 PDF 解析(OCR):
python
import fitz
from PIL import Image
import io
def extract_pdf_with_images(pdf_path: str):
"""提取 PDF 文本 + 图片(可选 OCR)"""
doc = fitz.open(pdf_path)
content = []
for page_num in range(len(doc)):
page = doc[page_num]
# 提取文本
text = page.get_text()
content.append(f"## 第 {page_num + 1} 页\n\n{text}")
# 提取图片
images = page.get_images()
for img_index, img in enumerate(images):
xref = img[0]
base_image = doc.extract_image(xref)
image_bytes = base_image["image"]
# 保存图片(可选)
img_path = f"page{page_num+1}_img{img_index+1}.png"
with open(img_path, "wb") as f:
f.write(image_bytes)
# 添加图片占位符(后续可用 Vision API 生成描述)
content.append(f"\n[图片: {img_path}]\n")
doc.close()
return "\n".join(content)2. Word 文档解析
python
from docx import Document
def extract_word_text(docx_path: str) -> str:
"""
提取 Word 文档内容,保留结构
支持:段落、标题、列表、表格
"""
doc = Document(docx_path)
content = []
for element in doc.element.body:
# 段落
if element.tag.endswith('p'):
para = next((p for p in doc.paragraphs if p._element == element), None)
if para:
# 检测标题级别
if para.style.name.startswith('Heading'):
level = int(para.style.name.split()[-1])
content.append(f"{'#' * level} {para.text}")
else:
content.append(para.text)
# 表格
elif element.tag.endswith('tbl'):
table = next((t for t in doc.tables if t._element == element), None)
if table:
md_table = []
for i, row in enumerate(table.rows):
cells = [cell.text.strip() for cell in row.cells]
md_table.append("| " + " | ".join(cells) + " |")
# 添加表头分隔线
if i == 0:
md_table.append("| " + " | ".join(["---"] * len(cells)) + " |")
content.append("\n".join(md_table))
return "\n\n".join(content)
# 使用
word_text = extract_word_text("company_policy.docx")
print(word_text[:500])提取 Word 元数据:
python
from docx import Document
def extract_word_metadata(docx_path: str) -> dict:
"""提取 Word 文档元数据"""
doc = Document(docx_path)
core_props = doc.core_properties
metadata = {
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"created": core_props.created,
"modified": core_props.modified,
"keywords": core_props.keywords
}
return metadata
metadata = extract_word_metadata("report.docx")
print(f"文档标题:{metadata['title']}")
print(f"作者:{metadata['author']}")3. 网页抓取(BeautifulSoup)
python
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
def extract_webpage_text(url: str) -> dict:
"""
提取网页主要内容
Returns:
{"title": "标题", "text": "正文", "links": ["链接1", "链接2"]}
"""
response = requests.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
})
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
# 移除脚本和样式
for script in soup(["script", "style", "nav", "footer"]):
script.decompose()
# 提取标题
title = soup.find('title').get_text() if soup.find('title') else ""
# 提取正文(启发式方法:找最长的文本块)
main_content = soup.find('main') or soup.find('article') or soup.find('body')
text = main_content.get_text(separator='\n', strip=True)
# 提取链接
links = []
for link in soup.find_all('a', href=True):
full_url = urljoin(url, link['href'])
links.append(full_url)
return {
"title": title,
"text": text,
"links": links[:10] # 只保留前10个链接
}
# 使用
webpage = extract_webpage_text("https://example.com/article")
print(f"标题:{webpage['title']}")
print(f"正文长度:{len(webpage['text'])} 字符")4. 统一文档加载器(Unstructured)
python
from unstructured.partition.auto import partition
def extract_any_document(file_path: str) -> str:
"""
自动识别并解析任何格式的文档
支持:PDF, Word, PPT, HTML, Markdown, 图片(OCR), Excel 等
"""
elements = partition(filename=file_path)
# 将所有元素转换为文本
text = "\n\n".join([str(el) for el in elements])
return text
# 使用
text = extract_any_document("unknown_format.pdf")
print(text[:500])4.3.2 文本分块策略(Chunking)
分块策略直接影响检索质量,过小会丢失上下文,过大会降低检索精度。
分块策略对比
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 固定大小 | 简单、快速 | 可能截断语义 | 结构化文档、代码 |
| 按句子分割 | 保留完整语义 | 长度不均匀 | 新闻、文章 |
| 递归分块 | 平衡长度与语义 | 计算复杂 | 通用场景(推荐) |
| 语义分块 | 最优语义完整性 | 需要模型计算 | 高质量要求 |
| 按章节标题 | 结构清晰 | 依赖文档格式 | 技术文档、书籍 |
1. 固定大小分块(简单粗暴)
python
def chunk_by_size(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
"""
按固定大小分块,带重叠
Args:
text: 待分块文本
chunk_size: 每块字符数
overlap: 重叠字符数(保留上下文)
Returns:
文本块列表
"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
chunks.append(chunk)
start = end - overlap # 重叠部分
return chunks
# 使用
text = "这是一个很长的文档..." * 100
chunks = chunk_by_size(text, chunk_size=500, overlap=50)
print(f"分成 {len(chunks)} 个块")2. 递归分块(推荐)
python
from langchain.text_splitter import RecursiveCharacterTextSplitter
def chunk_by_recursive(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> list[str]:
"""
递归分块:优先按段落,再按句子,最后按字符
分隔符优先级:\n\n → \n → 。 → , → 空格
"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?", ",", " ", ""],
length_function=len
)
chunks = splitter.split_text(text)
return chunks
# 使用
text = """
# 深度学习简介
深度学习是机器学习的一个分支,它使用多层神经网络来学习数据的表示。
## 核心概念
神经网络由多个层组成,每层包含多个神经元。通过反向传播算法训练网络。
## 应用场景
深度学习广泛应用于图像识别、自然语言处理、语音识别等领域。
"""
chunks = chunk_by_recursive(text, chunk_size=100, chunk_overlap=20)
for i, chunk in enumerate(chunks):
print(f"Chunk {i+1}:{chunk}\n")3. 语义分块(最优质量)
python
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
def chunk_by_semantic(text: str, similarity_threshold: float = 0.5) -> list[str]:
"""
语义分块:基于句子语义相似度分组
Args:
text: 待分块文本
similarity_threshold: 相似度阈值(低于此值则分块)
Returns:
语义连贯的文本块
"""
# 按句子分割
sentences = [s.strip() for s in text.split('。') if s.strip()]
# 生成句子向量
model = SentenceTransformer('BAAI/bge-small-zh-v1.5')
embeddings = model.encode(sentences)
# 计算相邻句子的相似度
chunks = []
current_chunk = [sentences[0]]
for i in range(1, len(sentences)):
similarity = cosine_similarity(
[embeddings[i-1]],
[embeddings[i]]
)[0][0]
if similarity >= similarity_threshold:
# 语义相似,合并到当前块
current_chunk.append(sentences[i])
else:
# 语义差异大,开始新块
chunks.append('。'.join(current_chunk) + '。')
current_chunk = [sentences[i]]
# 添加最后一块
if current_chunk:
chunks.append('。'.join(current_chunk) + '。')
return chunks
# 使用
text = """
人工智能发展迅速。深度学习是其核心技术。神经网络模拟人脑结构。
今天天气很好。阳光明媚适合出游。我们去公园散步吧。
Python是流行的编程语言。它广泛用于AI开发。TensorFlow和PyTorch是常用框架。
"""
chunks = chunk_by_semantic(text, similarity_threshold=0.6)
for i, chunk in enumerate(chunks):
print(f"语义块 {i+1}:{chunk}\n")4. 按标题分块(结构化文档)
python
import re
def chunk_by_headers(markdown_text: str) -> list[dict]:
"""
按 Markdown 标题分块
Returns:
[{"title": "标题", "level": 1, "content": "内容"}, ...]
"""
chunks = []
current_chunk = {"title": "", "level": 0, "content": ""}
for line in markdown_text.split('\n'):
# 检测标题
header_match = re.match(r'^(#{1,6})\s+(.+)$', line)
if header_match:
# 保存上一个块
if current_chunk["content"]:
chunks.append(current_chunk.copy())
# 开始新块
level = len(header_match.group(1))
title = header_match.group(2)
current_chunk = {
"title": title,
"level": level,
"content": ""
}
else:
current_chunk["content"] += line + "\n"
# 添加最后一块
if current_chunk["content"]:
chunks.append(current_chunk)
return chunks
# 使用
markdown = """
# 第一章 引言
这是引言内容。
## 1.1 背景
背景描述...
## 1.2 目标
目标说明...
# 第二章 方法
方法详解...
"""
chunks = chunk_by_headers(markdown)
for chunk in chunks:
print(f"{'#' * chunk['level']} {chunk['title']}")
print(chunk['content'][:50] + "...\n")4.3.3 元数据提取与索引优化
元数据能显著提升检索精度和用户体验(可追溯来源、按时间/作者过滤等)。
核心元数据类型
| 元数据 | 说明 | 示例 | 用途 |
|---|---|---|---|
| source | 文档来源 | "技术手册.pdf" | 追溯信息来源 |
| page | 页码 | 42 | 精确定位 |
| section | 章节标题 | "2.3 模型部署" | 结构化检索 |
| author | 作者 | "张三" | 按作者过滤 |
| created_at | 创建时间 | "2024-01-15" | 按时间排序 |
| category | 分类标签 | "技术文档" | 分类检索 |
| keywords | 关键词 | ["Python", "AI"] | 关键词匹配 |
完整的文档处理流程
python
from datetime import datetime
import hashlib
def process_document(file_path: str, category: str = "general") -> list[dict]:
"""
完整的文档处理流程:解析 → 分块 → 提取元数据
Returns:
[
{
"chunk_id": "doc1_chunk0",
"text": "文本内容",
"metadata": {
"source": "文件名",
"chunk_index": 0,
"category": "分类",
...
}
},
...
]
"""
import os
# 1. 根据文件类型选择解析器
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.pdf':
text = extract_pdf_text(file_path)
elif file_ext == '.docx':
text = extract_word_text(file_path)
elif file_ext == '.md':
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
else:
raise ValueError(f"不支持的文件格式:{file_ext}")
# 2. 文本分块
chunks = chunk_by_recursive(text, chunk_size=500, chunk_overlap=50)
# 3. 构建文档 ID(基于文件路径的哈希)
doc_id = hashlib.md5(file_path.encode()).hexdigest()[:8]
# 4. 为每个块添加元数据
processed_chunks = []
for i, chunk_text in enumerate(chunks):
chunk = {
"chunk_id": f"{doc_id}_chunk{i}",
"text": chunk_text,
"metadata": {
"source": os.path.basename(file_path),
"file_path": file_path,
"chunk_index": i,
"total_chunks": len(chunks),
"category": category,
"processed_at": datetime.now().isoformat(),
"char_count": len(chunk_text)
}
}
processed_chunks.append(chunk)
return processed_chunks
# 使用
chunks = process_document("technical_doc.pdf", category="技术文档")
print(f"处理完成,共 {len(chunks)} 个块")
print(f"\n第一个块:")
print(f"ID: {chunks[0]['chunk_id']}")
print(f"文本: {chunks[0]['text'][:100]}...")
print(f"元数据: {chunks[0]['metadata']}")索引到向量数据库(带元数据)
python
import chromadb
from openai import OpenAI
def index_documents_with_metadata(chunks: list[dict], collection_name: str = "documents"):
"""
将处理好的文档块索引到向量数据库
"""
client = OpenAI(api_key="your-api-key")
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection(name=collection_name)
# 批量生成向量
texts = [chunk["text"] for chunk in chunks]
response = client.embeddings.create(
model="text-embedding-3-small",
input=texts
)
embeddings = [data.embedding for data in response.data]
# 索引
collection.add(
ids=[chunk["chunk_id"] for chunk in chunks],
documents=texts,
embeddings=embeddings,
metadatas=[chunk["metadata"] for chunk in chunks]
)
print(f"✅ 已索引 {len(chunks)} 个文档块到集合 '{collection_name}'")
# 使用
index_documents_with_metadata(chunks)带元数据过滤的检索
python
def search_with_metadata(query: str, category: str = None, source: str = None):
"""
带元数据过滤的检索
"""
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_collection("documents")
# 构建过滤条件
where = {}
if category:
where["category"] = category
if source:
where["source"] = source
# 检索
results = collection.query(
query_texts=[query],
n_results=3,
where=where if where else None
)
print(f"查询:{query}")
if where:
print(f"过滤条件:{where}")
for i, (doc, metadata) in enumerate(zip(results['documents'][0], results['metadatas'][0])):
print(f"\n结果 {i+1}:")
print(f"来源:{metadata['source']} (第 {metadata['chunk_index']+1}/{metadata['total_chunks']} 块)")
print(f"内容:{doc[:100]}...")
# 使用
search_with_metadata("如何部署模型?", category="技术文档")4.3.4 高级技巧与最佳实践
1. 保留文档结构信息
python
def chunk_with_context(text: str, chunk_size: int = 500) -> list[str]:
"""
分块时保留上下文标题
例如:"## 2.3 模型部署\n\n模型部署需要考虑..."
而不是:"模型部署需要考虑..."
"""
chunks = []
current_headers = [] # 存储当前的标题层级
for line in text.split('\n'):
# 检测标题
if line.startswith('#'):
level = len(line.split()[0])
# 更新标题栈
current_headers = current_headers[:level-1] + [line]
# 每个块都包含完整的标题路径
header_context = '\n'.join(current_headers) + '\n\n'
# ... 分块逻辑
return chunks2. 去除噪声文本
python
import re
def clean_text(text: str) -> str:
"""
清理文档噪声
"""
# 去除多余空白
text = re.sub(r'\s+', ' ', text)
# 去除页眉页脚常见模式
text = re.sub(r'第\s*\d+\s*页', '', text)
text = re.sub(r'Page\s*\d+', '', text, flags=re.IGNORECASE)
# 去除 URL
text = re.sub(r'http[s]?://\S+', '', text)
# 去除邮箱
text = re.sub(r'\S+@\S+\.\S+', '', text)
return text.strip()3. 智能分块长度调整
python
def adaptive_chunk_size(text: str, base_size: int = 500) -> int:
"""
根据文档类型自适应调整分块大小
- 代码文档:较小块(300-400)
- 技术文档:中等块(500-700)
- 叙事文档:较大块(800-1000)
"""
# 检测代码密度
code_ratio = len(re.findall(r'```|def |class |import ', text)) / len(text.split())
if code_ratio > 0.1:
return base_size - 200 # 代码文档
elif code_ratio > 0.05:
return base_size # 技术文档
else:
return base_size + 300 # 叙事文档4. 批量文档处理
python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_process_documents(folder_path: str, max_workers: int = 4):
"""
并行处理文件夹中的所有文档
"""
all_chunks = []
file_paths = []
# 收集所有支持的文件
for root, dirs, files in os.walk(folder_path):
for file in files:
if file.endswith(('.pdf', '.docx', '.md', '.txt')):
file_paths.append(os.path.join(root, file))
print(f"发现 {len(file_paths)} 个文档")
# 并行处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {
executor.submit(process_document, fp): fp
for fp in file_paths
}
for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
chunks = future.result()
all_chunks.extend(chunks)
print(f"✅ 处理完成:{os.path.basename(file_path)} ({len(chunks)} 块)")
except Exception as e:
print(f"❌ 处理失败:{file_path} - {e}")
return all_chunks
# 使用
chunks = batch_process_documents("./documents", max_workers=4)
print(f"\n总计处理 {len(chunks)} 个文档块")4.3.5 实战建议与学习路径
Week 1:文档解析
- 实现 PDF、Word、Markdown 解析
- 对比不同解析库的效果
- 处理真实文档(公司内部文档、技术书籍)
Week 2:分块策略
- 实现 3 种分块方法(固定、递归、语义)
- 对比不同策略对检索效果的影响
- 针对特定文档类型优化分块参数
Week 3-4:元数据与优化
- 设计元数据 Schema
- 实现完整的文档处理流程
- 性能优化:批量处理、并行解析
最佳实践清单
- ✅ 选择合适的分块大小:中文 400-600 字符,英文 500-800 tokens
- ✅ 必须设置重叠:overlap 至少 10-20% 的 chunk_size
- ✅ 保留文档结构:标题、段落、列表等格式信息
- ✅ 提取关键元数据:source、page、section、timestamp
- ✅ 清理噪声:页眉页脚、特殊字符、格式标记
- ✅ 测试不同策略:用真实查询评估分块效果
常见错误
- ❌ 分块过大(>1000字符)导致检索不精确
- ❌ 无重叠导致关键信息被截断
- ❌ 忽略文档结构导致上下文丢失
- ❌ 不清理噪声导致向量质量下降
- ❌ 缺少元数据无法追溯信息来源
推荐资源
- LangChain Text Splitters:https://python.langchain.com/docs/modules/data_connection/document_transformers/
- Unstructured 文档:https://unstructured-io.github.io/unstructured/
- PyMuPDF 教程:https://pymupdf.readthedocs.io/
- pdfplumber 文档:https://github.com/jsvine/pdfplumber
- python-docx 文档:https://python-docx.readthedocs.io/
性能指标
- 解析速度:>10 页/秒(PDF)
- 分块质量:检索召回率 >80%
- 元数据覆盖率:>90% 的块包含完整元数据
实战项目
项目1:多格式文档解析器
- 支持 PDF、Word、PPT、HTML、Markdown
- 自动识别文档类型
- 提取文本、表格、图片
项目2:智能分块系统
- 实现多种分块策略
- 自适应调整分块参数
- 评估分块质量
项目3:企业文档知识库
- 批量处理公司文档
- 提取完整元数据
- 支持按类别、时间、作者检索
总结
文档处理是 RAG 系统的基础,直接影响最终效果。关键要点:
- 选择合适的解析工具:PyMuPDF(PDF)、python-docx(Word)、Unstructured(通用)
- 优化分块策略:递归分块是通用首选,语义分块质量最高
- 提取丰富元数据:source、page、section 等信息至关重要
- 持续优化:根据实际检索效果调整参数
掌握文档处理技术后,你将能够构建高质量的 RAG 知识库,为后续的检索和生成打下坚实基础。