亲宝软件园·资讯

展开

Netty是如何处理新连接接入事件的?

dashuai的博客 人气:3

 

更多技术分享可关注我

前言

前面的分析从Netty服务端启动过程入手,一路走到了Netty的心脏——NioEventLoop,又总结了Netty的异步API和设计原理,现在回到Netty服务端本身,看看服务端对客户端新连接接入的处理是怎么样的过程。

原文:​Netty是如何处理新连接接入事件的?

Java NIO处理新连接的编码模板

首先,对于新连接接入,从NIO层面有一个宏观的印象:

1、通过I/O多路复用器——Selector检测客户端新连接

对应到Netty,新连接通过服务端的NioServerSocketChannel(底层封装的JDK的ServerSocketChannel)绑定的I/O多路复用器(由NioEventLoop线程驱动)轮询OP_ACCEPT(=16)事件

2、轮询到新连接,就创建客户端的Channel

对应到Netty就是NioSocketChannel(底层封装JDK的SocketChannel)

3、为新连接分配绑定新的Selector

对应到Netty,就是通过线程选择器,从它的第二个线程池——worker线程池中挑选一个NIO线,在这个线程中去执行将JDK的SocketChannel注册到新的Selector的流程,将Netty封装的NioSocketChannel作为附加对象也绑定到该Selector

4、向客户端Channel绑定的Selector注册I/O读、或者写事件

对应到Netty,就是默认注册读事件,因为Netty的设计理念是读优先。以后本条Channel的读写事件就由worker线程池中的NIO线程管理

以上4步,其实就是对下面一段JDK NIO demo的抽象和封装,并解决了一些bug的过程,如下:

接下来的几篇文章会逐步拆解每个步骤,并学习Netty的设计思路。

简单复习Netty的多线程Reactor架构

前面分析过NioEventLoopGroup和线程池对应,NioEventLoop实例和NIO线程对应,一个EventLoop实例将由一个永远都不会改变的Thread驱动其内部的run方法(和Runnable的run不是一个)。

简单说,Netty服务端创建的boss和worker就是两个线程池,对于一个服务器的端口,bossGroup里只会启动一个NIO线程用来处理该端口上的客户端新连接的检测和接入流程。

具体的说,Netty会在服务端的Channel的pipeline上,默认创建一个新连接接入的handler,只用于服务端接入客户端新连接,而workerGroup里有多个NIO线程(默认2倍的CPU核数个),负责已建立的Channel上的读写事件的检测、注册或者处理,等操作。当boss线程池的那一个NIO线程检测到新连接后就可以稍做休息(或者继续检测处理新连接),此时worker线程池就开始忙碌,如下图所示:

细节回顾可以参考:Netty的线程调度模型分析(1)

下面开始总结,boss线程和worker线程池之间是如何配合的。

再看JDK的select方法

在总结之前,个人认为有必要先回顾JDK的select,必须正确理解I/O多路复用器——Selector上所谓的轮询一次,返回就绪的Channel数目的真正意义,即这个过程有一个前提是自从上次select后开始计算的。这样干巴巴的解释可能不太清楚,下面举个例子,比如有两个已经建立的Channel,分别是A和B,而且A和B分别注册到了一个Selector上,接着在该Selector调用select():

  • 第一次调用select(),发现只有A有I/O事件就绪,select会立即返回1,然后处理之

  • 第二次调用select(),发现另一个通道B也有I/O事件就绪,此时select()还是返回1——即是自上次select后开始计算的

还有一点注意:如果第一次轮询后,对A没有做任何操作,那么就有两个就绪的Channel。

另外还要知道,select返回后可通过其返回值判断有没有Channel就绪,如果有就绪的Channel,那么可以使用selectedKeys()方法拿到就绪的Channel及其一些属性。下面看selectedKeys()的使用:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

当给Selector注册Channel时,调用的register()方法会返回一个SelectionKey对象,这个对象代表了注册到该Selector的Channel,可以遍历这个集合来访问就绪的通道。

以上,前面的线程调度模型都分析过,回忆这个图:

细节回顾可以参考:

Netty的线程调度模型分析(2)

Netty的线程调度模型分析(3)

Netty处理新连接接入事件的源码分析

前面文章总结了NioEventLoopGroup实例化时,如果外部没有配置,那么会默认创建一个线程执行器——ThreadPerTaskExcutor,一个NioEventLoop组成的数组(线程池),还有一个线程选择器——chooser。

又知道当实例化NioEventLoop并填充底层线程数组时,Netty会为每个NioEventLoop创建并绑定一个I/O多路复用器——Selector和一个异步任务队列——MPSCQ,接下来又总结了Netty的NioEventLoop线程启动的触发时机有两个:

  • 宏观上,服务端绑定端口时会触发boss线程池里的一个NIO线程启动,即用户代码调用bind方法。如果深入bind方法内部,那么会发现NIO线程第一次启动的精确时机是为JDK的ServerSocketChannel注册I/O多路复用器的时候——Netty会封装这个注册逻辑为一个异步task,使用NIO线程驱动,如果没有启动,那么就启动之,以后的Channel绑定端口的逻辑也会被封装为异步task,复用已经启动的这个NIO线程

  • 新连接接入时会触发worker线程池里的NIO线程启动。线程池的线程选择器会为新连接绑定一个worker里的NIO线程,第一次接入或者线程池的线程还没完全启动完毕,就会顺势启动

总之,Netty服务端启动后,服务端的Channel已经绑定到了boss线程池的NIO线程中,并不断检测是否有OP_ACCEPT事件发生,直到检测出有该事件发生就处理之,即boss线程池里的NioEventLoop线程只做了两件事:

1、轮询OP_ACCEPT事件

2、检测到OP_ACCEPT事件后就处理该事件,处理过程其实就是客户端Channel(新连接)接入的过程

下面继续回顾NioEventLoo线程的事件循环的核心方法——run,它在NIO线程启动时开始运行:

在这之前,先在run方法打断点:然后启动实验用的最小版Netty服务端的demo,之后分别在三个客户端使用telnet命令对其顺序发送3个请求,模拟客户端3个新连接接入的过程,下面进入run跟踪源码:

 

1、首先调用Netty封装的select方法,前面分析过当有客户端新连接接入,即代表已经触发了OP_ACCEPT事件,Selector的select方法会立即返回1,如下:

这里要理解JDK的select方法返回值到底是什么。select()方法会返回注册的interest的I/O事件已经就绪的那些通道的数目,抠字眼,首先得看是哪些Channel注册在了当前I/O多路复用器上,其次,看这些Channel上注册的interest的I/O事件是否就绪,如上代码的局部变量selectedKeys==1,但是我实验的客户端连接是3个,这里可能会有疑问,selectedKeys为何不是3呢?

因为当前绑定在boss线程上的I/O多路复用器只注册了服务端的Channel,即底层只有一个ServerSocketChannel,且当前注册的interest的I/O事件只有OP_ACCEPT,故无论多少个新连接接入,这里都只会返回1。

还有一个误区:不要认为Selector的select返回值是已准备就绪的Channel的总数,其实它返回的是从上一个select()调用后进入就绪状态的Channel的数量。

继续分析:轮询出有感兴趣的I/O事件就绪的Channel后,会break循环,回到外部的run方法,开始处理这个I/O事件,这里就是处理新连接的接入事件,核心方法之前也分析过,就是processSelectedKeys:

在详细的细节可以参考:

Netty的线程调度模型分析(7)

Netty的线程调度模型分析(8)

这个方法有两个变体,前面文章也分析过原因,我选择有代表性的processSelectedKeysOptimized,看里面的processSelectedKey(key,channel)方法,这才真正到了Netty处理I/O事件的方法入口,如下:

 如下是processSelectedKey方法的实现:

首先看黄色1处,取出ServerSocketChannel的unsafe对象,前面也总结过,Netty封装的Channel的底层都会有一个Unsafe对象与之绑定,Unsafe是个内部接口,聚合在Channel接口内部,作用是协助Channel进行网络I/O的操作,因为它的设计初衷就是Channel的内部辅助类,不应该被Netty的使用者调用,所以被命名为Unsafe,而不是说这个类的API都是不安全的。

继续执行到黄色2处,会判断当前Channel是否打开,其实就是判断的ServerSocketChannel。一切顺利继续执行黄色3处,看到了熟悉的NIO API,下面专门看黄色3处后面的一堆代码:

在黄色3处,k内部的readyOps集合是该Channel已经准备就绪的I/O操作的集合,OP_ACCEPT这个宏是16,所以这里的readyOps变量为16。 

接着马上会执行到黄色4处的if判断逻辑,由于readyOps为16,这里通过判断,进入if内部,执行黄色5处的代码。该处逻辑是一个read操作,很好理解。当NioEventLoop的run方法里轮询到ServerSocketChannel的accept事件后,服务端第一步就是对其执行读操作,这是很自然的想法。因为这是服务端,所以下面会进入到NioMessageUnsafe实例的read方法:

在黄色1处,首先保证是NioEventLoop线程在执行,如果是外部线程执行的,那么无效。接下来,会获取服务端Channel的Config和默认创建的服务端Channel的pipeline。在黄色2处有一个RecvByteBufAllocator.Handle allocHandle变量,它获取了RecvByteBuf分配器Handle,顾名思义就是设置接收的缓冲区大小,简单说是通过二分算法获取一个不会浪费空间,但是又足够大小的缓冲区,是一种性能优化的策略,以后分析Netty内存图像时在深入。

接着在黄色2处的下一行是一个重置配置的方法,目的是重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议。Netty默认一次读取16个新连接,如下:

然后继续看NioMessageUnsafe实例的read方法,在黄色3处,进入一个do-while循环:

 

首先调用doReadMessages方法,在do—while循环中读取一个个的客户端新连接,并将读取到的新连接用readBuf这个集合存储,readBuf就是NioMessageUnsafe类内部的一个普通的ArrayList。

下面进入doReadMessages方法,如下该方法内部逻辑似曾相识。

首先,在黄色1处封装了JDK的NIO API,即获取客户端的socket——NIO对应的是SocketChannel,完成该操作意味着TCP/IP协议栈完成了TCP的三次握手,TCP的逻辑链路正式建立,然后,在黄色2处,Netty将客户端Channel封装为自己的客户端channel——NioSocketChannel。因为这里明确了是服务端在处理accept事件,故不需要反射创建NioSocketChannel,直接实例化即可,后续在详细分析Netty的客户端channel创建过程。最后,封装的Channel保存到readBuf这个ArrayList中,doReadMessages方法返回1。 

回到上层的do-while循环:

doReadMessages返回的localRead==1,说明本次读取新连接成功,do-while的一次循环读新连接完毕,会继续读下一个新连接,直到全部读完,或者达到阈值。也就是说Netty在读取新连接时也权衡了性能,如果连接太多,那么Netty不会一直卡在这里处理,它默认do-while循环处理16个,这个逻辑在黄色5处的判断条件里,超过阈值就退出do-while。

下面看黄色5处的判断逻辑——即continueReading()方法,简单看下:

Netty设计理念是读优先,会给服务端Channel自动注册OP_READ事件——也就是isAutoRead()方法会返回true,那个maxMessagePerRead默认配置的是16,即每一次集中处理accept事件时,最多读取的连接数为16个,是权衡了性能而设计的,这个可以由用户配置。

继续回看NioMessageUnsafe实例的read方法,如果有新连接,那么继续do-while循环,直到发生异常,或者读取的新连接数量达到了阈值,或者已经没有新连接可读,doReadMessages返回0,退出do-while循环。这里说明一下,正常情况doReadMessages里的accept一定不会阻塞,因为只有当Channel里有就绪的I/O事件,换句话说,有数据可以读,才会进入accept环节,本质是因为Netty服务端为NIO模型配置的是非阻塞I/O,即Netty会自动对各个Channel有如下的配置:

而且,如果服务端Channel有就绪的I/O事件,那么accept()一定会返回客户端Channel,除非实例化Netty的客户端Channel——NioSocketChannel时出现异常。

如果doReadMessages返回0,那么就会break出do-while循环,接下来大动脉——Netty的pipeline就该干活了,如下NioMessageUnsafe实例的read方法的后面的源码:

在黄色6处,遍历保存客户端新Channel的集合——readBuf,然后将每个新连接传播出去——调用pipeline.fireChannelRead(),将每条新连接沿着服务端Channel的pipeline传递,交给Channel后续的入站handler,而黄色7处,会传播一个读操作完成的事件——fireChannelReadComplete();后续会逐渐的拆解并详细分析pipeline的设计,这里知道即可。

至此,Netty服务端检测处理客户端新连接的过程分析完毕。

做个小结

1、权衡性能,NIO线程一次处理的新连接不能太多,Netty默认是一次最多处理16个

2、Netty的pipeline机制和读取新连接后的衔接过程——触发和传递

3、Selector的select返回值的理解

4、深刻理解同步非阻塞,即NIO模式下,accept方法为什么不会阻塞

加载全部内容

相关教程
猜你喜欢
用户评论