]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compaction/compaction.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/compaction/compaction.h"
11
12 #include <cinttypes>
13 #include <vector>
14
15 #include "db/column_family.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/sst_partitioner.h"
18 #include "test_util/sync_point.h"
19 #include "util/string_util.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
23 const uint64_t kRangeTombstoneSentinel =
24 PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
25
26 int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
27 const InternalKey& b) {
28 auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key());
29 if (c != 0) {
30 return c;
31 }
32 auto a_footer = ExtractInternalKeyFooter(a.Encode());
33 auto b_footer = ExtractInternalKeyFooter(b.Encode());
34 if (a_footer == kRangeTombstoneSentinel) {
35 if (b_footer != kRangeTombstoneSentinel) {
36 return -1;
37 }
38 } else if (b_footer == kRangeTombstoneSentinel) {
39 return 1;
40 }
41 return 0;
42 }
43
44 int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
45 const InternalKey& b) {
46 if (a == nullptr) {
47 return -1;
48 }
49 return sstableKeyCompare(user_cmp, *a, b);
50 }
51
52 int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
53 const InternalKey* b) {
54 if (b == nullptr) {
55 return -1;
56 }
57 return sstableKeyCompare(user_cmp, a, *b);
58 }
59
60 uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
61 uint64_t sum = 0;
62 for (size_t i = 0; i < files.size() && files[i]; i++) {
63 sum += files[i]->fd.GetFileSize();
64 }
65 return sum;
66 }
67
68 void Compaction::SetInputVersion(Version* _input_version) {
69 input_version_ = _input_version;
70 cfd_ = input_version_->cfd();
71
72 cfd_->Ref();
73 input_version_->Ref();
74 edit_.SetColumnFamily(cfd_->GetID());
75 }
76
77 void Compaction::GetBoundaryKeys(
78 VersionStorageInfo* vstorage,
79 const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
80 Slice* largest_user_key) {
81 bool initialized = false;
82 const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
83 for (size_t i = 0; i < inputs.size(); ++i) {
84 if (inputs[i].files.empty()) {
85 continue;
86 }
87 if (inputs[i].level == 0) {
88 // we need to consider all files on level 0
89 for (const auto* f : inputs[i].files) {
90 const Slice& start_user_key = f->smallest.user_key();
91 if (!initialized ||
92 ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
93 *smallest_user_key = start_user_key;
94 }
95 const Slice& end_user_key = f->largest.user_key();
96 if (!initialized ||
97 ucmp->Compare(end_user_key, *largest_user_key) > 0) {
98 *largest_user_key = end_user_key;
99 }
100 initialized = true;
101 }
102 } else {
103 // we only need to consider the first and last file
104 const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
105 if (!initialized ||
106 ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
107 *smallest_user_key = start_user_key;
108 }
109 const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
110 if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
111 *largest_user_key = end_user_key;
112 }
113 initialized = true;
114 }
115 }
116 }
117
118 std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
119 VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
120 const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
121 for (size_t i = 0; i < inputs.size(); i++) {
122 if (inputs[i].level == 0 || inputs[i].files.empty()) {
123 continue;
124 }
125 inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
126 AtomicCompactionUnitBoundary cur_boundary;
127 size_t first_atomic_idx = 0;
128 auto add_unit_boundary = [&](size_t to) {
129 if (first_atomic_idx == to) return;
130 for (size_t k = first_atomic_idx; k < to; k++) {
131 inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
132 }
133 first_atomic_idx = to;
134 };
135 for (size_t j = 0; j < inputs[i].files.size(); j++) {
136 const auto* f = inputs[i].files[j];
137 if (j == 0) {
138 // First file in a level.
139 cur_boundary.smallest = &f->smallest;
140 cur_boundary.largest = &f->largest;
141 } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
142 0) {
143 // SSTs overlap but the end key of the previous file was not
144 // artificially extended by a range tombstone. Extend the current
145 // boundary.
146 cur_boundary.largest = &f->largest;
147 } else {
148 // Atomic compaction unit has ended.
149 add_unit_boundary(j);
150 cur_boundary.smallest = &f->smallest;
151 cur_boundary.largest = &f->largest;
152 }
153 }
154 add_unit_boundary(inputs[i].files.size());
155 assert(inputs[i].files.size() ==
156 inputs[i].atomic_compaction_unit_boundaries.size());
157 }
158 return inputs;
159 }
160
161 // helper function to determine if compaction is creating files at the
162 // bottommost level
163 bool Compaction::IsBottommostLevel(
164 int output_level, VersionStorageInfo* vstorage,
165 const std::vector<CompactionInputFiles>& inputs) {
166 int output_l0_idx;
167 if (output_level == 0) {
168 output_l0_idx = 0;
169 for (const auto* file : vstorage->LevelFiles(0)) {
170 if (inputs[0].files.back() == file) {
171 break;
172 }
173 ++output_l0_idx;
174 }
175 assert(static_cast<size_t>(output_l0_idx) < vstorage->LevelFiles(0).size());
176 } else {
177 output_l0_idx = -1;
178 }
179 Slice smallest_key, largest_key;
180 GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
181 return !vstorage->RangeMightExistAfterSortedRun(smallest_key, largest_key,
182 output_level, output_l0_idx);
183 }
184
185 // test function to validate the functionality of IsBottommostLevel()
186 // function -- determines if compaction with inputs and storage is bottommost
187 bool Compaction::TEST_IsBottommostLevel(
188 int output_level, VersionStorageInfo* vstorage,
189 const std::vector<CompactionInputFiles>& inputs) {
190 return IsBottommostLevel(output_level, vstorage, inputs);
191 }
192
193 bool Compaction::IsFullCompaction(
194 VersionStorageInfo* vstorage,
195 const std::vector<CompactionInputFiles>& inputs) {
196 size_t num_files_in_compaction = 0;
197 size_t total_num_files = 0;
198 for (int l = 0; l < vstorage->num_levels(); l++) {
199 total_num_files += vstorage->NumLevelFiles(l);
200 }
201 for (size_t i = 0; i < inputs.size(); i++) {
202 num_files_in_compaction += inputs[i].size();
203 }
204 return num_files_in_compaction == total_num_files;
205 }
206
207 Compaction::Compaction(VersionStorageInfo* vstorage,
208 const ImmutableCFOptions& _immutable_cf_options,
209 const MutableCFOptions& _mutable_cf_options,
210 const MutableDBOptions& _mutable_db_options,
211 std::vector<CompactionInputFiles> _inputs,
212 int _output_level, uint64_t _target_file_size,
213 uint64_t _max_compaction_bytes, uint32_t _output_path_id,
214 CompressionType _compression,
215 CompressionOptions _compression_opts,
216 uint32_t _max_subcompactions,
217 std::vector<FileMetaData*> _grandparents,
218 bool _manual_compaction, double _score,
219 bool _deletion_compaction,
220 CompactionReason _compaction_reason)
221 : input_vstorage_(vstorage),
222 start_level_(_inputs[0].level),
223 output_level_(_output_level),
224 max_output_file_size_(_target_file_size),
225 max_compaction_bytes_(_max_compaction_bytes),
226 max_subcompactions_(_max_subcompactions),
227 immutable_cf_options_(_immutable_cf_options),
228 mutable_cf_options_(_mutable_cf_options),
229 input_version_(nullptr),
230 number_levels_(vstorage->num_levels()),
231 cfd_(nullptr),
232 output_path_id_(_output_path_id),
233 output_compression_(_compression),
234 output_compression_opts_(_compression_opts),
235 deletion_compaction_(_deletion_compaction),
236 inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
237 grandparents_(std::move(_grandparents)),
238 score_(_score),
239 bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
240 is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
241 is_manual_compaction_(_manual_compaction),
242 is_trivial_move_(false),
243 compaction_reason_(_compaction_reason) {
244 MarkFilesBeingCompacted(true);
245 if (is_manual_compaction_) {
246 compaction_reason_ = CompactionReason::kManualCompaction;
247 }
248 if (max_subcompactions_ == 0) {
249 max_subcompactions_ = _mutable_db_options.max_subcompactions;
250 }
251
252 #ifndef NDEBUG
253 for (size_t i = 1; i < inputs_.size(); ++i) {
254 assert(inputs_[i].level > inputs_[i - 1].level);
255 }
256 #endif
257
258 // setup input_levels_
259 {
260 input_levels_.resize(num_input_levels());
261 for (size_t which = 0; which < num_input_levels(); which++) {
262 DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
263 &arena_);
264 }
265 }
266
267 GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
268 }
269
270 Compaction::~Compaction() {
271 if (input_version_ != nullptr) {
272 input_version_->Unref();
273 }
274 if (cfd_ != nullptr) {
275 cfd_->UnrefAndTryDelete();
276 }
277 }
278
279 bool Compaction::InputCompressionMatchesOutput() const {
280 int base_level = input_vstorage_->base_level();
281 bool matches = (GetCompressionType(immutable_cf_options_, input_vstorage_,
282 mutable_cf_options_, start_level_,
283 base_level) == output_compression_);
284 if (matches) {
285 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
286 return true;
287 }
288 TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
289 return matches;
290 }
291
292 bool Compaction::IsTrivialMove() const {
293 // Avoid a move if there is lots of overlapping grandparent data.
294 // Otherwise, the move could create a parent file that will require
295 // a very expensive merge later on.
296 // If start_level_== output_level_, the purpose is to force compaction
297 // filter to be applied to that level, and thus cannot be a trivial move.
298
299 // Check if start level have files with overlapping ranges
300 if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
301 // We cannot move files from L0 to L1 if the files are overlapping
302 return false;
303 }
304
305 if (is_manual_compaction_ &&
306 (immutable_cf_options_.compaction_filter != nullptr ||
307 immutable_cf_options_.compaction_filter_factory != nullptr)) {
308 // This is a manual compaction and we have a compaction filter that should
309 // be executed, we cannot do a trivial move
310 return false;
311 }
312
313 // Used in universal compaction, where trivial move can be done if the
314 // input files are non overlapping
315 if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
316 (output_level_ != 0)) {
317 return is_trivial_move_;
318 }
319
320 if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
321 input(0, 0)->fd.GetPathId() == output_path_id() &&
322 InputCompressionMatchesOutput())) {
323 return false;
324 }
325
326 // assert inputs_.size() == 1
327
328 std::unique_ptr<SstPartitioner> partitioner = CreateSstPartitioner();
329
330 for (const auto& file : inputs_.front().files) {
331 std::vector<FileMetaData*> file_grand_parents;
332 if (output_level_ + 1 >= number_levels_) {
333 continue;
334 }
335 input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
336 &file->largest, &file_grand_parents);
337 const auto compaction_size =
338 file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
339 if (compaction_size > max_compaction_bytes_) {
340 return false;
341 }
342
343 if (partitioner.get() != nullptr) {
344 if (!partitioner->CanDoTrivialMove(file->smallest.user_key(),
345 file->largest.user_key())) {
346 return false;
347 }
348 }
349 }
350
351 return true;
352 }
353
354 void Compaction::AddInputDeletions(VersionEdit* out_edit) {
355 for (size_t which = 0; which < num_input_levels(); which++) {
356 for (size_t i = 0; i < inputs_[which].size(); i++) {
357 out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
358 }
359 }
360 }
361
362 bool Compaction::KeyNotExistsBeyondOutputLevel(
363 const Slice& user_key, std::vector<size_t>* level_ptrs) const {
364 assert(input_version_ != nullptr);
365 assert(level_ptrs != nullptr);
366 assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
367 if (bottommost_level_) {
368 return true;
369 } else if (output_level_ != 0 &&
370 cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
371 // Maybe use binary search to find right entry instead of linear search?
372 const Comparator* user_cmp = cfd_->user_comparator();
373 for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
374 const std::vector<FileMetaData*>& files =
375 input_vstorage_->LevelFiles(lvl);
376 for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
377 auto* f = files[level_ptrs->at(lvl)];
378 if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
379 // We've advanced far enough
380 // In the presence of user-defined timestamp, we may need to handle
381 // the case in which f->smallest.user_key() (including ts) has the
382 // same user key, but the ts part is smaller. If so,
383 // Compare(user_key, f->smallest.user_key()) returns -1.
384 // That's why we need CompareWithoutTimestamp().
385 if (user_cmp->CompareWithoutTimestamp(user_key,
386 f->smallest.user_key()) >= 0) {
387 // Key falls in this file's range, so it may
388 // exist beyond output level
389 return false;
390 }
391 break;
392 }
393 }
394 }
395 return true;
396 }
397 return false;
398 }
399
400 // Mark (or clear) each file that is being compacted
401 void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
402 for (size_t i = 0; i < num_input_levels(); i++) {
403 for (size_t j = 0; j < inputs_[i].size(); j++) {
404 assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
405 : inputs_[i][j]->being_compacted);
406 inputs_[i][j]->being_compacted = mark_as_compacted;
407 }
408 }
409 }
410
411 // Sample output:
412 // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
413 // print: "3@0 + 2@3 + 1@4 files to L5"
414 const char* Compaction::InputLevelSummary(
415 InputLevelSummaryBuffer* scratch) const {
416 int len = 0;
417 bool is_first = true;
418 for (auto& input_level : inputs_) {
419 if (input_level.empty()) {
420 continue;
421 }
422 if (!is_first) {
423 len +=
424 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
425 len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
426 } else {
427 is_first = false;
428 }
429 len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
430 "%" ROCKSDB_PRIszt "@%d", input_level.size(),
431 input_level.level);
432 len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
433 }
434 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
435 " files to L%d", output_level());
436
437 return scratch->buffer;
438 }
439
440 uint64_t Compaction::CalculateTotalInputSize() const {
441 uint64_t size = 0;
442 for (auto& input_level : inputs_) {
443 for (auto f : input_level.files) {
444 size += f->fd.GetFileSize();
445 }
446 }
447 return size;
448 }
449
450 void Compaction::ReleaseCompactionFiles(Status status) {
451 MarkFilesBeingCompacted(false);
452 cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
453 }
454
455 void Compaction::ResetNextCompactionIndex() {
456 assert(input_version_ != nullptr);
457 input_vstorage_->ResetNextCompactionIndex(start_level_);
458 }
459
460 namespace {
461 int InputSummary(const std::vector<FileMetaData*>& files, char* output,
462 int len) {
463 *output = '\0';
464 int write = 0;
465 for (size_t i = 0; i < files.size(); i++) {
466 int sz = len - write;
467 int ret;
468 char sztxt[16];
469 AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
470 ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
471 files.at(i)->fd.GetNumber(), sztxt);
472 if (ret < 0 || ret >= sz) break;
473 write += ret;
474 }
475 // if files.size() is non-zero, overwrite the last space
476 return write - !!files.size();
477 }
478 } // namespace
479
480 void Compaction::Summary(char* output, int len) {
481 int write =
482 snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
483 input_version_->GetVersionNumber(), start_level_);
484 if (write < 0 || write >= len) {
485 return;
486 }
487
488 for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
489 if (level_iter > 0) {
490 write += snprintf(output + write, len - write, "], [");
491 if (write < 0 || write >= len) {
492 return;
493 }
494 }
495 write +=
496 InputSummary(inputs_[level_iter].files, output + write, len - write);
497 if (write < 0 || write >= len) {
498 return;
499 }
500 }
501
502 snprintf(output + write, len - write, "]");
503 }
504
505 uint64_t Compaction::OutputFilePreallocationSize() const {
506 uint64_t preallocation_size = 0;
507
508 for (const auto& level_files : inputs_) {
509 for (const auto& file : level_files.files) {
510 preallocation_size += file->fd.GetFileSize();
511 }
512 }
513
514 if (max_output_file_size_ != port::kMaxUint64 &&
515 (immutable_cf_options_.compaction_style == kCompactionStyleLevel ||
516 output_level() > 0)) {
517 preallocation_size = std::min(max_output_file_size_, preallocation_size);
518 }
519
520 // Over-estimate slightly so we don't end up just barely crossing
521 // the threshold
522 // No point to prellocate more than 1GB.
523 return std::min(uint64_t{1073741824},
524 preallocation_size + (preallocation_size / 10));
525 }
526
527 std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
528 if (!cfd_->ioptions()->compaction_filter_factory) {
529 return nullptr;
530 }
531
532 CompactionFilter::Context context;
533 context.is_full_compaction = is_full_compaction_;
534 context.is_manual_compaction = is_manual_compaction_;
535 context.column_family_id = cfd_->GetID();
536 return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
537 context);
538 }
539
540 std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
541 if (!immutable_cf_options_.sst_partitioner_factory) {
542 return nullptr;
543 }
544
545 SstPartitioner::Context context;
546 context.is_full_compaction = is_full_compaction_;
547 context.is_manual_compaction = is_manual_compaction_;
548 context.output_level = output_level_;
549 context.smallest_user_key = smallest_user_key_;
550 context.largest_user_key = largest_user_key_;
551 return immutable_cf_options_.sst_partitioner_factory->CreatePartitioner(
552 context);
553 }
554
555 bool Compaction::IsOutputLevelEmpty() const {
556 return inputs_.back().level != output_level_ || inputs_.back().empty();
557 }
558
559 bool Compaction::ShouldFormSubcompactions() const {
560 if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
561 return false;
562 }
563 if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
564 return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0 &&
565 !IsOutputLevelEmpty();
566 } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
567 return number_levels_ > 1 && output_level_ > 0;
568 } else {
569 return false;
570 }
571 }
572
573 uint64_t Compaction::MinInputFileOldestAncesterTime() const {
574 uint64_t min_oldest_ancester_time = port::kMaxUint64;
575 for (const auto& level_files : inputs_) {
576 for (const auto& file : level_files.files) {
577 uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
578 if (oldest_ancester_time != 0) {
579 min_oldest_ancester_time =
580 std::min(min_oldest_ancester_time, oldest_ancester_time);
581 }
582 }
583 }
584 return min_oldest_ancester_time;
585 }
586
587 int Compaction::GetInputBaseLevel() const {
588 return input_vstorage_->base_level();
589 }
590
591 } // namespace ROCKSDB_NAMESPACE