RocketMQ源码解析——Broker部分之Broker启动过程BrokerStartup(1)

@[toc]

从启动脚本到启动类

 我们知道RocketMQ的Broker端的启动方式为,进入到RocketMQ的bin目录下,运行对应的mqbroker脚本。
 RocketMQ的Linux和Windows脚本内容也就是对应的mqbroker脚本,这里贴出来看看

//省略部分脚本
export ROCKETMQ_HOME

sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

 上面这段是Linux系统下的shell脚本,

if not exist "%ROCKETMQ_HOME%\bin\runbroker.cmd" echo Please set the ROCKETMQ_HOME variable in your environment! & EXIT /B 1

call "%ROCKETMQ_HOME%\bin\runbroker.cmd" org.apache.rocketmq.broker.BrokerStartup %*

IF %ERRORLEVEL% EQU 0 (
   ECHO "Broker starts OK"
)

 上面这段是Windows系统下的cmd脚本命令。从这两段脚本中可以看出,最后的都会运行一个runbroker的脚本,还会把一个类的全路径名作为参数传进去,这个类就是启动类BrokerStartup

启动类BrokerStartup

BrokerStartup的启动函数是main函数,里面的代码也简单只有一行,主要的逻辑也简单,就是创建BrokerController类(后面说),然后启动。

    public static void main(String[] args) {
        //创建createBrokerController然后启动
        start(createBrokerController(args));
    }

启动的准备工作createBrokerController

public static BrokerController createBrokerController(String[] args) {
        //MQ的版本号
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //设置broker的netty客户端的发送缓冲大小,默认是128 kb
        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 131072;
        }
        //设置broker的netty客户端的接收缓冲大小,默认是128 kb
        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 131072;
        }

        try {
            //PackageConflictDetect.detectFastjson();
            //命令行选项解析
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            //解析命令栏中的 mqbroker
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }
            //相关配置的存储对象
            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
            //是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本)
            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            //设置netty的服务端监听的端口 10911
            nettyServerConfig.setListenPort(10911);
            //消息存储相关的配置
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            //如果Broker是Slave角色,消息占用内存大小的比率 比默认的40% 还要小 10%
            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                //设置消息的内存最大占比,如果内存占比超过设定值,那么就进行置换
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }


           ......省略配置读取保存相关代码


            //创建BrokerController
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            //缓存额外配置
            controller.getConfiguration().registerConfig(properties);
            //初始化BrokerController
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            //注册关闭的钩子方法
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);
                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            //BrokerController的销毁方法
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

createBrokerController这个方法逻辑比较简单,主要就是获取启动参数和命令单中的配置参数以及默认的配置参数进行融合。然后使用这些配置对象创建BrokerController,并调用其initialize方法。其中核心就是创建BrokerController,并调用其initialize方法。这里主要说一下Master和Slave在启动时候的区别

参数 说明 master slave
accessMessageInMemoryMaxRatio 消息占用内存大小的比率,超过这个值就需要进行内存置换 默认值40% 默认值30%
brokerId 机器的角色Id 0 大于0的数

 Broker在启动的时候会开三个端口

端口 作用
10911 接收消息推送的端口
10912 高可用的端口
10909 推送消息的vip端口

Broker端的核心BrokerController

BrokerController的构造方法

BrokerController是Broker启动逻辑的核心类。其构造函数中包含很多Broker端的内部服务类的初始化。

public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        //前面准备的配置信息
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        //Consumer消费进度记录管理类
        this.consumerOffsetManager = new ConsumerOffsetManager(this);

        //消息topic维度的管理查询类 管理topic和topic相关配置关系
        this.topicConfigManager = new TopicConfigManager(this);

        //Consumer端使用pull的方式向Broker拉取消息请求的处理类
        this.pullMessageProcessor = new PullMessageProcessor(this);

        //Consumer端使用pull的方式拉取请求时,保存请求,当有消息到达时进行推送处理类
        this.pullRequestHoldService = new PullRequestHoldService(this);

        //消息到达Broker的时候的监听回调类,这里会调用到pullRequestHoldService中的方法
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);

        //消费者id变化监听器
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);

        //消费者管理类  按照group进行分组,对消费者的id变化进行监听
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);

        //消费者的过滤管理类  按照topic进行分类
        this.consumerFilterManager = new ConsumerFilterManager(this);

        //生产者管理 按照group进行分类
        this.producerManager = new ProducerManager();

        //心跳连接处理类
        this.clientHousekeepingService = new ClientHousekeepingService(this);

        //控制台用的
        this.broker2Client = new Broker2Client(this);

        //订阅关系管理类
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);

        //broker对外api
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);

        this.filterServerManager = new FilterServerManager(this);

        //从broker 同步进度管理类
        this.slaveSynchronize = new SlaveSynchronize(this);
        //各种线程池的阻塞队列


        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }

 列举一下这些核心类的作用

字段类 作用
ConsumerOffsetManager Consumer消费者的消费进度记录管理类
TopicConfigManager 消息topic维度的管理查询类
PullMessageProcessor Consumer端使用pull的方式向Broker拉取消息请求的处理类
PullRequestHoldService Consumer端使用push的方式拉取请求时,保存请求,当有消息到达时进行推送处理类。对于push方式消费,会对消费端的请求进行保存,当有消息到达的时候然后进行推送
NotifyMessageArrivingListener 消息到达Broker的时候的监听回调类,这里会调用到pullRequestHoldService中的notifyMessageArriving方法
DefaultConsumerIdsChangeListener 消费者id变化监听器,主要监听Consumer的注册和下线的事件
ConsumerManager 消费者管理类 按照group进行分组,对消费者的id变化进行监听
ConsumerFilterManager 消费者的过滤管理类 按照topic进行分类
ProducerManager 生产者管理 按照group进行分类
ClientHousekeepingService 心跳连接处理类
Broker2Client 控制台获取Broker信息用
SubscriptionGroupManager 订阅关系管理类
BrokerOuterAPI broker对外api
SlaveSynchronize 从broker 同步进度管理类

BrokerController的初始化initialize

 在BrokerStartupcreateBrokerController方法中,在创建了BrokerController对象之后会马上调用其initialize方法。来进行Broker的初始化。
initialize方法,内部的逻辑比较多。这里整理一下主要做的事情:

  1. 从服务器加载topic配置,不同的Consumer消费的进度情况,订阅关系,Consumer的过滤信息配置等信息。这些信息都保存在对应的文件当中
  2. 如果步骤1加载正常,会创建DefaultMessageStore对象。在这个对象中又有一堆逻辑
public boolean initialize() throws CloneNotSupportedException {
        //加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.json
        boolean result = this.topicConfigManager.load();
        //加载 不同的Consumer消费的进度情况  文件地址为 {user.home}/store/config/consumerOffset.json
        result = result && this.consumerOffsetManager.load();
        //加载 订阅关系  文件地址  {user.home}/store/config/subscriptionGroup.json
        result = result && this.subscriptionGroupManager.load();
        //加载 Consumer的过滤信息配置  文件地址  {user.home}/store/config/consumerFilter.json
        result = result && this.consumerFilterManager.load();
        //如果上述文件加载正常
        if (result) {
            try {
                //创建DefaultMessageStore,
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                //使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandler
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                //Broker的消息统计类
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        //加载消息的日志文件,包含CommitLog,ConsumeQueue等
        result = result && this.messageStore.load();
        //如果上述文件存在一个加载不成功则直接返回
        if (result) {
            //开启服务端
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            //再开一个端口为10909 的服务端口,这个端口只给 消息的生产者使用
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            //处理消息生产者发送的生成消息api 相关的线程池
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
            //处理消费者发出的消费消息api 相关的线程池
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            //处理回复消息api的线程池
            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
            //查询线程
            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));
            //-------------------------省略部分线程池的创建------------------------------//
            
            //为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法
            this.registerProcessor();
            
            //-------------------------省略定时执行任务服务------------------------------//
            //初始化事务消息相关的服务
            initialTransaction();
            //消息轨迹
            initialAcl();
            //Rpc调用的钩子
            initialRpcHooks();
    }


    private void initialTransaction() {
        //加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImpl
        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
        if (null == this.transactionalMessageService) {
            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
            log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
        }
        //AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现
        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
        if (null == this.transactionalMessageCheckListener) {
            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
            log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        //设置对应的brokerController到AbstractTransactionalMessageCheckListener中
        this.transactionalMessageCheckListener.setBrokerController(this);
        //创建TransactionalMessageCheckService,这个服务是周期检查事务的服务,
        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }


        private void initialAcl() {
        if (!this.brokerConfig.isAclEnable()) {
            log.info("The broker dose not enable acl");
            return;
        }
        //初始化AccessValidator对应的实现PlainAccessValidator
        List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
        if (accessValidators == null || accessValidators.isEmpty()) {
            log.info("The broker dose not load the AccessValidator");
            return;
        }
        //把对应的权限校验加入对应的校验器中
        for (AccessValidator accessValidator: accessValidators) {
            final AccessValidator validator = accessValidator;
            accessValidatorMap.put(validator.getClass(),validator);
            this.registerServerRPCHook(new RPCHook() {

                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    //Do not catch the exception
                    validator.validate(validator.parse(request, remoteAddr));
                }
                
                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                }
            });
        }
    }

     private void initialRpcHooks() {
        //加载对应的RPC钩子方法
        List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
        if (rpcHooks == null || rpcHooks.isEmpty()) {
            return;
        }
        for (RPCHook rpcHook: rpcHooks) {
            //注册钩子方法
            this.registerServerRPCHook(rpcHook);
        }
    }

initialize方法中的逻辑比较多,主要可以分为以下几步:

  1. 服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。以及通过DefaultMessageStore来加载CommitLog,ConsumeQueue,IndexFile等文件(这里关于DefaultMessageStore中对于CommitLog,ConsumeQueue,IndexFile等文件加载过程不进行讲解,可以看前面关于DefaultMessageStore的文章
  2. 如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
  3. 注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
  4. 启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
  5. 初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。

BrokerStartup启动之前的初始化逻辑就是这些了,其中有一些需要看前面的文章,和源码进行分析。这里就不太过深入,后面的BrokerStartup的启动解析下篇文章解析

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容