亲宝软件园·资讯

展开

.NET Core 使用RabbitMQ

王瘦子Kawhi 人气:0

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

RabbitMQ安装

请参考我的第一篇博客:

安装完成之后访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。guest仅限localhost访问,外网无法使用此账号!

 

 .NET Core 使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者.

 

 本文的代码生产者是基础的消息队列生产者,源代码请看我的开源项目 UWl.Admin.Core

public class RabbitServer: IRabbitMQ
    {
        private IConnection connection;
        private ConnectionFactory connectionFactory;
        public RabbitServer()
        {
            try
            {
                connectionFactory = new ConnectionFactory()
                {
                    UserName = Appsettings.app(new string[] { "RabbitMQConfig", "UserName" }),
                    Password = Appsettings.app(new string[] { "RabbitMQConfig", "Password" }),
                    HostName = Appsettings.app(new string[] { "RabbitMQConfig", "HostName" }),
                    AutomaticRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "AutomaticRecoveryEnabled" })),
                    TopologyRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "TopologyRecoveryEnabled" })),
                };
            }
            catch (Exception)
            {
                throw;
            }
        }

        public IConnection GetConnection()
        {
            return this.connectionFactory.CreateConnection();
        }
        /// <summary>
        /// RabbitMQ指定队列名称模式发送消息
        /// </summary>
        /// <param name="queuename">队列名字</param>
        /// <param name="obj">传输数据</param>
        public void SendData(string queuename, object obj)
        {
            connection = GetConnection();
            if (obj == null)
                return;
            if (connection == null)
                return;
            if (queuename.IsNullOrEmpty())
                return;
            using (connection)
            {
                using (var channel= connection.CreateModel())
                {
                    //声明一个队列    //队列模式   一共有四种
                    channel.QueueDeclare(queuename, false, false, false, null);
                    //第一个参数:预计大小,第二个参数每次读取几个,第三个参数是否本地
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //交付模式
                    var prop = channel.CreateBasicProperties();
                    // 非持久性(1)或持久性(2)。
                    prop.DeliveryMode = 2;
                    //将对象转化为json字符串
                    var json = JsonConvert.SerializeObject(obj);
                    //将字符串转换为二进制
                    var bytes= Encoding.UTF8.GetBytes(json);
                    //开始传送
                    channel.BasicPublish("", queuename, prop,bytes);
                }
            }
        }
    }
定义消费者.

消费者我是使用.Net Core控制台程序来写的源代码放到了百度网盘请自行下载 RebbitMQDemo 链接: http://pan.baidu.com/s/1n9CaSiAuB9t63Fh_YIU78A 提取码: 3939
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "wzw",
                Password = "wzw",
                HostName = "localhost"
            };
            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();

            //接收到的消息处理事件
            EventingBasicConsumer Recipient = new EventingBasicConsumer(channel);
            Recipient.Received += (ch, ea) =>
            {
                var RecipientMsg = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine($"后台处理方法收到消息:{RecipientMsg}");
                //确认该消息已被处理
                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine($"消息已经处理【{ea.DeliveryTag}】");
            };
            channel.BasicConsume("hello", false, Recipient);
            Console.WriteLine("后台处理方法已启动");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

加载全部内容

相关教程
猜你喜欢
用户评论