Skip to content

AI 应用的流式输出全链路

从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。


附录


A. 流式输出 API 速查表

OpenAI 流式调用

python
# Python
from openai import AsyncOpenAI
client = AsyncOpenAI()

stream = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "你好"}],
    stream=True,
    stream_options={"include_usage": True},  # 获取 Token 用量
)

async for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
    if chunk.usage:
        print(f"\nTokens: {chunk.usage.total_tokens}")
javascript
// JavaScript
import OpenAI from 'openai';
const openai = new OpenAI();

const stream = await openai.chat.completions.create({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: '你好' }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) process.stdout.write(content);
}

Anthropic 流式调用

python
# Python
from anthropic import AsyncAnthropic
client = AsyncAnthropic()

async with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "你好"}],
) as stream:
    async for text in stream.text_stream:
        print(text, end="")

Ollama / vLLM(OpenAI 兼容模式)

python
# Python —— 只需改 base_url
from openai import AsyncOpenAI

# Ollama
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="ollama")

# vLLM
client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="token")

# 调用方式完全一样
stream = await client.chat.completions.create(
    model="llama3:8b",  # Ollama 模型名
    messages=[{"role": "user", "content": "你好"}],
    stream=True,
)

FastAPI SSE 端点

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

async def sse_generator():
    # ... 调用 LLM 流式 API ...
    for token in tokens:
        yield f"data: {json.dumps({'content': token}, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/api/chat")
async def chat(request: dict):
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

前端 Fetch + ReadableStream

javascript
async function fetchSSE(prompt, onToken) {
  const res = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message: prompt }),
  });

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buf = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buf += decoder.decode(value, { stream: true });
    const lines = buf.split('\n');
    buf = lines.pop();
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const raw = line.slice(6);
      if (raw === '[DONE]') return;
      const { content } = JSON.parse(raw);
      if (content) onToken(content);
    }
  }
}

B. 常见问题与排查

1. 流式输出变成"一坨一坨"的

症状:不是逐字出现,而是攒一大段才显示

排查顺序:
  1. Nginx 缓冲 → 加 X-Accel-Buffering: no
  2. CloudFlare → 关闭响应缓冲(或用 Enterprise 版)
  3. AWS ALB → 检查 idle_timeout 设置
  4. 后端代码 → 确认 yield 后没有额外缓冲
  5. gzip 压缩 → 流式响应不要开 gzip(会攒数据再压缩)

2. EventSource 不能发 POST 请求

解决方案(三选一):
  1. 用 Fetch + ReadableStream 手动解析 SSE(推荐)
  2. 用 @microsoft/fetch-event-source 库
  3. 后端同时提供 GET 端点(把参数放 URL query)

3. 流式输出中文乱码

原因:
  → UTF-8 多字节字符被 chunk 切断
  → 一个中文字 = 3 字节 UTF-8
  → chunk 可能在第 2 字节处断开

解决:
  → TextDecoder 的 { stream: true } 选项(自动处理碎片)
  const decoder = new TextDecoder();           // ❌ 可能乱码
  const decoder = new TextDecoder();
  decoder.decode(value, { stream: true });     // ✅ 处理碎片

4. OpenAI 返回 429 Too Many Requests

原因:超过了 API 速率限制

解决:
  1. 加指数退避重试(1s → 2s → 4s)
  2. 减少并发请求数(Semaphore 限制)
  3. 升级 OpenAI 账户等级(Tier 1 → Tier 2+)
  4. 多 API Key 轮换

5. 流式输出到一半突然停了

排查:
  1. 检查 finish_reason:
     → "length" → max_tokens 太小,加大限制
     → "content_filter" → 触发安全过滤
     → null + 停了 → 网络断连

  2. 检查代理超时:
     → Nginx proxy_read_timeout 默认 60s
     → 加心跳保活(: heartbeat\n\n)

  3. 检查客户端:
     → AbortController 是否被意外触发
     → 浏览器 tab 切到后台是否被节流

6. CORS 报错

Access-Control-Allow-Origin 错误:

  后端加中间件:
    # FastAPI
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["http://localhost:3000"],
        allow_methods=["*"],
        allow_headers=["*"],
    )

  开发时用 Vite 代理更简单:
    // vite.config.js
    server: { proxy: { '/api': 'http://localhost:8000' } }

C. 推荐工具与库

Python

工具用途推荐理由
openaiOpenAI/兼容 API 调用官方 SDK,AsyncOpenAI 支持异步
anthropicAnthropic Claude 调用官方 SDK,.text_stream 便捷
fastapi后端框架原生异步 + StreamingResponse
uvicornASGI 服务器FastAPI 的标配
tiktokenToken 计数OpenAI 官方分词器
httpxHTTP 客户端异步 + 流式请求
litellm多提供商统一调用100+ 模型统一 API

JavaScript / TypeScript

工具用途推荐理由
openaiOpenAI SDK官方 Node.js SDK
@anthropic-ai/sdkAnthropic SDK官方 SDK
@microsoft/fetch-event-sourceSSE 客户端支持 POST + Header
react-markdownMarkdown 渲染React 生态首选
remark-gfmGFM 支持表格、删除线等
ai (Vercel AI SDK)全栈 AI 工具包useChat Hook 开箱即用

开发调试

工具用途
curl --no-buffer命令行测试 SSE 输出
Chrome DevTools → Network → EventStream浏览器查看 SSE 事件
Ollama本地跑 LLM,零成本测试流式
httpie --stream比 curl 更友好的 HTTP 客户端

生产部署

工具用途
Docker + Docker Compose容器化部署
Nginx(关闭缓冲)反向代理
Redis会话存储 / 分布式限流
Prometheus + GrafanaTTFT / Token 用量监控
Sentry错误追踪(流式异常捕获)

Vercel AI SDK 特别推荐:如果你用 Next.js,Vercel 的 ai 包提供了 useChat Hook——一行代码搞定流式对话,自动处理 SSE 解析、状态管理、中断控制。适合快速搭原型。


全书完。 🎉

坚持是一种品格