Skip to content

向量数据库实战(Milvus/Chroma)

从 Embedding 的第一个维度到 RAG 系统的最后一公里——手把手带你玩转向量数据库。


9. 生产级优化——性能、成本、运维

RAG 系统跑起来了,但从"能用"到"好用"还有一段路。这一章我们聊生产环境中最常遇到的问题:数据量上来后怎么优化写入速度、怎么控制成本、数据更新怎么处理、以及从 Chroma 迁移到 Milvus 的实操路径


9.1 写入优化:批量导入与并行策略

当你要导入百万级文档时,逐条插入的效率是灾难性的。

批量插入 vs 逐条插入

逐条插入 vs 批量插入的性能对比:

  插入 10 万条 1024 维向量到 Milvus:

  方式              耗时          QPS
  ───────────────────────────────────────
  逐条插入          ~300s         333/s
  批量 100 条/批    ~30s          3333/s
  批量 1000 条/批   ~10s          10000/s
  批量 5000 条/批   ~8s           12500/s
  批量 10000 条/批  ~7s           14285/s

  → 批量 1000 条 vs 逐条:速度提升 30 倍
  → 批量超过 5000 条后收益递减
  → 推荐每批 1000-5000 条

批量导入实战代码

python
from tqdm import tqdm

def batch_ingest(chunks: list[str], embeddings, collection,
                 batch_size: int = 1000):
    """分批导入数据到 Milvus"""
    total = len(chunks)

    for i in tqdm(range(0, total, batch_size), desc="导入进度"):
        batch_end = min(i + batch_size, total)

        collection.insert([
            chunks[i:batch_end],
            embeddings["dense"][i:batch_end],
            embeddings["sparse"][i:batch_end],
        ])

    collection.flush()
    print(f"导入完成:{total} 条")

Embedding 生成的并行优化

python
import concurrent.futures
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("BAAI/bge-large-zh-v1.5")

def parallel_embedding(texts: list[str], batch_size: int = 256) -> list:
    """分批生成 Embedding,充分利用 GPU/CPU"""

    all_embeddings = []

    # SentenceTransformer 的 encode 本身支持批量
    # 设置合理的 batch_size 可以充分利用 GPU 显存
    embeddings = model.encode(
        texts,
        batch_size=batch_size,       # GPU 批量大小
        normalize_embeddings=True,
        show_progress_bar=True
    )

    return embeddings.tolist()

# 100 万条文本的向量化时间估算:
#   CPU(8 核):~4 小时
#   GPU(RTX 3090):~20 分钟
#   GPU(A100):~8 分钟

Embedding 缓存:避免重复计算

python
import hashlib
import json
import os

class EmbeddingCache:
    """本地文件缓存,避免重复计算 Embedding"""

    def __init__(self, cache_dir: str = "./embedding_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def _hash(self, text: str) -> str:
        return hashlib.md5(text.encode()).hexdigest()

    def get(self, text: str) -> list[float] | None:
        path = os.path.join(self.cache_dir, f"{self._hash(text)}.json")
        if os.path.exists(path):
            with open(path, "r") as f:
                return json.load(f)
        return None

    def set(self, text: str, embedding: list[float]):
        path = os.path.join(self.cache_dir, f"{self._hash(text)}.json")
        with open(path, "w") as f:
            json.dump(embedding, f)

    def get_or_compute(self, text: str, model) -> list[float]:
        cached = self.get(text)
        if cached:
            return cached
        embedding = model.encode([text], normalize_embeddings=True)[0].tolist()
        self.set(text, embedding)
        return embedding

# 使用示例
cache = EmbeddingCache()
# 第一次调用:计算 + 缓存
emb = cache.get_or_compute("Python 性能优化", model)
# 第二次调用:直接读缓存,0 计算成本
emb = cache.get_or_compute("Python 性能优化", model)

成本节省:当文档更新频率低但查询频率高时,Embedding 缓存能节省大量计算。特别是使用 OpenAI API 时,缓存能直接降低 API 调用费用。

9.2 查询调优:索引参数与缓存

查询延迟的分解

一次 RAG 查询的延迟分解:

  总延迟 ≈ 900ms
  ─────────────────────────────────────
  │ Embedding 生成     │  30ms  │  3%  │
  │ 向量搜索           │  10ms  │  1%  │
  │ Reranker           │  40ms  │  4%  │
  │ LLM 生成           │ 800ms  │ 89%  │
  │ 网络/序列化         │  20ms  │  2%  │
  ─────────────────────────────────────

  优化优先级:
    1. LLM 延迟(最大头)→ 流式输出、更快的模型
    2. Reranker → GPU 加速、减少候选数
    3. Embedding → 缓存、更小的模型
    4. 向量搜索 → 索引参数调优(通常已经够快)

索引参数调优清单

场景HNSW Mef_search预期延迟召回率
极速模式832~1ms~90%
平衡模式(推荐)16128~3ms~95%
高精度模式32256~8ms~99%
极致精度64512~15ms~99.5%

查询结果缓存

python
from functools import lru_cache
import hashlib

# 方式 1:简单的内存缓存
@lru_cache(maxsize=1000)
def cached_search(query_hash: str) -> list:
    """缓存搜索结果,相同问题直接返回"""
    # 实际搜索逻辑
    pass

def search(query: str) -> list:
    query_hash = hashlib.md5(query.encode()).hexdigest()
    return cached_search(query_hash)


# 方式 2:Redis 缓存(生产推荐)
import redis
import json

redis_client = redis.Redis(host="localhost", port=6379)

def cached_rag_search(query: str, ttl: int = 3600) -> list:
    """Redis 缓存 RAG 搜索结果,TTL 1 小时"""
    cache_key = f"rag:{hashlib.md5(query.encode()).hexdigest()}"

    # 检查缓存
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # 执行搜索
    results = actual_search(query)

    # 写入缓存
    redis_client.setex(cache_key, ttl, json.dumps(results))
    return results

缓存策略:RAG 系统中最适合缓存的是"热门问题"——通常 20% 的问题占了 80% 的查询量。对这些问题做缓存,可以大幅降低计算成本。

9.3 数据更新策略

知识库不是一次导入就完事的——文档会新增、修改、删除。怎么处理数据更新是生产环境的核心问题。

三种更新策略

策略对比:

  1. 全量重建(最简单,最粗暴)
  ─────────────────────────────────────
    流程:删除旧 Collection → 创建新 Collection → 重新导入全量数据
    优点:逻辑简单,数据绝对一致
    缺点:耗时长,导入期间服务不可用(除非用别名切换)
    适用:数据量小(< 10 万),更新频率低(每天/每周一次)

  2. 增量更新(最常用) ⭐
  ─────────────────────────────────────
    流程:只处理变更的文档
      新增文档 → add
      修改文档 → delete 旧块 + add 新块
      删除文档 → delete
    优点:速度快,不影响现有服务
    缺点:需要维护文档变更的追踪机制
    适用:大多数场景

  3. 双 Collection 蓝绿切换(最安全)
  ─────────────────────────────────────
    流程:
      1. 在 Collection B 中准备新数据
      2. 验证 B 的数据正确性
      3. 用别名(Alias)把流量切到 B
      4. 删除旧的 Collection A
    优点:零停机,可回滚
    缺点:需要双倍资源
    适用:生产环境、不容许服务中断

增量更新实战代码

python
import hashlib
import json
import os

class IncrementalUpdater:
    """增量更新管理器——追踪文档变更,只更新差异部分"""

    def __init__(self, state_file: str = "./update_state.json"):
        self.state_file = state_file
        self.state = self._load_state()

    def _load_state(self) -> dict:
        if os.path.exists(self.state_file):
            with open(self.state_file, "r") as f:
                return json.load(f)
        return {}

    def _save_state(self):
        with open(self.state_file, "w") as f:
            json.dump(self.state, f)

    def _file_hash(self, file_path: str) -> str:
        with open(file_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()

    def check_updates(self, file_paths: list[str]) -> dict:
        """检查哪些文件需要更新"""
        changes = {"new": [], "modified": [], "deleted": []}

        current_files = set()
        for path in file_paths:
            current_files.add(path)
            file_hash = self._file_hash(path)

            if path not in self.state:
                changes["new"].append(path)
            elif self.state[path] != file_hash:
                changes["modified"].append(path)

            self.state[path] = file_hash

        # 检查已删除的文件
        for path in list(self.state.keys()):
            if path not in current_files:
                changes["deleted"].append(path)
                del self.state[path]

        self._save_state()
        return changes

    def apply_updates(self, changes, collection, ingest_fn, source_prefix=""):
        """应用变更到向量数据库"""

        # 处理删除
        for path in changes["deleted"]:
            source = source_prefix + os.path.basename(path)
            collection.delete(expr=f'source == "{source}"')
            print(f"  删除:{source}")

        # 处理修改(先删后增)
        for path in changes["modified"]:
            source = source_prefix + os.path.basename(path)
            collection.delete(expr=f'source == "{source}"')
            ingest_fn(path, source=source)
            print(f"  更新:{source}")

        # 处理新增
        for path in changes["new"]:
            source = source_prefix + os.path.basename(path)
            ingest_fn(path, source=source)
            print(f"  新增:{source}")

# 使用示例
updater = IncrementalUpdater()
changes = updater.check_updates(["./docs/file1.md", "./docs/file2.md"])
print(f"新增 {len(changes['new'])} 个,修改 {len(changes['modified'])} 个,删除 {len(changes['deleted'])} 个")
updater.apply_updates(changes, collection, ingest_document_milvus)

9.4 监控、运维与迁移

Milvus 监控(Prometheus + Grafana)

Milvus 内置 Prometheus 指标,开箱可用:

  关键监控指标:
  ─────────────────────────────────────
  │ 指标                    │ 告警阈值          │
  ├────────────────────────┼──────────────────┤
  │ 搜索延迟 P99            │ > 100ms 告警     │
  │ 插入 QPS               │ 按业务需求        │
  │ 内存使用率              │ > 80% 告警       │
  │ 磁盘使用率              │ > 85% 告警       │
  │ Collection 加载状态      │ 未加载则告警      │
  │ Compaction 积压量       │ > 100 段告警     │
  └────────────────────────┴──────────────────┘

  配置方式:
    Milvus 默认暴露 :9091/metrics 端口
    Grafana Dashboard ID:17535(官方模板)

Chroma → Milvus 迁移

当数据量增长到 Chroma 撑不住时,迁移到 Milvus:

python
import chromadb
from pymilvus import connections, Collection

def migrate_chroma_to_milvus(
    chroma_path: str,
    chroma_collection_name: str,
    milvus_collection: Collection,
    batch_size: int = 1000
):
    """从 Chroma 迁移数据到 Milvus"""

    # 连接 Chroma
    chroma_client = chromadb.PersistentClient(path=chroma_path)
    chroma_col = chroma_client.get_collection(chroma_collection_name)

    total = chroma_col.count()
    print(f"待迁移数据量:{total}")

    # 分批读取并写入 Milvus
    for offset in range(0, total, batch_size):
        # 从 Chroma 读取
        results = chroma_col.get(
            limit=batch_size,
            offset=offset,
            include=["documents", "embeddings", "metadatas"]
        )

        # 写入 Milvus(需要根据 Schema 调整字段顺序)
        milvus_collection.insert([
            results["documents"],
            [m.get("source", "") for m in results["metadatas"]],
            results["embeddings"],
        ])

        print(f"  已迁移 {min(offset + batch_size, total)}/{total}")

    milvus_collection.flush()
    print("迁移完成!")

# ⚠️ 注意:迁移后需要重新建索引
# milvus_collection.create_index(...)
# milvus_collection.load()
迁移检查清单:

  □ 确认 Embedding 模型一致(不能换模型!)
  □ 确认向量维度一致
  □ Schema 字段类型匹配
  □ 迁移后重建索引
  □ 运行评测脚本验证检索质量
  □ 切换应用代码的数据库连接
  □ 保留旧数据作为回滚备份(至少 7 天)

迁移时机:当 Chroma 出现以下信号时考虑迁移到 Milvus:查询延迟 > 100ms、数据量 > 50 万、需要多服务并发访问、需要混合检索能力。


本章小结

知识点要点
批量插入每批 1000-5000 条,比逐条快 30 倍
Embedding 缓存文件/Redis 缓存,避免重复计算
延迟瓶颈LLM 生成占 89%,向量搜索只占 1%
索引调优M=16 + ef=128 是推荐起点
查询缓存热门问题的 Redis 缓存,TTL 1 小时
增量更新MD5 追踪文件变更,只更新差异部分
蓝绿切换用别名实现零停机数据更新
监控Prometheus + Grafana,关注延迟 P99 和内存使用率
迁移Chroma → Milvus:分批读写 + 重建索引 + 验证质量

🎉 恭喜! 你已经完成了向量数据库从入门到生产实战的完整学习路径。从 Embedding 的基本概念到构建生产级 RAG 系统,你已经掌握了向量数据库的核心知识和实战技能。接下来,去用你学到的知识构建自己的 AI 应用吧!

坚持是一种品格