7.3 性能优化
LLM 应用的性能瓶颈不在 CPU,而在 Token 成本、API 延迟和并发吞吐。优化的核心是「少调用、快响应、省成本」。
学习时长:2-3 周
LLM 应用的性能优化目标是三个维度的平衡:低延迟(用户体验)、低成本(API 费用)、高吞吐(服务容量)。三者往往相互制约,需要结合业务场景做取舍。本节覆盖缓存、并发、Token 压缩到可观测性的完整工程实践。
7.3.1 缓存策略
缓存层次架构
请求 → [L1: 精确匹配缓存] → [L2: 语义相似缓存] → [L3: Prompt 前缀缓存] → LLM API
       (Redis, ~1ms)        (向量检索, ~10ms)     (API 侧, 自动)          (500-3000ms)

1. 精确匹配缓存(Redis)
python
# pip install redis orjson  (hashlib 属于标准库,无需安装)
import redis
import hashlib
import orjson
import time
from typing import Optional
redis_client = redis.Redis(host="localhost", port=6379, db=4, decode_responses=False)
class ExactMatchCache:
    """Exact-match response cache stored in Redis.

    A request is identified by the SHA-256 of its messages, model and
    temperature. Only near-deterministic requests (temperature <= 0.1) are
    cached; hotter requests are always treated as a miss.
    """

    def __init__(self, ttl: int = 3600, max_input_length: int = 500):
        self.ttl = ttl
        # Kept for callers; no method of this class consults it.
        self.max_input_length = max_input_length
        self.prefix = "llm:exact:"

    def _make_key(self, messages: list, model: str, temperature: float) -> str:
        """Return the Redis key for this request (prefix + SHA-256 hex digest)."""
        payload = orjson.dumps(
            {"messages": messages, "model": model, "temperature": temperature},
            option=orjson.OPT_SORT_KEYS,
        )
        return self.prefix + hashlib.sha256(payload).hexdigest()

    def get(self, messages: list, model: str, temperature: float = 0) -> Optional[dict]:
        """Return the cached entry, or None on a miss or a non-cacheable request."""
        if temperature > 0.1:  # sampled output is not reusable
            return None
        key = self._make_key(messages, model, temperature)
        raw = redis_client.get(key)
        if not raw:
            return None
        redis_client.expire(key, self.ttl)  # sliding expiry: a hit renews the TTL
        return orjson.loads(raw)

    def set(self, messages: list, model: str, temperature: float,
            response: dict, usage: dict):
        """Store a response under the request key for `ttl` seconds."""
        if temperature > 0.1:
            return
        key = self._make_key(messages, model, temperature)
        data = {
            "response": response,
            "usage": usage,
            "cached_at": int(time.time()),
            # Snapshot field kept for payload compatibility; the live counter
            # is the separate "<key>:hits" integer maintained by increment_hit.
            "hit_count": 0,
        }
        redis_client.setex(key, self.ttl, orjson.dumps(data))

    def increment_hit(self, messages: list, model: str, temperature: float):
        """Count one cache hit for this request.

        The entry itself is a plain string value (written with SETEX), so a
        hash command such as HINCRBY on it fails with WRONGTYPE. The counter
        therefore lives in its own integer key next to the entry.
        """
        if temperature > 0.1:  # such requests are never cached
            return
        hits_key = self._make_key(messages, model, temperature) + ":hits"
        pipe = redis_client.pipeline()
        pipe.incr(hits_key)
        pipe.expire(hits_key, self.ttl)
        pipe.execute()
exact_cache = ExactMatchCache(ttl=7200)

2. 语义相似缓存
python
# pip install numpy openai redis
import numpy as np
from openai import OpenAI
client = OpenAI()
class SemanticCache:
    """Semantic cache: reuse an answer when a new query is close enough.

    Each entry (query, embedding, response) is a JSON string in Redis, and a
    Redis list (`index_key`) records the entry keys. Lookup is a linear scan
    over that list, so this suits small caches only; a vector database is the
    appropriate tool beyond that.
    """

    # Number of entries fetched per MGET so one reply never holds the whole cache.
    _SCAN_BATCH = 256

    def __init__(
        self,
        similarity_threshold: float = 0.92,
        ttl: int = 3600,
        max_cache_size: int = 10000
    ):
        self.threshold = similarity_threshold
        self.ttl = ttl
        self.max_cache_size = max_cache_size
        self.prefix = "llm:semantic:"
        self.index_key = "llm:semantic:index"

    def _embed(self, text: str) -> np.ndarray:
        """Embed `text` (truncated to 2000 characters to bound cost)."""
        resp = client.embeddings.create(
            model="text-embedding-3-small",
            input=text[:2000]
        )
        return np.array(resp.data[0].embedding, dtype=np.float32)

    def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity; the epsilon keeps zero vectors from dividing by zero."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))

    def get(self, query: str) -> tuple[Optional[str], float]:
        """Look up the most similar cached answer.

        Returns:
            `(response, similarity)` when the best match reaches the
            threshold, otherwise `(None, 0.0)`. A tuple is always returned so
            callers can unpack the result unconditionally.
        """
        query_vec = self._embed(query)
        keys = redis_client.lrange(self.index_key, 0, -1)

        best_key, best_sim, best_response = None, 0.0, None
        for offset in range(0, len(keys), self._SCAN_BATCH):
            chunk = keys[offset:offset + self._SCAN_BATCH]
            for key, raw in zip(chunk, redis_client.mget(chunk)):
                if not raw:
                    # The entry expired; drop its dangling index reference.
                    redis_client.lrem(self.index_key, 0, key)
                    continue
                entry = orjson.loads(raw)
                cached_vec = np.array(entry["embedding"], dtype=np.float32)
                sim = self._cosine_sim(query_vec, cached_vec)
                if sim > best_sim:
                    # Keep the response now: re-reading the key later could
                    # find it expired in the meantime.
                    best_key, best_sim, best_response = key, sim, entry["response"]

        if best_key is not None and best_sim >= self.threshold:
            redis_client.expire(best_key, self.ttl)
            return best_response, best_sim
        return None, 0.0

    def set(self, query: str, response: str):
        """Store `response` for `query` and register the entry in the index."""
        import hashlib
        key = self.prefix + hashlib.md5(query.encode()).hexdigest()
        vec = self._embed(query)
        entry = {
            "query": query[:200],
            "embedding": vec.tolist(),
            "response": response,
            "created_at": int(time.time())
        }
        pipe = redis_client.pipeline()
        pipe.setex(key, self.ttl, orjson.dumps(entry))
        # The key is derived from the query text, so a repeated query reuses
        # it; remove the old reference first to keep the index duplicate-free.
        pipe.lrem(self.index_key, 0, key)
        pipe.lpush(self.index_key, key)
        pipe.ltrim(self.index_key, 0, self.max_cache_size - 1)
        pipe.execute()
semantic_cache = SemanticCache(similarity_threshold=0.92)

3. OpenAI Prompt 缓存(自动,无需额外代码)
python
# OpenAI 对超过 1024 tokens 的相同前缀自动缓存,费率降低 50%
# 利用规则:将静态内容(系统提示、RAG 文档)放在消息最前面
def build_cache_friendly_messages(
    system_prompt: str,
    retrieved_docs: list[str],
    conversation_history: list[dict],
    user_message: str,
    max_history_messages: int = 6
) -> list[dict]:
    """Build a message list ordered for provider-side prompt-prefix caching.

    Stable content goes first and per-request content last, so consecutive
    requests share the longest possible prefix.

    Args:
        system_prompt: Static system instructions (must not vary per request).
        retrieved_docs: RAG documents; omitted from the output when empty.
        conversation_history: Prior turns as chat message dicts.
        user_message: The current user input (always the last message).
        max_history_messages: Number of trailing history messages to keep
            (default 6, i.e. three user/assistant turns). Values <= 0 drop
            the history entirely.

    Returns:
        system message, optional document message plus assistant
        acknowledgement, the retained history, then the user message.
    """
    messages = [{"role": "system", "content": system_prompt}]

    if retrieved_docs:
        docs_text = "\n\n".join(
            f"[文档{i+1}]\n{doc}" for i, doc in enumerate(retrieved_docs)
        )
        messages.append({
            "role": "user",
            "content": f"参考以下文档回答问题:\n\n{docs_text}"
        })
        messages.append({
            "role": "assistant",
            "content": "好的,我已阅读参考文档,请提问。"
        })

    # Guard the zero case explicitly: history[-0:] would keep *everything*.
    if max_history_messages > 0:
        messages.extend(conversation_history[-max_history_messages:])

    messages.append({"role": "user", "content": user_message})
    return messages
# Anthropic Claude 显式 Prompt Cache(cost↓74%,延迟↓85%)
def build_claude_cached_messages(
    system_prompt: str,
    docs: list[str],
    question: str,
    *,
    model: str = "claude-3-5-sonnet-20241022",
    max_tokens: int = 1024,
    client=None,
):
    """Ask Claude a question with explicit prompt-cache breakpoints.

    The system prompt and the joined documents are each tagged with
    `cache_control`; the question itself is left untagged.

    Args:
        system_prompt: System instructions, sent as a cacheable block.
        docs: Reference documents; when empty no document block is sent.
        question: The user question.
        model: Model name passed to the Messages API.
        max_tokens: Output token limit for the reply.
        client: Optional `anthropic.Anthropic` instance to reuse. When
            omitted a new client is created for this call, which costs a new
            connection each time; pass a shared client on hot paths.

    Returns:
        The text of the first content block of the reply.
    """
    import anthropic
    aclient = client if client is not None else anthropic.Anthropic()

    system_blocks = [
        {
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"}  # cache breakpoint
        }
    ]
    user_content = []
    if docs:
        docs_text = "\n\n".join(docs)
        user_content.append({
            "type": "text",
            "text": f"参考文档:\n{docs_text}",
            "cache_control": {"type": "ephemeral"}  # documents are cached too
        })
    user_content.append({"type": "text", "text": question})

    response = aclient.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=system_blocks,
        messages=[{"role": "user", "content": user_content}]
    )
    # Report cache activity; getattr keeps this safe if the usage object
    # lacks these fields.
    usage = response.usage
    print(f"缓存读取 tokens: {getattr(usage, 'cache_read_input_tokens', 0)}")
    print(f"缓存创建 tokens: {getattr(usage, 'cache_creation_input_tokens', 0)}")
    return response.content[0].text


# 7.3.2 并发与异步处理
1. 异步批量 LLM 调用
python
# pip install openai  (asyncio 属于标准库,无需安装)
import asyncio
from openai import AsyncOpenAI
from typing import Any
aclient = AsyncOpenAI()
async def call_llm_single(
    messages: list,
    model: str = "gpt-4o-mini",
    semaphore: asyncio.Semaphore = None,
    **kwargs
) -> dict:
    """Run one chat completion, optionally gated by a semaphore.

    Args:
        messages: Chat messages to send.
        model: Model name.
        semaphore: When given, the request is issued while holding it so the
            caller can cap concurrency.
        **kwargs: Forwarded to `chat.completions.create`.

    Returns:
        `{"content": <first choice text>, "usage": <usage as a dict>}`.
    """
    async def _request():
        return await aclient.chat.completions.create(
            model=model, messages=messages, **kwargs
        )

    if semaphore:
        async with semaphore:
            completion = await _request()
    else:
        completion = await _request()

    first_choice = completion.choices[0]
    return {
        "content": first_choice.message.content,
        "usage": completion.usage.model_dump()
    }
async def batch_llm_calls(
    batch: list[dict],  # [{"messages": [...], "meta": {...}}, ...]
    model: str = "gpt-4o-mini",
    max_concurrency: int = 10,  # upper bound on in-flight requests
    **kwargs
) -> list[dict]:
    """Run many chat completions concurrently with a concurrency cap.

    Args:
        batch: Items with a "messages" list and optional "meta" dict.
        model: Model used for every item.
        max_concurrency: Maximum simultaneous requests; must be >= 1.
        **kwargs: Forwarded to each completion call.

    Returns:
        One dict per item, in input order: the call result plus "meta", or
        `{"error": <message>, "meta": ...}` when that item failed.

    Raises:
        ValueError: If `max_concurrency` is below 1 (a zero-slot semaphore
            would block every request forever).
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be >= 1")
    if not batch:
        return []

    semaphore = asyncio.Semaphore(max_concurrency)

    async def call_with_meta(item: dict) -> dict:
        result = await call_llm_single(
            item["messages"], model, semaphore, **kwargs
        )
        return {**result, "meta": item.get("meta", {})}

    results = await asyncio.gather(
        *(call_with_meta(item) for item in batch),
        return_exceptions=True,
    )

    output = []
    for item, result in zip(batch, results):
        # BaseException, not Exception: gather can also hand back
        # CancelledError, which does not derive from Exception.
        if isinstance(result, BaseException):
            output.append({"error": str(result), "meta": item.get("meta", {})})
        else:
            output.append(result)
    return output
# 使用示例:批量翻译
async def batch_translate(texts: list[str], target_lang: str = "英文") -> list[str]:
    """Translate every text concurrently and return results in input order.

    A failed item yields its error message in place of a translation.
    """
    batch = []
    for position, source_text in enumerate(texts):
        prompt = f"将以下内容翻译为{target_lang},只输出译文:\n{source_text}"
        batch.append({
            "messages": [{"role": "user", "content": prompt}],
            "meta": {"index": position, "original": source_text},
        })

    results = await batch_llm_calls(batch, model="gpt-4o-mini", max_concurrency=8)

    # Order by the index carried in each item's meta.
    ordered = list(results)
    ordered.sort(key=lambda item: item["meta"].get("index", 0))

    translations = []
    for item in ordered:
        translations.append(item.get("content", item.get("error", "")))
    return translations
# 运行
texts = ["你好", "人工智能很有趣", "今天天气不错"]
translations = asyncio.run(batch_translate(texts))

2. 连接池与客户端复用
python
import httpx
from openai import AsyncOpenAI
from contextlib import asynccontextmanager
# 全局复用的 HTTP 连接池(避免每次请求建立新连接)
_http_client: httpx.AsyncClient | None = None
_openai_client: AsyncOpenAI | None = None
def get_http_client() -> httpx.AsyncClient:
    """Return the shared httpx client, creating it when missing or closed."""
    global _http_client
    if _http_client is not None and not _http_client.is_closed:
        return _http_client

    request_timeout = httpx.Timeout(60.0, connect=5.0)
    pool_limits = httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
        keepalive_expiry=30,
    )
    _http_client = httpx.AsyncClient(timeout=request_timeout, limits=pool_limits)
    return _http_client
def get_openai_client() -> AsyncOpenAI:
    """Return the shared AsyncOpenAI client built on the shared HTTP pool.

    The client is rebuilt when the underlying httpx client is missing or has
    been closed (for example after application shutdown). Otherwise the
    cached client would keep sending through a closed transport.
    """
    global _openai_client
    transport_unusable = _http_client is None or _http_client.is_closed
    if _openai_client is None or transport_unusable:
        _openai_client = AsyncOpenAI(
            http_client=get_http_client(),
            max_retries=2,
            timeout=60.0
        )
    return _openai_client
# FastAPI 生命周期中初始化和清理
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: create the shared clients, release them on exit.

    Cleanup runs in `finally` so the pool is closed even when the application
    exits with an error. Both singletons are reset afterwards so a later
    `get_openai_client()` builds fresh objects instead of reusing a closed one.
    """
    global _http_client, _openai_client
    # Instantiate the shared client objects up front.
    get_openai_client()
    try:
        yield
    finally:
        if _http_client is not None and not _http_client.is_closed:
            await _http_client.aclose()
        _http_client = None
        _openai_client = None


# 3. 请求队列与背压控制
python
import asyncio
from dataclasses import dataclass, field
from typing import Callable
@dataclass
class QueuedRequest:
    """One chat-completion request waiting in `LLMRequestQueue`."""
    request_id: str
    messages: list
    model: str
    priority: int = 0  # larger value = served earlier
    # The factories run when an instance is created, inside the running loop.
    # Evaluating asyncio.get_event_loop() in the class body would bind to
    # whatever loop existed at import time.
    future: asyncio.Future = field(
        default_factory=lambda: asyncio.get_running_loop().create_future()
    )
    created_at: float = field(
        default_factory=lambda: asyncio.get_running_loop().time()
    )


class LLMRequestQueue:
    """Priority queue in front of the LLM API with bounded concurrency.

    `submit()` enqueues a request and waits for its result. A single worker
    hands requests to the API, highest priority first and first-come
    first-served within a priority. The worker takes a concurrency slot
    *before* it dequeues, so waiting requests stay in the bounded queue and
    `max_queue_size` applies real backpressure.
    """

    def __init__(
        self,
        max_concurrency: int = 10,
        max_queue_size: int = 100,
        timeout: float = 60.0
    ):
        self.max_concurrency = max_concurrency
        self.max_queue_size = max_queue_size
        self.timeout = timeout
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(max_queue_size)
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._running = False
        self._seq = 0              # tie-breaker for equal priorities
        self._worker_task = None   # referenced so the task is not collected
        self._inflight = set()     # same, for per-request tasks

    async def start(self):
        """Start the background worker (no-op when already running)."""
        if self._running:
            return
        self._running = True
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Stop the worker; requests already dispatched finish normally."""
        self._running = False
        if self._worker_task is not None:
            self._worker_task.cancel()
            await asyncio.gather(self._worker_task, return_exceptions=True)
            self._worker_task = None

    async def _worker(self):
        """Move requests from the queue to the API as slots free up."""
        while self._running:
            await self._semaphore.acquire()
            try:
                _, _, req = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # Idle: give the slot back and re-check the running flag.
                self._semaphore.release()
                continue
            except BaseException:
                self._semaphore.release()
                raise
            if req.future.done():
                # The caller cancelled while the request was waiting.
                self._semaphore.release()
                continue
            task = asyncio.create_task(self._dispatch(req))
            self._inflight.add(task)
            task.add_done_callback(self._inflight.discard)

    async def _dispatch(self, req: QueuedRequest):
        """Process one request, then release the slot the worker acquired."""
        try:
            await self._process(req)
        finally:
            self._semaphore.release()

    async def _process(self, req: QueuedRequest):
        """Call the API and resolve the request's future with the outcome."""
        try:
            client = get_openai_client()
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=req.model, messages=req.messages
                ),
                timeout=self.timeout
            )
            content = response.choices[0].message.content
        except Exception as e:
            # A cancelled future rejects set_exception/set_result.
            if not req.future.done():
                req.future.set_exception(e)
        else:
            if not req.future.done():
                req.future.set_result(content)

    async def submit(self, messages: list, model: str = "gpt-4o-mini",
                     priority: int = 0) -> str:
        """Enqueue a request and wait for its completion text.

        Raises:
            RuntimeError: If the queue is already full.
        """
        if self._queue.full():
            raise RuntimeError("请求队列已满,请稍后重试")
        loop = asyncio.get_running_loop()
        req = QueuedRequest(
            request_id=str(id(messages)),
            messages=messages,
            model=model,
            priority=priority,
            future=loop.create_future()
        )
        self._seq += 1
        # Negated priority turns the min-heap into a max-priority queue. The
        # sequence number breaks ties, so the heap never has to compare two
        # QueuedRequest objects (which define no ordering).
        await self._queue.put((-priority, self._seq, req))
        return await req.future
request_queue = LLMRequestQueue(max_concurrency=10, max_queue_size=100)

7.3.3 Token 用量优化与成本控制
1. 上下文压缩
python
from openai import OpenAI
import tiktoken
client = OpenAI()
def count_tokens(messages: list, model: str = "gpt-4o") -> int:
    """Estimate the prompt token count of a chat message list.

    Uses the tiktoken encoding for `model`, falling back to `cl100k_base`
    when the model is unknown. The count is a fixed base of 3, plus 4 per
    message, plus the encoded length of each message's text. Only string
    content and "text" blocks of list content are counted.
    """
    try:
        encoder = tiktoken.encoding_for_model(model)
    except KeyError:
        encoder = tiktoken.get_encoding("cl100k_base")

    def _text_tokens(content) -> int:
        if isinstance(content, str):
            return len(encoder.encode(content))
        if isinstance(content, list):
            return sum(
                len(encoder.encode(part.get("text", "")))
                for part in content
                if part.get("type") == "text"
            )
        return 0

    # 3 = fixed base overhead; 4 = per-message overhead.
    return 3 + sum(4 + _text_tokens(msg.get("content", "")) for msg in messages)
def compress_conversation_history(
    messages: list,
    max_tokens: int = 3000,
    model: str = "gpt-4o-mini",
    keep_recent_turns: int = 3  # number of most recent turns kept verbatim
) -> list:
    """Shrink a conversation that exceeds `max_tokens`.

    Strategy: keep all system messages, keep the last `keep_recent_turns`
    turns (two messages each) verbatim, and replace everything older with a
    short LLM-written summary inserted as an extra system message.

    Args:
        messages: Full chat history.
        max_tokens: Budget; the input is returned unchanged when within it.
        model: Model name used for token counting.
        keep_recent_turns: Turns to keep verbatim; 0 summarises everything
            that is not a system message.

    Returns:
        The original list when no compression is needed or possible,
        otherwise a new compressed list. The result is not re-checked against
        `max_tokens`.
    """
    if not messages:
        return messages
    if count_tokens(messages, model) <= max_tokens:
        return messages

    def _plain_text(content) -> str:
        # Content may be a string, a list of typed blocks, or None.
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return " ".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return ""

    system_msgs = [m for m in messages if m["role"] == "system"]
    non_system = [m for m in messages if m["role"] != "system"]

    # Split by explicit index: slicing with -0 would select the whole list
    # and silently disable compression for keep_recent_turns=0.
    keep = max(keep_recent_turns, 0) * 2
    split_at = max(len(non_system) - keep, 0)
    older, recent = non_system[:split_at], non_system[split_at:]
    if not older:
        return messages  # nothing left to summarise

    older_text = "\n".join(
        f"{'用户' if m['role'] == 'user' else 'AI'}: {_plain_text(m.get('content'))[:200]}"
        for m in older
    )
    summary_response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "请用3-5句话总结以下对话的关键信息,保留重要事实:"},
            {"role": "user", "content": older_text}
        ],
        max_tokens=300
    )
    summary = summary_response.choices[0].message.content

    summary_msg = {
        "role": "system",
        "content": f"[早期对话摘要]\n{summary}"
    }
    compressed = system_msgs + [summary_msg] + recent
    # Count with the same model as the budget check above.
    print(f"历史压缩: {count_tokens(messages, model)} → {count_tokens(compressed, model)} tokens")
    return compressed
def trim_system_prompt(system_prompt: str, max_chars: int = 2000) -> str:
    """Trim an over-long system prompt to at most `max_chars` characters.

    The head and tail are kept and the middle is replaced by an omission
    marker. The marker is included in the budget, so the result never exceeds
    `max_chars`. When the limit is too small to fit head + marker + tail, the
    prompt is simply cut to its first `max_chars` characters.
    """
    if len(system_prompt) <= max_chars:
        return system_prompt

    marker = "\n...[已省略中间部分]...\n"
    budget = max_chars - len(marker)
    if budget < 2:
        # No room for the marker plus a character on each side. Note that a
        # tail slice of [-0:] would return the entire prompt.
        return system_prompt[:max(max_chars, 0)]

    head_len = (budget + 1) // 2
    tail_len = budget // 2
    return system_prompt[:head_len] + marker + system_prompt[-tail_len:]


# 2. 成本追踪与预算告警
python
import redis
import time
from dataclasses import dataclass
# 各模型每 1K token 价格(美元,2025年参考价)
MODEL_PRICING = {
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
"claude-3-5-haiku-20241022": {"input": 0.0008, "output": 0.004},
"qwen-plus": {"input": 0.0004, "output": 0.0012},
"qwen-turbo": {"input": 0.00008, "output": 0.0002},
}
redis_client = redis.Redis(host="localhost", port=6379, db=5)
@dataclass
class UsageRecord:
    """Plain record of one LLM API call's usage.

    NOTE(review): nothing in the visible code constructs this class —
    confirm it is still needed.
    """
    model: str  # model name; presumably a MODEL_PRICING key — verify against callers
    input_tokens: int  # prompt tokens
    output_tokens: int  # completion tokens
    cost_usd: float  # cost of the call; presumably US dollars, per the name
    user_id: str
    timestamp: float  # presumably a Unix timestamp — TODO confirm
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the cost in US dollars of one call.

    Prices are looked up per 1K tokens in `MODEL_PRICING`; a model not listed
    there is charged at a fallback rate of 0.01 (input) / 0.03 (output).
    """
    rates = MODEL_PRICING.get(model, {"input": 0.01, "output": 0.03})
    input_cost = input_tokens * rates["input"]
    output_cost = output_tokens * rates["output"]
    return (input_cost + output_cost) / 1000
def record_usage(user_id: str, model: str, input_tokens: int, output_tokens: int):
    """Add one call to today's counters and run the budget check.

    Updates the per-user cost, global cost and per-user token counters for
    the current local date in a single pipeline.

    Returns:
        The cost of this call in US dollars.

    Raises:
        RuntimeError: Propagated from the budget check when the user's daily
            total exceeds the limit (the usage has already been recorded).
    """
    cost = calculate_cost(model, input_tokens, output_tokens)
    today = time.strftime("%Y-%m-%d")
    user_cost_key = f"cost:user:{user_id}:{today}"
    global_cost_key = f"cost:global:{today}"
    user_tokens_key = f"tokens:user:{user_id}:{today}"

    pipe = redis_client.pipeline()
    pipe.incrbyfloat(user_cost_key, cost)
    pipe.incrbyfloat(global_cost_key, cost)
    pipe.incrby(user_tokens_key, input_tokens + output_tokens)
    pipe.expire(user_cost_key, 86400 * 7)
    pipe.expire(global_cost_key, 86400 * 30)
    # The token counter needs a TTL too, otherwise one key per user per day
    # accumulates forever.
    pipe.expire(user_tokens_key, 86400 * 7)
    results = pipe.execute()

    # The first reply is the user's running total after this increment, so
    # no extra GET is needed.
    daily_cost = float(results[0])
    _check_budget_alert(user_id, daily_cost)
    return cost
BUDGET_THRESHOLDS = {
"free": 0.1, # 免费用户日预算 $0.1
"pro": 5.0, # 付费用户日预算 $5
"enterprise": 100 # 企业用户日预算 $100
}
def _check_budget_alert(user_id: str, current_cost: float):
"""检查是否超出预算阈值"""
user_tier = "free" # 实际从数据库获取
threshold = BUDGET_THRESHOLDS.get(user_tier, 0.1)
if current_cost > threshold * 0.8:
print(f"⚠️ 用户 {user_id} 已使用 {current_cost/threshold*100:.0f}% 的日预算")
if current_cost > threshold:
raise RuntimeError(f"已超出日预算限制(${threshold}),请升级套餐或明天再试")
def get_cost_report(user_id: str, days: int = 7) -> dict:
    """Return per-day cost and token totals for the last `days` days.

    Keys are local dates (`YYYY-MM-DD`), newest first; days without data
    report zero.
    """
    def _daily(offset: int) -> tuple:
        day = time.strftime("%Y-%m-%d", time.localtime(time.time() - offset * 86400))
        raw_cost = redis_client.get(f"cost:user:{user_id}:{day}")
        raw_tokens = redis_client.get(f"tokens:user:{user_id}:{day}")
        return day, {
            "cost_usd": round(float(raw_cost or 0), 4),
            "tokens": int(raw_tokens or 0),
        }

    return dict(_daily(offset) for offset in range(days))


# 3. Prompt 工程优化(减少 Token)
python
def optimize_prompt_tokens(
    system_prompt: str,
    user_message: str,
    remove_filler: bool = True,
    use_abbreviations: bool = True
) -> tuple[str, str]:
    """Shorten a system prompt with simple text rewrites.

    Args:
        system_prompt: Prompt to shorten.
        user_message: Only stripped of surrounding whitespace.
        remove_filler: Remove a fixed list of filler phrases, then collapse
            runs of three or more newlines into two.
        use_abbreviations: Replace a fixed list of long phrases with shorter
            equivalents.

    Returns:
        `(system_prompt, user_message)`, both stripped.
    """
    import re

    prompt = system_prompt
    if remove_filler:
        # Applied one after another, in this order.
        for filler in (
            r"请注意,?",
            r"请记住,?",
            r"你必须始终",
            r"在任何情况下都",
            r"非常重要的是,?",
        ):
            prompt = re.sub(filler, "", prompt)
        prompt = re.sub(r"\n{3,}", "\n\n", prompt.strip())

    if use_abbreviations:
        for verbose, concise in (
            ("你是一个非常专业且经验丰富的", "你是专业的"),
            ("请确保你的回答", "回答需"),
            ("以清晰、结构化的方式", "清晰结构化地"),
        ):
            prompt = prompt.replace(verbose, concise)

    return prompt.strip(), user_message.strip()
def select_model_by_complexity(user_message: str, context_tokens: int) -> str:
    """Pick a model from rough complexity heuristics.

    Short greetings, confirmations, simple arithmetic and short translation
    requests go to the small model. Otherwise the strong model is chosen when
    the context is large (> 8000 tokens), the message is long (> 1000
    characters) or at least three complexity keywords appear; everything else
    uses the small model.

    Returns:
        "gpt-4o" or "gpt-4o-mini".
    """
    import re

    text = user_message.strip()

    # The greeting pattern is length-bounded: with an open-ended `.*` any
    # single-line message that merely *starts* with a greeting was treated
    # as trivial, however complex the request after it.
    simple_patterns = (
        r"^(你好|hello|hi|谢谢|感谢).{0,10}$",
        r"^(是|否|对|不对|好的|好).{0,5}$",
        r"^\d+[\+\-\*\/]\d+",            # simple arithmetic
        r"^(翻译|translate).{0,20}$",    # short translation
    )
    if any(re.match(p, text, re.IGNORECASE) for p in simple_patterns):
        return "gpt-4o-mini"

    complex_signals = (
        "代码", "分析", "设计", "架构", "推理", "证明",
        "比较", "评估", "优化", "debug", "为什么",
    )
    # Lower-cased so "Debug" / "DEBUG" count as well.
    lowered = text.lower()
    complexity_score = sum(1 for signal in complex_signals if signal in lowered)

    if context_tokens > 8000 or complexity_score >= 3 or len(user_message) > 1000:
        return "gpt-4o"
    return "gpt-4o-mini"


# 7.3.4 监控与可观测性
1. Langfuse 集成(开源 LLM 可观测平台)
python
# pip install langfuse openai
from langfuse import Langfuse
from langfuse.openai import openai # 自动 patch OpenAI 客户端
import time
langfuse = Langfuse(
public_key="pk-lf-...",
secret_key="sk-lf-...",
host="https://cloud.langfuse.com" # 或自托管地址
)
# 方式一:自动追踪(使用 langfuse.openai 替换标准 openai)
def chat_with_tracing(user_id: str, session_id: str, message: str) -> str:
    """Send one user message through the Langfuse-patched OpenAI client.

    The trace name, metadata, tags, user id and session id are passed as
    extra keyword arguments on the completion call.
    """
    request = dict(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": message}],
        # Langfuse tracing fields
        name="user-chat",
        metadata={"user_id": user_id, "session_id": session_id},
        tags=["production", "chat"],
        user_id=user_id,
        session_id=session_id,
    )
    completion = openai.chat.completions.create(**request)
    return completion.choices[0].message.content
# 方式二:手动追踪(更精细的控制)
def rag_pipeline_with_tracing(question: str, user_id: str) -> str:
    """Run a demo RAG pipeline, recording each step on a Langfuse trace.

    Steps: a "vector-retrieval" span (retrieval is stubbed with two
    placeholder documents) and an "llm-generation" generation made with an
    unpatched OpenAI client. The answer is stored as the trace output.
    """
    pipeline_trace = langfuse.trace(
        name="rag-pipeline",
        user_id=user_id,
        input={"question": question},
        tags=["rag", "production"],
    )

    # Step 1: retrieval (placeholder implementation)
    retrieval_span = pipeline_trace.span(name="vector-retrieval")
    t0 = time.time()
    docs = ["文档1", "文档2"]
    retrieval_latency_ms = int((time.time() - t0) * 1000)
    retrieval_span.end(
        output={"docs_count": len(docs), "latency_ms": retrieval_latency_ms}
    )

    # Step 2: generation
    llm_gen = pipeline_trace.generation(
        name="llm-generation",
        model="gpt-4o-mini",
        input=[{"role": "user", "content": question}],
    )
    from openai import OpenAI as RawOpenAI
    raw_client = RawOpenAI()
    response = raw_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
    )
    answer = response.choices[0].message.content
    token_usage = {
        "input": response.usage.prompt_tokens,
        "output": response.usage.completion_tokens,
    }
    llm_gen.end(output=answer, usage=token_usage)

    # Record the overall result on the trace.
    pipeline_trace.update(output={"answer": answer})
    return answer
# 添加用户反馈(用于评估模型质量)
def record_user_feedback(trace_id: str, score: int, comment: str = ""):
    """Attach a "user-feedback" score (1 = good, 0 = bad) to a trace."""
    feedback = {
        "trace_id": trace_id,
        "name": "user-feedback",
        "value": score,
        "comment": comment,
    }
    langfuse.score(**feedback)


# 2. 自定义 Prometheus 指标
python
# pip install prometheus-client
from prometheus_client import (
Counter, Histogram, Gauge, Summary,
start_http_server, REGISTRY
)
import functools
import time
# Metric definitions
# Request count, labelled by model and outcome status.
llm_requests_total = Counter(
    "llm_requests_total",
    "LLM API 请求总数",
    ["model", "status"] # labels: model x success/failure status
)
# Response latency in seconds per model; buckets range from 0.5 s to 60 s.
llm_latency_seconds = Histogram(
    "llm_latency_seconds",
    "LLM API 响应延迟(秒)",
    ["model"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60]
)
# Tokens consumed, split by direction.
llm_tokens_total = Counter(
    "llm_tokens_total",
    "累计消耗 token 数",
    ["model", "type"] # type: input / output
)
# Accumulated API cost in US dollars per model.
llm_cost_usd_total = Counter(
    "llm_cost_usd_total",
    "累计 API 成本(美元)",
    ["model"]
)
# Number of LLM requests currently in flight.
active_requests = Gauge(
    "llm_active_requests",
    "当前进行中的 LLM 请求数"
)
# NOTE(review): a Summary exports the count and sum of observed values, so
# the hit ratio must be derived as sum/count by the consumer. The service
# code in this file observes 1.0 (exact hit), 0.8 (semantic hit) and 0.0
# (miss) — confirm the 0.8 weighting is intended.
cache_hit_rate = Summary(
    "llm_cache_hit_ratio",
    "缓存命中率统计"
)
def instrument_llm_call(func):
    """Decorator that records request count, latency and in-flight gauge.

    The `model` label is taken from the call: the keyword argument if given,
    otherwise the positional argument in the slot of the wrapped function's
    `model` parameter. When the caller supplies neither, `model="gpt-4o-mini"`
    is injected into the call, as before.

    Any exception, including cancellation, is recorded with status "error"
    and re-raised.
    """
    import inspect

    positional_names = [
        name
        for name, param in inspect.signature(func).parameters.items()
        if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD)
    ]
    model_index = positional_names.index("model") if "model" in positional_names else None

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if "model" in kwargs:
            model = kwargs["model"]
        elif model_index is not None and len(args) > model_index:
            # Passed positionally: forwarding model= as a keyword as well
            # would fail with "multiple values for argument 'model'".
            model = args[model_index]
        else:
            model = kwargs["model"] = "gpt-4o-mini"

        active_requests.inc()
        start = time.perf_counter()
        status = "success"
        try:
            return await func(*args, **kwargs)
        except BaseException:
            # BaseException so a cancelled call is not counted as a success.
            status = "error"
            raise
        finally:
            elapsed = time.perf_counter() - start
            active_requests.dec()
            llm_requests_total.labels(model=model, status=status).inc()
            llm_latency_seconds.labels(model=model).observe(elapsed)
    return wrapper
# 暴露指标端点(FastAPI)
from fastapi import FastAPI
from fastapi.responses import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
app = FastAPI()
@app.get("/metrics")
async def metrics():
    """Serve the default registry in the Prometheus exposition format."""
    payload = generate_latest(REGISTRY)
    return Response(payload, media_type=CONTENT_TYPE_LATEST)


# 3. 结构化日志与链路追踪
python
# pip install structlog opentelemetry-sdk opentelemetry-exporter-otlp
import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# 初始化 OpenTelemetry 追踪
tracer_provider = TracerProvider()
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-service")
# 结构化日志配置
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
async def traced_llm_call(
    messages: list,
    model: str,
    user_id: str,
    session_id: str
) -> str:
    """Run one chat completion inside an OpenTelemetry span with bound logs.

    The span "llm.chat" records model, user, session, message count, token
    usage and latency. `trace_id`, `user_id` and `session_id` are bound to
    the structlog context only for the duration of the call.

    Returns:
        The text of the first completion choice.

    Raises:
        Exception: Any error from the API call, after it has been recorded
            on the span and logged.
    """
    with tracer.start_as_current_span("llm.chat") as span:
        span.set_attribute("llm.model", model)
        span.set_attribute("user.id", user_id)
        span.set_attribute("session.id", session_id)
        span.set_attribute("llm.input_messages", len(messages))

        trace_id = format(span.get_span_context().trace_id, "032x")
        # bound_contextvars restores the previous context on exit. Calling
        # clear_contextvars() here would also erase bindings made by the
        # caller (for example a request id set by middleware).
        with structlog.contextvars.bound_contextvars(
            trace_id=trace_id,
            user_id=user_id,
            session_id=session_id,
        ):
            start = time.time()
            client = get_openai_client()
            try:
                response = await client.chat.completions.create(
                    model=model, messages=messages
                )
                elapsed_ms = int((time.time() - start) * 1000)
                usage = response.usage
                span.set_attribute("llm.input_tokens", usage.prompt_tokens)
                span.set_attribute("llm.output_tokens", usage.completion_tokens)
                span.set_attribute("llm.latency_ms", elapsed_ms)
                logger.info(
                    "llm_call_success",
                    model=model,
                    input_tokens=usage.prompt_tokens,
                    output_tokens=usage.completion_tokens,
                    latency_ms=elapsed_ms,
                    cost_usd=calculate_cost(model, usage.prompt_tokens,
                                            usage.completion_tokens)
                )
                return response.choices[0].message.content
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR, str(e))
                logger.error("llm_call_failed", model=model, error=str(e))
                raise


# 7.3.5 综合实战:性能优化仪表板
python
"""
性能优化效果量化:缓存命中率、延迟分布、成本趋势统计
"""
from fastapi import FastAPI, Depends
from openai import AsyncOpenAI
import time
app = FastAPI()
aclient = AsyncOpenAI()
# 集成所有优化组件
exact_cache = ExactMatchCache(ttl=7200)
sem_cache = SemanticCache(similarity_threshold=0.92)
class OptimizedLLMService:
    """Chat service combining caching, compression, metrics and cost tracking.

    Request flow: exact cache -> semantic cache -> optional history
    compression and model selection -> LLM call -> metrics, cache write and
    usage recording.
    """

    def __init__(self):
        self.cache_hits = {"exact": 0, "semantic": 0, "miss": 0}

    async def chat(
        self,
        messages: list,
        model: str = "gpt-4o-mini",
        user_id: str = "anonymous",
        temperature: float = 0,
        **kwargs
    ) -> dict:
        """Answer a chat request, serving from cache when possible.

        Returns:
            A dict that always carries "content", "cache" and "latency_ms".
            A miss adds "model", "tokens" and "cost_usd"; an exact hit adds
            the stored cache entry fields; a semantic hit adds "response".

        Raises:
            RuntimeError: From usage recording when the user's daily budget
                is exceeded.
            Exception: Any error raised by the LLM call.
        """
        import asyncio  # local import: used only to offload blocking helpers

        start = time.time()

        # Remember the request as the caller sent it. `messages` and `model`
        # may be rewritten below, and the cache must be written under the
        # same identity it is read with, or it can never hit.
        cache_messages = list(messages)
        cache_model = model

        # L1: exact cache
        cached = exact_cache.get(cache_messages, cache_model, temperature)
        if cached:
            self.cache_hits["exact"] += 1
            cache_hit_rate.observe(1.0)
            hit = {**cached, "cache": "exact",
                   "latency_ms": int((time.time() - start) * 1000)}
            stored = cached.get("response")
            if isinstance(stored, dict) and "content" in stored:
                # Expose the text under the same key a miss uses.
                hit.setdefault("content", stored["content"])
            return hit

        # L2: semantic cache, keyed on the concatenated user text. Non-string
        # content (e.g. multimodal blocks) is skipped.
        query = " ".join(
            m["content"] for m in messages
            if m.get("role") == "user" and isinstance(m.get("content"), str)
        )
        sem_result, sim = None, 0.0
        if query:
            # The lookup makes a blocking embedding request and Redis calls;
            # run it in a thread so the event loop stays responsive.
            sem_result, sim = await asyncio.to_thread(sem_cache.get, query)
        if sem_result:
            self.cache_hits["semantic"] += 1
            cache_hit_rate.observe(0.8)
            return {"response": sem_result, "content": sem_result,
                    "cache": f"semantic({sim:.2f})",
                    "latency_ms": int((time.time() - start) * 1000)}

        # L3: context compression + model selection
        context_tokens = count_tokens(messages, model)
        if context_tokens > 3000:
            # Compression performs a blocking LLM call.
            messages = await asyncio.to_thread(
                compress_conversation_history, messages, max_tokens=3000
            )
            context_tokens = count_tokens(messages, model)
        if model == "gpt-4o-mini":
            # Only the default model is subject to automatic selection; an
            # explicitly requested model is respected.
            model = select_model_by_complexity(query, context_tokens)

        # LLM call
        active_requests.inc()
        try:
            response = await aclient.chat.completions.create(
                model=model, messages=messages, temperature=temperature, **kwargs
            )
        except Exception:
            llm_requests_total.labels(model=model, status="error").inc()
            raise
        finally:
            active_requests.dec()

        content = response.choices[0].message.content
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens
        latency_ms = int((time.time() - start) * 1000)
        cost = calculate_cost(model, input_tokens, output_tokens)

        # Metrics
        llm_requests_total.labels(model=model, status="success").inc()
        llm_latency_seconds.labels(model=model).observe(latency_ms / 1000)
        llm_tokens_total.labels(model=model, type="input").inc(input_tokens)
        llm_tokens_total.labels(model=model, type="output").inc(output_tokens)
        llm_cost_usd_total.labels(model=model).inc(cost)
        cache_hit_rate.observe(0.0)
        self.cache_hits["miss"] += 1

        # Cache write, under the identity the lookup used. Empty content
        # (e.g. a tool-call-only reply) is not worth caching.
        if content is not None:
            exact_cache.set(cache_messages, cache_model, temperature,
                            {"content": content},
                            {"input": input_tokens, "output": output_tokens})
            if query:
                await asyncio.to_thread(sem_cache.set, query, content)

        # Cost tracking (may raise when the daily budget is exceeded)
        record_usage(user_id, model, input_tokens, output_tokens)

        return {
            "content": content,
            "model": model,
            "cache": "miss",
            "latency_ms": latency_ms,
            "tokens": {"input": input_tokens, "output": output_tokens},
            "cost_usd": round(cost, 6)
        }

    def get_stats(self) -> dict:
        """Return request totals and the overall cache hit rate (0..1)."""
        total = sum(self.cache_hits.values())
        hits = self.cache_hits["exact"] + self.cache_hits["semantic"]
        return {
            "total_requests": total,
            "cache_hits": self.cache_hits,
            "hit_rate": round(hits / max(total, 1), 3)
        }
llm_service = OptimizedLLMService()
@app.post("/v1/chat/optimized")
async def optimized_chat(message: str, user_id: str = "anonymous"):
    """Answer a single-turn chat message through the optimized service."""
    single_turn = [{"role": "user", "content": message}]
    return await llm_service.chat(messages=single_turn, user_id=user_id)
@app.get("/v1/stats")
async def get_stats():
    """Return the service's cache hit statistics."""
    stats = llm_service.get_stats()
    return stats