1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/compaction/compaction_picker_universal.h"
18 #include "db/column_family.h"
19 #include "file/filename.h"
20 #include "logging/log_buffer.h"
21 #include "monitoring/statistics.h"
22 #include "test_util/sync_point.h"
23 #include "util/random.h"
24 #include "util/string_util.h"
26 namespace ROCKSDB_NAMESPACE
{
28 // A helper class that form universal compactions. The class is used by
29 // UniversalCompactionPicker::PickCompaction().
30 // The usage is to create the class, and get the compaction object by calling
32 class UniversalCompactionBuilder
{
34 UniversalCompactionBuilder(
35 const ImmutableCFOptions
& ioptions
, const InternalKeyComparator
* icmp
,
36 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
37 const MutableDBOptions
& mutable_db_options
, VersionStorageInfo
* vstorage
,
38 UniversalCompactionPicker
* picker
, LogBuffer
* log_buffer
)
39 : ioptions_(ioptions
),
42 mutable_cf_options_(mutable_cf_options
),
43 mutable_db_options_(mutable_db_options
),
46 log_buffer_(log_buffer
) {}
48 // Form and return the compaction object. The caller owns return object.
49 Compaction
* PickCompaction();
53 SortedRun(int _level
, FileMetaData
* _file
, uint64_t _size
,
54 uint64_t _compensated_file_size
, bool _being_compacted
)
58 compensated_file_size(_compensated_file_size
),
59 being_compacted(_being_compacted
) {
60 assert(compensated_file_size
> 0);
61 assert(level
!= 0 || file
!= nullptr);
64 void Dump(char* out_buf
, size_t out_buf_size
,
65 bool print_path
= false) const;
67 // sorted_run_count is added into the string to print
68 void DumpSizeInfo(char* out_buf
, size_t out_buf_size
,
69 size_t sorted_run_count
) const;
72 // `file` Will be null for level > 0. For level = 0, the sorted run is
75 // For level > 0, `size` and `compensated_file_size` are sum of sizes all
76 // files in the level. `being_compacted` should be the same for all files
77 // in a non-zero level. Use the value here.
79 uint64_t compensated_file_size
;
83 // Pick Universal compaction to limit read amplification
84 Compaction
* PickCompactionToReduceSortedRuns(
85 unsigned int ratio
, unsigned int max_number_of_files_to_compact
);
87 // Pick Universal compaction to limit space amplification.
88 Compaction
* PickCompactionToReduceSizeAmp();
90 Compaction
* PickDeleteTriggeredCompaction();
92 // Form a compaction from the sorted run indicated by start_index to the
94 // The caller is responsible for making sure that those files are not in
96 Compaction
* PickCompactionToOldest(size_t start_index
,
97 CompactionReason compaction_reason
);
99 // Try to pick periodic compaction. The caller should only call it
100 // if there is at least one file marked for periodic compaction.
101 // null will be returned if no such a compaction can be formed
102 // because some files are being compacted.
103 Compaction
* PickPeriodicCompaction();
105 // Used in universal compaction when the enabled_trivial_move
106 // option is set. Checks whether there are any overlapping files
107 // in the input. Returns true if the input files are non
109 bool IsInputFilesNonOverlapping(Compaction
* c
);
111 const ImmutableCFOptions
& ioptions_
;
112 const InternalKeyComparator
* icmp_
;
114 std::vector
<SortedRun
> sorted_runs_
;
115 const std::string
& cf_name_
;
116 const MutableCFOptions
& mutable_cf_options_
;
117 const MutableDBOptions
& mutable_db_options_
;
118 VersionStorageInfo
* vstorage_
;
119 UniversalCompactionPicker
* picker_
;
120 LogBuffer
* log_buffer_
;
122 static std::vector
<SortedRun
> CalculateSortedRuns(
123 const VersionStorageInfo
& vstorage
);
125 // Pick a path ID to place a newly generated file, with its estimated file
127 static uint32_t GetPathId(const ImmutableCFOptions
& ioptions
,
128 const MutableCFOptions
& mutable_cf_options
,
132 // Used in universal compaction when trivial move is enabled.
133 // This structure is used for the construction of min heap
134 // that contains the file meta data, the level of the file
135 // and the index of the file in that level
137 struct InputFileInfo
{
138 InputFileInfo() : f(nullptr), level(0), index(0) {}
145 // Used in universal compaction when trivial move is enabled.
146 // This comparator is used for the construction of min heap
147 // based on the smallest key of the file.
148 struct SmallestKeyHeapComparator
{
149 explicit SmallestKeyHeapComparator(const Comparator
* ucmp
) { ucmp_
= ucmp
; }
151 bool operator()(InputFileInfo i1
, InputFileInfo i2
) const {
152 return (ucmp_
->Compare(i1
.f
->smallest
.user_key(),
153 i2
.f
->smallest
.user_key()) > 0);
157 const Comparator
* ucmp_
;
160 typedef std::priority_queue
<InputFileInfo
, std::vector
<InputFileInfo
>,
161 SmallestKeyHeapComparator
>
164 // This function creates the heap that is used to find if the files are
165 // overlapping during universal compaction when the allow_trivial_move
167 SmallestKeyHeap
create_level_heap(Compaction
* c
, const Comparator
* ucmp
) {
168 SmallestKeyHeap smallest_key_priority_q
=
169 SmallestKeyHeap(SmallestKeyHeapComparator(ucmp
));
171 InputFileInfo input_file
;
173 for (size_t l
= 0; l
< c
->num_input_levels(); l
++) {
174 if (c
->num_input_files(l
) != 0) {
175 if (l
== 0 && c
->start_level() == 0) {
176 for (size_t i
= 0; i
< c
->num_input_files(0); i
++) {
177 input_file
.f
= c
->input(0, i
);
178 input_file
.level
= 0;
179 input_file
.index
= i
;
180 smallest_key_priority_q
.push(std::move(input_file
));
183 input_file
.f
= c
->input(l
, 0);
184 input_file
.level
= l
;
185 input_file
.index
= 0;
186 smallest_key_priority_q
.push(std::move(input_file
));
190 return smallest_key_priority_q
;
194 // smallest_seqno and largest_seqno are set iff. `files` is not empty.
195 void GetSmallestLargestSeqno(const std::vector
<FileMetaData
*>& files
,
196 SequenceNumber
* smallest_seqno
,
197 SequenceNumber
* largest_seqno
) {
198 bool is_first
= true;
199 for (FileMetaData
* f
: files
) {
200 assert(f
->fd
.smallest_seqno
<= f
->fd
.largest_seqno
);
203 *smallest_seqno
= f
->fd
.smallest_seqno
;
204 *largest_seqno
= f
->fd
.largest_seqno
;
206 if (f
->fd
.smallest_seqno
< *smallest_seqno
) {
207 *smallest_seqno
= f
->fd
.smallest_seqno
;
209 if (f
->fd
.largest_seqno
> *largest_seqno
) {
210 *largest_seqno
= f
->fd
.largest_seqno
;
218 // Algorithm that checks to see if there are any overlapping
219 // files in the input
220 bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction
* c
) {
221 auto comparator
= icmp_
->user_comparator();
224 InputFileInfo prev
, curr
, next
;
226 SmallestKeyHeap smallest_key_priority_q
=
227 create_level_heap(c
, icmp_
->user_comparator());
229 while (!smallest_key_priority_q
.empty()) {
230 curr
= smallest_key_priority_q
.top();
231 smallest_key_priority_q
.pop();
237 if (comparator
->Compare(prev
.f
->largest
.user_key(),
238 curr
.f
->smallest
.user_key()) >= 0) {
239 // found overlapping files, return false
242 assert(comparator
->Compare(curr
.f
->largest
.user_key(),
243 prev
.f
->largest
.user_key()) > 0);
249 if (c
->level(curr
.level
) != 0 &&
250 curr
.index
< c
->num_input_files(curr
.level
) - 1) {
251 next
.f
= c
->input(curr
.level
, curr
.index
+ 1);
252 next
.level
= curr
.level
;
253 next
.index
= curr
.index
+ 1;
257 smallest_key_priority_q
.push(std::move(next
));
263 bool UniversalCompactionPicker::NeedsCompaction(
264 const VersionStorageInfo
* vstorage
) const {
265 const int kLevel0
= 0;
266 if (vstorage
->CompactionScore(kLevel0
) >= 1) {
269 if (!vstorage
->FilesMarkedForPeriodicCompaction().empty()) {
272 if (!vstorage
->FilesMarkedForCompaction().empty()) {
278 Compaction
* UniversalCompactionPicker::PickCompaction(
279 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
280 const MutableDBOptions
& mutable_db_options
, VersionStorageInfo
* vstorage
,
281 LogBuffer
* log_buffer
, SequenceNumber
/* earliest_memtable_seqno */) {
282 UniversalCompactionBuilder
builder(ioptions_
, icmp_
, cf_name
,
283 mutable_cf_options
, mutable_db_options
,
284 vstorage
, this, log_buffer
);
285 return builder
.PickCompaction();
288 void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf
,
290 bool print_path
) const {
292 assert(file
!= nullptr);
293 if (file
->fd
.GetPathId() == 0 || !print_path
) {
294 snprintf(out_buf
, out_buf_size
, "file %" PRIu64
, file
->fd
.GetNumber());
296 snprintf(out_buf
, out_buf_size
, "file %" PRIu64
299 file
->fd
.GetNumber(), file
->fd
.GetPathId());
302 snprintf(out_buf
, out_buf_size
, "level %d", level
);
306 void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
307 char* out_buf
, size_t out_buf_size
, size_t sorted_run_count
) const {
309 assert(file
!= nullptr);
310 snprintf(out_buf
, out_buf_size
,
311 "file %" PRIu64
"[%" ROCKSDB_PRIszt
313 "with size %" PRIu64
" (compensated size %" PRIu64
")",
314 file
->fd
.GetNumber(), sorted_run_count
, file
->fd
.GetFileSize(),
315 file
->compensated_file_size
);
317 snprintf(out_buf
, out_buf_size
,
318 "level %d[%" ROCKSDB_PRIszt
320 "with size %" PRIu64
" (compensated size %" PRIu64
")",
321 level
, sorted_run_count
, size
, compensated_file_size
);
325 std::vector
<UniversalCompactionBuilder::SortedRun
>
326 UniversalCompactionBuilder::CalculateSortedRuns(
327 const VersionStorageInfo
& vstorage
) {
328 std::vector
<UniversalCompactionBuilder::SortedRun
> ret
;
329 for (FileMetaData
* f
: vstorage
.LevelFiles(0)) {
330 ret
.emplace_back(0, f
, f
->fd
.GetFileSize(), f
->compensated_file_size
,
333 for (int level
= 1; level
< vstorage
.num_levels(); level
++) {
334 uint64_t total_compensated_size
= 0U;
335 uint64_t total_size
= 0U;
336 bool being_compacted
= false;
337 for (FileMetaData
* f
: vstorage
.LevelFiles(level
)) {
338 total_compensated_size
+= f
->compensated_file_size
;
339 total_size
+= f
->fd
.GetFileSize();
340 // Size amp, read amp and periodic compactions always include all files
341 // for a non-zero level. However, a delete triggered compaction and
342 // a trivial move might pick a subset of files in a sorted run. So
343 // always check all files in a sorted run and mark the entire run as
344 // being compacted if one or more files are being compacted
345 if (f
->being_compacted
) {
346 being_compacted
= f
->being_compacted
;
349 if (total_compensated_size
> 0) {
350 ret
.emplace_back(level
, nullptr, total_size
, total_compensated_size
,
357 // Universal style of compaction. Pick files that are contiguous in
358 // time-range to compact.
359 Compaction
* UniversalCompactionBuilder::PickCompaction() {
360 const int kLevel0
= 0;
361 score_
= vstorage_
->CompactionScore(kLevel0
);
362 sorted_runs_
= CalculateSortedRuns(*vstorage_
);
364 if (sorted_runs_
.size() == 0 ||
365 (vstorage_
->FilesMarkedForPeriodicCompaction().empty() &&
366 vstorage_
->FilesMarkedForCompaction().empty() &&
367 sorted_runs_
.size() < (unsigned int)mutable_cf_options_
368 .level0_file_num_compaction_trigger
)) {
369 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: nothing to do\n",
371 TEST_SYNC_POINT_CALLBACK(
372 "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
375 VersionStorageInfo::LevelSummaryStorage tmp
;
376 ROCKS_LOG_BUFFER_MAX_SZ(
378 "[%s] Universal: sorted runs: %" ROCKSDB_PRIszt
" files: %s\n",
379 cf_name_
.c_str(), sorted_runs_
.size(), vstorage_
->LevelSummary(&tmp
));
381 Compaction
* c
= nullptr;
382 // Periodic compaction has higher priority than other type of compaction
383 // because it's a hard requirement.
384 if (!vstorage_
->FilesMarkedForPeriodicCompaction().empty()) {
385 // Always need to do a full compaction for periodic compaction.
386 c
= PickPeriodicCompaction();
389 // Check for size amplification.
391 sorted_runs_
.size() >=
393 mutable_cf_options_
.level0_file_num_compaction_trigger
)) {
394 if ((c
= PickCompactionToReduceSizeAmp()) != nullptr) {
395 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: compacting for size amp\n",
398 // Size amplification is within limits. Try reducing read
399 // amplification while maintaining file size ratios.
401 mutable_cf_options_
.compaction_options_universal
.size_ratio
;
403 if ((c
= PickCompactionToReduceSortedRuns(ratio
, UINT_MAX
)) != nullptr) {
404 ROCKS_LOG_BUFFER(log_buffer_
,
405 "[%s] Universal: compacting for size ratio\n",
408 // Size amplification and file size ratios are within configured limits.
409 // If max read amplification is exceeding configured limits, then force
410 // compaction without looking at filesize ratios and try to reduce
411 // the number of files to fewer than level0_file_num_compaction_trigger.
412 // This is guaranteed by NeedsCompaction()
413 assert(sorted_runs_
.size() >=
415 mutable_cf_options_
.level0_file_num_compaction_trigger
));
416 // Get the total number of sorted runs that are not being compacted
417 int num_sr_not_compacted
= 0;
418 for (size_t i
= 0; i
< sorted_runs_
.size(); i
++) {
419 if (sorted_runs_
[i
].being_compacted
== false) {
420 num_sr_not_compacted
++;
424 // The number of sorted runs that are not being compacted is greater
425 // than the maximum allowed number of sorted runs
426 if (num_sr_not_compacted
>
427 mutable_cf_options_
.level0_file_num_compaction_trigger
) {
428 unsigned int num_files
=
429 num_sr_not_compacted
-
430 mutable_cf_options_
.level0_file_num_compaction_trigger
+ 1;
431 if ((c
= PickCompactionToReduceSortedRuns(UINT_MAX
, num_files
)) !=
433 ROCKS_LOG_BUFFER(log_buffer_
,
434 "[%s] Universal: compacting for file num -- %u\n",
435 cf_name_
.c_str(), num_files
);
443 if ((c
= PickDeleteTriggeredCompaction()) != nullptr) {
444 ROCKS_LOG_BUFFER(log_buffer_
,
445 "[%s] Universal: delete triggered compaction\n",
451 TEST_SYNC_POINT_CALLBACK(
452 "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
456 if (mutable_cf_options_
.compaction_options_universal
.allow_trivial_move
==
458 c
->compaction_reason() != CompactionReason::kPeriodicCompaction
) {
459 c
->set_is_trivial_move(IsInputFilesNonOverlapping(c
));
462 // validate that all the chosen files of L0 are non overlapping in time
464 bool is_first
= true;
466 size_t level_index
= 0U;
467 if (c
->start_level() == 0) {
468 for (auto f
: *c
->inputs(0)) {
469 assert(f
->fd
.smallest_seqno
<= f
->fd
.largest_seqno
);
476 for (; level_index
< c
->num_input_levels(); level_index
++) {
477 if (c
->num_input_files(level_index
) != 0) {
478 SequenceNumber smallest_seqno
= 0U;
479 SequenceNumber largest_seqno
= 0U;
480 GetSmallestLargestSeqno(*(c
->inputs(level_index
)), &smallest_seqno
,
489 RecordInHistogram(ioptions_
.statistics
, NUM_FILES_IN_SINGLE_COMPACTION
,
490 c
->inputs(0)->size());
492 picker_
->RegisterCompaction(c
);
493 vstorage_
->ComputeCompactionScore(ioptions_
, mutable_cf_options_
);
495 TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
500 uint32_t UniversalCompactionBuilder::GetPathId(
501 const ImmutableCFOptions
& ioptions
,
502 const MutableCFOptions
& mutable_cf_options
, uint64_t file_size
) {
503 // Two conditions need to be satisfied:
504 // (1) the target path needs to be able to hold the file's size
505 // (2) Total size left in this and previous paths need to be not
506 // smaller than expected future file size before this new file is
507 // compacted, which is estimated based on size_ratio.
508 // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
509 // we will make sure the target file, probably with size of 16, will be
510 // placed in a path so that eventually when new files are generated and
511 // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
512 // before the path we chose.
514 // TODO(sdong): now the case of multiple column families is not
515 // considered in this algorithm. So the target size can be violated in
516 // that case. We need to improve it.
517 uint64_t accumulated_size
= 0;
518 uint64_t future_size
=
520 (100 - mutable_cf_options
.compaction_options_universal
.size_ratio
) / 100;
522 assert(!ioptions
.cf_paths
.empty());
523 for (; p
< ioptions
.cf_paths
.size() - 1; p
++) {
524 uint64_t target_size
= ioptions
.cf_paths
[p
].target_size
;
525 if (target_size
> file_size
&&
526 accumulated_size
+ (target_size
- file_size
) > future_size
) {
529 accumulated_size
+= target_size
;
535 // Consider compaction files based on their size differences with
536 // the next file in time order.
538 Compaction
* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
539 unsigned int ratio
, unsigned int max_number_of_files_to_compact
) {
540 unsigned int min_merge_width
=
541 mutable_cf_options_
.compaction_options_universal
.min_merge_width
;
542 unsigned int max_merge_width
=
543 mutable_cf_options_
.compaction_options_universal
.max_merge_width
;
545 const SortedRun
* sr
= nullptr;
547 size_t start_index
= 0;
548 unsigned int candidate_count
= 0;
550 unsigned int max_files_to_compact
=
551 std::min(max_merge_width
, max_number_of_files_to_compact
);
552 min_merge_width
= std::max(min_merge_width
, 2U);
554 // Caller checks the size before executing this function. This invariant is
555 // important because otherwise we may have a possible integer underflow when
556 // dealing with unsigned types.
557 assert(sorted_runs_
.size() > 0);
559 // Considers a candidate file only if it is smaller than the
560 // total size accumulated so far.
561 for (size_t loop
= 0; loop
< sorted_runs_
.size(); loop
++) {
564 // Skip files that are already being compacted
565 for (sr
= nullptr; loop
< sorted_runs_
.size(); loop
++) {
566 sr
= &sorted_runs_
[loop
];
568 if (!sr
->being_compacted
) {
572 char file_num_buf
[kFormatFileNumberBufSize
];
573 sr
->Dump(file_num_buf
, sizeof(file_num_buf
));
574 ROCKS_LOG_BUFFER(log_buffer_
,
576 "[%d] being compacted, skipping",
577 cf_name_
.c_str(), file_num_buf
, loop
);
582 // This file is not being compacted. Consider it as the
583 // first candidate to be compacted.
584 uint64_t candidate_size
= sr
!= nullptr ? sr
->compensated_file_size
: 0;
586 char file_num_buf
[kFormatFileNumberBufSize
];
587 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
588 ROCKS_LOG_BUFFER(log_buffer_
,
589 "[%s] Universal: Possible candidate %s[%d].",
590 cf_name_
.c_str(), file_num_buf
, loop
);
593 // Check if the succeeding files need compaction.
594 for (size_t i
= loop
+ 1;
595 candidate_count
< max_files_to_compact
&& i
< sorted_runs_
.size();
597 const SortedRun
* succeeding_sr
= &sorted_runs_
[i
];
598 if (succeeding_sr
->being_compacted
) {
601 // Pick files if the total/last candidate file size (increased by the
602 // specified ratio) is still larger than the next candidate file.
603 // candidate_size is the total size of files picked so far with the
604 // default kCompactionStopStyleTotalSize; with
605 // kCompactionStopStyleSimilarSize, it's simply the size of the last
607 double sz
= candidate_size
* (100.0 + ratio
) / 100.0;
608 if (sz
< static_cast<double>(succeeding_sr
->size
)) {
611 if (mutable_cf_options_
.compaction_options_universal
.stop_style
==
612 kCompactionStopStyleSimilarSize
) {
613 // Similar-size stopping rule: also check the last picked file isn't
614 // far larger than the next candidate file.
615 sz
= (succeeding_sr
->size
* (100.0 + ratio
)) / 100.0;
616 if (sz
< static_cast<double>(candidate_size
)) {
617 // If the small file we've encountered begins a run of similar-size
618 // files, we'll pick them up on a future iteration of the outer
619 // loop. If it's some lonely straggler, it'll eventually get picked
620 // by the last-resort read amp strategy which disregards size ratios.
623 candidate_size
= succeeding_sr
->compensated_file_size
;
624 } else { // default kCompactionStopStyleTotalSize
625 candidate_size
+= succeeding_sr
->compensated_file_size
;
630 // Found a series of consecutive files that need compaction.
631 if (candidate_count
>= (unsigned int)min_merge_width
) {
636 for (size_t i
= loop
;
637 i
< loop
+ candidate_count
&& i
< sorted_runs_
.size(); i
++) {
638 const SortedRun
* skipping_sr
= &sorted_runs_
[i
];
639 char file_num_buf
[256];
640 skipping_sr
->DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), loop
);
641 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: Skipping %s",
642 cf_name_
.c_str(), file_num_buf
);
646 if (!done
|| candidate_count
<= 1) {
649 size_t first_index_after
= start_index
+ candidate_count
;
650 // Compression is enabled if files compacted earlier already reached
651 // size ratio of compression.
652 bool enable_compression
= true;
653 int ratio_to_compress
=
654 mutable_cf_options_
.compaction_options_universal
.compression_size_percent
;
655 if (ratio_to_compress
>= 0) {
656 uint64_t total_size
= 0;
657 for (auto& sorted_run
: sorted_runs_
) {
658 total_size
+= sorted_run
.compensated_file_size
;
661 uint64_t older_file_size
= 0;
662 for (size_t i
= sorted_runs_
.size() - 1; i
>= first_index_after
; i
--) {
663 older_file_size
+= sorted_runs_
[i
].size
;
664 if (older_file_size
* 100L >= total_size
* (long)ratio_to_compress
) {
665 enable_compression
= false;
671 uint64_t estimated_total_size
= 0;
672 for (unsigned int i
= 0; i
< first_index_after
; i
++) {
673 estimated_total_size
+= sorted_runs_
[i
].size
;
676 GetPathId(ioptions_
, mutable_cf_options_
, estimated_total_size
);
677 int start_level
= sorted_runs_
[start_index
].level
;
679 if (first_index_after
== sorted_runs_
.size()) {
680 output_level
= vstorage_
->num_levels() - 1;
681 } else if (sorted_runs_
[first_index_after
].level
== 0) {
684 output_level
= sorted_runs_
[first_index_after
].level
- 1;
687 // last level is reserved for the files ingested behind
688 if (ioptions_
.allow_ingest_behind
&&
689 (output_level
== vstorage_
->num_levels() - 1)) {
690 assert(output_level
> 1);
694 std::vector
<CompactionInputFiles
> inputs(vstorage_
->num_levels());
695 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
696 inputs
[i
].level
= start_level
+ static_cast<int>(i
);
698 for (size_t i
= start_index
; i
< first_index_after
; i
++) {
699 auto& picking_sr
= sorted_runs_
[i
];
700 if (picking_sr
.level
== 0) {
701 FileMetaData
* picking_file
= picking_sr
.file
;
702 inputs
[0].files
.push_back(picking_file
);
704 auto& files
= inputs
[picking_sr
.level
- start_level
].files
;
705 for (auto* f
: vstorage_
->LevelFiles(picking_sr
.level
)) {
709 char file_num_buf
[256];
710 picking_sr
.DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), i
);
711 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: Picking %s",
712 cf_name_
.c_str(), file_num_buf
);
715 CompactionReason compaction_reason
;
716 if (max_number_of_files_to_compact
== UINT_MAX
) {
717 compaction_reason
= CompactionReason::kUniversalSizeRatio
;
719 compaction_reason
= CompactionReason::kUniversalSortedRunNum
;
721 return new Compaction(
722 vstorage_
, ioptions_
, mutable_cf_options_
, mutable_db_options_
,
723 std::move(inputs
), output_level
,
724 MaxFileSizeForLevel(mutable_cf_options_
, output_level
,
725 kCompactionStyleUniversal
),
727 GetCompressionType(ioptions_
, vstorage_
, mutable_cf_options_
, start_level
,
728 1, enable_compression
),
729 GetCompressionOptions(mutable_cf_options_
, vstorage_
, start_level
,
731 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
732 score_
, false /* deletion_compaction */, compaction_reason
);
735 // Look at overall size amplification. If size amplification
736 // exceeeds the configured value, then do a compaction
737 // of the candidate files all the way upto the earliest
738 // base file (overrides configured values of file-size ratios,
739 // min_merge_width and max_merge_width).
741 Compaction
* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
742 // percentage flexibility while reducing size amplification
743 uint64_t ratio
= mutable_cf_options_
.compaction_options_universal
744 .max_size_amplification_percent
;
746 unsigned int candidate_count
= 0;
747 uint64_t candidate_size
= 0;
748 size_t start_index
= 0;
749 const SortedRun
* sr
= nullptr;
751 assert(!sorted_runs_
.empty());
752 if (sorted_runs_
.back().being_compacted
) {
756 // Skip files that are already being compacted
757 for (size_t loop
= 0; loop
+ 1 < sorted_runs_
.size(); loop
++) {
758 sr
= &sorted_runs_
[loop
];
759 if (!sr
->being_compacted
) {
760 start_index
= loop
; // Consider this as the first candidate.
763 char file_num_buf
[kFormatFileNumberBufSize
];
764 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
765 ROCKS_LOG_BUFFER(log_buffer_
,
766 "[%s] Universal: skipping %s[%d] compacted %s",
767 cf_name_
.c_str(), file_num_buf
, loop
,
768 " cannot be a candidate to reduce size amp.\n");
773 return nullptr; // no candidate files
776 char file_num_buf
[kFormatFileNumberBufSize
];
777 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
780 "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt
"] %s",
781 cf_name_
.c_str(), file_num_buf
, start_index
, " to reduce size amp.\n");
784 // keep adding up all the remaining files
785 for (size_t loop
= start_index
; loop
+ 1 < sorted_runs_
.size(); loop
++) {
786 sr
= &sorted_runs_
[loop
];
787 if (sr
->being_compacted
) {
788 char file_num_buf
[kFormatFileNumberBufSize
];
789 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
791 log_buffer_
, "[%s] Universal: Possible candidate %s[%d] %s",
792 cf_name_
.c_str(), file_num_buf
, start_index
,
793 " is already being compacted. No size amp reduction possible.\n");
796 candidate_size
+= sr
->compensated_file_size
;
799 if (candidate_count
== 0) {
803 // size of earliest file
804 uint64_t earliest_file_size
= sorted_runs_
.back().size
;
806 // size amplification = percentage of additional size
807 if (candidate_size
* 100 < ratio
* earliest_file_size
) {
810 "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
811 " earliest-file-size %" PRIu64
,
812 cf_name_
.c_str(), candidate_size
, earliest_file_size
);
817 "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
818 " earliest-file-size %" PRIu64
,
819 cf_name_
.c_str(), candidate_size
, earliest_file_size
);
821 return PickCompactionToOldest(start_index
,
822 CompactionReason::kUniversalSizeAmplification
);
825 // Pick files marked for compaction. Typically, files are marked by
826 // CompactOnDeleteCollector due to the presence of tombstones.
827 Compaction
* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
828 CompactionInputFiles start_level_inputs
;
830 std::vector
<CompactionInputFiles
> inputs
;
832 if (vstorage_
->num_levels() == 1) {
833 // This is single level universal. Since we're basically trying to reclaim
834 // space by processing files marked for compaction due to high tombstone
835 // density, let's do the same thing as compaction to reduce size amp which
836 // has the same goals.
837 int start_index
= -1;
839 start_level_inputs
.level
= 0;
840 start_level_inputs
.files
.clear();
842 // Find the first file marked for compaction. Ignore the last file
843 for (size_t loop
= 0; loop
+ 1 < sorted_runs_
.size(); loop
++) {
844 SortedRun
* sr
= &sorted_runs_
[loop
];
845 if (sr
->being_compacted
) {
848 FileMetaData
* f
= vstorage_
->LevelFiles(0)[loop
];
849 if (f
->marked_for_compaction
) {
850 start_level_inputs
.files
.push_back(f
);
852 static_cast<int>(loop
); // Consider this as the first candidate.
856 if (start_index
< 0) {
857 // Either no file marked, or they're already being compacted
861 for (size_t loop
= start_index
+ 1; loop
< sorted_runs_
.size(); loop
++) {
862 SortedRun
* sr
= &sorted_runs_
[loop
];
863 if (sr
->being_compacted
) {
867 FileMetaData
* f
= vstorage_
->LevelFiles(0)[loop
];
868 start_level_inputs
.files
.push_back(f
);
870 if (start_level_inputs
.size() <= 1) {
871 // If only the last file in L0 is marked for compaction, ignore it
874 inputs
.push_back(start_level_inputs
);
878 // For multi-level universal, the strategy is to make this look more like
879 // leveled. We pick one of the files marked for compaction and compact with
880 // overlapping files in the adjacent level.
881 picker_
->PickFilesMarkedForCompaction(cf_name_
, vstorage_
, &start_level
,
882 &output_level
, &start_level_inputs
);
883 if (start_level_inputs
.empty()) {
887 // Pick the first non-empty level after the start_level
888 for (output_level
= start_level
+ 1; output_level
< vstorage_
->num_levels();
890 if (vstorage_
->NumLevelFiles(output_level
) != 0) {
895 // If all higher levels are empty, pick the highest level as output level
896 if (output_level
== vstorage_
->num_levels()) {
897 if (start_level
== 0) {
898 output_level
= vstorage_
->num_levels() - 1;
900 // If start level is non-zero and all higher levels are empty, this
901 // compaction will translate into a trivial move. Since the idea is
902 // to reclaim space and trivial move doesn't help with that, we
903 // skip compaction in this case and return nullptr
907 if (ioptions_
.allow_ingest_behind
&&
908 output_level
== vstorage_
->num_levels() - 1) {
909 assert(output_level
> 1);
913 if (output_level
!= 0) {
914 if (start_level
== 0) {
915 if (!picker_
->GetOverlappingL0Files(vstorage_
, &start_level_inputs
,
916 output_level
, nullptr)) {
921 CompactionInputFiles output_level_inputs
;
922 int parent_index
= -1;
924 output_level_inputs
.level
= output_level
;
925 if (!picker_
->SetupOtherInputs(cf_name_
, mutable_cf_options_
, vstorage_
,
926 &start_level_inputs
, &output_level_inputs
,
927 &parent_index
, -1)) {
930 inputs
.push_back(start_level_inputs
);
931 if (!output_level_inputs
.empty()) {
932 inputs
.push_back(output_level_inputs
);
934 if (picker_
->FilesRangeOverlapWithCompaction(inputs
, output_level
)) {
938 inputs
.push_back(start_level_inputs
);
942 uint64_t estimated_total_size
= 0;
943 // Use size of the output level as estimated file size
944 for (FileMetaData
* f
: vstorage_
->LevelFiles(output_level
)) {
945 estimated_total_size
+= f
->fd
.GetFileSize();
948 GetPathId(ioptions_
, mutable_cf_options_
, estimated_total_size
);
949 return new Compaction(
950 vstorage_
, ioptions_
, mutable_cf_options_
, mutable_db_options_
,
951 std::move(inputs
), output_level
,
952 MaxFileSizeForLevel(mutable_cf_options_
, output_level
,
953 kCompactionStyleUniversal
),
954 /* max_grandparent_overlap_bytes */ LLONG_MAX
, path_id
,
955 GetCompressionType(ioptions_
, vstorage_
, mutable_cf_options_
,
957 GetCompressionOptions(mutable_cf_options_
, vstorage_
, output_level
),
958 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
959 score_
, false /* deletion_compaction */,
960 CompactionReason::kFilesMarkedForCompaction
);
963 Compaction
* UniversalCompactionBuilder::PickCompactionToOldest(
964 size_t start_index
, CompactionReason compaction_reason
) {
965 assert(start_index
< sorted_runs_
.size());
967 // Estimate total file size
968 uint64_t estimated_total_size
= 0;
969 for (size_t loop
= start_index
; loop
< sorted_runs_
.size(); loop
++) {
970 estimated_total_size
+= sorted_runs_
[loop
].size
;
973 GetPathId(ioptions_
, mutable_cf_options_
, estimated_total_size
);
974 int start_level
= sorted_runs_
[start_index
].level
;
976 std::vector
<CompactionInputFiles
> inputs(vstorage_
->num_levels());
977 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
978 inputs
[i
].level
= start_level
+ static_cast<int>(i
);
980 for (size_t loop
= start_index
; loop
< sorted_runs_
.size(); loop
++) {
981 auto& picking_sr
= sorted_runs_
[loop
];
982 if (picking_sr
.level
== 0) {
983 FileMetaData
* f
= picking_sr
.file
;
984 inputs
[0].files
.push_back(f
);
986 auto& files
= inputs
[picking_sr
.level
- start_level
].files
;
987 for (auto* f
: vstorage_
->LevelFiles(picking_sr
.level
)) {
991 std::string comp_reason_print_string
;
992 if (compaction_reason
== CompactionReason::kPeriodicCompaction
) {
993 comp_reason_print_string
= "periodic compaction";
994 } else if (compaction_reason
==
995 CompactionReason::kUniversalSizeAmplification
) {
996 comp_reason_print_string
= "size amp";
999 comp_reason_print_string
= "unknown: ";
1000 comp_reason_print_string
.append(
1001 std::to_string(static_cast<int>(compaction_reason
)));
1004 char file_num_buf
[256];
1005 picking_sr
.DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), loop
);
1006 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: %s picking %s",
1007 cf_name_
.c_str(), comp_reason_print_string
.c_str(),
1011 // output files at the bottom most level, unless it's reserved
1012 int output_level
= vstorage_
->num_levels() - 1;
1013 // last level is reserved for the files ingested behind
1014 if (ioptions_
.allow_ingest_behind
) {
1015 assert(output_level
> 1);
1019 // We never check size for
1020 // compaction_options_universal.compression_size_percent,
1021 // because we always compact all the files, so always compress.
1022 return new Compaction(
1023 vstorage_
, ioptions_
, mutable_cf_options_
, mutable_db_options_
,
1024 std::move(inputs
), output_level
,
1025 MaxFileSizeForLevel(mutable_cf_options_
, output_level
,
1026 kCompactionStyleUniversal
),
1028 GetCompressionType(ioptions_
, vstorage_
, mutable_cf_options_
,
1029 output_level
, 1, true /* enable_compression */),
1030 GetCompressionOptions(mutable_cf_options_
, vstorage_
, output_level
,
1031 true /* enable_compression */),
1032 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
1033 score_
, false /* deletion_compaction */, compaction_reason
);
1036 Compaction
* UniversalCompactionBuilder::PickPeriodicCompaction() {
1037 ROCKS_LOG_BUFFER(log_buffer_
, "[%s] Universal: Periodic Compaction",
1040 // In universal compaction, sorted runs contain older data are almost always
1041 // generated earlier too. To simplify the problem, we just try to trigger
1042 // a full compaction. We start from the oldest sorted run and include
1043 // all sorted runs, until we hit a sorted already being compacted.
1044 // Since usually the largest (which is usually the oldest) sorted run is
1045 // included anyway, doing a full compaction won't increase write
1046 // amplification much.
1048 // Get some information from marked files to check whether a file is
1049 // included in the compaction.
1051 size_t start_index
= sorted_runs_
.size();
1052 while (start_index
> 0 && !sorted_runs_
[start_index
- 1].being_compacted
) {
1055 if (start_index
== sorted_runs_
.size()) {
1059 // There is a rare corner case where we can't pick up all the files
1060 // because some files are being compacted and we end up with picking files
1061 // but none of them need periodic compaction. Unless we simply recompact
1062 // the last sorted run (either the last level or last L0 file), we would just
1063 // execute the compaction, in order to simplify the logic.
1064 if (start_index
== sorted_runs_
.size() - 1) {
1065 bool included_file_marked
= false;
1066 int start_level
= sorted_runs_
[start_index
].level
;
1067 FileMetaData
* start_file
= sorted_runs_
[start_index
].file
;
1068 for (const std::pair
<int, FileMetaData
*>& level_file_pair
:
1069 vstorage_
->FilesMarkedForPeriodicCompaction()) {
1070 if (start_level
!= 0) {
1071 // Last sorted run is a level
1072 if (start_level
== level_file_pair
.first
) {
1073 included_file_marked
= true;
1077 // Last sorted run is a L0 file.
1078 if (start_file
== level_file_pair
.second
) {
1079 included_file_marked
= true;
1084 if (!included_file_marked
) {
1085 ROCKS_LOG_BUFFER(log_buffer_
,
1086 "[%s] Universal: Cannot form a compaction covering file "
1087 "marked for periodic compaction",
1093 Compaction
* c
= PickCompactionToOldest(start_index
,
1094 CompactionReason::kPeriodicCompaction
);
1096 TEST_SYNC_POINT_CALLBACK(
1097 "UniversalCompactionPicker::PickPeriodicCompaction:Return", c
);
1101 } // namespace ROCKSDB_NAMESPACE
1103 #endif // !ROCKSDB_LITE