1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "db/db_test_util.h"
9 #include "db/forward_iterator.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/merge_operator.h"
12 #include "utilities/merge_operators.h"
13 #include "utilities/merge_operators/string_append/stringappend2.h"
// Test-only ReadCallback: an entry with sequence number `seq` is treated as
// visible exactly when `snapshot_checker` reports
// SnapshotCheckerResult::kInSnapshot for (seq, snapshot_seq).
17 class TestReadCallback
: public ReadCallback
{
// snapshot_seq is forwarded to the ReadCallback base and also kept locally
// for the visibility check. snapshot_checker is stored as a raw pointer and
// no delete is visible here, so it is presumably borrowed.
// NOTE(review): confirm the caller keeps the checker alive while this
// callback is in use.
19 TestReadCallback(SnapshotChecker
* snapshot_checker
,
20 SequenceNumber snapshot_seq
)
21 : ReadCallback(snapshot_seq
),
22 snapshot_checker_(snapshot_checker
),
23 snapshot_seq_(snapshot_seq
) {}
// Delegates the visibility decision for `seq` to the snapshot checker.
25 bool IsVisibleFullCheck(SequenceNumber seq
) override
{
26 return snapshot_checker_
->CheckInSnapshot(seq
, snapshot_seq_
) ==
27 SnapshotCheckerResult::kInSnapshot
;
// Checker consulted by IsVisibleFullCheck.
31 SnapshotChecker
* snapshot_checker_
;
// Snapshot sequence number passed to CheckInSnapshot.
32 SequenceNumber snapshot_seq_
;
35 // Test merge operator functionality.
// Fixture shared by the merge-operator tests below. It passes
// "/db_merge_operator_test" to DBTestBase (presumably the test DB path).
36 class DBMergeOperatorTest
: public DBTestBase
{
38 DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
// Reads `key` from the default column family through dbfull()->GetImpl with
// a TestReadCallback attached, so per-entry visibility is decided by
// `snapshot_checker`. The sequence number handed to the callback is
// snapshot->GetSequenceNumber() when `snapshot` is given, otherwise
// db_->GetLatestSequenceNumber(). Returns the read value as a std::string.
// NOTE(review): the key parameter, the local declarations and any check of
// GetImpl's status are outside this view.
40 std::string
GetWithReadCallback(SnapshotChecker
* snapshot_checker
,
42 const Snapshot
* snapshot
= nullptr) {
// Sequence number the read callback checks visibility against.
43 SequenceNumber seq
= snapshot
== nullptr ? db_
->GetLatestSequenceNumber()
44 : snapshot
->GetSequenceNumber();
45 TestReadCallback
read_callback(snapshot_checker
, seq
);
// The same snapshot (possibly nullptr) is used for the underlying read.
47 read_opt
.snapshot
= snapshot
;
// The callback is passed as the last GetImpl argument.
50 dbfull()->GetImpl(read_opt
, db_
->DefaultColumnFamily(), key
, &value
,
51 nullptr /*value_found*/, &read_callback
);
55 return value
.ToString();
// Uses a string-append merge operator whose ShouldMerge() reacts once a
// fixed number of operands (here 2) has been collected. It verifies that only
// the newest two operands end up in the result, in each of these layouts:
//   * all operands in the memtable;
//   * all operands in a single L0 file;
//   * operands in separate files;
//   * operands on different levels.
59 TEST_F(DBMergeOperatorTest
, LimitMergeOperands
) {
// String-append operator (delimiter `delim`) that overrides ShouldMerge to
// react once the operand count reaches `limit`.
60 class LimitedStringAppendMergeOp
: public StringAppendTESTOperator
{
62 LimitedStringAppendMergeOp(int limit
, char delim
)
63 : StringAppendTESTOperator(delim
), limit_(limit
) {}
65 const char* Name() const override
{
66 return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
// The condition below fires when a positive limit has been reached; the
// return statements themselves are outside this view.
69 bool ShouldMerge(const std::vector
<Slice
>& operands
) const override
{
70 if (operands
.size() > 0 && limit_
> 0 && operands
.size() >= limit_
) {
81 options
.create_if_missing
= true;
82 // Use only the latest two merge operands.
83 options
.merge_operator
=
84 std::make_shared
<LimitedStringAppendMergeOp
>(2, ',');
87 // All K1 values are in memtable.
88 ASSERT_OK(Merge("k1", "a"));
89 ASSERT_OK(Merge("k1", "b"));
90 ASSERT_OK(Merge("k1", "c"));
91 ASSERT_OK(Merge("k1", "d"));
93 ASSERT_TRUE(db_
->Get(ReadOptions(), "k1", &value
).ok());
94 // Make sure that only the latest two merge operands are used. If this was
95 // not the case the value would be "a,b,c,d".
96 ASSERT_EQ(value
, "c,d");
98 // All K2 values are flushed to L0 into a single file.
99 ASSERT_OK(Merge("k2", "a"));
100 ASSERT_OK(Merge("k2", "b"));
101 ASSERT_OK(Merge("k2", "c"));
102 ASSERT_OK(Merge("k2", "d"));
104 ASSERT_TRUE(db_
->Get(ReadOptions(), "k2", &value
).ok());
105 ASSERT_EQ(value
, "c,d");
107 // All K3 values are flushed and are in different files.
108 ASSERT_OK(Merge("k3", "ab"));
110 ASSERT_OK(Merge("k3", "bc"));
112 ASSERT_OK(Merge("k3", "cd"));
114 ASSERT_OK(Merge("k3", "de"));
115 ASSERT_TRUE(db_
->Get(ReadOptions(), "k3", &value
).ok());
116 ASSERT_EQ(value
, "cd,de");
118 // All K4 values are in different levels
119 ASSERT_OK(Merge("k4", "ab"));
122 ASSERT_OK(Merge("k4", "bc"));
125 ASSERT_OK(Merge("k4", "cd"));
128 ASSERT_OK(Merge("k4", "de"));
129 ASSERT_TRUE(db_
->Get(ReadOptions(), "k4", &value
).ok());
130 ASSERT_EQ(value
, "cd,de");
// A merge failure during Get must surface as a Corruption status, and the
// stored operands must be left as written. TestPutOperator comes from the
// test utilities. It is assumed here to fail when it sees the "corrupted"
// operand; the IsCorruption assertion below depends on that.
133 TEST_F(DBMergeOperatorTest
, MergeErrorOnRead
) {
135 options
.create_if_missing
= true;
136 options
.merge_operator
.reset(new TestPutOperator());
139 ASSERT_OK(Merge("k1", "v1"));
140 ASSERT_OK(Merge("k1", "corrupted"));
142 ASSERT_TRUE(db_
->Get(ReadOptions(), "k1", &value
).IsCorruption());
// Both operands are still stored, listed in reverse write order.
143 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
// With max_successive_merges = 3, the third Merge on the same key attempts a
// merge at write time. The test expects that merge to fail, the write itself
// to still succeed, and all three operands to remain unmerged.
146 TEST_F(DBMergeOperatorTest
, MergeErrorOnWrite
) {
148 options
.create_if_missing
= true;
149 options
.merge_operator
.reset(new TestPutOperator());
150 options
.max_successive_merges
= 3;
153 ASSERT_OK(Merge("k1", "v1"));
154 ASSERT_OK(Merge("k1", "v2"));
155 // Will trigger a merge when hitting max_successive_merges and the merge
156 // will fail. The delta will be inserted nevertheless.
157 ASSERT_OK(Merge("k1", "corrupted"));
158 // Data should stay unmerged after the error.
159 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
// A merge failure during iteration must leave the iterator invalid with a
// Corruption status, and the stored entries must stay as written.
// Two layouts are covered:
//   * the failing key (k1) sorts before a clean key;
//   * the failing key (k2) sorts after a clean key.
// NOTE(review): `iter` is a raw owning Iterator* that is reassigned; the
// deletes and the positioning calls (Seek/Next/Prev) are outside this view.
// Confirm each iterator is deleted, or hold it in a std::unique_ptr.
162 TEST_F(DBMergeOperatorTest
, MergeErrorOnIteration
) {
164 options
.create_if_missing
= true;
165 options
.merge_operator
.reset(new TestPutOperator());
// Case 1: the corrupted operand belongs to k1; k2 is a plain Put.
168 DestroyAndReopen(options
);
169 ASSERT_OK(Merge("k1", "v1"));
170 ASSERT_OK(Merge("k1", "corrupted"));
171 ASSERT_OK(Put("k2", "v2"));
172 auto* iter
= db_
->NewIterator(ReadOptions());
// Expected: invalid iterator carrying a Corruption status.
174 ASSERT_FALSE(iter
->Valid());
175 ASSERT_TRUE(iter
->status().IsCorruption());
// A fresh iterator is valid at first, then hits Corruption after it moves.
177 iter
= db_
->NewIterator(ReadOptions());
179 ASSERT_TRUE(iter
->Valid());
180 ASSERT_OK(iter
->status());
182 ASSERT_FALSE(iter
->Valid());
183 ASSERT_TRUE(iter
->status().IsCorruption());
185 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
// Case 2: k1 is a clean merge; the corrupted operand sits on top of k2's Put.
187 DestroyAndReopen(options
);
188 ASSERT_OK(Merge("k1", "v1"));
189 ASSERT_OK(Put("k2", "v2"));
190 ASSERT_OK(Merge("k2", "corrupted"));
191 iter
= db_
->NewIterator(ReadOptions());
193 ASSERT_TRUE(iter
->Valid());
194 ASSERT_OK(iter
->status());
196 ASSERT_FALSE(iter
->Valid());
197 ASSERT_TRUE(iter
->status().IsCorruption());
199 VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
// Value-parameterized variant of DBMergeOperatorTest. The bool parameter is
// stored in disable_block_cache_; the tests copy it into
// BlockBasedTableOptions::no_block_cache.
203 class MergeOperatorPinningTest
: public DBMergeOperatorTest
,
204 public testing::WithParamInterface
<bool> {
206 MergeOperatorPinningTest() { disable_block_cache_
= GetParam(); }
208 bool disable_block_cache_
;
// Instantiates the TEST_P cases of MergeOperatorPinningTest. The parameter
// generator (presumably both bool values) is outside this view.
211 INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest
, MergeOperatorPinningTest
,
// Writes several merge operands per key into files whose block_size is 1
// (one entry per block). It spreads those files over several levels, then
// checks the DB against an in-memory model (true_data). With
// StringAppendTESTOperator, each key's expected value is its operands joined
// by ','.
215 TEST_P(MergeOperatorPinningTest
, OperandsMultiBlocks
) {
216 Options options
= CurrentOptions();
217 BlockBasedTableOptions table_options
;
218 table_options
.block_size
= 1; // every block will contain one entry
219 table_options
.no_block_cache
= disable_block_cache_
;
220 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
221 options
.merge_operator
= MergeOperators::CreateStringAppendTESTOperator();
// Keep write stalls and automatic compactions out of the way so the test
// controls the file layout itself.
222 options
.level0_slowdown_writes_trigger
= (1 << 30);
223 options
.level0_stop_writes_trigger
= (1 << 30);
224 options
.disable_auto_compactions
= true;
225 DestroyAndReopen(options
);
227 const int kKeysPerFile
= 10;
228 const int kOperandsPerKeyPerFile
= 7;
229 const int kOperandSize
= 100;
230 // Files to write in L0 before moving them to a lower level
231 const int kFilesPerLevel
= 3;
// Expected merged value per key.
234 std::map
<std::string
, std::string
> true_data
;
239 for (int j
= 0; j
< kKeysPerFile
; j
++) {
// Keys cycle through 35 distinct values (key_id % 35).
240 std::string key
= Key(key_id
% 35);
242 for (int k
= 0; k
< kOperandsPerKeyPerFile
; k
++) {
243 std::string val
= RandomString(&rnd
, kOperandSize
);
244 ASSERT_OK(db_
->Merge(WriteOptions(), key
, val
));
// Model of the string-append operator: first operand as-is, later ones
// appended after a ','.
245 if (true_data
[key
].size() == 0) {
246 true_data
[key
] = val
;
248 true_data
[key
] += "," + val
;
253 if (lvl_to_fill
== -1) {
254 // Keep last batch in memtable and stop
259 if (batch_num
% kFilesPerLevel
== 0) {
260 if (lvl_to_fill
!= 0) {
261 MoveFilesToLevel(lvl_to_fill
);
// Expected per-level file counts as reported by FilesPerLevel().
273 ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
275 VerifyDBFromMap(true_data
);
// Randomized workload with the max merge operator. It runs four phases:
//   1. random base Puts;
//   2. kTotalMerges random Merges;
//   3. random overwrites;
//   4. random Deletes.
// The DB must then match the in-memory model (true_data). The body is
// repeated while ChangeOptions(kSkipMergePut) returns true; the opening
// `do {` and the `rnd` generator are outside this view.
278 TEST_P(MergeOperatorPinningTest
, Randomized
) {
280 Options options
= CurrentOptions();
281 options
.merge_operator
= MergeOperators::CreateMaxOperator();
282 BlockBasedTableOptions table_options
;
283 table_options
.no_block_cache
= disable_block_cache_
;
284 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
285 DestroyAndReopen(options
);
// Expected value per key after all phases.
288 std::map
<std::string
, std::string
> true_data
;
290 const int kTotalMerges
= 5000;
291 // Every key gets ~10 operands
292 const int kKeyRange
= kTotalMerges
/ 10;
293 const int kOperandSize
= 20;
294 const int kNumPutBefore
= kKeyRange
/ 10; // 10% value
295 const int kNumPutAfter
= kKeyRange
/ 10; // 10% overwrite
296 const int kNumDelete
= kKeyRange
/ 10; // 10% delete
298 // kNumPutBefore keys will have base values
299 for (int i
= 0; i
< kNumPutBefore
; i
++) {
300 std::string key
= Key(rnd
.Next() % kKeyRange
);
301 std::string value
= RandomString(&rnd
, kOperandSize
);
302 ASSERT_OK(db_
->Put(WriteOptions(), key
, value
));
304 true_data
[key
] = value
;
307 // Do kTotalMerges merges
308 for (int i
= 0; i
< kTotalMerges
; i
++) {
309 std::string key
= Key(rnd
.Next() % kKeyRange
);
310 std::string value
= RandomString(&rnd
, kOperandSize
);
311 ASSERT_OK(db_
->Merge(WriteOptions(), key
, value
));
// Model of the max operator. operator[] default-inserts "", which compares
// less than any non-empty operand, so the first operand on a key is stored.
313 if (true_data
[key
] < value
) {
314 true_data
[key
] = value
;
318 // Overwrite random kNumPutAfter keys
319 for (int i
= 0; i
< kNumPutAfter
; i
++) {
320 std::string key
= Key(rnd
.Next() % kKeyRange
);
321 std::string value
= RandomString(&rnd
, kOperandSize
);
322 ASSERT_OK(db_
->Put(WriteOptions(), key
, value
));
324 true_data
[key
] = value
;
327 // Delete random kNumDelete keys
328 for (int i
= 0; i
< kNumDelete
; i
++) {
329 std::string key
= Key(rnd
.Next() % kKeyRange
);
330 ASSERT_OK(db_
->Delete(WriteOptions(), key
));
332 true_data
.erase(key
);
335 VerifyDBFromMap(true_data
);
337 } while (ChangeOptions(kSkipMergePut
));
// MergeOperator wrapper that delegates to `merge_op_` and exposes two
// std::function hooks, before_merge_ and after_merge_, both defaulting to
// no-ops. Tests assign these hooks to run code around merges. The places where
// FullMergeV2 invokes them are outside this view.
340 class MergeOperatorHook
: public MergeOperator
{
342 explicit MergeOperatorHook(std::shared_ptr
<MergeOperator
> _merge_op
)
343 : merge_op_(_merge_op
) {}
// Forwards the full merge to the wrapped operator and keeps its result.
345 bool FullMergeV2(const MergeOperationInput
& merge_in
,
346 MergeOperationOutput
* merge_out
) const override
{
348 bool res
= merge_op_
->FullMergeV2(merge_in
, merge_out
);
// Reports the wrapped operator's name.
353 const char* Name() const override
{ return merge_op_
->Name(); }
// Wrapped operator that performs the actual merge.
355 std::shared_ptr
<MergeOperator
> merge_op_
;
356 std::function
<void()> before_merge_
= []() {};
357 std::function
<void()> after_merge_
= []() {};
// Before every merge, evicts all table files from the table cache and, when
// a block cache exists, shrinks it to capacity 1. This forces unreferenced
// blocks out of the cache during the merge. After the merge, the capacity is
// restored. Results are verified before and after a full compaction.
360 TEST_P(MergeOperatorPinningTest
, EvictCacheBeforeMerge
) {
361 Options options
= CurrentOptions();
// merge_hook wraps the max operator so the test can run code around each
// merge (its declaration line is outside this view).
364 std::make_shared
<MergeOperatorHook
>(MergeOperators::CreateMaxOperator());
365 options
.merge_operator
= merge_hook
;
366 options
.disable_auto_compactions
= true;
367 options
.level0_slowdown_writes_trigger
= (1 << 30);
368 options
.level0_stop_writes_trigger
= (1 << 30);
369 options
.max_open_files
= 20;
370 BlockBasedTableOptions bbto
;
371 bbto
.no_block_cache
= disable_block_cache_
;
// Use an explicit 64MB LRU block cache when the block cache is enabled, so
// the hooks below can resize it; otherwise leave it null.
372 if (bbto
.no_block_cache
== false) {
373 bbto
.block_cache
= NewLRUCache(64 * 1024 * 1024);
375 bbto
.block_cache
= nullptr;
377 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
378 DestroyAndReopen(options
);
380 const int kNumOperands
= 30;
381 const int kNumKeys
= 1000;
382 const int kOperandSize
= 100;
385 // 1000 keys; every key has 30 operands, each in a different file
386 std::map
<std::string
, std::string
> true_data
;
387 for (int i
= 0; i
< kNumOperands
; i
++) {
388 for (int j
= 0; j
< kNumKeys
; j
++) {
389 std::string k
= Key(j
);
390 std::string v
= RandomString(&rnd
, kOperandSize
);
391 ASSERT_OK(db_
->Merge(WriteOptions(), k
, v
));
// Model of the max operator.
393 true_data
[k
] = std::max(true_data
[k
], v
);
// The test expects kNumOperands table files at this point.
398 std::vector
<uint64_t> file_numbers
= ListTableFiles(env_
, dbname_
);
399 ASSERT_EQ(file_numbers
.size(), kNumOperands
);
402 // Code executed before merge operation
403 merge_hook
->before_merge_
= [&]() {
404 // Evict all tables from cache before every merge operation
405 for (uint64_t num
: file_numbers
) {
406 TableCache::Evict(dbfull()->TEST_table_cache(), num
);
408 // Decrease cache capacity to force all unrefed blocks to be evicted
409 if (bbto
.block_cache
) {
410 bbto
.block_cache
->SetCapacity(1);
415 // Code executed after merge operation
416 merge_hook
->after_merge_
= [&]() {
417 // Increase capacity again after doing the merge
418 if (bbto
.block_cache
) {
419 bbto
.block_cache
->SetCapacity(64 * 1024 * 1024);
// merge_cnt and total_reads are declared outside this view. The assertion
// requires the merge count to equal the read count reported by
// VerifyDBFromMap.
424 VerifyDBFromMap(true_data
, &total_reads
);
425 ASSERT_EQ(merge_cnt
, total_reads
);
// NOTE(review): the CompactRange status is discarded here; other tests in
// this file wrap CompactRange in ASSERT_OK.
427 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
429 VerifyDBFromMap(true_data
, &total_reads
);
// Runs a writer thread and a reader thread concurrently.
//   * Writer: issues kNumWrites Merge(Key(k), Key(k)) calls, so each operand
//     equals its key. It does extra work every kNumOperands writes and every
//     127 writes (those bodies are outside this view), and runs a full
//     CompactRange every 317 writes.
//   * Reader: iterates with ReadOptions `ro` (set up outside this view;
//     presumably a tailing iterator). For each of kNumWrites / kNumOperands
//     keys, it waits until the iterator is valid, then checks that both key
//     and value equal Key(i).
432 TEST_P(MergeOperatorPinningTest
, TailingIterator
) {
433 Options options
= CurrentOptions();
434 options
.merge_operator
= MergeOperators::CreateMaxOperator();
435 BlockBasedTableOptions bbto
;
436 bbto
.no_block_cache
= disable_block_cache_
;
437 options
.table_factory
.reset(NewBlockBasedTableFactory(bbto
));
438 DestroyAndReopen(options
);
440 const int kNumOperands
= 100;
441 const int kNumWrites
= 100000;
443 std::function
<void()> writer_func
= [&]() {
445 for (int i
= 0; i
< kNumWrites
; i
++) {
// NOTE(review): the Merge status is ignored; ASSERT_OK(...) would surface
// write failures.
446 db_
->Merge(WriteOptions(), Key(k
), Key(k
));
448 if (i
&& i
% kNumOperands
== 0) {
451 if (i
&& i
% 127 == 0) {
454 if (i
&& i
% 317 == 0) {
455 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
460 std::function
<void()> reader_func
= [&]() {
463 Iterator
* iter
= db_
->NewIterator(ro
);
466 for (int i
= 0; i
< (kNumWrites
/ kNumOperands
); i
++) {
467 while (!iter
->Valid()) {
468 // wait for the key to be written
469 env_
->SleepForMicroseconds(100);
472 ASSERT_EQ(iter
->key(), Key(i
));
473 ASSERT_EQ(iter
->value(), Key(i
));
// Start both threads and wait for them to finish.
481 rocksdb::port::Thread
writer_thread(writer_func
);
482 rocksdb::port::Thread
reader_thread(reader_func
);
484 writer_thread
.join();
485 reader_thread
.join();
488 TEST_F(DBMergeOperatorTest
, TailingIteratorMemtableUnrefedBySomeoneElse
) {
489 Options options
= CurrentOptions();
490 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
491 DestroyAndReopen(options
);
493 // Overview of the test:
494 // * There are two merge operands for the same key: one in an sst file,
495 // another in a memtable.
496 // * Seek a tailing iterator to this key.
497 // * As part of the seek, the iterator will:
498 // (a) first visit the operand in the memtable and tell ForwardIterator
499 // to pin this operand, then
500 // (b) move on to the operand in the sst file, then pass both operands
501 // to merge operator.
502 // * The memtable may get flushed and unreferenced by another thread between
503 // (a) and (b). The test simulates it by flushing the memtable inside a
504 // SyncPoint callback located between (a) and (b).
505 // * In this case it's ForwardIterator's responsibility to keep the memtable
506 // pinned until (b) is complete. There used to be a bug causing
507 // ForwardIterator to not pin it in some circumstances. This test
// NOTE(review): the Merge/Flush statuses below (and the Flush inside the
// first SyncPoint callback) are discarded; ASSERT_OK would catch setup
// failures early.
510 db_
->Merge(WriteOptions(), "key", "sst");
511 db_
->Flush(FlushOptions()); // Switch to SuperVersion A
512 db_
->Merge(WriteOptions(), "key", "memtable");
514 // Pin SuperVersion A
515 std::unique_ptr
<Iterator
> someone_else(db_
->NewIterator(ReadOptions()));
// Each flag records that its SyncPoint fired; the EXPECT_FALSE checks inside
// the callbacks ensure each one fires at most once.
517 bool pushed_first_operand
= false;
518 bool stepped_to_next_operand
= false;
519 rocksdb::SyncPoint::GetInstance()->SetCallBack(
520 "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
521 EXPECT_FALSE(pushed_first_operand
);
522 pushed_first_operand
= true;
523 db_
->Flush(FlushOptions()); // Switch to SuperVersion B
525 rocksdb::SyncPoint::GetInstance()->SetCallBack(
526 "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
527 EXPECT_FALSE(stepped_to_next_operand
);
528 stepped_to_next_operand
= true;
529 someone_else
.reset(); // Unpin SuperVersion A
531 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
535 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(ro
));
// Both operands must have been merged correctly, and both callbacks must have
// run.
538 ASSERT_TRUE(iter
->status().ok());
539 ASSERT_TRUE(iter
->Valid());
540 EXPECT_EQ(std::string("sst,memtable"), iter
->value().ToString());
541 EXPECT_TRUE(pushed_first_operand
);
542 EXPECT_TRUE(stepped_to_next_operand
);
544 #endif // ROCKSDB_LITE
// Installs a custom SnapshotChecker and reads through GetWithReadCallback.
// Operands the checker reports as not in the snapshot must be hidden, both
// for explicit snapshots and for reads at the latest sequence number. The
// visible result must not change after flush or compaction.
546 TEST_F(DBMergeOperatorTest
, SnapshotCheckerAndReadCallback
) {
547 Options options
= CurrentOptions();
548 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
549 DestroyAndReopen(options
);
// Snapshot checker with a hard-coded visibility table keyed on snapshot_seq.
// Most switch cases are outside this view.
551 class TestSnapshotChecker
: public SnapshotChecker
{
// Maps IsInSnapshot()'s bool onto SnapshotCheckerResult.
553 SnapshotCheckerResult
CheckInSnapshot(
554 SequenceNumber seq
, SequenceNumber snapshot_seq
) const override
{
555 return IsInSnapshot(seq
, snapshot_seq
)
556 ? SnapshotCheckerResult::kInSnapshot
557 : SnapshotCheckerResult::kNotInSnapshot
;
560 bool IsInSnapshot(SequenceNumber seq
, SequenceNumber snapshot_seq
) const {
561 switch (snapshot_seq
) {
567 // seq = 2 not visible to snapshot with seq = 2
572 // seq = 4 not visible to snapshot with seq = 4
575 // seq >=4 is uncommitted
// NOTE(review): raw `new` handed to SetSnapshotChecker. Presumably the DB
// takes ownership; confirm, otherwise this leaks.
580 TestSnapshotChecker
* snapshot_checker
= new TestSnapshotChecker();
581 dbfull()->SetSnapshotChecker(snapshot_checker
);
584 ASSERT_OK(Merge("foo", "v1"));
585 ASSERT_EQ(1, db_
->GetLatestSequenceNumber());
586 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo"));
587 ASSERT_OK(Merge("foo", "v2"));
588 ASSERT_EQ(2, db_
->GetLatestSequenceNumber());
589 // v2 is not visible to latest snapshot, which has seq = 2.
590 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo"));
591 // Take a snapshot with seq = 2.
592 const Snapshot
* snapshot1
= db_
->GetSnapshot();
593 ASSERT_EQ(2, snapshot1
->GetSequenceNumber());
594 // v2 is not visible to snapshot1, which has seq = 2
595 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo", snapshot1
));
597 // Verify flush doesn't alter the result.
599 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo", snapshot1
));
600 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo"));
602 ASSERT_OK(Merge("foo", "v3"));
603 ASSERT_EQ(3, db_
->GetLatestSequenceNumber());
604 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker
, "foo"));
605 ASSERT_OK(Merge("foo", "v4"));
606 ASSERT_EQ(4, db_
->GetLatestSequenceNumber());
607 // v4 is not visible to latest snapshot, which has seq = 4.
608 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker
, "foo"));
609 const Snapshot
* snapshot2
= db_
->GetSnapshot();
610 ASSERT_EQ(4, snapshot2
->GetSequenceNumber());
611 // v4 is not visible to snapshot2, which has seq = 4.
612 ASSERT_EQ("v1,v2,v3",
613 GetWithReadCallback(snapshot_checker
, "foo", snapshot2
));
615 // Verify flush doesn't alter the result.
617 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo", snapshot1
));
618 ASSERT_EQ("v1,v2,v3",
619 GetWithReadCallback(snapshot_checker
, "foo", snapshot2
));
620 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker
, "foo"));
622 ASSERT_OK(Merge("foo", "v5"));
623 ASSERT_EQ(5, db_
->GetLatestSequenceNumber());
625 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker
, "foo"));
627 // full manual compaction.
628 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
630 // Verify compaction doesn't alter the result.
631 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker
, "foo", snapshot1
));
632 ASSERT_EQ("v1,v2,v3",
633 GetWithReadCallback(snapshot_checker
, "foo", snapshot2
));
634 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker
, "foo"));
// Release both snapshots taken above.
636 db_
->ReleaseSnapshot(snapshot1
);
637 db_
->ReleaseSnapshot(snapshot2
);
640 } // namespace rocksdb
// Test entry point: installs the stack-trace handler, initializes GoogleTest
// with the command-line arguments, and runs all registered tests.
642 int main(int argc
, char** argv
) {
643 rocksdb::port::InstallStackTraceHandler();
644 ::testing::InitGoogleTest(&argc
, argv
);
645 return RUN_ALL_TESTS();