Skip to content

AI 应用的成本控制与优化

从一个月烧 $500 到 $50——手把手教你把 AI 应用的账单砍下来,同时不牺牲用户体验。


6. 流量控制与限流:防止账单爆炸

前面几章讲的都是"省钱"——但还有一个更恐怖的问题:账单失控

一个没有限流的 AI 应用,被爬虫扫一遍、被恶意用户刷一波、或者仅仅是一个热门帖子带来的流量洪峰——都可能让你的月账单从 $50 暴涨到 $5000。这一章,我们给应用装上"刹车"。


6.1 没有限流的 AI 应用 = 定时炸弹

真实的灾难场景

灾难案例:

  案例 1:爬虫攻击
  ─────────────────────────────────────
  某知识库问答 API 被爬虫发现
  爬虫 24 小时内发送了 50 万次请求
  每次请求成本 $0.002
  当天额外成本:$1000
  → 月预算一天就花完了

  案例 2:用户滥用
  ─────────────────────────────────────
  一个用户写脚本自动提问
  每分钟发送 100 个问题
  一天 144000 个请求
  当天成本:$288
  → 一个用户就占了全部预算

  案例 3:病毒式传播
  ─────────────────────────────────────
  产品突然上了热搜
  日活从 100 暴涨到 10000
  日请求量从 500 暴涨到 50000
  本来 $30/月 的账单变成 $3000/月
  → 好消息是用户暴涨,坏消息是钱不够了

  共同点:没有限流 = 对成本失去控制

6.2 Token 预算机制:每个用户每天花多少

最直接的成本控制——给每个用户设预算上限。

预算层级设计

三级预算体系:

  Level 1:全局预算(防灾难)
  ─────────────────────────────────────
    月总预算:$200
    日预算:$10(留余量)
    → 超过日预算 → 全局降级或暂停服务

  Level 2:用户级预算(防滥用)
  ─────────────────────────────────────
    免费用户:每天 20 次查询
    付费用户:每天 200 次查询
    企业用户:每天 2000 次查询
    → 超过配额 → 提示"今日额度已用完"

  Level 3:请求级预算(防意外)
  ─────────────────────────────────────
    单次请求 max_tokens ≤ 1000
    单次输入 Token ≤ 5000
    → 超过限制 → 截断或拒绝

Token 预算管理器

python
import redis
from datetime import datetime

class TokenBudgetManager:
    """Token 预算管理:每个用户每天的 Token 使用限额"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.limits = {
            "free": {"daily_tokens": 50000, "daily_requests": 20},
            "pro": {"daily_tokens": 500000, "daily_requests": 200},
            "enterprise": {"daily_tokens": 5000000, "daily_requests": 2000},
        }

    def _today_key(self, user_id: str) -> str:
        today = datetime.now().strftime("%Y%m%d")
        return f"budget:{user_id}:{today}"

    def check_budget(self, user_id: str, tier: str = "free") -> dict:
        """检查用户是否还有预算"""
        key = self._today_key(user_id)
        used_tokens = int(self.redis.hget(key, "tokens") or 0)
        used_requests = int(self.redis.hget(key, "requests") or 0)
        limits = self.limits[tier]

        return {
            "allowed": (used_tokens < limits["daily_tokens"]
                       and used_requests < limits["daily_requests"]),
            "used_tokens": used_tokens,
            "remaining_tokens": max(0, limits["daily_tokens"] - used_tokens),
            "used_requests": used_requests,
            "remaining_requests": max(0, limits["daily_requests"] - used_requests),
        }

    def record_usage(self, user_id: str, tokens: int):
        """记录一次 API 调用的 Token 消耗"""
        key = self._today_key(user_id)
        pipe = self.redis.pipeline()
        pipe.hincrby(key, "tokens", tokens)
        pipe.hincrby(key, "requests", 1)
        pipe.expire(key, 86400 * 2)  # 2 天后自动清理
        pipe.execute()

# 使用
budget = TokenBudgetManager(redis_client)

# 请求前检查
check = budget.check_budget("user_123", tier="free")
if not check["allowed"]:
    return {"error": "今日额度已用完,请明天再试", "remaining": 0}

# 请求后记录
budget.record_usage("user_123", tokens=response.usage.total_tokens)

6.3 实战:FastAPI + 中间件实现速率限制

速率限制中间件

python
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import time

app = FastAPI()

class RateLimiter:
    """滑动窗口速率限制器"""

    def __init__(self, redis_client, requests_per_minute: int = 10):
        self.redis = redis_client
        self.rpm = requests_per_minute

    def is_allowed(self, identifier: str) -> bool:
        now = time.time()
        window_key = f"rate:{identifier}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(window_key, 0, now - 60)  # 清除 60 秒前的记录
        pipe.zadd(window_key, {str(now): now})           # 添加当前请求
        pipe.zcard(window_key)                            # 计算窗口内请求数
        pipe.expire(window_key, 120)                      # 2 分钟后自动清理
        results = pipe.execute()

        request_count = results[2]
        return request_count <= self.rpm

rate_limiter = RateLimiter(redis_client, requests_per_minute=10)
budget_manager = TokenBudgetManager(redis_client)

@app.middleware("http")
async def cost_control_middleware(request: Request, call_next):
    """成本控制中间件:限流 + 预算检查"""

    if request.url.path.startswith("/api/chat"):
        # 获取用户标识(IP 或 API Key)
        user_id = request.headers.get("X-API-Key", request.client.host)

        # 1. 速率限制
        if not rate_limiter.is_allowed(user_id):
            return JSONResponse(
                status_code=429,
                content={"error": "请求过于频繁,请稍后再试", "retry_after": 60}
            )

        # 2. 预算检查
        check = budget_manager.check_budget(user_id, tier="free")
        if not check["allowed"]:
            return JSONResponse(
                status_code=429,
                content={
                    "error": "今日额度已用完",
                    "used_requests": check["used_requests"],
                    "remaining_requests": 0
                }
            )

    response = await call_next(request)
    return response

完整的 Chat API

python
@app.post("/api/chat")
async def chat(request: Request):
    body = await request.json()
    question = body.get("question", "")
    user_id = request.headers.get("X-API-Key", request.client.host)

    # 调用 LLM(已通过中间件的限流和预算检查)
    result = cached_chat(question, model="gpt-4o-mini")

    # 记录 Token 用量
    if not result.get("cached"):
        budget_manager.record_usage(user_id, result.get("tokens", 0))

    return {
        "answer": result["answer"],
        "cached": result.get("cached", False),
        "tokens_used": result.get("tokens", 0)
    }

缓存命中不计费:注意代码中只有未命中缓存时才记录 Token 消耗。这意味着缓存命中的请求不占用用户配额——对用户更友好,也鼓励"好问题"(高频问题会被缓存)。

6.4 账单告警与自动熔断

限流防的是"小偷",熔断防的是"灾难"——当成本即将突破预算时,自动降级或暂停服务。

多级告警

成本告警的三条线:

  日预算:$10

  🟢 正常(< 70%)
  ─────────────────────────────────────
    日消耗 < $7 → 一切正常,无需干预

  🟡 黄线告警(70-90%)
  ─────────────────────────────────────
    日消耗 $7-$9 → 发送告警通知
    → 自动降级:所有请求切换到最便宜的模型
    → 通知管理员关注

  🔴 红线熔断(> 90%)
  ─────────────────────────────────────
    日消耗 > $9 → 自动熔断
    → 只允许缓存命中的请求通过
    → 新的 API 调用全部拒绝
    → 返回:"系统繁忙,请稍后再试"

自动熔断实现

python
class CostGuard:
    """成本守卫:监控 + 告警 + 熔断"""

    def __init__(self, redis_client, daily_budget: float = 10.0):
        self.redis = redis_client
        self.daily_budget = daily_budget

    def _today_cost_key(self) -> str:
        return f"cost:{datetime.now().strftime('%Y%m%d')}"

    def record_cost(self, cost: float):
        """记录一次请求的成本"""
        key = self._today_cost_key()
        self.redis.incrbyfloat(key, cost)
        self.redis.expire(key, 86400 * 2)

    def get_status(self) -> dict:
        """获取当前成本状态"""
        key = self._today_cost_key()
        today_cost = float(self.redis.get(key) or 0)
        ratio = today_cost / self.daily_budget

        if ratio >= 0.9:
            level = "critical"     # 🔴 熔断
            action = "circuit_break"
        elif ratio >= 0.7:
            level = "warning"      # 🟡 降级
            action = "downgrade"
        else:
            level = "normal"       # 🟢 正常
            action = "none"

        return {
            "today_cost": f"${today_cost:.4f}",
            "daily_budget": f"${self.daily_budget:.2f}",
            "usage_ratio": f"{ratio:.1%}",
            "level": level,
            "action": action,
        }

    def get_allowed_model(self, preferred_model: str = "gpt-4o-mini") -> str | None:
        """根据成本状态决定允许使用的模型"""
        status = self.get_status()

        if status["action"] == "circuit_break":
            return None  # 熔断,不允许调用
        elif status["action"] == "downgrade":
            return "gpt-4o-mini"  # 降级到最便宜
        else:
            return preferred_model  # 正常使用

# 集成到 Chat API
guard = CostGuard(redis_client, daily_budget=10.0)

@app.post("/api/chat")
async def chat_with_guard(request: Request):
    body = await request.json()
    question = body["question"]

    # 成本守卫检查
    model = guard.get_allowed_model("gpt-4o-mini")
    if model is None:
        return JSONResponse(
            status_code=503,
            content={"error": "系统繁忙,请稍后再试", "retry_after": 3600}
        )

    result = cached_chat(question, model=model)

    # 记录成本
    if not result.get("cached"):
        cost = result["tokens"] * 0.15 / 1_000_000  # mini 输入价
        guard.record_cost(cost)

    return {"answer": result["answer"], "model": model}

告警通知

python
import requests

def send_cost_alert(status: dict):
    """发送成本告警通知(企业微信/钉钉/Slack)"""

    if status["level"] == "warning":
        message = f"⚠️ AI 成本告警:今日已消耗 {status['today_cost']}," \
                  f"达到预算的 {status['usage_ratio']},已自动降级模型"
    elif status["level"] == "critical":
        message = f"🚨 AI 成本熔断:今日已消耗 {status['today_cost']}," \
                  f"超过预算 90%,已暂停 API 调用"
    else:
        return

    # 发送到企业微信 Webhook
    requests.post("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx", json={
        "msgtype": "text",
        "text": {"content": message}
    })

关键原则:永远不要相信"这种事不会发生"。没有限流的 AI API 就是开放的钱包。即使你的用户都是好人,一个配置错误的脚本就能让你的账单爆炸。


本章小结

知识点要点
为什么限流爬虫/滥用/流量洪峰都能让账单失控
三级预算全局预算 → 用户级预算 → 请求级预算
Token 配额免费 20 次/天,付费 200 次,企业 2000 次
滑动窗口限流Redis ZSET 实现,精确到分钟级
FastAPI 中间件一处拦截,全局生效
缓存不计费缓存命中不扣用户额度,鼓励好问题
多级告警70% 黄线降级,90% 红线熔断
自动熔断超预算后只允许缓存请求,拒绝新 API 调用

下一章预告:推理优化——同样的质量,更少的计算。我们会探讨流式输出、批处理合并、本地部署的成本计算,以及模型量化如何用更小的模型做同样的事。

坚持是一种品格