1.1 编程语言
AI 应用开发需要掌握后端(Python)、Web 框架(FastAPI)和前端(React)三大技术栈。本文档围绕这三个方向,提供从基础到实战的完整学习内容。
一、Python 核心
Python 是 AI 应用开发的主力语言,拥有丰富的机器学习和数据处理生态。
1. 基础语法
变量与数据类型
Python 是动态类型语言,变量无需声明类型:
python
# 基本数据类型
name = "Alice" # str 字符串
age = 25 # int 整数
score = 95.5 # float 浮点数
is_active = True # bool 布尔值
data = None # NoneType 空值
# 类型检查与转换
print(type(name)) # <class 'str'>
num_str = str(age) # 整数转字符串 -> "25"
str_num = int("100") # 字符串转整数 -> 100
运算符
python
# 算术运算符
print(10 / 3) # 3.3333 普通除法
print(10 // 3) # 3 整除
print(10 % 3) # 1 取余
print(2 ** 10) # 1024 幂运算
# 比较运算符:==, !=, >, <, >=, <=
# 逻辑运算符:and, or, not
# 成员运算符:in, not in
# 身份运算符:is, is not(比较对象引用,非值)
print("AI" in "AI应用开发") # True
print([] is not None) # True
控制流
python
# if/elif/else
def classify_score(score):
    """Map a numeric score to a grade label.

    Args:
        score: Numeric score to classify.

    Returns:
        "优秀" when the score is at least 90, "及格" when it is at least 60,
        and "不及格" otherwise.
    """
    if score >= 90:
        return "优秀"
    elif score >= 60:
        return "及格"
    else:
        return "不及格"
# for 循环
for i in range(5): # 0, 1, 2, 3, 4
print(i)
for key, value in {"a": 1, "b": 2}.items():
print(f"{key}: {value}")
# while 循环
count = 0
while count < 3:
print(count)
count += 1
# 海象运算符(Python 3.8+)
while (line := input("输入: ")) != "quit":
print(f"你输入了: {line}")
函数定义与参数传递
python
# 基本函数
def greet(name: str) -> str:
return f"Hello, {name}!"
# 默认参数
def connect(host: str, port: int = 8080):
print(f"连接到 {host}:{port}")
# *args:接收任意数量的位置参数,打包为元组
def add(*args):
return sum(args)
print(add(1, 2, 3)) # 6
# **kwargs:接收任意数量的关键字参数,打包为字典
def create_user(**kwargs):
for key, value in kwargs.items():
print(f"{key} = {value}")
create_user(name="Alice", age=25, role="admin")
# 混合使用(顺序:普通参数 -> *args -> 关键字参数 -> **kwargs)
def func(a, b, *args, key="default", **kwargs):
print(a, b, args, key, kwargs)
模块与包的导入
python
# 标准导入
import os
import json
# 从模块导入特定对象
from pathlib import Path
from typing import List, Dict, Optional
# 别名导入
import numpy as np
import pandas as pd
# 相对导入(在包内部使用)
from . import utils
from ..config import settings
文件操作与异常处理
python
# 文件读写
with open("data.txt", "r", encoding="utf-8") as f:
content = f.read()
with open("output.json", "w", encoding="utf-8") as f:
json.dump({"key": "value"}, f, ensure_ascii=False, indent=2)
# 异常处理
try:
result = 10 / 0
except ZeroDivisionError as e:
print(f"除零错误: {e}")
except (TypeError, ValueError) as e:
print(f"类型或值错误: {e}")
except Exception as e:
print(f"未知错误: {e}")
else:
print("没有异常时执行")
finally:
print("无论如何都会执行")
# 自定义异常
class APIError(Exception):
def __init__(self, status_code: int, message: str):
self.status_code = status_code
self.message = message
super().__init__(self.message)
2. 数据结构
内置数据结构
python
# List(列表)- 有序、可变、可重复
fruits = ["apple", "banana", "cherry"]
fruits.append("date") # 末尾添加
fruits.insert(0, "avocado") # 指定位置插入
fruits.pop() # 移除末尾元素
fruits.sort() # 排序
sliced = fruits[1:3] # 切片
# Tuple(元组)- 有序、不可变
point = (3, 4)
x, y = point # 解包
# point[0] = 5 # 报错!元组不可变
# Dict(字典)- 键值对、键唯一(Python 3.7+ 保持插入顺序)
user = {"name": "Alice", "age": 25}
user["email"] = "alice@example.com" # 添加
name = user.get("name", "Unknown") # 安全获取
user.pop("age") # 删除
# Set(集合)- 无序、不重复
tags = {"python", "ai", "web"}
tags.add("ml")
tags.discard("web") # 删除(不存在不报错)
# 集合运算
a = {1, 2, 3}
b = {2, 3, 4}
print(a & b) # {2, 3} 交集
print(a | b) # {1, 2, 3, 4} 并集
print(a - b) # {1} 差集
推导式与生成器表达式
python
# 列表推导式
squares = [x ** 2 for x in range(10)]
evens = [x for x in range(20) if x % 2 == 0]
# 字典推导式
word_lengths = {word: len(word) for word in ["hello", "world", "python"]}
# 集合推导式
unique_chars = {c.lower() for c in "Hello World" if c.isalpha()}
# 生成器表达式(惰性求值,节省内存)
gen = (x ** 2 for x in range(1000000)) # 不会立即计算
print(next(gen)) # 0
print(next(gen)) # 1
常用内置函数
python
nums = [3, 1, 4, 1, 5, 9, 2, 6]
# map:对每个元素应用函数
doubled = list(map(lambda x: x * 2, nums))
# filter:过滤元素
big = list(filter(lambda x: x > 4, nums))
# reduce:累积计算
from functools import reduce
total = reduce(lambda a, b: a + b, nums)
# zip:并行遍历多个可迭代对象
names = ["Alice", "Bob", "Charlie"]
scores = [90, 85, 95]
for name, score in zip(names, scores):
print(f"{name}: {score}")
# enumerate:带索引遍历
for i, name in enumerate(names, start=1):
print(f"{i}. {name}")
# sorted:自定义排序
users = [{"name": "Bob", "age": 30}, {"name": "Alice", "age": 25}]
sorted_users = sorted(users, key=lambda u: u["age"])
标准库:collections 与 itertools
python
from collections import deque, Counter, defaultdict, namedtuple
# deque:双端队列,两端操作 O(1)
dq = deque([1, 2, 3])
dq.appendleft(0) # 左端添加
dq.pop() # 右端弹出
# Counter:计数器
text = "hello world"
counter = Counter(text)
print(counter.most_common(3)) # [('l', 3), ('o', 2), ('h', 1)]
# defaultdict:带默认值的字典
word_index = defaultdict(list)
for i, word in enumerate(["apple", "banana", "apple"]):
word_index[word].append(i)
# {'apple': [0, 2], 'banana': [1]}
# namedtuple:命名元组
Point = namedtuple("Point", ["x", "y"])
p = Point(3, 4)
print(p.x, p.y)
from itertools import chain, product, combinations
# chain:连接多个可迭代对象
list(chain([1, 2], [3, 4])) # [1, 2, 3, 4]
# product:笛卡尔积
list(product("AB", "12")) # [('A','1'), ('A','2'), ('B','1'), ('B','2')]
# combinations:组合
list(combinations([1, 2, 3], 2)) # [(1,2), (1,3), (2,3)]
3. 面向对象编程
类与实例
python
class User:
    """A platform user with basic login tracking."""

    # Class attribute: shared by every instance.
    platform = "AI App"

    def __init__(self, name: str, email: str):
        # Instance attributes.
        self.name = name
        self.email = email
        self._login_count = 0  # single leading underscore: private by convention

    def login(self):
        """Instance method: record one login and return a status message."""
        self._login_count += 1
        return f"{self.name} 已登录(第 {self._login_count} 次)"

    @classmethod
    def from_dict(cls, data: dict):
        """Class method: alternative constructor.

        Reads the "name" and "email" keys of ``data``; a missing key raises
        KeyError.
        """
        return cls(name=data["name"], email=data["email"])

    @staticmethod
    def validate_email(email: str) -> bool:
        """Static method: needs neither the instance nor the class.

        Returns True when ``email`` contains both "@" and "."; this is only a
        rough plausibility check.
        """
        return "@" in email and "." in email

    @property
    def login_count(self) -> int:
        """Property: expose the login counter through attribute access."""
        return self._login_count
# 使用
user = User("Alice", "alice@example.com")
print(user.login()) # Alice 已登录(第 1 次)
print(user.login_count) # 1(像属性一样访问)
user2 = User.from_dict({"name": "Bob", "email": "bob@example.com"})
print(User.validate_email("invalid")) # False
继承与多态
python
class LLMProvider:
    """Base class for large-language-model providers."""

    def __init__(self, model_name: str):
        self.model_name = model_name

    def chat(self, message: str) -> str:
        """Return the model's reply to ``message``.

        Raises:
            NotImplementedError: always, in this base class; subclasses
                override this method.
        """
        raise NotImplementedError("子类必须实现 chat 方法")

    def __str__(self):
        return f"LLMProvider({self.model_name})"
class OpenAIProvider(LLMProvider):
def __init__(self, model_name: str = "gpt-4", api_key: str = ""):
super().__init__(model_name)
self.api_key = api_key
def chat(self, message: str) -> str:
# 实际调用 OpenAI API
return f"[{self.model_name}] 回复: ..."
class ClaudeProvider(LLMProvider):
def __init__(self, model_name: str = "claude-3", api_key: str = ""):
super().__init__(model_name)
self.api_key = api_key
def chat(self, message: str) -> str:
return f"[{self.model_name}] 回复: ..."
# 多态:统一接口处理不同实现
def get_response(provider: LLMProvider, message: str) -> str:
return provider.chat(message)
openai = OpenAIProvider(api_key="sk-xxx")
claude = ClaudeProvider(api_key="sk-xxx")
print(get_response(openai, "你好"))
print(get_response(claude, "你好"))
特殊方法(魔术方法)
python
class ChatMessage:
    """A single chat message: a role plus its text content."""

    def __init__(self, role: str, content: str):
        self.role = role
        self.content = content

    def __str__(self):
        """Return the human-readable form used by print()."""
        return f"[{self.role}]: {self.content}"

    def __repr__(self):
        """Return a debugging representation with the content cut to 20 characters."""
        preview = self.content[:20]
        # Only mark the preview as truncated when something was cut off.
        if len(self.content) > 20:
            preview += "..."
        return f"ChatMessage(role='{self.role}', content='{preview}')"

    def __len__(self):
        """Return the length of the message content."""
        return len(self.content)

    def __eq__(self, other):
        """Compare by role and content.

        Returns NotImplemented for operands that are not ChatMessage so that
        comparing against unrelated objects yields False instead of raising
        AttributeError.
        """
        if not isinstance(other, ChatMessage):
            return NotImplemented
        return self.role == other.role and self.content == other.content

    # Defining __eq__ already makes the class unhashable; state it explicitly
    # because instances are mutable.
    __hash__ = None
class Pipeline:
    """Callable object that runs a value through a sequence of steps."""

    def __init__(self, *steps):
        # Steps are stored in the order given and applied in that order.
        self.steps = steps

    def __call__(self, data):
        """Apply every step to ``data`` in turn and return the final result.

        Defining __call__ lets an instance be invoked like a function.
        """
        for step in self.steps:
            data = step(data)
        return data
# 使用
pipeline = Pipeline(str.lower, str.strip, lambda s: s.replace(" ", "_"))
print(pipeline(" Hello World ")) # hello_world
4. 高级特性
装饰器
装饰器是 Python 中非常强大的特性,在 AI 应用中广泛用于日志、缓存、权限校验等场景。
python
import time
from functools import wraps
# 基本装饰器:函数执行时间统计
def timer(func):
    """Decorator that prints how long each call to ``func`` takes.

    The wrapped function's return value is passed through unchanged.
    """
    @wraps(func)  # keep the wrapped function's metadata (name, docstring)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump when the system clock is adjusted.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 执行耗时: {elapsed:.4f}s")
        return result
    return wrapper
@timer
def train_model(epochs: int):
time.sleep(0.1) # 模拟训练
return "训练完成"
train_model(10) # train_model 执行耗时: 0.1003s
# 带参数的装饰器
def retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator factory that retries a function when it raises.

    Args:
        max_attempts: Total number of calls to make before giving up.
        delay: Seconds to sleep between a failed attempt and the next one.

    Returns:
        A decorator. The decorated function returns the first successful
        result; if every attempt fails, the last exception is re-raised.

    Raises:
        ValueError: if ``max_attempts`` is less than 1 (otherwise the wrapped
            function would never be called and None would be returned).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Out of attempts: let the original exception propagate.
                    if attempt == max_attempts:
                        raise
                    print(f"第 {attempt} 次失败: {e},{delay}s 后重试...")
                    time.sleep(delay)
        return wrapper
    return decorator
@retry(max_attempts=3, delay=0.5)
def call_api():
"""调用外部 API,失败自动重试"""
# ...
pass
# 装饰器链(执行顺序:从下往上装饰,从上往下执行)
@timer
@retry(max_attempts=2)
def fetch_data():
pass
上下文管理器
python
# 使用类实现
class DatabaseConnection:
    """Class-based context manager around a simulated database connection."""

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.connection = None

    def __enter__(self):
        """Open the (simulated) connection and hand it to the ``with`` body."""
        print(f"连接数据库: {self.db_url}")
        self.connection = "模拟连接对象"
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Drop the connection when the ``with`` body ends, error or not."""
        print("关闭数据库连接")
        self.connection = None
        return False  # do not swallow exceptions raised inside the block
with DatabaseConnection("postgresql://localhost/mydb") as conn:
print(f"使用连接: {conn}")
# 使用 contextmanager 装饰器(更简洁)
from contextlib import contextmanager
@contextmanager
def timer_context(label: str):
    """Context manager that prints how long its ``with`` body took.

    Args:
        label: Text printed in front of the elapsed time.
    """
    # perf_counter() is monotonic, so wall-clock adjustments cannot skew the
    # measured interval.
    start = time.perf_counter()
    try:
        yield  # code before the yield acts as __enter__, code after as __exit__
    finally:
        # Report the elapsed time even when the body raised.
        elapsed = time.perf_counter() - start
        print(f"{label} 耗时: {elapsed:.4f}s")
with timer_context("模型推理"):
time.sleep(0.1)
迭代器与生成器
python
# 生成器函数(用 yield 逐个产出值,节省内存)
def read_large_file(file_path: str, chunk_size: int = 1024, encoding: str = "utf-8"):
    """Yield the contents of a text file chunk by chunk.

    Args:
        file_path: Path of the file to read.
        chunk_size: Maximum number of characters per yielded chunk.
        encoding: Text encoding used to decode the file. Passed explicitly so
            the result does not depend on the platform's default encoding.

    Yields:
        Successive non-empty strings of at most ``chunk_size`` characters.
    """
    with open(file_path, "r", encoding=encoding) as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:  # empty string means end of file
                break
            yield chunk
# 用于流式处理 AI 响应
def stream_response(text: str):
"""模拟流式输出"""
for char in text:
yield char
time.sleep(0.02)
for char in stream_response("Hello, AI!"):
print(char, end="", flush=True)
# 自定义迭代器类
class BatchIterator:
    """Iterate over a list in consecutive fixed-size batches.

    The object is its own iterator, so it can be consumed only once.
    """

    def __init__(self, data: list, batch_size: int):
        # A non-positive batch size would never advance the cursor (0) or
        # would move it backwards (negative), so iteration would never end.
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        self.data = data
        self.batch_size = batch_size
        self.index = 0  # start position of the next batch

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next batch; the final batch may be shorter."""
        if self.index >= len(self.data):
            raise StopIteration
        batch = self.data[self.index:self.index + self.batch_size]
        self.index += self.batch_size
        return batch
for batch in BatchIterator(list(range(10)), batch_size=3):
print(batch) # [0,1,2] [3,4,5] [6,7,8] [9]
异步编程
异步编程是 AI 应用开发的核心能力——调用大模型 API、处理并发请求都依赖异步。
python
import asyncio
import aiohttp
# 基本的 async/await
async def fetch_url(url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# 并发执行多个异步任务
async def fetch_all(urls: list[str]) -> list[str]:
tasks = [fetch_url(url) for url in urls]
results = await asyncio.gather(*tasks) # 并发执行
return results
# 异步生成器(流式调用 LLM API)
async def stream_chat(prompt: str):
"""模拟流式调用大模型"""
response_text = "这是一个异步流式响应的示例"
for char in response_text:
await asyncio.sleep(0.05) # 模拟网络延迟
yield char
async def main():
# 消费异步生成器
async for char in stream_chat("你好"):
print(char, end="", flush=True)
print()
# 使用 asyncio.Queue 做任务队列
queue = asyncio.Queue()
await queue.put("task1")
await queue.put("task2")
while not queue.empty():
task = await queue.get()
print(f"处理: {task}")
asyncio.run(main())
# 并发 vs 并行
# - threading:适合 I/O 密集型(网络请求、文件读写),受 GIL 限制
# - multiprocessing:适合 CPU 密集型(数据处理、模型推理),真正并行
# - asyncio:适合大量 I/O 并发(推荐用于 AI 应用的 API 调用)
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
# 线程池
def blocking_io_task(url, timeout: float = 10.0):
    """Fetch ``url`` synchronously and return the response body as bytes.

    Args:
        url: URL to open.
        timeout: Seconds to wait before giving up, so a stalled server cannot
            block a worker thread indefinitely.
    """
    import urllib.request

    # The response is closed as soon as the body has been read.
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read()
# `urls` is not defined anywhere earlier in this snippet, so a small sample
# list is defined here to keep the example from failing with NameError.
urls = [f"https://httpbin.org/get?page={i}" for i in range(3)]
with ThreadPoolExecutor(max_workers=5) as executor:
    # Leaving the `with` block waits for every submitted task to finish.
    futures = [executor.submit(blocking_io_task, url) for url in urls]
# 多进程
def cpu_intensive_task(data):
return sum(x ** 2 for x in data)
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, [range(10000)] * 4)5. 实战练习
练习1:装饰器实现函数执行时间统计
python
import time
from functools import wraps
def timer(func):
    """Decorator that prints the execution time of every call to ``func``.

    The wrapped function's return value is passed through unchanged.
    """
    @wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"[Timer] {func.__name__}() 耗时 {elapsed:.6f}s")
        return result
    return wrapper
@timer
def fibonacci(n: int) -> int:
if n <= 1:
return n
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
print(fibonacci(1000))
练习2:asyncio 异步爬虫
python
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return {"url": url, "status": response.status, "length": len(await response.text())}
async def crawl(urls: list[str], concurrency: int = 5):
semaphore = asyncio.Semaphore(concurrency) # 限制并发数
async def limited_fetch(session, url):
async with semaphore:
return await fetch(session, url)
async with aiohttp.ClientSession() as session:
tasks = [limited_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
urls = [f"https://httpbin.org/get?page={i}" for i in range(10)]
results = asyncio.run(crawl(urls))
for r in results:
print(r)
练习3:上下文管理器管理数据库连接
python
from contextlib import contextmanager
class SimpleDB:
    """Context manager that wraps a simplified database connection in a transaction.

    On a clean exit the transaction is committed; when the ``with`` body
    raises, it is rolled back. The connection is closed in both cases.
    """

    def __init__(self, db_url: str):
        self.db_url = db_url

    def __enter__(self):
        self.conn = self._connect()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type:
                self.conn.rollback()
                print(f"事务回滚: {exc_val}")
            else:
                self.conn.commit()
                print("事务提交")
        finally:
            # Release the connection even if commit/rollback itself raises.
            self.conn.close()
        return False  # never swallow the caller's exception

    def _connect(self):
        """Print the target URL and return a stand-in connection object."""
        print(f"连接到 {self.db_url}")
        # Real projects would use a driver such as psycopg2 or pymysql here.
        return type("Connection", (), {
            "execute": lambda self, sql: print(f"执行: {sql}"),
            "commit": lambda self: None,
            "rollback": lambda self: None,
            "close": lambda self: print("连接已关闭"),
        })()
with SimpleDB("postgresql://localhost/mydb") as conn:
conn.execute("INSERT INTO users (name) VALUES ('Alice')")
conn.execute("INSERT INTO users (name) VALUES ('Bob')")
二、Web 开发
1. FastAPI 框架(推荐)
FastAPI 是现代 Python Web 框架,天然支持异步,自动生成 API 文档,非常适合构建 AI 应用后端。
基础使用
python
from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from typing import Optional
app = FastAPI(title="AI 应用 API", version="1.0.0")
# --- Pydantic 模型 ---
class ChatRequest(BaseModel):
message: str = Field(..., min_length=1, max_length=4096, description="用户消息")
model: str = Field(default="gpt-4", description="模型名称")
temperature: float = Field(default=0.7, ge=0, le=2)
class ChatResponse(BaseModel):
reply: str
model: str
usage: dict
# --- 路由定义 ---
# GET 请求 + 查询参数
@app.get("/models")
async def list_models(
provider: Optional[str] = Query(None, description="按提供商过滤")
):
models = ["gpt-4", "gpt-3.5-turbo", "claude-3"]
if provider:
models = [m for m in models if provider.lower() in m]
return {"models": models}
# POST 请求 + 请求体
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
# 调用大模型 API
reply = f"收到你的消息: {request.message}"
return ChatResponse(
reply=reply,
model=request.model,
usage={"prompt_tokens": 10, "completion_tokens": 20}
)
# 路径参数
@app.get("/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
return {"id": conversation_id, "messages": []}
# DELETE 请求 + 状态码
@app.delete("/conversations/{conversation_id}", status_code=204)
async def delete_conversation(conversation_id: str):
pass
依赖注入
python
from fastapi import Depends, Header
# 依赖函数
async def get_api_key(x_api_key: str = Header(...)):
    """Dependency that validates the API key from the ``x_api_key`` header parameter.

    Returns:
        The key, when it matches the expected value.

    Raises:
        HTTPException: with status 401 when the key does not match.
    """
    import secrets

    # Constant-time comparison avoids leaking, through response timing, how
    # many leading characters matched. Bytes are compared because
    # compare_digest() rejects str values containing non-ASCII characters.
    if not secrets.compare_digest(x_api_key.encode("utf-8"), b"valid-key"):
        raise HTTPException(status_code=401, detail="无效的 API Key")
    return x_api_key
async def get_current_user(api_key: str = Depends(get_api_key)):
# 根据 API Key 查找用户
return {"user_id": "user_123", "api_key": api_key}
@app.get("/profile")
async def get_profile(user: dict = Depends(get_current_user)):
return user
进阶功能
python
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncio
app = FastAPI()
# --- CORS 中间件 ---
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # 前端地址
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- 流式响应(AI 应用核心) ---
async def generate_stream(prompt: str):
"""模拟大模型流式输出"""
response = f"这是对「{prompt}」的回复,逐字输出。"
for char in response:
yield f"data: {char}\n\n" # SSE 格式
await asyncio.sleep(0.05)
yield "data: [DONE]\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
return StreamingResponse(
generate_stream(request.message),
media_type="text/event-stream"
)
# --- 文件上传 ---
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
content = await file.read()
return {
"filename": file.filename,
"size": len(content),
"content_type": file.content_type
}
# --- 后台任务 ---
def process_document(file_path: str):
"""在后台处理文档(如向量化)"""
# 耗时操作
pass
@app.post("/documents")
async def create_document(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = None
):
    """Accept an uploaded document and schedule background processing for it.

    Returns a confirmation message immediately; ``process_document`` runs as a
    background task with the target path.
    """
    from pathlib import Path

    # The filename comes from the client. Keep only its final path component so
    # a value such as "../../etc/passwd" cannot point outside /tmp.
    safe_name = Path(file.filename or "upload").name
    file_path = f"/tmp/{safe_name}"
    # NOTE(review): the uploaded bytes are not written to file_path here;
    # confirm whether process_document expects the file to exist already.
    background_tasks.add_task(process_document, file_path)
    return {"message": "文档上传成功,正在后台处理"}
# --- WebSocket ---
from fastapi import WebSocket
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
# 流式回复
for char in f"回复: {data}":
await websocket.send_text(char)
await asyncio.sleep(0.05)API 设计最佳实践
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from datetime import datetime
app = FastAPI()
# --- 统一响应格式 ---
class APIResponse:
    """Helpers that build response bodies in one uniform shape."""

    @staticmethod
    def success(data=None, message="success"):
        """Return a success body with code 200, ``message`` and ``data``."""
        return {"code": 200, "message": message, "data": data}

    @staticmethod
    def error(code: int, message: str, detail=None):
        """Return an error body with ``code``, ``message`` and optional ``detail``."""
        return {"code": code, "message": message, "detail": detail}
# --- 全局异常处理 ---
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content=APIResponse.error(exc.status_code, exc.detail)
)
# --- JWT 认证 ---
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token 已过期")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="无效的 Token")
# --- 接口版本管理 ---
from fastapi import APIRouter
v1_router = APIRouter(prefix="/api/v1", tags=["v1"])
v2_router = APIRouter(prefix="/api/v2", tags=["v2"])
@v1_router.get("/chat")
async def chat_v1():
return {"version": "v1"}
@v2_router.get("/chat")
async def chat_v2():
return {"version": "v2", "features": ["streaming", "function_calling"]}
app.include_router(v1_router)
app.include_router(v2_router)
数据库集成
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
from sqlalchemy import String, Text, DateTime
from datetime import datetime
# 异步引擎
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 模型定义
class Base(DeclarativeBase):
pass
class Conversation(Base):
__tablename__ = "conversations"
id: Mapped[str] = mapped_column(String(36), primary_key=True)
title: Mapped[str] = mapped_column(String(200))
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
class Message(Base):
__tablename__ = "messages"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
conversation_id: Mapped[str] = mapped_column(String(36))
role: Mapped[str] = mapped_column(String(20)) # user / assistant / system
content: Mapped[str] = mapped_column(Text)
# 依赖注入获取数据库会话
async def get_db():
async with async_session() as session:
yield session
@app.get("/conversations")
async def list_conversations(db: AsyncSession = Depends(get_db)):
from sqlalchemy import select
result = await db.execute(select(Conversation).order_by(Conversation.created_at.desc()))
return result.scalars().all()
2. Flask 框架(可选)
Flask 更轻量,适合快速原型开发:
python
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/chat", methods=["POST"])
def chat():
    """Reply to the "message" field of a JSON request body.

    Responds with HTTP 400 when the body is missing, is not valid JSON, or is
    not a JSON object.
    """
    # silent=True yields None instead of raising on a missing or invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须是 JSON 对象"}), 400
    message = data.get("message", "")
    return jsonify({"reply": f"回复: {message}"})
@app.route("/models")
def list_models():
return jsonify({"models": ["gpt-4", "claude-3"]})
if __name__ == "__main__":
app.run(debug=True, port=5000)
Flask 与 FastAPI 对比:FastAPI 原生支持异步、自动 OpenAPI 文档、类型校验,更适合 AI 应用场景。Flask 生态成熟但需要额外插件。
三、前端基础
1. HTML/CSS/JavaScript 基础
HTML5 核心
html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI 聊天应用</title>
</head>
<body>
<!-- 语义化标签 -->
<header>
<nav>导航栏</nav>
</header>
<main>
<section class="chat-container">
<article class="message user">你好</article>
<article class="message assistant">你好!有什么可以帮你的?</article>
</section>
<form id="chat-form">
<textarea id="input" placeholder="输入消息..." required></textarea>
<button type="submit">发送</button>
</form>
</main>
<footer>
<p>AI Chat App © 2025</p>
</footer>
</body>
</html>
CSS3 布局
css
/* Flexbox 布局 —— 聊天界面 */
.chat-container {
display: flex;
flex-direction: column;
gap: 12px;
max-width: 800px;
margin: 0 auto;
padding: 20px;
height: calc(100vh - 120px);
overflow-y: auto;
}
.message {
max-width: 70%;
padding: 12px 16px;
border-radius: 12px;
line-height: 1.6;
}
.message.user {
align-self: flex-end;
background-color: #007AFF;
color: white;
}
.message.assistant {
align-self: flex-start;
background-color: #F0F0F0;
color: #333;
}
/* Grid 布局 —— 页面整体结构 */
body {
display: grid;
grid-template-rows: 60px 1fr 40px;
min-height: 100vh;
}
/* 响应式设计 */
@media (max-width: 768px) {
.message {
max-width: 90%;
}
.chat-container {
padding: 10px;
}
}
JavaScript 核心
javascript
// ES6+ 基础
const greeting = (name) => `Hello, ${name}!`;
// 解构赋值
const { model, temperature = 0.7 } = config;
const [first, ...rest] = items;
// Promise 与 async/await
async function sendMessage(message) {
try {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message }),
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return await response.json();
} catch (error) {
console.error("请求失败:", error);
}
}
// 流式请求(SSE)—— AI 应用核心
/**
 * Send a message to the streaming chat endpoint and forward each SSE payload.
 *
 * @param {string} message - User message to send.
 * @param {(chunk: string) => void} onChunk - Called with the payload of every
 *   "data: " line, in arrival order.
 * @returns {Promise<void>} Resolves when the "[DONE]" marker arrives or the
 *   stream ends.
 * @throws {Error} When the HTTP response status is not OK.
 */
async function streamChat(message, onChunk) {
  const response = await fetch("/api/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  // Network chunks do not respect line boundaries, so the unfinished tail of
  // each chunk is kept here until the rest of the line arrives.
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps a multi-byte character that is split across two
    // chunks intact instead of decoding it to replacement characters.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop(); // last element is the (possibly empty) partial line
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const data = line.slice(6); // strip the "data: " prefix
      if (data === "[DONE]") return;
      onChunk(data);
    }
  }
}
// DOM 操作
const form = document.getElementById("chat-form");
form.addEventListener("submit", async (e) => {
e.preventDefault();
const input = document.getElementById("input");
const message = input.value.trim();
if (!message) return;
appendMessage("user", message);
input.value = "";
let assistantMsg = appendMessage("assistant", "");
await streamChat(message, (chunk) => {
assistantMsg.textContent += chunk;
});
});
function appendMessage(role, content) {
const container = document.querySelector(".chat-container");
const div = document.createElement("article");
div.className = `message ${role}`;
div.textContent = content;
container.appendChild(div);
container.scrollTop = container.scrollHeight;
return div;
}
2. React 框架(推荐)
核心概念
jsx
// 函数组件 + JSX
function ChatMessage({ role, content }) {
return (
<div className={`message ${role}`}>
<span className="role-label">{role === "user" ? "你" : "AI"}</span>
<p>{content}</p>
</div>
);
}
// Props 与条件渲染
function MessageList({ messages, isLoading }) {
return (
<div className="message-list">
{messages.map((msg, index) => (
<ChatMessage key={index} role={msg.role} content={msg.content} />
))}
{isLoading && <div className="loading">AI 正在思考...</div>}
</div>
);
}
Hooks
jsx
import { useState, useEffect, useRef, useCallback, useMemo } from "react";
function ChatApp() {
// useState:管理组件状态
const [messages, setMessages] = useState([]);
const [input, setInput] = useState("");
const [isLoading, setIsLoading] = useState(false);
// useRef:引用 DOM 元素或保存不触发重渲染的值
const messagesEndRef = useRef(null);
const abortControllerRef = useRef(null);
// useEffect:处理副作用
useEffect(() => {
// 消息更新时自动滚动到底部
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
// useCallback:缓存函数引用,避免子组件不必要的重渲染
const sendMessage = useCallback(async () => {
if (!input.trim() || isLoading) return;
const userMessage = { role: "user", content: input };
setMessages(prev => [...prev, userMessage]);
setInput("");
setIsLoading(true);
try {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message: input }),
});
const data = await response.json();
setMessages(prev => [...prev, { role: "assistant", content: data.reply }]);
} catch (error) {
console.error("发送失败:", error);
} finally {
setIsLoading(false);
}
}, [input, isLoading]);
// useMemo:缓存计算结果
const messageCount = useMemo(() => messages.length, [messages]);
return (
<div className="chat-app">
<header>AI 聊天 ({messageCount} 条消息)</header>
<MessageList messages={messages} isLoading={isLoading} />
<div ref={messagesEndRef} />
<div className="input-area">
<textarea
value={input}
onChange={e => setInput(e.target.value)}
onKeyDown={e => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
}}
placeholder="输入消息..."
/>
<button onClick={sendMessage} disabled={isLoading}>发送</button>
</div>
</div>
);
}
自定义 Hook
jsx
// 封装流式聊天逻辑
/**
 * Custom hook that wraps the streaming chat request.
 *
 * Returns the text received so far (`streamingText`), a flag that is true
 * while a request is running (`isStreaming`), and `startStream(message)`,
 * which resolves with the complete reply text.
 */
function useStreamChat() {
  const [streamingText, setStreamingText] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const startStream = useCallback(async (message) => {
    setIsStreaming(true);
    setStreamingText("");
    // Accumulate locally as well: the `streamingText` captured by this
    // closure never changes, so it cannot be used as the return value.
    let fullText = "";
    try {
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message }),
      });
      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      // Holds the unfinished tail of a chunk until the rest of the line arrives.
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // stream: true keeps multi-byte characters split across chunks intact.
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop();
        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const chunk = line.slice(6); // strip the "data: " prefix
          if (chunk === "[DONE]") continue;
          fullText += chunk;
          setStreamingText(prev => prev + chunk);
        }
      }
      return fullText;
    } finally {
      // Reset the flag even when the request or the stream fails.
      setIsStreaming(false);
    }
  }, []);

  return { streamingText, isStreaming, startStream };
}
AI 应用常用功能
jsx
// Markdown 渲染
import ReactMarkdown from "react-markdown";
import { Prism as SyntaxHighlighter } from "react-syntax-highlighter";
import { oneDark } from "react-syntax-highlighter/dist/esm/styles/prism";
function MarkdownMessage({ content }) {
return (
<ReactMarkdown
components={{
code({ node, inline, className, children, ...props }) {
const match = /language-(\w+)/.exec(className || "");
return !inline && match ? (
<SyntaxHighlighter
style={oneDark}
language={match[1]}
PreTag="div"
{...props}
>
{String(children).replace(/\n$/, "")}
</SyntaxHighlighter>
) : (
<code className={className} {...props}>{children}</code>
);
},
}}
>
{content}
</ReactMarkdown>
);
}
// 文件上传组件
function FileUpload({ onUpload }) {
const [dragActive, setDragActive] = useState(false);
const handleDrop = async (e) => {
e.preventDefault();
setDragActive(false);
const files = Array.from(e.dataTransfer.files);
for (const file of files) {
const formData = new FormData();
formData.append("file", file);
const response = await fetch("/api/upload", {
method: "POST",
body: formData,
});
const data = await response.json();
onUpload(data);
}
};
return (
<div
className={`upload-zone ${dragActive ? "active" : ""}`}
onDragOver={e => { e.preventDefault(); setDragActive(true); }}
onDragLeave={() => setDragActive(false)}
onDrop={handleDrop}
>
拖拽文件到此处上传
</div>
);
}
状态管理与路由
jsx
// React Router
import { BrowserRouter, Routes, Route, Link } from "react-router-dom";
function App() {
return (
<BrowserRouter>
<nav>
<Link to="/">聊天</Link>
<Link to="/history">历史</Link>
<Link to="/settings">设置</Link>
</nav>
<Routes>
<Route path="/" element={<ChatApp />} />
<Route path="/history" element={<HistoryPage />} />
<Route path="/settings" element={<SettingsPage />} />
</Routes>
</BrowserRouter>
);
}
// Context API 全局状态
import { createContext, useContext, useReducer } from "react";
const AppContext = createContext();
/**
 * Reducer for the global application state.
 *
 * "SET_MODEL" replaces `state.model` with the action payload;
 * "ADD_CONVERSATION" appends the payload to `state.conversations`.
 * Any other action type returns the given state object unchanged.
 */
function appReducer(state, action) {
  const { type, payload } = action;

  if (type === "SET_MODEL") {
    return { ...state, model: payload };
  }

  if (type === "ADD_CONVERSATION") {
    const conversations = [...state.conversations, payload];
    return { ...state, conversations };
  }

  return state;
}
function AppProvider({ children }) {
const [state, dispatch] = useReducer(appReducer, {
model: "gpt-4",
conversations: [],
});
return (
<AppContext.Provider value={{ state, dispatch }}>
{children}
</AppContext.Provider>
);
}
function useApp() {
return useContext(AppContext);
}
3. 前端工具链
bash
# 包管理器
npm install # 安装依赖
npm run dev # 启动开发服务器
npx create-vite my-app --template react # 使用 Vite 创建 React 项目
# 常用依赖
npm install react-router-dom # 路由
npm install react-markdown # Markdown 渲染
npm install react-syntax-highlighter # 代码高亮
npm install axios # HTTP 请求库
npm install tailwindcss # CSS 框架
# 代码规范
npm install -D eslint prettier # 代码检查与格式化
Tailwind CSS 快速上手
jsx
// Tailwind CSS 实现聊天界面
function ChatUI() {
return (
<div className="flex flex-col h-screen bg-gray-100">
{/* 头部 */}
<header className="bg-white shadow px-6 py-4">
<h1 className="text-xl font-bold text-gray-800">AI 聊天</h1>
</header>
{/* 消息区域 */}
<main className="flex-1 overflow-y-auto p-6 space-y-4">
<div className="flex justify-end">
<div className="bg-blue-500 text-white px-4 py-2 rounded-lg max-w-[70%]">
你好
</div>
</div>
<div className="flex justify-start">
<div className="bg-white text-gray-800 px-4 py-2 rounded-lg max-w-[70%] shadow">
你好!有什么可以帮助你的吗?
</div>
</div>
</main>
{/* 输入区域 */}
<div className="bg-white border-t px-6 py-4">
<div className="flex gap-3">
<textarea className="flex-1 border rounded-lg px-4 py-2 resize-none focus:outline-none focus:ring-2 focus:ring-blue-500" rows="2" placeholder="输入消息..." />
<button className="bg-blue-500 text-white px-6 py-2 rounded-lg hover:bg-blue-600 transition">
发送
</button>
</div>
</div>
</div>
);
}
四、学习建议
- Python 优先:Python 是 AI 应用的基石,特别是异步编程(asyncio)和装饰器需要熟练掌握
- FastAPI 为主:其异步特性和流式响应能力是 AI 应用后端的最佳选择
- 前端按需选择:React 社区资源更丰富,Vue 上手更快,根据团队技术栈决定
- 边学边做:每个知识点都配合实际代码练习,不要只看不写
- 关注 AI 场景:重点掌握流式输出、文件上传、WebSocket 等 AI 应用高频功能
推荐学习路线
Python 基础语法 → 数据结构 → 面向对象 → 高级特性(装饰器/异步)
↓
FastAPI 基础 → API 设计 → 数据库集成 → 流式响应
↓
HTML/CSS/JS 基础 → React 组件与 Hooks → 聊天界面实战
↓
综合实战:完整的 AI 对话应用(前后端)