二、RocketMQ-Broker启动流程

一、简述

本文简述一下Broker的启动流程,主要涉及的步骤及简要配置,不做过多深入。

二、BrokerStartup、BrokerController

启动类,类结构如下:

BrokerStartup.java

首先会调用createBrokerController()实例化一个BrokerController(它才是核心的启动类),然后调用start()方法,这个套路和Namesrv的思路一致。

1、createBrokerController()

与Namesrv类似,通过一顿骚操作,将cmd命令行的参数进行解析,并产出4个配置(这四个配置本身有默认配置):

  • BrokerConfig
    broker自身的一些配置,例如namesrvAddr(namesrv地址),brokerIP1brokerIP2brokerNamedefaultTopicQueueNums(默认8个队列),autoCreateTopicEnable(默认居然是true)等等
  • NettyServerConfig
    nettyserver的一些配置,最重要的 listenPort10911)及serverWorkerThreads(worker的线程数量)、serverOnewaySemaphoreValue(one-way模式发送的最大线程数)、serverAsyncSemaphoreValue(异步模式最大的线程数),serverSocketSndBufSize(发送消息最大长度)等等
  • NettyClientConfig
    netty客户端(producerconsumer)的一些配置,connectTimeoutMillis(超时时间默认3秒),channelNotActiveInterval(通道异常检查时间,默认60秒)等等
  • MessageStoreConfig
    消息commitLog固化配置,storePathCommitLog(commitLog目录),mapedFileSizeCommitLog(commitLog大小,默认1G),
    messageDelayLevel(消息延迟投递级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)

与Namesrv类似,如果cmd中含有:

  • -c path,则会加载path指定的配置文件中的配置
  • -p 会打印所有的配置,并退出
  • -m 只打印 @ImportantField 注解标注的配置,并退出

待上面的配置都加载完毕后,调用initialize进行初始化:

boolean initResult = controller.initialize();

2、controller.initialize()

这个方法很重要,做了一堆的事情,在罗列功能之前,先看这个图

log地址

RocketMQ默认会在/home(我的是windows,所以home=C:\User\asd)下新建一个store文件夹,里面存放了一堆一堆很重要的配置,commitlog(固化的消息),configtopicsubscriptionGroupconsumerFilterconsumerOffsetdelayOffset配置),consumequeue(消费者队列配置)等等,都很重要,而controller.initialize()方法主要就是在broker启动的时候对这些配置进行初始化

  • topicConfigManager.load()
    加载C:\Users\asd\store\config\topics.json
  • consumerOffsetManager.load()
    加载C:\Users\asd\store\config\consumerOffset.json
  • subscriptionGroupManager.load()
    加载C:\Users\asd\store\config\subscriptionGroup.json
  • consumerFilterManager.load()
    加载C:\Users\asd\store\config\consumerFilter.json
  • messageStore.load()
    加载C:\Users\asd\store\commitLog\ 下所有消息日志文件

该加载的都加载完了,然后启动NettyServer以及一堆的 Executor 线程池

  • remotingServer
    默认监听端口 10911
  • fastRemotingServer
    默认监听端口 10911 - 2 = 10909,莫非和VIP通道有关?后续补充
  • sendMessageExecutor
    发送消息的线程池
  • pullMessageExecutor
    拉取消息的线程池
  • queryMessageExecutor
    查询消息的线程池
  • adminBrokerExecutor
    不知道干啥的线程池
  • clientManageExecutor
    客户端连接管理线程池
  • heartbeatExecutor
    心跳线程池
  • endTransactionExecutor
    事务结束线程池
  • consumerManageExecutor
    消费者连接线程池

线程池初始化呢完毕后,开启一些定时任务

  • BrokerController.this.getBrokerStats().record();
    固化broker的状态,默认1天执行一次
  • BrokerController.this.consumerOffsetManager.persist();
    固化offset,延迟10秒,每5秒执行一次
  • BrokerController.this.consumerFilterManager.persist();
    Filter固化,延迟10秒,每10秒执行一次
  • BrokerController.this.protectBroker();
    保护broker?后续完善,延迟3分,每3分执行一次
  • BrokerController.this.printWaterMark();
    打印流水信息,发送、拉取、查询、结束事务消息等日志
    延迟10秒,每1秒一次
  • log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
    打印日志:获取已经固化到commitlog,但是还没有被消费的日志的byte大小
    延迟10秒,每60秒执行一次
  • BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
    如果fetchNamesrvAddrByAddressServer=true,则会执行
    主要是通过httpclient从给定的URL动态获取NameSrv的地址
    延迟10秒,每120秒执行一次
  • BrokerController.this.slaveSynchronize.syncAll();
    如果broker的角色是SLAVE,则会执行,主要是从主Broker定期同步topicoffsetdelayOffsetgroup信息
    延迟10秒,每60秒执行一次
  • BrokerController.this.printMasterAndSlaveDiff();
    如果broker的角色是不是SLAVE,则会执行,主要打印SLAVEMaster之间差异的byte大小

如果启用了TLS安全传输配置,则会启动 fileWatchService?后续补充

最后调用了三个方法:

  • initialTransaction()
    初始化事务消息的一些服务及Listener
  • initialAcl();
    初始化ACL的一些服务,进行访问控制
  • initialRpcHooks();
    暂时不知道干啥的

到这里,controller.initialize()算是完成了,回顾一下,其实思路很清晰:

  • 1、加载配置文件
  • 2、启动Netty服务
  • 3、初始化线程池
  • 4、启动定时任务
  • 5、其他的一些配置

3、start()

启动就比较直接了,首先启动一些服务:

  • messageStore.start()
    消息固化服务启动
  • remotingServer.start()
    nettyServer启动 10911
  • fastRemotingServer.start()
    vip nettyServer启动,端口10909
  • fileWatchService.start();
    启用TLS加密的服务启动
  • brokerOuterAPI.start();
    启用动态获取NameSrv的服务启动
  • pullRequestHoldService.start();
    获取请求的服务启动
  • clientHousekeepingService.start();
    启动
  • filterServerManager.start();
    过滤器启动
  • BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
    定时任务,从Namesrv获取所有broker注册的信息,延迟10秒,每10秒或者registerNameServerPeriod指定的时间,但是不能少于10秒执行一次
  • brokerStatsManager.start();
    broker状态检查启动,貌似啥也没干
  • this.brokerFastFailure.start();
  • transactionalMessageCheckService.start();
    如果不是SLAVE,则会启动事务
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342