亲宝软件园·资讯

展开

编程思想:如何设计一个好的通信网络协议

木宛城主 人气:1
当网络中两个进程需要通信时,我们往往会使用 `Socket` 来实现。`Socket` 都不陌生。当三次握手成功后,客户端与服务端就能通信,并且,彼此之间通信的数据包格式都是二进制,由` TCP/IP` 协议负责传输。 当客户端和服务端取得了二进制数据包后,我们往往需要『萃取』出想要的数据,这样才能更好的执行业务逻辑。所以,我们需要定义好数据结构来描述这些二进制数据的格式,这就是通信网络协议。简单讲,就是需要约定好二进制数据包中每一段字节的含义,比如从第 n 字节开始的 m 长度是核心数据,有了这样的约定后,我们就能解码出想要的数据,执行业务逻辑,这样我们就能畅通无阻的通信了。 ### 网络协议的设计 **概要划分** 一个最基本的网络协议必须包含 - 数据的长度 - 数据 了解 `TCP` 协议的同学一定听说过`粘包、拆包` 这两个术语。因为`TCP`协议是数据流协议,它的底层根据二进制缓冲区的实际情况进行包的划分。所以,不可避免的会出现`粘包,拆包` 现象 。为了解决它们,我们的网络协议往往会使用一个 4 字节的 `int` 类型来表示数据的大小。比如,`Netty` 就为我们提供了 `LengthFieldBasedFrameDecoder` 解码器,它可以有效的使用自定义长度帧来解决上述问题。 同时一个好的网络协议,还会将动作和业务数据分离。试想一下, `HTTP` 协议的分为请求头,请求体—— - 请求头:定义了接口地址、`Http Method`、`HTTP` 版本 - 请求体:定义了需要传递的数据 这就是一种分离关注点的思想。所以自定义的网络协议也可以包含: - 动作指令:比如定义 `code` 来分门别类的代表不同的业务逻辑 - 序列化算法:描述了 `JAVA` 对象和二进制之间转换的形式,提供多种序列化/反序列化方式。比如 `json`、`protobuf` 等等,甚至是自定义算法。比如:`rocketmq ` 等等。 同时,协议的开头可以定义一个约定的`魔数`。这个固定值(4字节),一般用来判断当前的数据包是否合法。比如,当我们使用 `telnet` 发送错误的数据包时,很显然,它不合法,会导致解码失败。所以,为了减轻服务器的压力,我们可以取出数据包的前`4`个字节与固定的`魔数`对比,如果是非法的格式,直接关闭连接,不继续解码。 **网络协议结构如下所示**: ```javascript +--------------+-----------+------------+-----------+----------+ | 魔数(4) | code(1) |序列化算法(1) |数据长度(4) |数据(n) | +--------------+-----------+------------+-----------+----------+ ``` ### RocketMQ 通信网络协议的实现 **RocketMQ 网络协议** 这一小节,我们从`RocketMQ` 中,分析优秀通信网络协议的实现。`RocketMQ` 项目中,客户端和服务端的通信是基于 Netty 之上构建的。同时,为了更加有效的通信,往往需要对发送的消息自定义网络协议。 `RocketMQ` 的网络协议,从数据分类的角度上看,可分为两大类 - 消息头数据(Header Data) - 消息体数据(Body Data) ![](https://img2020.cnblogs.com/blog/299214/202003/299214-20200330040625987-1566249621.png) 从左到右 - 第一段:4 个字节整数,等于2、3、4 长度总和 - 第二段:4 个字节整数,等于3 的长度。特别的 `byte[0]` 代表序列化算法,`byte[1~3]`才是真正的长度 - 第三段:代表消息头数据,结构如下 ```java { "code":0, "language":"JAVA", "version":0, "opaque":0, "flag":1, "remark":"hello, I am respponse /127.0.0.1:27603", "extFields":{ "count":"0", "messageTitle":"HelloMessageTitle" } } ``` - 第四段:代表消息体数据 **RocketMQ 消息头协议详细如下:** | Header 字段名 | 类型 | Request | Response | | ------------- | ---------------------- | ------------------------------------------------------------ | -------------------------------------------- | | code | 整数 | 请求操作代码,请求接收方根据不同的代码做不同的操作 | 应答结果代码,0表示成功,非0表示各种错误代码 | | language | 字符串 | 请求发起方实现语言,默认JAVA | 应答接收方实现语言 | | version | 整数 | 请求发起方程序版本 | 应答接收方程序版本 | | opaque | 整数 | 请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用 | 应答方不做修改,直接返回 | | flag | 整数 | 通信层的标志位 | 通信层的标志位 | | remark | 字符串 | 传输自定义文本信息 | 错误详细描述信息 | | extFields | HashMap | 请求自定义字段 | 应答自定义字段 | **编码过程** `RocketMQ` 的通信模块是基于 `Netty`的。通过定义 `NettyEncoder` 来实现对每一个 `Channel`的 出栈数据进行编码,如下所示: ```java @ChannelHandler.Sharable public class NettyEncoder extends MessageToByteEncoder { @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { ... } } } ``` 其中,核心的编码过程位于 `RemotingCommand` 对象中,`encodeHeader` 阶段,需要统计出消息总长度,即: - 定义消息头长度,一个整数表示:占4个字节 - 定义消息头数据,并计算其长度 - 定义消息体数据,并计算其长度 - 额外再加 4是因为需要加入消息总长度,一个整数表示:占4个字节 ```java public ByteBuffer encodeHeader(final int bodyLength) { // 1> 消息头长度,一个整数表示:占4个字节 int length = 4; // 2> 消息头数据 byte[] headerData; headerData = this.headerEncode(); // 再加消息头数据长度 length += headerData.length; // 3> 再加消息体数据长度 length += bodyLength; // 4> 额外加 4是因为需要加入消息总长度,一个整数表示:占4个字节 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 5> 将消息总长度加入 ByteBuffer result.putInt(length); // 6> 将消息的头长度加入 ByteBuffer result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // 7> 将消息头数据加入 ByteBuffer result.put(headerData); result.flip(); return result; } ``` 其中,`encode` 阶段会将 `CommandCustomHeader` 数据转换 `HashMap`,方便序列化 ```java public void makeCustomHeaderToNet() { if (this.customHeader != null) { Field[] fields = getClazzFields(customHeader.getClass()); if (null == this.extFields) { this.extFields = new HashMap(); } for (Field field : fields) { if (!Modifier.isStatic(field.getModifiers())) { String name = field.getName(); if (!name.startsWith("this")) { Object value = null; try { field.setAccessible(true); value = field.get(this.customHeader); } catch (Exception e) { log.error("Failed to access field [{}]", name, e); } if (value != null) { this.extFields.put(name, value.toString()); } } } } } } ``` 特别的,消息头序列化支持两种算法: - `JSON` - `RocketMQ` ```java private byte[] headerEncode() { this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } } ``` 这儿需要值得注意的是,`encode`阶段将当前 `RPC` 类型和 `headerData`长度编码到一个 `byte[4]` 数组中,`byte[0]` 位序列化类型。 ```java public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); result[1] = (byte) ((source >> 16) & 0xFF); result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; } ``` 其中,通过与运算 `& 0xFF` 取低八位数据。 所以, 最终 `length` 长度等于序列化类型 + header length + header data + body data 的字节的长度。 **解码过程** `RocketMQ` 解码通过`NettyDecoder`来实现,它继承自 `LengthFieldBasedFrameDecoder`,其中调用了父类`LengthFieldBasedFrameDecoder`的构造函数 ```java super(FRAME_MAX_LENGTH, 0, 4, 0, 4); ``` 这些参数设置`4`个字节代表 `length`总长度,同时解码时跳过最开始的`4`个字节: ```java frame = (ByteBuf) super.decode(ctx, in); ``` 所以,得到的 `frame`= 序列化类型 + header length + header data + body data 。解码如下所示: ```java public static RemotingCommand decode(final ByteBuffer byteBuffer) { //总长度 int length = byteBuffer.limit(); //原始的 header length,4位 int oriHeaderLen = byteBuffer.getInt(); //真正的 header data 长度。忽略 byte[0]的 serializeType int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; } private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; } ``` 其中,`getProtocolType`,右移 `24`位,拿到 `serializeType`: ```java public static SerializeType getProtocolType(int source) { return SerializeType.valueOf((byte) ((source >> 24) & 0xFF)); } ``` `getHeaderLength` 拿到 0-24 位代表的 `headerData` length: ```java public static int getHeaderLength(int length) { return length & 0xFFFFFF; } ``` ### 小结 对于诸多中间件而言,底层的网络通信模块往往会使用 `Netty`。`Netty` 提供了诸多的编解码器,可以快速方便的上手。本文从如何设计一个网络协议入手,最终切入到 `RocketMQ` 底层网络协议的实现。可以看到,它并不复杂。仔细研读几遍变能理解其奥义。具体参考类`NettyEncoder`、`NettyDecoder`、`RemotingCommand`。

加载全部内容

相关教程
猜你喜欢
用户评论