基于RabbitMQ的Rpc框架
珞樱树下 人气:0参考文档:https://www.cnblogs.com/ericli-ericli/p/5917018.html
MQ的使用场景大概包括解耦,提高峰值处理能力,送达和排序保证,缓冲等。
MQ概述
消息队列技术是分布式应用间交换信息的一种技术。
消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。
通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。
MQ主要作用是接收和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员。
MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息。
RabbitMQ术语
生产者:
消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫做生产者,用P表示
消费者:
生产者“生产”出消息后,最终由谁消费呢?等待接受消息的应用程序,我们称之为消费者(Consuming ),用C表示
队列:
消息只能存储在队列(queue )中。尽管消息在rabbitMQ和应用程序间流通,但是队列却是存在于RabbitMQ内部。
一个队列不受任何限制,它可以存储你想要存储的消息量,它本质上是一个无限的缓冲区。
多个生产者可以向同一个队列发送消息,多个消费者可以尝试从同一个消息队列中接收数据。
一个队列像下面这样(上面是它的队列名称)
注意:
生产者、消费者、中间件不必在一台机器上,实际应用中也是绝大多数不在一起的。我们可以用一张图表示RabbitMQ的构造:
注:此图片摘自于百度百科RabbitMQ。
RabbitMQ 实现RPC
(RPC) Remote Procedure Call Protocol 远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。
在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器。但是在做开发时候往往要用到其它团队的方法,因为已经有了实现。但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效。RPC协议定义了规划,其它的公司都给出了不同的实现。比如微软的wcf,以及现在火热的WebApi。
在RabbitMQ中RPC的实现也是很简单高效的,现在我们的客户端、服务端都是消息发布者与消息接收者。
首先客户端通过RPC向服务端发出请求
我这里有一堆东西需要你给我处理一下,correlation_id:这是我的请求标识,erply_to:你处理完过后把结果返回到这个队列中。
服务端拿到了请求,开始处理并返回
correlation_id:这是你的请求标识 ,原封不动的给你。 这时候客户端用自己的correlation_id与服务端返回的id进行对比。是我的,就接收。
在我们发布消息的时候,会调用channel对象的BasicPublish
方法,这个方法中有一个IBasicProperties
的参数basicProperties
。
在这对象中,有一个ReplyTo
属性,我们可以将生产者监听的消息队列名称存放在里面。当消费者程序接收到这条消息的时候,就可以在Receive事件的ea对象中获取ReplyTo属性的值
var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
那么当消息生产者接收到消息消费者任务完成的消息之后,该如何确定完的是哪一个任务呢?
在现实情况,消息生产者通常会发出多个任务,多个消息消费者分别进行不同的任务,这时候我们就需要知道是哪个消息消费者完成了任务。
当消息生产者调用channel对象的BasicPublish
方法发送消息时,IBasicProperties
对象除了可以帮助我们传递消息生产者监听的消息队列名,还可以帮我们传递一个CorrelationId
(相关Id),当发送任务消息的时候,我们给每个任务消息定义一个唯一的相关Id, 并存储在IBasicProperties
对象的CorrelationId
属性中。
var properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueueName; properties.CorrelationId = Guid.NewGuid().ToString();
这样消息消费者在接收到任务消息时,可以从Receive的ea参数中获取CorrelationId
。当任务完成时,再将保存有这个CorrelationId
的任务完成消息发送到消息生产者关注的消息队列中, 消息生产者就可以知道是哪个任务完成了
一些繁琐的细节rabbitmq已经为我们封装了,简单的SimpleRpcServer与SimpleRpcClient让Rpc实现的更为方便。
开发指南:RabbitMQ .NET/C# Client API Guide
API文档:https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.2.2/rabbitmq-dotnet-client-3.2.2-client-htmldoc/html/index.html
Client
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "RpcQueue")); var prop = channel.CreateBasicProperties(); prop.CorrelationId = Guid.NewGuid().ToString(); IBasicProperties outProp; var msg = client.Call(prop, Encoding.UTF8.GetBytes(args[0]), out outProp); if (prop.CorrelationId == outProp.CorrelationId) { Console.WriteLine($"Task {prop.CorrelationId} completed."); Console.WriteLine(Encoding.UTF8.GetString(msg)); } } } }
Server
- 创建MySimpleRpcServer类,继承自SimpleRpcServer类
- HandleSimpleCall方法里添加回调返回值
- ProcessRequest方法为任务处理方法
- 使用server.MainLoop() 启动服务
public class MySimpleRpcServer : SimpleRpcServer { public MySimpleRpcServer(Subscription subscription) : base(subscription) { } public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} Task {requestProperties.CorrelationId} Completed."); } /// <summary> /// 进行处理 /// </summary> /// <param name="evt"></param> public override void ProcessRequest(BasicDeliverEventArgs evt) { Console.WriteLine("[x] Received {0}", Encoding.UTF8.GetString(evt.Body)); Thread.Sleep(4000); base.ProcessRequest(evt); } }
Program.cs
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare("RpcQueue", true, false, false, null); SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "RpcQueue")); rpc.MainLoop(); Console.ReadKey(); } } }
参考文档:RabbitMQ学习笔记(六) RPC 含手工实现RabbitMQ的RPC、使用SimpleRpcClient类和SimpleRpcServer类实现RPC的简单示例。
简易RPC框架-心跳与重连机制
参考文档:简易RPC框架-心跳与重连机制
心跳
就是告诉其它人自己还活着。在简易RPC框架中,采用的是TCP长连接,为了确保长连接有效,就需要客户端与服务端之间有一种通知机制告知对方的存活状态。
如何实现
客户端发送心跳消息
在状态空闲的时候定时给服务端发送消息类型为PING消息。
服务端接收心跳消息
捕获通道空闲状态事件,如果接收客户端PING消息,则发送PONG消息给服务端。如果在一定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。
客户端管理可用连接
由于服务端会因为长时间接收不到服务端的PING消息而关闭通道,这就导致缓存在客户端的连接的可用性发生变化。需要将不可用的从可用列表中转移出去,并对不可用连接进行处理,比如直接丢弃或者是重新连接。
预备知识
ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter作用是类似的,ChannelPipeline可以理解成handle的容器,里面可以被注册众多处理不同业务功能的事件处理器,比如:
- 编码
- 解码
- 心跳
- 权限
- 加密
- 解密
- 业务代码执行
- ......
具体实现
空闲状态处理器
可以利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类
- 读超时,一定时间内没有从通道内读取到任何数据
- 写超时,一定时间内没有从通道内写入任何数据
- 读写超时,一定时间内没有从通道内读取或者是写入任何数据
客户端加入空闲状态处理器
客户端捕获读写超时,如果事件触发就给服务端发送PING消息。
服务端加入空闲状态处理器
服务端只需要捕获读超时即可,当读超时触发后就关闭通道。
为什么在空闲状态才发送心跳消息
在正常客户端与服务端有交互的情况下,说明双方都在正常工作不需要额外的心跳来告知对方的存活。只有双方在一定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提升效率的做法。
抽象心跳处理器
创建AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:
- 如果是PING消息就发送PONG消息给客户端
- 如果收到的是PONG消息,则直接打印消息说明客户端已经成功接收到服务端返回的PONG消息
- 如果是其它类型的消息,则通知下一个处理器处理消息
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { if(!(msg instanceof RpcMessage)){ channelHandlerContext.fireChannelRead(msg); return; } RpcMessage message=(RpcMessage)msg; if(null==message||null==message.getMessageHeader()){ channelHandlerContext.fireChannelRead(msg); return; } if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){ logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody()); } else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){ this.sendPong(channelHandlerContext); } else { channelHandlerContext.fireChannelRead(msg); } }
空闲状态事件,可以根据不同的状态做不同的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: this.handleReaderIdle(ctx); break; case WRITER_IDLE: this.handleWriterIdle(ctx); break; case ALL_IDLE: this.handleAllIdle(ctx); break; default: break; } } }
客户端心跳处理器
继承抽象心跳处理器,并重写事件发送PING消息。
public class ClientHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleAllIdle(ChannelHandlerContext ctx) { this.sendPing(ctx); } }
服务端心跳处理器
继承抽象心跳处理器,并重写事件关闭通道。
public class ServerHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleReaderIdle(ChannelHandlerContext ctx) { logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel"); ctx.close(); } }
客户端ChannelPipeline中加入心跳处理器
比如5秒内未写入或者读取通道数据就触发超时事件。
.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));
服务端ChannelPipeline中加入心跳处理器
比如10秒未接收到通道消息就触发读超时事件。
.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))
客户端消息示例
正常情况下心跳消息显示如下图所示,消息的内容可以根据自己的情况自行定义。
客户端下线消息示例
停止客户端程序,然后服务端读超时事件触发,并关闭通道。
客户端可用连接管理
由于上述的服务端心跳处理器,在触发读超时后会关闭通信管道,这导致客户端缓存的连接状态会出现不可用的情况,为了让客户端一直只能取到可用连接就需要对从缓存中获取到的连接做状态判断,如果可用直接返回,如果不可用则将连接从可用列表中删除然后取下一个可用连接。
修改获取连接方法
通过channel的isActive属性可以判断连接是否可用,如果不可以做删除并重新获取的操作。
public RpcClientInvoker getInvoker() { // ... int index = loadbalanceService.index(size); RpcClientInvoker invoker= RpcClientInvokerCache.get(index); if(invoker.getChannel().isActive()) { return invoker; } else { RpcClientInvokerCache.removeHandler(invoker); logger.info("invoker is not active,so remove it and get next one"); return this.getInvoker(); } }
后台启动任务处理不可用连接
启动一个每隔5秒执行一次任务的线程,定时取出不可用连接,然后重连,并将不可用连接删除。
这里我处理的重连是直接丢弃原有不可用连接,然后重新创建新连接。
private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class); static { executorService.schedule(new Runnable() { @Override public void run() { while (true) { List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers(); if (!CollectionUtils.isEmpty(notConnectedHandlers)) { for (RpcClientInvoker invoker : notConnectedHandlers) { RpcClientInvokerManager.getInstance(referenceConfig).connect(); } RpcClientInvokerCache.clearNotConnectedHandler(); } } } }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS); }
本文源码
https://github.com/jiangmin168168/jim-framework
其他资料:
Docker安装RabbitMQ,RabbitMQ Management使用
加载全部内容