1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/compaction_iterator.h"
11 #include "port/port.h"
12 #include "util/string_util.h"
13 #include "util/testharness.h"
14 #include "util/testutil.h"
15 #include "utilities/merge_operators.h"
19 // Expects no merging attempts.
// MergeOperator used by tests that pass a merge operator but do not expect
// any merge to be carried out. Both merge hooks below leave every parameter
// unnamed, i.e. none of their inputs is consulted.
20 class NoMergingMergeOp
: public MergeOperator
{
// Full-merge hook. All arguments are ignored.
22 bool FullMergeV2(const MergeOperationInput
& /*merge_in*/,
23 MergeOperationOutput
* /*merge_out*/) const override
{
// Partial-merge hook for a list of operands. All arguments are ignored.
27 bool PartialMergeMulti(const Slice
& /*key*/,
28 const std::deque
<Slice
>& /*operand_list*/,
29 std::string
* /*new_value*/,
30 Logger
* /*logger*/) const override
{
// Human-readable identifier of this operator.
34 const char* Name() const override
{
35 return "CompactionIteratorTest NoMergingMergeOp";
39 // Compaction filter that gets stuck when it sees a particular key,
40 // then gets unstuck when told to.
41 // Always returns Decision::kRemove.
// CompactionFilter whose FilterV2() can be held up from another thread.
// Keys are interpreted as decimal integers; a call blocks (spinning with
// std::this_thread::yield) while its key is >= stall_at.
42 class StallingFilter
: public CompactionFilter
{
// Filter callback. Only `key` is read; level, type, existing_value,
// new_value and skip_until are unused. Returns Decision::kRemove once the
// stall loop has been released.
44 Decision
FilterV2(int /*level*/, const Slice
& key
, ValueType
/*type*/,
45 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
46 std::string
* /*skip_until*/) const override
{
// std::atoi yields 0 when the key does not start with a number.
47 int k
= std::atoi(key
.ToString().c_str());
// Busy-wait, yielding the thread, until stall_at is raised above k.
49 while (k
>= stall_at
.load()) {
50 std::this_thread::yield();
52 return Decision::kRemove
;
// Human-readable identifier of this filter.
55 const char* Name() const override
{
56 return "CompactionIteratorTest StallingFilter";
59 // Wait until the filter sees a key >= k and stalls at that key.
60 // If `exact`, asserts that the seen key is equal to k.
61 void WaitForStall(int k
, bool exact
= true) {
// Spin until last_seen has reached at least k.
63 while (last_seen
.load() < k
) {
64 std::this_thread::yield();
// Non-fatal gtest check that the observed key is exactly k.
67 EXPECT_EQ(k
, last_seen
.load());
// Both counters are atomics because they are accessed from the test thread
// and from the thread running the filter. They are declared mutable so that
// const member functions such as FilterV2() may modify them.
71 // Filter will stall on key >= stall_at. Advance stall_at to unstall.
72 mutable std::atomic
<int> stall_at
{0};
73 // Last key the filter was called with.
74 mutable std::atomic
<int> last_seen
{0};
77 // Compaction filter that filters out all keys.
// CompactionFilter that unconditionally asks for removal of whatever it is
// called with.
78 class FilterAllKeysCompactionFilter
: public CompactionFilter
{
// Ignores every argument and always returns Decision::kRemove.
80 Decision
FilterV2(int /*level*/, const Slice
& /*key*/, ValueType
/*type*/,
81 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
82 std::string
* /*skip_until*/) const override
{
83 return Decision::kRemove
;
// Human-readable identifier of this filter.
86 const char* Name() const override
{ return "AllKeysCompactionFilter"; }
// InternalIterator over two parallel vectors of strings (keys and values)
// that appends an Action to `log` for the positioning calls it receives, so a
// test can compare the exact sequence of calls made on it.
// Only forward operations are supported: SeekToLast(), SeekForPrev() and
// Prev() are assert(false).
89 class LoggingForwardVectorIterator
: public InternalIterator
{
// One logged operation: its type and an optional string argument
// (defaults to the empty string).
101 explicit Action(Type _type
, std::string _arg
= "")
102 : type(_type
), arg(_arg
) {}
// Two actions are equal when both type and arg are equal.
104 bool operator==(const Action
& rhs
) const {
105 return std::tie(type
, arg
) == std::tie(rhs
.type
, rhs
.arg
);
// Copies `keys` and `values`, which must have the same length. current_
// starts at keys.size(), so the iterator is not Valid() until positioned.
109 LoggingForwardVectorIterator(const std::vector
<std::string
>& keys
,
110 const std::vector
<std::string
>& values
)
111 : keys_(keys
), values_(values
), current_(keys
.size()) {
112 assert(keys_
.size() == values_
.size());
// Valid while current_ indexes an existing element.
115 bool Valid() const override
{ return current_
< keys_
.size(); }
// Records a SEEK_TO_FIRST action.
117 void SeekToFirst() override
{
118 log
.emplace_back(Action::Type::SEEK_TO_FIRST
);
// Unsupported.
121 void SeekToLast() override
{ assert(false); }
// Records a SEEK action carrying the target, then repositions using
// std::lower_bound over keys_. The search compares std::string values with
// their default ordering, so keys_ must be sorted in that order.
123 void Seek(const Slice
& target
) override
{
124 log
.emplace_back(Action::Type::SEEK
, target
.ToString());
125 current_
= std::lower_bound(keys_
.begin(), keys_
.end(), target
.ToString()) -
// Unsupported.
129 void SeekForPrev(const Slice
& /*target*/) override
{ assert(false); }
// Records a NEXT action.
131 void Next() override
{
133 log
.emplace_back(Action::Type::NEXT
);
// Unsupported.
136 void Prev() override
{ assert(false); }
// Key at the current position, as a Slice referring to the stored string.
138 Slice
key() const override
{
140 return Slice(keys_
[current_
]);
// Value at the current position, as a Slice referring to the stored string.
142 Slice
value() const override
{
144 return Slice(values_
[current_
]);
// This iterator never reports an error.
147 Status
status() const override
{ return Status::OK(); }
// Sequence of operations recorded so far, in call order.
149 std::vector
<Action
> log
;
// Backing storage; keys_[i] pairs with values_[i].
152 std::vector
<std::string
> keys_
;
153 std::vector
<std::string
> values_
;
// CompactionProxy stand-in that returns fixed answers, except for the two
// bool fields at the bottom which tests can set to steer the iterator.
157 class FakeCompaction
: public CompactionIterator::CompactionProxy
{
159 FakeCompaction() = default;
// Every input level is reported as level 0.
161 int level(size_t /*compaction_input_level*/) const override
{ return 0; }
// Ignores the key and level pointers; true when either flag field is set.
162 bool KeyNotExistsBeyondOutputLevel(
163 const Slice
& /*user_key*/,
164 std::vector
<size_t>* /*level_ptrs*/) const override
{
165 return is_bottommost_level
|| key_not_exists_beyond_output_level
;
// Mirrors the is_bottommost_level field.
167 bool bottommost_level() const override
{ return is_bottommost_level
; }
// Reports a single level.
168 int number_levels() const override
{ return 1; }
// Fixed upper bound: a key made of nine 0xff bytes.
169 Slice
GetLargestUserKey() const override
{
170 return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
// Ingest-behind is reported as disabled.
172 bool allow_ingest_behind() const override
{ return false; }
// Preserve-deletes is reported as disabled.
174 bool preserve_deletes() const override
{ return false; }
// When true, KeyNotExistsBeyondOutputLevel() returns true for every key.
176 bool key_not_exists_beyond_output_level
= false;
// Returned by bottommost_level(); also makes
// KeyNotExistsBeyondOutputLevel() return true.
178 bool is_bottommost_level
= false;
181 // A simplified snapshot checker which assumes each snapshot has a global
182 // last visible sequence.
// SnapshotChecker driven by a last-committed sequence number plus a map from
// snapshot sequence number to the last sequence number visible to it.
183 class TestSnapshotChecker
: public SnapshotChecker
{
// `snapshots` is copied; it defaults to an empty map.
185 explicit TestSnapshotChecker(
186 SequenceNumber last_committed_sequence
,
187 const std::unordered_map
<SequenceNumber
, SequenceNumber
>& snapshots
= {})
188 : last_committed_sequence_(last_committed_sequence
),
189 snapshots_(snapshots
) {}
// Returns kInSnapshot when `seq` is visible to `snapshot_seq`, otherwise
// kNotInSnapshot.
191 SnapshotCheckerResult
CheckInSnapshot(
192 SequenceNumber seq
, SequenceNumber snapshot_seq
) const override
{
// kMaxSequenceNumber is not looked up in the map; visibility is decided
// by last_committed_sequence_ instead.
193 if (snapshot_seq
== kMaxSequenceNumber
) {
194 return seq
<= last_committed_sequence_
195 ? SnapshotCheckerResult::kInSnapshot
196 : SnapshotCheckerResult::kNotInSnapshot
;
// Any other snapshot must have been registered. With assertions disabled,
// an unknown snapshot makes unordered_map::at() throw std::out_of_range.
198 assert(snapshots_
.count(snapshot_seq
) > 0);
199 return seq
<= snapshots_
.at(snapshot_seq
)
200 ? SnapshotCheckerResult::kInSnapshot
201 : SnapshotCheckerResult::kNotInSnapshot
;
// Visibility bound used when snapshot_seq == kMaxSequenceNumber.
205 SequenceNumber last_committed_sequence_
;
206 // A map of valid snapshot to last visible sequence to the snapshot.
207 std::unordered_map
<SequenceNumber
, SequenceNumber
> snapshots_
;
211 // bool: whether to pass snapshot_checker to compaction iterator.
// Test fixture that wires a CompactionIterator to in-memory inputs. The bool
// test parameter is OR-ed with UseSnapshotChecker() to decide whether a
// TestSnapshotChecker is installed.
212 class CompactionIteratorTest
: public testing::TestWithParam
<bool> {
// Uses the bytewise comparator for user keys, wraps it in the internal key
// comparator, and starts with no snapshots.
214 CompactionIteratorTest()
215 : cmp_(BytewiseComparator()), icmp_(cmp_
), snapshots_({}) {}
// Parameters of the iterator set-up routine:
//   ks / vs                     - keys and values fed to the input iterator.
//   range_del_ks / range_del_vs - keys and values fed to the range tombstone
//                                 iterator.
//   last_sequence               - forwarded to the CompactionIterator.
//   last_committed_sequence     - forwarded to TestSnapshotChecker; a value
//                                 below kMaxSequenceNumber also forces a
//                                 snapshot checker to be created.
//   merge_op / filter           - optional, forwarded to MergeHelper; filter
//                                 is also forwarded to CompactionIterator.
//   bottommost_level            - copied into the FakeCompaction proxy.
//   earliest_write_conflict_snapshot - forwarded to the CompactionIterator.
218 const std::vector
<std::string
>& ks
, const std::vector
<std::string
>& vs
,
219 const std::vector
<std::string
>& range_del_ks
,
220 const std::vector
<std::string
>& range_del_vs
,
221 SequenceNumber last_sequence
,
222 SequenceNumber last_committed_sequence
= kMaxSequenceNumber
,
223 MergeOperator
* merge_op
= nullptr, CompactionFilter
* filter
= nullptr,
224 bool bottommost_level
= false,
225 SequenceNumber earliest_write_conflict_snapshot
= kMaxSequenceNumber
) {
// Build a fragmented tombstone list from the raw range deletions and hand
// an iterator over it to a fresh CompactionRangeDelAggregator.
226 std::unique_ptr
<InternalIterator
> unfragmented_range_del_iter(
227 new test::VectorIterator(range_del_ks
, range_del_vs
));
228 auto tombstone_list
= std::make_shared
<FragmentedRangeTombstoneList
>(
229 std::move(unfragmented_range_del_iter
), icmp_
);
230 std::unique_ptr
<FragmentedRangeTombstoneIterator
> range_del_iter(
231 new FragmentedRangeTombstoneIterator(tombstone_list
, icmp_
,
232 kMaxSequenceNumber
));
233 range_del_agg_
.reset(new CompactionRangeDelAggregator(&icmp_
, snapshots_
));
234 range_del_agg_
->AddTombstones(std::move(range_del_iter
));
// A FakeCompaction proxy is created only when a filter is supplied or
// bottommost_level is set; otherwise `compaction` stays null.
// compaction_proxy_ keeps a raw alias while `compaction` holds ownership,
// which is later moved into the CompactionIterator.
236 std::unique_ptr
<CompactionIterator::CompactionProxy
> compaction
;
237 if (filter
|| bottommost_level
) {
238 compaction_proxy_
= new FakeCompaction();
239 compaction_proxy_
->is_bottommost_level
= bottommost_level
;
240 compaction
.reset(compaction_proxy_
);
// Install a snapshot checker when the subclass or the test parameter asks
// for one, or when a specific last committed sequence was given.
242 bool use_snapshot_checker
= UseSnapshotChecker() || GetParam();
243 if (use_snapshot_checker
|| last_committed_sequence
< kMaxSequenceNumber
) {
244 snapshot_checker_
.reset(
245 new TestSnapshotChecker(last_committed_sequence
, snapshot_map_
));
248 new MergeHelper(Env::Default(), cmp_
, merge_op
, filter
, nullptr, false,
249 0 /*latest_snapshot*/, snapshot_checker_
.get(),
250 0 /*level*/, nullptr /*statistics*/, &shutting_down_
));
// The logging input iterator is positioned at its first entry before the
// CompactionIterator is constructed on top of it.
252 iter_
.reset(new LoggingForwardVectorIterator(ks
, vs
));
253 iter_
->SeekToFirst();
254 c_iter_
.reset(new CompactionIterator(
255 iter_
.get(), cmp_
, merge_helper_
.get(), last_sequence
, &snapshots_
,
256 earliest_write_conflict_snapshot
, snapshot_checker_
.get(),
257 Env::Default(), false /* report_detailed_time */, false,
258 range_del_agg_
.get(), std::move(compaction
), filter
, &shutting_down_
));
// Registers a snapshot: appends it to snapshots_ and records the last
// sequence number visible to it (kMaxSequenceNumber by default).
261 void AddSnapshot(SequenceNumber snapshot
,
262 SequenceNumber last_visible_seq
= kMaxSequenceNumber
) {
263 snapshots_
.push_back(snapshot
);
264 snapshot_map_
[snapshot
] = last_visible_seq
;
// Hook for subclasses that always want a snapshot checker.
267 virtual bool UseSnapshotChecker() const { return false; }
// Parameters of the run-and-compare helper: the input keys/values, the
// keys/values the compaction iterator is expected to emit (in order), and
// optional settings that are forwarded unchanged to InitIterators().
270 const std::vector
<std::string
>& input_keys
,
271 const std::vector
<std::string
>& input_values
,
272 const std::vector
<std::string
>& expected_keys
,
273 const std::vector
<std::string
>& expected_values
,
274 SequenceNumber last_committed_seq
= kMaxSequenceNumber
,
275 MergeOperator
* merge_operator
= nullptr,
276 CompactionFilter
* compaction_filter
= nullptr,
277 bool bottommost_level
= false,
278 SequenceNumber earliest_write_conflict_snapshot
= kMaxSequenceNumber
) {
// No range deletions are used here, and last_sequence is
// kMaxSequenceNumber.
279 InitIterators(input_keys
, input_values
, {}, {}, kMaxSequenceNumber
,
280 last_committed_seq
, merge_operator
, compaction_filter
,
281 bottommost_level
, earliest_write_conflict_snapshot
);
282 c_iter_
->SeekToFirst();
// Check each expected entry; `info` tags failures with the entry index.
283 for (size_t i
= 0; i
< expected_keys
.size(); i
++) {
284 std::string info
= "i = " + ToString(i
);
285 ASSERT_TRUE(c_iter_
->Valid()) << info
;
286 ASSERT_OK(c_iter_
->status()) << info
;
287 ASSERT_EQ(expected_keys
[i
], c_iter_
->key().ToString()) << info
;
288 ASSERT_EQ(expected_values
[i
], c_iter_
->value().ToString()) << info
;
// The iterator must be exhausted after the expected entries.
291 ASSERT_FALSE(c_iter_
->Valid());
// User-key comparator and the internal-key comparator built on top of it.
294 const Comparator
* cmp_
;
295 const InternalKeyComparator icmp_
;
// Snapshot sequence numbers, in the order AddSnapshot() was called.
296 std::vector
<SequenceNumber
> snapshots_
;
297 // A map of valid snapshot to last visible sequence to the snapshot.
298 std::unordered_map
<SequenceNumber
, SequenceNumber
> snapshot_map_
;
// Objects rebuilt by each InitIterators() call.
299 std::unique_ptr
<MergeHelper
> merge_helper_
;
300 std::unique_ptr
<LoggingForwardVectorIterator
> iter_
;
301 std::unique_ptr
<CompactionIterator
> c_iter_
;
302 std::unique_ptr
<CompactionRangeDelAggregator
> range_del_agg_
;
303 std::unique_ptr
<SnapshotChecker
> snapshot_checker_
;
// Shutdown flag whose address is passed to MergeHelper and
// CompactionIterator.
304 std::atomic
<bool> shutting_down_
{false};
// Non-owning alias of the FakeCompaction created when a filter or
// bottommost_level is given. NOTE(review): it has no initializer and is only
// assigned on that branch, so it is indeterminate otherwise.
305 FakeCompaction
* compaction_proxy_
;
308 // It is possible that the output of the compaction iterator is empty even if
310 TEST_P(CompactionIteratorTest
, EmptyResult
) {
311 InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion
),
312 test::KeyStr("a", 3, kTypeValue
)},
313 {"", "val"}, {}, {}, 5);
314 c_iter_
->SeekToFirst();
315 ASSERT_FALSE(c_iter_
->Valid());
318 // If there is a corruption after a single deletion, the corrupted key should
320 TEST_P(CompactionIteratorTest
, CorruptionAfterSingleDeletion
) {
321 InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion
),
322 test::KeyStr("a", 3, kTypeValue
, true),
323 test::KeyStr("b", 10, kTypeValue
)},
324 {"", "val", "val2"}, {}, {}, 10);
325 c_iter_
->SeekToFirst();
326 ASSERT_TRUE(c_iter_
->Valid());
327 ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion
),
328 c_iter_
->key().ToString());
330 ASSERT_TRUE(c_iter_
->Valid());
331 ASSERT_EQ(test::KeyStr("a", 3, kTypeValue
, true), c_iter_
->key().ToString());
333 ASSERT_TRUE(c_iter_
->Valid());
334 ASSERT_EQ(test::KeyStr("b", 10, kTypeValue
), c_iter_
->key().ToString());
336 ASSERT_FALSE(c_iter_
->Valid());
339 TEST_P(CompactionIteratorTest
, SimpleRangeDeletion
) {
340 InitIterators({test::KeyStr("morning", 5, kTypeValue
),
341 test::KeyStr("morning", 2, kTypeValue
),
342 test::KeyStr("night", 3, kTypeValue
)},
343 {"zao", "zao", "wan"},
344 {test::KeyStr("ma", 4, kTypeRangeDeletion
)}, {"mz"}, 5);
345 c_iter_
->SeekToFirst();
346 ASSERT_TRUE(c_iter_
->Valid());
347 ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue
), c_iter_
->key().ToString());
349 ASSERT_TRUE(c_iter_
->Valid());
350 ASSERT_EQ(test::KeyStr("night", 3, kTypeValue
), c_iter_
->key().ToString());
352 ASSERT_FALSE(c_iter_
->Valid());
355 TEST_P(CompactionIteratorTest
, RangeDeletionWithSnapshots
) {
357 std::vector
<std::string
> ks1
;
358 ks1
.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion
));
359 std::vector
<std::string
> vs1
{"mz"};
360 std::vector
<std::string
> ks2
{test::KeyStr("morning", 15, kTypeValue
),
361 test::KeyStr("morning", 5, kTypeValue
),
362 test::KeyStr("night", 40, kTypeValue
),
363 test::KeyStr("night", 20, kTypeValue
)};
364 std::vector
<std::string
> vs2
{"zao 15", "zao 5", "wan 40", "wan 20"};
365 InitIterators(ks2
, vs2
, ks1
, vs1
, 40);
366 c_iter_
->SeekToFirst();
367 ASSERT_TRUE(c_iter_
->Valid());
368 ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue
), c_iter_
->key().ToString());
370 ASSERT_TRUE(c_iter_
->Valid());
371 ASSERT_EQ(test::KeyStr("night", 40, kTypeValue
), c_iter_
->key().ToString());
373 ASSERT_FALSE(c_iter_
->Valid());
376 TEST_P(CompactionIteratorTest
, CompactionFilterSkipUntil
) {
377 class Filter
: public CompactionFilter
{
378 Decision
FilterV2(int /*level*/, const Slice
& key
, ValueType t
,
379 const Slice
& existing_value
, std::string
* /*new_value*/,
380 std::string
* skip_until
) const override
{
381 std::string k
= key
.ToString();
382 std::string v
= existing_value
.ToString();
383 // See InitIterators() call below for the sequence of keys and their
384 // filtering decisions. Here we closely assert that compaction filter is
385 // called with the expected keys and only them, and with the right values.
387 EXPECT_EQ(ValueType::kValue
, t
);
388 EXPECT_EQ("av50", v
);
389 return Decision::kKeep
;
392 EXPECT_EQ(ValueType::kValue
, t
);
393 EXPECT_EQ("bv60", v
);
395 return Decision::kRemoveAndSkipUntil
;
398 EXPECT_EQ(ValueType::kMergeOperand
, t
);
399 EXPECT_EQ("em71", v
);
400 return Decision::kKeep
;
404 EXPECT_EQ(ValueType::kMergeOperand
, t
);
407 EXPECT_EQ("fm30", v
);
408 EXPECT_EQ(ValueType::kMergeOperand
, t
);
411 return Decision::kRemoveAndSkipUntil
;
414 EXPECT_EQ(ValueType::kValue
, t
);
415 EXPECT_EQ("hv91", v
);
416 return Decision::kKeep
;
419 EXPECT_EQ(ValueType::kMergeOperand
, t
);
420 EXPECT_EQ("im95", v
);
422 return Decision::kRemoveAndSkipUntil
;
425 return Decision::kKeep
;
428 const char* Name() const override
{
429 return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
433 NoMergingMergeOp merge_op
;
436 {test::KeyStr("a", 50, kTypeValue
), // keep
437 test::KeyStr("a", 45, kTypeMerge
),
438 test::KeyStr("b", 60, kTypeValue
), // skip to "d+"
439 test::KeyStr("b", 40, kTypeValue
), test::KeyStr("c", 35, kTypeValue
),
440 test::KeyStr("d", 70, kTypeMerge
),
441 test::KeyStr("e", 71, kTypeMerge
), // keep
442 test::KeyStr("f", 65, kTypeMerge
), // skip to "f", aka keep
443 test::KeyStr("f", 30, kTypeMerge
), // skip to "g+"
444 test::KeyStr("f", 25, kTypeValue
), test::KeyStr("g", 90, kTypeValue
),
445 test::KeyStr("h", 91, kTypeValue
), // keep
446 test::KeyStr("i", 95, kTypeMerge
), // skip to "z"
447 test::KeyStr("j", 99, kTypeValue
)},
448 {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
449 "fv25", "gv90", "hv91", "im95", "jv99"},
450 {}, {}, kMaxSequenceNumber
, kMaxSequenceNumber
, &merge_op
, &filter
);
452 // Compaction should output just "a", "e" and "h" keys.
453 c_iter_
->SeekToFirst();
454 ASSERT_TRUE(c_iter_
->Valid());
455 ASSERT_EQ(test::KeyStr("a", 50, kTypeValue
), c_iter_
->key().ToString());
456 ASSERT_EQ("av50", c_iter_
->value().ToString());
458 ASSERT_TRUE(c_iter_
->Valid());
459 ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge
), c_iter_
->key().ToString());
460 ASSERT_EQ("em71", c_iter_
->value().ToString());
462 ASSERT_TRUE(c_iter_
->Valid());
463 ASSERT_EQ(test::KeyStr("h", 91, kTypeValue
), c_iter_
->key().ToString());
464 ASSERT_EQ("hv91", c_iter_
->value().ToString());
466 ASSERT_FALSE(c_iter_
->Valid());
468 // Check that the compaction iterator did the correct sequence of calls on
469 // the underlying iterator.
470 using A
= LoggingForwardVectorIterator::Action
;
472 std::vector
<A
> expected_actions
= {
476 A(T::SEEK
, test::KeyStr("d+", kMaxSequenceNumber
, kValueTypeForSeek
)),
479 A(T::SEEK
, test::KeyStr("g+", kMaxSequenceNumber
, kValueTypeForSeek
)),
481 A(T::SEEK
, test::KeyStr("z", kMaxSequenceNumber
, kValueTypeForSeek
))};
482 ASSERT_EQ(expected_actions
, iter_
->log
);
// Checks that raising shutting_down_ while the compaction filter is blocked
// makes SeekToFirst() finish with an invalid iterator and a
// ShutdownInProgress status, and that the filter is not called for later keys.
485 TEST_P(CompactionIteratorTest
, ShuttingDownInFilter
) {
486 NoMergingMergeOp merge_op
;
487 StallingFilter filter
;
489 {test::KeyStr("1", 1, kTypeValue
), test::KeyStr("2", 2, kTypeValue
),
490 test::KeyStr("3", 3, kTypeValue
), test::KeyStr("4", 4, kTypeValue
)},
491 {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber
, kMaxSequenceNumber
,
493 // Don't leave tombstones (kTypeDeletion) for filtered keys.
494 compaction_proxy_
->key_not_exists_beyond_output_level
= true;
// SeekToFirst() runs on its own thread because StallingFilter blocks inside
// FilterV2(); seek_done reports when that call has returned.
496 std::atomic
<bool> seek_done
{false};
497 rocksdb::port::Thread
compaction_thread([&] {
498 c_iter_
->SeekToFirst();
499 EXPECT_FALSE(c_iter_
->Valid());
500 EXPECT_TRUE(c_iter_
->status().IsShutdownInProgress());
501 seek_done
.store(true);
504 // Let key 1 through.
505 filter
.WaitForStall(1);
507 // Shutdown during compaction filter call for key 2.
508 filter
.WaitForStall(2);
509 shutting_down_
.store(true);
// The compaction thread must still be stuck inside the filter.
510 EXPECT_FALSE(seek_done
.load());
512 // Unstall filter and wait for SeekToFirst() to return.
513 filter
.stall_at
.store(3);
514 compaction_thread
.join();
// NOTE(review): plain assert() is compiled out under NDEBUG, so this check
// disappears in release builds; a gtest assertion would always run.
515 assert(seek_done
.load());
517 // Check that filter was never called again.
518 EXPECT_EQ(2, filter
.last_seen
.load());
521 // Same as ShuttingDownInFilter, but shutdown happens during filter call for
522 // a merge operand, not for a value.
// Variant of the shutdown test in which keys "2" and "3" are merge operands
// rather than plain values.
523 TEST_P(CompactionIteratorTest
, ShuttingDownInMerge
) {
524 NoMergingMergeOp merge_op
;
525 StallingFilter filter
;
527 {test::KeyStr("1", 1, kTypeValue
), test::KeyStr("2", 2, kTypeMerge
),
528 test::KeyStr("3", 3, kTypeMerge
), test::KeyStr("4", 4, kTypeValue
)},
529 {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber
, kMaxSequenceNumber
,
// Make the fake compaction report that no key exists beyond the output level.
531 compaction_proxy_
->key_not_exists_beyond_output_level
= true;
// SeekToFirst() runs on its own thread because StallingFilter blocks inside
// FilterV2(); seek_done reports when that call has returned.
// NOTE(review): ASSERT_* inside the lambda only returns from the lambda on
// failure, which also skips seek_done.store(true).
533 std::atomic
<bool> seek_done
{false};
534 rocksdb::port::Thread
compaction_thread([&] {
535 c_iter_
->SeekToFirst();
536 ASSERT_FALSE(c_iter_
->Valid());
537 ASSERT_TRUE(c_iter_
->status().IsShutdownInProgress());
538 seek_done
.store(true);
541 // Let key 1 through.
542 filter
.WaitForStall(1);
544 // Shutdown during compaction filter call for key 2.
545 filter
.WaitForStall(2);
546 shutting_down_
.store(true);
// The compaction thread must still be stuck inside the filter.
547 EXPECT_FALSE(seek_done
.load());
549 // Unstall filter and wait for SeekToFirst() to return.
550 filter
.stall_at
.store(3);
551 compaction_thread
.join();
// NOTE(review): plain assert() is compiled out under NDEBUG, so this check
// disappears in release builds; a gtest assertion would always run.
552 assert(seek_done
.load());
554 // Check that filter was never called again.
555 EXPECT_EQ(2, filter
.last_seen
.load());
558 TEST_P(CompactionIteratorTest
, SingleMergeOperand
) {
559 class Filter
: public CompactionFilter
{
560 Decision
FilterV2(int /*level*/, const Slice
& key
, ValueType t
,
561 const Slice
& existing_value
, std::string
* /*new_value*/,
562 std::string
* /*skip_until*/) const override
{
563 std::string k
= key
.ToString();
564 std::string v
= existing_value
.ToString();
566 // See InitIterators() call below for the sequence of keys and their
567 // filtering decisions. Here we closely assert that compaction filter is
568 // called with the expected keys and only them, and with the right values.
570 EXPECT_EQ(ValueType::kMergeOperand
, t
);
572 return Decision::kKeep
;
573 } else if (k
== "b") {
574 EXPECT_EQ(ValueType::kMergeOperand
, t
);
575 return Decision::kKeep
;
576 } else if (k
== "c") {
577 return Decision::kKeep
;
581 return Decision::kKeep
;
584 const char* Name() const override
{
585 return "CompactionIteratorTest.SingleMergeOperand::Filter";
589 class SingleMergeOp
: public MergeOperator
{
591 bool FullMergeV2(const MergeOperationInput
& merge_in
,
592 MergeOperationOutput
* merge_out
) const override
{
593 // See InitIterators() call below for why "c" is the only key for which
594 // FullMergeV2 should be called.
595 EXPECT_EQ("c", merge_in
.key
.ToString());
597 std::string temp_value
;
598 if (merge_in
.existing_value
!= nullptr) {
599 temp_value
= merge_in
.existing_value
->ToString();
602 for (auto& operand
: merge_in
.operand_list
) {
603 temp_value
.append(operand
.ToString());
605 merge_out
->new_value
= temp_value
;
610 bool PartialMergeMulti(const Slice
& key
,
611 const std::deque
<Slice
>& operand_list
,
612 std::string
* new_value
,
613 Logger
* /*logger*/) const override
{
614 std::string string_key
= key
.ToString();
615 EXPECT_TRUE(string_key
== "a" || string_key
== "b");
617 if (string_key
== "a") {
618 EXPECT_EQ(1, operand_list
.size());
619 } else if (string_key
== "b") {
620 EXPECT_EQ(2, operand_list
.size());
623 std::string temp_value
;
624 for (auto& operand
: operand_list
) {
625 temp_value
.append(operand
.ToString());
627 swap(temp_value
, *new_value
);
632 const char* Name() const override
{
633 return "CompactionIteratorTest SingleMergeOp";
636 bool AllowSingleOperand() const override
{ return true; }
639 SingleMergeOp merge_op
;
642 // a should invoke PartialMergeMulti with a single merge operand.
643 {test::KeyStr("a", 50, kTypeMerge
),
644 // b should invoke PartialMergeMulti with two operands.
645 test::KeyStr("b", 70, kTypeMerge
), test::KeyStr("b", 60, kTypeMerge
),
646 // c should invoke FullMerge due to kTypeValue at the beginning.
647 test::KeyStr("c", 90, kTypeMerge
), test::KeyStr("c", 80, kTypeValue
)},
648 {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber
,
649 kMaxSequenceNumber
, &merge_op
, &filter
);
651 c_iter_
->SeekToFirst();
652 ASSERT_TRUE(c_iter_
->Valid());
653 ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge
), c_iter_
->key().ToString());
654 ASSERT_EQ("av1", c_iter_
->value().ToString());
656 ASSERT_TRUE(c_iter_
->Valid());
657 ASSERT_EQ("bv1bv2", c_iter_
->value().ToString());
659 ASSERT_EQ("cv1cv2", c_iter_
->value().ToString());
662 // In bottommost level, values earlier than earliest snapshot can be output
663 // with sequence = 0.
664 TEST_P(CompactionIteratorTest
, ZeroOutSequenceAtBottomLevel
) {
666 RunTest({test::KeyStr("a", 1, kTypeValue
), test::KeyStr("b", 2, kTypeValue
)},
668 {test::KeyStr("a", 0, kTypeValue
), test::KeyStr("b", 2, kTypeValue
)},
669 {"v1", "v2"}, kMaxSequenceNumber
/*last_commited_seq*/,
670 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
671 true /*bottommost_level*/);
674 // In bottommost level, deletions earlier than earliest snapshot can be removed
676 TEST_P(CompactionIteratorTest
, RemoveDeletionAtBottomLevel
) {
678 RunTest({test::KeyStr("a", 1, kTypeDeletion
),
679 test::KeyStr("b", 3, kTypeDeletion
),
680 test::KeyStr("b", 1, kTypeValue
)},
682 {test::KeyStr("b", 3, kTypeDeletion
),
683 test::KeyStr("b", 0, kTypeValue
)},
685 kMaxSequenceNumber
/*last_commited_seq*/, nullptr /*merge_operator*/,
686 nullptr /*compaction_filter*/, true /*bottommost_level*/);
689 // In bottommost level, single deletions earlier than earliest snapshot can be
690 // removed permanently.
691 TEST_P(CompactionIteratorTest
, RemoveSingleDeletionAtBottomLevel
) {
693 RunTest({test::KeyStr("a", 1, kTypeSingleDeletion
),
694 test::KeyStr("b", 2, kTypeSingleDeletion
)},
695 {"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion
)}, {""},
696 kMaxSequenceNumber
/*last_commited_seq*/, nullptr /*merge_operator*/,
697 nullptr /*compaction_filter*/, true /*bottommost_level*/);
700 INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance
, CompactionIteratorTest
,
701 testing::Values(true, false));
703 // Tests how CompactionIterator works together with SnapshotChecker.
// Fixture variant that always requests a snapshot checker.
704 class CompactionIteratorWithSnapshotCheckerTest
705 : public CompactionIteratorTest
{
// Overrides the base-class hook, which returns false by default.
707 bool UseSnapshotChecker() const override
{ return true; }
710 // Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
711 // while committed version of these keys should get compacted as usual.
713 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
714 PreserveUncommittedKeys_Value
) {
716 {test::KeyStr("foo", 3, kTypeValue
), test::KeyStr("foo", 2, kTypeValue
),
717 test::KeyStr("foo", 1, kTypeValue
)},
719 {test::KeyStr("foo", 3, kTypeValue
), test::KeyStr("foo", 2, kTypeValue
)},
720 {"v3", "v2"}, 2 /*last_committed_seq*/);
723 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
724 PreserveUncommittedKeys_Deletion
) {
725 RunTest({test::KeyStr("foo", 2, kTypeDeletion
),
726 test::KeyStr("foo", 1, kTypeValue
)},
728 {test::KeyStr("foo", 2, kTypeDeletion
),
729 test::KeyStr("foo", 1, kTypeValue
)},
730 {"", "v1"}, 1 /*last_committed_seq*/);
733 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
734 PreserveUncommittedKeys_Merge
) {
735 auto merge_op
= MergeOperators::CreateStringAppendOperator();
737 {test::KeyStr("foo", 3, kTypeMerge
), test::KeyStr("foo", 2, kTypeMerge
),
738 test::KeyStr("foo", 1, kTypeValue
)},
740 {test::KeyStr("foo", 3, kTypeMerge
), test::KeyStr("foo", 2, kTypeValue
)},
741 {"v3", "v1,v2"}, 2 /*last_committed_seq*/, merge_op
.get());
744 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
745 PreserveUncommittedKeys_SingleDelete
) {
746 RunTest({test::KeyStr("foo", 2, kTypeSingleDeletion
),
747 test::KeyStr("foo", 1, kTypeValue
)},
749 {test::KeyStr("foo", 2, kTypeSingleDeletion
),
750 test::KeyStr("foo", 1, kTypeValue
)},
751 {"", "v1"}, 1 /*last_committed_seq*/);
754 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
755 PreserveUncommittedKeys_BlobIndex
) {
756 RunTest({test::KeyStr("foo", 3, kTypeBlobIndex
),
757 test::KeyStr("foo", 2, kTypeBlobIndex
),
758 test::KeyStr("foo", 1, kTypeBlobIndex
)},
760 {test::KeyStr("foo", 3, kTypeBlobIndex
),
761 test::KeyStr("foo", 2, kTypeBlobIndex
)},
762 {"v3", "v2"}, 2 /*last_committed_seq*/);
765 // Test compaction iterator dedup keys visible to the same snapshot.
767 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, DedupSameSnapshot_Value
) {
770 {test::KeyStr("foo", 4, kTypeValue
), test::KeyStr("foo", 3, kTypeValue
),
771 test::KeyStr("foo", 2, kTypeValue
), test::KeyStr("foo", 1, kTypeValue
)},
772 {"v4", "v3", "v2", "v1"},
773 {test::KeyStr("foo", 4, kTypeValue
), test::KeyStr("foo", 3, kTypeValue
),
774 test::KeyStr("foo", 1, kTypeValue
)},
775 {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
778 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, DedupSameSnapshot_Deletion
) {
781 {test::KeyStr("foo", 4, kTypeValue
),
782 test::KeyStr("foo", 3, kTypeDeletion
),
783 test::KeyStr("foo", 2, kTypeValue
), test::KeyStr("foo", 1, kTypeValue
)},
784 {"v4", "", "v2", "v1"},
785 {test::KeyStr("foo", 4, kTypeValue
),
786 test::KeyStr("foo", 3, kTypeDeletion
),
787 test::KeyStr("foo", 1, kTypeValue
)},
788 {"v4", "", "v1"}, 3 /*last_committed_seq*/);
791 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, DedupSameSnapshot_Merge
) {
794 auto merge_op
= MergeOperators::CreateStringAppendOperator();
796 {test::KeyStr("foo", 5, kTypeMerge
), test::KeyStr("foo", 4, kTypeMerge
),
797 test::KeyStr("foo", 3, kTypeMerge
), test::KeyStr("foo", 2, kTypeMerge
),
798 test::KeyStr("foo", 1, kTypeValue
)},
799 {"v5", "v4", "v3", "v2", "v1"},
800 {test::KeyStr("foo", 5, kTypeMerge
), test::KeyStr("foo", 4, kTypeMerge
),
801 test::KeyStr("foo", 3, kTypeMerge
), test::KeyStr("foo", 1, kTypeValue
)},
802 {"v5", "v4", "v2,v3", "v1"}, 4 /*last_committed_seq*/, merge_op
.get());
805 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
806 DedupSameSnapshot_SingleDeletion
) {
809 {test::KeyStr("foo", 4, kTypeValue
),
810 test::KeyStr("foo", 3, kTypeSingleDeletion
),
811 test::KeyStr("foo", 2, kTypeValue
), test::KeyStr("foo", 1, kTypeValue
)},
812 {"v4", "", "v2", "v1"},
813 {test::KeyStr("foo", 4, kTypeValue
), test::KeyStr("foo", 1, kTypeValue
)},
814 {"v4", "v1"}, 3 /*last_committed_seq*/);
817 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, DedupSameSnapshot_BlobIndex
) {
819 RunTest({test::KeyStr("foo", 4, kTypeBlobIndex
),
820 test::KeyStr("foo", 3, kTypeBlobIndex
),
821 test::KeyStr("foo", 2, kTypeBlobIndex
),
822 test::KeyStr("foo", 1, kTypeBlobIndex
)},
823 {"v4", "v3", "v2", "v1"},
824 {test::KeyStr("foo", 4, kTypeBlobIndex
),
825 test::KeyStr("foo", 3, kTypeBlobIndex
),
826 test::KeyStr("foo", 1, kTypeBlobIndex
)},
827 {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
830 // At bottom level, sequence numbers can be zeroed out, and deletions can be
831 // removed, but only when they are visible to earliest snapshot.
833 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
834 NotZeroOutSequenceIfNotVisibleToEarliestSnapshot
) {
836 RunTest({test::KeyStr("a", 1, kTypeValue
), test::KeyStr("b", 2, kTypeValue
),
837 test::KeyStr("c", 3, kTypeValue
)},
839 {test::KeyStr("a", 0, kTypeValue
), test::KeyStr("b", 2, kTypeValue
),
840 test::KeyStr("c", 3, kTypeValue
)},
841 {"v1", "v2", "v3"}, kMaxSequenceNumber
/*last_commited_seq*/,
842 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
843 true /*bottommost_level*/);
846 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
847 NotRemoveDeletionIfNotVisibleToEarliestSnapshot
) {
850 {test::KeyStr("a", 1, kTypeDeletion
), test::KeyStr("b", 2, kTypeDeletion
),
851 test::KeyStr("c", 3, kTypeDeletion
)},
854 {"", ""}, kMaxSequenceNumber
/*last_commited_seq*/,
855 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
856 true /*bottommost_level*/);
859 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
860 NotRemoveDeletionIfValuePresentToEarlierSnapshot
) {
863 {test::KeyStr("a", 4, kTypeDeletion
), test::KeyStr("a", 1, kTypeValue
),
864 test::KeyStr("b", 3, kTypeValue
)},
866 {test::KeyStr("a", 4, kTypeDeletion
), test::KeyStr("a", 0, kTypeValue
),
867 test::KeyStr("b", 3, kTypeValue
)},
868 {"", "", ""}, kMaxSequenceNumber
/*last_commited_seq*/,
869 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
870 true /*bottommost_level*/);
873 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
874 NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot
) {
876 RunTest({test::KeyStr("a", 1, kTypeSingleDeletion
),
877 test::KeyStr("b", 2, kTypeSingleDeletion
),
878 test::KeyStr("c", 3, kTypeSingleDeletion
)},
880 {test::KeyStr("b", 2, kTypeSingleDeletion
),
881 test::KeyStr("c", 3, kTypeSingleDeletion
)},
882 {"", ""}, kMaxSequenceNumber
/*last_commited_seq*/,
883 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
884 true /*bottommost_level*/);
887 // Single delete should not cancel out values that are not visible to the
888 // same set of snapshots.
889 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
890 SingleDeleteAcrossSnapshotBoundary
) {
892 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion
),
893 test::KeyStr("a", 1, kTypeValue
)},
895 {test::KeyStr("a", 2, kTypeSingleDeletion
),
896 test::KeyStr("a", 1, kTypeValue
)},
897 {"", "v1"}, 2 /*last_committed_seq*/);
900 // Single delete should be kept in case it is not visible to the
901 // earliest write conflict snapshot. If a single delete is kept for this reason,
902 // corresponding value can be trimmed to save space.
903 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
904 KeepSingleDeletionForWriteConflictChecking
) {
906 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion
),
907 test::KeyStr("a", 1, kTypeValue
)},
909 {test::KeyStr("a", 2, kTypeSingleDeletion
),
910 test::KeyStr("a", 1, kTypeValue
)},
911 {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
912 nullptr /*compaction_filter*/, false /*bottommost_level*/,
913 2 /*earliest_write_conflict_snapshot*/);
916 // Compaction filter should keep uncommitted key as-is, and
917 // * Convert the latest value to deletion, and/or
918 // * if latest value is a merge, apply filter to all subsequent merges.
920 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, CompactionFilter_Value
) {
921 std::unique_ptr
<CompactionFilter
> compaction_filter(
922 new FilterAllKeysCompactionFilter());
924 {test::KeyStr("a", 2, kTypeValue
), test::KeyStr("a", 1, kTypeValue
),
925 test::KeyStr("b", 3, kTypeValue
), test::KeyStr("c", 1, kTypeValue
)},
926 {"v2", "v1", "v3", "v4"},
927 {test::KeyStr("a", 2, kTypeValue
), test::KeyStr("a", 1, kTypeDeletion
),
928 test::KeyStr("b", 3, kTypeValue
), test::KeyStr("c", 1, kTypeDeletion
)},
929 {"v2", "", "v3", ""}, 1 /*last_committed_seq*/,
930 nullptr /*merge_operator*/, compaction_filter
.get());
933 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, CompactionFilter_Deletion
) {
934 std::unique_ptr
<CompactionFilter
> compaction_filter(
935 new FilterAllKeysCompactionFilter());
937 {test::KeyStr("a", 2, kTypeDeletion
), test::KeyStr("a", 1, kTypeValue
)},
939 {test::KeyStr("a", 2, kTypeDeletion
),
940 test::KeyStr("a", 1, kTypeDeletion
)},
941 {"", ""}, 1 /*last_committed_seq*/, nullptr /*merge_operator*/,
942 compaction_filter
.get());
945 TEST_F(CompactionIteratorWithSnapshotCheckerTest
,
946 CompactionFilter_PartialMerge
) {
947 std::shared_ptr
<MergeOperator
> merge_op
=
948 MergeOperators::CreateStringAppendOperator();
949 std::unique_ptr
<CompactionFilter
> compaction_filter(
950 new FilterAllKeysCompactionFilter());
951 RunTest({test::KeyStr("a", 3, kTypeMerge
), test::KeyStr("a", 2, kTypeMerge
),
952 test::KeyStr("a", 1, kTypeMerge
)},
953 {"v3", "v2", "v1"}, {test::KeyStr("a", 3, kTypeMerge
)}, {"v3"},
954 2 /*last_committed_seq*/, merge_op
.get(), compaction_filter
.get());
957 TEST_F(CompactionIteratorWithSnapshotCheckerTest
, CompactionFilter_FullMerge
) {
958 std::shared_ptr
<MergeOperator
> merge_op
=
959 MergeOperators::CreateStringAppendOperator();
960 std::unique_ptr
<CompactionFilter
> compaction_filter(
961 new FilterAllKeysCompactionFilter());
963 {test::KeyStr("a", 3, kTypeMerge
), test::KeyStr("a", 2, kTypeMerge
),
964 test::KeyStr("a", 1, kTypeValue
)},
966 {test::KeyStr("a", 3, kTypeMerge
), test::KeyStr("a", 1, kTypeDeletion
)},
967 {"v3", ""}, 2 /*last_committed_seq*/, merge_op
.get(),
968 compaction_filter
.get());
971 } // namespace rocksdb
// Test binary entry point: hands the command-line arguments to GoogleTest
// and returns the result of running all registered tests.
973 int main(int argc
, char** argv
) {
974 ::testing::InitGoogleTest(&argc
, argv
);
975 return RUN_ALL_TESTS();