news 2026/3/8 3:04:28

Java-205 RabbitMQ 工作模式实战:Work Queue 负载均衡 + fanout 发布订阅(手动ACK/QoS/临时队列)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java-205 RabbitMQ 工作模式实战:Work Queue 负载均衡 + fanout 发布订阅(手动ACK/QoS/临时队列)

TL;DR

  • 场景:同一队列多消费者分摊任务 + 一条消息广播给多个订阅方
  • 结论:Work Queue 依赖 manual ack + basicQos 控制分发;fanout 依赖交换器绑定与临时队列实现“一对多”
  • 产出:给出 Java 生产/消费代码骨架、未命名交换器用法、临时队列与 binding 的验证路径

RabbitMQ 工作模式

Work Queue

生产者发送消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可以达到负载均衡的效果。

NewTask

packageicu.wzk.demo;publicclassTestTask{privatestaticfinalStringHOST="localhost";privatestaticfinalStringVIRTUAL_HOST="/";privatestaticfinalStringUSERNAME="admin";privatestaticfinalStringPASSWORD="secret";privatestaticfinalintPORT=5672;privatestaticfinalStringQUEUE_NAME="wzk-icu";privatestaticfinalString[]WORKS={"hello.","hello..","hello...","hello....","hello.....","hello......","hello.......","hello........","hello.........","hello.........."};publicstaticvoidmain(String[]args){ConnectionFactoryfactory=newConnectionFactory();factory.setHost(HOST);factory.setVirtualHost(VIRTUAL_HOST);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);factory.setPort(PORT);try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()){channel.queueDeclare(QUEUE_NAME,false,false,false,null);Stringexchange="";for(Stringwork:WORKS){channel.basicPublish(exchange,QUEUE_NAME,null,work.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '"+work+"'");}}catch(Exceptione){e.printStackTrace();}}}

对应的执行结果如下所示:

TestTask2

packageicu.wzk.demo;publicclassTestTask2{privatestaticfinalStringTASK_QUEUE_NAME="wzk-icu";publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("secret");// false = 手动 ack(推荐配合 basicQos)booleanautoAck=false;try(Connectionconn=factory.newConnection();Channelchannel=conn.createChannel()){// 每次只拉取 1 条,避免一次性堆给消费者channel.basicQos(1);channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);DeliverCallbackdeliverCallback=(consumerTag,delivery)->{Stringtask=newString(delivery.getBody(),StandardCharsets.UTF_8);longtag=delivery.getEnvelope().getDeliveryTag();System.out.println(" [x] Received '"+task+"'");try{doWork(task);System.out.println(" [x] Done");// 手动确认channel.basicAck(tag,false);}catch(InterruptedExceptione){Thread.currentThread().interrupt();// 中断时重回队列channel.basicNack(tag,false,true);}catch(Exceptione){// 失败时重回队列(按需改为 false 丢弃)channel.basicNack(tag,false,true);}};channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,consumerTag->{});// 阻塞主线程,保持进程存活synchronized(TestTask2.class){try{TestTask2.class.wait();}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}}privatestaticvoiddoWork(Stringtask)throwsInterruptedException{System.out.println("task = "+task);for(charch:task.toCharArray()){if(ch=='.'){Thread.sleep(1000);}}}}

执行结果如下所示:

发布订阅

在RabbitMQ消息队列系统中,fanout类型交换器是一种重要的消息分发机制。这种交换器的工作方式类似于广播模式,具有以下特点:

  1. 消息分发机制:
  • 生产者将消息发送到Exchange时,会完全忽略RoutingKey(路由键)的设置
  • Exchange会将收到的每一条消息复制并推送到所有绑定到该Exchange的队列
  • 每个消费者队列都会收到完整的消息副本
  1. 队列绑定过程:
  • 每个消费者在订阅时,RabbitMQ会自动为其创建一个专属队列
  • 这个新创建的队列会与指定的Exchange建立绑定关系
  • 绑定过程不涉及任何路由规则或过滤条件
  1. 典型应用场景:
  • 实时通知系统(如股票行情推送)
  • 日志收集系统(多个日志处理服务同时接收日志)
  • 事件广播系统(订单创建事件通知多个微服务)
  1. 技术实现细节:
# 生产者示例代码channel.exchange_declare(exchange='logs',exchange_type='fanout')channel.basic_publish(exchange='logs',routing_key='',body=message)# 消费者示例代码result=channel.queue_declare(queue='',exclusive=True)channel.queue_bind(exchange='logs',queue=result.method.queue)
  1. 与传统点对点模式的区别:
  • 传统模式:一个消息只能被一个消费者处理
  • fanout模式:一个消息会被所有消费者处理
  • 消息生命周期:在fanout模式下,消息会被复制多份分别存储在不同队列中

这种模式特别适合需要"一对多"消息分发的场景,确保所有订阅者都能及时获取完整的消息内容。由于不依赖RoutingKey,系统设计更加简单,但需要注意可能带来的资源消耗问题,因为每条消息都会被复制多份。

交换器的类型前面已经介绍过了:
● direct
● topic
● headers
● fanout

发布订阅使用 fanout,创建交换器叫做 logs:

channel.exchangeDeclare("logs","fanout");

fanout 交换器很简单,从名字可以看到叫用风扇吹出去,将收到的消息发送给它知道的所有队列。

rabbitmqctl list_exchanges

列出 RabbitMQ 的交换器,包括了 amq.* 和 默认未命名的交换器。

未命名交换器

在前面我们虽然没有指定交换器,但是依然可以向队列发送消息,这是因为我们使用了默认的交换器。

channel.basicPublish("","hello",null,message.getBytes());

第一个参数就是交换器的名称,为空字符串,直接使用 RoutingKey 向队列发送消息,如果该 RoutingKey 指定的队列存在的话。
如果我们要向指定的交换发布器发送消息:

channel.basicPublish("logs","",null,message.getBytes());

临时队列

生产者和消费者都是通过队列名称来发送和接收该队列中的消息。在使用RabbitMQ时,创建队列的过程需要注意以下几点:

  1. 队列创建机制:
  • 当连接到RabbitMQ时,通常需要创建一个新的空队列
  • 队列命名方式有两种选择:
    • 自定义命名:可以指定一个有意义的队列名称,如"order_processing_queue"
    • 自动生成:可以让RabbitMQ服务器生成随机队列名(如"amq.gen-JzTY20BRgKO-HjmUJj0wLg")
  1. 队列生命周期管理:
  • 临时队列特性:当声明队列时将"exclusive"参数设为true,该队列就变成独占队列
  • 自动删除机制:对于独占队列,一旦消费者断开连接,RabbitMQ会自动删除该队列
  • 持久化队列:如果需要长期保留队列,可以设置"durable"参数为true

应用场景示例:

  1. 临时任务处理:使用自动生成的队列名处理一次性任务
  2. RPC调用:客户端创建临时队列接收服务端响应
  3. 消息广播:多个消费者各自创建独占队列绑定到同一个交换机

注意事项:

  • 生产环境中建议使用明确的队列命名规范
  • 自动删除队列适合临时性消息交换场景
  • 确保消费者异常断开时队列能正确释放资源
StringqueueName=channel.queueDeclare().getQueue();

上述代码我们声明了一个非持久化的、排他的、自动删除的队列,并且名字都是服务器随机生成的。

进行绑定


在创建了消息队列和fanout类型的交换器之后,我们需要将两者进行绑定,让交换器将消息发送给该队列。fanout交换器会将收到的所有消息广播到所有与之绑定的队列中,这种模式非常适合需要消息广播的场景,比如系统日志分发或实时通知推送。

下面是具体的绑定操作示例代码:

channel.queueBind(queueName,"logs","");

这段代码中各个参数的含义是:

  • queueName:要绑定的队列名称
  • logs:交换器的名称
  • "":路由键(routing key),在fanout类型中不需要使用,因此为空字符串

此时,logs 交换器会将接收到的所有消息无条件地追加到我们的队列中。为了验证绑定是否成功,可以使用下面的命令列出 RabbitMQ 中交换器的绑定关系:

rabbitmqctl list_bindings

这个命令会显示类似如下的输出:

exchange_name queue_name routing_key logs queue1 "" logs queue2 ""

输出结果会清晰地展示每个交换器与队列之间的绑定关系,包括交换器名称、队列名称和使用的路由键。在fanout类型的绑定中,路由键通常会显示为空字符串,因为fanout交换器会忽略路由键的设置。

错误速查

症状根因定位修复
多消费者但分发不均、某个消费者“吃撑”未设置 basicQos 或 prefetch 过大看消费者侧吞吐与未确认数(unacked)是否偏高
channel.basicQos(1..N);manual ack 后再调优 N
消费者异常后消息丢失autoAck=true或提前 ackbasicConsume的 autoAck 参数、日志中 ack 时机
autoAck=false;仅在业务完成后basicAck
失败消息无限重试、队列“打转”basicNack(..., requeue=true)对不可恢复错误也重回队列观察同一消息反复出现,且无退避
区分可恢复/不可恢复:不可恢复requeue=false或走 DLX/重试队列
重启后队列/消息消失队列非持久化 + 消息非持久化queueDeclare(durable=false)、发布属性是否为 persistent
生产:durable queue + persistent message;配合镜像/Quorum 队列策略视需求
发布订阅消费者断开后队列没了使用临时队列 exclusive/auto-delete 的预期行为queueDeclare(queue="", exclusive=true)或 server 生成队列名
需要持久订阅就改为命名队列 + durable;临时订阅保持现状
fanout 下设置 routingKey 但无效果fanout 忽略 routingKey检查exchange_type=fanout
需要按 key 路由就改 direct/topic;fanout 保持 routingKey 为空即可
发送到 exchange 后消费者收不到未 queueBind 或 bind 到错误 exchangerabbitmqctl list_bindings看是否存在绑定
确认queueBind(queueName,"logs","")与 exchange 名一致
使用默认交换器发送失败(无路由)routingKey(队列名)不存在或拼错生产者basicPublish的 routingKey 与实际队列名比对
先确保queueDeclare创建目标队列;routingKey 必须等于队列名
消费端进程退出导致不再消费try-with-resources 结束或主线程未阻塞看 main 是否退出、连接是否关闭
保持进程存活(你用 wait() 方式可行);或用更标准的生命周期管理
连接失败/权限错误vhost/用户名密码/权限不匹配RabbitMQ 日志与连接异常栈
校验 vhost 存在、用户对 vhost 的 configure/write/read 权限

其他系列

🚀 AI篇持续更新中(长期更新)

AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用AI工具指南!
AI研究-132 Java 生态前沿 2025:Spring、Quarkus、GraalVM、CRaC 与云原生落地
🔗 AI模块直达链接

💻 Java篇持续更新中(长期更新)

Java-196 消息队列选型:RabbitMQ vs RocketMQ vs Kafka
MyBatis 已完结,Spring 已完结,Nginx已完结,Tomcat已完结,分布式服务已完结,Dubbo已完结,MySQL已完结,MongoDB已完结,Neo4j已完结,FastDFS 已完结,OSS已完结,GuavaCache已完结,EVCache已完结,RabbitMQ正在更新… 深入浅出助你打牢基础!
🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!
大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT案例 详解
🔗 大数据模块直达链接

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/28 14:30:00

AI如何帮你一键生成虚拟串口通信程序

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 开发一个跨平台虚拟串口通信软件,使用Python实现,要求:1. 支持Windows/Linux/MacOS三平台;2. 提供GUI界面可配置串口参数&#xff08…

作者头像 李华
网站建设 2026/3/2 3:12:23

Flutter炫酷UI设计模板完全指南:10+跨平台界面实战教程

Flutter炫酷UI设计模板完全指南:10跨平台界面实战教程 【免费下载链接】awesome-flutter-ui 10 flutter(android, ios) UI design examples :zap: - login, books, profile, food order, movie streaming, walkthrough, widgets 项目地址: https://gitcode.com/gh…

作者头像 李华
网站建设 2026/3/2 15:01:18

基于朴素贝叶斯电商评价数据情感分析与预测任务书

河北东方学院本科毕业论文(设计)任务书题 目宋体五号居中学 院人工智能学院专 业宋体五号居中班级与教务系统班级一致学生姓名宋体五号居中指导教师张三(讲师)日 期2024年10月20日毕业论文(设计&#xff0…

作者头像 李华
网站建设 2026/3/7 22:46:41

Knockout.js无障碍测试实战指南:让动态内容完美适配屏幕阅读器

Knockout.js无障碍测试实战指南:让动态内容完美适配屏幕阅读器 【免费下载链接】knockout Knockout makes it easier to create rich, responsive UIs with JavaScript 项目地址: https://gitcode.com/gh_mirrors/kn/knockout 在当今追求极致用户体验的开发环…

作者头像 李华
网站建设 2026/3/2 19:58:52

Kotaemon医保政策问答:报销比例一键查询

医保政策问答系统的技术实现探讨在智能问答与自然语言处理技术快速发展的今天,越来越多的公共服务开始向数字化、自动化转型。像“Kotaemon医保政策问答”这类应用,虽然不属于传统意义上的嵌入式或功率电子系统,但其背后依然涉及一系列复杂的…

作者头像 李华
网站建设 2026/3/5 12:21:51

突破传统:CUT3R如何实现动态场景的实时三维建模

突破传统:CUT3R如何实现动态场景的实时三维建模 【免费下载链接】CUT3R Official implementation of Continuous 3D Perception Model with Persistent State 项目地址: https://gitcode.com/gh_mirrors/cu/CUT3R 在计算机视觉领域,实时三维感知模…

作者头像 李华