总体流程
首先从demo为入口分析整个启动过程
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
try {
String message = "send message";
final Message msg = new Message("topic", "tag",// tag
message.getBytes());// body
SendResult sendResult = producer.send(msg);
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
DefaultMQProducer构造方法如下:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
}
然后调用start方法后,即开始初始化流程,其间接调用了DefaultMQProducerImpl的start方法,即上面构造方法创建的DefaultMQProducerImpl,代码如下:
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 检查producerGroup
this.checkConfig();
// 如果producerGroup为该值,则改变instanceName为pid
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 获取或者创建客户端实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 实例注册,实际是放到一个map中,producerGroup
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {// 是否要启动该实例
mQClientFactory.start();
}
this.serviceState = ServiceState.RUNNING;
break;
case 其他状态不处理:
//....
}
// 心跳发送
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
MQClientInstance创建和启动
MQClientInstance主要是客户端的一些操作的集合(不知道该怎么总结好....),通过MQClientManager的getAndCreateMQClientInstance方法创建,这里使用了单例模式去创建MQClientManager
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
return getAndCreateMQClientInstance(clientConfig, null);
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
//ip@instanceName@unitName
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 创建MQClientInstance并放入map中
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
} else {
}
}
return instance;
}
构造方法如下:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
// 通信服务的一些配置
this.nettyClientConfig = new NettyClientConfig();
//设置线程数
this.nettyClientConfig.setClientCallbackExecutorThreads(
clientConfig.getClientCallbackExecutorThreads());
//网络请求的处理器
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// 客户端一些基础API
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
}
this.clientId = clientId;
this.mQAdminImpl = new MQAdminImpl(this);
// 拉取消息服务
this.pullMessageService = new PullMessageService(this);
// rebalance服务
this.rebalanceService = new RebalanceService(this);
//创建一个group为CLIENT_INNER_PRODUCER_GROUP的producer
// 该producer主要用于将消息发回Broker
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
// 将用户自建的producer属性复制过来li
this.defaultMQProducer.resetClientConfig(clientConfig);
// Consumer一些消费情况(TPS,RT等)的统计管理
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
}
可以看到MQClientInstance里包含了客户端(Producer、Consumer)的一些通用操作以及一些属性的保存,其中一些具体是功能后续文章分析
上面有点需要注意:
- 在判断producerGroup为MixAll.CLIENT_INNER_PRODUCER_GROUP时会改变InstanceName为pid,为什么需要这样做呢?
个人理解如下:
由于该Producer是系统内置的producer,用户系统行为的消息发送,如果一个JVM内用户启动了多个Producer,那么内置的producer是可以用一个的,所以这里将instanceName改成pid就好了,那么在getAndCreateMQClientInstance中获取的clientId(ip@instanceName@unitName)就会一样,就能取出之前创建的MQClientInstance对象
启动方法如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
}
//netty相关通信的初始化,然后开启一个定时任务扫描responseTable
this.mQClientAPIImpl.start();
// 启动定时任务
this.startScheduledTask();
// 启动消息拉取服务
this.pullMessageService.start();
// 启动Rebalance拉取服务
this.rebalanceService.start();
// 启动内置producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
this.serviceState = ServiceState.RUNNING;
break;
case//其他状态不处理;
}
}
}
MQClientInstance启动的时候会把一些客户端相关的服务都启动,另外启动了很多重要的定时任务:
- 拉取NameServer地址
- 更新Topic路由信息
- 清除下线的Broker
- 发送心跳给所有Broker
- 持久化Consumer的消费offset
- 调整线程池
整个启动的大体过程就是如此,细节很多,不在这里展开,会在另外文章具体分析