4.5 数据工程与 ETL
在 AI 应用开发中,往往约 60% 的时间花在数据准备上——清洗、转换、标准化与质量检查,而这些环节正是决定 RAG/微调效果的关键。
学习时长:2-3 周
数据工程在 AI 应用中的位置
原始数据(乱、脏、杂)
↓ Extract(提取)
结构化数据
↓ Transform(清洗/转换)
标准化数据
↓ Load(加载)
向量数据库 / 训练数据集 / 知识库
1. 数据采集(Extract)
多源数据接入
python
# 文件系统
from pathlib import Path
files = list(Path("docs/").rglob("*.md")) # 递归扫描
# 数据库
import asyncpg
rows = await conn.fetch("SELECT * FROM articles WHERE status='published'")
# API
import httpx
resp = await client.get("https://api.example.com/data")
# 网页爬取
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.goto(url)
content = await page.content()文件格式处理
python
# PDF → text: extract every page and join the pages with newlines
from pypdf import PdfReader
reader = PdfReader("doc.pdf")
text = "\n".join([pdf_page.extract_text() for pdf_page in reader.pages])

# Word → text: one line per paragraph
import docx
doc = docx.Document("doc.docx")
text = "\n".join([paragraph.text for paragraph in doc.paragraphs])

# Excel → structured data (a DataFrame read from the "Sheet1" worksheet)
import pandas as pd
df = pd.read_excel("data.xlsx", sheet_name="Sheet1")

# HTML → clean text (`html` is assumed to hold the raw markup)
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
text = soup.get_text(separator="\n", strip=True)
# 2. 数据清洗(Transform)
文本清洗
python
import re
def clean_text(text: str) -> str:
    """General-purpose text cleaning pipeline.

    Steps, in order:

    1. Normalize line endings (``\\r\\n`` and ``\\r``) to ``\\n``.
    2. Remove ASCII control characters other than tab and newline.
    3. Remove PDF-style page markers such as ``第 1 页 共 10 页``.
    4. Collapse whitespace: drop trailing spaces/tabs at line ends, turn
       3+ consecutive newlines into one blank line, and turn runs of spaces
       into a single space.

    The removal steps run *before* whitespace collapsing so that the text
    they delete cannot leave behind double spaces or extra blank lines.

    Args:
        text: Raw extracted text.

    Returns:
        The cleaned text with leading/trailing whitespace stripped.
    """
    # 1. Normalize line endings.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # 2. Remove control characters (keeps \t = \x09 and \n = \x0a).
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
    # 3. Remove page header/footer markers (common in PDF extraction).
    text = re.sub(r"第\s*\d+\s*页.*?共\s*\d+\s*页", "", text)
    # 4. Collapse whitespace last. Trailing blanks are stripped first so that
    #    whitespace-only lines count as empty for the blank-line collapse.
    text = re.sub(r"[ \t]+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = re.sub(r" {2,}", " ", text)
    return text.strip()
# PII 脱敏
python
import re
# PII regexes, applied by desensitize() in this dict's insertion order, which
# matters:
# - email first, because an address may contain phone-like digit runs;
# - then 18-character ID cards, 16-19 digit bank cards, and 11-digit mobile
#   numbers (optionally prefixed with +86 / 0086 / 86).
# The (?<!\d) / (?!\d) guards stop a pattern from matching a slice of a longer
# digit run (e.g. a "phone number" inside an ID card number). ASCII classes
# are used instead of \w, which in Python also matches CJK characters and
# would swallow the Chinese text around an email address.
PII_PATTERNS = {
    "email": (r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "***邮箱***"),
    "id_card": (r"(?<!\d)\d{17}[\dXx](?!\d)", "***身份证***"),
    "bank_card": (r"(?<!\d)\d{16,19}(?!\d)", "***银行卡***"),
    "phone": (r"(?<!\d)(?:(?:\+|00)?86[- ]?)?1[3-9]\d{9}(?!\d)", "***手机号***"),
}


def desensitize(text: str) -> str:
    """Mask PII in *text*.

    Every match of each pattern in ``PII_PATTERNS`` is replaced with that
    pattern's placeholder, in the dict's order.

    Args:
        text: Input text.

    Returns:
        The text with all matches replaced.
    """
    for pattern, replacement in PII_PATTERNS.values():
        text = re.sub(pattern, replacement, text)
    return text
# 元数据提取与标准化
python
from pydantic import BaseModel
from datetime import datetime
class DocumentMetadata(BaseModel):
    """Normalized metadata attached to each ingested document."""

    source: str  # source file path
    title: str  # document title
    author: str | None  # may be None (depends on what extract_author returns)
    created_at: datetime  # filled from get_file_mtime(file_path)
    doc_type: str  # lowercase extension without the dot: pdf/md/html/xlsx
    word_count: int  # NOTE: extract_metadata fills this with len(content)
    language: str  # zh/en
    tags: list[str]


def extract_metadata(file_path: str, content: str) -> DocumentMetadata:
    """Build the metadata record for one document.

    Args:
        file_path: Path of the source file; stored as ``source`` and used to
            derive ``doc_type`` and ``created_at``.
        content: Extracted text of the document.

    Returns:
        The populated DocumentMetadata.
    """
    return DocumentMetadata(
        source=file_path,
        title=extract_title(content),
        author=extract_author(content),
        created_at=get_file_mtime(file_path),
        # Lowercase so "REPORT.PDF" and "report.pdf" normalize to the same type.
        doc_type=Path(file_path).suffix.lstrip(".").lower(),
        # Character count: a practical size measure for Chinese text, where
        # words are not separated by spaces.
        word_count=len(content),
        language=detect_language(content),
        tags=extract_tags(content),
    )
# 3. 数据质量检查
python
from dataclasses import dataclass
@dataclass
class QualityReport:
    """Summary of one quality_check run."""

    total: int  # number of documents checked
    passed: int  # documents that passed every check
    failed: int  # total - passed
    issues: list[str]  # one message per failed document (its first failure)


def quality_check(documents: list[dict]) -> QualityReport:
    """Run per-document quality checks and summarize the results.

    Each document is a dict whose ``"content"`` key holds its text. Checks run
    in this order, and only the first failing check is recorded:

    1. content is empty or whitespace-only (missing or ``None`` content
       counts as empty);
    2. stripped content is shorter than 50 characters;
    3. the document duplicates an earlier document that *passed*
       (per ``is_duplicate``);
    4. the content has encoding problems (per ``has_encoding_issues``).

    Args:
        documents: Documents to check, in order.

    Returns:
        A QualityReport with the counts and the list of issue messages.
    """
    issues = []
    # Deduplicate only against accepted documents. Otherwise a rejected
    # document (e.g. a too-short draft) could cause a later good copy to be
    # dropped as its "duplicate", losing every version.
    accepted: list[dict] = []
    for i, doc in enumerate(documents):
        # `or ""` also covers content explicitly set to None.
        content = (doc.get("content") or "").strip()
        # Check 1: content must not be empty
        if not content:
            issues.append(f"文档 {i}: 内容为空")
            continue
        # Check 2: content must not be too short. Measured after stripping,
        # so whitespace padding cannot satisfy the threshold.
        if len(content) < 50:
            issues.append(f"文档 {i}: 内容过短 ({len(content)} 字)")
            continue
        # Check 3: deduplicate (title or content highly repetitive)
        if is_duplicate(doc, accepted):
            issues.append(f"文档 {i}: 与已有文档重复")
            continue
        # Check 4: encoding problems (mojibake)
        if has_encoding_issues(doc["content"]):
            issues.append(f"文档 {i}: 存在编码乱码")
            continue
        accepted.append(doc)
    passed = len(accepted)
    return QualityReport(
        total=len(documents), passed=passed,
        failed=len(documents) - passed, issues=issues,
    )
# 4. ETL Pipeline 实战
python
import asyncio
from pathlib import Path
async def etl_pipeline(source_dir: str, target: str = "vectordb"):
    """Run the complete ETL pipeline over a directory of documents.

    Extract: recursively collect .md/.pdf/.txt/.html files under
    ``source_dir`` (suffix matched case-insensitively) and extract their text
    and metadata. Transform: clean and PII-mask each text, print a quality
    report, and keep only the documents that pass. Load: chunk the documents
    into the vector database, or export them as training data.

    Args:
        source_dir: Root directory to scan.
        target: ``"vectordb"`` (chunk and write to the vector DB) or
            ``"training"`` (write ``training_data.jsonl``).

    Raises:
        ValueError: If ``target`` is not one of the supported values.
    """
    # Validate up front, so a typo does not run the whole pipeline, load
    # nothing, and still report success.
    if target not in ("vectordb", "training"):
        raise ValueError(
            f"unsupported target {target!r}; expected 'vectordb' or 'training'"
        )
    supported_suffixes = {".md", ".pdf", ".txt", ".html"}

    # === Extract ===
    print("📥 Step 1: 数据提取")
    raw_docs = []
    for file in Path(source_dir).rglob("*"):
        # rglob("*") also yields directories (a folder may be named "x.md").
        if file.is_file() and file.suffix.lower() in supported_suffixes:
            content = extract_content(file)
            metadata = extract_metadata(str(file), content)
            raw_docs.append({"content": content, "metadata": metadata})
    print(f" 提取 {len(raw_docs)} 个文档")

    # === Transform ===
    print("🔧 Step 2: 数据清洗")
    # Build new dicts rather than mutating raw_docs in place, so the
    # originally extracted text stays available for tracing back.
    cleaned = [
        {**doc, "content": desensitize(clean_text(doc["content"]))}
        for doc in raw_docs
    ]
    # Quality check
    report = quality_check(cleaned)
    print(f" 质量检查: {report.passed}/{report.total} 通过")
    if report.issues:
        for issue in report.issues[:5]:
            print(f" ⚠️ {issue}")
    # Drop failing documents. Reuse quality_check's per-document checks
    # (empty / too short / encoding) and dedupe against the documents already
    # kept. A length-only filter would let duplicates and garbled documents
    # through to Load.
    # NOTE(review): assumes is_duplicate(doc, []) is False, i.e. a
    # single-document quality_check never reports a duplicate.
    valid_docs = []
    for doc in cleaned:
        if quality_check([doc]).passed and not is_duplicate(doc, valid_docs):
            valid_docs.append(doc)

    # === Load ===
    print("📤 Step 3: 数据加载")
    if target == "vectordb":
        chunks = chunk_documents(valid_docs)
        await load_to_vectordb(chunks)
        print(f" 写入 {len(chunks)} 个分块到向量数据库")
    else:  # target == "training" (validated above)
        pairs = format_as_training_data(valid_docs)
        save_jsonl(pairs, "training_data.jsonl")
        print(f" 导出 {len(pairs)} 条训练数据")
    print("✅ ETL 完成")


# Run
asyncio.run(etl_pipeline("./raw_data", target="vectordb"))
# 数据工程最佳实践
✅ 做:
- 数据版本控制(DVC / Git LFS)
- 每次 ETL 生成质量报告
- PII 脱敏后再进入 LLM 管线
- 保留原始数据(可回溯)
- 增量更新而非全量重建
❌ 避免:
- 不做质量检查直接入库
- 不处理编码问题(乱码会污染 Embedding)
- 不去重(重复数据影响检索质量)
- 不做元数据标注(导致检索时无法按元数据过滤)
学习资源
- LangChain Document Loaders
- Unstructured.io — 通用文档解析
- DVC 文档 — 数据版本控制