AI 应用的流式输出全链路
从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。
6. 前端流式渲染——从字节流到用户看到的文字
后端已经以 SSE 格式把 Token 推过来了。现在轮到前端:如何接收这些事件,并实时渲染到页面上?
这一章解决"最后一公里"——从浏览器收到字节流,到用户看到逐字呈现的文字,中间的每一步该怎么写。
6.1 Fetch + ReadableStream:手动解析 SSE
第 3 章提过,浏览器原生的 EventSource API 有两个硬伤:不支持 POST 请求、不支持自定义 Header。AI 应用通常需要两者(POST 发消息体、Header 带 Bearer Token),所以实战中更常用 Fetch API + ReadableStream 手动解析 SSE。
基础实现
// fetchSSE.js —— 用 Fetch API 手动解析 SSE 流
async function fetchSSE(prompt, onToken, onDone, onError) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer user-token-xxx', // ← EventSource 做不到
},
body: JSON.stringify({ message: prompt }),
});
if (!response.ok) {
onError(new Error(`HTTP ${response.status}`));
return;
}
// 拿到 ReadableStream
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = ''; // 缓冲区:处理不完整的 SSE 行
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 字节 → 文本
buffer += decoder.decode(value, { stream: true });
// 按行解析 SSE(以 \n\n 分隔事件)
const lines = buffer.split('\n');
buffer = lines.pop(); // 最后一行可能不完整,放回缓冲区
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6); // 去掉 "data: " 前缀
if (data === '[DONE]') {
onDone();
return;
}
try {
const parsed = JSON.parse(data);
if (parsed.error) {
onError(new Error(parsed.error));
} else if (parsed.content) {
onToken(parsed.content); // 逐 Token 回调
}
} catch (e) {
// JSON 解析失败,忽略这行
}
}
}
}
}使用示例
// 使用方式
fetchSSE(
'你好,请介绍一下自己',
(token) => {
// 每收到一个 Token 就追加到页面
document.getElementById('output').textContent += token;
},
() => {
console.log('流式输出完成');
},
(error) => {
console.error('出错了:', error.message);
}
);关键细节:缓冲区为什么必要
ReadableStream 的 chunk 不保证按行对齐!
理想情况(每个 chunk 都是完整的 SSE 事件):
chunk 1: "data: {\"content\":\"你\"}\n\n"
chunk 2: "data: {\"content\":\"好\"}\n\n"
→ 每次刚好一个完整事件,直接解析
实际情况(chunk 在任意位置断开):
chunk 1: "data: {\"content\":\"你\"}\n\ndata: {\"con"
chunk 2: "tent\":\"好\"}\n\n"
→ chunk 1 包含了 1.5 个事件
→ chunk 2 是前一个事件的后半部分
→ 不做缓冲 → JSON.parse 报错
解决方案(上面代码已实现):
1. 所有 chunk 先追加到 buffer
2. 按 \n 分割
3. 最后一个(可能不完整的)放回 buffer
4. 完整的行才解析支持中断(AbortController)
// 支持用户点击"停止生成"
function fetchSSEWithAbort(prompt, callbacks) {
const controller = new AbortController();
fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: prompt }),
signal: controller.signal, // ← 关联 AbortController
})
.then(async (response) => {
const reader = response.body.getReader();
// ... 解析逻辑同上
})
.catch((err) => {
if (err.name === 'AbortError') {
console.log('用户主动中断');
} else {
callbacks.onError(err);
}
});
// 返回中断函数
return () => controller.abort();
}
// 使用
const stop = fetchSSEWithAbort('你好', { onToken, onDone, onError });
// 用户点击"停止"按钮
document.getElementById('stopBtn').onclick = stop;实战首选:Fetch + ReadableStream 是目前 AI 应用前端对接流式输出的主流方案。ChatGPT 网页版、Claude 网页版都是这个路子。
6.2 EventSource API:浏览器原生方案
虽然 Fetch + ReadableStream 是主流,但 EventSource 仍有其价值——简单到不需要写解析逻辑。
基本用法
// eventSource.js —— 浏览器原生 SSE 接收
// ⚠️ 只能 GET 请求,不能自定义 Header
const eventSource = new EventSource('/api/stream?prompt=你好');
// 收到消息
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const { content } = JSON.parse(event.data);
document.getElementById('output').textContent += content;
};
// 自动重连(EventSource 的杀手特性)
// → 断线后浏览器会自动重连,并带上 Last-Event-Id
// → 服务器可以据此恢复推送
// 错误处理
eventSource.onerror = (err) => {
if (eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
} else {
console.log('连接出错,正在自动重连...');
}
};EventSource 的局限性
EventSource vs Fetch 对比:
特性 EventSource Fetch + ReadableStream
──────────────────────────────────────────────────────────────────
请求方法 GET only ❌ 任意(POST ✅)
自定义 Header 不支持 ❌ 支持 ✅
请求体(body) 不支持 ❌ 支持 ✅
自动重连 内置 ✅ 需手动实现
SSE 解析 自动 ✅ 需手动解析
中断控制 close() AbortController
浏览器兼容 IE 不支持 现代浏览器均支持
结论:
→ 简单的公开接口(无需认证)→ EventSource
→ 需要 POST + Token 认证 → Fetch API(AI 应用几乎都是这种)用第三方库弥补 EventSource 的缺陷
// 使用 @microsoft/fetch-event-source 库
// npm install @microsoft/fetch-event-source
import { fetchEventSource } from '@microsoft/fetch-event-source';
await fetchEventSource('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer token', // ← 支持自定义 Header!
},
body: JSON.stringify({ message: '你好' }),
onmessage(event) {
if (event.data === '[DONE]') return;
const { content } = JSON.parse(event.data);
document.getElementById('output').textContent += content;
},
onerror(err) {
console.error('SSE Error:', err);
throw err; // 抛出错误停止重连
},
});推荐策略:如果不想手写 SSE 解析,用
@microsoft/fetch-event-source(微软出品)。它结合了 EventSource 的简洁和 Fetch 的灵活——支持 POST、自定义 Header、自动重连,API 和原生 EventSource 几乎一样。
6.3 流式 Markdown 渲染的难点与方案
AI 的回复通常包含 Markdown(标题、列表、代码块、表格)。流式输出时,Markdown 是残缺的——你拿到的是一个不断增长的半成品。如何把它渲染得不闪不跳?
核心难题
流式 Markdown 的三大难题:
难题 1:不完整的语法
Token 序列: "## " → "标题" → "\n\n" → "正文"
第 1 步收到: "## " → 该渲染成 h2 吗?标题文字还没来
第 2 步收到: "## 标题" → 现在显示 h2
第 3 步收到: "## 标题\n\n" → 确认完整了
→ 如果每步都重新解析,标题会"闪烁"
难题 2:代码块状态
Token 序列: "```" → "python" → "\n" → "print" → "(" → ...
收到 "```": → 这是代码块的开始?还是行内代码?
收到 "```python\n": → 确认是代码块
还没收到结尾 "```": → 代码块一直"开着"
→ 后续所有文本都被吞进代码块里
难题 3:频繁重渲染
每秒可能收到 50-100 个 Token
每个 Token 都触发一次:Markdown 解析 → DOM 更新
→ 性能下降、页面闪烁、用户体验差方案 1:直接追加纯文本(最简单)
// 最简方案:不解析 Markdown,直接推纯文本
// 适合原型开发、快速验证
let content = '';
function onToken(token) {
content += token;
document.getElementById('output').textContent = content;
}
// 优点:极简、无闪烁
// 缺点:没有 Markdown 渲染(代码块、列表全是纯文本)方案 2:react-markdown + 防抖(推荐)
// StreamingMarkdown.jsx —— React 流式 Markdown 渲染
import { useState, useRef, useCallback } from 'react';
import ReactMarkdown from 'react-markdown';
import remarkGfm from 'remark-gfm';
import { Prism as SyntaxHighlighter } from 'react-syntax-highlighter';
import { oneDark } from 'react-syntax-highlighter/dist/esm/styles/prism';
function StreamingMarkdown() {
const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const bufferRef = useRef(''); // 累积缓冲
const rafRef = useRef(null); // requestAnimationFrame ID
const flushBuffer = useCallback(() => {
// 用 requestAnimationFrame 批量更新
// → 每帧只渲染一次,无论这一帧收到多少个 Token
setContent(bufferRef.current);
rafRef.current = null;
}, []);
const onToken = useCallback((token) => {
bufferRef.current += token;
// 如果还没安排渲染,安排一次
if (!rafRef.current) {
rafRef.current = requestAnimationFrame(flushBuffer);
}
}, [flushBuffer]);
return (
<div className="markdown-body">
<ReactMarkdown
remarkPlugins={[remarkGfm]}
components={{
// 自定义代码块渲染(带语法高亮)
code({ node, inline, className, children, ...props }) {
const match = /language-(\w+)/.exec(className || '');
return !inline && match ? (
<SyntaxHighlighter
style={oneDark}
language={match[1]}
PreTag="div"
>
{String(children).replace(/\n$/, '')}
</SyntaxHighlighter>
) : (
<code className={className} {...props}>
{children}
</code>
);
},
}}
>
{content}
</ReactMarkdown>
</div>
);
}方案 3:增量解析(高性能场景)
高性能方案:只解析变化的部分
传统方案(每次全量解析):
Token 1:解析 "你" → 生成完整 AST → 渲染
Token 2:解析 "你好" → 生成完整 AST → 渲染
Token 3:解析 "你好!" → 生成完整 AST → 渲染
→ 每次都重新解析全部文本,O(n²) 复杂度
增量方案:
Token 1:追加 "你" 到最后一个文本节点 → 只更新一个 DOM 节点
Token 2:追加 "好" 到最后一个文本节点 → 只更新一个 DOM 节点
Token 3:追加 "!" 到最后一个文本节点 → 只更新一个 DOM 节点
遇到 "\n\n":重新解析最后一个块 → 可能变成新段落
遇到 "```":进入代码块模式 → 特殊处理
→ 大部分 Token 只需 O(1) 操作
推荐库:
→ marked(轻量级 Markdown 解析器,支持流式)
→ markdown-it(更完整,可自定义插件)requestAnimationFrame 批量更新的原理
为什么用 requestAnimationFrame?
问题:
100 tokens/s × 每个 Token 触发一次 setState
= 每秒 100 次 React 重渲染
= 页面卡死
解决:requestAnimationFrame 批量合并
时间线:
0ms Token "你" → 追加到 buffer, 安排 rAF
5ms Token "好" → 追加到 buffer(rAF 已安排,跳过)
10ms Token "!" → 追加到 buffer(跳过)
16ms rAF 触发 → 一次性渲染 "你好!" ← 3 个 Token 合并为 1 次渲染
20ms Token "今" → 追加到 buffer, 安排新 rAF
...
效果:
→ 无论 Token 速度多快,每帧最多渲染一次
→ 60fps 显示器 = 每秒最多 60 次渲染 = 流畅不卡
→ 用户感知不到合并(16ms 内的差异肉眼不可见)实战建议:中小项目用方案 2(react-markdown + rAF 批量更新)即可。只有在内容超过 1 万字、需要实时语法高亮的场景下,才值得投入精力做增量解析。
6.4 React 中的流式状态管理
实际的 AI 对话界面要管理的状态远不止"一段文本"——多轮对话、加载状态、中断控制、自动滚动……这一节给出一个生产可用的完整方案。
useStreamChat:封装一切的自定义 Hook
// useStreamChat.js —— 流式对话的自定义 Hook
import { useState, useRef, useCallback } from 'react';
export function useStreamChat() {
const [messages, setMessages] = useState([]); // 完整对话历史
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef(null);
const bufferRef = useRef('');
const sendMessage = useCallback(async (prompt) => {
// 1. 添加用户消息
const userMsg = { role: 'user', content: prompt };
const assistantMsg = { role: 'assistant', content: '' };
setMessages(prev => [...prev, userMsg, assistantMsg]);
setIsStreaming(true);
bufferRef.current = '';
// 2. 发起流式请求
const controller = new AbortController();
abortRef.current = controller;
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getToken()}`,
},
body: JSON.stringify({ message: prompt }),
signal: controller.signal,
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') break;
try {
const parsed = JSON.parse(data);
if (parsed.content) {
bufferRef.current += parsed.content;
// 更新最后一条 assistant 消息
setMessages(prev => {
const updated = [...prev];
updated[updated.length - 1] = {
...updated[updated.length - 1],
content: bufferRef.current,
};
return updated;
});
}
} catch {}
}
}
} catch (err) {
if (err.name !== 'AbortError') {
// 真正的错误:更新最后一条消息为错误信息
setMessages(prev => {
const updated = [...prev];
updated[updated.length - 1] = {
...updated[updated.length - 1],
content: bufferRef.current || '生成失败,请重试。',
error: true,
};
return updated;
});
}
} finally {
setIsStreaming(false);
abortRef.current = null;
}
}, []);
const stopGeneration = useCallback(() => {
abortRef.current?.abort();
}, []);
return { messages, isStreaming, sendMessage, stopGeneration };
}
function getToken() { return localStorage.getItem('token') || ''; }使用这个 Hook 构建聊天界面
// ChatPage.jsx —— 完整的流式聊天页面
import { useRef, useEffect } from 'react';
import ReactMarkdown from 'react-markdown';
import { useStreamChat } from './useStreamChat';
function ChatPage() {
const { messages, isStreaming, sendMessage, stopGeneration } = useStreamChat();
const bottomRef = useRef(null);
const inputRef = useRef(null);
// 自动滚动到底部
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
const handleSubmit = (e) => {
e.preventDefault();
const prompt = inputRef.current.value.trim();
if (!prompt || isStreaming) return;
inputRef.current.value = '';
sendMessage(prompt);
};
return (
<div className="chat-container">
{/* 消息列表 */}
<div className="messages">
{messages.map((msg, i) => (
<div key={i} className={`message ${msg.role}`}>
{msg.role === 'assistant' ? (
<ReactMarkdown>{msg.content}</ReactMarkdown>
) : (
<p>{msg.content}</p>
)}
{/* 流式输出时的光标动画 */}
{msg.role === 'assistant' && isStreaming && i === messages.length - 1 && (
<span className="cursor-blink">▊</span>
)}
</div>
))}
<div ref={bottomRef} />
</div>
{/* 输入框 */}
<form onSubmit={handleSubmit} className="input-area">
<input ref={inputRef} placeholder="输入消息..." disabled={isStreaming} />
{isStreaming ? (
<button type="button" onClick={stopGeneration}>停止</button>
) : (
<button type="submit">发送</button>
)}
</form>
</div>
);
}光标闪烁动画
/* 模拟打字光标效果 */
.cursor-blink {
display: inline-block;
animation: blink 1s step-end infinite;
color: #666;
margin-left: 2px;
}
@keyframes blink {
0%, 100% { opacity: 1; }
50% { opacity: 0; }
}自动滚动的细节
自动滚动的用户体验细节:
简单版(上面代码已实现):
→ 每次消息更新都滚动到底部
→ 问题:用户往上翻看历史时,也会被强制拉回底部
进阶版(ChatGPT 的做法):
→ 用户在底部附近(100px 以内)→ 自动滚动
→ 用户向上滚动了 → 不自动滚动 + 显示"↓ 回到最新"按钮
实现要点:
const isNearBottom = () => {
const { scrollTop, scrollHeight, clientHeight } = containerRef.current;
return scrollHeight - scrollTop - clientHeight < 100;
};
useEffect(() => {
if (isNearBottom()) {
bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
}
}, [messages]);完整方案回顾:
useStreamChat这个 Hook 封装了流式对话的所有核心逻辑——多轮消息管理、SSE 解析、流式状态更新、中断控制、错误处理。业务组件只需要调用sendMessage()和stopGeneration(),完全不用关心流式通信的细节。
本章小结
| 知识点 | 要点 |
|---|---|
| Fetch + ReadableStream | 手动解析 SSE,支持 POST 和自定义 Header |
| 缓冲区机制 | chunk 不按行对齐,必须用 buffer 处理碎片 |
| AbortController | 实现"停止生成"功能 |
| EventSource | 原生简单但限制多,推荐 @microsoft/fetch-event-source |
| Markdown 渲染 | react-markdown + rAF 批量更新解决闪烁 |
| rAF 批量更新 | 每帧最多渲染一次,100 tok/s 也不卡 |
| useStreamChat | 封装多轮对话、中断、错误处理的自定义 Hook |
| 自动滚动 | 底部附近才自动滚,用户上翻时不强制拉回 |
下一章预告:生产级优化 —— Thinking/Reasoning 模式的流式处理、Function Calling 的流式适配、Token 计费监控、以及面向企业的安全审计方案。