1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/version_set.h"
21 #include <unordered_map>
24 #include "compaction/compaction.h"
25 #include "db/blob/blob_file_cache.h"
26 #include "db/blob/blob_file_reader.h"
27 #include "db/blob/blob_index.h"
28 #include "db/internal_stats.h"
29 #include "db/log_reader.h"
30 #include "db/log_writer.h"
31 #include "db/memtable.h"
32 #include "db/merge_context.h"
33 #include "db/merge_helper.h"
34 #include "db/pinned_iterators_manager.h"
35 #include "db/table_cache.h"
36 #include "db/version_builder.h"
37 #include "db/version_edit_handler.h"
38 #include "file/filename.h"
39 #include "file/random_access_file_reader.h"
40 #include "file/read_write_util.h"
41 #include "file/writable_file_writer.h"
42 #include "monitoring/file_read_sample.h"
43 #include "monitoring/perf_context_imp.h"
44 #include "monitoring/persistent_stats_history.h"
45 #include "rocksdb/env.h"
46 #include "rocksdb/merge_operator.h"
47 #include "rocksdb/write_buffer_manager.h"
48 #include "table/format.h"
49 #include "table/get_context.h"
50 #include "table/internal_iterator.h"
51 #include "table/merging_iterator.h"
52 #include "table/meta_blocks.h"
53 #include "table/multiget_context.h"
54 #include "table/plain/plain_table_factory.h"
55 #include "table/table_reader.h"
56 #include "table/two_level_iterator.h"
57 #include "test_util/sync_point.h"
58 #include "util/cast_util.h"
59 #include "util/coding.h"
60 #include "util/stop_watch.h"
61 #include "util/string_util.h"
62 #include "util/user_comparator_wrapper.h"
64 namespace ROCKSDB_NAMESPACE
{
68 // Find File in LevelFilesBrief data structure
69 // Within an index range defined by left and right
70 int FindFileInRange(const InternalKeyComparator
& icmp
,
71 const LevelFilesBrief
& file_level
,
75 auto cmp
= [&](const FdWithKeyRange
& f
, const Slice
& k
) -> bool {
76 return icmp
.InternalKeyComparator::Compare(f
.largest_key
, k
) < 0;
78 const auto &b
= file_level
.files
;
79 return static_cast<int>(std::lower_bound(b
+ left
,
80 b
+ right
, key
, cmp
) - b
);
83 Status
OverlapWithIterator(const Comparator
* ucmp
,
84 const Slice
& smallest_user_key
,
85 const Slice
& largest_user_key
,
86 InternalIterator
* iter
,
88 InternalKey
range_start(smallest_user_key
, kMaxSequenceNumber
,
90 iter
->Seek(range_start
.Encode());
91 if (!iter
->status().ok()) {
92 return iter
->status();
97 ParsedInternalKey seek_result
;
98 Status s
= ParseInternalKey(iter
->key(), &seek_result
,
99 false /* log_err_key */); // TODO
100 if (!s
.ok()) return s
;
102 if (ucmp
->CompareWithoutTimestamp(seek_result
.user_key
, largest_user_key
) <=
108 return iter
->status();
111 // Class to help choose the next file to search for the particular key.
112 // Searches and returns files level by level.
113 // We can search level-by-level since entries never hop across
114 // levels. Therefore we are guaranteed that if we find data
115 // in a smaller level, later levels are irrelevant (unless we
116 // are MergeInProgress).
119 FilePicker(std::vector
<FileMetaData
*>* files
, const Slice
& user_key
,
120 const Slice
& ikey
, autovector
<LevelFilesBrief
>* file_levels
,
121 unsigned int num_levels
, FileIndexer
* file_indexer
,
122 const Comparator
* user_comparator
,
123 const InternalKeyComparator
* internal_comparator
)
124 : num_levels_(num_levels
),
125 curr_level_(static_cast<unsigned int>(-1)),
126 returned_file_level_(static_cast<unsigned int>(-1)),
127 hit_file_level_(static_cast<unsigned int>(-1)),
128 search_left_bound_(0),
129 search_right_bound_(FileIndexer::kLevelMaxIndex
),
133 level_files_brief_(file_levels
),
134 is_hit_file_last_in_level_(false),
135 curr_file_level_(nullptr),
138 file_indexer_(file_indexer
),
139 user_comparator_(user_comparator
),
140 internal_comparator_(internal_comparator
) {
144 // Setup member variables to search first level.
145 search_ended_
= !PrepareNextLevel();
146 if (!search_ended_
) {
147 // Prefetch Level 0 table data to avoid cache miss if possible.
148 for (unsigned int i
= 0; i
< (*level_files_brief_
)[0].num_files
; ++i
) {
149 auto* r
= (*level_files_brief_
)[0].files
[i
].fd
.table_reader
;
157 int GetCurrentLevel() const { return curr_level_
; }
159 FdWithKeyRange
* GetNextFile() {
160 while (!search_ended_
) { // Loops over different levels.
161 while (curr_index_in_curr_level_
< curr_file_level_
->num_files
) {
162 // Loops over all files in current level.
163 FdWithKeyRange
* f
= &curr_file_level_
->files
[curr_index_in_curr_level_
];
164 hit_file_level_
= curr_level_
;
165 is_hit_file_last_in_level_
=
166 curr_index_in_curr_level_
== curr_file_level_
->num_files
- 1;
167 int cmp_largest
= -1;
169 // Do key range filtering of files or/and fractional cascading if:
170 // (1) not all the files are in level 0, or
171 // (2) there are more than 3 current level files
172 // If there are only 3 or less current level files in the system, we skip
173 // the key range filtering. In this case, more likely, the system is
174 // highly tuned to minimize number of tables queried by each query,
175 // so it is unlikely that key range filtering is more efficient than
176 // querying the files.
177 if (num_levels_
> 1 || curr_file_level_
->num_files
> 3) {
178 // Check if key is within a file's range. If search left bound and
179 // right bound point to the same find, we are sure key falls in
181 assert(curr_level_
== 0 ||
182 curr_index_in_curr_level_
== start_index_in_curr_level_
||
183 user_comparator_
->CompareWithoutTimestamp(
184 user_key_
, ExtractUserKey(f
->smallest_key
)) <= 0);
186 int cmp_smallest
= user_comparator_
->CompareWithoutTimestamp(
187 user_key_
, ExtractUserKey(f
->smallest_key
));
188 if (cmp_smallest
>= 0) {
189 cmp_largest
= user_comparator_
->CompareWithoutTimestamp(
190 user_key_
, ExtractUserKey(f
->largest_key
));
193 // Setup file search bound for the next level based on the
194 // comparison results
195 if (curr_level_
> 0) {
196 file_indexer_
->GetNextLevelIndex(curr_level_
,
197 curr_index_in_curr_level_
,
198 cmp_smallest
, cmp_largest
,
200 &search_right_bound_
);
202 // Key falls out of current file's range
203 if (cmp_smallest
< 0 || cmp_largest
> 0) {
204 if (curr_level_
== 0) {
205 ++curr_index_in_curr_level_
;
208 // Search next level.
214 // Sanity check to make sure that the files are correctly sorted
216 if (curr_level_
!= 0) {
217 int comp_sign
= internal_comparator_
->Compare(
218 prev_file_
->largest_key
, f
->smallest_key
);
219 assert(comp_sign
< 0);
221 // level == 0, the current file cannot be newer than the previous
222 // one. Use compressed data structure, has no attribute seqNo
223 assert(curr_index_in_curr_level_
> 0);
224 assert(!NewestFirstBySeqNo(files_
[0][curr_index_in_curr_level_
],
225 files_
[0][curr_index_in_curr_level_
-1]));
230 returned_file_level_
= curr_level_
;
231 if (curr_level_
> 0 && cmp_largest
< 0) {
232 // No more files to search in this level.
233 search_ended_
= !PrepareNextLevel();
235 ++curr_index_in_curr_level_
;
239 // Start searching next level.
240 search_ended_
= !PrepareNextLevel();
246 // getter for current file level
247 // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
248 unsigned int GetHitFileLevel() { return hit_file_level_
; }
250 // Returns true if the most recent "hit file" (i.e., one returned by
251 // GetNextFile()) is at the last index in its level.
252 bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_
; }
255 unsigned int num_levels_
;
256 unsigned int curr_level_
;
257 unsigned int returned_file_level_
;
258 unsigned int hit_file_level_
;
259 int32_t search_left_bound_
;
260 int32_t search_right_bound_
;
262 std::vector
<FileMetaData
*>* files_
;
264 autovector
<LevelFilesBrief
>* level_files_brief_
;
266 bool is_hit_file_last_in_level_
;
267 LevelFilesBrief
* curr_file_level_
;
268 unsigned int curr_index_in_curr_level_
;
269 unsigned int start_index_in_curr_level_
;
272 FileIndexer
* file_indexer_
;
273 const Comparator
* user_comparator_
;
274 const InternalKeyComparator
* internal_comparator_
;
276 FdWithKeyRange
* prev_file_
;
279 // Setup local variables to search next level.
280 // Returns false if there are no more levels to search.
281 bool PrepareNextLevel() {
283 while (curr_level_
< num_levels_
) {
284 curr_file_level_
= &(*level_files_brief_
)[curr_level_
];
285 if (curr_file_level_
->num_files
== 0) {
286 // When current level is empty, the search bound generated from upper
287 // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
289 assert(search_left_bound_
== 0);
290 assert(search_right_bound_
== -1 ||
291 search_right_bound_
== FileIndexer::kLevelMaxIndex
);
292 // Since current level is empty, it will need to search all files in
294 search_left_bound_
= 0;
295 search_right_bound_
= FileIndexer::kLevelMaxIndex
;
300 // Some files may overlap each other. We find
301 // all files that overlap user_key and process them in order from
302 // newest to oldest. In the context of merge-operator, this can occur at
303 // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
304 // are always compacted into a single entry).
306 if (curr_level_
== 0) {
307 // On Level-0, we read through all files to check for overlap.
310 // On Level-n (n>=1), files are sorted. Binary search to find the
311 // earliest file whose largest key >= ikey. Search left bound and
312 // right bound are used to narrow the range.
313 if (search_left_bound_
<= search_right_bound_
) {
314 if (search_right_bound_
== FileIndexer::kLevelMaxIndex
) {
315 search_right_bound_
=
316 static_cast<int32_t>(curr_file_level_
->num_files
) - 1;
318 // `search_right_bound_` is an inclusive upper-bound, but since it was
319 // determined based on user key, it is still possible the lookup key
320 // falls to the right of `search_right_bound_`'s corresponding file.
321 // So, pass a limit one higher, which allows us to detect this case.
323 FindFileInRange(*internal_comparator_
, *curr_file_level_
, ikey_
,
324 static_cast<uint32_t>(search_left_bound_
),
325 static_cast<uint32_t>(search_right_bound_
) + 1);
326 if (start_index
== search_right_bound_
+ 1) {
327 // `ikey_` comes after `search_right_bound_`. The lookup key does
328 // not exist on this level, so let's skip this level and do a full
329 // binary search on the next level.
330 search_left_bound_
= 0;
331 search_right_bound_
= FileIndexer::kLevelMaxIndex
;
336 // search_left_bound > search_right_bound, key does not exist in
337 // this level. Since no comparison is done in this level, it will
338 // need to search all files in the next level.
339 search_left_bound_
= 0;
340 search_right_bound_
= FileIndexer::kLevelMaxIndex
;
345 start_index_in_curr_level_
= start_index
;
346 curr_index_in_curr_level_
= start_index
;
348 prev_file_
= nullptr;
352 // curr_level_ = num_levels_. So, no more levels to search.
357 class FilePickerMultiGet
{
359 struct FilePickerContext
;
362 FilePickerMultiGet(MultiGetRange
* range
,
363 autovector
<LevelFilesBrief
>* file_levels
,
364 unsigned int num_levels
, FileIndexer
* file_indexer
,
365 const Comparator
* user_comparator
,
366 const InternalKeyComparator
* internal_comparator
)
367 : num_levels_(num_levels
),
368 curr_level_(static_cast<unsigned int>(-1)),
369 returned_file_level_(static_cast<unsigned int>(-1)),
370 hit_file_level_(static_cast<unsigned int>(-1)),
372 batch_iter_(range
->begin()),
373 batch_iter_prev_(range
->begin()),
374 upper_key_(range
->begin()),
375 maybe_repeat_key_(false),
376 current_level_range_(*range
, range
->begin(), range
->end()),
377 current_file_range_(*range
, range
->begin(), range
->end()),
378 level_files_brief_(file_levels
),
379 is_hit_file_last_in_level_(false),
380 curr_file_level_(nullptr),
381 file_indexer_(file_indexer
),
382 user_comparator_(user_comparator
),
383 internal_comparator_(internal_comparator
) {
384 for (auto iter
= range_
->begin(); iter
!= range_
->end(); ++iter
) {
385 fp_ctx_array_
[iter
.index()] =
386 FilePickerContext(0, FileIndexer::kLevelMaxIndex
);
389 // Setup member variables to search first level.
390 search_ended_
= !PrepareNextLevel();
391 if (!search_ended_
) {
393 // Prefetch Level 0 table data to avoid cache miss if possible.
394 // As of now, only PlainTableReader and CuckooTableReader do any
395 // prefetching. This may not be necessary anymore once we implement
396 // batching in those table readers
397 for (unsigned int i
= 0; i
< (*level_files_brief_
)[0].num_files
; ++i
) {
398 auto* r
= (*level_files_brief_
)[0].files
[i
].fd
.table_reader
;
400 for (auto iter
= range_
->begin(); iter
!= range_
->end(); ++iter
) {
401 r
->Prepare(iter
->ikey
);
408 int GetCurrentLevel() const { return curr_level_
; }
410 // Iterates through files in the current level until it finds a file that
411 // contains atleast one key from the MultiGet batch
412 bool GetNextFileInLevelWithKeys(MultiGetRange
* next_file_range
,
413 size_t* file_index
, FdWithKeyRange
** fd
,
414 bool* is_last_key_in_file
) {
415 size_t curr_file_index
= *file_index
;
416 FdWithKeyRange
* f
= nullptr;
417 bool file_hit
= false;
418 int cmp_largest
= -1;
419 if (curr_file_index
>= curr_file_level_
->num_files
) {
420 // In the unlikely case the next key is a duplicate of the current key,
421 // and the current key is the last in the level and the internal key
422 // was not found, we need to skip lookup for the remaining keys and
423 // reset the search bounds
424 if (batch_iter_
!= current_level_range_
.end()) {
426 for (; batch_iter_
!= current_level_range_
.end(); ++batch_iter_
) {
427 struct FilePickerContext
& fp_ctx
= fp_ctx_array_
[batch_iter_
.index()];
428 fp_ctx
.search_left_bound
= 0;
429 fp_ctx
.search_right_bound
= FileIndexer::kLevelMaxIndex
;
434 // Loops over keys in the MultiGet batch until it finds a file with
435 // atleast one of the keys. Then it keeps moving forward until the
436 // last key in the batch that falls in that file
437 while (batch_iter_
!= current_level_range_
.end() &&
438 (fp_ctx_array_
[batch_iter_
.index()].curr_index_in_curr_level
==
441 struct FilePickerContext
& fp_ctx
= fp_ctx_array_
[batch_iter_
.index()];
442 f
= &curr_file_level_
->files
[fp_ctx
.curr_index_in_curr_level
];
443 Slice
& user_key
= batch_iter_
->ukey_without_ts
;
445 // Do key range filtering of files or/and fractional cascading if:
446 // (1) not all the files are in level 0, or
447 // (2) there are more than 3 current level files
448 // If there are only 3 or less current level files in the system, we
449 // skip the key range filtering. In this case, more likely, the system
450 // is highly tuned to minimize number of tables queried by each query,
451 // so it is unlikely that key range filtering is more efficient than
452 // querying the files.
453 if (num_levels_
> 1 || curr_file_level_
->num_files
> 3) {
454 // Check if key is within a file's range. If search left bound and
455 // right bound point to the same find, we are sure key falls in
457 int cmp_smallest
= user_comparator_
->CompareWithoutTimestamp(
458 user_key
, false, ExtractUserKey(f
->smallest_key
), true);
460 assert(curr_level_
== 0 ||
461 fp_ctx
.curr_index_in_curr_level
==
462 fp_ctx
.start_index_in_curr_level
||
465 if (cmp_smallest
>= 0) {
466 cmp_largest
= user_comparator_
->CompareWithoutTimestamp(
467 user_key
, false, ExtractUserKey(f
->largest_key
), true);
472 // Setup file search bound for the next level based on the
473 // comparison results
474 if (curr_level_
> 0) {
475 file_indexer_
->GetNextLevelIndex(
476 curr_level_
, fp_ctx
.curr_index_in_curr_level
, cmp_smallest
,
477 cmp_largest
, &fp_ctx
.search_left_bound
,
478 &fp_ctx
.search_right_bound
);
480 // Key falls out of current file's range
481 if (cmp_smallest
< 0 || cmp_largest
> 0) {
482 next_file_range
->SkipKey(batch_iter_
);
489 if (cmp_largest
== 0) {
490 // cmp_largest is 0, which means the next key will not be in this
491 // file, so stop looking further. However, its possible there are
492 // duplicates in the batch, so find the upper bound for the batch
493 // in this file (upper_key_) by skipping past the duplicates. We
494 // leave batch_iter_ as is since we may have to pick up from there
495 // for the next file, if this file has a merge value rather than
497 upper_key_
= batch_iter_
;
499 while (upper_key_
!= current_level_range_
.end() &&
500 user_comparator_
->CompareWithoutTimestamp(
501 batch_iter_
->ukey_without_ts
, false,
502 upper_key_
->ukey_without_ts
, false) == 0) {
507 if (curr_level_
== 0) {
508 // We need to look through all files in level 0
509 ++fp_ctx
.curr_index_in_curr_level
;
515 (batch_iter_
!= current_level_range_
.end())
516 ? fp_ctx_array_
[batch_iter_
.index()].curr_index_in_curr_level
517 : curr_file_level_
->num_files
;
522 *file_index
= curr_file_index
;
523 *is_last_key_in_file
= cmp_largest
== 0;
524 if (!*is_last_key_in_file
) {
525 // If the largest key in the batch overlapping the file is not the
526 // largest key in the file, upper_ley_ would not have been updated so
528 upper_key_
= batch_iter_
;
533 FdWithKeyRange
* GetNextFile() {
534 while (!search_ended_
) {
535 // Start searching next level.
536 if (batch_iter_
== current_level_range_
.end()) {
537 search_ended_
= !PrepareNextLevel();
540 if (maybe_repeat_key_
) {
541 maybe_repeat_key_
= false;
542 // Check if we found the final value for the last key in the
543 // previous lookup range. If we did, then there's no need to look
544 // any further for that key, so advance batch_iter_. Else, keep
545 // batch_iter_ positioned on that key so we look it up again in
547 // For L0, always advance the key because we will look in the next
548 // file regardless for all keys not found yet
549 if (current_level_range_
.CheckKeyDone(batch_iter_
) ||
551 batch_iter_
= upper_key_
;
554 // batch_iter_prev_ will become the start key for the next file
556 batch_iter_prev_
= batch_iter_
;
559 MultiGetRange
next_file_range(current_level_range_
, batch_iter_prev_
,
560 current_level_range_
.end());
561 size_t curr_file_index
=
562 (batch_iter_
!= current_level_range_
.end())
563 ? fp_ctx_array_
[batch_iter_
.index()].curr_index_in_curr_level
564 : curr_file_level_
->num_files
;
566 bool is_last_key_in_file
;
567 if (!GetNextFileInLevelWithKeys(&next_file_range
, &curr_file_index
, &f
,
568 &is_last_key_in_file
)) {
569 search_ended_
= !PrepareNextLevel();
571 if (is_last_key_in_file
) {
572 // Since cmp_largest is 0, batch_iter_ still points to the last key
573 // that falls in this file, instead of the next one. Increment
574 // the file index for all keys between batch_iter_ and upper_key_
575 auto tmp_iter
= batch_iter_
;
576 while (tmp_iter
!= upper_key_
) {
577 ++(fp_ctx_array_
[tmp_iter
.index()].curr_index_in_curr_level
);
580 maybe_repeat_key_
= true;
582 // Set the range for this file
583 current_file_range_
=
584 MultiGetRange(next_file_range
, batch_iter_prev_
, upper_key_
);
585 returned_file_level_
= curr_level_
;
586 hit_file_level_
= curr_level_
;
587 is_hit_file_last_in_level_
=
588 curr_file_index
== curr_file_level_
->num_files
- 1;
597 // getter for current file level
598 // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
599 unsigned int GetHitFileLevel() { return hit_file_level_
; }
601 // Returns true if the most recent "hit file" (i.e., one returned by
602 // GetNextFile()) is at the last index in its level.
603 bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_
; }
605 const MultiGetRange
& CurrentFileRange() { return current_file_range_
; }
608 unsigned int num_levels_
;
609 unsigned int curr_level_
;
610 unsigned int returned_file_level_
;
611 unsigned int hit_file_level_
;
613 struct FilePickerContext
{
614 int32_t search_left_bound
;
615 int32_t search_right_bound
;
616 unsigned int curr_index_in_curr_level
;
617 unsigned int start_index_in_curr_level
;
619 FilePickerContext(int32_t left
, int32_t right
)
620 : search_left_bound(left
), search_right_bound(right
),
621 curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
623 FilePickerContext() = default;
625 std::array
<FilePickerContext
, MultiGetContext::MAX_BATCH_SIZE
> fp_ctx_array_
;
626 MultiGetRange
* range_
;
627 // Iterator to iterate through the keys in a MultiGet batch, that gets reset
628 // at the beginning of each level. Each call to GetNextFile() will position
629 // batch_iter_ at or right after the last key that was found in the returned
631 MultiGetRange::Iterator batch_iter_
;
632 // An iterator that records the previous position of batch_iter_, i.e last
633 // key found in the previous SST file, in order to serve as the start of
634 // the batch key range for the next SST file
635 MultiGetRange::Iterator batch_iter_prev_
;
636 MultiGetRange::Iterator upper_key_
;
637 bool maybe_repeat_key_
;
638 MultiGetRange current_level_range_
;
639 MultiGetRange current_file_range_
;
640 autovector
<LevelFilesBrief
>* level_files_brief_
;
642 bool is_hit_file_last_in_level_
;
643 LevelFilesBrief
* curr_file_level_
;
644 FileIndexer
* file_indexer_
;
645 const Comparator
* user_comparator_
;
646 const InternalKeyComparator
* internal_comparator_
;
648 // Setup local variables to search next level.
649 // Returns false if there are no more levels to search.
650 bool PrepareNextLevel() {
651 if (curr_level_
== 0) {
652 MultiGetRange::Iterator mget_iter
= current_level_range_
.begin();
653 if (fp_ctx_array_
[mget_iter
.index()].curr_index_in_curr_level
<
654 curr_file_level_
->num_files
) {
655 batch_iter_prev_
= current_level_range_
.begin();
656 upper_key_
= batch_iter_
= current_level_range_
.begin();
662 // Reset key range to saved value
663 while (curr_level_
< num_levels_
) {
664 bool level_contains_keys
= false;
665 curr_file_level_
= &(*level_files_brief_
)[curr_level_
];
666 if (curr_file_level_
->num_files
== 0) {
667 // When current level is empty, the search bound generated from upper
668 // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
671 for (auto mget_iter
= current_level_range_
.begin();
672 mget_iter
!= current_level_range_
.end(); ++mget_iter
) {
673 struct FilePickerContext
& fp_ctx
= fp_ctx_array_
[mget_iter
.index()];
675 assert(fp_ctx
.search_left_bound
== 0);
676 assert(fp_ctx
.search_right_bound
== -1 ||
677 fp_ctx
.search_right_bound
== FileIndexer::kLevelMaxIndex
);
678 // Since current level is empty, it will need to search all files in
680 fp_ctx
.search_left_bound
= 0;
681 fp_ctx
.search_right_bound
= FileIndexer::kLevelMaxIndex
;
683 // Skip all subsequent empty levels
686 } while ((curr_level_
< num_levels_
) &&
687 (*level_files_brief_
)[curr_level_
].num_files
== 0);
691 // Some files may overlap each other. We find
692 // all files that overlap user_key and process them in order from
693 // newest to oldest. In the context of merge-operator, this can occur at
694 // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
695 // are always compacted into a single entry).
696 int32_t start_index
= -1;
697 current_level_range_
=
698 MultiGetRange(*range_
, range_
->begin(), range_
->end());
699 for (auto mget_iter
= current_level_range_
.begin();
700 mget_iter
!= current_level_range_
.end(); ++mget_iter
) {
701 struct FilePickerContext
& fp_ctx
= fp_ctx_array_
[mget_iter
.index()];
702 if (curr_level_
== 0) {
703 // On Level-0, we read through all files to check for overlap.
705 level_contains_keys
= true;
707 // On Level-n (n>=1), files are sorted. Binary search to find the
708 // earliest file whose largest key >= ikey. Search left bound and
709 // right bound are used to narrow the range.
710 if (fp_ctx
.search_left_bound
<= fp_ctx
.search_right_bound
) {
711 if (fp_ctx
.search_right_bound
== FileIndexer::kLevelMaxIndex
) {
712 fp_ctx
.search_right_bound
=
713 static_cast<int32_t>(curr_file_level_
->num_files
) - 1;
715 // `search_right_bound_` is an inclusive upper-bound, but since it
716 // was determined based on user key, it is still possible the lookup
717 // key falls to the right of `search_right_bound_`'s corresponding
718 // file. So, pass a limit one higher, which allows us to detect this
720 Slice
& ikey
= mget_iter
->ikey
;
721 start_index
= FindFileInRange(
722 *internal_comparator_
, *curr_file_level_
, ikey
,
723 static_cast<uint32_t>(fp_ctx
.search_left_bound
),
724 static_cast<uint32_t>(fp_ctx
.search_right_bound
) + 1);
725 if (start_index
== fp_ctx
.search_right_bound
+ 1) {
726 // `ikey_` comes after `search_right_bound_`. The lookup key does
727 // not exist on this level, so let's skip this level and do a full
728 // binary search on the next level.
729 fp_ctx
.search_left_bound
= 0;
730 fp_ctx
.search_right_bound
= FileIndexer::kLevelMaxIndex
;
731 current_level_range_
.SkipKey(mget_iter
);
734 level_contains_keys
= true;
737 // search_left_bound > search_right_bound, key does not exist in
738 // this level. Since no comparison is done in this level, it will
739 // need to search all files in the next level.
740 fp_ctx
.search_left_bound
= 0;
741 fp_ctx
.search_right_bound
= FileIndexer::kLevelMaxIndex
;
742 current_level_range_
.SkipKey(mget_iter
);
746 fp_ctx
.start_index_in_curr_level
= start_index
;
747 fp_ctx
.curr_index_in_curr_level
= start_index
;
749 if (level_contains_keys
) {
750 batch_iter_prev_
= current_level_range_
.begin();
751 upper_key_
= batch_iter_
= current_level_range_
.begin();
756 // curr_level_ = num_levels_. So, no more levels to search.
760 } // anonymous namespace
762 VersionStorageInfo::~VersionStorageInfo() { delete[] files_
; }
764 Version::~Version() {
767 // Remove from linked list
768 prev_
->next_
= next_
;
769 next_
->prev_
= prev_
;
771 // Drop references to files
772 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
773 for (size_t i
= 0; i
< storage_info_
.files_
[level
].size(); i
++) {
774 FileMetaData
* f
= storage_info_
.files_
[level
][i
];
778 assert(cfd_
!= nullptr);
779 uint32_t path_id
= f
->fd
.GetPathId();
780 assert(path_id
< cfd_
->ioptions()->cf_paths
.size());
781 vset_
->obsolete_files_
.push_back(
782 ObsoleteFileInfo(f
, cfd_
->ioptions()->cf_paths
[path_id
].path
));
788 int FindFile(const InternalKeyComparator
& icmp
,
789 const LevelFilesBrief
& file_level
,
791 return FindFileInRange(icmp
, file_level
, key
, 0,
792 static_cast<uint32_t>(file_level
.num_files
));
795 void DoGenerateLevelFilesBrief(LevelFilesBrief
* file_level
,
796 const std::vector
<FileMetaData
*>& files
,
801 size_t num
= files
.size();
802 file_level
->num_files
= num
;
803 char* mem
= arena
->AllocateAligned(num
* sizeof(FdWithKeyRange
));
804 file_level
->files
= new (mem
)FdWithKeyRange
[num
];
806 for (size_t i
= 0; i
< num
; i
++) {
807 Slice smallest_key
= files
[i
]->smallest
.Encode();
808 Slice largest_key
= files
[i
]->largest
.Encode();
810 // Copy key slice to sequential memory
811 size_t smallest_size
= smallest_key
.size();
812 size_t largest_size
= largest_key
.size();
813 mem
= arena
->AllocateAligned(smallest_size
+ largest_size
);
814 memcpy(mem
, smallest_key
.data(), smallest_size
);
815 memcpy(mem
+ smallest_size
, largest_key
.data(), largest_size
);
817 FdWithKeyRange
& f
= file_level
->files
[i
];
819 f
.file_metadata
= files
[i
];
820 f
.smallest_key
= Slice(mem
, smallest_size
);
821 f
.largest_key
= Slice(mem
+ smallest_size
, largest_size
);
825 static bool AfterFile(const Comparator
* ucmp
,
826 const Slice
* user_key
, const FdWithKeyRange
* f
) {
827 // nullptr user_key occurs before all keys and is therefore never after *f
828 return (user_key
!= nullptr &&
829 ucmp
->CompareWithoutTimestamp(*user_key
,
830 ExtractUserKey(f
->largest_key
)) > 0);
833 static bool BeforeFile(const Comparator
* ucmp
,
834 const Slice
* user_key
, const FdWithKeyRange
* f
) {
835 // nullptr user_key occurs after all keys and is therefore never before *f
836 return (user_key
!= nullptr &&
837 ucmp
->CompareWithoutTimestamp(*user_key
,
838 ExtractUserKey(f
->smallest_key
)) < 0);
841 bool SomeFileOverlapsRange(
842 const InternalKeyComparator
& icmp
,
843 bool disjoint_sorted_files
,
844 const LevelFilesBrief
& file_level
,
845 const Slice
* smallest_user_key
,
846 const Slice
* largest_user_key
) {
847 const Comparator
* ucmp
= icmp
.user_comparator();
848 if (!disjoint_sorted_files
) {
849 // Need to check against all files
850 for (size_t i
= 0; i
< file_level
.num_files
; i
++) {
851 const FdWithKeyRange
* f
= &(file_level
.files
[i
]);
852 if (AfterFile(ucmp
, smallest_user_key
, f
) ||
853 BeforeFile(ucmp
, largest_user_key
, f
)) {
856 return true; // Overlap
862 // Binary search over file list
864 if (smallest_user_key
!= nullptr) {
865 // Find the leftmost possible internal key for smallest_user_key
867 small
.SetMinPossibleForUserKey(*smallest_user_key
);
868 index
= FindFile(icmp
, file_level
, small
.Encode());
871 if (index
>= file_level
.num_files
) {
872 // beginning of range is after all files, so no overlap.
876 return !BeforeFile(ucmp
, largest_user_key
, &file_level
.files
[index
]);
881 class LevelIterator final
: public InternalIterator
{
883 // @param read_options Must outlive this iterator.
884 LevelIterator(TableCache
* table_cache
, const ReadOptions
& read_options
,
885 const FileOptions
& file_options
,
886 const InternalKeyComparator
& icomparator
,
887 const LevelFilesBrief
* flevel
,
888 const SliceTransform
* prefix_extractor
, bool should_sample
,
889 HistogramImpl
* file_read_hist
, TableReaderCaller caller
,
890 bool skip_filters
, int level
, RangeDelAggregator
* range_del_agg
,
891 const std::vector
<AtomicCompactionUnitBoundary
>*
892 compaction_boundaries
= nullptr,
893 bool allow_unprepared_value
= false)
894 : table_cache_(table_cache
),
895 read_options_(read_options
),
896 file_options_(file_options
),
897 icomparator_(icomparator
),
898 user_comparator_(icomparator
.user_comparator()),
900 prefix_extractor_(prefix_extractor
),
901 file_read_hist_(file_read_hist
),
902 should_sample_(should_sample
),
904 skip_filters_(skip_filters
),
905 allow_unprepared_value_(allow_unprepared_value
),
906 file_index_(flevel_
->num_files
),
908 range_del_agg_(range_del_agg
),
909 pinned_iters_mgr_(nullptr),
910 compaction_boundaries_(compaction_boundaries
) {
911 // Empty level is not supported.
912 assert(flevel_
!= nullptr && flevel_
->num_files
> 0);
915 ~LevelIterator() override
{ delete file_iter_
.Set(nullptr); }
917 void Seek(const Slice
& target
) override
;
918 void SeekForPrev(const Slice
& target
) override
;
919 void SeekToFirst() override
;
920 void SeekToLast() override
;
921 void Next() final override
;
922 bool NextAndGetResult(IterateResult
* result
) override
;
923 void Prev() override
;
925 bool Valid() const override
{ return file_iter_
.Valid(); }
926 Slice
key() const override
{
928 return file_iter_
.key();
931 Slice
value() const override
{
933 return file_iter_
.value();
936 Status
status() const override
{
937 return file_iter_
.iter() ? file_iter_
.status() : Status::OK();
940 bool PrepareValue() override
{
941 return file_iter_
.PrepareValue();
944 inline bool MayBeOutOfLowerBound() override
{
946 return may_be_out_of_lower_bound_
&& file_iter_
.MayBeOutOfLowerBound();
949 inline IterBoundCheck
UpperBoundCheckResult() override
{
951 return file_iter_
.UpperBoundCheckResult();
953 return IterBoundCheck::kUnknown
;
957 void SetPinnedItersMgr(PinnedIteratorsManager
* pinned_iters_mgr
) override
{
958 pinned_iters_mgr_
= pinned_iters_mgr
;
959 if (file_iter_
.iter()) {
960 file_iter_
.SetPinnedItersMgr(pinned_iters_mgr
);
964 bool IsKeyPinned() const override
{
965 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
966 file_iter_
.iter() && file_iter_
.IsKeyPinned();
969 bool IsValuePinned() const override
{
970 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
971 file_iter_
.iter() && file_iter_
.IsValuePinned();
975 // Return true if at least one invalid file is seen and skipped.
976 bool SkipEmptyFileForward();
977 void SkipEmptyFileBackward();
978 void SetFileIterator(InternalIterator
* iter
);
979 void InitFileIterator(size_t new_file_index
);
981 const Slice
& file_smallest_key(size_t file_index
) {
982 assert(file_index
< flevel_
->num_files
);
983 return flevel_
->files
[file_index
].smallest_key
;
986 bool KeyReachedUpperBound(const Slice
& internal_key
) {
987 return read_options_
.iterate_upper_bound
!= nullptr &&
988 user_comparator_
.CompareWithoutTimestamp(
989 ExtractUserKey(internal_key
), /*a_has_ts=*/true,
990 *read_options_
.iterate_upper_bound
, /*b_has_ts=*/false) >= 0;
993 InternalIterator
* NewFileIterator() {
994 assert(file_index_
< flevel_
->num_files
);
995 auto file_meta
= flevel_
->files
[file_index_
];
996 if (should_sample_
) {
997 sample_file_read_inc(file_meta
.file_metadata
);
1000 const InternalKey
* smallest_compaction_key
= nullptr;
1001 const InternalKey
* largest_compaction_key
= nullptr;
1002 if (compaction_boundaries_
!= nullptr) {
1003 smallest_compaction_key
= (*compaction_boundaries_
)[file_index_
].smallest
;
1004 largest_compaction_key
= (*compaction_boundaries_
)[file_index_
].largest
;
1006 CheckMayBeOutOfLowerBound();
1007 return table_cache_
->NewIterator(
1008 read_options_
, file_options_
, icomparator_
, *file_meta
.file_metadata
,
1009 range_del_agg_
, prefix_extractor_
,
1010 nullptr /* don't need reference to table */, file_read_hist_
, caller_
,
1011 /*arena=*/nullptr, skip_filters_
, level_
,
1012 /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key
,
1013 largest_compaction_key
, allow_unprepared_value_
);
1016 // Check if current file being fully within iterate_lower_bound.
1018 // Note MyRocks may update iterate bounds between seek. To workaround it,
1019 // we need to check and update may_be_out_of_lower_bound_ accordingly.
1020 void CheckMayBeOutOfLowerBound() {
1021 if (read_options_
.iterate_lower_bound
!= nullptr &&
1022 file_index_
< flevel_
->num_files
) {
1023 may_be_out_of_lower_bound_
=
1024 user_comparator_
.CompareWithoutTimestamp(
1025 ExtractUserKey(file_smallest_key(file_index_
)), /*a_has_ts=*/true,
1026 *read_options_
.iterate_lower_bound
, /*b_has_ts=*/false) < 0;
1030 TableCache
* table_cache_
;
1031 const ReadOptions
& read_options_
;
1032 const FileOptions
& file_options_
;
1033 const InternalKeyComparator
& icomparator_
;
1034 const UserComparatorWrapper user_comparator_
;
1035 const LevelFilesBrief
* flevel_
;
1036 mutable FileDescriptor current_value_
;
1037 // `prefix_extractor_` may be non-null even for total order seek. Checking
1038 // this variable is not the right way to identify whether prefix iterator
1040 const SliceTransform
* prefix_extractor_
;
1042 HistogramImpl
* file_read_hist_
;
1043 bool should_sample_
;
1044 TableReaderCaller caller_
;
1046 bool allow_unprepared_value_
;
1047 bool may_be_out_of_lower_bound_
= true;
1050 RangeDelAggregator
* range_del_agg_
;
1051 IteratorWrapper file_iter_
; // May be nullptr
1052 PinnedIteratorsManager
* pinned_iters_mgr_
;
1054 // To be propagated to RangeDelAggregator in order to safely truncate range
1056 const std::vector
<AtomicCompactionUnitBoundary
>* compaction_boundaries_
;
1059 void LevelIterator::Seek(const Slice
& target
) {
1060 // Check whether the seek key fall under the same file
1061 bool need_to_reseek
= true;
1062 if (file_iter_
.iter() != nullptr && file_index_
< flevel_
->num_files
) {
1063 const FdWithKeyRange
& cur_file
= flevel_
->files
[file_index_
];
1064 if (icomparator_
.InternalKeyComparator::Compare(
1065 target
, cur_file
.largest_key
) <= 0 &&
1066 icomparator_
.InternalKeyComparator::Compare(
1067 target
, cur_file
.smallest_key
) >= 0) {
1068 need_to_reseek
= false;
1069 assert(static_cast<size_t>(FindFile(icomparator_
, *flevel_
, target
)) ==
1073 if (need_to_reseek
) {
1074 TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
1075 size_t new_file_index
= FindFile(icomparator_
, *flevel_
, target
);
1076 InitFileIterator(new_file_index
);
1079 if (file_iter_
.iter() != nullptr) {
1080 file_iter_
.Seek(target
);
1082 if (SkipEmptyFileForward() && prefix_extractor_
!= nullptr &&
1083 !read_options_
.total_order_seek
&& !read_options_
.auto_prefix_mode
&&
1084 file_iter_
.iter() != nullptr && file_iter_
.Valid()) {
1085 // We've skipped the file we initially positioned to. In the prefix
1086 // seek case, it is likely that the file is skipped because of
1087 // prefix bloom or hash, where more keys are skipped. We then check
1088 // the current key and invalidate the iterator if the prefix is
1090 // When doing prefix iterator seek, when keys for one prefix have
1091 // been exhausted, it can jump to any key that is larger. Here we are
1092 // enforcing a stricter contract than that, in order to make it easier for
1093 // higher layers (merging and DB iterator) to reason the correctness:
1094 // 1. Within the prefix, the result should be accurate.
1095 // 2. If keys for the prefix is exhausted, it is either positioned to the
1096 // next key after the prefix, or make the iterator invalid.
1097 // A side benefit will be that it invalidates the iterator earlier so that
1098 // the upper level merging iterator can merge fewer child iterators.
1099 size_t ts_sz
= user_comparator_
.timestamp_size();
1100 Slice target_user_key_without_ts
=
1101 ExtractUserKeyAndStripTimestamp(target
, ts_sz
);
1102 Slice file_user_key_without_ts
=
1103 ExtractUserKeyAndStripTimestamp(file_iter_
.key(), ts_sz
);
1104 if (prefix_extractor_
->InDomain(target_user_key_without_ts
) &&
1105 (!prefix_extractor_
->InDomain(file_user_key_without_ts
) ||
1106 user_comparator_
.CompareWithoutTimestamp(
1107 prefix_extractor_
->Transform(target_user_key_without_ts
), false,
1108 prefix_extractor_
->Transform(file_user_key_without_ts
),
1110 SetFileIterator(nullptr);
1113 CheckMayBeOutOfLowerBound();
1116 void LevelIterator::SeekForPrev(const Slice
& target
) {
1117 size_t new_file_index
= FindFile(icomparator_
, *flevel_
, target
);
1118 if (new_file_index
>= flevel_
->num_files
) {
1119 new_file_index
= flevel_
->num_files
- 1;
1122 InitFileIterator(new_file_index
);
1123 if (file_iter_
.iter() != nullptr) {
1124 file_iter_
.SeekForPrev(target
);
1125 SkipEmptyFileBackward();
1127 CheckMayBeOutOfLowerBound();
1130 void LevelIterator::SeekToFirst() {
1131 InitFileIterator(0);
1132 if (file_iter_
.iter() != nullptr) {
1133 file_iter_
.SeekToFirst();
1135 SkipEmptyFileForward();
1136 CheckMayBeOutOfLowerBound();
1139 void LevelIterator::SeekToLast() {
1140 InitFileIterator(flevel_
->num_files
- 1);
1141 if (file_iter_
.iter() != nullptr) {
1142 file_iter_
.SeekToLast();
1144 SkipEmptyFileBackward();
1145 CheckMayBeOutOfLowerBound();
1148 void LevelIterator::Next() {
1151 SkipEmptyFileForward();
1154 bool LevelIterator::NextAndGetResult(IterateResult
* result
) {
1156 bool is_valid
= file_iter_
.NextAndGetResult(result
);
1158 SkipEmptyFileForward();
1161 result
->key
= key();
1162 result
->bound_check_result
= file_iter_
.UpperBoundCheckResult();
1163 // Ideally, we should return the real file_iter_.value_prepared but the
1164 // information is not here. It would casue an extra PrepareValue()
1165 // for the first key of a file.
1166 result
->value_prepared
= !allow_unprepared_value_
;
1172 void LevelIterator::Prev() {
1175 SkipEmptyFileBackward();
1178 bool LevelIterator::SkipEmptyFileForward() {
1179 bool seen_empty_file
= false;
1180 while (file_iter_
.iter() == nullptr ||
1181 (!file_iter_
.Valid() && file_iter_
.status().ok() &&
1182 file_iter_
.iter()->UpperBoundCheckResult() !=
1183 IterBoundCheck::kOutOfBound
)) {
1184 seen_empty_file
= true;
1185 // Move to next file
1186 if (file_index_
>= flevel_
->num_files
- 1) {
1187 // Already at the last file
1188 SetFileIterator(nullptr);
1191 if (KeyReachedUpperBound(file_smallest_key(file_index_
+ 1))) {
1192 SetFileIterator(nullptr);
1195 InitFileIterator(file_index_
+ 1);
1196 if (file_iter_
.iter() != nullptr) {
1197 file_iter_
.SeekToFirst();
1200 return seen_empty_file
;
1203 void LevelIterator::SkipEmptyFileBackward() {
1204 while (file_iter_
.iter() == nullptr ||
1205 (!file_iter_
.Valid() && file_iter_
.status().ok())) {
1206 // Move to previous file
1207 if (file_index_
== 0) {
1208 // Already the first file
1209 SetFileIterator(nullptr);
1212 InitFileIterator(file_index_
- 1);
1213 if (file_iter_
.iter() != nullptr) {
1214 file_iter_
.SeekToLast();
1219 void LevelIterator::SetFileIterator(InternalIterator
* iter
) {
1220 if (pinned_iters_mgr_
&& iter
) {
1221 iter
->SetPinnedItersMgr(pinned_iters_mgr_
);
1224 InternalIterator
* old_iter
= file_iter_
.Set(iter
);
1225 if (pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled()) {
1226 pinned_iters_mgr_
->PinIterator(old_iter
);
1232 void LevelIterator::InitFileIterator(size_t new_file_index
) {
1233 if (new_file_index
>= flevel_
->num_files
) {
1234 file_index_
= new_file_index
;
1235 SetFileIterator(nullptr);
1238 // If the file iterator shows incomplete, we try it again if users seek
1239 // to the same file, as this time we may go to a different data block
1240 // which is cached in block cache.
1242 if (file_iter_
.iter() != nullptr && !file_iter_
.status().IsIncomplete() &&
1243 new_file_index
== file_index_
) {
1244 // file_iter_ is already constructed with this iterator, so
1245 // no need to change anything
1247 file_index_
= new_file_index
;
1248 InternalIterator
* iter
= NewFileIterator();
1249 SetFileIterator(iter
);
1253 } // anonymous namespace
1255 Status
Version::GetTableProperties(std::shared_ptr
<const TableProperties
>* tp
,
1256 const FileMetaData
* file_meta
,
1257 const std::string
* fname
) const {
1258 auto table_cache
= cfd_
->table_cache();
1259 auto ioptions
= cfd_
->ioptions();
1260 Status s
= table_cache
->GetTableProperties(
1261 file_options_
, cfd_
->internal_comparator(), file_meta
->fd
, tp
,
1262 mutable_cf_options_
.prefix_extractor
.get(), true /* no io */);
1267 // We only ignore error type `Incomplete` since it's by design that we
1268 // disallow table when it's not in table cache.
1269 if (!s
.IsIncomplete()) {
1273 // 2. Table is not present in table cache, we'll read the table properties
1274 // directly from the properties block in the file.
1275 std::unique_ptr
<FSRandomAccessFile
> file
;
1276 std::string file_name
;
1277 if (fname
!= nullptr) {
1281 TableFileName(ioptions
->cf_paths
, file_meta
->fd
.GetNumber(),
1282 file_meta
->fd
.GetPathId());
1284 s
= ioptions
->fs
->NewRandomAccessFile(file_name
, file_options_
, &file
,
1290 TableProperties
* raw_table_properties
;
1291 // By setting the magic number to kInvalidTableMagicNumber, we can by
1292 // pass the magic number check in the footer.
1293 std::unique_ptr
<RandomAccessFileReader
> file_reader(
1294 new RandomAccessFileReader(
1295 std::move(file
), file_name
, nullptr /* env */, io_tracer_
,
1296 nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
1297 nullptr /* rate_limiter */, ioptions
->listeners
));
1298 s
= ReadTableProperties(
1299 file_reader
.get(), file_meta
->fd
.GetFileSize(),
1300 Footer::kInvalidTableMagicNumber
/* table's magic number */, *ioptions
,
1301 &raw_table_properties
, false /* compression_type_missing */);
1305 RecordTick(ioptions
->statistics
, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES
);
1307 *tp
= std::shared_ptr
<const TableProperties
>(raw_table_properties
);
1311 Status
Version::GetPropertiesOfAllTables(TablePropertiesCollection
* props
) {
1313 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
1314 s
= GetPropertiesOfAllTables(props
, level
);
1320 return Status::OK();
1323 Status
Version::TablesRangeTombstoneSummary(int max_entries_to_print
,
1324 std::string
* out_str
) {
1325 if (max_entries_to_print
<= 0) {
1326 return Status::OK();
1328 int num_entries_left
= max_entries_to_print
;
1330 std::stringstream ss
;
1332 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
1333 for (const auto& file_meta
: storage_info_
.files_
[level
]) {
1335 TableFileName(cfd_
->ioptions()->cf_paths
, file_meta
->fd
.GetNumber(),
1336 file_meta
->fd
.GetPathId());
1338 ss
<< "=== file : " << fname
<< " ===\n";
1340 TableCache
* table_cache
= cfd_
->table_cache();
1341 std::unique_ptr
<FragmentedRangeTombstoneIterator
> tombstone_iter
;
1343 Status s
= table_cache
->GetRangeTombstoneIterator(
1344 ReadOptions(), cfd_
->internal_comparator(), *file_meta
,
1349 if (tombstone_iter
) {
1350 tombstone_iter
->SeekToFirst();
1352 while (tombstone_iter
->Valid() && num_entries_left
> 0) {
1353 ss
<< "start: " << tombstone_iter
->start_key().ToString(true)
1354 << " end: " << tombstone_iter
->end_key().ToString(true)
1355 << " seq: " << tombstone_iter
->seq() << '\n';
1356 tombstone_iter
->Next();
1359 if (num_entries_left
<= 0) {
1364 if (num_entries_left
<= 0) {
1368 assert(num_entries_left
>= 0);
1369 if (num_entries_left
<= 0) {
1370 ss
<< "(results may not be complete)\n";
1373 *out_str
= ss
.str();
1374 return Status::OK();
1377 Status
Version::GetPropertiesOfAllTables(TablePropertiesCollection
* props
,
1379 for (const auto& file_meta
: storage_info_
.files_
[level
]) {
1381 TableFileName(cfd_
->ioptions()->cf_paths
, file_meta
->fd
.GetNumber(),
1382 file_meta
->fd
.GetPathId());
1383 // 1. If the table is already present in table cache, load table
1384 // properties from there.
1385 std::shared_ptr
<const TableProperties
> table_properties
;
1386 Status s
= GetTableProperties(&table_properties
, file_meta
, &fname
);
1388 props
->insert({fname
, table_properties
});
1394 return Status::OK();
1397 Status
Version::GetPropertiesOfTablesInRange(
1398 const Range
* range
, std::size_t n
, TablePropertiesCollection
* props
) const {
1399 for (int level
= 0; level
< storage_info_
.num_non_empty_levels(); level
++) {
1400 for (decltype(n
) i
= 0; i
< n
; i
++) {
1401 // Convert user_key into a corresponding internal key.
1402 InternalKey
k1(range
[i
].start
, kMaxSequenceNumber
, kValueTypeForSeek
);
1403 InternalKey
k2(range
[i
].limit
, kMaxSequenceNumber
, kValueTypeForSeek
);
1404 std::vector
<FileMetaData
*> files
;
1405 storage_info_
.GetOverlappingInputs(level
, &k1
, &k2
, &files
, -1, nullptr,
1407 for (const auto& file_meta
: files
) {
1409 TableFileName(cfd_
->ioptions()->cf_paths
,
1410 file_meta
->fd
.GetNumber(), file_meta
->fd
.GetPathId());
1411 if (props
->count(fname
) == 0) {
1412 // 1. If the table is already present in table cache, load table
1413 // properties from there.
1414 std::shared_ptr
<const TableProperties
> table_properties
;
1415 Status s
= GetTableProperties(&table_properties
, file_meta
, &fname
);
1417 props
->insert({fname
, table_properties
});
1426 return Status::OK();
1429 Status
Version::GetAggregatedTableProperties(
1430 std::shared_ptr
<const TableProperties
>* tp
, int level
) {
1431 TablePropertiesCollection props
;
1434 s
= GetPropertiesOfAllTables(&props
);
1436 s
= GetPropertiesOfAllTables(&props
, level
);
1442 auto* new_tp
= new TableProperties();
1443 for (const auto& item
: props
) {
1444 new_tp
->Add(*item
.second
);
1447 return Status::OK();
1450 size_t Version::GetMemoryUsageByTableReaders() {
1451 size_t total_usage
= 0;
1452 for (auto& file_level
: storage_info_
.level_files_brief_
) {
1453 for (size_t i
= 0; i
< file_level
.num_files
; i
++) {
1454 total_usage
+= cfd_
->table_cache()->GetMemoryUsageByTableReader(
1455 file_options_
, cfd_
->internal_comparator(), file_level
.files
[i
].fd
,
1456 mutable_cf_options_
.prefix_extractor
.get());
1462 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData
* cf_meta
) {
1466 cf_meta
->name
= cfd_
->GetName();
1468 cf_meta
->file_count
= 0;
1469 cf_meta
->levels
.clear();
1471 auto* ioptions
= cfd_
->ioptions();
1472 auto* vstorage
= storage_info();
1474 for (int level
= 0; level
< cfd_
->NumberLevels(); level
++) {
1475 uint64_t level_size
= 0;
1476 cf_meta
->file_count
+= vstorage
->LevelFiles(level
).size();
1477 std::vector
<SstFileMetaData
> files
;
1478 for (const auto& file
: vstorage
->LevelFiles(level
)) {
1479 uint32_t path_id
= file
->fd
.GetPathId();
1480 std::string file_path
;
1481 if (path_id
< ioptions
->cf_paths
.size()) {
1482 file_path
= ioptions
->cf_paths
[path_id
].path
;
1484 assert(!ioptions
->cf_paths
.empty());
1485 file_path
= ioptions
->cf_paths
.back().path
;
1487 const uint64_t file_number
= file
->fd
.GetNumber();
1488 files
.emplace_back(SstFileMetaData
{
1489 MakeTableFileName("", file_number
), file_number
, file_path
,
1490 static_cast<size_t>(file
->fd
.GetFileSize()), file
->fd
.smallest_seqno
,
1491 file
->fd
.largest_seqno
, file
->smallest
.user_key().ToString(),
1492 file
->largest
.user_key().ToString(),
1493 file
->stats
.num_reads_sampled
.load(std::memory_order_relaxed
),
1494 file
->being_compacted
, file
->oldest_blob_file_number
,
1495 file
->TryGetOldestAncesterTime(), file
->TryGetFileCreationTime(),
1496 file
->file_checksum
, file
->file_checksum_func_name
});
1497 files
.back().num_entries
= file
->num_entries
;
1498 files
.back().num_deletions
= file
->num_deletions
;
1499 level_size
+= file
->fd
.GetFileSize();
1501 cf_meta
->levels
.emplace_back(
1502 level
, level_size
, std::move(files
));
1503 cf_meta
->size
+= level_size
;
1507 uint64_t Version::GetSstFilesSize() {
1508 uint64_t sst_files_size
= 0;
1509 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
1510 for (const auto& file_meta
: storage_info_
.LevelFiles(level
)) {
1511 sst_files_size
+= file_meta
->fd
.GetFileSize();
1514 return sst_files_size
;
1517 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time
) {
1518 uint64_t oldest_time
= port::kMaxUint64
;
1519 for (int level
= 0; level
< storage_info_
.num_non_empty_levels_
; level
++) {
1520 for (FileMetaData
* meta
: storage_info_
.LevelFiles(level
)) {
1521 assert(meta
->fd
.table_reader
!= nullptr);
1522 uint64_t file_creation_time
= meta
->TryGetFileCreationTime();
1523 if (file_creation_time
== kUnknownFileCreationTime
) {
1527 if (file_creation_time
< oldest_time
) {
1528 oldest_time
= file_creation_time
;
1532 *creation_time
= oldest_time
;
1535 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1536 // Estimation will be inaccurate when:
1537 // (1) there exist merge keys
1538 // (2) keys are directly overwritten
1539 // (3) deletion on non-existing keys
1540 // (4) low number of samples
1541 if (current_num_samples_
== 0) {
1545 if (current_num_non_deletions_
<= current_num_deletions_
) {
1549 uint64_t est
= current_num_non_deletions_
- current_num_deletions_
;
1551 uint64_t file_count
= 0;
1552 for (int level
= 0; level
< num_levels_
; ++level
) {
1553 file_count
+= files_
[level
].size();
1556 if (current_num_samples_
< file_count
) {
1557 // casting to avoid overflowing
1559 static_cast<uint64_t>(
1560 (est
* static_cast<double>(file_count
) / current_num_samples_
)
1567 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1569 assert(level
< num_levels_
);
1570 uint64_t sum_file_size_bytes
= 0;
1571 uint64_t sum_data_size_bytes
= 0;
1572 for (auto* file_meta
: files_
[level
]) {
1573 sum_file_size_bytes
+= file_meta
->fd
.GetFileSize();
1574 sum_data_size_bytes
+= file_meta
->raw_key_size
+ file_meta
->raw_value_size
;
1576 if (sum_file_size_bytes
== 0) {
1579 return static_cast<double>(sum_data_size_bytes
) / sum_file_size_bytes
;
1582 void Version::AddIterators(const ReadOptions
& read_options
,
1583 const FileOptions
& soptions
,
1584 MergeIteratorBuilder
* merge_iter_builder
,
1585 RangeDelAggregator
* range_del_agg
,
1586 bool allow_unprepared_value
) {
1587 assert(storage_info_
.finalized_
);
1589 for (int level
= 0; level
< storage_info_
.num_non_empty_levels(); level
++) {
1590 AddIteratorsForLevel(read_options
, soptions
, merge_iter_builder
, level
,
1591 range_del_agg
, allow_unprepared_value
);
1595 void Version::AddIteratorsForLevel(const ReadOptions
& read_options
,
1596 const FileOptions
& soptions
,
1597 MergeIteratorBuilder
* merge_iter_builder
,
1599 RangeDelAggregator
* range_del_agg
,
1600 bool allow_unprepared_value
) {
1601 assert(storage_info_
.finalized_
);
1602 if (level
>= storage_info_
.num_non_empty_levels()) {
1603 // This is an empty level
1605 } else if (storage_info_
.LevelFilesBrief(level
).num_files
== 0) {
1606 // No files in this level
1610 bool should_sample
= should_sample_file_read();
1612 auto* arena
= merge_iter_builder
->GetArena();
1614 // Merge all level zero files together since they may overlap
1615 for (size_t i
= 0; i
< storage_info_
.LevelFilesBrief(0).num_files
; i
++) {
1616 const auto& file
= storage_info_
.LevelFilesBrief(0).files
[i
];
1617 merge_iter_builder
->AddIterator(cfd_
->table_cache()->NewIterator(
1618 read_options
, soptions
, cfd_
->internal_comparator(),
1619 *file
.file_metadata
, range_del_agg
,
1620 mutable_cf_options_
.prefix_extractor
.get(), nullptr,
1621 cfd_
->internal_stats()->GetFileReadHist(0),
1622 TableReaderCaller::kUserIterator
, arena
,
1623 /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_
,
1624 /*smallest_compaction_key=*/nullptr,
1625 /*largest_compaction_key=*/nullptr, allow_unprepared_value
));
1627 if (should_sample
) {
1628 // Count ones for every L0 files. This is done per iterator creation
1629 // rather than Seek(), while files in other levels are recored per seek.
1630 // If users execute one range query per iterator, there may be some
1631 // discrepancy here.
1632 for (FileMetaData
* meta
: storage_info_
.LevelFiles(0)) {
1633 sample_file_read_inc(meta
);
1636 } else if (storage_info_
.LevelFilesBrief(level
).num_files
> 0) {
1637 // For levels > 0, we can use a concatenating iterator that sequentially
1638 // walks through the non-overlapping files in the level, opening them
1640 auto* mem
= arena
->AllocateAligned(sizeof(LevelIterator
));
1641 merge_iter_builder
->AddIterator(new (mem
) LevelIterator(
1642 cfd_
->table_cache(), read_options
, soptions
,
1643 cfd_
->internal_comparator(), &storage_info_
.LevelFilesBrief(level
),
1644 mutable_cf_options_
.prefix_extractor
.get(), should_sample_file_read(),
1645 cfd_
->internal_stats()->GetFileReadHist(level
),
1646 TableReaderCaller::kUserIterator
, IsFilterSkipped(level
), level
,
1648 /*compaction_boundaries=*/nullptr, allow_unprepared_value
));
1652 Status
Version::OverlapWithLevelIterator(const ReadOptions
& read_options
,
1653 const FileOptions
& file_options
,
1654 const Slice
& smallest_user_key
,
1655 const Slice
& largest_user_key
,
1656 int level
, bool* overlap
) {
1657 assert(storage_info_
.finalized_
);
1659 auto icmp
= cfd_
->internal_comparator();
1660 auto ucmp
= icmp
.user_comparator();
1664 ReadRangeDelAggregator
range_del_agg(&icmp
,
1665 kMaxSequenceNumber
/* upper_bound */);
1670 for (size_t i
= 0; i
< storage_info_
.LevelFilesBrief(0).num_files
; i
++) {
1671 const auto file
= &storage_info_
.LevelFilesBrief(0).files
[i
];
1672 if (AfterFile(ucmp
, &smallest_user_key
, file
) ||
1673 BeforeFile(ucmp
, &largest_user_key
, file
)) {
1676 ScopedArenaIterator
iter(cfd_
->table_cache()->NewIterator(
1677 read_options
, file_options
, cfd_
->internal_comparator(),
1678 *file
->file_metadata
, &range_del_agg
,
1679 mutable_cf_options_
.prefix_extractor
.get(), nullptr,
1680 cfd_
->internal_stats()->GetFileReadHist(0),
1681 TableReaderCaller::kUserIterator
, &arena
,
1682 /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_
,
1683 /*smallest_compaction_key=*/nullptr,
1684 /*largest_compaction_key=*/nullptr,
1685 /*allow_unprepared_value=*/false));
1686 status
= OverlapWithIterator(
1687 ucmp
, smallest_user_key
, largest_user_key
, iter
.get(), overlap
);
1688 if (!status
.ok() || *overlap
) {
1692 } else if (storage_info_
.LevelFilesBrief(level
).num_files
> 0) {
1693 auto mem
= arena
.AllocateAligned(sizeof(LevelIterator
));
1694 ScopedArenaIterator
iter(new (mem
) LevelIterator(
1695 cfd_
->table_cache(), read_options
, file_options
,
1696 cfd_
->internal_comparator(), &storage_info_
.LevelFilesBrief(level
),
1697 mutable_cf_options_
.prefix_extractor
.get(), should_sample_file_read(),
1698 cfd_
->internal_stats()->GetFileReadHist(level
),
1699 TableReaderCaller::kUserIterator
, IsFilterSkipped(level
), level
,
1701 status
= OverlapWithIterator(
1702 ucmp
, smallest_user_key
, largest_user_key
, iter
.get(), overlap
);
1705 if (status
.ok() && *overlap
== false &&
1706 range_del_agg
.IsRangeOverlapped(smallest_user_key
, largest_user_key
)) {
1712 VersionStorageInfo::VersionStorageInfo(
1713 const InternalKeyComparator
* internal_comparator
,
1714 const Comparator
* user_comparator
, int levels
,
1715 CompactionStyle compaction_style
, VersionStorageInfo
* ref_vstorage
,
1716 bool _force_consistency_checks
)
1717 : internal_comparator_(internal_comparator
),
1718 user_comparator_(user_comparator
),
1719 // cfd is nullptr if Version is dummy
1720 num_levels_(levels
),
1721 num_non_empty_levels_(0),
1722 file_indexer_(user_comparator
),
1723 compaction_style_(compaction_style
),
1724 files_(new std::vector
<FileMetaData
*>[num_levels_
]),
1725 base_level_(num_levels_
== 1 ? -1 : 1),
1726 level_multiplier_(0.0),
1727 files_by_compaction_pri_(num_levels_
),
1728 level0_non_overlapping_(false),
1729 next_file_to_compact_by_size_(num_levels_
),
1730 compaction_score_(num_levels_
),
1731 compaction_level_(num_levels_
),
1732 l0_delay_trigger_count_(0),
1733 accumulated_file_size_(0),
1734 accumulated_raw_key_size_(0),
1735 accumulated_raw_value_size_(0),
1736 accumulated_num_non_deletions_(0),
1737 accumulated_num_deletions_(0),
1738 current_num_non_deletions_(0),
1739 current_num_deletions_(0),
1740 current_num_samples_(0),
1741 estimated_compaction_needed_bytes_(0),
1743 force_consistency_checks_(_force_consistency_checks
) {
1744 if (ref_vstorage
!= nullptr) {
1745 accumulated_file_size_
= ref_vstorage
->accumulated_file_size_
;
1746 accumulated_raw_key_size_
= ref_vstorage
->accumulated_raw_key_size_
;
1747 accumulated_raw_value_size_
= ref_vstorage
->accumulated_raw_value_size_
;
1748 accumulated_num_non_deletions_
=
1749 ref_vstorage
->accumulated_num_non_deletions_
;
1750 accumulated_num_deletions_
= ref_vstorage
->accumulated_num_deletions_
;
1751 current_num_non_deletions_
= ref_vstorage
->current_num_non_deletions_
;
1752 current_num_deletions_
= ref_vstorage
->current_num_deletions_
;
1753 current_num_samples_
= ref_vstorage
->current_num_samples_
;
1754 oldest_snapshot_seqnum_
= ref_vstorage
->oldest_snapshot_seqnum_
;
1758 Version::Version(ColumnFamilyData
* column_family_data
, VersionSet
* vset
,
1759 const FileOptions
& file_opt
,
1760 const MutableCFOptions mutable_cf_options
,
1761 const std::shared_ptr
<IOTracer
>& io_tracer
,
1762 uint64_t version_number
)
1764 cfd_(column_family_data
),
1765 info_log_((cfd_
== nullptr) ? nullptr : cfd_
->ioptions()->info_log
),
1766 db_statistics_((cfd_
== nullptr) ? nullptr
1767 : cfd_
->ioptions()->statistics
),
1768 table_cache_((cfd_
== nullptr) ? nullptr : cfd_
->table_cache()),
1769 blob_file_cache_(cfd_
? cfd_
->blob_file_cache() : nullptr),
1770 merge_operator_((cfd_
== nullptr) ? nullptr
1771 : cfd_
->ioptions()->merge_operator
),
1773 (cfd_
== nullptr) ? nullptr : &cfd_
->internal_comparator(),
1774 (cfd_
== nullptr) ? nullptr : cfd_
->user_comparator(),
1775 cfd_
== nullptr ? 0 : cfd_
->NumberLevels(),
1776 cfd_
== nullptr ? kCompactionStyleLevel
1777 : cfd_
->ioptions()->compaction_style
,
1778 (cfd_
== nullptr || cfd_
->current() == nullptr)
1780 : cfd_
->current()->storage_info(),
1781 cfd_
== nullptr ? false : cfd_
->ioptions()->force_consistency_checks
),
1786 file_options_(file_opt
),
1787 mutable_cf_options_(mutable_cf_options
),
1788 max_file_size_for_l0_meta_pin_(
1789 MaxFileSizeForL0MetaPin(mutable_cf_options_
)),
1790 version_number_(version_number
),
1791 io_tracer_(io_tracer
) {}
1793 Status
Version::GetBlob(const ReadOptions
& read_options
, const Slice
& user_key
,
1794 PinnableSlice
* value
) const {
1797 if (read_options
.read_tier
== kBlockCacheTier
) {
1798 return Status::Incomplete("Cannot read blob: no disk I/O allowed");
1801 BlobIndex blob_index
;
1804 Status s
= blob_index
.DecodeFrom(*value
);
1810 if (blob_index
.HasTTL() || blob_index
.IsInlined()) {
1811 return Status::Corruption("Unexpected TTL/inlined blob index");
1814 const auto& blob_files
= storage_info_
.GetBlobFiles();
1816 const uint64_t blob_file_number
= blob_index
.file_number();
1818 const auto it
= blob_files
.find(blob_file_number
);
1819 if (it
== blob_files
.end()) {
1820 return Status::Corruption("Invalid blob file number");
1823 CacheHandleGuard
<BlobFileReader
> blob_file_reader
;
1826 assert(blob_file_cache_
);
1827 const Status s
= blob_file_cache_
->GetBlobFileReader(blob_file_number
,
1834 assert(blob_file_reader
.GetValue());
1835 const Status s
= blob_file_reader
.GetValue()->GetBlob(
1836 read_options
, user_key
, blob_index
.offset(), blob_index
.size(),
1837 blob_index
.compression(), value
);
1842 void Version::Get(const ReadOptions
& read_options
, const LookupKey
& k
,
1843 PinnableSlice
* value
, std::string
* timestamp
, Status
* status
,
1844 MergeContext
* merge_context
,
1845 SequenceNumber
* max_covering_tombstone_seq
, bool* value_found
,
1846 bool* key_exists
, SequenceNumber
* seq
, ReadCallback
* callback
,
1847 bool* is_blob
, bool do_merge
) {
1848 Slice ikey
= k
.internal_key();
1849 Slice user_key
= k
.user_key();
1851 assert(status
->ok() || status
->IsMergeInProgress());
1853 if (key_exists
!= nullptr) {
1854 // will falsify below if not found
1858 PinnedIteratorsManager pinned_iters_mgr
;
1859 uint64_t tracing_get_id
= BlockCacheTraceHelper::kReservedGetId
;
1860 if (vset_
&& vset_
->block_cache_tracer_
&&
1861 vset_
->block_cache_tracer_
->is_tracing_enabled()) {
1862 tracing_get_id
= vset_
->block_cache_tracer_
->NextGetId();
1865 // Note: the old StackableDB-based BlobDB passes in
1866 // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
1867 // need to provide it here.
1868 bool is_blob_index
= false;
1869 bool* const is_blob_to_use
= is_blob
? is_blob
: &is_blob_index
;
1871 GetContext
get_context(
1872 user_comparator(), merge_operator_
, info_log_
, db_statistics_
,
1873 status
->ok() ? GetContext::kNotFound
: GetContext::kMerge
, user_key
,
1874 do_merge
? value
: nullptr, do_merge
? timestamp
: nullptr, value_found
,
1875 merge_context
, do_merge
, max_covering_tombstone_seq
, this->env_
, seq
,
1876 merge_operator_
? &pinned_iters_mgr
: nullptr, callback
, is_blob_to_use
,
1879 // Pin blocks that we read to hold merge operands
1880 if (merge_operator_
) {
1881 pinned_iters_mgr
.StartPinning();
1885 storage_info_
.files_
, user_key
, ikey
, &storage_info_
.level_files_brief_
,
1886 storage_info_
.num_non_empty_levels_
, &storage_info_
.file_indexer_
,
1887 user_comparator(), internal_comparator());
1888 FdWithKeyRange
* f
= fp
.GetNextFile();
1890 while (f
!= nullptr) {
1891 if (*max_covering_tombstone_seq
> 0) {
1892 // The remaining files we look at will only contain covered keys, so we
1896 if (get_context
.sample()) {
1897 sample_file_read_inc(f
->file_metadata
);
1900 bool timer_enabled
=
1901 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex
&&
1902 get_perf_context()->per_level_perf_context_enabled
;
1903 StopWatchNano
timer(env_
, timer_enabled
/* auto_start */);
1904 *status
= table_cache_
->Get(
1905 read_options
, *internal_comparator(), *f
->file_metadata
, ikey
,
1906 &get_context
, mutable_cf_options_
.prefix_extractor
.get(),
1907 cfd_
->internal_stats()->GetFileReadHist(fp
.GetHitFileLevel()),
1908 IsFilterSkipped(static_cast<int>(fp
.GetHitFileLevel()),
1909 fp
.IsHitFileLastInLevel()),
1910 fp
.GetHitFileLevel(), max_file_size_for_l0_meta_pin_
);
1911 // TODO: examine the behavior for corrupted key
1912 if (timer_enabled
) {
1913 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos
, timer
.ElapsedNanos(),
1914 fp
.GetHitFileLevel());
1916 if (!status
->ok()) {
1920 // report the counters before returning
1921 if (get_context
.State() != GetContext::kNotFound
&&
1922 get_context
.State() != GetContext::kMerge
&&
1923 db_statistics_
!= nullptr) {
1924 get_context
.ReportCounters();
1926 switch (get_context
.State()) {
1927 case GetContext::kNotFound
:
1928 // Keep searching in other files
1930 case GetContext::kMerge
:
1931 // TODO: update per-level perfcontext user_key_return_count for kMerge
1933 case GetContext::kFound
:
1934 if (is_blob_index
) {
1935 if (do_merge
&& value
) {
1936 *status
= GetBlob(read_options
, user_key
, value
);
1937 if (!status
->ok()) {
1938 if (status
->IsIncomplete()) {
1939 get_context
.MarkKeyMayExist();
1946 if (fp
.GetHitFileLevel() == 0) {
1947 RecordTick(db_statistics_
, GET_HIT_L0
);
1948 } else if (fp
.GetHitFileLevel() == 1) {
1949 RecordTick(db_statistics_
, GET_HIT_L1
);
1950 } else if (fp
.GetHitFileLevel() >= 2) {
1951 RecordTick(db_statistics_
, GET_HIT_L2_AND_UP
);
1953 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count
, 1,
1954 fp
.GetHitFileLevel());
1956 case GetContext::kDeleted
:
1957 // Use empty error message for speed
1958 *status
= Status::NotFound();
1960 case GetContext::kCorrupt
:
1961 *status
= Status::Corruption("corrupted key for ", user_key
);
1963 case GetContext::kUnexpectedBlobIndex
:
1964 ROCKS_LOG_ERROR(info_log_
, "Encounter unexpected blob index.");
1965 *status
= Status::NotSupported(
1966 "Encounter unexpected blob index. Please open DB with "
1967 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
1970 f
= fp
.GetNextFile();
1972 if (db_statistics_
!= nullptr) {
1973 get_context
.ReportCounters();
1975 if (GetContext::kMerge
== get_context
.State()) {
1977 *status
= Status::OK();
1980 if (!merge_operator_
) {
1981 *status
= Status::InvalidArgument(
1982 "merge_operator is not properly initialized.");
1985 // merge_operands are in saver and we hit the beginning of the key history
1986 // do a final merge of nullptr and operands;
1987 std::string
* str_value
= value
!= nullptr ? value
->GetSelf() : nullptr;
1988 *status
= MergeHelper::TimedFullMerge(
1989 merge_operator_
, user_key
, nullptr, merge_context
->GetOperands(),
1990 str_value
, info_log_
, db_statistics_
, env_
,
1991 nullptr /* result_operand */, true);
1992 if (LIKELY(value
!= nullptr)) {
1996 if (key_exists
!= nullptr) {
1997 *key_exists
= false;
1999 *status
= Status::NotFound(); // Use an empty error message for speed
2003 void Version::MultiGet(const ReadOptions
& read_options
, MultiGetRange
* range
,
2004 ReadCallback
* callback
, bool* is_blob
) {
2005 PinnedIteratorsManager pinned_iters_mgr
;
2007 // Pin blocks that we read to hold merge operands
2008 if (merge_operator_
) {
2009 pinned_iters_mgr
.StartPinning();
2011 uint64_t tracing_mget_id
= BlockCacheTraceHelper::kReservedGetId
;
2013 if (vset_
&& vset_
->block_cache_tracer_
&&
2014 vset_
->block_cache_tracer_
->is_tracing_enabled()) {
2015 tracing_mget_id
= vset_
->block_cache_tracer_
->NextGetId();
2017 // Even though we know the batch size won't be > MAX_BATCH_SIZE,
2018 // use autovector in order to avoid unnecessary construction of GetContext
2019 // objects, which is expensive
2020 autovector
<GetContext
, 16> get_ctx
;
2021 for (auto iter
= range
->begin(); iter
!= range
->end(); ++iter
) {
2022 assert(iter
->s
->ok() || iter
->s
->IsMergeInProgress());
2023 get_ctx
.emplace_back(
2024 user_comparator(), merge_operator_
, info_log_
, db_statistics_
,
2025 iter
->s
->ok() ? GetContext::kNotFound
: GetContext::kMerge
,
2026 iter
->ukey_with_ts
, iter
->value
, iter
->timestamp
, nullptr,
2027 &(iter
->merge_context
), true, &iter
->max_covering_tombstone_seq
,
2028 this->env_
, nullptr, merge_operator_
? &pinned_iters_mgr
: nullptr,
2029 callback
, is_blob
, tracing_mget_id
);
2030 // MergeInProgress status, if set, has been transferred to the get_context
2031 // state, so we set status to ok here. From now on, the iter status will
2032 // be used for IO errors, and get_context state will be used for any
2034 *(iter
->s
) = Status::OK();
2036 int get_ctx_index
= 0;
2037 for (auto iter
= range
->begin(); iter
!= range
->end();
2038 ++iter
, get_ctx_index
++) {
2039 iter
->get_context
= &(get_ctx
[get_ctx_index
]);
2042 MultiGetRange
file_picker_range(*range
, range
->begin(), range
->end());
2043 FilePickerMultiGet
fp(
2045 &storage_info_
.level_files_brief_
, storage_info_
.num_non_empty_levels_
,
2046 &storage_info_
.file_indexer_
, user_comparator(), internal_comparator());
2047 FdWithKeyRange
* f
= fp
.GetNextFile();
2049 uint64_t num_index_read
= 0;
2050 uint64_t num_filter_read
= 0;
2051 uint64_t num_data_read
= 0;
2052 uint64_t num_sst_read
= 0;
2054 while (f
!= nullptr) {
2055 MultiGetRange file_range
= fp
.CurrentFileRange();
2056 bool timer_enabled
=
2057 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex
&&
2058 get_perf_context()->per_level_perf_context_enabled
;
2059 StopWatchNano
timer(env_
, timer_enabled
/* auto_start */);
2060 s
= table_cache_
->MultiGet(
2061 read_options
, *internal_comparator(), *f
->file_metadata
, &file_range
,
2062 mutable_cf_options_
.prefix_extractor
.get(),
2063 cfd_
->internal_stats()->GetFileReadHist(fp
.GetHitFileLevel()),
2064 IsFilterSkipped(static_cast<int>(fp
.GetHitFileLevel()),
2065 fp
.IsHitFileLastInLevel()),
2066 fp
.GetHitFileLevel());
2067 // TODO: examine the behavior for corrupted key
2068 if (timer_enabled
) {
2069 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos
, timer
.ElapsedNanos(),
2070 fp
.GetHitFileLevel());
2073 // TODO: Set status for individual keys appropriately
2074 for (auto iter
= file_range
.begin(); iter
!= file_range
.end(); ++iter
) {
2076 file_range
.MarkKeyDone(iter
);
2080 uint64_t batch_size
= 0;
2081 for (auto iter
= file_range
.begin(); s
.ok() && iter
!= file_range
.end();
2083 GetContext
& get_context
= *iter
->get_context
;
2084 Status
* status
= iter
->s
;
2085 // The Status in the KeyContext takes precedence over GetContext state
2086 // Status may be an error if there were any IO errors in the table
2087 // reader. We never expect Status to be NotFound(), as that is
2088 // determined by get_context
2089 assert(!status
->IsNotFound());
2090 if (!status
->ok()) {
2091 file_range
.MarkKeyDone(iter
);
2095 if (get_context
.sample()) {
2096 sample_file_read_inc(f
->file_metadata
);
2099 num_index_read
+= get_context
.get_context_stats_
.num_index_read
;
2100 num_filter_read
+= get_context
.get_context_stats_
.num_filter_read
;
2101 num_data_read
+= get_context
.get_context_stats_
.num_data_read
;
2102 num_sst_read
+= get_context
.get_context_stats_
.num_sst_read
;
2104 // report the counters before returning
2105 if (get_context
.State() != GetContext::kNotFound
&&
2106 get_context
.State() != GetContext::kMerge
&&
2107 db_statistics_
!= nullptr) {
2108 get_context
.ReportCounters();
2110 if (iter
->max_covering_tombstone_seq
> 0) {
2111 // The remaining files we look at will only contain covered keys, so
2112 // we stop here for this key
2113 file_picker_range
.SkipKey(iter
);
2116 switch (get_context
.State()) {
2117 case GetContext::kNotFound
:
2118 // Keep searching in other files
2120 case GetContext::kMerge
:
2121 // TODO: update per-level perfcontext user_key_return_count for kMerge
2123 case GetContext::kFound
:
2124 if (fp
.GetHitFileLevel() == 0) {
2125 RecordTick(db_statistics_
, GET_HIT_L0
);
2126 } else if (fp
.GetHitFileLevel() == 1) {
2127 RecordTick(db_statistics_
, GET_HIT_L1
);
2128 } else if (fp
.GetHitFileLevel() >= 2) {
2129 RecordTick(db_statistics_
, GET_HIT_L2_AND_UP
);
2131 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count
, 1,
2132 fp
.GetHitFileLevel());
2133 file_range
.AddValueSize(iter
->value
->size());
2134 file_range
.MarkKeyDone(iter
);
2135 if (file_range
.GetValueSize() > read_options
.value_size_soft_limit
) {
2136 s
= Status::Aborted();
2140 case GetContext::kDeleted
:
2141 // Use empty error message for speed
2142 *status
= Status::NotFound();
2143 file_range
.MarkKeyDone(iter
);
2145 case GetContext::kCorrupt
:
2147 Status::Corruption("corrupted key for ", iter
->lkey
->user_key());
2148 file_range
.MarkKeyDone(iter
);
2150 case GetContext::kUnexpectedBlobIndex
:
2151 ROCKS_LOG_ERROR(info_log_
, "Encounter unexpected blob index.");
2152 *status
= Status::NotSupported(
2153 "Encounter unexpected blob index. Please open DB with "
2154 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2155 file_range
.MarkKeyDone(iter
);
2160 // Report MultiGet stats per level.
2161 if (fp
.IsHitFileLastInLevel()) {
2162 // Dump the stats if this is the last file of this level and reset for
2164 RecordInHistogram(db_statistics_
,
2165 NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL
,
2166 num_index_read
+ num_filter_read
);
2167 RecordInHistogram(db_statistics_
, NUM_DATA_BLOCKS_READ_PER_LEVEL
,
2169 RecordInHistogram(db_statistics_
, NUM_SST_READ_PER_LEVEL
, num_sst_read
);
2170 num_filter_read
= 0;
2176 RecordInHistogram(db_statistics_
, SST_BATCH_SIZE
, batch_size
);
2177 if (!s
.ok() || file_picker_range
.empty()) {
2180 f
= fp
.GetNextFile();
2183 // Process any left over keys
2184 for (auto iter
= range
->begin(); s
.ok() && iter
!= range
->end(); ++iter
) {
2185 GetContext
& get_context
= *iter
->get_context
;
2186 Status
* status
= iter
->s
;
2187 Slice user_key
= iter
->lkey
->user_key();
2189 if (db_statistics_
!= nullptr) {
2190 get_context
.ReportCounters();
2192 if (GetContext::kMerge
== get_context
.State()) {
2193 if (!merge_operator_
) {
2194 *status
= Status::InvalidArgument(
2195 "merge_operator is not properly initialized.");
2196 range
->MarkKeyDone(iter
);
2199 // merge_operands are in saver and we hit the beginning of the key history
2200 // do a final merge of nullptr and operands;
2201 std::string
* str_value
=
2202 iter
->value
!= nullptr ? iter
->value
->GetSelf() : nullptr;
2203 *status
= MergeHelper::TimedFullMerge(
2204 merge_operator_
, user_key
, nullptr, iter
->merge_context
.GetOperands(),
2205 str_value
, info_log_
, db_statistics_
, env_
,
2206 nullptr /* result_operand */, true);
2207 if (LIKELY(iter
->value
!= nullptr)) {
2208 iter
->value
->PinSelf();
2209 range
->AddValueSize(iter
->value
->size());
2210 range
->MarkKeyDone(iter
);
2211 if (range
->GetValueSize() > read_options
.value_size_soft_limit
) {
2212 s
= Status::Aborted();
2217 range
->MarkKeyDone(iter
);
2218 *status
= Status::NotFound(); // Use an empty error message for speed
2222 for (auto iter
= range
->begin(); iter
!= range
->end(); ++iter
) {
2223 range
->MarkKeyDone(iter
);
2228 bool Version::IsFilterSkipped(int level
, bool is_file_last_in_level
) {
2229 // Reaching the bottom level implies misses at all upper levels, so we'll
2230 // skip checking the filters when we predict a hit.
2231 return cfd_
->ioptions()->optimize_filters_for_hits
&&
2232 (level
> 0 || is_file_last_in_level
) &&
2233 level
== storage_info_
.num_non_empty_levels() - 1;
2236 void VersionStorageInfo::GenerateLevelFilesBrief() {
2237 level_files_brief_
.resize(num_non_empty_levels_
);
2238 for (int level
= 0; level
< num_non_empty_levels_
; level
++) {
2239 DoGenerateLevelFilesBrief(
2240 &level_files_brief_
[level
], files_
[level
], &arena_
);
2244 void Version::PrepareApply(
2245 const MutableCFOptions
& mutable_cf_options
,
2246 bool update_stats
) {
2247 TEST_SYNC_POINT_CALLBACK(
2248 "Version::PrepareApply:forced_check",
2249 reinterpret_cast<void*>(&storage_info_
.force_consistency_checks_
));
2250 UpdateAccumulatedStats(update_stats
);
2251 storage_info_
.UpdateNumNonEmptyLevels();
2252 storage_info_
.CalculateBaseBytes(*cfd_
->ioptions(), mutable_cf_options
);
2253 storage_info_
.UpdateFilesByCompactionPri(cfd_
->ioptions()->compaction_pri
);
2254 storage_info_
.GenerateFileIndexer();
2255 storage_info_
.GenerateLevelFilesBrief();
2256 storage_info_
.GenerateLevel0NonOverlapping();
2257 storage_info_
.GenerateBottommostFiles();
// Lazily loads per-file table-property stats (entry/deletion counts, raw
// key/value sizes) into *file_meta from the file's table properties.
// NOTE(review): extraction dropped several original lines here (the early
// return, the error-branch tail, and part of the log format string); the
// visible code tokens below are kept byte-identical.
2260 bool Version::MaybeInitializeFileMetaData(FileMetaData
* file_meta
) {
// Files whose stats were already loaded (or pre-compensated) need no work.
2261 if (file_meta
->init_stats_from_file
||
2262 file_meta
->compensated_file_size
> 0) {
// Fetch this file's table properties.
2265 std::shared_ptr
<const TableProperties
> tp
;
2266 Status s
= GetTableProperties(&tp
, file_meta
);
// Mark as attempted so the check at the top skips this file next time.
2267 file_meta
->init_stats_from_file
= true;
// Property load failed: log the file number and the error.
2269 ROCKS_LOG_ERROR(vset_
->db_options_
->info_log
,
2270 "Unable to load table properties for file %" PRIu64
2272 file_meta
->fd
.GetNumber(), s
.ToString().c_str());
2275 if (tp
.get() == nullptr) return false;
// Copy the aggregates of interest out of the table properties.
2276 file_meta
->num_entries
= tp
->num_entries
;
2277 file_meta
->num_deletions
= tp
->num_deletions
;
2278 file_meta
->raw_value_size
= tp
->raw_value_size
;
2279 file_meta
->raw_key_size
= tp
->raw_key_size
;
// Folds one file's loaded table-property stats into the accumulated_* and
// current_* running totals. Caller must have initialized the stats first
// (asserted below).
// NOTE(review): the TEST_SYNC_POINT_CALLBACK's second-argument line was
// dropped by extraction; visible tokens kept byte-identical.
2284 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData
* file_meta
) {
2285 TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2288 assert(file_meta
->init_stats_from_file
);
// Lifetime totals across all files ever sampled by this storage info.
2289 accumulated_file_size_
+= file_meta
->fd
.GetFileSize();
2290 accumulated_raw_key_size_
+= file_meta
->raw_key_size
;
2291 accumulated_raw_value_size_
+= file_meta
->raw_value_size
;
2292 accumulated_num_non_deletions_
+=
2293 file_meta
->num_entries
- file_meta
->num_deletions
;
2294 accumulated_num_deletions_
+= file_meta
->num_deletions
;
// "Current" totals track live files only; RemoveCurrentStats() performs the
// matching decrements when a file leaves.
2296 current_num_non_deletions_
+=
2297 file_meta
->num_entries
- file_meta
->num_deletions
;
2298 current_num_deletions_
+= file_meta
->num_deletions
;
2299 current_num_samples_
++;
2302 void VersionStorageInfo::RemoveCurrentStats(FileMetaData
* file_meta
) {
2303 if (file_meta
->init_stats_from_file
) {
2304 current_num_non_deletions_
-=
2305 file_meta
->num_entries
- file_meta
->num_deletions
;
2306 current_num_deletions_
-= file_meta
->num_deletions
;
2307 current_num_samples_
--;
2311 void Version::UpdateAccumulatedStats(bool update_stats
) {
2313 // maximum number of table properties loaded from files.
2314 const int kMaxInitCount
= 20;
2316 // here only the first kMaxInitCount files which haven't been
2317 // initialized from file will be updated with num_deletions.
2318 // The motivation here is to cap the maximum I/O per Version creation.
2319 // The reason for choosing files from lower-level instead of higher-level
2320 // is that such design is able to propagate the initialization from
2321 // lower-level to higher-level: When the num_deletions of lower-level
2322 // files are updated, it will make the lower-level files have accurate
2323 // compensated_file_size, making lower-level to higher-level compaction
2324 // will be triggered, which creates higher-level files whose num_deletions
2325 // will be updated here.
2327 level
< storage_info_
.num_levels_
&& init_count
< kMaxInitCount
;
2329 for (auto* file_meta
: storage_info_
.files_
[level
]) {
2330 if (MaybeInitializeFileMetaData(file_meta
)) {
2331 // each FileMeta will be initialized only once.
2332 storage_info_
.UpdateAccumulatedStats(file_meta
);
2333 // when option "max_open_files" is -1, all the file metadata has
2334 // already been read, so MaybeInitializeFileMetaData() won't incur
2335 // any I/O cost. "max_open_files=-1" means that the table cache passed
2336 // to the VersionSet and then to the ColumnFamilySet has a size of
2337 // TableCache::kInfiniteCapacity
2338 if (vset_
->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2339 TableCache::kInfiniteCapacity
) {
2342 if (++init_count
>= kMaxInitCount
) {
2348 // In case all sampled-files contain only deletion entries, then we
2349 // load the table-property of a file in higher-level to initialize
2351 for (int level
= storage_info_
.num_levels_
- 1;
2352 storage_info_
.accumulated_raw_value_size_
== 0 && level
>= 0;
2354 for (int i
= static_cast<int>(storage_info_
.files_
[level
].size()) - 1;
2355 storage_info_
.accumulated_raw_value_size_
== 0 && i
>= 0; --i
) {
2356 if (MaybeInitializeFileMetaData(storage_info_
.files_
[level
][i
])) {
2357 storage_info_
.UpdateAccumulatedStats(storage_info_
.files_
[level
][i
]);
2363 storage_info_
.ComputeCompensatedSizes();
2366 void VersionStorageInfo::ComputeCompensatedSizes() {
2367 static const int kDeletionWeightOnCompaction
= 2;
2368 uint64_t average_value_size
= GetAverageValueSize();
2370 // compute the compensated size
2371 for (int level
= 0; level
< num_levels_
; level
++) {
2372 for (auto* file_meta
: files_
[level
]) {
2373 // Here we only compute compensated_file_size for those file_meta
2374 // which compensated_file_size is uninitialized (== 0). This is true only
2375 // for files that have been created right now and no other thread has
2376 // access to them. That's why we can safely mutate compensated_file_size.
2377 if (file_meta
->compensated_file_size
== 0) {
2378 file_meta
->compensated_file_size
= file_meta
->fd
.GetFileSize();
2379 // Here we only boost the size of deletion entries of a file only
2380 // when the number of deletion entries is greater than the number of
2381 // non-deletion entries in the file. The motivation here is that in
2382 // a stable workload, the number of deletion entries should be roughly
2383 // equal to the number of non-deletion entries. If we compensate the
2384 // size of deletion entries in a stable workload, the deletion
2385 // compensation logic might introduce unwanted effet which changes the
2386 // shape of LSM tree.
2387 if (file_meta
->num_deletions
* 2 >= file_meta
->num_entries
) {
2388 file_meta
->compensated_file_size
+=
2389 (file_meta
->num_deletions
* 2 - file_meta
->num_entries
) *
2390 average_value_size
* kDeletionWeightOnCompaction
;
// Highest level that may serve as a compaction input level. Under
// level-style compaction the last level is output-only, hence
// num_levels() - 2.
// NOTE(review): the non-level-compaction branch was dropped by extraction
// in this chunk; confirm against the full file.
2397 int VersionStorageInfo::MaxInputLevel() const {
2398 if (compaction_style_
== kCompactionStyleLevel
) {
2399 return num_levels() - 2;
2404 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind
) const {
2405 if (allow_ingest_behind
) {
2406 assert(num_levels() > 1);
2407 return num_levels() - 2;
2409 return num_levels() - 1;
2412 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2413 const MutableCFOptions
& mutable_cf_options
) {
2414 // Only implemented for level-based compaction
2415 if (compaction_style_
!= kCompactionStyleLevel
) {
2416 estimated_compaction_needed_bytes_
= 0;
2420 // Start from Level 0, if level 0 qualifies compaction to level 1,
2421 // we estimate the size of compaction.
2422 // Then we move on to the next level and see whether it qualifies compaction
2423 // to the next level. The size of the level is estimated as the actual size
2424 // on the level plus the input bytes from the previous level if there is any.
2425 // If it exceeds, take the exceeded bytes as compaction input and add the size
2426 // of the compaction size to tatal size.
2427 // We keep doing it to Level 2, 3, etc, until the last level and return the
2428 // accumulated bytes.
2430 uint64_t bytes_compact_to_next_level
= 0;
2431 uint64_t level_size
= 0;
2432 for (auto* f
: files_
[0]) {
2433 level_size
+= f
->fd
.GetFileSize();
2436 bool level0_compact_triggered
= false;
2437 if (static_cast<int>(files_
[0].size()) >=
2438 mutable_cf_options
.level0_file_num_compaction_trigger
||
2439 level_size
>= mutable_cf_options
.max_bytes_for_level_base
) {
2440 level0_compact_triggered
= true;
2441 estimated_compaction_needed_bytes_
= level_size
;
2442 bytes_compact_to_next_level
= level_size
;
2444 estimated_compaction_needed_bytes_
= 0;
2448 uint64_t bytes_next_level
= 0;
2449 for (int level
= base_level(); level
<= MaxInputLevel(); level
++) {
2451 if (bytes_next_level
> 0) {
2453 uint64_t level_size2
= 0;
2454 for (auto* f
: files_
[level
]) {
2455 level_size2
+= f
->fd
.GetFileSize();
2457 assert(level_size2
== bytes_next_level
);
2459 level_size
= bytes_next_level
;
2460 bytes_next_level
= 0;
2462 for (auto* f
: files_
[level
]) {
2463 level_size
+= f
->fd
.GetFileSize();
2466 if (level
== base_level() && level0_compact_triggered
) {
2467 // Add base level size to compaction if level0 compaction triggered.
2468 estimated_compaction_needed_bytes_
+= level_size
;
2470 // Add size added by previous compaction
2471 level_size
+= bytes_compact_to_next_level
;
2472 bytes_compact_to_next_level
= 0;
2473 uint64_t level_target
= MaxBytesForLevel(level
);
2474 if (level_size
> level_target
) {
2475 bytes_compact_to_next_level
= level_size
- level_target
;
2476 // Estimate the actual compaction fan-out ratio as size ratio between
2479 assert(bytes_next_level
== 0);
2480 if (level
+ 1 < num_levels_
) {
2481 for (auto* f
: files_
[level
+ 1]) {
2482 bytes_next_level
+= f
->fd
.GetFileSize();
2485 if (bytes_next_level
> 0) {
2486 assert(level_size
> 0);
2487 estimated_compaction_needed_bytes_
+= static_cast<uint64_t>(
2488 static_cast<double>(bytes_compact_to_next_level
) *
2489 (static_cast<double>(bytes_next_level
) /
2490 static_cast<double>(level_size
) +
// Counts files (not currently being compacted) whose oldest-ancestor time is
// older than `current_time - mutable_cf_options.ttl`. An oldest-ancestor
// time of 0 means "unknown" and is skipped.
// NOTE(review): the status-ok guard around the clock read was dropped by
// extraction; visible tokens kept byte-identical.
2498 uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions
& ioptions
,
2499 const MutableCFOptions
& mutable_cf_options
,
2500 const std::vector
<FileMetaData
*>& files
) {
2501 uint32_t ttl_expired_files_count
= 0;
// Current wall-clock time obtained through the Env abstraction.
2503 int64_t _current_time
;
2504 auto status
= ioptions
.env
->GetCurrentTime(&_current_time
);
2506 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
2507 for (FileMetaData
* f
: files
) {
2508 if (!f
->being_compacted
) {
2509 uint64_t oldest_ancester_time
= f
->TryGetOldestAncesterTime();
2510 if (oldest_ancester_time
!= 0 &&
2511 oldest_ancester_time
< (current_time
- mutable_cf_options
.ttl
)) {
2512 ttl_expired_files_count
++;
2517 return ttl_expired_files_count
;
2519 } // anonymous namespace
2521 void VersionStorageInfo::ComputeCompactionScore(
2522 const ImmutableCFOptions
& immutable_cf_options
,
2523 const MutableCFOptions
& mutable_cf_options
) {
2524 for (int level
= 0; level
<= MaxInputLevel(); level
++) {
2527 // We treat level-0 specially by bounding the number of files
2528 // instead of number of bytes for two reasons:
2530 // (1) With larger write-buffer sizes, it is nice not to do too
2531 // many level-0 compactions.
2533 // (2) The files in level-0 are merged on every read and
2534 // therefore we wish to avoid too many files when the individual
2535 // file size is small (perhaps because of a small write-buffer
2536 // setting, or very high compression ratios, or lots of
2537 // overwrites/deletions).
2538 int num_sorted_runs
= 0;
2539 uint64_t total_size
= 0;
2540 for (auto* f
: files_
[level
]) {
2541 if (!f
->being_compacted
) {
2542 total_size
+= f
->compensated_file_size
;
2546 if (compaction_style_
== kCompactionStyleUniversal
) {
2547 // For universal compaction, we use level0 score to indicate
2548 // compaction score for the whole DB. Adding other levels as if
2549 // they are L0 files.
2550 for (int i
= 1; i
< num_levels(); i
++) {
2551 // Its possible that a subset of the files in a level may be in a
2552 // compaction, due to delete triggered compaction or trivial move.
2553 // In that case, the below check may not catch a level being
2554 // compacted as it only checks the first file. The worst that can
2555 // happen is a scheduled compaction thread will find nothing to do.
2556 if (!files_
[i
].empty() && !files_
[i
][0]->being_compacted
) {
2562 if (compaction_style_
== kCompactionStyleFIFO
) {
2563 score
= static_cast<double>(total_size
) /
2564 mutable_cf_options
.compaction_options_fifo
.max_table_files_size
;
2565 if (mutable_cf_options
.compaction_options_fifo
.allow_compaction
) {
2567 static_cast<double>(num_sorted_runs
) /
2568 mutable_cf_options
.level0_file_num_compaction_trigger
,
2571 if (mutable_cf_options
.ttl
> 0) {
2573 static_cast<double>(GetExpiredTtlFilesCount(
2574 immutable_cf_options
, mutable_cf_options
, files_
[level
])),
2579 score
= static_cast<double>(num_sorted_runs
) /
2580 mutable_cf_options
.level0_file_num_compaction_trigger
;
2581 if (compaction_style_
== kCompactionStyleLevel
&& num_levels() > 1) {
2582 // Level-based involves L0->L0 compactions that can lead to oversized
2583 // L0 files. Take into account size as well to avoid later giant
2584 // compactions to the base level.
2585 uint64_t l0_target_size
= mutable_cf_options
.max_bytes_for_level_base
;
2586 if (immutable_cf_options
.level_compaction_dynamic_level_bytes
&&
2587 level_multiplier_
!= 0.0) {
2588 // Prevent L0 to Lbase fanout from growing larger than
2589 // `level_multiplier_`. This prevents us from getting stuck picking
2590 // L0 forever even when it is hurting write-amp. That could happen
2591 // in dynamic level compaction's write-burst mode where the base
2592 // level's target size can grow to be enormous.
2594 std::max(l0_target_size
,
2595 static_cast<uint64_t>(level_max_bytes_
[base_level_
] /
2596 level_multiplier_
));
2599 std::max(score
, static_cast<double>(total_size
) / l0_target_size
);
2603 // Compute the ratio of current size to size limit.
2604 uint64_t level_bytes_no_compacting
= 0;
2605 for (auto f
: files_
[level
]) {
2606 if (!f
->being_compacted
) {
2607 level_bytes_no_compacting
+= f
->compensated_file_size
;
2610 score
= static_cast<double>(level_bytes_no_compacting
) /
2611 MaxBytesForLevel(level
);
2613 compaction_level_
[level
] = level
;
2614 compaction_score_
[level
] = score
;
2617 // sort all the levels based on their score. Higher scores get listed
2618 // first. Use bubble sort because the number of entries are small.
2619 for (int i
= 0; i
< num_levels() - 2; i
++) {
2620 for (int j
= i
+ 1; j
< num_levels() - 1; j
++) {
2621 if (compaction_score_
[i
] < compaction_score_
[j
]) {
2622 double score
= compaction_score_
[i
];
2623 int level
= compaction_level_
[i
];
2624 compaction_score_
[i
] = compaction_score_
[j
];
2625 compaction_level_
[i
] = compaction_level_
[j
];
2626 compaction_score_
[j
] = score
;
2627 compaction_level_
[j
] = level
;
2631 ComputeFilesMarkedForCompaction();
2632 ComputeBottommostFilesMarkedForCompaction();
2633 if (mutable_cf_options
.ttl
> 0) {
2634 ComputeExpiredTtlFiles(immutable_cf_options
, mutable_cf_options
.ttl
);
2636 if (mutable_cf_options
.periodic_compaction_seconds
> 0) {
2637 ComputeFilesMarkedForPeriodicCompaction(
2638 immutable_cf_options
, mutable_cf_options
.periodic_compaction_seconds
);
2640 EstimateCompactionBytesNeeded(mutable_cf_options
);
// Rebuilds files_marked_for_compaction_ with (level, file) pairs whose
// marked_for_compaction flag is set, excluding the deepest level that holds
// data (such files should not be moved to a new level).
// NOTE(review): the first loop's break and closing braces were dropped by
// extraction; visible tokens kept byte-identical.
2643 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2644 files_marked_for_compaction_
.clear();
2645 int last_qualify_level
= 0;
2647 // Do not include files from the last level with data
2648 // If table properties collector suggests a file on the last level,
2649 // we should not move it to a new level.
// Find the deepest level containing data; only levels above it qualify.
2650 for (int level
= num_levels() - 1; level
>= 1; level
--) {
2651 if (!files_
[level
].empty()) {
2652 last_qualify_level
= level
- 1;
// Gather marked files that are not already part of a compaction.
2657 for (int level
= 0; level
<= last_qualify_level
; level
++) {
2658 for (auto* f
: files_
[level
]) {
2659 if (!f
->being_compacted
&& f
->marked_for_compaction
) {
2660 files_marked_for_compaction_
.emplace_back(level
, f
);
// Rebuilds expired_ttl_files_: (level, file) pairs outside the last level
// whose oldest-ancestor time predates `current_time - ttl` and which are not
// currently being compacted.
// NOTE(review): the ttl assertion and the status-ok guard were dropped by
// extraction; visible tokens kept byte-identical.
2666 void VersionStorageInfo::ComputeExpiredTtlFiles(
2667 const ImmutableCFOptions
& ioptions
, const uint64_t ttl
) {
2670 expired_ttl_files_
.clear();
2672 int64_t _current_time
;
2673 auto status
= ioptions
.env
->GetCurrentTime(&_current_time
);
2677 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
// The last level is excluded (level < num_levels() - 1).
2679 for (int level
= 0; level
< num_levels() - 1; level
++) {
2680 for (FileMetaData
* f
: files_
[level
]) {
2681 if (!f
->being_compacted
) {
2682 uint64_t oldest_ancester_time
= f
->TryGetOldestAncesterTime();
2683 if (oldest_ancester_time
> 0 &&
2684 oldest_ancester_time
< (current_time
- ttl
)) {
2685 expired_ttl_files_
.emplace_back(level
, f
);
2692 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2693 const ImmutableCFOptions
& ioptions
,
2694 const uint64_t periodic_compaction_seconds
) {
2695 assert(periodic_compaction_seconds
> 0);
2697 files_marked_for_periodic_compaction_
.clear();
2699 int64_t temp_current_time
;
2700 auto status
= ioptions
.env
->GetCurrentTime(&temp_current_time
);
2704 const uint64_t current_time
= static_cast<uint64_t>(temp_current_time
);
2706 // If periodic_compaction_seconds is larger than current time, periodic
2707 // compaction can't possibly be triggered.
2708 if (periodic_compaction_seconds
> current_time
) {
2712 const uint64_t allowed_time_limit
=
2713 current_time
- periodic_compaction_seconds
;
2715 for (int level
= 0; level
< num_levels(); level
++) {
2716 for (auto f
: files_
[level
]) {
2717 if (!f
->being_compacted
) {
2718 // Compute a file's modification time in the following order:
2719 // 1. Use file_creation_time table property if it is > 0.
2720 // 2. Use creation_time table property if it is > 0.
2721 // 3. Use file's mtime metadata if the above two table properties are 0.
2722 // Don't consider the file at all if the modification time cannot be
2723 // correctly determined based on the above conditions.
2724 uint64_t file_modification_time
= f
->TryGetFileCreationTime();
2725 if (file_modification_time
== kUnknownFileCreationTime
) {
2726 file_modification_time
= f
->TryGetOldestAncesterTime();
2728 if (file_modification_time
== kUnknownOldestAncesterTime
) {
2729 auto file_path
= TableFileName(ioptions
.cf_paths
, f
->fd
.GetNumber(),
2731 status
= ioptions
.env
->GetFileModificationTime(
2732 file_path
, &file_modification_time
);
2734 ROCKS_LOG_WARN(ioptions
.info_log
,
2735 "Can't get file modification time: %s: %s",
2736 file_path
.c_str(), status
.ToString().c_str());
2740 if (file_modification_time
> 0 &&
2741 file_modification_time
< allowed_time_limit
) {
2742 files_marked_for_periodic_compaction_
.emplace_back(level
, f
);
2751 // used to sort files by size
2757 // Compator that is used to sort files based on their size
2758 // In normal mode: descending size
2759 bool CompareCompensatedSizeDescending(const Fsize
& first
, const Fsize
& second
) {
2760 return (first
.file
->compensated_file_size
>
2761 second
.file
->compensated_file_size
);
2763 } // anonymous namespace
// Appends f to level `level` and records its (level, position) in the
// file_locations_ index keyed by file number; a file number may be
// registered only once (asserted).
// NOTE(review): extraction dropped a few original lines between the
// push_back and the file-number lookup; visible tokens kept byte-identical.
2765 void VersionStorageInfo::AddFile(int level
, FileMetaData
* f
) {
2766 auto& level_files
= files_
[level
];
2767 level_files
.push_back(f
);
2771 const uint64_t file_number
= f
->fd
.GetNumber();
// The new index entry points at the slot just appended.
2773 assert(file_locations_
.find(file_number
) == file_locations_
.end());
2774 file_locations_
.emplace(file_number
,
2775 FileLocation(level
, level_files
.size() - 1));
// Registers blob-file metadata in blob_files_ under its blob file number;
// duplicate numbers are rejected by the assert below.
// NOTE(review): the line invoking the container insert (around original
// line 2786-2787) was dropped by extraction; visible tokens kept
// byte-identical.
2778 void VersionStorageInfo::AddBlobFile(
2779 std::shared_ptr
<BlobFileMetaData
> blob_file_meta
) {
2780 assert(blob_file_meta
);
2782 const uint64_t blob_file_number
= blob_file_meta
->GetBlobFileNumber();
// lower_bound yields the insertion hint and allows duplicate detection.
2784 auto it
= blob_files_
.lower_bound(blob_file_number
);
2785 assert(it
== blob_files_
.end() || it
->first
!= blob_file_number
);
2788 it
, BlobFiles::value_type(blob_file_number
, std::move(blob_file_meta
)));
2791 // Version::PrepareApply() need to be called before calling the function, or
2792 // following functions called:
2793 // 1. UpdateNumNonEmptyLevels();
2794 // 2. CalculateBaseBytes();
2795 // 3. UpdateFilesByCompactionPri();
2796 // 4. GenerateFileIndexer();
2797 // 5. GenerateLevelFilesBrief();
2798 // 6. GenerateLevel0NonOverlapping();
2799 // 7. GenerateBottommostFiles();
2800 void VersionStorageInfo::SetFinalized() {
2803 if (compaction_style_
!= kCompactionStyleLevel
) {
2804 // Not level based compaction.
2807 assert(base_level_
< 0 || num_levels() == 1 ||
2808 (base_level_
>= 1 && base_level_
< num_levels()));
2809 // Verify all levels newer than base_level are empty except L0
2810 for (int level
= 1; level
< base_level(); level
++) {
2811 assert(NumLevelBytes(level
) == 0);
2813 uint64_t max_bytes_prev_level
= 0;
2814 for (int level
= base_level(); level
< num_levels() - 1; level
++) {
2815 if (LevelFiles(level
).size() == 0) {
2818 assert(MaxBytesForLevel(level
) >= max_bytes_prev_level
);
2819 max_bytes_prev_level
= MaxBytesForLevel(level
);
2821 int num_empty_non_l0_level
= 0;
2822 for (int level
= 0; level
< num_levels(); level
++) {
2823 assert(LevelFiles(level
).size() == 0 ||
2824 LevelFiles(level
).size() == LevelFilesBrief(level
).num_files
);
2825 if (level
> 0 && NumLevelBytes(level
) > 0) {
2826 num_empty_non_l0_level
++;
2828 if (LevelFiles(level
).size() > 0) {
2829 assert(level
< num_non_empty_levels());
2832 assert(compaction_level_
.size() > 0);
2833 assert(compaction_level_
.size() == compaction_score_
.size());
2837 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2838 num_non_empty_levels_
= num_levels_
;
2839 for (int i
= num_levels_
- 1; i
>= 0; i
--) {
2840 if (files_
[i
].size() != 0) {
2843 num_non_empty_levels_
= i
;
2849 // Sort `temp` based on ratio of overlapping size over file size
2850 void SortFileByOverlappingRatio(
2851 const InternalKeyComparator
& icmp
, const std::vector
<FileMetaData
*>& files
,
2852 const std::vector
<FileMetaData
*>& next_level_files
,
2853 std::vector
<Fsize
>* temp
) {
2854 std::unordered_map
<uint64_t, uint64_t> file_to_order
;
2855 auto next_level_it
= next_level_files
.begin();
2857 for (auto& file
: files
) {
2858 uint64_t overlapping_bytes
= 0;
2859 // Skip files in next level that is smaller than current file
2860 while (next_level_it
!= next_level_files
.end() &&
2861 icmp
.Compare((*next_level_it
)->largest
, file
->smallest
) < 0) {
2865 while (next_level_it
!= next_level_files
.end() &&
2866 icmp
.Compare((*next_level_it
)->smallest
, file
->largest
) < 0) {
2867 overlapping_bytes
+= (*next_level_it
)->fd
.file_size
;
2869 if (icmp
.Compare((*next_level_it
)->largest
, file
->largest
) > 0) {
2870 // next level file cross large boundary of current file.
2876 assert(file
->compensated_file_size
!= 0);
2877 file_to_order
[file
->fd
.GetNumber()] =
2878 overlapping_bytes
* 1024u / file
->compensated_file_size
;
2881 std::sort(temp
->begin(), temp
->end(),
2882 [&](const Fsize
& f1
, const Fsize
& f2
) -> bool {
2883 return file_to_order
[f1
.file
->fd
.GetNumber()] <
2884 file_to_order
[f2
.file
->fd
.GetNumber()];
2889 void VersionStorageInfo::UpdateFilesByCompactionPri(
2890 CompactionPri compaction_pri
) {
2891 if (compaction_style_
== kCompactionStyleNone
||
2892 compaction_style_
== kCompactionStyleFIFO
||
2893 compaction_style_
== kCompactionStyleUniversal
) {
2897 // No need to sort the highest level because it is never compacted.
2898 for (int level
= 0; level
< num_levels() - 1; level
++) {
2899 const std::vector
<FileMetaData
*>& files
= files_
[level
];
2900 auto& files_by_compaction_pri
= files_by_compaction_pri_
[level
];
2901 assert(files_by_compaction_pri
.size() == 0);
2903 // populate a temp vector for sorting based on size
2904 std::vector
<Fsize
> temp(files
.size());
2905 for (size_t i
= 0; i
< files
.size(); i
++) {
2907 temp
[i
].file
= files
[i
];
2910 // sort the top number_of_files_to_sort_ based on file size
2911 size_t num
= VersionStorageInfo::kNumberFilesToSort
;
2912 if (num
> temp
.size()) {
2915 switch (compaction_pri
) {
2916 case kByCompensatedSize
:
2917 std::partial_sort(temp
.begin(), temp
.begin() + num
, temp
.end(),
2918 CompareCompensatedSizeDescending
);
2920 case kOldestLargestSeqFirst
:
2921 std::sort(temp
.begin(), temp
.end(),
2922 [](const Fsize
& f1
, const Fsize
& f2
) -> bool {
2923 return f1
.file
->fd
.largest_seqno
<
2924 f2
.file
->fd
.largest_seqno
;
2927 case kOldestSmallestSeqFirst
:
2928 std::sort(temp
.begin(), temp
.end(),
2929 [](const Fsize
& f1
, const Fsize
& f2
) -> bool {
2930 return f1
.file
->fd
.smallest_seqno
<
2931 f2
.file
->fd
.smallest_seqno
;
2934 case kMinOverlappingRatio
:
2935 SortFileByOverlappingRatio(*internal_comparator_
, files_
[level
],
2936 files_
[level
+ 1], &temp
);
2941 assert(temp
.size() == files
.size());
2943 // initialize files_by_compaction_pri_
2944 for (size_t i
= 0; i
< temp
.size(); i
++) {
2945 files_by_compaction_pri
.push_back(static_cast<int>(temp
[i
].index
));
2947 next_file_to_compact_by_size_
[level
] = 0;
2948 assert(files_
[level
].size() == files_by_compaction_pri_
[level
].size());
2952 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2953 assert(!finalized_
);
2954 level0_non_overlapping_
= true;
2955 if (level_files_brief_
.size() == 0) {
2959 // A copy of L0 files sorted by smallest key
2960 std::vector
<FdWithKeyRange
> level0_sorted_file(
2961 level_files_brief_
[0].files
,
2962 level_files_brief_
[0].files
+ level_files_brief_
[0].num_files
);
2963 std::sort(level0_sorted_file
.begin(), level0_sorted_file
.end(),
2964 [this](const FdWithKeyRange
& f1
, const FdWithKeyRange
& f2
) -> bool {
2965 return (internal_comparator_
->Compare(f1
.smallest_key
,
2966 f2
.smallest_key
) < 0);
2969 for (size_t i
= 1; i
< level0_sorted_file
.size(); ++i
) {
2970 FdWithKeyRange
& f
= level0_sorted_file
[i
];
2971 FdWithKeyRange
& prev
= level0_sorted_file
[i
- 1];
2972 if (internal_comparator_
->Compare(prev
.largest_key
, f
.smallest_key
) >= 0) {
2973 level0_non_overlapping_
= false;
2979 void VersionStorageInfo::GenerateBottommostFiles() {
2980 assert(!finalized_
);
2981 assert(bottommost_files_
.empty());
2982 for (size_t level
= 0; level
< level_files_brief_
.size(); ++level
) {
2983 for (size_t file_idx
= 0; file_idx
< level_files_brief_
[level
].num_files
;
2985 const FdWithKeyRange
& f
= level_files_brief_
[level
].files
[file_idx
];
2988 l0_file_idx
= static_cast<int>(file_idx
);
2992 Slice smallest_user_key
= ExtractUserKey(f
.smallest_key
);
2993 Slice largest_user_key
= ExtractUserKey(f
.largest_key
);
2994 if (!RangeMightExistAfterSortedRun(smallest_user_key
, largest_user_key
,
2995 static_cast<int>(level
),
2997 bottommost_files_
.emplace_back(static_cast<int>(level
),
3004 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum
) {
3005 assert(seqnum
>= oldest_snapshot_seqnum_
);
3006 oldest_snapshot_seqnum_
= seqnum
;
3007 if (oldest_snapshot_seqnum_
> bottommost_files_mark_threshold_
) {
3008 ComputeBottommostFilesMarkedForCompaction();
3012 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
3013 bottommost_files_marked_for_compaction_
.clear();
3014 bottommost_files_mark_threshold_
= kMaxSequenceNumber
;
3015 for (auto& level_and_file
: bottommost_files_
) {
3016 if (!level_and_file
.second
->being_compacted
&&
3017 level_and_file
.second
->fd
.largest_seqno
!= 0 &&
3018 level_and_file
.second
->num_deletions
> 1) {
3019 // largest_seqno might be nonzero due to containing the final key in an
3020 // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
3021 // ensures the file really contains deleted or overwritten keys.
3022 if (level_and_file
.second
->fd
.largest_seqno
< oldest_snapshot_seqnum_
) {
3023 bottommost_files_marked_for_compaction_
.push_back(level_and_file
);
3025 bottommost_files_mark_threshold_
=
3026 std::min(bottommost_files_mark_threshold_
,
3027 level_and_file
.second
->fd
.largest_seqno
);
3033 void Version::Ref() {
3037 bool Version::Unref() {
3047 bool VersionStorageInfo::OverlapInLevel(int level
,
3048 const Slice
* smallest_user_key
,
3049 const Slice
* largest_user_key
) {
3050 if (level
>= num_non_empty_levels_
) {
3051 // empty level, no overlap
3054 return SomeFileOverlapsRange(*internal_comparator_
, (level
> 0),
3055 level_files_brief_
[level
], smallest_user_key
,
3059 // Store in "*inputs" all files in "level" that overlap [begin,end]
3060 // If hint_index is specified, then it points to a file in the
3061 // overlapping range.
3062 // The file_index returns a pointer to any file in an overlapping range.
3063 void VersionStorageInfo::GetOverlappingInputs(
3064 int level
, const InternalKey
* begin
, const InternalKey
* end
,
3065 std::vector
<FileMetaData
*>* inputs
, int hint_index
, int* file_index
,
3066 bool expand_range
, InternalKey
** next_smallest
) const {
3067 if (level
>= num_non_empty_levels_
) {
3068 // this level is empty, no overlapping inputs
3076 const Comparator
* user_cmp
= user_comparator_
;
3078 GetOverlappingInputsRangeBinarySearch(level
, begin
, end
, inputs
, hint_index
,
3079 file_index
, false, next_smallest
);
3083 if (next_smallest
) {
3084 // next_smallest key only makes sense for non-level 0, where files are
3086 *next_smallest
= nullptr;
3089 Slice user_begin
, user_end
;
3090 if (begin
!= nullptr) {
3091 user_begin
= begin
->user_key();
3093 if (end
!= nullptr) {
3094 user_end
= end
->user_key();
3097 // index stores the file index need to check.
3098 std::list
<size_t> index
;
3099 for (size_t i
= 0; i
< level_files_brief_
[level
].num_files
; i
++) {
3100 index
.emplace_back(i
);
3103 while (!index
.empty()) {
3104 bool found_overlapping_file
= false;
3105 auto iter
= index
.begin();
3106 while (iter
!= index
.end()) {
3107 FdWithKeyRange
* f
= &(level_files_brief_
[level
].files
[*iter
]);
3108 const Slice file_start
= ExtractUserKey(f
->smallest_key
);
3109 const Slice file_limit
= ExtractUserKey(f
->largest_key
);
3110 if (begin
!= nullptr &&
3111 user_cmp
->CompareWithoutTimestamp(file_limit
, user_begin
) < 0) {
3112 // "f" is completely before specified range; skip it
3114 } else if (end
!= nullptr &&
3115 user_cmp
->CompareWithoutTimestamp(file_start
, user_end
) > 0) {
3116 // "f" is completely after specified range; skip it
3120 inputs
->emplace_back(files_
[level
][*iter
]);
3121 found_overlapping_file
= true;
3122 // record the first file index.
3123 if (file_index
&& *file_index
== -1) {
3124 *file_index
= static_cast<int>(*iter
);
3126 // the related file is overlap, erase to avoid checking again.
3127 iter
= index
.erase(iter
);
3129 if (begin
!= nullptr &&
3130 user_cmp
->CompareWithoutTimestamp(file_start
, user_begin
) < 0) {
3131 user_begin
= file_start
;
3133 if (end
!= nullptr &&
3134 user_cmp
->CompareWithoutTimestamp(file_limit
, user_end
) > 0) {
3135 user_end
= file_limit
;
3140 // if all the files left are not overlap, break
3141 if (!found_overlapping_file
) {
3147 // Store in "*inputs" files in "level" that within range [begin,end]
3148 // Guarantee a "clean cut" boundary between the files in inputs
3149 // and the surrounding files and the maxinum number of files.
3150 // This will ensure that no parts of a key are lost during compaction.
3151 // If hint_index is specified, then it points to a file in the range.
3152 // The file_index returns a pointer to any file in an overlapping range.
3153 void VersionStorageInfo::GetCleanInputsWithinInterval(
3154 int level
, const InternalKey
* begin
, const InternalKey
* end
,
3155 std::vector
<FileMetaData
*>* inputs
, int hint_index
, int* file_index
) const {
3160 if (level
>= num_non_empty_levels_
|| level
== 0 ||
3161 level_files_brief_
[level
].num_files
== 0) {
3162 // this level is empty, no inputs within range
3163 // also don't support clean input interval within L0
3167 GetOverlappingInputsRangeBinarySearch(level
, begin
, end
, inputs
,
3168 hint_index
, file_index
,
3169 true /* within_interval */);
3172 // Store in "*inputs" all files in "level" that overlap [begin,end]
3173 // Employ binary search to find at least one file that overlaps the
3174 // specified range. From that file, iterate backwards and
3175 // forwards to find all overlapping files.
3176 // if within_range is set, then only store the maximum clean inputs
3177 // within range [begin, end]. "clean" means there is a boudnary
3178 // between the files in "*inputs" and the surrounding files
3179 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3180 int level
, const InternalKey
* begin
, const InternalKey
* end
,
3181 std::vector
<FileMetaData
*>* inputs
, int hint_index
, int* file_index
,
3182 bool within_interval
, InternalKey
** next_smallest
) const {
3185 auto user_cmp
= user_comparator_
;
3186 const FdWithKeyRange
* files
= level_files_brief_
[level
].files
;
3187 const int num_files
= static_cast<int>(level_files_brief_
[level
].num_files
);
3189 // begin to use binary search to find lower bound
3191 int start_index
= 0;
3192 int end_index
= num_files
;
3194 if (begin
!= nullptr) {
3195 // if within_interval is true, with file_key would find
3196 // not overlapping ranges in std::lower_bound.
3197 auto cmp
= [&user_cmp
, &within_interval
](const FdWithKeyRange
& f
,
3198 const InternalKey
* k
) {
3199 auto& file_key
= within_interval
? f
.file_metadata
->smallest
3200 : f
.file_metadata
->largest
;
3201 return sstableKeyCompare(user_cmp
, file_key
, *k
) < 0;
3204 start_index
= static_cast<int>(
3205 std::lower_bound(files
,
3206 files
+ (hint_index
== -1 ? num_files
: hint_index
),
3210 if (start_index
> 0 && within_interval
) {
3211 bool is_overlapping
= true;
3212 while (is_overlapping
&& start_index
< num_files
) {
3213 auto& pre_limit
= files
[start_index
- 1].file_metadata
->largest
;
3214 auto& cur_start
= files
[start_index
].file_metadata
->smallest
;
3215 is_overlapping
= sstableKeyCompare(user_cmp
, pre_limit
, cur_start
) == 0;
3216 start_index
+= is_overlapping
;
3221 if (end
!= nullptr) {
3222 // if within_interval is true, with file_key would find
3223 // not overlapping ranges in std::upper_bound.
3224 auto cmp
= [&user_cmp
, &within_interval
](const InternalKey
* k
,
3225 const FdWithKeyRange
& f
) {
3226 auto& file_key
= within_interval
? f
.file_metadata
->largest
3227 : f
.file_metadata
->smallest
;
3228 return sstableKeyCompare(user_cmp
, *k
, file_key
) < 0;
3231 end_index
= static_cast<int>(
3232 std::upper_bound(files
+ start_index
, files
+ num_files
, end
, cmp
) -
3235 if (end_index
< num_files
&& within_interval
) {
3236 bool is_overlapping
= true;
3237 while (is_overlapping
&& end_index
> start_index
) {
3238 auto& next_start
= files
[end_index
].file_metadata
->smallest
;
3239 auto& cur_limit
= files
[end_index
- 1].file_metadata
->largest
;
3241 sstableKeyCompare(user_cmp
, cur_limit
, next_start
) == 0;
3242 end_index
-= is_overlapping
;
3247 assert(start_index
<= end_index
);
3249 // If there were no overlapping files, return immediately.
3250 if (start_index
== end_index
) {
3251 if (next_smallest
) {
3252 *next_smallest
= nullptr;
3257 assert(start_index
< end_index
);
3259 // returns the index where an overlap is found
3261 *file_index
= start_index
;
3264 // insert overlapping files into vector
3265 for (int i
= start_index
; i
< end_index
; i
++) {
3266 inputs
->push_back(files_
[level
][i
]);
3269 if (next_smallest
!= nullptr) {
3270 // Provide the next key outside the range covered by inputs
3271 if (end_index
< static_cast<int>(files_
[level
].size())) {
3272 **next_smallest
= files_
[level
][end_index
]->smallest
;
3274 *next_smallest
= nullptr;
3279 uint64_t VersionStorageInfo::NumLevelBytes(int level
) const {
3281 assert(level
< num_levels());
3282 return TotalFileSize(files_
[level
]);
3285 const char* VersionStorageInfo::LevelSummary(
3286 LevelSummaryStorage
* scratch
) const {
3288 if (compaction_style_
== kCompactionStyleLevel
&& num_levels() > 1) {
3289 assert(base_level_
< static_cast<int>(level_max_bytes_
.size()));
3290 if (level_multiplier_
!= 0.0) {
3292 scratch
->buffer
, sizeof(scratch
->buffer
),
3293 "base level %d level multiplier %.2f max bytes base %" PRIu64
" ",
3294 base_level_
, level_multiplier_
, level_max_bytes_
[base_level_
]);
3298 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, "files[");
3299 for (int i
= 0; i
< num_levels(); i
++) {
3300 int sz
= sizeof(scratch
->buffer
) - len
;
3301 int ret
= snprintf(scratch
->buffer
+ len
, sz
, "%d ", int(files_
[i
].size()));
3302 if (ret
< 0 || ret
>= sz
) break;
3306 // overwrite the last space
3309 len
+= snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
3310 "] max score %.2f", compaction_score_
[0]);
3312 if (!files_marked_for_compaction_
.empty()) {
3313 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
3314 " (%" ROCKSDB_PRIszt
" files need compaction)",
3315 files_marked_for_compaction_
.size());
3318 return scratch
->buffer
;
3321 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage
* scratch
,
3323 int len
= snprintf(scratch
->buffer
, sizeof(scratch
->buffer
), "files_size[");
3324 for (const auto& f
: files_
[level
]) {
3325 int sz
= sizeof(scratch
->buffer
) - len
;
3327 AppendHumanBytes(f
->fd
.GetFileSize(), sztxt
, sizeof(sztxt
));
3328 int ret
= snprintf(scratch
->buffer
+ len
, sz
,
3329 "#%" PRIu64
"(seq=%" PRIu64
",sz=%s,%d) ",
3330 f
->fd
.GetNumber(), f
->fd
.smallest_seqno
, sztxt
,
3331 static_cast<int>(f
->being_compacted
));
3332 if (ret
< 0 || ret
>= sz
)
3336 // overwrite the last space (only if files_[level].size() is non-zero)
3337 if (files_
[level
].size() && len
> 0) {
3340 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, "]");
3341 return scratch
->buffer
;
3344 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3345 uint64_t result
= 0;
3346 std::vector
<FileMetaData
*> overlaps
;
3347 for (int level
= 1; level
< num_levels() - 1; level
++) {
3348 for (const auto& f
: files_
[level
]) {
3349 GetOverlappingInputs(level
+ 1, &f
->smallest
, &f
->largest
, &overlaps
);
3350 const uint64_t sum
= TotalFileSize(overlaps
);
3359 uint64_t VersionStorageInfo::MaxBytesForLevel(int level
) const {
3360 // Note: the result for level zero is not really used since we set
3361 // the level-0 compaction threshold based on number of files.
3363 assert(level
< static_cast<int>(level_max_bytes_
.size()));
3364 return level_max_bytes_
[level
];
3367 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions
& ioptions
,
3368 const MutableCFOptions
& options
) {
3369 // Special logic to set number of sorted runs.
3370 // It is to match the previous behavior when all files are in L0.
3371 int num_l0_count
= static_cast<int>(files_
[0].size());
3372 if (compaction_style_
== kCompactionStyleUniversal
) {
3373 // For universal compaction, we use level0 score to indicate
3374 // compaction score for the whole DB. Adding other levels as if
3375 // they are L0 files.
3376 for (int i
= 1; i
< num_levels(); i
++) {
3377 if (!files_
[i
].empty()) {
3382 set_l0_delay_trigger_count(num_l0_count
);
3384 level_max_bytes_
.resize(ioptions
.num_levels
);
3385 if (!ioptions
.level_compaction_dynamic_level_bytes
) {
3386 base_level_
= (ioptions
.compaction_style
== kCompactionStyleLevel
) ? 1 : -1;
3388 // Calculate for static bytes base case
3389 for (int i
= 0; i
< ioptions
.num_levels
; ++i
) {
3390 if (i
== 0 && ioptions
.compaction_style
== kCompactionStyleUniversal
) {
3391 level_max_bytes_
[i
] = options
.max_bytes_for_level_base
;
3393 level_max_bytes_
[i
] = MultiplyCheckOverflow(
3394 MultiplyCheckOverflow(level_max_bytes_
[i
- 1],
3395 options
.max_bytes_for_level_multiplier
),
3396 options
.MaxBytesMultiplerAdditional(i
- 1));
3398 level_max_bytes_
[i
] = options
.max_bytes_for_level_base
;
3402 uint64_t max_level_size
= 0;
3404 int first_non_empty_level
= -1;
3405 // Find size of non-L0 level of most data.
3406 // Cannot use the size of the last level because it can be empty or less
3407 // than previous levels after compaction.
3408 for (int i
= 1; i
< num_levels_
; i
++) {
3409 uint64_t total_size
= 0;
3410 for (const auto& f
: files_
[i
]) {
3411 total_size
+= f
->fd
.GetFileSize();
3413 if (total_size
> 0 && first_non_empty_level
== -1) {
3414 first_non_empty_level
= i
;
3416 if (total_size
> max_level_size
) {
3417 max_level_size
= total_size
;
3421 // Prefill every level's max bytes to disallow compaction from there.
3422 for (int i
= 0; i
< num_levels_
; i
++) {
3423 level_max_bytes_
[i
] = std::numeric_limits
<uint64_t>::max();
3426 if (max_level_size
== 0) {
3427 // No data for L1 and up. L0 compacts to last level directly.
3428 // No compaction from L1+ needs to be scheduled.
3429 base_level_
= num_levels_
- 1;
3431 uint64_t l0_size
= 0;
3432 for (const auto& f
: files_
[0]) {
3433 l0_size
+= f
->fd
.GetFileSize();
3436 uint64_t base_bytes_max
=
3437 std::max(options
.max_bytes_for_level_base
, l0_size
);
3438 uint64_t base_bytes_min
= static_cast<uint64_t>(
3439 base_bytes_max
/ options
.max_bytes_for_level_multiplier
);
3441 // Try whether we can make last level's target size to be max_level_size
3442 uint64_t cur_level_size
= max_level_size
;
3443 for (int i
= num_levels_
- 2; i
>= first_non_empty_level
; i
--) {
3444 // Round up after dividing
3445 cur_level_size
= static_cast<uint64_t>(
3446 cur_level_size
/ options
.max_bytes_for_level_multiplier
);
3449 // Calculate base level and its size.
3450 uint64_t base_level_size
;
3451 if (cur_level_size
<= base_bytes_min
) {
3452 // Case 1. If we make target size of last level to be max_level_size,
3453 // target size of the first non-empty level would be smaller than
3454 // base_bytes_min. We set it be base_bytes_min.
3455 base_level_size
= base_bytes_min
+ 1U;
3456 base_level_
= first_non_empty_level
;
3457 ROCKS_LOG_INFO(ioptions
.info_log
,
3458 "More existing levels in DB than needed. "
3459 "max_bytes_for_level_multiplier may not be guaranteed.");
3461 // Find base level (where L0 data is compacted to).
3462 base_level_
= first_non_empty_level
;
3463 while (base_level_
> 1 && cur_level_size
> base_bytes_max
) {
3465 cur_level_size
= static_cast<uint64_t>(
3466 cur_level_size
/ options
.max_bytes_for_level_multiplier
);
3468 if (cur_level_size
> base_bytes_max
) {
3469 // Even L1 will be too large
3470 assert(base_level_
== 1);
3471 base_level_size
= base_bytes_max
;
3473 base_level_size
= cur_level_size
;
3477 level_multiplier_
= options
.max_bytes_for_level_multiplier
;
3478 assert(base_level_size
> 0);
3479 if (l0_size
> base_level_size
&&
3480 (l0_size
> options
.max_bytes_for_level_base
||
3481 static_cast<int>(files_
[0].size() / 2) >=
3482 options
.level0_file_num_compaction_trigger
)) {
3483 // We adjust the base level according to actual L0 size, and adjust
3484 // the level multiplier accordingly, when:
3485 // 1. the L0 size is larger than level size base, or
3486 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
3487 // We don't do this otherwise to keep the LSM-tree structure stable
3488 // unless the L0 compation is backlogged.
3489 base_level_size
= l0_size
;
3490 if (base_level_
== num_levels_
- 1) {
3491 level_multiplier_
= 1.0;
3493 level_multiplier_
= std::pow(
3494 static_cast<double>(max_level_size
) /
3495 static_cast<double>(base_level_size
),
3496 1.0 / static_cast<double>(num_levels_
- base_level_
- 1));
3500 uint64_t level_size
= base_level_size
;
3501 for (int i
= base_level_
; i
< num_levels_
; i
++) {
3502 if (i
> base_level_
) {
3503 level_size
= MultiplyCheckOverflow(level_size
, level_multiplier_
);
3505 // Don't set any level below base_bytes_max. Otherwise, the LSM can
3506 // assume an hourglass shape where L1+ sizes are smaller than L0. This
3507 // causes compaction scoring, which depends on level sizes, to favor L1+
3508 // at the expense of L0, which may fill up and stall.
3509 level_max_bytes_
[i
] = std::max(level_size
, base_bytes_max
);
3515 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3516 // Estimate the live data size by adding up the size of a maximal set of
3517 // sst files with no range overlap in same or higher level. The less
3518 // compacted, the more optimistic (smaller) this estimate is. Also,
3519 // for multiple sorted runs within a level, file order will matter.
3522 auto ikey_lt
= [this](InternalKey
* x
, InternalKey
* y
) {
3523 return internal_comparator_
->Compare(*x
, *y
) < 0;
3525 // (Ordered) map of largest keys in files being included in size estimate
3526 std::map
<InternalKey
*, FileMetaData
*, decltype(ikey_lt
)> ranges(ikey_lt
);
3528 for (int l
= num_levels_
- 1; l
>= 0; l
--) {
3529 bool found_end
= false;
3530 for (auto file
: files_
[l
]) {
3531 // Find the first file already included with largest key is larger than
3532 // the smallest key of `file`. If that file does not overlap with the
3533 // current file, none of the files in the map does. If there is
3534 // no potential overlap, we can safely insert the rest of this level
3535 // (if the level is not 0) into the map without checking again because
3536 // the elements in the level are sorted and non-overlapping.
3537 auto lb
= (found_end
&& l
!= 0) ?
3538 ranges
.end() : ranges
.lower_bound(&file
->smallest
);
3539 found_end
= (lb
== ranges
.end());
3540 if (found_end
|| internal_comparator_
->Compare(
3541 file
->largest
, (*lb
).second
->smallest
) < 0) {
3542 ranges
.emplace_hint(lb
, &file
->largest
, file
);
3543 size
+= file
->fd
.file_size
;
3550 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3551 const Slice
& smallest_user_key
, const Slice
& largest_user_key
,
3552 int last_level
, int last_l0_idx
) {
3553 assert((last_l0_idx
!= -1) == (last_level
== 0));
3554 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3555 // bottommost only if it's the oldest L0 file and there are no files on older
3556 // levels. It'd be better to consider it bottommost if there's no overlap in
3557 // older levels/files.
3558 if (last_level
== 0 &&
3559 last_l0_idx
!= static_cast<int>(LevelFiles(0).size() - 1)) {
3563 // Checks whether there are files living beyond the `last_level`. If lower
3564 // levels have files, it checks for overlap between [`smallest_key`,
3565 // `largest_key`] and those files. Bottomlevel optimizations can be made if
3566 // there are no files in lower levels or if there is no overlap with the files
3567 // in the lower levels.
3568 for (int level
= last_level
+ 1; level
< num_levels(); level
++) {
3569 // The range is not in the bottommost level if there are files in lower
3570 // levels when the `last_level` is 0 or if there are files in lower levels
3571 // which overlap with [`smallest_key`, `largest_key`].
3572 if (files_
[level
].size() > 0 &&
3574 OverlapInLevel(level
, &smallest_user_key
, &largest_user_key
))) {
3581 void Version::AddLiveFiles(std::vector
<uint64_t>* live_table_files
,
3582 std::vector
<uint64_t>* live_blob_files
) const {
3583 assert(live_table_files
);
3584 assert(live_blob_files
);
3586 for (int level
= 0; level
< storage_info_
.num_levels(); ++level
) {
3587 const auto& level_files
= storage_info_
.LevelFiles(level
);
3588 for (const auto& meta
: level_files
) {
3591 live_table_files
->emplace_back(meta
->fd
.GetNumber());
3595 const auto& blob_files
= storage_info_
.GetBlobFiles();
3596 for (const auto& pair
: blob_files
) {
3597 const auto& meta
= pair
.second
;
3600 live_blob_files
->emplace_back(meta
->GetBlobFileNumber());
3604 std::string
Version::DebugString(bool hex
, bool print_stats
) const {
3606 for (int level
= 0; level
< storage_info_
.num_levels_
; level
++) {
3609 // 17:123[1 .. 124]['a' .. 'd']
3610 // 20:43[124 .. 128]['e' .. 'g']
3612 // if print_stats=true:
3613 // 17:123[1 .. 124]['a' .. 'd'](4096)
3614 r
.append("--- level ");
3615 AppendNumberTo(&r
, level
);
3616 r
.append(" --- version# ");
3617 AppendNumberTo(&r
, version_number_
);
3619 const std::vector
<FileMetaData
*>& files
= storage_info_
.files_
[level
];
3620 for (size_t i
= 0; i
< files
.size(); i
++) {
3622 AppendNumberTo(&r
, files
[i
]->fd
.GetNumber());
3624 AppendNumberTo(&r
, files
[i
]->fd
.GetFileSize());
3626 AppendNumberTo(&r
, files
[i
]->fd
.smallest_seqno
);
3628 AppendNumberTo(&r
, files
[i
]->fd
.largest_seqno
);
3631 r
.append(files
[i
]->smallest
.DebugString(hex
));
3633 r
.append(files
[i
]->largest
.DebugString(hex
));
3635 if (files
[i
]->oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
3636 r
.append(" blob_file:");
3637 AppendNumberTo(&r
, files
[i
]->oldest_blob_file_number
);
3642 files
[i
]->stats
.num_reads_sampled
.load(std::memory_order_relaxed
)));
3649 const auto& blob_files
= storage_info_
.GetBlobFiles();
3650 if (!blob_files
.empty()) {
3651 r
.append("--- blob files --- version# ");
3652 AppendNumberTo(&r
, version_number_
);
3654 for (const auto& pair
: blob_files
) {
3655 const auto& blob_file_meta
= pair
.second
;
3656 assert(blob_file_meta
);
3658 r
.append(blob_file_meta
->DebugString());
3666 // this is used to batch writes to the manifest file
3667 struct VersionSet::ManifestWriter
{
3670 InstrumentedCondVar cv
;
3671 ColumnFamilyData
* cfd
;
3672 const MutableCFOptions mutable_cf_options
;
3673 const autovector
<VersionEdit
*>& edit_list
;
3674 const std::function
<void(const Status
&)> manifest_write_callback
;
3676 explicit ManifestWriter(
3677 InstrumentedMutex
* mu
, ColumnFamilyData
* _cfd
,
3678 const MutableCFOptions
& cf_options
, const autovector
<VersionEdit
*>& e
,
3679 const std::function
<void(const Status
&)>& manifest_wcb
)
3683 mutable_cf_options(cf_options
),
3685 manifest_write_callback(manifest_wcb
) {}
3686 ~ManifestWriter() { status
.PermitUncheckedError(); }
3688 bool IsAllWalEdits() const {
3689 bool all_wal_edits
= true;
3690 for (const auto& e
: edit_list
) {
3691 if (!e
->IsWalManipulation()) {
3692 all_wal_edits
= false;
3696 return all_wal_edits
;
3700 Status
AtomicGroupReadBuffer::AddEdit(VersionEdit
* edit
) {
3702 if (edit
->is_in_atomic_group_
) {
3703 TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
3704 if (replay_buffer_
.empty()) {
3705 replay_buffer_
.resize(edit
->remaining_entries_
+ 1);
3706 TEST_SYNC_POINT_CALLBACK(
3707 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit
);
3709 read_edits_in_atomic_group_
++;
3710 if (read_edits_in_atomic_group_
+ edit
->remaining_entries_
!=
3711 static_cast<uint32_t>(replay_buffer_
.size())) {
3712 TEST_SYNC_POINT_CALLBACK(
3713 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit
);
3714 return Status::Corruption("corrupted atomic group");
3716 replay_buffer_
[read_edits_in_atomic_group_
- 1] = *edit
;
3717 if (read_edits_in_atomic_group_
== replay_buffer_
.size()) {
3718 TEST_SYNC_POINT_CALLBACK(
3719 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit
);
3720 return Status::OK();
3722 return Status::OK();
3726 if (!replay_buffer().empty()) {
3727 TEST_SYNC_POINT_CALLBACK(
3728 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit
);
3729 return Status::Corruption("corrupted atomic group");
3731 return Status::OK();
3734 bool AtomicGroupReadBuffer::IsFull() const {
3735 return read_edits_in_atomic_group_
== replay_buffer_
.size();
3738 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_
.empty(); }
3740 void AtomicGroupReadBuffer::Clear() {
3741 read_edits_in_atomic_group_
= 0;
3742 replay_buffer_
.clear();
3745 VersionSet::VersionSet(const std::string
& dbname
,
3746 const ImmutableDBOptions
* _db_options
,
3747 const FileOptions
& storage_options
, Cache
* table_cache
,
3748 WriteBufferManager
* write_buffer_manager
,
3749 WriteController
* write_controller
,
3750 BlockCacheTracer
* const block_cache_tracer
,
3751 const std::shared_ptr
<IOTracer
>& io_tracer
)
3752 : column_family_set_(
3753 new ColumnFamilySet(dbname
, _db_options
, storage_options
, table_cache
,
3754 write_buffer_manager
, write_controller
,
3755 block_cache_tracer
, io_tracer
)),
3756 table_cache_(table_cache
),
3757 env_(_db_options
->env
),
3758 fs_(_db_options
->fs
, io_tracer
),
3760 db_options_(_db_options
),
3761 next_file_number_(2),
3762 manifest_file_number_(0), // Filled by Recover()
3763 options_file_number_(0),
3764 pending_manifest_file_number_(0),
3766 last_allocated_sequence_(0),
3767 last_published_sequence_(0),
3768 prev_log_number_(0),
3769 current_version_number_(0),
3770 manifest_file_size_(0),
3771 file_options_(storage_options
),
3772 block_cache_tracer_(block_cache_tracer
),
3773 io_tracer_(io_tracer
) {}
3775 VersionSet::~VersionSet() {
3776 // we need to delete column_family_set_ because its destructor depends on
3778 column_family_set_
.reset();
3779 for (auto& file
: obsolete_files_
) {
3780 if (file
.metadata
->table_reader_handle
) {
3781 table_cache_
->Release(file
.metadata
->table_reader_handle
);
3782 TableCache::Evict(table_cache_
, file
.metadata
->fd
.GetNumber());
3784 file
.DeleteMetadata();
3786 obsolete_files_
.clear();
3787 io_status_
.PermitUncheckedError();
3790 void VersionSet::Reset() {
3791 if (column_family_set_
) {
3792 WriteBufferManager
* wbm
= column_family_set_
->write_buffer_manager();
3793 WriteController
* wc
= column_family_set_
->write_controller();
3794 column_family_set_
.reset(
3795 new ColumnFamilySet(dbname_
, db_options_
, file_options_
, table_cache_
,
3796 wbm
, wc
, block_cache_tracer_
, io_tracer_
));
3799 next_file_number_
.store(2);
3800 min_log_number_to_keep_2pc_
.store(0);
3801 manifest_file_number_
= 0;
3802 options_file_number_
= 0;
3803 pending_manifest_file_number_
= 0;
3804 last_sequence_
.store(0);
3805 last_allocated_sequence_
.store(0);
3806 last_published_sequence_
.store(0);
3807 prev_log_number_
= 0;
3808 descriptor_log_
.reset();
3809 current_version_number_
= 0;
3810 manifest_writers_
.clear();
3811 manifest_file_size_
= 0;
3812 obsolete_files_
.clear();
3813 obsolete_manifests_
.clear();
// Install version v as the current version of the given column family:
// compute its compaction score, finalize its storage info, make it current,
// and splice it into the CF's circular doubly-linked version list anchored at
// dummy_versions(). NOTE(review): presumably called with the DB mutex held,
// and several lines (second parameter, ref-count transfer) are elided from
// this listing — confirm against the full source.
3817 void VersionSet::AppendVersion(ColumnFamilyData
* column_family_data
,
3819 // compute new compaction score
3820 v
->storage_info()->ComputeCompactionScore(
3821 *column_family_data
->ioptions(),
3822 *column_family_data
->GetLatestMutableCFOptions());
// Freeze the storage info; it must not change once the version is visible.
3825 v
->storage_info_
.SetFinalized();
// The new version must not yet be referenced and must differ from current.
3828 assert(v
->refs_
== 0);
3829 Version
* current
= column_family_data
->current();
3830 assert(v
!= current
);
3831 if (current
!= nullptr) {
3832 assert(current
->refs_
> 0);
3835 column_family_data
->SetCurrent(v
);
3838 // Append to linked list
3839 v
->prev_
= column_family_data
->dummy_versions()->prev_
;
3840 v
->next_
= column_family_data
->dummy_versions();
3841 v
->prev_
->next_
= v
;
3842 v
->next_
->prev_
= v
;
// Commit a batch of queued version edits to the MANIFEST and install the
// resulting Versions. Only the head of manifest_writers_ runs this (asserted
// below). CF add/drop edits are committed alone; all other compatible
// writers at the queue head are group-committed. On failure the new manifest
// (if any) is deleted and descriptor_log_ is reset to force a fresh MANIFEST
// on the next commit. NOTE(review): this listing contains elided lines;
// comments describe only the visible code.
3845 Status
VersionSet::ProcessManifestWrites(
3846 std::deque
<ManifestWriter
>& writers
, InstrumentedMutex
* mu
,
3847 FSDirectory
* db_directory
, bool new_descriptor_log
,
3848 const ColumnFamilyOptions
* new_cf_options
) {
3850 assert(!writers
.empty());
3851 ManifestWriter
& first_writer
= writers
.front();
3852 ManifestWriter
* last_writer
= &first_writer
;
// This thread must be the head of the manifest writer queue.
3854 assert(!manifest_writers_
.empty());
3855 assert(manifest_writers_
.front() == &first_writer
);
3857 autovector
<VersionEdit
*> batch_edits
;
3858 autovector
<Version
*> versions
;
3859 autovector
<const MutableCFOptions
*> mutable_cf_options_ptrs
;
3860 std::vector
<std::unique_ptr
<BaseReferencedVersionBuilder
>> builder_guards
;
3862 if (first_writer
.edit_list
.front()->IsColumnFamilyManipulation()) {
3863 // No group commits for column family add or drop
3864 LogAndApplyCFHelper(first_writer
.edit_list
.front());
3865 batch_edits
.push_back(first_writer
.edit_list
.front());
// Group-commit path: fold as many queued compatible writers as possible
// into one batch, tracking atomic-group boundaries via group_start.
3867 auto it
= manifest_writers_
.cbegin();
3868 size_t group_start
= std::numeric_limits
<size_t>::max();
3869 while (it
!= manifest_writers_
.cend()) {
3870 if ((*it
)->edit_list
.front()->IsColumnFamilyManipulation()) {
3871 // no group commits for column family add or drop
3874 last_writer
= *(it
++);
3875 assert(last_writer
!= nullptr);
3876 assert(last_writer
->cfd
!= nullptr);
3877 if (last_writer
->cfd
->IsDropped()) {
3878 // If we detect a dropped CF at this point, and the corresponding
3879 // version edits belong to an atomic group, then we need to find out
3880 // the preceding version edits in the same atomic group, and update
3881 // their `remaining_entries_` member variable because we are NOT going
3882 // to write the version edits' of dropped CF to the MANIFEST. If we
3883 // don't update, then Recover can report corrupted atomic group because
3884 // the `remaining_entries_` do not match.
3885 if (!batch_edits
.empty()) {
3886 if (batch_edits
.back()->is_in_atomic_group_
&&
3887 batch_edits
.back()->remaining_entries_
> 0) {
3888 assert(group_start
< batch_edits
.size());
3889 const auto& edit_list
= last_writer
->edit_list
;
3891 while (k
< edit_list
.size()) {
3892 if (!edit_list
[k
]->is_in_atomic_group_
) {
3894 } else if (edit_list
[k
]->remaining_entries_
== 0) {
// Shrink the remaining-entry counts of the already-batched edits of the
// same atomic group to account for the skipped (dropped-CF) edits.
3900 for (auto i
= group_start
; i
< batch_edits
.size(); ++i
) {
3901 assert(static_cast<uint32_t>(k
) <=
3902 batch_edits
.back()->remaining_entries_
);
3903 batch_edits
[i
]->remaining_entries_
-= static_cast<uint32_t>(k
);
3909 // We do a linear search on versions because versions is small.
3910 // TODO(yanqin) maybe consider unordered_map
3911 Version
* version
= nullptr;
3912 VersionBuilder
* builder
= nullptr;
3913 for (int i
= 0; i
!= static_cast<int>(versions
.size()); ++i
) {
3914 uint32_t cf_id
= last_writer
->cfd
->GetID();
3915 if (versions
[i
]->cfd()->GetID() == cf_id
) {
3916 version
= versions
[i
];
3917 assert(!builder_guards
.empty() &&
3918 builder_guards
.size() == versions
.size());
3919 builder
= builder_guards
[i
]->version_builder();
3920 TEST_SYNC_POINT_CALLBACK(
3921 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id
);
3925 if (version
== nullptr) {
3926 // WAL manipulations do not need to be applied to versions.
3927 if (!last_writer
->IsAllWalEdits()) {
3928 version
= new Version(last_writer
->cfd
, this, file_options_
,
3929 last_writer
->mutable_cf_options
, io_tracer_
,
3930 current_version_number_
++);
3931 versions
.push_back(version
);
3932 mutable_cf_options_ptrs
.push_back(&last_writer
->mutable_cf_options
);
3933 builder_guards
.emplace_back(
3934 new BaseReferencedVersionBuilder(last_writer
->cfd
));
3935 builder
= builder_guards
.back()->version_builder();
3937 assert(last_writer
->IsAllWalEdits() || builder
);
3938 assert(last_writer
->IsAllWalEdits() || version
);
3939 TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
// Apply every edit of this writer to the builder and record it in the batch,
// maintaining group_start at the first edit of the current atomic group.
3942 for (const auto& e
: last_writer
->edit_list
) {
3943 if (e
->is_in_atomic_group_
) {
3944 if (batch_edits
.empty() || !batch_edits
.back()->is_in_atomic_group_
||
3945 (batch_edits
.back()->is_in_atomic_group_
&&
3946 batch_edits
.back()->remaining_entries_
== 0)) {
3947 group_start
= batch_edits
.size();
3949 } else if (group_start
!= std::numeric_limits
<size_t>::max()) {
3950 group_start
= std::numeric_limits
<size_t>::max();
3952 Status s
= LogAndApplyHelper(last_writer
->cfd
, builder
, e
, mu
);
3954 // free up the allocated memory
3955 for (auto v
: versions
) {
3960 batch_edits
.push_back(e
);
// Save each builder's accumulated state into its new Version.
3963 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
3964 assert(!builder_guards
.empty() &&
3965 builder_guards
.size() == versions
.size());
3966 auto* builder
= builder_guards
[i
]->version_builder();
3967 Status s
= builder
->SaveTo(versions
[i
]->storage_info());
3969 // free up the allocated memory
3970 for (auto v
: versions
) {
3979 // Verify that version edits of atomic groups have correct
3980 // remaining_entries_.
3982 while (k
< batch_edits
.size()) {
3983 while (k
< batch_edits
.size() && !batch_edits
[k
]->is_in_atomic_group_
) {
3986 if (k
== batch_edits
.size()) {
3990 while (i
< batch_edits
.size()) {
3991 if (!batch_edits
[i
]->is_in_atomic_group_
) {
3994 assert(i
- k
+ batch_edits
[i
]->remaining_entries_
==
3995 batch_edits
[k
]->remaining_entries_
);
3996 if (batch_edits
[i
]->remaining_entries_
== 0) {
4002 assert(batch_edits
[i
- 1]->is_in_atomic_group_
);
4003 assert(0 == batch_edits
[i
- 1]->remaining_entries_
);
4004 std::vector
<VersionEdit
*> tmp
;
4005 for (size_t j
= k
; j
!= i
; ++j
) {
4006 tmp
.emplace_back(batch_edits
[j
]);
4008 TEST_SYNC_POINT_CALLBACK(
4009 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp
);
// Decide whether the current MANIFEST can be appended to, or whether it has
// grown past max_manifest_file_size and a new one must be created.
4014 assert(pending_manifest_file_number_
== 0);
4015 if (!descriptor_log_
||
4016 manifest_file_size_
> db_options_
->max_manifest_file_size
) {
4017 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
4018 new_descriptor_log
= true;
4020 pending_manifest_file_number_
= manifest_file_number_
;
4023 // Local cached copy of state variable(s). WriteCurrentStateToManifest()
4024 // reads its content after releasing db mutex to avoid race with
4025 // SwitchMemtable().
4026 std::unordered_map
<uint32_t, MutableCFState
> curr_state
;
4027 VersionEdit wal_additions
;
4028 if (new_descriptor_log
) {
4029 pending_manifest_file_number_
= NewFileNumber();
4030 batch_edits
.back()->SetNextFile(next_file_number_
.load());
4032 // if we are writing out new snapshot make sure to persist max column
4034 if (column_family_set_
->GetMaxColumnFamily() > 0) {
4035 first_writer
.edit_list
.front()->SetMaxColumnFamily(
4036 column_family_set_
->GetMaxColumnFamily());
// Snapshot each CF's log number and the live WALs for the new MANIFEST.
4038 for (const auto* cfd
: *column_family_set_
) {
4039 assert(curr_state
.find(cfd
->GetID()) == curr_state
.end());
4040 curr_state
[cfd
->GetID()] = {cfd
->GetLogNumber()};
4043 for (const auto& wal
: wals_
.GetWals()) {
4044 wal_additions
.AddWal(wal
.first
, wal
.second
);
4048 uint64_t new_manifest_file_size
= 0;
4052 FileOptions opt_file_opts
= fs_
->OptimizeForManifestWrite(file_options_
);
// Write phase. NOTE(review): in the full source the DB mutex is released
// around this region — the unlock/lock lines are elided here; confirm.
4055 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
4056 if (!first_writer
.edit_list
.front()->IsColumnFamilyManipulation()) {
4057 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
4058 assert(!builder_guards
.empty() &&
4059 builder_guards
.size() == versions
.size());
4060 assert(!mutable_cf_options_ptrs
.empty() &&
4061 builder_guards
.size() == versions
.size());
4062 ColumnFamilyData
* cfd
= versions
[i
]->cfd_
;
4063 s
= builder_guards
[i
]->version_builder()->LoadTableHandlers(
4064 cfd
->internal_stats(), 1 /* max_threads */,
4065 true /* prefetch_index_and_filter_in_cache */,
4066 false /* is_initial_load */,
4067 mutable_cf_options_ptrs
[i
]->prefix_extractor
.get(),
4068 MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs
[i
]));
4070 if (db_options_
->paranoid_checks
) {
4078 if (s
.ok() && new_descriptor_log
) {
4079 // This is fine because everything inside of this block is serialized --
4080 // only one thread can be here at the same time
4081 // create new manifest file
4082 ROCKS_LOG_INFO(db_options_
->info_log
, "Creating manifest %" PRIu64
"\n",
4083 pending_manifest_file_number_
);
4084 std::string descriptor_fname
=
4085 DescriptorFileName(dbname_
, pending_manifest_file_number_
);
4086 std::unique_ptr
<FSWritableFile
> descriptor_file
;
4087 io_s
= NewWritableFile(fs_
.get(), descriptor_fname
, &descriptor_file
,
4090 descriptor_file
->SetPreallocationBlockSize(
4091 db_options_
->manifest_preallocation_size
);
4093 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
4094 std::move(descriptor_file
), descriptor_fname
, opt_file_opts
, env_
,
4095 io_tracer_
, nullptr, db_options_
->listeners
));
4096 descriptor_log_
.reset(
4097 new log::Writer(std::move(file_writer
), 0, false));
4098 s
= WriteCurrentStateToManifest(curr_state
, wal_additions
,
4099 descriptor_log_
.get(), io_s
);
4106 if (!first_writer
.edit_list
.front()->IsColumnFamilyManipulation()) {
4107 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
4108 versions
[i
]->PrepareApply(*mutable_cf_options_ptrs
[i
], true);
4112 // Write new records to MANIFEST log
4116 for (auto& e
: batch_edits
) {
4118 if (!e
->EncodeTo(&record
)) {
4119 s
= Status::Corruption("Unable to encode VersionEdit:" +
4120 e
->DebugString(true));
4123 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
4124 rocksdb_kill_odds
* REDUCE_ODDS2
);
4126 if (batch_edits
.size() > 1 && batch_edits
.size() - 1 == idx
) {
4127 TEST_SYNC_POINT_CALLBACK(
4128 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
4131 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
4134 #endif /* !NDEBUG */
4135 io_s
= descriptor_log_
->AddRecord(record
);
4142 io_s
= SyncManifest(env_
, db_options_
, descriptor_log_
->file());
4143 TEST_SYNC_POINT_CALLBACK(
4144 "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s
);
4148 ROCKS_LOG_ERROR(db_options_
->info_log
, "MANIFEST write %s\n",
4149 s
.ToString().c_str());
4153 // If we just created a new descriptor file, install it by writing a
4154 // new CURRENT file that points to it.
4155 if (s
.ok() && new_descriptor_log
) {
4156 io_s
= SetCurrentFile(fs_
.get(), dbname_
, pending_manifest_file_number_
,
4161 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
4165 // find offset in manifest file where this version is stored.
4166 new_manifest_file_size
= descriptor_log_
->file()->GetFileSize();
4169 if (first_writer
.edit_list
.front()->is_column_family_drop_
) {
4170 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
4171 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
4172 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
4175 LogFlush(db_options_
->info_log
);
4176 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
4181 // Apply WAL edits, DB mutex must be held.
4182 for (auto& e
: batch_edits
) {
4183 if (e
->IsWalAddition()) {
4184 s
= wals_
.AddWals(e
->GetWalAdditions());
4185 } else if (e
->IsWalDeletion()) {
4186 s
= wals_
.DeleteWalsBefore(e
->GetWalDeletion().GetLogNumber());
4195 if (io_status_
.ok()) {
4198 } else if (!io_status_
.ok()) {
4202 // Append the old manifest file to the obsolete_manifest_ list to be deleted
4203 // by PurgeObsoleteFiles later.
4204 if (s
.ok() && new_descriptor_log
) {
4205 obsolete_manifests_
.emplace_back(
4206 DescriptorFileName("", manifest_file_number_
));
// Success path: install the new versions and update in-memory CF state.
4209 // Install the new versions
4211 if (first_writer
.edit_list
.front()->is_column_family_add_
) {
4212 assert(batch_edits
.size() == 1);
4213 assert(new_cf_options
!= nullptr);
4214 CreateColumnFamily(*new_cf_options
, first_writer
.edit_list
.front());
4215 } else if (first_writer
.edit_list
.front()->is_column_family_drop_
) {
4216 assert(batch_edits
.size() == 1);
4217 first_writer
.cfd
->SetDropped();
4218 first_writer
.cfd
->UnrefAndTryDelete();
4220 // Each version in versions corresponds to a column family.
4221 // For each column family, update its log number indicating that logs
4222 // with number smaller than this should be ignored.
4223 for (const auto version
: versions
) {
4224 uint64_t max_log_number_in_batch
= 0;
4225 uint32_t cf_id
= version
->cfd_
->GetID();
4226 for (const auto& e
: batch_edits
) {
4227 if (e
->has_log_number_
&& e
->column_family_
== cf_id
) {
4228 max_log_number_in_batch
=
4229 std::max(max_log_number_in_batch
, e
->log_number_
);
4232 if (max_log_number_in_batch
!= 0) {
4233 assert(version
->cfd_
->GetLogNumber() <= max_log_number_in_batch
);
4234 version
->cfd_
->SetLogNumber(max_log_number_in_batch
);
4238 uint64_t last_min_log_number_to_keep
= 0;
4239 for (auto& e
: batch_edits
) {
4240 if (e
->has_min_log_number_to_keep_
) {
4241 last_min_log_number_to_keep
=
4242 std::max(last_min_log_number_to_keep
, e
->min_log_number_to_keep_
);
4246 if (last_min_log_number_to_keep
!= 0) {
4247 // Should only be set in 2PC mode.
4248 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep
);
4251 for (int i
= 0; i
< static_cast<int>(versions
.size()); ++i
) {
4252 ColumnFamilyData
* cfd
= versions
[i
]->cfd_
;
4253 AppendVersion(cfd
, versions
[i
]);
4256 manifest_file_number_
= pending_manifest_file_number_
;
4257 manifest_file_size_
= new_manifest_file_size
;
4258 prev_log_number_
= first_writer
.edit_list
.front()->prev_log_number_
;
// Failure path: log the uncommitted edits and roll back the new MANIFEST.
4260 std::string version_edits
;
4261 for (auto& e
: batch_edits
) {
4262 version_edits
+= ("\n" + e
->DebugString(true));
4264 ROCKS_LOG_ERROR(db_options_
->info_log
,
4265 "Error in committing version edit to MANIFEST: %s",
4266 version_edits
.c_str());
4267 for (auto v
: versions
) {
4270 // If manifest append failed for whatever reason, the file could be
4271 // corrupted. So we need to force the next version update to start a
4272 // new manifest file.
4273 descriptor_log_
.reset();
4274 if (new_descriptor_log
) {
4275 ROCKS_LOG_INFO(db_options_
->info_log
,
4276 "Deleting manifest %" PRIu64
" current manifest %" PRIu64
4278 pending_manifest_file_number_
, manifest_file_number_
);
4279 Status manifest_del_status
= env_
->DeleteFile(
4280 DescriptorFileName(dbname_
, pending_manifest_file_number_
));
4281 if (!manifest_del_status
.ok()) {
4282 ROCKS_LOG_WARN(db_options_
->info_log
,
4283 "Failed to delete manifest %" PRIu64
": %s",
4284 pending_manifest_file_number_
,
4285 manifest_del_status
.ToString().c_str());
4290 pending_manifest_file_number_
= 0;
4292 // wake up all the waiting writers
4294 ManifestWriter
* ready
= manifest_writers_
.front();
4295 manifest_writers_
.pop_front();
4296 bool need_signal
= true;
4297 for (const auto& w
: writers
) {
4299 need_signal
= false;
4305 if (ready
->manifest_write_callback
) {
4306 (ready
->manifest_write_callback
)(s
);
4311 if (ready
== last_writer
) {
// Signal the next queued manifest writer, if any.
4315 if (!manifest_writers_
.empty()) {
4316 manifest_writers_
.front()->cv
.Signal();
4321 // 'datas' is gramatically incorrect. We still use this notation to indicate
4322 // that this variable represents a collection of column_family_data.
// Queue the (cfd, mutable options, edit list) tuples as ManifestWriters on
// manifest_writers_; either a concurrent leader commits them on our behalf
// (first_writer.done becomes true while we wait) or this thread becomes the
// queue head and performs the commit via ProcessManifestWrites(). Writers
// whose CFs have all been dropped are dequeued with ColumnFamilyDropped.
4323 Status
VersionSet::LogAndApply(
4324 const autovector
<ColumnFamilyData
*>& column_family_datas
,
4325 const autovector
<const MutableCFOptions
*>& mutable_cf_options_list
,
4326 const autovector
<autovector
<VersionEdit
*>>& edit_lists
,
4327 InstrumentedMutex
* mu
, FSDirectory
* db_directory
, bool new_descriptor_log
,
4328 const ColumnFamilyOptions
* new_cf_options
,
4329 const std::vector
<std::function
<void(const Status
&)>>& manifest_wcbs
) {
// Count total edits; nothing to do for zero, and CF manipulation edits may
// not be grouped with others (debug-checked below).
4332 for (const auto& elist
: edit_lists
) {
4333 num_edits
+= static_cast<int>(elist
.size());
4335 if (num_edits
== 0) {
4336 return Status::OK();
4337 } else if (num_edits
> 1) {
4339 for (const auto& edit_list
: edit_lists
) {
4340 for (const auto& edit
: edit_list
) {
4341 assert(!edit
->IsColumnFamilyManipulation());
4344 #endif /* ! NDEBUG */
4347 int num_cfds
= static_cast<int>(column_family_datas
.size());
4348 if (num_cfds
== 1 && column_family_datas
[0] == nullptr) {
4349 assert(edit_lists
.size() == 1 && edit_lists
[0].size() == 1);
4350 assert(edit_lists
[0][0]->is_column_family_add_
);
4351 assert(new_cf_options
!= nullptr);
4353 std::deque
<ManifestWriter
> writers
;
4355 assert(static_cast<size_t>(num_cfds
) == mutable_cf_options_list
.size());
4356 assert(static_cast<size_t>(num_cfds
) == edit_lists
.size());
// Enqueue one ManifestWriter per column family onto the shared queue.
4358 for (int i
= 0; i
< num_cfds
; ++i
) {
4360 manifest_wcbs
.empty() ? [](const Status
&) {} : manifest_wcbs
[i
];
4361 writers
.emplace_back(mu
, column_family_datas
[i
],
4362 *mutable_cf_options_list
[i
], edit_lists
[i
], wcb
);
4363 manifest_writers_
.push_back(&writers
[i
]);
4365 assert(!writers
.empty());
4366 ManifestWriter
& first_writer
= writers
.front();
4367 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
// Wait until either a leader finished our writes or we reached the head.
4369 while (!first_writer
.done
&& &first_writer
!= manifest_writers_
.front()) {
4370 first_writer
.cv
.Wait();
4372 if (first_writer
.done
) {
4373 // All non-CF-manipulation operations can be grouped together and committed
4374 // to MANIFEST. They should all have finished. The status code is stored in
4375 // the first manifest writer.
4377 for (const auto& writer
: writers
) {
4378 assert(writer
.done
);
4380 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu
);
4381 #endif /* !NDEBUG */
4382 return first_writer
.status
;
// We are the leader. Bail out early if every CF in the batch was dropped.
4385 int num_undropped_cfds
= 0;
4386 for (auto cfd
: column_family_datas
) {
4387 // if cfd == nullptr, it is a column family add.
4388 if (cfd
== nullptr || !cfd
->IsDropped()) {
4389 ++num_undropped_cfds
;
4392 if (0 == num_undropped_cfds
) {
4393 for (int i
= 0; i
!= num_cfds
; ++i
) {
4394 manifest_writers_
.pop_front();
4396 // Notify new head of manifest write queue.
4397 if (!manifest_writers_
.empty()) {
4398 manifest_writers_
.front()->cv
.Signal();
4400 return Status::ColumnFamilyDropped();
4403 return ProcessManifestWrites(writers
, mu
, db_directory
, new_descriptor_log
,
// Prepare a column-family add/drop edit for writing to the MANIFEST: stamp
// the next file number and an upper-bound last sequence, and preserve the
// max column-family id on drop so CF ids are never reused.
4407 void VersionSet::LogAndApplyCFHelper(VersionEdit
* edit
) {
4408 assert(edit
->IsColumnFamilyManipulation());
4409 edit
->SetNextFile(next_file_number_
.load());
4410 // The log might have data that is not visible to memtbale and hence have not
4411 // updated the last_sequence_ yet. It is also possible that the log has is
4412 // expecting some new data that is not written yet. Since LastSequence is an
4413 // upper bound on the sequence, it is ok to record
4414 // last_allocated_sequence_ as the last sequence.
4415 edit
->SetLastSequence(db_options_
->two_write_queues
? last_allocated_sequence_
4417 if (edit
->is_column_family_drop_
) {
4418 // if we drop column family, we have to make sure to save max column family,
4419 // so that we don't reuse existing ID
4420 edit
->SetMaxColumnFamily(column_family_set_
->GetMaxColumnFamily());
// Prepare a normal (non-CF-manipulation) edit for the MANIFEST — fill in
// prev/next file numbers and an upper-bound last sequence — and apply it to
// the column family's version builder. WAL-only edits have no builder and
// are not applied to any version, hence the nullable `builder`.
4424 Status
VersionSet::LogAndApplyHelper(ColumnFamilyData
* cfd
,
4425 VersionBuilder
* builder
, VersionEdit
* edit
,
4426 InstrumentedMutex
* mu
) {
4431 assert(!edit
->IsColumnFamilyManipulation());
// A log number, if present, must be within [cfd's log, next file number).
4433 if (edit
->has_log_number_
) {
4434 assert(edit
->log_number_
>= cfd
->GetLogNumber());
4435 assert(edit
->log_number_
< next_file_number_
.load());
4438 if (!edit
->has_prev_log_number_
) {
4439 edit
->SetPrevLogNumber(prev_log_number_
);
4441 edit
->SetNextFile(next_file_number_
.load());
4442 // The log might have data that is not visible to memtbale and hence have not
4443 // updated the last_sequence_ yet. It is also possible that the log has is
4444 // expecting some new data that is not written yet. Since LastSequence is an
4445 // upper bound on the sequence, it is ok to record
4446 // last_allocated_sequence_ as the last sequence.
4447 edit
->SetLastSequence(db_options_
->two_write_queues
? last_allocated_sequence_
4450 // The builder can be nullptr only if edit is WAL manipulation,
4451 // because WAL edits do not need to be applied to versions,
4452 // we return Status::OK() in this case.
4453 assert(builder
|| edit
->IsWalManipulation());
4454 return builder
? builder
->Apply(edit
) : Status::OK();
// Route a single replayed VersionEdit to the right handler during recovery:
// CF adds create a ColumnFamilyData and a builder, CF drops remove them,
// WAL additions/deletions update wals_, and every other edit is applied to
// the per-CF version builder. Finishes by folding the edit's bookkeeping
// fields into version_edit_params.
4457 Status
VersionSet::ApplyOneVersionEditToBuilder(
4459 const std::unordered_map
<std::string
, ColumnFamilyOptions
>& name_to_options
,
4460 std::unordered_map
<int, std::string
>& column_families_not_found
,
4461 std::unordered_map
<uint32_t, std::unique_ptr
<BaseReferencedVersionBuilder
>>&
4463 VersionEditParams
* version_edit_params
) {
4464 // Not found means that user didn't supply that column
4465 // family option AND we encountered column family add
4466 // record. Once we encounter column family drop record,
4467 // we will delete the column family from
4468 // column_families_not_found.
4469 bool cf_in_not_found
= (column_families_not_found
.find(edit
.column_family_
) !=
4470 column_families_not_found
.end());
4471 // in builders means that user supplied that column family
4472 // option AND that we encountered column family add record
4473 bool cf_in_builders
= builders
.find(edit
.column_family_
) != builders
.end();
4475 // they can't both be true
4476 assert(!(cf_in_not_found
&& cf_in_builders
));
4478 ColumnFamilyData
* cfd
= nullptr;
4480 if (edit
.is_column_family_add_
) {
4481 if (cf_in_builders
|| cf_in_not_found
) {
4482 return Status::Corruption(
4483 "Manifest adding the same column family twice: " +
4484 edit
.column_family_name_
);
4486 auto cf_options
= name_to_options
.find(edit
.column_family_name_
);
4487 // implicitly add persistent_stats column family without requiring user
4489 bool is_persistent_stats_column_family
=
4490 edit
.column_family_name_
.compare(kPersistentStatsColumnFamilyName
) == 0;
4491 if (cf_options
== name_to_options
.end() &&
4492 !is_persistent_stats_column_family
) {
4493 column_families_not_found
.insert(
4494 {edit
.column_family_
, edit
.column_family_name_
});
4496 // recover persistent_stats CF from a DB that already contains it
4497 if (is_persistent_stats_column_family
) {
4498 ColumnFamilyOptions cfo
;
4499 OptimizeForPersistentStats(&cfo
);
4500 cfd
= CreateColumnFamily(cfo
, &edit
);
4502 cfd
= CreateColumnFamily(cf_options
->second
, &edit
);
4504 cfd
->set_initialized();
4505 builders
.insert(std::make_pair(
4506 edit
.column_family_
, std::unique_ptr
<BaseReferencedVersionBuilder
>(
4507 new BaseReferencedVersionBuilder(cfd
))));
4509 } else if (edit
.is_column_family_drop_
) {
4510 if (cf_in_builders
) {
4511 auto builder
= builders
.find(edit
.column_family_
);
4512 assert(builder
!= builders
.end());
4513 builders
.erase(builder
);
4514 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
4515 assert(cfd
!= nullptr);
4516 if (cfd
->UnrefAndTryDelete()) {
4519 // who else can have reference to cfd!?
4522 } else if (cf_in_not_found
) {
4523 column_families_not_found
.erase(edit
.column_family_
);
4525 return Status::Corruption(
4526 "Manifest - dropping non-existing column family");
4528 } else if (edit
.IsWalAddition()) {
4529 Status s
= wals_
.AddWals(edit
.GetWalAdditions());
4533 } else if (edit
.IsWalDeletion()) {
4534 Status s
= wals_
.DeleteWalsBefore(edit
.GetWalDeletion().GetLogNumber());
4538 } else if (!cf_in_not_found
) {
4539 if (!cf_in_builders
) {
4540 return Status::Corruption(
4541 "Manifest record referencing unknown column family");
4544 cfd
= column_family_set_
->GetColumnFamily(edit
.column_family_
);
4545 // this should never happen since cf_in_builders is true
4546 assert(cfd
!= nullptr);
4548 // if it is not column family add or column family drop,
4549 // then it's a file add/delete, which should be forwarded
4551 auto builder
= builders
.find(edit
.column_family_
);
4552 assert(builder
!= builders
.end());
4553 Status s
= builder
->second
->version_builder()->Apply(&edit
);
4558 return ExtractInfoFromVersionEdit(cfd
, edit
, version_edit_params
);
// Fold the bookkeeping fields of a replayed edit (db id, log/prev-log/next
// file numbers, max column family, min log number to keep, last sequence)
// into version_edit_params, and validate the comparator name against the
// CF's user comparator. A log number going backwards is tolerated but logged
// as ignored MANIFEST corruption.
4561 Status
VersionSet::ExtractInfoFromVersionEdit(
4562 ColumnFamilyData
* cfd
, const VersionEdit
& from_edit
,
4563 VersionEditParams
* version_edit_params
) {
4564 if (cfd
!= nullptr) {
4565 if (from_edit
.has_db_id_
) {
4566 version_edit_params
->SetDBId(from_edit
.db_id_
);
4568 if (from_edit
.has_log_number_
) {
4569 if (cfd
->GetLogNumber() > from_edit
.log_number_
) {
4571 db_options_
->info_log
,
4572 "MANIFEST corruption detected, but ignored - Log numbers in "
4573 "records NOT monotonically increasing");
4575 cfd
->SetLogNumber(from_edit
.log_number_
);
4576 version_edit_params
->SetLogNumber(from_edit
.log_number_
);
4579 if (from_edit
.has_comparator_
&&
4580 from_edit
.comparator_
!= cfd
->user_comparator()->Name()) {
4581 return Status::InvalidArgument(
4582 cfd
->user_comparator()->Name(),
4583 "does not match existing comparator " + from_edit
.comparator_
);
4587 if (from_edit
.has_prev_log_number_
) {
4588 version_edit_params
->SetPrevLogNumber(from_edit
.prev_log_number_
);
4591 if (from_edit
.has_next_file_number_
) {
4592 version_edit_params
->SetNextFile(from_edit
.next_file_number_
);
4595 if (from_edit
.has_max_column_family_
) {
4596 version_edit_params
->SetMaxColumnFamily(from_edit
.max_column_family_
);
// min_log_number_to_keep only ever increases across edits.
4599 if (from_edit
.has_min_log_number_to_keep_
) {
4600 version_edit_params
->min_log_number_to_keep_
=
4601 std::max(version_edit_params
->min_log_number_to_keep_
,
4602 from_edit
.min_log_number_to_keep_
);
4605 if (from_edit
.has_last_sequence_
) {
4606 version_edit_params
->SetLastSequence(from_edit
.last_sequence_
);
4608 return Status::OK();
// Read the CURRENT file to locate the live MANIFEST: returns its full path
// (dbname + separator + file name) in *manifest_path and its parsed file
// number in *manifest_file_number. Returns Corruption if CURRENT does not
// end with a newline or does not name a descriptor file.
4611 Status
VersionSet::GetCurrentManifestPath(const std::string
& dbname
,
4613 std::string
* manifest_path
,
4614 uint64_t* manifest_file_number
) {
4615 assert(fs
!= nullptr);
4616 assert(manifest_path
!= nullptr);
4617 assert(manifest_file_number
!= nullptr);
4620 Status s
= ReadFileToString(fs
, CurrentFileName(dbname
), &fname
);
4624 if (fname
.empty() || fname
.back() != '\n') {
4625 return Status::Corruption("CURRENT file does not end with newline");
4627 // remove the trailing '\n'
4628 fname
.resize(fname
.size() - 1);
4630 bool parse_ok
= ParseFileName(fname
, manifest_file_number
, &type
);
4631 if (!parse_ok
|| type
!= kDescriptorFile
) {
4632 return Status::Corruption("CURRENT file corrupted");
4634 *manifest_path
= dbname
;
4635 if (dbname
.back() != '/') {
4636 manifest_path
->push_back('/');
4638 manifest_path
->append(fname
);
4639 return Status::OK();
// Replay all edits from one MANIFEST via `reader`. Edits that belong to an
// atomic group are accumulated in read_buffer and applied only once the
// group is complete; ordinary edits are applied immediately. The buffer is
// cleared when a decode/apply fails so a partial group never leaks into the
// next read. Summary fields of each applied edit land in
// version_edit_params.
4642 Status
VersionSet::ReadAndRecover(
4643 log::Reader
& reader
, AtomicGroupReadBuffer
* read_buffer
,
4644 const std::unordered_map
<std::string
, ColumnFamilyOptions
>& name_to_options
,
4645 std::unordered_map
<int, std::string
>& column_families_not_found
,
4646 std::unordered_map
<uint32_t, std::unique_ptr
<BaseReferencedVersionBuilder
>>&
4648 Status
* log_read_status
, VersionEditParams
* version_edit_params
,
4649 std::string
* db_id
) {
4650 assert(read_buffer
!= nullptr);
4651 assert(log_read_status
!= nullptr);
4654 std::string scratch
;
4655 size_t recovered_edits
= 0;
4656 while (s
.ok() && reader
.ReadRecord(&record
, &scratch
) &&
4657 log_read_status
->ok()) {
4659 s
= edit
.DecodeFrom(record
);
// Propagate any db id recorded in the MANIFEST to the caller.
4663 if (edit
.has_db_id_
) {
4664 db_id_
= edit
.GetDbId();
4665 if (db_id
!= nullptr) {
4666 db_id
->assign(edit
.GetDbId());
4669 s
= read_buffer
->AddEdit(&edit
);
4673 if (edit
.is_in_atomic_group_
) {
4674 if (read_buffer
->IsFull()) {
4675 // Apply edits in an atomic group when we have read all edits in the
4677 for (auto& e
: read_buffer
->replay_buffer()) {
4678 s
= ApplyOneVersionEditToBuilder(e
, name_to_options
,
4679 column_families_not_found
, builders
,
4680 version_edit_params
);
4689 read_buffer
->Clear();
4692 // Apply a normal edit immediately.
4693 s
= ApplyOneVersionEditToBuilder(edit
, name_to_options
,
4694 column_families_not_found
, builders
,
4695 version_edit_params
);
// A reader-reported error takes precedence over the loop's own status.
4701 if (!log_read_status
->ok()) {
4702 s
= *log_read_status
;
4705 // Clear the buffer if we fail to decode/apply an edit.
4706 read_buffer
->Clear();
4708 TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
// Recover the VersionSet from the MANIFEST named by the CURRENT file:
// open the manifest for sequential reading and let a VersionEditHandler
// iterate/apply its records, then record the recovered state and log a
// per-column-family summary.
4713 Status
VersionSet::Recover(
4714 const std::vector
<ColumnFamilyDescriptor
>& column_families
, bool read_only
,
4715 std::string
* db_id
) {
4716 // Read "CURRENT" file, which contains a pointer to the current manifest file
4717 std::string manifest_path
;
4718 Status s
= GetCurrentManifestPath(dbname_
, fs_
.get(), &manifest_path
,
4719 &manifest_file_number_
);
4724 ROCKS_LOG_INFO(db_options_
->info_log
, "Recovering from manifest file: %s\n",
4725 manifest_path
.c_str());
4727 std::unique_ptr
<SequentialFileReader
> manifest_file_reader
;
4729 std::unique_ptr
<FSSequentialFile
> manifest_file
;
4730 s
= fs_
->NewSequentialFile(manifest_path
,
4731 fs_
->OptimizeForManifestRead(file_options_
),
4732 &manifest_file
, nullptr);
4736 manifest_file_reader
.reset(
4737 new SequentialFileReader(std::move(manifest_file
), manifest_path
,
4738 db_options_
->log_readahead_size
, io_tracer_
));
4740 uint64_t current_manifest_file_size
= 0;
4741 uint64_t log_number
= 0;
// Drive the replay through a VersionEditHandler; read errors are reported
// through the LogReporter into log_read_status.
4743 VersionSet::LogReporter reporter
;
4744 Status log_read_status
;
4745 reporter
.status
= &log_read_status
;
4746 log::Reader
reader(nullptr, std::move(manifest_file_reader
), &reporter
,
4747 true /* checksum */, 0 /* log_number */);
4748 VersionEditHandler
handler(
4749 read_only
, column_families
, const_cast<VersionSet
*>(this),
4750 /*track_missing_files=*/false,
4751 /*no_error_if_table_files_missing=*/false, io_tracer_
);
4752 handler
.Iterate(reader
, &log_read_status
);
4753 s
= handler
.status();
4755 log_number
= handler
.GetVersionEditParams().log_number_
;
4756 current_manifest_file_size
= reader
.GetReadOffset();
4757 assert(current_manifest_file_size
!= 0);
4758 handler
.GetDbId(db_id
);
4763 manifest_file_size_
= current_manifest_file_size
;
// Log a summary of the recovered state.
4765 db_options_
->info_log
,
4766 "Recovered from manifest file:%s succeeded,"
4767 "manifest_file_number is %" PRIu64
", next_file_number is %" PRIu64
4768 ", last_sequence is %" PRIu64
", log_number is %" PRIu64
4769 ",prev_log_number is %" PRIu64
",max_column_family is %" PRIu32
4770 ",min_log_number_to_keep is %" PRIu64
"\n",
4771 manifest_path
.c_str(), manifest_file_number_
, next_file_number_
.load(),
4772 last_sequence_
.load(), log_number
, prev_log_number_
,
4773 column_family_set_
->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
// Log each live (non-dropped) column family's log number.
4775 for (auto cfd
: *column_family_set_
) {
4776 if (cfd
->IsDropped()) {
4779 ROCKS_LOG_INFO(db_options_
->info_log
,
4780 "Column family [%s] (ID %" PRIu32
4781 "), log number is %" PRIu64
"\n",
4782 cfd
->GetName().c_str(), cfd
->GetID(), cfd
->GetLogNumber());
// Iterates over the MANIFEST files found in a DB directory, newest first,
// so recovery can fall back to an older manifest when the newest is bad.
class ManifestPicker {
 public:
  explicit ManifestPicker(const std::string& dbname,
                          const std::vector<std::string>& files_in_dbname);
  // Returns the full path of the next manifest to try and advances the
  // internal cursor. REQUIRES Valid() == true
  std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
  // True while there are still manifest candidates left to try.
  bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }

 private:
  const std::string& dbname_;
  // MANIFEST file names(s)
  std::vector<std::string> manifest_files_;
  std::vector<std::string>::const_iterator manifest_file_iter_;
};
4805 ManifestPicker::ManifestPicker(const std::string
& dbname
,
4806 const std::vector
<std::string
>& files_in_dbname
)
4808 // populate manifest files
4809 assert(!files_in_dbname
.empty());
4810 for (const auto& fname
: files_in_dbname
) {
4811 uint64_t file_num
= 0;
4813 bool parse_ok
= ParseFileName(fname
, &file_num
, &file_type
);
4814 if (parse_ok
&& file_type
== kDescriptorFile
) {
4815 manifest_files_
.push_back(fname
);
4818 // seek to first manifest
4819 std::sort(manifest_files_
.begin(), manifest_files_
.end(),
4820 [](const std::string
& lhs
, const std::string
& rhs
) {
4825 bool parse_ok1
= ParseFileName(lhs
, &num1
, &type1
);
4826 bool parse_ok2
= ParseFileName(rhs
, &num2
, &type2
);
4836 manifest_file_iter_
= manifest_files_
.begin();
4839 std::string
ManifestPicker::GetNextManifest(uint64_t* number
,
4840 std::string
* file_name
) {
4843 if (manifest_file_iter_
!= manifest_files_
.end()) {
4844 ret
.assign(dbname_
);
4845 if (ret
.back() != kFilePathSeparator
) {
4846 ret
.push_back(kFilePathSeparator
);
4848 ret
.append(*manifest_file_iter_
);
4851 bool parse
= ParseFileName(*manifest_file_iter_
, number
, &type
);
4852 assert(type
== kDescriptorFile
);
4860 *file_name
= *manifest_file_iter_
;
4862 ++manifest_file_iter_
;
4868 Status
VersionSet::TryRecover(
4869 const std::vector
<ColumnFamilyDescriptor
>& column_families
, bool read_only
,
4870 const std::vector
<std::string
>& files_in_dbname
, std::string
* db_id
,
4871 bool* has_missing_table_file
) {
4872 ManifestPicker
manifest_picker(dbname_
, files_in_dbname
);
4873 if (!manifest_picker
.Valid()) {
4874 return Status::Corruption("Cannot locate MANIFEST file in " + dbname_
);
4877 std::string manifest_path
=
4878 manifest_picker
.GetNextManifest(&manifest_file_number_
, nullptr);
4879 while (!manifest_path
.empty()) {
4880 s
= TryRecoverFromOneManifest(manifest_path
, column_families
, read_only
,
4881 db_id
, has_missing_table_file
);
4882 if (s
.ok() || !manifest_picker
.Valid()) {
4887 manifest_picker
.GetNextManifest(&manifest_file_number_
, nullptr);
4892 Status
VersionSet::TryRecoverFromOneManifest(
4893 const std::string
& manifest_path
,
4894 const std::vector
<ColumnFamilyDescriptor
>& column_families
, bool read_only
,
4895 std::string
* db_id
, bool* has_missing_table_file
) {
4896 ROCKS_LOG_INFO(db_options_
->info_log
, "Trying to recover from manifest: %s\n",
4897 manifest_path
.c_str());
4898 std::unique_ptr
<SequentialFileReader
> manifest_file_reader
;
4901 std::unique_ptr
<FSSequentialFile
> manifest_file
;
4902 s
= fs_
->NewSequentialFile(manifest_path
,
4903 fs_
->OptimizeForManifestRead(file_options_
),
4904 &manifest_file
, nullptr);
4908 manifest_file_reader
.reset(
4909 new SequentialFileReader(std::move(manifest_file
), manifest_path
,
4910 db_options_
->log_readahead_size
, io_tracer_
));
4914 VersionSet::LogReporter reporter
;
4915 reporter
.status
= &s
;
4916 log::Reader
reader(nullptr, std::move(manifest_file_reader
), &reporter
,
4917 /*checksum=*/true, /*log_num=*/0);
4918 VersionEditHandlerPointInTime
handler_pit(
4919 read_only
, column_families
, const_cast<VersionSet
*>(this), io_tracer_
);
4921 handler_pit
.Iterate(reader
, &s
);
4923 handler_pit
.GetDbId(db_id
);
4925 assert(nullptr != has_missing_table_file
);
4926 *has_missing_table_file
= handler_pit
.HasMissingFiles();
4928 return handler_pit
.status();
4931 Status
VersionSet::ListColumnFamilies(std::vector
<std::string
>* column_families
,
4932 const std::string
& dbname
,
4934 // these are just for performance reasons, not correcntes,
4935 // so we're fine using the defaults
4936 FileOptions soptions
;
4937 // Read "CURRENT" file, which contains a pointer to the current manifest file
4938 std::string manifest_path
;
4939 uint64_t manifest_file_number
;
4941 GetCurrentManifestPath(dbname
, fs
, &manifest_path
, &manifest_file_number
);
4946 std::unique_ptr
<SequentialFileReader
> file_reader
;
4948 std::unique_ptr
<FSSequentialFile
> file
;
4949 s
= fs
->NewSequentialFile(manifest_path
, soptions
, &file
, nullptr);
4953 file_reader
.reset(new SequentialFileReader(std::move(file
), manifest_path
,
4954 nullptr /*IOTracer*/));
4957 VersionSet::LogReporter reporter
;
4958 reporter
.status
= &s
;
4959 log::Reader
reader(nullptr, std::move(file_reader
), &reporter
,
4960 true /* checksum */, 0 /* log_number */);
4962 ListColumnFamiliesHandler handler
;
4963 handler
.Iterate(reader
, &s
);
4965 assert(column_families
);
4966 column_families
->clear();
4967 if (handler
.status().ok()) {
4968 for (const auto& iter
: handler
.GetColumnFamilyNames()) {
4969 column_families
->push_back(iter
.second
);
4973 return handler
.status();
4976 #ifndef ROCKSDB_LITE
4977 Status
VersionSet::ReduceNumberOfLevels(const std::string
& dbname
,
4978 const Options
* options
,
4979 const FileOptions
& file_options
,
4981 if (new_levels
<= 1) {
4982 return Status::InvalidArgument(
4983 "Number of levels needs to be bigger than 1");
4986 ImmutableDBOptions
db_options(*options
);
4987 ColumnFamilyOptions
cf_options(*options
);
4988 std::shared_ptr
<Cache
> tc(NewLRUCache(options
->max_open_files
- 10,
4989 options
->table_cache_numshardbits
));
4990 WriteController
wc(options
->delayed_write_rate
);
4991 WriteBufferManager
wb(options
->db_write_buffer_size
);
4992 VersionSet
versions(dbname
, &db_options
, file_options
, tc
.get(), &wb
, &wc
,
4993 nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
4996 std::vector
<ColumnFamilyDescriptor
> dummy
;
4997 ColumnFamilyDescriptor
dummy_descriptor(kDefaultColumnFamilyName
,
4998 ColumnFamilyOptions(*options
));
4999 dummy
.push_back(dummy_descriptor
);
5000 status
= versions
.Recover(dummy
);
5005 Version
* current_version
=
5006 versions
.GetColumnFamilySet()->GetDefault()->current();
5007 auto* vstorage
= current_version
->storage_info();
5008 int current_levels
= vstorage
->num_levels();
5010 if (current_levels
<= new_levels
) {
5011 return Status::OK();
5014 // Make sure there are file only on one level from
5015 // (new_levels-1) to (current_levels-1)
5016 int first_nonempty_level
= -1;
5017 int first_nonempty_level_filenum
= 0;
5018 for (int i
= new_levels
- 1; i
< current_levels
; i
++) {
5019 int file_num
= vstorage
->NumLevelFiles(i
);
5020 if (file_num
!= 0) {
5021 if (first_nonempty_level
< 0) {
5022 first_nonempty_level
= i
;
5023 first_nonempty_level_filenum
= file_num
;
5026 snprintf(msg
, sizeof(msg
),
5027 "Found at least two levels containing files: "
5028 "[%d:%d],[%d:%d].\n",
5029 first_nonempty_level
, first_nonempty_level_filenum
, i
,
5031 return Status::InvalidArgument(msg
);
5036 // we need to allocate an array with the old number of levels size to
5037 // avoid SIGSEGV in WriteCurrentStatetoManifest()
5038 // however, all levels bigger or equal to new_levels will be empty
5039 std::vector
<FileMetaData
*>* new_files_list
=
5040 new std::vector
<FileMetaData
*>[current_levels
];
5041 for (int i
= 0; i
< new_levels
- 1; i
++) {
5042 new_files_list
[i
] = vstorage
->LevelFiles(i
);
5045 if (first_nonempty_level
> 0) {
5046 auto& new_last_level
= new_files_list
[new_levels
- 1];
5048 new_last_level
= vstorage
->LevelFiles(first_nonempty_level
);
5050 for (size_t i
= 0; i
< new_last_level
.size(); ++i
) {
5051 const FileMetaData
* const meta
= new_last_level
[i
];
5054 const uint64_t file_number
= meta
->fd
.GetNumber();
5056 vstorage
->file_locations_
[file_number
] =
5057 VersionStorageInfo::FileLocation(new_levels
- 1, i
);
5061 delete[] vstorage
-> files_
;
5062 vstorage
->files_
= new_files_list
;
5063 vstorage
->num_levels_
= new_levels
;
5065 MutableCFOptions
mutable_cf_options(*options
);
5067 InstrumentedMutex dummy_mutex
;
5068 InstrumentedMutexLock
l(&dummy_mutex
);
5069 return versions
.LogAndApply(
5070 versions
.GetColumnFamilySet()->GetDefault(),
5071 mutable_cf_options
, &ve
, &dummy_mutex
, nullptr, true);
5074 // Get the checksum information including the checksum and checksum function
5075 // name of all SST files in VersionSet. Store the information in
5076 // FileChecksumList which contains a map from file number to its checksum info.
5077 // If DB is not running, make sure call VersionSet::Recover() to load the file
5078 // metadata from Manifest to VersionSet before calling this function.
5079 Status
VersionSet::GetLiveFilesChecksumInfo(FileChecksumList
* checksum_list
) {
5080 // Clean the previously stored checksum information if any.
5082 if (checksum_list
== nullptr) {
5083 s
= Status::InvalidArgument("checksum_list is nullptr");
5086 checksum_list
->reset();
5088 for (auto cfd
: *column_family_set_
) {
5089 if (cfd
->IsDropped() || !cfd
->initialized()) {
5092 for (int level
= 0; level
< cfd
->NumberLevels(); level
++) {
5093 for (const auto& file
:
5094 cfd
->current()->storage_info()->LevelFiles(level
)) {
5095 s
= checksum_list
->InsertOneFileChecksum(file
->fd
.GetNumber(),
5096 file
->file_checksum
,
5097 file
->file_checksum_func_name
);
5113 Status
VersionSet::DumpManifest(Options
& options
, std::string
& dscname
,
5114 bool verbose
, bool hex
, bool json
) {
5115 // Open the specified manifest file.
5116 std::unique_ptr
<SequentialFileReader
> file_reader
;
5119 std::unique_ptr
<FSSequentialFile
> file
;
5120 const std::shared_ptr
<FileSystem
>& fs
= options
.env
->GetFileSystem();
5121 s
= fs
->NewSequentialFile(
5123 fs
->OptimizeForManifestRead(file_options_
), &file
,
5128 file_reader
.reset(new SequentialFileReader(
5129 std::move(file
), dscname
, db_options_
->log_readahead_size
, io_tracer_
));
5132 std::vector
<ColumnFamilyDescriptor
> column_families(
5133 1, ColumnFamilyDescriptor(kDefaultColumnFamilyName
, options
));
5134 DumpManifestHandler
handler(column_families
, this, io_tracer_
, verbose
, hex
,
5137 VersionSet::LogReporter reporter
;
5138 reporter
.status
= &s
;
5139 log::Reader
reader(nullptr, std::move(file_reader
), &reporter
,
5140 true /* checksum */, 0 /* log_number */);
5141 handler
.Iterate(reader
, &s
);
5144 return handler
.status();
5146 #endif // ROCKSDB_LITE
5148 void VersionSet::MarkFileNumberUsed(uint64_t number
) {
5149 // only called during recovery and repair which are single threaded, so this
5150 // works because there can't be concurrent calls
5151 if (next_file_number_
.load(std::memory_order_relaxed
) <= number
) {
5152 next_file_number_
.store(number
+ 1, std::memory_order_relaxed
);
5155 // Called only either from ::LogAndApply which is protected by mutex or during
5156 // recovery which is single-threaded.
5157 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number
) {
5158 if (min_log_number_to_keep_2pc_
.load(std::memory_order_relaxed
) < number
) {
5159 min_log_number_to_keep_2pc_
.store(number
, std::memory_order_relaxed
);
5163 Status
VersionSet::WriteCurrentStateToManifest(
5164 const std::unordered_map
<uint32_t, MutableCFState
>& curr_state
,
5165 const VersionEdit
& wal_additions
, log::Writer
* log
, IOStatus
& io_s
) {
5166 // TODO: Break up into multiple records to reduce memory usage on recovery?
5168 // WARNING: This method doesn't hold a mutex!!
5170 // This is done without DB mutex lock held, but only within single-threaded
5171 // LogAndApply. Column family manipulations can only happen within LogAndApply
5172 // (the same single thread), so we're safe to iterate.
5175 if (db_options_
->write_dbid_to_manifest
) {
5176 VersionEdit edit_for_db_id
;
5177 assert(!db_id_
.empty());
5178 edit_for_db_id
.SetDBId(db_id_
);
5179 std::string db_id_record
;
5180 if (!edit_for_db_id
.EncodeTo(&db_id_record
)) {
5181 return Status::Corruption("Unable to Encode VersionEdit:" +
5182 edit_for_db_id
.DebugString(true));
5184 io_s
= log
->AddRecord(db_id_record
);
5191 if (!wal_additions
.GetWalAdditions().empty()) {
5192 TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
5193 const_cast<VersionEdit
*>(&wal_additions
));
5195 if (!wal_additions
.EncodeTo(&record
)) {
5196 return Status::Corruption("Unable to Encode VersionEdit: " +
5197 wal_additions
.DebugString(true));
5199 io_s
= log
->AddRecord(record
);
5205 for (auto cfd
: *column_family_set_
) {
5208 if (cfd
->IsDropped()) {
5211 assert(cfd
->initialized());
5213 // Store column family info
5215 if (cfd
->GetID() != 0) {
5216 // default column family is always there,
5217 // no need to explicitly write it
5218 edit
.AddColumnFamily(cfd
->GetName());
5219 edit
.SetColumnFamily(cfd
->GetID());
5221 edit
.SetComparatorName(
5222 cfd
->internal_comparator().user_comparator()->Name());
5224 if (!edit
.EncodeTo(&record
)) {
5225 return Status::Corruption(
5226 "Unable to Encode VersionEdit:" + edit
.DebugString(true));
5228 io_s
= log
->AddRecord(record
);
5237 edit
.SetColumnFamily(cfd
->GetID());
5239 assert(cfd
->current());
5240 assert(cfd
->current()->storage_info());
5242 for (int level
= 0; level
< cfd
->NumberLevels(); level
++) {
5243 for (const auto& f
:
5244 cfd
->current()->storage_info()->LevelFiles(level
)) {
5245 edit
.AddFile(level
, f
->fd
.GetNumber(), f
->fd
.GetPathId(),
5246 f
->fd
.GetFileSize(), f
->smallest
, f
->largest
,
5247 f
->fd
.smallest_seqno
, f
->fd
.largest_seqno
,
5248 f
->marked_for_compaction
, f
->oldest_blob_file_number
,
5249 f
->oldest_ancester_time
, f
->file_creation_time
,
5250 f
->file_checksum
, f
->file_checksum_func_name
);
5254 const auto& blob_files
= cfd
->current()->storage_info()->GetBlobFiles();
5255 for (const auto& pair
: blob_files
) {
5256 const uint64_t blob_file_number
= pair
.first
;
5257 const auto& meta
= pair
.second
;
5260 assert(blob_file_number
== meta
->GetBlobFileNumber());
5262 edit
.AddBlobFile(blob_file_number
, meta
->GetTotalBlobCount(),
5263 meta
->GetTotalBlobBytes(), meta
->GetChecksumMethod(),
5264 meta
->GetChecksumValue());
5265 if (meta
->GetGarbageBlobCount() > 0) {
5266 edit
.AddBlobFileGarbage(blob_file_number
, meta
->GetGarbageBlobCount(),
5267 meta
->GetGarbageBlobBytes());
5271 const auto iter
= curr_state
.find(cfd
->GetID());
5272 assert(iter
!= curr_state
.end());
5273 uint64_t log_number
= iter
->second
.log_number
;
5274 edit
.SetLogNumber(log_number
);
5276 if (!edit
.EncodeTo(&record
)) {
5277 return Status::Corruption(
5278 "Unable to Encode VersionEdit:" + edit
.DebugString(true));
5280 io_s
= log
->AddRecord(record
);
5286 return Status::OK();
5289 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5290 // function is called repeatedly with consecutive pairs of slices. For example
5291 // if the slice list is [a, b, c, d] this function is called with arguments
5292 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5293 // we avoid doing binary search for the keys b and c twice and instead somehow
5294 // maintain state of where they first appear in the files.
5295 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions
& options
,
5296 Version
* v
, const Slice
& start
,
5297 const Slice
& end
, int start_level
,
5298 int end_level
, TableReaderCaller caller
) {
5299 const auto& icmp
= v
->cfd_
->internal_comparator();
5302 assert(icmp
.Compare(start
, end
) <= 0);
5304 uint64_t total_full_size
= 0;
5305 const auto* vstorage
= v
->storage_info();
5306 const int num_non_empty_levels
= vstorage
->num_non_empty_levels();
5307 end_level
= (end_level
== -1) ? num_non_empty_levels
5308 : std::min(end_level
, num_non_empty_levels
);
5310 assert(start_level
<= end_level
);
5312 // Outline of the optimization that uses options.files_size_error_margin.
5313 // When approximating the files total size that is used to store a keys range,
5314 // we first sum up the sizes of the files that fully fall into the range.
5315 // Then we sum up the sizes of all the files that may intersect with the range
5316 // (this includes all files in L0 as well). Then, if total_intersecting_size
5317 // is smaller than total_full_size * options.files_size_error_margin - we can
5318 // infer that the intersecting files have a sufficiently negligible
5319 // contribution to the total size, and we can approximate the storage required
5320 // for the keys in range as just half of the intersecting_files_size.
5321 // E.g., if the value of files_size_error_margin is 0.1, then the error of the
5322 // approximation is limited to only ~10% of the total size of files that fully
5323 // fall into the keys range. In such case, this helps to avoid a costly
5324 // process of binary searching the intersecting files that is required only
5325 // for a more precise calculation of the total size.
5327 autovector
<FdWithKeyRange
*, 32> first_files
;
5328 autovector
<FdWithKeyRange
*, 16> last_files
;
5330 // scan all the levels
5331 for (int level
= start_level
; level
< end_level
; ++level
) {
5332 const LevelFilesBrief
& files_brief
= vstorage
->LevelFilesBrief(level
);
5333 if (files_brief
.num_files
== 0) {
5334 // empty level, skip exploration
5339 // level 0 files are not in sorted order, we need to iterate through
5340 // the list to compute the total bytes that require scanning,
5341 // so handle the case explicitly (similarly to first_files case)
5342 for (size_t i
= 0; i
< files_brief
.num_files
; i
++) {
5343 first_files
.push_back(&files_brief
.files
[i
]);
5349 assert(files_brief
.num_files
> 0);
5351 // identify the file position for start key
5352 const int idx_start
=
5353 FindFileInRange(icmp
, files_brief
, start
, 0,
5354 static_cast<uint32_t>(files_brief
.num_files
- 1));
5355 assert(static_cast<size_t>(idx_start
) < files_brief
.num_files
);
5357 // identify the file position for end key
5358 int idx_end
= idx_start
;
5359 if (icmp
.Compare(files_brief
.files
[idx_end
].largest_key
, end
) < 0) {
5361 FindFileInRange(icmp
, files_brief
, end
, idx_start
,
5362 static_cast<uint32_t>(files_brief
.num_files
- 1));
5364 assert(idx_end
>= idx_start
&&
5365 static_cast<size_t>(idx_end
) < files_brief
.num_files
);
5367 // scan all files from the starting index to the ending index
5368 // (inferred from the sorted order)
5370 // first scan all the intermediate full files (excluding first and last)
5371 for (int i
= idx_start
+ 1; i
< idx_end
; ++i
) {
5372 uint64_t file_size
= files_brief
.files
[i
].fd
.GetFileSize();
5373 // The entire file falls into the range, so we can just take its size.
5375 ApproximateSize(v
, files_brief
.files
[i
], start
, end
, caller
));
5376 total_full_size
+= file_size
;
5379 // save the first and the last files (which may be the same file), so we
5380 // can scan them later.
5381 first_files
.push_back(&files_brief
.files
[idx_start
]);
5382 if (idx_start
!= idx_end
) {
5383 // we need to estimate size for both files, only if they are different
5384 last_files
.push_back(&files_brief
.files
[idx_end
]);
5388 // The sum of all file sizes that intersect the [start, end] keys range.
5389 uint64_t total_intersecting_size
= 0;
5390 for (const auto* file_ptr
: first_files
) {
5391 total_intersecting_size
+= file_ptr
->fd
.GetFileSize();
5393 for (const auto* file_ptr
: last_files
) {
5394 total_intersecting_size
+= file_ptr
->fd
.GetFileSize();
5397 // Now scan all the first & last files at each level, and estimate their size.
5398 // If the total_intersecting_size is less than X% of the total_full_size - we
5399 // want to approximate the result in order to avoid the costly binary search
5400 // inside ApproximateSize. We use half of file size as an approximation below.
5402 const double margin
= options
.files_size_error_margin
;
5403 if (margin
> 0 && total_intersecting_size
<
5404 static_cast<uint64_t>(total_full_size
* margin
)) {
5405 total_full_size
+= total_intersecting_size
/ 2;
5407 // Estimate for all the first files (might also be last files), at each
5409 for (const auto file_ptr
: first_files
) {
5410 total_full_size
+= ApproximateSize(v
, *file_ptr
, start
, end
, caller
);
5413 // Estimate for all the last files, at each level
5414 for (const auto file_ptr
: last_files
) {
5415 // We could use ApproximateSize here, but calling ApproximateOffsetOf
5416 // directly is just more efficient.
5417 total_full_size
+= ApproximateOffsetOf(v
, *file_ptr
, end
, caller
);
5421 return total_full_size
;
5424 uint64_t VersionSet::ApproximateOffsetOf(Version
* v
, const FdWithKeyRange
& f
,
5426 TableReaderCaller caller
) {
5429 const auto& icmp
= v
->cfd_
->internal_comparator();
5431 uint64_t result
= 0;
5432 if (icmp
.Compare(f
.largest_key
, key
) <= 0) {
5433 // Entire file is before "key", so just add the file size
5434 result
= f
.fd
.GetFileSize();
5435 } else if (icmp
.Compare(f
.smallest_key
, key
) > 0) {
5436 // Entire file is after "key", so ignore
5439 // "key" falls in the range for this table. Add the
5440 // approximate offset of "key" within the table.
5441 TableCache
* table_cache
= v
->cfd_
->table_cache();
5442 if (table_cache
!= nullptr) {
5443 result
= table_cache
->ApproximateOffsetOf(
5444 key
, f
.file_metadata
->fd
, caller
, icmp
,
5445 v
->GetMutableCFOptions().prefix_extractor
.get());
5451 uint64_t VersionSet::ApproximateSize(Version
* v
, const FdWithKeyRange
& f
,
5452 const Slice
& start
, const Slice
& end
,
5453 TableReaderCaller caller
) {
5456 const auto& icmp
= v
->cfd_
->internal_comparator();
5457 assert(icmp
.Compare(start
, end
) <= 0);
5459 if (icmp
.Compare(f
.largest_key
, start
) <= 0 ||
5460 icmp
.Compare(f
.smallest_key
, end
) > 0) {
5461 // Entire file is before or after the start/end keys range
5465 if (icmp
.Compare(f
.smallest_key
, start
) >= 0) {
5466 // Start of the range is before the file start - approximate by end offset
5467 return ApproximateOffsetOf(v
, f
, end
, caller
);
5470 if (icmp
.Compare(f
.largest_key
, end
) < 0) {
5471 // End of the range is after the file end - approximate by subtracting
5472 // start offset from the file size
5473 uint64_t start_offset
= ApproximateOffsetOf(v
, f
, start
, caller
);
5474 assert(f
.fd
.GetFileSize() >= start_offset
);
5475 return f
.fd
.GetFileSize() - start_offset
;
5478 // The interval falls entirely in the range for this file.
5479 TableCache
* table_cache
= v
->cfd_
->table_cache();
5480 if (table_cache
== nullptr) {
5483 return table_cache
->ApproximateSize(
5484 start
, end
, f
.file_metadata
->fd
, caller
, icmp
,
5485 v
->GetMutableCFOptions().prefix_extractor
.get());
5488 void VersionSet::AddLiveFiles(std::vector
<uint64_t>* live_table_files
,
5489 std::vector
<uint64_t>* live_blob_files
) const {
5490 assert(live_table_files
);
5491 assert(live_blob_files
);
5493 // pre-calculate space requirement
5494 size_t total_table_files
= 0;
5495 size_t total_blob_files
= 0;
5497 assert(column_family_set_
);
5498 for (auto cfd
: *column_family_set_
) {
5501 if (!cfd
->initialized()) {
5505 Version
* const dummy_versions
= cfd
->dummy_versions();
5506 assert(dummy_versions
);
5508 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
;
5512 const auto* vstorage
= v
->storage_info();
5515 for (int level
= 0; level
< vstorage
->num_levels(); ++level
) {
5516 total_table_files
+= vstorage
->LevelFiles(level
).size();
5519 total_blob_files
+= vstorage
->GetBlobFiles().size();
5523 // just one time extension to the right size
5524 live_table_files
->reserve(live_table_files
->size() + total_table_files
);
5525 live_blob_files
->reserve(live_blob_files
->size() + total_blob_files
);
5527 assert(column_family_set_
);
5528 for (auto cfd
: *column_family_set_
) {
5530 if (!cfd
->initialized()) {
5534 auto* current
= cfd
->current();
5535 bool found_current
= false;
5537 Version
* const dummy_versions
= cfd
->dummy_versions();
5538 assert(dummy_versions
);
5540 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
;
5542 v
->AddLiveFiles(live_table_files
, live_blob_files
);
5544 found_current
= true;
5548 if (!found_current
&& current
!= nullptr) {
5549 // Should never happen unless it is a bug.
5551 current
->AddLiveFiles(live_table_files
, live_blob_files
);
5556 InternalIterator
* VersionSet::MakeInputIterator(
5557 const ReadOptions
& read_options
, const Compaction
* c
,
5558 RangeDelAggregator
* range_del_agg
,
5559 const FileOptions
& file_options_compactions
) {
5560 auto cfd
= c
->column_family_data();
5561 // Level-0 files have to be merged together. For other levels,
5562 // we will make a concatenating iterator per level.
5563 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5564 const size_t space
= (c
->level() == 0 ? c
->input_levels(0)->num_files
+
5565 c
->num_input_levels() - 1
5566 : c
->num_input_levels());
5567 InternalIterator
** list
= new InternalIterator
* [space
];
5569 for (size_t which
= 0; which
< c
->num_input_levels(); which
++) {
5570 if (c
->input_levels(which
)->num_files
!= 0) {
5571 if (c
->level(which
) == 0) {
5572 const LevelFilesBrief
* flevel
= c
->input_levels(which
);
5573 for (size_t i
= 0; i
< flevel
->num_files
; i
++) {
5574 list
[num
++] = cfd
->table_cache()->NewIterator(
5575 read_options
, file_options_compactions
,
5576 cfd
->internal_comparator(), *flevel
->files
[i
].file_metadata
,
5577 range_del_agg
, c
->mutable_cf_options()->prefix_extractor
.get(),
5578 /*table_reader_ptr=*/nullptr,
5579 /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction
,
5581 /*skip_filters=*/false,
5582 /*level=*/static_cast<int>(c
->level(which
)),
5583 MaxFileSizeForL0MetaPin(*c
->mutable_cf_options()),
5584 /*smallest_compaction_key=*/nullptr,
5585 /*largest_compaction_key=*/nullptr,
5586 /*allow_unprepared_value=*/false);
5589 // Create concatenating iterator for the files from this level
5590 list
[num
++] = new LevelIterator(
5591 cfd
->table_cache(), read_options
, file_options_compactions
,
5592 cfd
->internal_comparator(), c
->input_levels(which
),
5593 c
->mutable_cf_options()->prefix_extractor
.get(),
5594 /*should_sample=*/false,
5595 /*no per level latency histogram=*/nullptr,
5596 TableReaderCaller::kCompaction
, /*skip_filters=*/false,
5597 /*level=*/static_cast<int>(c
->level(which
)), range_del_agg
,
5598 c
->boundaries(which
));
5602 assert(num
<= space
);
5603 InternalIterator
* result
=
5604 NewMergingIterator(&c
->column_family_data()->internal_comparator(), list
,
5605 static_cast<int>(num
));
5610 // verify that the files listed in this compaction are present
5611 // in the current version
5612 bool VersionSet::VerifyCompactionFileConsistency(Compaction
* c
) {
5614 Version
* version
= c
->column_family_data()->current();
5615 const VersionStorageInfo
* vstorage
= version
->storage_info();
5616 if (c
->input_version() != version
) {
5618 db_options_
->info_log
,
5619 "[%s] compaction output being applied to a different base version from"
5621 c
->column_family_data()->GetName().c_str());
5623 if (vstorage
->compaction_style_
== kCompactionStyleLevel
&&
5624 c
->start_level() == 0 && c
->num_input_levels() > 2U) {
5625 // We are doing a L0->base_level compaction. The assumption is if
5626 // base level is not L1, levels from L1 to base_level - 1 is empty.
5627 // This is ensured by having one compaction from L0 going on at the
5628 // same time in level-based compaction. So that during the time, no
5629 // compaction/flush can put files to those levels.
5630 for (int l
= c
->start_level() + 1; l
< c
->output_level(); l
++) {
5631 if (vstorage
->NumLevelFiles(l
) != 0) {
5638 for (size_t input
= 0; input
< c
->num_input_levels(); ++input
) {
5639 int level
= c
->level(input
);
5640 for (size_t i
= 0; i
< c
->num_input_files(input
); ++i
) {
5641 uint64_t number
= c
->input(input
, i
)->fd
.GetNumber();
5643 for (size_t j
= 0; j
< vstorage
->files_
[level
].size(); j
++) {
5644 FileMetaData
* f
= vstorage
->files_
[level
][j
];
5645 if (f
->fd
.GetNumber() == number
) {
5651 return false; // input files non existent in current version
5658 return true; // everything good
5661 Status
VersionSet::GetMetadataForFile(uint64_t number
, int* filelevel
,
5662 FileMetaData
** meta
,
5663 ColumnFamilyData
** cfd
) {
5664 for (auto cfd_iter
: *column_family_set_
) {
5665 if (!cfd_iter
->initialized()) {
5668 Version
* version
= cfd_iter
->current();
5669 const auto* vstorage
= version
->storage_info();
5670 for (int level
= 0; level
< vstorage
->num_levels(); level
++) {
5671 for (const auto& file
: vstorage
->LevelFiles(level
)) {
5672 if (file
->fd
.GetNumber() == number
) {
5676 return Status::OK();
5681 return Status::NotFound("File not present in any level");
5684 void VersionSet::GetLiveFilesMetaData(std::vector
<LiveFileMetaData
>* metadata
) {
5685 for (auto cfd
: *column_family_set_
) {
5686 if (cfd
->IsDropped() || !cfd
->initialized()) {
5689 for (int level
= 0; level
< cfd
->NumberLevels(); level
++) {
5690 for (const auto& file
:
5691 cfd
->current()->storage_info()->LevelFiles(level
)) {
5692 LiveFileMetaData filemetadata
;
5693 filemetadata
.column_family_name
= cfd
->GetName();
5694 uint32_t path_id
= file
->fd
.GetPathId();
5695 if (path_id
< cfd
->ioptions()->cf_paths
.size()) {
5696 filemetadata
.db_path
= cfd
->ioptions()->cf_paths
[path_id
].path
;
5698 assert(!cfd
->ioptions()->cf_paths
.empty());
5699 filemetadata
.db_path
= cfd
->ioptions()->cf_paths
.back().path
;
5701 const uint64_t file_number
= file
->fd
.GetNumber();
5702 filemetadata
.name
= MakeTableFileName("", file_number
);
5703 filemetadata
.file_number
= file_number
;
5704 filemetadata
.level
= level
;
5705 filemetadata
.size
= static_cast<size_t>(file
->fd
.GetFileSize());
5706 filemetadata
.smallestkey
= file
->smallest
.user_key().ToString();
5707 filemetadata
.largestkey
= file
->largest
.user_key().ToString();
5708 filemetadata
.smallest_seqno
= file
->fd
.smallest_seqno
;
5709 filemetadata
.largest_seqno
= file
->fd
.largest_seqno
;
5710 filemetadata
.num_reads_sampled
= file
->stats
.num_reads_sampled
.load(
5711 std::memory_order_relaxed
);
5712 filemetadata
.being_compacted
= file
->being_compacted
;
5713 filemetadata
.num_entries
= file
->num_entries
;
5714 filemetadata
.num_deletions
= file
->num_deletions
;
5715 filemetadata
.oldest_blob_file_number
= file
->oldest_blob_file_number
;
5716 filemetadata
.file_checksum
= file
->file_checksum
;
5717 filemetadata
.file_checksum_func_name
= file
->file_checksum_func_name
;
5718 metadata
->push_back(filemetadata
);
5724 void VersionSet::GetObsoleteFiles(std::vector
<ObsoleteFileInfo
>* files
,
5725 std::vector
<ObsoleteBlobFileInfo
>* blob_files
,
5726 std::vector
<std::string
>* manifest_filenames
,
5727 uint64_t min_pending_output
) {
5730 assert(manifest_filenames
);
5731 assert(files
->empty());
5732 assert(blob_files
->empty());
5733 assert(manifest_filenames
->empty());
5735 std::vector
<ObsoleteFileInfo
> pending_files
;
5736 for (auto& f
: obsolete_files_
) {
5737 if (f
.metadata
->fd
.GetNumber() < min_pending_output
) {
5738 files
->emplace_back(std::move(f
));
5740 pending_files
.emplace_back(std::move(f
));
5743 obsolete_files_
.swap(pending_files
);
5745 std::vector
<ObsoleteBlobFileInfo
> pending_blob_files
;
5746 for (auto& blob_file
: obsolete_blob_files_
) {
5747 if (blob_file
.GetBlobFileNumber() < min_pending_output
) {
5748 blob_files
->emplace_back(std::move(blob_file
));
5750 pending_blob_files
.emplace_back(std::move(blob_file
));
5753 obsolete_blob_files_
.swap(pending_blob_files
);
5755 obsolete_manifests_
.swap(*manifest_filenames
);
5758 ColumnFamilyData
* VersionSet::CreateColumnFamily(
5759 const ColumnFamilyOptions
& cf_options
, const VersionEdit
* edit
) {
5760 assert(edit
->is_column_family_add_
);
5762 MutableCFOptions dummy_cf_options
;
5763 Version
* dummy_versions
=
5764 new Version(nullptr, this, file_options_
, dummy_cf_options
, io_tracer_
);
5765 // Ref() dummy version once so that later we can call Unref() to delete it
5766 // by avoiding calling "delete" explicitly (~Version is private)
5767 dummy_versions
->Ref();
5768 auto new_cfd
= column_family_set_
->CreateColumnFamily(
5769 edit
->column_family_name_
, edit
->column_family_
, dummy_versions
,
5772 Version
* v
= new Version(new_cfd
, this, file_options_
,
5773 *new_cfd
->GetLatestMutableCFOptions(), io_tracer_
,
5774 current_version_number_
++);
5776 // Fill level target base information.
5777 v
->storage_info()->CalculateBaseBytes(*new_cfd
->ioptions(),
5778 *new_cfd
->GetLatestMutableCFOptions());
5779 AppendVersion(new_cfd
, v
);
5780 // GetLatestMutableCFOptions() is safe here without mutex since the
5781 // cfd is not available to client
5782 new_cfd
->CreateNewMemtable(*new_cfd
->GetLatestMutableCFOptions(),
5784 new_cfd
->SetLogNumber(edit
->log_number_
);
5788 uint64_t VersionSet::GetNumLiveVersions(Version
* dummy_versions
) {
5790 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
; v
= v
->next_
) {
5796 uint64_t VersionSet::GetTotalSstFilesSize(Version
* dummy_versions
) {
5797 std::unordered_set
<uint64_t> unique_files
;
5798 uint64_t total_files_size
= 0;
5799 for (Version
* v
= dummy_versions
->next_
; v
!= dummy_versions
; v
= v
->next_
) {
5800 VersionStorageInfo
* storage_info
= v
->storage_info();
5801 for (int level
= 0; level
< storage_info
->num_levels_
; level
++) {
5802 for (const auto& file_meta
: storage_info
->LevelFiles(level
)) {
5803 if (unique_files
.find(file_meta
->fd
.packed_number_and_path_id
) ==
5804 unique_files
.end()) {
5805 unique_files
.insert(file_meta
->fd
.packed_number_and_path_id
);
5806 total_files_size
+= file_meta
->fd
.GetFileSize();
5811 return total_files_size
;
5814 Status
VersionSet::VerifyFileMetadata(const std::string
& fpath
,
5815 const FileMetaData
& meta
) const {
5817 Status status
= fs_
->GetFileSize(fpath
, IOOptions(), &fsize
, nullptr);
5819 if (fsize
!= meta
.fd
.GetFileSize()) {
5820 status
= Status::Corruption("File size mismatch: " + fpath
);
5826 ReactiveVersionSet::ReactiveVersionSet(
5827 const std::string
& dbname
, const ImmutableDBOptions
* _db_options
,
5828 const FileOptions
& _file_options
, Cache
* table_cache
,
5829 WriteBufferManager
* write_buffer_manager
, WriteController
* write_controller
,
5830 const std::shared_ptr
<IOTracer
>& io_tracer
)
5831 : VersionSet(dbname
, _db_options
, _file_options
, table_cache
,
5832 write_buffer_manager
, write_controller
,
5833 /*block_cache_tracer=*/nullptr, io_tracer
),
5834 number_of_edits_to_skip_(0) {}
5836 ReactiveVersionSet::~ReactiveVersionSet() {}
5838 Status
ReactiveVersionSet::Recover(
5839 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
5840 std::unique_ptr
<log::FragmentBufferedReader
>* manifest_reader
,
5841 std::unique_ptr
<log::Reader::Reporter
>* manifest_reporter
,
5842 std::unique_ptr
<Status
>* manifest_reader_status
) {
5843 assert(manifest_reader
!= nullptr);
5844 assert(manifest_reporter
!= nullptr);
5845 assert(manifest_reader_status
!= nullptr);
5847 std::unordered_map
<std::string
, ColumnFamilyOptions
> cf_name_to_options
;
5848 for (const auto& cf
: column_families
) {
5849 cf_name_to_options
.insert({cf
.name
, cf
.options
});
5852 // add default column family
5853 auto default_cf_iter
= cf_name_to_options
.find(kDefaultColumnFamilyName
);
5854 if (default_cf_iter
== cf_name_to_options
.end()) {
5855 return Status::InvalidArgument("Default column family not specified");
5857 VersionEdit default_cf_edit
;
5858 default_cf_edit
.AddColumnFamily(kDefaultColumnFamilyName
);
5859 default_cf_edit
.SetColumnFamily(0);
5860 ColumnFamilyData
* default_cfd
=
5861 CreateColumnFamily(default_cf_iter
->second
, &default_cf_edit
);
5862 // In recovery, nobody else can access it, so it's fine to set it to be
5863 // initialized earlier.
5864 default_cfd
->set_initialized();
5865 VersionBuilderMap builders
;
5866 std::unordered_map
<int, std::string
> column_families_not_found
;
5868 std::make_pair(0, std::unique_ptr
<BaseReferencedVersionBuilder
>(
5869 new BaseReferencedVersionBuilder(default_cfd
))));
5871 manifest_reader_status
->reset(new Status());
5872 manifest_reporter
->reset(new LogReporter());
5873 static_cast_with_check
<LogReporter
>(manifest_reporter
->get())->status
=
5874 manifest_reader_status
->get();
5875 Status s
= MaybeSwitchManifest(manifest_reporter
->get(), manifest_reader
);
5876 log::Reader
* reader
= manifest_reader
->get();
5879 VersionEdit version_edit
;
5880 while (s
.ok() && retry
< 1) {
5881 assert(reader
!= nullptr);
5882 s
= ReadAndRecover(*reader
, &read_buffer_
, cf_name_to_options
,
5883 column_families_not_found
, builders
,
5884 manifest_reader_status
->get(), &version_edit
);
5886 bool enough
= version_edit
.has_next_file_number_
&&
5887 version_edit
.has_log_number_
&&
5888 version_edit
.has_last_sequence_
;
5890 for (const auto& cf
: column_families
) {
5891 auto cfd
= column_family_set_
->GetColumnFamily(cf
.name
);
5892 if (cfd
== nullptr) {
5899 for (const auto& cf
: column_families
) {
5900 auto cfd
= column_family_set_
->GetColumnFamily(cf
.name
);
5901 assert(cfd
!= nullptr);
5902 if (!cfd
->IsDropped()) {
5903 auto builder_iter
= builders
.find(cfd
->GetID());
5904 assert(builder_iter
!= builders
.end());
5905 auto builder
= builder_iter
->second
->version_builder();
5906 assert(builder
!= nullptr);
5907 s
= builder
->LoadTableHandlers(
5908 cfd
->internal_stats(), db_options_
->max_file_opening_threads
,
5909 false /* prefetch_index_and_filter_in_cache */,
5910 true /* is_initial_load */,
5911 cfd
->GetLatestMutableCFOptions()->prefix_extractor
.get(),
5912 MaxFileSizeForL0MetaPin(*cfd
->GetLatestMutableCFOptions()));
5915 if (s
.IsPathNotFound()) {
5931 if (!version_edit
.has_prev_log_number_
) {
5932 version_edit
.prev_log_number_
= 0;
5934 column_family_set_
->UpdateMaxColumnFamily(version_edit
.max_column_family_
);
5936 MarkMinLogNumberToKeep2PC(version_edit
.min_log_number_to_keep_
);
5937 MarkFileNumberUsed(version_edit
.prev_log_number_
);
5938 MarkFileNumberUsed(version_edit
.log_number_
);
5940 for (auto cfd
: *column_family_set_
) {
5941 assert(builders
.count(cfd
->GetID()) > 0);
5942 auto builder
= builders
[cfd
->GetID()]->version_builder();
5943 if (!builder
->CheckConsistencyForNumLevels()) {
5944 s
= Status::InvalidArgument(
5945 "db has more levels than options.num_levels");
5952 for (auto cfd
: *column_family_set_
) {
5953 if (cfd
->IsDropped()) {
5956 assert(cfd
->initialized());
5957 auto builders_iter
= builders
.find(cfd
->GetID());
5958 assert(builders_iter
!= builders
.end());
5959 auto* builder
= builders_iter
->second
->version_builder();
5961 Version
* v
= new Version(cfd
, this, file_options_
,
5962 *cfd
->GetLatestMutableCFOptions(), io_tracer_
,
5963 current_version_number_
++);
5964 s
= builder
->SaveTo(v
->storage_info());
5967 // Install recovered version
5968 v
->PrepareApply(*cfd
->GetLatestMutableCFOptions(),
5969 !(db_options_
->skip_stats_update_on_db_open
));
5970 AppendVersion(cfd
, v
);
5972 ROCKS_LOG_ERROR(db_options_
->info_log
,
5973 "[%s]: inconsistent version: %s\n",
5974 cfd
->GetName().c_str(), s
.ToString().c_str());
5981 next_file_number_
.store(version_edit
.next_file_number_
+ 1);
5982 last_allocated_sequence_
= version_edit
.last_sequence_
;
5983 last_published_sequence_
= version_edit
.last_sequence_
;
5984 last_sequence_
= version_edit
.last_sequence_
;
5985 prev_log_number_
= version_edit
.prev_log_number_
;
5986 for (auto cfd
: *column_family_set_
) {
5987 if (cfd
->IsDropped()) {
5990 ROCKS_LOG_INFO(db_options_
->info_log
,
5991 "Column family [%s] (ID %u), log number is %" PRIu64
"\n",
5992 cfd
->GetName().c_str(), cfd
->GetID(), cfd
->GetLogNumber());
5998 Status
ReactiveVersionSet::ReadAndApply(
5999 InstrumentedMutex
* mu
,
6000 std::unique_ptr
<log::FragmentBufferedReader
>* manifest_reader
,
6001 std::unordered_set
<ColumnFamilyData
*>* cfds_changed
) {
6002 assert(manifest_reader
!= nullptr);
6003 assert(cfds_changed
!= nullptr);
6007 uint64_t applied_edits
= 0;
6010 std::string scratch
;
6011 log::Reader
* reader
= manifest_reader
->get();
6012 std::string old_manifest_path
= reader
->file()->file_name();
6013 while (reader
->ReadRecord(&record
, &scratch
)) {
6015 s
= edit
.DecodeFrom(record
);
6020 // Skip the first VersionEdits of each MANIFEST generated by
6021 // VersionSet::WriteCurrentStatetoManifest.
6022 if (number_of_edits_to_skip_
> 0) {
6023 ColumnFamilyData
* cfd
=
6024 column_family_set_
->GetColumnFamily(edit
.column_family_
);
6025 if (cfd
!= nullptr && !cfd
->IsDropped()) {
6026 --number_of_edits_to_skip_
;
6031 s
= read_buffer_
.AddEdit(&edit
);
6035 VersionEdit temp_edit
;
6036 if (edit
.is_in_atomic_group_
) {
6037 if (read_buffer_
.IsFull()) {
6038 // Apply edits in an atomic group when we have read all edits in the
6040 for (auto& e
: read_buffer_
.replay_buffer()) {
6041 s
= ApplyOneVersionEditToBuilder(e
, cfds_changed
, &temp_edit
);
6050 read_buffer_
.Clear();
6053 // Apply a normal edit immediately.
6054 s
= ApplyOneVersionEditToBuilder(edit
, cfds_changed
, &temp_edit
);
6063 // Clear the buffer if we fail to decode/apply an edit.
6064 read_buffer_
.Clear();
6066 // It's possible that:
6067 // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
6068 // Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
6069 // 2) we have finished reading the current MANIFEST.
6070 // 3) we have encountered an IOError reading the current MANIFEST.
6071 // We need to look for the next MANIFEST and start from there. If we cannot
6072 // find the next MANIFEST, we should exit the loop.
6073 Status tmp_s
= MaybeSwitchManifest(reader
->GetReporter(), manifest_reader
);
6074 reader
= manifest_reader
->get();
6076 if (reader
->file()->file_name() == old_manifest_path
) {
6077 // Still processing the same MANIFEST, thus no need to continue this
6078 // loop since no record is available if we have reached here.
6081 // We have switched to a new MANIFEST whose first records have been
6082 // generated by VersionSet::WriteCurrentStatetoManifest. Since the
6083 // secondary instance has already finished recovering upon start, there
6084 // is no need for the secondary to process these records. Actually, if
6085 // the secondary were to replay these records, the secondary may end up
6086 // adding the same SST files AGAIN to each column family, causing
6087 // consistency checks done by VersionBuilder to fail. Therefore, we
6088 // record the number of records to skip at the beginning of the new
6089 // MANIFEST and ignore them.
6090 number_of_edits_to_skip_
= 0;
6091 for (auto* cfd
: *column_family_set_
) {
6092 if (cfd
->IsDropped()) {
6095 // Increase number_of_edits_to_skip by 2 because
6096 // WriteCurrentStatetoManifest() writes 2 version edits for each
6097 // column family at the beginning of the newly-generated MANIFEST.
6098 // TODO(yanqin) remove hard-coded value.
6099 if (db_options_
->write_dbid_to_manifest
) {
6100 number_of_edits_to_skip_
+= 3;
6102 number_of_edits_to_skip_
+= 2;
6111 for (auto cfd
: *column_family_set_
) {
6112 auto builder_iter
= active_version_builders_
.find(cfd
->GetID());
6113 if (builder_iter
== active_version_builders_
.end()) {
6116 auto builder
= builder_iter
->second
->version_builder();
6117 if (!builder
->CheckConsistencyForNumLevels()) {
6118 s
= Status::InvalidArgument(
6119 "db has more levels than options.num_levels");
6124 TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
6129 Status
ReactiveVersionSet::ApplyOneVersionEditToBuilder(
6130 VersionEdit
& edit
, std::unordered_set
<ColumnFamilyData
*>* cfds_changed
,
6131 VersionEdit
* version_edit
) {
6132 ColumnFamilyData
* cfd
=
6133 column_family_set_
->GetColumnFamily(edit
.column_family_
);
6135 // If we cannot find this column family in our column family set, then it
6136 // may be a new column family created by the primary after the secondary
6137 // starts. It is also possible that the secondary instance opens only a subset
6138 // of column families. Ignore it for now.
6139 if (nullptr == cfd
) {
6140 return Status::OK();
6142 if (active_version_builders_
.find(edit
.column_family_
) ==
6143 active_version_builders_
.end() &&
6144 !cfd
->IsDropped()) {
6145 std::unique_ptr
<BaseReferencedVersionBuilder
> builder_guard(
6146 new BaseReferencedVersionBuilder(cfd
));
6147 active_version_builders_
.insert(
6148 std::make_pair(edit
.column_family_
, std::move(builder_guard
)));
6151 auto builder_iter
= active_version_builders_
.find(edit
.column_family_
);
6152 assert(builder_iter
!= active_version_builders_
.end());
6153 auto builder
= builder_iter
->second
->version_builder();
6154 assert(builder
!= nullptr);
6156 if (edit
.is_column_family_add_
) {
6157 // TODO (yanqin) for now the secondary ignores column families created
6158 // after Open. This also simplifies handling of switching to a new MANIFEST
6159 // and processing the snapshot of the system at the beginning of the
6161 } else if (edit
.is_column_family_drop_
) {
6162 // Drop the column family by setting it to be 'dropped' without destroying
6163 // the column family handle.
6164 // TODO (haoyu) figure out how to handle column faimly drop for
6165 // secondary instance. (Is it possible that the ref count for cfd is 0 but
6166 // the ref count for its versions is higher than 0?)
6168 if (cfd
->UnrefAndTryDelete()) {
6171 active_version_builders_
.erase(builder_iter
);
6173 Status s
= builder
->Apply(&edit
);
6178 Status s
= ExtractInfoFromVersionEdit(cfd
, edit
, version_edit
);
6183 if (cfd
!= nullptr && !cfd
->IsDropped()) {
6184 s
= builder
->LoadTableHandlers(
6185 cfd
->internal_stats(), db_options_
->max_file_opening_threads
,
6186 false /* prefetch_index_and_filter_in_cache */,
6187 false /* is_initial_load */,
6188 cfd
->GetLatestMutableCFOptions()->prefix_extractor
.get(),
6189 MaxFileSizeForL0MetaPin(*cfd
->GetLatestMutableCFOptions()));
6190 TEST_SYNC_POINT_CALLBACK(
6191 "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
6192 "AfterLoadTableHandlers",
6196 auto version
= new Version(cfd
, this, file_options_
,
6197 *cfd
->GetLatestMutableCFOptions(), io_tracer_
,
6198 current_version_number_
++);
6199 s
= builder
->SaveTo(version
->storage_info());
6201 version
->PrepareApply(*cfd
->GetLatestMutableCFOptions(), true);
6202 AppendVersion(cfd
, version
);
6203 active_version_builders_
.erase(builder_iter
);
6204 if (cfds_changed
->count(cfd
) == 0) {
6205 cfds_changed
->insert(cfd
);
6210 } else if (s
.IsPathNotFound()) {
6213 // Some other error has occurred during LoadTableHandlers.
6217 if (version_edit
->HasNextFile()) {
6218 next_file_number_
.store(version_edit
->next_file_number_
+ 1);
6220 if (version_edit
->has_last_sequence_
) {
6221 last_allocated_sequence_
= version_edit
->last_sequence_
;
6222 last_published_sequence_
= version_edit
->last_sequence_
;
6223 last_sequence_
= version_edit
->last_sequence_
;
6225 if (version_edit
->has_prev_log_number_
) {
6226 prev_log_number_
= version_edit
->prev_log_number_
;
6227 MarkFileNumberUsed(version_edit
->prev_log_number_
);
6229 if (version_edit
->has_log_number_
) {
6230 MarkFileNumberUsed(version_edit
->log_number_
);
6232 column_family_set_
->UpdateMaxColumnFamily(version_edit
->max_column_family_
);
6233 MarkMinLogNumberToKeep2PC(version_edit
->min_log_number_to_keep_
);
6238 Status
ReactiveVersionSet::MaybeSwitchManifest(
6239 log::Reader::Reporter
* reporter
,
6240 std::unique_ptr
<log::FragmentBufferedReader
>* manifest_reader
) {
6241 assert(manifest_reader
!= nullptr);
6244 std::string manifest_path
;
6245 s
= GetCurrentManifestPath(dbname_
, fs_
.get(), &manifest_path
,
6246 &manifest_file_number_
);
6247 std::unique_ptr
<FSSequentialFile
> manifest_file
;
6249 if (nullptr == manifest_reader
->get() ||
6250 manifest_reader
->get()->file()->file_name() != manifest_path
) {
6252 "ReactiveVersionSet::MaybeSwitchManifest:"
6253 "AfterGetCurrentManifestPath:0");
6255 "ReactiveVersionSet::MaybeSwitchManifest:"
6256 "AfterGetCurrentManifestPath:1");
6257 s
= fs_
->NewSequentialFile(manifest_path
,
6258 env_
->OptimizeForManifestRead(file_options_
),
6259 &manifest_file
, nullptr);
6261 // No need to switch manifest.
6265 std::unique_ptr
<SequentialFileReader
> manifest_file_reader
;
6267 manifest_file_reader
.reset(new SequentialFileReader(
6268 std::move(manifest_file
), manifest_path
,
6269 db_options_
->log_readahead_size
, io_tracer_
));
6270 manifest_reader
->reset(new log::FragmentBufferedReader(
6271 nullptr, std::move(manifest_file_reader
), reporter
,
6272 true /* checksum */, 0 /* log_number */));
6273 ROCKS_LOG_INFO(db_options_
->info_log
, "Switched to new manifest: %s\n",
6274 manifest_path
.c_str());
6275 // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
6276 // active_version_builders_ map because we choose to construct the
6277 // versions from scratch, thanks to the first part of each MANIFEST
6278 // written by VersionSet::WriteCurrentStatetoManifest. This is not
6279 // necessary, but we choose this at present for the sake of simplicity.
6280 active_version_builders_
.clear();
6282 } while (s
.IsPathNotFound());
6286 } // namespace ROCKSDB_NAMESPACE