.NET Core 使用RabbitMQ
王瘦子Kawhi 人气:0RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。
RabbitMQ安装
请参考我的第一篇博客:
安装完成之后访问Web控制台
http://服务器ip:15672/
注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。guest仅限localhost访问,外网无法使用此账号!
.NET Core 使用RabbitMQ
通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/
定义生产者.
本文的代码生产者是基础的消息队列生产者,源代码请看我的开源项目 UWl.Admin.Core
public class RabbitServer: IRabbitMQ { private IConnection connection; private ConnectionFactory connectionFactory; public RabbitServer() { try { connectionFactory = new ConnectionFactory() { UserName = Appsettings.app(new string[] { "RabbitMQConfig", "UserName" }), Password = Appsettings.app(new string[] { "RabbitMQConfig", "Password" }), HostName = Appsettings.app(new string[] { "RabbitMQConfig", "HostName" }), AutomaticRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "AutomaticRecoveryEnabled" })), TopologyRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "TopologyRecoveryEnabled" })), }; } catch (Exception) { throw; } } public IConnection GetConnection() { return this.connectionFactory.CreateConnection(); } /// <summary> /// RabbitMQ指定队列名称模式发送消息 /// </summary> /// <param name="queuename">队列名字</param> /// <param name="obj">传输数据</param> public void SendData(string queuename, object obj) { connection = GetConnection(); if (obj == null) return; if (connection == null) return; if (queuename.IsNullOrEmpty()) return; using (connection) { using (var channel= connection.CreateModel()) { //声明一个队列 //队列模式 一共有四种 channel.QueueDeclare(queuename, false, false, false, null); //第一个参数:预计大小,第二个参数每次读取几个,第三个参数是否本地 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //交付模式 var prop = channel.CreateBasicProperties(); // 非持久性(1)或持久性(2)。 prop.DeliveryMode = 2; //将对象转化为json字符串 var json = JsonConvert.SerializeObject(obj); //将字符串转换为二进制 var bytes= Encoding.UTF8.GetBytes(json); //开始传送 channel.BasicPublish("", queuename, prop,bytes); } } } }
定义消费者.
消费者我是使用.Net Core控制台程序来写的源代码放到了百度网盘请自行下载 RebbitMQDemo 链接: http://pan.baidu.com/s/1n9CaSiAuB9t63Fh_YIU78A 提取码: 3939
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "wzw",
Password = "wzw",
HostName = "localhost"
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//接收到的消息处理事件
EventingBasicConsumer Recipient = new EventingBasicConsumer(channel);
Recipient.Received += (ch, ea) =>
{
var RecipientMsg = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"后台处理方法收到消息:{RecipientMsg}");
//确认该消息已被处理
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"消息已经处理【{ea.DeliveryTag}】");
};
channel.BasicConsume("hello", false, Recipient);
Console.WriteLine("后台处理方法已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
RabbitMQ消费失败的处理
RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。
使用RabbitMQ的Exchange
前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
加载全部内容