ceph/src/rocksdb/db/compaction/compaction_picker_universal.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction/compaction_picker_universal.h"
#ifndef ROCKSDB_LITE

#include <cinttypes>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "file/filename.h"
#include "logging/log_buffer.h"
#include "monitoring/statistics.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
namespace {
// A helper class that forms universal compactions. The class is used by
// UniversalCompactionPicker::PickCompaction().
// Usage: create the class and obtain the compaction object by calling
// PickCompaction().
class UniversalCompactionBuilder {
 public:
  UniversalCompactionBuilder(
      const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp,
      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
      const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
      UniversalCompactionPicker* picker, LogBuffer* log_buffer)
      : ioptions_(ioptions),
        icmp_(icmp),
        cf_name_(cf_name),
        mutable_cf_options_(mutable_cf_options),
        mutable_db_options_(mutable_db_options),
        vstorage_(vstorage),
        picker_(picker),
        log_buffer_(log_buffer) {}

  // Form and return the compaction object. The caller owns the returned
  // object.
  Compaction* PickCompaction();

 private:
  struct SortedRun {
    SortedRun(int _level, FileMetaData* _file, uint64_t _size,
              uint64_t _compensated_file_size, bool _being_compacted)
        : level(_level),
          file(_file),
          size(_size),
          compensated_file_size(_compensated_file_size),
          being_compacted(_being_compacted) {
      assert(compensated_file_size > 0);
      assert(level != 0 || file != nullptr);
    }

    void Dump(char* out_buf, size_t out_buf_size,
              bool print_path = false) const;

    // sorted_run_count is added into the string to print
    void DumpSizeInfo(char* out_buf, size_t out_buf_size,
                      size_t sorted_run_count) const;

    int level;
    // `file` will be null for level > 0. For level = 0, the sorted run
    // consists of this single file.
    FileMetaData* file;
    // For level > 0, `size` and `compensated_file_size` are the sums of the
    // sizes of all files in the level. `being_compacted` should be the same
    // for all files in a non-zero level. Use the value here.
    uint64_t size;
    uint64_t compensated_file_size;
    bool being_compacted;
  };

  // Pick Universal compaction to limit read amplification
  Compaction* PickCompactionToReduceSortedRuns(
      unsigned int ratio, unsigned int max_number_of_files_to_compact);

  // Pick Universal compaction to limit space amplification.
  Compaction* PickCompactionToReduceSizeAmp();

  Compaction* PickDeleteTriggeredCompaction();

  // Form a compaction from the sorted run indicated by start_index to the
  // oldest sorted run.
  // The caller is responsible for making sure that those files are not
  // already being compacted.
  Compaction* PickCompactionToOldest(size_t start_index,
                                     CompactionReason compaction_reason);

  // Try to pick periodic compaction. The caller should only call it
  // if there is at least one file marked for periodic compaction.
  // nullptr will be returned if no such compaction can be formed
  // because some files are being compacted.
  Compaction* PickPeriodicCompaction();

  // Used in universal compaction when the allow_trivial_move
  // option is set. Checks whether there are any overlapping files
  // in the input. Returns true if the input files are
  // non-overlapping.
  bool IsInputFilesNonOverlapping(Compaction* c);

  const ImmutableCFOptions& ioptions_;
  const InternalKeyComparator* icmp_;
  double score_;
  std::vector<SortedRun> sorted_runs_;
  const std::string& cf_name_;
  const MutableCFOptions& mutable_cf_options_;
  const MutableDBOptions& mutable_db_options_;
  VersionStorageInfo* vstorage_;
  UniversalCompactionPicker* picker_;
  LogBuffer* log_buffer_;

  static std::vector<SortedRun> CalculateSortedRuns(
      const VersionStorageInfo& vstorage);

  // Pick a path ID to place a newly generated file, with its estimated file
  // size.
  static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
                            const MutableCFOptions& mutable_cf_options,
                            uint64_t file_size);
};

// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of a min heap
// that contains the file metadata, the level of the file,
// and the index of the file in that level.

struct InputFileInfo {
  InputFileInfo() : f(nullptr), level(0), index(0) {}

  FileMetaData* f;
  size_t level;
  size_t index;
};

// Used in universal compaction when trivial move is enabled.
// This comparator is used for the construction of a min heap
// based on the smallest key of the file.
struct SmallestKeyHeapComparator {
  explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }

  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
    return (ucmp_->Compare(i1.f->smallest.user_key(),
                           i2.f->smallest.user_key()) > 0);
  }

 private:
  const Comparator* ucmp_;
};

typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
                            SmallestKeyHeapComparator>
    SmallestKeyHeap;

// This function creates the heap that is used to find whether the files are
// overlapping during universal compaction when allow_trivial_move
// is set.
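// Only the first file of each non-zero level is pushed here; subsequent files
// from the same level are pushed lazily by IsInputFilesNonOverlapping() as
// their predecessors are popped. All L0 files are pushed up front since they
// may overlap arbitrarily.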
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
  SmallestKeyHeap smallest_key_priority_q =
      SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));

  InputFileInfo input_file;

  for (size_t l = 0; l < c->num_input_levels(); l++) {
    if (c->num_input_files(l) != 0) {
      if (l == 0 && c->start_level() == 0) {
        for (size_t i = 0; i < c->num_input_files(0); i++) {
          input_file.f = c->input(0, i);
          input_file.level = 0;
          input_file.index = i;
          smallest_key_priority_q.push(std::move(input_file));
        }
      } else {
        input_file.f = c->input(l, 0);
        input_file.level = l;
        input_file.index = 0;
        smallest_key_priority_q.push(std::move(input_file));
      }
    }
  }
  return smallest_key_priority_q;
}

#ifndef NDEBUG
// smallest_seqno and largest_seqno are set iff. `files` is not empty.
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
                             SequenceNumber* smallest_seqno,
                             SequenceNumber* largest_seqno) {
  bool is_first = true;
  for (FileMetaData* f : files) {
    assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
    if (is_first) {
      is_first = false;
      *smallest_seqno = f->fd.smallest_seqno;
      *largest_seqno = f->fd.largest_seqno;
    } else {
      if (f->fd.smallest_seqno < *smallest_seqno) {
        *smallest_seqno = f->fd.smallest_seqno;
      }
      if (f->fd.largest_seqno > *largest_seqno) {
        *largest_seqno = f->fd.largest_seqno;
      }
    }
  }
}
#endif
}  // namespace

// Algorithm that checks whether there are any overlapping
// files in the input.
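// The check walks the input files in smallest-key order using a min heap: it
// repeatedly pops the file with the smallest "smallest key" and compares it
// against the largest key of the previously popped file; any overlap
// disqualifies the trivial move. Since files within a non-zero level are
// already sorted and non-overlapping, only the next file from the same level
// needs to be pushed back onto the heap.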
bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) {
  auto comparator = icmp_->user_comparator();
  int first_iter = 1;

  InputFileInfo prev, curr, next;

  SmallestKeyHeap smallest_key_priority_q =
      create_level_heap(c, icmp_->user_comparator());

  while (!smallest_key_priority_q.empty()) {
    curr = smallest_key_priority_q.top();
    smallest_key_priority_q.pop();

    if (first_iter) {
      prev = curr;
      first_iter = 0;
    } else {
      if (comparator->Compare(prev.f->largest.user_key(),
                              curr.f->smallest.user_key()) >= 0) {
        // found overlapping files, return false
        return false;
      }
      assert(comparator->Compare(curr.f->largest.user_key(),
                                 prev.f->largest.user_key()) > 0);
      prev = curr;
    }

    next.f = nullptr;

    if (c->level(curr.level) != 0 &&
        curr.index < c->num_input_files(curr.level) - 1) {
      next.f = c->input(curr.level, curr.index + 1);
      next.level = curr.level;
      next.index = curr.index + 1;
    }

    if (next.f) {
      smallest_key_priority_q.push(std::move(next));
    }
  }
  return true;
}

bool UniversalCompactionPicker::NeedsCompaction(
    const VersionStorageInfo* vstorage) const {
  const int kLevel0 = 0;
  if (vstorage->CompactionScore(kLevel0) >= 1) {
    return true;
  }
  if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
    return true;
  }
  if (!vstorage->FilesMarkedForCompaction().empty()) {
    return true;
  }
  return false;
}

Compaction* UniversalCompactionPicker::PickCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer, SequenceNumber /* earliest_memtable_seqno */) {
  UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name,
                                     mutable_cf_options, mutable_db_options,
                                     vstorage, this, log_buffer);
  return builder.PickCompaction();
}

void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf,
                                                 size_t out_buf_size,
                                                 bool print_path) const {
  if (level == 0) {
    assert(file != nullptr);
    if (file->fd.GetPathId() == 0 || !print_path) {
      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
    } else {
      snprintf(out_buf, out_buf_size, "file %" PRIu64
               "(path "
               "%" PRIu32 ")",
               file->fd.GetNumber(), file->fd.GetPathId());
    }
  } else {
    snprintf(out_buf, out_buf_size, "level %d", level);
  }
}

void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
  if (level == 0) {
    assert(file != nullptr);
    snprintf(out_buf, out_buf_size,
             "file %" PRIu64 "[%" ROCKSDB_PRIszt
             "] "
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
             file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
             file->compensated_file_size);
  } else {
    snprintf(out_buf, out_buf_size,
             "level %d[%" ROCKSDB_PRIszt
             "] "
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
             level, sorted_run_count, size, compensated_file_size);
  }
}

std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
    const VersionStorageInfo& vstorage) {
  std::vector<UniversalCompactionBuilder::SortedRun> ret;
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
                     f->being_compacted);
  }
  for (int level = 1; level < vstorage.num_levels(); level++) {
    uint64_t total_compensated_size = 0U;
    uint64_t total_size = 0U;
    bool being_compacted = false;
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
      total_compensated_size += f->compensated_file_size;
      total_size += f->fd.GetFileSize();
      // Size amp, read amp and periodic compactions always include all files
      // for a non-zero level. However, a delete-triggered compaction and
      // a trivial move might pick a subset of files in a sorted run. So
      // always check all files in a sorted run and mark the entire run as
      // being compacted if one or more files are being compacted.
      if (f->being_compacted) {
        being_compacted = f->being_compacted;
      }
    }
    if (total_compensated_size > 0) {
      ret.emplace_back(level, nullptr, total_size, total_compensated_size,
                       being_compacted);
    }
  }
  return ret;
}

// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
Compaction* UniversalCompactionBuilder::PickCompaction() {
  const int kLevel0 = 0;
  score_ = vstorage_->CompactionScore(kLevel0);
  sorted_runs_ = CalculateSortedRuns(*vstorage_);

  if (sorted_runs_.size() == 0 ||
      (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
       vstorage_->FilesMarkedForCompaction().empty() &&
       sorted_runs_.size() < (unsigned int)mutable_cf_options_
                                 .level0_file_num_compaction_trigger)) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
                     cf_name_.c_str());
    TEST_SYNC_POINT_CALLBACK(
        "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
    return nullptr;
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  ROCKS_LOG_BUFFER_MAX_SZ(
      log_buffer_, 3072,
      "[%s] Universal: sorted runs: %" ROCKSDB_PRIszt " files: %s\n",
      cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp));

  Compaction* c = nullptr;
  // Periodic compaction has higher priority than other types of compaction
  // because it's a hard requirement.
  if (!vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
    // Always need to do a full compaction for periodic compaction.
    c = PickPeriodicCompaction();
  }

  // Check for size amplification.
  if (c == nullptr &&
      sorted_runs_.size() >=
          static_cast<size_t>(
              mutable_cf_options_.level0_file_num_compaction_trigger)) {
    if ((c = PickCompactionToReduceSizeAmp()) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n",
                       cf_name_.c_str());
    } else {
      // Size amplification is within limits. Try reducing read
      // amplification while maintaining file size ratios.
      unsigned int ratio =
          mutable_cf_options_.compaction_options_universal.size_ratio;

      if ((c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX)) != nullptr) {
        ROCKS_LOG_BUFFER(log_buffer_,
                         "[%s] Universal: compacting for size ratio\n",
                         cf_name_.c_str());
      } else {
        // Size amplification and file size ratios are within configured
        // limits. If max read amplification exceeds the configured limit,
        // force a compaction without looking at file size ratios and try to
        // reduce the number of files to fewer than
        // level0_file_num_compaction_trigger. This is guaranteed by
        // NeedsCompaction().
        assert(sorted_runs_.size() >=
               static_cast<size_t>(
                   mutable_cf_options_.level0_file_num_compaction_trigger));
        // Get the total number of sorted runs that are not being compacted
        int num_sr_not_compacted = 0;
        for (size_t i = 0; i < sorted_runs_.size(); i++) {
          if (sorted_runs_[i].being_compacted == false) {
            num_sr_not_compacted++;
          }
        }

        // The number of sorted runs that are not being compacted is greater
        // than the maximum allowed number of sorted runs.
        if (num_sr_not_compacted >
            mutable_cf_options_.level0_file_num_compaction_trigger) {
          unsigned int num_files =
              num_sr_not_compacted -
              mutable_cf_options_.level0_file_num_compaction_trigger + 1;
          if ((c = PickCompactionToReduceSortedRuns(UINT_MAX, num_files)) !=
              nullptr) {
            ROCKS_LOG_BUFFER(log_buffer_,
                             "[%s] Universal: compacting for file num -- %u\n",
                             cf_name_.c_str(), num_files);
          }
        }
      }
    }
  }

  if (c == nullptr) {
    if ((c = PickDeleteTriggeredCompaction()) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: delete triggered compaction\n",
                       cf_name_.c_str());
    }
  }

  if (c == nullptr) {
    TEST_SYNC_POINT_CALLBACK(
        "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
    return nullptr;
  }

  if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
          true &&
      c->compaction_reason() != CompactionReason::kPeriodicCompaction) {
    c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
  }

  // validate that all the chosen files of L0 are non-overlapping in time
#ifndef NDEBUG
  bool is_first = true;

  size_t level_index = 0U;
  if (c->start_level() == 0) {
    for (auto f : *c->inputs(0)) {
      assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
      if (is_first) {
        is_first = false;
      }
    }
    level_index = 1U;
  }
  for (; level_index < c->num_input_levels(); level_index++) {
    if (c->num_input_files(level_index) != 0) {
      SequenceNumber smallest_seqno = 0U;
      SequenceNumber largest_seqno = 0U;
      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
                              &largest_seqno);
      if (is_first) {
        is_first = false;
      }
    }
  }
#endif
  // update statistics
  RecordInHistogram(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
                    c->inputs(0)->size());

  picker_->RegisterCompaction(c);
  vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);

  TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
                           c);
  return c;
}

uint32_t UniversalCompactionBuilder::GetPathId(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, uint64_t file_size) {
  // Two conditions need to be satisfied:
  // (1) the target path needs to be able to hold the file's size
  // (2) the total size left in this and previous paths needs to be no
  //     smaller than the expected future file size before this new file is
  //     compacted, which is estimated based on size_ratio.
  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
  // we will make sure the target file, probably with size of 16, will be
  // placed in a path so that eventually when new files are generated and
  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
  // before the path we chose.
  //
  // TODO(sdong): now the case of multiple column families is not
  // considered in this algorithm. So the target size can be violated in
  // that case. We need to improve it.
  uint64_t accumulated_size = 0;
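  // future_size estimates how much newer data is expected to accumulate before
  // this output file is compacted again; the output is placed in the first
  // path that can hold it while this and all earlier paths still leave at
  // least that much room for the newer data.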
  uint64_t future_size =
      file_size *
      (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
  uint32_t p = 0;
  assert(!ioptions.cf_paths.empty());
  for (; p < ioptions.cf_paths.size() - 1; p++) {
    uint64_t target_size = ioptions.cf_paths[p].target_size;
    if (target_size > file_size &&
        accumulated_size + (target_size - file_size) > future_size) {
      return p;
    }
    accumulated_size += target_size;
  }
  return p;
}

//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
    unsigned int ratio, unsigned int max_number_of_files_to_compact) {
  unsigned int min_merge_width =
      mutable_cf_options_.compaction_options_universal.min_merge_width;
  unsigned int max_merge_width =
      mutable_cf_options_.compaction_options_universal.max_merge_width;

  const SortedRun* sr = nullptr;
  bool done = false;
  size_t start_index = 0;
  unsigned int candidate_count = 0;

  unsigned int max_files_to_compact =
      std::min(max_merge_width, max_number_of_files_to_compact);
  min_merge_width = std::max(min_merge_width, 2U);

  // Caller checks the size before executing this function. This invariant is
  // important because otherwise we may have a possible integer underflow when
  // dealing with unsigned types.
  assert(sorted_runs_.size() > 0);

  // Considers a candidate file only if it is smaller than the
  // total size accumulated so far.
  for (size_t loop = 0; loop < sorted_runs_.size(); loop++) {
    candidate_count = 0;

    // Skip files that are already being compacted
    for (sr = nullptr; loop < sorted_runs_.size(); loop++) {
      sr = &sorted_runs_[loop];

      if (!sr->being_compacted) {
        candidate_count = 1;
        break;
      }
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf));
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: %s"
                       "[%d] being compacted, skipping",
                       cf_name_.c_str(), file_num_buf, loop);

      sr = nullptr;
    }

    // This file is not being compacted. Consider it as the
    // first candidate to be compacted.
    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
    if (sr != nullptr) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: Possible candidate %s[%d].",
                       cf_name_.c_str(), file_num_buf, loop);
    }

    // Check if the succeeding files need compaction.
    for (size_t i = loop + 1;
         candidate_count < max_files_to_compact && i < sorted_runs_.size();
         i++) {
      const SortedRun* succeeding_sr = &sorted_runs_[i];
      if (succeeding_sr->being_compacted) {
        break;
      }
      // Pick files if the total/last candidate file size (increased by the
      // specified ratio) is still larger than the next candidate file.
      // candidate_size is the total size of files picked so far with the
      // default kCompactionStopStyleTotalSize; with
      // kCompactionStopStyleSimilarSize, it's simply the size of the last
      // picked file.
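      // For example, with a ratio of 1 (the default size_ratio) and a
      // candidate_size of 8, the next sorted run is accepted as long as its
      // size is at most 8 * 1.01 = 8.08, i.e. at most 1% larger than what has
      // been picked so far.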
      double sz = candidate_size * (100.0 + ratio) / 100.0;
      if (sz < static_cast<double>(succeeding_sr->size)) {
        break;
      }
      if (mutable_cf_options_.compaction_options_universal.stop_style ==
          kCompactionStopStyleSimilarSize) {
        // Similar-size stopping rule: also check the last picked file isn't
        // far larger than the next candidate file.
        sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
        if (sz < static_cast<double>(candidate_size)) {
          // If the small file we've encountered begins a run of similar-size
          // files, we'll pick them up on a future iteration of the outer
          // loop. If it's some lonely straggler, it'll eventually get picked
          // by the last-resort read amp strategy which disregards size ratios.
          break;
        }
        candidate_size = succeeding_sr->compensated_file_size;
      } else {  // default kCompactionStopStyleTotalSize
        candidate_size += succeeding_sr->compensated_file_size;
      }
      candidate_count++;
    }

    // Found a series of consecutive files that need compaction.
    if (candidate_count >= (unsigned int)min_merge_width) {
      start_index = loop;
      done = true;
      break;
    } else {
      for (size_t i = loop;
           i < loop + candidate_count && i < sorted_runs_.size(); i++) {
        const SortedRun* skipping_sr = &sorted_runs_[i];
        char file_num_buf[256];
        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
        ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s",
                         cf_name_.c_str(), file_num_buf);
      }
    }
  }
  if (!done || candidate_count <= 1) {
    return nullptr;
  }
  size_t first_index_after = start_index + candidate_count;
  // Compression is enabled only if the output of this compaction still falls
  // within the oldest compression_size_percent of the data: if the sorted runs
  // older than this compaction already account for that share of the total
  // size, the output is left uncompressed.
  bool enable_compression = true;
  int ratio_to_compress =
      mutable_cf_options_.compaction_options_universal.compression_size_percent;
  if (ratio_to_compress >= 0) {
    uint64_t total_size = 0;
    for (auto& sorted_run : sorted_runs_) {
      total_size += sorted_run.compensated_file_size;
    }

    uint64_t older_file_size = 0;
    for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) {
      older_file_size += sorted_runs_[i].size;
      if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
        enable_compression = false;
        break;
      }
    }
  }

  uint64_t estimated_total_size = 0;
  for (unsigned int i = 0; i < first_index_after; i++) {
    estimated_total_size += sorted_runs_[i].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  int start_level = sorted_runs_[start_index].level;
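  // Determine the output level: if the picked window extends to the oldest
  // sorted run, output to the bottommost level; if the next remaining sorted
  // run is an L0 file, output to L0; otherwise output to the level just above
  // the next remaining sorted run.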
  int output_level;
  if (first_index_after == sorted_runs_.size()) {
    output_level = vstorage_->num_levels() - 1;
  } else if (sorted_runs_[first_index_after].level == 0) {
    output_level = 0;
  } else {
    output_level = sorted_runs_[first_index_after].level - 1;
  }

  // The last level is reserved for files ingested behind.
  if (ioptions_.allow_ingest_behind &&
      (output_level == vstorage_->num_levels() - 1)) {
    assert(output_level > 1);
    output_level--;
  }

  std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t i = start_index; i < first_index_after; i++) {
    auto& picking_sr = sorted_runs_[i];
    if (picking_sr.level == 0) {
      FileMetaData* picking_file = picking_sr.file;
      inputs[0].files.push_back(picking_file);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s",
                     cf_name_.c_str(), file_num_buf);
  }

  CompactionReason compaction_reason;
  if (max_number_of_files_to_compact == UINT_MAX) {
    compaction_reason = CompactionReason::kUniversalSizeRatio;
  } else {
    compaction_reason = CompactionReason::kUniversalSortedRunNum;
  }
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
      std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
                         1, enable_compression),
      GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
                            enable_compression),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score_, false /* deletion_compaction */, compaction_reason);
}

// Look at overall size amplification. If size amplification
// exceeds the configured value, then do a compaction
// of the candidate files all the way up to the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
  // percentage flexibility while reducing size amplification
  uint64_t ratio = mutable_cf_options_.compaction_options_universal
                       .max_size_amplification_percent;

  unsigned int candidate_count = 0;
  uint64_t candidate_size = 0;
  size_t start_index = 0;
  const SortedRun* sr = nullptr;

  assert(!sorted_runs_.empty());
  if (sorted_runs_.back().being_compacted) {
    return nullptr;
  }

  // Skip files that are already being compacted
  for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) {
    sr = &sorted_runs_[loop];
    if (!sr->being_compacted) {
      start_index = loop;  // Consider this as the first candidate.
      break;
    }
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(log_buffer_,
                     "[%s] Universal: skipping %s[%d] compacted %s",
                     cf_name_.c_str(), file_num_buf, loop,
                     " cannot be a candidate to reduce size amp.\n");
    sr = nullptr;
  }

  if (sr == nullptr) {
    return nullptr;  // no candidate files
  }
  {
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
        cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
  }

  // keep adding up all the remaining files
  for (size_t loop = start_index; loop + 1 < sorted_runs_.size(); loop++) {
    sr = &sorted_runs_[loop];
    if (sr->being_compacted) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(
          log_buffer_, "[%s] Universal: Possible candidate %s[%d] %s",
          cf_name_.c_str(), file_num_buf, start_index,
          " is already being compacted. No size amp reduction possible.\n");
      return nullptr;
    }
    candidate_size += sr->compensated_file_size;
    candidate_count++;
  }
  if (candidate_count == 0) {
    return nullptr;
  }

  // size of earliest file
  uint64_t earliest_file_size = sorted_runs_.back().size;

  // size amplification = percentage of additional size
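  // For example, with max_size_amplification_percent = 200 a full compaction
  // is triggered only once the newer sorted runs together are at least twice
  // the size of the oldest sorted run.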
  if (candidate_size * 100 < ratio * earliest_file_size) {
    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name_.c_str(), candidate_size, earliest_file_size);
    return nullptr;
  } else {
    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name_.c_str(), candidate_size, earliest_file_size);
  }
  return PickCompactionToOldest(start_index,
                                CompactionReason::kUniversalSizeAmplification);
}

// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
  CompactionInputFiles start_level_inputs;
  int output_level;
  std::vector<CompactionInputFiles> inputs;

  if (vstorage_->num_levels() == 1) {
    // This is single-level universal. Since we're basically trying to reclaim
    // space by processing files marked for compaction due to high tombstone
    // density, let's do the same thing as compaction to reduce size amp which
    // has the same goals.
    int start_index = -1;

    start_level_inputs.level = 0;
    start_level_inputs.files.clear();
    output_level = 0;
    // Find the first file marked for compaction. Ignore the last file.
    for (size_t loop = 0; loop + 1 < sorted_runs_.size(); loop++) {
      SortedRun* sr = &sorted_runs_[loop];
      if (sr->being_compacted) {
        continue;
      }
      FileMetaData* f = vstorage_->LevelFiles(0)[loop];
      if (f->marked_for_compaction) {
        start_level_inputs.files.push_back(f);
        start_index =
            static_cast<int>(loop);  // Consider this as the first candidate.
        break;
      }
    }
    if (start_index < 0) {
      // Either no file is marked, or the marked files are already being
      // compacted.
      return nullptr;
    }

    for (size_t loop = start_index + 1; loop < sorted_runs_.size(); loop++) {
      SortedRun* sr = &sorted_runs_[loop];
      if (sr->being_compacted) {
        break;
      }

      FileMetaData* f = vstorage_->LevelFiles(0)[loop];
      start_level_inputs.files.push_back(f);
    }
    if (start_level_inputs.size() <= 1) {
      // If only the last file in L0 is marked for compaction, ignore it
      return nullptr;
    }
    inputs.push_back(start_level_inputs);
  } else {
    int start_level;

    // For multi-level universal, the strategy is to make this look more like
    // leveled. We pick one of the files marked for compaction and compact with
    // overlapping files in the adjacent level.
    picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level,
                                          &output_level, &start_level_inputs);
    if (start_level_inputs.empty()) {
      return nullptr;
    }

    // Pick the first non-empty level after the start_level
    for (output_level = start_level + 1; output_level < vstorage_->num_levels();
         output_level++) {
      if (vstorage_->NumLevelFiles(output_level) != 0) {
        break;
      }
    }

    // If all higher levels are empty, pick the highest level as output level
    if (output_level == vstorage_->num_levels()) {
      if (start_level == 0) {
        output_level = vstorage_->num_levels() - 1;
      } else {
        // If start level is non-zero and all higher levels are empty, this
        // compaction will translate into a trivial move. Since the idea is
        // to reclaim space and trivial move doesn't help with that, we
        // skip compaction in this case and return nullptr
        return nullptr;
      }
    }
    if (ioptions_.allow_ingest_behind &&
        output_level == vstorage_->num_levels() - 1) {
      assert(output_level > 1);
      output_level--;
    }

    if (output_level != 0) {
      if (start_level == 0) {
        if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs,
                                            output_level, nullptr)) {
          return nullptr;
        }
      }

      CompactionInputFiles output_level_inputs;
      int parent_index = -1;

      output_level_inputs.level = output_level;
      if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
                                     &start_level_inputs, &output_level_inputs,
                                     &parent_index, -1)) {
        return nullptr;
      }
      inputs.push_back(start_level_inputs);
      if (!output_level_inputs.empty()) {
        inputs.push_back(output_level_inputs);
      }
      if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) {
        return nullptr;
      }
    } else {
      inputs.push_back(start_level_inputs);
    }
  }

  uint64_t estimated_total_size = 0;
  // Use size of the output level as estimated file size
  for (FileMetaData* f : vstorage_->LevelFiles(output_level)) {
    estimated_total_size += f->fd.GetFileSize();
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
      std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                         output_level, 1),
      GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score_, false /* deletion_compaction */,
      CompactionReason::kFilesMarkedForCompaction);
}

Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
    size_t start_index, CompactionReason compaction_reason) {
  assert(start_index < sorted_runs_.size());

  // Estimate total file size
  uint64_t estimated_total_size = 0;
  for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
    estimated_total_size += sorted_runs_[loop].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  int start_level = sorted_runs_[start_index].level;

  std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
    auto& picking_sr = sorted_runs_[loop];
    if (picking_sr.level == 0) {
      FileMetaData* f = picking_sr.file;
      inputs[0].files.push_back(f);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    std::string comp_reason_print_string;
    if (compaction_reason == CompactionReason::kPeriodicCompaction) {
      comp_reason_print_string = "periodic compaction";
    } else if (compaction_reason ==
               CompactionReason::kUniversalSizeAmplification) {
      comp_reason_print_string = "size amp";
    } else {
      assert(false);
      comp_reason_print_string = "unknown: ";
      comp_reason_print_string.append(
          std::to_string(static_cast<int>(compaction_reason)));
    }

    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s",
                     cf_name_.c_str(), comp_reason_print_string.c_str(),
                     file_num_buf);
  }

  // Output files at the bottommost level, unless it's reserved.
  int output_level = vstorage_->num_levels() - 1;
  // The last level is reserved for files ingested behind.
  if (ioptions_.allow_ingest_behind) {
    assert(output_level > 1);
    output_level--;
  }

  // We never check size for
  // compaction_options_universal.compression_size_percent,
  // because we always compact all the files, so always compress.
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
      std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                         output_level, 1, true /* enable_compression */),
      GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
                            true /* enable_compression */),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score_, false /* deletion_compaction */, compaction_reason);
}

Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
  ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction",
                   cf_name_.c_str());

  // In universal compaction, sorted runs containing older data are almost
  // always generated earlier too. To simplify the problem, we just try to
  // trigger a full compaction. We start from the oldest sorted run and
  // include all sorted runs, until we hit a sorted run already being
  // compacted. Since usually the largest (which is usually the oldest) sorted
  // run is included anyway, doing a full compaction won't increase write
  // amplification much.

  // Get some information from marked files to check whether a file is
  // included in the compaction.

  size_t start_index = sorted_runs_.size();
  while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted) {
    start_index--;
  }
  if (start_index == sorted_runs_.size()) {
    return nullptr;
  }

  // There is a rare corner case where, because some files are being compacted,
  // the files we can pick include none that actually need periodic compaction.
  // To keep the logic simple we still execute the compaction, unless it would
  // simply recompact the last sorted run (either the last level or the last
  // L0 file); in that case, verify below that the run really contains a file
  // marked for periodic compaction.
  if (start_index == sorted_runs_.size() - 1) {
    bool included_file_marked = false;
    int start_level = sorted_runs_[start_index].level;
    FileMetaData* start_file = sorted_runs_[start_index].file;
    for (const std::pair<int, FileMetaData*>& level_file_pair :
         vstorage_->FilesMarkedForPeriodicCompaction()) {
      if (start_level != 0) {
        // Last sorted run is a level
        if (start_level == level_file_pair.first) {
          included_file_marked = true;
          break;
        }
      } else {
        // Last sorted run is an L0 file.
        if (start_file == level_file_pair.second) {
          included_file_marked = true;
          break;
        }
      }
    }
    if (!included_file_marked) {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: Cannot form a compaction covering file "
                       "marked for periodic compaction",
                       cf_name_.c_str());
      return nullptr;
    }
  }

  Compaction* c = PickCompactionToOldest(start_index,
                                         CompactionReason::kPeriodicCompaction);

  TEST_SYNC_POINT_CALLBACK(
      "UniversalCompactionPicker::PickPeriodicCompaction:Return", c);

  return c;
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // !ROCKSDB_LITE