// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/compaction/compaction_picker_universal.h"
18 #include "db/column_family.h"
19 #include "file/filename.h"
20 #include "logging/log_buffer.h"
21 #include "monitoring/statistics.h"
22 #include "test_util/sync_point.h"
23 #include "util/random.h"
24 #include "util/string_util.h"
26 namespace ROCKSDB_NAMESPACE
{
28 // A helper class that form universal compactions. The class is used by
29 // UniversalCompactionPicker::PickCompaction().
30 // The usage is to create the class, and get the compaction object by calling
32 class UniversalCompactionBuilder
{
34 UniversalCompactionBuilder(const ImmutableCFOptions
& ioptions
,
35 const InternalKeyComparator
* icmp
,
36 const std::string
& cf_name
,
37 const MutableCFOptions
& mutable_cf_options
,
38 VersionStorageInfo
* vstorage
,
39 UniversalCompactionPicker
* picker
,
40 LogBuffer
* log_buffer
)
41 : ioptions_(ioptions
),
44 mutable_cf_options_(mutable_cf_options
),
47 log_buffer_(log_buffer
) {}
49 // Form and return the compaction object. The caller owns return object.
50 Compaction
* PickCompaction();
54 SortedRun(int _level
, FileMetaData
* _file
, uint64_t _size
,
55 uint64_t _compensated_file_size
, bool _being_compacted
)
59 compensated_file_size(_compensated_file_size
),
60 being_compacted(_being_compacted
) {
61 assert(compensated_file_size
> 0);
62 assert(level
!= 0 || file
!= nullptr);
65 void Dump(char* out_buf
, size_t out_buf_size
,
66 bool print_path
= false) const;
68 // sorted_run_count is added into the string to print
69 void DumpSizeInfo(char* out_buf
, size_t out_buf_size
,
70 size_t sorted_run_count
) const;
73 // `file` Will be null for level > 0. For level = 0, the sorted run is
76 // For level > 0, `size` and `compensated_file_size` are sum of sizes all
77 // files in the level. `being_compacted` should be the same for all files
78 // in a non-zero level. Use the value here.
80 uint64_t compensated_file_size
;
84 // Pick Universal compaction to limit read amplification
85 Compaction
* PickCompactionToReduceSortedRuns(
86 unsigned int ratio
, unsigned int max_number_of_files_to_compact
);
88 // Pick Universal compaction to limit space amplification.
89 Compaction
* PickCompactionToReduceSizeAmp();
91 Compaction
* PickDeleteTriggeredCompaction();
93 // Form a compaction from the sorted run indicated by start_index to the
95 // The caller is responsible for making sure that those files are not in
97 Compaction
* PickCompactionToOldest(size_t start_index
,
98 CompactionReason compaction_reason
);
100 // Try to pick periodic compaction. The caller should only call it
101 // if there is at least one file marked for periodic compaction.
102 // null will be returned if no such a compaction can be formed
103 // because some files are being compacted.
104 Compaction
* PickPeriodicCompaction();
106 // Used in universal compaction when the enabled_trivial_move
107 // option is set. Checks whether there are any overlapping files
108 // in the input. Returns true if the input files are non
110 bool IsInputFilesNonOverlapping(Compaction
* c
);
112 const ImmutableCFOptions
& ioptions_
;
113 const InternalKeyComparator
* icmp_
;
115 std::vector
<SortedRun
> sorted_runs_
;
116 const std::string
& cf_name_
;
117 const MutableCFOptions
& mutable_cf_options_
;
118 VersionStorageInfo
* vstorage_
;
119 UniversalCompactionPicker
* picker_
;
120 LogBuffer
* log_buffer_
;
122 static std::vector
<SortedRun
> CalculateSortedRuns(
123 const VersionStorageInfo
& vstorage
, const ImmutableCFOptions
& ioptions
,
124 const MutableCFOptions
& mutable_cf_options
);
126 // Pick a path ID to place a newly generated file, with its estimated file
128 static uint32_t GetPathId(const ImmutableCFOptions
& ioptions
,
129 const MutableCFOptions
& mutable_cf_options
,
133 // Used in universal compaction when trivial move is enabled.
134 // This structure is used for the construction of min heap
135 // that contains the file meta data, the level of the file
136 // and the index of the file in that level
138 struct InputFileInfo
{
139 InputFileInfo() : f(nullptr), level(0), index(0) {}
146 // Used in universal compaction when trivial move is enabled.
147 // This comparator is used for the construction of min heap
148 // based on the smallest key of the file.
149 struct SmallestKeyHeapComparator
{
150 explicit SmallestKeyHeapComparator(const Comparator
* ucmp
) { ucmp_
= ucmp
; }
152 bool operator()(InputFileInfo i1
, InputFileInfo i2
) const {
153 return (ucmp_
->Compare(i1
.f
->smallest
.user_key(),
154 i2
.f
->smallest
.user_key()) > 0);
158 const Comparator
* ucmp_
;
161 typedef std::priority_queue
<InputFileInfo
, std::vector
<InputFileInfo
>,
162 SmallestKeyHeapComparator
>
165 // This function creates the heap that is used to find if the files are
166 // overlapping during universal compaction when the allow_trivial_move
168 SmallestKeyHeap
create_level_heap(Compaction
* c
, const Comparator
* ucmp
) {
169 SmallestKeyHeap smallest_key_priority_q
=
170 SmallestKeyHeap(SmallestKeyHeapComparator(ucmp
));
172 InputFileInfo input_file
;
174 for (size_t l
= 0; l
< c
->num_input_levels(); l
++) {
175 if (c
->num_input_files(l
) != 0) {
176 if (l
== 0 && c
->start_level() == 0) {
177 for (size_t i
= 0; i
< c
->num_input_files(0); i
++) {
178 input_file
.f
= c
->input(0, i
);
179 input_file
.level
= 0;
180 input_file
.index
= i
;
181 smallest_key_priority_q
.push(std::move(input_file
));
184 input_file
.f
= c
->input(l
, 0);
185 input_file
.level
= l
;
186 input_file
.index
= 0;
187 smallest_key_priority_q
.push(std::move(input_file
));
191 return smallest_key_priority_q
;
195 // smallest_seqno and largest_seqno are set iff. `files` is not empty.
196 void GetSmallestLargestSeqno(const std::vector
<FileMetaData
*>& files
,
197 SequenceNumber
* smallest_seqno
,
198 SequenceNumber
* largest_seqno
) {
199 bool is_first
= true;
200 for (FileMetaData
* f
: files
) {
201 assert(f
->fd
.smallest_seqno
<= f
->fd
.largest_seqno
);
204 *smallest_seqno
= f
->fd
.smallest_seqno
;
205 *largest_seqno
= f
->fd
.largest_seqno
;
207 if (f
->fd
.smallest_seqno
< *smallest_seqno
) {
208 *smallest_seqno
= f
->fd
.smallest_seqno
;
210 if (f
->fd
.largest_seqno
> *largest_seqno
) {
211 *largest_seqno
= f
->fd
.largest_seqno
;
219 // Algorithm that checks to see if there are any overlapping
220 // files in the input
221 bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction
* c
) {
222 auto comparator
= icmp_
->user_comparator();
225 InputFileInfo prev
, curr
, next
;
227 SmallestKeyHeap smallest_key_priority_q
=
228 create_level_heap(c
, icmp_
->user_comparator());
230 while (!smallest_key_priority_q
.empty()) {
231 curr
= smallest_key_priority_q
.top();
232 smallest_key_priority_q
.pop();
238 if (comparator
->Compare(prev
.f
->largest
.user_key(),
239 curr
.f
->smallest
.user_key()) >= 0) {
240 // found overlapping files, return false
243 assert(comparator
->Compare(curr
.f
->largest
.user_key(),
244 prev
.f
->largest
.user_key()) > 0);
250 if (c
->level(curr
.level
) != 0 &&
251 curr
.index
< c
->num_input_files(curr
.level
) - 1) {
252 next
.f
= c
->input(curr
.level
, curr
.index
+ 1);
253 next
.level
= curr
.level
;
254 next
.index
= curr
.index
+ 1;
258 smallest_key_priority_q
.push(std::move(next
));
264 bool UniversalCompactionPicker::NeedsCompaction(
265 const VersionStorageInfo
* vstorage
) const {
266 const int kLevel0
= 0;
267 if (vstorage
->CompactionScore(kLevel0
) >= 1) {
270 if (!vstorage
->FilesMarkedForPeriodicCompaction().empty()) {
273 if (!vstorage
->FilesMarkedForCompaction().empty()) {
279 Compaction
* UniversalCompactionPicker::PickCompaction(
280 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
281 VersionStorageInfo
* vstorage
, LogBuffer
* log_buffer
,
282 SequenceNumber
/* earliest_memtable_seqno */) {
283 UniversalCompactionBuilder
builder(ioptions_
, icmp_
, cf_name
,
284 mutable_cf_options
, vstorage
, this,
286 return builder
.PickCompaction();
289 void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf
,
291 bool print_path
) const {
293 assert(file
!= nullptr);
294 if (file
->fd
.GetPathId() == 0 || !print_path
) {
295 snprintf(out_buf
, out_buf_size
, "file %" PRIu64
, file
->fd
.GetNumber());
297 snprintf(out_buf
, out_buf_size
, "file %" PRIu64
300 file
->fd
.GetNumber(), file
->fd
.GetPathId());
303 snprintf(out_buf
, out_buf_size
, "level %d", level
);
307 void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
308 char* out_buf
, size_t out_buf_size
, size_t sorted_run_count
) const {
310 assert(file
!= nullptr);
311 snprintf(out_buf
, out_buf_size
,
312 "file %" PRIu64
"[%" ROCKSDB_PRIszt
314 "with size %" PRIu64
" (compensated size %" PRIu64
")",
315 file
->fd
.GetNumber(), sorted_run_count
, file
->fd
.GetFileSize(),
316 file
->compensated_file_size
);
318 snprintf(out_buf
, out_buf_size
,
319 "level %d[%" ROCKSDB_PRIszt
321 "with size %" PRIu64
" (compensated size %" PRIu64
")",
322 level
, sorted_run_count
, size
, compensated_file_size
);
326 std::vector
<UniversalCompactionBuilder::SortedRun
>
327 UniversalCompactionBuilder::CalculateSortedRuns(
328 const VersionStorageInfo
& vstorage
, const ImmutableCFOptions
& /*ioptions*/,
329 const MutableCFOptions
& mutable_cf_options
) {
330 std::vector
<UniversalCompactionBuilder::SortedRun
> ret
;
331 for (FileMetaData
* f
: vstorage
.LevelFiles(0)) {
332 ret
.emplace_back(0, f
, f
->fd
.GetFileSize(), f
->compensated_file_size
,
335 for (int level
= 1; level
< vstorage
.num_levels(); level
++) {
336 uint64_t total_compensated_size
= 0U;
337 uint64_t total_size
= 0U;
338 bool being_compacted
= false;
339 bool is_first
= true;
340 for (FileMetaData
* f
: vstorage
.LevelFiles(level
)) {
341 total_compensated_size
+= f
->compensated_file_size
;
342 total_size
+= f
->fd
.GetFileSize();
343 if (mutable_cf_options
.compaction_options_universal
.allow_trivial_move
==
345 if (f
->being_compacted
) {
346 being_compacted
= f
->being_compacted
;
349 // Compaction always includes all files for a non-zero level, so for a
350 // non-zero level, all the files should share the same being_compacted
352 // This assumption is only valid when
353 // mutable_cf_options.compaction_options_universal.allow_trivial_move
355 assert(is_first
|| f
->being_compacted
== being_compacted
);
358 being_compacted
= f
->being_compacted
;
362 if (total_compensated_size
> 0) {
363 ret
.emplace_back(level
, nullptr, total_size
, total_compensated_size
,
370 // Universal style of compaction. Pick files that are contiguous in
371 // time-range to compact.
372 Compaction
* UniversalCompactionBuilder::PickCompaction() {
373 const int kLevel0
= 0;
374 score_
= vstorage_
->CompactionScore(kLevel0
);
376 CalculateSortedRuns(*vstorage_
, ioptions_
, mutable_cf_options_
);
378 if (sorted_runs_
.size() == 0 ||
379 (vstorage_
->FilesMarkedForPeriodicCompaction().empty() &&
380 vstorage_
->FilesMarkedForCompaction().empty() &&
381 sorted_runs_
.size() < (unsigned int)mutable_cf_options_
382 .level0_file_num_compaction_trigger
)) {
383 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: nothing to do\n",
385 TEST_SYNC_POINT_CALLBACK(
386 "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
389 VersionStorageInfo::LevelSummaryStorage tmp
;
390 ROCKS_LOG_BUFFER_MAX_SZ(
392 "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt
"): %s\n",
393 cf_name_
.c_str(), sorted_runs_
.size(), vstorage_
->LevelSummary(&tmp
));
395 Compaction
* c
= nullptr;
396 // Periodic compaction has higher priority than other type of compaction
397 // because it's a hard requirement.
398 if (!vstorage_
->FilesMarkedForPeriodicCompaction().empty()) {
399 // Always need to do a full compaction for periodic compaction.
400 c
= PickPeriodicCompaction();
403 // Check for size amplification.
405 sorted_runs_
.size() >=
407 mutable_cf_options_
.level0_file_num_compaction_trigger
)) {
408 if ((c
= PickCompactionToReduceSizeAmp()) != nullptr) {
409 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: compacting for size amp\n",
412 // Size amplification is within limits. Try reducing read
413 // amplification while maintaining file size ratios.
415 mutable_cf_options_
.compaction_options_universal
.size_ratio
;
417 if ((c
= PickCompactionToReduceSortedRuns(ratio
, UINT_MAX
)) != nullptr) {
418 ROCKS_LOG_BUFFER(log_buffer_
,
419 "[%s] Universal: compacting for size ratio\n",
422 // Size amplification and file size ratios are within configured limits.
423 // If max read amplification is exceeding configured limits, then force
424 // compaction without looking at filesize ratios and try to reduce
425 // the number of files to fewer than level0_file_num_compaction_trigger.
426 // This is guaranteed by NeedsCompaction()
427 assert(sorted_runs_
.size() >=
429 mutable_cf_options_
.level0_file_num_compaction_trigger
));
430 // Get the total number of sorted runs that are not being compacted
431 int num_sr_not_compacted
= 0;
432 for (size_t i
= 0; i
< sorted_runs_
.size(); i
++) {
433 if (sorted_runs_
[i
].being_compacted
== false) {
434 num_sr_not_compacted
++;
438 // The number of sorted runs that are not being compacted is greater
439 // than the maximum allowed number of sorted runs
440 if (num_sr_not_compacted
>
441 mutable_cf_options_
.level0_file_num_compaction_trigger
) {
442 unsigned int num_files
=
443 num_sr_not_compacted
-
444 mutable_cf_options_
.level0_file_num_compaction_trigger
+ 1;
445 if ((c
= PickCompactionToReduceSortedRuns(UINT_MAX
, num_files
)) !=
447 ROCKS_LOG_BUFFER(log_buffer_
,
448 "[%s] Universal: compacting for file num -- %u\n",
449 cf_name_
.c_str(), num_files
);
457 if ((c
= PickDeleteTriggeredCompaction()) != nullptr) {
458 ROCKS_LOG_BUFFER(log_buffer_
,
459 "[%s] Universal: delete triggered compaction\n",
465 TEST_SYNC_POINT_CALLBACK(
466 "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
470 if (mutable_cf_options_
.compaction_options_universal
.allow_trivial_move
==
472 c
->compaction_reason() != CompactionReason::kPeriodicCompaction
) {
473 c
->set_is_trivial_move(IsInputFilesNonOverlapping(c
));
476 // validate that all the chosen files of L0 are non overlapping in time
478 SequenceNumber prev_smallest_seqno
= 0U;
479 bool is_first
= true;
481 size_t level_index
= 0U;
482 if (c
->start_level() == 0) {
483 for (auto f
: *c
->inputs(0)) {
484 assert(f
->fd
.smallest_seqno
<= f
->fd
.largest_seqno
);
488 prev_smallest_seqno
= f
->fd
.smallest_seqno
;
492 for (; level_index
< c
->num_input_levels(); level_index
++) {
493 if (c
->num_input_files(level_index
) != 0) {
494 SequenceNumber smallest_seqno
= 0U;
495 SequenceNumber largest_seqno
= 0U;
496 GetSmallestLargestSeqno(*(c
->inputs(level_index
)), &smallest_seqno
,
500 } else if (prev_smallest_seqno
> 0) {
501 // A level is considered as the bottommost level if there are
502 // no files in higher levels or if files in higher levels do
503 // not overlap with the files being compacted. Sequence numbers
504 // of files in bottommost level can be set to 0 to help
505 // compression. As a result, the following assert may not hold
506 // if the prev_smallest_seqno is 0.
507 assert(prev_smallest_seqno
> largest_seqno
);
509 prev_smallest_seqno
= smallest_seqno
;
514 RecordInHistogram(ioptions_
.statistics
, NUM_FILES_IN_SINGLE_COMPACTION
,
515 c
->inputs(0)->size());
517 picker_
->RegisterCompaction(c
);
518 vstorage_
->ComputeCompactionScore(ioptions_
, mutable_cf_options_
);
520 TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
525 uint32_t UniversalCompactionBuilder::GetPathId(
526 const ImmutableCFOptions
& ioptions
,
527 const MutableCFOptions
& mutable_cf_options
, uint64_t file_size
) {
528 // Two conditions need to be satisfied:
529 // (1) the target path needs to be able to hold the file's size
530 // (2) Total size left in this and previous paths need to be not
531 // smaller than expected future file size before this new file is
532 // compacted, which is estimated based on size_ratio.
533 // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
534 // we will make sure the target file, probably with size of 16, will be
535 // placed in a path so that eventually when new files are generated and
536 // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
537 // before the path we chose.
539 // TODO(sdong): now the case of multiple column families is not
540 // considered in this algorithm. So the target size can be violated in
541 // that case. We need to improve it.
542 uint64_t accumulated_size
= 0;
543 uint64_t future_size
=
545 (100 - mutable_cf_options
.compaction_options_universal
.size_ratio
) / 100;
547 assert(!ioptions
.cf_paths
.empty());
548 for (; p
< ioptions
.cf_paths
.size() - 1; p
++) {
549 uint64_t target_size
= ioptions
.cf_paths
[p
].target_size
;
550 if (target_size
> file_size
&&
551 accumulated_size
+ (target_size
- file_size
) > future_size
) {
554 accumulated_size
+= target_size
;
560 // Consider compaction files based on their size differences with
561 // the next file in time order.
563 Compaction
* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
564 unsigned int ratio
, unsigned int max_number_of_files_to_compact
) {
565 unsigned int min_merge_width
=
566 mutable_cf_options_
.compaction_options_universal
.min_merge_width
;
567 unsigned int max_merge_width
=
568 mutable_cf_options_
.compaction_options_universal
.max_merge_width
;
570 const SortedRun
* sr
= nullptr;
572 size_t start_index
= 0;
573 unsigned int candidate_count
= 0;
575 unsigned int max_files_to_compact
=
576 std::min(max_merge_width
, max_number_of_files_to_compact
);
577 min_merge_width
= std::max(min_merge_width
, 2U);
579 // Caller checks the size before executing this function. This invariant is
580 // important because otherwise we may have a possible integer underflow when
581 // dealing with unsigned types.
582 assert(sorted_runs_
.size() > 0);
584 // Considers a candidate file only if it is smaller than the
585 // total size accumulated so far.
586 for (size_t loop
= 0; loop
< sorted_runs_
.size(); loop
++) {
589 // Skip files that are already being compacted
590 for (sr
= nullptr; loop
< sorted_runs_
.size(); loop
++) {
591 sr
= &sorted_runs_
[loop
];
593 if (!sr
->being_compacted
) {
597 char file_num_buf
[kFormatFileNumberBufSize
];
598 sr
->Dump(file_num_buf
, sizeof(file_num_buf
));
599 ROCKS_LOG_BUFFER(log_buffer_
,
601 "[%d] being compacted, skipping",
602 cf_name_
.c_str(), file_num_buf
, loop
);
607 // This file is not being compacted. Consider it as the
608 // first candidate to be compacted.
609 uint64_t candidate_size
= sr
!= nullptr ? sr
->compensated_file_size
: 0;
611 char file_num_buf
[kFormatFileNumberBufSize
];
612 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
613 ROCKS_LOG_BUFFER(log_buffer_
,
614 "[%s] Universal: Possible candidate %s[%d].",
615 cf_name_
.c_str(), file_num_buf
, loop
);
618 // Check if the succeeding files need compaction.
619 for (size_t i
= loop
+ 1;
620 candidate_count
< max_files_to_compact
&& i
< sorted_runs_
.size();
622 const SortedRun
* succeeding_sr
= &sorted_runs_
[i
];
623 if (succeeding_sr
->being_compacted
) {
626 // Pick files if the total/last candidate file size (increased by the
627 // specified ratio) is still larger than the next candidate file.
628 // candidate_size is the total size of files picked so far with the
629 // default kCompactionStopStyleTotalSize; with
630 // kCompactionStopStyleSimilarSize, it's simply the size of the last
632 double sz
= candidate_size
* (100.0 + ratio
) / 100.0;
633 if (sz
< static_cast<double>(succeeding_sr
->size
)) {
636 if (mutable_cf_options_
.compaction_options_universal
.stop_style
==
637 kCompactionStopStyleSimilarSize
) {
638 // Similar-size stopping rule: also check the last picked file isn't
639 // far larger than the next candidate file.
640 sz
= (succeeding_sr
->size
* (100.0 + ratio
)) / 100.0;
641 if (sz
< static_cast<double>(candidate_size
)) {
642 // If the small file we've encountered begins a run of similar-size
643 // files, we'll pick them up on a future iteration of the outer
644 // loop. If it's some lonely straggler, it'll eventually get picked
645 // by the last-resort read amp strategy which disregards size ratios.
648 candidate_size
= succeeding_sr
->compensated_file_size
;
649 } else { // default kCompactionStopStyleTotalSize
650 candidate_size
+= succeeding_sr
->compensated_file_size
;
655 // Found a series of consecutive files that need compaction.
656 if (candidate_count
>= (unsigned int)min_merge_width
) {
661 for (size_t i
= loop
;
662 i
< loop
+ candidate_count
&& i
< sorted_runs_
.size(); i
++) {
663 const SortedRun
* skipping_sr
= &sorted_runs_
[i
];
664 char file_num_buf
[256];
665 skipping_sr
->DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), loop
);
666 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: Skipping %s",
667 cf_name_
.c_str(), file_num_buf
);
671 if (!done
|| candidate_count
<= 1) {
674 size_t first_index_after
= start_index
+ candidate_count
;
675 // Compression is enabled if files compacted earlier already reached
676 // size ratio of compression.
677 bool enable_compression
= true;
678 int ratio_to_compress
=
679 mutable_cf_options_
.compaction_options_universal
.compression_size_percent
;
680 if (ratio_to_compress
>= 0) {
681 uint64_t total_size
= 0;
682 for (auto& sorted_run
: sorted_runs_
) {
683 total_size
+= sorted_run
.compensated_file_size
;
686 uint64_t older_file_size
= 0;
687 for (size_t i
= sorted_runs_
.size() - 1; i
>= first_index_after
; i
--) {
688 older_file_size
+= sorted_runs_
[i
].size
;
689 if (older_file_size
* 100L >= total_size
* (long)ratio_to_compress
) {
690 enable_compression
= false;
696 uint64_t estimated_total_size
= 0;
697 for (unsigned int i
= 0; i
< first_index_after
; i
++) {
698 estimated_total_size
+= sorted_runs_
[i
].size
;
701 GetPathId(ioptions_
, mutable_cf_options_
, estimated_total_size
);
702 int start_level
= sorted_runs_
[start_index
].level
;
704 if (first_index_after
== sorted_runs_
.size()) {
705 output_level
= vstorage_
->num_levels() - 1;
706 } else if (sorted_runs_
[first_index_after
].level
== 0) {
709 output_level
= sorted_runs_
[first_index_after
].level
- 1;
712 // last level is reserved for the files ingested behind
713 if (ioptions_
.allow_ingest_behind
&&
714 (output_level
== vstorage_
->num_levels() - 1)) {
715 assert(output_level
> 1);
719 std::vector
<CompactionInputFiles
> inputs(vstorage_
->num_levels());
720 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
721 inputs
[i
].level
= start_level
+ static_cast<int>(i
);
723 for (size_t i
= start_index
; i
< first_index_after
; i
++) {
724 auto& picking_sr
= sorted_runs_
[i
];
725 if (picking_sr
.level
== 0) {
726 FileMetaData
* picking_file
= picking_sr
.file
;
727 inputs
[0].files
.push_back(picking_file
);
729 auto& files
= inputs
[picking_sr
.level
- start_level
].files
;
730 for (auto* f
: vstorage_
->LevelFiles(picking_sr
.level
)) {
734 char file_num_buf
[256];
735 picking_sr
.DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), i
);
736 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: Picking %s",
737 cf_name_
.c_str(), file_num_buf
);
740 CompactionReason compaction_reason
;
741 if (max_number_of_files_to_compact
== UINT_MAX
) {
742 compaction_reason
= CompactionReason::kUniversalSizeRatio
;
744 compaction_reason
= CompactionReason::kUniversalSortedRunNum
;
746 return new Compaction(
747 vstorage_
, ioptions_
, mutable_cf_options_
, std::move(inputs
),
749 MaxFileSizeForLevel(mutable_cf_options_
, output_level
,
750 kCompactionStyleUniversal
),
752 GetCompressionType(ioptions_
, vstorage_
, mutable_cf_options_
, start_level
,
753 1, enable_compression
),
754 GetCompressionOptions(ioptions_
, vstorage_
, start_level
,
756 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
757 score_
, false /* deletion_compaction */, compaction_reason
);
760 // Look at overall size amplification. If size amplification
761 // exceeeds the configured value, then do a compaction
762 // of the candidate files all the way upto the earliest
763 // base file (overrides configured values of file-size ratios,
764 // min_merge_width and max_merge_width).
766 Compaction
* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
767 // percentage flexibility while reducing size amplification
768 uint64_t ratio
= mutable_cf_options_
.compaction_options_universal
769 .max_size_amplification_percent
;
771 unsigned int candidate_count
= 0;
772 uint64_t candidate_size
= 0;
773 size_t start_index
= 0;
774 const SortedRun
* sr
= nullptr;
776 assert(!sorted_runs_
.empty());
777 if (sorted_runs_
.back().being_compacted
) {
781 // Skip files that are already being compacted
782 for (size_t loop
= 0; loop
< sorted_runs_
.size() - 1; loop
++) {
783 sr
= &sorted_runs_
[loop
];
784 if (!sr
->being_compacted
) {
785 start_index
= loop
; // Consider this as the first candidate.
788 char file_num_buf
[kFormatFileNumberBufSize
];
789 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
790 ROCKS_LOG_BUFFER(log_buffer_
,
791 "[%s] Universal: skipping %s[%d] compacted %s",
792 cf_name_
.c_str(), file_num_buf
, loop
,
793 " cannot be a candidate to reduce size amp.\n");
798 return nullptr; // no candidate files
801 char file_num_buf
[kFormatFileNumberBufSize
];
802 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
805 "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt
"] %s",
806 cf_name_
.c_str(), file_num_buf
, start_index
, " to reduce size amp.\n");
809 // keep adding up all the remaining files
810 for (size_t loop
= start_index
; loop
< sorted_runs_
.size() - 1; loop
++) {
811 sr
= &sorted_runs_
[loop
];
812 if (sr
->being_compacted
) {
813 char file_num_buf
[kFormatFileNumberBufSize
];
814 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
816 log_buffer_
, "[%s] Universal: Possible candidate %s[%d] %s",
817 cf_name_
.c_str(), file_num_buf
, start_index
,
818 " is already being compacted. No size amp reduction possible.\n");
821 candidate_size
+= sr
->compensated_file_size
;
824 if (candidate_count
== 0) {
828 // size of earliest file
829 uint64_t earliest_file_size
= sorted_runs_
.back().size
;
831 // size amplification = percentage of additional size
832 if (candidate_size
* 100 < ratio
* earliest_file_size
) {
835 "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
836 " earliest-file-size %" PRIu64
,
837 cf_name_
.c_str(), candidate_size
, earliest_file_size
);
842 "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
843 " earliest-file-size %" PRIu64
,
844 cf_name_
.c_str(), candidate_size
, earliest_file_size
);
846 return PickCompactionToOldest(start_index
,
847 CompactionReason::kUniversalSizeAmplification
);
850 // Pick files marked for compaction. Typically, files are marked by
851 // CompactOnDeleteCollector due to the presence of tombstones.
852 Compaction
* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
853 CompactionInputFiles start_level_inputs
;
855 std::vector
<CompactionInputFiles
> inputs
;
857 if (vstorage_
->num_levels() == 1) {
858 // This is single level universal. Since we're basically trying to reclaim
859 // space by processing files marked for compaction due to high tombstone
860 // density, let's do the same thing as compaction to reduce size amp which
861 // has the same goals.
862 bool compact
= false;
864 start_level_inputs
.level
= 0;
865 start_level_inputs
.files
.clear();
867 for (FileMetaData
* f
: vstorage_
->LevelFiles(0)) {
868 if (f
->marked_for_compaction
) {
872 start_level_inputs
.files
.push_back(f
);
875 if (start_level_inputs
.size() <= 1) {
876 // If only the last file in L0 is marked for compaction, ignore it
879 inputs
.push_back(start_level_inputs
);
883 // For multi-level universal, the strategy is to make this look more like
884 // leveled. We pick one of the files marked for compaction and compact with
885 // overlapping files in the adjacent level.
886 picker_
->PickFilesMarkedForCompaction(cf_name_
, vstorage_
, &start_level
,
887 &output_level
, &start_level_inputs
);
888 if (start_level_inputs
.empty()) {
892 // Pick the first non-empty level after the start_level
893 for (output_level
= start_level
+ 1; output_level
< vstorage_
->num_levels();
895 if (vstorage_
->NumLevelFiles(output_level
) != 0) {
900 // If all higher levels are empty, pick the highest level as output level
901 if (output_level
== vstorage_
->num_levels()) {
902 if (start_level
== 0) {
903 output_level
= vstorage_
->num_levels() - 1;
905 // If start level is non-zero and all higher levels are empty, this
906 // compaction will translate into a trivial move. Since the idea is
907 // to reclaim space and trivial move doesn't help with that, we
908 // skip compaction in this case and return nullptr
912 if (ioptions_
.allow_ingest_behind
&&
913 output_level
== vstorage_
->num_levels() - 1) {
914 assert(output_level
> 1);
918 if (output_level
!= 0) {
919 if (start_level
== 0) {
920 if (!picker_
->GetOverlappingL0Files(vstorage_
, &start_level_inputs
,
921 output_level
, nullptr)) {
926 CompactionInputFiles output_level_inputs
;
927 int parent_index
= -1;
929 output_level_inputs
.level
= output_level
;
930 if (!picker_
->SetupOtherInputs(cf_name_
, mutable_cf_options_
, vstorage_
,
931 &start_level_inputs
, &output_level_inputs
,
932 &parent_index
, -1)) {
935 inputs
.push_back(start_level_inputs
);
936 if (!output_level_inputs
.empty()) {
937 inputs
.push_back(output_level_inputs
);
939 if (picker_
->FilesRangeOverlapWithCompaction(inputs
, output_level
)) {
943 inputs
.push_back(start_level_inputs
);
947 uint64_t estimated_total_size
= 0;
948 // Use size of the output level as estimated file size
949 for (FileMetaData
* f
: vstorage_
->LevelFiles(output_level
)) {
950 estimated_total_size
+= f
->fd
.GetFileSize();
953 GetPathId(ioptions_
, mutable_cf_options_
, estimated_total_size
);
954 return new Compaction(
955 vstorage_
, ioptions_
, mutable_cf_options_
, std::move(inputs
),
957 MaxFileSizeForLevel(mutable_cf_options_
, output_level
,
958 kCompactionStyleUniversal
),
959 /* max_grandparent_overlap_bytes */ LLONG_MAX
, path_id
,
960 GetCompressionType(ioptions_
, vstorage_
, mutable_cf_options_
,
962 GetCompressionOptions(ioptions_
, vstorage_
, output_level
),
963 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
964 score_
, false /* deletion_compaction */,
965 CompactionReason::kFilesMarkedForCompaction
);
968 Compaction
* UniversalCompactionBuilder::PickCompactionToOldest(
969 size_t start_index
, CompactionReason compaction_reason
) {
970 assert(start_index
< sorted_runs_
.size());
972 // Estimate total file size
973 uint64_t estimated_total_size
= 0;
974 for (size_t loop
= start_index
; loop
< sorted_runs_
.size(); loop
++) {
975 estimated_total_size
+= sorted_runs_
[loop
].size
;
978 GetPathId(ioptions_
, mutable_cf_options_
, estimated_total_size
);
979 int start_level
= sorted_runs_
[start_index
].level
;
981 std::vector
<CompactionInputFiles
> inputs(vstorage_
->num_levels());
982 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
983 inputs
[i
].level
= start_level
+ static_cast<int>(i
);
985 for (size_t loop
= start_index
; loop
< sorted_runs_
.size(); loop
++) {
986 auto& picking_sr
= sorted_runs_
[loop
];
987 if (picking_sr
.level
== 0) {
988 FileMetaData
* f
= picking_sr
.file
;
989 inputs
[0].files
.push_back(f
);
991 auto& files
= inputs
[picking_sr
.level
- start_level
].files
;
992 for (auto* f
: vstorage_
->LevelFiles(picking_sr
.level
)) {
996 std::string comp_reason_print_string
;
997 if (compaction_reason
== CompactionReason::kPeriodicCompaction
) {
998 comp_reason_print_string
= "periodic compaction";
999 } else if (compaction_reason
==
1000 CompactionReason::kUniversalSizeAmplification
) {
1001 comp_reason_print_string
= "size amp";
1006 char file_num_buf
[256];
1007 picking_sr
.DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), loop
);
1008 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: %s picking %s",
1009 cf_name_
.c_str(), comp_reason_print_string
.c_str(),
1013 // output files at the bottom most level, unless it's reserved
1014 int output_level
= vstorage_
->num_levels() - 1;
1015 // last level is reserved for the files ingested behind
1016 if (ioptions_
.allow_ingest_behind
) {
1017 assert(output_level
> 1);
1021 // We never check size for
1022 // compaction_options_universal.compression_size_percent,
1023 // because we always compact all the files, so always compress.
1024 return new Compaction(
1025 vstorage_
, ioptions_
, mutable_cf_options_
, std::move(inputs
),
1027 MaxFileSizeForLevel(mutable_cf_options_
, output_level
,
1028 kCompactionStyleUniversal
),
1030 GetCompressionType(ioptions_
, vstorage_
, mutable_cf_options_
, start_level
,
1031 1, true /* enable_compression */),
1032 GetCompressionOptions(ioptions_
, vstorage_
, start_level
,
1033 true /* enable_compression */),
1034 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
1035 score_
, false /* deletion_compaction */, compaction_reason
);
1038 Compaction
* UniversalCompactionBuilder::PickPeriodicCompaction() {
1039 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: Periodic Compaction",
1042 // In universal compaction, sorted runs contain older data are almost always
1043 // generated earlier too. To simplify the problem, we just try to trigger
1044 // a full compaction. We start from the oldest sorted run and include
1045 // all sorted runs, until we hit a sorted already being compacted.
1046 // Since usually the largest (which is usually the oldest) sorted run is
1047 // included anyway, doing a full compaction won't increase write
1048 // amplification much.
1050 // Get some information from marked files to check whether a file is
1051 // included in the compaction.
1053 size_t start_index
= sorted_runs_
.size();
1054 while (start_index
> 0 && !sorted_runs_
[start_index
- 1].being_compacted
) {
1057 if (start_index
== sorted_runs_
.size()) {
1061 // There is a rare corner case where we can't pick up all the files
1062 // because some files are being compacted and we end up with picking files
1063 // but none of them need periodic compaction. Unless we simply recompact
1064 // the last sorted run (either the last level or last L0 file), we would just
1065 // execute the compaction, in order to simplify the logic.
1066 if (start_index
== sorted_runs_
.size() - 1) {
1067 bool included_file_marked
= false;
1068 int start_level
= sorted_runs_
[start_index
].level
;
1069 FileMetaData
* start_file
= sorted_runs_
[start_index
].file
;
1070 for (const std::pair
<int, FileMetaData
*>& level_file_pair
:
1071 vstorage_
->FilesMarkedForPeriodicCompaction()) {
1072 if (start_level
!= 0) {
1073 // Last sorted run is a level
1074 if (start_level
== level_file_pair
.first
) {
1075 included_file_marked
= true;
1079 // Last sorted run is a L0 file.
1080 if (start_file
== level_file_pair
.second
) {
1081 included_file_marked
= true;
1086 if (!included_file_marked
) {
1087 ROCKS_LOG_BUFFER(log_buffer_
,
1088 "[%s] Universal: Cannot form a compaction covering file "
1089 "marked for periodic compaction",
1095 Compaction
* c
= PickCompactionToOldest(start_index
,
1096 CompactionReason::kPeriodicCompaction
);
1098 TEST_SYNC_POINT_CALLBACK(
1099 "UniversalCompactionPicker::PickPeriodicCompaction:Return", c
);
1103 } // namespace ROCKSDB_NAMESPACE
1105 #endif // !ROCKSDB_LITE