java基于netty NIO的简单聊天室的实现
Alexwym 人气:0本文着重讲解了java基于netty NIO的简单聊天室的实现,文中通过代码实例讲解的非常细致,对大家的工作和学习具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
一、为何要使用netty开发
由于之前已经用Java中的socket写过一版简单的聊天室,这里就不再对聊天室的具体架构进行细致的介绍了,主要关注于使用netty框架重构后带来的改变。对聊天室不了解的同学可以先看下我的博客(《JAVA简单聊天室的实现》)
本篇博客所使用的netty版本为4.1.36,完整工程已上传到Github(https://github.com/Alexlingl/Chatroom),其中lib文件夹下有相应的netty jar包和source包,自行导入即可。
1、为何要重构
之前的聊天室是基于Java原生socket实现的,socket的处理机制属于BIO模型,也就是阻塞IO模型。对于每一个客户端的连接我们都需要启动一个线程来处理,并且该线程会一直阻塞在读取用户数据上面。如此一来,一旦有大量的客户端并发连接我们的服务器,服务器将难以承受。之前用JMeter测试过,单纯使用Java原生socket开发的服务器所能支持的最大并发在2300左右。虽然采用线程池的策略可以在一定程度上提升最大并发数,但也无法超过1W。因此我们需要对其进行重构,使其能够具有更高的性能。
2、为何使用netty框架
使用netty框架主要还是为了提升代码的开发速度,并且减少代码维护成本。使用netty框架开发的程序在复杂度上比使用Java原生NIO类库开发的要小很多。具体可以看下我之前关于解决C10k问题的系列文章,里面有具体的代码。
3、为何netty框架只实现了NIO而没有AIO
前面在解决C10问题时,探究过NIO和AIO的区别,并且使用Java所提供的类库实现了两个小程序,理论上来说AIO性能明显要比NIO高,那为什么netty使用了NIO而不是AIO呢?
官方说法如下:
We obviously did not consider Windows as a serious platform so far, and that's why we were neglecting NIO.2 AIO API which was implemented using IOCP on Windows. (On Linux, it wasn't any faster because it was using the same OS facility - epoll.)
大意就是,windows上面有IOCP来支持AIO的实现,因此AIO的性能会比NIO好。而Linux上面不管是NIO还是AIO,底层都是用epoll实现的,性能差距不大。然而当下绝大部分的服务器还是建立在Linux上,因此没必要使用AIO(使用AIO反而会增加代码的复杂度,增大维护成本)。
二、基于netty NIO的处理模型
1、服务器的类关系
(1)、SubreqServer:创建两个NIO线程组,一个用来监听处理客户端的连接请求,一个用来监听客户端的消息。同时实例化一个ServerBootStrap启动类的对象来启动两个NIO线程组,并且配置必要的参数。
(2)、ChannelInitializer:初始化SocketChannel管道的各项参数,主要有指定解码器和编码器,并指明管道的处理类
(3)、SubreqServerHandler:SocketChannel管道的处理类,负责处理来自客户端的消息
2、客户端的类关系
(1)、SubreqClient:创建一个NIO线程组,用来监听客户端的消息。同时实例化一个ServerBootStrap启动类的对象来启动这个NIO线程组,并且配置必要的参数。最后让主程序阻塞在监听客户端的键盘输入。
(2)、ChannelInitializer:初始化SocketChannel管道的各项参数,主要有指定解码器和编码器,并指明管道的处理类
(3)、SubreqClientHandler:SocketChannel管道的处理类,负责处理来自服务器的消息
三、所涉及类库的源码解读
1、ChannelHandlerAdapter(用来对channel的注册和注销做出反应的类):
(1)、功能
用来实现当用户上线或下线时,通知其他在线的用户。
(2)、类定义
它是ChannelHandler的框架实现
(3)、HandlerAdded()和HandlerRemoved()方法
当有一个channel被注册后将会调用HandlerAdded(),而当有一个channel被注销后将调用HandlerRemoved()方法。并且根据注释我们知道,这两个方法默认不做任何处理,它希望由继承的子类自己去写相应的处理实现。
2、SimpleChannelInboundHandler(用来对接收的消息做出反应)
(1)、功能
用来实现当接收到消息时做出相应反应,如果是服务端,那么将当前消息转发给其他在线的客户端;如果是客户端,就将消息简单地打印出来。
(2)、类定义
这个类是一个泛型类,它只能用于处理一种具体类型的消息。注释中给出一种使用方法,StringHandler继承了SimpleChannelInboundHandler,并且指定泛型变量为String,因此这个继承类只能用于String类型消息的处理。
(2)、channelRead(ChannelHandlerContext ctx, Object msg)
这个方法主要用来处理类型为Object的消息,也就是所有消息。
首先当接收到msg时,先使用accpetInboundMessage()方法来判断该消息是否可以处理。我们来看下该方法的实现。
如果接收到的这个消息应该被处理就返回true,如果它应当被传到ChannelPipeLine的下一个ChannelInboundHandler就返回false。
我们继续来看下ChannelRead()方法。如果接受到的消息可以处理时。它将对消息进行强制转化,将其转为I,并且调用channelRead0()。它是一个抽象类,因此我们在继承SimpleChannelInboundHandler类时,需要根据自己的实际去实现这个方法。
四、关键的代码实现
1、server端
(1)、SubreqServer类
package nettyserverv1; import java.util.ArrayList; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; /** * Created by linguolong on 2019/05/08. * Chatroom server built using netty framework */ public class SubreqServer { //保存已注册的用户信息 public static ArrayList<UserInfo> userlist = new ArrayList<UserInfo>(); //自动生成注册用户 static{ for(int i=0;i<10;i++){ UserInfo user=new UserInfo(); user.setUserID("123"+i); user.setUserName("user"+i); user.setPassword("pwd"+i); userlist.add(user); } } public void bind(int port) throws Exception{ //配置服务端NIO 线程组 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap(); try { server.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { /** * 解码器: 构造器传入了两个参数: * #1 单个对象序列化后最大字节长度,这是设置是1M; * #2 类解析器: weakCachingConcurrentResolver创建线程安全的WeakReferenceMa对类加载器进行缓存, * 支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏. */ ch.pipeline().addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) ); ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new SubreqServerHandler()); } }); System.out.println("Start the server success"); //绑定端口, 同步等待成功 ChannelFuture future = server.bind(port).sync(); //等待服务端监听端口关闭 future.channel().closeFuture().sync(); } finally { //优雅关闭 线程组 boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) { SubreqServer server = new SubreqServer(); try { server.bind(18888); } catch (Exception e) { e.printStackTrace(); } } }
(2)、SubreqServerHandler类
package nettyserverv1; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.ChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * Created by linguolong on 2019/05/08. * Chatroom client built using netty framework */ public class SubreqServerHandler extends SimpleChannelInboundHandler<String>{ //新建一个channelGroup,用于存放连接的channel public static ChannelGroup online_channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerRemoved(ChannelHandlerContext ctx){ Channel leave_channel = ctx.channel(); for (Channel channel : online_channels) { if (channel != leave_channel){ channel.writeAndFlush("[用户 " + leave_channel.remoteAddress() + "]下线了!\n"); } } System.out.println(ctx.channel().id()+"下线了"); //把刚下线的channel移除出在线用户队列 online_channels.remove(leave_channel); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //判断用户是否已经登录 if(!check_login(ctx)){ //未登录则进行登录验证 check_identity(ctx,msg); }else{ //已登录则进行消息转发 Channel coming_channel = ctx.channel(); for (Channel channel : online_channels) { if (channel != coming_channel){ channel.writeAndFlush("[用户 " + coming_channel.remoteAddress() + "]: " + msg ); } else { channel.writeAndFlush("[我]: " + msg); } } } } //用户信息验证,检查用户ID和密码是否正确 public void check_identity(ChannelHandlerContext ctx, Object msg){ UserInfo req = (UserInfo) msg; System.out.println("service receive client login req :{"+ req.toString() +"}"); boolean login_flag = false; for(int i=0;i<SubreqServer.userlist.size();i++){ if( SubreqServer.userlist.get(i).getUserID().equalsIgnoreCase(req.getUserID())&&(SubreqServer.userlist.get(i).getPassword().equals(req.getPassword()))){ login_flag=true; } } if(login_flag){ System.out.println("账号"+req.getUserID()+"登录成功"); ctx.writeAndFlush("您已登录成功~\n"); //将当前的通道加入在线队列中 online_channels.add(ctx.channel()); } else{ System.out.println("账号"+req.getUserID()+"登录失败"); ctx.writeAndFlush("登录失败!"); //关闭连接 ctx.close(); online_channels.remove(ctx.channel()); } } //判断用户是否已经在线 public boolean check_login(ChannelHandlerContext ctx){ boolean online_flag = false; for(int i=0;i<online_channels.size();i++){ if(online_channels.contains(ctx.channel())){ online_flag = true; } } return online_flag; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //释放资源 ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } //ChannelRead0()只能处理类型为String的消息,因此我们这里不能用ChannelRead0()这个方法,这里的第二个参数类型使用了泛型 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { } }
(3)、用户信息UserInfo类
package nettyserverv1; import java.io.Serializable; /** * Created by linguolong on 2019/05/08. * Chatroom User Infomation */ public class UserInfo implements Serializable{ private String userID; private String userName; private String password; public String getUserID() { return userID; } public void setUserID(String userID) { this.userID = userID; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } @Override public String toString() { return "SubscribeReq: [messageID]:"+ userID + " [userName]:" +userName + " [password]:" +password; } }
2、Client端
(1)、SubreqClient类
package nettyclientv1; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; /** * Created by linguolong on 2019/05/08. * Chatroom client built using netty framework */ public class SubreqClient { public void connect(int port, String host) throws Exception{ //配置客户端NIO 线程组 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap client = new Bootstrap(); try { client.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加解码器 ch.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) ); //添加编码器 ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new SubreqClientHandler()); } }); //异步获取当前已连接的channel Channel now_channel = client.connect(host,port).sync().channel(); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); //异步等待客户端连接端口关闭 // now_channel.closeFuture().sync(); //使客户端一直处于输入状态,直到读取到"bye" String message = " "; while (true) { //读到bye时退出 if(message.equals("bye")) break; message = reader.readLine(); now_channel.writeAndFlush(message+"\n"); } //读到了"bye"字符串,主动断开连接 now_channel.close(); } finally { //优雅关闭 线程组 group.shutdownGracefully(); } } public static void main(String[] args) { SubreqClient client = new SubreqClient(); try { client.connect(18888, "127.0.0.1"); } catch (Exception e) { e.printStackTrace(); } } }
(2)、SubreqClientHandler类
package nettyclientv1; import nettyserverv1.UserInfo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Created by linguolong on 2019/05/08. * Chatroom client built using netty framework */ public class SubreqClientHandler extends ChannelInboundHandlerAdapter{ public SubreqClientHandler() { } /** * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(subReq("1231","user1","pwd1")); ctx.flush(); } private UserInfo subReq(String id,String userName,String password){ UserInfo req = new UserInfo(); req.setUserID(id); req.setUserName(userName); req.setPassword(password); return req; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.print(msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
五、说明
1、登录功能的实现:当客户端刚连接上服务器时,便构造一个UserInfo对象,对对象进行编码后发送给服务器。服务器接收到后对其进行解码,验证相应的账户ID和密码是否正确。
加载全部内容