写入流程
- 将一条或者多条操作的记录封装到WriteBatch
- 将记录对应的日志写到WAL文件中
- 将WriteBatch中的一条或者多条记录写到内存中的memtable中
Group Commit
每个写线程都会生成一个WriteThread::Writer,待写入的数据加入writer链表中,选出一个leader线程负责将这些数据写入wal和memtable(如果是并发写模式,则leader在写入wal数据后就唤醒其他写线程,将各自的数据写入memtable)。
当线程成为leader线程之后,将开始进入提交逻辑(以下简略部分逻辑):
- 调用WriteThread::EnterAsBatchGroupLeader函数,由leader线程构造一个WriteGroup对象的实例,WriteGroup对象的实例用于描述当次Group Commit要写入WAL的所有内容。
- 确定本批次要提交的最大长度max_size。如果leader线程要写入WAL的记录长度大于128k,则本次max_size为1MB;如果leader的记录长度小于128k, 则max_size为leader的记录长度+128k。
- 找到当前链表中最新的Writer实例newest_writer,通过调用CreateMissingNewerLinks(newest_writer),将整个链表链接成一个双向链表。
- 从leader所在的Writer开始遍历,直至newest_writer。累加每个writer的size,超过max_size就提前截断;另外地,也检查writer的一些flag,与leader不一致也提前截断。将符合条件的最后一个writer记录到WriteGroup::last_writer中。
- 检查是否可以并发写入memtable,条件有:1. memtable本身支持;2. 没有merge操作 3. 设置allow_concurrent_memtable_write
- 写WAL文件,将write_group合并成一个新的WriteGroup实例merge_group,将merge_group中的记录fsync到WAL文件中。
- 如果不支持并发写memtable,则由leader串行将write_group的所有数据串行地写到memtable;否则,leader线程将通过调用LaunchParallelMemTableWriter函数通知所有的follower线程并发写memtable。
- 待所有的线程(不管leader线程或者follower线程)写完memtable,都会调用CompleteParallelMemTableWriter判断自己是否是最后一个完成写memtable的线程,如果不是最后一个则等待被通知;如果最后一个完成的是follower线程,通过调用ExitAsBatchGroupFollower函数,调用ExitAsBatchGroupLeader通知所有follower可以退出,并且通知leader线程。如果最后一个完成的是leader线程,则可以直接调用ExitAsBatchGroupLeader函数。
- ExitAsBatchGroupLeader函数除了通知follower线程提交已经完成,还有另一个作用。在这一轮Group Commit进行过程中,writer链表可能新添加了许多待提交的事务。当退出本次Group Commit之前,如果writer链表上有新的待提交的事务,将它设置成leader。这个成为leader的线程将被唤醒,重复leader线程进行Group Commit的逻辑。
源码分析
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
assert(!seq_per_batch_ || batch_cnt != 0);
...
//disable_memtable或者unordered_write或者是走PipelinedWriteImpl的处理
...
//正常的写入流程
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, batch_cnt, pre_release_callback);
if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
}
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
//将本线程要写入的数据加入batch Group(并wait住)
write_thread_.JoinBatchGroup(&w);
//如果出来的状态是并发写入memtable,则自己不是leader且leader已经将数据写入wal,可以将自己的数据写入memtable
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
if (w.ShouldWriteToMemtable()) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_memtable_time);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
batch_per_txn_, write_options.memtable_insert_hint_per_batch);
PERF_TIMER_START(write_pre_and_post_process_time);
}
//判断自己是否是最后一个写入memtable的线程,如果是则需要做最后的处理
//设置last seq、选出下一个正在等待的线程中的leader继续做group commit逻辑
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group
// TODO(myabandeh): propagate status to write_group
auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit
status = w.FinalStatus();
}
//非并发写入memtable模式,leader已经将数据写入wal和memtable
if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
return w.FinalStatus();
}
// else we are the leader of the write batch group
//当前线程被选为leader工作
assert(w.state == WriteThread::STATE_GROUP_LEADER);
// Once reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in the "writers_"
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
WriteContext write_context;
WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = kMaxSequenceNumber;
mutex_.Lock();
bool need_log_sync = write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
if (!two_write_queues_ || !disable_memtable) {
// With concurrent writes we do preprocess only in the write thread that
// also does write to memtable to avoid sync issue on shared data structure
// with the other thread
// PreprocessWrite does its own perf timing.
PERF_TIMER_STOP(write_pre_and_post_process_time);
//预处理write,判断写入是否合法,是否需要切换memtable等
//获取上一次写入的sequence
status = PreprocessWrite(write_options, &need_log_sync, &write_context);
if (!two_write_queues_) {
// Assign it after ::PreprocessWrite since the sequence might advance
// inside it by WriteRecoverableState
last_sequence = versions_->LastSequence();
}
PERF_TIMER_START(write_pre_and_post_process_time);
}
log::Writer* log_writer = logs_.back().writer;
mutex_.Unlock();
...
//构建
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
if (status.ok()) {
//判断是否能并发写入memtable
//条件:
//检查是否设置allow_concurrent_memtable_write
//检查每一个batch是否其中有merge操作
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size > 1;
{check merge}
...
{记录一些状态}
...
//将数据写入WAL
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
}
}
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
//在写memtable之前为每一个线程的write设置好sequence
// PreReleaseCallback is called after WAL write and before memtable write
if (status.ok()) {
SequenceNumber next_sequence = current_sequence;
size_t index = 0;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
// the merged batch during recovery from the WAL.
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = next_sequence;
if (writer->pre_release_callback) {
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
}
}
//如果不是seq_per_batch,那么每一个put都会使seqence+1
if (seq_per_batch_) {
assert(writer->batch_cnt);
next_sequence += writer->batch_cnt;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
}
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
//如果不能并发,就leader自己将write group写入memtable
//如果可以并发,就先将自己的write写入memtable
if (!parallel) {
// w.sequence will be set inside InsertInto
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, &trim_history_scheduler_,
write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
batch_per_txn_);
} else {
//唤醒所有等待的follower线程,并发写入memtable
write_group.last_sequence = last_sequence;
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
// Each parallel follower is doing each own writes. The leader should
// also do its own.
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt, batch_per_txn_,
write_options.memtable_insert_hint_per_batch);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
WriteStatusCheck(status);
}
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hence provide a simple implementation that is not necessarily efficient.
if (two_write_queues_) {
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
status = SyncWAL();
}
}
}
bool should_exit_batch_group = true;
if (in_parallel_group) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
// 这个函数在并发写memtable时leader和follower都会调用
// 判断自己是否是最后一个完成写memtable的线程,如果不是最后一个则等待被通知;如果是最后一个是follower线程,通过调用ExitAsBatchGroupFollower函数,调用ExitAsBatchGroupLeader通知所有follower可以退出,并且通知leader线程。
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
if (should_exit_batch_group) {
if (status.ok()) {
// Note: if we are to resume after non-OK statuses we need to revisit how
// we reacts to non-OK statuses here.
versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupLeader(write_group, status);
}
if (status.ok()) {
status = w.FinalStatus();
}
return status;
}
如何组织BatchGroup
1、JoinBatchGroup
// Links the caller's Writer into the pending-writer list. The first writer
// to link in becomes the group leader; everyone else blocks until a leader
// transitions them to one of the awaited states below.
void WriteThread::JoinBatchGroup(Writer* w) {
...
// Chain this writer onto the list; returns true if it became the leader.
bool linked_as_leader = LinkOne(w, &newest_writer_);
if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}
// Not the leader: block until a leader wakes us up.
if (!linked_as_leader) {
/**
 * Wait until:
 * 1) An existing leader picks us as the new leader when it finishes
 * 2) An existing leader picks us as its follower and
 * 2.1) finishes the memtable writes on our behalf
 * 2.2) Or tells us to finish the memtable writes in parallel
 * 3) (pipelined write) An existing leader picks us as its follower and
 * finishes book-keeping and WAL write for us, enqueues us as pending
 * memtable writer, and
 * 3.1) we become memtable writer group leader, or
 * 3.2) an existing memtable writer group leader tells us to finish memtable
 * writes in parallel.
 */
AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&jbg_ctx);
}
}
// Pushes w onto the front of the lock-free writer list headed by
// *newest_writer. Returns true iff the list was empty, i.e. w is the first
// writer and therefore becomes the leader.
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
// Use an atomic CAS loop to resolve concurrent insertions and obtain an
// ordered writer list.
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
// If write stall in effect, and w->no_slowdown is not true,
// block here until stall is cleared. If its true, then return
// immediately
if (writers == &write_stall_dummy_) {
//... (stall handling elided)
}
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
}
2、EnterAsBatchGroupLeader
// Called by the leader to assemble this round's write group: decides the
// maximum group byte size, links the writer list into a doubly-linked list,
// and walks from the leader toward the newest writer, admitting writers
// until the size cap (or an incompatible flag) is hit. Returns the total
// byte size of the assembled group.
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
WriteGroup* write_group) {
...
size_t size = WriteBatchInternal::ByteSize(leader->batch);
// Determine max_size for this commit. If the leader's WAL record is larger
// than 128KB, max_size stays at 1MB; if it is smaller than 128KB, max_size
// is the leader's record length + 128KB.
// max_write_batch_group_size_bytes is configurable via options (default
// 1MB); min_batch_size_bytes = 1MB / 8.
size_t max_size = max_write_batch_group_size_bytes;
const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
if (size <= min_batch_size_bytes) {
max_size = size + min_batch_size_bytes;
}
leader->write_group = write_group;
write_group->leader = leader;
write_group->last_writer = leader;
write_group->size = 1;
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
// Turn the singly-linked list into a doubly-linked one. Writers added after
// this snapshot of newest_writer are not part of this group.
CreateMissingNewerLinks(newest_writer);
// Tricky. Iteration start (leader) is exclusive and finish
// (newest_writer) is inclusive. Iteration goes from old to new.
Writer* w = leader;
while (w != newest_writer) {
w = w->link_newer;
... // some compatibility checks on the writer (elided)
auto batch_size = WriteBatchInternal::ByteSize(w->batch);
if (size + batch_size > max_size) {
// Do not make batch too big
break;
}
w->write_group = write_group;
size += batch_size;
write_group->last_writer = w;
write_group->size++;
}
TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
return size;
}
写入WAL
// Merges all batches of the write group into one batch and writes it to the
// WAL as a single record. (Excerpt — the function continues beyond what is
// shown here.)
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence) {
Status status;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
// Build a merged batch: append all of the group's batches into one.
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
&write_with_wal, &to_be_cached_state);
// sequence is last_sequence + 1; on read, per-record sequences are derived
// from it.
WriteBatchInternal::SetSequence(merged_batch, sequence);
// The whole record is written at once (all batches were merged together).
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
写入Memtable
// Inserts every writer's batch in the group into the memtable(s), assigning
// each writer its sequence as it goes. (Excerpt — parameter list elided.)
Status WriteBatchInternal::InsertInto(
...) {
// Build the memtable inserter that applies each record.
MemTableInserter inserter(
sequence, memtables, flush_scheduler, trim_history_scheduler,
ignore_missing_column_families, recovery_log_number, db,
concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
batch_per_txn);
// Iterate over all writers in the group.
for (auto w : write_group) {
if (w->CallbackFailed()) {
continue;
}
w->sequence = inserter.sequence();
if (!w->ShouldWriteToMemtable()) {
// In seq_per_batch_ mode this advances the seq by one.
inserter.MaybeAdvanceSeq(true);
continue;
}
SetSequence(w->batch, inserter.sequence());
inserter.set_log_number_ref(w->log_ref);
// Replay the batch into the memtable via the inserter.
w->status = w->batch->Iterate(&inserter);
if (!w->status.ok()) {
return w->status;
}
...
}
return Status::OK();
}
//插入一个batch的数据
// Decodes the records of one batch and dispatches each to the handler
// (e.g. MemTableInserter). (Excerpt — most record types and the function
// tail are elided.)
Status WriteBatchInternal::Iterate(const WriteBatch* wb,
WriteBatch::Handler* handler, size_t begin,
size_t end) {
...
// Initialize the input slice over the batch's serialized bytes.
Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
handler_continue = handler->Continue();
if (!handler_continue) {
break;
}
if (LIKELY(!s.IsTryAgain())) {
last_was_try_again = false;
tag = 0;
column_family = 0; // default
// Read one record from the batch.
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid);
if (!s.ok()) {
return s;
}
}
...
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
// The handler advances the sequence internally.
s = handler->PutCF(column_family, key, value);
if (LIKELY(s.ok())) {
empty_batch = false;
found++;
}
break;
/// (other record types elided)
}
}