Skip to content

4.5 数据工程与 ETL

在 AI 应用开发中,约 60% 的时间花在数据准备上——清洗、转换、标准化和质量检查;这些工作才是决定 RAG 与微调质量的关键。

学习时长:2-3 周


数据工程在 AI 应用中的位置

原始数据(乱、脏、杂)
    ↓ Extract(提取)
结构化数据
    ↓ Transform(清洗/转换)
标准化数据
    ↓ Load(加载)
向量数据库 / 训练数据集 / 知识库

1. 数据采集(Extract)

多源数据接入

python
# 文件系统
from pathlib import Path
files = list(Path("docs/").rglob("*.md"))  # 递归扫描

# 数据库
import asyncpg
rows = await conn.fetch("SELECT * FROM articles WHERE status='published'")

# API
import httpx
resp = await client.get("https://api.example.com/data")

# 网页爬取
from playwright.async_api import async_playwright
async with async_playwright() as p:
    browser = await p.chromium.launch()
    page = await browser.new_page()
    await page.goto(url)
    content = await page.content()

文件格式处理

python
# PDF -> text: extract the text of every page and join the pages with newlines.
from pypdf import PdfReader
reader = PdfReader("doc.pdf")
text = "\n".join(page.extract_text() for page in reader.pages)

# Word -> text: join the text of each paragraph with newlines.
# Only doc.paragraphs is read here. Note that `text` is rebound by each
# example in this snippet.
import docx
doc = docx.Document("doc.docx")
text = "\n".join(p.text for p in doc.paragraphs)

# Excel -> structured data: load the sheet named "Sheet1" into a DataFrame.
import pandas as pd
df = pd.read_excel("data.xlsx", sheet_name="Sheet1")

# HTML -> clean text: parse with the built-in "html.parser" backend and take
# the stripped text nodes, separated by newlines.
# NOTE(review): `html` is not defined in this snippet — presumably the raw
# markup string; confirm.
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
text = soup.get_text(separator="\n", strip=True)

2. 数据清洗(Transform)

文本清洗

python
import re

def clean_text(text: str) -> str:
    """Normalize raw extracted text with a general-purpose cleaning pipeline.

    Steps, in order: unify line endings, drop control characters, remove
    page-number lines (common in PDF extractions), collapse redundant
    whitespace, and strip the result.

    Args:
        text: Raw text as produced by an extractor.

    Returns:
        The cleaned text with leading and trailing whitespace removed.
    """
    # 1. Unify line endings so every later pattern only has to deal with "\n".
    text = text.replace("\r\n", "\n").replace("\r", "\n")

    # 2. Drop control characters (tab and newline are kept). This runs before
    #    whitespace collapsing so a removed character cannot leave a double
    #    space behind.
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)

    # 3. Remove page headers/footers. Only lines consisting solely of a page
    #    marker are removed ("第 3 页", "第 3 页 共 10 页", "第 3 / 10 页"), so
    #    body text such as "详见第 12 页" or "第1章共20页" is left untouched.
    text = re.sub(
        r"^[ \t]*第[ \t]*\d+[ \t]*"
        r"(?:页(?:[ \t]*[/,,]?[ \t]*共[ \t]*\d+[ \t]*页)?|/[ \t]*\d+[ \t]*页)"
        r"[ \t]*$",
        "",
        text,
        flags=re.MULTILINE,
    )

    # 4. Collapse redundant whitespace: at most one blank line, single spaces.
    #    Done last so the gaps left by removed page markers are collapsed too.
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = re.sub(r" {2,}", " ", text)

    return text.strip()

PII 脱敏

python
import re

# Maps each PII kind to (regex, replacement). Entries are applied in insertion
# order by desensitize().
#
# The numeric patterns are fenced with digit lookarounds so that each one only
# matches a complete run of digits: without them the phone pattern fires inside
# an 18-digit ID number and the ID pattern truncates a 19-digit card number,
# leaving the remaining digits unmasked.
PII_PATTERNS = {
    # Mainland mobile number, optionally prefixed with +86 / 0086 / 86.
    "phone": (r"(?<!\d)(?:(?:\+|00)?86[- ]?)?1[3-9]\d{9}(?!\d)", "***手机号***"),
    # 18-character resident ID: 17 digits plus a digit or X check character.
    "id_card": (r"(?<!\d)\d{17}[\dXx](?!\d)", "***身份证***"),
    # ASCII-only classes: \w would also swallow CJK text adjacent to the
    # address, and the local part must allow "-", "+", "%" and "_".
    "email": (
        r"[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)+",
        "***邮箱***",
    ),
    # 16-19 digit card number. Listed after id_card so an ID ending in "X" is
    # already masked before its 17 leading digits could look like a card.
    "bank_card": (r"(?<!\d)\d{16,19}(?!\d)", "***银行卡***"),
}


def desensitize(text: str) -> str:
    """Replace every PII match in ``text`` with its placeholder.

    Args:
        text: Text that may contain phone numbers, ID numbers, e-mail
            addresses or bank card numbers.

    Returns:
        The text with each match of a PII_PATTERNS regex replaced by that
        entry's placeholder string.
    """
    for pattern, replacement in PII_PATTERNS.values():
        text = re.sub(pattern, replacement, text)
    return text

元数据提取与标准化

python
from pydantic import BaseModel
from datetime import datetime

class DocumentMetadata(BaseModel):
    """Standardized metadata record attached to one extracted document."""

    source: str           # path of the source file
    title: str            # document title
    author: str | None    # author; None is an accepted value
    created_at: datetime  # timestamp associated with the document
    doc_type: str         # pdf/md/html/xlsx
    word_count: int       # size of the content
    language: str         # zh/en
    tags: list[str]       # tags for the document

def extract_metadata(file_path: str, content: str) -> DocumentMetadata:
    """Build the DocumentMetadata record for one extracted document.

    Args:
        file_path: Path of the file the content was extracted from.
        content: Extracted text of the document.

    Returns:
        A DocumentMetadata populated from the path and the content.
    """
    # Imported locally so the function does not depend on a module-level
    # ``Path`` import being present wherever this snippet is pasted.
    from pathlib import Path

    return DocumentMetadata(
        source=file_path,
        title=extract_title(content),
        author=extract_author(content),
        # NOTE(review): the field is called created_at but is filled by
        # get_file_mtime — presumably the modification time; confirm.
        created_at=get_file_mtime(file_path),
        # Lower-cased so "report.PDF" and "report.pdf" both yield "pdf",
        # matching the documented pdf/md/html/xlsx values.
        doc_type=Path(file_path).suffix.lstrip(".").lower(),
        # len() counts characters, not whitespace-delimited words.
        word_count=len(content),
        language=detect_language(content),
        tags=extract_tags(content),
    )

3. 数据质量检查

python
from dataclasses import dataclass

@dataclass
class QualityReport:
    """Summary of one quality-check run over a batch of documents."""

    total: int         # number of documents inspected
    passed: int        # documents that passed every check
    failed: int        # total - passed
    issues: list[str]  # one "文档 <index>: <reason>" message per rejected document


def quality_check(documents: list[dict]) -> QualityReport:
    """Run the quality gates over ``documents`` and summarize the outcome.

    Each document is checked in order and rejected at the first gate it
    fails: empty content, content shorter than 50 characters, duplicate of
    an already accepted document, or encoding problems.

    Args:
        documents: Dicts carrying the text under the "content" key. A missing
            key or a ``None`` value is treated as empty content.

    Returns:
        A QualityReport with the counts and one issue message per rejected
        document; the message starts with the document's index in
        ``documents``.
    """
    issues: list[str] = []
    accepted: list[dict] = []

    for i, doc in enumerate(documents):
        # `or ""` also covers an explicit None, which would otherwise raise
        # AttributeError on .strip().
        content = doc.get("content") or ""

        # Check 1: content must not be empty.
        if not content.strip():
            issues.append(f"文档 {i}: 内容为空")
            continue

        # Check 2: content must not be too short.
        if len(content) < 50:
            issues.append(f"文档 {i}: 内容过短 ({len(content)} 字)")
            continue

        # Check 3: de-duplicate. Compare only against documents that were
        # accepted: a clean copy must not be thrown away because it resembles
        # an earlier document that was itself rejected, and this also avoids
        # copying a slice of the input on every iteration.
        if is_duplicate(doc, accepted):
            issues.append(f"文档 {i}: 与已有文档重复")
            continue

        # Check 4: encoding problems.
        if has_encoding_issues(content):
            issues.append(f"文档 {i}: 存在编码乱码")
            continue

        accepted.append(doc)

    passed = len(accepted)
    return QualityReport(
        total=len(documents),
        passed=passed,
        failed=len(documents) - passed,
        issues=issues,
    )

4. ETL Pipeline 实战

python
import asyncio
from pathlib import Path

async def etl_pipeline(source_dir: str, target: str = "vectordb"):
    """Run the full extract -> transform -> load pipeline over a directory.

    Args:
        source_dir: Directory scanned recursively for .md/.pdf/.txt/.html
            files.
        target: "vectordb" to chunk the documents and write them to the
            vector database, or "training" to export them to
            training_data.jsonl.

    Raises:
        ValueError: If ``target`` is neither "vectordb" nor "training".
    """
    import re  # local import: only needed to read indices out of the report

    # Fail before doing any work; an unknown target used to run the whole
    # pipeline, load nothing, and still report success.
    if target not in ("vectordb", "training"):
        raise ValueError(
            f"unsupported target {target!r}; expected 'vectordb' or 'training'"
        )

    # === Extract ===
    print("📥 Step 1: 数据提取")
    supported_suffixes = {".md", ".pdf", ".txt", ".html"}
    raw_docs = []
    # Sorted so the processing order, and therefore which copy of a duplicate
    # is seen first, does not depend on filesystem enumeration order.
    for file in sorted(Path(source_dir).rglob("*")):
        # rglob("*") also yields directories; the suffix match is
        # case-insensitive so "REPORT.PDF" is picked up as well.
        if file.is_file() and file.suffix.lower() in supported_suffixes:
            content = extract_content(file)
            metadata = extract_metadata(str(file), content)
            raw_docs.append({"content": content, "metadata": metadata})
    print(f"   提取 {len(raw_docs)} 个文档")

    # === Transform ===
    print("🔧 Step 2: 数据清洗")
    cleaned = []
    for doc in raw_docs:
        doc["content"] = clean_text(doc["content"])
        doc["content"] = desensitize(doc["content"])
        cleaned.append(doc)

    # Quality check
    report = quality_check(cleaned)
    print(f"   质量检查: {report.passed}/{report.total} 通过")
    if report.issues:
        for issue in report.issues[:5]:
            print(f"   ⚠️ {issue}")

    # Drop every document the quality check rejected. Filtering on length
    # alone would still load the duplicates and garbled documents the report
    # flagged. Each issue message starts with "文档 <index>:", which is the
    # only place the report exposes the rejected indices.
    rejected = set()
    for issue in report.issues:
        match = re.match(r"文档 (\d+):", issue)
        if match:
            rejected.add(int(match.group(1)))
    # The length guard is kept as a safety net in case an issue message could
    # not be parsed.
    valid_docs = [
        doc
        for index, doc in enumerate(cleaned)
        if index not in rejected and len(doc["content"]) >= 50
    ]

    # === Load ===
    print("📤 Step 3: 数据加载")
    if target == "vectordb":
        chunks = chunk_documents(valid_docs)
        await load_to_vectordb(chunks)
        print(f"   写入 {len(chunks)} 个分块到向量数据库")
    else:  # target == "training", validated above
        pairs = format_as_training_data(valid_docs)
        save_jsonl(pairs, "training_data.jsonl")
        print(f"   导出 {len(pairs)} 条训练数据")

    print("✅ ETL 完成")
# Run the pipeline only when this file is executed as a script, so that
# importing the module does not start an ETL run as a side effect.
if __name__ == "__main__":
    asyncio.run(etl_pipeline("./raw_data", target="vectordb"))

数据工程最佳实践

✅ 做:
  - 数据版本控制(DVC / Git LFS)
  - 每次 ETL 生成质量报告
  - PII 脱敏后再进入 LLM 管线
  - 保留原始数据(可回溯)
  - 增量更新而非全量重建

❌ 避免:
  - 不做质量检查直接入库
  - 不处理编码问题(乱码会污染 Embedding)
  - 不去重(重复数据影响检索质量)
  - 不做元数据标注(无法过滤检索)

学习资源

坚持是一种品格