class DBTestCompactionFilter : public DBTestBase {
public:
DBTestCompactionFilter()
- : DBTestBase("/db_compaction_filter_test", /*env_do_fsync=*/true) {}
+ : DBTestBase("db_compaction_filter_test", /*env_do_fsync=*/true) {}
};
// Param variant of DBTestBase::ChangeCompactOptions
option_config_ == kUniversalSubcompactions) {
assert(options.max_subcompactions > 1);
}
- TryReopen(options);
+ Reopen(options);
}
};
-#ifndef ROCKSDB_VALGRIND_RUN
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
INSTANTIATE_TEST_CASE_P(
CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam,
::testing::Values(DBTestBase::OptionConfig::kDefault,
DBTestBase::OptionConfig::kLevelSubcompactions,
DBTestBase::OptionConfig::kUniversalSubcompactions));
#else
-// Run fewer cases in valgrind
+// Run fewer cases in non-full valgrind to save time.
INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption,
DBTestCompactionFilterWithCompactParam,
::testing::Values(DBTestBase::OptionConfig::kDefault));
-#endif // ROCKSDB_VALGRIND_RUN
+#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
class KeepFilter : public CompactionFilter {
public:
return true;
}
+ bool FilterMergeOperand(int /*level*/, const Slice& /*key*/,
+ const Slice& /*operand*/) const override {
+ return true;
+ }
+
const char* Name() const override { return "DeleteFilter"; }
};
bool compaction_filter_created_;
};
+// This filter factory is configured with a `TableFileCreationReason`. Only
+// table files created for that reason will undergo filtering. This
+// configurability makes it useful to tests for filtering non-compaction table
+// files, such as "CompactionFilterFlush" and "CompactionFilterRecovery".
class DeleteFilterFactory : public CompactionFilterFactory {
public:
+ explicit DeleteFilterFactory(TableFileCreationReason reason)
+ : reason_(reason) {}
+
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
- if (context.is_manual_compaction) {
- return std::unique_ptr<CompactionFilter>(new DeleteFilter());
- } else {
+ EXPECT_EQ(reason_, context.reason);
+ if (context.reason == TableFileCreationReason::kCompaction &&
+ !context.is_manual_compaction) {
+ // Table files created by automatic compaction do not undergo filtering.
+ // Presumably some tests rely on this.
return std::unique_ptr<CompactionFilter>(nullptr);
}
+ return std::unique_ptr<CompactionFilter>(new DeleteFilter());
+ }
+
+ bool ShouldFilterTableFileCreation(
+ TableFileCreationReason reason) const override {
+ return reason_ == reason;
}
const char* Name() const override { return "DeleteFilterFactory"; }
+
+ private:
+ const TableFileCreationReason reason_;
};
// Delete Filter Factory which ignores snapshots
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
- Put(1, key, value);
+ ASSERT_OK(Put(1, key, value));
}
ASSERT_OK(Flush(1));
// the compaction is each level invokes the filter for
// all the keys in that level.
cfilter_count = 0;
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
ASSERT_EQ(cfilter_count, 100000);
cfilter_count = 0;
- dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
ASSERT_EQ(cfilter_count, 100000);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
Arena arena;
{
InternalKeyComparator icmp(options.comparator);
- ReadRangeDelAggregator range_del_agg(&icmp,
- kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
- read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
+ read_options, &arena, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
}
iter->Next();
}
+ ASSERT_OK(iter->status());
}
ASSERT_EQ(total, 100000);
ASSERT_EQ(count, 0);
// means that all keys should pass at least once
// via the compaction filter
cfilter_count = 0;
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
ASSERT_EQ(cfilter_count, 100000);
cfilter_count = 0;
- dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
ASSERT_EQ(cfilter_count, 100000);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
// create a new database with the compaction
// filter in such a way that it deletes all keys
- options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
+ options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(
+ TableFileCreationReason::kCompaction);
options.create_if_missing = true;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// verify that at the end of the compaction process,
// nothing is left.
cfilter_count = 0;
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
ASSERT_EQ(cfilter_count, 100000);
cfilter_count = 0;
- dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
ASSERT_EQ(cfilter_count, 0);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
count++;
iter->Next();
}
+ ASSERT_OK(iter->status());
ASSERT_EQ(count, 0);
}
count = 0;
{
InternalKeyComparator icmp(options.comparator);
- ReadRangeDelAggregator range_del_agg(&icmp,
- kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
- read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
+ read_options, &arena, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
// entries in VersionEdit, but none of the 'AddFile's.
TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
Options options = CurrentOptions();
- options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
+ options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(
+ TableFileCreationReason::kCompaction);
options.disable_auto_compactions = true;
options.create_if_missing = true;
DestroyAndReopen(options);
// put some data
for (int table = 0; table < 4; ++table) {
for (int i = 0; i < 10 + table; ++i) {
- Put(ToString(table * 100 + i), "val");
+ ASSERT_OK(Put(std::to_string(table * 100 + i), "val"));
}
- Flush();
+ ASSERT_OK(Flush());
}
// this will produce empty file (delete compaction filter)
Iterator* itr = db_->NewIterator(ReadOptions());
itr->SeekToFirst();
+ ASSERT_OK(itr->status());
// empty db
ASSERT_TRUE(!itr->Valid());
}
#endif // ROCKSDB_LITE
+TEST_F(DBTestCompactionFilter, CompactionFilterFlush) {
+ // Tests a `CompactionFilterFactory` that filters when table file is created
+ // by flush.
+ Options options = CurrentOptions();
+ options.compaction_filter_factory =
+ std::make_shared<DeleteFilterFactory>(TableFileCreationReason::kFlush);
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ Reopen(options);
+
+ // Puts and Merges are purged in flush.
+ ASSERT_OK(Put("a", "v"));
+ ASSERT_OK(Merge("b", "v"));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("NOT_FOUND", Get("a"));
+ ASSERT_EQ("NOT_FOUND", Get("b"));
+
+ // However, Puts and Merges are preserved by recovery.
+ ASSERT_OK(Put("a", "v"));
+ ASSERT_OK(Merge("b", "v"));
+ Reopen(options);
+ ASSERT_EQ("v", Get("a"));
+ ASSERT_EQ("v", Get("b"));
+
+ // Likewise, compaction does not apply filtering.
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_EQ("v", Get("a"));
+ ASSERT_EQ("v", Get("b"));
+}
+
+TEST_F(DBTestCompactionFilter, CompactionFilterRecovery) {
+ // Tests a `CompactionFilterFactory` that filters when table file is created
+ // by recovery.
+ Options options = CurrentOptions();
+ options.compaction_filter_factory =
+ std::make_shared<DeleteFilterFactory>(TableFileCreationReason::kRecovery);
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ Reopen(options);
+
+ // Puts and Merges are purged in recovery.
+ ASSERT_OK(Put("a", "v"));
+ ASSERT_OK(Merge("b", "v"));
+ Reopen(options);
+ ASSERT_EQ("NOT_FOUND", Get("a"));
+ ASSERT_EQ("NOT_FOUND", Get("b"));
+
+ // However, Puts and Merges are preserved by flush.
+ ASSERT_OK(Put("a", "v"));
+ ASSERT_OK(Merge("b", "v"));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("v", Get("a"));
+ ASSERT_EQ("v", Get("b"));
+
+ // Likewise, compaction does not apply filtering.
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_EQ("v", Get("a"));
+ ASSERT_EQ("v", Get("b"));
+}
+
TEST_P(DBTestCompactionFilterWithCompactParam,
CompactionFilterWithValueChange) {
Options options = CurrentOptions();
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
- Put(1, key, value);
+ ASSERT_OK(Put(1, key, value));
}
// push all files to lower levels
ASSERT_OK(Flush(1));
if (option_config_ != kUniversalCompactionMultiLevel &&
option_config_ != kUniversalSubcompactions) {
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
- dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
+ ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
} else {
- dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
- nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+ nullptr, nullptr));
}
// re-write all data again
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
- Put(1, key, value);
+ ASSERT_OK(Put(1, key, value));
}
// push all files to lower levels. This should
ASSERT_OK(Flush(1));
if (option_config_ != kUniversalCompactionMultiLevel &&
option_config_ != kUniversalSubcompactions) {
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
- dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
+ ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]));
} else {
- dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
- nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+ nullptr, nullptr));
}
// verify that all keys now have the new value that
ASSERT_OK(Flush());
std::string newvalue = Get("foo");
ASSERT_EQ(newvalue, three);
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
newvalue = Get("foo");
ASSERT_EQ(newvalue, three);
// merge keys.
ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
ASSERT_OK(Flush());
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
newvalue = Get("bar");
ASSERT_EQ("NOT_FOUND", newvalue);
ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
ASSERT_OK(Flush());
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
newvalue = Get("bar");
ASSERT_EQ(two, two);
ASSERT_OK(Flush());
newvalue = Get("foobar");
ASSERT_EQ(newvalue, three);
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
newvalue = Get("foobar");
ASSERT_EQ(newvalue, three);
ASSERT_OK(Flush());
newvalue = Get("barfoo");
ASSERT_EQ(newvalue, four);
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
newvalue = Get("barfoo");
ASSERT_EQ(newvalue, four);
}
for (int i = 0; i < num_keys_per_file; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%02d", i, j);
- Put(key, value);
+ ASSERT_OK(Put(key, value));
}
- dbfull()->TEST_FlushMemTable();
+ ASSERT_OK(dbfull()->TEST_FlushMemTable());
// Make sure next file is much smaller so automatic compaction will not
// be triggered.
num_keys_per_file /= 2;
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Force a manual compaction
cfilter_count = 0;
filter->expect_manual_compaction_.store(true);
filter->expect_full_compaction_.store(true);
filter->expect_cf_id_.store(0);
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(cfilter_count, 700);
ASSERT_EQ(NumSortedRuns(0), 1);
ASSERT_TRUE(filter->compaction_filter_created());
int total = 0;
Arena arena;
InternalKeyComparator icmp(options.comparator);
- ReadRangeDelAggregator range_del_agg(&icmp,
- kMaxSequenceNumber /* snapshots */);
ReadOptions read_options;
- ScopedArenaIterator iter(dbfull()->NewInternalIterator(
- read_options, &arena, &range_del_agg, kMaxSequenceNumber));
+ ScopedArenaIterator iter(dbfull()->NewInternalIterator(read_options, &arena,
+ kMaxSequenceNumber));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
for (int i = 0; i < num_keys_per_file; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%02d", i, j);
- Put(1, key, value);
+ ASSERT_OK(Put(1, key, value));
}
- Flush(1);
+ ASSERT_OK(Flush(1));
// Make sure next file is much smaller so automatic compaction will not
// be triggered.
num_keys_per_file /= 2;
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(filter->compaction_filter_created());
}
#ifndef ROCKSDB_LITE
// Compaction filters aplies to all records, regardless snapshots.
TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
- std::string five = ToString(5);
+ std::string five = std::to_string(5);
Options options = CurrentOptions();
options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
options.disable_auto_compactions = true;
const Snapshot* snapshot = nullptr;
for (int table = 0; table < 4; ++table) {
for (int i = 0; i < 10; ++i) {
- Put(ToString(table * 100 + i), "val");
+ ASSERT_OK(Put(std::to_string(table * 100 + i), "val"));
}
- Flush();
+ ASSERT_OK(Flush());
if (table == 0) {
snapshot = db_->GetSnapshot();
read_options.snapshot = snapshot;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
iter->SeekToFirst();
+ ASSERT_OK(iter->status());
int count = 0;
while (iter->Valid()) {
count++;
ASSERT_EQ(count, 6);
read_options.snapshot = nullptr;
std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
+ ASSERT_OK(iter1->status());
iter1->SeekToFirst();
count = 0;
while (iter1->Valid()) {
for (int i = table * 6; i < 39 + table * 11; ++i) {
char key[100];
snprintf(key, sizeof(key), "%010d", table * 100 + i);
- Put(key, std::to_string(table * 1000 + i));
+ ASSERT_OK(Put(key, std::to_string(table * 1000 + i)));
}
- Flush();
+ ASSERT_OK(Flush());
}
cfilter_skips = 0;
options.create_if_missing = true;
DestroyAndReopen(options);
- Put("0000000010", "v10");
- Put("0000000020", "v20"); // skipped
- Put("0000000050", "v50");
- Flush();
+ ASSERT_OK(Put("0000000010", "v10"));
+ ASSERT_OK(Put("0000000020", "v20")); // skipped
+ ASSERT_OK(Put("0000000050", "v50"));
+ ASSERT_OK(Flush());
cfilter_skips = 0;
EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
options.compaction_filter = new TestNotSupportedFilter();
DestroyAndReopen(options);
- Put("a", "v10");
- Put("z", "v20");
- Flush();
+ ASSERT_OK(Put("a", "v10"));
+ ASSERT_OK(Put("z", "v20"));
+ ASSERT_OK(Flush());
- Put("a", "v10");
- Put("z", "v20");
- Flush();
+ ASSERT_OK(Put("a", "v10"));
+ ASSERT_OK(Put("z", "v20"));
+ ASSERT_OK(Flush());
// Comapction should fail because IgnoreSnapshots() = false
EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
delete options.compaction_filter;
}
+class TestNotSupportedFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit TestNotSupportedFilterFactory(TableFileCreationReason reason)
+ : reason_(reason) {}
+
+ bool ShouldFilterTableFileCreation(
+ TableFileCreationReason reason) const override {
+ return reason_ == reason;
+ }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /* context */) override {
+ return std::unique_ptr<CompactionFilter>(new TestNotSupportedFilter());
+ }
+
+ const char* Name() const override { return "TestNotSupportedFilterFactory"; }
+
+ private:
+ const TableFileCreationReason reason_;
+};
+
+TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseDuringFlush) {
+ Options options = CurrentOptions();
+ options.compaction_filter_factory =
+ std::make_shared<TestNotSupportedFilterFactory>(
+ TableFileCreationReason::kFlush);
+ Reopen(options);
+
+ ASSERT_OK(Put("a", "v10"));
+ ASSERT_TRUE(Flush().IsNotSupported());
+}
+
+TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseRecovery) {
+ Options options = CurrentOptions();
+ options.compaction_filter_factory =
+ std::make_shared<TestNotSupportedFilterFactory>(
+ TableFileCreationReason::kRecovery);
+ Reopen(options);
+
+ ASSERT_OK(Put("a", "v10"));
+ ASSERT_TRUE(TryReopen(options).IsNotSupported());
+}
+
+TEST_F(DBTestCompactionFilter, DropKeyWithSingleDelete) {
+ Options options = GetDefaultOptions();
+ options.create_if_missing = true;
+
+ Reopen(options);
+
+ ASSERT_OK(Put("a", "v0"));
+ ASSERT_OK(Put("b", "v0"));
+ const Snapshot* snapshot = db_->GetSnapshot();
+
+ ASSERT_OK(SingleDelete("b"));
+ ASSERT_OK(Flush());
+
+ {
+ CompactRangeOptions cro;
+ cro.change_level = true;
+ cro.target_level = options.num_levels - 1;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ }
+
+ db_->ReleaseSnapshot(snapshot);
+ Close();
+
+ class DeleteFilterV2 : public CompactionFilter {
+ public:
+ Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
+ const Slice& /*existing_value*/,
+ std::string* /*new_value*/,
+ std::string* /*skip_until*/) const override {
+ if (key.starts_with("b")) {
+ return Decision::kPurge;
+ }
+ return Decision::kRemove;
+ }
+
+ const char* Name() const override { return "DeleteFilterV2"; }
+ } delete_filter_v2;
+
+ options.compaction_filter = &delete_filter_v2;
+ options.level0_file_num_compaction_trigger = 2;
+ Reopen(options);
+
+ ASSERT_OK(Put("b", "v1"));
+ ASSERT_OK(Put("x", "v1"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("r", "v1"));
+ ASSERT_OK(Put("z", "v1"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ Close();
+
+ options.compaction_filter = nullptr;
+ Reopen(options);
+ ASSERT_OK(SingleDelete("b"));
+ ASSERT_OK(Flush());
+ {
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ }
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {