一、简述
本文简述一下Broker的启动流程,主要涉及的步骤及简要配置,不做过多深入。
二、BrokerStartup、BrokerController
启动类,类结构如下:
首先会调用createBrokerController()实例化一个BrokerController(它才是核心的启动类),然后调用start()方法,这个套路和Namesrv的思路一致。
1、createBrokerController()
与Namesrv类似,通过一顿骚操作,将cmd命令行的参数进行解析,并产出4个配置(这四个配置本身有默认配置):
-
BrokerConfig:
broker自身的一些配置,例如namesrvAddr(namesrv地址),brokerIP1,brokerIP2,brokerName,defaultTopicQueueNums(默认8个队列),autoCreateTopicEnable(默认居然是true)等等 -
NettyServerConfig
nettyserver的一些配置,最重要的 listenPort (10911)及serverWorkerThreads(worker的线程数量)、serverOnewaySemaphoreValue(one-way模式发送的最大线程数)、serverAsyncSemaphoreValue(异步模式最大的线程数),serverSocketSndBufSize(发送消息最大长度)等等 -
NettyClientConfig
netty客户端(producer、consumer)的一些配置,connectTimeoutMillis(超时时间默认3秒),channelNotActiveInterval(通道异常检查时间,默认60秒)等等 -
MessageStoreConfig
消息commitLog固化配置,storePathCommitLog(commitLog目录),mapedFileSizeCommitLog(commitLog大小,默认1G),
messageDelayLevel(消息延迟投递级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
与Namesrv类似,如果cmd中含有:
- -c path,则会加载path指定的配置文件中的配置
- -p 会打印所有的配置,并退出
- -m 只打印 @ImportantField 注解标注的配置,并退出
待上面的配置都加载完毕后,调用initialize进行初始化:
boolean initResult = controller.initialize();
2、controller.initialize()
这个方法很重要,做了一堆的事情,在罗列功能之前,先看这个图
RocketMQ默认会在/home(我的是windows,所以home=C:\User\asd)下新建一个store文件夹,里面存放了一堆一堆很重要的配置,commitlog(固化的消息),config(topic、subscriptionGroup、consumerFilter、consumerOffset、delayOffset配置),consumequeue(消费者队列配置)等等,都很重要,而controller.initialize()方法主要就是在broker启动的时候对这些配置进行初始化
-
topicConfigManager.load()
加载C:\Users\asd\store\config\topics.json -
consumerOffsetManager.load()
加载C:\Users\asd\store\config\consumerOffset.json -
subscriptionGroupManager.load()
加载C:\Users\asd\store\config\subscriptionGroup.json -
consumerFilterManager.load()
加载C:\Users\asd\store\config\consumerFilter.json -
messageStore.load()
加载C:\Users\asd\store\commitLog\ 下所有消息日志文件
该加载的都加载完了,然后启动NettyServer以及一堆的 Executor 线程池
-
remotingServer
默认监听端口 10911 -
fastRemotingServer
默认监听端口 10911 - 2 = 10909,莫非和VIP通道有关?后续补充 -
sendMessageExecutor
发送消息的线程池 -
pullMessageExecutor
拉取消息的线程池 -
queryMessageExecutor
查询消息的线程池 -
adminBrokerExecutor
不知道干啥的线程池 -
clientManageExecutor
客户端连接管理线程池 -
heartbeatExecutor
心跳线程池 -
endTransactionExecutor
事务结束线程池 -
consumerManageExecutor
消费者连接线程池
线程池初始化呢完毕后,开启一些定时任务
-
BrokerController.this.getBrokerStats().record();
固化broker的状态,默认1天执行一次 -
BrokerController.this.consumerOffsetManager.persist();
固化offset,延迟10秒,每5秒执行一次 -
BrokerController.this.consumerFilterManager.persist();
Filter固化,延迟10秒,每10秒执行一次 -
BrokerController.this.protectBroker();
保护broker?后续完善,延迟3分,每3分执行一次 -
BrokerController.this.printWaterMark();
打印流水信息,发送、拉取、查询、结束事务消息等日志
延迟10秒,每1秒一次 -
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
打印日志:获取已经固化到commitlog,但是还没有被消费的日志的byte大小
延迟10秒,每60秒执行一次 -
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
如果fetchNamesrvAddrByAddressServer=true,则会执行
主要是通过httpclient从给定的URL动态获取NameSrv的地址
延迟10秒,每120秒执行一次 -
BrokerController.this.slaveSynchronize.syncAll();
如果broker的角色是SLAVE,则会执行,主要是从主Broker定期同步topic、offset、delayOffset、group信息
延迟10秒,每60秒执行一次 -
BrokerController.this.printMasterAndSlaveDiff();
如果broker的角色是不是SLAVE,则会执行,主要打印SLAVE与Master之间差异的byte大小
如果启用了TLS安全传输配置,则会启动 fileWatchService?后续补充
最后调用了三个方法:
-
initialTransaction()
初始化事务消息的一些服务及Listener -
initialAcl();
初始化ACL的一些服务,进行访问控制 -
initialRpcHooks();
暂时不知道干啥的
到这里,controller.initialize()算是完成了,回顾一下,其实思路很清晰:
- 1、加载配置文件
- 2、启动Netty服务
- 3、初始化线程池
- 4、启动定时任务
- 5、其他的一些配置
3、start()
启动就比较直接了,首先启动一些服务:
-
messageStore.start()
消息固化服务启动 -
remotingServer.start()
nettyServer启动 10911 -
fastRemotingServer.start()
vip nettyServer启动,端口10909 -
fileWatchService.start();
启用TLS加密的服务启动 -
brokerOuterAPI.start();
启用动态获取NameSrv的服务启动 -
pullRequestHoldService.start();
获取请求的服务启动 -
clientHousekeepingService.start();
启动 -
filterServerManager.start();
过滤器启动 -
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
定时任务,从Namesrv获取所有broker注册的信息,延迟10秒,每10秒或者registerNameServerPeriod指定的时间,但是不能少于10秒执行一次 -
brokerStatsManager.start();
broker状态检查启动,貌似啥也没干 - this.brokerFastFailure.start();
-
transactionalMessageCheckService.start();
如果不是SLAVE,则会启动事务