Comparator
首先,Comparator 是一个抽象类,导出了几个接口。
// Abstract interface that defines an ordering over Slice keys.
// Every operation is pure virtual, so concrete comparators must implement
// all of them.
class Comparator {
public:
virtual ~Comparator();
// Three-way comparison. Returns:
//   < 0 if a < b,
//   == 0 if a == b,
//   > 0 if a > b.
virtual int Compare(const Slice& a, const Slice& b) const = 0;
// The name of this comparator.
virtual const char* Name() const = 0;
// The two functions below are used to reduce the space taken by
// internal data structures such as index blocks.
//
// If *start < limit, changes *start to a short string in [start,limit).
// Simple comparator implementations may return with *start unchanged,
// which is also correct.
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const = 0;
// Changes *key to a short string >= *key.
// Simple comparator implementations may return with *key unchanged,
// which is also correct.
virtual void FindShortSuccessor(std::string* key) const = 0;
};
InternalKeyComparator
先看下Compare
函数
// Three-way comparison of two internal keys.
//
// Ordering rules:
//   1. increasing user key, as decided by the user-supplied comparator;
//   2. for equal user keys, decreasing value of the trailing 8-byte tag
//      (the larger tag sorts first).
// Returns a negative value, zero, or a positive value accordingly.
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  const int user_order =
      user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (user_order != 0) {
    // The user keys already differ, so the tags are irrelevant.
    return user_order;
  }

  // Same user key: break the tie on the fixed64 stored in the last 8 bytes.
  const uint64_t a_tag = DecodeFixed64(akey.data() + akey.size() - 8);
  const uint64_t b_tag = DecodeFixed64(bkey.data() + bkey.size() - 8);
  if (a_tag == b_tag) {
    return 0;
  }
  // Larger tag comes first, hence the inverted sign.
  return (a_tag > b_tag) ? -1 : +1;
}
逻辑非常简单易懂:
- 首先比较 user_key,如果 user_key 不相同,就直接返回比较结果,否则继续进行第二步
- 在 user_key 相同的情况下,比较末尾 8 字节的 sequence_number | value type(数值较大的排在前面,即按降序排列),然后返回结果(注意其实这里 sequence_number 已经是唯一的了)
再看看FindShortestSeparator
函数
void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {
// Attempt to shorten the user portion of the key
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}
}
该函数取出 Internal Key 中的 user key 字段,交给 user 指定的 comparator 去寻找一个更短的分隔串。如果得到的 user key 物理上更短、逻辑上更大,就在它后面追加由最大的 sequence number(kMaxSequenceNumber)和 kValueTypeForSeek 组成的 8 字节 tag,构成新的 Internal Key 并替换 *start;否则 *start 保持不变。
接下来FindShortSuccessor
函数
void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}
}
该函数取出 Internal Key 中的 user key 字段,交给 user 指定的 comparator 去寻找一个更短的后继串。如果得到的 user key 物理上更短、逻辑上更大,就在它后面追加由最大的 sequence number(kMaxSequenceNumber)和 kValueTypeForSeek 组成的 8 字节 tag,构成新的 Internal Key 并替换 *key;否则 *key 保持不变。
memtable
向memtable
插入记录的接口
// Encodes one (key, sequence, type, value) record into a buffer obtained
// from arena_ and inserts that buffer into table_.
//
// Format of an entry is the concatenation of:
//   key_size    : varint32 of internal_key.size()
//   key bytes   : char[internal_key.size()]
//   value_size  : varint32 of value.size()
//   value bytes : char[value.size()]
// where the internal key is the user key followed by an 8-byte tag written
// with EncodeFixed64 and holding (s << 8) | type.
void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key,
                   const Slice& value) {
  size_t key_size = key.size();
  size_t val_size = value.size();
  // The internal key is the user key plus the 8-byte tag.
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len =
      VarintLength(internal_key_size) + internal_key_size +
      VarintLength(val_size) + val_size;
  char* buf = arena_.Allocate(encoded_len);

  // Length-prefixed internal key: user key bytes, then the tag.
  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;

  // Length-prefixed value.
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);

  // Check that exactly encoded_len bytes were written. The end pointers are
  // compared directly; comparing the signed pointer difference with the
  // unsigned encoded_len would be a mixed-sign comparison.
  assert(p + val_size == buf + encoded_len);
  table_.Insert(buf);
}
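KV 记录在 skiplist 中的存储格式依次为:internal_key 的长度(varint32)、internal_key(user key 加上 8 字节的 sequence number | value type)、value 的长度(varint32)、value。
- 总长度:
VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size;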
Memtable 的查找接口,根据一个 LookupKey 找到相应的记录
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
根据传入的 LookupKey 得到在 memtable 中存储的 key,然后调用 Skip list::Iterator 的 Seek 函数查找。Seek 直接调用 Skip list 的 FindGreaterOrEqual(key) 接口,返回大于等于 key 的 Iterator。然后取出 user key,判断是否和传入的 user key 相同:如果相同且记录的 Value Type 为 kTypeValue,则取出 value 并返回 true;如果记录的 Value Type 为 kTypeDeletion,则把 *s 设为 Status::NotFound(Slice()) 并返回 true;其余情况(没有找到记录或 user key 不同)返回 false。