手把手教你用netty撸一个ZkClient
N-L 人气:3原文地址: https://juejin.im/post/5dd296c0e51d4508182449a6
前言
有这个想法的缘由是前一阵子突发奇想, 想尝试能不能直接利用js连接到zookeeper, 从而获取到dubbo的注册信息.
后来一番查找资料后, 发现由于纯js不支持tcp socket通讯, 所以纯js是无法实现的. 但是发现有些大神却使用nodeJs实现zk的客户端. 这就成功地激起了我的兴趣. 简单地研究了一下zk通信协议后, 我开始尝试徒手撸一个zk的客户端.当然是用java实现
构思
zookeeper的通信协议是一种典型的"header/content"结构, 在header里面指定了content的字节数, content就是具体的报文数据.
既然是header/content结构, 那么很容易就能想到利用netty的LengthFieldPrepender来进行编码, 以及利用LengthFieldBasedFrameDecoder来进行解码. 有了netty这一大神器, 做什么事情都能事半功倍.因此决定了使用netty来进行开发.
客户端选型决定好了之后, 还得需要有个服务端来进行调试. 从协议上看, zk不同的版本之间应该不会存在太多的兼容性问题, 但是差异肯定是存在的. 所以为了方便起见,我们这里限定了服务端的zookeeper的版本是3.4.12, 更高的版本或更低的版本没有做过严格的兼容测试.
备注: 为了简化工作量, 除了版本外, 该客户端也只在单机模式测试过, 并没有验证过在集群模式上是否能跑通.[捂脸]
准备工作
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
如上面列表所示, zkCli为我们提供了很多命令, 本文我们将实现三个具有代表性的命令:
1. connect host:port
这个命令其实是用来跟服务端建立会话的.
为了避免跟socket建立tcp连接的connect方法相混淆, 我更愿意把它称作"login",
所以在实现的时候, 它对应的方法名也就是login
2. create [-s] [-e] path data acl
这个命令是用来创建zk的node节点的.
其中包括持久性节点, 持久性顺序节点, 临时性节点, 临时性顺序节点等,
还可以指定ACL权限
3. ls path [watch]
ls命令就是列举zk某个路径下的所有子路径,
在具体实现里, 我把这个命令叫做getChildren
在zookeep的通信协议里面, connect命令(login)是其他所有命令的必要前置条件. 因为作为一个客户端, 你必须跟服务端建立了会话之后,下面的命令请求才能被服务端接受和处理.
而除了connect命令之外, 其他的所有的命令其实都是大相径庭的. 因此你会发现, 理解了create命令和ls命令之后, 再实现其他命令也是很简单的事情的, 只需要了解它们的通信协议,其他的都是照葫芦画瓢的事情了.
当然了解它们的通信协议并不是个简单的事情, 而且每一个命令的报文结构都不大相同. 实际上在码代码的时候, 百分之七八十的精力基本都耗在了理解每个命令的报文结构上面.
代码实现
来看具体实现之前, 先来看一下项目的总结结构:
1. bean包
封装了每个命令需要的字段参数, 在序列化报文时只需要序列化对应的bean即可. 同样, 在服务端返回内容时, 也只需要把报文序列化成对应的对象即可.
2. factories包
上面提到过, zk的每个命令的报文结构都是不一样的,所以在序列化和反序列化时, 对应到netty的codec也是不一样的.这个实现了一个codec静态方法工厂, 需要的时候直接从codec工厂拿对应的codec即可.
3. registrys包
其实就是一个缓存中心, 缓存了每个命令对应的requestId和codec, 在服务端返回时, 从这个缓存中心根据requestId拿到对应codec来进行反序列化
4. utils包
一些工具类, 不需要多解释
5. zkcodec包
每个命令对应的codec和handler实现
6. NettyZkClient类
就是本文要介绍的zk客户端了
7. test
为了方便调试准备的单元测试, 先了解代码实现原理的可用直接从这个单元测试入手
看完代码结构后, 我们再来看每个命令的具体实现.
login命令
首先来看一下login命令的通信报文结构体, 如下图所示:
简单介绍一下每个字段的含义, 具体的含义大家可以在网上搜索做更深入的了解:
1. header
上面提到, zk的每个报文都是header/content模式, 其中header占用4个字节, 表示接下来的content的长度(字节数)
2. protocolVersion
顾名思义, 这个字段表示协议的版本号,用4个字节表示. 这里我们写死为0即可(好浪费~~~)
3. lastZxidSeen
等下我们会看到, zk服务端每次的响应都会返回一个zxid.顾名思义, 这个结构就是表示客户端接收到最新的zxid.用8个字节表示. 由于login一般都是第一次跟服务端通讯, 所以这里也是写死为0即可
4. timeout
login请求的目的是为了跟zk服务端建立会话, 这个参数表示的是会话的有效时间, 用四个字节表示
5. senssionId
会话ID, 用8个字节表示, 用于我们还没有建立会话,所以这个也是直接写死为0即可
6. passwordLen
用4个字节来表示接下来的密码的字节数
7. password
passwordLen个字节的密码, 用bytes[]表示
8. readOnly
boolean类型的,所以用一个字节表示即可
知道了报文结构之后, 我们就可以开始写代码了.
首先定义一个ZkLoginRequest的bean.在java里面, 8个字节可以用long类型表示, 4个字节可以用int表示, String类型可以很简单地转换成byte数组. 所以最后定义的bean如下所示:
public class ZkLoginRequest implements Serializable {
private Integer protocolVersion;
private Long lastZxidSeen;
private int timeout;
private Long sessionId;
private String password;
private boolean readOnly = true;
}
因为zk的通讯是基于字节的, 所以我们仅仅定义java对象是不行的, 还需要把java对象转换成字节, 才能发送服务端.而且服务端只会接收header/content形式的报文, 所以我们还得计算出整个java对象序列化之后的字节数, 把它赋值到header中去.
幸运的是, 这两个工作netty都为我们提供了很好的工具, 我们直接使用就可以了.
实现ZkLoginCodec把java对象转换成ByteBuf
ZkLoginCodec包括encode和decode两部分, 其中decode是用于解码服务端的响应的, encode是用于编码发送请求的, 如下所示, 把ZkLoginRequest转换成netty的ByteBuf
@Override
protected void encode(ChannelHandlerContext ctx, ZkLoginRequest msg, ByteBuf outByteBuf) throws Exception {
outByteBuf.writeInt(msg.getProtocolVersion());
outByteBuf.writeLong(msg.getLastZxidSeen());
outByteBuf.writeInt(msg.getTimeout());
outByteBuf.writeLong(msg.getSessionId());
String passWord = msg.getPassword();
SerializeUtils.writeStringToBuffer(passWord, outByteBuf);
outByteBuf.writeBoolean(msg.isReadOnly());
}
直接使用netty内置的LengthFieldPrepender
netty内置的LengthFieldPrepender可以把报文转换成header/content形式的结构, 其中构造参数表示header所占的字节数, 我们这里是4个字节, 所以是4.
// 编码器, 给报文加上一个4个字节大小的HEADER
nioSocketChannel.pipeline().addLast(new LengthFieldPrepender(4))
ZkLoginRequest对象经过这两个codec编码之后,zk服务端就能正确解析它的报文了, 如无意外的话, 服务端会针对我们的这个socket建立会话, 然后给我们响应一个报文, 报文中会包含sessionId. 响应的结构体如下所示:
可以看到, 响应报文跟我们的请求报文是差不多的,除了sessionId, 其他的基本是原封不动地给我们返回来了. 所以我们这里对响应报文含义不多做解释. 直接来看看如何解析服务端返回的响应报文.
从上图我们可以看到, 返回的报文也是header/content形式, 所以我们还是可以使用netty内置的解码器来获取header和content.
使用LengthFieldBasedFrameDecoder跳过header
不熟悉LengthFieldBasedFrameDecoder的同学可以先看看netty的官网, 这里不对这个类的参数做过多解释, 只需要知道我们是想跳过header,只获取响应报文的content部分即可
nioSocketChannel.pipeline()
// 解码器, 将HEADER-CONTENT格式的报文解析成只包含CONTENT
.addLast(ZkLoginHandler.LOGIN_LENGTH_FIELD_BASED_FRAME_DECODER,new LengthFieldBasedFrameDecoder(2048, 0, 4, 0, 4))
ZkLoginCodec把content反序列成ZkLoginResp
也就是ZkLoginCodec的decode部分
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ZkLoginResp zkLoginResp = new ZkLoginResp();
zkLoginResp.setProtocolVersion(in.readInt());
zkLoginResp.setTimeout(in.readInt());
zkLoginResp.setSessionId(in.readLong());
String password = SerializeUtils.readStringToBuffer(in);
zkLoginResp.setPassword(password);
zkLoginResp.setReadOnly(in.readBoolean());
out.add(zkLoginResp);
}
完成这一步后, 我们的客户端就成功地跟服务端建立了会话了, 后面就可以愉快地发送其他请求了
create命令
opCode和requestHeader
在开始实现create命令之前, 先来了解一下两个术语,这两个术语不单是create命令需要用到的, 等下要实现的getChildren命令也同样需要用到.
- opCode
zk的服务端跟客户端约定好了, 每一个命令都对应一个opCode, 客户端发送命令请求时必须带上这个opCode, 服务端才能知道如何去处理这个命令. 例如create命令对应的opCode是1, 等下要实现的getChildren命令的opCode是8
- requestHeader
这个header不要跟header/content中的header混淆了. requestHeader还是content的一部分, 它包含了两个字段, 每个字段占4个字节, 如下图:
1. xid, 通俗点理解的就是requestId, 客户端维护这个xid的唯一性, 服务端返回响应时会原封不动的返回这个xid,
这样客户端就可以知道服务端返回的报文时对应哪个请求的了.毕竟socket通讯都是异步的.
2. type
这个更好理解, 就是上面的opcode
create命令的报文结构
说真的, create命令的报文有一丁点复杂, 所以
在看createRequest的报文结构之前,还得先了解另外一个概念: ACL权限;
zookeeper的ACL权限涉及到以下3个点:
- scheme 身份的认证有4种方式:
- id 授权的对象
- permission 权限
具体可以看这篇博客, 说得比较清楚.
在本文中我们写死了scheme是"world", id是"anyone", permission是31(转成二进制即11111,拥有CREATE、READ、WRITE、DELETE、ADMIN五种权限)
zk中的ACL的报文结构如下所示:
1. perms是permission的简写, 4个字节表示
2. 因为scheme是用字符串表示的, 所以需要用4个字节表示scheme字符串的长度, 用schemelen个字节表示scheme
3. id也是用字符串表示的, 跟scheme同理
了解了requestHeader和ACL的结构体之后, createRequest的报文结构也就比较好理解了, 如下图所示:
1. requestHeader
包含了xid和type
2. pathLen
要创建的path的字符串长度
3. path
要创建的path, 例如你想在zk上创建一个https://img.qb5200.com/download-x/dubbo节点, path就是"/path"
4. dataLen
要创建的path下的data的长度
5. data
要创建的path下的数据, 例如"192.168.99.101:2181"
6. aclListSize
zk的ACL权限是list形式的,表示不同的权限控制维度
7. aclList
aclListSize个ACL结构体
8. flags
该节点的类型, 可以是持久性节点, 持久性顺序节点, 临时节点, 临时顺序节点4种
接下来的工作就是login差不多了:
- 实现一个ZkCreateCodec把ZkCreateRequest转化成ByteBuf
- 利用LengthFieldPrepender把ByteBuf构建成Header/content的报文结构
- 利用LengthFieldBasedFrameDecoder从服务端的响应解析出content的内容
- 把content内容的ByteBuf转换成ZkCreateResponse
其中createRes的报文结构如下图所示:
1. xid
这个就是createReq中requestHeader的xid
2. zxid
这个可以跟login报文中的lastZxidSeen关联起来, 可以理解为服务端的xid
3. errcode
errcode为0表示正常响应, 如果不为0,说明有异常出现, 后面的path字段也会为空
4. pathLen和path
其实也是createReuest中的path和pathLen
3. getChildren命令(ls path)
如果上面的create命令能理解的话, 那getChildren命令也就很容易理解了, 两者只有报文结构不一样,因而codec也会有一点点不同.
getChildrenRequest的报文结构:
响应的结构体:
运行代码
要想快速地体验一下这个简陋的zkClient的话, 可以直接从单元测试入手:
public class ZkClientTest {
@Test
public void testZkClient() throws Exception {
// NettyZkClient的构造方法里面会调用login() 跟服务端建立会话
NettyZkClient nettyZkClient = new NettyZkClient(30000);
// 创建一个临时顺序节点
ZkCreateResponse createResponse = nettyZkClient.create("/as", 12312, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(new Gson().toJson(createResponse));
// 获取/下的所有子路径
List<String> list = nettyZkClient.getChildren("/");
System.out.println(new Gson().toJson(list));
}
}
实现新的命令
因为不同的命令之间的逻辑大多数是相同的, 因此我已经把一些通用的逻辑抽象了出来, 如果想要实现其他命令的话, 只需要做几步简单的工作. 例如,我想实现一个get path命令, 那么只需要:
- 查找文档,确定get命令的报文结构, 这一步是最麻烦的
- 创建GetRequest类, 继承RequestHeader类, 实现ZkRequest接口
- 创建GetResp类, 继承AbstractZkResonse类, 实现ZkResponse
- 编写GetRequestCodec, 实现ByteBuf和GetRequest, ZkResponse的转换
- 修改ZkCodecFactories类, 把GetRequest和GetRequestCodec关联起来
这样就可以实现了一个新的命令.当然还得必须再提一下, 第一步是最麻烦的, 可能要花掉百分之七八十的工作量.
说到这, 可能会有人问, 去哪里了解每个命令的报文结构呢? 方法其实有很多的, 可能官方文档就有(然而我暂时没找到). 我的办法是最简单的, 就是阅读现有ZkClient的代码. 但是现有的zkClient并不能非常直观地体现出来, 还得结合调试代码, 阅读服务端代码(解析报文),抓包等等方法.
源码
撸完了这个zkClient客户端后, 发现不够过瘾,所以后来又撸了一个redis客户端和kafka的producer/consumer.
撸完后, 发现只要了解通信协议, netty基本上可以实现任何C/S架构的客户端. 因此把这几个客户端整理到了一起,放到了github上面.
后面还想继续实现elastic-search, mysql, dubbo等等客户端(其实调研过了是可行的, 只是还没有精力去实现)
最后附上github源码地址:
https://github.com/NorthWard/awesome-netty
感兴趣的同学可以参考一下,共同学习共同进步.
加载全部内容