第七章 异步与性能
本章目标:理解 Python 异步编程的核心概念,掌握 FastAPI 中
async def与def的正确使用场景,了解异步数据库操作方案。预计时长:30 分钟
7.1 async/await 基础回顾
同步 vs 异步
同步执行(Synchronous):任务排队,一个做完再做下一个
任务A ██████████
任务B ██████████
任务C ██████████
总耗时 ─────────────────────────────→ 30s
异步执行(Asynchronous):等待时切换去做别的事
任务A ████░░░░████
任务B ████░░░░████
任务C ████░░░░████
总耗时 ────────────────→ 16s
█ = CPU 计算 ░ = 等待 I/O(网络、磁盘、数据库)核心思想:Web 请求的时间大多花在「等待」上(等数据库返回、等外部 API 响应)。异步编程让 CPU 在等待时去服务其他请求,而不是干等着。
生活类比
| 场景 | 同步做法 | 异步做法 |
|---|---|---|
| 煮咖啡 + 烤面包 | 先煮咖啡 3 分钟,再烤面包 2 分钟 → 共 5 分钟 | 同时启动咖啡机和烤面包机 → 共 3 分钟 |
| 处理 3 个 API 请求 | 每个等数据库 100ms,串行 → 300ms | 3 个并发等待 → 约 100ms |
| 餐厅服务员点餐 | 等一桌做完菜才去下一桌 → 效率极低 | 收完所有桌的单,厨房并行做 → 效率极高 |
Python 的 async/await 语法
import asyncio
def sync_task():
"""同步函数:阻塞整个线程"""
import time
time.sleep(1)
return "sync done"
async def async_task():
"""异步函数(协程):不阻塞,让出控制权"""
await asyncio.sleep(1)
return "async done"
async def main():
"""并发执行多个异步任务"""
results = await asyncio.gather(
async_task(),
async_task(),
async_task(),
)
print(results) # 约 1 秒完成,而非 3 秒
asyncio.run(main())关键概念速查
| 概念 | 说明 |
|---|---|
async def | 定义协程函数,调用它返回协程对象 |
await | 等待协程完成,同时让出控制权给事件循环 |
| 事件循环(Event Loop) | 调度器,决定当前执行哪个协程 |
asyncio.gather() | 并发运行多个协程 |
asyncio.sleep() | 异步版 time.sleep(),不阻塞事件循环 |
async/await 执行流程
事件循环 (Event Loop)
│
├─ 协程A: await asyncio.sleep(1)
│ └─ "我要等 1 秒,先让别人用 CPU"
│ ↓ 控制权交回事件循环
├─ 协程B: await asyncio.sleep(1)
│ └─ "我也要等 1 秒,先让别人用 CPU"
│ ↓ 控制权交回事件循环
├─ 协程C: 执行纯计算...
│ ↓ 计算完毕
│
│ ... 1 秒后 ...
│
├─ 协程A: sleep 结束,继续执行
├─ 协程B: sleep 结束,继续执行
└─ 三个协程全部完成,总耗时 ≈ 1 秒7.2 FastAPI 的异步处理机制
async def vs def:怎么选?
这是 FastAPI 初学者最常见的困惑。决策树:
你的函数内部要做什么?
│
├── 调用 await 异步操作(异步数据库、httpx、aiofiles...)
│ └── 用 async def ✅
│
├── 调用同步阻塞操作(普通数据库驱动、requests、open()...)
│ └── 用普通 def ✅ ← FastAPI 自动放到线程池
│
└── 纯计算,不涉及 I/O
└── 用普通 def 或 async def 都行 ✅| 场景 | 推荐写法 | 原因 |
|---|---|---|
调用 await 异步库 | async def | 充分利用异步,不浪费线程 |
调用 requests.get() 等同步 I/O | def(普通函数) | FastAPI 自动在线程池中运行 |
| 纯计算(字符串处理等) | def 或 async def | 差别不大 |
错误:async def 里调 time.sleep() | ❌ 阻塞整个事件循环 | 用 await asyncio.sleep() 或改普通 def |
代码示例
创建 chapter07_async.py:
from fastapi import FastAPI
import asyncio
import time
app = FastAPI(title="异步示例")
@app.get("/async-io")
async def async_io_example():
"""正确:异步函数中用 await"""
await asyncio.sleep(1)
return {"method": "async", "message": "非阻塞等待 1 秒"}
@app.get("/sync-io")
def sync_io_example():
"""正确:同步函数,FastAPI 自动放到线程池"""
time.sleep(1)
return {"method": "sync", "message": "线程池中等待 1 秒"}
@app.get("/bad-example")
async def bad_example():
"""错误:async 函数里用同步阻塞"""
time.sleep(1) # 阻塞事件循环!其他所有请求都要等
return {"method": "bad", "message": "千万别这样写"}fastapi dev chapter07_async.pyFastAPI 对 def 的线程池处理
请求进入 FastAPI
│
├── 路由函数是 async def
│ └── 直接在事件循环(主线程)中执行
│ → 必须确保内部不会同步阻塞
│
└── 路由函数是普通 def
└── 自动提交到线程池执行
→ 等效于 loop.run_in_executor(None, func)
→ 即使有同步阻塞也不会影响其他请求不确定用哪个?用普通 def 更安全。 FastAPI 会自动处理线程池调度。
I/O 密集型 vs CPU 密集型
| 类型 | 特点 | 示例 | 推荐方案 |
|---|---|---|---|
| I/O 密集型 | 大量时间等外部响应 | 数据库查询、HTTP 请求、文件读写 | async def + 异步库 |
| CPU 密集型 | 大量时间做计算 | 图像处理、数据分析、加密运算 | 多进程 / 任务队列 |
CPU 密集型任务的正确处理方式:
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
app = FastAPI()
def heavy_computation(n: int) -> int:
"""模拟 CPU 密集型任务"""
total = 0
for i in range(n * 1000000):
total += i
return total
@app.get("/compute/threadpool/{n}")
async def compute_with_threadpool(n: int):
"""方案 1:放到线程池(适合中等计算量)"""
result = await run_in_threadpool(heavy_computation, n)
return {"result": result}
@app.get("/compute/sync/{n}")
def compute_sync(n: int):
"""方案 2:直接用普通 def(FastAPI 自动线程池)"""
result = heavy_computation(n)
return {"result": result}生产建议:真正耗时的 CPU 任务(视频转码、大数据处理),推荐用 Celery、RQ 等任务队列异步执行,不要在请求中同步等待。
并发性能对比实验
创建 perf_test.py:
from fastapi import FastAPI
import asyncio
import time
app = FastAPI()
@app.get("/sync")
def sync_endpoint():
time.sleep(0.1)
return {"type": "sync"}
@app.get("/async")
async def async_endpoint():
await asyncio.sleep(0.1)
return {"type": "async"}用以下脚本做基准测试(需先 pip install httpx):
import httpx
import asyncio
import time
async def benchmark(url: str, total: int = 50):
async with httpx.AsyncClient() as client:
start = time.time()
tasks = [client.get(url) for _ in range(total)]
await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"{url} → {total} 请求耗时: {elapsed:.2f}s")
async def main():
print("启动测试(请先启动 perf_test.py)...")
await benchmark("http://127.0.0.1:8000/async")
await benchmark("http://127.0.0.1:8000/sync")
asyncio.run(main())典型结果(50 个并发请求,每个等待 100ms):
| 端点 | 50 请求总耗时 | 说明 |
|---|---|---|
/async | ~0.15s | 所有请求并发等待,接近单次耗时 |
/sync | ~0.65s | 线程池默认 40 线程,需分批执行 |
7.3 异步数据库操作简介
数据库操作是最常见的 I/O 操作。使用异步驱动,可以让查询不阻塞事件循环。
主流异步方案一览
| 方案 | 说明 | 适合场景 |
|---|---|---|
| SQLAlchemy 2.0 async | 主流 ORM 的异步扩展 | 已有 SQLAlchemy 经验,大型项目 |
| SQLModel | FastAPI 作者开发,SQLAlchemy + Pydantic 融合 | FastAPI 项目首选 |
| Tortoise ORM | 受 Django ORM 启发的异步 ORM | 喜欢 Django 风格 |
| databases | 轻量异步数据库库 | 简单查询,不需要完整 ORM |
| asyncpg | PostgreSQL 异步驱动(最快) | 极致性能需求 |
| aiosqlite | SQLite 异步驱动 | 开发测试,小型项目 |
SQLAlchemy 2.0 异步示例(预览)
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select
engine = create_async_engine("sqlite+aiosqlite:///./async_demo.db")
async_session = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
class Item(Base):
__tablename__ = "items"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_items():
async with async_session() as session:
result = await session.execute(select(Item))
return result.scalars().all()Tortoise ORM 示例(预览)
from tortoise import fields, models
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI
app = FastAPI()
class Todo(models.Model):
id = fields.IntField(pk=True)
title = fields.CharField(max_length=200)
done = fields.BooleanField(default=False)
class Meta:
table = "todos"
register_tortoise(
app,
db_url="sqlite://./tortoise_demo.db",
modules={"models": ["__main__"]},
generate_schemas=True,
)
@app.get("/todos")
async def get_todos():
return await Todo.all()同步 vs 异步数据库对比
同步数据库(psycopg2 等)+ 普通 def:
请求1 ████[等DB]████ ← 占用线程池中的一个线程
请求2 ████[等DB]████
→ 受线程池大小限制(默认 40 线程)
异步数据库(asyncpg 等)+ async def:
请求1 ████[等DB]████
请求2 ████[等DB]████
请求3 ████[等DB]████
→ 单线程处理数千并发,不受线程池限制| 维度 | 同步方案 | 异步方案 |
|---|---|---|
| 并发上限 | 受线程池大小限制 | 理论上无上限(受内存限制) |
| 资源占用 | 每连接一个线程 | 单线程多路复用 |
| 代码复杂度 | 简单直观 | 需要理解 async/await |
| 生态成熟度 | 非常成熟 | 日趋成熟 |
| 适合场景 | 中小并发 | 高并发场景 |
下一章我们会用同步 SQLAlchemy 完成一个完整的数据库 CRUD 项目。掌握基础后,切换到异步版本只需改几行配置。
7.4 动手练习
练习 1:验证 async def 中使用 time.sleep() 的危害
创建 async_test.py:
from fastapi import FastAPI
import time
import asyncio
app = FastAPI()
@app.get("/bad/{seconds}")
async def bad_sleep(seconds: int):
"""错误示范:async 中用 time.sleep"""
time.sleep(seconds)
return {"slept": seconds, "method": "time.sleep (BAD)"}
@app.get("/good/{seconds}")
async def good_sleep(seconds: int):
"""正确示范:async 中用 asyncio.sleep"""
await asyncio.sleep(seconds)
return {"slept": seconds, "method": "asyncio.sleep (GOOD)"}fastapi dev async_test.py测试方法——同时在两个浏览器标签页快速打开:
- 先打开
http://127.0.0.1:8000/bad/3,立刻打开http://127.0.0.1:8000/bad/3- 第二个请求要等 6 秒(被第一个阻塞了)
- 先打开
http://127.0.0.1:8000/good/3,立刻打开http://127.0.0.1:8000/good/3- 两个请求都只要 3 秒(并发执行)
练习 2:用 run_in_threadpool 包装同步第三方库
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
import time
app = FastAPI()
def slow_sync_operation() -> str:
"""模拟只有同步版本的第三方库"""
time.sleep(2)
return "操作完成"
@app.get("/safe")
async def safe_endpoint():
"""用 run_in_threadpool 避免阻塞事件循环"""
result = await run_in_threadpool(slow_sync_operation)
return {"result": result}思考题:上面的 /safe 接口如果改成普通 def(去掉 async 和 run_in_threadpool),效果一样吗?为什么?
本章小结
| 概念 | 要点 |
|---|---|
| async/await | 异步编程核心语法,await 让出控制权给事件循环 |
async def 路由 | 直接在事件循环执行,适合调用异步库 |
普通 def 路由 | FastAPI 自动放到线程池,不阻塞事件循环 |
| 黄金法则 | async def 里禁止使用同步阻塞操作 |
| I/O 密集型 | 用 async def + 异步库获得最佳并发 |
| CPU 密集型 | 用 run_in_threadpool、多进程或任务队列 |
| 异步数据库 | SQLAlchemy async、Tortoise ORM、SQLModel 等方案 |
| 安全选择 | 不确定时用普通 def,FastAPI 自动处理线程池 |
下一章预告:我们将使用 SQLAlchemy + FastAPI 完成数据库集成,把之前内存版的 Todo 应用升级为数据库持久化版本,并学习 Repository/Service 分层架构和 Alembic 数据库迁移。