1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
15 #include "db/db_impl/db_impl.h"
16 #include "db/dbformat.h"
17 #include "port/port.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/types.h"
21 #include "rocksdb/utilities/debug.h"
22 #include "rocksdb/utilities/transaction.h"
23 #include "rocksdb/utilities/transaction_db.h"
24 #include "table/mock_table.h"
25 #include "test_util/sync_point.h"
26 #include "test_util/testharness.h"
27 #include "test_util/testutil.h"
28 #include "test_util/transaction_test_util.h"
29 #include "util/mutexlock.h"
30 #include "util/random.h"
31 #include "util/string_util.h"
32 #include "utilities/fault_injection_env.h"
33 #include "utilities/merge_operators.h"
34 #include "utilities/merge_operators/string_append/stringappend.h"
35 #include "utilities/transactions/pessimistic_transaction_db.h"
36 #include "utilities/transactions/transaction_test.h"
37 #include "utilities/transactions/write_prepared_txn_db.h"
41 namespace ROCKSDB_NAMESPACE
{
43 using CommitEntry
= WritePreparedTxnDB::CommitEntry
;
44 using CommitEntry64b
= WritePreparedTxnDB::CommitEntry64b
;
45 using CommitEntry64bFormat
= WritePreparedTxnDB::CommitEntry64bFormat
;
47 TEST(PreparedHeap
, BasicsTest
) {
48 WritePreparedTxnDB::PreparedHeap heap
;
50 MutexLock
ml(heap
.push_pop_mutex());
52 // Test with one element
53 ASSERT_EQ(14l, heap
.top());
56 // Test that old min is still on top
57 ASSERT_EQ(14l, heap
.top());
64 // Test that old min is still on top
65 ASSERT_EQ(14l, heap
.top());
67 // Test that old min is still on top
68 ASSERT_EQ(14l, heap
.top());
70 // Test that the new comes to the top after multiple erase
71 ASSERT_EQ(34l, heap
.top());
73 // Test that the new comes to the top after single erase
74 ASSERT_EQ(44l, heap
.top());
76 ASSERT_EQ(44l, heap
.top());
77 heap
.pop(); // pop 44l
78 // Test that the erased items are ignored after pop
79 ASSERT_EQ(64l, heap
.top());
81 // Test that erasing an already popped item would work
82 ASSERT_EQ(64l, heap
.top());
84 ASSERT_EQ(64l, heap
.top());
86 MutexLock
ml(heap
.push_pop_mutex());
98 // Test top remains the same after a random order of many erases
99 ASSERT_EQ(64l, heap
.top());
101 // Test that pop works with a series of random pending erases
102 ASSERT_EQ(74l, heap
.top());
103 ASSERT_FALSE(heap
.empty());
105 // Test that empty works
106 ASSERT_TRUE(heap
.empty());
109 // This is a scenario reconstructed from a buggy trace. Test that the bug does
110 // not resurface again.
111 TEST(PreparedHeap
, EmptyAtTheEnd
) {
112 WritePreparedTxnDB::PreparedHeap heap
;
114 MutexLock
ml(heap
.push_pop_mutex());
117 ASSERT_EQ(40l, heap
.top());
118 // Although not a recommended scenario, we must be resilient against erase
119 // without a prior push.
121 ASSERT_EQ(40l, heap
.top());
123 MutexLock
ml(heap
.push_pop_mutex());
126 ASSERT_EQ(40l, heap
.top());
129 ASSERT_EQ(40l, heap
.top());
131 ASSERT_TRUE(heap
.empty());
134 MutexLock
ml(heap
.push_pop_mutex());
137 ASSERT_EQ(40l, heap
.top());
139 ASSERT_EQ(40l, heap
.top());
141 MutexLock
ml(heap
.push_pop_mutex());
144 ASSERT_EQ(40l, heap
.top());
147 // Test that the erase has not emptied the heap (we had a bug doing that)
148 ASSERT_FALSE(heap
.empty());
149 ASSERT_EQ(60l, heap
.top());
151 ASSERT_TRUE(heap
.empty());
154 // Generate random order of PreparedHeap access and test that the heap will be
155 // successfully emptied at the end.
156 TEST(PreparedHeap
, Concurrent
) {
157 const size_t t_cnt
= 10;
158 ROCKSDB_NAMESPACE::port::Thread t
[t_cnt
+ 1];
159 WritePreparedTxnDB::PreparedHeap heap
;
160 port::RWMutex prepared_mutex
;
161 std::atomic
<size_t> last
;
163 for (size_t n
= 0; n
< 100; n
++) {
165 t
[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
167 for (size_t seq
= 1; seq
<= t_cnt
; seq
++) {
168 // This is not recommended usage but we should be resilient against it.
169 bool skip_push
= rnd
.OneIn(5);
171 MutexLock
ml(heap
.push_pop_mutex());
172 std::this_thread::yield();
178 for (size_t i
= 1; i
<= t_cnt
; i
++) {
180 ROCKSDB_NAMESPACE::port::Thread([&heap
, &prepared_mutex
, &last
, i
]() {
183 std::this_thread::yield();
184 } while (last
.load() < seq
);
185 WriteLock
wl(&prepared_mutex
);
189 for (size_t i
= 0; i
<= t_cnt
; i
++) {
192 ASSERT_TRUE(heap
.empty());
196 // Test that WriteBatchWithIndex correctly counts the number of sub-batches
197 TEST(WriteBatchWithIndex
, SubBatchCnt
) {
198 ColumnFamilyOptions cf_options
;
199 std::string cf_name
= "two";
202 options
.create_if_missing
= true;
203 const std::string dbname
= test::PerThreadDBPath("transaction_testdb");
204 DestroyDB(dbname
, options
);
205 ASSERT_OK(DB::Open(options
, dbname
, &db
));
206 ColumnFamilyHandle
* cf_handle
= nullptr;
207 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
208 WriteOptions write_options
;
209 size_t batch_cnt
= 1;
210 size_t save_points
= 0;
211 std::vector
<size_t> batch_cnt_at
;
212 WriteBatchWithIndex
batch(db
->DefaultColumnFamily()->GetComparator(), 0, true,
214 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
215 batch_cnt_at
.push_back(batch_cnt
);
216 batch
.SetSavePoint();
218 batch
.Put(Slice("key"), Slice("value"));
219 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
220 batch_cnt_at
.push_back(batch_cnt
);
221 batch
.SetSavePoint();
223 batch
.Put(Slice("key2"), Slice("value2"));
224 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
225 // duplicate the keys
226 batch_cnt_at
.push_back(batch_cnt
);
227 batch
.SetSavePoint();
229 batch
.Put(Slice("key"), Slice("value3"));
231 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
232 // duplicate the 2nd key. It should not be counted as a duplicate since a
233 // sub-batch is cut after the last duplicate.
234 batch_cnt_at
.push_back(batch_cnt
);
235 batch
.SetSavePoint();
237 batch
.Put(Slice("key2"), Slice("value4"));
238 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
239 // duplicate the keys but in a different cf. It should not be counted as
241 batch_cnt_at
.push_back(batch_cnt
);
242 batch
.SetSavePoint();
244 batch
.Put(cf_handle
, Slice("key"), Slice("value5"));
245 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
247 // Test that the number of sub-batches matches what we count with
249 std::map
<uint32_t, const Comparator
*> comparators
;
250 comparators
[0] = db
->DefaultColumnFamily()->GetComparator();
251 comparators
[cf_handle
->GetID()] = cf_handle
->GetComparator();
252 SubBatchCounter
counter(comparators
);
253 ASSERT_OK(batch
.GetWriteBatch()->Iterate(&counter
));
254 ASSERT_EQ(batch_cnt
, counter
.BatchCount());
256 // Test that RollbackToSavePoint will properly reset the number of
258 for (size_t i
= save_points
; i
> 0; i
--) {
259 batch
.RollbackToSavePoint();
260 ASSERT_EQ(batch_cnt_at
[i
- 1], batch
.SubBatchCnt());
263 // Test the count is right with random batches
265 const size_t TOTAL_KEYS
= 20; // 20 ~= 10 to cause a few randoms
267 std::string keys
[TOTAL_KEYS
];
268 for (size_t k
= 0; k
< TOTAL_KEYS
; k
++) {
269 int len
= static_cast<int>(rnd
.Uniform(50));
270 keys
[k
] = test::RandomKey(&rnd
, len
);
272 for (size_t i
= 0; i
< 1000; i
++) { // 1000 random batches
273 WriteBatchWithIndex
rndbatch(db
->DefaultColumnFamily()->GetComparator(),
275 for (size_t k
= 0; k
< 10; k
++) { // 10 key per batch
276 size_t ki
= static_cast<size_t>(rnd
.Uniform(TOTAL_KEYS
));
277 Slice key
= Slice(keys
[ki
]);
278 std::string tmp
= rnd
.RandomString(16);
279 Slice value
= Slice(tmp
);
280 rndbatch
.Put(key
, value
);
282 SubBatchCounter
batch_counter(comparators
);
283 ASSERT_OK(rndbatch
.GetWriteBatch()->Iterate(&batch_counter
));
284 ASSERT_EQ(rndbatch
.SubBatchCnt(), batch_counter
.BatchCount());
292 TEST(CommitEntry64b
, BasicTest
) {
293 const size_t INDEX_BITS
= static_cast<size_t>(21);
294 const size_t INDEX_SIZE
= static_cast<size_t>(1ull << INDEX_BITS
);
295 const CommitEntry64bFormat
FORMAT(static_cast<size_t>(INDEX_BITS
));
297 // zero-initialized CommitEntry64b should indicate an empty entry
298 CommitEntry64b empty_entry64b
;
299 uint64_t empty_index
= 11ul;
300 CommitEntry empty_entry
;
301 bool ok
= empty_entry64b
.Parse(empty_index
, &empty_entry
, FORMAT
);
304 // the zero entry is reserved for un-initialized entries
305 const size_t MAX_COMMIT
= (1 << FORMAT
.COMMIT_BITS
) - 1 - 1;
306 // Samples over the numbers that are covered by that many index bits
307 std::array
<uint64_t, 4> is
= {{0, 1, INDEX_SIZE
/ 2 + 1, INDEX_SIZE
- 1}};
308 // Samples over the numbers that are covered by that many commit bits
309 std::array
<uint64_t, 4> ds
= {{0, 1, MAX_COMMIT
/ 2 + 1, MAX_COMMIT
}};
310 // Iterate over prepare numbers that i) cover all bits of a sequence
311 // number, and ii) include some bits that fall into the range of index or
313 for (uint64_t base
= 1; base
< kMaxSequenceNumber
; base
*= 2) {
314 for (uint64_t i
: is
) {
315 for (uint64_t d
: ds
) {
316 uint64_t p
= base
+ i
+ d
;
317 for (uint64_t c
: {p
, p
+ d
/ 2, p
+ d
}) {
318 uint64_t index
= p
% INDEX_SIZE
;
319 CommitEntry
before(p
, c
), after
;
320 CommitEntry64b
entry64b(before
, FORMAT
);
321 ok
= entry64b
.Parse(index
, &after
, FORMAT
);
323 if (!(before
== after
)) {
324 printf("base %" PRIu64
" i %" PRIu64
" d %" PRIu64
" p %" PRIu64
325 " c %" PRIu64
" index %" PRIu64
"\n",
326 base
, i
, d
, p
, c
, index
);
328 ASSERT_EQ(before
, after
);
335 class WritePreparedTxnDBMock
: public WritePreparedTxnDB
{
337 WritePreparedTxnDBMock(DBImpl
* db_impl
, TransactionDBOptions
& opt
)
338 : WritePreparedTxnDB(db_impl
, opt
) {}
339 void SetDBSnapshots(const std::vector
<SequenceNumber
>& snapshots
) {
340 snapshots_
= snapshots
;
342 void TakeSnapshot(SequenceNumber seq
) { snapshots_
.push_back(seq
); }
345 const std::vector
<SequenceNumber
> GetSnapshotListFromDB(
346 SequenceNumber
/* unused */) override
{
351 std::vector
<SequenceNumber
> snapshots_
;
354 class WritePreparedTransactionTestBase
: public TransactionTestBase
{
356 WritePreparedTransactionTestBase(bool use_stackable_db
, bool two_write_queue
,
357 TxnDBWritePolicy write_policy
,
358 WriteOrdering write_ordering
)
359 : TransactionTestBase(use_stackable_db
, two_write_queue
, write_policy
,
363 void UpdateTransactionDBOptions(size_t snapshot_cache_bits
,
364 size_t commit_cache_bits
) {
365 txn_db_options
.wp_snapshot_cache_bits
= snapshot_cache_bits
;
366 txn_db_options
.wp_commit_cache_bits
= commit_cache_bits
;
368 void UpdateTransactionDBOptions(size_t snapshot_cache_bits
) {
369 txn_db_options
.wp_snapshot_cache_bits
= snapshot_cache_bits
;
371 // If expect_update is set, check if it actually updated old_commit_map_. If
372 // it did not and yet suggested not to check the next snapshot, do the
373 // opposite to check if it was not a bad suggestion.
374 void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare
, uint64_t commit
,
376 uint64_t next_snapshot
,
377 bool expect_update
) {
378 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
379 // reset old_commit_map_empty_ so that its value indicates whether
380 // old_commit_map_ was updated
381 wp_db
->old_commit_map_empty_
= true;
382 bool check_next
= wp_db
->MaybeUpdateOldCommitMap(prepare
, commit
, snapshot
,
383 snapshot
< next_snapshot
);
384 if (expect_update
== wp_db
->old_commit_map_empty_
) {
385 printf("prepare: %" PRIu64
" commit: %" PRIu64
" snapshot: %" PRIu64
386 " next: %" PRIu64
"\n",
387 prepare
, commit
, snapshot
, next_snapshot
);
389 EXPECT_EQ(!expect_update
, wp_db
->old_commit_map_empty_
);
390 if (!check_next
&& wp_db
->old_commit_map_empty_
) {
391 // do the opposite to make sure it was not a bad suggestion
392 const bool dont_care_bool
= true;
393 wp_db
->MaybeUpdateOldCommitMap(prepare
, commit
, next_snapshot
,
395 if (!wp_db
->old_commit_map_empty_
) {
396 printf("prepare: %" PRIu64
" commit: %" PRIu64
" snapshot: %" PRIu64
397 " next: %" PRIu64
"\n",
398 prepare
, commit
, snapshot
, next_snapshot
);
400 EXPECT_TRUE(wp_db
->old_commit_map_empty_
);
404 // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
405 // miss a snapshot because of a concurrent update by UpdateSnapshots that is
406 // writing new_snapshots. Both threads are broken at two points. The sync
407 // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
408 // entry is expected to be vital for one of the snapshots that is common
409 // between the old and new list of snapshots.
410 void SnapshotConcurrentAccessTestInternal(
411 WritePreparedTxnDB
* wp_db
,
412 const std::vector
<SequenceNumber
>& old_snapshots
,
413 const std::vector
<SequenceNumber
>& new_snapshots
, CommitEntry
& entry
,
414 SequenceNumber
& version
, size_t a1
, size_t a2
, size_t b1
, size_t b2
) {
415 // First reset the snapshot list
416 const std::vector
<SequenceNumber
> empty_snapshots
;
417 wp_db
->old_commit_map_empty_
= true;
418 wp_db
->UpdateSnapshots(empty_snapshots
, ++version
);
419 // Then initialize it with the old_snapshots
420 wp_db
->UpdateSnapshots(old_snapshots
, ++version
);
422 // Starting from the first thread, cut each thread at two points
423 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
424 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1
),
425 "WritePreparedTxnDB::UpdateSnapshots:s:start"},
426 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1
),
427 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1
)},
428 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2
),
429 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1
)},
430 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2
),
431 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2
)},
432 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
433 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2
)},
435 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
437 ASSERT_TRUE(wp_db
->old_commit_map_empty_
);
438 ROCKSDB_NAMESPACE::port::Thread
t1(
439 [&]() { wp_db
->UpdateSnapshots(new_snapshots
, version
); });
440 ROCKSDB_NAMESPACE::port::Thread
t2(
441 [&]() { wp_db
->CheckAgainstSnapshots(entry
); });
444 ASSERT_FALSE(wp_db
->old_commit_map_empty_
);
446 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
448 wp_db
->old_commit_map_empty_
= true;
449 wp_db
->UpdateSnapshots(empty_snapshots
, ++version
);
450 wp_db
->UpdateSnapshots(old_snapshots
, ++version
);
451 // Starting from the second thread, cut each thread at two points
452 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
453 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1
),
454 "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
455 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1
),
456 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1
)},
457 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2
),
458 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1
)},
459 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2
),
460 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2
)},
461 {"WritePreparedTxnDB::UpdateSnapshots:p:end",
462 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2
)},
464 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
466 ASSERT_TRUE(wp_db
->old_commit_map_empty_
);
467 ROCKSDB_NAMESPACE::port::Thread
t1(
468 [&]() { wp_db
->UpdateSnapshots(new_snapshots
, version
); });
469 ROCKSDB_NAMESPACE::port::Thread
t2(
470 [&]() { wp_db
->CheckAgainstSnapshots(entry
); });
473 ASSERT_FALSE(wp_db
->old_commit_map_empty_
);
475 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
478 // Verify value of keys.
479 void VerifyKeys(const std::unordered_map
<std::string
, std::string
>& data
,
480 const Snapshot
* snapshot
= nullptr) {
482 ReadOptions read_options
;
483 read_options
.snapshot
= snapshot
;
484 for (auto& kv
: data
) {
485 auto s
= db
->Get(read_options
, kv
.first
, &value
);
486 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
488 if (kv
.second
!= value
) {
489 printf("key = %s\n", kv
.first
.c_str());
491 ASSERT_EQ(kv
.second
, value
);
493 ASSERT_EQ(kv
.second
, "NOT_FOUND");
496 // Try with MultiGet API too
497 std::vector
<std::string
> values
;
498 auto s_vec
= db
->MultiGet(read_options
, {db
->DefaultColumnFamily()},
499 {kv
.first
}, &values
);
500 ASSERT_EQ(1, values
.size());
501 ASSERT_EQ(1, s_vec
.size());
503 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
505 ASSERT_TRUE(kv
.second
== values
[0]);
507 ASSERT_EQ(kv
.second
, "NOT_FOUND");
512 // Verify all versions of keys.
513 void VerifyInternalKeys(const std::vector
<KeyVersion
>& expected_versions
) {
514 std::vector
<KeyVersion
> versions
;
515 const size_t kMaxKeys
= 100000;
516 ASSERT_OK(GetAllKeyVersions(db
, expected_versions
.front().user_key
,
517 expected_versions
.back().user_key
, kMaxKeys
,
519 ASSERT_EQ(expected_versions
.size(), versions
.size());
520 for (size_t i
= 0; i
< versions
.size(); i
++) {
521 ASSERT_EQ(expected_versions
[i
].user_key
, versions
[i
].user_key
);
522 ASSERT_EQ(expected_versions
[i
].sequence
, versions
[i
].sequence
);
523 ASSERT_EQ(expected_versions
[i
].type
, versions
[i
].type
);
524 if (versions
[i
].type
!= kTypeDeletion
&&
525 versions
[i
].type
!= kTypeSingleDeletion
) {
526 ASSERT_EQ(expected_versions
[i
].value
, versions
[i
].value
);
528 // Range delete not supported.
529 assert(expected_versions
[i
].type
!= kTypeRangeDeletion
);
534 class WritePreparedTransactionTest
535 : public WritePreparedTransactionTestBase
,
536 virtual public ::testing::WithParamInterface
<
537 std::tuple
<bool, bool, TxnDBWritePolicy
, WriteOrdering
>> {
539 WritePreparedTransactionTest()
540 : WritePreparedTransactionTestBase(
541 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
542 std::get
<2>(GetParam()), std::get
<3>(GetParam())){};
545 #ifndef ROCKSDB_VALGRIND_RUN
546 class SnapshotConcurrentAccessTest
547 : public WritePreparedTransactionTestBase
,
548 virtual public ::testing::WithParamInterface
<std::tuple
<
549 bool, bool, TxnDBWritePolicy
, WriteOrdering
, size_t, size_t>> {
551 SnapshotConcurrentAccessTest()
552 : WritePreparedTransactionTestBase(
553 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
554 std::get
<2>(GetParam()), std::get
<3>(GetParam())),
555 split_id_(std::get
<4>(GetParam())),
556 split_cnt_(std::get
<5>(GetParam())){};
559 // A test is split into split_cnt_ tests, each identified with split_id_ where
560 // 0 <= split_id_ < split_cnt_
564 #endif // ROCKSDB_VALGRIND_RUN
566 class SeqAdvanceConcurrentTest
567 : public WritePreparedTransactionTestBase
,
568 virtual public ::testing::WithParamInterface
<std::tuple
<
569 bool, bool, TxnDBWritePolicy
, WriteOrdering
, size_t, size_t>> {
571 SeqAdvanceConcurrentTest()
572 : WritePreparedTransactionTestBase(
573 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
574 std::get
<2>(GetParam()), std::get
<3>(GetParam())),
575 split_id_(std::get
<4>(GetParam())),
576 split_cnt_(std::get
<5>(GetParam())) {
577 special_env
.skip_fsync_
= true;
581 // A test is split into split_cnt_ tests, each identified with split_id_ where
582 // 0 <= split_id_ < split_cnt_
587 INSTANTIATE_TEST_CASE_P(
588 WritePreparedTransaction
, WritePreparedTransactionTest
,
590 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
),
591 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
),
592 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
)));
594 #ifndef ROCKSDB_VALGRIND_RUN
595 INSTANTIATE_TEST_CASE_P(
596 TwoWriteQueues
, SnapshotConcurrentAccessTest
,
598 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 0, 20),
599 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 1, 20),
600 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 2, 20),
601 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 3, 20),
602 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 4, 20),
603 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 5, 20),
604 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 6, 20),
605 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 7, 20),
606 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 8, 20),
607 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 9, 20),
608 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 10, 20),
609 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 11, 20),
610 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 12, 20),
611 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 13, 20),
612 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 14, 20),
613 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 15, 20),
614 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 16, 20),
615 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 17, 20),
616 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 18, 20),
617 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 19, 20),
619 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 0, 20),
620 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 1, 20),
621 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 2, 20),
622 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 3, 20),
623 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 4, 20),
624 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 5, 20),
625 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 6, 20),
626 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 7, 20),
627 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 8, 20),
628 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 9, 20),
629 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 10, 20),
630 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 11, 20),
631 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 12, 20),
632 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 13, 20),
633 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 14, 20),
634 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 15, 20),
635 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 16, 20),
636 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 17, 20),
637 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 18, 20),
638 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 19, 20)));
640 INSTANTIATE_TEST_CASE_P(
641 OneWriteQueue
, SnapshotConcurrentAccessTest
,
643 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 0, 20),
644 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 1, 20),
645 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 2, 20),
646 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 3, 20),
647 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 4, 20),
648 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 5, 20),
649 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 6, 20),
650 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 7, 20),
651 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 8, 20),
652 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 9, 20),
653 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 10, 20),
654 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 11, 20),
655 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 12, 20),
656 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 13, 20),
657 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 14, 20),
658 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 15, 20),
659 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 16, 20),
660 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 17, 20),
661 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 18, 20),
662 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 19, 20)));
664 INSTANTIATE_TEST_CASE_P(
665 TwoWriteQueues
, SeqAdvanceConcurrentTest
,
667 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 0, 10),
668 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 1, 10),
669 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 2, 10),
670 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 3, 10),
671 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 4, 10),
672 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 5, 10),
673 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 6, 10),
674 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 7, 10),
675 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 8, 10),
676 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 9, 10),
677 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 0, 10),
678 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 1, 10),
679 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 2, 10),
680 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 3, 10),
681 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 4, 10),
682 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 5, 10),
683 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 6, 10),
684 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 7, 10),
685 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 8, 10),
686 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 9, 10)));
688 INSTANTIATE_TEST_CASE_P(
689 OneWriteQueue
, SeqAdvanceConcurrentTest
,
691 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 0, 10),
692 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 1, 10),
693 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 2, 10),
694 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 3, 10),
695 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 4, 10),
696 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 5, 10),
697 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 6, 10),
698 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 7, 10),
699 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 8, 10),
700 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 9, 10)));
701 #endif // ROCKSDB_VALGRIND_RUN
703 TEST_P(WritePreparedTransactionTest
, CommitMap
) {
704 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
706 assert(wp_db
->db_impl_
);
707 size_t size
= wp_db
->COMMIT_CACHE_SIZE
;
708 CommitEntry c
= {5, 12}, e
;
709 bool evicted
= wp_db
->AddCommitEntry(c
.prep_seq
% size
, c
, &e
);
710 ASSERT_FALSE(evicted
);
712 // Should be able to read the same value
713 CommitEntry64b dont_care
;
714 bool found
= wp_db
->GetCommitEntry(c
.prep_seq
% size
, &dont_care
, &e
);
717 // Should be able to distinguish between overlapping entries
718 found
= wp_db
->GetCommitEntry((c
.prep_seq
+ size
) % size
, &dont_care
, &e
);
720 ASSERT_NE(c
.prep_seq
+ size
, e
.prep_seq
);
721 // Should be able to detect non-existent entry
722 found
= wp_db
->GetCommitEntry((c
.prep_seq
+ 1) % size
, &dont_care
, &e
);
725 // Reject an invalid exchange
726 CommitEntry e2
= {c
.prep_seq
+ size
, c
.commit_seq
+ size
};
727 CommitEntry64b
e2_64b(e2
, wp_db
->FORMAT
);
728 bool exchanged
= wp_db
->ExchangeCommitEntry(e2
.prep_seq
% size
, e2_64b
, e
);
729 ASSERT_FALSE(exchanged
);
730 // check whether it did actually reject that
731 found
= wp_db
->GetCommitEntry(e2
.prep_seq
% size
, &dont_care
, &e
);
735 // Accept a valid exchange
736 CommitEntry64b
c_64b(c
, wp_db
->FORMAT
);
737 CommitEntry e3
= {c
.prep_seq
+ size
, c
.commit_seq
+ size
+ 1};
738 exchanged
= wp_db
->ExchangeCommitEntry(c
.prep_seq
% size
, c_64b
, e3
);
739 ASSERT_TRUE(exchanged
);
740 // check whether it actually accepted that
741 found
= wp_db
->GetCommitEntry(c
.prep_seq
% size
, &dont_care
, &e
);
746 CommitEntry e4
= {e3
.prep_seq
+ size
, e3
.commit_seq
+ size
+ 1};
747 evicted
= wp_db
->AddCommitEntry(e4
.prep_seq
% size
, e4
, &e
);
748 ASSERT_TRUE(evicted
);
750 found
= wp_db
->GetCommitEntry(e4
.prep_seq
% size
, &dont_care
, &e
);
755 TEST_P(WritePreparedTransactionTest
, MaybeUpdateOldCommitMap
) {
756 // If prepare <= snapshot < commit we should keep the entry around since its
757 // nonexistence could be interpreted as committed in the snapshot while it is
758 // not true. We keep such entries around by adding them to the
760 uint64_t p
/*prepare*/, c
/*commit*/, s
/*snapshot*/, ns
/*next_snapshot*/;
761 p
= 10l, c
= 15l, s
= 20l, ns
= 21l;
762 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
763 // If we do not expect the old commit map to be updated, try also with a next
764 // snapshot that is expected to update the old commit map. This would test
765 // that MaybeUpdateOldCommitMap would not prevent us from checking the next
766 // snapshot that must be checked.
767 p
= 10l, c
= 15l, s
= 20l, ns
= 11l;
768 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
770 p
= 10l, c
= 20l, s
= 20l, ns
= 19l;
771 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
772 p
= 10l, c
= 20l, s
= 20l, ns
= 21l;
773 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
775 p
= 20l, c
= 20l, s
= 20l, ns
= 21l;
776 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
777 p
= 20l, c
= 20l, s
= 20l, ns
= 19l;
778 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
780 p
= 10l, c
= 25l, s
= 20l, ns
= 21l;
781 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, true);
783 p
= 20l, c
= 25l, s
= 20l, ns
= 21l;
784 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, true);
786 p
= 21l, c
= 25l, s
= 20l, ns
= 22l;
787 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
788 p
= 21l, c
= 25l, s
= 20l, ns
= 19l;
789 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
792 // Trigger the condition where some old memtables are skipped when doing
793 // TransactionUtil::CheckKey(), and make sure the result is still correct.
794 TEST_P(WritePreparedTransactionTest
, CheckKeySkipOldMemtable
) {
795 const int kAttemptHistoryMemtable
= 0;
796 const int kAttemptImmMemTable
= 1;
797 for (int attempt
= kAttemptHistoryMemtable
; attempt
<= kAttemptImmMemTable
;
799 options
.max_write_buffer_number_to_maintain
= 3;
802 WriteOptions write_options
;
803 ReadOptions read_options
;
804 TransactionOptions txn_options
;
805 txn_options
.set_snapshot
= true;
809 ASSERT_OK(db
->Put(write_options
, Slice("foo"), Slice("bar")));
810 ASSERT_OK(db
->Put(write_options
, Slice("foo2"), Slice("bar")));
812 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
813 ASSERT_TRUE(txn
!= nullptr);
814 ASSERT_OK(txn
->SetName("txn"));
816 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
817 ASSERT_TRUE(txn2
!= nullptr);
818 ASSERT_OK(txn2
->SetName("txn2"));
820 // This transaction is created to cause potential conflict.
821 Transaction
* txn_x
= db
->BeginTransaction(write_options
);
822 ASSERT_OK(txn_x
->SetName("txn_x"));
823 ASSERT_OK(txn_x
->Put(Slice("foo"), Slice("bar3")));
824 ASSERT_OK(txn_x
->Prepare());
826 // Create snapshots after the prepare, but there should still
827 // be a conflict when trying to read "foo".
829 if (attempt
== kAttemptImmMemTable
) {
830 // For the second attempt, hold flush from beginning. The memtable
831 // will be switched to immutable after calling TEST_SwitchMemtable()
832 // while CheckKey() is called.
833 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
834 {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
835 "FlushJob::Start"}});
836 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
839 // force a memtable flush. The memtable should still be kept
840 FlushOptions flush_ops
;
841 if (attempt
== kAttemptHistoryMemtable
) {
842 ASSERT_OK(db
->Flush(flush_ops
));
844 assert(attempt
== kAttemptImmMemTable
);
845 DBImpl
* db_impl
= static_cast<DBImpl
*>(db
->GetRootDB());
846 db_impl
->TEST_SwitchMemtable();
848 uint64_t num_imm_mems
;
849 ASSERT_TRUE(db
->GetIntProperty(DB::Properties::kNumImmutableMemTable
,
851 if (attempt
== kAttemptHistoryMemtable
) {
852 ASSERT_EQ(0, num_imm_mems
);
854 assert(attempt
== kAttemptImmMemTable
);
855 ASSERT_EQ(1, num_imm_mems
);
858 // Put something in active memtable
859 ASSERT_OK(db
->Put(write_options
, Slice("foo3"), Slice("bar")));
861 // Create txn3 after flushing, but this transaction also needs to
862 // check all memtables because they contain uncommitted data.
863 Transaction
* txn3
= db
->BeginTransaction(write_options
, txn_options
);
864 ASSERT_TRUE(txn3
!= nullptr);
865 ASSERT_OK(txn3
->SetName("txn3"));
867 // Commit the pending write
868 ASSERT_OK(txn_x
->Commit());
870 // Commit txn, txn2 and txn3. txn and txn3 will conflict but txn2 will
871 // pass. In all cases, both memtables are queried.
872 SetPerfLevel(PerfLevel::kEnableCount
);
873 get_perf_context()->Reset();
874 ASSERT_TRUE(txn3
->GetForUpdate(read_options
, "foo", &value
).IsBusy());
875 // We should have checked two memtables, active and either immutable
876 // or history memtable, depending on the test case.
877 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
879 get_perf_context()->Reset();
880 ASSERT_TRUE(txn
->GetForUpdate(read_options
, "foo", &value
).IsBusy());
881 // We should have checked two memtables, active and either immutable
882 // or history memtable, depending on the test case.
883 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
885 get_perf_context()->Reset();
886 ASSERT_OK(txn2
->GetForUpdate(read_options
, "foo2", &value
));
887 ASSERT_EQ(value
, "bar");
888 // We should have checked two memtables, and since there is no
889 // conflict, another Get() will be made and fetch the data from
890 // DB. If it is in immutable memtable, two extra memtable reads
891 // will be issued. If it is not (in history), only one will
892 // be made, which is to the active memtable.
893 if (attempt
== kAttemptHistoryMemtable
) {
894 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count
);
896 assert(attempt
== kAttemptImmMemTable
);
897 ASSERT_EQ(4, get_perf_context()->get_from_memtable_count
);
900 Transaction
* txn4
= db
->BeginTransaction(write_options
, txn_options
);
901 ASSERT_TRUE(txn4
!= nullptr);
902 ASSERT_OK(txn4
->SetName("txn4"));
903 get_perf_context()->Reset();
904 ASSERT_OK(txn4
->GetForUpdate(read_options
, "foo", &value
));
905 if (attempt
== kAttemptHistoryMemtable
) {
906 // Active memtable will be checked in snapshot validation and when
907 // getting the value.
908 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
910 // Only active memtable will be checked in snapshot validation but
911 // both the active and immutable memtables will be queried when
912 // getting the value.
913 assert(attempt
== kAttemptImmMemTable
);
914 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count
);
917 ASSERT_OK(txn2
->Commit());
918 ASSERT_OK(txn4
->Commit());
920 TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
921 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
923 SetPerfLevel(PerfLevel::kDisable
);
933 // Reproduce the bug with two snapshots with the same sequence number and test
934 // that the release of the first snapshot will not affect the reads by the other
936 TEST_P(WritePreparedTransactionTest
, DoubleSnapshot
) {
937 TransactionOptions txn_options
;
940 // Insert initial value
941 ASSERT_OK(db
->Put(WriteOptions(), "key", "value1"));
943 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
945 wp_db
->BeginTransaction(WriteOptions(), txn_options
, nullptr);
946 ASSERT_OK(txn
->SetName("txn"));
947 ASSERT_OK(txn
->Put("key", "value2"));
948 ASSERT_OK(txn
->Prepare());
949 // Three snapshots with the same seq number
950 const Snapshot
* snapshot0
= wp_db
->GetSnapshot();
951 const Snapshot
* snapshot1
= wp_db
->GetSnapshot();
952 const Snapshot
* snapshot2
= wp_db
->GetSnapshot();
953 ASSERT_OK(txn
->Commit());
954 SequenceNumber cache_size
= wp_db
->COMMIT_CACHE_SIZE
;
955 SequenceNumber overlap_seq
= txn
->GetId() + cache_size
;
958 // 4th snapshot with a larger seq
959 const Snapshot
* snapshot3
= wp_db
->GetSnapshot();
960 // Cause an eviction to advance max evicted seq number
961 // This also fetches the 4 snapshots from db since their seq is lower than the
963 wp_db
->AddCommitted(overlap_seq
, overlap_seq
);
966 // It should see the value before commit
967 ropt
.snapshot
= snapshot2
;
968 PinnableSlice pinnable_val
;
969 s
= wp_db
->Get(ropt
, wp_db
->DefaultColumnFamily(), "key", &pinnable_val
);
971 ASSERT_TRUE(pinnable_val
== "value1");
972 pinnable_val
.Reset();
974 wp_db
->ReleaseSnapshot(snapshot1
);
976 // It should still see the value before commit
977 s
= wp_db
->Get(ropt
, wp_db
->DefaultColumnFamily(), "key", &pinnable_val
);
979 ASSERT_TRUE(pinnable_val
== "value1");
980 pinnable_val
.Reset();
982 // Cause an eviction to advance max evicted seq number and trigger updating
984 overlap_seq
+= cache_size
;
985 wp_db
->AddCommitted(overlap_seq
, overlap_seq
);
987 // It should still see the value before commit
988 s
= wp_db
->Get(ropt
, wp_db
->DefaultColumnFamily(), "key", &pinnable_val
);
990 ASSERT_TRUE(pinnable_val
== "value1");
991 pinnable_val
.Reset();
993 wp_db
->ReleaseSnapshot(snapshot0
);
994 wp_db
->ReleaseSnapshot(snapshot2
);
995 wp_db
->ReleaseSnapshot(snapshot3
);
998 size_t UniqueCnt(std::vector
<SequenceNumber
> vec
) {
999 std::set
<SequenceNumber
> aset
;
1000 for (auto i
: vec
) {
1005 // Test that the entries in old_commit_map_ get garbage collected properly
1006 TEST_P(WritePreparedTransactionTest
, OldCommitMapGC
) {
1007 const size_t snapshot_cache_bits
= 0;
1008 const size_t commit_cache_bits
= 0;
1009 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1010 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1011 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1012 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1014 SequenceNumber seq
= 0;
1015 // Take the first snapshot that overlaps with two txn
1016 auto prep_seq
= ++seq
;
1017 wp_db
->AddPrepared(prep_seq
);
1018 auto prep_seq2
= ++seq
;
1019 wp_db
->AddPrepared(prep_seq2
);
1020 auto snap_seq1
= seq
;
1021 wp_db
->TakeSnapshot(snap_seq1
);
1022 auto commit_seq
= ++seq
;
1023 wp_db
->AddCommitted(prep_seq
, commit_seq
);
1024 wp_db
->RemovePrepared(prep_seq
);
1025 auto commit_seq2
= ++seq
;
1026 wp_db
->AddCommitted(prep_seq2
, commit_seq2
);
1027 wp_db
->RemovePrepared(prep_seq2
);
1028 // Take the 2nd and 3rd snapshot that overlap with the same txn
1030 wp_db
->AddPrepared(prep_seq
);
1031 auto snap_seq2
= seq
;
1032 wp_db
->TakeSnapshot(snap_seq2
);
1034 auto snap_seq3
= seq
;
1035 wp_db
->TakeSnapshot(snap_seq3
);
1038 wp_db
->AddCommitted(prep_seq
, commit_seq
);
1039 wp_db
->RemovePrepared(prep_seq
);
1040 // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
1041 // only item in the commit_cache_ via another commit.
1043 wp_db
->AddPrepared(prep_seq
);
1045 wp_db
->AddCommitted(prep_seq
, commit_seq
);
1046 wp_db
->RemovePrepared(prep_seq
);
1048 // Verify that the evicted commit entries for all snapshots are in the
1051 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
1052 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1053 ASSERT_EQ(3, wp_db
->old_commit_map_
.size());
1054 ASSERT_EQ(2, UniqueCnt(wp_db
->old_commit_map_
[snap_seq1
]));
1055 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq2
]));
1056 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq3
]));
1059 // Verify that the 2nd snapshot is cleaned up after the release
1060 wp_db
->ReleaseSnapshotInternal(snap_seq2
);
1062 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
1063 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1064 ASSERT_EQ(2, wp_db
->old_commit_map_
.size());
1065 ASSERT_EQ(2, UniqueCnt(wp_db
->old_commit_map_
[snap_seq1
]));
1066 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq3
]));
1069 // Verify that the 1st snapshot is cleaned up after the release
1070 wp_db
->ReleaseSnapshotInternal(snap_seq1
);
1072 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
1073 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1074 ASSERT_EQ(1, wp_db
->old_commit_map_
.size());
1075 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq3
]));
1078 // Verify that the 3rd snapshot is cleaned up after the release
1079 wp_db
->ReleaseSnapshotInternal(snap_seq3
);
1081 ASSERT_TRUE(wp_db
->old_commit_map_empty_
.load());
1082 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1083 ASSERT_EQ(0, wp_db
->old_commit_map_
.size());
1087 TEST_P(WritePreparedTransactionTest
, CheckAgainstSnapshots
) {
1088 std::vector
<SequenceNumber
> snapshots
= {100l, 200l, 300l, 400l, 500l,
1089 600l, 700l, 800l, 900l};
1090 const size_t snapshot_cache_bits
= 2;
1091 const uint64_t cache_size
= 1ul << snapshot_cache_bits
;
1092 // Safety check to express the intended size in the test. Can be adjusted if
1093 // the snapshot list changes.
1094 assert((1ul << snapshot_cache_bits
) * 2 + 1 == snapshots
.size());
1095 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1096 UpdateTransactionDBOptions(snapshot_cache_bits
);
1097 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1098 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1099 SequenceNumber version
= 1000l;
1100 ASSERT_EQ(0, wp_db
->snapshots_total_
);
1101 wp_db
->UpdateSnapshots(snapshots
, version
);
1102 ASSERT_EQ(snapshots
.size(), wp_db
->snapshots_total_
);
1103 // seq numbers are chosen so that we have two of them between each two
1104 // snapshots. If the diff of two consecutive seq is more than 5, there is a
1105 // snapshot between them.
1106 std::vector
<SequenceNumber
> seqs
= {50l, 55l, 150l, 155l, 250l, 255l, 350l,
1107 355l, 450l, 455l, 550l, 555l, 650l, 655l,
1108 750l, 755l, 850l, 855l, 950l, 955l};
1109 assert(seqs
.size() > 1);
1110 for (size_t i
= 0; i
+ 1 < seqs
.size(); i
++) {
1111 wp_db
->old_commit_map_empty_
= true; // reset
1112 CommitEntry commit_entry
= {seqs
[i
], seqs
[i
+ 1]};
1113 wp_db
->CheckAgainstSnapshots(commit_entry
);
1114 // Expect update if there is snapshot in between the prepare and commit
1115 bool expect_update
= commit_entry
.commit_seq
- commit_entry
.prep_seq
> 5 &&
1116 commit_entry
.commit_seq
>= snapshots
.front() &&
1117 commit_entry
.prep_seq
<= snapshots
.back();
1118 ASSERT_EQ(expect_update
, !wp_db
->old_commit_map_empty_
);
1121 // Test that search will include multiple snapshot from snapshot cache
1123 // exclude first and last item in the cache
1124 CommitEntry commit_entry
= {snapshots
.front() + 1,
1125 snapshots
[cache_size
- 1] - 1};
1126 wp_db
->old_commit_map_empty_
= true; // reset
1127 wp_db
->old_commit_map_
.clear();
1128 wp_db
->CheckAgainstSnapshots(commit_entry
);
1129 ASSERT_EQ(wp_db
->old_commit_map_
.size(), cache_size
- 2);
1132 // Test that search will include multiple snapshot from old snapshots
1134 // include two in the middle
1135 CommitEntry commit_entry
= {snapshots
[cache_size
] + 1,
1136 snapshots
[cache_size
+ 2] + 1};
1137 wp_db
->old_commit_map_empty_
= true; // reset
1138 wp_db
->old_commit_map_
.clear();
1139 wp_db
->CheckAgainstSnapshots(commit_entry
);
1140 ASSERT_EQ(wp_db
->old_commit_map_
.size(), 2);
1143 // Test that search will include both snapshot cache and old snapshots
1144 // Case 1: includes all in snapshot cache
1146 CommitEntry commit_entry
= {snapshots
.front() - 1, snapshots
.back() + 1};
1147 wp_db
->old_commit_map_empty_
= true; // reset
1148 wp_db
->old_commit_map_
.clear();
1149 wp_db
->CheckAgainstSnapshots(commit_entry
);
1150 ASSERT_EQ(wp_db
->old_commit_map_
.size(), snapshots
.size());
1153 // Case 2: includes all snapshot caches except the smallest
1155 CommitEntry commit_entry
= {snapshots
.front() + 1, snapshots
.back() + 1};
1156 wp_db
->old_commit_map_empty_
= true; // reset
1157 wp_db
->old_commit_map_
.clear();
1158 wp_db
->CheckAgainstSnapshots(commit_entry
);
1159 ASSERT_EQ(wp_db
->old_commit_map_
.size(), snapshots
.size() - 1);
1162 // Case 3: includes only the largest of snapshot cache
1164 CommitEntry commit_entry
= {snapshots
[cache_size
- 1] - 1,
1165 snapshots
.back() + 1};
1166 wp_db
->old_commit_map_empty_
= true; // reset
1167 wp_db
->old_commit_map_
.clear();
1168 wp_db
->CheckAgainstSnapshots(commit_entry
);
1169 ASSERT_EQ(wp_db
->old_commit_map_
.size(), snapshots
.size() - cache_size
+ 1);
1173 // This test is too slow for travis
1175 #ifndef ROCKSDB_VALGRIND_RUN
1176 // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
1177 // parallel with UpdateSnapshots.
1178 TEST_P(SnapshotConcurrentAccessTest
, SnapshotConcurrentAccess
) {
1179 // We have a sync point in the method under test after checking each snapshot.
1180 // If you increase the max number of snapshots in this test, more sync points
1181 // in the methods must also be added.
1182 const std::vector
<SequenceNumber
> snapshots
= {10l, 20l, 30l, 40l, 50l,
1183 60l, 70l, 80l, 90l, 100l};
1184 const size_t snapshot_cache_bits
= 2;
1185 // Safety check to express the intended size in the test. Can be adjusted if
1186 // the snapshot list changes.
1187 assert((1ul << snapshot_cache_bits
) * 2 + 2 == snapshots
.size());
1188 SequenceNumber version
= 1000l;
1189 // Choose the cache size so that the new snapshot list could replace all the
1190 // existing items in the cache and also have some overflow.
1191 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1192 UpdateTransactionDBOptions(snapshot_cache_bits
);
1193 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1194 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1195 const size_t extra
= 2;
1197 // Add up to extra items that do not fit into the cache
1198 for (size_t old_size
= 1; old_size
<= wp_db
->SNAPSHOT_CACHE_SIZE
+ extra
;
1200 const std::vector
<SequenceNumber
> old_snapshots(
1201 snapshots
.begin(), snapshots
.begin() + old_size
);
1203 // Each member of old snapshot might or might not appear in the new list. We
1204 // create a common_snapshots for each combination.
1205 size_t new_comb_cnt
= size_t(1) << old_size
;
1206 for (size_t new_comb
= 0; new_comb
< new_comb_cnt
; new_comb
++, loop_id
++) {
1207 if (loop_id
% split_cnt_
!= split_id_
) continue;
1208 printf("."); // To signal progress
1210 std::vector
<SequenceNumber
> common_snapshots
;
1211 for (size_t i
= 0; i
< old_snapshots
.size(); i
++) {
1212 if (IsInCombination(i
, new_comb
)) {
1213 common_snapshots
.push_back(old_snapshots
[i
]);
1216 // And add some new snapshots to the common list
1217 for (size_t added_snapshots
= 0;
1218 added_snapshots
<= snapshots
.size() - old_snapshots
.size();
1219 added_snapshots
++) {
1220 std::vector
<SequenceNumber
> new_snapshots
= common_snapshots
;
1221 for (size_t i
= 0; i
< added_snapshots
; i
++) {
1222 new_snapshots
.push_back(snapshots
[old_snapshots
.size() + i
]);
1224 for (auto it
= common_snapshots
.begin(); it
!= common_snapshots
.end();
1226 auto snapshot
= *it
;
1227 // Create a commit entry that is around the snapshot and thus should
1228 // not be discarded
1229 CommitEntry entry
= {static_cast<uint64_t>(snapshot
- 1),
1231 // The critical part is when iterating the snapshot cache. Afterwards,
1232 // we are operating under the lock
1234 std::min(old_snapshots
.size(), wp_db
->SNAPSHOT_CACHE_SIZE
) + 1;
1236 std::min(new_snapshots
.size(), wp_db
->SNAPSHOT_CACHE_SIZE
) + 1;
1237 // Break each thread at two points
1238 for (size_t a1
= 1; a1
<= a_range
; a1
++) {
1239 for (size_t a2
= a1
+ 1; a2
<= a_range
; a2
++) {
1240 for (size_t b1
= 1; b1
<= b_range
; b1
++) {
1241 for (size_t b2
= b1
+ 1; b2
<= b_range
; b2
++) {
1242 SnapshotConcurrentAccessTestInternal(
1243 wp_db
.get(), old_snapshots
, new_snapshots
, entry
, version
,
1255 #endif // ROCKSDB_VALGRIND_RUN
1258 // This test clarifies the contract of AdvanceMaxEvictedSeq method
1259 TEST_P(WritePreparedTransactionTest
, AdvanceMaxEvictedSeqBasic
) {
1260 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1261 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1262 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1264 // 1. Set the initial values for max, prepared, and snapshots
1265 SequenceNumber zero_max
= 0l;
1266 // Set the initial list of prepared txns
1267 const std::vector
<SequenceNumber
> initial_prepared
= {10, 30, 50, 100,
1269 for (auto p
: initial_prepared
) {
1270 wp_db
->AddPrepared(p
);
1272 // This updates the max value and also set old prepared
1273 SequenceNumber init_max
= 100;
1274 wp_db
->AdvanceMaxEvictedSeq(zero_max
, init_max
);
1275 const std::vector
<SequenceNumber
> initial_snapshots
= {20, 40};
1276 wp_db
->SetDBSnapshots(initial_snapshots
);
1277 // This will update the internal cache of snapshots from the DB
1278 wp_db
->UpdateSnapshots(initial_snapshots
, init_max
);
1280 // 2. Invoke AdvanceMaxEvictedSeq
1281 const std::vector
<SequenceNumber
> latest_snapshots
= {20, 110, 220, 300};
1282 wp_db
->SetDBSnapshots(latest_snapshots
);
1283 SequenceNumber new_max
= 200;
1284 wp_db
->AdvanceMaxEvictedSeq(init_max
, new_max
);
1286 // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
1287 // a. max should be updated to new_max
1288 ASSERT_EQ(wp_db
->max_evicted_seq_
, new_max
);
1289 // b. delayed prepared should contain every txn <= max and prepared should
1290 // only contain txns > max
1291 auto it
= initial_prepared
.begin();
1292 for (; it
!= initial_prepared
.end() && *it
<= new_max
; ++it
) {
1293 ASSERT_EQ(1, wp_db
->delayed_prepared_
.erase(*it
));
1295 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
1296 for (; it
!= initial_prepared
.end() && !wp_db
->prepared_txns_
.empty();
1297 ++it
, wp_db
->prepared_txns_
.pop()) {
1298 ASSERT_EQ(*it
, wp_db
->prepared_txns_
.top());
1300 ASSERT_TRUE(it
== initial_prepared
.end());
1301 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1302 // c. snapshots should contain everything below new_max
1303 auto sit
= latest_snapshots
.begin();
1304 for (size_t i
= 0; sit
!= latest_snapshots
.end() && *sit
<= new_max
&&
1305 i
< wp_db
->snapshots_total_
;
1307 ASSERT_TRUE(i
< wp_db
->snapshots_total_
);
1308 // This test is in small scale and the list of snapshots are assumed to be
1309 // within the cache size limit. This is just a safety check to double check
1311 ASSERT_TRUE(i
< wp_db
->SNAPSHOT_CACHE_SIZE
);
1312 ASSERT_EQ(*sit
, wp_db
->snapshot_cache_
[i
]);
1316 // A new snapshot should always be larger than max_evicted_seq_
1317 // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
1318 TEST_P(WritePreparedTransactionTest
, NewSnapshotLargerThanMax
) {
1319 WriteOptions woptions
;
1320 TransactionOptions txn_options
;
1321 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1322 Transaction
* txn0
= db
->BeginTransaction(woptions
, txn_options
);
1323 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value")));
1324 ASSERT_OK(txn0
->Commit());
1325 const SequenceNumber seq
= txn0
->GetId(); // is also prepare seq
1327 std::vector
<Transaction
*> txns
;
1328 // Inc seq without committing anything
1329 for (int i
= 0; i
< 10; i
++) {
1330 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1331 ASSERT_OK(txn
->SetName("xid" + std::to_string(i
)));
1332 ASSERT_OK(txn
->Put(Slice("key" + std::to_string(i
)), Slice("value")));
1333 ASSERT_OK(txn
->Prepare());
1334 txns
.push_back(txn
);
1337 // The new commit is seq + 10
1338 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1339 auto snap
= wp_db
->GetSnapshot();
1340 const SequenceNumber last_seq
= snap
->GetSequenceNumber();
1341 wp_db
->ReleaseSnapshot(snap
);
1342 ASSERT_LT(seq
, last_seq
);
1343 // Otherwise our test is not effective
1344 ASSERT_LT(last_seq
- seq
, wp_db
->INC_STEP_FOR_MAX_EVICTED
);
1346 // Evict seq out of commit cache
1347 const SequenceNumber overwrite_seq
= seq
+ wp_db
->COMMIT_CACHE_SIZE
;
1348 // Check that the next write could make max go beyond last
1349 auto last_max
= wp_db
->max_evicted_seq_
.load();
1350 wp_db
->AddCommitted(overwrite_seq
, overwrite_seq
);
1351 // Check that eviction has advanced the max
1352 ASSERT_LT(last_max
, wp_db
->max_evicted_seq_
.load());
1353 // Check that the new max has not advanced past the last seq
1354 ASSERT_LT(wp_db
->max_evicted_seq_
.load(), last_seq
);
1355 for (auto txn
: txns
) {
1361 // A new snapshot should always be larger than max_evicted_seq_
1362 // In very rare cases max could be below last published seq. Test that
1363 // taking snapshot will wait for max to catch up.
1364 TEST_P(WritePreparedTransactionTest
, MaxCatchupWithNewSnapshot
) {
1365 const size_t snapshot_cache_bits
= 7; // same as default
1366 const size_t commit_cache_bits
= 0; // only 1 entry => frequent eviction
1367 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1369 WriteOptions woptions
;
1370 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1372 const int writes
= 50;
1373 const int batch_cnt
= 4;
1374 ROCKSDB_NAMESPACE::port::Thread
t1([&]() {
1375 for (int i
= 0; i
< writes
; i
++) {
1377 // For duplicate keys cause 4 commit entries, each evicting an entry that
1378 // is not published yet, thus causing max evicted seq go higher than last
1380 for (int b
= 0; b
< batch_cnt
; b
++) {
1381 batch
.Put("foo", "foo");
1383 db
->Write(woptions
, &batch
);
1387 ROCKSDB_NAMESPACE::port::Thread
t2([&]() {
1388 while (wp_db
->max_evicted_seq_
== 0) { // wait for insert thread
1389 std::this_thread::yield();
1391 for (int i
= 0; i
< 10; i
++) {
1392 SequenceNumber max_lower_bound
= wp_db
->max_evicted_seq_
;
1393 auto snap
= db
->GetSnapshot();
1394 if (snap
->GetSequenceNumber() != 0) {
1395 // Value of max_evicted_seq_ when snapshot was taken is unknown. We thus
1396 // compare with the lower bound instead as an approximation.
1397 ASSERT_LT(max_lower_bound
, snap
->GetSequenceNumber());
1398 } // seq 0 is ok to be less than max since nothing is visible to it
1399 db
->ReleaseSnapshot(snap
);
1406 // Make sure that the test has worked and seq number has advanced as we
1408 auto snap
= db
->GetSnapshot();
1409 ASSERT_GT(snap
->GetSequenceNumber(), batch_cnt
* writes
- 1);
1410 db
->ReleaseSnapshot(snap
);
1413 // Test that reads without snapshots would not hit an undefined state
1414 TEST_P(WritePreparedTransactionTest
, MaxCatchupWithUnbackedSnapshot
) {
1415 const size_t snapshot_cache_bits
= 7; // same as default
1416 const size_t commit_cache_bits
= 0; // only 1 entry => frequent eviction
1417 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1419 WriteOptions woptions
;
1420 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1422 const int writes
= 50;
1423 ROCKSDB_NAMESPACE::port::Thread
t1([&]() {
1424 for (int i
= 0; i
< writes
; i
++) {
1426 batch
.Put("key", "foo");
1427 db
->Write(woptions
, &batch
);
1431 ROCKSDB_NAMESPACE::port::Thread
t2([&]() {
1432 while (wp_db
->max_evicted_seq_
== 0) { // wait for insert thread
1433 std::this_thread::yield();
1436 PinnableSlice pinnable_val
;
1437 TransactionOptions txn_options
;
1438 for (int i
= 0; i
< 10; i
++) {
1439 auto s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1440 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1441 pinnable_val
.Reset();
1442 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1443 s
= txn
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1444 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1445 pinnable_val
.Reset();
1446 std::vector
<std::string
> values
;
1448 txn
->MultiGet(ropt
, {db
->DefaultColumnFamily()}, {"key"}, &values
);
1449 ASSERT_EQ(1, values
.size());
1450 ASSERT_EQ(1, s_vec
.size());
1452 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1454 txn
->MultiGet(ropt
, db
->DefaultColumnFamily(), 1, &key
, &pinnable_val
, &s
,
1456 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1464 // Make sure that the test has worked and seq number has advanced as we
1466 auto snap
= db
->GetSnapshot();
1467 ASSERT_GT(snap
->GetSequenceNumber(), writes
- 1);
1468 db
->ReleaseSnapshot(snap
);
1471 // Check that old_commit_map_ cleanup works correctly if the snapshot equals
1472 // max_evicted_seq_.
1473 TEST_P(WritePreparedTransactionTest
, CleanupSnapshotEqualToMax
) {
1474 const size_t snapshot_cache_bits
= 7; // same as default
1475 const size_t commit_cache_bits
= 0; // only 1 entry => frequent eviction
1476 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1478 WriteOptions woptions
;
1479 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1480 // Insert something to increase seq
1481 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1482 auto snap
= db
->GetSnapshot();
1483 auto snap_seq
= snap
->GetSequenceNumber();
1484 // Another insert should trigger eviction + load snapshot from db
1485 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1486 // This is the scenario that we check against
1487 ASSERT_EQ(snap_seq
, wp_db
->max_evicted_seq_
);
1488 // old_commit_map_ now has some data that needs gc
1489 ASSERT_EQ(1, wp_db
->snapshots_total_
);
1490 ASSERT_EQ(1, wp_db
->old_commit_map_
.size());
1492 db
->ReleaseSnapshot(snap
);
1494 // Another insert should trigger eviction + load snapshot from db
1495 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1497 // the snapshot and related metadata must be properly garbage collected
1498 ASSERT_EQ(0, wp_db
->snapshots_total_
);
1499 ASSERT_TRUE(wp_db
->snapshots_all_
.empty());
1500 ASSERT_EQ(0, wp_db
->old_commit_map_
.size());
1503 TEST_P(WritePreparedTransactionTest
, AdvanceSeqByOne
) {
1504 auto snap
= db
->GetSnapshot();
1505 auto seq1
= snap
->GetSequenceNumber();
1506 db
->ReleaseSnapshot(snap
);
1508 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1509 wp_db
->AdvanceSeqByOne();
1511 snap
= db
->GetSnapshot();
1512 auto seq2
= snap
->GetSequenceNumber();
1513 db
->ReleaseSnapshot(snap
);
1515 ASSERT_LT(seq1
, seq2
);
1518 // Test that the txn Initialize calls the overridden functions
1519 TEST_P(WritePreparedTransactionTest
, TxnInitialize
) {
1520 TransactionOptions txn_options
;
1521 WriteOptions write_options
;
1522 ASSERT_OK(db
->Put(write_options
, "key", "value"));
1523 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1524 ASSERT_OK(txn0
->SetName("xid"));
1525 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value1")));
1526 ASSERT_OK(txn0
->Prepare());
1528 // SetSnapshot is overridden to update min_uncommitted_
1529 txn_options
.set_snapshot
= true;
1530 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1531 auto snap
= txn1
->GetSnapshot();
1532 auto snap_impl
= reinterpret_cast<const SnapshotImpl
*>(snap
);
1533 // If ::Initialize calls the overridden SetSnapshot, min_uncommitted_ must be
1535 ASSERT_GT(snap_impl
->min_uncommitted_
, kMinUnCommittedSeq
);
1543 // This tests that transactions with duplicate keys perform correctly after max
1544 // is advancing their prepared sequence numbers. This will not be the case if
1545 // for example the txn does not add the prepared seq for the second sub-batch to
1546 // the PreparedHeap structure.
1547 TEST_P(WritePreparedTransactionTest
, AdvanceMaxEvictedSeqWithDuplicates
) {
1548 const size_t snapshot_cache_bits
= 7; // same as default
1549 const size_t commit_cache_bits
= 1; // disable commit cache
1550 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1554 PinnableSlice pinnable_val
;
1555 WriteOptions write_options
;
1556 TransactionOptions txn_options
;
1557 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1558 ASSERT_OK(txn0
->SetName("xid"));
1559 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value1")));
1560 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value2")));
1561 ASSERT_OK(txn0
->Prepare());
1563 ASSERT_OK(db
->Put(write_options
, "key2", "value"));
1564 // Will cause max advance due to disabled commit cache
1565 ASSERT_OK(db
->Put(write_options
, "key3", "value"));
1567 auto s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1568 ASSERT_TRUE(s
.IsNotFound());
1571 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1572 wp_db
->db_impl_
->FlushWAL(true);
1573 wp_db
->TEST_Crash();
1575 assert(db
!= nullptr);
1576 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1577 ASSERT_TRUE(s
.IsNotFound());
1579 txn0
= db
->GetTransactionByName("xid");
1580 ASSERT_OK(txn0
->Rollback());
1584 #ifndef ROCKSDB_VALGRIND_RUN
1585 // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
1586 // delayed_prepared_, when it is run concurrently with advancing max_evicted_seq,
1587 // which moves prepared txns from prepared_txns_ to delayed_prepared_.
1588 TEST_P(WritePreparedTransactionTest
, SmallestUnCommittedSeq
) {
1589 const size_t snapshot_cache_bits
= 7; // same as default
1590 const size_t commit_cache_bits
= 1; // disable commit cache
1591 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1593 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1595 PinnableSlice pinnable_val
;
1596 WriteOptions write_options
;
1597 TransactionOptions txn_options
;
1598 std::vector
<Transaction
*> txns
, committed_txns
;
1600 const int cnt
= 100;
1601 for (int i
= 0; i
< cnt
; i
++) {
1602 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1603 ASSERT_OK(txn
->SetName("xid" + ToString(i
)));
1604 auto key
= "key1" + ToString(i
);
1605 auto value
= "value1" + ToString(i
);
1606 ASSERT_OK(txn
->Put(Slice(key
), Slice(value
)));
1607 ASSERT_OK(txn
->Prepare());
1608 txns
.push_back(txn
);
1613 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
1614 for (int i
= 0; i
< cnt
; i
++) {
1615 uint32_t index
= rnd
.Uniform(cnt
- i
);
1618 MutexLock
l(&mutex
);
1620 txns
.erase(txns
.begin() + index
);
1622 // Since commit cache is practically disabled, commit results in immediate
1623 // advance in max_evicted_seq_ and subsequently moving some prepared txns
1624 // to delayed_prepared_.
1626 committed_txns
.push_back(txn
);
1629 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
1631 MutexLock
l(&mutex
);
1635 auto min_uncommitted
= wp_db
->SmallestUnCommittedSeq();
1636 ASSERT_LE(min_uncommitted
, (*txns
.begin())->GetId());
1640 commit_thread
.join();
1642 for (auto txn
: committed_txns
) {
1646 #endif // ROCKSDB_VALGRIND_RUN
1648 TEST_P(SeqAdvanceConcurrentTest
, SeqAdvanceConcurrent
) {
1649 // Given the sequential run of txns, with this timeout we should never see a
1650 // deadlock nor a timeout unless we have a key conflict, which should be
1651 // almost infeasible.
1652 txn_db_options
.transaction_lock_timeout
= 1000;
1653 txn_db_options
.default_lock_timeout
= 1000;
1657 // Number of different txn types we use in this test
1658 const size_t type_cnt
= 5;
1659 // The size of the first write group
1660 // TODO(myabandeh): This should be increased for pre-release tests
1661 const size_t first_group_size
= 2;
1662 // Total number of txns we run in each test
1663 // TODO(myabandeh): This should be increased for pre-release tests
1664 const size_t txn_cnt
= first_group_size
+ 1;
1666 size_t base
[txn_cnt
+ 1] = {
1669 for (size_t bi
= 1; bi
<= txn_cnt
; bi
++) {
1670 base
[bi
] = base
[bi
- 1] * type_cnt
;
1672 const size_t max_n
= static_cast<size_t>(std::pow(type_cnt
, txn_cnt
));
1673 printf("Number of cases being tested is %" ROCKSDB_PRIszt
"\n", max_n
);
1674 for (size_t n
= 0; n
< max_n
; n
++, ReOpen()) {
1675 if (n
% split_cnt_
!= split_id_
) continue;
1676 if (n
% 1000 == 0) {
1677 printf("Tested %" ROCKSDB_PRIszt
" cases so far\n", n
);
1679 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1680 auto seq
= db_impl
->TEST_GetLastVisibleSequence();
1681 with_empty_commits
= 0;
1683 // This is increased before writing the batch for commit
1685 // This is increased before txn starts linking if it expects to do a commit
1687 expected_commits
= 0;
1688 std::vector
<port::Thread
> threads
;
1691 std::atomic
<bool> batch_formed(false);
1692 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1693 "WriteThread::EnterAsBatchGroupLeader:End",
1694 [&](void* /*arg*/) { batch_formed
= true; });
1695 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1696 "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
1699 // Wait until the others are linked too.
1700 while (linked
< first_group_size
) {
1702 } else if (linked
== 1 + first_group_size
) {
1703 // Make the 2nd batch of the rest of writes plus any followup
1704 // commits from the first batch
1705 while (linked
< txn_cnt
+ commit_writes
) {
1708 // Then we will have one or more batches consisting of follow-up
1709 // commits from the 2nd batch. There is a bit of non-determinism here
1710 // but it should be tolerable.
1713 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1714 for (size_t bi
= 0; bi
< txn_cnt
; bi
++) {
1715 // get the bi-th digit in number system based on type_cnt
1716 size_t d
= (n
% base
[bi
+ 1]) / base
[bi
];
1719 threads
.emplace_back(txn_t0
, bi
);
1722 threads
.emplace_back(txn_t1
, bi
);
1725 threads
.emplace_back(txn_t2
, bi
);
1728 threads
.emplace_back(txn_t3
, bi
);
1731 threads
.emplace_back(txn_t3
, bi
);
1736 // wait to be linked
1737 while (linked
.load() <= bi
) {
1739 // after a queue of size first_group_size
1740 if (bi
+ 1 == first_group_size
) {
1741 while (!batch_formed
) {
1743 // to make it more deterministic, wait until the commits are linked
1744 while (linked
.load() <= bi
+ expected_commits
) {
1748 for (auto& t
: threads
) {
1751 if (options
.two_write_queues
) {
1752 // In this case none of the above scheduling tricks to deterministically
1753 // form merged batches works because the writes go to separate queues.
1754 // This would result in different write groups in each run of the test. We
1755 // still keep the test since although non-deterministic and hard to debug,
1756 // it is still useful to have.
1757 // TODO(myabandeh): Add a deterministic unit test for two_write_queues
1760 // Check if memtable inserts advanced seq number as expected
1761 seq
= db_impl
->TEST_GetLastVisibleSequence();
1762 ASSERT_EQ(exp_seq
, seq
);
1764 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1765 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1767 // Check if recovery preserves the last sequence number
1768 db_impl
->FlushWAL(true);
1770 assert(db
!= nullptr);
1771 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1772 seq
= db_impl
->TEST_GetLastVisibleSequence();
1773 ASSERT_LE(exp_seq
, seq
+ with_empty_commits
);
1775 // Check if flush preserves the last sequence number
1776 db_impl
->Flush(fopt
);
1777 seq
= db_impl
->GetLatestSequenceNumber();
1778 ASSERT_LE(exp_seq
, seq
+ with_empty_commits
);
1780 // Check if recovery after flush preserves the last sequence number
1781 db_impl
->FlushWAL(true);
1783 assert(db
!= nullptr);
1784 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1785 seq
= db_impl
->GetLatestSequenceNumber();
1786 ASSERT_LE(exp_seq
, seq
+ with_empty_commits
);
1790 // Run a couple of different txns, some of them left uncommitted. Restart the
1791 // db at a couple of points to check whether the list of uncommitted txns is
// recovered.
// Each restart point is preceded by FlushWAL(true) (and TEST_Crash() for the
// first two). After a restart the test expects prepared_txns_ to be empty and
// the still-prepared txns to be present in delayed_prepared_, with their
// prepare sequence numbers <= max_evicted_seq_.
// NOTE(review): this view has lines elided (reopen calls, commits, ASSERT_OK
// on `s`, closing braces), so the flow shown here is partial.
1793 TEST_P(WritePreparedTransactionTest
, BasicRecovery
) {
1794 options
.disable_auto_compactions
= true;
1796 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1800 TransactionOptions txn_options
;
1801 WriteOptions write_options
;
1802 size_t index
= 1000;
// txn0: named "xid<index>", writes foo0<index> -> bar0<index>, then prepares.
1803 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1804 auto istr0
= std::to_string(index
);
1805 auto s
= txn0
->SetName("xid" + istr0
);
1807 s
= txn0
->Put(Slice("foo0" + istr0
), Slice("bar0" + istr0
));
1809 s
= txn0
->Prepare();
1810 auto prep_seq_0
= txn0
->GetId();
// txn1: a second prepared transaction writing foo1<index>.
1815 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1816 auto istr1
= std::to_string(index
);
1817 s
= txn1
->SetName("xid" + istr1
);
1819 s
= txn1
->Put(Slice("foo1" + istr1
), Slice("bar"));
1821 s
= txn1
->Prepare();
1822 auto prep_seq_1
= txn1
->GetId();
1827 PinnableSlice pinnable_val
;
1828 // Check the value is not committed before restart
1829 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1830 ASSERT_TRUE(s
.IsNotFound());
1831 pinnable_val
.Reset();
// First simulated crash: sync the WAL, then crash.
1835 wp_db
->db_impl_
->FlushWAL(true);
1836 wp_db
->TEST_Crash();
1838 assert(db
!= nullptr);
1839 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1840 // After recovery, all the uncommitted txns (0 and 1) should be inserted into
1841 // delayed_prepared_
1842 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1843 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1844 ASSERT_LE(prep_seq_0
, wp_db
->max_evicted_seq_
);
1845 ASSERT_LE(prep_seq_1
, wp_db
->max_evicted_seq_
);
// delayed_prepared_ is inspected under a read lock on prepared_mutex_.
1847 ReadLock
rl(&wp_db
->prepared_mutex_
);
1848 ASSERT_EQ(2, wp_db
->delayed_prepared_
.size());
1849 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(prep_seq_0
) !=
1850 wp_db
->delayed_prepared_
.end());
1851 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(prep_seq_1
) !=
1852 wp_db
->delayed_prepared_
.end());
1855 // Check the value is still not committed after restart
1856 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1857 ASSERT_TRUE(s
.IsNotFound());
1858 pinnable_val
.Reset();
1862 // Test that recovered txns will be properly marked committed for the next
// The recovered txn1 is looked up again by name.
1864 txn1
= db
->GetTransactionByName("xid" + istr1
);
1865 ASSERT_NE(txn1
, nullptr);
// txn2: a new prepared transaction created after the first restart.
1870 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
1871 auto istr2
= std::to_string(index
);
1872 s
= txn2
->SetName("xid" + istr2
);
1874 s
= txn2
->Put(Slice("foo2" + istr2
), Slice("bar"));
1876 s
= txn2
->Prepare();
1877 auto prep_seq_2
= txn2
->GetId();
// Second simulated crash.
1880 wp_db
->db_impl_
->FlushWAL(true);
1881 wp_db
->TEST_Crash();
1883 assert(db
!= nullptr);
1884 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1885 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1886 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1888 // 0 and 2 are prepared and 1 is committed
1890 ReadLock
rl(&wp_db
->prepared_mutex_
);
1891 ASSERT_EQ(2, wp_db
->delayed_prepared_
.size());
1892 const auto& end
= wp_db
->delayed_prepared_
.end();
1893 ASSERT_NE(wp_db
->delayed_prepared_
.find(prep_seq_0
), end
);
1894 ASSERT_EQ(wp_db
->delayed_prepared_
.find(prep_seq_1
), end
);
1895 ASSERT_NE(wp_db
->delayed_prepared_
.find(prep_seq_2
), end
);
1897 ASSERT_LE(prep_seq_0
, wp_db
->max_evicted_seq_
);
1898 ASSERT_LE(prep_seq_2
, wp_db
->max_evicted_seq_
);
1900 // Commit all the remaining txns
1901 txn0
= db
->GetTransactionByName("xid" + istr0
);
1902 ASSERT_NE(txn0
, nullptr);
1904 txn2
= db
->GetTransactionByName("xid" + istr2
);
1905 ASSERT_NE(txn2
, nullptr);
1908 // Check the value is committed after commit
1909 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1910 ASSERT_TRUE(s
.ok());
1911 ASSERT_TRUE(pinnable_val
== ("bar0" + istr0
));
1912 pinnable_val
.Reset();
// Final restart point: only FlushWAL(true) is visible here (no TEST_Crash()).
1916 wp_db
->db_impl_
->FlushWAL(true);
1918 assert(db
!= nullptr);
1919 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1920 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1921 ASSERT_TRUE(wp_db
->delayed_prepared_empty_
);
1923 // Check the value is still committed after recovery
1924 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1925 ASSERT_TRUE(s
.ok());
1926 ASSERT_TRUE(pinnable_val
== ("bar0" + istr0
));
1927 pinnable_val
.Reset();
1930 // After recovery the commit map is empty while the max is set. The code would
1931 // go through a different path which requires a separate test. Test that the
1932 // committed data before the restart is visible to all snapshots.
// Runs twice: once with only committed Puts and once ending with a prepared
// (uncommitted) txn whose prepare_seq must not be visible to the snapshots.
// NOTE(review): this view has lines elided (reopen calls, closing braces), so
// the flow shown here is partial.
1933 TEST_P(WritePreparedTransactionTest
, IsInSnapshotEmptyMap
) {
1934 for (bool end_with_prepare
: {false, true}) {
1936 WriteOptions woptions
;
1937 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1938 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1939 ASSERT_OK(db
->Put(woptions
, "key", "value"));
// kMaxSequenceNumber acts as "no prepared txn" for the loops below.
1940 SequenceNumber prepare_seq
= kMaxSequenceNumber
;
1941 if (end_with_prepare
) {
1942 TransactionOptions txn_options
;
1943 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1944 ASSERT_OK(txn
->SetName("xid0"));
1945 ASSERT_OK(txn
->Prepare());
1946 prepare_seq
= txn
->GetId();
// NOTE(review): TEST_Crash() is invoked before FlushWAL(true) here, whereas
// other tests in this file flush the WAL first. Confirm the order is intended.
1949 dynamic_cast<WritePreparedTxnDB
*>(db
)->TEST_Crash();
1950 auto db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1951 db_impl
->FlushWAL(true);
1953 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1954 assert(wp_db
!= nullptr);
1955 ASSERT_GT(wp_db
->max_evicted_seq_
, 0); // max after recovery
1956 // Take a snapshot right after recovery
1957 const Snapshot
* snap
= db
->GetSnapshot();
1958 auto snap_seq
= snap
->GetSequenceNumber();
1959 ASSERT_GT(snap_seq
, 0);
// NOTE(review): the `seq != prepare_seq` term ends the loop at prepare_seq
// rather than skipping it, so sequence numbers between prepare_seq and
// max_evicted_seq_ are not checked. Confirm that is intended.
1961 for (SequenceNumber seq
= 0;
1962 seq
<= wp_db
->max_evicted_seq_
&& seq
!= prepare_seq
; seq
++) {
1963 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
));
1965 if (end_with_prepare
) {
1966 ASSERT_FALSE(wp_db
->IsInSnapshot(prepare_seq
, snap_seq
));
// A sequence number above the snapshot must never be visible to it.
1969 ASSERT_FALSE(wp_db
->IsInSnapshot(snap_seq
+ 1, snap_seq
));
1971 db
->ReleaseSnapshot(snap
);
1973 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1974 // Take a snapshot after some writes
1975 snap
= db
->GetSnapshot();
1976 snap_seq
= snap
->GetSequenceNumber();
// Same checks as above, repeated against the newer snapshot.
1977 for (SequenceNumber seq
= 0;
1978 seq
<= wp_db
->max_evicted_seq_
&& seq
!= prepare_seq
; seq
++) {
1979 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
));
1981 if (end_with_prepare
) {
1982 ASSERT_FALSE(wp_db
->IsInSnapshot(prepare_seq
, snap_seq
));
1985 ASSERT_FALSE(wp_db
->IsInSnapshot(snap_seq
+ 1, snap_seq
));
1987 db
->ReleaseSnapshot(snap
);
1991 // Shows the contract of IsInSnapshot when called on invalid/released snapshots
// Uses the IsInSnapshot overload that takes min_uncommitted and an out
// parameter `released`, and checks both the return value and `released` for
// valid, invalid (snap_seq + 1), and released snapshots.
// NOTE(review): this view has lines elided (the declaration/reset of
// `released`, some ASSERT_x( openers, closing braces), so a few statements
// below appear without their opening macro.
1992 TEST_P(WritePreparedTransactionTest
, IsInSnapshotReleased
) {
1993 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1994 WriteOptions woptions
;
1995 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1997 const Snapshot
* snap1
= db
->GetSnapshot();
1998 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1999 ASSERT_OK(db
->Put(woptions
, "key", "value"));
2001 const Snapshot
* snap2
= db
->GetSnapshot();
2002 const SequenceNumber seq
= 1;
2003 // Evict seq out of commit cache
2004 size_t overwrite_seq
= wp_db
->COMMIT_CACHE_SIZE
+ seq
;
2005 wp_db
->AddCommitted(overwrite_seq
, overwrite_seq
);
2006 SequenceNumber snap_seq
;
2007 uint64_t min_uncommitted
= kMinUnCommittedSeq
;
2011 snap_seq
= snap1
->GetSequenceNumber();
2012 ASSERT_LE(seq
, snap_seq
);
2013 // Valid snapshot lower than max
2014 ASSERT_LE(snap_seq
, wp_db
->max_evicted_seq_
);
2015 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2016 ASSERT_FALSE(released
);
2019 snap_seq
= snap1
->GetSequenceNumber();
2020 // Invalid snapshot lower than max
2021 ASSERT_LE(snap_seq
+ 1, wp_db
->max_evicted_seq_
);
2023 wp_db
->IsInSnapshot(seq
, snap_seq
+ 1, min_uncommitted
, &released
));
2024 ASSERT_TRUE(released
);
2026 db
->ReleaseSnapshot(snap1
);
2029 // Released snapshot lower than max
2030 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2031 // The release does not take effect until the next max advance
2032 ASSERT_FALSE(released
);
2035 // Invalid snapshot lower than max
2037 wp_db
->IsInSnapshot(seq
, snap_seq
+ 1, min_uncommitted
, &released
));
2038 ASSERT_TRUE(released
);
2040 // This makes the snapshot release take effect in the txn db structures
2041 wp_db
->AdvanceMaxEvictedSeq(wp_db
->max_evicted_seq_
,
2042 wp_db
->max_evicted_seq_
+ 1);
2045 // Released snapshot lower than max
2046 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2047 ASSERT_TRUE(released
);
2050 // Invalid snapshot lower than max
2052 wp_db
->IsInSnapshot(seq
, snap_seq
+ 1, min_uncommitted
, &released
));
2053 ASSERT_TRUE(released
);
// Switch to snap2, which has not been released at this point.
2055 snap_seq
= snap2
->GetSequenceNumber();
2058 // Unreleased snapshot lower than max
2059 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2060 ASSERT_FALSE(released
);
2062 db
->ReleaseSnapshot(snap2
);
2065 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
2066 // snapshot, max_committed_seq_, prepared, and commit entries.
// Drives a WritePreparedTxnDBMock directly through AddPrepared / AddCommitted /
// RemovePrepared / TakeSnapshot for every (max_snapshots, max_gap) pair, and
// compares IsInSnapshot(s, snapshot) against membership in committed_before.
// NOTE(review): this view has lines elided (seq increments, several
// declarations, closing braces), so the flow shown here is partial.
2067 TEST_P(WritePreparedTransactionTest
, IsInSnapshot
) {
2069 // Use small commit cache to trigger lots of eviction and fast advance of
2071 const size_t commit_cache_bits
= 3;
2072 // Same for snapshot cache size
2073 const size_t snapshot_cache_bits
= 2;
2075 // Take some preliminary snapshots first. This is to stress the data structure
2076 // that holds the old snapshots as it will be designed to be efficient when
2077 // only a few snapshots are below the max_evicted_seq_.
2078 for (int max_snapshots
= 1; max_snapshots
< 20; max_snapshots
++) {
2079 // Leave some gap between the preliminary snapshots and the final snapshot
2080 // that we check. This should test for also different overlapping scenarios
2081 // between the last snapshot and the commits.
2082 for (int max_gap
= 1; max_gap
< 10; max_gap
++) {
2083 // Since we do not actually write to db, we mock the seq as it would be
2084 // increased by the db. The only exception is that we need db seq to
2085 // advance for our snapshots. for which we apply a dummy put each time we
2086 // increase our mock of seq.
2088 // At each step we prepare a txn and then we commit it in the next txn.
2089 // This emulates the consecutive transactions that write to the same key
2090 uint64_t cur_txn
= 0;
2091 // Number of snapshots taken so far
2092 int num_snapshots
= 0;
2093 // Number of gaps applied so far
2095 // The final snapshot that we will inspect
2096 uint64_t snapshot
= 0;
2097 bool found_committed
= false;
2098 // To stress the data structure that maintain prepared txns, at each cycle
2099 // we add a new prepare txn. These do not mean to be committed for
2100 // snapshot inspection.
2101 std::set
<uint64_t> prepared
;
2102 // We keep the list of txns committed before we take the last snapshot.
2103 // These should be the only seq numbers that will be found in the snapshot
2104 std::set
<uint64_t> committed_before
;
2105 // The set of commit seq numbers to be excluded from IsInSnapshot queries
2106 std::set
<uint64_t> commit_seqs
;
// NOTE(review): mock_db is allocated with a raw `new` and handed to
// WritePreparedTxnDBMock; presumably the mock takes ownership - confirm.
2107 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
2108 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2109 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
2110 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
2111 // We continue until max advances a bit beyond the snapshot.
2112 while (!snapshot
|| wp_db
->max_evicted_seq_
< snapshot
+ 100) {
2113 // do prepare for a transaction
2115 wp_db
->AddPrepared(seq
);
2116 prepared
.insert(seq
);
2118 // If cur_txn is not started, do prepare for it.
2122 wp_db
->AddPrepared(cur_txn
);
2123 } else { // else commit it
2125 wp_db
->AddCommitted(cur_txn
, seq
);
2126 wp_db
->RemovePrepared(cur_txn
);
2127 commit_seqs
.insert(seq
);
2129 committed_before
.insert(cur_txn
);
2134 if (num_snapshots
< max_snapshots
- 1) {
2135 // Take preliminary snapshots
2136 wp_db
->TakeSnapshot(seq
);
2138 } else if (gap_cnt
< max_gap
) {
2139 // Wait for some gap before taking the final snapshot
2141 } else if (!snapshot
) {
2142 // Take the final snapshot if it is not already taken
2144 wp_db
->TakeSnapshot(snapshot
);
2148 // If the snapshot is taken, verify seq numbers visible to it. We redo
2149 // it at each cycle to test that the system is still sound when
2150 // max_evicted_seq_ advances.
// NOTE(review): the `commit_seqs.find(s) == commit_seqs.end()` term ends the
// loop at the first commit seq instead of skipping it, so later sequence
// numbers are not verified in that pass. Confirm that is intended.
2152 for (uint64_t s
= 1;
2153 s
<= seq
&& commit_seqs
.find(s
) == commit_seqs
.end(); s
++) {
2154 bool was_committed
=
2155 (committed_before
.find(s
) != committed_before
.end());
2156 bool is_in_snapshot
= wp_db
->IsInSnapshot(s
, snapshot
);
// Print the full scenario before the assertion fails, to ease debugging.
2157 if (was_committed
!= is_in_snapshot
) {
2158 printf("max_snapshots %d max_gap %d seq %" PRIu64
" max %" PRIu64
2159 " snapshot %" PRIu64
2160 " gap_cnt %d num_snapshots %d s %" PRIu64
"\n",
2161 max_snapshots
, max_gap
, seq
,
2162 wp_db
->max_evicted_seq_
.load(), snapshot
, gap_cnt
,
2165 ASSERT_EQ(was_committed
, is_in_snapshot
);
2166 found_committed
= found_committed
|| is_in_snapshot
;
2170 // Safety check to make sure the test actually ran
2171 ASSERT_TRUE(found_committed
);
2172 // As an extra check, check if prepared set will be properly empty after
2173 // they are committed.
2175 wp_db
->AddCommitted(cur_txn
, seq
);
2176 wp_db
->RemovePrepared(cur_txn
);
2178 for (auto p
: prepared
) {
2179 wp_db
->AddCommitted(p
, seq
);
2180 wp_db
->RemovePrepared(p
);
2182 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
2183 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
// Test helper: reads `key` from the default column family via Get() and then
// via MultiGet(), asserting each time that the status equals exp_s, that the
// status is either OK or NotFound, and that the value read equals exp_v.
// NOTE(review): this view has lines elided (local declarations of s / v /
// s_vec, any guards around the value comparisons, closing braces).
2188 void ASSERT_SAME(ReadOptions roptions
, TransactionDB
* db
, Status exp_s
,
2189 PinnableSlice
& exp_v
, Slice key
) {
2192 s
= db
->Get(roptions
, db
->DefaultColumnFamily(), key
, &v
);
2193 ASSERT_TRUE(exp_s
== s
);
2194 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
2196 ASSERT_TRUE(exp_v
== v
);
2199 // Try with MultiGet API too
2200 std::vector
<std::string
> values
;
2202 db
->MultiGet(roptions
, {db
->DefaultColumnFamily()}, {key
}, &values
);
// Exactly one key was requested, so one value and one status are expected.
2203 ASSERT_EQ(1, values
.size());
2204 ASSERT_EQ(1, s_vec
.size());
2206 ASSERT_TRUE(exp_s
== s
);
2207 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
2209 ASSERT_TRUE(exp_v
== values
[0]);
// Convenience overload: forwards to the ReadOptions variant of ASSERT_SAME
// using a default-constructed ReadOptions().
// NOTE(review): the remainder of the parameter list is elided in this view.
2213 void ASSERT_SAME(TransactionDB
* db
, Status exp_s
, PinnableSlice
& exp_v
,
2215 ASSERT_SAME(ReadOptions(), db
, exp_s
, exp_v
, key
);
// For every (ikey, ivalue, crash) combination: seeds key<ikey> with an initial
// operation, records the current values of key1..key4, prepares a txn that
// writes all four keys, and checks via ASSERT_SAME that the recorded values are
// unchanged before the rollback, after an optional crash/recovery, and after
// Rollback(). Also checks the prepared_txns_ / delayed_prepared_ bookkeeping.
// NOTE(review): this view has lines elided (the construct selecting the
// initial operation, declarations of s1..s4 / v1..v4, Prepare, reopen calls,
// closing braces), so the flow shown here is partial.
2218 TEST_P(WritePreparedTransactionTest
, Rollback
) {
2219 ReadOptions roptions
;
2220 WriteOptions woptions
;
2221 TransactionOptions txn_options
;
2222 const size_t num_keys
= 4;
2223 const size_t num_values
= 5;
2224 for (size_t ikey
= 1; ikey
<= num_keys
; ikey
++) {
2225 for (size_t ivalue
= 0; ivalue
< num_values
; ivalue
++) {
2226 for (bool crash
: {false, true}) {
2228 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
2229 std::string key_str
= "key" + ToString(ikey
);
// Initial operations applied to key_str (the selection logic is elided here).
2234 ASSERT_OK(db
->Put(woptions
, key_str
, "initvalue1"));
2237 ASSERT_OK(db
->Merge(woptions
, key_str
, "initvalue2"));
2240 ASSERT_OK(db
->Delete(woptions
, key_str
));
2243 ASSERT_OK(db
->SingleDelete(woptions
, key_str
));
// Record the pre-transaction state of key1..key4.
2251 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key1"), &v1
);
2254 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key2"), &v2
);
2257 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key3"), &v3
);
2260 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key4"), &v4
);
// The txn touches each key with a different operation type.
2261 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
2262 auto s
= txn
->SetName("xid0");
2264 s
= txn
->Put(Slice("key1"), Slice("value1"));
2266 s
= txn
->Merge(Slice("key2"), Slice("value2"));
2268 s
= txn
->Delete(Slice("key3"));
2270 s
= txn
->SingleDelete(Slice("key4"));
2276 ReadLock
rl(&wp_db
->prepared_mutex_
);
2277 ASSERT_FALSE(wp_db
->prepared_txns_
.empty());
2278 ASSERT_EQ(txn
->GetId(), wp_db
->prepared_txns_
.top());
// The txn's writes must not be visible yet.
2281 ASSERT_SAME(db
, s1
, v1
, "key1");
2282 ASSERT_SAME(db
, s2
, v2
, "key2");
2283 ASSERT_SAME(db
, s3
, v3
, "key3");
2284 ASSERT_SAME(db
, s4
, v4
, "key4");
// Crash path: sync the WAL, crash, then look the txn up again by name.
2288 auto db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2289 db_impl
->FlushWAL(true);
2290 dynamic_cast<WritePreparedTxnDB
*>(db
)->TEST_Crash();
2292 assert(db
!= nullptr);
2293 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
2294 txn
= db
->GetTransactionByName("xid0");
2295 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
2296 ReadLock
rl(&wp_db
->prepared_mutex_
);
2297 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
2298 ASSERT_FALSE(wp_db
->delayed_prepared_
.empty());
2299 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(txn
->GetId()) !=
2300 wp_db
->delayed_prepared_
.end());
2303 ASSERT_SAME(db
, s1
, v1
, "key1");
2304 ASSERT_SAME(db
, s2
, v2
, "key2");
2305 ASSERT_SAME(db
, s3
, v3
, "key3");
2306 ASSERT_SAME(db
, s4
, v4
, "key4");
2308 s
= txn
->Rollback();
// After the rollback no prepared entries may remain in either structure.
2312 ASSERT_TRUE(wp_db
->delayed_prepared_empty_
);
2313 ReadLock
rl(&wp_db
->prepared_mutex_
);
2314 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
2315 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
// The rolled-back writes must leave the original values intact.
2318 ASSERT_SAME(db
, s1
, v1
, "key1");
2319 ASSERT_SAME(db
, s2
, v2
, "key2");
2320 ASSERT_SAME(db
, s3
, v3
, "key3");
2321 ASSERT_SAME(db
, s4
, v4
, "key4");
// Writes 1024 versions of key "foo", verifies all internal key versions
// (newest first), flushes the WAL, shrinks write_buffer_size, and verifies the
// same internal key versions again.
// NOTE(review): this view has lines elided (the declaration of `seq`, reopen
// calls, closing braces), so the flow shown here is partial.
2328 TEST_P(WritePreparedTransactionTest
, DisableGCDuringRecovery
) {
2329 // Use large buffer to avoid memtable flush after 1024 insertions
2330 options
.write_buffer_size
= 1024 * 1024;
2332 std::vector
<KeyVersion
> versions
;
2334 for (uint64_t i
= 1; i
<= 1024; i
++) {
2335 std::string v
= "bar" + ToString(i
);
2336 ASSERT_OK(db
->Put(WriteOptions(), "foo", v
));
2337 VerifyKeys({{"foo", v
}});
2338 seq
++; // one for the key/value
2339 KeyVersion kv
= {"foo", v
, seq
, kTypeValue
};
2340 if (options
.two_write_queues
) {
2341 seq
++; // one for the commit
2343 versions
.emplace_back(kv
);
// Versions were collected oldest-first; reverse so the newest comes first.
2345 std::reverse(std::begin(versions
), std::end(versions
));
2346 VerifyInternalKeys(versions
);
2347 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2348 db_impl
->FlushWAL(true);
2349 // Use small buffer to ensure memtable flush during recovery
2350 options
.write_buffer_size
= 1024;
2352 VerifyInternalKeys(versions
);
// Writes one key, takes a snapshot, flushes and compacts the full range, then
// checks that the key is still readable (with and without the snapshot) and
// that its internal key is reported with sequence number 0.
2355 TEST_P(WritePreparedTransactionTest
, SequenceNumberZero
) {
2356 ASSERT_OK(db
->Put(WriteOptions(), "foo", "bar"));
2357 VerifyKeys({{"foo", "bar"}});
2358 const Snapshot
* snapshot
= db
->GetSnapshot();
2359 ASSERT_OK(db
->Flush(FlushOptions()));
2360 // Dummy keys to prevent compaction from trivially moving files and bypassing
2361 // the actual compaction logic.
2362 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2363 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2364 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2365 // Compaction will output keys with sequence number 0 if they are visible to
2366 // the earliest snapshot. Make sure IsInSnapshot() reports sequence number 0
2367 // as visible to any snapshot.
2368 VerifyKeys({{"foo", "bar"}});
2369 VerifyKeys({{"foo", "bar"}}, snapshot
);
2370 VerifyInternalKeys({{"foo", "bar", 0, kTypeValue
}});
2371 db
->ReleaseSnapshot(snapshot
);
2374 // Compaction should not remove a key if it is not committed, and should
2375 // proceed with older versions of the key as-if the new version doesn't exist.
// Seeds key1..key7 with committed writes (tracking expected_seq), prepares a
// txn that touches every key without committing, compacts, and then verifies
// both the visible values and the internal keys. Finally commits the txn and
// lists the expected post-commit values.
// NOTE(review): this view has lines elided (the body of add_key that invokes
// func, the VerifyKeys( openers, closing braces), so some initializer lists
// below appear without the call they belong to.
2376 TEST_P(WritePreparedTransactionTest
, CompactionShouldKeepUncommittedKeys
) {
2377 options
.disable_auto_compactions
= true;
2379 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2380 // Snapshots to keep keys from being evicted.
2381 std::vector
<const Snapshot
*> snapshots
;
2382 // Keep track of expected sequence number.
2383 SequenceNumber expected_seq
= 0;
// add_key: bumps expected_seq, checks it against the DB's last visible
// sequence, and takes a snapshot after each write.
2385 auto add_key
= [&](std::function
<Status()> func
) {
2388 if (options
.two_write_queues
) {
2389 expected_seq
++; // 1 for commit
2391 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2392 snapshots
.push_back(db
->GetSnapshot());
2395 // Each key here represents a standalone test case.
2396 add_key([&]() { return db
->Put(WriteOptions(), "key1", "value1_1"); });
2397 add_key([&]() { return db
->Put(WriteOptions(), "key2", "value2_1"); });
2398 add_key([&]() { return db
->Put(WriteOptions(), "key3", "value3_1"); });
2399 add_key([&]() { return db
->Put(WriteOptions(), "key4", "value4_1"); });
2400 add_key([&]() { return db
->Merge(WriteOptions(), "key5", "value5_1"); });
2401 add_key([&]() { return db
->Merge(WriteOptions(), "key5", "value5_2"); });
2402 add_key([&]() { return db
->Put(WriteOptions(), "key6", "value6_1"); });
2403 add_key([&]() { return db
->Put(WriteOptions(), "key7", "value7_1"); });
2404 ASSERT_OK(db
->Flush(FlushOptions()));
2405 add_key([&]() { return db
->Delete(WriteOptions(), "key6"); });
2406 add_key([&]() { return db
->SingleDelete(WriteOptions(), "key7"); });
2408 auto* transaction
= db
->BeginTransaction(WriteOptions());
2409 ASSERT_OK(transaction
->SetName("txn"));
2410 ASSERT_OK(transaction
->Put("key1", "value1_2"));
2411 ASSERT_OK(transaction
->Delete("key2"));
2412 ASSERT_OK(transaction
->SingleDelete("key3"));
2413 ASSERT_OK(transaction
->Merge("key4", "value4_2"));
2414 ASSERT_OK(transaction
->Merge("key5", "value5_3"));
2415 ASSERT_OK(transaction
->Put("key6", "value6_2"));
2416 ASSERT_OK(transaction
->Put("key7", "value7_2"));
2417 // Prepare but not commit.
2418 ASSERT_OK(transaction
->Prepare());
2419 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
2420 ASSERT_OK(db
->Flush(FlushOptions()));
// Release every snapshot taken by add_key before compacting.
2421 for (auto* s
: snapshots
) {
2422 db
->ReleaseSnapshot(s
);
2424 // Dummy keys to prevent compaction from trivially moving files and bypassing
2425 // the actual compaction logic.
2426 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2427 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2428 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Expected visible values while the txn is only prepared.
2430 {"key1", "value1_1"},
2431 {"key2", "value2_1"},
2432 {"key3", "value3_1"},
2433 {"key4", "value4_1"},
2434 {"key5", "value5_1,value5_2"},
2435 {"key6", "NOT_FOUND"},
2436 {"key7", "NOT_FOUND"},
// Expected internal keys: the prepared entries at expected_seq are listed
// together with the older committed versions (sequence number 0).
2438 VerifyInternalKeys({
2439 {"key1", "value1_2", expected_seq
, kTypeValue
},
2440 {"key1", "value1_1", 0, kTypeValue
},
2441 {"key2", "", expected_seq
, kTypeDeletion
},
2442 {"key2", "value2_1", 0, kTypeValue
},
2443 {"key3", "", expected_seq
, kTypeSingleDeletion
},
2444 {"key3", "value3_1", 0, kTypeValue
},
2445 {"key4", "value4_2", expected_seq
, kTypeMerge
},
2446 {"key4", "value4_1", 0, kTypeValue
},
2447 {"key5", "value5_3", expected_seq
, kTypeMerge
},
2448 {"key5", "value5_1,value5_2", 0, kTypeValue
},
2449 {"key6", "value6_2", expected_seq
, kTypeValue
},
2450 {"key7", "value7_2", expected_seq
, kTypeValue
},
2452 ASSERT_OK(transaction
->Commit());
// Expected visible values once the txn is committed.
2454 {"key1", "value1_2"},
2455 {"key2", "NOT_FOUND"},
2456 {"key3", "NOT_FOUND"},
2457 {"key4", "value4_1,value4_2"},
2458 {"key5", "value5_1,value5_2,value5_3"},
2459 {"key6", "value6_2"},
2460 {"key7", "value7_2"},
2465 // Compaction should keep keys visible to a snapshot based on commit sequence,
2466 // not just prepare sequence.
// txn1 commits before snapshot2 is taken; txn2 is prepared before snapshot2 but
// commits after it. After overwriting both keys and compacting, snapshot2 must
// still see txn1's value for key1 and nothing for key2.
// NOTE(review): this view has lines elided (closing braces and possibly other
// short statements), so the flow shown here is partial.
2467 TEST_P(WritePreparedTransactionTest
, CompactionShouldKeepSnapshotVisibleKeys
) {
2468 options
.disable_auto_compactions
= true;
2470 // Keep track of expected sequence number.
2471 SequenceNumber expected_seq
= 0;
2472 auto* txn1
= db
->BeginTransaction(WriteOptions());
2473 ASSERT_OK(txn1
->SetName("txn1"));
2474 ASSERT_OK(txn1
->Put("key1", "value1_1"));
2475 ASSERT_OK(txn1
->Prepare());
2476 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
2477 ASSERT_OK(txn1
->Commit());
2478 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2479 ASSERT_EQ(++expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2481 // Take a snapshot to keep keys from being evicted before compaction.
2482 const Snapshot
* snapshot1
= db
->GetSnapshot();
2483 auto* txn2
= db
->BeginTransaction(WriteOptions());
2484 ASSERT_OK(txn2
->SetName("txn2"));
2485 ASSERT_OK(txn2
->Put("key2", "value2_1"));
2486 ASSERT_OK(txn2
->Prepare());
2487 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
2488 // txn1 commits before snapshot2, so it is visible to snapshot2.
2489 // txn2 commits after snapshot2, so it is not visible.
2490 const Snapshot
* snapshot2
= db
->GetSnapshot();
2491 ASSERT_OK(txn2
->Commit());
2492 ASSERT_EQ(++expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2494 // Take a snapshot to keep keys from being evicted before compaction.
2495 const Snapshot
* snapshot3
= db
->GetSnapshot();
// Overwrite both keys; seq1 / seq2 remember the sequence number of each write.
2496 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1_2"));
2497 expected_seq
++; // 1 for write
2498 SequenceNumber seq1
= expected_seq
;
2499 if (options
.two_write_queues
) {
2500 expected_seq
++; // 1 for commit
2502 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2503 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2_2"));
2504 expected_seq
++; // 1 for write
2505 SequenceNumber seq2
= expected_seq
;
2506 if (options
.two_write_queues
) {
2507 expected_seq
++; // 1 for commit
2509 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2510 ASSERT_OK(db
->Flush(FlushOptions()));
// Only snapshot2 stays alive across the compaction.
2511 db
->ReleaseSnapshot(snapshot1
);
2512 db
->ReleaseSnapshot(snapshot3
);
2513 // Dummy keys to prevent compaction from trivially moving files and bypassing
2514 // the actual compaction logic.
2515 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2516 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2517 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2518 VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
2519 VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2
);
2520 VerifyInternalKeys({
2521 {"key1", "value1_2", seq1
, kTypeValue
},
2522 // "value1_1" is visible to snapshot2. Also keys at bottom level visible
2523 // to earliest snapshot will output with seq = 0.
2524 {"key1", "value1_1", 0, kTypeValue
},
2525 {"key2", "value2_2", seq2
, kTypeValue
},
2527 db
->ReleaseSnapshot(snapshot2
);
// Checks the min_uncommitted_ value recorded in snapshots taken while a
// prepared txn (a Delete of key1) is outstanding, with the commit cache
// disabled, both with and without a second, more recent prepared txn. Also
// verifies that both snapshots still read the pre-delete value of key1.
// NOTE(review): this view has lines elided (reopen calls, the left-hand sides
// of the BeginTransaction assignments, deletes, closing braces).
2530 TEST_P(WritePreparedTransactionTest
, SmallestUncommittedOptimization
) {
2531 const size_t snapshot_cache_bits
= 7; // same as default
2532 const size_t commit_cache_bits
= 0; // disable commit cache
2533 for (bool has_recent_prepare
: {true, false}) {
2534 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2537 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2539 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2540 ASSERT_OK(transaction
->SetName("txn"));
2541 ASSERT_OK(transaction
->Delete("key1"));
2542 ASSERT_OK(transaction
->Prepare());
2543 // snapshot1 should get min_uncommitted from prepared_txns_ heap.
2544 auto snapshot1
= db
->GetSnapshot();
2545 ASSERT_EQ(transaction
->GetId(),
2546 ((SnapshotImpl
*)snapshot1
)->min_uncommitted_
);
2547 // Add a commit to advance max_evicted_seq and move the prepared transaction
2548 // into delayed_prepared_ set.
2549 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2550 Transaction
* txn2
= nullptr;
2551 if (has_recent_prepare
) {
2553 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2554 ASSERT_OK(txn2
->SetName("txn2"));
2555 ASSERT_OK(txn2
->Put("key3", "value3"));
2556 ASSERT_OK(txn2
->Prepare());
2558 // snapshot2 should get min_uncommitted from delayed_prepared_ set.
2559 auto snapshot2
= db
->GetSnapshot();
// NOTE(review): this assertion inspects snapshot1 again, although the comment
// above describes snapshot2. It looks like a copy/paste slip that leaves
// snapshot2's min_uncommitted_ unchecked - confirm and switch to snapshot2.
2560 ASSERT_EQ(transaction
->GetId(),
2561 ((SnapshotImpl
*)snapshot1
)->min_uncommitted_
);
2562 ASSERT_OK(transaction
->Commit());
2564 if (has_recent_prepare
) {
2565 ASSERT_OK(txn2
->Commit());
// The delete is committed now, but both snapshots predate the commit.
2568 VerifyKeys({{"key1", "NOT_FOUND"}});
2569 VerifyKeys({{"key1", "value1"}}, snapshot1
);
2570 VerifyKeys({{"key1", "value1"}}, snapshot2
);
2571 db
->ReleaseSnapshot(snapshot1
);
2572 db
->ReleaseSnapshot(snapshot2
);
2576 // Insert two values, v1 and v2, for a key. Between prepare and commit of v2
2577 // take two snapshots, s1 and s2. Release s1 during compaction.
2578 // Test to make sure compaction doesn't get confused and think s1 can see both
2579 // values, and thus compact out the older value by mistake.
// The release of snapshot1 is injected through a sync point callback
// registered on "CompactionIterator:AfterInit".
// NOTE(review): this view has lines elided (reopen calls, the left-hand side of
// the BeginTransaction assignment, the tail of the SetCallBack call, closing
// braces), so the flow shown here is partial.
2580 TEST_P(WritePreparedTransactionTest
, ReleaseSnapshotDuringCompaction
) {
2581 const size_t snapshot_cache_bits
= 7; // same as default
2582 const size_t commit_cache_bits
= 0; // minimum commit cache
2583 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2586 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1_1"));
2588 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2589 ASSERT_OK(transaction
->SetName("txn"));
2590 ASSERT_OK(transaction
->Put("key1", "value1_2"));
2591 ASSERT_OK(transaction
->Prepare());
2592 auto snapshot1
= db
->GetSnapshot();
2593 // Increment sequence number.
2594 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2595 auto snapshot2
= db
->GetSnapshot();
2596 ASSERT_OK(transaction
->Commit());
// Both snapshots were taken before the commit, so they must see value1_1.
2598 VerifyKeys({{"key1", "value1_2"}});
2599 VerifyKeys({{"key1", "value1_1"}}, snapshot1
);
2600 VerifyKeys({{"key1", "value1_1"}}, snapshot2
);
2601 // Add a flush to keep compaction from falling back to a trivial move.
2603 auto callback
= [&](void*) {
2604 // Release snapshot1 after CompactionIterator init.
2605 // CompactionIterator needs to figure out that the earliest snapshot
2606 // that can see key1:value1_2 is kMaxSequenceNumber, not
2607 // snapshot1 or snapshot2.
2608 db
->ReleaseSnapshot(snapshot1
);
2609 // Add some keys to advance max_evicted_seq.
2610 ASSERT_OK(db
->Put(WriteOptions(), "key3", "value3"));
2611 ASSERT_OK(db
->Put(WriteOptions(), "key4", "value4"));
2613 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2615 SyncPoint::GetInstance()->EnableProcessing();
2617 ASSERT_OK(db
->Flush(FlushOptions()));
// snapshot2 must still see the older value after snapshot1 was released.
2618 VerifyKeys({{"key1", "value1_2"}});
2619 VerifyKeys({{"key1", "value1_1"}}, snapshot2
);
2620 db
->ReleaseSnapshot(snapshot2
);
2621 SyncPoint::GetInstance()->ClearAllCallBacks();
2624 // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
2625 // after committing v2. Release s1 during compaction, right after compaction
2626 // processes v2 and before processes v1. Test to make sure compaction doesn't
2627 // get confused and believe v1 and v2 are visible to different snapshot
2628 // (v1 by s2, v2 by s1) and refuse to compact out v1.
2629 TEST_P(WritePreparedTransactionTest
, ReleaseSnapshotDuringCompaction2
) {
2630 const size_t snapshot_cache_bits
= 7; // same as default
2631 const size_t commit_cache_bits
= 0; // minimum commit cache
2632 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2635 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2636 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value2"));
2637 SequenceNumber v2_seq
= db
->GetLatestSequenceNumber();
2638 auto* s1
= db
->GetSnapshot();
2639 // Advance sequence number.
2640 ASSERT_OK(db
->Put(WriteOptions(), "key2", "dummy"));
2641 auto* s2
= db
->GetSnapshot();
2643 int count_value
= 0;
2644 auto callback
= [&](void* arg
) {
2645 auto* ikey
= reinterpret_cast<ParsedInternalKey
*>(arg
);
2646 if (ikey
->user_key
== "key1") {
2648 if (count_value
== 2) {
2650 db
->ReleaseSnapshot(s1
);
2651 // Add some keys to advance max_evicted_seq and update
2653 ASSERT_OK(db
->Put(WriteOptions(), "key3", "dummy"));
2654 ASSERT_OK(db
->Put(WriteOptions(), "key4", "dummy"));
2658 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
2660 SyncPoint::GetInstance()->EnableProcessing();
2662 ASSERT_OK(db
->Flush(FlushOptions()));
2663 // value1 should be compact out.
2664 VerifyInternalKeys({{"key1", "value2", v2_seq
, kTypeValue
}});
2667 db
->ReleaseSnapshot(s2
);
2668 SyncPoint::GetInstance()->ClearAllCallBacks();
2671 // Insert two values, v1 and v2, for a key. Insert another dummy key
2672 // so to evict the commit cache for v2, while v1 is still in commit cache.
2673 // Take two snapshots, s1 and s2. Release s1 during compaction.
2674 // Since commit cache for v2 is evicted, and old_commit_map don't have
2675 // s1 (it is released),
2676 // TODO(myabandeh): how can we be sure that the v2's commit info is evicted
2677 // (and not v1's)? Instead of putting a dummy, we can directly call
2678 // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
2679 TEST_P(WritePreparedTransactionTest
, ReleaseSnapshotDuringCompaction3
) {
2680 const size_t snapshot_cache_bits
= 7; // same as default
2681 const size_t commit_cache_bits
= 1; // commit cache size = 2
2682 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2685 // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
2686 // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
2687 auto add_dummy
= [&]() {
2689 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2690 ASSERT_OK(txn_dummy
->SetName("txn_dummy"));
2691 ASSERT_OK(txn_dummy
->Put("dummy", "dummy"));
2692 ASSERT_OK(txn_dummy
->Prepare());
2693 ASSERT_OK(txn_dummy
->Commit());
2697 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2699 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2700 ASSERT_OK(txn
->SetName("txn"));
2701 ASSERT_OK(txn
->Put("key1", "value2"));
2702 ASSERT_OK(txn
->Prepare());
2703 // TODO(myabandeh): replace it with GetId()?
2704 auto v2_seq
= db
->GetLatestSequenceNumber();
2705 ASSERT_OK(txn
->Commit());
2707 auto* s1
= db
->GetSnapshot();
2708 // Dummy key to advance sequence number.
2710 auto* s2
= db
->GetSnapshot();
2712 auto callback
= [&](void*) {
2713 db
->ReleaseSnapshot(s1
);
2714 // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
2718 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2720 SyncPoint::GetInstance()->EnableProcessing();
2722 ASSERT_OK(db
->Flush(FlushOptions()));
2723 // value1 should be compact out.
2724 VerifyInternalKeys({{"key1", "value2", v2_seq
, kTypeValue
}});
2726 db
->ReleaseSnapshot(s2
);
2727 SyncPoint::GetInstance()->ClearAllCallBacks();
2730 TEST_P(WritePreparedTransactionTest
, ReleaseEarliestSnapshotDuringCompaction
) {
2731 const size_t snapshot_cache_bits
= 7; // same as default
2732 const size_t commit_cache_bits
= 0; // minimum commit cache
2733 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2736 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2738 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2739 ASSERT_OK(transaction
->SetName("txn"));
2740 ASSERT_OK(transaction
->Delete("key1"));
2741 ASSERT_OK(transaction
->Prepare());
2742 SequenceNumber del_seq
= db
->GetLatestSequenceNumber();
2743 auto snapshot1
= db
->GetSnapshot();
2744 // Increment sequence number.
2745 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2746 auto snapshot2
= db
->GetSnapshot();
2747 ASSERT_OK(transaction
->Commit());
2749 VerifyKeys({{"key1", "NOT_FOUND"}});
2750 VerifyKeys({{"key1", "value1"}}, snapshot1
);
2751 VerifyKeys({{"key1", "value1"}}, snapshot2
);
2752 ASSERT_OK(db
->Flush(FlushOptions()));
2754 auto callback
= [&](void* compaction
) {
2755 // Release snapshot1 after CompactionIterator init.
2756 // CompactionIterator need to double check and find out snapshot2 is now
2757 // the earliest existing snapshot.
2758 if (compaction
!= nullptr) {
2759 db
->ReleaseSnapshot(snapshot1
);
2760 // Add some keys to advance max_evicted_seq.
2761 ASSERT_OK(db
->Put(WriteOptions(), "key3", "value3"));
2762 ASSERT_OK(db
->Put(WriteOptions(), "key4", "value4"));
2765 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2767 SyncPoint::GetInstance()->EnableProcessing();
2769 // Dummy keys to avoid compaction trivially move files and get around actual
2770 // compaction logic.
2771 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2772 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2773 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2774 // Only verify for key1. Both the put and delete for the key should be kept.
2775 // Since the delete tombstone is not visible to snapshot2, we need to keep
2776 // at least one version of the key, for write-conflict check.
2777 VerifyInternalKeys({{"key1", "", del_seq
, kTypeDeletion
},
2778 {"key1", "value1", 0, kTypeValue
}});
2779 db
->ReleaseSnapshot(snapshot2
);
2780 SyncPoint::GetInstance()->ClearAllCallBacks();
2783 // A more complex test to verify compaction/flush should keep keys visible
// to snapshots. Prepares, commits and snapshot captures are interleaved at
// random; every snapshot must keep reading the data recorded for it, both
// after a flush and after a full-range compaction.
2785 TEST_P(WritePreparedTransactionTest
,
2786 CompactionKeepSnapshotVisibleKeysRandomized
) {
2787 constexpr size_t kNumTransactions
= 10;
2788 constexpr size_t kNumIterations
= 1000;
// transactions[i]: the prepared but not yet committed transaction writing
// "key<i>", or nullptr when there is none.
2790 std::vector
<Transaction
*> transactions(kNumTransactions
, nullptr);
// versions[i]: version number of the last committed value of "key<i>".
2791 std::vector
<size_t> versions(kNumTransactions
, 0);
// Latest committed value of every key.
2792 std::unordered_map
<std::string
, std::string
> current_data
;
// Parallel vectors: snapshot_data[i] is the content expected through
// snapshots[i].
2793 std::vector
<const Snapshot
*> snapshots
;
2794 std::vector
<std::unordered_map
<std::string
, std::string
>> snapshot_data
;
2797 options
.disable_auto_compactions
= true;
// Seed every key with "value0".
2800 for (size_t i
= 0; i
< kNumTransactions
; i
++) {
2801 std::string key
= "key" + ToString(i
);
2802 std::string value
= "value0";
2803 ASSERT_OK(db
->Put(WriteOptions(), key
, value
));
2804 current_data
[key
] = value
;
2806 VerifyKeys(current_data
);
2808 for (size_t iter
= 0; iter
< kNumIterations
; iter
++) {
// r is in [0, kNumTransactions]. NOTE(review): the declaration and seed of
// rnd are not among the lines shown here.
2809 auto r
= rnd
.Next() % (kNumTransactions
+ 1);
2810 if (r
< kNumTransactions
) {
2811 std::string key
= "key" + ToString(r
);
2812 if (transactions
[r
] == nullptr) {
// No pending transaction for this key: prepare the next version.
2813 std::string value
= "value" + ToString(versions
[r
] + 1);
2814 auto* txn
= db
->BeginTransaction(WriteOptions());
2815 ASSERT_OK(txn
->SetName("txn" + ToString(r
)));
2816 ASSERT_OK(txn
->Put(key
, value
));
2817 ASSERT_OK(txn
->Prepare());
2818 transactions
[r
] = txn
;
// Commit the pending transaction and record its value as the current one.
// NOTE(review): presumably the else-branch of the nullptr check above.
2820 std::string value
= "value" + ToString(++versions
[r
]);
2821 ASSERT_OK(transactions
[r
]->Commit());
2822 delete transactions
[r
];
2823 transactions
[r
] = nullptr;
2824 current_data
[key
] = value
;
// Take a snapshot and remember the data it must keep seeing.
// NOTE(review): presumably the r == kNumTransactions branch.
2827 auto* snapshot
= db
->GetSnapshot();
2828 VerifyKeys(current_data
, snapshot
);
2829 snapshots
.push_back(snapshot
);
2830 snapshot_data
.push_back(current_data
);
2832 VerifyKeys(current_data
);
2834 // Take a last snapshot to test compaction with uncommitted prepared
// transactions (any still pending are only committed further below).
2836 snapshots
.push_back(db
->GetSnapshot());
2837 snapshot_data
.push_back(current_data
);
2839 assert(snapshots
.size() == snapshot_data
.size());
// Check every snapshot before the flush, ...
2840 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
2841 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
// ... after the flush, ...
2843 ASSERT_OK(db
->Flush(FlushOptions()));
2844 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
2845 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
2847 // Dummy keys to avoid compaction trivially move files and get around actual
2848 // compaction logic.
2849 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2850 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2851 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// ... and after the full-range compaction.
2852 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
2853 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
// Cleanup: commit and free the transactions that are still pending.
2856 for (size_t i
= 0; i
< kNumTransactions
; i
++) {
2857 if (transactions
[i
] == nullptr) {
2860 ASSERT_OK(transactions
[i
]->Commit());
2861 delete transactions
[i
];
// Cleanup: release every snapshot taken above.
2863 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
2864 db
->ReleaseSnapshot(snapshots
[i
]);
2868 // Compaction should not apply the optimization to output key with sequence
2869 // number equal to 0 if the key is not visible to earliest snapshot, based on
2870 // commit sequence number.
// The test prepares (but does not commit) a write to key1, commits a write to
// key2, then flushes and compacts. It expects key1 to keep its prepare
// sequence number and key2 to be written out with sequence number 0.
2871 TEST_P(WritePreparedTransactionTest
,
2872 CompactionShouldKeepSequenceForUncommittedKeys
) {
2873 options
.disable_auto_compactions
= true;
2875 // Keep track of expected sequence number.
2876 SequenceNumber expected_seq
= 0;
2877 auto* transaction
= db
->BeginTransaction(WriteOptions());
2878 ASSERT_OK(transaction
->SetName("txn"));
2879 ASSERT_OK(transaction
->Put("key1", "value1"));
2880 ASSERT_OK(transaction
->Prepare());
// The prepare is expected to consume exactly one sequence number.
2881 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
// seq1: sequence number of the prepared, still uncommitted write to key1.
2882 SequenceNumber seq1
= expected_seq
;
2883 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2884 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2885 expected_seq
++; // one for data
2886 if (options
.two_write_queues
) {
2887 expected_seq
++; // one for commit
2889 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2890 ASSERT_OK(db
->Flush(FlushOptions()));
2891 // Dummy keys to avoid compaction trivially move files and get around actual
2892 // compaction logic.
2893 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2894 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2895 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// key1 is still uncommitted, so a plain read must not find it.
// NOTE(review): only this entry of the expected-keys list is visible here.
2897 {"key1", "NOT_FOUND"},
2900 VerifyInternalKeys({
2901 // "key1" has not been committed. It keeps its sequence number.
2902 {"key1", "value1", seq1
, kTypeValue
},
2903 // "key2" is committed and output with seq = 0.
2904 {"key2", "value2", 0, kTypeValue
},
// Commit only after the checks above, which need key1 to be uncommitted.
2906 ASSERT_OK(transaction
->Commit());
2914 TEST_P(WritePreparedTransactionTest
, CommitAndSnapshotDuringCompaction
) {
2915 options
.disable_auto_compactions
= true;
2918 const Snapshot
* snapshot
= nullptr;
2919 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2920 auto* txn
= db
->BeginTransaction(WriteOptions());
2921 ASSERT_OK(txn
->SetName("txn"));
2922 ASSERT_OK(txn
->Put("key1", "value2"));
2923 ASSERT_OK(txn
->Prepare());
2925 auto callback
= [&](void*) {
2926 // Snapshot is taken after compaction start. It should be taken into
2927 // consideration for whether to compact out value1.
2928 snapshot
= db
->GetSnapshot();
2929 ASSERT_OK(txn
->Commit());
2932 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2934 SyncPoint::GetInstance()->EnableProcessing();
2935 ASSERT_OK(db
->Flush(FlushOptions()));
2936 ASSERT_NE(nullptr, snapshot
);
2937 VerifyKeys({{"key1", "value2"}});
2938 VerifyKeys({{"key1", "value1"}}, snapshot
);
2939 db
->ReleaseSnapshot(snapshot
);
// Checks that iterators created from the DB and from a concurrent transaction
// return the same values for "foo", "a" and "z" while a write to "foo" is
// prepared and after it is committed.
2942 TEST_P(WritePreparedTransactionTest
, Iterate
) {
// Asserts that iter is valid, has an OK status and is positioned on the
// given key/value pair.
2943 auto verify_state
= [](Iterator
* iter
, const std::string
& key
,
2944 const std::string
& value
) {
2945 ASSERT_TRUE(iter
->Valid());
2946 ASSERT_OK(iter
->status());
2947 ASSERT_EQ(key
, iter
->key().ToString());
2948 ASSERT_EQ(value
, iter
->value().ToString());
// Runs the same sequence of position checks on a DB iterator (i == 0) and on
// a transaction iterator (i == 1); "foo" must map to expected_val in both.
2951 auto verify_iter
= [&](const std::string
& expected_val
) {
2952 // Get iterator from a concurrent transaction and make sure it has the
2953 // same view as an iterator from the DB.
2954 auto* txn
= db
->BeginTransaction(WriteOptions());
2956 for (int i
= 0; i
< 2; i
++) {
2957 Iterator
* iter
= (i
== 0)
2958 ? db
->NewIterator(ReadOptions())
2959 : txn
->GetIterator(ReadOptions());
// NOTE(review): the positioning calls that precede several of the checks
// below are not visible here; only the two SeekForPrev calls are.
2962 verify_state(iter
, "foo", expected_val
);
2965 verify_state(iter
, "a", "va");
2967 verify_state(iter
, "foo", expected_val
);
2969 iter
->SeekForPrev("y");
2970 verify_state(iter
, "foo", expected_val
);
2972 iter
->SeekForPrev("z");
2973 verify_state(iter
, "z", "vz");
2975 verify_state(iter
, "foo", expected_val
);
// Commit foo=v1, then prepare (but do not commit yet) foo=v2.
2981 ASSERT_OK(db
->Put(WriteOptions(), "foo", "v1"));
2982 auto* transaction
= db
->BeginTransaction(WriteOptions());
2983 ASSERT_OK(transaction
->SetName("txn"));
2984 ASSERT_OK(transaction
->Put("foo", "v2"));
2985 ASSERT_OK(transaction
->Prepare());
// The prepared value must not be readable yet.
2986 VerifyKeys({{"foo", "v1"}});
// Neighbouring keys that verify_iter expects on both sides of "foo".
2988 ASSERT_OK(db
->Put(WriteOptions(), "a", "va"));
2989 ASSERT_OK(db
->Put(WriteOptions(), "z", "vz"));
// After the commit the new value becomes readable.
2991 ASSERT_OK(transaction
->Commit());
2992 VerifyKeys({{"foo", "v2"}});
2997 TEST_P(WritePreparedTransactionTest
, IteratorRefreshNotSupported
) {
2998 Iterator
* iter
= db
->NewIterator(ReadOptions());
2999 ASSERT_TRUE(iter
->Refresh().IsNotSupported());
3003 // Committing an delayed prepared has two non-atomic steps: update commit cache,
3004 // remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
3005 // non-atomic steps of checking these two data structures. This test breaks each
3006 // in the middle to ensure correctness in spite of non-atomic execution.
3007 // Note: This test is limitted to the case where snapshot is larger than the
3008 // max_evicted_seq_.
3009 TEST_P(WritePreparedTransactionTest
, NonAtomicCommitOfDelayedPrepared
) {
3010 const size_t snapshot_cache_bits
= 7; // same as default
3011 const size_t commit_cache_bits
= 3; // 8 entries
3012 for (auto split_read
: {true, false}) {
3013 std::vector
<bool> split_options
= {false};
3015 // Also test for break before mutex
3016 split_options
.push_back(true);
3018 for (auto split_before_mutex
: split_options
) {
3019 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3021 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3022 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
3023 // Fill up the commit cache
3024 std::string
init_value("value1");
3025 for (int i
= 0; i
< 10; i
++) {
3026 db
->Put(WriteOptions(), Slice("key1"), Slice(init_value
));
3028 // Prepare a transaction but do not commit it
3030 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3031 ASSERT_OK(txn
->SetName("xid"));
3032 ASSERT_OK(txn
->Put(Slice("key1"), Slice("value2")));
3033 ASSERT_OK(txn
->Prepare());
3034 // Commit a bunch of entries to advance max evicted seq and make the
3035 // prepared a delayed prepared
3036 for (int i
= 0; i
< 10; i
++) {
3037 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3039 // The snapshot should not see the delayed prepared entry
3040 auto snap
= db
->GetSnapshot();
3043 if (split_before_mutex
) {
3044 // split before acquiring prepare_mutex_
3045 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3046 {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
3047 "AtomicCommitOfDelayedPrepared:Commit:before"},
3048 {"AtomicCommitOfDelayedPrepared:Commit:after",
3049 "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
3051 // split right after reading from the commit cache
3052 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3053 {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
3054 "AtomicCommitOfDelayedPrepared:Commit:before"},
3055 {"AtomicCommitOfDelayedPrepared:Commit:after",
3056 "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
3058 } else { // split commit
3059 // split right before removing from delayed_prepared_
3060 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3061 {{"WritePreparedTxnDB::RemovePrepared:pause",
3062 "AtomicCommitOfDelayedPrepared:Read:before"},
3063 {"AtomicCommitOfDelayedPrepared:Read:after",
3064 "WritePreparedTxnDB::RemovePrepared:resume"}});
3066 SyncPoint::GetInstance()->EnableProcessing();
3068 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
3069 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
3070 ASSERT_OK(txn
->Commit());
3071 if (split_before_mutex
) {
3072 // Do bunch of inserts to evict the commit entry from the cache. This
3073 // would prevent the 2nd look into commit cache under prepare_mutex_
3074 // to see the commit entry.
3075 auto seq
= db_impl
->TEST_GetLastVisibleSequence();
3077 while (wp_db
->max_evicted_seq_
< seq
&& tries
< 50) {
3078 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3081 ASSERT_LT(tries
, 50);
3083 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
3087 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3088 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
3089 ReadOptions roptions
;
3090 roptions
.snapshot
= snap
;
3091 PinnableSlice value
;
3092 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key1", &value
);
3094 // It should not see the commit of delayed prepared
3095 ASSERT_TRUE(value
== init_value
);
3096 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
3097 db
->ReleaseSnapshot(snap
);
3101 commit_thread
.join();
3102 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3103 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3104 } // for split_before_mutex
3108 // When max evicted seq advances a prepared seq, it involves two updates: i)
3109 // adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
3110 // ::IsInSnapshot also reads these two values in a non-atomic way. This test
3111 // ensures correctness if the update occurs after ::IsInSnapshot reads
3112 // delayed_prepared_empty_ and before it reads max_evicted_seq_.
3113 // Note: this test focuses on read snapshot larger than max_evicted_seq_.
3114 TEST_P(WritePreparedTransactionTest
, NonAtomicUpdateOfDelayedPrepared
) {
3115 const size_t snapshot_cache_bits
= 7; // same as default
3116 const size_t commit_cache_bits
= 3; // 8 entries
3117 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3119 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3120 // Fill up the commit cache
3121 std::string
init_value("value1");
3122 for (int i
= 0; i
< 10; i
++) {
3123 db
->Put(WriteOptions(), Slice("key1"), Slice(init_value
));
3125 // Prepare a transaction but do not commit it
3126 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions());
3127 ASSERT_OK(txn
->SetName("xid"));
3128 ASSERT_OK(txn
->Put(Slice("key1"), Slice("value2")));
3129 ASSERT_OK(txn
->Prepare());
3130 // Create a gap between prepare seq and snapshot seq
3131 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3132 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3133 // The snapshot should not see the delayed prepared entry
3134 auto snap
= db
->GetSnapshot();
3135 ASSERT_LT(txn
->GetId(), snap
->GetSequenceNumber());
3137 // split right after reading delayed_prepared_empty_
3138 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3139 {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
3140 "AtomicUpdateOfDelayedPrepared:before"},
3141 {"AtomicUpdateOfDelayedPrepared:after",
3142 "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
3143 SyncPoint::GetInstance()->EnableProcessing();
3145 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
3146 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
3147 // Commit a bunch of entries to advance max evicted seq and make the
3148 // prepared a delayed prepared
3150 while (wp_db
->max_evicted_seq_
< txn
->GetId() && tries
< 50) {
3151 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3154 ASSERT_LT(tries
, 50);
3155 // This is the case on which the test focuses
3156 ASSERT_LT(wp_db
->max_evicted_seq_
, snap
->GetSequenceNumber());
3157 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
3160 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3161 ReadOptions roptions
;
3162 roptions
.snapshot
= snap
;
3163 PinnableSlice value
;
3164 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key1", &value
);
3166 // It should not see the uncommitted value of delayed prepared
3167 ASSERT_TRUE(value
== init_value
);
3168 db
->ReleaseSnapshot(snap
);
3172 commit_thread
.join();
3173 ASSERT_OK(txn
->Commit());
3175 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3176 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3179 // Eviction from commit cache and update of max evicted seq are two non-atomic
3180 // steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
3181 // from commit cache are two non-atomic steps. This tests if the update occurs
3182 // after reading max_evicted_seq_ and before reading the commit cache.
3183 // Note: the test focuses on snapshot larger than max_evicted_seq_
3184 TEST_P(WritePreparedTransactionTest
, NonAtomicUpdateOfMaxEvictedSeq
) {
3185 const size_t snapshot_cache_bits
= 7; // same as default
3186 const size_t commit_cache_bits
= 3; // 8 entries
3187 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3189 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3190 // Fill up the commit cache
3191 std::string
init_value("value1");
3192 std::string
last_value("value_final");
3193 for (int i
= 0; i
< 10; i
++) {
3194 db
->Put(WriteOptions(), Slice("key1"), Slice(init_value
));
3196 // Do an uncommitted write to prevent min_uncommitted optimization
3198 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3199 ASSERT_OK(txn1
->SetName("xid1"));
3200 ASSERT_OK(txn1
->Put(Slice("key0"), last_value
));
3201 ASSERT_OK(txn1
->Prepare());
3202 // Do a write with prepare to get the prepare seq
3203 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions());
3204 ASSERT_OK(txn
->SetName("xid"));
3205 ASSERT_OK(txn
->Put(Slice("key1"), last_value
));
3206 ASSERT_OK(txn
->Prepare());
3207 ASSERT_OK(txn
->Commit());
3208 // Create a gap between commit entry and snapshot seq
3209 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3210 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3211 // The snapshot should see the last commit
3212 auto snap
= db
->GetSnapshot();
3213 ASSERT_LE(txn
->GetId(), snap
->GetSequenceNumber());
3215 // split right after reading max_evicted_seq_
3216 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3217 {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
3218 "NonAtomicUpdateOfMaxEvictedSeq:before"},
3219 {"NonAtomicUpdateOfMaxEvictedSeq:after",
3220 "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
3221 SyncPoint::GetInstance()->EnableProcessing();
3223 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
3224 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
3225 // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
3227 while (wp_db
->max_evicted_seq_
< txn
->GetId() && tries
< 50) {
3228 db
->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3231 ASSERT_LT(tries
, 50);
3232 // This is the case on which the test focuses
3233 ASSERT_LT(wp_db
->max_evicted_seq_
, snap
->GetSequenceNumber());
3234 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
3237 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3238 ReadOptions roptions
;
3239 roptions
.snapshot
= snap
;
3240 PinnableSlice value
;
3241 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key1", &value
);
3243 // It should see the committed value of the evicted entry
3244 ASSERT_TRUE(value
== last_value
);
3245 db
->ReleaseSnapshot(snap
);
3249 commit_thread
.join();
3253 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3254 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3257 // Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
3258 // that. The test focuses on a race condition between AddPrepared and
3259 // AdvanceMaxEvictedSeq functions.
3260 TEST_P(WritePreparedTransactionTest
, AddPreparedBeforeMax
) {
3261 if (!options
.two_write_queues
) {
3262 // This test is only for two write queues
3265 const size_t snapshot_cache_bits
= 7; // same as default
3266 // 1 entry to advance max after the 2nd commit
3267 const size_t commit_cache_bits
= 0;
3268 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3270 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3271 std::string
some_value("value_some");
3272 std::string
uncommitted_value("value_uncommitted");
3273 // Prepare two uncommitted transactions
3275 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3276 ASSERT_OK(txn1
->SetName("xid1"));
3277 ASSERT_OK(txn1
->Put(Slice("key1"), some_value
));
3278 ASSERT_OK(txn1
->Prepare());
3280 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3281 ASSERT_OK(txn2
->SetName("xid2"));
3282 ASSERT_OK(txn2
->Put(Slice("key2"), some_value
));
3283 ASSERT_OK(txn2
->Prepare());
3284 // Start the txn here so the other thread could get its id
3285 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions());
3286 ASSERT_OK(txn
->SetName("xid"));
3287 ASSERT_OK(txn
->Put(Slice("key0"), uncommitted_value
));
3288 port::Mutex txn_mutex_
;
3290 // t1) Insert prepared entry, t2) commit other entries to advance max
3291 // evicted sec and finish checking the existing prepared entries, t1)
3292 // AddPrepared, t2) update max_evicted_seq_
3293 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3294 {"AddPreparedCallback::AddPrepared::begin:pause",
3295 "AddPreparedBeforeMax::read_thread:start"},
3296 {"AdvanceMaxEvictedSeq::update_max:pause",
3297 "AddPreparedCallback::AddPrepared::begin:resume"},
3298 {"AddPreparedCallback::AddPrepared::end",
3299 "AdvanceMaxEvictedSeq::update_max:resume"},
3301 SyncPoint::GetInstance()->EnableProcessing();
3303 ROCKSDB_NAMESPACE::port::Thread
write_thread([&]() {
3305 ASSERT_OK(txn
->Prepare());
3306 txn_mutex_
.Unlock();
3309 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3310 TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
3311 // Publish seq number with a commit
3312 ASSERT_OK(txn1
->Commit());
3313 // Since the commit cache size is one the 2nd commit evict the 1st one and
3314 // invokes AdcanceMaxEvictedSeq
3315 ASSERT_OK(txn2
->Commit());
3317 ReadOptions roptions
;
3318 PinnableSlice value
;
3319 // The snapshot should not see the uncommitted value from write_thread
3320 auto snap
= db
->GetSnapshot();
3321 ASSERT_LT(wp_db
->max_evicted_seq_
, snap
->GetSequenceNumber());
3322 // This is the scenario that we test for
3324 ASSERT_GT(wp_db
->max_evicted_seq_
, txn
->GetId());
3325 txn_mutex_
.Unlock();
3326 roptions
.snapshot
= snap
;
3327 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key0", &value
);
3328 ASSERT_TRUE(s
.IsNotFound());
3329 db
->ReleaseSnapshot(snap
);
3333 write_thread
.join();
3336 ASSERT_OK(txn
->Commit());
3338 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3339 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3342 // When an old prepared entry gets committed, there is a gap between the time
3343 // that it is published and when it is cleaned up from old_prepared_. This test
3344 // stresses such cases.
3345 TEST_P(WritePreparedTransactionTest
, CommitOfDelayedPrepared
) {
3346 const size_t snapshot_cache_bits
= 7; // same as default
3347 for (const size_t commit_cache_bits
: {0, 2, 3}) {
3348 for (const size_t sub_batch_cnt
: {1, 2, 3}) {
3349 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3351 std::atomic
<const Snapshot
*> snap
= {nullptr};
3352 std::atomic
<SequenceNumber
> exp_prepare
= {0};
3353 ROCKSDB_NAMESPACE::port::Thread callback_thread
;
3354 // Value is synchronized via snap
3355 PinnableSlice value
;
3356 // Take a snapshot after publish and before RemovePrepared:Start
3357 auto snap_callback
= [&]() {
3358 ASSERT_EQ(nullptr, snap
.load());
3359 snap
.store(db
->GetSnapshot());
3360 ReadOptions roptions
;
3361 roptions
.snapshot
= snap
.load();
3362 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key2", &value
);
3365 auto callback
= [&](void* param
) {
3366 SequenceNumber prep_seq
= *((SequenceNumber
*)param
);
3367 if (prep_seq
== exp_prepare
.load()) { // only for write_thread
3368 // We need to spawn a thread to avoid deadlock since getting a
3369 // snpashot might end up calling AdvanceSeqByOne which needs joining
3371 callback_thread
= ROCKSDB_NAMESPACE::port::Thread(snap_callback
);
3372 TEST_SYNC_POINT("callback:end");
3375 // Wait for the first snapshot be taken in GetSnapshotInternal. Although
3376 // it might be updated before GetSnapshotInternal finishes but this should
3377 // cover most of the cases.
3378 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3379 {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
3381 SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback
);
3382 SyncPoint::GetInstance()->EnableProcessing();
3383 // Thread to cause frequent evictions
3384 ROCKSDB_NAMESPACE::port::Thread
eviction_thread([&]() {
3385 // Too many txns might cause commit_seq - prepare_seq in another thread
3386 // to go beyond DELTA_UPPERBOUND
3387 for (int i
= 0; i
< 25 * (1 << commit_cache_bits
); i
++) {
3388 db
->Put(WriteOptions(), Slice("key1"), Slice("value1"));
3391 ROCKSDB_NAMESPACE::port::Thread
write_thread([&]() {
3392 for (int i
= 0; i
< 25 * (1 << commit_cache_bits
); i
++) {
3394 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3395 ASSERT_OK(txn
->SetName("xid"));
3396 std::string val_str
= "value" + ToString(i
);
3397 for (size_t b
= 0; b
< sub_batch_cnt
; b
++) {
3398 ASSERT_OK(txn
->Put(Slice("key2"), val_str
));
3400 ASSERT_OK(txn
->Prepare());
3401 // Let an eviction kick in
3402 std::this_thread::yield();
3404 exp_prepare
.store(txn
->GetId());
3405 ASSERT_OK(txn
->Commit());
3407 // Wait for the snapshot taking that is triggered by
3408 // RemovePrepared:Start callback
3409 callback_thread
.join();
3411 // Read with the snapshot taken before delayed_prepared_ cleanup
3412 ReadOptions roptions
;
3413 roptions
.snapshot
= snap
.load();
3414 ASSERT_NE(nullptr, roptions
.snapshot
);
3415 PinnableSlice value2
;
3417 db
->Get(roptions
, db
->DefaultColumnFamily(), "key2", &value2
);
3419 // It should see its own write
3420 ASSERT_TRUE(val_str
== value2
);
3421 // The value read by snapshot should not change
3422 ASSERT_STREQ(value2
.ToString().c_str(), value
.ToString().c_str());
3424 db
->ReleaseSnapshot(roptions
.snapshot
);
3425 snap
.store(nullptr);
3428 write_thread
.join();
3429 eviction_thread
.join();
3430 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3431 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3436 // Test that updating the commit map will not affect the existing snapshots.
// The body runs once for each value of skip_prepare (true, then false). A
// writer thread and a reader thread are interleaved via sync-point
// dependencies: the reader takes a snapshot and reads "key", then later
// re-checks that read with ASSERT_SAME under the same snapshot.
3437 TEST_P(WritePreparedTransactionTest
, AtomicCommit
) {
3438 for (bool skip_prepare
: {true, false}) {
// Ordering constraints between the writer's AddCommitted sync points and the
// reader's AtomicCommit::* sync points. Each entry is presumably a
// {predecessor, successor} pair -- TODO confirm against SyncPoint's contract.
3439 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3440 {"WritePreparedTxnDB::AddCommitted:start",
3441 "AtomicCommit::GetSnapshot:start"},
3442 {"AtomicCommit::Get:end",
3443 "WritePreparedTxnDB::AddCommitted:start:pause"},
3444 {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
3445 {"AtomicCommit::Get2:end",
3446 "WritePreparedTxnDB::AddCommitted:end:pause:"},
3448 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Writer: writes "key" -> "value" either with a plain Put or through a named
// transaction that is prepared and then committed.
// NOTE(review): the lines that choose between the two paths are not visible
// in this view; presumably skip_prepare selects the plain Put.
3449 ROCKSDB_NAMESPACE::port::Thread
write_thread([&]() {
// NOTE(review): the Status returned by this Put appears to be unchecked.
3451 db
->Put(WriteOptions(), Slice("key"), Slice("value"));
3454 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3455 ASSERT_OK(txn
->SetName("xid"));
3456 ASSERT_OK(txn
->Put(Slice("key"), Slice("value")));
3457 ASSERT_OK(txn
->Prepare());
3458 ASSERT_OK(txn
->Commit());
// Reader: takes a snapshot, reads "key" through it, and later hands the
// saved status and value to ASSERT_SAME (defined elsewhere; presumably it
// repeats the read and compares -- TODO confirm).
3462 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3463 ReadOptions roptions
;
3464 TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
3465 roptions
.snapshot
= db
->GetSnapshot();
// First read under the snapshot; its result is kept in s / val.
3467 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key", &val
);
3468 TEST_SYNC_POINT("AtomicCommit::Get:end");
3469 TEST_SYNC_POINT("AtomicCommit::Get2:start");
3470 ASSERT_SAME(roptions
, db
, s
, val
, "key");
3471 TEST_SYNC_POINT("AtomicCommit::Get2:end");
// Release the snapshot acquired at the top of this lambda.
3472 db
->ReleaseSnapshot(roptions
.snapshot
);
3475 write_thread
.join();
// Turn sync-point processing off again before the next loop iteration.
3476 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3480 // Test that we can change write policy from WriteCommitted to WritePrepared
3481 // after a clean shutdown (which would empty the WAL).
3482 TEST_P(WritePreparedTransactionTest
, WP_WC_DBBackwardCompatibility
) {
// empty_wal is passed through as true: the empty-WAL case described above.
3483 bool empty_wal
= true;
// CrossCompatibilityTest is defined outside this view; its arguments are
// presumably (policy before, policy after, empty_wal) -- TODO confirm.
3484 CrossCompatibilityTest(WRITE_COMMITTED
, WRITE_PREPARED
, empty_wal
);
3487 // Test that we fail fast if WAL is not emptied between changing the write
3488 // policy from WriteCommitted to WritePrepared.
3489 TEST_P(WritePreparedTransactionTest
, WP_WC_WALBackwardIncompatibility
) {
3490 bool empty_wal
= true;
// Note the negation: the helper receives false, i.e. the non-empty-WAL case.
// CrossCompatibilityTest is defined outside this view; its arguments are
// presumably (policy before, policy after, empty_wal) -- TODO confirm.
3491 CrossCompatibilityTest(WRITE_COMMITTED
, WRITE_PREPARED
, !empty_wal
);
3494 // Test that we can change write policy from WritePrepared back to WriteCommitted
3495 // after a clean shutdown (which would empty the WAL).
3496 TEST_P(WritePreparedTransactionTest
, WC_WP_ForwardCompatibility
) {
// empty_wal is passed through as true: the empty-WAL case described above.
3497 bool empty_wal
= true;
// CrossCompatibilityTest is defined outside this view; its arguments are
// presumably (policy before, policy after, empty_wal) -- TODO confirm.
3498 CrossCompatibilityTest(WRITE_PREPARED
, WRITE_COMMITTED
, empty_wal
);
3501 // Test that we fail fast if WAL is not emptied between changing the write
3502 // policy from WritePrepared to WriteCommitted.
3503 TEST_P(WritePreparedTransactionTest
, WC_WP_WALForwardIncompatibility
) {
3504 bool empty_wal
= true;
// Note the negation: the helper receives false, i.e. the non-empty-WAL case.
// CrossCompatibilityTest is defined outside this view; its arguments are
// presumably (policy before, policy after, empty_wal) -- TODO confirm.
3505 CrossCompatibilityTest(WRITE_PREPARED
, WRITE_COMMITTED
, !empty_wal
);
3508 } // namespace ROCKSDB_NAMESPACE
// Test binary entry point: passes the command-line arguments to GoogleTest
// and returns the result of RUN_ALL_TESTS() as the process exit code.
3510 int main(int argc
, char** argv
) {
3511 ::testing::InitGoogleTest(&argc
, argv
);
3512 return RUN_ALL_TESTS();
// Alternate entry point that ignores its arguments. Judging by the message
// text and the closing #endif, this is presumably the variant compiled when
// ROCKSDB_LITE is defined -- TODO confirm against the matching #if/#else.
// NOTE(review): the call that emits the message below is not visible in this
// view.
3518 int main(int /*argc*/, char** /*argv*/) {
3520 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
3524 #endif // ROCKSDB_LITE