背景
监控系统触发MQ积压告警,核心业务消息队列积压量在15分钟内从50万激增至1000万条,导致订单支付、物流推送等核心业务链路出现严重延迟。从MQ 监控看,发现上游系统异常推送800万条营销消息 。
直接诱因:夜间批量促销活动未做流量评估,上游系统异常推送800万条营销消息
架构缺陷:消费者采用单线程串行消费模式,监控阈值设置过高(500万才触发告警)
方案
方案分两个部分:
紧急的 止血救命包 (临时方案)
长期的 架构治根 ( 长期方案)
紧急止血 + 架构治根 ,一共是5个步骤
第1步: 定位原因:
第2步: 紧急止血包-极速扩容:
临时扩容Consumer实例和开启批量消费。
第3步: 弃卒保帅-非核心业务降级:
暂停非关键业务的消费者和降低非关键业务消费者线程数。
第4步: 并行爆破-上重武器:
消息转储 和 消费者更大规模扩容。
第5步:架构治根。 长治久安-上牛逼的架构方案:
高吞吐架构升级和高并发压的应急预案。
第1步:定位原因
生产侧问题(较少见,10%)
Broker侧问题(较少见,10%)
消费侧问题(最可能,80%)
第2步:紧急止血包(临时消费者扩容)
临时扩容Consumer实例
开启批量消费。
第3步. 弃卒保帅:消费者降级
暂停非关键业务的消费者
降低非关键业务消费者线程数。
第4步. 并行爆破:上重武器
- 消息转储 和 消费者更大规模扩容。
第5步. 架构治根:架构升级, 长治久安
高吞吐架构升级
高并发压的应急预案。
详细方案:
第1步:定位消息积压原因
在消息处理流程中,若客户端的消费速度跟不上服务端的发送速度,未处理的消息会不断累积,这部分消息就是堆积消息。
消息堆积会直接导致消费延迟,想要高效排查和解决这类问题,首先定位原因。
第一步:定位消息积压原因
定位消息积压原因
遇到消息积压时,很多人第一反应是“扩容消费者”,但在操作前必须先明确:到底是什么拖慢了消费速度?
RocketMQ的消费链路就像一条流水线,任何环节“堵车”都会引发积压,我们得先给这条流水线做个全面“检查”。
MQ消息积压的核心本质:生产速率>消费速率,导致消息在Broker队列中堆积。
全链路分析如下:
生产侧问题(较少见,10%)
- 业务高峰(如秒杀、大促)、补偿机制重发、生产端线程池失控等导致的瞬时流量冲击。
Broker侧问题(较少见,10%)
磁盘IO瓶颈(PageCache刷盘慢);
主从同步延迟;
网络分区或资源限制。
消费侧问题(最可能,80%)
(1)性能瓶颈:
Consumer陷入死循环,导致卡死;
业务逻辑复杂(如慢SQL、外部API调用、高耗时计算);
单条消息处理时间过长(超过100ms需警惕)。
(2)资源不足:
消费者实例数量不足(未随流量动态扩容);
消费者宕机或线程阻塞(如GC停顿、死锁)。
(3)配置缺陷:
顺序消费中单条消息卡住会阻塞整个队列(顺序消息会持续重试,普通消息仅重试16次);
广播模式下重复处理导致效率低下。
(4)重试风暴:
- 消息因依赖服务异常等原因频繁失败重试。
消息积压本质是“生产速度>消费速度+Broker转发能力”。
由于Broker通常是高可用集群,生产侧若无人工故障也较稳定,因此排查时应优先考虑消费侧问题。
大致的排查步骤如下:
排查Consumer是否处于“假死”状态
打开RocketMQ Dashboard(运维必备工具),查看Consumer分组的“在线客户端”列表。若某台服务器的Consumer长时间未上报心跳(LastHeartbeatTime超过2分钟),大概率是“消费者假死”。
这种情况多因消费者线程被Full GC卡住或代码中存在死循环。例如曾遇到某台服务器因循环中频繁打印日志导致CPU占用100%,Consumer线程直接卡死,积压量持续增加。
注意:需为Consumer配置JVM监控,重点关注GC频率和耗时。比如假死机器的Young GC耗时超500ms,老年代频繁Full GC,就会直接影响Consumer正常工作。
检查队列负载是否均衡
RocketMQ的Consumer采用“队列均分”策略,每个Consumer分配多个Message Queue(MQ)。若某台Consumer分配100个MQ,另一台仅分配10个,会导致“忙闲不均”。
通过Dashboard可查看每个Consumer实例的“已分配队列数”。比如三台新扩容服务器因网络配置问题未连接NameServer,导致老服务器承担80%队列,消费能力被压垮。
实操建议:若队列分配不均,可先重启Consumer实例触发重新负载均衡;若问题持续,检查Consumer分组配置,确保consumeFromWhere和messageModel设置正确(默认CLUSTERING模式会自动均衡)。
检查消费线程是否“效率低下”
RocketMQ Consumer默认消费线程数为20(由consumeThreadMin和consumeThreadMax控制)。若业务逻辑复杂(如涉及数据库查询、接口调用),20个线程可能不足,导致大量任务排队。
比如日志中发现线程池任务堆积量超1000,而实际工作线程仅10个——因为初始化时误将consumeThreadMin和Max均设为10,无法应对流量激增。
重点:线程数并非越多越好,需结合CPU核心数调整。IO密集型任务可设为CPU核心数的5-10倍(如50);CPU密集型任务超过32通常无意义,反而会因上下文切换降低效率。
节点线程数计算模型:单节点并发度需合理设置,过大易增加线程切换开销。理想环境下最优线程数计算模型:
单机vCPU核数为C;
忽略线程切换耗时,I/O操作不消耗CPU;
线程有足够消息处理,内存充足;
逻辑中CPU计算耗时为T1,外部I/O操作为T2。
则单个线程的TPS为1/(T1+T2),若CPU使用率达100%,单机最大线程数为C*(T1+T2)/T1。
第2步:紧急止血包(临时消费者扩容)
明确原因后进入“急救阶段”,需先让消费速度追上生产速度,再逐步消化历史积压。
第一招:临时扩容Consumer
这是最直接的方法,相当于增加高速公路车道。RocketMQ的Consumer无状态,理论上可无限扩容,但需注意两点:
扩容数量不超过MQ总数
每个MQ同一时间仅能被一个Consumer消费。
例如集群有100个MQ,最多可扩容至100个Consumer实例(每个实例分配1个MQ);
若集群有200个MQ且当前仅10个Consumer,理论上可先扩容至50个实例,充分利用队列资源。
第二招:开启批量消费,提高单次处理量
RocketMQ支持批量消费,默认每次拉取1条消息(参数consumeMessageBatchMaxSize默认值为1)。
若业务允许,可改为一次拉取10-32条,减少网络交互,提升吞吐量。
比如将该参数改为16,配合扩容后消费速度从500条/秒提升至8000条/秒——相当于从每次搬1箱货变为搬16箱,效率显著提升。但需注意:
保持幂等性
批量处理可能出现重复消费(如处理到第10条时消费者挂了,重启后16条消息会重新消费),因此业务代码必须支持幂等(如用唯一ID去重)。比如因未做幂等导致数据库出现重复订单,后面还得脚本去重。
避免参数过大
超过32后吞吐量提升不明显,反而增加内存压力。曾尝试设为100,导致Consumer内存使用率超80%,险些触发OOM,最终确定16-32为最佳范围。
第3步. 弃卒保帅:消费者降级+ 暂停Producer或限流
消费者降级
暂停非关键业务的消费者
降低非关键业务消费者线程数。
暂停Producer或限流,控制消息源头
若积压量极大(比如千万级以上)且消费速度短期内无法追上,可暂时让Producer停止发消息或降低发送频率。
注意:暂停Producer前必须与业务方沟通。
例如电商大促期间,暂停支付回调消息会影响商家收款,比如与前端协商在用户支付成功页增加“稍后刷新”提示,同时将Producer从2000 TPS限流至500 TPS,为消费者争取缓冲时间。
注意:
暂停后需监控Consumer的“堆积量”是否下降(理想状态为每分钟下降10-20万条)。
若未变化,可能是消费者重试逻辑导致消息反复投递(如消息处理失败后进入重试队列,积压量“假死”),此时需检查maxReconsumeTimes参数(默认16次,超过后进入死信队列)。
第4步. 并行爆破:上重武器
消息转储 和 消费者更大规模扩容。
解决第一步 临时扩容场景下的 MQ 分区总数不足的解决方案
若前期MQ数量不足(如仅4个MQ且已分配4个Consumer),第一步的临时扩容Consumer 意义不大,可按以下步骤处理:
1、 临时转储队列 : 创建原队列数10倍(或N倍)的新Topic , 也就是 临时转储队列;
2、 消息转储 : 开发临时转发程序,将积压消息均匀分发至新Topic的队列中;
3、 消费者更大规模扩容: 对应扩容Consumer(10倍),每个Consumer消费一个临时队列,同时扩容依赖的业务服务(如缓存、数据库);
4、消费完成后恢复原有架构,避免资源浪费。
对应扩容Consumer(10倍),每个Consumer消费一个临时队列,同时扩容依赖的业务服务(如缓存、数据库);
实操步骤:
1、 临时创建新Consumer分组(如加后缀-tmp),避免与原有消费者竞争资源;2、 启动时指定–consumerThreadMin 50 --consumerThreadMax 50(临时调高线程数);3、 观察Dashboard的“消费速度”,理想状态下每台新服务器分配4-5个MQ,消费速度可提升3-5倍。
第5步. 架构治根:架构升级,彻底 根治
高吞吐架构升级
积压 的应急预案。
(5.1)高吞吐架构升级:从 “被动应对” 到 “主动防御”
高吞吐架构的核心目标是:让生产速率≤消费速率 + Broker 承载能力,从根源上减少积压风险。需从生产端、消费端、Broker 端三个维度系统性优化,结合业务场景(如秒杀、大促)设计针对性方案。
(1)生产端:控制 “消息源头” 的速率与质量
生产端是消息的 “起点”,需通过限流、瘦身、异步化等手段,避免瞬时流量冲击 MQ。
生产端 动态限流:给生产端装 “刹车”
核心措施:基于 MQ Broker 的 “消息堆积量” 动态调整生产速率。
实现方式:
在 Producer 端集成 “积压量监测接口”(如调用 RocketMQ Dashboard 的/topic/stats接口),当某 Topic 积压量超过 50 万条时,自动触发限流(通过令牌桶算法将 TPS 从 2000 降至 500)。
业务适配:
秒杀场景下,结合前端限流(如按钮置灰、排队提示)和后端限流(Redis 计数器 + Lua 脚本),确保生产速率不超过消费端最大处理能力的 80%(预留 20% 缓冲)。
生产端 消息 “瘦身”:减少无效数据传输
核心问题:大消息(>1MB)会导致 Broker 存储效率下降、消费端处理耗时增加(如解析大 JSON 耗时 100ms+)。
优化措施:
消息体只保留 “核心字段”(如订单 ID、用户 ID、金额),非核心字段(如用户地址、商品详情)通过 “消息 + 数据库” 组合获取(消费端拿到消息后,再查 DB 补充信息);
大字段压缩:使用 Protobuf 替代 JSON(压缩率提升 50%+),或对超过 500KB 的消息进行 GZIP 压缩;
禁止 “日志型消息”:如非必要,不将接口调用日志、调试信息写入 MQ(改用 ELK 日志系统)。
(2)消费端:提升 “处理效率” 与 “容错能力”
消费端是消息处理的 “主力”.
消费端 需通过并行化、轻量化、隔离化设计,将单条消息处理耗时压缩至 50ms 以内(非复杂业务)。
队列拆分:
按 “业务类型” 或 “用户 ID 哈希” 拆分 Topic 队列,避免单队列阻塞。
示例:原 “订单消息” Topic(100 个队列)拆分为 “支付订单”(50 个队列)、“取消订单”(30 个队列)、“退款订单”(20 个队列),分别对应独立消费者组,避免某类消息(如退款)处理慢阻塞全量;
线程池优化:
核心参数优化:
IO 密集型业务(如调用外部 API、查 DB)→ 线程数 = CPU 核心数 ×5(如 8 核 CPU→40 线程);
CPU 密集型业务(如数据计算)→ 线程数 = CPU 核心数 ×2;
线程池隔离:
使用 Hystrix 或 Resilience4j 为不同消息类型分配独立线程池(如支付消息用payThreadPool,物流消息用logisticsThreadPool),避免某类消息线程池满导致全局阻塞。
业务逻辑轻量化:砍掉 “慢操作”
慢 SQL 优化:消费端涉及的 DB 操作必须加索引,禁止select *、复杂 JOIN(耗时>50ms 的 SQL 需拆分或异步化);
外部依赖缓存:调用第三方接口(如支付回调、物流查询)时,增加本地缓存(Caffeine,过期时间 5 分钟)+ 分布式缓存(Redis),减少远程调用耗时(从 200ms→10ms);
异步处理非核心步骤:如订单消息消费时,“扣减库存”(核心)同步处理,“发送短信通知”(非核心)丢入本地线程池异步执行(失败后不影响主流程)。
批量消费 + 幂等设计:提升吞吐量 + 防重复
批量消费参数固化:consumeMessageBatchMaxSize固定为 16-32(经实测,此范围吞吐量提升最明显,且内存可控);
幂等实现:- 消息层面:为每条消息生成唯一 ID(如 UUID),消费端首次处理时写入 “消息处理表”(ID + 状态),重复消息直接跳过;- 业务层面:订单支付消息通过 “订单 ID + 支付状态” 去重(如已支付的订单,再次收到支付消息时直接返回成功)。
(3)Broker 端:强化 “承载能力” 与 “稳定性”
Broker 是消息存储与转发的核心,需通过硬件升级、集群扩容、参数优化提升极限承载能力(目标:单 Broker 支持 10 万 TPS+,集群支持百万 TPS+)。
硬件与存储优化
磁盘:使用 SSD(随机读写速度是机械硬盘的 10 倍 +),且单 Broker 磁盘容量≥1TB(避免频繁清理旧消息);
内存:为 Broker 配置足够大的 PageCache(如 16 核 32G 机器,分配 16G 作为 PageCache),减少磁盘 IO 压力(消息先写入 PageCache,再异步刷盘);
网络:Broker 节点间使用万兆网卡,避免跨机房部署(主从节点同机房,延迟控制在 1ms 内)。
集群扩容与负载均衡
集群规模:按 “生产 TPS×2” 配置 Broker 节点(如预估生产端 50 万 TPS,集群部署 10 个 Broker 节点,单节点承载 5 万 TPS);
队列均衡:每个 Topic 的队列数 = Broker 节点数 ×8(如 10 个 Broker→80 个队列),确保队列均匀分布在各 Broker(避免某台 Broker 负载过高);
主从架构:每个 Broker 配置 1 个从节点,主节点故障时自动切换(RocketMQ 支持主从自动切换,切换时间<30 秒)。
刷盘与清理策略优化
刷盘策略:高吞吐场景用ASYNC_FLUSH(异步刷盘,写入 PageCache 即返回成功,由后台线程定时刷盘),牺牲部分一致性换性能;
消息清理:设置合理的fileReservedTime(消息保留时间),非核心消息保留 24 小时,核心消息保留 7 天(避免旧消息占用磁盘空间)。
(5.2) 应急预案:从 “无序应对” 到 积压的 “标准化处理流程”
应急预案的核心是:在 10 分钟内发现问题,30 分钟内启动处理,2 小时内控制积压增长。
需明确 “监控指标→报警机制→响应流程→责任分工”,确保事故发生时 “人人知道该做什么”。
(5.2.1)监控预警体系:让积压 “早发现、早预警”
需建立多维度监控指标,覆盖生产端、消费端、Broker 端,且报警阈值需结合业务场景动态调整(如秒杀时阈值放宽,日常收紧)。
建立监控体系,事前:给 Consumer 装“仪表盘”,实时监控关键指标
不能仅关注“堆积量”,以下指标更重要:
消费延迟(consumeLatency):消息产生到被消费的时间差,超过10秒需警觉;
拉取吞吐量(pullTPS):突然下降50%可能是网络或Broker异常;
消费线程池利用率:通过ThreadPoolExecutor.getActiveCount()监控,长期接近consumeThreadMax说明线程不足。
后续为每个Consumer实例配置Prometheus监控,结合Grafana仪表盘,积压预警准确率大幅提升。
指标落地:Grafana 仪表盘配置
实时显示 “积压量趋势图”(每 5 秒刷新一次)、“消费速率 VS 生产速率对比图”、“各 Broker 节点负载分布图”;
配置 “异常指标联动报警”:如积压量>10 万条且消费速率<生产速率时,自动触发报警(避免单一指标误报)。
(5.2.2)应急响应流程:标准化处理步骤(附责任分工)
将应急响应分为 “发现→控制→消化→恢复” 四阶段,明确每个阶段的操作人、操作步骤、验证标准。
(3)应急工具与脚本:让操作 “一键执行”
提前准备可复用的脚本和工具,避免应急时 “手忙脚乱写代码”。
快速扩容消费者脚本(scale_consumer.sh)
积压的 应急预案准备
建立应急预案,事中:准备“应急预案模板”,让新人也能快速上手
将急救步骤编写为脚本:
# 快速扩容消费者脚本 # 循环创建50个临时消费者实例fori in{1..50};dojava-jar consumer.jar \--consumerGroup order_consumer_tmp \ # 临时消费者分组,避免冲突--namesrvAddr xxx:9876\ #NameServer地址--consumeThreadMin50\ # 最小消费线程数(临时调高)--consumeMessageBatchMaxSize16\ # 批量消费参数--clientIP $(curl ifconfig.me)&# 自动获取公网IP,便于定位实例 done # 暂停死信队列脚本 # 通过Dashboard接口修改消费者配置 curl-XPOST http://rocketmq-dashboard:8080/consumer/update \-H"Content-Type: application/json"\-d'{"groupName":"order_consumer","maxReconsumeTimes":0}'# 关闭重试,避免消息反复投递注意:应急预案需定期演练。曾在演练中发现脚本路径错误,提前修正避免了实战中的问题。
消息转发脚本或者Java程序
功能:从原 Topic 批量拉取积压消息,转发至临时 Topic(支持按消息产生时间过滤,优先转发新消息)。
限流生产端 API(供业务方调用)
提供 HTTP 接口:/producer/limit?topic=xxx&tps=500,用于紧急降低指定 Topic 的生产速率。
(4)演练机制:让方案 “经得住实战”
演练频率:每月 1 次常规演练,大促前 1 周增加 1 次高压力演练(模拟 1000 万条消息积压);
演练场景:
场景 1:消费端线程池满导致积压(模拟:消费端故意加 sleep (1s));
场景 2:Broker 磁盘 IO 瓶颈导致积压(模拟:限制 Broker 磁盘读写速度至 10MB/s);
场景 3:生产端瞬时流量冲击(模拟:用 JMeter 发送 10 万 TPS 消息);
演练输出:每次演练后输出《问题清单》(如脚本路径错误、报警延迟),24 小时内修复并更新方案。
总结
高吞吐架构升级是 “治本”,通过生产端削峰、消费端提效、Broker 端扩容提升系统极限能力;应急预案是 “治标”,通过监控预警、标准化流程、工具支撑确保事故发生时快速响应。二者结合,可将消息积压的概率降低 90% 以上,即使发生积压,也能在 2 小时内控制,4 小时内恢复,真正实现 “长治久安”。
生产场景百万级消息积压 攻关的 总结
第1步: 定位原因:
第2步: 紧急止血包-极速扩容:
临时扩容Consumer实例和开启批量消费。
第3步: 弃卒保帅-非核心业务降级:
暂停非关键业务的消费者和降低非关键业务消费者线程数。
第4步: 并行爆破-上重武器:
消息转储 和 消费者更大规模扩容。
第5步:架构治根。 长治久安-上牛逼的架构方案:
高吞吐架构升级和高并发压的应急预案。