CommitLog文件讲解
概述
commitlog文件的存储地址:$HOME\store\commitlog${fileName}
,每个文件的大小默认1G,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0也就是fileFromOffset
值,当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648。消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。
文件结构
顺序编号 |字段简称 |字段大小(字节)| 字段含义
---|---|---|---|---
1| msgSize |4| 代表这个消息的大小
2 |MAGICCODE| 4 |MAGICCODE = daa320a7
3 |BODY CRC |4 |消息体BODY CRC 当broker重启recover时会校验
4 |queueId |4 |消息队列id
5 |flag| 4 |
6 |QUEUEOFFSET| 8 |这个值是个自增值不是真正的consume queue的偏移量,可以代表这个consumeQueue队列或者tranStateTable队列中消息的个数,若是非事务消息或者commit事务消息,可以通过这个值查找到consumeQueue中数据,QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事务,则可以通过该值从tranStateTable中查找数据
7| PHYSICALOFFSET |8 |代表消息在commitLog中的物理起始地址偏移量
8 |SYSFLAG| 4 |指明消息是事物事物状态等消息特征,二进制为四个字节从右往左数:当4个字节均为0(值为0)时表示非事务消息;当第1个字节为1(值为1)时表示表示消息是压缩的(Compressed);当第2个字节为1(值为2)表示多消息(MultiTags);当第3个字节为1(值为4)时表示prepared消息;当第4个字节为1(值为8)时表示commit消息;当第3/4个字节均为1时(值为12)时表示rollback消息;当第3/4个字节均为0时表示非事务消息;
9 |BORNTIMESTAMP| 8 |消息产生端(producer)的时间戳
10| BORNHOST |8| 消息产生端(producer)地址(address:port)
11| STORETIMESTAMP |8 |消息在broker存储时间
12| STOREHOSTADDRESS |8 |消息存储到broker的地址(address:port)
13| RECONSUMETIMES| 8 |消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了,成功消费一次记录为0;
14| PreparedTransaction Offset| 8 |表示是prepared状态的事物消息
15| messagebodyLength |4| 消息体大小值
16| messagebody| bodyLength| 消息体内容
17| topicLength| 1 |topic名称内容大小
18| topic |topicLength |topic的内容值
19| propertiesLength| 2 |属性值大小
20| properties |propertiesLength |propertiesLength大小的属性数据
可以看到CommitLog文件的一个消息体的长度是不确定的,但是有字段messagebodyLength
来表示的是消息体大小和propertiesLength
表示属性值的大小。所以可以计算出这个消息数据的大小。
CommitLog
类分析
字段属性分析
//用来验证消息的合法性,类似于java的魔数的作用
public final static int MESSAGE_MAGIC_CODE = -626843481;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = -875286124;
//映射文件集合
private final MappedFileQueue mappedFileQueue;
//默认消息存储类,CommitLog的所有操作都是通过DefaultMessageStore来进行的
private final DefaultMessageStore defaultMessageStore;
//刷盘的任务类
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
//在启用了临时存储池的时候,定期把消息提交到FileChannel的任务类
private final FlushCommitLogService commitLogService;
//消息拼接的类
private final AppendMessageCallback appendMessageCallback;
//消息的编码器,线程私有
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
//消息topic的偏移信息
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
//消息锁
private final PutMessageLock putMessageLock;
字段 | 作用 |
---|---|
MESSAGE_MAGIC_CODE | 用来验证消息的合法性,类似于java的魔数的作用 |
BLANK_MAGIC_CODE | 消息不够存储的时候用这个来表示 |
mappedFileQueue |
MappedFile 集合,表示的是CommitLog映射文件的集合 |
defaultMessageStore | 用于操作CommitLog 类的对象 |
flushCommitLogService | 定时刷盘的任务线程对象 |
commitLogService | 在启用了临时存储池的时候,定期把消息提交到FileChannel的任务类 |
appendMessageCallback | 异步拼接消息体的回调对象 |
batchEncoderThreadLocal | 用于对消息进行编码 |
topicQueueTable | 每个消息topic的偏移信息,因为RocketMQ的Topic都存在一个CommitLog文件中,所以需要记录每个Topic的消费进度信息 |
putMessageLock | 并发存储消息时候的锁 |
内部类分析
在CommitLog
中有几个内部类,跟文件的刷盘有关比如FlushRealTimeService
和别的类,以及跟消息编码有关的MessageExtBatchEncoder
,这里主要介绍跟消息提交和刷盘有关的几个内部类。后面的分析很多都是基于前面的两篇文章的基础上来进行分析的。
消息提交CommitRealTimeService
CommitRealTimeService
主要就是定时的把临时存储池中的消息commit到FileChannel
中,便于下次flush刷盘操作。而这个类只有在开启临时存储池的时候才会有用。
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//todo 获取配置的 刷新commitLog频次 默认200ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
//todo 获取配置的 提交数据页数 默认4
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
//todo 获取配置的 提交commitLog最大频次 默认200ms
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
//对数据进行提交
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
boolean result = false;
//进入这里表面服务准备停止,此时把还没提交的进行提交,最多重试10次
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
消息异步刷盘FlushRealTimeService
FlushRealTimeService
的主要作用就是刷盘相关的,直接上代码
class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0;
private long printTimes = 0;
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
//如果任务没有停止,停止的时候会调用对应的shutdown方法,把对应的stop字段修改为true
while (!this.isStopped()) {
//todo 获取是否定时刷新日志的设定 参数为 flushCommitLogTimed 默认为false
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//todo 获取刷新到磁盘的时间间隔 参数为 flushIntervalCommitLog 默认为500毫秒
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
//todo 获取一次刷新到磁盘的最少页数,参数为flushCommitLogLeastPages 默认为4
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
//todo 获取刷新CommitLog的频率 参数为flushCommitLogThoroughInterval 默认为10000毫秒
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
//计算日志刷新进度信息
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
//如果定时刷新日志,则把线程sleep对应的规定时间
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
//使用的是CountDownLatch等待对应时间
this.waitForRunning(interval);
}
//打印进度
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
//进行文件的刷盘
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
//获取文件的最后修改时间也就是最后的刷新时间
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
//todo 设置CheckPoint文件的physicMsgTimestamp 消息物理落盘时间
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
//todo 如果服务停止,那么把剩余的没有刷新到磁盘的消息刷盘,重复次数为10次
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
}
逻辑就是一直循环不断把映射文件队列中的消息进行刷盘。其中有几个参数可以人为的配置。
参数 | 作用 | 默认值 |
---|---|---|
flushCommitLogTimed | 是否定时刷新日志 | 默认为false |
flushIntervalCommitLog | 刷新到磁盘的时间间隔 | 默认为500毫秒 |
flushCommitLogLeastPages | 一次刷新到磁盘的最少页数 | 默认为4 |
flushCommitLogThoroughInterval | 刷新CommitLog的频率 | 默认为10000毫秒 |
消息同步刷盘GroupCommitService
这个类内部使用了CountDownLatch
来进行一个任务调度。先看看入口方法
public synchronized void putRequest(final GroupCommitRequest request) {
//添加写请求到集合中
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
//启动提交线程
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
可以看到这个方法是吧传入的提交消息的请求,放到一个写的队列中。
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
//服务没有停止则循环进行
while (!this.isStopped()) {
try {
//等待10毫秒后执行,这个里面会调用onWaitEnd 方法
this.waitForRunning(10);
//执行提交
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
//交换读写任务,能进入这里说明应用已经准备停止了,
this.swapRequests();
}
//
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
在运行的时候会先等待10毫秒,而这10毫秒期间会调用,内部的onWaitEnd
方法进而调用swapRequests
方法,吧读写请求进行交换。
protected void onWaitEnd() {
this.swapRequests();
}
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
在读写请求交换完了之后就是doCommit
方法了,这个方法就是吧请求的消息进行落盘
private void doCommit() {
synchronized (this.requestsRead) {
//如果读任务不为空则迭代处理
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of two times the flush
// todo 可能存在一条消息存在下一个文件中,因此最多可能存在两次刷盘
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
//如果文件刷盘的偏移量<请求的下一个偏移量,则说明还没有刷新完,还需要继续刷新
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
//刷新完毕 唤醒用户线程
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//todo 刷新CheckPoint文件的physicMsgTimestamp 消息物理落盘时间
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
内部类的启动个关闭
上面的这些内部类,有的是根据构造CommitLog
类的时候进行初始化的。而对应的启动和停止在CommitLog
中,而这些方法的调用又是在前面字段属性介绍的DefaultMessageStore
中进行调用的。
public void start() {
// 开启刷盘线程
this.flushCommitLogService.start();
/**
* 如果使用的是临时存储池来保存消息,则启动定期提交消息的线程,把存储池的信息提交到fileChannel中
* 只有在开启了使用临时存储池 && 刷盘为异步刷盘 && 是master节点 的情况才会为true
*/
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
public void shutdown() {
//关闭提交临时存储池的任务
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
}
//关闭刷盘线程
this.flushCommitLogService.shutdown();
}
方法分析
构造方法
public CommitLog(final DefaultMessageStore defaultMessageStore) {
//创建MappedFileQueue对象,传入的路径是配置的CommitLog的文件路径,和默认的文件大小1G,同时传入提前创建MappedFile对象的AllocateMappedFileService
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
//刷盘的模式如果是 同步刷盘SYNC_FLUSH 则对应的刷盘线程对象为GroupCommitService
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
//刷盘模式为 异步刷盘ASYNC_FLUSH 则对应的刷盘线程对象为FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
//提交日志的线程任务对象 CommitRealTimeService
this.commitLogService = new CommitRealTimeService();
//拼接消息的回调对象
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
//定于对应的消息编码器,会设定消息的最大大小,默认是512k
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
//存储消息的时候用自旋锁还是互斥锁(用的是JDK的ReentrantLock),默认的是自旋锁(用的是JDK的原子类的AtomicBoolean)
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
构造函数主要是读取对应的配置信息,然后初始化对应的类。其中需要注意的是同步刷盘和异步刷盘使用的对象类型是不一样的。对应的配置参数有这些
参数 | 作用 | 默认值 |
---|---|---|
storePathCommitLog |
指定CommitLog的存储路径 | ${user.home}/store/commitlog |
mapedFileSizeCommitLog |
指定CommitLog的文件大小 | 默认1G |
flushDiskType |
指定CommitLog的刷盘类型 | 默认是异步刷盘 |
maxMessageSize |
单个消息的最大大小 | 默认512k |
useReentrantLockWhenPutMessage |
存储消息是否使用互斥锁(jdk的ReentrantLock ) |
默认是false ,使用自旋锁 (JDK的原子类的AtomicBoolean ) |
文件加载load
public boolean load() {
//加载映射文件集合
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
获取消息getData
这个方法会返回传入的偏移量所在的消息的buffer
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
//获取配置的CommitLog 的文件大小
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
//按offset查询映射文件,如果在偏移量为0的时候,会返回新创建的CommitLog文件映射对象,因为这是第一次插入
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
//位置=偏移量%文件大小
int pos = (int) (offset % mappedFileSize);
//获取消息所在映射buffer
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
添加消息putMessage
和putMessages
putMessage
和putMessages
都是添加消息到CommitLog的方法,只不过一个是添加单个消息,一个是添加多个消息的。这里只讲解添加单个消息的,添加多个消息的大家可以自行查看源码。逻辑步骤如下:
- 设置消息对应的存储时间并对消息体编码
- 获取消息topic和queueId为后面使用
- 获取消息的事务类型
- 如果不是事务消息,或者是事务消息的提交阶段,则还原消息原来的topic和queueId
- 获取存储锁
- 进行消息的存储,如果期间文件满了则再次存储,出错则抛错
- 释放锁和映射文件,增加对应的记录信息
- 进行刷盘
- 进行高可用刷盘
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
//获取当前系统时间作为消息写入时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting on the client)
//设置编码后的消息体
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
//从消息中获取topic
String topic = msg.getTopic();
//从消息中获取queueId
int queueId = msg.getQueueId();
//获取事务类型(非事务性(第3/4字节为0),提交事务(commit,第4字节为1,第3字节为0)消息)
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//如果不是事务消息 或者 是事务消息的提交阶段
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 如果设置了延迟时间
if (msg.getDelayTimeLevel() > 0) {
//延迟级别不能超过最大的延迟级别,超过也设置为最大的延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//设置延迟消息的topic
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//延迟消息的queueId= 延迟级别-1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId 备份真正的topic和queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
//获取映射文件队列的最后一个映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//自旋锁或者互斥锁
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
//开始锁定时间
this.beginTimeInLock = beginLockTimestamp;
//设置消息的存储时间
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
//映射文件不存在或者映射文件满了则创建一个文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//映射文件中添加消息,这里的appendMessageCallback是消息拼接对象,拼接过程不分析
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
//映射文件满了
case END_OF_FILE:
unlockMappedFile = mappedFile;
//创建一个文件来进行存储
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
//重新添加消息=》
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
// 消息过大
case MESSAGE_SIZE_EXCEEDED:
//消息属性过大
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
//释放锁
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
//消息存储的多定时间过长
if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
//解锁映射文件
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics 单次存储消息topic次数
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
//单次存储消息topic大小
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
//磁盘刷新=》
handleDiskFlush(result, putMessageResult, msg);
// 主从刷新=》
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
消息刷盘handleDiskFlush
刷盘的逻辑稍微简单,主要任务交给了前面讲的两个刷盘相关的内部类了。
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步刷新
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
//是否等待存储
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//countdownLatch.await() 同步等待刷新结果,除非超时
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
//如果异步直接解除阻塞 countdownLatch.countDown()
service.wakeup();
}
}
// Asynchronous flush 异步刷新
else {
//没有启用临时存储池,则直接唤醒刷盘的任务
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
//如果使用临时存储池,需要先唤醒提交消息的任务
commitLogService.wakeup();
}
}
}
消息高可用刷盘handleHA
高可用的消息刷盘,只有在设置了主从同步方式为同步方式的时候,才会有后续的逻辑。逻辑就是判断主从之间的消息差偏移量是否在设置的范围内,如果是的就可以对主库进行刷盘。
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//如果设置的主从之间是同步更新
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// 检查slave同步的位置是否小于 最大容忍的同步落后偏移量,如果是的则进行刷盘
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
//countDownLatch.await 同步等待刷新,除非等待超时
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
//设置从服务不可用的状态
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
服务正常恢复recoverNormally
recoverNormally
方法在RocketMQ正常关闭然后启动的时候会调用,这个方法就是把加载的映射文件列表进行遍历,对文件进行校验,和文件中的消息的魔数进行校验,来判断哪些数据是正常的,并计算出正常的数据的最大偏移量。然后,根据偏移量设置对应的提交和刷新的位置以及不正常数据的删除。
public void recoverNormally() {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
//如果文件列表大于3就从倒数第3个开始,否则从第一个开始
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
//校验消息,然后返回转发请求,根据Magic_code正确,并且crc32正确,并且消息的msgSize记录大小和消息整体大小相等。则表示是合格的消息
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
//是一个合格的消息并且消息体大于0
if (dispatchRequest.isSuccess() && size > 0) {
//则读取的偏移量mapedFileOffset累加msgSize
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the return 0 representatives met last hole, this can not be included in truncate offset
//是合格的消息,但是消息体为0,表示读取到了文件的最后一块信息
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
//文件读完了
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
//最后读取的MapedFile对象的fileFromOffset加上最后读取的位置mapedFileOffset值
processOffset += mappedFileOffset;
//设置文件刷新到的offset
this.mappedFileQueue.setFlushedWhere(processOffset);
//设置文件提交到的offset
this.mappedFileQueue.setCommittedWhere(processOffset);
//删除offset之后的脏数据文件
this.mappedFileQueue.truncateDirtyFiles(processOffset);
}
}
服务异常恢复recoverAbnormally
异常恢复的逻辑比较复杂,会先检查对应的文件的最后的消息落盘时间。
- 开启消息索引功能(默认开启)并且使用安全的消息索引功能(默认不开启)的情况下:日志的落盘时间要小于checkpoint的最小落盘时间
- 没有开启的时候:落盘时间需要小于checkpoint文件中物理队列消息时间戳、逻辑队列消息时间戳这两个时间戳中最小值
如果检查符合要求之后才能进行的校验。这两个参数分别是
参数 | 描述 | 默认值 |
---|---|---|
messageIndexEnable |
是否开启的索引功能,开启后会保存到Index文件中 | true |
messageIndexSafe |
是否开启安全的消息索引功能 | false |
这里说明Index文件是对应的索引文件,后面会有文章分析的。
public void recoverAbnormally() {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file 从最后一个文件开始恢复
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
//检查文件是否符合恢复的条件
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
//校验消息并返回消息的大小
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// 如果size大于0表示是正常的消息,
if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
//如果消息在CommitLog中的物理起始偏移量 <
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
// 消息存储转发=》
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
// =》
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Intermediate file read error
else if (size == -1) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
// Come the end of the file, switch to the next file Since the return 0 representatives met last hole, this can not be included in truncate offset
//如果为0 表示文件的尾部不用处理,进入下一个文件
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
}
processOffset += mappedFileOffset;
// 设置刷新offset位置
this.mappedFileQueue.setFlushedWhere(processOffset);
// 设置commitOffset
this.mappedFileQueue.setCommittedWhere(processOffset);
// 删除脏数据文件=》
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data 清除消息队列冗余数据=》
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
// Commitlog case files are deleted
else {
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
// 销毁消息队列=》
this.defaultMessageStore.destroyLogics();
}
}
//
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//校验文件的magic_code
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
//获取消息存储时间字段STORETIMESTAMP
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
//落盘时间需要大于0
if (0 == storeTimestamp) {
return false;
}
//开启消息索引功能(默认开启)并且使用安全的消息索引功能(默认不开启)
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
//日志的落盘时间要小于checkpoint的最小落盘时间
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
//没有开启安全的消息索引功能,则落盘时间需要小于checkpoint文件中物理队列消息时间戳、逻辑队列消息时间戳这两个时间戳中最小值
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
return false;
}