]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compaction_picker.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / compaction_picker.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/compaction_picker.h"
11
12 #ifndef __STDC_FORMAT_MACROS
13 #define __STDC_FORMAT_MACROS
14 #endif
15
16 #include <inttypes.h>
17 #include <limits>
18 #include <queue>
19 #include <string>
20 #include <utility>
21 #include <vector>
22 #include "db/column_family.h"
23 #include "monitoring/statistics.h"
24 #include "util/filename.h"
25 #include "util/log_buffer.h"
26 #include "util/random.h"
27 #include "util/string_util.h"
28 #include "util/sync_point.h"
29
30 namespace rocksdb {
31
32 namespace {
33 uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
34 uint64_t sum = 0;
35 for (size_t i = 0; i < files.size() && files[i]; i++) {
36 sum += files[i]->compensated_file_size;
37 }
38 return sum;
39 }
40 } // anonymous namespace
41
42 bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
43 size_t min_files_to_compact,
44 uint64_t max_compact_bytes_per_del_file,
45 CompactionInputFiles* comp_inputs) {
46 size_t compact_bytes = static_cast<size_t>(level_files[0]->fd.file_size);
47 size_t compact_bytes_per_del_file = port::kMaxSizet;
48 // compaction range will be [0, span_len).
49 size_t span_len;
50 // pull in files until the amount of compaction work per deleted file begins
51 // increasing.
52 size_t new_compact_bytes_per_del_file = 0;
53 for (span_len = 1; span_len < level_files.size(); ++span_len) {
54 compact_bytes += static_cast<size_t>(level_files[span_len]->fd.file_size);
55 new_compact_bytes_per_del_file = compact_bytes / span_len;
56 if (level_files[span_len]->being_compacted ||
57 new_compact_bytes_per_del_file > compact_bytes_per_del_file) {
58 break;
59 }
60 compact_bytes_per_del_file = new_compact_bytes_per_del_file;
61 }
62
63 if (span_len >= min_files_to_compact &&
64 compact_bytes_per_del_file < max_compact_bytes_per_del_file) {
65 assert(comp_inputs != nullptr);
66 comp_inputs->level = 0;
67 for (size_t i = 0; i < span_len; ++i) {
68 comp_inputs->files.push_back(level_files[i]);
69 }
70 return true;
71 }
72 return false;
73 }
74
75 // Determine compression type, based on user options, level of the output
76 // file and whether compression is disabled.
77 // If enable_compression is false, then compression is always disabled no
78 // matter what the values of the other two parameters are.
79 // Otherwise, the compression type is determined based on options and level.
80 CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
81 const VersionStorageInfo* vstorage,
82 const MutableCFOptions& mutable_cf_options,
83 int level, int base_level,
84 const bool enable_compression) {
85 if (!enable_compression) {
86 // disable compression
87 return kNoCompression;
88 }
89
90 // If bottommost_compression is set and we are compacting to the
91 // bottommost level then we should use it.
92 if (ioptions.bottommost_compression != kDisableCompressionOption &&
93 level >= (vstorage->num_non_empty_levels() - 1)) {
94 return ioptions.bottommost_compression;
95 }
96 // If the user has specified a different compression level for each level,
97 // then pick the compression for that level.
98 if (!ioptions.compression_per_level.empty()) {
99 assert(level == 0 || level >= base_level);
100 int idx = (level == 0) ? 0 : level - base_level + 1;
101
102 const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
103 // It is possible for level_ to be -1; in that case, we use level
104 // 0's compression. This occurs mostly in backwards compatibility
105 // situations when the builder doesn't know what level the file
106 // belongs to. Likewise, if level is beyond the end of the
107 // specified compression levels, use the last value.
108 return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
109 } else {
110 return mutable_cf_options.compression;
111 }
112 }
113
114 CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
115 const VersionStorageInfo* vstorage,
116 int level,
117 const bool enable_compression) {
118 if (!enable_compression) {
119 return ioptions.compression_opts;
120 }
121 // If bottommost_compression is set and we are compacting to the
122 // bottommost level then we should use the specified compression options
123 // for the bottmomost_compression.
124 if (ioptions.bottommost_compression != kDisableCompressionOption &&
125 level >= (vstorage->num_non_empty_levels() - 1) &&
126 ioptions.bottommost_compression_opts.enabled) {
127 return ioptions.bottommost_compression_opts;
128 }
129 return ioptions.compression_opts;
130 }
131
// Bind the picker to an immutable set of CF options and the internal-key
// comparator used to order files and keys; both must outlive the picker.
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
                                   const InternalKeyComparator* icmp)
    : ioptions_(ioptions), icmp_(icmp) {}
135
// No explicit cleanup needed; defined out-of-line for the class hierarchy.
CompactionPicker::~CompactionPicker() {}
137
// Delete this compaction from the list of running compactions.
// If the compaction did not complete successfully, also reset the
// next-compaction index so its input files can be reconsidered later.
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
  UnregisterCompaction(c);
  if (!status.ok()) {
    c->ResetNextCompactionIndex();
  }
}
145
146 void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
147 InternalKey* smallest,
148 InternalKey* largest) const {
149 const int level = inputs.level;
150 assert(!inputs.empty());
151 smallest->Clear();
152 largest->Clear();
153
154 if (level == 0) {
155 for (size_t i = 0; i < inputs.size(); i++) {
156 FileMetaData* f = inputs[i];
157 if (i == 0) {
158 *smallest = f->smallest;
159 *largest = f->largest;
160 } else {
161 if (icmp_->Compare(f->smallest, *smallest) < 0) {
162 *smallest = f->smallest;
163 }
164 if (icmp_->Compare(f->largest, *largest) > 0) {
165 *largest = f->largest;
166 }
167 }
168 }
169 } else {
170 *smallest = inputs[0]->smallest;
171 *largest = inputs[inputs.size() - 1]->largest;
172 }
173 }
174
175 void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
176 const CompactionInputFiles& inputs2,
177 InternalKey* smallest,
178 InternalKey* largest) const {
179 assert(!inputs1.empty() || !inputs2.empty());
180 if (inputs1.empty()) {
181 GetRange(inputs2, smallest, largest);
182 } else if (inputs2.empty()) {
183 GetRange(inputs1, smallest, largest);
184 } else {
185 InternalKey smallest1, smallest2, largest1, largest2;
186 GetRange(inputs1, &smallest1, &largest1);
187 GetRange(inputs2, &smallest2, &largest2);
188 *smallest =
189 icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2;
190 *largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1;
191 }
192 }
193
194 void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
195 InternalKey* smallest,
196 InternalKey* largest) const {
197 InternalKey current_smallest;
198 InternalKey current_largest;
199 bool initialized = false;
200 for (const auto& in : inputs) {
201 if (in.empty()) {
202 continue;
203 }
204 GetRange(in, &current_smallest, &current_largest);
205 if (!initialized) {
206 *smallest = current_smallest;
207 *largest = current_largest;
208 initialized = true;
209 } else {
210 if (icmp_->Compare(current_smallest, *smallest) < 0) {
211 *smallest = current_smallest;
212 }
213 if (icmp_->Compare(current_largest, *largest) > 0) {
214 *largest = current_largest;
215 }
216 }
217 }
218 assert(initialized);
219 }
220
// Expand `inputs` (which must be non-empty) until the chosen file set has
// a "clean cut" against its neighbors on the same level, i.e. no user key
// spans the boundary between a picked file and an unpicked one. Returns
// false if any file in the expanded set is already being compacted, in
// which case the caller must drop this compaction. `next_smallest`, when
// non-null, is forwarded to GetOverlappingInputs so the caller can learn
// where the picked range ends.
bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
                                              VersionStorageInfo* vstorage,
                                              CompactionInputFiles* inputs,
                                              InternalKey** next_smallest) {
  // This isn't good compaction
  assert(!inputs->empty());

  const int level = inputs->level;
  // GetOverlappingInputs will always do the right thing for level-0.
  // So we don't need to do any expansion if level == 0.
  if (level == 0) {
    return true;
  }

  InternalKey smallest, largest;

  // Keep expanding inputs until we are sure that there is a "clean cut"
  // boundary between the files in input and the surrounding files.
  // This will ensure that no parts of a key are lost during compaction.
  // This is a fixed-point iteration: each pass recomputes the key range of
  // the current set and re-queries for all files overlapping that range,
  // stopping once a pass no longer grows the set.
  int hint_index = -1;
  size_t old_size;
  do {
    old_size = inputs->size();
    GetRange(*inputs, &smallest, &largest);
    inputs->clear();
    vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
                                   hint_index, &hint_index, true,
                                   next_smallest);
  } while (inputs->size() > old_size);

  // we started off with inputs non-empty and the previous loop only grew
  // inputs. thus, inputs should be non-empty here
  assert(!inputs->empty());

  // If, after the expansion, there are files that are already under
  // compaction, then we must drop/cancel this compaction.
  if (AreFilesInCompaction(inputs->files)) {
    return false;
  }
  return true;
}
262
263 bool CompactionPicker::RangeOverlapWithCompaction(
264 const Slice& smallest_user_key, const Slice& largest_user_key,
265 int level) const {
266 const Comparator* ucmp = icmp_->user_comparator();
267 for (Compaction* c : compactions_in_progress_) {
268 if (c->output_level() == level &&
269 ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 &&
270 ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) {
271 // Overlap
272 return true;
273 }
274 }
275 // Did not overlap with any running compaction in level `level`
276 return false;
277 }
278
279 bool CompactionPicker::FilesRangeOverlapWithCompaction(
280 const std::vector<CompactionInputFiles>& inputs, int level) const {
281 bool is_empty = true;
282 for (auto& in : inputs) {
283 if (!in.empty()) {
284 is_empty = false;
285 break;
286 }
287 }
288 if (is_empty) {
289 // No files in inputs
290 return false;
291 }
292
293 InternalKey smallest, largest;
294 GetRange(inputs, &smallest, &largest);
295 return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
296 level);
297 }
298
299 // Returns true if any one of specified files are being compacted
300 bool CompactionPicker::AreFilesInCompaction(
301 const std::vector<FileMetaData*>& files) {
302 for (size_t i = 0; i < files.size(); i++) {
303 if (files[i]->being_compacted) {
304 return true;
305 }
306 }
307 return false;
308 }
309
// Build, register, and return a Compaction over the externally-chosen
// `input_files`, writing to `output_level`. The caller owns the returned
// object (released via ReleaseCompactionFiles). The input set must have
// been validated already — see SanitizeCompactionInputFiles.
Compaction* CompactionPicker::CompactFiles(
    const CompactionOptions& compact_options,
    const std::vector<CompactionInputFiles>& input_files, int output_level,
    VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
    uint32_t output_path_id) {
  assert(input_files.size());
  // This compaction output should not overlap with a running compaction as
  // `SanitizeCompactionInputFiles` should've checked earlier and db mutex
  // shouldn't have been released since.
  assert(!FilesRangeOverlapWithCompaction(input_files, output_level));

  CompressionType compression_type;
  if (compact_options.compression == kDisableCompressionOption) {
    // No explicit override: derive compression from the CF options and the
    // output level. Only leveled compaction has a meaningful base level.
    int base_level;
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      base_level = vstorage->base_level();
    } else {
      base_level = 1;
    }
    compression_type =
        GetCompressionType(ioptions_, vstorage, mutable_cf_options,
                           output_level, base_level);
  } else {
    // TODO(ajkr): `CompactionOptions` offers configurable `CompressionType`
    // without configurable `CompressionOptions`, which is inconsistent.
    compression_type = compact_options.compression;
  }
  auto c = new Compaction(
      vstorage, ioptions_, mutable_cf_options, input_files, output_level,
      compact_options.output_file_size_limit,
      mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
      GetCompressionOptions(ioptions_, vstorage, output_level),
      compact_options.max_subcompactions,
      /* grandparents */ {}, true);
  RegisterCompaction(c);
  return c;
}
347
// Translate a set of SST file numbers into per-level CompactionInputFiles.
// Matched numbers are consumed from `input_set`. Fails with
// InvalidArgument if the set is empty or contains a number not present in
// `vstorage`. On success, appends one entry to `input_files` for every
// level in the contiguous [first, last] range of levels that contained
// matches (levels in between with no matches yield empty entries).
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
    std::vector<CompactionInputFiles>* input_files,
    std::unordered_set<uint64_t>* input_set, const VersionStorageInfo* vstorage,
    const CompactionOptions& /*compact_options*/) const {
  if (input_set->size() == 0U) {
    return Status::InvalidArgument(
        "Compaction must include at least one file.");
  }
  assert(input_files);

  std::vector<CompactionInputFiles> matched_input_files;
  matched_input_files.resize(vstorage->num_levels());
  int first_non_empty_level = -1;
  int last_non_empty_level = -1;
  // TODO(yhchiang): use a lazy-initialized mapping from
  // file_number to FileMetaData in Version.
  // Linear scan over every level's file list to match the numbers.
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    for (auto file : vstorage->LevelFiles(level)) {
      auto iter = input_set->find(file->fd.GetNumber());
      if (iter != input_set->end()) {
        matched_input_files[level].files.push_back(file);
        input_set->erase(iter);
        last_non_empty_level = level;
        if (first_non_empty_level == -1) {
          first_non_empty_level = level;
        }
      }
    }
  }

  if (!input_set->empty()) {
    // Anything left over was not found in any level.
    std::string message(
        "Cannot find matched SST files for the following file numbers:");
    for (auto fn : *input_set) {
      message += " ";
      message += ToString(fn);
    }
    return Status::InvalidArgument(message);
  }

  for (int level = first_non_empty_level; level <= last_non_empty_level;
       ++level) {
    matched_input_files[level].level = level;
    input_files->emplace_back(std::move(matched_input_files[level]));
  }

  return Status::OK();
}
396
// Returns true if any one of the parent files are being compacted,
// i.e. whether any file on `level` overlapping [smallest, largest] is
// already part of a running compaction. `level_index`, when non-null,
// supplies a starting hint for the overlap search and receives the
// updated position (see VersionStorageInfo::GetOverlappingInputs).
bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage,
                                           const InternalKey* smallest,
                                           const InternalKey* largest,
                                           int level, int* level_index) {
  std::vector<FileMetaData*> inputs;
  assert(level < NumberLevels());

  vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
                                 level_index ? *level_index : 0, level_index);
  return AreFilesInCompaction(inputs);
}
409
410 // Populates the set of inputs of all other levels that overlap with the
411 // start level.
412 // Now we assume all levels except start level and output level are empty.
413 // Will also attempt to expand "start level" if that doesn't expand
414 // "output level" or cause "level" to include a file for compaction that has an
415 // overlapping user-key with another file.
416 // REQUIRES: input_level and output_level are different
417 // REQUIRES: inputs->empty() == false
418 // Returns false if files on parent level are currently in compaction, which
419 // means that we can't compact them
bool CompactionPicker::SetupOtherInputs(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
    CompactionInputFiles* output_level_inputs, int* parent_index,
    int base_index) {
  assert(!inputs->empty());
  assert(output_level_inputs->empty());
  const int input_level = inputs->level;
  const int output_level = output_level_inputs->level;
  if (input_level == output_level) {
    // no possibility of conflict
    return true;
  }

  // For now, we only support merging two levels, start level and output level.
  // We need to assert other levels are empty.
  for (int l = input_level + 1; l < output_level; l++) {
    assert(vstorage->NumLevelFiles(l) == 0);
  }

  InternalKey smallest, largest;

  // Get the range one last time.
  GetRange(*inputs, &smallest, &largest);

  // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
  // include in compaction
  vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
                                 &output_level_inputs->files, *parent_index,
                                 parent_index);
  if (AreFilesInCompaction(output_level_inputs->files)) {
    // An output-level file is busy; the caller must abandon this pick.
    return false;
  }
  if (!output_level_inputs->empty()) {
    if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) {
      return false;
    }
  }

  // See if we can further grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up. We also choose NOT
  // to expand if this would cause "level" to include some entries for some
  // user key, while excluding other entries for the same user key. This
  // can happen when one user key spans multiple files.
  if (!output_level_inputs->empty()) {
    const uint64_t limit = mutable_cf_options.max_compaction_bytes;
    const uint64_t output_level_inputs_size =
        TotalCompensatedFileSize(output_level_inputs->files);
    const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files);
    bool expand_inputs = false;

    CompactionInputFiles expanded_inputs;
    expanded_inputs.level = input_level;
    // Get closed interval of output level
    InternalKey all_start, all_limit;
    GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
    bool try_overlapping_inputs = true;
    // First attempt: take every input-level file overlapping the combined
    // range of both levels.
    vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
                                   &expanded_inputs.files, base_index, nullptr);
    uint64_t expanded_inputs_size =
        TotalCompensatedFileSize(expanded_inputs.files);
    if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {
      try_overlapping_inputs = false;
    }
    // Accept the expansion only if it actually grows the input level, stays
    // under the size limit, touches no busy files, and — crucially — does
    // not pull in any additional output-level files.
    if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() &&
        output_level_inputs_size + expanded_inputs_size < limit &&
        !AreFilesInCompaction(expanded_inputs.files)) {
      InternalKey new_start, new_limit;
      GetRange(expanded_inputs, &new_start, &new_limit);
      CompactionInputFiles expanded_output_level_inputs;
      expanded_output_level_inputs.level = output_level;
      vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
                                     &expanded_output_level_inputs.files,
                                     *parent_index, parent_index);
      assert(!expanded_output_level_inputs.empty());
      if (!AreFilesInCompaction(expanded_output_level_inputs.files) &&
          ExpandInputsToCleanCut(cf_name, vstorage,
                                 &expanded_output_level_inputs) &&
          expanded_output_level_inputs.size() == output_level_inputs->size()) {
        expand_inputs = true;
      }
    }
    // Second attempt: restrict to files fully contained in the interval,
    // which cannot change the output-level pick by construction.
    if (!expand_inputs) {
      vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
                                             &all_limit, &expanded_inputs.files,
                                             base_index, nullptr);
      expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files);
      if (expanded_inputs.size() > inputs->size() &&
          output_level_inputs_size + expanded_inputs_size < limit &&
          !AreFilesInCompaction(expanded_inputs.files)) {
        expand_inputs = true;
      }
    }
    if (expand_inputs) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
                     "(%" PRIu64 "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt
                     "+%" ROCKSDB_PRIszt " (%" PRIu64 "+%" PRIu64 " bytes)\n",
                     cf_name.c_str(), input_level, inputs->size(),
                     output_level_inputs->size(), inputs_size,
                     output_level_inputs_size, expanded_inputs.size(),
                     output_level_inputs->size(), expanded_inputs_size,
                     output_level_inputs_size);
      inputs->files = expanded_inputs.files;
    }
  }
  return true;
}
528
// Collect into `grandparents` the files two levels below the input level
// (i.e. one level below the output level) whose key ranges overlap the
// combined range of `inputs` and `output_level_inputs`. Leaves
// `grandparents` empty when the output level is already the last level.
void CompactionPicker::GetGrandparents(
    VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
    const CompactionInputFiles& output_level_inputs,
    std::vector<FileMetaData*>* grandparents) {
  InternalKey start, limit;
  GetRange(inputs, output_level_inputs, &start, &limit);
  // Compute the set of grandparent files that overlap this compaction
  // (parent == level+1; grandparent == level+2)
  if (output_level_inputs.level + 1 < NumberLevels()) {
    vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start,
                                   &limit, grandparents);
  }
}
542
// Build a manual compaction over [begin, end] from `input_level` to
// `output_level`. Returns nullptr (and sets *manual_conflict when the
// cause is a conflicting running compaction) if no compaction can be
// formed. On success the compaction is registered and returned; the
// caller releases it via ReleaseCompactionFiles. *compaction_end receives
// nullptr when the whole range was covered, otherwise the key at which a
// follow-up manual compaction should resume.
Compaction* CompactionPicker::CompactRange(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, int input_level, int output_level,
    uint32_t output_path_id, uint32_t max_subcompactions,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* manual_conflict) {
  // CompactionPickerFIFO has its own implementation of compact range
  assert(ioptions_.compaction_style != kCompactionStyleFIFO);

  if (input_level == ColumnFamilyData::kCompactAllLevels) {
    assert(ioptions_.compaction_style == kCompactionStyleUniversal);

    // Universal compaction with more than one level always compacts all the
    // files together to the last level.
    assert(vstorage->num_levels() > 1);
    // DBImpl::CompactRange() set output level to be the last level
    if (ioptions_.allow_ingest_behind) {
      assert(output_level == vstorage->num_levels() - 2);
    } else {
      assert(output_level == vstorage->num_levels() - 1);
    }
    // DBImpl::RunManualCompaction will make full range for universal compaction
    assert(begin == nullptr);
    assert(end == nullptr);
    *compaction_end = nullptr;

    // Skip leading empty levels to find where the data starts.
    int start_level = 0;
    for (; start_level < vstorage->num_levels() &&
           vstorage->NumLevelFiles(start_level) == 0;
         start_level++) {
    }
    if (start_level == vstorage->num_levels()) {
      // The column family has no files at all.
      return nullptr;
    }

    if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
      *manual_conflict = true;
      // Only one level 0 compaction allowed
      return nullptr;
    }

    // Gather every file from every non-empty level as compaction input.
    std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
                                             start_level);
    for (int level = start_level; level < vstorage->num_levels(); level++) {
      inputs[level - start_level].level = level;
      auto& files = inputs[level - start_level].files;
      for (FileMetaData* f : vstorage->LevelFiles(level)) {
        files.push_back(f);
      }
      if (AreFilesInCompaction(files)) {
        *manual_conflict = true;
        return nullptr;
      }
    }

    // 2 non-exclusive manual compactions could run at the same time producing
    // overlaping outputs in the same level.
    if (FilesRangeOverlapWithCompaction(inputs, output_level)) {
      // This compaction output could potentially conflict with the output
      // of a currently running compaction, we cannot run it.
      *manual_conflict = true;
      return nullptr;
    }

    Compaction* c = new Compaction(
        vstorage, ioptions_, mutable_cf_options, std::move(inputs),
        output_level,
        MaxFileSizeForLevel(mutable_cf_options, output_level,
                            ioptions_.compaction_style),
        /* max_compaction_bytes */ LLONG_MAX, output_path_id,
        GetCompressionType(ioptions_, vstorage, mutable_cf_options,
                           output_level, 1),
        GetCompressionOptions(ioptions_, vstorage, output_level),
        max_subcompactions, /* grandparents */ {}, /* is manual */ true);
    RegisterCompaction(c);
    return c;
  }

  CompactionInputFiles inputs;
  inputs.level = input_level;
  bool covering_the_whole_range = true;

  // All files are 'overlapping' in universal style compaction.
  // We have to compact the entire range in one shot.
  if (ioptions_.compaction_style == kCompactionStyleUniversal) {
    begin = nullptr;
    end = nullptr;
  }

  vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
  if (inputs.empty()) {
    return nullptr;
  }

  if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
    // Only one level 0 compaction allowed
    TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
    *manual_conflict = true;
    return nullptr;
  }

  // Avoid compacting too much in one shot in case the range is large.
  // But we cannot do this for level-0 since level-0 files can overlap
  // and we must not pick one file and drop another older file if the
  // two files overlap.
  if (input_level > 0) {
    const uint64_t limit = mutable_cf_options.max_compaction_bytes;
    uint64_t total = 0;
    for (size_t i = 0; i + 1 < inputs.size(); ++i) {
      uint64_t s = inputs[i]->compensated_file_size;
      total += s;
      if (total >= limit) {
        // Truncate the input list; the remainder is left for a follow-up
        // manual compaction (signalled through *compaction_end below).
        covering_the_whole_range = false;
        inputs.files.resize(i + 1);
        break;
      }
    }
  }
  assert(output_path_id < static_cast<uint32_t>(ioptions_.cf_paths.size()));

  InternalKey key_storage;
  InternalKey* next_smallest = &key_storage;
  if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs, &next_smallest) ==
      false) {
    // manual compaction is now multi-threaded, so it can
    // happen that ExpandWhileOverlapping fails
    // we handle it higher in RunManualCompaction
    *manual_conflict = true;
    return nullptr;
  }

  if (covering_the_whole_range || !next_smallest) {
    *compaction_end = nullptr;
  } else {
    **compaction_end = *next_smallest;
  }

  CompactionInputFiles output_level_inputs;
  if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
    assert(input_level == 0);
    output_level = vstorage->base_level();
    assert(output_level > 0);
  }
  output_level_inputs.level = output_level;
  if (input_level != output_level) {
    int parent_index = -1;
    if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
                          &output_level_inputs, &parent_index, -1)) {
      // manual compaction is now multi-threaded, so it can
      // happen that SetupOtherInputs fails
      // we handle it higher in RunManualCompaction
      *manual_conflict = true;
      return nullptr;
    }
  }

  std::vector<CompactionInputFiles> compaction_inputs({inputs});
  if (!output_level_inputs.empty()) {
    compaction_inputs.push_back(output_level_inputs);
  }
  // Re-check for conflicts: the input set may have grown above.
  for (size_t i = 0; i < compaction_inputs.size(); i++) {
    if (AreFilesInCompaction(compaction_inputs[i].files)) {
      *manual_conflict = true;
      return nullptr;
    }
  }

  // 2 non-exclusive manual compactions could run at the same time producing
  // overlaping outputs in the same level.
  if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) {
    // This compaction output could potentially conflict with the output
    // of a currently running compaction, we cannot run it.
    *manual_conflict = true;
    return nullptr;
  }

  std::vector<FileMetaData*> grandparents;
  GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
  Compaction* compaction = new Compaction(
      vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs),
      output_level,
      MaxFileSizeForLevel(mutable_cf_options, output_level,
                          ioptions_.compaction_style, vstorage->base_level(),
                          ioptions_.level_compaction_dynamic_level_bytes),
      mutable_cf_options.max_compaction_bytes, output_path_id,
      GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
                         vstorage->base_level()),
      GetCompressionOptions(ioptions_, vstorage, output_level),
      max_subcompactions, std::move(grandparents),
      /* is manual compaction */ true);

  TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
  RegisterCompaction(compaction);

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here
  vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);

  return compaction;
}
745
746 #ifndef ROCKSDB_LITE
747 namespace {
748 // Test whether two files have overlapping key-ranges.
749 bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
750 const SstFileMetaData& b) {
751 if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
752 if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
753 // b.smallestkey <= a.smallestkey <= b.largestkey
754 return true;
755 }
756 } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
757 // a.smallestkey < b.smallestkey <= a.largestkey
758 return true;
759 }
760 if (c->Compare(a.largestkey, b.largestkey) <= 0) {
761 if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
762 // b.smallestkey <= a.largestkey <= b.largestkey
763 return true;
764 }
765 } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
766 // a.smallestkey <= b.largestkey < a.largestkey
767 return true;
768 }
769 return false;
770 }
771 } // namespace
772
// Expand `input_files` so the resulting set is safe to compact down to
// `output_level`: within each level, close any gap between the first and
// last chosen files, extend over same-level neighbors whose key ranges
// overlap the picked files, and pull in every overlapping file from all
// levels below, down to the output level. Returns Status::Aborted if any
// required file is already being compacted or if the final key range
// clashes with a running compaction's output; Status::OK otherwise.
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
    std::unordered_set<uint64_t>* input_files,
    const ColumnFamilyMetaData& cf_meta, const int output_level) const {
  auto& levels = cf_meta.levels;
  auto comparator = icmp_->user_comparator();

  // TODO(yhchiang): add is_adjustable to CompactionOptions

  // the smallest and largest key of the current compaction input
  std::string smallestkey;
  std::string largestkey;
  // a flag for initializing smallest and largest key
  bool is_first = false;
  const int kNotFound = -1;

  // For each level, it does the following things:
  // 1. Find the first and the last compaction input files
  //    in the current level.
  // 2. Include all files between the first and the last
  //    compaction input files.
  // 3. Update the compaction key-range.
  // 4. For all remaining levels, include files that have
  //    overlapping key-range with the compaction key-range.
  for (int l = 0; l <= output_level; ++l) {
    auto& current_files = levels[l].files;
    int first_included = static_cast<int>(current_files.size());
    int last_included = kNotFound;

    // identify the first and the last compaction input files
    // in the current level.
    for (size_t f = 0; f < current_files.size(); ++f) {
      if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
          input_files->end()) {
        first_included = std::min(first_included, static_cast<int>(f));
        last_included = std::max(last_included, static_cast<int>(f));
        if (is_first == false) {
          smallestkey = current_files[f].smallestkey;
          largestkey = current_files[f].largestkey;
          is_first = true;
        }
      }
    }
    if (last_included == kNotFound) {
      // No input files on this level; nothing to expand here.
      continue;
    }

    if (l != 0) {
      // expend the compaction input of the current level if it
      // has overlapping key-range with other non-compaction input
      // files in the same level.
      while (first_included > 0) {
        if (comparator->Compare(current_files[first_included - 1].largestkey,
                                current_files[first_included].smallestkey) <
            0) {
          break;
        }
        first_included--;
      }

      while (last_included < static_cast<int>(current_files.size()) - 1) {
        if (comparator->Compare(current_files[last_included + 1].smallestkey,
                                current_files[last_included].largestkey) > 0) {
          break;
        }
        last_included++;
      }
    } else if (output_level > 0) {
      // On level 0 files are ordered by time, not key, so every newer file
      // up to the end of the level must come along.
      last_included = static_cast<int>(current_files.size() - 1);
    }

    // include all files between the first and the last compaction input files.
    for (int f = first_included; f <= last_included; ++f) {
      if (current_files[f].being_compacted) {
        return Status::Aborted("Necessary compaction input file " +
                               current_files[f].name +
                               " is currently being compacted.");
      }
      input_files->insert(TableFileNameToNumber(current_files[f].name));
    }

    // update smallest and largest key
    if (l == 0) {
      // Level-0 files overlap, so scan each included file.
      for (int f = first_included; f <= last_included; ++f) {
        if (comparator->Compare(smallestkey, current_files[f].smallestkey) >
            0) {
          smallestkey = current_files[f].smallestkey;
        }
        if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) {
          largestkey = current_files[f].largestkey;
        }
      }
    } else {
      // Non-zero levels are sorted and disjoint: the bounds come from the
      // first and last included files.
      if (comparator->Compare(smallestkey,
                              current_files[first_included].smallestkey) > 0) {
        smallestkey = current_files[first_included].smallestkey;
      }
      if (comparator->Compare(largestkey,
                              current_files[last_included].largestkey) < 0) {
        largestkey = current_files[last_included].largestkey;
      }
    }

    SstFileMetaData aggregated_file_meta;
    aggregated_file_meta.smallestkey = smallestkey;
    aggregated_file_meta.largestkey = largestkey;

    // For all lower levels, include all overlapping files.
    // We need to add overlapping files from the current level too because even
    // if there no input_files in level l, we would still need to add files
    // which overlap with the range containing the input_files in levels 0 to l
    // Level 0 doesn't need to be handled this way because files are sorted by
    // time and not by key
    for (int m = std::max(l, 1); m <= output_level; ++m) {
      for (auto& next_lv_file : levels[m].files) {
        if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta,
                                     next_lv_file)) {
          if (next_lv_file.being_compacted) {
            return Status::Aborted(
                "File " + next_lv_file.name +
                " that has overlapping key range with one of the compaction "
                " input file is currently being compacted.");
          }
          input_files->insert(TableFileNameToNumber(next_lv_file.name));
        }
      }
    }
  }
  if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
    return Status::Aborted(
        "A running compaction is writing to the same output level in an "
        "overlapping key range");
  }
  return Status::OK();
}
907
908 Status CompactionPicker::SanitizeCompactionInputFiles(
909 std::unordered_set<uint64_t>* input_files,
910 const ColumnFamilyMetaData& cf_meta, const int output_level) const {
911 assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
912 cf_meta.levels[cf_meta.levels.size() - 1].level);
913 if (output_level >= static_cast<int>(cf_meta.levels.size())) {
914 return Status::InvalidArgument(
915 "Output level for column family " + cf_meta.name +
916 " must between [0, " +
917 ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + "].");
918 }
919
920 if (output_level > MaxOutputLevel()) {
921 return Status::InvalidArgument(
922 "Exceed the maximum output level defined by "
923 "the current compaction algorithm --- " +
924 ToString(MaxOutputLevel()));
925 }
926
927 if (output_level < 0) {
928 return Status::InvalidArgument("Output level cannot be negative.");
929 }
930
931 if (input_files->size() == 0) {
932 return Status::InvalidArgument(
933 "A compaction must contain at least one file.");
934 }
935
936 Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
937 output_level);
938
939 if (!s.ok()) {
940 return s;
941 }
942
943 // for all input files, check whether the file number matches
944 // any currently-existing files.
945 for (auto file_num : *input_files) {
946 bool found = false;
947 for (const auto& level_meta : cf_meta.levels) {
948 for (const auto& file_meta : level_meta.files) {
949 if (file_num == TableFileNameToNumber(file_meta.name)) {
950 if (file_meta.being_compacted) {
951 return Status::Aborted("Specified compaction input file " +
952 MakeTableFileName("", file_num) +
953 " is already being compacted.");
954 }
955 found = true;
956 break;
957 }
958 }
959 if (found) {
960 break;
961 }
962 }
963 if (!found) {
964 return Status::InvalidArgument(
965 "Specified compaction input file " + MakeTableFileName("", file_num) +
966 " does not exist in column family " + cf_meta.name + ".");
967 }
968 }
969
970 return Status::OK();
971 }
972 #endif // !ROCKSDB_LITE
973
974 void CompactionPicker::RegisterCompaction(Compaction* c) {
975 if (c == nullptr) {
976 return;
977 }
978 assert(ioptions_.compaction_style != kCompactionStyleLevel ||
979 c->output_level() == 0 ||
980 !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level()));
981 if (c->start_level() == 0 ||
982 ioptions_.compaction_style == kCompactionStyleUniversal) {
983 level0_compactions_in_progress_.insert(c);
984 }
985 compactions_in_progress_.insert(c);
986 }
987
988 void CompactionPicker::UnregisterCompaction(Compaction* c) {
989 if (c == nullptr) {
990 return;
991 }
992 if (c->start_level() == 0 ||
993 ioptions_.compaction_style == kCompactionStyleUniversal) {
994 level0_compactions_in_progress_.erase(c);
995 }
996 compactions_in_progress_.erase(c);
997 }
998
// Try to build a compaction from the files explicitly marked for compaction
// in `vstorage`. On success *start_level, *output_level and
// *start_level_inputs describe the picked compaction; on failure
// start_level_inputs->files is left empty.
void CompactionPicker::PickFilesMarkedForCompaction(
    const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level,
    int* output_level, CompactionInputFiles* start_level_inputs) {
  if (vstorage->FilesMarkedForCompaction().empty()) {
    return;
  }

  // Attempts to turn one marked (level, file) pair into usable compaction
  // inputs; returns true on success.
  auto continuation = [&, cf_name](std::pair<int, FileMetaData*> level_file) {
    // If it's being compacted it has nothing to do here.
    // If this assert() fails that means that some function marked some
    // files as being_compacted, but didn't call ComputeCompactionScore()
    assert(!level_file.second->being_compacted);
    *start_level = level_file.first;
    *output_level =
        (*start_level == 0) ? vstorage->base_level() : *start_level + 1;

    // Only one L0 compaction may run at a time, since L0 files overlap.
    if (*start_level == 0 && !level0_compactions_in_progress()->empty()) {
      return false;
    }

    start_level_inputs->files = {level_file.second};
    start_level_inputs->level = *start_level;
    return ExpandInputsToCleanCut(cf_name, vstorage, start_level_inputs);
  };

  // take a chance on a random file first
  Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
  size_t random_file_index = static_cast<size_t>(rnd.Uniform(
      static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));

  if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
    // found the compaction!
    return;
  }

  // Random pick failed; fall back to scanning every marked file in order.
  for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
    if (continuation(level_file)) {
      // found the compaction!
      return;
    }
  }
  // Nothing usable; signal failure with empty inputs.
  start_level_inputs->files.clear();
}
1042
1043 bool CompactionPicker::GetOverlappingL0Files(
1044 VersionStorageInfo* vstorage, CompactionInputFiles* start_level_inputs,
1045 int output_level, int* parent_index) {
1046 // Two level 0 compaction won't run at the same time, so don't need to worry
1047 // about files on level 0 being compacted.
1048 assert(level0_compactions_in_progress()->empty());
1049 InternalKey smallest, largest;
1050 GetRange(*start_level_inputs, &smallest, &largest);
1051 // Note that the next call will discard the file we placed in
1052 // c->inputs_[0] earlier and replace it with an overlapping set
1053 // which will include the picked file.
1054 start_level_inputs->files.clear();
1055 vstorage->GetOverlappingInputs(0, &smallest, &largest,
1056 &(start_level_inputs->files));
1057
1058 // If we include more L0 files in the same compaction run it can
1059 // cause the 'smallest' and 'largest' key to get extended to a
1060 // larger range. So, re-invoke GetRange to get the new key range
1061 GetRange(*start_level_inputs, &smallest, &largest);
1062 if (IsRangeInCompaction(vstorage, &smallest, &largest, output_level,
1063 parent_index)) {
1064 return false;
1065 }
1066 assert(!start_level_inputs->files.empty());
1067
1068 return true;
1069 }
1070
1071 bool LevelCompactionPicker::NeedsCompaction(
1072 const VersionStorageInfo* vstorage) const {
1073 if (!vstorage->ExpiredTtlFiles().empty()) {
1074 return true;
1075 }
1076 if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
1077 return true;
1078 }
1079 if (!vstorage->FilesMarkedForCompaction().empty()) {
1080 return true;
1081 }
1082 for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
1083 if (vstorage->CompactionScore(i) >= 1) {
1084 return true;
1085 }
1086 }
1087 return false;
1088 }
1089
1090 namespace {
// A class to build a leveled compaction step-by-step. Intended usage is a
// single call to PickCompaction(), which drives SetupInitialFiles(),
// SetupOtherL0FilesIfNeeded(), SetupOtherInputsIfNeeded() and
// GetCompaction() in order.
class LevelCompactionBuilder {
 public:
  LevelCompactionBuilder(const std::string& cf_name,
                         VersionStorageInfo* vstorage,
                         CompactionPicker* compaction_picker,
                         LogBuffer* log_buffer,
                         const MutableCFOptions& mutable_cf_options,
                         const ImmutableCFOptions& ioptions)
      : cf_name_(cf_name),
        vstorage_(vstorage),
        compaction_picker_(compaction_picker),
        log_buffer_(log_buffer),
        mutable_cf_options_(mutable_cf_options),
        ioptions_(ioptions) {}

  // Pick and return a compaction.
  Compaction* PickCompaction();

  // Pick the initial files to compact to the next level. (or together
  // in Intra-L0 compactions)
  void SetupInitialFiles();

  // If the initial files are from L0 level, pick other L0
  // files if needed.
  bool SetupOtherL0FilesIfNeeded();

  // Based on initial files, setup other files need to be compacted
  // in this compaction, accordingly.
  bool SetupOtherInputsIfNeeded();

  // Build the Compaction object from the gathered inputs and register it
  // with the compaction picker.
  Compaction* GetCompaction();

  // For the specified level, pick a file that we want to compact.
  // Returns false if there is no file to compact.
  // If it returns true, inputs->files.size() will be exactly one.
  // If level is 0 and there is already a compaction on that level, this
  // function will return false.
  bool PickFileToCompact();

  // For L0->L0, picks the longest span of files that aren't currently
  // undergoing compaction for which work-per-deleted-file decreases. The span
  // always starts from the newest L0 file.
  //
  // Intra-L0 compaction is independent of all other files, so it can be
  // performed even when L0->base_level compactions are blocked.
  //
  // Returns true if `inputs` is populated with a span of files to be compacted;
  // otherwise, returns false.
  bool PickIntraL0Compaction();

  // Pick a compaction whose inputs are files that exceeded their TTL.
  void PickExpiredTtlFiles();

  const std::string& cf_name_;
  VersionStorageInfo* vstorage_;
  CompactionPicker* compaction_picker_;
  LogBuffer* log_buffer_;
  int start_level_ = -1;   // level the compaction reads its inputs from
  int output_level_ = -1;  // level the compaction writes its output to
  int parent_index_ = -1;  // hint index into the output level's file list
  int base_index_ = -1;    // index of the picked file in the start level
  double start_level_score_ = 0;  // compaction score of start_level_
  bool is_manual_ = false;        // true for marked-for-compaction picks
  CompactionInputFiles start_level_inputs_;
  std::vector<CompactionInputFiles> compaction_inputs_;
  CompactionInputFiles output_level_inputs_;
  // Files in output_level_ + 1 used to bound output file sizes.
  std::vector<FileMetaData*> grandparents_;
  CompactionReason compaction_reason_ = CompactionReason::kUnknown;

  const MutableCFOptions& mutable_cf_options_;
  const ImmutableCFOptions& ioptions_;
  // Pick a path ID to place a newly generated file, with its level
  static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
                            const MutableCFOptions& mutable_cf_options,
                            int level);

  // Minimum span length considered for an intra-L0 compaction.
  static const int kMinFilesForIntraL0Compaction = 4;
};
1169
// Try to start a compaction from one of the files whose TTL has expired.
// On success start_level_, output_level_ and start_level_inputs_ are set;
// on failure start_level_inputs_ is left empty.
void LevelCompactionBuilder::PickExpiredTtlFiles() {
  if (vstorage_->ExpiredTtlFiles().empty()) {
    return;
  }

  // Attempts to turn one expired (level, file) pair into usable compaction
  // inputs; returns true on success.
  auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
    // If it's being compacted it has nothing to do here.
    // If this assert() fails that means that some function marked some
    // files as being_compacted, but didn't call ComputeCompactionScore()
    assert(!level_file.second->being_compacted);
    start_level_ = level_file.first;
    output_level_ =
        (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;

    // Skip files already in the last non-empty level (nowhere to compact
    // to) and L0 files while another L0 compaction is running.
    if ((start_level_ == vstorage_->num_non_empty_levels() - 1) ||
        (start_level_ == 0 &&
         !compaction_picker_->level0_compactions_in_progress()->empty())) {
      return false;
    }

    start_level_inputs_.files = {level_file.second};
    start_level_inputs_.level = start_level_;
    return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                      &start_level_inputs_);
  };

  for (auto& level_file : vstorage_->ExpiredTtlFiles()) {
    if (continuation(level_file)) {
      // found the compaction!
      return;
    }
  }

  // No expired file could be used; signal failure with empty inputs.
  start_level_inputs_.clear();
}
1205
// Choose the initial compaction inputs, trying sources in priority order:
// (1) score-based picks over all levels (including intra-L0 fallback),
// (2) files explicitly marked for compaction, (3) bottommost files marked
// for compaction, (4) TTL-expired files. Sets start_level_, output_level_,
// start_level_inputs_ and compaction_reason_ on success; leaves
// start_level_inputs_ empty when nothing can be compacted.
void LevelCompactionBuilder::SetupInitialFiles() {
  // Find the compactions by size on all levels.
  bool skipped_l0_to_base = false;
  for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
    start_level_score_ = vstorage_->CompactionScore(i);
    start_level_ = vstorage_->CompactionScoreLevel(i);
    // Scores are sorted in decreasing order, so we visit the most urgent
    // level first.
    assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
    if (start_level_score_ >= 1) {
      if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
        // If L0->base_level compaction is pending, don't schedule further
        // compaction from base level. Otherwise L0->base_level compaction
        // may starve.
        continue;
      }
      output_level_ =
          (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
      if (PickFileToCompact()) {
        // found the compaction!
        if (start_level_ == 0) {
          // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
          compaction_reason_ = CompactionReason::kLevelL0FilesNum;
        } else {
          // L1+ score = `Level files size` / `MaxBytesForLevel`
          compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
        }
        break;
      } else {
        // didn't find the compaction, clear the inputs
        start_level_inputs_.clear();
        if (start_level_ == 0) {
          skipped_l0_to_base = true;
          // L0->base_level may be blocked due to ongoing L0->base_level
          // compactions. It may also be blocked by an ongoing compaction from
          // base_level downwards.
          //
          // In these cases, to reduce L0 file count and thus reduce likelihood
          // of write stalls, we can attempt compacting a span of files within
          // L0.
          if (PickIntraL0Compaction()) {
            output_level_ = 0;
            compaction_reason_ = CompactionReason::kLevelL0FilesNum;
            break;
          }
        }
      }
    }
  }

  // if we didn't find a compaction, check if there are any files marked for
  // compaction
  if (start_level_inputs_.empty()) {
    parent_index_ = base_index_ = -1;

    // Source (2): files marked for compaction.
    compaction_picker_->PickFilesMarkedForCompaction(
        cf_name_, vstorage_, &start_level_, &output_level_, &start_level_inputs_);
    if (!start_level_inputs_.empty()) {
      is_manual_ = true;
      compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
      return;
    }

    // Source (3): bottommost files marked for compaction. These are
    // compacted in place (output_level_ == start_level_).
    size_t i;
    for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
         ++i) {
      auto& level_and_file = vstorage_->BottommostFilesMarkedForCompaction()[i];
      assert(!level_and_file.second->being_compacted);
      start_level_inputs_.level = output_level_ = start_level_ =
          level_and_file.first;
      start_level_inputs_.files = {level_and_file.second};
      if (compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                     &start_level_inputs_)) {
        break;
      }
    }
    if (i == vstorage_->BottommostFilesMarkedForCompaction().size()) {
      start_level_inputs_.clear();
    } else {
      assert(!start_level_inputs_.empty());
      compaction_reason_ = CompactionReason::kBottommostFiles;
      return;
    }

    // Source (4): TTL-expired files.
    assert(start_level_inputs_.empty());
    PickExpiredTtlFiles();
    if (!start_level_inputs_.empty()) {
      compaction_reason_ = CompactionReason::kTtl;
    }
  }
}
1296
1297 bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
1298 if (start_level_ == 0 && output_level_ != 0) {
1299 return compaction_picker_->GetOverlappingL0Files(
1300 vstorage_, &start_level_inputs_, output_level_, &parent_index_);
1301 }
1302 return true;
1303 }
1304
1305 bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
1306 // Setup input files from output level. For output to L0, we only compact
1307 // spans of files that do not interact with any pending compactions, so don't
1308 // need to consider other levels.
1309 if (output_level_ != 0) {
1310 output_level_inputs_.level = output_level_;
1311 if (!compaction_picker_->SetupOtherInputs(
1312 cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
1313 &output_level_inputs_, &parent_index_, base_index_)) {
1314 return false;
1315 }
1316
1317 compaction_inputs_.push_back(start_level_inputs_);
1318 if (!output_level_inputs_.empty()) {
1319 compaction_inputs_.push_back(output_level_inputs_);
1320 }
1321
1322 // In some edge cases we could pick a compaction that will be compacting
1323 // a key range that overlap with another running compaction, and both
1324 // of them have the same output level. This could happen if
1325 // (1) we are running a non-exclusive manual compaction
1326 // (2) AddFile ingest a new file into the LSM tree
1327 // We need to disallow this from happening.
1328 if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
1329 output_level_)) {
1330 // This compaction output could potentially conflict with the output
1331 // of a currently running compaction, we cannot run it.
1332 return false;
1333 }
1334 compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
1335 output_level_inputs_, &grandparents_);
1336 } else {
1337 compaction_inputs_.push_back(start_level_inputs_);
1338 }
1339 return true;
1340 }
1341
1342 Compaction* LevelCompactionBuilder::PickCompaction() {
1343 // Pick up the first file to start compaction. It may have been extended
1344 // to a clean cut.
1345 SetupInitialFiles();
1346 if (start_level_inputs_.empty()) {
1347 return nullptr;
1348 }
1349 assert(start_level_ >= 0 && output_level_ >= 0);
1350
1351 // If it is a L0 -> base level compaction, we need to set up other L0
1352 // files if needed.
1353 if (!SetupOtherL0FilesIfNeeded()) {
1354 return nullptr;
1355 }
1356
1357 // Pick files in the output level and expand more files in the start level
1358 // if needed.
1359 if (!SetupOtherInputsIfNeeded()) {
1360 return nullptr;
1361 }
1362
1363 // Form a compaction object containing the files we picked.
1364 Compaction* c = GetCompaction();
1365
1366 TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
1367
1368 return c;
1369 }
1370
// Construct the Compaction object from the inputs gathered by the earlier
// steps, register it with the picker, and refresh the compaction scores.
// Ownership of the returned object passes to the caller.
Compaction* LevelCompactionBuilder::GetCompaction() {
  auto c = new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, std::move(compaction_inputs_),
      output_level_,
      MaxFileSizeForLevel(mutable_cf_options_, output_level_,
                          ioptions_.compaction_style, vstorage_->base_level(),
                          ioptions_.level_compaction_dynamic_level_bytes),
      mutable_cf_options_.max_compaction_bytes,
      GetPathId(ioptions_, mutable_cf_options_, output_level_),
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                         output_level_, vstorage_->base_level()),
      GetCompressionOptions(ioptions_, vstorage_, output_level_),
      /* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
      start_level_score_, false /* deletion_compaction */, compaction_reason_);

  // If it's level 0 compaction, make sure we don't execute any other level 0
  // compactions in parallel
  compaction_picker_->RegisterCompaction(c);

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here
  vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
  return c;
}
1397
/*
 * Find the optimal path to place a file
 * Given a level, finds the path where levels up to it will fit in levels
 * up to and including this path
 */
uint32_t LevelCompactionBuilder::GetPathId(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, int level) {
  uint32_t p = 0;
  assert(!ioptions.cf_paths.empty());

  // size remaining in the most recent path
  uint64_t current_path_size = ioptions.cf_paths[0].target_size;

  uint64_t level_size;
  int cur_level = 0;

  // max_bytes_for_level_base denotes L1 size.
  // We estimate L0 size to be the same as L1.
  level_size = mutable_cf_options.max_bytes_for_level_base;

  // Last path is the fallback
  while (p < ioptions.cf_paths.size() - 1) {
    if (level_size <= current_path_size) {
      if (cur_level == level) {
        // Does desired level fit in this path?
        return p;
      } else {
        // Reserve space for this level in the current path and move on to
        // the next (larger) level.
        current_path_size -= level_size;
        if (cur_level > 0) {
          if (ioptions.level_compaction_dynamic_level_bytes) {
            // Currently, level_compaction_dynamic_level_bytes is ignored when
            // multiple db paths are specified. https://github.com/facebook/
            // rocksdb/blob/master/db/column_family.cc.
            // Still, adding this check to avoid accidentally using
            // max_bytes_for_level_multiplier_additional
            level_size = static_cast<uint64_t>(
                level_size * mutable_cf_options.max_bytes_for_level_multiplier);
          } else {
            level_size = static_cast<uint64_t>(
                level_size * mutable_cf_options.max_bytes_for_level_multiplier *
                mutable_cf_options.MaxBytesMultiplerAdditional(cur_level));
          }
        }
        cur_level++;
        continue;
      }
    }
    // The current level does not fit; advance to the next path and retry
    // with its full capacity.
    p++;
    current_path_size = ioptions.cf_paths[p].target_size;
  }
  return p;
}
1451
// Pick one file from start_level_ to compact into output_level_, scanning
// files in compaction-priority order starting from the cursor saved by the
// previous call (NextCompactionIndex). Skips files that are busy, that
// expand into busy neighbors, or whose output-level overlap is busy.
// Returns true iff start_level_inputs_ was populated.
bool LevelCompactionBuilder::PickFileToCompact() {
  // level 0 files are overlapping. So we cannot pick more
  // than one concurrent compactions at this level. This
  // could be made better by looking at key-ranges that are
  // being compacted at level 0.
  if (start_level_ == 0 &&
      !compaction_picker_->level0_compactions_in_progress()->empty()) {
    TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
    return false;
  }

  start_level_inputs_.clear();

  assert(start_level_ >= 0);

  // Pick the largest file in this level that is not already
  // being compacted
  const std::vector<int>& file_size =
      vstorage_->FilesByCompactionPri(start_level_);
  const std::vector<FileMetaData*>& level_files =
      vstorage_->LevelFiles(start_level_);

  unsigned int cmp_idx;
  for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
       cmp_idx < file_size.size(); cmp_idx++) {
    // file_size maps priority rank -> index into level_files.
    int index = file_size[cmp_idx];
    auto* f = level_files[index];

    // do not pick a file to compact if it is being compacted
    // from n-1 level.
    if (f->being_compacted) {
      continue;
    }

    start_level_inputs_.files.push_back(f);
    start_level_inputs_.level = start_level_;
    if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                    &start_level_inputs_) ||
        compaction_picker_->FilesRangeOverlapWithCompaction(
            {start_level_inputs_}, output_level_)) {
      // A locked (pending compaction) input-level file was pulled in due to
      // user-key overlap.
      start_level_inputs_.clear();
      continue;
    }

    // Now that input level is fully expanded, we check whether any output files
    // are locked due to pending compaction.
    //
    // Note we rely on ExpandInputsToCleanCut() to tell us whether any output-
    // level files are locked, not just the extra ones pulled in for user-key
    // overlap.
    InternalKey smallest, largest;
    compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
    CompactionInputFiles output_level_inputs;
    output_level_inputs.level = output_level_;
    vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
                                    &output_level_inputs.files);
    if (!output_level_inputs.empty() &&
        !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                    &output_level_inputs)) {
      start_level_inputs_.clear();
      continue;
    }
    // Found a usable file; remember which one so SetupOtherInputs can use it.
    base_index_ = index;
    break;
  }

  // store where to start the iteration in the next call to PickCompaction
  vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);

  return start_level_inputs_.size() > 0;
}
1525
1526 bool LevelCompactionBuilder::PickIntraL0Compaction() {
1527 start_level_inputs_.clear();
1528 const std::vector<FileMetaData*>& level_files =
1529 vstorage_->LevelFiles(0 /* level */);
1530 if (level_files.size() <
1531 static_cast<size_t>(
1532 mutable_cf_options_.level0_file_num_compaction_trigger + 2) ||
1533 level_files[0]->being_compacted) {
1534 // If L0 isn't accumulating much files beyond the regular trigger, don't
1535 // resort to L0->L0 compaction yet.
1536 return false;
1537 }
1538 return FindIntraL0Compaction(level_files, kMinFilesForIntraL0Compaction,
1539 port::kMaxUint64, &start_level_inputs_);
1540 }
1541 } // namespace
1542
1543 Compaction* LevelCompactionPicker::PickCompaction(
1544 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1545 VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
1546 LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer,
1547 mutable_cf_options, ioptions_);
1548 return builder.PickCompaction();
1549 }
1550
1551 } // namespace rocksdb