1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/version_set.h"
12 #ifndef __STDC_FORMAT_MACROS
13 #define __STDC_FORMAT_MACROS
23 #include <unordered_map>
25 #include "db/compaction.h"
26 #include "db/internal_stats.h"
27 #include "db/log_reader.h"
28 #include "db/log_writer.h"
29 #include "db/memtable.h"
30 #include "db/merge_context.h"
31 #include "db/merge_helper.h"
32 #include "db/pinned_iterators_manager.h"
33 #include "db/table_cache.h"
34 #include "db/version_builder.h"
35 #include "monitoring/file_read_sample.h"
36 #include "monitoring/perf_context_imp.h"
37 #include "rocksdb/env.h"
38 #include "rocksdb/merge_operator.h"
39 #include "rocksdb/write_buffer_manager.h"
40 #include "table/format.h"
41 #include "table/get_context.h"
42 #include "table/internal_iterator.h"
43 #include "table/merging_iterator.h"
44 #include "table/meta_blocks.h"
45 #include "table/plain_table_factory.h"
46 #include "table/table_reader.h"
47 #include "table/two_level_iterator.h"
48 #include "util/coding.h"
49 #include "util/file_reader_writer.h"
50 #include "util/filename.h"
51 #include "util/stop_watch.h"
52 #include "util/string_util.h"
53 #include "util/sync_point.h"
54 #include "util/user_comparator_wrapper.h"
60 // Find File in LevelFilesBrief data structure
61 // Within an index range defined by left and right
// Binary-searches file_level.files over the index range [left, right) for
// the first file whose largest internal key is >= the probe key (ordered by
// icmp). Returns that index; returns right when no such file exists.
// NOTE(review): this extraction is incomplete — the remaining parameters
// (key, left, right; original lines 64-66) and the lambda/function closing
// braces (original lines 69, 73) are elided here.
62 int FindFileInRange(const InternalKeyComparator
& icmp
,
63 const LevelFilesBrief
& file_level
,
// Comparator for lower_bound: true while f's largest key sorts strictly
// before k. The explicit InternalKeyComparator:: qualification forces a
// non-virtual (direct) call to Compare.
67 auto cmp
= [&](const FdWithKeyRange
& f
, const Slice
& k
) -> bool {
68 return icmp
.InternalKeyComparator::Compare(f
.largest_key
, k
) < 0;
// b is the base pointer of the contiguous FdWithKeyRange array for this
// level; lower_bound works directly on the pointer range.
70 const auto &b
= file_level
.files
;
// The pointer difference from the array base is the file index.
71 return static_cast<int>(std::lower_bound(b
+ left
,
72 b
+ right
, key
, cmp
) - b
);
// Checks whether the data visible through `iter` overlaps the user-key range
// [smallest_user_key, largest_user_key]: seeks to the earliest possible
// internal key for smallest_user_key and compares the key found against
// largest_user_key. Returns the iterator's status on exit.
// NOTE(review): extraction is incomplete — the `bool* overlap` out-parameter
// line, the value-type argument of range_start, and several closing braces /
// assignments to *overlap (original lines 79, 81, 85-88, 92-98) are elided.
75 Status
OverlapWithIterator(const Comparator
* ucmp
,
76 const Slice
& smallest_user_key
,
77 const Slice
& largest_user_key
,
78 InternalIterator
* iter
,
// kMaxSequenceNumber makes range_start the smallest internal key that can
// carry this user key, so the Seek below lands on the first relevant entry.
80 InternalKey
range_start(smallest_user_key
, kMaxSequenceNumber
,
82 iter
->Seek(range_start
.Encode());
// Propagate any I/O/iterator error immediately.
83 if (!iter
->status().ok()) {
84 return iter
->status();
// Decode the internal key the seek landed on; failure to parse means the
// stored key bytes are malformed.
89 ParsedInternalKey seek_result
;
90 if (!ParseInternalKey(iter
->key(), &seek_result
)) {
91 return Status::Corruption("DB have corrupted keys");
// If the first key at/after range_start is still <= largest_user_key, the
// iterator's data overlaps the queried range.
94 if (ucmp
->Compare(seek_result
.user_key
, largest_user_key
) <= 0) {
99 return iter
->status();
102 // Class to help choose the next file to search for the particular key.
103 // Searches and returns files level by level.
104 // We can search level-by-level since entries never hop across
105 // levels. Therefore we are guaranteed that if we find data
106 // in a smaller level, later levels are irrelevant (unless we
107 // are MergeInProgress).
110 FilePicker(std::vector
<FileMetaData
*>* files
, const Slice
& user_key
,
111 const Slice
& ikey
, autovector
<LevelFilesBrief
>* file_levels
,
112 unsigned int num_levels
, FileIndexer
* file_indexer
,
113 const Comparator
* user_comparator
,
114 const InternalKeyComparator
* internal_comparator
)
115 : num_levels_(num_levels
),
116 curr_level_(static_cast<unsigned int>(-1)),
117 returned_file_level_(static_cast<unsigned int>(-1)),
118 hit_file_level_(static_cast<unsigned int>(-1)),
119 search_left_bound_(0),
120 search_right_bound_(FileIndexer::kLevelMaxIndex
),
124 level_files_brief_(file_levels
),
125 is_hit_file_last_in_level_(false),
126 curr_file_level_(nullptr),
129 file_indexer_(file_indexer
),
130 user_comparator_(user_comparator
),
131 internal_comparator_(internal_comparator
) {
135 // Setup member variables to search first level.
136 search_ended_
= !PrepareNextLevel();
137 if (!search_ended_
) {
138 // Prefetch Level 0 table data to avoid cache miss if possible.
139 for (unsigned int i
= 0; i
< (*level_files_brief_
)[0].num_files
; ++i
) {
140 auto* r
= (*level_files_brief_
)[0].files
[i
].fd
.table_reader
;
// Level the picker is currently searching (unsigned -1 sentinel before the
// first call to PrepareNextLevel succeeds).
148 int GetCurrentLevel() const { return curr_level_
; }
150 FdWithKeyRange
* GetNextFile() {
151 while (!search_ended_
) { // Loops over different levels.
152 while (curr_index_in_curr_level_
< curr_file_level_
->num_files
) {
153 // Loops over all files in current level.
154 FdWithKeyRange
* f
= &curr_file_level_
->files
[curr_index_in_curr_level_
];
155 hit_file_level_
= curr_level_
;
156 is_hit_file_last_in_level_
=
157 curr_index_in_curr_level_
== curr_file_level_
->num_files
- 1;
158 int cmp_largest
= -1;
160 // Do key range filtering of files or/and fractional cascading if:
161 // (1) not all the files are in level 0, or
162 // (2) there are more than 3 current level files
163 // If there are only 3 or less current level files in the system, we skip
164 // the key range filtering. In this case, more likely, the system is
165 // highly tuned to minimize number of tables queried by each query,
166 // so it is unlikely that key range filtering is more efficient than
167 // querying the files.
168 if (num_levels_
> 1 || curr_file_level_
->num_files
> 3) {
169 // Check if key is within a file's range. If search left bound and
170 // right bound point to the same find, we are sure key falls in
174 curr_index_in_curr_level_
== start_index_in_curr_level_
||
175 user_comparator_
->Compare(user_key_
,
176 ExtractUserKey(f
->smallest_key
)) <= 0);
178 int cmp_smallest
= user_comparator_
->Compare(user_key_
,
179 ExtractUserKey(f
->smallest_key
));
180 if (cmp_smallest
>= 0) {
181 cmp_largest
= user_comparator_
->Compare(user_key_
,
182 ExtractUserKey(f
->largest_key
));
185 // Setup file search bound for the next level based on the
186 // comparison results
187 if (curr_level_
> 0) {
188 file_indexer_
->GetNextLevelIndex(curr_level_
,
189 curr_index_in_curr_level_
,
190 cmp_smallest
, cmp_largest
,
192 &search_right_bound_
);
194 // Key falls out of current file's range
195 if (cmp_smallest
< 0 || cmp_largest
> 0) {
196 if (curr_level_
== 0) {
197 ++curr_index_in_curr_level_
;
200 // Search next level.
206 // Sanity check to make sure that the files are correctly sorted
208 if (curr_level_
!= 0) {
209 int comp_sign
= internal_comparator_
->Compare(
210 prev_file_
->largest_key
, f
->smallest_key
);
211 assert(comp_sign
< 0);
213 // level == 0, the current file cannot be newer than the previous
214 // one. Use compressed data structure, has no attribute seqNo
215 assert(curr_index_in_curr_level_
> 0);
216 assert(!NewestFirstBySeqNo(files_
[0][curr_index_in_curr_level_
],
217 files_
[0][curr_index_in_curr_level_
-1]));
222 returned_file_level_
= curr_level_
;
223 if (curr_level_
> 0 && cmp_largest
< 0) {
224 // No more files to search in this level.
225 search_ended_
= !PrepareNextLevel();
227 ++curr_index_in_curr_level_
;
231 // Start searching next level.
232 search_ended_
= !PrepareNextLevel();
238 // getter for current file level
239 // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
// Level of the file most recently handed out by GetNextFile().
240 unsigned int GetHitFileLevel() { return hit_file_level_
; }
242 // Returns true if the most recent "hit file" (i.e., one returned by
243 // GetNextFile()) is at the last index in its level.
// Set in GetNextFile() by comparing the file's index against num_files - 1.
244 bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_
; }
247 unsigned int num_levels_
;
248 unsigned int curr_level_
;
249 unsigned int returned_file_level_
;
250 unsigned int hit_file_level_
;
251 int32_t search_left_bound_
;
252 int32_t search_right_bound_
;
254 std::vector
<FileMetaData
*>* files_
;
256 autovector
<LevelFilesBrief
>* level_files_brief_
;
258 bool is_hit_file_last_in_level_
;
259 LevelFilesBrief
* curr_file_level_
;
260 unsigned int curr_index_in_curr_level_
;
261 unsigned int start_index_in_curr_level_
;
264 FileIndexer
* file_indexer_
;
265 const Comparator
* user_comparator_
;
266 const InternalKeyComparator
* internal_comparator_
;
268 FdWithKeyRange
* prev_file_
;
271 // Setup local variables to search next level.
272 // Returns false if there are no more levels to search.
273 bool PrepareNextLevel() {
275 while (curr_level_
< num_levels_
) {
276 curr_file_level_
= &(*level_files_brief_
)[curr_level_
];
277 if (curr_file_level_
->num_files
== 0) {
278 // When current level is empty, the search bound generated from upper
279 // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
281 assert(search_left_bound_
== 0);
282 assert(search_right_bound_
== -1 ||
283 search_right_bound_
== FileIndexer::kLevelMaxIndex
);
284 // Since current level is empty, it will need to search all files in
286 search_left_bound_
= 0;
287 search_right_bound_
= FileIndexer::kLevelMaxIndex
;
292 // Some files may overlap each other. We find
293 // all files that overlap user_key and process them in order from
294 // newest to oldest. In the context of merge-operator, this can occur at
295 // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
296 // are always compacted into a single entry).
298 if (curr_level_
== 0) {
299 // On Level-0, we read through all files to check for overlap.
302 // On Level-n (n>=1), files are sorted. Binary search to find the
303 // earliest file whose largest key >= ikey. Search left bound and
304 // right bound are used to narrow the range.
305 if (search_left_bound_
<= search_right_bound_
) {
306 if (search_right_bound_
== FileIndexer::kLevelMaxIndex
) {
307 search_right_bound_
=
308 static_cast<int32_t>(curr_file_level_
->num_files
) - 1;
310 // `search_right_bound_` is an inclusive upper-bound, but since it was
311 // determined based on user key, it is still possible the lookup key
312 // falls to the right of `search_right_bound_`'s corresponding file.
313 // So, pass a limit one higher, which allows us to detect this case.
315 FindFileInRange(*internal_comparator_
, *curr_file_level_
, ikey_
,
316 static_cast<uint32_t>(search_left_bound_
),
317 static_cast<uint32_t>(search_right_bound_
) + 1);
318 if (start_index
== search_right_bound_
+ 1) {
319 // `ikey_` comes after `search_right_bound_`. The lookup key does
320 // not exist on this level, so let's skip this level and do a full
321 // binary search on the next level.
322 search_left_bound_
= 0;
323 search_right_bound_
= FileIndexer::kLevelMaxIndex
;
328 // search_left_bound > search_right_bound, key does not exist in
329 // this level. Since no comparison is done in this level, it will
330 // need to search all files in the next level.
331 search_left_bound_
= 0;
332 search_right_bound_
= FileIndexer::kLevelMaxIndex
;
337 start_index_in_curr_level_
= start_index
;
338 curr_index_in_curr_level_
= start_index
;
340 prev_file_
= nullptr;
344 // curr_level_ = num_levels_. So, no more levels to search.
348 } // anonymous namespace
// Releases the per-level file-vector array; files_ is owned here and was
// allocated with new[], so delete[] is the matching deallocation.
350 VersionStorageInfo::~VersionStorageInfo() { delete[] files_
; }
352 Version::~Version() {
355 // Remove from linked list
356 prev_
->next_
= next_
;
357 next_
->prev_
= prev_
;
359 // Drop references to files
360 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
361 for (size_t i
= 0; i
< storage_info_
.files_
[level
].size(); i
++) {
362 FileMetaData
* f
= storage_info_
.files_
[level
][i
];
366 assert(cfd_
!= nullptr);
367 uint32_t path_id
= f
->fd
.GetPathId();
368 assert(path_id
< cfd_
->ioptions()->cf_paths
.size());
369 vset_
->obsolete_files_
.push_back(
370 ObsoleteFileInfo(f
, cfd_
->ioptions()->cf_paths
[path_id
].path
));
// Convenience wrapper around FindFileInRange that searches the entire level
// ([0, num_files)) for the first file whose largest key is >= key.
// NOTE(review): the `const Slice& key` parameter line and closing brace
// (original lines 378, 381) are elided in this extraction.
376 int FindFile(const InternalKeyComparator
& icmp
,
377 const LevelFilesBrief
& file_level
,
379 return FindFileInRange(icmp
, file_level
, key
, 0,
380 static_cast<uint32_t>(file_level
.num_files
));
// Builds a flat, cache-friendly LevelFilesBrief for one level: an arena-
// allocated array of FdWithKeyRange entries whose smallest/largest key
// slices point into arena memory copied right here, so the brief stays
// valid independently of the source InternalKey objects' buffers.
// NOTE(review): the `Arena* arena` parameter line and closing braces
// (original lines 385-388, 404, 406, 410-412) are elided in this extraction.
383 void DoGenerateLevelFilesBrief(LevelFilesBrief
* file_level
,
384 const std::vector
<FileMetaData
*>& files
,
389 size_t num
= files
.size();
390 file_level
->num_files
= num
;
// Placement-new the FdWithKeyRange array into arena storage; the arena owns
// the memory, so no matching delete[] is needed.
391 char* mem
= arena
->AllocateAligned(num
* sizeof(FdWithKeyRange
));
392 file_level
->files
= new (mem
)FdWithKeyRange
[num
];
393 for (size_t i
= 0; i
< num
; i
++) {
395 Slice smallest_key
= files
[i
]->smallest
.Encode();
396 Slice largest_key
= files
[i
]->largest
.Encode();
398 // Copy key slice to sequential memory
// Both keys for a file share one allocation: [smallest bytes | largest
// bytes], addressed by offset below.
399 size_t smallest_size
= smallest_key
.size();
400 size_t largest_size
= largest_key
.size();
401 mem
= arena
->AllocateAligned(smallest_size
+ largest_size
);
402 memcpy(mem
, smallest_key
.data(), smallest_size
);
403 memcpy(mem
+ smallest_size
, largest_key
.data(), largest_size
);
// Fill in the brief entry: raw metadata pointer plus slices into the copy
// made above.
405 FdWithKeyRange
& f
= file_level
->files
[i
];
407 f
.file_metadata
= files
[i
];
408 f
.smallest_key
= Slice(mem
, smallest_size
);
409 f
.largest_key
= Slice(mem
+ smallest_size
, largest_size
);
// True iff *user_key sorts strictly after every key in file *f, i.e. after
// f->largest_key's user key under ucmp. Used to rule out overlap on the
// right side of a file's range.
// NOTE(review): the closing brace (original line 418) is elided here.
413 static bool AfterFile(const Comparator
* ucmp
,
414 const Slice
* user_key
, const FdWithKeyRange
* f
) {
415 // nullptr user_key occurs before all keys and is therefore never after *f
416 return (user_key
!= nullptr &&
417 ucmp
->Compare(*user_key
, ExtractUserKey(f
->largest_key
)) > 0);
// True iff *user_key sorts strictly before every key in file *f, i.e.
// before f->smallest_key's user key under ucmp. Mirror of AfterFile for the
// left side of a file's range.
// NOTE(review): the closing brace (original line 425) is elided here.
420 static bool BeforeFile(const Comparator
* ucmp
,
421 const Slice
* user_key
, const FdWithKeyRange
* f
) {
422 // nullptr user_key occurs after all keys and is therefore never before *f
423 return (user_key
!= nullptr &&
424 ucmp
->Compare(*user_key
, ExtractUserKey(f
->smallest_key
)) < 0);
// Reports whether any file in file_level overlaps the user-key range
// [*smallest_user_key, *largest_user_key]; a nullptr bound means the range
// is open on that side. Two strategies:
//   - files may overlap each other (L0): linear scan of every file;
//   - disjoint sorted files (L1+): binary search via FindFile.
// NOTE(review): extraction is incomplete — several closing braces, the
// `return false` paths, and the declarations of `small`/`index`
// (original lines 440-441, 443-447, 449, 452, 455-456, 459-461, 463) are
// elided here.
427 bool SomeFileOverlapsRange(
428 const InternalKeyComparator
& icmp
,
429 bool disjoint_sorted_files
,
430 const LevelFilesBrief
& file_level
,
431 const Slice
* smallest_user_key
,
432 const Slice
* largest_user_key
) {
433 const Comparator
* ucmp
= icmp
.user_comparator();
434 if (!disjoint_sorted_files
) {
435 // Need to check against all files
436 for (size_t i
= 0; i
< file_level
.num_files
; i
++) {
437 const FdWithKeyRange
* f
= &(file_level
.files
[i
]);
// Overlap unless the queried range lies entirely after or entirely before
// this file.
438 if (AfterFile(ucmp
, smallest_user_key
, f
) ||
439 BeforeFile(ucmp
, largest_user_key
, f
)) {
442 return true; // Overlap
448 // Binary search over file list
450 if (smallest_user_key
!= nullptr) {
451 // Find the leftmost possible internal key for smallest_user_key
453 small
.SetMinPossibleForUserKey(*smallest_user_key
);
// index is the first file whose largest key is >= the range start.
454 index
= FindFile(icmp
, file_level
, small
.Encode());
457 if (index
>= file_level
.num_files
) {
458 // beginning of range is after all files, so no overlap.
// The candidate file's largest key is >= range start; overlap exists unless
// the range ends before the candidate's smallest key.
462 return !BeforeFile(ucmp
, largest_user_key
, &file_level
.files
[index
]);
467 class LevelIterator final
: public InternalIterator
{
470 TableCache
* table_cache
, const ReadOptions
& read_options
,
471 const EnvOptions
& env_options
, const InternalKeyComparator
& icomparator
,
472 const LevelFilesBrief
* flevel
, const SliceTransform
* prefix_extractor
,
473 bool should_sample
, HistogramImpl
* file_read_hist
, bool for_compaction
,
474 bool skip_filters
, int level
, RangeDelAggregator
* range_del_agg
,
475 const std::vector
<AtomicCompactionUnitBoundary
>* compaction_boundaries
=
477 : table_cache_(table_cache
),
478 read_options_(read_options
),
479 env_options_(env_options
),
480 icomparator_(icomparator
),
481 user_comparator_(icomparator
.user_comparator()),
483 prefix_extractor_(prefix_extractor
),
484 file_read_hist_(file_read_hist
),
485 should_sample_(should_sample
),
486 for_compaction_(for_compaction
),
487 skip_filters_(skip_filters
),
488 file_index_(flevel_
->num_files
),
490 range_del_agg_(range_del_agg
),
491 pinned_iters_mgr_(nullptr),
492 compaction_boundaries_(compaction_boundaries
) {
493 // Empty level is not supported.
494 assert(flevel_
!= nullptr && flevel_
->num_files
> 0);
497 ~LevelIterator() override
{ delete file_iter_
.Set(nullptr); }
499 void Seek(const Slice
& target
) override
;
500 void SeekForPrev(const Slice
& target
) override
;
501 void SeekToFirst() override
;
502 void SeekToLast() override
;
503 void Next() override
;
504 void Prev() override
;
506 bool Valid() const override
{ return file_iter_
.Valid(); }
507 Slice
key() const override
{
509 return file_iter_
.key();
511 Slice
value() const override
{
513 return file_iter_
.value();
515 Status
status() const override
{
516 return file_iter_
.iter() ? file_iter_
.status() : Status::OK();
518 void SetPinnedItersMgr(PinnedIteratorsManager
* pinned_iters_mgr
) override
{
519 pinned_iters_mgr_
= pinned_iters_mgr
;
520 if (file_iter_
.iter()) {
521 file_iter_
.SetPinnedItersMgr(pinned_iters_mgr
);
524 bool IsKeyPinned() const override
{
525 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
526 file_iter_
.iter() && file_iter_
.IsKeyPinned();
528 bool IsValuePinned() const override
{
529 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
530 file_iter_
.iter() && file_iter_
.IsValuePinned();
534 void SkipEmptyFileForward();
535 void SkipEmptyFileBackward();
536 void SetFileIterator(InternalIterator
* iter
);
537 void InitFileIterator(size_t new_file_index
);
539 const Slice
& file_smallest_key(size_t file_index
) {
540 assert(file_index
< flevel_
->num_files
);
541 return flevel_
->files
[file_index
].smallest_key
;
544 bool KeyReachedUpperBound(const Slice
& internal_key
) {
545 return read_options_
.iterate_upper_bound
!= nullptr &&
546 user_comparator_
.Compare(ExtractUserKey(internal_key
),
547 *read_options_
.iterate_upper_bound
) >= 0;
550 InternalIterator
* NewFileIterator() {
551 assert(file_index_
< flevel_
->num_files
);
552 auto file_meta
= flevel_
->files
[file_index_
];
553 if (should_sample_
) {
554 sample_file_read_inc(file_meta
.file_metadata
);
557 const InternalKey
* smallest_compaction_key
= nullptr;
558 const InternalKey
* largest_compaction_key
= nullptr;
559 if (compaction_boundaries_
!= nullptr) {
560 smallest_compaction_key
= (*compaction_boundaries_
)[file_index_
].smallest
;
561 largest_compaction_key
= (*compaction_boundaries_
)[file_index_
].largest
;
563 return table_cache_
->NewIterator(
564 read_options_
, env_options_
, icomparator_
, *file_meta
.file_metadata
,
565 range_del_agg_
, prefix_extractor_
,
566 nullptr /* don't need reference to table */,
567 file_read_hist_
, for_compaction_
, nullptr /* arena */, skip_filters_
,
568 level_
, smallest_compaction_key
, largest_compaction_key
);
571 TableCache
* table_cache_
;
572 const ReadOptions read_options_
;
573 const EnvOptions
& env_options_
;
574 const InternalKeyComparator
& icomparator_
;
575 const UserComparatorWrapper user_comparator_
;
576 const LevelFilesBrief
* flevel_
;
577 mutable FileDescriptor current_value_
;
578 const SliceTransform
* prefix_extractor_
;
580 HistogramImpl
* file_read_hist_
;
582 bool for_compaction_
;
586 RangeDelAggregator
* range_del_agg_
;
587 IteratorWrapper file_iter_
; // May be nullptr
588 PinnedIteratorsManager
* pinned_iters_mgr_
;
590 // To be propagated to RangeDelAggregator in order to safely truncate range
592 const std::vector
<AtomicCompactionUnitBoundary
>* compaction_boundaries_
;
// Positions the iterator at the first entry >= target: binary-searches the
// level for the file that may contain target, opens its table iterator, and
// seeks within it; empty/out-of-range files are then skipped forward.
// NOTE(review): closing braces (original lines 601, 603) are elided here.
595 void LevelIterator::Seek(const Slice
& target
) {
596 size_t new_file_index
= FindFile(icomparator_
, *flevel_
, target
);
598 InitFileIterator(new_file_index
);
// InitFileIterator yields a null iterator when the index is past the level;
// only seek when a file iterator was actually created.
599 if (file_iter_
.iter() != nullptr) {
600 file_iter_
.Seek(target
);
602 SkipEmptyFileForward();
// Positions the iterator at the last entry <= target. Finds the candidate
// file as in Seek(); if target is past every file, clamps to the last file,
// then SeekForPrev within it and skip empty files backward.
// NOTE(review): closing braces (original lines 609-610, 615-616) are elided
// here.
605 void LevelIterator::SeekForPrev(const Slice
& target
) {
606 size_t new_file_index
= FindFile(icomparator_
, *flevel_
, target
);
// FindFile returns num_files when target is after all files; the reverse
// seek must then start from the last file instead of going out of range.
607 if (new_file_index
>= flevel_
->num_files
) {
608 new_file_index
= flevel_
->num_files
- 1;
611 InitFileIterator(new_file_index
);
612 if (file_iter_
.iter() != nullptr) {
613 file_iter_
.SeekForPrev(target
);
614 SkipEmptyFileBackward();
// Positions the iterator at the first entry of the level's first non-empty
// file.
// NOTE(review): the InitFileIterator(0) call and closing braces (original
// lines 619, 622, 624) are elided in this extraction.
618 void LevelIterator::SeekToFirst() {
620 if (file_iter_
.iter() != nullptr) {
621 file_iter_
.SeekToFirst();
623 SkipEmptyFileForward();
// Positions the iterator at the last entry of the level's last non-empty
// file, walking backward past any empty files.
// NOTE(review): closing braces (original lines 630, 632) are elided here.
626 void LevelIterator::SeekToLast() {
// Start from the last file in the level.
627 InitFileIterator(flevel_
->num_files
- 1);
628 if (file_iter_
.iter() != nullptr) {
629 file_iter_
.SeekToLast();
631 SkipEmptyFileBackward();
634 void LevelIterator::Next() {
637 SkipEmptyFileForward();
640 void LevelIterator::Prev() {
643 SkipEmptyFileBackward();
// Advances to the next file while the current file iterator is absent or
// exhausted without error and not stopped by an upper-bound cutoff, so the
// LevelIterator either lands on a valid entry or becomes invalid at the end
// of the level.
// NOTE(review): `return` statements after the SetFileIterator(nullptr)
// calls and several closing braces (original lines 650, 654-655, 658-659,
// 663-665) are elided in this extraction.
646 void LevelIterator::SkipEmptyFileForward() {
// Loop condition: no file iterator yet, or the current one ran off its end
// cleanly (ok status) and was not cut short by the iterate_upper_bound
// (IsOutOfBound) — an out-of-bound stop is a legitimate end, not a file to
// skip past.
647 while (file_iter_
.iter() == nullptr ||
648 (!file_iter_
.Valid() && file_iter_
.status().ok() &&
649 !file_iter_
.iter()->IsOutOfBound())) {
651 if (file_index_
>= flevel_
->num_files
- 1) {
652 // Already at the last file
653 SetFileIterator(nullptr);
// If even the next file's smallest key is past the upper bound, nothing
// further can qualify — stop here instead of opening the file.
656 if (KeyReachedUpperBound(file_smallest_key(file_index_
+ 1))) {
657 SetFileIterator(nullptr);
660 InitFileIterator(file_index_
+ 1);
661 if (file_iter_
.iter() != nullptr) {
662 file_iter_
.SeekToFirst();
// Backward counterpart of SkipEmptyFileForward: while the current file
// iterator is absent or cleanly exhausted, step to the previous file and
// SeekToLast in it; gives up (null iterator) at the first file.
// NOTE(review): a `return` after SetFileIterator(nullptr) and closing
// braces (original lines 674-675, 679-681) are elided in this extraction.
667 void LevelIterator::SkipEmptyFileBackward() {
668 while (file_iter_
.iter() == nullptr ||
669 (!file_iter_
.Valid() && file_iter_
.status().ok())) {
670 // Move to previous file
671 if (file_index_
== 0) {
672 // Already the first file
673 SetFileIterator(nullptr);
676 InitFileIterator(file_index_
- 1);
677 if (file_iter_
.iter() != nullptr) {
678 file_iter_
.SeekToLast();
// Installs `iter` (may be nullptr) as the current file iterator. The old
// iterator returned by IteratorWrapper::Set is either handed to the pinned
// iterators manager (kept alive while pinning is enabled) or — per the
// elided else-branch — destroyed.
// NOTE(review): the else branch deleting old_iter and closing braces
// (original lines 686-687, 691-693) are elided in this extraction.
683 void LevelIterator::SetFileIterator(InternalIterator
* iter
) {
// Propagate the pinning manager to the incoming iterator before use.
684 if (pinned_iters_mgr_
&& iter
) {
685 iter
->SetPinnedItersMgr(pinned_iters_mgr_
);
686 InternalIterator
* old_iter
= file_iter_
.Set(iter
);
// While pinning is on, the displaced iterator must stay alive because
// callers may still hold Slices into its data — park it in the manager.
689 if (pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled()) {
690 pinned_iters_mgr_
->PinIterator(old_iter
);
// Makes file_iter_ point at the table iterator for new_file_index. An index
// past the level clears the iterator; re-targeting the same healthy file is
// a no-op; otherwise a fresh table iterator is created and installed.
// NOTE(review): `return` statements and closing braces (original lines
// 700-701, 705, 710, 714-715) are elided in this extraction.
696 void LevelIterator::InitFileIterator(size_t new_file_index
) {
// Out-of-range index: record it and clear the current file iterator.
697 if (new_file_index
>= flevel_
->num_files
) {
698 file_index_
= new_file_index
;
699 SetFileIterator(nullptr);
702 // If the file iterator shows incomplete, we try it again if users seek
703 // to the same file, as this time we may go to a different data block
704 // which is cached in block cache.
// Reuse path: same file as before and the existing iterator did not fail
// with Incomplete — keep it as-is.
706 if (file_iter_
.iter() != nullptr && !file_iter_
.status().IsIncomplete() &&
707 new_file_index
== file_index_
) {
708 // file_iter_ is already constructed with this iterator, so
709 // no need to change anything
// Fresh path: remember the new index and open this file's table iterator.
711 file_index_
= new_file_index
;
712 InternalIterator
* iter
= NewFileIterator();
713 SetFileIterator(iter
);
717 } // anonymous namespace
719 // A wrapper of version builder which references the current version in
720 // constructor and unref it in the destructor.
721 // Both of the constructor and destructor need to be called inside DB Mutex.
722 class BaseReferencedVersionBuilder
{
724 explicit BaseReferencedVersionBuilder(ColumnFamilyData
* cfd
)
725 : version_builder_(new VersionBuilder(
726 cfd
->current()->version_set()->env_options(), cfd
->table_cache(),
727 cfd
->current()->storage_info(), cfd
->ioptions()->info_log
)),
728 version_(cfd
->current()) {
731 ~BaseReferencedVersionBuilder() {
734 VersionBuilder
* version_builder() { return version_builder_
.get(); }
737 std::unique_ptr
<VersionBuilder
> version_builder_
;
741 Status
Version::GetTableProperties(std::shared_ptr
<const TableProperties
>* tp
,
742 const FileMetaData
* file_meta
,
743 const std::string
* fname
) const {
744 auto table_cache
= cfd_
->table_cache();
745 auto ioptions
= cfd_
->ioptions();
746 Status s
= table_cache
->GetTableProperties(
747 env_options_
, cfd_
->internal_comparator(), file_meta
->fd
, tp
,
748 mutable_cf_options_
.prefix_extractor
.get(), true /* no io */);
753 // We only ignore error type `Incomplete` since it's by design that we
754 // disallow table when it's not in table cache.
755 if (!s
.IsIncomplete()) {
759 // 2. Table is not present in table cache, we'll read the table properties
760 // directly from the properties block in the file.
761 std::unique_ptr
<RandomAccessFile
> file
;
762 std::string file_name
;
763 if (fname
!= nullptr) {
767 TableFileName(ioptions
->cf_paths
, file_meta
->fd
.GetNumber(),
768 file_meta
->fd
.GetPathId());
770 s
= ioptions
->env
->NewRandomAccessFile(file_name
, &file
, env_options_
);
775 TableProperties
* raw_table_properties
;
776 // By setting the magic number to kInvalidTableMagicNumber, we can by
777 // pass the magic number check in the footer.
778 std::unique_ptr
<RandomAccessFileReader
> file_reader(
779 new RandomAccessFileReader(
780 std::move(file
), file_name
, nullptr /* env */, nullptr /* stats */,
781 0 /* hist_type */, nullptr /* file_read_hist */,
782 nullptr /* rate_limiter */, false /* for_compaction*/,
783 ioptions
->listeners
));
784 s
= ReadTableProperties(
785 file_reader
.get(), file_meta
->fd
.GetFileSize(),
786 Footer::kInvalidTableMagicNumber
/* table's magic number */, *ioptions
,
787 &raw_table_properties
, false /* compression_type_missing */);
791 RecordTick(ioptions
->statistics
, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES
);
793 *tp
= std::shared_ptr
<const TableProperties
>(raw_table_properties
);
797 Status
Version::GetPropertiesOfAllTables(TablePropertiesCollection
* props
) {
799 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
800 s
= GetPropertiesOfAllTables(props
, level
);
809 Status
Version::GetPropertiesOfAllTables(TablePropertiesCollection
* props
,
811 for (const auto& file_meta
: storage_info_
.files_
[level
]) {
813 TableFileName(cfd_
->ioptions()->cf_paths
, file_meta
->fd
.GetNumber(),
814 file_meta
->fd
.GetPathId());
815 // 1. If the table is already present in table cache, load table
816 // properties from there.
817 std::shared_ptr
<const TableProperties
> table_properties
;
818 Status s
= GetTableProperties(&table_properties
, file_meta
, &fname
);
820 props
->insert({fname
, table_properties
});
829 Status
Version::GetPropertiesOfTablesInRange(
830 const Range
* range
, std::size_t n
, TablePropertiesCollection
* props
) const {
831 for (int level
= 0; level
< storage_info_
.num_non_empty_levels(); level
++) {
832 for (decltype(n
) i
= 0; i
< n
; i
++) {
833 // Convert user_key into a corresponding internal key.
834 InternalKey
k1(range
[i
].start
, kMaxSequenceNumber
, kValueTypeForSeek
);
835 InternalKey
k2(range
[i
].limit
, kMaxSequenceNumber
, kValueTypeForSeek
);
836 std::vector
<FileMetaData
*> files
;
837 storage_info_
.GetOverlappingInputs(level
, &k1
, &k2
, &files
, -1, nullptr,
839 for (const auto& file_meta
: files
) {
841 TableFileName(cfd_
->ioptions()->cf_paths
,
842 file_meta
->fd
.GetNumber(), file_meta
->fd
.GetPathId());
843 if (props
->count(fname
) == 0) {
844 // 1. If the table is already present in table cache, load table
845 // properties from there.
846 std::shared_ptr
<const TableProperties
> table_properties
;
847 Status s
= GetTableProperties(&table_properties
, file_meta
, &fname
);
849 props
->insert({fname
, table_properties
});
861 Status
Version::GetAggregatedTableProperties(
862 std::shared_ptr
<const TableProperties
>* tp
, int level
) {
863 TablePropertiesCollection props
;
866 s
= GetPropertiesOfAllTables(&props
);
868 s
= GetPropertiesOfAllTables(&props
, level
);
874 auto* new_tp
= new TableProperties();
875 for (const auto& item
: props
) {
876 new_tp
->Add(*item
.second
);
882 size_t Version::GetMemoryUsageByTableReaders() {
883 size_t total_usage
= 0;
884 for (auto& file_level
: storage_info_
.level_files_brief_
) {
885 for (size_t i
= 0; i
< file_level
.num_files
; i
++) {
886 total_usage
+= cfd_
->table_cache()->GetMemoryUsageByTableReader(
887 env_options_
, cfd_
->internal_comparator(), file_level
.files
[i
].fd
,
888 mutable_cf_options_
.prefix_extractor
.get());
894 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData
* cf_meta
) {
898 cf_meta
->name
= cfd_
->GetName();
900 cf_meta
->file_count
= 0;
901 cf_meta
->levels
.clear();
903 auto* ioptions
= cfd_
->ioptions();
904 auto* vstorage
= storage_info();
906 for (int level
= 0; level
< cfd_
->NumberLevels(); level
++) {
907 uint64_t level_size
= 0;
908 cf_meta
->file_count
+= vstorage
->LevelFiles(level
).size();
909 std::vector
<SstFileMetaData
> files
;
910 for (const auto& file
: vstorage
->LevelFiles(level
)) {
911 uint32_t path_id
= file
->fd
.GetPathId();
912 std::string file_path
;
913 if (path_id
< ioptions
->cf_paths
.size()) {
914 file_path
= ioptions
->cf_paths
[path_id
].path
;
916 assert(!ioptions
->cf_paths
.empty());
917 file_path
= ioptions
->cf_paths
.back().path
;
919 files
.emplace_back(SstFileMetaData
{
920 MakeTableFileName("", file
->fd
.GetNumber()),
922 static_cast<size_t>(file
->fd
.GetFileSize()),
923 file
->fd
.smallest_seqno
,
924 file
->fd
.largest_seqno
,
925 file
->smallest
.user_key().ToString(),
926 file
->largest
.user_key().ToString(),
927 file
->stats
.num_reads_sampled
.load(std::memory_order_relaxed
),
928 file
->being_compacted
});
929 files
.back().num_entries
= file
->num_entries
;
930 files
.back().num_deletions
= file
->num_deletions
;
931 level_size
+= file
->fd
.GetFileSize();
933 cf_meta
->levels
.emplace_back(
934 level
, level_size
, std::move(files
));
935 cf_meta
->size
+= level_size
;
// Sums fd.GetFileSize() over every SST file in every level of this
// version's storage info.
// NOTE(review): the two closing braces of the loops (original lines
// 944-945, 947) are elided in this extraction.
939 uint64_t Version::GetSstFilesSize() {
940 uint64_t sst_files_size
= 0;
941 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
942 for (const auto& file_meta
: storage_info_
.LevelFiles(level
)) {
943 sst_files_size
+= file_meta
->fd
.GetFileSize();
946 return sst_files_size
;
// Estimates the number of live (non-deleted) keys as
// non_deletions - deletions, scaled up by file_count / num_samples when not
// every file contributed table-property samples.
// NOTE(review): the early-return bodies for the zero-sample and
// deletions>=non-deletions cases and the final return/closing braces
// (original lines 956-958, 960-962, 968-969, 972, 975-979) are elided in
// this extraction.
949 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
950 // Estimation will be inaccurate when:
951 // (1) there exist merge keys
952 // (2) keys are directly overwritten
953 // (3) deletion on non-existing keys
954 // (4) low number of samples
// No samples at all: nothing to extrapolate from.
955 if (current_num_samples_
== 0) {
// More deletions than insertions recorded: the subtraction below would
// underflow (both counters are unsigned), so bail out first.
959 if (current_num_non_deletions_
<= current_num_deletions_
) {
963 uint64_t est
= current_num_non_deletions_
- current_num_deletions_
;
964 uint64_t file_count
= 0;
966 for (int level
= 0; level
< num_levels_
; ++level
) {
967 file_count
+= files_
[level
].size();
// Only a subset of files was sampled: scale the estimate by the ratio of
// total files to sampled files, in double to avoid intermediate overflow.
970 if (current_num_samples_
< file_count
) {
971 // casting to avoid overflowing
973 static_cast<uint64_t>(
974 (est
* static_cast<double>(file_count
) / current_num_samples_
)
// Estimates the compression ratio of one level as
// (raw key bytes + raw value bytes) / on-disk file bytes, summed over the
// level's files. A ratio > 1 means the data shrank on disk.
// NOTE(review): the `int level` parameter line, the zero-size early-return
// body, and closing braces (original lines 982, 989, 991-992, 994) are
// elided in this extraction.
981 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
983 assert(level
< num_levels_
);
984 uint64_t sum_file_size_bytes
= 0;
985 uint64_t sum_data_size_bytes
= 0;
986 for (auto* file_meta
: files_
[level
]) {
987 sum_file_size_bytes
+= file_meta
->fd
.GetFileSize();
988 sum_data_size_bytes
+= file_meta
->raw_key_size
+ file_meta
->raw_value_size
;
// Guard the division below against an empty/zero-sized level.
990 if (sum_file_size_bytes
== 0) {
993 return static_cast<double>(sum_data_size_bytes
) / sum_file_size_bytes
;
996 void Version::AddIterators(const ReadOptions
& read_options
,
997 const EnvOptions
& soptions
,
998 MergeIteratorBuilder
* merge_iter_builder
,
999 RangeDelAggregator
* range_del_agg
) {
1000 assert(storage_info_
.finalized_
);
1002 for (int level
= 0; level
< storage_info_
.num_non_empty_levels(); level
++) {
1003 AddIteratorsForLevel(read_options
, soptions
, merge_iter_builder
, level
,
// Adds the iterator(s) that cover a single level of this Version.
// L0 files may overlap each other, so each L0 file gets its own table
// iterator; levels >= 1 are non-overlapping within the level and are covered
// by a single LevelIterator (placement-new'd into the builder's arena).
// NOTE(review): excerpt is truncated — the `int level` parameter line,
// several closing braces/returns, and the LevelIterator's trailing
// constructor arguments are missing from this view.
void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const EnvOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
                                   RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
  bool should_sample = should_sample_file_read();

  auto* arena = merge_iter_builder->GetArena();

  // Merge all level zero files together since they may overlap
  for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
    const auto& file = storage_info_.LevelFilesBrief(0).files[i];
    merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
        read_options, soptions, cfd_->internal_comparator(),
        *file.file_metadata,
        range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
        cfd_->internal_stats()->GetFileReadHist(0), false, arena,
        false /* skip_filters */, 0 /* level */));
  if (should_sample) {
    // Count ones for every L0 files. This is done per iterator creation
    // rather than Seek(), while files in other levels are recored per seek.
    // If users execute one range query per iterator, there may be some
    // discrepancy here.
    for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
      sample_file_read_inc(meta);
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
// Checks whether the user-key range [smallest_user_key, largest_user_key]
// overlaps live data on `level`, writing the answer to *overlap.
// L0 is handled file-by-file (files may overlap each other); higher levels
// use a single LevelIterator. A ReadRangeDelAggregator is consulted so that
// range tombstones alone can also count as overlap.
// NOTE(review): excerpt is truncated — the `Status status`/`Arena arena`
// declarations, the `if (level == 0)` guard, several closing braces, the
// LevelIterator's trailing constructor arguments, and the return statements
// are missing from this view.
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const EnvOptions& env_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  assert(storage_info_.finalized_);

  auto icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  ReadRangeDelAggregator range_del_agg(&icmp,
                                       kMaxSequenceNumber /* upper_bound */);

  for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
    const auto file = &storage_info_.LevelFilesBrief(0).files[i];
    // Cheap key-range rejection before opening the table.
    if (AfterFile(ucmp, &smallest_user_key, file) ||
        BeforeFile(ucmp, &largest_user_key, file)) {
    ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
        read_options, env_options, cfd_->internal_comparator(),
        *file->file_metadata,
        &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
        cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
        false /* skip_filters */, 0 /* level */));
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
    if (!status.ok() || *overlap) {
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, env_options,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);

  // Point data did not overlap; a range tombstone covering the span still
  // counts as overlap.
  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
// Constructs per-Version storage metadata. When `ref_vstorage` (the previous
// Version's storage info) is provided, the accumulated/current statistics are
// carried over so stats survive across Version transitions.
// NOTE(review): excerpt is truncated — at least one initializer between
// estimated_compaction_needed_bytes_ and force_consistency_checks_ (original
// line 1144) and the constructor's closing braces are missing from this view.
VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
    bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      // One FileMetaData* vector per level, allocated as a raw array.
      files_(new std::vector<FileMetaData*>[num_levels_]),
      // With a single level there is no dynamic base level (-1); otherwise
      // start at 1.
      base_level_(num_levels_ == 1 ? -1 : 1),
      level_multiplier_(0.0),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      force_consistency_checks_(_force_consistency_checks) {
  if (ref_vstorage != nullptr) {
    // Inherit running statistics from the previous Version.
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
// Version constructor. Every cfd_-derived member is guarded with a nullptr
// check because dummy Versions are created with column_family_data == nullptr.
// NOTE(review): excerpt is truncated — the initializer-list colon / first
// initializer (original line 1164), the `storage_info_(` opener, one ternary
// arm, and several trailing initializers (original lines 1182-1185) are
// missing from this view.
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const EnvOptions& env_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      // Arguments below feed the storage_info_ member's constructor.
      (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
      (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
      cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
      cfd_ == nullptr ? kCompactionStyleLevel
                      : cfd_->ioptions()->compaction_style,
      (cfd_ == nullptr || cfd_->current() == nullptr)
          : cfd_->current()->storage_info(),
      cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
      env_options_(env_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}
// Point lookup of `k` across this Version's files, newest level first.
// Files are visited in the order produced by FilePicker; the loop stops when
// GetContext reaches a terminal state (found / deleted / corrupt / blob) or
// a covering range tombstone makes further files irrelevant. If the search
// ends while merge operands are pending, a final TimedFullMerge resolves
// them. On miss, *status is set to NotFound and *key_exists (if provided)
// to false.
// NOTE(review): excerpt is truncated — the trailing `bool* is_blob`
// parameter line, the `FilePicker fp(` opener, various `break`/`return`
// statements and closing braces are missing from this view.
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, Status* status,
                  MergeContext* merge_context,
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());
  if (key_exists != nullptr) {
    // will falsify below if not found
  PinnedIteratorsManager pinned_iters_mgr;
  // kMerge as the starting state means the caller already has pending merge
  // operands (status is MergeInProgress).
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      value, value_found, merge_context, max_covering_tombstone_seq,
      this->env_,
      seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    if (*max_covering_tombstone_seq > 0) {
      // The remaining files we look at will only contain covered keys, so we
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    if (!status->ok()) {
    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                  fp.GetHitFileLevel());
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
    f = fp.GetNextFile();

  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
  if (GetContext::kMerge == get_context.State()) {
    if (!merge_operator_) {
      *status = Status::InvalidArgument(
          "merge_operator is not properly initialized.");
    // merge_operands are in saver and we hit the beginning of the key history
    // do a final merge of nullptr and operands;
    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
    *status = MergeHelper::TimedFullMerge(
        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
        str_value, info_log_, db_statistics_, env_,
        nullptr /* result_operand */, true);
    if (LIKELY(value != nullptr)) {
  if (key_exists != nullptr) {
    *key_exists = false;
  *status = Status::NotFound(); // Use an empty error message for speed
1321 bool Version::IsFilterSkipped(int level
, bool is_file_last_in_level
) {
1322 // Reaching the bottom level implies misses at all upper levels, so we'll
1323 // skip checking the filters when we predict a hit.
1324 return cfd_
->ioptions()->optimize_filters_for_hits
&&
1325 (level
> 0 || is_file_last_in_level
) &&
1326 level
== storage_info_
.num_non_empty_levels() - 1;
1329 void VersionStorageInfo::GenerateLevelFilesBrief() {
1330 level_files_brief_
.resize(num_non_empty_levels_
);
1331 for (int level
= 0; level
< num_non_empty_levels_
; level
++) {
1332 DoGenerateLevelFilesBrief(
1333 &level_files_brief_
[level
], files_
[level
], &arena_
);
// Prepares this Version for installation: refreshes accumulated file stats
// and derives all per-level metadata used by reads and compaction picking.
// The call order matters — e.g. GenerateLevelFilesBrief() sizes its output
// by num_non_empty_levels_, which UpdateNumNonEmptyLevels() computes.
// NOTE(review): the function's closing brace is missing from this excerpt.
void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
  storage_info_.GenerateBottommostFiles();
// Lazily populates a file's entry/deletion/raw-size statistics from its
// table properties, at most once per FileMetaData.
// NOTE(review): excerpt is truncated — the early-return body for already
// initialized files, the `if (!s.ok())` guard around the error log, part of
// the log format string, and the `return true;` tail are missing from this
// view.
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
  // Already initialized (or stats otherwise present) — nothing to do.
  if (file_meta->init_stats_from_file ||
      file_meta->compensated_file_size > 0) {
  std::shared_ptr<const TableProperties> tp;
  Status s = GetTableProperties(&tp, file_meta);
  // Mark as attempted regardless of outcome so we never retry the I/O.
  file_meta->init_stats_from_file = true;
    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
                    "Unable to load table properties for file %" PRIu64
                    file_meta->fd.GetNumber(), s.ToString().c_str());
  if (tp.get() == nullptr) return false;
  file_meta->num_entries = tp->num_entries;
  file_meta->num_deletions = tp->num_deletions;
  file_meta->raw_value_size = tp->raw_value_size;
  file_meta->raw_key_size = tp->raw_key_size;
1374 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData
* file_meta
) {
1375 assert(file_meta
->init_stats_from_file
);
1376 accumulated_file_size_
+= file_meta
->fd
.GetFileSize();
1377 accumulated_raw_key_size_
+= file_meta
->raw_key_size
;
1378 accumulated_raw_value_size_
+= file_meta
->raw_value_size
;
1379 accumulated_num_non_deletions_
+=
1380 file_meta
->num_entries
- file_meta
->num_deletions
;
1381 accumulated_num_deletions_
+= file_meta
->num_deletions
;
1383 current_num_non_deletions_
+=
1384 file_meta
->num_entries
- file_meta
->num_deletions
;
1385 current_num_deletions_
+= file_meta
->num_deletions
;
1386 current_num_samples_
++;
1389 void VersionStorageInfo::RemoveCurrentStats(FileMetaData
* file_meta
) {
1390 if (file_meta
->init_stats_from_file
) {
1391 current_num_non_deletions_
-=
1392 file_meta
->num_entries
- file_meta
->num_deletions
;
1393 current_num_deletions_
-= file_meta
->num_deletions
;
1394 current_num_samples_
--;
// Refreshes this Version's accumulated statistics, loading table properties
// for at most kMaxInitCount not-yet-initialized files (lowest levels first),
// then recomputes compensated file sizes.
// NOTE(review): excerpt is truncated — the `if (update_stats)` guard, the
// outer for-loop opener (original line 1413), several braces/breaks, and the
// second loop's decrement clause are missing from this view.
void Version::UpdateAccumulatedStats(bool update_stats) {
  // maximum number of table properties loaded from files.
  const int kMaxInitCount = 20;
  // here only the first kMaxInitCount files which haven't been
  // initialized from file will be updated with num_deletions.
  // The motivation here is to cap the maximum I/O per Version creation.
  // The reason for choosing files from lower-level instead of higher-level
  // is that such design is able to propagate the initialization from
  // lower-level to higher-level: When the num_deletions of lower-level
  // files are updated, it will make the lower-level files have accurate
  // compensated_file_size, making lower-level to higher-level compaction
  // will be triggered, which creates higher-level files whose num_deletions
  // will be updated here.
      level < storage_info_.num_levels_ && init_count < kMaxInitCount;
    for (auto* file_meta : storage_info_.files_[level]) {
      if (MaybeInitializeFileMetaData(file_meta)) {
        // each FileMeta will be initialized only once.
        storage_info_.UpdateAccumulatedStats(file_meta);
        // when option "max_open_files" is -1, all the file metadata has
        // already been read, so MaybeInitializeFileMetaData() won't incur
        // any I/O cost. "max_open_files=-1" means that the table cache passed
        // to the VersionSet and then to the ColumnFamilySet has a size of
        // TableCache::kInfiniteCapacity
        if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
            TableCache::kInfiniteCapacity) {
        if (++init_count >= kMaxInitCount) {
  // In case all sampled-files contain only deletion entries, then we
  // load the table-property of a file in higher-level to initialize
  for (int level = storage_info_.num_levels_ - 1;
       storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
    for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
      if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
        storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
  storage_info_.ComputeCompensatedSizes();
1453 void VersionStorageInfo::ComputeCompensatedSizes() {
1454 static const int kDeletionWeightOnCompaction
= 2;
1455 uint64_t average_value_size
= GetAverageValueSize();
1457 // compute the compensated size
1458 for (int level
= 0; level
< num_levels_
; level
++) {
1459 for (auto* file_meta
: files_
[level
]) {
1460 // Here we only compute compensated_file_size for those file_meta
1461 // which compensated_file_size is uninitialized (== 0). This is true only
1462 // for files that have been created right now and no other thread has
1463 // access to them. That's why we can safely mutate compensated_file_size.
1464 if (file_meta
->compensated_file_size
== 0) {
1465 file_meta
->compensated_file_size
= file_meta
->fd
.GetFileSize();
1466 // Here we only boost the size of deletion entries of a file only
1467 // when the number of deletion entries is greater than the number of
1468 // non-deletion entries in the file. The motivation here is that in
1469 // a stable workload, the number of deletion entries should be roughly
1470 // equal to the number of non-deletion entries. If we compensate the
1471 // size of deletion entries in a stable workload, the deletion
1472 // compensation logic might introduce unwanted effet which changes the
1473 // shape of LSM tree.
1474 if (file_meta
->num_deletions
* 2 >= file_meta
->num_entries
) {
1475 file_meta
->compensated_file_size
+=
1476 (file_meta
->num_deletions
* 2 - file_meta
->num_entries
) *
1477 average_value_size
* kDeletionWeightOnCompaction
;
// Highest level usable as a compaction input. For level compaction the last
// level (num_levels() - 1) is excluded, hence num_levels() - 2.
// NOTE(review): excerpt is truncated — the return path for non-level
// compaction styles (original lines 1487+) is missing from this view.
int VersionStorageInfo::MaxInputLevel() const {
  if (compaction_style_ == kCompactionStyleLevel) {
    return num_levels() - 2;
1491 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind
) const {
1492 if (allow_ingest_behind
) {
1493 assert(num_levels() > 1);
1494 return num_levels() - 2;
1496 return num_levels() - 1;
// Estimates the total bytes that pending compactions would need to rewrite,
// cascading the overflow from each level into the next (level compaction
// only; other styles report 0).
// NOTE(review): excerpt is truncated — the early return after zeroing for
// non-level styles, several closing braces, and the tail of the fan-out
// expression (original lines 1578+) are missing from this view.
void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
    estimated_compaction_needed_bytes_ = 0;
  // Start from Level 0, if level 0 qualifies compaction to level 1,
  // we estimate the size of compaction.
  // Then we move on to the next level and see whether it qualifies compaction
  // to the next level. The size of the level is estimated as the actual size
  // on the level plus the input bytes from the previous level if there is any.
  // If it exceeds, take the exceeded bytes as compaction input and add the size
  // of the compaction size to tatal size.
  // We keep doing it to Level 2, 3, etc, until the last level and return the
  // accumulated bytes.

  uint64_t bytes_compact_to_next_level = 0;
  uint64_t level_size = 0;
  for (auto* f : files_[0]) {
    level_size += f->fd.GetFileSize();
  // L0 triggers either on file count or on total size.
  bool level0_compact_triggered = false;
  if (static_cast<int>(files_[0].size()) >=
          mutable_cf_options.level0_file_num_compaction_trigger ||
      level_size >= mutable_cf_options.max_bytes_for_level_base) {
    level0_compact_triggered = true;
    estimated_compaction_needed_bytes_ = level_size;
    bytes_compact_to_next_level = level_size;
    estimated_compaction_needed_bytes_ = 0;
  uint64_t bytes_next_level = 0;
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
    if (bytes_next_level > 0) {
      // Reuse the size computed while estimating the previous level's
      // fan-out (debug-verified below).
      uint64_t level_size2 = 0;
      for (auto* f : files_[level]) {
        level_size2 += f->fd.GetFileSize();
      assert(level_size2 == bytes_next_level);
      level_size = bytes_next_level;
      bytes_next_level = 0;
      for (auto* f : files_[level]) {
        level_size += f->fd.GetFileSize();
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
    uint64_t level_target = MaxBytesForLevel(level);
    if (level_size > level_target) {
      bytes_compact_to_next_level = level_size - level_target;
      // Estimate the actual compaction fan-out ratio as size ratio between
      assert(bytes_next_level == 0);
      if (level + 1 < num_levels_) {
        for (auto* f : files_[level + 1]) {
          bytes_next_level += f->fd.GetFileSize();
      if (bytes_next_level > 0) {
        assert(level_size > 0);
        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
            static_cast<double>(bytes_compact_to_next_level) *
            (static_cast<double>(bytes_next_level) /
                 static_cast<double>(level_size) +
// Counts files in `files` whose table-property creation_time is older than
// the configured TTL. Only files not being compacted and with a cached
// table_reader (so the properties are available without I/O) are considered.
// NOTE(review): excerpt is truncated — the guard around the GetCurrentTime
// status (original line 1592) and several closing braces are missing from
// this view; the handling of a failed clock read is not visible.
uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
                                 const MutableCFOptions& mutable_cf_options,
                                 const std::vector<FileMetaData*>& files) {
  uint32_t ttl_expired_files_count = 0;

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  const uint64_t current_time = static_cast<uint64_t>(_current_time);
  for (auto f : files) {
    if (!f->being_compacted && f->fd.table_reader != nullptr &&
        f->fd.table_reader->GetTableProperties() != nullptr) {
      auto creation_time =
          f->fd.table_reader->GetTableProperties()->creation_time;
      // creation_time == 0 means "unknown" and never expires.
      if (creation_time > 0 &&
          creation_time < (current_time - mutable_cf_options.ttl)) {
        ttl_expired_files_count++;
  return ttl_expired_files_count;
1608 } // anonymous namespace
// Computes a compaction score per level, sorts levels by score (highest
// first) into compaction_score_/compaction_level_, then refreshes the
// derived marked-for-compaction sets and the compaction-bytes estimate.
// L0's score is based on sorted-run count (and size, for level compaction);
// other levels use bytes-not-being-compacted relative to the level target.
// NOTE(review): excerpt is truncated — the `double score;` declaration, the
// `if (level == 0)` split, `num_sorted_runs++` increments, std::max calls
// around the FIFO/TTL/size clamps, and various braces are missing from this
// view.
void VersionStorageInfo::ComputeCompactionScore(
    const ImmutableCFOptions& immutable_cf_options,
    const MutableCFOptions& mutable_cf_options) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
      if (compaction_style_ == kCompactionStyleFIFO) {
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
            static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger,
        if (mutable_cf_options.ttl > 0) {
            static_cast<double>(GetExpiredTtlFilesCount(
                immutable_cf_options, mutable_cf_options, files_[level])),
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          // Level-based involves L0->L0 compactions that can lead to oversized
          // L0 files. Take into account size as well to avoid later giant
          // compactions to the base level.
            score, static_cast<double>(total_size) /
                       mutable_cf_options.max_bytes_for_level_base);
      // Compute the ratio of current size to size limit.
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);
    compaction_level_[level] = level;
    compaction_score_[level] = score;

  // sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries are small.
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
  ComputeFilesMarkedForCompaction();
  ComputeBottommostFilesMarkedForCompaction();
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
  EstimateCompactionBytesNeeded(mutable_cf_options);
// Rebuilds files_marked_for_compaction_ from the per-file
// marked_for_compaction flags, excluding the deepest level that holds data
// (moving such a file would gain nothing).
// NOTE(review): excerpt is truncated — the `break` after finding the last
// non-empty level and several closing braces are missing from this view.
void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
  files_marked_for_compaction_.clear();
  int last_qualify_level = 0;

  // Do not include files from the last level with data
  // If table properties collector suggests a file on the last level,
  // we should not move it to a new level.
  for (int level = num_levels() - 1; level >= 1; level--) {
    if (!files_[level].empty()) {
      last_qualify_level = level - 1;
  for (int level = 0; level <= last_qualify_level; level++) {
    for (auto* f : files_[level]) {
      if (!f->being_compacted && f->marked_for_compaction) {
        files_marked_for_compaction_.emplace_back(level, f);
// Rebuilds expired_ttl_files_: files (outside the last level) whose
// creation_time is older than `ttl` seconds before now. Mirrors the
// predicate used by GetExpiredTtlFilesCount().
// NOTE(review): excerpt is truncated — an assert between the signature and
// the clear(), the guard on the GetCurrentTime status, and closing braces
// are missing from this view.
void VersionStorageInfo::ComputeExpiredTtlFiles(
    const ImmutableCFOptions& ioptions, const uint64_t ttl) {
  expired_ttl_files_.clear();

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  for (int level = 0; level < num_levels() - 1; level++) {
    for (auto f : files_[level]) {
      if (!f->being_compacted && f->fd.table_reader != nullptr &&
          f->fd.table_reader->GetTableProperties() != nullptr) {
        auto creation_time =
            f->fd.table_reader->GetTableProperties()->creation_time;
        // creation_time == 0 means "unknown" and never expires.
        if (creation_time > 0 && creation_time < (current_time - ttl)) {
          expired_ttl_files_.emplace_back(level, f);
1763 // used to sort files by size
1769 // Compator that is used to sort files based on their size
1770 // In normal mode: descending size
1771 bool CompareCompensatedSizeDescending(const Fsize
& first
, const Fsize
& second
) {
1772 return (first
.file
->compensated_file_size
>
1773 second
.file
->compensated_file_size
);
1775 } // anonymous namespace
// Appends `f` to `level`'s file list. For levels > 0 files must arrive in
// sorted, non-overlapping order; a violation is logged via Error().
// NOTE(review): excerpt is truncated — lines between the signature and the
// overlap check (original 1779-1780), the block following the Error() call
// (original 1793-1800, likely assertion/abort handling), and closing braces
// are missing from this view.
void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
  auto* level_files = &files_[level];
  // The previous file's largest key must sort strictly before the new
  // file's smallest key on sorted (non-L0) levels.
  if (level > 0 && !level_files->empty() &&
      internal_comparator_->Compare(
          (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
    auto* f2 = (*level_files)[level_files->size() - 1];
    if (info_log != nullptr) {
      Error(info_log, "Adding new file %" PRIu64
            " range (%s, %s) to level %d but overlapping "
            "with existing file %" PRIu64 " %s %s",
            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
            f2->smallest.DebugString(true).c_str(),
            f2->largest.DebugString(true).c_str());
  level_files->push_back(f);
1804 // Version::PrepareApply() need to be called before calling the function, or
1805 // following functions called:
1806 // 1. UpdateNumNonEmptyLevels();
1807 // 2. CalculateBaseBytes();
1808 // 3. UpdateFilesByCompactionPri();
1809 // 4. GenerateFileIndexer();
1810 // 5. GenerateLevelFilesBrief();
1811 // 6. GenerateLevel0NonOverlapping();
1812 // 7. GenerateBottommostFiles();
// Marks this storage info finalized and (in debug builds) sanity-checks the
// derived metadata: base level bounds, empty levels below base, monotonic
// level targets, and agreement between files_ and level_files_brief_.
// NOTE(review): excerpt is truncated — the `finalized_ = true;` assignment
// and the NDEBUG guards (original lines 1814-1815), an early return for
// non-level styles, `continue` statements, and several braces are missing
// from this view.
void VersionStorageInfo::SetFinalized() {
  if (compaction_style_ != kCompactionStyleLevel) {
    // Not level based compaction.
  assert(base_level_ < 0 || num_levels() == 1 ||
         (base_level_ >= 1 && base_level_ < num_levels()));
  // Verify all levels newer than base_level are empty except L0
  for (int level = 1; level < base_level(); level++) {
    assert(NumLevelBytes(level) == 0);
  uint64_t max_bytes_prev_level = 0;
  for (int level = base_level(); level < num_levels() - 1; level++) {
    if (LevelFiles(level).size() == 0) {
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
    max_bytes_prev_level = MaxBytesForLevel(level);
  int num_empty_non_l0_level = 0;
  for (int level = 0; level < num_levels(); level++) {
    assert(LevelFiles(level).size() == 0 ||
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
    // NOTE(review): despite its name, this counter increments for levels
    // with data (NumLevelBytes > 0) — verify intent against upstream.
    if (level > 0 && NumLevelBytes(level) > 0) {
      num_empty_non_l0_level++;
    if (LevelFiles(level).size() > 0) {
      assert(level < num_non_empty_levels());
  assert(compaction_level_.size() > 0);
  assert(compaction_level_.size() == compaction_score_.size());
// Recomputes num_non_empty_levels_ by scanning from the deepest level
// upward: it ends up as one past the deepest level that holds files.
// NOTE(review): excerpt is truncated — the loop's early-exit once a
// non-empty level is found (original lines 1854-1855) and closing braces
// are missing from this view.
void VersionStorageInfo::UpdateNumNonEmptyLevels() {
  num_non_empty_levels_ = num_levels_;
  for (int i = num_levels_ - 1; i >= 0; i--) {
    if (files_[i].size() != 0) {
      num_non_empty_levels_ = i;
// Sort `temp` based on ratio of overlapping size over file size
// For each file, walks the (sorted) next-level files once with a shared
// iterator to accumulate the bytes overlapping the file's key range, then
// sorts `temp` ascending by overlapping_bytes * 1024 / compensated size.
// NOTE(review): excerpt is truncated — the ++next_level_it advances inside
// both while-loops, the break/bookkeeping after detecting a boundary-crossing
// file (original lines 1884-1888), and closing braces are missing from this
// view.
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
    const std::vector<FileMetaData*>& next_level_files,
    std::vector<Fsize>* temp) {
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in next level that is smaller than current file
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
      overlapping_bytes += (*next_level_it)->fd.file_size;
      if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
        // next level file cross large boundary of current file.
    assert(file->compensated_file_size != 0);
    // Scale by 1024 to keep precision in the integer ratio.
    file_to_order[file->fd.GetNumber()] =
        overlapping_bytes * 1024u / file->compensated_file_size;
  std::sort(temp->begin(), temp->end(),
            [&](const Fsize& f1, const Fsize& f2) -> bool {
              return file_to_order[f1.file->fd.GetNumber()] <
                     file_to_order[f2.file->fd.GetNumber()];
// Rebuilds files_by_compaction_pri_ for every level except the last: indices
// into files_[level] ordered by the chosen CompactionPri policy. Styles that
// do not pick files this way (none/FIFO/universal) skip the work entirely.
// NOTE(review): excerpt is truncated — the early return for the skipped
// styles, the `temp[i].index = ...` assignment (original line 1919), the
// `num = temp.size()` clamp body, `break`s between switch cases, the default
// case, and closing braces are missing from this view.
void VersionStorageInfo::UpdateFilesByCompactionPri(
    CompactionPri compaction_pri) {
  if (compaction_style_ == kCompactionStyleNone ||
      compaction_style_ == kCompactionStyleFIFO ||
      compaction_style_ == kCompactionStyleUniversal) {
  // No need to sort the highest level because it is never compacted.
  for (int level = 0; level < num_levels() - 1; level++) {
    const std::vector<FileMetaData*>& files = files_[level];
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
    assert(files_by_compaction_pri.size() == 0);

    // populate a temp vector for sorting based on size
    std::vector<Fsize> temp(files.size());
    for (size_t i = 0; i < files.size(); i++) {
      temp[i].file = files[i];
    // sort the top number_of_files_to_sort_ based on file size
    size_t num = VersionStorageInfo::kNumberFilesToSort;
    if (num > temp.size()) {
    switch (compaction_pri) {
      case kByCompensatedSize:
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
                          CompareCompensatedSizeDescending);
      case kOldestLargestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.largest_seqno <
                           f2.file->fd.largest_seqno;
      case kOldestSmallestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.smallest_seqno <
                           f2.file->fd.smallest_seqno;
      case kMinOverlappingRatio:
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
                                   files_[level + 1], &temp);
    assert(temp.size() == files.size());

    // initialize files_by_compaction_pri_
    for (size_t i = 0; i < temp.size(); i++) {
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
    next_file_to_compact_by_size_[level] = 0;
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
1965 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
1966 assert(!finalized_
);
1967 level0_non_overlapping_
= true;
1968 if (level_files_brief_
.size() == 0) {
1972 // A copy of L0 files sorted by smallest key
1973 std::vector
<FdWithKeyRange
> level0_sorted_file(
1974 level_files_brief_
[0].files
,
1975 level_files_brief_
[0].files
+ level_files_brief_
[0].num_files
);
1976 std::sort(level0_sorted_file
.begin(), level0_sorted_file
.end(),
1977 [this](const FdWithKeyRange
& f1
, const FdWithKeyRange
& f2
) -> bool {
1978 return (internal_comparator_
->Compare(f1
.smallest_key
,
1979 f2
.smallest_key
) < 0);
1982 for (size_t i
= 1; i
< level0_sorted_file
.size(); ++i
) {
1983 FdWithKeyRange
& f
= level0_sorted_file
[i
];
1984 FdWithKeyRange
& prev
= level0_sorted_file
[i
- 1];
1985 if (internal_comparator_
->Compare(prev
.largest_key
, f
.smallest_key
) >= 0) {
1986 level0_non_overlapping_
= false;
1992 void VersionStorageInfo::GenerateBottommostFiles() {
1993 assert(!finalized_
);
1994 assert(bottommost_files_
.empty());
1995 for (size_t level
= 0; level
< level_files_brief_
.size(); ++level
) {
1996 for (size_t file_idx
= 0; file_idx
< level_files_brief_
[level
].num_files
;
1998 const FdWithKeyRange
& f
= level_files_brief_
[level
].files
[file_idx
];
2001 l0_file_idx
= static_cast<int>(file_idx
);
2005 Slice smallest_user_key
= ExtractUserKey(f
.smallest_key
);
2006 Slice largest_user_key
= ExtractUserKey(f
.largest_key
);
2007 if (!RangeMightExistAfterSortedRun(smallest_user_key
, largest_user_key
,
2008 static_cast<int>(level
),
2010 bottommost_files_
.emplace_back(static_cast<int>(level
),
2017 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum
) {
2018 assert(seqnum
>= oldest_snapshot_seqnum_
);
2019 oldest_snapshot_seqnum_
= seqnum
;
2020 if (oldest_snapshot_seqnum_
> bottommost_files_mark_threshold_
) {
2021 ComputeBottommostFilesMarkedForCompaction();
2025 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
2026 bottommost_files_marked_for_compaction_
.clear();
2027 bottommost_files_mark_threshold_
= kMaxSequenceNumber
;
2028 for (auto& level_and_file
: bottommost_files_
) {
2029 if (!level_and_file
.second
->being_compacted
&&
2030 level_and_file
.second
->fd
.largest_seqno
!= 0 &&
2031 level_and_file
.second
->num_deletions
> 1) {
2032 // largest_seqno might be nonzero due to containing the final key in an
2033 // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
2034 // ensures the file really contains deleted or overwritten keys.
2035 if (level_and_file
.second
->fd
.largest_seqno
< oldest_snapshot_seqnum_
) {
2036 bottommost_files_marked_for_compaction_
.push_back(level_and_file
);
2038 bottommost_files_mark_threshold_
=
2039 std::min(bottommost_files_mark_threshold_
,
2040 level_and_file
.second
->fd
.largest_seqno
);
2046 void Version::Ref() {
2050 bool Version::Unref() {
2060 bool VersionStorageInfo::OverlapInLevel(int level
,
2061 const Slice
* smallest_user_key
,
2062 const Slice
* largest_user_key
) {
2063 if (level
>= num_non_empty_levels_
) {
2064 // empty level, no overlap
2067 return SomeFileOverlapsRange(*internal_comparator_
, (level
> 0),
2068 level_files_brief_
[level
], smallest_user_key
,
2072 // Store in "*inputs" all files in "level" that overlap [begin,end]
2073 // If hint_index is specified, then it points to a file in the
2074 // overlapping range.
2075 // The file_index returns a pointer to any file in an overlapping range.
2076 void VersionStorageInfo::GetOverlappingInputs(
2077 int level
, const InternalKey
* begin
, const InternalKey
* end
,
2078 std::vector
<FileMetaData
*>* inputs
, int hint_index
, int* file_index
,
2079 bool expand_range
, InternalKey
** next_smallest
) const {
2080 if (level
>= num_non_empty_levels_
) {
2081 // this level is empty, no overlapping inputs
2089 const Comparator
* user_cmp
= user_comparator_
;
2091 GetOverlappingInputsRangeBinarySearch(level
, begin
, end
, inputs
, hint_index
,
2092 file_index
, false, next_smallest
);
2096 if (next_smallest
) {
2097 // next_smallest key only makes sense for non-level 0, where files are
2099 *next_smallest
= nullptr;
2102 Slice user_begin
, user_end
;
2103 if (begin
!= nullptr) {
2104 user_begin
= begin
->user_key();
2106 if (end
!= nullptr) {
2107 user_end
= end
->user_key();
2110 // index stores the file index need to check.
2111 std::list
<size_t> index
;
2112 for (size_t i
= 0; i
< level_files_brief_
[level
].num_files
; i
++) {
2113 index
.emplace_back(i
);
2116 while (!index
.empty()) {
2117 bool found_overlapping_file
= false;
2118 auto iter
= index
.begin();
2119 while (iter
!= index
.end()) {
2120 FdWithKeyRange
* f
= &(level_files_brief_
[level
].files
[*iter
]);
2121 const Slice file_start
= ExtractUserKey(f
->smallest_key
);
2122 const Slice file_limit
= ExtractUserKey(f
->largest_key
);
2123 if (begin
!= nullptr && user_cmp
->Compare(file_limit
, user_begin
) < 0) {
2124 // "f" is completely before specified range; skip it
2126 } else if (end
!= nullptr &&
2127 user_cmp
->Compare(file_start
, user_end
) > 0) {
2128 // "f" is completely after specified range; skip it
2132 inputs
->emplace_back(files_
[level
][*iter
]);
2133 found_overlapping_file
= true;
2134 // record the first file index.
2135 if (file_index
&& *file_index
== -1) {
2136 *file_index
= static_cast<int>(*iter
);
2138 // the related file is overlap, erase to avoid checking again.
2139 iter
= index
.erase(iter
);
2141 if (begin
!= nullptr &&
2142 user_cmp
->Compare(file_start
, user_begin
) < 0) {
2143 user_begin
= file_start
;
2145 if (end
!= nullptr && user_cmp
->Compare(file_limit
, user_end
) > 0) {
2146 user_end
= file_limit
;
2151 // if all the files left are not overlap, break
2152 if (!found_overlapping_file
) {
2158 // Store in "*inputs" files in "level" that within range [begin,end]
2159 // Guarantee a "clean cut" boundary between the files in inputs
2160 // and the surrounding files and the maxinum number of files.
2161 // This will ensure that no parts of a key are lost during compaction.
2162 // If hint_index is specified, then it points to a file in the range.
2163 // The file_index returns a pointer to any file in an overlapping range.
2164 void VersionStorageInfo::GetCleanInputsWithinInterval(
2165 int level
, const InternalKey
* begin
, const InternalKey
* end
,
2166 std::vector
<FileMetaData
*>* inputs
, int hint_index
, int* file_index
) const {
2171 if (level
>= num_non_empty_levels_
|| level
== 0 ||
2172 level_files_brief_
[level
].num_files
== 0) {
2173 // this level is empty, no inputs within range
2174 // also don't support clean input interval within L0
2178 const auto& level_files
= level_files_brief_
[level
];
2179 if (begin
== nullptr) {
2180 begin
= &level_files
.files
[0].file_metadata
->smallest
;
2182 if (end
== nullptr) {
2183 end
= &level_files
.files
[level_files
.num_files
- 1].file_metadata
->largest
;
2186 GetOverlappingInputsRangeBinarySearch(level
, begin
, end
, inputs
,
2187 hint_index
, file_index
,
2188 true /* within_interval */);
2191 // Store in "*inputs" all files in "level" that overlap [begin,end]
2192 // Employ binary search to find at least one file that overlaps the
2193 // specified range. From that file, iterate backwards and
2194 // forwards to find all overlapping files.
2195 // if within_range is set, then only store the maximum clean inputs
2196 // within range [begin, end]. "clean" means there is a boudnary
2197 // between the files in "*inputs" and the surrounding files
2198 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
2199 int level
, const InternalKey
* begin
, const InternalKey
* end
,
2200 std::vector
<FileMetaData
*>* inputs
, int hint_index
, int* file_index
,
2201 bool within_interval
, InternalKey
** next_smallest
) const {
2205 int max
= static_cast<int>(files_
[level
].size()) - 1;
2206 bool foundOverlap
= false;
2207 auto user_cmp
= user_comparator_
;
2209 // if the caller already knows the index of a file that has overlap,
2210 // then we can skip the binary search.
2211 if (hint_index
!= -1) {
2213 foundOverlap
= true;
2216 while (!foundOverlap
&& min
<= max
) {
2217 mid
= (min
+ max
)/2;
2218 FdWithKeyRange
* f
= &(level_files_brief_
[level
].files
[mid
]);
2219 auto& smallest
= f
->file_metadata
->smallest
;
2220 auto& largest
= f
->file_metadata
->largest
;
2221 if ((!within_interval
&& sstableKeyCompare(user_cmp
, begin
, largest
) > 0) ||
2222 (within_interval
&& sstableKeyCompare(user_cmp
, begin
, smallest
) > 0)) {
2224 } else if ((!within_interval
&&
2225 sstableKeyCompare(user_cmp
, smallest
, end
) > 0) ||
2227 sstableKeyCompare(user_cmp
, largest
, end
) > 0)) {
2230 foundOverlap
= true;
2235 // If there were no overlapping files, return immediately.
2236 if (!foundOverlap
) {
2237 if (next_smallest
) {
2238 *next_smallest
= nullptr;
2242 // returns the index where an overlap is found
2247 int start_index
, end_index
;
2248 if (within_interval
) {
2249 ExtendFileRangeWithinInterval(level
, begin
, end
, mid
,
2250 &start_index
, &end_index
);
2252 ExtendFileRangeOverlappingInterval(level
, begin
, end
, mid
,
2253 &start_index
, &end_index
);
2254 assert(end_index
>= start_index
);
2256 // insert overlapping files into vector
2257 for (int i
= start_index
; i
<= end_index
; i
++) {
2258 inputs
->push_back(files_
[level
][i
]);
2261 if (next_smallest
!= nullptr) {
2262 // Provide the next key outside the range covered by inputs
2263 if (++end_index
< static_cast<int>(files_
[level
].size())) {
2264 **next_smallest
= files_
[level
][end_index
]->smallest
;
2266 *next_smallest
= nullptr;
2271 // Store in *start_index and *end_index the range of all files in
2272 // "level" that overlap [begin,end]
2273 // The mid_index specifies the index of at least one file that
2274 // overlaps the specified range. From that file, iterate backward
2275 // and forward to find all overlapping files.
2276 // Use FileLevel in searching, make it faster
2277 void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
2278 int level
, const InternalKey
* begin
, const InternalKey
* end
,
2279 unsigned int mid_index
, int* start_index
, int* end_index
) const {
2280 auto user_cmp
= user_comparator_
;
2281 const FdWithKeyRange
* files
= level_files_brief_
[level
].files
;
2284 // assert that the file at mid_index overlaps with the range
2285 assert(mid_index
< level_files_brief_
[level
].num_files
);
2286 const FdWithKeyRange
* f
= &files
[mid_index
];
2287 auto& smallest
= f
->file_metadata
->smallest
;
2288 auto& largest
= f
->file_metadata
->largest
;
2289 if (sstableKeyCompare(user_cmp
, begin
, smallest
) <= 0) {
2290 assert(sstableKeyCompare(user_cmp
, smallest
, end
) <= 0);
2292 // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
2293 // begin ? begin->DebugString().c_str() : "(null)",
2294 // end ? end->DebugString().c_str() : "(null)",
2295 // smallest->DebugString().c_str(),
2296 // largest->DebugString().c_str(),
2297 // sstableKeyCompare(user_cmp, smallest, begin),
2298 // sstableKeyCompare(user_cmp, largest, begin));
2299 assert(sstableKeyCompare(user_cmp
, begin
, largest
) <= 0);
2303 *start_index
= mid_index
+ 1;
2304 *end_index
= mid_index
;
2305 int count
__attribute__((__unused__
));
2308 // check backwards from 'mid' to lower indices
2309 for (int i
= mid_index
; i
>= 0 ; i
--) {
2310 const FdWithKeyRange
* f
= &files
[i
];
2311 auto& largest
= f
->file_metadata
->largest
;
2312 if (sstableKeyCompare(user_cmp
, begin
, largest
) <= 0) {
2314 assert((count
++, true));
2319 // check forward from 'mid+1' to higher indices
2320 for (unsigned int i
= mid_index
+1;
2321 i
< level_files_brief_
[level
].num_files
; i
++) {
2322 const FdWithKeyRange
* f
= &files
[i
];
2323 auto& smallest
= f
->file_metadata
->smallest
;
2324 if (sstableKeyCompare(user_cmp
, smallest
, end
) <= 0) {
2325 assert((count
++, true));
2331 assert(count
== *end_index
- *start_index
+ 1);
2334 // Store in *start_index and *end_index the clean range of all files in
2335 // "level" within [begin,end]
2336 // The mid_index specifies the index of at least one file within
2337 // the specified range. From that file, iterate backward
2338 // and forward to find all overlapping files and then "shrink" to
2339 // the clean range required.
2340 // Use FileLevel in searching, make it faster
2341 void VersionStorageInfo::ExtendFileRangeWithinInterval(
2342 int level
, const InternalKey
* begin
, const InternalKey
* end
,
2343 unsigned int mid_index
, int* start_index
, int* end_index
) const {
2345 auto* user_cmp
= user_comparator_
;
2346 const FdWithKeyRange
* files
= level_files_brief_
[level
].files
;
2349 // assert that the file at mid_index is within the range
2350 assert(mid_index
< level_files_brief_
[level
].num_files
);
2351 const FdWithKeyRange
* f
= &files
[mid_index
];
2352 auto& smallest
= f
->file_metadata
->smallest
;
2353 auto& largest
= f
->file_metadata
->largest
;
2354 assert(sstableKeyCompare(user_cmp
, begin
, smallest
) <= 0 &&
2355 sstableKeyCompare(user_cmp
, largest
, end
) <= 0);
2358 ExtendFileRangeOverlappingInterval(level
, begin
, end
, mid_index
,
2359 start_index
, end_index
);
2360 int left
= *start_index
;
2361 int right
= *end_index
;
2362 // shrink from left to right
2363 while (left
<= right
) {
2364 auto& smallest
= files
[left
].file_metadata
->smallest
;
2365 if (sstableKeyCompare(user_cmp
, begin
, smallest
) > 0) {
2369 if (left
> 0) { // If not first file
2370 auto& largest
= files
[left
- 1].file_metadata
->largest
;
2371 if (sstableKeyCompare(user_cmp
, smallest
, largest
) == 0) {
2378 // shrink from right to left
2379 while (left
<= right
) {
2380 auto& largest
= files
[right
].file_metadata
->largest
;
2381 if (sstableKeyCompare(user_cmp
, largest
, end
) > 0) {
2385 if (right
< static_cast<int>(level_files_brief_
[level
].num_files
) -
2386 1) { // If not the last file
2387 auto& smallest
= files
[right
+ 1].file_metadata
->smallest
;
2388 if (sstableKeyCompare(user_cmp
, smallest
, largest
) == 0) {
2389 // The last user key in range overlaps with the next file's first key
2397 *start_index
= left
;
2401 uint64_t VersionStorageInfo::NumLevelBytes(int level
) const {
2403 assert(level
< num_levels());
2404 return TotalFileSize(files_
[level
]);
2407 const char* VersionStorageInfo::LevelSummary(
2408 LevelSummaryStorage
* scratch
) const {
2410 if (compaction_style_
== kCompactionStyleLevel
&& num_levels() > 1) {
2411 assert(base_level_
< static_cast<int>(level_max_bytes_
.size()));
2412 if (level_multiplier_
!= 0.0) {
2414 scratch
->buffer
, sizeof(scratch
->buffer
),
2415 "base level %d level multiplier %.2f max bytes base %" PRIu64
" ",
2416 base_level_
, level_multiplier_
, level_max_bytes_
[base_level_
]);
2420 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, "files[");
2421 for (int i
= 0; i
< num_levels(); i
++) {
2422 int sz
= sizeof(scratch
->buffer
) - len
;
2423 int ret
= snprintf(scratch
->buffer
+ len
, sz
, "%d ", int(files_
[i
].size()));
2424 if (ret
< 0 || ret
>= sz
) break;
2428 // overwrite the last space
2431 len
+= snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
2432 "] max score %.2f", compaction_score_
[0]);
2434 if (!files_marked_for_compaction_
.empty()) {
2435 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
2436 " (%" ROCKSDB_PRIszt
" files need compaction)",
2437 files_marked_for_compaction_
.size());
2440 return scratch
->buffer
;
2443 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage
* scratch
,
2445 int len
= snprintf(scratch
->buffer
, sizeof(scratch
->buffer
), "files_size[");
2446 for (const auto& f
: files_
[level
]) {
2447 int sz
= sizeof(scratch
->buffer
) - len
;
2449 AppendHumanBytes(f
->fd
.GetFileSize(), sztxt
, sizeof(sztxt
));
2450 int ret
= snprintf(scratch
->buffer
+ len
, sz
,
2451 "#%" PRIu64
"(seq=%" PRIu64
",sz=%s,%d) ",
2452 f
->fd
.GetNumber(), f
->fd
.smallest_seqno
, sztxt
,
2453 static_cast<int>(f
->being_compacted
));
2454 if (ret
< 0 || ret
>= sz
)
2458 // overwrite the last space (only if files_[level].size() is non-zero)
2459 if (files_
[level
].size() && len
> 0) {
2462 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, "]");
2463 return scratch
->buffer
;
2466 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
2467 uint64_t result
= 0;
2468 std::vector
<FileMetaData
*> overlaps
;
2469 for (int level
= 1; level
< num_levels() - 1; level
++) {
2470 for (const auto& f
: files_
[level
]) {
2471 GetOverlappingInputs(level
+ 1, &f
->smallest
, &f
->largest
, &overlaps
);
2472 const uint64_t sum
= TotalFileSize(overlaps
);
2481 uint64_t VersionStorageInfo::MaxBytesForLevel(int level
) const {
2482 // Note: the result for level zero is not really used since we set
2483 // the level-0 compaction threshold based on number of files.
2485 assert(level
< static_cast<int>(level_max_bytes_
.size()));
2486 return level_max_bytes_
[level
];
2489 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions
& ioptions
,
2490 const MutableCFOptions
& options
) {
2491 // Special logic to set number of sorted runs.
2492 // It is to match the previous behavior when all files are in L0.
2493 int num_l0_count
= static_cast<int>(files_
[0].size());
2494 if (compaction_style_
== kCompactionStyleUniversal
) {
2495 // For universal compaction, we use level0 score to indicate
2496 // compaction score for the whole DB. Adding other levels as if
2497 // they are L0 files.
2498 for (int i
= 1; i
< num_levels(); i
++) {
2499 if (!files_
[i
].empty()) {
2504 set_l0_delay_trigger_count(num_l0_count
);
2506 level_max_bytes_
.resize(ioptions
.num_levels
);
2507 if (!ioptions
.level_compaction_dynamic_level_bytes
) {
2508 base_level_
= (ioptions
.compaction_style
== kCompactionStyleLevel
) ? 1 : -1;
2510 // Calculate for static bytes base case
2511 for (int i
= 0; i
< ioptions
.num_levels
; ++i
) {
2512 if (i
== 0 && ioptions
.compaction_style
== kCompactionStyleUniversal
) {
2513 level_max_bytes_
[i
] = options
.max_bytes_for_level_base
;
2515 level_max_bytes_
[i
] = MultiplyCheckOverflow(
2516 MultiplyCheckOverflow(level_max_bytes_
[i
- 1],
2517 options
.max_bytes_for_level_multiplier
),
2518 options
.MaxBytesMultiplerAdditional(i
- 1));
2520 level_max_bytes_
[i
] = options
.max_bytes_for_level_base
;
2524 uint64_t max_level_size
= 0;
2526 int first_non_empty_level
= -1;
2527 // Find size of non-L0 level of most data.
2528 // Cannot use the size of the last level because it can be empty or less
2529 // than previous levels after compaction.
2530 for (int i
= 1; i
< num_levels_
; i
++) {
2531 uint64_t total_size
= 0;
2532 for (const auto& f
: files_
[i
]) {
2533 total_size
+= f
->fd
.GetFileSize();
2535 if (total_size
> 0 && first_non_empty_level
== -1) {
2536 first_non_empty_level
= i
;
2538 if (total_size
> max_level_size
) {
2539 max_level_size
= total_size
;
2543 // Prefill every level's max bytes to disallow compaction from there.
2544 for (int i
= 0; i
< num_levels_
; i
++) {
2545 level_max_bytes_
[i
] = std::numeric_limits
<uint64_t>::max();
2548 if (max_level_size
== 0) {
2549 // No data for L1 and up. L0 compacts to last level directly.
2550 // No compaction from L1+ needs to be scheduled.
2551 base_level_
= num_levels_
- 1;
2553 uint64_t l0_size
= 0;
2554 for (const auto& f
: files_
[0]) {
2555 l0_size
+= f
->fd
.GetFileSize();
2558 uint64_t base_bytes_max
=
2559 std::max(options
.max_bytes_for_level_base
, l0_size
);
2560 uint64_t base_bytes_min
= static_cast<uint64_t>(
2561 base_bytes_max
/ options
.max_bytes_for_level_multiplier
);
2563 // Try whether we can make last level's target size to be max_level_size
2564 uint64_t cur_level_size
= max_level_size
;
2565 for (int i
= num_levels_
- 2; i
>= first_non_empty_level
; i
--) {
2566 // Round up after dividing
2567 cur_level_size
= static_cast<uint64_t>(
2568 cur_level_size
/ options
.max_bytes_for_level_multiplier
);
2571 // Calculate base level and its size.
2572 uint64_t base_level_size
;
2573 if (cur_level_size
<= base_bytes_min
) {
2574 // Case 1. If we make target size of last level to be max_level_size,
2575 // target size of the first non-empty level would be smaller than
2576 // base_bytes_min. We set it be base_bytes_min.
2577 base_level_size
= base_bytes_min
+ 1U;
2578 base_level_
= first_non_empty_level
;
2579 ROCKS_LOG_WARN(ioptions
.info_log
,
2580 "More existing levels in DB than needed. "
2581 "max_bytes_for_level_multiplier may not be guaranteed.");
2583 // Find base level (where L0 data is compacted to).
2584 base_level_
= first_non_empty_level
;
2585 while (base_level_
> 1 && cur_level_size
> base_bytes_max
) {
2587 cur_level_size
= static_cast<uint64_t>(
2588 cur_level_size
/ options
.max_bytes_for_level_multiplier
);
2590 if (cur_level_size
> base_bytes_max
) {
2591 // Even L1 will be too large
2592 assert(base_level_
== 1);
2593 base_level_size
= base_bytes_max
;
2595 base_level_size
= cur_level_size
;
2599 level_multiplier_
= options
.max_bytes_for_level_multiplier
;
2600 assert(base_level_size
> 0);
2601 if (l0_size
> base_level_size
&&
2602 (l0_size
> options
.max_bytes_for_level_base
||
2603 static_cast<int>(files_
[0].size() / 2) >=
2604 options
.level0_file_num_compaction_trigger
)) {
2605 // We adjust the base level according to actual L0 size, and adjust
2606 // the level multiplier accordingly, when:
2607 // 1. the L0 size is larger than level size base, or
2608 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
2609 // We don't do this otherwise to keep the LSM-tree structure stable
2610 // unless the L0 compation is backlogged.
2611 base_level_size
= l0_size
;
2612 if (base_level_
== num_levels_
- 1) {
2613 level_multiplier_
= 1.0;
2615 level_multiplier_
= std::pow(
2616 static_cast<double>(max_level_size
) /
2617 static_cast<double>(base_level_size
),
2618 1.0 / static_cast<double>(num_levels_
- base_level_
- 1));
2622 uint64_t level_size
= base_level_size
;
2623 for (int i
= base_level_
; i
< num_levels_
; i
++) {
2624 if (i
> base_level_
) {
2625 level_size
= MultiplyCheckOverflow(level_size
, level_multiplier_
);
2627 // Don't set any level below base_bytes_max. Otherwise, the LSM can
2628 // assume an hourglass shape where L1+ sizes are smaller than L0. This
2629 // causes compaction scoring, which depends on level sizes, to favor L1+
2630 // at the expense of L0, which may fill up and stall.
2631 level_max_bytes_
[i
] = std::max(level_size
, base_bytes_max
);
2637 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
2638 // Estimate the live data size by adding up the size of the last level for all
2639 // key ranges. Note: Estimate depends on the ordering of files in level 0
2640 // because files in level 0 can be overlapping.
2643 auto ikey_lt
= [this](InternalKey
* x
, InternalKey
* y
) {
2644 return internal_comparator_
->Compare(*x
, *y
) < 0;
2646 // (Ordered) map of largest keys in non-overlapping files
2647 std::map
<InternalKey
*, FileMetaData
*, decltype(ikey_lt
)> ranges(ikey_lt
);
2649 for (int l
= num_levels_
- 1; l
>= 0; l
--) {
2650 bool found_end
= false;
2651 for (auto file
: files_
[l
]) {
2652 // Find the first file where the largest key is larger than the smallest
2653 // key of the current file. If this file does not overlap with the
2654 // current file, none of the files in the map does. If there is
2655 // no potential overlap, we can safely insert the rest of this level
2656 // (if the level is not 0) into the map without checking again because
2657 // the elements in the level are sorted and non-overlapping.
2658 auto lb
= (found_end
&& l
!= 0) ?
2659 ranges
.end() : ranges
.lower_bound(&file
->smallest
);
2660 found_end
= (lb
== ranges
.end());
2661 if (found_end
|| internal_comparator_
->Compare(
2662 file
->largest
, (*lb
).second
->smallest
) < 0) {
2663 ranges
.emplace_hint(lb
, &file
->largest
, file
);
2664 size
+= file
->fd
.file_size
;
2671 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
2672 const Slice
& smallest_user_key
, const Slice
& largest_user_key
,
2673 int last_level
, int last_l0_idx
) {
2674 assert((last_l0_idx
!= -1) == (last_level
== 0));
2675 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
2676 // bottommost only if it's the oldest L0 file and there are no files on older
2677 // levels. It'd be better to consider it bottommost if there's no overlap in
2678 // older levels/files.
2679 if (last_level
== 0 &&
2680 last_l0_idx
!= static_cast<int>(LevelFiles(0).size() - 1)) {
2684 // Checks whether there are files living beyond the `last_level`. If lower
2685 // levels have files, it checks for overlap between [`smallest_key`,
2686 // `largest_key`] and those files. Bottomlevel optimizations can be made if
2687 // there are no files in lower levels or if there is no overlap with the files
2688 // in the lower levels.
2689 for (int level
= last_level
+ 1; level
< num_levels(); level
++) {
2690 // The range is not in the bottommost level if there are files in lower
2691 // levels when the `last_level` is 0 or if there are files in lower levels
2692 // which overlap with [`smallest_key`, `largest_key`].
2693 if (files_
[level
].size() > 0 &&
2695 OverlapInLevel(level
, &smallest_user_key
, &largest_user_key
))) {
2702 void Version::AddLiveFiles(std::vector
<FileDescriptor
>* live
) {
2703 for (int level
= 0; level
< storage_info_
.num_levels(); level
++) {
2704 const std::vector
<FileMetaData
*>& files
= storage_info_
.files_
[level
];
2705 for (const auto& file
: files
) {
2706 live
->push_back(file
->fd
);
2711 std::string
Version::DebugString(bool hex
, bool print_stats
) const {
2713 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
2716 // 17:123['a' .. 'd']
2717 // 20:43['e' .. 'g']
2719 // if print_stats=true:
2720 // 17:123['a' .. 'd'](4096)
2721 r
.append("--- level ");
2722 AppendNumberTo(&r
, level
);
2723 r
.append(" --- version# ");
2724 AppendNumberTo(&r
, version_number_
);
2726 const std::vector
<FileMetaData
*>& files
= storage_info_
.files_
[level
];
2727 for (size_t i
= 0; i
< files
.size(); i
++) {
2729 AppendNumberTo(&r
, files
[i
]->fd
.GetNumber());
2731 AppendNumberTo(&r
, files
[i
]->fd
.GetFileSize());
2733 r
.append(files
[i
]->smallest
.DebugString(hex
));
2735 r
.append(files
[i
]->largest
.DebugString(hex
));
2740 files
[i
]->stats
.num_reads_sampled
.load(std::memory_order_relaxed
)));
2749 // this is used to batch writes to the manifest file
2750 struct VersionSet::ManifestWriter
{
2753 InstrumentedCondVar cv
;
2754 ColumnFamilyData
* cfd
;
2755 const MutableCFOptions mutable_cf_options
;
2756 const autovector
<VersionEdit
*>& edit_list
;
2758 explicit ManifestWriter(InstrumentedMutex
* mu
, ColumnFamilyData
* _cfd
,
2759 const MutableCFOptions
& cf_options
,
2760 const autovector
<VersionEdit
*>& e
)
2764 mutable_cf_options(cf_options
),
2768 VersionSet::VersionSet(const std::string
& dbname
,
2769 const ImmutableDBOptions
* _db_options
,
2770 const EnvOptions
& storage_options
, Cache
* table_cache
,
2771 WriteBufferManager
* write_buffer_manager
,
2772 WriteController
* write_controller
)
2773 : column_family_set_(
2774 new ColumnFamilySet(dbname
, _db_options
, storage_options
, table_cache
,
2775 write_buffer_manager
, write_controller
)),
2776 env_(_db_options
->env
),
2778 db_options_(_db_options
),
2779 next_file_number_(2),
2780 manifest_file_number_(0), // Filled by Recover()
2781 options_file_number_(0),
2782 pending_manifest_file_number_(0),
2784 last_allocated_sequence_(0),
2785 last_published_sequence_(0),
2786 prev_log_number_(0),
2787 current_version_number_(0),
2788 manifest_file_size_(0),
2789 env_options_(storage_options
) {}
2791 void CloseTables(void* ptr
, size_t) {
2792 TableReader
* table_reader
= reinterpret_cast<TableReader
*>(ptr
);
2793 table_reader
->Close();
2796 VersionSet::~VersionSet() {
2797 // we need to delete column_family_set_ because its destructor depends on
2799 Cache
* table_cache
= column_family_set_
->get_table_cache();
2800 table_cache
->ApplyToAllCacheEntries(&CloseTables
, false /* thread_safe */);
2801 column_family_set_
.reset();
2802 for (auto& file
: obsolete_files_
) {
2803 if (file
.metadata
->table_reader_handle
) {
2804 table_cache
->Release(file
.metadata
->table_reader_handle
);
2805 TableCache::Evict(table_cache
, file
.metadata
->fd
.GetNumber());
2807 file
.DeleteMetadata();
2809 obsolete_files_
.clear();
2812 void VersionSet::AppendVersion(ColumnFamilyData
* column_family_data
,
2814 // compute new compaction score
2815 v
->storage_info()->ComputeCompactionScore(
2816 *column_family_data
->ioptions(),
2817 *column_family_data
->GetLatestMutableCFOptions());
2820 v
->storage_info_
.SetFinalized();
2823 assert(v
->refs_
== 0);
2824 Version
* current
= column_family_data
->current();
2825 assert(v
!= current
);
2826 if (current
!= nullptr) {
2827 assert(current
->refs_
> 0);
2830 column_family_data
->SetCurrent(v
);
2833 // Append to linked list
2834 v
->prev_
= column_family_data
->dummy_versions()->prev_
;
2835 v
->next_
= column_family_data
->dummy_versions();
2836 v
->prev_
->next_
= v
;
2837 v
->next_
->prev_
= v
;
2840 Status
VersionSet::ProcessManifestWrites(
2841 std::deque
<ManifestWriter
>& writers
, InstrumentedMutex
* mu
,
2842 Directory
* db_directory
, bool new_descriptor_log
,
2843 const ColumnFamilyOptions
* new_cf_options
) {
2844 assert(!writers
.empty());
2845 ManifestWriter
& first_writer
= writers
.front();
2846 ManifestWriter
* last_writer
= &first_writer
;
2848 assert(!manifest_writers_
.empty());
2849 assert(manifest_writers_
.front() == &first_writer
);
2851 autovector
<VersionEdit
*> batch_edits
;
2852 autovector
<Version
*> versions
;
2853 autovector
<const MutableCFOptions
*> mutable_cf_options_ptrs
;
2854 std::vector
<std::unique_ptr
<BaseReferencedVersionBuilder
>> builder_guards
;
2856 if (first_writer
.edit_list
.front()->IsColumnFamilyManipulation()) {
2857 // No group commits for column family add or drop
2858 LogAndApplyCFHelper(first_writer
.edit_list
.front());
2859 batch_edits
.push_back(first_writer
.edit_list
.front());
2861 auto it
= manifest_writers_
.cbegin();
2862 size_t group_start
= std::numeric_limits
<size_t>::max();
2863 while (it
!= manifest_writers_
.cend()) {
2864 if ((*it
)->edit_list
.front()->IsColumnFamilyManipulation()) {
2865 // no group commits for column family add or drop
2868 last_writer
= *(it
++);
2869 assert(last_writer
!= nullptr);
2870 assert(last_writer
->cfd
!= nullptr);
2871 if (last_writer
->cfd
->IsDropped()) {
2872 // If we detect a dropped CF at this point, and the corresponding
2873 // version edits belong to an atomic group, then we need to find out
2874 // the preceding version edits in the same atomic group, and update
2875 // their `remaining_entries_` member variable because we are NOT going
2876 // to write the version edits' of dropped CF to the MANIFEST. If we
2877 // don't update, then Recover can report corrupted atomic group because
2878 // the `remaining_entries_` do not match.
2879 if (!batch_edits
.empty()) {
2880 if (batch_edits
.back()->is_in_atomic_group_
&&
2881 batch_edits
.back()->remaining_entries_
> 0) {
2882 assert(group_start
< batch_edits
.size());
2883 const auto& edit_list
= last_writer
->edit_list
;
2885 while (k
< edit_list
.size()) {
2886 if (!edit_list
[k
]->is_in_atomic_group_
) {
2888 } else if (edit_list
[k
]->remaining_entries_
== 0) {
2894 for (auto i
= group_start
; i
< batch_edits
.size(); ++i
) {
2895 assert(static_cast<uint32_t>(k
) <=
2896 batch_edits
.back()->remaining_entries_
);
2897 batch_edits
[i
]->remaining_entries_
-= static_cast<uint32_t>(k
);
2903 // We do a linear search on versions because versions is small.
2904 // TODO(yanqin) maybe consider unordered_map
2905 Version
* version
= nullptr;
2906 VersionBuilder
* builder
= nullptr;
2907 for (int i
= 0; i
!= static_cast<int>(versions
.size()); ++i
) {
2908 uint32_t cf_id
= last_writer
->cfd
->GetID();
2909 if (versions
[i
]->cfd()->GetID() == cf_id
) {
2910 version
= versions
[i
];
2911 assert(!builder_guards
.empty() &&
2912 builder_guards
.size() == versions
.size());
2913 builder
= builder_guards
[i
]->version_builder();
2914 TEST_SYNC_POINT_CALLBACK(
2915 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id
);
2919 if (version
== nullptr) {
2920 version
= new Version(last_writer
->cfd
, this, env_options_
,
2921 last_writer
->mutable_cf_options
,
2922 current_version_number_
++);
2923 versions
.push_back(version
);
2924 mutable_cf_options_ptrs
.push_back(&last_writer
->mutable_cf_options
);
2925 builder_guards
.emplace_back(
2926 new BaseReferencedVersionBuilder(last_writer
->cfd
));
2927 builder
= builder_guards
.back()->version_builder();
2929 assert(builder
!= nullptr); // make checker happy
2930 for (const auto& e
: last_writer
->edit_list
) {
2931 if (e
->is_in_atomic_group_
) {
2932 if (batch_edits
.empty() || !batch_edits
.back()->is_in_atomic_group_
||
2933 (batch_edits
.back()->is_in_atomic_group_
&&
2934 batch_edits
.back()->remaining_entries_
== 0)) {
2935 group_start
= batch_edits
.size();
2937 } else if (group_start
!= std::numeric_limits
<size_t>::max()) {
2938 group_start
= std::numeric_limits
<size_t>::max();
2940 LogAndApplyHelper(last_writer
->cfd
, builder
, e
, mu
);
2941 batch_edits
.push_back(e
);
2944 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
2945 assert(!builder_guards
.empty() &&
2946 builder_guards
.size() == versions
.size());
2947 auto* builder
= builder_guards
[i
]->version_builder();
2948 builder
->SaveTo(versions
[i
]->storage_info());
2953 // Verify that version edits of atomic groups have correct
2954 // remaining_entries_.
2956 while (k
< batch_edits
.size()) {
2957 while (k
< batch_edits
.size() && !batch_edits
[k
]->is_in_atomic_group_
) {
2960 if (k
== batch_edits
.size()) {
2964 while (i
< batch_edits
.size()) {
2965 if (!batch_edits
[i
]->is_in_atomic_group_
) {
2968 assert(i
- k
+ batch_edits
[i
]->remaining_entries_
==
2969 batch_edits
[k
]->remaining_entries_
);
2970 if (batch_edits
[i
]->remaining_entries_
== 0) {
2976 assert(batch_edits
[i
- 1]->is_in_atomic_group_
);
2977 assert(0 == batch_edits
[i
- 1]->remaining_entries_
);
2978 std::vector
<VersionEdit
*> tmp
;
2979 for (size_t j
= k
; j
!= i
; ++j
) {
2980 tmp
.emplace_back(batch_edits
[j
]);
2982 TEST_SYNC_POINT_CALLBACK(
2983 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp
);
2988 uint64_t new_manifest_file_size
= 0;
2991 assert(pending_manifest_file_number_
== 0);
2992 if (!descriptor_log_
||
2993 manifest_file_size_
> db_options_
->max_manifest_file_size
) {
2994 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
2995 pending_manifest_file_number_
= NewFileNumber();
2996 batch_edits
.back()->SetNextFile(next_file_number_
.load());
2997 new_descriptor_log
= true;
2999 pending_manifest_file_number_
= manifest_file_number_
;
3002 if (new_descriptor_log
) {
3003 // if we are writing out new snapshot make sure to persist max column
3005 if (column_family_set_
->GetMaxColumnFamily() > 0) {
3006 first_writer
.edit_list
.front()->SetMaxColumnFamily(
3007 column_family_set_
->GetMaxColumnFamily());
3012 EnvOptions opt_env_opts
= env_
->OptimizeForManifestWrite(env_options_
);
3015 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3016 if (!first_writer
.edit_list
.front()->IsColumnFamilyManipulation()) {
3017 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
3018 assert(!builder_guards
.empty() &&
3019 builder_guards
.size() == versions
.size());
3020 assert(!mutable_cf_options_ptrs
.empty() &&
3021 builder_guards
.size() == versions
.size());
3022 ColumnFamilyData
* cfd
= versions
[i
]->cfd_
;
3023 builder_guards
[i
]->version_builder()->LoadTableHandlers(
3024 cfd
->internal_stats(), cfd
->ioptions()->optimize_filters_for_hits
,
3025 true /* prefetch_index_and_filter_in_cache */,
3026 false /* is_initial_load */,
3027 mutable_cf_options_ptrs
[i
]->prefix_extractor
.get());
3031 // This is fine because everything inside of this block is serialized --
3032 // only one thread can be here at the same time
3033 if (new_descriptor_log
) {
3034 // create new manifest file
3035 ROCKS_LOG_INFO(db_options_
->info_log
, "Creating manifest %" PRIu64
"\n",
3036 pending_manifest_file_number_
);
3037 std::string descriptor_fname
=
3038 DescriptorFileName(dbname_
, pending_manifest_file_number_
);
3039 std::unique_ptr
<WritableFile
> descriptor_file
;
3040 s
= NewWritableFile(env_
, descriptor_fname
, &descriptor_file
,
3043 descriptor_file
->SetPreallocationBlockSize(
3044 db_options_
->manifest_preallocation_size
);
3046 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
3047 std::move(descriptor_file
), descriptor_fname
, opt_env_opts
, env_
,
3048 nullptr, db_options_
->listeners
));
3049 descriptor_log_
.reset(
3050 new log::Writer(std::move(file_writer
), 0, false));
3051 s
= WriteSnapshot(descriptor_log_
.get());
3055 if (!first_writer
.edit_list
.front()->IsColumnFamilyManipulation()) {
3056 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
3057 versions
[i
]->PrepareApply(*mutable_cf_options_ptrs
[i
], true);
3061 // Write new records to MANIFEST log
3066 for (auto& e
: batch_edits
) {
3068 if (!e
->EncodeTo(&record
)) {
3069 s
= Status::Corruption("Unable to encode VersionEdit:" +
3070 e
->DebugString(true));
3073 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
3074 rocksdb_kill_odds
* REDUCE_ODDS2
);
3076 if (batch_edits
.size() > 1 && batch_edits
.size() - 1 == idx
) {
3078 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0");
3080 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
3083 #endif /* !NDEBUG */
3084 s
= descriptor_log_
->AddRecord(record
);
3090 s
= SyncManifest(env_
, db_options_
, descriptor_log_
->file());
3093 ROCKS_LOG_ERROR(db_options_
->info_log
, "MANIFEST write %s\n",
3094 s
.ToString().c_str());
3098 // If we just created a new descriptor file, install it by writing a
3099 // new CURRENT file that points to it.
3100 if (s
.ok() && new_descriptor_log
) {
3101 s
= SetCurrentFile(env_
, dbname_
, pending_manifest_file_number_
,
3103 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
3107 // find offset in manifest file where this version is stored.
3108 new_manifest_file_size
= descriptor_log_
->file()->GetFileSize();
3111 if (first_writer
.edit_list
.front()->is_column_family_drop_
) {
3112 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3113 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3114 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3117 LogFlush(db_options_
->info_log
);
3118 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3122 // Append the old manifest file to the obsolete_manifest_ list to be deleted
3123 // by PurgeObsoleteFiles later.
3124 if (s
.ok() && new_descriptor_log
) {
3125 obsolete_manifests_
.emplace_back(
3126 DescriptorFileName("", manifest_file_number_
));
3129 // Install the new versions
3131 if (first_writer
.edit_list
.front()->is_column_family_add_
) {
3132 assert(batch_edits
.size() == 1);
3133 assert(new_cf_options
!= nullptr);
3134 CreateColumnFamily(*new_cf_options
, first_writer
.edit_list
.front());
3135 } else if (first_writer
.edit_list
.front()->is_column_family_drop_
) {
3136 assert(batch_edits
.size() == 1);
3137 first_writer
.cfd
->SetDropped();
3138 if (first_writer
.cfd
->Unref()) {
3139 delete first_writer
.cfd
;
3142 // Each version in versions corresponds to a column family.
3143 // For each column family, update its log number indicating that logs
3144 // with number smaller than this should be ignored.
3145 for (const auto version
: versions
) {
3146 uint64_t max_log_number_in_batch
= 0;
3147 uint32_t cf_id
= version
->cfd_
->GetID();
3148 for (const auto& e
: batch_edits
) {
3149 if (e
->has_log_number_
&& e
->column_family_
== cf_id
) {
3150 max_log_number_in_batch
=
3151 std::max(max_log_number_in_batch
, e
->log_number_
);
3154 if (max_log_number_in_batch
!= 0) {
3155 assert(version
->cfd_
->GetLogNumber() <= max_log_number_in_batch
);
3156 version
->cfd_
->SetLogNumber(max_log_number_in_batch
);
3160 uint64_t last_min_log_number_to_keep
= 0;
3161 for (auto& e
: batch_edits
) {
3162 if (e
->has_min_log_number_to_keep_
) {
3163 last_min_log_number_to_keep
=
3164 std::max(last_min_log_number_to_keep
, e
->min_log_number_to_keep_
);
3168 if (last_min_log_number_to_keep
!= 0) {
3169 // Should only be set in 2PC mode.
3170 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep
);
3173 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
3174 ColumnFamilyData
* cfd
= versions
[i
]->cfd_
;
3175 AppendVersion(cfd
, versions
[i
]);
3178 manifest_file_number_
= pending_manifest_file_number_
;
3179 manifest_file_size_
= new_manifest_file_size
;
3180 prev_log_number_
= first_writer
.edit_list
.front()->prev_log_number_
;
3182 std::string version_edits
;
3183 for (auto& e
: batch_edits
) {
3184 version_edits
+= ("\n" + e
->DebugString(true));
3186 ROCKS_LOG_ERROR(db_options_
->info_log
,
3187 "Error in committing version edit to MANIFEST: %s",
3188 version_edits
.c_str());
3189 for (auto v
: versions
) {
3192 if (new_descriptor_log
) {
3193 ROCKS_LOG_INFO(db_options_
->info_log
,
3194 "Deleting manifest %" PRIu64
" current manifest %" PRIu64
3196 manifest_file_number_
, pending_manifest_file_number_
);
3197 descriptor_log_
.reset();
3199 DescriptorFileName(dbname_
, pending_manifest_file_number_
));
3203 pending_manifest_file_number_
= 0;
3205 // wake up all the waiting writers
3207 ManifestWriter
* ready
= manifest_writers_
.front();
3208 manifest_writers_
.pop_front();
3209 bool need_signal
= true;
3210 for (const auto& w
: writers
) {
3212 need_signal
= false;
3221 if (ready
== last_writer
) {
3225 if (!manifest_writers_
.empty()) {
3226 manifest_writers_
.front()->cv
.Signal();
3231 // 'datas' is gramatically incorrect. We still use this notation to indicate
3232 // that this variable represents a collection of column_family_data.
3233 Status
VersionSet::LogAndApply(
3234 const autovector
<ColumnFamilyData
*>& column_family_datas
,
3235 const autovector
<const MutableCFOptions
*>& mutable_cf_options_list
,
3236 const autovector
<autovector
<VersionEdit
*>>& edit_lists
,
3237 InstrumentedMutex
* mu
, Directory
* db_directory
, bool new_descriptor_log
,
3238 const ColumnFamilyOptions
* new_cf_options
) {
3241 for (const auto& elist
: edit_lists
) {
3242 num_edits
+= static_cast<int>(elist
.size());
3244 if (num_edits
== 0) {
3245 return Status::OK();
3246 } else if (num_edits
> 1) {
3248 for (const auto& edit_list
: edit_lists
) {
3249 for (const auto& edit
: edit_list
) {
3250 assert(!edit
->IsColumnFamilyManipulation());
3253 #endif /* ! NDEBUG */
3256 int num_cfds
= static_cast<int>(column_family_datas
.size());
3257 if (num_cfds
== 1 && column_family_datas
[0] == nullptr) {
3258 assert(edit_lists
.size() == 1 && edit_lists
[0].size() == 1);
3259 assert(edit_lists
[0][0]->is_column_family_add_
);
3260 assert(new_cf_options
!= nullptr);
3262 std::deque
<ManifestWriter
> writers
;
3264 assert(static_cast<size_t>(num_cfds
) == mutable_cf_options_list
.size());
3265 assert(static_cast<size_t>(num_cfds
) == edit_lists
.size());
3267 for (int i
= 0; i
< num_cfds
; ++i
) {
3268 writers
.emplace_back(mu
, column_family_datas
[i
],
3269 *mutable_cf_options_list
[i
], edit_lists
[i
]);
3270 manifest_writers_
.push_back(&writers
[i
]);
3272 assert(!writers
.empty());
3273 ManifestWriter
& first_writer
= writers
.front();
3274 while (!first_writer
.done
&& &first_writer
!= manifest_writers_
.front()) {
3275 first_writer
.cv
.Wait();
3277 if (first_writer
.done
) {
3278 // All non-CF-manipulation operations can be grouped together and committed
3279 // to MANIFEST. They should all have finished. The status code is stored in
3280 // the first manifest writer.
3282 for (const auto& writer
: writers
) {
3283 assert(writer
.done
);
3285 #endif /* !NDEBUG */
3286 return first_writer
.status
;
3289 int num_undropped_cfds
= 0;
3290 for (auto cfd
: column_family_datas
) {
3291 // if cfd == nullptr, it is a column family add.
3292 if (cfd
== nullptr || !cfd
->IsDropped()) {
3293 ++num_undropped_cfds
;
3296 if (0 == num_undropped_cfds
) {
3297 // TODO (yanqin) maybe use a different status code to denote column family
3298 // drop other than OK and ShutdownInProgress
3299 for (int i
= 0; i
!= num_cfds
; ++i
) {
3300 manifest_writers_
.pop_front();
3302 // Notify new head of manifest write queue.
3303 if (!manifest_writers_
.empty()) {
3304 manifest_writers_
.front()->cv
.Signal();
3306 return Status::ShutdownInProgress();
3309 return ProcessManifestWrites(writers
, mu
, db_directory
, new_descriptor_log
,
3313 void VersionSet::LogAndApplyCFHelper(VersionEdit
* edit
) {
3314 assert(edit
->IsColumnFamilyManipulation());
3315 edit
->SetNextFile(next_file_number_
.load());
3316 // The log might have data that is not visible to memtbale and hence have not
3317 // updated the last_sequence_ yet. It is also possible that the log has is
3318 // expecting some new data that is not written yet. Since LastSequence is an
3319 // upper bound on the sequence, it is ok to record
3320 // last_allocated_sequence_ as the last sequence.
3321 edit
->SetLastSequence(db_options_
->two_write_queues
? last_allocated_sequence_
3323 if (edit
->is_column_family_drop_
) {
3324 // if we drop column family, we have to make sure to save max column family,
3325 // so that we don't reuse existing ID
3326 edit
->SetMaxColumnFamily(column_family_set_
->GetMaxColumnFamily());
3330 void VersionSet::LogAndApplyHelper(ColumnFamilyData
* cfd
,
3331 VersionBuilder
* builder
, VersionEdit
* edit
,
3332 InstrumentedMutex
* mu
) {
3337 assert(!edit
->IsColumnFamilyManipulation());
3339 if (edit
->has_log_number_
) {
3340 assert(edit
->log_number_
>= cfd
->GetLogNumber());
3341 assert(edit
->log_number_
< next_file_number_
.load());
3344 if (!edit
->has_prev_log_number_
) {
3345 edit
->SetPrevLogNumber(prev_log_number_
);
3347 edit
->SetNextFile(next_file_number_
.load());
3348 // The log might have data that is not visible to memtbale and hence have not
3349 // updated the last_sequence_ yet. It is also possible that the log has is
3350 // expecting some new data that is not written yet. Since LastSequence is an
3351 // upper bound on the sequence, it is ok to record
3352 // last_allocated_sequence_ as the last sequence.
3353 edit
->SetLastSequence(db_options_
->two_write_queues
? last_allocated_sequence_
3356 builder
->Apply(edit
);
3359 Status
VersionSet::ApplyOneVersionEditToBuilder(
3361 const std::unordered_map
<std::string
, ColumnFamilyOptions
>& name_to_options
,
3362 std::unordered_map
<int, std::string
>& column_families_not_found
,
3363 std::unordered_map
<uint32_t, std::unique_ptr
<BaseReferencedVersionBuilder
>>&
3365 bool* have_log_number
, uint64_t* log_number
, bool* have_prev_log_number
,
3366 uint64_t* previous_log_number
, bool* have_next_file
, uint64_t* next_file
,
3367 bool* have_last_sequence
, SequenceNumber
* last_sequence
,
3368 uint64_t* min_log_number_to_keep
, uint32_t* max_column_family
) {
3369 // Not found means that user didn't supply that column
3370 // family option AND we encountered column family add
3371 // record. Once we encounter column family drop record,
3372 // we will delete the column family from
3373 // column_families_not_found.
3374 bool cf_in_not_found
= (column_families_not_found
.find(edit
.column_family_
) !=
3375 column_families_not_found
.end());
3376 // in builders means that user supplied that column family
3377 // option AND that we encountered column family add record
3378 bool cf_in_builders
= builders
.find(edit
.column_family_
) != builders
.end();
3380 // they can't both be true
3381 assert(!(cf_in_not_found
&& cf_in_builders
));
3383 ColumnFamilyData
* cfd
= nullptr;
3385 if (edit
.is_column_family_add_
) {
3386 if (cf_in_builders
|| cf_in_not_found
) {
3387 return Status::Corruption(
3388 "Manifest adding the same column family twice: " +
3389 edit
.column_family_name_
);
3391 auto cf_options
= name_to_options
.find(edit
.column_family_name_
);
3392 if (cf_options
== name_to_options
.end()) {
3393 column_families_not_found
.insert(
3394 {edit
.column_family_
, edit
.column_family_name_
});
3396 cfd
= CreateColumnFamily(cf_options
->second
, &edit
);
3397 cfd
->set_initialized();
3398 builders
.insert(std::make_pair(
3399 edit
.column_family_
, std::unique_ptr
<BaseReferencedVersionBuilder
>(
3400 new BaseReferencedVersionBuilder(cfd
))));
3402 } else if (edit
.is_column_family_drop_
) {
3403 if (cf_in_builders
) {
3404 auto builder
= builders
.find(edit
.column_family_
);
3405 assert(builder
!= builders
.end());
3406 builders
.erase(builder
);
3407 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
3408 assert(cfd
!= nullptr);
3413 // who else can have reference to cfd!?
3416 } else if (cf_in_not_found
) {
3417 column_families_not_found
.erase(edit
.column_family_
);
3419 return Status::Corruption(
3420 "Manifest - dropping non-existing column family");
3422 } else if (!cf_in_not_found
) {
3423 if (!cf_in_builders
) {
3424 return Status::Corruption(
3425 "Manifest record referencing unknown column family");
3428 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
3429 // this should never happen since cf_in_builders is true
3430 assert(cfd
!= nullptr);
3432 // if it is not column family add or column family drop,
3433 // then it's a file add/delete, which should be forwarded
3435 auto builder
= builders
.find(edit
.column_family_
);
3436 assert(builder
!= builders
.end());
3437 builder
->second
->version_builder()->Apply(&edit
);
3439 return ExtractInfoFromVersionEdit(
3440 cfd
, edit
, have_log_number
, log_number
, have_prev_log_number
,
3441 previous_log_number
, have_next_file
, next_file
, have_last_sequence
,
3442 last_sequence
, min_log_number_to_keep
, max_column_family
);
3445 Status
VersionSet::ExtractInfoFromVersionEdit(
3446 ColumnFamilyData
* cfd
, const VersionEdit
& edit
, bool* have_log_number
,
3447 uint64_t* log_number
, bool* have_prev_log_number
,
3448 uint64_t* previous_log_number
, bool* have_next_file
, uint64_t* next_file
,
3449 bool* have_last_sequence
, SequenceNumber
* last_sequence
,
3450 uint64_t* min_log_number_to_keep
, uint32_t* max_column_family
) {
3451 if (cfd
!= nullptr) {
3452 if (edit
.has_log_number_
) {
3453 if (cfd
->GetLogNumber() > edit
.log_number_
) {
3455 db_options_
->info_log
,
3456 "MANIFEST corruption detected, but ignored - Log numbers in "
3457 "records NOT monotonically increasing");
3459 cfd
->SetLogNumber(edit
.log_number_
);
3460 *have_log_number
= true;
3461 *log_number
= edit
.log_number_
;
3464 if (edit
.has_comparator_
&&
3465 edit
.comparator_
!= cfd
->user_comparator()->Name()) {
3466 return Status::InvalidArgument(
3467 cfd
->user_comparator()->Name(),
3468 "does not match existing comparator " + edit
.comparator_
);
3472 if (edit
.has_prev_log_number_
) {
3473 *previous_log_number
= edit
.prev_log_number_
;
3474 *have_prev_log_number
= true;
3477 if (edit
.has_next_file_number_
) {
3478 *next_file
= edit
.next_file_number_
;
3479 *have_next_file
= true;
3482 if (edit
.has_max_column_family_
) {
3483 *max_column_family
= edit
.max_column_family_
;
3486 if (edit
.has_min_log_number_to_keep_
) {
3487 *min_log_number_to_keep
=
3488 std::max(*min_log_number_to_keep
, edit
.min_log_number_to_keep_
);
3491 if (edit
.has_last_sequence_
) {
3492 *last_sequence
= edit
.last_sequence_
;
3493 *have_last_sequence
= true;
3495 return Status::OK();
3498 Status
VersionSet::GetCurrentManifestPath(std::string
* manifest_path
) {
3499 assert(manifest_path
!= nullptr);
3501 Status s
= ReadFileToString(env_
, CurrentFileName(dbname_
), &fname
);
3505 if (fname
.empty() || fname
.back() != '\n') {
3506 return Status::Corruption("CURRENT file does not end with newline");
3508 // remove the trailing '\n'
3509 fname
.resize(fname
.size() - 1);
3511 bool parse_ok
= ParseFileName(fname
, &manifest_file_number_
, &type
);
3512 if (!parse_ok
|| type
!= kDescriptorFile
) {
3513 return Status::Corruption("CURRENT file corrupted");
3515 *manifest_path
= dbname_
;
3516 if (dbname_
.back() != '/') {
3517 manifest_path
->push_back('/');
3519 *manifest_path
+= fname
;
3520 return Status::OK();
3523 Status
VersionSet::Recover(
3524 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
3526 std::unordered_map
<std::string
, ColumnFamilyOptions
> cf_name_to_options
;
3527 for (auto cf
: column_families
) {
3528 cf_name_to_options
.insert({cf
.name
, cf
.options
});
3530 // keeps track of column families in manifest that were not found in
3531 // column families parameters. if those column families are not dropped
3532 // by subsequent manifest records, Recover() will return failure status
3533 std::unordered_map
<int, std::string
> column_families_not_found
;
3535 // Read "CURRENT" file, which contains a pointer to the current manifest file
3536 std::string manifest_path
;
3537 Status s
= GetCurrentManifestPath(&manifest_path
);
3542 ROCKS_LOG_INFO(db_options_
->info_log
, "Recovering from manifest file: %s\n",
3543 manifest_path
.c_str());
3545 std::unique_ptr
<SequentialFileReader
> manifest_file_reader
;
3547 std::unique_ptr
<SequentialFile
> manifest_file
;
3548 s
= env_
->NewSequentialFile(manifest_path
, &manifest_file
,
3549 env_
->OptimizeForManifestRead(env_options_
));
3553 manifest_file_reader
.reset(
3554 new SequentialFileReader(std::move(manifest_file
), manifest_path
));
3556 uint64_t current_manifest_file_size
;
3557 s
= env_
->GetFileSize(manifest_path
, ¤t_manifest_file_size
);
3562 bool have_log_number
= false;
3563 bool have_prev_log_number
= false;
3564 bool have_next_file
= false;
3565 bool have_last_sequence
= false;
3566 uint64_t next_file
= 0;
3567 uint64_t last_sequence
= 0;
3568 uint64_t log_number
= 0;
3569 uint64_t previous_log_number
= 0;
3570 uint32_t max_column_family
= 0;
3571 uint64_t min_log_number_to_keep
= 0;
3572 std::unordered_map
<uint32_t, std::unique_ptr
<BaseReferencedVersionBuilder
>>
3575 // add default column family
3576 auto default_cf_iter
= cf_name_to_options
.find(kDefaultColumnFamilyName
);
3577 if (default_cf_iter
== cf_name_to_options
.end()) {
3578 return Status::InvalidArgument("Default column family not specified");
3580 VersionEdit default_cf_edit
;
3581 default_cf_edit
.AddColumnFamily(kDefaultColumnFamilyName
);
3582 default_cf_edit
.SetColumnFamily(0);
3583 ColumnFamilyData
* default_cfd
=
3584 CreateColumnFamily(default_cf_iter
->second
, &default_cf_edit
);
3585 // In recovery, nobody else can access it, so it's fine to set it to be
3586 // initialized earlier.
3587 default_cfd
->set_initialized();
3589 std::make_pair(0, std::unique_ptr
<BaseReferencedVersionBuilder
>(
3590 new BaseReferencedVersionBuilder(default_cfd
))));
3593 VersionSet::LogReporter reporter
;
3594 reporter
.status
= &s
;
3595 log::Reader
reader(nullptr, std::move(manifest_file_reader
), &reporter
,
3596 true /* checksum */, 0 /* log_number */);
3598 std::string scratch
;
3599 std::vector
<VersionEdit
> replay_buffer
;
3600 size_t num_entries_decoded
= 0;
3601 while (reader
.ReadRecord(&record
, &scratch
) && s
.ok()) {
3603 s
= edit
.DecodeFrom(record
);
3608 if (edit
.is_in_atomic_group_
) {
3609 if (replay_buffer
.empty()) {
3610 replay_buffer
.resize(edit
.remaining_entries_
+ 1);
3611 TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
3614 ++num_entries_decoded
;
3615 if (num_entries_decoded
+ edit
.remaining_entries_
!=
3616 static_cast<uint32_t>(replay_buffer
.size())) {
3617 TEST_SYNC_POINT_CALLBACK(
3618 "VersionSet::Recover:IncorrectAtomicGroupSize", &edit
);
3619 s
= Status::Corruption("corrupted atomic group");
3622 replay_buffer
[num_entries_decoded
- 1] = std::move(edit
);
3623 if (num_entries_decoded
== replay_buffer
.size()) {
3624 TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
3626 for (auto& e
: replay_buffer
) {
3627 s
= ApplyOneVersionEditToBuilder(
3628 e
, cf_name_to_options
, column_families_not_found
, builders
,
3629 &have_log_number
, &log_number
, &have_prev_log_number
,
3630 &previous_log_number
, &have_next_file
, &next_file
,
3631 &have_last_sequence
, &last_sequence
, &min_log_number_to_keep
,
3632 &max_column_family
);
3637 replay_buffer
.clear();
3638 num_entries_decoded
= 0;
3640 TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
3642 if (!replay_buffer
.empty()) {
3643 TEST_SYNC_POINT_CALLBACK(
3644 "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit
);
3645 s
= Status::Corruption("corrupted atomic group");
3648 s
= ApplyOneVersionEditToBuilder(
3649 edit
, cf_name_to_options
, column_families_not_found
, builders
,
3650 &have_log_number
, &log_number
, &have_prev_log_number
,
3651 &previous_log_number
, &have_next_file
, &next_file
,
3652 &have_last_sequence
, &last_sequence
, &min_log_number_to_keep
,
3653 &max_column_family
);
3662 if (!have_next_file
) {
3663 s
= Status::Corruption("no meta-nextfile entry in descriptor");
3664 } else if (!have_log_number
) {
3665 s
= Status::Corruption("no meta-lognumber entry in descriptor");
3666 } else if (!have_last_sequence
) {
3667 s
= Status::Corruption("no last-sequence-number entry in descriptor");
3670 if (!have_prev_log_number
) {
3671 previous_log_number
= 0;
3674 column_family_set_
->UpdateMaxColumnFamily(max_column_family
);
3676 // When reading DB generated using old release, min_log_number_to_keep=0.
3677 // All log files will be scanned for potential prepare entries.
3678 MarkMinLogNumberToKeep2PC(min_log_number_to_keep
);
3679 MarkFileNumberUsed(previous_log_number
);
3680 MarkFileNumberUsed(log_number
);
3683 // there were some column families in the MANIFEST that weren't specified
3684 // in the argument. This is OK in read_only mode
3685 if (read_only
== false && !column_families_not_found
.empty()) {
3686 std::string list_of_not_found
;
3687 for (const auto& cf
: column_families_not_found
) {
3688 list_of_not_found
+= ", " + cf
.second
;
3690 list_of_not_found
= list_of_not_found
.substr(2);
3691 s
= Status::InvalidArgument(
3692 "You have to open all column families. Column families not opened: " +
3697 for (auto cfd
: *column_family_set_
) {
3698 assert(builders
.count(cfd
->GetID()) > 0);
3699 auto* builder
= builders
[cfd
->GetID()]->version_builder();
3700 if (!builder
->CheckConsistencyForNumLevels()) {
3701 s
= Status::InvalidArgument(
3702 "db has more levels than options.num_levels");
3709 for (auto cfd
: *column_family_set_
) {
3710 if (cfd
->IsDropped()) {
3714 cfd
->table_cache()->SetTablesAreImmortal();
3716 assert(cfd
->initialized());
3717 auto builders_iter
= builders
.find(cfd
->GetID());
3718 assert(builders_iter
!= builders
.end());
3719 auto builder
= builders_iter
->second
->version_builder();
3721 // unlimited table cache. Pre-load table handle now.
3722 // Need to do it out of the mutex.
3723 builder
->LoadTableHandlers(
3724 cfd
->internal_stats(), db_options_
->max_file_opening_threads
,
3725 false /* prefetch_index_and_filter_in_cache */,
3726 true /* is_initial_load */,
3727 cfd
->GetLatestMutableCFOptions()->prefix_extractor
.get());
3729 Version
* v
= new Version(cfd
, this, env_options_
,
3730 *cfd
->GetLatestMutableCFOptions(),
3731 current_version_number_
++);
3732 builder
->SaveTo(v
->storage_info());
3734 // Install recovered version
3735 v
->PrepareApply(*cfd
->GetLatestMutableCFOptions(),
3736 !(db_options_
->skip_stats_update_on_db_open
));
3737 AppendVersion(cfd
, v
);
3740 manifest_file_size_
= current_manifest_file_size
;
3741 next_file_number_
.store(next_file
+ 1);
3742 last_allocated_sequence_
= last_sequence
;
3743 last_published_sequence_
= last_sequence
;
3744 last_sequence_
= last_sequence
;
3745 prev_log_number_
= previous_log_number
;
3748 db_options_
->info_log
,
3749 "Recovered from manifest file:%s succeeded,"
3750 "manifest_file_number is %" PRIu64
", next_file_number is %" PRIu64
3751 ", last_sequence is %" PRIu64
", log_number is %" PRIu64
3752 ",prev_log_number is %" PRIu64
",max_column_family is %" PRIu32
3753 ",min_log_number_to_keep is %" PRIu64
"\n",
3754 manifest_path
.c_str(), manifest_file_number_
,
3755 next_file_number_
.load(), last_sequence_
.load(), log_number
,
3756 prev_log_number_
, column_family_set_
->GetMaxColumnFamily(),
3757 min_log_number_to_keep_2pc());
3759 for (auto cfd
: *column_family_set_
) {
3760 if (cfd
->IsDropped()) {
3763 ROCKS_LOG_INFO(db_options_
->info_log
,
3764 "Column family [%s] (ID %" PRIu32
3765 "), log number is %" PRIu64
"\n",
3766 cfd
->GetName().c_str(), cfd
->GetID(), cfd
->GetLogNumber());
3773 Status
VersionSet::ListColumnFamilies(std::vector
<std::string
>* column_families
,
3774 const std::string
& dbname
, Env
* env
) {
3775 // these are just for performance reasons, not correcntes,
3776 // so we're fine using the defaults
3777 EnvOptions soptions
;
3778 // Read "CURRENT" file, which contains a pointer to the current manifest file
3779 std::string current
;
3780 Status s
= ReadFileToString(env
, CurrentFileName(dbname
), ¤t
);
3784 if (current
.empty() || current
[current
.size()-1] != '\n') {
3785 return Status::Corruption("CURRENT file does not end with newline");
3787 current
.resize(current
.size() - 1);
3789 std::string dscname
= dbname
+ "/" + current
;
3791 std::unique_ptr
<SequentialFileReader
> file_reader
;
3793 std::unique_ptr
<SequentialFile
> file
;
3794 s
= env
->NewSequentialFile(dscname
, &file
, soptions
);
3798 file_reader
.reset(new SequentialFileReader(std::move(file
), dscname
));
3801 std::map
<uint32_t, std::string
> column_family_names
;
3802 // default column family is always implicitly there
3803 column_family_names
.insert({0, kDefaultColumnFamilyName
});
3804 VersionSet::LogReporter reporter
;
3805 reporter
.status
= &s
;
3806 log::Reader
reader(nullptr, std::move(file_reader
), &reporter
,
3807 true /* checksum */, 0 /* log_number */);
3809 std::string scratch
;
3810 while (reader
.ReadRecord(&record
, &scratch
) && s
.ok()) {
3812 s
= edit
.DecodeFrom(record
);
3816 if (edit
.is_column_family_add_
) {
3817 if (column_family_names
.find(edit
.column_family_
) !=
3818 column_family_names
.end()) {
3819 s
= Status::Corruption("Manifest adding the same column family twice");
3822 column_family_names
.insert(
3823 {edit
.column_family_
, edit
.column_family_name_
});
3824 } else if (edit
.is_column_family_drop_
) {
3825 if (column_family_names
.find(edit
.column_family_
) ==
3826 column_family_names
.end()) {
3827 s
= Status::Corruption(
3828 "Manifest - dropping non-existing column family");
3831 column_family_names
.erase(edit
.column_family_
);
3835 column_families
->clear();
3837 for (const auto& iter
: column_family_names
) {
3838 column_families
->push_back(iter
.second
);
3845 #ifndef ROCKSDB_LITE
3846 Status
VersionSet::ReduceNumberOfLevels(const std::string
& dbname
,
3847 const Options
* options
,
3848 const EnvOptions
& env_options
,
3850 if (new_levels
<= 1) {
3851 return Status::InvalidArgument(
3852 "Number of levels needs to be bigger than 1");
3855 ImmutableDBOptions
db_options(*options
);
3856 ColumnFamilyOptions
cf_options(*options
);
3857 std::shared_ptr
<Cache
> tc(NewLRUCache(options
->max_open_files
- 10,
3858 options
->table_cache_numshardbits
));
3859 WriteController
wc(options
->delayed_write_rate
);
3860 WriteBufferManager
wb(options
->db_write_buffer_size
);
3861 VersionSet
versions(dbname
, &db_options
, env_options
, tc
.get(), &wb
, &wc
);
3864 std::vector
<ColumnFamilyDescriptor
> dummy
;
3865 ColumnFamilyDescriptor
dummy_descriptor(kDefaultColumnFamilyName
,
3866 ColumnFamilyOptions(*options
));
3867 dummy
.push_back(dummy_descriptor
);
3868 status
= versions
.Recover(dummy
);
3873 Version
* current_version
=
3874 versions
.GetColumnFamilySet()->GetDefault()->current();
3875 auto* vstorage
= current_version
->storage_info();
3876 int current_levels
= vstorage
->num_levels();
3878 if (current_levels
<= new_levels
) {
3879 return Status::OK();
3882 // Make sure there are file only on one level from
3883 // (new_levels-1) to (current_levels-1)
3884 int first_nonempty_level
= -1;
3885 int first_nonempty_level_filenum
= 0;
3886 for (int i
= new_levels
- 1; i
< current_levels
; i
++) {
3887 int file_num
= vstorage
->NumLevelFiles(i
);
3888 if (file_num
!= 0) {
3889 if (first_nonempty_level
< 0) {
3890 first_nonempty_level
= i
;
3891 first_nonempty_level_filenum
= file_num
;
3894 snprintf(msg
, sizeof(msg
),
3895 "Found at least two levels containing files: "
3896 "[%d:%d],[%d:%d].\n",
3897 first_nonempty_level
, first_nonempty_level_filenum
, i
,
3899 return Status::InvalidArgument(msg
);
3904 // we need to allocate an array with the old number of levels size to
3905 // avoid SIGSEGV in WriteSnapshot()
3906 // however, all levels bigger or equal to new_levels will be empty
3907 std::vector
<FileMetaData
*>* new_files_list
=
3908 new std::vector
<FileMetaData
*>[current_levels
];
3909 for (int i
= 0; i
< new_levels
- 1; i
++) {
3910 new_files_list
[i
] = vstorage
->LevelFiles(i
);
3913 if (first_nonempty_level
> 0) {
3914 new_files_list
[new_levels
- 1] = vstorage
->LevelFiles(first_nonempty_level
);
3917 delete[] vstorage
-> files_
;
3918 vstorage
->files_
= new_files_list
;
3919 vstorage
->num_levels_
= new_levels
;
3921 MutableCFOptions
mutable_cf_options(*options
);
3923 InstrumentedMutex dummy_mutex
;
3924 InstrumentedMutexLock
l(&dummy_mutex
);
3925 return versions
.LogAndApply(
3926 versions
.GetColumnFamilySet()->GetDefault(),
3927 mutable_cf_options
, &ve
, &dummy_mutex
, nullptr, true);
3930 Status
VersionSet::DumpManifest(Options
& options
, std::string
& dscname
,
3931 bool verbose
, bool hex
, bool json
) {
3932 // Open the specified manifest file.
3933 std::unique_ptr
<SequentialFileReader
> file_reader
;
3936 std::unique_ptr
<SequentialFile
> file
;
3937 s
= options
.env
->NewSequentialFile(
3938 dscname
, &file
, env_
->OptimizeForManifestRead(env_options_
));
3942 file_reader
.reset(new SequentialFileReader(std::move(file
), dscname
));
3945 bool have_prev_log_number
= false;
3946 bool have_next_file
= false;
3947 bool have_last_sequence
= false;
3948 uint64_t next_file
= 0;
3949 uint64_t last_sequence
= 0;
3950 uint64_t previous_log_number
= 0;
3952 std::unordered_map
<uint32_t, std::string
> comparators
;
3953 std::unordered_map
<uint32_t, std::unique_ptr
<BaseReferencedVersionBuilder
>>
3956 // add default column family
3957 VersionEdit default_cf_edit
;
3958 default_cf_edit
.AddColumnFamily(kDefaultColumnFamilyName
);
3959 default_cf_edit
.SetColumnFamily(0);
3960 ColumnFamilyData
* default_cfd
=
3961 CreateColumnFamily(ColumnFamilyOptions(options
), &default_cf_edit
);
3963 std::make_pair(0, std::unique_ptr
<BaseReferencedVersionBuilder
>(
3964 new BaseReferencedVersionBuilder(default_cfd
))));
3967 VersionSet::LogReporter reporter
;
3968 reporter
.status
= &s
;
3969 log::Reader
reader(nullptr, std::move(file_reader
), &reporter
,
3970 true /* checksum */, 0 /* log_number */);
3972 std::string scratch
;
3973 while (reader
.ReadRecord(&record
, &scratch
) && s
.ok()) {
3975 s
= edit
.DecodeFrom(record
);
3980 // Write out each individual edit
3981 if (verbose
&& !json
) {
3982 printf("%s\n", edit
.DebugString(hex
).c_str());
3984 printf("%s\n", edit
.DebugJSON(count
, hex
).c_str());
3988 bool cf_in_builders
=
3989 builders
.find(edit
.column_family_
) != builders
.end();
3991 if (edit
.has_comparator_
) {
3992 comparators
.insert({edit
.column_family_
, edit
.comparator_
});
3995 ColumnFamilyData
* cfd
= nullptr;
3997 if (edit
.is_column_family_add_
) {
3998 if (cf_in_builders
) {
3999 s
= Status::Corruption(
4000 "Manifest adding the same column family twice");
4003 cfd
= CreateColumnFamily(ColumnFamilyOptions(options
), &edit
);
4004 cfd
->set_initialized();
4005 builders
.insert(std::make_pair(
4006 edit
.column_family_
, std::unique_ptr
<BaseReferencedVersionBuilder
>(
4007 new BaseReferencedVersionBuilder(cfd
))));
4008 } else if (edit
.is_column_family_drop_
) {
4009 if (!cf_in_builders
) {
4010 s
= Status::Corruption(
4011 "Manifest - dropping non-existing column family");
4014 auto builder_iter
= builders
.find(edit
.column_family_
);
4015 builders
.erase(builder_iter
);
4016 comparators
.erase(edit
.column_family_
);
4017 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
4018 assert(cfd
!= nullptr);
4023 if (!cf_in_builders
) {
4024 s
= Status::Corruption(
4025 "Manifest record referencing unknown column family");
4029 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
4030 // this should never happen since cf_in_builders is true
4031 assert(cfd
!= nullptr);
4033 // if it is not column family add or column family drop,
4034 // then it's a file add/delete, which should be forwarded
4036 auto builder
= builders
.find(edit
.column_family_
);
4037 assert(builder
!= builders
.end());
4038 builder
->second
->version_builder()->Apply(&edit
);
4041 if (cfd
!= nullptr && edit
.has_log_number_
) {
4042 cfd
->SetLogNumber(edit
.log_number_
);
4046 if (edit
.has_prev_log_number_
) {
4047 previous_log_number
= edit
.prev_log_number_
;
4048 have_prev_log_number
= true;
4051 if (edit
.has_next_file_number_
) {
4052 next_file
= edit
.next_file_number_
;
4053 have_next_file
= true;
4056 if (edit
.has_last_sequence_
) {
4057 last_sequence
= edit
.last_sequence_
;
4058 have_last_sequence
= true;
4061 if (edit
.has_max_column_family_
) {
4062 column_family_set_
->UpdateMaxColumnFamily(edit
.max_column_family_
);
4065 if (edit
.has_min_log_number_to_keep_
) {
4066 MarkMinLogNumberToKeep2PC(edit
.min_log_number_to_keep_
);
4070 file_reader
.reset();
4073 if (!have_next_file
) {
4074 s
= Status::Corruption("no meta-nextfile entry in descriptor");
4075 printf("no meta-nextfile entry in descriptor");
4076 } else if (!have_last_sequence
) {
4077 printf("no last-sequence-number entry in descriptor");
4078 s
= Status::Corruption("no last-sequence-number entry in descriptor");
4081 if (!have_prev_log_number
) {
4082 previous_log_number
= 0;
4087 for (auto cfd
: *column_family_set_
) {
4088 if (cfd
->IsDropped()) {
4091 auto builders_iter
= builders
.find(cfd
->GetID());
4092 assert(builders_iter
!= builders
.end());
4093 auto builder
= builders_iter
->second
->version_builder();
4095 Version
* v
= new Version(cfd
, this, env_options_
,
4096 *cfd
->GetLatestMutableCFOptions(),
4097 current_version_number_
++);
4098 builder
->SaveTo(v
->storage_info());
4099 v
->PrepareApply(*cfd
->GetLatestMutableCFOptions(), false);
4101 printf("--------------- Column family \"%s\" (ID %" PRIu32
4102 ") --------------\n",
4103 cfd
->GetName().c_str(), cfd
->GetID());
4104 printf("log number: %" PRIu64
"\n", cfd
->GetLogNumber());
4105 auto comparator
= comparators
.find(cfd
->GetID());
4106 if (comparator
!= comparators
.end()) {
4107 printf("comparator: %s\n", comparator
->second
.c_str());
4109 printf("comparator: <NO COMPARATOR>\n");
4111 printf("%s \n", v
->DebugString(hex
).c_str());
4115 next_file_number_
.store(next_file
+ 1);
4116 last_allocated_sequence_
= last_sequence
;
4117 last_published_sequence_
= last_sequence
;
4118 last_sequence_
= last_sequence
;
4119 prev_log_number_
= previous_log_number
;
4121 printf("next_file_number %" PRIu64
" last_sequence %" PRIu64
4122 " prev_log_number %" PRIu64
" max_column_family %" PRIu32
4123 " min_log_number_to_keep "
4125 next_file_number_
.load(), last_sequence
, previous_log_number
,
4126 column_family_set_
->GetMaxColumnFamily(),
4127 min_log_number_to_keep_2pc());
4132 #endif // ROCKSDB_LITE
4134 void VersionSet::MarkFileNumberUsed(uint64_t number
) {
4135 // only called during recovery and repair which are single threaded, so this
4136 // works because there can't be concurrent calls
4137 if (next_file_number_
.load(std::memory_order_relaxed
) <= number
) {
4138 next_file_number_
.store(number
+ 1, std::memory_order_relaxed
);
4142 // Called only either from ::LogAndApply which is protected by mutex or during
4143 // recovery which is single-threaded.
4144 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number
) {
4145 if (min_log_number_to_keep_2pc_
.load(std::memory_order_relaxed
) < number
) {
4146 min_log_number_to_keep_2pc_
.store(number
, std::memory_order_relaxed
);
4150 Status
VersionSet::WriteSnapshot(log::Writer
* log
) {
4151 // TODO: Break up into multiple records to reduce memory usage on recovery?
4153 // WARNING: This method doesn't hold a mutex!!
4155 // This is done without DB mutex lock held, but only within single-threaded
4156 // LogAndApply. Column family manipulations can only happen within LogAndApply
4157 // (the same single thread), so we're safe to iterate.
4158 for (auto cfd
: *column_family_set_
) {
4159 if (cfd
->IsDropped()) {
4162 assert(cfd
->initialized());
4164 // Store column family info
4166 if (cfd
->GetID() != 0) {
4167 // default column family is always there,
4168 // no need to explicitly write it
4169 edit
.AddColumnFamily(cfd
->GetName());
4170 edit
.SetColumnFamily(cfd
->GetID());
4172 edit
.SetComparatorName(
4173 cfd
->internal_comparator().user_comparator()->Name());
4175 if (!edit
.EncodeTo(&record
)) {
4176 return Status::Corruption(
4177 "Unable to Encode VersionEdit:" + edit
.DebugString(true));
4179 Status s
= log
->AddRecord(record
);
4188 edit
.SetColumnFamily(cfd
->GetID());
4190 for (int level
= 0; level
< cfd
->NumberLevels(); level
++) {
4191 for (const auto& f
:
4192 cfd
->current()->storage_info()->LevelFiles(level
)) {
4193 edit
.AddFile(level
, f
->fd
.GetNumber(), f
->fd
.GetPathId(),
4194 f
->fd
.GetFileSize(), f
->smallest
, f
->largest
,
4195 f
->fd
.smallest_seqno
, f
->fd
.largest_seqno
,
4196 f
->marked_for_compaction
);
4199 edit
.SetLogNumber(cfd
->GetLogNumber());
4201 if (!edit
.EncodeTo(&record
)) {
4202 return Status::Corruption(
4203 "Unable to Encode VersionEdit:" + edit
.DebugString(true));
4205 Status s
= log
->AddRecord(record
);
4212 return Status::OK();
4215 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
4216 // function is called repeatedly with consecutive pairs of slices. For example
4217 // if the slice list is [a, b, c, d] this function is called with arguments
4218 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
4219 // we avoid doing binary search for the keys b and c twice and instead somehow
4220 // maintain state of where they first appear in the files.
4221 uint64_t VersionSet::ApproximateSize(Version
* v
, const Slice
& start
,
4222 const Slice
& end
, int start_level
,
4225 assert(v
->cfd_
->internal_comparator().Compare(start
, end
) <= 0);
4228 const auto* vstorage
= v
->storage_info();
4229 end_level
= end_level
== -1
4230 ? vstorage
->num_non_empty_levels()
4231 : std::min(end_level
, vstorage
->num_non_empty_levels());
4233 assert(start_level
<= end_level
);
4235 for (int level
= start_level
; level
< end_level
; level
++) {
4236 const LevelFilesBrief
& files_brief
= vstorage
->LevelFilesBrief(level
);
4237 if (!files_brief
.num_files
) {
4238 // empty level, skip exploration
4243 // level 0 data is sorted order, handle the use case explicitly
4244 size
+= ApproximateSizeLevel0(v
, files_brief
, start
, end
);
4249 assert(files_brief
.num_files
> 0);
4251 // identify the file position for starting key
4252 const uint64_t idx_start
= FindFileInRange(
4253 v
->cfd_
->internal_comparator(), files_brief
, start
,
4254 /*start=*/0, static_cast<uint32_t>(files_brief
.num_files
- 1));
4255 assert(idx_start
< files_brief
.num_files
);
4257 // scan all files from the starting position until the ending position
4258 // inferred from the sorted order
4259 for (uint64_t i
= idx_start
; i
< files_brief
.num_files
; i
++) {
4261 val
= ApproximateSize(v
, files_brief
.files
[i
], end
);
4263 // the files after this will not have the range
4269 if (i
== idx_start
) {
4270 // subtract the bytes needed to be scanned to get to the starting
4272 val
= ApproximateSize(v
, files_brief
.files
[i
], start
);
4273 assert(size
>= val
);
4282 uint64_t VersionSet::ApproximateSizeLevel0(Version
* v
,
4283 const LevelFilesBrief
& files_brief
,
4284 const Slice
& key_start
,
4285 const Slice
& key_end
) {
4286 // level 0 files are not in sorted order, we need to iterate through
4287 // the list to compute the total bytes that require scanning
4289 for (size_t i
= 0; i
< files_brief
.num_files
; i
++) {
4290 const uint64_t start
= ApproximateSize(v
, files_brief
.files
[i
], key_start
);
4291 const uint64_t end
= ApproximateSize(v
, files_brief
.files
[i
], key_end
);
4292 assert(end
>= start
);
4293 size
+= end
- start
;
4298 uint64_t VersionSet::ApproximateSize(Version
* v
, const FdWithKeyRange
& f
,
4303 uint64_t result
= 0;
4304 if (v
->cfd_
->internal_comparator().Compare(f
.largest_key
, key
) <= 0) {
4305 // Entire file is before "key", so just add the file size
4306 result
= f
.fd
.GetFileSize();
4307 } else if (v
->cfd_
->internal_comparator().Compare(f
.smallest_key
, key
) > 0) {
4308 // Entire file is after "key", so ignore
4311 // "key" falls in the range for this table. Add the
4312 // approximate offset of "key" within the table.
4313 TableReader
* table_reader_ptr
;
4314 InternalIterator
* iter
= v
->cfd_
->table_cache()->NewIterator(
4315 ReadOptions(), v
->env_options_
, v
->cfd_
->internal_comparator(),
4316 *f
.file_metadata
, nullptr /* range_del_agg */,
4317 v
->GetMutableCFOptions().prefix_extractor
.get(), &table_reader_ptr
);
4318 if (table_reader_ptr
!= nullptr) {
4319 result
= table_reader_ptr
->ApproximateOffsetOf(key
);
4326 void VersionSet::AddLiveFiles(std::vector
<FileDescriptor
>* live_list
) {
4327 // pre-calculate space requirement
4328 int64_t total_files
= 0;
4329 for (auto cfd
: *column_family_set_
) {
4330 if (!cfd
->initialized()) {
4333 Version
* dummy_versions
= cfd
->dummy_versions();
4334 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
;
4336 const auto* vstorage
= v
->storage_info();
4337 for (int level
= 0; level
< vstorage
->num_levels(); level
++) {
4338 total_files
+= vstorage
->LevelFiles(level
).size();
4343 // just one time extension to the right size
4344 live_list
->reserve(live_list
->size() + static_cast<size_t>(total_files
));
4346 for (auto cfd
: *column_family_set_
) {
4347 if (!cfd
->initialized()) {
4350 auto* current
= cfd
->current();
4351 bool found_current
= false;
4352 Version
* dummy_versions
= cfd
->dummy_versions();
4353 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
;
4355 v
->AddLiveFiles(live_list
);
4357 found_current
= true;
4360 if (!found_current
&& current
!= nullptr) {
4361 // Should never happen unless it is a bug.
4363 current
->AddLiveFiles(live_list
);
4368 InternalIterator
* VersionSet::MakeInputIterator(
4369 const Compaction
* c
, RangeDelAggregator
* range_del_agg
,
4370 const EnvOptions
& env_options_compactions
) {
4371 auto cfd
= c
->column_family_data();
4372 ReadOptions read_options
;
4373 read_options
.verify_checksums
= true;
4374 read_options
.fill_cache
= false;
4375 // Compaction iterators shouldn't be confined to a single prefix.
4376 // Compactions use Seek() for
4377 // (a) concurrent compactions,
4378 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
4379 read_options
.total_order_seek
= true;
4381 // Level-0 files have to be merged together. For other levels,
4382 // we will make a concatenating iterator per level.
4383 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
4384 const size_t space
= (c
->level() == 0 ? c
->input_levels(0)->num_files
+
4385 c
->num_input_levels() - 1
4386 : c
->num_input_levels());
4387 InternalIterator
** list
= new InternalIterator
* [space
];
4389 for (size_t which
= 0; which
< c
->num_input_levels(); which
++) {
4390 if (c
->input_levels(which
)->num_files
!= 0) {
4391 if (c
->level(which
) == 0) {
4392 const LevelFilesBrief
* flevel
= c
->input_levels(which
);
4393 for (size_t i
= 0; i
< flevel
->num_files
; i
++) {
4394 list
[num
++] = cfd
->table_cache()->NewIterator(
4395 read_options
, env_options_compactions
, cfd
->internal_comparator(),
4396 *flevel
->files
[i
].file_metadata
, range_del_agg
,
4397 c
->mutable_cf_options()->prefix_extractor
.get(),
4398 nullptr /* table_reader_ptr */,
4399 nullptr /* no per level latency histogram */,
4400 true /* for_compaction */, nullptr /* arena */,
4401 false /* skip_filters */, static_cast<int>(which
) /* level */);
4404 // Create concatenating iterator for the files from this level
4405 list
[num
++] = new LevelIterator(
4406 cfd
->table_cache(), read_options
, env_options_compactions
,
4407 cfd
->internal_comparator(), c
->input_levels(which
),
4408 c
->mutable_cf_options()->prefix_extractor
.get(),
4409 false /* should_sample */,
4410 nullptr /* no per level latency histogram */,
4411 true /* for_compaction */, false /* skip_filters */,
4412 static_cast<int>(which
) /* level */, range_del_agg
,
4413 c
->boundaries(which
));
4417 assert(num
<= space
);
4418 InternalIterator
* result
=
4419 NewMergingIterator(&c
->column_family_data()->internal_comparator(), list
,
4420 static_cast<int>(num
));
4425 // verify that the files listed in this compaction are present
4426 // in the current version
4427 bool VersionSet::VerifyCompactionFileConsistency(Compaction
* c
) {
4429 Version
* version
= c
->column_family_data()->current();
4430 const VersionStorageInfo
* vstorage
= version
->storage_info();
4431 if (c
->input_version() != version
) {
4433 db_options_
->info_log
,
4434 "[%s] compaction output being applied to a different base version from"
4436 c
->column_family_data()->GetName().c_str());
4438 if (vstorage
->compaction_style_
== kCompactionStyleLevel
&&
4439 c
->start_level() == 0 && c
->num_input_levels() > 2U) {
4440 // We are doing a L0->base_level compaction. The assumption is if
4441 // base level is not L1, levels from L1 to base_level - 1 is empty.
4442 // This is ensured by having one compaction from L0 going on at the
4443 // same time in level-based compaction. So that during the time, no
4444 // compaction/flush can put files to those levels.
4445 for (int l
= c
->start_level() + 1; l
< c
->output_level(); l
++) {
4446 if (vstorage
->NumLevelFiles(l
) != 0) {
4453 for (size_t input
= 0; input
< c
->num_input_levels(); ++input
) {
4454 int level
= c
->level(input
);
4455 for (size_t i
= 0; i
< c
->num_input_files(input
); ++i
) {
4456 uint64_t number
= c
->input(input
, i
)->fd
.GetNumber();
4458 for (size_t j
= 0; j
< vstorage
->files_
[level
].size(); j
++) {
4459 FileMetaData
* f
= vstorage
->files_
[level
][j
];
4460 if (f
->fd
.GetNumber() == number
) {
4466 return false; // input files non existent in current version
4473 return true; // everything good
4476 Status
VersionSet::GetMetadataForFile(uint64_t number
, int* filelevel
,
4477 FileMetaData
** meta
,
4478 ColumnFamilyData
** cfd
) {
4479 for (auto cfd_iter
: *column_family_set_
) {
4480 if (!cfd_iter
->initialized()) {
4483 Version
* version
= cfd_iter
->current();
4484 const auto* vstorage
= version
->storage_info();
4485 for (int level
= 0; level
< vstorage
->num_levels(); level
++) {
4486 for (const auto& file
: vstorage
->LevelFiles(level
)) {
4487 if (file
->fd
.GetNumber() == number
) {
4491 return Status::OK();
4496 return Status::NotFound("File not present in any level");
4499 void VersionSet::GetLiveFilesMetaData(std::vector
<LiveFileMetaData
>* metadata
) {
4500 for (auto cfd
: *column_family_set_
) {
4501 if (cfd
->IsDropped() || !cfd
->initialized()) {
4504 for (int level
= 0; level
< cfd
->NumberLevels(); level
++) {
4505 for (const auto& file
:
4506 cfd
->current()->storage_info()->LevelFiles(level
)) {
4507 LiveFileMetaData filemetadata
;
4508 filemetadata
.column_family_name
= cfd
->GetName();
4509 uint32_t path_id
= file
->fd
.GetPathId();
4510 if (path_id
< cfd
->ioptions()->cf_paths
.size()) {
4511 filemetadata
.db_path
= cfd
->ioptions()->cf_paths
[path_id
].path
;
4513 assert(!cfd
->ioptions()->cf_paths
.empty());
4514 filemetadata
.db_path
= cfd
->ioptions()->cf_paths
.back().path
;
4516 filemetadata
.name
= MakeTableFileName("", file
->fd
.GetNumber());
4517 filemetadata
.level
= level
;
4518 filemetadata
.size
= static_cast<size_t>(file
->fd
.GetFileSize());
4519 filemetadata
.smallestkey
= file
->smallest
.user_key().ToString();
4520 filemetadata
.largestkey
= file
->largest
.user_key().ToString();
4521 filemetadata
.smallest_seqno
= file
->fd
.smallest_seqno
;
4522 filemetadata
.largest_seqno
= file
->fd
.largest_seqno
;
4523 filemetadata
.num_reads_sampled
= file
->stats
.num_reads_sampled
.load(
4524 std::memory_order_relaxed
);
4525 filemetadata
.being_compacted
= file
->being_compacted
;
4526 filemetadata
.num_entries
= file
->num_entries
;
4527 filemetadata
.num_deletions
= file
->num_deletions
;
4528 metadata
->push_back(filemetadata
);
4534 void VersionSet::GetObsoleteFiles(std::vector
<ObsoleteFileInfo
>* files
,
4535 std::vector
<std::string
>* manifest_filenames
,
4536 uint64_t min_pending_output
) {
4537 assert(manifest_filenames
->empty());
4538 obsolete_manifests_
.swap(*manifest_filenames
);
4539 std::vector
<ObsoleteFileInfo
> pending_files
;
4540 for (auto& f
: obsolete_files_
) {
4541 if (f
.metadata
->fd
.GetNumber() < min_pending_output
) {
4542 files
->push_back(std::move(f
));
4544 pending_files
.push_back(std::move(f
));
4547 obsolete_files_
.swap(pending_files
);
4550 ColumnFamilyData
* VersionSet::CreateColumnFamily(
4551 const ColumnFamilyOptions
& cf_options
, VersionEdit
* edit
) {
4552 assert(edit
->is_column_family_add_
);
4554 MutableCFOptions dummy_cf_options
;
4555 Version
* dummy_versions
=
4556 new Version(nullptr, this, env_options_
, dummy_cf_options
);
4557 // Ref() dummy version once so that later we can call Unref() to delete it
4558 // by avoiding calling "delete" explicitly (~Version is private)
4559 dummy_versions
->Ref();
4560 auto new_cfd
= column_family_set_
->CreateColumnFamily(
4561 edit
->column_family_name_
, edit
->column_family_
, dummy_versions
,
4564 Version
* v
= new Version(new_cfd
, this, env_options_
,
4565 *new_cfd
->GetLatestMutableCFOptions(),
4566 current_version_number_
++);
4568 // Fill level target base information.
4569 v
->storage_info()->CalculateBaseBytes(*new_cfd
->ioptions(),
4570 *new_cfd
->GetLatestMutableCFOptions());
4571 AppendVersion(new_cfd
, v
);
4572 // GetLatestMutableCFOptions() is safe here without mutex since the
4573 // cfd is not available to client
4574 new_cfd
->CreateNewMemtable(*new_cfd
->GetLatestMutableCFOptions(),
4576 new_cfd
->SetLogNumber(edit
->log_number_
);
4580 uint64_t VersionSet::GetNumLiveVersions(Version
* dummy_versions
) {
4582 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
; v
= v
->next_
) {
4588 uint64_t VersionSet::GetTotalSstFilesSize(Version
* dummy_versions
) {
4589 std::unordered_set
<uint64_t> unique_files
;
4590 uint64_t total_files_size
= 0;
4591 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
; v
= v
->next_
) {
4592 VersionStorageInfo
* storage_info
= v
->storage_info();
4593 for (int level
= 0; level
< storage_info
->num_levels_
; level
++) {
4594 for (const auto& file_meta
: storage_info
->LevelFiles(level
)) {
4595 if (unique_files
.find(file_meta
->fd
.packed_number_and_path_id
) ==
4596 unique_files
.end()) {
4597 unique_files
.insert(file_meta
->fd
.packed_number_and_path_id
);
4598 total_files_size
+= file_meta
->fd
.GetFileSize();
4603 return total_files_size
;
4606 ReactiveVersionSet::ReactiveVersionSet(const std::string
& dbname
,
4607 const ImmutableDBOptions
* _db_options
,
4608 const EnvOptions
& _env_options
,
4610 WriteBufferManager
* write_buffer_manager
,
4611 WriteController
* write_controller
)
4612 : VersionSet(dbname
, _db_options
, _env_options
, table_cache
,
4613 write_buffer_manager
, write_controller
) {}
4615 ReactiveVersionSet::~ReactiveVersionSet() {}
4617 Status
ReactiveVersionSet::Recover(
4618 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
4619 std::unique_ptr
<log::FragmentBufferedReader
>* manifest_reader
,
4620 std::unique_ptr
<log::Reader::Reporter
>* manifest_reporter
,
4621 std::unique_ptr
<Status
>* manifest_reader_status
) {
4622 assert(manifest_reader
!= nullptr);
4623 assert(manifest_reporter
!= nullptr);
4624 assert(manifest_reader_status
!= nullptr);
4626 std::unordered_map
<std::string
, ColumnFamilyOptions
> cf_name_to_options
;
4627 for (const auto& cf
: column_families
) {
4628 cf_name_to_options
.insert({cf
.name
, cf
.options
});
4631 // add default column family
4632 auto default_cf_iter
= cf_name_to_options
.find(kDefaultColumnFamilyName
);
4633 if (default_cf_iter
== cf_name_to_options
.end()) {
4634 return Status::InvalidArgument("Default column family not specified");
4636 VersionEdit default_cf_edit
;
4637 default_cf_edit
.AddColumnFamily(kDefaultColumnFamilyName
);
4638 default_cf_edit
.SetColumnFamily(0);
4639 ColumnFamilyData
* default_cfd
=
4640 CreateColumnFamily(default_cf_iter
->second
, &default_cf_edit
);
4641 // In recovery, nobody else can access it, so it's fine to set it to be
4642 // initialized earlier.
4643 default_cfd
->set_initialized();
4645 bool have_log_number
= false;
4646 bool have_prev_log_number
= false;
4647 bool have_next_file
= false;
4648 bool have_last_sequence
= false;
4649 uint64_t next_file
= 0;
4650 uint64_t last_sequence
= 0;
4651 uint64_t log_number
= 0;
4652 uint64_t previous_log_number
= 0;
4653 uint32_t max_column_family
= 0;
4654 uint64_t min_log_number_to_keep
= 0;
4655 std::unordered_map
<uint32_t, std::unique_ptr
<BaseReferencedVersionBuilder
>>
4657 std::unordered_map
<int, std::string
> column_families_not_found
;
4659 std::make_pair(0, std::unique_ptr
<BaseReferencedVersionBuilder
>(
4660 new BaseReferencedVersionBuilder(default_cfd
))));
4662 manifest_reader_status
->reset(new Status());
4663 manifest_reporter
->reset(new LogReporter());
4664 static_cast<LogReporter
*>(manifest_reporter
->get())->status
=
4665 manifest_reader_status
->get();
4666 Status s
= MaybeSwitchManifest(manifest_reporter
->get(), manifest_reader
);
4667 log::Reader
* reader
= manifest_reader
->get();
4670 while (s
.ok() && retry
< 1) {
4671 assert(reader
!= nullptr);
4673 std::string scratch
;
4674 while (s
.ok() && reader
->ReadRecord(&record
, &scratch
)) {
4676 s
= edit
.DecodeFrom(record
);
4680 s
= ApplyOneVersionEditToBuilder(
4681 edit
, cf_name_to_options
, column_families_not_found
, builders
,
4682 &have_log_number
, &log_number
, &have_prev_log_number
,
4683 &previous_log_number
, &have_next_file
, &next_file
,
4684 &have_last_sequence
, &last_sequence
, &min_log_number_to_keep
,
4685 &max_column_family
);
4688 bool enough
= have_next_file
&& have_log_number
&& have_last_sequence
;
4690 for (const auto& cf
: column_families
) {
4691 auto cfd
= column_family_set_
->GetColumnFamily(cf
.name
);
4692 if (cfd
== nullptr) {
4699 for (const auto& cf
: column_families
) {
4700 auto cfd
= column_family_set_
->GetColumnFamily(cf
.name
);
4701 assert(cfd
!= nullptr);
4702 if (!cfd
->IsDropped()) {
4703 auto builder_iter
= builders
.find(cfd
->GetID());
4704 assert(builder_iter
!= builders
.end());
4705 auto builder
= builder_iter
->second
->version_builder();
4706 assert(builder
!= nullptr);
4707 s
= builder
->LoadTableHandlers(
4708 cfd
->internal_stats(), db_options_
->max_file_opening_threads
,
4709 false /* prefetch_index_and_filter_in_cache */,
4710 true /* is_initial_load */,
4711 cfd
->GetLatestMutableCFOptions()->prefix_extractor
.get());
4714 if (s
.IsPathNotFound()) {
4730 if (!have_prev_log_number
) {
4731 previous_log_number
= 0;
4733 column_family_set_
->UpdateMaxColumnFamily(max_column_family
);
4735 MarkMinLogNumberToKeep2PC(min_log_number_to_keep
);
4736 MarkFileNumberUsed(previous_log_number
);
4737 MarkFileNumberUsed(log_number
);
4739 for (auto cfd
: *column_family_set_
) {
4740 assert(builders
.count(cfd
->GetID()) > 0);
4741 auto builder
= builders
[cfd
->GetID()]->version_builder();
4742 if (!builder
->CheckConsistencyForNumLevels()) {
4743 s
= Status::InvalidArgument(
4744 "db has more levels than options.num_levels");
4751 for (auto cfd
: *column_family_set_
) {
4752 if (cfd
->IsDropped()) {
4755 assert(cfd
->initialized());
4756 auto builders_iter
= builders
.find(cfd
->GetID());
4757 assert(builders_iter
!= builders
.end());
4758 auto* builder
= builders_iter
->second
->version_builder();
4760 Version
* v
= new Version(cfd
, this, env_options_
,
4761 *cfd
->GetLatestMutableCFOptions(),
4762 current_version_number_
++);
4763 builder
->SaveTo(v
->storage_info());
4765 // Install recovered version
4766 v
->PrepareApply(*cfd
->GetLatestMutableCFOptions(),
4767 !(db_options_
->skip_stats_update_on_db_open
));
4768 AppendVersion(cfd
, v
);
4770 next_file_number_
.store(next_file
+ 1);
4771 last_allocated_sequence_
= last_sequence
;
4772 last_published_sequence_
= last_sequence
;
4773 last_sequence_
= last_sequence
;
4774 prev_log_number_
= previous_log_number
;
4775 for (auto cfd
: *column_family_set_
) {
4776 if (cfd
->IsDropped()) {
4779 ROCKS_LOG_INFO(db_options_
->info_log
,
4780 "Column family [%s] (ID %u), log number is %" PRIu64
"\n",
4781 cfd
->GetName().c_str(), cfd
->GetID(), cfd
->GetLogNumber());
4787 Status
ReactiveVersionSet::ReadAndApply(
4788 InstrumentedMutex
* mu
,
4789 std::unique_ptr
<log::FragmentBufferedReader
>* manifest_reader
,
4790 std::unordered_set
<ColumnFamilyData
*>* cfds_changed
) {
4791 assert(manifest_reader
!= nullptr);
4792 assert(cfds_changed
!= nullptr);
4796 bool have_log_number
= false;
4797 bool have_prev_log_number
= false;
4798 bool have_next_file
= false;
4799 bool have_last_sequence
= false;
4800 uint64_t next_file
= 0;
4801 uint64_t last_sequence
= 0;
4802 uint64_t log_number
= 0;
4803 uint64_t previous_log_number
= 0;
4804 uint32_t max_column_family
= 0;
4805 uint64_t min_log_number_to_keep
= 0;
4809 std::string scratch
;
4810 log::Reader
* reader
= manifest_reader
->get();
4811 std::string old_manifest_path
= reader
->file()->file_name();
4812 while (reader
->ReadRecord(&record
, &scratch
)) {
4814 s
= edit
.DecodeFrom(record
);
4818 ColumnFamilyData
* cfd
=
4819 column_family_set_
->GetColumnFamily(edit
.column_family_
);
4820 // If we cannot find this column family in our column family set, then it
4821 // may be a new column family created by the primary after the secondary
4822 // starts. Ignore it for now.
4823 if (nullptr == cfd
) {
4826 if (active_version_builders_
.find(edit
.column_family_
) ==
4827 active_version_builders_
.end()) {
4828 std::unique_ptr
<BaseReferencedVersionBuilder
> builder_guard(
4829 new BaseReferencedVersionBuilder(cfd
));
4830 active_version_builders_
.insert(
4831 std::make_pair(edit
.column_family_
, std::move(builder_guard
)));
4833 s
= ApplyOneVersionEditToBuilder(
4834 edit
, &have_log_number
, &log_number
, &have_prev_log_number
,
4835 &previous_log_number
, &have_next_file
, &next_file
,
4836 &have_last_sequence
, &last_sequence
, &min_log_number_to_keep
,
4837 &max_column_family
);
4841 auto builder_iter
= active_version_builders_
.find(edit
.column_family_
);
4842 assert(builder_iter
!= active_version_builders_
.end());
4843 auto builder
= builder_iter
->second
->version_builder();
4844 assert(builder
!= nullptr);
4845 s
= builder
->LoadTableHandlers(
4846 cfd
->internal_stats(), db_options_
->max_file_opening_threads
,
4847 false /* prefetch_index_and_filter_in_cache */,
4848 false /* is_initial_load */,
4849 cfd
->GetLatestMutableCFOptions()->prefix_extractor
.get());
4850 TEST_SYNC_POINT_CALLBACK(
4851 "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s
);
4852 if (!s
.ok() && !s
.IsPathNotFound()) {
4854 } else if (s
.IsPathNotFound()) {
4856 } else { // s.ok() == true
4857 auto version
= new Version(cfd
, this, env_options_
,
4858 *cfd
->GetLatestMutableCFOptions(),
4859 current_version_number_
++);
4860 builder
->SaveTo(version
->storage_info());
4861 version
->PrepareApply(*cfd
->GetLatestMutableCFOptions(), true);
4862 AppendVersion(cfd
, version
);
4863 active_version_builders_
.erase(builder_iter
);
4864 if (cfds_changed
->count(cfd
) == 0) {
4865 cfds_changed
->insert(cfd
);
4868 if (have_next_file
) {
4869 next_file_number_
.store(next_file
+ 1);
4871 if (have_last_sequence
) {
4872 last_allocated_sequence_
= last_sequence
;
4873 last_published_sequence_
= last_sequence
;
4874 last_sequence_
= last_sequence
;
4876 if (have_prev_log_number
) {
4877 prev_log_number_
= previous_log_number
;
4878 MarkFileNumberUsed(previous_log_number
);
4880 if (have_log_number
) {
4881 MarkFileNumberUsed(log_number
);
4883 column_family_set_
->UpdateMaxColumnFamily(max_column_family
);
4884 MarkMinLogNumberToKeep2PC(min_log_number_to_keep
);
4886 // It's possible that:
4887 // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
4888 // 2) we have finished reading the current MANIFEST.
4889 // 3) we have encountered an IOError reading the current MANIFEST.
4890 // We need to look for the next MANIFEST and start from there. If we cannot
4891 // find the next MANIFEST, we should exit the loop.
4892 s
= MaybeSwitchManifest(reader
->GetReporter(), manifest_reader
);
4893 reader
= manifest_reader
->get();
4894 if (s
.ok() && reader
->file()->file_name() == old_manifest_path
) {
4900 for (auto cfd
: *column_family_set_
) {
4901 auto builder_iter
= active_version_builders_
.find(cfd
->GetID());
4902 if (builder_iter
== active_version_builders_
.end()) {
4905 auto builder
= builder_iter
->second
->version_builder();
4906 if (!builder
->CheckConsistencyForNumLevels()) {
4907 s
= Status::InvalidArgument(
4908 "db has more levels than options.num_levels");
// Applies a single VersionEdit -- decoded from the MANIFEST that a secondary
// (read-only follower) instance is tailing -- to the matching column family's
// in-memory version builder, then forwards the edit's bookkeeping metadata
// (log numbers, next file number, last sequence, min log to keep, max column
// family id) into the caller-provided out-parameters via
// ExtractInfoFromVersionEdit.
//
// NOTE(review): this chunk appears to be a lossy extraction of the original
// file -- several source lines are missing between the embedded line numbers
// (e.g. 4935, 4938-4943, 4948, 4954, and the closing braces 4959-4960), so
// the text below is not compilable as written; verify against the full file
// before editing.
4916 Status
ReactiveVersionSet::ApplyOneVersionEditToBuilder(
4917 VersionEdit
& edit
, bool* have_log_number
, uint64_t* log_number
,
4918 bool* have_prev_log_number
, uint64_t* previous_log_number
,
4919 bool* have_next_file
, uint64_t* next_file
, bool* have_last_sequence
,
4920 SequenceNumber
* last_sequence
, uint64_t* min_log_number_to_keep
,
4921 uint32_t* max_column_family
) {
// cfd is assigned in the drop / regular branches below; the
// column-family-add branch returns early and never uses it.
4922 ColumnFamilyData
* cfd
= nullptr;
4924 if (edit
.is_column_family_add_
) {
4925 // TODO (yanqin) for now the secondary ignores column families created
4926 // after Open. This also simplifies handling of switching to a new MANIFEST
4927 // and processing the snapshot of the system at the beginning of the
4929 return Status::OK();
4930 } else if (edit
.is_column_family_drop_
) {
4931 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
4932 // Drop a CF created by primary after secondary starts? Then ignore
4933 if (cfd
== nullptr) {
4934 return Status::OK();
4936 // Drop the column family by setting it to be 'dropped' without destroying
4937 // the column family handle.
// NOTE(review): the statements that actually perform the drop (between the
// numbered lines 4937 and 4944) are missing from this extraction.
// Regular (non-add, non-drop) edit: look up the column family and apply the
// edit to its version builder.
4944 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
4945 // Operation on a CF created after Open? Then ignore
4946 if (cfd
== nullptr) {
4947 return Status::OK();
// A builder must already exist for this column family: the caller
// (ReadAndApply) inserts one into active_version_builders_ before
// dispatching the edit here, hence the asserts.
4949 auto builder_iter
= active_version_builders_
.find(edit
.column_family_
);
4950 assert(builder_iter
!= active_version_builders_
.end());
4951 auto builder
= builder_iter
->second
->version_builder();
4952 assert(builder
!= nullptr);
4953 builder
->Apply(&edit
);
// Propagate the edit's metadata into the caller's accumulators; the return
// status is whatever ExtractInfoFromVersionEdit reports.
4955 return ExtractInfoFromVersionEdit(
4956 cfd
, edit
, have_log_number
, log_number
, have_prev_log_number
,
4957 previous_log_number
, have_next_file
, next_file
, have_last_sequence
,
4958 last_sequence
, min_log_number_to_keep
, max_column_family
);
// Checks whether the primary has rolled to a new MANIFEST file and, if so,
// re-points *manifest_reader at it: reads the current manifest path (via
// GetCurrentManifestPath), and when it differs from the file the existing
// reader is on (or no reader exists yet), opens the new file and wraps it in
// a fresh log::FragmentBufferedReader. Retries while the open reports
// PathNotFound -- presumably to ride out the window in which the primary is
// swapping CURRENT; TODO confirm against the full file.
//
// NOTE(review): lossy extraction -- the `do {` opener and the declaration of
// `s` (around original lines 4965-4966), the no-switch early exit near 4984-
// 4986, and the function tail after the do/while (5004-5006) are missing
// here; the text below is not compilable as written.
4961 Status
ReactiveVersionSet::MaybeSwitchManifest(
4962 log::Reader::Reporter
* reporter
,
4963 std::unique_ptr
<log::FragmentBufferedReader
>* manifest_reader
) {
4964 assert(manifest_reader
!= nullptr);
4967 std::string manifest_path
;
4968 s
= GetCurrentManifestPath(&manifest_path
);
4969 std::unique_ptr
<SequentialFile
> manifest_file
;
// Switch only when there is no reader yet, or the reader is positioned on a
// different (older) manifest file than CURRENT now names.
4971 if (nullptr == manifest_reader
->get() ||
4972 manifest_reader
->get()->file()->file_name() != manifest_path
) {
// NOTE(review): the TEST_SYNC_POINT(...) call wrappers around these two
// string literals were dropped by the extraction; only the sync-point name
// literals remain.
4974 "ReactiveVersionSet::MaybeSwitchManifest:"
4975 "AfterGetCurrentManifestPath:0");
4977 "ReactiveVersionSet::MaybeSwitchManifest:"
4978 "AfterGetCurrentManifestPath:1");
4979 s
= env_
->NewSequentialFile(
4980 manifest_path
, &manifest_file
,
4981 env_
->OptimizeForManifestRead(env_options_
));
4983 // No need to switch manifest.
// Wrap the newly opened file: SequentialFile -> SequentialFileReader ->
// FragmentBufferedReader (checksumming on, log_number 0), replacing
// whatever reader the caller held.
4987 std::unique_ptr
<SequentialFileReader
> manifest_file_reader
;
4989 manifest_file_reader
.reset(
4990 new SequentialFileReader(std::move(manifest_file
), manifest_path
));
4991 manifest_reader
->reset(new log::FragmentBufferedReader(
4992 nullptr, std::move(manifest_file_reader
), reporter
,
4993 true /* checksum */, 0 /* log_number */));
4994 ROCKS_LOG_INFO(db_options_
->info_log
, "Switched to new manifest: %s\n",
4995 manifest_path
.c_str());
4996 // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
4997 // active_version_builders_ map because we choose to construct the
4998 // versions from scratch, thanks to the first part of each MANIFEST
4999 // written by VersionSet::WriteSnapshot. This is not necessary, but we
5000 // choose this at present for the sake of simplicity.
5001 active_version_builders_
.clear();
// Loop back and re-resolve CURRENT if the manifest vanished between reading
// CURRENT and opening the file (primary rolled it under us).
5003 } while (s
.IsPathNotFound());
5007 } // namespace rocksdb