Skip to content

AI 应用的流式输出全链路

从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。


6. 前端流式渲染——从字节流到用户看到的文字

后端已经以 SSE 格式把 Token 推过来了。现在轮到前端:如何接收这些事件,并实时渲染到页面上?

这一章解决"最后一公里"——从浏览器收到字节流,到用户看到逐字呈现的文字,中间的每一步该怎么写。


6.1 Fetch + ReadableStream:手动解析 SSE

第 3 章提过,浏览器原生的 EventSource API 有两个硬伤:不支持 POST 请求、不支持自定义 Header。AI 应用通常需要两者(POST 发消息体、Header 带 Bearer Token),所以实战中更常用 Fetch API + ReadableStream 手动解析 SSE。

基础实现

javascript
// fetchSSE.js —— 用 Fetch API 手动解析 SSE 流
async function fetchSSE(prompt, onToken, onDone, onError) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer user-token-xxx',  // ← EventSource 做不到
    },
    body: JSON.stringify({ message: prompt }),
  });

  if (!response.ok) {
    onError(new Error(`HTTP ${response.status}`));
    return;
  }

  // 拿到 ReadableStream
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';  // 缓冲区:处理不完整的 SSE 行

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // 字节 → 文本
    buffer += decoder.decode(value, { stream: true });

    // 按行解析 SSE(以 \n\n 分隔事件)
    const lines = buffer.split('\n');
    buffer = lines.pop();  // 最后一行可能不完整,放回缓冲区

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);  // 去掉 "data: " 前缀

        if (data === '[DONE]') {
          onDone();
          return;
        }

        try {
          const parsed = JSON.parse(data);
          if (parsed.error) {
            onError(new Error(parsed.error));
          } else if (parsed.content) {
            onToken(parsed.content);  // 逐 Token 回调
          }
        } catch (e) {
          // JSON 解析失败,忽略这行
        }
      }
    }
  }
}

使用示例

javascript
// 使用方式
fetchSSE(
  '你好,请介绍一下自己',
  (token) => {
    // 每收到一个 Token 就追加到页面
    document.getElementById('output').textContent += token;
  },
  () => {
    console.log('流式输出完成');
  },
  (error) => {
    console.error('出错了:', error.message);
  }
);

关键细节:缓冲区为什么必要

ReadableStream 的 chunk 不保证按行对齐!

  理想情况(每个 chunk 都是完整的 SSE 事件):
    chunk 1: "data: {\"content\":\"你\"}\n\n"
    chunk 2: "data: {\"content\":\"好\"}\n\n"
    → 每次刚好一个完整事件,直接解析

  实际情况(chunk 在任意位置断开):
    chunk 1: "data: {\"content\":\"你\"}\n\ndata: {\"con"
    chunk 2: "tent\":\"好\"}\n\n"
    → chunk 1 包含了 1.5 个事件
    → chunk 2 是前一个事件的后半部分
    → 不做缓冲 → JSON.parse 报错

  解决方案(上面代码已实现):
    1. 所有 chunk 先追加到 buffer
    2. 按 \n 分割
    3. 最后一个(可能不完整的)放回 buffer
    4. 完整的行才解析

支持中断(AbortController)

javascript
// 支持用户点击"停止生成"
function fetchSSEWithAbort(prompt, callbacks) {
  const controller = new AbortController();

  fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message: prompt }),
    signal: controller.signal,  // ← 关联 AbortController
  })
  .then(async (response) => {
    const reader = response.body.getReader();
    // ... 解析逻辑同上
  })
  .catch((err) => {
    if (err.name === 'AbortError') {
      console.log('用户主动中断');
    } else {
      callbacks.onError(err);
    }
  });

  // 返回中断函数
  return () => controller.abort();
}

// 使用
const stop = fetchSSEWithAbort('你好', { onToken, onDone, onError });
// 用户点击"停止"按钮
document.getElementById('stopBtn').onclick = stop;

实战首选:Fetch + ReadableStream 是目前 AI 应用前端对接流式输出的主流方案。ChatGPT 网页版、Claude 网页版都是这个路子。

6.2 EventSource API:浏览器原生方案

虽然 Fetch + ReadableStream 是主流,但 EventSource 仍有其价值——简单到不需要写解析逻辑

基本用法

javascript
// eventSource.js —— 浏览器原生 SSE 接收
// ⚠️ 只能 GET 请求,不能自定义 Header

const eventSource = new EventSource('/api/stream?prompt=你好');

// 收到消息
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const { content } = JSON.parse(event.data);
  document.getElementById('output').textContent += content;
};

// 自动重连(EventSource 的杀手特性)
// → 断线后浏览器会自动重连,并带上 Last-Event-Id
// → 服务器可以据此恢复推送

// 错误处理
eventSource.onerror = (err) => {
  if (eventSource.readyState === EventSource.CLOSED) {
    console.log('连接已关闭');
  } else {
    console.log('连接出错,正在自动重连...');
  }
};

EventSource 的局限性

EventSource vs Fetch 对比:

  特性                 EventSource           Fetch + ReadableStream
  ──────────────────────────────────────────────────────────────────
  请求方法              GET only ❌            任意(POST ✅)
  自定义 Header         不支持 ❌              支持 ✅
  请求体(body)         不支持 ❌              支持 ✅
  自动重连              内置 ✅                需手动实现
  SSE 解析              自动 ✅                需手动解析
  中断控制              close()                AbortController
  浏览器兼容            IE 不支持              现代浏览器均支持

  结论:
    → 简单的公开接口(无需认证)→ EventSource
    → 需要 POST + Token 认证 → Fetch API(AI 应用几乎都是这种)

用第三方库弥补 EventSource 的缺陷

javascript
// 使用 @microsoft/fetch-event-source 库
// npm install @microsoft/fetch-event-source
import { fetchEventSource } from '@microsoft/fetch-event-source';

await fetchEventSource('/api/chat', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer token',    // ← 支持自定义 Header!
  },
  body: JSON.stringify({ message: '你好' }),

  onmessage(event) {
    if (event.data === '[DONE]') return;
    const { content } = JSON.parse(event.data);
    document.getElementById('output').textContent += content;
  },

  onerror(err) {
    console.error('SSE Error:', err);
    throw err;  // 抛出错误停止重连
  },
});

推荐策略:如果不想手写 SSE 解析,用 @microsoft/fetch-event-source(微软出品)。它结合了 EventSource 的简洁和 Fetch 的灵活——支持 POST、自定义 Header、自动重连,API 和原生 EventSource 几乎一样。

6.3 流式 Markdown 渲染的难点与方案

AI 的回复通常包含 Markdown(标题、列表、代码块、表格)。流式输出时,Markdown 是残缺的——你拿到的是一个不断增长的半成品。如何把它渲染得不闪不跳?

核心难题

流式 Markdown 的三大难题:

  难题 1:不完整的语法

    Token 序列:   "## " → "标题" → "\n\n" → "正文"
    第 1 步收到:  "## "          → 该渲染成 h2 吗?标题文字还没来
    第 2 步收到:  "## 标题"      → 现在显示 h2
    第 3 步收到:  "## 标题\n\n"  → 确认完整了
    → 如果每步都重新解析,标题会"闪烁"

  难题 2:代码块状态

    Token 序列:   "```" → "python" → "\n" → "print" → "(" → ...
    收到 "```":         → 这是代码块的开始?还是行内代码?
    收到 "```python\n":  → 确认是代码块
    还没收到结尾 "```":  → 代码块一直"开着"
    → 后续所有文本都被吞进代码块里

  难题 3:频繁重渲染

    每秒可能收到 50-100 个 Token
    每个 Token 都触发一次:Markdown 解析 → DOM 更新
    → 性能下降、页面闪烁、用户体验差

方案 1:直接追加纯文本(最简单)

javascript
// 最简方案:不解析 Markdown,直接推纯文本
// 适合原型开发、快速验证

let content = '';

function onToken(token) {
  content += token;
  document.getElementById('output').textContent = content;
}

// 优点:极简、无闪烁
// 缺点:没有 Markdown 渲染(代码块、列表全是纯文本)

方案 2:react-markdown + 防抖(推荐)

jsx
// StreamingMarkdown.jsx —— React 流式 Markdown 渲染
import { useState, useRef, useCallback } from 'react';
import ReactMarkdown from 'react-markdown';
import remarkGfm from 'remark-gfm';
import { Prism as SyntaxHighlighter } from 'react-syntax-highlighter';
import { oneDark } from 'react-syntax-highlighter/dist/esm/styles/prism';

function StreamingMarkdown() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const bufferRef = useRef('');      // 累积缓冲
  const rafRef = useRef(null);       // requestAnimationFrame ID

  const flushBuffer = useCallback(() => {
    // 用 requestAnimationFrame 批量更新
    // → 每帧只渲染一次,无论这一帧收到多少个 Token
    setContent(bufferRef.current);
    rafRef.current = null;
  }, []);

  const onToken = useCallback((token) => {
    bufferRef.current += token;

    // 如果还没安排渲染,安排一次
    if (!rafRef.current) {
      rafRef.current = requestAnimationFrame(flushBuffer);
    }
  }, [flushBuffer]);

  return (
    <div className="markdown-body">
      <ReactMarkdown
        remarkPlugins={[remarkGfm]}
        components=&#123;&#123;
          // 自定义代码块渲染(带语法高亮)
          code({ node, inline, className, children, ...props }) {
            const match = /language-(\w+)/.exec(className || '');
            return !inline && match ? (
              <SyntaxHighlighter
                style={oneDark}
                language={match[1]}
                PreTag="div"
              >
                {String(children).replace(/\n$/, '')}
              </SyntaxHighlighter>
            ) : (
              <code className={className} {...props}>
                {children}
              </code>
            );
          },
        &#125;&#125;
      >
        {content}
      </ReactMarkdown>
    </div>
  );
}

方案 3:增量解析(高性能场景)

高性能方案:只解析变化的部分

  传统方案(每次全量解析):
    Token 1:解析 "你"         → 生成完整 AST → 渲染
    Token 2:解析 "你好"       → 生成完整 AST → 渲染
    Token 3:解析 "你好!"     → 生成完整 AST → 渲染
    → 每次都重新解析全部文本,O(n²) 复杂度

  增量方案:
    Token 1:追加 "你" 到最后一个文本节点   → 只更新一个 DOM 节点
    Token 2:追加 "好" 到最后一个文本节点   → 只更新一个 DOM 节点
    Token 3:追加 "!" 到最后一个文本节点   → 只更新一个 DOM 节点
    遇到 "\n\n":重新解析最后一个块         → 可能变成新段落
    遇到 "```":进入代码块模式              → 特殊处理
    → 大部分 Token 只需 O(1) 操作

  推荐库:
    → marked(轻量级 Markdown 解析器,支持流式)
    → markdown-it(更完整,可自定义插件)

requestAnimationFrame 批量更新的原理

为什么用 requestAnimationFrame?

  问题:
    100 tokens/s × 每个 Token 触发一次 setState
    = 每秒 100 次 React 重渲染
    = 页面卡死

  解决:requestAnimationFrame 批量合并

    时间线:
    0ms   Token "你"  → 追加到 buffer, 安排 rAF
    5ms   Token "好"  → 追加到 buffer(rAF 已安排,跳过)
    10ms  Token "!"  → 追加到 buffer(跳过)
    16ms  rAF 触发    → 一次性渲染 "你好!"  ← 3 个 Token 合并为 1 次渲染
    20ms  Token "今"  → 追加到 buffer, 安排新 rAF
    ...

  效果:
    → 无论 Token 速度多快,每帧最多渲染一次
    → 60fps 显示器 = 每秒最多 60 次渲染 = 流畅不卡
    → 用户感知不到合并(16ms 内的差异肉眼不可见)

实战建议:中小项目用方案 2(react-markdown + rAF 批量更新)即可。只有在内容超过 1 万字、需要实时语法高亮的场景下,才值得投入精力做增量解析。

6.4 React 中的流式状态管理

实际的 AI 对话界面要管理的状态远不止"一段文本"——多轮对话、加载状态、中断控制、自动滚动……这一节给出一个生产可用的完整方案

useStreamChat:封装一切的自定义 Hook

jsx
// useStreamChat.js —— 流式对话的自定义 Hook
import { useState, useRef, useCallback } from 'react';

export function useStreamChat() {
  const [messages, setMessages] = useState([]);    // 完整对话历史
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef(null);
  const bufferRef = useRef('');

  const sendMessage = useCallback(async (prompt) => {
    // 1. 添加用户消息
    const userMsg = { role: 'user', content: prompt };
    const assistantMsg = { role: 'assistant', content: '' };
    setMessages(prev => [...prev, userMsg, assistantMsg]);
    setIsStreaming(true);
    bufferRef.current = '';

    // 2. 发起流式请求
    const controller = new AbortController();
    abortRef.current = controller;

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${getToken()}`,
        },
        body: JSON.stringify({ message: prompt }),
        signal: controller.signal,
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);
          if (data === '[DONE]') break;

          try {
            const parsed = JSON.parse(data);
            if (parsed.content) {
              bufferRef.current += parsed.content;
              // 更新最后一条 assistant 消息
              setMessages(prev => {
                const updated = [...prev];
                updated[updated.length - 1] = {
                  ...updated[updated.length - 1],
                  content: bufferRef.current,
                };
                return updated;
              });
            }
          } catch {}
        }
      }
    } catch (err) {
      if (err.name !== 'AbortError') {
        // 真正的错误:更新最后一条消息为错误信息
        setMessages(prev => {
          const updated = [...prev];
          updated[updated.length - 1] = {
            ...updated[updated.length - 1],
            content: bufferRef.current || '生成失败,请重试。',
            error: true,
          };
          return updated;
        });
      }
    } finally {
      setIsStreaming(false);
      abortRef.current = null;
    }
  }, []);

  const stopGeneration = useCallback(() => {
    abortRef.current?.abort();
  }, []);

  return { messages, isStreaming, sendMessage, stopGeneration };
}

function getToken() { return localStorage.getItem('token') || ''; }

使用这个 Hook 构建聊天界面

jsx
// ChatPage.jsx —— 完整的流式聊天页面
import { useRef, useEffect } from 'react';
import ReactMarkdown from 'react-markdown';
import { useStreamChat } from './useStreamChat';

function ChatPage() {
  const { messages, isStreaming, sendMessage, stopGeneration } = useStreamChat();
  const bottomRef = useRef(null);
  const inputRef = useRef(null);

  // 自动滚动到底部
  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  const handleSubmit = (e) => {
    e.preventDefault();
    const prompt = inputRef.current.value.trim();
    if (!prompt || isStreaming) return;
    inputRef.current.value = '';
    sendMessage(prompt);
  };

  return (
    <div className="chat-container">
      {/* 消息列表 */}
      <div className="messages">
        {messages.map((msg, i) => (
          <div key={i} className={`message ${msg.role}`}>
            {msg.role === 'assistant' ? (
              <ReactMarkdown>{msg.content}</ReactMarkdown>
            ) : (
              <p>{msg.content}</p>
            )}
            {/* 流式输出时的光标动画 */}
            {msg.role === 'assistant' && isStreaming && i === messages.length - 1 && (
              <span className="cursor-blink">▊</span>
            )}
          </div>
        ))}
        <div ref={bottomRef} />
      </div>

      {/* 输入框 */}
      <form onSubmit={handleSubmit} className="input-area">
        <input ref={inputRef} placeholder="输入消息..." disabled={isStreaming} />
        {isStreaming ? (
          <button type="button" onClick={stopGeneration}>停止</button>
        ) : (
          <button type="submit">发送</button>
        )}
      </form>
    </div>
  );
}

光标闪烁动画

css
/* 模拟打字光标效果 */
.cursor-blink {
  display: inline-block;
  animation: blink 1s step-end infinite;
  color: #666;
  margin-left: 2px;
}

@keyframes blink {
  0%, 100% { opacity: 1; }
  50% { opacity: 0; }
}

自动滚动的细节

自动滚动的用户体验细节:

  简单版(上面代码已实现):
    → 每次消息更新都滚动到底部
    → 问题:用户往上翻看历史时,也会被强制拉回底部

  进阶版(ChatGPT 的做法):
    → 用户在底部附近(100px 以内)→ 自动滚动
    → 用户向上滚动了 → 不自动滚动 + 显示"↓ 回到最新"按钮

  实现要点:
    const isNearBottom = () => {
      const { scrollTop, scrollHeight, clientHeight } = containerRef.current;
      return scrollHeight - scrollTop - clientHeight < 100;
    };

    useEffect(() => {
      if (isNearBottom()) {
        bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
      }
    }, [messages]);

完整方案回顾useStreamChat 这个 Hook 封装了流式对话的所有核心逻辑——多轮消息管理、SSE 解析、流式状态更新、中断控制、错误处理。业务组件只需要调用 sendMessage()stopGeneration(),完全不用关心流式通信的细节。


本章小结

知识点要点
Fetch + ReadableStream手动解析 SSE,支持 POST 和自定义 Header
缓冲区机制chunk 不按行对齐,必须用 buffer 处理碎片
AbortController实现"停止生成"功能
EventSource原生简单但限制多,推荐 @microsoft/fetch-event-source
Markdown 渲染react-markdown + rAF 批量更新解决闪烁
rAF 批量更新每帧最多渲染一次,100 tok/s 也不卡
useStreamChat封装多轮对话、中断、错误处理的自定义 Hook
自动滚动底部附近才自动滚,用户上翻时不强制拉回

下一章预告:生产级优化 —— Thinking/Reasoning 模式的流式处理、Function Calling 的流式适配、Token 计费监控、以及面向企业的安全审计方案。

坚持是一种品格