
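Designing and Implementing an LLM Gateway

Build a unified LLM Gateway covering multi-model routing, automatic fallback, rate limiting, token billing, and request logging: a production-grade unified access layer for large models, implemented with Python + FastAPI.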


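1. Why You Need an LLM Gateway

1.1 Pain points of calling providers directly: scattered keys, inconsistent formats, no monitoring

Five pain points of calling multiple model APIs directly:

  ❌ Scattered keys
  ═══════════════════════════════════════
  One key for OpenAI, one for Claude, one for Doubao
  Spread across the environment variables of individual services, with no central management

  ❌ Inconsistent formats
  ═══════════════════════════════════════
  OpenAI describes tool arguments under parameters, Claude under input_schema
  Every model switch means changing code

  ❌ No monitoring
  ═══════════════════════════════════════
  No view of how many tokens each model consumed
  No view of how much each user spent

  ❌ No fault tolerance
  ═══════════════════════════════════════
  When GPT-4o goes down, the whole service goes down
  No automatic fallback or retry

  ❌ No rate limiting
  ═══════════════════════════════════════
  One user calling in a tight loop can exhaust the quota
  and affect every other user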

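1.2 What the gateway solves: unified API + routing + rate limiting + logging

Architecture of an LLM gateway:

  Service A ──┐
  Service B ──┤──→ 🌐 LLM Gateway ──┬──→ OpenAI
  Service C ──┘     │                ├──→ Claude
                    │                ├──→ Gemini
                    │                └──→ DeepSeek

                    ├── Unified OpenAI-format API
                    ├── Smart routing & load balancing
                    ├── Automatic fallback & circuit breaking
                    ├── Rate limits & quotas
                    └── Logging, billing & monitoring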

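1.3 Existing options compared: OneAPI / LiteLLM / OpenRouter

Option | Type | Language | Features | Best for
OneAPI | Self-hosted | Go | Visual admin console, mature channel management | Internal team use
LiteLLM | SDK / proxy | Python | Adapters for 100+ models, proxy mode | Python projects
OpenRouter | Cloud service | - | Unified API, pay-as-you-go | Teams that prefer not to self-host
This tutorial | Self-built | Python | Learn the principles, full control | Understanding the design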

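1.4 Core architecture: what we are going to build

Modules of the LLM Gateway we will implement:

  ┌──────────────────────────────────────────────────────┐
  │                   FastAPI Gateway                    │
  ├─────────────┬──────────────┬──────────────┬──────────┤
  │ Unified API │ Routing      │ Rate limits  │ Logging  │
  │ OpenAI      │ Model routes │ RPM/TPM      │ Requests │
  │ format      │ Load balance │ Quotas       │ Tokens   │
  │ adaptation  │ Health check │ Token bucket │ Billing  │
  ├─────────────┴──────────────┴──────────────┴──────────┤
  │                Provider adapter layer                │
  ├────────┬────────┬────────┬────────┬──────────────────┤
  │ OpenAI │ Claude │ Gemini │ Doubao │ DeepSeek         │
  └────────┴────────┴────────┴────────┴──────────────────┘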

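Chapter 1 key takeaways:

Concept | One-line explanation
LLM Gateway | A unified access layer for large models that solves multi-model management
Core capabilities | Unified API + routing + fallback + rate limiting + logging
OneAPI | An open-source option written in Go, with a visual admin console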

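2. Unified API Design: One Format for Every Model

2.1 Using the OpenAI format as the standard: why this choice

Reason | Explanation
Industry standard | Almost every model vendor offers an OpenAI-compatible endpoint
Mature ecosystem | SDKs and toolchains are designed around the OpenAI format
Transparent to callers | Business code does not need to know which model sits behind the gateway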

2.2 Unified request/response data models

python
from pydantic import BaseModel, Field
from typing import Any

class ChatMessage(BaseModel):
    role: str                          # system / user / assistant / tool
    content: str | list[dict] | None = None
    name: str | None = None
    tool_calls: list[dict] | None = None
    tool_call_id: str | None = None

class ChatRequest(BaseModel):
    """Unified request format (OpenAI-compatible)"""
    model: str                         # virtual model name, e.g. "fast" / "smart"
    messages: list[ChatMessage]
    temperature: float = 0.7
    max_tokens: int | None = None
    stream: bool = False
    tools: list[dict] | None = None
    tool_choice: str | dict | None = None

    # Gateway extension fields
    user_id: str | None = None         # used for rate limiting and billing
    metadata: dict[str, Any] = Field(default_factory=dict)  # business metadata

class ChatResponse(BaseModel):
    """Unified response format"""
    id: str
    model: str                         # name of the model actually used
    choices: list[dict]
    usage: dict                        # {"prompt_tokens": N, "completion_tokens": M, "total_tokens": T}

    # Gateway extensions
    gateway_model: str = ""            # virtual model name
    provider: str = ""                 # actual Provider
    latency_ms: int = 0                # response latency

2.3 Model name mapping: virtual model name → actual model

python
# Virtual model name mapping
MODEL_MAPPING = {
    # virtual name → actual model configuration
    "fast": {
        "provider": "openai",
        "model": "gpt-4o-mini",
        "fallback": ["deepseek-chat", "qwen-turbo"],
    },
    "smart": {
        "provider": "openai",
        "model": "gpt-4o",
        "fallback": ["claude-3-5-sonnet", "deepseek-chat"],
    },
    "cheap": {
        "provider": "deepseek",
        "model": "deepseek-chat",
        "fallback": ["qwen-turbo"],
    },
    # Requesting a concrete model name directly is also supported
    "gpt-4o": {"provider": "openai", "model": "gpt-4o"},
    "claude-3-5-sonnet": {"provider": "anthropic", "model": "claude-3-5-sonnet-20241022"},
}

def resolve_model(virtual_name: str) -> dict:
    """Resolve a virtual model name to its actual model configuration"""
    config = MODEL_MAPPING.get(virtual_name)
    if not config:
        raise ValueError(f"Unknown model: {virtual_name}")
    return config
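
The fallback lists above name models such as deepseek-chat and qwen-turbo without saying which Provider serves them. One way to close that gap is sketched below: expand a virtual name into an ordered list of (provider, model) pairs. The MODEL_PROVIDERS table, the "qwen" provider name, and the expand_fallback_chain function are hypothetical names introduced for this illustration, not part of the original configuration.

```python
# Hypothetical lookup table: real model name -> provider name (an assumption of this sketch).
MODEL_PROVIDERS = {
    "gpt-4o": "openai",
    "gpt-4o-mini": "openai",
    "deepseek-chat": "deepseek",
    "qwen-turbo": "qwen",
}

# A trimmed copy of the mapping above, so the sketch runs on its own.
MODEL_MAPPING = {
    "fast": {
        "provider": "openai",
        "model": "gpt-4o-mini",
        "fallback": ["deepseek-chat", "qwen-turbo"],
    },
    "gpt-4o": {"provider": "openai", "model": "gpt-4o"},
}


def expand_fallback_chain(virtual_name: str) -> list[tuple[str, str]]:
    """Return [(provider, model), ...] with the primary model first."""
    config = MODEL_MAPPING.get(virtual_name)
    if config is None:
        raise ValueError(f"Unknown model: {virtual_name}")
    chain = [(config["provider"], config["model"])]
    for model in config.get("fallback", []):
        provider = MODEL_PROVIDERS.get(model)
        if provider is None:
            raise ValueError(f"No provider configured for fallback model: {model}")
        chain.append((provider, model))
    return chain


print(expand_fallback_chain("fast"))
```

Resolving the whole chain up front means a misconfigured fallback fails at lookup time rather than in the middle of an outage.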

2.4 Handling streaming and non-streaming uniformly

python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import json

app = FastAPI(title="LLM Gateway")

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    """Single entry point, compatible with the OpenAI format"""
    try:
        model_config = resolve_model(request.model)
        provider = get_provider(model_config["provider"])
    except ValueError as exc:
        # Unknown model or unregistered Provider: answer 404 instead of an unhandled 500
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    if request.stream:
        async def stream_generator():
            # Each chunk becomes one Server-Sent Events "data:" line
            async for chunk in provider.chat_stream(request, model_config):
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    response = await provider.chat(request, model_config)
    return response
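
On the consuming side, the stream produced by the endpoint above is a sequence of `data:` lines terminated by `data: [DONE]`. The following is a minimal sketch of a parser for that framing, assuming OpenAI-style chunks where the text lives under `choices[].delta.content`; the function names iter_sse_chunks and collect_text are illustrative only.

```python
import json
from typing import Iterable, Iterator


def iter_sse_chunks(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON chunks from SSE lines, stopping at the [DONE] sentinel."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate delta.content pieces from OpenAI-style stream chunks."""
    parts = []
    for chunk in iter_sse_chunks(lines):
        for choice in chunk.get("choices", []):
            piece = (choice.get("delta") or {}).get("content")
            if piece:
                parts.append(piece)
    return "".join(parts)


sample = [
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "",
    "data: [DONE]",
    'data: {"choices": [{"delta": {"content": "ignored"}}]}',
]
print(collect_text(sample))
```

A parser like this is also handy in gateway tests, since it lets a test assert on the assembled text instead of on raw SSE frames.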

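💡 Virtual model names are the heart of the gateway: business code sends model="fast" or model="smart", and the gateway maps the name to a real model. Switching models only requires a gateway configuration change, with no change to business code.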

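Chapter 2 key takeaways:

Concept | One-line explanation
OpenAI format | The de facto industry standard; every model can be adapted to it
Virtual model names | fast / smart / cheap map to actual models
Single entry point | One endpoint, /v1/chat/completions, handles everything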

3. Multi-Model Adapter Layer: OpenAI / Claude / Gemini / Chinese Models

3.1 Designing the abstract Provider interface

python
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    """Abstract base class for LLM Providers"""

    @abstractmethod
    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        """Non-streaming call"""
        pass

    @abstractmethod
    async def chat_stream(self, request: ChatRequest, config: dict):
        """Streaming call; implementations are async generators that yield standard chunks"""
        pass

    @abstractmethod
    def is_healthy(self) -> bool:
        """Health check"""
        pass

# Provider registry
PROVIDERS: dict[str, LLMProvider] = {}

def register_provider(name: str, provider: LLMProvider):
    PROVIDERS[name] = provider

def get_provider(name: str) -> LLMProvider:
    if name not in PROVIDERS:
        raise ValueError(f"Provider not registered: {name}")
    return PROVIDERS[name]

3.2 OpenAI Provider implementation

python
from openai import AsyncOpenAI
import time

class OpenAIProvider(LLMProvider):
    def __init__(self, api_key: str, base_url: str | None = None):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self._healthy = True

    def _build_params(self, request: ChatRequest, config: dict) -> dict:
        """Collect call arguments, leaving out optional fields that are unset"""
        params = {
            "model": config["model"],
            "messages": [m.model_dump(exclude_none=True) for m in request.messages],
            "temperature": request.temperature,
        }
        if request.max_tokens is not None:
            params["max_tokens"] = request.max_tokens
        if request.tools:
            # Only send tools when present; an explicit null may be rejected by the API
            params["tools"] = request.tools
        return params

    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        start = time.perf_counter()
        response = await self.client.chat.completions.create(**self._build_params(request, config))
        latency = int((time.perf_counter() - start) * 1000)

        return ChatResponse(
            id=response.id,
            model=response.model,
            choices=[c.model_dump() for c in response.choices],
            usage=response.usage.model_dump(),
            gateway_model=request.model,
            provider="openai",
            latency_ms=latency,
        )

    async def chat_stream(self, request: ChatRequest, config: dict):
        stream = await self.client.chat.completions.create(
            **self._build_params(request, config),
            stream=True,
        )
        async for chunk in stream:
            yield chunk.model_dump()

    def is_healthy(self) -> bool:
        return self._healthy

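3.3 Claude Provider: format conversion and adaptation

python
import json
import anthropic

class ClaudeProvider(LLMProvider):
    def __init__(self, api_key: str):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)

    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        # Format conversion: OpenAI → Claude.
        # Claude takes the system prompt as a separate argument, not as a message.
        system_msg = next((m.content for m in request.messages if m.role == "system"), None)
        messages = [{"role": m.role, "content": m.content}
                    for m in request.messages if m.role != "system"]

        params = {
            "model": config["model"],
            "messages": messages,
            "max_tokens": request.max_tokens or 4096,  # max_tokens is required by Claude
        }
        if system_msg:
            params["system"] = system_msg

        # tools conversion: parameters → input_schema (omit the argument when there are no tools)
        if request.tools:
            params["tools"] = [{
                "name": t["function"]["name"],
                "description": t["function"].get("description", ""),
                "input_schema": t["function"].get("parameters", {"type": "object", "properties": {}}),
            } for t in request.tools]

        response = await self.client.messages.create(**params)

        # Response normalization: Claude → OpenAI format
        return self._normalize_response(response, request.model)

    async def chat_stream(self, request: ChatRequest, config: dict):
        # Streaming conversion is not covered in this section, so fail loudly
        raise NotImplementedError("Claude streaming is not implemented in this example")
        yield  # unreachable; keeps this method an async generator like the other Providers

    def is_healthy(self) -> bool:
        return True

    def _normalize_response(self, response, gateway_model: str = "") -> ChatResponse:
        """Convert a Claude response to the OpenAI format"""
        content = ""
        tool_calls = []
        for block in response.content:
            if block.type == "text":
                content += block.text  # append: a response may contain several text blocks
            elif block.type == "tool_use":
                tool_calls.append({
                    "id": block.id, "type": "function",
                    "function": {"name": block.name, "arguments": json.dumps(block.input)}
                })

        return ChatResponse(
            id=response.id, model=response.model,
            choices=[{"index": 0,
                      "message": {"role": "assistant", "content": content,
                                  "tool_calls": tool_calls or None},
                      "finish_reason": response.stop_reason}],
            usage={"prompt_tokens": response.usage.input_tokens,
                   "completion_tokens": response.usage.output_tokens,
                   "total_tokens": response.usage.input_tokens + response.usage.output_tokens},
            gateway_model=gateway_model,
            provider="anthropic",
        )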
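
The conversion above forwards plain user and assistant turns only. In a tool-calling conversation, OpenAI-style history also contains assistant messages with `tool_calls` and messages with role `tool`, which Claude expects as `tool_use` and `tool_result` content blocks instead. The sketch below shows one way to map them; it works on plain dicts, and the function name to_claude_messages is an assumption of this example rather than part of the tutorial's code.

```python
import json


def to_claude_messages(messages: list[dict]) -> list[dict]:
    """Convert OpenAI-style chat messages (dicts) to Claude-style messages.

    System messages are skipped because Claude takes them as a separate argument.
    """
    converted = []
    for msg in messages:
        role = msg["role"]
        if role == "system":
            continue
        if role == "tool":
            # A tool result travels back to Claude inside a user turn.
            converted.append({
                "role": "user",
                "content": [{
                    "type": "tool_result",
                    "tool_use_id": msg["tool_call_id"],
                    "content": msg.get("content") or "",
                }],
            })
        elif role == "assistant" and msg.get("tool_calls"):
            blocks = []
            if msg.get("content"):
                blocks.append({"type": "text", "text": msg["content"]})
            for call in msg["tool_calls"]:
                blocks.append({
                    "type": "tool_use",
                    "id": call["id"],
                    "name": call["function"]["name"],
                    # OpenAI carries arguments as a JSON string; Claude wants an object.
                    "input": json.loads(call["function"]["arguments"] or "{}"),
                })
            converted.append({"role": "assistant", "content": blocks})
        else:
            converted.append({"role": role, "content": msg.get("content") or ""})
    return converted


history = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Weather in Paris?"},
    {"role": "assistant", "content": None, "tool_calls": [{
        "id": "call_1", "type": "function",
        "function": {"name": "get_weather", "arguments": "{\"city\": \"Paris\"}"},
    }]},
    {"role": "tool", "tool_call_id": "call_1", "content": "18C, sunny"},
]
claude_history = to_claude_messages(history)
print(json.dumps(claude_history, indent=2))
```

This sketch emits one user turn per tool result; when the assistant issued several tool calls in one turn, the results may need to be merged into a single user message.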

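3.4 Adapting Chinese models: Doubao / Tongyi / DeepSeek

python
import os
from openai import AsyncOpenAI

class DeepSeekProvider(LLMProvider):
    """DeepSeek: fully compatible with the OpenAI format"""
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com",  # only the base_url changes
        )

    async def chat(self, request: ChatRequest, config: dict) -> ChatResponse:
        # Exactly the same call as for OpenAI
        response = await self.client.chat.completions.create(
            model=config["model"],
            messages=[m.model_dump(exclude_none=True) for m in request.messages],
            temperature=request.temperature,
        )
        return ChatResponse(
            id=response.id, model=response.model,
            choices=[c.model_dump() for c in response.choices],
            usage=response.usage.model_dump(),
            provider="deepseek",
        )

    async def chat_stream(self, request: ChatRequest, config: dict):
        # Required by the abstract base class; same shape as the OpenAI stream
        stream = await self.client.chat.completions.create(
            model=config["model"],
            messages=[m.model_dump(exclude_none=True) for m in request.messages],
            temperature=request.temperature,
            stream=True,
        )
        async for chunk in stream:
            yield chunk.model_dump()

    def is_healthy(self) -> bool:
        return True

# Register all Providers
register_provider("openai", OpenAIProvider(api_key=os.getenv("OPENAI_API_KEY")))
register_provider("anthropic", ClaudeProvider(api_key=os.getenv("ANTHROPIC_API_KEY")))
register_provider("deepseek", DeepSeekProvider(api_key=os.getenv("DEEPSEEK_API_KEY")))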

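💡 DeepSeek, Tongyi, and Doubao are all compatible with the OpenAI format, so changing base_url is enough. The only Providers that need real adaptation are Claude (input_schema) and Gemini (upper-case type names).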
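
To make the Gemini remark concrete, here is a minimal sketch of the schema rewrite, assuming (as the note above says) that Gemini function declarations expect upper-case type names such as OBJECT and STRING. The function name to_gemini_schema is hypothetical, and the rest of the Gemini request format is not covered here.

```python
def to_gemini_schema(schema: dict) -> dict:
    """Recursively copy a JSON-Schema-style dict, upper-casing every "type" value."""
    converted = {}
    for key, value in schema.items():
        if key == "type" and isinstance(value, str):
            converted[key] = value.upper()
        elif key == "properties" and isinstance(value, dict):
            # Keys of "properties" are field names, so only their sub-schemas are converted.
            converted[key] = {name: to_gemini_schema(sub) for name, sub in value.items()}
        elif key == "items" and isinstance(value, dict):
            converted[key] = to_gemini_schema(value)
        else:
            converted[key] = value
    return converted


openai_parameters = {
    "type": "object",
    "properties": {
        "city": {"type": "string", "description": "City name"},
        "days": {"type": "array", "items": {"type": "integer"}},
        "type": {"type": "string"},  # a field that happens to be called "type"
    },
    "required": ["city"],
}
print(to_gemini_schema(openai_parameters))
```

The input dict is left untouched, so the same OpenAI-style tool definition can be reused for other Providers.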

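Chapter 3 key takeaways:

Provider | Adaptation effort | Key difference
OpenAI | ⭐ None needed | Standard format
DeepSeek / Tongyi | ⭐ Change base_url | OpenAI-compatible
Claude | ⭐⭐⭐ Conversion needed | input_schema + content blocks
Gemini | ⭐⭐⭐ Conversion needed | Upper-case type names + different response structure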

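4. Smart Routing: Model Selection and Load Balancing

4.1 Rule-based routing: scenario → model mapping

python
import re

class RoutingRule(BaseModel):
    """Routing rule"""
    pattern: str               # match pattern
    target_model: str          # target model
    priority: int = 0          # priority; higher values are tried first

class Router:
    def __init__(self, rules: list[RoutingRule] | None = None):
        self.rules = sorted(rules or [], key=lambda r: r.priority, reverse=True)

    def _match(self, rule: RoutingRule, request: ChatRequest) -> bool:
        # Assumption: pattern is a regular expression tested against the latest
        # plain-text user message; the original leaves the matching logic open
        text = next((m.content for m in reversed(request.messages)
                     if m.role == "user" and isinstance(m.content, str)), "")
        return re.search(rule.pattern, text) is not None

    def route(self, request: ChatRequest) -> dict:
        # 1. Try the rules first
        for rule in self.rules:
            if self._match(rule, request):
                return resolve_model(rule.target_model)

        # 2. Otherwise resolve by the requested model name
        return resolve_model(request.model)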

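4.2 Cost-based routing: cheapest model first

python
# Model pricing (USD per million tokens)
MODEL_PRICING = {
    "gpt-4o":            {"input": 2.50, "output": 10.00},
    "gpt-4o-mini":       {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "deepseek-chat":     {"input": 0.14, "output": 0.28},
    "qwen-turbo":        {"input": 0.30, "output": 0.60},
}

def route_by_cost(request: ChatRequest, candidates: list[str]) -> str:
    """Pick the cheapest healthy candidate, ranked by input price"""
    # get_provider_for maps a model name to its Provider; it is not defined in this tutorial
    available = [m for m in candidates
                 if m in MODEL_PRICING and get_provider_for(m).is_healthy()]
    if not available:
        raise RuntimeError("No healthy candidate model with known pricing")
    return min(available, key=lambda m: MODEL_PRICING[m]["input"])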

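4.3 Load balancing: rotating multiple keys for the same model

python
import itertools
import time

class KeyPool:
    """API key pool: round-robin with a per-key RPM cap (the weight field is not used yet)"""

    def __init__(self, keys: list[dict]):
        # [{"key": "sk-xxx", "weight": 1, "rpm_limit": 500}, ...]
        self.keys = keys
        self._cycle = itertools.cycle(range(len(keys)))
        self._usage = {k["key"]: 0 for k in keys}
        self._window_start = time.monotonic()

    def get_next_key(self) -> str:
        """Return the next key that still has budget in the current minute"""
        # Reset the counters every 60 seconds; otherwise an exhausted key never recovers
        if time.monotonic() - self._window_start >= 60:
            self._usage = {k["key"]: 0 for k in self.keys}
            self._window_start = time.monotonic()
        for _ in range(len(self.keys)):
            idx = next(self._cycle)
            key_config = self.keys[idx]
            if self._usage[key_config["key"]] < key_config.get("rpm_limit", 9999):
                self._usage[key_config["key"]] += 1
                return key_config["key"]
        raise RuntimeError("All API keys have reached their limit")

# Usage
openai_keys = KeyPool([
    {"key": "sk-key1", "rpm_limit": 500},
    {"key": "sk-key2", "rpm_limit": 500},
    {"key": "sk-key3", "rpm_limit": 1000},
])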
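
The key entries carry a weight field that plain round-robin does not use. One way to honour it is smooth weighted round-robin, sketched below as a separate class. WeightedKeyPicker is a hypothetical name, and this variant deliberately leaves out the RPM bookkeeping to keep the selection logic visible.

```python
class WeightedKeyPicker:
    """Smooth weighted round-robin over API keys (no rate-limit tracking)."""

    def __init__(self, keys: list[dict]):
        if not keys:
            raise ValueError("at least one key is required")
        self._keys = [k["key"] for k in keys]
        self._weights = [int(k.get("weight", 1)) for k in keys]
        self._current = [0] * len(keys)
        self._total = sum(self._weights)

    def next_key(self) -> str:
        # Raise every key's running score by its weight, pick the highest score,
        # then pull the winner back by the total weight.
        for i, weight in enumerate(self._weights):
            self._current[i] += weight
        best = max(range(len(self._keys)), key=lambda i: self._current[i])
        self._current[best] -= self._total
        return self._keys[best]


picker = WeightedKeyPicker([
    {"key": "sk-key1", "weight": 2},
    {"key": "sk-key2", "weight": 1},
])
picks = [picker.next_key() for _ in range(6)]
print(picks)
```

Compared with repeating a key `weight` times in a list, this interleaves the picks, so a heavily weighted key is not hit in bursts.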

4.4 Health checks and automatic removal of failing Providers

python
import asyncio
from datetime import datetime, timedelta

class HealthChecker:
    """Provider health tracking"""

    def __init__(self):
        self._status: dict[str, dict] = {}  # provider → {healthy, last_check, fail_count}

    def record_success(self, provider: str):
        self._status[provider] = {"healthy": True, "fail_count": 0, "last_check": datetime.now()}

    def record_failure(self, provider: str):
        status = self._status.get(provider, {"fail_count": 0})
        status["fail_count"] = status.get("fail_count", 0) + 1
        status["last_check"] = datetime.now()

        # After 3 consecutive failures, mark the Provider unhealthy
        if status["fail_count"] >= 3:
            status["healthy"] = False
        self._status[provider] = status

    def is_healthy(self, provider: str) -> bool:
        status = self._status.get(provider, {"healthy": True})
        # An unhealthy Provider is given another try after 30 seconds;
        # fail_count is kept, so a single new failure removes it again
        if not status.get("healthy") and datetime.now() - status["last_check"] > timedelta(seconds=30):
            status["healthy"] = True
        return status.get("healthy", True)

health_checker = HealthChecker()

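💡 Rotating across multiple keys is a common way to raise the effective RPM ceiling: if each key allows 500 RPM, three keys in rotation give up to 1,500 RPM. This only helps when the provider meters limits per key; where limits are enforced per account or organization, the keys need to come from separate accounts. The gateway manages the key pool centrally, so business code is unaware of it.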

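Chapter 4 key takeaways:

Concept | One-line explanation
Rule-based routing | Match request features to a designated model
Cost-based routing | Prefer the cheaper model among those of comparable capability
Key pool | Rotate multiple keys to get past a single key's limit
Health check | Removed automatically after 3 consecutive failures, retried after 30 s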

5. Fallback and Fault Tolerance: What to Do When a Model Goes Down

5.1 Fallback chain: automatic downgrade to a backup model

python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def chat_with_fallback(request: ChatRequest) -> ChatResponse:
    """Call with fallback"""
    model_config = resolve_model(request.model)
    fallback_chain = [model_config["model"]] + model_config.get("fallback", [])

    last_error = None
    for model_name in fallback_chain:
        # get_provider_name maps a model name to its provider name; it is not defined in this tutorial
        provider_name = get_provider_name(model_name)

        if not health_checker.is_healthy(provider_name):
            continue

        try:
            provider = get_provider(provider_name)
            config = {"model": model_name}
            response = await asyncio.wait_for(
                provider.chat(request, config),
                timeout=30,
            )
            health_checker.record_success(provider_name)
            return response
        except Exception as e:
            last_error = e
            health_checker.record_failure(provider_name)
            logger.warning("Model %s failed: %s; trying the next fallback...", model_name, e)

    raise RuntimeError(f"All models are unavailable. Last error: {last_error}")

5.2 Timeout control and failing fast

python
import asyncio

# Per-model timeouts (seconds)
TIMEOUT_CONFIG = {
    "gpt-4o": 60,
    "gpt-4o-mini": 30,
    "claude-3-5-sonnet": 60,
    "deepseek-chat": 45,
}

async def call_with_timeout(provider, request, config):
    timeout = TIMEOUT_CONFIG.get(config["model"], 30)
    try:
        return await asyncio.wait_for(provider.chat(request, config), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise TimeoutError(f"Model {config['model']} timed out ({timeout}s)") from exc

5.3 Circuit breaker pattern: pause calls after consecutive failures

python
import time
from enum import Enum

class CircuitState(str, Enum):
    CLOSED = "closed"        # normal: calls allowed
    OPEN = "open"            # tripped: calls rejected
    HALF_OPEN = "half_open"  # half-open: probe calls allowed

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time: float | None = None

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # After the recovery timeout, let a probe call through
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        return True  # HALF_OPEN: allow probes

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed probe in HALF_OPEN also lands here and re-opens the circuit
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
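
A breaker only helps if every call goes through it. The sketch below shows one possible wrapper that works with any object exposing can_execute, record_success, and record_failure, which is the interface of the class above. CircuitOpenError, call_with_breaker, and the CountingBreaker stand-in are names invented for this example; the stand-in exists only so the snippet runs on its own and has no recovery timer.

```python
import asyncio


class CircuitOpenError(RuntimeError):
    """Raised when the breaker refuses a call (hypothetical name for this sketch)."""


async def call_with_breaker(breaker, func, *args, **kwargs):
    """Run func through any breaker exposing can_execute/record_success/record_failure."""
    if not breaker.can_execute():
        raise CircuitOpenError("circuit is open; skipping call")
    try:
        result = await func(*args, **kwargs)
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result


class CountingBreaker:
    """Tiny stand-in for CircuitBreaker: opens after `threshold` failures, never recovers."""

    def __init__(self, threshold: int = 2):
        self.threshold = threshold
        self.failures = 0

    def can_execute(self) -> bool:
        return self.failures < self.threshold

    def record_success(self) -> None:
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1


async def flaky():
    raise ConnectionError("upstream down")


async def demo() -> list[str]:
    breaker = CountingBreaker(threshold=2)
    outcomes = []
    for _ in range(3):
        try:
            await call_with_breaker(breaker, flaky)
        except CircuitOpenError:
            outcomes.append("rejected")
        except ConnectionError:
            outcomes.append("failed")
    return outcomes


print(asyncio.run(demo()))
```

In a fallback loop, a rejected call can be treated like any other failure and the next model in the chain tried immediately, without waiting for a timeout.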

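5.4 Fallback alerts and notifications

python
import os

async def notify_fallback(original_model: str, fallback_model: str, error: str):
    """Send an alert when a fallback happens"""
    message = f"⚠️ Model fallback alert\nOriginal model: {original_model}\nFell back to: {fallback_model}\nReason: {error}"

    # Send to Feishu / Slack / DingTalk.
    # send_webhook is a helper that POSTs JSON to a URL; it is not defined in this tutorial
    await send_webhook(os.getenv("ALERT_WEBHOOK_URL"), {"text": message})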

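💡 A fallback chain plus a circuit breaker is essential in production: GPT-4o occasionally slows down or gets rate-limited, and a system without automatic fallback has no safety net. Suggested order for the chain: a peer model of similar capability first, then a weaker model, then a cached response.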

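Chapter 5 key takeaways:

Concept | One-line explanation
Fallback chain | Primary model fails → try the backup models in order
Circuit breaker | 5 consecutive failures → pause for 60 s → half-open probe
Timeout configuration | Different timeouts per model; fail fast and move on to the fallback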

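6. Rate Limiting and Quota Management

6.1 Rate-limiting strategies: RPM / TPM / concurrency

Dimension | Description | Typical value
RPM | Requests per minute | 500
TPM | Tokens per minute | 200,000
Concurrency | Requests in flight at the same time | 50
Daily quota | Total tokens per day | 10,000,000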

6.2 Token bucket implementation

python
import time
import asyncio

class TokenBucket:
    """Token-bucket rate limiter"""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # refill rate, tokens per second
        self.capacity = capacity    # bucket capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()  # monotonic clock: immune to wall-clock jumps
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

# 500 requests per minute = ~8.3 per second
rpm_limiter = TokenBucket(rate=500/60, capacity=500)
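
When acquire returns False, the caller usually wants to know how long to back off. The arithmetic follows directly from the refill rule: missing tokens divided by the refill rate. The helper below is a small sketch of that calculation; seconds_until_available is a hypothetical name, and using the result for a Retry-After header is a suggestion rather than something the original code does.

```python
import math


def seconds_until_available(available: float, needed: float,
                            rate: float, capacity: float) -> float:
    """Time until `needed` tokens are in the bucket; math.inf if that can never happen."""
    if available >= needed:
        return 0.0
    if needed > capacity or rate <= 0:
        return math.inf
    return (needed - available) / rate


# Empty bucket refilling at 500 tokens per minute, one token needed
wait = seconds_until_available(available=0, needed=1, rate=500 / 60, capacity=500)
print(f"retry after {wait:.2f}s")

# Rounded up to whole seconds, e.g. for an HTTP Retry-After header
print(math.ceil(wait))
```

The same function covers TPM limiting when `needed` is the estimated token count of the request instead of 1.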

6.3 Per-user quota management

python
from datetime import date

class QuotaManager:
    """Per-user quota management"""

    def __init__(self):
        self._usage: dict[str, dict] = {}  # user_id → {tokens, requests} for the current day
        self._day = date.today()

    def _roll_over(self):
        # Clear the counters when the calendar day changes, so the quotas are truly daily
        if date.today() != self._day:
            self._usage.clear()
            self._day = date.today()

    async def check_quota(self, user_id: str, estimated_tokens: int = 0) -> bool:
        self._roll_over()
        usage = self._usage.get(user_id, {"tokens": 0, "requests": 0})
        quota = await self._get_user_quota(user_id)

        if usage["tokens"] + estimated_tokens > quota["daily_tokens"]:
            return False
        if usage["requests"] >= quota["daily_requests"]:
            return False
        return True

    async def record_usage(self, user_id: str, tokens: int):
        self._roll_over()
        if user_id not in self._usage:
            self._usage[user_id] = {"tokens": 0, "requests": 0}
        self._usage[user_id]["tokens"] += tokens
        self._usage[user_id]["requests"] += 1

    async def _get_user_quota(self, user_id: str) -> dict:
        # Load the user's quota from a database or configuration
        return {"daily_tokens": 1_000_000, "daily_requests": 1000}

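6.4 Distributed rate limiting with Redis

python
import time
import uuid
import redis.asyncio as redis

class RedisRateLimiter:
    """Distributed rate limiting on Redis (sliding window)"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def is_allowed(self, key: str, limit: int, window: int = 60) -> bool:
        pipe = self.redis.pipeline()
        now = time.time()
        window_start = now - window

        # Sliding window: drop expired entries, then count the requests still inside the window
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        # Unique member, so two requests with the same timestamp are both counted
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        pipe.expire(key, window)

        results = await pipe.execute()
        current_count = results[1]  # count before this request was added

        # Note: rejected requests are recorded too, so sustained overload keeps the window full
        return current_count < limit

# Usage (inside an async function, for example a FastAPI dependency)
limiter = RedisRateLimiter(redis.from_url("redis://localhost:6379"))
allowed = await limiter.is_allowed(f"rate:{user_id}:rpm", limit=100, window=60)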
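
The sliding-window logic is easier to reason about, and to unit-test, without a Redis server. Below is an in-memory analogue with an injectable clock. It is a sketch of the same idea, not a drop-in replacement: InMemorySlidingWindow is a hypothetical class, it works within one process only, and unlike the Redis version above it does not record rejected requests.

```python
from collections import deque


class InMemorySlidingWindow:
    """Single-process sliding-window limiter; `now` is passed in so tests control time."""

    def __init__(self):
        self._events: dict[str, deque] = {}

    def is_allowed(self, key: str, limit: int, window: float, now: float) -> bool:
        events = self._events.setdefault(key, deque())
        window_start = now - window
        # Drop timestamps that have left the window (mirrors ZREMRANGEBYSCORE).
        while events and events[0] <= window_start:
            events.popleft()
        if len(events) >= limit:
            return False  # rejected requests are not recorded in this variant
        events.append(now)
        return True


limiter = InMemorySlidingWindow()
decisions = [
    limiter.is_allowed("user-1", limit=2, window=60, now=0.0),
    limiter.is_allowed("user-1", limit=2, window=60, now=1.0),
    limiter.is_allowed("user-1", limit=2, window=60, now=2.0),   # third call inside the window
    limiter.is_allowed("user-1", limit=2, window=60, now=60.5),  # the call at t=0 has expired
]
print(decisions)
```

Passing `now` explicitly keeps the test deterministic; production code would pass `time.time()` or `time.monotonic()`.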

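💡 Use a token bucket on a single node and a Redis sliding window when distributed: the in-process token bucket is simple and efficient but only sees one process, so a production deployment with multiple instances needs shared state such as Redis for rate limiting.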

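Chapter 6 key takeaways:

Concept | One-line explanation
RPM / TPM | Limit by request count / token count
Token bucket | Tokens refill at a steady rate; bursts can spend what is stored in the bucket
User quota | Track and limit usage per user
Redis rate limiting | Sliding window that works across multiple instances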

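7. Logging, Billing, and Observability

7.1 Request logs: recording the full details of every call

python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class RequestLog:
    request_id: str
    user_id: str
    model_requested: str         # virtual model name requested by the user
    model_actual: str            # model actually used
    provider: str
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    cost: float = 0.0            # USD; the queries in 7.3 aggregate this column
    latency_ms: int = 0
    status: str = "success"      # success / failed / timeout / rate_limited
    error: str = ""
    timestamp: datetime = field(default_factory=datetime.now)

class RequestLogger:
    def __init__(self):
        self._logs: list[RequestLog] = []
        self._tasks: set[asyncio.Task] = set()

    def log(self, entry: RequestLog):
        self._logs.append(entry)
        # Write to the database in the background; must be called inside a running event loop
        task = asyncio.create_task(self._save_to_db(entry))
        self._tasks.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(self._tasks.discard)

    async def _save_to_db(self, entry: RequestLog):
        # db is the application's async database client; it is not defined in this tutorial
        await db.execute(
            "INSERT INTO request_logs (request_id, user_id, model_requested, ...) VALUES ($1, $2, ...)",
            entry.request_id, entry.user_id, entry.model_requested, ...
        )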

7.2 Token billing: computing cost from usage

python
def calculate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of a single call in US dollars"""
    # Unknown models fall back to a default price of $1 / $3 per million tokens
    pricing = MODEL_PRICING.get(model, {"input": 1.0, "output": 3.0})
    cost = (prompt_tokens * pricing["input"] + completion_tokens * pricing["output"]) / 1_000_000
    return round(cost, 6)

# Example
# GPT-4o: 1000 prompt + 500 completion
# = (1000 * 2.50 + 500 * 10.00) / 1M = $0.0075

# DeepSeek: same usage
# = (1000 * 0.14 + 500 * 0.28) / 1M = $0.00028
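
Per-call cost becomes useful once it is rolled up. The sketch below applies the same formula to a list of call records and totals the spend per user. The PRICING table copies two rows from the tutorial's price list; the record layout and the names cost_usd and cost_by_user are assumptions made for this example.

```python
from collections import defaultdict

# USD per 1M tokens, copied from the tutorial's price table
PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "deepseek-chat": {"input": 0.14, "output": 0.28},
}


def cost_usd(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of one call: tokens times the per-million price, input and output separately."""
    price = PRICING[model]
    return (prompt_tokens * price["input"] + completion_tokens * price["output"]) / 1_000_000


def cost_by_user(calls: list[dict]) -> dict[str, float]:
    """Sum the cost of every call per user_id, rounded to 6 decimal places."""
    totals: dict[str, float] = defaultdict(float)
    for call in calls:
        totals[call["user_id"]] += cost_usd(
            call["model"], call["prompt_tokens"], call["completion_tokens"]
        )
    return {user: round(total, 6) for user, total in totals.items()}


calls = [
    {"user_id": "alice", "model": "gpt-4o", "prompt_tokens": 1000, "completion_tokens": 500},
    {"user_id": "alice", "model": "deepseek-chat", "prompt_tokens": 1000, "completion_tokens": 500},
    {"user_id": "bob", "model": "deepseek-chat", "prompt_tokens": 2000, "completion_tokens": 0},
]
report = cost_by_user(calls)
print(report)
```

For real invoicing, decimal arithmetic may be preferable to floats; floats are kept here to match calculate_cost above.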

7.3 Usage dashboards: by model / user / time

sql
-- Today's usage per model
SELECT model_actual,
       COUNT(*) as requests,
       SUM(total_tokens) as total_tokens,
       SUM(cost) as total_cost,
       AVG(latency_ms) as avg_latency
FROM request_logs
WHERE timestamp >= CURRENT_DATE
GROUP BY model_actual
ORDER BY total_cost DESC;

-- This month's usage per user
SELECT user_id,
       COUNT(*) as requests,
       SUM(total_tokens) as tokens,
       SUM(cost) as cost
FROM request_logs
WHERE timestamp >= DATE_TRUNC('month', CURRENT_DATE)
GROUP BY user_id
ORDER BY cost DESC;

7.4 Prometheus metrics and alerting

python
from prometheus_client import Counter, Histogram

# Request count
request_counter = Counter(
    "llm_gateway_requests_total",
    "Total LLM requests",
    ["model", "provider", "status"]
)

# Latency histogram
latency_histogram = Histogram(
    "llm_gateway_latency_seconds",
    "Request latency",
    ["model", "provider"]
)

# Token usage
token_counter = Counter(
    "llm_gateway_tokens_total",
    "Total tokens consumed",
    ["model", "type"]  # type: prompt / completion
)

# Record metrics while handling a request
def record_metrics(log: RequestLog):
    request_counter.labels(model=log.model_actual, provider=log.provider, status=log.status).inc()
    latency_histogram.labels(model=log.model_actual, provider=log.provider).observe(log.latency_ms / 1000)
    token_counter.labels(model=log.model_actual, type="prompt").inc(log.prompt_tokens)
    token_counter.labels(model=log.model_actual, type="completion").inc(log.completion_tokens)

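💡 Billing is the gateway's "last mile": cost optimization only becomes possible once you know how much each user and each model spends. With the prices used in this tutorial, the example in 7.2 costs roughly 27 times less on DeepSeek than on GPT-4o ($0.00028 vs. $0.0075). Whether the output quality is comparable depends on the task, and a gap like this can only be quantified once billing is in place.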

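Chapter 7 key takeaways:

Concept | One-line explanation
Request logs | Record model / tokens / latency / status for every call
Token billing | Model price × usage = cost
Prometheus | Counter / Histogram expose metrics; Grafana provides the dashboards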

8. Full Implementation and Production Deployment

8.1 Project structure and code organization

llm-gateway/
├── app/
│   ├── main.py              # FastAPI entry point
│   ├── models.py            # Pydantic data models
│   ├── config.py            # configuration loading
│   ├── providers/           # model adapter layer
│   │   ├── base.py          # abstract Provider base class
│   │   ├── openai.py
│   │   ├── claude.py
│   │   └── deepseek.py
│   ├── routing/             # routing layer
│   │   ├── router.py
│   │   ├── fallback.py
│   │   └── health.py
│   ├── middleware/          # middleware
│   │   ├── rate_limit.py
│   │   ├── auth.py
│   │   └── logging.py
│   └── billing/             # billing
│       ├── calculator.py
│       └── quota.py
├── config/
│   └── gateway.yaml         # gateway configuration
├── Dockerfile
├── docker-compose.yaml
└── requirements.txt

8.2 Configuration management: defining models and routing rules in YAML

yaml
# config/gateway.yaml
providers:
  openai:
    api_keys:
      - key: "sk-key1"
        rpm_limit: 500
      - key: "sk-key2"
        rpm_limit: 500

  anthropic:
    api_keys:
      - key: "sk-ant-xxx"
        rpm_limit: 1000

  deepseek:
    base_url: "https://api.deepseek.com"
    api_keys:
      - key: "sk-ds-xxx"

models:
  fast:
    provider: openai
    model: gpt-4o-mini
    fallback: [deepseek-chat, qwen-turbo]
    timeout: 30
  
  smart:
    provider: openai
    model: gpt-4o
    fallback: [claude-3-5-sonnet, deepseek-chat]
    timeout: 60

rate_limits:
  default:
    rpm: 100
    tpm: 200000
    daily_tokens: 5000000
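
A configuration file is only as useful as the checks applied when it is loaded. The sketch below validates the `models` section against the declared providers. It assumes the YAML has already been parsed into a plain dict (for example with PyYAML's `yaml.safe_load`); ModelRoute and parse_models are hypothetical names, and the inline dict stands in for the parsed file.

```python
from dataclasses import dataclass, field


@dataclass
class ModelRoute:
    name: str
    provider: str
    model: str
    fallback: list[str] = field(default_factory=list)
    timeout: int = 30


def parse_models(raw: dict) -> dict[str, ModelRoute]:
    """Validate the `models` section of an already-parsed config dict."""
    providers = raw.get("providers", {})
    routes = {}
    for name, entry in raw.get("models", {}).items():
        if entry["provider"] not in providers:
            raise ValueError(
                f"model '{name}' references unknown provider '{entry['provider']}'"
            )
        routes[name] = ModelRoute(
            name=name,
            provider=entry["provider"],
            model=entry["model"],
            fallback=list(entry.get("fallback", [])),
            timeout=int(entry.get("timeout", 30)),
        )
    return routes


# Stand-in for the dict a YAML parser would return for config/gateway.yaml
raw_config = {
    "providers": {"openai": {"api_keys": []}, "deepseek": {"api_keys": []}},
    "models": {
        "fast": {"provider": "openai", "model": "gpt-4o-mini",
                 "fallback": ["deepseek-chat", "qwen-turbo"], "timeout": 30},
        "smart": {"provider": "openai", "model": "gpt-4o",
                  "fallback": ["claude-3-5-sonnet", "deepseek-chat"], "timeout": 60},
    },
}
routes = parse_models(raw_config)
print(routes["smart"])
```

Failing at startup on a misspelled provider name is cheaper than discovering it when the first request for that model arrives.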

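8.3 Docker deployment and health checks

dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

yaml
# docker-compose.yaml
services:
  gateway:
    build: .
    ports: ["8000:8000"]
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
    depends_on: [redis, postgres]
    healthcheck:
      # python:3.12-slim ships without curl, so probe with the Python standard library;
      # this assumes the app exposes GET /health
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: llm_gateway
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}  # the official image will not start without a password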
8.4 Performance testing and optimization

python
# Load-test script
import asyncio
import httpx
import time

async def benchmark(url: str, n: int = 100, concurrency: int = 10):
    semaphore = asyncio.Semaphore(concurrency)
    results = []

    async def send_one(client: httpx.AsyncClient):
        async with semaphore:
            start = time.perf_counter()
            try:
                resp = await client.post(f"{url}/v1/chat/completions", json={
                    "model": "fast",
                    "messages": [{"role": "user", "content": "hello"}],
                })
                status = resp.status_code
            except httpx.HTTPError:
                status = 0  # count transport errors and timeouts as failures
            latency = (time.perf_counter() - start) * 1000
            results.append({"status": status, "latency": latency})

    # Share one client so connections are reused across requests
    async with httpx.AsyncClient(timeout=60) as client:
        await asyncio.gather(*[send_one(client) for _ in range(n)])

    latencies = sorted(r["latency"] for r in results)
    avg = sum(latencies) / len(latencies)
    p95 = latencies[min(len(latencies) - 1, int(len(latencies) * 0.95))]
    print(f"Total requests: {n}, concurrency: {concurrency}")
    print(f"Average latency: {avg:.0f}ms, P95: {p95:.0f}ms")
    print(f"Success rate: {sum(1 for r in results if r['status'] == 200) / n * 100:.1f}%")
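
The P95 figure depends on how the percentile is defined. One common definition is the nearest-rank method: sort the samples and take the smallest value that has at least the requested share of samples at or below it. The helper below is a sketch of that definition; the name percentile is chosen for this example, and other definitions (for instance with interpolation) give slightly different numbers.

```python
import math


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one sample")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


latencies_ms = [120.0, 95.0, 300.0, 110.0, 105.0]
print(percentile(latencies_ms, 50))
print(percentile(latencies_ms, 95))
```

With only a handful of samples, high percentiles collapse onto the maximum, which is one reason to run load tests with a few hundred requests or more.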

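Appendix: LLM Gateway Quick Reference

A.1 API differences across models

Model | base_url | Tool schema field | Streaming format
OpenAI | api.openai.com | parameters | data: {chunk}
Claude | api.anthropic.com | input_schema | event: content_block_delta
DeepSeek | api.deepseek.com | parameters | OpenAI-compatible
Tongyi | dashscope.aliyuncs.com | parameters | OpenAI-compatible
Doubao | ark.cn-beijing.volces.com | parameters | OpenAI-compatible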

A.2 Gateway configuration YAML template

yaml
providers:
  openai:
    api_keys: [{key: "sk-xxx", rpm_limit: 500}]
  deepseek:
    base_url: "https://api.deepseek.com"
    api_keys: [{key: "sk-xxx"}]

models:
  fast: {provider: openai, model: gpt-4o-mini, fallback: [deepseek-chat]}
  smart: {provider: openai, model: gpt-4o, fallback: [claude-3-5-sonnet]}

rate_limits:
  default: {rpm: 100, tpm: 200000}

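A.3 Common problems and troubleshooting

Problem | Likely cause | How to check
429 Too Many Requests | Key is rate-limited | Check RPM usage across the key pool
Every model times out | Network issue | Check DNS and proxy settings
Claude parameter errors | Bug in format conversion | Check the tools → input_schema conversion
Inaccurate billing | Streaming calls not counted | Check how usage is collected for stream responses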
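
For the streaming billing gap in the last row, one approach with OpenAI-style streams is to request usage in the stream and read it from the chunks. With the OpenAI API this requires sending `stream_options={"include_usage": True}`, after which a final chunk carries a `usage` object; whether other Providers behave the same way is an assumption that should be verified per Provider. The function name usage_from_stream below is illustrative.

```python
USAGE_KEYS = ("prompt_tokens", "completion_tokens", "total_tokens")


def usage_from_stream(chunks: list[dict]) -> dict:
    """Return the last non-empty usage dict found in OpenAI-style stream chunks."""
    usage = {key: 0 for key in USAGE_KEYS}
    for chunk in chunks:
        reported = chunk.get("usage")
        if reported:
            usage = {key: int(reported.get(key, 0)) for key in USAGE_KEYS}
    return usage


stream_chunks = [
    {"choices": [{"delta": {"content": "Hi"}}], "usage": None},
    {"choices": [{"delta": {}, "finish_reason": "stop"}], "usage": None},
    # Final chunk: no choices, only the usage totals
    {"choices": [], "usage": {"prompt_tokens": 12, "completion_tokens": 3, "total_tokens": 15}},
]
print(usage_from_stream(stream_chunks))
```

If a stream ends without any usage chunk, the zeros returned here are a signal to fall back to a local token estimate rather than to bill nothing.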

A.4 Model pricing quick reference

Model | Input ($/1M tokens) | Output ($/1M tokens)
GPT-4o | $2.50 | $10.00
GPT-4o-mini | $0.15 | $0.60
Claude 3.5 Sonnet | $3.00 | $15.00
DeepSeek Chat | $0.14 | $0.28
Tongyi Qwen-Turbo | $0.30 | $0.60
