背景痛点:传统客服系统为什么“慢”
去年双十一,我们内部客服平台在零点瞬间涌入 6w+ 咨询,老系统直接“卡死”——页面转圈 8 s 才返回第一句欢迎语。复盘后把问题拆成三类:
- 同步阻塞:Tomcat 线程池打满,一条 SQL 查询慢就把整条调用链拖死。
- 冷启动慢:问答模型放在一个大单体里,每次升级都要重启 3 min,高峰期不敢发版。
- 多轮对话混乱:前端把“上一句”简单塞进 Redis,结果 key 过期或覆盖,用户刚选完“退货原因”,机器人又问“请问要退吗?”——体验翻车。
一句话:性能瓶颈 + 业务混乱,让“智能”客服既不智能,也不客服。
技术选型:RESTful、gRPC、规则引擎、ML 模型怎么挑
先给结论,再讲思考:
| 维度 | RESTful | gRPC | 规则引擎 | 机器学习 |
|---|---|---|---|---|
| 延迟 | 高(HTTP1.1) | 低(HTTP2 + ProtoBuf) | 极低 | 依赖 GPU |
| 流式推送 | 双向流 | |||
| 版本兼容 | 易 | 需 proto 约定 | 易 | 需重训 |
| 可解释性 | 无 | 无 | 高 | 中 |
| 迭代速度 | 快 | 中 | 慢 | 快(MLOps) |
我们最终混搭:
- 网关 ↔ 微服务:gRPC,降低 30% 延迟。
- 简单 FAQ:规则引擎兜底,毫秒级回复。
- 复杂意图:BERT 微调,准确率从 82% → 97%。
核心实现一:Kafka 异步事件解耦
把“用户发消息”当成事件,而不是 API 调用,系统吞吐量瞬间解放。
- 生产者(Flask 网关)
# producer.py from kafka import KafkaProducer import json, uuid, time producer = KafkaProducer( bootstrap_servers='kafka:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'), linger_ms=5, # 小批量合并 compression_type='gzip') def publish_message(user_id, text): event = { "event_id": str(uuid.uuid4()), "user_id": user_id, "text": text, "timestamp": int(time.time()*1000) } # 按 user_id 做分区,保证同一用户顺序消费 producer.send('cs-inbound', value=event, partition=hash(user_id)%12)- 消费者(意图识别微服务)
# consumer.py from kafka import KafkaConsumer import asyncio, json consumer = KafkaConsumer( 'cs-inbound', bootstrap_servers='kafka:9092', group_id='nlp-group', enable_auto_commit=False, # 手工提交,防崩溃重复 value_deserializer=lambda m: json.loads(m.decode('utf-8'))) async def handle_event(msg): # 调用 BERT 服务拿到意图 intent = await predict_intent(msg.value['text']) # 把结果写回 Kafka,供下游对话管理器消费 producer.send('cs-intent', value={ "event_id": msg.value['event_id'], "intent": intent }) consumer.commit(msg) # 处理完再提交 offset for message in consumer: asyncio.run(handle_event(message))上线后,同样 8 核机器,QPS 从 1.2k 提到 4.8k,CPU 利用率反而降到 55%,因为线程不再空等 IO。
核心实现二:BERT 微调让意图识别“听得懂人话”
开源通用模型在客服领域常“水土不服”,我们把历史 40 万条对话标注清洗,再做领域微调。
- 数据预处理
# prepare_data.py import pandas as pd, re, json, random, sklearn.utils def clean(txt): txt = re.sub(r'\d+', 'NUM', txt) # 数字脱敏 txt = re.sub(r'\s+', ' ', txt) return txt.strip() df = pd.read_csv('raw_chat.csv') df['text'] = df['text'].apply(clean) # 分层采样,保证每类意图均衡 df = sklearn.utils.resample(df, stratify=df['intent'], n_samples=50000) with open('train.json', 'w', encoding='utf8') as f: for _, row in df.iterrows(): f.write(json.dumps({"label": row.intent, "text": row.text}, ensure_ascii=False)+'\n')- 微调脚本(基于 Transformers)
# train_intent.py from transformers import BertTokenizerFast, BertForSequenceClassification, Trainer, TrainingArguments import datasets, evaluate tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese') def encode(examples): return tokenizer(examples['text'], truncation=True, padding='max_length', max_length=64) dataset = datasets.load_dataset('json', data_files='train.json')['train'] dataset = dataset.train_test_split(test_size=0.1).map(encode, batched=True) model = BertForSequenceClassification.from_pretrained('bert-base-chinese', num_labels=36) args = TrainingArguments( output_dir='intent_model', per_device_train_batch_size=64, learning_rate=2e-5, num_train_epochs=3, evaluation_strategy='epoch', load_best_model_at_end=True, metric_for_best_model='accuracy') def compute_metrics(eval_pred): logits, labels = eval_pred preds = logits.argmax(-1) return evaluate.load('accuracy').compute(predictions=preds, references=labels) trainer = Trainer(model=model, args=args, train_dataset=dataset['train'], eval_dataset=dataset['test'], compute_metrics=compute_metrics) trainer.train()训练 1.5 小时,验证集准确率提升 15%,再把模型转成 ONNX,CPU 推理 20 ms 内完成。
核心实现三:对话状态机(DSM)让多轮不“断片”
多轮对话=状态机,不是“if/else 地狱”。我们定义:
- State:节点,如“ greet / collect_order / confirm_addr / done ”
- Event:意图 + 实体
- Transition:条件 → 下一状态 + 回复模板
# dsm.py from transitions import Machine import redis, json r = redis.Redis(host='redis', decode_responses=True) class ChatContext: states = ['greet', 'collect_order', 'confirm_addr', 'done'] transitions = [ {'trigger': 'hello', 'source': 'greet', 'dest': 'collect_order'}, {'trigger': 'provide_order', 'source': 'collect_order', 'dest': 'confirm_addr', 'conditions': 'has_order'}, {'trigger': 'confirm', 'source': 'confirm_addr', 'dest': 'done'}, {'trigger': 'deny', 'source': 'confirm_addr', 'dest': 'collect_order'} ] def __init__(self, user_id): self.user_id = user_id self.order_id = None self.machine = Machine(model=self, states=ChatContext.states, initial=r.get(f'dsm:{user_id}:state') or 'greet', transitions=ChatContext.transitions, after_state_change='save_state') def save_state(self): r.hset(f'dsm:{user_id}', mapping={'state': self.state, 'order_id': self.order_id or ''}) r.expire(f'dsm:{user_id}', 600) # 10 min 过期 def has_order(self, event): # 条件函数 return bool(event.kwargs.get('order_id'))前端每轮只负责把“用户原文”送进来,网关返回“当前状态+机器人回复”,再也不担心上下文被覆盖。
性能测试:JMeter 压测对比
环境:4C8G × 3 节点,Kafka 18 分区,模型 CPU 推理。
| 指标 | 老系统(同步) | 新系统(异步+ONNX) |
|---|---|---|
| 平均 QPS | 1,200 | 4,850 |
| P99 延迟 | 2,300 ms | 280 ms |
| CPU 峰值 | 95% | 55% |
| 错误率 | 2.5% | 0.1% |
避坑指南:三个线上血泪教训
Redis 键设计
推荐biz:dsm:{user_id}+biz:msg:{user_id}:{event_id},别用chat:*这种模糊前缀,会被keys扫到阻塞。模型灰度发布
采用“影子流量”策略:复制 10% 消息到新版模型,对比结果写日志,指标持平后再全量。否则一次 bad case 可能让投诉翻倍。异常流量熔断
用 Sentinel 网关,针对“同一 IP 5 min 内>1k 次调用”直接返回 429,并把 IP 写进 Redis 黑名单 30 min,防止恶意刷接口拖垮 Kafka。
思考题:如何设计支持方言识别的客服系统?
BERT 默认语料以普通话为主,遇到粤语、川话就“抓瞎”。如果让你来落地,你会:
- 收集方言语音→文本,还是直接用拼音标注?
- 在模型层是“单独微调一个方言模型”还是“多任务共享底层”?
- 线上如何动态识别用户语种并路由到对应模型?
欢迎留言聊聊你的思路,一起把客服做成“老乡见老乡”。
整套方案上线三个月,平均响应时间降到原来的 1/8,客服同学终于能在双十一零点安心吃泡面。代码和脚本都已开源到团队仓库,有需求自取,记得顺手 Star。祝大家的客服系统都能“秒回”,不再背锅。