RocketMQ源码解析——Broker部分之Broker启动过程BrokerStartup(2)

@[toc]
 前面说了Broker端的部分启动逻辑中的,启动前的准备部分。知道了启动类是BrokerStartup,其中启动前的准备逻辑主要是通过BrokerController来进行处理的。接下来继续分析,后面的启动部分的逻辑

启动初始化完毕的服务的start

 启动的入口还是在BrokerStartup,然后通过BrokerController来完成。

public static void main(String[] args) {
        //创建createBrokerController然后启动
        start(createBrokerController(args));
    }

    public static BrokerController start(BrokerController controller) {
        try {
            //启动controller
            controller.start();
           //日志打印相关
                .......
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

 主要的逻辑还是在BrokerController中,这里的逻辑其实就是对前面的初始化时候的一些服务类的启动

public void start() throws Exception {
        //启动消息存储相关的线程任务
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        //启动broker服务端
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        //启动只给 消息的生产者使用的netty服务端口
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
        //启动监控SSL连接文件的服务
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
        //启动外部API的客户端
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        //启动push模式相关的服务
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        //启动心跳连接的服务
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
        //启动消息过滤服务
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }
        //如果没启动DLegerCommitLog ,就对Broker进行注册到NameServer上
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            //如果是Master则启动事务消息检查的线程,对应的类就是TransactionalMessageCheckService
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }

        //注册broker的任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        //状态管理的开启
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }


    }

 这里的逻辑其实还是一目了然的。同时也是比较重要的部分。这里对这几个内部的服务类进行分析。其中messageStore这个变量就是DefaultMessageStore这个类,start方法就是其内的方法,这里不进行分析,前面已经有专门文章分析过了。可以看前面,继续后面的分析。对这几个服务进行说明一下

服务类 实现类 说明
messageStore DefaultMessageStore 处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等
remotingServer RemotingServer Broker的服务端,处理消费者和生产者的请求
fastRemotingServer RemotingServer 只给消息生产者的服务端
fileWatchService FileWatchService 启动监控服务连接时用到的SSL连接文件的服务
brokerOuterAPI BrokerOuterAPI RocketMQ控制台跟Broker交互时候的客户端
pullRequestHoldService PullRequestHoldService 处理push模式消费,或者延迟消费的服务
clientHousekeepingService ClientHousekeepingService 心跳连接用的服务
filterServerManager FilterServerManager 消息过来用的服务
transactionalMessageCheckService TransactionalMessageCheckService 定期检查和处理事务消息的服务
slaveSynchronize SlaveSynchronize 主从之间topic,消费便宜等信息同步用的

Broker相关服务的启动

 在介绍这些启动服务之前,先说一下BrokerController内部的一个注册事件到不同服务端口的方法registerProcessor

public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        //注册消息发送的钩子方法
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        //注册消费消息的钩子方法
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
        //在对应的服务端注册对应的事件。
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        //拉取请求的事件
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        //注册消息的事件
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /**
         * ReplyMessageProcessor
         */
        //再次拉取消息
        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
            ....
        //注册其他的事件到不同的端
}

 列举一下不同的端注册的一些事件。这里主要对两个端口的服务进行注册,一个是消费者和生产者都使用的端口10911对应remotingServer,另外一个是消息的生产者专用的10909端口对应fastRemotingServer

服务端 事件 code码 描述
remotingServer SEND_MESSAGE 10 生产者发送信息
remotingServer SEND_MESSAGE_V2 310 生产者发送信息
remotingServer SEND_BATCH_MESSAGE 320 批量发送消息
remotingServer CONSUMER_SEND_MSG_BACK 36 消费端消费失败的时候返回的消息
remotingServer PULL_MESSAGE 11 消费者拉取消息
remotingServer SEND_REPLY_MESSAGE 324 消费者回包消息,可以用类似RPC调用

其中还有很多别的事件,这里就不列举了。注册的这些事件会被保存到一个Map结构中,并与当前的netty服务绑定。

Broker服务的正式启动

remotingServer是Broker的对消费者和生产者的一个服务。这个服务的启动就代表Broker的启动。而remotingServer的实例对象是NettyRemotingServer类,这个类的作用就是对netty的进一步的封装处理类。直接看start方法。

public void start() {
        //创建处理事件的线程组
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
        /**
         *  初始换服务连接相关的处理类,其中有针对握手时候模式(TSL)的一些处理逻辑,连接建立和其他事件的一些监听处理逻辑
         *  HandshakeHandler 用于处理跟Broker建立连接时候的连接模式
         *  NettyConnectManageHandler 用于监听连接的一些事件,比如连接建立,断开等,会触发对应的监听器
         *  NettyEncoder 解码用的处理器
         *  NettyServerHandler 处理请求到达时候根据不同的请求code,进行分发的逻辑
         */
        prepareSharableHandlers();
        //创建netty客户端,启动主从线程组是在类创建时候初始化的。
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        //把握手的处理器加到netty的pipeline序列中
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                //把解码处理器,连接监听处理器,请求code处理器加到序列中
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        .......
    }

 可以看到这里的主要的逻辑

  1. 创建netty客户端,
  2. 然后初始化一些处理器
  3. 把处理器加到netty中然后启动

这里对于netty部分的不进行讲解,可以自行在网上查找资料。这里对几个处理器进行说明一下

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容