RocketMQ源码(一):NameServer的启动
RocketMQ源码(二):broker的启动(一)
RocketMQ源码(三):broker的启动(二)
RocketMQ源码(五):producer发送消息
RocketMQ源码(六):broker接收消息
RocketMQ源码(七):consumer的启动
RocketMQ源码(八):consumer消息拉取(一)
RocketMQ源码(九):consumer消息拉取(二)
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
上面这段代码是org.apache.rocketmq.example.simple包下,provider的一个简单demo程序,以这个程序作为切入点,来看下producer的启动过程和消息的推送过程。首先来认识下DefaultMQProducer这个类,先看一下它的属性
// 生产者组
private String producerGroup;
// 默认的topic名称,用于在NameServer端标识broker接收自动创建topic
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
// 默认的topic队列数
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间
private int sendMsgTimeout = 3000;
// 消息体进行压缩的阈值
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 同步失败后的发送重试次数
private int retryTimesWhenSendFailed = 2;
// 异步发送失败后的重试次数
private int retryTimesWhenSendAsyncFailed = 2;
// 内部发送失败时,是否发送到另一个broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 消息体最大容量
private int maxMessageSize = 1024 * 1024 * 4; // 4M
private TraceDispatcher traceDispatcher = null;
DefaultMQProducer继承于ClientConfig,同时也继承了一些其他的属性
// 从系统参数获取nameServer地址
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
// 获取本地ip
private String clientIP = RemotingUtil.getLocalAddress();
// 实例名
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
protected AccessChannel accessChannel = AccessChannel.LOCAL;
/**
* Pulling topic information interval from the named server
*/
// 轮询NameServer的时间间隔
private int pollNameServerInterval = 1000 * 30;
/**
* Heartbeat interval in microseconds with message broker
*/
// 向Broker发送心跳包的时间间隔
private int heartbeatBrokerInterval = 1000 * 30;
回到一开始的示例代码,首先是实例化一个DefaultMQProducer
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 定义发送消息线程池的队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 默认的发送消息的线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 获取Java虚拟机的可用的处理器数量
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
主要就是实例化了一个 DefaultMQProducerImpl 对象,从名字可以看出它是DefaultMQProducer 的一个默认实现类。在后面的启动过程中会详细的说明这个类。回到示例代码,接下来就是调用start方法,正式的启动producer
public void start() throws MQClientException {
// 设置producerGroup
this.setProducerGroup(withNamespace(this.producerGroup));
// 启动defaultMqProducerImpl
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
从这里可以看到主要就是调用了DefaultMQProducerImpl的start方法
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// 先设置状态为 START_FAILED
this.serviceState = ServiceState.START_FAILED;
// 检查配置
this.checkConfig();
// 设置instanceName,以便后续生成唯一的clientID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 初始化MQClientInstance,负责与broker和namesrv进行网络通信
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 校验 producerGroup 是否重复
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 保存的默认 TopicKey 即:TBW102,用于解决 topic 自动创建问题
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 扫描超时的请求
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
首先判断的是ServiceState,也就是启动状态
public enum ServiceState {
/**
* Service just created,not start
*/
CREATE_JUST,
/**
* Service Running
*/
RUNNING,
/**
* Service shutdown
*/
SHUTDOWN_ALREADY,
/**
* Service Start failure
*/
START_FAILED;
}
从名字可以明显的看出,这个状态对应的含义
回到start方法,首先设置状态为 START_FAILED ,防止启动过程中失败,接下来是实例化mQClientFactory,可以看到的是这里通过MQClientManager的单例对象,getOrCreateMQClientInstance方法创建了一个mQClientFactory对象。
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
clientId主要由生产者客户端的ip地址以及实例名称,根据unitName的有无,附加unitName
因此,无论是生产者还是消费者,同一个客户端只会创建一个MQClientInstance,接下来看下MQClientInstance的相关属性,认识下MQClientInstance
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
// 配置信息
private final ClientConfig clientConfig;
// MQClientInstance在同一台机器上的创建序号
private final int instanceIndex;
// 客户端id
private final String clientId;
private final long bootTimestamp = System.currentTimeMillis();
// 生产组--》消息生产者,也就是在应用程序一端,每个生产者组在同一台应用服务器只需要初始化一个生产者实例
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// 消费组--》消费者,也就是在应用程序一端,每个消费组,在同一台应用服务器只需要初始化一个消费者即可
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// 主要是处理运维命令的
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// 网络配置
private final NettyClientConfig nettyClientConfig;
// MQ 客户端实现类
private final MQClientAPIImpl mQClientAPIImpl;
// MQ 管理命令实现类
private final MQAdminImpl mQAdminImpl;
// topic 路由信息
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
// broker信息,这些信息存在于NameServer,但缓存在本地客户端,供生产者、消费者共同使用
// 有可能导致消息丢失
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
// 客户端处关于Broker和NameSrv的命令处理器
private final ClientRemotingProcessor clientRemotingProcessor;
// 消息拉取线程,一个MQClientInstance 只会启动一个消息拉取线程
private final PullMessageService pullMessageService;
// 队列动态负载线程
private final RebalanceService rebalanceService;
// 消息生产者
private final DefaultMQProducer defaultMQProducer;
// 消费端统计
private final ConsumerStatsManager consumerStatsManager;
// 心跳包发送次数
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
// 状态
private ServiceState serviceState = ServiceState.CREATE_JUST;
private Random random = new Random();
其中的MQClientAPIImpl继续看下它的初始化方法
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
// 初始化 remotingClient 并且注册各种事件的处理器
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerRPCHook(rpcHook);
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
从这里可以看到,绑定了很多的请求类型的处理给ClientRemotingProcessor,同时初始化了NettyRemotingClient,也就是netty客户端
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
此时Netty的客户端仅仅完成了对Bootstrap的初始化,以及对NioEventLoopGroup的设置和初始化
再回到DefaultMQProducerImpl 的start方法
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
保存的默认 TopicKey 即:TBW102,用于解决 topic 自动创建问题
因为startFactory为true,所以接下来会调用mQClientFactory也就是MQClientInstance的start方法
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 开启 netty 服务
this.mQClientAPIImpl.start();
// 开启一些定时任务
this.startScheduledTask();
// 开启拉取消息服务,PUSH模式的消费者使用,pull模式下会服务会一直等待
this.pullMessageService.start();
// consumer平衡消费组和消息队列的关系
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
和DefaultMQProducer一样,先设置serviceState为START_FAILED,然后检查NamesrvAddr是否为空,如果为空,通过fetchNameServerAddr方法,发起httpGet请求从http://jmenv.tbsite.net:8080/rocketmq/nsaddr获取nameserver地址。接下来通过mQClientAPIImpl.start方法真正的开启netty服务
public void start() {
this.remotingClient.start();
}
这里实际调用的是NettyRemotingClient的start方法
public void start() {
// 创建 EventExecutorGroup
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
// 指定空闲时间后断开连接
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 当nettyclient监听到Netty事件,比如连接 断开 异常等情况 执行客户端自定义的事件处理
new NettyConnectManageHandler(),
//
new NettyClientHandler());
}
});
// 过滤超时的response
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
这里完成了Bootstrap对前面创建的EventLoopGroup以及handler的绑定,和Broker启动过程中一样。
回到MQClientInstance中,接下来通过startScheduledTask开启一些定时任务
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
// 定时更新NameServer地址
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 定时更新consumer和producer关于topic缓存的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 清空已经下线的broker信息,向所有的broker发送心跳。默认时间30s,因此这个时间段内有可能消息丢失
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 持久化 consumer 的消费进度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 自适应调节线程池大小
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
第一个就是调用前面说的fetchNameServerAddr方法,不断获取最新的NameServer地址
第二个就是定时调用updateTopicRouteInfoFromNameServer
public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<String>();
// Consumer
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
// Producer
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set<String> lst = impl.getPublishTopicList();
topicList.addAll(lst);
}
}
}
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
先是拿到当前实例下所有consumer和provider的topic,然后再调用updateTopicRouteInfoFromNameServer方法
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
// 为了避免重复从 NameServer 获取配置信息,在这里使用了ReentrantLock,并且设有超时时间。固定为3000s
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
// 获取默认topic(TBW102)的路由信息
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
// 从NameServer获取指定topic的路由信息
else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
// 判断路由信息是否变化
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新broker信息
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// 更新发送者的缓存
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// 更新消费者的缓存
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
先是通过getTopicRouteInfoFromNameServer从nameServer服务上获取topic对应的路由信息,封装到TopicRouteData 对象中
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException {
return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
接下来通过topicRouteDataIsChange判断路由信息是否变化了
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
if (olddata == null || nowdata == null)
return true;
TopicRouteData old = olddata.cloneTopicRouteData();
TopicRouteData now = nowdata.cloneTopicRouteData();
Collections.sort(old.getQueueDatas());
Collections.sort(old.getBrokerDatas());
Collections.sort(now.getQueueDatas());
Collections.sort(now.getBrokerDatas());
return !old.equals(now);
}
当判断了需要更新的时候,就会首先从topicRouteData中获取BrokerData,更新broker的路由信息,再分别更新provider和consumer的路由信息
第三个定时器调用了两个方法,分别用于移除废弃的broker和向所有的broker发送心跳
第四个定时器是用于消费者持久化队列的消费信息到broker,这个在分析消费者的时候再详细说明
第五定时器是调整消费者端的线程池的大小
回到MQClientInstance的start方法
开启pullMessageService,拉取消息,和消费者相关
开启rebalanceService,均衡消费者,和消费者相关
再回到DefaultMQProducerImpl的start方法中
接下来是
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 扫描超时的请求
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
这里是向所有的broker发送一次心跳,以及扫描所有超时的请求
到此,Producer的启动结束