Skip to content

7.3 性能优化

LLM 应用的性能瓶颈不在 CPU,而在 Token 成本、API 延迟和并发吞吐。优化的核心是「少调用、快响应、省成本」。

学习时长:2-3 周


LLM 应用的性能优化目标是三个维度的平衡:低延迟(用户体验)、低成本(API 费用)、高吞吐(服务容量)。三者往往相互制约,需要结合业务场景做取舍。本节覆盖缓存、并发、Token 压缩到可观测性的完整工程实践。


7.3.1 缓存策略

缓存层次架构

请求 → [L1: 精确匹配缓存] → [L2: 语义相似缓存] → [L3: Prompt 前缀缓存] → LLM API
         (Redis, ~1ms)       (向量检索, ~10ms)      (API 侧, 自动)         (500-3000ms)

1. 精确匹配缓存(Redis)

python
# pip install redis orjson   (hashlib is part of the standard library; do not pip-install it)

import redis
import hashlib
import orjson
import time
from typing import Optional

# Shared Redis connection used by the cache classes below.
# decode_responses=False: values come back as raw bytes and are decoded with orjson.
redis_client = redis.Redis(host="localhost", port=6379, db=4, decode_responses=False)

class ExactMatchCache:
    """Exact-match cache: an identical request returns the stored response.

    Each entry is a single orjson-encoded string value stored under
    ``llm:exact:<sha256>``; the hash covers the messages, the model name and
    the temperature. Only near-deterministic requests (``temperature <= 0.1``)
    are read from or written to the cache.
    """

    def __init__(self, ttl: int = 3600, max_input_length: int = 500):
        self.ttl = ttl
        # Stored for callers; no method of this class consults it.
        self.max_input_length = max_input_length
        self.prefix = "llm:exact:"

    def _make_key(self, messages: list, model: str, temperature: float) -> str:
        """Build the cache key from a hash of the messages and model parameters."""
        # Sorted keys make the serialization (and therefore the hash) stable.
        payload = orjson.dumps({
            "messages": messages,
            "model": model,
            "temperature": temperature
        }, option=orjson.OPT_SORT_KEYS)
        return self.prefix + hashlib.sha256(payload).hexdigest()

    def get(self, messages: list, model: str, temperature: float = 0) -> Optional[dict]:
        """Return the cached entry dict, or None on a miss or a non-cacheable request."""
        if temperature > 0.1:   # sampled (random) outputs are never cached
            return None
        key = self._make_key(messages, model, temperature)
        raw = redis_client.get(key)
        if raw:
            redis_client.expire(key, self.ttl)  # sliding expiration on every hit
            return orjson.loads(raw)
        return None

    def set(self, messages: list, model: str, temperature: float,
            response: dict, usage: dict):
        """Store ``response`` and ``usage`` for this request with the configured TTL."""
        if temperature > 0.1:
            return
        key = self._make_key(messages, model, temperature)
        data = {
            "response": response,
            "usage": usage,
            "cached_at": int(time.time()),
            "hit_count": 0
        }
        redis_client.setex(key, self.ttl, orjson.dumps(data))

    def increment_hit(self, messages: list, model: str, temperature: float):
        """Increment the ``hit_count`` field stored inside the cached entry.

        The entry is a plain string value (written with SETEX), so a hash
        command such as HINCRBY would be rejected by Redis with WRONGTYPE.
        The entry is therefore decoded, updated and written back, keeping its
        remaining TTL. The update is not atomic; concurrent hits may be
        under-counted.
        """
        if temperature > 0.1:
            return
        key = self._make_key(messages, model, temperature)
        raw = redis_client.get(key)
        if not raw:
            return  # entry missing or already expired: nothing to count
        data = orjson.loads(raw)
        data["hit_count"] = data.get("hit_count", 0) + 1
        remaining = redis_client.ttl(key)
        # TTL returns a negative value when the key has no expiry or is gone.
        expiry = remaining if remaining and remaining > 0 else self.ttl
        redis_client.setex(key, expiry, orjson.dumps(data))

# Module-level cache instance with a 2-hour (7200 s) TTL.
exact_cache = ExactMatchCache(ttl=7200)

2. 语义相似缓存

python
# pip install numpy openai redis

import numpy as np
from openai import OpenAI

# Synchronous OpenAI client, constructed with default arguments.
client = OpenAI()

class SemanticCache:
    """Semantic cache: reuse the answer of a sufficiently similar earlier query.

    Every entry (query excerpt, embedding, response) is stored as an
    orjson-encoded string under ``llm:semantic:<md5>``; a Redis list at
    ``llm:semantic:index`` records the entry keys. Lookup is a linear scan
    over that list, so this is only suitable for small caches; a vector
    database should replace it in production.
    """

    # Number of entries fetched per MGET round trip during a lookup.
    _SCAN_BATCH = 200

    def __init__(
        self,
        similarity_threshold: float = 0.92,
        ttl: int = 3600,
        max_cache_size: int = 10000
    ):
        self.threshold = similarity_threshold
        self.ttl = ttl
        self.max_cache_size = max_cache_size
        self.prefix = "llm:semantic:"
        self.index_key = "llm:semantic:index"

    def _embed(self, text: str) -> np.ndarray:
        """Return the embedding vector of ``text`` as float32."""
        resp = client.embeddings.create(
            model="text-embedding-3-small",
            input=text[:2000]           # cap the input length to bound cost
        )
        return np.array(resp.data[0].embedding, dtype=np.float32)

    def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity; the epsilon avoids division by zero."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))

    def get(self, query: str) -> tuple[Optional[str], float]:
        """Look up the most similar cached query.

        Returns:
            ``(response, similarity)`` when the best match reaches the
            threshold, otherwise ``(None, 0.0)``.
        """
        query_vec = self._embed(query)
        keys = redis_client.lrange(self.index_key, 0, -1)

        best_key, best_sim, best_response = None, 0.0, None
        for offset in range(0, len(keys), self._SCAN_BATCH):
            chunk = keys[offset:offset + self._SCAN_BATCH]
            # One round trip per chunk instead of one GET per key.
            for key, raw in zip(chunk, redis_client.mget(chunk)):
                if not raw:
                    # Entry expired: drop its stale key from the index.
                    redis_client.lrem(self.index_key, 0, key)
                    continue
                entry = orjson.loads(raw)
                cached_vec = np.array(entry["embedding"], dtype=np.float32)
                sim = self._cosine_sim(query_vec, cached_vec)
                if sim > best_sim:
                    # Keep the response now; re-reading the key later could
                    # race with its expiry and return None.
                    best_sim, best_key, best_response = sim, key, entry["response"]

        if best_key is not None and best_sim >= self.threshold:
            redis_client.expire(best_key, self.ttl)
            return best_response, best_sim

        return None, 0.0

    def set(self, query: str, response: str):
        """Store ``response`` for ``query`` and register the entry in the index."""
        import hashlib
        # md5 only derives a short key name here; it is not a security measure.
        key = self.prefix + hashlib.md5(query.encode()).hexdigest()
        vec = self._embed(query)
        entry = {
            "query": query[:200],
            "embedding": vec.tolist(),
            "response": response,
            "created_at": int(time.time())
        }
        redis_client.setex(key, self.ttl, orjson.dumps(entry))

        # Maintain the index: no duplicate keys, bounded length.
        redis_client.lrem(self.index_key, 0, key)
        redis_client.lpush(self.index_key, key)
        redis_client.ltrim(self.index_key, 0, self.max_cache_size - 1)

# Module-level semantic cache; matches require cosine similarity >= 0.92.
semantic_cache = SemanticCache(similarity_threshold=0.92)

3. OpenAI Prompt 缓存(自动,无需额外代码)

python
# OpenAI 对超过 1024 tokens 的相同前缀自动缓存,费率降低 50%
# 利用规则:将静态内容(系统提示、RAG 文档)放在消息最前面

def build_cache_friendly_messages(
    system_prompt: str,
    retrieved_docs: list[str],
    conversation_history: list[dict],
    user_message: str,
    max_history_messages: int = 6
) -> list[dict]:
    """Build a message list ordered so provider-side prefix caching can hit.

    Static, reusable content goes first and per-request content goes last.

    Args:
        system_prompt: System prompt; should be identical across requests.
        retrieved_docs: Retrieved documents, injected as a synthetic
            user/assistant exchange right after the system prompt.
        conversation_history: Earlier chat messages, oldest first.
        user_message: The current user input, always the final message.
        max_history_messages: How many trailing history messages to keep
            (default 6, i.e. the last 3 turns). 0 or less keeps none.

    Returns:
        The assembled list of chat messages.
    """
    # 1. System prompt: fully static, so it forms the most cacheable prefix.
    messages = [{"role": "system", "content": system_prompt}]

    # 2. Retrieved documents: identical document sets reuse the same prefix.
    if retrieved_docs:
        docs_text = "\n\n".join(
            f"[文档{i}]\n{doc}" for i, doc in enumerate(retrieved_docs, start=1)
        )
        messages.append({
            "role": "user",
            "content": f"参考以下文档回答问题:\n\n{docs_text}"
        })
        messages.append({
            "role": "assistant",
            "content": "好的,我已阅读参考文档,请提问。"
        })

    # 3. Recent history. Guard the zero case explicitly: history[-0:] would
    #    return the whole list instead of nothing.
    if max_history_messages > 0:
        messages.extend(conversation_history[-max_history_messages:])

    # 4. Current user input: fully dynamic, therefore last.
    messages.append({"role": "user", "content": user_message})

    return messages

# Anthropic Claude 显式 Prompt Cache(cost↓74%,延迟↓85%)
def build_claude_cached_messages(system_prompt: str, docs: list[str], question: str):
    """Call Claude with explicit prompt-cache breakpoints and return the reply text.

    The system prompt and, when present, the joined documents are each tagged
    with ``cache_control`` of type ``ephemeral``; the question is sent untagged.
    Cache read/creation token counts from the response usage are printed.
    """
    import anthropic

    api = anthropic.Anthropic()

    # The question always closes the user turn and is never marked for caching.
    question_block = {"type": "text", "text": question}
    if docs:
        joined_docs = "\n\n".join(docs)
        docs_block = {
            "type": "text",
            "text": f"参考文档:\n{joined_docs}",
            "cache_control": {"type": "ephemeral"},  # cache breakpoint after the documents
        }
        content_blocks = [docs_block, question_block]
    else:
        content_blocks = [question_block]

    reply = api.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system=[{
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"},  # cache breakpoint after the system prompt
        }],
        messages=[{"role": "user", "content": content_blocks}],
    )

    # Report how much of the prompt was served from / written to the cache.
    stats = reply.usage
    print(f"缓存读取 tokens: {getattr(stats, 'cache_read_input_tokens', 0)}")
    print(f"缓存创建 tokens: {getattr(stats, 'cache_creation_input_tokens', 0)}")
    return reply.content[0].text

7.3.2 并发与异步处理

1. 异步批量 LLM 调用

python
# pip install openai   (asyncio is part of the standard library; do not pip-install it)

import asyncio
from openai import AsyncOpenAI
from typing import Any

# Asynchronous OpenAI client used by the coroutines below.
aclient = AsyncOpenAI()

async def call_llm_single(
    messages: list,
    model: str = "gpt-4o-mini",
    semaphore: asyncio.Semaphore = None,
    **kwargs
) -> dict:
    """Issue one async chat completion, optionally gated by a semaphore.

    Returns a dict with the first choice's text under ``content`` and the
    usage object dumped to a dict under ``usage``.
    """
    async def _request():
        # Single place where the API call is spelled out.
        return await aclient.chat.completions.create(
            model=model, messages=messages, **kwargs
        )

    if not semaphore:
        completion = await _request()
    else:
        # Hold a semaphore slot only for the duration of the request.
        async with semaphore:
            completion = await _request()

    first_choice = completion.choices[0]
    return {
        "content": first_choice.message.content,
        "usage": completion.usage.model_dump()
    }

async def batch_llm_calls(
    batch: list[dict],         # [{"messages": [...], "meta": {...}}, ...]
    model: str = "gpt-4o-mini",
    max_concurrency: int = 10, # upper bound on in-flight requests (avoids rate limits)
    **kwargs
) -> list[dict]:
    """Run a batch of LLM calls concurrently with a concurrency cap.

    Args:
        batch: Items with a ``messages`` list and an optional ``meta`` dict.
        model: Model name forwarded to every call.
        max_concurrency: Maximum number of simultaneous requests.
        **kwargs: Extra arguments forwarded to the completion call.

    Returns:
        One dict per input item, in input order. Successful items carry
        ``content``, ``usage`` and ``meta``; failed items carry ``error``
        and ``meta``.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def call_with_meta(item: dict) -> dict:
        result = await call_llm_single(
            item["messages"], model, semaphore, **kwargs
        )
        return {**result, "meta": item.get("meta", {})}

    tasks = [call_with_meta(item) for item in batch]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # gather(return_exceptions=True) can also hand back CancelledError, which
    # derives from BaseException rather than Exception; test for the base
    # class so such an item becomes an error entry instead of leaking an
    # exception object into the results.
    output = []
    for item, result in zip(batch, results):
        if isinstance(result, BaseException):
            message = str(result) or type(result).__name__
            output.append({"error": message, "meta": item.get("meta", {})})
        else:
            output.append(result)

    return output

# 使用示例:批量翻译
async def batch_translate(texts: list[str], target_lang: str = "英文") -> list[str]:
    """Translate every text concurrently; results are returned in input order.

    A failed item yields its error message (or an empty string) in place of
    the translation.
    """
    requests = []
    for position, source_text in enumerate(texts):
        prompt = f"将以下内容翻译为{target_lang},只输出译文:\n{source_text}"
        requests.append({
            "messages": [{"role": "user", "content": prompt}],
            "meta": {"index": position, "original": source_text},
        })

    replies = await batch_llm_calls(requests, model="gpt-4o-mini", max_concurrency=8)

    # Restore the original ordering using the index carried in the metadata.
    replies.sort(key=lambda reply: reply["meta"].get("index", 0))

    translated = []
    for reply in replies:
        if "content" in reply:
            translated.append(reply["content"])
        else:
            translated.append(reply.get("error", ""))
    return translated

# Run the example; asyncio.run() executes the coroutine to completion.
texts = ["你好", "人工智能很有趣", "今天天气不错"]
translations = asyncio.run(batch_translate(texts))

2. 连接池与客户端复用

python
import httpx
from openai import AsyncOpenAI
from contextlib import asynccontextmanager

# Process-wide HTTP connection pool and OpenAI client, created lazily by the
# getters below so that requests do not each open a new connection.
_http_client: httpx.AsyncClient | None = None
_openai_client: AsyncOpenAI | None = None

def get_http_client() -> httpx.AsyncClient:
    """Return the shared httpx client, creating it when absent or closed."""
    global _http_client

    still_usable = _http_client is not None and not _http_client.is_closed
    if still_usable:
        return _http_client

    # 60 s overall timeout with a 5 s connect timeout.
    request_timeout = httpx.Timeout(60.0, connect=5.0)
    # Pool limits: 100 connections in total, 20 kept alive for up to 30 s.
    pool_limits = httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
        keepalive_expiry=30,
    )
    _http_client = httpx.AsyncClient(timeout=request_timeout, limits=pool_limits)
    return _http_client

def get_openai_client() -> AsyncOpenAI:
    """Return the shared AsyncOpenAI client bound to the shared HTTP pool.

    The client is rebuilt when the underlying httpx client is missing or has
    been closed (for example after application shutdown); otherwise the
    cached OpenAI client would keep pointing at a closed transport.
    """
    global _openai_client
    transport_stale = _http_client is None or _http_client.is_closed
    if _openai_client is None or transport_stale:
        _openai_client = AsyncOpenAI(
            http_client=get_http_client(),
            max_retries=2,
            timeout=60.0
        )
    return _openai_client

# FastAPI 生命周期中初始化和清理
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: create the shared clients on startup and
    release them on shutdown.

    Cleanup runs in ``finally`` so the pool is closed even when an exception
    is thrown into the context manager. The globals are reset afterwards so
    a later startup builds fresh clients instead of reusing closed ones.
    """
    global _http_client, _openai_client
    # Startup: instantiate the clients up front.
    get_openai_client()
    try:
        yield
    finally:
        # Shutdown: release pooled connections.
        if _http_client is not None and not _http_client.is_closed:
            await _http_client.aclose()
        _http_client = None
        _openai_client = None

3. 请求队列与背压控制

python
import asyncio
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class QueuedRequest:
    """A pending LLM request together with the future its caller awaits.

    The default factories resolve the event loop when an instance is
    created, not when the class is defined: calling
    ``asyncio.get_event_loop()`` at import time binds to whatever loop exists
    then (or fails when none does), so futures could end up attached to a
    loop other than the one that awaits them. Instances relying on these
    defaults must be created while an event loop is running.
    """
    request_id: str
    messages: list
    model: str
    priority: int = 0          # larger value = served earlier
    future: asyncio.Future = field(
        default_factory=lambda: asyncio.get_running_loop().create_future()
    )
    # Creation time on the running loop's monotonic clock.
    created_at: float = field(
        default_factory=lambda: asyncio.get_running_loop().time()
    )

class LLMRequestQueue:
    """LLM request queue: caps concurrency, honours priorities, rejects overload.

    Requests wait in a bounded priority queue. A single worker task pops
    them and spawns one processing task per request; a semaphore limits how
    many of those talk to the API at the same time.
    """

    def __init__(
        self,
        max_concurrency: int = 10,
        max_queue_size: int = 100,
        timeout: float = 60.0
    ):
        self.max_concurrency = max_concurrency
        self.max_queue_size = max_queue_size
        self.timeout = timeout
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(max_queue_size)
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._running = False
        # Monotonic tie-breaker: keeps equal-priority items FIFO and stops the
        # heap from comparing QueuedRequest objects, which define no ordering.
        self._seq = 0
        # Strong references to background tasks; the event loop itself only
        # keeps weak references to tasks.
        self._worker_task = None
        self._inflight: set = set()

    async def start(self):
        """Start the worker task; calling it again while running is a no-op."""
        if self._running:
            return
        self._running = True
        self._worker_task = asyncio.create_task(self._worker())

    async def _worker(self):
        """Pop queued requests and hand each one to its own processing task."""
        while self._running:
            try:
                # The 1 s timeout lets the loop re-check self._running.
                _, _, req = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            task = asyncio.create_task(self._process(req))
            self._inflight.add(task)
            task.add_done_callback(self._inflight.discard)

    async def _process(self, req: QueuedRequest):
        """Execute one request and resolve its future with the text or the error."""
        async with self._semaphore:
            try:
                client = get_openai_client()
                response = await asyncio.wait_for(
                    client.chat.completions.create(
                        model=req.model, messages=req.messages
                    ),
                    timeout=self.timeout
                )
                content = response.choices[0].message.content
            except Exception as e:
                # The submitter may already have cancelled the future.
                if not req.future.done():
                    req.future.set_exception(e)
            else:
                if not req.future.done():
                    req.future.set_result(content)

    async def submit(self, messages: list, model: str = "gpt-4o-mini",
                     priority: int = 0) -> str:
        """Enqueue a request and wait for its result.

        Raises:
            RuntimeError: If the queue is already full.
            Exception: Whatever error the underlying API call raised.
        """
        if self._queue.full():
            raise RuntimeError("请求队列已满,请稍后重试")

        loop = asyncio.get_running_loop()
        req = QueuedRequest(
            request_id=str(id(messages)),
            messages=messages,
            model=model,
            priority=priority,
            future=loop.create_future()
        )
        self._seq += 1
        # Negated priority turns the min-heap into a max-priority queue.
        await self._queue.put((-priority, self._seq, req))
        return await req.future

# Module-level queue instance; no worker runs until `await request_queue.start()` is called.
request_queue = LLMRequestQueue(max_concurrency=10, max_queue_size=100)

7.3.3 Token 用量优化与成本控制

1. 上下文压缩

python
from openai import OpenAI
import tiktoken

# Synchronous OpenAI client used for the history-summary call below.
client = OpenAI()

def count_tokens(messages: list, model: str = "gpt-4o") -> int:
    """Estimate the token count of a chat message list.

    The total is a base of 3, plus 4 per message, plus the encoded length of
    each message's text. ``content`` may be a string or a list of blocks;
    only blocks of type ``text`` are counted. Falls back to the
    ``cl100k_base`` encoding when tiktoken does not know the model.
    """
    try:
        encoder = tiktoken.encoding_for_model(model)
    except KeyError:
        encoder = tiktoken.get_encoding("cl100k_base")

    def _content_tokens(content) -> int:
        # Plain string content.
        if isinstance(content, str):
            return len(encoder.encode(content))
        # Multi-part content: count only the text parts.
        if isinstance(content, list):
            return sum(
                len(encoder.encode(part.get("text", "")))
                for part in content
                if part.get("type") == "text"
            )
        return 0

    base_overhead = 3
    per_message_overhead = 4
    return base_overhead + sum(
        per_message_overhead + _content_tokens(message.get("content", ""))
        for message in messages
    )

def compress_conversation_history(
    messages: list,
    max_tokens: int = 3000,
    model: str = "gpt-4o-mini",
    keep_recent_turns: int = 3     # always keep the last N turns verbatim
) -> list:
    """Compress the chat history once it exceeds ``max_tokens``.

    Strategy: keep the system messages and the most recent turns, and replace
    the older messages with a short LLM-generated summary.

    Args:
        messages: Full chat history, oldest first.
        max_tokens: Token budget above which compression is triggered.
        model: Model name used for token counting.
        keep_recent_turns: Number of trailing turns (two messages each) kept
            verbatim. 0 summarizes every non-system message.

    Returns:
        ``messages`` unchanged when it is empty, within budget, or has nothing
        old enough to summarize; otherwise system messages + summary + recent.
    """
    if not messages:
        return messages

    # Within budget: nothing to do.
    if count_tokens(messages, model) <= max_tokens:
        return messages

    system_msgs = [m for m in messages if m["role"] == "system"]
    non_system = [m for m in messages if m["role"] != "system"]

    # Split by explicit index: a negative slice with 0 (``[-0:]``) would keep
    # everything and summarize nothing.
    keep = max(keep_recent_turns, 0) * 2
    split_at = max(len(non_system) - keep, 0)
    older, recent = non_system[:split_at], non_system[split_at:]

    if not older:
        return messages  # nothing left to compress

    def _as_text(content) -> str:
        # Content may be a string, a list of blocks, or None.
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return " ".join(
                block.get("text", "")
                for block in content
                if isinstance(block, dict) and block.get("type") == "text"
            )
        return ""

    # Each older message contributes at most 200 characters to the summary input.
    older_text = "\n".join([
        f"{'用户' if m['role']=='user' else 'AI'}: {_as_text(m.get('content'))[:200]}"
        for m in older
    ])

    summary_response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "请用3-5句话总结以下对话的关键信息,保留重要事实:"},
            {"role": "user", "content": older_text}
        ],
        max_tokens=300
    )
    summary = summary_response.choices[0].message.content

    # Assemble the compressed message list.
    summary_msg = {
        "role": "system",
        "content": f"[早期对话摘要]\n{summary}"
    }

    compressed = system_msgs + [summary_msg] + recent
    before = count_tokens(messages, model)
    after = count_tokens(compressed, model)
    print(f"历史压缩: {before} → {after} tokens")
    return compressed

def trim_system_prompt(system_prompt: str, max_chars: int = 2000) -> str:
    """Trim an over-long system prompt, keeping its beginning and its end.

    Prompts of at most ``max_chars`` characters are returned unchanged.
    Longer prompts keep ``max_chars // 2`` characters from each end, joined
    by an omission marker, so the result is longer than ``max_chars`` by the
    marker's length. When the budget is too small to keep both ends
    (``max_chars`` <= 1), the prompt is simply cut to ``max_chars``.
    """
    if len(system_prompt) <= max_chars:
        return system_prompt
    if max_chars <= 0:
        return ""
    half = max_chars // 2
    if half == 0:
        # system_prompt[-0:] would return the entire prompt, so cut plainly.
        return system_prompt[:max_chars]
    # The beginning and the end usually carry the most important instructions.
    return system_prompt[:half] + "\n...[已省略中间部分]...\n" + system_prompt[-half:]

2. 成本追踪与预算告警

python
import redis
import time
from dataclasses import dataclass

# Price per 1K tokens for each model (USD; 2025 reference prices)
MODEL_PRICING = {
    "gpt-4o":               {"input": 0.005,   "output": 0.015},
    "gpt-4o-mini":          {"input": 0.00015, "output": 0.0006},
    "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
    "claude-3-5-haiku-20241022":  {"input": 0.0008, "output": 0.004},
    "qwen-plus":            {"input": 0.0004,  "output": 0.0012},
    "qwen-turbo":           {"input": 0.00008, "output": 0.0002},
}

# Redis connection for usage accounting (db=5, values returned as bytes).
# NOTE(review): if these snippets share one module, this rebinds the
# `redis_client` name created earlier with db=4 — confirm that is intended.
redis_client = redis.Redis(host="localhost", port=6379, db=5)

@dataclass
class UsageRecord:
    """Plain record of one API call's token usage and cost."""
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    user_id: str
    timestamp: float

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the cost of one API call in US dollars.

    Prices are looked up per 1K tokens; models missing from the table fall
    back to 0.01 (input) and 0.03 (output).
    """
    fallback_rates = {"input": 0.01, "output": 0.03}
    rates = MODEL_PRICING.get(model, fallback_rates)
    per_thousand_total = input_tokens * rates["input"] + output_tokens * rates["output"]
    return per_thousand_total / 1000

def record_usage(user_id: str, model: str, input_tokens: int, output_tokens: int):
    """Record one call's usage in Redis and enforce the daily budget.

    Per-user cost and token counters and the global cost counter for the
    current local date are incremented in one pipeline. The user's new daily
    total is then passed to the budget check.

    Returns:
        The cost of this call in US dollars.

    Raises:
        RuntimeError: Propagated from the budget check when the user's daily
            spend exceeds the limit.
    """
    cost = calculate_cost(model, input_tokens, output_tokens)
    today = time.strftime("%Y-%m-%d")

    user_cost_key = f"cost:user:{user_id}:{today}"
    global_cost_key = f"cost:global:{today}"
    user_tokens_key = f"tokens:user:{user_id}:{today}"

    # Accumulate usage in a single round trip.
    pipe = redis_client.pipeline()
    pipe.incrbyfloat(user_cost_key, cost)
    pipe.incrbyfloat(global_cost_key, cost)
    pipe.incrby(user_tokens_key, input_tokens + output_tokens)
    pipe.expire(user_cost_key, 86400 * 7)
    # The token counter needs a TTL too, otherwise these keys pile up forever.
    pipe.expire(user_tokens_key, 86400 * 7)
    pipe.expire(global_cost_key, 86400 * 30)
    results = pipe.execute()

    # INCRBYFLOAT already returns the new total, so no extra GET is needed.
    daily_cost = float(results[0])
    _check_budget_alert(user_id, daily_cost)

    return cost

# Daily budget per user tier, in US dollars.
BUDGET_THRESHOLDS = {
    "free": 0.1,       # free tier: $0.1 per day
    "pro": 5.0,        # paid tier: $5 per day
    "enterprise": 100  # enterprise tier: $100 per day
}

def _check_budget_alert(user_id: str, current_cost: float):
    """Warn above 80% of the daily budget and raise once it is exceeded.

    Raises:
        RuntimeError: When ``current_cost`` is greater than the tier's limit.
    """
    tier = "free"  # placeholder; a real system would load the tier from a database
    daily_limit = BUDGET_THRESHOLDS.get(tier, 0.1)

    # Soft warning at 80% of the budget.
    if current_cost > daily_limit * 0.8:
        used_percent = current_cost / daily_limit * 100
        print(f"⚠️  用户 {user_id} 已使用 {used_percent:.0f}% 的日预算")

    # Hard stop once the budget is exceeded.
    if current_cost > daily_limit:
        raise RuntimeError(f"已超出日预算限制(${daily_limit}),请升级套餐或明天再试")

def get_cost_report(user_id: str, days: int = 7) -> dict:
    """Return per-day cost and token totals for the last ``days`` days.

    The result maps a local date string (``YYYY-MM-DD``) to a dict with
    ``cost_usd`` rounded to 4 decimals and ``tokens``; missing counters
    count as 0.
    """
    report = {}
    for days_back in range(days):
        # Step back in whole 24-hour increments from the current time.
        moment = time.localtime(time.time() - days_back * 86400)
        day = time.strftime("%Y-%m-%d", moment)

        stored_cost = redis_client.get(f"cost:user:{user_id}:{day}")
        spent = float(stored_cost or 0)
        stored_tokens = redis_client.get(f"tokens:user:{user_id}:{day}")
        used = int(stored_tokens or 0)

        report[day] = {"cost_usd": round(spent, 4), "tokens": used}
    return report

3. Prompt 工程优化(减少 Token)

python
def optimize_prompt_tokens(
    system_prompt: str,
    user_message: str,
    remove_filler: bool = True,
    use_abbreviations: bool = True
) -> tuple[str, str]:
    """Shrink the system prompt's token count while keeping its meaning.

    With ``remove_filler`` a fixed list of filler phrases is deleted and runs
    of three or more newlines collapse to two. With ``use_abbreviations`` a
    few wordy phrases are replaced by shorter ones. The user message is only
    stripped of surrounding whitespace.

    Returns:
        ``(system_prompt, user_message)`` after optimization.
    """
    import re

    prompt = system_prompt

    if remove_filler:
        # Filler phrases are removed one pattern at a time, in this order.
        fillers = (
            r"请注意,?",
            r"请记住,?",
            r"你必须始终",
            r"在任何情况下都",
            r"非常重要的是,?",
        )
        for filler in fillers:
            prompt = re.sub(filler, "", prompt)

        # Collapse excess blank lines.
        prompt = re.sub(r"\n{3,}", "\n\n", prompt.strip())

    if use_abbreviations:
        # (verbose phrase, concise replacement) pairs, applied in order.
        shorter_forms = (
            ("你是一个非常专业且经验丰富的", "你是专业的"),
            ("请确保你的回答", "回答需"),
            ("以清晰、结构化的方式", "清晰结构化地"),
        )
        for verbose, concise in shorter_forms:
            prompt = prompt.replace(verbose, concise)

    return prompt.strip(), user_message.strip()

def select_model_by_complexity(user_message: str, context_tokens: int) -> str:
    """Pick a model from a rough complexity estimate, to save cost.

    Simple messages (greetings, very short replies, basic arithmetic, short
    translation requests) always use the small model. Otherwise the strong
    model is chosen for a large context (> 8000 tokens), a long message
    (> 1000 characters) or at least three complexity signals; everything
    else uses the small model.

    Returns:
        ``"gpt-4o"`` or ``"gpt-4o-mini"``.
    """
    import re

    text = user_message.strip()

    # Simple-message patterns: go straight to the small model.
    simple_patterns = [
        r"^(你好|hello|hi|谢谢|感谢).*$",
        # NOTE(review): the source pattern here contained several empty
        # alternatives, which looks like lost characters. This is the explicit
        # equivalent of what it matched: any message of at most 5 characters,
        # optionally preceded by one of the two surviving words. Restore the
        # intended short-reply words if they are known.
        r"^(?:不对|好的)?.{0,5}$",
        r"^\d+[\+\-\*\/]\d+",           # simple arithmetic
        r"^(翻译|translate).{0,20}$",    # short translation request
    ]
    for pattern in simple_patterns:
        if re.match(pattern, text, re.IGNORECASE):
            return "gpt-4o-mini"

    # Complexity signals; compared case-insensitively so "Debug" also counts.
    complex_signals = [
        "代码", "分析", "设计", "架构", "推理", "证明",
        "比较", "评估", "优化", "debug", "为什么"
    ]
    lowered = user_message.lower()
    complexity_score = sum(1 for signal in complex_signals if signal in lowered)

    if context_tokens > 8000 or complexity_score >= 3 or len(user_message) > 1000:
        return "gpt-4o"
    # Every remaining case uses the small model.
    return "gpt-4o-mini"

7.3.4 监控与可观测性

1. Langfuse 集成(开源 LLM 可观测平台)

python
# pip install langfuse openai

from langfuse import Langfuse
from langfuse.openai import openai  # 自动 patch OpenAI 客户端
import time

# Langfuse client. The key values here are placeholders.
langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"  # or the address of a self-hosted instance
)

# Option 1: automatic tracing (langfuse.openai is used in place of the standard openai module)
def chat_with_tracing(user_id: str, session_id: str, message: str) -> str:
    """Send one user message through the Langfuse-wrapped client and return the reply text."""
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": message}],
        # Langfuse metadata passed alongside the normal request arguments
        name="user-chat",
        metadata={"user_id": user_id, "session_id": session_id},
        tags=["production", "chat"],
        user_id=user_id,
        session_id=session_id
    )
    return response.choices[0].message.content

# Option 2: manual tracing (finer-grained control)
def rag_pipeline_with_tracing(question: str, user_id: str) -> str:
    """Trace each step of a RAG pipeline by hand and return the answer text."""
    trace = langfuse.trace(
        name="rag-pipeline",
        user_id=user_id,
        input={"question": question},
        tags=["rag", "production"]
    )

    # Span covering the vector-retrieval step
    retrieval_span = trace.span(name="vector-retrieval")
    t0 = time.time()
    # ... vector retrieval would run here; the documents below are placeholders ...
    docs = ["文档1", "文档2"]
    retrieval_span.end(
        output={"docs_count": len(docs), "latency_ms": int((time.time()-t0)*1000)}
    )

    # Generation record covering the LLM step
    llm_gen = trace.generation(
        name="llm-generation",
        model="gpt-4o-mini",
        input=[{"role": "user", "content": question}],
    )
    # An unpatched OpenAI client is created for this call
    from openai import OpenAI as RawOpenAI
    raw_client = RawOpenAI()
    response = raw_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}]
    )
    answer = response.choices[0].message.content

    # Close the generation with the answer and the token usage
    llm_gen.end(
        output=answer,
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens
        }
    )

    # Record the overall trace output
    trace.update(output={"answer": answer})
    return answer

# Attach user feedback to a trace (used to evaluate model quality)
def record_user_feedback(trace_id: str, score: int, comment: str = ""):
    """Record a thumbs-up/down score for a trace (1 = good, 0 = bad)."""
    langfuse.score(
        trace_id=trace_id,
        name="user-feedback",
        value=score,
        comment=comment
    )

2. 自定义 Prometheus 指标

python
# pip install prometheus-client

from prometheus_client import (
    Counter, Histogram, Gauge, Summary,
    start_http_server, REGISTRY
)
import functools
import time

# Metric definitions
# Request counter, labelled by model and outcome.
llm_requests_total = Counter(
    "llm_requests_total",
    "LLM API 请求总数",
    ["model", "status"]        # labels: model x success/error status
)

# Latency histogram in seconds, labelled by model.
llm_latency_seconds = Histogram(
    "llm_latency_seconds",
    "LLM API 响应延迟(秒)",
    ["model"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60]
)

# Token counter, labelled by model and direction.
llm_tokens_total = Counter(
    "llm_tokens_total",
    "累计消耗 token 数",
    ["model", "type"]          # type: input / output
)

# Accumulated API cost in US dollars, labelled by model.
llm_cost_usd_total = Counter(
    "llm_cost_usd_total",
    "累计 API 成本(美元)",
    ["model"]
)

# Number of LLM requests currently in flight.
active_requests = Gauge(
    "llm_active_requests",
    "当前进行中的 LLM 请求数"
)

# Summary fed with one observation per request by the service code.
cache_hit_rate = Summary(
    "llm_cache_hit_ratio",
    "缓存命中率统计"
)

def instrument_llm_call(func):
    """Decorator that records Prometheus metrics for an async LLM call.

    For every call it tracks the in-flight gauge, the request counter
    (labelled with the model and ``success``/``error``) and the latency
    histogram. The wrapped coroutine must accept ``model`` as a keyword
    argument.
    """
    @functools.wraps(func)
    async def wrapper(*args, model="gpt-4o-mini", **kwargs):
        active_requests.inc()
        # Monotonic clock: unaffected by wall-clock adjustments.
        start = time.perf_counter()
        # Start pessimistic and flip only after the call returned, so that any
        # failure, including task cancellation (a BaseException, not an
        # Exception), is never counted as a success.
        status = "error"

        try:
            result = await func(*args, model=model, **kwargs)
            status = "success"
            return result
        finally:
            elapsed = time.perf_counter() - start
            active_requests.dec()
            llm_requests_total.labels(model=model, status=status).inc()
            llm_latency_seconds.labels(model=model).observe(elapsed)

    return wrapper

# 暴露指标端点(FastAPI)
from fastapi import FastAPI
from fastapi.responses import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

# FastAPI application that serves the metrics endpoint.
app = FastAPI()

@app.get("/metrics")
async def metrics():
    """Prometheus scrape endpoint."""
    # Serialize every metric in the default registry in the Prometheus text format.
    return Response(generate_latest(REGISTRY), media_type=CONTENT_TYPE_LATEST)

3. 结构化日志与链路追踪

python
# pip install structlog opentelemetry-sdk opentelemetry-exporter-otlp

import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Initialize OpenTelemetry tracing: spans are batched and exported over OTLP/gRPC
# to the endpoint configured below.
tracer_provider = TracerProvider()
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-service")

# Structured logging configuration: merge context variables, add an ISO
# timestamp and the log level, then render each event as JSON.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.stdlib.add_log_level,
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger()

async def traced_llm_call(
    messages: list,
    model: str,
    user_id: str,
    session_id: str
) -> str:
    """Run one chat completion inside an OpenTelemetry span with structured logs.

    The span carries the model, user, session, token counts and latency. The
    trace id, user id and session id are bound to the structlog context only
    for the duration of the call.

    Returns:
        The text of the first completion choice.

    Raises:
        Exception: Any error from the API call, re-raised after being
            recorded on the span and logged.
    """
    with tracer.start_as_current_span("llm.chat") as span:
        span.set_attribute("llm.model", model)
        span.set_attribute("user.id", user_id)
        span.set_attribute("session.id", session_id)
        span.set_attribute("llm.input_messages", len(messages))

        log_context = {
            "trace_id": format(span.get_span_context().trace_id, "032x"),
            "user_id": user_id,
            "session_id": session_id,
        }

        # bound_contextvars restores the previous values on exit. Calling
        # clear_contextvars() instead would also wipe context bound by callers
        # further up the stack.
        with structlog.contextvars.bound_contextvars(**log_context):
            # Monotonic clock for measuring the duration.
            start = time.perf_counter()
            client = get_openai_client()

            try:
                response = await client.chat.completions.create(
                    model=model, messages=messages
                )
                elapsed_ms = int((time.perf_counter() - start) * 1000)

                span.set_attribute("llm.input_tokens", response.usage.prompt_tokens)
                span.set_attribute("llm.output_tokens", response.usage.completion_tokens)
                span.set_attribute("llm.latency_ms", elapsed_ms)

                logger.info(
                    "llm_call_success",
                    model=model,
                    input_tokens=response.usage.prompt_tokens,
                    output_tokens=response.usage.completion_tokens,
                    latency_ms=elapsed_ms,
                    cost_usd=calculate_cost(model, response.usage.prompt_tokens,
                                            response.usage.completion_tokens)
                )

                return response.choices[0].message.content

            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR, str(e))
                logger.error("llm_call_failed", model=model, error=str(e))
                raise

7.3.5 综合实战:性能优化仪表板

python
"""
性能优化效果量化:缓存命中率、延迟分布、成本趋势统计
"""

from fastapi import FastAPI, Depends
from openai import AsyncOpenAI
import time

# Application and async OpenAI client for the integrated example.
app = FastAPI()
aclient = AsyncOpenAI()

# Wire in the optimization components
exact_cache = ExactMatchCache(ttl=7200)
sem_cache = SemanticCache(similarity_threshold=0.92)

class OptimizedLLMService:
    """Chat service combining caching, cost tracking and metric reporting.

    Request flow: exact-match cache, then semantic cache, then history
    compression and model routing, then the LLM call, then metrics, cache
    writes and usage accounting.
    """

    def __init__(self):
        # In-process counters per cache outcome; see get_stats().
        self.cache_hits = {"exact": 0, "semantic": 0, "miss": 0}

    async def chat(
        self,
        messages: list,
        model: str = "gpt-4o-mini",
        user_id: str = "anonymous",
        temperature: float = 0,
        **kwargs
    ) -> dict:
        """Answer a chat request, using the caches where possible.

        Every result carries ``content``, ``cache`` and ``latency_ms``. Cache
        hits also keep their earlier keys (``response`` and, for exact hits,
        the stored entry fields); a miss adds ``model``, ``tokens`` and
        ``cost_usd``.

        Raises:
            RuntimeError: From usage recording when the daily budget is exceeded.
            Exception: Any error raised by the LLM call.
        """
        start = time.time()
        input_tokens = output_tokens = 0

        # The cache identity is the request as received. Compression and model
        # routing below may rewrite ``messages`` and ``model``; writing the
        # cache under the rewritten values would produce keys that no later
        # lookup ever asks for.
        cache_messages, cache_model = messages, model

        # L1: exact cache. Blocking Redis I/O is kept off the event loop.
        cached = await asyncio.to_thread(
            exact_cache.get, cache_messages, cache_model, temperature
        )
        if cached:
            self.cache_hits["exact"] += 1
            cache_hit_rate.observe(1.0)
            stored = cached.get("response")
            content = stored.get("content") if isinstance(stored, dict) else stored
            return {**cached, "content": content, "cache": "exact",
                    "latency_ms": int((time.time()-start)*1000)}

        # L2: semantic cache, keyed by the concatenated user text. Non-string
        # content (for example multi-part messages) is skipped.
        query = " ".join(
            m["content"] for m in messages
            if m.get("role") == "user" and isinstance(m.get("content"), str)
        )
        if query:
            sem_result, sim = await asyncio.to_thread(sem_cache.get, query)
        else:
            # No text to look up; skip the embedding call entirely.
            sem_result, sim = None, 0.0
        if sem_result:
            self.cache_hits["semantic"] += 1
            cache_hit_rate.observe(0.8)
            return {"response": sem_result, "content": sem_result,
                    "cache": f"semantic({sim:.2f})",
                    "latency_ms": int((time.time()-start)*1000)}

        # L3: context compression and model selection.
        context_tokens = count_tokens(messages, model)
        if context_tokens > 3000:
            # Compression issues a blocking LLM call; run it in a thread.
            messages = await asyncio.to_thread(
                compress_conversation_history, messages, 3000
            )

        smart_model = select_model_by_complexity(query, count_tokens(messages, model))
        if model == "gpt-4o-mini":  # route only when the caller kept the default model
            model = smart_model

        # Call the LLM.
        active_requests.inc()
        try:
            response = await aclient.chat.completions.create(
                model=model, messages=messages, temperature=temperature, **kwargs
            )
        except Exception:
            # Failed calls must show up in the request counter too.
            llm_requests_total.labels(model=model, status="error").inc()
            raise
        finally:
            active_requests.dec()

        content = response.choices[0].message.content
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens

        latency_ms = int((time.time() - start) * 1000)
        cost = calculate_cost(model, input_tokens, output_tokens)

        # Update metrics.
        llm_requests_total.labels(model=model, status="success").inc()
        llm_latency_seconds.labels(model=model).observe(latency_ms / 1000)
        llm_tokens_total.labels(model=model, type="input").inc(input_tokens)
        llm_tokens_total.labels(model=model, type="output").inc(output_tokens)
        llm_cost_usd_total.labels(model=model).inc(cost)
        cache_hit_rate.observe(0.0)
        self.cache_hits["miss"] += 1

        # Write the caches under the identity the lookups above used.
        await asyncio.to_thread(
            exact_cache.set, cache_messages, cache_model, temperature,
            {"content": content}, {"input": input_tokens, "output": output_tokens}
        )
        if query and content:
            await asyncio.to_thread(sem_cache.set, query, content)

        # Cost tracking (raises when the daily budget is exceeded).
        await asyncio.to_thread(record_usage, user_id, model, input_tokens, output_tokens)

        return {
            "content": content,
            "model": model,
            "cache": "miss",
            "latency_ms": latency_ms,
            "tokens": {"input": input_tokens, "output": output_tokens},
            "cost_usd": round(cost, 6)
        }

    def get_stats(self) -> dict:
        """Return the request total, per-outcome counts and the overall hit rate."""
        total = sum(self.cache_hits.values())
        hits = self.cache_hits["exact"] + self.cache_hits["semantic"]
        return {
            "total_requests": total,
            "cache_hits": self.cache_hits,
            # max(total, 1) avoids dividing by zero before the first request.
            "hit_rate": round(hits / max(total, 1), 3)
        }

# Single service instance shared by the endpoints below.
llm_service = OptimizedLLMService()

# Chat endpoint: wraps the message as a single user turn and delegates to the service.
@app.post("/v1/chat/optimized")
async def optimized_chat(message: str, user_id: str = "anonymous"):
    return await llm_service.chat(
        messages=[{"role": "user", "content": message}],
        user_id=user_id
    )

# Statistics endpoint: exposes the service's in-process cache counters.
@app.get("/v1/stats")
async def get_stats():
    return llm_service.get_stats()

坚持是一种品格