Skip to content

8.5 数据分析 Agent(代码助手)

自然语言转 SQL、代码生成与审查、数据可视化——让 AI 成为你的数据分析搭档。

难度:⭐⭐⭐⭐ | 预计时长:2-3 周


代码助手是开发者效率工具中增长最快的 AI 应用,核心挑战是:精准理解代码上下文、生成可运行的代码、识别安全漏洞、自主规划并执行多步编程任务。本节构建一个涵盖代码生成、审查、SQL 转换及自动化编码的完整系统。

系统架构

开发者输入(自然语言 / 代码片段 / 函数签名)

       ┌────────────┼────────────┐
       │            │            │
  ┌────▼────┐  ┌───▼────┐  ┌───▼────┐
  │ 代码生成 │  │代码审查│  │NL→SQL  │
  │ & 补全  │  │& 解释  │  │        │
  └────┬────┘  └───┬────┘  └───┬────┘
       │            │            │
       └────────────┼────────────┘

           ┌────────▼────────┐
           │  Agent 自动化   │
           │  编码执行引擎   │
           │  (Plan→Code→   │
           │   Test→Fix)    │
           └─────────────────┘

8.5.1 代码生成与补全

代码生成的核心策略

| 场景 | 策略 | 关键技术 |
| --- | --- | --- |
| 函数生成 | 函数签名 + 文档注释 → 实现体 | Few-shot、类型提示 |
| 类/模块生成 | 需求描述 → 完整类定义 | 结构化输出、接口设计 |
| 代码补全 | 光标上下文 → 续写 | 前缀/后缀填充(FIM) |
| 测试生成 | 函数代码 → 单元测试 | 边界覆盖、Mock 推断 |
| 代码翻译 | A 语言 → B 语言 | 语义保留、惯用法转换 |

代码生成器实现

python
from openai import OpenAI
from dataclasses import dataclass
from typing import List, Optional
import ast
import subprocess
import tempfile
import os

client = OpenAI()

@dataclass
class CodeGenerationResult:
    """Result of a single code-generation request."""
    code: str  # generated source text
    language: str  # language the code was requested in
    explanation: str  # short description of the approach
    test_code: Optional[str] = None  # generated unit tests, when requested
    is_runnable: bool = False  # set by the producer of the result
    syntax_errors: Optional[List[str]] = None  # syntax-check messages; None when not populated

class CodeGenerator:
    """Intelligent code generator."""

    # Per-language settings: line-comment prefix, source file extension and
    # the command used to run a file (None for SQL).
    LANGUAGE_CONFIGS = {
        "python": {
            "comment": "#",
            "extension": ".py",
            "run_cmd": "python3"
        },
        "javascript": {
            "comment": "//",
            "extension": ".js",
            "run_cmd": "node"
        },
        "typescript": {
            "comment": "//",
            "extension": ".ts",
            "run_cmd": "ts-node"
        },
        "go": {
            "comment": "//",
            "extension": ".go",
            "run_cmd": "go run"
        },
        "sql": {
            "comment": "--",
            "extension": ".sql",
            "run_cmd": None
        }
    }

    def __init__(self, model: str = "gpt-4o"):
        # Each instance creates its own OpenAI client and remembers the chat
        # model it should use.
        self.client = OpenAI()
        self.model = model

    def generate_function(
        self,
        description: str,
        language: str = "python",
        context_code: str = "",
        style_guide: str = "",
        include_tests: bool = True
    ) -> CodeGenerationResult:
        """根据自然语言描述生成函数"""

        system_prompt = f"""你是一名资深{language}工程师,擅长编写清晰、高效、符合最佳实践的代码。
代码要求:
- 包含完整的类型注解(如语言支持)
- 添加简洁的文档字符串
- 处理边界情况和异常
- 变量命名清晰、符合{language}命名规范
{f'风格指南:{style_guide}' if style_guide else ''}
"""

        user_prompt = f"""请实现以下功能:

{description}

{f'已有代码上下文:\n```{language}\n{context_code}\n```' if context_code else ''}

请以 JSON 格式返回:
{{
  "code": "完整的函数/类代码",
  "explanation": "实现思路简要说明(2-3句)",
  "complexity": "时间复杂度 O(?)",
  "edge_cases": ["处理了哪些边界情况"]
}}
"""

        import json
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            response_format={"type": "json_object"},
            temperature=0.2
        )

        result = json.loads(response.choices[0].message.content)
        code = result.get("code", "")

        # 语法检查(Python)
        syntax_errors = []
        if language == "python":
            syntax_errors = self._check_python_syntax(code)

        # 生成测试代码
        test_code = None
        if include_tests and not syntax_errors:
            test_code = self.generate_tests(code, language)

        return CodeGenerationResult(
            code=code,
            language=language,
            explanation=result.get("explanation", ""),
            test_code=test_code,
            is_runnable=len(syntax_errors) == 0,
            syntax_errors=syntax_errors
        )

    def generate_tests(
        self,
        source_code: str,
        language: str = "python",
        test_framework: str = "pytest"
    ) -> str:
        """为代码自动生成单元测试"""

        framework_guide = {
            "pytest": "使用 pytest,测试函数以 test_ 开头,使用 assert 断言",
            "unittest": "使用 unittest.TestCase,测试方法以 test_ 开头",
            "jest": "使用 Jest,describe/it 结构,expect().toBe() 断言",
            "go_test": "使用 Go testing 包,测试函数以 Test 开头"
        }

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"""为以下代码编写完整的单元测试:

```{language}


测试框架:{framework_guide.get(test_framework, test_framework)}

覆盖要求:
1. 正常输入的典型用例(≥3个)
2. 边界条件(空输入、最大值、最小值等)
3. 异常情况(非法输入、类型错误等)
4. 每个测试用例附注释说明测试意图

直接输出测试代码,不要任何额外说明。"""
            }],
            temperature=0.2
        )

        return response.choices[0].message.content.strip()

    def translate_code(
        self,
        source_code: str,
        from_lang: str,
        to_lang: str
    ) -> str:
        """跨语言代码翻译"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"""将以下 {from_lang} 代码翻译为 {to_lang},要求:
1. 保持功能完全一致
2. 使用 {to_lang} 的惯用写法(不要逐行直译)
3. 保留注释的语义
4. 使用 {to_lang} 的标准库(避免不必要的第三方依赖)

```{from_lang}
{source_code}

直接输出翻译后的 {to_lang} 代码。""" }], temperature=0.1 )

    return response.choices[0].message.content.strip()

def _check_python_syntax(self, code: str) -> List[str]:
    """检查 Python 代码语法错误"""
    errors = []
    # 提取代码块(去除 markdown 格式)
    import re
    code_match = re.search(r'```(?:python)?\n([\s\S]*?)```', code)
    clean_code = code_match.group(1) if code_match else code

    try:
        ast.parse(clean_code)
    except SyntaxError as e:
        errors.append(f"SyntaxError at line {e.lineno}: {e.msg}")
    return errors

def complete_code(
    self,
    prefix: str,
    suffix: str = "",
    language: str = "python",
    max_tokens: int = 200
) -> str:
    """Complete code at the cursor position.

    With a non-empty ``suffix`` the prompt marks the gap between ``prefix``
    and ``suffix`` (fill-in-the-middle); otherwise the model is asked to
    continue after ``prefix``.

    Args:
        prefix: Code before the cursor.
        suffix: Code after the cursor, if any.
        language: Language tag used in the prompt.
        max_tokens: Upper bound on the completion length.

    Returns:
        The completion text with surrounding whitespace removed.
    """
    # Start from the plain-continuation prompt and switch to the FIM form
    # only when there is trailing code to fill towards.
    user_content = f"```{language}\n{prefix}"
    if suffix:
        user_content = f"```{language}\n{prefix}<|fim_middle|>{suffix}\n```\n只输出填充的代码,不要任何解释:"

    system_message = {
        "role": "system",
        "content": f"你是代码补全助手,直接续写{language}代码,不要任何注释或解释。"
    }
    user_message = {"role": "user", "content": user_content}

    completion = self.client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[system_message, user_message],
        max_tokens=max_tokens,
        temperature=0.1,
        stop=["```"]
    )

    text = completion.choices[0].message.content
    return text.strip()

---

#### 8.5.2 代码审查与解释

**多维度代码审查**

```python
import json
from typing import Dict, List

@dataclass
class ReviewIssue:
    """A single finding reported by a code review."""
    severity: str      # critical / warning / suggestion
    category: str      # security / performance / style / logic
    line_range: str    # "L10-L15"
    description: str   # what the problem is
    suggestion: str    # how to fix it

@dataclass
class CodeReviewResult:
    """Aggregate outcome of one code review."""
    issues: List[ReviewIssue]  # all reported findings
    overall_score: int   # 1-10
    summary: str  # short overall assessment
    refactored_code: Optional[str] = None  # auto-fixed code, when produced

class CodeReviewer:
    """Code reviewer covering security, performance, readability and logic."""

    # Prompt template filled with str.format(language=..., code=...); the
    # doubled braces are literal JSON braces. The fence around {code} is
    # closed with a real ``` line (the scraped text had a stray VitePress
    # directive in its place).
    REVIEW_PROMPT = """请对以下 {language} 代码进行专业 Code Review。

```{language}
{code}
```

从以下维度检查:

  1. 安全性:注入攻击、敏感信息泄露、权限控制、输入校验
  2. 性能:时间/空间复杂度、不必要的循环、N+1 查询、内存泄漏
  3. 可读性:命名规范、注释完整性、函数长度、职责单一
  4. 逻辑正确性:边界条件、并发安全、异常处理、资源释放
  5. 最佳实践:设计模式、语言惯用法、依赖管理

以 JSON 格式返回: {{ "overall_score": 7, "summary": "整体评价(2-3句)", "issues": [ {{ "severity": "critical", "category": "security", "line_range": "L15-L18", "description": "问题描述", "suggestion": "具体修复建议" }} ] }}

severity 取值:critical(必须修复)/ warning(建议修复)/ suggestion(优化建议) """

def __init__(self, model: str = "gpt-4o"):
    # Create the OpenAI client and remember which chat model to use.
    self.client = OpenAI()
    self.model = model

def review(
    self,
    code: str,
    language: str = "python",
    context: str = "",
    focus: Optional[List[str]] = None
) -> CodeReviewResult:
    """Run a code review and, when needed, produce an auto-fixed version.

    Args:
        code: Source code to review.
        language: Language of ``code``.
        context: Optional business context appended to the prompt.
        focus: Optional list of aspects the reviewer should emphasise.

    Returns:
        A ``CodeReviewResult``. ``refactored_code`` is set only when at least
        one issue has severity ``critical`` or ``warning``.

    Raises:
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    prompt = self.REVIEW_PROMPT.format(
        language=language,
        code=code
    )

    if context:
        prompt += f"\n\n**业务上下文**:{context}"

    if focus:
        prompt += f"\n\n**重点关注**:{', '.join(focus)}"

    response = self.client.chat.completions.create(
        model=self.model,
        messages=[{
            "role": "system",
            "content": "你是一名具有10年经验的资深工程师,擅长代码安全审查和性能优化。"
        }, {
            "role": "user",
            "content": prompt
        }],
        response_format={"type": "json_object"},
        temperature=0.1
    )

    data = json.loads(response.choices[0].message.content)

    # Model output is not guaranteed to match the dataclass exactly: keep
    # only the known fields, default missing ones to "", and skip entries
    # that are not objects, instead of failing with TypeError.
    issue_fields = ("severity", "category", "line_range", "description", "suggestion")
    issues = [
        ReviewIssue(**{name: raw.get(name, "") for name in issue_fields})
        for raw in (data.get("issues") or [])
        if isinstance(raw, dict)
    ]

    # Auto-fix targets critical and warning findings only.
    fixable = [i for i in issues if i.severity in ("critical", "warning")]
    refactored = None
    if fixable:
        refactored = self._auto_fix(code, language, fixable)

    return CodeReviewResult(
        issues=issues,
        overall_score=data.get("overall_score", 5),
        summary=data.get("summary", ""),
        refactored_code=refactored
    )

def _auto_fix(
    self,
    code: str,
    language: str,
    issues: List[ReviewIssue]
) -> str:
    """根据审查问题自动修复代码"""

    issues_desc = "\n".join([
        f"- [{i.severity.upper()}] {i.line_range}: {i.description} → {i.suggestion}"
        for i in issues
    ])

    response = self.client.chat.completions.create(
        model=self.model,
        messages=[{
            "role": "user",
            "content": f"""修复以下 {language} 代码中的问题:

原始代码:

:::
{code}

需修复的问题:

要求:

  • 只修复列出的问题,不做其他改动
  • 在修改处添加简短注释说明修复原因
  • 直接输出修复后的完整代码
            }],
            temperature=0.1,
            stop=["```"]
        )

        return response.choices[0].message.content.strip()

    def explain_code(
        self,
        code: str,
        language: str = "python",
        level: str = "intermediate"  # beginner / intermediate / expert
    ) -> str:
        """代码解释(适配不同学习层次)"""

        level_guide = {
            "beginner": "用通俗易懂的语言解释,避免术语,多用类比",
            "intermediate": "解释实现原理和关键技术点,适当提及设计选择",
            "expert": "深入分析算法复杂度、设计模式、潜在优化空间"
        }

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"""请解释以下 {language} 代码:

```{language}
{code}

目标读者:

解释结构:

  1. 功能概述(一句话)

  2. 逐段解析(每段代码的作用)

  3. 关键算法或设计说明

  4. 使用示例""" }] )

     return response.choices[0].message.content.strip()
    

    def generate_docstring( self, code: str, language: str = "python", style: str = "google" # google / numpy / sphinx ) -> str: """自动生成文档注释"""

     style_examples = {
         "google": 'Args:\n    x (int): ...\nReturns:\n    str: ...',
         "numpy": 'Parameters\n----------\nx : int\n    ...',
         "sphinx": ':param x: ...\n:type x: int\n:returns: ...'
     }
    
     response = self.client.chat.completions.create(
         model="gpt-4o-mini",
         messages=[{
             "role": "user",
             "content": f"""为以下代码生成 {style} 风格的文档注释:
    
{code}

文档格式示例:

要求:

  • 准确描述功能、参数、返回值和可能的异常

  • 如有复杂逻辑,在 Notes 部分说明

  • 直接输出添加了文档注释的完整代码""" }], temperature=0.1 )

      return response.choices[0].message.content.strip()
    

---

#### 8.5.3 自然语言转 SQL(Text-to-SQL)

Text-to-SQL 的核心难点是**准确理解业务语义**和**生成语法正确的 SQL**,需要将数据库 Schema 作为上下文注入。

```python
from typing import List, Dict, Optional, Tuple
import json
import re

@dataclass
class TableSchema:
    """Database table structure."""
    name: str
    columns: List[Dict]        # [{"name": "id", "type": "INT", "comment": "primary key"}]
    primary_key: str = "id"
    sample_data: Optional[List[Dict]] = None  # a few example rows that help the model understand the data's shape

@dataclass
class SQLResult:
    """Result of a natural-language-to-SQL conversion."""
    sql: str
    explanation: str           # explanation of the SQL logic
    confidence: float          # 0-1, confidence
    warnings: List[str]        # potential risks (e.g. full table scan)
    alternatives: List[str]    # alternative SQL statements

class Text2SQLConverter:
    """Natural-language-to-SQL converter."""

    def __init__(self, model: str = "gpt-4o", dialect: str = "mysql"):
        # Create the OpenAI client and remember the model and SQL dialect.
        self.client = OpenAI()
        self.model = model
        self.dialect = dialect  # mysql / postgresql / sqlite / bigquery

    def build_schema_context(
        self,
        tables: "List[TableSchema]",
        max_sample_rows: int = 3
    ) -> str:
        """Render table definitions as prompt context.

        Each table becomes a ``CREATE TABLE`` statement, with column comments
        as trailing ``--`` comments. When a table has sample data, up to
        ``max_sample_rows`` rows follow as SQL comments.

        Args:
            tables: Objects exposing ``name``, ``columns`` (dicts with
                ``name``, ``type`` and optional ``comment``) and
                ``sample_data``.
            max_sample_rows: Maximum sample rows per table; 0 or a negative
                value disables sample output.

        Returns:
            The rendered parts joined by blank lines (empty string for no
            tables).
        """
        schema_parts = []
        for table in tables:
            last_index = len(table.columns) - 1
            column_lines = []
            for index, col in enumerate(table.columns):
                # The separating comma must come before the trailing
                # comment; appended after it, the comma becomes part of the
                # comment and the DDL shown to the model is malformed.
                separator = "," if index < last_index else ""
                line = f"  {col['name']} {col['type']}{separator}"
                if col.get("comment"):
                    line += f"  -- {col['comment']}"
                column_lines.append(line)

            columns_sql = "\n".join(column_lines)
            schema_parts.append(f"CREATE TABLE {table.name} (\n{columns_sql}\n);")

            # Sample rows are rendered on a single comment line.
            if table.sample_data and max_sample_rows > 0:
                samples = table.sample_data[:max_sample_rows]
                schema_parts.append(f"-- {table.name} 示例数据:")
                schema_parts.append("-- " + str(samples).replace("\n", " "))

        return "\n\n".join(schema_parts)

    def convert(
        self,
        question: str,
        tables: List[TableSchema],
        additional_context: str = "",
        allow_dml: bool = False   # 是否允许 INSERT/UPDATE/DELETE
    ) -> SQLResult:
        """将自然语言问题转换为 SQL"""

        schema_context = self.build_schema_context(tables)

        dml_restriction = "" if allow_dml else "只生成 SELECT 查询,不生成 INSERT/UPDATE/DELETE/DROP 等修改语句。"

        prompt = f"""你是一名精通 {self.dialect} 的数据库专家。请将用户的自然语言问题转换为 SQL 查询。

数据库结构(Schema):
```sql
{schema_context}

{f'业务背景:{additional_context}' if additional_context else ''}

用户问题:

以 JSON 格式返回: {{ "sql": "完整的 SQL 查询", "explanation": "SQL 逻辑解释(每个子句的作用)", "confidence": 0.95, "warnings": ["注意:此查询可能导致全表扫描,建议添加索引"], "alternatives": ["等价的备选 SQL(如果有更简洁的写法)"] }}

SQL 要求:

  • 语法完全符合 {self.dialect} 规范

  • 使用表别名提高可读性

  • 复杂查询添加注释

  • 注意 NULL 处理、字符串比较大小写 """

      response = self.client.chat.completions.create(
          model=self.model,
          messages=[{"role": "user", "content": prompt}],
          response_format={"type": "json_object"},
          temperature=0.1
      )
    
      data = json.loads(response.choices[0].message.content)
    
      # 安全检查
      sql = data.get("sql", "")
      warnings = data.get("warnings", [])
      if not allow_dml:
          dml_check = self._check_dml(sql)
          if dml_check:
              warnings.insert(0, f"安全拦截:检测到 {dml_check} 操作,已阻止执行")
              sql = f"-- 已拦截:{sql}"
    
      return SQLResult(
          sql=sql,
          explanation=data.get("explanation", ""),
          confidence=data.get("confidence", 0.8),
          warnings=warnings,
          alternatives=data.get("alternatives", [])
      )
    

    def _check_dml(self, sql: str) -> Optional[str]: """检测危险的 DML 语句""" sql_upper = sql.upper().strip() for keyword in ["INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER"]: if re.search(rf'\b{keyword}\b', sql_upper): return keyword return None

    def iterative_refine( self, question: str, tables: List[TableSchema], user_feedback: str, previous_sql: str ) -> SQLResult: """基于用户反馈迭代优化 SQL"""

      response = self.client.chat.completions.create(
          model=self.model,
          messages=[{
              "role": "user",
              "content": f"""基于以下反馈修改 SQL:
    

原始问题:{question} 当前 SQL:

sql
{previous_sql}

用户反馈:

请修正 SQL 并解释修改内容,以 JSON 格式返回(同上次格式)。""" }], response_format={"type": "json_object"}, temperature=0.1 )

    data = json.loads(response.choices[0].message.content)
    return SQLResult(
        sql=data.get("sql", ""),
        explanation=data.get("explanation", ""),
        confidence=data.get("confidence", 0.9),
        warnings=data.get("warnings", []),
        alternatives=data.get("alternatives", [])
    )

def explain_sql(self, sql: str) -> str:
    """Explain an existing SQL statement in natural language.

    Args:
        sql: The SQL text; embedded in the prompt inside a ``sql`` fence.

    Returns:
        The model reply with surrounding whitespace removed (empty string
        when the reply has no content).
    """
    response = self.client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"""用通俗的语言解释这段 SQL 的功能:

```sql
{sql}
```

解释结构:

  1. 查询目的(一句话)

  2. 数据来源(从哪些表取数)

  3. 过滤条件

  4. 聚合逻辑(如有)

  5. 排序与限制

  6. 性能注意事项"""
        }]
    )

    return (response.choices[0].message.content or "").strip()
    

---

#### 8.5.4 Agent 自动化编码

Agent 编码系统能够**自主规划、编写、执行、测试、修复**代码,完成完整的编程任务。

```python
from enum import Enum
from typing import List, Dict, Any, Callable
import subprocess
import tempfile
import os
import json

class TaskStatus(Enum):
    # Lifecycle states of a coding task.
    PLANNING = "planning"
    CODING = "coding"
    TESTING = "testing"
    FIXING = "fixing"
    DONE = "done"
    FAILED = "failed"

@dataclass
class AgentStep:
    """Record of one action taken by the agent."""
    action: str       # plan / write_file / run_code / run_tests / fix_code
    description: str  # human-readable summary of the step
    result: str = ""  # output or outcome text
    success: bool = False  # whether the step succeeded

class CodingAgent:
    """
    Automated coding agent:
    receive a task description -> plan -> write code -> run tests -> auto-fix -> deliver.
    """

    MAX_FIX_ATTEMPTS = 3  # maximum number of automatic fix attempts

    def __init__(self, model: str = "gpt-4o", workspace_dir: str = "/tmp/agent_workspace"):
        self.client = OpenAI()
        self.model = model
        self.workspace = workspace_dir
        # The workspace directory is created on construction if missing.
        os.makedirs(workspace_dir, exist_ok=True)

        self.steps: List[AgentStep] = []
        self.status = TaskStatus.PLANNING
        self.file_registry: Dict[str, str] = {}  # generated files: name -> path

    def run(self, task: str, language: str = "python") -> Dict[str, Any]:
        """Run the whole workflow: plan, then execute every planned step.

        Args:
            task: Natural-language description of the programming task.
            language: Target language passed to planning and fixing.

        Returns:
            The report built by ``_generate_report``. The status is
            ``failed`` when a step raised or a test run failed, otherwise
            ``done``.
        """
        print(f"🤖 开始任务:{task}\n")
        self.steps = []
        tests_passed = True

        try:
            # 1. Plan
            plan = self._plan_task(task, language)

            # 2. Execute the plan step by step; unknown actions are ignored.
            for step in plan:
                action = step.get("action")

                if action == "write_file":
                    self._write_file(step["filename"], step["content"], step["description"])

                elif action == "run_code":
                    output, success = self._run_code(step["filename"])
                    if not success:
                        # Raises RuntimeError when every fix attempt fails.
                        self._auto_fix_loop(step["filename"], output, language)

                elif action == "write_tests":
                    self._write_tests(step["filename"], step["source_file"])

                elif action == "run_tests":
                    # A failing test run must not be reported as success.
                    if not self._run_tests(step["filename"]):
                        tests_passed = False

            self.status = TaskStatus.DONE if tests_passed else TaskStatus.FAILED

        except Exception as e:
            # Top-level boundary: record the failure in the step log so it
            # shows up in the report instead of propagating.
            self.status = TaskStatus.FAILED
            self.steps.append(AgentStep("error", str(e), str(e), False))

        return self._generate_report(task)

    def _plan_task(self, task: str, language: str) -> List[Dict]:
        """制定任务执行计划"""
        self.status = TaskStatus.PLANNING

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "system",
                "content": f"你是一个自动化编程 Agent,擅长将任务分解为可执行步骤,使用 {language} 实现。"
            }, {
                "role": "user",
                "content": f"""为以下编程任务制定执行计划:

任务:{task}
语言:{language}

以 JSON 格式返回执行步骤列表:
&#123;&#123;
  "steps": [
    &#123;&#123;
      "action": "write_file",
      "filename": "main.py",
      "description": "编写主模块",
      "content": "完整的文件代码"
    &#125;&#125;,
    &#123;&#123;
      "action": "write_tests",
      "filename": "test_main.py",
      "source_file": "main.py",
      "description": "编写单元测试"
    &#125;&#125;,
    &#123;&#123;
      "action": "run_tests",
      "filename": "test_main.py",
      "description": "运行测试验证"
    &#125;&#125;
  ]
&#125;&#125;

action 取值:write_file / write_tests / run_code / run_tests
"""
            }],
            response_format={"type": "json_object"}
        )

        plan_data = json.loads(response.choices[0].message.content)
        plan = plan_data.get("steps", [])

        self.steps.append(AgentStep(
            action="plan",
            description=f"制定执行计划:{len(plan)} 个步骤",
            result=str([s.get("description") for s in plan]),
            success=True
        ))

        print(f"📋 执行计划(共 {len(plan)} 步):")
        for i, s in enumerate(plan, 1):
            print(f"  {i}. [{s['action']}] {s.get('description', '')}")
        print()

        return plan

    def _write_file(self, filename: str, content: str, description: str):
        """Write ``content`` to ``filename`` inside the workspace.

        Markdown code-fence lines are stripped from ``content`` first. The
        file is registered in ``file_registry`` and a step is logged.

        Args:
            filename: Path relative to the workspace. It comes from the
                model-generated plan, so it is validated.
            content: File content, possibly wrapped in Markdown fences.
            description: Text stored on the logged step.

        Raises:
            ValueError: If ``filename`` resolves outside the workspace.
        """
        self.status = TaskStatus.CODING

        # Strip Markdown code-fence markers.
        import re
        content = re.sub(r'^```\w*\n?', '', content, flags=re.MULTILINE)
        content = re.sub(r'```$', '', content, flags=re.MULTILINE)
        content = content.strip()

        # Absolute paths or ".." segments must not escape the workspace.
        workspace_root = os.path.realpath(self.workspace)
        filepath = os.path.realpath(os.path.join(workspace_root, filename))
        if os.path.commonpath([workspace_root, filepath]) != workspace_root:
            raise ValueError(f"Refusing to write outside the workspace: {filename}")

        # Allow nested names such as "pkg/module.py".
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

        self.file_registry[filename] = filepath

        self.steps.append(AgentStep(
            action="write_file",
            description=description,
            result=f"已写入 {filename}({len(content.splitlines())} 行)",
            success=True
        ))
        print(f"✅ 写入 {filename}")

    def _run_code(self, filename: str) -> Tuple[str, bool]:
        """Run a registered file with ``python3`` and log the outcome.

        Args:
            filename: Name under which the file was registered by
                ``_write_file``.

        Returns:
            ``(output, success)``: combined stdout and stderr, and whether
            the process exited with code 0. An unregistered file or a timeout
            yields an explanatory message and ``False``.
        """
        filepath = self.file_registry.get(filename)
        if not filepath:
            return f"文件不存在:{filename}", False

        # NOTE(review): model-generated code is executed directly on the
        # host, limited only by the timeout — consider a sandbox.
        try:
            result = subprocess.run(
                ["python3", filepath],
                capture_output=True,
                text=True,
                timeout=30,
                cwd=self.workspace
            )
        except subprocess.TimeoutExpired:
            output, success = "执行超时(>30s)", False
        else:
            output = result.stdout + result.stderr
            success = result.returncode == 0

        # Logged on every path, so a timeout also appears in the report.
        self.steps.append(AgentStep(
            action="run_code",
            description=f"运行 {filename}",
            result=output[:500],
            success=success
        ))

        return output, success

    def _run_tests(self, test_file: str) -> bool:
        """Run a test file with pytest and log the outcome.

        Args:
            test_file: Registered test file name; when it is not registered,
                the name is resolved relative to the workspace.

        Returns:
            True when pytest exits with code 0; False on failure or timeout.
        """
        self.status = TaskStatus.TESTING
        filepath = self.file_registry.get(test_file, os.path.join(self.workspace, test_file))

        # A timeout is a failed run, handled the same way as in _run_code
        # instead of escaping as an exception.
        try:
            result = subprocess.run(
                ["python3", "-m", "pytest", filepath, "-v", "--tb=short"],
                capture_output=True,
                text=True,
                timeout=60,
                cwd=self.workspace
            )
        except subprocess.TimeoutExpired:
            output, success = "执行超时(>60s)", False
        else:
            output = result.stdout + result.stderr
            success = result.returncode == 0

        self.steps.append(AgentStep(
            action="run_tests",
            description=f"运行测试 {test_file}",
            result=output[:1000],
            success=success
        ))

        print(f"{'✅' if success else '❌'} 测试{'通过' if success else '失败'}")
        if not success:
            print(f"   错误摘要:{output[-300:]}")

        return success

    def _write_tests(self, test_filename: str, source_file: str):
        """Generate pytest tests for a registered source file and write them.

        Args:
            test_filename: Name of the test file to create.
            source_file: Registered name of the file under test.
        """
        source_path = self.file_registry.get(source_file)
        if not source_path:
            # Log the skip so the report shows why no tests were written.
            self.steps.append(AgentStep(
                action="write_tests",
                description=f"生成 {source_file} 的单元测试",
                result=f"文件不存在:{source_file}",
                success=False
            ))
            return

        # Files are written as UTF-8 by _write_file; read them the same way.
        with open(source_path, 'r', encoding='utf-8') as f:
            source_code = f.read()

        generator = CodeGenerator(model=self.model)
        test_code = generator.generate_tests(source_code, "python", "pytest")

        # Make sure the tests import the module under test. Only the final
        # extension is removed, and path separators become dots.
        module_name = os.path.splitext(source_file)[0].replace("\\", "/").replace("/", ".")
        if f"from {module_name}" not in test_code and f"import {module_name}" not in test_code:
            test_code = f"from {module_name} import *\n\n" + test_code

        self._write_file(test_filename, test_code, f"生成 {source_file} 的单元测试")

    def _auto_fix_loop(self, filename: str, error_output: str, language: str):
        """自动修复循环:最多尝试 MAX_FIX_ATTEMPTS 次"""
        self.status = TaskStatus.FIXING
        filepath = self.file_registry.get(filename)

        for attempt in range(1, self.MAX_FIX_ATTEMPTS + 1):
            print(f"🔧 自动修复第 {attempt} 次...")

            with open(filepath, 'r') as f:
                current_code = f.read()

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{
                    "role": "user",
                    "content": f"""修复以下 {language} 代码中的错误:

代码({filename}):
```{language}
{current_code}

错误信息:

{error_output[:2000]}

要求:

  1. 分析错误根因
  2. 最小化修改(只改有问题的部分)
  3. 直接输出修复后的完整代码(不要解释)
                }],
                temperature=0.1,
                stop=["```"]
            )

            fixed_code = response.choices[0].message.content.strip()

            # 写回修复后的代码
            with open(filepath, 'w') as f:
                f.write(fixed_code)

            # 重新运行验证
            output, success = self._run_code(filename)
            if success:
                self.steps.append(AgentStep(
                    action="fix_code",
                    description=f"第 {attempt} 次修复成功",
                    result=output,
                    success=True
                ))
                print(f"✅ 修复成功")
                return

            error_output = output

        # 超过最大修复次数
        raise RuntimeError(f"自动修复失败,已尝试 {self.MAX_FIX_ATTEMPTS} 次")

    def _generate_report(self, task: str) -> Dict[str, Any]:
        """生成任务执行报告"""
        success_count = sum(1 for s in self.steps if s.success)
        total_count = len(self.steps)

        return {
            "task": task,
            "status": self.status.value,
            "success_rate": f"{success_count}/{total_count}",
            "files_generated": list(self.file_registry.keys()),
            "steps": [
                {
                    "action": s.action,
                    "description": s.description,
                    "success": s.success,
                    "result_preview": s.result[:100]
                }
                for s in self.steps
            ],
            "workspace": self.workspace
        }

8.5.5 FastAPI 服务实现

python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List
import asyncio

app = FastAPI(title="AI 代码助手")

# Request body for POST /code/generate.
class CodeGenRequest(BaseModel):
    description: str  # what the generated code should do
    language: str = "python"
    context_code: str = ""  # existing code passed to the generator as context
    include_tests: bool = True  # whether unit tests should be generated too

# Request body for POST /code/review.
class CodeReviewRequest(BaseModel):
    code: str  # source code to review
    language: str = "python"
    context: str = ""  # optional business context for the reviewer

# Request body for POST /sql/convert.
class SQLRequest(BaseModel):
    question: str  # natural-language question
    tables: List[dict]        # serialized TableSchema objects
    dialect: str = "mysql"
    allow_dml: bool = False  # forwarded to Text2SQLConverter.convert

# Request body for the /agent/run endpoints.
class AgentRequest(BaseModel):
    task: str  # natural-language description of the programming task
    language: str = "python"

# Request body for POST /code/explain.
class ExplainRequest(BaseModel):
    code: str  # source code to explain
    language: str = "python"
    level: str = "intermediate"  # forwarded to CodeReviewer.explain_code

@app.post("/code/generate")
async def generate_code(req: CodeGenRequest):
    """代码生成接口"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    generator = CodeGenerator(model="gpt-4o")
    # generate_function makes blocking HTTP calls; run it in a worker thread
    # so the event loop can keep serving other requests.
    result = await asyncio.to_thread(
        generator.generate_function,
        description=req.description,
        language=req.language,
        context_code=req.context_code,
        include_tests=req.include_tests
    )
    return {
        "code": result.code,
        "explanation": result.explanation,
        "test_code": result.test_code,
        "is_runnable": result.is_runnable,
        "syntax_errors": result.syntax_errors
    }

@app.post("/code/review")
async def review_code(req: CodeReviewRequest):
    """代码审查接口"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    reviewer = CodeReviewer(model="gpt-4o")
    # review makes blocking HTTP calls; run it in a worker thread so the
    # event loop can keep serving other requests.
    result = await asyncio.to_thread(
        reviewer.review,
        code=req.code,
        language=req.language,
        context=req.context
    )
    return {
        "overall_score": result.overall_score,
        "summary": result.summary,
        "issues": [
            {
                "severity": i.severity,
                "category": i.category,
                "line_range": i.line_range,
                "description": i.description,
                "suggestion": i.suggestion
            }
            for i in result.issues
        ],
        "refactored_code": result.refactored_code
    }

@app.post("/code/explain")
async def explain_code(req: ExplainRequest):
    """代码解释接口"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    reviewer = CodeReviewer()
    # The reviewer call blocks on HTTP; keep it off the event loop.
    explanation = await asyncio.to_thread(
        reviewer.explain_code, req.code, req.language, req.level
    )
    return {"explanation": explanation}

@app.post("/sql/convert")
async def convert_to_sql(req: SQLRequest):
    """自然语言转 SQL"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    try:
        tables = [TableSchema(**t) for t in req.tables]
    except TypeError as exc:
        # Unknown or missing keys in a table dict are a client error, not a
        # server error.
        raise HTTPException(status_code=422, detail=f"Invalid table schema: {exc}") from exc

    converter = Text2SQLConverter(dialect=req.dialect)
    # convert makes a blocking HTTP call; keep it off the event loop.
    result = await asyncio.to_thread(
        converter.convert,
        question=req.question,
        tables=tables,
        allow_dml=req.allow_dml
    )
    return {
        "sql": result.sql,
        "explanation": result.explanation,
        "confidence": result.confidence,
        "warnings": result.warnings,
        "alternatives": result.alternatives
    }

@app.post("/agent/run")
async def run_agent(req: AgentRequest):
    """Agent 自动化编码(同步,适合轻量任务)"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    agent = CodingAgent(model="gpt-4o")
    # agent.run blocks on model calls and subprocesses for the whole task;
    # run it in a worker thread so other requests are still served.
    report = await asyncio.to_thread(agent.run, req.task, req.language)
    return report

@app.post("/agent/run/stream")
async def run_agent_stream(req: AgentRequest):
    """Agent 自动化编码(流式进度推送)"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    #
    # Server-sent events, in order: one "start" event, one "step" event for
    # every AgentStep the agent records, then a final "done" (with the
    # report) or "error" event.
    import json

    def sse(payload: dict) -> str:
        # Format one server-sent event.
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async def generate():
        loop = asyncio.get_running_loop()
        events: asyncio.Queue = asyncio.Queue()
        finished = object()  # sentinel marking the end of the stream

        def publish(item) -> None:
            # Called from the worker thread; hand the item to the event loop.
            loop.call_soon_threadsafe(events.put_nowait, item)

        class _TrackedSteps(list):
            # List that publishes a progress event for each appended step.
            def append(self, step):
                super().append(step)
                publish({
                    "type": "step",
                    "action": step.action,
                    "description": step.description,
                    "success": step.success
                })

        class _StreamingAgent(CodingAgent):
            # ``list.append`` cannot be reassigned on an instance, and
            # CodingAgent.run() replaces ``self.steps`` with a new list, so
            # every assignment is wrapped in a tracking list via a property.
            @property
            def steps(self):
                return self._steps

            @steps.setter
            def steps(self, value):
                self._steps = _TrackedSteps(value)

        def work() -> None:
            # Runs in a worker thread: the agent blocks on model calls and
            # subprocesses, which must stay off the event loop.
            try:
                agent = _StreamingAgent(model="gpt-4o")
                report = agent.run(req.task, req.language)
                publish({"type": "done", "report": report})
            except Exception as e:
                publish({"type": "error", "message": str(e)})
            finally:
                publish(finished)

        yield sse({"type": "start", "task": req.task})

        worker = loop.run_in_executor(None, work)
        while True:
            item = await events.get()
            if item is finished:
                break
            yield sse(item)
        await worker

    return StreamingResponse(generate(), media_type="text/event-stream")

系统关键指标

| 指标 | 目标值 | 优化方向 |
| --- | --- | --- |
| 代码生成一次通过率 | ≥ 80% | Few-shot、类型提示、语法验证 |
| 审查问题检测率 | ≥ 90% | 分维度审查、专用安全模型 |
| Text-to-SQL 准确率 | ≥ 85% | Schema 注入、Few-shot 样例、迭代修正 |
| Agent 任务完成率 | ≥ 70% | 自动重试、沙箱执行、最大尝试次数 |
| 平均响应延迟 | ≤ 5s | 流式输出、小模型预筛 |

坚持是一种品格