AI 应用的流式输出全链路
从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。
8. 生产级优化——延迟、错误、成本
前面 7 章解决了"能跑起来"的问题。这一章解决"能上线"的问题——真实用户、真实流量、真实账单下必须处理的优化。
8.1 TTFT 优化:从请求到第一个 Token
TTFT(Time To First Token)是流式体验的第一印象。用户点击发送后,等待第一个字出现的时间。超过 2 秒用户就会感觉"卡了"。
TTFT 的组成
TTFT 的完整分解(从用户点击"发送"到看到第一个字):
用户点击 → [前端处理] → [网络请求] → [后端处理] → [LLM Prefill] → 第一个 Token
~5ms ~50-200ms ~10-50ms ~200-2000ms
各阶段的优化方向:
┌──────────────────────────────────────────────────┐
│ 阶段 耗时 优化手段 │
│ ─────────────────────────────────────────────────│
│ 前端处理 ~5ms 几乎无需优化 │
│ 网络 RTT ~50-200ms CDN / 就近部署 │
│ 后端处理 ~10-50ms 预热连接池 / 缓存 │
│ LLM Prefill ~200-2000ms 模型选择 / Prompt 优化 │
│ ─────────────────────────────────────────────────│
│ 总 TTFT ~300-2300ms 重点优化 Prefill 阶段 │
└──────────────────────────────────────────────────┘实用优化技巧
TTFT 优化清单(按效果排序):
1. 选小模型(效果最大)⭐⭐⭐⭐⭐
→ GPT-4o-mini 的 TTFT 比 GPT-4o 快 2-3 倍
→ 简单对话用小模型,复杂推理才上大模型
→ 路由策略:先判断复杂度,再选模型
2. 压缩 System Prompt ⭐⭐⭐⭐
→ Prefill 时间与输入 Token 数正相关
→ 2000 Token 的 System Prompt → TTFT +200ms
→ 精简到 500 Token → 省 150ms
→ 用 "你是 XX 助手" 替代 5 页的角色设定
3. 裁剪对话历史 ⭐⭐⭐⭐
→ 不要把 100 轮对话全发给 LLM
→ 策略:保留 System + 最近 10 轮 + 摘要
→ 或者用 LLM 生成对话摘要,替代完整历史
4. 连接池预热 ⭐⭐⭐
→ OpenAI SDK 默认每次请求新建 TCP 连接
→ 设置 http_client 复用连接 → 省 50-100ms
→ AsyncOpenAI 会自动复用
5. 流式"骨架屏" ⭐⭐
→ 在等待 TTFT 期间,显示思考动画
→ "AI 正在思考..."(点点点动画)
→ 不是真正的优化,但改善了感知体验python
# ttft_optimization.py —— 对话历史裁剪策略
def trim_conversation(messages: list, max_turns: int = 10, max_tokens: int = 4000):
"""裁剪对话历史,控制 TTFT"""
# 保留 system prompt
system = [m for m in messages if m["role"] == "system"]
# 保留最后 N 轮对话(1 轮 = user + assistant)
non_system = [m for m in messages if m["role"] != "system"]
recent = non_system[-(max_turns * 2):]
trimmed = system + recent
# 估算 Token 数(粗略:1 中文字 ≈ 1.5 Token)
total_chars = sum(len(m["content"]) for m in trimmed)
estimated_tokens = int(total_chars * 1.5)
# 如果还是太长,进一步裁剪
while estimated_tokens > max_tokens and len(recent) > 2:
recent = recent[2:] # 删掉最早的一轮
trimmed = system + recent
total_chars = sum(len(m["content"]) for m in trimmed)
estimated_tokens = int(total_chars * 1.5)
return trimmed8.2 断线重连与错误恢复
流式连接可能因为网络抖动、代理超时、服务器重启而中断。用户看到的效果是"回复生成到一半就停了"——这比生成失败更让人难受。
前端断线检测与重连
javascript
// reconnect.js —— 带指数退避的断线重连策略
async function fetchSSEWithRetry(prompt, callbacks, maxRetries = 3) {
let retryCount = 0;
let receivedContent = ''; // 已经收到的内容
while (retryCount <= maxRetries) {
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
message: prompt,
// 断线重连时,告诉后端已接收到的内容长度
resume_from: receivedContent.length,
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
callbacks.onDone();
return; // 正常完成,退出重连循环
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const raw = line.slice(6);
if (raw === '[DONE]') { callbacks.onDone(); return; }
try {
const data = JSON.parse(raw);
if (data.content) {
receivedContent += data.content;
callbacks.onToken(data.content);
}
} catch {}
}
}
} catch (err) {
if (err.name === 'AbortError') return; // 用户主动中断
retryCount++;
if (retryCount > maxRetries) {
callbacks.onError(new Error('多次重连失败,请刷新重试'));
return;
}
// 指数退避:1s → 2s → 4s
const delay = Math.pow(2, retryCount - 1) * 1000;
callbacks.onRetry(retryCount, delay);
await new Promise(r => setTimeout(r, delay));
}
}
}后端心跳保活
python
# heartbeat.py —— SSE 心跳机制
import asyncio
import json
async def generate_with_heartbeat(prompt: str, heartbeat_interval: float = 15.0):
"""带心跳的流式生成器——防止代理/负载均衡器超时断连"""
async def heartbeat():
"""每 15 秒发一个空注释,保持连接"""
while True:
await asyncio.sleep(heartbeat_interval)
yield ": heartbeat\n\n" # SSE 注释,客户端会忽略
# 与 LLM 流式输出交错发送心跳
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
last_activity = asyncio.get_event_loop().time()
async for chunk in stream:
now = asyncio.get_event_loop().time()
# 如果距离上次活动超过 15 秒,先发心跳
if now - last_activity > heartbeat_interval:
yield ": heartbeat\n\n"
content = chunk.choices[0].delta.content
if content:
data = json.dumps({"content": content}, ensure_ascii=False)
yield f"data: {data}\n\n"
last_activity = now
yield "data: [DONE]\n\n"为什么需要心跳?
问题场景:
Nginx / CloudFlare / AWS ALB 都有默认的代理超时
→ Nginx: proxy_read_timeout 默认 60s
→ CloudFlare: 100s
→ AWS ALB: 60s
如果 LLM 生成慢(复杂推理 / 高负载),两个 Token 之间间隔 > 60s
→ 代理以为连接死了 → 主动断开
→ 用户看到"生成中断"
解决:
每 15 秒发一个 SSE 注释 ": heartbeat\n\n"
→ 代理看到有数据流动 → 不断连
→ 客户端 SSE 解析器自动忽略注释行8.3 并发与背压控制
当 100 个用户同时发消息,你的后端会向 LLM 发起 100 个并发流式请求。LLM 不堪重负 → 所有人的速度都变慢 → 更多人点"重试" → 雪崩。
信号量限制并发
python
# concurrency.py —— 用 Semaphore 控制 LLM 并发请求数
import asyncio
from fastapi import FastAPI, HTTPException
app = FastAPI()
# 全局信号量:最多同时 20 个流式请求
MAX_CONCURRENT_STREAMS = 20
semaphore = asyncio.Semaphore(MAX_CONCURRENT_STREAMS)
# 当前活跃连接数(用于监控)
active_streams = 0
async def generate_stream_with_limit(request):
global active_streams
# 尝试获取信号量(不阻塞)
if semaphore.locked() and semaphore._value == 0:
# 已满,返回排队提示或拒绝
yield f"data: {json.dumps({'error': '当前使用人数较多,请稍后重试'})}\n\n"
yield "data: [DONE]\n\n"
return
async with semaphore:
active_streams += 1
try:
async for chunk in _call_llm(request):
yield chunk
finally:
active_streams -= 1
# 监控端点
@app.get("/api/metrics")
async def metrics():
return {
"active_streams": active_streams,
"max_streams": MAX_CONCURRENT_STREAMS,
"available": MAX_CONCURRENT_STREAMS - active_streams,
"utilization": f"{active_streams / MAX_CONCURRENT_STREAMS * 100:.0f}%",
}队列 + 优先级
高并发场景的三种策略:
策略 1:直接拒绝(最简单)
→ 超过并发上限 → 返回 429
→ 适合:非关键场景、有客户端重试的情况
策略 2:排队等待(更友好)
→ 超过上限 → 进入队列 → 轮到了再执行
→ 通过 SSE 推送排队位置:"您前面还有 3 人"
→ 适合:用户愿意等待的场景
策略 3:优先级队列(企业级)
→ VIP 用户优先、免费用户排队
→ 基于用户等级分配不同的并发额度
→ 适合:SaaS 产品、多租户平台
实现要点:
→ asyncio.Queue 做队列
→ asyncio.PriorityQueue 做优先级
→ Redis + Lua 脚本做分布式限流背压监控
python
# backpressure.py —— 背压检测与自适应降级
import time
class BackpressureMonitor:
"""监控 LLM 响应速度,自动降级"""
def __init__(self):
self.ttft_history = [] # 最近 100 次 TTFT
self.speed_history = [] # 最近 100 次 tok/s
def record(self, ttft: float, tokens_per_sec: float):
self.ttft_history.append(ttft)
self.speed_history.append(tokens_per_sec)
# 只保留最近 100 条
self.ttft_history = self.ttft_history[-100:]
self.speed_history = self.speed_history[-100:]
@property
def avg_ttft(self) -> float:
return sum(self.ttft_history) / len(self.ttft_history) if self.ttft_history else 0
@property
def should_degrade(self) -> bool:
"""当平均 TTFT > 3s 或 速度 < 10 tok/s 时,触发降级"""
return self.avg_ttft > 3.0 or (
self.speed_history and
sum(self.speed_history[-10:]) / 10 < 10
)
def get_recommended_model(self, default: str) -> str:
"""根据背压情况推荐模型"""
if self.should_degrade:
# 自动切换到更小/更快的模型
return "gpt-4o-mini" if "gpt-4o" in default else default
return default核心原则:流式连接是长连接,一个请求可能持续 10-30 秒。如果不限制并发,20 个用户就能耗尽你的 LLM 配额和服务器资源。信号量 + 队列 + 背压降级,三者缺一不可。
8.4 成本监控与流式计费
LLM API 不便宜。一次对话可能花几毛钱,10 万用户每月的 Token 费用可能超过 10 万人民币。不做成本监控 = 等着被账单吓一跳。
主流 API 价格参考
主流 LLM API 定价(2024 年末参考价):
提供商 模型 输入价格 输出价格
─────────────────────────────────────────────────────────────
OpenAI GPT-4o $2.50/1M tokens $10.00/1M tokens
OpenAI GPT-4o-mini $0.15/1M tokens $0.60/1M tokens
Anthropic Claude 3.5 Sonnet $3.00/1M tokens $15.00/1M tokens
Anthropic Claude 3.5 Haiku $0.80/1M tokens $4.00/1M tokens
本地 Ollama Llama3 8B 免费 免费(电费除外)
成本估算示例(1 万次对话/天):
每次对话:~500 输入 Token + ~1000 输出 Token
GPT-4o: (500×2.5 + 1000×10)/1M × 10000 = $112.5/天 ≈ ¥800/天
GPT-4o-mini: (500×0.15 + 1000×0.6)/1M × 10000 = $6.75/天 ≈ ¥48/天
→ 小模型便宜 16 倍!流式计费中间件
python
# billing.py —— 流式输出中的实时 Token 计费
from dataclasses import dataclass, field
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class UsageRecord:
user_id: str
model: str
input_tokens: int = 0
output_tokens: int = 0
start_time: datetime = field(default_factory=datetime.now)
duration_ms: float = 0
@property
def cost_usd(self) -> float:
"""估算本次请求成本"""
pricing = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
}
p = pricing.get(self.model, {"input": 1.0, "output": 3.0})
return (self.input_tokens * p["input"] + self.output_tokens * p["output"]) / 1_000_000
async def generate_with_billing(request, user_id: str):
"""带计费统计的流式生成器"""
import time
record = UsageRecord(user_id=user_id, model=request.model)
start = time.time()
output_token_count = 0
stream = await client.chat.completions.create(
model=request.model,
messages=request.messages,
stream=True,
stream_options={"include_usage": True},
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
output_token_count += 1
yield f"data: {json.dumps({'content': content}, ensure_ascii=False)}\n\n"
# 最后一个 chunk 包含准确的 usage 信息
if chunk.usage:
record.input_tokens = chunk.usage.prompt_tokens
record.output_tokens = chunk.usage.completion_tokens
record.duration_ms = (time.time() - start) * 1000
# 记录到数据库 / 日志(异步,不阻塞响应)
logger.info(f"📊 计费 | 用户:{record.user_id} | 模型:{record.model} | "
f"输入:{record.input_tokens} | 输出:{record.output_tokens} | "
f"成本:${record.cost_usd:.4f} | 耗时:{record.duration_ms:.0f}ms")
yield "data: [DONE]\n\n"成本预警与配额管理
成本控制策略(从简到繁):
1. 日志监控(最低要求)
→ 每次请求记录 Token 用量和成本
→ 每日/每周汇总报表
→ 发现异常就手动介入
2. 用户配额
→ 免费用户:每天 20 次对话
→ 付费用户:每月 10 万 Token
→ 超过配额 → 返回 "配额已用完,请明天再试"
3. 预算预警
→ 设置日预算上限(如 $100/天)
→ 接近上限时发送 Slack/邮件报警
→ 超过上限自动切换到更便宜的模型
4. 智能路由
→ 简单问题("你好"、"帮我翻译")→ GPT-4o-mini
→ 复杂问题(代码生成、长文分析)→ GPT-4o
→ 可以用一个微调的分类器判断问题复杂度
→ 节省 60-80% 的成本上线前务必做到:至少实现第 1 点(日志监控)。不然月底 OpenAI 的账单会教你做人。第 4 点(智能路由)是效果最好的成本优化,但需要额外的工程投入。
本章小结
| 知识点 | 要点 |
|---|---|
| TTFT 优化 | 选小模型 > 压缩 Prompt > 裁剪历史 > 连接池 |
| 断线重连 | 前端指数退避 + 后端 SSE 心跳保活 |
| 心跳机制 | : heartbeat\n\n 防代理超时断连 |
| 并发控制 | Semaphore 限流 + 队列排队 + 背压降级 |
| 成本监控 | stream_options={"include_usage": True} 获取用量 |
| 计费策略 | 用户配额 + 预算预警 + 智能路由省 60%+ 成本 |
全书总结
恭喜你读到这里!我们从 LLM 的第一个 Token 出发,走通了流式输出的每一层:
全链路回顾:
第 1 章 为什么流式输出 → TTFT、进度感知、产品价值
第 2 章 Token 生成机制 → 自回归解码、BPE、KV Cache
第 3 章 传输协议 → SSE(首选)、WebSocket、gRPC
第 4 章 LLM 提供商 API → OpenAI / Anthropic / Ollama 统一封装
第 5 章 后端中转 → FastAPI StreamingResponse + 错误处理
第 6 章 前端渲染 → Fetch + ReadableStream + Markdown + React
第 7 章 全栈实战 → 完整可运行的聊天应用
第 8 章 生产级优化 → TTFT / 重连 / 并发 / 成本
一句话总结:
LLM 逐 Token 生成 → SSE 逐事件推送 → 后端逐 chunk 转发 → 前端逐帧渲染流式输出不是一个 Feature,而是一种架构思维。 掌握了这条链路,你就掌握了构建现代 AI 应用的核心能力。