1.生产者和消费者确认机制
确认机制的本质:明确告诉对方:消息已经安全到达/已经被成功处理
如果没有确认机制:
生产者不知道消息有没有发成功
消费者不知道消息有没有处理成功
系统只能“猜”,必然丢消息
在消息队列中,生产者和消费者确认机制是确保消息可靠传递的关键。主要有两种确认机制:
生产者确认:当消息成功发送到交换器后,交换器会返回确认给生产者,确保消息的发送成功。
消费者确认:当消息被成功消费后,消费者会向交换器发送确认回执,确保消息的消费完成。
这两种机制有助于解决消息丢失的问题,确保在分布式系统中消息的可靠性和安全性
2.RabbitMQ的生产者确认机制
一般情况下,只要生产者与MQ之间的网络连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
MQ内部处理消息的进程发生了异常
生产者发送消息到达MQ后未找到
Exchange生产者发送消息到达MQ的
Exchange后,未找到合适的Queue,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
具体如图所示:
总结如下:
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
其它情况都会返回NACK,告知投递失败
其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
- 开启生产者确认
spring:rabbitmq:publisher-confirm-type:correlated# 开启publisher confirm机制,并设置confirm类型publisher-returns:true# 开启publisher return机制publisher-confirm-type 有三种模式:
none:关闭 confirm 机制
simple:以同步阻塞等待的方式返回 MQ 的回执消息
correlated:以异步回调方式的方式返回 MQ 的回执消息
每个 RabbitTemplate 只能配置一个 ReturnCallback
Java代码实现:
① Maven 依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>② 生产者代码(Confirm + Return)
importcom.rabbitmq.client.*;publicclassRabbitProducerConfirmDemo{privatestaticfinalStringEXCHANGE="demo.exchange";privatestaticfinalStringROUTING_KEY="demo.key";publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();// 开启 confirm 模式channel.confirmSelect();// Confirm 回调(消息是否到达 Broker)channel.addConfirmListener((deliveryTag,multiple)->System.out.println("消息发送成功,tag="+deliveryTag),(deliveryTag,multiple)->System.out.println("消息发送失败,tag="+deliveryTag));// Return 回调(路由失败)channel.addReturnListener(returnMessage->System.out.println("路由失败:"+newString(returnMessage.getBody())));channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,true);Stringmsg="hello rabbit confirm";channel.basicPublish(EXCHANGE,ROUTING_KEY,true,// mandatoryMessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());System.out.println("消息已发送");}}confirmSelect():开启生产者确认
Confirm:保证消息写入 Broker
Return:保证消息路由正确
3.RabbitMQ的消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:如果是业务异常,会自动返回
nack;如果是消息处理或校验异常,自动返回
reject;
返回Reject的常见异常有:
Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
java.lang.NoSuchMethodException: Added in version 1.6.3.
java.lang.ClassCastException: Added in version 1.6.3.
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:rabbitmq:listener:simple:acknowledge-mode:manual # 手动处理Java代码实现:
RabbitMQ消费者默认情况下是自动确认,这里就不给代码演示了
RabbitMQ 消费者确认(手动 ACK),先处理业务,再ACK
importcom.rabbitmq.client.*;publicclassRabbitConsumerAckDemo{privatestaticfinalStringQUEUE="demo.queue";publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();channel.queueDeclare(QUEUE,true,false,false,null);// 手动 ACKbooleanautoAck=false;channel.basicConsume(QUEUE,autoAck,(consumerTag,message)->{try{Stringmsg=newString(message.getBody());System.out.println("收到消息:"+msg);// 模拟业务处理if(msg.contains("error")){thrownewRuntimeException("业务异常");}// 业务成功 → ACKchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch(Exceptione){// 业务失败 → NACK,重新入队 or DLQchannel.basicNack(message.getEnvelope().getDeliveryTag(),false,true// 是否重新入队);}},consumerTag->{});}}4.kafka的生产者确认机制
Kafka 的确认由acks参数控制。
ACK=0:这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器。
ACK=1:这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。
ACK=-1:这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。
Java代码实现:
① Maven 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>② Kafka 生产者代码
acks=all:ISR 全部写成功才返回幂等生产者:防止重试导致重复消息
importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassKafkaProducerAckDemo{publicstaticvoidmain(String[]args){Properties props=newProperties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// 关键配置props.put(ProducerConfig.ACKS_CONFIG,"all");props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);props.put(ProducerConfig.RETRIES_CONFIG,3);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String,String>producer=newKafkaProducer<>(props);ProducerRecord<String,String>record=newProducerRecord<>("demo-topic","order-1","hello kafka");producer.send(record,(metadata,exception)->{if(exception==null){System.out.println("发送成功:"+metadata.topic()+"-"+metadata.partition());}else{System.out.println("发送失败:"+exception.getMessage());}});producer.close();}}5.kafka的消费者确认机制
kafka没有显示ACK,而是用offest表示“确认”
(1)自动提交offset
enable.auto.commit=true定时提交 offset
业务还没处理完,offset 已提交
宕机 →消息丢失
(2)手动提交 offset(推荐)
enable.auto.commit=falseJava代码实现:
enable.auto.commit=false业务成功后再提交 offset
提供至少一次的语义
importorg.apache.kafka.clients.consumer.*;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerManualCommitDemo{publicstaticvoidmain(String[]args){Properties props=newProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"demo-group");// 关键配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("demo-topic"));while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record:records){System.out.println("消费消息:"+record.value());// 处理业务逻辑}// 业务成功后,手动提交 offsetconsumer.commitSync();}}}