// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#if !defined(ROCKSDB_LITE)
#include "util/sync_point.h"

namespace rocksdb {

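// Returns a len-byte string that compresses down to roughly 0.8 * len bytes
// (see test::CompressibleString), giving the compression-ratio tests below
// predictable savings.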
static std::string CompressibleString(Random* rnd, int len) {
  std::string r;
  test::CompressibleString(rnd, 0.8, len, &r);
  return r;
}

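// Base fixture. Every test below is parameterized over a
// <num_levels, exclusive_manual_compaction> tuple.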
class DBTestUniversalCompactionBase
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<int, bool>> {
 public:
  explicit DBTestUniversalCompactionBase(
      const std::string& path) : DBTestBase(path) {}
  virtual void SetUp() override {
    num_levels_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }
  int num_levels_;
  bool exclusive_manual_compaction_;
};

class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompaction() :
      DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};

namespace {
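// Asserts (in debug builds only) that none of the given file names remain
// in the column family's metadata, i.e. the compaction consumed them all.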
void VerifyCompactionResult(
    const ColumnFamilyMetaData& cf_meta,
    const std::set<std::string>& overlapping_file_numbers) {
#ifndef NDEBUG
  for (auto& level : cf_meta.levels) {
    for (auto& file : level.files) {
      assert(overlapping_file_numbers.find(file.name) ==
             overlapping_file_numbers.end());
    }
  }
#endif
}

class KeepFilter : public CompactionFilter {
 public:
  virtual bool Filter(int level, const Slice& key, const Slice& value,
                      std::string* new_value,
                      bool* value_changed) const override {
    return false;
  }

  virtual const char* Name() const override { return "KeepFilter"; }
};

class KeepFilterFactory : public CompactionFilterFactory {
 public:
  explicit KeepFilterFactory(bool check_context = false)
      : check_context_(check_context) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  virtual const char* Name() const override { return "KeepFilterFactory"; }
  bool check_context_;
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
};

class DelayFilter : public CompactionFilter {
 public:
  explicit DelayFilter(DBTestBase* d) : db_test(d) {}
  virtual bool Filter(int level, const Slice& key, const Slice& value,
                      std::string* new_value,
                      bool* value_changed) const override {
    db_test->env_->addon_time_.fetch_add(1000);
    return true;
  }

  virtual const char* Name() const override { return "DelayFilter"; }

 private:
  DBTestBase* db_test;
};

class DelayFilterFactory : public CompactionFilterFactory {
 public:
  explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
  }

  virtual const char* Name() const override { return "DelayFilterFactory"; }

 private:
  DBTestBase* db_test;
};
}  // namespace

// Make sure we don't trigger a problem if the trigger condition is given
// to be 0, which is invalid.
TEST_P(DBTestUniversalCompaction, UniversalCompactionSingleSortedRun) {
  Options options = CurrentOptions();

  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  // Config universal compaction to always compact to one single sorted run.
  options.level0_file_num_compaction_trigger = 0;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.min_merge_width = 2;
  options.compaction_options_universal.max_size_amplification_percent = 0;
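  // With max_size_amplification_percent == 0, any data outside the oldest
  // sorted run counts as size amplification, so every flush should be
  // followed by a compaction back down to a single sorted run.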

  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  DestroyAndReopen(options);
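  // The invalid trigger value 0 should be sanitized up to 1 on open.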
  ASSERT_EQ(1, db_->GetOptions().level0_file_num_compaction_trigger);

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);

  for (int num = 0; num < 16; num++) {
    // Write a 100KB file; it should immediately be compacted into one file.
    GenerateNewFile(&rnd, &key_idx);
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ(NumSortedRuns(0), 1);
  }
  ASSERT_OK(Put(Key(key_idx), ""));
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumSortedRuns(0), 1);
}

TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  BlockBasedTableOptions bbto;
  bbto.cache_index_and_filter_blocks = true;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.optimize_filters_for_hits = true;
  options.statistics = rocksdb::CreateDBStatistics();
  options.memtable_factory.reset(new SpecialSkipListFactory(3));
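  // SpecialSkipListFactory(3) makes the memtable report itself as full after
  // three entries, so each loop iteration below flushes one small L0 file.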

  DestroyAndReopen(options);

  // block compaction from happening
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    Put(Key(num * 10), "val");
    if (num) {
      dbfull()->TEST_WaitForFlushMemTable();
    }
    Put(Key(30 + num * 10), "val");
    Put(Key(60 + num * 10), "val");
  }
  Put("", "");
  dbfull()->TEST_WaitForFlushMemTable();

  // Query a set of non-existent keys.
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }

  // Make sure the bloom filter is used at least once.
  ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
  auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Make sure the bloom filter is used for all but the last L0 file when
  // looking up a non-existent key that's in the range of all L0 files.
  ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
  ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1,
            TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
  prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Unblock compaction and wait for it to finish.
  sleeping_task_low.WakeUp();
  dbfull()->TEST_WaitForCompact();

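  // After the full compaction, all data sits in the bottommost sorted run,
  // and optimize_filters_for_hits skips bloom filters for bottommost-level
  // output, so the lookups below should not advance the ticker.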
  // The same queries will not consult the bloom filter.
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }
  ASSERT_EQ(prev_counter, TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
}

// TODO(kailiu) The tests on UniversalCompaction have some issues:
//  1. A lot of magic numbers ("11" or "12").
//  2. They make assumptions about the memtable flush conditions, which may
//     change from time to time.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) {
  Options options;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  options = CurrentOptions(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBTestWritableFile.GetPreallocationStatus", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        size_t preallocation_size = *(static_cast<size_t*>(arg));
        if (num_levels_ > 3) {
          ASSERT_LE(preallocation_size, options.target_file_size_base * 1.1);
        }
      });
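  // When the DB has more than three levels, preallocation for new SSTs is
  // expected to stay within 10% of target_file_size_base rather than some
  // larger default.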
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);
  // Stage 1:
  //   Generate a set of files at level 0, but don't trigger level-0
  //   compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB
    GenerateNewFile(1, &rnd, &key_idx);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Suppose each file flushed from mem table has size 1. Now we compact
  // level0_file_num_compaction_trigger = 4 files and should end up with one
  // big file of size 4.
  ASSERT_EQ(NumSortedRuns(1), 1);

  // Stage 2:
  //   Now we have one file at level 0, with size 4. We also have some data in
  //   mem table. Let's continue generating new files at level 0, but don't
  //   trigger level-0 compaction.
  //   First, clean up memtable before inserting new data. This will generate
  //   a level-0 file, with size around 0.4 (according to previously written
  //   data amount).
  filter->expect_full_compaction_.store(false);
  ASSERT_OK(Flush(1));
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
  // After compaction, we should have 2 files, with size 4, 2.4.
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Stage 3:
  //   Now we have 2 files at level 0, with size 4 and 2.4. Continue
  //   generating new files at level 0.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
  // After compaction, we should have 3 files, with size 4, 2.4, 2.
  ASSERT_EQ(NumSortedRuns(1), 3);

  // Stage 4:
  //   Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
  //   new file of size 1.
  GenerateNewFile(1, &rnd, &key_idx);
  dbfull()->TEST_WaitForCompact();
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(1), 4);

  // Stage 5:
  //   Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
  //   a new file of size 1.
  filter->expect_full_compaction_.store(true);
  GenerateNewFile(1, &rnd, &key_idx);
  dbfull()->TEST_WaitForCompact();
  // All files at level 0 will be compacted into a single one.
  ASSERT_EQ(NumSortedRuns(1), 1);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate two files in Level 0. Both files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in memtable. This is typically
  // small, which should not trigger size ratio based compaction
  // but will instead trigger size amplification.
  ASSERT_OK(Flush(1));

  dbfull()->TEST_WaitForCompact();

  // Verify that the size-amplification-triggered compaction did occur and
  // merged everything into a single sorted run.
  ASSERT_EQ(NumSortedRuns(1), 1);
}

TEST_P(DBTestUniversalCompaction, CompactFilesOnUniversalCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 10;

  ChangeCompactOptions();
  Options options;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  options.num_levels = 1;
  options.target_file_size_base = options.write_buffer_size;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
  Random rnd(301);
  for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
  }
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForCompact();
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  std::vector<std::string> compaction_input_file_names;
  for (auto file : cf_meta.levels[0].files) {
    if (rnd.OneIn(2)) {
      compaction_input_file_names.push_back(file.name);
    }
  }

  if (compaction_input_file_names.size() == 0) {
    compaction_input_file_names.push_back(
        cf_meta.levels[0].files[0].name);
  }

  // Expect failure, since universal compaction only allows L0 output.
  ASSERT_FALSE(dbfull()
                   ->CompactFiles(CompactionOptions(), handles_[1],
                                  compaction_input_file_names, 1)
                   .ok());

  // Expect OK, and verify that the compacted files no longer exist.
  ASSERT_OK(dbfull()->CompactFiles(
      CompactionOptions(), handles_[1],
      compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  VerifyCompactionResult(
      cf_meta,
      std::set<std::string>(compaction_input_file_names.begin(),
                            compaction_input_file_names.end()));

  compaction_input_file_names.clear();

  // Pick the first and the last file; expect everything to be
  // compacted into one single file.
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[0].name);
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[
          cf_meta.levels[0].files.size() - 1].name);
  ASSERT_OK(dbfull()->CompactFiles(
      CompactionOptions(), handles_[1],
      compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.num_levels = 7;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Generate 3 overlapping files.
  Random rnd(301);
  for (int i = 0; i < 210; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  for (int i = 200; i < 300; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  for (int i = 250; i < 260; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("3", FilesPerLevel(0));
  // Compact all files into 1 file and put it in L4.
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 4;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  db_->CompactRange(compact_options, nullptr, nullptr);
  ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
}

class DBTestUniversalCompactionMultiLevels
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionMultiLevels() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_multi_levels_test") {}
};

TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 3;
  options.target_file_size_base = 32 * 1024;
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 100000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }

  dbfull()->TEST_WaitForCompact();

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

// Tests universal compaction with trivial move enabled.
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* arg) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 3;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 2;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 150000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels,
                        DBTestUniversalCompactionMultiLevels,
                        ::testing::Combine(::testing::Values(3, 20),
                                           ::testing::Bool()));

class DBTestUniversalCompactionParallel :
    public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionParallel() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_parallel_test") {}
};

TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 << 10;  // 1KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.max_background_flushes = 3;
  options.target_file_size_base = 1 * 1024;
  options.compaction_options_universal.max_size_amplification_percent = 110;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Delay every compaction so multiple compactions will happen.
  std::atomic<int> num_compactions_running(0);
  std::atomic<bool> has_parallel(false);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* arg) {
        if (num_compactions_running.fetch_add(1) > 0) {
          has_parallel.store(true);
          return;
        }
        for (int nwait = 0; nwait < 20000; nwait++) {
          if (has_parallel.load() || num_compactions_running.load() > 1) {
            has_parallel.store(true);
            break;
          }
          env_->SleepForMicroseconds(1000);
        }
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():End",
      [&](void* arg) { num_compactions_running.fetch_add(-1); });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 30000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }
  dbfull()->TEST_WaitForCompact();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(num_compactions_running.load(), 0);
  ASSERT_TRUE(has_parallel.load());

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }

  // Reopen and check.
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 * 1024;  // 1KB
  options.level0_file_num_compaction_trigger = 7;
  options.max_background_compactions = 2;
  options.target_file_size_base = 1024 * 1024;  // 1MB

  // Disable size amplification compaction.
  options.compaction_options_universal.max_size_amplification_percent =
      UINT_MAX;
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
        "BackgroundCallCompaction:0"},
       {"UniversalCompactionPicker::PickCompaction:Return",
        "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
       {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
        "CompactionJob::Run():Start"}});

  int total_picked_compactions = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction.
  int key_idx = 1;
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  // Write 3 files while the 1st compaction is held.
  // These 3 files have different sizes to avoid compacting based on
  // size_ratio.
  int num_keys = 1000;
  for (int i = 0; i < 3; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Wait for the 2nd background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");

  // Hold the 1st and 2nd compaction from finishing.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  dbfull()->TEST_WaitForCompact();

  // Although 2 compaction threads started, the second one did not compact
  // anything because the number of files not being compacted is less than
  // level0_file_num_compaction_trigger.
  EXPECT_EQ(total_picked_compactions, 1);
  EXPECT_EQ(TotalTableFiles(), 4);

  // Stop the SyncPoint, then destroy the DB and reopen it again.
  rocksdb::SyncPoint::GetInstance()->ClearTrace();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  key_idx = 1;
  total_picked_compactions = 0;
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction.
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  // Write 8 files while the 1st compaction is held.
  // These 8 files have different sizes to avoid compacting based on
  // size_ratio.
  num_keys = 1000;
  for (int i = 0; i < 8; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Wait for the 2nd background compaction process to start.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");

  // Hold the 1st and 2nd compaction from finishing.
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  dbfull()->TEST_WaitForCompact();

  // This time we will trigger one compaction because of the size ratio and
  // another one because the number of files not being compacted exceeds
  // level0_file_num_compaction_trigger (7).
  EXPECT_GE(total_picked_compactions, 2);
}

INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel,
                        DBTestUniversalCompactionParallel,
                        ::testing::Combine(::testing::Values(1, 10),
                                           ::testing::Values(false)));

TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = -1;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);

    if (num < options.level0_file_num_compaction_trigger - 1) {
      ASSERT_EQ(NumSortedRuns(1), num + 1);
    }
  }

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumSortedRuns(1), 1);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.stop_style =
      kCompactionStopStyleSimilarSize;
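  // kCompactionStopStyleSimilarSize stops adding runs to a candidate
  // compaction once the next run's size is no longer similar to the runs
  // already picked (the default stop style is based on total size); this is
  // why the large runs below are left untouched.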
  options.num_levels = num_levels_;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // Stage 1:
  //   Generate a set of files at level 0, but don't trigger level-0
  //   compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumSortedRuns(), num + 1);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Suppose each file flushed from mem table has size 1. Now we compact
  // level0_file_num_compaction_trigger = 4 files and should end up with one
  // big file of size 4.
  ASSERT_EQ(NumSortedRuns(), 1);

  // Stage 2:
  //   Now we have one file at level 0, with size 4. We also have some data in
  //   mem table. Let's continue generating new files at level 0, but don't
  //   trigger level-0 compaction.
  //   First, clean up memtable before inserting new data. This will generate
  //   a level-0 file, with size around 0.4 (according to previously written
  //   data amount).
  dbfull()->Flush(FlushOptions());
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumSortedRuns(), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
  // After compaction, we should have 3 files, with size 4, 0.4, 2.
  ASSERT_EQ(NumSortedRuns(), 3);
  // Stage 3:
  //   Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
  //   more file at level-0, which should trigger level-0 compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(), 4);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) {
  if (!Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 70;
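  // With compression_size_percent = 70, only compaction output that falls
  // within the oldest ~70% of the data is compressed; the newest ~30% stays
  // uncompressed.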
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first compaction (2) is compressed.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);

  // The second compaction (4) is compressed as well.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);

  // The third compaction (2 4) is compressed since this time it is
  // (1 1 3.2) and 3.2/5.2 doesn't reach the ratio.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);

  // By the time the compactions reach the (2 4 8) shape, the newest data is
  // outside the portion that gets compressed, so the latest output is left
  // uncompressed.
  for (int num = 0; num < 8; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 95;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // With compression_size_percent = 95, even when the compactions reach the
  // (2 4 8) shape the latest output still falls within the portion of the
  // data that gets compressed.
  for (int num = 0; num < 14; num++) {
    // Write 120KB (12 values, each 10K)
    for (int i = 0; i < 12; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2);
}

// Test that checks trivial move in universal compaction.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* arg) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 2;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 1;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 250000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that checks trivial move in universal compaction.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest2) {
  int32_t trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* arg) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 15;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 2;
  options.target_file_size_base = 64 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%.
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 500000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
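  // Flush just before GenerateNewFile's per-file key count is reached, so
  // each GenerateNewFile() call below yields exactly one ~110KB L0 file.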
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 111KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;

  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first three 110KB files do not go to the second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to a 400K file in the second path.
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}

TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
  std::function<void(int)> verify_func = [&](int num_keys_in_db) {
    std::string keys_in_db;
    Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      keys_in_db.append(iter->key().ToString());
      keys_in_db.push_back(',');
    }
    delete iter;

    std::string expected_keys;
    for (int i = 0; i <= num_keys_in_db; i++) {
      expected_keys.append(Key(i));
      expected_keys.push_back(',');
    }

    ASSERT_EQ(keys_in_db, expected_keys);
  };

  Random rnd(301);
  int max_key1 = 200;
  int max_key2 = 600;
  int max_key3 = 800;
  const int KNumKeysPerFile = 10;

  // Stage 1: open a DB with universal compaction, num_levels=1.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 200 << 10;  // 200KB
  options.level0_file_num_compaction_trigger = 3;
  options.memtable_factory.reset(new SpecialSkipListFactory(KNumKeysPerFile));
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  for (int i = 0; i <= max_key1; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  // Stage 2: reopen with universal compaction, num_levels=4.
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 4;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  verify_func(max_key1);

  // Insert more keys.
  for (int i = max_key1 + 1; i <= max_key2; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  verify_func(max_key2);
  // Compaction to non-L0 has happened.
  ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0);

  // Stage 3: move all data back down to L0, then reopen with num_levels=1.
  options.num_levels = 4;
  options.target_file_size_base = INT_MAX;
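  // A huge target file size keeps the level-change compaction below from
  // splitting its output into multiple files.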
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  // Compact all to level 0.
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 0;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
  // Need to restart it once to remove higher level records in manifest.
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  // Final reopen.
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Insert more keys.
  for (int i = max_key2 + 1; i <= max_key3; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();
  verify_func(max_key3);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 111KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));

  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first three 110KB files do not go to the second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to a 400K file in the second path.
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}

INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
                        ::testing::Combine(::testing::Values(1, 3, 5),
                                           ::testing::Bool()));

class DBTestUniversalManualCompactionOutputPathId
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalManualCompactionOutputPathId() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_manual_pid_test") {}
};

TEST_P(DBTestUniversalManualCompactionOutputPathId,
       ManualCompactionOutputPathId) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.db_paths.emplace_back(dbname_, 1000000000);
  options.db_paths.emplace_back(dbname_ + "_2", 1000000000);
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.target_file_size_base = 1 << 30;  // Big size
  options.level0_file_num_compaction_trigger = 10;
  Destroy(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

  // Full compaction to DB path 1.
  CompactRangeOptions compact_options;
  compact_options.target_path_id = 1;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  MakeTables(1, "p", "q", 1);
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // Full compaction to DB path 0.
  compact_options.target_path_id = 0;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

  // Fail when compacting to an invalid path ID.
  compact_options.target_path_id = 2;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
                  .IsInvalidArgument());
}

INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId,
                        DBTestUniversalManualCompactionOutputPathId,
                        ::testing::Combine(::testing::Values(1, 8),
                                           ::testing::Bool()));

}  // namespace rocksdb

#endif  // !defined(ROCKSDB_LITE)

int main(int argc, char** argv) {
#if !defined(ROCKSDB_LITE)
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
#else
  return 0;
#endif
}