参考
有思考的大佬博客
https://www.jianshu.com/p/b73fdd893f98
概述
- CommitLog:RocketMQ把所有Topic所有Queue的消息都持久化到同一个物理文件CommitLog中。
- ConsumerQueue(逻辑消费队列):本身并不存储任何消息,但是Consumer通过监听这个逻辑队列,来获取到待消费的消息
- IndexFile:因为所有的消息都存在CommitLog中,如果要实现根据 key 查询 消息的方法,就会变得非常困难,所以为了解决这种业务需求,有了IndexFile的存在。
Consumer即可根据ConsumerQueue来查找待消费的消息了。
ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
而IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法(ps:这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程)
CommitLog
物理存储。CommitLog消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分。
commitLog -> 1个mappedFileQueue -> N个mappedFile--> Message
public class CommitLog {
private final MappedFileQueue mappedFileQueue;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
}
ConsumerQueue(逻辑消费队列)
Consumer通过监听ConsumerQueue来感知是否有新消息。相当于是一个CommitLog的Queue级别的逻辑消费队列(CommitLog是不分Topic和Queue的)。
事务消息就是基于这个原理:日志的prepare就是把消息写在CommitLog里面,但是不更新到ConsumerQueue里面,事务消息的Commit就是把消息更新到ConsumerQueue里面。
- 每个topic的每个queue都有一个唯一的ConsumerQueue,Consumequeue类文件的存储路径默认为$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由30W条数据组成
public class ConsumeQueue {
public static final int CQ_STORE_UNIT_SIZE = 20;
private final MappedFileQueue mappedFileQueue;
private final String topic;
private final int queueId;
private final ByteBuffer byteBufferIndex;
- consume queue中存储单元是一个20字节定长的数据MESSAGE_POSITION_INFO,是顺序写顺序读,每个Queue对应一个ConsumeQueue
MESSAGE_POSITION_INFO:
- offset: CommitLog中的物理位移(long 8字节)
- size: CommitLog中的日志大小(该消息可能被压缩过)(int 4字节)
- tagsCode:和storeTimestamp相关tagsCode(long 8字节)
- 存储消息第一步会触发CommitLog的物理IO写消息,然后,再写CQueue,
- 直接面向消费者,因为每个Queue有一个单独的Consume Queue,而一个Topic下有多个Queue,从而支持并发的消费
- 当一个Msg从ConsumerQueue里面删除时,有一个BLANK顶上,也是20个定长字节
IndexFile
最终提供根据topic,key等参数,通过IndexFile的索引功能,在CommitLog中,找到offset结果的方法
QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)