]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compaction_iterator_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / compaction_iterator_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/compaction_iterator.h"
7
8 #include <string>
9 #include <vector>
10
11 #include "port/port.h"
12 #include "util/string_util.h"
13 #include "util/testharness.h"
14 #include "util/testutil.h"
15 #include "utilities/merge_operators.h"
16
17 namespace rocksdb {
18
19 // Expects no merging attempts.
20 class NoMergingMergeOp : public MergeOperator {
21 public:
22 bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
23 MergeOperationOutput* /*merge_out*/) const override {
24 ADD_FAILURE();
25 return false;
26 }
27 bool PartialMergeMulti(const Slice& /*key*/,
28 const std::deque<Slice>& /*operand_list*/,
29 std::string* /*new_value*/,
30 Logger* /*logger*/) const override {
31 ADD_FAILURE();
32 return false;
33 }
34 const char* Name() const override {
35 return "CompactionIteratorTest NoMergingMergeOp";
36 }
37 };
38
39 // Compaction filter that gets stuck when it sees a particular key,
40 // then gets unstuck when told to.
41 // Always returns Decition::kRemove.
42 class StallingFilter : public CompactionFilter {
43 public:
44 Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
45 const Slice& /*existing_value*/, std::string* /*new_value*/,
46 std::string* /*skip_until*/) const override {
47 int k = std::atoi(key.ToString().c_str());
48 last_seen.store(k);
49 while (k >= stall_at.load()) {
50 std::this_thread::yield();
51 }
52 return Decision::kRemove;
53 }
54
55 const char* Name() const override {
56 return "CompactionIteratorTest StallingFilter";
57 }
58
59 // Wait until the filter sees a key >= k and stalls at that key.
60 // If `exact`, asserts that the seen key is equal to k.
61 void WaitForStall(int k, bool exact = true) {
62 stall_at.store(k);
63 while (last_seen.load() < k) {
64 std::this_thread::yield();
65 }
66 if (exact) {
67 EXPECT_EQ(k, last_seen.load());
68 }
69 }
70
71 // Filter will stall on key >= stall_at. Advance stall_at to unstall.
72 mutable std::atomic<int> stall_at{0};
73 // Last key the filter was called with.
74 mutable std::atomic<int> last_seen{0};
75 };
76
77 // Compaction filter that filter out all keys.
78 class FilterAllKeysCompactionFilter : public CompactionFilter {
79 public:
80 Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
81 const Slice& /*existing_value*/, std::string* /*new_value*/,
82 std::string* /*skip_until*/) const override {
83 return Decision::kRemove;
84 }
85
86 const char* Name() const override { return "AllKeysCompactionFilter"; }
87 };
88
89 class LoggingForwardVectorIterator : public InternalIterator {
90 public:
91 struct Action {
92 enum class Type {
93 SEEK_TO_FIRST,
94 SEEK,
95 NEXT,
96 };
97
98 Type type;
99 std::string arg;
100
101 explicit Action(Type _type, std::string _arg = "")
102 : type(_type), arg(_arg) {}
103
104 bool operator==(const Action& rhs) const {
105 return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
106 }
107 };
108
109 LoggingForwardVectorIterator(const std::vector<std::string>& keys,
110 const std::vector<std::string>& values)
111 : keys_(keys), values_(values), current_(keys.size()) {
112 assert(keys_.size() == values_.size());
113 }
114
115 bool Valid() const override { return current_ < keys_.size(); }
116
117 void SeekToFirst() override {
118 log.emplace_back(Action::Type::SEEK_TO_FIRST);
119 current_ = 0;
120 }
121 void SeekToLast() override { assert(false); }
122
123 void Seek(const Slice& target) override {
124 log.emplace_back(Action::Type::SEEK, target.ToString());
125 current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
126 keys_.begin();
127 }
128
129 void SeekForPrev(const Slice& /*target*/) override { assert(false); }
130
131 void Next() override {
132 assert(Valid());
133 log.emplace_back(Action::Type::NEXT);
134 current_++;
135 }
136 void Prev() override { assert(false); }
137
138 Slice key() const override {
139 assert(Valid());
140 return Slice(keys_[current_]);
141 }
142 Slice value() const override {
143 assert(Valid());
144 return Slice(values_[current_]);
145 }
146
147 Status status() const override { return Status::OK(); }
148
149 std::vector<Action> log;
150
151 private:
152 std::vector<std::string> keys_;
153 std::vector<std::string> values_;
154 size_t current_;
155 };
156
157 class FakeCompaction : public CompactionIterator::CompactionProxy {
158 public:
159 FakeCompaction() = default;
160
161 int level(size_t /*compaction_input_level*/) const override { return 0; }
162 bool KeyNotExistsBeyondOutputLevel(
163 const Slice& /*user_key*/,
164 std::vector<size_t>* /*level_ptrs*/) const override {
165 return is_bottommost_level || key_not_exists_beyond_output_level;
166 }
167 bool bottommost_level() const override { return is_bottommost_level; }
168 int number_levels() const override { return 1; }
169 Slice GetLargestUserKey() const override {
170 return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
171 }
172 bool allow_ingest_behind() const override { return false; }
173
174 bool preserve_deletes() const override { return false; }
175
176 bool key_not_exists_beyond_output_level = false;
177
178 bool is_bottommost_level = false;
179 };
180
181 // A simplifed snapshot checker which assumes each snapshot has a global
182 // last visible sequence.
183 class TestSnapshotChecker : public SnapshotChecker {
184 public:
185 explicit TestSnapshotChecker(
186 SequenceNumber last_committed_sequence,
187 const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {})
188 : last_committed_sequence_(last_committed_sequence),
189 snapshots_(snapshots) {}
190
191 SnapshotCheckerResult CheckInSnapshot(
192 SequenceNumber seq, SequenceNumber snapshot_seq) const override {
193 if (snapshot_seq == kMaxSequenceNumber) {
194 return seq <= last_committed_sequence_
195 ? SnapshotCheckerResult::kInSnapshot
196 : SnapshotCheckerResult::kNotInSnapshot;
197 }
198 assert(snapshots_.count(snapshot_seq) > 0);
199 return seq <= snapshots_.at(snapshot_seq)
200 ? SnapshotCheckerResult::kInSnapshot
201 : SnapshotCheckerResult::kNotInSnapshot;
202 }
203
204 private:
205 SequenceNumber last_committed_sequence_;
206 // A map of valid snapshot to last visible sequence to the snapshot.
207 std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
208 };
209
210 // Test param:
211 // bool: whether to pass snapshot_checker to compaction iterator.
212 class CompactionIteratorTest : public testing::TestWithParam<bool> {
213 public:
214 CompactionIteratorTest()
215 : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
216
217 void InitIterators(
218 const std::vector<std::string>& ks, const std::vector<std::string>& vs,
219 const std::vector<std::string>& range_del_ks,
220 const std::vector<std::string>& range_del_vs,
221 SequenceNumber last_sequence,
222 SequenceNumber last_committed_sequence = kMaxSequenceNumber,
223 MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
224 bool bottommost_level = false,
225 SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
226 std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
227 new test::VectorIterator(range_del_ks, range_del_vs));
228 auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
229 std::move(unfragmented_range_del_iter), icmp_);
230 std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
231 new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
232 kMaxSequenceNumber));
233 range_del_agg_.reset(new CompactionRangeDelAggregator(&icmp_, snapshots_));
234 range_del_agg_->AddTombstones(std::move(range_del_iter));
235
236 std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
237 if (filter || bottommost_level) {
238 compaction_proxy_ = new FakeCompaction();
239 compaction_proxy_->is_bottommost_level = bottommost_level;
240 compaction.reset(compaction_proxy_);
241 }
242 bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
243 if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
244 snapshot_checker_.reset(
245 new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
246 }
247 merge_helper_.reset(
248 new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
249 0 /*latest_snapshot*/, snapshot_checker_.get(),
250 0 /*level*/, nullptr /*statistics*/, &shutting_down_));
251
252 iter_.reset(new LoggingForwardVectorIterator(ks, vs));
253 iter_->SeekToFirst();
254 c_iter_.reset(new CompactionIterator(
255 iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
256 earliest_write_conflict_snapshot, snapshot_checker_.get(),
257 Env::Default(), false /* report_detailed_time */, false,
258 range_del_agg_.get(), std::move(compaction), filter, &shutting_down_));
259 }
260
261 void AddSnapshot(SequenceNumber snapshot,
262 SequenceNumber last_visible_seq = kMaxSequenceNumber) {
263 snapshots_.push_back(snapshot);
264 snapshot_map_[snapshot] = last_visible_seq;
265 }
266
267 virtual bool UseSnapshotChecker() const { return false; }
268
269 void RunTest(
270 const std::vector<std::string>& input_keys,
271 const std::vector<std::string>& input_values,
272 const std::vector<std::string>& expected_keys,
273 const std::vector<std::string>& expected_values,
274 SequenceNumber last_committed_seq = kMaxSequenceNumber,
275 MergeOperator* merge_operator = nullptr,
276 CompactionFilter* compaction_filter = nullptr,
277 bool bottommost_level = false,
278 SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
279 InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
280 last_committed_seq, merge_operator, compaction_filter,
281 bottommost_level, earliest_write_conflict_snapshot);
282 c_iter_->SeekToFirst();
283 for (size_t i = 0; i < expected_keys.size(); i++) {
284 std::string info = "i = " + ToString(i);
285 ASSERT_TRUE(c_iter_->Valid()) << info;
286 ASSERT_OK(c_iter_->status()) << info;
287 ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
288 ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
289 c_iter_->Next();
290 }
291 ASSERT_FALSE(c_iter_->Valid());
292 }
293
294 const Comparator* cmp_;
295 const InternalKeyComparator icmp_;
296 std::vector<SequenceNumber> snapshots_;
297 // A map of valid snapshot to last visible sequence to the snapshot.
298 std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
299 std::unique_ptr<MergeHelper> merge_helper_;
300 std::unique_ptr<LoggingForwardVectorIterator> iter_;
301 std::unique_ptr<CompactionIterator> c_iter_;
302 std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
303 std::unique_ptr<SnapshotChecker> snapshot_checker_;
304 std::atomic<bool> shutting_down_{false};
305 FakeCompaction* compaction_proxy_;
306 };
307
308 // It is possible that the output of the compaction iterator is empty even if
309 // the input is not.
310 TEST_P(CompactionIteratorTest, EmptyResult) {
311 InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
312 test::KeyStr("a", 3, kTypeValue)},
313 {"", "val"}, {}, {}, 5);
314 c_iter_->SeekToFirst();
315 ASSERT_FALSE(c_iter_->Valid());
316 }
317
318 // If there is a corruption after a single deletion, the corrupted key should
319 // be preserved.
320 TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
321 InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
322 test::KeyStr("a", 3, kTypeValue, true),
323 test::KeyStr("b", 10, kTypeValue)},
324 {"", "val", "val2"}, {}, {}, 10);
325 c_iter_->SeekToFirst();
326 ASSERT_TRUE(c_iter_->Valid());
327 ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
328 c_iter_->key().ToString());
329 c_iter_->Next();
330 ASSERT_TRUE(c_iter_->Valid());
331 ASSERT_EQ(test::KeyStr("a", 3, kTypeValue, true), c_iter_->key().ToString());
332 c_iter_->Next();
333 ASSERT_TRUE(c_iter_->Valid());
334 ASSERT_EQ(test::KeyStr("b", 10, kTypeValue), c_iter_->key().ToString());
335 c_iter_->Next();
336 ASSERT_FALSE(c_iter_->Valid());
337 }
338
339 TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
340 InitIterators({test::KeyStr("morning", 5, kTypeValue),
341 test::KeyStr("morning", 2, kTypeValue),
342 test::KeyStr("night", 3, kTypeValue)},
343 {"zao", "zao", "wan"},
344 {test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5);
345 c_iter_->SeekToFirst();
346 ASSERT_TRUE(c_iter_->Valid());
347 ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
348 c_iter_->Next();
349 ASSERT_TRUE(c_iter_->Valid());
350 ASSERT_EQ(test::KeyStr("night", 3, kTypeValue), c_iter_->key().ToString());
351 c_iter_->Next();
352 ASSERT_FALSE(c_iter_->Valid());
353 }
354
355 TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
356 AddSnapshot(10);
357 std::vector<std::string> ks1;
358 ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion));
359 std::vector<std::string> vs1{"mz"};
360 std::vector<std::string> ks2{test::KeyStr("morning", 15, kTypeValue),
361 test::KeyStr("morning", 5, kTypeValue),
362 test::KeyStr("night", 40, kTypeValue),
363 test::KeyStr("night", 20, kTypeValue)};
364 std::vector<std::string> vs2{"zao 15", "zao 5", "wan 40", "wan 20"};
365 InitIterators(ks2, vs2, ks1, vs1, 40);
366 c_iter_->SeekToFirst();
367 ASSERT_TRUE(c_iter_->Valid());
368 ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
369 c_iter_->Next();
370 ASSERT_TRUE(c_iter_->Valid());
371 ASSERT_EQ(test::KeyStr("night", 40, kTypeValue), c_iter_->key().ToString());
372 c_iter_->Next();
373 ASSERT_FALSE(c_iter_->Valid());
374 }
375
376 TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
377 class Filter : public CompactionFilter {
378 Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
379 const Slice& existing_value, std::string* /*new_value*/,
380 std::string* skip_until) const override {
381 std::string k = key.ToString();
382 std::string v = existing_value.ToString();
383 // See InitIterators() call below for the sequence of keys and their
384 // filtering decisions. Here we closely assert that compaction filter is
385 // called with the expected keys and only them, and with the right values.
386 if (k == "a") {
387 EXPECT_EQ(ValueType::kValue, t);
388 EXPECT_EQ("av50", v);
389 return Decision::kKeep;
390 }
391 if (k == "b") {
392 EXPECT_EQ(ValueType::kValue, t);
393 EXPECT_EQ("bv60", v);
394 *skip_until = "d+";
395 return Decision::kRemoveAndSkipUntil;
396 }
397 if (k == "e") {
398 EXPECT_EQ(ValueType::kMergeOperand, t);
399 EXPECT_EQ("em71", v);
400 return Decision::kKeep;
401 }
402 if (k == "f") {
403 if (v == "fm65") {
404 EXPECT_EQ(ValueType::kMergeOperand, t);
405 *skip_until = "f";
406 } else {
407 EXPECT_EQ("fm30", v);
408 EXPECT_EQ(ValueType::kMergeOperand, t);
409 *skip_until = "g+";
410 }
411 return Decision::kRemoveAndSkipUntil;
412 }
413 if (k == "h") {
414 EXPECT_EQ(ValueType::kValue, t);
415 EXPECT_EQ("hv91", v);
416 return Decision::kKeep;
417 }
418 if (k == "i") {
419 EXPECT_EQ(ValueType::kMergeOperand, t);
420 EXPECT_EQ("im95", v);
421 *skip_until = "z";
422 return Decision::kRemoveAndSkipUntil;
423 }
424 ADD_FAILURE();
425 return Decision::kKeep;
426 }
427
428 const char* Name() const override {
429 return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
430 }
431 };
432
433 NoMergingMergeOp merge_op;
434 Filter filter;
435 InitIterators(
436 {test::KeyStr("a", 50, kTypeValue), // keep
437 test::KeyStr("a", 45, kTypeMerge),
438 test::KeyStr("b", 60, kTypeValue), // skip to "d+"
439 test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue),
440 test::KeyStr("d", 70, kTypeMerge),
441 test::KeyStr("e", 71, kTypeMerge), // keep
442 test::KeyStr("f", 65, kTypeMerge), // skip to "f", aka keep
443 test::KeyStr("f", 30, kTypeMerge), // skip to "g+"
444 test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
445 test::KeyStr("h", 91, kTypeValue), // keep
446 test::KeyStr("i", 95, kTypeMerge), // skip to "z"
447 test::KeyStr("j", 99, kTypeValue)},
448 {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
449 "fv25", "gv90", "hv91", "im95", "jv99"},
450 {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
451
452 // Compaction should output just "a", "e" and "h" keys.
453 c_iter_->SeekToFirst();
454 ASSERT_TRUE(c_iter_->Valid());
455 ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
456 ASSERT_EQ("av50", c_iter_->value().ToString());
457 c_iter_->Next();
458 ASSERT_TRUE(c_iter_->Valid());
459 ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
460 ASSERT_EQ("em71", c_iter_->value().ToString());
461 c_iter_->Next();
462 ASSERT_TRUE(c_iter_->Valid());
463 ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
464 ASSERT_EQ("hv91", c_iter_->value().ToString());
465 c_iter_->Next();
466 ASSERT_FALSE(c_iter_->Valid());
467
468 // Check that the compaction iterator did the correct sequence of calls on
469 // the underlying iterator.
470 using A = LoggingForwardVectorIterator::Action;
471 using T = A::Type;
472 std::vector<A> expected_actions = {
473 A(T::SEEK_TO_FIRST),
474 A(T::NEXT),
475 A(T::NEXT),
476 A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)),
477 A(T::NEXT),
478 A(T::NEXT),
479 A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)),
480 A(T::NEXT),
481 A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))};
482 ASSERT_EQ(expected_actions, iter_->log);
483 }
484
485 TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
486 NoMergingMergeOp merge_op;
487 StallingFilter filter;
488 InitIterators(
489 {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
490 test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
491 {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
492 &merge_op, &filter);
493 // Don't leave tombstones (kTypeDeletion) for filtered keys.
494 compaction_proxy_->key_not_exists_beyond_output_level = true;
495
496 std::atomic<bool> seek_done{false};
497 rocksdb::port::Thread compaction_thread([&] {
498 c_iter_->SeekToFirst();
499 EXPECT_FALSE(c_iter_->Valid());
500 EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
501 seek_done.store(true);
502 });
503
504 // Let key 1 through.
505 filter.WaitForStall(1);
506
507 // Shutdown during compaction filter call for key 2.
508 filter.WaitForStall(2);
509 shutting_down_.store(true);
510 EXPECT_FALSE(seek_done.load());
511
512 // Unstall filter and wait for SeekToFirst() to return.
513 filter.stall_at.store(3);
514 compaction_thread.join();
515 assert(seek_done.load());
516
517 // Check that filter was never called again.
518 EXPECT_EQ(2, filter.last_seen.load());
519 }
520
521 // Same as ShuttingDownInFilter, but shutdown happens during filter call for
522 // a merge operand, not for a value.
523 TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
524 NoMergingMergeOp merge_op;
525 StallingFilter filter;
526 InitIterators(
527 {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
528 test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
529 {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
530 &merge_op, &filter);
531 compaction_proxy_->key_not_exists_beyond_output_level = true;
532
533 std::atomic<bool> seek_done{false};
534 rocksdb::port::Thread compaction_thread([&] {
535 c_iter_->SeekToFirst();
536 ASSERT_FALSE(c_iter_->Valid());
537 ASSERT_TRUE(c_iter_->status().IsShutdownInProgress());
538 seek_done.store(true);
539 });
540
541 // Let key 1 through.
542 filter.WaitForStall(1);
543
544 // Shutdown during compaction filter call for key 2.
545 filter.WaitForStall(2);
546 shutting_down_.store(true);
547 EXPECT_FALSE(seek_done.load());
548
549 // Unstall filter and wait for SeekToFirst() to return.
550 filter.stall_at.store(3);
551 compaction_thread.join();
552 assert(seek_done.load());
553
554 // Check that filter was never called again.
555 EXPECT_EQ(2, filter.last_seen.load());
556 }
557
558 TEST_P(CompactionIteratorTest, SingleMergeOperand) {
559 class Filter : public CompactionFilter {
560 Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
561 const Slice& existing_value, std::string* /*new_value*/,
562 std::string* /*skip_until*/) const override {
563 std::string k = key.ToString();
564 std::string v = existing_value.ToString();
565
566 // See InitIterators() call below for the sequence of keys and their
567 // filtering decisions. Here we closely assert that compaction filter is
568 // called with the expected keys and only them, and with the right values.
569 if (k == "a") {
570 EXPECT_EQ(ValueType::kMergeOperand, t);
571 EXPECT_EQ("av1", v);
572 return Decision::kKeep;
573 } else if (k == "b") {
574 EXPECT_EQ(ValueType::kMergeOperand, t);
575 return Decision::kKeep;
576 } else if (k == "c") {
577 return Decision::kKeep;
578 }
579
580 ADD_FAILURE();
581 return Decision::kKeep;
582 }
583
584 const char* Name() const override {
585 return "CompactionIteratorTest.SingleMergeOperand::Filter";
586 }
587 };
588
589 class SingleMergeOp : public MergeOperator {
590 public:
591 bool FullMergeV2(const MergeOperationInput& merge_in,
592 MergeOperationOutput* merge_out) const override {
593 // See InitIterators() call below for why "c" is the only key for which
594 // FullMergeV2 should be called.
595 EXPECT_EQ("c", merge_in.key.ToString());
596
597 std::string temp_value;
598 if (merge_in.existing_value != nullptr) {
599 temp_value = merge_in.existing_value->ToString();
600 }
601
602 for (auto& operand : merge_in.operand_list) {
603 temp_value.append(operand.ToString());
604 }
605 merge_out->new_value = temp_value;
606
607 return true;
608 }
609
610 bool PartialMergeMulti(const Slice& key,
611 const std::deque<Slice>& operand_list,
612 std::string* new_value,
613 Logger* /*logger*/) const override {
614 std::string string_key = key.ToString();
615 EXPECT_TRUE(string_key == "a" || string_key == "b");
616
617 if (string_key == "a") {
618 EXPECT_EQ(1, operand_list.size());
619 } else if (string_key == "b") {
620 EXPECT_EQ(2, operand_list.size());
621 }
622
623 std::string temp_value;
624 for (auto& operand : operand_list) {
625 temp_value.append(operand.ToString());
626 }
627 swap(temp_value, *new_value);
628
629 return true;
630 }
631
632 const char* Name() const override {
633 return "CompactionIteratorTest SingleMergeOp";
634 }
635
636 bool AllowSingleOperand() const override { return true; }
637 };
638
639 SingleMergeOp merge_op;
640 Filter filter;
641 InitIterators(
642 // a should invoke PartialMergeMulti with a single merge operand.
643 {test::KeyStr("a", 50, kTypeMerge),
644 // b should invoke PartialMergeMulti with two operands.
645 test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
646 // c should invoke FullMerge due to kTypeValue at the beginning.
647 test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
648 {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
649 kMaxSequenceNumber, &merge_op, &filter);
650
651 c_iter_->SeekToFirst();
652 ASSERT_TRUE(c_iter_->Valid());
653 ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
654 ASSERT_EQ("av1", c_iter_->value().ToString());
655 c_iter_->Next();
656 ASSERT_TRUE(c_iter_->Valid());
657 ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
658 c_iter_->Next();
659 ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
660 }
661
662 // In bottommost level, values earlier than earliest snapshot can be output
663 // with sequence = 0.
664 TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
665 AddSnapshot(1);
666 RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
667 {"v1", "v2"},
668 {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
669 {"v1", "v2"}, kMaxSequenceNumber /*last_commited_seq*/,
670 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
671 true /*bottommost_level*/);
672 }
673
674 // In bottommost level, deletions earlier than earliest snapshot can be removed
675 // permanently.
676 TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
677 AddSnapshot(1);
678 RunTest({test::KeyStr("a", 1, kTypeDeletion),
679 test::KeyStr("b", 3, kTypeDeletion),
680 test::KeyStr("b", 1, kTypeValue)},
681 {"", "", ""},
682 {test::KeyStr("b", 3, kTypeDeletion),
683 test::KeyStr("b", 0, kTypeValue)},
684 {"", ""},
685 kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
686 nullptr /*compaction_filter*/, true /*bottommost_level*/);
687 }
688
689 // In bottommost level, single deletions earlier than earliest snapshot can be
690 // removed permanently.
691 TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
692 AddSnapshot(1);
693 RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
694 test::KeyStr("b", 2, kTypeSingleDeletion)},
695 {"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
696 kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
697 nullptr /*compaction_filter*/, true /*bottommost_level*/);
698 }
699
700 INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
701 testing::Values(true, false));
702
703 // Tests how CompactionIterator work together with SnapshotChecker.
704 class CompactionIteratorWithSnapshotCheckerTest
705 : public CompactionIteratorTest {
706 public:
707 bool UseSnapshotChecker() const override { return true; }
708 };
709
// Uncommitted keys (keys with seq > last_committed_seq) should be output as-is,
// while committed versions of these keys should be compacted as usual.
712
713 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
714 PreserveUncommittedKeys_Value) {
715 RunTest(
716 {test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
717 test::KeyStr("foo", 1, kTypeValue)},
718 {"v3", "v2", "v1"},
719 {test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)},
720 {"v3", "v2"}, 2 /*last_committed_seq*/);
721 }
722
723 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
724 PreserveUncommittedKeys_Deletion) {
725 RunTest({test::KeyStr("foo", 2, kTypeDeletion),
726 test::KeyStr("foo", 1, kTypeValue)},
727 {"", "v1"},
728 {test::KeyStr("foo", 2, kTypeDeletion),
729 test::KeyStr("foo", 1, kTypeValue)},
730 {"", "v1"}, 1 /*last_committed_seq*/);
731 }
732
733 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
734 PreserveUncommittedKeys_Merge) {
735 auto merge_op = MergeOperators::CreateStringAppendOperator();
736 RunTest(
737 {test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
738 test::KeyStr("foo", 1, kTypeValue)},
739 {"v3", "v2", "v1"},
740 {test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)},
741 {"v3", "v1,v2"}, 2 /*last_committed_seq*/, merge_op.get());
742 }
743
744 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
745 PreserveUncommittedKeys_SingleDelete) {
746 RunTest({test::KeyStr("foo", 2, kTypeSingleDeletion),
747 test::KeyStr("foo", 1, kTypeValue)},
748 {"", "v1"},
749 {test::KeyStr("foo", 2, kTypeSingleDeletion),
750 test::KeyStr("foo", 1, kTypeValue)},
751 {"", "v1"}, 1 /*last_committed_seq*/);
752 }
753
754 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
755 PreserveUncommittedKeys_BlobIndex) {
756 RunTest({test::KeyStr("foo", 3, kTypeBlobIndex),
757 test::KeyStr("foo", 2, kTypeBlobIndex),
758 test::KeyStr("foo", 1, kTypeBlobIndex)},
759 {"v3", "v2", "v1"},
760 {test::KeyStr("foo", 3, kTypeBlobIndex),
761 test::KeyStr("foo", 2, kTypeBlobIndex)},
762 {"v3", "v2"}, 2 /*last_committed_seq*/);
763 }
764
// Tests that the compaction iterator dedups keys visible to the same snapshot.
766
767 TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
768 AddSnapshot(2, 1);
769 RunTest(
770 {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
771 test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
772 {"v4", "v3", "v2", "v1"},
773 {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
774 test::KeyStr("foo", 1, kTypeValue)},
775 {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
776 }
777
778 TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
779 AddSnapshot(2, 1);
780 RunTest(
781 {test::KeyStr("foo", 4, kTypeValue),
782 test::KeyStr("foo", 3, kTypeDeletion),
783 test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
784 {"v4", "", "v2", "v1"},
785 {test::KeyStr("foo", 4, kTypeValue),
786 test::KeyStr("foo", 3, kTypeDeletion),
787 test::KeyStr("foo", 1, kTypeValue)},
788 {"v4", "", "v1"}, 3 /*last_committed_seq*/);
789 }
790
791 TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
792 AddSnapshot(2, 1);
793 AddSnapshot(4, 3);
794 auto merge_op = MergeOperators::CreateStringAppendOperator();
795 RunTest(
796 {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
797 test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
798 test::KeyStr("foo", 1, kTypeValue)},
799 {"v5", "v4", "v3", "v2", "v1"},
800 {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
801 test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)},
802 {"v5", "v4", "v2,v3", "v1"}, 4 /*last_committed_seq*/, merge_op.get());
803 }
804
805 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
806 DedupSameSnapshot_SingleDeletion) {
807 AddSnapshot(2, 1);
808 RunTest(
809 {test::KeyStr("foo", 4, kTypeValue),
810 test::KeyStr("foo", 3, kTypeSingleDeletion),
811 test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
812 {"v4", "", "v2", "v1"},
813 {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
814 {"v4", "v1"}, 3 /*last_committed_seq*/);
815 }
816
817 TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
818 AddSnapshot(2, 1);
819 RunTest({test::KeyStr("foo", 4, kTypeBlobIndex),
820 test::KeyStr("foo", 3, kTypeBlobIndex),
821 test::KeyStr("foo", 2, kTypeBlobIndex),
822 test::KeyStr("foo", 1, kTypeBlobIndex)},
823 {"v4", "v3", "v2", "v1"},
824 {test::KeyStr("foo", 4, kTypeBlobIndex),
825 test::KeyStr("foo", 3, kTypeBlobIndex),
826 test::KeyStr("foo", 1, kTypeBlobIndex)},
827 {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
828 }
829
// At the bottom level, sequence numbers can be zeroed out and deletions can be
// removed, but only when they are visible to the earliest snapshot.
832
833 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
834 NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
835 AddSnapshot(2, 1);
836 RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
837 test::KeyStr("c", 3, kTypeValue)},
838 {"v1", "v2", "v3"},
839 {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
840 test::KeyStr("c", 3, kTypeValue)},
841 {"v1", "v2", "v3"}, kMaxSequenceNumber /*last_commited_seq*/,
842 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
843 true /*bottommost_level*/);
844 }
845
846 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
847 NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
848 AddSnapshot(2, 1);
849 RunTest(
850 {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
851 test::KeyStr("c", 3, kTypeDeletion)},
852 {"", "", ""},
853 {},
854 {"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
855 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
856 true /*bottommost_level*/);
857 }
858
859 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
860 NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
861 AddSnapshot(2,1);
862 RunTest(
863 {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue),
864 test::KeyStr("b", 3, kTypeValue)},
865 {"", "", ""},
866 {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue),
867 test::KeyStr("b", 3, kTypeValue)},
868 {"", "", ""}, kMaxSequenceNumber /*last_commited_seq*/,
869 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
870 true /*bottommost_level*/);
871 }
872
873 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
874 NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
875 AddSnapshot(2, 1);
876 RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
877 test::KeyStr("b", 2, kTypeSingleDeletion),
878 test::KeyStr("c", 3, kTypeSingleDeletion)},
879 {"", "", ""},
880 {test::KeyStr("b", 2, kTypeSingleDeletion),
881 test::KeyStr("c", 3, kTypeSingleDeletion)},
882 {"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
883 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
884 true /*bottommost_level*/);
885 }
886
// A single delete should not cancel out values that are not visible to the
// same set of snapshots.
889 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
890 SingleDeleteAcrossSnapshotBoundary) {
891 AddSnapshot(2, 1);
892 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
893 test::KeyStr("a", 1, kTypeValue)},
894 {"", "v1"},
895 {test::KeyStr("a", 2, kTypeSingleDeletion),
896 test::KeyStr("a", 1, kTypeValue)},
897 {"", "v1"}, 2 /*last_committed_seq*/);
898 }
899
// A single delete should be kept if it is not visible to the earliest
// write-conflict snapshot. When a single delete is kept for this reason, the
// corresponding value can be trimmed to save space.
903 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
904 KeepSingleDeletionForWriteConflictChecking) {
905 AddSnapshot(2, 0);
906 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
907 test::KeyStr("a", 1, kTypeValue)},
908 {"", "v1"},
909 {test::KeyStr("a", 2, kTypeSingleDeletion),
910 test::KeyStr("a", 1, kTypeValue)},
911 {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
912 nullptr /*compaction_filter*/, false /*bottommost_level*/,
913 2 /*earliest_write_conflict_snapshot*/);
914 }
915
// The compaction filter should keep uncommitted keys as-is, and
// * convert the latest committed value to a deletion, and/or
// * if the latest value is a merge, apply the filter to all subsequent merges.
919
920 TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
921 std::unique_ptr<CompactionFilter> compaction_filter(
922 new FilterAllKeysCompactionFilter());
923 RunTest(
924 {test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
925 test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)},
926 {"v2", "v1", "v3", "v4"},
927 {test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
928 test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)},
929 {"v2", "", "v3", ""}, 1 /*last_committed_seq*/,
930 nullptr /*merge_operator*/, compaction_filter.get());
931 }
932
933 TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
934 std::unique_ptr<CompactionFilter> compaction_filter(
935 new FilterAllKeysCompactionFilter());
936 RunTest(
937 {test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)},
938 {"", "v1"},
939 {test::KeyStr("a", 2, kTypeDeletion),
940 test::KeyStr("a", 1, kTypeDeletion)},
941 {"", ""}, 1 /*last_committed_seq*/, nullptr /*merge_operator*/,
942 compaction_filter.get());
943 }
944
945 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
946 CompactionFilter_PartialMerge) {
947 std::shared_ptr<MergeOperator> merge_op =
948 MergeOperators::CreateStringAppendOperator();
949 std::unique_ptr<CompactionFilter> compaction_filter(
950 new FilterAllKeysCompactionFilter());
951 RunTest({test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
952 test::KeyStr("a", 1, kTypeMerge)},
953 {"v3", "v2", "v1"}, {test::KeyStr("a", 3, kTypeMerge)}, {"v3"},
954 2 /*last_committed_seq*/, merge_op.get(), compaction_filter.get());
955 }
956
957 TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
958 std::shared_ptr<MergeOperator> merge_op =
959 MergeOperators::CreateStringAppendOperator();
960 std::unique_ptr<CompactionFilter> compaction_filter(
961 new FilterAllKeysCompactionFilter());
962 RunTest(
963 {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
964 test::KeyStr("a", 1, kTypeValue)},
965 {"v3", "v2", "v1"},
966 {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)},
967 {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
968 compaction_filter.get());
969 }
970
971 } // namespace rocksdb
972
973 int main(int argc, char** argv) {
974 ::testing::InitGoogleTest(&argc, argv);
975 return RUN_ALL_TESTS();
976 }