Netty实战源码解析NIO编程
Zhongger 人气:01 前言
很久之前就想写与Netty相关的博客了,但由于个人时间安排的问题一直拖到了现在,借助这个机会,重新温习Java高级编程的同时,也把Netty实战以及源码剖析分享给各位读者。
2 Netty是什么?
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server. 'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.
摘自官网,翻译过来就是:Netty是一个基于NIO的客户端-服务端框架,能过快速而简单地开发像客户端-服务端协议的网络应用。它极大地精简了 TCP 和 UDP 套接字服务器等网络编程。“快速而简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从许多协议(如 FTP、SMTP、HTTP 以及各种二进制和基于文本的遗留协议)的实现中获得的经验精心设计的。结果,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。
3 Java I/O模型简介
要说到网络通信,就离不开I/O模型,可以把I/O模型简单理解为使用什么通道进行数据的发送和接收。
Java共支持三种网络编程模型:BIO、NIO、AIO
- BIO,同步阻塞IO,服务器实现模式为一个连接一个线程,即客户端有一个请求连接服务器时,服务器就会启动一个线程进行处理,可见当有多个客户端发出请求时,服务器需要启动等量的线程,而且当客户端没有响应时,线程也必须一直等待,长期下来需要大量的线程且线程利用率低,会造成浪费。
- NIO,同步非阻塞,服务器用一个线程来处理多个请求,客户端发送的请求会注册到多路复用器(selector选择器)上,有I/O请求的客户端分配线程处理。
- AIO,异步非阻塞,AIO引入了异步通道的概念,采用Proactor模式,简化程序编写,有效的请求才启动线程,特点是要先由操作系统完成后才通知服务的程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。客户端发送的请求先交给操作系统处理,OS处理后再通知线程
Netty其实就是基于Java的NIO的。接下来,我们通过编写代码来体验一下这三种IO模型吧
3.1 BIO代码实现
package com.Zhongger; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author zhongmingyi * @date 2021/9/15 1:29 下午 */ public class BIOServer { public static void main(String[] args) throws IOException { ExecutorService threadPool = Executors.newCachedThreadPool(); ServerSocket serverSocket = new ServerSocket(8989); System.out.println("服务端已启动"); while (true) { Socket socket = serverSocket.accept(); threadPool.execute(new Runnable() { @Override public void run() { handle(socket); } }); } } public static void handle(Socket socket) { byte[] bytes = new byte[1024]; try { InputStream inputStream = socket.getInputStream(); int read = 0; while (true) { read = inputStream.read(bytes); if (read != -1) { System.out.println("客户端发送给服务端的数据:" + new String(bytes, 0, read)); } else { break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
上述代码中:
- 首先是服务器开启了一个ServerSocket,绑定在8989端口上,循环等待接受客户端的连接
- 当客户端连接到了服务器后,ServerSocket.accept方法可以获取到客户端的Socket
- 每当有一个客户端连接了服务器,线程池就会启动一个线程去处理Socket中的IO数据流,通过InputStream的read方法读取客户端发给服务器的数据,并输出打印;当InputStream没有数据了,最后将Socket关闭,该线程会回收到线程池中
可以看到,BIO模型里,服务器的实现模式为一个Socket连接对应一个线程。 BIO的知识点就介绍到这里,相信大家在【计算机网络】课程的学习中,肯定有接触过Socket编程,实现一个简易版的聊天工具。
4 Java NIO
4.1 基本介绍
JDK 1.4中的java.nio.*
包中引入新的Java I/O库,NIO其实有两种解释:
- New I/O:原因在于它相对于之前的I/O类库是新增的。
- Non-block I/O:由于之前老的I/O类库是阻塞I/O,New I/O类库的目标就是要让Java支持非阻塞I/O,所以,更多的人喜欢称之为非阻塞I/O。
NIO有三个核心组件:
- Buffer缓冲区 (相当于运载了货物的火车)
- Channel管道(相当于轨道,负责运输Buffer)
- Selector选择器(相当于车票,用来选择火车应该通过哪个Channel去运输)
NIO是面向块(缓冲区)的处理,数据读取到一个它稍后处理的缓冲区,需要时可以在缓冲区里前后移动,增加了在处理过程中的灵活性,使用NIO可以提供非阻塞式的高伸缩性网络。这使得一个线程可以在Buffer里有数据的时候去读取,没有可用数据时就可以去做其他事情,不会阻塞读;线程也可以写入一些数据到Buffer中,无需等待写入所有的数据,也可以去做别的事情,不会阻塞写。
4.2 三大核心组件的关系
三大核心组件的关系简单描述图如下:
如图所示:
- 每个Thread对应一个Selector,每个Selector对应多个Channel
- 每个Channel都会有一个对应的Buffer,Channel是双向的,可以返回底层操作系统的情况(比如Linux,通道就是双向的),这与BIO中流是单向的不同
- Selector切换到哪个Channel进行处理是由事件Event决定的
- Buffer是一个内存块,底层就是数组,可以写入数据,可以通过flip方法来切换成读取数据,Buffer也是双向的
4.3 Buffer缓冲区
Buffer:缓冲区本质上是一个可以读写数据的内存块,可以理解为一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能过跟踪和记录缓冲区的变化情况。Channel提供了从网络、文件读取数据的渠道,但读取或者写入数据都需要经过Buffer。
Buffer是一个顶层的抽象类,它的子类有多种实现,常用的子类如下:
- ByteBuffer:用于操作字节缓冲区
- CharBuffer:用于操作字符缓冲区
- ShortBuffer:用于操作短整型缓冲区
- IntBuffer:用于操作整型缓冲区
- LongBuffer:用于操作长整型缓冲区
- FloatBuffer:用于操作浮点型缓冲区
- DoubleBuffer:用于操作双精度浮点型缓冲区
上述缓冲区的管理方式基本上一致,都可以用类的allocate(int capacity)
方法去获取缓冲区对象。前面说到,Buffer是和数据打交道的载体,也就是读取缓冲区的数据或者把写数据到缓冲区中。所以,Buffer缓冲区的核心方法就是 put()方法和get()方法以及对应的重载方法、扩展方法等。
Buffer类中有以下四个属性:
// Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity;
- Capacity:Buffer缓冲区能够容纳的数据元素的最大数量,容量在缓冲区创建时被设定,中途无法被修改。Capacity也就规定了Buffer中底层的数组的大小
- Limit:Buffer缓冲区里的当前的终点,不能对缓冲区超过Limit的位置进行读写操作,Limit是可以被修改的
- Position:Buffer缓冲区中下一个要被读或写的元素的索引位置,Position会自动由相应的 get( )和 put( )函数更新,为下一次读/写做准备
- 标记Mark一个备忘位置。用于记录上一次读写的位置。
简单看下ByteBuffer的使用,体会下往其中写入数据、切换成读模式后上面这四个值的变换:
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); System.out.println("初始时-->limit--->" + byteBuffer.limit()); System.out.println("初始时-->position--->" + byteBuffer.position()); System.out.println("初始时-->capacity--->" + byteBuffer.capacity()); System.out.println("初始时-->mark--->" + byteBuffer.mark()); System.out.println("--------------------------------------"); // 添加一些数据到缓冲区中 String s = "后端Dancer"; byteBuffer.put(s.getBytes()); // 看一下初始时4个核心变量的值 System.out.println("put完之后-->limit--->" + byteBuffer.limit()); System.out.println("put完之后-->position--->" + byteBuffer.position()); System.out.println("put完之后-->capacity--->" + byteBuffer.capacity()); System.out.println("put完之后-->mark--->" + byteBuffer.mark()); System.out.println("--------------------------------------"); byteBuffer.flip(); System.out.println("flip完之后-->limit--->" + byteBuffer.limit()); System.out.println("flip完之后-->position--->" + byteBuffer.position()); System.out.println("flip完之后-->capacity--->" + byteBuffer.capacity()); System.out.println("flip完之后-->mark--->" + byteBuffer.mark());
这里介绍一个比较高效的ByteBuffer,MappedByteBuffer 它可以实现文件在堆外内存(非JVM内存的系统内存)的修改:
public static void mappedByteBufferTest() throws IOException { RandomAccessFile file = new RandomAccessFile("/Users/bytedance/Desktop/file.txt", "rw"); FileChannel fileChannel = file.getChannel(); //0~3的位置是直接映射到内存中的,可以修改文件中的这部分内容 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 3); mappedByteBuffer.put(0, (byte) 'Y'); mappedByteBuffer.put(2, (byte) 'K'); file.close(); }
4.4 Channel通道
BIO中流是单向的,要么是输入流,要么是输出流。然后NIO中,Channel作为运输数据的通道,是双向的。Channel是一个抽象类,常用的实现有:ServerSocketChannel、SocketChannel、FileChannel、DatagramChannel,其中DatagramChannel是用于UDP数据,而其他三者用于TCP数据。
FileChannel类主要用于对本地文件进行IO操作,常用的方法有:
- public int read(ByteBuffer var1) 从Channel读取数据并放到ByteBuffer中
- public int write(ByteBuffer var1) 从把ByteBuffer中的数据写入到Channel中
- public long transferFrom(ReadableByteChannel var1, long var2, long var4) 从ReadableByteChannel复制数据到Channel中
- public long transferTo(long var1, long var3, WritableByteChannel var5) 从Channel中把数据复制到WritableByteChannel
下面简单看下这个例子,将字符串输出到文件:
import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** * @author zhongmingyi * @date 2021/9/16 10:38 下午 */ public class FileChannelTest { public static void main(String[] args) throws IOException { String text = "Hello, Zhongger!"; //创建一个文件输出流 FileOutputStream fileOutputStream = new FileOutputStream("/Users/bytedance/Desktop/file.txt"); //通过文件输出流获取到FileChannel FileChannel fileChannel = fileOutputStream.getChannel(); //创建一个ByteBuffer,将数据写入ByteBuffer中 ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(text.getBytes()); //切换成读模式 buffer.flip(); //把ByteBuffer里到数据写入到FileChannel中 fileChannel.write(buffer); //关闭文件输出流 fileOutputStream.close(); } }
再看下从本地文件读取数据的例子
public static void readFromFile() throws IOException { //创建一个文件输入流 FileInputStream fileInputStream = new FileInputStream("/Users/bytedance/Desktop/file.txt"); //通过文件输入流获取到FileChannel FileChannel fileChannel = fileInputStream.getChannel(); //创建一个ByteBuffer,将Channel的数据读取到ByteBuffer中 ByteBuffer buffer = ByteBuffer.allocate(1024); fileChannel.read(buffer); //输出ByteBuffer中的数据 System.out.println(new String(buffer.array())); //关闭文件输入流 fileInputStream.close(); }
从一个文件读取数据到Buffer,再把Buffer写入到另外一个文件
public static void readFromOneFileWriteToOtherFile() throws IOException { FileInputStream fileInputStream = new FileInputStream("/Users/bytedance/Desktop/file.txt"); FileOutputStream fileOutputStream = new FileOutputStream("/Users/bytedance/Desktop/file2.txt"); FileChannel fileInputStreamChannel = fileInputStream.getChannel(); FileChannel fileOutputStreamChannel = fileOutputStream.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { //Buffer复位,防止越界 buffer.clear(); int read = fileInputStreamChannel.read(buffer); if (read == -1) { break; } //切读 buffer.flip(); fileOutputStreamChannel.write(buffer); } fileInputStream.close(); fileOutputStream.close(); }
把一个文件复制到另一个文件:
public static void transferFrom() throws IOException { FileInputStream fileInputStream = new FileInputStream("/Users/bytedance/Desktop/file.txt"); FileOutputStream fileOutputStream = new FileOutputStream("/Users/bytedance/Desktop/file_copy.txt"); FileChannel fileInputStreamChannel = fileInputStream.getChannel(); FileChannel fileOutputStreamChannel = fileOutputStream.getChannel(); fileOutputStreamChannel.transferFrom(fileInputStreamChannel, 0, fileInputStreamChannel.size()); fileInputStream.close(); fileOutputStream.close(); }
再简单介绍下:FileChannel提供了map方法把文件映射到虚拟内存,通常情况可以映射整个文件,如果文件比较大,可以进行分段映射。简单地说就是通过映射的方式来减少一次内核态到用户态之间的拷贝,因此可以提高复制的性能。
public static void mappedByteBufferTest() throws IOException { RandomAccessFile file = new RandomAccessFile("/Users/bytedance/Desktop/file.txt", "rw"); FileChannel fileChannel = file.getChannel(); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 3); mappedByteBuffer.put(0, (byte) 'Y'); mappedByteBuffer.put(2, (byte) 'K'); file.close(); }
4.5 Selector选择器
Selector选择器是NIO中的多路复用器,一个线程对应一个Selector,而Selector中可以注册多个Channel,当Channel中有事件发生时,线程就可以去处理这个事件,若没有事件发生时,线程可以空出来去做其他事情。使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
4.5.1 Selector的创建
通过调用Selector.open()方法创建一个Selector对象,如下:
Selector selector = Selector.open();
4.5.2 注册Channel到Selector
Channel必须是非阻塞的,否则会抛出IllegalBlockingModeException 异常
channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_ACCEPT);
该方法可以将Channel设置为非阻塞的
abstract SelectableChannel configureBlocking(boolean block)
注意: SelectableChannel抽象类的configureBlocking()方法是由AbstractSelectableChannel抽象类实现的,SocketChannel、ServerSocketChannel、DatagramChannel都是直接继承了AbstractSelectableChannel抽象类,因此它们可以调用configureBlocking方法设置为非阻塞的模式。
register() 方法的第二个参数,是一个“ interest集合 ”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:
SelectionKey.OP_ACCEPT SelectionKey.OP_WRITE SelectionKey.OP_READ SelectionKey.OP_CONNECT
通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“连接就绪(connect)”。一个ServerSocketChannel准备好接收新进入的连接称为“接收就绪(accept)”。一个有数据可读的通道可以说是“ 读就绪(read)”。等待写数据的通道可以说是“ 写就绪(write) ”。
4.5.3 SelectionKey
一个SelectionKey键表示了一个特定的通道对象(Channel)和一个特定的选择器对象(Selector)之间的注册关系。
key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。 key.channel(); // 返回该SelectionKey对应的channel。 key.selector(); // 返回该SelectionKey对应的Selector。 key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。```
4.5.4 从Selector中选择Channel
Selector维护注册过的Channel集合,并且这种注册关系都被封装在SelectionKey当中。 Selector维护的三种类型SelectionKey集合:
- 已注册的键的集合(Registered key set)。所有与选择器关联的通道所生成的键的集合称为已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过 keys() 方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引发java.lang.UnsupportedOperationException。
- 已选择的键的集合(Selected key set)。所有选择器监听到关联的通道所生成的键的集合称为已经选择的键的集合。这个集合通过 selectedKeys() 方法返回,并且可能是空的。
- 已取消的键的集合(Cancelled key set)。已注册的键的集合的子集,这个集合包含了 cancel() 方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。
注意: 当键被取消( 可以通过isValid( ) 方法来判断)时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用 select( ) 方法时(或者一个正在进行的select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey将被返回。当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。当选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取消)。一旦键被无效化,调用它的与选择相关的方法就将抛出CancelledKeyException。
select()方法介绍:
在刚初始化的Selector对象中,这三个集合都是空的。 通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:
int select():阻塞到至少有一个通道在你注册的事件上就绪了。 int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。 int selectNow():非阻塞,只要有通道就绪就立刻返回。 select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用select()方法,并且返回值不为0时,则 可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下: Set selectedKeys=selector.selectedKeys(); 进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:
while (true) { if (selector.select(1000) == 0) { System.out.println("服务器未连接到客户端。。。"); continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isAcceptable()) { System.out.println("服务端接受到了客户端请求"); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } if (selectionKey.isReadable()) { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); channel.read(byteBuffer); System.out.println("from 客户端 " + new String(byteBuffer.array())); } iterator.remove(); } }
4.5.5 停止选择的方法
选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。
- wakeup()方法 :通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回 该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
- close()方法 :通过close()方法关闭Selector, 该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。
4.5.6 NIO客户端、服务端
服务端代码:
package com.Zhongger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; /** * @author zhongmingyi * @date 2021/9/15 3:00 下午 */ public class NIOServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8886)); serverSocketChannel.configureBlocking(false); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { if (selector.select(1000) == 0) { System.out.println("服务器未连接到客户端。。。"); continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isAcceptable()) { System.out.println("服务端接受到了客户端请求"); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } if (selectionKey.isReadable()) { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); channel.read(byteBuffer); System.out.println("from 客户端 " + new String(byteBuffer.array())); } iterator.remove(); } } } }
客户端代码:
package com.Zhongger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * @author zhongmingyi * @date 2021/9/24 1:27 下午 */ public class NIOClient { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8886); //连接服务器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("连接需要时间,客户端不会阻塞,可以做其他工作"); } } //连接成功,发送数据 String str = "Hello,Zhongger!"; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); socketChannel.write(byteBuffer); System.in.read(); } }
5 Java NIO 小结
- 事件驱动模型
- 避免多线程
- 单线程处理多任务
- 非阻塞I/O,I/O读写不再阻塞
- 基于block的传输,通常比基于流的传输更高效
- 更高级的IO函数,zero-copy
- IO多路复用大大提高了Java网络应用的可伸缩性和实用性
加载全部内容