]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_compaction_filter_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_compaction_filter_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12
13 namespace rocksdb {
14
15 static int cfilter_count = 0;
16 static int cfilter_skips = 0;
17
18 // This is a static filter used for filtering
19 // kvs during the compaction process.
20 static std::string NEW_VALUE = "NewValue";
21
22 class DBTestCompactionFilter : public DBTestBase {
23 public:
24 DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {}
25 };
26
27 // Param variant of DBTestBase::ChangeCompactOptions
28 class DBTestCompactionFilterWithCompactParam
29 : public DBTestCompactionFilter,
30 public ::testing::WithParamInterface<DBTestBase::OptionConfig> {
31 public:
32 DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
33 option_config_ = GetParam();
34 Destroy(last_options_);
35 auto options = CurrentOptions();
36 if (option_config_ == kDefault || option_config_ == kUniversalCompaction ||
37 option_config_ == kUniversalCompactionMultiLevel) {
38 options.create_if_missing = true;
39 }
40 if (option_config_ == kLevelSubcompactions ||
41 option_config_ == kUniversalSubcompactions) {
42 assert(options.max_subcompactions > 1);
43 }
44 TryReopen(options);
45 }
46 };
47
48 #ifndef ROCKSDB_VALGRIND_RUN
49 INSTANTIATE_TEST_CASE_P(
50 DBTestCompactionFilterWithCompactOption,
51 DBTestCompactionFilterWithCompactParam,
52 ::testing::Values(DBTestBase::OptionConfig::kDefault,
53 DBTestBase::OptionConfig::kUniversalCompaction,
54 DBTestBase::OptionConfig::kUniversalCompactionMultiLevel,
55 DBTestBase::OptionConfig::kLevelSubcompactions,
56 DBTestBase::OptionConfig::kUniversalSubcompactions));
57 #else
58 // Run fewer cases in valgrind
59 INSTANTIATE_TEST_CASE_P(DBTestCompactionFilterWithCompactOption,
60 DBTestCompactionFilterWithCompactParam,
61 ::testing::Values(DBTestBase::OptionConfig::kDefault));
62 #endif // ROCKSDB_VALGRIND_RUN
63
64 class KeepFilter : public CompactionFilter {
65 public:
66 virtual bool Filter(int /*level*/, const Slice& /*key*/,
67 const Slice& /*value*/, std::string* /*new_value*/,
68 bool* /*value_changed*/) const override {
69 cfilter_count++;
70 return false;
71 }
72
73 virtual const char* Name() const override { return "KeepFilter"; }
74 };
75
76 class DeleteFilter : public CompactionFilter {
77 public:
78 virtual bool Filter(int /*level*/, const Slice& /*key*/,
79 const Slice& /*value*/, std::string* /*new_value*/,
80 bool* /*value_changed*/) const override {
81 cfilter_count++;
82 return true;
83 }
84
85 virtual const char* Name() const override { return "DeleteFilter"; }
86 };
87
88 class DeleteISFilter : public CompactionFilter {
89 public:
90 virtual bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
91 std::string* /*new_value*/,
92 bool* /*value_changed*/) const override {
93 cfilter_count++;
94 int i = std::stoi(key.ToString());
95 if (i > 5 && i <= 105) {
96 return true;
97 }
98 return false;
99 }
100
101 virtual bool IgnoreSnapshots() const override { return true; }
102
103 virtual const char* Name() const override { return "DeleteFilter"; }
104 };
105
106 // Skip x if floor(x/10) is even, use range skips. Requires that keys are
107 // zero-padded to length 10.
108 class SkipEvenFilter : public CompactionFilter {
109 public:
110 virtual Decision FilterV2(int /*level*/, const Slice& key,
111 ValueType /*value_type*/,
112 const Slice& /*existing_value*/,
113 std::string* /*new_value*/,
114 std::string* skip_until) const override {
115 cfilter_count++;
116 int i = std::stoi(key.ToString());
117 if (i / 10 % 2 == 0) {
118 char key_str[100];
119 snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10);
120 *skip_until = key_str;
121 ++cfilter_skips;
122 return Decision::kRemoveAndSkipUntil;
123 }
124 return Decision::kKeep;
125 }
126
127 virtual bool IgnoreSnapshots() const override { return true; }
128
129 virtual const char* Name() const override { return "DeleteFilter"; }
130 };
131
132 class DelayFilter : public CompactionFilter {
133 public:
134 explicit DelayFilter(DBTestBase* d) : db_test(d) {}
135 virtual bool Filter(int /*level*/, const Slice& /*key*/,
136 const Slice& /*value*/, std::string* /*new_value*/,
137 bool* /*value_changed*/) const override {
138 db_test->env_->addon_time_.fetch_add(1000);
139 return true;
140 }
141
142 virtual const char* Name() const override { return "DelayFilter"; }
143
144 private:
145 DBTestBase* db_test;
146 };
147
148 class ConditionalFilter : public CompactionFilter {
149 public:
150 explicit ConditionalFilter(const std::string* filtered_value)
151 : filtered_value_(filtered_value) {}
152 virtual bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value,
153 std::string* /*new_value*/,
154 bool* /*value_changed*/) const override {
155 return value.ToString() == *filtered_value_;
156 }
157
158 virtual const char* Name() const override { return "ConditionalFilter"; }
159
160 private:
161 const std::string* filtered_value_;
162 };
163
164 class ChangeFilter : public CompactionFilter {
165 public:
166 explicit ChangeFilter() {}
167
168 virtual bool Filter(int /*level*/, const Slice& /*key*/,
169 const Slice& /*value*/, std::string* new_value,
170 bool* value_changed) const override {
171 assert(new_value != nullptr);
172 *new_value = NEW_VALUE;
173 *value_changed = true;
174 return false;
175 }
176
177 virtual const char* Name() const override { return "ChangeFilter"; }
178 };
179
180 class KeepFilterFactory : public CompactionFilterFactory {
181 public:
182 explicit KeepFilterFactory(bool check_context = false,
183 bool check_context_cf_id = false)
184 : check_context_(check_context),
185 check_context_cf_id_(check_context_cf_id),
186 compaction_filter_created_(false) {}
187
188 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
189 const CompactionFilter::Context& context) override {
190 if (check_context_) {
191 EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
192 EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
193 }
194 if (check_context_cf_id_) {
195 EXPECT_EQ(expect_cf_id_.load(), context.column_family_id);
196 }
197 compaction_filter_created_ = true;
198 return std::unique_ptr<CompactionFilter>(new KeepFilter());
199 }
200
201 bool compaction_filter_created() const { return compaction_filter_created_; }
202
203 virtual const char* Name() const override { return "KeepFilterFactory"; }
204 bool check_context_;
205 bool check_context_cf_id_;
206 std::atomic_bool expect_full_compaction_;
207 std::atomic_bool expect_manual_compaction_;
208 std::atomic<uint32_t> expect_cf_id_;
209 bool compaction_filter_created_;
210 };
211
212 class DeleteFilterFactory : public CompactionFilterFactory {
213 public:
214 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
215 const CompactionFilter::Context& context) override {
216 if (context.is_manual_compaction) {
217 return std::unique_ptr<CompactionFilter>(new DeleteFilter());
218 } else {
219 return std::unique_ptr<CompactionFilter>(nullptr);
220 }
221 }
222
223 virtual const char* Name() const override { return "DeleteFilterFactory"; }
224 };
225
226 // Delete Filter Factory which ignores snapshots
227 class DeleteISFilterFactory : public CompactionFilterFactory {
228 public:
229 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
230 const CompactionFilter::Context& context) override {
231 if (context.is_manual_compaction) {
232 return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
233 } else {
234 return std::unique_ptr<CompactionFilter>(nullptr);
235 }
236 }
237
238 virtual const char* Name() const override { return "DeleteFilterFactory"; }
239 };
240
241 class SkipEvenFilterFactory : public CompactionFilterFactory {
242 public:
243 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
244 const CompactionFilter::Context& context) override {
245 if (context.is_manual_compaction) {
246 return std::unique_ptr<CompactionFilter>(new SkipEvenFilter());
247 } else {
248 return std::unique_ptr<CompactionFilter>(nullptr);
249 }
250 }
251
252 virtual const char* Name() const override { return "SkipEvenFilterFactory"; }
253 };
254
255 class DelayFilterFactory : public CompactionFilterFactory {
256 public:
257 explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
258 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
259 const CompactionFilter::Context& /*context*/) override {
260 return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
261 }
262
263 virtual const char* Name() const override { return "DelayFilterFactory"; }
264
265 private:
266 DBTestBase* db_test;
267 };
268
269 class ConditionalFilterFactory : public CompactionFilterFactory {
270 public:
271 explicit ConditionalFilterFactory(const Slice& filtered_value)
272 : filtered_value_(filtered_value.ToString()) {}
273
274 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
275 const CompactionFilter::Context& /*context*/) override {
276 return std::unique_ptr<CompactionFilter>(
277 new ConditionalFilter(&filtered_value_));
278 }
279
280 virtual const char* Name() const override {
281 return "ConditionalFilterFactory";
282 }
283
284 private:
285 std::string filtered_value_;
286 };
287
288 class ChangeFilterFactory : public CompactionFilterFactory {
289 public:
290 explicit ChangeFilterFactory() {}
291
292 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
293 const CompactionFilter::Context& /*context*/) override {
294 return std::unique_ptr<CompactionFilter>(new ChangeFilter());
295 }
296
297 virtual const char* Name() const override { return "ChangeFilterFactory"; }
298 };
299
300 #ifndef ROCKSDB_LITE
301 TEST_F(DBTestCompactionFilter, CompactionFilter) {
302 Options options = CurrentOptions();
303 options.max_open_files = -1;
304 options.num_levels = 3;
305 options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
306 options = CurrentOptions(options);
307 CreateAndReopenWithCF({"pikachu"}, options);
308
309 // Write 100K keys, these are written to a few files in L0.
310 const std::string value(10, 'x');
311 for (int i = 0; i < 100000; i++) {
312 char key[100];
313 snprintf(key, sizeof(key), "B%010d", i);
314 Put(1, key, value);
315 }
316 ASSERT_OK(Flush(1));
317
318 // Push all files to the highest level L2. Verify that
319 // the compaction is each level invokes the filter for
320 // all the keys in that level.
321 cfilter_count = 0;
322 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
323 ASSERT_EQ(cfilter_count, 100000);
324 cfilter_count = 0;
325 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
326 ASSERT_EQ(cfilter_count, 100000);
327
328 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
329 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
330 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
331 cfilter_count = 0;
332
333 // All the files are in the lowest level.
334 // Verify that all but the 100001st record
335 // has sequence number zero. The 100001st record
336 // is at the tip of this snapshot and cannot
337 // be zeroed out.
338 int count = 0;
339 int total = 0;
340 Arena arena;
341 {
342 InternalKeyComparator icmp(options.comparator);
343 RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
344 ScopedArenaIterator iter(
345 dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
346 iter->SeekToFirst();
347 ASSERT_OK(iter->status());
348 while (iter->Valid()) {
349 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
350 ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
351 total++;
352 if (ikey.sequence != 0) {
353 count++;
354 }
355 iter->Next();
356 }
357 }
358 ASSERT_EQ(total, 100000);
359 ASSERT_EQ(count, 1);
360
361 // overwrite all the 100K keys once again.
362 for (int i = 0; i < 100000; i++) {
363 char key[100];
364 snprintf(key, sizeof(key), "B%010d", i);
365 ASSERT_OK(Put(1, key, value));
366 }
367 ASSERT_OK(Flush(1));
368
369 // push all files to the highest level L2. This
370 // means that all keys should pass at least once
371 // via the compaction filter
372 cfilter_count = 0;
373 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
374 ASSERT_EQ(cfilter_count, 100000);
375 cfilter_count = 0;
376 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
377 ASSERT_EQ(cfilter_count, 100000);
378 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
379 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
380 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
381
382 // create a new database with the compaction
383 // filter in such a way that it deletes all keys
384 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
385 options.create_if_missing = true;
386 DestroyAndReopen(options);
387 CreateAndReopenWithCF({"pikachu"}, options);
388
389 // write all the keys once again.
390 for (int i = 0; i < 100000; i++) {
391 char key[100];
392 snprintf(key, sizeof(key), "B%010d", i);
393 ASSERT_OK(Put(1, key, value));
394 }
395 ASSERT_OK(Flush(1));
396 ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
397 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
398 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
399
400 // Push all files to the highest level L2. This
401 // triggers the compaction filter to delete all keys,
402 // verify that at the end of the compaction process,
403 // nothing is left.
404 cfilter_count = 0;
405 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
406 ASSERT_EQ(cfilter_count, 100000);
407 cfilter_count = 0;
408 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
409 ASSERT_EQ(cfilter_count, 0);
410 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
411 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
412
413 {
414 // Scan the entire database to ensure that nothing is left
415 std::unique_ptr<Iterator> iter(
416 db_->NewIterator(ReadOptions(), handles_[1]));
417 iter->SeekToFirst();
418 count = 0;
419 while (iter->Valid()) {
420 count++;
421 iter->Next();
422 }
423 ASSERT_EQ(count, 0);
424 }
425
426 // The sequence number of the remaining record
427 // is not zeroed out even though it is at the
428 // level Lmax because this record is at the tip
429 count = 0;
430 {
431 InternalKeyComparator icmp(options.comparator);
432 RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
433 ScopedArenaIterator iter(
434 dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1]));
435 iter->SeekToFirst();
436 ASSERT_OK(iter->status());
437 while (iter->Valid()) {
438 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
439 ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
440 ASSERT_NE(ikey.sequence, (unsigned)0);
441 count++;
442 iter->Next();
443 }
444 ASSERT_EQ(count, 0);
445 }
446 }
447
448 // Tests the edge case where compaction does not produce any output -- all
449 // entries are deleted. The compaction should create bunch of 'DeleteFile'
450 // entries in VersionEdit, but none of the 'AddFile's.
451 TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
452 Options options = CurrentOptions();
453 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
454 options.disable_auto_compactions = true;
455 options.create_if_missing = true;
456 DestroyAndReopen(options);
457
458 // put some data
459 for (int table = 0; table < 4; ++table) {
460 for (int i = 0; i < 10 + table; ++i) {
461 Put(ToString(table * 100 + i), "val");
462 }
463 Flush();
464 }
465
466 // this will produce empty file (delete compaction filter)
467 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
468 ASSERT_EQ(0U, CountLiveFiles());
469
470 Reopen(options);
471
472 Iterator* itr = db_->NewIterator(ReadOptions());
473 itr->SeekToFirst();
474 // empty db
475 ASSERT_TRUE(!itr->Valid());
476
477 delete itr;
478 }
479 #endif // ROCKSDB_LITE
480
481 TEST_P(DBTestCompactionFilterWithCompactParam,
482 CompactionFilterWithValueChange) {
483 Options options = CurrentOptions();
484 options.num_levels = 3;
485 options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>();
486 CreateAndReopenWithCF({"pikachu"}, options);
487
488 // Write 100K+1 keys, these are written to a few files
489 // in L0. We do this so that the current snapshot points
490 // to the 100001 key.The compaction filter is not invoked
491 // on keys that are visible via a snapshot because we
492 // anyways cannot delete it.
493 const std::string value(10, 'x');
494 for (int i = 0; i < 100001; i++) {
495 char key[100];
496 snprintf(key, sizeof(key), "B%010d", i);
497 Put(1, key, value);
498 }
499
500 // push all files to lower levels
501 ASSERT_OK(Flush(1));
502 if (option_config_ != kUniversalCompactionMultiLevel &&
503 option_config_ != kUniversalSubcompactions) {
504 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
505 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
506 } else {
507 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
508 nullptr);
509 }
510
511 // re-write all data again
512 for (int i = 0; i < 100001; i++) {
513 char key[100];
514 snprintf(key, sizeof(key), "B%010d", i);
515 Put(1, key, value);
516 }
517
518 // push all files to lower levels. This should
519 // invoke the compaction filter for all 100000 keys.
520 ASSERT_OK(Flush(1));
521 if (option_config_ != kUniversalCompactionMultiLevel &&
522 option_config_ != kUniversalSubcompactions) {
523 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
524 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
525 } else {
526 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
527 nullptr);
528 }
529
530 // verify that all keys now have the new value that
531 // was set by the compaction process.
532 for (int i = 0; i < 100001; i++) {
533 char key[100];
534 snprintf(key, sizeof(key), "B%010d", i);
535 std::string newvalue = Get(1, key);
536 ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
537 }
538 }
539
540 TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
541 std::string one, two, three, four;
542 PutFixed64(&one, 1);
543 PutFixed64(&two, 2);
544 PutFixed64(&three, 3);
545 PutFixed64(&four, 4);
546
547 Options options = CurrentOptions();
548 options.create_if_missing = true;
549 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
550 options.num_levels = 3;
551 // Filter out keys with value is 2.
552 options.compaction_filter_factory =
553 std::make_shared<ConditionalFilterFactory>(two);
554 DestroyAndReopen(options);
555
556 // In the same compaction, a value type needs to be deleted based on
557 // compaction filter, and there is a merge type for the key. compaction
558 // filter result is ignored.
559 ASSERT_OK(db_->Put(WriteOptions(), "foo", two));
560 ASSERT_OK(Flush());
561 ASSERT_OK(db_->Merge(WriteOptions(), "foo", one));
562 ASSERT_OK(Flush());
563 std::string newvalue = Get("foo");
564 ASSERT_EQ(newvalue, three);
565 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
566 newvalue = Get("foo");
567 ASSERT_EQ(newvalue, three);
568
569 // value key can be deleted based on compaction filter, leaving only
570 // merge keys.
571 ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
572 ASSERT_OK(Flush());
573 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
574 newvalue = Get("bar");
575 ASSERT_EQ("NOT_FOUND", newvalue);
576 ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
577 ASSERT_OK(Flush());
578 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
579 newvalue = Get("bar");
580 ASSERT_EQ(two, two);
581
582 // Compaction filter never applies to merge keys.
583 ASSERT_OK(db_->Put(WriteOptions(), "foobar", one));
584 ASSERT_OK(Flush());
585 ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two));
586 ASSERT_OK(Flush());
587 newvalue = Get("foobar");
588 ASSERT_EQ(newvalue, three);
589 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
590 newvalue = Get("foobar");
591 ASSERT_EQ(newvalue, three);
592
593 // In the same compaction, both of value type and merge type keys need to be
594 // deleted based on compaction filter, and there is a merge type for the key.
595 // For both keys, compaction filter results are ignored.
596 ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two));
597 ASSERT_OK(Flush());
598 ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two));
599 ASSERT_OK(Flush());
600 newvalue = Get("barfoo");
601 ASSERT_EQ(newvalue, four);
602 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
603 newvalue = Get("barfoo");
604 ASSERT_EQ(newvalue, four);
605 }
606
607 #ifndef ROCKSDB_LITE
608 TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
609 KeepFilterFactory* filter = new KeepFilterFactory(true, true);
610
611 Options options = CurrentOptions();
612 options.compaction_style = kCompactionStyleUniversal;
613 options.compaction_filter_factory.reset(filter);
614 options.compression = kNoCompression;
615 options.level0_file_num_compaction_trigger = 8;
616 Reopen(options);
617 int num_keys_per_file = 400;
618 for (int j = 0; j < 3; j++) {
619 // Write several keys.
620 const std::string value(10, 'x');
621 for (int i = 0; i < num_keys_per_file; i++) {
622 char key[100];
623 snprintf(key, sizeof(key), "B%08d%02d", i, j);
624 Put(key, value);
625 }
626 dbfull()->TEST_FlushMemTable();
627 // Make sure next file is much smaller so automatic compaction will not
628 // be triggered.
629 num_keys_per_file /= 2;
630 }
631 dbfull()->TEST_WaitForCompact();
632
633 // Force a manual compaction
634 cfilter_count = 0;
635 filter->expect_manual_compaction_.store(true);
636 filter->expect_full_compaction_.store(true);
637 filter->expect_cf_id_.store(0);
638 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
639 ASSERT_EQ(cfilter_count, 700);
640 ASSERT_EQ(NumSortedRuns(0), 1);
641 ASSERT_TRUE(filter->compaction_filter_created());
642
643 // Verify total number of keys is correct after manual compaction.
644 {
645 int count = 0;
646 int total = 0;
647 Arena arena;
648 InternalKeyComparator icmp(options.comparator);
649 RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
650 ScopedArenaIterator iter(
651 dbfull()->NewInternalIterator(&arena, &range_del_agg));
652 iter->SeekToFirst();
653 ASSERT_OK(iter->status());
654 while (iter->Valid()) {
655 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
656 ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
657 total++;
658 if (ikey.sequence != 0) {
659 count++;
660 }
661 iter->Next();
662 }
663 ASSERT_EQ(total, 700);
664 ASSERT_EQ(count, 1);
665 }
666 }
667 #endif // ROCKSDB_LITE
668
669 TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
670 KeepFilterFactory* filter = new KeepFilterFactory(false, true);
671 filter->expect_cf_id_.store(1);
672
673 Options options = CurrentOptions();
674 options.compaction_filter_factory.reset(filter);
675 options.compression = kNoCompression;
676 options.level0_file_num_compaction_trigger = 2;
677 CreateAndReopenWithCF({"pikachu"}, options);
678
679 int num_keys_per_file = 400;
680 for (int j = 0; j < 3; j++) {
681 // Write several keys.
682 const std::string value(10, 'x');
683 for (int i = 0; i < num_keys_per_file; i++) {
684 char key[100];
685 snprintf(key, sizeof(key), "B%08d%02d", i, j);
686 Put(1, key, value);
687 }
688 Flush(1);
689 // Make sure next file is much smaller so automatic compaction will not
690 // be triggered.
691 num_keys_per_file /= 2;
692 }
693 dbfull()->TEST_WaitForCompact();
694
695 ASSERT_TRUE(filter->compaction_filter_created());
696 }
697
698 #ifndef ROCKSDB_LITE
699 // Compaction filters should only be applied to records that are newer than the
700 // latest snapshot. This test inserts records and applies a delete filter.
701 TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) {
702 Options options = CurrentOptions();
703 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
704 options.disable_auto_compactions = true;
705 options.create_if_missing = true;
706 DestroyAndReopen(options);
707
708 // Put some data.
709 const Snapshot* snapshot = nullptr;
710 for (int table = 0; table < 4; ++table) {
711 for (int i = 0; i < 10; ++i) {
712 Put(ToString(table * 100 + i), "val");
713 }
714 Flush();
715
716 if (table == 0) {
717 snapshot = db_->GetSnapshot();
718 }
719 }
720 assert(snapshot != nullptr);
721
722 cfilter_count = 0;
723 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
724 // The filter should delete 10 records.
725 ASSERT_EQ(30U, cfilter_count);
726
727 // Release the snapshot and compact again -> now all records should be
728 // removed.
729 db_->ReleaseSnapshot(snapshot);
730 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
731 ASSERT_EQ(0U, CountLiveFiles());
732 }
733
734 // Compaction filters should only be applied to records that are newer than the
735 // latest snapshot. However, if the compaction filter asks to ignore snapshots
736 // records newer than the snapshot will also be processed
737 TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
738 std::string five = ToString(5);
739 Options options = CurrentOptions();
740 options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
741 options.disable_auto_compactions = true;
742 options.create_if_missing = true;
743 DestroyAndReopen(options);
744
745 // Put some data.
746 const Snapshot* snapshot = nullptr;
747 for (int table = 0; table < 4; ++table) {
748 for (int i = 0; i < 10; ++i) {
749 Put(ToString(table * 100 + i), "val");
750 }
751 Flush();
752
753 if (table == 0) {
754 snapshot = db_->GetSnapshot();
755 }
756 }
757 assert(snapshot != nullptr);
758
759 cfilter_count = 0;
760 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
761 // The filter should delete 40 records.
762 ASSERT_EQ(40U, cfilter_count);
763
764 {
765 // Scan the entire database as of the snapshot to ensure
766 // that nothing is left
767 ReadOptions read_options;
768 read_options.snapshot = snapshot;
769 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
770 iter->SeekToFirst();
771 int count = 0;
772 while (iter->Valid()) {
773 count++;
774 iter->Next();
775 }
776 ASSERT_EQ(count, 6);
777 read_options.snapshot = nullptr;
778 std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
779 iter1->SeekToFirst();
780 count = 0;
781 while (iter1->Valid()) {
782 count++;
783 iter1->Next();
784 }
785 // We have deleted 10 keys from 40 using the compaction filter
786 // Keys 6-9 before the snapshot and 100-105 after the snapshot
787 ASSERT_EQ(count, 30);
788 }
789
790 // Release the snapshot and compact again -> now all records should be
791 // removed.
792 db_->ReleaseSnapshot(snapshot);
793 }
794 #endif // ROCKSDB_LITE
795
796 TEST_F(DBTestCompactionFilter, SkipUntil) {
797 Options options = CurrentOptions();
798 options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
799 options.disable_auto_compactions = true;
800 options.create_if_missing = true;
801 DestroyAndReopen(options);
802
803 // Write 100K keys, these are written to a few files in L0.
804 for (int table = 0; table < 4; ++table) {
805 // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
806 for (int i = table * 6; i < 39 + table * 11; ++i) {
807 char key[100];
808 snprintf(key, sizeof(key), "%010d", table * 100 + i);
809 Put(key, std::to_string(table * 1000 + i));
810 }
811 Flush();
812 }
813
814 cfilter_skips = 0;
815 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
816 // Number of skips in tables: 2, 3, 3, 3.
817 ASSERT_EQ(11, cfilter_skips);
818
819 for (int table = 0; table < 4; ++table) {
820 for (int i = table * 6; i < 39 + table * 11; ++i) {
821 int k = table * 100 + i;
822 char key[100];
823 snprintf(key, sizeof(key), "%010d", table * 100 + i);
824 auto expected = std::to_string(table * 1000 + i);
825 std::string val;
826 Status s = db_->Get(ReadOptions(), key, &val);
827 if (k / 10 % 2 == 0) {
828 ASSERT_TRUE(s.IsNotFound());
829 } else {
830 ASSERT_OK(s);
831 ASSERT_EQ(expected, val);
832 }
833 }
834 }
835 }
836
837 TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
838 BlockBasedTableOptions table_options;
839 table_options.whole_key_filtering = false;
840 table_options.filter_policy.reset(NewBloomFilterPolicy(100, false));
841
842 Options options = CurrentOptions();
843 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
844 options.prefix_extractor.reset(NewCappedPrefixTransform(9));
845 options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
846 options.disable_auto_compactions = true;
847 options.create_if_missing = true;
848 DestroyAndReopen(options);
849
850 Put("0000000010", "v10");
851 Put("0000000020", "v20"); // skipped
852 Put("0000000050", "v50");
853 Flush();
854
855 cfilter_skips = 0;
856 EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
857 EXPECT_EQ(1, cfilter_skips);
858
859 Status s;
860 std::string val;
861
862 s = db_->Get(ReadOptions(), "0000000010", &val);
863 ASSERT_OK(s);
864 EXPECT_EQ("v10", val);
865
866 s = db_->Get(ReadOptions(), "0000000020", &val);
867 EXPECT_TRUE(s.IsNotFound());
868
869 s = db_->Get(ReadOptions(), "0000000050", &val);
870 ASSERT_OK(s);
871 EXPECT_EQ("v50", val);
872 }
873
874 } // namespace rocksdb
875
876 int main(int argc, char** argv) {
877 rocksdb::port::InstallStackTraceHandler();
878 ::testing::InitGoogleTest(&argc, argv);
879 return RUN_ALL_TESTS();
880 }