Skip to content

7.1 后端架构设计(API 服务设计)

AI 应用后端与传统 Web 服务的核心差异:响应时间长(秒级)、输出流式、上游 API 不稳定、成本随请求量线性增长。


LLM 应用的后端与传统 Web 服务有显著差异:响应时间长(秒级甚至分钟级)、输出是流式的、上游 API 不稳定、成本随请求量线性增长。本节以 FastAPI 为核心框架,覆盖从单机服务到生产级多模型路由的完整工程实践。


7.1.1 LLM 应用架构模式

三种响应模式对比

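| 模式 | 适用场景 | 首字延迟 | 用户体验 | 实现复杂度 |
| --- | --- | --- | --- | --- |
| 同步(Sync) | 短文本分类、结构化提取 | 高(等全量响应) | 差(等待转圈) | — |
| 异步任务(Async Task) | 长文生成、报告分析 | 立即返回任务ID | 中(轮询进度) | — |
| 流式(Streaming SSE/WS) | 对话、实时写作 | 极低(毫秒级首字) | 优(打字效果) | — |

(注:"实现复杂度"一列的取值在原文提取时丢失。)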

1. FastAPI 项目结构

ai_backend/
├── main.py              # 应用入口
├── api/
│   ├── chat.py          # 对话接口
│   ├── completions.py   # 文本补全接口
│   └── tasks.py         # 异步任务接口
├── services/
│   ├── llm_router.py    # 模型路由层
│   ├── rate_limiter.py  # 限流服务
│   └── cache.py         # 缓存服务
├── models/
│   └── schemas.py       # Pydantic 数据模型
├── middleware/
│   └── auth.py          # 认证中间件
└── config.py            # 配置管理

2. 同步响应接口

python
# pip install fastapi uvicorn openai pydantic

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from openai import OpenAI
from typing import Optional
import time

# Application instance that the route decorators in this file attach to.
app = FastAPI(title="LLM API Service", version="1.0.0")
# Synchronous OpenAI SDK client, constructed with no explicit arguments.
client = OpenAI()

class ChatRequest(BaseModel):
    """Request payload accepted by the chat endpoints in this file."""

    # User message; constrained to 1-4000 characters.
    message: str = Field(..., min_length=1, max_length=4000)
    # Model name that handlers forward to the upstream client.
    model: str = Field(default="gpt-4o-mini")
    # Sampling temperature, constrained to [0.0, 2.0].
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    # Cap on generated tokens, constrained to 1-4096.
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    # Optional system prompt supplied by the caller.
    system_prompt: Optional[str] = None

class ChatResponse(BaseModel):
    """Response body of the non-streaming chat endpoint."""

    # Text of the first completion choice.
    content: str
    # Model name reported back by the upstream response.
    model: str
    # Token usage, as produced by ``response.usage.model_dump()``.
    usage: dict
    # Handler-measured latency in whole milliseconds.
    latency_ms: int

@app.post("/v1/chat", response_model=ChatResponse)
async def chat_sync(request: ChatRequest):
    """Non-streaming chat endpoint: reply once the whole completion is ready.

    Args:
        request: Validated chat payload. ``system_prompt`` is sent as a system
            message only when it is non-empty.

    Returns:
        ChatResponse with the first choice's text, the model reported by the
        upstream API, token usage and the measured latency in milliseconds.

    Raises:
        HTTPException: 500 carrying the error text when the call or the
            response construction fails.
    """
    import asyncio  # local import: only needed to off-load the blocking call

    # perf_counter is monotonic, so the latency cannot be skewed by clock changes.
    start = time.perf_counter()
    messages = []
    if request.system_prompt:
        messages.append({"role": "system", "content": request.system_prompt})
    messages.append({"role": "user", "content": request.message})

    try:
        # ``client`` is the synchronous SDK client. Calling it directly inside
        # an ``async def`` handler would block the event loop (and every other
        # request) for the full duration of the upstream call.
        response = await asyncio.to_thread(
            client.chat.completions.create,
            model=request.model,
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
        )
        usage = response.usage.model_dump() if response.usage is not None else {}
        return ChatResponse(
            # ``content`` can be None for some completions; the response model
            # requires a string.
            content=response.choices[0].message.content or "",
            model=response.model,
            usage=usage,
            latency_ms=int((time.perf_counter() - start) * 1000),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

3. 异步任务模式(Celery + Redis)

python
# pip install celery redis

from celery import Celery
from openai import OpenAI
import redis
import uuid

# Celery application; Redis database 0 is used as the broker.
celery_app = Celery("llm_tasks", broker="redis://localhost:6379/0")
# Separate Redis database (1) that holds the per-task status hashes.
redis_client = redis.Redis(host="localhost", port=6379, db=1)
# Synchronous OpenAI client used inside the worker task.
openai_client = OpenAI()

@celery_app.task(bind=True, max_retries=3)
def generate_long_content(self, task_id: str, prompt: str, system_prompt: str = ""):
    """Celery task: generate long-form text and record progress in Redis.

    The hash ``task:{task_id}`` moves through ``processing`` and then
    ``completed``. While retries remain, a failed attempt is recorded as
    ``retrying``; only the last failed attempt records ``failed``.

    Args:
        task_id: Identifier used to build the Redis key.
        prompt: User prompt sent to the model.
        system_prompt: Optional system message; skipped when empty.

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` to schedule another
            attempt after 5 seconds; once retries are exhausted ``self.retry``
            re-raises the original exception.
    """
    key = f"task:{task_id}"
    try:
        redis_client.hset(key, mapping={"status": "processing", "progress": 0})

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        response = openai_client.chat.completions.create(
            model="gpt-4o", messages=messages, max_tokens=4096
        )

        redis_client.hset(key, mapping={
            "status": "completed",
            # Redis rejects None values, so normalise a missing content to "".
            "result": response.choices[0].message.content or "",
            "progress": 100
        })
        # Drop the error left behind by an earlier failed attempt so pollers
        # do not see an error next to a completed result.
        redis_client.hdel(key, "error")
        redis_client.expire(key, 3600)

    except Exception as exc:
        retries_left = self.max_retries is None or self.request.retries < self.max_retries
        redis_client.hset(key, mapping={
            "status": "retrying" if retries_left else "failed",
            "error": str(exc),
        })
        # Give failed/retrying records a TTL too; otherwise tasks that never
        # succeed stay in Redis forever.
        redis_client.expire(key, 3600)
        raise self.retry(exc=exc, countdown=5)

@app.post("/v1/tasks/generate")
async def submit_task(prompt: str, system_prompt: str = ""):
    """Queue a long-form generation job and return its id immediately.

    Args:
        prompt: User prompt handed to the worker.
        system_prompt: Optional system message handed to the worker.

    Returns:
        Dict with the new ``task_id`` and the initial ``"pending"`` status.
    """
    task_id = str(uuid.uuid4())
    key = f"task:{task_id}"
    redis_client.hset(key, mapping={"status": "pending", "progress": 0})
    # Safety-net TTL: without it the record would never expire if the job is
    # never picked up (for example when enqueueing fails right after this line).
    redis_client.expire(key, 86400)
    generate_long_content.delay(task_id, prompt, system_prompt)
    return {"task_id": task_id, "status": "pending"}

@app.get("/v1/tasks/{task_id}")
async def get_task_result(task_id: str):
    """Return the stored status, progress, result and error for one task.

    Raises:
        HTTPException: 404 when no hash exists under ``task:{task_id}``.
    """
    record = redis_client.hgetall(f"task:{task_id}")
    if not record:
        raise HTTPException(status_code=404, detail="任务不存在或已过期")

    def _text(field: bytes) -> str:
        # The client returns bytes keys and values; absent fields decode to "".
        return record.get(field, b"").decode()

    return {
        "task_id": task_id,
        "status": _text(b"status"),
        "progress": int(record.get(b"progress", b"0")),
        # Empty strings are reported as null.
        "result": _text(b"result") or None,
        "error": _text(b"error") or None,
    }

7.1.2 流式输出(SSE / WebSocket)

1. SSE 流式对话(Server-Sent Events)

python
# pip install sse-starlette

from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
from openai import AsyncOpenAI
import json

# Asynchronous OpenAI client used by the streaming handlers.
async_client = AsyncOpenAI()

async def stream_llm_response(request: ChatRequest):
    """Async generator that yields one SSE event dict per received token.

    Yields ``token`` events while content arrives, then a single ``done``
    event. If anything fails, a single ``error`` event is yielded instead and
    the generator ends.

    Args:
        request: Validated chat payload; a default system prompt is used when
            none is supplied.
    """
    messages = [
        {"role": "system", "content": request.system_prompt or "你是一个有帮助的AI助手。"},
        {"role": "user", "content": request.message}
    ]
    try:
        stream = await async_client.chat.completions.create(
            model=request.model, messages=messages,
            temperature=request.temperature,
            # Forward the validated limit instead of silently dropping it.
            max_tokens=request.max_tokens,
            stream=True
        )
        async for chunk in stream:
            # Some chunks carry no choices at all; indexing them would raise.
            if not chunk.choices:
                continue
            token = chunk.choices[0].delta.content
            if token:
                yield {
                    "event": "token",
                    "data": json.dumps({"token": token}, ensure_ascii=False)
                }
        yield {"event": "done", "data": json.dumps({"status": "completed"})}
    except Exception as e:
        yield {"event": "error", "data": json.dumps({"error": str(e)})}

@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    """SSE chat endpoint backed by ``stream_llm_response``."""
    event_source = stream_llm_response(request)
    # Headers that ask caches and buffering proxies not to hold events back.
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return EventSourceResponse(
        event_source,
        media_type="text/event-stream",
        headers=sse_headers,
    )

# 原生 StreamingResponse 版本(更轻量)
@app.post("/v1/chat/stream-raw")
async def chat_stream_raw(request: ChatRequest):
    """SSE chat endpoint built on a plain ``StreamingResponse``.

    Emits ``data: {"token": ...}`` frames followed by ``data: [DONE]``. If the
    upstream call fails, one ``event: error`` frame is emitted and the stream
    ends without ``[DONE]``.
    """
    async def generate():
        # Honour the same request fields as the other chat endpoints.
        messages = []
        if request.system_prompt:
            messages.append({"role": "system", "content": request.system_prompt})
        messages.append({"role": "user", "content": request.message})
        try:
            stream = await async_client.chat.completions.create(
                model=request.model, messages=messages,
                temperature=request.temperature, max_tokens=request.max_tokens,
                stream=True
            )
            async for chunk in stream:
                # Chunks without choices carry no token; indexing would raise.
                if not chunk.choices:
                    continue
                token = chunk.choices[0].delta.content
                if token:
                    data = json.dumps({"token": token}, ensure_ascii=False)
                    yield f"data: {data}\n\n"
        except Exception as e:
            # Headers are already sent once streaming starts, so the failure
            # can only be reported in-band.
            error = json.dumps({"error": str(e)}, ensure_ascii=False)
            yield f"event: error\ndata: {error}\n\n"
            return
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(), media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

2. WebSocket 双向流式对话

python
from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    """Registry of accepted WebSocket connections keyed by client id."""

    def __init__(self):
        # client_id -> socket most recently registered under that id.
        self.connections: dict[str, WebSocket] = {}

    async def connect(self, ws: WebSocket, client_id: str):
        """Accept the handshake, then store the socket under ``client_id``."""
        await ws.accept()
        self.connections.update({client_id: ws})

    def disconnect(self, client_id: str):
        """Forget ``client_id``; unknown ids are ignored."""
        if client_id in self.connections:
            del self.connections[client_id]

# Module-level connection registry used by the WebSocket handler.
manager = ConnectionManager()

@app.websocket("/ws/chat/{client_id}")
async def websocket_chat(websocket: WebSocket, client_id: str):
    """Multi-turn chat over one WebSocket, streaming each reply token by token.

    Protocol (server to client): ``start``, zero or more ``token`` messages,
    then ``done`` carrying the full reply; ``error`` on failure. The history
    lives only for the lifetime of the connection. The system prompt is taken
    from the first non-empty message (or a default) and not changed afterwards.
    """
    await manager.connect(websocket, client_id)
    conversation_history = []

    try:
        while True:
            data = await websocket.receive_json()
            user_message = data.get("message", "")
            if not user_message:
                continue

            if not conversation_history:
                conversation_history.append({
                    "role": "system",
                    "content": data.get("system_prompt", "你是一个有帮助的AI助手。")
                })
            conversation_history.append({"role": "user", "content": user_message})

            await websocket.send_json({"type": "start"})

            tokens = []
            stream = await async_client.chat.completions.create(
                model="gpt-4o-mini", messages=conversation_history, stream=True
            )
            async for chunk in stream:
                # Chunks without choices carry no token; indexing would raise.
                if not chunk.choices:
                    continue
                token = chunk.choices[0].delta.content
                if token:
                    tokens.append(token)
                    await websocket.send_json({"type": "token", "content": token})

            full_response = "".join(tokens)
            conversation_history.append({"role": "assistant", "content": full_response})
            await websocket.send_json({"type": "done", "full_content": full_response})

    except WebSocketDisconnect:
        # Normal client hang-up; cleanup happens in ``finally``.
        pass
    except Exception as e:
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except (WebSocketDisconnect, RuntimeError):
            # Best effort only: the socket may already be closed, in which
            # case there is nobody left to notify.
            pass
    finally:
        # Remove the registration only if it still points at this socket, so a
        # stale handler cannot evict a newer connection that reused the id.
        if manager.connections.get(client_id) is websocket:
            manager.disconnect(client_id)

7.1.3 限流、重试与降级策略

1. 请求限流(SlowAPI + Redis)

python
# pip install slowapi

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Limiter keyed by the caller's remote address, with counters stored in Redis.
limiter = Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379")
# Expose the limiter on the application state.
app.state.limiter = limiter
# Let slowapi's handler build the response for RateLimitExceeded errors.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/v1/chat")
@limiter.limit("20/minute")
async def chat_with_limit(request: Request, body: ChatRequest):
    """Placeholder for a chat handler limited to 20 requests per minute.

    The body is intentionally left as ``...``.
    """
    # NOTE(review): "/v1/chat" is also the path used by chat_sync; if both
    # handlers are registered on the same app, confirm which one should serve it.
    ...

# 按用户 ID 限流(更精细)
def get_user_id(request: Request) -> str:
    """Return the rate-limit key: the ``X-User-ID`` header, else the remote address."""
    # NOTE(review): the header is client-supplied; confirm it is set by a
    # trusted gateway, otherwise callers can pick their own limit bucket.
    header_value = request.headers.get("X-User-ID")
    if header_value:
        return header_value
    return get_remote_address(request)

# Second limiter instance keyed by get_user_id instead of the remote address.
user_limiter = Limiter(key_func=get_user_id, storage_uri="redis://localhost:6379")

@app.post("/v1/chat/premium")
@user_limiter.limit("100/minute;1000/day")  # combined per-minute and per-day limits
async def chat_premium(request: Request, body: ChatRequest):
    """Placeholder for a chat handler limited per user (100/minute and 1000/day).

    The body is intentionally left as ``...``.
    """
    ...

# Token 预算限流(按消耗量而非请求次数)
class TokenBudgetLimiter:
    """Per-user daily token budget tracked in Redis.

    Usage is counted under ``token_budget:{user_id}:{date}``, where the date is
    ``datetime.date.today()``.
    """

    def __init__(self, redis_client, daily_budget: int = 100000):
        self.redis = redis_client
        self.daily_budget = daily_budget

    async def check_and_consume(self, user_id: str, estimated_tokens: int) -> bool:
        """Reserve ``estimated_tokens`` from today's budget of ``user_id``.

        The amount is added first and the resulting total is checked
        afterwards. A read-then-increment sequence is not atomic and lets
        concurrent requests both pass the check and overshoot the budget.

        Args:
            user_id: Identifier used in the Redis key.
            estimated_tokens: Number of tokens to reserve.

        Returns:
            True when the reservation fits the daily budget. False otherwise,
            in which case the reservation is rolled back.
        """
        import datetime
        key = f"token_budget:{user_id}:{datetime.date.today()}"
        pipe = self.redis.pipeline()
        pipe.incrby(key, estimated_tokens)
        pipe.expire(key, 86400)
        new_total = int(pipe.execute()[0])
        if new_total > self.daily_budget:
            # Over budget: undo this reservation. A concurrent request may
            # briefly see the inflated total and be refused; that errs on the
            # safe side.
            self.redis.decrby(key, estimated_tokens)
            return False
        return True

2. 自动重试(tenacity 指数退避)

python
# pip install tenacity

from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type, before_sleep_log
)
from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError
import logging

# Module logger; also passed to tenacity's before_sleep_log hook.
logger = logging.getLogger(__name__)
# Synchronous OpenAI client used by the retrying helper.
client = OpenAI()

@retry(
    stop=stop_after_attempt(3),
    # Exponential backoff clamped to the 2-30 second range; with three
    # attempts there are at most two waits between calls.
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def call_llm_with_retry(messages: list, model: str = "gpt-4o-mini", **kwargs) -> str:
    """Call the chat completion API, retrying rate-limit, timeout and connection errors.

    Args:
        messages: Chat messages forwarded to the API unchanged.
        model: Model name.
        **kwargs: Extra keyword arguments passed through to ``create``.

    Returns:
        The content of the first choice.
    """
    completion = client.chat.completions.create(model=model, messages=messages, **kwargs)
    first_choice = completion.choices[0]
    return first_choice.message.content

# 异步版本
from tenacity import AsyncRetrying
from openai import AsyncOpenAI

async def call_llm_async_retry(messages: list, model: str = "gpt-4o-mini") -> str:
    """Async chat completion with up to three attempts and exponential backoff.

    Retries on rate-limit, timeout and connection errors, the same set as the
    synchronous ``call_llm_with_retry``.

    Args:
        messages: Chat messages forwarded to the API unchanged.
        model: Model name.

    Returns:
        The content of the first choice of the first successful attempt.
    """
    # A client is created per call, so close it when done; otherwise its HTTP
    # connection pool is left for the garbage collector.
    async with AsyncOpenAI() as aclient:
        async for attempt in AsyncRetrying(
            retry=retry_if_exception_type(
                (RateLimitError, APITimeoutError, APIConnectionError)
            ),
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=2, max=30)
        ):
            with attempt:
                response = await aclient.chat.completions.create(model=model, messages=messages)
                return response.choices[0].message.content

3. 降级策略(Fallback Chain)

python
from openai import OpenAI, RateLimitError, APIError

# Ordered fallback chain: call_with_fallback starts at the preferred model's
# position (or at the first entry when the model is not listed) and walks
# towards the end of the list until one call succeeds.
MODEL_FALLBACK_CHAIN = [
    {"model": "gpt-4o",         "provider": "openai"},
    {"model": "gpt-4o-mini",    "provider": "openai"},
    {"model": "claude-3-5-haiku-20241022", "provider": "anthropic"},
    {"model": "qwen-turbo",     "provider": "dashscope"},
]

def get_client(provider: str):
    """Build an SDK client for ``provider``.

    Args:
        provider: One of ``"openai"``, ``"anthropic"`` or ``"dashscope"``.

    Returns:
        An ``OpenAI`` client for ``openai`` and ``dashscope`` (the latter
        pointed at the DashScope OpenAI-compatible endpoint), or an
        ``anthropic.Anthropic`` client for ``anthropic``.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
    """
    if provider == "openai":
        return OpenAI()
    if provider == "anthropic":
        import anthropic
        return anthropic.Anthropic()
    if provider == "dashscope":
        import os
        return OpenAI(
            # Prefer a key from the environment; the placeholder is kept only
            # as a fallback so existing setups behave as before.
            api_key=os.environ.get("DASHSCOPE_API_KEY", "your_dashscope_key"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
    # Fail loudly instead of returning None, which would only surface later as
    # an AttributeError at the call site.
    raise ValueError(f"Unsupported provider: {provider!r}")

def _anthropic_completion(client, model: str, messages: list, max_tokens: int) -> str:
    """Run one completion through the Anthropic Messages API and return its text."""
    # The Messages API takes the system prompt as a separate argument rather
    # than as a message with role "system".
    system_parts = [
        m["content"] for m in messages
        if m.get("role") == "system" and isinstance(m.get("content"), str)
    ]
    params = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": [m for m in messages if m.get("role") != "system"],
    }
    if system_parts:
        params["system"] = "\n\n".join(system_parts)
    reply = client.messages.create(**params)
    return "".join(
        block.text for block in reply.content if getattr(block, "type", None) == "text"
    )


def call_with_fallback(
    messages: list,
    preferred_model: str = "gpt-4o",
    max_tokens: int = 1024
) -> tuple[str, str]:
    """Call the preferred model, degrading along MODEL_FALLBACK_CHAIN on failure.

    Args:
        messages: OpenAI-style chat messages.
        preferred_model: Model to start from; when it is not in the chain the
            chain is tried from its first entry.
        max_tokens: Completion token limit passed to every provider.

    Returns:
        Tuple ``(content, model_name)`` of the first successful call.

    Raises:
        RuntimeError: If every model in the remaining chain fails; the last
            underlying error is attached as the cause.
    """
    start_idx = next(
        (i for i, m in enumerate(MODEL_FALLBACK_CHAIN) if m["model"] == preferred_model), 0
    )

    last_error = None
    for config in MODEL_FALLBACK_CHAIN[start_idx:]:
        try:
            logger.info(f"尝试模型: {config['model']}")
            c = get_client(config["provider"])
            if config["provider"] == "anthropic":
                # The Anthropic client has no ``chat.completions`` attribute,
                # so it needs its own call path.
                content = _anthropic_completion(c, config["model"], messages, max_tokens)
            else:
                resp = c.chat.completions.create(
                    model=config["model"], messages=messages, max_tokens=max_tokens
                )
                content = resp.choices[0].message.content
            return content, config["model"]
        except RateLimitError as e:
            last_error = e
            logger.warning(f"{config['model']} 触发限流,降级")
        except APIError as e:
            last_error = e
            # Only status errors carry ``status_code``; connection errors do
            # not. Reading it unguarded would raise inside this handler and
            # abort the fallback.
            status = getattr(e, "status_code", None)
            logger.warning(f"{config['model']} API 错误({status}),降级")
        except Exception as e:
            last_error = e
            logger.error(f"{config['model']} 未知错误: {e},降级")

    raise RuntimeError("所有模型均不可用") from last_error

7.1.4 多模型路由与负载均衡

1. 基于规则的智能路由

python
from dataclasses import dataclass
from enum import Enum
import tiktoken

class RoutingStrategy(str, Enum):
    """Routing strategies understood by select_model."""

    # Pick the lowest combined input + output price.
    COST_OPTIMIZED = "cost"
    # Pick the highest quality score.
    QUALITY_FIRST  = "quality"
    # Pick the lowest average latency.
    SPEED_FIRST    = "speed"
    # Pick randomly, weighted by remaining concurrency headroom.
    LOAD_BALANCED  = "balanced"

@dataclass
class ModelConfig:
    """Static properties and live load counter of one routable model."""

    # Model name passed on to the provider call.
    name: str
    # Provider key (the registry uses "openai", "anthropic" and "dashscope").
    provider: str
    # Context window size in tokens.
    context_window: int
    # Price per 1k input / output tokens (currency not stated in this file).
    cost_per_1k_input: float
    cost_per_1k_output: float
    # Average latency in milliseconds, used by the speed-first strategy.
    avg_latency_ms: int
    # Quality score, used by the quality-first strategy and the minimum filter.
    quality_score: float
    # Whether the model is eligible for requests that require vision.
    supports_vision: bool = False
    # A model is skipped once current_load reaches max_concurrency.
    max_concurrency: int = 50
    # In-flight request counter adjusted by the route handler.
    current_load: int = 0

# Registry of routable models, keyed by a short alias.
MODEL_REGISTRY: dict[str, ModelConfig] = {
    "gpt-4o": ModelConfig(
        name="gpt-4o",
        provider="openai",
        context_window=128000,
        cost_per_1k_input=0.005,
        cost_per_1k_output=0.015,
        avg_latency_ms=2000,
        quality_score=9.5,
        supports_vision=True,
    ),
    "gpt-4o-mini": ModelConfig(
        name="gpt-4o-mini",
        provider="openai",
        context_window=128000,
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006,
        avg_latency_ms=800,
        quality_score=7.5,
        supports_vision=True,
    ),
    "claude-3-5-sonnet": ModelConfig(
        name="claude-3-5-sonnet-20241022",
        provider="anthropic",
        context_window=200000,
        cost_per_1k_input=0.003,
        cost_per_1k_output=0.015,
        avg_latency_ms=2500,
        quality_score=9.5,
    ),
    "qwen-plus": ModelConfig(
        name="qwen-plus",
        provider="dashscope",
        context_window=131072,
        cost_per_1k_input=0.0004,
        cost_per_1k_output=0.0012,
        avg_latency_ms=1200,
        quality_score=7.8,
    ),
}

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count the tokens of ``text`` with the tokenizer tiktoken maps to ``model``.

    If tokenisation fails for any reason, fall back to the rough estimate of
    one token per three characters.
    """
    try:
        encoder = tiktoken.encoding_for_model(model)
        token_ids = encoder.encode(text)
    except Exception:
        return len(text) // 3
    return len(token_ids)

def select_model(
    messages: list,
    strategy: RoutingStrategy = RoutingStrategy.COST_OPTIMIZED,
    requires_vision: bool = False,
    min_quality_score: float = 0.0
) -> ModelConfig:
    """Pick a model from MODEL_REGISTRY according to ``strategy``.

    A model is eligible when it supports vision if required, its context window
    is at least 1.5x the estimated input tokens, its quality score meets the
    minimum, and it still has concurrency headroom.

    Raises:
        ValueError: When no registered model is eligible.
    """
    text_parts = (
        m.get("content", "") for m in messages if isinstance(m.get("content"), str)
    )
    input_tokens = count_tokens(" ".join(text_parts))

    def _eligible(cfg: ModelConfig) -> bool:
        return (
            (not requires_vision or cfg.supports_vision)
            and cfg.context_window >= input_tokens * 1.5
            and cfg.quality_score >= min_quality_score
            and cfg.current_load < cfg.max_concurrency
        )

    candidates = list(filter(_eligible, MODEL_REGISTRY.values()))
    if not candidates:
        raise ValueError("无可用模型")

    def _total_cost(cfg: ModelConfig) -> float:
        return cfg.cost_per_1k_input + cfg.cost_per_1k_output

    def _quality(cfg: ModelConfig) -> float:
        return cfg.quality_score

    def _latency(cfg: ModelConfig) -> int:
        return cfg.avg_latency_ms

    if strategy == RoutingStrategy.COST_OPTIMIZED:
        return min(candidates, key=_total_cost)
    if strategy == RoutingStrategy.QUALITY_FIRST:
        return max(candidates, key=_quality)
    if strategy == RoutingStrategy.SPEED_FIRST:
        return min(candidates, key=_latency)
    if strategy == RoutingStrategy.LOAD_BALANCED:
        import random
        # Weight each candidate by its remaining concurrency headroom.
        headroom = [cfg.max_concurrency - cfg.current_load for cfg in candidates]
        return random.choices(candidates, weights=headroom, k=1)[0]

    # Any other strategy value: take the first eligible model.
    return candidates[0]

@app.post("/v1/chat/auto")
async def chat_auto_route(
    request: ChatRequest,
    strategy: RoutingStrategy = RoutingStrategy.COST_OPTIMIZED
):
    """Route the request to a model chosen by ``strategy`` and return its reply.

    Returns:
        Dict with ``content`` and ``model_used`` (the model that actually
        answered, which may differ from the selected one after fallback).

    Raises:
        HTTPException: 503 when no model is eligible or every model failed.
    """
    import asyncio  # local import: only needed to off-load the blocking call

    # Honour the caller's system prompt, as the other chat endpoints do.
    messages = []
    if request.system_prompt:
        messages.append({"role": "system", "content": request.system_prompt})
    messages.append({"role": "user", "content": request.message})

    try:
        selected = select_model(messages, strategy=strategy)
    except ValueError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    logger.info(f"路由至: {selected.name}(策略: {strategy})")

    # NOTE(review): current_load is a plain in-process counter; with several
    # worker processes each one sees only its own load — confirm intent.
    selected.current_load += 1
    try:
        # call_with_fallback is synchronous; run it in a worker thread so the
        # event loop keeps serving other requests meanwhile.
        content, used_model = await asyncio.to_thread(
            call_with_fallback,
            messages,
            preferred_model=selected.name,
            max_tokens=request.max_tokens,
        )
        return {"content": content, "model_used": used_model}
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    finally:
        selected.current_load -= 1

2. 语义缓存(降低重复请求成本)

python
# pip install redis numpy openai

import redis
import numpy as np
import json
import hashlib
from openai import OpenAI

# Synchronous OpenAI client used for embeddings and completions.
client = OpenAI()
# Redis database 2 holds the semantic-cache entries.
redis_client = redis.Redis(host="localhost", port=6379, db=2)

def get_embedding(text: str) -> list[float]:
    """Return the ``text-embedding-3-small`` embedding vector of ``text``."""
    embedding_response = client.embeddings.create(
        model="text-embedding-3-small", input=text
    )
    first_item = embedding_response.data[0]
    return first_item.embedding

def cosine_similarity(a: list, b: list) -> float:
    """Return the cosine similarity of two equal-length numeric vectors.

    Args:
        a: First vector.
        b: Second vector.

    Returns:
        The cosine of the angle between ``a`` and ``b``. If either vector has
        zero norm the similarity is undefined and 0.0 is returned, not NaN.
    """
    vec_a = np.asarray(a, dtype=float)
    vec_b = np.asarray(b, dtype=float)
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denominator == 0:
        # Avoid 0/0, which would give NaN and make every threshold check fail.
        return 0.0
    return float(np.dot(vec_a, vec_b) / denominator)

class SemanticCache:
    """Semantic cache: reuse a stored answer for a sufficiently similar query.

    Entries are stored in Redis under ``sem_cache:<md5 of query>`` as JSON
    holding the query, its embedding and the response.
    """

    def __init__(self, threshold: float = 0.95, ttl: int = 3600):
        self.threshold = threshold
        self.ttl = ttl
        self.prefix = "sem_cache:"

    def get(self, query: str) -> str | None:
        """Return the cached response most similar to ``query``, if any.

        Only entries whose cosine similarity is at least ``threshold`` count.
        Among those, the best-scoring one is returned rather than the first
        one found.

        Returns:
            The cached response, or None on a miss.
        """
        q_emb = get_embedding(query)
        best_score = self.threshold
        best_response = None
        # SCAN iterates incrementally; KEYS would block Redis while it walks
        # the whole keyspace.
        for key in redis_client.scan_iter(match=f"{self.prefix}*"):
            raw = redis_client.get(key)
            if raw is None:
                # The entry expired between the scan and the read.
                continue
            try:
                entry = json.loads(raw)
                score = cosine_similarity(q_emb, entry["embedding"])
                response = entry["response"]
            except (ValueError, KeyError, TypeError):
                # Skip malformed entries instead of failing the whole lookup.
                continue
            if score >= best_score:
                best_score, best_response = score, response
        if best_response is not None:
            print("语义缓存命中")
        return best_response

    def set(self, query: str, response: str):
        """Store ``response`` for ``query`` with the configured TTL."""
        key = f"{self.prefix}{hashlib.md5(query.encode()).hexdigest()}"
        redis_client.setex(key, self.ttl, json.dumps({
            "query": query,
            "embedding": get_embedding(query),
            "response": response
        }))

# Shared cache instance with the default threshold (0.95) and TTL (3600 s).
cache = SemanticCache()

def cached_llm_call(query: str) -> tuple[str, bool]:
    """Answer ``query`` from the semantic cache, calling the model on a miss.

    Returns:
        Tuple ``(answer, cache_hit)``.
    """
    cached = cache.get(query)
    # Compare against None explicitly: an empty cached answer is still a hit,
    # and a truthiness test would pay for the model call again.
    if cached is not None:
        return cached, True

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}]
    )
    # ``content`` can be None; keep the declared ``str`` return type and avoid
    # caching a null answer.
    answer = response.choices[0].message.content or ""
    cache.set(query, answer)
    return answer, False

7.1.5 综合实战:生产级 LLM 服务

python
"""
生产级 LLM API 服务:认证 + 限流 + 路由 + 流式 + 监控
兼容 OpenAI API 格式,可作为统一代理层
"""

from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from contextlib import asynccontextmanager
from openai import AsyncOpenAI
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import time, logging, uuid, json

# Root logging configuration: INFO level, timestamp + level + message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Module logger used by the middleware and lifespan hooks.
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: log start-up before the yield, shutdown after it."""
    logger.info("LLM 服务启动")
    yield
    logger.info("LLM 服务关闭")

# Application instance wired to the lifespan hook.
app = FastAPI(title="Production LLM API", lifespan=lifespan)

# CORS: a single allowed origin, GET/POST only, and only the two headers the
# API needs.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_methods=["GET", "POST"],
    allow_headers=["Authorization", "Content-Type"]
)

# Bearer-token extractor used by verify_api_key.
security = HTTPBearer()
# Asynchronous OpenAI client used by the completion endpoint.
async_client = AsyncOpenAI()

# In-memory API key table mapping key -> user metadata.
# NOTE(review): this looks like a demo credential hard-coded in source —
# confirm real keys are loaded from a secret store.
VALID_API_KEYS = {"sk-demo-key-123": {"user": "demo", "tier": "free"}}

async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """Dependency that maps the bearer token to its user record.

    Raises:
        HTTPException: 401 when the token is not a known API key.
    """
    try:
        return VALID_API_KEYS[credentials.credentials]
    except KeyError:
        raise HTTPException(status_code=401, detail="Invalid API key") from None

class ProductionChatRequest(BaseModel):
    """Request body of the completion endpoint."""

    # Chat messages, forwarded to the upstream API unchanged.
    messages: list[dict]
    # Model name forwarded to the upstream API.
    model: str = "gpt-4o-mini"
    # NOTE(review): unlike ChatRequest, no range limits are declared for
    # temperature or max_tokens — confirm whether validation is wanted here.
    temperature: float = 0.7
    max_tokens: int = 1024
    # When true, the endpoint answers with an SSE stream.
    stream: bool = False

@app.middleware("http")
async def request_tracking(request: Request, call_next):
    """Tag every request with a short id and log its method, path, status and duration.

    The id is stored on ``request.state.request_id`` and returned in the
    ``X-Request-ID`` header; the duration in milliseconds is returned in
    ``X-Response-Time``.
    """
    rid = str(uuid.uuid4())[:8]
    request.state.request_id = rid
    # perf_counter is monotonic, so durations are immune to wall-clock jumps.
    start = time.perf_counter()
    logger.info(f"[{rid}] {request.method} {request.url.path}")
    try:
        response = await call_next(request)
    except Exception:
        # Without this a failing request would leave only the "started" line.
        ms = int((time.perf_counter() - start) * 1000)
        logger.exception(f"[{rid}] unhandled error ({ms}ms)")
        raise
    ms = int((time.perf_counter() - start) * 1000)
    logger.info(f"[{rid}] {response.status_code} ({ms}ms)")
    response.headers["X-Request-ID"] = rid
    response.headers["X-Response-Time"] = str(ms)
    return response

@app.post("/v1/chat/completions")
async def production_chat(
    request: ProductionChatRequest,
    user_info: dict = Depends(verify_api_key)
):
    """Chat completion endpoint with optional SSE streaming.

    With ``stream`` set, returns a ``text/event-stream`` of
    ``chat.completion.chunk`` frames ending in ``data: [DONE]``. If the
    upstream call fails mid-stream, one frame carrying an ``error`` object is
    sent and the stream ends. Otherwise returns the upstream response as a dict.

    Args:
        request: Completion request body.
        user_info: Record of the authenticated API key (unused in the body;
            the dependency enforces authentication).
    """
    if request.stream:
        from fastapi.responses import StreamingResponse

        # One id per completion, shared by all of its chunks, so clients can
        # correlate them.
        completion_id = str(uuid.uuid4())

        async def generate():
            try:
                stream = await async_client.chat.completions.create(
                    model=request.model, messages=request.messages,
                    temperature=request.temperature, max_tokens=request.max_tokens,
                    stream=True
                )
                async for chunk in stream:
                    # Chunks without choices carry no content; indexing them
                    # would raise.
                    if not chunk.choices:
                        continue
                    delta_text = chunk.choices[0].delta.content
                    if delta_text:
                        data = json.dumps({
                            "id": completion_id,
                            "object": "chat.completion.chunk",
                            "choices": [{"delta": {"content": delta_text}}]
                        })
                        yield f"data: {data}\n\n"
            except Exception as exc:
                # Headers are already sent, so report the failure in-band.
                logger.exception("streaming completion failed")
                error = json.dumps({"error": {"message": str(exc)}})
                yield f"data: {error}\n\n"
                return
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            generate(), media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
        )

    response = await async_client.chat.completions.create(
        model=request.model, messages=request.messages,
        temperature=request.temperature, max_tokens=request.max_tokens
    )
    return response.model_dump()

@app.get("/health")
async def health_check():
    """Liveness probe: report ``ok`` together with the current Unix timestamp."""
    now = time.time()
    return {"status": "ok", "timestamp": now}

@app.get("/v1/models")
async def list_models(user_info: dict = Depends(verify_api_key)):
    """List the ids of all registered models for an authenticated caller."""
    model_entries = [{"id": model_key} for model_key in MODEL_REGISTRY]
    return {"data": model_entries}

# 启动:uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

学习资源

坚持是一种品格