Skip to content

第七章 异步与性能

本章目标:理解 Python 异步编程的核心概念,掌握 FastAPI 中 async defdef 的正确使用场景,了解异步数据库操作方案。

预计时长:30 分钟


7.1 async/await 基础回顾

同步 vs 异步

同步执行(Synchronous):任务排队,一个做完再做下一个
  任务A ██████████
  任务B           ██████████
  任务C                     ██████████
  总耗时 ─────────────────────────────→ 30s

异步执行(Asynchronous):等待时切换去做别的事
  任务A ████░░░░████
  任务B     ████░░░░████
  任务C         ████░░░░████
  总耗时 ────────────────→ 16s

█ = CPU 计算    ░ = 等待 I/O(网络、磁盘、数据库)

核心思想:Web 请求的时间大多花在「等待」上(等数据库返回、等外部 API 响应)。异步编程让 CPU 在等待时去服务其他请求,而不是干等着。

生活类比

场景同步做法异步做法
煮咖啡 + 烤面包先煮咖啡 3 分钟,再烤面包 2 分钟 → 共 5 分钟同时启动咖啡机和烤面包机 → 共 3 分钟
处理 3 个 API 请求每个等数据库 100ms,串行 → 300ms3 个并发等待 → 约 100ms
餐厅服务员点餐等一桌做完菜才去下一桌 → 效率极低收完所有桌的单,厨房并行做 → 效率极高

Python 的 async/await 语法

python
import asyncio


def sync_task():
    """同步函数:阻塞整个线程"""
    import time
    time.sleep(1)
    return "sync done"


async def async_task():
    """异步函数(协程):不阻塞,让出控制权"""
    await asyncio.sleep(1)
    return "async done"


async def main():
    """并发执行多个异步任务"""
    results = await asyncio.gather(
        async_task(),
        async_task(),
        async_task(),
    )
    print(results)  # 约 1 秒完成,而非 3 秒


asyncio.run(main())

关键概念速查

概念说明
async def定义协程函数,调用它返回协程对象
await等待协程完成,同时让出控制权给事件循环
事件循环(Event Loop)调度器,决定当前执行哪个协程
asyncio.gather()并发运行多个协程
asyncio.sleep()异步版 time.sleep(),不阻塞事件循环

async/await 执行流程

事件循环 (Event Loop)

├─ 协程A: await asyncio.sleep(1)
│   └─ "我要等 1 秒,先让别人用 CPU"
│        ↓ 控制权交回事件循环
├─ 协程B: await asyncio.sleep(1)
│   └─ "我也要等 1 秒,先让别人用 CPU"
│        ↓ 控制权交回事件循环
├─ 协程C: 执行纯计算...
│        ↓ 计算完毕

│  ... 1 秒后 ...

├─ 协程A: sleep 结束,继续执行
├─ 协程B: sleep 结束,继续执行
└─ 三个协程全部完成,总耗时 ≈ 1 秒

7.2 FastAPI 的异步处理机制

async def vs def:怎么选?

这是 FastAPI 初学者最常见的困惑。决策树:

你的函数内部要做什么?

├── 调用 await 异步操作(异步数据库、httpx、aiofiles...)
│   └── 用 async def ✅

├── 调用同步阻塞操作(普通数据库驱动、requests、open()...)
│   └── 用普通 def ✅  ← FastAPI 自动放到线程池

└── 纯计算,不涉及 I/O
    └── 用普通 def 或 async def 都行 ✅
场景推荐写法原因
调用 await 异步库async def充分利用异步,不浪费线程
调用 requests.get() 等同步 I/Odef(普通函数)FastAPI 自动在线程池中运行
纯计算(字符串处理等)defasync def差别不大
错误async def 里调 time.sleep()❌ 阻塞整个事件循环await asyncio.sleep() 或改普通 def

代码示例

创建 chapter07_async.py

python
from fastapi import FastAPI
import asyncio
import time

app = FastAPI(title="异步示例")


@app.get("/async-io")
async def async_io_example():
    """正确:异步函数中用 await"""
    await asyncio.sleep(1)
    return {"method": "async", "message": "非阻塞等待 1 秒"}


@app.get("/sync-io")
def sync_io_example():
    """正确:同步函数,FastAPI 自动放到线程池"""
    time.sleep(1)
    return {"method": "sync", "message": "线程池中等待 1 秒"}


@app.get("/bad-example")
async def bad_example():
    """错误:async 函数里用同步阻塞"""
    time.sleep(1)  # 阻塞事件循环!其他所有请求都要等
    return {"method": "bad", "message": "千万别这样写"}
bash
fastapi dev chapter07_async.py

FastAPI 对 def 的线程池处理

请求进入 FastAPI

├── 路由函数是 async def
│   └── 直接在事件循环(主线程)中执行
│       → 必须确保内部不会同步阻塞

└── 路由函数是普通 def
    └── 自动提交到线程池执行
        → 等效于 loop.run_in_executor(None, func)
        → 即使有同步阻塞也不会影响其他请求

不确定用哪个?用普通 def 更安全。 FastAPI 会自动处理线程池调度。

I/O 密集型 vs CPU 密集型

类型特点示例推荐方案
I/O 密集型大量时间等外部响应数据库查询、HTTP 请求、文件读写async def + 异步库
CPU 密集型大量时间做计算图像处理、数据分析、加密运算多进程 / 任务队列

CPU 密集型任务的正确处理方式:

python
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()


def heavy_computation(n: int) -> int:
    """模拟 CPU 密集型任务"""
    total = 0
    for i in range(n * 1000000):
        total += i
    return total


@app.get("/compute/threadpool/{n}")
async def compute_with_threadpool(n: int):
    """方案 1:放到线程池(适合中等计算量)"""
    result = await run_in_threadpool(heavy_computation, n)
    return {"result": result}


@app.get("/compute/sync/{n}")
def compute_sync(n: int):
    """方案 2:直接用普通 def(FastAPI 自动线程池)"""
    result = heavy_computation(n)
    return {"result": result}

生产建议:真正耗时的 CPU 任务(视频转码、大数据处理),推荐用 Celery、RQ 等任务队列异步执行,不要在请求中同步等待。

并发性能对比实验

创建 perf_test.py

python
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()


@app.get("/sync")
def sync_endpoint():
    time.sleep(0.1)
    return {"type": "sync"}


@app.get("/async")
async def async_endpoint():
    await asyncio.sleep(0.1)
    return {"type": "async"}

用以下脚本做基准测试(需先 pip install httpx):

python
import httpx
import asyncio
import time


async def benchmark(url: str, total: int = 50):
    async with httpx.AsyncClient() as client:
        start = time.time()
        tasks = [client.get(url) for _ in range(total)]
        await asyncio.gather(*tasks)
        elapsed = time.time() - start
        print(f"{url}{total} 请求耗时: {elapsed:.2f}s")


async def main():
    print("启动测试(请先启动 perf_test.py)...")
    await benchmark("http://127.0.0.1:8000/async")
    await benchmark("http://127.0.0.1:8000/sync")


asyncio.run(main())

典型结果(50 个并发请求,每个等待 100ms):

端点50 请求总耗时说明
/async~0.15s所有请求并发等待,接近单次耗时
/sync~0.65s线程池默认 40 线程,需分批执行

7.3 异步数据库操作简介

数据库操作是最常见的 I/O 操作。使用异步驱动,可以让查询不阻塞事件循环。

主流异步方案一览

方案说明适合场景
SQLAlchemy 2.0 async主流 ORM 的异步扩展已有 SQLAlchemy 经验,大型项目
SQLModelFastAPI 作者开发,SQLAlchemy + Pydantic 融合FastAPI 项目首选
Tortoise ORM受 Django ORM 启发的异步 ORM喜欢 Django 风格
databases轻量异步数据库库简单查询,不需要完整 ORM
asyncpgPostgreSQL 异步驱动(最快)极致性能需求
aiosqliteSQLite 异步驱动开发测试,小型项目

SQLAlchemy 2.0 异步示例(预览)

python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select

engine = create_async_engine("sqlite+aiosqlite:///./async_demo.db")
async_session = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


class Item(Base):
    __tablename__ = "items"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]


async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def get_items():
    async with async_session() as session:
        result = await session.execute(select(Item))
        return result.scalars().all()

Tortoise ORM 示例(预览)

python
from tortoise import fields, models
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI

app = FastAPI()


class Todo(models.Model):
    id = fields.IntField(pk=True)
    title = fields.CharField(max_length=200)
    done = fields.BooleanField(default=False)

    class Meta:
        table = "todos"


register_tortoise(
    app,
    db_url="sqlite://./tortoise_demo.db",
    modules={"models": ["__main__"]},
    generate_schemas=True,
)


@app.get("/todos")
async def get_todos():
    return await Todo.all()

同步 vs 异步数据库对比

同步数据库(psycopg2 等)+ 普通 def:
  请求1  ████[等DB]████       ← 占用线程池中的一个线程
  请求2      ████[等DB]████
  → 受线程池大小限制(默认 40 线程)

异步数据库(asyncpg 等)+ async def:
  请求1  ████[等DB]████
  请求2    ████[等DB]████
  请求3      ████[等DB]████
  → 单线程处理数千并发,不受线程池限制
维度同步方案异步方案
并发上限受线程池大小限制理论上无上限(受内存限制)
资源占用每连接一个线程单线程多路复用
代码复杂度简单直观需要理解 async/await
生态成熟度非常成熟日趋成熟
适合场景中小并发高并发场景

下一章我们会用同步 SQLAlchemy 完成一个完整的数据库 CRUD 项目。掌握基础后,切换到异步版本只需改几行配置。


7.4 动手练习

练习 1:验证 async def 中使用 time.sleep() 的危害

创建 async_test.py

python
from fastapi import FastAPI
import time
import asyncio

app = FastAPI()


@app.get("/bad/{seconds}")
async def bad_sleep(seconds: int):
    """错误示范:async 中用 time.sleep"""
    time.sleep(seconds)
    return {"slept": seconds, "method": "time.sleep (BAD)"}


@app.get("/good/{seconds}")
async def good_sleep(seconds: int):
    """正确示范:async 中用 asyncio.sleep"""
    await asyncio.sleep(seconds)
    return {"slept": seconds, "method": "asyncio.sleep (GOOD)"}
bash
fastapi dev async_test.py

测试方法——同时在两个浏览器标签页快速打开:

  1. 先打开 http://127.0.0.1:8000/bad/3,立刻打开 http://127.0.0.1:8000/bad/3
    • 第二个请求要等 6 秒(被第一个阻塞了)
  2. 先打开 http://127.0.0.1:8000/good/3,立刻打开 http://127.0.0.1:8000/good/3
    • 两个请求都只要 3 秒(并发执行)

练习 2:用 run_in_threadpool 包装同步第三方库

python
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
import time

app = FastAPI()


def slow_sync_operation() -> str:
    """模拟只有同步版本的第三方库"""
    time.sleep(2)
    return "操作完成"


@app.get("/safe")
async def safe_endpoint():
    """用 run_in_threadpool 避免阻塞事件循环"""
    result = await run_in_threadpool(slow_sync_operation)
    return {"result": result}

思考题:上面的 /safe 接口如果改成普通 def(去掉 asyncrun_in_threadpool),效果一样吗?为什么?


本章小结

概念要点
async/await异步编程核心语法,await 让出控制权给事件循环
async def 路由直接在事件循环执行,适合调用异步库
普通 def 路由FastAPI 自动放到线程池,不阻塞事件循环
黄金法则async def禁止使用同步阻塞操作
I/O 密集型async def + 异步库获得最佳并发
CPU 密集型run_in_threadpool、多进程或任务队列
异步数据库SQLAlchemy async、Tortoise ORM、SQLModel 等方案
安全选择不确定时用普通 def,FastAPI 自动处理线程池

下一章预告:我们将使用 SQLAlchemy + FastAPI 完成数据库集成,把之前内存版的 Todo 应用升级为数据库持久化版本,并学习 Repository/Service 分层架构和 Alembic 数据库迁移。

坚持是一种品格