news 2026/4/15 12:04:34

15、RabbitMQ

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
15、RabbitMQ

RabbitMQ是一个开源的消息队列系统,实现了高级消息队列协议(AMQP)。它提供了强大的消息传递功能,支持多种消息传递模式,是分布式系统中常用的消息中间件。


RabbitMQ核心概念

消息中间件

消息中间件是分布式系统中重要的组件,用于:

  • 解耦:生产者和消费者之间不需要直接通信
  • 异步:提高系统的响应性和吞吐量
  • 削峰:缓冲瞬时高并发请求
  • 可靠:确保消息不丢失

基本概念

  1. Producer(生产者):发送消息的应用程序
  2. Consumer(消费者):接收消息的应用程序
  3. Queue(队列):存储消息的缓冲区
  4. Exchange(交换机):接收生产者发送的消息,根据路由规则将消息转发到队列
  5. Binding(绑定):交换机和队列之间的连接关系
  6. Routing Key(路由键):消息发送时指定的路由信息
  7. Connection(连接):应用程序与RabbitMQ服务器之间的TCP连接
  8. Channel(通道):建立在连接之上的虚拟连接

工作模式

1. 简单模式(Simple)

一个生产者对应一个消费者,最简单的消息传递模式。

生产者代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuesimpleQueue(){returnnewQueue("simple.queue",true);}}@ServicepublicclassSimpleProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("simple.queue",message);System.out.println("发送消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="simple.queue")publicclassSimpleConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("接收消息:"+message);// 处理消息逻辑}}

2. 工作队列模式(Work Queue)

一个生产者对应多个消费者,消费者竞争消费消息,实现负载均衡。

生产者代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueueworkQueue(){returnnewQueue("work.queue",true);}}@ServicepublicclassWorkProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("work.queue",message);System.out.println("发送消息:"+message);}}

消费者代码:

@ComponentpublicclassWorkConsumer1{@RabbitListener(queues="work.queue")publicvoidreceiveMessage(Stringmessage){System.out.println("消费者1接收消息:"+message);try{// 模拟处理时间Thread.sleep(1000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println("消费者1处理完成:"+message);}}@ComponentpublicclassWorkConsumer2{@RabbitListener(queues="work.queue")publicvoidreceiveMessage(Stringmessage){System.out.println("消费者2接收消息:"+message);try{// 模拟处理时间Thread.sleep(2000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println("消费者2处理完成:"+message);}}

3. 发布订阅模式(Publish/Subscribe)

一个生产者发送消息,通过交换机广播给多个队列,多个消费者分别处理。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange");}@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1",true);}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2",true);}@BeanpublicBindingbinding1(FanoutExchangefanoutExchange,QueuefanoutQueue1){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicBindingbinding2(FanoutExchangefanoutExchange,QueuefanoutQueue2){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

生产者代码:

@ServicepublicclassFanoutProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend("fanout.exchange","",message);System.out.println("发布消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="fanout.queue1")publicclassFanoutConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("消费者1接收消息:"+message);}}@Component@RabbitListener(queues="fanout.queue2")publicclassFanoutConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("消费者2接收消息:"+message);}}

4. 路由模式(Routing)

根据路由键将消息发送到指定的队列。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct.exchange");}@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1",true);}@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2",true);}@BeanpublicBindingbindingDirect1(DirectExchangedirectExchange,QueuedirectQueue1){returnBindingBuilder.bind(directQueue1).to(directExchange).with("info");}@BeanpublicBindingbindingDirect2(DirectExchangedirectExchange,QueuedirectQueue2){returnBindingBuilder.bind(directQueue2).to(directExchange).with("error");}}

生产者代码:

@ServicepublicclassDirectProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendInfoMessage(Stringmessage){rabbitTemplate.convertAndSend("direct.exchange","info","INFO: "+message);System.out.println("发送INFO消息:"+message);}publicvoidsendErrorMessage(Stringmessage){rabbitTemplate.convertAndSend("direct.exchange","error","ERROR: "+message);System.out.println("发送ERROR消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="direct.queue1")publicclassDirectConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("INFO消费者接收消息:"+message);}}@Component@RabbitListener(queues="direct.queue2")publicclassDirectConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("ERROR消费者接收消息:"+message);}}

5. 主题模式(Topic)

根据通配符匹配的路由键将消息发送到相应的队列。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topic.exchange");}@BeanpublicQueuetopicQueue1(){returnnewQueue("topic.queue1",true);}@BeanpublicQueuetopicQueue2(){returnnewQueue("topic.queue2",true);}@BeanpublicBindingbindingTopic1(TopicExchangetopicExchange,QueuetopicQueue1){returnBindingBuilder.bind(topicQueue1).to(topicExchange).with("user.*");}@BeanpublicBindingbindingTopic2(TopicExchangetopicExchange,QueuetopicQueue2){returnBindingBuilder.bind(topicQueue2).to(topicExchange).with("user.#");}}

生产者代码:

@ServicepublicclassTopicProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendUserCreateMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.create","创建用户:"+message);System.out.println("发送用户创建消息:"+message);}publicvoidsendUserUpdateMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.update","更新用户:"+message);System.out.println("发送用户更新消息:"+message);}publicvoidsendUserDeleteMessage(Stringmessage){rabbitTemplate.convertAndSend("topic.exchange","user.delete","删除用户:"+message);System.out.println("发送用户删除消息:"+message);}}

消费者代码:

@Component@RabbitListener(queues="topic.queue1")publicclassTopicConsumer1{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("用户操作消费者1接收消息:"+message);}}@Component@RabbitListener(queues="topic.queue2")publicclassTopicConsumer2{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("用户操作消费者2接收消息:"+message);}}

高级特性

消息确认机制

生产者确认:

@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);// 启用发布者确认rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{if(ack){System.out.println("消息发送成功:"+correlationData.getId());}else{System.out.println("消息发送失败:"+cause);}});// 启用发布者返回rabbitTemplate.setReturnsCallback(returned->{System.out.println("消息无法路由:"+returned.getMessage());});returnrabbitTemplate;}}

消费者确认:

@Component@RabbitListener(queues="confirm.queue")publicclassConfirmConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage,Channelchannel,Messagemsg)throwsIOException{try{System.out.println("接收消息:"+message);// 处理消息逻辑// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){// 处理失败,拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);}}}

死信队列

处理无法被正常消费的消息。

配置代码:

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuenormalQueue(){Map<String,Object>args=newHashMap<>();args.put("x-dead-letter-exchange","dead.letter.exchange");args.put("x-dead-letter-routing-key","dead.letter");args.put("x-message-ttl",60000);// 消息TTL 60秒returnnewQueue("normal.queue",true,false,false,args);}@BeanpublicQueuedeadLetterQueue(){returnnewQueue("dead.letter.queue",true);}@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange("dead.letter.exchange");}@BeanpublicBindingdeadLetterBinding(DirectExchangedeadLetterExchange,QueuedeadLetterQueue){returnBindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead.letter");}}

死信消费者:

@Component@RabbitListener(queues="dead.letter.queue")publicclassDeadLetterConsumer{@RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println("处理死信消息:"+message);// 处理死信消息的逻辑}}

消息持久化

@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuedurableQueue(){// durable=true 表示队列持久化returnnewQueue("durable.queue",true);}@BeanpublicExchangedurableExchange(){// durable=true 表示交换机持久化returnnewDirectExchange("durable.exchange",true,false);}}@ServicepublicclassDurableProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){// MessageProperties 设置消息持久化MessagePropertiesproperties=newMessageProperties();properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Messagemsg=newMessage(message.getBytes(),properties);rabbitTemplate.send("durable.exchange","durable.key",msg);}}

监控和管理

管理界面功能

RabbitMQ管理界面提供了丰富的监控和管理功能:

  1. Overview:查看整体状态和统计信息
  2. Connections:查看连接信息
  3. Channels:查看通道信息
  4. Exchanges:管理交换机
  5. Queues:管理队列
  6. Admin:用户和权限管理

常用监控命令

# 查看队列信息 rabbitmqctl list_queues name messages consumers # 查看交换机信息 rabbitmqctl list_exchanges # 查看绑定关系 rabbitmqctl list_bindings # 查看连接信息 rabbitmqctl list_connections

Spring Boot Actuator集成

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
management:endpoints:web:exposure:include:health,info,rabbitendpoint:rabbit:enabled:true

性能优化

1. 连接复用

@ConfigurationpublicclassRabbitConfig{@BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin123");// 设置连接缓存connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);connectionFactory.setConnectionCacheSize(10);// 设置通道缓存connectionFactory.setChannelCacheSize(25);returnconnectionFactory;}}

2. 批量发送

@ServicepublicclassBatchProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendBatchMessages(List<String>messages){List<Message>messageList=messages.stream().map(msg->MessageBuilder.withBody(msg.getBytes()).build()).collect(Collectors.toList());// 批量发送消息for(Messagemessage:messageList){rabbitTemplate.send("batch.exchange","batch.routing.key",message);}}}

3. 消费者优化

@ConfigurationpublicclassRabbitConfig{@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置消费者并发数factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);// 设置预取数量factory.setPrefetchCount(5);// 设置手动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);returnfactory;}}

故障排查

常见问题

  1. 消息丢失
    • 检查队列和消息是否持久化
    • 确认生产者和消费者确认机制是否正确配置
  2. 消息重复消费
    • 确保消费者处理逻辑的幂等性
    • 使用消息ID进行去重
  3. 连接断开
    • 检查网络连接稳定性
    • 配置连接重试机制
  4. 性能问题
    • 监控队列长度和消费者数量
    • 调整预取数量和并发消费者数

日志配置

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
logging:level:com.rabbitmq:DEBUGorg.springframework.amqp:DEBUG
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 12:01:34

SMARTBI根据参数的不同值跳转至不同的报表

1、场景: 报表A上有返回按钮,点击可以返回至上一级的报表。但是有两个驾驶舱下钻时会下钻至同一个报表A,那报表A在返回时就需要做以下判断,判断是哪个驾驶舱下钻下来的,再跳转回对应的驾驶舱。实现方法可参考以下方式: 2、引入参数用于区分跳转位置 新建一个数据集,引…

作者头像 李华
网站建设 2026/4/15 12:01:30

Wan2.2-T2V-5B在儿童教育动画中的应用设想

Wan2.2-T2V-5B在儿童教育动画中的应用设想 你有没有想过&#xff0c;一个幼儿园老师&#xff0c;只需要打几个字——“三只小黄鸭在池塘游泳&#xff0c;卡通风格”——不到两秒&#xff0c;一段生动的小动画就出现在屏幕上&#xff1f;可以直接放进课件、发到家长群&#xff0…

作者头像 李华
网站建设 2026/4/14 23:13:06

Zig 编程语言 v0.15.2 中文手册

作为一个Copilot自动填充的HelloWorld都跑不起来的快速变更的新语言, 跟进新的手册学习显然很有必要. 使用Claude 4.5 Sonnet翻译, 下载: Water-Run/llm-translate-documents: 一些使用大模型翻译的英文文档. 或直接: git clone https://github.com/Water-Run/llm-translate-…

作者头像 李华
网站建设 2026/4/15 6:22:44

施耐德BMXNOE0110:Modicon M580平台的工业级以太网通信模块

施耐德电气BMXNOE0110是专为其旗舰Modicon M580可编程自动化控制器&#xff08;ePAC&#xff09;设计的一款标准型工业以太网通信模块。该模块是构建基于以太网架构的现代自动化系统的基础通信单元&#xff0c;负责为控制器提供稳定、高效的网络连接能力&#xff0c;是实现设备…

作者头像 李华
网站建设 2026/4/10 20:21:58

Wan2.2-T2V-A14B在跨模态理解方面的核心优势解析

Wan2.2-T2V-A14B在跨模态理解方面的核心优势解析 你有没有想过&#xff0c;有一天只需要写一句话&#xff1a;“一个穿红裙的女孩在黄昏的海边奔跑&#xff0c;海浪轻拍她的脚踝&#xff0c;夕阳把沙滩染成金色”&#xff0c;AI就能立刻为你生成一段720P高清、动作自然、光影细…

作者头像 李华
网站建设 2026/4/13 11:26:50

3秒语音克隆革命:NeuTTS Air重构嵌入式语音交互新范式

3秒语音克隆革命&#xff1a;NeuTTS Air重构嵌入式语音交互新范式 【免费下载链接】neutts-air 项目地址: https://ai.gitcode.com/hf_mirrors/neuphonic/neutts-air 在当今AI语音技术快速发展的时代&#xff0c;NeuTTS Air凭借其突破性的3秒语音克隆能力和本地化部署优…

作者头像 李华