1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
13 namespace ROCKSDB_NAMESPACE
{
15 static int cfilter_count
= 0;
16 static int cfilter_skips
= 0;
18 // This is a static filter used for filtering
19 // kvs during the compaction process.
20 static std::string NEW_VALUE
= "NewValue";
22 class DBTestCompactionFilter
: public DBTestBase
{
24 DBTestCompactionFilter()
25 : DBTestBase("/db_compaction_filter_test", /*env_do_fsync=*/true) {}
28 // Param variant of DBTestBase::ChangeCompactOptions
29 class DBTestCompactionFilterWithCompactParam
30 : public DBTestCompactionFilter
,
31 public ::testing::WithParamInterface
<DBTestBase::OptionConfig
> {
33 DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
34 option_config_
= GetParam();
35 Destroy(last_options_
);
36 auto options
= CurrentOptions();
37 if (option_config_
== kDefault
|| option_config_
== kUniversalCompaction
||
38 option_config_
== kUniversalCompactionMultiLevel
) {
39 options
.create_if_missing
= true;
41 if (option_config_
== kLevelSubcompactions
||
42 option_config_
== kUniversalSubcompactions
) {
43 assert(options
.max_subcompactions
> 1);
49 #ifndef ROCKSDB_VALGRIND_RUN
50 INSTANTIATE_TEST_CASE_P(
51 CompactionFilterWithOption
, DBTestCompactionFilterWithCompactParam
,
52 ::testing::Values(DBTestBase::OptionConfig::kDefault
,
53 DBTestBase::OptionConfig::kUniversalCompaction
,
54 DBTestBase::OptionConfig::kUniversalCompactionMultiLevel
,
55 DBTestBase::OptionConfig::kLevelSubcompactions
,
56 DBTestBase::OptionConfig::kUniversalSubcompactions
));
58 // Run fewer cases in valgrind
59 INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption
,
60 DBTestCompactionFilterWithCompactParam
,
61 ::testing::Values(DBTestBase::OptionConfig::kDefault
));
62 #endif // ROCKSDB_VALGRIND_RUN
64 class KeepFilter
: public CompactionFilter
{
66 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
67 std::string
* /*new_value*/,
68 bool* /*value_changed*/) const override
{
73 const char* Name() const override
{ return "KeepFilter"; }
76 class DeleteFilter
: public CompactionFilter
{
78 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
79 std::string
* /*new_value*/,
80 bool* /*value_changed*/) const override
{
85 const char* Name() const override
{ return "DeleteFilter"; }
88 class DeleteISFilter
: public CompactionFilter
{
90 bool Filter(int /*level*/, const Slice
& key
, const Slice
& /*value*/,
91 std::string
* /*new_value*/,
92 bool* /*value_changed*/) const override
{
94 int i
= std::stoi(key
.ToString());
95 if (i
> 5 && i
<= 105) {
101 bool IgnoreSnapshots() const override
{ return true; }
103 const char* Name() const override
{ return "DeleteFilter"; }
106 // Skip x if floor(x/10) is even, use range skips. Requires that keys are
107 // zero-padded to length 10.
108 class SkipEvenFilter
: public CompactionFilter
{
110 Decision
FilterV2(int /*level*/, const Slice
& key
, ValueType
/*value_type*/,
111 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
112 std::string
* skip_until
) const override
{
114 int i
= std::stoi(key
.ToString());
115 if (i
/ 10 % 2 == 0) {
117 snprintf(key_str
, sizeof(key_str
), "%010d", i
/ 10 * 10 + 10);
118 *skip_until
= key_str
;
120 return Decision::kRemoveAndSkipUntil
;
122 return Decision::kKeep
;
125 bool IgnoreSnapshots() const override
{ return true; }
127 const char* Name() const override
{ return "DeleteFilter"; }
130 class ConditionalFilter
: public CompactionFilter
{
132 explicit ConditionalFilter(const std::string
* filtered_value
)
133 : filtered_value_(filtered_value
) {}
134 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& value
,
135 std::string
* /*new_value*/,
136 bool* /*value_changed*/) const override
{
137 return value
.ToString() == *filtered_value_
;
140 const char* Name() const override
{ return "ConditionalFilter"; }
143 const std::string
* filtered_value_
;
146 class ChangeFilter
: public CompactionFilter
{
148 explicit ChangeFilter() {}
150 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
151 std::string
* new_value
, bool* value_changed
) const override
{
152 assert(new_value
!= nullptr);
153 *new_value
= NEW_VALUE
;
154 *value_changed
= true;
158 const char* Name() const override
{ return "ChangeFilter"; }
161 class KeepFilterFactory
: public CompactionFilterFactory
{
163 explicit KeepFilterFactory(bool check_context
= false,
164 bool check_context_cf_id
= false)
165 : check_context_(check_context
),
166 check_context_cf_id_(check_context_cf_id
),
167 compaction_filter_created_(false) {}
169 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
170 const CompactionFilter::Context
& context
) override
{
171 if (check_context_
) {
172 EXPECT_EQ(expect_full_compaction_
.load(), context
.is_full_compaction
);
173 EXPECT_EQ(expect_manual_compaction_
.load(), context
.is_manual_compaction
);
175 if (check_context_cf_id_
) {
176 EXPECT_EQ(expect_cf_id_
.load(), context
.column_family_id
);
178 compaction_filter_created_
= true;
179 return std::unique_ptr
<CompactionFilter
>(new KeepFilter());
182 bool compaction_filter_created() const { return compaction_filter_created_
; }
184 const char* Name() const override
{ return "KeepFilterFactory"; }
186 bool check_context_cf_id_
;
187 std::atomic_bool expect_full_compaction_
;
188 std::atomic_bool expect_manual_compaction_
;
189 std::atomic
<uint32_t> expect_cf_id_
;
190 bool compaction_filter_created_
;
193 class DeleteFilterFactory
: public CompactionFilterFactory
{
195 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
196 const CompactionFilter::Context
& context
) override
{
197 if (context
.is_manual_compaction
) {
198 return std::unique_ptr
<CompactionFilter
>(new DeleteFilter());
200 return std::unique_ptr
<CompactionFilter
>(nullptr);
204 const char* Name() const override
{ return "DeleteFilterFactory"; }
207 // Delete Filter Factory which ignores snapshots
208 class DeleteISFilterFactory
: public CompactionFilterFactory
{
210 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
211 const CompactionFilter::Context
& context
) override
{
212 if (context
.is_manual_compaction
) {
213 return std::unique_ptr
<CompactionFilter
>(new DeleteISFilter());
215 return std::unique_ptr
<CompactionFilter
>(nullptr);
219 const char* Name() const override
{ return "DeleteFilterFactory"; }
222 class SkipEvenFilterFactory
: public CompactionFilterFactory
{
224 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
225 const CompactionFilter::Context
& context
) override
{
226 if (context
.is_manual_compaction
) {
227 return std::unique_ptr
<CompactionFilter
>(new SkipEvenFilter());
229 return std::unique_ptr
<CompactionFilter
>(nullptr);
233 const char* Name() const override
{ return "SkipEvenFilterFactory"; }
236 class ConditionalFilterFactory
: public CompactionFilterFactory
{
238 explicit ConditionalFilterFactory(const Slice
& filtered_value
)
239 : filtered_value_(filtered_value
.ToString()) {}
241 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
242 const CompactionFilter::Context
& /*context*/) override
{
243 return std::unique_ptr
<CompactionFilter
>(
244 new ConditionalFilter(&filtered_value_
));
247 const char* Name() const override
{ return "ConditionalFilterFactory"; }
250 std::string filtered_value_
;
253 class ChangeFilterFactory
: public CompactionFilterFactory
{
255 explicit ChangeFilterFactory() {}
257 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
258 const CompactionFilter::Context
& /*context*/) override
{
259 return std::unique_ptr
<CompactionFilter
>(new ChangeFilter());
262 const char* Name() const override
{ return "ChangeFilterFactory"; }
266 TEST_F(DBTestCompactionFilter
, CompactionFilter
) {
267 Options options
= CurrentOptions();
268 options
.max_open_files
= -1;
269 options
.num_levels
= 3;
270 options
.compaction_filter_factory
= std::make_shared
<KeepFilterFactory
>();
271 options
= CurrentOptions(options
);
272 CreateAndReopenWithCF({"pikachu"}, options
);
274 // Write 100K keys, these are written to a few files in L0.
275 const std::string
value(10, 'x');
276 for (int i
= 0; i
< 100000; i
++) {
278 snprintf(key
, sizeof(key
), "B%010d", i
);
283 // Push all files to the highest level L2. Verify that
284 // the compaction is each level invokes the filter for
285 // all the keys in that level.
287 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
288 ASSERT_EQ(cfilter_count
, 100000);
290 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
291 ASSERT_EQ(cfilter_count
, 100000);
293 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
294 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
295 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
298 // All the files are in the lowest level.
299 // Verify that all but the 100001st record
300 // has sequence number zero. The 100001st record
301 // is at the tip of this snapshot and cannot
307 InternalKeyComparator
icmp(options
.comparator
);
308 ReadRangeDelAggregator
range_del_agg(&icmp
,
309 kMaxSequenceNumber
/* upper_bound */);
310 ReadOptions read_options
;
311 ScopedArenaIterator
iter(dbfull()->NewInternalIterator(
312 read_options
, &arena
, &range_del_agg
, kMaxSequenceNumber
, handles_
[1]));
314 ASSERT_OK(iter
->status());
315 while (iter
->Valid()) {
316 ParsedInternalKey
ikey(Slice(), 0, kTypeValue
);
317 ASSERT_OK(ParseInternalKey(iter
->key(), &ikey
, true /* log_err_key */));
319 if (ikey
.sequence
!= 0) {
325 ASSERT_EQ(total
, 100000);
328 // overwrite all the 100K keys once again.
329 for (int i
= 0; i
< 100000; i
++) {
331 snprintf(key
, sizeof(key
), "B%010d", i
);
332 ASSERT_OK(Put(1, key
, value
));
336 // push all files to the highest level L2. This
337 // means that all keys should pass at least once
338 // via the compaction filter
340 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
341 ASSERT_EQ(cfilter_count
, 100000);
343 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
344 ASSERT_EQ(cfilter_count
, 100000);
345 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
346 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
347 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
349 // create a new database with the compaction
350 // filter in such a way that it deletes all keys
351 options
.compaction_filter_factory
= std::make_shared
<DeleteFilterFactory
>();
352 options
.create_if_missing
= true;
353 DestroyAndReopen(options
);
354 CreateAndReopenWithCF({"pikachu"}, options
);
356 // write all the keys once again.
357 for (int i
= 0; i
< 100000; i
++) {
359 snprintf(key
, sizeof(key
), "B%010d", i
);
360 ASSERT_OK(Put(1, key
, value
));
363 ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
364 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
365 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
367 // Push all files to the highest level L2. This
368 // triggers the compaction filter to delete all keys,
369 // verify that at the end of the compaction process,
372 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
373 ASSERT_EQ(cfilter_count
, 100000);
375 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
376 ASSERT_EQ(cfilter_count
, 0);
377 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
378 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
381 // Scan the entire database to ensure that nothing is left
382 std::unique_ptr
<Iterator
> iter(
383 db_
->NewIterator(ReadOptions(), handles_
[1]));
386 while (iter
->Valid()) {
393 // The sequence number of the remaining record
394 // is not zeroed out even though it is at the
395 // level Lmax because this record is at the tip
398 InternalKeyComparator
icmp(options
.comparator
);
399 ReadRangeDelAggregator
range_del_agg(&icmp
,
400 kMaxSequenceNumber
/* upper_bound */);
401 ReadOptions read_options
;
402 ScopedArenaIterator
iter(dbfull()->NewInternalIterator(
403 read_options
, &arena
, &range_del_agg
, kMaxSequenceNumber
, handles_
[1]));
405 ASSERT_OK(iter
->status());
406 while (iter
->Valid()) {
407 ParsedInternalKey
ikey(Slice(), 0, kTypeValue
);
408 ASSERT_OK(ParseInternalKey(iter
->key(), &ikey
, true /* log_err_key */));
409 ASSERT_NE(ikey
.sequence
, (unsigned)0);
417 // Tests the edge case where compaction does not produce any output -- all
418 // entries are deleted. The compaction should create bunch of 'DeleteFile'
419 // entries in VersionEdit, but none of the 'AddFile's.
420 TEST_F(DBTestCompactionFilter
, CompactionFilterDeletesAll
) {
421 Options options
= CurrentOptions();
422 options
.compaction_filter_factory
= std::make_shared
<DeleteFilterFactory
>();
423 options
.disable_auto_compactions
= true;
424 options
.create_if_missing
= true;
425 DestroyAndReopen(options
);
428 for (int table
= 0; table
< 4; ++table
) {
429 for (int i
= 0; i
< 10 + table
; ++i
) {
430 Put(ToString(table
* 100 + i
), "val");
435 // this will produce empty file (delete compaction filter)
436 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
437 ASSERT_EQ(0U, CountLiveFiles());
441 Iterator
* itr
= db_
->NewIterator(ReadOptions());
444 ASSERT_TRUE(!itr
->Valid());
448 #endif // ROCKSDB_LITE
450 TEST_P(DBTestCompactionFilterWithCompactParam
,
451 CompactionFilterWithValueChange
) {
452 Options options
= CurrentOptions();
453 options
.num_levels
= 3;
454 options
.compaction_filter_factory
= std::make_shared
<ChangeFilterFactory
>();
455 CreateAndReopenWithCF({"pikachu"}, options
);
457 // Write 100K+1 keys, these are written to a few files
458 // in L0. We do this so that the current snapshot points
459 // to the 100001 key.The compaction filter is not invoked
460 // on keys that are visible via a snapshot because we
461 // anyways cannot delete it.
462 const std::string
value(10, 'x');
463 for (int i
= 0; i
< 100001; i
++) {
465 snprintf(key
, sizeof(key
), "B%010d", i
);
469 // push all files to lower levels
471 if (option_config_
!= kUniversalCompactionMultiLevel
&&
472 option_config_
!= kUniversalSubcompactions
) {
473 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
474 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
476 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
480 // re-write all data again
481 for (int i
= 0; i
< 100001; i
++) {
483 snprintf(key
, sizeof(key
), "B%010d", i
);
487 // push all files to lower levels. This should
488 // invoke the compaction filter for all 100000 keys.
490 if (option_config_
!= kUniversalCompactionMultiLevel
&&
491 option_config_
!= kUniversalSubcompactions
) {
492 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_
[1]);
493 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_
[1]);
495 dbfull()->CompactRange(CompactRangeOptions(), handles_
[1], nullptr,
499 // verify that all keys now have the new value that
500 // was set by the compaction process.
501 for (int i
= 0; i
< 100001; i
++) {
503 snprintf(key
, sizeof(key
), "B%010d", i
);
504 std::string newvalue
= Get(1, key
);
505 ASSERT_EQ(newvalue
.compare(NEW_VALUE
), 0);
509 TEST_F(DBTestCompactionFilter
, CompactionFilterWithMergeOperator
) {
510 std::string one
, two
, three
, four
;
513 PutFixed64(&three
, 3);
514 PutFixed64(&four
, 4);
516 Options options
= CurrentOptions();
517 options
.create_if_missing
= true;
518 options
.merge_operator
= MergeOperators::CreateUInt64AddOperator();
519 options
.num_levels
= 3;
520 // Filter out keys with value is 2.
521 options
.compaction_filter_factory
=
522 std::make_shared
<ConditionalFilterFactory
>(two
);
523 DestroyAndReopen(options
);
525 // In the same compaction, a value type needs to be deleted based on
526 // compaction filter, and there is a merge type for the key. compaction
527 // filter result is ignored.
528 ASSERT_OK(db_
->Put(WriteOptions(), "foo", two
));
530 ASSERT_OK(db_
->Merge(WriteOptions(), "foo", one
));
532 std::string newvalue
= Get("foo");
533 ASSERT_EQ(newvalue
, three
);
534 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
535 newvalue
= Get("foo");
536 ASSERT_EQ(newvalue
, three
);
538 // value key can be deleted based on compaction filter, leaving only
540 ASSERT_OK(db_
->Put(WriteOptions(), "bar", two
));
542 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
543 newvalue
= Get("bar");
544 ASSERT_EQ("NOT_FOUND", newvalue
);
545 ASSERT_OK(db_
->Merge(WriteOptions(), "bar", two
));
547 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
548 newvalue
= Get("bar");
551 // Compaction filter never applies to merge keys.
552 ASSERT_OK(db_
->Put(WriteOptions(), "foobar", one
));
554 ASSERT_OK(db_
->Merge(WriteOptions(), "foobar", two
));
556 newvalue
= Get("foobar");
557 ASSERT_EQ(newvalue
, three
);
558 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
559 newvalue
= Get("foobar");
560 ASSERT_EQ(newvalue
, three
);
562 // In the same compaction, both of value type and merge type keys need to be
563 // deleted based on compaction filter, and there is a merge type for the key.
564 // For both keys, compaction filter results are ignored.
565 ASSERT_OK(db_
->Put(WriteOptions(), "barfoo", two
));
567 ASSERT_OK(db_
->Merge(WriteOptions(), "barfoo", two
));
569 newvalue
= Get("barfoo");
570 ASSERT_EQ(newvalue
, four
);
571 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
572 newvalue
= Get("barfoo");
573 ASSERT_EQ(newvalue
, four
);
577 TEST_F(DBTestCompactionFilter
, CompactionFilterContextManual
) {
578 KeepFilterFactory
* filter
= new KeepFilterFactory(true, true);
580 Options options
= CurrentOptions();
581 options
.compaction_style
= kCompactionStyleUniversal
;
582 options
.compaction_filter_factory
.reset(filter
);
583 options
.compression
= kNoCompression
;
584 options
.level0_file_num_compaction_trigger
= 8;
586 int num_keys_per_file
= 400;
587 for (int j
= 0; j
< 3; j
++) {
588 // Write several keys.
589 const std::string
value(10, 'x');
590 for (int i
= 0; i
< num_keys_per_file
; i
++) {
592 snprintf(key
, sizeof(key
), "B%08d%02d", i
, j
);
595 dbfull()->TEST_FlushMemTable();
596 // Make sure next file is much smaller so automatic compaction will not
598 num_keys_per_file
/= 2;
600 dbfull()->TEST_WaitForCompact();
602 // Force a manual compaction
604 filter
->expect_manual_compaction_
.store(true);
605 filter
->expect_full_compaction_
.store(true);
606 filter
->expect_cf_id_
.store(0);
607 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
608 ASSERT_EQ(cfilter_count
, 700);
609 ASSERT_EQ(NumSortedRuns(0), 1);
610 ASSERT_TRUE(filter
->compaction_filter_created());
612 // Verify total number of keys is correct after manual compaction.
617 InternalKeyComparator
icmp(options
.comparator
);
618 ReadRangeDelAggregator
range_del_agg(&icmp
,
619 kMaxSequenceNumber
/* snapshots */);
620 ReadOptions read_options
;
621 ScopedArenaIterator
iter(dbfull()->NewInternalIterator(
622 read_options
, &arena
, &range_del_agg
, kMaxSequenceNumber
));
624 ASSERT_OK(iter
->status());
625 while (iter
->Valid()) {
626 ParsedInternalKey
ikey(Slice(), 0, kTypeValue
);
627 ASSERT_OK(ParseInternalKey(iter
->key(), &ikey
, true /* log_err_key */));
629 if (ikey
.sequence
!= 0) {
634 ASSERT_EQ(total
, 700);
638 #endif // ROCKSDB_LITE
640 TEST_F(DBTestCompactionFilter
, CompactionFilterContextCfId
) {
641 KeepFilterFactory
* filter
= new KeepFilterFactory(false, true);
642 filter
->expect_cf_id_
.store(1);
644 Options options
= CurrentOptions();
645 options
.compaction_filter_factory
.reset(filter
);
646 options
.compression
= kNoCompression
;
647 options
.level0_file_num_compaction_trigger
= 2;
648 CreateAndReopenWithCF({"pikachu"}, options
);
650 int num_keys_per_file
= 400;
651 for (int j
= 0; j
< 3; j
++) {
652 // Write several keys.
653 const std::string
value(10, 'x');
654 for (int i
= 0; i
< num_keys_per_file
; i
++) {
656 snprintf(key
, sizeof(key
), "B%08d%02d", i
, j
);
660 // Make sure next file is much smaller so automatic compaction will not
662 num_keys_per_file
/= 2;
664 dbfull()->TEST_WaitForCompact();
666 ASSERT_TRUE(filter
->compaction_filter_created());
670 // Compaction filters aplies to all records, regardless snapshots.
671 TEST_F(DBTestCompactionFilter
, CompactionFilterIgnoreSnapshot
) {
672 std::string five
= ToString(5);
673 Options options
= CurrentOptions();
674 options
.compaction_filter_factory
= std::make_shared
<DeleteISFilterFactory
>();
675 options
.disable_auto_compactions
= true;
676 options
.create_if_missing
= true;
677 DestroyAndReopen(options
);
680 const Snapshot
* snapshot
= nullptr;
681 for (int table
= 0; table
< 4; ++table
) {
682 for (int i
= 0; i
< 10; ++i
) {
683 Put(ToString(table
* 100 + i
), "val");
688 snapshot
= db_
->GetSnapshot();
691 assert(snapshot
!= nullptr);
694 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
695 // The filter should delete 40 records.
696 ASSERT_EQ(40, cfilter_count
);
699 // Scan the entire database as of the snapshot to ensure
700 // that nothing is left
701 ReadOptions read_options
;
702 read_options
.snapshot
= snapshot
;
703 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
706 while (iter
->Valid()) {
711 read_options
.snapshot
= nullptr;
712 std::unique_ptr
<Iterator
> iter1(db_
->NewIterator(read_options
));
713 iter1
->SeekToFirst();
715 while (iter1
->Valid()) {
719 // We have deleted 10 keys from 40 using the compaction filter
720 // Keys 6-9 before the snapshot and 100-105 after the snapshot
721 ASSERT_EQ(count
, 30);
724 // Release the snapshot and compact again -> now all records should be
726 db_
->ReleaseSnapshot(snapshot
);
728 #endif // ROCKSDB_LITE
730 TEST_F(DBTestCompactionFilter
, SkipUntil
) {
731 Options options
= CurrentOptions();
732 options
.compaction_filter_factory
= std::make_shared
<SkipEvenFilterFactory
>();
733 options
.disable_auto_compactions
= true;
734 options
.create_if_missing
= true;
735 DestroyAndReopen(options
);
737 // Write 100K keys, these are written to a few files in L0.
738 for (int table
= 0; table
< 4; ++table
) {
739 // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
740 for (int i
= table
* 6; i
< 39 + table
* 11; ++i
) {
742 snprintf(key
, sizeof(key
), "%010d", table
* 100 + i
);
743 Put(key
, std::to_string(table
* 1000 + i
));
749 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
750 // Number of skips in tables: 2, 3, 3, 3.
751 ASSERT_EQ(11, cfilter_skips
);
753 for (int table
= 0; table
< 4; ++table
) {
754 for (int i
= table
* 6; i
< 39 + table
* 11; ++i
) {
755 int k
= table
* 100 + i
;
757 snprintf(key
, sizeof(key
), "%010d", table
* 100 + i
);
758 auto expected
= std::to_string(table
* 1000 + i
);
760 Status s
= db_
->Get(ReadOptions(), key
, &val
);
761 if (k
/ 10 % 2 == 0) {
762 ASSERT_TRUE(s
.IsNotFound());
765 ASSERT_EQ(expected
, val
);
771 TEST_F(DBTestCompactionFilter
, SkipUntilWithBloomFilter
) {
772 BlockBasedTableOptions table_options
;
773 table_options
.whole_key_filtering
= false;
774 table_options
.filter_policy
.reset(NewBloomFilterPolicy(100, false));
776 Options options
= CurrentOptions();
777 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
778 options
.prefix_extractor
.reset(NewCappedPrefixTransform(9));
779 options
.compaction_filter_factory
= std::make_shared
<SkipEvenFilterFactory
>();
780 options
.disable_auto_compactions
= true;
781 options
.create_if_missing
= true;
782 DestroyAndReopen(options
);
784 Put("0000000010", "v10");
785 Put("0000000020", "v20"); // skipped
786 Put("0000000050", "v50");
790 EXPECT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
791 EXPECT_EQ(1, cfilter_skips
);
796 s
= db_
->Get(ReadOptions(), "0000000010", &val
);
798 EXPECT_EQ("v10", val
);
800 s
= db_
->Get(ReadOptions(), "0000000020", &val
);
801 EXPECT_TRUE(s
.IsNotFound());
803 s
= db_
->Get(ReadOptions(), "0000000050", &val
);
805 EXPECT_EQ("v50", val
);
808 class TestNotSupportedFilter
: public CompactionFilter
{
810 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
811 std::string
* /*new_value*/,
812 bool* /*value_changed*/) const override
{
816 const char* Name() const override
{ return "NotSupported"; }
817 bool IgnoreSnapshots() const override
{ return false; }
820 TEST_F(DBTestCompactionFilter
, IgnoreSnapshotsFalse
) {
821 Options options
= CurrentOptions();
822 options
.compaction_filter
= new TestNotSupportedFilter();
823 DestroyAndReopen(options
);
833 // Comapction should fail because IgnoreSnapshots() = false
834 EXPECT_TRUE(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr)
837 delete options
.compaction_filter
;
840 } // namespace ROCKSDB_NAMESPACE
842 int main(int argc
, char** argv
) {
843 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
844 ::testing::InitGoogleTest(&argc
, argv
);
845 return RUN_ALL_TESTS();