.NETCore RabbitMQ
我赢了算我输 人气:0前言
此文章用来记录自己学习延时队列过程的文章,并用.NET这两种方式实现了简单的Demo。
延时队列的应用场景 应用下单后,30分钟没有支付的话,则自动取消订单活动开始前30分钟,提醒参赛者参加活动。活动结束后,30分钟后提醒未进行评价的参赛人员进行评价…
上述的场景都可以使用延时队列进行对应的处理。
上面的场景虽说可以通过定时器也可以处理,但有点浪费资源, 而上述的场景时间是不定的,例如有两个活动需要提醒参赛者参加,一个是7点开始 ,另一个是8点开始,那么触发处理的一个是6点半,一个是7点半。
实现延时队列的两种方式
使用Rabbitmq实现延时队列可以让消息持久化,也支持分布式
缺点 | |
---|---|
第一种 | 第一种方式的缺陷以及解决方案 |
第二种 | 这个插件的当前设计并不真正适合具有大量延迟消息(例如成百上千或数百万)的场景。详情信息 |
利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key
实现需要创建两对交换机和队列,其中需要对其中一对的队列进行设置x-dead-letter-exchange和x-dead-letter-routing-key属性,属性指定转发到另一对的交换机,
随后实现流程图如下:
.NETCore实现方式
项目:.NET Core 控制台项目
install-package RabbitMQ.Client
生产者代码:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //创建连接 var connection = connectionFactory.CreateConnection(); //创建通道 var channl = connection.CreateModel(); //指定队列的x-dead-letter-exchange和x-dead-letter-routing-key Dictionary<string, object> queueArgs = new Dictionary<string, object>() { { "x-dead-letter-exchange","exchange.business.test" }, {"x-dead-letter-routing-key","businessRoutingkey" } }; //延时的交换机和队列绑定 channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null); channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs); channl.QueueBind("queue.business.dlx", "exchange.business.dlx", ""); //业务的交换机和队列绑定 channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null); channl.QueueDeclare("queue.business.test", true, false, false, null); channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null); Console.WriteLine("生产者开始发送消息"); while (true) { string message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); var properties = channl.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "5000"; //发送一条延时5秒的消息 channl.BasicPublish("exchange.business.dlx", "", properties, body); }
消费者
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //创建连接 var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //给消费时添加一个委托 consumer.Received += (obj, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); //打印消费的消息 Console.WriteLine(message); channel.BasicAck(ea.DeliveryTag, false); }; //消费queue.business.test队列的消息 channel.BasicConsume("queue.business.test", false, consumer); Console.ReadKey(); channel.Dispose(); connection.Close();
实现效果:
rabbitmq通过安装插件的形式实现(推荐)
使用rabbitmq_delayed_message_exchange
插件提供的x-delayed-message
类型的交换机
下载插件的地址:https://www.rabbitmq.com/community-plugins.html
选中rabbitmq_delayed_message_exchange插件
该插件使用只需要声明交换机的时候,指定x-delayed-message
类型,然后添加x-delayed-type
参数即可
.NET Core 实现
生产者
ConnectionFactory connectionFactory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); Dictionary<string, object> exchangeArgs = new Dictionary<string, object>() { {"x-delayed-type","direct" } }; //指定x-delayed-message 类型的交换机,并且添加x-delayed-type属性 channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs); channel.QueueDeclare("plug.delay.queue", true, false, false, null); channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay"); var properties = channel.CreateBasicProperties(); Console.WriteLine("生产者开始发送消息"); Dictionary<string, object> headers = new Dictionary<string, object>() { {"x-delay","5000" } }; properties.Persistent = true; properties.Headers = headers; while (true) { string message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body); }
消费者:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //创建连接 var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (obj, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(message); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume("plug.delay.queue", false, consumer); Console.ReadKey(); channel.Dispose(); connection.Close();
实现效果:
第一种方式的缺陷以及解决方案
如果存在A、B消息进入了队列中,A在前,B在后,如果B消息的过期时间比A的过期时间要早,消费的时候,并不会先消费B,再消费A,而是B会等A先消费,即使A要晚过期
举例
生产者代码修改成如下:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //创建连接 var connection = connectionFactory.CreateConnection(); //创建通道 var channl = connection.CreateModel(); Dictionary<string, object> queueArgs = new Dictionary<string, object>() { { "x-dead-letter-exchange","exchange.business.test" }, {"x-dead-letter-routing-key","businessRoutingkey" } }; //延时的交换机和队列绑定 channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null); channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs); channl.QueueBind("queue.business.dlx", "exchange.business.dlx", ""); //业务的交换机和队列绑定 channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null); channl.QueueDeclare("queue.business.test", true, false, false, null); channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null); string message1 = "Hello Word!1"; string message2 = "Hello Word!2"; var body1 = Encoding.UTF8.GetBytes(message1); var body2 = Encoding.UTF8.GetBytes(message2); var properties = channl.CreateBasicProperties(); properties.Persistent = true; //先发送过期时间5秒的消息 properties.Expiration = "5000"; channl.BasicPublish("exchange.business.dlx", "", properties, body2); //再发送过期时间3秒的消息 properties.Expiration = "3000"; channl.BasicPublish("exchange.business.dlx", "", properties, body1);
结果:
这里先发了延时20秒的A消息,然后又发了延时10秒的B消息,但是最终结果并不是先消费了B消息,而是等A消息过期后,立刻再去消费B。
这个会影响什么业务呢?好比两个C、D活动,C活动开始时间是7点,D活动开始时间是5点,那么D活动提醒需要等到C活动提醒后,才会立刻提醒,这明显不符合我们的业务需求。
解决方案 每个活动都是单独的创建自己的交换机和队列使用第二种实现方式,即使用插件的形式。
第一种不太现实,因为如果活动多的话,则会创建很多的队列,而且只会使用一次。
业务上还是推荐使用插件的实现方式。
第二种方式的效果
github地址:
https://github.com/MDZZ3/RabbitmqDelay
加载全部内容