亲宝软件园·资讯

展开

elasticsearch分布式及数据的功能

zziawan 人气:0

从功能上说,可以分为两部分,分布式功能和数据功能。分布式功能主要是节点集群及集群附属功能如restful借口、集群性能检测功能等,数据功能主要是索引和搜索。代码上这些功能并不是完全独立,而是由相互交叉部分。当然分布式功能是为数据功能服务,数据功能肯定也难以完全独立于分布式功能。

它的源码有以下几个特点:

模块化:

每个功能都以模块化的方式实现,最后以一个借口向外暴露,最终通过guice(google轻量级DI框架)进行管理。整个系统有30多个模块(version1.5)。

接口解耦:

es代码中使用了大量的接口进行代码解耦,刚开始看的感觉是非常难以找到相关功能的实现,但是也正是这些接口使得代码实现的非常优雅。

异步通信:

作为一个高效的分布式系统,es中异步通信实现非常之多,从集群通信到搜索功能,使用了异步通信框架netty作为节点间的通信框架。

以上的这些特点在后面的代码分析中会一一体现。概述的结尾以es的启动过程来结束,es的启动类是Bootstrap,启动脚本调研这个类的main方法开始启动node。它的类图如下所示:

上图仅仅显示了它的field,其中node是要启动的节点。keepAliveThread线程保证节点运行期间Bootstrap会一直存在,可以接收关机命令进行从而优雅关闭。下面是启动前的属性设置,代码如下:

private void setup(boolean addShutdownHook, Tuple<Settings, Environment> tuple) throws Exception {
     if (tuple.v1().getAsBoolean("bootstrap.mlockall", false)) {//尝试锁定内存
            Natives.tryMlockall();
        }
        tuple = setupJmx(tuple);
        NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().settings(tuple.v1()).loadConfigSettings(false);
        node = nodeBuilder.build();//初始化node
        if (addShutdownHook) {//添加关闭node的hook
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    node.close();
                }
            });
        }
    }

尝试锁定内存左右是保证节点运行期间的内存不变动,以防因为内存变得带来性能上的波动,这里调用的是c方法。最后来看一下main方法:

public static void main(String[] args) {
.... 
String stage = "Initialization";//标明启动阶段用于构造错误信息。
        try {
            if (!foreground) {
                Loggers.disableConsoleLogging();
                System.out.close();
            }
            bootstrap.setup(true, tuple);
            stage = "Startup";
            bootstrap.start();//bootstrap的启动过程也就是node的启动过程
            if (!foreground) {
                System.err.close();
            }
//构造一个线程,保证bootstrap不退出,仍然可以接收命令。
            keepAliveLatch = new CountDownLatch(1); 
            // keep this thread alive (non daemon thread) until we shutdown/
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    keepAliveLatch.countDown();
                }
            });
            keepAliveThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        keepAliveLatch.await();
                    } catch (InterruptedException e) {
                        // bail out
                    }
                }
            }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
            keepAliveThread.setDaemon(false);
            keepAliveThread.start();
        } catch (Throwable e) {
            ESLogger logger = Loggers.getLogger(Bootstrap.class);
            if (bootstrap.node != null) {
                logger = Loggers.getLogger(Bootstrap.class, bootstrap.node.settings().get("name"));
            }
            String errorMessage = buildErrorMessage(stage, e);
            if (foreground) {
                System.err.println(errorMessage);
                System.err.flush();
            } else {
                logger.error(errorMessage);
            }
            Loggers.disableConsoleLogging();
            if (logger.isDebugEnabled()) {
                logger.debug("Exception", e);
            }
            System.exit(3);
        }

main函数有省略,这里start函数调用node的start函数,node的start函数中将各个模块加载启动,从而启动整个系统。这一过程将在接下来进行分析。node启动后会注入hook,同时启动keepAliveThread,至此整个node就启动起来。

加载全部内容

相关教程
猜你喜欢
用户评论