I. UML Diagram
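II. Overview of the Flow
The flow described here is the one for non-transactional messages.
- In the main method, call new DefaultMQProducer(... ...).start(), which delegates to DefaultMQProducerImpl.start(true);
- DefaultMQProducerImpl calls checkConfig() to verify that parameters such as groupName are valid. It then calls getAndCreateMQClientInstance() on MQClientManager to instantiate an MQClientInstance, and registerProducer() to register the newly created producer with that MQClientInstance (it is put into the producerTable map)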
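- Within MQClientInstance, the following are called in order:
  - MQClientAPIImpl.start(): starts remotingClient, the request-response channel, i.e. connects to the name server and the broker through netty
  - this.startScheduledTask(): "Start various schedule tasks". Starts the scheduled tasks that periodically refresh configuration such as the NameSrv address held by remotingClient
  - PullMessageService.start(): "Start pull service". Starts the message pull client
  - RebalanceService.start(): "Start rebalance service". Starts the load-balancing (rebalance) service
  - this.sendHeartbeatToAllBrokerWithLock(): sends a heartbeat to the broker and pulls configuration to update remotingClient
  - DefaultMQProducerImpl.start(false): calls the start method of DefaultMQProducerImpl again, this time without repeating the initialization. Why it is written this way will be filled in once I understand it; in any case, the heartbeat is sent twice after the connection is established
  - this.sendHeartbeatToAllBrokerWithLock(): sends a heartbeat to the broker and pulls configuration to update remotingClient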
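The register-then-start sequence above can be illustrated with a small sketch. This is not RocketMQ's code: ClientInstanceSketch, ProducerSketch, heartbeatCount and the exception types are hypothetical names chosen for illustration, and the only behaviour taken from the text is that a producer is stored in a producerTable map, that start(true) starts the shared client while start(false) skips that step, and that each start ends with a heartbeat.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the client instance shared by producers.
class ClientInstanceSketch {
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();
    private boolean started = false;
    int heartbeatCount = 0;

    // Returns false when the group is already registered (putIfAbsent keeps the first entry).
    boolean registerProducer(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        return producerTable.putIfAbsent(group, producer) == null;
    }

    // Idempotent start: the remoting client, scheduled tasks, pull and
    // rebalance services would be started here in a real client.
    synchronized void start() {
        if (started) {
            return;
        }
        started = true;
    }

    synchronized void sendHeartbeat() {
        heartbeatCount++;
    }

    int producerCount() {
        return producerTable.size();
    }
}

// Hypothetical producer that follows the "check, register, start, heartbeat" order.
class ProducerSketch {
    private final String group;
    private final ClientInstanceSketch client;

    ProducerSketch(String group, ClientInstanceSketch client) {
        this.group = group;
        this.client = client;
    }

    void start(boolean startFactory) {
        // Assumed equivalent of a config check: the group name must not be blank.
        if (group == null || group.trim().isEmpty()) {
            throw new IllegalArgumentException("producer group must not be blank");
        }
        if (!client.registerProducer(group, this)) {
            throw new IllegalStateException("group already registered: " + group);
        }
        if (startFactory) {
            client.start();
        }
        client.sendHeartbeat();
    }
}
```

Starting one producer with start(true) and a second one with start(false) against the same ClientInstanceSketch leaves two entries in the table and two heartbeats sent, which matches the double heartbeat noted above.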
III. A Closer Look at MQClientInstance
1. mQClientAPIImpl.start()
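Some notes first, followed by the source code:
- This method simply creates a new netty client
- scheduleAtFixedRate starts a timer: "This method is periodically invoked to scan and expire deprecated request", meaning it is called periodically (first after 3 seconds, then once per second) to scan for deprecated requests and expire them
- this.nettyEventExecutor.start() starts the netty event listener; I am not yet sure what it is used for, to be added later!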
    public void start() {
        // Worker thread pool that runs the pipeline handlers registered below.
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // Configure the netty client Bootstrap: socket options and channel pipeline.
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // The SSL handler goes first in the pipeline when TLS is enabled.
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        // Scan the response table once per second, starting 3 seconds after start().
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        // The event executor is started only when a channelEventListener was supplied.
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }
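To make the "scan and expire deprecated request" idea more concrete, here is a minimal sketch of such a scan. It does not reproduce scanResponseTable(): PendingRequest, ResponseTableSketch and the expiry rule (elapsed time greater than the timeout) are assumptions made for illustration only.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical record of a request that is still waiting for its response.
class PendingRequest {
    final long beginMillis;
    final long timeoutMillis;

    PendingRequest(long beginMillis, long timeoutMillis) {
        this.beginMillis = beginMillis;
        this.timeoutMillis = timeoutMillis;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis - beginMillis > timeoutMillis;
    }
}

// Hypothetical table of in-flight requests keyed by request id.
class ResponseTableSketch {
    private final Map<Integer, PendingRequest> table = new ConcurrentHashMap<>();

    void put(int requestId, PendingRequest request) {
        table.put(requestId, request);
    }

    // Removes every request whose timeout has elapsed and returns how many were removed.
    // A periodic timer would call this with System.currentTimeMillis().
    int scanAndExpire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Integer, PendingRequest>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isExpired(nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return table.size();
    }
}
```

Passing the current time as a parameter keeps the scan deterministic and easy to test; a real client would typically also notify the caller of each expired request rather than only dropping it.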
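IV. MQClientInstance.startScheduledTask()
This method defines a batch of scheduled tasks:
- fetchNameServerAddr: if no Name Server address was specified at startup, tries to obtain one by other means such as HTTP. Schedule: 10-second initial delay, then every 2 minutes
- updateTopicRouteInfoFromNameServer: fetches Topic and Route information from the Name Server and updates the local copy. Schedule: 10-millisecond initial delay, then every 30 seconds
- cleanOfflineBroker, sendHeartbeatToAllBrokerWithLock: removes brokers that have gone offline and sends a heartbeat. Schedule: 1-second initial delay, then every 30 seconds
- persistAllConsumerOffset: persists the consumer offsets. Schedule: 10-second initial delay, then every 5 seconds
- adjustThreadPool: adjusts the thread pool size. Schedule: 1-minute initial delay, then every minute
The source code is as follows: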
    private void startScheduledTask() {
        // Only when no name server address is configured: fetch it after 10 s, then every 2 minutes.
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // Refresh topic route info: first run after 10 ms, then every getPollNameServerInterval() ms.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        // Clean offline brokers and send heartbeats: first run after 1 s, then every getHeartbeatBrokerInterval() ms.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        // Persist consumer offsets: first run after 10 s, then every getPersistConsumerOffsetInterval() ms.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Adjust the thread pool: first run after 1 minute, then every minute.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
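Every task in the listing wraps its body in try/catch. One likely reason is the documented behaviour of ScheduledExecutorService.scheduleAtFixedRate: if an execution throws, all subsequent executions are suppressed. The sketch below demonstrates that behaviour with a task that always fails; ScheduledTaskSketch, countRuns and the 10 ms / 20 ms timings are hypothetical and chosen only to keep the demonstration short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledTaskSketch {
    // Schedules a task that throws on every run and returns how many times it ran
    // before runsToWaitFor runs were observed or maxWaitMillis elapsed.
    static int countRuns(boolean guarded, int runsToWaitFor, long maxWaitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(runsToWaitFor);

        Runnable failing = () -> {
            runs.incrementAndGet();
            latch.countDown();
            throw new IllegalStateException("simulated task failure");
        };
        Runnable guardedTask = () -> {
            try {
                failing.run();
            } catch (Exception e) {
                // A real task would log here; catching keeps the schedule alive.
            }
        };

        // 10 ms initial delay, then one run every 20 ms.
        scheduler.scheduleAtFixedRate(guarded ? guardedTask : failing, 10, 20, TimeUnit.MILLISECONDS);
        try {
            latch.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

With guarded set to true the task keeps firing despite the failures; with guarded set to false it runs once and is never scheduled again, which for a client would mean, for example, that heartbeats silently stop.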
V. PullMessageService.start()
Starts the pull service.