// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction_picker_universal.h"
#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "monitoring/statistics.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"

namespace rocksdb {
namespace {
// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of a min-heap
// that contains the file metadata, the level of the file,
// and the index of the file in that level.

struct InputFileInfo {
  InputFileInfo() : f(nullptr) {}

  FileMetaData* f;
  size_t level;
  size_t index;
};

// Used in universal compaction when trivial move is enabled.
// This comparator is used for the construction of a min-heap
// ordered by the smallest user key of each file.
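// Note: std::priority_queue is a max-heap by default, so returning true when
// i1's smallest key compares greater than i2's inverts the order and makes
// top() yield the file with the smallest key.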
struct SmallestKeyHeapComparator {
  explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }

  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
    return (ucmp_->Compare(i1.f->smallest.user_key(),
                           i2.f->smallest.user_key()) > 0);
  }

 private:
  const Comparator* ucmp_;
};

typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
                            SmallestKeyHeapComparator>
    SmallestKeyHeap;

// This function creates the heap that is used to check whether the input
// files overlap during universal compaction when allow_trivial_move
// is set.
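// Seeding strategy: every L0 file is pushed up front because L0 files may
// overlap each other, but only the first file of each non-zero level is
// pushed. Files within a non-zero level are sorted and disjoint, so the next
// file of that level is pushed lazily as the current one is popped (see
// IsInputFilesNonOverlapping below), as in a k-way merge.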
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
  SmallestKeyHeap smallest_key_priority_q =
      SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));

  InputFileInfo input_file;

  for (size_t l = 0; l < c->num_input_levels(); l++) {
    if (c->num_input_files(l) != 0) {
      if (l == 0 && c->start_level() == 0) {
        for (size_t i = 0; i < c->num_input_files(0); i++) {
          input_file.f = c->input(0, i);
          input_file.level = 0;
          input_file.index = i;
          smallest_key_priority_q.push(std::move(input_file));
        }
      } else {
        input_file.f = c->input(l, 0);
        input_file.level = l;
        input_file.index = 0;
        smallest_key_priority_q.push(std::move(input_file));
      }
    }
  }
  return smallest_key_priority_q;
}

#ifndef NDEBUG
// smallest_seqno and largest_seqno are set iff `files` is not empty.
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
                             SequenceNumber* smallest_seqno,
                             SequenceNumber* largest_seqno) {
  bool is_first = true;
  for (FileMetaData* f : files) {
    assert(f->smallest_seqno <= f->largest_seqno);
    if (is_first) {
      is_first = false;
      *smallest_seqno = f->smallest_seqno;
      *largest_seqno = f->largest_seqno;
    } else {
      if (f->smallest_seqno < *smallest_seqno) {
        *smallest_seqno = f->smallest_seqno;
      }
      if (f->largest_seqno > *largest_seqno) {
        *largest_seqno = f->largest_seqno;
      }
    }
  }
}
#endif
}  // namespace

// Checks whether any of the compaction's input files overlap in user-key
// range. Used to decide whether the compaction can run as a trivial move.
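// The check is a k-way merge sweep: files are popped in ascending order of
// smallest user key, and each popped file must begin strictly after the
// previously popped file ends; otherwise two inputs overlap and false is
// returned.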
bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) {
  auto comparator = icmp_->user_comparator();
  bool first_iter = true;

  InputFileInfo prev, curr, next;

  SmallestKeyHeap smallest_key_priority_q = create_level_heap(c, comparator);

  while (!smallest_key_priority_q.empty()) {
    curr = smallest_key_priority_q.top();
    smallest_key_priority_q.pop();

    if (first_iter) {
      prev = curr;
      first_iter = false;
    } else {
      if (comparator->Compare(prev.f->largest.user_key(),
                              curr.f->smallest.user_key()) >= 0) {
        // found overlapping files, return false
        return false;
      }
      assert(comparator->Compare(curr.f->largest.user_key(),
                                 prev.f->largest.user_key()) > 0);
      prev = curr;
    }

    next.f = nullptr;

    if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
      next.f = c->input(curr.level, curr.index + 1);
      next.level = curr.level;
      next.index = curr.index + 1;
    }

    if (next.f) {
      smallest_key_priority_q.push(std::move(next));
    }
  }
  return true;
}

bool UniversalCompactionPicker::NeedsCompaction(
    const VersionStorageInfo* vstorage) const {
  const int kLevel0 = 0;
  return vstorage->CompactionScore(kLevel0) >= 1;
}

void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
                                                size_t out_buf_size,
                                                bool print_path) const {
  if (level == 0) {
    assert(file != nullptr);
    if (file->fd.GetPathId() == 0 || !print_path) {
      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
    } else {
      snprintf(out_buf, out_buf_size, "file %" PRIu64 "(path %" PRIu32 ")",
               file->fd.GetNumber(), file->fd.GetPathId());
    }
  } else {
    snprintf(out_buf, out_buf_size, "level %d", level);
  }
}

void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
  if (level == 0) {
    assert(file != nullptr);
    snprintf(out_buf, out_buf_size,
             "file %" PRIu64 "[%" ROCKSDB_PRIszt
             "] with size %" PRIu64 " (compensated size %" PRIu64 ")",
             file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
             file->compensated_file_size);
  } else {
    snprintf(out_buf, out_buf_size,
             "level %d[%" ROCKSDB_PRIszt
             "] with size %" PRIu64 " (compensated size %" PRIu64 ")",
             level, sorted_run_count, size, compensated_file_size);
  }
}

std::vector<UniversalCompactionPicker::SortedRun>
UniversalCompactionPicker::CalculateSortedRuns(
    const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions) {
  std::vector<UniversalCompactionPicker::SortedRun> ret;
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
                     f->being_compacted);
  }
  for (int level = 1; level < vstorage.num_levels(); level++) {
    uint64_t total_compensated_size = 0U;
    uint64_t total_size = 0U;
    bool being_compacted = false;
    bool is_first = true;
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
      total_compensated_size += f->compensated_file_size;
      total_size += f->fd.GetFileSize();
      if (ioptions.compaction_options_universal.allow_trivial_move) {
        if (f->being_compacted) {
          being_compacted = f->being_compacted;
        }
      } else {
        // Compaction always includes all files for a non-zero level, so for a
        // non-zero level, all the files should share the same being_compacted
        // value. This assumption is only valid when
        // ioptions.compaction_options_universal.allow_trivial_move is false.
        assert(is_first || f->being_compacted == being_compacted);
      }
      if (is_first) {
        being_compacted = f->being_compacted;
        is_first = false;
      }
    }
    if (total_compensated_size > 0) {
      ret.emplace_back(level, nullptr, total_size, total_compensated_size,
                       being_compacted);
    }
  }
  return ret;
}

// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
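// The picker tries strategies in priority order:
//   1. compact to reduce size amplification (PickCompactionToReduceSizeAmp);
//   2. compact runs with similar sizes, governed by size_ratio
//      (PickCompactionToReduceSortedRuns with a finite ratio);
//   3. if the number of sorted runs not being compacted still exceeds
//      level0_file_num_compaction_trigger, force a merge regardless of
//      size ratios (PickCompactionToReduceSortedRuns with ratio = UINT_MAX).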
Compaction* UniversalCompactionPicker::PickCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
  const int kLevel0 = 0;
  double score = vstorage->CompactionScore(kLevel0);
  std::vector<SortedRun> sorted_runs =
      CalculateSortedRuns(*vstorage, ioptions_);

  if (sorted_runs.size() == 0 ||
      sorted_runs.size() <
          (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
                     cf_name.c_str());
    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
                             nullptr);
    return nullptr;
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  ROCKS_LOG_BUFFER_MAX_SZ(
      log_buffer, 3072,
      "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
      cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp));

  // Check for size amplification first.
  Compaction* c;
  if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options, vstorage,
                                         score, sorted_runs, log_buffer)) !=
      nullptr) {
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
                     cf_name.c_str());
  } else {
    // Size amplification is within limits. Try reducing read
    // amplification while maintaining file size ratios.
    unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;

    if ((c = PickCompactionToReduceSortedRuns(
             cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
             sorted_runs, log_buffer)) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Universal: compacting for size ratio\n",
                       cf_name.c_str());
    } else {
      // Size amplification and file size ratios are within configured
      // limits. If max read amplification exceeds the configured limit,
      // force a compaction without looking at file size ratios, and try to
      // bring the number of sorted runs down to
      // level0_file_num_compaction_trigger. The assert below is guaranteed
      // by NeedsCompaction().
      assert(sorted_runs.size() >=
             static_cast<size_t>(
                 mutable_cf_options.level0_file_num_compaction_trigger));
      // Get the total number of sorted runs that are not being compacted
      int num_sr_not_compacted = 0;
      for (size_t i = 0; i < sorted_runs.size(); i++) {
        if (!sorted_runs[i].being_compacted) {
          num_sr_not_compacted++;
        }
      }

      // The number of sorted runs that are not being compacted is greater
      // than the maximum allowed number of sorted runs.
      if (num_sr_not_compacted >
          mutable_cf_options.level0_file_num_compaction_trigger) {
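        // For illustration: with 12 sorted runs not being compacted and a
        // trigger of 8, num_files = 12 - 8 + 1 = 5; merging 5 runs into one
        // removes 4 runs and brings the total down to exactly 8.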
        unsigned int num_files =
            num_sr_not_compacted -
            mutable_cf_options.level0_file_num_compaction_trigger + 1;
        if ((c = PickCompactionToReduceSortedRuns(
                 cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
                 num_files, sorted_runs, log_buffer)) != nullptr) {
          ROCKS_LOG_BUFFER(log_buffer,
                           "[%s] Universal: compacting for file num -- %u\n",
                           cf_name.c_str(), num_files);
        }
      }
    }
  }
  if (c == nullptr) {
    TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
                             nullptr);
    return nullptr;
  }

  if (ioptions_.compaction_options_universal.allow_trivial_move) {
    c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
  }

  // validate that all the chosen files of L0 are non-overlapping in time
#ifndef NDEBUG
  SequenceNumber prev_smallest_seqno = 0U;
  bool is_first = true;

  size_t level_index = 0U;
  if (c->start_level() == 0) {
    for (auto f : *c->inputs(0)) {
      assert(f->smallest_seqno <= f->largest_seqno);
      if (is_first) {
        is_first = false;
      }
      prev_smallest_seqno = f->smallest_seqno;
    }
    level_index = 1U;
  }
  for (; level_index < c->num_input_levels(); level_index++) {
    if (c->num_input_files(level_index) != 0) {
      SequenceNumber smallest_seqno = 0U;
      SequenceNumber largest_seqno = 0U;
      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
                              &largest_seqno);
      if (is_first) {
        is_first = false;
      } else if (prev_smallest_seqno > 0) {
        // A level is considered as the bottommost level if there are
        // no files in higher levels or if files in higher levels do
        // not overlap with the files being compacted. Sequence numbers
        // of files in bottommost level can be set to 0 to help
        // compression. As a result, the following assert may not hold
        // if the prev_smallest_seqno is 0.
        assert(prev_smallest_seqno > largest_seqno);
      }
      prev_smallest_seqno = smallest_seqno;
    }
  }
#endif
  // update statistics
  MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
              c->inputs(0)->size());

  RegisterCompaction(c);

  TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
                           c);
  return c;
}

uint32_t UniversalCompactionPicker::GetPathId(
    const ImmutableCFOptions& ioptions, uint64_t file_size) {
  // Two conditions need to be satisfied:
  // (1) the target path needs to be able to hold the file's size;
  // (2) the total size left in this and previous paths must be no smaller
  //     than the expected future file size before this new file is
  //     compacted, which is estimated based on size_ratio.
  // For example, if we are now compacting files of size (1, 1, 2, 4, 8),
  // we will make sure the target file, probably with size of 16, will be
  // placed in a path so that eventually when new files are generated and
  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
  // before the path we chose.
  //
  // TODO(sdong): the case of multiple column families is not yet
  // considered by this algorithm, so the target size can be violated in
  // that case. We need to improve it.
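  //
  // Illustration (hypothetical numbers): with db_paths target sizes
  // (100, 200, 500), file_size = 40, and size_ratio = 0 (so future_size =
  // 40), path 0 qualifies: 100 > 40 and the 60 left over exceeds 40.
  // For file_size = 150, path 0 is too small and path 1 leaves only
  // 100 + (200 - 150) = 150, which is not strictly greater than
  // future_size = 150, so the file falls through to the last path.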
  uint64_t accumulated_size = 0;
  uint64_t future_size =
      file_size * (100 - ioptions.compaction_options_universal.size_ratio) /
      100;
  uint32_t p = 0;
  assert(!ioptions.db_paths.empty());
  for (; p < ioptions.db_paths.size() - 1; p++) {
    uint64_t target_size = ioptions.db_paths[p].target_size;
    if (target_size > file_size &&
        accumulated_size + (target_size - file_size) > future_size) {
      return p;
    }
    accumulated_size += target_size;
  }
  return p;
}

//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, double score, unsigned int ratio,
    unsigned int max_number_of_files_to_compact,
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
  unsigned int min_merge_width =
      ioptions_.compaction_options_universal.min_merge_width;
  unsigned int max_merge_width =
      ioptions_.compaction_options_universal.max_merge_width;

  const SortedRun* sr = nullptr;
  bool done = false;
  size_t start_index = 0;
  unsigned int candidate_count = 0;

  unsigned int max_files_to_compact =
      std::min(max_merge_width, max_number_of_files_to_compact);
  min_merge_width = std::max(min_merge_width, 2U);

  // Caller checks the size before executing this function. This invariant is
  // important because otherwise we may have a possible integer underflow when
  // dealing with unsigned types.
  assert(sorted_runs.size() > 0);

  // Consider a candidate file only if it is smaller than the
  // total size accumulated so far.
  for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
    candidate_count = 0;

    // Skip files that are already being compacted
    for (sr = nullptr; loop < sorted_runs.size(); loop++) {
      sr = &sorted_runs[loop];

      if (!sr->being_compacted) {
        candidate_count = 1;
        break;
      }
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf));
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Universal: %s[%" ROCKSDB_PRIszt
                       "] being compacted, skipping",
                       cf_name.c_str(), file_num_buf, loop);

      sr = nullptr;
    }

    // This file is not being compacted. Consider it as the
    // first candidate to be compacted.
    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
    if (sr != nullptr) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Universal: Possible candidate %s[%" ROCKSDB_PRIszt
                       "].",
                       cf_name.c_str(), file_num_buf, loop);
    }

    // Check if the succeeding files need compaction.
    for (size_t i = loop + 1;
         candidate_count < max_files_to_compact && i < sorted_runs.size();
         i++) {
      const SortedRun* succeeding_sr = &sorted_runs[i];
      if (succeeding_sr->being_compacted) {
        break;
      }
      // Pick files if the total/last candidate file size (increased by the
      // specified ratio) is still larger than the next candidate file.
      // candidate_size is the total size of files picked so far with the
      // default kCompactionStopStyleTotalSize; with
      // kCompactionStopStyleSimilarSize, it's simply the size of the last
      // picked file.
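      // Illustration (hypothetical numbers): with run sizes (1, 1, 2, 4, 8),
      // ratio = 0, and kCompactionStopStyleTotalSize, candidate_size grows
      // 1 -> 2 -> 4 -> 8 -> 16: each next run is never larger than the
      // accumulated total, so all five runs are picked into one compaction.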
      double sz = candidate_size * (100.0 + ratio) / 100.0;
      if (sz < static_cast<double>(succeeding_sr->size)) {
        break;
      }
      if (ioptions_.compaction_options_universal.stop_style ==
          kCompactionStopStyleSimilarSize) {
        // Similar-size stopping rule: also check that the last picked file
        // isn't far larger than the next candidate file.
        sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
        if (sz < static_cast<double>(candidate_size)) {
          // If the small file we've encountered begins a run of similar-size
          // files, we'll pick them up on a future iteration of the outer
          // loop. If it's some lonely straggler, it'll eventually get picked
          // by the last-resort read amp strategy which disregards size
          // ratios.
          break;
        }
        candidate_size = succeeding_sr->compensated_file_size;
      } else {  // default kCompactionStopStyleTotalSize
        candidate_size += succeeding_sr->compensated_file_size;
      }
      candidate_count++;
    }

    // Found a series of consecutive files that need compaction.
    if (candidate_count >= min_merge_width) {
      start_index = loop;
      done = true;
      break;
    } else {
      for (size_t i = loop;
           i < loop + candidate_count && i < sorted_runs.size(); i++) {
        const SortedRun* skipping_sr = &sorted_runs[i];
        char file_num_buf[256];
        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
        ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s",
                         cf_name.c_str(), file_num_buf);
      }
    }
  }
  if (!done || candidate_count <= 1) {
    return nullptr;
  }
  size_t first_index_after = start_index + candidate_count;
  // Compression is enabled only if the compaction output falls within the
  // oldest compression_size_percent of the data, i.e. the runs older than
  // the output make up less than that share of the total size.
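  // Illustration (hypothetical numbers): with compression_size_percent = 70
  // and a total size of 100, if the runs older than the compaction output
  // already sum to 80, then 80 * 100 >= 100 * 70 and the output lies in the
  // newest 30% of the data, so it is written uncompressed.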
  bool enable_compression = true;
  int ratio_to_compress =
      ioptions_.compaction_options_universal.compression_size_percent;
  if (ratio_to_compress >= 0) {
    uint64_t total_size = 0;
    for (auto& sorted_run : sorted_runs) {
      total_size += sorted_run.compensated_file_size;
    }

    uint64_t older_file_size = 0;
    for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
      older_file_size += sorted_runs[i].size;
      if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
        enable_compression = false;
        break;
      }
    }
  }

  uint64_t estimated_total_size = 0;
  for (size_t i = 0; i < first_index_after; i++) {
    estimated_total_size += sorted_runs[i].size;
  }
  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
  int start_level = sorted_runs[start_index].level;
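  // Output level choice: if the compaction consumes every remaining sorted
  // run, the output goes to the bottommost level; otherwise it goes just
  // above the next older sorted run (or to level 0 when that run is itself
  // an L0 file).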
  int output_level;
  if (first_index_after == sorted_runs.size()) {
    output_level = vstorage->num_levels() - 1;
  } else if (sorted_runs[first_index_after].level == 0) {
    output_level = 0;
  } else {
    output_level = sorted_runs[first_index_after].level - 1;
  }

  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t i = start_index; i < first_index_after; i++) {
    auto& picking_sr = sorted_runs[i];
    if (picking_sr.level == 0) {
      FileMetaData* picking_file = picking_sr.file;
      inputs[0].files.push_back(picking_file);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
                     file_num_buf);
  }

  CompactionReason compaction_reason;
  if (max_number_of_files_to_compact == UINT_MAX) {
    // No file-count limit means this pick was triggered by the size-ratio
    // check; a finite limit means it was triggered by the sorted-run count.
    compaction_reason = CompactionReason::kUniversalSizeRatio;
  } else {
    compaction_reason = CompactionReason::kUniversalSortedRunNum;
  }
  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
      mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
                         1, enable_compression),
      /* grandparents */ {}, /* is manual */ false, score,
      false /* deletion_compaction */, compaction_reason);
}


// Look at overall size amplification. If size amplification
// exceeds the configured value, then do a compaction
// of the candidate files all the way up to the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, double score,
    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
  // percentage flexibility while reducing size amplification
  uint64_t ratio =
      ioptions_.compaction_options_universal.max_size_amplification_percent;

  unsigned int candidate_count = 0;
  uint64_t candidate_size = 0;
  size_t start_index = 0;
  const SortedRun* sr = nullptr;

  // Skip files that are already being compacted
  for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
    sr = &sorted_runs[loop];
    if (!sr->being_compacted) {
      start_index = loop;  // Consider this as the first candidate.
      break;
    }
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(log_buffer,
                     "[%s] Universal: skipping %s[%" ROCKSDB_PRIszt
                     "] being compacted: cannot be a candidate to reduce size "
                     "amp.\n",
                     cf_name.c_str(), file_num_buf, loop);
    sr = nullptr;
  }

  if (sr == nullptr) {
    return nullptr;  // no candidate files
  }
  {
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
        cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
  }

  // keep adding up all the remaining files
  for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) {
    sr = &sorted_runs[loop];
    if (sr->being_compacted) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Universal: Possible candidate %s[%" ROCKSDB_PRIszt "] %s",
          cf_name.c_str(), file_num_buf, start_index,
          " is already being compacted. No size amp reduction possible.\n");
      return nullptr;
    }
    candidate_size += sr->compensated_file_size;
    candidate_count++;
  }
  if (candidate_count == 0) {
    return nullptr;
  }

  // size of earliest file
  uint64_t earliest_file_size = sorted_runs.back().size;

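  // Illustration (hypothetical numbers): with
  // max_size_amplification_percent = 200, an earliest run of size 100, and
  // newer runs totalling 150, the check below sees 150 * 100 < 200 * 100,
  // so the additional space is within bounds and no size-amp compaction is
  // scheduled. With newer runs totalling 250 instead, 250 * 100 >= 200 * 100
  // and the whole range is compacted down to the bottommost level.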
  // size amplification = percentage of additional size
  if (candidate_size * 100 < ratio * earliest_file_size) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name.c_str(), candidate_size, earliest_file_size);
    return nullptr;
  } else {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name.c_str(), candidate_size, earliest_file_size);
  }
  assert(start_index < sorted_runs.size() - 1);

  // Estimate total file size
  uint64_t estimated_total_size = 0;
  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
    estimated_total_size += sorted_runs[loop].size;
  }
  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
  int start_level = sorted_runs[start_index].level;

  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  // We always compact all the files, so always compress.
  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
    auto& picking_sr = sorted_runs[loop];
    if (picking_sr.level == 0) {
      FileMetaData* f = picking_sr.file;
      inputs[0].files.push_back(f);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
                     cf_name.c_str(), file_num_buf);
  }

  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, std::move(inputs),
      vstorage->num_levels() - 1,
      mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage, mutable_cf_options,
                         vstorage->num_levels() - 1, 1),
      /* grandparents */ {}, /* is manual */ false, score,
      false /* deletion_compaction */,
      CompactionReason::kUniversalSizeAmplification);
}
}  // namespace rocksdb


#endif  // !ROCKSDB_LITE