
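Streaming Output in AI Applications: The Full Pipeline

From the LLM's first token to character-by-character rendering on the user's screen: a layer-by-layer breakdown of streaming output.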


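7. Hands-On: Building a Complete Streaming Chat Application

The previous six chapters took streaming output apart layer by layer: token generation, transport protocols, LLM APIs, backend relaying, and frontend rendering. Now it is time to put the pieces together and build a complete, runnable streaming chat application.

The code in this chapter is a minimal but complete full-stack project that can be copied and run as-is once the listed dependencies are installed.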


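7.1 Project Architecture and Technology Choices

Architecture Overview

Architecture of the complete streaming chat application (the minimal code in this chapter leaves out JWT authentication and timeout control):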

  ┌──────────────────────────────────────┐
  │         React + Vite frontend        │
  │                                      │
  │  • Fetch + ReadableStream reads SSE  │
  │  • useStreamChat hook manages state  │
  │  • react-markdown renders Markdown   │
  │  • Auto-scroll + cursor animation    │
  │                                      │
  └──────────────┬───────────────────────┘
                 │ POST /api/chat (SSE)
                 │ Authorization: Bearer xxx
  ┌──────────────┴───────────────────────┐
  │         FastAPI backend              │
  │                                      │
  │  • User authentication (JWT)         │
  │  • Chat history (memory/Redis)       │
  │  • OpenAI streaming call + SSE relay │
  │  • Error handling + timeout control  │
  │                                      │
  └──────────────┬───────────────────────┘
                 │ stream=True
  ┌──────────────┴───────────────────────┐
  │       OpenAI API / Ollama            │
  │                                      │
  │  • GPT-4o (production)               │
  │  • Llama3 (local development)        │
  └──────────────────────────────────────┘

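Technology Choices

Layer               Technology                    Rationale
Frontend framework  React 19 + Vite               Mature ecosystem; Vite starts quickly
SSE client          Fetch + ReadableStream        Supports POST and custom headers (EventSource supports neither)
Markdown            react-markdown + remark-gfm   Supports GFM tables and code blocks
Backend framework   FastAPI                       Native async; StreamingResponse
LLM calls           openai SDK (AsyncOpenAI)      Unified interface; compatible with Ollama
Data storage        In-memory dict (demo only)    Switch to Redis in production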

Directory Structure

streaming-chat/
├── backend/
│   ├── main.py           # FastAPI entry point
│   ├── requirements.txt  # Python dependencies
│   └── .env              # OPENAI_API_KEY
├── frontend/
│   ├── src/
│   │   ├── App.jsx       # Root component
│   │   ├── App.css       # Styles
│   │   ├── hooks/
│   │   │   └── useStreamChat.js  # Streaming communication hook
│   │   ├── components/
│   │   │   └── ChatMessage.jsx   # Message component
│   │   └── main.jsx      # Entry point
│   ├── index.html
│   ├── package.json
│   └── vite.config.js
└── README.md

7.2 Backend: A FastAPI Streaming Chat API

One file holds the entire backend, at about 80 lines excluding blank lines.

python
# backend/main.py: the complete streaming chat backend
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from pydantic import BaseModel
from dotenv import load_dotenv
import json
import os
import uuid

# Load backend/.env (e.g. OPENAI_API_KEY); variables already set in the shell take precedence
load_dotenv()

app = FastAPI(title="Streaming Chat API")

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],  # Vite's default port
    allow_methods=["*"],
    allow_headers=["*"],
)

# LLM client (defaults to a local Ollama server)
client = AsyncOpenAI(
    api_key=os.getenv("OPENAI_API_KEY", "ollama"),
    base_url=os.getenv("OPENAI_BASE_URL", "http://localhost:11434/v1"),
)
MODEL = os.getenv("MODEL_NAME", "llama3:8b")

# Conversation history (in-memory; use Redis in production)
conversations: dict[str, list] = {}

# ═════════════════════════════════
# Request model
# ═════════════════════════════════

class ChatRequest(BaseModel):
    message: str
    conversation_id: str = ""

# ═════════════════════════════════
# Streaming generator
# ═════════════════════════════════

async def generate_sse(request: ChatRequest):
    """Call the LLM and push each token to the client as an SSE event."""

    # Look up the conversation, or start a new one
    conv_id = request.conversation_id or str(uuid.uuid4())
    history = conversations.get(conv_id, [
        {"role": "system", "content": "You are a helpful AI assistant. Please answer in English."}
    ])
    messages = history + [{"role": "user", "content": request.message}]

    # Send the conversation_id first so the frontend can store it
    yield f"data: {json.dumps({'conversation_id': conv_id})}\n\n"

    try:
        stream = await client.chat.completions.create(
            model=MODEL,
            messages=messages,
            stream=True,
            temperature=0.7,
            max_tokens=2048,
        )

        full_response = ""
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                full_response += content
                data = json.dumps({"content": content}, ensure_ascii=False)
                yield f"data: {data}\n\n"

        # Save the conversation history
        conversations[conv_id] = messages + [
            {"role": "assistant", "content": full_response}
        ]

    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

    yield "data: [DONE]\n\n"

# ═════════════════════════════════
# API endpoints
# ═════════════════════════════════

@app.post("/api/chat")
async def chat(request: ChatRequest):
    return StreamingResponse(
        generate_sse(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

@app.get("/api/health")
async def health():
    return {"status": "ok", "model": MODEL}
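
The architecture diagram lists timeout control as a backend responsibility, but `generate_sse` above waits on the upstream stream indefinitely. One way to bound that wait is sketched below, assuming a per-chunk idle timeout is what is wanted. The helper name `with_idle_timeout` and the `fake_stream` stand-in are illustrative and not part of the project.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def with_idle_timeout(stream: AsyncIterator[T], seconds: float) -> AsyncIterator[T]:
    """Re-yield items from `stream`; raise asyncio.TimeoutError if the wait
    for the next item exceeds `seconds`."""
    iterator = stream.__aiter__()
    while True:
        try:
            # Bound the wait for each individual item, not the whole response.
            item = await asyncio.wait_for(iterator.__anext__(), timeout=seconds)
        except StopAsyncIteration:
            return  # upstream finished normally
        yield item


async def fake_stream(delays):
    """Stand-in for the LLM stream: sleeps `delay` seconds before yielding each index."""
    for i, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield i


async def demo() -> list:
    received = []
    try:
        # The third item takes 1.0 s, which exceeds the 0.1 s idle limit.
        async for item in with_idle_timeout(fake_stream([0.0, 0.0, 1.0]), seconds=0.1):
            received.append(item)
    except asyncio.TimeoutError:
        received.append("timeout")
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints [0, 1, 'timeout']
```

In `generate_sse`, the loop would become something like `async for chunk in with_idle_timeout(stream, 30)`, where 30 seconds is an arbitrary figure. The existing `except Exception` branch would then report the timeout as an `error` event, although `str()` of a timeout error is empty, so a dedicated `except asyncio.TimeoutError` with an explicit message is probably the better choice.
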

text
# backend/requirements.txt
fastapi==0.115.*
uvicorn[standard]==0.30.*
openai==1.55.*
python-dotenv==1.0.*
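
bash
# Start the backend
cd backend
pip install -r requirements.txt

# Option A: Ollama (local and free)
OPENAI_BASE_URL=http://localhost:11434/v1 MODEL_NAME=llama3:8b uvicorn main:app --reload

# Option B: OpenAI (requires an API key). OPENAI_BASE_URL must be set explicitly,
# because main.py defaults it to the local Ollama endpoint.
OPENAI_API_KEY=sk-xxx OPENAI_BASE_URL=https://api.openai.com/v1 MODEL_NAME=gpt-4o uvicorn main:app --reload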
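
The `conversations` dict in `main.py` keeps every message of every conversation, so memory use and prompt length both grow with each turn. A minimal sketch of one way to cap that follows, assuming a sliding window over recent turns is acceptable. `trim_history` and `max_turns` are hypothetical names, not part of the original code.

```python
from typing import Dict, List

Message = Dict[str, str]


def trim_history(messages: List[Message], max_turns: int = 10) -> List[Message]:
    """Keep every system message plus the last `max_turns` user/assistant pairs."""
    system = [m for m in messages if m["role"] == "system"]
    dialogue = [m for m in messages if m["role"] != "system"]
    # A turn is one user message plus one assistant reply, hence 2 * max_turns.
    recent = dialogue[-2 * max_turns:] if max_turns > 0 else []
    return system + recent


if __name__ == "__main__":
    history = [{"role": "system", "content": "You are a helpful assistant."}]
    for i in range(1, 4):
        history.append({"role": "user", "content": f"question {i}"})
        history.append({"role": "assistant", "content": f"answer {i}"})
    # With max_turns=1 only the system message and the third exchange remain.
    for message in trim_history(history, max_turns=1):
        print(message["role"], "-", message["content"])
```

Applying it at save time, for example `conversations[conv_id] = trim_history(messages + [reply])`, keeps the stored window ending on a complete user/assistant pair. Trimming by token count rather than by turn would be more precise, but needs a tokenizer.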

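7.3 Frontend: A React Streaming Chat UI

Three React files, plus a stylesheet and the Vite config, make up the whole frontend.

File 1: the useStreamChat hook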

javascript
// frontend/src/hooks/useStreamChat.js
import { useState, useRef, useCallback } from 'react';

export function useStreamChat() {
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef(null);
  const bufferRef = useRef('');
  const convIdRef = useRef('');

  const sendMessage = useCallback(async (prompt) => {
    const userMsg = { role: 'user', content: prompt };
    const assistantMsg = { role: 'assistant', content: '' };
    setMessages(prev => [...prev, userMsg, assistantMsg]);
    setIsStreaming(true);
    bufferRef.current = '';

    const controller = new AbortController();
    abortRef.current = controller;

    try {
      const res = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message: prompt,
          conversation_id: convIdRef.current,
        }),
        signal: controller.signal,
      });

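      // Treat non-2xx responses as failures instead of parsing them as SSE
      if (!res.ok) throw new Error(`HTTP ${res.status}`);
      const reader = res.body.getReader();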
      const decoder = new TextDecoder();
      let sseBuffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        sseBuffer += decoder.decode(value, { stream: true });
        const lines = sseBuffer.split('\n');
        sseBuffer = lines.pop();

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const raw = line.slice(6);
          if (raw === '[DONE]') continue;

          try {
            const data = JSON.parse(raw);

            // Save the conversation_id
            if (data.conversation_id) {
              convIdRef.current = data.conversation_id;
              continue;
            }

            // Error reported by the backend
            if (data.error) {
              bufferRef.current += `\n\n⚠️ ${data.error}`;
            }

            // Regular content
            if (data.content) {
              bufferRef.current += data.content;
            }

            setMessages(prev => {
              const updated = [...prev];
              updated[updated.length - 1] = {
                role: 'assistant',
                content: bufferRef.current,
              };
              return updated;
            });
          } catch {
            // Ignore lines that are not valid JSON
          }
        }
      }
    } catch (err) {
      if (err.name !== 'AbortError') {
        setMessages(prev => {
          const updated = [...prev];
          updated[updated.length - 1] = {
            role: 'assistant',
            content: 'Network error, please try again.',
            error: true,
          };
          return updated;
        });
      }
    } finally {
      setIsStreaming(false);
      abortRef.current = null;
    }
  }, []);

  const stopGeneration = useCallback(() => {
    abortRef.current?.abort();
  }, []);

  const clearMessages = useCallback(() => {
    setMessages([]);
    convIdRef.current = '';
  }, []);

  return { messages, isStreaming, sendMessage, stopGeneration, clearMessages };
}
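
The hook's `sseBuffer` and `lines.pop()` pattern exists because network chunks do not respect line boundaries, and `decoder.decode(value, { stream: true })` exists because they do not respect UTF-8 character boundaries either. The same two ideas are sketched below in Python, so they can be run without a frontend build. `SSEDataParser` is an illustrative name, and like the hook it only handles single-line `data: ` fields with `\n` line endings.

```python
import codecs
import json
from typing import List


class SSEDataParser:
    """Incrementally extract `data: ` payloads from arbitrarily split byte chunks."""

    def __init__(self) -> None:
        # The incremental decoder holds back incomplete multi-byte UTF-8 sequences,
        # playing the role of TextDecoder with { stream: true }.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> List[str]:
        self._buffer += self._decoder.decode(chunk)
        # Everything before the last "\n" is complete; the tail may be a partial
        # line, so it stays in the buffer (the equivalent of lines.pop()).
        *lines, self._buffer = self._buffer.split("\n")
        prefix = "data: "
        return [line[len(prefix):] for line in lines if line.startswith(prefix)]


if __name__ == "__main__":
    # Each of the two CJK characters below takes 3 bytes in UTF-8.
    wire = 'data: {"content": "你"}\n\ndata: {"content": "好"}\n\ndata: [DONE]\n\n'.encode("utf-8")
    parser = SSEDataParser()
    text = ""
    # Feed 5 bytes at a time so chunks cut through lines and multi-byte characters.
    for start in range(0, len(wire), 5):
        for payload in parser.feed(wire[start:start + 5]):
            if payload != "[DONE]":
                text += json.loads(payload)["content"]
    print(text)
```

Decoding each chunk independently, without the incremental decoder, would fail or produce replacement characters whenever a chunk ends in the middle of a multi-byte character.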

File 2: the ChatMessage component

jsx
// frontend/src/components/ChatMessage.jsx
import ReactMarkdown from 'react-markdown';
import remarkGfm from 'remark-gfm';

export function ChatMessage({ message, isLast, isStreaming }) {
  const isUser = message.role === 'user';

  return (
    <div className={`message ${isUser ? 'user' : 'assistant'}`}>
      <div className="message-avatar">
        {isUser ? '👤' : '🤖'}
      </div>
      <div className="message-content">
        {isUser ? (
          <p>{message.content}</p>
        ) : (
          <ReactMarkdown remarkPlugins={[remarkGfm]}>
            {message.content}
          </ReactMarkdown>
        )}
        {!isUser && isStreaming && isLast && (
          <span className="cursor-blink">▊</span>
        )}
      </div>
    </div>
  );
}

File 3: the App root component

jsx
// frontend/src/App.jsx
import { useRef, useEffect } from 'react';
import { useStreamChat } from './hooks/useStreamChat';
import { ChatMessage } from './components/ChatMessage';
import './App.css';

export default function App() {
  const { messages, isStreaming, sendMessage, stopGeneration, clearMessages } =
    useStreamChat();
  const inputRef = useRef(null);
  const bottomRef = useRef(null);

  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  const handleSubmit = (e) => {
    e.preventDefault();
    const text = inputRef.current.value.trim();
    if (!text || isStreaming) return;
    inputRef.current.value = '';
    sendMessage(text);
  };

  return (
    <div className="app">
      <header className="header">
        <h1>💬 AI Chat</h1>
        <button onClick={clearMessages} className="clear-btn">
          Clear chat
        </button>
      </header>

      <main className="chat-area">
        {messages.length === 0 && (
          <div className="empty-state">
            <p>👋 Hi! Type a message to start chatting</p>
          </div>
        )}
        {messages.map((msg, i) => (
          <ChatMessage
            key={i}
            message={msg}
            isLast={i === messages.length - 1}
            isStreaming={isStreaming}
          />
        ))}
        <div ref={bottomRef} />
      </main>

      <footer className="input-area">
        <form onSubmit={handleSubmit}>
          <input
            ref={inputRef}
            placeholder="Type your question..."
            disabled={isStreaming}
            autoFocus
          />
          {isStreaming ? (
            <button type="button" onClick={stopGeneration} className="stop-btn">
              ⏹ Stop
            </button>
          ) : (
            <button type="submit" className="send-btn">
              Send ↑
            </button>
          )}
        </form>
      </footer>
    </div>
  );
}

CSS Styles

css
/* frontend/src/App.css */
* { margin: 0; padding: 0; box-sizing: border-box; }

body {
  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
  background: #f7f7f8;
}

.app {
  max-width: 800px;
  margin: 0 auto;
  height: 100vh;
  display: flex;
  flex-direction: column;
}

.header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 16px 20px;
  border-bottom: 1px solid #e5e5e5;
  background: white;
}

.header h1 { font-size: 20px; }

.clear-btn {
  padding: 6px 12px;
  border: 1px solid #ddd;
  border-radius: 6px;
  background: white;
  cursor: pointer;
}

.chat-area {
  flex: 1;
  overflow-y: auto;
  padding: 20px;
}

.empty-state {
  text-align: center;
  padding: 100px 20px;
  color: #999;
  font-size: 18px;
}

.message {
  display: flex;
  gap: 12px;
  margin-bottom: 24px;
}

.message-avatar {
  width: 36px;
  height: 36px;
  border-radius: 50%;
  display: flex;
  align-items: center;
  justify-content: center;
  font-size: 20px;
  flex-shrink: 0;
}

.message-content {
  flex: 1;
  line-height: 1.6;
}

.message.user .message-content {
  background: #e8f0fe;
  padding: 10px 16px;
  border-radius: 12px;
}

.message.assistant .message-content pre {
  background: #1e1e1e;
  color: #d4d4d4;
  padding: 16px;
  border-radius: 8px;
  overflow-x: auto;
  margin: 8px 0;
}

.message.assistant .message-content code {
  background: #f0f0f0;
  padding: 2px 6px;
  border-radius: 4px;
  font-size: 0.9em;
}

.message.assistant .message-content pre code {
  background: none;
  padding: 0;
}

.cursor-blink {
  animation: blink 1s step-end infinite;
  color: #666;
}

@keyframes blink {
  0%, 100% { opacity: 1; }
  50% { opacity: 0; }
}

.input-area {
  padding: 16px 20px;
  border-top: 1px solid #e5e5e5;
  background: white;
}

.input-area form {
  display: flex;
  gap: 8px;
}

.input-area input {
  flex: 1;
  padding: 12px 16px;
  border: 1px solid #ddd;
  border-radius: 8px;
  font-size: 16px;
  outline: none;
}

.input-area input:focus {
  border-color: #4a90d9;
  box-shadow: 0 0 0 2px rgba(74, 144, 217, 0.2);
}

.send-btn, .stop-btn {
  padding: 12px 20px;
  border: none;
  border-radius: 8px;
  font-size: 16px;
  cursor: pointer;
}

.send-btn {
  background: #4a90d9;
  color: white;
}

.stop-btn {
  background: #e74c3c;
  color: white;
}

Vite Config (proxy to the backend)

javascript
// frontend/vite.config.js
import { defineConfig } from 'vite';
import react from '@vitejs/plugin-react';

export default defineConfig({
  plugins: [react()],
  server: {
    proxy: {
      '/api': {
        target: 'http://localhost:8000',
        changeOrigin: true,
      },
    },
  },
});
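
frontend/package.json (core dependencies only; a runnable project also needs vite and @vitejs/plugin-react as devDependencies and a "dev" script):

json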
{
  "dependencies": {
    "react": "^19.0.0",
    "react-dom": "^19.0.0",
    "react-markdown": "^9.0.0",
    "remark-gfm": "^4.0.0"
  }
}

7.4 Running the Complete Demo

Quick Start

bash
# Terminal 1: start the backend
cd backend
pip install -r requirements.txt

# Option A: a local Ollama model (recommended, free)
ollama pull llama3:8b
uvicorn main:app --reload --port 8000

# Option B: the OpenAI API
OPENAI_API_KEY=sk-xxx OPENAI_BASE_URL=https://api.openai.com/v1 MODEL_NAME=gpt-4o \
  uvicorn main:app --reload --port 8000

# Terminal 2: start the frontend
cd frontend
npm install
npm run dev
# → open http://localhost:5173
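
Both options run the same `main.py`, and only three environment variables differ. The lookup and its defaults can be sketched as a pure function, which makes it easy to see what happens when a variable is left out. `LLMConfig`, `resolve_llm_config`, and `looks_misconfigured` are hypothetical helpers, and the check is only a heuristic.

```python
from typing import Mapping, NamedTuple

OLLAMA_BASE_URL = "http://localhost:11434/v1"


class LLMConfig(NamedTuple):
    api_key: str
    base_url: str
    model: str


def resolve_llm_config(env: Mapping[str, str]) -> LLMConfig:
    """Mirror the os.getenv defaults used in main.py."""
    return LLMConfig(
        api_key=env.get("OPENAI_API_KEY", "ollama"),
        base_url=env.get("OPENAI_BASE_URL", OLLAMA_BASE_URL),
        model=env.get("MODEL_NAME", "llama3:8b"),
    )


def looks_misconfigured(config: LLMConfig) -> bool:
    """Heuristic: an API key was supplied, but requests still go to local Ollama."""
    return config.api_key != "ollama" and config.base_url == OLLAMA_BASE_URL


if __name__ == "__main__":
    option_a = resolve_llm_config({})
    option_b = resolve_llm_config({
        "OPENAI_API_KEY": "sk-xxx",
        "OPENAI_BASE_URL": "https://api.openai.com/v1",
        "MODEL_NAME": "gpt-4o",
    })
    forgot_url = resolve_llm_config({"OPENAI_API_KEY": "sk-xxx", "MODEL_NAME": "gpt-4o"})
    for name, cfg in [("A", option_a), ("B", option_b), ("B without URL", forgot_url)]:
        status = "suspicious" if looks_misconfigured(cfg) else "ok"
        print(name, cfg.base_url, cfg.model, status)
```

In the backend this would be called as `resolve_llm_config(os.environ)`. The third case shows why Option B sets `OPENAI_BASE_URL` explicitly: without it, a `gpt-4o` request would be sent to the local Ollama endpoint.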

Testing the Backend with curl

bash
# Check that the backend is up
curl http://localhost:8000/api/health
# → {"status":"ok","model":"llama3:8b"}

# Test streaming output
curl -X POST http://localhost:8000/api/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Describe Python in three sentences"}' \
  --no-buffer

# Expected output (events arrive one by one, each followed by a blank line; token boundaries vary by model):
# data: {"conversation_id": "550e8400-..."}
# data: {"content": "Python"}
# data: {"content": " is"}
# data: {"content": " a"}
# ...
# data: [DONE]

End-to-End Data Flow

Every step the data passes through in one complete streaming conversation:

  1. The user types "Hello"

  2. React: sendMessage("Hello")
     │ → setMessages([...prev, userMsg, assistantMsg])
     │ → fetch POST /api/chat { message: "Hello" }

  3. Vite proxy → FastAPI
     │ → POST /api/chat reaches the backend
     │ → The conversation history is loaded

  4. FastAPI → OpenAI/Ollama
     │ → client.chat.completions.create(stream=True)
     │ → Wait for the first token (TTFT, the prefill phase)

  5. The LLM starts decoding
     │ → Tokens are generated one at a time
     │ → Token "Hel" → chunk → yield 'data: {"content": "Hel"}\n\n'
     │ → Token "lo" → chunk → yield 'data: {"content": "lo"}\n\n'

  6. The SSE event stream returns to the browser
     │ → ReadableStream.read()
     │ → Strip the "data: " prefix
     │ → JSON.parse → { content: "Hel" }

  7. React state update
     │ → bufferRef.current += "Hel"
     │ → setMessages(update the last message)
     │ → ReactMarkdown re-renders
     │ → The user sees "Hel" (with a blinking cursor)

  8. Repeat steps 5-7 until [DONE]
     │ → setIsStreaming(false)
     │ → The cursor disappears
     │ → The conversation turn is complete
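
Step 4 above waits for the first token (TTFT), and steps 5 to 7 then repeat once per token. A rough way to observe both numbers is sketched below against a simulated stream. `measure_stream` and `fake_llm` are illustrative names, the delays are made up, and the clock starts when the consumer begins iterating rather than when the HTTP request is sent.

```python
import asyncio
import time
from typing import AsyncIterator, Sequence, Tuple


async def measure_stream(stream: AsyncIterator[str]) -> Tuple[float, float, str]:
    """Return (ttft_seconds, total_seconds, full_text) for a stream of text pieces."""
    start = time.perf_counter()
    ttft = None
    parts = []
    async for piece in stream:
        if ttft is None:
            # Time to first token: the delay before anything can be shown to the user.
            ttft = time.perf_counter() - start
        parts.append(piece)
    total = time.perf_counter() - start
    # An empty stream has no first token; report the total wait instead.
    return (ttft if ttft is not None else total), total, "".join(parts)


async def fake_llm(prefill: float, per_token: float, tokens: Sequence[str]) -> AsyncIterator[str]:
    """Simulated model: one long prefill pause, then a short pause after each token."""
    await asyncio.sleep(prefill)
    for token in tokens:
        yield token
        await asyncio.sleep(per_token)


if __name__ == "__main__":
    ttft, total, text = asyncio.run(
        measure_stream(fake_llm(prefill=0.3, per_token=0.05, tokens=["Hel", "lo", "!"]))
    )
    print(f"text={text!r} ttft={ttft:.2f}s total={total:.2f}s")
```

With a non-streaming response the user would wait for the full `total` before seeing anything. Streaming shortens the perceived wait to `ttft`, although the total time is unchanged.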

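Congratulations! If you have followed along this far, you now have a complete, runnable streaming chat application: a backend of about 80 lines of Python and a frontend of three React files. All of the code can be copied and used directly.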


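Chapter Summary

Topic                 Key point
Project architecture  React + FastAPI + OpenAI SDK, three separate layers
Backend core          StreamingResponse + async generator, about 80 lines of code
Frontend core         The useStreamChat hook encapsulates all streaming logic
Vite proxy            In development, /api is proxied to the backend, avoiding CORS
Ollama compatibility  Only base_url (and the model name) changes; the code stays the same
End-to-end flow       User input → POST → LLM streaming → SSE → ReadableStream → React update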

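Next chapter: production-grade optimization, covering TTFT optimization, reconnection after dropped connections, token billing, concurrency and backpressure, content auditing, and other problems that must be solved before a real launch.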
