玩转Elasticsearch源码-一张图看懂ES启动流程

开篇

直接看图


ES启动流程.png

上图中虚线表示进入具体流程,实线表示下一步,为了后面讲解方便每个步骤都加了编号。
先简单介绍下启动流程主要涉及的类:

  • org.elasticsearch.bootstrap.Elasticsearch: 启动入口,main方法就在这个类里面,执行逻辑对应图中绿色部分
  • org.elasticsearch.bootstrap.Bootstrap:包含主要启动流程代码,执行逻辑对应图中红色部分
  • org.elasticsearch.node.Node:代表集群中的节点,执行逻辑对应图中蓝色部分

流程讲解

  1. main方法
  2. 设置了一个空的SecurityManager:

// we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
// presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
//我们希望JVM认为已经安装了一个安全管理器,这样,如果基于安全管理器的存在或缺少安全管理器的内部策略决策就会像有一个安全管理器一样(e.g.、DNS缓存策略)
// grant all permissions so that we can later set the security manager to the one that we want
//授予所有权限,以便稍后可以将安全管理器设置为所需的权限

添加StatusConsoleListener到STATUS_LOGGER:

We want to detect situations where we touch logging before the configuration is loaded . If we do this , Log 4 j will status log an error message at the error level . With this error listener , we can capture if this happens . More broadly , we can detect any error - level status log message which likely indicates that something is broken . The listener is installed immediately on startup , and then when we get around to configuring logging we check that no error - level log messages have been logged by the status logger . If they have we fail startup and any such messages can be seen on the console
我们希望检测在加载配置之前进行日志记录的情况。如果这样做,log4j将在错误级别记录一条错误消息。使用这个错误监听器,我们可以捕捉到这种情况。更广泛地说,我们可以检测任何错误级别的状态日志消息,这些消息可能表示某个东西坏了。侦听器在启动时立即安装,然后在配置日志记录时,我们检查状态日志记录器没有记录错误级别的日志消息。如果它们启动失败,我们可以在控制台上看到任何此类消息。

实例化Elasticsearch:

Elasticsearch() {
        super("starts elasticsearch", () -> {}); // () -> {} 是启动前的回调
        //下面解析version,daemonize,pidfile,quiet参数
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }

3.注册ShutdownHook,用于关闭系统时捕获IOException到terminal

            shutdownHookThread = new Thread(() -> {
                try {
                    this.close();
                } catch (final IOException e) {
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        e.printStackTrace(pw);
                        terminal.println(sw.toString());
                    } catch (final IOException impossible) {
                        // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
                        // say that an exception here is impossible
                        throw new AssertionError(impossible);
                    }
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);

然后调用beforeMain.run(),其实就是上面实例化Elasticsearch对象时创建的()->{} lambda表达式。
4.进入Command类的mainWithoutErrorHandling方法

 void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);//根据提供给解析器的选项规范解析给定的命令行参数

        if (options.has(helpOption)) {
            printHelp(terminal);
            return;
        }

        if (options.has(silentOption)) {//terminal打印最少内容
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {//terminal打印详细内容
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }

        execute(terminal, options);
    }

5.进入EnvironmentAwareCommand的execute方法

protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
            }
            if (settings.containsKey(kvp.key)) {
                final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                throw new UserException(ExitCodes.USAGE, message);
            }
            settings.put(kvp.key, kvp.value);
        }

        //确保给定的设置存在,如果尚未设置,则从系统属性中读取它。
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

        execute(terminal, options, createEnv(terminal, settings));
    }

6.进入InternalSettingsPreparer的prepareEnvironment方法,读取elasticsearch.yml并创建Environment。细节比较多,后面再细讲。


Environment对象.png

7.判断是否有-v参数,没有则准备进入init流程

 protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
        }
        if (options.has(versionOption)) { //如果有 -v 参数,打印版本号后直接退出
            terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
                    + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                    + ", JVM: " + JvmInfo.jvmInfo().version());
            return;
        }

        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);

        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }

8.调用Bootstrap.init
9.实例化Boostrap。保持keepAliveThread存活,可能是用于监控

Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown 保持这个线程存活(非守护进程线程),直到我们关机
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }

10.加载elasticsearch.keystore文件,重新创建Environment,然后调用LogConfigurator的静态方法configure,读取config目录下log4j2.properties然后配log4j属性
11.创建pid文件,检查lucene版本,不对应则抛出异常

 private static void checkLucene() {
        if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) {
            throw new AssertionError("Lucene version mismatch this version of Elasticsearch requires lucene version ["
                + Version.CURRENT.luceneVersion + "]  but the current lucene version is [" + org.apache.lucene.util.Version.LATEST + "]");
        }
    }

12.设置ElasticsearchUncaughtExceptionHandler用于打印fatal日志

            // install the default uncaught exception handler; must be done before security is
            // initialized as we do not want to grant the runtime permission
            // 安装默认未捕获异常处理程序;必须在初始化security之前完成,因为我们不想授予运行时权限
            // setDefaultUncaughtExceptionHandler
            Thread.setDefaultUncaughtExceptionHandler(
                new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));

13.进入Boostrap.setup
14.spawner.spawnNativePluginControllers(environment);尝试为给定模块生成控制器(native Controller)守护程序。 生成的进程将通过其stdin,stdout和stderr流保持与此JVM的连接,但对此包之外的代码不能使用对这些流的引用。
15.初始化本地资源 initializeNatives():检查用户是否作为根用户运行,是的话抛异常;系统调用和mlockAll检查;尝试设置最大线程数,最大虚拟内存,最大FD等。
初始化探针initializeProbes(),用于操作系统,进程,jvm的监控。
16.又加一个ShutdownHook

        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    }
                }
            });
        }

17.比较简单,直接看代码

 try {
            // look for jar hell
            JarHell.checkJarHell();
        } catch (IOException | URISyntaxException e) {
            throw new BootstrapException(e);
        }

        // Log ifconfig output before SecurityManager is installed
        IfConfig.logIfNecessary();

        // install SM after natives, shutdown hooks, etc.
        try {
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new BootstrapException(e);
        }

18.实例化Node,重写validateNodeBeforeAcceptingRequests方法。具体主要包括三部分,第一是启动插件服务(es提供了插件功能来进行扩展功能,这也是它的一个亮点),加载需要的插件,第二是配置node环境,最后就是通过guice加载各个模块。下面22~32就是具体步骤。
19.进入Boostrap.start
20.node.start启动节点
21.keepAliveThread.start
22.Node实例化第一步,创建NodeEnvironment


NodeEnvironment.png

23.生成nodeId,打印nodeId,nodeName和jvmInfo和进程信息
24.创建 PluginsService 对象,创建过程中会读取并加载所有的模块和插件

25.又创建Environment

            // create the environment based on the finalized (processed) view of the settings 根据设置的最终(处理)视图创建环境
            // this is just to makes sure that people get the same settings, no matter where they ask them from 这只是为了确保人们得到相同的设置,无论他们从哪里询问
            this.environment = new Environment(this.settings, environment.configFile());

26.创建ThreadPool,然后给DeprecationLogger设置ThreadContext
27.创建NodeClient,用于执行actions
28.创建各个Service:ResourceWatcherService、NetworkService、ClusterService、IngestService、ClusterInfoService、UsageService、MonitorService、CircuitBreakerService、MetaStateService、IndicesService、MetaDataIndexUpgradeService、TemplateUpgradeService、TransportService、ResponseCollectorService、SearchTransportService、NodeService、SearchService、PersistentTasksClusterService
29.创建并添加modules:ScriptModule、AnalysisModule、SettingsModule、pluginModule、ClusterModule、IndicesModule、SearchModule、GatewayModule、RepositoriesModule、ActionModule、NetworkModule、DiscoveryModule
30.Guice绑定和注入对象
31.初始化NodeClient

            client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
                    () -> clusterService.localNode().getId());

32.初始化rest处理器,这个非常重要,后面会专门讲解

if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ..."); // 初始化http handler
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }

33.修改状态为State.STARTED
34.启动pluginLifecycleComponents
35.通过 injector 获取各个类的对象,调用 start() 方法启动(实际进入各个类的中 doStart 方法): LifecycleComponent、IndicesService、IndicesClusterStateService、SnapshotsService、SnapshotShardsService、RoutingService、SearchService、MonitorService、NodeConnectionsService、ResourceWatcherService、GatewayService、Discovery、TransportService
36.启动HttpServerTransport和TransportService并绑定端口

if (WRITE_PORTS_FILE_SETTING.get(settings)) {
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
                writePortsFile("http", http.boundAddress());
            }
            TransportService transport = injector.getInstance(TransportService.class);
            writePortsFile("transport", transport.boundAddress());
        }

总结

  • 本文只是讲解了ES启动的整体流程,其中很多细节会在本系列继续深入讲解
  • ES的源码读起来还是比较费劲的,流程比较长,经常读到后面忘了前面,需要通过画图的方式帮助梳理整个过程。
  • 为什么要读开源源码?
    1.知道底层实现,能够更好地使用,出问题能够快速定位和解决。
    2.学习别人优秀的代码和处理问题的方式,提高自己的系统设计能力。
    3.有机会可以对其进行扩展和改造。
欢迎您扫一扫上面的二维码关注个人微信公众号
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342