Skip to content

学习时长:2-3 周

Function Calling(函数调用)是让 LLM 能够调用外部工具和 API 的核心技术,是构建实用 AI 应用的关键能力。本节将深入讲解 Function Calling 的原理、实现方式和最佳实践。


5.3.1 Function Calling 基础

什么是 Function Calling?

定义:

Function Calling 是 LLM 的一种能力,允许模型识别何时需要调用外部函数/工具,并生成符合函数签名的结构化参数。

核心流程:

用户输入: "北京今天天气怎么样?"

┌─────────────────────────────────┐
│ LLM 分析                        │
│ - 识别需要查询天气              │
│ - 选择 get_weather 函数         │
│ - 生成参数: {"city": "北京"}   │
└─────────────────────────────────┘

┌─────────────────────────────────┐
│ 应用程序执行函数                │
│ result = get_weather("北京")   │
│ → "北京今天 15°C,晴天"        │
└─────────────────────────────────┘

┌─────────────────────────────────┐
│ LLM 生成最终回复                │
│ "北京今天天气晴朗,气温15°C"   │
└─────────────────────────────────┘

Function Calling vs Prompt Engineering

对比维度Prompt EngineeringFunction Calling
参数提取不可靠(需要解析文本)结构化 JSON,可靠
类型安全❌ 无类型检查✅ 强类型约束
错误处理难以处理自动验证参数
可维护性低(Prompt 脆弱)高(函数签名明确)
性能需要额外解析直接返回结构化数据

示例对比:

python
# ❌ Prompt Engineering 方式
prompt = """
从用户输入中提取城市名称,格式: City: [城市名]
用户输入: 北京今天天气怎么样?
"""
response = llm.invoke(prompt)
# 输出: "City: 北京" (需要手动解析)
city = response.split(":")[1].strip()

# ✅ Function Calling 方式
functions = [{
    "name": "get_weather",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        }
    }
}]
response = llm.invoke(user_input, functions=functions)
# 输出: {"name": "get_weather", "arguments": {"city": "北京"}}
# 直接得到结构化参数

支持 Function Calling 的模型

模型支持情况特点
OpenAI GPT-4/4o✅ 原生支持最稳定,准确率高
OpenAI GPT-3.5-turbo✅ 原生支持性价比高
Claude 3.5 Sonnet✅ Tool Use准确率接近 GPT-4
Gemini 1.5 Pro✅ Function CallingGoogle 原生支持
Qwen-Plus/Max✅ 原生支持中文场景优秀
LLaMA 3.1⚠️ 需微调开源模型,需训练
Mistral Large✅ 原生支持欧洲开源

5.3.2 OpenAI Function Calling 详解

基础用法

1. 定义函数 Schema

python
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# 定义函数描述
functions = [
    {
        "name": "get_current_weather",
        "description": "获取指定城市的当前天气信息",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市名称,例如:北京、上海"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "温度单位"
                }
            },
            "required": ["location"]
        }
    }
]

2. 调用 API

python
messages = [
    {"role": "user", "content": "北京今天天气怎么样?"}
]

response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    functions=functions,
    function_call="auto"  # auto | none | {"name": "函数名"}
)

# 检查是否需要调用函数
message = response.choices[0].message

if message.function_call:
    # LLM 决定调用函数
    function_name = message.function_call.name
    function_args = json.loads(message.function_call.arguments)
    
    print(f"调用函数: {function_name}")
    print(f"参数: {function_args}")
    # 输出:
    # 调用函数: get_current_weather
    # 参数: {'location': '北京', 'unit': 'celsius'}

3. 执行函数并返回结果

python
import json

def get_current_weather(location: str, unit: str = "celsius") -> str:
    """实际的天气查询函数"""
    # 实际应用中调用天气 API
    weather_data = {
        "location": location,
        "temperature": 15,
        "unit": unit,
        "description": "晴天"
    }
    return json.dumps(weather_data, ensure_ascii=False)

# 执行函数
if message.function_call:
    function_name = message.function_call.name
    function_args = json.loads(message.function_call.arguments)
    
    # 动态调用函数
    available_functions = {
        "get_current_weather": get_current_weather
    }
    function_to_call = available_functions[function_name]
    function_response = function_to_call(**function_args)
    
    # 将函数结果返回给 LLM
    messages.append(message)  # 添加 LLM 的函数调用消息
    messages.append({
        "role": "function",
        "name": function_name,
        "content": function_response
    })
    
    # 再次调用 LLM 生成最终回复
    second_response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    
    final_answer = second_response.choices[0].message.content
    print(final_answer)
    # 输出: "北京今天天气晴朗,气温15摄氏度。"

完整示例:天气查询助手

python
from openai import OpenAI
import json

client = OpenAI()

# 定义多个函数
functions = [
    {
        "name": "get_current_weather",
        "description": "获取指定城市的当前天气",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市名称"
                }
            },
            "required": ["location"]
        }
    },
    {
        "name": "get_weather_forecast",
        "description": "获取未来几天的天气预报",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市名称"
                },
                "days": {
                    "type": "integer",
                    "description": "预报天数(1-7)",
                    "minimum": 1,
                    "maximum": 7
                }
            },
            "required": ["location", "days"]
        }
    }
]

# 实现函数
def get_current_weather(location: str) -> str:
    """模拟天气查询"""
    return json.dumps({
        "location": location,
        "temperature": 15,
        "condition": "晴天"
    }, ensure_ascii=False)

def get_weather_forecast(location: str, days: int) -> str:
    """模拟天气预报"""
    forecast = []
    for i in range(days):
        forecast.append({
            "day": i + 1,
            "temperature": 15 + i,
            "condition": "晴天" if i % 2 == 0 else "多云"
        })
    return json.dumps({
        "location": location,
        "forecast": forecast
    }, ensure_ascii=False)

# 函数映射
available_functions = {
    "get_current_weather": get_current_weather,
    "get_weather_forecast": get_weather_forecast
}

def run_conversation(user_message: str) -> str:
    """运行对话"""
    messages = [{"role": "user", "content": user_message}]
    
    # 第一次调用:让 LLM 决定是否需要调用函数
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        functions=functions,
        function_call="auto"
    )
    
    message = response.choices[0].message
    
    # 如果需要调用函数
    if message.function_call:
        function_name = message.function_call.name
        function_args = json.loads(message.function_call.arguments)
        
        print(f"🔧 调用函数: {function_name}({function_args})")
        
        # 执行函数
        function_response = available_functions[function_name](**function_args)
        
        print(f"📊 函数返回: {function_response}")
        
        # 将结果返回给 LLM
        messages.append(message)
        messages.append({
            "role": "function",
            "name": function_name,
            "content": function_response
        })
        
        # 第二次调用:生成最终回复
        second_response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        
        return second_response.choices[0].message.content
    else:
        # 不需要调用函数,直接返回
        return message.content

# 测试
print(run_conversation("北京今天天气怎么样?"))
print("\n" + "="*50 + "\n")
print(run_conversation("上海未来3天的天气预报"))

输出:

🔧 调用函数: get_current_weather({'location': '北京'})
📊 函数返回: {"location": "北京", "temperature": 15, "condition": "晴天"}
北京今天天气晴朗,气温15摄氏度。

==================================================

🔧 调用函数: get_weather_forecast({'location': '上海', 'days': 3})
📊 函数返回: {"location": "上海", "forecast": [{"day": 1, "temperature": 15, "condition": "晴天"}, ...]}
上海未来3天的天气预报如下:
- 第1天:晴天,15°C
- 第2天:多云,16°C
- 第3天:晴天,17°C

高级特性

1. 强制调用特定函数

python
# 强制调用指定函数(即使用户输入不明确)
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "天气"}],
    functions=functions,
    function_call={"name": "get_current_weather"}  # 强制调用
)

2. 并行函数调用(Parallel Function Calling)

OpenAI 支持在一次请求中调用多个函数。

python
# 用户: "对比北京和上海的天气"
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "对比北京和上海的天气"}],
    tools=[  # 使用新的 tools 格式
        {
            "type": "function",
            "function": {
                "name": "get_current_weather",
                "description": "获取天气",
                "parameters": {...}
            }
        }
    ]
)

# LLM 可能返回多个 tool_calls
message = response.choices[0].message
if message.tool_calls:
    for tool_call in message.tool_calls:
        function_name = tool_call.function.name
        function_args = json.loads(tool_call.function.arguments)
        # 执行函数...

3. 流式输出 + Function Calling

python
response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    functions=functions,
    stream=True
)

function_call_data = {"name": "", "arguments": ""}

for chunk in response:
    delta = chunk.choices[0].delta
    
    if delta.function_call:
        # 逐步接收函数调用信息
        if delta.function_call.name:
            function_call_data["name"] += delta.function_call.name
        if delta.function_call.arguments:
            function_call_data["arguments"] += delta.function_call.arguments

# 完整接收后执行函数
if function_call_data["name"]:
    function_args = json.loads(function_call_data["arguments"])
    result = execute_function(function_call_data["name"], function_args)

5.3.3 其他模型的 Function Calling

Claude 3.5 Tool Use

Anthropic Claude 使用 "Tool Use" 而非 "Function Calling",但原理相同。

python
import anthropic

client = anthropic.Anthropic(api_key="your-api-key")

# 定义工具
tools = [
    {
        "name": "get_weather",
        "description": "获取指定城市的天气信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市名称"
                }
            },
            "required": ["location"]
        }
    }
]

# 调用
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "北京今天天气怎么样?"}
    ]
)

# 检查是否需要调用工具
for content in response.content:
    if content.type == "tool_use":
        tool_name = content.name
        tool_input = content.input
        
        print(f"调用工具: {tool_name}")
        print(f"参数: {tool_input}")
        
        # 执行工具
        result = get_weather(**tool_input)
        
        # 返回结果给 Claude
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": "北京今天天气怎么样?"},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": content.id,
                            "content": result
                        }
                    ]
                }
            ]
        )
        
        print(response.content[0].text)

Google Gemini Function Calling

python
import google.generativeai as genai

genai.configure(api_key="your-api-key")

# 定义函数
def get_weather(location: str) -> dict:
    """获取天气"""
    return {"location": location, "temperature": 15, "condition": "晴天"}

# 创建模型
model = genai.GenerativeModel(
    model_name='gemini-1.5-pro',
    tools=[get_weather]  # 直接传入 Python 函数
)

# 调用
chat = model.start_chat()
response = chat.send_message("北京今天天气怎么样?")

# 检查函数调用
if response.candidates[0].content.parts[0].function_call:
    function_call = response.candidates[0].content.parts[0].function_call
    function_name = function_call.name
    function_args = dict(function_call.args)
    
    # 执行函数
    result = get_weather(**function_args)
    
    # 返回结果
    response = chat.send_message(
        genai.protos.Content(
            parts=[
                genai.protos.Part(
                    function_response=genai.protos.FunctionResponse(
                        name=function_name,
                        response={"result": result}
                    )
                )
            ]
        )
    )
    
    print(response.text)

阿里通义千问 Function Calling

python
from dashscope import Generation

# 定义函数
functions = [
    {
        "name": "get_weather",
        "description": "获取天气信息",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "城市名称"}
            },
            "required": ["location"]
        }
    }
]

# 调用
response = Generation.call(
    model='qwen-plus',
    messages=[
        {"role": "user", "content": "北京今天天气怎么样?"}
    ],
    tools=functions,
    result_format='message'
)

# 处理函数调用
if response.output.choices[0].message.tool_calls:
    tool_call = response.output.choices[0].message.tool_calls[0]
    function_name = tool_call.function.name
    function_args = json.loads(tool_call.function.arguments)
    
    # 执行并返回结果...

5.3.4 工具设计最佳实践

1. 函数命名规范

python
# ❌ 不好的命名
def f1(x):
    """做某事"""
    pass

# ✅ 好的命名
def get_current_weather(location: str, unit: str = "celsius") -> dict:
    """
    获取指定城市的当前天气信息
    
    Args:
        location: 城市名称,如"北京"、"上海"
        unit: 温度单位,"celsius"(摄氏度) 或 "fahrenheit"(华氏度)
    
    Returns:
        包含温度、天气状况等信息的字典
    """
    pass

命名原则:

  • ✅ 使用动词开头(get、create、update、delete、search)
  • ✅ 名称清晰表达功能
  • ✅ 避免缩写和简写
  • ✅ 保持一致的命名风格

2. 参数设计

使用类型提示和详细描述:

python
from typing import Literal, Optional
from pydantic import BaseModel, Field

class WeatherInput(BaseModel):
    """天气查询参数"""
    location: str = Field(
        description="城市名称,支持中文(如'北京')或英文(如'Beijing')"
    )
    unit: Literal["celsius", "fahrenheit"] = Field(
        default="celsius",
        description="温度单位:celsius(摄氏度) 或 fahrenheit(华氏度)"
    )
    include_forecast: Optional[bool] = Field(
        default=False,
        description="是否包含未来天气预报"
    )

# 转换为 OpenAI Function Schema
def pydantic_to_openai_schema(model: type[BaseModel]) -> dict:
    """将 Pydantic 模型转换为 OpenAI Function Schema"""
    schema = model.schema()
    return {
        "type": "object",
        "properties": schema["properties"],
        "required": schema.get("required", [])
    }

weather_function = {
    "name": "get_weather",
    "description": "获取指定城市的天气信息",
    "parameters": pydantic_to_openai_schema(WeatherInput)
}

3. 错误处理

python
def get_weather(location: str) -> str:
    """健壮的天气查询函数"""
    try:
        # 参数验证
        if not location or not location.strip():
            return json.dumps({
                "error": "城市名称不能为空",
                "code": "INVALID_INPUT"
            }, ensure_ascii=False)
        
        # 调用外部 API
        response = requests.get(
            f"https://api.weather.com/v1/current",
            params={"city": location},
            timeout=5
        )
        response.raise_for_status()
        
        data = response.json()
        return json.dumps({
            "success": True,
            "data": data
        }, ensure_ascii=False)
        
    except requests.Timeout:
        return json.dumps({
            "error": "天气服务响应超时,请稍后重试",
            "code": "TIMEOUT"
        }, ensure_ascii=False)
    
    except requests.HTTPError as e:
        if e.response.status_code == 404:
            return json.dumps({
                "error": f"未找到城市 '{location}' 的天气信息",
                "code": "CITY_NOT_FOUND"
            }, ensure_ascii=False)
        else:
            return json.dumps({
                "error": "天气服务暂时不可用",
                "code": "SERVICE_ERROR"
            }, ensure_ascii=False)
    
    except Exception as e:
        # 记录错误日志
        logger.error(f"Weather API error: {e}")
        return json.dumps({
            "error": "查询天气时发生未知错误",
            "code": "UNKNOWN_ERROR"
        }, ensure_ascii=False)

4. 返回值设计

结构化返回:

python
# ❌ 不好的返回值
def search_products(query: str) -> str:
    return "找到了3个产品:iPhone 15, iPad Pro, MacBook"

# ✅ 好的返回值(结构化 JSON)
def search_products(query: str) -> str:
    results = {
        "query": query,
        "total": 3,
        "products": [
            {
                "id": "001",
                "name": "iPhone 15",
                "price": 5999,
                "stock": 100
            },
            {
                "id": "002",
                "name": "iPad Pro",
                "price": 6799,
                "stock": 50
            },
            {
                "id": "003",
                "name": "MacBook",
                "price": 9999,
                "stock": 30
            }
        ]
    }
    return json.dumps(results, ensure_ascii=False, indent=2)

返回值原则:

  • ✅ 使用 JSON 格式
  • ✅ 包含元数据(total、page、timestamp)
  • ✅ 错误信息明确
  • ✅ 数据类型一致

5.3.5 实战案例

案例 1: 数据库查询助手

需求: 用自然语言查询数据库

python
import sqlite3
import json
from openai import OpenAI

client = OpenAI()

# 初始化数据库
conn = sqlite3.connect('company.db')
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
    id INTEGER PRIMARY KEY,
    name TEXT,
    department TEXT,
    salary INTEGER,
    hire_date TEXT
)
''')

# 插入示例数据
sample_data = [
    (1, '张三', '技术部', 15000, '2020-01-15'),
    (2, '李四', '销售部', 12000, '2019-06-20'),
    (3, '王五', '技术部', 18000, '2021-03-10'),
]
cursor.executemany('INSERT OR REPLACE INTO employees VALUES (?,?,?,?,?)', sample_data)
conn.commit()

# 定义查询函数
def query_database(sql: str) -> str:
    """
    执行 SQL 查询
    
    Args:
        sql: SQL 查询语句(只支持 SELECT)
    
    Returns:
        查询结果的 JSON 字符串
    """
    try:
        # 安全检查:只允许 SELECT 语句
        if not sql.strip().upper().startswith('SELECT'):
            return json.dumps({
                "error": "只支持 SELECT 查询",
                "code": "INVALID_SQL"
            }, ensure_ascii=False)
        
        cursor.execute(sql)
        columns = [description[0] for description in cursor.description]
        results = cursor.fetchall()
        
        # 转换为字典列表
        data = [dict(zip(columns, row)) for row in results]
        
        return json.dumps({
            "success": True,
            "count": len(data),
            "data": data
        }, ensure_ascii=False, indent=2)
    
    except sqlite3.Error as e:
        return json.dumps({
            "error": f"SQL 执行错误: {str(e)}",
            "code": "SQL_ERROR"
        }, ensure_ascii=False)

# 定义函数 Schema
functions = [
    {
        "name": "query_database",
        "description": """
        查询员工数据库。
        
        数据库结构:
        - employees 表
          - id: 员工ID (INTEGER)
          - name: 姓名 (TEXT)
          - department: 部门 (TEXT)
          - salary: 薪资 (INTEGER)
          - hire_date: 入职日期 (TEXT, 格式: YYYY-MM-DD)
        
        示例查询:
        - 查询所有员工: SELECT * FROM employees
        - 查询技术部员工: SELECT * FROM employees WHERE department = '技术部'
        - 查询薪资排名: SELECT * FROM employees ORDER BY salary DESC
        """,
        "parameters": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SQL SELECT 查询语句"
                }
            },
            "required": ["sql"]
        }
    }
]

def chat_with_database(user_query: str) -> str:
    """与数据库对话"""
    messages = [
        {
            "role": "system",
            "content": "你是一个数据库查询助手。根据用户的自然语言问题,生成合适的 SQL 查询。"
        },
        {
            "role": "user",
            "content": user_query
        }
    ]
    
    # 第一次调用:生成 SQL
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        functions=functions,
        function_call="auto"
    )
    
    message = response.choices[0].message
    
    if message.function_call:
        function_name = message.function_call.name
        function_args = json.loads(message.function_call.arguments)
        
        print(f"🔍 生成的 SQL: {function_args['sql']}")
        
        # 执行查询
        result = query_database(**function_args)
        
        print(f"📊 查询结果: {result}")
        
        # 返回结果给 LLM
        messages.append(message)
        messages.append({
            "role": "function",
            "name": function_name,
            "content": result
        })
        
        # 第二次调用:生成自然语言回复
        second_response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        
        return second_response.choices[0].message.content
    else:
        return message.content

# 测试
print(chat_with_database("技术部有多少人?"))
print("\n" + "="*50 + "\n")
print(chat_with_database("薪资最高的3名员工是谁?"))
print("\n" + "="*50 + "\n")
print(chat_with_database("2020年入职的员工有哪些?"))

输出:

🔍 生成的 SQL: SELECT COUNT(*) as count FROM employees WHERE department = '技术部'
📊 查询结果: {"success": true, "count": 1, "data": [{"count": 2}]}
技术部有2名员工。

==================================================

🔍 生成的 SQL: SELECT name, salary FROM employees ORDER BY salary DESC LIMIT 3
📊 查询结果: {"success": true, "count": 3, "data": [{"name": "王五", "salary": 18000}, ...]}
薪资最高的3名员工是:
1. 王五 - 18000元
2. 张三 - 15000元
3. 李四 - 12000元

案例 2: 多步骤工作流

需求: 订机票流程(搜索 → 确认 → 预订)

python
from openai import OpenAI
from datetime import datetime
import json

client = OpenAI()

# 模拟数据库
flights_db = [
    {"id": "CA1234", "from": "北京", "to": "上海", "date": "2024-03-15", "time": "08:00", "price": 800, "seats": 10},
    {"id": "MU5678", "from": "北京", "to": "上海", "date": "2024-03-15", "time": "14:00", "price": 650, "seats": 5},
    {"id": "CZ9012", "from": "北京", "to": "上海", "date": "2024-03-15", "time": "18:00", "price": 700, "seats": 8},
]

bookings = []

# 定义工具函数
def search_flights(origin: str, destination: str, date: str) -> str:
    """搜索航班"""
    results = [
        f for f in flights_db
        if f["from"] == origin and f["to"] == destination and f["date"] == date
    ]
    
    return json.dumps({
        "success": True,
        "count": len(results),
        "flights": results
    }, ensure_ascii=False, indent=2)

def check_availability(flight_id: str) -> str:
    """检查航班可用性"""
    flight = next((f for f in flights_db if f["id"] == flight_id), None)
    
    if not flight:
        return json.dumps({"error": "航班不存在"}, ensure_ascii=False)
    
    return json.dumps({
        "success": True,
        "flight_id": flight_id,
        "available_seats": flight["seats"],
        "price": flight["price"]
    }, ensure_ascii=False)

def book_flight(flight_id: str, passenger_name: str, passenger_count: int = 1) -> str:
    """预订航班"""
    flight = next((f for f in flights_db if f["id"] == flight_id), None)
    
    if not flight:
        return json.dumps({"error": "航班不存在"}, ensure_ascii=False)
    
    if flight["seats"] < passenger_count:
        return json.dumps({"error": "座位不足"}, ensure_ascii=False)
    
    # 扣减座位
    flight["seats"] -= passenger_count
    
    # 创建订单
    booking = {
        "booking_id": f"BK{len(bookings) + 1:04d}",
        "flight_id": flight_id,
        "passenger_name": passenger_name,
        "passenger_count": passenger_count,
        "total_price": flight["price"] * passenger_count,
        "status": "confirmed",
        "booking_time": datetime.now().isoformat()
    }
    bookings.append(booking)
    
    return json.dumps({
        "success": True,
        "booking": booking
    }, ensure_ascii=False, indent=2)

# 定义函数 Schema
functions = [
    {
        "name": "search_flights",
        "description": "搜索航班信息",
        "parameters": {
            "type": "object",
            "properties": {
                "origin": {"type": "string", "description": "出发城市"},
                "destination": {"type": "string", "description": "目的地城市"},
                "date": {"type": "string", "description": "日期,格式 YYYY-MM-DD"}
            },
            "required": ["origin", "destination", "date"]
        }
    },
    {
        "name": "check_availability",
        "description": "检查指定航班的座位和价格",
        "parameters": {
            "type": "object",
            "properties": {
                "flight_id": {"type": "string", "description": "航班号"}
            },
            "required": ["flight_id"]
        }
    },
    {
        "name": "book_flight",
        "description": "预订航班",
        "parameters": {
            "type": "object",
            "properties": {
                "flight_id": {"type": "string", "description": "航班号"},
                "passenger_name": {"type": "string", "description": "乘客姓名"},
                "passenger_count": {"type": "integer", "description": "乘客数量", "default": 1}
            },
            "required": ["flight_id", "passenger_name"]
        }
    }
]

available_functions = {
    "search_flights": search_flights,
    "check_availability": check_availability,
    "book_flight": book_flight
}

def flight_booking_agent(user_input: str) -> str:
    """航班预订 Agent"""
    messages = [
        {
            "role": "system",
            "content": """
            你是一个航班预订助手。帮助用户完成以下流程:
            1. 搜索航班
            2. 询问用户偏好(时间、价格)
            3. 检查航班可用性
            4. 确认乘客信息
            5. 完成预订
            
            注意:
            - 预订前必须确认用户意图
            - 提供清晰的航班信息对比
            - 预订成功后提供订单号
            """
        },
        {"role": "user", "content": user_input}
    ]
    
    max_iterations = 10
    iteration = 0
    
    while iteration < max_iterations:
        iteration += 1
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            functions=functions,
            function_call="auto"
        )
        
        message = response.choices[0].message
        messages.append(message)
        
        # 如果没有函数调用,返回最终回复
        if not message.function_call:
            return message.content
        
        # 执行函数
        function_name = message.function_call.name
        function_args = json.loads(message.function_call.arguments)
        
        print(f"\n🔧 [{iteration}] 调用函数: {function_name}")
        print(f"📝 参数: {json.dumps(function_args, ensure_ascii=False)}")
        
        function_result = available_functions[function_name](**function_args)
        
        print(f"✅ 结果: {function_result[:200]}...")
        
        # 将结果添加到消息历史
        messages.append({
            "role": "function",
            "name": function_name,
            "content": function_result
        })
    
    return "抱歉,处理时间过长,请重新开始。"

# 测试
print(flight_booking_agent("我想订明天从北京到上海的机票,下午出发,价格便宜点"))

输出:

🔧 [1] 调用函数: search_flights
📝 参数: {"origin": "北京", "destination": "上海", "date": "2024-03-15"}
✅ 结果: {"success": true, "count": 3, "flights": [{"id": "CA1234", ...}]}

找到3个航班:
1. CA1234 - 08:00 出发,800元
2. MU5678 - 14:00 出发,650元(推荐)
3. CZ9012 - 18:00 出发,700元

根据您的需求(下午出发、价格便宜),我推荐 MU5678 航班。是否预订?

[用户确认后...]

🔧 [2] 调用函数: book_flight
📝 参数: {"flight_id": "MU5678", "passenger_name": "张三", "passenger_count": 1}
✅ 结果: {"success": true, "booking": {"booking_id": "BK0001", ...}}

预订成功!
订单号:BK0001
航班:MU5678
乘客:张三
价格:650元

案例 3: 智能客服系统

python
from openai import OpenAI
import json

client = OpenAI()

# 模拟知识库
knowledge_base = {
    "退款政策": "购买后7天内可无理由退款,需保持商品完好。",
    "配送时间": "一般3-5个工作日送达,偏远地区可能需要7-10天。",
    "售后服务": "提供1年质保,人为损坏不在保修范围内。"
}

orders_db = {
    "ORD001": {"status": "已发货", "tracking": "SF1234567890", "items": ["iPhone 15"]},
    "ORD002": {"status": "配送中", "tracking": "YT9876543210", "items": ["iPad Pro"]}
}

# 工具函数
def search_knowledge(query: str) -> str:
    """搜索知识库"""
    results = []
    for key, value in knowledge_base.items():
        if query in key or query in value:
            results.append(f"{key}: {value}")
    
    if results:
        return json.dumps({
            "success": True,
            "results": results
        }, ensure_ascii=False)
    else:
        return json.dumps({
            "success": False,
            "message": "未找到相关信息"
        }, ensure_ascii=False)

def query_order_status(order_id: str) -> str:
    """查询订单状态"""
    order = orders_db.get(order_id)
    
    if order:
        return json.dumps({
            "success": True,
            "order_id": order_id,
            "status": order["status"],
            "tracking": order["tracking"],
            "items": order["items"]
        }, ensure_ascii=False)
    else:
        return json.dumps({
            "success": False,
            "message": "订单不存在"
        }, ensure_ascii=False)

def create_ticket(issue_type: str, description: str, priority: str = "medium") -> str:
    """创建工单(升级到人工)"""
    ticket = {
        "ticket_id": f"TK{len(range(100)) + 1:04d}",
        "issue_type": issue_type,
        "description": description,
        "priority": priority,
        "status": "待处理"
    }
    
    return json.dumps({
        "success": True,
        "message": "已创建工单,客服将在24小时内联系您",
        "ticket": ticket
    }, ensure_ascii=False)

# 函数定义
functions = [
    {
        "name": "search_knowledge",
        "description": "搜索知识库,查找常见问题的答案",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "搜索关键词"}
            },
            "required": ["query"]
        }
    },
    {
        "name": "query_order_status",
        "description": "查询订单状态和物流信息",
        "parameters": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string", "description": "订单号"}
            },
            "required": ["order_id"]
        }
    },
    {
        "name": "create_ticket",
        "description": "创建工单,升级到人工客服处理",
        "parameters": {
            "type": "object",
            "properties": {
                "issue_type": {
                    "type": "string",
                    "enum": ["退款", "投诉", "技术问题", "其他"],
                    "description": "问题类型"
                },
                "description": {"type": "string", "description": "问题描述"},
                "priority": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "default": "medium",
                    "description": "优先级"
                }
            },
            "required": ["issue_type", "description"]
        }
    }
]

available_functions = {
    "search_knowledge": search_knowledge,
    "query_order_status": query_order_status,
    "create_ticket": create_ticket
}

def customer_service_bot(user_message: str, conversation_history: list = None) -> tuple[str, list]:
    """客服机器人"""
    if conversation_history is None:
        conversation_history = [
            {
                "role": "system",
                "content": """
                你是一个智能客服助手。职责:
                1. 回答常见问题(使用 search_knowledge)
                2. 查询订单状态(使用 query_order_status)
                3. 无法解决的问题升级到人工(使用 create_ticket)
                
                原则:
                - 友好、耐心、专业
                - 先尝试自己解决,实在不行再升级
                - 提供清晰的信息和下一步指引
                """
            }
        ]
    
    conversation_history.append({"role": "user", "content": user_message})
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=conversation_history,
        functions=functions,
        function_call="auto"
    )
    
    message = response.choices[0].message
    conversation_history.append(message)
    
    if message.function_call:
        function_name = message.function_call.name
        function_args = json.loads(message.function_call.arguments)
        
        print(f"🔧 调用: {function_name}({function_args})")
        
        function_result = available_functions[function_name](**function_args)
        
        conversation_history.append({
            "role": "function",
            "name": function_name,
            "content": function_result
        })
        
        # 再次调用生成回复
        second_response = client.chat.completions.create(
            model="gpt-4o",
            messages=conversation_history
        )
        
        final_message = second_response.choices[0].message
        conversation_history.append(final_message)
        
        return final_message.content, conversation_history
    else:
        return message.content, conversation_history

# 测试多轮对话
history = None
queries = [
    "你们的退款政策是什么?",
    "我的订单 ORD001 到哪了?",
    "商品有质量问题,我要投诉!"
]

for query in queries:
    print(f"\n👤 用户: {query}")
    response, history = customer_service_bot(query, history)
    print(f"🤖 客服: {response}")
    print("-" * 50)

5.3.6 高级技巧

1. 动态函数注册

根据用户上下文动态选择可用函数。

python
class DynamicFunctionRegistry:
    """动态函数注册器"""
    
    def __init__(self):
        self.all_functions = {
            "basic": [get_weather, get_time],
            "premium": [get_weather, get_time, search_web, send_email],
            "admin": [get_weather, get_time, search_web, send_email, query_database, execute_code]
        }
    
    def get_functions_for_user(self, user_tier: str) -> list:
        """根据用户等级返回可用函数"""
        return self.all_functions.get(user_tier, self.all_functions["basic"])

# 使用
registry = DynamicFunctionRegistry()
user_tier = "premium"  # 从用户信息获取

available_functions = registry.get_functions_for_user(user_tier)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    functions=[func_to_schema(f) for f in available_functions]
)

2. 函数链式调用

一个函数的输出作为另一个函数的输入。

python
# 定义函数
def get_user_location(user_id: str) -> str:
    """获取用户位置"""
    return json.dumps({"user_id": user_id, "city": "北京"})

def get_weather(city: str) -> str:
    """获取天气"""
    return json.dumps({"city": city, "temperature": 15, "condition": "晴天"})

# LLM 会自动链式调用:
# 1. 调用 get_user_location(user_id="123")
# 2. 从结果中提取 city="北京"
# 3. 调用 get_weather(city="北京")
# 4. 生成最终回复

functions = [
    func_to_schema(get_user_location),
    func_to_schema(get_weather)
]

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "用户123那里天气怎么样?"}],
    functions=functions
)

3. 函数执行缓存

避免重复调用相同函数。

python
from functools import lru_cache
import hashlib

class FunctionCache:
    """函数调用缓存"""
    
    def __init__(self, ttl: int = 300):
        self.cache = {}
        self.ttl = ttl  # 缓存有效期(秒)
    
    def get_cache_key(self, function_name: str, args: dict) -> str:
        """生成缓存键"""
        args_str = json.dumps(args, sort_keys=True)
        return hashlib.md5(f"{function_name}:{args_str}".encode()).hexdigest()
    
    def get(self, function_name: str, args: dict):
        """获取缓存"""
        key = self.get_cache_key(function_name, args)
        cached = self.cache.get(key)
        
        if cached:
            timestamp, result = cached
            if time.time() - timestamp < self.ttl:
                print(f"🎯 缓存命中: {function_name}")
                return result
        
        return None
    
    def set(self, function_name: str, args: dict, result: str):
        """设置缓存"""
        key = self.get_cache_key(function_name, args)
        self.cache[key] = (time.time(), result)

# 使用
cache = FunctionCache(ttl=300)

def execute_function_with_cache(function_name: str, args: dict) -> str:
    """带缓存的函数执行"""
    # 检查缓存
    cached_result = cache.get(function_name, args)
    if cached_result:
        return cached_result
    
    # 执行函数
    result = available_functions[function_name](**args)
    
    # 存入缓存
    cache.set(function_name, args, result)
    
    return result

4. 异步函数调用

提升性能,特别是需要调用多个外部 API 时。

python
import asyncio
import aiohttp

async def async_get_weather(city: str) -> str:
    """异步天气查询"""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.weather.com?city={city}") as response:
            data = await response.json()
            return json.dumps(data)

async def async_get_news(topic: str) -> str:
    """异步新闻查询"""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.news.com?topic={topic}") as response:
            data = await response.json()
            return json.dumps(data)

async def parallel_function_calls(calls: list[tuple[str, dict]]) -> list[str]:
    """并行执行多个函数"""
    tasks = []
    for function_name, args in calls:
        if function_name == "get_weather":
            tasks.append(async_get_weather(**args))
        elif function_name == "get_news":
            tasks.append(async_get_news(**args))
    
    results = await asyncio.gather(*tasks)
    return results

# 使用
calls = [
    ("get_weather", {"city": "北京"}),
    ("get_weather", {"city": "上海"}),
    ("get_news", {"topic": "AI"})
]

results = asyncio.run(parallel_function_calls(calls))

5.3.7 安全性考虑

1. 输入验证

python
from pydantic import BaseModel, Field, validator

class SafeSearchInput(BaseModel):
    """安全的搜索输入"""
    query: str = Field(max_length=200)
    
    @validator('query')
    def validate_query(cls, v):
        # 禁止 SQL 注入关键词
        dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', '--', ';']
        if any(kw in v.upper() for kw in dangerous_keywords):
            raise ValueError("查询包含危险关键词")
        return v

def safe_search(query: str) -> str:
    """安全的搜索函数"""
    try:
        validated = SafeSearchInput(query=query)
        # 执行搜索...
    except ValueError as e:
        return json.dumps({"error": str(e)})

2. 权限控制

python
class FunctionPermissionManager:
    """函数权限管理器"""
    
    def __init__(self):
        self.permissions = {
            "user": ["get_weather", "search_web"],
            "admin": ["get_weather", "search_web", "query_database", "send_email"],
            "superadmin": ["*"]  # 所有权限
        }
    
    def can_execute(self, user_role: str, function_name: str) -> bool:
        """检查用户是否有权限执行函数"""
        allowed = self.permissions.get(user_role, [])
        return "*" in allowed or function_name in allowed
    
    def execute_with_permission(self, user_role: str, function_name: str, args: dict) -> str:
        """带权限检查的函数执行"""
        if not self.can_execute(user_role, function_name):
            return json.dumps({
                "error": "权限不足",
                "code": "PERMISSION_DENIED"
            })
        
        return available_functions[function_name](**args)

# 使用
permission_manager = FunctionPermissionManager()

result = permission_manager.execute_with_permission(
    user_role="user",
    function_name="query_database",  # 用户无权限
    args={"sql": "SELECT * FROM users"}
)
# 返回: {"error": "权限不足", "code": "PERMISSION_DENIED"}

3. 速率限制

python
from collections import defaultdict
import time

class RateLimiter:
    """速率限制器"""
    
    def __init__(self, max_calls: int = 10, window: int = 60):
        self.max_calls = max_calls
        self.window = window
        self.calls = defaultdict(list)
    
    def is_allowed(self, user_id: str, function_name: str) -> bool:
        """检查是否允许调用"""
        key = f"{user_id}:{function_name}"
        now = time.time()
        
        # 清理过期记录
        self.calls[key] = [t for t in self.calls[key] if now - t < self.window]
        
        # 检查是否超限
        if len(self.calls[key]) >= self.max_calls:
            return False
        
        # 记录本次调用
        self.calls[key].append(now)
        return True

# 使用
rate_limiter = RateLimiter(max_calls=5, window=60)

def execute_with_rate_limit(user_id: str, function_name: str, args: dict) -> str:
    """带速率限制的函数执行"""
    if not rate_limiter.is_allowed(user_id, function_name):
        return json.dumps({
            "error": "调用频率过高,请稍后再试",
            "code": "RATE_LIMIT_EXCEEDED"
        })
    
    return available_functions[function_name](**args)

5.3.8 调试与监控

1. 详细日志

python
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(f'function_calls_{datetime.now():%Y%m%d}.log'),
        logging.StreamHandler()
    ]
)

def execute_function_with_logging(function_name: str, args: dict) -> str:
    """带日志的函数执行"""
    start_time = time.time()
    
    logging.info(f"开始执行函数: {function_name}")
    logging.info(f"参数: {json.dumps(args, ensure_ascii=False)}")
    
    try:
        result = available_functions[function_name](**args)
        
        execution_time = time.time() - start_time
        logging.info(f"执行成功,耗时: {execution_time:.2f}秒")
        logging.info(f"结果: {result[:200]}...")
        
        return result
    
    except Exception as e:
        logging.error(f"执行失败: {str(e)}", exc_info=True)
        return json.dumps({"error": str(e)})

2. 性能监控

python
from dataclasses import dataclass
from typing import Dict, List
import statistics

@dataclass
class FunctionMetrics:
    """函数调用指标"""
    function_name: str
    call_count: int = 0
    total_time: float = 0.0
    errors: int = 0
    avg_time: float = 0.0

class FunctionMonitor:
    """函数调用监控器"""
    
    def __init__(self):
        self.metrics: Dict[str, FunctionMetrics] = {}
        self.call_history: List[dict] = []
    
    def record_call(self, function_name: str, execution_time: float, success: bool):
        """记录函数调用"""
        if function_name not in self.metrics:
            self.metrics[function_name] = FunctionMetrics(function_name=function_name)
        
        metric = self.metrics[function_name]
        metric.call_count += 1
        metric.total_time += execution_time
        metric.avg_time = metric.total_time / metric.call_count
        
        if not success:
            metric.errors += 1
        
        self.call_history.append({
            "function": function_name,
            "time": execution_time,
            "success": success,
            "timestamp": datetime.now().isoformat()
        })
    
    def get_report(self) -> str:
        """生成监控报告"""
        report = "函数调用统计报告\n"
        report += "=" * 50 + "\n"
        
        for name, metric in self.metrics.items():
            report += f"\n函数: {name}\n"
            report += f"  调用次数: {metric.call_count}\n"
            report += f"  平均耗时: {metric.avg_time:.3f}\n"
            report += f"  错误次数: {metric.errors}\n"
            report += f"  成功率: {(1 - metric.errors/metric.call_count)*100:.1f}%\n"
        
        return report

# 使用
monitor = FunctionMonitor()

def execute_with_monitoring(function_name: str, args: dict) -> str:
    """带监控的函数执行"""
    start_time = time.time()
    success = True
    
    try:
        result = available_functions[function_name](**args)
        return result
    except Exception as e:
        success = False
        return json.dumps({"error": str(e)})
    finally:
        execution_time = time.time() - start_time
        monitor.record_call(function_name, execution_time, success)

# 定期输出报告
print(monitor.get_report())

5.3.9 学习资源

官方文档:

  • [Open

坚持是一种品格