在《rocket mq 底层存储源码分析(1)-存储总概》章节中,我们提到rockmq 有两类索引,一类是【逻辑位移索引】,该索引类比于List的index,conusmer端可以通过该index,查询出具体的业务消息【内容】。另一类是【key查询索引】,通过该索引去查询指定的消息,broker端会返回通过key的散列值,落在相同的槽的一定数量的消息给客户端。
因此,本章主要是从底层实现去分析rmq是如何构建这两类索引。
rmq构建索引 是委托默认存储业务抽象类DefaultMessageStore
的内部线程类ReputMessageService
完成的,因此我们直接上源码:
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
run方法比较简洁,在一个while循环里,先睡眠一秒,然后进行doReput()
构建。
继续跟进
doReput()
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
...
//1、获取业务消息映射文件 字节内容
//这里获取reputFromOffset所在的MappedFile,区间在[reputFromOffset, MappedFile.end]的所有字节消息,添加
//该MappedFile 的持有引用数MappedFile.hold();
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//2、读取单条 业务消息 内容
//每次滚动获取下一条消息的字节内容
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
//dispatchRequest 代表一条消息承载,
// 2、构建存放该消息的位置索引,并且放入缓存(逻辑offset 以及 物理offset 对应关系)
// 3、构建构建 消息查询索引,该索引的作用为通过topic 和 key 快速定位具体消息.
DefaultMessageStore.this.doDispatch(dispatchRequest);
...
//更新已经构建好索引的消息的位置
this.reputFromOffset += size;
//readSize 控制映射文件内的物理位移
readSize += size;
...
} else if (size == 0) {
//代码走到这里,说明本映射文件中的所有消息均构建索引完成,因此滚动到下一个索引文件
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) { //代码容错处理
if (size > 0) {
//代码走到这里,表明该条消息声明的总长度不等于实际长度。
//因此,需要通过 this.reputFromOffset += size 确保逻辑位移能往下构建。
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
} //end inner for
} finally {
//释放MappedFile的引用
result.release();
}
} else {
//代码走到这里,说明已经为最新的业务消息构建索引完成,
//因此需要跳出最外层的for循环。然后让该业务线程休眠一秒后,
//继续查询是否有新的、待构建索引的业务消息。
doNext = false;
}
} //end outer for
} //end method
public void doDispatch(DispatchRequest req) {
final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
//3、构建【逻辑位移索引】
DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
//4、构建【key查询索引】
DefaultMessageStore.this.indexService.buildIndex(req);
}
}
接下来我们主要从三个步骤分析整个构建流程::
1、获取业务消息映射文件 字节内容
2、读取单条 业务消息 内容
3、构建【逻辑位移索引】
4、构建【key查询索引】
1、获取业务消息映射文件 字节内容
首先构建的整个流程就围绕着for (boolean doNext = true; this.isCommitLogAvailable() && doNext; )
这个for循环;
private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
根据循环条件,我们可以看出,当 【全局构建索引的物理位移(reputFromOffset )】 小于 当前最新的业务消息物理位移时,并且doNext标志位为true,才会为之构建索引。
我们直接分析如何获取业务消息映射文件 字节内容,进入:
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset)
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
//default value 1G
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
//通过物理位移找出 位移 所在的映射文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
之前我们已近分析过,如何通过指定的物理位移找出其所在的内存映射文件。通过int pos = (int)(offset % mappedFileSize)
找出具体位置,继续进入mappedFile.selectMappedBuffer(pos)
:
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
根据pos
,从内存映射文件的pagecache中拷贝一份字节内容,然后在封装在SelectMappedBufferResult
实例中,换言之,映射文件内 待构建索引 的 字业务消息字节内容均存在于SelectMappedBufferResult
临时实例中。
2、读取单条 业务消息 内容
步骤1中获取SelectMappedBufferResult
后,通过for (int readSize = 0; readSize < result.getSize() && doNext; )
内部循环,逐条构建 索引。
我们直接分析如何通过checkMessageAndReturnSize(result.getByteBuffer(), false, false)
获取一条业务消息内容,换言之,
我们可以认为DispatchRequest
实例代表一条业务消息。
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
try {
// 1 TOTAL SIZE
int totalSize = byteBuffer.getInt();
// 2 MAGIC CODE
int magicCode = byteBuffer.getInt();
switch (magicCode) {
case MESSAGE_MAGIC_CODE:
break;
case BLANK_MAGIC_CODE:
return new DispatchRequest(0, true /* success */);
default:
log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
return new DispatchRequest(-1, false /* success */);
}
...
// 15 BODY
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
if (readBody) {
byteBuffer.get(bytesContent, 0, bodyLen);
if (checkCRC) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
}
}
} else {
byteBuffer.position(byteBuffer.position() + bodyLen);
}
}
...
int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
doNothingForDeadCode(flag);
doNothingForDeadCode(bornTimeStamp);
doNothingForDeadCode(byteBuffer1);
doNothingForDeadCode(byteBuffer2);
log.error(
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
totalSize, readLength, bodyLen, topicLen, propertiesLength);
return new DispatchRequest(totalSize, false/* success */);
}
return new DispatchRequest(//
topic, // 1
queueId, // 2
physicOffset, // 3
totalSize, // 4
tagsCode, // 5
storeTimestamp, // 6
queueOffset, // 7
keys, // 8
uniqKey, //9
sysFlag, // 9
preparedTransactionOffset// 10
);
} catch (Exception e) {
}
return new DispatchRequest(-1, false /* success */);
}
简单总结一下,就是按照存储 业务消息 的 格式,逐个字段在重新读取回来。当然,在该上下文中,我们不会去读取以及校验消息内容,仅仅读取消息头以及扩展字段等,并简单校验消息头以及长度是否准确。
3、构建【逻辑位移索引】
获取完整的一条业务消息后,我们直接分析如何构建【逻辑位移索引】,因此继续跟进
doDispatch(dispatchRequest)
public void doDispatch(DispatchRequest req) {
final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
//存放该消息的位置索引
DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
//构建 消息索引,该索引的作用为通过topic 和 key 快速定位具体消息
DefaultMessageStore.this.indexService.buildIndex(req);
}
}
从代码中,我们可以看出,逻辑上,仅仅为普通的业务消息,以及事务消息中,已提交的消息构建索引,接着跟进putMessagePositionInfo(...)
:
/**
* 为该条消息持久化位置索引,该索引的作用为通过逻辑位移可以快速查询消息内容
* @param topic 消息的topic
* @param queueId 消息所在的queueId
* @param offset 该消息的物理位移(也即消息的存放的开始物理地址)
* @param size 消息大小
* @param tagsCode 消息的tagsCode
* @param storeTimestamp 消息的存储时间
* @param logicOffset 消息的逻辑位移
*/
public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
long logicOffset) {
//通过topic 和 queueId 确定消息所在逻辑消费队列(ConsumeQueue)
//而findConsumeQueue实现类查询不到就创建的语义
ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
topic, //
queueId, //
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
之前有分析过,通过topic 以及 queueId可以确定集群中的一条逻辑消费队列,因此我们在这里先分析一下DefaultMessageStore.consumeQueueTable
属性:
ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>
。
可以看出,DefaultMessageStore.consumeQueueTable
是一个ConcurrentHashMap
,我们可以通过topic
,查询出l逻辑消费队列管理表ConcurrentHashMap<Integer, ConsumeQueue>
,最后在通过queueId,即队列标识,获取ConsumeQueue
逻辑消费队列抽象。我们来看看ConsumeQueue
的一些内部属性:
public class ConsumeQueue {
public static final int CQ_STORE_UNIT_SIZE = 20;
private final MappedFileQueue mappedFileQueue;
private final String topic;
private final int queueId;
private final String storePath;
private final int mappedFileSize;
private long maxPhysicOffset = -1;
private volatile long minLogicOffset = 0;
}
每一个ConsumeQueue
实例维护一条逻辑队列,通过该实例我们可以获取topic-queueId维度下,每条队列的相关信息。例如,该条队列目前可消费的最大业务消息,由maxPhysicOffset
属性维护,该属性代表业务消息的物理位移;该条队列目前最小可消费的逻辑位移,由minLogicOffset
属性维护,该属性代表逻辑位移的存储物理位移。storePath
则表明该消费逻辑队列的存储路径,mappedFileSize
为存储文件大小。mappedFileQueue
则对连续内存映射文件抽象。因此,有了该业务抽象后,上层业务可以通过ConsumeQueue
的实例方法对业务消息进行更简便的操作了。
而 findConsumeQueue(String topic, int queueId)
最主要是实现了consumeQueueTable
通过topic-queueId维度下查询ConsumeQueue
的同时,也实现了putIfExist
的语义。
分析完this.findConsumeQueue(topic, queueId)
后,我们接着分析putMessagePositionInfoWrapper(...)
:
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
long logicOffset) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
//将消息位置索引存放到缓冲区
boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
if (result) {
//更新存储快照的逻辑消息存储时间
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
return;
}
...
}
...
}
每条业务消息,最大尝试30次构建索引,如果构建索引成功的话,就更新【逻辑位移索引】存储快照时间,值为【逻辑位移索引】所对应的【业务消息】写入pagecache的时间戳。存储快照的作用是为了,如果系统出现非正常宕机,那就可以恢复快照时间之前的内容,这些内容确保是正常的,快照恢复的内容在下一章分析。
我们接着进入putMessagePositionInfo(...)
/**
* 构建消息位置索引,大小为20
* @param offset 业务消息的物理位移
* @param size 业务消息的大小
* @param tagsCode 业务消息的key的哈希值
* @param cqOffset 业务消息 所在的逻辑位移
* @return
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset <= this.maxPhysicOffset) {
return true;
}
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
//获取映射文件,如果 不存在 或者 满 就创建
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
//代码走到这里,表明该mappedFile是mappedFileQueue 的第一个内存映射文件,cqOffset!=0表明consumer offset不是第一个,即前面有清除过期的mappedFile
//mappedFile.getWrotePosition() = 0说明是第一个写入该内存映射文件的位置索引消息,因此该consumeQueue.minLogicOffset = expectLogicOffset,即更新该consumeQueue实例业务消息的最小可消费逻辑位移的物理位移。
this.minLogicOffset = expectLogicOffset;
//重新设置刷盘位置以及写入缓存的位置
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
//expectLogicOffset经过对MappedFileSize(1G)取模的值 表明该位置索引在该mappedFile的位置,所以需要填充前面的空位
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
...
}
//这里更新 consumeQueue 最大可以消费业务消息的物理位移
this.maxPhysicOffset = offset;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
上述代码就是【逻辑位移索引】的构建 及 写入pagecache中。
每一个【逻辑位移索引】的大小固定为20字节(CQ_STORE_UNIT_SIZE),存储内容在说一下,8字节的 业务消息的物理位移,4字节的 业务消息的大小以及8字节的 业务消息的key的哈希值。
与业务消息的存储 抽象一样,逻辑位移索引的存储也是 通过MappedFileQueue
连续物理存储的抽象类,业务使用方可以通过 物理偏移量位置 快速定位 指定的offset所在的MappedFile、创建、删除MappedFile等操作。
【逻辑位移索引】的物理位移计算方式为cqOffset * CQ_STORE_UNIT_SIZE
,其中cqOffset
为逻辑位移,类比于List的index,而CQ_STORE_UNIT_SIZE
为每一个【逻辑位移索引】的大小,值为20;换言之,我们需要查询指定index的业务消息时,我们先通过cqOffset * CQ_STORE_UNIT_SIZE
运算,查询出【逻辑位移索引】的具体内容,然后在通过8字节的 业务消息的物理位移找出业务消息 所在的物理存储位移,就可以找出一条完整的业务消息内容了。
到这里,我们只是分析【逻辑位移索引】是如何构建及写入pagecache中,还没分析到它是如何刷盘的。
我们接着分析【逻辑位移索引】刷盘。
【逻辑位移索引】刷盘是由DefaultMessageStore
的内部线程类FlushConsumeQueueService
实现的。
我们通过源码再来分析一下,首先是刷盘线程入口run()
:
public void run() {
while (!this.isStopped()) {
try {
//刷盘间隔 1000 也即 每秒刷盘一次
int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
this.waitForRunning(interval);
this.doFlush(1);
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
...
}
从代码上,可以看出,整个刷盘流程是,先每次执行this.doFlush(1)
前,先this.waitForRunning(interval)
等待指定时间(1s),接着跟进this.doFlush(1)
:
private void doFlush(int retryTimes) {
//1)获取默认刷屏缓存页大小 2
int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
long logicsMsgTimestamp = 0;
//彻底刷屏间隔 1000 * 60
int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
long currentTimeMillis = System.currentTimeMillis();
//2)计算当前时间戳与上次刷盘的时间戳之差是否超过刷盘间隔
if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
//代码周到这里,说明需要强行执行刷盘操作
//更新上一次刷盘时间
this.lastFlushTimestamp = currentTimeMillis;
//值为0,业务含义为强制刷盘
flushConsumeQueueLeastPages = 0;
//获取存储检查点的物理消息时间戳
logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
}
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
//3)遍历该broker中的所有ConsumeQueue,
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
//ConsumeQueue 自己维护物理存储抽象类
for (ConsumeQueue cq : maps.values()) {
boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) {
//4)物理刷盘【逻辑位移索引】
result = cq.flush(flushConsumeQueueLeastPages);
}
}
}
if (0 == flushConsumeQueueLeastPages) {
if (logicsMsgTimestamp > 0) {
//回写存储时间
DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
}
//5)刷盘时间存储快照元数据文件
DefaultMessageStore.this.getStoreCheckpoint().flush();
}
}
总结一下上述流程,只要以满足以下两个条件中的其中一个,就会执行物理刷盘操作:
第一,目前写入pagecache中的【逻辑位移索引】是否写满了2个缓存页(8k);
第二,上次刷盘的时间与当前时间间隔超过1分钟。
其中,由刷盘线程FlushConsumeQueueService
的内部实例属性this.lastFlushTimestamp
维持上次刷盘时间,当2)中的 if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval))
判断为true
时,说明当前时间戳与上次刷盘的时间戳之差超过刷盘间隔。
因此,这次流程需要强行刷盘,进而,更新上一次刷盘时间( this.lastFlushTimestamp = currentTimeMillis
);
设置刷盘缓存页数(flushConsumeQueueLeastPages = 0
,该值为0时,表明强行刷盘);读取上一次【逻辑位移索引】完整刷盘的存储时间快照(logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp()
),而完整刷盘的的含义是遍历broker内所有的业务逻辑消费队列(ConsumeQueue)实例,并委托实例执行自身的刷盘操作。
而3)中的for(...)
就是完整遍历的过程,由各自的ConsumeQueue
实例执行刷盘操作cq.flush(flushConsumeQueueLeastPages),参数flushConsumeQueueLeastPages
不为0时,说明至少写满flushConsumeQueueLeastPages
页数时,才执行强制刷盘,而flushConsumeQueueLeastPages
的值为2,缓存页的大小为4k。
另外,当flushConsumeQueueLeastPages
为0,即达到强制刷盘的条件下,才会执行5)中的存储Checkpoint刷盘。当然,内部的logicsMsgTimestamp
值为上一次【逻辑位移索引】完整刷盘的存储时间。这样一来,可以确保那些小于logicsMsgTimestamp
的【逻辑位移索引】均可以完整刷盘。
以上就是【逻辑位移索引】刷盘逻辑了。
4、构建【key查询索引】
直接跟进:
DefaultMessageStore.this.indexService.buildIndex(req);
public void buildIndex(DispatchRequest req) {
//step1.创建索引文件
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
//客户端指定的keys,
String keys = msg.getKeys();
//msg.getCommitLogOffset()指该消息的开始存放物理位置,说明已经为该消息构建过索引了
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
//step2 .构建索引文件
//如果在producer客户端指定了uniqKey, 也就是producer为该条消息生成的唯一id,则为该topic-uniqKey 构建索引
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
//为多个key构建索引
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
step1中,获取最后一个索引文件,同样,如果满了,或者没有,就创建一个,在创建一个新的索引文件同时,会开启一个线程异步刷盘上一个索引文集。这里就不详细分析retryGetAndCreateIndexFile()
该方法的,读者可以自行解读。
step2中, 根据客户端是否有指定UniqKey或者普通的key,然后会为每一个key都单独构建一个【key查询索引】,换言之,一个业务消息会存在多个【key查询索引】。接下来,我们重点分析索引构建方法 putKey(...),该方法是IndexFile
索引文件类的一个实例方法,因此,在分析该方法时,我们先看看索引文件的结构
public class IndexFile {
//hash槽大小
private static int hashSlotSize = 4;
//索引消息大小
private static int indexSize = 20;
private static int invalidIndex = 0;
//hash槽 个数
private final int hashSlotNum;
//索引个数
private final int indexNum;
//内存映射文件
private final MappedFile mappedFile;
//文件头
private final IndexHeader indexHeader;
/**
*
* @param fileName userRootPath/index/YYMMDDhhmmss + mmm(毫秒)
* @param hashSlotNum default value = 5000000
* @param indexNum default value = 5000000 * 4
*
* 如果IndexFile 在IndexService.indexFileList.isEmpty()空时创建,则 endPhyOffset = endTimestamp = 0;
* 否则, endPhyOffset = IndexService.indexFileList.last().endPhyOffset
* endTimestamp = IndexService.indexFileList.last().endTimestamp
* @param endPhyOffset
* @param endTimestamp
* @throws IOException
*/
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
final long endPhyOffset, final long endTimestamp) throws IOException {
//fileTotalSize = 40 + (5,000,000[hashSlotNum] * 4[hashSlotSize]) + ((4 * 5,000,000)[indexNum] * 20[indexSize])
//1G = 1024 * 1024 * 1024 = 1,073,741,824 (10亿级)
int fileTotalSize =
IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.mappedFile = new MappedFile(fileName, fileTotalSize);
this.fileChannel = this.mappedFile.getFileChannel();
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
this.hashSlotNum = hashSlotNum;
this.indexNum = indexNum;
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
this.indexHeader = new IndexHeader(byteBuffer);
if (endPhyOffset > 0) {
this.indexHeader.setBeginPhyOffset(endPhyOffset);
this.indexHeader.setEndPhyOffset(endPhyOffset);
}
if (endTimestamp > 0) {
this.indexHeader.setBeginTimestamp(endTimestamp);
this.indexHeader.setEndTimestamp(endTimestamp);
}
}
首先,索引文件格式分布为为:文件头IndexHeader
(40字节),主要存放索引文件的一些元数据;接下来就是五百万个hash槽(5000000),每一个槽大小为4字节,存放的内容为索引逻辑位移,换言之,4*5000000的字节用于存放逻辑位移,这个位移是查询的关键,待会再分析;剩下的字节内容全部用来按序存放索引内容字节;
在说一下文件头IndexHeader
:
public class IndexHeader {
//文件头存储大小
public static final int INDEX_HEADER_SIZE = 40;
//表示该索引文件 第一个构建索引的 消息 的存储物理时间(存放于缓存中)
private static int beginTimestampIndex = 0;
//表示该索引文件 最后一个构建索引的 消息 的存储物理时间(存放于缓存中)
private static int endTimestampIndex = 8;
//表示该索引文件 第一个构建索引的 消息 的存储物理地址
private static int beginPhyoffsetIndex = 16;
//表示该索引文件 最一个构建索引的 消息 的存储物理地址
private static int endPhyoffsetIndex = 24;
//应为消息是存储顺序来构建索引的,所以,消息的索引存储顺序也是递增的
private static int hashSlotcountIndex = 32;
}
内容见注释。
进入putKey(...),构建索引。
/**
*
* @param key 指定的key: topic-uniqueKey
* @param phyOffset 消息的持久化物理位移
* @param storeTimestamp 消息的存储时间,即消息放进缓存的时间
* @return
*/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
//计算key的hash值
int keyHash = indexKeyHashMethod(key);
//hashSlotNum = 5000000 (5百万个hash槽)
//通过简单的hash算法,算出key所在的相对槽位置
int slotPos = keyHash % this.hashSlotNum;
//绝对hash槽位置 = 40 + slotPos(相对槽位置) * 4(hash槽大小)
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
...
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
....
//绝对索引位置 = 40(索引头大小) + 5000000(hashSlotNum) * 4(hashSlotSize) + IndexCount(当前已索引条数) * 20(索引大小)
//this.indexHeader.getIndexCount() * indexSize 这里保证每条索引实体所存放的位置是递增的
//存放索引的物理位移
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//absSlotPos:绝对槽位置,该位置存放具有相同hash值的 最大 [key索引]逻辑位移
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
//散列 槽 +1
this.indexHeader.incHashSlotCount();
//索引个数+1
this.indexHeader.incIndexCount();
//更新索引头文件的尾部 消息物理位置,
this.indexHeader.setEndPhyOffset(phyOffset);
//更新索引头文件的尾部 消息存储时间,
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
...
}
...
return false;
}
以上就是构建【key查询索引】的主要代码,在分析前,我们先回顾一下HashMap,HahMap的内部数据结构是由数组 + 链表实现,而数组就是我们常说的槽,具有相同hash值或hash值取模数组长度后值相同的元素均路由到同一个槽中,然后在以链表的形式关联起来;而rmq也用了类似的方式组织及实现自身的索引方式。
IndexFile
索引文件实例就类比一个HashMap
实例,而索引文件的内部属性hashSlotNum
就类比于HashMap
的数组大小;HashMap
的槽中,第一个元素是最后一个插入该槽中的元素,而IndexFile
的槽中,存储的内容则是最后一个插入该槽中的【key查询索引】逻辑位移;通过该逻辑位移,我们可以查询出索引内容物理位移,并获取相关内容;HashMap
中,每一个元素是通过next引用获取下一个元素的引用;而【key查询索引】则通过存储上一个索引的逻辑位移表示引用。
我们分析一下如何通过索引的逻辑位移算出它的物理存储位移:
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize
首先文件头占据40字节(IndexHeader.INDEX_HEADER_SIZE
),接下来就是五百万个hash槽(5000000),每一个槽大小为4字节,存放的内容为索引逻辑位移,换言之,4*5000000(this.hashSlotNum * hashSlotSize)的字节用于存放逻辑位移;而每条索引大小为20字节。因此当我们获取索引的逻辑位移时,在加上逻辑位移乘以其大小(this.indexHeader.getIndexCount() * indexSize),即可算出索引存储内容的物理位移。
我们在来分析一下索引存储结构, 索引消息的大小(20字节),4(1) + 4(2) + 8(3) + 4(4)
(1):业务消息的key的hash值,
(2):业务消息的物理位置
(3):业务消息的存储时间以及索引文件创建的时间差(秒数)
(4):在相同槽位置中,上一条索引消息所在的逻辑索引,这里是查询的关键,
这里举个栗子:)
第一条索引消息的 的情况 :
1st_index_msg, logicIndex = 1,根据key,散列到slot_2,则1st_index_msg.slotValue = 0(logicIndex)
第二条索引消息的 的情况 :
2nd_index_msg, logicIndex = 2,根据key,散列到slot_2,则2nd_index_msg.slotValue = 1(logicIndex)
第三条索引消息的 的情况 :
3rd_index_msg, logicIndex = 3,根据key,散列到slot_3,则3rd_index_msg.slotValue = 0(logicIndex)
第四条索引消息的 的情况 :
4th_index_msg, logicIndex = 4,根据key,散列到slot_3,则4rd_index_msg.slotValue = 3(logicIndex)
第五条索引消息的 的情况 :
5th_index_msg, logicIndex = 5,根据key,散列到slot_2,则5th_index_msg.slotValue = 2(logicIndex)
这里就达到了一种类似链表的效果:
_ _ _ _ _ _ _ _
|slot_2| ------------------------------------------------------------|slot_3|
~~~~|~~~~ ~~~~|~~~
|1st_index_msg,logicIndex=1,preLogicIndex=0| |3rd_index_msg,logicIndex=3,preLogicIndex=0|
| |
|2nd_index_msg,logicIndex=2,preLogicIndex=1| |4th_index_msg,logicIndex=4,preLogicIndex=3|
|
|5th_index_msg,logicIndex=5,preLogicIndex=2|
换言之,当客户端需要通过指定的key来查询消息时,先通过散列算法,把key所在的slot槽值给算出
例如,还是到absSlotPos = slot_2, 获取的值为 logicIndex = 5
然后通过logicIndex 算出absIndexPos,即索引所在的绝对存储位置,分别获取phyOffset(具体消息的物理位置),以及preLogicIndex
然后在通过preLogicIndex 按照上述步骤找出具体的索引消息,直到preLogicIndex=0 为止。
以上就是【key查询索引】构建细节。
总结一下本章内容,通过ReputMessageService
该内部线程类,从指定的 业务消息存储物理位移开始,逐条读取完整的一条消息,然后为之构建【逻辑位移索引】以及【key查询索引】,直到构建至最新的消息。
到这里,我们已近把rmq 底层存储的相关内容以全部分析完。
以上即使rmq构建索引章节的全部内容。