1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/compaction/compaction.h"
15 #include "db/column_family.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/sst_partitioner.h"
18 #include "test_util/sync_point.h"
19 #include "util/string_util.h"
21 namespace ROCKSDB_NAMESPACE
{
23 const uint64_t kRangeTombstoneSentinel
=
24 PackSequenceAndType(kMaxSequenceNumber
, kTypeRangeDeletion
);
26 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
& a
,
27 const InternalKey
& b
) {
28 auto c
= user_cmp
->CompareWithoutTimestamp(a
.user_key(), b
.user_key());
32 auto a_footer
= ExtractInternalKeyFooter(a
.Encode());
33 auto b_footer
= ExtractInternalKeyFooter(b
.Encode());
34 if (a_footer
== kRangeTombstoneSentinel
) {
35 if (b_footer
!= kRangeTombstoneSentinel
) {
38 } else if (b_footer
== kRangeTombstoneSentinel
) {
44 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
* a
,
45 const InternalKey
& b
) {
49 return sstableKeyCompare(user_cmp
, *a
, b
);
52 int sstableKeyCompare(const Comparator
* user_cmp
, const InternalKey
& a
,
53 const InternalKey
* b
) {
57 return sstableKeyCompare(user_cmp
, a
, *b
);
60 uint64_t TotalFileSize(const std::vector
<FileMetaData
*>& files
) {
62 for (size_t i
= 0; i
< files
.size() && files
[i
]; i
++) {
63 sum
+= files
[i
]->fd
.GetFileSize();
68 void Compaction::SetInputVersion(Version
* _input_version
) {
69 input_version_
= _input_version
;
70 cfd_
= input_version_
->cfd();
73 input_version_
->Ref();
74 edit_
.SetColumnFamily(cfd_
->GetID());
77 void Compaction::GetBoundaryKeys(
78 VersionStorageInfo
* vstorage
,
79 const std::vector
<CompactionInputFiles
>& inputs
, Slice
* smallest_user_key
,
80 Slice
* largest_user_key
) {
81 bool initialized
= false;
82 const Comparator
* ucmp
= vstorage
->InternalComparator()->user_comparator();
83 for (size_t i
= 0; i
< inputs
.size(); ++i
) {
84 if (inputs
[i
].files
.empty()) {
87 if (inputs
[i
].level
== 0) {
88 // we need to consider all files on level 0
89 for (const auto* f
: inputs
[i
].files
) {
90 const Slice
& start_user_key
= f
->smallest
.user_key();
92 ucmp
->Compare(start_user_key
, *smallest_user_key
) < 0) {
93 *smallest_user_key
= start_user_key
;
95 const Slice
& end_user_key
= f
->largest
.user_key();
97 ucmp
->Compare(end_user_key
, *largest_user_key
) > 0) {
98 *largest_user_key
= end_user_key
;
103 // we only need to consider the first and last file
104 const Slice
& start_user_key
= inputs
[i
].files
[0]->smallest
.user_key();
106 ucmp
->Compare(start_user_key
, *smallest_user_key
) < 0) {
107 *smallest_user_key
= start_user_key
;
109 const Slice
& end_user_key
= inputs
[i
].files
.back()->largest
.user_key();
110 if (!initialized
|| ucmp
->Compare(end_user_key
, *largest_user_key
) > 0) {
111 *largest_user_key
= end_user_key
;
118 std::vector
<CompactionInputFiles
> Compaction::PopulateWithAtomicBoundaries(
119 VersionStorageInfo
* vstorage
, std::vector
<CompactionInputFiles
> inputs
) {
120 const Comparator
* ucmp
= vstorage
->InternalComparator()->user_comparator();
121 for (size_t i
= 0; i
< inputs
.size(); i
++) {
122 if (inputs
[i
].level
== 0 || inputs
[i
].files
.empty()) {
125 inputs
[i
].atomic_compaction_unit_boundaries
.reserve(inputs
[i
].files
.size());
126 AtomicCompactionUnitBoundary cur_boundary
;
127 size_t first_atomic_idx
= 0;
128 auto add_unit_boundary
= [&](size_t to
) {
129 if (first_atomic_idx
== to
) return;
130 for (size_t k
= first_atomic_idx
; k
< to
; k
++) {
131 inputs
[i
].atomic_compaction_unit_boundaries
.push_back(cur_boundary
);
133 first_atomic_idx
= to
;
135 for (size_t j
= 0; j
< inputs
[i
].files
.size(); j
++) {
136 const auto* f
= inputs
[i
].files
[j
];
138 // First file in a level.
139 cur_boundary
.smallest
= &f
->smallest
;
140 cur_boundary
.largest
= &f
->largest
;
141 } else if (sstableKeyCompare(ucmp
, *cur_boundary
.largest
, f
->smallest
) ==
143 // SSTs overlap but the end key of the previous file was not
144 // artificially extended by a range tombstone. Extend the current
146 cur_boundary
.largest
= &f
->largest
;
148 // Atomic compaction unit has ended.
149 add_unit_boundary(j
);
150 cur_boundary
.smallest
= &f
->smallest
;
151 cur_boundary
.largest
= &f
->largest
;
154 add_unit_boundary(inputs
[i
].files
.size());
155 assert(inputs
[i
].files
.size() ==
156 inputs
[i
].atomic_compaction_unit_boundaries
.size());
// Decides whether a compaction of `inputs` that writes to `output_level`
// produces bottommost data. The result is the negation of
// vstorage->RangeMightExistAfterSortedRun() evaluated over the user-key span
// that GetBoundaryKeys() computes for `inputs`.
//
// NOTE(review): this view of the definition is incomplete. The embedded
// source line numbers skip 162, 166, 168, 171-174, 176-178 and 183, so the
// declaration of output_l0_idx, its value when output_level != 0, the loop
// body and the closing braces are not visible. Confirm against the full file
// before changing any logic here.
161 // helper function to determine if compaction is creating files at the
163 bool Compaction::IsBottommostLevel(
164 int output_level
, VersionStorageInfo
* vstorage
,
165 const std::vector
<CompactionInputFiles
>& inputs
) {
// For an L0 output, walk the L0 files of `vstorage` looking for the last
// file of the first input level; presumably output_l0_idx ends up as that
// file's position (the counting code is not visible; confirm).
167 if (output_level
== 0) {
169 for (const auto* file
: vstorage
->LevelFiles(0)) {
170 if (inputs
[0].files
.back() == file
) {
// The index must refer to an existing L0 file.
175 assert(static_cast<size_t>(output_l0_idx
) < vstorage
->LevelFiles(0).size());
// Compute the overall user-key range of the inputs and ask the version
// storage whether that range might exist after the output sorted run.
179 Slice smallest_key
, largest_key
;
180 GetBoundaryKeys(vstorage
, inputs
, &smallest_key
, &largest_key
);
181 return !vstorage
->RangeMightExistAfterSortedRun(smallest_key
, largest_key
,
182 output_level
, output_l0_idx
);
185 // test function to validate the functionality of IsBottommostLevel()
186 // function -- determines if compaction with inputs and storage is bottommost
187 bool Compaction::TEST_IsBottommostLevel(
188 int output_level
, VersionStorageInfo
* vstorage
,
189 const std::vector
<CompactionInputFiles
>& inputs
) {
190 return IsBottommostLevel(output_level
, vstorage
, inputs
);
193 bool Compaction::IsFullCompaction(
194 VersionStorageInfo
* vstorage
,
195 const std::vector
<CompactionInputFiles
>& inputs
) {
196 size_t num_files_in_compaction
= 0;
197 size_t total_num_files
= 0;
198 for (int l
= 0; l
< vstorage
->num_levels(); l
++) {
199 total_num_files
+= vstorage
->NumLevelFiles(l
);
201 for (size_t i
= 0; i
< inputs
.size(); i
++) {
202 num_files_in_compaction
+= inputs
[i
].size();
204 return num_files_in_compaction
== total_num_files
;
// Constructs a Compaction over `_inputs` that writes to `_output_level`.
//
// The initializer list copies the supplied options and limits, derives
// start_level_ from the first input level, and computes inputs_ (with atomic
// compaction unit boundaries), bottommost_level_ and is_full_compaction_ from
// `vstorage`. The body then marks the input files as being compacted,
// normalises the compaction reason and sub-compaction limit, builds
// input_levels_, and records the overall user-key range of the inputs.
//
// `_inputs` must contain at least one level, since _inputs[0] is read.
//
// NOTE(review): this view of the definition is incomplete. The embedded
// source line numbers skip 231, 238, 250-252, 255-257, 259 and 263-266, so at
// least two member initializers, the tail of the DoGenerateLevelFilesBrief
// call and all closing braces are not visible. `_score` is not referenced in
// any visible line; presumably one of the missing initializers consumes it.
// NOTE(review): start_level_ reads _inputs[0] while inputs_ moves from
// _inputs. That is only safe if start_level_ is initialized first, which
// depends on member declaration order in the header; confirm.
207 Compaction::Compaction(VersionStorageInfo
* vstorage
,
208 const ImmutableCFOptions
& _immutable_cf_options
,
209 const MutableCFOptions
& _mutable_cf_options
,
210 const MutableDBOptions
& _mutable_db_options
,
211 std::vector
<CompactionInputFiles
> _inputs
,
212 int _output_level
, uint64_t _target_file_size
,
213 uint64_t _max_compaction_bytes
, uint32_t _output_path_id
,
214 CompressionType _compression
,
215 CompressionOptions _compression_opts
,
216 uint32_t _max_subcompactions
,
217 std::vector
<FileMetaData
*> _grandparents
,
218 bool _manual_compaction
, double _score
,
219 bool _deletion_compaction
,
220 CompactionReason _compaction_reason
)
221 : input_vstorage_(vstorage
),
222 start_level_(_inputs
[0].level
),
223 output_level_(_output_level
),
224 max_output_file_size_(_target_file_size
),
225 max_compaction_bytes_(_max_compaction_bytes
),
226 max_subcompactions_(_max_subcompactions
),
227 immutable_cf_options_(_immutable_cf_options
),
228 mutable_cf_options_(_mutable_cf_options
),
229 input_version_(nullptr),
230 number_levels_(vstorage
->num_levels()),
232 output_path_id_(_output_path_id
),
233 output_compression_(_compression
),
234 output_compression_opts_(_compression_opts
),
235 deletion_compaction_(_deletion_compaction
),
236 inputs_(PopulateWithAtomicBoundaries(vstorage
, std::move(_inputs
))),
237 grandparents_(std::move(_grandparents
)),
239 bottommost_level_(IsBottommostLevel(output_level_
, vstorage
, inputs_
)),
240 is_full_compaction_(IsFullCompaction(vstorage
, inputs_
)),
241 is_manual_compaction_(_manual_compaction
),
242 is_trivial_move_(false),
243 compaction_reason_(_compaction_reason
) {
// Flag every input file as being compacted.
244 MarkFilesBeingCompacted(true);
// A manual compaction overrides whatever reason the caller supplied.
245 if (is_manual_compaction_
) {
246 compaction_reason_
= CompactionReason::kManualCompaction
;
// A limit of 0 is replaced by the DB-wide max_subcompactions setting.
248 if (max_subcompactions_
== 0) {
249 max_subcompactions_
= _mutable_db_options
.max_subcompactions
;
// Sanity check: input levels must be listed in strictly increasing order.
253 for (size_t i
= 1; i
< inputs_
.size(); ++i
) {
254 assert(inputs_
[i
].level
> inputs_
[i
- 1].level
);
258 // setup input_levels_
260 input_levels_
.resize(num_input_levels());
261 for (size_t which
= 0; which
< num_input_levels(); which
++) {
262 DoGenerateLevelFilesBrief(&input_levels_
[which
], inputs_
[which
].files
,
// Record the smallest and largest user keys spanned by the inputs.
267 GetBoundaryKeys(vstorage
, inputs_
, &smallest_user_key_
, &largest_user_key_
);
270 Compaction::~Compaction() {
271 if (input_version_
!= nullptr) {
272 input_version_
->Unref();
274 if (cfd_
!= nullptr) {
275 cfd_
->UnrefAndTryDelete();
279 bool Compaction::InputCompressionMatchesOutput() const {
280 int base_level
= input_vstorage_
->base_level();
281 bool matches
= (GetCompressionType(immutable_cf_options_
, input_vstorage_
,
282 mutable_cf_options_
, start_level_
,
283 base_level
) == output_compression_
);
285 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
288 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
292 bool Compaction::IsTrivialMove() const {
293 // Avoid a move if there is lots of overlapping grandparent data.
294 // Otherwise, the move could create a parent file that will require
295 // a very expensive merge later on.
296 // If start_level_== output_level_, the purpose is to force compaction
297 // filter to be applied to that level, and thus cannot be a trivial move.
299 // Check if start level have files with overlapping ranges
300 if (start_level_
== 0 && input_vstorage_
->level0_non_overlapping() == false) {
301 // We cannot move files from L0 to L1 if the files are overlapping
305 if (is_manual_compaction_
&&
306 (immutable_cf_options_
.compaction_filter
!= nullptr ||
307 immutable_cf_options_
.compaction_filter_factory
!= nullptr)) {
308 // This is a manual compaction and we have a compaction filter that should
309 // be executed, we cannot do a trivial move
313 // Used in universal compaction, where trivial move can be done if the
314 // input files are non overlapping
315 if ((mutable_cf_options_
.compaction_options_universal
.allow_trivial_move
) &&
316 (output_level_
!= 0)) {
317 return is_trivial_move_
;
320 if (!(start_level_
!= output_level_
&& num_input_levels() == 1 &&
321 input(0, 0)->fd
.GetPathId() == output_path_id() &&
322 InputCompressionMatchesOutput())) {
326 // assert inputs_.size() == 1
328 std::unique_ptr
<SstPartitioner
> partitioner
= CreateSstPartitioner();
330 for (const auto& file
: inputs_
.front().files
) {
331 std::vector
<FileMetaData
*> file_grand_parents
;
332 if (output_level_
+ 1 >= number_levels_
) {
335 input_vstorage_
->GetOverlappingInputs(output_level_
+ 1, &file
->smallest
,
336 &file
->largest
, &file_grand_parents
);
337 const auto compaction_size
=
338 file
->fd
.GetFileSize() + TotalFileSize(file_grand_parents
);
339 if (compaction_size
> max_compaction_bytes_
) {
343 if (partitioner
.get() != nullptr) {
344 if (!partitioner
->CanDoTrivialMove(file
->smallest
.user_key(),
345 file
->largest
.user_key())) {
354 void Compaction::AddInputDeletions(VersionEdit
* out_edit
) {
355 for (size_t which
= 0; which
< num_input_levels(); which
++) {
356 for (size_t i
= 0; i
< inputs_
[which
].size(); i
++) {
357 out_edit
->DeleteFile(level(which
), inputs_
[which
][i
]->fd
.GetNumber());
362 bool Compaction::KeyNotExistsBeyondOutputLevel(
363 const Slice
& user_key
, std::vector
<size_t>* level_ptrs
) const {
364 assert(input_version_
!= nullptr);
365 assert(level_ptrs
!= nullptr);
366 assert(level_ptrs
->size() == static_cast<size_t>(number_levels_
));
367 if (bottommost_level_
) {
369 } else if (output_level_
!= 0 &&
370 cfd_
->ioptions()->compaction_style
== kCompactionStyleLevel
) {
371 // Maybe use binary search to find right entry instead of linear search?
372 const Comparator
* user_cmp
= cfd_
->user_comparator();
373 for (int lvl
= output_level_
+ 1; lvl
< number_levels_
; lvl
++) {
374 const std::vector
<FileMetaData
*>& files
=
375 input_vstorage_
->LevelFiles(lvl
);
376 for (; level_ptrs
->at(lvl
) < files
.size(); level_ptrs
->at(lvl
)++) {
377 auto* f
= files
[level_ptrs
->at(lvl
)];
378 if (user_cmp
->Compare(user_key
, f
->largest
.user_key()) <= 0) {
379 // We've advanced far enough
380 // In the presence of user-defined timestamp, we may need to handle
381 // the case in which f->smallest.user_key() (including ts) has the
382 // same user key, but the ts part is smaller. If so,
383 // Compare(user_key, f->smallest.user_key()) returns -1.
384 // That's why we need CompareWithoutTimestamp().
385 if (user_cmp
->CompareWithoutTimestamp(user_key
,
386 f
->smallest
.user_key()) >= 0) {
387 // Key falls in this file's range, so it may
388 // exist beyond output level
400 // Mark (or clear) each file that is being compacted
401 void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted
) {
402 for (size_t i
= 0; i
< num_input_levels(); i
++) {
403 for (size_t j
= 0; j
< inputs_
[i
].size(); j
++) {
404 assert(mark_as_compacted
? !inputs_
[i
][j
]->being_compacted
405 : inputs_
[i
][j
]->being_compacted
);
406 inputs_
[i
][j
]->being_compacted
= mark_as_compacted
;
412 // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
413 // print: "3@0 + 2@3 + 1@4 files to L5"
414 const char* Compaction::InputLevelSummary(
415 InputLevelSummaryBuffer
* scratch
) const {
417 bool is_first
= true;
418 for (auto& input_level
: inputs_
) {
419 if (input_level
.empty()) {
424 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
, " + ");
425 len
= std::min(len
, static_cast<int>(sizeof(scratch
->buffer
)));
429 len
+= snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
430 "%" ROCKSDB_PRIszt
"@%d", input_level
.size(),
432 len
= std::min(len
, static_cast<int>(sizeof(scratch
->buffer
)));
434 snprintf(scratch
->buffer
+ len
, sizeof(scratch
->buffer
) - len
,
435 " files to L%d", output_level());
437 return scratch
->buffer
;
440 uint64_t Compaction::CalculateTotalInputSize() const {
442 for (auto& input_level
: inputs_
) {
443 for (auto f
: input_level
.files
) {
444 size
+= f
->fd
.GetFileSize();
450 void Compaction::ReleaseCompactionFiles(Status status
) {
451 MarkFilesBeingCompacted(false);
452 cfd_
->compaction_picker()->ReleaseCompactionFiles(this, status
);
455 void Compaction::ResetNextCompactionIndex() {
456 assert(input_version_
!= nullptr);
457 input_vstorage_
->ResetNextCompactionIndex(start_level_
);
461 int InputSummary(const std::vector
<FileMetaData
*>& files
, char* output
,
465 for (size_t i
= 0; i
< files
.size(); i
++) {
466 int sz
= len
- write
;
469 AppendHumanBytes(files
.at(i
)->fd
.GetFileSize(), sztxt
, 16);
470 ret
= snprintf(output
+ write
, sz
, "%" PRIu64
"(%s) ",
471 files
.at(i
)->fd
.GetNumber(), sztxt
);
472 if (ret
< 0 || ret
>= sz
) break;
475 // if files.size() is non-zero, overwrite the last space
476 return write
- !!files
.size();
480 void Compaction::Summary(char* output
, int len
) {
482 snprintf(output
, len
, "Base version %" PRIu64
" Base level %d, inputs: [",
483 input_version_
->GetVersionNumber(), start_level_
);
484 if (write
< 0 || write
>= len
) {
488 for (size_t level_iter
= 0; level_iter
< num_input_levels(); ++level_iter
) {
489 if (level_iter
> 0) {
490 write
+= snprintf(output
+ write
, len
- write
, "], [");
491 if (write
< 0 || write
>= len
) {
496 InputSummary(inputs_
[level_iter
].files
, output
+ write
, len
- write
);
497 if (write
< 0 || write
>= len
) {
502 snprintf(output
+ write
, len
- write
, "]");
505 uint64_t Compaction::OutputFilePreallocationSize() const {
506 uint64_t preallocation_size
= 0;
508 for (const auto& level_files
: inputs_
) {
509 for (const auto& file
: level_files
.files
) {
510 preallocation_size
+= file
->fd
.GetFileSize();
514 if (max_output_file_size_
!= port::kMaxUint64
&&
515 (immutable_cf_options_
.compaction_style
== kCompactionStyleLevel
||
516 output_level() > 0)) {
517 preallocation_size
= std::min(max_output_file_size_
, preallocation_size
);
520 // Over-estimate slightly so we don't end up just barely crossing
522 // No point to prellocate more than 1GB.
523 return std::min(uint64_t{1073741824},
524 preallocation_size
+ (preallocation_size
/ 10));
527 std::unique_ptr
<CompactionFilter
> Compaction::CreateCompactionFilter() const {
528 if (!cfd_
->ioptions()->compaction_filter_factory
) {
532 CompactionFilter::Context context
;
533 context
.is_full_compaction
= is_full_compaction_
;
534 context
.is_manual_compaction
= is_manual_compaction_
;
535 context
.column_family_id
= cfd_
->GetID();
536 return cfd_
->ioptions()->compaction_filter_factory
->CreateCompactionFilter(
540 std::unique_ptr
<SstPartitioner
> Compaction::CreateSstPartitioner() const {
541 if (!immutable_cf_options_
.sst_partitioner_factory
) {
545 SstPartitioner::Context context
;
546 context
.is_full_compaction
= is_full_compaction_
;
547 context
.is_manual_compaction
= is_manual_compaction_
;
548 context
.output_level
= output_level_
;
549 context
.smallest_user_key
= smallest_user_key_
;
550 context
.largest_user_key
= largest_user_key_
;
551 return immutable_cf_options_
.sst_partitioner_factory
->CreatePartitioner(
555 bool Compaction::IsOutputLevelEmpty() const {
556 return inputs_
.back().level
!= output_level_
|| inputs_
.back().empty();
559 bool Compaction::ShouldFormSubcompactions() const {
560 if (max_subcompactions_
<= 1 || cfd_
== nullptr) {
563 if (cfd_
->ioptions()->compaction_style
== kCompactionStyleLevel
) {
564 return (start_level_
== 0 || is_manual_compaction_
) && output_level_
> 0 &&
565 !IsOutputLevelEmpty();
566 } else if (cfd_
->ioptions()->compaction_style
== kCompactionStyleUniversal
) {
567 return number_levels_
> 1 && output_level_
> 0;
573 uint64_t Compaction::MinInputFileOldestAncesterTime() const {
574 uint64_t min_oldest_ancester_time
= port::kMaxUint64
;
575 for (const auto& level_files
: inputs_
) {
576 for (const auto& file
: level_files
.files
) {
577 uint64_t oldest_ancester_time
= file
->TryGetOldestAncesterTime();
578 if (oldest_ancester_time
!= 0) {
579 min_oldest_ancester_time
=
580 std::min(min_oldest_ancester_time
, oldest_ancester_time
);
584 return min_oldest_ancester_time
;
587 int Compaction::GetInputBaseLevel() const {
588 return input_vstorage_
->base_level();
591 } // namespace ROCKSDB_NAMESPACE