// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#if !defined(ROCKSDB_LITE)
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/sync_point.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

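// Helper: returns a len-byte value that should compress to roughly 80% of
// its original size (see test::CompressibleString); used by the
// compression-ratio tests below.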
static std::string CompressibleString(Random* rnd, int len) {
  std::string r;
  test::CompressibleString(rnd, 0.8, len, &r);
  return r;
}

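// The parameter tuple is (num_levels, exclusive_manual_compaction): tests in
// this fixture run across several num_levels settings and with manual
// compactions both exclusive and non-exclusive.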
class DBTestUniversalCompactionBase
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<int, bool>> {
 public:
  explicit DBTestUniversalCompactionBase(const std::string& path)
      : DBTestBase(path, /*env_do_fsync=*/false) {}
  void SetUp() override {
    num_levels_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }
  int num_levels_;
  bool exclusive_manual_compaction_;
};

class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompaction()
      : DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};

class DBTestUniversalCompaction2 : public DBTestBase {
 public:
  DBTestUniversalCompaction2()
      : DBTestBase("/db_universal_compaction_test2", /*env_do_fsync=*/false) {}
};

namespace {
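// Asserts that none of the listed input files survived the compaction. The
// check relies on assert(), so it is only effective in debug (!NDEBUG)
// builds.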
void VerifyCompactionResult(
    const ColumnFamilyMetaData& cf_meta,
    const std::set<std::string>& overlapping_file_numbers) {
#ifndef NDEBUG
  for (auto& level : cf_meta.levels) {
    for (auto& file : level.files) {
      assert(overlapping_file_numbers.find(file.name) ==
             overlapping_file_numbers.end());
    }
  }
#endif
}

class KeepFilter : public CompactionFilter {
 public:
  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return false;
  }

  const char* Name() const override { return "KeepFilter"; }
};

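// Hands out KeepFilter instances (a filter that keeps every key/value pair).
// With check_context == true it also verifies that the
// CompactionFilter::Context flags (is_full_compaction / is_manual_compaction)
// match what the test expects.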
class KeepFilterFactory : public CompactionFilterFactory {
 public:
  explicit KeepFilterFactory(bool check_context = false)
      : check_context_(check_context) {}

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  const char* Name() const override { return "KeepFilterFactory"; }
  bool check_context_;
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
};
}  // namespace

// Make sure nothing goes wrong if the compaction trigger condition is given
// as 0, which is invalid.
TEST_P(DBTestUniversalCompaction, UniversalCompactionSingleSortedRun) {
  Options options = CurrentOptions();

  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  // Config universal compaction to always compact to one single sorted run.
  options.level0_file_num_compaction_trigger = 0;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.min_merge_width = 2;
  options.compaction_options_universal.max_size_amplification_percent = 0;

  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  DestroyAndReopen(options);
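  // The invalid trigger of 0 is sanitized to a minimum of 1 when the DB is
  // opened; the assertion below verifies the sanitized value.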
  ASSERT_EQ(1, db_->GetOptions().level0_file_num_compaction_trigger);

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);

  for (int num = 0; num < 16; num++) {
    // Write a 100KB file. It should immediately be compacted into a single
    // file.
    GenerateNewFile(&rnd, &key_idx);
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumSortedRuns(0), 1);
  }
  ASSERT_OK(Put(Key(key_idx), ""));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(NumSortedRuns(0), 1);
}

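// With options.optimize_filters_for_hits set, bloom-filter checks are skipped
// for the bottommost data; in this L0-only layout that is the oldest L0 file,
// which is why a miss below is expected to register
// NumTableFilesAtLevel(0) - 1 filter hits.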
TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  BlockBasedTableOptions bbto;
  bbto.cache_index_and_filter_blocks = true;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.optimize_filters_for_hits = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.memtable_factory.reset(new SpecialSkipListFactory(3));

  DestroyAndReopen(options);

  // block compaction from happening
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    ASSERT_OK(Put(Key(num * 10), "val"));
    if (num) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(Put(Key(30 + num * 10), "val"));
    ASSERT_OK(Put(Key(60 + num * 10), "val"));
  }
  ASSERT_OK(Put("", ""));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // Query a set of non-existing keys.
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }

  // Make sure the bloom filter is useful at least once.
  ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
  auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Make sure the bloom filter is used for all but the last L0 file when
  // looking up a non-existent key that's in the range of all L0 files.
  ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
  ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1,
            TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
  prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Unblock compaction and wait for it to finish.
  sleeping_task_low.WakeUp();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // The same queries will no longer hit the bloom filter, since everything
  // is now in the bottommost sorted run.
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }
  ASSERT_EQ(prev_counter, TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
}

// TODO(kailiu) The tests on UniversalCompaction have some issues:
//  1. A lot of magic numbers ("11" or "12").
//  2. They make assumptions about memtable flush conditions, which may change
//     from time to time.
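// For reference, universal compaction's size-ratio (read-amp) condition works
// roughly as follows: starting from the newest sorted run, a neighboring
// older run is added to the compaction candidate as long as its size is
// within (100 + size_ratio)% of the combined size of the runs already
// selected. The stage-by-stage sorted-run counts asserted below follow from
// that rule.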
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) {
  Options options;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  options = CurrentOptions(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBTestWritableFile.GetPreallocationStatus", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        size_t preallocation_size = *(static_cast<size_t*>(arg));
        if (num_levels_ > 3) {
          ASSERT_LE(preallocation_size, options.target_file_size_base * 1.1);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);
  // Stage 1:
  //   Generate a set of files at level 0, but don't trigger level-0
  //   compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB
    GenerateNewFile(1, &rnd, &key_idx);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Suppose each file flushed from the memtable has size 1. Now we compact
  // level0_file_num_compaction_trigger = 4 files and should end up with one
  // big file of size 4.
  ASSERT_EQ(NumSortedRuns(1), 1);

  // Stage 2:
  //   Now we have one file at level 0, with size 4. We also have some data in
  //   the memtable. Let's continue generating new files at level 0, but don't
  //   trigger level-0 compaction.
  //   First, clean up the memtable before inserting new data. This will
  //   generate a level-0 file, with size around 0.4 (according to the
  //   previously written data amount).
  filter->expect_full_compaction_.store(false);
  ASSERT_OK(Flush(1));
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with sizes 4, 0.4, 1, 1.
  // After compaction, we should have 2 files, with sizes 4 and 2.4.
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Stage 3:
  //   Now we have 2 files at level 0, with sizes 4 and 2.4. Continue
  //   generating new files at level 0.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with sizes 4, 2.4, 1, 1.
  // After compaction, we should have 3 files, with sizes 4, 2.4, 2.
  ASSERT_EQ(NumSortedRuns(1), 3);

  // Stage 4:
  //   Now we have 3 files at level 0, with sizes 4, 2.4, 2. Let's generate a
  //   new file of size 1.
  GenerateNewFile(1, &rnd, &key_idx);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(1), 4);

  // Stage 5:
  //   Now we have 4 files at level 0, with sizes 4, 2.4, 2, 1. Let's generate
  //   a new file of size 1.
  filter->expect_full_compaction_.store(true);
  GenerateNewFile(1, &rnd, &key_idx);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // All files at level 0 will be compacted into a single one.
  ASSERT_EQ(NumSortedRuns(1), 1);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

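// For reference, universal compaction estimates size amplification roughly as
// (combined size of all sorted runs except the oldest) / (size of the oldest
// run) * 100, and triggers a full compaction when the estimate exceeds
// max_size_amplification_percent.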
TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate two files in level 0. Both files are approximately the same
  // size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in the memtable. This is typically small,
  // which should not trigger size-ratio-based compaction but will instead
  // trigger size-amplification compaction.
  ASSERT_OK(Flush(1));

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Verify that the size-amplification compaction did occur.
  ASSERT_EQ(NumSortedRuns(1), 1);
}

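// compaction_options_universal is mutable: the next two tests change it at
// runtime through DB::SetOptions(), using the nested-option syntax
// "{name=value;...}", and verify that the new values take effect without
// reopening the DB.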
TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  // The initial setup of compaction_options_universal will prevent universal
  // compaction from happening.
  options.compaction_options_universal.size_ratio = 100;
  options.compaction_options_universal.min_merge_width = 100;
  DestroyAndReopen(options);

  int total_picked_compactions = 0;
  int total_size_amp_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
          Compaction* c = static_cast<Compaction*>(arg);
          if (c->compaction_reason() ==
              CompactionReason::kUniversalSizeAmplification) {
            total_size_amp_compactions++;
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate two files in level 0. Both files are approximately the same
  // size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in the memtable. This is typically small,
  // which should not trigger size-ratio-based compaction but could instead
  // trigger size-amplification compaction once the threshold is lowered
  // to 110.
  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Verify compaction did not happen.
  ASSERT_EQ(NumSortedRuns(1), 3);

  // Trigger compaction if size amplification exceeds 110%, without reopening
  // the DB.
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_size_amplification_percent,
            200U);
  ASSERT_OK(dbfull()->SetOptions(handles_[1],
                                 {{"compaction_options_universal",
                                   "{max_size_amplification_percent=110;}"}}));
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_size_amplification_percent,
            110u);
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(110u, mutable_cf_options.compaction_options_universal
                      .max_size_amplification_percent);

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Verify that the size-amplification compaction did happen.
  ASSERT_EQ(NumSortedRuns(1), 1);
  ASSERT_EQ(total_picked_compactions, 1);
  ASSERT_EQ(total_size_amp_compactions, 1);
}

TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionReadAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  // The initial setup of compaction_options_universal will prevent universal
  // compaction from happening.
  options.compaction_options_universal.max_size_amplification_percent = 2000;
  options.compaction_options_universal.size_ratio = 0;
  options.compaction_options_universal.min_merge_width = 100;
  DestroyAndReopen(options);

  int total_picked_compactions = 0;
  int total_size_ratio_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
          Compaction* c = static_cast<Compaction*>(arg);
          if (c->compaction_reason() == CompactionReason::kUniversalSizeRatio) {
            total_size_ratio_compactions++;
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate three files in level 0. All files are approximately the same
  // size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger);

  // Flush whatever is remaining in the memtable. This is typically small,
  // about 30KB.
  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Verify compaction did not happen.
  ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger + 1);
  ASSERT_EQ(total_picked_compactions, 0);

  ASSERT_OK(dbfull()->SetOptions(
      handles_[1],
      {{"compaction_options_universal",
        "{min_merge_width=2;max_merge_width=2;size_ratio=100;}"}}));
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.min_merge_width,
            2u);
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_merge_width,
            2u);
  ASSERT_EQ(
      dbfull()->GetOptions(handles_[1]).compaction_options_universal.size_ratio,
      100u);

  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.size_ratio, 100u);
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.min_merge_width,
            2u);
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.max_merge_width,
            2u);

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Files in L0 are approximately 0.3 (30KB), 1, 1, 1.
  // On compaction: the files are below the size-amplification threshold, so
  // we fall through to checking the read-amplification conditions. The
  // configured size ratio is not big enough to take 0.3 into consideration,
  // so the next two files of size 1 are compacted together first, as they
  // satisfy the size-ratio condition and the (min_merge_width,
  // max_merge_width) condition, producing a file of size 2. Next, the newly
  // generated 2 and the last file of size 1 are compacted together. At the
  // end: #sortedRuns = 2, #picked_compactions = 2, and all the picked ones
  // are size-ratio-based compactions.
  ASSERT_EQ(NumSortedRuns(1), 2);
  // If max_merge_width had not been changed dynamically above, and had
  // remained at its default value of UINT_MAX, total_picked_compactions
  // would have been 1.
  ASSERT_EQ(total_picked_compactions, 2);
  ASSERT_EQ(total_size_ratio_compactions, 2);
}

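// DB::CompactFiles() lets callers compact an explicit set of files. Under
// universal compaction with num_levels = 1, the only valid output level is 0,
// which is what the first (expected-to-fail) call below exercises.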
TEST_P(DBTestUniversalCompaction, CompactFilesOnUniversalCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 10;

  ChangeCompactOptions();
  Options options;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  options.num_levels = 1;
  options.target_file_size_base = options.write_buffer_size;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
  Random rnd(301);
  for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_OK(Put(1, ToString(key), rnd.RandomString(kTestValueSize)));
  }
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  std::vector<std::string> compaction_input_file_names;
  for (auto file : cf_meta.levels[0].files) {
    if (rnd.OneIn(2)) {
      compaction_input_file_names.push_back(file.name);
    }
  }

  if (compaction_input_file_names.size() == 0) {
    compaction_input_file_names.push_back(cf_meta.levels[0].files[0].name);
  }

  // Expect failure, since universal compaction only allows L0 output.
  ASSERT_FALSE(dbfull()
                   ->CompactFiles(CompactionOptions(), handles_[1],
                                  compaction_input_file_names, 1)
                   .ok());

  // Expect OK, and verify that the compacted files no longer exist.
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), handles_[1],
                                   compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  VerifyCompactionResult(
      cf_meta, std::set<std::string>(compaction_input_file_names.begin(),
                                     compaction_input_file_names.end()));

  compaction_input_file_names.clear();

  // Pick the first and the last file; expect everything to be compacted into
  // one single file.
  compaction_input_file_names.push_back(cf_meta.levels[0].files[0].name);
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[cf_meta.levels[0].files.size() - 1].name);
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), handles_[1],
                                   compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.num_levels = 7;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Generate 3 overlapping files
  Random rnd(301);
  for (int i = 0; i < 210; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
  }
  ASSERT_OK(Flush());

  for (int i = 200; i < 300; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
  }
  ASSERT_OK(Flush());

  for (int i = 250; i < 260; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("3", FilesPerLevel(0));
  // Compact all files into one file and place it in L4.
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 4;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
}

#ifndef ROCKSDB_VALGRIND_RUN
class DBTestUniversalCompactionMultiLevels
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionMultiLevels()
      : DBTestUniversalCompactionBase(
            "/db_universal_compaction_multi_levels_test") {}
};

TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 3;
  options.target_file_size_base = 32 * 1024;
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 100000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

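// A "trivial move" is a compaction that can simply move an input file to the
// output level instead of rewriting it, because there is no overlapping data
// to merge; compaction_options_universal.allow_trivial_move opts universal
// compaction into this behavior.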
// Tests universal compaction with trivial move enabled.
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 3;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 2;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 150000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(MultiLevels, DBTestUniversalCompactionMultiLevels,
                        ::testing::Combine(::testing::Values(3, 20),
                                           ::testing::Bool()));

class DBTestUniversalCompactionParallel : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionParallel()
      : DBTestUniversalCompactionBase(
            "/db_universal_compaction_parallel_test") {}
};

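// The sync-point callbacks below make the first compaction spin until it
// observes a second compaction running concurrently (or times out), so the
// test can assert that universal compactions really do run in parallel.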
TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 << 10;  // 1KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.max_background_flushes = 3;
  options.target_file_size_base = 1 * 1024;
  options.compaction_options_universal.max_size_amplification_percent = 110;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Delay every compaction so multiple compactions will happen.
  std::atomic<int> num_compactions_running(0);
  std::atomic<bool> has_parallel(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* /*arg*/) {
        if (num_compactions_running.fetch_add(1) > 0) {
          has_parallel.store(true);
          return;
        }
        for (int nwait = 0; nwait < 20000; nwait++) {
          if (has_parallel.load() || num_compactions_running.load() > 1) {
            has_parallel.store(true);
            break;
          }
          env_->SleepForMicroseconds(1000);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():End",
      [&](void* /*arg*/) { num_compactions_running.fetch_add(-1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 30000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(num_compactions_running.load(), 0);
  ASSERT_TRUE(has_parallel.load());

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }

  // Reopen and check.
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 * 1024;  // 1KB
  options.level0_file_num_compaction_trigger = 7;
  options.max_background_compactions = 2;
  options.target_file_size_base = 1024 * 1024;  // 1MB

  // Disable size-amplification compaction.
  options.compaction_options_universal.max_size_amplification_percent =
      UINT_MAX;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
        "BackgroundCallCompaction:0"},
       {"UniversalCompactionBuilder::PickCompaction:Return",
        "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
       {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
        "CompactionJob::Run():Start"}});

  int total_picked_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction.
  int key_idx = 1;
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();

  // Write 3 files while the 1st compaction is held.
  // These 3 files have different sizes, to avoid compacting based on
  // size_ratio.
  int num_keys = 1000;
  for (int i = 0; i < 3; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Hold the 1st compaction from finishing.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // There should only be one picked compaction, as the score drops below one
  // after the first one is picked.
  EXPECT_EQ(total_picked_compactions, 1);
  EXPECT_EQ(TotalTableFiles(), 4);

  // Stop the SyncPoint, then destroy the DB and reopen it again.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  key_idx = 1;
  total_picked_compactions = 0;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction.
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();

  // Write 8 files while the 1st compaction is held.
  // These 8 files have different sizes, to avoid compacting based on
  // size_ratio.
  num_keys = 1000;
  for (int i = 0; i < 8; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Wait for the 2nd background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");

  // Hold the 1st and 2nd compactions from finishing.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // This time we will trigger a compaction because of the size ratio, and
  // another compaction because the number of files that have not been
  // compacted is greater than 7.
  EXPECT_GE(total_picked_compactions, 2);
}

INSTANTIATE_TEST_CASE_P(Parallel, DBTestUniversalCompactionParallel,
                        ::testing::Combine(::testing::Values(1, 10),
                                           ::testing::Values(false)));
#endif  // ROCKSDB_VALGRIND_RUN

TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = -1;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(990)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));

    if (num < options.level0_file_num_compaction_trigger - 1) {
      ASSERT_EQ(NumSortedRuns(1), num + 1);
    }
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(NumSortedRuns(1), 1);
}

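// kCompactionStopStyleSimilarSize makes universal compaction keep adding
// input runs only while they are of similar size to those already picked,
// rather than the default kCompactionStopStyleTotalSize rule (which compares
// the next run against the total size picked so far). That is why the large
// run of size 4 and the small 0.4 run below are left out of the merges.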
TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.stop_style =
      kCompactionStopStyleSimilarSize;
  options.num_levels = num_levels_;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // Stage 1:
  //   Generate a set of files at level 0, but don't trigger level-0
  //   compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_EQ(NumSortedRuns(), num + 1);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
    key_idx++;
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Suppose each file flushed from the memtable has size 1. Now we compact
  // level0_file_num_compaction_trigger = 4 files and should end up with one
  // big file of size 4.
  ASSERT_EQ(NumSortedRuns(), 1);

  // Stage 2:
  //   Now we have one file at level 0, with size 4. We also have some data in
  //   the memtable. Let's continue generating new files at level 0, but don't
  //   trigger level-0 compaction.
  //   First, clean up the memtable before inserting new data. This will
  //   generate a level-0 file, with size around 0.4 (according to the
  //   previously written data amount).
  dbfull()->Flush(FlushOptions());
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_EQ(NumSortedRuns(), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
    key_idx++;
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Before compaction, we have 4 files at level 0, with sizes 4, 0.4, 1, 1.
  // After compaction, we should have 3 files, with sizes 4, 0.4, 2, because
  // the similar-size stop style keeps the dissimilar runs out of the merge.
  ASSERT_EQ(NumSortedRuns(), 3);
  // Stage 3:
  //   Now we have 3 files at level 0, with sizes 4, 0.4, 2. Generate one
  //   more file at level-0, which should trigger level-0 compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
    key_idx++;
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(), 4);
}

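// compaction_options_universal.compression_size_percent controls how much of
// the data is compressed: -1 (the default) compresses everything according to
// the CF's compression setting, while a value p in [0, 100] only compresses
// roughly the oldest p% of the data, leaving the newest portion uncompressed.
// The two tests below probe both sides of that threshold.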
TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) {
  if (!Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 70;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first compaction (2) is compressed.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);

  // The second compaction (4) is compressed.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);

  // The third compaction (2 4) is compressed as well, since this time the
  // shape is (1 1 3.2) and 3.2/5.2 doesn't reach the ratio.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);

  // By the time compaction proceeds up to (2 4 8), the latest output is no
  // longer compressed.
  for (int num = 0; num < 8; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 95;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // When compaction proceeds up to (2 4 8), the latest output is still
  // compressed, given the 95% size ratio configured for compression.
  for (int num = 0; num < 14; num++) {
    // Write 120KB (12 values, each 10K)
    for (int i = 0; i < 12; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_LT(TotalSize(), 120000U * 12 * 0.82 + 120000 * 2);
}

#ifndef ROCKSDB_VALGRIND_RUN
// Test that checks trivial move in universal compaction.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 2;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 1;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 250000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that checks trivial move in universal compaction.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest2) {
  int32_t trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 15;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 2;
  options.target_file_size_base = 64 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 500000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(trivial_move, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
#endif  // ROCKSDB_VALGRIND_RUN

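// db_paths gives each path a target size; compaction outputs are placed in
// the first path that can hold the resulting file within its target, so
// larger (older) files spill toward later, bigger paths. The comments below
// annotate the expected sorted-run shape, in ~100KB units, before and after
// each generated file; this is a sketch of the placement rule, see
// Options::db_paths for the authoritative description.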
TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 111KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;

  std::vector<std::string> filenames;
  if (env_->GetChildren(options.db_paths[1].path, &filenames).ok()) {
    // Delete archival files.
    for (size_t i = 0; i < filenames.size(); ++i) {
      ASSERT_OK(
          env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]));
    }
    ASSERT_OK(env_->DeleteDir(options.db_paths[1].path));
  }
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first three 110KB files don't go to the second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to a 400K file, which goes to
  // db_paths[2].
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1,1,4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCFPathUse) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 10;
  options.write_buffer_size = 111 << 10;  // 111KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;

  std::vector<Options> option_vector;
  option_vector.emplace_back(options);
  ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
  // Configure CF1-specific paths.
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 300 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 300 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 500 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_4", 1024 * 1024 * 1024);
  option_vector.emplace_back(DBOptions(options), cf_opt1);
  CreateColumnFamilies({"one"}, option_vector[1]);

  // Configure CF2-specific paths.
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 300 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 300 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 500 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_4", 1024 * 1024 * 1024);
  option_vector.emplace_back(DBOptions(options), cf_opt2);
  CreateColumnFamilies({"two"}, option_vector[2]);

  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);

  Random rnd(301);
  int key_idx = 0;
  int key_idx1 = 0;
  int key_idx2 = 0;

  auto generate_file = [&]() {
    GenerateNewFile(0, &rnd, &key_idx);
    GenerateNewFile(1, &rnd, &key_idx1);
    GenerateNewFile(2, &rnd, &key_idx2);
  };

  auto check_sstfilecount = [&](int path_id, int expected) {
    ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
    ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
    ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
  };

  auto check_getvalues = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(0, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }

    for (int i = 0; i < key_idx1; i++) {
      auto v = Get(1, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }

    for (int i = 0; i < key_idx2; i++) {
      auto v = Get(2, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };

  // The first three 110KB files don't go to the second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    generate_file();
  }

  // Another 110KB triggers a compaction to a 400K file, which goes to the
  // third path in each path list.
  generate_file();
  check_sstfilecount(2, 1);

  // (1, 4)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(0, 1);

  // (1,1,4) -> (2, 4)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(1, 1);
  check_sstfilecount(0, 0);

  // (1, 2, 4) -> (3, 4)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(1, 1);
  check_sstfilecount(0, 0);

  // (1, 3, 4) -> (8)
  generate_file();
  check_sstfilecount(3, 1);

  // (1, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(0, 1);

  // (1, 1, 8) -> (2, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(1, 1);

  // (1, 2, 8) -> (3, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(1, 1);
  check_sstfilecount(0, 0);

  // (1, 3, 8) -> (4, 8)
  generate_file();
  check_sstfilecount(2, 1);
  check_sstfilecount(3, 1);

  // (1, 4, 8) -> (5, 8)
  generate_file();
  check_sstfilecount(3, 1);
  check_sstfilecount(2, 1);
  check_sstfilecount(0, 0);

  check_getvalues();

  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);

  check_getvalues();

  Destroy(options, true);
}

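// Universal compaction keeps all data in L0 when num_levels = 1; with more
// levels, compaction output can land in higher levels. The next test walks a
// DB through num_levels = 1 -> 4 -> 1 and verifies that the data survives
// each migration (the 4 -> 1 step first requires compacting everything back
// into L0).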
1470 TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
1471 std::function<void(int)> verify_func = [&](int num_keys_in_db) {
1472 std::string keys_in_db;
1473 Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
1474 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1475 keys_in_db.append(iter->key().ToString());
1476 keys_in_db.push_back(',');
1477 }
1478 delete iter;
1479
1480 std::string expected_keys;
1481 for (int i = 0; i <= num_keys_in_db; i++) {
1482 expected_keys.append(Key(i));
1483 expected_keys.push_back(',');
1484 }
1485
1486 ASSERT_EQ(keys_in_db, expected_keys);
1487 };
1488
1489 Random rnd(301);
1490 int max_key1 = 200;
1491 int max_key2 = 600;
1492 int max_key3 = 800;
1493 const int KNumKeysPerFile = 10;
1494
1495 // Stage 1: open a DB with universal compaction, num_levels=1
1496 Options options = CurrentOptions();
1497 options.compaction_style = kCompactionStyleUniversal;
1498 options.num_levels = 1;
1499 options.write_buffer_size = 200 << 10; // 200KB
1500 options.level0_file_num_compaction_trigger = 3;
1501 options.memtable_factory.reset(new SpecialSkipListFactory(KNumKeysPerFile));
1502 options = CurrentOptions(options);
1503 CreateAndReopenWithCF({"pikachu"}, options);
1504
1505 for (int i = 0; i <= max_key1; i++) {
1506 // each value is 10K
1507 ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
1508 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
1509 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1510 }
1511 ASSERT_OK(Flush(1));
1512 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1513
1514 // Stage 2: reopen with universal compaction, num_levels=4
1515 options.compaction_style = kCompactionStyleUniversal;
1516 options.num_levels = 4;
1517 options = CurrentOptions(options);
1518 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1519
1520 verify_func(max_key1);
1521
1522 // Insert more keys
1523 for (int i = max_key1 + 1; i <= max_key2; i++) {
1524 // each value is 10K
1525 ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
1526 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
1527 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1528 }
1529 ASSERT_OK(Flush(1));
1530 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1531
1532 verify_func(max_key2);
1533 // Compaction to non-L0 has happened.
1534 ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0);
1535
1536 // Stage 3: Revert it back to one level and revert to num_levels=1.
1537 options.num_levels = 4;
1538 options.target_file_size_base = INT_MAX;
1539 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1540 // Compact all to level 0
1541 CompactRangeOptions compact_options;
1542 compact_options.change_level = true;
1543 compact_options.target_level = 0;
1544 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1545 ASSERT_OK(
1546 dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1547 // Need to restart it once to remove higher level records in manifest.
1548 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1549 // Final reopen
1550 options.compaction_style = kCompactionStyleUniversal;
1551 options.num_levels = 1;
1552 options = CurrentOptions(options);
1553 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1554
1555 // Insert more keys
1556 for (int i = max_key2 + 1; i <= max_key3; i++) {
1557 // each value is 10K
1558 ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
1559 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
1560 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1561 }
1562 ASSERT_OK(Flush(1));
1563 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1564 verify_func(max_key3);
1565 }
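
#if 0
// Hedged sketch, excluded from the build: the generic recipe stage 3 above
// follows when shrinking num_levels on an existing DB. "db" and "cf" are
// placeholders, and production code would check every Status.
void ShrinkNumLevelsRecipeSketch(DB* db, ColumnFamilyHandle* cf) {
  // Step 1: compact everything to L0 so a smaller num_levels can still
  // describe the LSM tree.
  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 0;
  db->CompactRange(cro, cf, nullptr, nullptr);
  // Step 2: reopen once with the old num_levels so the manifest drops its
  // records of the higher levels.
  // Step 3: reopen again with the smaller num_levels.
}
#endif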

TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 111KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));

  std::vector<std::string> filenames;
  if (env_->GetChildren(options.db_paths[1].path, &filenames).ok()) {
    // Delete archival files.
    for (size_t i = 0; i < filenames.size(); ++i) {
      ASSERT_OK(
          env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]));
    }
    ASSERT_OK(env_->DeleteDir(options.db_paths[1].path));
  }
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB file triggers a compaction to a 400K file in the second path
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}
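
namespace {
// Illustrative sketch of the two path-placement conditions universal
// compaction is commonly described with (an assumption, not the real
// GetPathId()): a path is picked only if (1) its target size can hold the
// new file and (2) the headroom left in this and earlier paths could also
// absorb the estimated near-future file, taken here as file_size scaled
// down by size_ratio percent.
constexpr uint32_t PathIdSketch(const uint64_t* targets, uint32_t n,
                                uint64_t file_size, unsigned size_ratio,
                                uint32_t p = 0, uint64_t accumulated = 0) {
  return p + 1 >= n
             ? p
             : (targets[p] > file_size &&
                accumulated + (targets[p] - file_size) >
                    file_size * (100 - size_ratio) / 100)
                   ? p
                   : PathIdSketch(targets, n, file_size, size_ratio, p + 1,
                                  accumulated + targets[p]);
}
// Mirrors the test's paths (500KB, then 1GB) and size_ratio = 5: a ~400KB
// output leaves only ~100KB headroom in the first path, less than the
// ~380KB future estimate, so it lands in the second path as asserted above.
constexpr uint64_t kPathTargetsSketch[] = {500 * 1024, 1024 * 1024 * 1024};
static_assert(PathIdSketch(kPathTargetsSketch, 2, 400 * 1024, 5) == 1,
              "sketch disagrees with the 400K-to-second-path expectation");
}  // namespace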

TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
  if (num_levels_ == 1) {
    // For single-level universal, everything's bottom level so nothing should
    // be executed in the bottom-pri thread pool.
    return;
  }
  const int kNumFilesTrigger = 3;
  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = kNumFilesTrigger;
  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {// wait for the full compaction to be picked before adding files
       // intended for the second one.
       {"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
        "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
       // the full (bottom-pri) compaction waits until a partial (low-pri)
       // compaction has started to verify they can run in parallel.
       {"DBImpl::BackgroundCompaction:NonTrivial",
        "DBImpl::BGWorkBottomCompaction"}});
  SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < 2; ++i) {
    for (int num = 0; num < kNumFilesTrigger; num++) {
      int key_idx = 0;
      GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
      // Use no_wait above because GenerateNewFile otherwise waits for flush
      // and compaction. We don't want to wait for compaction because the full
      // compaction is intentionally blocked while more files are flushed.
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    if (i == 0) {
      TEST_SYNC_POINT(
          "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
    }
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // First compaction should output to the bottom level. Second should output
  // to L0 since older L0 files pending compaction prevent it from being
  // placed lower.
  ASSERT_EQ(NumSortedRuns(), 2);
  ASSERT_GT(NumTableFilesAtLevel(0), 0);
  ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}

TEST_P(DBTestUniversalCompaction, RecalculateScoreAfterPicking) {
  // Regression test for extra compactions scheduled. Once enough compactions
  // have been scheduled to bring the score below one, we should stop
  // scheduling more; otherwise, other CFs/DBs may be delayed unnecessarily.
  const int kNumFilesTrigger = 8;
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_options_universal.max_merge_width = kNumFilesTrigger / 2;
  options.compaction_options_universal.max_size_amplification_percent =
      static_cast<unsigned int>(-1);
  options.compaction_style = kCompactionStyleUniversal;
  options.level0_file_num_compaction_trigger = kNumFilesTrigger;
  options.num_levels = num_levels_;
  Reopen(options);

  std::atomic<int> num_compactions_attempted(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Start",
      [&](void* /*arg*/) { ++num_compactions_attempted; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int num = 0; num < kNumFilesTrigger; num++) {
    ASSERT_EQ(NumSortedRuns(), num);
    int key_idx = 0;
    GenerateNewFile(&rnd, &key_idx);
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Compacting the first four files was enough to bring the score below one,
  // so there's no need to schedule any more compactions.
  ASSERT_EQ(1, num_compactions_attempted);
  ASSERT_EQ(NumSortedRuns(), 5);
}
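
namespace {
// Illustrative arithmetic (not the real VersionStorageInfo scoring, which
// has more inputs): for universal compaction the L0 score is roughly the
// number of sorted runs not already being compacted divided by
// level0_file_num_compaction_trigger, and nothing is scheduled below 1.0.
// With the options above, merging max_merge_width = 4 of the 8 runs leaves
// 5 runs, so one compaction is enough to drop the score under one.
constexpr double UniversalScoreSketch(int sorted_runs, int trigger) {
  return static_cast<double>(sorted_runs) / trigger;
}
static_assert(UniversalScoreSketch(8, 8) >= 1.0,
              "8 runs with trigger 8 should schedule a compaction");
static_assert(UniversalScoreSketch(5, 8) < 1.0,
              "5 remaining runs should not schedule another one");
}  // namespace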

TEST_P(DBTestUniversalCompaction, FinalSortedRunCompactFilesConflict) {
  // Regression test for conflict between:
  // (1) Running CompactFiles including a file in the final sorted run; and
  // (2) Picking universal size-amp-triggered compaction, which always
  //     includes the final sorted run.
  if (exclusive_manual_compaction_) {
    return;
  }

  Options opts = CurrentOptions();
  opts.compaction_style = kCompactionStyleUniversal;
  opts.compaction_options_universal.max_size_amplification_percent = 50;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compression = kNoCompression;
  opts.level0_file_num_compaction_trigger = 2;
  opts.max_background_compactions = 2;
  opts.num_levels = num_levels_;
  Reopen(opts);

  // make sure compaction jobs can be parallelized
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  ASSERT_OK(Put("key", "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(NumTableFilesAtLevel(num_levels_ - 1), 1);
  ColumnFamilyMetaData cf_meta;
  ColumnFamilyHandle* default_cfh = db_->DefaultColumnFamily();
  dbfull()->GetColumnFamilyMetaData(default_cfh, &cf_meta);
  ASSERT_EQ(1, cf_meta.levels[num_levels_ - 1].files.size());
  std::string first_sst_filename =
      cf_meta.levels[num_levels_ - 1].files[0].name;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"CompactFilesImpl:0",
        "DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0"},
       {"DBImpl::BackgroundCompaction():AfterPickCompaction",
        "CompactFilesImpl:1"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  port::Thread compact_files_thread([&]() {
    ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), default_cfh,
                                     {first_sst_filename}, num_levels_ - 1));
  });

  TEST_SYNC_POINT(
      "DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0");
  for (int i = 0; i < 2; ++i) {
    ASSERT_OK(Put("key", "val"));
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  compact_files_thread.join();
}

INSTANTIATE_TEST_CASE_P(NumLevels, DBTestUniversalCompaction,
                        ::testing::Combine(::testing::Values(1, 3, 5),
                                           ::testing::Bool()));

class DBTestUniversalManualCompactionOutputPathId
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalManualCompactionOutputPathId()
      : DBTestUniversalCompactionBase(
            "/db_universal_compaction_manual_pid_test") {}
};

TEST_P(DBTestUniversalManualCompactionOutputPathId,
       ManualCompactionOutputPathId) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.db_paths.emplace_back(dbname_, 1000000000);
  options.db_paths.emplace_back(dbname_ + "_2", 1000000000);
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.target_file_size_base = 1 << 30;  // Big size
  options.level0_file_num_compaction_trigger = 10;
  Destroy(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

  // Full compaction to DB path 1
  CompactRangeOptions compact_options;
  compact_options.target_path_id = 1;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  MakeTables(1, "p", "q", 1);
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // Full compaction to DB path 0
  compact_options.target_path_id = 0;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

  // Fail when compacting to an invalid path ID
  compact_options.target_path_id = 2;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
                  .IsInvalidArgument());
}

INSTANTIATE_TEST_CASE_P(OutputPathId,
                        DBTestUniversalManualCompactionOutputPathId,
                        ::testing::Combine(::testing::Values(1, 8),
                                           ::testing::Bool()));

TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
  const int kNumKeys = 3000;
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 2;
  opts.compression = kNoCompression;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // add an L1 file to prevent tombstones from being dropped due to
  // obsolescence during flush
  int i;
  for (i = 0; i < 2000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  // MoveFilesToLevel(6);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  for (i = 1999; i < kNumKeys; ++i) {
    if (i >= kNumKeys - kWindowSize &&
        i < kNumKeys - kWindowSize + kNumDelsTrigger) {
      ASSERT_OK(Delete(Key(i)));
    } else {
      ASSERT_OK(Put(Key(i), "val"));
    }
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
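
namespace {
// Illustrative sketch of the heuristic NewCompactOnDeletionCollectorFactory
// configures above; the real collector lives in the utilities library, so
// this model is only an assumption about it. While an SST is built, slide a
// window of `window` consecutive entries and flag the file for compaction
// once any window contains at least `trigger` deletions.
constexpr int DelsInRangeSketch(const bool* is_del, int from, int to) {
  return from >= to ? 0
                    : (is_del[from] ? 1 : 0) +
                          DelsInRangeSketch(is_del, from + 1, to);
}
constexpr bool DeletionWindowTripsSketch(const bool* is_del, int n, int window,
                                         int trigger, int start = 0) {
  return start + window > n
             ? false
             : DelsInRangeSketch(is_del, start, start + window) >= trigger ||
                   DeletionWindowTripsSketch(is_del, n, window, trigger,
                                             start + 1);
}
// A miniature of the write pattern above: a run of deletes inside one window
// trips the trigger; a higher trigger does not.
constexpr bool kIsDelete[] = {false, true, true, true};
static_assert(DeletionWindowTripsSketch(kIsDelete, 4, 3, 3), "");
static_assert(!DeletionWindowTripsSketch(kIsDelete, 4, 3, 4), "");
}  // namespace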

#if defined(ENABLE_SINGLE_LEVEL_DTC)
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
  const int kNumKeys = 3000;
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 2;
  opts.compression = kNoCompression;
  opts.num_levels = 1;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // add an L1 file to prevent tombstones from being dropped due to
  // obsolescence during flush
  int i;
  for (i = 0; i < 2000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());

  for (i = 1999; i < kNumKeys; ++i) {
    if (i >= kNumKeys - kWindowSize &&
        i < kNumKeys - kWindowSize + kNumDelsTrigger) {
      ASSERT_OK(Delete(Key(i)));
    } else {
      ASSERT_OK(Put(Key(i), "val"));
    }
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
#endif  // ENABLE_SINGLE_LEVEL_DTC

TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 4;
  opts.compression = kNoCompression;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // add an L1 file to prevent tombstones from being dropped due to
  // obsolescence during flush
  int i;
  for (i = 0; i < 500; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 500; i < 1000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 1000; i < 1500; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 1500; i < 2000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);

  for (i = 1999; i < 2333; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 2333; i < 2666; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 2666; i < 2999; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
  ASSERT_GT(NumTableFilesAtLevel(5), 0);

  for (i = 1900; i < 2100; ++i) {
    ASSERT_OK(Delete(Key(i)));
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));
  ASSERT_EQ(0, NumTableFilesAtLevel(2));
  ASSERT_EQ(0, NumTableFilesAtLevel(3));
  ASSERT_EQ(0, NumTableFilesAtLevel(4));
  ASSERT_EQ(0, NumTableFilesAtLevel(5));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
}

TEST_F(DBTestUniversalCompaction2, OverlappingL0) {
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 5;
  opts.compression = kNoCompression;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // add an L1 file to prevent tombstones from being dropped due to
  // obsolescence during flush
  int i;
  for (i = 0; i < 2000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 2000; i < 3000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 3500; i < 4000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  for (i = 2900; i < 3100; ++i) {
    ASSERT_OK(Delete(Key(i)));
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(2, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
}

TEST_F(DBTestUniversalCompaction2, IngestBehind) {
  const int kNumKeys = 3000;
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 2;
  opts.compression = kNoCompression;
  opts.allow_ingest_behind = true;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // add an L1 file to prevent tombstones from being dropped due to
  // obsolescence during flush
  int i;
  for (i = 0; i < 2000; ++i) {
    ASSERT_OK(Put(Key(i), "val"));
  }
  ASSERT_OK(Flush());
  // MoveFilesToLevel(6);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  for (i = 1999; i < kNumKeys; ++i) {
    if (i >= kNumKeys - kWindowSize &&
        i < kNumKeys - kWindowSize + kNumDelsTrigger) {
      ASSERT_OK(Delete(Key(i)));
    } else {
      ASSERT_OK(Put(Key(i), "val"));
    }
  }
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(6));
  ASSERT_GT(NumTableFilesAtLevel(5), 0);
}
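
#if 0
// Hedged usage sketch, excluded from the build: allow_ingest_behind is set
// above so regular compactions keep the bottommost level empty (hence the
// assertions expect data in L5, not L6). The reserved level exists so that
// externally prepared files can later be ingested underneath all current
// data, along these lines ("db" and the file path are placeholders):
IngestExternalFileOptions ifo;
ifo.ingest_behind = true;  // place the file below all existing keys
Status s = db->IngestExternalFile({"/path/to/prepared.sst"}, ifo);
#endif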

TEST_F(DBTestUniversalCompaction2, PeriodicCompactionDefault) {
  Options options;
  options.compaction_style = kCompactionStyleUniversal;
  options.env = env_;
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  options.compaction_filter_factory.reset(filter);
  Reopen(options);
  ASSERT_EQ(30 * 24 * 60 * 60,
            dbfull()->GetOptions().periodic_compaction_seconds);

  KeepFilter df;
  options.compaction_filter_factory.reset();
  options.compaction_filter = &df;
  Reopen(options);
  ASSERT_EQ(30 * 24 * 60 * 60,
            dbfull()->GetOptions().periodic_compaction_seconds);

  options.ttl = 60 * 24 * 60 * 60;
  options.compaction_filter = nullptr;
  Reopen(options);
  ASSERT_EQ(60 * 24 * 60 * 60,
            dbfull()->GetOptions().periodic_compaction_seconds);
}
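
namespace {
// Illustrative model of the defaulting behavior checked above, not the real
// option sanitization; the "unset" sentinel value is an assumption. When
// periodic_compaction_seconds is left unset and a compaction filter (or
// filter factory) is installed, universal compaction falls back to 30 days;
// an explicit ttl takes precedence as the fallback instead.
constexpr uint64_t kUnsetPeriodicSketch = 0xfffffffffffffffeULL;  // assumed
constexpr uint64_t PeriodicSecondsSketch(uint64_t configured, uint64_t ttl,
                                         bool has_compaction_filter) {
  return configured != kUnsetPeriodicSketch
             ? configured
             : ttl > 0 ? ttl
                       : has_compaction_filter ? 30ULL * 24 * 60 * 60 : 0;
}
static_assert(PeriodicSecondsSketch(kUnsetPeriodicSketch, 0, true) ==
                  30ULL * 24 * 60 * 60,
              "filter installed -> 30-day default");
static_assert(PeriodicSecondsSketch(kUnsetPeriodicSketch,
                                    60ULL * 24 * 60 * 60, false) ==
                  60ULL * 24 * 60 * 60,
              "ttl set -> ttl becomes the periodic fallback");
}  // namespace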

TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
  Options opts = CurrentOptions();
  opts.env = env_;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 10;
  opts.max_open_files = -1;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  opts.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
  opts.num_levels = 5;
  env_->SetMockSleep();
  Reopen(opts);

  // NOTE: Presumed unnecessary and removed: resetting mock time in env

  int periodic_compactions = 0;
  int start_level = -1;
  int output_level = -1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionPicker::PickPeriodicCompaction:Return",
      [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() ==
                    CompactionReason::kPeriodicCompaction);
        start_level = compaction->start_level();
        output_level = compaction->output_level();
        periodic_compactions++;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Case 1: The oldest flushed file exceeds the periodic compaction
  // threshold.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  ASSERT_EQ(0, periodic_compactions);
  // Move the clock forward so that the flushed file qualifies for periodic
  // compaction.
  env_->MockSleepForSeconds(48 * 60 * 60 + 100);

  // Another flush will trigger a compaction of the oldest file.
  ASSERT_OK(Put("foo", "bar2"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(1, periodic_compactions);
  ASSERT_EQ(0, start_level);
  ASSERT_EQ(4, output_level);

  // Case 2: The oldest compacted file exceeds the periodic compaction
  // threshold.
  periodic_compactions = 0;
  // A flush doesn't trigger a periodic compaction when the threshold isn't
  // hit.
  ASSERT_OK(Put("foo", "bar2"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(0, periodic_compactions);

  // Once the periodic compaction threshold is hit, a flush will trigger
  // a compaction.
  ASSERT_OK(Put("foo", "bar2"));
  env_->MockSleepForSeconds(48 * 60 * 60 + 100);
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(1, periodic_compactions);
  ASSERT_EQ(0, start_level);
  ASSERT_EQ(4, output_level);
}
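
namespace {
// Illustrative check behind the MockSleepForSeconds() calls above (the real
// picker consults per-file creation times from table properties): a file
// becomes a periodic-compaction candidate once its age exceeds
// periodic_compaction_seconds.
constexpr bool QualifiesForPeriodicSketch(uint64_t now_secs,
                                          uint64_t file_created_secs,
                                          uint64_t periodic_secs) {
  return now_secs - file_created_secs > periodic_secs;
}
// Sleeping 48h + 100s past a file created at t=0 crosses the 2-day setting.
static_assert(QualifiesForPeriodicSketch(48ULL * 60 * 60 + 100, 0,
                                         48ULL * 60 * 60),
              "");
}  // namespace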

}  // namespace ROCKSDB_NAMESPACE

#endif  // !defined(ROCKSDB_LITE)

int main(int argc, char** argv) {
#if !defined(ROCKSDB_LITE)
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
#else
  (void)argc;
  (void)argv;
  return 0;
#endif
}