]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_compaction_filter_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_compaction_filter_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/db_test_util.h"
11#include "port/stack_trace.h"
12
f67539c2 13namespace ROCKSDB_NAMESPACE {
7c673cae
FG
14
15static int cfilter_count = 0;
16static int cfilter_skips = 0;
17
18// This is a static filter used for filtering
19// kvs during the compaction process.
20static std::string NEW_VALUE = "NewValue";
21
22class DBTestCompactionFilter : public DBTestBase {
23 public:
24 DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {}
25};
26
11fdf7f2
TL
27// Param variant of DBTestBase::ChangeCompactOptions
28class DBTestCompactionFilterWithCompactParam
29 : public DBTestCompactionFilter,
30 public ::testing::WithParamInterface<DBTestBase::OptionConfig> {
31 public:
32 DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
33 option_config_ = GetParam();
34 Destroy(last_options_);
35 auto options = CurrentOptions();
36 if (option_config_ == kDefault || option_config_ == kUniversalCompaction ||
37 option_config_ == kUniversalCompactionMultiLevel) {
38 options.create_if_missing = true;
39 }
40 if (option_config_ == kLevelSubcompactions ||
41 option_config_ == kUniversalSubcompactions) {
42 assert(options.max_subcompactions > 1);
43 }
44 TryReopen(options);
45 }
46};
47
48#ifndef ROCKSDB_VALGRIND_RUN
49INSTANTIATE_TEST_CASE_P(
f67539c2 50 CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam,
11fdf7f2
TL
51 ::testing::Values(DBTestBase::OptionConfig::kDefault,
52 DBTestBase::OptionConfig::kUniversalCompaction,
53 DBTestBase::OptionConfig::kUniversalCompactionMultiLevel,
54 DBTestBase::OptionConfig::kLevelSubcompactions,
55 DBTestBase::OptionConfig::kUniversalSubcompactions));
56#else
57// Run fewer cases in valgrind
f67539c2 58INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption,
11fdf7f2
TL
59 DBTestCompactionFilterWithCompactParam,
60 ::testing::Values(DBTestBase::OptionConfig::kDefault));
61#endif // ROCKSDB_VALGRIND_RUN
62
7c673cae
FG
63class KeepFilter : public CompactionFilter {
64 public:
494da23a
TL
65 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
66 std::string* /*new_value*/,
67 bool* /*value_changed*/) const override {
7c673cae
FG
68 cfilter_count++;
69 return false;
70 }
71
494da23a 72 const char* Name() const override { return "KeepFilter"; }
7c673cae
FG
73};
74
75class DeleteFilter : public CompactionFilter {
76 public:
494da23a
TL
77 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
78 std::string* /*new_value*/,
79 bool* /*value_changed*/) const override {
7c673cae
FG
80 cfilter_count++;
81 return true;
82 }
83
494da23a 84 const char* Name() const override { return "DeleteFilter"; }
7c673cae
FG
85};
86
87class DeleteISFilter : public CompactionFilter {
88 public:
494da23a
TL
89 bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
90 std::string* /*new_value*/,
91 bool* /*value_changed*/) const override {
7c673cae
FG
92 cfilter_count++;
93 int i = std::stoi(key.ToString());
94 if (i > 5 && i <= 105) {
95 return true;
96 }
97 return false;
98 }
99
494da23a 100 bool IgnoreSnapshots() const override { return true; }
7c673cae 101
494da23a 102 const char* Name() const override { return "DeleteFilter"; }
7c673cae
FG
103};
104
105// Skip x if floor(x/10) is even, use range skips. Requires that keys are
106// zero-padded to length 10.
107class SkipEvenFilter : public CompactionFilter {
108 public:
494da23a
TL
109 Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
110 const Slice& /*existing_value*/, std::string* /*new_value*/,
111 std::string* skip_until) const override {
7c673cae
FG
112 cfilter_count++;
113 int i = std::stoi(key.ToString());
114 if (i / 10 % 2 == 0) {
115 char key_str[100];
11fdf7f2 116 snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10);
7c673cae
FG
117 *skip_until = key_str;
118 ++cfilter_skips;
119 return Decision::kRemoveAndSkipUntil;
120 }
121 return Decision::kKeep;
122 }
123
494da23a 124 bool IgnoreSnapshots() const override { return true; }
7c673cae 125
494da23a 126 const char* Name() const override { return "DeleteFilter"; }
7c673cae
FG
127};
128
129class DelayFilter : public CompactionFilter {
130 public:
131 explicit DelayFilter(DBTestBase* d) : db_test(d) {}
494da23a
TL
132 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
133 std::string* /*new_value*/,
134 bool* /*value_changed*/) const override {
7c673cae
FG
135 db_test->env_->addon_time_.fetch_add(1000);
136 return true;
137 }
138
494da23a 139 const char* Name() const override { return "DelayFilter"; }
7c673cae
FG
140
141 private:
142 DBTestBase* db_test;
143};
144
145class ConditionalFilter : public CompactionFilter {
146 public:
147 explicit ConditionalFilter(const std::string* filtered_value)
148 : filtered_value_(filtered_value) {}
494da23a
TL
149 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value,
150 std::string* /*new_value*/,
151 bool* /*value_changed*/) const override {
7c673cae
FG
152 return value.ToString() == *filtered_value_;
153 }
154
494da23a 155 const char* Name() const override { return "ConditionalFilter"; }
7c673cae
FG
156
157 private:
158 const std::string* filtered_value_;
159};
160
161class ChangeFilter : public CompactionFilter {
162 public:
163 explicit ChangeFilter() {}
164
494da23a
TL
165 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
166 std::string* new_value, bool* value_changed) const override {
7c673cae
FG
167 assert(new_value != nullptr);
168 *new_value = NEW_VALUE;
169 *value_changed = true;
170 return false;
171 }
172
494da23a 173 const char* Name() const override { return "ChangeFilter"; }
7c673cae
FG
174};
175
176class KeepFilterFactory : public CompactionFilterFactory {
177 public:
178 explicit KeepFilterFactory(bool check_context = false,
179 bool check_context_cf_id = false)
180 : check_context_(check_context),
181 check_context_cf_id_(check_context_cf_id),
182 compaction_filter_created_(false) {}
183
494da23a 184 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
7c673cae
FG
185 const CompactionFilter::Context& context) override {
186 if (check_context_) {
187 EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
188 EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
189 }
190 if (check_context_cf_id_) {
191 EXPECT_EQ(expect_cf_id_.load(), context.column_family_id);
192 }
193 compaction_filter_created_ = true;
194 return std::unique_ptr<CompactionFilter>(new KeepFilter());
195 }
196
197 bool compaction_filter_created() const { return compaction_filter_created_; }
198
494da23a 199 const char* Name() const override { return "KeepFilterFactory"; }
7c673cae
FG
200 bool check_context_;
201 bool check_context_cf_id_;
202 std::atomic_bool expect_full_compaction_;
203 std::atomic_bool expect_manual_compaction_;
204 std::atomic<uint32_t> expect_cf_id_;
205 bool compaction_filter_created_;
206};
207
208class DeleteFilterFactory : public CompactionFilterFactory {
209 public:
494da23a 210 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
7c673cae
FG
211 const CompactionFilter::Context& context) override {
212 if (context.is_manual_compaction) {
213 return std::unique_ptr<CompactionFilter>(new DeleteFilter());
214 } else {
215 return std::unique_ptr<CompactionFilter>(nullptr);
216 }
217 }
218
494da23a 219 const char* Name() const override { return "DeleteFilterFactory"; }
7c673cae
FG
220};
221
222// Delete Filter Factory which ignores snapshots
223class DeleteISFilterFactory : public CompactionFilterFactory {
224 public:
494da23a 225 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
7c673cae
FG
226 const CompactionFilter::Context& context) override {
227 if (context.is_manual_compaction) {
228 return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
229 } else {
230 return std::unique_ptr<CompactionFilter>(nullptr);
231 }
232 }
233
494da23a 234 const char* Name() const override { return "DeleteFilterFactory"; }
7c673cae
FG
235};
236
237class SkipEvenFilterFactory : public CompactionFilterFactory {
238 public:
494da23a 239 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
7c673cae
FG
240 const CompactionFilter::Context& context) override {
241 if (context.is_manual_compaction) {
242 return std::unique_ptr<CompactionFilter>(new SkipEvenFilter());
243 } else {
244 return std::unique_ptr<CompactionFilter>(nullptr);
245 }
246 }
247
494da23a 248 const char* Name() const override { return "SkipEvenFilterFactory"; }
7c673cae
FG
249};
250
251class DelayFilterFactory : public CompactionFilterFactory {
252 public:
253 explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
494da23a 254 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
11fdf7f2 255 const CompactionFilter::Context& /*context*/) override {
7c673cae
FG
256 return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
257 }
258
494da23a 259 const char* Name() const override { return "DelayFilterFactory"; }
7c673cae
FG
260
261 private:
262 DBTestBase* db_test;
263};
264
265class ConditionalFilterFactory : public CompactionFilterFactory {
266 public:
267 explicit ConditionalFilterFactory(const Slice& filtered_value)
268 : filtered_value_(filtered_value.ToString()) {}
269
494da23a 270 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
11fdf7f2 271 const CompactionFilter::Context& /*context*/) override {
7c673cae
FG
272 return std::unique_ptr<CompactionFilter>(
273 new ConditionalFilter(&filtered_value_));
274 }
275
494da23a 276 const char* Name() const override { return "ConditionalFilterFactory"; }
7c673cae
FG
277
278 private:
279 std::string filtered_value_;
280};
281
282class ChangeFilterFactory : public CompactionFilterFactory {
283 public:
284 explicit ChangeFilterFactory() {}
285
494da23a 286 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
11fdf7f2 287 const CompactionFilter::Context& /*context*/) override {
7c673cae
FG
288 return std::unique_ptr<CompactionFilter>(new ChangeFilter());
289 }
290
494da23a 291 const char* Name() const override { return "ChangeFilterFactory"; }
7c673cae
FG
292};
293
294#ifndef ROCKSDB_LITE
295TEST_F(DBTestCompactionFilter, CompactionFilter) {
296 Options options = CurrentOptions();
297 options.max_open_files = -1;
298 options.num_levels = 3;
299 options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
300 options = CurrentOptions(options);
301 CreateAndReopenWithCF({"pikachu"}, options);
302
303 // Write 100K keys, these are written to a few files in L0.
304 const std::string value(10, 'x');
305 for (int i = 0; i < 100000; i++) {
306 char key[100];
307 snprintf(key, sizeof(key), "B%010d", i);
308 Put(1, key, value);
309 }
310 ASSERT_OK(Flush(1));
311
312 // Push all files to the highest level L2. Verify that
313 // the compaction is each level invokes the filter for
314 // all the keys in that level.
315 cfilter_count = 0;
316 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
317 ASSERT_EQ(cfilter_count, 100000);
318 cfilter_count = 0;
319 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
320 ASSERT_EQ(cfilter_count, 100000);
321
322 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
323 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
324 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
325 cfilter_count = 0;
326
327 // All the files are in the lowest level.
328 // Verify that all but the 100001st record
329 // has sequence number zero. The 100001st record
330 // is at the tip of this snapshot and cannot
331 // be zeroed out.
332 int count = 0;
333 int total = 0;
334 Arena arena;
335 {
336 InternalKeyComparator icmp(options.comparator);
494da23a
TL
337 ReadRangeDelAggregator range_del_agg(&icmp,
338 kMaxSequenceNumber /* upper_bound */);
339 ScopedArenaIterator iter(dbfull()->NewInternalIterator(
340 &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
7c673cae
FG
341 iter->SeekToFirst();
342 ASSERT_OK(iter->status());
343 while (iter->Valid()) {
344 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
7c673cae
FG
345 ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
346 total++;
347 if (ikey.sequence != 0) {
348 count++;
349 }
350 iter->Next();
351 }
352 }
353 ASSERT_EQ(total, 100000);
494da23a 354 ASSERT_EQ(count, 0);
7c673cae
FG
355
356 // overwrite all the 100K keys once again.
357 for (int i = 0; i < 100000; i++) {
358 char key[100];
359 snprintf(key, sizeof(key), "B%010d", i);
360 ASSERT_OK(Put(1, key, value));
361 }
362 ASSERT_OK(Flush(1));
363
364 // push all files to the highest level L2. This
365 // means that all keys should pass at least once
366 // via the compaction filter
367 cfilter_count = 0;
368 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
369 ASSERT_EQ(cfilter_count, 100000);
370 cfilter_count = 0;
371 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
372 ASSERT_EQ(cfilter_count, 100000);
373 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
374 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
375 ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
376
377 // create a new database with the compaction
378 // filter in such a way that it deletes all keys
379 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
380 options.create_if_missing = true;
381 DestroyAndReopen(options);
382 CreateAndReopenWithCF({"pikachu"}, options);
383
384 // write all the keys once again.
385 for (int i = 0; i < 100000; i++) {
386 char key[100];
387 snprintf(key, sizeof(key), "B%010d", i);
388 ASSERT_OK(Put(1, key, value));
389 }
390 ASSERT_OK(Flush(1));
391 ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
392 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
393 ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
394
395 // Push all files to the highest level L2. This
396 // triggers the compaction filter to delete all keys,
397 // verify that at the end of the compaction process,
398 // nothing is left.
399 cfilter_count = 0;
400 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
401 ASSERT_EQ(cfilter_count, 100000);
402 cfilter_count = 0;
403 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
404 ASSERT_EQ(cfilter_count, 0);
405 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
406 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
407
408 {
409 // Scan the entire database to ensure that nothing is left
410 std::unique_ptr<Iterator> iter(
411 db_->NewIterator(ReadOptions(), handles_[1]));
412 iter->SeekToFirst();
413 count = 0;
414 while (iter->Valid()) {
415 count++;
416 iter->Next();
417 }
418 ASSERT_EQ(count, 0);
419 }
420
421 // The sequence number of the remaining record
422 // is not zeroed out even though it is at the
423 // level Lmax because this record is at the tip
424 count = 0;
425 {
426 InternalKeyComparator icmp(options.comparator);
494da23a
TL
427 ReadRangeDelAggregator range_del_agg(&icmp,
428 kMaxSequenceNumber /* upper_bound */);
429 ScopedArenaIterator iter(dbfull()->NewInternalIterator(
430 &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
7c673cae
FG
431 iter->SeekToFirst();
432 ASSERT_OK(iter->status());
433 while (iter->Valid()) {
434 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
435 ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
436 ASSERT_NE(ikey.sequence, (unsigned)0);
437 count++;
438 iter->Next();
439 }
440 ASSERT_EQ(count, 0);
441 }
442}
443
444// Tests the edge case where compaction does not produce any output -- all
445// entries are deleted. The compaction should create bunch of 'DeleteFile'
446// entries in VersionEdit, but none of the 'AddFile's.
447TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
448 Options options = CurrentOptions();
449 options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
450 options.disable_auto_compactions = true;
451 options.create_if_missing = true;
452 DestroyAndReopen(options);
453
454 // put some data
455 for (int table = 0; table < 4; ++table) {
456 for (int i = 0; i < 10 + table; ++i) {
457 Put(ToString(table * 100 + i), "val");
458 }
459 Flush();
460 }
461
462 // this will produce empty file (delete compaction filter)
463 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
464 ASSERT_EQ(0U, CountLiveFiles());
465
466 Reopen(options);
467
468 Iterator* itr = db_->NewIterator(ReadOptions());
469 itr->SeekToFirst();
470 // empty db
471 ASSERT_TRUE(!itr->Valid());
472
473 delete itr;
474}
475#endif // ROCKSDB_LITE
476
11fdf7f2
TL
477TEST_P(DBTestCompactionFilterWithCompactParam,
478 CompactionFilterWithValueChange) {
479 Options options = CurrentOptions();
480 options.num_levels = 3;
481 options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>();
482 CreateAndReopenWithCF({"pikachu"}, options);
7c673cae 483
11fdf7f2
TL
484 // Write 100K+1 keys, these are written to a few files
485 // in L0. We do this so that the current snapshot points
486 // to the 100001 key.The compaction filter is not invoked
487 // on keys that are visible via a snapshot because we
488 // anyways cannot delete it.
489 const std::string value(10, 'x');
490 for (int i = 0; i < 100001; i++) {
491 char key[100];
492 snprintf(key, sizeof(key), "B%010d", i);
493 Put(1, key, value);
494 }
7c673cae 495
11fdf7f2
TL
496 // push all files to lower levels
497 ASSERT_OK(Flush(1));
498 if (option_config_ != kUniversalCompactionMultiLevel &&
499 option_config_ != kUniversalSubcompactions) {
500 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
501 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
502 } else {
503 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
504 nullptr);
505 }
7c673cae 506
11fdf7f2
TL
507 // re-write all data again
508 for (int i = 0; i < 100001; i++) {
509 char key[100];
510 snprintf(key, sizeof(key), "B%010d", i);
511 Put(1, key, value);
512 }
7c673cae 513
11fdf7f2
TL
514 // push all files to lower levels. This should
515 // invoke the compaction filter for all 100000 keys.
516 ASSERT_OK(Flush(1));
517 if (option_config_ != kUniversalCompactionMultiLevel &&
518 option_config_ != kUniversalSubcompactions) {
519 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
520 dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
521 } else {
522 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
523 nullptr);
524 }
525
526 // verify that all keys now have the new value that
527 // was set by the compaction process.
528 for (int i = 0; i < 100001; i++) {
529 char key[100];
530 snprintf(key, sizeof(key), "B%010d", i);
531 std::string newvalue = Get(1, key);
532 ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
533 }
7c673cae
FG
534}
535
536TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
537 std::string one, two, three, four;
538 PutFixed64(&one, 1);
539 PutFixed64(&two, 2);
540 PutFixed64(&three, 3);
541 PutFixed64(&four, 4);
542
543 Options options = CurrentOptions();
544 options.create_if_missing = true;
545 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
546 options.num_levels = 3;
547 // Filter out keys with value is 2.
548 options.compaction_filter_factory =
549 std::make_shared<ConditionalFilterFactory>(two);
550 DestroyAndReopen(options);
551
552 // In the same compaction, a value type needs to be deleted based on
553 // compaction filter, and there is a merge type for the key. compaction
554 // filter result is ignored.
555 ASSERT_OK(db_->Put(WriteOptions(), "foo", two));
556 ASSERT_OK(Flush());
557 ASSERT_OK(db_->Merge(WriteOptions(), "foo", one));
558 ASSERT_OK(Flush());
559 std::string newvalue = Get("foo");
560 ASSERT_EQ(newvalue, three);
561 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
562 newvalue = Get("foo");
563 ASSERT_EQ(newvalue, three);
564
565 // value key can be deleted based on compaction filter, leaving only
566 // merge keys.
567 ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
568 ASSERT_OK(Flush());
569 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
570 newvalue = Get("bar");
571 ASSERT_EQ("NOT_FOUND", newvalue);
572 ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
573 ASSERT_OK(Flush());
574 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
575 newvalue = Get("bar");
576 ASSERT_EQ(two, two);
577
578 // Compaction filter never applies to merge keys.
579 ASSERT_OK(db_->Put(WriteOptions(), "foobar", one));
580 ASSERT_OK(Flush());
581 ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two));
582 ASSERT_OK(Flush());
583 newvalue = Get("foobar");
584 ASSERT_EQ(newvalue, three);
585 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
586 newvalue = Get("foobar");
587 ASSERT_EQ(newvalue, three);
588
589 // In the same compaction, both of value type and merge type keys need to be
590 // deleted based on compaction filter, and there is a merge type for the key.
591 // For both keys, compaction filter results are ignored.
592 ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two));
593 ASSERT_OK(Flush());
594 ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two));
595 ASSERT_OK(Flush());
596 newvalue = Get("barfoo");
597 ASSERT_EQ(newvalue, four);
598 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
599 newvalue = Get("barfoo");
600 ASSERT_EQ(newvalue, four);
601}
602
603#ifndef ROCKSDB_LITE
604TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
605 KeepFilterFactory* filter = new KeepFilterFactory(true, true);
606
607 Options options = CurrentOptions();
608 options.compaction_style = kCompactionStyleUniversal;
609 options.compaction_filter_factory.reset(filter);
610 options.compression = kNoCompression;
611 options.level0_file_num_compaction_trigger = 8;
612 Reopen(options);
613 int num_keys_per_file = 400;
614 for (int j = 0; j < 3; j++) {
615 // Write several keys.
616 const std::string value(10, 'x');
617 for (int i = 0; i < num_keys_per_file; i++) {
618 char key[100];
619 snprintf(key, sizeof(key), "B%08d%02d", i, j);
620 Put(key, value);
621 }
622 dbfull()->TEST_FlushMemTable();
623 // Make sure next file is much smaller so automatic compaction will not
624 // be triggered.
625 num_keys_per_file /= 2;
626 }
627 dbfull()->TEST_WaitForCompact();
628
629 // Force a manual compaction
630 cfilter_count = 0;
631 filter->expect_manual_compaction_.store(true);
632 filter->expect_full_compaction_.store(true);
633 filter->expect_cf_id_.store(0);
634 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
635 ASSERT_EQ(cfilter_count, 700);
636 ASSERT_EQ(NumSortedRuns(0), 1);
637 ASSERT_TRUE(filter->compaction_filter_created());
638
639 // Verify total number of keys is correct after manual compaction.
640 {
641 int count = 0;
642 int total = 0;
643 Arena arena;
644 InternalKeyComparator icmp(options.comparator);
494da23a
TL
645 ReadRangeDelAggregator range_del_agg(&icmp,
646 kMaxSequenceNumber /* snapshots */);
647 ScopedArenaIterator iter(dbfull()->NewInternalIterator(
648 &arena, &range_del_agg, kMaxSequenceNumber));
7c673cae
FG
649 iter->SeekToFirst();
650 ASSERT_OK(iter->status());
651 while (iter->Valid()) {
652 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
7c673cae
FG
653 ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
654 total++;
655 if (ikey.sequence != 0) {
656 count++;
657 }
658 iter->Next();
659 }
660 ASSERT_EQ(total, 700);
494da23a 661 ASSERT_EQ(count, 0);
7c673cae
FG
662 }
663}
664#endif // ROCKSDB_LITE
665
666TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
667 KeepFilterFactory* filter = new KeepFilterFactory(false, true);
668 filter->expect_cf_id_.store(1);
669
670 Options options = CurrentOptions();
671 options.compaction_filter_factory.reset(filter);
672 options.compression = kNoCompression;
673 options.level0_file_num_compaction_trigger = 2;
674 CreateAndReopenWithCF({"pikachu"}, options);
675
676 int num_keys_per_file = 400;
677 for (int j = 0; j < 3; j++) {
678 // Write several keys.
679 const std::string value(10, 'x');
680 for (int i = 0; i < num_keys_per_file; i++) {
681 char key[100];
682 snprintf(key, sizeof(key), "B%08d%02d", i, j);
683 Put(1, key, value);
684 }
685 Flush(1);
686 // Make sure next file is much smaller so automatic compaction will not
687 // be triggered.
688 num_keys_per_file /= 2;
689 }
690 dbfull()->TEST_WaitForCompact();
691
692 ASSERT_TRUE(filter->compaction_filter_created());
693}
694
695#ifndef ROCKSDB_LITE
494da23a 696// Compaction filters aplies to all records, regardless snapshots.
7c673cae
FG
697TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
698 std::string five = ToString(5);
699 Options options = CurrentOptions();
700 options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
701 options.disable_auto_compactions = true;
702 options.create_if_missing = true;
703 DestroyAndReopen(options);
704
705 // Put some data.
706 const Snapshot* snapshot = nullptr;
707 for (int table = 0; table < 4; ++table) {
708 for (int i = 0; i < 10; ++i) {
709 Put(ToString(table * 100 + i), "val");
710 }
711 Flush();
712
713 if (table == 0) {
714 snapshot = db_->GetSnapshot();
715 }
716 }
717 assert(snapshot != nullptr);
718
719 cfilter_count = 0;
720 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
721 // The filter should delete 40 records.
f67539c2 722 ASSERT_EQ(40, cfilter_count);
7c673cae
FG
723
724 {
725 // Scan the entire database as of the snapshot to ensure
726 // that nothing is left
727 ReadOptions read_options;
728 read_options.snapshot = snapshot;
729 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
730 iter->SeekToFirst();
731 int count = 0;
732 while (iter->Valid()) {
733 count++;
734 iter->Next();
735 }
736 ASSERT_EQ(count, 6);
11fdf7f2 737 read_options.snapshot = nullptr;
7c673cae
FG
738 std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
739 iter1->SeekToFirst();
740 count = 0;
741 while (iter1->Valid()) {
742 count++;
743 iter1->Next();
744 }
745 // We have deleted 10 keys from 40 using the compaction filter
746 // Keys 6-9 before the snapshot and 100-105 after the snapshot
747 ASSERT_EQ(count, 30);
748 }
749
750 // Release the snapshot and compact again -> now all records should be
751 // removed.
752 db_->ReleaseSnapshot(snapshot);
753}
754#endif // ROCKSDB_LITE
755
756TEST_F(DBTestCompactionFilter, SkipUntil) {
757 Options options = CurrentOptions();
758 options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
759 options.disable_auto_compactions = true;
760 options.create_if_missing = true;
761 DestroyAndReopen(options);
762
763 // Write 100K keys, these are written to a few files in L0.
764 for (int table = 0; table < 4; ++table) {
765 // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
766 for (int i = table * 6; i < 39 + table * 11; ++i) {
767 char key[100];
768 snprintf(key, sizeof(key), "%010d", table * 100 + i);
769 Put(key, std::to_string(table * 1000 + i));
770 }
771 Flush();
772 }
773
774 cfilter_skips = 0;
775 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
11fdf7f2 776 // Number of skips in tables: 2, 3, 3, 3.
7c673cae
FG
777 ASSERT_EQ(11, cfilter_skips);
778
779 for (int table = 0; table < 4; ++table) {
780 for (int i = table * 6; i < 39 + table * 11; ++i) {
781 int k = table * 100 + i;
782 char key[100];
783 snprintf(key, sizeof(key), "%010d", table * 100 + i);
784 auto expected = std::to_string(table * 1000 + i);
785 std::string val;
786 Status s = db_->Get(ReadOptions(), key, &val);
787 if (k / 10 % 2 == 0) {
788 ASSERT_TRUE(s.IsNotFound());
789 } else {
790 ASSERT_OK(s);
791 ASSERT_EQ(expected, val);
792 }
793 }
794 }
795}
796
11fdf7f2
TL
797TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
798 BlockBasedTableOptions table_options;
799 table_options.whole_key_filtering = false;
800 table_options.filter_policy.reset(NewBloomFilterPolicy(100, false));
801
802 Options options = CurrentOptions();
803 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
804 options.prefix_extractor.reset(NewCappedPrefixTransform(9));
805 options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
806 options.disable_auto_compactions = true;
807 options.create_if_missing = true;
808 DestroyAndReopen(options);
809
810 Put("0000000010", "v10");
494da23a 811 Put("0000000020", "v20"); // skipped
11fdf7f2
TL
812 Put("0000000050", "v50");
813 Flush();
814
815 cfilter_skips = 0;
816 EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
817 EXPECT_EQ(1, cfilter_skips);
818
819 Status s;
820 std::string val;
821
822 s = db_->Get(ReadOptions(), "0000000010", &val);
823 ASSERT_OK(s);
824 EXPECT_EQ("v10", val);
825
826 s = db_->Get(ReadOptions(), "0000000020", &val);
827 EXPECT_TRUE(s.IsNotFound());
828
829 s = db_->Get(ReadOptions(), "0000000050", &val);
830 ASSERT_OK(s);
831 EXPECT_EQ("v50", val);
832}
833
494da23a
TL
834class TestNotSupportedFilter : public CompactionFilter {
835 public:
836 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
837 std::string* /*new_value*/,
838 bool* /*value_changed*/) const override {
839 return true;
840 }
841
842 const char* Name() const override { return "NotSupported"; }
843 bool IgnoreSnapshots() const override { return false; }
844};
845
846TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) {
847 Options options = CurrentOptions();
848 options.compaction_filter = new TestNotSupportedFilter();
849 DestroyAndReopen(options);
850
851 Put("a", "v10");
852 Put("z", "v20");
853 Flush();
854
855 Put("a", "v10");
856 Put("z", "v20");
857 Flush();
858
859 // Comapction should fail because IgnoreSnapshots() = false
860 EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
861 .IsNotSupported());
862
863 delete options.compaction_filter;
864}
865
f67539c2 866} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
867
868int main(int argc, char** argv) {
f67539c2 869 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
7c673cae
FG
870 ::testing::InitGoogleTest(&argc, argv);
871 return RUN_ALL_TESTS();
872}