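LLM Gateway Design and Implementation
Build a unified LLM Gateway (multi-model routing, smart fallback, rate limiting, token billing, request logging) and implement a production-grade unified access layer for large language models with Python + FastAPI.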
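1. Why you need an LLM gateway
1.1 The pain of calling models directly: scattered keys, inconsistent formats, no monitoring
Five pain points of calling multiple model APIs directly:
```
❌ Scattered keys
═══════════════════════════════════════
One key for OpenAI, one for Claude, one for Doubao,
scattered across each service's environment variables, with no central management

❌ Inconsistent formats
═══════════════════════════════════════
OpenAI puts tool schemas in parameters, Claude in input_schema
Every model switch means changing code

❌ No monitoring
═══════════════════════════════════════
No idea how many tokens each model consumes
No idea how much each user spends

❌ No fault tolerance
═══════════════════════════════════════
When GPT-4o goes down, the whole service goes down
No automatic fallback or retry

❌ No rate limiting
═══════════════════════════════════════
One user calling aggressively can exhaust the quota
and affect every other user
```
1.2 What a gateway solves: unified interface + routing + rate limiting + logging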
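The LLM gateway architecture:
```
Service A ──┐
Service B ──┼──→ 🌐 LLM Gateway ──┬──→ OpenAI
Service C ──┘           │         ├──→ Claude
                        │         ├──→ Gemini
                        │         └──→ DeepSeek
                        │
                        ├── Unified OpenAI-format interface
                        ├── Smart routing & load balancing
                        ├── Automatic fallback & circuit breaking
                        ├── Rate limiting & quotas
                        └── Logging, billing & monitoring
```
1.3 Existing solutions compared: OneAPI / LiteLLM / OpenRouter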
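| Solution | Type | Language | Highlights | Best for |
|---|---|---|---|---|
| OneAPI | Self-hosted | Go | Web admin UI, mature channel management | Internal teams |
| LiteLLM | SDK/proxy | Python | 100+ model adapters, Proxy mode | Python projects |
| OpenRouter | Cloud service | N/A (hosted) | Unified API, pay-as-you-go | Teams that don't want to self-host |
| This tutorial | Self-built | Python | Learn the principles, full control | Understanding the design |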
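1.4 Core architecture: what we are building
The LLM Gateway modules we will implement:
```
┌───────────────────────────────────────────────────────┐
│                    FastAPI Gateway                    │
├─────────────┬──────────────┬─────────────┬────────────┤
│ Unified API │ Routing      │ Rate limit  │ Logging    │
│ OpenAI      │ Model routing│ RPM/TPM     │ Request log│
│ format      │ Load balance │ Quotas      │ Token      │
│ adapters    │ Health check │ Token bucket│ billing    │
├─────────────┴──────────────┴─────────────┴────────────┤
│                Provider adapter layer                 │
├────────┬────────┬────────┬────────┬───────────────────┤
│ OpenAI │ Claude │ Gemini │ Doubao │ DeepSeek          │
└────────┴────────┴────────┴────────┴───────────────────┘
```
Chapter 1 recap: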
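| Concept | One-line explanation |
|---|---|
| LLM Gateway | A unified access layer for LLMs that solves multi-model management |
| Core capabilities | Unified interface + routing + fallback + rate limiting + logging |
| OneAPI | Open-source gateway written in Go, with a web admin UI |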
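2. Unified API design: one format for every model
2.1 Why use the OpenAI format as the standard
| Reason | Explanation |
|---|---|
| De facto industry standard | Most vendors (DeepSeek, Qwen, Doubao, etc.) offer OpenAI-compatible endpoints |
| Mature ecosystem | SDKs and tooling are built around the OpenAI format |
| Transparent to business code | Business code doesn't need to know which model runs behind the gateway |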
2.2 Unified request/response data models
```python
from typing import Any

from pydantic import BaseModel, Field


class ChatMessage(BaseModel):
    role: str  # system / user / assistant / tool
    content: str | list[dict] | None = None
    name: str | None = None
    tool_calls: list[dict] | None = None
    tool_call_id: str | None = None


class ChatRequest(BaseModel):
    """Unified request format (OpenAI-compatible)."""
    model: str  # virtual model name, e.g. "fast" / "smart"
    messages: list[ChatMessage]
    temperature: float = 0.7
    max_tokens: int | None = None
    stream: bool = False
    tools: list[dict] | None = None
    tool_choice: str | dict | None = None
    # Gateway extension fields
    user_id: str | None = None  # used for rate limiting and billing
    metadata: dict[str, Any] = Field(default_factory=dict)  # business metadata


class ChatResponse(BaseModel):
    """Unified response format."""
    id: str
    model: str  # actual model name used
    choices: list[dict]
    usage: dict  # {"prompt_tokens": N, "completion_tokens": M, "total_tokens": T}
    # Gateway extensions
    gateway_model: str = ""  # virtual model name
    provider: str = ""  # actual provider
    latency_ms: int = 0  # response latency
```
2.3 Model name mapping: virtual model name → actual model
```python
# Virtual model name mapping
MODEL_MAPPING = {
    # virtual name → actual model config
    "fast": {
        "provider": "openai",
        "model": "gpt-4o-mini",
        "fallback": ["deepseek-chat", "qwen-turbo"],
    },
    "smart": {
        "provider": "openai",
        "model": "gpt-4o",
        "fallback": ["claude-3-5-sonnet", "deepseek-chat"],
    },
    "cheap": {
        "provider": "deepseek",
        "model": "deepseek-chat",
        "fallback": ["qwen-turbo"],
    },
    # Concrete model names can also be requested directly
    "gpt-4o": {"provider": "openai", "model": "gpt-4o"},
    "claude-3-5-sonnet": {"provider": "anthropic", "model": "claude-3-5-sonnet-20241022"},
}


def resolve_model(virtual_name: str) -> dict:
    """Resolve a virtual model name to its actual model config."""
    config = MODEL_MAPPING.get(virtual_name)
    if not config:
        raise ValueError(f"Unknown model: {virtual_name}")
    return config
```
2.4 Unified handling of streaming and non-streaming calls
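```python
import json

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI(title="LLM Gateway")


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    """Unified entry point, compatible with the OpenAI format."""
    try:
        model_config = resolve_model(request.model)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))  # unknown model is a client error
    provider = get_provider(model_config["provider"])  # provider registry, defined in 3.1

    if request.stream:
        async def stream_generator():
            # Server-Sent Events: each chunk is one "data:" line followed by a blank line
            async for chunk in provider.chat_stream(request, model_config):
                yield f"data: {json.dumps(chunk)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    response = await provider.chat(request, model_config)
    response.gateway_model = request.model  # record the virtual name the caller asked for
    return response
```
💡 Virtual model names are the heart of the gateway: business code calls `model="fast"` or `model="smart"`, and the gateway maps them to real models. Switching models only means changing the gateway config, with zero changes to business code.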
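The streaming branch above writes Server-Sent Events. Each chunk becomes a `data: <json>` line followed by a blank line, and the stream ends with `data: [DONE]`. Below is a minimal stdlib-only sketch of framing and parsing that wire format, handy for unit-testing the gateway without a live model. `sse_frame` and `parse_sse` are hypothetical helper names, not FastAPI APIs.

```python
import json
from typing import Iterable, Iterator


def sse_frame(chunk: dict) -> str:
    """Encode one chunk as an SSE 'data' frame; the blank line ends the event."""
    # ensure_ascii=False keeps non-ASCII text readable on the wire
    return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded chunks from SSE lines until the OpenAI-style [DONE] sentinel."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators, comments and 'event:' lines
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)


# Round trip: frame two chunks plus the terminator, then parse them back
chunks = [
    {"choices": [{"index": 0, "delta": {"content": "Hel"}}]},
    {"choices": [{"index": 0, "delta": {"content": "lo"}}]},
]
wire = "".join(sse_frame(c) for c in chunks) + "data: [DONE]\n\n"
decoded = list(parse_sse(wire.splitlines()))
text = "".join(c["choices"][0]["delta"]["content"] for c in decoded)
print(text)
```

Anything after `[DONE]` is ignored, which mirrors how OpenAI-compatible clients stop reading.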
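Chapter 2 recap:
| Concept | One-line explanation |
|---|---|
| OpenAI format | De facto industry standard that most models can be adapted to |
| Virtual model name | fast/smart/cheap map to real models |
| Unified entry point | A single endpoint, /v1/chat/completions, handles everything |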
3. Multi-model adapter layer: OpenAI / Claude / Gemini / Chinese models
3.1 Designing the Provider abstract interface
```python
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator


class LLMProvider(ABC):
    """Abstract base class for LLM providers."""

    @abstractmethod
    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        """Non-streaming call."""

    @abstractmethod
    def chat_stream(self, request: ChatRequest, config: dict) -> AsyncIterator[dict]:
        """Streaming call: implement as an async generator yielding standard chunks."""

    @abstractmethod
    def is_healthy(self) -> bool:
        """Health check."""


# Provider registry
PROVIDERS: dict[str, LLMProvider] = {}


def register_provider(name: str, provider: LLMProvider):
    PROVIDERS[name] = provider


def get_provider(name: str) -> LLMProvider:
    if name not in PROVIDERS:
        raise ValueError(f"Unregistered provider: {name}")
    return PROVIDERS[name]
```
3.2 OpenAI Provider implementation
```python
import time

from openai import AsyncOpenAI


class OpenAIProvider(LLMProvider):
    def __init__(self, api_key: str, base_url: str | None = None):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self._healthy = True

    def _build_params(self, request: ChatRequest, config: dict) -> dict:
        """Build call parameters, forwarding optional fields only when the caller set them."""
        params = {
            "model": config["model"],
            "messages": [m.model_dump(exclude_none=True) for m in request.messages],
            "temperature": request.temperature,
        }
        for name in ("max_tokens", "tools", "tool_choice"):
            value = getattr(request, name)
            if value is not None:
                params[name] = value
        return params

    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        start = time.perf_counter()
        response = await self.client.chat.completions.create(**self._build_params(request, config))
        latency = int((time.perf_counter() - start) * 1000)
        return ChatResponse(
            id=response.id,
            model=response.model,
            choices=[c.model_dump() for c in response.choices],
            usage=response.usage.model_dump() if response.usage else {},
            gateway_model=request.model,
            provider="openai",
            latency_ms=latency,
        )

    async def chat_stream(self, request: ChatRequest, config: dict):
        stream = await self.client.chat.completions.create(
            **self._build_params(request, config),
            stream=True,
            # Ask for a final chunk carrying usage so streamed calls can be billed too
            stream_options={"include_usage": True},
        )
        async for chunk in stream:
            yield chunk.model_dump()

    def is_healthy(self) -> bool:
        return self._healthy
```
3.3 Claude Provider: format conversion and adaptation
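```python
import json

import anthropic

# Claude stop_reason → OpenAI finish_reason
FINISH_REASONS = {"end_turn": "stop", "stop_sequence": "stop", "max_tokens": "length", "tool_use": "tool_calls"}


class ClaudeProvider(LLMProvider):
    def __init__(self, api_key: str):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
        self._healthy = True

    def _build_params(self, request: ChatRequest, config: dict) -> dict:
        """Format conversion: OpenAI → Claude."""
        # Claude takes the system prompt as a top-level field, not as a message
        system_msg = next((m.content for m in request.messages if m.role == "system"), None)
        # Note: role="tool" messages and assistant tool_calls would also need converting
        # into tool_result / tool_use content blocks; that is omitted here
        messages = [{"role": m.role, "content": m.content}
                    for m in request.messages if m.role != "system"]
        params = {
            "model": config["model"],
            "messages": messages,
            "max_tokens": request.max_tokens or 4096,  # required by the Claude API
            "temperature": request.temperature,
        }
        if system_msg:
            params["system"] = system_msg
        if request.tools:
            # Tool conversion: function.parameters → input_schema
            params["tools"] = [{
                "name": t["function"]["name"],
                "description": t["function"].get("description", ""),
                "input_schema": t["function"]["parameters"],
            } for t in request.tools]
        return params

    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        response = await self.client.messages.create(**self._build_params(request, config))
        # Response normalization: Claude → OpenAI format
        return self._normalize_response(response)

    async def chat_stream(self, request: ChatRequest, config: dict):
        # Re-emit Claude text deltas as OpenAI-style chunks (tool-use deltas not handled)
        async with self.client.messages.stream(**self._build_params(request, config)) as stream:
            async for text in stream.text_stream:
                yield {"object": "chat.completion.chunk", "model": config["model"],
                       "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}]}

    def is_healthy(self) -> bool:
        return self._healthy

    def _normalize_response(self, response) -> ChatResponse:
        """Convert a Claude response to the OpenAI format."""
        texts, tool_calls = [], []
        for block in response.content:
            if block.type == "text":
                texts.append(block.text)  # a reply can contain several text blocks
            elif block.type == "tool_use":
                tool_calls.append({
                    "id": block.id, "type": "function",
                    "function": {"name": block.name, "arguments": json.dumps(block.input)},
                })
        return ChatResponse(
            id=response.id, model=response.model,
            choices=[{"index": 0,
                      "message": {"role": "assistant", "content": "".join(texts),
                                  "tool_calls": tool_calls or None},
                      "finish_reason": FINISH_REASONS.get(response.stop_reason, "stop")}],
            usage={"prompt_tokens": response.usage.input_tokens,
                   "completion_tokens": response.usage.output_tokens,
                   "total_tokens": response.usage.input_tokens + response.usage.output_tokens},
            provider="anthropic",
        )
```
3.4 Chinese models: Doubao / Qwen (Tongyi) / DeepSeek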
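```python
import os

from openai import AsyncOpenAI


class DeepSeekProvider(LLMProvider):
    """DeepSeek: fully OpenAI-compatible."""

    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com",  # only the base_url changes
        )
        self._healthy = True

    def _messages(self, request: ChatRequest) -> list[dict]:
        return [m.model_dump(exclude_none=True) for m in request.messages]

    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        # Exactly the same call as for OpenAI
        response = await self.client.chat.completions.create(
            model=config["model"], messages=self._messages(request),
            temperature=request.temperature,
        )
        return ChatResponse(
            id=response.id, model=response.model,
            choices=[c.model_dump() for c in response.choices],
            usage=response.usage.model_dump(),
            provider="deepseek",
        )

    # LLMProvider declares all three methods abstract, so they must all exist to instantiate the class
    async def chat_stream(self, request: ChatRequest, config: dict):
        stream = await self.client.chat.completions.create(
            model=config["model"], messages=self._messages(request),
            temperature=request.temperature, stream=True,
        )
        async for chunk in stream:
            yield chunk.model_dump()

    def is_healthy(self) -> bool:
        return self._healthy


# Register all providers
register_provider("openai", OpenAIProvider(api_key=os.getenv("OPENAI_API_KEY")))
register_provider("anthropic", ClaudeProvider(api_key=os.getenv("ANTHROPIC_API_KEY")))
register_provider("deepseek", DeepSeekProvider(api_key=os.getenv("DEEPSEEK_API_KEY")))
```
💡 DeepSeek, Qwen and Doubao all offer OpenAI-compatible endpoints, so only the `base_url` needs to change. The providers that genuinely need adapting are Claude (`input_schema`, content blocks) and Gemini's native API (uppercase schema types).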
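Chapter 3 recap:
| Provider | Adaptation effort | Key differences |
|---|---|---|
| OpenAI | ⭐ None needed | Standard format |
| DeepSeek/Qwen | ⭐ Swap base_url | OpenAI-compatible |
| Claude | ⭐⭐⭐ Conversion needed | input_schema + content blocks |
| Gemini | ⭐⭐⭐ Conversion needed | Uppercase types + different response structure |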
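The Gemini row refers to the native function-declaration schema, which, as noted above, names types in uppercase (`STRING`, `OBJECT`, ...). Below is a hedged sketch of a recursive converter from an OpenAI `parameters` JSON Schema.

The function name is hypothetical. Dropping `$schema` and `additionalProperties` is an assumption about keywords the native schema may reject, so check the current Gemini docs before relying on it.

```python
from typing import Any

# Keywords assumed to be unsupported by Gemini's native schema (verify against current docs)
DROP_KEYS = {"$schema", "additionalProperties"}


def to_gemini_schema(schema: Any) -> Any:
    """Recursively uppercase JSON Schema 'type' values and drop assumed-unsupported keys."""
    if isinstance(schema, list):
        return [to_gemini_schema(s) for s in schema]
    if not isinstance(schema, dict):
        return schema  # strings, numbers, booleans pass through unchanged
    out = {}
    for key, value in schema.items():
        if key in DROP_KEYS:
            continue
        if key == "type" and isinstance(value, str):
            out[key] = value.upper()  # "object" -> "OBJECT", "string" -> "STRING"
        elif key == "properties" and isinstance(value, dict):
            # Property names are user data: keep them, convert only the sub-schemas
            out[key] = {name: to_gemini_schema(sub) for name, sub in value.items()}
        else:
            out[key] = to_gemini_schema(value)
    return out


params = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "city": {"type": "string", "description": "City name"},
        "days": {"type": "array", "items": {"type": "integer"}},
    },
    "required": ["city"],
}
gemini_params = to_gemini_schema(params)
print(gemini_params)
```

The function builds a new dict rather than mutating its input, so the same OpenAI tool definition can still be sent to other providers.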
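4. Smart routing: model selection and load balancing
4.1 Rule-based routing: scenario → model mapping
```python
import re


class RoutingRule(BaseModel):
    """A routing rule."""
    pattern: str  # match pattern (a regex here; see _match)
    target_model: str  # target model name
    priority: int = 0  # rules with higher priority are checked first


class Router:
    def __init__(self, rules: list[RoutingRule] | None = None):
        self.rules = sorted(rules or [], key=lambda r: r.priority, reverse=True)

    def route(self, request: ChatRequest) -> dict:
        # 1. Try rule matching first
        for rule in self.rules:
            if self._match(rule, request):
                return resolve_model(rule.target_model)
        # 2. Otherwise resolve by the requested model name
        return resolve_model(request.model)

    @staticmethod
    def _match(rule: RoutingRule, request: ChatRequest) -> bool:
        # Assumption: the pattern is a regex matched against a scenario tag the business
        # passes in metadata, e.g. {"scene": "customer_service"}
        return re.fullmatch(rule.pattern, str(request.metadata.get("scene", ""))) is not None
```
4.2 Cost-based routing: cheapest model first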
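```python
# Model pricing (USD per 1M tokens)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "deepseek-chat": {"input": 0.14, "output": 0.28},
    "qwen-turbo": {"input": 0.30, "output": 0.60},
}


def route_by_cost(request: ChatRequest, candidates: list[str]) -> str:
    """Pick the cheapest healthy candidate model."""
    # get_provider_for(model) maps a concrete model name to its Provider (helper not shown)
    available = [m for m in candidates
                 if m in MODEL_PRICING and get_provider_for(m).is_healthy()]
    if not available:
        raise RuntimeError(f"No healthy candidate among {candidates}")
    # Rank by input + output price; weight the two by your real traffic mix if it is skewed
    return min(available, key=lambda m: MODEL_PRICING[m]["input"] + MODEL_PRICING[m]["output"])
```
4.3 Load balancing: round-robin across multiple keys for the same model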
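```python
import itertools
import time


class KeyPool:
    """API key pool: weighted round-robin with a per-minute request cap per key."""

    def __init__(self, keys: list[dict]):
        # [{"key": "sk-xxx", "weight": 1, "rpm_limit": 500}, ...]
        self.keys = keys
        # Repeat each index `weight` times so heavier keys come up more often
        self._order = [i for i, k in enumerate(keys) for _ in range(k.get("weight", 1))]
        self._cycle = itertools.cycle(self._order)
        self._usage = {k["key"]: 0 for k in keys}
        self._window_start = time.monotonic()

    def get_next_key(self) -> str:
        """Return the next key that still has RPM budget in the current minute."""
        # Fixed 60 s window: reset every per-key counter once it has elapsed
        if time.monotonic() - self._window_start >= 60:
            self._usage = {k["key"]: 0 for k in self.keys}
            self._window_start = time.monotonic()
        for _ in range(len(self._order)):
            key_config = self.keys[next(self._cycle)]
            if self._usage[key_config["key"]] < key_config.get("rpm_limit", 9999):
                self._usage[key_config["key"]] += 1
                return key_config["key"]
        raise RuntimeError("All API keys have reached their limit")


# Usage
openai_keys = KeyPool([
    {"key": "sk-key1", "rpm_limit": 500},
    {"key": "sk-key2", "rpm_limit": 500},
    {"key": "sk-key3", "rpm_limit": 1000},
])
```
4.4 Health checks and automatic removal of failing providers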
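```python
from datetime import datetime, timedelta


class HealthChecker:
    """Passive provider health tracking based on call outcomes."""

    FAILURE_THRESHOLD = 3  # consecutive failures before a provider is marked unhealthy
    RETRY_AFTER = timedelta(seconds=30)  # how long an unhealthy provider is skipped

    def __init__(self):
        self._status: dict[str, dict] = {}  # provider → {healthy, last_check, fail_count}

    def record_success(self, provider: str):
        self._status[provider] = {"healthy": True, "fail_count": 0, "last_check": datetime.now()}

    def record_failure(self, provider: str):
        status = self._status.setdefault(provider, {"healthy": True, "fail_count": 0})
        status["fail_count"] += 1
        status["last_check"] = datetime.now()
        # 3 consecutive failures mark the provider unhealthy
        if status["fail_count"] >= self.FAILURE_THRESHOLD:
            status["healthy"] = False

    def is_healthy(self, provider: str) -> bool:
        status = self._status.get(provider)
        if status is None:
            return True  # no data yet: assume healthy
        # After 30 s an unhealthy provider may be retried; since fail_count is not reset,
        # a single further failure marks it unhealthy again
        if not status["healthy"] and datetime.now() - status["last_check"] > self.RETRY_AFTER:
            status["healthy"] = True
        return status["healthy"]


health_checker = HealthChecker()
```
💡 Rotating several keys is a common way to go beyond a single key's RPM limit. At 500 RPM per key, three keys in rotation give 1,500 RPM, and because the gateway manages the pool, business code never sees it. One caveat: OpenAI enforces rate limits per organization and project rather than per key. Keys from the same organization therefore share one limit, and only keys belonging to separate organizations or projects add capacity.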
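The `KeyPool` in 4.3 accepts a `weight` field. One well-known way to honor weights is smooth weighted round-robin, the scheme nginx uses for weighted upstream servers.

Below is a sketch that assumes positive integer weights. `SmoothWeightedPool` is a hypothetical name, and per-key RPM checks could be layered on top as in `KeyPool`.

```python
class SmoothWeightedPool:
    """Smooth weighted round-robin over API keys.

    On each pick, every key's score grows by its weight; the highest score wins
    and is reduced by the total weight. Over one cycle of sum(weights) picks,
    each key is chosen exactly `weight` times, spread out rather than in a burst.
    """

    def __init__(self, weights: dict[str, int]):
        if not weights or any(w <= 0 for w in weights.values()):
            raise ValueError("weights must be positive integers")
        self.weights = dict(weights)
        self.total = sum(self.weights.values())
        self.current = {key: 0 for key in self.weights}

    def pick(self) -> str:
        for key, weight in self.weights.items():
            self.current[key] += weight
        # max() returns the first of several equal scores, so insertion order breaks ties
        best = max(self.current, key=self.current.__getitem__)
        self.current[best] -= self.total
        return best


pool = SmoothWeightedPool({"sk-key1": 5, "sk-key2": 1, "sk-key3": 1})
picks = [pool.pick() for _ in range(7)]
print(picks)
```

Repeating each key `weight` times in a plain cycle sends a heavy key's requests back to back. The smooth variant interleaves them (here key1, key1, key2, key1, key3, key1, key1), which keeps any single key from absorbing a burst.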
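Chapter 4 recap:
| Concept | One-line explanation |
|---|---|
| Rule routing | Route requests to a specific model based on their characteristics |
| Cost routing | Prefer the cheaper model among those of comparable capability |
| Key pool | Rotate multiple keys to get past a single key's limit |
| Health check | Removed after 3 consecutive failures, retried after 30 s |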
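5. Fallback and fault tolerance: what to do when a model goes down
5.1 Fallback chain: automatically fall back to backup models
```python
import asyncio
import logging

logger = logging.getLogger("llm_gateway")


async def chat_with_fallback(request: ChatRequest) -> ChatResponse:
    """Call the primary model and fall back down the chain on failure."""
    model_config = resolve_model(request.model)
    # Primary target first, then each fallback in order, as (provider, model) pairs.
    # A fallback may itself be a MODEL_MAPPING key (e.g. "claude-3-5-sonnet" → its dated model ID);
    # otherwise get_provider_name (helper not shown) finds the provider for a raw model name
    chain = [(model_config["provider"], model_config["model"])]
    for name in model_config.get("fallback", []):
        target = MODEL_MAPPING.get(name)
        chain.append((target["provider"], target["model"]) if target else (get_provider_name(name), name))

    last_error = None
    for provider_name, model_name in chain:
        if not health_checker.is_healthy(provider_name):
            continue  # skip providers currently marked unhealthy
        try:
            provider = get_provider(provider_name)
            response = await asyncio.wait_for(
                provider.chat(request, {"model": model_name}),
                timeout=30,
            )
            health_checker.record_success(provider_name)
            response.gateway_model = request.model
            return response
        except Exception as e:
            last_error = e
            health_checker.record_failure(provider_name)
            logger.warning("Model %s failed: %s; trying the next fallback...", model_name, e)

    raise RuntimeError(f"All models are unavailable. Last error: {last_error}")
```
5.2 Timeout control and failing fast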
```python
import asyncio

# Per-model timeouts in seconds
TIMEOUT_CONFIG = {
    "gpt-4o": 60,
    "gpt-4o-mini": 30,
    "claude-3-5-sonnet": 60,
    "deepseek-chat": 45,
}


async def call_with_timeout(provider, request, config):
    timeout = TIMEOUT_CONFIG.get(config["model"], 30)  # default: 30 s
    try:
        return await asyncio.wait_for(provider.chat(request, config), timeout=timeout)
    except asyncio.TimeoutError:
        # Raise a clear error so the fallback chain can move on to the next model
        raise TimeoutError(f"Model {config['model']} timed out ({timeout}s)") from None
```
5.3 Circuit breaker pattern: pause calls after repeated failures
```python
import time
from enum import Enum


class CircuitState(str, Enum):
    CLOSED = "closed"  # normal: calls allowed
    OPEN = "open"  # tripped: calls rejected
    HALF_OPEN = "half_open"  # half-open: trial calls allowed


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Once the cool-down has passed, go half-open and let a trial call through
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        return True  # HALF_OPEN: allow trial calls

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # failure_count is not reset on entering HALF_OPEN, so a failed trial re-opens at once
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
5.4 Fallback alerts and notifications
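```python
import os


async def notify_fallback(original_model: str, fallback_model: str, error: str):
    """Send an alert when a fallback happens."""
    message = f"⚠️ Model fallback alert\nOriginal: {original_model}\nFallback: {fallback_model}\nReason: {error}"
    # send_webhook (not shown) POSTs JSON. {"text": ...} is the Slack incoming-webhook shape;
    # Feishu and DingTalk bots expect different payloads
    await send_webhook(os.getenv("ALERT_WEBHOOK_URL"), {"text": message})
```
💡 A fallback chain plus a circuit breaker is a production must-have: GPT-4o occasionally stalls or gets rate-limited, and a system without automatic fallback has no safety net. Suggested fallback order: an equivalent substitute > a weaker model > a cached response.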
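The recommended order above ends with "a cached response". Below is a minimal sketch of a chain that tries each target with a timeout and, only when every target fails, serves the last good answer from a cache.

It rests on three assumptions:
- Targets are zero-argument coroutine factories.
- The cache is a plain dict keyed by an exact request key; a real gateway might key on a prompt hash.
- `call_with_fallback` and the toy coroutines are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable

Target = tuple[str, Callable[[], Awaitable[str]]]


async def call_with_fallback(targets: list[Target], cache: dict[str, str],
                             cache_key: str, timeout: float = 30.0) -> tuple[str, str]:
    """Try each (name, call) in order; if every target fails, serve the cached answer.

    Returns (source, answer), where source is the target name or "cache".
    """
    errors: list[str] = []
    for name, call in targets:
        try:
            answer = await asyncio.wait_for(call(), timeout=timeout)
        except Exception as exc:  # timeouts included: asyncio.TimeoutError is an Exception
            errors.append(f"{name}: {exc!r}")
            continue
        cache[cache_key] = answer  # refresh the cache with every successful answer
        return name, answer
    if cache_key in cache:
        return "cache", cache[cache_key]
    raise RuntimeError("all targets failed: " + "; ".join(errors))


async def slow() -> str:
    await asyncio.sleep(1)
    return "slow answer"


async def broken() -> str:
    raise ConnectionError("503 from upstream")


async def healthy() -> str:
    return "fresh answer"


cache: dict[str, str] = {}
# gpt-4o times out, claude answers and refreshes the cache
first = asyncio.run(call_with_fallback([("gpt-4o", slow), ("claude", healthy)], cache, "q1", timeout=0.05))
# every target fails, so the last good answer comes from the cache
second = asyncio.run(call_with_fallback([("gpt-4o", broken)], cache, "q1"))
print(first, second)
```

Returning the source alongside the answer lets the caller log a cache hit as a degraded response rather than a normal success.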
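Chapter 5 recap:
| Concept | One-line explanation |
|---|---|
| Fallback chain | Primary model fails → try backup models in order |
| Circuit breaker | 5 consecutive failures → pause 60 s → half-open trial |
| Timeout config | Per-model timeouts; fail fast and move on to the fallback |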
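The recap's "5 failures → pause 60 s → half-open trial" leaves a gap in the 5.3 code. In HALF_OPEN, `can_execute` returns True for every caller, so a burst of concurrent requests can all hit a provider that may still be down.

Below is a sketch of a variant that admits exactly one probe while half-open. It takes an injectable clock so the state machine can be tested without sleeping. `ProbingCircuitBreaker` is a hypothetical name, and the class assumes single-threaded use, such as one asyncio event loop.

```python
import time
from enum import Enum
from typing import Callable


class State(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class ProbingCircuitBreaker:
    """Circuit breaker that lets only ONE trial request through while half-open."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0
        self.probe_in_flight = False

    def can_execute(self) -> bool:
        if self.state is State.OPEN and self.clock() - self.opened_at >= self.recovery_timeout:
            self.state = State.HALF_OPEN
        if self.state is State.CLOSED:
            return True
        if self.state is State.HALF_OPEN and not self.probe_in_flight:
            self.probe_in_flight = True  # admit exactly one trial request
            return True
        return False

    def record_success(self) -> None:
        self.state, self.failures, self.probe_in_flight = State.CLOSED, 0, False

    def record_failure(self) -> None:
        self.failures += 1
        # A failed probe re-opens immediately; otherwise open after N consecutive failures
        if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state, self.opened_at, self.probe_in_flight = State.OPEN, self.clock(), False


now = [0.0]  # fake clock
cb = ProbingCircuitBreaker(failure_threshold=5, recovery_timeout=60, clock=lambda: now[0])
for _ in range(5):
    cb.record_failure()
trace = [cb.state.value, cb.can_execute()]      # open, request rejected
now[0] = 61
trace += [cb.can_execute(), cb.can_execute()]  # first probe admitted, second rejected
cb.record_success()
trace += [cb.state.value, cb.can_execute()]    # closed again
print(trace)
```

If a probe's caller never reports back, this breaker stays half-open indefinitely. A production version would also expire a probe that has been in flight too long.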
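6. Rate limiting and quota management
6.1 Rate-limiting strategies: RPM / TPM / concurrency
| Dimension | Description | Typical value (example) |
|---|---|---|
| RPM | Requests per minute | 500 |
| TPM | Tokens per minute | 200,000 |
| Concurrency | Requests in flight at once | 50 |
| Daily quota | Total tokens per day | 10,000,000 |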
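The table treats RPM and TPM as separate limits, but a single call has to pass both, and its token count is only known after the response arrives. Below is a sketch of one way to combine them: reserve an estimated token count up front (for example prompt tokens plus `max_tokens`) and settle the difference afterwards.

The estimate-then-settle approach, the class names and the fake clock are illustrative assumptions. The classes also assume single-threaded use.

```python
import time
from typing import Callable


class Bucket:
    """Token bucket holding up to `capacity` units, refilled at `rate` units per second."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float]):
        self.rate, self.capacity, self.clock = rate, capacity, clock
        self.level = float(capacity)
        self.updated = clock()

    def refill(self) -> None:
        now = self.clock()
        self.level = min(self.capacity, self.level + (now - self.updated) * self.rate)
        self.updated = now


class RpmTpmLimiter:
    """Admit a call only if BOTH the request bucket and the token bucket can cover it."""

    def __init__(self, rpm: int, tpm: int, clock: Callable[[], float] = time.monotonic):
        self.requests = Bucket(rpm / 60, rpm, clock)
        self.tokens = Bucket(tpm / 60, tpm, clock)

    def try_acquire(self, estimated_tokens: int) -> bool:
        self.requests.refill()
        self.tokens.refill()
        if self.requests.level < 1 or self.tokens.level < estimated_tokens:
            return False  # consume nothing unless both limits allow the call
        self.requests.level -= 1
        self.tokens.level -= estimated_tokens
        return True

    def settle(self, estimated_tokens: int, actual_tokens: int) -> None:
        """Once real usage is known, refund (or charge) the estimate's error."""
        self.tokens.refill()
        self.tokens.level = min(self.tokens.capacity,
                                self.tokens.level + estimated_tokens - actual_tokens)


now = [0.0]  # fake clock so the example is deterministic
limiter = RpmTpmLimiter(rpm=500, tpm=200_000, clock=lambda: now[0])
first = limiter.try_acquire(150_000)   # both buckets start full
second = limiter.try_acquire(60_000)   # only 50,000 tokens left, so rejected
now[0] = 6.0                           # 6 s at 200,000 / 60 per second refills 20,000 tokens
third = limiter.try_acquire(60_000)    # about 70,000 tokens available now
limiter.settle(estimated_tokens=60_000, actual_tokens=1_000)  # refund the unused 59,000
print(first, second, third, round(limiter.tokens.level))
```

The final `settle` call returns the unused 59,000 tokens to the bucket. Without it, a generous estimate would throttle later calls for tokens that were never consumed.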
6.2 Token bucket implementation
```python
import asyncio
import time


class TokenBucket:
    """Token-bucket rate limiter."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # refill rate, tokens per second
        self.capacity = capacity  # bucket size = maximum burst
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        """Take `tokens` if available; return False immediately instead of waiting."""
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now


# 500 requests per minute ≈ 8.3 per second
rpm_limiter = TokenBucket(rate=500 / 60, capacity=500)
```
6.3 Per-user quota management
```python
from datetime import date


class QuotaManager:
    """Per-user daily quotas (in memory; counters reset when the date changes)."""

    def __init__(self):
        self._usage: dict[str, dict] = {}  # user_id → {"date", "tokens", "requests"}

    def _today(self, user_id: str) -> dict:
        usage = self._usage.get(user_id)
        if usage is None or usage["date"] != date.today():
            usage = {"date": date.today(), "tokens": 0, "requests": 0}
            self._usage[user_id] = usage
        return usage

    async def check_quota(self, user_id: str, estimated_tokens: int = 0) -> bool:
        usage = self._today(user_id)
        quota = await self._get_user_quota(user_id)
        if usage["tokens"] + estimated_tokens > quota["daily_tokens"]:
            return False
        if usage["requests"] >= quota["daily_requests"]:
            return False
        return True

    async def record_usage(self, user_id: str, tokens: int):
        usage = self._today(user_id)
        usage["tokens"] += tokens
        usage["requests"] += 1

    async def _get_user_quota(self, user_id: str) -> dict:
        # Load the user's quota from a database or config (hard-coded here)
        return {"daily_tokens": 1_000_000, "daily_requests": 1000}
```
6.4 Distributed rate limiting with Redis
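```python
import time
import uuid

import redis.asyncio as redis


class RedisRateLimiter:
    """Distributed rate limiting on Redis (sliding window over a sorted set)."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def is_allowed(self, key: str, limit: int, window: int = 60) -> bool:
        now = time.time()
        window_start = now - window
        pipe = self.redis.pipeline()
        # Sliding window: drop expired entries, count what is left, then record this request
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        # A unique member per request, so two requests with the same timestamp don't collide
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        pipe.expire(key, window)
        results = await pipe.execute()
        current_count = results[1]  # count before this request was added
        # Rejected requests are recorded too, so a client that keeps retrying stays throttled
        return current_count < limit


# Usage (inside an async request handler)
limiter = RedisRateLimiter(redis.from_url("redis://localhost:6379"))


async def check_user_rpm(user_id: str) -> bool:
    return await limiter.is_allowed(f"rate:{user_id}:rpm", limit=100, window=60)
```
💡 Use a token bucket on a single instance and a Redis sliding window when distributed. A token bucket is simple and efficient but only works within one process. Production deployments with multiple instances need Redis for shared rate limiting.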
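To see the sliding-window algorithm from 6.4 without a Redis server, here is an in-process equivalent built on a `deque` of timestamps, with an injectable clock for testing. It is a sketch for single-process use, and `SlidingWindowLog` is a hypothetical name. Unlike the Redis version, it records only accepted requests.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowLog:
    """In-process sliding-window log: the same algorithm as the Redis sorted-set version."""

    def __init__(self, clock: Callable[[], float] = time.time):
        self.clock = clock
        self.hits: dict[str, deque] = {}  # key → timestamps of accepted requests, oldest first

    def is_allowed(self, key: str, limit: int, window: float = 60.0) -> bool:
        now = self.clock()
        q = self.hits.setdefault(key, deque())
        # ZREMRANGEBYSCORE equivalent: drop timestamps at or before now - window
        while q and q[0] <= now - window:
            q.popleft()
        if len(q) >= limit:  # ZCARD equivalent
            return False
        q.append(now)  # ZADD equivalent (accepted requests only)
        return True


now = [0.0]  # fake clock
rl = SlidingWindowLog(clock=lambda: now[0])
first = [rl.is_allowed("u1", limit=3, window=60) for _ in range(4)]  # 4th exceeds the limit
now[0] = 30
mid = rl.is_allowed("u1", limit=3, window=60)    # the three hits at t=0 are still in the window
now[0] = 60
late = rl.is_allowed("u1", limit=3, window=60)   # t=0 hits have slid out of the window
other = rl.is_allowed("u2", limit=3, window=60)  # keys are independent
print(first, mid, late, other)
```

Unlike a fixed window, this never admits a double burst at a window boundary, at the cost of storing one timestamp per accepted request.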
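Chapter 6 recap:
| Concept | One-line explanation |
|---|---|
| RPM/TPM | Limit by request count / token count |
| Token bucket | Tokens refill at a steady rate; bursts can draw on what is stored in the bucket |
| User quotas | Track and cap usage per user |
| Redis rate limiting | Sliding window, shared across multiple instances |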
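7. Logging, billing and observability
7.1 Request logs: record the full details of every call
```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class RequestLog:
    request_id: str
    user_id: str
    model_requested: str  # virtual model name the user asked for
    model_actual: str  # model actually used
    provider: str
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    cost: float = 0.0  # USD, from calculate_cost (7.2); the 7.3 queries sum this column
    latency_ms: int = 0
    status: str = "success"  # success / failed / timeout / rate_limited
    error: str = ""
    timestamp: datetime = field(default_factory=datetime.now)


class RequestLogger:
    def __init__(self):
        self._logs: list[RequestLog] = []
        self._pending: set[asyncio.Task] = set()  # hold references so tasks aren't garbage-collected

    def log(self, entry: RequestLog):
        # Must be called inside the running event loop (e.g. from a request handler)
        self._logs.append(entry)
        # Write to the database asynchronously without blocking the response
        task = asyncio.create_task(self._save_to_db(entry))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def _save_to_db(self, entry: RequestLog):
        # `db` is an async database handle (e.g. an asyncpg pool), not shown here
        await db.execute(
            "INSERT INTO request_logs (request_id, user_id, model_requested, ...) VALUES ($1, $2, ...)",
            entry.request_id, entry.user_id, entry.model_requested, ...
        )
```
7.2 Token billing: cost by usage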
```python
def calculate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Compute the cost of one call in USD (MODEL_PRICING is per 1M tokens)."""
    # Unknown models fall back to a placeholder price; log them so they get priced properly
    pricing = MODEL_PRICING.get(model, {"input": 1.0, "output": 3.0})
    cost = (prompt_tokens * pricing["input"] + completion_tokens * pricing["output"]) / 1_000_000
    return round(cost, 6)


# Examples
# GPT-4o: 1000 prompt + 500 completion tokens
#   = (1000 * 2.50 + 500 * 10.00) / 1M = $0.0075
# DeepSeek: same usage
#   = (1000 * 0.14 + 500 * 0.28) / 1M = $0.00028
```
7.3 Usage dashboard: by model / user / time
```sql
-- Today's usage by model
SELECT model_actual,
       COUNT(*) as requests,
       SUM(total_tokens) as total_tokens,
       SUM(cost) as total_cost,
       AVG(latency_ms) as avg_latency
FROM request_logs
WHERE timestamp >= CURRENT_DATE
GROUP BY model_actual
ORDER BY total_cost DESC;

-- This month's usage by user
SELECT user_id,
       COUNT(*) as requests,
       SUM(total_tokens) as tokens,
       SUM(cost) as cost
FROM request_logs
WHERE timestamp >= DATE_TRUNC('month', CURRENT_DATE)
GROUP BY user_id
ORDER BY cost DESC;
```
7.4 Prometheus metrics and alerting
```python
from prometheus_client import Counter, Histogram

# Request count
request_counter = Counter(
    "llm_gateway_requests_total",
    "Total LLM requests",
    ["model", "provider", "status"],
)

# Latency histogram
latency_histogram = Histogram(
    "llm_gateway_latency_seconds",
    "Request latency",
    ["model", "provider"],
)

# Token usage
token_counter = Counter(
    "llm_gateway_tokens_total",
    "Total tokens consumed",
    ["model", "type"],  # type: prompt / completion
)


# Record metrics while handling each request
def record_metrics(log: RequestLog):
    request_counter.labels(model=log.model_actual, provider=log.provider, status=log.status).inc()
    latency_histogram.labels(model=log.model_actual, provider=log.provider).observe(log.latency_ms / 1000)
    token_counter.labels(model=log.model_actual, type="prompt").inc(log.prompt_tokens)
    token_counter.labels(model=log.model_actual, type="completion").inc(log.completion_tokens)
```
💡 Billing is the gateway's "last mile": only when you know what each user and each model costs can you optimize spending. At the list prices above, the 1,000-in/500-out example from 7.2 costs $0.0075 on GPT-4o and $0.00028 on DeepSeek, roughly 27× cheaper. Whether output quality is comparable depends on the task, and billing data is what lets you quantify that trade-off.
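The SQL in 7.3 groups cost by model and by user. Below is the same aggregation as a small in-memory Python sketch, useful for checking billing logic in tests before it reaches the database. The prices are copied from `MODEL_PRICING`, and the function names and log dicts are illustrative.

```python
from collections import defaultdict

# Per-1M-token prices in USD, copied from MODEL_PRICING above
PRICES = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "deepseek-chat": {"input": 0.14, "output": 0.28},
}


def call_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    p = PRICES[model]  # unknown models raise KeyError instead of being billed at a guessed price
    return (prompt_tokens * p["input"] + completion_tokens * p["output"]) / 1_000_000


def cost_report(logs: list[dict]) -> dict[str, dict[str, float]]:
    """Sum cost per model and per user, mirroring the GROUP BY queries in 7.3."""
    by_model: defaultdict[str, float] = defaultdict(float)
    by_user: defaultdict[str, float] = defaultdict(float)
    for log in logs:
        cost = call_cost(log["model"], log["prompt_tokens"], log["completion_tokens"])
        by_model[log["model"]] += cost
        by_user[log["user_id"]] += cost
    return {"by_model": dict(by_model), "by_user": dict(by_user)}


logs = [
    {"user_id": "alice", "model": "gpt-4o", "prompt_tokens": 1000, "completion_tokens": 500},
    {"user_id": "bob", "model": "deepseek-chat", "prompt_tokens": 1000, "completion_tokens": 500},
    {"user_id": "alice", "model": "deepseek-chat", "prompt_tokens": 1000, "completion_tokens": 500},
]
report = cost_report(logs)
ratio = call_cost("gpt-4o", 1000, 500) / call_cost("deepseek-chat", 1000, 500)
print(report, f"gpt-4o / deepseek-chat ≈ {ratio:.1f}x")
```

Raising on unknown models is a deliberate contrast with `calculate_cost`'s placeholder price. In a billing path, a loud failure is easier to catch than a silently wrong invoice.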
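Chapter 7 recap:
| Concept | One-line explanation |
|---|---|
| Request log | Record model / tokens / latency / status for every call |
| Token billing | Model price × usage = cost |
| Prometheus | Expose metrics via Counter/Histogram and visualize them in Grafana dashboards |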
8. Full implementation and production deployment
8.1 Project structure and code organization
```
llm-gateway/
├── app/
│   ├── main.py              # FastAPI entry point
│   ├── models.py            # Pydantic data models
│   ├── config.py            # configuration loading
│   ├── providers/           # model adapter layer
│   │   ├── base.py          # Provider abstract base class
│   │   ├── openai.py
│   │   ├── claude.py
│   │   └── deepseek.py
│   ├── routing/             # routing layer
│   │   ├── router.py
│   │   ├── fallback.py
│   │   └── health.py
│   ├── middleware/          # middleware
│   │   ├── rate_limit.py
│   │   ├── auth.py
│   │   └── logging.py
│   └── billing/             # billing
│       ├── calculator.py
│       └── quota.py
├── config/
│   └── gateway.yaml         # gateway configuration
├── Dockerfile
├── docker-compose.yaml
└── requirements.txt
```
8.2 Configuration: define models and routing rules in YAML
```yaml
# config/gateway.yaml
# Keys are inlined for brevity; in production load them from env vars or a secret manager
providers:
  openai:
    api_keys:
      - key: "sk-key1"
        rpm_limit: 500
      - key: "sk-key2"
        rpm_limit: 500
  anthropic:
    api_keys:
      - key: "sk-ant-xxx"
        rpm_limit: 1000
  deepseek:
    base_url: "https://api.deepseek.com"
    api_keys:
      - key: "sk-ds-xxx"

models:
  fast:
    provider: openai
    model: gpt-4o-mini
    fallback: [deepseek-chat, qwen-turbo]
    timeout: 30
  smart:
    provider: openai
    model: gpt-4o
    fallback: [claude-3-5-sonnet, deepseek-chat]
    timeout: 60

rate_limits:
  default:
    rpm: 100
    tpm: 200000
    daily_tokens: 5000000
```
8.3 Docker deployment and health checks
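```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```
```yaml
# docker-compose.yaml
services:
  gateway:
    build: .
    ports: ["8000:8000"]
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
    depends_on: [redis, postgres]
    healthcheck:
      # python:3.12-slim ships without curl, so probe with Python's stdlib instead;
      # this assumes the app exposes a GET /health endpoint
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: llm_gateway
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}  # the official image won't initialize without one
```
8.4 Performance testing and tuning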
```python
# Load-test script
import asyncio
import time

import httpx


async def benchmark(url: str, n: int = 100, concurrency: int = 10):
    semaphore = asyncio.Semaphore(concurrency)
    results = []

    # Reuse one client (connection pool); LLM calls can exceed httpx's 5 s default timeout
    async with httpx.AsyncClient(timeout=60) as client:

        async def send_one():
            async with semaphore:
                start = time.perf_counter()
                try:
                    resp = await client.post(f"{url}/v1/chat/completions", json={
                        "model": "fast",
                        "messages": [{"role": "user", "content": "hello"}],
                    })
                    status = resp.status_code
                except httpx.HTTPError:
                    status = 0  # count transport errors and timeouts as failures
                latency = (time.perf_counter() - start) * 1000
                results.append({"status": status, "latency": latency})

        await asyncio.gather(*[send_one() for _ in range(n)])

    latencies = sorted(r["latency"] for r in results)
    avg = sum(latencies) / len(latencies)
    p95 = latencies[min(int(n * 0.95), n - 1)]
    print(f"Total requests: {n}, concurrency: {concurrency}")
    print(f"Avg latency: {avg:.0f}ms, P95: {p95:.0f}ms")
    print(f"Success rate: {sum(1 for r in results if r['status'] == 200) / n * 100:.1f}%")


if __name__ == "__main__":
    asyncio.run(benchmark("http://localhost:8000"))
```
Appendix: LLM gateway quick reference
A.1 API differences across models
| Model | base_url | Tool schema field | Streaming format |
|---|---|---|---|
| OpenAI | api.openai.com | parameters | data: {chunk} |
| Claude | api.anthropic.com | input_schema | event: content_block_delta |
| DeepSeek | api.deepseek.com | parameters | OpenAI-compatible |
| Qwen (Tongyi) | dashscope.aliyuncs.com | parameters | OpenAI-compatible |
| Doubao | ark.cn-beijing.volces.com | parameters | OpenAI-compatible |
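The table notes that Claude streams `event: content_block_delta` frames while OpenAI-compatible APIs stream `data: {chunk}`. Below is a hedged sketch that translates the text parts of a Claude stream into OpenAI-style chunks. `claude_sse_to_openai` is a hypothetical name, and tool-use deltas are ignored.

The event shapes follow Anthropic's documented `message_start` / `content_block_delta` / `message_delta` events. They are trimmed to the fields used here, and the sample stream is illustrative.

```python
import json
from typing import Iterable, Iterator

# Claude stop_reason → OpenAI finish_reason
STOP_REASONS = {"end_turn": "stop", "stop_sequence": "stop",
                "max_tokens": "length", "tool_use": "tool_calls"}


def claude_sse_to_openai(lines: Iterable[str]) -> Iterator[dict]:
    """Translate Claude Messages SSE lines into OpenAI-style chat.completion.chunk dicts."""
    msg_id, model, prompt_tokens = "", "", 0
    for line in lines:
        if not line.startswith("data:"):
            continue  # 'event:' lines repeat the type that is also inside the JSON payload
        event = json.loads(line[len("data:"):])
        etype = event["type"]
        base = {"id": msg_id, "object": "chat.completion.chunk", "model": model}
        if etype == "message_start":
            msg_id = event["message"]["id"]
            model = event["message"]["model"]
            prompt_tokens = event["message"]["usage"]["input_tokens"]
        elif etype == "content_block_delta" and event["delta"]["type"] == "text_delta":
            delta = {"content": event["delta"]["text"]}
            yield {**base, "choices": [{"index": 0, "delta": delta, "finish_reason": None}]}
        elif etype == "message_delta":
            reason = STOP_REASONS.get(event["delta"].get("stop_reason"), "stop")
            completion = event["usage"]["output_tokens"]  # cumulative output tokens
            yield {**base,
                   "choices": [{"index": 0, "delta": {}, "finish_reason": reason}],
                   "usage": {"prompt_tokens": prompt_tokens, "completion_tokens": completion,
                             "total_tokens": prompt_tokens + completion}}
        # content_block_start/stop, ping and message_stop carry nothing to forward here


sample = [
    "event: message_start",
    'data: {"type": "message_start", "message": {"id": "msg_1", "model": "claude-3-5-sonnet-20241022", "usage": {"input_tokens": 12, "output_tokens": 1}}}',
    "",
    "event: content_block_delta",
    'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hi"}}',
    "",
    "event: content_block_delta",
    'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " there"}}',
    "",
    "event: message_delta",
    'data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null}, "usage": {"output_tokens": 3}}',
    "",
    "event: message_stop",
    'data: {"type": "message_stop"}',
]
chunks = list(claude_sse_to_openai(sample))
text = "".join(c["choices"][0]["delta"].get("content", "") for c in chunks)
print(text)
```

Emitting usage on the final chunk mirrors OpenAI's `include_usage` behavior, so the billing code can treat both providers' streams the same way.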
A.2 Gateway config YAML template
```yaml
providers:
  openai:
    api_keys: [{key: "sk-xxx", rpm_limit: 500}]
  deepseek:
    base_url: "https://api.deepseek.com"
    api_keys: [{key: "sk-xxx"}]
models:
  fast: {provider: openai, model: gpt-4o-mini, fallback: [deepseek-chat]}
  smart: {provider: openai, model: gpt-4o, fallback: [claude-3-5-sonnet]}
rate_limits:
  default: {rpm: 100, tpm: 200000}
```
A.3 Common issues and troubleshooting
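| Problem | Likely cause | How to check |
|---|---|---|
| 429 Too Many Requests | Key rate limit hit | Check RPM usage across the key pool |
| Every model times out | Network problem | Check DNS and proxy settings |
| Claude parameter errors | Bug in format conversion | Check the tools → input_schema conversion |
| Inaccurate billing | Streamed usage not counted | Check that usage is collected from streamed responses |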
A.4 Model pricing quick reference
| Model | Input ($/1M tokens) | Output ($/1M tokens) |
|---|---|---|
| GPT-4o | $2.50 | $10.00 |
| GPT-4o-mini | $0.15 | $0.60 |
| Claude 3.5 Sonnet | $3.00 | $15.00 |
| DeepSeek Chat | $0.14 | $0.28 |
| Qwen-Turbo (Tongyi) | $0.30 | $0.60 |