亲宝软件园·资讯

展开

elasticsearch的zenDiscovery和master选举机制

zziawan 人气:0

前言

上一篇通过 ElectMasterService源码,分析了master选举的原理的大部分内容:master候选节点ID排序保证选举一致性及通过设置最小可见候选节点数目避免brain split。节点排序后选举只能保证局部一致性,如果发生节点接收到了错误的集群状态就会选举出错误的master,因此必须有其它措施来保证选举的一致性。这就是上一篇所提到的第二点:被选举的数量达到一定的数目同时自己也选举自己,这个节点才能成为master。这一点体现在zenDiscovery中,本篇将结合节点的发现过程进一步介绍master选举机制。

节点启动后首先启动join线程,join线程会寻找cluster的master节点,如果集群之前已经启动,并且运行良好,则试图连接集群的master节点,加入集群。否则(集群正在启动)选举master节点,如果自己被选为master,则向集群中其它节点发送一个集群状态更新的task,如果master是其它节点则试图加入该集群。

join的代码

private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
     //一直阻塞直到找到master节点,在集群刚刚启动,或者集群master丢失的情况,这种阻塞能够保证集群一致性
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            masterNode = findMaster();
        }
      //有可能自己会被选举为master(集群启动,或者加入时正在选举)
      if (clusterService.localNode().equals(masterNode)) {
      //如果本身是master,则需要向其它所有节点发送集群状态更新
            clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) {
            //选举时错误的,之前的master状态良好,则不更新状态,仍旧使用之前状态。
                    if (currentState.nodes().masterNode() != null) {
                       return currentState;
                    }
                    DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
                    // update the fact that we are the master...
                    ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
                    currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
                    // eagerly run reroute to remove dead nodes from routing table
                    RoutingAllocation.Result result = allocationService.reroute(currentState);
                    return ClusterState.builder(currentState).routingResult(result).build();
                }
                @Override
                public void onFailure(String source, Throwable t) {
                    logger.error("unexpected failure during [{}]", t, source);
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    if (newState.nodes().localNodeMaster()) {
                        // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
                        joinThreadControl.markThreadAsDone(currentThread);
                        nodesFD.updateNodesAndPing(newState); // start the nodes FD
                    } else {
                        // if we're not a master it means another node published a cluster state while we were pinging
                        // make sure we go through another pinging round and actively join it
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                    sendInitialStateEventIfNeeded();
                    long count = clusterJoinsCounter.incrementAndGet();
                    logger.trace("cluster joins counter set to [{}] (elected as master)", count);
                }
            });
        } else {
            // 找到的节点不是我,试图连接该master
            final boolean success = joinElectedMaster(masterNode);
            // finalize join through the cluster state update thread
            final DiscoveryNode finalMasterNode = masterNode;
            clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    if (!success) {
                        // failed to join. Try again...
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }
                    if (currentState.getNodes().masterNode() == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        return currentState;
                    }
                    if (!currentState.getNodes().masterNode().equals(finalMasterNode)) {
                        return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                    }
                    // Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster }
                    // when the first cluster state arrives.
                    joinThreadControl.markThreadAsDone(currentThread);
                    return currentState;
                }
                @Override
                public void onFailure(String source, @Nullable Throwable t) {
                    logger.error("unexpected error while trying to finalize cluster join", t);
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            });
        }
    }

加载全部内容

相关教程
猜你喜欢
用户评论