// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <cinttypes>
#include <vector>

#include "db/column_family.h"
#include "db/compaction/compaction.h"
#include "rocksdb/compaction_filter.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"
19 namespace ROCKSDB_NAMESPACE
{
21 const uint64_t kRangeTombstoneSentinel
=
22 PackSequenceAndType(kMaxSequenceNumber
, kTypeRangeDeletion
);
24 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
& a
,
25 const InternalKey
& b
) {
26 auto c
= user_cmp
->Compare(a
.user_key(), b
.user_key());
30 auto a_footer
= ExtractInternalKeyFooter(a
.Encode());
31 auto b_footer
= ExtractInternalKeyFooter(b
.Encode());
32 if (a_footer
== kRangeTombstoneSentinel
) {
33 if (b_footer
!= kRangeTombstoneSentinel
) {
36 } else if (b_footer
== kRangeTombstoneSentinel
) {
42 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
* a
,
43 const InternalKey
& b
) {
47 return sstableKeyCompare(user_cmp
, *a
, b
);
50 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
& a
,
51 const InternalKey
* b
) {
55 return sstableKeyCompare(user_cmp
, a
, *b
);
58 uint64_t TotalFileSize(const std::vector
<FileMetaData
*>& files
) {
60 for (size_t i
= 0; i
< files
.size() && files
[i
]; i
++) {
61 sum
+= files
[i
]->fd
.GetFileSize();
66 void Compaction::SetInputVersion(Version
* _input_version
) {
67 input_version_
= _input_version
;
68 cfd_
= input_version_
->cfd();
71 input_version_
->Ref();
72 edit_
.SetColumnFamily(cfd_
->GetID());
75 void Compaction::GetBoundaryKeys(
76 VersionStorageInfo
* vstorage
,
77 const std::vector
<CompactionInputFiles
>& inputs
, Slice
* smallest_user_key
,
78 Slice
* largest_user_key
) {
79 bool initialized
= false;
80 const Comparator
* ucmp
= vstorage
->InternalComparator()->user_comparator();
81 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
82 if (inputs
[i
].files
.empty()) {
85 if (inputs
[i
].level
== 0) {
86 // we need to consider all files on level 0
87 for (const auto* f
: inputs
[i
].files
) {
88 const Slice
& start_user_key
= f
->smallest
.user_key();
90 ucmp
->Compare(start_user_key
, *smallest_user_key
) < 0) {
91 *smallest_user_key
= start_user_key
;
93 const Slice
& end_user_key
= f
->largest
.user_key();
95 ucmp
->Compare(end_user_key
, *largest_user_key
) > 0) {
96 *largest_user_key
= end_user_key
;
101 // we only need to consider the first and last file
102 const Slice
& start_user_key
= inputs
[i
].files
[0]->smallest
.user_key();
104 ucmp
->Compare(start_user_key
, *smallest_user_key
) < 0) {
105 *smallest_user_key
= start_user_key
;
107 const Slice
& end_user_key
= inputs
[i
].files
.back()->largest
.user_key();
108 if (!initialized
|| ucmp
->Compare(end_user_key
, *largest_user_key
) > 0) {
109 *largest_user_key
= end_user_key
;
116 std::vector
<CompactionInputFiles
> Compaction::PopulateWithAtomicBoundaries(
117 VersionStorageInfo
* vstorage
, std::vector
<CompactionInputFiles
> inputs
) {
118 const Comparator
* ucmp
= vstorage
->InternalComparator()->user_comparator();
119 for (size_t i
= 0; i
< inputs
.size(); i
++) {
120 if (inputs
[i
].level
== 0 || inputs
[i
].files
.empty()) {
123 inputs
[i
].atomic_compaction_unit_boundaries
.reserve(inputs
[i
].files
.size());
124 AtomicCompactionUnitBoundary cur_boundary
;
125 size_t first_atomic_idx
= 0;
126 auto add_unit_boundary
= [&](size_t to
) {
127 if (first_atomic_idx
== to
) return;
128 for (size_t k
= first_atomic_idx
; k
< to
; k
++) {
129 inputs
[i
].atomic_compaction_unit_boundaries
.push_back(cur_boundary
);
131 first_atomic_idx
= to
;
133 for (size_t j
= 0; j
< inputs
[i
].files
.size(); j
++) {
134 const auto* f
= inputs
[i
].files
[j
];
136 // First file in a level.
137 cur_boundary
.smallest
= &f
->smallest
;
138 cur_boundary
.largest
= &f
->largest
;
139 } else if (sstableKeyCompare(ucmp
, *cur_boundary
.largest
, f
->smallest
) ==
141 // SSTs overlap but the end key of the previous file was not
142 // artificially extended by a range tombstone. Extend the current
144 cur_boundary
.largest
= &f
->largest
;
146 // Atomic compaction unit has ended.
147 add_unit_boundary(j
);
148 cur_boundary
.smallest
= &f
->smallest
;
149 cur_boundary
.largest
= &f
->largest
;
152 add_unit_boundary(inputs
[i
].files
.size());
153 assert(inputs
[i
].files
.size() ==
154 inputs
[i
].atomic_compaction_unit_boundaries
.size());
159 // helper function to determine if compaction is creating files at the
161 bool Compaction::IsBottommostLevel(
162 int output_level
, VersionStorageInfo
* vstorage
,
163 const std::vector
<CompactionInputFiles
>& inputs
) {
165 if (output_level
== 0) {
167 for (const auto* file
: vstorage
->LevelFiles(0)) {
168 if (inputs
[0].files
.back() == file
) {
173 assert(static_cast<size_t>(output_l0_idx
) < vstorage
->LevelFiles(0).size());
177 Slice smallest_key
, largest_key
;
178 GetBoundaryKeys(vstorage
, inputs
, &smallest_key
, &largest_key
);
179 return !vstorage
->RangeMightExistAfterSortedRun(smallest_key
, largest_key
,
180 output_level
, output_l0_idx
);
183 // test function to validate the functionality of IsBottommostLevel()
184 // function -- determines if compaction with inputs and storage is bottommost
185 bool Compaction::TEST_IsBottommostLevel(
186 int output_level
, VersionStorageInfo
* vstorage
,
187 const std::vector
<CompactionInputFiles
>& inputs
) {
188 return IsBottommostLevel(output_level
, vstorage
, inputs
);
191 bool Compaction::IsFullCompaction(
192 VersionStorageInfo
* vstorage
,
193 const std::vector
<CompactionInputFiles
>& inputs
) {
194 size_t num_files_in_compaction
= 0;
195 size_t total_num_files
= 0;
196 for (int l
= 0; l
< vstorage
->num_levels(); l
++) {
197 total_num_files
+= vstorage
->NumLevelFiles(l
);
199 for (size_t i
= 0; i
< inputs
.size(); i
++) {
200 num_files_in_compaction
+= inputs
[i
].size();
202 return num_files_in_compaction
== total_num_files
;
205 Compaction::Compaction(VersionStorageInfo
* vstorage
,
206 const ImmutableCFOptions
& _immutable_cf_options
,
207 const MutableCFOptions
& _mutable_cf_options
,
208 std::vector
<CompactionInputFiles
> _inputs
,
209 int _output_level
, uint64_t _target_file_size
,
210 uint64_t _max_compaction_bytes
, uint32_t _output_path_id
,
211 CompressionType _compression
,
212 CompressionOptions _compression_opts
,
213 uint32_t _max_subcompactions
,
214 std::vector
<FileMetaData
*> _grandparents
,
215 bool _manual_compaction
, double _score
,
216 bool _deletion_compaction
,
217 CompactionReason _compaction_reason
)
218 : input_vstorage_(vstorage
),
219 start_level_(_inputs
[0].level
),
220 output_level_(_output_level
),
221 max_output_file_size_(_target_file_size
),
222 max_compaction_bytes_(_max_compaction_bytes
),
223 max_subcompactions_(_max_subcompactions
),
224 immutable_cf_options_(_immutable_cf_options
),
225 mutable_cf_options_(_mutable_cf_options
),
226 input_version_(nullptr),
227 number_levels_(vstorage
->num_levels()),
229 output_path_id_(_output_path_id
),
230 output_compression_(_compression
),
231 output_compression_opts_(_compression_opts
),
232 deletion_compaction_(_deletion_compaction
),
233 inputs_(PopulateWithAtomicBoundaries(vstorage
, std::move(_inputs
))),
234 grandparents_(std::move(_grandparents
)),
236 bottommost_level_(IsBottommostLevel(output_level_
, vstorage
, inputs_
)),
237 is_full_compaction_(IsFullCompaction(vstorage
, inputs_
)),
238 is_manual_compaction_(_manual_compaction
),
239 is_trivial_move_(false),
240 compaction_reason_(_compaction_reason
) {
241 MarkFilesBeingCompacted(true);
242 if (is_manual_compaction_
) {
243 compaction_reason_
= CompactionReason::kManualCompaction
;
245 if (max_subcompactions_
== 0) {
246 max_subcompactions_
= immutable_cf_options_
.max_subcompactions
;
248 if (!bottommost_level_
) {
249 // Currently we only enable dictionary compression during compaction to the
251 output_compression_opts_
.max_dict_bytes
= 0;
252 output_compression_opts_
.zstd_max_train_bytes
= 0;
256 for (size_t i
= 1; i
< inputs_
.size(); ++i
) {
257 assert(inputs_
[i
].level
> inputs_
[i
- 1].level
);
261 // setup input_levels_
263 input_levels_
.resize(num_input_levels());
264 for (size_t which
= 0; which
< num_input_levels(); which
++) {
265 DoGenerateLevelFilesBrief(&input_levels_
[which
], inputs_
[which
].files
,
270 GetBoundaryKeys(vstorage
, inputs_
, &smallest_user_key_
, &largest_user_key_
);
273 Compaction::~Compaction() {
274 if (input_version_
!= nullptr) {
275 input_version_
->Unref();
277 if (cfd_
!= nullptr) {
278 cfd_
->UnrefAndTryDelete();
282 bool Compaction::InputCompressionMatchesOutput() const {
283 int base_level
= input_vstorage_
->base_level();
284 bool matches
= (GetCompressionType(immutable_cf_options_
, input_vstorage_
,
285 mutable_cf_options_
, start_level_
,
286 base_level
) == output_compression_
);
288 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
291 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
295 bool Compaction::IsTrivialMove() const {
296 // Avoid a move if there is lots of overlapping grandparent data.
297 // Otherwise, the move could create a parent file that will require
298 // a very expensive merge later on.
299 // If start_level_== output_level_, the purpose is to force compaction
300 // filter to be applied to that level, and thus cannot be a trivial move.
302 // Check if start level have files with overlapping ranges
303 if (start_level_
== 0 && input_vstorage_
->level0_non_overlapping() == false) {
304 // We cannot move files from L0 to L1 if the files are overlapping
308 if (is_manual_compaction_
&&
309 (immutable_cf_options_
.compaction_filter
!= nullptr ||
310 immutable_cf_options_
.compaction_filter_factory
!= nullptr)) {
311 // This is a manual compaction and we have a compaction filter that should
312 // be executed, we cannot do a trivial move
316 // Used in universal compaction, where trivial move can be done if the
317 // input files are non overlapping
318 if ((mutable_cf_options_
.compaction_options_universal
.allow_trivial_move
) &&
319 (output_level_
!= 0)) {
320 return is_trivial_move_
;
323 if (!(start_level_
!= output_level_
&& num_input_levels() == 1 &&
324 input(0, 0)->fd
.GetPathId() == output_path_id() &&
325 InputCompressionMatchesOutput())) {
329 // assert inputs_.size() == 1
331 for (const auto& file
: inputs_
.front().files
) {
332 std::vector
<FileMetaData
*> file_grand_parents
;
333 if (output_level_
+ 1 >= number_levels_
) {
336 input_vstorage_
->GetOverlappingInputs(output_level_
+ 1, &file
->smallest
,
337 &file
->largest
, &file_grand_parents
);
338 const auto compaction_size
=
339 file
->fd
.GetFileSize() + TotalFileSize(file_grand_parents
);
340 if (compaction_size
> max_compaction_bytes_
) {
348 void Compaction::AddInputDeletions(VersionEdit
* out_edit
) {
349 for (size_t which
= 0; which
< num_input_levels(); which
++) {
350 for (size_t i
= 0; i
< inputs_
[which
].size(); i
++) {
351 out_edit
->DeleteFile(level(which
), inputs_
[which
][i
]->fd
.GetNumber());
356 bool Compaction::KeyNotExistsBeyondOutputLevel(
357 const Slice
& user_key
, std::vector
<size_t>* level_ptrs
) const {
358 assert(input_version_
!= nullptr);
359 assert(level_ptrs
!= nullptr);
360 assert(level_ptrs
->size() == static_cast<size_t>(number_levels_
));
361 if (bottommost_level_
) {
363 } else if (output_level_
!= 0 &&
364 cfd_
->ioptions()->compaction_style
== kCompactionStyleLevel
) {
365 // Maybe use binary search to find right entry instead of linear search?
366 const Comparator
* user_cmp
= cfd_
->user_comparator();
367 for (int lvl
= output_level_
+ 1; lvl
< number_levels_
; lvl
++) {
368 const std::vector
<FileMetaData
*>& files
=
369 input_vstorage_
->LevelFiles(lvl
);
370 for (; level_ptrs
->at(lvl
) < files
.size(); level_ptrs
->at(lvl
)++) {
371 auto* f
= files
[level_ptrs
->at(lvl
)];
372 if (user_cmp
->Compare(user_key
, f
->largest
.user_key()) <= 0) {
373 // We've advanced far enough
374 if (user_cmp
->Compare(user_key
, f
->smallest
.user_key()) >= 0) {
375 // Key falls in this file's range, so it may
376 // exist beyond output level
388 // Mark (or clear) each file that is being compacted
389 void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted
) {
390 for (size_t i
= 0; i
< num_input_levels(); i
++) {
391 for (size_t j
= 0; j
< inputs_
[i
].size(); j
++) {
392 assert(mark_as_compacted
? !inputs_
[i
][j
]->being_compacted
393 : inputs_
[i
][j
]->being_compacted
);
394 inputs_
[i
][j
]->being_compacted
= mark_as_compacted
;
400 // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
401 // print: "3@0 + 2@3 + 1@4 files to L5"
402 const char* Compaction::InputLevelSummary(
403 InputLevelSummaryBuffer
* scratch
) const {
405 bool is_first
= true;
406 for (auto& input_level
: inputs_
) {
407 if (input_level
.empty()) {
412 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, " + ");
413 len
= std::min(len
, static_cast<int>(sizeof(scratch
->buffer
)));
417 len
+= snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
418 "%" ROCKSDB_PRIszt
"@%d", input_level
.size(),
420 len
= std::min(len
, static_cast<int>(sizeof(scratch
->buffer
)));
422 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
423 " files to L%d", output_level());
425 return scratch
->buffer
;
428 uint64_t Compaction::CalculateTotalInputSize() const {
430 for (auto& input_level
: inputs_
) {
431 for (auto f
: input_level
.files
) {
432 size
+= f
->fd
.GetFileSize();
438 void Compaction::ReleaseCompactionFiles(Status status
) {
439 MarkFilesBeingCompacted(false);
440 cfd_
->compaction_picker()->ReleaseCompactionFiles(this, status
);
443 void Compaction::ResetNextCompactionIndex() {
444 assert(input_version_
!= nullptr);
445 input_vstorage_
->ResetNextCompactionIndex(start_level_
);
449 int InputSummary(const std::vector
<FileMetaData
*>& files
, char* output
,
453 for (size_t i
= 0; i
< files
.size(); i
++) {
454 int sz
= len
- write
;
457 AppendHumanBytes(files
.at(i
)->fd
.GetFileSize(), sztxt
, 16);
458 ret
= snprintf(output
+ write
, sz
, "%" PRIu64
"(%s) ",
459 files
.at(i
)->fd
.GetNumber(), sztxt
);
460 if (ret
< 0 || ret
>= sz
) break;
463 // if files.size() is non-zero, overwrite the last space
464 return write
- !!files
.size();
468 void Compaction::Summary(char* output
, int len
) {
470 snprintf(output
, len
, "Base version %" PRIu64
" Base level %d, inputs: [",
471 input_version_
->GetVersionNumber(), start_level_
);
472 if (write
< 0 || write
>= len
) {
476 for (size_t level_iter
= 0; level_iter
< num_input_levels(); ++level_iter
) {
477 if (level_iter
> 0) {
478 write
+= snprintf(output
+ write
, len
- write
, "], [");
479 if (write
< 0 || write
>= len
) {
484 InputSummary(inputs_
[level_iter
].files
, output
+ write
, len
- write
);
485 if (write
< 0 || write
>= len
) {
490 snprintf(output
+ write
, len
- write
, "]");
493 uint64_t Compaction::OutputFilePreallocationSize() const {
494 uint64_t preallocation_size
= 0;
496 for (const auto& level_files
: inputs_
) {
497 for (const auto& file
: level_files
.files
) {
498 preallocation_size
+= file
->fd
.GetFileSize();
502 if (max_output_file_size_
!= port::kMaxUint64
&&
503 (immutable_cf_options_
.compaction_style
== kCompactionStyleLevel
||
504 output_level() > 0)) {
505 preallocation_size
= std::min(max_output_file_size_
, preallocation_size
);
508 // Over-estimate slightly so we don't end up just barely crossing
510 // No point to prellocate more than 1GB.
511 return std::min(uint64_t{1073741824},
512 preallocation_size
+ (preallocation_size
/ 10));
515 std::unique_ptr
<CompactionFilter
> Compaction::CreateCompactionFilter() const {
516 if (!cfd_
->ioptions()->compaction_filter_factory
) {
520 CompactionFilter::Context context
;
521 context
.is_full_compaction
= is_full_compaction_
;
522 context
.is_manual_compaction
= is_manual_compaction_
;
523 context
.column_family_id
= cfd_
->GetID();
524 return cfd_
->ioptions()->compaction_filter_factory
->CreateCompactionFilter(
528 bool Compaction::IsOutputLevelEmpty() const {
529 return inputs_
.back().level
!= output_level_
|| inputs_
.back().empty();
532 bool Compaction::ShouldFormSubcompactions() const {
533 if (max_subcompactions_
<= 1 || cfd_
== nullptr) {
536 if (cfd_
->ioptions()->compaction_style
== kCompactionStyleLevel
) {
537 return (start_level_
== 0 || is_manual_compaction_
) && output_level_
> 0 &&
538 !IsOutputLevelEmpty();
539 } else if (cfd_
->ioptions()->compaction_style
== kCompactionStyleUniversal
) {
540 return number_levels_
> 1 && output_level_
> 0;
546 uint64_t Compaction::MinInputFileOldestAncesterTime() const {
547 uint64_t min_oldest_ancester_time
= port::kMaxUint64
;
548 for (const auto& level_files
: inputs_
) {
549 for (const auto& file
: level_files
.files
) {
550 uint64_t oldest_ancester_time
= file
->TryGetOldestAncesterTime();
551 if (oldest_ancester_time
!= 0) {
552 min_oldest_ancester_time
=
553 std::min(min_oldest_ancester_time
, oldest_ancester_time
);
557 return min_oldest_ancester_time
;
560 int Compaction::GetInputBaseLevel() const {
561 return input_vstorage_
->base_level();
564 } // namespace ROCKSDB_NAMESPACE