背景痛点:传统客服系统为什么总“掉链子”
去年帮一家做跨境电商的兄弟公司改造客服,老系统用的是“关键字+正则”硬匹配,痛点肉眼可见:
- 响应延迟:高峰期平均 RT 800 ms,一旦并发上到 200,直接飙到 2 s,用户开始刷“人工客服”。
- 意图识别不准:商品咨询、物流、退换货混在一起,Top-1 准确率 62%,导致 30% 对话被迫转人工。
- 扩展性差:新增一个“促销政策”意图,要改 5 张正则表,上线周期 3 天,业务同学等不起。
一句话:老架构在流量和语义双重压力下,既慢又笨,还难迭代。
技术选型:Rasa、Dialogflow 还是自研轻量方案?
我把当时能落地的三条路线都跑了一遍,用同一批 2.3 万条真实语料做 benchmark,结论如下:
| 维度 | Rasa 3.x | Dialogflow ES | 自研 BERT+Flask |
|---|---|---|---|
| 平均响应 | 320 ms | 180 ms | 120 ms |
| 训练耗时 | 25 min(GPU) | 云端托管 | 8 min(RTX3060) |
| 年费用 | 0 元(开源) | ¥0.8 万(1 万会话/月) | 0 元 |
| 私有部署 | 支持 | 不支持 | 支持 |
| 二次开发 | 重,Python 技能栈深 | 受限于平台 | 完全可控 |
说明:
- 响应延迟在 4 核 8 G Docker 容器内压测,并发 100,每次只测意图识别阶段。
- 训练成本按“新增 1 个意图、500 条样本”统计。
- 自研方案用 4 层 BERT-base 微调,参数量 110 M,推理阶段开 ONNX+Quantization,单卡 QPS 35,比 Rasa 的 DIET 高 35%。
如果团队对数据隐私、可定制性要求高,且具备中级 Python 能力,自研轻量方案最划算。
核心实现:Flask + BERT 的“最小可用”客服
1. 工程目录
bot/ ├─ app.py # Flask 主程序 ├─ auth.py # JWT 鉴权 ├─ intent/ │ ├─ predict.py # BERT 推理封装 │ └─ train.py # 微调脚本 ├─ cache.py # Redis 对话缓存 └─ tasks.py # Celery 异步任务2. Flask RESTful API(含 JWT)
# app.py from flask import Flask, request, jsonify from auth import jwt_required, create_token from intent.predict import IntentPredictor from cache import CacheManager import time app = Flask(__name__) predictor = IntentPredictor() cache = CacheManager() @app.post("/api/login") def login(): uid = request.json.get("uid") token = create_token(uid) return jsonify(token=token) @app.post("/api/chat") @jwt_required def chat(): uid = request.json["uid"] text = request.json["text"] # 1. 查缓存 cached = cache.get(uid, text) if cached: return jsonify(cached) # 2. 意图识别 intent, prob,耗时 = predictor.predict(text) # 3. 构造回复(这里仅演示静态映射) answer = REPLY_MAP.get(intent, "转人工") data = {"intent": intent, "prob": prob, "reply": answer} # 4. 写缓存 cache.set(uid, text, data, ex=120) return jsonify(data) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, threaded=True)鉴权模块直接用 PyJWT,对称密钥存环境变量,有效期 30 min,代码略。
3. BERT 微调关键代码
训练前先把语料拆成 8:2,标签用“商品咨询/物流查询/退换货/促销政策/其他”五类。
# intent/train.py import pandas as pd, torch, numpy as np from sklearn.model_selection import train_test_split from transformers import BertTokenizerFast, BertForSequenceClassification from torch.utils.data import DataLoader from tqdm import tqdm MAX_LEN = 64 BATCH = 32 LR = 2e-5 EPOCH = 4 def make_loader(texts, labels): enc = tokenizer(texts, truncation=True, padding='max_length', max_length=MAX_LEN) dataset = torch.utils.data.TensorDataset( torch.tensor(enc['input_ids']), torch.tensor(enc['attention_mask']), torch.tensor(labels)) return DataLoader(dataset, batch_size=BATCH, shuffle=True) # 1. 读数据 df = pd.read_csv("intent.csv") x_train, x_test, y_train, y_test = train_test_split( df.text, df.label, test_size=0.2, random_state=42) # 2. 预处理 tokenizer = BertTokenizerFast.from_pretrained("bert-base-chinese") train_loader = make_loader(x_train, y_train) test_loader = make_loader(x_test, y_test) # 3. 模型 model = BertForSequenceClassification.from_pretrained( "bert-base-chinese", num_labels=5).cuda() opt = torch.optim.AdamW(model.parameters(), lr=LR) # 4. 训练 for epoch in range(EPOCH): model.train() for ids, mask, lbs in tqdm(train_loader): opt.zero_grad() out = model(ids.cuda(), mask.cuda(), labels=lbs.cuda()) out.loss.backward() opt.step() # 验证 model.eval() hit = total = 0 with torch.no_grad(): for ids, mask, lbs in test_loader: logits = model(ids.cuda(), mask.cuda()).logits hit += (logits.argmax(1) == lbs.cuda()).sum().item() total += lbs.size(0) print(f"Epoch {epoch}: acc={hit/total:.3f}") # 5. 保存 torch.save(model.state_dict(), "intent/model.pt")时间复杂度:微调阶段一次迭代 O(E×N×L²),E 为 epoch、N 为样本数、L 为序列长度;推理阶段 BERT 的 self-attention 仍是 O(L²),但 MAX_LEN 压到 64 后,单条 30 ms 以内。
性能优化:让 QPS 从 20 涨到 50
1. Redis 对话缓存
# cache.py import redis, json, hashlib class CacheManager: def __init__(self): self.r = redis.Redis(host='localhost', port=6379, db=0) def _key(self, uid, text): return f"chat:{uid}:"+hashlib.md5(text.encode()).hexdigest() def get(self, uid, text): v = self.r.get(self._key(uid, text)) return json.loads(v) if v else None def set(self, uid, text, data, ex=120): self.r.set(self._key(uid, text), json.dumps(data), ex=ex)命中率:上线后观察 24 h,同样问题二次询问占比 42%,缓存直接削掉 40% 的 GPU 推理,QPS 提升 35%。
2. Celery 异步处理长文本
当用户一次性粘贴 500 字订单描述,先做意图,再把实体抽取、情感分析拆成异步任务,避免阻塞主线程。
# tasks.py from celery import Celery cel = Celery('bot', broker='redis://localhost:6379/1') @cel.task def async_nlp_long(text: str): # 耗时实体抽取、情感分析 entities = ner_model.extract(text) sentiment = sentiment_model.predict(text) # 结果写回 MySQL 或通知 webhook save_result(text, entities, sentiment)配置:
# celeryconfig.py task_serializer = 'json' result_backend = 'redis://localhost:6379/2' task_track_started = True worker_prefetch_multiplier = 1 # 公平分发主流程里只要.delay()即可,HTTP 响应 200 ms 内返回,后台慢慢算。
避坑指南:并发、冷启动与状态管理
- 并发请求踩坑:Flask 默认 threaded 模式,BERT 推理用 GPU 时,CUDA context 多线程会崩。解决:在
predict.py里加全局锁,或者把推理服务拆成独立容器,用 gRPC 调,锁-free。 - 对话状态管理:别把 session 存在 Python dict,一旦多进程(gunicorn)就互相看不见。统一用 Redis + TTL。
- 模型冷启动:容器刚拉起来第一次推理要加载 440 MB 参数,RT 飙到 3 s。降级方案:容器启动时预热,随机跑 10 条样本;同时把“模型未就绪”异常 catch 住,先返回静态兜底文案,后台异步加载完再开放流量。
安全考量:输入过滤与敏感词
# filter.py import re, ahocorasick # 1. 敏感词库树 ac = ahocorasick.Automaton() for w in open("sensitive.txt", encoding="utf8"): ac.add_word(w.strip(), w.strip()) ac.make_automaton() def mask_sensitive(text: str) -> str: for end, word in ac.iter(text): text = text.replace(word, "*" * len(word)) return text # 2. SQL 注入/脚本注入 def validate(text: str) -> bool: black = ['--', '/*', '<script', 'javascript:', 'union select'] return not any(b in text.lower() for b in black)在/api/chat入口先调mask_sensitive和validate,失败直接返回 400,不进入模型,节省算力。
延伸思考:把知识图谱拉进来做多轮
目前系统只能“单轮问答”,如果用户问“我昨天买的手机能价保吗”,需要知道“昨天”对应哪笔订单、商品是否属“3C”、价保规则时效。下一步:
- 把订单、商品、政策导入 Neo4j,构建“用户-订单-商品-规则”四元组。
- 在意图识别后加一步“子图查询”,用 Cypher 把实体节点查出来,再喂给生成式模型做最终回复。
- 失败路径:图谱没查到就回退关键词模板,保证鲁棒。
这样能把多轮准确率再抬 15%,同时让客服答案带“订单号”“价保天数”等动态信息,体验更拟人。
整套代码跑在 4 核 8 G 的腾讯云轻量,日活 3 千会话 CPU 占用 35%,内存 2.7 G,GPU 只用一张 2060 就绰绰有余。上线两周,转人工率从 30% 降到 12%,平均响应 180 ms,业务同学终于不再半夜被拉群修 Bug。下一步把图谱链路做完,再给大家分享“多轮+生成”的踩坑记录。