Skip to content

AI 应用的流式输出全链路

从 LLM 的第一个 Token 到用户屏幕上的逐字渲染——拆解流式输出的每一层。


3. 流式传输协议——SSE、WebSocket、gRPC

上一章我们知道 LLM 是逐 Token 生成的。但 Token 生成在 GPU 上,用户坐在浏览器前——中间隔着网络。用什么协议把 Token 实时推送给客户端?

这一章拆解三种主流方案:SSE、WebSocket、gRPC,搞清楚各自的原理、代码实现和适用场景。


3.1 SSE:最简单的服务器推送

什么是 SSE

SSE(Server-Sent Events)是 HTML5 规范的一部分,专门用于服务器向客户端单向推送数据

SSE 的通信模型:

  Client                          Server
    │                               │
    │   GET /stream                 │
    │   Accept: text/event-stream   │
    │ ─────────────────────────────▶│
    │                               │
    │   200 OK                      │
    │   Content-Type: text/event-stream
    │ ◀═════════════════════════════│
    │   data: 你                    │
    │ ◀═════════════════════════════│
    │   data: 好                    │
    │ ◀═════════════════════════════│
    │   data: 世                    │
    │ ◀═════════════════════════════│
    │   data: 界                    │
    │ ◀═════════════════════════════│
    │   data: [DONE]                │
    │ ◀═════════════════════════════│
    │                               │

  特点:
    • 基于普通 HTTP(GET 请求)
    • 服务器持续推送,客户端只接收
    • 单向通信:Server → Client
    • 天然支持断线自动重连

SSE 协议格式

SSE 的数据格式极其简单——纯文本,字段用换行分隔:

SSE 数据格式(每条消息):

  event: message_type    ← 可选,事件类型
  id: 123                ← 可选,消息 ID(用于断线重连)
  retry: 3000            ← 可选,重连间隔(毫秒)
  data: 实际数据内容      ← 必填,数据
  data: 可以多行          ← 多个 data 字段会用 \n 拼接
                         ← 空行表示一条消息结束

  示例(OpenAI 的流式输出格式):

  data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"}}]}

  data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"}}]}

  data: [DONE]

Python 服务端实现(FastAPI)

python
# sse_server.py —— 用 FastAPI 实现 SSE 端点
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def token_generator(prompt: str):
    """模拟 LLM 逐 Token 输出"""
    tokens = ["你好", "!", "今天", "天气", "不错", ",", "适合", "写代码", "。"]
    for token in tokens:
        # 模拟 Token 生成延迟(真实场景来自 LLM)
        await asyncio.sleep(0.1)
        # SSE 格式:data: {JSON}\n\n
        data = json.dumps({"token": token}, ensure_ascii=False)
        yield f"data: {data}\n\n"
    # 发送结束标记
    yield "data: [DONE]\n\n"

@app.get("/stream")
async def stream_chat(prompt: str = "你好"):
    return StreamingResponse(
        token_generator(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

JavaScript 客户端实现

javascript
// sse_client.js —— 浏览器端接收 SSE
const eventSource = new EventSource("/stream?prompt=你好");

eventSource.onmessage = (event) => {
  if (event.data === "[DONE]") {
    eventSource.close();
    console.log("流式输出完成");
    return;
  }

  const { token } = JSON.parse(event.data);
  // 追加到页面上
  document.getElementById("output").textContent += token;
};

eventSource.onerror = (error) => {
  console.error("SSE 连接出错:", error);
  eventSource.close();
};
SSE 的优缺点总结:

  ✅ 优点:
    • 实现极简——基于普通 HTTP,无需额外协议
    • 浏览器原生支持 EventSource API
    • 自动断线重连(Last-Event-Id 机制)
    • 穿越防火墙和代理无压力(就是普通 HTTP)
    • OpenAI / Anthropic 等所有主流 API 的选择

  ❌ 缺点:
    • 单向通信——客户端不能通过同一连接发消息
    • 只支持文本数据(不能传二进制)
    • 浏览器 EventSource API 不支持自定义 Header
      → 不能在 Header 中传 Bearer Token
      → 需要用 Fetch API 手动解析(第 6 章详解)
    • 并发连接数受浏览器限制(HTTP/1.1 每域名 6 个)

为什么 AI 应用首选 SSE? 因为 LLM 流式输出就是"服务器单向推送文本"——完美匹配 SSE 的设计。不需要双向通信,不需要二进制,简单就是最大的优势。

3.2 WebSocket:双向实时通信

SSE 是单向的——服务器推、客户端收。如果你需要双向实时通信(比如客户端随时发中断指令),WebSocket 是更强大的选择。

WebSocket 通信模型

WebSocket 的通信流程:

  Client                          Server
    │                               │
    │   GET /ws (HTTP Upgrade)      │
    │   Upgrade: websocket          │
    │ ─────────────────────────────▶│
    │                               │
    │   101 Switching Protocols     │
    │ ◀─────────────────────────────│
    │                               │
    │ ══════ WebSocket 连接建立 ════│  ← 不再是 HTTP!
    │                               │
    │   {"action":"chat",           │
    │    "message":"你好"}           │
    │ ═════════════════════════════▶│  ← 客户端发消息
    │                               │
    │   {"token":"你"}              │
    │ ◀═════════════════════════════│  ← 服务器推 Token
    │   {"token":"好"}              │
    │ ◀═════════════════════════════│
    │                               │
    │   {"action":"stop"}           │
    │ ═════════════════════════════▶│  ← 客户端发中断!
    │                               │
    │   {"status":"stopped"}        │
    │ ◀═════════════════════════════│
    │                               │

  vs SSE 的核心区别:
    SSE:       Server → Client(单向)
    WebSocket: Server ⇄ Client(双向)

Python 服务端实现(FastAPI)

python
# ws_server.py —— 用 FastAPI 实现 WebSocket 流式输出
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            # 接收客户端消息
            data = await websocket.receive_json()

            if data.get("action") == "chat":
                prompt = data.get("message", "")
                # 模拟逐 Token 生成
                tokens = ["你好", "!", "我是", "AI", "助手", "。"]

                for token in tokens:
                    await asyncio.sleep(0.1)
                    await websocket.send_json({"type": "token", "content": token})

                # 发送完成标记
                await websocket.send_json({"type": "done"})

            elif data.get("action") == "stop":
                # 客户端请求中断生成
                await websocket.send_json({"type": "stopped"})

    except WebSocketDisconnect:
        print("客户端断开连接")

JavaScript 客户端实现

javascript
// ws_client.js —— 浏览器端 WebSocket 通信
const ws = new WebSocket("ws://localhost:8000/ws/chat");

ws.onopen = () => {
  // 发送聊天消息
  ws.send(JSON.stringify({
    action: "chat",
    message: "你好,请介绍一下自己"
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case "token":
      document.getElementById("output").textContent += data.content;
      break;
    case "done":
      console.log("生成完成");
      break;
    case "stopped":
      console.log("已中断生成");
      break;
  }
};

// 用户点击"停止"按钮
document.getElementById("stopBtn").onclick = () => {
  ws.send(JSON.stringify({ action: "stop" }));
};
WebSocket 的优缺点总结:

  ✅ 优点:
    • 全双工——客户端可以随时发消息(中断、追问)
    • 支持二进制数据(图片、音频流)
    • 延迟更低(无需每次 HTTP 握手)
    • 无并发连接数限制(单连接即可)

  ❌ 缺点:
    • 实现更复杂——需要管理连接状态、心跳、重连
    • 不支持 HTTP 中间件(认证、日志、限流更麻烦)
    • 某些企业代理/防火墙会阻断 WebSocket
    • 没有自动重连(需要自己实现)
    • 对于"单向推送"场景,过度设计了

AI 应用中 WebSocket 的适用场景:当你需要"边生成边让用户中断"、或者一个连接内处理多轮连续对话、或者传输音频/图片等二进制数据时,WebSocket 比 SSE 更合适。

3.3 gRPC Streaming:高性能 RPC 流

gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers。它原生支持四种流模式,在微服务和内部通信场景下表现出色。

gRPC 的四种通信模式

gRPC 四种模式(与流式输出的关系):

  1. Unary(一元调用)
     Client ──请求──▶ Server ──响应──▶ Client
     → 普通的 RPC 调用,非流式

  2. Server Streaming(服务端流)⭐ AI 流式输出用这个
     Client ──请求──▶ Server ══多条响应══▶ Client
     → 类似 SSE:服务器持续推送
     → 最适合 LLM 流式 Token 输出

  3. Client Streaming(客户端流)
     Client ══多条请求══▶ Server ──响应──▶ Client
     → 客户端分批发送数据(如音频流)

  4. Bidirectional Streaming(双向流)
     Client ══多条══▶ Server
     Client ◀══多条══ Server
     → 类似 WebSocket:双向流

Proto 定义

protobuf
// chat.proto —— 定义流式聊天服务
syntax = "proto3";

service ChatService {
  // Server Streaming:客户端发一条消息,服务端流式返回 Token
  rpc StreamChat(ChatRequest) returns (stream ChatToken);
}

message ChatRequest {
  string prompt = 1;
  float temperature = 2;
}

message ChatToken {
  string content = 1;      // Token 文本
  bool is_done = 2;        // 是否结束
  int32 token_index = 3;   // Token 序号
}

Python 服务端实现

python
# grpc_server.py —— gRPC 流式聊天服务
import grpc
from concurrent import futures
import time
import chat_pb2
import chat_pb2_grpc

class ChatServicer(chat_pb2_grpc.ChatServiceServicer):
    def StreamChat(self, request, context):
        """Server Streaming:逐 Token 推送"""
        tokens = ["你好", "!", "我是", "AI", "助手", "。"]

        for i, token in enumerate(tokens):
            time.sleep(0.1)  # 模拟生成延迟
            yield chat_pb2.ChatToken(
                content=token,
                is_done=False,
                token_index=i,
            )

        # 发送结束标记
        yield chat_pb2.ChatToken(content="", is_done=True, token_index=len(tokens))

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
chat_pb2_grpc.add_ChatServiceServicer_to_server(ChatServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()

Python 客户端实现

python
# grpc_client.py —— 接收 gRPC 流式输出
import grpc
import chat_pb2
import chat_pb2_grpc

channel = grpc.insecure_channel("localhost:50051")
stub = chat_pb2_grpc.ChatServiceStub(channel)

request = chat_pb2.ChatRequest(prompt="你好", temperature=0.7)

# 接收流式响应
for token in stub.StreamChat(request):
    if token.is_done:
        print("\n生成完成")
        break
    print(token.content, end="", flush=True)
gRPC Streaming 的优缺点:

  ✅ 优点:
    • 性能极高——Protobuf 二进制序列化比 JSON 快 5-10x
    • HTTP/2 多路复用——单连接多流
    • 强类型——Proto 文件定义接口,编译时检查
    • 双向流支持——比 WebSocket 更结构化
    • 适合微服务间通信

  ❌ 缺点:
    • 浏览器不能直接调用(需要 gRPC-Web 代理)
    • 实现复杂度高(Proto 编译、代码生成)
    • 调试不方便(二进制协议,不能用 curl)
    • 对于简单的 AI 应用场景,杀鸡用牛刀

  适用场景:
    → 内部微服务之间的流式通信(推理服务 → 后端网关)
    → vLLM / TGI 等推理框架的内部通信
    → 不适合直接面向浏览器客户端

AI 应用中的角色:gRPC 通常出现在后端内部——推理服务和 API 网关之间。面向前端用户时,通常会在网关层转成 SSE 或 WebSocket。

3.4 三者对比与选型指南

三种方案都讲完了。该选哪个?

全维度对比

维度SSEWebSocketgRPC Streaming
方向单向(Server→Client)双向双向
协议HTTP/1.1独立协议(ws://)HTTP/2
数据格式纯文本文本 / 二进制Protobuf(二进制)
浏览器支持✅ 原生 EventSource✅ 原生 WebSocket❌ 需要 gRPC-Web
自动重连✅ 内置❌ 需手动实现❌ 需手动实现
Header 自定义❌ EventSource 不支持✅ 握手时可带✅ metadata
代理穿越✅ 普通 HTTP⚠️ 可能被阻断⚠️ 需要 HTTP/2 支持
实现复杂度⭐ 最简单⭐⭐ 中等⭐⭐⭐ 最复杂
性能⭐⭐ 够用⭐⭐⭐ 好⭐⭐⭐⭐⭐ 最高
调试✅ curl 直接看⚠️ 需要工具❌ 二进制,难看

AI 应用的选型决策树

你的 AI 应用该选哪个协议?

  你的客户端是什么?

  ├── 浏览器 / 移动端 Web
  │   │
  │   ├── 只需要接收 AI 的回复?(最常见的场景)
  │   │   └── ✅ SSE(首选)
  │   │       理由:最简单、兼容性最好、所有 LLM API 都用这个
  │   │
  │   ├── 需要双向通信?(边生成边中断、语音对话)
  │   │   └── ✅ WebSocket
  │   │       理由:全双工、支持二进制
  │   │
  │   └── 需要极致性能?(高频交互、大量并发)
  │       └── ✅ WebSocket
  │           理由:无 HTTP 开销、单连接复用

  ├── 后端微服务之间
  │   └── ✅ gRPC Streaming
  │       理由:高性能、强类型、双向流

  └── Python / Node.js 脚本
      └── ✅ SSE(简单场景) or gRPC(高性能场景)

典型 AI 应用架构中三者的位置

完整架构中的协议分层:

  ┌───────────┐    SSE / WebSocket    ┌──────────┐
  │  浏览器    │ ◀════════════════════ │  API     │
  │  (React)  │                      │  Gateway │
  └───────────┘                      │ (FastAPI)│
                                     └────┬─────┘

                                     gRPC Streaming

                                     ┌────▼─────┐
                                     │  推理服务  │
                                     │  (vLLM)  │
                                     └──────────┘

  三层协议各司其职:
    浏览器 ⇄ API Gateway:SSE(简单、兼容)或 WebSocket(需要双向)
    API Gateway ⇄ 推理服务:gRPC(高性能、内部通信)

  实际上,大多数 AI 应用的架构更简单:
    浏览器 ──SSE──▶ FastAPI ──HTTP──▶ OpenAI API(也返回 SSE)
    → 全链路都是 SSE,最简单的方案

经验法则:除非你有明确的理由用 WebSocket 或 gRPC,否则默认选 SSE。它是 OpenAI、Anthropic、Google 等所有主流 LLM 提供商的选择,也是最简单的方案。简单意味着更少的 bug、更低的维护成本。


本章小结

知识点要点
SSE单向推送、基于 HTTP、自动重连、AI 应用首选
WebSocket双向通信、支持二进制、适合需要中断/语音场景
gRPC高性能、强类型、适合后端微服务间通信
SSE 格式data: {JSON}\n\n,以空行分隔消息
浏览器限制EventSource 不支持自定义 Header(需 Fetch 替代)
选型原则默认 SSE、需要双向用 WebSocket、内部通信用 gRPC
典型架构浏览器←SSE→网关←gRPC→推理服务

下一章预告:LLM 提供商的流式 API —— 实际调用 OpenAI、Anthropic、Ollama 的流式接口,看看每个 chunk 里到底长什么样,以及如何统一封装多个提供商。

坚持是一种品格