MetaQ中间件原理

metaq是阿里团队的消息中间件,之前也有用过和了解过kafka,据说metaq是基于kafka的源码改过来的,他们之间的区别在哪里,接下来一探究竟。

由此实现一个重要的功能:挡住前端的数据洪峰,保证后端系统的稳定性。

1.支持严格的消息顺序

2.支持Topic与Queue两种模式

3.亿级消息堆积能力

4.比较友好的分布式特性

5.同时支持Push与Pull方式消费消息

究竟metaq是如何支持这5个特性的,带着问题去分析metaq。

MetaQ发展历史

2007年,淘宝实施了“五彩石”项目,将交易系统由单机交易升级到了分布式,这个过程中产生了Notify

2010年,阿里巴巴B2B部门基于ActiveMQ的5.1版本也开发了自己的一款消息引擎,称为Napoli

2011年,Linkin推出Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题,由开源社区killme2008维护。

2012年,阿里巴巴对于MetaQ进行了架构重组升级,开发出了MetaQ 2.0,这时就发现MetaQ原本基于Kafka的架构在阿里巴巴如此庞大的体系下很难进行水平扩展,所以在2012年的时候就开发了RocketMQ 3.0

2015年,又基于RocketMQ开发了阿里云上的Aliware MQNotify 3.0

2016年,阿里巴巴将RocketMQ的内核引擎捐赠给了Apache基金会。

MetaQRocketMQ区别:两者等价,在阿里内部称为MetaQ 3.0,对外称为RocketMQ 3.0

以上就是RocketMQ的整体发展历史,其实在阿里巴巴内部围绕着RocketMQ内核打造了三款产品,分别是MetaQ、Notify和Aliware MQ。这三者分别采用了不同的模型:

MetaQ主要使用了拉模型,解决了顺序消息和海量堆积问题。

Notify主要使用了推模型,解决了事务消息

而云产品Aliware MQ则是提供了商业化的版本。

物理机群部署架构图


NameServer集群:MetaQ 1.x和MetaQ 2.x是依赖ZooKeeper的,由于ZooKeeper功能过重,RocketMQ(即MetaQ 3.x)去掉了对ZooKeeper依赖,采用自己的NameServer。

Broker:消息中转角色,负责存储消息,转发消息。

Consumer:Push Consumer / Pull Consumer。前者向Consumer对象注册一个Listener接口,收到消息后回调Listener接口方法,采用long-polling长轮询实现push;后者主动由Consumer主动拉取信息,同kafka。

Producer:消息生产者。

消息领域模型

Message:单位消息

Topic:软分区,对应相同的topic时,生产者对应消费者的分区标识

Tag:消息在topic基础上的二级分类

Message Queue:硬分区,物理上区分topic,一个topic对应多个message queue

Group:Consumer Group,一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致;Producer Group,一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。

Offset:绝对偏移值,message queue中有两类offset(commitOffset和offset),前者存储在OffsetStore中表示消费到的位置,后者是在PullRequest中为拉取消息位置。

广播消费:Producer 向一些队列轮流发送消息,队列集合称为 Topic,每一个 consumer 实例消费这个 Topic 对应的所有队列。

集群消费:多个 Consumer 实例平均消费这个 topic 对应的队列集合。

Broker分析

Broker端负载均衡

Broker以组为单位向Consumer提供消息服务,group中分为master和slave两种角色,master和slave的消息同步后续介绍。然后通过NameServer暴露给Consumer具体通信地址,采用message queue消息队列结构来提供消费接口。针对某一topic情况下,message queue会根据queue id分布在不同的broker上,Consumer的消息消费压力则会分摊在不同的Broker上的message queue,从而达到负载均衡的作用。

负载均衡关系图:

虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在CommitLog的文件,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写,而顺序读写是metaq高吞吐量的基础。

Broker存储结构

下图为本机启动broker后文件系统截图:

重试队列:%RETRY%+consumergroup,push consumer默认订阅用于消费失败后的重试消费

死信队列:多次(默认16次)消费失败后进入DLQ队列,需要人工处理

定时队列:用于定时和延时消息

ConsumeQueue: 即message queue,根据topic和queueId区分的消息队列,对MappedFileQueue进行封装

CommitLog: Broker中顺序存储的消息结构,管理消息commit和flush,对MappedFileQueue进行封装

MappedFileQueue: 对~/store/commitlog/中MappedFile封装成文件队列,进行文件大小格式检查,对mappedFile进行管理。

MappedFile: 实际broker数据文件映射成的类,即~/store/commitlog/中00000000000000000000、00000000001073741824等文件,每个文件默认大小上限为1G。

消息写入



CommitLog负责将Producer的消息写入文件中,写入过程中单例加锁

MappedFile unlockMappedFile = null;

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config

try {

    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();

    this.beginTimeInLock = beginLockTimestamp;

    // Here settings are stored timestamp, in order to ensure an orderly

    // global

    msg.setStoreTimestamp(beginLockTimestamp);

    if (null == mappedFile || mappedFile.isFull()) {

        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise

    }

    if (null == mappedFile) {

        log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());

        beginTimeInLock = 0;

        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);

    }

    result = mappedFile.appendMessage(msg, this.appendMessageCallback);

    ...

} finally {

    putMessageLock.unlock();

}

消息存储

核心消息存储体DefaultMessageStore类,主要包含两个重要成员:

//消息存放的物理主体 

private final CommitLog commitLog 

//根据topic分离的消费队列,记录消息的直接地址 

private final ConcurrentMap> consumeQueueTable

metaq会启动一个定时服务ReputMessageService分别定时调用(间隔1ms)来生成消费者队列和索引文件,两者均会以文件形式落盘。

metaq使用PullMessageProcessor来处理来自Consumer消费消息的请求,然后MessageStore存储体根据group、topic、queueId以及maxMsgNums来从CommitLog消息物理文件中获取对应消息的MappedByteBuffer然后返回消息给Consumer。

持久化消费进度

metaq文件目录下有两个文件用于持久化消费进度,每次写入 consumerOffset.json,将原内容备份到 consumerOffset.json.bak。

consumerOffset.json:消费进度存储文件

consumerOffset.json.bak:消费进度存储文件备份

Consumer消息完后Broker持久化其消费进度,关键代码如下:

//com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager

private ConcurrentMap> offsetTable =

        new ConcurrentHashMap>(512);

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {

        ConcurrentMap map = this.offsetTable.get(key);

        if (null == map) {

            map = new ConcurrentHashMap(32);

            map.put(queueId, offset);

            this.offsetTable.put(key, map);

        } else {

            Long storeOffset = map.put(queueId, offset);

            if (storeOffset != null && offset < storeOffset) {

                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);

            }

        }

    }

Broker落盘策略


GroupCommitService、FlushRealTimeService、CommitRealTimeService均是ServiceThread的子类(ServiceThread线程同步待分析)。FlushRealTimeService和CommitRealTimeService顾名思义前者是实时Flush到磁盘,直接采用FileChannel的force()来flush;后者则是Commit实时,采用FileChannel的write()先写到内存字节缓冲区,然后唤醒flush线程。

同步刷盘是在每条消息都确认落盘了之后才向发送者返回响应;而异步刷盘中,只要消息保存到Broker的内存就向发送者返回响应,Broker会有专门的线程对内存中的消息进行批量存储。所以异步刷盘的策略下,当机器突然掉电时,Broker内存中的消息因无法刷到磁盘导致丢失。

FlushRealTimeService两种flush定时:固定式flush和唤醒式flush,采用固定时间或堵塞等待上一个flush线程完成flush并唤醒。

while (!this.isStopped()) {

    ...

    if (flushCommitLogTimed) {

        Thread.sleep(interval);

    } else {

        this.waitForRunning(interval);

    }

    ...

    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

}

Broker flush行为

//com.alibaba.rocketmq.store.MappedFile:int flush(final int flushLeastPages)

try {

    //We only append data to fileChannel or mappedByteBuffer, never both.

    if (writeBuffer != null || this.fileChannel.position() != 0) {

        this.fileChannel.force(false);

    } else {

        this.mappedByteBuffer.force();

    }

} catch (Throwable e) {

    log.error("Error occurred when force data to disk.", e);

}

采用FileChannel.force()来落盘,路径为wrotePosion->flushedPositionwrotePosition。

采用MappedByteBuffer.force()来落盘,路径为wrotePosion->committedPosion->flushedPositionwrotePosition。

其中FileChannel为NIO中流式读写IO,MappedByteBuffer将文件映射成虚拟内存然后对内存直接操作。对10M文件进行读写对比(数据本机跑,参考:FileChannel和 MappedByteBuffer性能对比):


读性能上,FileChannel要优于MappedByteBuffer。写性能上,FileChannel的34ms为直接落盘耗时,MappedByteBuffer还需要等待系统同步。MappedByteBuffer还需要额外的map耗时。总体来说NIO的FileChannel表现更好,这也是MetaQ采用FileChannel原因。

MetaQ高吞吐量技术

利用Linux文件系统内存cache来提高性能

本地磁盘文件->socket发送,4步骤数据流向:

hard driver -> kernel space ---- [DMA copy]

kernel space -> user space ---- [CPU copy]

user space -> kernel space ---- [CPU copy]

kernel space -> protocol engine ---- [DMA copy] 

左右图的上部分为user和kernel的上下文切换。下部分为数据流向图,其中左图为正常流程中Linux从文件系统读取文件然后通过socket发送的流程,其中共经历了4次内存拷贝;右图为改进版本,通过mmap内存映射将文件映射进内存,绕开了user space和kernel space的二次拷贝,实现zero copy(针对user space)。

//正常读写方式

read(file, tmp_buf, len);

write(socket, tmp_buf, len); 

//mmap读写方式

tmp_buf = mmap(file, len);

write(socket, tmp_buf, len);

Broker索引服务

采用MesaageId查询消息不需要采用索引服务,从MesaageId可以解析出Broker地址和Commit Log Offset然后拉取消息。

metaq的IndexFile主要参考HashMap的实现,采用拉链法解决哈希冲突,每个slot倒序指向最新的索引即目前索引数量。

Header:记录落Broker时间戳、偏移量,槽位数目和索引个数

SlotTable:数组插槽,插槽位置 = key的Hash值 % 插槽数量,每个槽位记录当前索引总数

Index Linked List:插槽后接的链表结构,记录key的Hash值、物理偏移地址、落盘时间和哈希冲突后上一个索引地址

通过MessageKey检索消息:通过key定位slot,加锁从最大索引值开始倒序查找,比对hash值和落盘时间,返回一致时的物理偏移地址。

MetaQ的生产消费和存储结构关系

Producer逻辑

发送方式

定义了三种发送方式:

SYNC:同步发送,发送方线程发送后同步堵塞等待SendResult,若failed则重试下一个broker。

ASYNC:异步发送,超时可抛出Timeout异常。

ONEWAY:单向发送,发送方不等待broker响应也没有回调函数触发,速度快但可靠性弱。

发送流程

1、查询本地缓存是否存储了TopicPublishInfo,否则从NameServer获取。

2、根据选择策略获取待发送队列。

3、获取消息队列对应的broker实际IP。

4、设置消息Unique ID,zip压缩消息。

5、检查信息合法性,调用NettyClient发送消息

TopicPublishInfo包含队列优先级、消息队列列表、路由信息以及一个线程安全的index坐标。

选择策略

发送线程以线程独立的方式自增,遍历MessageQueue选择一条待发送的MessageQueue。

volatile作用。

若启动容错策略(默认false)

1、通过ThreadLocalIndex遍历选出一条非faultItme的messageQueue。

2、若无非faultItme的broker,将faultItme按照可用<不可用,失败时长短<失败时长长,恢复时间点早<恢复时间点晚来排序,pickOneAtLeast遍历排序链表的前半部分选出一个broker。即至少不是最差策略 —— bad but not worst。

3、依次轮流写入broker中对应的messageQueue中,发端的负载均衡体现在这里。

异常情况:每次Producer发送失败时,维护并更新一个broker的faultItem的Map,则逻辑认为N秒内不可用,用于容错策略下的比较。

非异常情况下:看源码发现也会归为faultItem节点,延时设置为上一次从发出请求到响应的时长。

容错策略下 pickOneAtLeast逻辑:

    public String pickOneAtLeast() {

        final Enumeration elements = this.faultItemTable.elements();

        List tmpList = new LinkedList();

        while (elements.hasMoreElements()) {

            final FaultItem faultItem = elements.nextElement();

            tmpList.add(faultItem);

        }

        if (!tmpList.isEmpty()) {

            Collections.shuffle(tmpList);

            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;

            if (half <= 0) {

                return tmpList.get(0).getName();

            } else {

                final int i = this.whichItemWorst.getAndIncrement() % half;

                return tmpList.get(i).getName();

            }

        }

        return null;

    }

    // For Class FaultItem

    public int compareTo(final LatencyFaultToleranceImpl.FaultItem other) {

        if (this.isAvailable() != other.isAvailable()) {

            if (this.isAvailable())

                return -1;

            if (other.isAvailable())

                return 1;

        }

        if (this.currentLatency < other.currentLatency)

            return -1;

        else if (this.currentLatency > other.currentLatency) {

            return 1;

        }

        if (this.startTimestamp < other.startTimestamp)

            return -1;

        else if (this.startTimestamp > other.startTimestamp) {

            return 1;

        }

        return 0;

    }

PushConsumer逻辑(PullConsumer不适用)

拉取消息的流程示意

黄色箭头代表PullRequest的流向,蓝色箭头代表ConsumeRequest的流向,灰色箭头根据offset拉取消息。

RebalanceService确定consumer拉取的queue,为需要拉取的queue生成一个PullRequest,放入PullRequestQueue中,拉取消息的位置从nextOffset从Broker远程拉取。

PullMessageService不断从PullRequestQueue中消费PullRequest,根据nextOffset去broker拉取消息,若queue已经dropped则更新offset到broker并丢弃此拉消息请求。

PullMessageService异步拉取消息,同时将PullRequest封装在PullCallback中,PullCallback封装在ResponseFuture中,并以自增的请求id为键,ResponseFuture为值放入ResponseTable中。

Broker收到请求,如果offset之后有新的消息会立即发送异步响应;否则等待直到producer有新的消息发送后返回或者超时。如果通信异常或者Broker超时未返回响应,nettyClient会定时清理超时的请求,释放PullRequest回到PullRequestQueue。

用最新的offset更新ResponseFuture里的PullRequest并推送给PullRequestQueue里以进行下一次拉取。批量拉取到的消息分批提交给consumeExecutor线程处理。

PullRequest的触发

第一种方式,是每次进行rebalance之后生成pullResult并调用ConsumeMessageService.submitConsumeRequest,该方式是拉取消息的起点

第二种方式,每次获取PullResult的状态,状态为FOUND则调用ConsumeMessageService.submitConsumeRequest将请求扔给ConsumeExecutor去启用线程消费,然后更新offset重新将pullRequest入队;其他状态如NO_NEW_MSG NO_MATHCHED_MSG则更新offset直接将pullRequest入队。

Consumer端Rebalance

rebalance的触发情况:

默认waitInterval = 20000ms

启动MQConsumerInner调用rebalanceImmediately()

broker通知consumer,group改变调用rebalanceImmediately()

Consumer采用策略模式AllocateMessageQueueStrategy来定义不同的队列分配机制:

AllocateMessageQueueAveragely:平均分配队列策略,如下图:

A. queue数量小于消费者数量

B. 消费者数量大于queue数量,且queue数量 % 消费者数量 > 0 

B. 消费者数量大于或等于queue数量,且queue数量 % 消费者数量 = 0 

AllocateMessageQueueAveragelyByCircle:环状分配消息队列策略

AllocateMessageQueueByConfig:根据配置分配消息队列策略

AllocateMessageQueueByMachineRoom:

AllocateMessageQueueConsistentHash:一致性哈希算法分配策略

Consumer拉消息详细调用过程


PullMessageService拉取线程不停的读取PullRequestQueue根据request拉取消息,然后将消息丢到ProcessQueue中并新建ConsumeRequest提交到ConsumeService处理, 然后生成下一批的PullRequest丢到PullRequestQueue,形成无限循环。ProcessQueue中以TreeMap形式保存待处理的消息,key为消息对应的offset,并自动进行排序:

private final TreeMap msgTreeMap = new TreeMap();

/**

* Concurrently max span offset.it has no effect on sequential consumption

*/

private int consumeConcurrentlyMaxSpan = 2000;

//metaq拉取流控

if (!this.consumeOrderly) {

    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {

        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {

            log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);

        }

        return;

    }

}

消息快速拉取:

metaq中拉取线程不需要等待消费线程的处理,一批消息拉取后未消费完可直接拉取第二批消息(消费成功或消费失败回发成功均会offset前移),拉取前判断msgTreeMap中最大offset值-最小offset值是否超过流控阈值,超过后延时50ms重新拉取。msgTreeMap每次会清除掉消费成功和消费失败回发成功的消息,剩下保存的是消费失败且未回发成功的消息,回发不成功会本地重试且远端offset不会前移(ack卡顿)。

分离拉取offset(PullRequest#nextOffset)和消费offset(OffsetStore#offset)分离拉取和处理进度,提升拉取效率,并根据消费者处理卡主情况做拉取阀值控制。拉取和处理分离,保证不丢数据,提升效率同时代价是重复消息。

offset管理

offset分类:

switch (this.defaultMQPushConsumer.getMessageModel()) {

    case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;

    case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;

    default: break;

}

广播模式下采用offset本地保存,集群消费模式下采用offset远端保存(即broker端)。对于广播模式,每个queue对应所有的consumer,而consumer消费之间互相独立,故offset保存在consumer本地即可。对于集群模式,consumer之间消费需要broker进行协调,故offset保存在broker端。

offset、queue、consumer关系:

private ConcurrentMap offsetTable =

        new ConcurrentHashMap();

在集群模式下,多个消费者会负载到不同的消费队列上,因为消息消费进度是基于消息队列进行保存的,也就是不同的消费者之间的消费进度保存是不会存在并发的。

offset持久化触发:

定时offset持久化

拉取消息后触发offset持久化

分配消息队列触发offset持久化

更新最小offset:

consumerA拉取了queueA的offset:1-10批量消息,成功消费的消息会被剔除掉,剩2和8未成功消费且回发broker失败则update offset会设置为处理队列中最小的偏移量,来保证消息肯定能被消费成功。但由于外部原因(consumer宕机、新consumer加入等)会触发rebalance,导致queueA对应consumerB,2-10会被重复消费。这也是前面提到的:拉取和处理分离,保证不丢数据,提升效率同时代价是重复消息。

消息消费失败处理

public void processConsumeResult(

    final ConsumeConcurrentlyStatus status,

    final ConsumeConcurrentlyContext context,

    final ConsumeRequest consumeRequest) {

    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())

        return;

    switch (status) {

        case CONSUME_SUCCESS:

            if (ackIndex >= consumeRequest.getMsgs().size()) {

                ackIndex = consumeRequest.getMsgs().size() - 1;

            }

            int ok = ackIndex + 1;

            int failed = consumeRequest.getMsgs().size() - ok;

            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);

            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);

            break;

        case RECONSUME_LATER:

            ackIndex = -1;

            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),

                consumeRequest.getMsgs().size());

            break;

        default:

            break;

    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {

        case BROADCASTING:

            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {

                MessageExt msg = consumeRequest.getMsgs().get(i);

                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());

            }

            break;

        case CLUSTERING:

            List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());

            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {

                MessageExt msg = consumeRequest.getMsgs().get(i);

                boolean result = this.sendMessageBack(msg, context);

                if (!result) {

                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

                    msgBackFailed.add(msg);

                }

            }

            if (!msgBackFailed.isEmpty()) {

                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());

            }

            break;

        default:

            break;

    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());

    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {

        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);

    }

}

广播模式:无论是否消费失败,不发回消息到Broker,只打印Log。

集群模式下:回发不成功会本地重试且远端offset不会前移,消费失败但发回Broker成功远端offset会前移。如果发回Broker 成功,结果因为例如网络异常,导致Consumer以为发回失败,判定消费发回失败会导致消息重复消费。

消费失败回发broker失败:

调用submitConsumeRequestLater延迟重新消费。

消费失败回发broker成功:

broker端修改topic和queueId,将数据写到SCHEDULE_TOPIC对应队列中,最终通过ScheduleMessageService的定时任务来进行处理。定时任务读取这些数据,修改topic为RETRY,交给commitLog存储,ReputService将消息写入RETRY队列中,默认集群模式的consumer会订阅Retry队列,然后消费掉这些消息。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

引起重复消费的case

rebalance导致重复消费:

假设新增消费者前,ConsumerA正在消费MessageQueue-M,消费到第3个offset,这个时候新增了ConsumerB,那么根据集群模式的AllocateMessageQueue的策略,可能MessageQueue-M被分配给了ConsumerB,这个时候ConsumerA由于消费的offset没有实时更新回去,会导致ConsumerB和ConsumerA之前的消费有重叠。或者消费失败回传失败的时候rebalance也会导致重复消费。

发送时消息重复:

MQ Producer 发送消息场景下,消息已成功发送到服务端并完成持久化,此时网络闪断或者客户端宕机导致服务端应答给客户端失败。如果此时 MQ Producer 意识到消息发送失败并尝试再次发送消息,MQ 消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复:

MQ Consumer 消费消息场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,MQ 服务端将在网络恢复后再次尝试投递之前已被处理过的消息,MQ 消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

metaq其他消息类型

顺序消息

全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 

分区顺序

对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 

性能对比:

Topic类型支持事务消息支持定时消息性能

无序消息是是最高

分区消息否否高

全局消息否否一般

事务消息


发送方向 MQ 服务端发送消息;

MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

发送方开始执行本地事务逻辑。

发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。

在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。

发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。

定时消息和延时消息

略......

HA高可用

broker有三种角色,ASYNC_MASTER、SYNC_MASTER和SLAVE,几种搭配方式:

ASYNC_MASTER、SLAVE:容许丢消息,但是要broker一直可用,master异步传输CommitLog到slave

SYNC_MASTER、SLAVE:不允许丢消息,master同步传输CommitLog到slave

ASYNC_MASTER:如果只是想简单部署则使用这种方式

在broker集群中每个master相互之间是独立,master之间不会有交互,每个master维护自己的CommitLog、自己的ConsumeQueue,但是每一个master都有可能收到同一个topic下的producer发来的消息

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容