]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/compaction_picker.cc
update sources to v12.2.3
[ceph.git] / ceph / src / rocksdb / db / compaction_picker.cc
CommitLineData
7c673cae
FG
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under the BSD-style license found in the
3// LICENSE file in the root directory of this source tree. An additional grant
4// of patent rights can be found in the PATENTS file in the same directory.
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/compaction_picker.h"
11
12#ifndef __STDC_FORMAT_MACROS
13#define __STDC_FORMAT_MACROS
14#endif
15
16#include <inttypes.h>
17#include <limits>
18#include <queue>
19#include <string>
20#include <utility>
21#include "db/column_family.h"
22#include "monitoring/statistics.h"
23#include "util/filename.h"
24#include "util/log_buffer.h"
25#include "util/random.h"
26#include "util/string_util.h"
27#include "util/sync_point.h"
28
29namespace rocksdb {
30
31namespace {
32uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
33 uint64_t sum = 0;
34 for (size_t i = 0; i < files.size() && files[i]; i++) {
35 sum += files[i]->compensated_file_size;
36 }
37 return sum;
38}
39} // anonymous namespace
40
41// Determine compression type, based on user options, level of the output
42// file and whether compression is disabled.
43// If enable_compression is false, then compression is always disabled no
44// matter what the values of the other two parameters are.
45// Otherwise, the compression type is determined based on options and level.
46CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
47 const VersionStorageInfo* vstorage,
48 const MutableCFOptions& mutable_cf_options,
49 int level, int base_level,
50 const bool enable_compression) {
51 if (!enable_compression) {
52 // disable compression
53 return kNoCompression;
54 }
55
56 // If bottommost_compression is set and we are compacting to the
57 // bottommost level then we should use it.
58 if (ioptions.bottommost_compression != kDisableCompressionOption &&
59 level > base_level && level >= (vstorage->num_non_empty_levels() - 1)) {
60 return ioptions.bottommost_compression;
61 }
62 // If the user has specified a different compression level for each level,
63 // then pick the compression for that level.
64 if (!ioptions.compression_per_level.empty()) {
65 assert(level == 0 || level >= base_level);
66 int idx = (level == 0) ? 0 : level - base_level + 1;
67
68 const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
69 // It is possible for level_ to be -1; in that case, we use level
70 // 0's compression. This occurs mostly in backwards compatibility
71 // situations when the builder doesn't know what level the file
72 // belongs to. Likewise, if level is beyond the end of the
73 // specified compression levels, use the last value.
74 return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
75 } else {
76 return mutable_cf_options.compression;
77 }
78}
79
80CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
81 const InternalKeyComparator* icmp)
82 : ioptions_(ioptions), icmp_(icmp) {}
83
84CompactionPicker::~CompactionPicker() {}
85
86// Delete this compaction from the list of running compactions.
87void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
88 UnregisterCompaction(c);
89 if (!status.ok()) {
90 c->ResetNextCompactionIndex();
91 }
92}
93
94void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
95 InternalKey* smallest,
96 InternalKey* largest) const {
97 const int level = inputs.level;
98 assert(!inputs.empty());
99 smallest->Clear();
100 largest->Clear();
101
102 if (level == 0) {
103 for (size_t i = 0; i < inputs.size(); i++) {
104 FileMetaData* f = inputs[i];
105 if (i == 0) {
106 *smallest = f->smallest;
107 *largest = f->largest;
108 } else {
109 if (icmp_->Compare(f->smallest, *smallest) < 0) {
110 *smallest = f->smallest;
111 }
112 if (icmp_->Compare(f->largest, *largest) > 0) {
113 *largest = f->largest;
114 }
115 }
116 }
117 } else {
118 *smallest = inputs[0]->smallest;
119 *largest = inputs[inputs.size() - 1]->largest;
120 }
121}
122
123void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
124 const CompactionInputFiles& inputs2,
125 InternalKey* smallest,
126 InternalKey* largest) const {
127 assert(!inputs1.empty() || !inputs2.empty());
128 if (inputs1.empty()) {
129 GetRange(inputs2, smallest, largest);
130 } else if (inputs2.empty()) {
131 GetRange(inputs1, smallest, largest);
132 } else {
133 InternalKey smallest1, smallest2, largest1, largest2;
134 GetRange(inputs1, &smallest1, &largest1);
135 GetRange(inputs2, &smallest2, &largest2);
136 *smallest =
137 icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2;
138 *largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1;
139 }
140}
141
142void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
143 InternalKey* smallest,
144 InternalKey* largest) const {
145 InternalKey current_smallest;
146 InternalKey current_largest;
147 bool initialized = false;
148 for (const auto& in : inputs) {
149 if (in.empty()) {
150 continue;
151 }
152 GetRange(in, &current_smallest, &current_largest);
153 if (!initialized) {
154 *smallest = current_smallest;
155 *largest = current_largest;
156 initialized = true;
157 } else {
158 if (icmp_->Compare(current_smallest, *smallest) < 0) {
159 *smallest = current_smallest;
160 }
161 if (icmp_->Compare(current_largest, *largest) > 0) {
162 *largest = current_largest;
163 }
164 }
165 }
166 assert(initialized);
167}
168
169bool CompactionPicker::ExpandInputsToCleanCut(const std::string& cf_name,
170 VersionStorageInfo* vstorage,
171 CompactionInputFiles* inputs) {
172 // This isn't good compaction
173 assert(!inputs->empty());
174
175 const int level = inputs->level;
176 // GetOverlappingInputs will always do the right thing for level-0.
177 // So we don't need to do any expansion if level == 0.
178 if (level == 0) {
179 return true;
180 }
181
182 InternalKey smallest, largest;
183
184 // Keep expanding inputs until we are sure that there is a "clean cut"
185 // boundary between the files in input and the surrounding files.
186 // This will ensure that no parts of a key are lost during compaction.
187 int hint_index = -1;
188 size_t old_size;
189 do {
190 old_size = inputs->size();
191 GetRange(*inputs, &smallest, &largest);
192 inputs->clear();
193 vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
194 hint_index, &hint_index);
195 } while (inputs->size() > old_size);
196
197 // we started off with inputs non-empty and the previous loop only grew
198 // inputs. thus, inputs should be non-empty here
199 assert(!inputs->empty());
200
201 // If, after the expansion, there are files that are already under
202 // compaction, then we must drop/cancel this compaction.
203 if (AreFilesInCompaction(inputs->files)) {
204 ROCKS_LOG_WARN(
205 ioptions_.info_log,
206 "[%s] ExpandWhileOverlapping() failure because some of the necessary"
207 " compaction input files are currently being compacted.",
208 cf_name.c_str());
209 return false;
210 }
211 return true;
212}
213
214bool CompactionPicker::RangeOverlapWithCompaction(
215 const Slice& smallest_user_key, const Slice& largest_user_key,
216 int level) const {
217 const Comparator* ucmp = icmp_->user_comparator();
218 for (Compaction* c : compactions_in_progress_) {
219 if (c->output_level() == level &&
220 ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 &&
221 ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) {
222 // Overlap
223 return true;
224 }
225 }
226 // Did not overlap with any running compaction in level `level`
227 return false;
228}
229
230bool CompactionPicker::FilesRangeOverlapWithCompaction(
231 const std::vector<CompactionInputFiles>& inputs, int level) const {
232 bool is_empty = true;
233 for (auto& in : inputs) {
234 if (!in.empty()) {
235 is_empty = false;
236 break;
237 }
238 }
239 if (is_empty) {
240 // No files in inputs
241 return false;
242 }
243
244 InternalKey smallest, largest;
245 GetRange(inputs, &smallest, &largest);
246 return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
247 level);
248}
249
250// Returns true if any one of specified files are being compacted
251bool CompactionPicker::AreFilesInCompaction(
252 const std::vector<FileMetaData*>& files) {
253 for (size_t i = 0; i < files.size(); i++) {
254 if (files[i]->being_compacted) {
255 return true;
256 }
257 }
258 return false;
259}
260
261Compaction* CompactionPicker::CompactFiles(
262 const CompactionOptions& compact_options,
263 const std::vector<CompactionInputFiles>& input_files, int output_level,
264 VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
265 uint32_t output_path_id) {
266 assert(input_files.size());
267
268 // TODO(rven ): we might be able to run concurrent level 0 compaction
269 // if the key ranges of the two compactions do not overlap, but for now
270 // we do not allow it.
271 if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) {
272 return nullptr;
273 }
274 // This compaction output could overlap with a running compaction
275 if (FilesRangeOverlapWithCompaction(input_files, output_level)) {
276 return nullptr;
277 }
278 auto c =
279 new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
280 output_level, compact_options.output_file_size_limit,
281 mutable_cf_options.max_compaction_bytes, output_path_id,
282 compact_options.compression, /* grandparents */ {}, true);
283
284 // If it's level 0 compaction, make sure we don't execute any other level 0
285 // compactions in parallel
286 RegisterCompaction(c);
287 return c;
288}
289
290Status CompactionPicker::GetCompactionInputsFromFileNumbers(
291 std::vector<CompactionInputFiles>* input_files,
292 std::unordered_set<uint64_t>* input_set, const VersionStorageInfo* vstorage,
293 const CompactionOptions& compact_options) const {
294 if (input_set->size() == 0U) {
295 return Status::InvalidArgument(
296 "Compaction must include at least one file.");
297 }
298 assert(input_files);
299
300 std::vector<CompactionInputFiles> matched_input_files;
301 matched_input_files.resize(vstorage->num_levels());
302 int first_non_empty_level = -1;
303 int last_non_empty_level = -1;
304 // TODO(yhchiang): use a lazy-initialized mapping from
305 // file_number to FileMetaData in Version.
306 for (int level = 0; level < vstorage->num_levels(); ++level) {
307 for (auto file : vstorage->LevelFiles(level)) {
308 auto iter = input_set->find(file->fd.GetNumber());
309 if (iter != input_set->end()) {
310 matched_input_files[level].files.push_back(file);
311 input_set->erase(iter);
312 last_non_empty_level = level;
313 if (first_non_empty_level == -1) {
314 first_non_empty_level = level;
315 }
316 }
317 }
318 }
319
320 if (!input_set->empty()) {
321 std::string message(
322 "Cannot find matched SST files for the following file numbers:");
323 for (auto fn : *input_set) {
324 message += " ";
325 message += ToString(fn);
326 }
327 return Status::InvalidArgument(message);
328 }
329
330 for (int level = first_non_empty_level; level <= last_non_empty_level;
331 ++level) {
332 matched_input_files[level].level = level;
333 input_files->emplace_back(std::move(matched_input_files[level]));
334 }
335
336 return Status::OK();
337}
338
339// Returns true if any one of the parent files are being compacted
340bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage,
341 const InternalKey* smallest,
342 const InternalKey* largest,
343 int level, int* level_index) {
344 std::vector<FileMetaData*> inputs;
345 assert(level < NumberLevels());
346
347 vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
348 *level_index, level_index);
349 return AreFilesInCompaction(inputs);
350}
351
352// Populates the set of inputs of all other levels that overlap with the
353// start level.
354// Now we assume all levels except start level and output level are empty.
355// Will also attempt to expand "start level" if that doesn't expand
356// "output level" or cause "level" to include a file for compaction that has an
357// overlapping user-key with another file.
358// REQUIRES: input_level and output_level are different
359// REQUIRES: inputs->empty() == false
360// Returns false if files on parent level are currently in compaction, which
361// means that we can't compact them
362bool CompactionPicker::SetupOtherInputs(
363 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
364 VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
365 CompactionInputFiles* output_level_inputs, int* parent_index,
366 int base_index) {
367 assert(!inputs->empty());
368 assert(output_level_inputs->empty());
369 const int input_level = inputs->level;
370 const int output_level = output_level_inputs->level;
371 assert(input_level != output_level);
372
373 // For now, we only support merging two levels, start level and output level.
374 // We need to assert other levels are empty.
375 for (int l = input_level + 1; l < output_level; l++) {
376 assert(vstorage->NumLevelFiles(l) == 0);
377 }
378
379 InternalKey smallest, largest;
380
381 // Get the range one last time.
382 GetRange(*inputs, &smallest, &largest);
383
384 // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
385 // include in compaction
386 vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
387 &output_level_inputs->files, *parent_index,
388 parent_index);
389 if (AreFilesInCompaction(output_level_inputs->files)) {
390 return false;
391 }
392 if (!output_level_inputs->empty()) {
393 if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) {
394 return false;
395 }
396 }
397
398 // See if we can further grow the number of inputs in "level" without
399 // changing the number of "level+1" files we pick up. We also choose NOT
400 // to expand if this would cause "level" to include some entries for some
401 // user key, while excluding other entries for the same user key. This
402 // can happen when one user key spans multiple files.
403 if (!output_level_inputs->empty()) {
404 const uint64_t limit = mutable_cf_options.max_compaction_bytes;
405 const uint64_t output_level_inputs_size =
406 TotalCompensatedFileSize(output_level_inputs->files);
407 const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files);
408 bool expand_inputs = false;
409
410 CompactionInputFiles expanded_inputs;
411 expanded_inputs.level = input_level;
412 // Get closed interval of output level
413 InternalKey all_start, all_limit;
414 GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
415 bool try_overlapping_inputs = true;
416 vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
417 &expanded_inputs.files, base_index, nullptr);
418 uint64_t expanded_inputs_size =
419 TotalCompensatedFileSize(expanded_inputs.files);
420 if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {
421 try_overlapping_inputs = false;
422 }
423 if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() &&
424 output_level_inputs_size + expanded_inputs_size < limit &&
425 !AreFilesInCompaction(expanded_inputs.files)) {
426 InternalKey new_start, new_limit;
427 GetRange(expanded_inputs, &new_start, &new_limit);
428 CompactionInputFiles expanded_output_level_inputs;
429 expanded_output_level_inputs.level = output_level;
430 vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
431 &expanded_output_level_inputs.files,
432 *parent_index, parent_index);
433 assert(!expanded_output_level_inputs.empty());
434 if (!AreFilesInCompaction(expanded_output_level_inputs.files) &&
435 ExpandInputsToCleanCut(cf_name, vstorage,
436 &expanded_output_level_inputs) &&
437 expanded_output_level_inputs.size() == output_level_inputs->size()) {
438 expand_inputs = true;
439 }
440 }
441 if (!expand_inputs) {
442 vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
443 &all_limit, &expanded_inputs.files,
444 base_index, nullptr);
445 expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files);
446 if (expanded_inputs.size() > inputs->size() &&
447 output_level_inputs_size + expanded_inputs_size < limit &&
448 !AreFilesInCompaction(expanded_inputs.files)) {
449 expand_inputs = true;
450 }
451 }
452 if (expand_inputs) {
453 ROCKS_LOG_INFO(ioptions_.info_log,
454 "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
455 "(%" PRIu64 "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt
456 "+%" ROCKSDB_PRIszt " (%" PRIu64 "+%" PRIu64 "bytes)\n",
457 cf_name.c_str(), input_level, inputs->size(),
458 output_level_inputs->size(), inputs_size,
459 output_level_inputs_size, expanded_inputs.size(),
460 output_level_inputs->size(), expanded_inputs_size,
461 output_level_inputs_size);
462 inputs->files = expanded_inputs.files;
463 }
464 }
465 return true;
466}
467
468void CompactionPicker::GetGrandparents(
469 VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
470 const CompactionInputFiles& output_level_inputs,
471 std::vector<FileMetaData*>* grandparents) {
472 InternalKey start, limit;
473 GetRange(inputs, output_level_inputs, &start, &limit);
474 // Compute the set of grandparent files that overlap this compaction
475 // (parent == level+1; grandparent == level+2)
476 if (output_level_inputs.level + 1 < NumberLevels()) {
477 vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start,
478 &limit, grandparents);
479 }
480}
481
482Compaction* CompactionPicker::CompactRange(
483 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
484 VersionStorageInfo* vstorage, int input_level, int output_level,
485 uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
486 InternalKey** compaction_end, bool* manual_conflict) {
487 // CompactionPickerFIFO has its own implementation of compact range
488 assert(ioptions_.compaction_style != kCompactionStyleFIFO);
489
490 if (input_level == ColumnFamilyData::kCompactAllLevels) {
491 assert(ioptions_.compaction_style == kCompactionStyleUniversal);
492
493 // Universal compaction with more than one level always compacts all the
494 // files together to the last level.
495 assert(vstorage->num_levels() > 1);
496 // DBImpl::CompactRange() set output level to be the last level
497 assert(output_level == vstorage->num_levels() - 1);
498 // DBImpl::RunManualCompaction will make full range for universal compaction
499 assert(begin == nullptr);
500 assert(end == nullptr);
501 *compaction_end = nullptr;
502
503 int start_level = 0;
504 for (; start_level < vstorage->num_levels() &&
505 vstorage->NumLevelFiles(start_level) == 0;
506 start_level++) {
507 }
508 if (start_level == vstorage->num_levels()) {
509 return nullptr;
510 }
511
512 if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
513 *manual_conflict = true;
514 // Only one level 0 compaction allowed
515 return nullptr;
516 }
517
518 std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
519 start_level);
520 for (int level = start_level; level < vstorage->num_levels(); level++) {
521 inputs[level - start_level].level = level;
522 auto& files = inputs[level - start_level].files;
523 for (FileMetaData* f : vstorage->LevelFiles(level)) {
524 files.push_back(f);
525 }
526 if (AreFilesInCompaction(files)) {
527 *manual_conflict = true;
528 return nullptr;
529 }
530 }
531
532 // 2 non-exclusive manual compactions could run at the same time producing
533 // overlaping outputs in the same level.
534 if (FilesRangeOverlapWithCompaction(inputs, output_level)) {
535 // This compaction output could potentially conflict with the output
536 // of a currently running compaction, we cannot run it.
537 *manual_conflict = true;
538 return nullptr;
539 }
540
541 Compaction* c = new Compaction(
542 vstorage, ioptions_, mutable_cf_options, std::move(inputs),
543 output_level, mutable_cf_options.MaxFileSizeForLevel(output_level),
544 /* max_compaction_bytes */ LLONG_MAX, output_path_id,
545 GetCompressionType(ioptions_, vstorage, mutable_cf_options,
546 output_level, 1),
547 /* grandparents */ {}, /* is manual */ true);
548 RegisterCompaction(c);
549 return c;
550 }
551
552 CompactionInputFiles inputs;
553 inputs.level = input_level;
554 bool covering_the_whole_range = true;
555
556 // All files are 'overlapping' in universal style compaction.
557 // We have to compact the entire range in one shot.
558 if (ioptions_.compaction_style == kCompactionStyleUniversal) {
559 begin = nullptr;
560 end = nullptr;
561 }
562
563 vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
564 if (inputs.empty()) {
565 return nullptr;
566 }
567
568 if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
569 // Only one level 0 compaction allowed
570 TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
571 *manual_conflict = true;
572 return nullptr;
573 }
574
575 // Avoid compacting too much in one shot in case the range is large.
576 // But we cannot do this for level-0 since level-0 files can overlap
577 // and we must not pick one file and drop another older file if the
578 // two files overlap.
579 if (input_level > 0) {
580 const uint64_t limit = mutable_cf_options.max_compaction_bytes;
581 uint64_t total = 0;
582 for (size_t i = 0; i + 1 < inputs.size(); ++i) {
583 uint64_t s = inputs[i]->compensated_file_size;
584 total += s;
585 if (total >= limit) {
586 **compaction_end = inputs[i + 1]->smallest;
587 covering_the_whole_range = false;
588 inputs.files.resize(i + 1);
589 break;
590 }
591 }
592 }
593 assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
594
595 if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) {
596 // manual compaction is now multi-threaded, so it can
597 // happen that ExpandWhileOverlapping fails
598 // we handle it higher in RunManualCompaction
599 *manual_conflict = true;
600 return nullptr;
601 }
602
603 if (covering_the_whole_range) {
604 *compaction_end = nullptr;
605 }
606
607 CompactionInputFiles output_level_inputs;
608 if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
609 assert(input_level == 0);
610 output_level = vstorage->base_level();
611 assert(output_level > 0);
612 }
613 output_level_inputs.level = output_level;
614 if (input_level != output_level) {
615 int parent_index = -1;
616 if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
617 &output_level_inputs, &parent_index, -1)) {
618 // manual compaction is now multi-threaded, so it can
619 // happen that SetupOtherInputs fails
620 // we handle it higher in RunManualCompaction
621 *manual_conflict = true;
622 return nullptr;
623 }
624 }
625
626 std::vector<CompactionInputFiles> compaction_inputs({inputs});
627 if (!output_level_inputs.empty()) {
628 compaction_inputs.push_back(output_level_inputs);
629 }
630 for (size_t i = 0; i < compaction_inputs.size(); i++) {
631 if (AreFilesInCompaction(compaction_inputs[i].files)) {
632 *manual_conflict = true;
633 return nullptr;
634 }
635 }
636
637 // 2 non-exclusive manual compactions could run at the same time producing
638 // overlaping outputs in the same level.
639 if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) {
640 // This compaction output could potentially conflict with the output
641 // of a currently running compaction, we cannot run it.
642 *manual_conflict = true;
643 return nullptr;
644 }
645
646 std::vector<FileMetaData*> grandparents;
647 GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
648 Compaction* compaction = new Compaction(
649 vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs),
650 output_level, mutable_cf_options.MaxFileSizeForLevel(output_level),
651 mutable_cf_options.max_compaction_bytes, output_path_id,
652 GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
653 vstorage->base_level()),
654 std::move(grandparents), /* is manual compaction */ true);
655
656 TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
657 RegisterCompaction(compaction);
658
659 // Creating a compaction influences the compaction score because the score
660 // takes running compactions into account (by skipping files that are already
661 // being compacted). Since we just changed compaction score, we recalculate it
662 // here
663 vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
664
665 return compaction;
666}
667
668#ifndef ROCKSDB_LITE
669namespace {
670// Test whether two files have overlapping key-ranges.
671bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
672 const SstFileMetaData& b) {
673 if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
674 if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
675 // b.smallestkey <= a.smallestkey <= b.largestkey
676 return true;
677 }
678 } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
679 // a.smallestkey < b.smallestkey <= a.largestkey
680 return true;
681 }
682 if (c->Compare(a.largestkey, b.largestkey) <= 0) {
683 if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
684 // b.smallestkey <= a.largestkey <= b.largestkey
685 return true;
686 }
687 } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
688 // a.smallestkey <= b.largestkey < a.largestkey
689 return true;
690 }
691 return false;
692}
693} // namespace
694
695Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
696 std::unordered_set<uint64_t>* input_files,
697 const ColumnFamilyMetaData& cf_meta, const int output_level) const {
698 auto& levels = cf_meta.levels;
699 auto comparator = icmp_->user_comparator();
700
701 // TODO(yhchiang): If there is any input files of L1 or up and there
702 // is at least one L0 files. All L0 files older than the L0 file needs
703 // to be included. Otherwise, it is a false conditoin
704
705 // TODO(yhchiang): add is_adjustable to CompactionOptions
706
707 // the smallest and largest key of the current compaction input
708 std::string smallestkey;
709 std::string largestkey;
710 // a flag for initializing smallest and largest key
711 bool is_first = false;
712 const int kNotFound = -1;
713
714 // For each level, it does the following things:
715 // 1. Find the first and the last compaction input files
716 // in the current level.
717 // 2. Include all files between the first and the last
718 // compaction input files.
719 // 3. Update the compaction key-range.
720 // 4. For all remaining levels, include files that have
721 // overlapping key-range with the compaction key-range.
722 for (int l = 0; l <= output_level; ++l) {
723 auto& current_files = levels[l].files;
724 int first_included = static_cast<int>(current_files.size());
725 int last_included = kNotFound;
726
727 // identify the first and the last compaction input files
728 // in the current level.
729 for (size_t f = 0; f < current_files.size(); ++f) {
730 if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
731 input_files->end()) {
732 first_included = std::min(first_included, static_cast<int>(f));
733 last_included = std::max(last_included, static_cast<int>(f));
734 if (is_first == false) {
735 smallestkey = current_files[f].smallestkey;
736 largestkey = current_files[f].largestkey;
737 is_first = true;
738 }
739 }
740 }
741 if (last_included == kNotFound) {
742 continue;
743 }
744
745 if (l != 0) {
746 // expend the compaction input of the current level if it
747 // has overlapping key-range with other non-compaction input
748 // files in the same level.
749 while (first_included > 0) {
750 if (comparator->Compare(current_files[first_included - 1].largestkey,
751 current_files[first_included].smallestkey) <
752 0) {
753 break;
754 }
755 first_included--;
756 }
757
758 while (last_included < static_cast<int>(current_files.size()) - 1) {
759 if (comparator->Compare(current_files[last_included + 1].smallestkey,
760 current_files[last_included].largestkey) > 0) {
761 break;
762 }
763 last_included++;
764 }
765 }
766
767 // include all files between the first and the last compaction input files.
768 for (int f = first_included; f <= last_included; ++f) {
769 if (current_files[f].being_compacted) {
770 return Status::Aborted("Necessary compaction input file " +
771 current_files[f].name +
772 " is currently being compacted.");
773 }
774 input_files->insert(TableFileNameToNumber(current_files[f].name));
775 }
776
777 // update smallest and largest key
778 if (l == 0) {
779 for (int f = first_included; f <= last_included; ++f) {
780 if (comparator->Compare(smallestkey, current_files[f].smallestkey) >
781 0) {
782 smallestkey = current_files[f].smallestkey;
783 }
784 if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) {
785 largestkey = current_files[f].largestkey;
786 }
787 }
788 } else {
789 if (comparator->Compare(smallestkey,
790 current_files[first_included].smallestkey) > 0) {
791 smallestkey = current_files[first_included].smallestkey;
792 }
793 if (comparator->Compare(largestkey,
794 current_files[last_included].largestkey) < 0) {
795 largestkey = current_files[last_included].largestkey;
796 }
797 }
798
799 SstFileMetaData aggregated_file_meta;
800 aggregated_file_meta.smallestkey = smallestkey;
801 aggregated_file_meta.largestkey = largestkey;
802
803 // For all lower levels, include all overlapping files.
804 // We need to add overlapping files from the current level too because even
805 // if there no input_files in level l, we would still need to add files
806 // which overlap with the range containing the input_files in levels 0 to l
807 // Level 0 doesn't need to be handled this way because files are sorted by
808 // time and not by key
809 for (int m = std::max(l, 1); m <= output_level; ++m) {
810 for (auto& next_lv_file : levels[m].files) {
811 if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta,
812 next_lv_file)) {
813 if (next_lv_file.being_compacted) {
814 return Status::Aborted(
815 "File " + next_lv_file.name +
816 " that has overlapping key range with one of the compaction "
817 " input file is currently being compacted.");
818 }
819 input_files->insert(TableFileNameToNumber(next_lv_file.name));
820 }
821 }
822 }
823 }
824 return Status::OK();
825}
826
827Status CompactionPicker::SanitizeCompactionInputFiles(
828 std::unordered_set<uint64_t>* input_files,
829 const ColumnFamilyMetaData& cf_meta, const int output_level) const {
830 assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
831 cf_meta.levels[cf_meta.levels.size() - 1].level);
832 if (output_level >= static_cast<int>(cf_meta.levels.size())) {
833 return Status::InvalidArgument(
834 "Output level for column family " + cf_meta.name +
835 " must between [0, " +
836 ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + "].");
837 }
838
839 if (output_level > MaxOutputLevel()) {
840 return Status::InvalidArgument(
841 "Exceed the maximum output level defined by "
842 "the current compaction algorithm --- " +
843 ToString(MaxOutputLevel()));
844 }
845
846 if (output_level < 0) {
847 return Status::InvalidArgument("Output level cannot be negative.");
848 }
849
850 if (input_files->size() == 0) {
851 return Status::InvalidArgument(
852 "A compaction must contain at least one file.");
853 }
854
855 Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
856 output_level);
857
858 if (!s.ok()) {
859 return s;
860 }
861
862 // for all input files, check whether the file number matches
863 // any currently-existing files.
864 for (auto file_num : *input_files) {
865 bool found = false;
866 for (auto level_meta : cf_meta.levels) {
867 for (auto file_meta : level_meta.files) {
868 if (file_num == TableFileNameToNumber(file_meta.name)) {
869 if (file_meta.being_compacted) {
870 return Status::Aborted("Specified compaction input file " +
871 MakeTableFileName("", file_num) +
872 " is already being compacted.");
873 }
874 found = true;
875 break;
876 }
877 }
878 if (found) {
879 break;
880 }
881 }
882 if (!found) {
883 return Status::InvalidArgument(
884 "Specified compaction input file " + MakeTableFileName("", file_num) +
885 " does not exist in column family " + cf_meta.name + ".");
886 }
887 }
888
889 return Status::OK();
890}
891#endif // !ROCKSDB_LITE
892
893void CompactionPicker::RegisterCompaction(Compaction* c) {
894 if (c == nullptr) {
895 return;
896 }
897 assert(ioptions_.compaction_style != kCompactionStyleLevel ||
898 c->output_level() == 0 ||
899 !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level()));
900 if (c->start_level() == 0 ||
901 ioptions_.compaction_style == kCompactionStyleUniversal) {
902 level0_compactions_in_progress_.insert(c);
903 }
904 compactions_in_progress_.insert(c);
905}
906
907void CompactionPicker::UnregisterCompaction(Compaction* c) {
908 if (c == nullptr) {
909 return;
910 }
911 if (c->start_level() == 0 ||
912 ioptions_.compaction_style == kCompactionStyleUniversal) {
913 level0_compactions_in_progress_.erase(c);
914 }
915 compactions_in_progress_.erase(c);
916}
917
918bool LevelCompactionPicker::NeedsCompaction(
919 const VersionStorageInfo* vstorage) const {
920 if (!vstorage->FilesMarkedForCompaction().empty()) {
921 return true;
922 }
923 for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
924 if (vstorage->CompactionScore(i) >= 1) {
925 return true;
926 }
927 }
928 return false;
929}
930
931namespace {
932// A class to build a leveled compaction step-by-step.
933class LevelCompactionBuilder {
934 public:
935 LevelCompactionBuilder(const std::string& cf_name,
936 VersionStorageInfo* vstorage,
937 CompactionPicker* compaction_picker,
938 LogBuffer* log_buffer,
939 const MutableCFOptions& mutable_cf_options,
940 const ImmutableCFOptions& ioptions)
941 : cf_name_(cf_name),
942 vstorage_(vstorage),
943 compaction_picker_(compaction_picker),
944 log_buffer_(log_buffer),
945 mutable_cf_options_(mutable_cf_options),
946 ioptions_(ioptions) {}
947
948 // Pick and return a compaction.
949 Compaction* PickCompaction();
950
951 // Pick the initial files to compact to the next level. (or together
952 // in Intra-L0 compactions)
953 void SetupInitialFiles();
954
955 // If the initial files are from L0 level, pick other L0
956 // files if needed.
957 bool SetupOtherL0FilesIfNeeded();
958
959 // Based on initial files, setup other files need to be compacted
960 // in this compaction, accordingly.
961 bool SetupOtherInputsIfNeeded();
962
963 Compaction* GetCompaction();
964
965 // For the specfied level, pick a file that we want to compact.
966 // Returns false if there is no file to compact.
967 // If it returns true, inputs->files.size() will be exactly one.
968 // If level is 0 and there is already a compaction on that level, this
969 // function will return false.
970 bool PickFileToCompact();
971
972 // For L0->L0, picks the longest span of files that aren't currently
973 // undergoing compaction for which work-per-deleted-file decreases. The span
974 // always starts from the newest L0 file.
975 //
976 // Intra-L0 compaction is independent of all other files, so it can be
977 // performed even when L0->base_level compactions are blocked.
978 //
979 // Returns true if `inputs` is populated with a span of files to be compacted;
980 // otherwise, returns false.
981 bool PickIntraL0Compaction();
982
983 // If there is any file marked for compaction, put put it into inputs.
984 void PickFilesMarkedForCompaction();
985
986 const std::string& cf_name_;
987 VersionStorageInfo* vstorage_;
988 CompactionPicker* compaction_picker_;
989 LogBuffer* log_buffer_;
990 int start_level_ = -1;
991 int output_level_ = -1;
992 int parent_index_ = -1;
993 int base_index_ = -1;
994 double start_level_score_ = 0;
995 bool is_manual_ = false;
996 CompactionInputFiles start_level_inputs_;
997 std::vector<CompactionInputFiles> compaction_inputs_;
998 CompactionInputFiles output_level_inputs_;
999 std::vector<FileMetaData*> grandparents_;
1000 CompactionReason compaction_reason_ = CompactionReason::kUnknown;
1001
1002 const MutableCFOptions& mutable_cf_options_;
1003 const ImmutableCFOptions& ioptions_;
1004 // Pick a path ID to place a newly generated file, with its level
1005 static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
1006 const MutableCFOptions& mutable_cf_options,
1007 int level);
1008
1009 static const int kMinFilesForIntraL0Compaction = 4;
1010};
1011
1012void LevelCompactionBuilder::PickFilesMarkedForCompaction() {
1013 if (vstorage_->FilesMarkedForCompaction().empty()) {
1014 return;
1015 }
1016
1017 auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
1018 // If it's being compacted it has nothing to do here.
1019 // If this assert() fails that means that some function marked some
1020 // files as being_compacted, but didn't call ComputeCompactionScore()
1021 assert(!level_file.second->being_compacted);
1022 start_level_ = level_file.first;
1023 output_level_ =
1024 (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
1025
1026 if (start_level_ == 0 &&
1027 !compaction_picker_->level0_compactions_in_progress()->empty()) {
1028 return false;
1029 }
1030
1031 start_level_inputs_.files = {level_file.second};
1032 start_level_inputs_.level = start_level_;
1033 return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
1034 &start_level_inputs_);
1035 };
1036
1037 // take a chance on a random file first
1038 Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage_));
1039 size_t random_file_index = static_cast<size_t>(rnd.Uniform(
1040 static_cast<uint64_t>(vstorage_->FilesMarkedForCompaction().size())));
1041
1042 if (continuation(vstorage_->FilesMarkedForCompaction()[random_file_index])) {
1043 // found the compaction!
1044 return;
1045 }
1046
1047 for (auto& level_file : vstorage_->FilesMarkedForCompaction()) {
1048 if (continuation(level_file)) {
1049 // found the compaction!
1050 return;
1051 }
1052 }
1053 start_level_inputs_.files.clear();
1054}
1055
1056void LevelCompactionBuilder::SetupInitialFiles() {
1057 // Find the compactions by size on all levels.
1058 bool skipped_l0_to_base = false;
1059 for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
1060 start_level_score_ = vstorage_->CompactionScore(i);
1061 start_level_ = vstorage_->CompactionScoreLevel(i);
1062 assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
1063 if (start_level_score_ >= 1) {
1064 if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
1065 // If L0->base_level compaction is pending, don't schedule further
1066 // compaction from base level. Otherwise L0->base_level compaction
1067 // may starve.
1068 continue;
1069 }
1070 output_level_ =
1071 (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
1072 if (PickFileToCompact() &&
1073 compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
1074 &start_level_inputs_) &&
1075 !compaction_picker_->FilesRangeOverlapWithCompaction(
1076 {start_level_inputs_}, output_level_)) {
1077 // found the compaction!
1078 if (start_level_ == 0) {
1079 // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
1080 compaction_reason_ = CompactionReason::kLevelL0FilesNum;
1081 } else {
1082 // L1+ score = `Level files size` / `MaxBytesForLevel`
1083 compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
1084 }
1085 break;
1086 } else {
1087 // didn't find the compaction, clear the inputs
1088 start_level_inputs_.clear();
1089 if (start_level_ == 0) {
1090 skipped_l0_to_base = true;
1091 // L0->base_level may be blocked due to ongoing L0->base_level
1092 // compactions. It may also be blocked by an ongoing compaction from
1093 // base_level downwards.
1094 //
1095 // In these cases, to reduce L0 file count and thus reduce likelihood
1096 // of write stalls, we can attempt compacting a span of files within
1097 // L0.
1098 if (PickIntraL0Compaction()) {
1099 output_level_ = 0;
1100 compaction_reason_ = CompactionReason::kLevelL0FilesNum;
1101 break;
1102 }
1103 }
1104 }
1105 }
1106 }
1107
1108 // if we didn't find a compaction, check if there are any files marked for
1109 // compaction
1110 if (start_level_inputs_.empty()) {
1111 is_manual_ = true;
1112 parent_index_ = base_index_ = -1;
1113 PickFilesMarkedForCompaction();
1114 if (!start_level_inputs_.empty()) {
1115 compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
1116 }
1117 }
1118}
1119
1120bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
1121 if (start_level_ == 0 && output_level_ != 0) {
1122 // Two level 0 compaction won't run at the same time, so don't need to worry
1123 // about files on level 0 being compacted.
1124 assert(compaction_picker_->level0_compactions_in_progress()->empty());
1125 InternalKey smallest, largest;
1126 compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
1127 // Note that the next call will discard the file we placed in
1128 // c->inputs_[0] earlier and replace it with an overlapping set
1129 // which will include the picked file.
1130 start_level_inputs_.files.clear();
1131 vstorage_->GetOverlappingInputs(0, &smallest, &largest,
1132 &start_level_inputs_.files);
1133
1134 // If we include more L0 files in the same compaction run it can
1135 // cause the 'smallest' and 'largest' key to get extended to a
1136 // larger range. So, re-invoke GetRange to get the new key range
1137 compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
1138 if (compaction_picker_->IsRangeInCompaction(
1139 vstorage_, &smallest, &largest, output_level_, &parent_index_)) {
1140 return false;
1141 }
1142 }
1143 assert(!start_level_inputs_.files.empty());
1144
1145 return true;
1146}
1147
1148bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
1149 // Setup input files from output level. For output to L0, we only compact
1150 // spans of files that do not interact with any pending compactions, so don't
1151 // need to consider other levels.
1152 if (output_level_ != 0) {
1153 output_level_inputs_.level = output_level_;
1154 if (!compaction_picker_->SetupOtherInputs(
1155 cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
1156 &output_level_inputs_, &parent_index_, base_index_)) {
1157 return false;
1158 }
1159
1160 compaction_inputs_.push_back(start_level_inputs_);
1161 if (!output_level_inputs_.empty()) {
1162 compaction_inputs_.push_back(output_level_inputs_);
1163 }
1164
1165 // In some edge cases we could pick a compaction that will be compacting
1166 // a key range that overlap with another running compaction, and both
1167 // of them have the same output level. This could happen if
1168 // (1) we are running a non-exclusive manual compaction
1169 // (2) AddFile ingest a new file into the LSM tree
1170 // We need to disallow this from happening.
1171 if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
1172 output_level_)) {
1173 // This compaction output could potentially conflict with the output
1174 // of a currently running compaction, we cannot run it.
1175 return false;
1176 }
1177 compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
1178 output_level_inputs_, &grandparents_);
1179 } else {
1180 compaction_inputs_.push_back(start_level_inputs_);
1181 }
1182 return true;
1183}
1184
1185Compaction* LevelCompactionBuilder::PickCompaction() {
1186 // Pick up the first file to start compaction. It may have been extended
1187 // to a clean cut.
1188 SetupInitialFiles();
1189 if (start_level_inputs_.empty()) {
1190 return nullptr;
1191 }
1192 assert(start_level_ >= 0 && output_level_ >= 0);
1193
1194 // If it is a L0 -> base level compaction, we need to set up other L0
1195 // files if needed.
1196 if (!SetupOtherL0FilesIfNeeded()) {
1197 return nullptr;
1198 }
1199
1200 // Pick files in the output level and expand more files in the start level
1201 // if needed.
1202 if (!SetupOtherInputsIfNeeded()) {
1203 return nullptr;
1204 }
1205
1206 // Form a compaction object containing the files we picked.
1207 Compaction* c = GetCompaction();
1208
1209 TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
1210
1211 return c;
1212}
1213
1214Compaction* LevelCompactionBuilder::GetCompaction() {
1215 auto c = new Compaction(
1216 vstorage_, ioptions_, mutable_cf_options_, std::move(compaction_inputs_),
1217 output_level_, mutable_cf_options_.MaxFileSizeForLevel(output_level_),
1218 mutable_cf_options_.max_compaction_bytes,
1219 GetPathId(ioptions_, mutable_cf_options_, output_level_),
1220 GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
1221 output_level_, vstorage_->base_level()),
1222 std::move(grandparents_), is_manual_, start_level_score_,
1223 false /* deletion_compaction */, compaction_reason_);
1224
1225 // If it's level 0 compaction, make sure we don't execute any other level 0
1226 // compactions in parallel
1227 compaction_picker_->RegisterCompaction(c);
1228
1229 // Creating a compaction influences the compaction score because the score
1230 // takes running compactions into account (by skipping files that are already
1231 // being compacted). Since we just changed compaction score, we recalculate it
1232 // here
1233 vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
1234 return c;
1235}
1236
1237/*
1238 * Find the optimal path to place a file
1239 * Given a level, finds the path where levels up to it will fit in levels
1240 * up to and including this path
1241 */
1242uint32_t LevelCompactionBuilder::GetPathId(
1243 const ImmutableCFOptions& ioptions,
1244 const MutableCFOptions& mutable_cf_options, int level) {
1245 uint32_t p = 0;
1246 assert(!ioptions.db_paths.empty());
1247
1248 // size remaining in the most recent path
1249 uint64_t current_path_size = ioptions.db_paths[0].target_size;
1250
1251 uint64_t level_size;
1252 int cur_level = 0;
1253
b32b8144
FG
1254 // max_bytes_for_level_base denotes L1 size.
1255 // We estimate L0 size to be the same as L1.
7c673cae
FG
1256 level_size = mutable_cf_options.max_bytes_for_level_base;
1257
1258 // Last path is the fallback
1259 while (p < ioptions.db_paths.size() - 1) {
1260 if (level_size <= current_path_size) {
1261 if (cur_level == level) {
1262 // Does desired level fit in this path?
1263 return p;
1264 } else {
1265 current_path_size -= level_size;
b32b8144
FG
1266 if (cur_level > 0) {
1267 level_size = static_cast<uint64_t>(
1268 level_size * mutable_cf_options.max_bytes_for_level_multiplier);
1269 }
7c673cae
FG
1270 cur_level++;
1271 continue;
1272 }
1273 }
1274 p++;
1275 current_path_size = ioptions.db_paths[p].target_size;
1276 }
1277 return p;
1278}
1279
1280bool LevelCompactionBuilder::PickFileToCompact() {
1281 // level 0 files are overlapping. So we cannot pick more
1282 // than one concurrent compactions at this level. This
1283 // could be made better by looking at key-ranges that are
1284 // being compacted at level 0.
1285 if (start_level_ == 0 &&
1286 !compaction_picker_->level0_compactions_in_progress()->empty()) {
1287 TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
1288 return false;
1289 }
1290
1291 start_level_inputs_.clear();
1292
1293 assert(start_level_ >= 0);
1294
1295 // Pick the largest file in this level that is not already
1296 // being compacted
1297 const std::vector<int>& file_size =
1298 vstorage_->FilesByCompactionPri(start_level_);
1299 const std::vector<FileMetaData*>& level_files =
1300 vstorage_->LevelFiles(start_level_);
1301
1302 // record the first file that is not yet compacted
1303 int nextIndex = -1;
1304
1305 for (unsigned int i = vstorage_->NextCompactionIndex(start_level_);
1306 i < file_size.size(); i++) {
1307 int index = file_size[i];
1308 auto* f = level_files[index];
1309
1310 // do not pick a file to compact if it is being compacted
1311 // from n-1 level.
1312 if (f->being_compacted) {
1313 continue;
1314 }
1315
1316 // remember the startIndex for the next call to PickCompaction
1317 if (nextIndex == -1) {
1318 nextIndex = i;
1319 }
1320
1321 // Do not pick this file if its parents at level+1 are being compacted.
1322 // Maybe we can avoid redoing this work in SetupOtherInputs
1323 parent_index_ = -1;
1324 if (compaction_picker_->IsRangeInCompaction(vstorage_, &f->smallest,
1325 &f->largest, output_level_,
1326 &parent_index_)) {
1327 continue;
1328 }
1329 start_level_inputs_.files.push_back(f);
1330 start_level_inputs_.level = start_level_;
1331 base_index_ = index;
1332 break;
1333 }
1334
1335 // store where to start the iteration in the next call to PickCompaction
1336 vstorage_->SetNextCompactionIndex(start_level_, nextIndex);
1337
1338 return start_level_inputs_.size() > 0;
1339}
1340
1341bool LevelCompactionBuilder::PickIntraL0Compaction() {
1342 start_level_inputs_.clear();
1343 const std::vector<FileMetaData*>& level_files =
1344 vstorage_->LevelFiles(0 /* level */);
1345 if (level_files.size() <
1346 static_cast<size_t>(
1347 mutable_cf_options_.level0_file_num_compaction_trigger + 2) ||
1348 level_files[0]->being_compacted) {
1349 // If L0 isn't accumulating much files beyond the regular trigger, don't
1350 // resort to L0->L0 compaction yet.
1351 return false;
1352 }
1353
1354 size_t compact_bytes = level_files[0]->fd.file_size;
1355 size_t compact_bytes_per_del_file = port::kMaxSizet;
1356 // compaction range will be [0, span_len).
1357 size_t span_len;
1358 // pull in files until the amount of compaction work per deleted file begins
1359 // increasing.
1360 for (span_len = 1; span_len < level_files.size(); ++span_len) {
1361 compact_bytes += level_files[span_len]->fd.file_size;
1362 size_t new_compact_bytes_per_del_file = compact_bytes / span_len;
1363 if (level_files[span_len]->being_compacted ||
1364 new_compact_bytes_per_del_file > compact_bytes_per_del_file) {
1365 break;
1366 }
1367 compact_bytes_per_del_file = new_compact_bytes_per_del_file;
1368 }
1369
1370 if (span_len >= kMinFilesForIntraL0Compaction) {
1371 start_level_inputs_.level = 0;
1372 for (size_t i = 0; i < span_len; ++i) {
1373 start_level_inputs_.files.push_back(level_files[i]);
1374 }
1375 return true;
1376 }
1377 return false;
1378}
1379} // namespace
1380
1381Compaction* LevelCompactionPicker::PickCompaction(
1382 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1383 VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
1384 LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer,
1385 mutable_cf_options, ioptions_);
1386 return builder.PickCompaction();
1387}
1388
1389#ifndef ROCKSDB_LITE
1390bool FIFOCompactionPicker::NeedsCompaction(
1391 const VersionStorageInfo* vstorage) const {
1392 const int kLevel0 = 0;
1393 return vstorage->CompactionScore(kLevel0) >= 1;
1394}
1395
1396Compaction* FIFOCompactionPicker::PickCompaction(
1397 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1398 VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
1399 assert(vstorage->num_levels() == 1);
1400 const int kLevel0 = 0;
1401 const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
1402 uint64_t total_size = 0;
1403 for (const auto& file : level_files) {
1404 total_size += file->fd.file_size;
1405 }
1406
1407 if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
1408 level_files.size() == 0) {
1409 // total size not exceeded
1410 ROCKS_LOG_BUFFER(log_buffer,
1411 "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
1412 ", max size %" PRIu64 "\n",
1413 cf_name.c_str(), total_size,
1414 ioptions_.compaction_options_fifo.max_table_files_size);
1415 return nullptr;
1416 }
1417
1418 if (!level0_compactions_in_progress_.empty()) {
1419 ROCKS_LOG_BUFFER(
1420 log_buffer,
1421 "[%s] FIFO compaction: Already executing compaction. No need "
1422 "to run parallel compactions since compactions are very fast",
1423 cf_name.c_str());
1424 return nullptr;
1425 }
1426
1427 std::vector<CompactionInputFiles> inputs;
1428 inputs.emplace_back();
1429 inputs[0].level = 0;
1430 // delete old files (FIFO)
1431 for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
1432 auto f = *ritr;
1433 total_size -= f->compensated_file_size;
1434 inputs[0].files.push_back(f);
1435 char tmp_fsize[16];
1436 AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
1437 ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
1438 " with size %s for deletion",
1439 cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
1440 if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
1441 break;
1442 }
1443 }
1444 Compaction* c = new Compaction(
1445 vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
1446 kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0),
1447 /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
1448 RegisterCompaction(c);
1449 return c;
1450}
1451
1452Compaction* FIFOCompactionPicker::CompactRange(
1453 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1454 VersionStorageInfo* vstorage, int input_level, int output_level,
1455 uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
1456 InternalKey** compaction_end, bool* manual_conflict) {
1457 assert(input_level == 0);
1458 assert(output_level == 0);
1459 *compaction_end = nullptr;
1460 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
1461 Compaction* c =
1462 PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
1463 log_buffer.FlushBufferToLog();
1464 return c;
1465}
1466
1467#endif // !ROCKSDB_LITE
1468
1469} // namespace rocksdb