AI 应用的流式输出全链路
从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。
附录
A. 流式输出 API 速查表
OpenAI 流式调用
python
# Python
from openai import AsyncOpenAI
client = AsyncOpenAI()
stream = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "你好"}],
stream=True,
stream_options={"include_usage": True}, # 获取 Token 用量
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
if chunk.usage:
print(f"\nTokens: {chunk.usage.total_tokens}")javascript
// JavaScript
import OpenAI from 'openai';
const openai = new OpenAI();
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: '你好' }],
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) process.stdout.write(content);
}Anthropic 流式调用
python
# Python
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
async with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "你好"}],
) as stream:
async for text in stream.text_stream:
print(text, end="")Ollama / vLLM(OpenAI 兼容模式)
python
# Python —— 只需改 base_url
from openai import AsyncOpenAI
# Ollama
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
# vLLM
client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="token")
# 调用方式完全一样
stream = await client.chat.completions.create(
model="llama3:8b", # Ollama 模型名
messages=[{"role": "user", "content": "你好"}],
stream=True,
)FastAPI SSE 端点
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
async def sse_generator():
# ... 调用 LLM 流式 API ...
for token in tokens:
yield f"data: {json.dumps({'content': token}, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
@app.post("/api/chat")
async def chat(request: dict):
return StreamingResponse(
sse_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)前端 Fetch + ReadableStream
javascript
async function fetchSSE(prompt, onToken) {
const res = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: prompt }),
});
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const lines = buf.split('\n');
buf = lines.pop();
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const raw = line.slice(6);
if (raw === '[DONE]') return;
const { content } = JSON.parse(raw);
if (content) onToken(content);
}
}
}B. 常见问题与排查
1. 流式输出变成"一坨一坨"的
症状:不是逐字出现,而是攒一大段才显示
排查顺序:
1. Nginx 缓冲 → 加 X-Accel-Buffering: no
2. CloudFlare → 关闭响应缓冲(或用 Enterprise 版)
3. AWS ALB → 检查 idle_timeout 设置
4. 后端代码 → 确认 yield 后没有额外缓冲
5. gzip 压缩 → 流式响应不要开 gzip(会攒数据再压缩)2. EventSource 不能发 POST 请求
解决方案(三选一):
1. 用 Fetch + ReadableStream 手动解析 SSE(推荐)
2. 用 @microsoft/fetch-event-source 库
3. 后端同时提供 GET 端点(把参数放 URL query)3. 流式输出中文乱码
原因:
→ UTF-8 多字节字符被 chunk 切断
→ 一个中文字 = 3 字节 UTF-8
→ chunk 可能在第 2 字节处断开
解决:
→ TextDecoder 的 { stream: true } 选项(自动处理碎片)
const decoder = new TextDecoder(); // ❌ 可能乱码
const decoder = new TextDecoder();
decoder.decode(value, { stream: true }); // ✅ 处理碎片4. OpenAI 返回 429 Too Many Requests
原因:超过了 API 速率限制
解决:
1. 加指数退避重试(1s → 2s → 4s)
2. 减少并发请求数(Semaphore 限制)
3. 升级 OpenAI 账户等级(Tier 1 → Tier 2+)
4. 多 API Key 轮换5. 流式输出到一半突然停了
排查:
1. 检查 finish_reason:
→ "length" → max_tokens 太小,加大限制
→ "content_filter" → 触发安全过滤
→ null + 停了 → 网络断连
2. 检查代理超时:
→ Nginx proxy_read_timeout 默认 60s
→ 加心跳保活(: heartbeat\n\n)
3. 检查客户端:
→ AbortController 是否被意外触发
→ 浏览器 tab 切到后台是否被节流6. CORS 报错
Access-Control-Allow-Origin 错误:
后端加中间件:
# FastAPI
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_methods=["*"],
allow_headers=["*"],
)
开发时用 Vite 代理更简单:
// vite.config.js
server: { proxy: { '/api': 'http://localhost:8000' } }C. 推荐工具与库
Python
| 工具 | 用途 | 推荐理由 |
|---|---|---|
openai | OpenAI/兼容 API 调用 | 官方 SDK,AsyncOpenAI 支持异步 |
anthropic | Anthropic Claude 调用 | 官方 SDK,.text_stream 便捷 |
fastapi | 后端框架 | 原生异步 + StreamingResponse |
uvicorn | ASGI 服务器 | FastAPI 的标配 |
tiktoken | Token 计数 | OpenAI 官方分词器 |
httpx | HTTP 客户端 | 异步 + 流式请求 |
litellm | 多提供商统一调用 | 100+ 模型统一 API |
JavaScript / TypeScript
| 工具 | 用途 | 推荐理由 |
|---|---|---|
openai | OpenAI SDK | 官方 Node.js SDK |
@anthropic-ai/sdk | Anthropic SDK | 官方 SDK |
@microsoft/fetch-event-source | SSE 客户端 | 支持 POST + Header |
react-markdown | Markdown 渲染 | React 生态首选 |
remark-gfm | GFM 支持 | 表格、删除线等 |
ai (Vercel AI SDK) | 全栈 AI 工具包 | useChat Hook 开箱即用 |
开发调试
| 工具 | 用途 |
|---|---|
curl --no-buffer | 命令行测试 SSE 输出 |
| Chrome DevTools → Network → EventStream | 浏览器查看 SSE 事件 |
| Ollama | 本地跑 LLM,零成本测试流式 |
httpie --stream | 比 curl 更友好的 HTTP 客户端 |
生产部署
| 工具 | 用途 |
|---|---|
| Docker + Docker Compose | 容器化部署 |
| Nginx(关闭缓冲) | 反向代理 |
| Redis | 会话存储 / 分布式限流 |
| Prometheus + Grafana | TTFT / Token 用量监控 |
| Sentry | 错误追踪(流式异常捕获) |
Vercel AI SDK 特别推荐:如果你用 Next.js,Vercel 的
ai包提供了useChatHook——一行代码搞定流式对话,自动处理 SSE 解析、状态管理、中断控制。适合快速搭原型。
全书完。 🎉