// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/user_comparator_wrapper.h"

namespace rocksdb {

namespace {

// Find the first file in the LevelFilesBrief data structure whose largest
// key is >= the given key, searching within the index range [left, right).
int FindFileInRange(const InternalKeyComparator& icmp,
    const LevelFilesBrief& file_level,
    const Slice& key,
    uint32_t left,
    uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left, b + right, key, cmp) - b);
}
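
// Illustration of FindFileInRange with hypothetical per-file largest keys
// {10, 20, 30} and search key 25: lower_bound skips the first two files
// (largest key < 25) and returns index 2, the earliest file that could
// still contain the key. If the key sorts after every file's largest key,
// the function returns `right`, i.e. one past the searched range.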

Status OverlapWithIterator(const Comparator* ucmp,
    const Slice& smallest_user_key,
    const Slice& largest_user_key,
    InternalIterator* iter,
    bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    if (!ParseInternalKey(iter->key(), &seek_result)) {
      return Status::Corruption("DB has corrupted keys");
    }

    if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
      *overlap = true;
    }
  }

  return iter->status();
}
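
// Note on the seek in OverlapWithIterator: `range_start` pairs
// smallest_user_key with kMaxSequenceNumber/kValueTypeForSeek, which sorts
// at the front of that user key's entries (internal keys order equal user
// keys by descending sequence number). The seek therefore lands on the
// first entry whose user key is >= smallest_user_key, and the range
// overlaps iff that user key is also <= largest_user_key.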

// Helper class for choosing the next file to search for a particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
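//
// Rough usage sketch (mirroring Version::Get below): construct one
// FilePicker per lookup and drain it; the picker walks L0 newest-to-oldest,
// then binary-searches each deeper level, narrowing the next level's search
// window through FileIndexer (fractional cascading):
//
//   FilePicker fp(files, user_key, ikey, &level_files_brief, num_levels,
//                 &file_indexer, ucmp, icmp);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     // probe table f; stop early on a definitive result
//   }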
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 files in the current level
        // If the current level has 3 or fewer files, we skip the key range
        // filtering. In that case the system is more likely highly tuned to
        // minimize the number of tables queried by each query, so it is
        // unlikely that key range filtering is more efficient than simply
        // querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If the search left bound
          // and right bound point to the same file, we are sure the key
          // falls in range.
          assert(
              curr_level_ == 0 ||
              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
              user_comparator_->Compare(user_key_,
                                        ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->Compare(user_key_,
              ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->Compare(user_key_,
                ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. The compressed (LevelFilesBrief) data structure has no
            // seqNo attribute, so check against the original files_.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                                       files_[0][curr_index_in_curr_level_ - 1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // Returns the level of the most recent hit file, for the GET_HIT_L0,
  // GET_HIT_L1 & GET_HIT_L2_AND_UP counters.
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since the current level is empty, we will need to search all files
        // in the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it was
          // determined based on user key, it is still possible the lookup key
          // falls to the right of `search_right_bound_`'s corresponding file.
          // So, pass a limit one higher, which allows us to detect this case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, so the key does not exist
          // in this level. Since no comparison is done in this level, we will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ == num_levels_. So, no more levels to search.
    return false;
  }
};
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}

void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}

static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0);
}

bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const LevelFilesBrief& file_level,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}
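
// Example for the disjoint (sorted) branch above, with hypothetical
// per-file user-key ranges [a, c], [e, g], [i, k] and query range [d, f]:
// the binary search on the min-possible internal key for "d" lands on
// [e, g] (the first file whose largest key is >= "d"), and since "f" is
// not before "e", the function reports an overlap.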

namespace {

class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(
      TableCache* table_cache, const ReadOptions& read_options,
      const EnvOptions& env_options, const InternalKeyComparator& icomparator,
      const LevelFilesBrief* flevel, const SliceTransform* prefix_extractor,
      bool should_sample, HistogramImpl* file_read_hist, bool for_compaction,
      bool skip_filters, int level, RangeDelAggregator* range_del_agg,
      const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
          nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        env_options_(env_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        for_compaction_(for_compaction),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }
  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }
  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }
  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }
  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  void SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.Compare(ExtractUserKey(internal_key),
                                    *read_options_.iterate_upper_bound) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    return table_cache_->NewIterator(
        read_options_, env_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */,
        file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_,
        level_, smallest_compaction_key, largest_compaction_key);
  }

  TableCache* table_cache_;
  const ReadOptions read_options_;
  const EnvOptions& env_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  bool for_compaction_;
  bool skip_filters_;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};

void LevelIterator::Seek(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  SkipEmptyFileForward();
}

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
}

void LevelIterator::Next() {
  assert(Valid());
  file_iter_.Next();
  SkipEmptyFileForward();
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

void LevelIterator::SkipEmptyFileForward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      return;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
}
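
// The KeyReachedUpperBound() check above is what lets a forward scan stop
// without opening further files: files within a level are sorted and
// non-overlapping, so once the next file's smallest key is already at or
// past read_options_.iterate_upper_bound, no later file can contain an
// in-bound key and the iterator is invalidated instead.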

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator's status is Incomplete, we rebuild it even when
    // the user seeks to the same file, since this time we may reach a
    // different data block that is already cached in the block cache.
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace

// A wrapper around VersionBuilder that references the current version in
// its constructor and unrefs it in the destructor.
// Both the constructor and the destructor need to be called while holding
// the DB mutex.
class BaseReferencedVersionBuilder {
 public:
  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
      : version_builder_(new VersionBuilder(
            cfd->current()->version_set()->env_options(), cfd->table_cache(),
            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
        version_(cfd->current()) {
    version_->Ref();
  }
  ~BaseReferencedVersionBuilder() {
    version_->Unref();
  }
  VersionBuilder* version_builder() { return version_builder_.get(); }

 private:
  std::unique_ptr<VersionBuilder> version_builder_;
  Version* version_;
};
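
// Minimal usage sketch for BaseReferencedVersionBuilder (assumes the
// caller already holds the DB mutex, as required above):
//
//   {
//     BaseReferencedVersionBuilder builder_guard(cfd);
//     VersionBuilder* b = builder_guard.version_builder();
//     // apply VersionEdits through b; the base version stays referenced
//   }  // destructor unrefs the base version, still under the mutex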

Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore the error type `Incomplete` since it is by design that
  // no I/O is allowed when the table is not present in the table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<RandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
        TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
  }
  s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, false /* for_compaction*/,
          ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  Status s;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    s = GetPropertiesOfAllTables(props, level);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (const auto& file_meta : storage_info_.files_[level]) {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // 1. If the table is already present in table cache, load table
    // properties from there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    } else {
      return s;
    }
  }

  return Status::OK();
}

Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(cfd_->ioptions()->cf_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        if (props->count(fname) == 0) {
          // 1. If the table is already present in table cache, load table
          // properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}

Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection props;
  Status s;
  if (level < 0) {
    s = GetPropertiesOfAllTables(&props);
  } else {
    s = GetPropertiesOfAllTables(&props, level);
  }
  if (!s.ok()) {
    return s;
  }

  auto* new_tp = new TableProperties();
  for (const auto& item : props) {
    new_tp->Add(*item.second);
  }
  tp->reset(new_tp);
  return Status::OK();
}

size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : storage_info_.level_files_brief_) {
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
          env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
    }
  }
  return total_usage;
}

void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file->fd.GetNumber()),
          file_path,
          static_cast<size_t>(file->fd.GetFileSize()),
          file->fd.smallest_seqno,
          file->fd.largest_seqno,
          file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted});
      files.back().num_entries = file->num_entries;
      files.back().num_deletions = file->num_deletions;
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(
        level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}

uint64_t Version::GetSstFilesSize() {
  uint64_t sst_files_size = 0;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
      sst_files_size += file_meta->fd.GetFileSize();
    }
  }
  return sst_files_size;
}

uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) there are deletions of non-existing keys
  // (4) the number of samples is low
  if (current_num_samples_ == 0) {
    return 0;
  }

  if (current_num_non_deletions_ <= current_num_deletions_) {
    return 0;
  }

  uint64_t est = current_num_non_deletions_ - current_num_deletions_;

  uint64_t file_count = 0;
  for (int level = 0; level < num_levels_; ++level) {
    file_count += files_[level].size();
  }

  if (current_num_samples_ < file_count) {
    // casting to avoid overflowing
    return static_cast<uint64_t>(
        (est * static_cast<double>(file_count) / current_num_samples_));
  } else {
    return est;
  }
}
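
// Worked example for GetEstimatedActiveKeys() with made-up numbers: with
// est = 1,000 net non-deletion entries observed across
// current_num_samples_ = 4 sampled files out of file_count = 10 files,
// the estimate scales up to 1,000 * 10 / 4 = 2,500 active keys.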

double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
    int level) const {
  assert(level < num_levels_);
  uint64_t sum_file_size_bytes = 0;
  uint64_t sum_data_size_bytes = 0;
  for (auto* file_meta : files_[level]) {
    sum_file_size_bytes += file_meta->fd.GetFileSize();
    sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
  }
  if (sum_file_size_bytes == 0) {
    return -1.0;
  }
  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}
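
// Interpretation of the value returned above: raw (uncompressed key plus
// value) bytes divided by on-disk bytes, so e.g. 2.0 means the level's
// data shrank to half its raw size, while -1.0 signals an empty level.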

void Version::AddIterators(const ReadOptions& read_options,
                           const EnvOptions& soptions,
                           MergeIteratorBuilder* merge_iter_builder,
                           RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);

  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
                         range_del_agg);
  }
}

void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const EnvOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
                                   int level,
                                   RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
    return;
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
    return;
  }

  bool should_sample = should_sample_file_read();

  auto* arena = merge_iter_builder->GetArena();
  if (level == 0) {
    // Merge all level zero files together since they may overlap
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto& file = storage_info_.LevelFilesBrief(0).files[i];
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(),
          *file.file_metadata, range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0), false, arena,
          false /* skip_filters */, 0 /* level */));
    }
    if (should_sample) {
      // Count one for every L0 file. This is done per iterator creation
      // rather than per Seek(), while files in other levels are recorded per
      // seek. If users execute one range query per iterator, there may be
      // some discrepancy here.
      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
        sample_file_read_inc(meta);
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    // lazily.
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
        range_del_agg));
  }
}

Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const EnvOptions& env_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  assert(storage_info_.finalized_);

  auto icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  Arena arena;
  Status status;
  ReadRangeDelAggregator range_del_agg(&icmp,
                                       kMaxSequenceNumber /* upper_bound */);

  *overlap = false;

  if (level == 0) {
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto file = &storage_info_.LevelFilesBrief(0).files[i];
      if (AfterFile(ucmp, &smallest_user_key, file) ||
          BeforeFile(ucmp, &largest_user_key, file)) {
        continue;
      }
      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
          read_options, env_options, cfd_->internal_comparator(),
          *file->file_metadata, &range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
          false /* skip_filters */, 0 /* level */));
      status = OverlapWithIterator(
          ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
      if (!status.ok() || *overlap) {
        break;
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, env_options,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
        &range_del_agg));
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
  }

  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
    *overlap = true;
  }
  return status;
}

VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
    bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      base_level_(num_levels_ == 1 ? -1 : 1),
      level_multiplier_(0.0),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      finalized_(false),
      force_consistency_checks_(_force_consistency_checks) {
  if (ref_vstorage != nullptr) {
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
  }
}

Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const EnvOptions& env_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      storage_info_(
          (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
          (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
          cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
          cfd_ == nullptr ? kCompactionStyleLevel
                          : cfd_->ioptions()->compaction_style,
          (cfd_ == nullptr || cfd_->current() == nullptr)
              ? nullptr
              : cfd_->current()->storage_info(),
          cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      env_options_(env_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}

void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, Status* status,
                  MergeContext* merge_context,
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
                  bool* is_blob) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  if (key_exists != nullptr) {
    // will falsify below if not found
    *key_exists = true;
  }

  PinnedIteratorsManager pinned_iters_mgr;
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      value, value_found, merge_context, max_covering_tombstone_seq, this->env_,
      seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    if (*max_covering_tombstone_seq > 0) {
      // The remaining files we look at will only contain covered keys, so we
      // stop here.
      break;
    }
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }

    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!status->ok()) {
      return;
    }

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                  fp.GetHitFileLevel());
        return;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
        return;
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
        return;
    }
    f = fp.GetNextFile();
  }

  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
  }
  if (GetContext::kMerge == get_context.State()) {
    if (!merge_operator_) {
      *status = Status::InvalidArgument(
          "merge_operator is not properly initialized.");
      return;
    }
    // The merge operands are in the saver and we have hit the beginning of
    // the key history, so do a final merge of nullptr and the operands.
    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
    *status = MergeHelper::TimedFullMerge(
        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
        str_value, info_log_, db_statistics_, env_,
        nullptr /* result_operand */, true);
    if (LIKELY(value != nullptr)) {
      value->PinSelf();
    }
  } else {
    if (key_exists != nullptr) {
      *key_exists = false;
    }
    *status = Status::NotFound();  // Use an empty error message for speed
  }
}

bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
  // Reaching the bottom level implies misses at all upper levels, so we'll
  // skip checking the filters when we predict a hit.
  return cfd_->ioptions()->optimize_filters_for_hits &&
         (level > 0 || is_file_last_in_level) &&
         level == storage_info_.num_non_empty_levels() - 1;
}

void VersionStorageInfo::GenerateLevelFilesBrief() {
  level_files_brief_.resize(num_non_empty_levels_);
  for (int level = 0; level < num_non_empty_levels_; level++) {
    DoGenerateLevelFilesBrief(
        &level_files_brief_[level], files_[level], &arena_);
  }
}

void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
  storage_info_.GenerateBottommostFiles();
}

bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file ||
      file_meta->compensated_file_size > 0) {
    return false;
  }
  std::shared_ptr<const TableProperties> tp;
  Status s = GetTableProperties(&tp, file_meta);
  file_meta->init_stats_from_file = true;
  if (!s.ok()) {
    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
                    "Unable to load table properties for file %" PRIu64
                    " --- %s\n",
                    file_meta->fd.GetNumber(), s.ToString().c_str());
    return false;
  }
  if (tp.get() == nullptr) return false;
  file_meta->num_entries = tp->num_entries;
  file_meta->num_deletions = tp->num_deletions;
  file_meta->raw_value_size = tp->raw_value_size;
  file_meta->raw_key_size = tp->raw_key_size;

  return true;
}

void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
  assert(file_meta->init_stats_from_file);
  accumulated_file_size_ += file_meta->fd.GetFileSize();
  accumulated_raw_key_size_ += file_meta->raw_key_size;
  accumulated_raw_value_size_ += file_meta->raw_value_size;
  accumulated_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  accumulated_num_deletions_ += file_meta->num_deletions;

  current_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  current_num_deletions_ += file_meta->num_deletions;
  current_num_samples_++;
}

void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file) {
    current_num_non_deletions_ -=
        file_meta->num_entries - file_meta->num_deletions;
    current_num_deletions_ -= file_meta->num_deletions;
    current_num_samples_--;
  }
}

void Version::UpdateAccumulatedStats(bool update_stats) {
  if (update_stats) {
    // maximum number of table properties loaded from files.
    const int kMaxInitCount = 20;
    int init_count = 0;
    // Here only the first kMaxInitCount files which haven't been
    // initialized from file will be updated with num_deletions.
    // The motivation here is to cap the maximum I/O per Version creation.
    // The reason for choosing files from the lower levels instead of the
    // higher levels is that such a design is able to propagate the
    // initialization upwards: when the num_deletions of lower-level files
    // are updated, those files gain an accurate compensated_file_size,
    // making lower-level to higher-level compactions more likely to be
    // triggered, which in turn creates higher-level files whose
    // num_deletions will be updated here.
    for (int level = 0;
         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
         ++level) {
      for (auto* file_meta : storage_info_.files_[level]) {
        if (MaybeInitializeFileMetaData(file_meta)) {
          // each FileMeta will be initialized only once.
          storage_info_.UpdateAccumulatedStats(file_meta);
          // when option "max_open_files" is -1, all the file metadata has
          // already been read, so MaybeInitializeFileMetaData() won't incur
          // any I/O cost. "max_open_files=-1" means that the table cache passed
          // to the VersionSet and then to the ColumnFamilySet has a size of
          // TableCache::kInfiniteCapacity
          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
              TableCache::kInfiniteCapacity) {
            continue;
          }
          if (++init_count >= kMaxInitCount) {
            break;
          }
        }
      }
    }
    // In case all the sampled files contain only deletion entries, we load
    // the table properties of a file in a higher level to initialize
    // that value.
    for (int level = storage_info_.num_levels_ - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
         --level) {
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
        }
      }
    }
  }

  storage_info_.ComputeCompensatedSizes();
}

void VersionStorageInfo::ComputeCompensatedSizes() {
  static const int kDeletionWeightOnCompaction = 2;
  uint64_t average_value_size = GetAverageValueSize();

  // compute the compensated size
  for (int level = 0; level < num_levels_; level++) {
    for (auto* file_meta : files_[level]) {
      // Here we only compute compensated_file_size for those file_meta
      // whose compensated_file_size is uninitialized (== 0). This is true
      // only for files that have just been created and that no other thread
      // has access to. That's why we can safely mutate compensated_file_size.
      if (file_meta->compensated_file_size == 0) {
        file_meta->compensated_file_size = file_meta->fd.GetFileSize();
        // We boost the size of deletion entries of a file only when the
        // number of deletion entries is greater than the number of
        // non-deletion entries in the file. The motivation here is that in
        // a stable workload, the number of deletion entries should be roughly
        // equal to the number of non-deletion entries. If we compensated the
        // size of deletion entries in a stable workload, the deletion
        // compensation logic might introduce unwanted effects which change
        // the shape of the LSM tree.
        if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
          file_meta->compensated_file_size +=
              (file_meta->num_deletions * 2 - file_meta->num_entries) *
              average_value_size * kDeletionWeightOnCompaction;
        }
      }
    }
  }
}
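
// Worked example for the boost above with made-up numbers: a 1 MB file
// with num_entries = 100 and num_deletions = 80, given an average value
// size of 1 KB, gets (80 * 2 - 100) * 1 KB * 2 = 120 KB added on top of
// its 1 MB file size, making deletion-heavy files more attractive
// compaction inputs.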

int VersionStorageInfo::MaxInputLevel() const {
  if (compaction_style_ == kCompactionStyleLevel) {
    return num_levels() - 2;
  }
  return 0;
}

int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
  if (allow_ingest_behind) {
    assert(num_levels() > 1);
    return num_levels() - 2;
  }
  return num_levels() - 1;
}
1498
1499 void VersionStorageInfo::EstimateCompactionBytesNeeded(
1500 const MutableCFOptions& mutable_cf_options) {
1501 // Only implemented for level-based compaction
1502 if (compaction_style_ != kCompactionStyleLevel) {
1503 estimated_compaction_needed_bytes_ = 0;
1504 return;
1505 }
1506
1507 // Start from Level 0, if level 0 qualifies compaction to level 1,
1508 // we estimate the size of compaction.
1509 // Then we move on to the next level and see whether it qualifies compaction
1510 // to the next level. The size of the level is estimated as the actual size
1511 // on the level plus the input bytes from the previous level if there is any.
1512 // If it exceeds, take the exceeded bytes as compaction input and add the size
1513 // of the compaction size to tatal size.
1514 // We keep doing it to Level 2, 3, etc, until the last level and return the
1515 // accumulated bytes.
1516
1517 uint64_t bytes_compact_to_next_level = 0;
1518 uint64_t level_size = 0;
1519 for (auto* f : files_[0]) {
1520 level_size += f->fd.GetFileSize();
1521 }
1522 // Level 0
1523 bool level0_compact_triggered = false;
1524 if (static_cast<int>(files_[0].size()) >=
1525 mutable_cf_options.level0_file_num_compaction_trigger ||
1526 level_size >= mutable_cf_options.max_bytes_for_level_base) {
1527 level0_compact_triggered = true;
1528 estimated_compaction_needed_bytes_ = level_size;
1529 bytes_compact_to_next_level = level_size;
1530 } else {
1531 estimated_compaction_needed_bytes_ = 0;
1532 }
1533
1534 // Level 1 and up.
1535 uint64_t bytes_next_level = 0;
1536 for (int level = base_level(); level <= MaxInputLevel(); level++) {
1537 level_size = 0;
1538 if (bytes_next_level > 0) {
1539 #ifndef NDEBUG
1540 uint64_t level_size2 = 0;
1541 for (auto* f : files_[level]) {
1542 level_size2 += f->fd.GetFileSize();
1543 }
1544 assert(level_size2 == bytes_next_level);
1545 #endif
1546 level_size = bytes_next_level;
1547 bytes_next_level = 0;
1548 } else {
1549 for (auto* f : files_[level]) {
1550 level_size += f->fd.GetFileSize();
1551 }
1552 }
1553 if (level == base_level() && level0_compact_triggered) {
1554 // Add base level size to compaction if level0 compaction triggered.
1555 estimated_compaction_needed_bytes_ += level_size;
1556 }
1557 // Add size added by previous compaction
1558 level_size += bytes_compact_to_next_level;
1559 bytes_compact_to_next_level = 0;
1560 uint64_t level_target = MaxBytesForLevel(level);
1561 if (level_size > level_target) {
1562 bytes_compact_to_next_level = level_size - level_target;
1563 // Estimate the actual compaction fan-out ratio as size ratio between
1564 // the two levels.
1565
1566 assert(bytes_next_level == 0);
1567 if (level + 1 < num_levels_) {
1568 for (auto* f : files_[level + 1]) {
1569 bytes_next_level += f->fd.GetFileSize();
1570 }
1571 }
1572 if (bytes_next_level > 0) {
1573 assert(level_size > 0);
1574 estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
1575 static_cast<double>(bytes_compact_to_next_level) *
1576 (static_cast<double>(bytes_next_level) /
1577 static_cast<double>(level_size) +
1578 1));
1579 }
1580 }
1581 }
1582 }
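// Worked example of the estimate above (hypothetical sizes; level-style
// compaction with base_level = 1, max_bytes_for_level_base = 256 MB and
// multiplier = 10, so the L1 target is 256 MB and the L2 target is 2560 MB):
//
//   L0 holds 300 MB -> trigger: estimate  = 300 MB, carry 300 MB into L1
//   L1 holds 200 MB -> estimate += 200 MB (base level of a triggered L0);
//                      200 + 300 = 500 MB > 256 MB target, excess = 244 MB;
//                      L2 holds 500 MB, so the fan-out factor is
//                      500 / 500 + 1 = 2 and estimate += 244 * 2 = 488 MB
//   L2 now holds 500 + 244 = 744 MB <= 2560 MB target -> the cascade stops
//
//   estimated_compaction_needed_bytes_ ~= 300 + 200 + 488 = 988 MB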
1583
1584 namespace {
1585 uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
1586 const MutableCFOptions& mutable_cf_options,
1587 const std::vector<FileMetaData*>& files) {
1588 uint32_t ttl_expired_files_count = 0;
1589
1590 int64_t _current_time;
1591 auto status = ioptions.env->GetCurrentTime(&_current_time);
1592 if (status.ok()) {
1593 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1594 for (auto f : files) {
1595 if (!f->being_compacted && f->fd.table_reader != nullptr &&
1596 f->fd.table_reader->GetTableProperties() != nullptr) {
1597 auto creation_time =
1598 f->fd.table_reader->GetTableProperties()->creation_time;
1599 if (creation_time > 0 &&
1600 creation_time < (current_time - mutable_cf_options.ttl)) {
1601 ttl_expired_files_count++;
1602 }
1603 }
1604 }
1605 }
1606 return ttl_expired_files_count;
1607 }
1608 } // anonymous namespace
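// For example (hypothetical values): with mutable_cf_options.ttl = 86400
// (one day) and current_time = 1000000000, a file whose table property
// creation_time is 999900000 counts as expired (999900000 <
// 1000000000 - 86400 = 999913600), while a file with creation_time == 0 is
// skipped because its creation time is unknown.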
1609
1610 void VersionStorageInfo::ComputeCompactionScore(
1611 const ImmutableCFOptions& immutable_cf_options,
1612 const MutableCFOptions& mutable_cf_options) {
1613 for (int level = 0; level <= MaxInputLevel(); level++) {
1614 double score;
1615 if (level == 0) {
1616 // We treat level-0 specially by bounding the number of files
1617 // instead of number of bytes for two reasons:
1618 //
1619 // (1) With larger write-buffer sizes, it is nice not to do too
1620 // many level-0 compactions.
1621 //
1622 // (2) The files in level-0 are merged on every read and
1623 // therefore we wish to avoid too many files when the individual
1624 // file size is small (perhaps because of a small write-buffer
1625 // setting, or very high compression ratios, or lots of
1626 // overwrites/deletions).
1627 int num_sorted_runs = 0;
1628 uint64_t total_size = 0;
1629 for (auto* f : files_[level]) {
1630 if (!f->being_compacted) {
1631 total_size += f->compensated_file_size;
1632 num_sorted_runs++;
1633 }
1634 }
1635 if (compaction_style_ == kCompactionStyleUniversal) {
1636 // For universal compaction, we use level0 score to indicate
1637 // compaction score for the whole DB. Adding other levels as if
1638 // they are L0 files.
1639 for (int i = 1; i < num_levels(); i++) {
1640 if (!files_[i].empty() && !files_[i][0]->being_compacted) {
1641 num_sorted_runs++;
1642 }
1643 }
1644 }
1645
1646 if (compaction_style_ == kCompactionStyleFIFO) {
1647 score = static_cast<double>(total_size) /
1648 mutable_cf_options.compaction_options_fifo.max_table_files_size;
1649 if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
1650 score = std::max(
1651 static_cast<double>(num_sorted_runs) /
1652 mutable_cf_options.level0_file_num_compaction_trigger,
1653 score);
1654 }
1655 if (mutable_cf_options.ttl > 0) {
1656 score = std::max(
1657 static_cast<double>(GetExpiredTtlFilesCount(
1658 immutable_cf_options, mutable_cf_options, files_[level])),
1659 score);
1660 }
1661
1662 } else {
1663 score = static_cast<double>(num_sorted_runs) /
1664 mutable_cf_options.level0_file_num_compaction_trigger;
1665 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
1666 // Level-based involves L0->L0 compactions that can lead to oversized
1667 // L0 files. Take into account size as well to avoid later giant
1668 // compactions to the base level.
1669 score = std::max(
1670 score, static_cast<double>(total_size) /
1671 mutable_cf_options.max_bytes_for_level_base);
1672 }
1673 }
1674 } else {
1675 // Compute the ratio of current size to size limit.
1676 uint64_t level_bytes_no_compacting = 0;
1677 for (auto f : files_[level]) {
1678 if (!f->being_compacted) {
1679 level_bytes_no_compacting += f->compensated_file_size;
1680 }
1681 }
1682 score = static_cast<double>(level_bytes_no_compacting) /
1683 MaxBytesForLevel(level);
1684 }
1685 compaction_level_[level] = level;
1686 compaction_score_[level] = score;
1687 }
1688
1689 // Sort all the levels based on their score. Higher scores get listed
1690 // first. Use bubble sort because the number of entries is small.
1691 for (int i = 0; i < num_levels() - 2; i++) {
1692 for (int j = i + 1; j < num_levels() - 1; j++) {
1693 if (compaction_score_[i] < compaction_score_[j]) {
1694 double score = compaction_score_[i];
1695 int level = compaction_level_[i];
1696 compaction_score_[i] = compaction_score_[j];
1697 compaction_level_[i] = compaction_level_[j];
1698 compaction_score_[j] = score;
1699 compaction_level_[j] = level;
1700 }
1701 }
1702 }
1703 ComputeFilesMarkedForCompaction();
1704 ComputeBottommostFilesMarkedForCompaction();
1705 if (mutable_cf_options.ttl > 0) {
1706 ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
1707 }
1708 EstimateCompactionBytesNeeded(mutable_cf_options);
1709 }
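// Worked example of the scoring above (hypothetical, level-style
// compaction): with level0_file_num_compaction_trigger = 4 and six L0 files
// not being compacted, the file-count score is 6 / 4 = 1.5; if those files
// total 512 MB and max_bytes_for_level_base = 256 MB, the size-based score
// is 2.0, and the larger value wins. For L1+ the score is simply
// bytes_not_being_compacted / MaxBytesForLevel(level), e.g. a 750 MB L2
// with a 500 MB target scores 1.5. A score above 1 means the level needs
// compaction, and the bubble sort above orders levels by descending score.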
1710
1711 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
1712 files_marked_for_compaction_.clear();
1713 int last_qualify_level = 0;
1714
1715 // Do not include files from the last level that contains data.
1716 // If the table properties collector suggests a file on the last level,
1717 // we should not move it to a new level.
1718 for (int level = num_levels() - 1; level >= 1; level--) {
1719 if (!files_[level].empty()) {
1720 last_qualify_level = level - 1;
1721 break;
1722 }
1723 }
1724
1725 for (int level = 0; level <= last_qualify_level; level++) {
1726 for (auto* f : files_[level]) {
1727 if (!f->being_compacted && f->marked_for_compaction) {
1728 files_marked_for_compaction_.emplace_back(level, f);
1729 }
1730 }
1731 }
1732 }
1733
1734 void VersionStorageInfo::ComputeExpiredTtlFiles(
1735 const ImmutableCFOptions& ioptions, const uint64_t ttl) {
1736 assert(ttl > 0);
1737
1738 expired_ttl_files_.clear();
1739
1740 int64_t _current_time;
1741 auto status = ioptions.env->GetCurrentTime(&_current_time);
1742 if (!status.ok()) {
1743 return;
1744 }
1745 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1746
1747 for (int level = 0; level < num_levels() - 1; level++) {
1748 for (auto f : files_[level]) {
1749 if (!f->being_compacted && f->fd.table_reader != nullptr &&
1750 f->fd.table_reader->GetTableProperties() != nullptr) {
1751 auto creation_time =
1752 f->fd.table_reader->GetTableProperties()->creation_time;
1753 if (creation_time > 0 && creation_time < (current_time - ttl)) {
1754 expired_ttl_files_.emplace_back(level, f);
1755 }
1756 }
1757 }
1758 }
1759 }
1760
1761 namespace {
1762
1763 // used to sort files by size
1764 struct Fsize {
1765 size_t index;
1766 FileMetaData* file;
1767 };
1768
1769 // Comparator that is used to sort files based on their size
1770 // In normal mode: descending size
1771 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
1772 return (first.file->compensated_file_size >
1773 second.file->compensated_file_size);
1774 }
1775 } // anonymous namespace
1776
1777 void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
1778 auto* level_files = &files_[level];
1779 // Must not overlap
1780 #ifndef NDEBUG
1781 if (level > 0 && !level_files->empty() &&
1782 internal_comparator_->Compare(
1783 (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
1784 auto* f2 = (*level_files)[level_files->size() - 1];
1785 if (info_log != nullptr) {
1786 Error(info_log, "Adding new file %" PRIu64
1787 " range (%s, %s) to level %d but overlapping "
1788 "with existing file %" PRIu64 " %s %s",
1789 f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
1790 f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
1791 f2->smallest.DebugString(true).c_str(),
1792 f2->largest.DebugString(true).c_str());
1793 LogFlush(info_log);
1794 }
1795 assert(false);
1796 }
1797 #else
1798 (void)info_log;
1799 #endif
1800 f->refs++;
1801 level_files->push_back(f);
1802 }
1803
1804 // Version::PrepareApply() needs to be called before calling this function,
1805 // or the following functions must have been called:
1806 // 1. UpdateNumNonEmptyLevels();
1807 // 2. CalculateBaseBytes();
1808 // 3. UpdateFilesByCompactionPri();
1809 // 4. GenerateFileIndexer();
1810 // 5. GenerateLevelFilesBrief();
1811 // 6. GenerateLevel0NonOverlapping();
1812 // 7. GenerateBottommostFiles();
1813 void VersionStorageInfo::SetFinalized() {
1814 finalized_ = true;
1815 #ifndef NDEBUG
1816 if (compaction_style_ != kCompactionStyleLevel) {
1817 // Not level based compaction.
1818 return;
1819 }
1820 assert(base_level_ < 0 || num_levels() == 1 ||
1821 (base_level_ >= 1 && base_level_ < num_levels()));
1822 // Verify that levels L1 .. base_level - 1 (those newer than base_level) are empty
1823 for (int level = 1; level < base_level(); level++) {
1824 assert(NumLevelBytes(level) == 0);
1825 }
1826 uint64_t max_bytes_prev_level = 0;
1827 for (int level = base_level(); level < num_levels() - 1; level++) {
1828 if (LevelFiles(level).size() == 0) {
1829 continue;
1830 }
1831 assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
1832 max_bytes_prev_level = MaxBytesForLevel(level);
1833 }
1834 int num_empty_non_l0_level = 0;
1835 for (int level = 0; level < num_levels(); level++) {
1836 assert(LevelFiles(level).size() == 0 ||
1837 LevelFiles(level).size() == LevelFilesBrief(level).num_files);
1838 if (level > 0 && NumLevelBytes(level) > 0) {
1839 num_empty_non_l0_level++;
1840 }
1841 if (LevelFiles(level).size() > 0) {
1842 assert(level < num_non_empty_levels());
1843 }
1844 }
1845 assert(compaction_level_.size() > 0);
1846 assert(compaction_level_.size() == compaction_score_.size());
1847 #endif
1848 }
1849
1850 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
1851 num_non_empty_levels_ = num_levels_;
1852 for (int i = num_levels_ - 1; i >= 0; i--) {
1853 if (files_[i].size() != 0) {
1854 return;
1855 } else {
1856 num_non_empty_levels_ = i;
1857 }
1858 }
1859 }
1860
1861 namespace {
1862 // Sort `temp` based on ratio of overlapping size over file size
1863 void SortFileByOverlappingRatio(
1864 const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
1865 const std::vector<FileMetaData*>& next_level_files,
1866 std::vector<Fsize>* temp) {
1867 std::unordered_map<uint64_t, uint64_t> file_to_order;
1868 auto next_level_it = next_level_files.begin();
1869
1870 for (auto& file : files) {
1871 uint64_t overlapping_bytes = 0;
1872 // Skip files in the next level that end before the current file starts
1873 while (next_level_it != next_level_files.end() &&
1874 icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
1875 next_level_it++;
1876 }
1877
1878 while (next_level_it != next_level_files.end() &&
1879 icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
1880 overlapping_bytes += (*next_level_it)->fd.file_size;
1881
1882 if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
1883 // The next-level file crosses the current file's largest-key boundary.
1884 break;
1885 }
1886 next_level_it++;
1887 }
1888
1889 assert(file->compensated_file_size != 0);
1890 file_to_order[file->fd.GetNumber()] =
1891 overlapping_bytes * 1024u / file->compensated_file_size;
1892 }
1893
1894 std::sort(temp->begin(), temp->end(),
1895 [&](const Fsize& f1, const Fsize& f2) -> bool {
1896 return file_to_order[f1.file->fd.GetNumber()] <
1897 file_to_order[f2.file->fd.GetNumber()];
1898 });
1899 }
1900 } // namespace
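// A small worked example (hypothetical files) of the kMinOverlappingRatio
// order keys computed above:
//
//   file A: compensated size 100 MB, overlapping 50 MB in the next level
//           -> order key = 50 * 1024 / 100 = 512
//   file B: compensated size 100 MB, overlapping 300 MB in the next level
//           -> order key = 300 * 1024 / 100 = 3072
//
// File A sorts first: compacting it rewrites less next-level data per byte
// moved, which tends to minimize write amplification.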
1901
1902 void VersionStorageInfo::UpdateFilesByCompactionPri(
1903 CompactionPri compaction_pri) {
1904 if (compaction_style_ == kCompactionStyleNone ||
1905 compaction_style_ == kCompactionStyleFIFO ||
1906 compaction_style_ == kCompactionStyleUniversal) {
1907 // don't need this
1908 return;
1909 }
1910 // No need to sort the highest level because it is never compacted.
1911 for (int level = 0; level < num_levels() - 1; level++) {
1912 const std::vector<FileMetaData*>& files = files_[level];
1913 auto& files_by_compaction_pri = files_by_compaction_pri_[level];
1914 assert(files_by_compaction_pri.size() == 0);
1915
1916 // populate a temp vector for sorting based on size
1917 std::vector<Fsize> temp(files.size());
1918 for (size_t i = 0; i < files.size(); i++) {
1919 temp[i].index = i;
1920 temp[i].file = files[i];
1921 }
1922
1923 // sort the top kNumberFilesToSort entries based on file size
1924 size_t num = VersionStorageInfo::kNumberFilesToSort;
1925 if (num > temp.size()) {
1926 num = temp.size();
1927 }
1928 switch (compaction_pri) {
1929 case kByCompensatedSize:
1930 std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
1931 CompareCompensatedSizeDescending);
1932 break;
1933 case kOldestLargestSeqFirst:
1934 std::sort(temp.begin(), temp.end(),
1935 [](const Fsize& f1, const Fsize& f2) -> bool {
1936 return f1.file->fd.largest_seqno <
1937 f2.file->fd.largest_seqno;
1938 });
1939 break;
1940 case kOldestSmallestSeqFirst:
1941 std::sort(temp.begin(), temp.end(),
1942 [](const Fsize& f1, const Fsize& f2) -> bool {
1943 return f1.file->fd.smallest_seqno <
1944 f2.file->fd.smallest_seqno;
1945 });
1946 break;
1947 case kMinOverlappingRatio:
1948 SortFileByOverlappingRatio(*internal_comparator_, files_[level],
1949 files_[level + 1], &temp);
1950 break;
1951 default:
1952 assert(false);
1953 }
1954 assert(temp.size() == files.size());
1955
1956 // initialize files_by_compaction_pri_
1957 for (size_t i = 0; i < temp.size(); i++) {
1958 files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
1959 }
1960 next_file_to_compact_by_size_[level] = 0;
1961 assert(files_[level].size() == files_by_compaction_pri_[level].size());
1962 }
1963 }
1964
1965 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
1966 assert(!finalized_);
1967 level0_non_overlapping_ = true;
1968 if (level_files_brief_.size() == 0) {
1969 return;
1970 }
1971
1972 // A copy of L0 files sorted by smallest key
1973 std::vector<FdWithKeyRange> level0_sorted_file(
1974 level_files_brief_[0].files,
1975 level_files_brief_[0].files + level_files_brief_[0].num_files);
1976 std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
1977 [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
1978 return (internal_comparator_->Compare(f1.smallest_key,
1979 f2.smallest_key) < 0);
1980 });
1981
1982 for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
1983 FdWithKeyRange& f = level0_sorted_file[i];
1984 FdWithKeyRange& prev = level0_sorted_file[i - 1];
1985 if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
1986 level0_non_overlapping_ = false;
1987 break;
1988 }
1989 }
1990 }
1991
1992 void VersionStorageInfo::GenerateBottommostFiles() {
1993 assert(!finalized_);
1994 assert(bottommost_files_.empty());
1995 for (size_t level = 0; level < level_files_brief_.size(); ++level) {
1996 for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
1997 ++file_idx) {
1998 const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
1999 int l0_file_idx;
2000 if (level == 0) {
2001 l0_file_idx = static_cast<int>(file_idx);
2002 } else {
2003 l0_file_idx = -1;
2004 }
2005 Slice smallest_user_key = ExtractUserKey(f.smallest_key);
2006 Slice largest_user_key = ExtractUserKey(f.largest_key);
2007 if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2008 static_cast<int>(level),
2009 l0_file_idx)) {
2010 bottommost_files_.emplace_back(static_cast<int>(level),
2011 f.file_metadata);
2012 }
2013 }
2014 }
2015 }
2016
2017 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
2018 assert(seqnum >= oldest_snapshot_seqnum_);
2019 oldest_snapshot_seqnum_ = seqnum;
2020 if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
2021 ComputeBottommostFilesMarkedForCompaction();
2022 }
2023 }
2024
2025 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
2026 bottommost_files_marked_for_compaction_.clear();
2027 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2028 for (auto& level_and_file : bottommost_files_) {
2029 if (!level_and_file.second->being_compacted &&
2030 level_and_file.second->fd.largest_seqno != 0 &&
2031 level_and_file.second->num_deletions > 1) {
2032 // largest_seqno might be nonzero due to containing the final key in an
2033 // earlier compaction, whose seqnum we didn't zero out. Requiring multiple
2034 // deletions ensures the file really contains deleted or overwritten keys.
2035 if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
2036 bottommost_files_marked_for_compaction_.push_back(level_and_file);
2037 } else {
2038 bottommost_files_mark_threshold_ =
2039 std::min(bottommost_files_mark_threshold_,
2040 level_and_file.second->fd.largest_seqno);
2041 }
2042 }
2043 }
2044 }
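// For example (hypothetical sequence numbers): given bottommost files with
// largest_seqno 90 and 120, both with num_deletions > 1, and
// oldest_snapshot_seqnum_ == 100, the file at seqno 90 is marked for
// compaction (no snapshot can still see its deleted keys), while the file
// at seqno 120 sets bottommost_files_mark_threshold_ = 120 so that a later
// UpdateOldestSnapshot() call with a seqnum above 120 recomputes this.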
2045
2046 void Version::Ref() {
2047 ++refs_;
2048 }
2049
2050 bool Version::Unref() {
2051 assert(refs_ >= 1);
2052 --refs_;
2053 if (refs_ == 0) {
2054 delete this;
2055 return true;
2056 }
2057 return false;
2058 }
2059
2060 bool VersionStorageInfo::OverlapInLevel(int level,
2061 const Slice* smallest_user_key,
2062 const Slice* largest_user_key) {
2063 if (level >= num_non_empty_levels_) {
2064 // empty level, no overlap
2065 return false;
2066 }
2067 return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2068 level_files_brief_[level], smallest_user_key,
2069 largest_user_key);
2070 }
2071
2072 // Store in "*inputs" all files in "level" that overlap [begin,end]
2073 // If hint_index is specified, then it points to a file in the
2074 // overlapping range.
2075 // If file_index is non-nullptr, it is set to the index of an overlapping file.
2076 void VersionStorageInfo::GetOverlappingInputs(
2077 int level, const InternalKey* begin, const InternalKey* end,
2078 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2079 bool expand_range, InternalKey** next_smallest) const {
2080 if (level >= num_non_empty_levels_) {
2081 // this level is empty, no overlapping inputs
2082 return;
2083 }
2084
2085 inputs->clear();
2086 if (file_index) {
2087 *file_index = -1;
2088 }
2089 const Comparator* user_cmp = user_comparator_;
2090 if (level > 0) {
2091 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
2092 file_index, false, next_smallest);
2093 return;
2094 }
2095
2096 if (next_smallest) {
2097 // next_smallest key only makes sense for non-level 0, where files are
2098 // non-overlapping
2099 *next_smallest = nullptr;
2100 }
2101
2102 Slice user_begin, user_end;
2103 if (begin != nullptr) {
2104 user_begin = begin->user_key();
2105 }
2106 if (end != nullptr) {
2107 user_end = end->user_key();
2108 }
2109
2110 // "index" stores the file indices that still need to be checked.
2111 std::list<size_t> index;
2112 for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
2113 index.emplace_back(i);
2114 }
2115
2116 while (!index.empty()) {
2117 bool found_overlapping_file = false;
2118 auto iter = index.begin();
2119 while (iter != index.end()) {
2120 FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
2121 const Slice file_start = ExtractUserKey(f->smallest_key);
2122 const Slice file_limit = ExtractUserKey(f->largest_key);
2123 if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
2124 // "f" is completely before specified range; skip it
2125 iter++;
2126 } else if (end != nullptr &&
2127 user_cmp->Compare(file_start, user_end) > 0) {
2128 // "f" is completely after specified range; skip it
2129 iter++;
2130 } else {
2131 // "f" overlaps the specified range; keep it
2132 inputs->emplace_back(files_[level][*iter]);
2133 found_overlapping_file = true;
2134 // record the first file index.
2135 if (file_index && *file_index == -1) {
2136 *file_index = static_cast<int>(*iter);
2137 }
2138 // this file overlaps; erase it to avoid checking it again.
2139 iter = index.erase(iter);
2140 if (expand_range) {
2141 if (begin != nullptr &&
2142 user_cmp->Compare(file_start, user_begin) < 0) {
2143 user_begin = file_start;
2144 }
2145 if (end != nullptr && user_cmp->Compare(file_limit, user_end) > 0) {
2146 user_end = file_limit;
2147 }
2148 }
2149 }
2150 }
2151 // if none of the remaining files overlap, stop
2152 if (!found_overlapping_file) {
2153 break;
2154 }
2155 }
2156 }
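// Example of the L0 loop above with expand_range == true (hypothetical user
// keys): searching [c, e] against L0 files f1[a, d], f2[d, g], f3[h, k]
// picks f1 and f2 on the first pass and widens the range to [a, g]; on the
// next pass no remaining file overlaps [a, g] (f3 starts at h), so the loop
// stops with inputs = {f1, f2}. The repeated passes are needed because L0
// files may overlap each other, so adding one file can pull in more.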
2157
2158 // Store in "*inputs" the files in "level" that are within range [begin,end]
2159 // Guarantee a "clean cut" boundary between the files in inputs
2160 // and the surrounding files, while selecting the maximum number of files.
2161 // This will ensure that no parts of a key are lost during compaction.
2162 // If hint_index is specified, then it points to a file in the range.
2163 // If file_index is non-nullptr, it is set to the index of a file in the range.
2164 void VersionStorageInfo::GetCleanInputsWithinInterval(
2165 int level, const InternalKey* begin, const InternalKey* end,
2166 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
2167 inputs->clear();
2168 if (file_index) {
2169 *file_index = -1;
2170 }
2171 if (level >= num_non_empty_levels_ || level == 0 ||
2172 level_files_brief_[level].num_files == 0) {
2173 // this level is empty, no inputs within range
2174 // also don't support clean input interval within L0
2175 return;
2176 }
2177
2178 const auto& level_files = level_files_brief_[level];
2179 if (begin == nullptr) {
2180 begin = &level_files.files[0].file_metadata->smallest;
2181 }
2182 if (end == nullptr) {
2183 end = &level_files.files[level_files.num_files - 1].file_metadata->largest;
2184 }
2185
2186 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
2187 hint_index, file_index,
2188 true /* within_interval */);
2189 }
2190
2191 // Store in "*inputs" all files in "level" that overlap [begin,end]
2192 // Employ binary search to find at least one file that overlaps the
2193 // specified range. From that file, iterate backwards and
2194 // forwards to find all overlapping files.
2195 // If within_interval is set, then only store the maximum set of clean
2196 // inputs within range [begin, end]. "Clean" means there is a boundary
2197 // between the files in "*inputs" and the surrounding files.
2198 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
2199 int level, const InternalKey* begin, const InternalKey* end,
2200 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2201 bool within_interval, InternalKey** next_smallest) const {
2202 assert(level > 0);
2203 int min = 0;
2204 int mid = 0;
2205 int max = static_cast<int>(files_[level].size()) - 1;
2206 bool foundOverlap = false;
2207 auto user_cmp = user_comparator_;
2208
2209 // if the caller already knows the index of a file that has overlap,
2210 // then we can skip the binary search.
2211 if (hint_index != -1) {
2212 mid = hint_index;
2213 foundOverlap = true;
2214 }
2215
2216 while (!foundOverlap && min <= max) {
2217 mid = (min + max)/2;
2218 FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
2219 auto& smallest = f->file_metadata->smallest;
2220 auto& largest = f->file_metadata->largest;
2221 if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
2222 (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
2223 min = mid + 1;
2224 } else if ((!within_interval &&
2225 sstableKeyCompare(user_cmp, smallest, end) > 0) ||
2226 (within_interval &&
2227 sstableKeyCompare(user_cmp, largest, end) > 0)) {
2228 max = mid - 1;
2229 } else {
2230 foundOverlap = true;
2231 break;
2232 }
2233 }
2234
2235 // If there were no overlapping files, return immediately.
2236 if (!foundOverlap) {
2237 if (next_smallest) {
2238 *next_smallest = nullptr;
2239 }
2240 return;
2241 }
2242 // returns the index where an overlap is found
2243 if (file_index) {
2244 *file_index = mid;
2245 }
2246
2247 int start_index, end_index;
2248 if (within_interval) {
2249 ExtendFileRangeWithinInterval(level, begin, end, mid,
2250 &start_index, &end_index);
2251 } else {
2252 ExtendFileRangeOverlappingInterval(level, begin, end, mid,
2253 &start_index, &end_index);
2254 assert(end_index >= start_index);
2255 }
2256 // insert overlapping files into vector
2257 for (int i = start_index; i <= end_index; i++) {
2258 inputs->push_back(files_[level][i]);
2259 }
2260
2261 if (next_smallest != nullptr) {
2262 // Provide the next key outside the range covered by inputs
2263 if (++end_index < static_cast<int>(files_[level].size())) {
2264 **next_smallest = files_[level][end_index]->smallest;
2265 } else {
2266 *next_smallest = nullptr;
2267 }
2268 }
2269 }
2270
2271 // Store in *start_index and *end_index the range of all files in
2272 // "level" that overlap [begin,end]
2273 // The mid_index specifies the index of at least one file that
2274 // overlaps the specified range. From that file, iterate backward
2275 // and forward to find all overlapping files.
2276 // Use LevelFilesBrief in the search to make it faster
2277 void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
2278 int level, const InternalKey* begin, const InternalKey* end,
2279 unsigned int mid_index, int* start_index, int* end_index) const {
2280 auto user_cmp = user_comparator_;
2281 const FdWithKeyRange* files = level_files_brief_[level].files;
2282 #ifndef NDEBUG
2283 {
2284 // assert that the file at mid_index overlaps with the range
2285 assert(mid_index < level_files_brief_[level].num_files);
2286 const FdWithKeyRange* f = &files[mid_index];
2287 auto& smallest = f->file_metadata->smallest;
2288 auto& largest = f->file_metadata->largest;
2289 if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
2290 assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
2291 } else {
2292 // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
2293 // begin ? begin->DebugString().c_str() : "(null)",
2294 // end ? end->DebugString().c_str() : "(null)",
2295 // smallest->DebugString().c_str(),
2296 // largest->DebugString().c_str(),
2297 // sstableKeyCompare(user_cmp, smallest, begin),
2298 // sstableKeyCompare(user_cmp, largest, begin));
2299 assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
2300 }
2301 }
2302 #endif
2303 *start_index = mid_index + 1;
2304 *end_index = mid_index;
2305 int count __attribute__((__unused__));
2306 count = 0;
2307
2308 // check backwards from 'mid' to lower indices
2309 for (int i = mid_index; i >= 0 ; i--) {
2310 const FdWithKeyRange* f = &files[i];
2311 auto& largest = f->file_metadata->largest;
2312 if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
2313 *start_index = i;
2314 assert((count++, true));
2315 } else {
2316 break;
2317 }
2318 }
2319 // check forward from 'mid+1' to higher indices
2320 for (unsigned int i = mid_index+1;
2321 i < level_files_brief_[level].num_files; i++) {
2322 const FdWithKeyRange* f = &files[i];
2323 auto& smallest = f->file_metadata->smallest;
2324 if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
2325 assert((count++, true));
2326 *end_index = i;
2327 } else {
2328 break;
2329 }
2330 }
2331 assert(count == *end_index - *start_index + 1);
2332 }
2333
2334 // Store in *start_index and *end_index the clean range of all files in
2335 // "level" within [begin,end]
2336 // The mid_index specifies the index of at least one file within
2337 // the specified range. From that file, iterate backward
2338 // and forward to find all overlapping files and then "shrink" to
2339 // the clean range required.
2340 // Use LevelFilesBrief in the search to make it faster
2341 void VersionStorageInfo::ExtendFileRangeWithinInterval(
2342 int level, const InternalKey* begin, const InternalKey* end,
2343 unsigned int mid_index, int* start_index, int* end_index) const {
2344 assert(level != 0);
2345 auto* user_cmp = user_comparator_;
2346 const FdWithKeyRange* files = level_files_brief_[level].files;
2347 #ifndef NDEBUG
2348 {
2349 // assert that the file at mid_index is within the range
2350 assert(mid_index < level_files_brief_[level].num_files);
2351 const FdWithKeyRange* f = &files[mid_index];
2352 auto& smallest = f->file_metadata->smallest;
2353 auto& largest = f->file_metadata->largest;
2354 assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
2355 sstableKeyCompare(user_cmp, largest, end) <= 0);
2356 }
2357 #endif
2358 ExtendFileRangeOverlappingInterval(level, begin, end, mid_index,
2359 start_index, end_index);
2360 int left = *start_index;
2361 int right = *end_index;
2362 // shrink from left to right
2363 while (left <= right) {
2364 auto& smallest = files[left].file_metadata->smallest;
2365 if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
2366 left++;
2367 continue;
2368 }
2369 if (left > 0) { // If not first file
2370 auto& largest = files[left - 1].file_metadata->largest;
2371 if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
2372 left++;
2373 continue;
2374 }
2375 }
2376 break;
2377 }
2378 // shrink from right to left
2379 while (left <= right) {
2380 auto& largest = files[right].file_metadata->largest;
2381 if (sstableKeyCompare(user_cmp, largest, end) > 0) {
2382 right--;
2383 continue;
2384 }
2385 if (right < static_cast<int>(level_files_brief_[level].num_files) -
2386 1) { // If not the last file
2387 auto& smallest = files[right + 1].file_metadata->smallest;
2388 if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
2389 // The last user key in range overlaps with the next file's first key
2390 right--;
2391 continue;
2392 }
2393 }
2394 break;
2395 }
2396
2397 *start_index = left;
2398 *end_index = right;
2399 }
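// A sketch of the "shrink" step above (hypothetical user keys): suppose the
// overlapping-interval search returns f1[a, c], f2[c, f], f3[g, i] for
// [begin, end] = [b, i]. f1 is dropped on the left because its smallest key
// a lies before b; f2 is then dropped as well, because its smallest key c
// equals f1's largest key, so keeping it would split user key c between an
// included and an excluded file. The clean range that remains is {f3}.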
2400
2401 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
2402 assert(level >= 0);
2403 assert(level < num_levels());
2404 return TotalFileSize(files_[level]);
2405 }
2406
2407 const char* VersionStorageInfo::LevelSummary(
2408 LevelSummaryStorage* scratch) const {
2409 int len = 0;
2410 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2411 assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
2412 if (level_multiplier_ != 0.0) {
2413 len = snprintf(
2414 scratch->buffer, sizeof(scratch->buffer),
2415 "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
2416 base_level_, level_multiplier_, level_max_bytes_[base_level_]);
2417 }
2418 }
2419 len +=
2420 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
2421 for (int i = 0; i < num_levels(); i++) {
2422 int sz = sizeof(scratch->buffer) - len;
2423 int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
2424 if (ret < 0 || ret >= sz) break;
2425 len += ret;
2426 }
2427 if (len > 0) {
2428 // overwrite the last space
2429 --len;
2430 }
2431 len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
2432 "] max score %.2f", compaction_score_[0]);
2433
2434 if (!files_marked_for_compaction_.empty()) {
2435 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
2436 " (%" ROCKSDB_PRIszt " files need compaction)",
2437 files_marked_for_compaction_.size());
2438 }
2439
2440 return scratch->buffer;
2441 }
2442
2443 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
2444 int level) const {
2445 int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
2446 for (const auto& f : files_[level]) {
2447 int sz = sizeof(scratch->buffer) - len;
2448 char sztxt[16];
2449 AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
2450 int ret = snprintf(scratch->buffer + len, sz,
2451 "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
2452 f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
2453 static_cast<int>(f->being_compacted));
2454 if (ret < 0 || ret >= sz)
2455 break;
2456 len += ret;
2457 }
2458 // overwrite the last space (only if files_[level].size() is non-zero)
2459 if (files_[level].size() && len > 0) {
2460 --len;
2461 }
2462 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
2463 return scratch->buffer;
2464 }
2465
2466 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
2467 uint64_t result = 0;
2468 std::vector<FileMetaData*> overlaps;
2469 for (int level = 1; level < num_levels() - 1; level++) {
2470 for (const auto& f : files_[level]) {
2471 GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
2472 const uint64_t sum = TotalFileSize(overlaps);
2473 if (sum > result) {
2474 result = sum;
2475 }
2476 }
2477 }
2478 return result;
2479 }
2480
2481 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
2482 // Note: the result for level zero is not really used since we set
2483 // the level-0 compaction threshold based on number of files.
2484 assert(level >= 0);
2485 assert(level < static_cast<int>(level_max_bytes_.size()));
2486 return level_max_bytes_[level];
2487 }
2488
2489 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
2490 const MutableCFOptions& options) {
2491 // Special logic to set number of sorted runs.
2492 // It is to match the previous behavior when all files are in L0.
2493 int num_l0_count = static_cast<int>(files_[0].size());
2494 if (compaction_style_ == kCompactionStyleUniversal) {
2495 // For universal compaction, we use level0 score to indicate
2496 // compaction score for the whole DB. Adding other levels as if
2497 // they are L0 files.
2498 for (int i = 1; i < num_levels(); i++) {
2499 if (!files_[i].empty()) {
2500 num_l0_count++;
2501 }
2502 }
2503 }
2504 set_l0_delay_trigger_count(num_l0_count);
2505
2506 level_max_bytes_.resize(ioptions.num_levels);
2507 if (!ioptions.level_compaction_dynamic_level_bytes) {
2508 base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
2509
2510 // Calculate for static bytes base case
2511 for (int i = 0; i < ioptions.num_levels; ++i) {
2512 if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
2513 level_max_bytes_[i] = options.max_bytes_for_level_base;
2514 } else if (i > 1) {
2515 level_max_bytes_[i] = MultiplyCheckOverflow(
2516 MultiplyCheckOverflow(level_max_bytes_[i - 1],
2517 options.max_bytes_for_level_multiplier),
2518 options.MaxBytesMultiplerAdditional(i - 1));
2519 } else {
2520 level_max_bytes_[i] = options.max_bytes_for_level_base;
2521 }
2522 }
2523 } else {
2524 uint64_t max_level_size = 0;
2525
2526 int first_non_empty_level = -1;
2527 // Find the size of the non-L0 level that holds the most data.
2528 // Cannot use the size of the last level because it can be empty or smaller
2529 // than previous levels after compaction.
2530 for (int i = 1; i < num_levels_; i++) {
2531 uint64_t total_size = 0;
2532 for (const auto& f : files_[i]) {
2533 total_size += f->fd.GetFileSize();
2534 }
2535 if (total_size > 0 && first_non_empty_level == -1) {
2536 first_non_empty_level = i;
2537 }
2538 if (total_size > max_level_size) {
2539 max_level_size = total_size;
2540 }
2541 }
2542
2543 // Prefill every level's max bytes to disallow compaction from there.
2544 for (int i = 0; i < num_levels_; i++) {
2545 level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
2546 }
2547
2548 if (max_level_size == 0) {
2549 // No data for L1 and up. L0 compacts to last level directly.
2550 // No compaction from L1+ needs to be scheduled.
2551 base_level_ = num_levels_ - 1;
2552 } else {
2553 uint64_t l0_size = 0;
2554 for (const auto& f : files_[0]) {
2555 l0_size += f->fd.GetFileSize();
2556 }
2557
2558 uint64_t base_bytes_max =
2559 std::max(options.max_bytes_for_level_base, l0_size);
2560 uint64_t base_bytes_min = static_cast<uint64_t>(
2561 base_bytes_max / options.max_bytes_for_level_multiplier);
2562
2563 // Try to make the last level's target size equal max_level_size
2564 uint64_t cur_level_size = max_level_size;
2565 for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
2566 // Divide by the multiplier (the cast truncates the result)
2567 cur_level_size = static_cast<uint64_t>(
2568 cur_level_size / options.max_bytes_for_level_multiplier);
2569 }
2570
2571 // Calculate base level and its size.
2572 uint64_t base_level_size;
2573 if (cur_level_size <= base_bytes_min) {
2574 // Case 1. If we make target size of last level to be max_level_size,
2575 // target size of the first non-empty level would be smaller than
2576 // base_bytes_min. We set it be base_bytes_min.
2577 base_level_size = base_bytes_min + 1U;
2578 base_level_ = first_non_empty_level;
2579 ROCKS_LOG_WARN(ioptions.info_log,
2580 "More existing levels in DB than needed. "
2581 "max_bytes_for_level_multiplier may not be guaranteed.");
2582 } else {
2583 // Find base level (where L0 data is compacted to).
2584 base_level_ = first_non_empty_level;
2585 while (base_level_ > 1 && cur_level_size > base_bytes_max) {
2586 --base_level_;
2587 cur_level_size = static_cast<uint64_t>(
2588 cur_level_size / options.max_bytes_for_level_multiplier);
2589 }
2590 if (cur_level_size > base_bytes_max) {
2591 // Even L1 will be too large
2592 assert(base_level_ == 1);
2593 base_level_size = base_bytes_max;
2594 } else {
2595 base_level_size = cur_level_size;
2596 }
2597 }
2598
2599 level_multiplier_ = options.max_bytes_for_level_multiplier;
2600 assert(base_level_size > 0);
2601 if (l0_size > base_level_size &&
2602 (l0_size > options.max_bytes_for_level_base ||
2603 static_cast<int>(files_[0].size() / 2) >=
2604 options.level0_file_num_compaction_trigger)) {
2605 // We adjust the base level according to actual L0 size, and adjust
2606 // the level multiplier accordingly, when:
2607 // 1. the L0 size is larger than level size base, or
2608 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
2609 // We don't do this otherwise, to keep the LSM-tree structure stable,
2610 // unless the L0 compaction is backlogged.
2611 base_level_size = l0_size;
2612 if (base_level_ == num_levels_ - 1) {
2613 level_multiplier_ = 1.0;
2614 } else {
2615 level_multiplier_ = std::pow(
2616 static_cast<double>(max_level_size) /
2617 static_cast<double>(base_level_size),
2618 1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
2619 }
2620 }
2621
2622 uint64_t level_size = base_level_size;
2623 for (int i = base_level_; i < num_levels_; i++) {
2624 if (i > base_level_) {
2625 level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
2626 }
2627 // Don't set any level below base_bytes_max. Otherwise, the LSM can
2628 // assume an hourglass shape where L1+ sizes are smaller than L0. This
2629 // causes compaction scoring, which depends on level sizes, to favor L1+
2630 // at the expense of L0, which may fill up and stall.
2631 level_max_bytes_[i] = std::max(level_size, base_bytes_max);
2632 }
2633 }
2634 }
2635 }
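// Worked example of the dynamic sizing above (hypothetical options):
// num_levels = 7, max_bytes_for_level_base = 256 MB, multiplier = 10, a
// small L0, and only L6 holding data with max_level_size = 25 GB. Starting
// from first_non_empty_level = 6 with cur_level_size = 25 GB, the while
// loop walks the base level down, dividing by 10 each step:
//
//   base_level_ = 5 -> cur_level_size = 2.5 GB  (> 256 MB, keep going)
//   base_level_ = 4 -> cur_level_size = 250 MB  (<= 256 MB, stop)
//
// giving targets of roughly L4 = 256 MB (250 MB clamped up to
// base_bytes_max), L5 = 2.5 GB, L6 = 25 GB, while L1-L3 keep uint64 max so
// no compaction is scheduled into them.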
2636
2637 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
2638 // Estimate the live data size by adding up, for each key range, the size of
2639 // the bottommost file covering it. Note: the estimate depends on the ordering
2640 // of files in level 0 because files in level 0 can be overlapping.
2641 uint64_t size = 0;
2642
2643 auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
2644 return internal_comparator_->Compare(*x, *y) < 0;
2645 };
2646 // (Ordered) map of largest keys in non-overlapping files
2647 std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
2648
2649 for (int l = num_levels_ - 1; l >= 0; l--) {
2650 bool found_end = false;
2651 for (auto file : files_[l]) {
2652 // Find the first file where the largest key is larger than the smallest
2653 // key of the current file. If this file does not overlap with the
2654 // current file, none of the files in the map does. If there is
2655 // no potential overlap, we can safely insert the rest of this level
2656 // (if the level is not 0) into the map without checking again because
2657 // the elements in the level are sorted and non-overlapping.
2658 auto lb = (found_end && l != 0) ?
2659 ranges.end() : ranges.lower_bound(&file->smallest);
2660 found_end = (lb == ranges.end());
2661 if (found_end || internal_comparator_->Compare(
2662 file->largest, (*lb).second->smallest) < 0) {
2663 ranges.emplace_hint(lb, &file->largest, file);
2664 size += file->fd.file_size;
2665 }
2666 }
2667 }
2668 return size;
2669 }
2670
2671 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
2672 const Slice& smallest_user_key, const Slice& largest_user_key,
2673 int last_level, int last_l0_idx) {
2674 assert((last_l0_idx != -1) == (last_level == 0));
2675 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
2676 // bottommost only if it's the oldest L0 file and there are no files on older
2677 // levels. It'd be better to consider it bottommost if there's no overlap in
2678 // older levels/files.
2679 if (last_level == 0 &&
2680 last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
2681 return true;
2682 }
2683
2684 // Checks whether there are files living beyond the `last_level`. If lower
2685 // levels have files, it checks for overlap between [`smallest_key`,
2686 // `largest_key`] and those files. Bottom-level optimizations can be made if
2687 // there are no files in lower levels or if there is no overlap with the files
2688 // in the lower levels.
2689 for (int level = last_level + 1; level < num_levels(); level++) {
2690 // The range is not in the bottommost level if there are files in lower
2691 // levels when the `last_level` is 0 or if there are files in lower levels
2692 // which overlap with [`smallest_key`, `largest_key`].
2693 if (files_[level].size() > 0 &&
2694 (last_level == 0 ||
2695 OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
2696 return true;
2697 }
2698 }
2699 return false;
2700 }
2701
2702 void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
2703 for (int level = 0; level < storage_info_.num_levels(); level++) {
2704 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2705 for (const auto& file : files) {
2706 live->push_back(file->fd);
2707 }
2708 }
2709 }
2710
2711 std::string Version::DebugString(bool hex, bool print_stats) const {
2712 std::string r;
2713 for (int level = 0; level < storage_info_.num_levels_; level++) {
2714 // E.g.,
2715 // --- level 1 ---
2716 // 17:123['a' .. 'd']
2717 // 20:43['e' .. 'g']
2718 //
2719 // if print_stats=true:
2720 // 17:123['a' .. 'd'](4096)
2721 r.append("--- level ");
2722 AppendNumberTo(&r, level);
2723 r.append(" --- version# ");
2724 AppendNumberTo(&r, version_number_);
2725 r.append(" ---\n");
2726 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2727 for (size_t i = 0; i < files.size(); i++) {
2728 r.push_back(' ');
2729 AppendNumberTo(&r, files[i]->fd.GetNumber());
2730 r.push_back(':');
2731 AppendNumberTo(&r, files[i]->fd.GetFileSize());
2732 r.append("[");
2733 r.append(files[i]->smallest.DebugString(hex));
2734 r.append(" .. ");
2735 r.append(files[i]->largest.DebugString(hex));
2736 r.append("]");
2737 if (print_stats) {
2738 r.append("(");
2739 r.append(ToString(
2740 files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
2741 r.append(")");
2742 }
2743 r.append("\n");
2744 }
2745 }
2746 return r;
2747 }
2748
2749 // this is used to batch writes to the manifest file
2750 struct VersionSet::ManifestWriter {
2751 Status status;
2752 bool done;
2753 InstrumentedCondVar cv;
2754 ColumnFamilyData* cfd;
2755 const MutableCFOptions mutable_cf_options;
2756 const autovector<VersionEdit*>& edit_list;
2757
2758 explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
2759 const MutableCFOptions& cf_options,
2760 const autovector<VersionEdit*>& e)
2761 : done(false),
2762 cv(mu),
2763 cfd(_cfd),
2764 mutable_cf_options(cf_options),
2765 edit_list(e) {}
2766 };
2767
2768 VersionSet::VersionSet(const std::string& dbname,
2769 const ImmutableDBOptions* _db_options,
2770 const EnvOptions& storage_options, Cache* table_cache,
2771 WriteBufferManager* write_buffer_manager,
2772 WriteController* write_controller)
2773 : column_family_set_(
2774 new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
2775 write_buffer_manager, write_controller)),
2776 env_(_db_options->env),
2777 dbname_(dbname),
2778 db_options_(_db_options),
2779 next_file_number_(2),
2780 manifest_file_number_(0), // Filled by Recover()
2781 options_file_number_(0),
2782 pending_manifest_file_number_(0),
2783 last_sequence_(0),
2784 last_allocated_sequence_(0),
2785 last_published_sequence_(0),
2786 prev_log_number_(0),
2787 current_version_number_(0),
2788 manifest_file_size_(0),
2789 env_options_(storage_options) {}
2790
2791 void CloseTables(void* ptr, size_t) {
2792 TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
2793 table_reader->Close();
2794 }
2795
2796 VersionSet::~VersionSet() {
2797 // we need to delete column_family_set_ because its destructor depends on
2798 // VersionSet
2799 Cache* table_cache = column_family_set_->get_table_cache();
2800 table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
2801 column_family_set_.reset();
2802 for (auto& file : obsolete_files_) {
2803 if (file.metadata->table_reader_handle) {
2804 table_cache->Release(file.metadata->table_reader_handle);
2805 TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
2806 }
2807 file.DeleteMetadata();
2808 }
2809 obsolete_files_.clear();
2810 }
2811
2812 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
2813 Version* v) {
2814 // compute new compaction score
2815 v->storage_info()->ComputeCompactionScore(
2816 *column_family_data->ioptions(),
2817 *column_family_data->GetLatestMutableCFOptions());
2818
2819 // Mark v finalized
2820 v->storage_info_.SetFinalized();
2821
2822 // Make "v" current
2823 assert(v->refs_ == 0);
2824 Version* current = column_family_data->current();
2825 assert(v != current);
2826 if (current != nullptr) {
2827 assert(current->refs_ > 0);
2828 current->Unref();
2829 }
2830 column_family_data->SetCurrent(v);
2831 v->Ref();
2832
2833 // Append to linked list
2834 v->prev_ = column_family_data->dummy_versions()->prev_;
2835 v->next_ = column_family_data->dummy_versions();
2836 v->prev_->next_ = v;
2837 v->next_->prev_ = v;
2838 }
2839
2840 Status VersionSet::ProcessManifestWrites(
2841 std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
2842 Directory* db_directory, bool new_descriptor_log,
2843 const ColumnFamilyOptions* new_cf_options) {
2844 assert(!writers.empty());
2845 ManifestWriter& first_writer = writers.front();
2846 ManifestWriter* last_writer = &first_writer;
2847
2848 assert(!manifest_writers_.empty());
2849 assert(manifest_writers_.front() == &first_writer);
2850
2851 autovector<VersionEdit*> batch_edits;
2852 autovector<Version*> versions;
2853 autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
2854 std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
2855
2856 if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
2857 // No group commits for column family add or drop
2858 LogAndApplyCFHelper(first_writer.edit_list.front());
2859 batch_edits.push_back(first_writer.edit_list.front());
2860 } else {
2861 auto it = manifest_writers_.cbegin();
2862 size_t group_start = std::numeric_limits<size_t>::max();
2863 while (it != manifest_writers_.cend()) {
2864 if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
2865 // no group commits for column family add or drop
2866 break;
2867 }
2868 last_writer = *(it++);
2869 assert(last_writer != nullptr);
2870 assert(last_writer->cfd != nullptr);
2871 if (last_writer->cfd->IsDropped()) {
2872 // If we detect a dropped CF at this point, and the corresponding
2873 // version edits belong to an atomic group, then we need to find out
2874 // the preceding version edits in the same atomic group, and update
2875 // their `remaining_entries_` member variable because we are NOT going
2876 // to write the version edits of the dropped CF to the MANIFEST. If we
2877 // don't update them, then Recover can report a corrupted atomic group
2878 // because the `remaining_entries_` counts do not match.
2879 if (!batch_edits.empty()) {
2880 if (batch_edits.back()->is_in_atomic_group_ &&
2881 batch_edits.back()->remaining_entries_ > 0) {
2882 assert(group_start < batch_edits.size());
2883 const auto& edit_list = last_writer->edit_list;
2884 size_t k = 0;
2885 while (k < edit_list.size()) {
2886 if (!edit_list[k]->is_in_atomic_group_) {
2887 break;
2888 } else if (edit_list[k]->remaining_entries_ == 0) {
2889 ++k;
2890 break;
2891 }
2892 ++k;
2893 }
2894 for (auto i = group_start; i < batch_edits.size(); ++i) {
2895 assert(static_cast<uint32_t>(k) <=
2896 batch_edits.back()->remaining_entries_);
2897 batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
2898 }
2899 }
2900 }
2901 continue;
2902 }
2903 // We do a linear search on versions because the number of versions is small.
2904 // TODO(yanqin) maybe consider unordered_map
2905 Version* version = nullptr;
2906 VersionBuilder* builder = nullptr;
2907 for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
2908 uint32_t cf_id = last_writer->cfd->GetID();
2909 if (versions[i]->cfd()->GetID() == cf_id) {
2910 version = versions[i];
2911 assert(!builder_guards.empty() &&
2912 builder_guards.size() == versions.size());
2913 builder = builder_guards[i]->version_builder();
2914 TEST_SYNC_POINT_CALLBACK(
2915 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
2916 break;
2917 }
2918 }
2919 if (version == nullptr) {
2920 version = new Version(last_writer->cfd, this, env_options_,
2921 last_writer->mutable_cf_options,
2922 current_version_number_++);
2923 versions.push_back(version);
2924 mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
2925 builder_guards.emplace_back(
2926 new BaseReferencedVersionBuilder(last_writer->cfd));
2927 builder = builder_guards.back()->version_builder();
2928 }
2929 assert(builder != nullptr); // make checker happy
2930 for (const auto& e : last_writer->edit_list) {
2931 if (e->is_in_atomic_group_) {
2932 if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
2933 (batch_edits.back()->is_in_atomic_group_ &&
2934 batch_edits.back()->remaining_entries_ == 0)) {
2935 group_start = batch_edits.size();
2936 }
2937 } else if (group_start != std::numeric_limits<size_t>::max()) {
2938 group_start = std::numeric_limits<size_t>::max();
2939 }
2940 LogAndApplyHelper(last_writer->cfd, builder, e, mu);
2941 batch_edits.push_back(e);
2942 }
2943 }
2944 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
2945 assert(!builder_guards.empty() &&
2946 builder_guards.size() == versions.size());
2947 auto* builder = builder_guards[i]->version_builder();
2948 builder->SaveTo(versions[i]->storage_info());
2949 }
2950 }
2951
2952 #ifndef NDEBUG
2953 // Verify that version edits of atomic groups have correct
2954 // remaining_entries_.
2955 size_t k = 0;
2956 while (k < batch_edits.size()) {
2957 while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
2958 ++k;
2959 }
2960 if (k == batch_edits.size()) {
2961 break;
2962 }
2963 size_t i = k;
2964 while (i < batch_edits.size()) {
2965 if (!batch_edits[i]->is_in_atomic_group_) {
2966 break;
2967 }
2968 assert(i - k + batch_edits[i]->remaining_entries_ ==
2969 batch_edits[k]->remaining_entries_);
2970 if (batch_edits[i]->remaining_entries_ == 0) {
2971 ++i;
2972 break;
2973 }
2974 ++i;
2975 }
2976 assert(batch_edits[i - 1]->is_in_atomic_group_);
2977 assert(0 == batch_edits[i - 1]->remaining_entries_);
2978 std::vector<VersionEdit*> tmp;
2979 for (size_t j = k; j != i; ++j) {
2980 tmp.emplace_back(batch_edits[j]);
2981 }
2982 TEST_SYNC_POINT_CALLBACK(
2983 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
2984 k = i;
2985 }
2986 #endif // NDEBUG
2987
2988 uint64_t new_manifest_file_size = 0;
2989 Status s;
2990
2991 assert(pending_manifest_file_number_ == 0);
2992 if (!descriptor_log_ ||
2993 manifest_file_size_ > db_options_->max_manifest_file_size) {
2994 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
2995 pending_manifest_file_number_ = NewFileNumber();
2996 batch_edits.back()->SetNextFile(next_file_number_.load());
2997 new_descriptor_log = true;
2998 } else {
2999 pending_manifest_file_number_ = manifest_file_number_;
3000 }
3001
3002 if (new_descriptor_log) {
3003 // If we are writing out a new snapshot, make sure to persist the max
3004 // column family ID.
3005 if (column_family_set_->GetMaxColumnFamily() > 0) {
3006 first_writer.edit_list.front()->SetMaxColumnFamily(
3007 column_family_set_->GetMaxColumnFamily());
3008 }
3009 }
3010
3011 {
3012 EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
3013 mu->Unlock();
3014
3015 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3016 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3017 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3018 assert(!builder_guards.empty() &&
3019 builder_guards.size() == versions.size());
3020 assert(!mutable_cf_options_ptrs.empty() &&
3021 builder_guards.size() == versions.size());
3022 ColumnFamilyData* cfd = versions[i]->cfd_;
3023 builder_guards[i]->version_builder()->LoadTableHandlers(
3024 cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
3025 true /* prefetch_index_and_filter_in_cache */,
3026 false /* is_initial_load */,
3027 mutable_cf_options_ptrs[i]->prefix_extractor.get());
3028 }
3029 }
3030
3031 // This is fine because everything inside of this block is serialized --
3032 // only one thread can be here at the same time
3033 if (new_descriptor_log) {
3034 // create new manifest file
3035 ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
3036 pending_manifest_file_number_);
3037 std::string descriptor_fname =
3038 DescriptorFileName(dbname_, pending_manifest_file_number_);
3039 std::unique_ptr<WritableFile> descriptor_file;
3040 s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
3041 opt_env_opts);
3042 if (s.ok()) {
3043 descriptor_file->SetPreallocationBlockSize(
3044 db_options_->manifest_preallocation_size);
3045
3046 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
3047 std::move(descriptor_file), descriptor_fname, opt_env_opts, env_,
3048 nullptr, db_options_->listeners));
3049 descriptor_log_.reset(
3050 new log::Writer(std::move(file_writer), 0, false));
3051 s = WriteSnapshot(descriptor_log_.get());
3052 }
3053 }
3054
3055 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3056 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3057 versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
3058 }
3059 }
3060
3061 // Write new records to MANIFEST log
3062 if (s.ok()) {
3063 #ifndef NDEBUG
3064 size_t idx = 0;
3065 #endif
3066 for (auto& e : batch_edits) {
3067 std::string record;
3068 if (!e->EncodeTo(&record)) {
3069 s = Status::Corruption("Unable to encode VersionEdit:" +
3070 e->DebugString(true));
3071 break;
3072 }
3073 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
3074 rocksdb_kill_odds * REDUCE_ODDS2);
3075 #ifndef NDEBUG
3076 if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
3077 TEST_SYNC_POINT(
3078 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0");
3079 TEST_SYNC_POINT(
3080 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
3081 }
3082 ++idx;
3083 #endif /* !NDEBUG */
3084 s = descriptor_log_->AddRecord(record);
3085 if (!s.ok()) {
3086 break;
3087 }
3088 }
3089 if (s.ok()) {
3090 s = SyncManifest(env_, db_options_, descriptor_log_->file());
3091 }
3092 if (!s.ok()) {
3093 ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
3094 s.ToString().c_str());
3095 }
3096 }
3097
3098 // If we just created a new descriptor file, install it by writing a
3099 // new CURRENT file that points to it.
3100 if (s.ok() && new_descriptor_log) {
3101 s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
3102 db_directory);
3103 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
3104 }
3105
3106 if (s.ok()) {
3107 // find offset in manifest file where this version is stored.
3108 new_manifest_file_size = descriptor_log_->file()->GetFileSize();
3109 }
3110
3111 if (first_writer.edit_list.front()->is_column_family_drop_) {
3112 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3113 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3114 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3115 }
3116
3117 LogFlush(db_options_->info_log);
3118 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3119 mu->Lock();
3120 }
3121
3122 // Append the old manifest file to the obsolete_manifests_ list to be deleted
3123 // by PurgeObsoleteFiles later.
3124 if (s.ok() && new_descriptor_log) {
3125 obsolete_manifests_.emplace_back(
3126 DescriptorFileName("", manifest_file_number_));
3127 }
3128
3129 // Install the new versions
3130 if (s.ok()) {
3131 if (first_writer.edit_list.front()->is_column_family_add_) {
3132 assert(batch_edits.size() == 1);
3133 assert(new_cf_options != nullptr);
3134 CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
3135 } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3136 assert(batch_edits.size() == 1);
3137 first_writer.cfd->SetDropped();
3138 if (first_writer.cfd->Unref()) {
3139 delete first_writer.cfd;
3140 }
3141 } else {
3142 // Each version in versions corresponds to a column family.
3143 // For each column family, update its log number indicating that logs
3144 // with numbers smaller than this should be ignored.
3145 for (const auto version : versions) {
3146 uint64_t max_log_number_in_batch = 0;
3147 uint32_t cf_id = version->cfd_->GetID();
3148 for (const auto& e : batch_edits) {
3149 if (e->has_log_number_ && e->column_family_ == cf_id) {
3150 max_log_number_in_batch =
3151 std::max(max_log_number_in_batch, e->log_number_);
3152 }
3153 }
3154 if (max_log_number_in_batch != 0) {
3155 assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
3156 version->cfd_->SetLogNumber(max_log_number_in_batch);
3157 }
3158 }
3159
3160 uint64_t last_min_log_number_to_keep = 0;
3161 for (auto& e : batch_edits) {
3162 if (e->has_min_log_number_to_keep_) {
3163 last_min_log_number_to_keep =
3164 std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
3165 }
3166 }
3167
3168 if (last_min_log_number_to_keep != 0) {
3169 // Should only be set in 2PC mode.
3170 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
3171 }
3172
3173 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3174 ColumnFamilyData* cfd = versions[i]->cfd_;
3175 AppendVersion(cfd, versions[i]);
3176 }
3177 }
3178 manifest_file_number_ = pending_manifest_file_number_;
3179 manifest_file_size_ = new_manifest_file_size;
3180 prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
3181 } else {
3182 std::string version_edits;
3183 for (auto& e : batch_edits) {
3184 version_edits += ("\n" + e->DebugString(true));
3185 }
3186 ROCKS_LOG_ERROR(db_options_->info_log,
3187 "Error in committing version edit to MANIFEST: %s",
3188 version_edits.c_str());
3189 for (auto v : versions) {
3190 delete v;
3191 }
3192 if (new_descriptor_log) {
3193 ROCKS_LOG_INFO(db_options_->info_log,
3194 "Deleting manifest %" PRIu64 " current manifest %" PRIu64
3195 "\n",
3196 manifest_file_number_, pending_manifest_file_number_);
3197 descriptor_log_.reset();
3198 env_->DeleteFile(
3199 DescriptorFileName(dbname_, pending_manifest_file_number_));
3200 }
3201 }
3202
3203 pending_manifest_file_number_ = 0;
3204
3205 // wake up all the waiting writers
3206 while (true) {
3207 ManifestWriter* ready = manifest_writers_.front();
3208 manifest_writers_.pop_front();
3209 bool need_signal = true;
3210 for (const auto& w : writers) {
3211 if (&w == ready) {
3212 need_signal = false;
3213 break;
3214 }
3215 }
3216 ready->status = s;
3217 ready->done = true;
3218 if (need_signal) {
3219 ready->cv.Signal();
3220 }
3221 if (ready == last_writer) {
3222 break;
3223 }
3224 }
3225 if (!manifest_writers_.empty()) {
3226 manifest_writers_.front()->cv.Signal();
3227 }
3228 return s;
3229 }
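
// Illustrative note (not part of the original source): the wakeup loop above
// completes a group commit. The writer at the head of manifest_writers_
// processed the whole batch itself, so it needs no signal; every other writer
// in the batch is woken with the shared status, and a writer outside the
// batch, if any, becomes the new queue head and is signalled last so it can
// start the next batch.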
3230
3231 // 'datas' is grammatically incorrect. We still use this notation to indicate
3232 // that this variable represents a collection of column_family_data.
3233 Status VersionSet::LogAndApply(
3234 const autovector<ColumnFamilyData*>& column_family_datas,
3235 const autovector<const MutableCFOptions*>& mutable_cf_options_list,
3236 const autovector<autovector<VersionEdit*>>& edit_lists,
3237 InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
3238 const ColumnFamilyOptions* new_cf_options) {
3239 mu->AssertHeld();
3240 int num_edits = 0;
3241 for (const auto& elist : edit_lists) {
3242 num_edits += static_cast<int>(elist.size());
3243 }
3244 if (num_edits == 0) {
3245 return Status::OK();
3246 } else if (num_edits > 1) {
3247 #ifndef NDEBUG
3248 for (const auto& edit_list : edit_lists) {
3249 for (const auto& edit : edit_list) {
3250 assert(!edit->IsColumnFamilyManipulation());
3251 }
3252 }
3253 #endif /* ! NDEBUG */
3254 }
3255
3256 int num_cfds = static_cast<int>(column_family_datas.size());
3257 if (num_cfds == 1 && column_family_datas[0] == nullptr) {
3258 assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
3259 assert(edit_lists[0][0]->is_column_family_add_);
3260 assert(new_cf_options != nullptr);
3261 }
3262 std::deque<ManifestWriter> writers;
3263 if (num_cfds > 0) {
3264 assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
3265 assert(static_cast<size_t>(num_cfds) == edit_lists.size());
3266 }
3267 for (int i = 0; i < num_cfds; ++i) {
3268 writers.emplace_back(mu, column_family_datas[i],
3269 *mutable_cf_options_list[i], edit_lists[i]);
3270 manifest_writers_.push_back(&writers[i]);
3271 }
3272 assert(!writers.empty());
3273 ManifestWriter& first_writer = writers.front();
3274 while (!first_writer.done && &first_writer != manifest_writers_.front()) {
3275 first_writer.cv.Wait();
3276 }
3277 if (first_writer.done) {
3278 // All non-CF-manipulation operations can be grouped together and committed
3279 // to MANIFEST. They should all have finished. The status code is stored in
3280 // the first manifest writer.
3281 #ifndef NDEBUG
3282 for (const auto& writer : writers) {
3283 assert(writer.done);
3284 }
3285 #endif /* !NDEBUG */
3286 return first_writer.status;
3287 }
3288
3289 int num_undropped_cfds = 0;
3290 for (auto cfd : column_family_datas) {
3291 // if cfd == nullptr, it is a column family add.
3292 if (cfd == nullptr || !cfd->IsDropped()) {
3293 ++num_undropped_cfds;
3294 }
3295 }
3296 if (0 == num_undropped_cfds) {
3297 // TODO (yanqin) maybe use a different status code to denote column family
3298 // drop other than OK and ShutdownInProgress
3299 for (int i = 0; i != num_cfds; ++i) {
3300 manifest_writers_.pop_front();
3301 }
3302 // Notify new head of manifest write queue.
3303 if (!manifest_writers_.empty()) {
3304 manifest_writers_.front()->cv.Signal();
3305 }
3306 return Status::ShutdownInProgress();
3307 }
3308
3309 return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
3310 new_cf_options);
3311 }
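
// Illustrative sketch (not part of the original source): a caller that
// already holds the DB mutex could group edits for two column families into
// a single MANIFEST commit through the overload above. `cfd1`, `cfd2`,
// `mutable_opts1`, `mutable_opts2`, `edit1` and `edit2` are hypothetical
// placeholders.
//
//   autovector<ColumnFamilyData*> cfds;
//   cfds.push_back(cfd1);
//   cfds.push_back(cfd2);
//   autovector<const MutableCFOptions*> opts;
//   opts.push_back(&mutable_opts1);
//   opts.push_back(&mutable_opts2);
//   autovector<VersionEdit*> elist1, elist2;
//   elist1.push_back(&edit1);
//   elist2.push_back(&edit2);
//   autovector<autovector<VersionEdit*>> edit_lists;
//   edit_lists.push_back(elist1);
//   edit_lists.push_back(elist2);
//   mu->Lock();  // LogAndApply() asserts that the mutex is held on entry
//   Status s = versions->LogAndApply(cfds, opts, edit_lists, mu, db_directory,
//                                    false /* new_descriptor_log */);
//   mu->Unlock();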
3312
3313 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
3314 assert(edit->IsColumnFamilyManipulation());
3315 edit->SetNextFile(next_file_number_.load());
3316 // The log might have data that is not visible to the memtable and hence has
3317 // not updated last_sequence_ yet. It is also possible that the log is
3318 // expecting some new data that is not written yet. Since LastSequence is an
3319 // upper bound on the sequence, it is ok to record
3320 // last_allocated_sequence_ as the last sequence.
3321 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
3322 : last_sequence_);
3323 if (edit->is_column_family_drop_) {
3324 // if we drop a column family, we have to make sure to save the max column
3325 // family, so that we don't reuse an existing ID
3326 edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
3327 }
3328 }
3329
3330 void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
3331 VersionBuilder* builder, VersionEdit* edit,
3332 InstrumentedMutex* mu) {
3333 #ifdef NDEBUG
3334 (void)cfd;
3335 #endif
3336 mu->AssertHeld();
3337 assert(!edit->IsColumnFamilyManipulation());
3338
3339 if (edit->has_log_number_) {
3340 assert(edit->log_number_ >= cfd->GetLogNumber());
3341 assert(edit->log_number_ < next_file_number_.load());
3342 }
3343
3344 if (!edit->has_prev_log_number_) {
3345 edit->SetPrevLogNumber(prev_log_number_);
3346 }
3347 edit->SetNextFile(next_file_number_.load());
3348 // The log might have data that is not visible to the memtable and hence has
3349 // not updated last_sequence_ yet. It is also possible that the log is
3350 // expecting some new data that is not written yet. Since LastSequence is an
3351 // upper bound on the sequence, it is ok to record
3352 // last_allocated_sequence_ as the last sequence.
3353 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
3354 : last_sequence_);
3355
3356 builder->Apply(edit);
3357 }
3358
3359 Status VersionSet::ApplyOneVersionEditToBuilder(
3360 VersionEdit& edit,
3361 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
3362 std::unordered_map<int, std::string>& column_families_not_found,
3363 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
3364 builders,
3365 bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
3366 uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
3367 bool* have_last_sequence, SequenceNumber* last_sequence,
3368 uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
3369 // Not found means that the user didn't supply that column
3370 // family option AND we encountered a column family add
3371 // record. Once we encounter a column family drop record,
3372 // we will delete the column family from
3373 // column_families_not_found.
3374 bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
3375 column_families_not_found.end());
3376 // in builders means that the user supplied that column family
3377 // option AND that we encountered a column family add record
3378 bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
3379
3380 // they can't both be true
3381 assert(!(cf_in_not_found && cf_in_builders));
3382
3383 ColumnFamilyData* cfd = nullptr;
3384
3385 if (edit.is_column_family_add_) {
3386 if (cf_in_builders || cf_in_not_found) {
3387 return Status::Corruption(
3388 "Manifest adding the same column family twice: " +
3389 edit.column_family_name_);
3390 }
3391 auto cf_options = name_to_options.find(edit.column_family_name_);
3392 if (cf_options == name_to_options.end()) {
3393 column_families_not_found.insert(
3394 {edit.column_family_, edit.column_family_name_});
3395 } else {
3396 cfd = CreateColumnFamily(cf_options->second, &edit);
3397 cfd->set_initialized();
3398 builders.insert(std::make_pair(
3399 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
3400 new BaseReferencedVersionBuilder(cfd))));
3401 }
3402 } else if (edit.is_column_family_drop_) {
3403 if (cf_in_builders) {
3404 auto builder = builders.find(edit.column_family_);
3405 assert(builder != builders.end());
3406 builders.erase(builder);
3407 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3408 assert(cfd != nullptr);
3409 if (cfd->Unref()) {
3410 delete cfd;
3411 cfd = nullptr;
3412 } else {
3413 // who else can have a reference to cfd!?
3414 assert(false);
3415 }
3416 } else if (cf_in_not_found) {
3417 column_families_not_found.erase(edit.column_family_);
3418 } else {
3419 return Status::Corruption(
3420 "Manifest - dropping non-existing column family");
3421 }
3422 } else if (!cf_in_not_found) {
3423 if (!cf_in_builders) {
3424 return Status::Corruption(
3425 "Manifest record referencing unknown column family");
3426 }
3427
3428 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3429 // this should never happen since cf_in_builders is true
3430 assert(cfd != nullptr);
3431
3432 // if it is not column family add or column family drop,
3433 // then it's a file add/delete, which should be forwarded
3434 // to builder
3435 auto builder = builders.find(edit.column_family_);
3436 assert(builder != builders.end());
3437 builder->second->version_builder()->Apply(&edit);
3438 }
3439 return ExtractInfoFromVersionEdit(
3440 cfd, edit, have_log_number, log_number, have_prev_log_number,
3441 previous_log_number, have_next_file, next_file, have_last_sequence,
3442 last_sequence, min_log_number_to_keep, max_column_family);
3443 }
3444
3445 Status VersionSet::ExtractInfoFromVersionEdit(
3446 ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
3447 uint64_t* log_number, bool* have_prev_log_number,
3448 uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
3449 bool* have_last_sequence, SequenceNumber* last_sequence,
3450 uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
3451 if (cfd != nullptr) {
3452 if (edit.has_log_number_) {
3453 if (cfd->GetLogNumber() > edit.log_number_) {
3454 ROCKS_LOG_WARN(
3455 db_options_->info_log,
3456 "MANIFEST corruption detected, but ignored - Log numbers in "
3457 "records NOT monotonically increasing");
3458 } else {
3459 cfd->SetLogNumber(edit.log_number_);
3460 *have_log_number = true;
3461 *log_number = edit.log_number_;
3462 }
3463 }
3464 if (edit.has_comparator_ &&
3465 edit.comparator_ != cfd->user_comparator()->Name()) {
3466 return Status::InvalidArgument(
3467 cfd->user_comparator()->Name(),
3468 "does not match existing comparator " + edit.comparator_);
3469 }
3470 }
3471
3472 if (edit.has_prev_log_number_) {
3473 *previous_log_number = edit.prev_log_number_;
3474 *have_prev_log_number = true;
3475 }
3476
3477 if (edit.has_next_file_number_) {
3478 *next_file = edit.next_file_number_;
3479 *have_next_file = true;
3480 }
3481
3482 if (edit.has_max_column_family_) {
3483 *max_column_family = edit.max_column_family_;
3484 }
3485
3486 if (edit.has_min_log_number_to_keep_) {
3487 *min_log_number_to_keep =
3488 std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
3489 }
3490
3491 if (edit.has_last_sequence_) {
3492 *last_sequence = edit.last_sequence_;
3493 *have_last_sequence = true;
3494 }
3495 return Status::OK();
3496 }
3497
3498 Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) {
3499 assert(manifest_path != nullptr);
3500 std::string fname;
3501 Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname);
3502 if (!s.ok()) {
3503 return s;
3504 }
3505 if (fname.empty() || fname.back() != '\n') {
3506 return Status::Corruption("CURRENT file does not end with newline");
3507 }
3508 // remove the trailing '\n'
3509 fname.resize(fname.size() - 1);
3510 FileType type;
3511 bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type);
3512 if (!parse_ok || type != kDescriptorFile) {
3513 return Status::Corruption("CURRENT file corrupted");
3514 }
3515 *manifest_path = dbname_;
3516 if (dbname_.back() != '/') {
3517 manifest_path->push_back('/');
3518 }
3519 *manifest_path += fname;
3520 return Status::OK();
3521 }
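
// For example (hypothetical contents, not part of the original source): if
// the CURRENT file holds "MANIFEST-000005\n" and dbname_ is "/db", the code
// above strips the trailing newline, verifies that the name parses as a
// kDescriptorFile, and returns "/db/MANIFEST-000005".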
3522
3523 Status VersionSet::Recover(
3524 const std::vector<ColumnFamilyDescriptor>& column_families,
3525 bool read_only) {
3526 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
3527 for (auto cf : column_families) {
3528 cf_name_to_options.insert({cf.name, cf.options});
3529 }
3530 // Keeps track of column families in the MANIFEST that were not found in the
3531 // column_families parameter. If those column families are not dropped by
3532 // subsequent manifest records, Recover() will return a failure status.
3533 std::unordered_map<int, std::string> column_families_not_found;
3534
3535 // Read "CURRENT" file, which contains a pointer to the current manifest file
3536 std::string manifest_path;
3537 Status s = GetCurrentManifestPath(&manifest_path);
3538 if (!s.ok()) {
3539 return s;
3540 }
3541
3542 ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
3543 manifest_path.c_str());
3544
3545 std::unique_ptr<SequentialFileReader> manifest_file_reader;
3546 {
3547 std::unique_ptr<SequentialFile> manifest_file;
3548 s = env_->NewSequentialFile(manifest_path, &manifest_file,
3549 env_->OptimizeForManifestRead(env_options_));
3550 if (!s.ok()) {
3551 return s;
3552 }
3553 manifest_file_reader.reset(
3554 new SequentialFileReader(std::move(manifest_file), manifest_path));
3555 }
3556 uint64_t current_manifest_file_size;
3557 s = env_->GetFileSize(manifest_path, &current_manifest_file_size);
3558 if (!s.ok()) {
3559 return s;
3560 }
3561
3562 bool have_log_number = false;
3563 bool have_prev_log_number = false;
3564 bool have_next_file = false;
3565 bool have_last_sequence = false;
3566 uint64_t next_file = 0;
3567 uint64_t last_sequence = 0;
3568 uint64_t log_number = 0;
3569 uint64_t previous_log_number = 0;
3570 uint32_t max_column_family = 0;
3571 uint64_t min_log_number_to_keep = 0;
3572 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
3573 builders;
3574
3575 // add default column family
3576 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
3577 if (default_cf_iter == cf_name_to_options.end()) {
3578 return Status::InvalidArgument("Default column family not specified");
3579 }
3580 VersionEdit default_cf_edit;
3581 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
3582 default_cf_edit.SetColumnFamily(0);
3583 ColumnFamilyData* default_cfd =
3584 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
3585 // In recovery, nobody else can access it, so it's fine to mark it as
3586 // initialized this early.
3587 default_cfd->set_initialized();
3588 builders.insert(
3589 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
3590 new BaseReferencedVersionBuilder(default_cfd))));
3591
3592 {
3593 VersionSet::LogReporter reporter;
3594 reporter.status = &s;
3595 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
3596 true /* checksum */, 0 /* log_number */);
3597 Slice record;
3598 std::string scratch;
3599 std::vector<VersionEdit> replay_buffer;
3600 size_t num_entries_decoded = 0;
3601 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3602 VersionEdit edit;
3603 s = edit.DecodeFrom(record);
3604 if (!s.ok()) {
3605 break;
3606 }
3607
3608 if (edit.is_in_atomic_group_) {
3609 if (replay_buffer.empty()) {
3610 replay_buffer.resize(edit.remaining_entries_ + 1);
3611 TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
3612 &edit);
3613 }
3614 ++num_entries_decoded;
3615 if (num_entries_decoded + edit.remaining_entries_ !=
3616 static_cast<uint32_t>(replay_buffer.size())) {
3617 TEST_SYNC_POINT_CALLBACK(
3618 "VersionSet::Recover:IncorrectAtomicGroupSize", &edit);
3619 s = Status::Corruption("corrupted atomic group");
3620 break;
3621 }
3622 replay_buffer[num_entries_decoded - 1] = std::move(edit);
3623 if (num_entries_decoded == replay_buffer.size()) {
3624 TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
3625 &edit);
3626 for (auto& e : replay_buffer) {
3627 s = ApplyOneVersionEditToBuilder(
3628 e, cf_name_to_options, column_families_not_found, builders,
3629 &have_log_number, &log_number, &have_prev_log_number,
3630 &previous_log_number, &have_next_file, &next_file,
3631 &have_last_sequence, &last_sequence, &min_log_number_to_keep,
3632 &max_column_family);
3633 if (!s.ok()) {
3634 break;
3635 }
3636 }
3637 replay_buffer.clear();
3638 num_entries_decoded = 0;
3639 }
3640 TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
3641 } else {
3642 if (!replay_buffer.empty()) {
3643 TEST_SYNC_POINT_CALLBACK(
3644 "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit);
3645 s = Status::Corruption("corrupted atomic group");
3646 break;
3647 }
3648 s = ApplyOneVersionEditToBuilder(
3649 edit, cf_name_to_options, column_families_not_found, builders,
3650 &have_log_number, &log_number, &have_prev_log_number,
3651 &previous_log_number, &have_next_file, &next_file,
3652 &have_last_sequence, &last_sequence, &min_log_number_to_keep,
3653 &max_column_family);
3654 }
3655 if (!s.ok()) {
3656 break;
3657 }
3658 }
3659 }
3660
3661 if (s.ok()) {
3662 if (!have_next_file) {
3663 s = Status::Corruption("no meta-nextfile entry in descriptor");
3664 } else if (!have_log_number) {
3665 s = Status::Corruption("no meta-lognumber entry in descriptor");
3666 } else if (!have_last_sequence) {
3667 s = Status::Corruption("no last-sequence-number entry in descriptor");
3668 }
3669
3670 if (!have_prev_log_number) {
3671 previous_log_number = 0;
3672 }
3673
3674 column_family_set_->UpdateMaxColumnFamily(max_column_family);
3675
3676 // When reading a DB generated by an old release, min_log_number_to_keep=0.
3677 // All log files will be scanned for potential prepare entries.
3678 MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
3679 MarkFileNumberUsed(previous_log_number);
3680 MarkFileNumberUsed(log_number);
3681 }
3682
3683 // There were some column families in the MANIFEST that weren't specified
3684 // in the argument. This is OK in read_only mode.
3685 if (read_only == false && !column_families_not_found.empty()) {
3686 std::string list_of_not_found;
3687 for (const auto& cf : column_families_not_found) {
3688 list_of_not_found += ", " + cf.second;
3689 }
3690 list_of_not_found = list_of_not_found.substr(2);
3691 s = Status::InvalidArgument(
3692 "You have to open all column families. Column families not opened: " +
3693 list_of_not_found);
3694 }
3695
3696 if (s.ok()) {
3697 for (auto cfd : *column_family_set_) {
3698 assert(builders.count(cfd->GetID()) > 0);
3699 auto* builder = builders[cfd->GetID()]->version_builder();
3700 if (!builder->CheckConsistencyForNumLevels()) {
3701 s = Status::InvalidArgument(
3702 "db has more levels than options.num_levels");
3703 break;
3704 }
3705 }
3706 }
3707
3708 if (s.ok()) {
3709 for (auto cfd : *column_family_set_) {
3710 if (cfd->IsDropped()) {
3711 continue;
3712 }
3713 if (read_only) {
3714 cfd->table_cache()->SetTablesAreImmortal();
3715 }
3716 assert(cfd->initialized());
3717 auto builders_iter = builders.find(cfd->GetID());
3718 assert(builders_iter != builders.end());
3719 auto builder = builders_iter->second->version_builder();
3720
3721 // unlimited table cache. Pre-load table handles now.
3722 // This needs to be done outside of the mutex.
3723 builder->LoadTableHandlers(
3724 cfd->internal_stats(), db_options_->max_file_opening_threads,
3725 false /* prefetch_index_and_filter_in_cache */,
3726 true /* is_initial_load */,
3727 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
3728
3729 Version* v = new Version(cfd, this, env_options_,
3730 *cfd->GetLatestMutableCFOptions(),
3731 current_version_number_++);
3732 builder->SaveTo(v->storage_info());
3733
3734 // Install recovered version
3735 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
3736 !(db_options_->skip_stats_update_on_db_open));
3737 AppendVersion(cfd, v);
3738 }
3739
3740 manifest_file_size_ = current_manifest_file_size;
3741 next_file_number_.store(next_file + 1);
3742 last_allocated_sequence_ = last_sequence;
3743 last_published_sequence_ = last_sequence;
3744 last_sequence_ = last_sequence;
3745 prev_log_number_ = previous_log_number;
3746
3747 ROCKS_LOG_INFO(
3748 db_options_->info_log,
3749 "Recovered from manifest file:%s succeeded,"
3750 "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
3751 ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
3752 ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
3753 ",min_log_number_to_keep is %" PRIu64 "\n",
3754 manifest_path.c_str(), manifest_file_number_,
3755 next_file_number_.load(), last_sequence_.load(), log_number,
3756 prev_log_number_, column_family_set_->GetMaxColumnFamily(),
3757 min_log_number_to_keep_2pc());
3758
3759 for (auto cfd : *column_family_set_) {
3760 if (cfd->IsDropped()) {
3761 continue;
3762 }
3763 ROCKS_LOG_INFO(db_options_->info_log,
3764 "Column family [%s] (ID %" PRIu32
3765 "), log number is %" PRIu64 "\n",
3766 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
3767 }
3768 }
3769
3770 return s;
3771 }
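
// Illustrative sketch (not part of the original source): standalone recovery
// of a VersionSet, mirroring the setup used by ReduceNumberOfLevels() below.
// `dbname`, `options` and `env_options` are hypothetical caller-supplied
// values.
//
//   ImmutableDBOptions db_options(options);
//   std::shared_ptr<Cache> tc(NewLRUCache(options.max_open_files - 10,
//                                         options.table_cache_numshardbits));
//   WriteController wc(options.delayed_write_rate);
//   WriteBufferManager wb(options.db_write_buffer_size);
//   VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc);
//   std::vector<ColumnFamilyDescriptor> cfs;
//   cfs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions(options));
//   Status s = versions.Recover(cfs, false /* read_only */);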
3772
3773 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
3774 const std::string& dbname, Env* env) {
3775 // these are just for performance reasons, not correctness,
3776 // so we're fine using the defaults
3777 EnvOptions soptions;
3778 // Read "CURRENT" file, which contains a pointer to the current manifest file
3779 std::string current;
3780 Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
3781 if (!s.ok()) {
3782 return s;
3783 }
3784 if (current.empty() || current[current.size()-1] != '\n') {
3785 return Status::Corruption("CURRENT file does not end with newline");
3786 }
3787 current.resize(current.size() - 1);
3788
3789 std::string dscname = dbname + "/" + current;
3790
3791 std::unique_ptr<SequentialFileReader> file_reader;
3792 {
3793 std::unique_ptr<SequentialFile> file;
3794 s = env->NewSequentialFile(dscname, &file, soptions);
3795 if (!s.ok()) {
3796 return s;
3797 }
3798 file_reader.reset(new SequentialFileReader(std::move(file), dscname));
3799 }
3800
3801 std::map<uint32_t, std::string> column_family_names;
3802 // default column family is always implicitly there
3803 column_family_names.insert({0, kDefaultColumnFamilyName});
3804 VersionSet::LogReporter reporter;
3805 reporter.status = &s;
3806 log::Reader reader(nullptr, std::move(file_reader), &reporter,
3807 true /* checksum */, 0 /* log_number */);
3808 Slice record;
3809 std::string scratch;
3810 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3811 VersionEdit edit;
3812 s = edit.DecodeFrom(record);
3813 if (!s.ok()) {
3814 break;
3815 }
3816 if (edit.is_column_family_add_) {
3817 if (column_family_names.find(edit.column_family_) !=
3818 column_family_names.end()) {
3819 s = Status::Corruption("Manifest adding the same column family twice");
3820 break;
3821 }
3822 column_family_names.insert(
3823 {edit.column_family_, edit.column_family_name_});
3824 } else if (edit.is_column_family_drop_) {
3825 if (column_family_names.find(edit.column_family_) ==
3826 column_family_names.end()) {
3827 s = Status::Corruption(
3828 "Manifest - dropping non-existing column family");
3829 break;
3830 }
3831 column_family_names.erase(edit.column_family_);
3832 }
3833 }
3834
3835 column_families->clear();
3836 if (s.ok()) {
3837 for (const auto& iter : column_family_names) {
3838 column_families->push_back(iter.second);
3839 }
3840 }
3841
3842 return s;
3843 }
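
// Illustrative sketch (not part of the original source): listing the column
// families of an existing DB without opening it. `dbname` is a hypothetical
// path.
//
//   std::vector<std::string> column_families;
//   Status s = VersionSet::ListColumnFamilies(&column_families, dbname,
//                                             Env::Default());
//   // On success the list always contains "default", because the default
//   // column family is implicit in the MANIFEST.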
3844
3845 #ifndef ROCKSDB_LITE
3846 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
3847 const Options* options,
3848 const EnvOptions& env_options,
3849 int new_levels) {
3850 if (new_levels <= 1) {
3851 return Status::InvalidArgument(
3852 "Number of levels needs to be bigger than 1");
3853 }
3854
3855 ImmutableDBOptions db_options(*options);
3856 ColumnFamilyOptions cf_options(*options);
3857 std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
3858 options->table_cache_numshardbits));
3859 WriteController wc(options->delayed_write_rate);
3860 WriteBufferManager wb(options->db_write_buffer_size);
3861 VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc);
3862 Status status;
3863
3864 std::vector<ColumnFamilyDescriptor> dummy;
3865 ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
3866 ColumnFamilyOptions(*options));
3867 dummy.push_back(dummy_descriptor);
3868 status = versions.Recover(dummy);
3869 if (!status.ok()) {
3870 return status;
3871 }
3872
3873 Version* current_version =
3874 versions.GetColumnFamilySet()->GetDefault()->current();
3875 auto* vstorage = current_version->storage_info();
3876 int current_levels = vstorage->num_levels();
3877
3878 if (current_levels <= new_levels) {
3879 return Status::OK();
3880 }
3881
3882 // Make sure there are files on only one level from
3883 // (new_levels-1) to (current_levels-1)
3884 int first_nonempty_level = -1;
3885 int first_nonempty_level_filenum = 0;
3886 for (int i = new_levels - 1; i < current_levels; i++) {
3887 int file_num = vstorage->NumLevelFiles(i);
3888 if (file_num != 0) {
3889 if (first_nonempty_level < 0) {
3890 first_nonempty_level = i;
3891 first_nonempty_level_filenum = file_num;
3892 } else {
3893 char msg[255];
3894 snprintf(msg, sizeof(msg),
3895 "Found at least two levels containing files: "
3896 "[%d:%d],[%d:%d].\n",
3897 first_nonempty_level, first_nonempty_level_filenum, i,
3898 file_num);
3899 return Status::InvalidArgument(msg);
3900 }
3901 }
3902 }
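// For example, with current_levels = 7 and new_levels = 3, the loop above
// scans L2..L6 and fails if more than one of those levels holds files; if
// only, say, L6 is non-empty, its files become the contents of the new last
// level (L2) below.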
3903
3904 // we need to allocate an array sized to the old number of levels to
3905 // avoid a SIGSEGV in WriteSnapshot();
3906 // however, all levels greater than or equal to new_levels will be empty
3907 std::vector<FileMetaData*>* new_files_list =
3908 new std::vector<FileMetaData*>[current_levels];
3909 for (int i = 0; i < new_levels - 1; i++) {
3910 new_files_list[i] = vstorage->LevelFiles(i);
3911 }
3912
3913 if (first_nonempty_level > 0) {
3914 new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
3915 }
3916
3917 delete[] vstorage->files_;
3918 vstorage->files_ = new_files_list;
3919 vstorage->num_levels_ = new_levels;
3920
3921 MutableCFOptions mutable_cf_options(*options);
3922 VersionEdit ve;
3923 InstrumentedMutex dummy_mutex;
3924 InstrumentedMutexLock l(&dummy_mutex);
3925 return versions.LogAndApply(
3926 versions.GetColumnFamilySet()->GetDefault(),
3927 mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
3928 }
3929
3930 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
3931 bool verbose, bool hex, bool json) {
3932 // Open the specified manifest file.
3933 std::unique_ptr<SequentialFileReader> file_reader;
3934 Status s;
3935 {
3936 std::unique_ptr<SequentialFile> file;
3937 s = options.env->NewSequentialFile(
3938 dscname, &file, env_->OptimizeForManifestRead(env_options_));
3939 if (!s.ok()) {
3940 return s;
3941 }
3942 file_reader.reset(new SequentialFileReader(std::move(file), dscname));
3943 }
3944
3945 bool have_prev_log_number = false;
3946 bool have_next_file = false;
3947 bool have_last_sequence = false;
3948 uint64_t next_file = 0;
3949 uint64_t last_sequence = 0;
3950 uint64_t previous_log_number = 0;
3951 int count = 0;
3952 std::unordered_map<uint32_t, std::string> comparators;
3953 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
3954 builders;
3955
3956 // add default column family
3957 VersionEdit default_cf_edit;
3958 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
3959 default_cf_edit.SetColumnFamily(0);
3960 ColumnFamilyData* default_cfd =
3961 CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
3962 builders.insert(
3963 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
3964 new BaseReferencedVersionBuilder(default_cfd))));
3965
3966 {
3967 VersionSet::LogReporter reporter;
3968 reporter.status = &s;
3969 log::Reader reader(nullptr, std::move(file_reader), &reporter,
3970 true /* checksum */, 0 /* log_number */);
3971 Slice record;
3972 std::string scratch;
3973 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3974 VersionEdit edit;
3975 s = edit.DecodeFrom(record);
3976 if (!s.ok()) {
3977 break;
3978 }
3979
3980 // Write out each individual edit
3981 if (verbose && !json) {
3982 printf("%s\n", edit.DebugString(hex).c_str());
3983 } else if (json) {
3984 printf("%s\n", edit.DebugJSON(count, hex).c_str());
3985 }
3986 count++;
3987
3988 bool cf_in_builders =
3989 builders.find(edit.column_family_) != builders.end();
3990
3991 if (edit.has_comparator_) {
3992 comparators.insert({edit.column_family_, edit.comparator_});
3993 }
3994
3995 ColumnFamilyData* cfd = nullptr;
3996
3997 if (edit.is_column_family_add_) {
3998 if (cf_in_builders) {
3999 s = Status::Corruption(
4000 "Manifest adding the same column family twice");
4001 break;
4002 }
4003 cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
4004 cfd->set_initialized();
4005 builders.insert(std::make_pair(
4006 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4007 new BaseReferencedVersionBuilder(cfd))));
4008 } else if (edit.is_column_family_drop_) {
4009 if (!cf_in_builders) {
4010 s = Status::Corruption(
4011 "Manifest - dropping non-existing column family");
4012 break;
4013 }
4014 auto builder_iter = builders.find(edit.column_family_);
4015 builders.erase(builder_iter);
4016 comparators.erase(edit.column_family_);
4017 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4018 assert(cfd != nullptr);
4019 cfd->Unref();
4020 delete cfd;
4021 cfd = nullptr;
4022 } else {
4023 if (!cf_in_builders) {
4024 s = Status::Corruption(
4025 "Manifest record referencing unknown column family");
4026 break;
4027 }
4028
4029 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4030 // this should never happen since cf_in_builders is true
4031 assert(cfd != nullptr);
4032
4033 // if it is not column family add or column family drop,
4034 // then it's a file add/delete, which should be forwarded
4035 // to builder
4036 auto builder = builders.find(edit.column_family_);
4037 assert(builder != builders.end());
4038 builder->second->version_builder()->Apply(&edit);
4039 }
4040
4041 if (cfd != nullptr && edit.has_log_number_) {
4042 cfd->SetLogNumber(edit.log_number_);
4043 }
4044
4045
4046 if (edit.has_prev_log_number_) {
4047 previous_log_number = edit.prev_log_number_;
4048 have_prev_log_number = true;
4049 }
4050
4051 if (edit.has_next_file_number_) {
4052 next_file = edit.next_file_number_;
4053 have_next_file = true;
4054 }
4055
4056 if (edit.has_last_sequence_) {
4057 last_sequence = edit.last_sequence_;
4058 have_last_sequence = true;
4059 }
4060
4061 if (edit.has_max_column_family_) {
4062 column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
4063 }
4064
4065 if (edit.has_min_log_number_to_keep_) {
4066 MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
4067 }
4068 }
4069 }
4070 file_reader.reset();
4071
4072 if (s.ok()) {
4073 if (!have_next_file) {
4074 s = Status::Corruption("no meta-nextfile entry in descriptor");
4075 printf("no meta-nextfile entry in descriptor");
4076 } else if (!have_last_sequence) {
4077 printf("no last-sequence-number entry in descriptor");
4078 s = Status::Corruption("no last-sequence-number entry in descriptor");
4079 }
4080
4081 if (!have_prev_log_number) {
4082 previous_log_number = 0;
4083 }
4084 }
4085
4086 if (s.ok()) {
4087 for (auto cfd : *column_family_set_) {
4088 if (cfd->IsDropped()) {
4089 continue;
4090 }
4091 auto builders_iter = builders.find(cfd->GetID());
4092 assert(builders_iter != builders.end());
4093 auto builder = builders_iter->second->version_builder();
4094
4095 Version* v = new Version(cfd, this, env_options_,
4096 *cfd->GetLatestMutableCFOptions(),
4097 current_version_number_++);
4098 builder->SaveTo(v->storage_info());
4099 v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
4100
4101 printf("--------------- Column family \"%s\" (ID %" PRIu32
4102 ") --------------\n",
4103 cfd->GetName().c_str(), cfd->GetID());
4104 printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
4105 auto comparator = comparators.find(cfd->GetID());
4106 if (comparator != comparators.end()) {
4107 printf("comparator: %s\n", comparator->second.c_str());
4108 } else {
4109 printf("comparator: <NO COMPARATOR>\n");
4110 }
4111 printf("%s \n", v->DebugString(hex).c_str());
4112 delete v;
4113 }
4114
4115 next_file_number_.store(next_file + 1);
4116 last_allocated_sequence_ = last_sequence;
4117 last_published_sequence_ = last_sequence;
4118 last_sequence_ = last_sequence;
4119 prev_log_number_ = previous_log_number;
4120
4121 printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
4122 " prev_log_number %" PRIu64 " max_column_family %" PRIu32
4123 " min_log_number_to_keep "
4124 "%" PRIu64 "\n",
4125 next_file_number_.load(), last_sequence, previous_log_number,
4126 column_family_set_->GetMaxColumnFamily(),
4127 min_log_number_to_keep_2pc());
4128 }
4129
4130 return s;
4131 }
4132 #endif // ROCKSDB_LITE
4133
4134 void VersionSet::MarkFileNumberUsed(uint64_t number) {
4135 // only called during recovery and repair, which are single-threaded, so this
4136 // works because there can't be concurrent calls
4137 if (next_file_number_.load(std::memory_order_relaxed) <= number) {
4138 next_file_number_.store(number + 1, std::memory_order_relaxed);
4139 }
4140 }
4141
4142 // Called only from ::LogAndApply, which is protected by the mutex, or during
4143 // recovery, which is single-threaded.
4144 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
4145 if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
4146 min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
4147 }
4148 }
4149
4150 Status VersionSet::WriteSnapshot(log::Writer* log) {
4151 // TODO: Break up into multiple records to reduce memory usage on recovery?
4152
4153 // WARNING: This method doesn't hold a mutex!!
4154
4155 // This is done without the DB mutex held, but only within the single-threaded
4156 // LogAndApply. Column family manipulations can only happen within LogAndApply
4157 // (the same single thread), so we're safe to iterate.
4158 for (auto cfd : *column_family_set_) {
4159 if (cfd->IsDropped()) {
4160 continue;
4161 }
4162 assert(cfd->initialized());
4163 {
4164 // Store column family info
4165 VersionEdit edit;
4166 if (cfd->GetID() != 0) {
4167 // default column family is always there,
4168 // no need to explicitly write it
4169 edit.AddColumnFamily(cfd->GetName());
4170 edit.SetColumnFamily(cfd->GetID());
4171 }
4172 edit.SetComparatorName(
4173 cfd->internal_comparator().user_comparator()->Name());
4174 std::string record;
4175 if (!edit.EncodeTo(&record)) {
4176 return Status::Corruption(
4177 "Unable to Encode VersionEdit:" + edit.DebugString(true));
4178 }
4179 Status s = log->AddRecord(record);
4180 if (!s.ok()) {
4181 return s;
4182 }
4183 }
4184
4185 {
4186 // Save files
4187 VersionEdit edit;
4188 edit.SetColumnFamily(cfd->GetID());
4189
4190 for (int level = 0; level < cfd->NumberLevels(); level++) {
4191 for (const auto& f :
4192 cfd->current()->storage_info()->LevelFiles(level)) {
4193 edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
4194 f->fd.GetFileSize(), f->smallest, f->largest,
4195 f->fd.smallest_seqno, f->fd.largest_seqno,
4196 f->marked_for_compaction);
4197 }
4198 }
4199 edit.SetLogNumber(cfd->GetLogNumber());
4200 std::string record;
4201 if (!edit.EncodeTo(&record)) {
4202 return Status::Corruption(
4203 "Unable to Encode VersionEdit:" + edit.DebugString(true));
4204 }
4205 Status s = log->AddRecord(record);
4206 if (!s.ok()) {
4207 return s;
4208 }
4209 }
4210 }
4211
4212 return Status::OK();
4213 }
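
// Illustrative sketch (not part of the original source): the snapshot written
// above emits two records per live column family. For a hypothetical column
// family "usage" with ID 1 holding a single L0 file, the decoded records
// would look roughly like:
//
//   record 1: AddColumnFamily("usage"); SetColumnFamily(1);
//             SetComparatorName("leveldb.BytewiseComparator")
//   record 2: SetColumnFamily(1); AddFile(0 /* level */, ...);
//             SetLogNumber(...)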
4214
4215 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
4216 // function is called repeatedly with consecutive pairs of slices. For example
4217 // if the slice list is [a, b, c, d] this function is called with arguments
4218 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
4219 // we avoid doing binary search for the keys b and c twice and instead somehow
4220 // maintain state of where they first appear in the files.
4221 uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
4222 const Slice& end, int start_level,
4223 int end_level) {
4224 // pre-condition
4225 assert(v->cfd_->internal_comparator().Compare(start, end) <= 0);
4226
4227 uint64_t size = 0;
4228 const auto* vstorage = v->storage_info();
4229 end_level = end_level == -1
4230 ? vstorage->num_non_empty_levels()
4231 : std::min(end_level, vstorage->num_non_empty_levels());
4232
4233 assert(start_level <= end_level);
4234
4235 for (int level = start_level; level < end_level; level++) {
4236 const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
4237 if (!files_brief.num_files) {
4238 // empty level, skip exploration
4239 continue;
4240 }
4241
4242 if (!level) {
4243 // level 0 files are not in sorted order; handle this case explicitly
4244 size += ApproximateSizeLevel0(v, files_brief, start, end);
4245 continue;
4246 }
4247
4248 assert(level > 0);
4249 assert(files_brief.num_files > 0);
4250
4251 // identify the file position for starting key
4252 const uint64_t idx_start = FindFileInRange(
4253 v->cfd_->internal_comparator(), files_brief, start,
4254 /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1));
4255 assert(idx_start < files_brief.num_files);
4256
4257 // scan all files from the starting position until the ending position
4258 // inferred from the sorted order
4259 for (uint64_t i = idx_start; i < files_brief.num_files; i++) {
4260 uint64_t val;
4261 val = ApproximateSize(v, files_brief.files[i], end);
4262 if (!val) {
4263 // the files after this will not have the range
4264 break;
4265 }
4266
4267 size += val;
4268
4269 if (i == idx_start) {
4270 // subtract the bytes needed to be scanned to get to the starting
4271 // key
4272 val = ApproximateSize(v, files_brief.files[i], start);
4273 assert(size >= val);
4274 size -= val;
4275 }
4276 }
4277 }
4278
4279 return size;
4280 }
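
// Worked example (hypothetical numbers, not part of the original source): on
// a level > 0 with files F1, F2, F3 in key order, where `start` falls inside
// F1, F2 lies entirely inside [start, end], and F3 begins after `end`:
//   ApproximateSize(F1, end) = 100, ApproximateSize(F1, start) = 30,
//   ApproximateSize(F2, end) = 40 (the whole file), and
//   ApproximateSize(F3, end) = 0, which stops the scan.
// The resulting estimate is 100 + 40 - 30 = 110 bytes.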
4281
4282 uint64_t VersionSet::ApproximateSizeLevel0(Version* v,
4283 const LevelFilesBrief& files_brief,
4284 const Slice& key_start,
4285 const Slice& key_end) {
4286 // level 0 files are not in sorted order, so we need to iterate through
4287 // the list to compute the total bytes that require scanning
4288 uint64_t size = 0;
4289 for (size_t i = 0; i < files_brief.num_files; i++) {
4290 const uint64_t start = ApproximateSize(v, files_brief.files[i], key_start);
4291 const uint64_t end = ApproximateSize(v, files_brief.files[i], key_end);
4292 assert(end >= start);
4293 size += end - start;
4294 }
4295 return size;
4296 }
4297
4298 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
4299 const Slice& key) {
4300 // pre-condition
4301 assert(v);
4302
4303 uint64_t result = 0;
4304 if (v->cfd_->internal_comparator().Compare(f.largest_key, key) <= 0) {
4305 // Entire file is before "key", so just add the file size
4306 result = f.fd.GetFileSize();
4307 } else if (v->cfd_->internal_comparator().Compare(f.smallest_key, key) > 0) {
4308 // Entire file is after "key", so ignore
4309 result = 0;
4310 } else {
4311 // "key" falls in the range for this table. Add the
4312 // approximate offset of "key" within the table.
4313 TableReader* table_reader_ptr;
4314 InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
4315 ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
4316 *f.file_metadata, nullptr /* range_del_agg */,
4317 v->GetMutableCFOptions().prefix_extractor.get(), &table_reader_ptr);
4318 if (table_reader_ptr != nullptr) {
4319 result = table_reader_ptr->ApproximateOffsetOf(key);
4320 }
4321 delete iter;
4322 }
4323 return result;
4324 }
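
// In other words: a file wholly before `key` contributes its full size, a
// file wholly after `key` contributes nothing, and a file straddling `key`
// contributes the table's approximate offset of `key` within it.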
4325
4326 void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
4327 // pre-calculate space requirement
4328 int64_t total_files = 0;
4329 for (auto cfd : *column_family_set_) {
4330 if (!cfd->initialized()) {
4331 continue;
4332 }
4333 Version* dummy_versions = cfd->dummy_versions();
4334 for (Version* v = dummy_versions->next_; v != dummy_versions;
4335 v = v->next_) {
4336 const auto* vstorage = v->storage_info();
4337 for (int level = 0; level < vstorage->num_levels(); level++) {
4338 total_files += vstorage->LevelFiles(level).size();
4339 }
4340 }
4341 }
4342
4343 // just a one-time extension to the right size
4344 live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
4345
4346 for (auto cfd : *column_family_set_) {
4347 if (!cfd->initialized()) {
4348 continue;
4349 }
4350 auto* current = cfd->current();
4351 bool found_current = false;
4352 Version* dummy_versions = cfd->dummy_versions();
4353 for (Version* v = dummy_versions->next_; v != dummy_versions;
4354 v = v->next_) {
4355 v->AddLiveFiles(live_list);
4356 if (v == current) {
4357 found_current = true;
4358 }
4359 }
4360 if (!found_current && current != nullptr) {
4361 // Should never happen unless it is a bug.
4362 assert(false);
4363 current->AddLiveFiles(live_list);
4364 }
4365 }
4366 }
4367
4368 InternalIterator* VersionSet::MakeInputIterator(
4369 const Compaction* c, RangeDelAggregator* range_del_agg,
4370 const EnvOptions& env_options_compactions) {
4371 auto cfd = c->column_family_data();
4372 ReadOptions read_options;
4373 read_options.verify_checksums = true;
4374 read_options.fill_cache = false;
4375 // Compaction iterators shouldn't be confined to a single prefix.
4376 // Compactions use Seek() for
4377 // (a) concurrent compactions,
4378 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
4379 read_options.total_order_seek = true;
4380
4381 // Level-0 files have to be merged together. For other levels,
4382 // we will make a concatenating iterator per level.
4383 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
4384 const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
4385 c->num_input_levels() - 1
4386 : c->num_input_levels());
4387 InternalIterator** list = new InternalIterator* [space];
4388 size_t num = 0;
4389 for (size_t which = 0; which < c->num_input_levels(); which++) {
4390 if (c->input_levels(which)->num_files != 0) {
4391 if (c->level(which) == 0) {
4392 const LevelFilesBrief* flevel = c->input_levels(which);
4393 for (size_t i = 0; i < flevel->num_files; i++) {
4394 list[num++] = cfd->table_cache()->NewIterator(
4395 read_options, env_options_compactions, cfd->internal_comparator(),
4396 *flevel->files[i].file_metadata, range_del_agg,
4397 c->mutable_cf_options()->prefix_extractor.get(),
4398 nullptr /* table_reader_ptr */,
4399 nullptr /* no per level latency histogram */,
4400 true /* for_compaction */, nullptr /* arena */,
4401 false /* skip_filters */, static_cast<int>(which) /* level */);
4402 }
4403 } else {
4404 // Create concatenating iterator for the files from this level
4405 list[num++] = new LevelIterator(
4406 cfd->table_cache(), read_options, env_options_compactions,
4407 cfd->internal_comparator(), c->input_levels(which),
4408 c->mutable_cf_options()->prefix_extractor.get(),
4409 false /* should_sample */,
4410 nullptr /* no per level latency histogram */,
4411 true /* for_compaction */, false /* skip_filters */,
4412 static_cast<int>(which) /* level */, range_del_agg,
4413 c->boundaries(which));
4414 }
4415 }
4416 }
4417 assert(num <= space);
4418 InternalIterator* result =
4419 NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
4420 static_cast<int>(num));
4421 delete[] list;
4422 return result;
4423 }
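
// For example (hypothetical shape, not part of the original source): an
// L0->L1 compaction reading 4 level-0 files plus one non-empty L1 input level
// allocates space = 4 + 2 - 1 = 5 slots: four table iterators for the L0
// files and one concatenating LevelIterator for L1, all combined by
// NewMergingIterator() above.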
4424
4425 // verify that the files listed in this compaction are present
4426 // in the current version
4427 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
4428 #ifndef NDEBUG
4429 Version* version = c->column_family_data()->current();
4430 const VersionStorageInfo* vstorage = version->storage_info();
4431 if (c->input_version() != version) {
4432 ROCKS_LOG_INFO(
4433 db_options_->info_log,
4434 "[%s] compaction output being applied to a different base version from"
4435 " input version",
4436 c->column_family_data()->GetName().c_str());
4437
4438 if (vstorage->compaction_style_ == kCompactionStyleLevel &&
4439 c->start_level() == 0 && c->num_input_levels() > 2U) {
4440 // We are doing an L0->base_level compaction. The assumption is that if
4441 // the base level is not L1, levels from L1 to base_level - 1 are empty.
4442 // This is ensured by allowing only one compaction from L0 at a time in
4443 // level-based compaction, so that during that time no compaction/flush
4444 // can put files into those levels.
4445 for (int l = c->start_level() + 1; l < c->output_level(); l++) {
4446 if (vstorage->NumLevelFiles(l) != 0) {
4447 return false;
4448 }
4449 }
4450 }
4451 }
4452
4453 for (size_t input = 0; input < c->num_input_levels(); ++input) {
4454 int level = c->level(input);
4455 for (size_t i = 0; i < c->num_input_files(input); ++i) {
4456 uint64_t number = c->input(input, i)->fd.GetNumber();
4457 bool found = false;
4458 for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
4459 FileMetaData* f = vstorage->files_[level][j];
4460 if (f->fd.GetNumber() == number) {
4461 found = true;
4462 break;
4463 }
4464 }
4465 if (!found) {
4466 return false; // input files not present in the current version
4467 }
4468 }
4469 }
4470 #else
4471 (void)c;
4472 #endif
4473 return true; // everything good
4474 }
4475
4476 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
4477 FileMetaData** meta,
4478 ColumnFamilyData** cfd) {
4479 for (auto cfd_iter : *column_family_set_) {
4480 if (!cfd_iter->initialized()) {
4481 continue;
4482 }
4483 Version* version = cfd_iter->current();
4484 const auto* vstorage = version->storage_info();
4485 for (int level = 0; level < vstorage->num_levels(); level++) {
4486 for (const auto& file : vstorage->LevelFiles(level)) {
4487 if (file->fd.GetNumber() == number) {
4488 *meta = file;
4489 *filelevel = level;
4490 *cfd = cfd_iter;
4491 return Status::OK();
4492 }
4493 }
4494 }
4495 }
4496 return Status::NotFound("File not present in any level");
4497 }
4498
4499 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
4500 for (auto cfd : *column_family_set_) {
4501 if (cfd->IsDropped() || !cfd->initialized()) {
4502 continue;
4503 }
4504 for (int level = 0; level < cfd->NumberLevels(); level++) {
4505 for (const auto& file :
4506 cfd->current()->storage_info()->LevelFiles(level)) {
4507 LiveFileMetaData filemetadata;
4508 filemetadata.column_family_name = cfd->GetName();
4509 uint32_t path_id = file->fd.GetPathId();
4510 if (path_id < cfd->ioptions()->cf_paths.size()) {
4511 filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
4512 } else {
4513 assert(!cfd->ioptions()->cf_paths.empty());
4514 filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
4515 }
4516 filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
4517 filemetadata.level = level;
4518 filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
4519 filemetadata.smallestkey = file->smallest.user_key().ToString();
4520 filemetadata.largestkey = file->largest.user_key().ToString();
4521 filemetadata.smallest_seqno = file->fd.smallest_seqno;
4522 filemetadata.largest_seqno = file->fd.largest_seqno;
4523 filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
4524 std::memory_order_relaxed);
4525 filemetadata.being_compacted = file->being_compacted;
4526 filemetadata.num_entries = file->num_entries;
4527 filemetadata.num_deletions = file->num_deletions;
4528 metadata->push_back(filemetadata);
4529 }
4530 }
4531 }
4532 }
4533
4534 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
4535 std::vector<std::string>* manifest_filenames,
4536 uint64_t min_pending_output) {
4537 assert(manifest_filenames->empty());
4538 obsolete_manifests_.swap(*manifest_filenames);
4539 std::vector<ObsoleteFileInfo> pending_files;
4540 for (auto& f : obsolete_files_) {
4541 if (f.metadata->fd.GetNumber() < min_pending_output) {
4542 files->push_back(std::move(f));
4543 } else {
4544 pending_files.push_back(std::move(f));
4545 }
4546 }
4547 obsolete_files_.swap(pending_files);
4548 }
4549
4550 ColumnFamilyData* VersionSet::CreateColumnFamily(
4551 const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
4552 assert(edit->is_column_family_add_);
4553
4554 MutableCFOptions dummy_cf_options;
4555 Version* dummy_versions =
4556 new Version(nullptr, this, env_options_, dummy_cf_options);
4557 // Ref() the dummy version once so that later we can call Unref() to delete
4558 // it, avoiding an explicit "delete" (~Version is private)
4559 dummy_versions->Ref();
4560 auto new_cfd = column_family_set_->CreateColumnFamily(
4561 edit->column_family_name_, edit->column_family_, dummy_versions,
4562 cf_options);
4563
4564 Version* v = new Version(new_cfd, this, env_options_,
4565 *new_cfd->GetLatestMutableCFOptions(),
4566 current_version_number_++);
4567
4568 // Fill level target base information.
4569 v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
4570 *new_cfd->GetLatestMutableCFOptions());
4571 AppendVersion(new_cfd, v);
4572 // GetLatestMutableCFOptions() is safe here without mutex since the
4573 // cfd is not yet available to the client
4574 new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
4575 LastSequence());
4576 new_cfd->SetLogNumber(edit->log_number_);
4577 return new_cfd;
4578 }
4579
4580 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
4581 uint64_t count = 0;
4582 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
4583 count++;
4584 }
4585 return count;
4586 }
4587
4588 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
4589 std::unordered_set<uint64_t> unique_files;
4590 uint64_t total_files_size = 0;
4591 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
4592 VersionStorageInfo* storage_info = v->storage_info();
4593 for (int level = 0; level < storage_info->num_levels_; level++) {
4594 for (const auto& file_meta : storage_info->LevelFiles(level)) {
4595 if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
4596 unique_files.end()) {
4597 unique_files.insert(file_meta->fd.packed_number_and_path_id);
4598 total_files_size += file_meta->fd.GetFileSize();
4599 }
4600 }
4601 }
4602 }
4603 return total_files_size;
4604 }
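
// For example, if two live versions both still reference SST file #7, its
// size is counted only once: the packed (file number, path id) key dedupes
// files shared across versions.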
4605
4606 ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
4607 const ImmutableDBOptions* _db_options,
4608 const EnvOptions& _env_options,
4609 Cache* table_cache,
4610 WriteBufferManager* write_buffer_manager,
4611 WriteController* write_controller)
4612 : VersionSet(dbname, _db_options, _env_options, table_cache,
4613 write_buffer_manager, write_controller) {}
4614
4615 ReactiveVersionSet::~ReactiveVersionSet() {}
4616
4617 Status ReactiveVersionSet::Recover(
4618 const std::vector<ColumnFamilyDescriptor>& column_families,
4619 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
4620 std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
4621 std::unique_ptr<Status>* manifest_reader_status) {
4622 assert(manifest_reader != nullptr);
4623 assert(manifest_reporter != nullptr);
4624 assert(manifest_reader_status != nullptr);
4625
4626 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
4627 for (const auto& cf : column_families) {
4628 cf_name_to_options.insert({cf.name, cf.options});
4629 }
4630
4631 // add default column family
4632 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
4633 if (default_cf_iter == cf_name_to_options.end()) {
4634 return Status::InvalidArgument("Default column family not specified");
4635 }
4636 VersionEdit default_cf_edit;
4637 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4638 default_cf_edit.SetColumnFamily(0);
4639 ColumnFamilyData* default_cfd =
4640 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
4641 // In recovery, nobody else can access it, so it's fine to mark it as
4642 // initialized this early.
4643 default_cfd->set_initialized();
4644
4645 bool have_log_number = false;
4646 bool have_prev_log_number = false;
4647 bool have_next_file = false;
4648 bool have_last_sequence = false;
4649 uint64_t next_file = 0;
4650 uint64_t last_sequence = 0;
4651 uint64_t log_number = 0;
4652 uint64_t previous_log_number = 0;
4653 uint32_t max_column_family = 0;
4654 uint64_t min_log_number_to_keep = 0;
4655 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4656 builders;
4657 std::unordered_map<int, std::string> column_families_not_found;
4658 builders.insert(
4659 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4660 new BaseReferencedVersionBuilder(default_cfd))));
4661
4662 manifest_reader_status->reset(new Status());
4663 manifest_reporter->reset(new LogReporter());
4664 static_cast<LogReporter*>(manifest_reporter->get())->status =
4665 manifest_reader_status->get();
4666 Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
4667 log::Reader* reader = manifest_reader->get();
4668
4669 int retry = 0;
4670 while (s.ok() && retry < 1) {
4671 assert(reader != nullptr);
4672 Slice record;
4673 std::string scratch;
4674 while (s.ok() && reader->ReadRecord(&record, &scratch)) {
4675 VersionEdit edit;
4676 s = edit.DecodeFrom(record);
4677 if (!s.ok()) {
4678 break;
4679 }
4680 s = ApplyOneVersionEditToBuilder(
4681 edit, cf_name_to_options, column_families_not_found, builders,
4682 &have_log_number, &log_number, &have_prev_log_number,
4683 &previous_log_number, &have_next_file, &next_file,
4684 &have_last_sequence, &last_sequence, &min_log_number_to_keep,
4685 &max_column_family);
4686 }
4687 if (s.ok()) {
4688 bool enough = have_next_file && have_log_number && have_last_sequence;
4689 if (enough) {
4690 for (const auto& cf : column_families) {
4691 auto cfd = column_family_set_->GetColumnFamily(cf.name);
4692 if (cfd == nullptr) {
4693 enough = false;
4694 break;
4695 }
4696 }
4697 }
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          assert(cfd != nullptr);
          if (!cfd->IsDropped()) {
            auto builder_iter = builders.find(cfd->GetID());
            assert(builder_iter != builders.end());
            auto builder = builder_iter->second->version_builder();
            assert(builder != nullptr);
            s = builder->LoadTableHandlers(
                cfd->internal_stats(), db_options_->max_file_opening_threads,
                false /* prefetch_index_and_filter_in_cache */,
                true /* is_initial_load */,
                cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
            if (!s.ok()) {
              enough = false;
              if (s.IsPathNotFound()) {
                s = Status::OK();
              }
              break;
            }
          }
        }
      }
      if (enough) {
        break;
      }
    }
    ++retry;
  }

  if (s.ok()) {
    if (!have_prev_log_number) {
      previous_log_number = 0;
    }
    column_family_set_->UpdateMaxColumnFamily(max_column_family);

    MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
    MarkFileNumberUsed(previous_log_number);
    MarkFileNumberUsed(log_number);

    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }

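  // Build and install a recovered Version for each live column family, then
  // publish the recovered next file number, sequence numbers and log
  // numbers.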
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      assert(cfd->initialized());
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto* builder = builders_iter->second->version_builder();

      Version* v = new Version(cfd, this, env_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());

      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }
    next_file_number_.store(next_file + 1);
    last_allocated_sequence_ = last_sequence;
    last_published_sequence_ = last_sequence;
    last_sequence_ = last_sequence;
    prev_log_number_ = previous_log_number;
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }
  return s;
}

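// ReadAndApply tails the primary's MANIFEST on behalf of a secondary
// instance: it replays any new VersionEdit records, installs the resulting
// versions, and records the column families whose state changed in
// *cfds_changed. The DB mutex must be held by the caller.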
Status ReactiveVersionSet::ReadAndApply(
    InstrumentedMutex* mu,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unordered_set<ColumnFamilyData*>* cfds_changed) {
  assert(manifest_reader != nullptr);
  assert(cfds_changed != nullptr);
  mu->AssertHeld();

  Status s;
  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t previous_log_number = 0;
  uint32_t max_column_family = 0;
  uint64_t min_log_number_to_keep = 0;

  while (s.ok()) {
    Slice record;
    std::string scratch;
    log::Reader* reader = manifest_reader->get();
    std::string old_manifest_path = reader->file()->file_name();
    while (reader->ReadRecord(&record, &scratch)) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }
      ColumnFamilyData* cfd =
          column_family_set_->GetColumnFamily(edit.column_family_);
      // If we cannot find this column family in our column family set, then it
      // may be a new column family created by the primary after the secondary
      // starts. Ignore it for now.
      if (nullptr == cfd) {
        continue;
      }
      if (active_version_builders_.find(edit.column_family_) ==
          active_version_builders_.end()) {
        std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
            new BaseReferencedVersionBuilder(cfd));
        active_version_builders_.insert(
            std::make_pair(edit.column_family_, std::move(builder_guard)));
      }
      s = ApplyOneVersionEditToBuilder(
          edit, &have_log_number, &log_number, &have_prev_log_number,
          &previous_log_number, &have_next_file, &next_file,
          &have_last_sequence, &last_sequence, &min_log_number_to_keep,
          &max_column_family);
      if (!s.ok()) {
        break;
      }
      auto builder_iter = active_version_builders_.find(edit.column_family_);
      assert(builder_iter != active_version_builders_.end());
      auto builder = builder_iter->second->version_builder();
      assert(builder != nullptr);
      s = builder->LoadTableHandlers(
          cfd->internal_stats(), db_options_->max_file_opening_threads,
          false /* prefetch_index_and_filter_in_cache */,
          false /* is_initial_load */,
          cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
      TEST_SYNC_POINT_CALLBACK(
          "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s);
      if (!s.ok() && !s.IsPathNotFound()) {
        break;
      } else if (s.IsPathNotFound()) {
        s = Status::OK();
      } else {  // s.ok() == true
        auto version = new Version(cfd, this, env_options_,
                                   *cfd->GetLatestMutableCFOptions(),
                                   current_version_number_++);
        builder->SaveTo(version->storage_info());
        version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
        AppendVersion(cfd, version);
        active_version_builders_.erase(builder_iter);
        if (cfds_changed->count(cfd) == 0) {
          cfds_changed->insert(cfd);
        }
      }
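      // Propagate the DB-wide metadata carried by the edits applied so far.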
      if (have_next_file) {
        next_file_number_.store(next_file + 1);
      }
      if (have_last_sequence) {
        last_allocated_sequence_ = last_sequence;
        last_published_sequence_ = last_sequence;
        last_sequence_ = last_sequence;
      }
      if (have_prev_log_number) {
        prev_log_number_ = previous_log_number;
        MarkFileNumberUsed(previous_log_number);
      }
      if (have_log_number) {
        MarkFileNumberUsed(log_number);
      }
      column_family_set_->UpdateMaxColumnFamily(max_column_family);
      MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
    }
    // We get here because one of the following happened:
    // 1) s.IsCorruption(): the current MANIFEST is corrupted;
    // 2) we have finished reading the current MANIFEST;
    // 3) we hit an IOError while reading the current MANIFEST.
    // In each case, look for the next MANIFEST and continue from there; if
    // no newer MANIFEST can be found, exit the loop.
    s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
    reader = manifest_reader->get();
    if (s.ok() && reader->file()->file_name() == old_manifest_path) {
      break;
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      auto builder_iter = active_version_builders_.find(cfd->GetID());
      if (builder_iter == active_version_builders_.end()) {
        continue;
      }
      auto builder = builder_iter->second->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }
  return s;
}

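// Illustrative usage sketch (not compiled here): the reactive code path in
// ReadAndApply() above is normally driven through the secondary-instance
// API. The paths below are hypothetical placeholders, and setting
// max_open_files to -1 reflects a limitation of secondary mode in this
// version.
//
//   #include "rocksdb/db.h"
//
//   rocksdb::DB* db = nullptr;
//   rocksdb::Options options;
//   options.max_open_files = -1;  // required by OpenAsSecondary here
//   rocksdb::Status s = rocksdb::DB::OpenAsSecondary(
//       options, "/path/to/primary_db", "/path/to/secondary_db", &db);
//   if (s.ok()) {
//     // Each call tails the primary's MANIFEST and eventually reaches
//     // ReactiveVersionSet::ReadAndApply().
//     s = db->TryCatchUpWithPrimary();
//   }

// ApplyOneVersionEditToBuilder dispatches a single VersionEdit replayed from
// the MANIFEST: column-family additions are ignored (the secondary only
// follows column families that existed when it opened), drops mark the
// existing ColumnFamilyData as dropped, and every other edit is applied to
// the column family's active version builder. DB-wide fields carried by the
// edit (log numbers, next file number, last sequence, ...) are extracted via
// ExtractInfoFromVersionEdit.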
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
    VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
    bool* have_prev_log_number, uint64_t* previous_log_number,
    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
    uint32_t* max_column_family) {
  ColumnFamilyData* cfd = nullptr;
  Status status;
  if (edit.is_column_family_add_) {
    // TODO (yanqin) for now the secondary ignores column families created
    // after Open. This also simplifies handling of switching to a new MANIFEST
    // and processing the snapshot of the system at the beginning of the
    // MANIFEST.
    return Status::OK();
  } else if (edit.is_column_family_drop_) {
    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // The edit drops a column family created by the primary after the
    // secondary started; ignore it.
    if (cfd == nullptr) {
      return Status::OK();
    }
    // Drop the column family by setting it to be 'dropped' without destroying
    // the column family handle.
    cfd->SetDropped();
    if (cfd->Unref()) {
      delete cfd;
      cfd = nullptr;
    }
  } else {
    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // The edit touches a column family created after Open; ignore it.
    if (cfd == nullptr) {
      return Status::OK();
    }
    auto builder_iter = active_version_builders_.find(edit.column_family_);
    assert(builder_iter != active_version_builders_.end());
    auto builder = builder_iter->second->version_builder();
    assert(builder != nullptr);
    builder->Apply(&edit);
  }
  return ExtractInfoFromVersionEdit(
      cfd, edit, have_log_number, log_number, have_prev_log_number,
      previous_log_number, have_next_file, next_file, have_last_sequence,
      last_sequence, min_log_number_to_keep, max_column_family);
}

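// MaybeSwitchManifest checks whether CURRENT names a MANIFEST other than the
// one currently being read and, if so, opens the new file and replaces
// *manifest_reader. It retries on PathNotFound because the primary may
// delete the old MANIFEST between the read of CURRENT and the open of the
// file CURRENT points to.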
Status ReactiveVersionSet::MaybeSwitchManifest(
    log::Reader::Reporter* reporter,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
  assert(manifest_reader != nullptr);
  Status s;
  do {
    std::string manifest_path;
    s = GetCurrentManifestPath(&manifest_path);
    std::unique_ptr<SequentialFile> manifest_file;
    if (s.ok()) {
      if (nullptr == manifest_reader->get() ||
          manifest_reader->get()->file()->file_name() != manifest_path) {
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:0");
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:1");
        s = env_->NewSequentialFile(
            manifest_path, &manifest_file,
            env_->OptimizeForManifestRead(env_options_));
      } else {
        // No need to switch manifest.
        break;
      }
    }
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
    if (s.ok()) {
      manifest_file_reader.reset(
          new SequentialFileReader(std::move(manifest_file), manifest_path));
      manifest_reader->reset(new log::FragmentBufferedReader(
          nullptr, std::move(manifest_file_reader), reporter,
          true /* checksum */, 0 /* log_number */));
      ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
                     manifest_path.c_str());
      // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
      // active_version_builders_ map because we choose to construct the
      // versions from scratch, thanks to the first part of each MANIFEST
      // written by VersionSet::WriteSnapshot. This is not necessary, but we
      // choose this at present for the sake of simplicity.
      active_version_builders_.clear();
    }
  } while (s.IsPathNotFound());
  return s;
}

}  // namespace rocksdb