RabbitMQ消息发布和消费的确认机制
长沙大鹏 人气:3前言
新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门。趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端。园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案。
RabbitMQ 消息发布确认机制
默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的。BasicPublish方法的返回值是void。假设我们想对消息进行监控,针对消息发送失败后进行补发则需要一个消息确认机制来帮我们实现。
- 事务机制
- Confirm确认机制
上面是已知可通过RabbitMQ自带的特性实现消息确认机制的两种方式。
事务机制
事务机制依赖三个RabbitMQ提供的方法
- txSelect()
- txCommit()
- txRollback()
看名字大概知道意思了,特别是Commit和Rollback,使用方式和数据库的事务使用几乎一样,txSelect()声明事务的开始,txCommit()提交事务,txRollBack()执行提交失败后的回滚。
使用代码如下:
// 采取RabbitMQ事务方式传输消息 private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName, string routingKey, EnumRabbitExchangeType exchangeType, string message) { try { ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = mqConnection.HostName, UserName = mqConnection.UserName, Password = mqConnection.Password, Port = mqConnection.Port }; using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null); // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~ channel.QueueBind(queueName, exchangeName, routingKey); byte[] messagebuffer = Encoding.UTF8.GetBytes(message); try { channel.TxSelect(); channel.BasicPublish(exchangeName, routingKey, null, messagebuffer); //if (1 == 1) throw new Exception("没错!我是故意抛出异常的!看看最终队列是否写入了消息~"); channel.TxCommit(); } catch (Exception ex) { Rtx_Receive.Text = Rtx_Receive.Text + $"\r 异常产生时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},异常信息:{ex.Message}"; channel.TxRollback(); // TODO 进行补发OR其他逻辑处理 } Rtx_Receive.Text = Rtx_Receive.Text + $"\r 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"; } } } catch (Exception ex) { MessageBox.Show($"发送消息失败!{ex.Message}"); } }
需要注意的是 这里的事务其实也只能保证在执行BasicPublish方法后且TxCommit方法执行前但凡出现异常则回滚!
上面是什么意思呢?意思就是我只管消息发送到队列里,且在我定义的事务内没有出现异常,出现了异常则将发布的数据给撤销!
但是,如果事务也提交了,但是消息还是有可能不会送达队列里去!
比如,我将上面的代码改下
// 采取RabbitMQ事务方式传输消息 private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName, string routingKey, EnumRabbitExchangeType exchangeType, string message) { try { ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = mqConnection.HostName, UserName = mqConnection.UserName, Password = mqConnection.Password, Port = mqConnection.Port }; using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null); // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~ // channel.QueueBind(queueName, exchangeName, routingKey); byte[] messagebuffer = Encoding.UTF8.GetBytes(message); try { channel.TxSelect(); channel.BasicPublish(exchangeName, routingKey, null, messagebuffer); //if (1 == 1) throw new Exception("没错!我是故意抛出异常的!看看最终队列是否写入了消息~"); channel.TxCommit(); } catch (Exception ex) { Rtx_Receive.Text = Rtx_Receive.Text + $"\r 异常产生时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},异常信息:{ex.Message}"; channel.TxRollback(); // TODO 进行补发OR其他逻辑处理 } Rtx_Receive.Text = Rtx_Receive.Text + $"\r 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"; } } } catch (Exception ex) { MessageBox.Show($"发送消息失败!{ex.Message}"); } }
上面代码我将`channel.QueueBind(queueName, exchangeName, routingKey);` 这一行代码注释掉,不将routingKey进行绑定,然后在RabbitMQ管理页面将队列、交换机删除。如下图
再执行代码,发现队列是创建了,交换器也创建了,但是队列里没有数据!
当然,问题出在没有将队列和交换器以及routingKey进行绑定,我们的消息没有进入到队列的路由,最终导致了消息进入了所谓的“黑洞”。
所以上面的队列不是说能完全保证只要执行了TxRollback()我们的消息队列就一定会有数据!!!
Confirm确认机制
Confirm确认机制也很容易理解,它要求消息生产端(Producer)对消息发送后RabbitMQ服务端返回一个已接收的指令,Producer收到该指令则认为该消息已经发送成功。同时消费端(Consumer)也有同样的机制,在从RabbitMQ服务端接收到消息后,需要返回一个已处理的指令给服务端,服务端收到后则会认为该消息已被消费。
下面是采取Confirm确认机制后的发布消息代码
// 采取确认机制方式传输消息 private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName, string routingKey, EnumRabbitExchangeType exchangeType, string message) { try { ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = mqConnection.HostName, UserName = mqConnection.UserName, Password = mqConnection.Password, Port = mqConnection.Port }; using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null); channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null); // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~ channel.QueueBind(queueName, exchangeName, routingKey); byte[] messagebuffer = Encoding.UTF8.GetBytes(message); channel.ConfirmSelect(); // 启用服务器确认机制方式 channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer); if (channel.WaitForConfirms()) { Rtx_Receive.Text = $"\r 消息发送成功! 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"; } } } } catch (Exception ex) { MessageBox.Show($"发送消息失败!{ex.Message}"); } }
关键代码在于执行BasicPublish之前调用channel.ConfirmSelect()启用服务器确认,然后在发布后通过调用 WaitForConfirms()得到消息发布结果,
true则表示消息已发布到了队列里。 OK,现在试下,到服务器上删除掉队列、交换机信息。然后代码去掉绑定交换机和路由键后试下,看看是否和事务方式一样无法确认消息是否真正抵达队列。
删除队列和交换机,接下来更改代码,直接把上面的 channel.QueueBind(queueName, exchangeName, routingKey);注释掉,然后执行下,看看channel.WaitForConfirms()返回true还是false~
哈哈哈~执行结果是true 但是我们队列是不会有消息进来的,所以确认机制和事务机制对消息的发布是否抵达队列监控是一样的,没有说哪一种方式能绝对保证消息抵达了队列
针对消息提交到了指定交换机但是最终没有写入到队列的消息如何追踪
我们有一种方式可以捕获发布了消息但是该消息最终没有写入到队列的情况,我们需要注册IModel的BasicReturn事件,更新后的代码如下:
// 采取确认机制方式传输消息 private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName, string routingKey, EnumRabbitExchangeType exchangeType, string message) { try { ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = mqConnection.HostName, UserName = mqConnection.UserName, Password = mqConnection.Password, Port = mqConnection.Port }; using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null); channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null); // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~ channel.QueueBind(queueName, exchangeName, routingKey); byte[] messagebuffer = Encoding.UTF8.GetBytes(message); channel.ConfirmSelect(); // 启用服务器确认机制方式 channel.BasicReturn += Channel_BasicReturn; //mandatory为true表示交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用Basic.Return 命令将消息返回给生产者 channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer); if (channel.WaitForConfirms()) { Rtx_Receive.Text = $"\r 消息发送成功! 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"; } } } } catch (Exception ex) { MessageBox.Show($"发送消息失败!{ex.Message}"); } } /// <summary> /// 当消息发送不到队列时候触发 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e) { BeginInvoke(new Action(() => { Rtx_Receive.Text = $"\r 消息发送失败!"; })); }
消息发布确认机制结论
1、事务机制与确认机制都无法百分之百确认消息是否写入到了缓存,可以理解为两者都只能确认发布的动作是否成功~但是消息有无进入队列是无法给予客户端准确结果;
2、两者性能比较而言事务的性能损耗更大;
3、注册IModel的BasicReturn事件可以追踪到没有写入到队列的消息
RabbitMQ 消息消费确认机制
上面已经知道了如何对消息的发布进行确认,那么消费数据时候我们肯定也想在消费完成后确认该消息已经处理,希望队列对其进行删除。
而不是在我们的消费端程序未将消息处理后,队列就将其删除了。
在此之前说下RabbitMQ消费者对象的两种实现方式
- 继承DefaultBasicConsumer类
- 实例化EventingBasicConsumer对象
继承DefaultBasicConsumer方式
DefaultBasicConsumer是RabbitMQ.Client提供的一个消费者基类,该类实现了IBasicConsumer接口。
继承DefaultBasicConsumer类后可重写基类的部分方法来实现消息获取以及当前消费者各个状态变更的事件,本文的示例代码即采用这种方式实现消费者对象。
实例化EventingBasicConsumer对象
这种方式采取注册事件的方式接受消息发布者推送到队列的消息,代码如下:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"接受到消息为:{message}"); };
消费端确认消息已处理有两种方式
- 自动确认
- 手动确认
自动确认
自动确认我理解的就是服务端认为你接受到消息后即确认了,但是如果当你拿到消息后,依赖此消息的业务逻辑未处理完毕,但是却中途异常了的话,此消息也会消失掉!所以建议消费端采取手动确认!
手动确认
手动确认可以完美解决上面自动确认出现的问题,但是它也意味着我们开发者需要对确认的流程进行一个完整的闭环。即所有的消息在消费端获取到后必须有一个明确的结果返回给服务端(Broker)。 我们对消息的处理结果要么是确认处理,要么是拒绝该消息(返回给Broker,再分发给其他消费者)。如果我们没有对消息接收后进行任何反馈的话该条消息在队列的状态会变成Unacked 直到我们消费端AMQP连接中断后该消息状态又会变成Ready。状态为Unacked的消息会导致所有消费者都无法对该消息进行二次消费(包含当前消费者),所以此类消息越多则占用的内存资源也会越多。当消息变回Ready也会很烦人,因为我们已经对该消息进行过一次处理了,如若我们没有对消息进行校验则又会执行一遍。 所以手动确认必须执行回执!!!!!!
下面是手动确认消息的代码:
private void ReceiveMessage(RabbitMQConnectionDTO connectionDTO, string exchangeName, string queueName, string routtingKey) { try { var factory = new ConnectionFactory { HostName = connectionDTO.HostName, Password = connectionDTO.Password, UserName = connectionDTO.UserName, Port = connectionDTO.Port, }; UseDefaultBasicConsumerType(factory, queueName); //DirectAcceptExchangeEvent(factory, exchangeName, queueName, routtingKey); } catch (Exception ex) { Rtx_SendContext.Text = $"出现异常:{ex.Message}"; } } private void UseDefaultBasicConsumerType(ConnectionFactory factory, string queueName) { var connection = factory.CreateConnection(); _channel = connection.CreateModel(); // accept only one unack-ed message at a time // uint prefetchSize, ushort prefetchCount, bool global _channel.BasicQos(0, 1, false); //定义一个继承了DefaultBasicConsumer类的消费类(DefaultBasicConsumer是继承了IBasicConsumer接口的一个基类,里面存在许多可重写的方法) MessageReceiver messageReceiver = new MessageReceiver(_channel, (string msg, ulong deliveryTag) => { string key = Txt_Key.Text.Trim(); string keyNoReturn = Txt_KeyNoReturn.Text.Trim(); bool isExecFlag = false; if (!string.IsNullOrWhiteSpace(key) && msg.StartsWith(key)) // 这里要小心 如果只有当前1个消费者那你懂的~~~~~~ _channel.BasicReject(deliveryTag, requeue: true); //requeue表示消息被拒绝后是否重新放回queue中 else if (!string.IsNullOrWhiteSpace(keyNoReturn) && msg.StartsWith(keyNoReturn)) _channel.BasicReject(deliveryTag, requeue: false); //requeue表示消息被拒绝后是否重新放回queue中 else { _channel.BasicAck(deliveryTag, multiple: false); //确认已处理消息 multiple表示是否确认多条 isExecFlag = true; } BeginInvoke(new Action(() => { Rtx_SendContext.Text = Rtx_SendContext.Text + "\r" + $"处理标识{isExecFlag.ToString()} " + string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg); })); }); _channel.BasicConsume(queueName, false, messageReceiver); //不开启自动确认回执 }
这里的消费者对象通过继承DefaultBasicConsumer对象而实现
代码如下:
public class MessageReceiver : DefaultBasicConsumer { private readonly Logger _logger; private readonly Action<string, ulong> _action; public MessageReceiver(IModel channel, Action<string, ulong> action) { _action = action; _logger = LogManager.GetCurrentClassLogger(); } public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { string msg = Encoding.UTF8.GetString(body); _logger.Debug($"***************************Consuming Topic Message 时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}*********************************"); _logger.Debug(string.Concat("Message received from the exchange ", exchange)); _logger.Debug(string.Concat("Consumer tag: ", consumerTag)); _logger.Debug(string.Concat("Delivery tag: ", deliveryTag)); _logger.Debug(string.Concat("Routing tag: ", routingKey)); _logger.Debug(string.Concat("Message: ", msg)); _action?.Invoke(msg, deliveryTag); } /// <summary> /// 捕获通道连接的关闭事件 /// </summary> /// <param name="model"></param> /// <param name="reason"></param> public override void HandleModelShutdown(object model, ShutdownEventArgs reason) { _logger.Debug($"进入MessageReceiver.HandleModelShutdown方法"); base.HandleModelShutdown(model, reason); } public override void HandleBasicConsumeOk(string consumerTag) { _logger.Debug($"进入MessageReceiver.HandleBasicConsumeOk方法 consumerTag:{consumerTag}"); base.HandleBasicConsumeOk(consumerTag); } /// <summary> /// 删除队列 会进入 /// </summary> /// <param name="consumerTag"></param> public override void HandleBasicCancel(string consumerTag) { _logger.Debug($"进入MessageReceiver.HandleBasicCancel方法 consumerTag:{consumerTag}"); base.HandleBasicCancel(consumerTag); } }
上面有引用到的_logger和_action先不用管,重点是_channel.BasicReject方法和_channel.BasicAck方法。他们分别是代表拒绝消费和确认消费。
上面的代码是消费者消费数据后给予了Broker明确的回执,我们试下将回执代码注释掉后看下队列里的消息变成什么样子了。 先删除掉交换器和队列,然后再发布数据,看看消费数据后不回执的消息状态~
这是发布消息到队列后,Ready状态的消息为1条
接下来,我们去消费数据但是不进行回执确认,看看结果如何
如上图,还是那条数据,状态从Ready变成了Unacked,这时候是因为我的消费端应用还没关闭,AMQP的链接也还在。我们到任务管理器内将消费应用关闭~
关闭后又变成了Ready, 意味着我们再次开启消费端程序又可以从队列获取到之前的消息了~
我们将上面的回执代码部分注释取消,看看回执成功后队列内的消息状态是什么样?
可以看到回执确认后,我们的消息就从队列里移除了~
消息消费确认机制结论
1、自动确认虽然省代码但是可能会出现消息丢失业务未处理完毕的情况;
2、手动确认消息则是在获取到消息后,在没有返回回执前,消息会一致存储在队列
本文对应的代码已上传至Github,地址:https://github.com/QQ897878763/RabbitMQ_Sample
程序的运行截图如下:
加载全部内容