RocketMQ源码解析——存储部分(7)延迟消息的原理`ScheduleMessageService`

@[toc]

延迟消息描述介绍

RocketMQ的定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageDelayLevel,默认值为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevelbroker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level 为 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queuequeueId =delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

源码分析

第一次存储消息

 第一次存储延迟消息是在CommitLog的putMessage方法中进行的,关于这部分代码分析可以看看前面的分析CommitLog文件的文章。这里不重复分析,只截取部分的代码片段出来。

 public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    ......
     //如果不是事务消息 或者 是事务消息的提交阶段
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 如果设置了延迟时间
            if (msg.getDelayTimeLevel() > 0) {
                //延迟级别不能超过最大的延迟级别,超过也设置为最大的延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                //设置延迟消息的topic
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //延迟消息的queueId= 延迟级别-1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId 备份真正的topic和queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
    ......
 }

 后续的把消息保存到CommitLog文件的代码没有贴出来,这里是第一次存储的关键部分。

  1. 这里会先判断消息的标志位,如果标识位不是事务消息或者事务消息的提交阶段。
  2. 会进一步判断是不是设置了延迟时间。
  3. 如果设置的延迟时间大于最大的延迟时间则把延迟时间设置为最大延迟时间
  4. 把消息的queueId属性修改为PROPERTY_REAL_QUEUE_ID,对应的topic属性设置为PROPERTY_REAL_TOPIC。同时把真正的queueIdtopic保存在property属性中。然后保存到CommitLog。

 在这里可以看到RocketMQ对于延迟消息,第一次的消息存储,会把消息的topicqueueId先修改,然后存放到特定的topic中去进行保存。

第二次消息存储

 RocketMQ中有一个专门处理topicRMQ_SYS_SCHEDULE_TOPIC的服务类ScheduleMessageService。这个类的初始化是在DefaultMessageStore中会在RocketMQ的Broker启动的时候初始化。

初始化延迟文件和配置

ScheduleMessageService在Broker启动的时候会先调用其load方法,加载delayOffset.json文件然后加载对应的延迟级别配置。

    public boolean load() {
        //调用父类的加载文件的方法,父类会调用子类实现的configFilePath方法确定文件
        boolean result = super.load();
        //加载成功则进行解析延迟级别配置
        result = result && this.parseDelayLevel();
        return result;
    }

    @Override
    public String configFilePath() {
        //获取`delayOffset.json`文件
        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
            .getStorePathRootDir());
    }

    public boolean parseDelayLevel() {
        //不同延迟级别的基础时间长度,单位为毫秒
        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);
        //获取延迟配置
        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
        try {
            //进行分割解析,分割符号为空格
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                //获取对应的延迟毫秒数
                Long tu = timeUnitTable.get(ch);
                int level = i + 1;
                //寻找最大的延迟等级
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                //获取延迟的时长
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                //计算真正的时长
                long delayTimeMillis = tu * num;
                //保存到延迟级别缓存中
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }

        return true;
    }

 上面就是对应的延迟级别的解析和配置,从上面可以看到我们可以通过修改配置文件的方式来修改RocketMQ的最大延迟时间和对应的延迟级别。

处理延迟消息

ScheduleMessageService是一个不停运行的线程,在start方法中会不断的针对不同延迟级别的消息进行处理

   public void start() {
        //设置运行状态为开始
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            //迭代延迟级别的缓存
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                //获取等级和 延迟时间长度
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                //获取对应延迟级别的偏移量缓存,这里缓存的是ConsumeQueue文件中的消息的偏移量
                Long offset = this.offsetTable.get(level);
                //如果偏移量为null,说明没有消息需要处理,这设置为0
                if (null == offset) {
                    offset = 0L;
                }
                //如果延迟级别不为null,则构建DeliverDelayedMessageTimerTask任务进行处理
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
        ......
    }

 这里主要逻辑就是循环迭代对应的延迟级别缓存,然后根据不同的等级来获取对应的偏移量缓存。然后根据偏移量和延迟级创建一个DeliverDelayedMessageTimerTask进一步的处理。这里要说明的是offsetTable中存的是消息在ConsumeQueue中的偏移量。关于这部分的可以看看前面的ConsumeQueue相关的文章

 这里进一步看看DeliverDelayedMessageTimerTask

  @Override
        public void run() {
            try {
                if (isStarted()) {
                    //执行检查消息是否到时间的逻辑
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

 主要逻辑在executeOnTimeup中。这个方法的逻辑有点长,这里贴出主要的部分,然后进行分析

       public void executeOnTimeup() {
            //根据 RMQ_SYS_SCHEDULE_TOPIC 和 延迟级别 找到对应的ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                //根据传入的  offset 的从ConsumeQueue中获取对应的消息信息缓冲,这里获取到的不是真实的消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        //获取额外的信息单元
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        //从buffer中每次获取20个byte长度的信息,因为ConsumeQueue的存储单元大小为20byte
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //获取消息在CommitLog中的真实位置
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            //获取消息的大小
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            //消息对应的tag的hashcode
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
                            //如果额外的信息不为空,则获取
                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                            tagsCode, offsetPy, sizePy);
                                     //从CommitLog中获取消息的存储时间
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            //下一个消息单元的偏移量
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;
                            //消息的延迟时间到了
                            if (countdown <= 0) {
                                //锁定消息
                                MessageExt msgExt =
                                        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                                offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //构建真正的消息,把真实的Topic和QueueId恢复
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                                    msgInner.getTopic(), msgInner);
                                            continue;
                                        }
                                        //把消息保存到CommitLog,等待消费
                                        PutMessageResult putMessageResult =
                                                ScheduleMessageService.this.writeMessageStore
                                                        .putMessage(msgInner);
                                        //保存成功,则继续处理下一条消息
                                        if (putMessageResult != null
                                                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                    msgExt.getTopic(), msgExt.getMsgId());
                                            //如果保存失败,则创建新的延迟任务。10秒钟之后重试
                                            ScheduleMessageService.this.timer.schedule(
                                                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                            nextOffset), DELAY_FOR_A_PERIOD);
                                            //更新偏移量
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                    nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                      .......
                      //剩余的这些部分的逻辑就是重新构建DeliverDelayedMessageTimerTask,等待下个时间段的重试
        }

分析一下主要的逻辑步骤:

  1. 根据 topic为RMQ_SYS_SCHEDULE_TOPIC 和 延迟级别为queueId 找到对应的ConsumeQueue
  2. 根据传入的 offset 的从ConsumeQueue中获取对应的消息信息缓冲,这里获取到的不是真实的消息,而是前面分析的重新设置后的消息
  3. 从buffer中每次获取20个byte长度的信息,因为ConsumeQueue的存储单元大小为20byte。然后根据Offset从CommitLog中获取消息的落盘时间
  4. 计算当前时间和落盘时间的时间差,检查延迟时间是否到了
  5. 恢复消息真实的TopicQueueId然后保存到CommitLog中。等待消息的消费
  6. 期间如果有存在失败的,则重新创建DeliverDelayedMessageTimerTask任务,等待下一次的处理

 这就是整个延迟消息的处理逻辑。其实就是先把消息真实的信息保存在消息属性中,然后把消息的topic和queueId覆盖然后保存到延迟消息专用的topic中,其中queueId为延迟级别。然后等待延迟消息处理的线程处理延迟消息的topic,时间到了就恢复消息真实的topic和queueId然后重新保存到CommitLog中,等待消费。
 其中关于ConsumeQueueCommitLog的消息获取和保存的逻辑这里没有进行分析,需要了解的可以看前面的文章
CommitLog文件的文章
ConsumeQueue相关的文章

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容