1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#if !defined(ROCKSDB_LITE)
#include <climits>

#include "util/sync_point.h"
17 static std::string
CompressibleString(Random
* rnd
, int len
) {
19 test::CompressibleString(rnd
, 0.8, len
, &r
);
23 class DBTestUniversalCompactionBase
25 public ::testing::WithParamInterface
<std::tuple
<int, bool>> {
27 explicit DBTestUniversalCompactionBase(
28 const std::string
& path
) : DBTestBase(path
) {}
29 virtual void SetUp() override
{
30 num_levels_
= std::get
<0>(GetParam());
31 exclusive_manual_compaction_
= std::get
<1>(GetParam());
34 bool exclusive_manual_compaction_
;
37 class DBTestUniversalCompaction
: public DBTestUniversalCompactionBase
{
39 DBTestUniversalCompaction() :
40 DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
44 void VerifyCompactionResult(
45 const ColumnFamilyMetaData
& cf_meta
,
46 const std::set
<std::string
>& overlapping_file_numbers
) {
48 for (auto& level
: cf_meta
.levels
) {
49 for (auto& file
: level
.files
) {
50 assert(overlapping_file_numbers
.find(file
.name
) ==
51 overlapping_file_numbers
.end());
57 class KeepFilter
: public CompactionFilter
{
59 virtual bool Filter(int level
, const Slice
& key
, const Slice
& value
,
60 std::string
* new_value
, bool* value_changed
) const
65 virtual const char* Name() const override
{ return "KeepFilter"; }
68 class KeepFilterFactory
: public CompactionFilterFactory
{
70 explicit KeepFilterFactory(bool check_context
= false)
71 : check_context_(check_context
) {}
73 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
74 const CompactionFilter::Context
& context
) override
{
76 EXPECT_EQ(expect_full_compaction_
.load(), context
.is_full_compaction
);
77 EXPECT_EQ(expect_manual_compaction_
.load(), context
.is_manual_compaction
);
79 return std::unique_ptr
<CompactionFilter
>(new KeepFilter());
82 virtual const char* Name() const override
{ return "KeepFilterFactory"; }
84 std::atomic_bool expect_full_compaction_
;
85 std::atomic_bool expect_manual_compaction_
;
88 class DelayFilter
: public CompactionFilter
{
90 explicit DelayFilter(DBTestBase
* d
) : db_test(d
) {}
91 virtual bool Filter(int level
, const Slice
& key
, const Slice
& value
,
92 std::string
* new_value
,
93 bool* value_changed
) const override
{
94 db_test
->env_
->addon_time_
.fetch_add(1000);
98 virtual const char* Name() const override
{ return "DelayFilter"; }
104 class DelayFilterFactory
: public CompactionFilterFactory
{
106 explicit DelayFilterFactory(DBTestBase
* d
) : db_test(d
) {}
107 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
108 const CompactionFilter::Context
& context
) override
{
109 return std::unique_ptr
<CompactionFilter
>(new DelayFilter(db_test
));
112 virtual const char* Name() const override
{ return "DelayFilterFactory"; }
119 // Make sure we don't trigger a problem if the trigger conditon is given
120 // to be 0, which is invalid.
121 TEST_P(DBTestUniversalCompaction
, UniversalCompactionSingleSortedRun
) {
122 Options options
= CurrentOptions();
124 options
.compaction_style
= kCompactionStyleUniversal
;
125 options
.num_levels
= num_levels_
;
126 // Config universal compaction to always compact to one single sorted run.
127 options
.level0_file_num_compaction_trigger
= 0;
128 options
.compaction_options_universal
.size_ratio
= 10;
129 options
.compaction_options_universal
.min_merge_width
= 2;
130 options
.compaction_options_universal
.max_size_amplification_percent
= 0;
132 options
.write_buffer_size
= 105 << 10; // 105KB
133 options
.arena_block_size
= 4 << 10;
134 options
.target_file_size_base
= 32 << 10; // 32KB
135 // trigger compaction if there are >= 4 files
136 KeepFilterFactory
* filter
= new KeepFilterFactory(true);
137 filter
->expect_manual_compaction_
.store(false);
138 options
.compaction_filter_factory
.reset(filter
);
140 DestroyAndReopen(options
);
141 ASSERT_EQ(1, db_
->GetOptions().level0_file_num_compaction_trigger
);
146 filter
->expect_full_compaction_
.store(true);
148 for (int num
= 0; num
< 16; num
++) {
149 // Write 100KB file. And immediately it should be compacted to one file.
150 GenerateNewFile(&rnd
, &key_idx
);
151 dbfull()->TEST_WaitForCompact();
152 ASSERT_EQ(NumSortedRuns(0), 1);
154 ASSERT_OK(Put(Key(key_idx
), ""));
155 dbfull()->TEST_WaitForCompact();
156 ASSERT_EQ(NumSortedRuns(0), 1);
159 TEST_P(DBTestUniversalCompaction
, OptimizeFiltersForHits
) {
160 Options options
= CurrentOptions();
161 options
.compaction_style
= kCompactionStyleUniversal
;
162 options
.compaction_options_universal
.size_ratio
= 5;
163 options
.num_levels
= num_levels_
;
164 options
.write_buffer_size
= 105 << 10; // 105KB
165 options
.arena_block_size
= 4 << 10;
166 options
.target_file_size_base
= 32 << 10; // 32KB
167 // trigger compaction if there are >= 4 files
168 options
.level0_file_num_compaction_trigger
= 4;
169 BlockBasedTableOptions bbto
;
170 bbto
.cache_index_and_filter_blocks
= true;
171 bbto
.filter_policy
.reset(NewBloomFilterPolicy(10, false));
172 bbto
.whole_key_filtering
= true;
173 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
174 options
.optimize_filters_for_hits
= true;
175 options
.statistics
= rocksdb::CreateDBStatistics();
176 options
.memtable_factory
.reset(new SpecialSkipListFactory(3));
178 DestroyAndReopen(options
);
180 // block compaction from happening
181 env_
->SetBackgroundThreads(1, Env::LOW
);
182 test::SleepingBackgroundTask sleeping_task_low
;
183 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
186 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
; num
++) {
187 Put(Key(num
* 10), "val");
189 dbfull()->TEST_WaitForFlushMemTable();
191 Put(Key(30 + num
* 10), "val");
192 Put(Key(60 + num
* 10), "val");
195 dbfull()->TEST_WaitForFlushMemTable();
197 // Query set of non existing keys
198 for (int i
= 5; i
< 90; i
+= 10) {
199 ASSERT_EQ(Get(Key(i
)), "NOT_FOUND");
202 // Make sure bloom filter is used at least once.
203 ASSERT_GT(TestGetTickerCount(options
, BLOOM_FILTER_USEFUL
), 0);
204 auto prev_counter
= TestGetTickerCount(options
, BLOOM_FILTER_USEFUL
);
206 // Make sure bloom filter is used for all but the last L0 file when looking
207 // up a non-existent key that's in the range of all L0 files.
208 ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
209 ASSERT_EQ(prev_counter
+ NumTableFilesAtLevel(0) - 1,
210 TestGetTickerCount(options
, BLOOM_FILTER_USEFUL
));
211 prev_counter
= TestGetTickerCount(options
, BLOOM_FILTER_USEFUL
);
213 // Unblock compaction and wait it for happening.
214 sleeping_task_low
.WakeUp();
215 dbfull()->TEST_WaitForCompact();
217 // The same queries will not trigger bloom filter
218 for (int i
= 5; i
< 90; i
+= 10) {
219 ASSERT_EQ(Get(Key(i
)), "NOT_FOUND");
221 ASSERT_EQ(prev_counter
, TestGetTickerCount(options
, BLOOM_FILTER_USEFUL
));
224 // TODO(kailiu) The tests on UniversalCompaction has some issues:
225 // 1. A lot of magic numbers ("11" or "12").
226 // 2. Made assumption on the memtable flush conditions, which may change from
228 TEST_P(DBTestUniversalCompaction
, UniversalCompactionTrigger
) {
230 options
.compaction_style
= kCompactionStyleUniversal
;
231 options
.compaction_options_universal
.size_ratio
= 5;
232 options
.num_levels
= num_levels_
;
233 options
.write_buffer_size
= 105 << 10; // 105KB
234 options
.arena_block_size
= 4 << 10;
235 options
.target_file_size_base
= 32 << 10; // 32KB
236 // trigger compaction if there are >= 4 files
237 options
.level0_file_num_compaction_trigger
= 4;
238 KeepFilterFactory
* filter
= new KeepFilterFactory(true);
239 filter
->expect_manual_compaction_
.store(false);
240 options
.compaction_filter_factory
.reset(filter
);
242 options
= CurrentOptions(options
);
243 DestroyAndReopen(options
);
244 CreateAndReopenWithCF({"pikachu"}, options
);
246 rocksdb::SyncPoint::GetInstance()->SetCallBack(
247 "DBTestWritableFile.GetPreallocationStatus", [&](void* arg
) {
248 ASSERT_TRUE(arg
!= nullptr);
249 size_t preallocation_size
= *(static_cast<size_t*>(arg
));
250 if (num_levels_
> 3) {
251 ASSERT_LE(preallocation_size
, options
.target_file_size_base
* 1.1);
254 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
259 filter
->expect_full_compaction_
.store(true);
261 // Generate a set of files at level 0, but don't trigger level-0
263 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 1;
266 GenerateNewFile(1, &rnd
, &key_idx
);
269 // Generate one more file at level-0, which should trigger level-0
271 GenerateNewFile(1, &rnd
, &key_idx
);
272 // Suppose each file flushed from mem table has size 1. Now we compact
273 // (level0_file_num_compaction_trigger+1)=4 files and should have a big
275 ASSERT_EQ(NumSortedRuns(1), 1);
278 // Now we have one file at level 0, with size 4. We also have some data in
279 // mem table. Let's continue generating new files at level 0, but don't
280 // trigger level-0 compaction.
281 // First, clean up memtable before inserting new data. This will generate
282 // a level-0 file, with size around 0.4 (according to previously written
284 filter
->expect_full_compaction_
.store(false);
286 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 3;
288 GenerateNewFile(1, &rnd
, &key_idx
);
289 ASSERT_EQ(NumSortedRuns(1), num
+ 3);
292 // Generate one more file at level-0, which should trigger level-0
294 GenerateNewFile(1, &rnd
, &key_idx
);
295 // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
296 // After compaction, we should have 2 files, with size 4, 2.4.
297 ASSERT_EQ(NumSortedRuns(1), 2);
300 // Now we have 2 files at level 0, with size 4 and 2.4. Continue
301 // generating new files at level 0.
302 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 3;
304 GenerateNewFile(1, &rnd
, &key_idx
);
305 ASSERT_EQ(NumSortedRuns(1), num
+ 3);
308 // Generate one more file at level-0, which should trigger level-0
310 GenerateNewFile(1, &rnd
, &key_idx
);
311 // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
312 // After compaction, we should have 3 files, with size 4, 2.4, 2.
313 ASSERT_EQ(NumSortedRuns(1), 3);
316 // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
317 // new file of size 1.
318 GenerateNewFile(1, &rnd
, &key_idx
);
319 dbfull()->TEST_WaitForCompact();
320 // Level-0 compaction is triggered, but no file will be picked up.
321 ASSERT_EQ(NumSortedRuns(1), 4);
324 // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
325 // a new file of size 1.
326 filter
->expect_full_compaction_
.store(true);
327 GenerateNewFile(1, &rnd
, &key_idx
);
328 dbfull()->TEST_WaitForCompact();
329 // All files at level 0 will be compacted into a single one.
330 ASSERT_EQ(NumSortedRuns(1), 1);
332 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
335 TEST_P(DBTestUniversalCompaction
, UniversalCompactionSizeAmplification
) {
336 Options options
= CurrentOptions();
337 options
.compaction_style
= kCompactionStyleUniversal
;
338 options
.num_levels
= num_levels_
;
339 options
.write_buffer_size
= 100 << 10; // 100KB
340 options
.target_file_size_base
= 32 << 10; // 32KB
341 options
.level0_file_num_compaction_trigger
= 3;
342 DestroyAndReopen(options
);
343 CreateAndReopenWithCF({"pikachu"}, options
);
345 // Trigger compaction if size amplification exceeds 110%
346 options
.compaction_options_universal
.max_size_amplification_percent
= 110;
347 options
= CurrentOptions(options
);
348 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
353 // Generate two files in Level 0. Both files are approx the same size.
354 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 1;
356 // Write 110KB (11 values, each 10K)
357 for (int i
= 0; i
< 11; i
++) {
358 ASSERT_OK(Put(1, Key(key_idx
), RandomString(&rnd
, 10000)));
361 dbfull()->TEST_WaitForFlushMemTable(handles_
[1]);
362 ASSERT_EQ(NumSortedRuns(1), num
+ 1);
364 ASSERT_EQ(NumSortedRuns(1), 2);
366 // Flush whatever is remaining in memtable. This is typically
367 // small, which should not trigger size ratio based compaction
368 // but will instead trigger size amplification.
371 dbfull()->TEST_WaitForCompact();
373 // Verify that size amplification did occur
374 ASSERT_EQ(NumSortedRuns(1), 1);
377 TEST_P(DBTestUniversalCompaction
, CompactFilesOnUniversalCompaction
) {
378 const int kTestKeySize
= 16;
379 const int kTestValueSize
= 984;
380 const int kEntrySize
= kTestKeySize
+ kTestValueSize
;
381 const int kEntriesPerBuffer
= 10;
383 ChangeCompactOptions();
385 options
.create_if_missing
= true;
386 options
.compaction_style
= kCompactionStyleLevel
;
387 options
.num_levels
= 1;
388 options
.target_file_size_base
= options
.write_buffer_size
;
389 options
.compression
= kNoCompression
;
390 options
= CurrentOptions(options
);
391 options
.write_buffer_size
= kEntrySize
* kEntriesPerBuffer
;
392 CreateAndReopenWithCF({"pikachu"}, options
);
393 ASSERT_EQ(options
.compaction_style
, kCompactionStyleUniversal
);
395 for (int key
= 1024 * kEntriesPerBuffer
; key
>= 0; --key
) {
396 ASSERT_OK(Put(1, ToString(key
), RandomString(&rnd
, kTestValueSize
)));
398 dbfull()->TEST_WaitForFlushMemTable(handles_
[1]);
399 dbfull()->TEST_WaitForCompact();
400 ColumnFamilyMetaData cf_meta
;
401 dbfull()->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
402 std::vector
<std::string
> compaction_input_file_names
;
403 for (auto file
: cf_meta
.levels
[0].files
) {
405 compaction_input_file_names
.push_back(file
.name
);
409 if (compaction_input_file_names
.size() == 0) {
410 compaction_input_file_names
.push_back(
411 cf_meta
.levels
[0].files
[0].name
);
414 // expect fail since universal compaction only allow L0 output
415 ASSERT_FALSE(dbfull()
416 ->CompactFiles(CompactionOptions(), handles_
[1],
417 compaction_input_file_names
, 1)
420 // expect ok and verify the compacted files no longer exist.
421 ASSERT_OK(dbfull()->CompactFiles(
422 CompactionOptions(), handles_
[1],
423 compaction_input_file_names
, 0));
425 dbfull()->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
426 VerifyCompactionResult(
428 std::set
<std::string
>(compaction_input_file_names
.begin(),
429 compaction_input_file_names
.end()));
431 compaction_input_file_names
.clear();
433 // Pick the first and the last file, expect everything is
434 // compacted into one single file.
435 compaction_input_file_names
.push_back(
436 cf_meta
.levels
[0].files
[0].name
);
437 compaction_input_file_names
.push_back(
438 cf_meta
.levels
[0].files
[
439 cf_meta
.levels
[0].files
.size() - 1].name
);
440 ASSERT_OK(dbfull()->CompactFiles(
441 CompactionOptions(), handles_
[1],
442 compaction_input_file_names
, 0));
444 dbfull()->GetColumnFamilyMetaData(handles_
[1], &cf_meta
);
445 ASSERT_EQ(cf_meta
.levels
[0].files
.size(), 1U);
448 TEST_P(DBTestUniversalCompaction
, UniversalCompactionTargetLevel
) {
449 Options options
= CurrentOptions();
450 options
.compaction_style
= kCompactionStyleUniversal
;
451 options
.write_buffer_size
= 100 << 10; // 100KB
452 options
.num_levels
= 7;
453 options
.disable_auto_compactions
= true;
454 DestroyAndReopen(options
);
456 // Generate 3 overlapping files
458 for (int i
= 0; i
< 210; i
++) {
459 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 100)));
463 for (int i
= 200; i
< 300; i
++) {
464 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 100)));
468 for (int i
= 250; i
< 260; i
++) {
469 ASSERT_OK(Put(Key(i
), RandomString(&rnd
, 100)));
473 ASSERT_EQ("3", FilesPerLevel(0));
474 // Compact all files into 1 file and put it in L4
475 CompactRangeOptions compact_options
;
476 compact_options
.change_level
= true;
477 compact_options
.target_level
= 4;
478 compact_options
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
479 db_
->CompactRange(compact_options
, nullptr, nullptr);
480 ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
484 class DBTestUniversalCompactionMultiLevels
485 : public DBTestUniversalCompactionBase
{
487 DBTestUniversalCompactionMultiLevels() :
488 DBTestUniversalCompactionBase(
489 "/db_universal_compaction_multi_levels_test") {}
492 TEST_P(DBTestUniversalCompactionMultiLevels
, UniversalCompactionMultiLevels
) {
493 Options options
= CurrentOptions();
494 options
.compaction_style
= kCompactionStyleUniversal
;
495 options
.num_levels
= num_levels_
;
496 options
.write_buffer_size
= 100 << 10; // 100KB
497 options
.level0_file_num_compaction_trigger
= 8;
498 options
.max_background_compactions
= 3;
499 options
.target_file_size_base
= 32 * 1024;
500 CreateAndReopenWithCF({"pikachu"}, options
);
502 // Trigger compaction if size amplification exceeds 110%
503 options
.compaction_options_universal
.max_size_amplification_percent
= 110;
504 options
= CurrentOptions(options
);
505 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
508 int num_keys
= 100000;
509 for (int i
= 0; i
< num_keys
* 2; i
++) {
510 ASSERT_OK(Put(1, Key(i
% num_keys
), Key(i
)));
513 dbfull()->TEST_WaitForCompact();
515 for (int i
= num_keys
; i
< num_keys
* 2; i
++) {
516 ASSERT_EQ(Get(1, Key(i
% num_keys
)), Key(i
));
519 // Tests universal compaction with trivial move enabled
520 TEST_P(DBTestUniversalCompactionMultiLevels
, UniversalCompactionTrivialMove
) {
521 int32_t trivial_move
= 0;
522 int32_t non_trivial_move
= 0;
523 rocksdb::SyncPoint::GetInstance()->SetCallBack(
524 "DBImpl::BackgroundCompaction:TrivialMove",
525 [&](void* arg
) { trivial_move
++; });
526 rocksdb::SyncPoint::GetInstance()->SetCallBack(
527 "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg
) {
529 ASSERT_TRUE(arg
!= nullptr);
530 int output_level
= *(static_cast<int*>(arg
));
531 ASSERT_EQ(output_level
, 0);
533 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
535 Options options
= CurrentOptions();
536 options
.compaction_style
= kCompactionStyleUniversal
;
537 options
.compaction_options_universal
.allow_trivial_move
= true;
538 options
.num_levels
= 3;
539 options
.write_buffer_size
= 100 << 10; // 100KB
540 options
.level0_file_num_compaction_trigger
= 3;
541 options
.max_background_compactions
= 2;
542 options
.target_file_size_base
= 32 * 1024;
543 DestroyAndReopen(options
);
544 CreateAndReopenWithCF({"pikachu"}, options
);
546 // Trigger compaction if size amplification exceeds 110%
547 options
.compaction_options_universal
.max_size_amplification_percent
= 110;
548 options
= CurrentOptions(options
);
549 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
552 int num_keys
= 150000;
553 for (int i
= 0; i
< num_keys
; i
++) {
554 ASSERT_OK(Put(1, Key(i
), Key(i
)));
556 std::vector
<std::string
> values
;
559 dbfull()->TEST_WaitForCompact();
561 ASSERT_GT(trivial_move
, 0);
562 ASSERT_GT(non_trivial_move
, 0);
564 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
567 INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels
,
568 DBTestUniversalCompactionMultiLevels
,
569 ::testing::Combine(::testing::Values(3, 20),
572 class DBTestUniversalCompactionParallel
:
573 public DBTestUniversalCompactionBase
{
575 DBTestUniversalCompactionParallel() :
576 DBTestUniversalCompactionBase(
577 "/db_universal_compaction_prallel_test") {}
580 TEST_P(DBTestUniversalCompactionParallel
, UniversalCompactionParallel
) {
581 Options options
= CurrentOptions();
582 options
.compaction_style
= kCompactionStyleUniversal
;
583 options
.num_levels
= num_levels_
;
584 options
.write_buffer_size
= 1 << 10; // 1KB
585 options
.level0_file_num_compaction_trigger
= 3;
586 options
.max_background_compactions
= 3;
587 options
.max_background_flushes
= 3;
588 options
.target_file_size_base
= 1 * 1024;
589 options
.compaction_options_universal
.max_size_amplification_percent
= 110;
590 DestroyAndReopen(options
);
591 CreateAndReopenWithCF({"pikachu"}, options
);
593 // Delay every compaction so multiple compactions will happen.
594 std::atomic
<int> num_compactions_running(0);
595 std::atomic
<bool> has_parallel(false);
596 rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start",
598 if (num_compactions_running
.fetch_add(1) > 0) {
599 has_parallel
.store(true);
602 for (int nwait
= 0; nwait
< 20000; nwait
++) {
603 if (has_parallel
.load() || num_compactions_running
.load() > 1) {
604 has_parallel
.store(true);
607 env_
->SleepForMicroseconds(1000);
610 rocksdb::SyncPoint::GetInstance()->SetCallBack(
611 "CompactionJob::Run():End",
612 [&](void* arg
) { num_compactions_running
.fetch_add(-1); });
613 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
615 options
= CurrentOptions(options
);
616 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
619 int num_keys
= 30000;
620 for (int i
= 0; i
< num_keys
* 2; i
++) {
621 ASSERT_OK(Put(1, Key(i
% num_keys
), Key(i
)));
623 dbfull()->TEST_WaitForCompact();
625 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
626 ASSERT_EQ(num_compactions_running
.load(), 0);
627 ASSERT_TRUE(has_parallel
.load());
629 for (int i
= num_keys
; i
< num_keys
* 2; i
++) {
630 ASSERT_EQ(Get(1, Key(i
% num_keys
)), Key(i
));
634 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
635 for (int i
= num_keys
; i
< num_keys
* 2; i
++) {
636 ASSERT_EQ(Get(1, Key(i
% num_keys
)), Key(i
));
640 TEST_P(DBTestUniversalCompactionParallel
, PickByFileNumberBug
) {
641 Options options
= CurrentOptions();
642 options
.compaction_style
= kCompactionStyleUniversal
;
643 options
.num_levels
= num_levels_
;
644 options
.write_buffer_size
= 1 * 1024; // 1KB
645 options
.level0_file_num_compaction_trigger
= 7;
646 options
.max_background_compactions
= 2;
647 options
.target_file_size_base
= 1024 * 1024; // 1MB
649 // Disable size amplifiction compaction
650 options
.compaction_options_universal
.max_size_amplification_percent
=
652 DestroyAndReopen(options
);
654 rocksdb::SyncPoint::GetInstance()->LoadDependency(
655 {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
656 "BackgroundCallCompaction:0"},
657 {"UniversalCompactionPicker::PickCompaction:Return",
658 "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
659 {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
660 "CompactionJob::Run():Start"}});
662 int total_picked_compactions
= 0;
663 rocksdb::SyncPoint::GetInstance()->SetCallBack(
664 "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg
) {
666 total_picked_compactions
++;
670 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
672 // Write 7 files to trigger compaction
674 for (int i
= 1; i
<= 70; i
++) {
675 std::string k
= Key(key_idx
++);
676 ASSERT_OK(Put(k
, k
));
682 // Wait for the 1st background compaction process to start
683 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
684 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
685 rocksdb::SyncPoint::GetInstance()->ClearTrace();
687 // Write 3 files while 1st compaction is held
688 // These 3 files have different sizes to avoid compacting based on size_ratio
690 for (int i
= 0; i
< 3; i
++) {
691 for (int j
= 1; j
<= num_keys
; j
++) {
692 std::string k
= Key(key_idx
++);
693 ASSERT_OK(Put(k
, k
));
699 // Wait for the 2nd background compaction process to start
700 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
701 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
703 // Hold the 1st and 2nd compaction from finishing
704 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
705 dbfull()->TEST_WaitForCompact();
707 // Although 2 compaction threads started, the second one did not compact
708 // anything because the number of files not being compacted is less than
709 // level0_file_num_compaction_trigger
710 EXPECT_EQ(total_picked_compactions
, 1);
711 EXPECT_EQ(TotalTableFiles(), 4);
713 // Stop SyncPoint and destroy the DB and reopen it again
714 rocksdb::SyncPoint::GetInstance()->ClearTrace();
715 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
717 total_picked_compactions
= 0;
718 DestroyAndReopen(options
);
720 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
722 // Write 7 files to trigger compaction
723 for (int i
= 1; i
<= 70; i
++) {
724 std::string k
= Key(key_idx
++);
725 ASSERT_OK(Put(k
, k
));
731 // Wait for the 1st background compaction process to start
732 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
733 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
734 rocksdb::SyncPoint::GetInstance()->ClearTrace();
736 // Write 8 files while 1st compaction is held
737 // These 8 files have different sizes to avoid compacting based on size_ratio
739 for (int i
= 0; i
< 8; i
++) {
740 for (int j
= 1; j
<= num_keys
; j
++) {
741 std::string k
= Key(key_idx
++);
742 ASSERT_OK(Put(k
, k
));
748 // Wait for the 2nd background compaction process to start
749 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
750 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
752 // Hold the 1st and 2nd compaction from finishing
753 TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
754 dbfull()->TEST_WaitForCompact();
756 // This time we will trigger a compaction because of size ratio and
757 // another compaction because of number of files that are not compacted
759 EXPECT_GE(total_picked_compactions
, 2);
762 INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel
,
763 DBTestUniversalCompactionParallel
,
764 ::testing::Combine(::testing::Values(1, 10),
765 ::testing::Values(false)));
767 TEST_P(DBTestUniversalCompaction
, UniversalCompactionOptions
) {
768 Options options
= CurrentOptions();
769 options
.compaction_style
= kCompactionStyleUniversal
;
770 options
.write_buffer_size
= 105 << 10; // 105KB
771 options
.arena_block_size
= 4 << 10; // 4KB
772 options
.target_file_size_base
= 32 << 10; // 32KB
773 options
.level0_file_num_compaction_trigger
= 4;
774 options
.num_levels
= num_levels_
;
775 options
.compaction_options_universal
.compression_size_percent
= -1;
776 DestroyAndReopen(options
);
777 CreateAndReopenWithCF({"pikachu"}, options
);
782 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
; num
++) {
783 // Write 100KB (100 values, each 1K)
784 for (int i
= 0; i
< 100; i
++) {
785 ASSERT_OK(Put(1, Key(key_idx
), RandomString(&rnd
, 990)));
788 dbfull()->TEST_WaitForFlushMemTable(handles_
[1]);
790 if (num
< options
.level0_file_num_compaction_trigger
- 1) {
791 ASSERT_EQ(NumSortedRuns(1), num
+ 1);
795 dbfull()->TEST_WaitForCompact();
796 ASSERT_EQ(NumSortedRuns(1), 1);
799 TEST_P(DBTestUniversalCompaction
, UniversalCompactionStopStyleSimilarSize
) {
800 Options options
= CurrentOptions();
801 options
.compaction_style
= kCompactionStyleUniversal
;
802 options
.write_buffer_size
= 105 << 10; // 105KB
803 options
.arena_block_size
= 4 << 10; // 4KB
804 options
.target_file_size_base
= 32 << 10; // 32KB
805 // trigger compaction if there are >= 4 files
806 options
.level0_file_num_compaction_trigger
= 4;
807 options
.compaction_options_universal
.size_ratio
= 10;
808 options
.compaction_options_universal
.stop_style
=
809 kCompactionStopStyleSimilarSize
;
810 options
.num_levels
= num_levels_
;
811 DestroyAndReopen(options
);
817 // Generate a set of files at level 0, but don't trigger level-0
819 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 1;
821 // Write 100KB (100 values, each 1K)
822 for (int i
= 0; i
< 100; i
++) {
823 ASSERT_OK(Put(Key(key_idx
), RandomString(&rnd
, 990)));
826 dbfull()->TEST_WaitForFlushMemTable();
827 ASSERT_EQ(NumSortedRuns(), num
+ 1);
830 // Generate one more file at level-0, which should trigger level-0
832 for (int i
= 0; i
< 100; i
++) {
833 ASSERT_OK(Put(Key(key_idx
), RandomString(&rnd
, 990)));
836 dbfull()->TEST_WaitForCompact();
837 // Suppose each file flushed from mem table has size 1. Now we compact
838 // (level0_file_num_compaction_trigger+1)=4 files and should have a big
840 ASSERT_EQ(NumSortedRuns(), 1);
843 // Now we have one file at level 0, with size 4. We also have some data in
844 // mem table. Let's continue generating new files at level 0, but don't
845 // trigger level-0 compaction.
846 // First, clean up memtable before inserting new data. This will generate
847 // a level-0 file, with size around 0.4 (according to previously written
849 dbfull()->Flush(FlushOptions());
850 for (int num
= 0; num
< options
.level0_file_num_compaction_trigger
- 3;
852 // Write 110KB (11 values, each 10K)
853 for (int i
= 0; i
< 100; i
++) {
854 ASSERT_OK(Put(Key(key_idx
), RandomString(&rnd
, 990)));
857 dbfull()->TEST_WaitForFlushMemTable();
858 ASSERT_EQ(NumSortedRuns(), num
+ 3);
861 // Generate one more file at level-0, which should trigger level-0
863 for (int i
= 0; i
< 100; i
++) {
864 ASSERT_OK(Put(Key(key_idx
), RandomString(&rnd
, 990)));
867 dbfull()->TEST_WaitForCompact();
868 // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
869 // After compaction, we should have 3 files, with size 4, 0.4, 2.
870 ASSERT_EQ(NumSortedRuns(), 3);
872 // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
873 // more file at level-0, which should trigger level-0 compaction.
874 for (int i
= 0; i
< 100; i
++) {
875 ASSERT_OK(Put(Key(key_idx
), RandomString(&rnd
, 990)));
878 dbfull()->TEST_WaitForCompact();
879 // Level-0 compaction is triggered, but no file will be picked up.
880 ASSERT_EQ(NumSortedRuns(), 4);
883 TEST_P(DBTestUniversalCompaction
, UniversalCompactionCompressRatio1
) {
884 if (!Snappy_Supported()) {
888 Options options
= CurrentOptions();
889 options
.compaction_style
= kCompactionStyleUniversal
;
890 options
.write_buffer_size
= 100 << 10; // 100KB
891 options
.target_file_size_base
= 32 << 10; // 32KB
892 options
.level0_file_num_compaction_trigger
= 2;
893 options
.num_levels
= num_levels_
;
894 options
.compaction_options_universal
.compression_size_percent
= 70;
895 DestroyAndReopen(options
);
900 // The first compaction (2) is compressed.
901 for (int num
= 0; num
< 2; num
++) {
902 // Write 110KB (11 values, each 10K)
903 for (int i
= 0; i
< 11; i
++) {
904 ASSERT_OK(Put(Key(key_idx
), CompressibleString(&rnd
, 10000)));
907 dbfull()->TEST_WaitForFlushMemTable();
908 dbfull()->TEST_WaitForCompact();
910 ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);
912 // The second compaction (4) is compressed
913 for (int num
= 0; num
< 2; num
++) {
914 // Write 110KB (11 values, each 10K)
915 for (int i
= 0; i
< 11; i
++) {
916 ASSERT_OK(Put(Key(key_idx
), CompressibleString(&rnd
, 10000)));
919 dbfull()->TEST_WaitForFlushMemTable();
920 dbfull()->TEST_WaitForCompact();
922 ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);
924 // The third compaction (2 4) is compressed since this time it is
925 // (1 1 3.2) and 3.2/5.2 doesn't reach ratio.
926 for (int num
= 0; num
< 2; num
++) {
927 // Write 110KB (11 values, each 10K)
928 for (int i
= 0; i
< 11; i
++) {
929 ASSERT_OK(Put(Key(key_idx
), CompressibleString(&rnd
, 10000)));
932 dbfull()->TEST_WaitForFlushMemTable();
933 dbfull()->TEST_WaitForCompact();
935 ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);
937 // When we start for the compaction up to (2 4 8), the latest
938 // compressed is not compressed.
939 for (int num
= 0; num
< 8; num
++) {
940 // Write 110KB (11 values, each 10K)
941 for (int i
= 0; i
< 11; i
++) {
942 ASSERT_OK(Put(Key(key_idx
), CompressibleString(&rnd
, 10000)));
945 dbfull()->TEST_WaitForFlushMemTable();
946 dbfull()->TEST_WaitForCompact();
948 ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
951 TEST_P(DBTestUniversalCompaction
, UniversalCompactionCompressRatio2
) {
952 if (!Snappy_Supported()) {
955 Options options
= CurrentOptions();
956 options
.compaction_style
= kCompactionStyleUniversal
;
957 options
.write_buffer_size
= 100 << 10; // 100KB
958 options
.target_file_size_base
= 32 << 10; // 32KB
959 options
.level0_file_num_compaction_trigger
= 2;
960 options
.num_levels
= num_levels_
;
961 options
.compaction_options_universal
.compression_size_percent
= 95;
962 DestroyAndReopen(options
);
967 // When we start for the compaction up to (2 4 8), the latest
968 // compressed is compressed given the size ratio to compress.
969 for (int num
= 0; num
< 14; num
++) {
970 // Write 120KB (12 values, each 10K)
971 for (int i
= 0; i
< 12; i
++) {
972 ASSERT_OK(Put(Key(key_idx
), CompressibleString(&rnd
, 10000)));
975 dbfull()->TEST_WaitForFlushMemTable();
976 dbfull()->TEST_WaitForCompact();
978 ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2);
981 // Test that checks trivial move in universal compaction
982 TEST_P(DBTestUniversalCompaction
, UniversalCompactionTrivialMoveTest1
) {
983 int32_t trivial_move
= 0;
984 int32_t non_trivial_move
= 0;
985 rocksdb::SyncPoint::GetInstance()->SetCallBack(
986 "DBImpl::BackgroundCompaction:TrivialMove",
987 [&](void* arg
) { trivial_move
++; });
988 rocksdb::SyncPoint::GetInstance()->SetCallBack(
989 "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg
) {
991 ASSERT_TRUE(arg
!= nullptr);
992 int output_level
= *(static_cast<int*>(arg
));
993 ASSERT_EQ(output_level
, 0);
995 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
997 Options options
= CurrentOptions();
998 options
.compaction_style
= kCompactionStyleUniversal
;
999 options
.compaction_options_universal
.allow_trivial_move
= true;
1000 options
.num_levels
= 2;
1001 options
.write_buffer_size
= 100 << 10; // 100KB
1002 options
.level0_file_num_compaction_trigger
= 3;
1003 options
.max_background_compactions
= 1;
1004 options
.target_file_size_base
= 32 * 1024;
1005 DestroyAndReopen(options
);
1006 CreateAndReopenWithCF({"pikachu"}, options
);
1008 // Trigger compaction if size amplification exceeds 110%
1009 options
.compaction_options_universal
.max_size_amplification_percent
= 110;
1010 options
= CurrentOptions(options
);
1011 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1014 int num_keys
= 250000;
1015 for (int i
= 0; i
< num_keys
; i
++) {
1016 ASSERT_OK(Put(1, Key(i
), Key(i
)));
1018 std::vector
<std::string
> values
;
1020 ASSERT_OK(Flush(1));
1021 dbfull()->TEST_WaitForCompact();
1023 ASSERT_GT(trivial_move
, 0);
1024 ASSERT_GT(non_trivial_move
, 0);
1026 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1028 // Test that checks trivial move in universal compaction
1029 TEST_P(DBTestUniversalCompaction
, UniversalCompactionTrivialMoveTest2
) {
1030 int32_t trivial_move
= 0;
1031 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1032 "DBImpl::BackgroundCompaction:TrivialMove",
1033 [&](void* arg
) { trivial_move
++; });
1034 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1035 "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg
) {
1036 ASSERT_TRUE(arg
!= nullptr);
1037 int output_level
= *(static_cast<int*>(arg
));
1038 ASSERT_EQ(output_level
, 0);
1041 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1043 Options options
= CurrentOptions();
1044 options
.compaction_style
= kCompactionStyleUniversal
;
1045 options
.compaction_options_universal
.allow_trivial_move
= true;
1046 options
.num_levels
= 15;
1047 options
.write_buffer_size
= 100 << 10; // 100KB
1048 options
.level0_file_num_compaction_trigger
= 8;
1049 options
.max_background_compactions
= 2;
1050 options
.target_file_size_base
= 64 * 1024;
1051 DestroyAndReopen(options
);
1052 CreateAndReopenWithCF({"pikachu"}, options
);
1054 // Trigger compaction if size amplification exceeds 110%
1055 options
.compaction_options_universal
.max_size_amplification_percent
= 110;
1056 options
= CurrentOptions(options
);
1057 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1060 int num_keys
= 500000;
1061 for (int i
= 0; i
< num_keys
; i
++) {
1062 ASSERT_OK(Put(1, Key(i
), Key(i
)));
1064 std::vector
<std::string
> values
;
1066 ASSERT_OK(Flush(1));
1067 dbfull()->TEST_WaitForCompact();
1069 ASSERT_GT(trivial_move
, 0);
1071 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1074 TEST_P(DBTestUniversalCompaction
, UniversalCompactionFourPaths
) {
1075 Options options
= CurrentOptions();
1076 options
.db_paths
.emplace_back(dbname_
, 300 * 1024);
1077 options
.db_paths
.emplace_back(dbname_
+ "_2", 300 * 1024);
1078 options
.db_paths
.emplace_back(dbname_
+ "_3", 500 * 1024);
1079 options
.db_paths
.emplace_back(dbname_
+ "_4", 1024 * 1024 * 1024);
1080 options
.memtable_factory
.reset(
1081 new SpecialSkipListFactory(KNumKeysByGenerateNewFile
- 1));
1082 options
.compaction_style
= kCompactionStyleUniversal
;
1083 options
.compaction_options_universal
.size_ratio
= 5;
1084 options
.write_buffer_size
= 111 << 10; // 114KB
1085 options
.arena_block_size
= 4 << 10;
1086 options
.level0_file_num_compaction_trigger
= 2;
1087 options
.num_levels
= 1;
1089 std::vector
<std::string
> filenames
;
1090 env_
->GetChildren(options
.db_paths
[1].path
, &filenames
);
1091 // Delete archival files.
1092 for (size_t i
= 0; i
< filenames
.size(); ++i
) {
1093 env_
->DeleteFile(options
.db_paths
[1].path
+ "/" + filenames
[i
]);
1095 env_
->DeleteDir(options
.db_paths
[1].path
);
1101 // First three 110KB files are not going to second path.
1102 // After that, (100K, 200K)
1103 for (int num
= 0; num
< 3; num
++) {
1104 GenerateNewFile(&rnd
, &key_idx
);
1107 // Another 110KB triggers a compaction to 400K file to second path
1108 GenerateNewFile(&rnd
, &key_idx
);
1109 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[2].path
));
1112 GenerateNewFile(&rnd
, &key_idx
);
1113 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[2].path
));
1114 ASSERT_EQ(1, GetSstFileCount(dbname_
));
1116 // (1,1,4) -> (2, 4)
1117 GenerateNewFile(&rnd
, &key_idx
);
1118 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[2].path
));
1119 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1120 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1122 // (1, 2, 4) -> (3, 4)
1123 GenerateNewFile(&rnd
, &key_idx
);
1124 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[2].path
));
1125 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1126 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1129 GenerateNewFile(&rnd
, &key_idx
);
1130 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[3].path
));
1133 GenerateNewFile(&rnd
, &key_idx
);
1134 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[3].path
));
1135 ASSERT_EQ(1, GetSstFileCount(dbname_
));
1137 // (1, 1, 8) -> (2, 8)
1138 GenerateNewFile(&rnd
, &key_idx
);
1139 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[3].path
));
1140 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1142 // (1, 2, 8) -> (3, 8)
1143 GenerateNewFile(&rnd
, &key_idx
);
1144 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[3].path
));
1145 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1146 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1148 // (1, 3, 8) -> (4, 8)
1149 GenerateNewFile(&rnd
, &key_idx
);
1150 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[2].path
));
1151 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[3].path
));
1153 // (1, 4, 8) -> (5, 8)
1154 GenerateNewFile(&rnd
, &key_idx
);
1155 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[3].path
));
1156 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[2].path
));
1157 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1159 for (int i
= 0; i
< key_idx
; i
++) {
1160 auto v
= Get(Key(i
));
1161 ASSERT_NE(v
, "NOT_FOUND");
1162 ASSERT_TRUE(v
.size() == 1 || v
.size() == 990);
1167 for (int i
= 0; i
< key_idx
; i
++) {
1168 auto v
= Get(Key(i
));
1169 ASSERT_NE(v
, "NOT_FOUND");
1170 ASSERT_TRUE(v
.size() == 1 || v
.size() == 990);
1176 TEST_P(DBTestUniversalCompaction
, IncreaseUniversalCompactionNumLevels
) {
1177 std::function
<void(int)> verify_func
= [&](int num_keys_in_db
) {
1178 std::string keys_in_db
;
1179 Iterator
* iter
= dbfull()->NewIterator(ReadOptions(), handles_
[1]);
1180 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
1181 keys_in_db
.append(iter
->key().ToString());
1182 keys_in_db
.push_back(',');
1186 std::string expected_keys
;
1187 for (int i
= 0; i
<= num_keys_in_db
; i
++) {
1188 expected_keys
.append(Key(i
));
1189 expected_keys
.push_back(',');
1192 ASSERT_EQ(keys_in_db
, expected_keys
);
1199 const int KNumKeysPerFile
= 10;
1201 // Stage 1: open a DB with universal compaction, num_levels=1
1202 Options options
= CurrentOptions();
1203 options
.compaction_style
= kCompactionStyleUniversal
;
1204 options
.num_levels
= 1;
1205 options
.write_buffer_size
= 200 << 10; // 200KB
1206 options
.level0_file_num_compaction_trigger
= 3;
1207 options
.memtable_factory
.reset(new SpecialSkipListFactory(KNumKeysPerFile
));
1208 options
= CurrentOptions(options
);
1209 CreateAndReopenWithCF({"pikachu"}, options
);
1211 for (int i
= 0; i
<= max_key1
; i
++) {
1212 // each value is 10K
1213 ASSERT_OK(Put(1, Key(i
), RandomString(&rnd
, 10000)));
1214 dbfull()->TEST_WaitForFlushMemTable(handles_
[1]);
1215 dbfull()->TEST_WaitForCompact();
1217 ASSERT_OK(Flush(1));
1218 dbfull()->TEST_WaitForCompact();
1220 // Stage 2: reopen with universal compaction, num_levels=4
1221 options
.compaction_style
= kCompactionStyleUniversal
;
1222 options
.num_levels
= 4;
1223 options
= CurrentOptions(options
);
1224 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1226 verify_func(max_key1
);
1229 for (int i
= max_key1
+ 1; i
<= max_key2
; i
++) {
1230 // each value is 10K
1231 ASSERT_OK(Put(1, Key(i
), RandomString(&rnd
, 10000)));
1232 dbfull()->TEST_WaitForFlushMemTable(handles_
[1]);
1233 dbfull()->TEST_WaitForCompact();
1235 ASSERT_OK(Flush(1));
1236 dbfull()->TEST_WaitForCompact();
1238 verify_func(max_key2
);
1239 // Compaction to non-L0 has happened.
1240 ASSERT_GT(NumTableFilesAtLevel(options
.num_levels
- 1, 1), 0);
1242 // Stage 3: Revert it back to one level and revert to num_levels=1.
1243 options
.num_levels
= 4;
1244 options
.target_file_size_base
= INT_MAX
;
1245 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1246 // Compact all to level 0
1247 CompactRangeOptions compact_options
;
1248 compact_options
.change_level
= true;
1249 compact_options
.target_level
= 0;
1250 compact_options
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
1251 dbfull()->CompactRange(compact_options
, handles_
[1], nullptr, nullptr);
1252 // Need to restart it once to remove higher level records in manifest.
1253 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1255 options
.compaction_style
= kCompactionStyleUniversal
;
1256 options
.num_levels
= 1;
1257 options
= CurrentOptions(options
);
1258 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1261 for (int i
= max_key2
+ 1; i
<= max_key3
; i
++) {
1262 // each value is 10K
1263 ASSERT_OK(Put(1, Key(i
), RandomString(&rnd
, 10000)));
1264 dbfull()->TEST_WaitForFlushMemTable(handles_
[1]);
1265 dbfull()->TEST_WaitForCompact();
1267 ASSERT_OK(Flush(1));
1268 dbfull()->TEST_WaitForCompact();
1269 verify_func(max_key3
);
1273 TEST_P(DBTestUniversalCompaction
, UniversalCompactionSecondPathRatio
) {
1274 if (!Snappy_Supported()) {
1277 Options options
= CurrentOptions();
1278 options
.db_paths
.emplace_back(dbname_
, 500 * 1024);
1279 options
.db_paths
.emplace_back(dbname_
+ "_2", 1024 * 1024 * 1024);
1280 options
.compaction_style
= kCompactionStyleUniversal
;
1281 options
.compaction_options_universal
.size_ratio
= 5;
1282 options
.write_buffer_size
= 111 << 10; // 114KB
1283 options
.arena_block_size
= 4 << 10;
1284 options
.level0_file_num_compaction_trigger
= 2;
1285 options
.num_levels
= 1;
1286 options
.memtable_factory
.reset(
1287 new SpecialSkipListFactory(KNumKeysByGenerateNewFile
- 1));
1289 std::vector
<std::string
> filenames
;
1290 env_
->GetChildren(options
.db_paths
[1].path
, &filenames
);
1291 // Delete archival files.
1292 for (size_t i
= 0; i
< filenames
.size(); ++i
) {
1293 env_
->DeleteFile(options
.db_paths
[1].path
+ "/" + filenames
[i
]);
1295 env_
->DeleteDir(options
.db_paths
[1].path
);
1301 // First three 110KB files are not going to second path.
1302 // After that, (100K, 200K)
1303 for (int num
= 0; num
< 3; num
++) {
1304 GenerateNewFile(&rnd
, &key_idx
);
1307 // Another 110KB triggers a compaction to 400K file to second path
1308 GenerateNewFile(&rnd
, &key_idx
);
1309 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1312 GenerateNewFile(&rnd
, &key_idx
);
1313 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1314 ASSERT_EQ(1, GetSstFileCount(dbname_
));
1316 // (1,1,4) -> (2, 4)
1317 GenerateNewFile(&rnd
, &key_idx
);
1318 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1319 ASSERT_EQ(1, GetSstFileCount(dbname_
));
1321 // (1, 2, 4) -> (3, 4)
1322 GenerateNewFile(&rnd
, &key_idx
);
1323 ASSERT_EQ(2, GetSstFileCount(options
.db_paths
[1].path
));
1324 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1327 GenerateNewFile(&rnd
, &key_idx
);
1328 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1329 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1332 GenerateNewFile(&rnd
, &key_idx
);
1333 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1334 ASSERT_EQ(1, GetSstFileCount(dbname_
));
1336 // (1, 1, 8) -> (2, 8)
1337 GenerateNewFile(&rnd
, &key_idx
);
1338 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1339 ASSERT_EQ(1, GetSstFileCount(dbname_
));
1341 // (1, 2, 8) -> (3, 8)
1342 GenerateNewFile(&rnd
, &key_idx
);
1343 ASSERT_EQ(2, GetSstFileCount(options
.db_paths
[1].path
));
1344 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1346 // (1, 3, 8) -> (4, 8)
1347 GenerateNewFile(&rnd
, &key_idx
);
1348 ASSERT_EQ(2, GetSstFileCount(options
.db_paths
[1].path
));
1349 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1351 // (1, 4, 8) -> (5, 8)
1352 GenerateNewFile(&rnd
, &key_idx
);
1353 ASSERT_EQ(2, GetSstFileCount(options
.db_paths
[1].path
));
1354 ASSERT_EQ(0, GetSstFileCount(dbname_
));
1356 for (int i
= 0; i
< key_idx
; i
++) {
1357 auto v
= Get(Key(i
));
1358 ASSERT_NE(v
, "NOT_FOUND");
1359 ASSERT_TRUE(v
.size() == 1 || v
.size() == 990);
1364 for (int i
= 0; i
< key_idx
; i
++) {
1365 auto v
= Get(Key(i
));
1366 ASSERT_NE(v
, "NOT_FOUND");
1367 ASSERT_TRUE(v
.size() == 1 || v
.size() == 990);
1373 INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels
, DBTestUniversalCompaction
,
1374 ::testing::Combine(::testing::Values(1, 3, 5),
1375 ::testing::Bool()));
1377 class DBTestUniversalManualCompactionOutputPathId
1378 : public DBTestUniversalCompactionBase
{
1380 DBTestUniversalManualCompactionOutputPathId() :
1381 DBTestUniversalCompactionBase(
1382 "/db_universal_compaction_manual_pid_test") {}
1385 TEST_P(DBTestUniversalManualCompactionOutputPathId
,
1386 ManualCompactionOutputPathId
) {
1387 Options options
= CurrentOptions();
1388 options
.create_if_missing
= true;
1389 options
.db_paths
.emplace_back(dbname_
, 1000000000);
1390 options
.db_paths
.emplace_back(dbname_
+ "_2", 1000000000);
1391 options
.compaction_style
= kCompactionStyleUniversal
;
1392 options
.num_levels
= num_levels_
;
1393 options
.target_file_size_base
= 1 << 30; // Big size
1394 options
.level0_file_num_compaction_trigger
= 10;
1396 DestroyAndReopen(options
);
1397 CreateAndReopenWithCF({"pikachu"}, options
);
1398 MakeTables(3, "p", "q", 1);
1399 dbfull()->TEST_WaitForCompact();
1400 ASSERT_EQ(2, TotalLiveFiles(1));
1401 ASSERT_EQ(2, GetSstFileCount(options
.db_paths
[0].path
));
1402 ASSERT_EQ(0, GetSstFileCount(options
.db_paths
[1].path
));
1404 // Full compaction to DB path 0
1405 CompactRangeOptions compact_options
;
1406 compact_options
.target_path_id
= 1;
1407 compact_options
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
1408 db_
->CompactRange(compact_options
, handles_
[1], nullptr, nullptr);
1409 ASSERT_EQ(1, TotalLiveFiles(1));
1410 ASSERT_EQ(0, GetSstFileCount(options
.db_paths
[0].path
));
1411 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1413 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "pikachu"}, options
);
1414 ASSERT_EQ(1, TotalLiveFiles(1));
1415 ASSERT_EQ(0, GetSstFileCount(options
.db_paths
[0].path
));
1416 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1418 MakeTables(1, "p", "q", 1);
1419 ASSERT_EQ(2, TotalLiveFiles(1));
1420 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[0].path
));
1421 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1423 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "pikachu"}, options
);
1424 ASSERT_EQ(2, TotalLiveFiles(1));
1425 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[0].path
));
1426 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[1].path
));
1428 // Full compaction to DB path 0
1429 compact_options
.target_path_id
= 0;
1430 compact_options
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
1431 db_
->CompactRange(compact_options
, handles_
[1], nullptr, nullptr);
1432 ASSERT_EQ(1, TotalLiveFiles(1));
1433 ASSERT_EQ(1, GetSstFileCount(options
.db_paths
[0].path
));
1434 ASSERT_EQ(0, GetSstFileCount(options
.db_paths
[1].path
));
1436 // Fail when compacting to an invalid path ID
1437 compact_options
.target_path_id
= 2;
1438 compact_options
.exclusive_manual_compaction
= exclusive_manual_compaction_
;
1439 ASSERT_TRUE(db_
->CompactRange(compact_options
, handles_
[1], nullptr, nullptr)
1440 .IsInvalidArgument());
1443 INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId
,
1444 DBTestUniversalManualCompactionOutputPathId
,
1445 ::testing::Combine(::testing::Values(1, 8),
1446 ::testing::Bool()));
1448 } // namespace rocksdb
1450 #endif // !defined(ROCKSDB_LITE)
1452 int main(int argc
, char** argv
) {
1453 #if !defined(ROCKSDB_LITE)
1454 rocksdb::port::InstallStackTraceHandler();
1455 ::testing::InitGoogleTest(&argc
, argv
);
1456 return RUN_ALL_TESTS();