]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_compaction_filter_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_compaction_filter_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12
13 namespace ROCKSDB_NAMESPACE {
14
15 static int cfilter_count = 0;
16 static int cfilter_skips = 0;
17
18 // This is a static filter used for filtering
19 // kvs during the compaction process.
20 static std::string NEW_VALUE = "NewValue";
21
22 class DBTestCompactionFilter : public DBTestBase {
23 public:
24 DBTestCompactionFilter()
25 : DBTestBase("/db_compaction_filter_test", /*env_do_fsync=*/true) {}
26 };
27
28 // Param variant of DBTestBase::ChangeCompactOptions
29 class DBTestCompactionFilterWithCompactParam
30 : public DBTestCompactionFilter,
31 public ::testing::WithParamInterface<DBTestBase::OptionConfig> {
32 public:
33 DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
34 option_config_ = GetParam();
35 Destroy(last_options_);
36 auto options = CurrentOptions();
37 if (option_config_ == kDefault || option_config_ == kUniversalCompaction ||
38 option_config_ == kUniversalCompactionMultiLevel) {
39 options.create_if_missing = true;
40 }
41 if (option_config_ == kLevelSubcompactions ||
42 option_config_ == kUniversalSubcompactions) {
43 assert(options.max_subcompactions > 1);
44 }
45 TryReopen(options);
46 }
47 };
48
49 #ifndef ROCKSDB_VALGRIND_RUN
50 INSTANTIATE_TEST_CASE_P(
51 CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam,
52 ::testing::Values(DBTestBase::OptionConfig::kDefault,
53 DBTestBase::OptionConfig::kUniversalCompaction,
54 DBTestBase::OptionConfig::kUniversalCompactionMultiLevel,
55 DBTestBase::OptionConfig::kLevelSubcompactions,
56 DBTestBase::OptionConfig::kUniversalSubcompactions));
57 #else
58 // Run fewer cases in valgrind
59 INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption,
60 DBTestCompactionFilterWithCompactParam,
61 ::testing::Values(DBTestBase::OptionConfig::kDefault));
62 #endif // ROCKSDB_VALGRIND_RUN
63
64 class KeepFilter : public CompactionFilter {
65 public:
66 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
67 std::string* /*new_value*/,
68 bool* /*value_changed*/) const override {
69 cfilter_count++;
70 return false;
71 }
72
73 const char* Name() const override { return "KeepFilter"; }
74 };
75
76 class DeleteFilter : public CompactionFilter {
77 public:
78 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
79 std::string* /*new_value*/,
80 bool* /*value_changed*/) const override {
81 cfilter_count++;
82 return true;
83 }
84
85 const char* Name() const override { return "DeleteFilter"; }
86 };
87
88 class DeleteISFilter : public CompactionFilter {
89 public:
90 bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
91 std::string* /*new_value*/,
92 bool* /*value_changed*/) const override {
93 cfilter_count++;
94 int i = std::stoi(key.ToString());
95 if (i > 5 && i <= 105) {
96 return true;
97 }
98 return false;
99 }
100
101 bool IgnoreSnapshots() const override { return true; }
102
103 const char* Name() const override { return "DeleteFilter"; }
104 };
105
106 // Skip x if floor(x/10) is even, use range skips. Requires that keys are
107 // zero-padded to length 10.
108 class SkipEvenFilter : public CompactionFilter {
109 public:
110 Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
111 const Slice& /*existing_value*/, std::string* /*new_value*/,
112 std::string* skip_until) const override {
113 cfilter_count++;
114 int i = std::stoi(key.ToString());
115 if (i / 10 % 2 == 0) {
116 char key_str[100];
117 snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10);
118 *skip_until = key_str;
119 ++cfilter_skips;
120 return Decision::kRemoveAndSkipUntil;
121 }
122 return Decision::kKeep;
123 }
124
125 bool IgnoreSnapshots() const override { return true; }
126
127 const char* Name() const override { return "DeleteFilter"; }
128 };
129
130 class ConditionalFilter : public CompactionFilter {
131 public:
132 explicit ConditionalFilter(const std::string* filtered_value)
133 : filtered_value_(filtered_value) {}
134 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value,
135 std::string* /*new_value*/,
136 bool* /*value_changed*/) const override {
137 return value.ToString() == *filtered_value_;
138 }
139
140 const char* Name() const override { return "ConditionalFilter"; }
141
142 private:
143 const std::string* filtered_value_;
144 };
145
146 class ChangeFilter : public CompactionFilter {
147 public:
148 explicit ChangeFilter() {}
149
150 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
151 std::string* new_value, bool* value_changed) const override {
152 assert(new_value != nullptr);
153 *new_value = NEW_VALUE;
154 *value_changed = true;
155 return false;
156 }
157
158 const char* Name() const override { return "ChangeFilter"; }
159 };
160
161 class KeepFilterFactory : public CompactionFilterFactory {
162 public:
163 explicit KeepFilterFactory(bool check_context = false,
164 bool check_context_cf_id = false)
165 : check_context_(check_context),
166 check_context_cf_id_(check_context_cf_id),
167 compaction_filter_created_(false) {}
168
169 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
170 const CompactionFilter::Context& context) override {
171 if (check_context_) {
172 EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
173 EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
174 }
175 if (check_context_cf_id_) {
176 EXPECT_EQ(expect_cf_id_.load(), context.column_family_id);
177 }
178 compaction_filter_created_ = true;
179 return std::unique_ptr<CompactionFilter>(new KeepFilter());
180 }
181
182 bool compaction_filter_created() const { return compaction_filter_created_; }
183
184 const char* Name() const override { return "KeepFilterFactory"; }
185 bool check_context_;
186 bool check_context_cf_id_;
187 std::atomic_bool expect_full_compaction_;
188 std::atomic_bool expect_manual_compaction_;
189 std::atomic<uint32_t> expect_cf_id_;
190 bool compaction_filter_created_;
191 };
192
193 class DeleteFilterFactory : public CompactionFilterFactory {
194 public:
195 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
196 const CompactionFilter::Context& context) override {
197 if (context.is_manual_compaction) {
198 return std::unique_ptr<CompactionFilter>(new DeleteFilter());
199 } else {
200 return std::unique_ptr<CompactionFilter>(nullptr);
201 }
202 }
203
204 const char* Name() const override { return "DeleteFilterFactory"; }
205 };
206
207 // Delete Filter Factory which ignores snapshots
208 class DeleteISFilterFactory : public CompactionFilterFactory {
209 public:
210 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
211 const CompactionFilter::Context& context) override {
212 if (context.is_manual_compaction) {
213 return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
214 } else {
215 return std::unique_ptr<CompactionFilter>(nullptr);
216 }
217 }
218
219 const char* Name() const override { return "DeleteFilterFactory"; }
220 };
221
222 class SkipEvenFilterFactory : public CompactionFilterFactory {
223 public:
224 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
225 const CompactionFilter::Context& context) override {
226 if (context.is_manual_compaction) {
227 return std::unique_ptr<CompactionFilter>(new SkipEvenFilter());
228 } else {
229 return std::unique_ptr<CompactionFilter>(nullptr);
230 }
231 }
232
233 const char* Name() const override { return "SkipEvenFilterFactory"; }
234 };
235
236 class ConditionalFilterFactory : public CompactionFilterFactory {
237 public:
238 explicit ConditionalFilterFactory(const Slice& filtered_value)
239 : filtered_value_(filtered_value.ToString()) {}
240
241 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
242 const CompactionFilter::Context& /*context*/) override {
243 return std::unique_ptr<CompactionFilter>(
244 new ConditionalFilter(&filtered_value_));
245 }
246
247 const char* Name() const override { return "ConditionalFilterFactory"; }
248
249 private:
250 std::string filtered_value_;
251 };
252
253 class ChangeFilterFactory : public CompactionFilterFactory {
254 public:
255 explicit ChangeFilterFactory() {}
256
257 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
258 const CompactionFilter::Context& /*context*/) override {
259 return std::unique_ptr<CompactionFilter>(new ChangeFilter());
260 }
261
262 const char* Name() const override { return "ChangeFilterFactory"; }
263 };
264
265 #ifndef ROCKSDB_LITE
266 TEST_F(DBTestCompactionFilter, CompactionFilter) {
267 Options options = CurrentOptions();
268 options.max_open_files = -1;
269 options.num_levels = 3;
270 options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
271 options = CurrentOptions(options);
272 CreateAndReopenWithCF({"pikachu"}, options);
273
274 // Write 100K keys, these are written to a few files in L0.
275 const std::string value(10, 'x');
276 for (int i = 0; i < 100000; i++) {
277 char key[100];
278 snprintf(key, sizeof(key), "B%010d", i);
279 Put(1, key, value);
280 }
281 ASSERT_OK(Flush(1));
282
283 // Push all files to the highest level L2. Verify that
284 // the compaction is each level invokes the filter for
285 // all the keys in that level.
286 cfilter_count = 0;
287 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
288 ASSERT_EQ(cfilter_count, 100000);
289 cfilter_count = 0;
290 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
291 ASSERT_EQ(cfilter_count, 100000);
292
293 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
294 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
295 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
296 cfilter_count = 0;
297
298 // All the files are in the lowest level.
299 // Verify that all but the 100001st record
300 // has sequence number zero. The 100001st record
301 // is at the tip of this snapshot and cannot
302 // be zeroed out.
303 int count = 0;
304 int total = 0;
305 Arena arena;
306 {
307 InternalKeyComparator icmp(options.comparator);
308 ReadRangeDelAggregator range_del_agg(&icmp,
309 kMaxSequenceNumber /* upper_bound */);
310 ReadOptions read_options;
311 ScopedArenaIterator iter(dbfull()->NewInternalIterator(
312 read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
313 iter->SeekToFirst();
314 ASSERT_OK(iter->status());
315 while (iter->Valid()) {
316 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
317 ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */));
318 total++;
319 if (ikey.sequence != 0) {
320 count++;
321 }
322 iter->Next();
323 }
324 }
325 ASSERT_EQ(total, 100000);
326 ASSERT_EQ(count, 0);
327
328 // overwrite all the 100K keys once again.
329 for (int i = 0; i < 100000; i++) {
330 char key[100];
331 snprintf(key, sizeof(key), "B%010d", i);
332 ASSERT_OK(Put(1, key, value));
333 }
334 ASSERT_OK(Flush(1));
335
336 // push all files to the highest level L2. This
337 // means that all keys should pass at least once
338 // via the compaction filter
339 cfilter_count = 0;
340 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
341 ASSERT_EQ(cfilter_count, 100000);
342 cfilter_count = 0;
343 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
344 ASSERT_EQ(cfilter_count, 100000);
345 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
346 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
347 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
348
349 // create a new database with the compaction
350 // filter in such a way that it deletes all keys
351 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
352 options.create_if_missing = true;
353 DestroyAndReopen(options);
354 CreateAndReopenWithCF({"pikachu"}, options);
355
356 // write all the keys once again.
357 for (int i = 0; i < 100000; i++) {
358 char key[100];
359 snprintf(key, sizeof(key), "B%010d", i);
360 ASSERT_OK(Put(1, key, value));
361 }
362 ASSERT_OK(Flush(1));
363 ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
364 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
365 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
366
367 // Push all files to the highest level L2. This
368 // triggers the compaction filter to delete all keys,
369 // verify that at the end of the compaction process,
370 // nothing is left.
371 cfilter_count = 0;
372 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
373 ASSERT_EQ(cfilter_count, 100000);
374 cfilter_count = 0;
375 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
376 ASSERT_EQ(cfilter_count, 0);
377 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
378 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
379
380 {
381 // Scan the entire database to ensure that nothing is left
382 std::unique_ptr<Iterator> iter(
383 db_->NewIterator(ReadOptions(), handles_[1]));
384 iter->SeekToFirst();
385 count = 0;
386 while (iter->Valid()) {
387 count++;
388 iter->Next();
389 }
390 ASSERT_EQ(count, 0);
391 }
392
393 // The sequence number of the remaining record
394 // is not zeroed out even though it is at the
395 // level Lmax because this record is at the tip
396 count = 0;
397 {
398 InternalKeyComparator icmp(options.comparator);
399 ReadRangeDelAggregator range_del_agg(&icmp,
400 kMaxSequenceNumber /* upper_bound */);
401 ReadOptions read_options;
402 ScopedArenaIterator iter(dbfull()->NewInternalIterator(
403 read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
404 iter->SeekToFirst();
405 ASSERT_OK(iter->status());
406 while (iter->Valid()) {
407 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
408 ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */));
409 ASSERT_NE(ikey.sequence, (unsigned)0);
410 count++;
411 iter->Next();
412 }
413 ASSERT_EQ(count, 0);
414 }
415 }
416
417 // Tests the edge case where compaction does not produce any output -- all
418 // entries are deleted. The compaction should create bunch of 'DeleteFile'
419 // entries in VersionEdit, but none of the 'AddFile's.
420 TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
421 Options options = CurrentOptions();
422 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
423 options.disable_auto_compactions = true;
424 options.create_if_missing = true;
425 DestroyAndReopen(options);
426
427 // put some data
428 for (int table = 0; table < 4; ++table) {
429 for (int i = 0; i < 10 + table; ++i) {
430 Put(ToString(table * 100 + i), "val");
431 }
432 Flush();
433 }
434
435 // this will produce empty file (delete compaction filter)
436 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
437 ASSERT_EQ(0U, CountLiveFiles());
438
439 Reopen(options);
440
441 Iterator* itr = db_->NewIterator(ReadOptions());
442 itr->SeekToFirst();
443 // empty db
444 ASSERT_TRUE(!itr->Valid());
445
446 delete itr;
447 }
448 #endif // ROCKSDB_LITE
449
450 TEST_P(DBTestCompactionFilterWithCompactParam,
451 CompactionFilterWithValueChange) {
452 Options options = CurrentOptions();
453 options.num_levels = 3;
454 options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>();
455 CreateAndReopenWithCF({"pikachu"}, options);
456
457 // Write 100K+1 keys, these are written to a few files
458 // in L0. We do this so that the current snapshot points
459 // to the 100001 key.The compaction filter is not invoked
460 // on keys that are visible via a snapshot because we
461 // anyways cannot delete it.
462 const std::string value(10, 'x');
463 for (int i = 0; i < 100001; i++) {
464 char key[100];
465 snprintf(key, sizeof(key), "B%010d", i);
466 Put(1, key, value);
467 }
468
469 // push all files to lower levels
470 ASSERT_OK(Flush(1));
471 if (option_config_ != kUniversalCompactionMultiLevel &&
472 option_config_ != kUniversalSubcompactions) {
473 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
474 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
475 } else {
476 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
477 nullptr);
478 }
479
480 // re-write all data again
481 for (int i = 0; i < 100001; i++) {
482 char key[100];
483 snprintf(key, sizeof(key), "B%010d", i);
484 Put(1, key, value);
485 }
486
487 // push all files to lower levels. This should
488 // invoke the compaction filter for all 100000 keys.
489 ASSERT_OK(Flush(1));
490 if (option_config_ != kUniversalCompactionMultiLevel &&
491 option_config_ != kUniversalSubcompactions) {
492 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
493 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
494 } else {
495 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
496 nullptr);
497 }
498
499 // verify that all keys now have the new value that
500 // was set by the compaction process.
501 for (int i = 0; i < 100001; i++) {
502 char key[100];
503 snprintf(key, sizeof(key), "B%010d", i);
504 std::string newvalue = Get(1, key);
505 ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
506 }
507 }
508
509 TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
510 std::string one, two, three, four;
511 PutFixed64(&one, 1);
512 PutFixed64(&two, 2);
513 PutFixed64(&three, 3);
514 PutFixed64(&four, 4);
515
516 Options options = CurrentOptions();
517 options.create_if_missing = true;
518 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
519 options.num_levels = 3;
520 // Filter out keys with value is 2.
521 options.compaction_filter_factory =
522 std::make_shared<ConditionalFilterFactory>(two);
523 DestroyAndReopen(options);
524
525 // In the same compaction, a value type needs to be deleted based on
526 // compaction filter, and there is a merge type for the key. compaction
527 // filter result is ignored.
528 ASSERT_OK(db_->Put(WriteOptions(), "foo", two));
529 ASSERT_OK(Flush());
530 ASSERT_OK(db_->Merge(WriteOptions(), "foo", one));
531 ASSERT_OK(Flush());
532 std::string newvalue = Get("foo");
533 ASSERT_EQ(newvalue, three);
534 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
535 newvalue = Get("foo");
536 ASSERT_EQ(newvalue, three);
537
538 // value key can be deleted based on compaction filter, leaving only
539 // merge keys.
540 ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
541 ASSERT_OK(Flush());
542 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
543 newvalue = Get("bar");
544 ASSERT_EQ("NOT_FOUND", newvalue);
545 ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
546 ASSERT_OK(Flush());
547 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
548 newvalue = Get("bar");
549 ASSERT_EQ(two, two);
550
551 // Compaction filter never applies to merge keys.
552 ASSERT_OK(db_->Put(WriteOptions(), "foobar", one));
553 ASSERT_OK(Flush());
554 ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two));
555 ASSERT_OK(Flush());
556 newvalue = Get("foobar");
557 ASSERT_EQ(newvalue, three);
558 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
559 newvalue = Get("foobar");
560 ASSERT_EQ(newvalue, three);
561
562 // In the same compaction, both of value type and merge type keys need to be
563 // deleted based on compaction filter, and there is a merge type for the key.
564 // For both keys, compaction filter results are ignored.
565 ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two));
566 ASSERT_OK(Flush());
567 ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two));
568 ASSERT_OK(Flush());
569 newvalue = Get("barfoo");
570 ASSERT_EQ(newvalue, four);
571 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
572 newvalue = Get("barfoo");
573 ASSERT_EQ(newvalue, four);
574 }
575
576 #ifndef ROCKSDB_LITE
577 TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
578 KeepFilterFactory* filter = new KeepFilterFactory(true, true);
579
580 Options options = CurrentOptions();
581 options.compaction_style = kCompactionStyleUniversal;
582 options.compaction_filter_factory.reset(filter);
583 options.compression = kNoCompression;
584 options.level0_file_num_compaction_trigger = 8;
585 Reopen(options);
586 int num_keys_per_file = 400;
587 for (int j = 0; j < 3; j++) {
588 // Write several keys.
589 const std::string value(10, 'x');
590 for (int i = 0; i < num_keys_per_file; i++) {
591 char key[100];
592 snprintf(key, sizeof(key), "B%08d%02d", i, j);
593 Put(key, value);
594 }
595 dbfull()->TEST_FlushMemTable();
596 // Make sure next file is much smaller so automatic compaction will not
597 // be triggered.
598 num_keys_per_file /= 2;
599 }
600 dbfull()->TEST_WaitForCompact();
601
602 // Force a manual compaction
603 cfilter_count = 0;
604 filter->expect_manual_compaction_.store(true);
605 filter->expect_full_compaction_.store(true);
606 filter->expect_cf_id_.store(0);
607 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
608 ASSERT_EQ(cfilter_count, 700);
609 ASSERT_EQ(NumSortedRuns(0), 1);
610 ASSERT_TRUE(filter->compaction_filter_created());
611
612 // Verify total number of keys is correct after manual compaction.
613 {
614 int count = 0;
615 int total = 0;
616 Arena arena;
617 InternalKeyComparator icmp(options.comparator);
618 ReadRangeDelAggregator range_del_agg(&icmp,
619 kMaxSequenceNumber /* snapshots */);
620 ReadOptions read_options;
621 ScopedArenaIterator iter(dbfull()->NewInternalIterator(
622 read_options, &arena, &range_del_agg, kMaxSequenceNumber));
623 iter->SeekToFirst();
624 ASSERT_OK(iter->status());
625 while (iter->Valid()) {
626 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
627 ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */));
628 total++;
629 if (ikey.sequence != 0) {
630 count++;
631 }
632 iter->Next();
633 }
634 ASSERT_EQ(total, 700);
635 ASSERT_EQ(count, 0);
636 }
637 }
638 #endif // ROCKSDB_LITE
639
640 TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
641 KeepFilterFactory* filter = new KeepFilterFactory(false, true);
642 filter->expect_cf_id_.store(1);
643
644 Options options = CurrentOptions();
645 options.compaction_filter_factory.reset(filter);
646 options.compression = kNoCompression;
647 options.level0_file_num_compaction_trigger = 2;
648 CreateAndReopenWithCF({"pikachu"}, options);
649
650 int num_keys_per_file = 400;
651 for (int j = 0; j < 3; j++) {
652 // Write several keys.
653 const std::string value(10, 'x');
654 for (int i = 0; i < num_keys_per_file; i++) {
655 char key[100];
656 snprintf(key, sizeof(key), "B%08d%02d", i, j);
657 Put(1, key, value);
658 }
659 Flush(1);
660 // Make sure next file is much smaller so automatic compaction will not
661 // be triggered.
662 num_keys_per_file /= 2;
663 }
664 dbfull()->TEST_WaitForCompact();
665
666 ASSERT_TRUE(filter->compaction_filter_created());
667 }
668
669 #ifndef ROCKSDB_LITE
670 // Compaction filters aplies to all records, regardless snapshots.
671 TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
672 std::string five = ToString(5);
673 Options options = CurrentOptions();
674 options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
675 options.disable_auto_compactions = true;
676 options.create_if_missing = true;
677 DestroyAndReopen(options);
678
679 // Put some data.
680 const Snapshot* snapshot = nullptr;
681 for (int table = 0; table < 4; ++table) {
682 for (int i = 0; i < 10; ++i) {
683 Put(ToString(table * 100 + i), "val");
684 }
685 Flush();
686
687 if (table == 0) {
688 snapshot = db_->GetSnapshot();
689 }
690 }
691 assert(snapshot != nullptr);
692
693 cfilter_count = 0;
694 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
695 // The filter should delete 40 records.
696 ASSERT_EQ(40, cfilter_count);
697
698 {
699 // Scan the entire database as of the snapshot to ensure
700 // that nothing is left
701 ReadOptions read_options;
702 read_options.snapshot = snapshot;
703 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
704 iter->SeekToFirst();
705 int count = 0;
706 while (iter->Valid()) {
707 count++;
708 iter->Next();
709 }
710 ASSERT_EQ(count, 6);
711 read_options.snapshot = nullptr;
712 std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
713 iter1->SeekToFirst();
714 count = 0;
715 while (iter1->Valid()) {
716 count++;
717 iter1->Next();
718 }
719 // We have deleted 10 keys from 40 using the compaction filter
720 // Keys 6-9 before the snapshot and 100-105 after the snapshot
721 ASSERT_EQ(count, 30);
722 }
723
724 // Release the snapshot and compact again -> now all records should be
725 // removed.
726 db_->ReleaseSnapshot(snapshot);
727 }
728 #endif // ROCKSDB_LITE
729
730 TEST_F(DBTestCompactionFilter, SkipUntil) {
731 Options options = CurrentOptions();
732 options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
733 options.disable_auto_compactions = true;
734 options.create_if_missing = true;
735 DestroyAndReopen(options);
736
737 // Write 100K keys, these are written to a few files in L0.
738 for (int table = 0; table < 4; ++table) {
739 // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
740 for (int i = table * 6; i < 39 + table * 11; ++i) {
741 char key[100];
742 snprintf(key, sizeof(key), "%010d", table * 100 + i);
743 Put(key, std::to_string(table * 1000 + i));
744 }
745 Flush();
746 }
747
748 cfilter_skips = 0;
749 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
750 // Number of skips in tables: 2, 3, 3, 3.
751 ASSERT_EQ(11, cfilter_skips);
752
753 for (int table = 0; table < 4; ++table) {
754 for (int i = table * 6; i < 39 + table * 11; ++i) {
755 int k = table * 100 + i;
756 char key[100];
757 snprintf(key, sizeof(key), "%010d", table * 100 + i);
758 auto expected = std::to_string(table * 1000 + i);
759 std::string val;
760 Status s = db_->Get(ReadOptions(), key, &val);
761 if (k / 10 % 2 == 0) {
762 ASSERT_TRUE(s.IsNotFound());
763 } else {
764 ASSERT_OK(s);
765 ASSERT_EQ(expected, val);
766 }
767 }
768 }
769 }
770
771 TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
772 BlockBasedTableOptions table_options;
773 table_options.whole_key_filtering = false;
774 table_options.filter_policy.reset(NewBloomFilterPolicy(100, false));
775
776 Options options = CurrentOptions();
777 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
778 options.prefix_extractor.reset(NewCappedPrefixTransform(9));
779 options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
780 options.disable_auto_compactions = true;
781 options.create_if_missing = true;
782 DestroyAndReopen(options);
783
784 Put("0000000010", "v10");
785 Put("0000000020", "v20"); // skipped
786 Put("0000000050", "v50");
787 Flush();
788
789 cfilter_skips = 0;
790 EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
791 EXPECT_EQ(1, cfilter_skips);
792
793 Status s;
794 std::string val;
795
796 s = db_->Get(ReadOptions(), "0000000010", &val);
797 ASSERT_OK(s);
798 EXPECT_EQ("v10", val);
799
800 s = db_->Get(ReadOptions(), "0000000020", &val);
801 EXPECT_TRUE(s.IsNotFound());
802
803 s = db_->Get(ReadOptions(), "0000000050", &val);
804 ASSERT_OK(s);
805 EXPECT_EQ("v50", val);
806 }
807
808 class TestNotSupportedFilter : public CompactionFilter {
809 public:
810 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
811 std::string* /*new_value*/,
812 bool* /*value_changed*/) const override {
813 return true;
814 }
815
816 const char* Name() const override { return "NotSupported"; }
817 bool IgnoreSnapshots() const override { return false; }
818 };
819
820 TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) {
821 Options options = CurrentOptions();
822 options.compaction_filter = new TestNotSupportedFilter();
823 DestroyAndReopen(options);
824
825 Put("a", "v10");
826 Put("z", "v20");
827 Flush();
828
829 Put("a", "v10");
830 Put("z", "v20");
831 Flush();
832
833 // Comapction should fail because IgnoreSnapshots() = false
834 EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
835 .IsNotSupported());
836
837 delete options.compaction_filter;
838 }
839
840 } // namespace ROCKSDB_NAMESPACE
841
842 int main(int argc, char** argv) {
843 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
844 ::testing::InitGoogleTest(&argc, argv);
845 return RUN_ALL_TESTS();
846 }