AI 应用的流式输出全链路
从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。
5. 后端流式中转——FastAPI / Node.js 实现
上一章我们封装了 stream_chat() 统一接口,能对接各大 LLM 提供商。但这个函数跑在哪里?总不能让浏览器直接调 OpenAI 吧?
这一章解决"中间层"的问题——搭建一个后端服务,把 LLM 的流式输出通过 SSE 转发给前端。
5.1 为什么不能前端直连 LLM API
新手最常犯的错误:在前端 JavaScript 里直接调 OpenAI API。
前端直连 LLM API 的三大致命问题:
┌─────────────────────────────────────────────┐
│ ❌ 问题 1:API Key 暴露 │
│ │
│ 前端代码对用户完全透明: │
│ → 打开 DevTools → Network → 看到请求 Header │
│ → Authorization: Bearer sk-abc123... │
│ → 你的 API Key 被偷了 │
│ → 别人用你的 Key 跑大量请求 → 巨额账单 │
│ │
│ 这不是"小心一点就能避免"的问题 │
│ → 任何在前端代码中出现的秘钥都是裸奔的 │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ ❌ 问题 2:无法加业务逻辑 │
│ │
│ 实际应用需要在 LLM 调用前后做很多事: │
│ → 用户认证(这个用户有没有登录?配额用完没?) │
│ → 内容审核(用户输入有没有敏感词?) │
│ → 对话历史管理(从数据库加载上下文) │
│ → Token 计费(记录用量、扣费) │
│ → 日志记录(排查问题、审计) │
│ → 这些逻辑放前端 = 全部可被绕过 │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ ❌ 问题 3:CORS 限制 │
│ │
│ 浏览器安全策略限制跨域请求 │
│ → 你的网站 https://myapp.com │
│ → 直连 https://api.openai.com │
│ → CORS 策略不允许(OpenAI 没给你开白名单) │
│ → 请求直接被浏览器拦截 │
└─────────────────────────────────────────────┘正确的架构
正确架构 vs 错误架构:
❌ 错误(前端直连):
浏览器 ──(带 API Key)──▶ api.openai.com
→ API Key 暴露、没有业务逻辑、CORS 报错
✅ 正确(后端中转):
浏览器 ──(带用户 Token)──▶ 你的后端 ──(带 API Key)──▶ api.openai.com
│ │
│ 用户看到的: │ 后端做的事:
│ • SSE 流式输出 │ • 验证用户身份
│ • 不知道 API Key │ • 加载对话历史
│ │ • 调用 LLM
│ │ • 记录 Token 用量
│ │ • 内容审核
│ │ • 写日志一句话总结:后端中转层不是可选的——它是 API Key 安全、业务逻辑、合规审计的必要基础。
5.2 FastAPI 流式中转实战(Python)
这一节我们用 FastAPI 搭建一个完整的流式中转服务——接收前端请求,调用 LLM,以 SSE 格式推流给浏览器。
最简版:5 分钟跑起来
python
# server.py —— FastAPI 流式中转服务(最简版)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
import json
app = FastAPI()
# CORS:允许前端跨域请求
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # 你的前端地址
allow_methods=["*"],
allow_headers=["*"],
)
client = AsyncOpenAI() # 从环境变量读取 OPENAI_API_KEY
async def generate_stream(prompt: str):
"""调用 OpenAI 流式 API,转换为 SSE 格式"""
stream = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
# 转成 SSE 格式推给前端
data = json.dumps({"content": content}, ensure_ascii=False)
yield f"data: {data}\n\n"
yield "data: [DONE]\n\n"
@app.post("/api/chat")
async def chat(request: dict):
prompt = request.get("message", "")
return StreamingResponse(
generate_stream(prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)bash
# 启动服务
pip install fastapi uvicorn openai
OPENAI_API_KEY=sk-xxx uvicorn server:app --reload --port 8000
# 用 curl 测试
curl -X POST http://localhost:8000/api/chat \
-H "Content-Type: application/json" \
-d '{"message": "你好"}' \
--no-buffer
# → 看到逐行输出 data: {"content":"你"} ...生产版:鉴权 + 会话历史 + Token 计费
python
# server_production.py —— 生产级流式中转服务
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from pydantic import BaseModel
import json
import time
import logging
app = FastAPI()
client = AsyncOpenAI()
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════
# 请求模型
# ═══════════════════════════════════════════
class ChatRequest(BaseModel):
message: str
conversation_id: str = ""
model: str = "gpt-4o"
temperature: float = 0.7
max_tokens: int = 1024
# ═══════════════════════════════════════════
# 用户认证(简化示例)
# ═══════════════════════════════════════════
async def get_current_user(request: Request) -> dict:
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token:
raise HTTPException(status_code=401, detail="未登录")
# 实际项目:验证 JWT / 查数据库
return {"user_id": "user_123", "plan": "pro", "token_quota": 100000}
# ═══════════════════════════════════════════
# 流式生成器(核心)
# ═══════════════════════════════════════════
async def generate_stream(
request: ChatRequest,
user: dict,
):
start_time = time.time()
total_tokens = 0
# 1. 加载对话历史(实际项目从数据库/Redis 读取)
history = load_conversation(request.conversation_id)
messages = history + [{"role": "user", "content": request.message}]
try:
# 2. 调用 LLM
stream = await client.chat.completions.create(
model=request.model,
messages=messages,
temperature=request.temperature,
max_tokens=request.max_tokens,
stream=True,
stream_options={"include_usage": True},
)
full_response = ""
async for chunk in stream:
# 3. 提取内容增量
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
data = json.dumps({"content": content}, ensure_ascii=False)
yield f"data: {data}\n\n"
# 4. 提取 Token 用量
if chunk.usage:
total_tokens = chunk.usage.total_tokens
# 5. 保存对话历史
save_conversation(
request.conversation_id,
messages + [{"role": "assistant", "content": full_response}],
)
# 6. 记录用量
logger.info(f"用户 {user['user_id']} | 模型 {request.model} | "
f"Token {total_tokens} | 耗时 {time.time()-start_time:.2f}s")
except Exception as e:
# 7. 错误处理——通过 SSE 推送错误信息
error_data = json.dumps({"error": str(e)}, ensure_ascii=False)
yield f"data: {error_data}\n\n"
yield "data: [DONE]\n\n"
# ═══════════════════════════════════════════
# API 端点
# ═══════════════════════════════════════════
@app.post("/api/chat")
async def chat(
request: ChatRequest,
user: dict = Depends(get_current_user),
):
return StreamingResponse(
generate_stream(request, user),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
# 辅助函数(实际用 Redis / PostgreSQL 实现)
def load_conversation(conv_id: str) -> list:
return [] # 占位
def save_conversation(conv_id: str, messages: list):
pass # 占位关键细节:X-Accel-Buffering
为什么要设置 X-Accel-Buffering: no ?
问题场景:
浏览器 ←──── Nginx ←──── FastAPI
│
└── Nginx 默认会缓冲后端响应
→ 攒够一定数据量才转发
→ 流式输出变成了"一坨一坨"的
解决:
X-Accel-Buffering: no
→ 告诉 Nginx 不要缓冲这个响应
→ 每个 SSE 事件立即转发给客户端
同理,如果用 CloudFlare / AWS ALB 等 CDN:
→ 也需要关闭响应缓冲
→ 否则流式体验会严重退化5.3 Express / Next.js 流式中转实战(Node.js)
如果你的前端是 React / Next.js 技术栈,用 Node.js 做后端中转会更自然。
Express 版
javascript
// server.js —— Express 流式中转
import express from 'express';
import OpenAI from 'openai';
import cors from 'cors';
const app = express();
app.use(cors());
app.use(express.json());
const openai = new OpenAI(); // 读取 OPENAI_API_KEY
app.post('/api/chat', async (req, res) => {
const { message } = req.body;
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: message }],
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
});
app.listen(8000, () => console.log('Server running on :8000'));Next.js App Router 版(Route Handler)
typescript
// app/api/chat/route.ts —— Next.js 流式中转
import OpenAI from 'openai';
const openai = new OpenAI();
export async function POST(req: Request) {
const { message } = await req.json();
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: message }],
stream: true,
});
// 用 ReadableStream 包装为 SSE
const encoder = new TextEncoder();
const readable = new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
const data = `data: ${JSON.stringify({ content })}\n\n`;
controller.enqueue(encoder.encode(data));
}
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'));
controller.close();
},
});
return new Response(readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}Python vs Node.js 中转对比
选 FastAPI 还是 Next.js 做中转?
适合用 FastAPI(Python):
✅ 后端是 Python 技术栈
✅ 需要复杂的业务逻辑(ML pipeline、数据处理)
✅ 团队熟悉 Python
✅ 需要用到第 4 章的统一封装(stream_chat())
适合用 Next.js(Node.js):
✅ 前端是 React / Next.js
✅ 想要全栈同一语言
✅ 简单的中转场景(不需要复杂后端逻辑)
✅ 部署在 Vercel 等 Serverless 平台
共同点:
→ 都用 StreamingResponse / ReadableStream
→ 都以 SSE 格式推给前端
→ 都需要设置 Cache-Control: no-cache
→ 前端代码完全一样(只是后端语言不同)5.4 流式中转的错误处理与优雅降级
流式输出的错误处理比非流式更复杂——因为响应已经开始发送了,你不能像普通 API 那样返回一个 HTTP 400/500。
流式场景的三类错误
流式中转可能遇到的错误:
类型 1:请求前错误(还没开始流)
──────────────────────────────
• 用户认证失败
• 参数校验失败
• 配额用完
→ 处理方式:正常返回 HTTP 4xx,跟普通 API 一样
类型 2:流式过程中 LLM 报错
──────────────────────────────
• OpenAI 限流(429 Too Many Requests)
• 模型过载(503 Service Unavailable)
• 网络中断
→ 此时 HTTP 200 已经发出去了!不能改状态码了
→ 处理方式:通过 SSE 事件推送错误信息
类型 3:流式过程中超时
──────────────────────────────
• LLM 生成太慢(> 60s)
• 网络抖动导致长时间无数据
→ 处理方式:设置超时 + 发送超时通知健壮的错误处理实现
python
# error_handling.py —— 流式中转的完整错误处理
import asyncio
import json
from openai import AsyncOpenAI, APIError, RateLimitError
client = AsyncOpenAI()
async def robust_stream(prompt: str, timeout: float = 60.0):
"""带超时和错误处理的流式生成器"""
try:
stream = await asyncio.wait_for(
client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
),
timeout=10.0, # 等待首个响应的超时(TTFT 超时)
)
last_chunk_time = asyncio.get_event_loop().time()
async for chunk in stream:
# 检查 Token 间超时
now = asyncio.get_event_loop().time()
if now - last_chunk_time > timeout:
yield _error_event("生成超时,请重试")
break
last_chunk_time = now
content = chunk.choices[0].delta.content
if content:
yield f"data: {json.dumps({'content': content}, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
yield _error_event("请求超时:模型响应时间过长")
except RateLimitError:
yield _error_event("请求过于频繁,请稍后重试")
except APIError as e:
yield _error_event(f"AI 服务暂时不可用:{e.message}")
except Exception as e:
yield _error_event(f"系统错误:{str(e)}")
yield "data: [DONE]\n\n"
def _error_event(message: str) -> str:
"""生成标准化的 SSE 错误事件"""
data = json.dumps({"error": message}, ensure_ascii=False)
return f"data: {data}\n\n"优雅降级策略
当主模型不可用时的降级方案:
降级链:GPT-4o → GPT-4o-mini → Ollama 本地模型
async def stream_with_fallback(prompt, models=None):
models = models or [
StreamConfig(provider="openai", model="gpt-4o"),
StreamConfig(provider="openai", model="gpt-4o-mini"),
StreamConfig(provider="ollama", model="llama3:8b"),
]
for config in models:
try:
async for text in stream_chat(messages, config):
yield text
return # 成功了就不再尝试下一个
except Exception as e:
logger.warning(f"{config.model} 失败: {e},尝试降级...")
# 通知前端正在切换模型
yield _error_event(f"正在切换到备用模型...")
# 所有模型都失败
yield _error_event("所有 AI 模型暂时不可用,请稍后重试")
降级策略的关键原则:
1. 对用户透明——告知正在切换模型
2. 记录降级日志——便于排查和报警
3. 本地模型兜底——确保核心功能可用
4. 不要静默失败——总是给用户一个响应核心要点:流式场景的错误不能用 HTTP 状态码表达(因为 200 已经发出去了),必须通过SSE 事件内的 error 字段传递给前端。前端需要同时处理正常内容和错误事件。
本章小结
| 知识点 | 要点 |
|---|---|
| 为什么要中转 | API Key 安全、业务逻辑、CORS |
| FastAPI 方案 | StreamingResponse + 异步生成器 |
| Next.js 方案 | ReadableStream + Route Handler |
| X-Accel-Buffering | 必须关闭 Nginx/CDN 缓冲 |
| 流式错误处理 | 不能用 HTTP 状态码,用 SSE 事件推送 |
| 超时控制 | TTFT 超时 + Token 间超时 |
| 降级策略 | 大模型 → 小模型 → 本地模型 |
下一章预告:前端流式渲染 —— 浏览器如何接收这些 SSE 事件?Fetch + ReadableStream 手动解析、EventSource API、流式 Markdown 增量渲染,以及 React 中的状态管理。