第九章 认证与安全
本章目标:掌握 OAuth2 密码模式 + JWT 的完整认证流程,使用 FastAPI 内置安全工具实现用户登录、Token 签发与验证,并实现基于角色的权限控制。
预计时长:50 分钟
9.1 OAuth2 密码模式 + JWT
认证流程概览
┌──────────┐ ┌──────────────┐
│ 客户端 │ 1. POST /token │ FastAPI 服务 │
│(前端/App)│ username + password │ │
│ │ ──────────────────→ │ 验证密码 │
│ │ │ 生成 JWT │
│ │ 2. 返回 access_token │ │
│ │ ←────────────────── │ │
│ │ │ │
│ │ 3. 请求 API │ │
│ │ Authorization: │ 验证 JWT │
│ │ Bearer <token> │ 返回数据 │
│ │ ──────────────────→ │ │
│ │ 4. 返回业务数据 │ │
│ │ ←────────────────── │ │
└──────────┘ └──────────────┘三步走:
| 步骤 | 做什么 | 用什么 |
|---|---|---|
| ① 密码哈希 | 存储用户密码时不存明文,存哈希值 | passlib |
| ② 签发 Token | 用户登录验证通过后,生成 JWT 返回 | python-jose |
| ③ 验证 Token | 后续请求携带 Token,服务端验证有效性 | python-jose |
安装依赖
bash
pip install "python-jose[cryptography]" "passlib[bcrypt]"使用 passlib 做密码哈希
永远不要存储明文密码! 使用 bcrypt 算法对密码进行哈希处理:
python
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""将明文密码转为哈希值(用于注册时存储)"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证明文密码是否匹配哈希值(用于登录时校验)"""
return pwd_context.verify(plain_password, hashed_password)
# 测试
hashed = hash_password("my_secret_123")
print(hashed) # $2b$12$LJ3m4ys... (每次不同)
print(verify_password("my_secret_123", hashed)) # True
print(verify_password("wrong_password", hashed)) # False| 函数 | 用途 | 调用时机 |
|---|---|---|
pwd_context.hash() | 明文 → 哈希 | 用户注册 |
pwd_context.verify() | 比较明文与哈希 | 用户登录 |
使用 python-jose 生成/验证 JWT
JWT(JSON Web Token)由三部分组成:
Header.Payload.Signature
eyJhbGc...|eyJzdWI...|SflKxwR...
↓ ↓ ↓
算法信息 载荷数据 数字签名python
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
SECRET_KEY = "your-secret-key-keep-it-safe"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 测试
token = create_access_token({"sub": "zhangsan", "role": "admin"})
print(token) # eyJhbGciOiJIUzI1NiIs...
payload = decode_access_token(token)
print(payload) # {'sub': 'zhangsan', 'role': 'admin', 'exp': 1700000000}| 参数 | 说明 |
|---|---|
sub(subject) | 标识用户身份,通常是用户名或用户 ID |
exp(expiration) | Token 过期时间,过期后自动失效 |
SECRET_KEY | 签名密钥,绝不能泄露 |
ALGORITHM | 签名算法,HS256 是常用选择 |
9.2 FastAPI 的安全工具
完整认证示例
下面把所有部分整合为一个完整可运行的示例:
python
from datetime import datetime, timedelta, timezone
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
# ============ 配置 ============
SECRET_KEY = "a-very-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ============ 密码工具 ============
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ============ OAuth2 方案声明 ============
# tokenUrl 告诉 Swagger UI 登录表单应该 POST 到哪个接口
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ============ 模拟数据库 ============
fake_users_db = {
"zhangsan": {
"username": "zhangsan",
"hashed_password": pwd_context.hash("123456"),
"role": "admin",
},
"lisi": {
"username": "lisi",
"hashed_password": pwd_context.hash("abcdef"),
"role": "user",
},
}
# ============ 数据模型 ============
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
role: str
# ============ 工具函数 ============
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def authenticate_user(username: str, password: str) -> dict | None:
user = fake_users_db.get(username)
if not user or not verify_password(password, user["hashed_password"]):
return None
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# ============ 核心依赖:获取当前用户 ============
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = fake_users_db.get(username)
if user is None:
raise credentials_exception
return User(username=user["username"], role=user["role"])
# ============ 应用 ============
app = FastAPI(title="认证示例")
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""用户登录,返回 JWT Token"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user["username"], "role": user["role"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
"""获取当前登录用户信息(需要 Token)"""
return current_user关键组件说明
| 组件 | 作用 |
|---|---|
OAuth2PasswordBearer(tokenUrl="token") | 声明认证方案,告诉 Swagger UI 在哪里登录。注入到依赖中时,自动从请求头提取 Authorization: Bearer <token> |
OAuth2PasswordRequestForm | 标准的 OAuth2 登录表单,包含 username 和 password 字段,作为 /token 接口的参数 |
get_current_user | 核心依赖函数。解码 Token → 查找用户 → 返回用户对象。任何需要认证的接口,加上 Depends(get_current_user) 即可 |
认证流程调用链
请求 GET /users/me
↓
Header: Authorization: Bearer eyJhbG...
↓
oauth2_scheme 提取 token 字符串
↓
get_current_user(token) 解码并验证
↓
返回 User 对象注入到路由函数在 Swagger UI 中测试
- 启动服务:
fastapi dev main.py - 打开 http://127.0.0.1:8000/docs
- 点击右上角 "Authorize" 按钮
- 输入
zhangsan/123456,点击登录 - 此后所有请求自动携带 Token
- 测试
GET /users/me接口
9.3 权限控制
基于角色的权限控制
在 get_current_user 基础上,创建角色检查依赖:
python
from functools import wraps
def require_role(required_role: str):
"""角色权限依赖工厂:生成检查特定角色的依赖函数"""
async def role_checker(current_user: User = Depends(get_current_user)) -> User:
if current_user.role != required_role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"权限不足,需要 {required_role} 角色",
)
return current_user
return role_checker
# ============ 使用示例 ============
@app.get("/admin/dashboard")
async def admin_dashboard(user: User = Depends(require_role("admin"))):
"""仅管理员可访问"""
return {"message": f"欢迎管理员 {user.username}", "data": "管理后台敏感数据"}
@app.get("/user/profile")
async def user_profile(user: User = Depends(require_role("user"))):
"""仅普通用户可访问"""
return {"message": f"你好 {user.username}", "profile": "个人资料信息"}
@app.get("/common/info")
async def common_info(user: User = Depends(get_current_user)):
"""任何已登录用户均可访问"""
return {"message": f"你好 {user.username},你的角色是 {user.role}"}权限控制效果
| 接口 | zhangsan(admin) | lisi(user) | 未登录 |
|---|---|---|---|
GET /admin/dashboard | ✅ 200 | ❌ 403 | ❌ 401 |
GET /user/profile | ❌ 403 | ✅ 200 | ❌ 401 |
GET /common/info | ✅ 200 | ✅ 200 | ❌ 401 |
多角色权限
如果需要支持多角色访问,稍作修改:
python
def require_roles(*allowed_roles: str):
"""允许多个角色访问"""
async def role_checker(current_user: User = Depends(get_current_user)) -> User:
if current_user.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"权限不足,需要以下角色之一:{', '.join(allowed_roles)}",
)
return current_user
return role_checker
@app.get("/reports")
async def view_reports(user: User = Depends(require_roles("admin", "manager"))):
"""管理员和经理均可查看报表"""
return {"reports": ["月度报表", "年度报表"]}9.4 动手练习
练习 1:完整认证系统
将上面的代码整合到一个 main.py 中运行,依次完成:
- 启动服务,用 Swagger UI 的 Authorize 按钮登录
zhangsan / 123456 - 请求
GET /users/me,确认返回用户信息 - 请求
GET /admin/dashboard,确认管理员可以访问 - 切换为
lisi / abcdef登录,再请求GET /admin/dashboard,观察 403 错误
练习 2:添加 Token 刷新接口
在现有代码基础上,新增一个 POST /token/refresh 接口:
python
@app.post("/token/refresh", response_model=Token)
async def refresh_token(current_user: User = Depends(get_current_user)):
"""用旧 Token 换新 Token(延长登录有效期)"""
new_token = create_access_token(
data={"sub": current_user.username, "role": current_user.role},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
return Token(access_token=new_token, token_type="bearer")练习 3:用户注册接口
新增 POST /register 接口,接收用户名和密码,将新用户添加到 fake_users_db 中(默认角色为 user)。
提示:
python
class UserCreate(BaseModel):
username: str
password: str
@app.post("/register", response_model=User)
async def register(user_data: UserCreate):
if user_data.username in fake_users_db:
raise HTTPException(status_code=400, detail="用户名已存在")
fake_users_db[user_data.username] = {
"username": user_data.username,
"hashed_password": pwd_context.hash(user_data.password),
"role": "user",
}
return User(username=user_data.username, role="user")本章小结
| 概念 | 要点 |
|---|---|
| 密码哈希 | 使用 passlib + bcrypt,永远不存明文密码 |
| JWT 生成 | python-jose 的 jwt.encode(),包含 sub 和 exp 字段 |
| JWT 验证 | jwt.decode() 自动检查签名和过期时间 |
OAuth2PasswordBearer | 声明认证方案,自动从请求头提取 Bearer Token |
OAuth2PasswordRequestForm | 标准登录表单(username + password) |
get_current_user | 核心认证依赖,解码 Token 并返回用户对象 |
| 权限控制 | 依赖工厂函数 require_role() 实现角色检查 |
| Swagger 集成 | 声明 tokenUrl 后,Swagger UI 自动出现 Authorize 按钮 |
下一章预告:随着代码越来越多,全部塞在一个文件里会难以维护。下一章我们将学习如何用 APIRouter 拆分路由,组织规范的项目结构,并使用 BaseSettings 管理配置。