news 2026/5/15 14:17:19

RabbitMQ和Kafka消息队列确认机制详解:生产者、消费者与消息可靠性

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RabbitMQ和Kafka消息队列确认机制详解:生产者、消费者与消息可靠性

1.生产者和消费者确认机制

确认机制的本质:明确告诉对方:消息已经安全到达/已经被成功处理

如果没有确认机制:

  • 生产者不知道消息有没有发成功

  • 消费者不知道消息有没有处理成功

  • 系统只能“猜”,必然丢消息

在消息队列中,生产者和消费者确认机制是确保消息可靠传递的关键。主要有两种确认机制:

  • 生产者确认:当消息成功发送到交换器后,交换器会返回确认给生产者,确保消息的发送成功。

  • 消费者确认:当消息被成功消费后,消费者会向交换器发送确认回执,确保消息的消费完成。

这两种机制有助于解决消息丢失的问题,确保在分布式系统中消息的可靠性和安全性

2.RabbitMQ的生产者确认机制

一般情况下,只要生产者与MQ之间的网络连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常

  • 生产者发送消息到达MQ后未找到Exchange

  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执
具体如图所示:

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

  • 开启生产者确认
spring:rabbitmq:publisher-confirm-type:correlated# 开启publisher confirm机制,并设置confirm类型publisher-returns:true# 开启publisher return机制

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制

  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息

  3. correlated:以异步回调方式的方式返回 MQ 的回执消息

每个 RabbitTemplate 只能配置一个 ReturnCallback

Java代码实现:

① Maven 依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>

② 生产者代码(Confirm + Return)

importcom.rabbitmq.client.*;publicclassRabbitProducerConfirmDemo{privatestaticfinalStringEXCHANGE="demo.exchange";privatestaticfinalStringROUTING_KEY="demo.key";publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();// 开启 confirm 模式channel.confirmSelect();// Confirm 回调(消息是否到达 Broker)channel.addConfirmListener((deliveryTag,multiple)->System.out.println("消息发送成功,tag="+deliveryTag),(deliveryTag,multiple)->System.out.println("消息发送失败,tag="+deliveryTag));// Return 回调(路由失败)channel.addReturnListener(returnMessage->System.out.println("路由失败:"+newString(returnMessage.getBody())));channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,true);Stringmsg="hello rabbit confirm";channel.basicPublish(EXCHANGE,ROUTING_KEY,true,// mandatoryMessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());System.out.println("消息已发送");}}

confirmSelect():开启生产者确认

Confirm:保证消息写入 Broker

Return:保证消息路由正确


3.RabbitMQ的消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;

返回Reject的常见异常有:

Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:

  • o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.

  • o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.

  • o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.

  • o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.

  • java.lang.NoSuchMethodException: Added in version 1.6.3.

  • java.lang.ClassCastException: Added in version 1.6.3.

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode:manual # 手动处理

Java代码实现:

  1. RabbitMQ消费者默认情况下是自动确认,这里就不给代码演示了

  2. RabbitMQ 消费者确认(手动 ACK),先处理业务,再ACK

importcom.rabbitmq.client.*;publicclassRabbitConsumerAckDemo{privatestaticfinalStringQUEUE="demo.queue";publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();channel.queueDeclare(QUEUE,true,false,false,null);// 手动 ACKbooleanautoAck=false;channel.basicConsume(QUEUE,autoAck,(consumerTag,message)->{try{Stringmsg=newString(message.getBody());System.out.println("收到消息:"+msg);// 模拟业务处理if(msg.contains("error")){thrownewRuntimeException("业务异常");}// 业务成功 → ACKchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch(Exceptione){// 业务失败 → NACK,重新入队 or DLQchannel.basicNack(message.getEnvelope().getDeliveryTag(),false,true// 是否重新入队);}},consumerTag->{});}}

4.kafka的生产者确认机制

Kafka 的确认由acks参数控制。

  • ACK=0:这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器。

  • ACK=1:这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。

  • ACK=-1:这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。

Java代码实现:

① Maven 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>

② Kafka 生产者代码

acks=all:ISR 全部写成功才返回

幂等生产者:防止重试导致重复消息

importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassKafkaProducerAckDemo{publicstaticvoidmain(String[]args){Properties props=newProperties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// 关键配置props.put(ProducerConfig.ACKS_CONFIG,"all");props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);props.put(ProducerConfig.RETRIES_CONFIG,3);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String,String>producer=newKafkaProducer<>(props);ProducerRecord<String,String>record=newProducerRecord<>("demo-topic","order-1","hello kafka");producer.send(record,(metadata,exception)->{if(exception==null){System.out.println("发送成功:"+metadata.topic()+"-"+metadata.partition());}else{System.out.println("发送失败:"+exception.getMessage());}});producer.close();}}

5.kafka的消费者确认机制

kafka没有显示ACK,而是用offest表示“确认”

(1)自动提交offset

enable.auto.commit=true
  • 定时提交 offset

  • 业务还没处理完,offset 已提交

  • 宕机 →消息丢失

(2)手动提交 offset(推荐)

enable.auto.commit=false

Java代码实现:

  • enable.auto.commit=false

  • 业务成功后再提交 offset

  • 提供至少一次的语义

importorg.apache.kafka.clients.consumer.*;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerManualCommitDemo{publicstaticvoidmain(String[]args){Properties props=newProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"demo-group");// 关键配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("demo-topic"));while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record:records){System.out.println("消费消息:"+record.value());// 处理业务逻辑}// 业务成功后,手动提交 offsetconsumer.commitSync();}}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/15 14:17:02

从零构建图Agent系统:基于DP-420文档的4步极速落地法

第一章&#xff1a;MCP DP-420 图 Agent 系统概述MCP DP-420 图 Agent 系统是一种专为分布式图数据处理与智能代理协同计算设计的架构平台&#xff0c;广泛应用于大规模知识图谱、网络拓扑分析及多智能体系统中。该系统通过模块化设计实现了图结构数据的高效存储、动态更新与并…

作者头像 李华
网站建设 2026/5/15 14:17:19

GRETNA 2.0.0脑网络分析5步实战指南:从数据到可视化

GRETNA 2.0.0脑网络分析5步实战指南&#xff1a;从数据到可视化 【免费下载链接】GRETNA A Graph-theoretical Network Analysis Toolkit in MATLAB 项目地址: https://gitcode.com/gh_mirrors/gr/GRETNA 您是否曾在脑功能网络分析中感到力不从心&#xff1f;面对海量的…

作者头像 李华
网站建设 2026/5/10 20:03:15

大内存通过mmap分配,释放后什么情况不能通过munmap直接归还OS

大于等于128KB的内存分配通常使用mmap&#xff0c;其释放一般能通过munmap直接归还操作系统&#xff0c;但在某些特定情况下&#xff0c;可能无法立即或完整地归还。下面这个表格汇总了这些情况及其原因。情况分类具体场景原因简析系统资源限制​进程的虚拟内存区域&#xff08…

作者头像 李华
网站建设 2026/5/15 7:42:05

系统发育树可视化新体验:TreeViewer功能全解析

系统发育树可视化新体验&#xff1a;TreeViewer功能全解析 【免费下载链接】TreeViewer Cross-platform software to draw phylogenetic trees 项目地址: https://gitcode.com/gh_mirrors/tr/TreeViewer 在生物信息学研究中&#xff0c;系统发育树的可视化是理解物种进化…

作者头像 李华
网站建设 2026/5/12 8:02:14

MediaPipe边缘部署终极指南:避坑手册与性能优化指南

为什么你的MediaPipe在Jetson上总是安装失败&#xff1f; 【免费下载链接】mediapipe Cross-platform, customizable ML solutions for live and streaming media. 项目地址: https://gitcode.com/gh_mirrors/me/mediapipe MediaPipe作为Google开源的多媒体机器学习框架…

作者头像 李华
网站建设 2026/5/9 20:11:18

智能赋能绿色共生:智慧园区的发展逻辑与实践路径

城市化进程加速推动下&#xff0c;城市空间的高效利用与可持续发展已成为核心议题。智慧园区作为现代城市发展的关键载体&#xff0c;正通过智能技术与绿色理念的深度融合&#xff0c;突破传统园区资源浪费、管理粗放的瓶颈&#xff0c;为城市高质量发展注入强劲动能。这种“智…

作者头像 李华