#MemTable
在levelDB中所有KV数据都是存储在Memtable,Immutable Memtable和SSTable中的,Immutable Memtable从结构上讲和Memtable是完全一样的,区别仅仅在于其是只读的,不允许写入操作,而Memtable则是允许写入和读取的。当Memtable写入的数据占用内存到达指定数量,则自动转换为Immutable Memtable,等待Dump到磁盘中,系统会自动生成新的Memtable供写操作写入新数据,理解了Memtable,那么Immutable Memtable自然不在话下。
LevelDb的MemTable提供了将KV数据写入,删除以及读取KV记录的操作接口,但是事实上Memtable并不存在真正的删除操作,删除某个Key的Value在Memtable内是作为插入一条记录实施的,但是会打上一个Key的删除标记,真正的删除操作是Lazy的,会在以后的Compaction过程中去掉这个KV。 需要注意的是,LevelDb的Memtable中KV对是根据Key大小有序存储的,在系统插入新的KV时,LevelDb要把这个KV插到合适的位置上以保持这种Key有序性。其实,LevelDb的Memtable类只是一个接口类,真正的操作是通过背后的SkipList来做的,包括插入操作和读取操作等,所以Memtable的核心数据结构是一个SkipList。
#MemTable 声明
db数据在内存中的存储格式。写操作的数据都会先写到memtable中。memtable的size 有限制最大值(write_buffer_size)。memtable的实现是skiplist,当一个memtable size达到阀值时,会变成只读的memtable(immutable memtable),同时生成一个新的 memtable供新的写入。后台的compact进程会负责将immutable memtable dump 成sstable。所以,同时最多会存在两个memtable(正在写的memtable和immutable memtable)。
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
// Returns an estimate of the number of bytes of data in use by this
// data structure.
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
size_t ApproximateMemoryUsage();
// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/format.{h,cc} module.
Iterator* NewIterator();
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
void Add(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value);
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);
private:
~MemTable(); // Private since only Unref() should be used to delete it
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
// No copying allowed
MemTable(const MemTable&);
void operator=(const MemTable&);
};
MemTable只是一层封装,实际的数据结构是一个SkipListtypedef SkipList<const char*, KeyComparator> Table;
MemTable有一个公有的构造函数,Ref()和unRef()
和一个返回迭代器的成员函数,除此之外,就只有3个公有的成员函数了,分别是:
- size_t ApproximateMemoryUsage();
- void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value);
- bool Get(const LookupKey& key, std::string* value, Status* s);
其中,第一个成员函数只是简单的封装,最后调用的是Arena中的成员函数
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
Arena是一个内存管理类,可以参考这里。
下面来分析另外两个成员函数:
-
void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value);
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; char* buf = arena_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == encoded_len); table_.Insert(buf); }
MemTable::add只是简单的将SequenceNumber和ValueType 以及消息编码成一个字符串,存放在buf数组中,然后调用table.Insert(buf)插入数据。
这里要解释的是buf的内容: 它包括
其中,internal_key只是一个结构体,封装了key,SequenceNumber和Type; 因为SequenceNumber和Type一起存放在一个64位的整型里面,所以才有:
internal_key_size = key.size + 8
该函数里的其他语句就是将internal_key_size和val.isze()编码成varint ,然后把数据依次存放到buf中。
-
bool Get(const LookupKey& key, std::string* value, Status* s);
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); if (iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] // tag uint64 // vlength varint32 // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char* entry = iter.key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast<ValueType>(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; } case kTypeDeletion: *s = Status::NotFound(Slice()); return true; } } } return false; }
理解MemTable 更多的是理解消息封装和编码,在MemTable::Get 函数中又出现了LookupKey ,我们暂时不要管他,先看函数体。 函数体通过在Table 中查找key,找到就返回一个迭代器,然后获取迭代器的值,解码得到信息。
通过下面的语句获取internal_key_size ,因为varint 最多不会超过5个字节,所以有起点是entry 终点是 entry + 5,具体怎么在一个字符串数组中获取一个int ,请参考这里。
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
再次比较返回的key与user_key 是否相同,这里之所以会有 key_length - 8 是因为key_length包含了key.size()和SequenceNumber(8个字节,这8个字节里前7个字节是序列号,最后一个字节是消息类型)。
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
对key后面的8个字节解码,然后获取这8个字节的最后一个字节,判断消息是什么类型:
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
如果消息不是kTypeDeletion类型,那么消息就是有效的,接下来就获取value的size,再获取value的值。