亲宝软件园·资讯

展开

BFT-SMaRt:用Netty做客户端的可靠信道

一面千人 人气:3

目录

  • 一、Netty服务端的构建
    • 1. 父类构造函数
      • ① 查找缓存
      • ② 相关日志
    • 2. 服务端构造
      • ① 配置读取
      • ② 服务端配置
    • 3. 服务端功能
      • ① 通用接口功能
      • ② Channel处理器
    • 4. 节点通信层已完成
  • 二、CounterClient入口
    • 1. 构建服务代理
      • ① 视图控制器
      • ② 启动Netty客户端
    • 2. Netty通信原理
    • 3. 客户端功能
      • ① 通用接口功能
      • ② channel处理器
      • ③ 已完成内容
    • 4. 调用排序消息
  • 三、后记

关键字:Netty BFT-SMaRt Channel findCache KeyLoader Bootstrap NioEventLoopGroup ChannelFuture 视图

Netty是目前最高效便捷的NIO框架。Netty可提供更加高可用、更好健壮性的稳定大规模连接的IO通道。任何一款区块链早期的技术产品,都是从联盟链开始演进,因为联盟链降低了很多原教旨的难度。回到BFT-SMaRt,它的网络连接分为节点之间的连接,节点与客户端之间的连接。节点之间的连接,我们在BFT-SMaRt:用Java做节点间的可靠信道一文中详细分析了在共识逻辑之前节点之间能够做到的连接准备。那么,本文将继续探索在BFT-SMaRt项目中,节点与客户端之间的连接是如何实现的。

作为源码研究的起点,有两个现成的入口:

  • 服务端:ServerCommunicationSystem构造函数的最后一个步骤,即clientsConn的创建。
  • 客户端:CounterClient类的入口命令,将本地作为客户端对节点发起访问请求。

一、Netty服务端的构建

首先构建服务端,转到ServerCommunicationSystem构造函数的最后一行。

clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);

这里采用了工厂模式的设计:构建一个controller基类,业务方可有多个实现类,在工厂get方法中传入实现类对象,通过不同的实现类,返回不同的处理对象。BFT-SMaRt并未有多个实现类,这里可以在上层业务方进行丰富。

public class CommunicationSystemServerSideFactory {
    public static CommunicationSystemServerSide getCommunicationSystemServerSide(ServerViewController controller) {
        return new NettyClientServerCommunicationSystemServerSide(controller);
    } // 直接返回NettyClientServerCommunicationSystemServerSide对象
}

直接返回NettyClientServerCommunicationSystemServerSide对象,以下称NettyClientServerCommunicationSystemServerSide类为Netty服务端类。

1. 父类构造函数

直接进入NettyClientServerCommunicationSystemServerSide类的构造函数,函数体内无super指定父类构造函数,因此隐式调用父类SimpleChannelInboundHandler的无参构造函数。

对于不熟悉继承关系下构造函数的执行顺序的朋友,请自行补充上。

protected SimpleChannelInboundHandler() {
    this(true);
}

父类的无参构造函数指定了本地的有参构造。设定了本地属性autoRelease为true。

protected SimpleChannelInboundHandler(boolean autoRelease) {
  this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
  this.autoRelease = autoRelease;
}

接下来执行TypeParameterMatcher的find方法。find方法主要维护一个查找缓存,包括构建和使用。

① 查找缓存

该方法首先获得并配置查找缓存findCache:

Map<Class<?>, Map<String, TypeParameterMatcher>> findCache = InternalThreadLocalMap.get().typeParameterMatcherFindCache(); // InternalThreadLocalMap容器
Class<?> thisClass = object.getClass();
Map<String, TypeParameterMatcher> map = (Map)findCache.get(thisClass); // 类型参数匹配器
if (map == null) {
map = new HashMap();
findCache.put(thisClass, map);
}

查找缓存会将热度较高的内容优先缓存,以增进查询速度。

查找缓存的容器结构是通过InternalThreadLocalMap来构建,注意从SimpleChannelInboundHandler开始,始终带着泛型<I>进入,而本例中的泛型类为TOMMessage,该类是共识排序消息类,将会在BFT-SMaRt共识部分展开介绍。那么,find方法会将泛型类放置到查找缓存findCache中。

a) 匹配器

接下来,获得并配置类型参数匹配器,也是用于增强查找。

TypeParameterMatcher matcher = (TypeParameterMatcher)((Map)map).get(typeParamName);
if (matcher == null) {
    matcher = get(find0(object, parametrizedSuperclass, typeParamName));
    ((Map)map).put(typeParamName, matcher);
}
return matcher;

匹配器使用到Java的反射机制来查找类。

首先通过本地map查找类型参数匹配器,如果没有查到,则初始构建。使用调用find时传入的类型参数名,调用find0方法通过反射机制得到泛型类,然后调用get方法通过反射机制获得对应匹配器,最后填充进匹配器map,共同构成查找缓存findCache的内容。最后回顾一下findCache容器的结构。

Map<Class<?>, Map<String, TypeParameterMatcher>>

因此,一个类可以有多个对应不同类型参数名的匹配器。

② 相关日志

该缓存的容器结构是InternalThreadLocalMap,类加载进入内存,首先执行static静态方法。

static {
    logger.debug("-Dio.netty.threadLocalMap.stringBuilder.initialSize: {}", STRING_BUILDER_INITIAL_SIZE);
    STRING_BUILDER_MAX_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.maxSize", 4096);
    logger.debug("-Dio.netty.threadLocalMap.stringBuilder.maxSize: {}", STRING_BUILDER_MAX_SIZE);
}

打印出日志,StringBuilder的初始化长度以及最大长度。日志输出如下:

11:18:32.645 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
11:18:32.645 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096

2. 服务端构造

回到NettyClientServerCommunicationSystemServerSide的构造函数,首先是配置读取及分析。通过配置文件获得私钥、IP、端口号、节点总数、节点id等信息。

① 配置读取

配置读取可分为三方面:

  • 私钥读取
  • IP加端口号读取处理
  • 配置域信息读取

a) 私钥读取

Netty服务端类有一个私有属性字段privKey,用于存储私钥,以备后续签名使用。

private PrivateKey privKey;

该字段通过服务端类的构造函数赋值。

privKey = controller.getStaticConf().getPrivateKey();

跳转到Configuration类,调用getPrivateKey方法。私钥内容是从配置域controller中获取。

return keyLoader.loadPrivateKey();

keyLoader对象是在Configuration类构造时传入。而Configuration类的构造要追踪到其子类TOMConfiguration的构造函数,继续TOMConfiguration是在ViewController构造时调用。这部分内容将在CounterClient入口时展开。回到keyLoader,它是KeyLoader的实例,而KeyLoader有三个子类。

  • RSAKeyLoader,适用于RSA类非对称加密算法簇的秘钥加载。
  • ECDSAKeyLoader,适用于ECDSA类非对称加密算法簇的秘钥加载,全称椭圆曲线数字签名算法。是ECC与DSA的结合。Java原生类库中在jdk1.7以后已经加入支持。
  • SunECKeyLoader,适用于jdk自带的sunEC加密秘钥的加载,位于sun.security.ec.SunEC。

下面是他们的类图关系。

b) IP端口号

接下来是从配置域中读取节点服务器端的IP端口号。

// 获取IP、端口号
String myAddress;
String confAddress = controller.getStaticConf().getRemoteAddress(controller.getStaticConf().getProcessId())
      .getAddress().getHostAddress();
if (InetAddress.getLoopbackAddress().getHostAddress().equals(confAddress)) {
   myAddress = InetAddress.getLoopbackAddress().getHostAddress();
}
else if (controller.getStaticConf().getBindAddress().equals("")) {
   myAddress = InetAddress.getLocalHost().getHostAddress();
   // 如果Netty绑定到环回地址,客户端将无法连接节点。为了解决这个问题,我们绑定到config/hosts.config中提供的地址。
   if (InetAddress.getLoopbackAddress().getHostAddress().equals(myAddress)
         && !myAddress.equals(confAddress)) {
      myAddress = confAddress;
   }
} else {
   myAddress = controller.getStaticConf().getBindAddress();
}
int myPort = controller.getStaticConf().getPort(controller.getStaticConf().getProcessId());

这段读取代码与上一篇节点间通信如出一辙。但值得注意的是,配置域端口号是由两项组成,我们再次查看配置域内容。

#server id, address and port (the ids from 0 to n-1 are the service replicas) 
0 127.0.0.1 11000 11001
1 127.0.0.1 11010 11011
2 127.0.0.1 11020 11021
3 127.0.0.1 11030 11031

IP后面有两个端口号,第一列为客户端通信端口,第二列为节点间通信端口。就拿节点id为0的第一行举例,本地作为节点服务,其他节点要通过(server <-> server)11001端口进行访问,而其他客户端需要通过(client <-> server)11000端口进行访问。这一段在下面日志输出代码中也有体现。

logger.info("Port (client <-> server) = "
      + controller.getStaticConf().getPort(controller.getStaticConf().getProcessId()));
logger.info("Port (server <-> server) = "

日志打印:

14:36:19.223 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Port (client <-> server) = 11000
14:38:02.617 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Port (server <-> server) = 11001

c) 配置域信息

最后就是配置中的其他信息了。首先看代码,然后看日志输出。

logger.info("ID = " + controller.getStaticConf().getProcessId());      // 节点id
logger.info("N = " + controller.getCurrentViewN());                // 节点总数
logger.info("F = " + controller.getCurrentViewF());                // 节点最大容错数
logger.info("requestTimeout = " + controller.getStaticConf().getRequestTimeout());
logger.info("maxBatch = " + controller.getStaticConf().getMaxBatchSize());
// 根据配置中是否使用签名,打印不同的提示信息
if(controller.getStaticConf().getUseSignatures() == 1) logger.info("Using Signatures");
                     else if (controller.getStaticConf().getUseSignatures() == 2)           logger.info("Using benchmark signature verification");

logger.info("Binded replica to IP address " + myAddress);
// SSL/TLS 协议版本
logger.info("SSL/TLS enabled, protocol version: {}", controller.getStaticConf().getSSLTLSProtocolVersion());

系统配置中关于是否使用签名的配置项,用于定义客户端是否应该对消息认证码使用签名。

system.communication.useSignatures

接下来相关日志输出内容。

14:36:16.545 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - ID = 0
14:36:16.989 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - N = 4
14:36:17.230 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - F = 1
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - requestTimeout = 2000
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - maxBatch = 1024
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Binded replica to IP address 127.0.0.1
14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - SSL/TLS enabled, protocol version: TLSv1.2

② 服务端配置

讲到这里,想延伸讨论一下Netty的必要性和实现原理。Netty的必要性可以从Socket(Socket之前的网络传输技术发展就不赘述了)说起。

a) Netty的必要性

一个Socket通常是由一个线程来管理,它实现了最初安全可靠的点到点IO通信。但当客户端较多时,可能会耗尽服务端的线程资源,这是一种阻塞IO的模型。实践过程中,大量的由线程维护的网络连接始终在监听状态而没有数据传输,这是对于线程资源的浪费。

我们在上一篇关于节点间通信的研究中,使用的就是这种模型,然而那是基于节点数量不多的联盟链场景。4节点的网络只需要6条线程即可承担,不用实现复杂逻辑来处理大流量的资源维护,简单稳定的阻塞IO模型显然是更加适用的。但是,本文的研究重点转向了客户端通信,这就需要一个能够处理大流量的新模型。

设想一种任务加线程池的方式。线程不再死盯着一条两方参与的连接,而是被线程池统一管理起来。网络传输的工作会被放到任务中去,线程池通过调度机制领取任务并执行网络传输工作。在任务多的时候,调度逻辑会将任务排到一个队列中去,然后根据调度机制,启动对应规模的线程数量来控制处理任务的速度。每条线程执行完任务就会自动回归到线程池可用资源库,等待执行新的通信任务。这就是一种非阻塞IO的模型。

我们继续延伸,所谓的“调度逻辑”是如何接收任务的,这里引用到linux的多路复用IO模型,即一个select可以通过顺序扫描(轮询)的方式监测多个通道是否有通信就绪的状态,一旦有,就会启动一个回调函数将该通信内容封装到任务容器,并排到队列中去。回到函数会启动线程池资源的实例来处理IO的工作,而select将该通信实例交出去以后,就可以释放资源继续监听。

Netty就是对以上内容的封装框架,更易于使用。

b) Netty的实现原理

Netty是基于事件驱动模式的、Reactor线程模型的。事件驱动是相对于主动轮询提出的,主动轮询是说主线程在不断检查是否有事件发生,如果有则调用事件处理函数。事件驱动仍旧是主线程去检查事件的发生,当有事件时将事件放到一个队列,同时还有一条线程在不断的消费这个队列,消费时调用事件的处理函数。事件驱动方式基于主动轮询,又提出了一个线程专门作为事件消费对象,分担了原主线程的工作内容,这取自观察者模式,也更加符合单一职责原则。Reactor线程模型是一种分发机制,首先它会不断的执行selector.select()方法,用来检测并产生新的事件,然后分发事件给到适当的线程来处理。Reactor模型良好地实现了事件驱动理念。Netty应用Reactor线程模型,分为主从关系,主线程用于发现事件,从线程用于消费事件。

  • bossGroup,线程池在bind一个端口以后返回一条线程作为主线程,接收产生新事件。
  • workerGroup,线程池用来消费事件。

Netty有一个Bootstrap概念,BoostStrap是引导程序的含义,通过引导程序,可以快速配置,串联搭建起来一个Netty项目,其中ServerBootstrap是针对服务端的,Bootstrap是针对客户端的。所以,包括但不限于以上两个线程池的内容,全部被包含在Bootstrap的实例中。Netty同时也是一个异步框架,所有的操作包括绑定、IO通信等都会返回一个ChannelFuture对象,该对象可以判断isDone,isSuccess,getCause,isCanceled,以及通过addListener加入监听回调。Channel是具体的Netty中用于处理通信的组件,针对不同的通信环境,都会有不同的Channel子类来处理,处理内容包括维持通道、连接配置参数、异步IO处理ChannelHandler、返回ChannelFuture。Selector是Channel的管理器,轮询器,可以管理Channel。另外,所有Group均为线程池的意思,而NioEventLoop的含义是一个维护队列的线程。

c) 结合源码

接下来回到Netty服务端的源码配置。

ServerBootstrap b = new ServerBootstrap(); // 综上所述,先构建一个服务端启动程序。
EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads); // 构建主线程池,初始容量为8条,用于监听Accept事件。
EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); // 构建从线程池,初始容量为当前系统中所有的可用线程数量。

我们按流程构建了ServerBootstrap对象,然后构建了主线程池bossGroup,初始设定为8,最后构建了从线程池workerGroup,初始设定为系统当前所有可用线程数量。这也不难理解,因为实践过程中,我们总会注意到事件的发现相较于事件的处理是更快速的。因此8条主线程可以覆盖事件发现的工作,而为了更高效使用机器性能,剩余的线程资源都用来事件的消费了。下面是BFT-SMaRt自定义的编解码工具工厂。

sessionReplicaToClient = new ConcurrentHashMap<>(); // 并发主流容器,线程安全且高效的HashMap
rl = new ReentrantReadWriteLock(); // 可重入读写锁
serverPipelineFactory = new NettyServerPipelineFactory(this, sessionReplicaToClient, controller, rl); // 本地开发的工具工厂,用于编解码处理。

接下来将以上准备好的资源设定配置到ServerBootstrap。

b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_SNDBUF, tcpSendBufferSize)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMsec)
.option(ChannelOption.SO_BACKLOG, connectionBacklog)
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(serverPipelineFactory.getDecoder());
        ch.pipeline().addLast(serverPipelineFactory.getEncoder());
        ch.pipeline().addLast(serverPipelineFactory.getHandler());
    }
    })
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);

首先通过ServerBootstrap的group方法配置主从线程池,然后配置channel类,接着配置一系列参数option。最后为事件消费线程配置参数和channel初始化initChannel,编辑码部分不多展开(编解码也是Netty的功能特色),主要是消费事件的处理类,设定为this,即当前服务端类。接下来给ServerBootstrap绑定IP端口 。

ChannelFuture f = b.bind(new InetSocketAddress(myAddress, myPort)).sync();

ServerBootstrap绑定IP端口会返回一个ChannelFuture对象,通过sync()阻塞等待绑定完成返回状态给新建ChannelFuture对象f。最后,将f的channel赋值给服务端类的私有属性Channel对象mainChannel。

mainChannel = f.channel();

3. 服务端功能

NettyClientServerCommunicationSystemServerSide类继承自SimpleChannelInboundHandler<TOMMessage>,实现了CommunicationSystemServerSide接口。其中CommunicationSystemServerSide接口是BFT-SMaRt自定义的,主要用于描述客户端通信中服务端的常用功能。

① 通用接口功能

下面进入CommunicationSystemServerSide接口,查看接口函数。

public interface CommunicationSystemServerSide {   
   public void send(int[] targets, TOMMessage sm, boolean serializeClassHeaders);
   public int[] getClients();
   public void setRequestReceiver(RequestReceiver requestReceiver);
   public void shutdown();
}

其中send方法是网络通信中,节点给客户端发送消息的具体方法,消息类型为TOMMessage。getClient方法会遍历sessionReplicaToClient数据集合,将已建立的节点-客户端会话连接中的客户端统计出来并返回一个整形数组,目前该方法未被使用。setRequestReceiver是设置本地属性requestReceiver。shutdown方法是关闭当前Netty系统。

② Channel处理器

前面介绍了,Bootstrap构建的Netty系统中的Channel处理器就是服务端类本身,我们回到该类的声明部分,也看到了它是继承自SimpleChannelInboundHandler<TOMMessage>,继续追本溯源,它是ChannelHandler的子类,这符合处理器的声明。

public ServerBootstrap childHandler(ChannelHandler childHandler)

下面,我们查看在服务端类中关于Channel处理器的重写方法,也是包含4个方法:

  • channelActive
  • channelInactive
  • exceptionCaught
  • channelRead0

前三个方法都是来自于祖父类ChannelInboundHandlerAdapter,这三个方法是捕捉了channel的三个生命周期,方法体就是在不同的生命周期需要补充做的事。他们的实现更多像是一种标准。第四个方法来自于父类SimpleChannelInboundHandler,是核心的channel读取数据的方法。

a) Channel生命周期

进入ChannelInboundHandlerAdapter类,该类完整地展示了Channel生命周期中的所有状态。

一个Channel从最初注册到Selector上面,变为活跃状态,便可以读取数据,期间可以更改可写入能力,捕获异常,触发用户事件。接着Channel可能变为不活跃状态。Channel也可以随时选择从Selector上解除注册。

服务端类对于Channel生命周期的3个实现,是一些常规处理和日志提醒。

b) 读取数据

channelRead0方法是Channel读取信道中数据的核心方法。

protected void channelRead0(ChannelHandlerContext ctx, TOMMessage sm) throws Exception {
   if (this.closed) {
      closeChannelAndEventLoop(ctx.channel());
      return;
   }
   // 交付消息到TOM层
   if (requestReceiver == null)
      logger.warn("Request receiver is still null!");
   else
      requestReceiver.requestReceived(sm);
}

closeChannelAndEventLoop是前面生命周期方法中常用的方法,用于清空数据、解除注册、关闭channel,关闭相关线程。下面核心方法是调用了requestReceiver的requestReceived方法。requestReceiver是前面介绍的通用接口setRequestReceiver设置的,RequestReceiver接口目前只有一个实现类是TOMLayer。这也放在共识阶段再研究。

4. 节点通信层已完成

到目前为止,结合前一篇文章,本地节点的服务端通信系统ServerCommunicationSystem就全部构建完成了。

public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception {
    super("Server Comm. System");
    this.controller = controller;
    messageHandler = new MessageHandler();
    inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize());

    serversConn = new ServersCommunicationLayer(controller, inQueue, replica);
    clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
}

目前,节点服务部分重新回到bftsmart.tom.ServiceReplica#init方法,

cs = new ServerCommunicationSystem(this.SVController, this);

这一行代码的内容通过这两篇文章已经全部介绍完毕,从下一篇开始,继续向下分析,将initTOMLayer()作为入口介绍节点服务中共识部分的实现原理。

二、CounterClient入口

前文通过一个章节的叙述,分析了BFT-SMaRt在节点客户端通信过程中Netty服务端的构建。本章介绍另一方,即Netty客户端的构建,计划从CounterClient的main函数作为入口开始研究。

public static void main(String[] args) throws IOException {
    if (args.length < 2) { // 参数个数校验,最少2个
        System.out.println("CounterClient <process id> <increment> [<operations>]");
        System.out.println("if <increment> equals 0, read-only");
        System.out.println("default <number of operations> equals 1000");
        System.exit(-1);
    }
    // (1)通过节点id,建立客户端服务代理
    ServiceProxy counterProxy = new ServiceProxy(Integer.parseInt(args[0]));
    try {
        int inc = Integer.parseInt(args[1]); // 影响值
        // 操作次数,默认1000次
        int numberOfOps = (args.length > 2) ? Integer.parseInt(args[2]) : 1000;
        for (int i = 0; i < numberOfOps; i++) { // 按照操作次数循环
            // 将影响值放入输出流out
            ByteArrayOutputStream out = new ByteArrayOutputStream(4);
            new DataOutputStream(out).writeInt(inc);
            System.out.print("Invocation " + i);
            // (2)调用实操方法,通过输出流传入影响值
            byte[] reply = (inc == 0) ?
                    counterProxy.invokeUnordered(out.toByteArray()) :
                    counterProxy.invokeOrdered(out.toByteArray()); //magic happens here
            if (reply != null) {
                // 通过输入流读取返回值
                int newValue = new DataInputStream(new ByteArrayInputStream(reply)).readInt();
                System.out.println(", returned value: " + newValue);
            } else {
                System.out.println(", ERROR! Exiting.");
                break;
            }
        }
    } catch (IOException | NumberFormatException e) {
        counterProxy.close(); // 关闭代理
    }
}

该方法中比较重要的两个步骤,

  • 其一是为客户端构建服务代理
  • 其二是调用实操方法,消费影响值。

1. 构建服务代理

进入ServiceProxy类的构造函数,

public ServiceProxy(int processId) {
   this(processId, null, null, null, null);
}

跳转到本地的五参数的构造函数,

public ServiceProxy(int processId, String configHome,
      Comparator<byte[]> replyComparator, Extractor replyExtractor, KeyLoader loader) {
   // 代理服务初始化,包括网络通信、共识视图、系统配置
   if (configHome == null) {
      init(processId, loader);
   } else {
      init(processId, configHome, loader);
   }
   // 构建一个TOMMessage数组,大小为节点总数。
   replies = new TOMMessage[getViewManager().getCurrentViewN()];
   // 比较器,继承自jdk工具Comparator,重写方法实现可比较两个字节数组是否相等的功能。
   comparator = (replyComparator != null) ? replyComparator : new Comparator<byte[]>() {
      @Override
      public int compare(byte[] o1, byte[] o2) {
         return Arrays.equals(o1, o2) ? 0 : -1;
      }
   };
   // 导出器,继承自工具Extractor,可构建自定义的响应消息提取器。
   extractor = (replyExtractor != null) ? replyExtractor : new Extractor() {
      @Override
      public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) {
         return replies[lastReceived];
      }
   };
}

主要代码为初始化,由于当前configHome传入为null,调用的两个参数的init方法。由于ServiceProxy类是TOMSender的子类,因此调用的init方法是父类的方法。

public void init(int processId, KeyLoader loader) {
   // 构建视图控制器
   this.viewController = new ClientViewController(processId, loader);
   // 启动Netty通信
   startsCS(processId);
}

① 视图控制器

ViewController类是系统的上下文环境,是由系统配置项构建。以下是BFT-SMaRt关于视图的类图关系。

视图层级的根节点是View类,它实现了Serializable接口,所以视图都是可序列化的。视图最基本的属性就是id,容错数,节点id数组以及连接地址集合。在视图控制器ViewController中,最终可以得到所有网络配置属性及方法。

a) 配置整合

ViewController类是View子类,包含了更全面的属性字段,这些属性的值来自于两个渠道:

  • 配置文件包括host.config以及system.config
  • 配置类Configuration。

而TOMConfiguration类通过一个map容器Map<String, String>对象configs,有机地将以上两个渠道的所有配置全部提取出来,在内存中构建了静态配置对象staticConf。

b) 视图存储

继续ViewController类的研究,视图除了在内存中使用,也可以被持久化存储在文件中。这个能力来自于接口ViewStorage,该接口提供了两个功能,

public interface ViewStorage {
    public boolean storeView(View view);    // 是否存储成功
    public View readView();                 // 读取视图
}

目前该接口的实现只有DefaultViewStorage类,它可以将视图对象通过对象输出流写入文件保存在磁盘上,同时还可以从磁盘上通过对象输入流将文件数据恢复成内存中的View对象。

c) 服务端视图控制器

根据上面的类图,ServerViewController是ViewController的一个子类。作为共识节点服务端,它主要提供了共识方面的属性功能。核心属性如下:

private int quorumBFT; // ((n + f) / 2) replicas
private int quorumCFT; // (n / 2) replicas
private int[] otherProcesses;
private int[] lastJoinStet;
private List<TOMMessage> updates = new LinkedList<TOMMessage>();
private TOMLayer tomLayer;

quorumBFT是指在BFT网络中的有效确认数,同理quorumCFT则是在CFT网络中的有效确认数。lastJoinStet是用来记录最后加入加点的,是指那些配置域以外的节点,可以是TTP,或者是陌生节点,需要重新配置reconfigure视图参数。updates是共识消息TOMMessage的容器,tomLayer是共识层的对象。ServerViewController对象是构建节点通信系统的参数,这是前面所遗漏的部分,在此补充上。

首先在节点服务类的构建中包含:

this.SVController = new ServerViewController(id, configHome, loader);

接着构造函数内继续执行init方法,会构建节点通信系统,

cs = new ServerCommunicationSystem(this.SVController, this);

第一个参数传入的就是服务端视图对象,该对象在构建节点通信系统时发挥了重要作用,例如读取系统配置,判断节点来源等等。那么后续的内容在上一篇博文中就已经非常详细了,这里就到此为止。

d) 客户端视图控制器

我们回到TOMSender的init方法,构建客户端视图控制器。相对来讲,ClientViewController的内容就很少了,它只有两个构造函数和两个自有方法。

public ClientViewController(int procId, KeyLoader loader) {
    super(procId, loader); // 初始化系统配置
    View cv = getViewStore().readView(); // 从磁盘读取视图对象
    // 调用reconfigureTo将视图内容配置View属性。
    if(cv == null){
        // 若未读取成功,则通过配置参数构建新视图
        reconfigureTo(new View(0, getStaticConf().getInitialView(),
            getStaticConf().getF(), getInitAdddresses()));
    }else{
        reconfigureTo(cv);
    }
}

初始化配置,然后读取视图,通过ViewController的reconfigureTo方法(注意区分ServiceProxy也有一个reconfigureTo方法,要比这个方法复杂)替换视图。

public void reconfigureTo(View newView) {
    this.lastView = this.currentView;   // 将当前视图变为上一个视图
    this.currentView = newView;         // 传入的新视图变为当前视图
}

到此,客户端的视图控制器就构建完成了。

② 启动Netty客户端

前面第一章节已经详细介绍了节点Netty服务端的构建,下面就开始启动相对应的Netty客户端。通过TOMSender类的startCS方法。可以注意到参数clientId在前面的名字是processId,该参数是用来标识客户端的,而不是用来指定请求节点的。

private void startsCS(int clientId) {
   this.cs = CommunicationSystemClientSideFactory.getCommunicationSystemClientSide(clientId, this.viewController);
   this.cs.setReplyReceiver(this); // This object itself shall be a reply receiver
   this.me = this.viewController.getStaticConf().getProcessId();
   this.useSignatures = this.viewController.getStaticConf().getUseSignatures() == 1;
   this.session = new Random().nextInt();
}

我们注意到,Netty服务端的构建只传入了一个服务端视图控制器ServerViewController对象,

public NettyClientServerCommunicationSystemServerSide(ServerViewController controller)

而Netty客户端的构建传入了客户端id和客户端视图控制器ClientViewController对象两个参数。

public NettyClientServerCommunicationSystemClientSide(int clientId, ClientViewController controller)
  • NettyClientServerCommunicationSystemClientSide,后面简称为Netty客户端类。
  • NettyClientServerCommunicationSystemServerSide,前面介绍过了,简称为Netty服务端类。

下面进入到客户端类的构造函数,首先执行的父类构造函数super(),由于Netty客户端类和服务端类都是继承自同一个父类SimpleChannelInboundHandler,因此可参照(一-1)父类构造函数的内容。接下来,客户端类并没有主线程池而只有从线程池workerGroup,即bossGroup是Netty服务端类特有的。

this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());

构建从线程池,初始容量为当前系统中所有的可用线程数量。接下来,私钥的读取也可参考(一-2-①-a)。然后通过视图对象获取所有的节点id。

int[] currV = controller.getCurrentViewProcesses();

对节点id数组进行遍历,向每个节点发起连接请求,secretKeyFactory是加密工具组件。

ChannelFuture future = connectToReplica(replicaId, secretKeyFactory);

客户端指定自身id,然后向共识网络发起请求,而不是指定节点,进入共识网络以后,会遍历节点分别建立连接,这与理论部分中的逻辑图是吻合的。

a) 连接到指定节点

进入connectToReplica连接到指定节点方法。

public synchronized ChannelFuture connectToReplica(int replicaId, SecretKeyFactory fac)
      throws NoSuchAlgorithmException, InvalidKeySpecException, InvalidKeyException {
   // 2端参与的连接认证密码,暂未使用
   String str = this.clientId + ":" + replicaId;
   PBEKeySpec spec = TOMUtil.generateKeySpec(str.toCharArray());
   SecretKey authKey = fac.generateSecret(spec);
   // 配置启动程序
   Bootstrap b = new Bootstrap();
   b.group(workerGroup);
   b.channel(NioSocketChannel.class);
   b.option(ChannelOption.SO_KEEPALIVE, true);
   b.option(ChannelOption.TCP_NODELAY, true);
   b.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
   b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMsec);
   b.handler(getChannelInitializer()); // 添加channel处理器
   // 启动连接到指定的节点replicaId,返回ChannelFuture
   ChannelFuture channelFuture = b.connect(controller.getRemoteAddress(replicaId));
   // 缓存连接会话到sessionClientToReplica,这是一个ConcurrentHashMap容器
   NettyClientServerSession ncss = new NettyClientServerSession(
         channelFuture.channel(), replicaId); // 构建Netty客户端请求服务端的会话对象
   sessionClientToReplica.put(replicaId, ncss);

   return channelFuture;
}

相对Netty服务端来讲,连接过程差不多但简单很多,仍旧通过启动程序Bootstrap完成快速构建。首先为Bootstrap添加了线程池workerGroup,然后指定了Channel类型为NioSocketChannel(注意区分服务端的channel类型为NioServerSocketChannel),接着配置参数,添加Channel处理器。然后,通过连接方法建立与指定节点的连接通信。

ChannelFuture f = b.connect(controller.getRemoteAddress(replicaId));

这对应的是Netty服务端的,

ChannelFuture f = b.bind(new InetSocketAddress(myAddress, myPort)).sync();

客户端的connect也可以加一个阻塞等待sync(),即:

ChannelFuture f = b.connect(controller.getRemoteAddress(replicaId)).sync();

服务端成功绑定了IP端口,就开始监听该地址了。此时客户端通过connect方法连接指定地址的节点。connect方法的参数是通过controller.getRemoteAddress(replicaId)返回的SocketAddress类型对象,内容就是IP端口号。连接成功以后,客户端ChannelFuture可以通过isSuccess()方法返回true来得到结果。回到客户端类的构造函数,

future.awaitUninterruptibly();

将线程阻塞在这一行代码,保持线程的通信可用性,直到Channel关闭,此处是通过捕捉connect方法抛出的异常而完成线程关闭。

public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    } else {
        this.validate();
        return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
    }
}

connect方法中往下调用,会根据通道的不同情况抛出异常。

b) Channel处理器

注意观察以上代码,作为重要的组成部分,channel处理器是通过getChannelInitializer()构建的。

private ChannelInitializer getChannelInitializer() throws NoSuchAlgorithmException {

   final NettyClientPipelineFactory nettyClientPipelineFactory = new NettyClientPipelineFactory(this,
         sessionClientToReplica, controller, rl);

   ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(nettyClientPipelineFactory.getDecoder());
         ch.pipeline().addLast(nettyClientPipelineFactory.getEncoder());
         ch.pipeline().addLast(nettyClientPipelineFactory.getHandler());
      }
   };
   return channelInitializer;
}

这部分代码与Netty服务端也很相似,最终Channel处理器也指向了客户端类本身。

2. Netty通信原理

到此为止,我们构建了Netty服务端,也建立了客户端并对服务端发起了连接请求。那么通过以上Netty相关的内容,下面通过一张图来介绍Netty的通信原理。

首先由服务端启动程序初始化channel,然后发起绑定将channel注册到主线程池的按顺序的某一个线程的selector。Selector会轮询channel的accept事件。这时候如果有客户端启动程序发起的connect连接触发accept事件,该事件会执行一个任务并被放到任务队列中去,等待消费。当runAllTasks消费到该任务,则建立起另一条channel并注册到从线程池的按顺序的某一个线程的selector。Selector会轮询读写事件。Channel中当数据被接收完成,表示读就绪就是读事件;同样的,Channel中当可以写数据时,标识写就绪就是写事件。读写事件发生都会单独执行一个任务并被放到任务队列中去,等待任务消费。当runAllTasks消费到该任务,则会处理具体读写事件。

3. 客户端功能

客户端与节点之间通过Netty建立了通信。客户端类实现了CommunicationSystemClientSide接口,与服务端同样继承了SimpleChannelInboundHandler<TOMMessage>,因此也可分为通用客户端接口类和Channel生命周期方法。

① 通用接口功能

客户端类实现了CommunicationSystemClientSide接口,该接口定义了客户端节点通信中,节点应该具备的功能。

public interface CommunicationSystemClientSide {
   public void send(boolean sign, int[] targets, TOMMessage sm);
   public void setReplyReceiver(ReplyReceiver trr); // 设置ReplyReceiver字段
   public void sign(TOMMessage sm); // 未使用
   public void close();
   public void updateConnections();
}

a) 发送消息准备

首先看send接口,在客户端类的实现体中,send首先计算出基于BFT或CFT的最少确认数quorum。

int quorum;
Integer[] targetArray = Arrays.stream(targets).boxed().toArray(Integer[]::new);
Collections.shuffle(Arrays.asList(targetArray), new Random());
if (controller.getStaticConf().isBFT()) {
   quorum = (int) Math.ceil((controller.getCurrentViewN() + controller.getCurrentViewF()) / 2) + 1;
} else {
   quorum = (int) Math.ceil((controller.getCurrentViewN()) / 2) + 1;
}
listener.waitForChannels(quorum); // 等待前面的传输完成,收集足够的消息确认数

当共识要求的最少确认数达成以后,客户端才可以发起请求。请求的类型是TOMMessage,被发送前要通过字节数组输出流序列化得到消息对象sm的序列化消息serializedMessage。(这个在EOS的合约请求中也是常见的,data对象中除明文参数以外,还会有hex作为请求的序列化消息,便于传输。)

if (sm.serializedMessage == null) {
   DataOutputStream dos = null;
   try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      dos = new DataOutputStream(baos);
      sm.wExternal(dos);
      dos.flush();
      sm.serializedMessage = baos.toByteArray();
   } catch (IOException ex) {
      logger.debug("Impossible to serialize message: " + sm);
   }
}

b) 消息签名

接下来,要通过本地私钥给已序列化的消息进行签名。

sm.serializedMessageSignature = signMessage(privKey, sm.serializedMessage);

调用本地signMessage方法进行签名。

public byte[] signMessage(PrivateKey key, byte[] message) {
   try {
      // 签名引擎,用于签名的工具。
      if (signatureEngine == null) {
         signatureEngine = TOMUtil.getSigEngine();
      }
      byte[] result = null;
      // 载入私钥 java.security.Signature.initSign(java.security.PrivateKey)
      signatureEngine.initSign(key);
      // 载入待签名消息 java.security.Signature.update(byte[])
      signatureEngine.update(message);
      // 执行签名 java.security.Signature.sign()
      result = signatureEngine.sign();
      // 返回签名结果
      return result;
   } catch (Exception e) {
      logger.error("Failed to sign message", e);
      return null;
   }
}

签名的底层代码是由jdk中java.security.Signature包所完成的,感兴趣的小伙伴可深入研究。签名后得到了消息对象sm的serializedMessageSignature字段的值。

c) 发送消息

接下来,会遍历发送终点,即所有的节点。每拿到一个节点,则将该节点作为目的地存入消息对象sm的destination字段。

sm.destination = targets[target];

接下来上锁并获得channel,

rl.readLock().lock();
Channel channel = ((NettyClientServerSession) sessionClientToReplica.get(targets[target])).getChannel();
rl.readLock().unlock();

sessionClientToReplica容器在前面建立Netty客户端时谈到过,是一个连接会话的容器,当再次获取连接时不必重新构建。在这个容器中按照节点查找得到指定节点的连接channel。判断如果该channel是可用的,则发送。

if (channel.isActive()) {
   sm.signed = sign; // 签名成功后会将该标示位sign置为true。
   ChannelFuture f = channel.writeAndFlush(sm);
   f.addListener(listener);
   sent++; // 发送计数器,用于共识确认数。
}

将消息写入channel,然后为该处理添加监听器,以便能捕捉处理状态事件。该监听器在Netty客户端类构建时被赋值。

this.listener = new SyncListener();

SyncListener类是Netty客户端类的内部类,它重写了operationComplete事件。同时,增加了方法waitForChannels,用于共识机制中,收集回复确认数的相关通道的等待。

d) 关闭通道

close方法是用来将通道关闭的。

public void close() {
   this.closed = true;                  // 设置关闭标志位
   rl.readLock().lock();                // 上锁
   ArrayList<NettyClientServerSession> sessions = new ArrayList<>(sessionClientToReplica.values());      // 读取连接会话容器中的所有连接
   rl.readLock().unlock();
   for (NettyClientServerSession ncss : sessions) { // 遍历关闭
      Channel c = ncss.getChannel();    // 从对象中获取channel
      closeChannelAndEventLoop(c);      // 安全关闭channel以及EventLoop线程
   }
}

e) 更新连接(视图)

当有新的节点加入时,需要更新视图。以便于客户端遍历节点发送消息,因此会涉及修改Netty客户端的连接。我们知道,Netty客户端与各个节点的连接被放在了连接会话容器sessionClientToReplica。更新连接时,说明视图已经更新完毕。那么要对视图中所有节点进行遍历。

int[] currV = controller.getCurrentViewProcesses();
for (int i = 0; i < currV.length; i++) {
    ...
}

遍历每一个节点,然后判断是否在sessionClientToReplica存在连接,如果存在说明是老节点,如果不存在说明是新节点。那么针对新节点,要建立新的连接并放到sessionClientToReplica。建立连接的方式与新建是相同的。

ChannelFuture future = connectToReplica(replicaId, secretKeyFactory);
future.awaitUninterruptibly();

参考前面(二-1-②-a)。

② channel处理器

这部分主要是处理channel的生命周期,与Netty服务端的内容基本一致。也是四个方法:

  • channelActive,标识
  • channelInactive,调用scheduleReconnect
  • channelUnregistered,调用scheduleReconnect
  • exceptionCaught,输出错误日志

这四个实现与Netty服务端完全一致。这里补充一下scheduleReconnect方法的内容。

a) 定时重连

scheduleReconnect顾名思义,是定时重连的含义。

private void scheduleReconnect(final ChannelHandlerContext ctx, int time) {
   if (closed) { // 如果是已关闭状态,则走关闭流程。
      closeChannelAndEventLoop(ctx.channel());
      return;
   }
   // 未关闭状态,则定时重连。
   final EventLoop loop = ctx.channel().eventLoop();    // 首先获得eventLoop线程
   loop.schedule(new Runnable() {                       // 为线程增加定时任务
      @Override
      public void run() {
         reconnect(ctx);                                // 任务执行reconnect方法。
      }
   }, time, TimeUnit.SECONDS);
}

下面进入重连方法reconnect,这个方法在Netty服务端也有,与新建连接差不多,但是重连一般都会在sessionClientToReplica容器已存在。

public void reconnect(final ChannelHandlerContext ctx) {
   rl.writeLock().lock();
   ArrayList<NettyClientServerSession> sessions = new ArrayList<NettyClientServerSession>(
         sessionClientToReplica.values());
   for (NettyClientServerSession ncss : sessions) { // 遍历连接
      if (ncss.getChannel() == ctx.channel()) {
         int replicaId = ncss.getReplicaId();
         try {
            if (controller.getRemoteAddress(replicaId) != null) {
               ChannelFuture future;
               try {
                  // 建立连接
                  future = connectToReplica(replicaId, secretKeyFactory);
               } catch (InvalidKeyException | InvalidKeySpecException e) {
                  logger.error("Error in key.",e);
               }
               logger.info("ClientID {}, re-connection to replica {}, at address: {}", clientId, replicaId,
                     controller.getRemoteAddress(replicaId));
            } else {
               // 说明该节点已经删除,则从sessionClientToReplica删除连接。
               removeClient(replicaId);
            }
         } catch (NoSuchAlgorithmException ex) {
            logger.error("Failed to reconnect to replica", ex);
         }
      }
   }
   rl.writeLock().unlock();
}

建立连接时仍旧调用connectToReplica方法。参考前面(二-1-②-a)。

③ 已完成内容

致此,CounterClient中通过节点id,建立客户端服务代理的工作已完成。

4. 调用排序消息

BFT-SMaRt中经常出现的TOM前缀的内容,一般我都会归并到共识中去,这几篇文章也未展开。那么TOM的含义是什么?其实就是Total ordered messages的含义,也就是全排序消息,这就是共识的一种体现。

byte[] reply = (inc == 0) ?
        counterProxy.invokeUnordered(out.toByteArray()) :
        counterProxy.invokeOrdered(out.toByteArray()); 

回到CounterClient,当影响值为0时,调用无序方法invokeUnordered,对应类型为TOMMessageType.UNORDERED_REQUEST。当影响值为其他值时,调用共识方法invokeOrdered。后续我的猜测是通过Netty服务端拿到这个值,然后通过ServerViewController的lastView的值加上影响值,然后变为currentView即可,返回currentView的最新的值。这部分内容可以在下一篇详细展开。

三、后记

经过本文以及前面几篇BFT-SMaRt相关的文章,可靠信道的部分就全部介绍完了。后续会展开节点对于消息的共识逻辑,以及视图更换后状态同步的逻辑的研究。

更多文章请转到一面千人的博客园。

加载全部内容

相关教程
猜你喜欢
用户评论