@[toc]
前面说了Broker端的部分启动逻辑中的,启动前的准备部分。知道了启动类是BrokerStartup
,其中启动前的准备逻辑主要是通过BrokerController
来进行处理的。接下来继续分析,后面的启动部分的逻辑
启动初始化完毕的服务的start
启动的入口还是在BrokerStartup
,然后通过BrokerController
来完成。
public static void main(String[] args) {
//创建createBrokerController然后启动
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
//启动controller
controller.start();
//日志打印相关
.......
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
主要的逻辑还是在BrokerController
中,这里的逻辑其实就是对前面的初始化时候的一些服务类的启动
public void start() throws Exception {
//启动消息存储相关的线程任务
if (this.messageStore != null) {
this.messageStore.start();
}
//启动broker服务端
if (this.remotingServer != null) {
this.remotingServer.start();
}
//启动只给 消息的生产者使用的netty服务端口
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
//启动监控SSL连接文件的服务
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
//启动外部API的客户端
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
//启动push模式相关的服务
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
//启动心跳连接的服务
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
//启动消息过滤服务
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
//如果没启动DLegerCommitLog ,就对Broker进行注册到NameServer上
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
//如果是Master则启动事务消息检查的线程,对应的类就是TransactionalMessageCheckService
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
//注册broker的任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
//状态管理的开启
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
这里的逻辑其实还是一目了然的。同时也是比较重要的部分。这里对这几个内部的服务类进行分析。其中messageStore
这个变量就是DefaultMessageStore
这个类,start
方法就是其内的方法,这里不进行分析,前面已经有专门文章分析过了。可以看前面,继续后面的分析。对这几个服务进行说明一下
服务类 | 实现类 | 说明 |
---|---|---|
messageStore | DefaultMessageStore | 处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等 |
remotingServer | RemotingServer | Broker的服务端,处理消费者和生产者的请求 |
fastRemotingServer | RemotingServer | 只给消息生产者的服务端 |
fileWatchService | FileWatchService | 启动监控服务连接时用到的SSL连接文件的服务 |
brokerOuterAPI | BrokerOuterAPI | RocketMQ控制台跟Broker交互时候的客户端 |
pullRequestHoldService | PullRequestHoldService | 处理push模式消费,或者延迟消费的服务 |
clientHousekeepingService | ClientHousekeepingService | 心跳连接用的服务 |
filterServerManager | FilterServerManager | 消息过来用的服务 |
transactionalMessageCheckService | TransactionalMessageCheckService | 定期检查和处理事务消息的服务 |
slaveSynchronize | SlaveSynchronize | 主从之间topic,消费便宜等信息同步用的 |
Broker相关服务的启动
在介绍这些启动服务之前,先说一下BrokerController
内部的一个注册事件到不同服务端口的方法registerProcessor
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
//注册消息发送的钩子方法
sendProcessor.registerSendMessageHook(sendMessageHookList);
//注册消费消息的钩子方法
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
//在对应的服务端注册对应的事件。
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
//拉取请求的事件
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
//注册消息的事件
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* ReplyMessageProcessor
*/
//再次拉取消息
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
....
//注册其他的事件到不同的端
}
列举一下不同的端注册的一些事件。这里主要对两个端口的服务进行注册,一个是消费者和生产者都使用的端口10911
对应remotingServer
,另外一个是消息的生产者专用的10909
端口对应fastRemotingServer
。
服务端 | 事件 | code码 | 描述 |
---|---|---|---|
remotingServer | SEND_MESSAGE | 10 | 生产者发送信息 |
remotingServer | SEND_MESSAGE_V2 | 310 | 生产者发送信息 |
remotingServer | SEND_BATCH_MESSAGE | 320 | 批量发送消息 |
remotingServer | CONSUMER_SEND_MSG_BACK | 36 | 消费端消费失败的时候返回的消息 |
remotingServer | PULL_MESSAGE | 11 | 消费者拉取消息 |
remotingServer | SEND_REPLY_MESSAGE | 324 | 消费者回包消息,可以用类似RPC调用 |
其中还有很多别的事件,这里就不列举了。注册的这些事件会被保存到一个Map结构中,并与当前的netty服务绑定。
Broker服务的正式启动
remotingServer
是Broker的对消费者和生产者的一个服务。这个服务的启动就代表Broker的启动。而remotingServer
的实例对象是NettyRemotingServer
类,这个类的作用就是对netty的进一步的封装处理类。直接看start
方法。
public void start() {
//创建处理事件的线程组
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
/**
* 初始换服务连接相关的处理类,其中有针对握手时候模式(TSL)的一些处理逻辑,连接建立和其他事件的一些监听处理逻辑
* HandshakeHandler 用于处理跟Broker建立连接时候的连接模式
* NettyConnectManageHandler 用于监听连接的一些事件,比如连接建立,断开等,会触发对应的监听器
* NettyEncoder 解码用的处理器
* NettyServerHandler 处理请求到达时候根据不同的请求code,进行分发的逻辑
*/
prepareSharableHandlers();
//创建netty客户端,启动主从线程组是在类创建时候初始化的。
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//把握手的处理器加到netty的pipeline序列中
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
//把解码处理器,连接监听处理器,请求code处理器加到序列中
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
.......
}
可以看到这里的主要的逻辑
- 创建netty客户端,
- 然后初始化一些处理器
- 把处理器加到netty中然后启动
这里对于netty部分的不进行讲解,可以自行在网上查找资料。这里对几个处理器进行说明一下