第八章 数据库集成实战
本章目标:掌握 SQLAlchemy + FastAPI 的集成方法,将第二章的内存版 Todo 升级为数据库持久化版本,理解 Repository/Service 分层架构,了解 Alembic 数据库迁移基础。
预计时长:60 分钟
8.1 SQLAlchemy + FastAPI 集成
安装依赖
pip install sqlalchemy本章使用 SQLite(零配置),生产环境可替换为 PostgreSQL、MySQL 等,只需改连接字符串。
整体架构
FastAPI 应用
│
├── database.py ← 数据库引擎 + 会话工厂
├── models.py ← ORM 模型(对应数据库表)
├── schemas.py ← Pydantic 模型(请求/响应数据结构)
└── main.py ← 路由 + 业务逻辑数据流:
客户端请求 JSON
↓
Pydantic Schema 校验(schemas.py)
↓
路由函数处理业务逻辑(main.py)
↓
SQLAlchemy ORM 操作数据库(models.py)
↓
数据库读写(SQLite / PostgreSQL / MySQL)
↓
ORM 对象 → Pydantic Schema → JSON 响应第一步:配置数据库连接(database.py)
创建 database.py:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
SQLALCHEMY_DATABASE_URL = "sqlite:///./todo.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}, # SQLite 专用参数
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
"""所有 ORM 模型的基类"""
pass各参数说明:
| 参数 | 说明 |
|---|---|
create_engine(url) | 创建数据库引擎,管理连接池 |
check_same_thread=False | 允许 SQLite 跨线程使用(FastAPI 线程池需要) |
sessionmaker(...) | 会话工厂,每次调用生成一个新的数据库会话 |
autocommit=False | 手动控制事务提交 |
autoflush=False | 手动控制数据刷新到数据库 |
DeclarativeBase | SQLAlchemy 2.0 推荐的基类写法 |
连接字符串示例:
- SQLite:
sqlite:///./todo.db- PostgreSQL:
postgresql://user:pass@localhost:5432/dbname- MySQL:
mysql+pymysql://user:pass@localhost:3306/dbname
第二步:定义 ORM 模型(models.py)
创建 models.py:
from sqlalchemy import String, Boolean, Integer, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from database import Base
from datetime import datetime
class Todo(Base):
__tablename__ = "todos"
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
title: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[str | None] = mapped_column(String(500), default=None)
completed: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime, server_default=func.now(), onupdate=func.now()
)ORM 模型 vs Pydantic 模型的区别:
| 维度 | ORM 模型(models.py) | Pydantic 模型(schemas.py) |
|---|---|---|
| 作用 | 映射数据库表结构 | 定义 API 的请求/响应格式 |
| 基类 | DeclarativeBase | BaseModel |
| 字段写法 | mapped_column(...) | Python 类型注解 |
| 包含什么 | 数据库细节(索引、约束、默认值) | 业务数据(校验规则、示例值) |
| 使用场景 | 数据库 CRUD | 接口输入/输出 |
第三步:定义 Pydantic 模型(schemas.py)
创建 schemas.py:
from pydantic import BaseModel, Field
from datetime import datetime
class TodoCreate(BaseModel):
"""创建 Todo 的请求体"""
title: str = Field(..., min_length=1, max_length=200, examples=["学习 FastAPI"])
description: str | None = Field(None, max_length=500, examples=["完成第八章"])
completed: bool = False
class TodoUpdate(BaseModel):
"""更新 Todo 的请求体(所有字段可选)"""
title: str | None = Field(None, min_length=1, max_length=200)
description: str | None = None
completed: bool | None = None
class TodoResponse(BaseModel):
"""返回给客户端的 Todo"""
id: int
title: str
description: str | None
completed: bool
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}关键配置:
from_attributes = True(Pydantic V2 写法)让 Pydantic 能从 ORM 对象的属性读取数据,而不是只从 dict 读取。没有这个配置,TodoResponse.model_validate(orm_obj)会报错。
第四步:创建数据库会话依赖
在 database.py 中添加(也可以单独放到 dependencies.py):
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase, Session
from typing import Generator
SQLALCHEMY_DATABASE_URL = "sqlite:///./todo.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
pass
def get_db() -> Generator[Session, None, None]:
"""数据库会话依赖——用 yield 确保请求结束后自动关闭连接"""
db = SessionLocal()
try:
yield db
finally:
db.close()这个 get_db 就是第五章学过的 yield 依赖:
请求到达
↓
get_db() 创建数据库会话
↓ yield db
路由函数使用 db 执行查询
↓
函数返回响应
↓ finally
db.close() 归还连接8.2 完整 CRUD 实战
Todo 数据库版(main.py)
创建 main.py:
from fastapi import FastAPI, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from database import engine, get_db, Base
from models import Todo
from schemas import TodoCreate, TodoUpdate, TodoResponse
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Todo API - 数据库版")
@app.post("/todos", response_model=TodoResponse, status_code=201)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
db_todo = Todo(
title=todo.title,
description=todo.description,
completed=todo.completed,
)
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo
@app.get("/todos", response_model=list[TodoResponse])
def list_todos(
completed: bool | None = None,
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
db: Session = Depends(get_db),
):
query = db.query(Todo)
if completed is not None:
query = query.filter(Todo.completed == completed)
return query.offset(skip).limit(limit).all()
@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(todo_id: int, db: Session = Depends(get_db)):
todo = db.query(Todo).filter(Todo.id == todo_id).first()
if not todo:
raise HTTPException(status_code=404, detail="Todo 不存在")
return todo
@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(
todo_id: int, todo_update: TodoUpdate, db: Session = Depends(get_db)
):
todo = db.query(Todo).filter(Todo.id == todo_id).first()
if not todo:
raise HTTPException(status_code=404, detail="Todo 不存在")
update_data = todo_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(todo, key, value)
db.commit()
db.refresh(todo)
return todo
@app.delete("/todos/{todo_id}")
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
todo = db.query(Todo).filter(Todo.id == todo_id).first()
if not todo:
raise HTTPException(status_code=404, detail="Todo 不存在")
db.delete(todo)
db.commit()
return {"message": f"Todo {todo_id} 已删除"}启动并测试
fastapi dev main.py启动后自动创建 todo.db 文件。打开 http://127.0.0.1:8000/docs 测试:
| 步骤 | 操作 | 预期结果 |
|---|---|---|
| 1 | POST /todos,body: {"title": "学习 SQLAlchemy"} | 返回带 id、created_at 的完整 Todo |
| 2 | POST /todos,body: {"title": "写单元测试", "completed": true} | 返回 id=2 |
| 3 | GET /todos | 返回 2 条记录 |
| 4 | GET /todos?completed=false | 只返回未完成的 |
| 5 | PUT /todos/1,body: {"completed": true} | 标记完成,updated_at 改变 |
| 6 | DELETE /todos/2 | 删除成功 |
| 7 | 重启服务,GET /todos | 数据依然存在(持久化!) |
与第二章内存版的对比
| 维度 | 第二章(内存版) | 本章(数据库版) |
|---|---|---|
| 数据存储 | Python 字典 | SQLite 数据库文件 |
| 持久化 | 重启丢失 | 永久保存 |
| ID 生成 | 手动 next_id += 1 | 数据库自增主键 |
| 查询能力 | 列表遍历 | SQL 查询(索引、过滤、分页) |
| 并发安全 | 全局变量竞态风险 | 数据库事务保证 |
| 时间记录 | 无 | created_at、updated_at 自动管理 |
CRUD 操作速查
| 操作 | SQLAlchemy 代码 | 说明 |
|---|---|---|
| Create | db.add(obj) → db.commit() → db.refresh(obj) | 添加、提交、刷新获取数据库生成的字段 |
| Read 单条 | db.query(Model).filter(Model.id == id).first() | 返回对象或 None |
| Read 列表 | db.query(Model).offset(n).limit(m).all() | 分页查询 |
| Update | setattr(obj, key, value) → db.commit() | 修改属性后提交 |
| Delete | db.delete(obj) → db.commit() | 删除后提交 |
Repository / Service 分层思路
随着项目增长,把所有逻辑写在路由函数里会导致代码臃肿、难以测试。推荐分层:
请求 → 路由层(Router) → 服务层(Service) → 仓储层(Repository) → 数据库
↓ ↓ ↓
参数解析/校验 业务逻辑编排 纯数据库操作| 层 | 职责 | 示例 |
|---|---|---|
| Router(路由层) | 接收请求、参数校验、调用 Service | 解析请求体、返回响应 |
| Service(服务层) | 业务逻辑、规则校验、组合多个 Repository | "创建 Todo 前检查标题是否重复" |
| Repository(仓储层) | 纯数据库操作、封装 SQL | find_by_id()、create()、delete() |
创建 repository.py:
from sqlalchemy.orm import Session
from models import Todo
from schemas import TodoCreate, TodoUpdate
class TodoRepository:
"""Todo 仓储层——封装所有数据库操作"""
def __init__(self, db: Session):
self.db = db
def get_by_id(self, todo_id: int) -> Todo | None:
return self.db.query(Todo).filter(Todo.id == todo_id).first()
def get_list(
self,
skip: int = 0,
limit: int = 10,
completed: bool | None = None,
) -> list[Todo]:
query = self.db.query(Todo)
if completed is not None:
query = query.filter(Todo.completed == completed)
return query.offset(skip).limit(limit).all()
def create(self, todo_data: TodoCreate) -> Todo:
todo = Todo(
title=todo_data.title,
description=todo_data.description,
completed=todo_data.completed,
)
self.db.add(todo)
self.db.commit()
self.db.refresh(todo)
return todo
def update(self, todo: Todo, todo_data: TodoUpdate) -> Todo:
update_dict = todo_data.model_dump(exclude_unset=True)
for key, value in update_dict.items():
setattr(todo, key, value)
self.db.commit()
self.db.refresh(todo)
return todo
def delete(self, todo: Todo) -> None:
self.db.delete(todo)
self.db.commit()创建 service.py:
from fastapi import HTTPException
from repository import TodoRepository
from schemas import TodoCreate, TodoUpdate
from models import Todo
class TodoService:
"""Todo 服务层——编排业务逻辑"""
def __init__(self, repo: TodoRepository):
self.repo = repo
def get_todo_or_404(self, todo_id: int) -> Todo:
todo = self.repo.get_by_id(todo_id)
if not todo:
raise HTTPException(status_code=404, detail="Todo 不存在")
return todo
def list_todos(
self, skip: int, limit: int, completed: bool | None
) -> list[Todo]:
return self.repo.get_list(skip=skip, limit=limit, completed=completed)
def create_todo(self, todo_data: TodoCreate) -> Todo:
return self.repo.create(todo_data)
def update_todo(self, todo_id: int, todo_data: TodoUpdate) -> Todo:
todo = self.get_todo_or_404(todo_id)
return self.repo.update(todo, todo_data)
def delete_todo(self, todo_id: int) -> dict:
todo = self.get_todo_or_404(todo_id)
self.repo.delete(todo)
return {"message": f"Todo {todo_id} 已删除"}重构后的 main.py(分层版):
from fastapi import FastAPI, Depends, Query
from sqlalchemy.orm import Session
from database import engine, get_db, Base
from repository import TodoRepository
from service import TodoService
from schemas import TodoCreate, TodoUpdate, TodoResponse
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Todo API - 分层架构版")
def get_todo_service(db: Session = Depends(get_db)) -> TodoService:
"""依赖注入:组装 Service"""
repo = TodoRepository(db)
return TodoService(repo)
@app.post("/todos", response_model=TodoResponse, status_code=201)
def create_todo(
todo: TodoCreate,
svc: TodoService = Depends(get_todo_service),
):
return svc.create_todo(todo)
@app.get("/todos", response_model=list[TodoResponse])
def list_todos(
completed: bool | None = None,
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
svc: TodoService = Depends(get_todo_service),
):
return svc.list_todos(skip=skip, limit=limit, completed=completed)
@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(
todo_id: int,
svc: TodoService = Depends(get_todo_service),
):
return svc.get_todo_or_404(todo_id)
@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(
todo_id: int,
todo_update: TodoUpdate,
svc: TodoService = Depends(get_todo_service),
):
return svc.update_todo(todo_id, todo_update)
@app.delete("/todos/{todo_id}")
def delete_todo(
todo_id: int,
svc: TodoService = Depends(get_todo_service),
):
return svc.delete_todo(todo_id)分层的好处:
不分层(全写在路由里): 分层后:
@app.post("/todos") @app.post("/todos")
def create(todo, db): def create(todo, svc):
# 校验逻辑 return svc.create(todo)
# 业务规则 ↑ 路由层只做请求/响应
# 数据库操作
# 错误处理 Service: 业务逻辑
# 全混在一起 Repository: 数据库操作
# 难以测试 → 各司其职,容易测试8.3 数据库迁移简介
为什么需要数据库迁移?
前面我们用 Base.metadata.create_all() 创建表,但它有一个致命问题:
第一次启动 → 创建 todos 表(id, title, completed) ✅ 正常
↓
需求变更:给 Todo 加一个 priority 字段
↓
再次启动 → create_all() 发现表已存在,什么都不做 ❌ 字段没加上!create_all() 只能创建不存在的表,不能修改已有表结构。这就是数据库迁移工具的用武之地。
Alembic 简介
Alembic 是 SQLAlchemy 官方推荐的数据库迁移工具,类似于 Django 的 makemigrations / migrate。
Alembic 核心流程:
1. init → 初始化迁移环境
2. revision → 生成迁移脚本(记录表结构变更)
3. upgrade → 执行迁移(应用变更到数据库)
4. downgrade → 回滚迁移(撤销变更)
类比 Git:
revision ≈ git commit 记录一次变更
upgrade ≈ git pull 应用变更
downgrade ≈ git revert 撤销变更安装
pip install alembic第一步:初始化
alembic init alembic生成的目录结构:
项目目录/
├── alembic/
│ ├── versions/ ← 迁移脚本存放目录
│ ├── env.py ← 迁移环境配置(重点修改)
│ ├── script.py.mako ← 迁移脚本模板
│ └── README
├── alembic.ini ← Alembic 主配置文件
├── database.py
├── models.py
└── main.py第二步:配置 Alembic
修改 alembic.ini,设置数据库连接:
# 找到这行,改为你的数据库地址
sqlalchemy.url = sqlite:///./todo.db修改 alembic/env.py,让 Alembic 能发现你的 ORM 模型:
# 在 env.py 顶部添加
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
from models import Todo # noqa: F401 确保模型被导入
from database import Base
# 找到这行,改为:
target_metadata = Base.metadata第三步:生成迁移脚本
alembic revision --autogenerate -m "create todos table"Alembic 会对比当前 ORM 模型和数据库实际结构,自动生成迁移脚本。在 alembic/versions/ 下会生成类似这样的文件:
"""create todos table
Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2026-03-24 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'a1b2c3d4e5f6'
down_revision = None
def upgrade() -> None:
op.create_table(
'todos',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(length=200), nullable=False),
sa.Column('description', sa.String(length=500), nullable=True),
sa.Column('completed', sa.Boolean(), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)')),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)')),
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_todos_id'), 'todos', ['id'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_todos_id'), table_name='todos')
op.drop_table('todos')第四步:执行迁移
# 升级到最新版本
alembic upgrade head
# 查看当前版本
alembic current
# 查看迁移历史
alembic history后续迭代流程
假设需要给 Todo 加一个 priority 字段:
# 1. 修改 models.py,添加字段
class Todo(Base):
__tablename__ = "todos"
# ... 原有字段 ...
priority: Mapped[int] = mapped_column(Integer, default=0)# 2. 生成迁移脚本
alembic revision --autogenerate -m "add priority to todos"
# 3. 执行迁移
alembic upgrade head# 如果需要回滚
alembic downgrade -1 # 回退一个版本Alembic 常用命令速查
| 命令 | 说明 |
|---|---|
alembic init alembic | 初始化迁移环境 |
alembic revision --autogenerate -m "描述" | 自动生成迁移脚本 |
alembic upgrade head | 升级到最新版本 |
alembic downgrade -1 | 回退一个版本 |
alembic current | 查看当前数据库版本 |
alembic history | 查看迁移历史 |
alembic heads | 查看最新的迁移版本 |
使用 Alembic 后的项目启动方式
# main.py 中不再需要 create_all
# Base.metadata.create_all(bind=engine) ← 删除这行
# 改用 Alembic 管理表结构:
# 首次部署: alembic upgrade head
# 后续迭代: alembic revision --autogenerate → alembic upgrade head8.4 动手练习
练习 1:给 Todo 增加搜索功能
在 main.py 中新增一个按标题关键字搜索的接口:
@app.get("/todos/search/", response_model=list[TodoResponse])
def search_todos(
keyword: str = Query(..., min_length=1, description="搜索关键字"),
db: Session = Depends(get_db),
):
"""按标题模糊搜索 Todo"""
todos = db.query(Todo).filter(Todo.title.contains(keyword)).all()
return todos测试:先创建几条 Todo,然后访问 GET /todos/search/?keyword=学习,看看能否正确返回。
练习 2:添加统计接口
from sqlalchemy import func as sa_func
@app.get("/todos/stats/")
def todo_stats(db: Session = Depends(get_db)):
"""返回 Todo 的统计信息"""
total = db.query(sa_func.count(Todo.id)).scalar()
completed = db.query(sa_func.count(Todo.id)).filter(
Todo.completed == True
).scalar()
return {
"total": total,
"completed": completed,
"pending": total - completed,
"completion_rate": f"{(completed / total * 100):.1f}%" if total > 0 else "0%",
}注意:搜索和统计接口的路径
/todos/search/、/todos/stats/必须写在/todos/{todo_id}前面,否则search和stats会被当作todo_id参数(第二章路径顺序问题)。
练习 3:尝试完整的 Alembic 流程
- 删除已有的
todo.db - 执行
alembic init alembic初始化 - 配置
alembic.ini和alembic/env.py - 执行
alembic revision --autogenerate -m "init"生成初始迁移 - 执行
alembic upgrade head创建表 - 启动 FastAPI,创建几条 Todo 测试
- 给
models.py的Todo加一个priority: Mapped[int]字段 - 执行
alembic revision --autogenerate -m "add priority" - 执行
alembic upgrade head应用迁移 - 验证新字段已生效
本章小结
| 概念 | 要点 |
|---|---|
| SQLAlchemy 集成 | engine 管连接池,SessionLocal 管会话,Base 管模型基类 |
| ORM 模型 | 继承 Base,用 Mapped + mapped_column 定义字段 |
| Pydantic 模型 | from_attributes = True 让 Pydantic 能读取 ORM 对象属性 |
| 数据库会话依赖 | yield 依赖确保每个请求用完自动关闭连接 |
| CRUD 操作 | add → commit → refresh(创建);query.filter().first()(查询) |
| 分层架构 | Router(路由)→ Service(业务)→ Repository(数据库),各司其职 |
| Alembic | revision --autogenerate 生成迁移,upgrade head 应用,downgrade 回滚 |
| 最佳实践 | 生产环境用 Alembic 管理表结构,不要用 create_all() |
下一章预告:我们将学习认证与安全——使用 OAuth2 + JWT 实现用户登录,并通过依赖注入实现接口级别的权限控制。