背景痛点:外部群里的“三座大山”
做 toB 客服的同学都懂,企业微信(WeCom Work 3.x 版本)把外部群聊消息收拢到「客户联系」事件后,回调地址瞬间成了流量黑洞。官方文档写得轻描淡写,真上生产却踩坑不断:
- 消息乱序:同一用户连发 3 条,时间戳居然逆序到达,客服机器人答非所问。
- 重复推送:官方重试 3 次起步,网络抖动时 5~7 次也常见,不做幂等就会“亲,您的问题已解决”刷屏。
- 高峰期阻塞:上午 10 点促销,回调 QPS 从 200 飙到 2 k,同步 Flask 直接 502,老板在群里疯狂艾特。
一句话:HTTP 回调 + 同步处理撑不住,得把“收”和“算”拆开。
技术对比:同步阻塞 vs 异步 IO + 消息队列
先用最土的方案做基线:Flask + sync view,一条消息落库再调 NLP 接口,平均 RT 600 ms。Wireshark 抓包看 TCP 流,发现服务器 ACK 延迟一路飘红,高峰期重传率 4.7%。
再上异步方案:Python 3.11 + asyncio + aiohttp,收到回调直接塞 RabbitMQ,worker 协程池消费。同样 8C16G 机器,QPS 从 420 提到 1680,CPU idle 还多出 25%。下图是同窗口 30 s 的对比,绿线是异步队列方案,几乎没有 5xx。
结论:IO 等待占大头时,把 CPU 让给其他协程,比盲目加机器划算得多。
核心实现:三板斧落地
1. asyncio 消息分发器
# dispatcher.py import asyncio, aiohttp, json, os from typing import Dict, Any from aio_pika import connect_robust, Message, DeliveryMode QUEUE_NAME = "external_chat" ROUTING_KEY = "msg.new" async def publish_to_mq(body: bytes) -> None: connection = await connect_robust(os.getenv("AMQP_URI")) async with connection: channel = await connection.channel() queue = await channel.declare_queue(QUEUE_NAME, durable=True) msg = Message(body, delivery_mode=DeliveryMode.PERSISTENT) await channel.default_exchange.publish(msg, routing_key=ROUTING_KEY) async def handle_callback(request: aiohttp.web.Request) -> aiohttp.web.Response: try: body = await request.read() await publish_to_mq(body) # 不落库,直接扔队列 return aiohttp.web.Response(text="success") except Exception as e: logger.exception("callback error") return aiohttp.web.Response(text="fail", status=500)2. RabbitMQ 削峰 + 死信队列
高峰期先把消息堆在队列里,worker 匀速消费;万一 NLP 下游超时,把消息踢到死信队列(DLX)人工复查。
# rabbitmq-dlx.yaml queues: - name: external_chat durable: true arguments: x-dead-letter-exchange: "dlx" x-dead-letter-routing-key: "failed" x-max-length: 100000 # 防内存爆炸3. 带 JWT 验证的回调接口
企业微信每次回调带msg_signature+timestamp+nonce,但咱们内部微服务统一用 JWT,干脆在入口网关做一层转换,代码里再验企业微信签名,双保险更安全。
# auth.py import hmac, hashlib, time from typing import Tuple def verify_wx_signature(token: str, signature: str, timestamp: str, nonce: str, echostr: str) -> bool: tmp = [token, timestamp, nonce, echostr] tmp.sort() sign = hashlib.sha1("".join(tmp).encode()).hexdigest() return hmac.compare_digest(sign, signature) def retry_on_error(func, max_times: int = 3): async def wrapper(*args, **kwargs): for i in range(1, max_times + 1): try: return await func(*args, **kwargs) except Exception as e: if i == max_times: raise await asyncio.sleep(0.1 * i) return None return wrapper性能优化:让 2 k QPS 稳如老狗
1. 协程池大小推导
经验公式(IO 密集型):
N = min(32, CPU_COUNT * 5 + 2)16 核机器 → 82 并发协程,再乘以 3 个副本,轻松吃下 2 k QPS。Locust 压测报告如下:
平均 RT 38 ms,P99 110 ms,比同步方案缩短 5 倍。
2. 内存泄漏检测
asyncio 任务如果忘了await或异常吞掉,会默默挂起。用asyncio.all_tasks()定时打印:
import asyncio, gc, objgraph async def monitor(): while True: await asyncio.sleep(60) tasks = [t for t in asyncio.all_tasks() if not t.done()] logger.info("pending tasks=%s", len(tasks)) if len(tasks) > 1000: objgraph.show_growth(limit=10)发现aiohttp.ClientSession未关闭,及时加await session.close()后内存平稳。
避坑指南:生产级细节
1. 消息去重 5 种方案
- 内存 set:重启即丢,开发调试用。
- Redis setex:10 分钟滑动窗口,重启不丢,占内存小。
- MySQL 唯一索引:最稳,但 RT 高。
- 业务层幂等:用
msgid+fromuser做 UK,推荐。 - 雪花 ID + 缓存:需要发号器,复杂场景再用。
结论:外部群对顺序不敏感,直接msgid做 UK 最香。
2. Rate Limiting 实现
用令牌桶算法,单进程 500 token/s,超量直接回 429,企业微信会 1 min 后重试,不丢消息。
# limiter.py import asyncio, time from typing import Optional class TokenBucket: def __init__(self, rate: int, burst: int): self._rate = rate self._burst = burst self._tokens = burst self._last = time.monotonic() self._lock = asyncio.Lock() async def acquire(self, block: bool = True) -> bool: async with self._lock: now = time.monotonic() delta = now - self._last self._tokens = min(self._burst, self._tokens + delta * self._rate) self._last = now if self._tokens >= 1: self._tokens -= 1 return True if not block: return False await asyncio.sleep(1 / self._rate) return await self.acquire(block)代码规范小结
- 统一 black < 88 字符,black + isort 进 CI。
- 所有公开函数写类型注解,返回值不省略。
- 异常必须 raise 到日志,禁止
except: pass。 - 单元测试覆盖率 85% 以上,mock 掉微信服务器。
延伸思考:WebSocket 长连接能否替代 HTTP 回调?
HTTP 回调简单、防火墙友好,但 TLS 握手+重试导致额外 30% 流量。WebSocket 长连接可双向推送,理论上减少 1/3 延迟;可惜企业微信目前只给服务商开放“客服账号”长连,外部群仍走回调。自研方案可让网关层维护 WebSocket,把回调转成内部事件,再推给客服坐席浏览器,实现“运营后台实时看群聊”。下一版准备用 go-zero 做网关,压测 5 w 长连,届时再发笔记。
踩完坑回头看,异步 IO + 消息队列就是外部群客服的“最佳解”:代码量不大,水平扩展却爽。把收、发、算三层拆开,再留好监控 + 限流 + 死信,老板再也不用担心促销日客服宕机。祝你 2 k QPS 也稳如狗,早日下班。