Skip to content

AI 应用的流式输出全链路

从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。


8. 生产级优化——延迟、错误、成本

前面 7 章解决了"能跑起来"的问题。这一章解决"能上线"的问题——真实用户、真实流量、真实账单下必须处理的优化。


8.1 TTFT 优化:从请求到第一个 Token

TTFT(Time To First Token)是流式体验的第一印象。用户点击发送后,等待第一个字出现的时间。超过 2 秒用户就会感觉"卡了"。

TTFT 的组成

TTFT 的完整分解(从用户点击"发送"到看到第一个字):

  用户点击 → [前端处理] → [网络请求] → [后端处理] → [LLM Prefill] → 第一个 Token
              ~5ms          ~50-200ms     ~10-50ms      ~200-2000ms

  各阶段的优化方向:
  ┌──────────────────────────────────────────────────┐
  │ 阶段           耗时         优化手段              │
  │ ─────────────────────────────────────────────────│
  │ 前端处理       ~5ms         几乎无需优化           │
  │ 网络 RTT       ~50-200ms    CDN / 就近部署        │
  │ 后端处理       ~10-50ms     预热连接池 / 缓存      │
  │ LLM Prefill    ~200-2000ms  模型选择 / Prompt 优化 │
  │ ─────────────────────────────────────────────────│
  │ 总 TTFT        ~300-2300ms  重点优化 Prefill 阶段  │
  └──────────────────────────────────────────────────┘

实用优化技巧

TTFT 优化清单(按效果排序):

  1. 选小模型(效果最大)⭐⭐⭐⭐⭐
     → GPT-4o-mini 的 TTFT 比 GPT-4o 快 2-3 倍
     → 简单对话用小模型,复杂推理才上大模型
     → 路由策略:先判断复杂度,再选模型

  2. 压缩 System Prompt ⭐⭐⭐⭐
     → Prefill 时间与输入 Token 数正相关
     → 2000 Token 的 System Prompt → TTFT +200ms
     → 精简到 500 Token → 省 150ms
     → 用 "你是 XX 助手" 替代 5 页的角色设定

  3. 裁剪对话历史 ⭐⭐⭐⭐
     → 不要把 100 轮对话全发给 LLM
     → 策略:保留 System + 最近 10 轮 + 摘要
     → 或者用 LLM 生成对话摘要,替代完整历史

  4. 连接池预热 ⭐⭐⭐
     → OpenAI SDK 默认每次请求新建 TCP 连接
     → 设置 http_client 复用连接 → 省 50-100ms
     → AsyncOpenAI 会自动复用

  5. 流式"骨架屏" ⭐⭐
     → 在等待 TTFT 期间,显示思考动画
     → "AI 正在思考..."(点点点动画)
     → 不是真正的优化,但改善了感知体验
python
# ttft_optimization.py —— 对话历史裁剪策略
def trim_conversation(messages: list, max_turns: int = 10, max_tokens: int = 4000):
    """裁剪对话历史,控制 TTFT"""

    # 保留 system prompt
    system = [m for m in messages if m["role"] == "system"]

    # 保留最后 N 轮对话(1 轮 = user + assistant)
    non_system = [m for m in messages if m["role"] != "system"]
    recent = non_system[-(max_turns * 2):]

    trimmed = system + recent

    # 估算 Token 数(粗略:1 中文字 ≈ 1.5 Token)
    total_chars = sum(len(m["content"]) for m in trimmed)
    estimated_tokens = int(total_chars * 1.5)

    # 如果还是太长,进一步裁剪
    while estimated_tokens > max_tokens and len(recent) > 2:
        recent = recent[2:]  # 删掉最早的一轮
        trimmed = system + recent
        total_chars = sum(len(m["content"]) for m in trimmed)
        estimated_tokens = int(total_chars * 1.5)

    return trimmed

8.2 断线重连与错误恢复

流式连接可能因为网络抖动、代理超时、服务器重启而中断。用户看到的效果是"回复生成到一半就停了"——这比生成失败更让人难受。

前端断线检测与重连

javascript
// reconnect.js —— 带指数退避的断线重连策略
async function fetchSSEWithRetry(prompt, callbacks, maxRetries = 3) {
  let retryCount = 0;
  let receivedContent = '';  // 已经收到的内容

  while (retryCount <= maxRetries) {
    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message: prompt,
          // 断线重连时,告诉后端已接收到的内容长度
          resume_from: receivedContent.length,
        }),
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          callbacks.onDone();
          return;  // 正常完成,退出重连循环
        }

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const raw = line.slice(6);
          if (raw === '[DONE]') { callbacks.onDone(); return; }

          try {
            const data = JSON.parse(raw);
            if (data.content) {
              receivedContent += data.content;
              callbacks.onToken(data.content);
            }
          } catch {}
        }
      }
    } catch (err) {
      if (err.name === 'AbortError') return;  // 用户主动中断

      retryCount++;
      if (retryCount > maxRetries) {
        callbacks.onError(new Error('多次重连失败,请刷新重试'));
        return;
      }

      // 指数退避:1s → 2s → 4s
      const delay = Math.pow(2, retryCount - 1) * 1000;
      callbacks.onRetry(retryCount, delay);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}

后端心跳保活

python
# heartbeat.py —— SSE 心跳机制
import asyncio
import json

async def generate_with_heartbeat(prompt: str, heartbeat_interval: float = 15.0):
    """带心跳的流式生成器——防止代理/负载均衡器超时断连"""

    async def heartbeat():
        """每 15 秒发一个空注释,保持连接"""
        while True:
            await asyncio.sleep(heartbeat_interval)
            yield ": heartbeat\n\n"  # SSE 注释,客户端会忽略

    # 与 LLM 流式输出交错发送心跳
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    last_activity = asyncio.get_event_loop().time()

    async for chunk in stream:
        now = asyncio.get_event_loop().time()

        # 如果距离上次活动超过 15 秒,先发心跳
        if now - last_activity > heartbeat_interval:
            yield ": heartbeat\n\n"

        content = chunk.choices[0].delta.content
        if content:
            data = json.dumps({"content": content}, ensure_ascii=False)
            yield f"data: {data}\n\n"
            last_activity = now

    yield "data: [DONE]\n\n"
为什么需要心跳?

  问题场景:
    Nginx / CloudFlare / AWS ALB 都有默认的代理超时
    → Nginx: proxy_read_timeout 默认 60s
    → CloudFlare: 100s
    → AWS ALB: 60s

    如果 LLM 生成慢(复杂推理 / 高负载),两个 Token 之间间隔 > 60s
    → 代理以为连接死了 → 主动断开
    → 用户看到"生成中断"

  解决:
    每 15 秒发一个 SSE 注释 ": heartbeat\n\n"
    → 代理看到有数据流动 → 不断连
    → 客户端 SSE 解析器自动忽略注释行

8.3 并发与背压控制

当 100 个用户同时发消息,你的后端会向 LLM 发起 100 个并发流式请求。LLM 不堪重负 → 所有人的速度都变慢 → 更多人点"重试" → 雪崩。

信号量限制并发

python
# concurrency.py —— 用 Semaphore 控制 LLM 并发请求数
import asyncio
from fastapi import FastAPI, HTTPException

app = FastAPI()

# 全局信号量:最多同时 20 个流式请求
MAX_CONCURRENT_STREAMS = 20
semaphore = asyncio.Semaphore(MAX_CONCURRENT_STREAMS)

# 当前活跃连接数(用于监控)
active_streams = 0

async def generate_stream_with_limit(request):
    global active_streams

    # 尝试获取信号量(不阻塞)
    if semaphore.locked() and semaphore._value == 0:
        # 已满,返回排队提示或拒绝
        yield f"data: {json.dumps({'error': '当前使用人数较多,请稍后重试'})}\n\n"
        yield "data: [DONE]\n\n"
        return

    async with semaphore:
        active_streams += 1
        try:
            async for chunk in _call_llm(request):
                yield chunk
        finally:
            active_streams -= 1

# 监控端点
@app.get("/api/metrics")
async def metrics():
    return {
        "active_streams": active_streams,
        "max_streams": MAX_CONCURRENT_STREAMS,
        "available": MAX_CONCURRENT_STREAMS - active_streams,
        "utilization": f"{active_streams / MAX_CONCURRENT_STREAMS * 100:.0f}%",
    }

队列 + 优先级

高并发场景的三种策略:

  策略 1:直接拒绝(最简单)
    → 超过并发上限 → 返回 429
    → 适合:非关键场景、有客户端重试的情况

  策略 2:排队等待(更友好)
    → 超过上限 → 进入队列 → 轮到了再执行
    → 通过 SSE 推送排队位置:"您前面还有 3 人"
    → 适合:用户愿意等待的场景

  策略 3:优先级队列(企业级)
    → VIP 用户优先、免费用户排队
    → 基于用户等级分配不同的并发额度
    → 适合:SaaS 产品、多租户平台

  实现要点:
    → asyncio.Queue 做队列
    → asyncio.PriorityQueue 做优先级
    → Redis + Lua 脚本做分布式限流

背压监控

python
# backpressure.py —— 背压检测与自适应降级
import time

class BackpressureMonitor:
    """监控 LLM 响应速度,自动降级"""

    def __init__(self):
        self.ttft_history = []        # 最近 100 次 TTFT
        self.speed_history = []       # 最近 100 次 tok/s

    def record(self, ttft: float, tokens_per_sec: float):
        self.ttft_history.append(ttft)
        self.speed_history.append(tokens_per_sec)
        # 只保留最近 100 条
        self.ttft_history = self.ttft_history[-100:]
        self.speed_history = self.speed_history[-100:]

    @property
    def avg_ttft(self) -> float:
        return sum(self.ttft_history) / len(self.ttft_history) if self.ttft_history else 0

    @property
    def should_degrade(self) -> bool:
        """当平均 TTFT > 3s 或 速度 < 10 tok/s 时,触发降级"""
        return self.avg_ttft > 3.0 or (
            self.speed_history and
            sum(self.speed_history[-10:]) / 10 < 10
        )

    def get_recommended_model(self, default: str) -> str:
        """根据背压情况推荐模型"""
        if self.should_degrade:
            # 自动切换到更小/更快的模型
            return "gpt-4o-mini" if "gpt-4o" in default else default
        return default

核心原则:流式连接是长连接,一个请求可能持续 10-30 秒。如果不限制并发,20 个用户就能耗尽你的 LLM 配额和服务器资源。信号量 + 队列 + 背压降级,三者缺一不可。

8.4 成本监控与流式计费

LLM API 不便宜。一次对话可能花几毛钱,10 万用户每月的 Token 费用可能超过 10 万人民币。不做成本监控 = 等着被账单吓一跳。

主流 API 价格参考

主流 LLM API 定价(2024 年末参考价):

  提供商        模型              输入价格           输出价格
  ─────────────────────────────────────────────────────────────
  OpenAI       GPT-4o           $2.50/1M tokens   $10.00/1M tokens
  OpenAI       GPT-4o-mini      $0.15/1M tokens   $0.60/1M tokens
  Anthropic    Claude 3.5 Sonnet $3.00/1M tokens  $15.00/1M tokens
  Anthropic    Claude 3.5 Haiku  $0.80/1M tokens   $4.00/1M tokens
  本地 Ollama   Llama3 8B        免费               免费(电费除外)

  成本估算示例(1 万次对话/天):
    每次对话:~500 输入 Token + ~1000 输出 Token
    GPT-4o:      (500×2.5 + 1000×10)/1M × 10000 = $112.5/天 ≈ ¥800/天
    GPT-4o-mini: (500×0.15 + 1000×0.6)/1M × 10000 = $6.75/天 ≈ ¥48/天
    → 小模型便宜 16 倍!

流式计费中间件

python
# billing.py —— 流式输出中的实时 Token 计费
from dataclasses import dataclass, field
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

@dataclass
class UsageRecord:
    user_id: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    start_time: datetime = field(default_factory=datetime.now)
    duration_ms: float = 0

    @property
    def cost_usd(self) -> float:
        """估算本次请求成本"""
        pricing = {
            "gpt-4o":         {"input": 2.50, "output": 10.00},
            "gpt-4o-mini":    {"input": 0.15, "output": 0.60},
        }
        p = pricing.get(self.model, {"input": 1.0, "output": 3.0})
        return (self.input_tokens * p["input"] + self.output_tokens * p["output"]) / 1_000_000

async def generate_with_billing(request, user_id: str):
    """带计费统计的流式生成器"""
    import time

    record = UsageRecord(user_id=user_id, model=request.model)
    start = time.time()
    output_token_count = 0

    stream = await client.chat.completions.create(
        model=request.model,
        messages=request.messages,
        stream=True,
        stream_options={"include_usage": True},
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            output_token_count += 1
            yield f"data: {json.dumps({'content': content}, ensure_ascii=False)}\n\n"

        # 最后一个 chunk 包含准确的 usage 信息
        if chunk.usage:
            record.input_tokens = chunk.usage.prompt_tokens
            record.output_tokens = chunk.usage.completion_tokens

    record.duration_ms = (time.time() - start) * 1000

    # 记录到数据库 / 日志(异步,不阻塞响应)
    logger.info(f"📊 计费 | 用户:{record.user_id} | 模型:{record.model} | "
                f"输入:{record.input_tokens} | 输出:{record.output_tokens} | "
                f"成本:${record.cost_usd:.4f} | 耗时:{record.duration_ms:.0f}ms")

    yield "data: [DONE]\n\n"

成本预警与配额管理

成本控制策略(从简到繁):

  1. 日志监控(最低要求)
     → 每次请求记录 Token 用量和成本
     → 每日/每周汇总报表
     → 发现异常就手动介入

  2. 用户配额
     → 免费用户:每天 20 次对话
     → 付费用户:每月 10 万 Token
     → 超过配额 → 返回 "配额已用完,请明天再试"

  3. 预算预警
     → 设置日预算上限(如 $100/天)
     → 接近上限时发送 Slack/邮件报警
     → 超过上限自动切换到更便宜的模型

  4. 智能路由
     → 简单问题("你好"、"帮我翻译")→ GPT-4o-mini
     → 复杂问题(代码生成、长文分析)→ GPT-4o
     → 可以用一个微调的分类器判断问题复杂度
     → 节省 60-80% 的成本

上线前务必做到:至少实现第 1 点(日志监控)。不然月底 OpenAI 的账单会教你做人。第 4 点(智能路由)是效果最好的成本优化,但需要额外的工程投入。


本章小结

知识点要点
TTFT 优化选小模型 > 压缩 Prompt > 裁剪历史 > 连接池
断线重连前端指数退避 + 后端 SSE 心跳保活
心跳机制: heartbeat\n\n 防代理超时断连
并发控制Semaphore 限流 + 队列排队 + 背压降级
成本监控stream_options={"include_usage": True} 获取用量
计费策略用户配额 + 预算预警 + 智能路由省 60%+ 成本

全书总结

恭喜你读到这里!我们从 LLM 的第一个 Token 出发,走通了流式输出的每一层

全链路回顾:

  第 1 章  为什么流式输出     → TTFT、进度感知、产品价值
  第 2 章  Token 生成机制     → 自回归解码、BPE、KV Cache
  第 3 章  传输协议           → SSE(首选)、WebSocket、gRPC
  第 4 章  LLM 提供商 API     → OpenAI / Anthropic / Ollama 统一封装
  第 5 章  后端中转           → FastAPI StreamingResponse + 错误处理
  第 6 章  前端渲染           → Fetch + ReadableStream + Markdown + React
  第 7 章  全栈实战           → 完整可运行的聊天应用
  第 8 章  生产级优化         → TTFT / 重连 / 并发 / 成本

  一句话总结:
    LLM 逐 Token 生成 → SSE 逐事件推送 → 后端逐 chunk 转发 → 前端逐帧渲染

流式输出不是一个 Feature,而是一种架构思维。 掌握了这条链路,你就掌握了构建现代 AI 应用的核心能力。

坚持是一种品格