// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#if !defined(ROCKSDB_LITE)
#include "rocksdb/utilities/table_properties_collectors.h"
#include "util/sync_point.h"

namespace rocksdb {

static std::string CompressibleString(Random* rnd, int len) {
  std::string r;
  test::CompressibleString(rnd, 0.8, len, &r);
  return r;
}

class DBTestUniversalCompactionBase
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<int, bool>> {
 public:
  explicit DBTestUniversalCompactionBase(
      const std::string& path) : DBTestBase(path) {}
  virtual void SetUp() override {
    num_levels_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }
  int num_levels_;
  bool exclusive_manual_compaction_;
};

class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompaction() :
      DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};

class DBTestUniversalDeleteTrigCompaction : public DBTestBase {
 public:
  DBTestUniversalDeleteTrigCompaction()
      : DBTestBase("/db_universal_compaction_test") {}
};

namespace {
void VerifyCompactionResult(
    const ColumnFamilyMetaData& cf_meta,
    const std::set<std::string>& overlapping_file_numbers) {
#ifndef NDEBUG
  for (auto& level : cf_meta.levels) {
    for (auto& file : level.files) {
      assert(overlapping_file_numbers.find(file.name) ==
             overlapping_file_numbers.end());
    }
  }
#endif
}

class KeepFilter : public CompactionFilter {
 public:
  virtual bool Filter(int /*level*/, const Slice& /*key*/,
                      const Slice& /*value*/, std::string* /*new_value*/,
                      bool* /*value_changed*/) const override {
    return false;
  }

  virtual const char* Name() const override { return "KeepFilter"; }
};

class KeepFilterFactory : public CompactionFilterFactory {
 public:
  explicit KeepFilterFactory(bool check_context = false)
      : check_context_(check_context) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  virtual const char* Name() const override { return "KeepFilterFactory"; }
  bool check_context_;
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
};

class DelayFilter : public CompactionFilter {
 public:
  explicit DelayFilter(DBTestBase* d) : db_test(d) {}
  virtual bool Filter(int /*level*/, const Slice& /*key*/,
                      const Slice& /*value*/, std::string* /*new_value*/,
                      bool* /*value_changed*/) const override {
    db_test->env_->addon_time_.fetch_add(1000);
    return true;
  }

  virtual const char* Name() const override { return "DelayFilter"; }

 private:
  DBTestBase* db_test;
};

class DelayFilterFactory : public CompactionFilterFactory {
 public:
  explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& /*context*/) override {
    return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
  }

  virtual const char* Name() const override { return "DelayFilterFactory"; }

 private:
  DBTestBase* db_test;
};
}  // namespace

// Make sure we don't trigger a problem if the trigger condition is given
// to be 0, which is invalid.
TEST_P(DBTestUniversalCompaction, UniversalCompactionSingleSortedRun) {
  Options options = CurrentOptions();

  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  // Config universal compaction to always compact to one single sorted run.
  options.level0_file_num_compaction_trigger = 0;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.min_merge_width = 2;
  options.compaction_options_universal.max_size_amplification_percent = 0;
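  // With max_size_amplification_percent = 0, the estimated size
  // amplification -- roughly sum(sizes of all sorted runs except the
  // oldest) * 100 / size(oldest run) -- is positive as soon as a second
  // sorted run exists, so every flush should immediately trigger a full
  // compaction back to a single run. For example, runs of sizes (1, 4)
  // estimate to 1 * 100 / 4 = 25% > 0%.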

  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // Use a context-checking no-op compaction filter; no manual compaction is
  // expected in this test.
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  DestroyAndReopen(options);
  // The invalid trigger of 0 is sanitized to 1 on open.
  ASSERT_EQ(1, db_->GetOptions().level0_file_num_compaction_trigger);

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);

  for (int num = 0; num < 16; num++) {
    // Write a 100KB file; it should immediately be compacted into one file.
    GenerateNewFile(&rnd, &key_idx);
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ(NumSortedRuns(0), 1);
  }
  ASSERT_OK(Put(Key(key_idx), ""));
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumSortedRuns(0), 1);
}

TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  BlockBasedTableOptions bbto;
  bbto.cache_index_and_filter_blocks = true;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.optimize_filters_for_hits = true;
  options.statistics = rocksdb::CreateDBStatistics();
  options.memtable_factory.reset(new SpecialSkipListFactory(3));

  DestroyAndReopen(options);

  // block compaction from happening
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    Put(Key(num * 10), "val");
    if (num) {
      dbfull()->TEST_WaitForFlushMemTable();
    }
    Put(Key(30 + num * 10), "val");
    Put(Key(60 + num * 10), "val");
  }
  Put("", "");
  dbfull()->TEST_WaitForFlushMemTable();

  // Query a set of non-existent keys
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }

  // Make sure bloom filter is used at least once.
  ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
  auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Make sure bloom filter is used for all but the last L0 file when looking
  // up a non-existent key that's in the range of all L0 files.
  ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
  ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1,
            TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
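  // The "- 1" reflects optimize_filters_for_hits: the filter is skipped at
  // read time for the last file of the bottommost non-empty level (here,
  // the oldest L0 file). In hit-heavy workloads most lookups that reach the
  // bottommost data find their key, so its filter rarely helps.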
  prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Unblock compaction and wait for it to happen.
  sleeping_task_low.WakeUp();
  dbfull()->TEST_WaitForCompact();

  // The same queries will not trigger the bloom filter
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }
  ASSERT_EQ(prev_counter, TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
}

// TODO(kailiu) The tests on UniversalCompaction have some issues:
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumptions on the memtable flush conditions, which may change from
// time to time.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) {
  Options options;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  options = CurrentOptions(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBTestWritableFile.GetPreallocationStatus", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        size_t preallocation_size = *(static_cast<size_t*>(arg));
        if (num_levels_ > 3) {
          ASSERT_LE(preallocation_size, options.target_file_size_base * 1.1);
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);
  // Stage 1:
  // Generate a set of files at level 0, but don't trigger level-0
  // compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB
    GenerateNewFile(1, &rnd, &key_idx);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Suppose each file flushed from mem table has size 1. Now we compact
  // level0_file_num_compaction_trigger = 4 files and should have a big
  // file of size 4.
  ASSERT_EQ(NumSortedRuns(1), 1);

  // Stage 2:
  // Now we have one file at level 0, with size 4. We also have some data in
  // mem table. Let's continue generating new files at level 0, but don't
  // trigger level-0 compaction.
  // First, clean up memtable before inserting new data. This will generate
  // a level-0 file, with size around 0.4 (according to previously written
  // data amount).
  filter->expect_full_compaction_.store(false);
  ASSERT_OK(Flush(1));
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
  // After compaction, we should have 2 files, with size 4, 2.4.
  ASSERT_EQ(NumSortedRuns(1), 2);
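  // Worked example of the size-ratio scan with size_ratio = 5 (sorted runs
  // listed newest first: 1, 1, 0.4, 4): start with 1 and include the next
  // run roughly while size(next) <= candidate_total * 1.05. 1 <= 1 * 1.05
  // -> include (total 2); 0.4 <= 2 * 1.05 -> include (total 2.4);
  // 4 > 2.4 * 1.05 -> stop. Hence 1 + 1 + 0.4 merge into 2.4, leaving
  // (4, 2.4).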

  // Stage 3:
  // Now we have 2 files at level 0, with size 4 and 2.4. Continue
  // generating new files at level 0.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
  // After compaction, we should have 3 files, with size 4, 2.4, 2.
  ASSERT_EQ(NumSortedRuns(1), 3);

  // Stage 4:
  // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
  // new file of size 1.
  GenerateNewFile(1, &rnd, &key_idx);
  dbfull()->TEST_WaitForCompact();
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(1), 4);

  // Stage 5:
  // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
  // a new file of size 1.
  filter->expect_full_compaction_.store(true);
  GenerateNewFile(1, &rnd, &key_idx);
  dbfull()->TEST_WaitForCompact();
  // All files at level 0 will be compacted into a single one.
  ASSERT_EQ(NumSortedRuns(1), 1);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate two files in Level 0. Both files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in memtable. This is typically
  // small, which should not trigger size ratio based compaction
  // but will instead trigger size amplification.
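  // The sorted runs are then roughly (newest first): 0.3, 1, 1. Estimated
  // size amplification = (0.3 + 1) * 100 / 1 = 130% > 110%, so a full
  // compaction down to a single run is expected.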
  ASSERT_OK(Flush(1));

  dbfull()->TEST_WaitForCompact();

  // Verify that size amplification did occur
  ASSERT_EQ(NumSortedRuns(1), 1);
}

TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  // Initial setup of compaction_options_universal will prevent universal
  // compaction from happening
  options.compaction_options_universal.size_ratio = 100;
  options.compaction_options_universal.min_merge_width = 100;
  DestroyAndReopen(options);

  int total_picked_compactions = 0;
  int total_size_amp_compactions = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
          Compaction* c = static_cast<Compaction*>(arg);
          if (c->compaction_reason() ==
              CompactionReason::kUniversalSizeAmplification) {
            total_size_amp_compactions++;
          }
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate two files in Level 0. Both files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in memtable. This is typically
  // small, which should not trigger size ratio based compaction
  // but could instead trigger size amplification if it's set
  // to 110.
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();
  // Verify compaction did not happen
  ASSERT_EQ(NumSortedRuns(1), 3);
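  // No compaction fires yet: min_merge_width = 100 effectively disables the
  // read-amplification (size-ratio) path, and the estimated size
  // amplification of roughly (0.3 + 1) * 100 / 1 = 130% is still below the
  // default max_size_amplification_percent of 200.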

  // Trigger compaction if size amplification exceeds 110% without reopening DB
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_size_amplification_percent,
            200);
  ASSERT_OK(dbfull()->SetOptions(handles_[1],
                                 {{"compaction_options_universal",
                                   "{max_size_amplification_percent=110;}"}}));
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_size_amplification_percent,
            110);
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(110, mutable_cf_options.compaction_options_universal
                     .max_size_amplification_percent);

  dbfull()->TEST_WaitForCompact();
  // Verify that size amplification did happen
  ASSERT_EQ(NumSortedRuns(1), 1);
  ASSERT_EQ(total_picked_compactions, 1);
  ASSERT_EQ(total_size_amp_compactions, 1);
}

TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionReadAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  // Initial setup of compaction_options_universal will prevent universal
  // compaction from happening
  options.compaction_options_universal.max_size_amplification_percent = 2000;
  options.compaction_options_universal.size_ratio = 0;
  options.compaction_options_universal.min_merge_width = 100;
  DestroyAndReopen(options);

  int total_picked_compactions = 0;
  int total_size_ratio_compactions = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
          Compaction* c = static_cast<Compaction*>(arg);
          if (c->compaction_reason() == CompactionReason::kUniversalSizeRatio) {
            total_size_ratio_compactions++;
          }
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate three files in Level 0. All files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger);

  // Flush whatever is remaining in memtable. This is typically small, about
  // 30KB.
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();
  // Verify compaction did not happen
  ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger + 1);
  ASSERT_EQ(total_picked_compactions, 0);

  ASSERT_OK(dbfull()->SetOptions(
      handles_[1],
      {{"compaction_options_universal",
        "{min_merge_width=2;max_merge_width=2;size_ratio=100;}"}}));
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.min_merge_width,
            2);
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_merge_width,
            2);
  ASSERT_EQ(
      dbfull()->GetOptions(handles_[1]).compaction_options_universal.size_ratio,
      100);

  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.size_ratio, 100);
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.min_merge_width, 2);
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.max_merge_width, 2);

  dbfull()->TEST_WaitForCompact();

  // Files in L0 are approx: 0.3 (30KB), 1, 1, 1.
  // On compaction: the files are below the size amp threshold, so we fall
  // through to checking the read amp conditions. The configured size ratio is
  // not big enough to take 0.3 into consideration. So the next files 1 and 1
  // are compacted together first, as they satisfy the size ratio condition
  // and the (min_merge_width, max_merge_width) condition, yielding a file of
  // size 2. Next, the newly generated 2 and the last file 1 are compacted
  // together. So at the end: #sortedRuns = 2, #picked_compactions = 2, and
  // all the picked ones are size ratio based compactions.
  ASSERT_EQ(NumSortedRuns(1), 2);
  // If max_merge_width had not been changed dynamically above, and if it
  // continued to be the default value of UINT_MAX, total_picked_compactions
  // would have been 1.
  ASSERT_EQ(total_picked_compactions, 2);
  ASSERT_EQ(total_size_ratio_compactions, 2);
}

TEST_P(DBTestUniversalCompaction, CompactFilesOnUniversalCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 10;

  ChangeCompactOptions();
  Options options;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  options.num_levels = 1;
  options.target_file_size_base = options.write_buffer_size;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  CreateAndReopenWithCF({"pikachu"}, options);
  // ChangeCompactOptions() switched the test config to universal compaction,
  // which CurrentOptions(options) applied over the level style set above.
  ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
  Random rnd(301);
  for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
  }
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForCompact();
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  std::vector<std::string> compaction_input_file_names;
  for (auto file : cf_meta.levels[0].files) {
    if (rnd.OneIn(2)) {
      compaction_input_file_names.push_back(file.name);
    }
  }

  if (compaction_input_file_names.size() == 0) {
    compaction_input_file_names.push_back(
        cf_meta.levels[0].files[0].name);
  }

  // expect failure since universal compaction only allows L0 as the output
  // level
  ASSERT_FALSE(dbfull()
                   ->CompactFiles(CompactionOptions(), handles_[1],
                                  compaction_input_file_names, 1)
                   .ok());

  // expect ok and verify the compacted files no longer exist.
  ASSERT_OK(dbfull()->CompactFiles(
      CompactionOptions(), handles_[1],
      compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  VerifyCompactionResult(
      cf_meta,
      std::set<std::string>(compaction_input_file_names.begin(),
                            compaction_input_file_names.end()));

  compaction_input_file_names.clear();

  // Pick the first and the last file, expect everything is
  // compacted into one single file.
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[0].name);
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[
          cf_meta.levels[0].files.size() - 1].name);
  ASSERT_OK(dbfull()->CompactFiles(
      CompactionOptions(), handles_[1],
      compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.num_levels = 7;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Generate 3 overlapping files
  Random rnd(301);
  for (int i = 0; i < 210; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  for (int i = 200; i < 300; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  for (int i = 250; i < 260; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("3", FilesPerLevel(0));
  // Compact all files into 1 file and put it in L4
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 4;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  db_->CompactRange(compact_options, nullptr, nullptr);
  ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
}

#ifndef ROCKSDB_VALGRIND_RUN
class DBTestUniversalCompactionMultiLevels
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionMultiLevels() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_multi_levels_test") {}
};

TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 3;
  options.target_file_size_base = 32 * 1024;
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 100000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }

  dbfull()->TEST_WaitForCompact();

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

// Tests universal compaction with trivial move enabled
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
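  // With allow_trivial_move, a universal compaction whose input files don't
  // overlap and need no merging can simply move the files to the output
  // level instead of rewriting them; the TrivialMove/NonTrivial sync points
  // above count how often each path is taken.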
  options.num_levels = 3;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 2;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 150000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels,
                        DBTestUniversalCompactionMultiLevels,
                        ::testing::Combine(::testing::Values(3, 20),
                                           ::testing::Bool()));

class DBTestUniversalCompactionParallel :
    public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionParallel() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_parallel_test") {}
};

TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 << 10;  // 1KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.max_background_flushes = 3;
  options.target_file_size_base = 1 * 1024;
  options.compaction_options_universal.max_size_amplification_percent = 110;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Delay every compaction so multiple compactions will happen.
  std::atomic<int> num_compactions_running(0);
  std::atomic<bool> has_parallel(false);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* /*arg*/) {
        if (num_compactions_running.fetch_add(1) > 0) {
          has_parallel.store(true);
          return;
        }
        for (int nwait = 0; nwait < 20000; nwait++) {
          if (has_parallel.load() || num_compactions_running.load() > 1) {
            has_parallel.store(true);
            break;
          }
          env_->SleepForMicroseconds(1000);
        }
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():End",
      [&](void* /*arg*/) { num_compactions_running.fetch_add(-1); });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 30000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }
  dbfull()->TEST_WaitForCompact();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(num_compactions_running.load(), 0);
  ASSERT_TRUE(has_parallel.load());

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }

  // Reopen and check.
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 * 1024;  // 1KB
  options.level0_file_num_compaction_trigger = 7;
  options.max_background_compactions = 2;
  options.target_file_size_base = 1024 * 1024;  // 1MB

  // Disable size amplification compaction
  options.compaction_options_universal.max_size_amplification_percent =
      UINT_MAX;
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
        "BackgroundCallCompaction:0"},
       {"UniversalCompactionPicker::PickCompaction:Return",
        "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
       {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
        "CompactionJob::Run():Start"}});

  int total_picked_compactions = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction
  int key_idx = 1;
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  // Write 3 files while 1st compaction is held
  // These 3 files have different sizes to avoid compacting based on size_ratio
  int num_keys = 1000;
  for (int i = 0; i < 3; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Hold the 1st compaction from finishing
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  dbfull()->TEST_WaitForCompact();

  // There should only be one picked compaction as the score drops below one
  // after the first one is picked.
  EXPECT_EQ(total_picked_compactions, 1);
  EXPECT_EQ(TotalTableFiles(), 4);

  // Stop SyncPoint and destroy the DB and reopen it again
  rocksdb::SyncPoint::GetInstance()->ClearTrace();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  key_idx = 1;
  total_picked_compactions = 0;
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  // Write 8 files while 1st compaction is held
  // These 8 files have different sizes to avoid compacting based on size_ratio
  num_keys = 1000;
  for (int i = 0; i < 8; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Wait for the 2nd background compaction process to start
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");

  // Hold the 1st and 2nd compaction from finishing
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  dbfull()->TEST_WaitForCompact();

  // This time we will trigger a compaction because of size ratio and
  // another compaction because the number of files that are not yet
  // compacted is greater than 7.
  EXPECT_GE(total_picked_compactions, 2);
}

INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel,
                        DBTestUniversalCompactionParallel,
                        ::testing::Combine(::testing::Values(1, 10),
                                           ::testing::Values(false)));
#endif  // ROCKSDB_VALGRIND_RUN

TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = -1;
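  // compression_size_percent = -1 (the default) means compaction output is
  // always compressed according to options.compression, rather than leaving
  // the newest portion of the data uncompressed.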
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);

    if (num < options.level0_file_num_compaction_trigger - 1) {
      ASSERT_EQ(NumSortedRuns(1), num + 1);
    }
  }

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumSortedRuns(1), 1);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.stop_style =
      kCompactionStopStyleSimilarSize;
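  // kCompactionStopStyleSimilarSize extends a candidate compaction only
  // while each next run's size stays within size_ratio of the previous
  // run's size, instead of comparing against the accumulated total
  // (kCompactionStopStyleTotalSize, the default). Dissimilar neighbors
  // such as (1, 0.4) therefore stop the scan.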
  options.num_levels = num_levels_;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // Stage 1:
  // Generate a set of files at level 0, but don't trigger level-0
  // compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumSortedRuns(), num + 1);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Suppose each file flushed from mem table has size 1. Now we compact
  // level0_file_num_compaction_trigger = 4 files and should have a big
  // file of size 4.
  ASSERT_EQ(NumSortedRuns(), 1);

  // Stage 2:
  // Now we have one file at level 0, with size 4. We also have some data in
  // mem table. Let's continue generating new files at level 0, but don't
  // trigger level-0 compaction.
  // First, clean up memtable before inserting new data. This will generate
  // a level-0 file, with size around 0.4 (according to previously written
  // data amount).
  dbfull()->Flush(FlushOptions());
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumSortedRuns(), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
  // After compaction, we should have 3 files, with size 4, 0.4, 2: the two
  // similar-size files of size 1 merge, but 0.4 stops the scan.
  ASSERT_EQ(NumSortedRuns(), 3);
  // Stage 3:
  // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
  // more file at level-0, which should trigger level-0 compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(), 4);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) {
  if (!Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 70;
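  // With compression_size_percent = 70, universal compaction aims to keep
  // roughly the oldest 70% of the data compressed while the newest ~30%
  // stays uncompressed; whether a given compaction's output is compressed
  // depends on where its data falls in that split.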
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first compaction (2) is compressed.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);

  // The second compaction (4) is compressed
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);

  // The third compaction (2 4) is also compressed, since this time the runs
  // are (1 1 3.2) and 3.2/5.2 doesn't reach the ratio.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);

  // By the time the compactions build up to (2 4 8), the output of the
  // latest compactions is no longer compressed.
  for (int num = 0; num < 8; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 95;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // When the compactions build up to (2 4 8), the output of the latest
  // compactions is compressed as well, given the 95% compression ratio.
  for (int num = 0; num < 14; num++) {
    // Write 120KB (12 values, each 10K)
    for (int i = 0; i < 12; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2);
}

#ifndef ROCKSDB_VALGRIND_RUN
// Test that checks trivial move in universal compaction
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 2;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 1;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 250000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// Test that checks trivial move in universal compaction
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest2) {
  int32_t trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 15;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 2;
  options.target_file_size_base = 64 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 500000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif  // ROCKSDB_VALGRIND_RUN

TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 114KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;
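  // With multiple db_paths under universal compaction, an output file is
  // placed in the first path whose size target can still accommodate it, so
  // progressively larger compaction outputs land in the later, larger paths
  // while small fresh files stay in the first one.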

  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to the second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to a 400K file in the second path
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1,1,4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCFPathUse) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 10;
  options.write_buffer_size = 111 << 10;  // 114KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;

  std::vector<Options> option_vector;
  option_vector.emplace_back(options);
  ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
  // Configure CF1 specific paths.
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 300 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 300 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 500 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_4", 1024 * 1024 * 1024);
  option_vector.emplace_back(DBOptions(options), cf_opt1);
  CreateColumnFamilies({"one"}, option_vector[1]);

  // Configure CF2 specific paths.
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 300 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 300 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 500 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_4", 1024 * 1024 * 1024);
  option_vector.emplace_back(DBOptions(options), cf_opt2);
  CreateColumnFamilies({"two"}, option_vector[2]);

  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);

  Random rnd(301);
  int key_idx = 0;
  int key_idx1 = 0;
  int key_idx2 = 0;

  auto generate_file = [&]() {
    GenerateNewFile(0, &rnd, &key_idx);
    GenerateNewFile(1, &rnd, &key_idx1);
    GenerateNewFile(2, &rnd, &key_idx2);
  };

  auto check_sstfilecount = [&](int path_id, int expected) {
    ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
    ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
    ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
  };

  auto check_getvalues = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(0, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }

    for (int i = 0; i < key_idx1; i++) {
      auto v = Get(1, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }

    for (int i = 0; i < key_idx2; i++) {
      auto v = Get(2, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };

  // First three 110KB files are not going to the second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    generate_file();
  }

  // Another 110KB triggers a compaction to a 400K file in the second path
  generate_file();
  check_sstfilecount(2, 1);

  // (1, 4)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(0, 1);

  // (1,1,4) -> (2, 4)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(1, 1);
  check_sstfilecount(0, 0);

  // (1, 2, 4) -> (3, 4)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(1, 1);
  check_sstfilecount(0, 0);

  // (1, 3, 4) -> (8)
  generate_file();
  check_sstfilecount(3, 1);

  // (1, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(0, 1);

  // (1, 1, 8) -> (2, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(1, 1);

  // (1, 2, 8) -> (3, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(1, 1);
  check_sstfilecount(0, 0);

  // (1, 3, 8) -> (4, 8)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(3, 1);

  // (1, 4, 8) -> (5, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(2, 1);
  check_sstfilecount(0, 0);

  check_getvalues();

  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);

  check_getvalues();

  Destroy(options, true);
}

1497 TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
1498 std::function<void(int)> verify_func = [&](int num_keys_in_db) {
1499 std::string keys_in_db;
1500 Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
1501 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1502 keys_in_db.append(iter->key().ToString());
1503 keys_in_db.push_back(',');
1504 }
1505 delete iter;
1506
1507 std::string expected_keys;
1508 for (int i = 0; i <= num_keys_in_db; i++) {
1509 expected_keys.append(Key(i));
1510 expected_keys.push_back(',');
1511 }
1512
1513 ASSERT_EQ(keys_in_db, expected_keys);
1514 };
1515
1516 Random rnd(301);
1517 int max_key1 = 200;
1518 int max_key2 = 600;
1519 int max_key3 = 800;
1520 const int KNumKeysPerFile = 10;
1521
1522 // Stage 1: open a DB with universal compaction, num_levels=1
1523 Options options = CurrentOptions();
1524 options.compaction_style = kCompactionStyleUniversal;
1525 options.num_levels = 1;
1526 options.write_buffer_size = 200 << 10; // 200KB
1527 options.level0_file_num_compaction_trigger = 3;
1528 options.memtable_factory.reset(new SpecialSkipListFactory(KNumKeysPerFile));
1529 options = CurrentOptions(options);
1530 CreateAndReopenWithCF({"pikachu"}, options);
1531
1532 for (int i = 0; i <= max_key1; i++) {
1533 // each value is 10K
1534 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
1535 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
1536 dbfull()->TEST_WaitForCompact();
1537 }
1538 ASSERT_OK(Flush(1));
1539 dbfull()->TEST_WaitForCompact();
1540
1541 // Stage 2: reopen with universal compaction, num_levels=4
1542 options.compaction_style = kCompactionStyleUniversal;
1543 options.num_levels = 4;
1544 options = CurrentOptions(options);
1545 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1546
1547 verify_func(max_key1);
1548
1549 // Insert more keys
1550 for (int i = max_key1 + 1; i <= max_key2; i++) {
1551 // each value is 10K
1552 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
1553 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
1554 dbfull()->TEST_WaitForCompact();
1555 }
1556 ASSERT_OK(Flush(1));
1557 dbfull()->TEST_WaitForCompact();
1558
1559 verify_func(max_key2);
1560 // Compaction to non-L0 has happened.
1561 ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0);
1562
1563 // Stage 3: Move all data back to L0, then reopen with num_levels=1.
1564 options.num_levels = 4;
1565 options.target_file_size_base = INT_MAX;
1566 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1567 // Compact all to level 0
1568 CompactRangeOptions compact_options;
1569 compact_options.change_level = true;
1570 compact_options.target_level = 0;
1571 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1572 dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
1573 // One restart is needed to drop the higher-level records from the manifest.
1574 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1575 // Final reopen
1576 options.compaction_style = kCompactionStyleUniversal;
1577 options.num_levels = 1;
1578 options = CurrentOptions(options);
1579 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1580
1581 // Insert more keys
1582 for (int i = max_key2 + 1; i <= max_key3; i++) {
1583 // each value is 10K
1584 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
1585 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
1586 dbfull()->TEST_WaitForCompact();
1587 }
1588 ASSERT_OK(Flush(1));
1589 dbfull()->TEST_WaitForCompact();
1590 verify_func(max_key3);
1591 }
1592
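// A minimal sketch of the Stage 3 recipe above, as an application might run
// it outside the test harness to shrink num_levels back to 1. The DB path
// argument, option values, and ASSERT-based error handling are illustrative
// assumptions, not part of this test suite.
inline void ExampleShrinkNumLevelsToOne(const std::string& path) {
  Options opts;
  opts.create_if_missing = true;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.num_levels = 4;  // reopen with the old, larger level count first
  DB* db = nullptr;
  ASSERT_OK(DB::Open(opts, path, &db));
  // Move every live file to L0 so no higher level holds data.
  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 0;
  ASSERT_OK(db->CompactRange(cro, nullptr, nullptr));
  delete db;
  // One extra reopen rewrites the manifest without higher-level records.
  ASSERT_OK(DB::Open(opts, path, &db));
  delete db;
  // Now the DB can safely be opened with a single level.
  opts.num_levels = 1;
  ASSERT_OK(DB::Open(opts, path, &db));
  delete db;
}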
1593
1594 TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
1595 if (!Snappy_Supported()) {
1596 return;
1597 }
1598 Options options = CurrentOptions();
1599 options.db_paths.emplace_back(dbname_, 500 * 1024);
1600 options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
1601 options.compaction_style = kCompactionStyleUniversal;
1602 options.compaction_options_universal.size_ratio = 5;
1603 options.write_buffer_size = 111 << 10; // 111KB
1604 options.arena_block_size = 4 << 10;
1605 options.level0_file_num_compaction_trigger = 2;
1606 options.num_levels = 1;
1607 options.memtable_factory.reset(
1608 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
1609
1610 std::vector<std::string> filenames;
1611 env_->GetChildren(options.db_paths[1].path, &filenames);
1612 // Clean up leftover files in the second path from previous test runs.
1613 for (size_t i = 0; i < filenames.size(); ++i) {
1614 env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
1615 }
1616 env_->DeleteDir(options.db_paths[1].path);
1617 Reopen(options);
1618
1619 Random rnd(301);
1620 int key_idx = 0;
1621
1622 // The first three ~110KB files do not go to the second path.
1623 // After that, (100K, 200K)
1624 for (int num = 0; num < 3; num++) {
1625 GenerateNewFile(&rnd, &key_idx);
1626 }
1627
1628 // Another ~110KB file triggers a compaction; its ~400K output goes to the second path
1629 GenerateNewFile(&rnd, &key_idx);
1630 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1631
1632 // (1, 4)
1633 GenerateNewFile(&rnd, &key_idx);
1634 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1635 ASSERT_EQ(1, GetSstFileCount(dbname_));
1636
1637 // (1,1,4) -> (2, 4)
1638 GenerateNewFile(&rnd, &key_idx);
1639 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1640 ASSERT_EQ(1, GetSstFileCount(dbname_));
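
// Why only the two newest runs merged: with size_ratio = 5 the picker keeps
// extending the candidate set while the next (older) run is at most ~105% of
// the set's accumulated size; 1 <= 1 * 1.05 admits the second run, but
// 4 > 2 * 1.05 excludes the 4-unit run. (A rough sketch of the rule, not a
// quote of the picker.)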
1641
1642 // (1, 2, 4) -> (3, 4)
1643 GenerateNewFile(&rnd, &key_idx);
1644 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
1645 ASSERT_EQ(0, GetSstFileCount(dbname_));
1646
1647 // (1, 3, 4) -> (8)
1648 GenerateNewFile(&rnd, &key_idx);
1649 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1650 ASSERT_EQ(0, GetSstFileCount(dbname_));
1651
1652 // (1, 8)
1653 GenerateNewFile(&rnd, &key_idx);
1654 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1655 ASSERT_EQ(1, GetSstFileCount(dbname_));
1656
1657 // (1, 1, 8) -> (2, 8)
1658 GenerateNewFile(&rnd, &key_idx);
1659 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1660 ASSERT_EQ(1, GetSstFileCount(dbname_));
1661
1662 // (1, 2, 8) -> (3, 8)
1663 GenerateNewFile(&rnd, &key_idx);
1664 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
1665 ASSERT_EQ(0, GetSstFileCount(dbname_));
1666
1667 // (1, 3, 8) -> (4, 8)
1668 GenerateNewFile(&rnd, &key_idx);
1669 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
1670 ASSERT_EQ(0, GetSstFileCount(dbname_));
1671
1672 // (1, 4, 8) -> (5, 8)
1673 GenerateNewFile(&rnd, &key_idx);
1674 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
1675 ASSERT_EQ(0, GetSstFileCount(dbname_));
1676
1677 for (int i = 0; i < key_idx; i++) {
1678 auto v = Get(Key(i));
1679 ASSERT_NE(v, "NOT_FOUND");
1680 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1681 }
1682
1683 Reopen(options);
1684
1685 for (int i = 0; i < key_idx; i++) {
1686 auto v = Get(Key(i));
1687 ASSERT_NE(v, "NOT_FOUND");
1688 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1689 }
1690
1691 Destroy(options);
1692 }
1693
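// A hedged sketch of the two-path layout the test above exercises, as an
// application might configure it; the path names and size budgets are
// illustrative assumptions. Roughly, compaction outputs land in the first
// path whose size budget still accommodates them, so large merged files
// spill to the second path.
inline Options ExampleTwoPathUniversalOptions(const std::string& fast_path,
                                              const std::string& big_path) {
  Options opts;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.db_paths.emplace_back(fast_path, 512 * 1024);            // 512KB budget
  opts.db_paths.emplace_back(big_path, 1024ull * 1024 * 1024);  // 1GB budget
  return opts;
}
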
1694 TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
1695 if (num_levels_ == 1) {
1696 // for single-level universal, everything's bottom level so nothing should
1697 // be executed in bottom-pri thread pool.
1698 return;
1699 }
1700 const int kNumFilesTrigger = 3;
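// Reserve one thread in the bottom-priority pool; compactions whose output
// is the bottommost level can be forwarded there (see the sync point below)
// so they do not block shorter low-priority compactions.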
1701 Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
1702 Options options = CurrentOptions();
1703 options.compaction_style = kCompactionStyleUniversal;
1704 options.num_levels = num_levels_;
1705 options.write_buffer_size = 100 << 10; // 100KB
1706 options.target_file_size_base = 32 << 10; // 32KB
1707 options.level0_file_num_compaction_trigger = kNumFilesTrigger;
1708 // Trigger compaction if size amplification exceeds 110%
1709 options.compaction_options_universal.max_size_amplification_percent = 110;
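// Rough arithmetic: size amplification is estimated as (total size of all
// newer sorted runs) / (size of the oldest run), so e.g. three ~100KB runs
// on top of a 200KB base give 300/200 = 150% > 110% and trigger a full
// compaction.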
1710 DestroyAndReopen(options);
1711
1712 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1713 {// wait for the full compaction to be picked before adding files intended
1714 // for the second one.
1715 {"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
1716 "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
1717 // the full (bottom-pri) compaction waits until a partial (low-pri)
1718 // compaction has started to verify they can run in parallel.
1719 {"DBImpl::BackgroundCompaction:NonTrivial",
1720 "DBImpl::BGWorkBottomCompaction"}});
1721 SyncPoint::GetInstance()->EnableProcessing();
1722
1723 Random rnd(301);
1724 for (int i = 0; i < 2; ++i) {
1725 for (int num = 0; num < kNumFilesTrigger; num++) {
1726 int key_idx = 0;
1727 GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
1728 // Pass no_wait=true above because GenerateNewFile otherwise waits for both
1729 // flush and compaction; we must not wait for compaction here because the
1730 // full compaction is intentionally blocked while more files are flushed.
1731 dbfull()->TEST_WaitForFlushMemTable();
1732 }
1733 if (i == 0) {
1734 TEST_SYNC_POINT(
1735 "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
1736 }
1737 }
1738 dbfull()->TEST_WaitForCompact();
1739
1740 // First compaction should output to bottom level. Second should output to L0
1741 // since older L0 files pending compaction prevent it from being placed lower.
1742 ASSERT_EQ(NumSortedRuns(), 2);
1743 ASSERT_GT(NumTableFilesAtLevel(0), 0);
1744 ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
1745 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1746 Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
1747 }
1748
1749 TEST_P(DBTestUniversalCompaction, RecalculateScoreAfterPicking) {
1750 // Regression test for extra compactions scheduled. Once enough compactions
1751 // have been scheduled to bring the score below one, we should stop
1752 // scheduling more; otherwise, other CFs/DBs may be delayed unnecessarily.
1753 const int kNumFilesTrigger = 8;
1754 Options options = CurrentOptions();
1755 options.compaction_options_universal.max_merge_width = kNumFilesTrigger / 2;
1756 options.compaction_options_universal.max_size_amplification_percent =
1757 static_cast<unsigned int>(-1);
1758 options.compaction_style = kCompactionStyleUniversal;
1759 options.level0_file_num_compaction_trigger = kNumFilesTrigger;
1760 options.num_levels = num_levels_;
1761 options.write_buffer_size = 100 << 10; // 100KB
1762 Reopen(options);
1763
1764 std::atomic<int> num_compactions_attempted(0);
1765 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1766 "DBImpl::BackgroundCompaction:Start", [&](void* /*arg*/) {
1767 ++num_compactions_attempted;
1768 });
1769 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1770
1771 Random rnd(301);
1772 for (int num = 0; num < kNumFilesTrigger; num++) {
1773 ASSERT_EQ(NumSortedRuns(), num);
1774 int key_idx = 0;
1775 GenerateNewFile(&rnd, &key_idx);
1776 }
1777 dbfull()->TEST_WaitForCompact();
1778 // Compacting the first four files was enough to bring the score below one so
1779 // there's no need to schedule any more compactions.
1780 ASSERT_EQ(1, num_compactions_attempted);
1781 ASSERT_EQ(NumSortedRuns(), 5);
1782 }
1783
1784 TEST_P(DBTestUniversalCompaction, FinalSortedRunCompactFilesConflict) {
1785 // Regression test for conflict between:
1786 // (1) Running CompactFiles including file in the final sorted run; and
1787 // (2) Picking universal size-amp-triggered compaction, which always includes
1788 // the final sorted run.
1789 if (exclusive_manual_compaction_) {
1790 return;
1791 }
1792
1793 Options opts = CurrentOptions();
1794 opts.compaction_style = kCompactionStyleUniversal;
1795 opts.compaction_options_universal.max_size_amplification_percent = 50;
1796 opts.compaction_options_universal.min_merge_width = 2;
1797 opts.compression = kNoCompression;
1798 opts.level0_file_num_compaction_trigger = 2;
1799 opts.max_background_compactions = 2;
1800 opts.num_levels = num_levels_;
1801 Reopen(opts);
1802
1803 // make sure compaction jobs can be parallelized
1804 auto stop_token =
1805 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1806
1807 Put("key", "val");
1808 Flush();
1809 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1810 ASSERT_EQ(NumTableFilesAtLevel(num_levels_ - 1), 1);
1811 ColumnFamilyMetaData cf_meta;
1812 ColumnFamilyHandle* default_cfh = db_->DefaultColumnFamily();
1813 dbfull()->GetColumnFamilyMetaData(default_cfh, &cf_meta);
1814 ASSERT_EQ(1, cf_meta.levels[num_levels_ - 1].files.size());
1815 std::string first_sst_filename =
1816 cf_meta.levels[num_levels_ - 1].files[0].name;
1817
1818 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1819 {{"CompactFilesImpl:0",
1820 "DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0"},
1821 {"DBImpl::BackgroundCompaction():AfterPickCompaction",
1822 "CompactFilesImpl:1"}});
1823 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1824
1825 port::Thread compact_files_thread([&]() {
1826 ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), default_cfh,
1827 {first_sst_filename}, num_levels_ - 1));
1828 });
1829
1830 TEST_SYNC_POINT(
1831 "DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0");
1832 for (int i = 0; i < 2; ++i) {
1833 Put("key", "val");
1834 Flush();
1835 }
1836 dbfull()->TEST_WaitForCompact();
1837
1838 compact_files_thread.join();
1839 }
1840
1841 INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
1842 ::testing::Combine(::testing::Values(1, 3, 5),
1843 ::testing::Bool()));
1844
1845 class DBTestUniversalManualCompactionOutputPathId
1846 : public DBTestUniversalCompactionBase {
1847 public:
1848 DBTestUniversalManualCompactionOutputPathId() :
1849 DBTestUniversalCompactionBase(
1850 "/db_universal_compaction_manual_pid_test") {}
1851 };
1852
1853 TEST_P(DBTestUniversalManualCompactionOutputPathId,
1854 ManualCompactionOutputPathId) {
1855 Options options = CurrentOptions();
1856 options.create_if_missing = true;
1857 options.db_paths.emplace_back(dbname_, 1000000000);
1858 options.db_paths.emplace_back(dbname_ + "_2", 1000000000);
1859 options.compaction_style = kCompactionStyleUniversal;
1860 options.num_levels = num_levels_;
1861 options.target_file_size_base = 1 << 30; // Big size
1862 options.level0_file_num_compaction_trigger = 10;
1863 Destroy(options);
1864 DestroyAndReopen(options);
1865 CreateAndReopenWithCF({"pikachu"}, options);
1866 MakeTables(3, "p", "q", 1);
1867 dbfull()->TEST_WaitForCompact();
1868 ASSERT_EQ(2, TotalLiveFiles(1));
1869 ASSERT_EQ(2, GetSstFileCount(options.db_paths[0].path));
1870 ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
1871
1872 // Full compaction with output directed to DB path 1
1873 CompactRangeOptions compact_options;
1874 compact_options.target_path_id = 1;
1875 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1876 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
1877 ASSERT_EQ(1, TotalLiveFiles(1));
1878 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
1879 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1880
1881 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
1882 ASSERT_EQ(1, TotalLiveFiles(1));
1883 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
1884 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1885
1886 MakeTables(1, "p", "q", 1);
1887 ASSERT_EQ(2, TotalLiveFiles(1));
1888 ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
1889 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1890
1891 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
1892 ASSERT_EQ(2, TotalLiveFiles(1));
1893 ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
1894 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1895
1896 // Full compaction to DB path 0
1897 compact_options.target_path_id = 0;
1898 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1899 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
1900 ASSERT_EQ(1, TotalLiveFiles(1));
1901 ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
1902 ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
1903
1904 // Fail when compacting to an invalid path ID
1905 compact_options.target_path_id = 2;
1906 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1907 ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
1908 .IsInvalidArgument());
1909 }
1910
1911 INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId,
1912 DBTestUniversalManualCompactionOutputPathId,
1913 ::testing::Combine(::testing::Values(1, 8),
1914 ::testing::Bool()));
1915
1916 TEST_F(DBTestUniversalDeleteTrigCompaction, BasicL0toL1) {
1917 const int kNumKeys = 3000;
1918 const int kWindowSize = 100;
1919 const int kNumDelsTrigger = 90;
1920
1921 Options opts = CurrentOptions();
1922 opts.table_properties_collector_factories.emplace_back(
1923 NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
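// The collector flags an SST file for compaction when any window of
// kWindowSize consecutive entries in it contains at least kNumDelsTrigger
// deletions, so deletion-heavy files are compacted ahead of the usual
// universal triggers.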
1924 opts.compaction_style = kCompactionStyleUniversal;
1925 opts.level0_file_num_compaction_trigger = 2;
1926 opts.compression = kNoCompression;
1927 opts.compaction_options_universal.size_ratio = 10;
1928 opts.compaction_options_universal.min_merge_width = 2;
1929 opts.compaction_options_universal.max_size_amplification_percent = 200;
1930 Reopen(opts);
1931
1932 // add an L1 file to prevent tombstones from being dropped due to
1933 // obsolescence during flush
1934 int i;
1935 for (i = 0; i < 2000; ++i) {
1936 Put(Key(i), "val");
1937 }
1938 Flush();
1939 // MoveFilesToLevel(6);
1940 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1941
1942 for (i = 1999; i < kNumKeys; ++i) {
1943 if (i >= kNumKeys - kWindowSize &&
1944 i < kNumKeys - kWindowSize + kNumDelsTrigger) {
1945 Delete(Key(i));
1946 } else {
1947 Put(Key(i), "val");
1948 }
1949 }
1950 Flush();
1951
1952 dbfull()->TEST_WaitForCompact();
1953 ASSERT_EQ(0, NumTableFilesAtLevel(0));
1954 ASSERT_GT(NumTableFilesAtLevel(6), 0);
1955 }
1956
1957 TEST_F(DBTestUniversalDeleteTrigCompaction, SingleLevel) {
1958 const int kNumKeys = 3000;
1959 const int kWindowSize = 100;
1960 const int kNumDelsTrigger = 90;
1961
1962 Options opts = CurrentOptions();
1963 opts.table_properties_collector_factories.emplace_back(
1964 NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
1965 opts.compaction_style = kCompactionStyleUniversal;
1966 opts.level0_file_num_compaction_trigger = 2;
1967 opts.compression = kNoCompression;
1968 opts.num_levels = 1;
1969 opts.compaction_options_universal.size_ratio = 10;
1970 opts.compaction_options_universal.min_merge_width = 2;
1971 opts.compaction_options_universal.max_size_amplification_percent = 200;
1972 Reopen(opts);
1973
1974 // add an initial file (there is no L1 with num_levels=1) to prevent
1975 // tombstones from being dropped due to obsolescence during flush
1976 int i;
1977 for (i = 0; i < 2000; ++i) {
1978 Put(Key(i), "val");
1979 }
1980 Flush();
1981
1982 for (i = 1999; i < kNumKeys; ++i) {
1983 if (i >= kNumKeys - kWindowSize &&
1984 i < kNumKeys - kWindowSize + kNumDelsTrigger) {
1985 Delete(Key(i));
1986 } else {
1987 Put(Key(i), "val");
1988 }
1989 }
1990 Flush();
1991
1992 dbfull()->TEST_WaitForCompact();
1993 ASSERT_EQ(1, NumTableFilesAtLevel(0));
1994 }
1995
1996 TEST_F(DBTestUniversalDeleteTrigCompaction, MultipleLevels) {
1997 const int kWindowSize = 100;
1998 const int kNumDelsTrigger = 90;
1999
2000 Options opts = CurrentOptions();
2001 opts.table_properties_collector_factories.emplace_back(
2002 NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
2003 opts.compaction_style = kCompactionStyleUniversal;
2004 opts.level0_file_num_compaction_trigger = 4;
2005 opts.compression = kNoCompression;
2006 opts.compaction_options_universal.size_ratio = 10;
2007 opts.compaction_options_universal.min_merge_width = 2;
2008 opts.compaction_options_universal.max_size_amplification_percent = 200;
2009 Reopen(opts);
2010
2011 // add an L1 file to prevent tombstones from being dropped due to
2012 // obsolescence during flush
2013 int i;
2014 for (i = 0; i < 500; ++i) {
2015 Put(Key(i), "val");
2016 }
2017 Flush();
2018 for (i = 500; i < 1000; ++i) {
2019 Put(Key(i), "val");
2020 }
2021 Flush();
2022 for (i = 1000; i < 1500; ++i) {
2023 Put(Key(i), "val");
2024 }
2025 Flush();
2026 for (i = 1500; i < 2000; ++i) {
2027 Put(Key(i), "val");
2028 }
2029 Flush();
2030
2031 dbfull()->TEST_WaitForCompact();
2032 ASSERT_EQ(0, NumTableFilesAtLevel(0));
2033 ASSERT_GT(NumTableFilesAtLevel(6), 0);
2034
2035 for (i = 1999; i < 2333; ++i) {
2036 Put(Key(i), "val");
2037 }
2038 Flush();
2039 for (i = 2333; i < 2666; ++i) {
2040 Put(Key(i), "val");
2041 }
2042 Flush();
2043 for (i = 2666; i < 2999; ++i) {
2044 Put(Key(i), "val");
2045 }
2046 Flush();
2047
2048 dbfull()->TEST_WaitForCompact();
2049 ASSERT_EQ(0, NumTableFilesAtLevel(0));
2050 ASSERT_GT(NumTableFilesAtLevel(6), 0);
2051 ASSERT_GT(NumTableFilesAtLevel(5), 0);
2052
2053 for (i = 1900; i < 2100; ++i) {
2054 Delete(Key(i));
2055 }
2056 Flush();
2057
2058 dbfull()->TEST_WaitForCompact();
2059 ASSERT_EQ(0, NumTableFilesAtLevel(0));
2060 ASSERT_EQ(0, NumTableFilesAtLevel(1));
2061 ASSERT_EQ(0, NumTableFilesAtLevel(2));
2062 ASSERT_EQ(0, NumTableFilesAtLevel(3));
2063 ASSERT_EQ(0, NumTableFilesAtLevel(4));
2064 ASSERT_EQ(0, NumTableFilesAtLevel(5));
2065 ASSERT_GT(NumTableFilesAtLevel(6), 0);
2066 }
2067
2068 TEST_F(DBTestUniversalDeleteTrigCompaction, OverlappingL0) {
2069 const int kWindowSize = 100;
2070 const int kNumDelsTrigger = 90;
2071
2072 Options opts = CurrentOptions();
2073 opts.table_properties_collector_factories.emplace_back(
2074 NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
2075 opts.compaction_style = kCompactionStyleUniversal;
2076 opts.level0_file_num_compaction_trigger = 5;
2077 opts.compression = kNoCompression;
2078 opts.compaction_options_universal.size_ratio = 10;
2079 opts.compaction_options_universal.min_merge_width = 2;
2080 opts.compaction_options_universal.max_size_amplification_percent = 200;
2081 Reopen(opts);
2082
2083 // add an L1 file to prevent tombstones from being dropped due to
2084 // obsolescence during flush
2085 int i;
2086 for (i = 0; i < 2000; ++i) {
2087 Put(Key(i), "val");
2088 }
2089 Flush();
2090 for (i = 2000; i < 3000; ++i) {
2091 Put(Key(i), "val");
2092 }
2093 Flush();
2094 for (i = 3500; i < 4000; ++i) {
2095 Put(Key(i), "val");
2096 }
2097 Flush();
2098 for (i = 2900; i < 3100; ++i) {
2099 Delete(Key(i));
2100 }
2101 Flush();
2102
2103 dbfull()->TEST_WaitForCompact();
2104 ASSERT_EQ(2, NumTableFilesAtLevel(0));
2105 ASSERT_GT(NumTableFilesAtLevel(6), 0);
2106 }
2107
2108 TEST_F(DBTestUniversalDeleteTrigCompaction, IngestBehind) {
2109 const int kNumKeys = 3000;
2110 const int kWindowSize = 100;
2111 const int kNumDelsTrigger = 90;
2112
2113 Options opts = CurrentOptions();
2114 opts.table_properties_collector_factories.emplace_back(
2115 NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
2116 opts.compaction_style = kCompactionStyleUniversal;
2117 opts.level0_file_num_compaction_trigger = 2;
2118 opts.compression = kNoCompression;
2119 opts.allow_ingest_behind = true;
2120 opts.compaction_options_universal.size_ratio = 10;
2121 opts.compaction_options_universal.min_merge_width = 2;
2122 opts.compaction_options_universal.max_size_amplification_percent = 200;
2123 Reopen(opts);
2124
2125 // add an L1 file to prevent tombstones from being dropped due to
2126 // obsolescence during flush
2127 int i;
2128 for (i = 0; i < 2000; ++i) {
2129 Put(Key(i), "val");
2130 }
2131 Flush();
2132 // MoveFilesToLevel(6);
2133 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
2134
2135 for (i = 1999; i < kNumKeys; ++i) {
2136 if (i >= kNumKeys - kWindowSize &&
2137 i < kNumKeys - kWindowSize + kNumDelsTrigger) {
2138 Delete(Key(i));
2139 } else {
2140 Put(Key(i), "val");
2141 }
2142 }
2143 Flush();
2144
2145 dbfull()->TEST_WaitForCompact();
2146 ASSERT_EQ(0, NumTableFilesAtLevel(0));
2147 ASSERT_EQ(0, NumTableFilesAtLevel(6));
2148 ASSERT_GT(NumTableFilesAtLevel(5), 0);
2149 }
2150
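// A hedged sketch of the ingest-behind flow exercised above: with
// Options::allow_ingest_behind=true at open time, an externally built SST
// can be placed beneath all existing data in the reserved bottommost level.
// The SST path is an illustrative assumption (the file would be prepared
// with SstFileWriter beforehand).
inline void ExampleIngestBehind(DB* db) {
  IngestExternalFileOptions ifo;
  ifo.ingest_behind = true;  // requires allow_ingest_behind at DB open
  ASSERT_OK(db->IngestExternalFile({"/tmp/bulk_load.sst"}, ifo));
}
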
2151 } // namespace rocksdb
2152
2153 #endif // !defined(ROCKSDB_LITE)
2154
2155 int main(int argc, char** argv) {
2156 #if !defined(ROCKSDB_LITE)
2157 rocksdb::port::InstallStackTraceHandler();
2158 ::testing::InitGoogleTest(&argc, argv);
2159 return RUN_ALL_TESTS();
2160 #else
2161 (void) argc;
2162 (void) argv;
2163 return 0;
2164 #endif
2165 }