7.1 后端架构设计(API 服务设计)
AI 应用后端与传统 Web 服务的核心差异:响应时间长(秒级)、输出流式、上游 API 不稳定、成本随请求量线性增长。
LLM 应用的后端与传统 Web 服务有显著差异:响应时间长(秒级甚至分钟级)、输出是流式的、上游 API 不稳定、成本随请求量线性增长。本节以 FastAPI 为核心框架,覆盖从单机服务到生产级多模型路由的完整工程实践。
7.1.1 LLM 应用架构模式
三种响应模式对比
| 模式 | 适用场景 | 首字延迟 | 用户体验 | 实现复杂度 |
|---|---|---|---|---|
| 同步(Sync) | 短文本分类、结构化提取 | 高(等全量响应) | 差(等待转圈) | 低 |
| 异步任务(Async Task) | 长文生成、报告分析 | 立即返回任务ID | 中(轮询进度) | 中 |
| 流式(Streaming SSE/WS) | 对话、实时写作 | 极低(毫秒级首字) | 优(打字效果) | 中 |
1. FastAPI 项目结构
ai_backend/
├── main.py # 应用入口
├── api/
│ ├── chat.py # 对话接口
│ ├── completions.py # 文本补全接口
│ └── tasks.py # 异步任务接口
├── services/
│ ├── llm_router.py # 模型路由层
│ ├── rate_limiter.py # 限流服务
│ └── cache.py # 缓存服务
├── models/
│ └── schemas.py # Pydantic 数据模型
├── middleware/
│ └── auth.py # 认证中间件
└── config.py # 配置管理
2. 同步响应接口
python
# pip install fastapi uvicorn openai pydantic
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from openai import OpenAI
from typing import Optional
import time
# Application instance that the route decorators in this section attach to.
app = FastAPI(title="LLM API Service", version="1.0.0")
# Synchronous (blocking) OpenAI client, constructed without explicit arguments.
client = OpenAI()
class ChatRequest(BaseModel):
    # Request body shared by the chat endpoints; bounds are enforced by the Field constraints.
    message: str = Field(..., min_length=1, max_length=4000)  # required, 1-4000 characters
    model: str = Field(default="gpt-4o-mini")  # model name forwarded to the LLM client
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)  # accepted range 0.0-2.0
    max_tokens: int = Field(default=1024, ge=1, le=4096)  # accepted range 1-4096
    system_prompt: Optional[str] = None  # optional system message
class ChatResponse(BaseModel):
    # Response body of the synchronous chat endpoint.
    content: str  # text of the first completion choice
    model: str  # model name reported by the upstream response
    usage: dict  # usage object of the upstream response dumped to a dict
    latency_ms: int  # elapsed handler time in milliseconds
@app.post("/v1/chat", response_model=ChatResponse)
async def chat_sync(request: ChatRequest):
"""同步对话接口(适合短文本,响应完整后返回)"""
start = time.time()
messages = []
if request.system_prompt:
messages.append({"role": "system", "content": request.system_prompt})
messages.append({"role": "user", "content": request.message})
try:
response = client.chat.completions.create(
model=request.model,
messages=messages,
temperature=request.temperature,
max_tokens=request.max_tokens
)
return ChatResponse(
content=response.choices[0].message.content,
model=response.model,
usage=response.usage.model_dump(),
latency_ms=int((time.time() - start) * 1000)
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))3. 异步任务模式(Celery + Redis)
python
# pip install celery redis
from celery import Celery
from openai import OpenAI
import redis
import uuid
# Celery application named "llm_tasks"; Redis database 0 on localhost is the broker.
celery_app = Celery("llm_tasks", broker="redis://localhost:6379/0")
# Redis database 1 stores the per-task status hashes ("task:<id>") used below.
redis_client = redis.Redis(host="localhost", port=6379, db=1)
# Blocking OpenAI client used inside the Celery worker task.
openai_client = OpenAI()
@celery_app.task(bind=True, max_retries=3)
def generate_long_content(self, task_id: str, prompt: str, system_prompt: str = ""):
    """Generate a long completion in a worker and publish progress to Redis.

    The hash ``task:<task_id>`` moves through "processing" -> "completed"
    (or "failed" once all retries are used up). Finished hashes expire after
    one hour.

    Args:
        task_id: Identifier chosen by the submitter; part of the Redis key.
        prompt: User message sent to the model.
        system_prompt: Optional system message; skipped when empty.

    Raises:
        celery.exceptions.Retry: While retries remain (re-queued after 5 s).
        Exception: The last error once ``max_retries`` is exhausted.
    """
    key = f"task:{task_id}"
    try:
        redis_client.hset(key, mapping={"status": "processing", "progress": 0})
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})
        response = openai_client.chat.completions.create(
            model="gpt-4o", messages=messages, max_tokens=4096
        )
        redis_client.hset(key, mapping={
            "status": "completed",
            # content may be None; Redis rejects None as a hash value.
            "result": response.choices[0].message.content or "",
            "progress": 100,
        })
        redis_client.expire(key, 3600)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Final attempt: only now is the task really failed. Give the hash
            # the same TTL as successful results so it does not live forever.
            redis_client.hset(key, mapping={"status": "failed", "error": str(exc)})
            redis_client.expire(key, 3600)
            raise
        # Retries remain: do not report "failed" to pollers yet.
        raise self.retry(exc=exc, countdown=5)
@app.post("/v1/tasks/generate")
async def submit_task(prompt: str, system_prompt: str = ""):
    """Queue a long-generation task and return its id immediately.

    Args:
        prompt: User message for the model.
        system_prompt: Optional system message.

    Returns:
        Dict with the new ``task_id`` and the initial status "pending".
    """
    task_id = str(uuid.uuid4())
    key = f"task:{task_id}"
    redis_client.hset(key, mapping={"status": "pending", "progress": 0})
    # Safety net: without a TTL the hash would stay forever if no worker ever
    # picks the task up. 24 h is deliberately longer than the 1 h result TTL.
    redis_client.expire(key, 86400)
    generate_long_content.delay(task_id, prompt, system_prompt)
    return {"task_id": task_id, "status": "pending"}
@app.get("/v1/tasks/{task_id}")
async def get_task_result(task_id: str):
"""轮询任务状态与结果"""
data = redis_client.hgetall(f"task:{task_id}")
if not data:
raise HTTPException(status_code=404, detail="任务不存在或已过期")
return {
"task_id": task_id,
"status": data.get(b"status", b"").decode(),
"progress": int(data.get(b"progress", b"0")),
"result": data.get(b"result", b"").decode() or None,
"error": data.get(b"error", b"").decode() or None
}7.1.2 流式输出(SSE / WebSocket)
1. SSE 流式对话(Server-Sent Events)
python
# pip install sse-starlette
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
from openai import AsyncOpenAI
import json
# Async OpenAI client used by the streaming handlers in this section.
async_client = AsyncOpenAI()
async def stream_llm_response(request: ChatRequest):
    """Yield SSE event dicts for a streamed chat completion.

    Events: one "token" event per non-empty content delta, then a final "done"
    event. If anything fails, a single "error" event is emitted instead of
    raising.

    Args:
        request: Validated chat parameters.

    Yields:
        Dicts with "event" and JSON-encoded "data" keys.
    """
    messages = [
        {"role": "system", "content": request.system_prompt or "你是一个有帮助的AI助手。"},
        {"role": "user", "content": request.message}
    ]
    try:
        stream = await async_client.chat.completions.create(
            model=request.model, messages=messages,
            temperature=request.temperature,
            # Honor the validated limit; it was previously dropped on this path.
            max_tokens=request.max_tokens,
            stream=True
        )
        async for chunk in stream:
            # Some chunks carry no choices (e.g. usage-only chunks); indexing
            # them would raise IndexError and abort the stream.
            if not chunk.choices:
                continue
            token = chunk.choices[0].delta.content
            if token:
                yield {
                    "event": "token",
                    "data": json.dumps({"token": token}, ensure_ascii=False)
                }
        yield {"event": "done", "data": json.dumps({"status": "completed"})}
    except Exception as e:
        yield {"event": "error", "data": json.dumps({"error": str(e)}, ensure_ascii=False)}
@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream a chat reply to the client as Server-Sent Events."""
    # Headers ask caches and buffering reverse proxies to pass events through.
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    event_source = stream_llm_response(request)
    return EventSourceResponse(
        event_source,
        media_type="text/event-stream",
        headers=sse_headers,
    )
# 原生 StreamingResponse 版本(更轻量)
@app.post("/v1/chat/stream-raw")
async def chat_stream_raw(request: ChatRequest):
async def generate():
messages = [{"role": "user", "content": request.message}]
stream = await async_client.chat.completions.create(
model=request.model, messages=messages, stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
data = json.dumps({"token": chunk.choices[0].delta.content}, ensure_ascii=False)
yield f"data: {data}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
)2. WebSocket 双向流式对话
python
from fastapi import WebSocket, WebSocketDisconnect
class ConnectionManager:
    """Registry of accepted WebSocket connections, keyed by client id."""

    def __init__(self):
        self.connections: dict[str, WebSocket] = {}

    async def connect(self, ws: WebSocket, client_id: str):
        """Accept the handshake, then store the socket under ``client_id``."""
        await ws.accept()
        self.connections.update({client_id: ws})

    def disconnect(self, client_id: str):
        """Drop ``client_id`` from the registry; unknown ids are ignored."""
        if client_id in self.connections:
            del self.connections[client_id]


manager = ConnectionManager()
@app.websocket("/ws/chat/{client_id}")
async def websocket_chat(websocket: WebSocket, client_id: str):
"""WebSocket 双向对话(支持多轮历史)"""
await manager.connect(websocket, client_id)
conversation_history = []
try:
while True:
data = await websocket.receive_json()
user_message = data.get("message", "")
if not user_message:
continue
if not conversation_history:
conversation_history.append({
"role": "system",
"content": data.get("system_prompt", "你是一个有帮助的AI助手。")
})
conversation_history.append({"role": "user", "content": user_message})
await websocket.send_json({"type": "start"})
full_response = ""
stream = await async_client.chat.completions.create(
model="gpt-4o-mini", messages=conversation_history, stream=True
)
async for chunk in stream:
token = chunk.choices[0].delta.content
if token:
full_response += token
await websocket.send_json({"type": "token", "content": token})
conversation_history.append({"role": "assistant", "content": full_response})
await websocket.send_json({"type": "done", "full_content": full_response})
except WebSocketDisconnect:
manager.disconnect(client_id)
except Exception as e:
await websocket.send_json({"type": "error", "message": str(e)})
manager.disconnect(client_id)7.1.3 限流、重试与降级策略
1. 请求限流(SlowAPI + Redis)
python
# pip install slowapi
# `Request` is used in the signatures of the rate-limited handlers below but
# was never imported in this snippet, which made them fail with NameError.
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Limiter keyed by the client address, with counters kept in Redis on localhost.
limiter = Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379")
app.state.limiter = limiter
# Route RateLimitExceeded errors to slowapi's bundled handler.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# NOTE(review): POST "/v1/chat" is also registered by chat_sync earlier in this
# file; if both snippets share one app, confirm which handler should own the path.
@app.post("/v1/chat")
@limiter.limit("20/minute")
async def chat_with_limit(request: Request, body: ChatRequest):
    """Placeholder chat handler limited to "20/minute" per key of `limiter`."""
    ...
# 按用户 ID 限流(更精细)
def get_user_id(request: Request) -> str:
    """Return the rate-limit key for a request.

    Uses the ``X-User-ID`` header when it is present and non-empty, otherwise
    falls back to the remote address.

    NOTE(review): the header is taken as-is from the client; confirm that an
    upstream gateway sets or validates it, since a caller could otherwise pick
    a different key for every request.
    """
    declared_id = request.headers.get("X-User-ID")
    if declared_id:
        return declared_id
    return get_remote_address(request)
# Second limiter whose counters are keyed by get_user_id instead of the address.
user_limiter = Limiter(key_func=get_user_id, storage_uri="redis://localhost:6379")
@app.post("/v1/chat/premium")
@user_limiter.limit("100/minute;1000/day")  # two limits at once: per minute and per day
async def chat_premium(request: Request, body: ChatRequest):
    """Placeholder premium chat handler, rate-limited per key of `user_limiter`."""
    ...
# Token 预算限流(按消耗量而非请求次数)
class TokenBudgetLimiter:
def __init__(self, redis_client, daily_budget: int = 100000):
self.redis = redis_client
self.daily_budget = daily_budget
async def check_and_consume(self, user_id: str, estimated_tokens: int) -> bool:
import datetime
key = f"token_budget:{user_id}:{datetime.date.today()}"
current = int(self.redis.get(key) or 0)
if current + estimated_tokens > self.daily_budget:
return False
pipe = self.redis.pipeline()
pipe.incrby(key, estimated_tokens)
pipe.expire(key, 86400)
pipe.execute()
return True2. 自动重试(tenacity 指数退避)
python
# pip install tenacity
from tenacity import (
retry, stop_after_attempt, wait_exponential,
retry_if_exception_type, before_sleep_log
)
from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError
import logging
# Module logger; also passed to before_sleep_log in the retry decorator below.
logger = logging.getLogger(__name__)
# Blocking OpenAI client used by call_llm_with_retry.
client = OpenAI()
@retry(
    retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError)),
    stop=stop_after_attempt(3),
    # Exponential backoff with each wait clamped to the 2-30 s range.
    # NOTE(review): with 3 attempts at most two waits happen, and min=2 clamps
    # the early ones, so a "2s -> 4s -> 8s" sequence looks unreachable — confirm
    # against the tenacity documentation.
    wait=wait_exponential(multiplier=1, min=2, max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def call_llm_with_retry(messages: list, model: str = "gpt-4o-mini", **kwargs) -> str:
    """Call the chat completion API and return the first choice's content.

    Retried on rate-limit, timeout and connection errors, up to 3 attempts; a
    warning is logged before each sleep. Extra keyword arguments are forwarded
    to ``chat.completions.create``.
    """
    response = client.chat.completions.create(model=model, messages=messages, **kwargs)
    return response.choices[0].message.content
# 异步版本
from tenacity import AsyncRetrying
from openai import AsyncOpenAI
async def call_llm_async_retry(messages: list, model: str = "gpt-4o-mini") -> str:
aclient = AsyncOpenAI()
async for attempt in AsyncRetrying(
retry=retry_if_exception_type((RateLimitError, APITimeoutError)),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
response = await aclient.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content3. 降级策略(Fallback Chain)
python
from openai import OpenAI, RateLimitError, APIError
# Ordered fallback chain: call_with_fallback starts at the preferred model's
# position and walks towards the end. "provider" selects the client in get_client.
MODEL_FALLBACK_CHAIN = [
{"model": "gpt-4o", "provider": "openai"},
{"model": "gpt-4o-mini", "provider": "openai"},
{"model": "claude-3-5-haiku-20241022", "provider": "anthropic"},
{"model": "qwen-turbo", "provider": "dashscope"},
]
def get_client(provider: str):
    """Build an SDK client for ``provider``.

    Args:
        provider: One of "openai", "anthropic" or "dashscope".

    Returns:
        An ``OpenAI`` client (openai, dashscope via its OpenAI-compatible
        endpoint) or an ``anthropic.Anthropic`` client.

    Raises:
        ValueError: If the provider name is not recognized. Previously this
            fell through and returned None, which only failed later with an
            unrelated AttributeError.
    """
    if provider == "openai":
        return OpenAI()
    if provider == "anthropic":
        import anthropic
        return anthropic.Anthropic()
    if provider == "dashscope":
        import os
        return OpenAI(
            # Prefer the environment; the placeholder stays as the fallback so
            # existing behavior is unchanged when the variable is unset.
            api_key=os.environ.get("DASHSCOPE_API_KEY", "your_dashscope_key"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
    raise ValueError(f"Unknown provider: {provider!r}")
def call_with_fallback(
messages: list,
preferred_model: str = "gpt-4o",
max_tokens: int = 1024
) -> tuple[str, str]:
"""带降级链调用,返回 (内容, 实际使用的模型)"""
start_idx = next(
(i for i, m in enumerate(MODEL_FALLBACK_CHAIN) if m["model"] == preferred_model), 0
)
for config in MODEL_FALLBACK_CHAIN[start_idx:]:
try:
logger.info(f"尝试模型: {config['model']}")
c = get_client(config["provider"])
resp = c.chat.completions.create(
model=config["model"], messages=messages, max_tokens=max_tokens
)
return resp.choices[0].message.content, config["model"]
except RateLimitError:
logger.warning(f"{config['model']} 触发限流,降级")
except APIError as e:
logger.warning(f"{config['model']} API 错误({e.status_code}),降级")
except Exception as e:
logger.error(f"{config['model']} 未知错误: {e},降级")
raise RuntimeError("所有模型均不可用")7.1.4 多模型路由与负载均衡
1. 基于规则的智能路由
python
from dataclasses import dataclass
from enum import Enum
import tiktoken
class RoutingStrategy(str, Enum):
    # Model selection policies understood by select_model.
    COST_OPTIMIZED = "cost"  # lowest combined input + output price
    QUALITY_FIRST = "quality"  # highest quality_score
    SPEED_FIRST = "speed"  # lowest avg_latency_ms
    LOAD_BALANCED = "balanced"  # random pick weighted by free concurrency
@dataclass
class ModelConfig:
    # Static description of one routable model plus a mutable load counter.
    name: str  # model identifier sent to the provider
    provider: str  # provider key, e.g. "openai"
    context_window: int  # compared against the estimated input tokens in select_model
    cost_per_1k_input: float  # price per 1k input tokens (currency not stated here)
    cost_per_1k_output: float  # price per 1k output tokens (currency not stated here)
    avg_latency_ms: int  # typical latency in milliseconds
    quality_score: float  # relative quality; scale not defined here — TODO confirm
    supports_vision: bool = False
    max_concurrency: int = 50  # models at or above this load are excluded by select_model
    current_load: int = 0  # in-flight requests; adjusted by chat_auto_route
# Routable models by short key. Positional arguments follow the ModelConfig
# field order: name, provider, context_window, cost_per_1k_input,
# cost_per_1k_output, avg_latency_ms, quality_score.
MODEL_REGISTRY: dict[str, ModelConfig] = {
"gpt-4o": ModelConfig(
"gpt-4o", "openai", 128000, 0.005, 0.015, 2000, 9.5, supports_vision=True
),
"gpt-4o-mini": ModelConfig(
"gpt-4o-mini", "openai", 128000, 0.00015, 0.0006, 800, 7.5, supports_vision=True
),
"claude-3-5-sonnet": ModelConfig(
"claude-3-5-sonnet-20241022", "anthropic", 200000, 0.003, 0.015, 2500, 9.5
),
"qwen-plus": ModelConfig(
"qwen-plus", "dashscope", 131072, 0.0004, 0.0012, 1200, 7.8
),
}
def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count the tokens of ``text`` for ``model``.

    Uses the tiktoken encoding registered for the model; if that fails for any
    reason, falls back to the estimate ``len(text) // 3``.
    """
    try:
        encoder = tiktoken.encoding_for_model(model)
        token_ids = encoder.encode(text)
        return len(token_ids)
    except Exception:
        return len(text) // 3
def select_model(
    messages: list,
    strategy: RoutingStrategy = RoutingStrategy.COST_OPTIMIZED,
    requires_vision: bool = False,
    min_quality_score: float = 0.0
) -> ModelConfig:
    """Pick a model from ``MODEL_REGISTRY`` according to ``strategy``.

    A model is eligible when it supports vision if required, its context
    window is at least 1.5x the estimated input tokens, its quality score
    meets the minimum and it is below its concurrency limit.

    Raises:
        ValueError: When no model is eligible.
    """
    text_parts = (
        m.get("content", "") for m in messages if isinstance(m.get("content"), str)
    )
    input_tokens = count_tokens(" ".join(text_parts))

    def _eligible(cfg: ModelConfig) -> bool:
        if requires_vision and not cfg.supports_vision:
            return False
        if not cfg.context_window >= input_tokens * 1.5:
            return False
        if not cfg.quality_score >= min_quality_score:
            return False
        return cfg.current_load < cfg.max_concurrency

    candidates = list(filter(_eligible, MODEL_REGISTRY.values()))
    if not candidates:
        raise ValueError("无可用模型")

    if strategy == RoutingStrategy.COST_OPTIMIZED:
        return min(candidates, key=lambda m: m.cost_per_1k_input + m.cost_per_1k_output)
    if strategy == RoutingStrategy.QUALITY_FIRST:
        return max(candidates, key=lambda m: m.quality_score)
    if strategy == RoutingStrategy.SPEED_FIRST:
        return min(candidates, key=lambda m: m.avg_latency_ms)
    if strategy == RoutingStrategy.LOAD_BALANCED:
        import random
        # Weight each candidate by its remaining concurrency headroom.
        headroom = [m.max_concurrency - m.current_load for m in candidates]
        return random.choices(candidates, weights=headroom, k=1)[0]
    return candidates[0]
@app.post("/v1/chat/auto")
async def chat_auto_route(
request: ChatRequest,
strategy: RoutingStrategy = RoutingStrategy.COST_OPTIMIZED
):
"""自动路由:根据策略选择最优模型"""
messages = [{"role": "user", "content": request.message}]
selected = select_model(messages, strategy=strategy)
logger.info(f"路由至: {selected.name}(策略: {strategy})")
selected.current_load += 1
try:
content, used_model = call_with_fallback(messages, preferred_model=selected.name)
return {"content": content, "model_used": used_model}
finally:
selected.current_load -= 12. 语义缓存(降低重复请求成本)
python
# pip install redis numpy openai
import redis
import numpy as np
import json
import hashlib
from openai import OpenAI
# Blocking OpenAI client used for embeddings and completions in this snippet.
client = OpenAI()
# Redis database 2 on localhost holds the semantic-cache entries.
redis_client = redis.Redis(host="localhost", port=6379, db=2)
def get_embedding(text: str) -> list[float]:
    """Return the "text-embedding-3-small" embedding vector of ``text``."""
    result = client.embeddings.create(model="text-embedding-3-small", input=text)
    first_item = result.data[0]
    return first_item.embedding
def cosine_similarity(a: list, b: list) -> float:
    """Return the cosine similarity of two equal-length vectors.

    Returns 0.0 when either vector has zero norm, instead of dividing by zero
    and yielding NaN.
    """
    vec_a = np.asarray(a, dtype=float)
    vec_b = np.asarray(b, dtype=float)
    denom = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denom == 0.0:
        return 0.0
    return float(np.dot(vec_a, vec_b) / denom)
class SemanticCache:
    """Cache that reuses an answer when a new query is semantically close.

    Entries are JSON blobs under ``sem_cache:<md5(query)>`` holding the query,
    its embedding and the response. Lookup is a linear scan over all entries.
    """

    def __init__(self, threshold: float = 0.95, ttl: int = 3600):
        """Set the minimum cosine similarity for a hit and the entry TTL in seconds."""
        self.threshold = threshold
        self.ttl = ttl
        self.prefix = "sem_cache:"

    def get(self, query: str) -> str | None:
        """Return the first cached response whose query is similar enough, else None."""
        q_emb = get_embedding(query)
        # SCAN walks the keyspace incrementally; KEYS would block Redis while
        # it enumerates every key in one go.
        for key in redis_client.scan_iter(match=f"{self.prefix}*"):
            raw = redis_client.get(key)
            if raw is None:
                # Entry expired between SCAN and GET; json.loads(None) would raise.
                continue
            entry = json.loads(raw)
            if cosine_similarity(q_emb, entry["embedding"]) >= self.threshold:
                print("语义缓存命中")
                return entry["response"]
        return None

    def set(self, query: str, response: str):
        """Store ``response`` for ``query`` together with the query embedding."""
        key = f"{self.prefix}{hashlib.md5(query.encode()).hexdigest()}"
        redis_client.setex(key, self.ttl, json.dumps({
            "query": query,
            "embedding": get_embedding(query),
            "response": response
        }))


cache = SemanticCache()
def cached_llm_call(query: str) -> tuple[str, bool]:
"""返回 (回复, 是否命中缓存)"""
if cached := cache.get(query):
return cached, True
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}]
)
answer = response.choices[0].message.content
cache.set(query, answer)
return answer, False7.1.5 综合实战:生产级 LLM 服务
python
"""
生产级 LLM API 服务:认证 + 限流 + 路由 + 流式 + 监控
兼容 OpenAI API 格式,可作为统一代理层
"""
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from contextlib import asynccontextmanager
from openai import AsyncOpenAI
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import time, logging, uuid, json
# Root logging at INFO with timestamp, level and message in every record.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log one message when the application starts and one when it shuts down."""
    logger.info("LLM 服务启动")
    yield  # control returns here when the application shuts down
    logger.info("LLM 服务关闭")
app = FastAPI(title="Production LLM API", lifespan=lifespan)
# CORS: only the listed origin, methods and request headers are allowed.
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"],
allow_methods=["GET", "POST"],
allow_headers=["Authorization", "Content-Type"]
)
# Bearer-token extractor used as a dependency by verify_api_key.
security = HTTPBearer()
async_client = AsyncOpenAI()
# In-memory table mapping an API key to its user record.
# NOTE(review): the hard-coded demo key looks tutorial-only; confirm real keys
# are loaded from configuration or a secret store.
VALID_API_KEYS = {"sk-demo-key-123": {"user": "demo", "tier": "free"}}
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """Resolve the bearer token to its user record.

    Raises:
        HTTPException: 401 when the token is not a known API key.
    """
    token = credentials.credentials
    if token in VALID_API_KEYS:
        return VALID_API_KEYS[token]
    raise HTTPException(status_code=401, detail="Invalid API key")
class ProductionChatRequest(BaseModel):
    # Request body of the /v1/chat/completions endpoint.
    # NOTE(review): unlike ChatRequest earlier in this file, no bounds are set
    # on temperature or max_tokens here — confirm that is intended.
    messages: list[dict]  # forwarded unchanged to the upstream client
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024
    stream: bool = False  # True selects the SSE branch of production_chat
@app.middleware("http")
async def request_tracking(request: Request, call_next):
    """Tag every request with a short id and log its method, path, status and duration.

    The id is stored on ``request.state.request_id`` and returned in the
    ``X-Request-ID`` header; the duration in milliseconds is returned in
    ``X-Response-Time``.
    """
    rid = str(uuid.uuid4())[:8]
    request.state.request_id = rid
    # perf_counter is monotonic, so durations are immune to clock adjustments.
    start = time.perf_counter()
    logger.info(f"[{rid}] {request.method} {request.url.path}")
    try:
        response = await call_next(request)
    except Exception:
        # Without this, a crashing request would leave only the start line in
        # the log and no trace of how it ended.
        ms = int((time.perf_counter() - start) * 1000)
        logger.exception(f"[{rid}] unhandled error ({ms}ms)")
        raise
    ms = int((time.perf_counter() - start) * 1000)
    logger.info(f"[{rid}] {response.status_code} ({ms}ms)")
    response.headers["X-Request-ID"] = rid
    response.headers["X-Response-Time"] = str(ms)
    return response
@app.post("/v1/chat/completions")
async def production_chat(
    request: ProductionChatRequest,
    user_info: dict = Depends(verify_api_key)
):
    """Chat completion endpoint in OpenAI-style format, with optional streaming.

    Non-streaming requests return the upstream response dumped to a dict.
    Streaming requests return SSE frames of ``chat.completion.chunk`` objects
    followed by ``data: [DONE]``.
    """
    if not request.stream:
        response = await async_client.chat.completions.create(
            model=request.model, messages=request.messages,
            temperature=request.temperature, max_tokens=request.max_tokens
        )
        return response.model_dump()

    from fastapi.responses import StreamingResponse

    # One id for the whole completion: the chunks of a single response should
    # be correlatable, which a fresh id per chunk prevents.
    completion_id = str(uuid.uuid4())

    async def generate():
        stream = await async_client.chat.completions.create(
            model=request.model, messages=request.messages,
            temperature=request.temperature, max_tokens=request.max_tokens,
            stream=True
        )
        async for chunk in stream:
            # Chunks without choices would raise IndexError if indexed.
            if not chunk.choices:
                continue
            token = chunk.choices[0].delta.content
            if token:
                data = json.dumps({
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "choices": [{"delta": {"content": token}}]
                })
                yield f"data: {data}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(), media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
@app.get("/health")
async def health_check():
    """Liveness probe: report a fixed "ok" status and the current Unix time."""
    now = time.time()
    return {"status": "ok", "timestamp": now}
@app.get("/v1/models")
async def list_models(user_info: dict = Depends(verify_api_key)):
    """List the ids of all models registered in ``MODEL_REGISTRY``."""
    model_ids = list(MODEL_REGISTRY)
    return {"data": [{"id": model_id} for model_id in model_ids]}
# 启动:uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
学习资源