]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/db_test_util.h" | |
11 | #include "port/stack_trace.h" | |
12 | ||
f67539c2 | 13 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
14 | |
15 | static int cfilter_count = 0; | |
16 | static int cfilter_skips = 0; | |
17 | ||
18 | // This is a static filter used for filtering | |
19 | // kvs during the compaction process. | |
20 | static std::string NEW_VALUE = "NewValue"; | |
21 | ||
22 | class DBTestCompactionFilter : public DBTestBase { | |
23 | public: | |
24 | DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {} | |
25 | }; | |
26 | ||
11fdf7f2 TL |
27 | // Param variant of DBTestBase::ChangeCompactOptions |
28 | class DBTestCompactionFilterWithCompactParam | |
29 | : public DBTestCompactionFilter, | |
30 | public ::testing::WithParamInterface<DBTestBase::OptionConfig> { | |
31 | public: | |
32 | DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() { | |
33 | option_config_ = GetParam(); | |
34 | Destroy(last_options_); | |
35 | auto options = CurrentOptions(); | |
36 | if (option_config_ == kDefault || option_config_ == kUniversalCompaction || | |
37 | option_config_ == kUniversalCompactionMultiLevel) { | |
38 | options.create_if_missing = true; | |
39 | } | |
40 | if (option_config_ == kLevelSubcompactions || | |
41 | option_config_ == kUniversalSubcompactions) { | |
42 | assert(options.max_subcompactions > 1); | |
43 | } | |
44 | TryReopen(options); | |
45 | } | |
46 | }; | |
47 | ||
48 | #ifndef ROCKSDB_VALGRIND_RUN | |
49 | INSTANTIATE_TEST_CASE_P( | |
f67539c2 | 50 | CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam, |
11fdf7f2 TL |
51 | ::testing::Values(DBTestBase::OptionConfig::kDefault, |
52 | DBTestBase::OptionConfig::kUniversalCompaction, | |
53 | DBTestBase::OptionConfig::kUniversalCompactionMultiLevel, | |
54 | DBTestBase::OptionConfig::kLevelSubcompactions, | |
55 | DBTestBase::OptionConfig::kUniversalSubcompactions)); | |
56 | #else | |
57 | // Run fewer cases in valgrind | |
f67539c2 | 58 | INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption, |
11fdf7f2 TL |
59 | DBTestCompactionFilterWithCompactParam, |
60 | ::testing::Values(DBTestBase::OptionConfig::kDefault)); | |
61 | #endif // ROCKSDB_VALGRIND_RUN | |
62 | ||
7c673cae FG |
63 | class KeepFilter : public CompactionFilter { |
64 | public: | |
494da23a TL |
65 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, |
66 | std::string* /*new_value*/, | |
67 | bool* /*value_changed*/) const override { | |
7c673cae FG |
68 | cfilter_count++; |
69 | return false; | |
70 | } | |
71 | ||
494da23a | 72 | const char* Name() const override { return "KeepFilter"; } |
7c673cae FG |
73 | }; |
74 | ||
75 | class DeleteFilter : public CompactionFilter { | |
76 | public: | |
494da23a TL |
77 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, |
78 | std::string* /*new_value*/, | |
79 | bool* /*value_changed*/) const override { | |
7c673cae FG |
80 | cfilter_count++; |
81 | return true; | |
82 | } | |
83 | ||
494da23a | 84 | const char* Name() const override { return "DeleteFilter"; } |
7c673cae FG |
85 | }; |
86 | ||
87 | class DeleteISFilter : public CompactionFilter { | |
88 | public: | |
494da23a TL |
89 | bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/, |
90 | std::string* /*new_value*/, | |
91 | bool* /*value_changed*/) const override { | |
7c673cae FG |
92 | cfilter_count++; |
93 | int i = std::stoi(key.ToString()); | |
94 | if (i > 5 && i <= 105) { | |
95 | return true; | |
96 | } | |
97 | return false; | |
98 | } | |
99 | ||
494da23a | 100 | bool IgnoreSnapshots() const override { return true; } |
7c673cae | 101 | |
494da23a | 102 | const char* Name() const override { return "DeleteFilter"; } |
7c673cae FG |
103 | }; |
104 | ||
105 | // Skip x if floor(x/10) is even, use range skips. Requires that keys are | |
106 | // zero-padded to length 10. | |
107 | class SkipEvenFilter : public CompactionFilter { | |
108 | public: | |
494da23a TL |
109 | Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/, |
110 | const Slice& /*existing_value*/, std::string* /*new_value*/, | |
111 | std::string* skip_until) const override { | |
7c673cae FG |
112 | cfilter_count++; |
113 | int i = std::stoi(key.ToString()); | |
114 | if (i / 10 % 2 == 0) { | |
115 | char key_str[100]; | |
11fdf7f2 | 116 | snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10); |
7c673cae FG |
117 | *skip_until = key_str; |
118 | ++cfilter_skips; | |
119 | return Decision::kRemoveAndSkipUntil; | |
120 | } | |
121 | return Decision::kKeep; | |
122 | } | |
123 | ||
494da23a | 124 | bool IgnoreSnapshots() const override { return true; } |
7c673cae | 125 | |
494da23a | 126 | const char* Name() const override { return "DeleteFilter"; } |
7c673cae FG |
127 | }; |
128 | ||
129 | class DelayFilter : public CompactionFilter { | |
130 | public: | |
131 | explicit DelayFilter(DBTestBase* d) : db_test(d) {} | |
494da23a TL |
132 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, |
133 | std::string* /*new_value*/, | |
134 | bool* /*value_changed*/) const override { | |
7c673cae FG |
135 | db_test->env_->addon_time_.fetch_add(1000); |
136 | return true; | |
137 | } | |
138 | ||
494da23a | 139 | const char* Name() const override { return "DelayFilter"; } |
7c673cae FG |
140 | |
141 | private: | |
142 | DBTestBase* db_test; | |
143 | }; | |
144 | ||
145 | class ConditionalFilter : public CompactionFilter { | |
146 | public: | |
147 | explicit ConditionalFilter(const std::string* filtered_value) | |
148 | : filtered_value_(filtered_value) {} | |
494da23a TL |
149 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value, |
150 | std::string* /*new_value*/, | |
151 | bool* /*value_changed*/) const override { | |
7c673cae FG |
152 | return value.ToString() == *filtered_value_; |
153 | } | |
154 | ||
494da23a | 155 | const char* Name() const override { return "ConditionalFilter"; } |
7c673cae FG |
156 | |
157 | private: | |
158 | const std::string* filtered_value_; | |
159 | }; | |
160 | ||
161 | class ChangeFilter : public CompactionFilter { | |
162 | public: | |
163 | explicit ChangeFilter() {} | |
164 | ||
494da23a TL |
165 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, |
166 | std::string* new_value, bool* value_changed) const override { | |
7c673cae FG |
167 | assert(new_value != nullptr); |
168 | *new_value = NEW_VALUE; | |
169 | *value_changed = true; | |
170 | return false; | |
171 | } | |
172 | ||
494da23a | 173 | const char* Name() const override { return "ChangeFilter"; } |
7c673cae FG |
174 | }; |
175 | ||
176 | class KeepFilterFactory : public CompactionFilterFactory { | |
177 | public: | |
178 | explicit KeepFilterFactory(bool check_context = false, | |
179 | bool check_context_cf_id = false) | |
180 | : check_context_(check_context), | |
181 | check_context_cf_id_(check_context_cf_id), | |
182 | compaction_filter_created_(false) {} | |
183 | ||
494da23a | 184 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
7c673cae FG |
185 | const CompactionFilter::Context& context) override { |
186 | if (check_context_) { | |
187 | EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction); | |
188 | EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); | |
189 | } | |
190 | if (check_context_cf_id_) { | |
191 | EXPECT_EQ(expect_cf_id_.load(), context.column_family_id); | |
192 | } | |
193 | compaction_filter_created_ = true; | |
194 | return std::unique_ptr<CompactionFilter>(new KeepFilter()); | |
195 | } | |
196 | ||
197 | bool compaction_filter_created() const { return compaction_filter_created_; } | |
198 | ||
494da23a | 199 | const char* Name() const override { return "KeepFilterFactory"; } |
7c673cae FG |
200 | bool check_context_; |
201 | bool check_context_cf_id_; | |
202 | std::atomic_bool expect_full_compaction_; | |
203 | std::atomic_bool expect_manual_compaction_; | |
204 | std::atomic<uint32_t> expect_cf_id_; | |
205 | bool compaction_filter_created_; | |
206 | }; | |
207 | ||
208 | class DeleteFilterFactory : public CompactionFilterFactory { | |
209 | public: | |
494da23a | 210 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
7c673cae FG |
211 | const CompactionFilter::Context& context) override { |
212 | if (context.is_manual_compaction) { | |
213 | return std::unique_ptr<CompactionFilter>(new DeleteFilter()); | |
214 | } else { | |
215 | return std::unique_ptr<CompactionFilter>(nullptr); | |
216 | } | |
217 | } | |
218 | ||
494da23a | 219 | const char* Name() const override { return "DeleteFilterFactory"; } |
7c673cae FG |
220 | }; |
221 | ||
222 | // Delete Filter Factory which ignores snapshots | |
223 | class DeleteISFilterFactory : public CompactionFilterFactory { | |
224 | public: | |
494da23a | 225 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
7c673cae FG |
226 | const CompactionFilter::Context& context) override { |
227 | if (context.is_manual_compaction) { | |
228 | return std::unique_ptr<CompactionFilter>(new DeleteISFilter()); | |
229 | } else { | |
230 | return std::unique_ptr<CompactionFilter>(nullptr); | |
231 | } | |
232 | } | |
233 | ||
494da23a | 234 | const char* Name() const override { return "DeleteFilterFactory"; } |
7c673cae FG |
235 | }; |
236 | ||
237 | class SkipEvenFilterFactory : public CompactionFilterFactory { | |
238 | public: | |
494da23a | 239 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
7c673cae FG |
240 | const CompactionFilter::Context& context) override { |
241 | if (context.is_manual_compaction) { | |
242 | return std::unique_ptr<CompactionFilter>(new SkipEvenFilter()); | |
243 | } else { | |
244 | return std::unique_ptr<CompactionFilter>(nullptr); | |
245 | } | |
246 | } | |
247 | ||
494da23a | 248 | const char* Name() const override { return "SkipEvenFilterFactory"; } |
7c673cae FG |
249 | }; |
250 | ||
251 | class DelayFilterFactory : public CompactionFilterFactory { | |
252 | public: | |
253 | explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {} | |
494da23a | 254 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
11fdf7f2 | 255 | const CompactionFilter::Context& /*context*/) override { |
7c673cae FG |
256 | return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test)); |
257 | } | |
258 | ||
494da23a | 259 | const char* Name() const override { return "DelayFilterFactory"; } |
7c673cae FG |
260 | |
261 | private: | |
262 | DBTestBase* db_test; | |
263 | }; | |
264 | ||
265 | class ConditionalFilterFactory : public CompactionFilterFactory { | |
266 | public: | |
267 | explicit ConditionalFilterFactory(const Slice& filtered_value) | |
268 | : filtered_value_(filtered_value.ToString()) {} | |
269 | ||
494da23a | 270 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
11fdf7f2 | 271 | const CompactionFilter::Context& /*context*/) override { |
7c673cae FG |
272 | return std::unique_ptr<CompactionFilter>( |
273 | new ConditionalFilter(&filtered_value_)); | |
274 | } | |
275 | ||
494da23a | 276 | const char* Name() const override { return "ConditionalFilterFactory"; } |
7c673cae FG |
277 | |
278 | private: | |
279 | std::string filtered_value_; | |
280 | }; | |
281 | ||
282 | class ChangeFilterFactory : public CompactionFilterFactory { | |
283 | public: | |
284 | explicit ChangeFilterFactory() {} | |
285 | ||
494da23a | 286 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
11fdf7f2 | 287 | const CompactionFilter::Context& /*context*/) override { |
7c673cae FG |
288 | return std::unique_ptr<CompactionFilter>(new ChangeFilter()); |
289 | } | |
290 | ||
494da23a | 291 | const char* Name() const override { return "ChangeFilterFactory"; } |
7c673cae FG |
292 | }; |
293 | ||
294 | #ifndef ROCKSDB_LITE | |
295 | TEST_F(DBTestCompactionFilter, CompactionFilter) { | |
296 | Options options = CurrentOptions(); | |
297 | options.max_open_files = -1; | |
298 | options.num_levels = 3; | |
299 | options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); | |
300 | options = CurrentOptions(options); | |
301 | CreateAndReopenWithCF({"pikachu"}, options); | |
302 | ||
303 | // Write 100K keys, these are written to a few files in L0. | |
304 | const std::string value(10, 'x'); | |
305 | for (int i = 0; i < 100000; i++) { | |
306 | char key[100]; | |
307 | snprintf(key, sizeof(key), "B%010d", i); | |
308 | Put(1, key, value); | |
309 | } | |
310 | ASSERT_OK(Flush(1)); | |
311 | ||
312 | // Push all files to the highest level L2. Verify that | |
313 | // the compaction is each level invokes the filter for | |
314 | // all the keys in that level. | |
315 | cfilter_count = 0; | |
316 | dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); | |
317 | ASSERT_EQ(cfilter_count, 100000); | |
318 | cfilter_count = 0; | |
319 | dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); | |
320 | ASSERT_EQ(cfilter_count, 100000); | |
321 | ||
322 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); | |
323 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); | |
324 | ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); | |
325 | cfilter_count = 0; | |
326 | ||
327 | // All the files are in the lowest level. | |
328 | // Verify that all but the 100001st record | |
329 | // has sequence number zero. The 100001st record | |
330 | // is at the tip of this snapshot and cannot | |
331 | // be zeroed out. | |
332 | int count = 0; | |
333 | int total = 0; | |
334 | Arena arena; | |
335 | { | |
336 | InternalKeyComparator icmp(options.comparator); | |
494da23a TL |
337 | ReadRangeDelAggregator range_del_agg(&icmp, |
338 | kMaxSequenceNumber /* upper_bound */); | |
339 | ScopedArenaIterator iter(dbfull()->NewInternalIterator( | |
340 | &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); | |
7c673cae FG |
341 | iter->SeekToFirst(); |
342 | ASSERT_OK(iter->status()); | |
343 | while (iter->Valid()) { | |
344 | ParsedInternalKey ikey(Slice(), 0, kTypeValue); | |
7c673cae FG |
345 | ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); |
346 | total++; | |
347 | if (ikey.sequence != 0) { | |
348 | count++; | |
349 | } | |
350 | iter->Next(); | |
351 | } | |
352 | } | |
353 | ASSERT_EQ(total, 100000); | |
494da23a | 354 | ASSERT_EQ(count, 0); |
7c673cae FG |
355 | |
356 | // overwrite all the 100K keys once again. | |
357 | for (int i = 0; i < 100000; i++) { | |
358 | char key[100]; | |
359 | snprintf(key, sizeof(key), "B%010d", i); | |
360 | ASSERT_OK(Put(1, key, value)); | |
361 | } | |
362 | ASSERT_OK(Flush(1)); | |
363 | ||
364 | // push all files to the highest level L2. This | |
365 | // means that all keys should pass at least once | |
366 | // via the compaction filter | |
367 | cfilter_count = 0; | |
368 | dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); | |
369 | ASSERT_EQ(cfilter_count, 100000); | |
370 | cfilter_count = 0; | |
371 | dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); | |
372 | ASSERT_EQ(cfilter_count, 100000); | |
373 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); | |
374 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); | |
375 | ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); | |
376 | ||
377 | // create a new database with the compaction | |
378 | // filter in such a way that it deletes all keys | |
379 | options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); | |
380 | options.create_if_missing = true; | |
381 | DestroyAndReopen(options); | |
382 | CreateAndReopenWithCF({"pikachu"}, options); | |
383 | ||
384 | // write all the keys once again. | |
385 | for (int i = 0; i < 100000; i++) { | |
386 | char key[100]; | |
387 | snprintf(key, sizeof(key), "B%010d", i); | |
388 | ASSERT_OK(Put(1, key, value)); | |
389 | } | |
390 | ASSERT_OK(Flush(1)); | |
391 | ASSERT_NE(NumTableFilesAtLevel(0, 1), 0); | |
392 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); | |
393 | ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0); | |
394 | ||
395 | // Push all files to the highest level L2. This | |
396 | // triggers the compaction filter to delete all keys, | |
397 | // verify that at the end of the compaction process, | |
398 | // nothing is left. | |
399 | cfilter_count = 0; | |
400 | dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); | |
401 | ASSERT_EQ(cfilter_count, 100000); | |
402 | cfilter_count = 0; | |
403 | dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); | |
404 | ASSERT_EQ(cfilter_count, 0); | |
405 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); | |
406 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); | |
407 | ||
408 | { | |
409 | // Scan the entire database to ensure that nothing is left | |
410 | std::unique_ptr<Iterator> iter( | |
411 | db_->NewIterator(ReadOptions(), handles_[1])); | |
412 | iter->SeekToFirst(); | |
413 | count = 0; | |
414 | while (iter->Valid()) { | |
415 | count++; | |
416 | iter->Next(); | |
417 | } | |
418 | ASSERT_EQ(count, 0); | |
419 | } | |
420 | ||
421 | // The sequence number of the remaining record | |
422 | // is not zeroed out even though it is at the | |
423 | // level Lmax because this record is at the tip | |
424 | count = 0; | |
425 | { | |
426 | InternalKeyComparator icmp(options.comparator); | |
494da23a TL |
427 | ReadRangeDelAggregator range_del_agg(&icmp, |
428 | kMaxSequenceNumber /* upper_bound */); | |
429 | ScopedArenaIterator iter(dbfull()->NewInternalIterator( | |
430 | &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); | |
7c673cae FG |
431 | iter->SeekToFirst(); |
432 | ASSERT_OK(iter->status()); | |
433 | while (iter->Valid()) { | |
434 | ParsedInternalKey ikey(Slice(), 0, kTypeValue); | |
435 | ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); | |
436 | ASSERT_NE(ikey.sequence, (unsigned)0); | |
437 | count++; | |
438 | iter->Next(); | |
439 | } | |
440 | ASSERT_EQ(count, 0); | |
441 | } | |
442 | } | |
443 | ||
444 | // Tests the edge case where compaction does not produce any output -- all | |
445 | // entries are deleted. The compaction should create bunch of 'DeleteFile' | |
446 | // entries in VersionEdit, but none of the 'AddFile's. | |
447 | TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) { | |
448 | Options options = CurrentOptions(); | |
449 | options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); | |
450 | options.disable_auto_compactions = true; | |
451 | options.create_if_missing = true; | |
452 | DestroyAndReopen(options); | |
453 | ||
454 | // put some data | |
455 | for (int table = 0; table < 4; ++table) { | |
456 | for (int i = 0; i < 10 + table; ++i) { | |
457 | Put(ToString(table * 100 + i), "val"); | |
458 | } | |
459 | Flush(); | |
460 | } | |
461 | ||
462 | // this will produce empty file (delete compaction filter) | |
463 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
464 | ASSERT_EQ(0U, CountLiveFiles()); | |
465 | ||
466 | Reopen(options); | |
467 | ||
468 | Iterator* itr = db_->NewIterator(ReadOptions()); | |
469 | itr->SeekToFirst(); | |
470 | // empty db | |
471 | ASSERT_TRUE(!itr->Valid()); | |
472 | ||
473 | delete itr; | |
474 | } | |
475 | #endif // ROCKSDB_LITE | |
476 | ||
11fdf7f2 TL |
477 | TEST_P(DBTestCompactionFilterWithCompactParam, |
478 | CompactionFilterWithValueChange) { | |
479 | Options options = CurrentOptions(); | |
480 | options.num_levels = 3; | |
481 | options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>(); | |
482 | CreateAndReopenWithCF({"pikachu"}, options); | |
7c673cae | 483 | |
11fdf7f2 TL |
484 | // Write 100K+1 keys, these are written to a few files |
485 | // in L0. We do this so that the current snapshot points | |
486 | // to the 100001 key.The compaction filter is not invoked | |
487 | // on keys that are visible via a snapshot because we | |
488 | // anyways cannot delete it. | |
489 | const std::string value(10, 'x'); | |
490 | for (int i = 0; i < 100001; i++) { | |
491 | char key[100]; | |
492 | snprintf(key, sizeof(key), "B%010d", i); | |
493 | Put(1, key, value); | |
494 | } | |
7c673cae | 495 | |
11fdf7f2 TL |
496 | // push all files to lower levels |
497 | ASSERT_OK(Flush(1)); | |
498 | if (option_config_ != kUniversalCompactionMultiLevel && | |
499 | option_config_ != kUniversalSubcompactions) { | |
500 | dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); | |
501 | dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); | |
502 | } else { | |
503 | dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, | |
504 | nullptr); | |
505 | } | |
7c673cae | 506 | |
11fdf7f2 TL |
507 | // re-write all data again |
508 | for (int i = 0; i < 100001; i++) { | |
509 | char key[100]; | |
510 | snprintf(key, sizeof(key), "B%010d", i); | |
511 | Put(1, key, value); | |
512 | } | |
7c673cae | 513 | |
11fdf7f2 TL |
514 | // push all files to lower levels. This should |
515 | // invoke the compaction filter for all 100000 keys. | |
516 | ASSERT_OK(Flush(1)); | |
517 | if (option_config_ != kUniversalCompactionMultiLevel && | |
518 | option_config_ != kUniversalSubcompactions) { | |
519 | dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); | |
520 | dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); | |
521 | } else { | |
522 | dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, | |
523 | nullptr); | |
524 | } | |
525 | ||
526 | // verify that all keys now have the new value that | |
527 | // was set by the compaction process. | |
528 | for (int i = 0; i < 100001; i++) { | |
529 | char key[100]; | |
530 | snprintf(key, sizeof(key), "B%010d", i); | |
531 | std::string newvalue = Get(1, key); | |
532 | ASSERT_EQ(newvalue.compare(NEW_VALUE), 0); | |
533 | } | |
7c673cae FG |
534 | } |
535 | ||
536 | TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) { | |
537 | std::string one, two, three, four; | |
538 | PutFixed64(&one, 1); | |
539 | PutFixed64(&two, 2); | |
540 | PutFixed64(&three, 3); | |
541 | PutFixed64(&four, 4); | |
542 | ||
543 | Options options = CurrentOptions(); | |
544 | options.create_if_missing = true; | |
545 | options.merge_operator = MergeOperators::CreateUInt64AddOperator(); | |
546 | options.num_levels = 3; | |
547 | // Filter out keys with value is 2. | |
548 | options.compaction_filter_factory = | |
549 | std::make_shared<ConditionalFilterFactory>(two); | |
550 | DestroyAndReopen(options); | |
551 | ||
552 | // In the same compaction, a value type needs to be deleted based on | |
553 | // compaction filter, and there is a merge type for the key. compaction | |
554 | // filter result is ignored. | |
555 | ASSERT_OK(db_->Put(WriteOptions(), "foo", two)); | |
556 | ASSERT_OK(Flush()); | |
557 | ASSERT_OK(db_->Merge(WriteOptions(), "foo", one)); | |
558 | ASSERT_OK(Flush()); | |
559 | std::string newvalue = Get("foo"); | |
560 | ASSERT_EQ(newvalue, three); | |
561 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
562 | newvalue = Get("foo"); | |
563 | ASSERT_EQ(newvalue, three); | |
564 | ||
565 | // value key can be deleted based on compaction filter, leaving only | |
566 | // merge keys. | |
567 | ASSERT_OK(db_->Put(WriteOptions(), "bar", two)); | |
568 | ASSERT_OK(Flush()); | |
569 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
570 | newvalue = Get("bar"); | |
571 | ASSERT_EQ("NOT_FOUND", newvalue); | |
572 | ASSERT_OK(db_->Merge(WriteOptions(), "bar", two)); | |
573 | ASSERT_OK(Flush()); | |
574 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
575 | newvalue = Get("bar"); | |
576 | ASSERT_EQ(two, two); | |
577 | ||
578 | // Compaction filter never applies to merge keys. | |
579 | ASSERT_OK(db_->Put(WriteOptions(), "foobar", one)); | |
580 | ASSERT_OK(Flush()); | |
581 | ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two)); | |
582 | ASSERT_OK(Flush()); | |
583 | newvalue = Get("foobar"); | |
584 | ASSERT_EQ(newvalue, three); | |
585 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
586 | newvalue = Get("foobar"); | |
587 | ASSERT_EQ(newvalue, three); | |
588 | ||
589 | // In the same compaction, both of value type and merge type keys need to be | |
590 | // deleted based on compaction filter, and there is a merge type for the key. | |
591 | // For both keys, compaction filter results are ignored. | |
592 | ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two)); | |
593 | ASSERT_OK(Flush()); | |
594 | ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two)); | |
595 | ASSERT_OK(Flush()); | |
596 | newvalue = Get("barfoo"); | |
597 | ASSERT_EQ(newvalue, four); | |
598 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
599 | newvalue = Get("barfoo"); | |
600 | ASSERT_EQ(newvalue, four); | |
601 | } | |
602 | ||
603 | #ifndef ROCKSDB_LITE | |
604 | TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { | |
605 | KeepFilterFactory* filter = new KeepFilterFactory(true, true); | |
606 | ||
607 | Options options = CurrentOptions(); | |
608 | options.compaction_style = kCompactionStyleUniversal; | |
609 | options.compaction_filter_factory.reset(filter); | |
610 | options.compression = kNoCompression; | |
611 | options.level0_file_num_compaction_trigger = 8; | |
612 | Reopen(options); | |
613 | int num_keys_per_file = 400; | |
614 | for (int j = 0; j < 3; j++) { | |
615 | // Write several keys. | |
616 | const std::string value(10, 'x'); | |
617 | for (int i = 0; i < num_keys_per_file; i++) { | |
618 | char key[100]; | |
619 | snprintf(key, sizeof(key), "B%08d%02d", i, j); | |
620 | Put(key, value); | |
621 | } | |
622 | dbfull()->TEST_FlushMemTable(); | |
623 | // Make sure next file is much smaller so automatic compaction will not | |
624 | // be triggered. | |
625 | num_keys_per_file /= 2; | |
626 | } | |
627 | dbfull()->TEST_WaitForCompact(); | |
628 | ||
629 | // Force a manual compaction | |
630 | cfilter_count = 0; | |
631 | filter->expect_manual_compaction_.store(true); | |
632 | filter->expect_full_compaction_.store(true); | |
633 | filter->expect_cf_id_.store(0); | |
634 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
635 | ASSERT_EQ(cfilter_count, 700); | |
636 | ASSERT_EQ(NumSortedRuns(0), 1); | |
637 | ASSERT_TRUE(filter->compaction_filter_created()); | |
638 | ||
639 | // Verify total number of keys is correct after manual compaction. | |
640 | { | |
641 | int count = 0; | |
642 | int total = 0; | |
643 | Arena arena; | |
644 | InternalKeyComparator icmp(options.comparator); | |
494da23a TL |
645 | ReadRangeDelAggregator range_del_agg(&icmp, |
646 | kMaxSequenceNumber /* snapshots */); | |
647 | ScopedArenaIterator iter(dbfull()->NewInternalIterator( | |
648 | &arena, &range_del_agg, kMaxSequenceNumber)); | |
7c673cae FG |
649 | iter->SeekToFirst(); |
650 | ASSERT_OK(iter->status()); | |
651 | while (iter->Valid()) { | |
652 | ParsedInternalKey ikey(Slice(), 0, kTypeValue); | |
7c673cae FG |
653 | ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); |
654 | total++; | |
655 | if (ikey.sequence != 0) { | |
656 | count++; | |
657 | } | |
658 | iter->Next(); | |
659 | } | |
660 | ASSERT_EQ(total, 700); | |
494da23a | 661 | ASSERT_EQ(count, 0); |
7c673cae FG |
662 | } |
663 | } | |
664 | #endif // ROCKSDB_LITE | |
665 | ||
666 | TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) { | |
667 | KeepFilterFactory* filter = new KeepFilterFactory(false, true); | |
668 | filter->expect_cf_id_.store(1); | |
669 | ||
670 | Options options = CurrentOptions(); | |
671 | options.compaction_filter_factory.reset(filter); | |
672 | options.compression = kNoCompression; | |
673 | options.level0_file_num_compaction_trigger = 2; | |
674 | CreateAndReopenWithCF({"pikachu"}, options); | |
675 | ||
676 | int num_keys_per_file = 400; | |
677 | for (int j = 0; j < 3; j++) { | |
678 | // Write several keys. | |
679 | const std::string value(10, 'x'); | |
680 | for (int i = 0; i < num_keys_per_file; i++) { | |
681 | char key[100]; | |
682 | snprintf(key, sizeof(key), "B%08d%02d", i, j); | |
683 | Put(1, key, value); | |
684 | } | |
685 | Flush(1); | |
686 | // Make sure next file is much smaller so automatic compaction will not | |
687 | // be triggered. | |
688 | num_keys_per_file /= 2; | |
689 | } | |
690 | dbfull()->TEST_WaitForCompact(); | |
691 | ||
692 | ASSERT_TRUE(filter->compaction_filter_created()); | |
693 | } | |
694 | ||
695 | #ifndef ROCKSDB_LITE | |
494da23a | 696 | // Compaction filters aplies to all records, regardless snapshots. |
7c673cae FG |
697 | TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) { |
698 | std::string five = ToString(5); | |
699 | Options options = CurrentOptions(); | |
700 | options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>(); | |
701 | options.disable_auto_compactions = true; | |
702 | options.create_if_missing = true; | |
703 | DestroyAndReopen(options); | |
704 | ||
705 | // Put some data. | |
706 | const Snapshot* snapshot = nullptr; | |
707 | for (int table = 0; table < 4; ++table) { | |
708 | for (int i = 0; i < 10; ++i) { | |
709 | Put(ToString(table * 100 + i), "val"); | |
710 | } | |
711 | Flush(); | |
712 | ||
713 | if (table == 0) { | |
714 | snapshot = db_->GetSnapshot(); | |
715 | } | |
716 | } | |
717 | assert(snapshot != nullptr); | |
718 | ||
719 | cfilter_count = 0; | |
720 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
721 | // The filter should delete 40 records. | |
f67539c2 | 722 | ASSERT_EQ(40, cfilter_count); |
7c673cae FG |
723 | |
724 | { | |
725 | // Scan the entire database as of the snapshot to ensure | |
726 | // that nothing is left | |
727 | ReadOptions read_options; | |
728 | read_options.snapshot = snapshot; | |
729 | std::unique_ptr<Iterator> iter(db_->NewIterator(read_options)); | |
730 | iter->SeekToFirst(); | |
731 | int count = 0; | |
732 | while (iter->Valid()) { | |
733 | count++; | |
734 | iter->Next(); | |
735 | } | |
736 | ASSERT_EQ(count, 6); | |
11fdf7f2 | 737 | read_options.snapshot = nullptr; |
7c673cae FG |
738 | std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options)); |
739 | iter1->SeekToFirst(); | |
740 | count = 0; | |
741 | while (iter1->Valid()) { | |
742 | count++; | |
743 | iter1->Next(); | |
744 | } | |
745 | // We have deleted 10 keys from 40 using the compaction filter | |
746 | // Keys 6-9 before the snapshot and 100-105 after the snapshot | |
747 | ASSERT_EQ(count, 30); | |
748 | } | |
749 | ||
750 | // Release the snapshot and compact again -> now all records should be | |
751 | // removed. | |
752 | db_->ReleaseSnapshot(snapshot); | |
753 | } | |
754 | #endif // ROCKSDB_LITE | |
755 | ||
756 | TEST_F(DBTestCompactionFilter, SkipUntil) { | |
757 | Options options = CurrentOptions(); | |
758 | options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>(); | |
759 | options.disable_auto_compactions = true; | |
760 | options.create_if_missing = true; | |
761 | DestroyAndReopen(options); | |
762 | ||
763 | // Write 100K keys, these are written to a few files in L0. | |
764 | for (int table = 0; table < 4; ++table) { | |
765 | // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371]. | |
766 | for (int i = table * 6; i < 39 + table * 11; ++i) { | |
767 | char key[100]; | |
768 | snprintf(key, sizeof(key), "%010d", table * 100 + i); | |
769 | Put(key, std::to_string(table * 1000 + i)); | |
770 | } | |
771 | Flush(); | |
772 | } | |
773 | ||
774 | cfilter_skips = 0; | |
775 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
11fdf7f2 | 776 | // Number of skips in tables: 2, 3, 3, 3. |
7c673cae FG |
777 | ASSERT_EQ(11, cfilter_skips); |
778 | ||
779 | for (int table = 0; table < 4; ++table) { | |
780 | for (int i = table * 6; i < 39 + table * 11; ++i) { | |
781 | int k = table * 100 + i; | |
782 | char key[100]; | |
783 | snprintf(key, sizeof(key), "%010d", table * 100 + i); | |
784 | auto expected = std::to_string(table * 1000 + i); | |
785 | std::string val; | |
786 | Status s = db_->Get(ReadOptions(), key, &val); | |
787 | if (k / 10 % 2 == 0) { | |
788 | ASSERT_TRUE(s.IsNotFound()); | |
789 | } else { | |
790 | ASSERT_OK(s); | |
791 | ASSERT_EQ(expected, val); | |
792 | } | |
793 | } | |
794 | } | |
795 | } | |
796 | ||
11fdf7f2 TL |
797 | TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) { |
798 | BlockBasedTableOptions table_options; | |
799 | table_options.whole_key_filtering = false; | |
800 | table_options.filter_policy.reset(NewBloomFilterPolicy(100, false)); | |
801 | ||
802 | Options options = CurrentOptions(); | |
803 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
804 | options.prefix_extractor.reset(NewCappedPrefixTransform(9)); | |
805 | options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>(); | |
806 | options.disable_auto_compactions = true; | |
807 | options.create_if_missing = true; | |
808 | DestroyAndReopen(options); | |
809 | ||
810 | Put("0000000010", "v10"); | |
494da23a | 811 | Put("0000000020", "v20"); // skipped |
11fdf7f2 TL |
812 | Put("0000000050", "v50"); |
813 | Flush(); | |
814 | ||
815 | cfilter_skips = 0; | |
816 | EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
817 | EXPECT_EQ(1, cfilter_skips); | |
818 | ||
819 | Status s; | |
820 | std::string val; | |
821 | ||
822 | s = db_->Get(ReadOptions(), "0000000010", &val); | |
823 | ASSERT_OK(s); | |
824 | EXPECT_EQ("v10", val); | |
825 | ||
826 | s = db_->Get(ReadOptions(), "0000000020", &val); | |
827 | EXPECT_TRUE(s.IsNotFound()); | |
828 | ||
829 | s = db_->Get(ReadOptions(), "0000000050", &val); | |
830 | ASSERT_OK(s); | |
831 | EXPECT_EQ("v50", val); | |
832 | } | |
833 | ||
494da23a TL |
834 | class TestNotSupportedFilter : public CompactionFilter { |
835 | public: | |
836 | bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, | |
837 | std::string* /*new_value*/, | |
838 | bool* /*value_changed*/) const override { | |
839 | return true; | |
840 | } | |
841 | ||
842 | const char* Name() const override { return "NotSupported"; } | |
843 | bool IgnoreSnapshots() const override { return false; } | |
844 | }; | |
845 | ||
846 | TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) { | |
847 | Options options = CurrentOptions(); | |
848 | options.compaction_filter = new TestNotSupportedFilter(); | |
849 | DestroyAndReopen(options); | |
850 | ||
851 | Put("a", "v10"); | |
852 | Put("z", "v20"); | |
853 | Flush(); | |
854 | ||
855 | Put("a", "v10"); | |
856 | Put("z", "v20"); | |
857 | Flush(); | |
858 | ||
859 | // Comapction should fail because IgnoreSnapshots() = false | |
860 | EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr) | |
861 | .IsNotSupported()); | |
862 | ||
863 | delete options.compaction_filter; | |
864 | } | |
865 | ||
f67539c2 | 866 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
867 | |
868 | int main(int argc, char** argv) { | |
f67539c2 | 869 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
7c673cae FG |
870 | ::testing::InitGoogleTest(&argc, argv); |
871 | return RUN_ALL_TESTS(); | |
872 | } |