BFT-SMaRt:用Java做节点间的可靠信道
一面千人 人气:1目录
- 一、引子
- 二、名词统一
- 1. 节点id
- 2. 节点
- 3. 本地节点
- 4. 配置域
- 5. TTP
- 6. 陌生域
- 三、节点服务类
- 四、节点通信系统概览
- 五、节点通信层准备
- 1. 创建socket服务端
- 2. [种类组合]本地节点与远端节点
- 3. 本地和远端节点均为配置域
- 4. 建立socket连接
- ① 校验规则
- ② 安全握手
- ③ IO开启
- ④ 独立发送线程
- ⑤ 独立接收线程
- 5. 配置域的本地和远端节点连接完成
- 6. 重连
- 六、启动节点通信层
- 1. IO开启
- 2. 待处理连接
- 3. 节点连接对象完成
- ① 线程状况
- ② 已涉及类图
- ③ [种类组合]结果现状
- ④ 代码调试进程
- 七、后记
关键字:区块链 可靠信道 BFT-SMaRt Socket SSL/TLS 网络通信
信道的可靠是BFT的前提。(参见两军问题)
本文通过跟踪BFT-SMaRt通信层源码,研究节点间可靠信道的实现原理。本文涉及区块链方面的内容较少,重点研究使用Java语言建立可靠网络通道的技术,请选择性阅读。
通信层系统,是分布式网络中获得可靠且认证的点对点通道的保证。BFT-SMaRt的安全通信是基于SSL/TLS标准。
- 节点之间建立互为信任的Socket IO连接,实现点对点的消息处理。
- 节点与客户端之间建立健壮性、可用性更高的Netty NIO连接,实现大规模的消息处理。
本文主要介绍第一种情况:在BFT-SMaRt中,作为服务端的节点之间的连接构建方法。
一、引子
接上一篇 BFT-SMaRt的理论与实践 ,启动分布式计数器服务示例程序时,需输入命令:
runscripts/smartrun.sh bftsmart.demo.counter.CounterServer 0
命令调用的是CounterServer类的内容,先查看CounterServer的类结构。
接着再看一下CounterServer的类图。
通过CounterServer的类图可以清晰地展示它的关系结构。其父类DefaultSingleRecoverable实现了Recoverable和SingleExecutable接口,而SingleExecutable继承了Executable。追本溯源,根部是以下两个接口:
- Recoverable:恢复程序,实现此接口的类应该实现状态转移协议。通常,类应该同时实现这个接口和一个Executable。
- Executable:执行程序,实现此接口,可接收无序的客户端请求。如果要支持有序的请求,可以选择其子类接口FIFOExecutable、BatchExecutable或SingleExecutable。
回到CounterServer的源码,它是所有官方示例中架构最简单的,这会提高我们的学习效率。CounterServer有两个类属性:
- counter,计数字段,保存计数器的状态值。
- iterations,操作次数,日志记录以及数据恢复时使用。
下面开始项目调试,我们为命令手动配置添加请求参数“0”作为节点id,然后启动命令进入CounterServer的main入口函数:
public static void main(String[] args){
if(args.length < 1) {
System.out.println("Use: java CounterServer <processId>");
System.exit(-1);
}
new CounterServer(Integer.parseInt(args[0]));
}
进来先校验参数个数,然后调用CounterServer构造函数。函数内创建了一个ServiceReplica对象。
public CounterServer(int id) {
new ServiceReplica(id, this, this);
}
传入的第一个参数是命令带入的唯一参数,即节点id。后面两个参数的值都是this,是将CounterServer分别作为执行程序和恢复程序。
二、名词统一
在本文的研究中,会涉及到一些由本系统提出,并且非常重要的名词概念。为避免后续发生同一件事的称呼混乱,造成困扰,在这里统一声明。
1. 节点id
replicaId、processId、remoteId、TTPid指的都是节点id,但包含以下几种情况:
- replicaId:节点作为一个副本的时候。
- processId:一个处理单元作为节点的时候。
- remoteId:外部的节点id。
- TTPid:设置的TTP节点的id。
2. 节点
Replica是分布式系统中的副本,在区块链网络中代表一个服务节点,节点不一定是一台机器,也可能是一个处理单元,下面统一称作节点。注意,所有节点都是用一套代码编译部署的环境。
3. 本地节点
本文依据命令runscripts/smartrun.sh bftsmart.demo.counter.CounterServer 0
,因此本地节点都指的是CounterServer,id为0的节点。本文只研究本地作为节点的情况。而本地作为客户端的情况(CounterClient作为入口),在后续文章介绍。
4. 配置域
组网配置文件host.config所描述的,由确定数量且顺序编号的节点所组成的网络,我们可以称之为配置域。下面就是示例节点的host.config的内容。
#server id, address and port (the ids from 0 to n-1 are the service replicas)
0 127.0.0.1 11000 11001
1 127.0.0.1 11010 11011
2 127.0.0.1 11020 11021
3 127.0.0.1 11030 11031
5. TTP
在system.config系统配置文件中有相关配置项:
system.ttp.id
该配置项的值是一个节点id,所以只能配置一个,我们称之为TTPid。一般是在配置域外,可用作向系统添加和删除节点。注意,根据约定,TTPid一定大于配置域任意id。
6. 陌生域
那么如果一个节点id既不属于配置域,又不是ttpid,我们称之为陌生id。除配置域和ttp以外的所有空间,我们称之为陌生域。陌生id的加入,在主流区块链产品例如比特币等,都会有完整的解决方案。BFT-SMaRt的分布式网络中除了启动时的配置域和TTP,也允许陌生id的接入,后面会有相关介绍。换句话讲,P2P网络最精彩的部分就是与陌生域的自由联系。当然了,如果是成熟的联盟链产品,会通过权限控制管理配置域、TTP和陌生域。
三、节点服务类
进入ServiceReplica的构造函数就意味着离开了示例程序,深入到了BFT-SMaRt标准库内容。ServiceReplica类可以被称为本地节点服务类,主要用作管理本地作为节点的基础服务,包括网络通信和节点间消息共识。这个类从DeliveryThread接收消息,并管理应用程序的执行和对客户端的回复。对于顺序消息逐个执行的应用程序,ServiceReplica接收一致决定的批处理,逐个交付,并使用批处理回复。在应用程序成批执行消息的情况下,该批消息被交付给应用程序,ServiceReplica不需要成批组织应答。
/**
* Constructor
*
* @param id 节点(副本)ID
* @param configHome 配置文件
* @param executor 执行器
* @param recoverer 恢复器
* @param verifier 请求校验器
* @param replier 请求回复器
* @param loader 加载签名器
*/
public ServiceReplica(int id, String configHome, Executable executor, Recoverable recoverer, RequestVerifier verifier, Replier replier, KeyLoader loader) {
this.id = id;
// 读取配置文件,构建配置域视图实例。
this.SVController = new ServerViewController(id, configHome, loader); // NEXT TODO
this.executor = executor; // 传入执行程序
this.recoverer = recoverer; // 传入恢复程序
this.replier = (replier != null ? replier : new DefaultReplier()); // 回复的包装类
this.verifier = verifier; // null,判断请求有效性,只是true/flase,未展开有效判断的逻辑。
this.init(); // 节点初始化(重点)
// 节点环境,上下文内容,属共识层内容,在恢复程序和回复消息时都被需要。
this.recoverer.setReplicaContext(replicaCtx); // NEXT TODO
this.replier.setReplicaContext(replicaCtx); // NEXT TODO
}
回复器接口Replier只有一个实现类DefaultReplier。它的功能就是在正常的回复开始前,保证对节点上下文ReplicaContext的校验。当ReplicaContext为空时,会挂起等待,直到有值时,才会走正常的回复。 ServerViewController是共识层的内容,本篇不展开。接下来,进入执行init初始化函数。
private void init() {
try {
cs = new ServerCommunicationSystem(this.SVController, this); // 创建本地节点通信系统
} catch (Exception ex) {
logger.error("Failed to initialize replica-to-replica communication system", ex);
throw new RuntimeException("Unable to build a communication system.");
}
if (this.SVController.isInCurrentView()) {
logger.info("In current view: " + this.SVController.getCurrentView());
initTOMLayer(); // NEXT TODO:初始化共识协议层,BFT-SMaRt的要达成的共识对象是TOM,即total ordered message,追求的是所有节点上面的全部消息有序且一致。
} else { // 该节点不属于当前配置域
logger.info("Not in current view: " + this.SVController.getCurrentView());
logger.info("Waiting for the TTP: " + this.SVController.getCurrentView());
waitTTPJoinMsgLock.lock();
try {
// 等待“TTP节点”流程(ServerConnection -> run -> replica.joinMsgReceived)
canProceed.awaitUninterruptibly();
} finally {
waitTTPJoinMsgLock.unlock();
}
}
initReplica(); // 启动本地节点通信系统
}
这段代码是初始化节点的全部动作,其中涉及TTP节点的流程参照下文。
节点初始化的主流程分为三步:创建本地节点通信系统,初始化共识协议层,启动本地节点通信系统。本文焦点在可靠通信,主要介绍本地节点通信系统的创建以及启动。初始化共识协议会放在下一篇来讲。
四、节点通信系统概览
本地节点通信系统的工作主要靠类ServerCommunicationSystem来维护。接着上面的代码进入它的构造函数。
public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception {
super("Server Comm. System");
this.controller = controller;
// 初始化消息处理工具
messageHandler = new MessageHandler();
// 请求消息容器,是一个阻塞消息队列,大小根据配置文件。
inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize());
// 本地节点与远端节点的连接对象(本文重点部分)
serversConn = new ServersCommunicationLayer(controller, inQueue, replica);
// 本地节点与外部客户端的连接对象(本文不介绍)
clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
}
节点通信系统类ServerCommunicationSystem承担了所有本地节点的通信工作,它是一个统一的管理类。从业务角度来考虑,节点的通信可分为两种:
- 与其他若干节点通信的负责对象是 serversConn。
- 与外部客户端通信的负责对象是 clientsConn。
下面,我们梳理源码中的线索,主要研究负责节点通信的对象的构建。
五、节点通信层准备
本地节点与远端节点的通信是通过ServersCommunicationLayer类完成的。ServersCommunicationLayer类本身是一个线程类,在启动该线程之前,要通过配置文件做一些准备工作。前面提到了配置域的概念,这是在配置文件中设置完成的,因此在启动ServersCommunicationLayer线程之前,可预先完成配置域的本地和远端节点的连接。
1. 创建socket服务端
进入ServersCommunicationLayer构造函数,首先创造本地的基于SSL/TLS安全协议的socket服务端对象serverSocketSSLTLS。
// 通信层属性设置
this.controller = controller;
this.inQueue = inQueue;
this.me = controller.getStaticConf().getProcessId();
this.replica = replica;
this.ssltlsProtocolVersion = controller.getStaticConf().getSSLTLSProtocolVersion();
//获取访问地址,IP加端口。
String myAddress;
String confAddress = controller.getStaticConf().getRemoteAddress(controller.getStaticConf().getProcessId())
.getAddress().getHostAddress();
if (InetAddress.getLoopbackAddress().getHostAddress().equals(confAddress)) {
myAddress = InetAddress.getLoopbackAddress().getHostAddress();
} else if (controller.getStaticConf().getBindAddress().equals("")) {
myAddress = InetAddress.getLocalHost().getHostAddress();
if (InetAddress.getLoopbackAddress().getHostAddress().equals(myAddress) && !myAddress.equals(confAddress)) {
myAddress = confAddress;
}
} else {
myAddress = controller.getStaticConf().getBindAddress();
}
int myPort = controller.getStaticConf().getServerToServerPort(controller.getStaticConf().getProcessId());
// 开始构建基于SSL/TLS协议的Socket服务端对象。
FileInputStream fis = null;
try {
fis = new FileInputStream("config/keysSSL_TLS/" + controller.getStaticConf().getSSLTLSKeyStore());
ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(fis, SECRET.toCharArray());
} finally {
if (fis != null) {
fis.close();
}
}
String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
kmf = KeyManagerFactory.getInstance(algorithm);
kmf.init(ks, SECRET.toCharArray());
trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
trustMgrFactory.init(ks);
context = SSLContext.getInstance(this.ssltlsProtocolVersion);
context.init(kmf.getKeyManagers(), trustMgrFactory.getTrustManagers(), new SecureRandom());
serverSocketFactory = context.getServerSocketFactory();
// 获得socket服务端对象
this.serverSocketSSLTLS = (SSLServerSocket) serverSocketFactory.createServerSocket(myPort, 100,
InetAddress.getByName(myAddress));
serverSocketSSLTLS.setEnabledCipherSuites(this.controller.getStaticConf().getEnabledCiphers());
String[] ciphers = serverSocketFactory.getSupportedCipherSuites();
for (int i = 0; i < ciphers.length; i++) {
logger.trace("Supported Cipher: {} ", ciphers[i]);
}
serverSocketSSLTLS.setEnableSessionCreation(true);
serverSocketSSLTLS.setReuseAddress(true);
serverSocketSSLTLS.setNeedClientAuth(true);
serverSocketSSLTLS.setWantClientAuth(true);
// 用户密码部分,待应用程序去实现,目前SECRET是写死的。
SecretKeyFactory fac = TOMUtil.getSecretFactory();
PBEKeySpec spec = TOMUtil.generateKeySpec(SECRET.toCharArray());
selfPwd = fac.generateSecret(spec);
通过上面的源码,我们梳理出来建立基于SSL_TLS的socket服务端对象的步骤:
- 读取SSL_TLS秘钥配置文件,构建KeyStore实例并加载秘钥。
- 构建基于SunX509算法的KeyManagerFactory实例,传入KeyStore完成初始化。
- 构建TrustManagerFactory实例,传入KeyStore完成初始化。
- 构建SSL上下文SSLContext实例,传入KeyManagerFactory和TrustManagerFactory。
- 通过上下文创建基于SSLTLS的socket连接实例serverSocketSSLTLS。
- 为serverSocketSSLTLS设置CipherSuites,加密套件。
- 为serverSocketSSLTLS设置支持创建多会话。
- 为serverSocketSSLTLS设置支持地址重用。
- 为serverSocketSSLTLS设置想要want客户端认证,如果客户端不提供认证也继续。
- 为serverSocketSSLTLS设置必须need客户端认证,如果客户端不提供认证不继续。
以上所有步骤如再进一步深入就下探到了rt.jar的javax.net.ssl包的内容,超出本文研究范围。到目前为止,我们获得了一个在节点间通信的基于SSLTLS的socket连接serverSocketSSLTLS。
private SSLServerSocket serverSocketSSLTLS;
用户密码
通信层还提供了节点的本地密码加密功能selfPwd,这个密码目前是由SECRET写死在当前类中的,如果是基于BFT-SMaRt开发,可以继承SECRET,通过节点管理员输入,记录本地的用户密码。不同于SSL/TLS安全协议中的各种秘钥,以及区块链必要的公私钥,用户密码在用户体验方面更人性化,通过用户密码来管理整个节点的权利,是有益的。
2. [种类组合]本地节点与远端节点
本地节点与远端节点的通信包含一种情况,如下表所示:
本地节点\远端节点 | 配置域 | TTP | 陌生域 |
---|---|---|---|
配置域 | [A]本地和远端节点均为配置域 | [D]配置域接收TTP | [G]配置域接收陌生节点 |
TTP | [B]本地节点作为TTP接入配置域 | [E] TTP只能配置一个 | [H]TTP接入陌生节点 |
陌生域 | [C]陌生节点接入配置域 | [F]陌生节点接入TTP | [I]陌生节点接入陌生节点 |
根据表格所示,共有九种情况,我分别用大写字母作为标识将他们区分。下面会在相应章节分析所有的情况,为便于查找,会使用这些标识关联。其中[E]是悖论,因此,
[E]:悖论。
下面进入ServersCommunicationLayer的构造函数。通信层类ServersCommunicationLayer,是一个线程类,构建时传入配置域视图实例、节点对象、消息队列。
3. 本地和远端节点均为配置域
ServersCommunicationLayer拥有一个私有属性的HashMap容器connections,对应的是网络中每个节点都有一个自己的connections容器。该容器用来储存本地节点的所有对外连接,以远端id为key,连接对象为值。目前为止,该容器仍旧是空的,接下来,首先存储域内其他3个节点的连接对象,id分别为1,2,3。
if (controller.isInCurrentView()) { // 本地节点在配置域
int[] initialV = controller.getCurrentViewAcceptors(); // 配置域节点id数组,[0,3]
for (int i = 0; i < initialV.length; i++) {
if (initialV[i] != me) { // 配置域中的非本地节点,找出其他3个
getConnection(initialV[i]);
}
}
}
遍历配置域的节点id,筛选出非本地节点的远端节点作为网络内远程节点id。
private ServerConnection getConnection(int remoteId) { // 其他3个节点id就是remoteId了。
connectionsLock.lock();
ServerConnection ret = this.connections.get(remoteId);
if (ret == null) { // 如果remoteId未曾建立连接,则新建,并按id检索更新其连接对象。
// 建立连接(重点),第二个参数socket为null
ret = new ServerConnection(controller, null,
remoteId, this.inQueue, this.replica);
this.connections.put(remoteId, ret); // 以id为key,连接对象为值,存储到HashMap容器。
}
connectionsLock.unlock();
return ret;
}
4. 建立socket连接
本地节点与配置域内的其他3个节点建立连接,是通过ServerConnection类的构造函数。注意此时传入的socket对象为null。
ServerConnection是系统首次创建连接。
public ServerConnection(ServerViewController controller,
SSLSocket socket, int remoteId,
LinkedBlockingQueue<SystemMessage> inQueue,
ServiceReplica replica) {
this.controller = controller;
this.socket = socket;
this.remoteId = remoteId;
this.inQueue = inQueue;
this.outQueue = new LinkedBlockingQueue<byte[]>(this.controller.getStaticConf().getOutQueueSize());
if (isToConnect()) { // 校验规则
ssltlsCreateConnection(); // 安全握手,建立通道。
}
if (this.socket != null) {
try {
// socket不为空时,建立socket IO通信流,用于发送和读取
socketOutStream = new DataOutputStream(this.socket.getOutputStream());
socketInStream = new DataInputStream(this.socket.getInputStream());
} catch (IOException ex) {
logger.error("Error creating connection to " + remoteId, ex);
}
}
// 系统配置:指定通信系统是否应该使用独立线程来发送数据
this.useSenderThread = this.controller.getStaticConf().isUseSenderThread();
if (useSenderThread && (this.controller.getStaticConf().getTTPId() != remoteId)) {
new SenderThread().start(); // 默认使用独立发送线程,启动SenderThread。
} else {
sendLock = new ReentrantLock(); // 使用当前线程发送,加重入锁
}
if (!this.controller.getStaticConf().isTheTTP()) { // 当前节点不是TTP
if (this.controller.getStaticConf().getTTPId() == remoteId) { // 对方节点是TTP
new TTPReceiverThread(replica).start(); // 启用TTP接入线程
} else { // remoteId不是TTP
new ReceiverThread().start(); // 启动接收线程。
}
}
}
① 校验规则
isToConnect()返回true或false。默认情况不连接,校验规则有三条:
- 当远端节点是TTP时,不连接。
- 当本地节点是TTP是,连接。
- 当本地节点在配置域,同时远端节点id小于本地节点的id,连接。(id分配从小到大,如果比本地节点小,说明也在配置域里)
如果本地节点和远端节点均为配置域,则前两条与TTP相关的规则都没用了。接着看第三条,本地节点此时是0,远端节点为1,则不连接。这条规则很有趣,它规定了节点id从小到大去触发连接。我们通过例子来分析:
- 本地启动id为0的节点,此时 isToConnect = false,不执行ssltlsCreateConnection建立连接。
- 本地启动id为1的节点,那么假定id为0的远端节点已经启动完成,则此时 isToConnect = true,执行ssltlsCreateConnection建立连接。如果远端节点为2或3,仍旧 isToConnect = false。
所以,系统期待节点能够按照配置域中的顺序编号依次启动,完成首次连接。但如果节点不按照配置域的序号启动会发生什么?例如:
- 本地启动id为1的节点,那么假定id为0的远端节点已经启动完成,则此时 isToConnect = true,执行ssltlsCreateConnection建立连接。但此时远端节点0并未启动,执行ssltlsCreateConnection会报错:
17:25:06.985 [main] ERROR bftsmart.communication.server.ServerConnection - Connection refused (SocketException)
但这个错是被捕捉到的,因此不影响后续流程。节点如果满足条件的话,仍旧会创建独立发送线程以及接收线程。
② 安全握手
安全连接的建立首先需要有连接请求,然后握手。如果没有连接,在握手逻辑时就会被检查到,那么就会中断当前流程,等待直到获得远端节点的连接请求。然后封装SSL/TLS协议,最后得到Socket服务端对象。通过socket服务端始终监听socket客户端,可以达到基于socket连接的两方的在安全加密的通道内持续通信的目的。
这一次调用ServerConnection函数,是建立与配置域内的3个节点的连接,因此符合校验规则。下面直接进入ssltlsCreateConnection方法。根据SSL/TLS协议,建立Socket服务端与Socket客户端的通信连接的第一步,就是双方进行安全握手。
基于SSL/TLS的Socket服务端对象创建流程参照下文创建socket服务端,同样的,我们为Socket客户端对象增加SSL/TLS协议。
拿到对象以后,为其增加异步的握手完成监听HandshakeCompletedListener
,当握手完成时,会在日志中打印相关信息,便于调试及流程展示。真实场景的日志信息如下:
-- SSL/TLS handshake complete!, Id:0 # CipherSuite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
-- SSL/TLS handshake complete!, Id:1 # CipherSuite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
-- SSL/TLS handshake complete!, Id:2 # CipherSuite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
接下来,启动握手。这部分内容是与本项目无关的底层逻辑,握手分为:
- performInitialHandshake,连接状态为握手时,发送初始化消息,包括握手hash,hello版本,格式检查。
- kickstartHandshake,处理其他连接状态,
握手的过程就是正式建立数据流之前,通道先进行安全预热,经过几次预热以后,就完成了握手过程,双方就建立了信任通道。
若配置域内远端节点未启动,则握手失败。这里的握手代码只是执行一次,但在启动本地节点通信系统后会不断地向connections的节点发起定时任务requestsTimer,发送多次的握手请求。这里我们假定其他3个节点均已启动,远端节点与本地节点都有连接请求,可以完成安全握手。
握手成功,开启数据通信。将本地节点id发送给新连接的数据输出流,作为新连接接入的身份介绍。
new DataOutputStream(this.socket.getOutputStream())
.writeInt(this.controller.getStaticConf().getProcessId());
本地节点向输出流写入本地id。注意所有节点用的是同一套代码,所以作为远端节点来讲,就是获得了一个输入流,内容为remoteId(为0)。
③ IO开启
回到ServerConnection,ssltlsCreateConnection方法已经让this.socket获得了有效的连接对象,下面就开启IO。
if (this.socket != null) {
try {
// socket不为空时,建立socket IO通信流,用于发送和读取
socketOutStream = new DataOutputStream(this.socket.getOutputStream());
socketInStream = new DataInputStream(this.socket.getInputStream());
} catch (IOException ex) {
logger.error("Error creating connection to " + remoteId, ex);
}
}
this.socket有两种情况可以获得有效对象:
- ServerConnection构造函数的参数中socket字段传入为有效对象。
- isToConnect()返回true,ssltlsCreateConnection安全握手构建有效对象。
如果以上两种情况均不满足,则this.socket为null,则不运行该分支代码,无法开启IO。
④ 独立发送线程
继续往下跟踪源码。
// UseSenderThread是一个系统配置。
this.useSenderThread = this.controller.getStaticConf().isUseSenderThread();
if (useSenderThread && (this.controller.getStaticConf().getTTPId() != remoteId)) {
new SenderThread().start(); // 启动独立发送线程
} else {
sendLock = new ReentrantLock();
}
如果在system.config中,配置了system.communication.useSenderThread为true,且远端节点是属于配置域或陌生域。则在建立连接时,会启动独立线程SenderThread来发送数据。
如果系统配置useSenderThread为false,或者远端节点是TTP,则不启动独立发送线程,创建重进入锁,用于主线程发送消息时上锁使用。
SenderThread线程类源码如下。
private class SenderThread extends Thread {
public SenderThread() {
super("Sender for " + remoteId);
}
@Override
public void run() {
byte[] data = null;
while (doWork) {
try {
// 从out队列(保存待发送消息)获取并移除队头的元素
data = outQueue.poll(POOL_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
}
if (data != null) {
logger.trace("Sending data to, RemoteId:{}", remoteId);
sendBytes(data); // 发送数据
}
}
logger.debug("Sender for " + remoteId + " stopped!");
}
}
outQueue发送队列保存了所有待发送消息。
protected LinkedBlockingQueue<byte[]> outQueue;
独立发送线程通过调用发送队列的poll方法获取并移除队列中队头的元素,作为发送数据,通过sendBytes方法发送出去。上一小节我们已经创建了IO流对象,独立发送线程使用socket输出流对象,在sendBytes方法体中将字节化的数据放入通道发送出去。
private final void sendBytes(byte[] messageData) {
boolean abort = false;
do {
if (abort)
return; // 如果需要重连,则终止方法
if (socket != null && socketOutStream != null) {
try {
byte[] data = new byte[5 + messageData.length];// 没有包含MAC,消息验证码
int value = messageData.length; // 长度
System.arraycopy(new byte[] { (byte) (value >>> 24), (byte) (value >>> 16), (byte) (value >>> 8),
(byte) value }, 0, data, 0, 4); // data前个字节,用数据长度做位运算,给主数据做混淆
// 从data第5个字节开始,复制一份原数据。
System.arraycopy(messageData, 0, data, 4, messageData.length);
// data的最后一个字节,写入一个0
System.arraycopy(new byte[] { (byte) 0 }, 0, data, 4 + messageData.length, 1);
socketOutStream.write(data); // 发送数据
return;
} catch (IOException ex) {
closeSocket();
waitAndConnect(); // IO异常,重连
abort = true;
}
} else {
waitAndConnect(); // socket丢失,重连
abort = true;
}
} while (doWork);
}
如果过程中出现任何连接异常,则尝试重连。
⑤ 独立接收线程
回到ServerConnection,继续跟踪代码。
if (!this.controller.getStaticConf().isTheTTP()) {
if (this.controller.getStaticConf().getTTPId() == remoteId) {
new TTPReceiverThread(replica).start(); // TTP部分
} else {
new ReceiverThread().start();
}
}
如果本地节点不是TTP,远端节点是TTP,则启动TTPReceiverThread独立线程。如果远端节点不是TTP,可能是配置域或陌生域,则启动独立接收线程ReceiverThread。目前还未涉及到TTP的连接,因此进入接收线程。
protected class ReceiverThread extends Thread {
public ReceiverThread() {
super("Receiver for " + remoteId);
}
@Override
public void run() {
while (doWork) {
if (socket != null && socketInStream != null) {
try {
// 先读取消息长度,分配空间。
int dataLength = socketInStream.readInt();
byte[] data = new byte[dataLength];
// 读入数据
int read = 0;
do {
read += socketInStream.read(data, read, dataLength - read);
} while (read < dataLength);
byte hasMAC = socketInStream.readByte(); // 输入流继续读入一个字节,判断是否存在消息验证码
logger.trace("Read: {}, HasMAC: {}", read, hasMAC);
SystemMessage sm = (SystemMessage) (new ObjectInputStream(new ByteArrayInputStream(data))
.readObject()); // 包装消息为SystemMessage对象
// SSL/TLS协议的验证已完成
sm.authenticated = true;
if (sm.getSender() == remoteId) { // 发送者为远端id
if (!inQueue.offer(sm)) { // 将消息添加至读入队列
logger.warn("Inqueue full (message from " + remoteId + " discarded).");
}
}
} catch (ClassNotFoundException ex) {
logger.info("Invalid message received. Ignoring!");
} catch (IOException ex) {
if (doWork) {
logger.debug("Closing socket and reconnecting");
closeSocket();
waitAndConnect(); // IO异常,重连
}
}
} else {
waitAndConnect(); // socket丢失,重连
}
}
}
}
读入的消息都会被转化为统一的SystemMessage对象,被放入读入队列inQueue等待后续处理。
private LinkedBlockingQueue<SystemMessage> inQueue;
5. 配置域的本地和远端节点连接完成
到目前为止,ServersCommunicationLayer构造函数就差最后一行就执行完毕。
start(); // ServersCommunicationLayer构造函数的最后一行用于启动主线程
如果假定配置域中四个节点的代码均执行挂起在ServersCommunicationLayer构造函数的最后一行,也就是还没有启动ServersCommunicationLayer构造函数主线程。那么每个节点系统内都是有且只有6条线程存在。
结合上一节的源码分析,本地节点与远端节点建立了独立的发送线程和独立的接收线程。共有3个远端节点,因此就产生了6条线程。这也符合软件设计原则中的单一职责原则
。如上图所示,线程服务对象分别对应着节点id为1,2,3。本地节点是0。本地节点与配置域的3个远端节点建立了ServerConnection。并且将连接对象ServerConnection缓存到了本地容器connections中。未来可以直接通过remoteId得到连接对象。
那么,配置域节点间是否真正建立起有效连接了呢?
答案是否定的,不一定。这取决于这4个节点是否按照顺序执行。如果节点0首先启动程序挂起到start(),然后节点1再启动程序同样挂起到start()。这时候,根据前文介绍的isToConnect()规则校验,节点1会与节点0成功建立连接。以此类推,配置域内所有节点都会成功建立与其他3个节点的连接,所以此时配置域内有1-0、2-1、2-0、3-2、3-1、3-0根据公式
\[
C(4,2) = 6
\]
共6个连接,共24个线程。如果不按照以上顺序执行,由于我们假定所有节点都挂起在start(),因此其实只有一次机会去建立连接。节点1在启动时去找节点0但失败了,就失去了这次连接的机会。同样的,配置域内其他节点也是这样。这时候配置域内就不一定是几个连接了,因为可能有的节点按顺序了,有的没有,例如启动顺序为1,0,2,3 就会建立5条连接,仍旧24个线程,这里正好缺了一条节点0和节点1的连接。而值得注意的是,无论是否建立连接,每个节点都会有6个线程,所以配置域中4个节点24个线程是不会改变的。
如果,配置域间未成功建立连接,后续怎么办?
那就需要启动ServerCommunicationSystem主线程start(),通过代码编写定时任务,每个节点都会启动独立的线程,让节点之间拥有持续的连接机会,而不是仅这一次。
6. 重连
socket连接在建立过程或IO通信过程中,出现IO异常或socket丢失的时候,都会做重连处理。重连处理通过执行方法waitAndConnect,以及方法reconnect。
private void waitAndConnect() {
if (doWork) {
try {
Thread.sleep(POOL_TIME); // 阻塞线程5s
} catch (InterruptedException ie) {
}
outQueue.clear(); // 清空发送队列
reconnect(null); // 调用重连
}
}
该方法首先会等待5秒钟,然后再清空发送队列,调用重连方法reconnect。
protected void reconnect(SSLSocket newSocket) {
connectLock.lock();
if (socket == null || !socket.isConnected()) {
if (isToConnect()) { // 校验规则
ssltlsCreateConnection(); // 安全握手
} else {
socket = newSocket;
}
if (socket != null) {
try {
// 建立连接的标志就是启动socket IO流,与首次连接ServerConnection是一样的。
socketOutStream = new DataOutputStream(socket.getOutputStream());
socketInStream = new DataInputStream(socket.getInputStream());
} catch (IOException ex) {
logger.error("Failed to authenticate to replica", ex);
}
}
}
connectLock.unlock();
}
前面分析过的ServerConnection是首次连接,reconnect是重连,因此建立连接的代码差不多,ssltlsCreateConnection只会在这两个方法被调用。建立连接的标志就是启动socket IO流,校验后完成握手即可创建IO流,重连完成。
六、启动节点通信层
通过前面几节的详细论述,ServersCommunicationLayer构造函数最后一行之前的所有流程我们都已研究清楚。本节将执行start()启动ServersCommunicationLayer的主线程。下面进入run函数。
public void run() {
while (doWork) {
try {
// 启动已有连接的通信监听
SSLSocket newSocket = (SSLSocket) serverSocketSSLTLS.accept();
setSSLSocketOptions(newSocket);
// 读入连接建立时存有本地节点id的输出流。现在的角色变了,
// 那个本地节点成为了远端节点,那么输出流就是当前本地节点的输入流。
int remoteId = new DataInputStream(newSocket.getInputStream()).readInt();
if (!this.controller.isInCurrentView() &&
(this.controller.getStaticConf().getTTPId() != remoteId)) {
// 本地节点不是配置域且远端节点不是TTP,对应情况[B][C][H][I]
waitViewLock.lock();
pendingConn.add(new PendingConnection(newSocket, remoteId));
waitViewLock.unlock();
} else { // 远端节点为TTP或本地节点为配置域,对应情况[A][D][F][G]
logger.debug("Trying establish connection with Replica: {}", remoteId);
establishConnection(newSocket, remoteId);
}
} catch (SocketTimeoutException ex) {
logger.trace("Server socket timed out, retrying");
} catch (SSLHandshakeException sslex) {
sslex.printStackTrace();
} catch (IOException ex) {
logger.error("Problem during thread execution", ex);
}
}
try {
serverSocket.close();
} catch (IOException ex) {
logger.error("Failed to close server socket", ex);
}
logger.info("ServerCommunicationLayer stopped.");
}
ServersCommunicationLayer主线程,首先如果连接已成功,开始监听,建立通道。通道内第一个消息是远端节点id,分析远端id和本地id的情况,得到:
- 本地节点不是配置域且远端节点不是TTP,对应情况[B][C][H][I],放入容器pendingConn,等待后续处理。
- 远端节点为TTP或本地节点为配置域,对应情况[A][D][F][G],直接调用establishConnection建立连接。
1. IO开启
第二种情况比较简单,
else { // 远端节点为TTP或本地节点为配置域,对应情况[A][D][F][G]
logger.debug("Trying establish connection with Replica: {}", remoteId);
establishConnection(newSocket, remoteId);
}
我们直接进入establishConnection函数。
private void establishConnection(SSLSocket newSocket, int remoteId) throws IOException {
if ((this.controller.getStaticConf().getTTPId() == remoteId) || this.controller.isCurrentViewMember(remoteId)) {
// 远端ID为TTP或配置域,从[A][D][F][G]中筛选,剩余[A][D][F]
connectionsLock.lock();
if (this.connections.get(remoteId) == null) { // connections中不存在,说明首次连接
this.connections.put(remoteId, // 传入newSocket调用ServerConnection
new ServerConnection(controller, newSocket, remoteId, inQueue, replica));
} else { // connections中已有连接对象,重连即可。
logger.debug("ReConnecting with replica: {}", remoteId);
this.connections.get(remoteId).reconnect(newSocket);
}
connectionsLock.unlock();
} else { // 本地节点为配置域接收远端陌生节点,属于[G]情况的处理,直接关闭。
logger.debug("Closing connection with replica: {}", remoteId);
newSocket.close();
}
}
参考以上代码中的注释,首先情况[G]会被直接关闭。
[G]:关闭连接
剩余[A][D][F],如果connections容器中能查到,则直接传入socket调用reconnect方法重连。前文分析了reconnect方法,但与这一次分支不同。
if (isToConnect()) {
ssltlsCreateConnection(); // [A]
} else {
socket = newSocket; // [D][F]
}
决定权又来到了isToConnect()。我们再次对比规则(默认不连接):
- 远端TTP,不连接 [D]
- 本地TTP,连接
- 本地配置域且id大于远端id,TTPid肯定大于配置域,所以远端id一定是配置域。双双配置域[A]
所以,[D][F]为不连接传入,[A]为连接。
[A]:如果失去了准备阶段的首次配置域连接机会,这里可以再次建立连接。构建IO通道是建立连接的标志。
[D][F]:使用当前socket,既然读到了remoteId,说明连接已成功。那么就使用该socket建立IO通道。
2. 待处理连接
回到第一种情况,本地节点不是配置域且远端节点不是TTP,对应情况[B][C][H][I],放入容器pendingConn,等待后续处理。
if (!this.controller.isInCurrentView() && (this.controller.getStaticConf().getTTPId() != remoteId)) {
// 本地节点不是配置域且远端节点不是TTP,对应情况[B][C][H][I]
waitViewLock.lock();
pendingConn.add(new PendingConnection(newSocket, remoteId));
waitViewLock.unlock();
}
PendingConnection类是pendingConn 的元素,用于存储挂起连接。陌生节点在接收到连接的响应之后,只有学习了当前视图后才可接受连接。这主要是为了处理陌生域的情况。
private List<PendingConnection> pendingConn = new LinkedList<PendingConnection>();
3. 节点连接对象完成
ServerCommunicationSystem构造函数中ServersCommunicationLayer的对象serversConn完成。下面我们通过4个维度进行总结。
① 线程状况
此时本地线程除了上面的6条以外,又增加了一条刚刚启动的ServersCommunicationLayer的线程。
② 已涉及类图
接着,我们梳理一下到目前为止,涉及到的几个类的类图关系。
实现Runnable接口的都是线程类,目前ServerCommunicationSystem还未构建完成,因此其他3个线程类共创建了7条线程,如上所示。
③ [种类组合]结果现状
这里再次更新一下前面本地节点与远端节点的种类组合的结果情况,参见五-2。
本地节点\远端节点 | 配置域 | TTP | 陌生域 |
---|---|---|---|
配置域 | [A] 构建IO通道(终态) | [D] 构建IO通道 | [G] 关闭连接(终态) |
TTP | [B] 待处理 | [E] 悖论(终态) | [H] 待处理 |
陌生域 | [C] 待处理 | [F] 构建IO通道 | [I] 待处理 |
如上表所示,[A][E][G]情况已经处理完毕。剩余的部分,其中[D][F]还有待TTP流程处理。而[B][C][H][I]还需进一步处理。
④ 代码调试进程
到目前为止,代码调试的执行情况进展到了ServerCommunicationSystem构造函数。
下面继续ServerCommunicationSystem的构造,进入clientConn的构造。
七、后记
经过第五、第六两章的论述,节点通信对象serversConn已完成。节点间的通信是通过原生socket建立的,其中也涉及到我们手动对socket进行SSL/TLS安全协议的构建。前文也分析了线程的情况,由于我们假定在联盟链的场景中,作为服务端的节点并不多,因此维护长连接的线程开销可以承受。然而如果从公链角度或者客户端请求连接来考虑,原生socket是不足以支撑这种情况的。我们看到节点间的通信甚至没有使用到线程池以及后续的一系列改善方案,这是随着场景的变化需要不断升级适配的。本章开始介绍客户端的通信,客户端通信的特点是需要维护更大规模的连接以及IO通信。因此,客户端的通信采用了目前较主流更健壮的Netty NIO的方式。下面一篇文章将继续分析BFT-SMaRt在底层网络通信架构上对于Netty的使用。
更多文章请转到一面千人的博客园。
加载全部内容