8.1 AI 聊天应用(智能客服系统)
从零构建一个生产级 AI 对话系统:意图识别 → RAG 知识库 → 工具调用 → 多轮对话 → 流式输出。
难度:⭐⭐⭐ | 预计时长:2-3 周
智能客服是 LLM 落地最成熟的场景之一,核心挑战是:准确理解用户意图、从知识库精确检索答案、维护多轮对话上下文、识别何时转人工。本节构建一个完整的电商客服系统作为参考实现。
系统架构
用户输入
↓
[意图分类器]──────────────────────────────────────┐
↓ FAQ/产品咨询 ↓ 订单查询/退款 ↓ 投诉/转人工
[RAG 知识检索] [工具调用/API] [人工坐席路由]
↓ ↓ ↓
[答案生成 + 引用] [结构化数据回复] [会话移交]
↓
[多轮对话历史管理]
↓
[满意度评分收集]8.1.1 意图识别
python
# pip install openai pydantic
from openai import OpenAI
from pydantic import BaseModel
from enum import Enum
from typing import Optional
client = OpenAI()
class Intent(str, Enum):
FAQ = "faq" # 常见问题咨询
PRODUCT_QUERY = "product" # 产品/价格查询
ORDER_QUERY = "order" # 订单状态查询
RETURN_REFUND = "return" # 退换货申请
COMPLAINT = "complaint" # 投诉建议
HUMAN_AGENT = "human" # 转人工
GREETING = "greeting" # 打招呼/闲聊
OUT_OF_SCOPE = "out_of_scope" # 超出服务范围
class IntentResult(BaseModel):
intent: Intent
confidence: float # 0.0 - 1.0
entities: dict # 提取的实体(订单号、商品名等)
summary: str # 一句话总结用户诉求
INTENT_SYSTEM_PROMPT = """你是电商平台的意图识别系统。分析用户输入,返回JSON:
{
"intent": "faq|product|order|return|complaint|human|greeting|out_of_scope",
"confidence": 0.0-1.0,
"entities": {
"order_id": "订单号(如有)",
"product_name": "商品名(如有)",
"issue_type": "问题类型(如有)"
},
"summary": "一句话描述用户诉求"
}
意图判断规则:
- faq: 物流时间、退货政策、支付方式等通用问题
- product: 询问具体商品规格、价格、库存
- order: 查询订单进度、物流状态
- return: 申请退货、退款、换货
- complaint: 表达不满、投诉服务或商品质量
- human: 明确要求转人工、情绪激动
- greeting: 你好、谢谢等礼貌性话语"""
def classify_intent(user_message: str, conversation_context: str = "") -> IntentResult:
"""意图分类(含上下文感知)"""
import json
context_note = f"\n\n历史对话摘要:{conversation_context}" if conversation_context else ""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": INTENT_SYSTEM_PROMPT},
{"role": "user", "content": f"用户消息:{user_message}{context_note}"}
],
response_format={"type": "json_object"},
temperature=0,
max_tokens=200
)
data = json.loads(response.choices[0].message.content)
return IntentResult(**data)
# 使用示例
result = classify_intent("我的订单20240315001还没到,现在是什么状态?")
print(f"意图: {result.intent}, 置信度: {result.confidence}")
print(f"实体: {result.entities}") # {"order_id": "20240315001"}8.1.2 知识库管理(RAG)
python
# pip install chromadb openai
import chromadb
from openai import OpenAI
from pathlib import Path
import json
client = OpenAI()
chroma_client = chromadb.PersistentClient(path="./customer_service_db")
collection = chroma_client.get_or_create_collection(
name="faq_knowledge",
metadata={"hnsw:space": "cosine"}
)
# 知识库文档结构
FAQ_DOCS = [
{
"id": "faq_001",
"category": "物流",
"question": "订单多久可以发货?",
"answer": "工作日下午3点前下单,当天发货;3点后下单,次日发货。节假日可能延迟1-2天。",
"tags": ["发货", "物流", "时效"]
},
{
"id": "faq_002",
"category": "退换货",
"question": "如何申请退货退款?",
"answer": "收到商品7天内可申请无理由退货(不影响二次销售)。在订单详情页点击"申请退款",填写退货原因并提交。退款将在收到退货后3-5个工作日内原路退回。",
"tags": ["退货", "退款", "7天无理由"]
},
{
"id": "faq_003",
"category": "支付",
"question": "支持哪些支付方式?",
"answer": "支持微信支付、支付宝、银行卡(借记卡/信用卡)、花呗分期。企业客户可开通账期付款。",
"tags": ["支付", "微信", "支付宝", "信用卡"]
},
# ... 更多文档
]
def embed_text(text: str) -> list[float]:
"""文本向量化"""
resp = client.embeddings.create(model="text-embedding-3-small", input=text)
return resp.data[0].embedding
def build_knowledge_base(docs: list[dict]):
"""构建知识库向量索引"""
ids, embeddings, documents, metadatas = [], [], [], []
for doc in docs:
# 将问题+答案合并后向量化(提升检索召回率)
text = f"问题:{doc['question']}\n答案:{doc['answer']}"
ids.append(doc["id"])
embeddings.append(embed_text(text))
documents.append(text)
metadatas.append({
"category": doc["category"],
"question": doc["question"],
"answer": doc["answer"],
"tags": ",".join(doc.get("tags", []))
})
collection.upsert(ids=ids, embeddings=embeddings,
documents=documents, metadatas=metadatas)
print(f"知识库构建完成,共 {len(docs)} 条文档")
def retrieve_knowledge(query: str, top_k: int = 3,
category_filter: str = None) -> list[dict]:
"""语义检索知识库"""
where = {"category": category_filter} if category_filter else None
results = collection.query(
query_embeddings=[embed_text(query)],
n_results=top_k,
where=where,
include=["metadatas", "distances"]
)
docs = []
for meta, dist in zip(results["metadatas"][0], results["distances"][0]):
similarity = 1 - dist # 余弦距离转相似度
if similarity > 0.5: # 相似度阈值过滤
docs.append({
"question": meta["question"],
"answer": meta["answer"],
"category": meta["category"],
"similarity": round(similarity, 3)
})
return docs
# 构建知识库
build_knowledge_base(FAQ_DOCS)8.1.3 工具调用(订单/账户 API)
python
from openai import OpenAI
import json
from typing import Any
client = OpenAI()
# ---- 模拟业务 API ----
def query_order_status(order_id: str) -> dict:
"""查询订单状态(实际对接订单系统)"""
mock_orders = {
"20240315001": {
"status": "已发货", "carrier": "顺丰速运",
"tracking_no": "SF1234567890",
"estimated_delivery": "2024-03-17",
"items": [{"name": "iPhone 15 Pro", "qty": 1, "price": 8999}]
},
"20240316002": {
"status": "待支付", "amount": 299.0,
"expire_time": "2024-03-16 18:00:00"
}
}
return mock_orders.get(order_id, {"error": f"未找到订单 {order_id}"})
def submit_return_request(order_id: str, reason: str, items: list[str]) -> dict:
"""提交退货申请"""
return {
"return_id": f"RET{order_id}001",
"status": "申请已提交",
"message": "退货申请已提交,请在3日内将商品寄回,运费由我们承担。",
"return_address": "上海市浦东新区XX路XX号 退货仓 收",
"deadline": "2024-03-23"
}
def query_user_points(user_id: str) -> dict:
"""查询用户积分"""
return {"points": 1580, "level": "黄金会员", "next_level_needed": 420}
# ---- 工具定义(OpenAI Function Calling 格式)----
CUSTOMER_SERVICE_TOOLS = [
{
"type": "function",
"function": {
"name": "query_order_status",
"description": "查询订单的当前状态、物流信息和商品详情",
"parameters": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "订单号,如 20240315001"}
},
"required": ["order_id"]
}
}
},
{
"type": "function",
"function": {
"name": "submit_return_request",
"description": "为用户提交退货退款申请",
"parameters": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "需要退货的订单号"},
"reason": {"type": "string", "description": "退货原因"},
"items": {
"type": "array",
"items": {"type": "string"},
"description": "需要退货的商品名称列表"
}
},
"required": ["order_id", "reason", "items"]
}
}
},
{
"type": "function",
"function": {
"name": "query_user_points",
"description": "查询用户的积分余额和会员等级",
"parameters": {
"type": "object",
"properties": {
"user_id": {"type": "string", "description": "用户ID"}
},
"required": ["user_id"]
}
}
}
]
TOOL_MAP = {
"query_order_status": query_order_status,
"submit_return_request": submit_return_request,
"query_user_points": query_user_points
}
def execute_tool(tool_name: str, arguments: dict) -> Any:
"""执行工具调用"""
tool_func = TOOL_MAP.get(tool_name)
if not tool_func:
return {"error": f"未知工具:{tool_name}"}
return tool_func(**arguments)8.1.4 多轮对话管理
python
import redis
import json
import time
from dataclasses import dataclass, field, asdict
from typing import Optional
redis_client = redis.Redis(host="localhost", port=6379, db=6, decode_responses=True)
@dataclass
class CustomerSession:
session_id: str
user_id: str
channel: str # web / app / wechat
conversation: list = field(default_factory=list)
intent_history: list = field(default_factory=list)
context: dict = field(default_factory=dict) # 上下文槽位(订单号等)
status: str = "active" # active / transferred / closed
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
satisfaction_score: Optional[int] = None
@property
def session_key(self) -> str:
return f"cs:session:{self.session_id}"
def save(self, ttl: int = 3600):
self.updated_at = time.time()
redis_client.setex(
self.session_key, ttl,
json.dumps(asdict(self), ensure_ascii=False)
)
@classmethod
def load(cls, session_id: str) -> Optional["CustomerSession"]:
raw = redis_client.get(f"cs:session:{session_id}")
if not raw:
return None
data = json.loads(raw)
return cls(**data)
def add_message(self, role: str, content: str, metadata: dict = None):
self.conversation.append({
"role": role,
"content": content,
"timestamp": time.time(),
**(metadata or {})
})
# 控制对话历史长度(保留最近 20 条)
if len(self.conversation) > 20:
self.conversation = self.conversation[-20:]
self.save()
def update_context(self, **kwargs):
"""更新会话上下文槽位(如从对话中提取的订单号)"""
self.context.update(kwargs)
self.save()
def should_transfer_to_human(self) -> tuple[bool, str]:
"""判断是否需要转人工"""
# 规则1:用户明确要求转人工
recent_messages = self.conversation[-3:]
for msg in recent_messages:
if msg["role"] == "user":
if any(kw in msg["content"] for kw in
["转人工", "人工客服", "真人", "投诉", "manager"]):
return True, "用户主动请求转人工"
# 规则2:同一意图重复超过 2 次(未解决)
recent_intents = self.intent_history[-4:]
if len(recent_intents) >= 3:
intent_counts = {}
for i in recent_intents:
intent_counts[i] = intent_counts.get(i, 0) + 1
if max(intent_counts.values()) >= 3:
return True, "问题未能解决,循环重复"
# 规则3:连续 2 次 AI 未能回答(回复了兜底话术)
fallback_count = sum(
1 for msg in self.conversation[-4:]
if msg["role"] == "assistant" and
any(kw in msg["content"] for kw in ["很抱歉", "无法帮助", "请联系"])
)
if fallback_count >= 2:
return True, "AI 多次未能解决问题"
return False, ""8.1.5 核心对话引擎
python
CUSTOMER_SERVICE_SYSTEM_PROMPT = """你是「优购商城」的AI客服助手小优,专业、友好、高效。
## 服务能力
- 解答常见问题(物流、退换货、支付、会员)
- 查询订单状态和物流信息
- 协助处理退货退款申请
- 查询用户积分和会员权益
## 回复规范
1. 称呼用户为"您",保持礼貌专业
2. 回复简洁直接,不超过150字
3. 涉及具体数据(订单、物流),必须调用工具获取真实数据
4. 无法处理的问题,主动提示转人工
5. 每次回复末尾可适当引导下一步操作
## 兜底话术
当无法确认答案时:
"非常抱歉,这个问题我需要进一步确认。您可以:
①继续描述问题,我来帮您查询
②输入"转人工"由专业客服为您服务
③拨打客服热线 400-XXX-XXXX"
"""
def run_customer_service_turn(
session: CustomerSession,
user_message: str,
retrieved_docs: list[dict]
) -> str:
"""执行单轮客服对话(含工具调用循环)"""
# 构建 RAG 上下文
rag_context = ""
if retrieved_docs:
rag_context = "\n\n## 相关知识库内容\n" + "\n".join([
f"Q: {doc['question']}\nA: {doc['answer']}"
for doc in retrieved_docs[:2]
])
# 构建消息列表
system_content = CUSTOMER_SERVICE_SYSTEM_PROMPT + rag_context
if session.context:
system_content += f"\n\n## 当前会话上下文\n{json.dumps(session.context, ensure_ascii=False)}"
messages = [{"role": "system", "content": system_content}]
# 加入对话历史(不含时间戳等元数据)
for msg in session.conversation[-8:]:
messages.append({"role": msg["role"], "content": msg["content"]})
messages.append({"role": "user", "content": user_message})
# 工具调用循环(最多 3 轮,防止死循环)
max_tool_rounds = 3
for _ in range(max_tool_rounds):
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=CUSTOMER_SERVICE_TOOLS,
tool_choice="auto",
max_tokens=400
)
msg = response.choices[0].message
# 无工具调用,直接返回文本
if not msg.tool_calls:
return msg.content
# 执行工具调用
messages.append(msg)
for tool_call in msg.tool_calls:
func_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
# 自动填充 user_id(避免用户伪造)
if "user_id" in arguments:
arguments["user_id"] = session.user_id
tool_result = execute_tool(func_name, arguments)
# 提取实体到会话上下文
if func_name == "query_order_status" and "order_id" in arguments:
session.update_context(order_id=arguments["order_id"])
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(tool_result, ensure_ascii=False)
})
# 超过最大轮数,让模型基于工具结果生成最终回复
final = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
max_tokens=400
)
return final.choices[0].message.content8.1.6 完整服务入口
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uuid
app = FastAPI(title="智能客服系统")
class ChatRequest(BaseModel):
session_id: str = ""
user_id: str
message: str
channel: str = "web"
class ChatResponse(BaseModel):
session_id: str
reply: str
intent: str
transferred: bool = False
transfer_reason: str = ""
need_satisfaction_survey: bool = False
@app.post("/api/customer-service/chat", response_model=ChatResponse)
async def customer_service_chat(request: ChatRequest):
# 加载或创建会话
session_id = request.session_id or str(uuid.uuid4())
session = CustomerSession.load(session_id) or CustomerSession(
session_id=session_id,
user_id=request.user_id,
channel=request.channel
)
if session.status != "active":
raise HTTPException(status_code=400, detail="会话已关闭")
# 记录用户消息
session.add_message("user", request.message)
# 意图识别
intent_result = classify_intent(
request.message,
conversation_context=session.context.get("summary", "")
)
session.intent_history.append(intent_result.intent)
# 从意图提取实体到上下文
if intent_result.entities.get("order_id"):
session.update_context(order_id=intent_result.entities["order_id"])
# 检查是否需要转人工
should_transfer, transfer_reason = session.should_transfer_to_human()
if should_transfer or intent_result.intent == Intent.HUMAN_AGENT:
session.status = "transferred"
session.save()
return ChatResponse(
session_id=session_id,
reply="正在为您转接人工客服,请稍候...预计等待时间 2 分钟。",
intent=intent_result.intent,
transferred=True,
transfer_reason=transfer_reason or "用户请求"
)
# 打招呼直接回复,不走 RAG
if intent_result.intent == Intent.GREETING:
reply = "您好!我是优购商城客服小优,很高兴为您服务。请问有什么可以帮助您的?"
session.add_message("assistant", reply)
return ChatResponse(session_id=session_id, reply=reply, intent=intent_result.intent)
# 知识库检索(FAQ 和产品咨询类走 RAG)
retrieved_docs = []
if intent_result.intent in (Intent.FAQ, Intent.PRODUCT_QUERY):
category_map = {Intent.FAQ: None, Intent.PRODUCT_QUERY: "产品"}
retrieved_docs = retrieve_knowledge(
request.message, top_k=3,
category_filter=category_map.get(intent_result.intent)
)
# 生成回复
reply = run_customer_service_turn(session, request.message, retrieved_docs)
session.add_message("assistant", reply, {"intent": intent_result.intent})
# 对话满意度调查触发(每 5 轮或对话结束时)
need_survey = len(session.conversation) % 10 == 0
return ChatResponse(
session_id=session_id,
reply=reply,
intent=intent_result.intent,
need_satisfaction_survey=need_survey
)
@app.post("/api/customer-service/feedback")
async def submit_feedback(session_id: str, score: int, comment: str = ""):
"""用户满意度反馈(1-5分)"""
session = CustomerSession.load(session_id)
if not session:
raise HTTPException(status_code=404, detail="会话不存在")
session.satisfaction_score = score
session.save()
return {"status": "ok", "message": "感谢您的反馈!"}系统关键指标
| 指标 | 目标值 | 优化方向 |
|---|---|---|
| 意图识别准确率 | ≥ 95% | 扩充训练样本、分类模型微调 |
| 知识库检索准确率 | ≥ 90% | 优化分块策略、混合检索 |
| 首轮解决率 | ≥ 70% | 丰富知识库、完善工具接入 |
| 平均响应延迟 | ≤ 2s | 流式输出、缓存、模型降级 |
| 转人工率 | ≤ 20% | 扩充知识库覆盖范围 |
| 用户满意度 | ≥ 4.2/5 | 优化回复风格、减少无效兜底 |