RocksDB源码分析 Read(一)内存读取


SuperVersion* sv = GetAndRefSuperVersion;
SequenceNumber snapshot;
//获取snapshot (目前最大的sequence)
bool done = false;
  if (!skip_memtable) {
    // Get value associated with key
    if (get_impl_options.get_value) {
      if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
                       &merge_context, &max_covering_tombstone_seq,
                       read_options, get_impl_options.callback,
                       get_impl_options.is_blob_index)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
                              &merge_context, &max_covering_tombstone_seq,
                              read_options, get_impl_options.callback,
                              get_impl_options.is_blob_index)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);

memtable get

存在memtable里的key是 user key + tag:tag是一个8字节的uint64,由sequence和type打包而成(高56位是sequence,低8位是type)

//先查询bloom filter
if (bloom_filter_) {
  // when both memtable_whole_key_filtering and prefix_extractor_ are set,
  // only do whole key filtering for Get() to save CPU
  if (moptions_.memtable_whole_key_filtering) {
    may_contain =
        bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
  } else {
    may_contain =
        !prefix_extractor_->InDomain(user_key) ||

GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, s, merge_context, seq,
                 &found_final_value, &merge_in_progress);
void MemTable::GetFromTable(...){
  //构建saver 和回调
  Saver saver;
  saver.status = s;
  saver.found_final_value = found_final_value;
  saver.merge_in_progress = merge_in_progress;
  saver.key = &key;
  saver.value = value;
  saver.seq = kMaxSequenceNumber;
  saver.mem = this;
  saver.merge_context = merge_context;
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
  saver.merge_operator = moptions_.merge_operator;
  saver.logger = moptions_.info_log;
  saver.inplace_update_support = moptions_.inplace_update_support;
  saver.statistics = moptions_.statistics;
  saver.env_ = env_;
  saver.callback_ = callback;
  saver.is_blob_index = is_blob_index;
  saver.do_merge = do_merge;
  table_->Get(key, &saver, SaveValue);

void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {

inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
  //寻找key和sequence符合条件的 node
  //Returns the earliest node with a key >= key.
  // Return nullptr if there is no such node.
  //skip list按internal key排序: user key升序, 同一个user key下按sequence从大到小排
  //所以Seek(user key + 查询sequence)会定位到该user key下第一个sequence <= 查询sequence的entry
  node_ = list_->FindGreaterOrEqual(target);

static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  assert(s != nullptr);
  MergeContext* merge_context = s->merge_context;
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key.  We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
  if (s->mem->GetInternalKeyComparator()
          ->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) {
    // Correct user key
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq

    s->seq = seq;

    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    switch (type) {
      //根据type处理key value

  // s->state could be Corrupt, merge or notfound
  return false;



SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
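  //如果从thread local取出的super version已被标记为kSVObsolete,或者它的version_number已经落后于当前的super_version_number_,说明缓存已过期: 先对旧的做Unref(只有引用计数归零时才真正清理/删除),再加锁后通过super_version_->Ref()获取当前最新的super version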
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
      } else {
        sv_to_delete = sv;
    } else {
    sv = super_version_->Ref();

    delete sv_to_delete;
  assert(sv != nullptr);
  return sv;


void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (!cfd->ReturnThreadLocalSuperVersion(sv)) {

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  return false;


void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
          new_superversion->write_stall_condition, GetName(), ioptions());
    if (old_superversion->Unref()) {




