// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <cinttypes>
#include <vector>

#include "db/column_family.h"
#include "db/compaction/compaction.h"
#include "rocksdb/compaction_filter.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const uint64_t kRangeTombstoneSentinel =
    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);

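// Compares two sstable boundary keys, primarily on the user key. When the
// user keys are equal, a key whose footer is the range tombstone sentinel
// (kMaxSequenceNumber, kTypeRangeDeletion) orders before any other key with
// the same user key, so two adjacent files are not treated as overlapping
// when the first file's end key was only extended to that user key by a
// range tombstone. In the pointer overloads, a null `a` compares less than
// any key and a null `b` compares greater than any key.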
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey& b) {
  auto c = user_cmp->Compare(a.user_key(), b.user_key());
  if (c != 0) {
    return c;
  }
  auto a_footer = ExtractInternalKeyFooter(a.Encode());
  auto b_footer = ExtractInternalKeyFooter(b.Encode());
  if (a_footer == kRangeTombstoneSentinel) {
    if (b_footer != kRangeTombstoneSentinel) {
      return -1;
    }
  } else if (b_footer == kRangeTombstoneSentinel) {
    return 1;
  }
  return 0;
}

int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
                      const InternalKey& b) {
  if (a == nullptr) {
    return -1;
  }
  return sstableKeyCompare(user_cmp, *a, b);
}

int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey* b) {
  if (b == nullptr) {
    return -1;
  }
  return sstableKeyCompare(user_cmp, a, *b);
}

uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
  uint64_t sum = 0;
  for (size_t i = 0; i < files.size() && files[i]; i++) {
    sum += files[i]->fd.GetFileSize();
  }
  return sum;
}

void Compaction::SetInputVersion(Version* _input_version) {
  input_version_ = _input_version;
  cfd_ = input_version_->cfd();

  cfd_->Ref();
  input_version_->Ref();
  edit_.SetColumnFamily(cfd_->GetID());
}

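// Computes the smallest and largest user keys covered by the input files.
// Every file on level 0 is examined because L0 files may overlap; for other
// levels only the first and last file of each input level matter.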
void Compaction::GetBoundaryKeys(
    VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
    Slice* largest_user_key) {
  bool initialized = false;
  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  for (size_t i = 0; i < inputs.size(); ++i) {
    if (inputs[i].files.empty()) {
      continue;
    }
    if (inputs[i].level == 0) {
      // we need to consider all files on level 0
      for (const auto* f : inputs[i].files) {
        const Slice& start_user_key = f->smallest.user_key();
        if (!initialized ||
            ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
          *smallest_user_key = start_user_key;
        }
        const Slice& end_user_key = f->largest.user_key();
        if (!initialized ||
            ucmp->Compare(end_user_key, *largest_user_key) > 0) {
          *largest_user_key = end_user_key;
        }
        initialized = true;
      }
    } else {
      // we only need to consider the first and last file
      const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
      if (!initialized ||
          ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
        *smallest_user_key = start_user_key;
      }
      const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
      if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
        *largest_user_key = end_user_key;
      }
      initialized = true;
    }
  }
}

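// For each non-L0 input level, groups consecutive files whose boundary keys
// touch (the previous file's largest key compares equal to the next file's
// smallest key under sstableKeyCompare) into one atomic compaction unit, and
// records that unit's [smallest, largest] range for every file in it. Files
// in the same unit must be compacted together.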
std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
    VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  for (size_t i = 0; i < inputs.size(); i++) {
    if (inputs[i].level == 0 || inputs[i].files.empty()) {
      continue;
    }
    inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
    AtomicCompactionUnitBoundary cur_boundary;
    size_t first_atomic_idx = 0;
    auto add_unit_boundary = [&](size_t to) {
      if (first_atomic_idx == to) return;
      for (size_t k = first_atomic_idx; k < to; k++) {
        inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
      }
      first_atomic_idx = to;
    };
    for (size_t j = 0; j < inputs[i].files.size(); j++) {
      const auto* f = inputs[i].files[j];
      if (j == 0) {
        // First file in a level.
        cur_boundary.smallest = &f->smallest;
        cur_boundary.largest = &f->largest;
      } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
                 0) {
        // SSTs overlap but the end key of the previous file was not
        // artificially extended by a range tombstone. Extend the current
        // boundary.
        cur_boundary.largest = &f->largest;
      } else {
        // Atomic compaction unit has ended.
        add_unit_boundary(j);
        cur_boundary.smallest = &f->smallest;
        cur_boundary.largest = &f->largest;
      }
    }
    add_unit_boundary(inputs[i].files.size());
    assert(inputs[i].files.size() ==
           inputs[i].atomic_compaction_unit_boundaries.size());
  }
  return inputs;
}

// helper function to determine if compaction is creating files at the
// bottommost level
bool Compaction::IsBottommostLevel(
    int output_level, VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs) {
  int output_l0_idx;
  if (output_level == 0) {
    output_l0_idx = 0;
    for (const auto* file : vstorage->LevelFiles(0)) {
      if (inputs[0].files.back() == file) {
        break;
      }
      ++output_l0_idx;
    }
    assert(static_cast<size_t>(output_l0_idx) < vstorage->LevelFiles(0).size());
  } else {
    output_l0_idx = -1;
  }
  Slice smallest_key, largest_key;
  GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
  return !vstorage->RangeMightExistAfterSortedRun(smallest_key, largest_key,
                                                  output_level, output_l0_idx);
}

// test function to validate the functionality of IsBottommostLevel()
// function -- determines if compaction with inputs and storage is bottommost
bool Compaction::TEST_IsBottommostLevel(
    int output_level, VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs) {
  return IsBottommostLevel(output_level, vstorage, inputs);
}

bool Compaction::IsFullCompaction(
    VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs) {
  size_t num_files_in_compaction = 0;
  size_t total_num_files = 0;
  for (int l = 0; l < vstorage->num_levels(); l++) {
    total_num_files += vstorage->NumLevelFiles(l);
  }
  for (size_t i = 0; i < inputs.size(); i++) {
    num_files_in_compaction += inputs[i].size();
  }
  return num_files_in_compaction == total_num_files;
}

Compaction::Compaction(VersionStorageInfo* vstorage,
                       const ImmutableCFOptions& _immutable_cf_options,
                       const MutableCFOptions& _mutable_cf_options,
                       std::vector<CompactionInputFiles> _inputs,
                       int _output_level, uint64_t _target_file_size,
                       uint64_t _max_compaction_bytes, uint32_t _output_path_id,
                       CompressionType _compression,
                       CompressionOptions _compression_opts,
                       uint32_t _max_subcompactions,
                       std::vector<FileMetaData*> _grandparents,
                       bool _manual_compaction, double _score,
                       bool _deletion_compaction,
                       CompactionReason _compaction_reason)
    : input_vstorage_(vstorage),
      start_level_(_inputs[0].level),
      output_level_(_output_level),
      max_output_file_size_(_target_file_size),
      max_compaction_bytes_(_max_compaction_bytes),
      max_subcompactions_(_max_subcompactions),
      immutable_cf_options_(_immutable_cf_options),
      mutable_cf_options_(_mutable_cf_options),
      input_version_(nullptr),
      number_levels_(vstorage->num_levels()),
      cfd_(nullptr),
      output_path_id_(_output_path_id),
      output_compression_(_compression),
      output_compression_opts_(_compression_opts),
      deletion_compaction_(_deletion_compaction),
      inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
      grandparents_(std::move(_grandparents)),
      score_(_score),
      bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
      is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
      is_manual_compaction_(_manual_compaction),
      is_trivial_move_(false),
      compaction_reason_(_compaction_reason) {
  MarkFilesBeingCompacted(true);
  if (is_manual_compaction_) {
    compaction_reason_ = CompactionReason::kManualCompaction;
  }
  if (max_subcompactions_ == 0) {
    max_subcompactions_ = immutable_cf_options_.max_subcompactions;
  }
  if (!bottommost_level_) {
    // Currently we only enable dictionary compression during compaction to the
    // bottommost level.
    output_compression_opts_.max_dict_bytes = 0;
    output_compression_opts_.zstd_max_train_bytes = 0;
  }

#ifndef NDEBUG
  for (size_t i = 1; i < inputs_.size(); ++i) {
    assert(inputs_[i].level > inputs_[i - 1].level);
  }
#endif

  // setup input_levels_
  {
    input_levels_.resize(num_input_levels());
    for (size_t which = 0; which < num_input_levels(); which++) {
      DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
                                &arena_);
    }
  }

  GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
}

Compaction::~Compaction() {
  if (input_version_ != nullptr) {
    input_version_->Unref();
  }
  if (cfd_ != nullptr) {
    cfd_->UnrefAndTryDelete();
  }
}

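// Returns true when the compression type that would be used for the start
// level matches the compression type chosen for the compaction output. The
// sync points exist only for tests.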
bool Compaction::InputCompressionMatchesOutput() const {
  int base_level = input_vstorage_->base_level();
  bool matches = (GetCompressionType(immutable_cf_options_, input_vstorage_,
                                     mutable_cf_options_, start_level_,
                                     base_level) == output_compression_);
  if (matches) {
    TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
    return true;
  }
  TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
  return matches;
}

bool Compaction::IsTrivialMove() const {
  // Avoid a move if there is a lot of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  // If start_level_ == output_level_, the purpose is to force the compaction
  // filter to be applied to that level, so this cannot be a trivial move.

  // Check if the start level has files with overlapping ranges
  if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
    // We cannot move files from L0 to L1 if the files are overlapping
    return false;
  }

  if (is_manual_compaction_ &&
      (immutable_cf_options_.compaction_filter != nullptr ||
       immutable_cf_options_.compaction_filter_factory != nullptr)) {
    // This is a manual compaction and we have a compaction filter that should
    // be executed; we cannot do a trivial move
    return false;
  }

  // Used in universal compaction, where a trivial move can be done if the
  // input files are non-overlapping
  if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
      (output_level_ != 0)) {
    return is_trivial_move_;
  }

  if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
        input(0, 0)->fd.GetPathId() == output_path_id() &&
        InputCompressionMatchesOutput())) {
    return false;
  }

  // assert inputs_.size() == 1

  for (const auto& file : inputs_.front().files) {
    std::vector<FileMetaData*> file_grand_parents;
    if (output_level_ + 1 >= number_levels_) {
      continue;
    }
    input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
                                          &file->largest, &file_grand_parents);
    const auto compaction_size =
        file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
    if (compaction_size > max_compaction_bytes_) {
      return false;
    }
  }

  return true;
}

void Compaction::AddInputDeletions(VersionEdit* out_edit) {
  for (size_t which = 0; which < num_input_levels(); which++) {
    for (size_t i = 0; i < inputs_[which].size(); i++) {
      out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
    }
  }
}

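// Returns true if `user_key` is guaranteed not to exist in any level beyond
// the output level, based on the key ranges of the files in those levels.
// `level_ptrs` caches a per-level file index so that successive calls with
// non-decreasing keys scan each level at most once.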
bool Compaction::KeyNotExistsBeyondOutputLevel(
    const Slice& user_key, std::vector<size_t>* level_ptrs) const {
  assert(input_version_ != nullptr);
  assert(level_ptrs != nullptr);
  assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
  if (bottommost_level_) {
    return true;
  } else if (output_level_ != 0 &&
             cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
    // Maybe use binary search to find right entry instead of linear search?
    const Comparator* user_cmp = cfd_->user_comparator();
    for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
      const std::vector<FileMetaData*>& files =
          input_vstorage_->LevelFiles(lvl);
      for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
        auto* f = files[level_ptrs->at(lvl)];
        if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
          // We've advanced far enough
          if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
            // Key falls in this file's range, so it may
            // exist beyond output level
            return false;
          }
          break;
        }
      }
    }
    return true;
  }
  return false;
}

// Mark (or clear) each file that is being compacted
void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
  for (size_t i = 0; i < num_input_levels(); i++) {
    for (size_t j = 0; j < inputs_[i].size(); j++) {
      assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
                               : inputs_[i][j]->being_compacted);
      inputs_[i][j]->being_compacted = mark_as_compacted;
    }
  }
}

// Sample output:
// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
// print: "3@0 + 2@3 + 1@4 files to L5"
const char* Compaction::InputLevelSummary(
    InputLevelSummaryBuffer* scratch) const {
  int len = 0;
  bool is_first = true;
  for (auto& input_level : inputs_) {
    if (input_level.empty()) {
      continue;
    }
    if (!is_first) {
      len +=
          snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
      len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
    } else {
      is_first = false;
    }
    len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                    "%" ROCKSDB_PRIszt "@%d", input_level.size(),
                    input_level.level);
    len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
  }
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
           " files to L%d", output_level());

  return scratch->buffer;
}

uint64_t Compaction::CalculateTotalInputSize() const {
  uint64_t size = 0;
  for (auto& input_level : inputs_) {
    for (auto f : input_level.files) {
      size += f->fd.GetFileSize();
    }
  }
  return size;
}

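// Clears the being_compacted flag on all input files and notifies the
// compaction picker that this compaction no longer holds them; `status`
// reports whether the compaction succeeded.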
void Compaction::ReleaseCompactionFiles(Status status) {
  MarkFilesBeingCompacted(false);
  cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
}

void Compaction::ResetNextCompactionIndex() {
  assert(input_version_ != nullptr);
  input_vstorage_->ResetNextCompactionIndex(start_level_);
}

namespace {
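// Appends "file_number(human-readable size) " for each file to `output` and
// returns the number of characters written, excluding the trailing space.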
int InputSummary(const std::vector<FileMetaData*>& files, char* output,
                 int len) {
  *output = '\0';
  int write = 0;
  for (size_t i = 0; i < files.size(); i++) {
    int sz = len - write;
    int ret;
    char sztxt[16];
    AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
    ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
                   files.at(i)->fd.GetNumber(), sztxt);
    if (ret < 0 || ret >= sz) break;
    write += ret;
  }
  // if files.size() is non-zero, overwrite the last space
  return write - !!files.size();
}
}  // namespace

void Compaction::Summary(char* output, int len) {
  int write =
      snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
               input_version_->GetVersionNumber(), start_level_);
  if (write < 0 || write >= len) {
    return;
  }

  for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
    if (level_iter > 0) {
      write += snprintf(output + write, len - write, "], [");
      if (write < 0 || write >= len) {
        return;
      }
    }
    write +=
        InputSummary(inputs_[level_iter].files, output + write, len - write);
    if (write < 0 || write >= len) {
      return;
    }
  }

  snprintf(output + write, len - write, "]");
}

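// Estimates how many bytes to preallocate for each compaction output file:
// the total input size, capped at the target output file size for level-style
// (or non-L0) outputs, then padded by 10% and limited to 1GB.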
uint64_t Compaction::OutputFilePreallocationSize() const {
  uint64_t preallocation_size = 0;

  for (const auto& level_files : inputs_) {
    for (const auto& file : level_files.files) {
      preallocation_size += file->fd.GetFileSize();
    }
  }

  if (max_output_file_size_ != port::kMaxUint64 &&
      (immutable_cf_options_.compaction_style == kCompactionStyleLevel ||
       output_level() > 0)) {
    preallocation_size = std::min(max_output_file_size_, preallocation_size);
  }

  // Over-estimate slightly so we don't end up just barely crossing
  // the threshold.
  // No point in preallocating more than 1GB.
  return std::min(uint64_t{1073741824},
                  preallocation_size + (preallocation_size / 10));
}

std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
  if (!cfd_->ioptions()->compaction_filter_factory) {
    return nullptr;
  }

  CompactionFilter::Context context;
  context.is_full_compaction = is_full_compaction_;
  context.is_manual_compaction = is_manual_compaction_;
  context.column_family_id = cfd_->GetID();
  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
      context);
}

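// Returns true if no input files come from the output level: either the last
// input level is not the output level, or it is but contains no files.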
bool Compaction::IsOutputLevelEmpty() const {
  return inputs_.back().level != output_level_ || inputs_.back().empty();
}

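// Decides whether this compaction may be split into parallel subcompactions.
// Requires more than one allowed subcompaction and a non-L0 output level;
// level-style compactions additionally need a non-empty output level and an
// input that starts at L0 or was requested manually, while universal
// compactions only need more than one level.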
bool Compaction::ShouldFormSubcompactions() const {
  if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
    return false;
  }
  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
    return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0 &&
           !IsOutputLevelEmpty();
  } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
    return number_levels_ > 1 && output_level_ > 0;
  } else {
    return false;
  }
}

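// Returns the smallest oldest-ancestor time recorded by any input file, or
// port::kMaxUint64 when no input file carries one (TryGetOldestAncesterTime()
// returns 0 in that case).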
uint64_t Compaction::MinInputFileOldestAncesterTime() const {
  uint64_t min_oldest_ancester_time = port::kMaxUint64;
  for (const auto& level_files : inputs_) {
    for (const auto& file : level_files.files) {
      uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
      if (oldest_ancester_time != 0) {
        min_oldest_ancester_time =
            std::min(min_oldest_ancester_time, oldest_ancester_time);
      }
    }
  }
  return min_oldest_ancester_time;
}

int Compaction::GetInputBaseLevel() const {
  return input_vstorage_->base_level();
}

}  // namespace ROCKSDB_NAMESPACE