8.3 AI Agent 助手
构建一个能搜索、写代码、查数据库、操作文件的全能 Agent——从单工具调用到多 Agent 协作的完整实战。
难度:⭐⭐⭐⭐ | 预计时长:3-4 周
8.3.1 Agent 自动化编码
Agent 编码系统能够自主规划、编写、执行、测试、修复代码,完成完整的编程任务。
python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Tuple
import json
import os
import re
import subprocess
import sys
import tempfile

from openai import OpenAI
class TaskStatus(Enum):
    """Lifecycle phases of a CodingAgent run (member order follows the pipeline)."""

    PLANNING = "planning"  # breaking the task into steps
    CODING = "coding"      # writing files into the workspace
    TESTING = "testing"    # running the test suite
    FIXING = "fixing"      # inside the automatic repair loop
    DONE = "done"          # run finished
    FAILED = "failed"      # run aborted or failed
@dataclass
class AgentStep:
    """One entry of the agent's execution log."""

    action: str            # plan / write_file / run_code / run_tests / fix_code / error
    description: str       # short human-readable summary of the step
    result: str = ""       # captured output or message (callers may truncate it)
    success: bool = False  # whether the step succeeded
class CodingAgent:
"""
自动化编码 Agent:
接收任务描述 → 制定计划 → 编写代码 → 运行测试 → 自动修复 → 交付成果
"""
MAX_FIX_ATTEMPTS = 3 # 最大自动修复次数
def __init__(self, model: str = "gpt-4o", workspace_dir: str = "/tmp/agent_workspace"):
self.client = OpenAI()
self.model = model
self.workspace = workspace_dir
os.makedirs(workspace_dir, exist_ok=True)
self.steps: List[AgentStep] = []
self.status = TaskStatus.PLANNING
self.file_registry: Dict[str, str] = {} # 记录生成的文件
def run(self, task: str, language: str = "python") -> Dict[str, Any]:
"""
主执行入口:完整自动化编码流程
"""
print(f"🤖 开始任务:{task}\n")
self.steps = []
try:
# 1. 规划
plan = self._plan_task(task, language)
# 2. 逐步执行计划
for step in plan:
action = step.get("action")
if action == "write_file":
self._write_file(step["filename"], step["content"], step["description"])
elif action == "run_code":
output, success = self._run_code(step["filename"])
if not success:
# 自动修复
self._auto_fix_loop(step["filename"], output, language)
elif action == "write_tests":
self._write_tests(step["filename"], step["source_file"])
elif action == "run_tests":
self._run_tests(step["filename"])
self.status = TaskStatus.DONE
except Exception as e:
self.status = TaskStatus.FAILED
self.steps.append(AgentStep("error", str(e), str(e), False))
return self._generate_report(task)
def _plan_task(self, task: str, language: str) -> List[Dict]:
"""制定任务执行计划"""
self.status = TaskStatus.PLANNING
response = self.client.chat.completions.create(
model=self.model,
messages=[{
"role": "system",
"content": f"你是一个自动化编程 Agent,擅长将任务分解为可执行步骤,使用 {language} 实现。"
}, {
"role": "user",
"content": f"""为以下编程任务制定执行计划:
任务:{task}
语言:{language}
以 JSON 格式返回执行步骤列表:
{{
"steps": [
{{
"action": "write_file",
"filename": "main.py",
"description": "编写主模块",
"content": "完整的文件代码"
}},
{{
"action": "write_tests",
"filename": "test_main.py",
"source_file": "main.py",
"description": "编写单元测试"
}},
{{
"action": "run_tests",
"filename": "test_main.py",
"description": "运行测试验证"
}}
]
}}
action 取值:write_file / write_tests / run_code / run_tests
"""
}],
response_format={"type": "json_object"}
)
plan_data = json.loads(response.choices[0].message.content)
plan = plan_data.get("steps", [])
self.steps.append(AgentStep(
action="plan",
description=f"制定执行计划:{len(plan)} 个步骤",
result=str([s.get("description") for s in plan]),
success=True
))
print(f"📋 执行计划(共 {len(plan)} 步):")
for i, s in enumerate(plan, 1):
print(f" {i}. [{s['action']}] {s.get('description', '')}")
print()
return plan
def _write_file(self, filename: str, content: str, description: str):
"""写入文件到工作目录"""
self.status = TaskStatus.CODING
# 清理 markdown 代码块
import re
content = re.sub(r'^```\w*\n?', '', content, flags=re.MULTILINE)
content = re.sub(r'```$', '', content, flags=re.MULTILINE)
content = content.strip()
filepath = os.path.join(self.workspace, filename)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
self.file_registry[filename] = filepath
self.steps.append(AgentStep(
action="write_file",
description=description,
result=f"已写入 {filename}({len(content.splitlines())} 行)",
success=True
))
print(f"✅ 写入 {filename}")
def _run_code(self, filename: str) -> Tuple[str, bool]:
"""执行代码文件"""
filepath = self.file_registry.get(filename)
if not filepath:
return f"文件不存在:{filename}", False
try:
result = subprocess.run(
["python3", filepath],
capture_output=True,
text=True,
timeout=30,
cwd=self.workspace
)
output = result.stdout + result.stderr
success = result.returncode == 0
self.steps.append(AgentStep(
action="run_code",
description=f"运行 {filename}",
result=output[:500],
success=success
))
return output, success
except subprocess.TimeoutExpired:
return "执行超时(>30s)", False
def _run_tests(self, test_file: str) -> bool:
"""运行测试文件"""
self.status = TaskStatus.TESTING
filepath = self.file_registry.get(test_file, os.path.join(self.workspace, test_file))
result = subprocess.run(
["python3", "-m", "pytest", filepath, "-v", "--tb=short"],
capture_output=True,
text=True,
timeout=60,
cwd=self.workspace
)
output = result.stdout + result.stderr
success = result.returncode == 0
self.steps.append(AgentStep(
action="run_tests",
description=f"运行测试 {test_file}",
result=output[:1000],
success=success
))
print(f"{'✅' if success else '❌'} 测试{'通过' if success else '失败'}")
if not success:
print(f" 错误摘要:{output[-300:]}")
return success
def _write_tests(self, test_filename: str, source_file: str):
"""为源文件生成测试代码"""
source_path = self.file_registry.get(source_file)
if not source_path:
return
with open(source_path, 'r') as f:
source_code = f.read()
generator = CodeGenerator(model=self.model)
test_code = generator.generate_tests(source_code, "python", "pytest")
# 确保导入路径正确
module_name = source_file.replace(".py", "")
if f"from {module_name}" not in test_code and f"import {module_name}" not in test_code:
test_code = f"from {module_name} import *\n\n" + test_code
self._write_file(test_filename, test_code, f"生成 {source_file} 的单元测试")
def _auto_fix_loop(self, filename: str, error_output: str, language: str):
"""自动修复循环:最多尝试 MAX_FIX_ATTEMPTS 次"""
self.status = TaskStatus.FIXING
filepath = self.file_registry.get(filename)
for attempt in range(1, self.MAX_FIX_ATTEMPTS + 1):
print(f"🔧 自动修复第 {attempt} 次...")
with open(filepath, 'r') as f:
current_code = f.read()
response = self.client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": f"""修复以下 {language} 代码中的错误:
代码({filename}):
```{language}
错误信息:
要求:
1. 分析错误根因
2. 最小化修改(只改有问题的部分)
3. 直接输出修复后的完整代码(不要解释)
```{language}"""
}],
temperature=0.1,
stop=["```"]
)
fixed_code = response.choices[0].message.content.strip()
# 写回修复后的代码
with open(filepath, 'w') as f:
f.write(fixed_code)
# 重新运行验证
output, success = self._run_code(filename)
if success:
self.steps.append(AgentStep(
action="fix_code",
description=f"第 {attempt} 次修复成功",
result=output,
success=True
))
print(f"✅ 修复成功")
return
error_output = output
# 超过最大修复次数
raise RuntimeError(f"自动修复失败,已尝试 {self.MAX_FIX_ATTEMPTS} 次")
def _generate_report(self, task: str) -> Dict[str, Any]:
"""生成任务执行报告"""
success_count = sum(1 for s in self.steps if s.success)
total_count = len(self.steps)
return {
"task": task,
"status": self.status.value,
"success_rate": f"{success_count}/{total_count}",
"files_generated": list(self.file_registry.keys()),
"steps": [
{
"action": s.action,
"description": s.description,
"success": s.success,
"result_preview": s.result[:100]
}
for s in self.steps
],
"workspace": self.workspace
}自动化工作流是 AI 应用从“单点能力”走向“业务闭环”的关键形态。核心挑战是:任务拆解与编排、跨系统工具调用稳定性、异常回滚与人工兜底、流程可观测与成本控制。本节围绕“数据分析自动化 + 多 Agent 协作 + RPA 执行”构建端到端方案。
系统架构
业务触发器(定时任务 / Webhook / 人工发起)
│
┌─────────▼─────────┐
│ 工作流编排器 │
│ DAG / 状态机 / 重试 │
└─────────┬─────────┘
│
┌──────────────┼──────────────┐
│ │ │
┌────▼────┐ ┌─────▼─────┐ ┌────▼────┐
│分析 Agent│ │决策 Agent │ │执行 Agent│
│数据聚合 │ │策略选择/审批│ │API/RPA │
└────┬────┘ └─────┬─────┘ └────┬────┘
│ │ │
└──────────────┼──────────────┘
│
┌───────────▼───────────┐
│ 结果写回与通知 │
│ DB / 飞书 / 邮件 / Slack│
└───────────┬───────────┘
│
监控与审计(日志、指标、Trace)
8.3.2 数据分析自动化
典型场景:每天自动生成“销售异常日报”,包括指标计算、异常检测、原因摘要、行动建议。
python
# pip install openai pandas numpy
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from openai import OpenAI
import json
# Module-level OpenAI client. NOTE(review): SalesAnalyzer below creates its own
# client in __init__ (self.client = OpenAI()), so this one appears unused here.
client = OpenAI()
@dataclass
class KPIAnomaly:
    """A KPI whose value deviates from its baseline by at least the detection threshold."""

    metric: str                  # KPI key, e.g. "total_revenue"
    current_value: float         # value for the analysed period
    baseline_value: float        # value it is compared against
    deviation_ratio: float       # (current - baseline) / baseline
    severity: str                # low / medium / high / critical
    possible_reasons: List[str]  # filled in by the LLM explanation step
@dataclass
class AnalysisReport:
    """Daily analysis report produced by SalesAnalyzer.build_daily_report."""

    report_date: str              # "YYYY-MM-DD"
    kpis: Dict[str, float]        # core KPIs for the analysed period
    anomalies: List[KPIAnomaly]   # detected anomalies (with reasons, when available)
    summary: str                  # one-line overall conclusion
    action_items: List[str]       # suggested follow-up actions
class SalesAnalyzer:
"""销售数据自动分析器"""
def __init__(self, model: str = "gpt-4o-mini"):
self.model = model
self.client = OpenAI()
def compute_core_kpis(self, df: pd.DataFrame) -> Dict[str, float]:
"""计算核心业务指标"""
total_orders = len(df)
total_revenue = float(df["amount"].sum())
avg_order_value = float(df["amount"].mean()) if total_orders else 0.0
pay_rate = float((df["status"] == "paid").mean()) if total_orders else 0.0
refund_rate = float((df["status"] == "refunded").mean()) if total_orders else 0.0
return {
"total_orders": round(total_orders, 2),
"total_revenue": round(total_revenue, 2),
"avg_order_value": round(avg_order_value, 2),
"pay_rate": round(pay_rate, 4),
"refund_rate": round(refund_rate, 4)
}
def detect_anomalies(
self,
today_kpis: Dict[str, float],
baseline_kpis: Dict[str, float],
threshold: float = 0.2
) -> List[KPIAnomaly]:
"""基于阈值检测异常"""
anomalies = []
for metric, current in today_kpis.items():
baseline = baseline_kpis.get(metric, 0.0)
if baseline == 0:
continue
deviation = (current - baseline) / baseline
if abs(deviation) < threshold:
continue
abs_dev = abs(deviation)
if abs_dev >= 0.5:
severity = "critical"
elif abs_dev >= 0.35:
severity = "high"
elif abs_dev >= 0.25:
severity = "medium"
else:
severity = "low"
anomalies.append(KPIAnomaly(
metric=metric,
current_value=current,
baseline_value=baseline,
deviation_ratio=round(deviation, 4),
severity=severity,
possible_reasons=[]
))
return anomalies
def explain_anomalies_with_llm(
self,
anomalies: List[KPIAnomaly],
context: str = ""
) -> List[KPIAnomaly]:
"""用 LLM 补全异常原因"""
if not anomalies:
return anomalies
payload = [
{
"metric": a.metric,
"current": a.current_value,
"baseline": a.baseline_value,
"deviation_ratio": a.deviation_ratio,
"severity": a.severity
}
for a in anomalies
]
prompt = f"""你是电商数据分析师。请分析以下 KPI 异常并给出每项 2-4 个可能原因。
异常数据:
{json.dumps(payload, ensure_ascii=False, indent=2)}
{f'补充业务上下文:{context}' if context else ''}
以 JSON 返回:
{{
"reasons": [
<span v-pre>{{ "metric": "total_revenue", "possible_reasons": ["...", "..."] }}</span>
],
"summary": "一句话整体结论",
"actions": ["可执行动作1", "可执行动作2"]
}}
"""
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.2
)
data = json.loads(response.choices[0].message.content)
reason_map = {
item["metric"]: item.get("possible_reasons", [])
for item in data.get("reasons", [])
}
for a in anomalies:
a.possible_reasons = reason_map.get(a.metric, [])
self._last_summary = data.get("summary", "")
self._last_actions = data.get("actions", [])
return anomalies
def build_daily_report(
self,
today_df: pd.DataFrame,
baseline_df: pd.DataFrame,
biz_context: str = ""
) -> AnalysisReport:
"""生成完整日报"""
today_kpis = self.compute_core_kpis(today_df)
baseline_kpis = self.compute_core_kpis(baseline_df)
anomalies = self.detect_anomalies(today_kpis, baseline_kpis)
anomalies = self.explain_anomalies_with_llm(anomalies, biz_context)
return AnalysisReport(
report_date=datetime.now().strftime("%Y-%m-%d"),
kpis=today_kpis,
anomalies=anomalies,
summary=getattr(self, "_last_summary", "今日指标整体平稳"),
action_items=getattr(self, "_last_actions", [])
)8.3.3 多 Agent 协作完成复杂任务
在复杂流程中,建议采用“规划 Agent + 执行 Agent + 质检 Agent”的最小多智能体协作模型,避免单 Agent 过载。
python
from dataclasses import dataclass
from typing import List, Dict, Any, Literal
from openai import OpenAI
import json
# Module-level OpenAI client. NOTE(review): MultiAgentCoordinator below creates its
# own client in __init__ (self.client = OpenAI()), so this one appears unused here.
client = OpenAI()
# The three agent roles; used as the type of AgentMessage.role and WorkflowTask.owner.
Role = Literal["planner", "executor", "reviewer"]
@dataclass
class AgentMessage:
    """A message produced by one agent role during a workflow run."""

    role: Role     # which agent produced the message
    content: str   # the message text (task output or failure message)
@dataclass
class WorkflowTask:
    """A planned unit of work assigned to one agent role."""

    id: str                  # id other tasks reference in depends_on
    description: str         # what the task should accomplish
    owner: Role              # role that executes the task
    depends_on: List[str]    # ids of tasks that must finish first
    status: str = "pending"  # pending / running / done / failed
    output: str = ""         # model output or failure message
class MultiAgentCoordinator:
"""多 Agent 协作调度器(简化版)"""
def __init__(self, model: str = "gpt-4o"):
self.client = OpenAI()
self.model = model
self.messages: List[AgentMessage] = []
def plan(self, goal: str) -> List[WorkflowTask]:
prompt = f"""将目标拆解为可执行任务(最多 8 步):{goal}
返回 JSON:
{{
"tasks": [
{{
"id": "t1",
"description": "任务描述",
"owner": "planner|executor|reviewer",
"depends_on": []
}}
]
}}
规则:
- 前置依赖必须明确
- 至少包含 1 个 reviewer 任务用于质检
- 任务描述可直接执行,不要空泛表述
"""
resp = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.2
)
data = json.loads(resp.choices[0].message.content)
tasks = [WorkflowTask(**t) for t in data.get("tasks", [])]
return tasks
def run_task(self, task: WorkflowTask, global_context: Dict[str, Any]) -> WorkflowTask:
"""执行单任务"""
task.status = "running"
role_prompt = {
"planner": "你是流程规划专家,负责补充执行策略与风险点。",
"executor": "你是执行专家,输出可落地结果(结构化、可操作)。",
"reviewer": "你是质检专家,发现问题并给出修正建议与结论。"
}
prompt = f"""
角色:{task.owner}
任务:{task.description}
上下文:{json.dumps(global_context, ensure_ascii=False)}
输出要求:
1. 先给结果
2. 再给关键依据
3. 最后给下一步建议(如有)
"""
try:
resp = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": role_prompt[task.owner]},
{"role": "user", "content": prompt}
],
temperature=0.2
)
task.output = resp.choices[0].message.content.strip()
task.status = "done"
except Exception as e:
task.status = "failed"
task.output = f"执行失败:{str(e)}"
self.messages.append(AgentMessage(role=task.owner, content=task.output))
return task
def execute(self, goal: str) -> Dict[str, Any]:
"""按依赖顺序执行完整工作流"""
tasks = self.plan(goal)
done_map = {}
global_context = {"goal": goal}
while True:
pending = [t for t in tasks if t.status == "pending"]
if not pending:
break
progressed = False
for task in pending:
if all(dep in done_map for dep in task.depends_on):
finished = self.run_task(task, global_context)
done_map[task.id] = finished.output
global_context[task.id] = finished.output
progressed = True
if not progressed:
# 存在循环依赖或缺失依赖
for task in pending:
task.status = "failed"
task.output = "依赖解析失败:请检查 depends_on 配置"
break
return {
"goal": goal,
"tasks": [
{
"id": t.id,
"owner": t.owner,
"description": t.description,
"status": t.status,
"output": t.output[:300]
}
for t in tasks
],
"messages": [m.__dict__ for m in self.messages]
}8.3.4 RPA + AI 结合
当目标系统没有开放 API 时,可使用“AI 决策 + RPA 执行”模式(如网页填报、后台录单、跨系统复制粘贴)。
python
# pip install playwright openai
import asyncio
from dataclasses import dataclass
from typing import Dict, Any, List
from openai import OpenAI
from playwright.async_api import async_playwright
import json
# Module-level OpenAI client. NOTE(review): AIRPAExecutor below creates its own
# client in __init__ (self.client = OpenAI()), so this one appears unused here.
client = OpenAI()
@dataclass
class RpaStep:
    """One browser action planned by the LLM and executed by Playwright."""

    action: str            # click / fill / select / wait / extract
    selector: str          # CSS selector of the target element
    value: str = ""        # text to fill / option to select / wait time in ms
    description: str = ""  # human-readable purpose of the step
class AIRPAExecutor:
"""AI 驱动的 RPA 执行器"""
def __init__(self, model: str = "gpt-4o-mini"):
self.model = model
self.client = OpenAI()
async def plan_steps(self, task_desc: str, page_schema: Dict[str, Any]) -> List[RpaStep]:
"""根据页面元素说明生成 RPA 操作序列"""
prompt = f"""你是 RPA 流程设计专家。
任务:{task_desc}
页面元素:{json.dumps(page_schema, ensure_ascii=False)}
请输出 JSON:
{{
"steps": [
<span v-pre>{{ "action": "fill", "selector": "#username", "value": "admin", "description": "填写账号" }}</span>
]
}}
约束:
- selector 必须来自 page_schema
- 不要输出危险动作(删除、批量覆盖)
- 每步描述要具体
"""
resp = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
temperature=0.1
)
data = json.loads(resp.choices[0].message.content)
return [RpaStep(**s) for s in data.get("steps", [])]
async def execute_steps(self, url: str, steps: List[RpaStep]) -> Dict[str, Any]:
"""执行 RPA 步骤"""
logs = []
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
await page.goto(url)
for idx, step in enumerate(steps, 1):
try:
if step.action == "click":
await page.click(step.selector)
elif step.action == "fill":
await page.fill(step.selector, step.value)
elif step.action == "select":
await page.select_option(step.selector, step.value)
elif step.action == "wait":
await page.wait_for_timeout(int(step.value or "1000"))
elif step.action == "extract":
text = await page.text_content(step.selector)
logs.append({"step": idx, "extract": text})
logs.append({
"step": idx,
"action": step.action,
"selector": step.selector,
"ok": True,
"desc": step.description
})
except Exception as e:
logs.append({
"step": idx,
"action": step.action,
"selector": step.selector,
"ok": False,
"error": str(e)
})
await browser.close()
return {"success": False, "logs": logs}
await browser.close()
return {"success": True, "logs": logs}8.3.5 统一编排与可观测性
工程实践中,建议把流程统一封装为 WorkflowEngine,并配置:超时、重试、幂等键、人工审批节点、审计日志。下面的简化实现只演示了重试、人工审批暂停与审计 Trace;超时(`timeout_sec` 字段目前仅声明、未生效)与幂等键需要在生产环境中自行补充。
python
from dataclasses import dataclass, field
from typing import Callable, Dict, Any, List, Optional
import time
import uuid
@dataclass
class NodeResult:
    """Outcome of one node execution (not referenced by WorkflowEngine below)."""

    success: bool         # whether the node succeeded
    output: Any = None    # handler return value
    error: str = ""       # error message on failure
    duration_ms: int = 0  # elapsed time in milliseconds
@dataclass
class WorkflowNode:
    """A single step of a WorkflowEngine pipeline."""

    name: str  # node name; the engine stores the handler output under context[name]
    # Receives the shared context dict and returns the node output.
    # (Fixed: the annotation was the invalid `Callable[Dict[str, Any](Dict[str, Any), Any]`.)
    handler: Callable[[Dict[str, Any]], Any]
    retry: int = 1  # maximum attempts WorkflowEngine.run makes (1 = single try, no retry)
    timeout_sec: int = 30  # NOTE: declared but not read by WorkflowEngine.run in this example
    requires_approval: bool = False  # engine pauses unless context["approved_<name>"] is truthy
@dataclass
class WorkflowRun:
    """Record of one WorkflowEngine execution."""

    run_id: str         # unique id of this run
    workflow_name: str  # caller-supplied workflow name
    status: str         # running / done / failed / paused_for_approval
    trace: List[Dict[str, Any]] = field(default_factory=list)  # per-node audit entries, fresh list per run
class WorkflowEngine:
"""轻量工作流引擎:顺序节点 + 重试 + 审计"""
def __init__(self):
self.nodes: List[WorkflowNode] = []
def add_node(self, node: WorkflowNode):
self.nodes.append(node)
def run(self, workflow_name: str, initial_context: Dict[str, Any]) -> WorkflowRun:
run = WorkflowRun(
run_id=str(uuid.uuid4()),
workflow_name=workflow_name,
status="running"
)
context = dict(initial_context)
for node in self.nodes:
if node.requires_approval and not context.get(f"approved_{node.name}", False):
run.status = "paused_for_approval"
run.trace.append({
"node": node.name,
"status": "paused",
"reason": "等待人工审批"
})
return run
success = False
last_error = ""
result_output = None
for attempt in range(1, node.retry + 1):
start = time.time()
try:
output = node.handler(context)
duration = int((time.time() - start) * 1000)
result_output = output
context[node.name] = output
run.trace.append({
"node": node.name,
"attempt": attempt,
"status": "success",
"duration_ms": duration
})
success = True
break
except Exception as e:
duration = int((time.time() - start) * 1000)
last_error = str(e)
run.trace.append({
"node": node.name,
"attempt": attempt,
"status": "failed",
"error": last_error,
"duration_ms": duration
})
if not success:
run.status = "failed"
run.trace.append({
"node": node.name,
"status": "abort",
"error": last_error
})
return run
run.status = "done"
run.trace.append({"status": "completed"})
return run8.3.6 FastAPI 接口示例
python
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict, Any
# FastAPI application object; the /workflow/run route below is registered on it.
app = FastAPI(title="自动化工作流平台")
class WorkflowRequest(BaseModel):
    """Request body for POST /workflow/run."""

    workflow_name: str     # name recorded on the resulting run
    input: Dict[str, Any]  # initial context passed to WorkflowEngine.run
@app.post("/workflow/run")
async def run_workflow(req: WorkflowRequest):
    """Run a demo three-node workflow (analyze -> decide -> execute) and return its run record.

    NOTE(review): engine.run is synchronous and executes on the event loop here;
    slow, real business handlers would block other requests — consider a plain
    ``def`` endpoint or offloading to a thread pool.
    """
    engine = WorkflowEngine()
    # Demo nodes: replace the lambdas with real business functions.
    demo_nodes = [
        WorkflowNode("analyze", lambda ctx: {"risk": "medium"}, retry=2),
        WorkflowNode("decide", lambda ctx: {"route": "auto"}),
        WorkflowNode("execute", lambda ctx: {"ticket_id": "T20260218001"}),
    ]
    for node in demo_nodes:
        engine.add_node(node)
    outcome = engine.run(req.workflow_name, req.input)
    return dict(
        run_id=outcome.run_id,
        workflow=outcome.workflow_name,
        status=outcome.status,
        trace=outcome.trace,
    )

系统关键指标
| 指标 | 目标值 | 优化方向 |
|---|---|---|
| 自动化覆盖率 | ≥ 60% | 优先覆盖高频、规则稳定流程 |
| 流程成功率 | ≥ 95% | 重试机制、幂等设计、异常补偿 |
| 人工接管率 | ≤ 15% | 提升意图识别与决策稳定性 |
| 平均处理时长 | ≤ 2min/单 | 并行执行、减少阻塞节点 |
| 审计可追溯率 | 100% | 全链路日志 + Trace ID |
落地建议(MVP 优先)
- 先选一个高价值且规则相对稳定的流程(如日报生成、工单分发)做 MVP。
- 第一版使用“单工作流 + 少量节点 + 人工兜底”,先跑通闭环再扩展。
- 关键风险点必须加保护:超时、重试、审批、回滚、审计日志。
- 每周复盘指标:成功率、接管率、成本与收益,持续优化 Prompt 与流程节点。