1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
15 static int cfilter_count
= 0;
16 static int cfilter_skips
= 0;
18 // This is a static filter used for filtering
19 // kvs during the compaction process.
20 static std::string NEW_VALUE
= "NewValue";
22 class DBTestCompactionFilter
: public DBTestBase
{
24 DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {}
27 // Param variant of DBTestBase::ChangeCompactOptions
28 class DBTestCompactionFilterWithCompactParam
29 : public DBTestCompactionFilter
,
30 public ::testing::WithParamInterface
<DBTestBase::OptionConfig
> {
32 DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
33 option_config_
= GetParam();
34 Destroy(last_options_
);
35 auto options
= CurrentOptions();
36 if (option_config_
== kDefault
|| option_config_
== kUniversalCompaction
||
37 option_config_
== kUniversalCompactionMultiLevel
) {
38 options
.create_if_missing
= true;
40 if (option_config_
== kLevelSubcompactions
||
41 option_config_
== kUniversalSubcompactions
) {
42 assert(options
.max_subcompactions
> 1);
48 #ifndef ROCKSDB_VALGRIND_RUN
49 INSTANTIATE_TEST_CASE_P(
50 DBTestCompactionFilterWithCompactOption
,
51 DBTestCompactionFilterWithCompactParam
,
52 ::testing::Values(DBTestBase::OptionConfig::kDefault
,
53 DBTestBase::OptionConfig::kUniversalCompaction
,
54 DBTestBase::OptionConfig::kUniversalCompactionMultiLevel
,
55 DBTestBase::OptionConfig::kLevelSubcompactions
,
56 DBTestBase::OptionConfig::kUniversalSubcompactions
));
58 // Run fewer cases in valgrind
59 INSTANTIATE_TEST_CASE_P(DBTestCompactionFilterWithCompactOption
,
60 DBTestCompactionFilterWithCompactParam
,
61 ::testing::Values(DBTestBase::OptionConfig::kDefault
));
62 #endif // ROCKSDB_VALGRIND_RUN
64 class KeepFilter
: public CompactionFilter
{
66 virtual bool Filter(int /*level*/, const Slice
& /*key*/,
67 const Slice
& /*value*/, std::string
* /*new_value*/,
68 bool* /*value_changed*/) const override
{
73 virtual const char* Name() const override
{ return "KeepFilter"; }
76 class DeleteFilter
: public CompactionFilter
{
78 virtual bool Filter(int /*level*/, const Slice
& /*key*/,
79 const Slice
& /*value*/, std::string
* /*new_value*/,
80 bool* /*value_changed*/) const override
{
85 virtual const char* Name() const override
{ return "DeleteFilter"; }
88 class DeleteISFilter
: public CompactionFilter
{
90 virtual bool Filter(int /*level*/, const Slice
& key
, const Slice
& /*value*/,
91 std::string
* /*new_value*/,
92 bool* /*value_changed*/) const override
{
94 int i
= std::stoi(key
.ToString());
95 if (i
> 5 && i
<= 105) {
101 virtual bool IgnoreSnapshots() const override
{ return true; }
103 virtual const char* Name() const override
{ return "DeleteFilter"; }
106 // Skip x if floor(x/10) is even, use range skips. Requires that keys are
107 // zero-padded to length 10.
108 class SkipEvenFilter
: public CompactionFilter
{
110 virtual Decision
FilterV2(int /*level*/, const Slice
& key
,
111 ValueType
/*value_type*/,
112 const Slice
& /*existing_value*/,
113 std::string
* /*new_value*/,
114 std::string
* skip_until
) const override
{
116 int i
= std::stoi(key
.ToString());
117 if (i
/ 10 % 2 == 0) {
119 snprintf(key_str
, sizeof(key_str
), "%010d", i
/ 10 * 10 + 10);
120 *skip_until
= key_str
;
122 return Decision::kRemoveAndSkipUntil
;
124 return Decision::kKeep
;
127 virtual bool IgnoreSnapshots() const override
{ return true; }
129 virtual const char* Name() const override
{ return "DeleteFilter"; }
132 class DelayFilter
: public CompactionFilter
{
134 explicit DelayFilter(DBTestBase
* d
) : db_test(d
) {}
135 virtual bool Filter(int /*level*/, const Slice
& /*key*/,
136 const Slice
& /*value*/, std::string
* /*new_value*/,
137 bool* /*value_changed*/) const override
{
138 db_test
->env_
->addon_time_
.fetch_add(1000);
142 virtual const char* Name() const override
{ return "DelayFilter"; }
148 class ConditionalFilter
: public CompactionFilter
{
150 explicit ConditionalFilter(const std::string
* filtered_value
)
151 : filtered_value_(filtered_value
) {}
152 virtual bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& value
,
153 std::string
* /*new_value*/,
154 bool* /*value_changed*/) const override
{
155 return value
.ToString() == *filtered_value_
;
158 virtual const char* Name() const override
{ return "ConditionalFilter"; }
161 const std::string
* filtered_value_
;
164 class ChangeFilter
: public CompactionFilter
{
166 explicit ChangeFilter() {}
168 virtual bool Filter(int /*level*/, const Slice
& /*key*/,
169 const Slice
& /*value*/, std::string
* new_value
,
170 bool* value_changed
) const override
{
171 assert(new_value
!= nullptr);
172 *new_value
= NEW_VALUE
;
173 *value_changed
= true;
177 virtual const char* Name() const override
{ return "ChangeFilter"; }
180 class KeepFilterFactory
: public CompactionFilterFactory
{
182 explicit KeepFilterFactory(bool check_context
= false,
183 bool check_context_cf_id
= false)
184 : check_context_(check_context
),
185 check_context_cf_id_(check_context_cf_id
),
186 compaction_filter_created_(false) {}
188 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
189 const CompactionFilter::Context
& context
) override
{
190 if (check_context_
) {
191 EXPECT_EQ(expect_full_compaction_
.load(), context
.is_full_compaction
);
192 EXPECT_EQ(expect_manual_compaction_
.load(), context
.is_manual_compaction
);
194 if (check_context_cf_id_
) {
195 EXPECT_EQ(expect_cf_id_
.load(), context
.column_family_id
);
197 compaction_filter_created_
= true;
198 return std::unique_ptr
<CompactionFilter
>(new KeepFilter());
201 bool compaction_filter_created() const { return compaction_filter_created_
; }
203 virtual const char* Name() const override
{ return "KeepFilterFactory"; }
205 bool check_context_cf_id_
;
206 std::atomic_bool expect_full_compaction_
;
207 std::atomic_bool expect_manual_compaction_
;
208 std::atomic
<uint32_t> expect_cf_id_
;
209 bool compaction_filter_created_
;
212 class DeleteFilterFactory
: public CompactionFilterFactory
{
214 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
215 const CompactionFilter::Context
& context
) override
{
216 if (context
.is_manual_compaction
) {
217 return std::unique_ptr
<CompactionFilter
>(new DeleteFilter());
219 return std::unique_ptr
<CompactionFilter
>(nullptr);
223 virtual const char* Name() const override
{ return "DeleteFilterFactory"; }
226 // Delete Filter Factory which ignores snapshots
227 class DeleteISFilterFactory
: public CompactionFilterFactory
{
229 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
230 const CompactionFilter::Context
& context
) override
{
231 if (context
.is_manual_compaction
) {
232 return std::unique_ptr
<CompactionFilter
>(new DeleteISFilter());
234 return std::unique_ptr
<CompactionFilter
>(nullptr);
238 virtual const char* Name() const override
{ return "DeleteFilterFactory"; }
241 class SkipEvenFilterFactory
: public CompactionFilterFactory
{
243 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
244 const CompactionFilter::Context
& context
) override
{
245 if (context
.is_manual_compaction
) {
246 return std::unique_ptr
<CompactionFilter
>(new SkipEvenFilter());
248 return std::unique_ptr
<CompactionFilter
>(nullptr);
252 virtual const char* Name() const override
{ return "SkipEvenFilterFactory"; }
255 class DelayFilterFactory
: public CompactionFilterFactory
{
257 explicit DelayFilterFactory(DBTestBase
* d
) : db_test(d
) {}
258 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
259 const CompactionFilter::Context
& /*context*/) override
{
260 return std::unique_ptr
<CompactionFilter
>(new DelayFilter(db_test
));
263 virtual const char* Name() const override
{ return "DelayFilterFactory"; }
269 class ConditionalFilterFactory
: public CompactionFilterFactory
{
271 explicit ConditionalFilterFactory(const Slice
& filtered_value
)
272 : filtered_value_(filtered_value
.ToString()) {}
274 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
275 const CompactionFilter::Context
& /*context*/) override
{
276 return std::unique_ptr
<CompactionFilter
>(
277 new ConditionalFilter(&filtered_value_
));
280 virtual const char* Name() const override
{
281 return "ConditionalFilterFactory";
285 std::string filtered_value_
;
288 class ChangeFilterFactory
: public CompactionFilterFactory
{
290 explicit ChangeFilterFactory() {}
292 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
293 const CompactionFilter::Context
& /*context*/) override
{
294 return std::unique_ptr
<CompactionFilter
>(new ChangeFilter());
297 virtual const char* Name() const override
{ return "ChangeFilterFactory"; }
301 TEST_F(DBTestCompactionFilter
, CompactionFilter
) {
302 Options options
= CurrentOptions();
303 options
.max_open_files
= -1;
304 options
.num_levels
= 3;
305 options
.compaction_filter_factory
= std::make_shared
<KeepFilterFactory
>();
306 options
= CurrentOptions(options
);
307 CreateAndReopenWithCF({"pikachu"}, options
);
309 // Write 100K keys, these are written to a few files in L0.
310 const std::string
value(10, 'x');
311 for (int i
= 0; i
< 100000; i
++) {
313 snprintf(key
, sizeof(key
), "B%010d", i
);
318 // Push all files to the highest level L2. Verify that
319 // the compaction is each level invokes the filter for
320 // all the keys in that level.
322 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
323 ASSERT_EQ(cfilter_count
, 100000);
325 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
326 ASSERT_EQ(cfilter_count
, 100000);
328 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
329 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
330 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
333 // All the files are in the lowest level.
334 // Verify that all but the 100001st record
335 // has sequence number zero. The 100001st record
336 // is at the tip of this snapshot and cannot
342 InternalKeyComparator
icmp(options
.comparator
);
343 RangeDelAggregator
range_del_agg(icmp
, {} /* snapshots */);
344 ScopedArenaIterator
iter(
345 dbfull()->NewInternalIterator(&arena
, &range_del_agg
, handles_
[1]));
347 ASSERT_OK(iter
->status());
348 while (iter
->Valid()) {
349 ParsedInternalKey
ikey(Slice(), 0, kTypeValue
);
350 ASSERT_EQ(ParseInternalKey(iter
->key(), &ikey
), true);
352 if (ikey
.sequence
!= 0) {
358 ASSERT_EQ(total
, 100000);
361 // overwrite all the 100K keys once again.
362 for (int i
= 0; i
< 100000; i
++) {
364 snprintf(key
, sizeof(key
), "B%010d", i
);
365 ASSERT_OK(Put(1, key
, value
));
369 // push all files to the highest level L2. This
370 // means that all keys should pass at least once
371 // via the compaction filter
373 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
374 ASSERT_EQ(cfilter_count
, 100000);
376 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
377 ASSERT_EQ(cfilter_count
, 100000);
378 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
379 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
380 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
382 // create a new database with the compaction
383 // filter in such a way that it deletes all keys
384 options
.compaction_filter_factory
= std::make_shared
<DeleteFilterFactory
>();
385 options
.create_if_missing
= true;
386 DestroyAndReopen(options
);
387 CreateAndReopenWithCF({"pikachu"}, options
);
389 // write all the keys once again.
390 for (int i
= 0; i
< 100000; i
++) {
392 snprintf(key
, sizeof(key
), "B%010d", i
);
393 ASSERT_OK(Put(1, key
, value
));
396 ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
397 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
398 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
400 // Push all files to the highest level L2. This
401 // triggers the compaction filter to delete all keys,
402 // verify that at the end of the compaction process,
405 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
406 ASSERT_EQ(cfilter_count
, 100000);
408 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
409 ASSERT_EQ(cfilter_count
, 0);
410 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
411 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
414 // Scan the entire database to ensure that nothing is left
415 std::unique_ptr
<Iterator
> iter(
416 db_
->NewIterator(ReadOptions(), handles_
[1]));
419 while (iter
->Valid()) {
426 // The sequence number of the remaining record
427 // is not zeroed out even though it is at the
428 // level Lmax because this record is at the tip
431 InternalKeyComparator
icmp(options
.comparator
);
432 RangeDelAggregator
range_del_agg(icmp
, {} /* snapshots */);
433 ScopedArenaIterator
iter(
434 dbfull()->NewInternalIterator(&arena
, &range_del_agg
, handles_
[1]));
436 ASSERT_OK(iter
->status());
437 while (iter
->Valid()) {
438 ParsedInternalKey
ikey(Slice(), 0, kTypeValue
);
439 ASSERT_EQ(ParseInternalKey(iter
->key(), &ikey
), true);
440 ASSERT_NE(ikey
.sequence
, (unsigned)0);
448 // Tests the edge case where compaction does not produce any output -- all
449 // entries are deleted. The compaction should create bunch of 'DeleteFile'
450 // entries in VersionEdit, but none of the 'AddFile's.
451 TEST_F(DBTestCompactionFilter
, CompactionFilterDeletesAll
) {
452 Options options
= CurrentOptions();
453 options
.compaction_filter_factory
= std::make_shared
<DeleteFilterFactory
>();
454 options
.disable_auto_compactions
= true;
455 options
.create_if_missing
= true;
456 DestroyAndReopen(options
);
459 for (int table
= 0; table
< 4; ++table
) {
460 for (int i
= 0; i
< 10 + table
; ++i
) {
461 Put(ToString(table
* 100 + i
), "val");
466 // this will produce empty file (delete compaction filter)
467 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
468 ASSERT_EQ(0U, CountLiveFiles());
472 Iterator
* itr
= db_
->NewIterator(ReadOptions());
475 ASSERT_TRUE(!itr
->Valid());
479 #endif // ROCKSDB_LITE
481 TEST_P(DBTestCompactionFilterWithCompactParam
,
482 CompactionFilterWithValueChange
) {
483 Options options
= CurrentOptions();
484 options
.num_levels
= 3;
485 options
.compaction_filter_factory
= std::make_shared
<ChangeFilterFactory
>();
486 CreateAndReopenWithCF({"pikachu"}, options
);
488 // Write 100K+1 keys, these are written to a few files
489 // in L0. We do this so that the current snapshot points
490 // to the 100001 key.The compaction filter is not invoked
491 // on keys that are visible via a snapshot because we
492 // anyways cannot delete it.
493 const std::string
value(10, 'x');
494 for (int i
= 0; i
< 100001; i
++) {
496 snprintf(key
, sizeof(key
), "B%010d", i
);
500 // push all files to lower levels
502 if (option_config_
!= kUniversalCompactionMultiLevel
&&
503 option_config_
!= kUniversalSubcompactions
) {
504 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
505 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
507 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
511 // re-write all data again
512 for (int i
= 0; i
< 100001; i
++) {
514 snprintf(key
, sizeof(key
), "B%010d", i
);
518 // push all files to lower levels. This should
519 // invoke the compaction filter for all 100000 keys.
521 if (option_config_
!= kUniversalCompactionMultiLevel
&&
522 option_config_
!= kUniversalSubcompactions
) {
523 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
524 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
526 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
530 // verify that all keys now have the new value that
531 // was set by the compaction process.
532 for (int i
= 0; i
< 100001; i
++) {
534 snprintf(key
, sizeof(key
), "B%010d", i
);
535 std::string newvalue
= Get(1, key
);
536 ASSERT_EQ(newvalue
.compare(NEW_VALUE
), 0);
540 TEST_F(DBTestCompactionFilter
, CompactionFilterWithMergeOperator
) {
541 std::string one
, two
, three
, four
;
544 PutFixed64(&three
, 3);
545 PutFixed64(&four
, 4);
547 Options options
= CurrentOptions();
548 options
.create_if_missing
= true;
549 options
.merge_operator
= MergeOperators::CreateUInt64AddOperator();
550 options
.num_levels
= 3;
551 // Filter out keys with value is 2.
552 options
.compaction_filter_factory
=
553 std::make_shared
<ConditionalFilterFactory
>(two
);
554 DestroyAndReopen(options
);
556 // In the same compaction, a value type needs to be deleted based on
557 // compaction filter, and there is a merge type for the key. compaction
558 // filter result is ignored.
559 ASSERT_OK(db_
->Put(WriteOptions(), "foo", two
));
561 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", one
));
563 std::string newvalue
= Get("foo");
564 ASSERT_EQ(newvalue
, three
);
565 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
566 newvalue
= Get("foo");
567 ASSERT_EQ(newvalue
, three
);
569 // value key can be deleted based on compaction filter, leaving only
571 ASSERT_OK(db_
->Put(WriteOptions(), "bar", two
));
573 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
574 newvalue
= Get("bar");
575 ASSERT_EQ("NOT_FOUND", newvalue
);
576 ASSERT_OK(db_
->Merge(WriteOptions(), "bar", two
));
578 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
579 newvalue
= Get("bar");
582 // Compaction filter never applies to merge keys.
583 ASSERT_OK(db_
->Put(WriteOptions(), "foobar", one
));
585 ASSERT_OK(db_
->Merge(WriteOptions(), "foobar", two
));
587 newvalue
= Get("foobar");
588 ASSERT_EQ(newvalue
, three
);
589 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
590 newvalue
= Get("foobar");
591 ASSERT_EQ(newvalue
, three
);
593 // In the same compaction, both of value type and merge type keys need to be
594 // deleted based on compaction filter, and there is a merge type for the key.
595 // For both keys, compaction filter results are ignored.
596 ASSERT_OK(db_
->Put(WriteOptions(), "barfoo", two
));
598 ASSERT_OK(db_
->Merge(WriteOptions(), "barfoo", two
));
600 newvalue
= Get("barfoo");
601 ASSERT_EQ(newvalue
, four
);
602 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
603 newvalue
= Get("barfoo");
604 ASSERT_EQ(newvalue
, four
);
608 TEST_F(DBTestCompactionFilter
, CompactionFilterContextManual
) {
609 KeepFilterFactory
* filter
= new KeepFilterFactory(true, true);
611 Options options
= CurrentOptions();
612 options
.compaction_style
= kCompactionStyleUniversal
;
613 options
.compaction_filter_factory
.reset(filter
);
614 options
.compression
= kNoCompression
;
615 options
.level0_file_num_compaction_trigger
= 8;
617 int num_keys_per_file
= 400;
618 for (int j
= 0; j
< 3; j
++) {
619 // Write several keys.
620 const std::string
value(10, 'x');
621 for (int i
= 0; i
< num_keys_per_file
; i
++) {
623 snprintf(key
, sizeof(key
), "B%08d%02d", i
, j
);
626 dbfull()->TEST_FlushMemTable();
627 // Make sure next file is much smaller so automatic compaction will not
629 num_keys_per_file
/= 2;
631 dbfull()->TEST_WaitForCompact();
633 // Force a manual compaction
635 filter
->expect_manual_compaction_
.store(true);
636 filter
->expect_full_compaction_
.store(true);
637 filter
->expect_cf_id_
.store(0);
638 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
639 ASSERT_EQ(cfilter_count
, 700);
640 ASSERT_EQ(NumSortedRuns(0), 1);
641 ASSERT_TRUE(filter
->compaction_filter_created());
643 // Verify total number of keys is correct after manual compaction.
648 InternalKeyComparator
icmp(options
.comparator
);
649 RangeDelAggregator
range_del_agg(icmp
, {} /* snapshots */);
650 ScopedArenaIterator
iter(
651 dbfull()->NewInternalIterator(&arena
, &range_del_agg
));
653 ASSERT_OK(iter
->status());
654 while (iter
->Valid()) {
655 ParsedInternalKey
ikey(Slice(), 0, kTypeValue
);
656 ASSERT_EQ(ParseInternalKey(iter
->key(), &ikey
), true);
658 if (ikey
.sequence
!= 0) {
663 ASSERT_EQ(total
, 700);
667 #endif // ROCKSDB_LITE
669 TEST_F(DBTestCompactionFilter
, CompactionFilterContextCfId
) {
670 KeepFilterFactory
* filter
= new KeepFilterFactory(false, true);
671 filter
->expect_cf_id_
.store(1);
673 Options options
= CurrentOptions();
674 options
.compaction_filter_factory
.reset(filter
);
675 options
.compression
= kNoCompression
;
676 options
.level0_file_num_compaction_trigger
= 2;
677 CreateAndReopenWithCF({"pikachu"}, options
);
679 int num_keys_per_file
= 400;
680 for (int j
= 0; j
< 3; j
++) {
681 // Write several keys.
682 const std::string
value(10, 'x');
683 for (int i
= 0; i
< num_keys_per_file
; i
++) {
685 snprintf(key
, sizeof(key
), "B%08d%02d", i
, j
);
689 // Make sure next file is much smaller so automatic compaction will not
691 num_keys_per_file
/= 2;
693 dbfull()->TEST_WaitForCompact();
695 ASSERT_TRUE(filter
->compaction_filter_created());
699 // Compaction filters should only be applied to records that are newer than the
700 // latest snapshot. This test inserts records and applies a delete filter.
701 TEST_F(DBTestCompactionFilter
, CompactionFilterSnapshot
) {
702 Options options
= CurrentOptions();
703 options
.compaction_filter_factory
= std::make_shared
<DeleteFilterFactory
>();
704 options
.disable_auto_compactions
= true;
705 options
.create_if_missing
= true;
706 DestroyAndReopen(options
);
709 const Snapshot
* snapshot
= nullptr;
710 for (int table
= 0; table
< 4; ++table
) {
711 for (int i
= 0; i
< 10; ++i
) {
712 Put(ToString(table
* 100 + i
), "val");
717 snapshot
= db_
->GetSnapshot();
720 assert(snapshot
!= nullptr);
723 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
724 // The filter should delete 10 records.
725 ASSERT_EQ(30U, cfilter_count
);
727 // Release the snapshot and compact again -> now all records should be
729 db_
->ReleaseSnapshot(snapshot
);
730 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
731 ASSERT_EQ(0U, CountLiveFiles());
734 // Compaction filters should only be applied to records that are newer than the
735 // latest snapshot. However, if the compaction filter asks to ignore snapshots
736 // records newer than the snapshot will also be processed
737 TEST_F(DBTestCompactionFilter
, CompactionFilterIgnoreSnapshot
) {
738 std::string five
= ToString(5);
739 Options options
= CurrentOptions();
740 options
.compaction_filter_factory
= std::make_shared
<DeleteISFilterFactory
>();
741 options
.disable_auto_compactions
= true;
742 options
.create_if_missing
= true;
743 DestroyAndReopen(options
);
746 const Snapshot
* snapshot
= nullptr;
747 for (int table
= 0; table
< 4; ++table
) {
748 for (int i
= 0; i
< 10; ++i
) {
749 Put(ToString(table
* 100 + i
), "val");
754 snapshot
= db_
->GetSnapshot();
757 assert(snapshot
!= nullptr);
760 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
761 // The filter should delete 40 records.
762 ASSERT_EQ(40U, cfilter_count
);
765 // Scan the entire database as of the snapshot to ensure
766 // that nothing is left
767 ReadOptions read_options
;
768 read_options
.snapshot
= snapshot
;
769 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
772 while (iter
->Valid()) {
777 read_options
.snapshot
= nullptr;
778 std::unique_ptr
<Iterator
> iter1(db_
->NewIterator(read_options
));
779 iter1
->SeekToFirst();
781 while (iter1
->Valid()) {
785 // We have deleted 10 keys from 40 using the compaction filter
786 // Keys 6-9 before the snapshot and 100-105 after the snapshot
787 ASSERT_EQ(count
, 30);
790 // Release the snapshot and compact again -> now all records should be
792 db_
->ReleaseSnapshot(snapshot
);
794 #endif // ROCKSDB_LITE
796 TEST_F(DBTestCompactionFilter
, SkipUntil
) {
797 Options options
= CurrentOptions();
798 options
.compaction_filter_factory
= std::make_shared
<SkipEvenFilterFactory
>();
799 options
.disable_auto_compactions
= true;
800 options
.create_if_missing
= true;
801 DestroyAndReopen(options
);
803 // Write 100K keys, these are written to a few files in L0.
804 for (int table
= 0; table
< 4; ++table
) {
805 // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
806 for (int i
= table
* 6; i
< 39 + table
* 11; ++i
) {
808 snprintf(key
, sizeof(key
), "%010d", table
* 100 + i
);
809 Put(key
, std::to_string(table
* 1000 + i
));
815 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
816 // Number of skips in tables: 2, 3, 3, 3.
817 ASSERT_EQ(11, cfilter_skips
);
819 for (int table
= 0; table
< 4; ++table
) {
820 for (int i
= table
* 6; i
< 39 + table
* 11; ++i
) {
821 int k
= table
* 100 + i
;
823 snprintf(key
, sizeof(key
), "%010d", table
* 100 + i
);
824 auto expected
= std::to_string(table
* 1000 + i
);
826 Status s
= db_
->Get(ReadOptions(), key
, &val
);
827 if (k
/ 10 % 2 == 0) {
828 ASSERT_TRUE(s
.IsNotFound());
831 ASSERT_EQ(expected
, val
);
837 TEST_F(DBTestCompactionFilter
, SkipUntilWithBloomFilter
) {
838 BlockBasedTableOptions table_options
;
839 table_options
.whole_key_filtering
= false;
840 table_options
.filter_policy
.reset(NewBloomFilterPolicy(100, false));
842 Options options
= CurrentOptions();
843 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
844 options
.prefix_extractor
.reset(NewCappedPrefixTransform(9));
845 options
.compaction_filter_factory
= std::make_shared
<SkipEvenFilterFactory
>();
846 options
.disable_auto_compactions
= true;
847 options
.create_if_missing
= true;
848 DestroyAndReopen(options
);
850 Put("0000000010", "v10");
851 Put("0000000020", "v20"); // skipped
852 Put("0000000050", "v50");
856 EXPECT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
857 EXPECT_EQ(1, cfilter_skips
);
862 s
= db_
->Get(ReadOptions(), "0000000010", &val
);
864 EXPECT_EQ("v10", val
);
866 s
= db_
->Get(ReadOptions(), "0000000020", &val
);
867 EXPECT_TRUE(s
.IsNotFound());
869 s
= db_
->Get(ReadOptions(), "0000000050", &val
);
871 EXPECT_EQ("v50", val
);
874 } // namespace rocksdb
876 int main(int argc
, char** argv
) {
877 rocksdb::port::InstallStackTraceHandler();
878 ::testing::InitGoogleTest(&argc
, argv
);
879 return RUN_ALL_TESTS();