]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_merge_operator_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_merge_operator_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #include <string>
6 #include <vector>
7
8 #include "db/db_test_util.h"
9 #include "db/forward_iterator.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/merge_operator.h"
12 #include "util/random.h"
13 #include "utilities/merge_operators.h"
14 #include "utilities/merge_operators/string_append/stringappend2.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 class TestReadCallback : public ReadCallback {
19 public:
20 TestReadCallback(SnapshotChecker* snapshot_checker,
21 SequenceNumber snapshot_seq)
22 : ReadCallback(snapshot_seq),
23 snapshot_checker_(snapshot_checker),
24 snapshot_seq_(snapshot_seq) {}
25
26 bool IsVisibleFullCheck(SequenceNumber seq) override {
27 return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
28 SnapshotCheckerResult::kInSnapshot;
29 }
30
31 private:
32 SnapshotChecker* snapshot_checker_;
33 SequenceNumber snapshot_seq_;
34 };
35
36 // Test merge operator functionality.
37 class DBMergeOperatorTest : public DBTestBase {
38 public:
39 DBMergeOperatorTest()
40 : DBTestBase("db_merge_operator_test", /*env_do_fsync=*/false) {}
41
42 std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
43 const Slice& key,
44 const Snapshot* snapshot = nullptr) {
45 SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
46 : snapshot->GetSequenceNumber();
47 TestReadCallback read_callback(snapshot_checker, seq);
48 ReadOptions read_opt;
49 read_opt.snapshot = snapshot;
50 PinnableSlice value;
51 DBImpl::GetImplOptions get_impl_options;
52 get_impl_options.column_family = db_->DefaultColumnFamily();
53 get_impl_options.value = &value;
54 get_impl_options.callback = &read_callback;
55 Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
56 if (!s.ok()) {
57 return s.ToString();
58 }
59 return value.ToString();
60 }
61 };
62
63 TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
64 class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
65 public:
66 LimitedStringAppendMergeOp(int limit, char delim)
67 : StringAppendTESTOperator(delim), limit_(limit) {}
68
69 const char* Name() const override {
70 return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
71 }
72
73 bool ShouldMerge(const std::vector<Slice>& operands) const override {
74 if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
75 return true;
76 }
77 return false;
78 }
79
80 private:
81 size_t limit_ = 0;
82 };
83
84 Options options;
85 options.create_if_missing = true;
86 // Use only the latest two merge operands.
87 options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
88 options.env = env_;
89 Reopen(options);
90 // All K1 values are in memtable.
91 ASSERT_OK(Merge("k1", "a"));
92 ASSERT_OK(Merge("k1", "b"));
93 ASSERT_OK(Merge("k1", "c"));
94 ASSERT_OK(Merge("k1", "d"));
95 std::string value;
96 ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
97 // Make sure that only the latest two merge operands are used. If this was
98 // not the case the value would be "a,b,c,d".
99 ASSERT_EQ(value, "c,d");
100
101 // All K2 values are flushed to L0 into a single file.
102 ASSERT_OK(Merge("k2", "a"));
103 ASSERT_OK(Merge("k2", "b"));
104 ASSERT_OK(Merge("k2", "c"));
105 ASSERT_OK(Merge("k2", "d"));
106 ASSERT_OK(Flush());
107 ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
108 ASSERT_EQ(value, "c,d");
109
110 // All K3 values are flushed and are in different files.
111 ASSERT_OK(Merge("k3", "ab"));
112 ASSERT_OK(Flush());
113 ASSERT_OK(Merge("k3", "bc"));
114 ASSERT_OK(Flush());
115 ASSERT_OK(Merge("k3", "cd"));
116 ASSERT_OK(Flush());
117 ASSERT_OK(Merge("k3", "de"));
118 ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
119 ASSERT_EQ(value, "cd,de");
120
121 // All K4 values are in different levels
122 ASSERT_OK(Merge("k4", "ab"));
123 ASSERT_OK(Flush());
124 MoveFilesToLevel(4);
125 ASSERT_OK(Merge("k4", "bc"));
126 ASSERT_OK(Flush());
127 MoveFilesToLevel(3);
128 ASSERT_OK(Merge("k4", "cd"));
129 ASSERT_OK(Flush());
130 MoveFilesToLevel(1);
131 ASSERT_OK(Merge("k4", "de"));
132 ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
133 ASSERT_EQ(value, "cd,de");
134 }
135
136 TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
137 Options options;
138 options.create_if_missing = true;
139 options.merge_operator.reset(new TestPutOperator());
140 options.env = env_;
141 Reopen(options);
142 ASSERT_OK(Merge("k1", "v1"));
143 ASSERT_OK(Merge("k1", "corrupted"));
144 std::string value;
145 ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
146 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
147 }
148
149 TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
150 Options options;
151 options.create_if_missing = true;
152 options.merge_operator.reset(new TestPutOperator());
153 options.max_successive_merges = 3;
154 options.env = env_;
155 Reopen(options);
156 ASSERT_OK(Merge("k1", "v1"));
157 ASSERT_OK(Merge("k1", "v2"));
158 // Will trigger a merge when hitting max_successive_merges and the merge
159 // will fail. The delta will be inserted nevertheless.
160 ASSERT_OK(Merge("k1", "corrupted"));
161 // Data should stay unmerged after the error.
162 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
163 }
164
165 TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
166 Options options;
167 options.create_if_missing = true;
168 options.merge_operator.reset(new TestPutOperator());
169 options.env = env_;
170
171 DestroyAndReopen(options);
172 ASSERT_OK(Merge("k1", "v1"));
173 ASSERT_OK(Merge("k1", "corrupted"));
174 ASSERT_OK(Put("k2", "v2"));
175 auto* iter = db_->NewIterator(ReadOptions());
176 iter->Seek("k1");
177 ASSERT_FALSE(iter->Valid());
178 ASSERT_TRUE(iter->status().IsCorruption());
179 delete iter;
180 iter = db_->NewIterator(ReadOptions());
181 iter->Seek("k2");
182 ASSERT_TRUE(iter->Valid());
183 ASSERT_OK(iter->status());
184 iter->Prev();
185 ASSERT_FALSE(iter->Valid());
186 ASSERT_TRUE(iter->status().IsCorruption());
187 delete iter;
188 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
189
190 DestroyAndReopen(options);
191 ASSERT_OK(Merge("k1", "v1"));
192 ASSERT_OK(Put("k2", "v2"));
193 ASSERT_OK(Merge("k2", "corrupted"));
194 iter = db_->NewIterator(ReadOptions());
195 iter->Seek("k1");
196 ASSERT_TRUE(iter->Valid());
197 ASSERT_OK(iter->status());
198 iter->Next();
199 ASSERT_FALSE(iter->Valid());
200 ASSERT_TRUE(iter->status().IsCorruption());
201 delete iter;
202 VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
203 }
204
205 class MergeOperatorPinningTest : public DBMergeOperatorTest,
206 public testing::WithParamInterface<bool> {
207 public:
208 MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
209
210 bool disable_block_cache_;
211 };
212
213 INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
214 ::testing::Bool());
215
216 #ifndef ROCKSDB_LITE
217 TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
218 Options options = CurrentOptions();
219 BlockBasedTableOptions table_options;
220 table_options.block_size = 1; // every block will contain one entry
221 table_options.no_block_cache = disable_block_cache_;
222 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
223 options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
224 options.level0_slowdown_writes_trigger = (1 << 30);
225 options.level0_stop_writes_trigger = (1 << 30);
226 options.disable_auto_compactions = true;
227 DestroyAndReopen(options);
228
229 const int kKeysPerFile = 10;
230 const int kOperandsPerKeyPerFile = 7;
231 const int kOperandSize = 100;
232 // Filse to write in L0 before compacting to lower level
233 const int kFilesPerLevel = 3;
234
235 Random rnd(301);
236 std::map<std::string, std::string> true_data;
237 int batch_num = 1;
238 int lvl_to_fill = 4;
239 int key_id = 0;
240 while (true) {
241 for (int j = 0; j < kKeysPerFile; j++) {
242 std::string key = Key(key_id % 35);
243 key_id++;
244 for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
245 std::string val = rnd.RandomString(kOperandSize);
246 ASSERT_OK(db_->Merge(WriteOptions(), key, val));
247 if (true_data[key].size() == 0) {
248 true_data[key] = val;
249 } else {
250 true_data[key] += "," + val;
251 }
252 }
253 }
254
255 if (lvl_to_fill == -1) {
256 // Keep last batch in memtable and stop
257 break;
258 }
259
260 ASSERT_OK(Flush());
261 if (batch_num % kFilesPerLevel == 0) {
262 if (lvl_to_fill != 0) {
263 MoveFilesToLevel(lvl_to_fill);
264 }
265 lvl_to_fill--;
266 }
267 batch_num++;
268 }
269
270 // 3 L0 files
271 // 1 L1 file
272 // 3 L2 files
273 // 1 L3 file
274 // 3 L4 Files
275 ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
276
277 VerifyDBFromMap(true_data);
278 }
279
280 class MergeOperatorHook : public MergeOperator {
281 public:
282 explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
283 : merge_op_(_merge_op) {}
284
285 bool FullMergeV2(const MergeOperationInput& merge_in,
286 MergeOperationOutput* merge_out) const override {
287 before_merge_();
288 bool res = merge_op_->FullMergeV2(merge_in, merge_out);
289 after_merge_();
290 return res;
291 }
292
293 const char* Name() const override { return merge_op_->Name(); }
294
295 std::shared_ptr<MergeOperator> merge_op_;
296 std::function<void()> before_merge_ = []() {};
297 std::function<void()> after_merge_ = []() {};
298 };
299
300 TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
301 Options options = CurrentOptions();
302
303 auto merge_hook =
304 std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
305 options.merge_operator = merge_hook;
306 options.disable_auto_compactions = true;
307 options.level0_slowdown_writes_trigger = (1 << 30);
308 options.level0_stop_writes_trigger = (1 << 30);
309 options.max_open_files = 20;
310 BlockBasedTableOptions bbto;
311 bbto.no_block_cache = disable_block_cache_;
312 if (bbto.no_block_cache == false) {
313 bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
314 } else {
315 bbto.block_cache = nullptr;
316 }
317 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
318 DestroyAndReopen(options);
319
320 const int kNumOperands = 30;
321 const int kNumKeys = 1000;
322 const int kOperandSize = 100;
323 Random rnd(301);
324
325 // 1000 keys every key have 30 operands, every operand is in a different file
326 std::map<std::string, std::string> true_data;
327 for (int i = 0; i < kNumOperands; i++) {
328 for (int j = 0; j < kNumKeys; j++) {
329 std::string k = Key(j);
330 std::string v = rnd.RandomString(kOperandSize);
331 ASSERT_OK(db_->Merge(WriteOptions(), k, v));
332
333 true_data[k] = std::max(true_data[k], v);
334 }
335 ASSERT_OK(Flush());
336 }
337
338 std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
339 ASSERT_EQ(file_numbers.size(), kNumOperands);
340 int merge_cnt = 0;
341
342 // Code executed before merge operation
343 merge_hook->before_merge_ = [&]() {
344 // Evict all tables from cache before every merge operation
345 auto* table_cache = dbfull()->TEST_table_cache();
346 for (uint64_t num : file_numbers) {
347 TableCache::Evict(table_cache, num);
348 }
349 // Decrease cache capacity to force all unrefed blocks to be evicted
350 if (bbto.block_cache) {
351 bbto.block_cache->SetCapacity(1);
352 }
353 merge_cnt++;
354 };
355
356 // Code executed after merge operation
357 merge_hook->after_merge_ = [&]() {
358 // Increase capacity again after doing the merge
359 if (bbto.block_cache) {
360 bbto.block_cache->SetCapacity(64 * 1024 * 1024);
361 }
362 };
363
364 size_t total_reads;
365 VerifyDBFromMap(true_data, &total_reads);
366 ASSERT_EQ(merge_cnt, total_reads);
367
368 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
369
370 VerifyDBFromMap(true_data, &total_reads);
371 }
372
373 TEST_P(MergeOperatorPinningTest, TailingIterator) {
374 Options options = CurrentOptions();
375 options.merge_operator = MergeOperators::CreateMaxOperator();
376 BlockBasedTableOptions bbto;
377 bbto.no_block_cache = disable_block_cache_;
378 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
379 DestroyAndReopen(options);
380
381 const int kNumOperands = 100;
382 const int kNumWrites = 100000;
383
384 std::function<void()> writer_func = [&]() {
385 int k = 0;
386 for (int i = 0; i < kNumWrites; i++) {
387 ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));
388
389 if (i && i % kNumOperands == 0) {
390 k++;
391 }
392 if (i && i % 127 == 0) {
393 ASSERT_OK(Flush());
394 }
395 if (i && i % 317 == 0) {
396 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
397 }
398 }
399 };
400
401 std::function<void()> reader_func = [&]() {
402 ReadOptions ro;
403 ro.tailing = true;
404 Iterator* iter = db_->NewIterator(ro);
405 ASSERT_OK(iter->status());
406 iter->SeekToFirst();
407 for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
408 while (!iter->Valid()) {
409 // wait for the key to be written
410 env_->SleepForMicroseconds(100);
411 iter->Seek(Key(i));
412 }
413 ASSERT_EQ(iter->key(), Key(i));
414 ASSERT_EQ(iter->value(), Key(i));
415
416 iter->Next();
417 }
418 ASSERT_OK(iter->status());
419
420 delete iter;
421 };
422
423 ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
424 ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
425
426 writer_thread.join();
427 reader_thread.join();
428 }
429
430 TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
431 Options options = CurrentOptions();
432 options.merge_operator = MergeOperators::CreateStringAppendOperator();
433 DestroyAndReopen(options);
434
435 // Overview of the test:
436 // * There are two merge operands for the same key: one in an sst file,
437 // another in a memtable.
438 // * Seek a tailing iterator to this key.
439 // * As part of the seek, the iterator will:
440 // (a) first visit the operand in the memtable and tell ForwardIterator
441 // to pin this operand, then
442 // (b) move on to the operand in the sst file, then pass both operands
443 // to merge operator.
444 // * The memtable may get flushed and unreferenced by another thread between
445 // (a) and (b). The test simulates it by flushing the memtable inside a
446 // SyncPoint callback located between (a) and (b).
447 // * In this case it's ForwardIterator's responsibility to keep the memtable
448 // pinned until (b) is complete. There used to be a bug causing
449 // ForwardIterator to not pin it in some circumstances. This test
450 // reproduces it.
451
452 ASSERT_OK(db_->Merge(WriteOptions(), "key", "sst"));
453 ASSERT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion A
454 ASSERT_OK(db_->Merge(WriteOptions(), "key", "memtable"));
455
456 // Pin SuperVersion A
457 std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
458 ASSERT_OK(someone_else->status());
459
460 bool pushed_first_operand = false;
461 bool stepped_to_next_operand = false;
462 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
463 "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
464 EXPECT_FALSE(pushed_first_operand);
465 pushed_first_operand = true;
466 EXPECT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion B
467 });
468 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
469 "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
470 EXPECT_FALSE(stepped_to_next_operand);
471 stepped_to_next_operand = true;
472 someone_else.reset(); // Unpin SuperVersion A
473 });
474 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
475
476 ReadOptions ro;
477 ro.tailing = true;
478 std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
479 iter->Seek("key");
480
481 ASSERT_OK(iter->status());
482 ASSERT_TRUE(iter->Valid());
483 EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
484 EXPECT_TRUE(pushed_first_operand);
485 EXPECT_TRUE(stepped_to_next_operand);
486 }
487 #endif // ROCKSDB_LITE
488
489 TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
490 Options options = CurrentOptions();
491 options.merge_operator = MergeOperators::CreateStringAppendOperator();
492 DestroyAndReopen(options);
493
494 class TestSnapshotChecker : public SnapshotChecker {
495 public:
496 SnapshotCheckerResult CheckInSnapshot(
497 SequenceNumber seq, SequenceNumber snapshot_seq) const override {
498 return IsInSnapshot(seq, snapshot_seq)
499 ? SnapshotCheckerResult::kInSnapshot
500 : SnapshotCheckerResult::kNotInSnapshot;
501 }
502
503 bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
504 switch (snapshot_seq) {
505 case 0:
506 return seq == 0;
507 case 1:
508 return seq <= 1;
509 case 2:
510 // seq = 2 not visible to snapshot with seq = 2
511 return seq <= 1;
512 case 3:
513 return seq <= 3;
514 case 4:
515 // seq = 4 not visible to snpahost with seq = 4
516 return seq <= 3;
517 default:
518 // seq >=4 is uncommitted
519 return seq <= 4;
520 };
521 }
522 };
523 TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
524 dbfull()->SetSnapshotChecker(snapshot_checker);
525
526 std::string value;
527 ASSERT_OK(Merge("foo", "v1"));
528 ASSERT_EQ(1, db_->GetLatestSequenceNumber());
529 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
530 ASSERT_OK(Merge("foo", "v2"));
531 ASSERT_EQ(2, db_->GetLatestSequenceNumber());
532 // v2 is not visible to latest snapshot, which has seq = 2.
533 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
534 // Take a snapshot with seq = 2.
535 const Snapshot* snapshot1 = db_->GetSnapshot();
536 ASSERT_EQ(2, snapshot1->GetSequenceNumber());
537 // v2 is not visible to snapshot1, which has seq = 2
538 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
539
540 // Verify flush doesn't alter the result.
541 ASSERT_OK(Flush());
542 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
543 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
544
545 ASSERT_OK(Merge("foo", "v3"));
546 ASSERT_EQ(3, db_->GetLatestSequenceNumber());
547 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
548 ASSERT_OK(Merge("foo", "v4"));
549 ASSERT_EQ(4, db_->GetLatestSequenceNumber());
550 // v4 is not visible to latest snapshot, which has seq = 4.
551 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
552 const Snapshot* snapshot2 = db_->GetSnapshot();
553 ASSERT_EQ(4, snapshot2->GetSequenceNumber());
554 // v4 is not visible to snapshot2, which has seq = 4.
555 ASSERT_EQ("v1,v2,v3",
556 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
557
558 // Verify flush doesn't alter the result.
559 ASSERT_OK(Flush());
560 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
561 ASSERT_EQ("v1,v2,v3",
562 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
563 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
564
565 ASSERT_OK(Merge("foo", "v5"));
566 ASSERT_EQ(5, db_->GetLatestSequenceNumber());
567 // v5 is uncommitted
568 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
569
570 // full manual compaction.
571 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
572
573 // Verify compaction doesn't alter the result.
574 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
575 ASSERT_EQ("v1,v2,v3",
576 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
577 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
578
579 db_->ReleaseSnapshot(snapshot1);
580 db_->ReleaseSnapshot(snapshot2);
581 }
582
583 class PerConfigMergeOperatorPinningTest
584 : public DBMergeOperatorTest,
585 public testing::WithParamInterface<std::tuple<bool, int>> {
586 public:
587 PerConfigMergeOperatorPinningTest() {
588 std::tie(disable_block_cache_, option_config_) = GetParam();
589 }
590
591 bool disable_block_cache_;
592 };
593
594 INSTANTIATE_TEST_CASE_P(
595 MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
596 ::testing::Combine(::testing::Bool(),
597 ::testing::Range(static_cast<int>(DBTestBase::kDefault),
598 static_cast<int>(DBTestBase::kEnd))));
599
600 TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
601 if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
602 return;
603 }
604
605 Options options = CurrentOptions();
606 options.merge_operator = MergeOperators::CreateMaxOperator();
607 BlockBasedTableOptions table_options;
608 table_options.no_block_cache = disable_block_cache_;
609 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
610 DestroyAndReopen(options);
611
612 Random rnd(301);
613 std::map<std::string, std::string> true_data;
614
615 const int kTotalMerges = 5000;
616 // Every key gets ~10 operands
617 const int kKeyRange = kTotalMerges / 10;
618 const int kOperandSize = 20;
619 const int kNumPutBefore = kKeyRange / 10; // 10% value
620 const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
621 const int kNumDelete = kKeyRange / 10; // 10% delete
622
623 // kNumPutBefore keys will have base values
624 for (int i = 0; i < kNumPutBefore; i++) {
625 std::string key = Key(rnd.Next() % kKeyRange);
626 std::string value = rnd.RandomString(kOperandSize);
627 ASSERT_OK(db_->Put(WriteOptions(), key, value));
628
629 true_data[key] = value;
630 }
631
632 // Do kTotalMerges merges
633 for (int i = 0; i < kTotalMerges; i++) {
634 std::string key = Key(rnd.Next() % kKeyRange);
635 std::string value = rnd.RandomString(kOperandSize);
636 ASSERT_OK(db_->Merge(WriteOptions(), key, value));
637
638 if (true_data[key] < value) {
639 true_data[key] = value;
640 }
641 }
642
643 // Overwrite random kNumPutAfter keys
644 for (int i = 0; i < kNumPutAfter; i++) {
645 std::string key = Key(rnd.Next() % kKeyRange);
646 std::string value = rnd.RandomString(kOperandSize);
647 ASSERT_OK(db_->Put(WriteOptions(), key, value));
648
649 true_data[key] = value;
650 }
651
652 // Delete random kNumDelete keys
653 for (int i = 0; i < kNumDelete; i++) {
654 std::string key = Key(rnd.Next() % kKeyRange);
655 ASSERT_OK(db_->Delete(WriteOptions(), key));
656
657 true_data.erase(key);
658 }
659
660 VerifyDBFromMap(true_data);
661 }
662
663 } // namespace ROCKSDB_NAMESPACE
664
665 int main(int argc, char** argv) {
666 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
667 ::testing::InitGoogleTest(&argc, argv);
668 return RUN_ALL_TESTS();
669 }