mqtt离线消息的实现

在介绍mqtt离线消息之前,先了解下mqtt协议的几个概念:
QoS(Quality of Service)
指代消息传输的服务质量。它包括以下级别:

服务质量 具体含义
QoS0 代表最多分发一次
QoS1 代表至少达到一次
QoS2 代表仅分发一次

cleanSession
cleanSession 标志是 MQTT 协议中对一个客户端建立 TCP 连接后是否关心之前状态的定义。具体语义如下:

cleanSession 具体含义
true 非持久化连接,客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息
false 持久化连接,客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效

QoS 和 cleanSession 的不同组合产生的结果如下表所示:

QoS 级别 cleanSession=true cleanSession=false
QoS0 无离线消息,在线消息只尝试推一次 无离线消息,在线消息只尝试推一次
QoS1 无离线消息,在线消息保证可达 有离线消息,所有消息保证可达
QoS2 无离线消息,在线消息保证只推一次 有离线消息,所有消息保证只推一次

对于 QoS > 0的消息,如果是持久化连接,当客户端不在线时,发送消息会保存离线消息到broker,当客户端上线时,mqtt会从broker拉取消息推送给客户端。
mqtt 离线消息实现相关的主要存储结构以及作用为:
inflightWindow :
为了提高消息吞吐效率和减少网络波动带来的影响,已发送但未确认的报文将被存放在 inflightWindow 中直至完成确认从inflightWindow 中移除,key为packetId(报文标识符)

ConcurrentHashMap<Integer, InflightMessage> inflightWindow  

consumeOffsetTable:内存中保存了消息的消费进度,当客户端断开连接,从内存中删除,再次上线会重新创建

ConcurrentHashMap<String /*broker^rootTopic*/, Map<String /*queueId^clientId*/, Long/*consumeOffset*/>> consumeOffsetTable

最初的离线消息的方案:


mqtt离线消息实现.png

这个方案在正常收发以及在producer先发送完所有的消息然后consumer上线拉取离线消息时是不会有问题的,之前测试也没有发现问题,但是在以下场景进行离线消息推送会存在问题,最近一次变更测试中,测试了以下场景:
producer发送2000条消息
1、qos =1、 cleansession=false的consumer订阅了一个topicA
2、producer发送500条消息时,consumer断开连接,到1000条消息的时候consumer再次连接,这个过程中producer一直发送消息,此时离线消息和在线消息会一起推送,前500条消息,后1000条消息是在线消息,中间还有500条是离线消息。
这个场景测试会有大量消息丢失和很多重复消息,经过打印大量日志分析造成这个现象的有两个地方代码逻辑需要优化:
1、客户端在线时,推送消息时,消息放入inflightWindow ,收到客户端的ACK,从inflightWindow 中移除该消息,更新consumeOffsetTable,当客户端掉线会更新消费进度到redis中
问题:先收到的ACK删除的消息的消费进度不一定时最大的,比如如下图所示


inflightWindow 中消息

当客户端收到消息进度为5的消息ACK时,consumeOffsetTable消费进度更新到5,此时客户端掉线,consumeOffsetTable消费进度5会更新到redis中,而消费进度6、7、8的消息会作为 NOT ACK的消息持久化到redis中,当客户端在次上线时,会先取出NOT ACK的消息发送,然后从broker拉取消息会从消费进度5开始,这样6、7、8会重复消费,一次是inflightWindow 作为未收到ACK重发,还有一次是作为离线消息从broker拉取消费
2、在上面的例子中,producer发送500条消息时,consumer断开连接,假设此时consumeOffsetTable中offset是500,断开连接会更新到redis中,当producer发送到1000条消息的时候consumer再次连接,会从redis中取出offset,代码逻辑如下:

    private long calcNextOffset(ConcurrentHashMap<String, Map<String, Long>> offsetTable, String key,
        String innerKey,
        PersistService persistService) {
        if (!offsetTable.containsKey(key)) {
            long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
            Map<String, Long> offsetMap = new HashMap<>();
            Map<String, Long> previous = offsetTable.putIfAbsent(key, offsetMap);
            if (previous != null) {
                offsetMap = previous;
            }
            offsetMap.putIfAbsent(innerKey, persistOffset);
        } else if (!offsetTable.get(key).containsKey(innerKey)) {
            long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
            Map<String, Long> offsetMap = offsetTable.get(key);
            offsetMap.putIfAbsent(innerKey, persistOffset);
        }
        return offsetTable.get(key).get(innerKey);
    }

但是由于producer在线消息也在发送,当consumer上线时,假设此时在线消息的offset到了1021,导致这个离线消息从broker拉取消息的offset并不是500而是1021,从而使得500到1000之间的离线消息都会丢失,这个问题实际上就是离线消息和在线消息consumeOffset混合使用导致
最后为了解决上述两个问题,优化后的方案为:

方案主要作了两个地方的逻辑修改:
1、改进之前consumeOffsetTable的offset更新是在MQTT收到客户端的ACK更新消费进度,改为在消息放入inflightWindow时就把consumeOffsetTable的消费进度更新,即使inflightWindow中消息没有收到ACK,也会作为NOT ACK的消息处理,不会丢失消息
2、在客户端上线推送离线消息时,计算拉取离线消息的消费进度改为直接从redis中取出,不必判断consumeOffsetTable里面是否有值,避免了在线跟离线消息混合使用consumeOffsetTable,把在线跟离线的消费进度分开处理

   private long calcNextOffset(String key, String innerKey, PersistService persistService) {
        long offlineMsgOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
        return offlineMsgOffset + 1;
    }

改进后的方案如下:


改进后的MQTT离线方案

除了以上问题,还有一个目前待MQTT离线消息解决的问题,先介绍一下这个问题,拉取离线消息主要就是获取拉取消息消费进度的开始结束值,前面的问题都是解决了消费进度开始值的问题,还有一个遗留问题是拉取离线消息消费进度结束值有问题。
出现这个问题场景还是上述测试场景

   private long getMaxOffset(String enodeName,
        String topic,
        int queueId) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
        GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setQueueId(queueId);
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);

     return this.mqttBridgeController.getEnodeService().getMaxOffsetInQueue(enodeName, topic, request);
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容

  • Design 1. Motivation 我们设计Kafka用来作为统一的平台来处理大公司可能拥有的所有实时数据源...
    BlackManba_24阅读 1,335评论 0 8
  • MQ是消息服务中间件,基于高可用分布式集群技术,是消费模式基于发布订阅模式的消息系统。支持Java,C++以及.N...
    freesan44阅读 2,328评论 0 1
  • 从来都觉得“少年”是对一个男孩子最好的形容。 不知道大家有没有羡慕过一个人。 阿尧第一次出现在我的生活中的时候,就...
    白眼书生阅读 374评论 1 3
  • 一、Jenkins概述 二、安装Jenkins 安装 Java 下载地址: https://www.oracle....
    胖虎喜欢小红阅读 788评论 0 1
  • 仰月望星空,千里欲冰封。 举目问苍穹,万里雪花飘。
    一切美好阅读 579评论 18 12