1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <vector>

#include "db/column_family.h"
#include "rocksdb/compaction_filter.h"
#include "util/string_util.h"
#include "util/sync_point.h"
26 const uint64_t kRangeTombstoneSentinel
=
27 PackSequenceAndType(kMaxSequenceNumber
, kTypeRangeDeletion
);
29 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
& a
,
30 const InternalKey
& b
) {
31 auto c
= user_cmp
->Compare(a
.user_key(), b
.user_key());
35 auto a_footer
= ExtractInternalKeyFooter(a
.Encode());
36 auto b_footer
= ExtractInternalKeyFooter(b
.Encode());
37 if (a_footer
== kRangeTombstoneSentinel
) {
38 if (b_footer
!= kRangeTombstoneSentinel
) {
41 } else if (b_footer
== kRangeTombstoneSentinel
) {
47 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
* a
,
48 const InternalKey
& b
) {
52 return sstableKeyCompare(user_cmp
, *a
, b
);
55 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
& a
,
56 const InternalKey
* b
) {
60 return sstableKeyCompare(user_cmp
, a
, *b
);
63 uint64_t TotalFileSize(const std::vector
<FileMetaData
*>& files
) {
65 for (size_t i
= 0; i
< files
.size() && files
[i
]; i
++) {
66 sum
+= files
[i
]->fd
.GetFileSize();
71 void Compaction::SetInputVersion(Version
* _input_version
) {
72 input_version_
= _input_version
;
73 cfd_
= input_version_
->cfd();
76 input_version_
->Ref();
77 edit_
.SetColumnFamily(cfd_
->GetID());
80 void Compaction::GetBoundaryKeys(
81 VersionStorageInfo
* vstorage
,
82 const std::vector
<CompactionInputFiles
>& inputs
, Slice
* smallest_user_key
,
83 Slice
* largest_user_key
) {
84 bool initialized
= false;
85 const Comparator
* ucmp
= vstorage
->InternalComparator()->user_comparator();
86 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
87 if (inputs
[i
].files
.empty()) {
90 if (inputs
[i
].level
== 0) {
91 // we need to consider all files on level 0
92 for (const auto* f
: inputs
[i
].files
) {
93 const Slice
& start_user_key
= f
->smallest
.user_key();
95 ucmp
->Compare(start_user_key
, *smallest_user_key
) < 0) {
96 *smallest_user_key
= start_user_key
;
98 const Slice
& end_user_key
= f
->largest
.user_key();
100 ucmp
->Compare(end_user_key
, *largest_user_key
) > 0) {
101 *largest_user_key
= end_user_key
;
106 // we only need to consider the first and last file
107 const Slice
& start_user_key
= inputs
[i
].files
[0]->smallest
.user_key();
109 ucmp
->Compare(start_user_key
, *smallest_user_key
) < 0) {
110 *smallest_user_key
= start_user_key
;
112 const Slice
& end_user_key
= inputs
[i
].files
.back()->largest
.user_key();
113 if (!initialized
|| ucmp
->Compare(end_user_key
, *largest_user_key
) > 0) {
114 *largest_user_key
= end_user_key
;
121 std::vector
<CompactionInputFiles
> Compaction::PopulateWithAtomicBoundaries(
122 VersionStorageInfo
* vstorage
, std::vector
<CompactionInputFiles
> inputs
) {
123 const Comparator
* ucmp
= vstorage
->InternalComparator()->user_comparator();
124 for (size_t i
= 0; i
< inputs
.size(); i
++) {
125 if (inputs
[i
].level
== 0 || inputs
[i
].files
.empty()) {
128 inputs
[i
].atomic_compaction_unit_boundaries
.reserve(inputs
[i
].files
.size());
129 AtomicCompactionUnitBoundary cur_boundary
;
130 size_t first_atomic_idx
= 0;
131 auto add_unit_boundary
= [&](size_t to
) {
132 if (first_atomic_idx
== to
) return;
133 for (size_t k
= first_atomic_idx
; k
< to
; k
++) {
134 inputs
[i
].atomic_compaction_unit_boundaries
.push_back(cur_boundary
);
136 first_atomic_idx
= to
;
138 for (size_t j
= 0; j
< inputs
[i
].files
.size(); j
++) {
139 const auto* f
= inputs
[i
].files
[j
];
141 // First file in a level.
142 cur_boundary
.smallest
= &f
->smallest
;
143 cur_boundary
.largest
= &f
->largest
;
144 } else if (sstableKeyCompare(ucmp
, *cur_boundary
.largest
, f
->smallest
) ==
146 // SSTs overlap but the end key of the previous file was not
147 // artificially extended by a range tombstone. Extend the current
149 cur_boundary
.largest
= &f
->largest
;
151 // Atomic compaction unit has ended.
152 add_unit_boundary(j
);
153 cur_boundary
.smallest
= &f
->smallest
;
154 cur_boundary
.largest
= &f
->largest
;
157 add_unit_boundary(inputs
[i
].files
.size());
158 assert(inputs
[i
].files
.size() ==
159 inputs
[i
].atomic_compaction_unit_boundaries
.size());
164 // helper function to determine if compaction is creating files at the
166 bool Compaction::IsBottommostLevel(
167 int output_level
, VersionStorageInfo
* vstorage
,
168 const std::vector
<CompactionInputFiles
>& inputs
) {
170 if (output_level
== 0) {
172 for (const auto* file
: vstorage
->LevelFiles(0)) {
173 if (inputs
[0].files
.back() == file
) {
178 assert(static_cast<size_t>(output_l0_idx
) < vstorage
->LevelFiles(0).size());
182 Slice smallest_key
, largest_key
;
183 GetBoundaryKeys(vstorage
, inputs
, &smallest_key
, &largest_key
);
184 return !vstorage
->RangeMightExistAfterSortedRun(smallest_key
, largest_key
,
185 output_level
, output_l0_idx
);
188 // test function to validate the functionality of IsBottommostLevel()
189 // function -- determines if compaction with inputs and storage is bottommost
190 bool Compaction::TEST_IsBottommostLevel(
191 int output_level
, VersionStorageInfo
* vstorage
,
192 const std::vector
<CompactionInputFiles
>& inputs
) {
193 return IsBottommostLevel(output_level
, vstorage
, inputs
);
196 bool Compaction::IsFullCompaction(
197 VersionStorageInfo
* vstorage
,
198 const std::vector
<CompactionInputFiles
>& inputs
) {
199 size_t num_files_in_compaction
= 0;
200 size_t total_num_files
= 0;
201 for (int l
= 0; l
< vstorage
->num_levels(); l
++) {
202 total_num_files
+= vstorage
->NumLevelFiles(l
);
204 for (size_t i
= 0; i
< inputs
.size(); i
++) {
205 num_files_in_compaction
+= inputs
[i
].size();
207 return num_files_in_compaction
== total_num_files
;
210 Compaction::Compaction(VersionStorageInfo
* vstorage
,
211 const ImmutableCFOptions
& _immutable_cf_options
,
212 const MutableCFOptions
& _mutable_cf_options
,
213 std::vector
<CompactionInputFiles
> _inputs
,
214 int _output_level
, uint64_t _target_file_size
,
215 uint64_t _max_compaction_bytes
, uint32_t _output_path_id
,
216 CompressionType _compression
,
217 CompressionOptions _compression_opts
,
218 uint32_t _max_subcompactions
,
219 std::vector
<FileMetaData
*> _grandparents
,
220 bool _manual_compaction
, double _score
,
221 bool _deletion_compaction
,
222 CompactionReason _compaction_reason
)
223 : input_vstorage_(vstorage
),
224 start_level_(_inputs
[0].level
),
225 output_level_(_output_level
),
226 max_output_file_size_(_target_file_size
),
227 max_compaction_bytes_(_max_compaction_bytes
),
228 max_subcompactions_(_max_subcompactions
),
229 immutable_cf_options_(_immutable_cf_options
),
230 mutable_cf_options_(_mutable_cf_options
),
231 input_version_(nullptr),
232 number_levels_(vstorage
->num_levels()),
234 output_path_id_(_output_path_id
),
235 output_compression_(_compression
),
236 output_compression_opts_(_compression_opts
),
237 deletion_compaction_(_deletion_compaction
),
238 inputs_(PopulateWithAtomicBoundaries(vstorage
, std::move(_inputs
))),
239 grandparents_(std::move(_grandparents
)),
241 bottommost_level_(IsBottommostLevel(output_level_
, vstorage
, inputs_
)),
242 is_full_compaction_(IsFullCompaction(vstorage
, inputs_
)),
243 is_manual_compaction_(_manual_compaction
),
244 is_trivial_move_(false),
245 compaction_reason_(_compaction_reason
) {
246 MarkFilesBeingCompacted(true);
247 if (is_manual_compaction_
) {
248 compaction_reason_
= CompactionReason::kManualCompaction
;
250 if (max_subcompactions_
== 0) {
251 max_subcompactions_
= immutable_cf_options_
.max_subcompactions
;
253 if (!bottommost_level_
) {
254 // Currently we only enable dictionary compression during compaction to the
256 output_compression_opts_
.max_dict_bytes
= 0;
257 output_compression_opts_
.zstd_max_train_bytes
= 0;
261 for (size_t i
= 1; i
< inputs_
.size(); ++i
) {
262 assert(inputs_
[i
].level
> inputs_
[i
- 1].level
);
266 // setup input_levels_
268 input_levels_
.resize(num_input_levels());
269 for (size_t which
= 0; which
< num_input_levels(); which
++) {
270 DoGenerateLevelFilesBrief(&input_levels_
[which
], inputs_
[which
].files
,
275 GetBoundaryKeys(vstorage
, inputs_
, &smallest_user_key_
, &largest_user_key_
);
278 Compaction::~Compaction() {
279 if (input_version_
!= nullptr) {
280 input_version_
->Unref();
282 if (cfd_
!= nullptr) {
289 bool Compaction::InputCompressionMatchesOutput() const {
290 int base_level
= input_vstorage_
->base_level();
291 bool matches
= (GetCompressionType(immutable_cf_options_
, input_vstorage_
,
292 mutable_cf_options_
, start_level_
,
293 base_level
) == output_compression_
);
295 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
298 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
302 bool Compaction::IsTrivialMove() const {
303 // Avoid a move if there is lots of overlapping grandparent data.
304 // Otherwise, the move could create a parent file that will require
305 // a very expensive merge later on.
306 // If start_level_== output_level_, the purpose is to force compaction
307 // filter to be applied to that level, and thus cannot be a trivial move.
309 // Check if start level have files with overlapping ranges
310 if (start_level_
== 0 && input_vstorage_
->level0_non_overlapping() == false) {
311 // We cannot move files from L0 to L1 if the files are overlapping
315 if (is_manual_compaction_
&&
316 (immutable_cf_options_
.compaction_filter
!= nullptr ||
317 immutable_cf_options_
.compaction_filter_factory
!= nullptr)) {
318 // This is a manual compaction and we have a compaction filter that should
319 // be executed, we cannot do a trivial move
323 // Used in universal compaction, where trivial move can be done if the
324 // input files are non overlapping
325 if ((mutable_cf_options_
.compaction_options_universal
.allow_trivial_move
) &&
326 (output_level_
!= 0)) {
327 return is_trivial_move_
;
330 if (!(start_level_
!= output_level_
&& num_input_levels() == 1 &&
331 input(0, 0)->fd
.GetPathId() == output_path_id() &&
332 InputCompressionMatchesOutput())) {
336 // assert inputs_.size() == 1
338 for (const auto& file
: inputs_
.front().files
) {
339 std::vector
<FileMetaData
*> file_grand_parents
;
340 if (output_level_
+ 1 >= number_levels_
) {
343 input_vstorage_
->GetOverlappingInputs(output_level_
+ 1, &file
->smallest
,
344 &file
->largest
, &file_grand_parents
);
345 const auto compaction_size
=
346 file
->fd
.GetFileSize() + TotalFileSize(file_grand_parents
);
347 if (compaction_size
> max_compaction_bytes_
) {
355 void Compaction::AddInputDeletions(VersionEdit
* out_edit
) {
356 for (size_t which
= 0; which
< num_input_levels(); which
++) {
357 for (size_t i
= 0; i
< inputs_
[which
].size(); i
++) {
358 out_edit
->DeleteFile(level(which
), inputs_
[which
][i
]->fd
.GetNumber());
363 bool Compaction::KeyNotExistsBeyondOutputLevel(
364 const Slice
& user_key
, std::vector
<size_t>* level_ptrs
) const {
365 assert(input_version_
!= nullptr);
366 assert(level_ptrs
!= nullptr);
367 assert(level_ptrs
->size() == static_cast<size_t>(number_levels_
));
368 if (bottommost_level_
) {
370 } else if (output_level_
!= 0 &&
371 cfd_
->ioptions()->compaction_style
== kCompactionStyleLevel
) {
372 // Maybe use binary search to find right entry instead of linear search?
373 const Comparator
* user_cmp
= cfd_
->user_comparator();
374 for (int lvl
= output_level_
+ 1; lvl
< number_levels_
; lvl
++) {
375 const std::vector
<FileMetaData
*>& files
=
376 input_vstorage_
->LevelFiles(lvl
);
377 for (; level_ptrs
->at(lvl
) < files
.size(); level_ptrs
->at(lvl
)++) {
378 auto* f
= files
[level_ptrs
->at(lvl
)];
379 if (user_cmp
->Compare(user_key
, f
->largest
.user_key()) <= 0) {
380 // We've advanced far enough
381 if (user_cmp
->Compare(user_key
, f
->smallest
.user_key()) >= 0) {
382 // Key falls in this file's range, so it may
383 // exist beyond output level
395 // Mark (or clear) each file that is being compacted
396 void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted
) {
397 for (size_t i
= 0; i
< num_input_levels(); i
++) {
398 for (size_t j
= 0; j
< inputs_
[i
].size(); j
++) {
399 assert(mark_as_compacted
? !inputs_
[i
][j
]->being_compacted
400 : inputs_
[i
][j
]->being_compacted
);
401 inputs_
[i
][j
]->being_compacted
= mark_as_compacted
;
407 // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
408 // print: "3@0 + 2@3 + 1@4 files to L5"
409 const char* Compaction::InputLevelSummary(
410 InputLevelSummaryBuffer
* scratch
) const {
412 bool is_first
= true;
413 for (auto& input_level
: inputs_
) {
414 if (input_level
.empty()) {
419 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, " + ");
420 len
= std::min(len
, static_cast<int>(sizeof(scratch
->buffer
)));
424 len
+= snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
425 "%" ROCKSDB_PRIszt
"@%d", input_level
.size(),
427 len
= std::min(len
, static_cast<int>(sizeof(scratch
->buffer
)));
429 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
430 " files to L%d", output_level());
432 return scratch
->buffer
;
435 uint64_t Compaction::CalculateTotalInputSize() const {
437 for (auto& input_level
: inputs_
) {
438 for (auto f
: input_level
.files
) {
439 size
+= f
->fd
.GetFileSize();
445 void Compaction::ReleaseCompactionFiles(Status status
) {
446 MarkFilesBeingCompacted(false);
447 cfd_
->compaction_picker()->ReleaseCompactionFiles(this, status
);
450 void Compaction::ResetNextCompactionIndex() {
451 assert(input_version_
!= nullptr);
452 input_vstorage_
->ResetNextCompactionIndex(start_level_
);
456 int InputSummary(const std::vector
<FileMetaData
*>& files
, char* output
,
460 for (size_t i
= 0; i
< files
.size(); i
++) {
461 int sz
= len
- write
;
464 AppendHumanBytes(files
.at(i
)->fd
.GetFileSize(), sztxt
, 16);
465 ret
= snprintf(output
+ write
, sz
, "%" PRIu64
"(%s) ",
466 files
.at(i
)->fd
.GetNumber(), sztxt
);
467 if (ret
< 0 || ret
>= sz
) break;
470 // if files.size() is non-zero, overwrite the last space
471 return write
- !!files
.size();
475 void Compaction::Summary(char* output
, int len
) {
477 snprintf(output
, len
, "Base version %" PRIu64
" Base level %d, inputs: [",
478 input_version_
->GetVersionNumber(), start_level_
);
479 if (write
< 0 || write
>= len
) {
483 for (size_t level_iter
= 0; level_iter
< num_input_levels(); ++level_iter
) {
484 if (level_iter
> 0) {
485 write
+= snprintf(output
+ write
, len
- write
, "], [");
486 if (write
< 0 || write
>= len
) {
491 InputSummary(inputs_
[level_iter
].files
, output
+ write
, len
- write
);
492 if (write
< 0 || write
>= len
) {
497 snprintf(output
+ write
, len
- write
, "]");
500 uint64_t Compaction::OutputFilePreallocationSize() const {
501 uint64_t preallocation_size
= 0;
503 for (const auto& level_files
: inputs_
) {
504 for (const auto& file
: level_files
.files
) {
505 preallocation_size
+= file
->fd
.GetFileSize();
509 if (max_output_file_size_
!= port::kMaxUint64
&&
510 (immutable_cf_options_
.compaction_style
== kCompactionStyleLevel
||
511 output_level() > 0)) {
512 preallocation_size
= std::min(max_output_file_size_
, preallocation_size
);
515 // Over-estimate slightly so we don't end up just barely crossing
517 // No point to prellocate more than 1GB.
518 return std::min(uint64_t{1073741824},
519 preallocation_size
+ (preallocation_size
/ 10));
522 std::unique_ptr
<CompactionFilter
> Compaction::CreateCompactionFilter() const {
523 if (!cfd_
->ioptions()->compaction_filter_factory
) {
527 CompactionFilter::Context context
;
528 context
.is_full_compaction
= is_full_compaction_
;
529 context
.is_manual_compaction
= is_manual_compaction_
;
530 context
.column_family_id
= cfd_
->GetID();
531 return cfd_
->ioptions()->compaction_filter_factory
->CreateCompactionFilter(
535 bool Compaction::IsOutputLevelEmpty() const {
536 return inputs_
.back().level
!= output_level_
|| inputs_
.back().empty();
539 bool Compaction::ShouldFormSubcompactions() const {
540 if (max_subcompactions_
<= 1 || cfd_
== nullptr) {
543 if (cfd_
->ioptions()->compaction_style
== kCompactionStyleLevel
) {
544 return (start_level_
== 0 || is_manual_compaction_
) && output_level_
> 0 &&
545 !IsOutputLevelEmpty();
546 } else if (cfd_
->ioptions()->compaction_style
== kCompactionStyleUniversal
) {
547 return number_levels_
> 1 && output_level_
> 0;
553 uint64_t Compaction::MaxInputFileCreationTime() const {
554 uint64_t max_creation_time
= 0;
555 for (const auto& file
: inputs_
[0].files
) {
556 if (file
->fd
.table_reader
!= nullptr &&
557 file
->fd
.table_reader
->GetTableProperties() != nullptr) {
558 uint64_t creation_time
=
559 file
->fd
.table_reader
->GetTableProperties()->creation_time
;
560 max_creation_time
= std::max(max_creation_time
, creation_time
);
563 return max_creation_time
;
566 int Compaction::GetInputBaseLevel() const {
567 return input_vstorage_
->base_level();
570 } // namespace rocksdb