8.5 数据分析 Agent(代码助手)
自然语言转 SQL、代码生成与审查、数据可视化——让 AI 成为你的数据分析搭档。
难度:⭐⭐⭐⭐ | 预计时长:2-3 周
代码助手是开发者效率工具中增长最快的 AI 应用,核心挑战是:精准理解代码上下文、生成可运行的代码、识别安全漏洞、自主规划并执行多步编程任务。本节构建一个涵盖代码生成、审查、SQL 转换及自动化编码的完整系统。
系统架构
```text
        开发者输入(自然语言 / 代码片段 / 函数签名)
                  │
     ┌────────────┼────────────┐
     │            │            │
┌────▼────┐   ┌───▼────┐   ┌───▼────┐
│ 代码生成 │   │代码审查│   │NL→SQL │
│ & 补全  │   │& 解释  │   │        │
└────┬────┘   └───┬────┘   └───┬────┘
     │            │            │
     └────────────┼────────────┘
                  │
         ┌────────▼────────┐
         │  Agent 自动化   │
         │  编码执行引擎   │
         │  (Plan→Code→   │
         │   Test→Fix)    │
         └─────────────────┘
```

#### 8.5.1 代码生成与补全
代码生成的核心策略
| 场景 | 策略 | 关键技术 |
|---|---|---|
| 函数生成 | 函数签名 + 文档注释 → 实现体 | Few-shot、类型提示 |
| 类/模块生成 | 需求描述 → 完整类定义 | 结构化输出、接口设计 |
| 代码补全 | 光标上下文 → 续写 | 前缀/后缀填充(FIM) |
| 测试生成 | 函数代码 → 单元测试 | 边界覆盖、Mock 推断 |
| 代码翻译 | A 语言 → B 语言 | 语义保留、惯用法转换 |
代码生成器实现
from openai import OpenAI
from dataclasses import dataclass
from typing import List, Optional
import ast
import subprocess
import tempfile
import os
# Module-level client instance. The classes visible in this file each build
# their own client in __init__, and nothing shown here reads this global.
client = OpenAI()
@dataclass
class CodeGenerationResult:
    """Outcome of one code-generation request."""

    code: str  # generated source text as returned by the model
    language: str  # language the code was requested in
    explanation: str  # short description of the approach
    test_code: Optional[str] = None  # generated unit tests, if any
    is_runnable: bool = False  # set by the generator when no syntax errors were found
    # Defaults to None when the caller supplies no list, so the annotation
    # is Optional to agree with that default.
    syntax_errors: Optional[List[str]] = None
class CodeGenerator:
    """LLM-backed code generator.

    Covers function generation, unit-test generation, cross-language
    translation and cursor-position completion. Every public method sends
    one chat-completion request through ``self.client``.
    """

    # Per-language metadata: comment prefix, file extension and run command
    # (None where there is nothing to run).
    LANGUAGE_CONFIGS = {
        "python": {"comment": "#", "extension": ".py", "run_cmd": "python3"},
        "javascript": {"comment": "//", "extension": ".js", "run_cmd": "node"},
        "typescript": {"comment": "//", "extension": ".ts", "run_cmd": "ts-node"},
        "go": {"comment": "//", "extension": ".go", "run_cmd": "go run"},
        "sql": {"comment": "--", "extension": ".sql", "run_cmd": None},
    }

    # Test framework requested when generate_function() asks for tests.
    # Languages not listed here fall back to pytest, as before.
    DEFAULT_TEST_FRAMEWORKS = {
        "python": "pytest",
        "javascript": "jest",
        "typescript": "jest",
        "go": "go_test",
    }

    def __init__(self, model: str = "gpt-4o"):
        """Create the API client and remember the default model name."""
        self.client = OpenAI()
        self.model = model

    def generate_function(
        self,
        description: str,
        language: str = "python",
        context_code: str = "",
        style_guide: str = "",
        include_tests: bool = True,
    ) -> "CodeGenerationResult":
        """Generate a function or class from a natural-language description.

        Args:
            description: What the code should do.
            language: Target language; only "python" output is syntax-checked.
            context_code: Existing code shown to the model as context.
            style_guide: Optional style rules appended to the system prompt.
            include_tests: Also generate unit tests when the code has no
                recorded syntax errors.

        Returns:
            A CodeGenerationResult holding the code, explanation, optional
            tests and the syntax-check outcome.
        """
        import json

        # Optional prompt sections are built up front: a backslash inside an
        # f-string replacement field is a SyntaxError before Python 3.12.
        style_section = f"风格指南:{style_guide}" if style_guide else ""
        context_section = (
            f"已有代码上下文:\n```{language}\n{context_code}\n```"
            if context_code
            else ""
        )

        system_prompt = f"""你是一名资深{language}工程师,擅长编写清晰、高效、符合最佳实践的代码。
代码要求:
- 包含完整的类型注解(如语言支持)
- 添加简洁的文档字符串
- 处理边界情况和异常
- 变量命名清晰、符合{language}命名规范
{style_section}
"""
        user_prompt = f"""请实现以下功能:
{description}
{context_section}
请以 JSON 格式返回:
{{
"code": "完整的函数/类代码",
"explanation": "实现思路简要说明(2-3句)",
"complexity": "时间复杂度 O(?)",
"edge_cases": ["处理了哪些边界情况"]
}}
"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
        )
        result = json.loads(response.choices[0].message.content)
        code = result.get("code", "")

        # Only Python output is syntax-checked; other languages pass through.
        syntax_errors = (
            self._check_python_syntax(code) if language == "python" else []
        )

        # Tests are generated only for code that passed the syntax check, and
        # with a framework that belongs to the target language.
        test_code = None
        if include_tests and not syntax_errors:
            framework = self.DEFAULT_TEST_FRAMEWORKS.get(language, "pytest")
            test_code = self.generate_tests(code, language, framework)

        return CodeGenerationResult(
            code=code,
            language=language,
            explanation=result.get("explanation", ""),
            test_code=test_code,
            is_runnable=not syntax_errors,
            syntax_errors=syntax_errors,
        )

    def generate_tests(
        self,
        source_code: str,
        language: str = "python",
        test_framework: str = "pytest",
    ) -> str:
        """Ask the model for unit tests covering ``source_code``.

        Args:
            source_code: The code under test; it is embedded in the prompt.
            language: Language label used for the prompt's code fence.
            test_framework: Key of a known framework, or free text that is
                passed to the model unchanged.

        Returns:
            The model's reply with surrounding whitespace removed.
        """
        framework_guide = {
            "pytest": "使用 pytest,测试函数以 test_ 开头,使用 assert 断言",
            "unittest": "使用 unittest.TestCase,测试方法以 test_ 开头",
            "jest": "使用 Jest,describe/it 结构,expect().toBe() 断言",
            "go_test": "使用 Go testing 包,测试函数以 Test 开头",
        }
        guide = framework_guide.get(test_framework, test_framework)
        # The code under test goes inside the fence; without it the model has
        # nothing to write tests for.
        prompt = f"""为以下代码编写完整的单元测试:
```{language}
{source_code}
```
测试框架:{guide}
覆盖要求:
1. 正常输入的典型用例(≥3个)
2. 边界条件(空输入、最大值、最小值等)
3. 异常情况(非法输入、类型错误等)
4. 每个测试用例附注释说明测试意图
直接输出测试代码,不要任何额外说明。"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
        )
        return response.choices[0].message.content.strip()

    def translate_code(
        self,
        source_code: str,
        from_lang: str,
        to_lang: str,
    ) -> str:
        """Translate ``source_code`` from ``from_lang`` to ``to_lang``.

        Returns:
            The model's reply with surrounding whitespace removed.
        """
        prompt = f"""将以下 {from_lang} 代码翻译为 {to_lang},要求:
1. 保持功能完全一致
2. 使用 {to_lang} 的惯用写法(不要逐行直译)
3. 保留注释的语义
4. 使用 {to_lang} 的标准库(避免不必要的第三方依赖)
```{from_lang}
{source_code}
```
直接输出翻译后的 {to_lang} 代码。"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
        )
        return response.choices[0].message.content.strip()

    def _check_python_syntax(self, code: str) -> List[str]:
        """Parse ``code`` as Python and return any syntax-error messages.

        If ``code`` contains a Markdown fenced block, only the contents of
        the first such block are parsed.
        """
        import re

        fenced = re.search(r'```(?:python)?\n([\s\S]*?)```', code)
        clean_code = fenced.group(1) if fenced else code
        try:
            ast.parse(clean_code)
        except SyntaxError as e:
            return [f"SyntaxError at line {e.lineno}: {e.msg}"]
        return []

    def complete_code(
        self,
        prefix: str,
        suffix: str = "",
        language: str = "python",
        max_tokens: int = 200,
    ) -> str:
        """Continue code at the cursor position.

        With a non-empty ``suffix`` the prompt marks the gap between prefix
        and suffix (fill-in-the-middle); otherwise the prompt is an open code
        fence ending at ``prefix``. Generation stops at the next fence.
        This method always requests "gpt-4o-mini" rather than ``self.model``.
        """
        if suffix:
            prompt = f"```{language}\n{prefix}<|fim_middle|>{suffix}\n```\n只输出填充的代码,不要任何解释:"
        else:
            prompt = f"```{language}\n{prefix}"
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"你是代码补全助手,直接续写{language}代码,不要任何注释或解释。",
                },
                {"role": "user", "content": prompt},
            ],
            max_tokens=max_tokens,
            temperature=0.1,
            stop=["```"],
        )
        return response.choices[0].message.content.strip()
---
#### 8.5.2 代码审查与解释
**多维度代码审查**
```python
import json
from typing import Dict, List
@dataclass
class ReviewIssue:
    """One finding reported by a code review."""

    severity: str  # critical / warning / suggestion
    category: str  # security / performance / style / logic
    line_range: str  # e.g. "L10-L15"
    description: str
    suggestion: str
@dataclass
class CodeReviewResult:
    """Aggregate result of reviewing one piece of code."""

    issues: List["ReviewIssue"]
    overall_score: int  # 1-10
    summary: str
    refactored_code: Optional[str] = None
class CodeReviewer:
    """Code reviewer covering security, performance, readability and logic.

    Also offers automatic fixing of reported issues, code explanation and
    docstring generation. Each method sends chat-completion requests
    through ``self.client``.
    """

    # Filled in with str.format(); literal JSON braces are therefore doubled.
    REVIEW_PROMPT = """请对以下 {language} 代码进行专业 Code Review。
```{language}
{code}
```
从以下维度检查:
- 安全性:注入攻击、敏感信息泄露、权限控制、输入校验
- 性能:时间/空间复杂度、不必要的循环、N+1 查询、内存泄漏
- 可读性:命名规范、注释完整性、函数长度、职责单一
- 逻辑正确性:边界条件、并发安全、异常处理、资源释放
- 最佳实践:设计模式、语言惯用法、依赖管理
以 JSON 格式返回: {{ "overall_score": 7, "summary": "整体评价(2-3句)", "issues": [ {{ "severity": "critical", "category": "security", "line_range": "L15-L18", "description": "问题描述", "suggestion": "具体修复建议" }} ] }}
severity 取值:critical(必须修复)/ warning(建议修复)/ suggestion(优化建议) """

    # Keys copied from each model-reported issue into a ReviewIssue.
    _ISSUE_FIELDS = ("severity", "category", "line_range", "description", "suggestion")

    def __init__(self, model: str = "gpt-4o"):
        """Create the API client and remember the default model name."""
        self.client = OpenAI()
        self.model = model

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return the body of the first fenced block in ``text``.

        Falls back to the whole text when there is no fence. An opening
        fence without a closing one is tolerated.
        """
        import re

        fenced = re.search(r"```[^\n]*\n([\s\S]*?)(?:```|\Z)", text)
        return (fenced.group(1) if fenced else text).strip()

    def review(
        self,
        code: str,
        language: str = "python",
        context: str = "",
        focus: Optional[List[str]] = None,
    ) -> "CodeReviewResult":
        """Review ``code`` and return the issues found.

        Args:
            code: Source text to review.
            language: Language label used in the prompt.
            context: Optional business context appended to the prompt.
            focus: Optional list of aspects the reviewer should emphasise.

        Returns:
            A CodeReviewResult. ``refactored_code`` is filled in when at
            least one critical or warning issue was reported.
        """
        prompt = self.REVIEW_PROMPT.format(language=language, code=code)
        if context:
            prompt += f"\n\n**业务上下文**:{context}"
        if focus:
            prompt += f"\n\n**重点关注**:{', '.join(focus)}"

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": "你是一名具有10年经验的资深工程师,擅长代码安全审查和性能优化。",
                },
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        data = json.loads(response.choices[0].message.content)

        # Copy only the known keys so an extra or missing key in the model's
        # JSON cannot make the ReviewIssue constructor raise TypeError.
        issues = [
            ReviewIssue(**{key: str(raw.get(key) or "") for key in self._ISSUE_FIELDS})
            for raw in data.get("issues", [])
            if isinstance(raw, dict)
        ]

        # Automatic fixing is attempted for critical and warning issues only.
        fixable = [i for i in issues if i.severity in ("critical", "warning")]
        refactored = self._auto_fix(code, language, fixable) if fixable else None

        return CodeReviewResult(
            issues=issues,
            overall_score=data.get("overall_score", 5),
            summary=data.get("summary", ""),
            refactored_code=refactored,
        )

    def _auto_fix(
        self,
        code: str,
        language: str,
        issues: List["ReviewIssue"],
    ) -> str:
        """Ask the model to fix exactly the listed issues in ``code``.

        Returns:
            The fixed code with any Markdown fence removed.
        """
        issues_desc = "\n".join(
            f"- [{i.severity.upper()}] {i.line_range}: {i.description} → {i.suggestion}"
            for i in issues
        )
        prompt = f"""修复以下 {language} 代码中的问题:
原始代码:
```{language}
{code}
```
需修复的问题:
{issues_desc}
要求:
- 只修复列出的问题,不做其他改动
- 在修改处添加简短注释说明修复原因
- 直接输出修复后的完整代码"""
        # No stop sequence on the fence marker: a reply that opens with a
        # fence would be cut to an empty string. The fence is stripped instead.
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
        )
        return self._strip_code_fence(response.choices[0].message.content)

    def explain_code(
        self,
        code: str,
        language: str = "python",
        level: str = "intermediate",  # beginner / intermediate / expert
    ) -> str:
        """Explain ``code`` for the given audience level.

        An unknown ``level`` falls back to the intermediate guidance.
        """
        level_guide = {
            "beginner": "用通俗易懂的语言解释,避免术语,多用类比",
            "intermediate": "解释实现原理和关键技术点,适当提及设计选择",
            "expert": "深入分析算法复杂度、设计模式、潜在优化空间",
        }
        audience = level_guide.get(level, level_guide["intermediate"])
        prompt = f"""请解释以下 {language} 代码:
```{language}
{code}
```
目标读者:{audience}
解释结构:
功能概述(一句话)
逐段解析(每段代码的作用)
关键算法或设计说明
使用示例"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content.strip()

    def generate_docstring(
        self,
        code: str,
        language: str = "python",
        style: str = "google",  # google / numpy / sphinx
    ) -> str:
        """Return ``code`` with documentation comments added in ``style``.

        An unknown ``style`` is named in the prompt as given, but the format
        example falls back to the Google one. This method always requests
        "gpt-4o-mini" rather than ``self.model``.
        """
        style_examples = {
            "google": 'Args:\n x (int): ...\nReturns:\n str: ...',
            "numpy": 'Parameters\n----------\nx : int\n ...',
            "sphinx": ':param x: ...\n:type x: int\n:returns: ...',
        }
        example = style_examples.get(style, style_examples["google"])
        prompt = f"""为以下代码生成 {style} 风格的文档注释:
```{language}
{code}
```
文档格式示例:
{example}
要求:
准确描述功能、参数、返回值和可能的异常
如有复杂逻辑,在 Notes 部分说明
直接输出添加了文档注释的完整代码"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
        )
        return response.choices[0].message.content.strip()
---
#### 8.5.3 自然语言转 SQL(Text-to-SQL)
Text-to-SQL 的核心难点是**准确理解业务语义**和**生成语法正确的 SQL**,需要将数据库 Schema 作为上下文注入。
```python
from typing import List, Dict, Optional, Tuple
import json
import re
@dataclass
class TableSchema:
    """Structure of one database table, used as prompt context."""

    name: str
    columns: List[Dict]  # e.g. [{"name": "id", "type": "INT", "comment": "..."}]
    primary_key: str = "id"
    # A few example rows that help the model understand the data; the
    # annotation is Optional so it agrees with the None default.
    sample_data: Optional[List[Dict]] = None
@dataclass
class SQLResult:
    """Result of one natural-language-to-SQL conversion."""

    sql: str
    explanation: str  # explanation of the SQL logic
    confidence: float  # 0-1
    warnings: List[str]  # potential risks, e.g. a full table scan
    alternatives: List[str]  # alternative SQL statements
class Text2SQLConverter:
    """Natural-language to SQL converter backed by a chat model."""

    # Statement keywords treated as data- or schema-modifying.
    _DML_KEYWORDS = ("INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER")

    # JSON reply contract, shared by convert() and iterative_refine() so a
    # refinement request does not depend on a format the model never saw.
    _REPLY_FORMAT = (
        '{ "sql": "完整的 SQL 查询", "explanation": "SQL 逻辑解释(每个子句的作用)", '
        '"confidence": 0.95, "warnings": ["注意:此查询可能导致全表扫描,建议添加索引"], '
        '"alternatives": ["等价的备选 SQL(如果有更简洁的写法)"] }'
    )

    def __init__(self, model: str = "gpt-4o", dialect: str = "mysql"):
        """Create the API client and store the model name and SQL dialect."""
        self.client = OpenAI()
        self.model = model
        self.dialect = dialect  # mysql / postgresql / sqlite / bigquery

    def build_schema_context(
        self,
        tables: List["TableSchema"],
        max_sample_rows: int = 3,
    ) -> str:
        """Render ``tables`` as CREATE TABLE text plus optional sample rows.

        Args:
            tables: Table descriptions to render.
            max_sample_rows: Maximum sample rows shown per table; 0 or less
                omits sample data.

        Returns:
            The rendered sections joined by blank lines.
        """
        parts = []
        for table in tables:
            last = len(table.columns) - 1
            column_lines = []
            for idx, col in enumerate(table.columns):
                line = f" {col['name']} {col['type']}"
                # The separating comma must come before the trailing comment,
                # otherwise it is swallowed by the "--" comment.
                if idx != last:
                    line += ","
                if col.get("comment"):
                    line += f" -- {col['comment']}"
                column_lines.append(line)
            parts.append(
                f"CREATE TABLE {table.name} (\n" + "\n".join(column_lines) + "\n);"
            )

            if table.sample_data and max_sample_rows > 0:
                samples = table.sample_data[:max_sample_rows]
                parts.append(f"-- {table.name} 示例数据:")
                parts.append("-- " + str(samples).replace("\n", " "))
        return "\n\n".join(parts)

    def convert(
        self,
        question: str,
        tables: List["TableSchema"],
        additional_context: str = "",
        allow_dml: bool = False,  # whether INSERT/UPDATE/DELETE are allowed
    ) -> "SQLResult":
        """Convert a natural-language question into SQL.

        Args:
            question: The user's question.
            tables: Schema of the tables the query may use.
            additional_context: Optional business background for the prompt.
            allow_dml: When False, the prompt asks for SELECT only and any
                reply containing a modifying keyword is commented out.

        Returns:
            An SQLResult; blocked SQL is returned fully commented out with a
            warning inserted at the front of ``warnings``.
        """
        schema_context = self.build_schema_context(tables)
        dml_restriction = "" if allow_dml else "只生成 SELECT 查询,不生成 INSERT/UPDATE/DELETE/DROP 等修改语句。"
        background = f"业务背景:{additional_context}" if additional_context else ""
        prompt = f"""你是一名精通 {self.dialect} 的数据库专家。请将用户的自然语言问题转换为 SQL 查询。
数据库结构(Schema):
```sql
{schema_context}
```
{background}
用户问题:{question}
以 JSON 格式返回: {self._REPLY_FORMAT}
SQL 要求:
语法完全符合 {self.dialect} 规范
使用表别名提高可读性
复杂查询添加注释
注意 NULL 处理、字符串比较大小写
{dml_restriction}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        data = json.loads(response.choices[0].message.content)

        warnings = list(data.get("warnings") or [])
        sql = self._guard_sql(data.get("sql", ""), warnings, allow_dml)
        return SQLResult(
            sql=sql,
            explanation=data.get("explanation", ""),
            confidence=data.get("confidence", 0.8),
            warnings=warnings,
            alternatives=data.get("alternatives", []),
        )

    def _guard_sql(self, sql: str, warnings: List[str], allow_dml: bool) -> str:
        """Return ``sql``, commented out when it contains a blocked keyword.

        A warning is inserted at the front of ``warnings`` when blocking.
        """
        if allow_dml:
            return sql
        keyword = self._check_dml(sql)
        if not keyword:
            return sql
        warnings.insert(0, f"安全拦截:检测到 {keyword} 操作,已阻止执行")
        # Every line gets a comment prefix: prefixing only the first line
        # would leave the remaining lines of a multi-line statement live.
        lines = sql.splitlines() or [""]
        return "\n".join(
            [f"-- 已拦截:{lines[0]}"] + [f"-- {line}" for line in lines[1:]]
        )

    def _check_dml(self, sql: str) -> Optional[str]:
        """Return the first blocked keyword found in ``sql``, else None.

        Matching is case-insensitive on whole words anywhere in the text,
        including inside string literals.
        """
        sql_upper = sql.upper()
        for keyword in self._DML_KEYWORDS:
            if re.search(rf'\b{keyword}\b', sql_upper):
                return keyword
        return None

    def iterative_refine(
        self,
        question: str,
        tables: List["TableSchema"],
        user_feedback: str,
        previous_sql: str,
        allow_dml: bool = False,
    ) -> "SQLResult":
        """Revise ``previous_sql`` according to ``user_feedback``.

        The schema, the feedback and the reply format are all included in
        the prompt, and the refined SQL goes through the same keyword guard
        as convert().

        Args:
            question: The original user question.
            tables: Schema of the tables the query may use.
            user_feedback: What the user wants changed.
            previous_sql: The SQL being revised.
            allow_dml: When False (default), modifying statements are
                commented out exactly as in convert().
        """
        schema_context = self.build_schema_context(tables)
        prompt = f"""基于以下反馈修改 SQL:
数据库结构(Schema):
```sql
{schema_context}
```
原始问题:{question}
当前 SQL:
```sql
{previous_sql}
```
用户反馈:{user_feedback}
请修正 SQL 并解释修改内容,以 JSON 格式返回: {self._REPLY_FORMAT}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        data = json.loads(response.choices[0].message.content)

        warnings = list(data.get("warnings") or [])
        sql = self._guard_sql(data.get("sql", ""), warnings, allow_dml)
        return SQLResult(
            sql=sql,
            explanation=data.get("explanation", ""),
            confidence=data.get("confidence", 0.9),
            warnings=warnings,
            alternatives=data.get("alternatives", []),
        )

    def explain_sql(self, sql: str) -> str:
        """Explain an existing SQL statement in plain language.

        This method always requests "gpt-4o-mini" rather than ``self.model``.
        """
        prompt = f"""用通俗的语言解释这段 SQL 的功能:
```sql
{sql}
```
解释结构:
查询目的(一句话)
数据来源(从哪些表取数)
过滤条件
聚合逻辑(如有)
排序与限制
性能注意事项"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content.strip()
---
#### 8.5.4 Agent 自动化编码
Agent 编码系统能够**自主规划、编写、执行、测试、修复**代码,完成完整的编程任务。
```python
from enum import Enum
from typing import List, Dict, Any, Callable
import subprocess
import tempfile
import os
import json
class TaskStatus(Enum):
    """Lifecycle phase of a coding-agent task."""

    PLANNING = "planning"
    CODING = "coding"
    TESTING = "testing"
    FIXING = "fixing"
    DONE = "done"
    FAILED = "failed"
@dataclass
class AgentStep:
    """One recorded action taken by the coding agent."""

    action: str  # plan / write_file / run_code / run_tests / fix_code
    description: str
    result: str = ""
    success: bool = False
class CodingAgent:
    """Automated coding agent.

    Takes a task description, asks the model for a plan, then writes files,
    runs code, generates and runs tests, attempts automatic fixes and
    finally returns a report.

    NOTE(security): model-generated code is executed with ``subprocess`` in
    the workspace directory. No sandboxing is visible in this class; run it
    only in an isolated environment.
    """

    MAX_FIX_ATTEMPTS = 3  # maximum number of automatic fix attempts

    def __init__(self, model: str = "gpt-4o", workspace_dir: str = "/tmp/agent_workspace"):
        """Create the client, ensure the workspace exists and reset state."""
        self.client = OpenAI()
        self.model = model
        self.workspace = workspace_dir
        os.makedirs(workspace_dir, exist_ok=True)
        self.steps: List["AgentStep"] = []
        self.status = TaskStatus.PLANNING
        self.file_registry: Dict[str, str] = {}  # filename -> path of generated files

    def run(self, task: str, language: str = "python") -> Dict[str, Any]:
        """Run the full plan/code/test/fix workflow for ``task``.

        Returns:
            The report built by ``_generate_report``. The status is "done"
            only when no step raised and every test run passed.
        """
        print(f"🤖 开始任务:{task}\n")
        self.steps = []
        try:
            plan = self._plan_task(task, language)
            tests_passed = True
            for step in plan:
                action = step.get("action")
                if action == "write_file":
                    self._write_file(
                        step["filename"], step["content"], step.get("description", "")
                    )
                elif action == "run_code":
                    output, success = self._run_code(step["filename"])
                    if not success:
                        self._auto_fix_loop(step["filename"], output, language)
                elif action == "write_tests":
                    self._write_tests(step["filename"], step["source_file"])
                elif action == "run_tests":
                    # A failing test run must not be reported as success.
                    if not self._run_tests(step["filename"]):
                        tests_passed = False
            self.status = TaskStatus.DONE if tests_passed else TaskStatus.FAILED
        except Exception as e:
            # Top-level boundary: any failure is recorded in the report
            # instead of propagating to the caller.
            self.status = TaskStatus.FAILED
            self.steps.append(AgentStep("error", str(e), str(e), False))
        return self._generate_report(task)

    def _plan_task(self, task: str, language: str) -> List[Dict]:
        """Ask the model for a list of executable steps and record the plan."""
        self.status = TaskStatus.PLANNING
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"你是一个自动化编程 Agent,擅长将任务分解为可执行步骤,使用 {language} 实现。",
                },
                {
                    "role": "user",
                    "content": f"""为以下编程任务制定执行计划:
任务:{task}
语言:{language}
以 JSON 格式返回执行步骤列表:
{{
"steps": [
{{
"action": "write_file",
"filename": "main.py",
"description": "编写主模块",
"content": "完整的文件代码"
}},
{{
"action": "write_tests",
"filename": "test_main.py",
"source_file": "main.py",
"description": "编写单元测试"
}},
{{
"action": "run_tests",
"filename": "test_main.py",
"description": "运行测试验证"
}}
]
}}
action 取值:write_file / write_tests / run_code / run_tests
""",
                },
            ],
            response_format={"type": "json_object"},
        )
        plan_data = json.loads(response.choices[0].message.content)
        plan = plan_data.get("steps", [])

        self.steps.append(AgentStep(
            action="plan",
            description=f"制定执行计划:{len(plan)} 个步骤",
            result=str([s.get("description") for s in plan]),
            success=True,
        ))
        print(f"📋 执行计划(共 {len(plan)} 步):")
        for i, s in enumerate(plan, 1):
            # .get() so a step without "action" cannot abort the listing.
            print(f" {i}. [{s.get('action')}] {s.get('description', '')}")
        print()
        return plan

    def _resolve_path(self, filename: str) -> str:
        """Map ``filename`` to a path inside the workspace.

        Raises:
            ValueError: If the name is empty, absolute, or would resolve to
                a location outside the workspace directory.
        """
        root = os.path.realpath(self.workspace)
        target = os.path.realpath(os.path.join(root, filename))
        if target == root or os.path.commonpath([root, target]) != root:
            raise ValueError(f"非法文件路径(超出工作目录):{filename}")
        return target

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return the body of the first fenced block in ``text``.

        Falls back to the whole text when there is no fence. An opening
        fence without a closing one is tolerated.
        """
        import re

        fenced = re.search(r"```[^\n]*\n([\s\S]*?)(?:```|\Z)", text)
        return (fenced.group(1) if fenced else text).strip()

    def _write_file(self, filename: str, content: str, description: str):
        """Write ``content`` to ``filename`` inside the workspace.

        Markdown fence lines are removed first. The filename comes from
        model output, so it is validated against the workspace root.

        Raises:
            ValueError: If ``filename`` resolves outside the workspace.
        """
        self.status = TaskStatus.CODING
        import re

        content = re.sub(r'^```\w*\n?', '', content, flags=re.MULTILINE)
        content = re.sub(r'```$', '', content, flags=re.MULTILINE)
        content = content.strip()

        filepath = self._resolve_path(filename)
        # Allow nested names such as "pkg/mod.py".
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        self.file_registry[filename] = filepath
        self.steps.append(AgentStep(
            action="write_file",
            description=description,
            result=f"已写入 {filename}({len(content.splitlines())} 行)",
            success=True,
        ))
        print(f"✅ 写入 {filename}")

    def _run_code(self, filename: str) -> Tuple[str, bool]:
        """Execute a registered file with ``python3`` (30 s timeout).

        Returns:
            ``(output, success)`` where output is stdout followed by stderr
            and success means exit code 0.
        """
        filepath = self.file_registry.get(filename)
        if not filepath:
            return f"文件不存在:{filename}", False
        try:
            result = subprocess.run(
                ["python3", filepath],
                capture_output=True,
                text=True,
                timeout=30,
                cwd=self.workspace,
            )
        except subprocess.TimeoutExpired:
            output, success = "执行超时(>30s)", False
        else:
            output = result.stdout + result.stderr
            success = result.returncode == 0
        # Timeouts are recorded too, so the report shows every attempt.
        self.steps.append(AgentStep(
            action="run_code",
            description=f"运行 {filename}",
            result=output[:500],
            success=success,
        ))
        return output, success

    def _run_tests(self, test_file: str) -> bool:
        """Run ``test_file`` with pytest (60 s timeout); True when it passes."""
        self.status = TaskStatus.TESTING
        filepath = self.file_registry.get(test_file, os.path.join(self.workspace, test_file))
        try:
            result = subprocess.run(
                ["python3", "-m", "pytest", filepath, "-v", "--tb=short"],
                capture_output=True,
                text=True,
                timeout=60,
                cwd=self.workspace,
            )
        except subprocess.TimeoutExpired:
            # Handled like _run_code: a hang is a failed step, not a crash.
            output, success = "执行超时(>60s)", False
        else:
            output = result.stdout + result.stderr
            success = result.returncode == 0

        self.steps.append(AgentStep(
            action="run_tests",
            description=f"运行测试 {test_file}",
            result=output[:1000],
            success=success,
        ))
        print(f"{'✅' if success else '❌'} 测试{'通过' if success else '失败'}")
        if not success:
            print(f" 错误摘要:{output[-300:]}")
        return success

    def _write_tests(self, test_filename: str, source_file: str):
        """Generate pytest tests for ``source_file`` and write them out.

        When the source file was never registered, a failed step is
        recorded and nothing is written.
        """
        description = f"生成 {source_file} 的单元测试"
        source_path = self.file_registry.get(source_file)
        if not source_path:
            self.steps.append(AgentStep(
                action="write_tests",
                description=description,
                result=f"文件不存在:{source_file}",
                success=False,
            ))
            return

        with open(source_path, 'r', encoding='utf-8') as f:
            source_code = f.read()

        generator = CodeGenerator(model=self.model)
        test_code = generator.generate_tests(source_code, "python", "pytest")

        # Derive the importable module name from the path: drop only the
        # final extension and turn directory separators into dots.
        module_name = os.path.splitext(source_file)[0].replace("\\", "/").replace("/", ".")
        if f"from {module_name}" not in test_code and f"import {module_name}" not in test_code:
            test_code = f"from {module_name} import *\n\n" + test_code

        self._write_file(test_filename, test_code, description)

    def _auto_fix_loop(self, filename: str, error_output: str, language: str):
        """Ask the model to repair a failing file, re-running after each fix.

        Tries at most ``MAX_FIX_ATTEMPTS`` times.

        Raises:
            RuntimeError: If the file is not registered or no attempt
                produced a passing run.
        """
        self.status = TaskStatus.FIXING
        filepath = self.file_registry.get(filename)
        if not filepath:
            raise RuntimeError(f"文件不存在:{filename}")

        for attempt in range(1, self.MAX_FIX_ATTEMPTS + 1):
            print(f"🔧 自动修复第 {attempt} 次...")
            with open(filepath, 'r', encoding='utf-8') as f:
                current_code = f.read()

            prompt = f"""修复以下 {language} 代码中的错误:
代码({filename}):
```{language}
{current_code}
```
错误信息:
```
{error_output[:2000]}
```
要求:
- 分析错误根因
- 最小化修改(只改有问题的部分)
- 直接输出修复后的完整代码(不要解释)"""
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
            )
            fixed_code = self._strip_code_fence(response.choices[0].message.content)
            if not fixed_code:
                # An empty file would "run" successfully and hide the
                # failure, so an empty reply counts as a failed attempt.
                continue

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(fixed_code)

            output, success = self._run_code(filename)
            if success:
                self.steps.append(AgentStep(
                    action="fix_code",
                    description=f"第 {attempt} 次修复成功",
                    result=output,
                    success=True,
                ))
                print(f"✅ 修复成功")
                return
            error_output = output

        raise RuntimeError(f"自动修复失败,已尝试 {self.MAX_FIX_ATTEMPTS} 次")

    def _generate_report(self, task: str) -> Dict[str, Any]:
        """Summarise the recorded steps, generated files and final status."""
        success_count = sum(1 for s in self.steps if s.success)
        return {
            "task": task,
            "status": self.status.value,
            "success_rate": f"{success_count}/{len(self.steps)}",
            "files_generated": list(self.file_registry.keys()),
            "steps": [
                {
                    "action": s.action,
                    "description": s.description,
                    "success": s.success,
                    "result_preview": s.result[:100],
                }
                for s in self.steps
            ],
            "workspace": self.workspace,
        }


# 8.5.5 FastAPI 服务实现
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List
import asyncio
app = FastAPI(title="AI 代码助手")
class CodeGenRequest(BaseModel):
    """Request body for ``POST /code/generate``."""

    description: str  # natural-language description of the code to generate
    language: str = "python"
    context_code: str = ""  # existing code passed to the generator as context
    include_tests: bool = True  # also generate unit tests
class CodeReviewRequest(BaseModel):
    """Request body for ``POST /code/review``."""

    code: str  # source text to review
    language: str = "python"
    context: str = ""  # optional business context passed to the reviewer
class SQLRequest(BaseModel):
    """Request body for ``POST /sql/convert``."""

    question: str  # natural-language question
    tables: List[dict]  # serialized TableSchema objects
    dialect: str = "mysql"
    allow_dml: bool = False  # permit data-modifying statements
class AgentRequest(BaseModel):
    """Request body for the ``/agent/run`` endpoints."""

    task: str  # description of the programming task
    language: str = "python"
class ExplainRequest(BaseModel):
    """Request body for ``POST /code/explain``."""

    code: str  # source text to explain
    language: str = "python"
    level: str = "intermediate"  # passed to explain_code as the audience level
@app.post("/code/generate")
async def generate_code(req: CodeGenRequest):
    """Code-generation endpoint.

    The generator uses a synchronous API client, so the work runs in the
    default thread pool instead of blocking the event loop.
    """
    def _work():
        generator = CodeGenerator(model="gpt-4o")
        return generator.generate_function(
            description=req.description,
            language=req.language,
            context_code=req.context_code,
            include_tests=req.include_tests,
        )

    result = await asyncio.get_running_loop().run_in_executor(None, _work)
    return {
        "code": result.code,
        "explanation": result.explanation,
        "test_code": result.test_code,
        "is_runnable": result.is_runnable,
        "syntax_errors": result.syntax_errors,
    }
@app.post("/code/review")
async def review_code(req: CodeReviewRequest):
    """Code-review endpoint.

    The reviewer uses a synchronous API client, so the work runs in the
    default thread pool instead of blocking the event loop.
    """
    def _work():
        reviewer = CodeReviewer(model="gpt-4o")
        return reviewer.review(
            code=req.code,
            language=req.language,
            context=req.context,
        )

    result = await asyncio.get_running_loop().run_in_executor(None, _work)
    issues = [
        {
            "severity": i.severity,
            "category": i.category,
            "line_range": i.line_range,
            "description": i.description,
            "suggestion": i.suggestion,
        }
        for i in result.issues
    ]
    return {
        "overall_score": result.overall_score,
        "summary": result.summary,
        "issues": issues,
        "refactored_code": result.refactored_code,
    }
@app.post("/code/explain")
async def explain_code(req: ExplainRequest):
    """Code-explanation endpoint.

    The blocking model call runs in the default thread pool so the event
    loop stays free for other requests.
    """
    def _work():
        reviewer = CodeReviewer()
        return reviewer.explain_code(req.code, req.language, req.level)

    explanation = await asyncio.get_running_loop().run_in_executor(None, _work)
    return {"explanation": explanation}
@app.post("/sql/convert")
async def convert_to_sql(req: SQLRequest):
    """Natural-language-to-SQL endpoint.

    Raises:
        HTTPException: 422 when an entry of ``tables`` cannot be turned
            into a TableSchema (unknown or missing keys).
    """
    try:
        tables = [TableSchema(**t) for t in req.tables]
    except TypeError as exc:
        # A malformed table dict is a client error, not a server failure.
        raise HTTPException(status_code=422, detail=f"tables 字段格式错误:{exc}") from exc

    def _work():
        converter = Text2SQLConverter(dialect=req.dialect)
        return converter.convert(
            question=req.question,
            tables=tables,
            allow_dml=req.allow_dml,
        )

    # The converter's API client is synchronous; keep it off the event loop.
    result = await asyncio.get_running_loop().run_in_executor(None, _work)
    return {
        "sql": result.sql,
        "explanation": result.explanation,
        "confidence": result.confidence,
        "warnings": result.warnings,
        "alternatives": result.alternatives,
    }
@app.post("/agent/run")
async def run_agent(req: AgentRequest):
    """Run the coding agent and return its report once it has finished.

    The agent makes blocking model calls and runs subprocesses, so the
    whole run executes in the default thread pool rather than on the
    event loop.
    """
    def _work():
        agent = CodingAgent(model="gpt-4o")
        return agent.run(req.task, req.language)

    return await asyncio.get_running_loop().run_in_executor(None, _work)
@app.post("/agent/run/stream")
async def run_agent_stream(req: AgentRequest):
    """Run the coding agent and stream its progress as server-sent events.

    Events are ``start``, one ``step`` per recorded agent step, then either
    ``done`` (with the report) or ``error``. The agent runs in a worker
    thread and hands step events to the event loop through a queue.
    """
    import json

    loop = asyncio.get_running_loop()
    events: asyncio.Queue = asyncio.Queue()

    def sse(payload: dict) -> str:
        """Format one payload as a server-sent-event frame."""
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    class _TrackedSteps(list):
        """List that reports every appended step to the event queue."""

        def append(self, step):
            super().append(step)
            # Called from the worker thread; hand over to the loop safely.
            loop.call_soon_threadsafe(events.put_nowait, {
                "type": "step",
                "action": step.action,
                "description": step.description,
                "success": step.success,
            })

    class _StreamingAgent(CodingAgent):
        """CodingAgent whose ``steps`` list is always a tracking list.

        ``list.append`` cannot be replaced on an instance, and ``run()``
        rebinds ``self.steps``, so the hook lives in a property setter.
        """

        @property
        def steps(self):
            return self._steps

        @steps.setter
        def steps(self, value):
            self._steps = _TrackedSteps(value)

    def work():
        return _StreamingAgent(model="gpt-4o").run(req.task, req.language)

    async def generate():
        yield sse({'type': 'start', 'task': req.task})
        worker = loop.run_in_executor(None, work)
        # None marks the end of the step events; it is queued after every
        # step the worker scheduled before it finished.
        worker.add_done_callback(lambda _f: events.put_nowait(None))
        while True:
            event = await events.get()
            if event is None:
                break
            yield sse(event)
        try:
            report = worker.result()
            yield sse({'type': 'done', 'report': report})
        except Exception as e:
            yield sse({'type': 'error', 'message': str(e)})

    return StreamingResponse(generate(), media_type="text/event-stream")


# 系统关键指标
| 指标 | 目标值 | 优化方向 |
|---|---|---|
| 代码生成一次通过率 | ≥ 80% | Few-shot、类型提示、语法验证 |
| 审查问题检测率 | ≥ 90% | 分维度审查、专用安全模型 |
| Text-to-SQL 准确率 | ≥ 85% | Schema 注入、Few-shot 样例、迭代修正 |
| Agent 任务完成率 | ≥ 70% | 自动重试、沙箱执行、最大尝试次数 |
| 平均响应延迟 | ≤ 5s | 流式输出、小模型预筛 |