// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction_picker_universal.h"
#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "monitoring/statistics.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"

namespace rocksdb {
namespace {
// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of a min heap
// that contains the file metadata, the level of the file
// and the index of the file in that level.

struct InputFileInfo {
  InputFileInfo() : f(nullptr), level(0), index(0) {}

  FileMetaData* f;
  size_t level;
  size_t index;
};

// Used in universal compaction when trivial move is enabled.
// This comparator is used for the construction of a min heap
// based on the smallest key of the file.
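// Note: std::priority_queue is a max heap by default, so returning true when
// i1's smallest user key compares greater than i2's makes the queue pop the
// file with the smallest "smallest key" first.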
struct SmallestKeyHeapComparator {
  explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }

  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
    return (ucmp_->Compare(i1.f->smallest.user_key(),
                           i2.f->smallest.user_key()) > 0);
  }

 private:
  const Comparator* ucmp_;
};

typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
                            SmallestKeyHeapComparator>
    SmallestKeyHeap;

// This function creates the heap that is used to find if the files are
// overlapping during universal compaction when the allow_trivial_move
// is set.
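// Level 0 contributes every one of its input files up front (they may overlap
// each other), while each non-zero level contributes only its first file; the
// remaining files of a non-zero level are pushed lazily by
// IsInputFilesNonOverlapping() as earlier files are popped.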
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
  SmallestKeyHeap smallest_key_priority_q =
      SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));

  InputFileInfo input_file;

  for (size_t l = 0; l < c->num_input_levels(); l++) {
    if (c->num_input_files(l) != 0) {
      if (l == 0 && c->start_level() == 0) {
        for (size_t i = 0; i < c->num_input_files(0); i++) {
          input_file.f = c->input(0, i);
          input_file.level = 0;
          input_file.index = i;
          smallest_key_priority_q.push(std::move(input_file));
        }
      } else {
        input_file.f = c->input(l, 0);
        input_file.level = l;
        input_file.index = 0;
        smallest_key_priority_q.push(std::move(input_file));
      }
    }
  }
  return smallest_key_priority_q;
}

#ifndef NDEBUG
// smallest_seqno and largest_seqno are set iff. `files` is not empty.
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
                             SequenceNumber* smallest_seqno,
                             SequenceNumber* largest_seqno) {
  bool is_first = true;
  for (FileMetaData* f : files) {
    assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
    if (is_first) {
      is_first = false;
      *smallest_seqno = f->fd.smallest_seqno;
      *largest_seqno = f->fd.largest_seqno;
    } else {
      if (f->fd.smallest_seqno < *smallest_seqno) {
        *smallest_seqno = f->fd.smallest_seqno;
      }
      if (f->fd.largest_seqno > *largest_seqno) {
        *largest_seqno = f->fd.largest_seqno;
      }
    }
  }
}
#endif
}  // namespace

// Algorithm that checks to see if there are any overlapping
// files in the input
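// Files are popped in ascending order of their smallest user key; the input is
// overlap-free only if every popped file's smallest key is strictly greater
// than the largest key of the file popped before it.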
bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) {
  auto comparator = icmp_->user_comparator();
  int first_iter = 1;

  InputFileInfo prev, curr, next;

  SmallestKeyHeap smallest_key_priority_q =
      create_level_heap(c, icmp_->user_comparator());

  while (!smallest_key_priority_q.empty()) {
    curr = smallest_key_priority_q.top();
    smallest_key_priority_q.pop();

    if (first_iter) {
      prev = curr;
      first_iter = 0;
    } else {
      if (comparator->Compare(prev.f->largest.user_key(),
                              curr.f->smallest.user_key()) >= 0) {
        // found overlapping files, return false
        return false;
      }
      assert(comparator->Compare(curr.f->largest.user_key(),
                                 prev.f->largest.user_key()) > 0);
      prev = curr;
    }

    next.f = nullptr;

    if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
      next.f = c->input(curr.level, curr.index + 1);
      next.level = curr.level;
      next.index = curr.index + 1;
    }

    if (next.f) {
      smallest_key_priority_q.push(std::move(next));
    }
  }
  return true;
}

bool UniversalCompactionPicker::NeedsCompaction(
    const VersionStorageInfo* vstorage) const {
  const int kLevel0 = 0;
  if (vstorage->CompactionScore(kLevel0) >= 1) {
    return true;
  }
  if (!vstorage->FilesMarkedForCompaction().empty()) {
    return true;
  }
  return false;
}

void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
                                                size_t out_buf_size,
                                                bool print_path) const {
  if (level == 0) {
    assert(file != nullptr);
    if (file->fd.GetPathId() == 0 || !print_path) {
      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
    } else {
      snprintf(out_buf, out_buf_size, "file %" PRIu64
                                      "(path "
                                      "%" PRIu32 ")",
               file->fd.GetNumber(), file->fd.GetPathId());
    }
  } else {
    snprintf(out_buf, out_buf_size, "level %d", level);
  }
}

void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
  if (level == 0) {
    assert(file != nullptr);
    snprintf(out_buf, out_buf_size,
             "file %" PRIu64 "[%" ROCKSDB_PRIszt
             "] "
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
             file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
             file->compensated_file_size);
  } else {
    snprintf(out_buf, out_buf_size,
             "level %d[%" ROCKSDB_PRIszt
             "] "
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
             level, sorted_run_count, size, compensated_file_size);
  }
}

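// Each L0 file forms its own sorted run, while every non-empty level >= 1 is
// treated as a single sorted run whose size is the sum of its files.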
std::vector<UniversalCompactionPicker::SortedRun>
UniversalCompactionPicker::CalculateSortedRuns(
    const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
    const MutableCFOptions& mutable_cf_options) {
  std::vector<UniversalCompactionPicker::SortedRun> ret;
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
                     f->being_compacted);
  }
  for (int level = 1; level < vstorage.num_levels(); level++) {
    uint64_t total_compensated_size = 0U;
    uint64_t total_size = 0U;
    bool being_compacted = false;
    bool is_first = true;
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
      total_compensated_size += f->compensated_file_size;
      total_size += f->fd.GetFileSize();
      if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
          true) {
        if (f->being_compacted) {
          being_compacted = f->being_compacted;
        }
      } else {
        // Compaction always includes all files for a non-zero level, so for a
        // non-zero level, all the files should share the same being_compacted
        // value.
        // This assumption is only valid when
        // mutable_cf_options.compaction_options_universal.allow_trivial_move is
        // false
        assert(is_first || f->being_compacted == being_compacted);
      }
      if (is_first) {
        being_compacted = f->being_compacted;
        is_first = false;
      }
    }
    if (total_compensated_size > 0) {
      ret.emplace_back(level, nullptr, total_size, total_compensated_size,
                       being_compacted);
    }
  }
  return ret;
}

// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
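// Strategies are tried in a fixed order: first reduce space amplification,
// then merge runs whose sizes are within size_ratio of each other, then force
// a merge purely to cut the number of sorted runs, and finally fall back to a
// delete-triggered compaction of files marked for compaction.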
Compaction* UniversalCompactionPicker::PickCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
  const int kLevel0 = 0;
  double score = vstorage->CompactionScore(kLevel0);
  std::vector<SortedRun> sorted_runs =
      CalculateSortedRuns(*vstorage, ioptions_, mutable_cf_options);

  if (sorted_runs.size() == 0 ||
      (vstorage->FilesMarkedForCompaction().empty() &&
       sorted_runs.size() < (unsigned int)mutable_cf_options
                                .level0_file_num_compaction_trigger)) {
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
                     cf_name.c_str());
    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
                             nullptr);
    return nullptr;
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  ROCKS_LOG_BUFFER_MAX_SZ(
      log_buffer, 3072,
      "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
      cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp));

  // Check for size amplification first.
  Compaction* c = nullptr;
  if (sorted_runs.size() >=
      static_cast<size_t>(
          mutable_cf_options.level0_file_num_compaction_trigger)) {
    if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options,
                                           vstorage, score, sorted_runs,
                                           log_buffer)) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
                       cf_name.c_str());
    } else {
      // Size amplification is within limits. Try reducing read
      // amplification while maintaining file size ratios.
      unsigned int ratio =
          mutable_cf_options.compaction_options_universal.size_ratio;

      if ((c = PickCompactionToReduceSortedRuns(
               cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
               sorted_runs, log_buffer)) != nullptr) {
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Universal: compacting for size ratio\n",
                         cf_name.c_str());
      } else {
        // Size amplification and file size ratios are within configured limits.
        // If max read amplification is exceeding configured limits, then force
        // compaction without looking at filesize ratios and try to reduce
        // the number of files to fewer than level0_file_num_compaction_trigger.
        // This is guaranteed by NeedsCompaction()
        assert(sorted_runs.size() >=
               static_cast<size_t>(
                   mutable_cf_options.level0_file_num_compaction_trigger));
        // Get the total number of sorted runs that are not being compacted
        int num_sr_not_compacted = 0;
        for (size_t i = 0; i < sorted_runs.size(); i++) {
          if (sorted_runs[i].being_compacted == false) {
            num_sr_not_compacted++;
          }
        }

        // The number of sorted runs that are not being compacted is greater
        // than the maximum allowed number of sorted runs
        if (num_sr_not_compacted >
            mutable_cf_options.level0_file_num_compaction_trigger) {
          unsigned int num_files =
              num_sr_not_compacted -
              mutable_cf_options.level0_file_num_compaction_trigger + 1;
          if ((c = PickCompactionToReduceSortedRuns(
                   cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
                   num_files, sorted_runs, log_buffer)) != nullptr) {
            ROCKS_LOG_BUFFER(log_buffer,
                             "[%s] Universal: compacting for file num -- %u\n",
                             cf_name.c_str(), num_files);
          }
        }
      }
    }
  }

  if (c == nullptr) {
    if ((c = PickDeleteTriggeredCompaction(cf_name, mutable_cf_options,
                                           vstorage, score, sorted_runs,
                                           log_buffer)) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Universal: delete triggered compaction\n",
                       cf_name.c_str());
    }
  }

  if (c == nullptr) {
    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
                             nullptr);
    return nullptr;
  }

  if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
      true) {
    c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
  }

// validate that all the chosen files of L0 are non-overlapping in time
#ifndef NDEBUG
  SequenceNumber prev_smallest_seqno = 0U;
  bool is_first = true;

  size_t level_index = 0U;
  if (c->start_level() == 0) {
    for (auto f : *c->inputs(0)) {
      assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
      if (is_first) {
        is_first = false;
      }
      prev_smallest_seqno = f->fd.smallest_seqno;
    }
    level_index = 1U;
  }
  for (; level_index < c->num_input_levels(); level_index++) {
    if (c->num_input_files(level_index) != 0) {
      SequenceNumber smallest_seqno = 0U;
      SequenceNumber largest_seqno = 0U;
      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
                              &largest_seqno);
      if (is_first) {
        is_first = false;
      } else if (prev_smallest_seqno > 0) {
        // A level is considered as the bottommost level if there are
        // no files in higher levels or if files in higher levels do
        // not overlap with the files being compacted. Sequence numbers
        // of files in bottommost level can be set to 0 to help
        // compression. As a result, the following assert may not hold
        // if the prev_smallest_seqno is 0.
        assert(prev_smallest_seqno > largest_seqno);
      }
      prev_smallest_seqno = smallest_seqno;
    }
  }
#endif
  // update statistics
  MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
              c->inputs(0)->size());

  RegisterCompaction(c);
  vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);

  TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
                           c);
  return c;
}

uint32_t UniversalCompactionPicker::GetPathId(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, uint64_t file_size) {
  // Two conditions need to be satisfied:
  // (1) the target path needs to be able to hold the file's size
  // (2) Total size left in this and previous paths needs to be no
  //     smaller than the expected future file size before this new file is
  //     compacted, which is estimated based on size_ratio.
  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
  // we will make sure the target file, probably with size of 16, will be
  // placed in a path so that eventually when new files are generated and
  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
  // before the path we chose.
  //
  // TODO(sdong): now the case of multiple column families is not
  // considered in this algorithm. So the target size can be violated in
  // that case. We need to improve it.
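  //
  // Illustrative example (hypothetical numbers): with size_ratio = 1 and a
  // 10 GB output file, future_size below is ~9.9 GB. A path is chosen only if
  // its target_size can hold the 10 GB file and the capacity left over in it,
  // plus the full capacity of all earlier paths, still exceeds ~9.9 GB;
  // otherwise the search falls through to the next (or last) path.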
  uint64_t accumulated_size = 0;
  uint64_t future_size =
      file_size *
      (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
  uint32_t p = 0;
  assert(!ioptions.cf_paths.empty());
  for (; p < ioptions.cf_paths.size() - 1; p++) {
    uint64_t target_size = ioptions.cf_paths[p].target_size;
    if (target_size > file_size &&
        accumulated_size + (target_size - file_size) > future_size) {
      return p;
    }
    accumulated_size += target_size;
  }
  return p;
}

//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, double score, unsigned int ratio,
    unsigned int max_number_of_files_to_compact,
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
  unsigned int min_merge_width =
      mutable_cf_options.compaction_options_universal.min_merge_width;
  unsigned int max_merge_width =
      mutable_cf_options.compaction_options_universal.max_merge_width;

  const SortedRun* sr = nullptr;
  bool done = false;
  size_t start_index = 0;
  unsigned int candidate_count = 0;

  unsigned int max_files_to_compact =
      std::min(max_merge_width, max_number_of_files_to_compact);
  min_merge_width = std::max(min_merge_width, 2U);

  // Caller checks the size before executing this function. This invariant is
  // important because otherwise we may have a possible integer underflow when
  // dealing with unsigned types.
  assert(sorted_runs.size() > 0);

  // Considers a candidate file only if it is smaller than the
  // total size accumulated so far.
  for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
    candidate_count = 0;

    // Skip files that are already being compacted
    for (sr = nullptr; loop < sorted_runs.size(); loop++) {
      sr = &sorted_runs[loop];

      if (!sr->being_compacted) {
        candidate_count = 1;
        break;
      }
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf));
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Universal: %s"
                       "[%d] being compacted, skipping",
                       cf_name.c_str(), file_num_buf, loop);

      sr = nullptr;
    }

    // This file is not being compacted. Consider it as the
    // first candidate to be compacted.
    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
    if (sr != nullptr) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Possible candidate %s[%d].",
                       cf_name.c_str(), file_num_buf, loop);
    }

    // Check if the succeeding files need compaction.
    for (size_t i = loop + 1;
         candidate_count < max_files_to_compact && i < sorted_runs.size();
         i++) {
      const SortedRun* succeeding_sr = &sorted_runs[i];
      if (succeeding_sr->being_compacted) {
        break;
      }
      // Pick files if the total/last candidate file size (increased by the
      // specified ratio) is still larger than the next candidate file.
      // candidate_size is the total size of files picked so far with the
      // default kCompactionStopStyleTotalSize; with
      // kCompactionStopStyleSimilarSize, it's simply the size of the last
      // picked file.
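      //
      // Worked example (total-size stop style, hypothetical sizes): with
      // ratio = 1 and runs of size 1, 1, 2, 4, the accumulated candidate size
      // grows 1 -> 2 -> 4, and at each step size * 1.01 is still >= the next
      // run, so all four runs end up in the same compaction.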
      double sz = candidate_size * (100.0 + ratio) / 100.0;
      if (sz < static_cast<double>(succeeding_sr->size)) {
        break;
      }
      if (mutable_cf_options.compaction_options_universal.stop_style ==
          kCompactionStopStyleSimilarSize) {
        // Similar-size stopping rule: also check the last picked file isn't
        // far larger than the next candidate file.
        sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
        if (sz < static_cast<double>(candidate_size)) {
          // If the small file we've encountered begins a run of similar-size
          // files, we'll pick them up on a future iteration of the outer
          // loop. If it's some lonely straggler, it'll eventually get picked
          // by the last-resort read amp strategy which disregards size ratios.
          break;
        }
        candidate_size = succeeding_sr->compensated_file_size;
      } else {  // default kCompactionStopStyleTotalSize
        candidate_size += succeeding_sr->compensated_file_size;
      }
      candidate_count++;
    }

    // Found a series of consecutive files that need compaction.
    if (candidate_count >= (unsigned int)min_merge_width) {
      start_index = loop;
      done = true;
      break;
    } else {
      for (size_t i = loop;
           i < loop + candidate_count && i < sorted_runs.size(); i++) {
        const SortedRun* skipping_sr = &sorted_runs[i];
        char file_num_buf[256];
        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
        ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s",
                         cf_name.c_str(), file_num_buf);
      }
    }
  }
  if (!done || candidate_count <= 1) {
    return nullptr;
  }
  size_t first_index_after = start_index + candidate_count;
  // Compression is enabled if files compacted earlier already reached
  // size ratio of compression.
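  // Concretely: the runs older than the ones picked here are summed below; if
  // they already account for at least compression_size_percent of the total
  // data, the output of this compaction lies in the newer portion of the DB
  // and compression is turned off for it.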
  bool enable_compression = true;
  int ratio_to_compress =
      mutable_cf_options.compaction_options_universal.compression_size_percent;
  if (ratio_to_compress >= 0) {
    uint64_t total_size = 0;
    for (auto& sorted_run : sorted_runs) {
      total_size += sorted_run.compensated_file_size;
    }

    uint64_t older_file_size = 0;
    for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
      older_file_size += sorted_runs[i].size;
      if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
        enable_compression = false;
        break;
      }
    }
  }

  uint64_t estimated_total_size = 0;
  for (unsigned int i = 0; i < first_index_after; i++) {
    estimated_total_size += sorted_runs[i].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options, estimated_total_size);
  int start_level = sorted_runs[start_index].level;
  int output_level;
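  // The output level is the level just above the first sorted run that was
  // not picked: the last level if every remaining run was picked, level 0 if
  // the next run is still an L0 file, and that run's level minus one
  // otherwise.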
  if (first_index_after == sorted_runs.size()) {
    output_level = vstorage->num_levels() - 1;
  } else if (sorted_runs[first_index_after].level == 0) {
    output_level = 0;
  } else {
    output_level = sorted_runs[first_index_after].level - 1;
  }

  // last level is reserved for the files ingested behind
  if (ioptions_.allow_ingest_behind &&
      (output_level == vstorage->num_levels() - 1)) {
    assert(output_level > 1);
    output_level--;
  }

  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t i = start_index; i < first_index_after; i++) {
    auto& picking_sr = sorted_runs[i];
    if (picking_sr.level == 0) {
      FileMetaData* picking_file = picking_sr.file;
      inputs[0].files.push_back(picking_file);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
                     file_num_buf);
  }

  CompactionReason compaction_reason;
  if (max_number_of_files_to_compact == UINT_MAX) {
    compaction_reason = CompactionReason::kUniversalSizeRatio;
  } else {
    compaction_reason = CompactionReason::kUniversalSortedRunNum;
  }
  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options, output_level,
                          kCompactionStyleUniversal),
      LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
                         1, enable_compression),
      GetCompressionOptions(ioptions_, vstorage, start_level,
                            enable_compression),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score, false /* deletion_compaction */, compaction_reason);
}

// Look at overall size amplification. If size amplification
// exceeds the configured value, then do a compaction
// of the candidate files all the way up to the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, double score,
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
  // percentage flexibility while reducing size amplification
  uint64_t ratio = mutable_cf_options.compaction_options_universal
                       .max_size_amplification_percent;

  unsigned int candidate_count = 0;
  uint64_t candidate_size = 0;
  size_t start_index = 0;
  const SortedRun* sr = nullptr;

  if (sorted_runs.back().being_compacted) {
    return nullptr;
  }

  // Skip files that are already being compacted
  for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
    sr = &sorted_runs[loop];
    if (!sr->being_compacted) {
      start_index = loop;  // Consider this as the first candidate.
      break;
    }
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s",
                     cf_name.c_str(), file_num_buf, loop,
                     " cannot be a candidate to reduce size amp.\n");
    sr = nullptr;
  }

  if (sr == nullptr) {
    return nullptr;  // no candidate files
  }
  {
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
        cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
  }

  // keep adding up all the remaining files
  for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) {
    sr = &sorted_runs[loop];
    if (sr->being_compacted) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(
          log_buffer, "[%s] Universal: Possible candidate %s[%d] %s",
          cf_name.c_str(), file_num_buf, start_index,
          " is already being compacted. No size amp reduction possible.\n");
      return nullptr;
    }
    candidate_size += sr->compensated_file_size;
    candidate_count++;
  }
  if (candidate_count == 0) {
    return nullptr;
  }

  // size of earliest file
  uint64_t earliest_file_size = sorted_runs.back().size;

  // size amplification = percentage of additional size
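  // Example (hypothetical sizes): with max_size_amplification_percent = 200
  // and an earliest (base) run of 100 MB, the check below triggers a full
  // compaction only once the newer runs together hold at least 200 MB.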
  if (candidate_size * 100 < ratio * earliest_file_size) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name.c_str(), candidate_size, earliest_file_size);
    return nullptr;
  } else {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name.c_str(), candidate_size, earliest_file_size);
  }
  assert(start_index < sorted_runs.size() - 1);

  // Estimate total file size
  uint64_t estimated_total_size = 0;
  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
    estimated_total_size += sorted_runs[loop].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options, estimated_total_size);
  int start_level = sorted_runs[start_index].level;

  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  // We always compact all the files, so always compress.
  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
    auto& picking_sr = sorted_runs[loop];
    if (picking_sr.level == 0) {
      FileMetaData* f = picking_sr.file;
      inputs[0].files.push_back(f);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
                     cf_name.c_str(), file_num_buf);
  }

  // output files at the bottommost level, unless it's reserved
  int output_level = vstorage->num_levels() - 1;
  // last level is reserved for the files ingested behind
  if (ioptions_.allow_ingest_behind) {
    assert(output_level > 1);
    output_level--;
  }

  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options, output_level,
                          kCompactionStyleUniversal),
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
                         1),
      GetCompressionOptions(ioptions_, vstorage, output_level),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score, false /* deletion_compaction */,
      CompactionReason::kUniversalSizeAmplification);
}

// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, double score,
    const std::vector<SortedRun>& /*sorted_runs*/, LogBuffer* /*log_buffer*/) {
  CompactionInputFiles start_level_inputs;
  int output_level;
  std::vector<CompactionInputFiles> inputs;

  if (vstorage->num_levels() == 1) {
    // This is single level universal. Since we're basically trying to reclaim
    // space by processing files marked for compaction due to high tombstone
    // density, let's do the same thing as compaction to reduce size amp which
    // has the same goals.
    bool compact = false;

    start_level_inputs.level = 0;
    start_level_inputs.files.clear();
    output_level = 0;
    for (FileMetaData* f : vstorage->LevelFiles(0)) {
      if (f->marked_for_compaction) {
        compact = true;
      }
      if (compact) {
        start_level_inputs.files.push_back(f);
      }
    }
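    // Note: once the first marked file is seen, every remaining file in the
    // L0 list is pulled in as well, so the pick extends to the end of the
    // level rather than stopping at the next unmarked file.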
    if (start_level_inputs.size() <= 1) {
      // If only the last file in L0 is marked for compaction, ignore it
      return nullptr;
    }
    inputs.push_back(start_level_inputs);
  } else {
    int start_level;

    // For multi-level universal, the strategy is to make this look more like
    // leveled. We pick one of the files marked for compaction and compact with
    // overlapping files in the adjacent level.
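    // PickFilesMarkedForCompaction() and SetupOtherInputs() below are generic
    // helpers inherited from the base CompactionPicker, i.e. the same
    // machinery the leveled picker relies on.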
    PickFilesMarkedForCompaction(cf_name, vstorage, &start_level, &output_level,
                                 &start_level_inputs);
    if (start_level_inputs.empty()) {
      return nullptr;
    }

    // Pick the first non-empty level after the start_level
    for (output_level = start_level + 1; output_level < vstorage->num_levels();
         output_level++) {
      if (vstorage->NumLevelFiles(output_level) != 0) {
        break;
      }
    }

    // If all higher levels are empty, pick the highest level as output level
    if (output_level == vstorage->num_levels()) {
      if (start_level == 0) {
        output_level = vstorage->num_levels() - 1;
      } else {
        // If start level is non-zero and all higher levels are empty, this
        // compaction will translate into a trivial move. Since the idea is
        // to reclaim space and trivial move doesn't help with that, we
        // skip compaction in this case and return nullptr
        return nullptr;
      }
    }
    if (ioptions_.allow_ingest_behind &&
        output_level == vstorage->num_levels() - 1) {
      assert(output_level > 1);
      output_level--;
    }

    if (output_level != 0) {
      if (start_level == 0) {
        if (!GetOverlappingL0Files(vstorage, &start_level_inputs, output_level,
                                   nullptr)) {
          return nullptr;
        }
      }

      CompactionInputFiles output_level_inputs;
      int parent_index = -1;

      output_level_inputs.level = output_level;
      if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage,
                            &start_level_inputs, &output_level_inputs,
                            &parent_index, -1)) {
        return nullptr;
      }
      inputs.push_back(start_level_inputs);
      if (!output_level_inputs.empty()) {
        inputs.push_back(output_level_inputs);
      }
      if (FilesRangeOverlapWithCompaction(inputs, output_level)) {
        return nullptr;
      }
    } else {
      inputs.push_back(start_level_inputs);
    }
  }

  uint64_t estimated_total_size = 0;
  // Use size of the output level as estimated file size
  for (FileMetaData* f : vstorage->LevelFiles(output_level)) {
    estimated_total_size += f->fd.GetFileSize();
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options, estimated_total_size);
  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
      MaxFileSizeForLevel(mutable_cf_options, output_level,
                          kCompactionStyleUniversal),
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
                         1),
      GetCompressionOptions(ioptions_, vstorage, output_level),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
      score, false /* deletion_compaction */,
      CompactionReason::kFilesMarkedForCompaction);
}
}  // namespace rocksdb

#endif  // !ROCKSDB_LITE