读码:LevelDB - Compaction 流程
6、 & ~
6.1
ldb 数据文件,是将内存中不可变 memtable 的数据落到磁盘生成的。这些数据文件的 key 范围会有重合。leveldb 会记录每个 ldb 文件对应的 key 最小值和最大值等元信息。
class Version {
private:
// List of files per level
std::vector files_[config::kNumLevels];
}
struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
};
ldb 文件的数据内容源自:遍历“不可变 memtable” 中跳表的 层节点,插入“data 块”。跳表的 层节点链表本就是有序的,第一个节点的 key 即为 最小 key,最后一个节点的 key 即为 最大 key。
Get key 检索时,对于 层,先对所有 ldb 文件的最小 key 最大 key 进行比较,匹配到所有目标 ldb 文件,并按文件 id 序号从大到小排序(文件 id 序号越大,说明文件越新),从最新的目标 ldb 文件开始检索,找到了就返回。
为了保障检索效率,leveldb 会通过 major compaction 过程尽量控制住 level0 的文件数:
- 如果 文件数超过 kL0_CompactionTrigger(=4),就可以触发 compaction。
- 如果 文件数超过 kL0_SlowdownWritesTrigger(=8),就会对写入进行限速和攒批。
- 如果 文件数超过 kL0_StopWritesTrigger(=12),写入就会停顿,等待 compaction。
6.2 ~
~ 的数据文件是从 开始逐层 compaction 而来,这些文件:
- 同一层内的文件之间 key 的区间不会出现重合,这样可以减少磁盘读取,提升检索效率;不同层的文件之间 key 区间可以重合。 compaction 的逻辑需要保障这个性质。
- 文件的内容一定比 文件的内容旧,所以如果在 层检索有了结果,就不必到 层检索。
- 在 compaction 过程中对于同一个 key 会尽可能丢掉旧的键值数据,以此,减少磁盘占用,提升检索效率。
7、Compaction
compaction 过程,作为任务,由一个独立的后台线程来执行。
compaction 主要分两 2 种:
- 1、“minor compaction” - 内存中的 “不可变 memtable” 转存到 文件。这个 compaction 优先级更高,所以一旦检测到存在“不可变 memtable”就优先处理。
- 2、“major compaction” - ~ 逐层之间的 compaction。其中 与 ~ 不同, 数据文件之间 key 区间存在重合,处理逻辑上存在特殊之处。
“minor compaction” 流程比较简单直接,后续不细说。
7.1 major compaction 触发条件
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
for (int level = 0; level < config::kNumLevels - 1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast(level_bytes) / MaxBytesForLevel(options_, level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
static double MaxBytesForLevel(const Options* options, int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
// Result for both level-0 and level-1
double result = 10. * 1048576.0;
while (level > 1) {
result *= 10;
level--;
}
return result;
}
leveldb 打开/初始化期间或者每次 compaction 结束时都会计算下次 compaction 最应该处理的哪个 level 的数据文件以及紧迫程度(compaction_score_
),不过对于 和 其他 level 的计算逻辑有区别:
- 1、 compaction 紧迫程度,取决于文件数和 compaction 阈值(kL0_CompactionTrigger = 4)的倍数
- 2、其他 level 则取决于文件占用的存储空间和阈值的倍数,level 越大,阈值越大 - 阈值为
10. * 1048576.0
字节(10MB),相邻层之间的阈值为 10 倍关系。
bool Version::UpdateStats(const GetStats& stats) {
FileMetaData* f = stats.seek_file;
if (f != nullptr) {
f->allowed_seeks--; // allowed_seeks 初始值 1 << 30
if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
每次 Get 检索的最后都会对本次检索涉及的第一个数据文件进行计数,如果文件的检索次数达到上限,就伺机进行 compaction。不过这个条件触发 compaction 的优先级比较低。
7.2 major compaction 过程
不考虑一些细节优化之处,compaction 的核心流程为:
- 1、根据触发条件,确定本次 compaction 对哪个 level 的数据库文件处理,然后进一步确定从 level 和 level+1 层中选择哪些数据库文件 compaction:
- (1)、最简单的情况是, 如果该 level 是第一次做 compaction,则选择该 level 的第一个数据文件(也就是最先生成的那个),不过:
- 1)、如果之前该 level 做过 major compaction,暂存了状态(处理到哪个最大 key),那么根据暂存状态最大 key 选择下一个数据文件即可,如果没有下一个数据文件了,则从头选择该 level 的第一个数据文件(也就是最先生成的那个)。
- 2)、
seek_compaction
触发的 compaction 是针对特定数据库文件的。
- (2)、如果 level = 0,因为 的数据库文件之间 key 的范围可以重合,所以根据 (1) 选定数据文件的 key 最大值和最小值,还需要进一步确定是否应该把 的其他数据库文件包含进来。
- (3)、compaction 输出的数据库属于 level+1 层,因为 ~ 同一层的数据库文件之间 key 区间不能有重合,所以还需要根据 (1) 和 (2) 选定的数据库文件的 key 最大值和最小值,确定 level+1 层有哪些数据库文件需要加入到本次 compaction
- (1)、最简单的情况是, 如果该 level 是第一次做 compaction,则选择该 level 的第一个数据文件(也就是最先生成的那个),不过:
// A Compaction encapsulates information about a compaction.
class Compaction {
private:
int level_; // compaction 目标 level
uint64_t max_output_file_size_; // compaction 产生的新文件大小阈值,如果达到阈值,就结束 compaction
Version* input_version_; // 当前对哪个 Version 做 compaction?
VersionEdit edit_; // 每次 compaction 会产生一个新的 Version
// Each compaction reads inputs from "level_" and "level_+1"
// compaction 只会在目标层和相邻下一层之间进行
std::vector inputs_[2]; // The two sets of inputs
// State used to check for number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
std::vector grandparents_; // 用于判断是否可以将 level 层的文件直接移动到 level+1 层,见 bool Compaction::IsTrivialMove() const
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
int64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
// State for implementing IsBaseLevelForKey
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
size_t level_ptrs_[config::kNumLevels];
}
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);
// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f); // 选择该 level 上次 compaction 的文件的下一个文件
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]); // 如果该 level 整个 key 空间都 compaction 过,则从头开始选择第一个文件
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}
c->input_version_ = current_;
c->input_version_->Ref();
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
SetupOtherInputs(c);
return c;
}
- 2、如果选定的输入文件,仅有一个文件,且是 level 层文件,则直接将这个文件标记在为本次 compaction 后为 level+1 层文件,从而避免了不必要的文件读写。
- 3、否则,对选定好的 level 和 level+1 层数据库文件,打开文件,创建好数据遍历读取的迭代器:
- (1)、单个文件的遍历读取实际是个两层遍历(
TwoLevelIterator
) - 第1层遍历针对“index 块”,第2层遍历针对“data 块”,根据“index 块”得到“data 块”的 offset 和 size,遍历“data 块”,获取实际的键值对数据,一个“data 块” 遍历完了,则从“index 块”获取下一个“data 块”的 offset 和 size 继续遍历。 - (2)、因为 compaction 输出的数据库文件内容也需要保持按 key 有序的性质,所以 compaction 过程实际是对多个输入文件的有序遍历。不过 数据库文件的 key 区间可以重合,所以处理方式也有所区别
- 1)、 层的多个数据库文件有序遍历,类似于 N 路归并排序算法 - 每次取 N 个迭代器的头部元素进行比较,取最小的那个元素返回。
- 2)、 ~ 层的数据库文件 key 区间不会有重合,所以比较简单,按照 key 区间先做一下排序,然后逐个文件遍历读取就可以。
- (1)、单个文件的遍历读取实际是个两层遍历(
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
ReadOptions options;
options.verify_checksums = options_->paranoid_checks;
options.fill_cache = false;
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level. // TODO(opt): use concatenating iterator for level-0 if there is no overlap
const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
Iterator** list = new Iterator*[space];
int num = 0;
for (int which = 0; which < 2; which++) {
if (!c->inputs_[which].empty()) {
if (c->level() + which == 0) {
const std::vector& files = c->inputs_[which];
for (size_t i = 0; i < files.size(); i++) {
list[num++] = table_cache_->NewIterator(options, files[i]->number,
files[i]->file_size);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
}
}
}
assert(num <= space);
Iterator* result = NewMergingIterator(&icmp_, list, num);
delete[] list;
return result;
}
- 4、leveldb 基于的 lsm-tree(Log Structured Merge Tree) 结构,并不会对数据进行原地修改和删除 - 同 key 的多次写入(包括删除),都会在结构中增加一条键值记录,检索时以最后一次写入的内容为结果。那么如果系统 workloads 会对相同的 key 存在大量写入,就比较浪费存储空间。所以 compaction 过程中会对同一个 key 多次写入做消除合并,仅保留最后一次写入的(如果最后一次是删除操作,并且 level+2(包括)之后所有层的 key 区间都不会包含这个 key,则最后一次的删除操作也可以消除掉)。
compaction 的输入遍历是有序的,那么同一个 key 的多次写入记录在遍历时是连续的,并且
InternalKeyComparator
比较器的比较逻辑决定了后写入的会被先遍历到。以如下写入序列来解释消除合并逻辑(以 UK 表示用户写入的原始 key,以 表示带写入序列号的内部 key):> - 假设写入序列为:“PUT ” -> “PUT ”,那么 compaction 遍历序列为“PUT ” -> “PUT ”。
>> - (1) 处理 “PUT ” 时,发现前面没处理过 UK 的记录,则保留 “PUT ”记录写入输出中,同时设置状态位 - 处理过 UK 的记录
>> - (2) 处理 “PUT ” 时,发现前面处理过 UK 的记录(并且这条记录不再被任何快照依赖),则直接将 “PUT ” 记录丢弃。
> - 假设写入序列为:“PUT ” -> “DEL ”,那么 compaction 遍历序列为 “DEL ” -> “PUT ”。
>> - (1) 处理“DEL ” 时,发现前面没处理过 UK 的记录,
>>> - 1) 如果 level+2(包括)之后所有层也都不包含 UK 的记录(并且这条记录不再被任何快照依赖),则直接丢弃(从检索性能上来说,可能不丢弃更好一点?)
>>> - 2) 如果 level+2(包括)之后层中包含 UK 的记录,则保留 “DEL ”记录写到输出中,同时设置状态位 - 处理过 UK 的记录
>> - (2) 处理“PUT ” 时,发现前面处理过 UK 的记录,则直接将 “PUT ” 记录丢弃。
> - 如果发现当前处理的 IK 的 UK 与前一次处理的 IK 的 UK 不相同,则重置状态位。
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); // 先比较用户 key 部分
if (r == 0) { // 如果用户key 相同,则比较序列号,序列号大的排在前面
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
- 5、如果 compaction 输出的数据文件大小达到阈值(2MB),则结束该文件的写入,并创建一个新的 ldb 数据文件,继续 compaction 写入。在 compaction 完成时,这些若干个输出的 ldb 数据文件会被标记为新版本的 level+1 层数据文件。
// Leveldb will write up to this amount of bytes to a file before
// switching to a new one.
// Most clients should leave this parameter alone. However if your
// filesystem is more efficient with larger files, you could
// consider increasing the value. The downside will be longer
// compactions and hence longer latency/performance hiccups.
// Another reason to increase this parameter might be when you are
// initially populating a large database.
size_t max_file_size = 2 * 1024 * 1024;
- 6、一次 compaction 完成后,输入文件的有效内容都已经写到输出文件,如果不考虑其他因素,这些输入文件可以被删除,保留输出文件即可。不过可能存在检索操作依赖这些输入文件,所以不会即刻删除这些输入文件,而是以版本信息在逻辑上维护每一层最新包含哪些文件(以及如果本次 compaction 未能完整地完成,则还需要把处理到 level 的哪个 key 为止记录到版本信息中)。所以每次 compaction 都会产生一个新的逻辑上的数据版本,将这个版本切换为 leveldb 运行时依赖的当前版本(
VersionSet::current_
),后续的检索操作就都在这个新版本所包含数据库文件中进行。另外:- 1、重新计算下次 compaction 最应该处理的哪个 level 的数据文件以及紧迫程度(
compaction_score_
) - 2、将新版本信息写入版本描述 MANIFEST 文件中(如果尚不存在,则新建一个),并且如果版本描述 MANIFEST 文件是新创建的,则将该 MANIFEST 文件名存为 CURRENT 文件。这样,如果当前 leveldb 进程挂了或者机器节点宕机,还可以基于 CURRENT 文件指向的 MANIFEST 文件中的版本信息,恢复出最新最新版本状态。
- 3、对当前不再依赖(依赖关系怎么维护的?)的 ldb 文件/MANIFEST 文件/“WAL log” 文件进行删除清理。
- 不被任何版本依赖的 ldb 文件,都可以删除
- 不被当前 CURRENT 文件指向的 MANIFEST 文件,可以删除
- “WAL log” 文件保留最新的两个即可(memtable 和不可变 memtable 对应的“WAL log”文件)
- 1、重新计算下次 compaction 最应该处理的哪个 level 的数据文件以及紧迫程度(
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->level() + 1,
static_cast(compact->total_bytes));
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
void DBImpl::RemoveObsoleteFiles() {
mutex_.AssertHeld();
if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
return;
}
// Make a set of all of the live files
std::set live = pending_outputs_;
versions_->AddLiveFiles(&live);
std::vector filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
FileType type;
std::vector files_to_delete;
for (std::string& filename : filenames) {
if (ParseFileName(filename, &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations)
keep = (number >= versions_->ManifestFileNumber());
break;
case kTableFile:
keep = (live.find(number) != live.end());
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
keep = true;
break;
}
if (!keep) {
files_to_delete.push_back(std::move(filename));
if (type == kTableFile) {
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n", static_cast(type),
static_cast(number));
}
}
}
// While deleting all files unblock other threads. All files being deleted
// have unique names which will not collide with newly created files and
// are therefore safe to delete while allowing other threads to proceed.
mutex_.Unlock();
for (const std::string& filename : files_to_delete) {
env_->RemoveFile(dbname_ + "/" + filename);
}
mutex_.Lock();
}
8、版本
8.1 MANIFEST - descriptor log 文件
如前所述,数据版本变更信息(VersionEdit)会持久化存储到 MANIFEST 文件中。MANIFEST 文件内容的编码结构同 WAL log。
数据版本变更信息包含8个部分:
- 1、用户自定义 key 比较器的名称(如果用户配置的话)
- 2、最新 WAL log 文件的 id
- 3、前一个 WAL log 文件的 id
- 4、下一个可用的文件 id
- 5、下一个可用的序列号
- 6、每个 level 最近一次的 compaction 暂存信息(compaction 遍历的最后一个 key)
- 7、每个 level 可以删除的数据文件的元数据(compaction 的输入文件)
- 8、每个 level 新增的数据文件的元数据(compaction 的输出文件)
// Tag numbers for serialized VersionEdit. These numbers are written to
// disk and should not be changed.
enum Tag {
kComparator = 1,
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
};
void VersionEdit::EncodeTo(std::string* dst) const {
if (has_comparator_) {
PutVarint32(dst, kComparator);
PutLengthPrefixedSlice(dst, comparator_);
}
if (has_log_number_) {
PutVarint32(dst, kLogNumber);
PutVarint64(dst, log_number_);
}
if (has_prev_log_number_) {
PutVarint32(dst, kPrevLogNumber);
PutVarint64(dst, prev_log_number_);
}
if (has_next_file_number_) {
PutVarint32(dst, kNextFileNumber);
PutVarint64(dst, next_file_number_);
}
if (has_last_sequence_) {
PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_);
}
for (size_t i = 0; i < compact_pointers_.size(); i++) {
PutVarint32(dst, kCompactPointer);
PutVarint32(dst, compact_pointers_[i].first); // level
PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
}
for (const auto& deleted_file_kvp : deleted_files_) {
PutVarint32(dst, kDeletedFile);
PutVarint32(dst, deleted_file_kvp.first); // level
PutVarint64(dst, deleted_file_kvp.second); // file number
}
for (size_t i = 0; i < new_files_.size(); i++) {
const FileMetaData& f = new_files_[i].second;
PutVarint32(dst, kNewFile);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
}
}
8.2 CURRENT 文件
CURRENT 文件中存储的是最新有效的 MANIFEST 文件名。
数据库打开,恢复状态的流程为:
- 1、从 CURRENT 中获取最新有效的 MANIFEST 文件名
- 2、从最新有效的 MANIFEST 文件中读取到最后一个 VersionEdit 版本信息
- 3、根据 VersionEdit 版本信息,计算出最新可用的 Version 信息(仅包含每个 level 包含哪些数据文件),并计算该版本下一次 compaction 应该处理哪个 level 的数据文件(
compaction_level_
)和紧迫程度(compaction_score_
) - 4、将最新可用的 Version 加入到
versions_
(VersionSet
) 的版本链中,并恢复versions_
的一些状态信息,以及确定是否复用 CURRENT指向的那个 MANIFEST 文件(如果 MANIFEST 文件大小超过阈值(size_t max_file_size = 2 * 1024 * 1024
) 则不复用,而是创建一个新的) - 5、根据从“最后一个 VersionEdit 原始版本信息” 恢复到
versions_
的“前一个 WAL log 文件 id”-min_log
和“最新一个 WAL log 文件 id” -prev_log
,将文件 id 大于等于min_log
或者等于pre_log
的所有 WAL log 文件找出来,按照文件 id 从小大到排序, 然后顺序遍历解析,恢复出 memtable 内存状态,并在最后确保所有内存状态持久化落到 文件中。- “将文件 id 大于等于
min_log
或者等于pre_log
的所有 WAL log 文件找出来,按照文件 id 从小大到排序, 然后顺序遍历解析,恢复出 memtable 内存状态”:这个逻辑也意味着并不需要在创建一个新的 WAL log 文件时就产生一个新的版本,也不需要把最新版本信息持久化写到 MANIFEST 文件 中。因此也不会和 compaction 版本变更存在操作冲突。
- “将文件 id 大于等于
8.3 VersionEdit & VersionSet & Version
class VersionEdit {
public:
// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}
// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
}
private:
typedef std::set> DeletedFileSet;
std::string comparator_;
uint64_t log_number_;
uint64_t prev_log_number_;
uint64_t next_file_number_;
SequenceNumber last_sequence_;
bool has_comparator_;
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;
std::vector> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector> new_files_;
}
VersionEdit 是 Version 版本信息处于编辑/变更的状态,是 Version 版本信息操作的一种日志结构,比如:
- compaction 过程涉及对最新版本的两个 level 可删除文件和新增文件的记录,以及 compaction 暂存状态的记录。
- 持久化记录到 MANIFEST 文件中,用于恢复数据库的状态。
class VersionSet {
public:
VersionSet(const std::string& dbname, const Options* options,
TableCache* table_cache, const InternalKeyComparator*);
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);
// Recover the last saved descriptor from persistent storage.
Status Recover(bool* save_manifest);
// Return the current version.
Version* current() const { return current_; }
// Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; }
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; }
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction();
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);
private:
Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;
const InternalKeyComparator icmp_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
uint64_t last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
}
VersionSet 对象维护着版本双向链表以及其他必要信息,用于支持 compaction 操作、对 ~ 的检索操作 等。
class Version {
public:
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// REQUIRES: lock is not held
Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);
private:
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
// List of files per level
std::vector files_[config::kNumLevels];
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
int file_to_compact_level_;
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_score_;
int compaction_level_;
}
compaction 的输入文件在 compaction 之后本来是可以删除的,但是这些文件可能被检索/遍历等操作所依赖引用,所以基于 Version 版本来解决文件粒度的并发操作冲突。只有当一个版本不被任何操作所引用时,才会被释放,只有这个版本依赖的数据文件也才可以被删除。
和 leveldb 中很多类实现的对象生命周期维护方式一样, Version 也采用引用计数的方式来维护生命周期,如果版本对象的引用计数归零,则自动析构自己,析构的逻辑包括从版本双链表中移除自己(这里可能存在并发冲突吗?),以及移除对各 level 文件元信息的引用。
void Version::Ref() { ++refs_; }
void Version::Unref() {
assert(this != &vset_->dummy_versions_);
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
delete this;
}
}
Version::~Version() {
assert(refs_ == 0);
// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;
// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}