Netty粘包拆包
kaico2018 人气:0为什么使用Netty框架
- NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。
- 可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等问题,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐的工作量和难度都非常大。
- JDK NIO的BUG,例如臭名昭著的 epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而已,它并没有被根本解决。该BUG以及与该BUG相关的问题单可以参见以下链接内容。
由于上述原因,在大多数场景下,不建议大家直接使用JDK的NIO类库,除非你精通NIO编程或者有特殊的需求。在绝大多数的业务场景中,我们可以使用NIO框架Netty来进行NIO编程,它既可以作为客户端也可以作为服务端,同时支持UDP和异步文件传输,功能非常强大。
Netty框架介绍
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架Avro就使用了Netty作为底层通信框架,其他还有业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。
优点总结:
- API使用简单,开发门槛低;
- 功能强大,预置了多种编解码功能,支持多种主流协议;
- 定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;
- 性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;
- 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;
- 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入;
- 经历了大规模的商业应用考验,质量得到验证。Netty在互联网、大数据、网络游戏、企业应用、电信软件等众多行业已经得到了成功商用,证明它已经完全能够满足不同行业的商业应用了。
Netty实战
首先引入Netty的jar包。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version> </dependency>
Netty编写服务器端
NettyServer 类
public class NettyServer { /** * netty启动端口号 */ private static int port = 8080; public static void main(String[] args) { /** * 客户端创建两个线程池组分别为 boss线程组和工作线程组 */ // 用于接受客户端连接的请求 (并没有处理请求) NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于处理客户端连接的读写操作(处理请求操作) NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); //NioServerSocketChannel 标记当前是服务器端 serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 设置我们分割最大长度为1024 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 获取数据的结果为string类型 socketChannel.pipeline().addLast(new StringEncoder()); //处理每个handler(也就是每次客户端请求) socketChannel.pipeline().addLast(new ServerHandler()); } }); try { //绑定端口号 ChannelFuture bind = serverBootstrap.bind(port); ChannelFuture sync = bind.sync(); System.out.println("服务器端启动成功:" + port); //等待监听我们的请求 sync.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { //优雅的关闭我们的线程池 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
ServerHandler 类
public class ServerHandler extends SimpleChannelInboundHandler { /* * @Author kaico * @Date 9:56 2020/10/8 * @Description //TODO 获取数据 * @Param [channelHandlerContext, o] * @return void **/ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { ByteBuf byteBuf = (ByteBuf) o; String request = byteBuf.toString(CharsetUtil.UTF_8); System.out.println("request:" + request); // 响应内容: channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("这里是Netty服务端\n", CharsetUtil.UTF_8)); } }
Netty客户端
NettyClient 类
public class NettyClient { public static void main(String[] args) { //创建nioEventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("127.0.0.1", 8080)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 设置我们分割最大长度为1024 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 获取数据的结果为string类型 ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); try { // 发起同步连接 ChannelFuture sync = bootstrap.connect().sync(); sync.channel().closeFuture().sync(); } catch (Exception e) { } finally { group.shutdownGracefully(); } } }
ClientHandler 类
public class ClientHandler extends SimpleChannelInboundHandler { /** * 活跃通道可以发送消息 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { // 发送数据 ctx.writeAndFlush(Unpooled.copiedBuffer("你是什么类型的服务端啊?\n", CharsetUtil.UTF_8)); } //客户端发十条消息 } /** * 读取消息 * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("resp:" + byteBuf.toString(CharsetUtil.UTF_8)); } }
粘包与拆包
原因:因为我们现在tcp协议默认是长连接形式实现通讯,发送请求完了之后整个连接暂时不会关闭
1.短连接
客户端与服务器端建立连接的时候,客户端发送一条消息,客户端与服务器连接关闭
2.长连接
客户端与服务器端建立连接的时候,客户端发送多条消息,客户端与服务器连接关闭
什么是粘包:多次发送的消息,服务器一次合并读取msgmsg
什么是拆包:多次发送消息 服务器读取第一条数据完整+第二条不完整数据 第二条不完整数据 Msgm sg
为什么会造成拆包和粘包? 前提长连接、其次缓冲区
原因的造成:
Tcp协议为了能够高性能的传输数据,发送和接受时候都会采用缓冲区,必须等待缓冲区满了以后才可以发送或者读取;
当我们的应用程序如果发送的数据大于了我们的套字节的缓冲区大小的话,就会造成了拆包。拆分成多条消息读取。当我们应用程序如果发送的写入的消息如果小于套字节缓冲区大小的时候
粘包与拆包产生的背景:
Tcp协议为了高性能的传输,发送和接受的时候都采用了缓冲区
3. 当我们的应用程序发送的数据大于套字节缓冲区的时候,就会实现拆包。
4. 当我们的应用程序写入的数据小于套字节缓冲区的时候,多次发送的消息会合并到一起接受,这个过程我们可以称做为粘包。
5. 接受端不够及时的获取缓冲区的数据,也会产生粘包的问题
6. 进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>mss的时候将发生拆包。
解决思路:
7. 以固定的长度发送数据,到缓冲区
8. 可以在数据之间设置一些边界(\n或者\r\n)
9. 利用编码器LineBaseDFrameDecoder解决tcp粘包的问题
常用编码器:
- DelimiterBasedFrameDecoder 解决TCP的粘包解码器
- StringDecoder 消息转成String解码器
- LineBasedFrameDecoder 自动完成标识符分隔解码器
- FixedLengthFrameDecoder 固定长度解码器,二进制
- Base64Decoder 解码器
利用编码器LineBaseDFrameDecoder解决tcp粘包的问题的Java代码案例,核心思路就是增加边界 \n
服务器端类 NettyServer 的修改点
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 设置我们分割最大长度为1024 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 获取数据的结果为string类型 socketChannel.pipeline().addLast(new StringEncoder()); //发送数据的时候设置边界 \n socketChannel.pipeline().addLast(new ServerHandler()); } });
服务器端类 ServerHandler 的修改点
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { ByteBuf byteBuf = (ByteBuf) o; String request = byteBuf.toString(CharsetUtil.UTF_8); System.out.println("request:" + request); // 响应内容: channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("这里是Netty服务端\n", CharsetUtil.UTF_8)); }
客户端的类 NettyClient 的修改点
bootstrap.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("127.0.0.1", 8080)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 设置我们分割最大长度为1024 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 获取数据的结果为string类型 ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } });
客户端的类 ClientHandler 的修改点
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { // 发送数据 ctx.writeAndFlush(Unpooled.copiedBuffer("你是什么类型的服务端啊?\n", CharsetUtil.UTF_8)); } //客户端发十条消息 }
加载全部内容