rocketmq之ConsumeQueue学习笔记

一、consume queue消息存储结构

RocketMQ的消息存储是由consume queue和commit log配合完成的。其中consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

我们可以在配置中指定consumequeue与commitlog存储的目录,每个topic下的每个queue都有一个对应的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:

[图片上传失败...(image-90f8b3-1563954737957)]

  1. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量

  2. Size存储中消息的大小

  3. Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)

二、ReputMessageService

(一) 核心属性

long reputFromOffset:ReputMessageService 从哪个物理偏移量开始转发消息给ConsumeQueue

(二) ConsumeQueue构建逻辑

在RocketMq中是通过ReputMessageService服务来实现ConsumeQueue的构建,ReputMessageService服务线程异步获取CommitLog的文件存储的日志信息,并且构造DispatchRequest请求,将DispatchRequest分发到CommitLogDispatcherBuildConsumeQueue中进行处理。

private void doReput() {
    // 开始的物理位置偏移量如果小于CommitLog的偏移量,就设置reputFromOffset等于CommitLog的最小偏移量
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // isCommitLogAvailable主要是判断reputFromOffset是否大于CommitLog的最大偏移量
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
                // 从CommitLog中获取数据
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 构造DispatchRequest请求
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // 将DispatchRequest转发到CommitLogDispatcherBuildConsumeQueue队列中处理
                            DefaultMessageStore.this.doDispatch(dispatchRequest);

                            // 如果是master节点,通过监听器模式,通知该消息可以消费
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }
                                                        // reputFromOffset往后移
                            this.reputFromOffset += size;
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

上面的代码中,关键路径主要有三步:

1 从CommitLog中获取数据

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

getData方法最终MappedFile.selectMappedBuffer(int pos) 方法

public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }
    return null;
}

2 构造DispatchRequest请求

检查消息和并返回消息的size

DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

3 消息转发

通过转发器转发请求

DefaultMessageStore.this.doDispatch(dispatchRequest);

转发器CommitLogDispatcherBuildConsumeQueue 处理DispatchRequest

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

CommitLogDispatcherBuildConsumeQueue通过调用putMessagePositionInfo方法,处理request,最终会调用到putMessagePositionInfo方法,将索引数据写到ConsumeQueue中

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
        // 写到临时buffer中
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
        // cqOffset代码在ConsumeQueue中的逻辑偏移量(第几个索引)
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                        // 如果重复构建索引,就直接返回
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        // 追加到ConsumeQueue中,此时数据没有真正写到磁盘中,通过异步刷盘将数据刷新到磁盘中
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

三、数据恢复

(一) 数据不一致场景

生产者生产消息,并写入到CommitLog,然后通过ReputMessageService异步构造ConsumeQueue,只有索引数据写入到ConsumeQueue中以后,消息才能够被消费者消费。

但是如果索引数据还没有来得及ConsumeQueue中,此时服务器宕机,如何保证ConsumeQueue和CommitLog数据的一致呢?

(二) 解决数据不一致问题

Broker启动时,判断上次进程结束是否异常,然后加载CommitLog、ConsumeQueue等数据文件,如果文件加载成功,就调用recover方法,做数据修正,保证数据一致

public boolean load() {
    boolean result = true;
    try {
        // 判断是否Broker上次是异常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // 加载 Commit Log
        result = result && this.commitLog.load();

        // 加载 Consume Queue
        result = result && this.loadConsumeQueue();
        if (result) {
            // 加载检查点问题
            this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            this.indexService.load(lastExitOK);
                        // 文件恢复
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

从上面的代码中,可以看出是isTempFileExist()方法,判断Broker上一次是否异常退出,具体实现如下:

private boolean isTempFileExist() {
    String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    File file = new File(fileName);
    return file.exists();
}

在Broker启动的时候,会创建一个abort文件,如果Broker正常结束,就会删除改文件,如果是异常退出,那么文件就不会被删除。

在相关数据资源加载完毕以后,就要进行数据的修正,调用recover方法进行数据修正,具体实现如下:

private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    if (lastExitOK) {
        // 正常退出,数据修正
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // 异常退出数据修正
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}

下面分析一下异常退出是如何进行数据恢复。

(三) 异常数据恢复

异常退出需要从CommitLog的最后一个文件往前找, 找到第一个消息存储正常的文件。其次,如果commitlog 目录没有消息文件,如果在消息消费队列目录下存在文件,则需要销毁。

1 如果判断一个文件是存储正常???

Step1:首先判断文件的魔数,如果不是MESSAGE_MAGIC_CODE ,返回false ,表示该文件不符合commitlog 消息文件的存储格式。

ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
    return false;
}

Step2 :如果文件中第一条消息的存储时间等于0 , 返回false ,说明该消息存储文件中未存储任何消息。

long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
if (0 == storeTimestamp) {
    return false;
}

Step3 :对比文件第一条消息的时间戳与检测点,文件第一条消息的时间戳小于文件检测点说明该文件部分消息是可靠的, 则从该文件开始恢复。

if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
    log.info("find check timestamp, {} {}",
        storeTimestamp,
        UtilAll.timeMillisToHumanString(storeTimestamp));
    return true;
}

2 CommitLog和ConsumeQueue数据一致性保证

构造DispatchRequest,并且转发到ConsumeQueue队列中

DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();

if (dispatchRequest.isSuccess()) {
    // Normal data
    if (size > 0) {
        mappedFileOffset += size;

        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                this.defaultMessageStore.doDispatch(dispatchRequest);
            }
        } else {
            // 发送到DispatchRequest中
            this.defaultMessageStore.doDispatch(dispatchRequest);
        }
    }
    // Come the end of the file, switch to the next file
    // Since the return 0 representatives met last hole, this can
    // not be included in truncate offset
    else if (size == 0) {
        index++;
        if (index >= mappedFiles.size()) {
            // The current branch under normal circumstances should
            // not happen
            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
            break;
        } else {
            mappedFile = mappedFiles.get(index);
            byteBuffer = mappedFile.sliceByteBuffer();
            processOffset = mappedFile.getFileFromOffset();
            mappedFileOffset = 0;
            log.info("recover next physics file, " + mappedFile.getFileName());
        }
    }
}

四、Broker进程关闭

Broker关闭时通过钩子调用BrokerController的shutdown()方法

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);

    @Override
    public void run() {
        synchronized (this) {
            log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
            if (!this.hasShutdown) {
                this.hasShutdown = true;
                long beginTime = System.currentTimeMillis();
                controller.shutdown();
                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
            }
        }
    }
}, "ShutdownHook"));

最终会调用到reputMessageService.shutdown()

public void shutdown() {
    // 如果isCommitLogAvailable() == true,就sleep,最多循环50次
    for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
    }

    if (this.isCommitLogAvailable()) {
        log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
            DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
    }
        // 线程关闭
    super.shutdown();
}

isCommitLogAvailable实现逻辑如下:

private boolean isCommitLogAvailable() {
    return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

在borker关闭时,会判断是否abort文件

if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
    this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
    shutDownNormal = true;
} else {
    log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}

//dispatchBehindBytes 最终调用behind方法
public long behind() {
    return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,482评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,377评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,762评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,273评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,289评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,046评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,351评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,988评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,476评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,948评论 2 324
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,064评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,712评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,261评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,264评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,486评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,511评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,802评论 2 345

推荐阅读更多精彩内容