]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/version_set.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / version_set.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/version_set.h"
11
12#ifndef __STDC_FORMAT_MACROS
13#define __STDC_FORMAT_MACROS
14#endif
15
16#include <inttypes.h>
17#include <stdio.h>
18#include <algorithm>
7c673cae
FG
19#include <map>
20#include <set>
21#include <string>
22#include <unordered_map>
23#include <vector>
11fdf7f2 24#include <list>
7c673cae
FG
25#include "db/compaction.h"
26#include "db/internal_stats.h"
27#include "db/log_reader.h"
28#include "db/log_writer.h"
29#include "db/memtable.h"
30#include "db/merge_context.h"
31#include "db/merge_helper.h"
32#include "db/pinned_iterators_manager.h"
33#include "db/table_cache.h"
34#include "db/version_builder.h"
11fdf7f2 35#include "monitoring/file_read_sample.h"
7c673cae
FG
36#include "monitoring/perf_context_imp.h"
37#include "rocksdb/env.h"
38#include "rocksdb/merge_operator.h"
39#include "rocksdb/write_buffer_manager.h"
40#include "table/format.h"
41#include "table/get_context.h"
42#include "table/internal_iterator.h"
43#include "table/merging_iterator.h"
44#include "table/meta_blocks.h"
45#include "table/plain_table_factory.h"
46#include "table/table_reader.h"
47#include "table/two_level_iterator.h"
48#include "util/coding.h"
49#include "util/file_reader_writer.h"
50#include "util/filename.h"
51#include "util/stop_watch.h"
52#include "util/string_util.h"
53#include "util/sync_point.h"
54
55namespace rocksdb {
56
57namespace {
58
59// Find File in LevelFilesBrief data structure
60// Within an index range defined by left and right
61int FindFileInRange(const InternalKeyComparator& icmp,
62 const LevelFilesBrief& file_level,
63 const Slice& key,
64 uint32_t left,
65 uint32_t right) {
11fdf7f2
TL
66 auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
67 return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
68 };
69 const auto &b = file_level.files;
70 return static_cast<int>(std::lower_bound(b + left,
71 b + right, key, cmp) - b);
72}
73
74Status OverlapWithIterator(const Comparator* ucmp,
75 const Slice& smallest_user_key,
76 const Slice& largest_user_key,
77 InternalIterator* iter,
78 bool* overlap) {
79 InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
80 kValueTypeForSeek);
81 iter->Seek(range_start.Encode());
82 if (!iter->status().ok()) {
83 return iter->status();
84 }
85
86 *overlap = false;
87 if (iter->Valid()) {
88 ParsedInternalKey seek_result;
89 if (!ParseInternalKey(iter->key(), &seek_result)) {
90 return Status::Corruption("DB have corrupted keys");
91 }
92
93 if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
94 *overlap = true;
7c673cae
FG
95 }
96 }
11fdf7f2
TL
97
98 return iter->status();
7c673cae
FG
99}
100
// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
  // `files` is only consulted in debug builds (sorted-order sanity checks);
  // `file_levels` is the compact per-level file list actually searched.
  // `ikey` is the internal key form of `user_key`; both slices must outlive
  // this picker.
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Returns the next candidate file that may contain user_key_, walking
  // files within a level and then descending levels, or nullptr when the
  // search space is exhausted. Level bounds for the next level are refined
  // via the FileIndexer (fractional cascading) as files are compared.
  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files or/and fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files
        // If there are only 3 or less current level files in the system, we
        // skip the key range filtering. In this case, more likely, the system
        // is highly tuned to minimize number of tables queried by each query,
        // so it is unlikely that key range filtering is more efficient than
        // querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same find, we are sure key falls in
          // range.
          assert(
              curr_level_ == 0 ||
              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
              user_comparator_->Compare(user_key_,
                                        ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->Compare(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->Compare(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              // L0 files may overlap: keep scanning this level.
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. Use compressed data structure, has no attribute seqNo
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                                       files_[0][curr_index_in_curr_level_-1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  // Index bounds into the next level's file list, narrowed by the
  // FileIndexer as comparisons are made in the current level.
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ == search_right_bound_) {
          start_index = search_left_bound_;
        } else if (search_left_bound_ < search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_));
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
336} // anonymous namespace
337
338VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
339
// Must run with no outstanding references. Unlinks this version from the
// VersionSet's doubly-linked version list and drops its reference on every
// file; files whose refcount hits zero are queued as obsolete so the
// VersionSet can delete them from disk later.
Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        // Last reference gone: record the file (with its cf_path) for the
        // obsolete-file purge rather than deleting it here.
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}
363
364int FindFile(const InternalKeyComparator& icmp,
365 const LevelFilesBrief& file_level,
366 const Slice& key) {
367 return FindFileInRange(icmp, file_level, key, 0,
368 static_cast<uint32_t>(file_level.num_files));
369}
370
371void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
372 const std::vector<FileMetaData*>& files,
373 Arena* arena) {
374 assert(file_level);
375 assert(arena);
376
377 size_t num = files.size();
378 file_level->num_files = num;
379 char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
380 file_level->files = new (mem)FdWithKeyRange[num];
381
382 for (size_t i = 0; i < num; i++) {
383 Slice smallest_key = files[i]->smallest.Encode();
384 Slice largest_key = files[i]->largest.Encode();
385
386 // Copy key slice to sequential memory
387 size_t smallest_size = smallest_key.size();
388 size_t largest_size = largest_key.size();
389 mem = arena->AllocateAligned(smallest_size + largest_size);
390 memcpy(mem, smallest_key.data(), smallest_size);
391 memcpy(mem + smallest_size, largest_key.data(), largest_size);
392
393 FdWithKeyRange& f = file_level->files[i];
394 f.fd = files[i]->fd;
11fdf7f2 395 f.file_metadata = files[i];
7c673cae
FG
396 f.smallest_key = Slice(mem, smallest_size);
397 f.largest_key = Slice(mem + smallest_size, largest_size);
398 }
399}
400
401static bool AfterFile(const Comparator* ucmp,
402 const Slice* user_key, const FdWithKeyRange* f) {
403 // nullptr user_key occurs before all keys and is therefore never after *f
404 return (user_key != nullptr &&
405 ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0);
406}
407
408static bool BeforeFile(const Comparator* ucmp,
409 const Slice* user_key, const FdWithKeyRange* f) {
410 // nullptr user_key occurs after all keys and is therefore never before *f
411 return (user_key != nullptr &&
412 ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0);
413}
414
415bool SomeFileOverlapsRange(
416 const InternalKeyComparator& icmp,
417 bool disjoint_sorted_files,
418 const LevelFilesBrief& file_level,
419 const Slice* smallest_user_key,
420 const Slice* largest_user_key) {
421 const Comparator* ucmp = icmp.user_comparator();
422 if (!disjoint_sorted_files) {
423 // Need to check against all files
424 for (size_t i = 0; i < file_level.num_files; i++) {
425 const FdWithKeyRange* f = &(file_level.files[i]);
426 if (AfterFile(ucmp, smallest_user_key, f) ||
427 BeforeFile(ucmp, largest_user_key, f)) {
428 // No overlap
429 } else {
430 return true; // Overlap
431 }
432 }
433 return false;
434 }
435
436 // Binary search over file list
437 uint32_t index = 0;
438 if (smallest_user_key != nullptr) {
11fdf7f2 439 // Find the leftmost possible internal key for smallest_user_key
7c673cae 440 InternalKey small;
11fdf7f2 441 small.SetMinPossibleForUserKey(*smallest_user_key);
7c673cae
FG
442 index = FindFile(icmp, file_level, small.Encode());
443 }
444
445 if (index >= file_level.num_files) {
446 // beginning of range is after all files, so no overlap.
447 return false;
448 }
449
450 return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
451}
452
453namespace {
11fdf7f2 454class LevelIterator final : public InternalIterator {
7c673cae 455 public:
11fdf7f2
TL
456 LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
457 const EnvOptions& env_options,
458 const InternalKeyComparator& icomparator,
459 const LevelFilesBrief* flevel,
460 const SliceTransform* prefix_extractor, bool should_sample,
461 HistogramImpl* file_read_hist, bool for_compaction,
462 bool skip_filters, int level, RangeDelAggregator* range_del_agg)
463 : table_cache_(table_cache),
464 read_options_(read_options),
465 env_options_(env_options),
466 icomparator_(icomparator),
7c673cae 467 flevel_(flevel),
11fdf7f2
TL
468 prefix_extractor_(prefix_extractor),
469 file_read_hist_(file_read_hist),
470 should_sample_(should_sample),
471 for_compaction_(for_compaction),
472 skip_filters_(skip_filters),
473 file_index_(flevel_->num_files),
474 level_(level),
475 range_del_agg_(range_del_agg),
476 pinned_iters_mgr_(nullptr) {
477 // Empty level is not supported.
478 assert(flevel_ != nullptr && flevel_->num_files > 0);
7c673cae
FG
479 }
480
11fdf7f2
TL
481 virtual ~LevelIterator() { delete file_iter_.Set(nullptr); }
482
483 virtual void Seek(const Slice& target) override;
484 virtual void SeekForPrev(const Slice& target) override;
485 virtual void SeekToFirst() override;
486 virtual void SeekToLast() override;
487 virtual void Next() override;
488 virtual void Prev() override;
489
490 virtual bool Valid() const override { return file_iter_.Valid(); }
491 virtual Slice key() const override {
7c673cae 492 assert(Valid());
11fdf7f2 493 return file_iter_.key();
7c673cae 494 }
11fdf7f2 495 virtual Slice value() const override {
7c673cae 496 assert(Valid());
11fdf7f2
TL
497 return file_iter_.value();
498 }
499 virtual Status status() const override {
500 return file_iter_.iter() ? file_iter_.status() : Status::OK();
501 }
502 virtual void SetPinnedItersMgr(
503 PinnedIteratorsManager* pinned_iters_mgr) override {
504 pinned_iters_mgr_ = pinned_iters_mgr;
505 if (file_iter_.iter()) {
506 file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
7c673cae
FG
507 }
508 }
11fdf7f2
TL
509 virtual bool IsKeyPinned() const override {
510 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
511 file_iter_.iter() && file_iter_.IsKeyPinned();
7c673cae 512 }
11fdf7f2
TL
513 virtual bool IsValuePinned() const override {
514 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
515 file_iter_.iter() && file_iter_.IsValuePinned();
7c673cae 516 }
7c673cae
FG
517
518 private:
11fdf7f2
TL
519 void SkipEmptyFileForward();
520 void SkipEmptyFileBackward();
521 void SetFileIterator(InternalIterator* iter);
522 void InitFileIterator(size_t new_file_index);
7c673cae 523
11fdf7f2
TL
524 const Slice& file_smallest_key(size_t file_index) {
525 assert(file_index < flevel_->num_files);
526 return flevel_->files[file_index].smallest_key;
527 }
7c673cae 528
11fdf7f2
TL
529 bool KeyReachedUpperBound(const Slice& internal_key) {
530 return read_options_.iterate_upper_bound != nullptr &&
531 icomparator_.user_comparator()->Compare(
532 ExtractUserKey(internal_key),
533 *read_options_.iterate_upper_bound) >= 0;
7c673cae
FG
534 }
535
11fdf7f2
TL
536 InternalIterator* NewFileIterator() {
537 assert(file_index_ < flevel_->num_files);
538 auto file_meta = flevel_->files[file_index_];
539 if (should_sample_) {
540 sample_file_read_inc(file_meta.file_metadata);
541 }
542
543 return table_cache_->NewIterator(
544 read_options_, env_options_, icomparator_, *file_meta.file_metadata,
545 range_del_agg_, prefix_extractor_,
546 nullptr /* don't need reference to table */,
547 file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_,
548 level_);
7c673cae
FG
549 }
550
7c673cae
FG
551 TableCache* table_cache_;
552 const ReadOptions read_options_;
553 const EnvOptions& env_options_;
554 const InternalKeyComparator& icomparator_;
11fdf7f2
TL
555 const LevelFilesBrief* flevel_;
556 mutable FileDescriptor current_value_;
557 const SliceTransform* prefix_extractor_;
558
7c673cae 559 HistogramImpl* file_read_hist_;
11fdf7f2 560 bool should_sample_;
7c673cae
FG
561 bool for_compaction_;
562 bool skip_filters_;
11fdf7f2 563 size_t file_index_;
7c673cae
FG
564 int level_;
565 RangeDelAggregator* range_del_agg_;
11fdf7f2
TL
566 IteratorWrapper file_iter_; // May be nullptr
567 PinnedIteratorsManager* pinned_iters_mgr_;
7c673cae
FG
568};
569
11fdf7f2
TL
570void LevelIterator::Seek(const Slice& target) {
571 size_t new_file_index = FindFile(icomparator_, *flevel_, target);
572
573 InitFileIterator(new_file_index);
574 if (file_iter_.iter() != nullptr) {
575 file_iter_.Seek(target);
576 }
577 SkipEmptyFileForward();
578}
579
580void LevelIterator::SeekForPrev(const Slice& target) {
581 size_t new_file_index = FindFile(icomparator_, *flevel_, target);
582 if (new_file_index >= flevel_->num_files) {
583 new_file_index = flevel_->num_files - 1;
584 }
585
586 InitFileIterator(new_file_index);
587 if (file_iter_.iter() != nullptr) {
588 file_iter_.SeekForPrev(target);
589 SkipEmptyFileBackward();
590 }
591}
592
// Positions the iterator at the first entry of the first non-empty file.
void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
}
600
// Positions the iterator at the last entry of the last non-empty file.
void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
}
608
// Advances within the current file, rolling over to the next file when the
// current one is exhausted.
void LevelIterator::Next() {
  assert(Valid());
  file_iter_.Next();
  SkipEmptyFileForward();
}
614
// Steps backward within the current file, rolling over to the previous file
// when the current one is exhausted.
void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}
620
// Moves forward over files that produced no entries (or were never opened)
// until a valid position is found, an error occurs, the read range ends
// (IsOutOfBound / iterate_upper_bound), or the level is exhausted.
void LevelIterator::SkipEmptyFileForward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      return;
    }
    // If even the next file's smallest key is at/past the upper bound there
    // is nothing more to return; stop without opening the file.
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
}
641
// Moves backward over files that produced no entries (or were never opened)
// until a valid position is found, an error occurs, or the level's first
// file has been passed.
void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}
657
658void LevelIterator::SetFileIterator(InternalIterator* iter) {
659 if (pinned_iters_mgr_ && iter) {
660 iter->SetPinnedItersMgr(pinned_iters_mgr_);
661 }
662
663 InternalIterator* old_iter = file_iter_.Set(iter);
664 if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
665 pinned_iters_mgr_->PinIterator(old_iter);
666 } else {
667 delete old_iter;
668 }
669}
670
671void LevelIterator::InitFileIterator(size_t new_file_index) {
672 if (new_file_index >= flevel_->num_files) {
673 file_index_ = new_file_index;
674 SetFileIterator(nullptr);
675 return;
676 } else {
677 // If the file iterator shows incomplete, we try it again if users seek
678 // to the same file, as this time we may go to a different data block
679 // which is cached in block cache.
680 //
681 if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
682 new_file_index == file_index_) {
683 // file_iter_ is already constructed with this iterator, so
684 // no need to change anything
685 } else {
686 file_index_ = new_file_index;
687 InternalIterator* iter = NewFileIterator();
688 SetFileIterator(iter);
689 }
690 }
691}
692
7c673cae
FG
693// A wrapper of version builder which references the current version in
694// constructor and unref it in the destructor.
695// Both of the constructor and destructor need to be called inside DB Mutex.
696class BaseReferencedVersionBuilder {
697 public:
698 explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
699 : version_builder_(new VersionBuilder(
700 cfd->current()->version_set()->env_options(), cfd->table_cache(),
701 cfd->current()->storage_info(), cfd->ioptions()->info_log)),
702 version_(cfd->current()) {
703 version_->Ref();
704 }
705 ~BaseReferencedVersionBuilder() {
706 delete version_builder_;
707 version_->Unref();
708 }
709 VersionBuilder* version_builder() { return version_builder_; }
710
711 private:
712 VersionBuilder* version_builder_;
713 Version* version_;
714};
715} // anonymous namespace
716
// Loads the table properties of `file_meta` into *tp. First tries the table
// cache without I/O; on cache miss (Incomplete) falls back to reading the
// properties block directly from the SST file. `fname` optionally supplies
// the file path, otherwise it is derived from the fd and cf_paths.
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore error type `Incomplete` since it's by design that we
  // disallow table when it's not in table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<RandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
      TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                    file_meta->fd.GetPathId());
  }
  s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can by
  // pass the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(std::move(file), file_name));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  // Count direct (cache-bypassing) property loads for statistics.
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}
768
769Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
770 Status s;
771 for (int level = 0; level < storage_info_.num_levels_; level++) {
772 s = GetPropertiesOfAllTables(props, level);
773 if (!s.ok()) {
774 return s;
775 }
776 }
777
778 return Status::OK();
779}
780
781Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
782 int level) {
783 for (const auto& file_meta : storage_info_.files_[level]) {
784 auto fname =
11fdf7f2 785 TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
7c673cae
FG
786 file_meta->fd.GetPathId());
787 // 1. If the table is already present in table cache, load table
788 // properties from there.
789 std::shared_ptr<const TableProperties> table_properties;
790 Status s = GetTableProperties(&table_properties, file_meta, &fname);
791 if (s.ok()) {
792 props->insert({fname, table_properties});
793 } else {
794 return s;
795 }
796 }
797
798 return Status::OK();
799}
800
801Status Version::GetPropertiesOfTablesInRange(
802 const Range* range, std::size_t n, TablePropertiesCollection* props) const {
803 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
804 for (decltype(n) i = 0; i < n; i++) {
805 // Convert user_key into a corresponding internal key.
806 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
807 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
808 std::vector<FileMetaData*> files;
809 storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
810 false);
811 for (const auto& file_meta : files) {
812 auto fname =
11fdf7f2 813 TableFileName(cfd_->ioptions()->cf_paths,
7c673cae
FG
814 file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
815 if (props->count(fname) == 0) {
816 // 1. If the table is already present in table cache, load table
817 // properties from there.
818 std::shared_ptr<const TableProperties> table_properties;
819 Status s = GetTableProperties(&table_properties, file_meta, &fname);
820 if (s.ok()) {
821 props->insert({fname, table_properties});
822 } else {
823 return s;
824 }
825 }
826 }
827 }
828 }
829
830 return Status::OK();
831}
832
833Status Version::GetAggregatedTableProperties(
834 std::shared_ptr<const TableProperties>* tp, int level) {
835 TablePropertiesCollection props;
836 Status s;
837 if (level < 0) {
838 s = GetPropertiesOfAllTables(&props);
839 } else {
840 s = GetPropertiesOfAllTables(&props, level);
841 }
842 if (!s.ok()) {
843 return s;
844 }
845
846 auto* new_tp = new TableProperties();
847 for (const auto& item : props) {
848 new_tp->Add(*item.second);
849 }
850 tp->reset(new_tp);
851 return Status::OK();
852}
853
854size_t Version::GetMemoryUsageByTableReaders() {
855 size_t total_usage = 0;
856 for (auto& file_level : storage_info_.level_files_brief_) {
857 for (size_t i = 0; i < file_level.num_files; i++) {
858 total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
11fdf7f2
TL
859 env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
860 mutable_cf_options_.prefix_extractor.get());
7c673cae
FG
861 }
862 }
863 return total_usage;
864}
865
// Fills *cf_meta with per-level SST file metadata (name, path, size, seqno
// range, boundary user keys, read samples, compaction flag) plus aggregate
// size/count totals for this column family.
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        // path_id beyond the configured paths: fall back to the last one.
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file->fd.GetNumber()),
          file_path,
          static_cast<size_t>(file->fd.GetFileSize()),
          file->fd.smallest_seqno,
          file->fd.largest_seqno,
          file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted});
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(
        level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}
908
11fdf7f2
TL
909uint64_t Version::GetSstFilesSize() {
910 uint64_t sst_files_size = 0;
911 for (int level = 0; level < storage_info_.num_levels_; level++) {
912 for (const auto& file_meta : storage_info_.LevelFiles(level)) {
913 sst_files_size += file_meta->fd.GetFileSize();
914 }
915 }
916 return sst_files_size;
917}
7c673cae
FG
918
919uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
920 // Estimation will be inaccurate when:
921 // (1) there exist merge keys
922 // (2) keys are directly overwritten
923 // (3) deletion on non-existing keys
924 // (4) low number of samples
925 if (current_num_samples_ == 0) {
926 return 0;
927 }
928
929 if (current_num_non_deletions_ <= current_num_deletions_) {
930 return 0;
931 }
932
933 uint64_t est = current_num_non_deletions_ - current_num_deletions_;
934
935 uint64_t file_count = 0;
936 for (int level = 0; level < num_levels_; ++level) {
937 file_count += files_[level].size();
938 }
939
940 if (current_num_samples_ < file_count) {
941 // casting to avoid overflowing
942 return
943 static_cast<uint64_t>(
944 (est * static_cast<double>(file_count) / current_num_samples_)
945 );
946 } else {
947 return est;
948 }
949}
950
951double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
952 int level) const {
953 assert(level < num_levels_);
954 uint64_t sum_file_size_bytes = 0;
955 uint64_t sum_data_size_bytes = 0;
956 for (auto* file_meta : files_[level]) {
957 sum_file_size_bytes += file_meta->fd.GetFileSize();
958 sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
959 }
960 if (sum_file_size_bytes == 0) {
961 return -1.0;
962 }
963 return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
964}
965
966void Version::AddIterators(const ReadOptions& read_options,
967 const EnvOptions& soptions,
968 MergeIteratorBuilder* merge_iter_builder,
969 RangeDelAggregator* range_del_agg) {
970 assert(storage_info_.finalized_);
971
972 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
973 AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
974 range_del_agg);
975 }
976}
977
void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const EnvOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
                                   int level,
                                   RangeDelAggregator* range_del_agg) {
  // Appends the iterator(s) required to read `level` of this version to
  // `merge_iter_builder`. L0 contributes one table-cache iterator per file
  // (L0 files may overlap each other); every higher level contributes a
  // single LevelIterator over its sorted, non-overlapping files.
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
    return;
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
    return;
  }

  bool should_sample = should_sample_file_read();

  auto* arena = merge_iter_builder->GetArena();
  if (level == 0) {
    // Merge all level zero files together since they may overlap
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto& file = storage_info_.LevelFilesBrief(0).files[i];
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(), *file.file_metadata,
          range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0), false, arena,
          false /* skip_filters */, 0 /* level */));
    }
    if (should_sample) {
      // Count ones for every L0 files. This is done per iterator creation
      // rather than Seek(), while files in other levels are recorded per
      // seek. If users execute one range query per iterator, there may be
      // some discrepancy here.
      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
        sample_file_read_inc(meta);
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    // lazily.
    // Placement-new into the builder's arena ties the iterator's lifetime
    // to the merge iterator that owns that arena.
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
        range_del_agg));
  }
}
1028
11fdf7f2
TL
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const EnvOptions& env_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  // Reports via `*overlap` whether any key in `level` falls inside
  // [smallest_user_key, largest_user_key]. L0 files are probed one by one
  // (they may overlap each other); a higher level is probed through a
  // single LevelIterator. Range tombstones observed while iterating also
  // count as overlap (checked at the end).
  assert(storage_info_.finalized_);

  auto icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  Arena arena;
  Status status;
  RangeDelAggregator range_del_agg(icmp, {}, false);

  *overlap = false;

  if (level == 0) {
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto file = &storage_info_.LevelFilesBrief(0).files[i];
      // Skip files whose key range cannot intersect the query range.
      if (AfterFile(ucmp, &smallest_user_key, file) ||
          BeforeFile(ucmp, &largest_user_key, file)) {
        continue;
      }
      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
          read_options, env_options, cfd_->internal_comparator(), *file->file_metadata,
          &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
          false /* skip_filters */, 0 /* level */));
      status = OverlapWithIterator(
          ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
      if (!status.ok() || *overlap) {
        break;
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, env_options,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
        &range_del_agg));
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
  }

  // Even when no point key overlapped, a range deletion spanning the query
  // range counts as overlap.
  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
    *overlap = true;
  }
  return status;
}
1082
7c673cae
FG
// Builds the storage metadata for one version. `ref_vstorage` is the
// predecessor version's storage info (may be nullptr); its sampled
// statistics are carried forward so estimates survive version transitions.
VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
    bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      // A single-level DB has no separate base target level.
      base_level_(num_levels_ == 1 ? -1 : 1),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      finalized_(false),
      force_consistency_checks_(_force_consistency_checks) {
  if (ref_vstorage != nullptr) {
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
  }
}
1127
// Constructs a version for `column_family_data`. `cfd_` may be nullptr for
// a dummy version; every member derived from it then falls back to a safe
// default so the dummy stays inert.
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const EnvOptions& env_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      // Seed storage_info_ from the CF's current version (if any) so
      // accumulated statistics carry over.
      storage_info_(
          (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
          (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
          cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
          cfd_ == nullptr ? kCompactionStyleLevel
                          : cfd_->ioptions()->compaction_style,
          (cfd_ == nullptr || cfd_->current() == nullptr)
              ? nullptr
              : cfd_->current()->storage_info(),
          cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
      vset_(vset),
      // A fresh version links to itself until inserted into the list.
      next_(this),
      prev_(this),
      refs_(0),
      env_options_(env_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}
1157
// Point lookup of key `k` across this version's files, consulting files in
// the order required for correctness (L0 newest-to-oldest, then deeper
// levels via the file indexer). On a hit the value lands in `*value`;
// `*status` reports NotFound / Corruption / NotSupported / merge errors.
// Optional out-params: `value_found`, `key_exists`, `seq` (sequence number
// of the matching entry), `is_blob`; `callback` lets the caller restrict
// visible sequence numbers.
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, Status* status,
                  MergeContext* merge_context,
                  RangeDelAggregator* range_del_agg, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
                  bool* is_blob) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  if (key_exists != nullptr) {
    // will falsify below if not found
    *key_exists = true;
  }

  PinnedIteratorsManager pinned_iters_mgr;
  // Start in kMerge state if the caller already collected merge operands.
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      value, value_found, merge_context, range_del_agg, this->env_, seq,
      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }

    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (!status->ok()) {
      return;
    }

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // Keep collecting merge operands from older files.
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        return;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
        return;
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
        return;
    }
    f = fp.GetNextFile();
  }

  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
  }
  if (GetContext::kMerge == get_context.State()) {
    if (!merge_operator_) {
      *status = Status::InvalidArgument(
          "merge_operator is not properly initialized.");
      return;
    }
    // merge_operands are in saver and we hit the beginning of the key history
    // do a final merge of nullptr and operands;
    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
    *status = MergeHelper::TimedFullMerge(
        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
        str_value, info_log_, db_statistics_, env_,
        nullptr /* result_operand */, true);
    if (LIKELY(value != nullptr)) {
      value->PinSelf();
    }
  } else {
    if (key_exists != nullptr) {
      *key_exists = false;
    }
    *status = Status::NotFound();  // Use an empty error message for speed
  }
}
1273
1274bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
1275 // Reaching the bottom level implies misses at all upper levels, so we'll
1276 // skip checking the filters when we predict a hit.
1277 return cfd_->ioptions()->optimize_filters_for_hits &&
1278 (level > 0 || is_file_last_in_level) &&
1279 level == storage_info_.num_non_empty_levels() - 1;
1280}
1281
1282void VersionStorageInfo::GenerateLevelFilesBrief() {
1283 level_files_brief_.resize(num_non_empty_levels_);
1284 for (int level = 0; level < num_non_empty_levels_; level++) {
1285 DoGenerateLevelFilesBrief(
1286 &level_files_brief_[level], files_[level], &arena_);
1287 }
1288}
1289
// Finishes construction of this version's storage metadata before the
// version is installed. The call order matters: level sizes and base bytes
// must exist before compaction priorities, and the brief file lists before
// the L0-overlap / bottommost computations.
void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
  storage_info_.GenerateBottommostFiles();
}
1302
1303bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
1304 if (file_meta->init_stats_from_file ||
1305 file_meta->compensated_file_size > 0) {
1306 return false;
1307 }
1308 std::shared_ptr<const TableProperties> tp;
1309 Status s = GetTableProperties(&tp, file_meta);
1310 file_meta->init_stats_from_file = true;
1311 if (!s.ok()) {
1312 ROCKS_LOG_ERROR(vset_->db_options_->info_log,
1313 "Unable to load table properties for file %" PRIu64
1314 " --- %s\n",
1315 file_meta->fd.GetNumber(), s.ToString().c_str());
1316 return false;
1317 }
1318 if (tp.get() == nullptr) return false;
1319 file_meta->num_entries = tp->num_entries;
1320 file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
1321 file_meta->raw_value_size = tp->raw_value_size;
1322 file_meta->raw_key_size = tp->raw_key_size;
1323
1324 return true;
1325}
1326
1327void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
1328 assert(file_meta->init_stats_from_file);
1329 accumulated_file_size_ += file_meta->fd.GetFileSize();
1330 accumulated_raw_key_size_ += file_meta->raw_key_size;
1331 accumulated_raw_value_size_ += file_meta->raw_value_size;
1332 accumulated_num_non_deletions_ +=
1333 file_meta->num_entries - file_meta->num_deletions;
1334 accumulated_num_deletions_ += file_meta->num_deletions;
1335
1336 current_num_non_deletions_ +=
1337 file_meta->num_entries - file_meta->num_deletions;
1338 current_num_deletions_ += file_meta->num_deletions;
1339 current_num_samples_++;
1340}
1341
1342void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
1343 if (file_meta->init_stats_from_file) {
1344 current_num_non_deletions_ -=
1345 file_meta->num_entries - file_meta->num_deletions;
1346 current_num_deletions_ -= file_meta->num_deletions;
1347 current_num_samples_--;
1348 }
1349}
1350
// Optionally samples table properties from a bounded number of files to
// refresh deletion/size statistics, then recomputes compensated file
// sizes. `update_stats` is false when the caller wants to skip the
// property-loading I/O entirely.
void Version::UpdateAccumulatedStats(bool update_stats) {
  if (update_stats) {
    // maximum number of table properties loaded from files.
    const int kMaxInitCount = 20;
    int init_count = 0;
    // here only the first kMaxInitCount files which haven't been
    // initialized from file will be updated with num_deletions.
    // The motivation here is to cap the maximum I/O per Version creation.
    // The reason for choosing files from lower-level instead of higher-level
    // is that such design is able to propagate the initialization from
    // lower-level to higher-level: When the num_deletions of lower-level
    // files are updated, it will make the lower-level files have accurate
    // compensated_file_size, making lower-level to higher-level compaction
    // will be triggered, which creates higher-level files whose num_deletions
    // will be updated here.
    for (int level = 0;
         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
         ++level) {
      for (auto* file_meta : storage_info_.files_[level]) {
        if (MaybeInitializeFileMetaData(file_meta)) {
          // each FileMeta will be initialized only once.
          storage_info_.UpdateAccumulatedStats(file_meta);
          // when option "max_open_files" is -1, all the file metadata has
          // already been read, so MaybeInitializeFileMetaData() won't incur
          // any I/O cost. "max_open_files=-1" means that the table cache passed
          // to the VersionSet and then to the ColumnFamilySet has a size of
          // TableCache::kInfiniteCapacity
          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
              TableCache::kInfiniteCapacity) {
            // Don't count free (already-cached) initializations against
            // the I/O budget.
            continue;
          }
          if (++init_count >= kMaxInitCount) {
            // Inner break only; the outer loop re-checks init_count.
            break;
          }
        }
      }
    }
    // In case all sampled-files contain only deletion entries, then we
    // load the table-property of a file in higher-level to initialize
    // that value.
    for (int level = storage_info_.num_levels_ - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
         --level) {
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
        }
      }
    }
  }

  storage_info_.ComputeCompensatedSizes();
}
1405
1406void VersionStorageInfo::ComputeCompensatedSizes() {
1407 static const int kDeletionWeightOnCompaction = 2;
1408 uint64_t average_value_size = GetAverageValueSize();
1409
1410 // compute the compensated size
1411 for (int level = 0; level < num_levels_; level++) {
1412 for (auto* file_meta : files_[level]) {
1413 // Here we only compute compensated_file_size for those file_meta
1414 // which compensated_file_size is uninitialized (== 0). This is true only
1415 // for files that have been created right now and no other thread has
1416 // access to them. That's why we can safely mutate compensated_file_size.
1417 if (file_meta->compensated_file_size == 0) {
1418 file_meta->compensated_file_size = file_meta->fd.GetFileSize();
1419 // Here we only boost the size of deletion entries of a file only
1420 // when the number of deletion entries is greater than the number of
1421 // non-deletion entries in the file. The motivation here is that in
1422 // a stable workload, the number of deletion entries should be roughly
1423 // equal to the number of non-deletion entries. If we compensate the
1424 // size of deletion entries in a stable workload, the deletion
1425 // compensation logic might introduce unwanted effet which changes the
1426 // shape of LSM tree.
1427 if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
1428 file_meta->compensated_file_size +=
1429 (file_meta->num_deletions * 2 - file_meta->num_entries) *
1430 average_value_size * kDeletionWeightOnCompaction;
1431 }
1432 }
1433 }
1434 }
1435}
1436
1437int VersionStorageInfo::MaxInputLevel() const {
1438 if (compaction_style_ == kCompactionStyleLevel) {
1439 return num_levels() - 2;
1440 }
1441 return 0;
1442}
1443
11fdf7f2
TL
1444int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
1445 if (allow_ingest_behind) {
1446 assert(num_levels() > 1);
1447 return num_levels() - 2;
1448 }
1449 return num_levels() - 1;
1450}
1451
7c673cae
FG
// Estimates estimated_compaction_needed_bytes_: the total bytes that must
// be rewritten to bring every level back under its target size.
void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
    estimated_compaction_needed_bytes_ = 0;
    return;
  }

  // Start from Level 0, if level 0 qualifies compaction to level 1,
  // we estimate the size of compaction.
  // Then we move on to the next level and see whether it qualifies compaction
  // to the next level. The size of the level is estimated as the actual size
  // on the level plus the input bytes from the previous level if there is any.
  // If it exceeds, take the exceeded bytes as compaction input and add the size
  // of the compaction size to total size.
  // We keep doing it to Level 2, 3, etc, until the last level and return the
  // accumulated bytes.

  uint64_t bytes_compact_to_next_level = 0;
  uint64_t level_size = 0;
  for (auto* f : files_[0]) {
    level_size += f->fd.GetFileSize();
  }
  // Level 0
  bool level0_compact_triggered = false;
  if (static_cast<int>(files_[0].size()) >=
          mutable_cf_options.level0_file_num_compaction_trigger ||
      level_size >= mutable_cf_options.max_bytes_for_level_base) {
    level0_compact_triggered = true;
    estimated_compaction_needed_bytes_ = level_size;
    bytes_compact_to_next_level = level_size;
  } else {
    estimated_compaction_needed_bytes_ = 0;
  }

  // Level 1 and up.
  uint64_t bytes_next_level = 0;
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
    level_size = 0;
    if (bytes_next_level > 0) {
      // Reuse the size computed for fan-out estimation on the previous
      // iteration instead of walking the file list again.
#ifndef NDEBUG
      uint64_t level_size2 = 0;
      for (auto* f : files_[level]) {
        level_size2 += f->fd.GetFileSize();
      }
      assert(level_size2 == bytes_next_level);
#endif
      level_size = bytes_next_level;
      bytes_next_level = 0;
    } else {
      for (auto* f : files_[level]) {
        level_size += f->fd.GetFileSize();
      }
    }
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    }
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
    uint64_t level_target = MaxBytesForLevel(level);
    if (level_size > level_target) {
      bytes_compact_to_next_level = level_size - level_target;
      // Estimate the actual compaction fan-out ratio as size ratio between
      // the two levels.

      assert(bytes_next_level == 0);
      if (level + 1 < num_levels_) {
        for (auto* f : files_[level + 1]) {
          bytes_next_level += f->fd.GetFileSize();
        }
      }
      if (bytes_next_level > 0) {
        assert(level_size > 0);
        // Compaction rewrites the input plus the overlapping output-level
        // bytes, approximated by the inter-level size ratio.
        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
            static_cast<double>(bytes_compact_to_next_level) *
            (static_cast<double>(bytes_next_level) /
                 static_cast<double>(level_size) +
             1));
      }
    }
  }
}
1536
11fdf7f2
TL
1537namespace {
1538uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
1539 const MutableCFOptions& mutable_cf_options,
1540 const std::vector<FileMetaData*>& files) {
1541 uint32_t ttl_expired_files_count = 0;
1542
1543 int64_t _current_time;
1544 auto status = ioptions.env->GetCurrentTime(&_current_time);
1545 if (status.ok()) {
1546 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1547 for (auto f : files) {
1548 if (!f->being_compacted && f->fd.table_reader != nullptr &&
1549 f->fd.table_reader->GetTableProperties() != nullptr) {
1550 auto creation_time =
1551 f->fd.table_reader->GetTableProperties()->creation_time;
1552 if (creation_time > 0 &&
1553 creation_time < (current_time -
1554 mutable_cf_options.compaction_options_fifo.ttl)) {
1555 ttl_expired_files_count++;
1556 }
1557 }
1558 }
1559 }
1560 return ttl_expired_files_count;
1561}
1562} // anonymous namespace
1563
7c673cae
FG
// Assigns a compaction score to every candidate input level, sorts
// (compaction_level_, compaction_score_) descending by score, then
// refreshes the marked-for-compaction / bottommost / TTL file lists and
// the pending-compaction byte estimate. A score >= 1 means the level
// needs compaction.
void VersionStorageInfo::ComputeCompactionScore(
    const ImmutableCFOptions& immutable_cf_options,
    const MutableCFOptions& mutable_cf_options) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
          num_sorted_runs++;
        }
      }
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

      if (compaction_style_ == kCompactionStyleFIFO) {
        // FIFO: score by total size vs. the configured cap, optionally
        // also by file count and by expired-TTL file count.
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
          score = std::max(
              static_cast<double>(num_sorted_runs) /
                  mutable_cf_options.level0_file_num_compaction_trigger,
              score);
        }
        if (mutable_cf_options.compaction_options_fifo.ttl > 0) {
          score = std::max(
              static_cast<double>(GetExpiredTtlFilesCount(
                  immutable_cf_options, mutable_cf_options, files_[level])),
              score);
        }

      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          // Level-based involves L0->L0 compactions that can lead to oversized
          // L0 files. Take into account size as well to avoid later giant
          // compactions to the base level.
          score = std::max(
              score, static_cast<double>(total_size) /
                         mutable_cf_options.max_bytes_for_level_base);
        }
      }
    } else {
      // Compute the ratio of current size to size limit.
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries are small.
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
  ComputeFilesMarkedForCompaction();
  ComputeBottommostFilesMarkedForCompaction();
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
  }
  EstimateCompactionBytesNeeded(mutable_cf_options);
}
1664
1665void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
1666 files_marked_for_compaction_.clear();
1667 int last_qualify_level = 0;
1668
1669 // Do not include files from the last level with data
1670 // If table properties collector suggests a file on the last level,
1671 // we should not move it to a new level.
1672 for (int level = num_levels() - 1; level >= 1; level--) {
1673 if (!files_[level].empty()) {
1674 last_qualify_level = level - 1;
1675 break;
1676 }
1677 }
1678
1679 for (int level = 0; level <= last_qualify_level; level++) {
1680 for (auto* f : files_[level]) {
1681 if (!f->being_compacted && f->marked_for_compaction) {
1682 files_marked_for_compaction_.emplace_back(level, f);
1683 }
1684 }
1685 }
1686}
1687
11fdf7f2
TL
1688void VersionStorageInfo::ComputeExpiredTtlFiles(
1689 const ImmutableCFOptions& ioptions, const uint64_t ttl) {
1690 assert(ttl > 0);
1691
1692 expired_ttl_files_.clear();
1693
1694 int64_t _current_time;
1695 auto status = ioptions.env->GetCurrentTime(&_current_time);
1696 if (!status.ok()) {
1697 return;
1698 }
1699 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1700
1701 for (int level = 0; level < num_levels() - 1; level++) {
1702 for (auto f : files_[level]) {
1703 if (!f->being_compacted && f->fd.table_reader != nullptr &&
1704 f->fd.table_reader->GetTableProperties() != nullptr) {
1705 auto creation_time =
1706 f->fd.table_reader->GetTableProperties()->creation_time;
1707 if (creation_time > 0 && creation_time < (current_time - ttl)) {
1708 expired_ttl_files_.emplace_back(level, f);
1709 }
1710 }
1711 }
1712 }
1713}
1714
7c673cae
FG
1715namespace {
1716
1717// used to sort files by size
1718struct Fsize {
1719 size_t index;
1720 FileMetaData* file;
1721};
1722
1723// Compator that is used to sort files based on their size
1724// In normal mode: descending size
1725bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
1726 return (first.file->compensated_file_size >
1727 second.file->compensated_file_size);
1728}
1729} // anonymous namespace
1730
// Appends `f` to `level`, taking a reference on the file. Debug builds
// additionally verify that files on levels > 0 remain sorted and
// non-overlapping; `info_log` is only consulted to report a violation.
void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
  auto* level_files = &files_[level];
  // Must not overlap
#ifndef NDEBUG
  if (level > 0 && !level_files->empty() &&
      internal_comparator_->Compare(
          (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
    auto* f2 = (*level_files)[level_files->size() - 1];
    if (info_log != nullptr) {
      Error(info_log, "Adding new file %" PRIu64
                      " range (%s, %s) to level %d but overlapping "
                      "with existing file %" PRIu64 " %s %s",
            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
            f2->smallest.DebugString(true).c_str(),
            f2->largest.DebugString(true).c_str());
      LogFlush(info_log);
    }
    assert(false);
  }
#else
  (void)info_log;
#endif
  f->refs++;
  level_files->push_back(f);
}
1757
// Version::PrepareApply() needs to be called before calling this function,
// or the following functions must have been called:
// 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes();
// 3. UpdateFilesByCompactionPri();
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping();
// 7. GenerateBottommostFiles();
//
// Marks this VersionStorageInfo as fully built. In debug builds of a
// level-compaction DB it additionally sanity-checks the level invariants.
void VersionStorageInfo::SetFinalized() {
  finalized_ = true;
#ifndef NDEBUG
  if (compaction_style_ != kCompactionStyleLevel) {
    // Not level based compaction.
    return;
  }
  // base_level_ is either unset (<0), trivial (single level), or a valid
  // level index in [1, num_levels()).
  assert(base_level_ < 0 || num_levels() == 1 ||
         (base_level_ >= 1 && base_level_ < num_levels()));
  // Verify all levels newer than base_level are empty except L0
  for (int level = 1; level < base_level(); level++) {
    assert(NumLevelBytes(level) == 0);
  }
  // Level target sizes must be non-decreasing across non-empty levels.
  uint64_t max_bytes_prev_level = 0;
  for (int level = base_level(); level < num_levels() - 1; level++) {
    if (LevelFiles(level).size() == 0) {
      continue;
    }
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
    max_bytes_prev_level = MaxBytesForLevel(level);
  }
  // NOTE(review): despite its name this counter is incremented for
  // *non*-empty non-L0 levels, and its value is never read afterwards —
  // candidate for removal.
  int num_empty_non_l0_level = 0;
  for (int level = 0; level < num_levels(); level++) {
    // files_ and level_files_brief_ must agree on each level's file count.
    assert(LevelFiles(level).size() == 0 ||
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
    if (level > 0 && NumLevelBytes(level) > 0) {
      num_empty_non_l0_level++;
    }
    if (LevelFiles(level).size() > 0) {
      assert(level < num_non_empty_levels());
    }
  }
  assert(compaction_level_.size() > 0);
  assert(compaction_level_.size() == compaction_score_.size());
#endif
}
1803
1804void VersionStorageInfo::UpdateNumNonEmptyLevels() {
1805 num_non_empty_levels_ = num_levels_;
1806 for (int i = num_levels_ - 1; i >= 0; i--) {
1807 if (files_[i].size() != 0) {
1808 return;
1809 } else {
1810 num_non_empty_levels_ = i;
1811 }
1812 }
1813}
1814
namespace {
// Sort `temp` ascending by the ratio of bytes each file overlaps in
// `next_level_files` over the file's own size, so the file that would pull
// the least next-level data into a compaction comes first.
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
    const std::vector<FileMetaData*>& next_level_files,
    std::vector<Fsize>* temp) {
  // file number -> (overlapping bytes * 1024 / file size)
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  // Both levels are sorted by key, so a single merge-style pass with one
  // shared iterator is enough to compute every file's overlap.
  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in next level that is smaller than current file
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
      next_level_it++;
    }

    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
      overlapping_bytes += (*next_level_it)->fd.file_size;

      if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
        // next level file cross large boundary of current file: do not
        // advance, it may overlap the next file in `files` as well.
        break;
      }
      next_level_it++;
    }

    assert(file->fd.file_size != 0);
    // Scale by 1024 before the integer division to retain some fractional
    // precision in the ratio.
    file_to_order[file->fd.GetNumber()] =
        overlapping_bytes * 1024u / file->fd.file_size;
  }

  std::sort(temp->begin(), temp->end(),
            [&](const Fsize& f1, const Fsize& f2) -> bool {
              return file_to_order[f1.file->fd.GetNumber()] <
                     file_to_order[f2.file->fd.GetNumber()];
            });
}
}  // namespace
1855
// Rebuilds files_by_compaction_pri_ for every level except the last: each
// level's file indices ordered by the requested compaction priority. Only
// meaningful for leveled compaction; other styles return immediately.
void VersionStorageInfo::UpdateFilesByCompactionPri(
    CompactionPri compaction_pri) {
  if (compaction_style_ == kCompactionStyleNone ||
      compaction_style_ == kCompactionStyleFIFO ||
      compaction_style_ == kCompactionStyleUniversal) {
    // don't need this
    return;
  }
  // No need to sort the highest level because it is never compacted.
  for (int level = 0; level < num_levels() - 1; level++) {
    const std::vector<FileMetaData*>& files = files_[level];
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
    assert(files_by_compaction_pri.size() == 0);

    // populate a temp vector for sorting based on size
    std::vector<Fsize> temp(files.size());
    for (size_t i = 0; i < files.size(); i++) {
      temp[i].index = i;
      temp[i].file = files[i];
    }

    // sort the top number_of_files_to_sort_ based on file size
    size_t num = VersionStorageInfo::kNumberFilesToSort;
    if (num > temp.size()) {
      num = temp.size();
    }
    switch (compaction_pri) {
      case kByCompensatedSize:
        // Only the first `num` entries need to be fully ordered here.
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
                          CompareCompensatedSizeDescending);
        break;
      case kOldestLargestSeqFirst:
        // Coldest (least-recently updated) files first.
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.largest_seqno <
                           f2.file->fd.largest_seqno;
                  });
        break;
      case kOldestSmallestSeqFirst:
        // Files whose key range was least recently compacted first.
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.smallest_seqno <
                           f2.file->fd.smallest_seqno;
                  });
        break;
      case kMinOverlappingRatio:
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
                                   files_[level + 1], &temp);
        break;
      default:
        assert(false);
    }
    assert(temp.size() == files.size());

    // initialize files_by_compaction_pri_
    for (size_t i = 0; i < temp.size(); i++) {
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
    }
    next_file_to_compact_by_size_[level] = 0;
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
  }
}
1918
1919void VersionStorageInfo::GenerateLevel0NonOverlapping() {
1920 assert(!finalized_);
1921 level0_non_overlapping_ = true;
1922 if (level_files_brief_.size() == 0) {
1923 return;
1924 }
1925
1926 // A copy of L0 files sorted by smallest key
1927 std::vector<FdWithKeyRange> level0_sorted_file(
1928 level_files_brief_[0].files,
1929 level_files_brief_[0].files + level_files_brief_[0].num_files);
1930 std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
1931 [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
1932 return (internal_comparator_->Compare(f1.smallest_key,
1933 f2.smallest_key) < 0);
1934 });
1935
1936 for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
1937 FdWithKeyRange& f = level0_sorted_file[i];
1938 FdWithKeyRange& prev = level0_sorted_file[i - 1];
1939 if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
1940 level0_non_overlapping_ = false;
1941 break;
1942 }
1943 }
1944}
1945
11fdf7f2
TL
1946void VersionStorageInfo::GenerateBottommostFiles() {
1947 assert(!finalized_);
1948 assert(bottommost_files_.empty());
1949 for (size_t level = 0; level < level_files_brief_.size(); ++level) {
1950 for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
1951 ++file_idx) {
1952 const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
1953 int l0_file_idx;
1954 if (level == 0) {
1955 l0_file_idx = static_cast<int>(file_idx);
1956 } else {
1957 l0_file_idx = -1;
1958 }
1959 if (!RangeMightExistAfterSortedRun(f.smallest_key, f.largest_key,
1960 static_cast<int>(level),
1961 l0_file_idx)) {
1962 bottommost_files_.emplace_back(static_cast<int>(level),
1963 f.file_metadata);
1964 }
1965 }
1966 }
1967}
1968
// Records the new oldest live snapshot and, when it has advanced past the
// cached threshold, recomputes which bottommost files may now be compacted.
void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
  // The oldest snapshot sequence number only moves forward.
  assert(seqnum >= oldest_snapshot_seqnum_);
  oldest_snapshot_seqnum_ = seqnum;
  // Only recompute when the new bound can actually expose more files (the
  // threshold is the smallest largest_seqno previously held back).
  if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
    ComputeBottommostFilesMarkedForCompaction();
  }
}
1976
// Scans bottommost_files_ and marks for compaction those whose obsolete
// entries are no longer protected by any snapshot. Also records, in
// bottommost_files_mark_threshold_, the smallest largest_seqno among files
// still held back, so UpdateOldestSnapshot() knows when to re-run this.
void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
  bottommost_files_marked_for_compaction_.clear();
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto& level_and_file : bottommost_files_) {
    if (!level_and_file.second->being_compacted &&
        level_and_file.second->fd.largest_seqno != 0 &&
        level_and_file.second->num_deletions > 1) {
      // largest_seqno might be nonzero due to containing the final key in an
      // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
      // ensures the file really contains deleted or overwritten keys.
      if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
        // No snapshot can see the overwritten data: safe to compact away.
        bottommost_files_marked_for_compaction_.push_back(level_and_file);
      } else {
        // Still visible to some snapshot; remember the earliest point at
        // which it could become compactable.
        bottommost_files_mark_threshold_ =
            std::min(bottommost_files_mark_threshold_,
                     level_and_file.second->fd.largest_seqno);
      }
    }
  }
}
1997
7c673cae
FG
// Increments this Version's reference count. Each Ref() must be balanced by
// a later Unref().
void Version::Ref() {
  ++refs_;
}
2001
2002bool Version::Unref() {
2003 assert(refs_ >= 1);
2004 --refs_;
2005 if (refs_ == 0) {
2006 delete this;
2007 return true;
2008 }
2009 return false;
2010}
2011
2012bool VersionStorageInfo::OverlapInLevel(int level,
2013 const Slice* smallest_user_key,
2014 const Slice* largest_user_key) {
2015 if (level >= num_non_empty_levels_) {
2016 // empty level, no overlap
2017 return false;
2018 }
2019 return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2020 level_files_brief_[level], smallest_user_key,
2021 largest_user_key);
2022}
2023
// Store in "*inputs" all files in "level" that overlap [begin,end]
// If hint_index is specified, then it points to a file in the
// overlapping range.
// The file_index returns a pointer to any file in an overlapping range.
// For L0 with expand_range, the search range grows to cover every selected
// file's keys and the scan repeats until a fixed point is reached.
void VersionStorageInfo::GetOverlappingInputs(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
    bool expand_range, InternalKey** next_smallest) const {
  if (level >= num_non_empty_levels_) {
    // this level is empty, no overlapping inputs
    return;
  }

  inputs->clear();
  if (file_index) {
    *file_index = -1;
  }
  const Comparator* user_cmp = user_comparator_;
  if (level > 0) {
    // Sorted, non-overlapping level: binary search does the work.
    GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
                                          file_index, false, next_smallest);
    return;
  }

  if (next_smallest) {
    // next_smallest key only makes sense for non-level 0, where files are
    // non-overlapping
    *next_smallest = nullptr;
  }

  Slice user_begin, user_end;
  if (begin != nullptr) {
    user_begin = begin->user_key();
  }
  if (end != nullptr) {
    user_end = end->user_key();
  }

  // index stores the file index need to check.
  std::list<size_t> index;
  for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
    index.emplace_back(i);
  }

  // Repeatedly sweep the remaining candidates; with expand_range each
  // accepted file may widen [user_begin, user_end], so previously skipped
  // files must be reconsidered until a pass accepts nothing new.
  while (!index.empty()) {
    bool found_overlapping_file = false;
    auto iter = index.begin();
    while (iter != index.end()) {
      FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
      const Slice file_start = ExtractUserKey(f->smallest_key);
      const Slice file_limit = ExtractUserKey(f->largest_key);
      if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
        // "f" is completely before specified range; skip it
        iter++;
      } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
        // "f" is completely after specified range; skip it
        iter++;
      } else {
        // if overlap
        inputs->emplace_back(files_[level][*iter]);
        found_overlapping_file = true;
        // record the first file index.
        if (file_index && *file_index == -1) {
          *file_index = static_cast<int>(*iter);
        }
        // the related file is overlap, erase to avoid checking again.
        iter = index.erase(iter);
        if (expand_range) {
          // Grow the search range to cover the accepted file's keys.
          if (begin != nullptr &&
              user_cmp->Compare(file_start, user_begin) < 0) {
            user_begin = file_start;
          }
          if (end != nullptr &&
              user_cmp->Compare(file_limit, user_end) > 0) {
            user_end = file_limit;
          }
        }
      }
    }
    // if all the files left are not overlap, break
    if (!found_overlapping_file) {
      break;
    }
  }
}
2109
// Store in "*inputs" files in "level" that are within range [begin,end]
// Guarantee a "clean cut" boundary between the files in inputs
// and the surrounding files and the maximum number of files.
// This will ensure that no parts of a key are lost during compaction.
// If hint_index is specified, then it points to a file in the range.
// The file_index returns a pointer to any file in an overlapping range.
void VersionStorageInfo::GetCleanInputsWithinInterval(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
  inputs->clear();
  if (file_index) {
    *file_index = -1;
  }
  if (level >= num_non_empty_levels_ || level == 0 ||
      level_files_brief_[level].num_files == 0) {
    // this level is empty, no inputs within range
    // also don't support clean input interval within L0
    return;
  }

  const auto& level_files = level_files_brief_[level];
  // A null bound means "unbounded": substitute the level's extreme keys so
  // the binary search below always has concrete endpoints.
  if (begin == nullptr) {
    begin = &level_files.files[0].file_metadata->smallest;
  }
  if (end == nullptr) {
    end = &level_files.files[level_files.num_files - 1].file_metadata->largest;
  }

  GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
                                        hint_index, file_index,
                                        true /* within_interval */);
}
2142
namespace {

const uint64_t kRangeTombstoneSentinel =
    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);

// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
// null which provides the property that a==null indicates a key that is less
// than any key and b==null indicates a key that is greater than any key. Note
// that the comparison is performed primarily on the user-key portion of the
// key. If the user-keys compare equal, an additional test is made to sort
// range tombstone sentinel keys before other keys with the same user-key. The
// result is that 2 user-keys will compare equal if they differ purely on
// their sequence number and value, but the range tombstone sentinel for that
// user-key will compare not equal. This is necessary because the range
// tombstone sentinel key is set as the largest key for an sstable even though
// that key never appears in the database. We don't want adjacent sstables to
// be considered overlapping if they are separated by the range tombstone
// sentinel.
int sstableKeyCompare(const Comparator* user_cmp,
                      const InternalKey& a, const InternalKey& b) {
  auto c = user_cmp->Compare(a.user_key(), b.user_key());
  if (c != 0) {
    return c;
  }
  // Equal user keys: a range-tombstone sentinel sorts strictly before any
  // other key with the same user key.
  auto a_footer = ExtractInternalKeyFooter(a.Encode());
  auto b_footer = ExtractInternalKeyFooter(b.Encode());
  if (a_footer == kRangeTombstoneSentinel) {
    if (b_footer != kRangeTombstoneSentinel) {
      return -1;
    }
  } else if (b_footer == kRangeTombstoneSentinel) {
    return 1;
  }
  return 0;
}

// Overload: a == nullptr acts as a key smaller than everything.
int sstableKeyCompare(const Comparator* user_cmp,
                      const InternalKey* a, const InternalKey& b) {
  if (a == nullptr) {
    return -1;
  }
  return sstableKeyCompare(user_cmp, *a, b);
}

// Overload: b == nullptr acts as a key larger than everything, so any a
// compares less than it.
int sstableKeyCompare(const Comparator* user_cmp,
                      const InternalKey& a, const InternalKey* b) {
  if (b == nullptr) {
    return -1;
  }
  return sstableKeyCompare(user_cmp, a, *b);
}

}  // namespace
2196
7c673cae
FG
2197// Store in "*inputs" all files in "level" that overlap [begin,end]
2198// Employ binary search to find at least one file that overlaps the
2199// specified range. From that file, iterate backwards and
2200// forwards to find all overlapping files.
2201// if within_range is set, then only store the maximum clean inputs
2202// within range [begin, end]. "clean" means there is a boudnary
2203// between the files in "*inputs" and the surrounding files
2204void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
11fdf7f2 2205 int level, const InternalKey* begin, const InternalKey* end,
7c673cae 2206 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
11fdf7f2 2207 bool within_interval, InternalKey** next_smallest) const {
7c673cae
FG
2208 assert(level > 0);
2209 int min = 0;
2210 int mid = 0;
2211 int max = static_cast<int>(files_[level].size()) - 1;
2212 bool foundOverlap = false;
11fdf7f2 2213 auto user_cmp = user_comparator_;
7c673cae
FG
2214
2215 // if the caller already knows the index of a file that has overlap,
2216 // then we can skip the binary search.
2217 if (hint_index != -1) {
2218 mid = hint_index;
2219 foundOverlap = true;
2220 }
2221
2222 while (!foundOverlap && min <= max) {
2223 mid = (min + max)/2;
2224 FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
11fdf7f2
TL
2225 auto& smallest = f->file_metadata->smallest;
2226 auto& largest = f->file_metadata->largest;
2227 if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
2228 (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
7c673cae
FG
2229 min = mid + 1;
2230 } else if ((!within_interval &&
11fdf7f2 2231 sstableKeyCompare(user_cmp, smallest, end) > 0) ||
7c673cae 2232 (within_interval &&
11fdf7f2 2233 sstableKeyCompare(user_cmp, largest, end) > 0)) {
7c673cae
FG
2234 max = mid - 1;
2235 } else {
2236 foundOverlap = true;
2237 break;
2238 }
2239 }
2240
2241 // If there were no overlapping files, return immediately.
2242 if (!foundOverlap) {
11fdf7f2
TL
2243 if (next_smallest) {
2244 next_smallest = nullptr;
2245 }
7c673cae
FG
2246 return;
2247 }
2248 // returns the index where an overlap is found
2249 if (file_index) {
2250 *file_index = mid;
2251 }
2252
2253 int start_index, end_index;
2254 if (within_interval) {
11fdf7f2
TL
2255 ExtendFileRangeWithinInterval(level, begin, end, mid,
2256 &start_index, &end_index);
7c673cae 2257 } else {
11fdf7f2 2258 ExtendFileRangeOverlappingInterval(level, begin, end, mid,
7c673cae 2259 &start_index, &end_index);
11fdf7f2 2260 assert(end_index >= start_index);
7c673cae 2261 }
7c673cae
FG
2262 // insert overlapping files into vector
2263 for (int i = start_index; i <= end_index; i++) {
2264 inputs->push_back(files_[level][i]);
2265 }
11fdf7f2
TL
2266
2267 if (next_smallest != nullptr) {
2268 // Provide the next key outside the range covered by inputs
2269 if (++end_index < static_cast<int>(files_[level].size())) {
2270 **next_smallest = files_[level][end_index]->smallest;
2271 } else {
2272 *next_smallest = nullptr;
2273 }
2274 }
7c673cae
FG
2275}
2276
// Store in *start_index and *end_index the range of all files in
// "level" that overlap [begin,end]
// The mid_index specifies the index of at least one file that
// overlaps the specified range. From that file, iterate backward
// and forward to find all overlapping files.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
    int level, const InternalKey* begin, const InternalKey* end,
    unsigned int mid_index, int* start_index, int* end_index) const {
  auto user_cmp = user_comparator_;
  const FdWithKeyRange* files = level_files_brief_[level].files;
#ifndef NDEBUG
  {
    // assert that the file at mid_index overlaps with the range
    assert(mid_index < level_files_brief_[level].num_files);
    const FdWithKeyRange* f = &files[mid_index];
    auto& smallest = f->file_metadata->smallest;
    auto& largest = f->file_metadata->largest;
    if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
      assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
    } else {
      // Debugging aid kept from upstream; uncomment to dump the offending
      // keys when the assertion below fires.
      // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
      //         begin ? begin->DebugString().c_str() : "(null)",
      //         end ? end->DebugString().c_str() : "(null)",
      //         smallest->DebugString().c_str(),
      //         largest->DebugString().c_str(),
      //         sstableKeyCompare(user_cmp, smallest, begin),
      //         sstableKeyCompare(user_cmp, largest, begin));
      assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
    }
  }
#endif
  // Start with an empty range (start > end); the loops below widen it.
  *start_index = mid_index + 1;
  *end_index = mid_index;
  // Debug-only counter used to cross-check the final range size.
  int count __attribute__((__unused__));
  count = 0;

  // check backwards from 'mid' to lower indices
  for (int i = mid_index; i >= 0 ; i--) {
    const FdWithKeyRange* f = &files[i];
    auto& largest = f->file_metadata->largest;
    if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
      *start_index = i;
      assert((count++, true));
    } else {
      break;
    }
  }
  // check forward from 'mid+1' to higher indices
  for (unsigned int i = mid_index+1;
       i < level_files_brief_[level].num_files; i++) {
    const FdWithKeyRange* f = &files[i];
    auto& smallest = f->file_metadata->smallest;
    if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
      assert((count++, true));
      *end_index = i;
    } else {
      break;
    }
  }
  assert(count == *end_index - *start_index + 1);
}
2339
// Store in *start_index and *end_index the clean range of all files in
// "level" within [begin,end]
// The mid_index specifies the index of at least one file within
// the specified range. From that file, iterate backward
// and forward to find all overlapping files and then "shrink" to
// the clean range required.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendFileRangeWithinInterval(
    int level, const InternalKey* begin, const InternalKey* end,
    unsigned int mid_index, int* start_index, int* end_index) const {
  assert(level != 0);
  auto* user_cmp = user_comparator_;
  const FdWithKeyRange* files = level_files_brief_[level].files;
#ifndef NDEBUG
  {
    // assert that the file at mid_index is within the range
    assert(mid_index < level_files_brief_[level].num_files);
    const FdWithKeyRange* f = &files[mid_index];
    auto& smallest = f->file_metadata->smallest;
    auto& largest = f->file_metadata->largest;
    assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
           sstableKeyCompare(user_cmp, largest, end) <= 0);
  }
#endif
  // First take every file that merely overlaps [begin, end] ...
  ExtendFileRangeOverlappingInterval(level, begin, end, mid_index,
                                     start_index, end_index);
  int left = *start_index;
  int right = *end_index;
  // ... then shrink from left to right: drop files that stick out past
  // `begin` or share a boundary user key with the file before them.
  while (left <= right) {
    auto& smallest = files[left].file_metadata->smallest;
    if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
      left++;
      continue;
    }
    if (left > 0) {  // If not first file
      auto& largest = files[left - 1].file_metadata->largest;
      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
        left++;
        continue;
      }
    }
    break;
  }
  // shrink from right to left, symmetrically.
  while (left <= right) {
    auto& largest = files[right].file_metadata->largest;
    if (sstableKeyCompare(user_cmp, largest, end) > 0) {
      right--;
      continue;
    }
    if (right < static_cast<int>(level_files_brief_[level].num_files) -
                    1) {  // If not the last file
      auto& smallest = files[right + 1].file_metadata->smallest;
      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
        // The last user key in range overlaps with the next file's first key
        right--;
        continue;
      }
    }
    break;
  }

  *start_index = left;
  *end_index = right;
}
2406
2407uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
2408 assert(level >= 0);
2409 assert(level < num_levels());
2410 return TotalFileSize(files_[level]);
2411}
2412
// Formats a one-line human-readable summary (base level, per-level file
// counts, max compaction score, pending marks) into scratch->buffer and
// returns it. Output is silently truncated if the buffer fills up.
const char* VersionStorageInfo::LevelSummary(
    LevelSummaryStorage* scratch) const {
  int len = 0;
  if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
    len = snprintf(scratch->buffer, sizeof(scratch->buffer),
                   "base level %d max bytes base %" PRIu64 " ", base_level_,
                   level_max_bytes_[base_level_]);
  }
  len +=
      snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
  for (int i = 0; i < num_levels(); i++) {
    int sz = sizeof(scratch->buffer) - len;
    int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
    // Stop appending once the buffer would overflow.
    if (ret < 0 || ret >= sz) break;
    len += ret;
  }
  if (len > 0) {
    // overwrite the last space
    --len;
  }
  len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                  "] max score %.2f", compaction_score_[0]);

  if (!files_marked_for_compaction_.empty()) {
    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
             " (%" ROCKSDB_PRIszt " files need compaction)",
             files_marked_for_compaction_.size());
  }

  return scratch->buffer;
}
2445
// Formats a per-file summary of `level` — file number, smallest seqno,
// human-readable size, being-compacted flag — into scratch->buffer and
// returns it. Output is silently truncated if the buffer fills up.
const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
                                                 int level) const {
  int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
  for (const auto& f : files_[level]) {
    int sz = sizeof(scratch->buffer) - len;
    char sztxt[16];
    AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
    int ret = snprintf(scratch->buffer + len, sz,
                       "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
                       f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
                       static_cast<int>(f->being_compacted));
    // Stop appending once the buffer would overflow.
    if (ret < 0 || ret >= sz)
      break;
    len += ret;
  }
  // overwrite the last space (only if files_[level].size() is non-zero)
  if (files_[level].size() && len > 0) {
    --len;
  }
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
  return scratch->buffer;
}
2468
2469int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
2470 uint64_t result = 0;
2471 std::vector<FileMetaData*> overlaps;
2472 for (int level = 1; level < num_levels() - 1; level++) {
2473 for (const auto& f : files_[level]) {
2474 GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
2475 const uint64_t sum = TotalFileSize(overlaps);
2476 if (sum > result) {
2477 result = sum;
2478 }
2479 }
2480 }
2481 return result;
2482}
2483
// Returns the target size for `level`, as computed by CalculateBaseBytes().
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  assert(level >= 0);
  assert(level < static_cast<int>(level_max_bytes_.size()));
  return level_max_bytes_[level];
}
2491
// Computes level_max_bytes_ (per-level target sizes), base_level_, and the
// L0 delay-trigger count. Handles both the static scheme and
// level_compaction_dynamic_level_bytes, where targets are derived backward
// from the size of the largest existing level.
void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
                                            const MutableCFOptions& options) {
  // Special logic to set number of sorted runs.
  // It is to match the previous behavior when all files are in L0.
  int num_l0_count = static_cast<int>(files_[0].size());
  if (compaction_style_ == kCompactionStyleUniversal) {
    // For universal compaction, we use level0 score to indicate
    // compaction score for the whole DB. Adding other levels as if
    // they are L0 files.
    for (int i = 1; i < num_levels(); i++) {
      if (!files_[i].empty()) {
        num_l0_count++;
      }
    }
  }
  set_l0_delay_trigger_count(num_l0_count);

  level_max_bytes_.resize(ioptions.num_levels);
  if (!ioptions.level_compaction_dynamic_level_bytes) {
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;

    // Calculate for static bytes base case: each level is the previous one
    // multiplied by the (possibly per-level-adjusted) multiplier.
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
            options.MaxBytesMultiplerAdditional(i - 1));
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  } else {
    uint64_t max_level_size = 0;

    int first_non_empty_level = -1;
    // Find size of non-L0 level of most data.
    // Cannot use the size of the last level because it can be empty or less
    // than previous levels after compaction.
    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

    // Prefill every level's max bytes to disallow compaction from there.
    for (int i = 0; i < num_levels_; i++) {
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
    }

    if (max_level_size == 0) {
      // No data for L1 and up. L0 compacts to last level directly.
      // No compaction from L1+ needs to be scheduled.
      base_level_ = num_levels_ - 1;
    } else {
      uint64_t base_bytes_max = options.max_bytes_for_level_base;
      uint64_t base_bytes_min = static_cast<uint64_t>(
          base_bytes_max / options.max_bytes_for_level_multiplier);

      // Try whether we can make last level's target size to be max_level_size
      uint64_t cur_level_size = max_level_size;
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Round up after dividing
        cur_level_size = static_cast<uint64_t>(
            cur_level_size / options.max_bytes_for_level_multiplier);
      }

      // Calculate base level and its size.
      uint64_t base_level_size;
      if (cur_level_size <= base_bytes_min) {
        // Case 1. If we make target size of last level to be max_level_size,
        // target size of the first non-empty level would be smaller than
        // base_bytes_min. We set it be base_bytes_min.
        base_level_size = base_bytes_min + 1U;
        base_level_ = first_non_empty_level;
        ROCKS_LOG_WARN(ioptions.info_log,
                       "More existing levels in DB than needed. "
                       "max_bytes_for_level_multiplier may not be guaranteed.");
      } else {
        // Find base level (where L0 data is compacted to): keep dividing by
        // the multiplier until the base target fits under base_bytes_max.
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
          cur_level_size = static_cast<uint64_t>(
              cur_level_size / options.max_bytes_for_level_multiplier);
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
          base_level_size = base_bytes_max;
        } else {
          base_level_size = cur_level_size;
        }
      }

      uint64_t level_size = base_level_size;
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
          level_size = MultiplyCheckOverflow(
              level_size, options.max_bytes_for_level_multiplier);
        }
        // Don't set any level below base_bytes_max. Otherwise, the LSM can
        // assume an hourglass shape where L1+ sizes are smaller than L0. This
        // causes compaction scoring, which depends on level sizes, to favor L1+
        // at the expense of L0, which may fill up and stall.
        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
      }
    }
  }
}
2611
2612uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
2613 // Estimate the live data size by adding up the size of the last level for all
2614 // key ranges. Note: Estimate depends on the ordering of files in level 0
2615 // because files in level 0 can be overlapping.
2616 uint64_t size = 0;
2617
2618 auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
2619 return internal_comparator_->Compare(*x, *y) < 0;
2620 };
2621 // (Ordered) map of largest keys in non-overlapping files
2622 std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
2623
2624 for (int l = num_levels_ - 1; l >= 0; l--) {
2625 bool found_end = false;
2626 for (auto file : files_[l]) {
2627 // Find the first file where the largest key is larger than the smallest
2628 // key of the current file. If this file does not overlap with the
2629 // current file, none of the files in the map does. If there is
2630 // no potential overlap, we can safely insert the rest of this level
2631 // (if the level is not 0) into the map without checking again because
2632 // the elements in the level are sorted and non-overlapping.
2633 auto lb = (found_end && l != 0) ?
2634 ranges.end() : ranges.lower_bound(&file->smallest);
2635 found_end = (lb == ranges.end());
2636 if (found_end || internal_comparator_->Compare(
2637 file->largest, (*lb).second->smallest) < 0) {
2638 ranges.emplace_hint(lb, &file->largest, file);
2639 size += file->fd.file_size;
2640 }
2641 }
2642 }
2643 return size;
2644}
2645
11fdf7f2
TL
2646bool VersionStorageInfo::RangeMightExistAfterSortedRun(
2647 const Slice& smallest_key, const Slice& largest_key, int last_level,
2648 int last_l0_idx) {
2649 assert((last_l0_idx != -1) == (last_level == 0));
2650 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
2651 // bottommost only if it's the oldest L0 file and there are no files on older
2652 // levels. It'd be better to consider it bottommost if there's no overlap in
2653 // older levels/files.
2654 if (last_level == 0 &&
2655 last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
2656 return true;
2657 }
2658
2659 // Checks whether there are files living beyond the `last_level`. If lower
2660 // levels have files, it checks for overlap between [`smallest_key`,
2661 // `largest_key`] and those files. Bottomlevel optimizations can be made if
2662 // there are no files in lower levels or if there is no overlap with the files
2663 // in the lower levels.
2664 for (int level = last_level + 1; level < num_levels(); level++) {
2665 // The range is not in the bottommost level if there are files in lower
2666 // levels when the `last_level` is 0 or if there are files in lower levels
2667 // which overlap with [`smallest_key`, `largest_key`].
2668 if (files_[level].size() > 0 &&
2669 (last_level == 0 ||
2670 OverlapInLevel(level, &smallest_key, &largest_key))) {
2671 return true;
2672 }
2673 }
2674 return false;
2675}
7c673cae
FG
2676
2677void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
2678 for (int level = 0; level < storage_info_.num_levels(); level++) {
2679 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2680 for (const auto& file : files) {
2681 live->push_back(file->fd);
2682 }
2683 }
2684}
2685
11fdf7f2 2686std::string Version::DebugString(bool hex, bool print_stats) const {
7c673cae
FG
2687 std::string r;
2688 for (int level = 0; level < storage_info_.num_levels_; level++) {
2689 // E.g.,
2690 // --- level 1 ---
2691 // 17:123['a' .. 'd']
2692 // 20:43['e' .. 'g']
11fdf7f2
TL
2693 //
2694 // if print_stats=true:
2695 // 17:123['a' .. 'd'](4096)
7c673cae
FG
2696 r.append("--- level ");
2697 AppendNumberTo(&r, level);
2698 r.append(" --- version# ");
2699 AppendNumberTo(&r, version_number_);
2700 r.append(" ---\n");
2701 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2702 for (size_t i = 0; i < files.size(); i++) {
2703 r.push_back(' ');
2704 AppendNumberTo(&r, files[i]->fd.GetNumber());
2705 r.push_back(':');
2706 AppendNumberTo(&r, files[i]->fd.GetFileSize());
2707 r.append("[");
2708 r.append(files[i]->smallest.DebugString(hex));
2709 r.append(" .. ");
2710 r.append(files[i]->largest.DebugString(hex));
11fdf7f2
TL
2711 r.append("]");
2712 if (print_stats) {
2713 r.append("(");
2714 r.append(ToString(
2715 files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
2716 r.append(")");
2717 }
2718 r.append("\n");
7c673cae
FG
2719 }
2720 }
2721 return r;
2722}
2723
2724// this is used to batch writes to the manifest file
2725struct VersionSet::ManifestWriter {
2726 Status status;
2727 bool done;
2728 InstrumentedCondVar cv;
2729 ColumnFamilyData* cfd;
11fdf7f2 2730 const MutableCFOptions mutable_cf_options;
7c673cae
FG
2731 const autovector<VersionEdit*>& edit_list;
2732
2733 explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
11fdf7f2 2734 const MutableCFOptions& cf_options,
7c673cae 2735 const autovector<VersionEdit*>& e)
11fdf7f2
TL
2736 : done(false),
2737 cv(mu),
2738 cfd(_cfd),
2739 mutable_cf_options(cf_options),
2740 edit_list(e) {}
7c673cae
FG
2741};
2742
2743VersionSet::VersionSet(const std::string& dbname,
11fdf7f2 2744 const ImmutableDBOptions* _db_options,
7c673cae
FG
2745 const EnvOptions& storage_options, Cache* table_cache,
2746 WriteBufferManager* write_buffer_manager,
2747 WriteController* write_controller)
2748 : column_family_set_(
11fdf7f2 2749 new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
7c673cae 2750 write_buffer_manager, write_controller)),
11fdf7f2 2751 env_(_db_options->env),
7c673cae 2752 dbname_(dbname),
11fdf7f2 2753 db_options_(_db_options),
7c673cae
FG
2754 next_file_number_(2),
2755 manifest_file_number_(0), // Filled by Recover()
11fdf7f2 2756 options_file_number_(0),
7c673cae
FG
2757 pending_manifest_file_number_(0),
2758 last_sequence_(0),
11fdf7f2
TL
2759 last_allocated_sequence_(0),
2760 last_published_sequence_(0),
7c673cae
FG
2761 prev_log_number_(0),
2762 current_version_number_(0),
2763 manifest_file_size_(0),
11fdf7f2 2764 env_options_(storage_options) {}
7c673cae
FG
2765
2766void CloseTables(void* ptr, size_t) {
2767 TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
2768 table_reader->Close();
2769}
2770
2771VersionSet::~VersionSet() {
2772 // we need to delete column_family_set_ because its destructor depends on
2773 // VersionSet
11fdf7f2
TL
2774 Cache* table_cache = column_family_set_->get_table_cache();
2775 table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
7c673cae 2776 column_family_set_.reset();
11fdf7f2
TL
2777 for (auto& file : obsolete_files_) {
2778 if (file.metadata->table_reader_handle) {
2779 table_cache->Release(file.metadata->table_reader_handle);
2780 TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
2781 }
2782 file.DeleteMetadata();
7c673cae
FG
2783 }
2784 obsolete_files_.clear();
2785}
2786
2787void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
2788 Version* v) {
2789 // compute new compaction score
2790 v->storage_info()->ComputeCompactionScore(
2791 *column_family_data->ioptions(),
2792 *column_family_data->GetLatestMutableCFOptions());
2793
2794 // Mark v finalized
2795 v->storage_info_.SetFinalized();
2796
2797 // Make "v" current
2798 assert(v->refs_ == 0);
2799 Version* current = column_family_data->current();
2800 assert(v != current);
2801 if (current != nullptr) {
2802 assert(current->refs_ > 0);
2803 current->Unref();
2804 }
2805 column_family_data->SetCurrent(v);
2806 v->Ref();
2807
2808 // Append to linked list
2809 v->prev_ = column_family_data->dummy_versions()->prev_;
2810 v->next_ = column_family_data->dummy_versions();
2811 v->prev_->next_ = v;
2812 v->next_->prev_ = v;
2813}
2814
11fdf7f2
TL
2815Status VersionSet::ProcessManifestWrites(
2816 std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
2817 Directory* db_directory, bool new_descriptor_log,
2818 const ColumnFamilyOptions* new_cf_options) {
2819 assert(!writers.empty());
2820 ManifestWriter& first_writer = writers.front();
2821 ManifestWriter* last_writer = &first_writer;
7c673cae 2822
11fdf7f2
TL
2823 assert(!manifest_writers_.empty());
2824 assert(manifest_writers_.front() == &first_writer);
7c673cae
FG
2825
2826 autovector<VersionEdit*> batch_edits;
11fdf7f2
TL
2827 autovector<Version*> versions;
2828 autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
2829 std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
2830
2831 if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
2832 // No group commits for column family add or drop
2833 LogAndApplyCFHelper(first_writer.edit_list.front());
2834 batch_edits.push_back(first_writer.edit_list.front());
7c673cae 2835 } else {
11fdf7f2
TL
2836 auto it = manifest_writers_.cbegin();
2837 while (it != manifest_writers_.cend()) {
2838 if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
7c673cae 2839 // no group commits for column family add or drop
7c673cae
FG
2840 break;
2841 }
11fdf7f2
TL
2842 last_writer = *(it++);
2843 assert(last_writer != nullptr);
2844 assert(last_writer->cfd != nullptr);
2845 if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) {
2846 continue;
2847 }
2848 // We do a linear search on versions because versions is small.
2849 // TODO(yanqin) maybe consider unordered_map
2850 Version* version = nullptr;
2851 VersionBuilder* builder = nullptr;
2852 for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
2853 uint32_t cf_id = last_writer->cfd->GetID();
2854 if (versions[i]->cfd()->GetID() == cf_id) {
2855 version = versions[i];
2856 assert(!builder_guards.empty() &&
2857 builder_guards.size() == versions.size());
2858 builder = builder_guards[i]->version_builder();
2859 TEST_SYNC_POINT_CALLBACK(
2860 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
2861 break;
2862 }
2863 }
2864 if (version == nullptr) {
2865 version = new Version(last_writer->cfd, this, env_options_,
2866 last_writer->mutable_cf_options,
2867 current_version_number_++);
2868 versions.push_back(version);
2869 mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
2870 builder_guards.emplace_back(
2871 new BaseReferencedVersionBuilder(last_writer->cfd));
2872 builder = builder_guards.back()->version_builder();
2873 }
2874 assert(builder != nullptr); // make checker happy
2875 for (const auto& e : last_writer->edit_list) {
2876 LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
2877 batch_edits.push_back(e);
7c673cae
FG
2878 }
2879 }
11fdf7f2
TL
2880 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
2881 assert(!builder_guards.empty() &&
2882 builder_guards.size() == versions.size());
2883 auto* builder = builder_guards[i]->version_builder();
2884 builder->SaveTo(versions[i]->storage_info());
2885 }
7c673cae
FG
2886 }
2887
7c673cae
FG
2888 uint64_t new_manifest_file_size = 0;
2889 Status s;
2890
2891 assert(pending_manifest_file_number_ == 0);
2892 if (!descriptor_log_ ||
2893 manifest_file_size_ > db_options_->max_manifest_file_size) {
2894 pending_manifest_file_number_ = NewFileNumber();
2895 batch_edits.back()->SetNextFile(next_file_number_.load());
2896 new_descriptor_log = true;
2897 } else {
2898 pending_manifest_file_number_ = manifest_file_number_;
2899 }
2900
2901 if (new_descriptor_log) {
11fdf7f2
TL
2902 // if we are writing out new snapshot make sure to persist max column
2903 // family.
7c673cae 2904 if (column_family_set_->GetMaxColumnFamily() > 0) {
11fdf7f2 2905 first_writer.edit_list.front()->SetMaxColumnFamily(
7c673cae
FG
2906 column_family_set_->GetMaxColumnFamily());
2907 }
2908 }
2909
7c673cae 2910 {
11fdf7f2 2911 EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
7c673cae
FG
2912 mu->Unlock();
2913
2914 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
11fdf7f2
TL
2915 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() &&
2916 column_family_set_->get_table_cache()->GetCapacity() ==
2917 TableCache::kInfiniteCapacity) {
2918 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
2919 assert(!builder_guards.empty() &&
2920 builder_guards.size() == versions.size());
2921 assert(!mutable_cf_options_ptrs.empty() &&
2922 builder_guards.size() == versions.size());
2923 ColumnFamilyData* cfd = versions[i]->cfd_;
2924 builder_guards[i]->version_builder()->LoadTableHandlers(
2925 cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
2926 true /* prefetch_index_and_filter_in_cache */,
2927 mutable_cf_options_ptrs[i]->prefix_extractor.get());
2928 }
7c673cae
FG
2929 }
2930
2931 // This is fine because everything inside of this block is serialized --
2932 // only one thread can be here at the same time
2933 if (new_descriptor_log) {
11fdf7f2 2934 // create new manifest file
7c673cae
FG
2935 ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
2936 pending_manifest_file_number_);
11fdf7f2
TL
2937 std::string descriptor_fname =
2938 DescriptorFileName(dbname_, pending_manifest_file_number_);
7c673cae 2939 unique_ptr<WritableFile> descriptor_file;
11fdf7f2
TL
2940 s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
2941 opt_env_opts);
7c673cae
FG
2942 if (s.ok()) {
2943 descriptor_file->SetPreallocationBlockSize(
2944 db_options_->manifest_preallocation_size);
2945
11fdf7f2
TL
2946 unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
2947 std::move(descriptor_file), descriptor_fname, opt_env_opts));
7c673cae
FG
2948 descriptor_log_.reset(
2949 new log::Writer(std::move(file_writer), 0, false));
2950 s = WriteSnapshot(descriptor_log_.get());
2951 }
2952 }
2953
11fdf7f2
TL
2954 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
2955 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
2956 versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
2957 }
7c673cae
FG
2958 }
2959
11fdf7f2 2960 // Write new records to MANIFEST log
7c673cae
FG
2961 if (s.ok()) {
2962 for (auto& e : batch_edits) {
2963 std::string record;
2964 if (!e->EncodeTo(&record)) {
11fdf7f2
TL
2965 s = Status::Corruption("Unable to encode VersionEdit:" +
2966 e->DebugString(true));
7c673cae
FG
2967 break;
2968 }
2969 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
2970 rocksdb_kill_odds * REDUCE_ODDS2);
2971 s = descriptor_log_->AddRecord(record);
2972 if (!s.ok()) {
2973 break;
2974 }
2975 }
2976 if (s.ok()) {
2977 s = SyncManifest(env_, db_options_, descriptor_log_->file());
2978 }
2979 if (!s.ok()) {
11fdf7f2 2980 ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
7c673cae
FG
2981 s.ToString().c_str());
2982 }
2983 }
2984
2985 // If we just created a new descriptor file, install it by writing a
2986 // new CURRENT file that points to it.
2987 if (s.ok() && new_descriptor_log) {
2988 s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
2989 db_directory);
2990 }
2991
2992 if (s.ok()) {
2993 // find offset in manifest file where this version is stored.
2994 new_manifest_file_size = descriptor_log_->file()->GetFileSize();
2995 }
2996
11fdf7f2 2997 if (first_writer.edit_list.front()->is_column_family_drop_) {
7c673cae
FG
2998 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
2999 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3000 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3001 }
3002
3003 LogFlush(db_options_->info_log);
3004 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3005 mu->Lock();
3006 }
3007
11fdf7f2 3008 // Append the old manifest file to the obsolete_manifest_ list to be deleted
7c673cae
FG
3009 // by PurgeObsoleteFiles later.
3010 if (s.ok() && new_descriptor_log) {
3011 obsolete_manifests_.emplace_back(
3012 DescriptorFileName("", manifest_file_number_));
3013 }
3014
11fdf7f2 3015 // Install the new versions
7c673cae 3016 if (s.ok()) {
11fdf7f2 3017 if (first_writer.edit_list.front()->is_column_family_add_) {
7c673cae
FG
3018 assert(batch_edits.size() == 1);
3019 assert(new_cf_options != nullptr);
11fdf7f2
TL
3020 CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
3021 } else if (first_writer.edit_list.front()->is_column_family_drop_) {
7c673cae 3022 assert(batch_edits.size() == 1);
11fdf7f2
TL
3023 first_writer.cfd->SetDropped();
3024 if (first_writer.cfd->Unref()) {
3025 delete first_writer.cfd;
7c673cae
FG
3026 }
3027 } else {
11fdf7f2
TL
3028 // Each version in versions corresponds to a column family.
3029 // For each column family, update its log number indicating that logs
3030 // with number smaller than this should be ignored.
3031 for (const auto version : versions) {
3032 uint64_t max_log_number_in_batch = 0;
3033 uint32_t cf_id = version->cfd_->GetID();
3034 for (const auto& e : batch_edits) {
3035 if (e->has_log_number_ && e->column_family_ == cf_id) {
3036 max_log_number_in_batch =
3037 std::max(max_log_number_in_batch, e->log_number_);
3038 }
3039 }
3040 if (max_log_number_in_batch != 0) {
3041 assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
3042 version->cfd_->SetLogNumber(max_log_number_in_batch);
3043 }
3044 }
3045
3046 uint64_t last_min_log_number_to_keep = 0;
7c673cae 3047 for (auto& e : batch_edits) {
11fdf7f2
TL
3048 if (e->has_min_log_number_to_keep_) {
3049 last_min_log_number_to_keep =
3050 std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
7c673cae
FG
3051 }
3052 }
11fdf7f2
TL
3053
3054 if (last_min_log_number_to_keep != 0) {
3055 // Should only be set in 2PC mode.
3056 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
7c673cae 3057 }
7c673cae 3058
11fdf7f2
TL
3059 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3060 ColumnFamilyData* cfd = versions[i]->cfd_;
3061 AppendVersion(cfd, versions[i]);
3062 }
3063 }
7c673cae
FG
3064 manifest_file_number_ = pending_manifest_file_number_;
3065 manifest_file_size_ = new_manifest_file_size;
11fdf7f2 3066 prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
7c673cae
FG
3067 } else {
3068 std::string version_edits;
3069 for (auto& e : batch_edits) {
11fdf7f2
TL
3070 version_edits += ("\n" + e->DebugString(true));
3071 }
3072 ROCKS_LOG_ERROR(db_options_->info_log,
3073 "Error in committing version edit to MANIFEST: %s",
3074 version_edits.c_str());
3075 for (auto v : versions) {
3076 delete v;
7c673cae 3077 }
7c673cae 3078 if (new_descriptor_log) {
11fdf7f2
TL
3079 ROCKS_LOG_INFO(db_options_->info_log,
3080 "Deleting manifest %" PRIu64 " current manifest %" PRIu64
3081 "\n",
7c673cae
FG
3082 manifest_file_number_, pending_manifest_file_number_);
3083 descriptor_log_.reset();
3084 env_->DeleteFile(
3085 DescriptorFileName(dbname_, pending_manifest_file_number_));
3086 }
3087 }
11fdf7f2 3088
7c673cae
FG
3089 pending_manifest_file_number_ = 0;
3090
3091 // wake up all the waiting writers
3092 while (true) {
3093 ManifestWriter* ready = manifest_writers_.front();
3094 manifest_writers_.pop_front();
11fdf7f2
TL
3095 bool need_signal = true;
3096 for (const auto& w : writers) {
3097 if (&w == ready) {
3098 need_signal = false;
3099 break;
3100 }
3101 }
3102 ready->status = s;
3103 ready->done = true;
3104 if (need_signal) {
7c673cae
FG
3105 ready->cv.Signal();
3106 }
11fdf7f2
TL
3107 if (ready == last_writer) {
3108 break;
3109 }
7c673cae 3110 }
7c673cae
FG
3111 if (!manifest_writers_.empty()) {
3112 manifest_writers_.front()->cv.Signal();
3113 }
3114 return s;
3115}
3116
11fdf7f2
TL
3117// 'datas' is gramatically incorrect. We still use this notation is to indicate
3118// that this variable represents a collection of column_family_data.
3119Status VersionSet::LogAndApply(
3120 const std::vector<ColumnFamilyData*>& column_family_datas,
3121 const std::vector<MutableCFOptions>& mutable_cf_options_list,
3122 const std::vector<autovector<VersionEdit*>>& edit_lists,
3123 InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
3124 const ColumnFamilyOptions* new_cf_options) {
3125 mu->AssertHeld();
3126 int num_edits = 0;
3127 for (const auto& elist : edit_lists) {
3128 num_edits += static_cast<int>(elist.size());
3129 }
3130 if (num_edits == 0) {
3131 return Status::OK();
3132 } else if (num_edits > 1) {
3133#ifndef NDEBUG
3134 for (const auto& edit_list : edit_lists) {
3135 for (const auto& edit : edit_list) {
3136 assert(!edit->IsColumnFamilyManipulation());
3137 }
3138 }
3139#endif /* ! NDEBUG */
3140 }
3141
3142 int num_cfds = static_cast<int>(column_family_datas.size());
3143 if (num_cfds == 1 && column_family_datas[0] == nullptr) {
3144 assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
3145 assert(edit_lists[0][0]->is_column_family_add_);
3146 assert(new_cf_options != nullptr);
3147 }
3148 std::deque<ManifestWriter> writers;
3149 if (num_cfds > 0) {
3150 assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
3151 assert(static_cast<size_t>(num_cfds) == edit_lists.size());
3152 }
3153 for (int i = 0; i < num_cfds; ++i) {
3154 writers.emplace_back(mu, column_family_datas[i], mutable_cf_options_list[i],
3155 edit_lists[i]);
3156 manifest_writers_.push_back(&writers[i]);
3157 }
3158 assert(!writers.empty());
3159 ManifestWriter& first_writer = writers.front();
3160 while (!first_writer.done && &first_writer != manifest_writers_.front()) {
3161 first_writer.cv.Wait();
3162 }
3163 if (first_writer.done) {
3164 // All non-CF-manipulation operations can be grouped together and committed
3165 // to MANIFEST. They should all have finished. The status code is stored in
3166 // the first manifest writer.
3167#ifndef NDEBUG
3168 for (const auto& writer : writers) {
3169 assert(writer.done);
3170 }
3171#endif /* !NDEBUG */
3172 return first_writer.status;
3173 }
3174
3175 int num_undropped_cfds = 0;
3176 for (auto cfd : column_family_datas) {
3177 // if cfd == nullptr, it is a column family add.
3178 if (cfd == nullptr || !cfd->IsDropped()) {
3179 ++num_undropped_cfds;
3180 }
3181 }
3182 if (0 == num_undropped_cfds) {
3183 // TODO (yanqin) maybe use a different status code to denote column family
3184 // drop other than OK and ShutdownInProgress
3185 for (int i = 0; i != num_cfds; ++i) {
3186 manifest_writers_.pop_front();
3187 }
3188 // Notify new head of manifest write queue.
3189 if (!manifest_writers_.empty()) {
3190 manifest_writers_.front()->cv.Signal();
3191 }
3192 return Status::OK();
3193 }
3194
3195 return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
3196 new_cf_options);
3197}
3198
7c673cae
FG
3199void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
3200 assert(edit->IsColumnFamilyManipulation());
3201 edit->SetNextFile(next_file_number_.load());
11fdf7f2
TL
3202 // The log might have data that is not visible to memtbale and hence have not
3203 // updated the last_sequence_ yet. It is also possible that the log has is
3204 // expecting some new data that is not written yet. Since LastSequence is an
3205 // upper bound on the sequence, it is ok to record
3206 // last_allocated_sequence_ as the last sequence.
3207 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
3208 : last_sequence_);
7c673cae
FG
3209 if (edit->is_column_family_drop_) {
3210 // if we drop column family, we have to make sure to save max column family,
3211 // so that we don't reuse existing ID
3212 edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
3213 }
3214}
3215
3216void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
11fdf7f2 3217 VersionBuilder* builder, Version* /*v*/,
7c673cae 3218 VersionEdit* edit, InstrumentedMutex* mu) {
11fdf7f2
TL
3219#ifdef NDEBUG
3220 (void)cfd;
3221#endif
7c673cae
FG
3222 mu->AssertHeld();
3223 assert(!edit->IsColumnFamilyManipulation());
3224
3225 if (edit->has_log_number_) {
3226 assert(edit->log_number_ >= cfd->GetLogNumber());
3227 assert(edit->log_number_ < next_file_number_.load());
3228 }
3229
3230 if (!edit->has_prev_log_number_) {
3231 edit->SetPrevLogNumber(prev_log_number_);
3232 }
3233 edit->SetNextFile(next_file_number_.load());
11fdf7f2
TL
3234 // The log might have data that is not visible to memtbale and hence have not
3235 // updated the last_sequence_ yet. It is also possible that the log has is
3236 // expecting some new data that is not written yet. Since LastSequence is an
3237 // upper bound on the sequence, it is ok to record
3238 // last_allocated_sequence_ as the last sequence.
3239 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
3240 : last_sequence_);
7c673cae
FG
3241
3242 builder->Apply(edit);
3243}
3244
11fdf7f2
TL
3245Status VersionSet::ApplyOneVersionEdit(
3246 VersionEdit& edit,
3247 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
3248 std::unordered_map<int, std::string>& column_families_not_found,
3249 std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
3250 bool* have_log_number, uint64_t* /* log_number */,
3251 bool* have_prev_log_number, uint64_t* previous_log_number,
3252 bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
3253 SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
3254 uint32_t* max_column_family) {
3255 // Not found means that user didn't supply that column
3256 // family option AND we encountered column family add
3257 // record. Once we encounter column family drop record,
3258 // we will delete the column family from
3259 // column_families_not_found.
3260 bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
3261 column_families_not_found.end());
3262 // in builders means that user supplied that column family
3263 // option AND that we encountered column family add record
3264 bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
3265
3266 // they can't both be true
3267 assert(!(cf_in_not_found && cf_in_builders));
3268
3269 ColumnFamilyData* cfd = nullptr;
3270
3271 if (edit.is_column_family_add_) {
3272 if (cf_in_builders || cf_in_not_found) {
3273 return Status::Corruption(
3274 "Manifest adding the same column family twice: " +
3275 edit.column_family_name_);
3276 }
3277 auto cf_options = name_to_options.find(edit.column_family_name_);
3278 if (cf_options == name_to_options.end()) {
3279 column_families_not_found.insert(
3280 {edit.column_family_, edit.column_family_name_});
3281 } else {
3282 cfd = CreateColumnFamily(cf_options->second, &edit);
3283 cfd->set_initialized();
3284 builders.insert(
3285 {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
3286 }
3287 } else if (edit.is_column_family_drop_) {
3288 if (cf_in_builders) {
3289 auto builder = builders.find(edit.column_family_);
3290 assert(builder != builders.end());
3291 delete builder->second;
3292 builders.erase(builder);
3293 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3294 assert(cfd != nullptr);
3295 if (cfd->Unref()) {
3296 delete cfd;
3297 cfd = nullptr;
3298 } else {
3299 // who else can have reference to cfd!?
3300 assert(false);
3301 }
3302 } else if (cf_in_not_found) {
3303 column_families_not_found.erase(edit.column_family_);
3304 } else {
3305 return Status::Corruption(
3306 "Manifest - dropping non-existing column family");
3307 }
3308 } else if (!cf_in_not_found) {
3309 if (!cf_in_builders) {
3310 return Status::Corruption(
3311 "Manifest record referencing unknown column family");
3312 }
3313
3314 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3315 // this should never happen since cf_in_builders is true
3316 assert(cfd != nullptr);
3317
3318 // if it is not column family add or column family drop,
3319 // then it's a file add/delete, which should be forwarded
3320 // to builder
3321 auto builder = builders.find(edit.column_family_);
3322 assert(builder != builders.end());
3323 builder->second->version_builder()->Apply(&edit);
3324 }
3325
3326 if (cfd != nullptr) {
3327 if (edit.has_log_number_) {
3328 if (cfd->GetLogNumber() > edit.log_number_) {
3329 ROCKS_LOG_WARN(
3330 db_options_->info_log,
3331 "MANIFEST corruption detected, but ignored - Log numbers in "
3332 "records NOT monotonically increasing");
3333 } else {
3334 cfd->SetLogNumber(edit.log_number_);
3335 *have_log_number = true;
3336 }
3337 }
3338 if (edit.has_comparator_ &&
3339 edit.comparator_ != cfd->user_comparator()->Name()) {
3340 return Status::InvalidArgument(
3341 cfd->user_comparator()->Name(),
3342 "does not match existing comparator " + edit.comparator_);
3343 }
3344 }
3345
3346 if (edit.has_prev_log_number_) {
3347 *previous_log_number = edit.prev_log_number_;
3348 *have_prev_log_number = true;
3349 }
3350
3351 if (edit.has_next_file_number_) {
3352 *next_file = edit.next_file_number_;
3353 *have_next_file = true;
3354 }
3355
3356 if (edit.has_max_column_family_) {
3357 *max_column_family = edit.max_column_family_;
3358 }
3359
3360 if (edit.has_min_log_number_to_keep_) {
3361 *min_log_number_to_keep =
3362 std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
3363 }
3364
3365 if (edit.has_last_sequence_) {
3366 *last_sequence = edit.last_sequence_;
3367 *have_last_sequence = true;
3368 }
3369 return Status::OK();
3370}
3371
7c673cae
FG
3372Status VersionSet::Recover(
3373 const std::vector<ColumnFamilyDescriptor>& column_families,
3374 bool read_only) {
3375 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
3376 for (auto cf : column_families) {
3377 cf_name_to_options.insert({cf.name, cf.options});
3378 }
3379 // keeps track of column families in manifest that were not found in
3380 // column families parameters. if those column families are not dropped
3381 // by subsequent manifest records, Recover() will return failure status
3382 std::unordered_map<int, std::string> column_families_not_found;
3383
3384 // Read "CURRENT" file, which contains a pointer to the current manifest file
3385 std::string manifest_filename;
3386 Status s = ReadFileToString(
3387 env_, CurrentFileName(dbname_), &manifest_filename
3388 );
3389 if (!s.ok()) {
3390 return s;
3391 }
3392 if (manifest_filename.empty() ||
3393 manifest_filename.back() != '\n') {
3394 return Status::Corruption("CURRENT file does not end with newline");
3395 }
3396 // remove the trailing '\n'
3397 manifest_filename.resize(manifest_filename.size() - 1);
3398 FileType type;
3399 bool parse_ok =
3400 ParseFileName(manifest_filename, &manifest_file_number_, &type);
3401 if (!parse_ok || type != kDescriptorFile) {
3402 return Status::Corruption("CURRENT file corrupted");
3403 }
3404
3405 ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
3406 manifest_filename.c_str());
3407
3408 manifest_filename = dbname_ + "/" + manifest_filename;
3409 unique_ptr<SequentialFileReader> manifest_file_reader;
3410 {
3411 unique_ptr<SequentialFile> manifest_file;
3412 s = env_->NewSequentialFile(manifest_filename, &manifest_file,
11fdf7f2 3413 env_->OptimizeForManifestRead(env_options_));
7c673cae
FG
3414 if (!s.ok()) {
3415 return s;
3416 }
3417 manifest_file_reader.reset(
11fdf7f2 3418 new SequentialFileReader(std::move(manifest_file), manifest_filename));
7c673cae
FG
3419 }
3420 uint64_t current_manifest_file_size;
3421 s = env_->GetFileSize(manifest_filename, &current_manifest_file_size);
3422 if (!s.ok()) {
3423 return s;
3424 }
3425
3426 bool have_log_number = false;
3427 bool have_prev_log_number = false;
3428 bool have_next_file = false;
3429 bool have_last_sequence = false;
3430 uint64_t next_file = 0;
3431 uint64_t last_sequence = 0;
3432 uint64_t log_number = 0;
3433 uint64_t previous_log_number = 0;
3434 uint32_t max_column_family = 0;
11fdf7f2 3435 uint64_t min_log_number_to_keep = 0;
7c673cae
FG
3436 std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
3437
3438 // add default column family
3439 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
3440 if (default_cf_iter == cf_name_to_options.end()) {
3441 return Status::InvalidArgument("Default column family not specified");
3442 }
3443 VersionEdit default_cf_edit;
3444 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
3445 default_cf_edit.SetColumnFamily(0);
3446 ColumnFamilyData* default_cfd =
3447 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
11fdf7f2
TL
3448 // In recovery, nobody else can access it, so it's fine to set it to be
3449 // initialized earlier.
3450 default_cfd->set_initialized();
7c673cae
FG
3451 builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
3452
3453 {
3454 VersionSet::LogReporter reporter;
3455 reporter.status = &s;
11fdf7f2
TL
3456 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
3457 true /* checksum */, 0 /* log_number */);
7c673cae
FG
3458 Slice record;
3459 std::string scratch;
11fdf7f2
TL
3460 std::vector<VersionEdit> replay_buffer;
3461 size_t num_entries_decoded = 0;
7c673cae
FG
3462 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3463 VersionEdit edit;
3464 s = edit.DecodeFrom(record);
3465 if (!s.ok()) {
3466 break;
3467 }
3468
11fdf7f2
TL
3469 if (edit.is_in_atomic_group_) {
3470 if (replay_buffer.empty()) {
3471 replay_buffer.resize(edit.remaining_entries_ + 1);
7c673cae 3472 }
11fdf7f2
TL
3473 ++num_entries_decoded;
3474 if (num_entries_decoded + edit.remaining_entries_ !=
3475 static_cast<uint32_t>(replay_buffer.size())) {
3476 return Status::Corruption("corrupted atomic group");
7c673cae 3477 }
11fdf7f2
TL
3478 replay_buffer[num_entries_decoded - 1] = std::move(edit);
3479 if (num_entries_decoded == replay_buffer.size()) {
3480 for (auto& e : replay_buffer) {
3481 s = ApplyOneVersionEdit(
3482 e, cf_name_to_options, column_families_not_found, builders,
3483 &have_log_number, &log_number, &have_prev_log_number,
3484 &previous_log_number, &have_next_file, &next_file,
3485 &have_last_sequence, &last_sequence, &min_log_number_to_keep,
3486 &max_column_family);
3487 if (!s.ok()) {
3488 break;
3489 }
7c673cae 3490 }
11fdf7f2
TL
3491 replay_buffer.clear();
3492 num_entries_decoded = 0;
7c673cae 3493 }
11fdf7f2
TL
3494 } else {
3495 if (!replay_buffer.empty()) {
3496 return Status::Corruption("corrupted atomic group");
7c673cae 3497 }
11fdf7f2
TL
3498 s = ApplyOneVersionEdit(
3499 edit, cf_name_to_options, column_families_not_found, builders,
3500 &have_log_number, &log_number, &have_prev_log_number,
3501 &previous_log_number, &have_next_file, &next_file,
3502 &have_last_sequence, &last_sequence, &min_log_number_to_keep,
3503 &max_column_family);
7c673cae 3504 }
11fdf7f2
TL
3505 if (!s.ok()) {
3506 break;
7c673cae
FG
3507 }
3508 }
3509 }
3510
3511 if (s.ok()) {
3512 if (!have_next_file) {
3513 s = Status::Corruption("no meta-nextfile entry in descriptor");
3514 } else if (!have_log_number) {
3515 s = Status::Corruption("no meta-lognumber entry in descriptor");
3516 } else if (!have_last_sequence) {
3517 s = Status::Corruption("no last-sequence-number entry in descriptor");
3518 }
3519
3520 if (!have_prev_log_number) {
3521 previous_log_number = 0;
3522 }
3523
3524 column_family_set_->UpdateMaxColumnFamily(max_column_family);
3525
11fdf7f2
TL
3526 // When reading DB generated using old release, min_log_number_to_keep=0.
3527 // All log files will be scanned for potential prepare entries.
3528 MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
3529 MarkFileNumberUsed(previous_log_number);
3530 MarkFileNumberUsed(log_number);
7c673cae
FG
3531 }
3532
3533 // there were some column families in the MANIFEST that weren't specified
3534 // in the argument. This is OK in read_only mode
3535 if (read_only == false && !column_families_not_found.empty()) {
3536 std::string list_of_not_found;
3537 for (const auto& cf : column_families_not_found) {
3538 list_of_not_found += ", " + cf.second;
3539 }
3540 list_of_not_found = list_of_not_found.substr(2);
3541 s = Status::InvalidArgument(
3542 "You have to open all column families. Column families not opened: " +
3543 list_of_not_found);
3544 }
3545
11fdf7f2
TL
3546 if (s.ok()) {
3547 for (auto cfd : *column_family_set_) {
3548 assert(builders.count(cfd->GetID()) > 0);
3549 auto* builder = builders[cfd->GetID()]->version_builder();
3550 if (!builder->CheckConsistencyForNumLevels()) {
3551 s = Status::InvalidArgument(
3552 "db has more levels than options.num_levels");
3553 break;
3554 }
3555 }
3556 }
3557
7c673cae
FG
3558 if (s.ok()) {
3559 for (auto cfd : *column_family_set_) {
3560 if (cfd->IsDropped()) {
3561 continue;
3562 }
11fdf7f2
TL
3563 if (read_only) {
3564 cfd->table_cache()->SetTablesAreImmortal();
3565 }
3566 assert(cfd->initialized());
7c673cae
FG
3567 auto builders_iter = builders.find(cfd->GetID());
3568 assert(builders_iter != builders.end());
3569 auto* builder = builders_iter->second->version_builder();
3570
11fdf7f2
TL
3571 if (GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
3572 TableCache::kInfiniteCapacity) {
7c673cae
FG
3573 // unlimited table cache. Pre-load table handle now.
3574 // Need to do it out of the mutex.
3575 builder->LoadTableHandlers(
3576 cfd->internal_stats(), db_options_->max_file_opening_threads,
11fdf7f2
TL
3577 false /* prefetch_index_and_filter_in_cache */,
3578 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
7c673cae
FG
3579 }
3580
11fdf7f2
TL
3581 Version* v = new Version(cfd, this, env_options_,
3582 *cfd->GetLatestMutableCFOptions(),
3583 current_version_number_++);
7c673cae
FG
3584 builder->SaveTo(v->storage_info());
3585
3586 // Install recovered version
3587 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
3588 !(db_options_->skip_stats_update_on_db_open));
3589 AppendVersion(cfd, v);
3590 }
3591
3592 manifest_file_size_ = current_manifest_file_size;
3593 next_file_number_.store(next_file + 1);
11fdf7f2
TL
3594 last_allocated_sequence_ = last_sequence;
3595 last_published_sequence_ = last_sequence;
7c673cae
FG
3596 last_sequence_ = last_sequence;
3597 prev_log_number_ = previous_log_number;
3598
3599 ROCKS_LOG_INFO(
3600 db_options_->info_log,
3601 "Recovered from manifest file:%s succeeded,"
3602 "manifest_file_number is %lu, next_file_number is %lu, "
3603 "last_sequence is %lu, log_number is %lu,"
3604 "prev_log_number is %lu,"
11fdf7f2
TL
3605 "max_column_family is %u,"
3606 "min_log_number_to_keep is %lu\n",
7c673cae
FG
3607 manifest_filename.c_str(), (unsigned long)manifest_file_number_,
3608 (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
3609 (unsigned long)log_number, (unsigned long)prev_log_number_,
11fdf7f2 3610 column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
7c673cae
FG
3611
3612 for (auto cfd : *column_family_set_) {
3613 if (cfd->IsDropped()) {
3614 continue;
3615 }
3616 ROCKS_LOG_INFO(db_options_->info_log,
3617 "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
3618 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
3619 }
3620 }
3621
3622 for (auto& builder : builders) {
3623 delete builder.second;
3624 }
3625
3626 return s;
3627}
3628
// Lists the names of all column families recorded in the DB at `dbname`.
// Reads the CURRENT file to locate the live MANIFEST, then replays the
// manifest's VersionEdit records, tracking only column-family add/drop
// entries. On success `*column_families` holds the surviving names ordered
// by column family ID (std::map iteration order); on failure it is left
// cleared.
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
                                      const std::string& dbname, Env* env) {
  // these are just for performance reasons, not correctness,
  // so we're fine using the defaults
  EnvOptions soptions;
  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
  if (!s.ok()) {
    return s;
  }
  // A well-formed CURRENT file is the manifest name plus a trailing newline.
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // drop the trailing '\n'
  current.resize(current.size() - 1);

  std::string dscname = dbname + "/" + current;

  unique_ptr<SequentialFileReader> file_reader;
  {
    unique_ptr<SequentialFile> file;
    s = env->NewSequentialFile(dscname, &file, soptions);
    if (!s.ok()) {
      return s;
    }
    file_reader.reset(new SequentialFileReader(std::move(file), dscname));
  }

  std::map<uint32_t, std::string> column_family_names;
  // default column family is always implicitly there
  column_family_names.insert({0, kDefaultColumnFamilyName});
  VersionSet::LogReporter reporter;
  reporter.status = &s;
  log::Reader reader(nullptr, std::move(file_reader), &reporter,
                     true /* checksum */, 0 /* log_number */);
  Slice record;
  std::string scratch;
  // Replay every edit; only add/drop records affect the name set.
  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
    if (edit.is_column_family_add_) {
      // Adding an ID that is already present means the manifest is corrupt.
      if (column_family_names.find(edit.column_family_) !=
          column_family_names.end()) {
        s = Status::Corruption("Manifest adding the same column family twice");
        break;
      }
      column_family_names.insert(
          {edit.column_family_, edit.column_family_name_});
    } else if (edit.is_column_family_drop_) {
      // Dropping an ID we never saw added is likewise corruption.
      if (column_family_names.find(edit.column_family_) ==
          column_family_names.end()) {
        s = Status::Corruption(
            "Manifest - dropping non-existing column family");
        break;
      }
      column_family_names.erase(edit.column_family_);
    }
  }

  column_families->clear();
  if (s.ok()) {
    for (const auto& iter : column_family_names) {
      column_families->push_back(iter.second);
    }
  }

  return s;
}
3700
3701#ifndef ROCKSDB_LITE
// Rewrites the MANIFEST of the DB at `dbname` so that it uses only
// `new_levels` levels (new_levels must be > 1). Precondition checked below:
// among levels (new_levels - 1) .. (current_levels - 1), at most one level
// may contain files; that level's files become the new bottommost level.
// Only the default column family is opened, so this is limited to DBs with
// a single column family.
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                        const Options* options,
                                        const EnvOptions& env_options,
                                        int new_levels) {
  if (new_levels <= 1) {
    return Status::InvalidArgument(
        "Number of levels needs to be bigger than 1");
  }

  // Build a scratch VersionSet with its own cache/controllers and recover
  // the current state from the existing MANIFEST.
  ImmutableDBOptions db_options(*options);
  ColumnFamilyOptions cf_options(*options);
  std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                        options->table_cache_numshardbits));
  WriteController wc(options->delayed_write_rate);
  WriteBufferManager wb(options->db_write_buffer_size);
  VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc);
  Status status;

  std::vector<ColumnFamilyDescriptor> dummy;
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
                                          ColumnFamilyOptions(*options));
  dummy.push_back(dummy_descriptor);
  status = versions.Recover(dummy);
  if (!status.ok()) {
    return status;
  }

  Version* current_version =
      versions.GetColumnFamilySet()->GetDefault()->current();
  auto* vstorage = current_version->storage_info();
  int current_levels = vstorage->num_levels();

  // Already at or below the requested level count: nothing to do.
  if (current_levels <= new_levels) {
    return Status::OK();
  }

  // Make sure files exist on at most one level in the range
  // (new_levels-1) to (current_levels-1); otherwise the reduction is
  // ambiguous and we refuse.
  int first_nonempty_level = -1;
  int first_nonempty_level_filenum = 0;
  for (int i = new_levels - 1; i < current_levels; i++) {
    int file_num = vstorage->NumLevelFiles(i);
    if (file_num != 0) {
      if (first_nonempty_level < 0) {
        first_nonempty_level = i;
        first_nonempty_level_filenum = file_num;
      } else {
        char msg[255];
        snprintf(msg, sizeof(msg),
                 "Found at least two levels containing files: "
                 "[%d:%d],[%d:%d].\n",
                 first_nonempty_level, first_nonempty_level_filenum, i,
                 file_num);
        return Status::InvalidArgument(msg);
      }
    }
  }

  // we need to allocate an array with the old number of levels size to
  // avoid SIGSEGV in WriteSnapshot()
  // however, all levels bigger or equal to new_levels will be empty
  std::vector<FileMetaData*>* new_files_list =
      new std::vector<FileMetaData*>[current_levels];
  for (int i = 0; i < new_levels - 1; i++) {
    new_files_list[i] = vstorage->LevelFiles(i);
  }

  // Relocate the single non-empty high level to the new bottommost level.
  // (first_nonempty_level is either -1 or >= new_levels - 1 >= 1 here.)
  if (first_nonempty_level > 0) {
    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
  }

  // Directly swap the level arrays inside VersionStorageInfo (this function
  // has access to its internals), then persist via LogAndApply below.
  delete[] vstorage -> files_;
  vstorage->files_ = new_files_list;
  vstorage->num_levels_ = new_levels;

  MutableCFOptions mutable_cf_options(*options);
  VersionEdit ve;
  InstrumentedMutex dummy_mutex;
  InstrumentedMutexLock l(&dummy_mutex);
  // new_descriptor_log=true forces writing a fresh MANIFEST snapshot.
  return versions.LogAndApply(
      versions.GetColumnFamilySet()->GetDefault(),
      mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}
3785
3786Status VersionSet::DumpManifest(Options& options, std::string& dscname,
3787 bool verbose, bool hex, bool json) {
3788 // Open the specified manifest file.
3789 unique_ptr<SequentialFileReader> file_reader;
3790 Status s;
3791 {
3792 unique_ptr<SequentialFile> file;
11fdf7f2
TL
3793 s = options.env->NewSequentialFile(
3794 dscname, &file, env_->OptimizeForManifestRead(env_options_));
7c673cae
FG
3795 if (!s.ok()) {
3796 return s;
3797 }
11fdf7f2 3798 file_reader.reset(new SequentialFileReader(std::move(file), dscname));
7c673cae
FG
3799 }
3800
3801 bool have_prev_log_number = false;
3802 bool have_next_file = false;
3803 bool have_last_sequence = false;
3804 uint64_t next_file = 0;
3805 uint64_t last_sequence = 0;
3806 uint64_t previous_log_number = 0;
3807 int count = 0;
3808 std::unordered_map<uint32_t, std::string> comparators;
3809 std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
3810
3811 // add default column family
3812 VersionEdit default_cf_edit;
3813 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
3814 default_cf_edit.SetColumnFamily(0);
3815 ColumnFamilyData* default_cfd =
3816 CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
3817 builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
3818
3819 {
3820 VersionSet::LogReporter reporter;
3821 reporter.status = &s;
11fdf7f2
TL
3822 log::Reader reader(nullptr, std::move(file_reader), &reporter,
3823 true /* checksum */, 0 /* log_number */);
7c673cae
FG
3824 Slice record;
3825 std::string scratch;
3826 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3827 VersionEdit edit;
3828 s = edit.DecodeFrom(record);
3829 if (!s.ok()) {
3830 break;
3831 }
3832
3833 // Write out each individual edit
3834 if (verbose && !json) {
3835 printf("%s\n", edit.DebugString(hex).c_str());
3836 } else if (json) {
3837 printf("%s\n", edit.DebugJSON(count, hex).c_str());
3838 }
3839 count++;
3840
3841 bool cf_in_builders =
3842 builders.find(edit.column_family_) != builders.end();
3843
3844 if (edit.has_comparator_) {
3845 comparators.insert({edit.column_family_, edit.comparator_});
3846 }
3847
3848 ColumnFamilyData* cfd = nullptr;
3849
3850 if (edit.is_column_family_add_) {
3851 if (cf_in_builders) {
3852 s = Status::Corruption(
3853 "Manifest adding the same column family twice");
3854 break;
3855 }
3856 cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
11fdf7f2 3857 cfd->set_initialized();
7c673cae
FG
3858 builders.insert(
3859 {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
3860 } else if (edit.is_column_family_drop_) {
3861 if (!cf_in_builders) {
3862 s = Status::Corruption(
3863 "Manifest - dropping non-existing column family");
3864 break;
3865 }
3866 auto builder_iter = builders.find(edit.column_family_);
3867 delete builder_iter->second;
3868 builders.erase(builder_iter);
3869 comparators.erase(edit.column_family_);
3870 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3871 assert(cfd != nullptr);
3872 cfd->Unref();
3873 delete cfd;
3874 cfd = nullptr;
3875 } else {
3876 if (!cf_in_builders) {
3877 s = Status::Corruption(
3878 "Manifest record referencing unknown column family");
3879 break;
3880 }
3881
3882 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
3883 // this should never happen since cf_in_builders is true
3884 assert(cfd != nullptr);
3885
3886 // if it is not column family add or column family drop,
3887 // then it's a file add/delete, which should be forwarded
3888 // to builder
3889 auto builder = builders.find(edit.column_family_);
3890 assert(builder != builders.end());
3891 builder->second->version_builder()->Apply(&edit);
3892 }
3893
3894 if (cfd != nullptr && edit.has_log_number_) {
3895 cfd->SetLogNumber(edit.log_number_);
3896 }
3897
11fdf7f2 3898
7c673cae
FG
3899 if (edit.has_prev_log_number_) {
3900 previous_log_number = edit.prev_log_number_;
3901 have_prev_log_number = true;
3902 }
3903
3904 if (edit.has_next_file_number_) {
3905 next_file = edit.next_file_number_;
3906 have_next_file = true;
3907 }
3908
3909 if (edit.has_last_sequence_) {
3910 last_sequence = edit.last_sequence_;
3911 have_last_sequence = true;
3912 }
3913
3914 if (edit.has_max_column_family_) {
3915 column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
3916 }
11fdf7f2
TL
3917
3918 if (edit.has_min_log_number_to_keep_) {
3919 MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
3920 }
7c673cae
FG
3921 }
3922 }
3923 file_reader.reset();
3924
3925 if (s.ok()) {
3926 if (!have_next_file) {
3927 s = Status::Corruption("no meta-nextfile entry in descriptor");
3928 printf("no meta-nextfile entry in descriptor");
3929 } else if (!have_last_sequence) {
3930 printf("no last-sequence-number entry in descriptor");
3931 s = Status::Corruption("no last-sequence-number entry in descriptor");
3932 }
3933
3934 if (!have_prev_log_number) {
3935 previous_log_number = 0;
3936 }
3937 }
3938
3939 if (s.ok()) {
3940 for (auto cfd : *column_family_set_) {
3941 if (cfd->IsDropped()) {
3942 continue;
3943 }
3944 auto builders_iter = builders.find(cfd->GetID());
3945 assert(builders_iter != builders.end());
3946 auto builder = builders_iter->second->version_builder();
3947
11fdf7f2
TL
3948 Version* v = new Version(cfd, this, env_options_,
3949 *cfd->GetLatestMutableCFOptions(),
3950 current_version_number_++);
7c673cae
FG
3951 builder->SaveTo(v->storage_info());
3952 v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
3953
3954 printf("--------------- Column family \"%s\" (ID %u) --------------\n",
3955 cfd->GetName().c_str(), (unsigned int)cfd->GetID());
3956 printf("log number: %lu\n", (unsigned long)cfd->GetLogNumber());
3957 auto comparator = comparators.find(cfd->GetID());
3958 if (comparator != comparators.end()) {
3959 printf("comparator: %s\n", comparator->second.c_str());
3960 } else {
3961 printf("comparator: <NO COMPARATOR>\n");
3962 }
3963 printf("%s \n", v->DebugString(hex).c_str());
3964 delete v;
3965 }
3966
3967 // Free builders
3968 for (auto& builder : builders) {
3969 delete builder.second;
3970 }
3971
3972 next_file_number_.store(next_file + 1);
11fdf7f2
TL
3973 last_allocated_sequence_ = last_sequence;
3974 last_published_sequence_ = last_sequence;
7c673cae
FG
3975 last_sequence_ = last_sequence;
3976 prev_log_number_ = previous_log_number;
3977
3978 printf(
3979 "next_file_number %lu last_sequence "
11fdf7f2
TL
3980 "%lu prev_log_number %lu max_column_family %u min_log_number_to_keep "
3981 "%" PRIu64 "\n",
7c673cae
FG
3982 (unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
3983 (unsigned long)previous_log_number,
11fdf7f2 3984 column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
7c673cae
FG
3985 }
3986
3987 return s;
3988}
3989#endif // ROCKSDB_LITE
3990
11fdf7f2
TL
3991void VersionSet::MarkFileNumberUsed(uint64_t number) {
3992 // only called during recovery and repair which are single threaded, so this
3993 // works because there can't be concurrent calls
7c673cae
FG
3994 if (next_file_number_.load(std::memory_order_relaxed) <= number) {
3995 next_file_number_.store(number + 1, std::memory_order_relaxed);
3996 }
3997}
3998
11fdf7f2
TL
3999// Called only either from ::LogAndApply which is protected by mutex or during
4000// recovery which is single-threaded.
4001void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
4002 if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
4003 min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
4004 }
4005}
4006
7c673cae
FG
// Writes a full snapshot of the current state into `log` (a fresh MANIFEST):
// for every live column family, first a record describing the family itself
// (name/ID for non-default families, plus comparator name), then a record
// listing all of its table files and its log number. Replaying just these
// records reconstructs the current state. Returns the first error from
// encoding or from log->AddRecord(), or OK.
Status VersionSet::WriteSnapshot(log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?

  // WARNING: This method doesn't hold a mutex!!

  // This is done without DB mutex lock held, but only within single-threaded
  // LogAndApply. Column family manipulations can only happen within LogAndApply
  // (the same single thread), so we're safe to iterate.
  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped()) {
      continue;
    }
    assert(cfd->initialized());
    {
      // Store column family info
      VersionEdit edit;
      if (cfd->GetID() != 0) {
        // default column family is always there,
        // no need to explicitly write it
        edit.AddColumnFamily(cfd->GetName());
        edit.SetColumnFamily(cfd->GetID());
      }
      edit.SetComparatorName(
          cfd->internal_comparator().user_comparator()->Name());
      std::string record;
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
      Status s = log->AddRecord(record);
      if (!s.ok()) {
        return s;
      }
    }

    {
      // Save files: one edit carrying every table file of every level,
      // plus the column family's current log number.
      VersionEdit edit;
      edit.SetColumnFamily(cfd->GetID());

      for (int level = 0; level < cfd->NumberLevels(); level++) {
        for (const auto& f :
             cfd->current()->storage_info()->LevelFiles(level)) {
          edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                       f->fd.GetFileSize(), f->smallest, f->largest,
                       f->fd.smallest_seqno, f->fd.largest_seqno,
                       f->marked_for_compaction);
        }
      }
      edit.SetLogNumber(cfd->GetLogNumber());
      std::string record;
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
      Status s = log->AddRecord(record);
      if (!s.ok()) {
        return s;
      }
    }
  }

  return Status::OK();
}
4071
// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
// function is called repeatedly with consecutive pairs of slices. For example
// if the slice list is [a, b, c, d] this function is called with arguments
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
// we avoid doing binary search for the keys b and c twice and instead somehow
// maintain state of where they first appear in the files.
//
// Returns the approximate number of on-disk bytes covered by the internal
// key range [start, end] in version `v`, summed over levels
// [start_level, end_level). end_level == -1 means "through the last
// non-empty level". Precondition: start sorts no later than end.
uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
                                     const Slice& end, int start_level,
                                     int end_level) {
  // pre-condition
  assert(v->cfd_->internal_comparator().Compare(start, end) <= 0);

  uint64_t size = 0;
  const auto* vstorage = v->storage_info();
  // Clamp the scan to levels that actually contain files.
  end_level = end_level == -1
                  ? vstorage->num_non_empty_levels()
                  : std::min(end_level, vstorage->num_non_empty_levels());

  assert(start_level <= end_level);

  for (int level = start_level; level < end_level; level++) {
    const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
    if (!files_brief.num_files) {
      // empty level, skip exploration
      continue;
    }

    if (!level) {
      // level 0 files are not kept in sorted key order (see
      // ApproximateSizeLevel0), so they are handled file-by-file there.
      size += ApproximateSizeLevel0(v, files_brief, start, end);
      continue;
    }

    assert(level > 0);
    assert(files_brief.num_files > 0);

    // Binary-search for the first file whose range may contain `start`.
    const uint64_t idx_start = FindFileInRange(
        v->cfd_->internal_comparator(), files_brief, start,
        /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1));
    assert(idx_start < files_brief.num_files);

    // scan all files from the starting position until the ending position
    // inferred from the sorted order
    for (uint64_t i = idx_start; i < files_brief.num_files; i++) {
      uint64_t val;
      val = ApproximateSize(v, files_brief.files[i], end);
      if (!val) {
        // this file lies entirely after `end`; since files on this level
        // are sorted, the files after this will not have the range
        break;
      }

      size += val;

      if (i == idx_start) {
        // subtract the bytes needed to be scanned to get to the starting
        // key
        val = ApproximateSize(v, files_brief.files[i], start);
        assert(size >= val);
        size -= val;
      }
    }
  }

  return size;
}
4138
4139uint64_t VersionSet::ApproximateSizeLevel0(Version* v,
4140 const LevelFilesBrief& files_brief,
4141 const Slice& key_start,
4142 const Slice& key_end) {
4143 // level 0 files are not in sorted order, we need to iterate through
4144 // the list to compute the total bytes that require scanning
4145 uint64_t size = 0;
4146 for (size_t i = 0; i < files_brief.num_files; i++) {
4147 const uint64_t start = ApproximateSize(v, files_brief.files[i], key_start);
4148 const uint64_t end = ApproximateSize(v, files_brief.files[i], key_end);
4149 assert(end >= start);
4150 size += end - start;
4151 }
4152 return size;
4153}
4154
4155uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
4156 const Slice& key) {
4157 // pre-condition
4158 assert(v);
4159
4160 uint64_t result = 0;
4161 if (v->cfd_->internal_comparator().Compare(f.largest_key, key) <= 0) {
4162 // Entire file is before "key", so just add the file size
4163 result = f.fd.GetFileSize();
4164 } else if (v->cfd_->internal_comparator().Compare(f.smallest_key, key) > 0) {
4165 // Entire file is after "key", so ignore
4166 result = 0;
4167 } else {
4168 // "key" falls in the range for this table. Add the
4169 // approximate offset of "key" within the table.
4170 TableReader* table_reader_ptr;
4171 InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
11fdf7f2
TL
4172 ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
4173 *f.file_metadata, nullptr /* range_del_agg */,
4174 v->GetMutableCFOptions().prefix_extractor.get(), &table_reader_ptr);
7c673cae
FG
4175 if (table_reader_ptr != nullptr) {
4176 result = table_reader_ptr->ApproximateOffsetOf(key);
4177 }
4178 delete iter;
4179 }
4180 return result;
4181}
4182
// Appends the file descriptor of every table file referenced by any live
// Version of any initialized column family to `live_list`. Runs two passes:
// the first counts files so the vector grows with a single reserve(), the
// second does the actual appending.
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
  // pre-calculate space requirement
  int64_t total_files = 0;
  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    // Versions form a circular doubly-linked list headed by a dummy node;
    // walk every live version, not just current().
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      const auto* vstorage = v->storage_info();
      for (int level = 0; level < vstorage->num_levels(); level++) {
        total_files += vstorage->LevelFiles(level).size();
      }
    }
  }

  // just one time extension to the right size
  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));

  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    auto* current = cfd->current();
    bool found_current = false;
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      v->AddLiveFiles(live_list);
      if (v == current) {
        found_current = true;
      }
    }
    if (!found_current && current != nullptr) {
      // Defensive: current() should always be on the version list.
      // Should never happen unless it is a bug.
      assert(false);
      current->AddLiveFiles(live_list);
    }
  }
}
4224
// Builds the input iterator for compaction `c`: level-0 input files each get
// their own table iterator (they have to be merged together, per the comment
// below), while every other input level gets one concatenating LevelIterator.
// All of them are combined into a single merging InternalIterator. Range
// deletion tombstones encountered while reading feed `range_del_agg`.
InternalIterator* VersionSet::MakeInputIterator(
    const Compaction* c, RangeDelAggregator* range_del_agg,
    const EnvOptions& env_options_compactions) {
  auto cfd = c->column_family_data();
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Level-0 files have to be merged together. For other levels,
  // we will make a concatenating iterator per level.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  // Upper bound on iterator count: one per L0 file plus one per other level.
  const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
                                              c->num_input_levels() - 1
                                        : c->num_input_levels());
  InternalIterator** list = new InternalIterator* [space];
  size_t num = 0;
  for (size_t which = 0; which < c->num_input_levels(); which++) {
    if (c->input_levels(which)->num_files != 0) {
      if (c->level(which) == 0) {
        // One iterator per level-0 file.
        const LevelFilesBrief* flevel = c->input_levels(which);
        for (size_t i = 0; i < flevel->num_files; i++) {
          list[num++] = cfd->table_cache()->NewIterator(
              read_options, env_options_compactions, cfd->internal_comparator(),
              *flevel->files[i].file_metadata, range_del_agg,
              c->mutable_cf_options()->prefix_extractor.get(),
              nullptr /* table_reader_ptr */,
              nullptr /* no per level latency histogram */,
              true /* for_compaction */, nullptr /* arena */,
              false /* skip_filters */, static_cast<int>(which) /* level */);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = new LevelIterator(
            cfd->table_cache(), read_options, env_options_compactions,
            cfd->internal_comparator(), c->input_levels(which),
            c->mutable_cf_options()->prefix_extractor.get(),
            false /* should_sample */,
            nullptr /* no per level latency histogram */,
            true /* for_compaction */, false /* skip_filters */,
            static_cast<int>(which) /* level */, range_del_agg);
      }
    }
  }
  assert(num <= space);
  // The merging iterator takes ownership of the child iterators; the
  // temporary pointer array can be freed immediately.
  InternalIterator* result =
      NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
                         static_cast<int>(num));
  delete[] list;
  return result;
}
4280
// verify that the files listed in this compaction are present
// in the current version
// Debug-only sanity check: when NDEBUG is defined the body is compiled out
// and the function unconditionally returns true.
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
  Version* version = c->column_family_data()->current();
  const VersionStorageInfo* vstorage = version->storage_info();
  if (c->input_version() != version) {
    ROCKS_LOG_INFO(
        db_options_->info_log,
        "[%s] compaction output being applied to a different base version from"
        " input version",
        c->column_family_data()->GetName().c_str());

    if (vstorage->compaction_style_ == kCompactionStyleLevel &&
        c->start_level() == 0 && c->num_input_levels() > 2U) {
      // We are doing a L0->base_level compaction. The assumption is if
      // base level is not L1, levels from L1 to base_level - 1 is empty.
      // This is ensured by having one compaction from L0 going on at the
      // same time in level-based compaction. So that during the time, no
      // compaction/flush can put files to those levels.
      for (int l = c->start_level() + 1; l < c->output_level(); l++) {
        if (vstorage->NumLevelFiles(l) != 0) {
          return false;
        }
      }
    }
  }

  // Every input file of the compaction must still exist, by file number,
  // on its expected level in the current version.
  for (size_t input = 0; input < c->num_input_levels(); ++input) {
    int level = c->level(input);
    for (size_t i = 0; i < c->num_input_files(input); ++i) {
      uint64_t number = c->input(input, i)->fd.GetNumber();
      bool found = false;
      for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
        FileMetaData* f = vstorage->files_[level][j];
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
      if (!found) {
        return false;  // input files non existent in current version
      }
    }
  }
#else
  (void)c;
#endif
  return true;  // everything good
}
4331
4332Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
4333 FileMetaData** meta,
4334 ColumnFamilyData** cfd) {
4335 for (auto cfd_iter : *column_family_set_) {
11fdf7f2
TL
4336 if (!cfd_iter->initialized()) {
4337 continue;
4338 }
7c673cae
FG
4339 Version* version = cfd_iter->current();
4340 const auto* vstorage = version->storage_info();
4341 for (int level = 0; level < vstorage->num_levels(); level++) {
4342 for (const auto& file : vstorage->LevelFiles(level)) {
4343 if (file->fd.GetNumber() == number) {
4344 *meta = file;
4345 *filelevel = level;
4346 *cfd = cfd_iter;
4347 return Status::OK();
4348 }
4349 }
4350 }
4351 }
4352 return Status::NotFound("File not present in any level");
4353}
4354
4355void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
4356 for (auto cfd : *column_family_set_) {
11fdf7f2 4357 if (cfd->IsDropped() || !cfd->initialized()) {
7c673cae
FG
4358 continue;
4359 }
4360 for (int level = 0; level < cfd->NumberLevels(); level++) {
4361 for (const auto& file :
4362 cfd->current()->storage_info()->LevelFiles(level)) {
4363 LiveFileMetaData filemetadata;
4364 filemetadata.column_family_name = cfd->GetName();
4365 uint32_t path_id = file->fd.GetPathId();
11fdf7f2
TL
4366 if (path_id < cfd->ioptions()->cf_paths.size()) {
4367 filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
7c673cae 4368 } else {
11fdf7f2
TL
4369 assert(!cfd->ioptions()->cf_paths.empty());
4370 filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
7c673cae
FG
4371 }
4372 filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
4373 filemetadata.level = level;
11fdf7f2 4374 filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
7c673cae
FG
4375 filemetadata.smallestkey = file->smallest.user_key().ToString();
4376 filemetadata.largestkey = file->largest.user_key().ToString();
11fdf7f2
TL
4377 filemetadata.smallest_seqno = file->fd.smallest_seqno;
4378 filemetadata.largest_seqno = file->fd.largest_seqno;
7c673cae
FG
4379 metadata->push_back(filemetadata);
4380 }
4381 }
4382 }
4383}
4384
11fdf7f2 4385void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
7c673cae
FG
4386 std::vector<std::string>* manifest_filenames,
4387 uint64_t min_pending_output) {
4388 assert(manifest_filenames->empty());
4389 obsolete_manifests_.swap(*manifest_filenames);
11fdf7f2
TL
4390 std::vector<ObsoleteFileInfo> pending_files;
4391 for (auto& f : obsolete_files_) {
4392 if (f.metadata->fd.GetNumber() < min_pending_output) {
4393 files->push_back(std::move(f));
7c673cae 4394 } else {
11fdf7f2 4395 pending_files.push_back(std::move(f));
7c673cae
FG
4396 }
4397 }
4398 obsolete_files_.swap(pending_files);
4399}
4400
4401ColumnFamilyData* VersionSet::CreateColumnFamily(
4402 const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
4403 assert(edit->is_column_family_add_);
4404
11fdf7f2
TL
4405 MutableCFOptions dummy_cf_options;
4406 Version* dummy_versions =
4407 new Version(nullptr, this, env_options_, dummy_cf_options);
7c673cae
FG
4408 // Ref() dummy version once so that later we can call Unref() to delete it
4409 // by avoiding calling "delete" explicitly (~Version is private)
4410 dummy_versions->Ref();
4411 auto new_cfd = column_family_set_->CreateColumnFamily(
4412 edit->column_family_name_, edit->column_family_, dummy_versions,
4413 cf_options);
4414
11fdf7f2
TL
4415 Version* v = new Version(new_cfd, this, env_options_,
4416 *new_cfd->GetLatestMutableCFOptions(),
4417 current_version_number_++);
7c673cae
FG
4418
4419 // Fill level target base information.
4420 v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
4421 *new_cfd->GetLatestMutableCFOptions());
4422 AppendVersion(new_cfd, v);
4423 // GetLatestMutableCFOptions() is safe here without mutex since the
4424 // cfd is not available to client
4425 new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
4426 LastSequence());
4427 new_cfd->SetLogNumber(edit->log_number_);
4428 return new_cfd;
4429}
4430
4431uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
4432 uint64_t count = 0;
4433 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
4434 count++;
4435 }
4436 return count;
4437}
4438
4439uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
4440 std::unordered_set<uint64_t> unique_files;
4441 uint64_t total_files_size = 0;
4442 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
4443 VersionStorageInfo* storage_info = v->storage_info();
4444 for (int level = 0; level < storage_info->num_levels_; level++) {
4445 for (const auto& file_meta : storage_info->LevelFiles(level)) {
4446 if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
4447 unique_files.end()) {
4448 unique_files.insert(file_meta->fd.packed_number_and_path_id);
4449 total_files_size += file_meta->fd.GetFileSize();
4450 }
4451 }
4452 }
4453 }
4454 return total_files_size;
4455}
4456
4457} // namespace rocksdb