背景痛点:传统轮询为何撑不住高并发
小程序客服场景里,用户消息像“秒杀”一样瞬间涌入。老做法是在网关层做轮询:每 200 ms 扫一次数据库,看有没有新消息。
这种做法在并发量 < 500 QPS 时还能跑,一旦冲到 2000 QPS,CPU 空转、连接数暴涨,MySQL 的SELECT ... FOR UPDATE直接把行锁打成表锁,RT 99 从 120 ms 飙升到 1.8 s。
人工规则系统也好不到哪去:
- 关键词用
if-elif堆成“屎山”,新增一条规则要回归测试 2 天 - 上下文靠
session["step"]硬编码,用户中途换问题就“断片”
维护成本随业务线性上升,而响应质量却随规则数量指数下降。
架构对比:智能体 vs 传统中间件
| 指标 | 传统客服中间件 | 智能体架构(本文方案) |
|---|---|---|
| 峰值 QPS | 2 k | 12 k |
| 平均 RT | 600 ms | 45 ms |
| 水平扩展 | 加机器+改配置 | 加 Pod,自动注册队列 |
| 上下文保持 | Session Sticky | 状态机+Redis,无粘性 |
| 故障恢复 | 人工重启 | 状态机快照,30 s 内自愈 |
压测环境:4C8G × 10 节点,K6 模拟 5 万并发,消息大小 0.5 KB。
结论:智能体把“无状态”做到极致,任何节点宕机都不丢对话。
核心实现:三条代码路径搞定
1. 消息分级队列(RabbitMQ)
把用户消息按“优先级”拆成三级队列:
chat.high:VIP 用户chat.normal:普通用户chat.dlq:死信,人工兜底
channel 绑定与 QoS 配置如下,保证 prefetch 不爆内存:
import pika def open_channel(): params = pika.URLParameters("amqp://guest:guest@rmq:5672") conn = pika.BlockingConnection(params) ch = conn.channel() ch.basic_qos(prefetch_count=200, global_qos=False) # 限流 ch.queue_declare(queue="chat.high", durable=True, arguments={"x-max-priority": 10}) return ch2. 对话状态机(Redis + TTL)
状态机 = 会话 ID → 状态 JSON,Redis Hash 存储,TTL 900 s 自动清脏数据。
import redis, json, time r = redis.Redis(host="rds", max_connections=50) # 连接池 def transit(sid, state, data=None): key = f"sm:{sid}" payload = {"state": state, "data": data or {}, "ts": time.time()} r.hset(key, mapping=payload) r.expire(key, 900)异常恢复:每次状态写入都带ts,重启时若now - ts > 600则触发“超时重入”节点,引导用户重新描述问题。
3. 轻量 NLP 引擎(Rasa NLU)
Dockerfile 只保留spacy+sklearn组件,镜像 210 MB,启动 3 s。
FROM rasa/rasa:3.6.2-spacy-en COPY config.yml models/ /app/ CMD ["rasa", "run", "--enable-api", "-p", "5005"]调用示例(带意图置信度过滤):
import requests def parse(text): resp = requests.post("http://nlu:5005/model/parse", json={"text": text}) data = resp.json() if data["intent"]["confidence"] < 0.35: return None return data["intent"]["name"]性能优化:把 45 ms 再砍一半
连接池调优
- MySQL:
max_connections = 1000,应用侧SQLALCHEMY_POOL_SIZE = 25,POOL_MAX_OVERFLOW = 50 - Redis:
max_connections = 50,socket_keepalive = True,retry_on_timeout = True
压测显示,池子太小会出现“连接等待”二次 RTT,调到上述值后 P99 延迟再降 18 ms。
敏感词过滤:AC 自动机
暴力in匹配 1 万关键词,单次耗时 7 ms;换 AC 自动机(pyahocorasick)后 0.18 ms,CPU 降 30%。
import ahocorasick A = ahocorasick.Automaton() for idx, key in enumerate(keywords): A.add_word(key, (idx, key)) A.make_automaton() def filter(text): for end, (idx, key) in A.iter(text): text = text.replace(key, "*"*len(key)) return text时间复杂度:构建 O(∑len(key)),匹配 O(len(text)),内存占用 20 MB 以内。
避坑指南:上线前必读
微信消息格式校验
微信会偷偷把<script>大小写混写,正则必须加re.IGNORECASE,否则直接 400。会话漂移
多 Pod 场景下,用户可能前一秒命中 Pod A,后一秒到 Pod B。状态机以 Redis 为准,禁止本地缓存;同时开启 Redis Stream 的consumer-group,保证同一sid永远路由到同一 consumer。冷启动预热
新镜像刚拉起来,Rasa 模型第一次“懒加载”首包 2 s。解决:- 启动脚本里先发一条“假句子””做 warmup
- K8s 配
readinessProbe调initialDelaySeconds: 15,防止流量提前切入
代码规范与注释
- 统一 Black 格式化,行宽 88
- 关键算法写复杂度,例如:
def sm_match(states, pattern): """ 使用字典树匹配状态,时间复杂度 OO(len(pattern)) """让后人改代码时一眼看出瓶颈。
延伸思考:智能体的会话边界到底在哪?
智能体再智能,也怕用户“东拉西扯”。目前靠“意图置信度 < 阈值”触发澄清,但边界仍模糊。
可以探索的方向:
- 引入对话行为识别(Dialogue Act Detection),把“提问/肯定/否定”做细粒度标签
- 用 BERT 做句子连贯性打分,低于 0.5 自动切新会话
- 让业务运营给“边界”打标,强化学习在线调策略,实现“越用越准”
把消息队列、状态机、NLP 三个齿轮咬合好,小程序客服就能从“能用”变“好用”。
整套代码已放在 GitHub,镜像直接docker-compose up,五分钟就能跑通。
如果你也踩过“轮询”坑,或者有更巧妙的边界检测思路,欢迎留言一起拆招。