消息存储
offset?
本文要探讨的offset指的是上图中的Queue Offset。
为了保存消费的消费进度,避免重复消费,我们需要将offset保存下来。
针对集群消费,offset保存在broker,在客户端使用RemoteBrokerOffsetStore。
针对广播消费,offset保存在本地,在客户端使用LocalFileOffsetStore。
最后,比较重要的一点是,保存的offset指的是下一条消息的offset,而不是消费完最后一条消息的offset。
比如,你消费了上图中第一个Queue的offset为0的消息,其实保存的offset为1,表示下次我从offset=1的位置进行消费。
broker端
在broker端,通过ConsumerOffsetManager中的offsetTable来保存Topic下各个ConsumerGroup的消费进度。
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
从offsetTable的双层Map结构也是能够看出,我上面说的消费进度,细指为ConsumerGroup在Topic下每个queue的消费进度。
offsetTable毕竟只是内存结构,因此ConsumerOffsetManager继承了ConfigManager实现了持久化功能。
实现了encode,decode,configFilePath三个模板方法。用于指定序列化,反序列化的逻辑以及保存位置
public String encode() {
return this.encode(false);
}
@Override
public String configFilePath() {
return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
其中序列化,反序列化的逻辑很简单,就是使用到了我们的FastJson。
保存文件名为consumerOffset.json。
offset加载
broker启动时从本地文件加载
org.apache.rocketmq.broker.BrokerController#initialize
result = result && this.consumerOffsetManager.load();
offset持久化
定时触发,持久化到磁盘
org.apache.rocketmq.broker.BrokerController#initialize
//定期将consumeroffset持久化到磁盘
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- shutdown触发
BrokerController#shutdown
offset更新
- ConsumerManageProcessor#updateConsumerOffset
用于consumer定时同步offset
- PullMessageProcessor
拉取消息时会顺带确认offset
- TransactionalMessageBridge
事务回查触发,暂不深入研究
consumer端
本文只讨论PUSH模式的集群消费,本地的offset缓存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
因为consumer每次重启都会重新拉取offset,只是一个临时存储,因此RemoteBrokerOffsetStore的offsetTable的设计没有像ConsumerOffsetManager那么复杂。
offset拉取
consumer启动后会进行第一次rebalance,并且之后都会定期rebalance。
在rebalance分配好messagequeue之后,会根据messagequeue生成processqueue进行消息拉取。
而在进行消息拉取前,有一个关键的操作,拉取对应messagequeue的offset。
RebalanceImpl#updateProcessQueueTableInRebalance
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//如果是顺序消费 但是lock失败 那么跳过
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
//从offsetstore删除之前的数据,可能之前有一段时间属于该消费者
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 获取该mq应该从哪里开始消费
// pull模式 默认是0
// push模式 动态计算
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
//创建新的pullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}
return result;
}
其中获取消息拉取初始位置有三种策略
CONSUME_FROM_LAST_OFFSET 最新的offset
CONSUME_FROM_FIRST_OFFSET 第一个offset
CONSUME_FROM_TIMESTAMP 根据时间戳获取offset
但是从源码中可以看出来,实际上的逻辑和我们想象的有点不同,上面三个的逻辑的触发前提是,从broker拉取不到offset进度。
这应该是为了防止重复消费以及少消费,毕竟rocketmq是业务相关的mq。
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
offset更新
在consumer端,针对offsetTable的更新,当然通过消费消息触发。
并发消费
ConsumeMessageConcurrentlyService#processConsumeResult
//removeMessage会返回offset
//不管消费成功还是失败 都会确认offset
//失败的消息会在重试topic
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
//更新offsetstore
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//increaseOnly 表示offset只能增加
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
针对并发消费的offset,更新值来源于ProcessQueue#removeMessage方法
/**
* 移除pq中的msg,并且返回剩下第一条消息的offset
* 如果消息全部消费完,返回this.queueOffsetMax + 1
* 如果msgs为空 返回-1
* @param msgs
* @return
*/
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
//通过map的key移除,也就是queueoffset
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
//如果消息没有全部消费完毕,剩下消息中最小的那个
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
removeMessage的逻辑,用到了滑动窗口的算法。
比如10条消息,offset为 0 - 9。
在多线程并发消费的场景下
比如我第一个线程消费了offset为0的消息,那么offsetTable中的offset更新为1
然后我第二个线程消费了offset为5的消息,removeMessage返回的offset还是为1
只有前面的消息全被消费了,窗口才会滑动
顺序消费
ConsumeMessageOrderlyService#processConsumeResult
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
顺序消费,暂不研究。
offset持久化
最终的offset以broker为准,因此本地的offset要定期持久化到offset。
主要持久化逻辑在persistAll和persist方法。
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
public void persist(MessageQueue mq) {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
}
}
persistAll和persist逻辑大致相同,核心逻辑都是通过updateConsumeOffsetToBroker持久化到broker。
触发持久化逻辑的时机有以下4个
定时同步
MQClientInstance#startScheduledTask
//定时同步offset到broker
//除了拉消息的时候会同步下 也有定时
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//持久化消费进度
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
shutdown
DefaultMQPullConsumerImpl#shutdown
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.persistConsumerOffset();//here
this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
DefaultMQPushConsumerImpl#shutdown
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown();
this.persistConsumerOffset();//here
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.destroy();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
Rebalance
当一个queue不再属于当前consumer的时候,需要同步进步给broker,以便于新拿到queue的consumer从最新未消费的消息开始拉取
RebalancePullImpl#removeUnnecessaryMessageQueue
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
return true;
}
拉取消息
拉取消息的时候会顺带commit offset
DefaultMQPushConsumerImpl#pullMessage
// 拉取消息的时候顺带ack消息进度
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
//READ_FROM_MEMORY 获取本地缓存的消费进度
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),//需要拉取的下个offset ,拉取到了 ,不代表被消费到
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,//确认的offset
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC, //异步拉取消息
pullCallback //注意这个回调也传过去了
);
PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend;
//需要hasCommitOffsetFlag=true
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
//需要当前broker是master
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
//这边应该是保存改group在该topic下面的消费进度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
总结
- 针对集群消费offset保存在broker,针对广播消费offset保存在本地
- offset保存的值为消息下次开始消费的位置,而不是上次消费结束的位置
- 针对offset的提交采取基于TreeMap的滑动窗口机制
- 消费者启动会先从broker拉取对应Topic的offset进度,然后在进行消息拉取
- offset持久化触发的几种方式
问题
消息消费失败是否影响窗口滑动
正常情况下,消息消费失败不会影响窗口滑动,因为针对消费失败的消息,client会进行sendback。
sendback之后,消息经过延迟之后会发往Topic=%RETRY%{CONSUMERGROUP}的Retry队列
每个ConsumerGroup会强制监听Retry队列的消息
ConsumeMessageConcurrentlyService#processConsumeResult
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//因为msgsize=1,所以只有失败的时候才会进入下面的循环
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//消费失败的消息 重新插入到commitlog 发送到group对应重试topic
boolean result = this.sendMessageBack(msg, context);
//如果发送请求失败 那么本地再消费一次试试
if (!result) {
//本地的重试也算重试次数!!!
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
//发送到重试队列失败的消息,重新消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//如果消费失败 重新消费
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
而sendMessageBack失败的消息,会重新封装成另一个ConsumeRequest在本地再次消费。
这些失败的消息会从之前的consumeRequest移除,因此也就影响到了ProcessQueue#removeMessage的返回值。
但是这是一个优化,重试之后窗口大概率上还是会正常滑动。
消费者并发消费如何保证提交位置偏移量正确,保证消费消费不丢失? 比如a线程消费msgid=1,b线程消费msgid=2,b线程消费速度比a线程快。如何避免a线程消费失败,消息不丢失?
如何保证并发消费提交偏移量正确?
基于TreeMap的滑动窗口
如何保证消息消费不丢失?
滑动窗口+broker远端保存+sendback+本地重试兜底
应用重启,消息从哪里开始消费
如果broker保存了offset
那么从对应offset重新拉取消息
如果broker没有保存offset,或者其他情况丢失
那么根据配置的策略,从对应的offset开始拉取