1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/compaction/compaction_picker_universal.h"
11 #ifndef ROCKSDB_LITE
12
13 #include <cinttypes>
14 #include <limits>
15 #include <queue>
16 #include <string>
17 #include <utility>
18 #include "db/column_family.h"
19 #include "file/filename.h"
20 #include "logging/log_buffer.h"
21 #include "monitoring/statistics.h"
22 #include "test_util/sync_point.h"
23 #include "util/random.h"
24 #include "util/string_util.h"
25
26 namespace ROCKSDB_NAMESPACE {
27 namespace {
28 // A helper class that forms universal compactions. The class is used by
29 // UniversalCompactionPicker::PickCompaction().
30 // To use it, create an instance and obtain the compaction object by calling
31 // PickCompaction().
32 class UniversalCompactionBuilder {
33 public:
34 UniversalCompactionBuilder(const ImmutableCFOptions& ioptions,
35 const InternalKeyComparator* icmp,
36 const std::string& cf_name,
37 const MutableCFOptions& mutable_cf_options,
38 VersionStorageInfo* vstorage,
39 UniversalCompactionPicker* picker,
40 LogBuffer* log_buffer)
41 : ioptions_(ioptions),
42 icmp_(icmp),
43 cf_name_(cf_name),
44 mutable_cf_options_(mutable_cf_options),
45 vstorage_(vstorage),
46 picker_(picker),
47 log_buffer_(log_buffer) {}
48
49   // Form and return the compaction object. The caller owns the returned object.
50 Compaction* PickCompaction();
51
52 private:
53 struct SortedRun {
54 SortedRun(int _level, FileMetaData* _file, uint64_t _size,
55 uint64_t _compensated_file_size, bool _being_compacted)
56 : level(_level),
57 file(_file),
58 size(_size),
59 compensated_file_size(_compensated_file_size),
60 being_compacted(_being_compacted) {
61 assert(compensated_file_size > 0);
62 assert(level != 0 || file != nullptr);
63 }
64
65 void Dump(char* out_buf, size_t out_buf_size,
66 bool print_path = false) const;
67
68 // sorted_run_count is added into the string to print
69 void DumpSizeInfo(char* out_buf, size_t out_buf_size,
70 size_t sorted_run_count) const;
71
72 int level;
73     // `file` will be null for level > 0. For level = 0, the sorted run
74     // consists of just this file.
75     FileMetaData* file;
76     // For level > 0, `size` and `compensated_file_size` are the sums of the
77     // sizes of all files in the level. `being_compacted` should be the same
78     // for all files in a non-zero level; use the value stored here.
79 uint64_t size;
80 uint64_t compensated_file_size;
81 bool being_compacted;
82 };
83
84 // Pick Universal compaction to limit read amplification
85 Compaction* PickCompactionToReduceSortedRuns(
86 unsigned int ratio, unsigned int max_number_of_files_to_compact);
87
88 // Pick Universal compaction to limit space amplification.
89 Compaction* PickCompactionToReduceSizeAmp();
90
91 Compaction* PickDeleteTriggeredCompaction();
92
93 // Form a compaction from the sorted run indicated by start_index to the
94 // oldest sorted run.
95 // The caller is responsible for making sure that those files are not in
96 // compaction.
97 Compaction* PickCompactionToOldest(size_t start_index,
98 CompactionReason compaction_reason);
99
100   // Try to pick a periodic compaction. The caller should call it only
101   // if there is at least one file marked for periodic compaction.
102   // nullptr will be returned if no such compaction can be formed
103   // because some files are being compacted.
104 Compaction* PickPeriodicCompaction();
105
106   // Used in universal compaction when the allow_trivial_move
107   // option is set. Checks whether there are any overlapping files
108   // in the input. Returns true if the input files are
109   // non-overlapping.
110 bool IsInputFilesNonOverlapping(Compaction* c);
111
112 const ImmutableCFOptions& ioptions_;
113 const InternalKeyComparator* icmp_;
114 double score_;
115 std::vector<SortedRun> sorted_runs_;
116 const std::string& cf_name_;
117 const MutableCFOptions& mutable_cf_options_;
118 VersionStorageInfo* vstorage_;
119 UniversalCompactionPicker* picker_;
120 LogBuffer* log_buffer_;
121
122 static std::vector<SortedRun> CalculateSortedRuns(
123 const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
124 const MutableCFOptions& mutable_cf_options);
125
126 // Pick a path ID to place a newly generated file, with its estimated file
127 // size.
128 static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
129 const MutableCFOptions& mutable_cf_options,
130 uint64_t file_size);
131 };
132
133 // Used in universal compaction when trivial move is enabled.
134 // This structure is used for the construction of a min-heap
135 // that contains the file metadata, the level of the file,
136 // and the index of the file in that level.
137
138 struct InputFileInfo {
139 InputFileInfo() : f(nullptr), level(0), index(0) {}
140
141 FileMetaData* f;
142 size_t level;
143 size_t index;
144 };
145
146 // Used in universal compaction when trivial move is enabled.
147 // This comparator is used for the construction of a min-heap
148 // ordered by the smallest key of each file.
149 struct SmallestKeyHeapComparator {
150 explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
151
152 bool operator()(InputFileInfo i1, InputFileInfo i2) const {
153 return (ucmp_->Compare(i1.f->smallest.user_key(),
154 i2.f->smallest.user_key()) > 0);
155 }
156
157 private:
158 const Comparator* ucmp_;
159 };
160
161 typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
162 SmallestKeyHeapComparator>
163 SmallestKeyHeap;
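// Note: std::priority_queue is a max-heap with respect to its comparator, so
// returning "greater than" in SmallestKeyHeapComparator makes the top of the
// queue the file with the smallest user key, i.e. effectively a min-heap.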
164
165 // This function creates the heap that is used to determine whether the
166 // input files overlap during universal compaction when allow_trivial_move
167 // is set.
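// Only the first file of each non-zero input level is pushed here;
// IsInputFilesNonOverlapping() pushes each level's next file lazily as files
// are popped, since files within a non-zero level are already sorted and
// disjoint.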
168 SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
169 SmallestKeyHeap smallest_key_priority_q =
170 SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));
171
172 InputFileInfo input_file;
173
174 for (size_t l = 0; l < c->num_input_levels(); l++) {
175 if (c->num_input_files(l) != 0) {
176 if (l == 0 && c->start_level() == 0) {
177 for (size_t i = 0; i < c->num_input_files(0); i++) {
178 input_file.f = c->input(0, i);
179 input_file.level = 0;
180 input_file.index = i;
181 smallest_key_priority_q.push(std::move(input_file));
182 }
183 } else {
184 input_file.f = c->input(l, 0);
185 input_file.level = l;
186 input_file.index = 0;
187 smallest_key_priority_q.push(std::move(input_file));
188 }
189 }
190 }
191 return smallest_key_priority_q;
192 }
193
194 #ifndef NDEBUG
195 // smallest_seqno and largest_seqno are set iff `files` is not empty.
196 void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
197 SequenceNumber* smallest_seqno,
198 SequenceNumber* largest_seqno) {
199 bool is_first = true;
200 for (FileMetaData* f : files) {
201 assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
202 if (is_first) {
203 is_first = false;
204 *smallest_seqno = f->fd.smallest_seqno;
205 *largest_seqno = f->fd.largest_seqno;
206 } else {
207 if (f->fd.smallest_seqno < *smallest_seqno) {
208 *smallest_seqno = f->fd.smallest_seqno;
209 }
210 if (f->fd.largest_seqno > *largest_seqno) {
211 *largest_seqno = f->fd.largest_seqno;
212 }
213 }
214 }
215 }
216 #endif
217 } // namespace
218
219 // Algorithm that checks whether there are any overlapping
220 // files in the input.
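// Files are popped in ascending order of their smallest user key; if a popped
// file's smallest key is not strictly greater than the previously popped
// file's largest key, the inputs overlap and false is returned.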
221 bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) {
222 auto comparator = icmp_->user_comparator();
223 int first_iter = 1;
224
225 InputFileInfo prev, curr, next;
226
227 SmallestKeyHeap smallest_key_priority_q =
228 create_level_heap(c, icmp_->user_comparator());
229
230 while (!smallest_key_priority_q.empty()) {
231 curr = smallest_key_priority_q.top();
232 smallest_key_priority_q.pop();
233
234 if (first_iter) {
235 prev = curr;
236 first_iter = 0;
237 } else {
238 if (comparator->Compare(prev.f->largest.user_key(),
239 curr.f->smallest.user_key()) >= 0) {
240 // found overlapping files, return false
241 return false;
242 }
243 assert(comparator->Compare(curr.f->largest.user_key(),
244 prev.f->largest.user_key()) > 0);
245 prev = curr;
246 }
247
248 next.f = nullptr;
249
250 if (c->level(curr.level) != 0 &&
251 curr.index < c->num_input_files(curr.level) - 1) {
252 next.f = c->input(curr.level, curr.index + 1);
253 next.level = curr.level;
254 next.index = curr.index + 1;
255 }
256
257 if (next.f) {
258 smallest_key_priority_q.push(std::move(next));
259 }
260 }
261 return true;
262 }
263
264 bool UniversalCompactionPicker::NeedsCompaction(
265 const VersionStorageInfo* vstorage) const {
266 const int kLevel0 = 0;
267 if (vstorage->CompactionScore(kLevel0) >= 1) {
268 return true;
269 }
270 if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
271 return true;
272 }
273 if (!vstorage->FilesMarkedForCompaction().empty()) {
274 return true;
275 }
276 return false;
277 }
278
279 Compaction* UniversalCompactionPicker::PickCompaction(
280 const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
281 VersionStorageInfo* vstorage, LogBuffer* log_buffer,
282 SequenceNumber /* earliest_memtable_seqno */) {
283 UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name,
284 mutable_cf_options, vstorage, this,
285 log_buffer);
286 return builder.PickCompaction();
287 }
288
289 void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf,
290 size_t out_buf_size,
291 bool print_path) const {
292 if (level == 0) {
293 assert(file != nullptr);
294 if (file->fd.GetPathId() == 0 || !print_path) {
295 snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
296 } else {
297 snprintf(out_buf, out_buf_size, "file %" PRIu64
298 "(path "
299 "%" PRIu32 ")",
300 file->fd.GetNumber(), file->fd.GetPathId());
301 }
302 } else {
303 snprintf(out_buf, out_buf_size, "level %d", level);
304 }
305 }
306
307 void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
308 char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
309 if (level == 0) {
310 assert(file != nullptr);
311 snprintf(out_buf, out_buf_size,
312 "file %" PRIu64 "[%" ROCKSDB_PRIszt
313 "] "
314 "with size %" PRIu64 " (compensated size %" PRIu64 ")",
315 file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
316 file->compensated_file_size);
317 } else {
318 snprintf(out_buf, out_buf_size,
319 "level %d[%" ROCKSDB_PRIszt
320 "] "
321 "with size %" PRIu64 " (compensated size %" PRIu64 ")",
322 level, sorted_run_count, size, compensated_file_size);
323 }
324 }
325
326 std::vector<UniversalCompactionBuilder::SortedRun>
327 UniversalCompactionBuilder::CalculateSortedRuns(
328 const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
329 const MutableCFOptions& mutable_cf_options) {
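  // Each L0 file forms its own sorted run; each non-empty level >= 1 is
  // treated as a single sorted run whose size is the sum of its files' sizes.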
330 std::vector<UniversalCompactionBuilder::SortedRun> ret;
331 for (FileMetaData* f : vstorage.LevelFiles(0)) {
332 ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
333 f->being_compacted);
334 }
335 for (int level = 1; level < vstorage.num_levels(); level++) {
336 uint64_t total_compensated_size = 0U;
337 uint64_t total_size = 0U;
338 bool being_compacted = false;
339 bool is_first = true;
340 for (FileMetaData* f : vstorage.LevelFiles(level)) {
341 total_compensated_size += f->compensated_file_size;
342 total_size += f->fd.GetFileSize();
343 if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
344 true) {
345 if (f->being_compacted) {
346 being_compacted = f->being_compacted;
347 }
348 } else {
349 // Compaction always includes all files for a non-zero level, so for a
350 // non-zero level, all the files should share the same being_compacted
351 // value.
352 // This assumption is only valid when
353 // mutable_cf_options.compaction_options_universal.allow_trivial_move
354 // is false
355 assert(is_first || f->being_compacted == being_compacted);
356 }
357 if (is_first) {
358 being_compacted = f->being_compacted;
359 is_first = false;
360 }
361 }
362 if (total_compensated_size > 0) {
363 ret.emplace_back(level, nullptr, total_size, total_compensated_size,
364 being_compacted);
365 }
366 }
367 return ret;
368 }
369
370 // Universal style of compaction. Pick files that are contiguous in
371 // time-range to compact.
372 Compaction* UniversalCompactionBuilder::PickCompaction() {
373 const int kLevel0 = 0;
374 score_ = vstorage_->CompactionScore(kLevel0);
375 sorted_runs_ =
376 CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
377
378 if (sorted_runs_.size() == 0 ||
379 (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
380 vstorage_->FilesMarkedForCompaction().empty() &&
381 sorted_runs_.size() < (unsigned int)mutable_cf_options_
382 .level0_file_num_compaction_trigger)) {
383 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
384 cf_name_.c_str());
385 TEST_SYNC_POINT_CALLBACK(
386 "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
387 return nullptr;
388 }
389 VersionStorageInfo::LevelSummaryStorage tmp;
390 ROCKS_LOG_BUFFER_MAX_SZ(
391 log_buffer_, 3072,
392 "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
393 cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp));
394
395 Compaction* c = nullptr;
396   // Periodic compaction has higher priority than other types of compaction
397   // because it's a hard requirement.
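  // Overall picking order below: periodic compaction first, then size
  // amplification, then size ratio, then sorted-run count, and finally
  // delete-triggered compaction.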
398 if (!vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
399 // Always need to do a full compaction for periodic compaction.
400 c = PickPeriodicCompaction();
401 }
402
403 // Check for size amplification.
404 if (c == nullptr &&
405 sorted_runs_.size() >=
406 static_cast<size_t>(
407 mutable_cf_options_.level0_file_num_compaction_trigger)) {
408 if ((c = PickCompactionToReduceSizeAmp()) != nullptr) {
409 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n",
410 cf_name_.c_str());
411 } else {
412 // Size amplification is within limits. Try reducing read
413 // amplification while maintaining file size ratios.
414 unsigned int ratio =
415 mutable_cf_options_.compaction_options_universal.size_ratio;
416
417 if ((c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX)) != nullptr) {
418 ROCKS_LOG_BUFFER(log_buffer_,
419 "[%s] Universal: compacting for size ratio\n",
420 cf_name_.c_str());
421 } else {
422         // Size amplification and file size ratios are within configured limits.
423         // If max read amplification exceeds the configured limit, force a
424         // compaction without looking at file size ratios and try to reduce
425         // the number of files to fewer than level0_file_num_compaction_trigger.
426         // This is guaranteed by NeedsCompaction().
427 assert(sorted_runs_.size() >=
428 static_cast<size_t>(
429 mutable_cf_options_.level0_file_num_compaction_trigger));
430 // Get the total number of sorted runs that are not being compacted
431 int num_sr_not_compacted = 0;
432 for (size_t i = 0; i < sorted_runs_.size(); i++) {
433 if (sorted_runs_[i].being_compacted == false) {
434 num_sr_not_compacted++;
435 }
436 }
437
438 // The number of sorted runs that are not being compacted is greater
439 // than the maximum allowed number of sorted runs
440 if (num_sr_not_compacted >
441 mutable_cf_options_.level0_file_num_compaction_trigger) {
442 unsigned int num_files =
443 num_sr_not_compacted -
444 mutable_cf_options_.level0_file_num_compaction_trigger + 1;
445 if ((c = PickCompactionToReduceSortedRuns(UINT_MAX, num_files)) !=
446 nullptr) {
447 ROCKS_LOG_BUFFER(log_buffer_,
448 "[%s] Universal: compacting for file num -- %u\n",
449 cf_name_.c_str(), num_files);
450 }
451 }
452 }
453 }
454 }
455
456 if (c == nullptr) {
457 if ((c = PickDeleteTriggeredCompaction()) != nullptr) {
458 ROCKS_LOG_BUFFER(log_buffer_,
459 "[%s] Universal: delete triggered compaction\n",
460 cf_name_.c_str());
461 }
462 }
463
464 if (c == nullptr) {
465 TEST_SYNC_POINT_CALLBACK(
466 "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
467 return nullptr;
468 }
469
470 if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
471 true &&
472 c->compaction_reason() != CompactionReason::kPeriodicCompaction) {
473 c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
474 }
475
476   // validate that all the chosen files of L0 are non-overlapping in time
477 #ifndef NDEBUG
478 SequenceNumber prev_smallest_seqno = 0U;
479 bool is_first = true;
480
481 size_t level_index = 0U;
482 if (c->start_level() == 0) {
483 for (auto f : *c->inputs(0)) {
484 assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
485 if (is_first) {
486 is_first = false;
487 }
488 prev_smallest_seqno = f->fd.smallest_seqno;
489 }
490 level_index = 1U;
491 }
492 for (; level_index < c->num_input_levels(); level_index++) {
493 if (c->num_input_files(level_index) != 0) {
494 SequenceNumber smallest_seqno = 0U;
495 SequenceNumber largest_seqno = 0U;
496 GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
497 &largest_seqno);
498 if (is_first) {
499 is_first = false;
500 } else if (prev_smallest_seqno > 0) {
501 // A level is considered as the bottommost level if there are
502 // no files in higher levels or if files in higher levels do
503 // not overlap with the files being compacted. Sequence numbers
504 // of files in bottommost level can be set to 0 to help
505 // compression. As a result, the following assert may not hold
506 // if the prev_smallest_seqno is 0.
507 assert(prev_smallest_seqno > largest_seqno);
508 }
509 prev_smallest_seqno = smallest_seqno;
510 }
511 }
512 #endif
513 // update statistics
514 RecordInHistogram(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
515 c->inputs(0)->size());
516
517 picker_->RegisterCompaction(c);
518 vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
519
520 TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
521 c);
522 return c;
523 }
524
525 uint32_t UniversalCompactionBuilder::GetPathId(
526 const ImmutableCFOptions& ioptions,
527 const MutableCFOptions& mutable_cf_options, uint64_t file_size) {
528 // Two conditions need to be satisfied:
529 // (1) the target path needs to be able to hold the file's size
530   // (2) The total size left in this and previous paths needs to be no
531   //     smaller than the expected future file size before this new file is
532   //     compacted, which is estimated based on size_ratio.
533 // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
534 // we will make sure the target file, probably with size of 16, will be
535 // placed in a path so that eventually when new files are generated and
536 // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
537 // before the path we chose.
538 //
539 // TODO(sdong): now the case of multiple column families is not
540 // considered in this algorithm. So the target size can be violated in
541 // that case. We need to improve it.
542 uint64_t accumulated_size = 0;
543 uint64_t future_size =
544 file_size *
545 (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
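  // For example, with a size_ratio of 1 (the default), a 100 MB estimated
  // output file gives future_size = 99 MB here (integer arithmetic).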
546 uint32_t p = 0;
547 assert(!ioptions.cf_paths.empty());
548 for (; p < ioptions.cf_paths.size() - 1; p++) {
549 uint64_t target_size = ioptions.cf_paths[p].target_size;
550 if (target_size > file_size &&
551 accumulated_size + (target_size - file_size) > future_size) {
552 return p;
553 }
554 accumulated_size += target_size;
555 }
556 return p;
557 }
558
559 //
560 // Consider compaction files based on their size differences with
561 // the next file in time order.
562 //
563 Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
564 unsigned int ratio, unsigned int max_number_of_files_to_compact) {
565 unsigned int min_merge_width =
566 mutable_cf_options_.compaction_options_universal.min_merge_width;
567 unsigned int max_merge_width =
568 mutable_cf_options_.compaction_options_universal.max_merge_width;
569
570 const SortedRun* sr = nullptr;
571 bool done = false;
572 size_t start_index = 0;
573 unsigned int candidate_count = 0;
574
575 unsigned int max_files_to_compact =
576 std::min(max_merge_width, max_number_of_files_to_compact);
577 min_merge_width = std::max(min_merge_width, 2U);
578
579   // Caller checks the size before executing this function. This invariant is
580   // important because otherwise we may have an integer underflow when
581   // dealing with unsigned types.
582 assert(sorted_runs_.size() > 0);
583
584 // Considers a candidate file only if it is smaller than the
585 // total size accumulated so far.
586 for (size_t loop = 0; loop < sorted_runs_.size(); loop++) {
587 candidate_count = 0;
588
589 // Skip files that are already being compacted
590 for (sr = nullptr; loop < sorted_runs_.size(); loop++) {
591 sr = &sorted_runs_[loop];
592
593 if (!sr->being_compacted) {
594 candidate_count = 1;
595 break;
596 }
597 char file_num_buf[kFormatFileNumberBufSize];
598 sr->Dump(file_num_buf, sizeof(file_num_buf));
599 ROCKS_LOG_BUFFER(log_buffer_,
600 "[%s] Universal: %s"
601 "[%d] being compacted, skipping",
602 cf_name_.c_str(), file_num_buf, loop);
603
604 sr = nullptr;
605 }
606
607 // This file is not being compacted. Consider it as the
608 // first candidate to be compacted.
609 uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
610 if (sr != nullptr) {
611 char file_num_buf[kFormatFileNumberBufSize];
612 sr->Dump(file_num_buf, sizeof(file_num_buf), true);
613 ROCKS_LOG_BUFFER(log_buffer_,
614 "[%s] Universal: Possible candidate %s[%d].",
615 cf_name_.c_str(), file_num_buf, loop);
616 }
617
618 // Check if the succeeding files need compaction.
619 for (size_t i = loop + 1;
620 candidate_count < max_files_to_compact && i < sorted_runs_.size();
621 i++) {
622 const SortedRun* succeeding_sr = &sorted_runs_[i];
623 if (succeeding_sr->being_compacted) {
624 break;
625 }
626 // Pick files if the total/last candidate file size (increased by the
627 // specified ratio) is still larger than the next candidate file.
628 // candidate_size is the total size of files picked so far with the
629 // default kCompactionStopStyleTotalSize; with
630 // kCompactionStopStyleSimilarSize, it's simply the size of the last
631 // picked file.
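      // For example, with ratio = 1 and a candidate_size of 100, the next
      // sorted run keeps being included as long as it is no larger than 101.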
632 double sz = candidate_size * (100.0 + ratio) / 100.0;
633 if (sz < static_cast<double>(succeeding_sr->size)) {
634 break;
635 }
636 if (mutable_cf_options_.compaction_options_universal.stop_style ==
637 kCompactionStopStyleSimilarSize) {
638 // Similar-size stopping rule: also check the last picked file isn't
639 // far larger than the next candidate file.
640 sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
641 if (sz < static_cast<double>(candidate_size)) {
642 // If the small file we've encountered begins a run of similar-size
643 // files, we'll pick them up on a future iteration of the outer
644 // loop. If it's some lonely straggler, it'll eventually get picked
645 // by the last-resort read amp strategy which disregards size ratios.
646 break;
647 }
648 candidate_size = succeeding_sr->compensated_file_size;
649 } else { // default kCompactionStopStyleTotalSize
650 candidate_size += succeeding_sr->compensated_file_size;
651 }
652 candidate_count++;
653 }
654
655 // Found a series of consecutive files that need compaction.
656 if (candidate_count >= (unsigned int)min_merge_width) {
657 start_index = loop;
658 done = true;
659 break;
660 } else {
661 for (size_t i = loop;
662 i < loop + candidate_count && i < sorted_runs_.size(); i++) {
663 const SortedRun* skipping_sr = &sorted_runs_[i];
664 char file_num_buf[256];
665 skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
666 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s",
667 cf_name_.c_str(), file_num_buf);
668 }
669 }
670 }
671 if (!done || candidate_count <= 1) {
672 return nullptr;
673 }
674 size_t first_index_after = start_index + candidate_count;
675   // Compression is enabled only if the data older than the files being
676   // compacted has not yet reached compression_size_percent of the total.
677 bool enable_compression = true;
678 int ratio_to_compress =
679 mutable_cf_options_.compaction_options_universal.compression_size_percent;
680 if (ratio_to_compress >= 0) {
681 uint64_t total_size = 0;
682 for (auto& sorted_run : sorted_runs_) {
683 total_size += sorted_run.compensated_file_size;
684 }
685
686 uint64_t older_file_size = 0;
687 for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) {
688 older_file_size += sorted_runs_[i].size;
689 if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
690 enable_compression = false;
691 break;
692 }
693 }
694 }
695
696 uint64_t estimated_total_size = 0;
697 for (unsigned int i = 0; i < first_index_after; i++) {
698 estimated_total_size += sorted_runs_[i].size;
699 }
700 uint32_t path_id =
701 GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
702 int start_level = sorted_runs_[start_index].level;
703 int output_level;
704 if (first_index_after == sorted_runs_.size()) {
705 output_level = vstorage_->num_levels() - 1;
706 } else if (sorted_runs_[first_index_after].level == 0) {
707 output_level = 0;
708 } else {
709 output_level = sorted_runs_[first_index_after].level - 1;
710 }
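  // In other words: if every remaining sorted run is included, output to the
  // bottommost level; otherwise output just above the next (older) sorted run,
  // staying in L0 only when that next run is itself an L0 file.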
711
712 // last level is reserved for the files ingested behind
713 if (ioptions_.allow_ingest_behind &&
714 (output_level == vstorage_->num_levels() - 1)) {
715 assert(output_level > 1);
716 output_level--;
717 }
718
719 std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
720 for (size_t i = 0; i < inputs.size(); ++i) {
721 inputs[i].level = start_level + static_cast<int>(i);
722 }
723 for (size_t i = start_index; i < first_index_after; i++) {
724 auto& picking_sr = sorted_runs_[i];
725 if (picking_sr.level == 0) {
726 FileMetaData* picking_file = picking_sr.file;
727 inputs[0].files.push_back(picking_file);
728 } else {
729 auto& files = inputs[picking_sr.level - start_level].files;
730 for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
731 files.push_back(f);
732 }
733 }
734 char file_num_buf[256];
735 picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
736 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s",
737 cf_name_.c_str(), file_num_buf);
738 }
739
740 CompactionReason compaction_reason;
741 if (max_number_of_files_to_compact == UINT_MAX) {
742 compaction_reason = CompactionReason::kUniversalSizeRatio;
743 } else {
744 compaction_reason = CompactionReason::kUniversalSortedRunNum;
745 }
746 return new Compaction(
747 vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
748 output_level,
749 MaxFileSizeForLevel(mutable_cf_options_, output_level,
750 kCompactionStyleUniversal),
751 LLONG_MAX, path_id,
752 GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
753 1, enable_compression),
754 GetCompressionOptions(ioptions_, vstorage_, start_level,
755 enable_compression),
756 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
757 score_, false /* deletion_compaction */, compaction_reason);
758 }
759
760 // Look at overall size amplification. If size amplification
761 // exceeds the configured value, then do a compaction
762 // of the candidate files all the way up to the earliest
763 // base file (overrides configured values of file-size ratios,
764 // min_merge_width and max_merge_width).
765 //
766 Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
767 // percentage flexibility while reducing size amplification
768 uint64_t ratio = mutable_cf_options_.compaction_options_universal
769 .max_size_amplification_percent;
770
771 unsigned int candidate_count = 0;
772 uint64_t candidate_size = 0;
773 size_t start_index = 0;
774 const SortedRun* sr = nullptr;
775
776 assert(!sorted_runs_.empty());
777 if (sorted_runs_.back().being_compacted) {
778 return nullptr;
779 }
780
781 // Skip files that are already being compacted
782 for (size_t loop = 0; loop < sorted_runs_.size() - 1; loop++) {
783 sr = &sorted_runs_[loop];
784 if (!sr->being_compacted) {
785 start_index = loop; // Consider this as the first candidate.
786 break;
787 }
788 char file_num_buf[kFormatFileNumberBufSize];
789 sr->Dump(file_num_buf, sizeof(file_num_buf), true);
790 ROCKS_LOG_BUFFER(log_buffer_,
791 "[%s] Universal: skipping %s[%d] compacted %s",
792 cf_name_.c_str(), file_num_buf, loop,
793 " cannot be a candidate to reduce size amp.\n");
794 sr = nullptr;
795 }
796
797 if (sr == nullptr) {
798 return nullptr; // no candidate files
799 }
800 {
801 char file_num_buf[kFormatFileNumberBufSize];
802 sr->Dump(file_num_buf, sizeof(file_num_buf), true);
803 ROCKS_LOG_BUFFER(
804 log_buffer_,
805 "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
806 cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
807 }
808
809 // keep adding up all the remaining files
810 for (size_t loop = start_index; loop < sorted_runs_.size() - 1; loop++) {
811 sr = &sorted_runs_[loop];
812 if (sr->being_compacted) {
813 char file_num_buf[kFormatFileNumberBufSize];
814 sr->Dump(file_num_buf, sizeof(file_num_buf), true);
815 ROCKS_LOG_BUFFER(
816 log_buffer_, "[%s] Universal: Possible candidate %s[%d] %s",
817 cf_name_.c_str(), file_num_buf, start_index,
818 " is already being compacted. No size amp reduction possible.\n");
819 return nullptr;
820 }
821 candidate_size += sr->compensated_file_size;
822 candidate_count++;
823 }
824 if (candidate_count == 0) {
825 return nullptr;
826 }
827
828   // size of the earliest (oldest) sorted run
829   uint64_t earliest_file_size = sorted_runs_.back().size;
830
831   // size amplification = newer runs' total size as a percentage of the oldest run's size
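  // For example, with the default max_size_amplification_percent of 200,
  // newer runs totalling 150 against an oldest run of size 100 give 150%
  // amplification, which is within the limit, so no compaction is picked here.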
832 if (candidate_size * 100 < ratio * earliest_file_size) {
833 ROCKS_LOG_BUFFER(
834 log_buffer_,
835 "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
836 " earliest-file-size %" PRIu64,
837 cf_name_.c_str(), candidate_size, earliest_file_size);
838 return nullptr;
839 } else {
840 ROCKS_LOG_BUFFER(
841 log_buffer_,
842 "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
843 " earliest-file-size %" PRIu64,
844 cf_name_.c_str(), candidate_size, earliest_file_size);
845 }
846 return PickCompactionToOldest(start_index,
847 CompactionReason::kUniversalSizeAmplification);
848 }
849
850 // Pick files marked for compaction. Typically, files are marked by
851 // CompactOnDeletionCollector due to the presence of tombstones.
852 Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
853 CompactionInputFiles start_level_inputs;
854 int output_level;
855 std::vector<CompactionInputFiles> inputs;
856
857 if (vstorage_->num_levels() == 1) {
858     // This is single-level universal. Since we're basically trying to reclaim
859     // space by processing files marked for compaction due to high tombstone
860     // density, do the same thing as a size-amp-reducing compaction, which
861     // has the same goal.
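    // Starting from the first file marked for compaction, include it and every
    // file that follows it in L0's order (typically all older data, since L0
    // files are kept newest-first), mirroring a compaction down to the oldest
    // data.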
862 bool compact = false;
863
864 start_level_inputs.level = 0;
865 start_level_inputs.files.clear();
866 output_level = 0;
867 for (FileMetaData* f : vstorage_->LevelFiles(0)) {
868 if (f->marked_for_compaction) {
869 compact = true;
870 }
871 if (compact) {
872 start_level_inputs.files.push_back(f);
873 }
874 }
875 if (start_level_inputs.size() <= 1) {
876 // If only the last file in L0 is marked for compaction, ignore it
877 return nullptr;
878 }
879 inputs.push_back(start_level_inputs);
880 } else {
881 int start_level;
882
883 // For multi-level universal, the strategy is to make this look more like
884 // leveled. We pick one of the files marked for compaction and compact with
885 // overlapping files in the adjacent level.
886 picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level,
887 &output_level, &start_level_inputs);
888 if (start_level_inputs.empty()) {
889 return nullptr;
890 }
891
892 // Pick the first non-empty level after the start_level
893 for (output_level = start_level + 1; output_level < vstorage_->num_levels();
894 output_level++) {
895 if (vstorage_->NumLevelFiles(output_level) != 0) {
896 break;
897 }
898 }
899
900 // If all higher levels are empty, pick the highest level as output level
901 if (output_level == vstorage_->num_levels()) {
902 if (start_level == 0) {
903 output_level = vstorage_->num_levels() - 1;
904 } else {
905 // If start level is non-zero and all higher levels are empty, this
906 // compaction will translate into a trivial move. Since the idea is
907 // to reclaim space and trivial move doesn't help with that, we
908 // skip compaction in this case and return nullptr
909 return nullptr;
910 }
911 }
912 if (ioptions_.allow_ingest_behind &&
913 output_level == vstorage_->num_levels() - 1) {
914 assert(output_level > 1);
915 output_level--;
916 }
917
918 if (output_level != 0) {
919 if (start_level == 0) {
920 if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs,
921 output_level, nullptr)) {
922 return nullptr;
923 }
924 }
925
926 CompactionInputFiles output_level_inputs;
927 int parent_index = -1;
928
929 output_level_inputs.level = output_level;
930 if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
931 &start_level_inputs, &output_level_inputs,
932 &parent_index, -1)) {
933 return nullptr;
934 }
935 inputs.push_back(start_level_inputs);
936 if (!output_level_inputs.empty()) {
937 inputs.push_back(output_level_inputs);
938 }
939 if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) {
940 return nullptr;
941 }
942 } else {
943 inputs.push_back(start_level_inputs);
944 }
945 }
946
947 uint64_t estimated_total_size = 0;
948 // Use size of the output level as estimated file size
949 for (FileMetaData* f : vstorage_->LevelFiles(output_level)) {
950 estimated_total_size += f->fd.GetFileSize();
951 }
952 uint32_t path_id =
953 GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
954 return new Compaction(
955 vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
956 output_level,
957 MaxFileSizeForLevel(mutable_cf_options_, output_level,
958 kCompactionStyleUniversal),
959 /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
960 GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
961 output_level, 1),
962 GetCompressionOptions(ioptions_, vstorage_, output_level),
963 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
964 score_, false /* deletion_compaction */,
965 CompactionReason::kFilesMarkedForCompaction);
966 }
967
968 Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
969 size_t start_index, CompactionReason compaction_reason) {
970 assert(start_index < sorted_runs_.size());
971
972 // Estimate total file size
973 uint64_t estimated_total_size = 0;
974 for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
975 estimated_total_size += sorted_runs_[loop].size;
976 }
977 uint32_t path_id =
978 GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
979 int start_level = sorted_runs_[start_index].level;
980
981 std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
982 for (size_t i = 0; i < inputs.size(); ++i) {
983 inputs[i].level = start_level + static_cast<int>(i);
984 }
985 for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
986 auto& picking_sr = sorted_runs_[loop];
987 if (picking_sr.level == 0) {
988 FileMetaData* f = picking_sr.file;
989 inputs[0].files.push_back(f);
990 } else {
991 auto& files = inputs[picking_sr.level - start_level].files;
992 for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
993 files.push_back(f);
994 }
995 }
996 std::string comp_reason_print_string;
997 if (compaction_reason == CompactionReason::kPeriodicCompaction) {
998 comp_reason_print_string = "periodic compaction";
999 } else if (compaction_reason ==
1000 CompactionReason::kUniversalSizeAmplification) {
1001 comp_reason_print_string = "size amp";
1002 } else {
1003 assert(false);
1004 }
1005
1006 char file_num_buf[256];
1007 picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
1008 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s",
1009 cf_name_.c_str(), comp_reason_print_string.c_str(),
1010 file_num_buf);
1011 }
1012
1013   // output files at the bottommost level, unless it's reserved
1014 int output_level = vstorage_->num_levels() - 1;
1015 // last level is reserved for the files ingested behind
1016 if (ioptions_.allow_ingest_behind) {
1017 assert(output_level > 1);
1018 output_level--;
1019 }
1020
1021   // We never check size for
1022   // compaction_options_universal.compression_size_percent,
1023   // because we always compact all the files, so we always compress.
1024 return new Compaction(
1025 vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
1026 output_level,
1027 MaxFileSizeForLevel(mutable_cf_options_, output_level,
1028 kCompactionStyleUniversal),
1029 LLONG_MAX, path_id,
1030 GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1031 1, true /* enable_compression */),
1032 GetCompressionOptions(ioptions_, vstorage_, start_level,
1033 true /* enable_compression */),
1034 /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
1035 score_, false /* deletion_compaction */, compaction_reason);
1036 }
1037
1038 Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
1039 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction",
1040 cf_name_.c_str());
1041
1042   // In universal compaction, sorted runs containing older data are almost
1043   // always generated earlier too. To simplify the problem, we just try to
1044   // trigger a full compaction. We start from the oldest sorted run and include
1045   // all sorted runs, until we hit a sorted run already being compacted.
1046   // Since usually the largest (which is usually the oldest) sorted run is
1047   // included anyway, doing a full compaction won't increase write
1048   // amplification much.
1049
1050 // Get some information from marked files to check whether a file is
1051 // included in the compaction.
1052
1053 size_t start_index = sorted_runs_.size();
1054 while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted) {
1055 start_index--;
1056 }
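  // [start_index, sorted_runs_.size()) now covers the oldest sorted runs, none
  // of which are being compacted; if even the oldest run is busy, start_index
  // stays at sorted_runs_.size() and we give up below.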
1057 if (start_index == sorted_runs_.size()) {
1058 return nullptr;
1059 }
1060
1061   // There is a rare corner case where we can't pick up all the files
1062   // because some files are being compacted, and we end up picking files
1063   // none of which actually need periodic compaction. To keep the logic
1064   // simple, we execute the compaction anyway, unless it would merely
1065   // recompact the last sorted run (the last level or the last L0 file).
1066 if (start_index == sorted_runs_.size() - 1) {
1067 bool included_file_marked = false;
1068 int start_level = sorted_runs_[start_index].level;
1069 FileMetaData* start_file = sorted_runs_[start_index].file;
1070 for (const std::pair<int, FileMetaData*>& level_file_pair :
1071 vstorage_->FilesMarkedForPeriodicCompaction()) {
1072 if (start_level != 0) {
1073 // Last sorted run is a level
1074 if (start_level == level_file_pair.first) {
1075 included_file_marked = true;
1076 break;
1077 }
1078 } else {
1079 // Last sorted run is a L0 file.
1080 if (start_file == level_file_pair.second) {
1081 included_file_marked = true;
1082 break;
1083 }
1084 }
1085 }
1086 if (!included_file_marked) {
1087 ROCKS_LOG_BUFFER(log_buffer_,
1088 "[%s] Universal: Cannot form a compaction covering file "
1089 "marked for periodic compaction",
1090 cf_name_.c_str());
1091 return nullptr;
1092 }
1093 }
1094
1095 Compaction* c = PickCompactionToOldest(start_index,
1096 CompactionReason::kPeriodicCompaction);
1097
1098 TEST_SYNC_POINT_CALLBACK(
1099 "UniversalCompactionPicker::PickPeriodicCompaction:Return", c);
1100
1101 return c;
1102 }
1103 } // namespace ROCKSDB_NAMESPACE
1104
1105 #endif // !ROCKSDB_LITE