RocketMq消息存储

一个消息中间件最核心的东西就是消息存储结构。

这是kafka的消息存储:

clipboard.png

每个topic_partition对应一个日志文件,Producer对该日志文件进行“顺序写”,Consumer对该文件进行“顺序读”。这种存储方式,对于每个文件来说是顺序IO,但是当并发的读写多个partition的时候,对应多个文件的顺序IO,表现在文件系统的磁盘层面,还是随机IO。
因此出现了当partition或者topic个数过多时,Kafka的性能急剧下降。参见http://blog.csdn.net/chunlongyu/article/details/53913758

RocketMq的消息存储:
RocketMQ采用了单一的日志文件,即把同1台机器上面所有topic的所有queue的消息,存放在一个文件里面,从而避免了随机的磁盘写入。

clipboard.png

如上图所示,所有消息都存在一个单一的CommitLog文件里面,然后有后台线程异步的同步到ConsumeQueue,再由Consumer进行消费。
这里至所以可以用“异步线程”,也是因为消息队列天生就是用来“缓冲消息”的。只要消息到了CommitLog,发送的消息也就不会丢。只要消息不丢,那就有了“充足的回旋余地”,用一个后台线程慢慢同步到ConsumeQueue,再由Consumer消费。
可以说,这也是在消息队列内部的一个典型的“最终一致性”的案例:Producer发了消息,进了CommitLog,此时Consumer并不可见。但没关系,只要消息不丢,消息最终肯定会进入ConsumeQueue,让Consumer可见。

深入看一下这两个结构。

ConsumeQueue

ConsumeQueue的存储位置
默认的存储位置:${user.home} \store\consumequeue${topicName}${queueId}${fileName}
可以修改:配置文件的
storePathRootDir=/home/haieradmin/mqstore/rocketmqstore
storePathCommitLog=/home/haieradmin/mqstore/rocketmqstore/commitlog
这两个参数。

测试先发送100个消息:

public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("produce1");

producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("DemoTopic", "TagA",
("Hello RocketMQ").getBytes()
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}

发送100个完全一样的消息

SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128085F6B4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128085FD31, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808606D9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086075F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BE8E7, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BE96D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BEC87, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BED0D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086CB34, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086CFB4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086D811, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086DA94, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CA577, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CD5A9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CD62F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CE170, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280889405, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280889FDF, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088A262, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088A976, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CFEF4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CFF7A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D0000, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D031A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088AE8D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088AF13, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088AF99, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B01F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D064E, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D06D4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D075A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D07E0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B354, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B3DA, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B460, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B795, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D0DC4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D0E4A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D117F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D14B4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B81B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B8A1, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B927, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B9AD, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D17E9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D186F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1BA4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1C2A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BA33, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BAB9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BB3F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BBC5, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1CB0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1D36, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1DBC, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D20F1, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BC4B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BCD1, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BD57, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BDDD, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2177, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D21FD, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2283, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D259D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088C7A0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088C826, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088CAA9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088CDDE, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2B66, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2BEC, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2F21, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2FA7, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088DC32, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088DCB8, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088E90F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088EE26, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3570, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3B38, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3BBE, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3ED8, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280890074, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128089058B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128089080E, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280890D25, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D49E4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4A6A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4AF0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4B76, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280890DAB, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808912C2, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280891545, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808915CB, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4E8F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4F15, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4F9B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D5021, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280891651, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808916D7, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128089175D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808917E3, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=12]

可以看出,这100条消息时分别发送到两个broker上

去看一下

clipboard.png

为什么是4个队列。可以看这篇。http://www.jianshu.com/p/ccdf6fc710b0

进去一个队列,发现有一个00000000000000000000的文件。
顺便说一下,消息队列文件名规则:
commitlog文件的默认存储大小1G,不是ConsumeQueue的
文件名以已有存储容量依次递增,类似如下:
00000000000000000000
00000000001073741824
00000000002147483648
。。。

这是个二进制文件,打开看一下。
[haieradmin@IBMMQ03 0]$ od -Ax -tx1 00000000000000000000

000000 00 00 00 12 80 85 f6 b4 00 00 00 86 00 00 00 00
000010 00 27 a8 07 00 00 00 12 80 86 cb 34 00 00 00 86
000020 00 00 00 00 00 27 a8 07 00 00 00 12 80 88 94 05
000030 00 00 00 86 00 00 00 00 00 27 a8 07 00 00 00 12
000040 80 88 ae 8d 00 00 00 86 00 00 00 00 00 27 a8 07
000050 00 00 00 12 80 88 b3 54 00 00 00 86 00 00 00 00
000060 00 27 a8 07 00 00 00 12 80 88 b8 1b 00 00 00 86
000070 00 00 00 00 00 27 a8 07 00 00 00 12 80 88 ba 33
000080 00 00 00 86 00 00 00 00 00 27 a8 07 00 00 00 12
000090 80 88 bc 4b 00 00 00 86 00 00 00 00 00 27 a8 07
0000a0 00 00 00 12 80 88 c7 a0 00 00 00 86 00 00 00 00
0000b0 00 27 a8 07 00 00 00 12 80 88 dc 32 00 00 00 86
0000c0 00 00 00 00 00 27 a8 07 00 00 00 12 80 89 00 74
0000d0 00 00 00 86 00 00 00 00 00 27 a8 07 00 00 00 12
0000e0 80 89 0d ab 00 00 00 86 00 00 00 00 00 27 a8 07
0000f0 00 00 00 12 80 89 16 51 00 00 00 86 00 00 00 00
000100 00 27 a8 07 00 00 00 00 00 00 00 00 00 00 00 00
000110 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00

解释一下:
最左边的是这行数据开始的地址。16进制。比如第一行000000开始于第0个字节。第二行000010开始于第16个字节,这只是为了显示给你看的,方便数字节。

ConsumeQueue中每个消息时20Byte长。结构为

clipboard.png

CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
Size存储中消息的大小
Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)

所以

clipboard.png

上面蓝框是一条ConsumeQueue消息,红色的00000086是对应真实消息的size。
我之前故意发送了全部相同的消息,所以size都是一样的。因为tag是一样的,所以Message Tag HashCode也是一样的。
可以数一下,这个ConsumeQueue中现在一共13条数据,和前面的日志显示是一样的。

测一下消费

package com.yunsheng.simpleExample;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class SyncConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        //声明并初始化一个consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
        consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");

        //这里设置的是一个consumer的消费策略
        //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("DemoTopic", "*");

        //设置一个Listener,主要进行消息的逻辑处理
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
//
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//                if (msgs.size() > 0){
//                    System.out.println(new String(msgs.get(0).getBody()));
//                }

                //返回消费状态
                //CONSUME_SUCCESS 消费成功
                //RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //调用start()方法启动consumer
        consumer.start();

        System.out.println("Consumer Started.");
    }
}

看下输出:

ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=134, queueOffset=2, sysFlag=0, bornTimestamp=1507713810962, bornHost=/192.168.116.77:51970, storeTimestamp=1507713811012, storeHost=/10.135.17.26:10911, msgId=0A87111A00002A9F000000128088A976, commitLogOffset=79465851254, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=DemoTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=13, WAIT=true, TAGS=TagA}, body=14]]]

现在查下ConsumeQueue,恩,都还在的。
不是消费过了吗?怎么还有?
但是现在再运行SyncConsumer ,并不会消费消息。
所以,一定是有某种手段保存了消费的进度。这种手段就是,在DefaultMQPushConsumer实例启动时,会到broker上,拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。
所以,现在换一个consumerGroup的名字,会发现又消费了。

RocketMq的删除机制。RocketMq并不会立即删除消息,所以消息是可以被重复消费的。 RocketMq的消息时定期清除,默认3天。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容