]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_merge_operator_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_merge_operator_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #include <string>
6 #include <vector>
7
8 #include "db/db_test_util.h"
9 #include "db/forward_iterator.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/merge_operator.h"
12 #include "utilities/merge_operators.h"
13 #include "utilities/merge_operators/string_append/stringappend2.h"
14
15 namespace rocksdb {
16
// ReadCallback that delegates visibility decisions to a SnapshotChecker:
// a sequence number is considered visible iff the checker reports it as
// committed within the snapshot sequence this callback was built for.
class TestReadCallback : public ReadCallback {
 public:
  TestReadCallback(SnapshotChecker* snapshot_checker,
                   SequenceNumber snapshot_seq)
      : ReadCallback(snapshot_seq),
        snapshot_checker_(snapshot_checker),
        snapshot_seq_(snapshot_seq) {}

  bool IsVisibleFullCheck(SequenceNumber seq) override {
    return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
           SnapshotCheckerResult::kInSnapshot;
  }

 private:
  SnapshotChecker* snapshot_checker_;  // not owned
  SequenceNumber snapshot_seq_;
};
34
// Test merge operator functionality.
class DBMergeOperatorTest : public DBTestBase {
 public:
  DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}

  // Reads `key` through DBImpl::GetImpl with a TestReadCallback built from
  // `snapshot_checker`, so tests can simulate per-sequence-number visibility
  // (e.g. uncommitted writes) on top of an ordinary point lookup.
  // Returns the value on success, or the Status string on failure, so
  // callers can assert on either outcome with a single string compare.
  std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
                                  const Slice& key,
                                  const Snapshot* snapshot = nullptr) {
    // With no explicit snapshot, pin visibility at the latest sequence.
    SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
                                             : snapshot->GetSequenceNumber();
    TestReadCallback read_callback(snapshot_checker, seq);
    ReadOptions read_opt;
    read_opt.snapshot = snapshot;
    PinnableSlice value;
    Status s =
        dbfull()->GetImpl(read_opt, db_->DefaultColumnFamily(), key, &value,
                          nullptr /*value_found*/, &read_callback);
    if (!s.ok()) {
      return s.ToString();
    }
    return value.ToString();
  }
};
58
59 TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
60 class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
61 public:
62 LimitedStringAppendMergeOp(int limit, char delim)
63 : StringAppendTESTOperator(delim), limit_(limit) {}
64
65 const char* Name() const override {
66 return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
67 }
68
69 bool ShouldMerge(const std::vector<Slice>& operands) const override {
70 if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
71 return true;
72 }
73 return false;
74 }
75
76 private:
77 size_t limit_ = 0;
78 };
79
80 Options options;
81 options.create_if_missing = true;
82 // Use only the latest two merge operands.
83 options.merge_operator =
84 std::make_shared<LimitedStringAppendMergeOp>(2, ',');
85 options.env = env_;
86 Reopen(options);
87 // All K1 values are in memtable.
88 ASSERT_OK(Merge("k1", "a"));
89 ASSERT_OK(Merge("k1", "b"));
90 ASSERT_OK(Merge("k1", "c"));
91 ASSERT_OK(Merge("k1", "d"));
92 std::string value;
93 ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
94 // Make sure that only the latest two merge operands are used. If this was
95 // not the case the value would be "a,b,c,d".
96 ASSERT_EQ(value, "c,d");
97
98 // All K2 values are flushed to L0 into a single file.
99 ASSERT_OK(Merge("k2", "a"));
100 ASSERT_OK(Merge("k2", "b"));
101 ASSERT_OK(Merge("k2", "c"));
102 ASSERT_OK(Merge("k2", "d"));
103 ASSERT_OK(Flush());
104 ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
105 ASSERT_EQ(value, "c,d");
106
107 // All K3 values are flushed and are in different files.
108 ASSERT_OK(Merge("k3", "ab"));
109 ASSERT_OK(Flush());
110 ASSERT_OK(Merge("k3", "bc"));
111 ASSERT_OK(Flush());
112 ASSERT_OK(Merge("k3", "cd"));
113 ASSERT_OK(Flush());
114 ASSERT_OK(Merge("k3", "de"));
115 ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
116 ASSERT_EQ(value, "cd,de");
117
118 // All K4 values are in different levels
119 ASSERT_OK(Merge("k4", "ab"));
120 ASSERT_OK(Flush());
121 MoveFilesToLevel(4);
122 ASSERT_OK(Merge("k4", "bc"));
123 ASSERT_OK(Flush());
124 MoveFilesToLevel(3);
125 ASSERT_OK(Merge("k4", "cd"));
126 ASSERT_OK(Flush());
127 MoveFilesToLevel(1);
128 ASSERT_OK(Merge("k4", "de"));
129 ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
130 ASSERT_EQ(value, "cd,de");
131 }
132
// A merge operator failure surfaces as a Corruption status on read, while
// the unmerged operands remain intact in the DB.
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.env = env_;
  Reopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  // "corrupted" is the operand that makes TestPutOperator fail the merge,
  // as shown by the IsCorruption() assertion below.
  ASSERT_OK(Merge("k1", "corrupted"));
  std::string value;
  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
  // Both operands are still stored, newest first.
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}
145
// A merge failure triggered at write time (via max_successive_merges) does
// not fail the write; the operand is inserted unmerged.
TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  // Reaching this many successive merges attempts an eager merge on write.
  options.max_successive_merges = 3;
  options.env = env_;
  Reopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "v2"));
  // Will trigger a merge when hitting max_successive_merges and the merge
  // will fail. The delta will be inserted nevertheless.
  ASSERT_OK(Merge("k1", "corrupted"));
  // Data should stay unmerged after the error.
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}
161
// A merge failure during iteration invalidates the iterator with a
// Corruption status, both when seeking directly to the bad key and when
// stepping onto it (forward and backward).
TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.env = env_;

  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  ASSERT_OK(Put("k2", "v2"));
  // Seeking straight to the corrupted key fails the iterator.
  auto* iter = db_->NewIterator(ReadOptions());
  iter->Seek("k1");
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
  delete iter;
  // Seeking to a good key works; stepping backwards onto the corrupted key
  // fails.
  iter = db_->NewIterator(ReadOptions());
  iter->Seek("k2");
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  iter->Prev();
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
  delete iter;
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});

  // Same scenario with the corrupted operand on the second key: stepping
  // forward onto it fails.
  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Put("k2", "v2"));
  ASSERT_OK(Merge("k2", "corrupted"));
  iter = db_->NewIterator(ReadOptions());
  iter->Seek("k1");
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  iter->Next();
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
  delete iter;
  VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}
201
202
// Parameterized fixture: the bool parameter controls whether the block
// cache is disabled, so merge-operand pinning is exercised both with and
// without a block cache.
class MergeOperatorPinningTest : public DBMergeOperatorTest,
                                 public testing::WithParamInterface<bool> {
 public:
  MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }

  bool disable_block_cache_;
};
210
211 INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
212 ::testing::Bool());
213
214 #ifndef ROCKSDB_LITE
// Spreads merge operands for the same keys over the memtable and files on
// several levels (one entry per data block), then verifies reads still
// assemble the full merged values.
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  table_options.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  // Disable write throttling and compactions so file placement is fully
  // controlled by the test.
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  const int kKeysPerFile = 10;
  const int kOperandsPerKeyPerFile = 7;
  const int kOperandSize = 100;
  // Files to write in L0 before compacting to lower level
  const int kFilesPerLevel = 3;

  Random rnd(301);
  // Expected fully-merged value per key, maintained alongside the writes.
  std::map<std::string, std::string> true_data;
  int batch_num = 1;
  int lvl_to_fill = 4;
  int key_id = 0;
  while (true) {
    for (int j = 0; j < kKeysPerFile; j++) {
      std::string key = Key(key_id % 35);
      key_id++;
      for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
        std::string val = RandomString(&rnd, kOperandSize);
        ASSERT_OK(db_->Merge(WriteOptions(), key, val));
        if (true_data[key].size() == 0) {
          true_data[key] = val;
        } else {
          true_data[key] += "," + val;
        }
      }
    }

    if (lvl_to_fill == -1) {
      // Keep last batch in memtable and stop
      break;
    }

    ASSERT_OK(Flush());
    if (batch_num % kFilesPerLevel == 0) {
      if (lvl_to_fill != 0) {
        MoveFilesToLevel(lvl_to_fill);
      }
      lvl_to_fill--;
    }
    batch_num++;
  }

  // 3 L0 files
  // 1 L1 file
  // 3 L2 files
  // 1 L3 file
  // 3 L4 Files
  ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");

  VerifyDBFromMap(true_data);
}
277
// Randomized workload (puts, merges with a max operator, deletes) checked
// against an in-memory model, repeated across the option matrix.
TEST_P(MergeOperatorPinningTest, Randomized) {
  do {
    Options options = CurrentOptions();
    options.merge_operator = MergeOperators::CreateMaxOperator();
    BlockBasedTableOptions table_options;
    table_options.no_block_cache = disable_block_cache_;
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    DestroyAndReopen(options);

    Random rnd(301);
    // Model of expected key -> value, mirroring max-merge semantics.
    std::map<std::string, std::string> true_data;

    const int kTotalMerges = 5000;
    // Every key gets ~10 operands
    const int kKeyRange = kTotalMerges / 10;
    const int kOperandSize = 20;
    const int kNumPutBefore = kKeyRange / 10;  // 10% value
    const int kNumPutAfter = kKeyRange / 10;   // 10% overwrite
    const int kNumDelete = kKeyRange / 10;     // 10% delete

    // kNumPutBefore keys will have base values
    for (int i = 0; i < kNumPutBefore; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));

      true_data[key] = value;
    }

    // Do kTotalMerges merges
    for (int i = 0; i < kTotalMerges; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), key, value));

      // Max operator: the model keeps the lexicographically larger value.
      if (true_data[key] < value) {
        true_data[key] = value;
      }
    }

    // Overwrite random kNumPutAfter keys
    for (int i = 0; i < kNumPutAfter; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));

      true_data[key] = value;
    }

    // Delete random kNumDelete keys
    for (int i = 0; i < kNumDelete; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      ASSERT_OK(db_->Delete(WriteOptions(), key));

      true_data.erase(key);
    }

    VerifyDBFromMap(true_data);

    // kSkipMergePut: Put-after-Merge combinations are not valid here.
  } while (ChangeOptions(kSkipMergePut));
}
339
// Wraps another MergeOperator and invokes user-settable hooks immediately
// before and after each FullMergeV2 call, letting tests inject side
// effects (e.g. cache eviction) at merge time.
class MergeOperatorHook : public MergeOperator {
 public:
  explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
      : merge_op_(_merge_op) {}

  bool FullMergeV2(const MergeOperationInput& merge_in,
                   MergeOperationOutput* merge_out) const override {
    before_merge_();
    bool res = merge_op_->FullMergeV2(merge_in, merge_out);
    after_merge_();
    return res;
  }

  const char* Name() const override { return merge_op_->Name(); }

  std::shared_ptr<MergeOperator> merge_op_;  // the wrapped operator
  // Hooks default to no-ops; tests overwrite them as needed.
  std::function<void()> before_merge_ = []() {};
  std::function<void()> after_merge_ = []() {};
};
359
360 TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
361 Options options = CurrentOptions();
362
363 auto merge_hook =
364 std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
365 options.merge_operator = merge_hook;
366 options.disable_auto_compactions = true;
367 options.level0_slowdown_writes_trigger = (1 << 30);
368 options.level0_stop_writes_trigger = (1 << 30);
369 options.max_open_files = 20;
370 BlockBasedTableOptions bbto;
371 bbto.no_block_cache = disable_block_cache_;
372 if (bbto.no_block_cache == false) {
373 bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
374 } else {
375 bbto.block_cache = nullptr;
376 }
377 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
378 DestroyAndReopen(options);
379
380 const int kNumOperands = 30;
381 const int kNumKeys = 1000;
382 const int kOperandSize = 100;
383 Random rnd(301);
384
385 // 1000 keys every key have 30 operands, every operand is in a different file
386 std::map<std::string, std::string> true_data;
387 for (int i = 0; i < kNumOperands; i++) {
388 for (int j = 0; j < kNumKeys; j++) {
389 std::string k = Key(j);
390 std::string v = RandomString(&rnd, kOperandSize);
391 ASSERT_OK(db_->Merge(WriteOptions(), k, v));
392
393 true_data[k] = std::max(true_data[k], v);
394 }
395 ASSERT_OK(Flush());
396 }
397
398 std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
399 ASSERT_EQ(file_numbers.size(), kNumOperands);
400 int merge_cnt = 0;
401
402 // Code executed before merge operation
403 merge_hook->before_merge_ = [&]() {
404 // Evict all tables from cache before every merge operation
405 for (uint64_t num : file_numbers) {
406 TableCache::Evict(dbfull()->TEST_table_cache(), num);
407 }
408 // Decrease cache capacity to force all unrefed blocks to be evicted
409 if (bbto.block_cache) {
410 bbto.block_cache->SetCapacity(1);
411 }
412 merge_cnt++;
413 };
414
415 // Code executed after merge operation
416 merge_hook->after_merge_ = [&]() {
417 // Increase capacity again after doing the merge
418 if (bbto.block_cache) {
419 bbto.block_cache->SetCapacity(64 * 1024 * 1024);
420 }
421 };
422
423 size_t total_reads;
424 VerifyDBFromMap(true_data, &total_reads);
425 ASSERT_EQ(merge_cnt, total_reads);
426
427 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
428
429 VerifyDBFromMap(true_data, &total_reads);
430 }
431
432 TEST_P(MergeOperatorPinningTest, TailingIterator) {
433 Options options = CurrentOptions();
434 options.merge_operator = MergeOperators::CreateMaxOperator();
435 BlockBasedTableOptions bbto;
436 bbto.no_block_cache = disable_block_cache_;
437 options.table_factory.reset(NewBlockBasedTableFactory(bbto));
438 DestroyAndReopen(options);
439
440 const int kNumOperands = 100;
441 const int kNumWrites = 100000;
442
443 std::function<void()> writer_func = [&]() {
444 int k = 0;
445 for (int i = 0; i < kNumWrites; i++) {
446 db_->Merge(WriteOptions(), Key(k), Key(k));
447
448 if (i && i % kNumOperands == 0) {
449 k++;
450 }
451 if (i && i % 127 == 0) {
452 ASSERT_OK(Flush());
453 }
454 if (i && i % 317 == 0) {
455 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
456 }
457 }
458 };
459
460 std::function<void()> reader_func = [&]() {
461 ReadOptions ro;
462 ro.tailing = true;
463 Iterator* iter = db_->NewIterator(ro);
464
465 iter->SeekToFirst();
466 for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
467 while (!iter->Valid()) {
468 // wait for the key to be written
469 env_->SleepForMicroseconds(100);
470 iter->Seek(Key(i));
471 }
472 ASSERT_EQ(iter->key(), Key(i));
473 ASSERT_EQ(iter->value(), Key(i));
474
475 iter->Next();
476 }
477
478 delete iter;
479 };
480
481 rocksdb::port::Thread writer_thread(writer_func);
482 rocksdb::port::Thread reader_thread(reader_func);
483
484 writer_thread.join();
485 reader_thread.join();
486 }
487
488 TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
489 Options options = CurrentOptions();
490 options.merge_operator = MergeOperators::CreateStringAppendOperator();
491 DestroyAndReopen(options);
492
493 // Overview of the test:
494 // * There are two merge operands for the same key: one in an sst file,
495 // another in a memtable.
496 // * Seek a tailing iterator to this key.
497 // * As part of the seek, the iterator will:
498 // (a) first visit the operand in the memtable and tell ForwardIterator
499 // to pin this operand, then
500 // (b) move on to the operand in the sst file, then pass both operands
501 // to merge operator.
502 // * The memtable may get flushed and unreferenced by another thread between
503 // (a) and (b). The test simulates it by flushing the memtable inside a
504 // SyncPoint callback located between (a) and (b).
505 // * In this case it's ForwardIterator's responsibility to keep the memtable
506 // pinned until (b) is complete. There used to be a bug causing
507 // ForwardIterator to not pin it in some circumstances. This test
508 // reproduces it.
509
510 db_->Merge(WriteOptions(), "key", "sst");
511 db_->Flush(FlushOptions()); // Switch to SuperVersion A
512 db_->Merge(WriteOptions(), "key", "memtable");
513
514 // Pin SuperVersion A
515 std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
516
517 bool pushed_first_operand = false;
518 bool stepped_to_next_operand = false;
519 rocksdb::SyncPoint::GetInstance()->SetCallBack(
520 "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
521 EXPECT_FALSE(pushed_first_operand);
522 pushed_first_operand = true;
523 db_->Flush(FlushOptions()); // Switch to SuperVersion B
524 });
525 rocksdb::SyncPoint::GetInstance()->SetCallBack(
526 "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
527 EXPECT_FALSE(stepped_to_next_operand);
528 stepped_to_next_operand = true;
529 someone_else.reset(); // Unpin SuperVersion A
530 });
531 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
532
533 ReadOptions ro;
534 ro.tailing = true;
535 std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
536 iter->Seek("key");
537
538 ASSERT_TRUE(iter->status().ok());
539 ASSERT_TRUE(iter->Valid());
540 EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
541 EXPECT_TRUE(pushed_first_operand);
542 EXPECT_TRUE(stepped_to_next_operand);
543 }
544 #endif // ROCKSDB_LITE
545
546 TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
547 Options options = CurrentOptions();
548 options.merge_operator = MergeOperators::CreateStringAppendOperator();
549 DestroyAndReopen(options);
550
551 class TestSnapshotChecker : public SnapshotChecker {
552 public:
553 SnapshotCheckerResult CheckInSnapshot(
554 SequenceNumber seq, SequenceNumber snapshot_seq) const override {
555 return IsInSnapshot(seq, snapshot_seq)
556 ? SnapshotCheckerResult::kInSnapshot
557 : SnapshotCheckerResult::kNotInSnapshot;
558 }
559
560 bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
561 switch (snapshot_seq) {
562 case 0:
563 return seq == 0;
564 case 1:
565 return seq <= 1;
566 case 2:
567 // seq = 2 not visible to snapshot with seq = 2
568 return seq <= 1;
569 case 3:
570 return seq <= 3;
571 case 4:
572 // seq = 4 not visible to snpahost with seq = 4
573 return seq <= 3;
574 default:
575 // seq >=4 is uncommitted
576 return seq <= 4;
577 };
578 }
579 };
580 TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
581 dbfull()->SetSnapshotChecker(snapshot_checker);
582
583 std::string value;
584 ASSERT_OK(Merge("foo", "v1"));
585 ASSERT_EQ(1, db_->GetLatestSequenceNumber());
586 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
587 ASSERT_OK(Merge("foo", "v2"));
588 ASSERT_EQ(2, db_->GetLatestSequenceNumber());
589 // v2 is not visible to latest snapshot, which has seq = 2.
590 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
591 // Take a snapshot with seq = 2.
592 const Snapshot* snapshot1 = db_->GetSnapshot();
593 ASSERT_EQ(2, snapshot1->GetSequenceNumber());
594 // v2 is not visible to snapshot1, which has seq = 2
595 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
596
597 // Verify flush doesn't alter the result.
598 ASSERT_OK(Flush());
599 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
600 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
601
602 ASSERT_OK(Merge("foo", "v3"));
603 ASSERT_EQ(3, db_->GetLatestSequenceNumber());
604 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
605 ASSERT_OK(Merge("foo", "v4"));
606 ASSERT_EQ(4, db_->GetLatestSequenceNumber());
607 // v4 is not visible to latest snapshot, which has seq = 4.
608 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
609 const Snapshot* snapshot2 = db_->GetSnapshot();
610 ASSERT_EQ(4, snapshot2->GetSequenceNumber());
611 // v4 is not visible to snapshot2, which has seq = 4.
612 ASSERT_EQ("v1,v2,v3",
613 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
614
615 // Verify flush doesn't alter the result.
616 ASSERT_OK(Flush());
617 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
618 ASSERT_EQ("v1,v2,v3",
619 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
620 ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
621
622 ASSERT_OK(Merge("foo", "v5"));
623 ASSERT_EQ(5, db_->GetLatestSequenceNumber());
624 // v5 is uncommitted
625 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
626
627 // full manual compaction.
628 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
629
630 // Verify compaction doesn't alter the result.
631 ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
632 ASSERT_EQ("v1,v2,v3",
633 GetWithReadCallback(snapshot_checker, "foo", snapshot2));
634 ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
635
636 db_->ReleaseSnapshot(snapshot1);
637 db_->ReleaseSnapshot(snapshot2);
638 }
639
640 } // namespace rocksdb
641
// Standard gtest entry point; installs RocksDB's stack-trace handler first
// so crashes in tests produce symbolized backtraces.
int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}