RabbitMQ消费端ACK NACK及重回队列机制详解
刨红薯的小羊竿尔 人气:0ACK和NACK
当我们使用RabbitMQ时用于网络异常,业务处理异常或者业务错误导致消息无法立即消费时,在这种情况下,传输中的信息将无法正常投递——它们需要被重新投递。Acknowledgements机制让服务器和客户端知道何时需要重新投递。
当设置autoACK=false 时,就可以使用手工ACK。 其实手工方式包括了手工ACK、手工NACK。
- 手工 ACK 时,会发送给Broker一个应答,代表消息处理成功,Broker就可回送响应给Pro;
- NACK 则表示消息处理失败,如果设置了重回队列,Broker端就会将没有成功处理的消息重新发送。
使用方式
1、basicNack
Consumer消费时,若由于业务异常,可手工 NACK 记录日志,然后进行补偿: basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,如果requeue 参数设置为true ,则RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者, 如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
multiple 参数设置为false 则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack 和basicReject 方法一样;multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。
- 参数1: 消息
- 参数2: 是否应用于多消息
- 参数3: 是否重新放回队列,否则丢弃或者进入死信队列
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
2、basicAck
如果由于服务器宕机等严重问题,就需要手工 ACK 保障Con消费成功:deliveryTag
: 每次接收消息+1,可以做此消息处理通道的名字。
void basicAck(long deliveryTag, boolean multiple)
3、basicReject
basic.Reject拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。该方法reject后,该消费者还是会消费到该条被reject的消息。reject一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令。
- 参数1: 消息
- 参数2: 是否重新放回队列,否则丢弃或者进入死信队列
void basicReject(long deliveryTag, boolean requeue);
4、basicRecover
basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
channel.basicRecover(true);
消费端的重回队列
重回队列针对没有处理成功的消息,将消息重新投递给Broker。
重回队列会把消费失败的消息重新添加到队列尾端,供Consumer重新消费。
一般在实际应用中,都会关闭重回队列,即设置为false。
在确认ack后再把消息通过basicPublish到队列尾部,防止正常消息堆积。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));
实际运用
@RabbitListener(queues = "${rabbitmq.one-queue}", containerFactory = "rabbitListenerContainerFactory") @RabbitHandler public void doRmOnduty(Message message, Channel channel) { try { String body = new String(message.getBody()); //log.info("消息处理:{}",body); } catch (Exception e) { //消费异常,设置重回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } finally { //最终确认消息消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
ACK机制可以保证Con拉取到了消息,若处理失败了,则队列中还有这个消息,仍然可以给Con处理。ack机制是 Con 告诉 Broker 当前消息是否成功消费,至于 Broker 如何处理 NACK,取决于 Con 是否设置了 requeue:如果 requeue=false, 则NACK 后 Broker 还是会删除消息的。
但一般处理消息失败都是因为代码逻辑出bug,即使队列中后来仍然保留该消息,然后再给Con消费,依旧报错。 当然,若一台机器宕机,消息还有,还可以给另外机器消费,这种情景下 ACK 很有用。
如果不使用 ACK 机制,直接把出错消息存库,便于日后查bug或重新执行。
加载全部内容