学习时长:2-3 周
Function Calling(函数调用)是让 LLM 能够调用外部工具和 API 的核心技术,是构建实用 AI 应用的关键能力。本节将深入讲解 Function Calling 的原理、实现方式和最佳实践。
5.3.1 Function Calling 基础
什么是 Function Calling?
定义:
Function Calling 是 LLM 的一种能力,允许模型识别何时需要调用外部函数/工具,并生成符合函数签名的结构化参数。
核心流程:
用户输入: "北京今天天气怎么样?"
↓
┌─────────────────────────────────┐
│ LLM 分析 │
│ - 识别需要查询天气 │
│ - 选择 get_weather 函数 │
│ - 生成参数: {"city": "北京"} │
└─────────────────────────────────┘
↓
┌─────────────────────────────────┐
│ 应用程序执行函数 │
│ result = get_weather("北京") │
│ → "北京今天 15°C,晴天" │
└─────────────────────────────────┘
↓
┌─────────────────────────────────┐
│ LLM 生成最终回复 │
│ "北京今天天气晴朗,气温15°C" │
└─────────────────────────────────┘Function Calling vs Prompt Engineering
| 对比维度 | Prompt Engineering | Function Calling |
|---|---|---|
| 参数提取 | 不可靠(需要解析文本) | 结构化 JSON,可靠 |
| 类型安全 | ❌ 无类型检查 | ✅ 强类型约束 |
| 错误处理 | 难以处理 | 自动验证参数 |
| 可维护性 | 低(Prompt 脆弱) | 高(函数签名明确) |
| 性能 | 需要额外解析 | 直接返回结构化数据 |
示例对比:
python
# ❌ Prompt Engineering 方式
prompt = """
从用户输入中提取城市名称,格式: City: [城市名]
用户输入: 北京今天天气怎么样?
"""
response = llm.invoke(prompt)
# 输出: "City: 北京" (需要手动解析)
city = response.split(":")[1].strip()
# ✅ Function Calling 方式
functions = [{
"name": "get_weather",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string"}
}
}
}]
response = llm.invoke(user_input, functions=functions)
# 输出: {"name": "get_weather", "arguments": {"city": "北京"}}
# 直接得到结构化参数支持 Function Calling 的模型
| 模型 | 支持情况 | 特点 |
|---|---|---|
| OpenAI GPT-4/4o | ✅ 原生支持 | 最稳定,准确率高 |
| OpenAI GPT-3.5-turbo | ✅ 原生支持 | 性价比高 |
| Claude 3.5 Sonnet | ✅ Tool Use | 准确率接近 GPT-4 |
| Gemini 1.5 Pro | ✅ Function Calling | Google 原生支持 |
| Qwen-Plus/Max | ✅ 原生支持 | 中文场景优秀 |
| LLaMA 3.1 | ⚠️ 需微调 | 开源模型,需训练 |
| Mistral Large | ✅ 原生支持 | 欧洲开源 |
5.3.2 OpenAI Function Calling 详解
基础用法
1. 定义函数 Schema
python
from openai import OpenAI
client = OpenAI(api_key="your-api-key")
# 定义函数描述
functions = [
{
"name": "get_current_weather",
"description": "获取指定城市的当前天气信息",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市名称,例如:北京、上海"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位"
}
},
"required": ["location"]
}
}
]2. 调用 API
python
messages = [
{"role": "user", "content": "北京今天天气怎么样?"}
]
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
functions=functions,
function_call="auto" # auto | none | {"name": "函数名"}
)
# 检查是否需要调用函数
message = response.choices[0].message
if message.function_call:
# LLM 决定调用函数
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
print(f"调用函数: {function_name}")
print(f"参数: {function_args}")
# 输出:
# 调用函数: get_current_weather
# 参数: {'location': '北京', 'unit': 'celsius'}3. 执行函数并返回结果
python
import json
def get_current_weather(location: str, unit: str = "celsius") -> str:
"""实际的天气查询函数"""
# 实际应用中调用天气 API
weather_data = {
"location": location,
"temperature": 15,
"unit": unit,
"description": "晴天"
}
return json.dumps(weather_data, ensure_ascii=False)
# 执行函数
if message.function_call:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
# 动态调用函数
available_functions = {
"get_current_weather": get_current_weather
}
function_to_call = available_functions[function_name]
function_response = function_to_call(**function_args)
# 将函数结果返回给 LLM
messages.append(message) # 添加 LLM 的函数调用消息
messages.append({
"role": "function",
"name": function_name,
"content": function_response
})
# 再次调用 LLM 生成最终回复
second_response = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
final_answer = second_response.choices[0].message.content
print(final_answer)
# 输出: "北京今天天气晴朗,气温15摄氏度。"完整示例:天气查询助手
python
from openai import OpenAI
import json
client = OpenAI()
# 定义多个函数
functions = [
{
"name": "get_current_weather",
"description": "获取指定城市的当前天气",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市名称"
}
},
"required": ["location"]
}
},
{
"name": "get_weather_forecast",
"description": "获取未来几天的天气预报",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市名称"
},
"days": {
"type": "integer",
"description": "预报天数(1-7)",
"minimum": 1,
"maximum": 7
}
},
"required": ["location", "days"]
}
}
]
# 实现函数
def get_current_weather(location: str) -> str:
"""模拟天气查询"""
return json.dumps({
"location": location,
"temperature": 15,
"condition": "晴天"
}, ensure_ascii=False)
def get_weather_forecast(location: str, days: int) -> str:
"""模拟天气预报"""
forecast = []
for i in range(days):
forecast.append({
"day": i + 1,
"temperature": 15 + i,
"condition": "晴天" if i % 2 == 0 else "多云"
})
return json.dumps({
"location": location,
"forecast": forecast
}, ensure_ascii=False)
# 函数映射
available_functions = {
"get_current_weather": get_current_weather,
"get_weather_forecast": get_weather_forecast
}
def run_conversation(user_message: str) -> str:
"""运行对话"""
messages = [{"role": "user", "content": user_message}]
# 第一次调用:让 LLM 决定是否需要调用函数
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
functions=functions,
function_call="auto"
)
message = response.choices[0].message
# 如果需要调用函数
if message.function_call:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
print(f"🔧 调用函数: {function_name}({function_args})")
# 执行函数
function_response = available_functions[function_name](**function_args)
print(f"📊 函数返回: {function_response}")
# 将结果返回给 LLM
messages.append(message)
messages.append({
"role": "function",
"name": function_name,
"content": function_response
})
# 第二次调用:生成最终回复
second_response = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return second_response.choices[0].message.content
else:
# 不需要调用函数,直接返回
return message.content
# 测试
print(run_conversation("北京今天天气怎么样?"))
print("\n" + "="*50 + "\n")
print(run_conversation("上海未来3天的天气预报"))输出:
🔧 调用函数: get_current_weather({'location': '北京'})
📊 函数返回: {"location": "北京", "temperature": 15, "condition": "晴天"}
北京今天天气晴朗,气温15摄氏度。
==================================================
🔧 调用函数: get_weather_forecast({'location': '上海', 'days': 3})
📊 函数返回: {"location": "上海", "forecast": [{"day": 1, "temperature": 15, "condition": "晴天"}, ...]}
上海未来3天的天气预报如下:
- 第1天:晴天,15°C
- 第2天:多云,16°C
- 第3天:晴天,17°C高级特性
1. 强制调用特定函数
python
# 强制调用指定函数(即使用户输入不明确)
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "天气"}],
functions=functions,
function_call={"name": "get_current_weather"} # 强制调用
)2. 并行函数调用(Parallel Function Calling)
OpenAI 支持在一次请求中调用多个函数。
python
# 用户: "对比北京和上海的天气"
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "对比北京和上海的天气"}],
tools=[ # 使用新的 tools 格式
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "获取天气",
"parameters": {...}
}
}
]
)
# LLM 可能返回多个 tool_calls
message = response.choices[0].message
if message.tool_calls:
for tool_call in message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
# 执行函数...3. 流式输出 + Function Calling
python
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
functions=functions,
stream=True
)
function_call_data = {"name": "", "arguments": ""}
for chunk in response:
delta = chunk.choices[0].delta
if delta.function_call:
# 逐步接收函数调用信息
if delta.function_call.name:
function_call_data["name"] += delta.function_call.name
if delta.function_call.arguments:
function_call_data["arguments"] += delta.function_call.arguments
# 完整接收后执行函数
if function_call_data["name"]:
function_args = json.loads(function_call_data["arguments"])
result = execute_function(function_call_data["name"], function_args)5.3.3 其他模型的 Function Calling
Claude 3.5 Tool Use
Anthropic Claude 使用 "Tool Use" 而非 "Function Calling",但原理相同。
python
import anthropic
client = anthropic.Anthropic(api_key="your-api-key")
# 定义工具
tools = [
{
"name": "get_weather",
"description": "获取指定城市的天气信息",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市名称"
}
},
"required": ["location"]
}
}
]
# 调用
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "北京今天天气怎么样?"}
]
)
# 检查是否需要调用工具
for content in response.content:
if content.type == "tool_use":
tool_name = content.name
tool_input = content.input
print(f"调用工具: {tool_name}")
print(f"参数: {tool_input}")
# 执行工具
result = get_weather(**tool_input)
# 返回结果给 Claude
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "北京今天天气怎么样?"},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": content.id,
"content": result
}
]
}
]
)
print(response.content[0].text)Google Gemini Function Calling
python
import google.generativeai as genai
genai.configure(api_key="your-api-key")
# 定义函数
def get_weather(location: str) -> dict:
"""获取天气"""
return {"location": location, "temperature": 15, "condition": "晴天"}
# 创建模型
model = genai.GenerativeModel(
model_name='gemini-1.5-pro',
tools=[get_weather] # 直接传入 Python 函数
)
# 调用
chat = model.start_chat()
response = chat.send_message("北京今天天气怎么样?")
# 检查函数调用
if response.candidates[0].content.parts[0].function_call:
function_call = response.candidates[0].content.parts[0].function_call
function_name = function_call.name
function_args = dict(function_call.args)
# 执行函数
result = get_weather(**function_args)
# 返回结果
response = chat.send_message(
genai.protos.Content(
parts=[
genai.protos.Part(
function_response=genai.protos.FunctionResponse(
name=function_name,
response={"result": result}
)
)
]
)
)
print(response.text)阿里通义千问 Function Calling
python
from dashscope import Generation
# 定义函数
functions = [
{
"name": "get_weather",
"description": "获取天气信息",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "城市名称"}
},
"required": ["location"]
}
}
]
# 调用
response = Generation.call(
model='qwen-plus',
messages=[
{"role": "user", "content": "北京今天天气怎么样?"}
],
tools=functions,
result_format='message'
)
# 处理函数调用
if response.output.choices[0].message.tool_calls:
tool_call = response.output.choices[0].message.tool_calls[0]
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
# 执行并返回结果...5.3.4 工具设计最佳实践
1. 函数命名规范
python
# ❌ 不好的命名
def f1(x):
"""做某事"""
pass
# ✅ 好的命名
def get_current_weather(location: str, unit: str = "celsius") -> dict:
"""
获取指定城市的当前天气信息
Args:
location: 城市名称,如"北京"、"上海"
unit: 温度单位,"celsius"(摄氏度) 或 "fahrenheit"(华氏度)
Returns:
包含温度、天气状况等信息的字典
"""
pass命名原则:
- ✅ 使用动词开头(get、create、update、delete、search)
- ✅ 名称清晰表达功能
- ✅ 避免缩写和简写
- ✅ 保持一致的命名风格
2. 参数设计
使用类型提示和详细描述:
python
from typing import Literal, Optional
from pydantic import BaseModel, Field
class WeatherInput(BaseModel):
"""天气查询参数"""
location: str = Field(
description="城市名称,支持中文(如'北京')或英文(如'Beijing')"
)
unit: Literal["celsius", "fahrenheit"] = Field(
default="celsius",
description="温度单位:celsius(摄氏度) 或 fahrenheit(华氏度)"
)
include_forecast: Optional[bool] = Field(
default=False,
description="是否包含未来天气预报"
)
# 转换为 OpenAI Function Schema
def pydantic_to_openai_schema(model: type[BaseModel]) -> dict:
"""将 Pydantic 模型转换为 OpenAI Function Schema"""
schema = model.schema()
return {
"type": "object",
"properties": schema["properties"],
"required": schema.get("required", [])
}
weather_function = {
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": pydantic_to_openai_schema(WeatherInput)
}3. 错误处理
python
def get_weather(location: str) -> str:
"""健壮的天气查询函数"""
try:
# 参数验证
if not location or not location.strip():
return json.dumps({
"error": "城市名称不能为空",
"code": "INVALID_INPUT"
}, ensure_ascii=False)
# 调用外部 API
response = requests.get(
f"https://api.weather.com/v1/current",
params={"city": location},
timeout=5
)
response.raise_for_status()
data = response.json()
return json.dumps({
"success": True,
"data": data
}, ensure_ascii=False)
except requests.Timeout:
return json.dumps({
"error": "天气服务响应超时,请稍后重试",
"code": "TIMEOUT"
}, ensure_ascii=False)
except requests.HTTPError as e:
if e.response.status_code == 404:
return json.dumps({
"error": f"未找到城市 '{location}' 的天气信息",
"code": "CITY_NOT_FOUND"
}, ensure_ascii=False)
else:
return json.dumps({
"error": "天气服务暂时不可用",
"code": "SERVICE_ERROR"
}, ensure_ascii=False)
except Exception as e:
# 记录错误日志
logger.error(f"Weather API error: {e}")
return json.dumps({
"error": "查询天气时发生未知错误",
"code": "UNKNOWN_ERROR"
}, ensure_ascii=False)4. 返回值设计
结构化返回:
python
# ❌ 不好的返回值
def search_products(query: str) -> str:
return "找到了3个产品:iPhone 15, iPad Pro, MacBook"
# ✅ 好的返回值(结构化 JSON)
def search_products(query: str) -> str:
results = {
"query": query,
"total": 3,
"products": [
{
"id": "001",
"name": "iPhone 15",
"price": 5999,
"stock": 100
},
{
"id": "002",
"name": "iPad Pro",
"price": 6799,
"stock": 50
},
{
"id": "003",
"name": "MacBook",
"price": 9999,
"stock": 30
}
]
}
return json.dumps(results, ensure_ascii=False, indent=2)返回值原则:
- ✅ 使用 JSON 格式
- ✅ 包含元数据(total、page、timestamp)
- ✅ 错误信息明确
- ✅ 数据类型一致
5.3.5 实战案例
案例 1: 数据库查询助手
需求: 用自然语言查询数据库
python
import sqlite3
import json
from openai import OpenAI
client = OpenAI()
# 初始化数据库
conn = sqlite3.connect('company.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY,
name TEXT,
department TEXT,
salary INTEGER,
hire_date TEXT
)
''')
# 插入示例数据
sample_data = [
(1, '张三', '技术部', 15000, '2020-01-15'),
(2, '李四', '销售部', 12000, '2019-06-20'),
(3, '王五', '技术部', 18000, '2021-03-10'),
]
cursor.executemany('INSERT OR REPLACE INTO employees VALUES (?,?,?,?,?)', sample_data)
conn.commit()
# 定义查询函数
def query_database(sql: str) -> str:
"""
执行 SQL 查询
Args:
sql: SQL 查询语句(只支持 SELECT)
Returns:
查询结果的 JSON 字符串
"""
try:
# 安全检查:只允许 SELECT 语句
if not sql.strip().upper().startswith('SELECT'):
return json.dumps({
"error": "只支持 SELECT 查询",
"code": "INVALID_SQL"
}, ensure_ascii=False)
cursor.execute(sql)
columns = [description[0] for description in cursor.description]
results = cursor.fetchall()
# 转换为字典列表
data = [dict(zip(columns, row)) for row in results]
return json.dumps({
"success": True,
"count": len(data),
"data": data
}, ensure_ascii=False, indent=2)
except sqlite3.Error as e:
return json.dumps({
"error": f"SQL 执行错误: {str(e)}",
"code": "SQL_ERROR"
}, ensure_ascii=False)
# 定义函数 Schema
functions = [
{
"name": "query_database",
"description": """
查询员工数据库。
数据库结构:
- employees 表
- id: 员工ID (INTEGER)
- name: 姓名 (TEXT)
- department: 部门 (TEXT)
- salary: 薪资 (INTEGER)
- hire_date: 入职日期 (TEXT, 格式: YYYY-MM-DD)
示例查询:
- 查询所有员工: SELECT * FROM employees
- 查询技术部员工: SELECT * FROM employees WHERE department = '技术部'
- 查询薪资排名: SELECT * FROM employees ORDER BY salary DESC
""",
"parameters": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL SELECT 查询语句"
}
},
"required": ["sql"]
}
}
]
def chat_with_database(user_query: str) -> str:
"""与数据库对话"""
messages = [
{
"role": "system",
"content": "你是一个数据库查询助手。根据用户的自然语言问题,生成合适的 SQL 查询。"
},
{
"role": "user",
"content": user_query
}
]
# 第一次调用:生成 SQL
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
functions=functions,
function_call="auto"
)
message = response.choices[0].message
if message.function_call:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
print(f"🔍 生成的 SQL: {function_args['sql']}")
# 执行查询
result = query_database(**function_args)
print(f"📊 查询结果: {result}")
# 返回结果给 LLM
messages.append(message)
messages.append({
"role": "function",
"name": function_name,
"content": result
})
# 第二次调用:生成自然语言回复
second_response = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return second_response.choices[0].message.content
else:
return message.content
# 测试
print(chat_with_database("技术部有多少人?"))
print("\n" + "="*50 + "\n")
print(chat_with_database("薪资最高的3名员工是谁?"))
print("\n" + "="*50 + "\n")
print(chat_with_database("2020年入职的员工有哪些?"))输出:
🔍 生成的 SQL: SELECT COUNT(*) as count FROM employees WHERE department = '技术部'
📊 查询结果: {"success": true, "count": 1, "data": [{"count": 2}]}
技术部有2名员工。
==================================================
🔍 生成的 SQL: SELECT name, salary FROM employees ORDER BY salary DESC LIMIT 3
📊 查询结果: {"success": true, "count": 3, "data": [{"name": "王五", "salary": 18000}, ...]}
薪资最高的3名员工是:
1. 王五 - 18000元
2. 张三 - 15000元
3. 李四 - 12000元案例 2: 多步骤工作流
需求: 订机票流程(搜索 → 确认 → 预订)
python
from openai import OpenAI
from datetime import datetime
import json
client = OpenAI()
# 模拟数据库
flights_db = [
{"id": "CA1234", "from": "北京", "to": "上海", "date": "2024-03-15", "time": "08:00", "price": 800, "seats": 10},
{"id": "MU5678", "from": "北京", "to": "上海", "date": "2024-03-15", "time": "14:00", "price": 650, "seats": 5},
{"id": "CZ9012", "from": "北京", "to": "上海", "date": "2024-03-15", "time": "18:00", "price": 700, "seats": 8},
]
bookings = []
# 定义工具函数
def search_flights(origin: str, destination: str, date: str) -> str:
"""搜索航班"""
results = [
f for f in flights_db
if f["from"] == origin and f["to"] == destination and f["date"] == date
]
return json.dumps({
"success": True,
"count": len(results),
"flights": results
}, ensure_ascii=False, indent=2)
def check_availability(flight_id: str) -> str:
"""检查航班可用性"""
flight = next((f for f in flights_db if f["id"] == flight_id), None)
if not flight:
return json.dumps({"error": "航班不存在"}, ensure_ascii=False)
return json.dumps({
"success": True,
"flight_id": flight_id,
"available_seats": flight["seats"],
"price": flight["price"]
}, ensure_ascii=False)
def book_flight(flight_id: str, passenger_name: str, passenger_count: int = 1) -> str:
"""预订航班"""
flight = next((f for f in flights_db if f["id"] == flight_id), None)
if not flight:
return json.dumps({"error": "航班不存在"}, ensure_ascii=False)
if flight["seats"] < passenger_count:
return json.dumps({"error": "座位不足"}, ensure_ascii=False)
# 扣减座位
flight["seats"] -= passenger_count
# 创建订单
booking = {
"booking_id": f"BK{len(bookings) + 1:04d}",
"flight_id": flight_id,
"passenger_name": passenger_name,
"passenger_count": passenger_count,
"total_price": flight["price"] * passenger_count,
"status": "confirmed",
"booking_time": datetime.now().isoformat()
}
bookings.append(booking)
return json.dumps({
"success": True,
"booking": booking
}, ensure_ascii=False, indent=2)
# 定义函数 Schema
functions = [
{
"name": "search_flights",
"description": "搜索航班信息",
"parameters": {
"type": "object",
"properties": {
"origin": {"type": "string", "description": "出发城市"},
"destination": {"type": "string", "description": "目的地城市"},
"date": {"type": "string", "description": "日期,格式 YYYY-MM-DD"}
},
"required": ["origin", "destination", "date"]
}
},
{
"name": "check_availability",
"description": "检查指定航班的座位和价格",
"parameters": {
"type": "object",
"properties": {
"flight_id": {"type": "string", "description": "航班号"}
},
"required": ["flight_id"]
}
},
{
"name": "book_flight",
"description": "预订航班",
"parameters": {
"type": "object",
"properties": {
"flight_id": {"type": "string", "description": "航班号"},
"passenger_name": {"type": "string", "description": "乘客姓名"},
"passenger_count": {"type": "integer", "description": "乘客数量", "default": 1}
},
"required": ["flight_id", "passenger_name"]
}
}
]
available_functions = {
"search_flights": search_flights,
"check_availability": check_availability,
"book_flight": book_flight
}
def flight_booking_agent(user_input: str) -> str:
"""航班预订 Agent"""
messages = [
{
"role": "system",
"content": """
你是一个航班预订助手。帮助用户完成以下流程:
1. 搜索航班
2. 询问用户偏好(时间、价格)
3. 检查航班可用性
4. 确认乘客信息
5. 完成预订
注意:
- 预订前必须确认用户意图
- 提供清晰的航班信息对比
- 预订成功后提供订单号
"""
},
{"role": "user", "content": user_input}
]
max_iterations = 10
iteration = 0
while iteration < max_iterations:
iteration += 1
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
functions=functions,
function_call="auto"
)
message = response.choices[0].message
messages.append(message)
# 如果没有函数调用,返回最终回复
if not message.function_call:
return message.content
# 执行函数
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
print(f"\n🔧 [{iteration}] 调用函数: {function_name}")
print(f"📝 参数: {json.dumps(function_args, ensure_ascii=False)}")
function_result = available_functions[function_name](**function_args)
print(f"✅ 结果: {function_result[:200]}...")
# 将结果添加到消息历史
messages.append({
"role": "function",
"name": function_name,
"content": function_result
})
return "抱歉,处理时间过长,请重新开始。"
# 测试
print(flight_booking_agent("我想订明天从北京到上海的机票,下午出发,价格便宜点"))输出:
🔧 [1] 调用函数: search_flights
📝 参数: {"origin": "北京", "destination": "上海", "date": "2024-03-15"}
✅ 结果: {"success": true, "count": 3, "flights": [{"id": "CA1234", ...}]}
找到3个航班:
1. CA1234 - 08:00 出发,800元
2. MU5678 - 14:00 出发,650元(推荐)
3. CZ9012 - 18:00 出发,700元
根据您的需求(下午出发、价格便宜),我推荐 MU5678 航班。是否预订?
[用户确认后...]
🔧 [2] 调用函数: book_flight
📝 参数: {"flight_id": "MU5678", "passenger_name": "张三", "passenger_count": 1}
✅ 结果: {"success": true, "booking": {"booking_id": "BK0001", ...}}
预订成功!
订单号:BK0001
航班:MU5678
乘客:张三
价格:650元案例 3: 智能客服系统
python
from openai import OpenAI
import json
client = OpenAI()
# 模拟知识库
knowledge_base = {
"退款政策": "购买后7天内可无理由退款,需保持商品完好。",
"配送时间": "一般3-5个工作日送达,偏远地区可能需要7-10天。",
"售后服务": "提供1年质保,人为损坏不在保修范围内。"
}
orders_db = {
"ORD001": {"status": "已发货", "tracking": "SF1234567890", "items": ["iPhone 15"]},
"ORD002": {"status": "配送中", "tracking": "YT9876543210", "items": ["iPad Pro"]}
}
# 工具函数
def search_knowledge(query: str) -> str:
"""搜索知识库"""
results = []
for key, value in knowledge_base.items():
if query in key or query in value:
results.append(f"{key}: {value}")
if results:
return json.dumps({
"success": True,
"results": results
}, ensure_ascii=False)
else:
return json.dumps({
"success": False,
"message": "未找到相关信息"
}, ensure_ascii=False)
def query_order_status(order_id: str) -> str:
"""查询订单状态"""
order = orders_db.get(order_id)
if order:
return json.dumps({
"success": True,
"order_id": order_id,
"status": order["status"],
"tracking": order["tracking"],
"items": order["items"]
}, ensure_ascii=False)
else:
return json.dumps({
"success": False,
"message": "订单不存在"
}, ensure_ascii=False)
def create_ticket(issue_type: str, description: str, priority: str = "medium") -> str:
"""创建工单(升级到人工)"""
ticket = {
"ticket_id": f"TK{len(range(100)) + 1:04d}",
"issue_type": issue_type,
"description": description,
"priority": priority,
"status": "待处理"
}
return json.dumps({
"success": True,
"message": "已创建工单,客服将在24小时内联系您",
"ticket": ticket
}, ensure_ascii=False)
# 函数定义
functions = [
{
"name": "search_knowledge",
"description": "搜索知识库,查找常见问题的答案",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词"}
},
"required": ["query"]
}
},
{
"name": "query_order_status",
"description": "查询订单状态和物流信息",
"parameters": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "订单号"}
},
"required": ["order_id"]
}
},
{
"name": "create_ticket",
"description": "创建工单,升级到人工客服处理",
"parameters": {
"type": "object",
"properties": {
"issue_type": {
"type": "string",
"enum": ["退款", "投诉", "技术问题", "其他"],
"description": "问题类型"
},
"description": {"type": "string", "description": "问题描述"},
"priority": {
"type": "string",
"enum": ["low", "medium", "high"],
"default": "medium",
"description": "优先级"
}
},
"required": ["issue_type", "description"]
}
}
]
available_functions = {
"search_knowledge": search_knowledge,
"query_order_status": query_order_status,
"create_ticket": create_ticket
}
def customer_service_bot(user_message: str, conversation_history: list = None) -> tuple[str, list]:
"""客服机器人"""
if conversation_history is None:
conversation_history = [
{
"role": "system",
"content": """
你是一个智能客服助手。职责:
1. 回答常见问题(使用 search_knowledge)
2. 查询订单状态(使用 query_order_status)
3. 无法解决的问题升级到人工(使用 create_ticket)
原则:
- 友好、耐心、专业
- 先尝试自己解决,实在不行再升级
- 提供清晰的信息和下一步指引
"""
}
]
conversation_history.append({"role": "user", "content": user_message})
response = client.chat.completions.create(
model="gpt-4o",
messages=conversation_history,
functions=functions,
function_call="auto"
)
message = response.choices[0].message
conversation_history.append(message)
if message.function_call:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
print(f"🔧 调用: {function_name}({function_args})")
function_result = available_functions[function_name](**function_args)
conversation_history.append({
"role": "function",
"name": function_name,
"content": function_result
})
# 再次调用生成回复
second_response = client.chat.completions.create(
model="gpt-4o",
messages=conversation_history
)
final_message = second_response.choices[0].message
conversation_history.append(final_message)
return final_message.content, conversation_history
else:
return message.content, conversation_history
# 测试多轮对话
history = None
queries = [
"你们的退款政策是什么?",
"我的订单 ORD001 到哪了?",
"商品有质量问题,我要投诉!"
]
for query in queries:
print(f"\n👤 用户: {query}")
response, history = customer_service_bot(query, history)
print(f"🤖 客服: {response}")
print("-" * 50)5.3.6 高级技巧
1. 动态函数注册
根据用户上下文动态选择可用函数。
python
class DynamicFunctionRegistry:
"""动态函数注册器"""
def __init__(self):
self.all_functions = {
"basic": [get_weather, get_time],
"premium": [get_weather, get_time, search_web, send_email],
"admin": [get_weather, get_time, search_web, send_email, query_database, execute_code]
}
def get_functions_for_user(self, user_tier: str) -> list:
"""根据用户等级返回可用函数"""
return self.all_functions.get(user_tier, self.all_functions["basic"])
# 使用
registry = DynamicFunctionRegistry()
user_tier = "premium" # 从用户信息获取
available_functions = registry.get_functions_for_user(user_tier)
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
functions=[func_to_schema(f) for f in available_functions]
)2. 函数链式调用
一个函数的输出作为另一个函数的输入。
python
# 定义函数
def get_user_location(user_id: str) -> str:
"""获取用户位置"""
return json.dumps({"user_id": user_id, "city": "北京"})
def get_weather(city: str) -> str:
"""获取天气"""
return json.dumps({"city": city, "temperature": 15, "condition": "晴天"})
# LLM 会自动链式调用:
# 1. 调用 get_user_location(user_id="123")
# 2. 从结果中提取 city="北京"
# 3. 调用 get_weather(city="北京")
# 4. 生成最终回复
functions = [
func_to_schema(get_user_location),
func_to_schema(get_weather)
]
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "用户123那里天气怎么样?"}],
functions=functions
)3. 函数执行缓存
避免重复调用相同函数。
python
from functools import lru_cache
import hashlib
class FunctionCache:
"""函数调用缓存"""
def __init__(self, ttl: int = 300):
self.cache = {}
self.ttl = ttl # 缓存有效期(秒)
def get_cache_key(self, function_name: str, args: dict) -> str:
"""生成缓存键"""
args_str = json.dumps(args, sort_keys=True)
return hashlib.md5(f"{function_name}:{args_str}".encode()).hexdigest()
def get(self, function_name: str, args: dict):
"""获取缓存"""
key = self.get_cache_key(function_name, args)
cached = self.cache.get(key)
if cached:
timestamp, result = cached
if time.time() - timestamp < self.ttl:
print(f"🎯 缓存命中: {function_name}")
return result
return None
def set(self, function_name: str, args: dict, result: str):
"""设置缓存"""
key = self.get_cache_key(function_name, args)
self.cache[key] = (time.time(), result)
# 使用
cache = FunctionCache(ttl=300)
def execute_function_with_cache(function_name: str, args: dict) -> str:
"""带缓存的函数执行"""
# 检查缓存
cached_result = cache.get(function_name, args)
if cached_result:
return cached_result
# 执行函数
result = available_functions[function_name](**args)
# 存入缓存
cache.set(function_name, args, result)
return result4. 异步函数调用
提升性能,特别是需要调用多个外部 API 时。
python
import asyncio
import aiohttp
async def async_get_weather(city: str) -> str:
"""异步天气查询"""
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.weather.com?city={city}") as response:
data = await response.json()
return json.dumps(data)
async def async_get_news(topic: str) -> str:
"""异步新闻查询"""
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.news.com?topic={topic}") as response:
data = await response.json()
return json.dumps(data)
async def parallel_function_calls(calls: list[tuple[str, dict]]) -> list[str]:
"""并行执行多个函数"""
tasks = []
for function_name, args in calls:
if function_name == "get_weather":
tasks.append(async_get_weather(**args))
elif function_name == "get_news":
tasks.append(async_get_news(**args))
results = await asyncio.gather(*tasks)
return results
# 使用
calls = [
("get_weather", {"city": "北京"}),
("get_weather", {"city": "上海"}),
("get_news", {"topic": "AI"})
]
results = asyncio.run(parallel_function_calls(calls))5.3.7 安全性考虑
1. 输入验证
python
from pydantic import BaseModel, Field, validator
class SafeSearchInput(BaseModel):
"""安全的搜索输入"""
query: str = Field(max_length=200)
@validator('query')
def validate_query(cls, v):
# 禁止 SQL 注入关键词
dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', '--', ';']
if any(kw in v.upper() for kw in dangerous_keywords):
raise ValueError("查询包含危险关键词")
return v
def safe_search(query: str) -> str:
"""安全的搜索函数"""
try:
validated = SafeSearchInput(query=query)
# 执行搜索...
except ValueError as e:
return json.dumps({"error": str(e)})2. 权限控制
python
class FunctionPermissionManager:
"""函数权限管理器"""
def __init__(self):
self.permissions = {
"user": ["get_weather", "search_web"],
"admin": ["get_weather", "search_web", "query_database", "send_email"],
"superadmin": ["*"] # 所有权限
}
def can_execute(self, user_role: str, function_name: str) -> bool:
"""检查用户是否有权限执行函数"""
allowed = self.permissions.get(user_role, [])
return "*" in allowed or function_name in allowed
def execute_with_permission(self, user_role: str, function_name: str, args: dict) -> str:
"""带权限检查的函数执行"""
if not self.can_execute(user_role, function_name):
return json.dumps({
"error": "权限不足",
"code": "PERMISSION_DENIED"
})
return available_functions[function_name](**args)
# 使用
permission_manager = FunctionPermissionManager()
result = permission_manager.execute_with_permission(
user_role="user",
function_name="query_database", # 用户无权限
args={"sql": "SELECT * FROM users"}
)
# 返回: {"error": "权限不足", "code": "PERMISSION_DENIED"}3. 速率限制
python
from collections import defaultdict
import time
class RateLimiter:
"""速率限制器"""
def __init__(self, max_calls: int = 10, window: int = 60):
self.max_calls = max_calls
self.window = window
self.calls = defaultdict(list)
def is_allowed(self, user_id: str, function_name: str) -> bool:
"""检查是否允许调用"""
key = f"{user_id}:{function_name}"
now = time.time()
# 清理过期记录
self.calls[key] = [t for t in self.calls[key] if now - t < self.window]
# 检查是否超限
if len(self.calls[key]) >= self.max_calls:
return False
# 记录本次调用
self.calls[key].append(now)
return True
# 使用
rate_limiter = RateLimiter(max_calls=5, window=60)
def execute_with_rate_limit(user_id: str, function_name: str, args: dict) -> str:
"""带速率限制的函数执行"""
if not rate_limiter.is_allowed(user_id, function_name):
return json.dumps({
"error": "调用频率过高,请稍后再试",
"code": "RATE_LIMIT_EXCEEDED"
})
return available_functions[function_name](**args)5.3.8 调试与监控
1. 详细日志
python
import logging
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(f'function_calls_{datetime.now():%Y%m%d}.log'),
logging.StreamHandler()
]
)
def execute_function_with_logging(function_name: str, args: dict) -> str:
"""带日志的函数执行"""
start_time = time.time()
logging.info(f"开始执行函数: {function_name}")
logging.info(f"参数: {json.dumps(args, ensure_ascii=False)}")
try:
result = available_functions[function_name](**args)
execution_time = time.time() - start_time
logging.info(f"执行成功,耗时: {execution_time:.2f}秒")
logging.info(f"结果: {result[:200]}...")
return result
except Exception as e:
logging.error(f"执行失败: {str(e)}", exc_info=True)
return json.dumps({"error": str(e)})2. 性能监控
python
from dataclasses import dataclass
from typing import Dict, List
import statistics
@dataclass
class FunctionMetrics:
"""函数调用指标"""
function_name: str
call_count: int = 0
total_time: float = 0.0
errors: int = 0
avg_time: float = 0.0
class FunctionMonitor:
"""函数调用监控器"""
def __init__(self):
self.metrics: Dict[str, FunctionMetrics] = {}
self.call_history: List[dict] = []
def record_call(self, function_name: str, execution_time: float, success: bool):
"""记录函数调用"""
if function_name not in self.metrics:
self.metrics[function_name] = FunctionMetrics(function_name=function_name)
metric = self.metrics[function_name]
metric.call_count += 1
metric.total_time += execution_time
metric.avg_time = metric.total_time / metric.call_count
if not success:
metric.errors += 1
self.call_history.append({
"function": function_name,
"time": execution_time,
"success": success,
"timestamp": datetime.now().isoformat()
})
def get_report(self) -> str:
"""生成监控报告"""
report = "函数调用统计报告\n"
report += "=" * 50 + "\n"
for name, metric in self.metrics.items():
report += f"\n函数: {name}\n"
report += f" 调用次数: {metric.call_count}\n"
report += f" 平均耗时: {metric.avg_time:.3f}秒\n"
report += f" 错误次数: {metric.errors}\n"
report += f" 成功率: {(1 - metric.errors/metric.call_count)*100:.1f}%\n"
return report
# 使用
monitor = FunctionMonitor()
def execute_with_monitoring(function_name: str, args: dict) -> str:
"""带监控的函数执行"""
start_time = time.time()
success = True
try:
result = available_functions[function_name](**args)
return result
except Exception as e:
success = False
return json.dumps({"error": str(e)})
finally:
execution_time = time.time() - start_time
monitor.record_call(function_name, execution_time, success)
# 定期输出报告
print(monitor.get_report())5.3.9 学习资源
官方文档:
- [Open