]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_merge_operator_test.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / rocksdb / db / db_merge_operator_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5#include <string>
6#include <vector>
7
8#include "db/db_test_util.h"
9#include "db/forward_iterator.h"
10#include "port/stack_trace.h"
11fdf7f2 11#include "rocksdb/merge_operator.h"
7c673cae 12#include "utilities/merge_operators.h"
11fdf7f2 13#include "utilities/merge_operators/string_append/stringappend2.h"
7c673cae 14
f67539c2 15namespace ROCKSDB_NAMESPACE {
7c673cae 16
11fdf7f2
TL
17class TestReadCallback : public ReadCallback {
18 public:
19 TestReadCallback(SnapshotChecker* snapshot_checker,
20 SequenceNumber snapshot_seq)
494da23a
TL
21 : ReadCallback(snapshot_seq),
22 snapshot_checker_(snapshot_checker),
23 snapshot_seq_(snapshot_seq) {}
11fdf7f2 24
494da23a
TL
25 bool IsVisibleFullCheck(SequenceNumber seq) override {
26 return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
27 SnapshotCheckerResult::kInSnapshot;
11fdf7f2
TL
28 }
29
30 private:
31 SnapshotChecker* snapshot_checker_;
32 SequenceNumber snapshot_seq_;
33};
34
7c673cae
FG
35// Test merge operator functionality.
36class DBMergeOperatorTest : public DBTestBase {
37 public:
38 DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
11fdf7f2
TL
39
40 std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
41 const Slice& key,
42 const Snapshot* snapshot = nullptr) {
43 SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
44 : snapshot->GetSequenceNumber();
45 TestReadCallback read_callback(snapshot_checker, seq);
46 ReadOptions read_opt;
47 read_opt.snapshot = snapshot;
48 PinnableSlice value;
f67539c2
TL
49 DBImpl::GetImplOptions get_impl_options;
50 get_impl_options.column_family = db_->DefaultColumnFamily();
51 get_impl_options.value = &value;
52 get_impl_options.callback = &read_callback;
53 Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
11fdf7f2
TL
54 if (!s.ok()) {
55 return s.ToString();
56 }
57 return value.ToString();
58 }
7c673cae
FG
59};
60
11fdf7f2
TL
61TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
62 class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
63 public:
64 LimitedStringAppendMergeOp(int limit, char delim)
65 : StringAppendTESTOperator(delim), limit_(limit) {}
66
67 const char* Name() const override {
68 return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
7c673cae 69 }
11fdf7f2
TL
70
71 bool ShouldMerge(const std::vector<Slice>& operands) const override {
72 if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
73 return true;
7c673cae 74 }
11fdf7f2 75 return false;
7c673cae 76 }
7c673cae 77
11fdf7f2
TL
78 private:
79 size_t limit_ = 0;
80 };
81
82 Options options;
83 options.create_if_missing = true;
84 // Use only the latest two merge operands.
85 options.merge_operator =
86 std::make_shared<LimitedStringAppendMergeOp>(2, ',');
87 options.env = env_;
88 Reopen(options);
89 // All K1 values are in memtable.
90 ASSERT_OK(Merge("k1", "a"));
91 ASSERT_OK(Merge("k1", "b"));
92 ASSERT_OK(Merge("k1", "c"));
93 ASSERT_OK(Merge("k1", "d"));
94 std::string value;
95 ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
96 // Make sure that only the latest two merge operands are used. If this was
97 // not the case the value would be "a,b,c,d".
98 ASSERT_EQ(value, "c,d");
99
100 // All K2 values are flushed to L0 into a single file.
101 ASSERT_OK(Merge("k2", "a"));
102 ASSERT_OK(Merge("k2", "b"));
103 ASSERT_OK(Merge("k2", "c"));
104 ASSERT_OK(Merge("k2", "d"));
105 ASSERT_OK(Flush());
106 ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
107 ASSERT_EQ(value, "c,d");
108
109 // All K3 values are flushed and are in different files.
110 ASSERT_OK(Merge("k3", "ab"));
111 ASSERT_OK(Flush());
112 ASSERT_OK(Merge("k3", "bc"));
113 ASSERT_OK(Flush());
114 ASSERT_OK(Merge("k3", "cd"));
115 ASSERT_OK(Flush());
116 ASSERT_OK(Merge("k3", "de"));
117 ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
118 ASSERT_EQ(value, "cd,de");
119
120 // All K4 values are in different levels
121 ASSERT_OK(Merge("k4", "ab"));
122 ASSERT_OK(Flush());
123 MoveFilesToLevel(4);
124 ASSERT_OK(Merge("k4", "bc"));
125 ASSERT_OK(Flush());
126 MoveFilesToLevel(3);
127 ASSERT_OK(Merge("k4", "cd"));
128 ASSERT_OK(Flush());
129 MoveFilesToLevel(1);
130 ASSERT_OK(Merge("k4", "de"));
131 ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
132 ASSERT_EQ(value, "cd,de");
133}
7c673cae
FG
134
135TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
136 Options options;
137 options.create_if_missing = true;
138 options.merge_operator.reset(new TestPutOperator());
139 options.env = env_;
140 Reopen(options);
141 ASSERT_OK(Merge("k1", "v1"));
142 ASSERT_OK(Merge("k1", "corrupted"));
143 std::string value;
144 ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
145 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
146}
147
148TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
149 Options options;
150 options.create_if_missing = true;
151 options.merge_operator.reset(new TestPutOperator());
152 options.max_successive_merges = 3;
153 options.env = env_;
154 Reopen(options);
155 ASSERT_OK(Merge("k1", "v1"));
156 ASSERT_OK(Merge("k1", "v2"));
157 // Will trigger a merge when hitting max_successive_merges and the merge
158 // will fail. The delta will be inserted nevertheless.
159 ASSERT_OK(Merge("k1", "corrupted"));
160 // Data should stay unmerged after the error.
161 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
162}
163
164TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
165 Options options;
166 options.create_if_missing = true;
167 options.merge_operator.reset(new TestPutOperator());
168 options.env = env_;
169
170 DestroyAndReopen(options);
171 ASSERT_OK(Merge("k1", "v1"));
172 ASSERT_OK(Merge("k1", "corrupted"));
173 ASSERT_OK(Put("k2", "v2"));
11fdf7f2
TL
174 auto* iter = db_->NewIterator(ReadOptions());
175 iter->Seek("k1");
176 ASSERT_FALSE(iter->Valid());
177 ASSERT_TRUE(iter->status().IsCorruption());
178 delete iter;
179 iter = db_->NewIterator(ReadOptions());
180 iter->Seek("k2");
181 ASSERT_TRUE(iter->Valid());
182 ASSERT_OK(iter->status());
183 iter->Prev();
184 ASSERT_FALSE(iter->Valid());
185 ASSERT_TRUE(iter->status().IsCorruption());
186 delete iter;
7c673cae
FG
187 VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
188
189 DestroyAndReopen(options);
190 ASSERT_OK(Merge("k1", "v1"));
191 ASSERT_OK(Put("k2", "v2"));
192 ASSERT_OK(Merge("k2", "corrupted"));
11fdf7f2
TL
193 iter = db_->NewIterator(ReadOptions());
194 iter->Seek("k1");
195 ASSERT_TRUE(iter->Valid());
196 ASSERT_OK(iter->status());
197 iter->Next();
198 ASSERT_FALSE(iter->Valid());
199 ASSERT_TRUE(iter->status().IsCorruption());
200 delete iter;
7c673cae
FG
201 VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
202}
203
204
205class MergeOperatorPinningTest : public DBMergeOperatorTest,
206 public testing::WithParamInterface<bool> {
207 public:
208 MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
209
210 bool disable_block_cache_;
211};
212
213INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
214 ::testing::Bool());
215
216#ifndef ROCKSDB_LITE
217TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
218 Options options = CurrentOptions();
219 BlockBasedTableOptions table_options;
220 table_options.block_size = 1; // every block will contain one entry
221 table_options.no_block_cache = disable_block_cache_;
222 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
223 options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
224 options.level0_slowdown_writes_trigger = (1 << 30);
225 options.level0_stop_writes_trigger = (1 << 30);
226 options.disable_auto_compactions = true;
227 DestroyAndReopen(options);
228
229 const int kKeysPerFile = 10;
230 const int kOperandsPerKeyPerFile = 7;
231 const int kOperandSize = 100;
232 // Filse to write in L0 before compacting to lower level
233 const int kFilesPerLevel = 3;
234
235 Random rnd(301);
236 std::map<std::string, std::string> true_data;
237 int batch_num = 1;
238 int lvl_to_fill = 4;
239 int key_id = 0;
240 while (true) {
241 for (int j = 0; j < kKeysPerFile; j++) {
242 std::string key = Key(key_id % 35);
243 key_id++;
244 for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
245 std::string val = RandomString(&rnd, kOperandSize);
246 ASSERT_OK(db_->Merge(WriteOptions(), key, val));
247 if (true_data[key].size() == 0) {
248 true_data[key] = val;
249 } else {
250 true_data[key] += "," + val;
251 }
252 }
253 }
254
255 if (lvl_to_fill == -1) {
256 // Keep last batch in memtable and stop
257 break;
258 }
259
260 ASSERT_OK(Flush());
261 if (batch_num % kFilesPerLevel == 0) {
262 if (lvl_to_fill != 0) {
263 MoveFilesToLevel(lvl_to_fill);
264 }
265 lvl_to_fill--;
266 }
267 batch_num++;
268 }
269
270 // 3 L0 files
271 // 1 L1 file
272 // 3 L2 files
273 // 1 L3 file
274 // 3 L4 Files
275 ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
276
277 VerifyDBFromMap(true_data);
278}
279
7c673cae
FG
280class MergeOperatorHook : public MergeOperator {
281 public:
282 explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
283 : merge_op_(_merge_op) {}
284
494da23a
TL
285 bool FullMergeV2(const MergeOperationInput& merge_in,
286 MergeOperationOutput* merge_out) const override {
7c673cae
FG
287 before_merge_();
288 bool res = merge_op_->FullMergeV2(merge_in, merge_out);
289 after_merge_();
290 return res;
291 }
292
494da23a 293 const char* Name() const override { return merge_op_->Name(); }
7c673cae
FG
294
295 std::shared_ptr<MergeOperator> merge_op_;
296 std::function<void()> before_merge_ = []() {};
297 std::function<void()> after_merge_ = []() {};
298};
299
300TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
301 Options options = CurrentOptions();
302
303 auto merge_hook =
304 std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
305 options.merge_operator = merge_hook;
306 options.disable_auto_compactions = true;
307 options.level0_slowdown_writes_trigger = (1 << 30);
308 options.level0_stop_writes_trigger = (1 << 30);
309 options.max_open_files = 20;
310 BlockBasedTableOptions bbto;
311 bbto.no_block_cache = disable_block_cache_;
312 if (bbto.no_block_cache == false) {
313 bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
314 } else {
315 bbto.block_cache = nullptr;
316 }
317 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
318 DestroyAndReopen(options);
319
320 const int kNumOperands = 30;
321 const int kNumKeys = 1000;
322 const int kOperandSize = 100;
323 Random rnd(301);
324
325 // 1000 keys every key have 30 operands, every operand is in a different file
326 std::map<std::string, std::string> true_data;
327 for (int i = 0; i < kNumOperands; i++) {
328 for (int j = 0; j < kNumKeys; j++) {
329 std::string k = Key(j);
330 std::string v = RandomString(&rnd, kOperandSize);
331 ASSERT_OK(db_->Merge(WriteOptions(), k, v));
332
333 true_data[k] = std::max(true_data[k], v);
334 }
335 ASSERT_OK(Flush());
336 }
337
338 std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
339 ASSERT_EQ(file_numbers.size(), kNumOperands);
340 int merge_cnt = 0;
341
342 // Code executed before merge operation
343 merge_hook->before_merge_ = [&]() {
344 // Evict all tables from cache before every merge operation
345 for (uint64_t num : file_numbers) {
346 TableCache::Evict(dbfull()->TEST_table_cache(), num);
347 }
348 // Decrease cache capacity to force all unrefed blocks to be evicted
349 if (bbto.block_cache) {
350 bbto.block_cache->SetCapacity(1);
351 }
352 merge_cnt++;
353 };
354
355 // Code executed after merge operation
356 merge_hook->after_merge_ = [&]() {
357 // Increase capacity again after doing the merge
358 if (bbto.block_cache) {
359 bbto.block_cache->SetCapacity(64 * 1024 * 1024);
360 }
361 };
362
363 size_t total_reads;
364 VerifyDBFromMap(true_data, &total_reads);
365 ASSERT_EQ(merge_cnt, total_reads);
366
367 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
368
369 VerifyDBFromMap(true_data, &total_reads);
370}
371
372TEST_P(MergeOperatorPinningTest, TailingIterator) {
373 Options options = CurrentOptions();
374 options.merge_operator = MergeOperators::CreateMaxOperator();
375 BlockBasedTableOptions bbto;
376 bbto.no_block_cache = disable_block_cache_;
377 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
378 DestroyAndReopen(options);
379
380 const int kNumOperands = 100;
381 const int kNumWrites = 100000;
382
383 std::function<void()> writer_func = [&]() {
384 int k = 0;
385 for (int i = 0; i < kNumWrites; i++) {
386 db_->Merge(WriteOptions(), Key(k), Key(k));
387
388 if (i && i % kNumOperands == 0) {
389 k++;
390 }
391 if (i && i % 127 == 0) {
392 ASSERT_OK(Flush());
393 }
394 if (i && i % 317 == 0) {
395 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
396 }
397 }
398 };
399
400 std::function<void()> reader_func = [&]() {
401 ReadOptions ro;
402 ro.tailing = true;
403 Iterator* iter = db_->NewIterator(ro);
404
405 iter->SeekToFirst();
406 for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
407 while (!iter->Valid()) {
408 // wait for the key to be written
409 env_->SleepForMicroseconds(100);
410 iter->Seek(Key(i));
411 }
412 ASSERT_EQ(iter->key(), Key(i));
413 ASSERT_EQ(iter->value(), Key(i));
414
415 iter->Next();
416 }
417
418 delete iter;
419 };
420
f67539c2
TL
421 ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
422 ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
7c673cae
FG
423
424 writer_thread.join();
425 reader_thread.join();
426}
11fdf7f2
TL
427
428TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
429 Options options = CurrentOptions();
430 options.merge_operator = MergeOperators::CreateStringAppendOperator();
431 DestroyAndReopen(options);
432
433 // Overview of the test:
434 // * There are two merge operands for the same key: one in an sst file,
435 // another in a memtable.
436 // * Seek a tailing iterator to this key.
437 // * As part of the seek, the iterator will:
438 // (a) first visit the operand in the memtable and tell ForwardIterator
439 // to pin this operand, then
440 // (b) move on to the operand in the sst file, then pass both operands
441 // to merge operator.
442 // * The memtable may get flushed and unreferenced by another thread between
443 // (a) and (b). The test simulates it by flushing the memtable inside a
444 // SyncPoint callback located between (a) and (b).
445 // * In this case it's ForwardIterator's responsibility to keep the memtable
446 // pinned until (b) is complete. There used to be a bug causing
447 // ForwardIterator to not pin it in some circumstances. This test
448 // reproduces it.
449
450 db_->Merge(WriteOptions(), "key", "sst");
451 db_->Flush(FlushOptions()); // Switch to SuperVersion A
452 db_->Merge(WriteOptions(), "key", "memtable");
453
454 // Pin SuperVersion A
455 std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
456
457 bool pushed_first_operand = false;
458 bool stepped_to_next_operand = false;
f67539c2 459 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
11fdf7f2
TL
460 "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
461 EXPECT_FALSE(pushed_first_operand);
462 pushed_first_operand = true;
463 db_->Flush(FlushOptions()); // Switch to SuperVersion B
464 });
f67539c2 465 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
11fdf7f2
TL
466 "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
467 EXPECT_FALSE(stepped_to_next_operand);
468 stepped_to_next_operand = true;
469 someone_else.reset(); // Unpin SuperVersion A
470 });
f67539c2 471 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
11fdf7f2
TL
472
473 ReadOptions ro;
474 ro.tailing = true;
475 std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
476 iter->Seek("key");
477
478 ASSERT_TRUE(iter->status().ok());
479 ASSERT_TRUE(iter->Valid());
480 EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
481 EXPECT_TRUE(pushed_first_operand);
482 EXPECT_TRUE(stepped_to_next_operand);
483}
7c673cae
FG
484#endif // ROCKSDB_LITE
485
11fdf7f2
TL
486TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
487 Options options = CurrentOptions();
488 options.merge_operator = MergeOperators::CreateStringAppendOperator();
489 DestroyAndReopen(options);
490
491 class TestSnapshotChecker : public SnapshotChecker {
494da23a
TL
492 public:
493 SnapshotCheckerResult CheckInSnapshot(
494 SequenceNumber seq, SequenceNumber snapshot_seq) const override {
495 return IsInSnapshot(seq, snapshot_seq)
496 ? SnapshotCheckerResult::kInSnapshot
497 : SnapshotCheckerResult::kNotInSnapshot;
498 }
499
500 bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
11fdf7f2
TL
501 switch (snapshot_seq) {
502 case 0:
503 return seq == 0;
504 case 1:
505 return seq <= 1;
506 case 2:
507 // seq = 2 not visible to snapshot with seq = 2
508 return seq <= 1;
509 case 3:
510 return seq <= 3;
511 case 4:
512 // seq = 4 not visible to snpahost with seq = 4
513 return seq <= 3;
514 default:
515 // seq >=4 is uncommitted
516 return seq <= 4;
517 };
518 }
519 };
520 TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
521 dbfull()->SetSnapshotChecker(snapshot_checker);
522
523 std::string value;
524 ASSERT_OK(Merge("foo", "v1"));
525 ASSERT_EQ(1, db_->GetLatestSequenceNumber());
526 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
527 ASSERT_OK(Merge("foo", "v2"));
528 ASSERT_EQ(2, db_->GetLatestSequenceNumber());
529 // v2 is not visible to latest snapshot, which has seq = 2.
530 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
531 // Take a snapshot with seq = 2.
532 const Snapshot* snapshot1 = db_->GetSnapshot();
533 ASSERT_EQ(2, snapshot1->GetSequenceNumber());
534 // v2 is not visible to snapshot1, which has seq = 2
535 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
536
537 // Verify flush doesn't alter the result.
538 ASSERT_OK(Flush());
539 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
540 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
541
542 ASSERT_OK(Merge("foo", "v3"));
543 ASSERT_EQ(3, db_->GetLatestSequenceNumber());
544 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
545 ASSERT_OK(Merge("foo", "v4"));
546 ASSERT_EQ(4, db_->GetLatestSequenceNumber());
547 // v4 is not visible to latest snapshot, which has seq = 4.
548 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
549 const Snapshot* snapshot2 = db_->GetSnapshot();
550 ASSERT_EQ(4, snapshot2->GetSequenceNumber());
551 // v4 is not visible to snapshot2, which has seq = 4.
552 ASSERT_EQ("v1,v2,v3",
553 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
554
555 // Verify flush doesn't alter the result.
556 ASSERT_OK(Flush());
557 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
558 ASSERT_EQ("v1,v2,v3",
559 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
560 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
561
562 ASSERT_OK(Merge("foo", "v5"));
563 ASSERT_EQ(5, db_->GetLatestSequenceNumber());
564 // v5 is uncommitted
565 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
566
567 // full manual compaction.
568 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
569
570 // Verify compaction doesn't alter the result.
571 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
572 ASSERT_EQ("v1,v2,v3",
573 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
574 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
575
576 db_->ReleaseSnapshot(snapshot1);
577 db_->ReleaseSnapshot(snapshot2);
578}
579
f67539c2
TL
580class PerConfigMergeOperatorPinningTest
581 : public DBMergeOperatorTest,
582 public testing::WithParamInterface<std::tuple<bool, int>> {
583 public:
584 PerConfigMergeOperatorPinningTest() {
585 std::tie(disable_block_cache_, option_config_) = GetParam();
586 }
587
588 bool disable_block_cache_;
589};
590
591INSTANTIATE_TEST_CASE_P(
592 MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
593 ::testing::Combine(::testing::Bool(),
594 ::testing::Range(static_cast<int>(DBTestBase::kDefault),
595 static_cast<int>(DBTestBase::kEnd))));
596
597TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
598 if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
599 return;
600 }
601
602 Options options = CurrentOptions();
603 options.merge_operator = MergeOperators::CreateMaxOperator();
604 BlockBasedTableOptions table_options;
605 table_options.no_block_cache = disable_block_cache_;
606 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
607 DestroyAndReopen(options);
608
609 Random rnd(301);
610 std::map<std::string, std::string> true_data;
611
612 const int kTotalMerges = 5000;
613 // Every key gets ~10 operands
614 const int kKeyRange = kTotalMerges / 10;
615 const int kOperandSize = 20;
616 const int kNumPutBefore = kKeyRange / 10; // 10% value
617 const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
618 const int kNumDelete = kKeyRange / 10; // 10% delete
619
620 // kNumPutBefore keys will have base values
621 for (int i = 0; i < kNumPutBefore; i++) {
622 std::string key = Key(rnd.Next() % kKeyRange);
623 std::string value = RandomString(&rnd, kOperandSize);
624 ASSERT_OK(db_->Put(WriteOptions(), key, value));
625
626 true_data[key] = value;
627 }
628
629 // Do kTotalMerges merges
630 for (int i = 0; i < kTotalMerges; i++) {
631 std::string key = Key(rnd.Next() % kKeyRange);
632 std::string value = RandomString(&rnd, kOperandSize);
633 ASSERT_OK(db_->Merge(WriteOptions(), key, value));
634
635 if (true_data[key] < value) {
636 true_data[key] = value;
637 }
638 }
639
640 // Overwrite random kNumPutAfter keys
641 for (int i = 0; i < kNumPutAfter; i++) {
642 std::string key = Key(rnd.Next() % kKeyRange);
643 std::string value = RandomString(&rnd, kOperandSize);
644 ASSERT_OK(db_->Put(WriteOptions(), key, value));
645
646 true_data[key] = value;
647 }
648
649 // Delete random kNumDelete keys
650 for (int i = 0; i < kNumDelete; i++) {
651 std::string key = Key(rnd.Next() % kKeyRange);
652 ASSERT_OK(db_->Delete(WriteOptions(), key));
653
654 true_data.erase(key);
655 }
656
657 VerifyDBFromMap(true_data);
658}
659
660} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
661
662int main(int argc, char** argv) {
f67539c2 663 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
7c673cae
FG
664 ::testing::InitGoogleTest(&argc, argv);
665 return RUN_ALL_TESTS();
666}