一、consume queue消息存储结构
RocketMQ的消息存储是由consume queue和commit log配合完成的。其中consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。
我们可以在配置中指定consumequeue与commitlog存储的目录,每个topic下的每个queue都有一个对应的consumequeue文件,比如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:
[图片上传失败...(image-90f8b3-1563954737957)]
CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
Size存储中消息的大小
Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
二、ReputMessageService
(一) 核心属性
long reputFromOffset:ReputMessageService 从哪个物理偏移量开始转发消息给ConsumeQueue
(二) ConsumeQueue构建逻辑
在RocketMq中是通过ReputMessageService服务来实现ConsumeQueue的构建,ReputMessageService服务线程异步获取CommitLog的文件存储的日志信息,并且构造DispatchRequest请求,将DispatchRequest分发到CommitLogDispatcherBuildConsumeQueue中进行处理。
private void doReput() {
// 开始的物理位置偏移量如果小于CommitLog的偏移量,就设置reputFromOffset等于CommitLog的最小偏移量
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
// isCommitLogAvailable主要是判断reputFromOffset是否大于CommitLog的最大偏移量
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 从CommitLog中获取数据
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 构造DispatchRequest请求
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 将DispatchRequest转发到CommitLogDispatcherBuildConsumeQueue队列中处理
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 如果是master节点,通过监听器模式,通知该消息可以消费
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// reputFromOffset往后移
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
上面的代码中,关键路径主要有三步:
1 从CommitLog中获取数据
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
getData方法最终MappedFile.selectMappedBuffer(int pos) 方法
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
2 构造DispatchRequest请求
检查消息和并返回消息的size
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
3 消息转发
通过转发器转发请求
DefaultMessageStore.this.doDispatch(dispatchRequest);
转发器CommitLogDispatcherBuildConsumeQueue 处理DispatchRequest
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
CommitLogDispatcherBuildConsumeQueue通过调用putMessagePositionInfo方法,处理request,最终会调用到putMessagePositionInfo方法,将索引数据写到ConsumeQueue中
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
// 写到临时buffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
// cqOffset代码在ConsumeQueue中的逻辑偏移量(第几个索引)
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// 如果重复构建索引,就直接返回
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset + size;
// 追加到ConsumeQueue中,此时数据没有真正写到磁盘中,通过异步刷盘将数据刷新到磁盘中
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
三、数据恢复
(一) 数据不一致场景
生产者生产消息,并写入到CommitLog,然后通过ReputMessageService异步构造ConsumeQueue,只有索引数据写入到ConsumeQueue中以后,消息才能够被消费者消费。
但是如果索引数据还没有来得及ConsumeQueue中,此时服务器宕机,如何保证ConsumeQueue和CommitLog数据的一致呢?
(二) 解决数据不一致问题
Broker启动时,判断上次进程结束是否异常,然后加载CommitLog、ConsumeQueue等数据文件,如果文件加载成功,就调用recover方法,做数据修正,保证数据一致
public boolean load() {
boolean result = true;
try {
// 判断是否Broker上次是异常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// 加载 Commit Log
result = result && this.commitLog.load();
// 加载 Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
// 加载检查点问题
this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);
// 文件恢复
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
从上面的代码中,可以看出是isTempFileExist()方法,判断Broker上一次是否异常退出,具体实现如下:
private boolean isTempFileExist() {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
return file.exists();
}
在Broker启动的时候,会创建一个abort文件,如果Broker正常结束,就会删除改文件,如果是异常退出,那么文件就不会被删除。
在相关数据资源加载完毕以后,就要进行数据的修正,调用recover方法进行数据修正,具体实现如下:
private void recover(final boolean lastExitOK) {
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
// 正常退出,数据修正
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
// 异常退出数据修正
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
this.recoverTopicQueueTable();
}
下面分析一下异常退出是如何进行数据恢复。
(三) 异常数据恢复
异常退出需要从CommitLog的最后一个文件往前找, 找到第一个消息存储正常的文件。其次,如果commitlog 目录没有消息文件,如果在消息消费队列目录下存在文件,则需要销毁。
1 如果判断一个文件是存储正常???
Step1:首先判断文件的魔数,如果不是MESSAGE_MAGIC_CODE ,返回false ,表示该文件不符合commitlog 消息文件的存储格式。
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
Step2 :如果文件中第一条消息的存储时间等于0 , 返回false ,说明该消息存储文件中未存储任何消息。
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
if (0 == storeTimestamp) {
return false;
}
Step3 :对比文件第一条消息的时间戳与检测点,文件第一条消息的时间戳小于文件检测点说明该文件部分消息是可靠的, 则从该文件开始恢复。
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
2 CommitLog和ConsumeQueue数据一致性保证
构造DispatchRequest,并且转发到ConsumeQueue队列中
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
// 发送到DispatchRequest中
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
}
四、Broker进程关闭
Broker关闭时通过钩子调用BrokerController的shutdown()方法
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
最终会调用到reputMessageService.shutdown()
public void shutdown() {
// 如果isCommitLogAvailable() == true,就sleep,最多循环50次
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
if (this.isCommitLogAvailable()) {
log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
}
// 线程关闭
super.shutdown();
}
isCommitLogAvailable实现逻辑如下:
private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
在borker关闭时,会判断是否abort文件
if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
shutDownNormal = true;
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
//dispatchBehindBytes 最终调用behind方法
public long behind() {
return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}