1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction_picker_universal.h"
#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <climits>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "db/column_family.h"
#include "monitoring/statistics.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
32 // Used in universal compaction when trivial move is enabled.
33 // This structure is used for the construction of min heap
34 // that contains the file meta data, the level of the file
35 // and the index of the file in that level
37 struct InputFileInfo
{
38 InputFileInfo() : f(nullptr), level(0), index(0) {}
45 // Used in universal compaction when trivial move is enabled.
46 // This comparator is used for the construction of min heap
47 // based on the smallest key of the file.
48 struct SmallestKeyHeapComparator
{
49 explicit SmallestKeyHeapComparator(const Comparator
* ucmp
) { ucmp_
= ucmp
; }
51 bool operator()(InputFileInfo i1
, InputFileInfo i2
) const {
52 return (ucmp_
->Compare(i1
.f
->smallest
.user_key(),
53 i2
.f
->smallest
.user_key()) > 0);
57 const Comparator
* ucmp_
;
60 typedef std::priority_queue
<InputFileInfo
, std::vector
<InputFileInfo
>,
61 SmallestKeyHeapComparator
>
64 // This function creates the heap that is used to find if the files are
65 // overlapping during universal compaction when the allow_trivial_move
67 SmallestKeyHeap
create_level_heap(Compaction
* c
, const Comparator
* ucmp
) {
68 SmallestKeyHeap smallest_key_priority_q
=
69 SmallestKeyHeap(SmallestKeyHeapComparator(ucmp
));
71 InputFileInfo input_file
;
73 for (size_t l
= 0; l
< c
->num_input_levels(); l
++) {
74 if (c
->num_input_files(l
) != 0) {
75 if (l
== 0 && c
->start_level() == 0) {
76 for (size_t i
= 0; i
< c
->num_input_files(0); i
++) {
77 input_file
.f
= c
->input(0, i
);
80 smallest_key_priority_q
.push(std::move(input_file
));
83 input_file
.f
= c
->input(l
, 0);
86 smallest_key_priority_q
.push(std::move(input_file
));
90 return smallest_key_priority_q
;
94 // smallest_seqno and largest_seqno are set iff. `files` is not empty.
95 void GetSmallestLargestSeqno(const std::vector
<FileMetaData
*>& files
,
96 SequenceNumber
* smallest_seqno
,
97 SequenceNumber
* largest_seqno
) {
99 for (FileMetaData
* f
: files
) {
100 assert(f
->fd
.smallest_seqno
<= f
->fd
.largest_seqno
);
103 *smallest_seqno
= f
->fd
.smallest_seqno
;
104 *largest_seqno
= f
->fd
.largest_seqno
;
106 if (f
->fd
.smallest_seqno
< *smallest_seqno
) {
107 *smallest_seqno
= f
->fd
.smallest_seqno
;
109 if (f
->fd
.largest_seqno
> *largest_seqno
) {
110 *largest_seqno
= f
->fd
.largest_seqno
;
118 // Algorithm that checks to see if there are any overlapping
119 // files in the input
120 bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction
* c
) {
121 auto comparator
= icmp_
->user_comparator();
124 InputFileInfo prev
, curr
, next
;
126 SmallestKeyHeap smallest_key_priority_q
=
127 create_level_heap(c
, icmp_
->user_comparator());
129 while (!smallest_key_priority_q
.empty()) {
130 curr
= smallest_key_priority_q
.top();
131 smallest_key_priority_q
.pop();
137 if (comparator
->Compare(prev
.f
->largest
.user_key(),
138 curr
.f
->smallest
.user_key()) >= 0) {
139 // found overlapping files, return false
142 assert(comparator
->Compare(curr
.f
->largest
.user_key(),
143 prev
.f
->largest
.user_key()) > 0);
149 if (curr
.level
!= 0 && curr
.index
< c
->num_input_files(curr
.level
) - 1) {
150 next
.f
= c
->input(curr
.level
, curr
.index
+ 1);
151 next
.level
= curr
.level
;
152 next
.index
= curr
.index
+ 1;
156 smallest_key_priority_q
.push(std::move(next
));
162 bool UniversalCompactionPicker::NeedsCompaction(
163 const VersionStorageInfo
* vstorage
) const {
164 const int kLevel0
= 0;
165 if (vstorage
->CompactionScore(kLevel0
) >= 1) {
168 if (!vstorage
->FilesMarkedForCompaction().empty()) {
174 void UniversalCompactionPicker::SortedRun::Dump(char* out_buf
,
176 bool print_path
) const {
178 assert(file
!= nullptr);
179 if (file
->fd
.GetPathId() == 0 || !print_path
) {
180 snprintf(out_buf
, out_buf_size
, "file %" PRIu64
, file
->fd
.GetNumber());
182 snprintf(out_buf
, out_buf_size
, "file %" PRIu64
185 file
->fd
.GetNumber(), file
->fd
.GetPathId());
188 snprintf(out_buf
, out_buf_size
, "level %d", level
);
192 void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
193 char* out_buf
, size_t out_buf_size
, size_t sorted_run_count
) const {
195 assert(file
!= nullptr);
196 snprintf(out_buf
, out_buf_size
,
197 "file %" PRIu64
"[%" ROCKSDB_PRIszt
199 "with size %" PRIu64
" (compensated size %" PRIu64
")",
200 file
->fd
.GetNumber(), sorted_run_count
, file
->fd
.GetFileSize(),
201 file
->compensated_file_size
);
203 snprintf(out_buf
, out_buf_size
,
204 "level %d[%" ROCKSDB_PRIszt
206 "with size %" PRIu64
" (compensated size %" PRIu64
")",
207 level
, sorted_run_count
, size
, compensated_file_size
);
211 std::vector
<UniversalCompactionPicker::SortedRun
>
212 UniversalCompactionPicker::CalculateSortedRuns(
213 const VersionStorageInfo
& vstorage
, const ImmutableCFOptions
& /*ioptions*/,
214 const MutableCFOptions
& mutable_cf_options
) {
215 std::vector
<UniversalCompactionPicker::SortedRun
> ret
;
216 for (FileMetaData
* f
: vstorage
.LevelFiles(0)) {
217 ret
.emplace_back(0, f
, f
->fd
.GetFileSize(), f
->compensated_file_size
,
220 for (int level
= 1; level
< vstorage
.num_levels(); level
++) {
221 uint64_t total_compensated_size
= 0U;
222 uint64_t total_size
= 0U;
223 bool being_compacted
= false;
224 bool is_first
= true;
225 for (FileMetaData
* f
: vstorage
.LevelFiles(level
)) {
226 total_compensated_size
+= f
->compensated_file_size
;
227 total_size
+= f
->fd
.GetFileSize();
228 if (mutable_cf_options
.compaction_options_universal
.allow_trivial_move
==
230 if (f
->being_compacted
) {
231 being_compacted
= f
->being_compacted
;
234 // Compaction always includes all files for a non-zero level, so for a
235 // non-zero level, all the files should share the same being_compacted
237 // This assumption is only valid when
238 // mutable_cf_options.compaction_options_universal.allow_trivial_move is
240 assert(is_first
|| f
->being_compacted
== being_compacted
);
243 being_compacted
= f
->being_compacted
;
247 if (total_compensated_size
> 0) {
248 ret
.emplace_back(level
, nullptr, total_size
, total_compensated_size
,
255 // Universal style of compaction. Pick files that are contiguous in
256 // time-range to compact.
257 Compaction
* UniversalCompactionPicker::PickCompaction(
258 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
259 VersionStorageInfo
* vstorage
, LogBuffer
* log_buffer
) {
260 const int kLevel0
= 0;
261 double score
= vstorage
->CompactionScore(kLevel0
);
262 std::vector
<SortedRun
> sorted_runs
=
263 CalculateSortedRuns(*vstorage
, ioptions_
, mutable_cf_options
);
265 if (sorted_runs
.size() == 0 ||
266 (vstorage
->FilesMarkedForCompaction().empty() &&
267 sorted_runs
.size() < (unsigned int)mutable_cf_options
268 .level0_file_num_compaction_trigger
)) {
269 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: nothing to do\n",
271 TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
275 VersionStorageInfo::LevelSummaryStorage tmp
;
276 ROCKS_LOG_BUFFER_MAX_SZ(
278 "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt
"): %s\n",
279 cf_name
.c_str(), sorted_runs
.size(), vstorage
->LevelSummary(&tmp
));
281 // Check for size amplification first.
282 Compaction
* c
= nullptr;
283 if (sorted_runs
.size() >=
285 mutable_cf_options
.level0_file_num_compaction_trigger
)) {
286 if ((c
= PickCompactionToReduceSizeAmp(cf_name
, mutable_cf_options
,
287 vstorage
, score
, sorted_runs
,
288 log_buffer
)) != nullptr) {
289 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: compacting for size amp\n",
292 // Size amplification is within limits. Try reducing read
293 // amplification while maintaining file size ratios.
295 mutable_cf_options
.compaction_options_universal
.size_ratio
;
297 if ((c
= PickCompactionToReduceSortedRuns(
298 cf_name
, mutable_cf_options
, vstorage
, score
, ratio
, UINT_MAX
,
299 sorted_runs
, log_buffer
)) != nullptr) {
300 ROCKS_LOG_BUFFER(log_buffer
,
301 "[%s] Universal: compacting for size ratio\n",
304 // Size amplification and file size ratios are within configured limits.
305 // If max read amplification is exceeding configured limits, then force
306 // compaction without looking at filesize ratios and try to reduce
307 // the number of files to fewer than level0_file_num_compaction_trigger.
308 // This is guaranteed by NeedsCompaction()
309 assert(sorted_runs
.size() >=
311 mutable_cf_options
.level0_file_num_compaction_trigger
));
312 // Get the total number of sorted runs that are not being compacted
313 int num_sr_not_compacted
= 0;
314 for (size_t i
= 0; i
< sorted_runs
.size(); i
++) {
315 if (sorted_runs
[i
].being_compacted
== false) {
316 num_sr_not_compacted
++;
320 // The number of sorted runs that are not being compacted is greater
321 // than the maximum allowed number of sorted runs
322 if (num_sr_not_compacted
>
323 mutable_cf_options
.level0_file_num_compaction_trigger
) {
324 unsigned int num_files
=
325 num_sr_not_compacted
-
326 mutable_cf_options
.level0_file_num_compaction_trigger
+ 1;
327 if ((c
= PickCompactionToReduceSortedRuns(
328 cf_name
, mutable_cf_options
, vstorage
, score
, UINT_MAX
,
329 num_files
, sorted_runs
, log_buffer
)) != nullptr) {
330 ROCKS_LOG_BUFFER(log_buffer
,
331 "[%s] Universal: compacting for file num -- %u\n",
332 cf_name
.c_str(), num_files
);
340 if ((c
= PickDeleteTriggeredCompaction(cf_name
, mutable_cf_options
,
341 vstorage
, score
, sorted_runs
,
342 log_buffer
)) != nullptr) {
343 ROCKS_LOG_BUFFER(log_buffer
,
344 "[%s] Universal: delete triggered compaction\n",
350 TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
355 if (mutable_cf_options
.compaction_options_universal
.allow_trivial_move
==
357 c
->set_is_trivial_move(IsInputFilesNonOverlapping(c
));
360 // validate that all the chosen files of L0 are non overlapping in time
362 SequenceNumber prev_smallest_seqno
= 0U;
363 bool is_first
= true;
365 size_t level_index
= 0U;
366 if (c
->start_level() == 0) {
367 for (auto f
: *c
->inputs(0)) {
368 assert(f
->fd
.smallest_seqno
<= f
->fd
.largest_seqno
);
372 prev_smallest_seqno
= f
->fd
.smallest_seqno
;
376 for (; level_index
< c
->num_input_levels(); level_index
++) {
377 if (c
->num_input_files(level_index
) != 0) {
378 SequenceNumber smallest_seqno
= 0U;
379 SequenceNumber largest_seqno
= 0U;
380 GetSmallestLargestSeqno(*(c
->inputs(level_index
)), &smallest_seqno
,
384 } else if (prev_smallest_seqno
> 0) {
385 // A level is considered as the bottommost level if there are
386 // no files in higher levels or if files in higher levels do
387 // not overlap with the files being compacted. Sequence numbers
388 // of files in bottommost level can be set to 0 to help
389 // compression. As a result, the following assert may not hold
390 // if the prev_smallest_seqno is 0.
391 assert(prev_smallest_seqno
> largest_seqno
);
393 prev_smallest_seqno
= smallest_seqno
;
398 MeasureTime(ioptions_
.statistics
, NUM_FILES_IN_SINGLE_COMPACTION
,
399 c
->inputs(0)->size());
401 RegisterCompaction(c
);
402 vstorage
->ComputeCompactionScore(ioptions_
, mutable_cf_options
);
404 TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
409 uint32_t UniversalCompactionPicker::GetPathId(
410 const ImmutableCFOptions
& ioptions
,
411 const MutableCFOptions
& mutable_cf_options
, uint64_t file_size
) {
412 // Two conditions need to be satisfied:
413 // (1) the target path needs to be able to hold the file's size
414 // (2) Total size left in this and previous paths need to be not
415 // smaller than expected future file size before this new file is
416 // compacted, which is estimated based on size_ratio.
417 // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
418 // we will make sure the target file, probably with size of 16, will be
419 // placed in a path so that eventually when new files are generated and
420 // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
421 // before the path we chose.
423 // TODO(sdong): now the case of multiple column families is not
424 // considered in this algorithm. So the target size can be violated in
425 // that case. We need to improve it.
426 uint64_t accumulated_size
= 0;
427 uint64_t future_size
=
429 (100 - mutable_cf_options
.compaction_options_universal
.size_ratio
) / 100;
431 assert(!ioptions
.cf_paths
.empty());
432 for (; p
< ioptions
.cf_paths
.size() - 1; p
++) {
433 uint64_t target_size
= ioptions
.cf_paths
[p
].target_size
;
434 if (target_size
> file_size
&&
435 accumulated_size
+ (target_size
- file_size
) > future_size
) {
438 accumulated_size
+= target_size
;
444 // Consider compaction files based on their size differences with
445 // the next file in time order.
447 Compaction
* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
448 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
449 VersionStorageInfo
* vstorage
, double score
, unsigned int ratio
,
450 unsigned int max_number_of_files_to_compact
,
451 const std::vector
<SortedRun
>& sorted_runs
, LogBuffer
* log_buffer
) {
452 unsigned int min_merge_width
=
453 mutable_cf_options
.compaction_options_universal
.min_merge_width
;
454 unsigned int max_merge_width
=
455 mutable_cf_options
.compaction_options_universal
.max_merge_width
;
457 const SortedRun
* sr
= nullptr;
459 size_t start_index
= 0;
460 unsigned int candidate_count
= 0;
462 unsigned int max_files_to_compact
=
463 std::min(max_merge_width
, max_number_of_files_to_compact
);
464 min_merge_width
= std::max(min_merge_width
, 2U);
466 // Caller checks the size before executing this function. This invariant is
467 // important because otherwise we may have a possible integer underflow when
468 // dealing with unsigned types.
469 assert(sorted_runs
.size() > 0);
471 // Considers a candidate file only if it is smaller than the
472 // total size accumulated so far.
473 for (size_t loop
= 0; loop
< sorted_runs
.size(); loop
++) {
476 // Skip files that are already being compacted
477 for (sr
= nullptr; loop
< sorted_runs
.size(); loop
++) {
478 sr
= &sorted_runs
[loop
];
480 if (!sr
->being_compacted
) {
484 char file_num_buf
[kFormatFileNumberBufSize
];
485 sr
->Dump(file_num_buf
, sizeof(file_num_buf
));
486 ROCKS_LOG_BUFFER(log_buffer
,
488 "[%d] being compacted, skipping",
489 cf_name
.c_str(), file_num_buf
, loop
);
494 // This file is not being compacted. Consider it as the
495 // first candidate to be compacted.
496 uint64_t candidate_size
= sr
!= nullptr ? sr
->compensated_file_size
: 0;
498 char file_num_buf
[kFormatFileNumberBufSize
];
499 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
500 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: Possible candidate %s[%d].",
501 cf_name
.c_str(), file_num_buf
, loop
);
504 // Check if the succeeding files need compaction.
505 for (size_t i
= loop
+ 1;
506 candidate_count
< max_files_to_compact
&& i
< sorted_runs
.size();
508 const SortedRun
* succeeding_sr
= &sorted_runs
[i
];
509 if (succeeding_sr
->being_compacted
) {
512 // Pick files if the total/last candidate file size (increased by the
513 // specified ratio) is still larger than the next candidate file.
514 // candidate_size is the total size of files picked so far with the
515 // default kCompactionStopStyleTotalSize; with
516 // kCompactionStopStyleSimilarSize, it's simply the size of the last
518 double sz
= candidate_size
* (100.0 + ratio
) / 100.0;
519 if (sz
< static_cast<double>(succeeding_sr
->size
)) {
522 if (mutable_cf_options
.compaction_options_universal
.stop_style
==
523 kCompactionStopStyleSimilarSize
) {
524 // Similar-size stopping rule: also check the last picked file isn't
525 // far larger than the next candidate file.
526 sz
= (succeeding_sr
->size
* (100.0 + ratio
)) / 100.0;
527 if (sz
< static_cast<double>(candidate_size
)) {
528 // If the small file we've encountered begins a run of similar-size
529 // files, we'll pick them up on a future iteration of the outer
530 // loop. If it's some lonely straggler, it'll eventually get picked
531 // by the last-resort read amp strategy which disregards size ratios.
534 candidate_size
= succeeding_sr
->compensated_file_size
;
535 } else { // default kCompactionStopStyleTotalSize
536 candidate_size
+= succeeding_sr
->compensated_file_size
;
541 // Found a series of consecutive files that need compaction.
542 if (candidate_count
>= (unsigned int)min_merge_width
) {
547 for (size_t i
= loop
;
548 i
< loop
+ candidate_count
&& i
< sorted_runs
.size(); i
++) {
549 const SortedRun
* skipping_sr
= &sorted_runs
[i
];
550 char file_num_buf
[256];
551 skipping_sr
->DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), loop
);
552 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: Skipping %s",
553 cf_name
.c_str(), file_num_buf
);
557 if (!done
|| candidate_count
<= 1) {
560 size_t first_index_after
= start_index
+ candidate_count
;
561 // Compression is enabled if files compacted earlier already reached
562 // size ratio of compression.
563 bool enable_compression
= true;
564 int ratio_to_compress
=
565 mutable_cf_options
.compaction_options_universal
.compression_size_percent
;
566 if (ratio_to_compress
>= 0) {
567 uint64_t total_size
= 0;
568 for (auto& sorted_run
: sorted_runs
) {
569 total_size
+= sorted_run
.compensated_file_size
;
572 uint64_t older_file_size
= 0;
573 for (size_t i
= sorted_runs
.size() - 1; i
>= first_index_after
; i
--) {
574 older_file_size
+= sorted_runs
[i
].size
;
575 if (older_file_size
* 100L >= total_size
* (long)ratio_to_compress
) {
576 enable_compression
= false;
582 uint64_t estimated_total_size
= 0;
583 for (unsigned int i
= 0; i
< first_index_after
; i
++) {
584 estimated_total_size
+= sorted_runs
[i
].size
;
587 GetPathId(ioptions_
, mutable_cf_options
, estimated_total_size
);
588 int start_level
= sorted_runs
[start_index
].level
;
590 if (first_index_after
== sorted_runs
.size()) {
591 output_level
= vstorage
->num_levels() - 1;
592 } else if (sorted_runs
[first_index_after
].level
== 0) {
595 output_level
= sorted_runs
[first_index_after
].level
- 1;
598 // last level is reserved for the files ingested behind
599 if (ioptions_
.allow_ingest_behind
&&
600 (output_level
== vstorage
->num_levels() - 1)) {
601 assert(output_level
> 1);
605 std::vector
<CompactionInputFiles
> inputs(vstorage
->num_levels());
606 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
607 inputs
[i
].level
= start_level
+ static_cast<int>(i
);
609 for (size_t i
= start_index
; i
< first_index_after
; i
++) {
610 auto& picking_sr
= sorted_runs
[i
];
611 if (picking_sr
.level
== 0) {
612 FileMetaData
* picking_file
= picking_sr
.file
;
613 inputs
[0].files
.push_back(picking_file
);
615 auto& files
= inputs
[picking_sr
.level
- start_level
].files
;
616 for (auto* f
: vstorage
->LevelFiles(picking_sr
.level
)) {
620 char file_num_buf
[256];
621 picking_sr
.DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), i
);
622 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: Picking %s", cf_name
.c_str(),
626 CompactionReason compaction_reason
;
627 if (max_number_of_files_to_compact
== UINT_MAX
) {
628 compaction_reason
= CompactionReason::kUniversalSizeRatio
;
630 compaction_reason
= CompactionReason::kUniversalSortedRunNum
;
632 return new Compaction(
633 vstorage
, ioptions_
, mutable_cf_options
, std::move(inputs
), output_level
,
634 MaxFileSizeForLevel(mutable_cf_options
, output_level
,
635 kCompactionStyleUniversal
),
637 GetCompressionType(ioptions_
, vstorage
, mutable_cf_options
, start_level
,
638 1, enable_compression
),
639 GetCompressionOptions(ioptions_
, vstorage
, start_level
,
641 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
642 score
, false /* deletion_compaction */, compaction_reason
);
645 // Look at overall size amplification. If size amplification
646 // exceeeds the configured value, then do a compaction
647 // of the candidate files all the way upto the earliest
648 // base file (overrides configured values of file-size ratios,
649 // min_merge_width and max_merge_width).
651 Compaction
* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
652 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
653 VersionStorageInfo
* vstorage
, double score
,
654 const std::vector
<SortedRun
>& sorted_runs
, LogBuffer
* log_buffer
) {
655 // percentage flexibility while reducing size amplification
656 uint64_t ratio
= mutable_cf_options
.compaction_options_universal
657 .max_size_amplification_percent
;
659 unsigned int candidate_count
= 0;
660 uint64_t candidate_size
= 0;
661 size_t start_index
= 0;
662 const SortedRun
* sr
= nullptr;
664 if (sorted_runs
.back().being_compacted
) {
668 // Skip files that are already being compacted
669 for (size_t loop
= 0; loop
< sorted_runs
.size() - 1; loop
++) {
670 sr
= &sorted_runs
[loop
];
671 if (!sr
->being_compacted
) {
672 start_index
= loop
; // Consider this as the first candidate.
675 char file_num_buf
[kFormatFileNumberBufSize
];
676 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
677 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: skipping %s[%d] compacted %s",
678 cf_name
.c_str(), file_num_buf
, loop
,
679 " cannot be a candidate to reduce size amp.\n");
684 return nullptr; // no candidate files
687 char file_num_buf
[kFormatFileNumberBufSize
];
688 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
691 "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt
"] %s",
692 cf_name
.c_str(), file_num_buf
, start_index
, " to reduce size amp.\n");
695 // keep adding up all the remaining files
696 for (size_t loop
= start_index
; loop
< sorted_runs
.size() - 1; loop
++) {
697 sr
= &sorted_runs
[loop
];
698 if (sr
->being_compacted
) {
699 char file_num_buf
[kFormatFileNumberBufSize
];
700 sr
->Dump(file_num_buf
, sizeof(file_num_buf
), true);
702 log_buffer
, "[%s] Universal: Possible candidate %s[%d] %s",
703 cf_name
.c_str(), file_num_buf
, start_index
,
704 " is already being compacted. No size amp reduction possible.\n");
707 candidate_size
+= sr
->compensated_file_size
;
710 if (candidate_count
== 0) {
714 // size of earliest file
715 uint64_t earliest_file_size
= sorted_runs
.back().size
;
717 // size amplification = percentage of additional size
718 if (candidate_size
* 100 < ratio
* earliest_file_size
) {
721 "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
722 " earliest-file-size %" PRIu64
,
723 cf_name
.c_str(), candidate_size
, earliest_file_size
);
728 "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
729 " earliest-file-size %" PRIu64
,
730 cf_name
.c_str(), candidate_size
, earliest_file_size
);
732 assert(start_index
< sorted_runs
.size() - 1);
734 // Estimate total file size
735 uint64_t estimated_total_size
= 0;
736 for (size_t loop
= start_index
; loop
< sorted_runs
.size(); loop
++) {
737 estimated_total_size
+= sorted_runs
[loop
].size
;
740 GetPathId(ioptions_
, mutable_cf_options
, estimated_total_size
);
741 int start_level
= sorted_runs
[start_index
].level
;
743 std::vector
<CompactionInputFiles
> inputs(vstorage
->num_levels());
744 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
745 inputs
[i
].level
= start_level
+ static_cast<int>(i
);
747 // We always compact all the files, so always compress.
748 for (size_t loop
= start_index
; loop
< sorted_runs
.size(); loop
++) {
749 auto& picking_sr
= sorted_runs
[loop
];
750 if (picking_sr
.level
== 0) {
751 FileMetaData
* f
= picking_sr
.file
;
752 inputs
[0].files
.push_back(f
);
754 auto& files
= inputs
[picking_sr
.level
- start_level
].files
;
755 for (auto* f
: vstorage
->LevelFiles(picking_sr
.level
)) {
759 char file_num_buf
[256];
760 picking_sr
.DumpSizeInfo(file_num_buf
, sizeof(file_num_buf
), loop
);
761 ROCKS_LOG_BUFFER(log_buffer
, "[%s] Universal: size amp picking %s",
762 cf_name
.c_str(), file_num_buf
);
765 // output files at the bottom most level, unless it's reserved
766 int output_level
= vstorage
->num_levels() - 1;
767 // last level is reserved for the files ingested behind
768 if (ioptions_
.allow_ingest_behind
) {
769 assert(output_level
> 1);
773 return new Compaction(
774 vstorage
, ioptions_
, mutable_cf_options
, std::move(inputs
), output_level
,
775 MaxFileSizeForLevel(mutable_cf_options
, output_level
,
776 kCompactionStyleUniversal
),
777 /* max_grandparent_overlap_bytes */ LLONG_MAX
, path_id
,
778 GetCompressionType(ioptions_
, vstorage
, mutable_cf_options
, output_level
,
780 GetCompressionOptions(ioptions_
, vstorage
, output_level
),
781 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
782 score
, false /* deletion_compaction */,
783 CompactionReason::kUniversalSizeAmplification
);
786 // Pick files marked for compaction. Typically, files are marked by
787 // CompactOnDeleteCollector due to the presence of tombstones.
788 Compaction
* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
789 const std::string
& cf_name
, const MutableCFOptions
& mutable_cf_options
,
790 VersionStorageInfo
* vstorage
, double score
,
791 const std::vector
<SortedRun
>& /*sorted_runs*/, LogBuffer
* /*log_buffer*/) {
792 CompactionInputFiles start_level_inputs
;
794 std::vector
<CompactionInputFiles
> inputs
;
796 if (vstorage
->num_levels() == 1) {
797 // This is single level universal. Since we're basically trying to reclaim
798 // space by processing files marked for compaction due to high tombstone
799 // density, let's do the same thing as compaction to reduce size amp which
800 // has the same goals.
801 bool compact
= false;
803 start_level_inputs
.level
= 0;
804 start_level_inputs
.files
.clear();
806 for (FileMetaData
* f
: vstorage
->LevelFiles(0)) {
807 if (f
->marked_for_compaction
) {
811 start_level_inputs
.files
.push_back(f
);
814 if (start_level_inputs
.size() <= 1) {
815 // If only the last file in L0 is marked for compaction, ignore it
818 inputs
.push_back(start_level_inputs
);
822 // For multi-level universal, the strategy is to make this look more like
823 // leveled. We pick one of the files marked for compaction and compact with
824 // overlapping files in the adjacent level.
825 PickFilesMarkedForCompaction(cf_name
, vstorage
, &start_level
, &output_level
,
826 &start_level_inputs
);
827 if (start_level_inputs
.empty()) {
831 // Pick the first non-empty level after the start_level
832 for (output_level
= start_level
+ 1; output_level
< vstorage
->num_levels();
834 if (vstorage
->NumLevelFiles(output_level
) != 0) {
839 // If all higher levels are empty, pick the highest level as output level
840 if (output_level
== vstorage
->num_levels()) {
841 if (start_level
== 0) {
842 output_level
= vstorage
->num_levels() - 1;
844 // If start level is non-zero and all higher levels are empty, this
845 // compaction will translate into a trivial move. Since the idea is
846 // to reclaim space and trivial move doesn't help with that, we
847 // skip compaction in this case and return nullptr
851 if (ioptions_
.allow_ingest_behind
&&
852 output_level
== vstorage
->num_levels() - 1) {
853 assert(output_level
> 1);
857 if (output_level
!= 0) {
858 if (start_level
== 0) {
859 if (!GetOverlappingL0Files(vstorage
, &start_level_inputs
, output_level
,
865 CompactionInputFiles output_level_inputs
;
866 int parent_index
= -1;
868 output_level_inputs
.level
= output_level
;
869 if (!SetupOtherInputs(cf_name
, mutable_cf_options
, vstorage
,
870 &start_level_inputs
, &output_level_inputs
,
871 &parent_index
, -1)) {
874 inputs
.push_back(start_level_inputs
);
875 if (!output_level_inputs
.empty()) {
876 inputs
.push_back(output_level_inputs
);
878 if (FilesRangeOverlapWithCompaction(inputs
, output_level
)) {
882 inputs
.push_back(start_level_inputs
);
886 uint64_t estimated_total_size
= 0;
887 // Use size of the output level as estimated file size
888 for (FileMetaData
* f
: vstorage
->LevelFiles(output_level
)) {
889 estimated_total_size
+= f
->fd
.GetFileSize();
892 GetPathId(ioptions_
, mutable_cf_options
, estimated_total_size
);
893 return new Compaction(
894 vstorage
, ioptions_
, mutable_cf_options
, std::move(inputs
), output_level
,
895 MaxFileSizeForLevel(mutable_cf_options
, output_level
,
896 kCompactionStyleUniversal
),
897 /* max_grandparent_overlap_bytes */ LLONG_MAX
, path_id
,
898 GetCompressionType(ioptions_
, vstorage
, mutable_cf_options
, output_level
,
900 GetCompressionOptions(ioptions_
, vstorage
, output_level
),
901 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
902 score
, false /* deletion_compaction */,
903 CompactionReason::kFilesMarkedForCompaction
);
905 } // namespace rocksdb
907 #endif // !ROCKSDB_LITE