Skip to content

第八章 数据库集成实战

本章目标:掌握 SQLAlchemy + FastAPI 的集成方法,将第二章的内存版 Todo 升级为数据库持久化版本,理解 Repository/Service 分层架构,了解 Alembic 数据库迁移基础。

预计时长:60 分钟


8.1 SQLAlchemy + FastAPI 集成

安装依赖

bash
pip install sqlalchemy

本章使用 SQLite(零配置),生产环境可替换为 PostgreSQL、MySQL 等,只需改连接字符串。

整体架构

FastAPI 应用

├── database.py       ← 数据库引擎 + 会话工厂
├── models.py         ← ORM 模型(对应数据库表)
├── schemas.py        ← Pydantic 模型(请求/响应数据结构)
└── main.py           ← 路由 + 业务逻辑

数据流:

客户端请求 JSON

Pydantic Schema 校验(schemas.py)

路由函数处理业务逻辑(main.py)

SQLAlchemy ORM 操作数据库(models.py)

数据库读写(SQLite / PostgreSQL / MySQL)

ORM 对象 → Pydantic Schema → JSON 响应

第一步:配置数据库连接(database.py)

创建 database.py

python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

SQLALCHEMY_DATABASE_URL = "sqlite:///./todo.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},  # SQLite 专用参数
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    """所有 ORM 模型的基类"""
    pass

各参数说明:

参数说明
create_engine(url)创建数据库引擎,管理连接池
check_same_thread=False允许 SQLite 跨线程使用(FastAPI 线程池需要)
sessionmaker(...)会话工厂,每次调用生成一个新的数据库会话
autocommit=False手动控制事务提交
autoflush=False手动控制数据刷新到数据库
DeclarativeBaseSQLAlchemy 2.0 推荐的基类写法

连接字符串示例

  • SQLite:sqlite:///./todo.db
  • PostgreSQL:postgresql://user:pass@localhost:5432/dbname
  • MySQL:mysql+pymysql://user:pass@localhost:3306/dbname

第二步:定义 ORM 模型(models.py)

创建 models.py

python
from sqlalchemy import String, Boolean, Integer, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from database import Base
from datetime import datetime


class Todo(Base):
    __tablename__ = "todos"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[str | None] = mapped_column(String(500), default=None)
    completed: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), onupdate=func.now()
    )

ORM 模型 vs Pydantic 模型的区别:

维度ORM 模型(models.py)Pydantic 模型(schemas.py)
作用映射数据库表结构定义 API 的请求/响应格式
基类DeclarativeBaseBaseModel
字段写法mapped_column(...)Python 类型注解
包含什么数据库细节(索引、约束、默认值)业务数据(校验规则、示例值)
使用场景数据库 CRUD接口输入/输出

第三步:定义 Pydantic 模型(schemas.py)

创建 schemas.py

python
from pydantic import BaseModel, Field
from datetime import datetime


class TodoCreate(BaseModel):
    """创建 Todo 的请求体"""
    title: str = Field(..., min_length=1, max_length=200, examples=["学习 FastAPI"])
    description: str | None = Field(None, max_length=500, examples=["完成第八章"])
    completed: bool = False


class TodoUpdate(BaseModel):
    """更新 Todo 的请求体(所有字段可选)"""
    title: str | None = Field(None, min_length=1, max_length=200)
    description: str | None = None
    completed: bool | None = None


class TodoResponse(BaseModel):
    """返回给客户端的 Todo"""
    id: int
    title: str
    description: str | None
    completed: bool
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}

关键配置from_attributes = True(Pydantic V2 写法)让 Pydantic 能从 ORM 对象的属性读取数据,而不是只从 dict 读取。没有这个配置,TodoResponse.model_validate(orm_obj) 会报错。

第四步:创建数据库会话依赖

database.py 中添加(也可以单独放到 dependencies.py):

python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase, Session
from typing import Generator

SQLALCHEMY_DATABASE_URL = "sqlite:///./todo.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    pass


def get_db() -> Generator[Session, None, None]:
    """数据库会话依赖——用 yield 确保请求结束后自动关闭连接"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

这个 get_db 就是第五章学过的 yield 依赖:

请求到达

get_db() 创建数据库会话
  ↓ yield db
路由函数使用 db 执行查询

函数返回响应
  ↓ finally
db.close() 归还连接

8.2 完整 CRUD 实战

Todo 数据库版(main.py)

创建 main.py

python
from fastapi import FastAPI, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from database import engine, get_db, Base
from models import Todo
from schemas import TodoCreate, TodoUpdate, TodoResponse

Base.metadata.create_all(bind=engine)

app = FastAPI(title="Todo API - 数据库版")


@app.post("/todos", response_model=TodoResponse, status_code=201)
def create_todo(todo: TodoCreate, db: Session = Depends(get_db)):
    db_todo = Todo(
        title=todo.title,
        description=todo.description,
        completed=todo.completed,
    )
    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo


@app.get("/todos", response_model=list[TodoResponse])
def list_todos(
    completed: bool | None = None,
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    db: Session = Depends(get_db),
):
    query = db.query(Todo)
    if completed is not None:
        query = query.filter(Todo.completed == completed)
    return query.offset(skip).limit(limit).all()


@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(todo_id: int, db: Session = Depends(get_db)):
    todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if not todo:
        raise HTTPException(status_code=404, detail="Todo 不存在")
    return todo


@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(
    todo_id: int, todo_update: TodoUpdate, db: Session = Depends(get_db)
):
    todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if not todo:
        raise HTTPException(status_code=404, detail="Todo 不存在")

    update_data = todo_update.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(todo, key, value)

    db.commit()
    db.refresh(todo)
    return todo


@app.delete("/todos/{todo_id}")
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
    todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if not todo:
        raise HTTPException(status_code=404, detail="Todo 不存在")

    db.delete(todo)
    db.commit()
    return {"message": f"Todo {todo_id} 已删除"}

启动并测试

bash
fastapi dev main.py

启动后自动创建 todo.db 文件。打开 http://127.0.0.1:8000/docs 测试:

步骤操作预期结果
1POST /todos,body: {"title": "学习 SQLAlchemy"}返回带 idcreated_at 的完整 Todo
2POST /todos,body: {"title": "写单元测试", "completed": true}返回 id=2
3GET /todos返回 2 条记录
4GET /todos?completed=false只返回未完成的
5PUT /todos/1,body: {"completed": true}标记完成,updated_at 改变
6DELETE /todos/2删除成功
7重启服务,GET /todos数据依然存在(持久化!)

与第二章内存版的对比

维度第二章(内存版)本章(数据库版)
数据存储Python 字典SQLite 数据库文件
持久化重启丢失永久保存
ID 生成手动 next_id += 1数据库自增主键
查询能力列表遍历SQL 查询(索引、过滤、分页)
并发安全全局变量竞态风险数据库事务保证
时间记录created_atupdated_at 自动管理

CRUD 操作速查

操作SQLAlchemy 代码说明
Createdb.add(obj)db.commit()db.refresh(obj)添加、提交、刷新获取数据库生成的字段
Read 单条db.query(Model).filter(Model.id == id).first()返回对象或 None
Read 列表db.query(Model).offset(n).limit(m).all()分页查询
Updatesetattr(obj, key, value)db.commit()修改属性后提交
Deletedb.delete(obj)db.commit()删除后提交

Repository / Service 分层思路

随着项目增长,把所有逻辑写在路由函数里会导致代码臃肿、难以测试。推荐分层:

请求 → 路由层(Router) → 服务层(Service) → 仓储层(Repository) → 数据库
         ↓                  ↓                  ↓
    参数解析/校验       业务逻辑编排        纯数据库操作
职责示例
Router(路由层)接收请求、参数校验、调用 Service解析请求体、返回响应
Service(服务层)业务逻辑、规则校验、组合多个 Repository"创建 Todo 前检查标题是否重复"
Repository(仓储层)纯数据库操作、封装 SQLfind_by_id()create()delete()

创建 repository.py

python
from sqlalchemy.orm import Session
from models import Todo
from schemas import TodoCreate, TodoUpdate


class TodoRepository:
    """Todo 仓储层——封装所有数据库操作"""

    def __init__(self, db: Session):
        self.db = db

    def get_by_id(self, todo_id: int) -> Todo | None:
        return self.db.query(Todo).filter(Todo.id == todo_id).first()

    def get_list(
        self,
        skip: int = 0,
        limit: int = 10,
        completed: bool | None = None,
    ) -> list[Todo]:
        query = self.db.query(Todo)
        if completed is not None:
            query = query.filter(Todo.completed == completed)
        return query.offset(skip).limit(limit).all()

    def create(self, todo_data: TodoCreate) -> Todo:
        todo = Todo(
            title=todo_data.title,
            description=todo_data.description,
            completed=todo_data.completed,
        )
        self.db.add(todo)
        self.db.commit()
        self.db.refresh(todo)
        return todo

    def update(self, todo: Todo, todo_data: TodoUpdate) -> Todo:
        update_dict = todo_data.model_dump(exclude_unset=True)
        for key, value in update_dict.items():
            setattr(todo, key, value)
        self.db.commit()
        self.db.refresh(todo)
        return todo

    def delete(self, todo: Todo) -> None:
        self.db.delete(todo)
        self.db.commit()

创建 service.py

python
from fastapi import HTTPException
from repository import TodoRepository
from schemas import TodoCreate, TodoUpdate
from models import Todo


class TodoService:
    """Todo 服务层——编排业务逻辑"""

    def __init__(self, repo: TodoRepository):
        self.repo = repo

    def get_todo_or_404(self, todo_id: int) -> Todo:
        todo = self.repo.get_by_id(todo_id)
        if not todo:
            raise HTTPException(status_code=404, detail="Todo 不存在")
        return todo

    def list_todos(
        self, skip: int, limit: int, completed: bool | None
    ) -> list[Todo]:
        return self.repo.get_list(skip=skip, limit=limit, completed=completed)

    def create_todo(self, todo_data: TodoCreate) -> Todo:
        return self.repo.create(todo_data)

    def update_todo(self, todo_id: int, todo_data: TodoUpdate) -> Todo:
        todo = self.get_todo_or_404(todo_id)
        return self.repo.update(todo, todo_data)

    def delete_todo(self, todo_id: int) -> dict:
        todo = self.get_todo_or_404(todo_id)
        self.repo.delete(todo)
        return {"message": f"Todo {todo_id} 已删除"}

重构后的 main.py(分层版):

python
from fastapi import FastAPI, Depends, Query
from sqlalchemy.orm import Session

from database import engine, get_db, Base
from repository import TodoRepository
from service import TodoService
from schemas import TodoCreate, TodoUpdate, TodoResponse

Base.metadata.create_all(bind=engine)

app = FastAPI(title="Todo API - 分层架构版")


def get_todo_service(db: Session = Depends(get_db)) -> TodoService:
    """依赖注入:组装 Service"""
    repo = TodoRepository(db)
    return TodoService(repo)


@app.post("/todos", response_model=TodoResponse, status_code=201)
def create_todo(
    todo: TodoCreate,
    svc: TodoService = Depends(get_todo_service),
):
    return svc.create_todo(todo)


@app.get("/todos", response_model=list[TodoResponse])
def list_todos(
    completed: bool | None = None,
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    svc: TodoService = Depends(get_todo_service),
):
    return svc.list_todos(skip=skip, limit=limit, completed=completed)


@app.get("/todos/{todo_id}", response_model=TodoResponse)
def get_todo(
    todo_id: int,
    svc: TodoService = Depends(get_todo_service),
):
    return svc.get_todo_or_404(todo_id)


@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_todo(
    todo_id: int,
    todo_update: TodoUpdate,
    svc: TodoService = Depends(get_todo_service),
):
    return svc.update_todo(todo_id, todo_update)


@app.delete("/todos/{todo_id}")
def delete_todo(
    todo_id: int,
    svc: TodoService = Depends(get_todo_service),
):
    return svc.delete_todo(todo_id)

分层的好处:

不分层(全写在路由里):            分层后:
                                  
@app.post("/todos")               @app.post("/todos")
def create(todo, db):             def create(todo, svc):
    # 校验逻辑                        return svc.create(todo)
    # 业务规则                        ↑ 路由层只做请求/响应
    # 数据库操作                   
    # 错误处理                     Service: 业务逻辑
    # 全混在一起                   Repository: 数据库操作
    # 难以测试                     → 各司其职,容易测试

8.3 数据库迁移简介

为什么需要数据库迁移?

前面我们用 Base.metadata.create_all() 创建表,但它有一个致命问题:

第一次启动 → 创建 todos 表(id, title, completed)         ✅ 正常

需求变更:给 Todo 加一个 priority 字段

再次启动 → create_all() 发现表已存在,什么都不做           ❌ 字段没加上!

create_all() 只能创建不存在的表,不能修改已有表结构。这就是数据库迁移工具的用武之地。

Alembic 简介

Alembic 是 SQLAlchemy 官方推荐的数据库迁移工具,类似于 Django 的 makemigrations / migrate

Alembic 核心流程:

1. init       → 初始化迁移环境
2. revision   → 生成迁移脚本(记录表结构变更)
3. upgrade    → 执行迁移(应用变更到数据库)
4. downgrade  → 回滚迁移(撤销变更)

类比 Git:
  revision  ≈ git commit    记录一次变更
  upgrade   ≈ git pull      应用变更
  downgrade ≈ git revert    撤销变更

安装

bash
pip install alembic

第一步:初始化

bash
alembic init alembic

生成的目录结构:

项目目录/
├── alembic/
│   ├── versions/        ← 迁移脚本存放目录
│   ├── env.py           ← 迁移环境配置(重点修改)
│   ├── script.py.mako   ← 迁移脚本模板
│   └── README
├── alembic.ini          ← Alembic 主配置文件
├── database.py
├── models.py
└── main.py

第二步:配置 Alembic

修改 alembic.ini,设置数据库连接:

ini
# 找到这行,改为你的数据库地址
sqlalchemy.url = sqlite:///./todo.db

修改 alembic/env.py,让 Alembic 能发现你的 ORM 模型:

python
# 在 env.py 顶部添加
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).resolve().parents[1]))

from models import Todo  # noqa: F401  确保模型被导入
from database import Base

# 找到这行,改为:
target_metadata = Base.metadata

第三步:生成迁移脚本

bash
alembic revision --autogenerate -m "create todos table"

Alembic 会对比当前 ORM 模型和数据库实际结构,自动生成迁移脚本。在 alembic/versions/ 下会生成类似这样的文件:

python
"""create todos table

Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2026-03-24 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'a1b2c3d4e5f6'
down_revision = None


def upgrade() -> None:
    op.create_table(
        'todos',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=200), nullable=False),
        sa.Column('description', sa.String(length=500), nullable=True),
        sa.Column('completed', sa.Boolean(), nullable=False),
        sa.Column('created_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)')),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.text('(CURRENT_TIMESTAMP)')),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(op.f('ix_todos_id'), 'todos', ['id'], unique=False)


def downgrade() -> None:
    op.drop_index(op.f('ix_todos_id'), table_name='todos')
    op.drop_table('todos')

第四步:执行迁移

bash
# 升级到最新版本
alembic upgrade head

# 查看当前版本
alembic current

# 查看迁移历史
alembic history

后续迭代流程

假设需要给 Todo 加一个 priority 字段:

python
# 1. 修改 models.py,添加字段
class Todo(Base):
    __tablename__ = "todos"
    # ... 原有字段 ...
    priority: Mapped[int] = mapped_column(Integer, default=0)
bash
# 2. 生成迁移脚本
alembic revision --autogenerate -m "add priority to todos"

# 3. 执行迁移
alembic upgrade head
bash
# 如果需要回滚
alembic downgrade -1    # 回退一个版本

Alembic 常用命令速查

命令说明
alembic init alembic初始化迁移环境
alembic revision --autogenerate -m "描述"自动生成迁移脚本
alembic upgrade head升级到最新版本
alembic downgrade -1回退一个版本
alembic current查看当前数据库版本
alembic history查看迁移历史
alembic heads查看最新的迁移版本

使用 Alembic 后的项目启动方式

python
# main.py 中不再需要 create_all
# Base.metadata.create_all(bind=engine)  ← 删除这行

# 改用 Alembic 管理表结构:
#   首次部署: alembic upgrade head
#   后续迭代: alembic revision --autogenerate → alembic upgrade head

8.4 动手练习

练习 1:给 Todo 增加搜索功能

main.py 中新增一个按标题关键字搜索的接口:

python
@app.get("/todos/search/", response_model=list[TodoResponse])
def search_todos(
    keyword: str = Query(..., min_length=1, description="搜索关键字"),
    db: Session = Depends(get_db),
):
    """按标题模糊搜索 Todo"""
    todos = db.query(Todo).filter(Todo.title.contains(keyword)).all()
    return todos

测试:先创建几条 Todo,然后访问 GET /todos/search/?keyword=学习,看看能否正确返回。

练习 2:添加统计接口

python
from sqlalchemy import func as sa_func


@app.get("/todos/stats/")
def todo_stats(db: Session = Depends(get_db)):
    """返回 Todo 的统计信息"""
    total = db.query(sa_func.count(Todo.id)).scalar()
    completed = db.query(sa_func.count(Todo.id)).filter(
        Todo.completed == True
    ).scalar()
    return {
        "total": total,
        "completed": completed,
        "pending": total - completed,
        "completion_rate": f"{(completed / total * 100):.1f}%" if total > 0 else "0%",
    }

注意:搜索和统计接口的路径 /todos/search//todos/stats/ 必须写在 /todos/{todo_id} 前面,否则 searchstats 会被当作 todo_id 参数(第二章路径顺序问题)。

练习 3:尝试完整的 Alembic 流程

  1. 删除已有的 todo.db
  2. 执行 alembic init alembic 初始化
  3. 配置 alembic.inialembic/env.py
  4. 执行 alembic revision --autogenerate -m "init" 生成初始迁移
  5. 执行 alembic upgrade head 创建表
  6. 启动 FastAPI,创建几条 Todo 测试
  7. models.pyTodo 加一个 priority: Mapped[int] 字段
  8. 执行 alembic revision --autogenerate -m "add priority"
  9. 执行 alembic upgrade head 应用迁移
  10. 验证新字段已生效

本章小结

概念要点
SQLAlchemy 集成engine 管连接池,SessionLocal 管会话,Base 管模型基类
ORM 模型继承 Base,用 Mapped + mapped_column 定义字段
Pydantic 模型from_attributes = True 让 Pydantic 能读取 ORM 对象属性
数据库会话依赖yield 依赖确保每个请求用完自动关闭连接
CRUD 操作addcommitrefresh(创建);query.filter().first()(查询)
分层架构Router(路由)→ Service(业务)→ Repository(数据库),各司其职
Alembicrevision --autogenerate 生成迁移,upgrade head 应用,downgrade 回滚
最佳实践生产环境用 Alembic 管理表结构,不要用 create_all()

下一章预告:我们将学习认证与安全——使用 OAuth2 + JWT 实现用户登录,并通过依赖注入实现接口级别的权限控制。

坚持是一种品格