Skip to content

第九章 认证与安全

本章目标:掌握 OAuth2 密码模式 + JWT 的完整认证流程,使用 FastAPI 内置安全工具实现用户登录、Token 签发与验证,并实现基于角色的权限控制。

预计时长:50 分钟


9.1 OAuth2 密码模式 + JWT

认证流程概览

┌──────────┐                      ┌──────────────┐
│  客户端   │  1. POST /token      │  FastAPI 服务 │
│(前端/App)│  username + password  │              │
│          │ ──────────────────→  │  验证密码     │
│          │                      │  生成 JWT     │
│          │  2. 返回 access_token │              │
│          │ ←────────────────── │              │
│          │                      │              │
│          │  3. 请求 API          │              │
│          │  Authorization:      │  验证 JWT     │
│          │  Bearer <token>      │  返回数据     │
│          │ ──────────────────→  │              │
│          │  4. 返回业务数据      │              │
│          │ ←────────────────── │              │
└──────────┘                      └──────────────┘

三步走

步骤做什么用什么
① 密码哈希存储用户密码时不存明文,存哈希值passlib
② 签发 Token用户登录验证通过后,生成 JWT 返回python-jose
③ 验证 Token后续请求携带 Token,服务端验证有效性python-jose

安装依赖

bash
pip install "python-jose[cryptography]" "passlib[bcrypt]"

使用 passlib 做密码哈希

永远不要存储明文密码! 使用 bcrypt 算法对密码进行哈希处理:

python
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """将明文密码转为哈希值(用于注册时存储)"""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证明文密码是否匹配哈希值(用于登录时校验)"""
    return pwd_context.verify(plain_password, hashed_password)


# 测试
hashed = hash_password("my_secret_123")
print(hashed)  # $2b$12$LJ3m4ys... (每次不同)
print(verify_password("my_secret_123", hashed))  # True
print(verify_password("wrong_password", hashed))  # False
函数用途调用时机
pwd_context.hash()明文 → 哈希用户注册
pwd_context.verify()比较明文与哈希用户登录

使用 python-jose 生成/验证 JWT

JWT(JSON Web Token)由三部分组成:

Header.Payload.Signature
eyJhbGc...|eyJzdWI...|SflKxwR...
   ↓           ↓          ↓
  算法信息    载荷数据    数字签名
python
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError

SECRET_KEY = "your-secret-key-keep-it-safe"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_access_token(token: str) -> dict:
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])


# 测试
token = create_access_token({"sub": "zhangsan", "role": "admin"})
print(token)  # eyJhbGciOiJIUzI1NiIs...

payload = decode_access_token(token)
print(payload)  # {'sub': 'zhangsan', 'role': 'admin', 'exp': 1700000000}
参数说明
sub(subject)标识用户身份,通常是用户名或用户 ID
exp(expiration)Token 过期时间,过期后自动失效
SECRET_KEY签名密钥,绝不能泄露
ALGORITHM签名算法,HS256 是常用选择

9.2 FastAPI 的安全工具

完整认证示例

下面把所有部分整合为一个完整可运行的示例:

python
from datetime import datetime, timedelta, timezone

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel

# ============ 配置 ============
SECRET_KEY = "a-very-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ============ 密码工具 ============
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# ============ OAuth2 方案声明 ============
# tokenUrl 告诉 Swagger UI 登录表单应该 POST 到哪个接口
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# ============ 模拟数据库 ============
fake_users_db = {
    "zhangsan": {
        "username": "zhangsan",
        "hashed_password": pwd_context.hash("123456"),
        "role": "admin",
    },
    "lisi": {
        "username": "lisi",
        "hashed_password": pwd_context.hash("abcdef"),
        "role": "user",
    },
}


# ============ 数据模型 ============
class Token(BaseModel):
    access_token: str
    token_type: str


class User(BaseModel):
    username: str
    role: str


# ============ 工具函数 ============
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def authenticate_user(username: str, password: str) -> dict | None:
    user = fake_users_db.get(username)
    if not user or not verify_password(password, user["hashed_password"]):
        return None
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


# ============ 核心依赖:获取当前用户 ============
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = fake_users_db.get(username)
    if user is None:
        raise credentials_exception

    return User(username=user["username"], role=user["role"])


# ============ 应用 ============
app = FastAPI(title="认证示例")


@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """用户登录,返回 JWT Token"""
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(
        data={"sub": user["username"], "role": user["role"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    """获取当前登录用户信息(需要 Token)"""
    return current_user

关键组件说明

组件作用
OAuth2PasswordBearer(tokenUrl="token")声明认证方案,告诉 Swagger UI 在哪里登录。注入到依赖中时,自动从请求头提取 Authorization: Bearer <token>
OAuth2PasswordRequestForm标准的 OAuth2 登录表单,包含 usernamepassword 字段,作为 /token 接口的参数
get_current_user核心依赖函数。解码 Token → 查找用户 → 返回用户对象。任何需要认证的接口,加上 Depends(get_current_user) 即可

认证流程调用链

请求 GET /users/me

Header: Authorization: Bearer eyJhbG...

oauth2_scheme 提取 token 字符串

get_current_user(token) 解码并验证

返回 User 对象注入到路由函数

在 Swagger UI 中测试

  1. 启动服务:fastapi dev main.py
  2. 打开 http://127.0.0.1:8000/docs
  3. 点击右上角 "Authorize" 按钮
  4. 输入 zhangsan / 123456,点击登录
  5. 此后所有请求自动携带 Token
  6. 测试 GET /users/me 接口

9.3 权限控制

基于角色的权限控制

get_current_user 基础上,创建角色检查依赖:

python
from functools import wraps


def require_role(required_role: str):
    """角色权限依赖工厂:生成检查特定角色的依赖函数"""
    async def role_checker(current_user: User = Depends(get_current_user)) -> User:
        if current_user.role != required_role:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"权限不足,需要 {required_role} 角色",
            )
        return current_user
    return role_checker


# ============ 使用示例 ============

@app.get("/admin/dashboard")
async def admin_dashboard(user: User = Depends(require_role("admin"))):
    """仅管理员可访问"""
    return {"message": f"欢迎管理员 {user.username}", "data": "管理后台敏感数据"}


@app.get("/user/profile")
async def user_profile(user: User = Depends(require_role("user"))):
    """仅普通用户可访问"""
    return {"message": f"你好 {user.username}", "profile": "个人资料信息"}


@app.get("/common/info")
async def common_info(user: User = Depends(get_current_user)):
    """任何已登录用户均可访问"""
    return {"message": f"你好 {user.username},你的角色是 {user.role}"}

权限控制效果

接口zhangsan(admin)lisi(user)未登录
GET /admin/dashboard✅ 200❌ 403❌ 401
GET /user/profile❌ 403✅ 200❌ 401
GET /common/info✅ 200✅ 200❌ 401

多角色权限

如果需要支持多角色访问,稍作修改:

python
def require_roles(*allowed_roles: str):
    """允许多个角色访问"""
    async def role_checker(current_user: User = Depends(get_current_user)) -> User:
        if current_user.role not in allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"权限不足,需要以下角色之一:{', '.join(allowed_roles)}",
            )
        return current_user
    return role_checker


@app.get("/reports")
async def view_reports(user: User = Depends(require_roles("admin", "manager"))):
    """管理员和经理均可查看报表"""
    return {"reports": ["月度报表", "年度报表"]}

9.4 动手练习

练习 1:完整认证系统

将上面的代码整合到一个 main.py 中运行,依次完成:

  1. 启动服务,用 Swagger UI 的 Authorize 按钮登录 zhangsan / 123456
  2. 请求 GET /users/me,确认返回用户信息
  3. 请求 GET /admin/dashboard,确认管理员可以访问
  4. 切换为 lisi / abcdef 登录,再请求 GET /admin/dashboard,观察 403 错误

练习 2:添加 Token 刷新接口

在现有代码基础上,新增一个 POST /token/refresh 接口:

python
@app.post("/token/refresh", response_model=Token)
async def refresh_token(current_user: User = Depends(get_current_user)):
    """用旧 Token 换新 Token(延长登录有效期)"""
    new_token = create_access_token(
        data={"sub": current_user.username, "role": current_user.role},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=new_token, token_type="bearer")

练习 3:用户注册接口

新增 POST /register 接口,接收用户名和密码,将新用户添加到 fake_users_db 中(默认角色为 user)。

提示:

python
class UserCreate(BaseModel):
    username: str
    password: str

@app.post("/register", response_model=User)
async def register(user_data: UserCreate):
    if user_data.username in fake_users_db:
        raise HTTPException(status_code=400, detail="用户名已存在")
    fake_users_db[user_data.username] = {
        "username": user_data.username,
        "hashed_password": pwd_context.hash(user_data.password),
        "role": "user",
    }
    return User(username=user_data.username, role="user")

本章小结

概念要点
密码哈希使用 passlib + bcrypt,永远不存明文密码
JWT 生成python-josejwt.encode(),包含 subexp 字段
JWT 验证jwt.decode() 自动检查签名和过期时间
OAuth2PasswordBearer声明认证方案,自动从请求头提取 Bearer Token
OAuth2PasswordRequestForm标准登录表单(username + password)
get_current_user核心认证依赖,解码 Token 并返回用户对象
权限控制依赖工厂函数 require_role() 实现角色检查
Swagger 集成声明 tokenUrl 后,Swagger UI 自动出现 Authorize 按钮

下一章预告:随着代码越来越多,全部塞在一个文件里会难以维护。下一章我们将学习如何用 APIRouter 拆分路由,组织规范的项目结构,并使用 BaseSettings 管理配置。

坚持是一种品格