亲宝软件园·资讯

展开

RabbitMQ消费端ACK NACK及重回队列机制详解

刨红薯的小羊竿尔 人气:0

ACK和NACK

当我们使用RabbitMQ时用于网络异常,业务处理异常或者业务错误导致消息无法立即消费时,在这种情况下,传输中的信息将无法正常投递——它们需要被重新投递。Acknowledgements机制让服务器和客户端知道何时需要重新投递。

当设置autoACK=false 时,就可以使用手工ACK。 其实手工方式包括了手工ACK、手工NACK。

使用方式

1、basicNack

Consumer消费时,若由于业务异常,可手工 NACK 记录日志,然后进行补偿: basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息第三个参数是否requeue,如果requeue 参数设置为true ,则RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者, 如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

multiple 参数设置为false 则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack 和basicReject 方法一样;multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

2、basicAck

如果由于服务器宕机等严重问题,就需要手工 ACK 保障Con消费成功:deliveryTag : 每次接收消息+1,可以做此消息处理通道的名字。

void basicAck(long deliveryTag, boolean multiple)

3、basicReject

basic.Reject拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。该方法reject后,该消费者还是会消费到该条被reject的消息。reject一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令。

void basicReject(long deliveryTag, boolean requeue);

4、basicRecover

basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。

channel.basicRecover(true);

消费端的重回队列

重回队列针对没有处理成功的消息,将消息重新投递给Broker。
重回队列会把消费失败的消息重新添加到队列尾端,供Consumer重新消费。
一般在实际应用中,都会关闭重回队列,即设置为false。

在确认ack后再把消息通过basicPublish到队列尾部,防止正常消息堆积。

channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));

实际运用

@RabbitListener(queues = "${rabbitmq.one-queue}",
        containerFactory = "rabbitListenerContainerFactory")
@RabbitHandler
public void doRmOnduty(Message message, Channel channel) {
    try {
        String body = new String(message.getBody());
        //log.info("消息处理:{}",body);
    } catch (Exception e) {
        //消费异常,设置重回队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    } finally {
        //最终确认消息消费成功
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

ACK机制可以保证Con拉取到了消息,若处理失败了,则队列中还有这个消息,仍然可以给Con处理。ack机制是 Con 告诉 Broker 当前消息是否成功消费,至于 Broker 如何处理 NACK,取决于 Con 是否设置了 requeue:如果 requeue=false, 则NACK 后 Broker 还是会删除消息的。

但一般处理消息失败都是因为代码逻辑出bug,即使队列中后来仍然保留该消息,然后再给Con消费,依旧报错。 当然,若一台机器宕机,消息还有,还可以给另外机器消费,这种情景下 ACK 很有用。

如果不使用 ACK 机制,直接把出错消息存库,便于日后查bug或重新执行。

加载全部内容

相关教程
猜你喜欢
用户评论