1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
12 #include "utilities/transactions/transaction_test.h"
20 #include "db/db_impl.h"
21 #include "db/dbformat.h"
22 #include "rocksdb/db.h"
23 #include "rocksdb/options.h"
24 #include "rocksdb/types.h"
25 #include "rocksdb/utilities/debug.h"
26 #include "rocksdb/utilities/transaction.h"
27 #include "rocksdb/utilities/transaction_db.h"
28 #include "table/mock_table.h"
29 #include "util/fault_injection_test_env.h"
30 #include "util/random.h"
31 #include "util/string_util.h"
32 #include "util/sync_point.h"
33 #include "util/testharness.h"
34 #include "util/testutil.h"
35 #include "util/transaction_test_util.h"
36 #include "utilities/merge_operators.h"
37 #include "utilities/merge_operators/string_append/stringappend.h"
38 #include "utilities/transactions/pessimistic_transaction_db.h"
39 #include "utilities/transactions/write_prepared_txn_db.h"
41 #include "port/port.h"
// Convenience aliases for the nested commit-cache types declared on
// WritePreparedTxnDB; used throughout the tests in this file.
47 using CommitEntry
= WritePreparedTxnDB::CommitEntry
;
48 using CommitEntry64b
= WritePreparedTxnDB::CommitEntry64b
;
49 using CommitEntry64bFormat
= WritePreparedTxnDB::CommitEntry64bFormat
;
// Exercises WritePreparedTxnDB::PreparedHeap: checks top()/empty() across a
// scripted sequence of pushes, erases, and pops.
// NOTE(review): the original-line numbering embedded in this extraction has
// gaps -- the push()/erase() calls that establish each asserted heap state are
// not visible here; only the assertions remain. Verify against the original
// file before editing.
51 TEST(PreparedHeap
, BasicsTest
) {
52 WritePreparedTxnDB::PreparedHeap heap
;
54 // Test with one element
55 ASSERT_EQ(14l, heap
.top());
58 // Test that old min is still on top
59 ASSERT_EQ(14l, heap
.top());
61 // Test that the new min will be on top
62 ASSERT_EQ(13l, heap
.top());
63 // Test that it is persistent
64 ASSERT_EQ(13l, heap
.top());
70 // Test that old min is still on top
71 ASSERT_EQ(13l, heap
.top());
73 // Test that old min is still on top
74 ASSERT_EQ(13l, heap
.top());
76 // Test that old min is still on top
77 ASSERT_EQ(13l, heap
.top());
79 // Test that the new comes to the top after multiple erase
80 ASSERT_EQ(34l, heap
.top());
82 // Test that the new comes to the top after single erase
83 ASSERT_EQ(44l, heap
.top());
85 ASSERT_EQ(44l, heap
.top());
86 heap
.pop(); // pop 44l
87 // Test that the erased items are ignored after pop
88 ASSERT_EQ(64l, heap
.top());
90 // Test that erasing an already popped item would work
91 ASSERT_EQ(64l, heap
.top());
93 ASSERT_EQ(64l, heap
.top());
104 // Test top remains the same after a random order of many erases
105 ASSERT_EQ(64l, heap
.top());
107 // Test that pop works with a series of random pending erases
108 ASSERT_EQ(74l, heap
.top());
109 ASSERT_FALSE(heap
.empty());
111 // Test that empty works
112 ASSERT_TRUE(heap
.empty());
// Regression test for PreparedHeap: reconstructed from a buggy trace where an
// erase incorrectly emptied the heap.
// NOTE(review): interior push()/erase() statements are missing from this
// extraction (gaps in the embedded original line numbers); only the
// assertions on top()/empty() are visible.
115 // This is a scenario reconstructed from a buggy trace. Test that the bug does
116 // not resurface again.
117 TEST(PreparedHeap
, EmptyAtTheEnd
) {
118 WritePreparedTxnDB::PreparedHeap heap
;
120 ASSERT_EQ(40l, heap
.top());
121 // Although not a recommended scenario, we must be resilient against erase
122 // without a prior push.
124 ASSERT_EQ(40l, heap
.top());
126 ASSERT_EQ(40l, heap
.top());
129 ASSERT_EQ(40l, heap
.top());
131 ASSERT_TRUE(heap
.empty());
134 ASSERT_EQ(40l, heap
.top());
136 ASSERT_EQ(40l, heap
.top());
138 ASSERT_EQ(40l, heap
.top());
141 // Test that the erase has not emptied the heap (we had a bug doing that)
142 ASSERT_FALSE(heap
.empty());
143 ASSERT_EQ(60l, heap
.top());
145 ASSERT_TRUE(heap
.empty());
// Stress test: t_cnt threads push/erase the PreparedHeap under a shared
// RWMutex (WriteLock) in random order; at the end the heap must be empty.
// NOTE(review): this extraction is missing interior statements (e.g. the
// heap.push()/erase() bodies of the lambdas, the join loop body, and the
// declaration of `rnd`) -- the embedded original line numbers show gaps.
148 // Generate random order of PreparedHeap access and test that the heap will be
149 // successfully emptied at the end.
150 TEST(PreparedHeap
, Concurrent
) {
151 const size_t t_cnt
= 10;
152 rocksdb::port::Thread t
[t_cnt
];
154 WritePreparedTxnDB::PreparedHeap heap
;
155 port::RWMutex prepared_mutex
;
157 for (size_t n
= 0; n
< 100; n
++) {
158 for (size_t i
= 0; i
< t_cnt
; i
++) {
159 // This is not recommended usage but we should be resilient against it.
160 bool skip_push
= rnd
.OneIn(5);
161 t
[i
] = rocksdb::port::Thread([&heap
, &prepared_mutex
, skip_push
, i
]() {
163 std::this_thread::yield();
165 WriteLock
wl(&prepared_mutex
);
168 std::this_thread::yield();
170 WriteLock
wl(&prepared_mutex
);
175 for (size_t i
= 0; i
< t_cnt
; i
++) {
178 ASSERT_TRUE(heap
.empty());
// Verifies that WriteBatchWithIndex::SubBatchCnt() counts sub-batches
// correctly: a new sub-batch is cut when a duplicate key is written, but not
// for duplicates in a different column family. Also cross-checks the count
// against SubBatchCounter and checks RollbackToSavePoint restores it, then
// fuzzes with 1000 random batches.
// NOTE(review): this extraction has gaps (embedded original line numbers skip
// around); some statements (e.g. save_points increments, expected batch_cnt
// updates, buffer declaration) are not visible here.
182 // Test that WriteBatchWithIndex correctly counts the number of sub-batches
183 TEST(WriteBatchWithIndex
, SubBatchCnt
) {
184 ColumnFamilyOptions cf_options
;
185 std::string cf_name
= "two";
188 options
.create_if_missing
= true;
189 const std::string dbname
= test::PerThreadDBPath("transaction_testdb");
190 DestroyDB(dbname
, options
);
191 ASSERT_OK(DB::Open(options
, dbname
, &db
));
192 ColumnFamilyHandle
* cf_handle
= nullptr;
193 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
194 WriteOptions write_options
;
195 size_t batch_cnt
= 1;
196 size_t save_points
= 0;
197 std::vector
<size_t> batch_cnt_at
;
198 WriteBatchWithIndex
batch(db
->DefaultColumnFamily()->GetComparator(), 0, true,
200 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
201 batch_cnt_at
.push_back(batch_cnt
);
202 batch
.SetSavePoint();
204 batch
.Put(Slice("key"), Slice("value"));
205 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
206 batch_cnt_at
.push_back(batch_cnt
);
207 batch
.SetSavePoint();
209 batch
.Put(Slice("key2"), Slice("value2"));
210 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
211 // duplicate the keys
212 batch_cnt_at
.push_back(batch_cnt
);
213 batch
.SetSavePoint();
215 batch
.Put(Slice("key"), Slice("value3"));
217 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
218 // duplicate the 2nd key. It should not be counted duplicate since a
219 // sub-patch is cut after the last duplicate.
220 batch_cnt_at
.push_back(batch_cnt
);
221 batch
.SetSavePoint();
223 batch
.Put(Slice("key2"), Slice("value4"));
224 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
225 // duplicate the keys but in a different cf. It should not be counted as
227 batch_cnt_at
.push_back(batch_cnt
);
228 batch
.SetSavePoint();
230 batch
.Put(cf_handle
, Slice("key"), Slice("value5"));
231 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
233 // Test that the number of sub-batches matches what we count with
235 std::map
<uint32_t, const Comparator
*> comparators
;
236 comparators
[0] = db
->DefaultColumnFamily()->GetComparator();
237 comparators
[cf_handle
->GetID()] = cf_handle
->GetComparator();
238 SubBatchCounter
counter(comparators
);
239 ASSERT_OK(batch
.GetWriteBatch()->Iterate(&counter
));
240 ASSERT_EQ(batch_cnt
, counter
.BatchCount());
242 // Test that RollbackToSavePoint will properly resets the number of
244 for (size_t i
= save_points
; i
> 0; i
--) {
245 batch
.RollbackToSavePoint();
246 ASSERT_EQ(batch_cnt_at
[i
- 1], batch
.SubBatchCnt());
249 // Test the count is right with random batches
251 const size_t TOTAL_KEYS
= 20; // 20 ~= 10 to cause a few randoms
253 std::string keys
[TOTAL_KEYS
];
254 for (size_t k
= 0; k
< TOTAL_KEYS
; k
++) {
255 int len
= static_cast<int>(rnd
.Uniform(50));
256 keys
[k
] = test::RandomKey(&rnd
, len
);
258 for (size_t i
= 0; i
< 1000; i
++) { // 1000 random batches
259 WriteBatchWithIndex
rndbatch(db
->DefaultColumnFamily()->GetComparator(),
261 for (size_t k
= 0; k
< 10; k
++) { // 10 key per batch
262 size_t ki
= static_cast<size_t>(rnd
.Uniform(TOTAL_KEYS
));
263 Slice key
= Slice(keys
[ki
]);
265 Slice value
= Slice(test::RandomString(&rnd
, 16, &buffer
));
266 rndbatch
.Put(key
, value
);
268 SubBatchCounter
batch_counter(comparators
);
269 ASSERT_OK(rndbatch
.GetWriteBatch()->Iterate(&batch_counter
));
270 ASSERT_EQ(rndbatch
.SubBatchCnt(), batch_counter
.BatchCount());
// Round-trip test for the packed 64-bit commit-cache entry: a zero-initialized
// CommitEntry64b must parse as "empty", and for a wide sample of prepare (p)
// and commit (c) sequence numbers, encoding a CommitEntry into CommitEntry64b
// and parsing it back must reproduce the original entry.
// NOTE(review): a few lines are missing from this extraction (gaps in the
// embedded original numbering), e.g. the ASSERT on the empty-entry parse
// result before line 291.
278 TEST(CommitEntry64b
, BasicTest
) {
279 const size_t INDEX_BITS
= static_cast<size_t>(21);
280 const size_t INDEX_SIZE
= static_cast<size_t>(1ull << INDEX_BITS
);
281 const CommitEntry64bFormat
FORMAT(static_cast<size_t>(INDEX_BITS
));
283 // zero-initialized CommitEntry64b should indicate an empty entry
284 CommitEntry64b empty_entry64b
;
285 uint64_t empty_index
= 11ul;
286 CommitEntry empty_entry
;
287 bool ok
= empty_entry64b
.Parse(empty_index
, &empty_entry
, FORMAT
);
290 // the zero entry is reserved for un-initialized entries
291 const size_t MAX_COMMIT
= (1 << FORMAT
.COMMIT_BITS
) - 1 - 1;
292 // Samples over the numbers that are covered by that many index bits
293 std::array
<uint64_t, 4> is
= {{0, 1, INDEX_SIZE
/ 2 + 1, INDEX_SIZE
- 1}};
294 // Samples over the numbers that are covered by that many commit bits
295 std::array
<uint64_t, 4> ds
= {{0, 1, MAX_COMMIT
/ 2 + 1, MAX_COMMIT
}};
296 // Iterate over prepare numbers that have i) cover all bits of a sequence
297 // number, and ii) include some bits that fall into the range of index or
299 for (uint64_t base
= 1; base
< kMaxSequenceNumber
; base
*= 2) {
300 for (uint64_t i
: is
) {
301 for (uint64_t d
: ds
) {
302 uint64_t p
= base
+ i
+ d
;
303 for (uint64_t c
: {p
, p
+ d
/ 2, p
+ d
}) {
304 uint64_t index
= p
% INDEX_SIZE
;
305 CommitEntry
before(p
, c
), after
;
306 CommitEntry64b
entry64b(before
, FORMAT
);
307 ok
= entry64b
.Parse(index
, &after
, FORMAT
);
309 if (!(before
== after
)) {
310 printf("base %" PRIu64
" i %" PRIu64
" d %" PRIu64
" p %" PRIu64
311 " c %" PRIu64
" index %" PRIu64
"\n",
312 base
, i
, d
, p
, c
, index
);
314 ASSERT_EQ(before
, after
);
// Test double for WritePreparedTxnDB that lets tests control the snapshot
// list: SetDBSnapshots/TakeSnapshot populate snapshots_, and the overridden
// GetSnapshotListFromDB serves from it instead of the real DB.
// NOTE(review): access specifiers and the body of GetSnapshotListFromDB are
// not visible in this extraction (embedded original numbering has gaps).
321 class WritePreparedTxnDBMock
: public WritePreparedTxnDB
{
323 WritePreparedTxnDBMock(DBImpl
* db_impl
, TransactionDBOptions
& opt
)
324 : WritePreparedTxnDB(db_impl
, opt
) {}
325 WritePreparedTxnDBMock(DBImpl
* db_impl
, TransactionDBOptions
& opt
,
326 size_t snapshot_cache_size
)
327 : WritePreparedTxnDB(db_impl
, opt
, snapshot_cache_size
) {}
328 WritePreparedTxnDBMock(DBImpl
* db_impl
, TransactionDBOptions
& opt
,
329 size_t snapshot_cache_size
, size_t commit_cache_size
)
330 : WritePreparedTxnDB(db_impl
, opt
, snapshot_cache_size
,
331 commit_cache_size
) {}
332 void SetDBSnapshots(const std::vector
<SequenceNumber
>& snapshots
) {
333 snapshots_
= snapshots
;
335 void TakeSnapshot(SequenceNumber seq
) { snapshots_
.push_back(seq
); }
338 virtual const std::vector
<SequenceNumber
> GetSnapshotListFromDB(
339 SequenceNumber
/* unused */) override
{
344 std::vector
<SequenceNumber
> snapshots_
;
// Shared base for the write-prepared transaction tests. Provides helpers to:
//  - probe MaybeUpdateOldCommitMap and verify whether old_commit_map_ was
//    updated as expected (MaybeUpdateOldCommitMapTestWithNext),
//  - race UpdateSnapshots against CheckAgainstSnapshots at chosen sync points
//    (SnapshotConcurrentAccessTestInternal),
//  - verify key values via Get/MultiGet (VerifyKeys) and all internal key
//    versions via GetAllKeyVersions (VerifyInternalKeys).
// NOTE(review): this extraction has gaps (embedded original numbering skips
// lines), e.g. the `snapshot` parameter of MaybeUpdateOldCommitMapTestWithNext,
// the t1/t2 joins, and some else-branches are not visible here.
347 class WritePreparedTransactionTestBase
: public TransactionTestBase
{
349 WritePreparedTransactionTestBase(bool use_stackable_db
, bool two_write_queue
,
350 TxnDBWritePolicy write_policy
)
351 : TransactionTestBase(use_stackable_db
, two_write_queue
, write_policy
){};
354 // If expect_update is set, check if it actually updated old_commit_map_. If
355 // it did not and yet suggested not to check the next snapshot, do the
356 // opposite to check if it was not a bad suggestion.
357 void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare
, uint64_t commit
,
359 uint64_t next_snapshot
,
360 bool expect_update
) {
361 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
362 // reset old_commit_map_empty_ so that its value indicate whether
363 // old_commit_map_ was updated
364 wp_db
->old_commit_map_empty_
= true;
365 bool check_next
= wp_db
->MaybeUpdateOldCommitMap(prepare
, commit
, snapshot
,
366 snapshot
< next_snapshot
);
367 if (expect_update
== wp_db
->old_commit_map_empty_
) {
368 printf("prepare: %" PRIu64
" commit: %" PRIu64
" snapshot: %" PRIu64
369 " next: %" PRIu64
"\n",
370 prepare
, commit
, snapshot
, next_snapshot
);
372 EXPECT_EQ(!expect_update
, wp_db
->old_commit_map_empty_
);
373 if (!check_next
&& wp_db
->old_commit_map_empty_
) {
374 // do the opposite to make sure it was not a bad suggestion
375 const bool dont_care_bool
= true;
376 wp_db
->MaybeUpdateOldCommitMap(prepare
, commit
, next_snapshot
,
378 if (!wp_db
->old_commit_map_empty_
) {
379 printf("prepare: %" PRIu64
" commit: %" PRIu64
" snapshot: %" PRIu64
380 " next: %" PRIu64
"\n",
381 prepare
, commit
, snapshot
, next_snapshot
);
383 EXPECT_TRUE(wp_db
->old_commit_map_empty_
);
387 // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
388 // miss a snapshot because of a concurrent update by UpdateSnapshots that is
389 // writing new_snapshots. Both threads are broken at two points. The sync
390 // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
391 // entry is expected to be vital for one of the snapshots that is common
392 // between the old and new list of snapshots.
393 void SnapshotConcurrentAccessTestInternal(
394 WritePreparedTxnDB
* wp_db
,
395 const std::vector
<SequenceNumber
>& old_snapshots
,
396 const std::vector
<SequenceNumber
>& new_snapshots
, CommitEntry
& entry
,
397 SequenceNumber
& version
, size_t a1
, size_t a2
, size_t b1
, size_t b2
) {
398 // First reset the snapshot list
399 const std::vector
<SequenceNumber
> empty_snapshots
;
400 wp_db
->old_commit_map_empty_
= true;
401 wp_db
->UpdateSnapshots(empty_snapshots
, ++version
);
402 // Then initialize it with the old_snapshots
403 wp_db
->UpdateSnapshots(old_snapshots
, ++version
);
405 // Starting from the first thread, cut each thread at two points
406 rocksdb::SyncPoint::GetInstance()->LoadDependency({
407 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1
),
408 "WritePreparedTxnDB::UpdateSnapshots:s:start"},
409 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1
),
410 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1
)},
411 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2
),
412 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1
)},
413 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2
),
414 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2
)},
415 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
416 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2
)},
418 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
420 ASSERT_TRUE(wp_db
->old_commit_map_empty_
);
421 rocksdb::port::Thread
t1(
422 [&]() { wp_db
->UpdateSnapshots(new_snapshots
, version
); });
423 rocksdb::port::Thread
t2([&]() { wp_db
->CheckAgainstSnapshots(entry
); });
426 ASSERT_FALSE(wp_db
->old_commit_map_empty_
);
428 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
430 wp_db
->old_commit_map_empty_
= true;
431 wp_db
->UpdateSnapshots(empty_snapshots
, ++version
);
432 wp_db
->UpdateSnapshots(old_snapshots
, ++version
);
433 // Starting from the second thread, cut each thread at two points
434 rocksdb::SyncPoint::GetInstance()->LoadDependency({
435 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1
),
436 "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
437 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1
),
438 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1
)},
439 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2
),
440 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1
)},
441 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2
),
442 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2
)},
443 {"WritePreparedTxnDB::UpdateSnapshots:p:end",
444 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2
)},
446 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
448 ASSERT_TRUE(wp_db
->old_commit_map_empty_
);
449 rocksdb::port::Thread
t1(
450 [&]() { wp_db
->UpdateSnapshots(new_snapshots
, version
); });
451 rocksdb::port::Thread
t2([&]() { wp_db
->CheckAgainstSnapshots(entry
); });
454 ASSERT_FALSE(wp_db
->old_commit_map_empty_
);
456 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
459 // Verify value of keys.
460 void VerifyKeys(const std::unordered_map
<std::string
, std::string
>& data
,
461 const Snapshot
* snapshot
= nullptr) {
463 ReadOptions read_options
;
464 read_options
.snapshot
= snapshot
;
465 for (auto& kv
: data
) {
466 auto s
= db
->Get(read_options
, kv
.first
, &value
);
467 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
469 if (kv
.second
!= value
) {
470 printf("key = %s\n", kv
.first
.c_str());
472 ASSERT_EQ(kv
.second
, value
);
474 ASSERT_EQ(kv
.second
, "NOT_FOUND");
477 // Try with MultiGet API too
478 std::vector
<std::string
> values
;
479 auto s_vec
= db
->MultiGet(read_options
, {db
->DefaultColumnFamily()},
480 {kv
.first
}, &values
);
481 ASSERT_EQ(1, values
.size());
482 ASSERT_EQ(1, s_vec
.size());
484 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
486 ASSERT_TRUE(kv
.second
== values
[0]);
488 ASSERT_EQ(kv
.second
, "NOT_FOUND");
493 // Verify all versions of keys.
494 void VerifyInternalKeys(const std::vector
<KeyVersion
>& expected_versions
) {
495 std::vector
<KeyVersion
> versions
;
496 const size_t kMaxKeys
= 100000;
497 ASSERT_OK(GetAllKeyVersions(db
, expected_versions
.front().user_key
,
498 expected_versions
.back().user_key
, kMaxKeys
,
500 ASSERT_EQ(expected_versions
.size(), versions
.size());
501 for (size_t i
= 0; i
< versions
.size(); i
++) {
502 ASSERT_EQ(expected_versions
[i
].user_key
, versions
[i
].user_key
);
503 ASSERT_EQ(expected_versions
[i
].sequence
, versions
[i
].sequence
);
504 ASSERT_EQ(expected_versions
[i
].type
, versions
[i
].type
);
505 if (versions
[i
].type
!= kTypeDeletion
&&
506 versions
[i
].type
!= kTypeSingleDeletion
) {
507 ASSERT_EQ(expected_versions
[i
].value
, versions
[i
].value
);
509 // Range delete not supported.
510 assert(expected_versions
[i
].type
!= kTypeRangeDeletion
);
// Value-parameterized fixture over (use_stackable_db, two_write_queue,
// write_policy); forwards the tuple elements to the base-class constructor.
515 class WritePreparedTransactionTest
516 : public WritePreparedTransactionTestBase
,
517 virtual public ::testing::WithParamInterface
<
518 std::tuple
<bool, bool, TxnDBWritePolicy
>> {
520 WritePreparedTransactionTest()
521 : WritePreparedTransactionTestBase(std::get
<0>(GetParam()),
522 std::get
<1>(GetParam()),
523 std::get
<2>(GetParam())){};
526 #ifndef ROCKSDB_VALGRIND_RUN
// Fixture parameterized additionally by (split_id, split_cnt) so the long
// snapshot-concurrent-access test can be sharded across gtest instances.
527 class SnapshotConcurrentAccessTest
528 : public WritePreparedTransactionTestBase
,
529 virtual public ::testing::WithParamInterface
<
530 std::tuple
<bool, bool, TxnDBWritePolicy
, size_t, size_t>> {
532 SnapshotConcurrentAccessTest()
533 : WritePreparedTransactionTestBase(std::get
<0>(GetParam()),
534 std::get
<1>(GetParam()),
535 std::get
<2>(GetParam())),
536 split_id_(std::get
<3>(GetParam())),
537 split_cnt_(std::get
<4>(GetParam())){};
540 // A test is split into split_cnt_ tests, each identified with split_id_ where
541 // 0 <= split_id_ < split_cnt_
545 #endif // ROCKSDB_VALGRIND_RUN
// Fixture parameterized additionally by (split_id, split_cnt) so the long
// sequence-advance concurrency test can be sharded across gtest instances.
547 class SeqAdvanceConcurrentTest
548 : public WritePreparedTransactionTestBase
,
549 virtual public ::testing::WithParamInterface
<
550 std::tuple
<bool, bool, TxnDBWritePolicy
, size_t, size_t>> {
552 SeqAdvanceConcurrentTest()
553 : WritePreparedTransactionTestBase(std::get
<0>(GetParam()),
554 std::get
<1>(GetParam()),
555 std::get
<2>(GetParam())),
556 split_id_(std::get
<3>(GetParam())),
557 split_cnt_(std::get
<4>(GetParam())){};
560 // A test is split into split_cnt_ tests, each identified with split_id_ where
561 // 0 <= split_id_ < split_cnt_
// Instantiate WritePreparedTransactionTest with one and two write queues
// (non-stackable DB, WRITE_PREPARED policy in both cases).
566 INSTANTIATE_TEST_CASE_P(
567 WritePreparedTransactionTest
, WritePreparedTransactionTest
,
568 ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED
),
569 std::make_tuple(false, true, WRITE_PREPARED
)));
571 #ifndef ROCKSDB_VALGRIND_RUN
// Sharded instantiations (skipped under valgrind): SnapshotConcurrentAccessTest
// is split into 20 shards and SeqAdvanceConcurrentTest into 10 shards, each
// for both the two-write-queue and one-write-queue configurations.
572 INSTANTIATE_TEST_CASE_P(
573 TwoWriteQueues
, SnapshotConcurrentAccessTest
,
574 ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED
, 0, 20),
575 std::make_tuple(false, true, WRITE_PREPARED
, 1, 20),
576 std::make_tuple(false, true, WRITE_PREPARED
, 2, 20),
577 std::make_tuple(false, true, WRITE_PREPARED
, 3, 20),
578 std::make_tuple(false, true, WRITE_PREPARED
, 4, 20),
579 std::make_tuple(false, true, WRITE_PREPARED
, 5, 20),
580 std::make_tuple(false, true, WRITE_PREPARED
, 6, 20),
581 std::make_tuple(false, true, WRITE_PREPARED
, 7, 20),
582 std::make_tuple(false, true, WRITE_PREPARED
, 8, 20),
583 std::make_tuple(false, true, WRITE_PREPARED
, 9, 20),
584 std::make_tuple(false, true, WRITE_PREPARED
, 10, 20),
585 std::make_tuple(false, true, WRITE_PREPARED
, 11, 20),
586 std::make_tuple(false, true, WRITE_PREPARED
, 12, 20),
587 std::make_tuple(false, true, WRITE_PREPARED
, 13, 20),
588 std::make_tuple(false, true, WRITE_PREPARED
, 14, 20),
589 std::make_tuple(false, true, WRITE_PREPARED
, 15, 20),
590 std::make_tuple(false, true, WRITE_PREPARED
, 16, 20),
591 std::make_tuple(false, true, WRITE_PREPARED
, 17, 20),
592 std::make_tuple(false, true, WRITE_PREPARED
, 18, 20),
593 std::make_tuple(false, true, WRITE_PREPARED
, 19, 20)));
595 INSTANTIATE_TEST_CASE_P(
596 OneWriteQueue
, SnapshotConcurrentAccessTest
,
597 ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED
, 0, 20),
598 std::make_tuple(false, false, WRITE_PREPARED
, 1, 20),
599 std::make_tuple(false, false, WRITE_PREPARED
, 2, 20),
600 std::make_tuple(false, false, WRITE_PREPARED
, 3, 20),
601 std::make_tuple(false, false, WRITE_PREPARED
, 4, 20),
602 std::make_tuple(false, false, WRITE_PREPARED
, 5, 20),
603 std::make_tuple(false, false, WRITE_PREPARED
, 6, 20),
604 std::make_tuple(false, false, WRITE_PREPARED
, 7, 20),
605 std::make_tuple(false, false, WRITE_PREPARED
, 8, 20),
606 std::make_tuple(false, false, WRITE_PREPARED
, 9, 20),
607 std::make_tuple(false, false, WRITE_PREPARED
, 10, 20),
608 std::make_tuple(false, false, WRITE_PREPARED
, 11, 20),
609 std::make_tuple(false, false, WRITE_PREPARED
, 12, 20),
610 std::make_tuple(false, false, WRITE_PREPARED
, 13, 20),
611 std::make_tuple(false, false, WRITE_PREPARED
, 14, 20),
612 std::make_tuple(false, false, WRITE_PREPARED
, 15, 20),
613 std::make_tuple(false, false, WRITE_PREPARED
, 16, 20),
614 std::make_tuple(false, false, WRITE_PREPARED
, 17, 20),
615 std::make_tuple(false, false, WRITE_PREPARED
, 18, 20),
616 std::make_tuple(false, false, WRITE_PREPARED
, 19, 20)));
618 INSTANTIATE_TEST_CASE_P(
619 TwoWriteQueues
, SeqAdvanceConcurrentTest
,
620 ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED
, 0, 10),
621 std::make_tuple(false, true, WRITE_PREPARED
, 1, 10),
622 std::make_tuple(false, true, WRITE_PREPARED
, 2, 10),
623 std::make_tuple(false, true, WRITE_PREPARED
, 3, 10),
624 std::make_tuple(false, true, WRITE_PREPARED
, 4, 10),
625 std::make_tuple(false, true, WRITE_PREPARED
, 5, 10),
626 std::make_tuple(false, true, WRITE_PREPARED
, 6, 10),
627 std::make_tuple(false, true, WRITE_PREPARED
, 7, 10),
628 std::make_tuple(false, true, WRITE_PREPARED
, 8, 10),
629 std::make_tuple(false, true, WRITE_PREPARED
, 9, 10)));
631 INSTANTIATE_TEST_CASE_P(
632 OneWriteQueue
, SeqAdvanceConcurrentTest
,
633 ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED
, 0, 10),
634 std::make_tuple(false, false, WRITE_PREPARED
, 1, 10),
635 std::make_tuple(false, false, WRITE_PREPARED
, 2, 10),
636 std::make_tuple(false, false, WRITE_PREPARED
, 3, 10),
637 std::make_tuple(false, false, WRITE_PREPARED
, 4, 10),
638 std::make_tuple(false, false, WRITE_PREPARED
, 5, 10),
639 std::make_tuple(false, false, WRITE_PREPARED
, 6, 10),
640 std::make_tuple(false, false, WRITE_PREPARED
, 7, 10),
641 std::make_tuple(false, false, WRITE_PREPARED
, 8, 10),
642 std::make_tuple(false, false, WRITE_PREPARED
, 9, 10)));
643 #endif // ROCKSDB_VALGRIND_RUN
// Exercises the commit cache of WritePreparedTxnDB: Add/Get/ExchangeCommitEntry
// with overlapping indices (entries that map to the same slot modulo
// COMMIT_CACHE_SIZE), rejected vs. accepted exchanges, and eviction on Add.
// NOTE(review): some ASSERTs on `found` and a few statements are missing from
// this extraction (gaps in the embedded original line numbers).
645 TEST_P(WritePreparedTransactionTest
, CommitMapTest
) {
646 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
648 assert(wp_db
->db_impl_
);
649 size_t size
= wp_db
->COMMIT_CACHE_SIZE
;
650 CommitEntry c
= {5, 12}, e
;
651 bool evicted
= wp_db
->AddCommitEntry(c
.prep_seq
% size
, c
, &e
);
652 ASSERT_FALSE(evicted
);
654 // Should be able to read the same value
655 CommitEntry64b dont_care
;
656 bool found
= wp_db
->GetCommitEntry(c
.prep_seq
% size
, &dont_care
, &e
);
659 // Should be able to distinguish between overlapping entries
660 found
= wp_db
->GetCommitEntry((c
.prep_seq
+ size
) % size
, &dont_care
, &e
);
662 ASSERT_NE(c
.prep_seq
+ size
, e
.prep_seq
);
663 // Should be able to detect non-existent entry
664 found
= wp_db
->GetCommitEntry((c
.prep_seq
+ 1) % size
, &dont_care
, &e
);
667 // Reject an invalid exchange
668 CommitEntry e2
= {c
.prep_seq
+ size
, c
.commit_seq
+ size
};
669 CommitEntry64b
e2_64b(e2
, wp_db
->FORMAT
);
670 bool exchanged
= wp_db
->ExchangeCommitEntry(e2
.prep_seq
% size
, e2_64b
, e
);
671 ASSERT_FALSE(exchanged
);
672 // check whether it did actually reject that
673 found
= wp_db
->GetCommitEntry(e2
.prep_seq
% size
, &dont_care
, &e
);
677 // Accept a valid exchange
678 CommitEntry64b
c_64b(c
, wp_db
->FORMAT
);
679 CommitEntry e3
= {c
.prep_seq
+ size
, c
.commit_seq
+ size
+ 1};
680 exchanged
= wp_db
->ExchangeCommitEntry(c
.prep_seq
% size
, c_64b
, e3
);
681 ASSERT_TRUE(exchanged
);
682 // check whether it did actually accepted that
683 found
= wp_db
->GetCommitEntry(c
.prep_seq
% size
, &dont_care
, &e
);
688 CommitEntry e4
= {e3
.prep_seq
+ size
, e3
.commit_seq
+ size
+ 1};
689 evicted
= wp_db
->AddCommitEntry(e4
.prep_seq
% size
, e4
, &e
);
690 ASSERT_TRUE(evicted
);
692 found
= wp_db
->GetCommitEntry(e4
.prep_seq
% size
, &dont_care
, &e
);
// Table-driven check of MaybeUpdateOldCommitMap over combinations of
// prepare (p), commit (c), snapshot (s), and next_snapshot (ns): the entry
// must be recorded (expect_update == true) exactly when p <= s < c.
697 TEST_P(WritePreparedTransactionTest
, MaybeUpdateOldCommitMap
) {
698 // If prepare <= snapshot < commit we should keep the entry around since its
699 // nonexistence could be interpreted as committed in the snapshot while it is
700 // not true. We keep such entries around by adding them to the
702 uint64_t p
/*prepare*/, c
/*commit*/, s
/*snapshot*/, ns
/*next_snapshot*/;
703 p
= 10l, c
= 15l, s
= 20l, ns
= 21l;
704 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
705 // If we do not expect the old commit map to be updated, try also with a next
706 // snapshot that is expected to update the old commit map. This would test
707 // that MaybeUpdateOldCommitMap would not prevent us from checking the next
708 // snapshot that must be checked.
709 p
= 10l, c
= 15l, s
= 20l, ns
= 11l;
710 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
712 p
= 10l, c
= 20l, s
= 20l, ns
= 19l;
713 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
714 p
= 10l, c
= 20l, s
= 20l, ns
= 21l;
715 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
717 p
= 20l, c
= 20l, s
= 20l, ns
= 21l;
718 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
719 p
= 20l, c
= 20l, s
= 20l, ns
= 19l;
720 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
722 p
= 10l, c
= 25l, s
= 20l, ns
= 21l;
723 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, true);
725 p
= 20l, c
= 25l, s
= 20l, ns
= 21l;
726 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, true);
728 p
= 21l, c
= 25l, s
= 20l, ns
= 22l;
729 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
730 p
= 21l, c
= 25l, s
= 20l, ns
= 19l;
731 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
// Builds a WritePreparedTxnDBMock with 1-slot snapshot and commit caches,
// scripts prepares/commits around three snapshots, then verifies that
// old_commit_map_ holds the expected evicted entries and that each
// ReleaseSnapshotInternal call garbage-collects exactly that snapshot's
// entries, ending with an empty map.
// NOTE(review): several interleaving statements are missing from this
// extraction (gaps in the embedded original numbering), e.g. the seq
// advances between snapshots 2 and 3 and the scope braces around the
// ReadLock blocks (each `ReadLock rl(...)` here must be in its own scope
// since `rl` is redeclared).
734 // Test that the entries in old_commit_map_ get garbage collected properly
735 TEST_P(WritePreparedTransactionTest
, OldCommitMapGC
) {
736 const size_t snapshot_cache_bits
= 0;
737 const size_t commit_cache_bits
= 0;
738 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
739 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(new WritePreparedTxnDBMock(
740 mock_db
, txn_db_options
, snapshot_cache_bits
, commit_cache_bits
));
742 SequenceNumber seq
= 0;
743 // Take the first snapshot that overlaps with two txn
744 auto prep_seq
= ++seq
;
745 wp_db
->AddPrepared(prep_seq
);
746 auto prep_seq2
= ++seq
;
747 wp_db
->AddPrepared(prep_seq2
);
748 auto snap_seq1
= seq
;
749 wp_db
->TakeSnapshot(snap_seq1
);
750 auto commit_seq
= ++seq
;
751 wp_db
->AddCommitted(prep_seq
, commit_seq
);
752 wp_db
->RemovePrepared(prep_seq
);
753 auto commit_seq2
= ++seq
;
754 wp_db
->AddCommitted(prep_seq2
, commit_seq2
);
755 wp_db
->RemovePrepared(prep_seq2
);
756 // Take the 2nd and 3rd snapshot that overlap with the same txn
758 wp_db
->AddPrepared(prep_seq
);
759 auto snap_seq2
= seq
;
760 wp_db
->TakeSnapshot(snap_seq2
);
762 auto snap_seq3
= seq
;
763 wp_db
->TakeSnapshot(snap_seq3
);
766 wp_db
->AddCommitted(prep_seq
, commit_seq
);
767 wp_db
->RemovePrepared(prep_seq
);
768 // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
769 // only item in the commit_cache_ via another commit.
771 wp_db
->AddPrepared(prep_seq
);
773 wp_db
->AddCommitted(prep_seq
, commit_seq
);
774 wp_db
->RemovePrepared(prep_seq
);
776 // Verify that the evicted commit entries for all snapshots are in the
779 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
780 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
781 ASSERT_EQ(3, wp_db
->old_commit_map_
.size());
782 ASSERT_EQ(2, wp_db
->old_commit_map_
[snap_seq1
].size());
783 ASSERT_EQ(1, wp_db
->old_commit_map_
[snap_seq2
].size());
784 ASSERT_EQ(1, wp_db
->old_commit_map_
[snap_seq3
].size());
787 // Verify that the 2nd snapshot is cleaned up after the release
788 wp_db
->ReleaseSnapshotInternal(snap_seq2
);
790 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
791 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
792 ASSERT_EQ(2, wp_db
->old_commit_map_
.size());
793 ASSERT_EQ(2, wp_db
->old_commit_map_
[snap_seq1
].size());
794 ASSERT_EQ(1, wp_db
->old_commit_map_
[snap_seq3
].size());
797 // Verify that the 1st snapshot is cleaned up after the release
798 wp_db
->ReleaseSnapshotInternal(snap_seq1
);
800 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
801 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
802 ASSERT_EQ(1, wp_db
->old_commit_map_
.size());
803 ASSERT_EQ(1, wp_db
->old_commit_map_
[snap_seq3
].size());
806 // Verify that the 3rd snapshot is cleaned up after the release
807 wp_db
->ReleaseSnapshotInternal(snap_seq3
);
809 ASSERT_TRUE(wp_db
->old_commit_map_empty_
.load());
810 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
811 ASSERT_EQ(0, wp_db
->old_commit_map_
.size());
// Feeds commit entries that straddle the configured snapshot list into
// CheckAgainstSnapshots and verifies old_commit_map_ is updated exactly when
// a snapshot lies between the prepare and commit sequence numbers (consecutive
// seqs differ by 5 within a gap and by >5 across a snapshot boundary).
815 TEST_P(WritePreparedTransactionTest
, CheckAgainstSnapshotsTest
) {
816 std::vector
<SequenceNumber
> snapshots
= {100l, 200l, 300l, 400l, 500l,
817 600l, 700l, 800l, 900l};
818 const size_t snapshot_cache_bits
= 2;
819 // Safety check to express the intended size in the test. Can be adjusted if
820 // the snapshots lists changed.
821 assert((1ul << snapshot_cache_bits
) * 2 + 1 == snapshots
.size());
822 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
823 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
824 new WritePreparedTxnDBMock(mock_db
, txn_db_options
, snapshot_cache_bits
));
825 SequenceNumber version
= 1000l;
826 ASSERT_EQ(0, wp_db
->snapshots_total_
);
827 wp_db
->UpdateSnapshots(snapshots
, version
);
828 ASSERT_EQ(snapshots
.size(), wp_db
->snapshots_total_
);
829 // seq numbers are chosen so that we have two of them between each two
830 // snapshots. If the diff of two consecutive seq is more than 5, there is a
831 // snapshot between them.
832 std::vector
<SequenceNumber
> seqs
= {50l, 55l, 150l, 155l, 250l, 255l, 350l,
833 355l, 450l, 455l, 550l, 555l, 650l, 655l,
834 750l, 755l, 850l, 855l, 950l, 955l};
835 assert(seqs
.size() > 1);
836 for (size_t i
= 0; i
< seqs
.size() - 1; i
++) {
837 wp_db
->old_commit_map_empty_
= true; // reset
838 CommitEntry commit_entry
= {seqs
[i
], seqs
[i
+ 1]};
839 wp_db
->CheckAgainstSnapshots(commit_entry
);
840 // Expect update if there is snapshot in between the prepare and commit
841 bool expect_update
= commit_entry
.commit_seq
- commit_entry
.prep_seq
> 5 &&
842 commit_entry
.commit_seq
>= snapshots
.front() &&
843 commit_entry
.prep_seq
<= snapshots
.back();
844 ASSERT_EQ(expect_update
, !wp_db
->old_commit_map_empty_
);
848 // This test is too slow for travis
850 #ifndef ROCKSDB_VALGRIND_RUN
851 // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
852 // parallel with UpdateSnapshots.
853 TEST_P(SnapshotConcurrentAccessTest
, SnapshotConcurrentAccessTest
) {
854 // We have a sync point in the method under test after checking each snapshot.
855 // If you increase the max number of snapshots in this test, more sync points
856 // in the methods must also be added.
857 const std::vector
<SequenceNumber
> snapshots
= {10l, 20l, 30l, 40l, 50l,
858 60l, 70l, 80l, 90l, 100l};
859 const size_t snapshot_cache_bits
= 2;
860 // Safety check to express the intended size in the test. Can be adjusted if
861 // the snapshots lists changed.
862 assert((1ul << snapshot_cache_bits
) * 2 + 2 == snapshots
.size());
863 SequenceNumber version
= 1000l;
864 // Choose the cache size so that the new snapshot list could replace all the
865 // existing items in the cache and also have some overflow.
866 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
867 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
868 new WritePreparedTxnDBMock(mock_db
, txn_db_options
, snapshot_cache_bits
));
869 const size_t extra
= 2;
871 // Add up to extra items that do not fit into the cache
872 for (size_t old_size
= 1; old_size
<= wp_db
->SNAPSHOT_CACHE_SIZE
+ extra
;
874 const std::vector
<SequenceNumber
> old_snapshots(
875 snapshots
.begin(), snapshots
.begin() + old_size
);
877 // Each member of old snapshot might or might not appear in the new list. We
878 // create a common_snapshots for each combination.
879 size_t new_comb_cnt
= size_t(1) << old_size
;
880 for (size_t new_comb
= 0; new_comb
< new_comb_cnt
; new_comb
++, loop_id
++) {
881 if (loop_id
% split_cnt_
!= split_id_
) continue;
882 printf("."); // To signal progress
884 std::vector
<SequenceNumber
> common_snapshots
;
885 for (size_t i
= 0; i
< old_snapshots
.size(); i
++) {
886 if (IsInCombination(i
, new_comb
)) {
887 common_snapshots
.push_back(old_snapshots
[i
]);
890 // And add some new snapshots to the common list
891 for (size_t added_snapshots
= 0;
892 added_snapshots
<= snapshots
.size() - old_snapshots
.size();
894 std::vector
<SequenceNumber
> new_snapshots
= common_snapshots
;
895 for (size_t i
= 0; i
< added_snapshots
; i
++) {
896 new_snapshots
.push_back(snapshots
[old_snapshots
.size() + i
]);
898 for (auto it
= common_snapshots
.begin(); it
!= common_snapshots
.end();
901 // Create a commit entry that is around the snapshot and thus should
902 // be not be discarded
903 CommitEntry entry
= {static_cast<uint64_t>(snapshot
- 1),
905 // The critical part is when iterating the snapshot cache. Afterwards,
906 // we are operating under the lock
908 std::min(old_snapshots
.size(), wp_db
->SNAPSHOT_CACHE_SIZE
) + 1;
910 std::min(new_snapshots
.size(), wp_db
->SNAPSHOT_CACHE_SIZE
) + 1;
911 // Break each thread at two points
912 for (size_t a1
= 1; a1
<= a_range
; a1
++) {
913 for (size_t a2
= a1
+ 1; a2
<= a_range
; a2
++) {
914 for (size_t b1
= 1; b1
<= b_range
; b1
++) {
915 for (size_t b2
= b1
+ 1; b2
<= b_range
; b2
++) {
916 SnapshotConcurrentAccessTestInternal(
917 wp_db
.get(), old_snapshots
, new_snapshots
, entry
, version
,
#endif  // ROCKSDB_VALGRIND_RUN
932 // This test clarifies the contract of AdvanceMaxEvictedSeq method
933 TEST_P(WritePreparedTransactionTest
, AdvanceMaxEvictedSeqBasicTest
) {
934 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
935 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
936 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
938 // 1. Set the initial values for max, prepared, and snapshots
939 SequenceNumber zero_max
= 0l;
940 // Set the initial list of prepared txns
941 const std::vector
<SequenceNumber
> initial_prepared
= {10, 30, 50, 100,
943 for (auto p
: initial_prepared
) {
944 wp_db
->AddPrepared(p
);
946 // This updates the max value and also set old prepared
947 SequenceNumber init_max
= 100;
948 wp_db
->AdvanceMaxEvictedSeq(zero_max
, init_max
);
949 const std::vector
<SequenceNumber
> initial_snapshots
= {20, 40};
950 wp_db
->SetDBSnapshots(initial_snapshots
);
951 // This will update the internal cache of snapshots from the DB
952 wp_db
->UpdateSnapshots(initial_snapshots
, init_max
);
954 // 2. Invoke AdvanceMaxEvictedSeq
955 const std::vector
<SequenceNumber
> latest_snapshots
= {20, 110, 220, 300};
956 wp_db
->SetDBSnapshots(latest_snapshots
);
957 SequenceNumber new_max
= 200;
958 wp_db
->AdvanceMaxEvictedSeq(init_max
, new_max
);
960 // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
961 // a. max should be updated to new_max
962 ASSERT_EQ(wp_db
->max_evicted_seq_
, new_max
);
963 // b. delayed prepared should contain every txn <= max and prepared should
964 // only contain txns > max
965 auto it
= initial_prepared
.begin();
966 for (; it
!= initial_prepared
.end() && *it
<= new_max
; it
++) {
967 ASSERT_EQ(1, wp_db
->delayed_prepared_
.erase(*it
));
969 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
970 for (; it
!= initial_prepared
.end() && !wp_db
->prepared_txns_
.empty();
971 it
++, wp_db
->prepared_txns_
.pop()) {
972 ASSERT_EQ(*it
, wp_db
->prepared_txns_
.top());
974 ASSERT_TRUE(it
== initial_prepared
.end());
975 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
976 // c. snapshots should contain everything below new_max
977 auto sit
= latest_snapshots
.begin();
978 for (size_t i
= 0; sit
!= latest_snapshots
.end() && *sit
<= new_max
&&
979 i
< wp_db
->snapshots_total_
;
981 ASSERT_TRUE(i
< wp_db
->snapshots_total_
);
982 // This test is in small scale and the list of snapshots are assumed to be
983 // within the cache size limit. This is just a safety check to double check
985 ASSERT_TRUE(i
< wp_db
->SNAPSHOT_CACHE_SIZE
);
986 ASSERT_EQ(*sit
, wp_db
->snapshot_cache_
[i
]);
990 // This tests that transactions with duplicate keys perform correctly after max
991 // is advancing their prepared sequence numbers. This will not be the case if
992 // for example the txn does not add the prepared seq for the second sub-batch to
993 // the PrepareHeap structure.
994 TEST_P(WritePreparedTransactionTest
, AdvanceMaxEvictedSeqWithDuplicatesTest
) {
995 WriteOptions write_options
;
996 TransactionOptions txn_options
;
997 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
998 ASSERT_OK(txn0
->SetName("xid"));
999 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value1")));
1000 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value2")));
1001 ASSERT_OK(txn0
->Prepare());
1003 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1004 // Ensure that all the prepared sequence numbers will be removed from the
1006 SequenceNumber new_max
= wp_db
->COMMIT_CACHE_SIZE
;
1007 wp_db
->AdvanceMaxEvictedSeq(0, new_max
);
1010 PinnableSlice pinnable_val
;
1011 auto s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1012 ASSERT_TRUE(s
.IsNotFound());
1015 wp_db
->db_impl_
->FlushWAL(true);
1016 wp_db
->TEST_Crash();
1018 assert(db
!= nullptr);
1019 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1020 wp_db
->AdvanceMaxEvictedSeq(0, new_max
);
1021 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1022 ASSERT_TRUE(s
.IsNotFound());
1024 txn0
= db
->GetTransactionByName("xid");
1025 ASSERT_OK(txn0
->Rollback());
1029 TEST_P(SeqAdvanceConcurrentTest
, SeqAdvanceConcurrentTest
) {
1030 // Given the sequential run of txns, with this timeout we should never see a
1031 // deadlock nor a timeout unless we have a key conflict, which should be
1032 // almost infeasible.
1033 txn_db_options
.transaction_lock_timeout
= 1000;
1034 txn_db_options
.default_lock_timeout
= 1000;
1038 // Number of different txn types we use in this test
1039 const size_t type_cnt
= 5;
1040 // The size of the first write group
1041 // TODO(myabandeh): This should be increase for pre-release tests
1042 const size_t first_group_size
= 2;
1043 // Total number of txns we run in each test
1044 // TODO(myabandeh): This should be increase for pre-release tests
1045 const size_t txn_cnt
= first_group_size
+ 1;
1047 size_t base
[txn_cnt
+ 1] = {
1050 for (size_t bi
= 1; bi
<= txn_cnt
; bi
++) {
1051 base
[bi
] = base
[bi
- 1] * type_cnt
;
1053 const size_t max_n
= static_cast<size_t>(std::pow(type_cnt
, txn_cnt
));
1054 printf("Number of cases being tested is %" ROCKSDB_PRIszt
"\n", max_n
);
1055 for (size_t n
= 0; n
< max_n
; n
++, ReOpen()) {
1056 if (n
% split_cnt_
!= split_id_
) continue;
1057 if (n
% 1000 == 0) {
1058 printf("Tested %" ROCKSDB_PRIszt
" cases so far\n", n
);
1060 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1061 auto seq
= db_impl
->TEST_GetLastVisibleSequence();
1063 // This is increased before writing the batch for commit
1065 // This is increased before txn starts linking if it expects to do a commit
1067 expected_commits
= 0;
1068 std::vector
<port::Thread
> threads
;
1071 std::atomic
<bool> batch_formed(false);
1072 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1073 "WriteThread::EnterAsBatchGroupLeader:End",
1074 [&](void* /*arg*/) { batch_formed
= true; });
1075 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1076 "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
1079 // Wait until the others are linked too.
1080 while (linked
< first_group_size
) {
1082 } else if (linked
== 1 + first_group_size
) {
1083 // Make the 2nd batch of the rest of writes plus any followup
1084 // commits from the first batch
1085 while (linked
< txn_cnt
+ commit_writes
) {
1088 // Then we will have one or more batches consisting of follow-up
1089 // commits from the 2nd batch. There is a bit of non-determinism here
1090 // but it should be tolerable.
1093 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1094 for (size_t bi
= 0; bi
< txn_cnt
; bi
++) {
1095 // get the bi-th digit in number system based on type_cnt
1096 size_t d
= (n
% base
[bi
+ 1]) / base
[bi
];
1099 threads
.emplace_back(txn_t0
, bi
);
1102 threads
.emplace_back(txn_t1
, bi
);
1105 threads
.emplace_back(txn_t2
, bi
);
1108 threads
.emplace_back(txn_t3
, bi
);
1111 threads
.emplace_back(txn_t3
, bi
);
1116 // wait to be linked
1117 while (linked
.load() <= bi
) {
1119 // after a queue of size first_group_size
1120 if (bi
+ 1 == first_group_size
) {
1121 while (!batch_formed
) {
1123 // to make it more deterministic, wait until the commits are linked
1124 while (linked
.load() <= bi
+ expected_commits
) {
1128 for (auto& t
: threads
) {
1131 if (options
.two_write_queues
) {
1132 // In this case none of the above scheduling tricks to deterministically
1133 // form merged batches works because the writes go to separate queues.
1134 // This would result in different write groups in each run of the test. We
1135 // still keep the test since although non-deterministic and hard to debug,
1136 // it is still useful to have.
1137 // TODO(myabandeh): Add a deterministic unit test for two_write_queues
1140 // Check if memtable inserts advanced seq number as expected
1141 seq
= db_impl
->TEST_GetLastVisibleSequence();
1142 ASSERT_EQ(exp_seq
, seq
);
1144 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1145 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1147 // Check if recovery preserves the last sequence number
1148 db_impl
->FlushWAL(true);
1150 assert(db
!= nullptr);
1151 db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1152 seq
= db_impl
->TEST_GetLastVisibleSequence();
1153 ASSERT_EQ(exp_seq
, seq
);
1155 // Check if flush preserves the last sequence number
1156 db_impl
->Flush(fopt
);
1157 seq
= db_impl
->GetLatestSequenceNumber();
1158 ASSERT_EQ(exp_seq
, seq
);
1160 // Check if recovery after flush preserves the last sequence number
1161 db_impl
->FlushWAL(true);
1163 assert(db
!= nullptr);
1164 db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1165 seq
= db_impl
->GetLatestSequenceNumber();
1166 ASSERT_EQ(exp_seq
, seq
);
1170 // Run a couple of different txns among them some uncommitted. Restart the db at
1171 // a couple points to check whether the list of uncommitted txns are recovered
1173 TEST_P(WritePreparedTransactionTest
, BasicRecoveryTest
) {
1174 options
.disable_auto_compactions
= true;
1176 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1180 TransactionOptions txn_options
;
1181 WriteOptions write_options
;
1182 size_t index
= 1000;
1183 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1184 auto istr0
= std::to_string(index
);
1185 auto s
= txn0
->SetName("xid" + istr0
);
1187 s
= txn0
->Put(Slice("foo0" + istr0
), Slice("bar0" + istr0
));
1189 s
= txn0
->Prepare();
1190 auto prep_seq_0
= txn0
->GetId();
1195 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1196 auto istr1
= std::to_string(index
);
1197 s
= txn1
->SetName("xid" + istr1
);
1199 s
= txn1
->Put(Slice("foo1" + istr1
), Slice("bar"));
1201 s
= txn1
->Prepare();
1202 auto prep_seq_1
= txn1
->GetId();
1207 PinnableSlice pinnable_val
;
1208 // Check the value is not committed before restart
1209 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1210 ASSERT_TRUE(s
.IsNotFound());
1211 pinnable_val
.Reset();
1215 wp_db
->db_impl_
->FlushWAL(true);
1216 wp_db
->TEST_Crash();
1218 assert(db
!= nullptr);
1219 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1220 // After recovery, all the uncommitted txns (0 and 1) should be inserted into
1221 // delayed_prepared_
1222 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1223 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1224 ASSERT_LE(prep_seq_0
, wp_db
->max_evicted_seq_
);
1225 ASSERT_LE(prep_seq_1
, wp_db
->max_evicted_seq_
);
1227 ReadLock
rl(&wp_db
->prepared_mutex_
);
1228 ASSERT_EQ(2, wp_db
->delayed_prepared_
.size());
1229 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(prep_seq_0
) !=
1230 wp_db
->delayed_prepared_
.end());
1231 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(prep_seq_1
) !=
1232 wp_db
->delayed_prepared_
.end());
1235 // Check the value is still not committed after restart
1236 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1237 ASSERT_TRUE(s
.IsNotFound());
1238 pinnable_val
.Reset();
1242 // Test that a recovered txns will be properly marked committed for the next
1244 txn1
= db
->GetTransactionByName("xid" + istr1
);
1245 ASSERT_NE(txn1
, nullptr);
1250 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
1251 auto istr2
= std::to_string(index
);
1252 s
= txn2
->SetName("xid" + istr2
);
1254 s
= txn2
->Put(Slice("foo2" + istr2
), Slice("bar"));
1256 s
= txn2
->Prepare();
1257 auto prep_seq_2
= txn2
->GetId();
1260 wp_db
->db_impl_
->FlushWAL(true);
1261 wp_db
->TEST_Crash();
1263 assert(db
!= nullptr);
1264 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1265 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1266 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1268 // 0 and 2 are prepared and 1 is committed
1270 ReadLock
rl(&wp_db
->prepared_mutex_
);
1271 ASSERT_EQ(2, wp_db
->delayed_prepared_
.size());
1272 const auto& end
= wp_db
->delayed_prepared_
.end();
1273 ASSERT_NE(wp_db
->delayed_prepared_
.find(prep_seq_0
), end
);
1274 ASSERT_EQ(wp_db
->delayed_prepared_
.find(prep_seq_1
), end
);
1275 ASSERT_NE(wp_db
->delayed_prepared_
.find(prep_seq_2
), end
);
1277 ASSERT_LE(prep_seq_0
, wp_db
->max_evicted_seq_
);
1278 ASSERT_LE(prep_seq_2
, wp_db
->max_evicted_seq_
);
1280 // Commit all the remaining txns
1281 txn0
= db
->GetTransactionByName("xid" + istr0
);
1282 ASSERT_NE(txn0
, nullptr);
1284 txn2
= db
->GetTransactionByName("xid" + istr2
);
1285 ASSERT_NE(txn2
, nullptr);
1288 // Check the value is committed after commit
1289 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1290 ASSERT_TRUE(s
.ok());
1291 ASSERT_TRUE(pinnable_val
== ("bar0" + istr0
));
1292 pinnable_val
.Reset();
1296 wp_db
->db_impl_
->FlushWAL(true);
1298 assert(db
!= nullptr);
1299 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1300 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1301 ASSERT_TRUE(wp_db
->delayed_prepared_empty_
);
1303 // Check the value is still committed after recovery
1304 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1305 ASSERT_TRUE(s
.ok());
1306 ASSERT_TRUE(pinnable_val
== ("bar0" + istr0
));
1307 pinnable_val
.Reset();
1310 // After recovery the commit map is empty while the max is set. The code would
1311 // go through a different path which requires a separate test.
1312 TEST_P(WritePreparedTransactionTest
, IsInSnapshotEmptyMapTest
) {
1313 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1314 wp_db
->max_evicted_seq_
= 100;
1315 ASSERT_FALSE(wp_db
->IsInSnapshot(50, 40));
1316 ASSERT_TRUE(wp_db
->IsInSnapshot(50, 50));
1317 ASSERT_TRUE(wp_db
->IsInSnapshot(50, 100));
1318 ASSERT_TRUE(wp_db
->IsInSnapshot(50, 150));
1319 ASSERT_FALSE(wp_db
->IsInSnapshot(100, 80));
1320 ASSERT_TRUE(wp_db
->IsInSnapshot(100, 100));
1321 ASSERT_TRUE(wp_db
->IsInSnapshot(100, 150));
1324 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
1325 // snapshot, max_committed_seq_, prepared, and commit entries.
1326 TEST_P(WritePreparedTransactionTest
, IsInSnapshotTest
) {
1328 // Use small commit cache to trigger lots of eviction and fast advance of
1330 const size_t commit_cache_bits
= 3;
1331 // Same for snapshot cache size
1332 const size_t snapshot_cache_bits
= 2;
1334 // Take some preliminary snapshots first. This is to stress the data structure
1335 // that holds the old snapshots as it will be designed to be efficient when
1336 // only a few snapshots are below the max_evicted_seq_.
1337 for (int max_snapshots
= 1; max_snapshots
< 20; max_snapshots
++) {
1338 // Leave some gap between the preliminary snapshots and the final snapshot
1339 // that we check. This should test for also different overlapping scenarios
1340 // between the last snapshot and the commits.
1341 for (int max_gap
= 1; max_gap
< 10; max_gap
++) {
1342 // Since we do not actually write to db, we mock the seq as it would be
1343 // increased by the db. The only exception is that we need db seq to
1344 // advance for our snapshots. for which we apply a dummy put each time we
1345 // increase our mock of seq.
1347 // At each step we prepare a txn and then we commit it in the next txn.
1348 // This emulates the consecutive transactions that write to the same key
1349 uint64_t cur_txn
= 0;
1350 // Number of snapshots taken so far
1351 int num_snapshots
= 0;
1352 // Number of gaps applied so far
1354 // The final snapshot that we will inspect
1355 uint64_t snapshot
= 0;
1356 bool found_committed
= false;
1357 // To stress the data structure that maintain prepared txns, at each cycle
1358 // we add a new prepare txn. These do not mean to be committed for
1359 // snapshot inspection.
1360 std::set
<uint64_t> prepared
;
1361 // We keep the list of txns committed before we take the last snapshot.
1362 // These should be the only seq numbers that will be found in the snapshot
1363 std::set
<uint64_t> committed_before
;
1364 // The set of commit seq numbers to be excluded from IsInSnapshot queries
1365 std::set
<uint64_t> commit_seqs
;
1366 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1367 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(new WritePreparedTxnDBMock(
1368 mock_db
, txn_db_options
, snapshot_cache_bits
, commit_cache_bits
));
1369 // We continue until max advances a bit beyond the snapshot.
1370 while (!snapshot
|| wp_db
->max_evicted_seq_
< snapshot
+ 100) {
1371 // do prepare for a transaction
1373 wp_db
->AddPrepared(seq
);
1374 prepared
.insert(seq
);
1376 // If cur_txn is not started, do prepare for it.
1380 wp_db
->AddPrepared(cur_txn
);
1381 } else { // else commit it
1383 wp_db
->AddCommitted(cur_txn
, seq
);
1384 wp_db
->RemovePrepared(cur_txn
);
1385 commit_seqs
.insert(seq
);
1387 committed_before
.insert(cur_txn
);
1392 if (num_snapshots
< max_snapshots
- 1) {
1393 // Take preliminary snapshots
1394 wp_db
->TakeSnapshot(seq
);
1396 } else if (gap_cnt
< max_gap
) {
1397 // Wait for some gap before taking the final snapshot
1399 } else if (!snapshot
) {
1400 // Take the final snapshot if it is not already taken
1402 wp_db
->TakeSnapshot(snapshot
);
1406 // If the snapshot is taken, verify seq numbers visible to it. We redo
1407 // it at each cycle to test that the system is still sound when
1408 // max_evicted_seq_ advances.
1410 for (uint64_t s
= 1;
1411 s
<= seq
&& commit_seqs
.find(s
) == commit_seqs
.end(); s
++) {
1412 bool was_committed
=
1413 (committed_before
.find(s
) != committed_before
.end());
1414 bool is_in_snapshot
= wp_db
->IsInSnapshot(s
, snapshot
);
1415 if (was_committed
!= is_in_snapshot
) {
1416 printf("max_snapshots %d max_gap %d seq %" PRIu64
" max %" PRIu64
1417 " snapshot %" PRIu64
1418 " gap_cnt %d num_snapshots %d s %" PRIu64
"\n",
1419 max_snapshots
, max_gap
, seq
,
1420 wp_db
->max_evicted_seq_
.load(), snapshot
, gap_cnt
,
1423 ASSERT_EQ(was_committed
, is_in_snapshot
);
1424 found_committed
= found_committed
|| is_in_snapshot
;
1428 // Safety check to make sure the test actually ran
1429 ASSERT_TRUE(found_committed
);
1430 // As an extra check, check if prepared set will be properly empty after
1431 // they are committed.
1433 wp_db
->AddCommitted(cur_txn
, seq
);
1434 wp_db
->RemovePrepared(cur_txn
);
1436 for (auto p
: prepared
) {
1437 wp_db
->AddCommitted(p
, seq
);
1438 wp_db
->RemovePrepared(p
);
1440 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
1441 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1446 void ASSERT_SAME(ReadOptions roptions
, TransactionDB
* db
, Status exp_s
,
1447 PinnableSlice
& exp_v
, Slice key
) {
1450 s
= db
->Get(roptions
, db
->DefaultColumnFamily(), key
, &v
);
1451 ASSERT_TRUE(exp_s
== s
);
1452 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
1454 ASSERT_TRUE(exp_v
== v
);
1457 // Try with MultiGet API too
1458 std::vector
<std::string
> values
;
1460 db
->MultiGet(roptions
, {db
->DefaultColumnFamily()}, {key
}, &values
);
1461 ASSERT_EQ(1, values
.size());
1462 ASSERT_EQ(1, s_vec
.size());
1464 ASSERT_TRUE(exp_s
== s
);
1465 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
1467 ASSERT_TRUE(exp_v
== values
[0]);
1471 void ASSERT_SAME(TransactionDB
* db
, Status exp_s
, PinnableSlice
& exp_v
,
1473 ASSERT_SAME(ReadOptions(), db
, exp_s
, exp_v
, key
);
1476 TEST_P(WritePreparedTransactionTest
, RollbackTest
) {
1477 ReadOptions roptions
;
1478 WriteOptions woptions
;
1479 TransactionOptions txn_options
;
1480 const size_t num_keys
= 4;
1481 const size_t num_values
= 5;
1482 for (size_t ikey
= 1; ikey
<= num_keys
; ikey
++) {
1483 for (size_t ivalue
= 0; ivalue
< num_values
; ivalue
++) {
1484 for (bool crash
: {false, true}) {
1486 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1487 std::string key_str
= "key" + ToString(ikey
);
1492 ASSERT_OK(db
->Put(woptions
, key_str
, "initvalue1"));
1495 ASSERT_OK(db
->Merge(woptions
, key_str
, "initvalue2"));
1498 ASSERT_OK(db
->Delete(woptions
, key_str
));
1501 ASSERT_OK(db
->SingleDelete(woptions
, key_str
));
1509 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key1"), &v1
);
1512 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key2"), &v2
);
1515 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key3"), &v3
);
1518 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key4"), &v4
);
1519 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1520 auto s
= txn
->SetName("xid0");
1522 s
= txn
->Put(Slice("key1"), Slice("value1"));
1524 s
= txn
->Merge(Slice("key2"), Slice("value2"));
1526 s
= txn
->Delete(Slice("key3"));
1528 s
= txn
->SingleDelete(Slice("key4"));
1534 ReadLock
rl(&wp_db
->prepared_mutex_
);
1535 ASSERT_FALSE(wp_db
->prepared_txns_
.empty());
1536 ASSERT_EQ(txn
->GetId(), wp_db
->prepared_txns_
.top());
1539 ASSERT_SAME(db
, s1
, v1
, "key1");
1540 ASSERT_SAME(db
, s2
, v2
, "key2");
1541 ASSERT_SAME(db
, s3
, v3
, "key3");
1542 ASSERT_SAME(db
, s4
, v4
, "key4");
1546 auto db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1547 db_impl
->FlushWAL(true);
1548 dynamic_cast<WritePreparedTxnDB
*>(db
)->TEST_Crash();
1550 assert(db
!= nullptr);
1551 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1552 txn
= db
->GetTransactionByName("xid0");
1553 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1554 ReadLock
rl(&wp_db
->prepared_mutex_
);
1555 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1556 ASSERT_FALSE(wp_db
->delayed_prepared_
.empty());
1557 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(txn
->GetId()) !=
1558 wp_db
->delayed_prepared_
.end());
1561 ASSERT_SAME(db
, s1
, v1
, "key1");
1562 ASSERT_SAME(db
, s2
, v2
, "key2");
1563 ASSERT_SAME(db
, s3
, v3
, "key3");
1564 ASSERT_SAME(db
, s4
, v4
, "key4");
1566 s
= txn
->Rollback();
1570 ASSERT_TRUE(wp_db
->delayed_prepared_empty_
);
1571 ReadLock
rl(&wp_db
->prepared_mutex_
);
1572 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1573 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
1576 ASSERT_SAME(db
, s1
, v1
, "key1");
1577 ASSERT_SAME(db
, s2
, v2
, "key2");
1578 ASSERT_SAME(db
, s3
, v3
, "key3");
1579 ASSERT_SAME(db
, s4
, v4
, "key4");
1586 TEST_P(WritePreparedTransactionTest
, DisableGCDuringRecoveryTest
) {
1587 // Use large buffer to avoid memtable flush after 1024 insertions
1588 options
.write_buffer_size
= 1024 * 1024;
1590 std::vector
<KeyVersion
> versions
;
1592 for (uint64_t i
= 1; i
<= 1024; i
++) {
1593 std::string v
= "bar" + ToString(i
);
1594 ASSERT_OK(db
->Put(WriteOptions(), "foo", v
));
1595 VerifyKeys({{"foo", v
}});
1596 seq
++; // one for the key/value
1597 KeyVersion kv
= {"foo", v
, seq
, kTypeValue
};
1598 if (options
.two_write_queues
) {
1599 seq
++; // one for the commit
1601 versions
.emplace_back(kv
);
1603 std::reverse(std::begin(versions
), std::end(versions
));
1604 VerifyInternalKeys(versions
);
1605 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1606 db_impl
->FlushWAL(true);
1607 // Use small buffer to ensure memtable flush during recovery
1608 options
.write_buffer_size
= 1024;
1610 VerifyInternalKeys(versions
);
1613 TEST_P(WritePreparedTransactionTest
, SequenceNumberZeroTest
) {
1614 ASSERT_OK(db
->Put(WriteOptions(), "foo", "bar"));
1615 VerifyKeys({{"foo", "bar"}});
1616 const Snapshot
* snapshot
= db
->GetSnapshot();
1617 ASSERT_OK(db
->Flush(FlushOptions()));
1618 // Dummy keys to avoid compaction trivially move files and get around actual
1619 // compaction logic.
1620 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
1621 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
1622 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1623 // Compaction will output keys with sequence number 0, if it is visible to
1624 // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
1625 // visible to any snapshot.
1626 VerifyKeys({{"foo", "bar"}});
1627 VerifyKeys({{"foo", "bar"}}, snapshot
);
1628 VerifyInternalKeys({{"foo", "bar", 0, kTypeValue
}});
1629 db
->ReleaseSnapshot(snapshot
);
1632 // Compaction should not remove a key if it is not committed, and should
1633 // proceed with older versions of the key as-if the new version doesn't exist.
1634 TEST_P(WritePreparedTransactionTest
, CompactionShouldKeepUncommittedKeys
) {
1635 options
.disable_auto_compactions
= true;
1637 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1638 // Snapshots to avoid keys get evicted.
1639 std::vector
<const Snapshot
*> snapshots
;
1640 // Keep track of expected sequence number.
1641 SequenceNumber expected_seq
= 0;
1643 auto add_key
= [&](std::function
<Status()> func
) {
1646 if (options
.two_write_queues
) {
1647 expected_seq
++; // 1 for commit
1649 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
1650 snapshots
.push_back(db
->GetSnapshot());
1653 // Each key here represent a standalone test case.
1654 add_key([&]() { return db
->Put(WriteOptions(), "key1", "value1_1"); });
1655 add_key([&]() { return db
->Put(WriteOptions(), "key2", "value2_1"); });
1656 add_key([&]() { return db
->Put(WriteOptions(), "key3", "value3_1"); });
1657 add_key([&]() { return db
->Put(WriteOptions(), "key4", "value4_1"); });
1658 add_key([&]() { return db
->Merge(WriteOptions(), "key5", "value5_1"); });
1659 add_key([&]() { return db
->Merge(WriteOptions(), "key5", "value5_2"); });
1660 add_key([&]() { return db
->Put(WriteOptions(), "key6", "value6_1"); });
1661 add_key([&]() { return db
->Put(WriteOptions(), "key7", "value7_1"); });
1662 ASSERT_OK(db
->Flush(FlushOptions()));
1663 add_key([&]() { return db
->Delete(WriteOptions(), "key6"); });
1664 add_key([&]() { return db
->SingleDelete(WriteOptions(), "key7"); });
1666 auto* transaction
= db
->BeginTransaction(WriteOptions());
1667 ASSERT_OK(transaction
->SetName("txn"));
1668 ASSERT_OK(transaction
->Put("key1", "value1_2"));
1669 ASSERT_OK(transaction
->Delete("key2"));
1670 ASSERT_OK(transaction
->SingleDelete("key3"));
1671 ASSERT_OK(transaction
->Merge("key4", "value4_2"));
1672 ASSERT_OK(transaction
->Merge("key5", "value5_3"));
1673 ASSERT_OK(transaction
->Put("key6", "value6_2"));
1674 ASSERT_OK(transaction
->Put("key7", "value7_2"));
1675 // Prepare but not commit.
1676 ASSERT_OK(transaction
->Prepare());
1677 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
1678 ASSERT_OK(db
->Flush(FlushOptions()));
1679 for (auto* s
: snapshots
) {
1680 db
->ReleaseSnapshot(s
);
1682 // Dummy keys to avoid compaction trivially move files and get around actual
1683 // compaction logic.
1684 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
1685 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
1686 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1688 {"key1", "value1_1"},
1689 {"key2", "value2_1"},
1690 {"key3", "value3_1"},
1691 {"key4", "value4_1"},
1692 {"key5", "value5_1,value5_2"},
1693 {"key6", "NOT_FOUND"},
1694 {"key7", "NOT_FOUND"},
1696 VerifyInternalKeys({
1697 {"key1", "value1_2", expected_seq
, kTypeValue
},
1698 {"key1", "value1_1", 0, kTypeValue
},
1699 {"key2", "", expected_seq
, kTypeDeletion
},
1700 {"key2", "value2_1", 0, kTypeValue
},
1701 {"key3", "", expected_seq
, kTypeSingleDeletion
},
1702 {"key3", "value3_1", 0, kTypeValue
},
1703 {"key4", "value4_2", expected_seq
, kTypeMerge
},
1704 {"key4", "value4_1", 0, kTypeValue
},
1705 {"key5", "value5_3", expected_seq
, kTypeMerge
},
1706 {"key5", "value5_1,value5_2", 0, kTypeValue
},
1707 {"key6", "value6_2", expected_seq
, kTypeValue
},
1708 {"key7", "value7_2", expected_seq
, kTypeValue
},
1710 ASSERT_OK(transaction
->Commit());
1712 {"key1", "value1_2"},
1713 {"key2", "NOT_FOUND"},
1714 {"key3", "NOT_FOUND"},
1715 {"key4", "value4_1,value4_2"},
1716 {"key5", "value5_1,value5_2,value5_3"},
1717 {"key6", "value6_2"},
1718 {"key7", "value7_2"},
1723 // Compaction should keep keys visible to a snapshot based on commit sequence,
1724 // not just prepare sequence.
1725 TEST_P(WritePreparedTransactionTest
, CompactionShouldKeepSnapshotVisibleKeys
) {
1726 options
.disable_auto_compactions
= true;
1728 // Keep track of expected sequence number.
1729 SequenceNumber expected_seq
= 0;
1730 auto* txn1
= db
->BeginTransaction(WriteOptions());
1731 ASSERT_OK(txn1
->SetName("txn1"));
1732 ASSERT_OK(txn1
->Put("key1", "value1_1"));
1733 ASSERT_OK(txn1
->Prepare());
1734 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
1735 ASSERT_OK(txn1
->Commit());
1736 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1737 ASSERT_EQ(++expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
1739 // Take a snapshots to avoid keys get evicted before compaction.
1740 const Snapshot
* snapshot1
= db
->GetSnapshot();
1741 auto* txn2
= db
->BeginTransaction(WriteOptions());
1742 ASSERT_OK(txn2
->SetName("txn2"));
1743 ASSERT_OK(txn2
->Put("key2", "value2_1"));
1744 ASSERT_OK(txn2
->Prepare());
1745 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
1746 // txn1 commit before snapshot2 and it is visible to snapshot2.
1747 // txn2 commit after snapshot2 and it is not visible.
1748 const Snapshot
* snapshot2
= db
->GetSnapshot();
1749 ASSERT_OK(txn2
->Commit());
1750 ASSERT_EQ(++expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
1752 // Take a snapshots to avoid keys get evicted before compaction.
1753 const Snapshot
* snapshot3
= db
->GetSnapshot();
1754 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1_2"));
1755 expected_seq
++; // 1 for write
1756 SequenceNumber seq1
= expected_seq
;
1757 if (options
.two_write_queues
) {
1758 expected_seq
++; // 1 for commit
1760 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
1761 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2_2"));
1762 expected_seq
++; // 1 for write
1763 SequenceNumber seq2
= expected_seq
;
1764 if (options
.two_write_queues
) {
1765 expected_seq
++; // 1 for commit
1767 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
1768 ASSERT_OK(db
->Flush(FlushOptions()));
1769 db
->ReleaseSnapshot(snapshot1
);
1770 db
->ReleaseSnapshot(snapshot3
);
1771 // Dummy keys to avoid compaction trivially move files and get around actual
1772 // compaction logic.
1773 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
1774 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
1775 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1776 VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
1777 VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2
);
1778 VerifyInternalKeys({
1779 {"key1", "value1_2", seq1
, kTypeValue
},
1780 // "value1_1" is visible to snapshot2. Also keys at bottom level visible
1781 // to earliest snapshot will output with seq = 0.
1782 {"key1", "value1_1", 0, kTypeValue
},
1783 {"key2", "value2_2", seq2
, kTypeValue
},
1785 db
->ReleaseSnapshot(snapshot2
);
1788 // A more complex test to verify compaction/flush should keep keys visible
1790 TEST_P(WritePreparedTransactionTest
,
1791 CompactionShouldKeepSnapshotVisibleKeysRandomized
) {
1792 constexpr size_t kNumTransactions
= 10;
1793 constexpr size_t kNumIterations
= 1000;
1795 std::vector
<Transaction
*> transactions(kNumTransactions
, nullptr);
1796 std::vector
<size_t> versions(kNumTransactions
, 0);
1797 std::unordered_map
<std::string
, std::string
> current_data
;
1798 std::vector
<const Snapshot
*> snapshots
;
1799 std::vector
<std::unordered_map
<std::string
, std::string
>> snapshot_data
;
1802 options
.disable_auto_compactions
= true;
1805 for (size_t i
= 0; i
< kNumTransactions
; i
++) {
1806 std::string key
= "key" + ToString(i
);
1807 std::string value
= "value0";
1808 ASSERT_OK(db
->Put(WriteOptions(), key
, value
));
1809 current_data
[key
] = value
;
1811 VerifyKeys(current_data
);
1813 for (size_t iter
= 0; iter
< kNumIterations
; iter
++) {
1814 auto r
= rnd
.Next() % (kNumTransactions
+ 1);
1815 if (r
< kNumTransactions
) {
1816 std::string key
= "key" + ToString(r
);
1817 if (transactions
[r
] == nullptr) {
1818 std::string value
= "value" + ToString(versions
[r
] + 1);
1819 auto* txn
= db
->BeginTransaction(WriteOptions());
1820 ASSERT_OK(txn
->SetName("txn" + ToString(r
)));
1821 ASSERT_OK(txn
->Put(key
, value
));
1822 ASSERT_OK(txn
->Prepare());
1823 transactions
[r
] = txn
;
1825 std::string value
= "value" + ToString(++versions
[r
]);
1826 ASSERT_OK(transactions
[r
]->Commit());
1827 delete transactions
[r
];
1828 transactions
[r
] = nullptr;
1829 current_data
[key
] = value
;
1832 auto* snapshot
= db
->GetSnapshot();
1833 VerifyKeys(current_data
, snapshot
);
1834 snapshots
.push_back(snapshot
);
1835 snapshot_data
.push_back(current_data
);
1837 VerifyKeys(current_data
);
1839 // Take a last snapshot to test compaction with uncommitted prepared
1841 snapshots
.push_back(db
->GetSnapshot());
1842 snapshot_data
.push_back(current_data
);
1844 assert(snapshots
.size() == snapshot_data
.size());
1845 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
1846 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
1848 ASSERT_OK(db
->Flush(FlushOptions()));
1849 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
1850 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
1852 // Dummy keys to avoid compaction trivially move files and get around actual
1853 // compaction logic.
1854 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
1855 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
1856 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1857 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
1858 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
1861 for (size_t i
= 0; i
< kNumTransactions
; i
++) {
1862 if (transactions
[i
] == nullptr) {
1865 ASSERT_OK(transactions
[i
]->Commit());
1866 delete transactions
[i
];
1868 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
1869 db
->ReleaseSnapshot(snapshots
[i
]);
1873 // Compaction should not apply the optimization to output key with sequence
1874 // number equal to 0 if the key is not visible to earliest snapshot, based on
1875 // commit sequence number.
1876 TEST_P(WritePreparedTransactionTest
,
1877 CompactionShouldKeepSequenceForUncommittedKeys
) {
1878 options
.disable_auto_compactions
= true;
1880 // Keep track of expected sequence number.
1881 SequenceNumber expected_seq
= 0;
1882 auto* transaction
= db
->BeginTransaction(WriteOptions());
1883 ASSERT_OK(transaction
->SetName("txn"));
1884 ASSERT_OK(transaction
->Put("key1", "value1"));
1885 ASSERT_OK(transaction
->Prepare());
1886 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
1887 SequenceNumber seq1
= expected_seq
;
1888 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
1889 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
1890 expected_seq
++; // one for data
1891 if (options
.two_write_queues
) {
1892 expected_seq
++; // one for commit
1894 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
1895 ASSERT_OK(db
->Flush(FlushOptions()));
1896 // Dummy keys to avoid compaction trivially move files and get around actual
1897 // compaction logic.
1898 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
1899 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
1900 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1902 {"key1", "NOT_FOUND"},
1905 VerifyInternalKeys({
1906 // "key1" has not been committed. It keeps its sequence number.
1907 {"key1", "value1", seq1
, kTypeValue
},
1908 // "key2" is committed and output with seq = 0.
1909 {"key2", "value2", 0, kTypeValue
},
1911 ASSERT_OK(transaction
->Commit());
1919 TEST_P(WritePreparedTransactionTest
, Iterate
) {
1920 auto verify_state
= [](Iterator
* iter
, const std::string
& key
,
1921 const std::string
& value
) {
1922 ASSERT_TRUE(iter
->Valid());
1923 ASSERT_OK(iter
->status());
1924 ASSERT_EQ(key
, iter
->key().ToString());
1925 ASSERT_EQ(value
, iter
->value().ToString());
1928 auto verify_iter
= [&](const std::string
& expected_val
) {
1929 // Get iterator from a concurrent transaction and make sure it has the
1930 // same view as an iterator from the DB.
1931 auto* txn
= db
->BeginTransaction(WriteOptions());
1933 for (int i
= 0; i
< 2; i
++) {
1934 Iterator
* iter
= (i
== 0)
1935 ? db
->NewIterator(ReadOptions())
1936 : txn
->GetIterator(ReadOptions());
1939 verify_state(iter
, "foo", expected_val
);
1942 verify_state(iter
, "a", "va");
1944 verify_state(iter
, "foo", expected_val
);
1946 iter
->SeekForPrev("y");
1947 verify_state(iter
, "foo", expected_val
);
1949 iter
->SeekForPrev("z");
1950 verify_state(iter
, "z", "vz");
1952 verify_state(iter
, "foo", expected_val
);
1958 ASSERT_OK(db
->Put(WriteOptions(), "foo", "v1"));
1959 auto* transaction
= db
->BeginTransaction(WriteOptions());
1960 ASSERT_OK(transaction
->SetName("txn"));
1961 ASSERT_OK(transaction
->Put("foo", "v2"));
1962 ASSERT_OK(transaction
->Prepare());
1963 VerifyKeys({{"foo", "v1"}});
1965 ASSERT_OK(db
->Put(WriteOptions(), "a", "va"));
1966 ASSERT_OK(db
->Put(WriteOptions(), "z", "vz"));
1968 ASSERT_OK(transaction
->Commit());
1969 VerifyKeys({{"foo", "v2"}});
1974 TEST_P(WritePreparedTransactionTest
, IteratorRefreshNotSupported
) {
1975 Iterator
* iter
= db
->NewIterator(ReadOptions());
1976 ASSERT_TRUE(iter
->Refresh().IsNotSupported());
1980 // Test that updating the commit map will not affect the existing snapshots
1981 TEST_P(WritePreparedTransactionTest
, AtomicCommit
) {
1982 for (bool skip_prepare
: {true, false}) {
1983 rocksdb::SyncPoint::GetInstance()->LoadDependency({
1984 {"WritePreparedTxnDB::AddCommitted:start",
1985 "AtomicCommit::GetSnapshot:start"},
1986 {"AtomicCommit::Get:end",
1987 "WritePreparedTxnDB::AddCommitted:start:pause"},
1988 {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
1989 {"AtomicCommit::Get2:end",
1990 "WritePreparedTxnDB::AddCommitted:end:pause:"},
1992 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1993 rocksdb::port::Thread
write_thread([&]() {
1995 db
->Put(WriteOptions(), Slice("key"), Slice("value"));
1998 db
->BeginTransaction(WriteOptions(), TransactionOptions());
1999 ASSERT_OK(txn
->SetName("xid"));
2000 ASSERT_OK(txn
->Put(Slice("key"), Slice("value")));
2001 ASSERT_OK(txn
->Prepare());
2002 ASSERT_OK(txn
->Commit());
2006 rocksdb::port::Thread
read_thread([&]() {
2007 ReadOptions roptions
;
2008 TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
2009 roptions
.snapshot
= db
->GetSnapshot();
2011 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key", &val
);
2012 TEST_SYNC_POINT("AtomicCommit::Get:end");
2013 TEST_SYNC_POINT("AtomicCommit::Get2:start");
2014 ASSERT_SAME(roptions
, db
, s
, val
, "key");
2015 TEST_SYNC_POINT("AtomicCommit::Get2:end");
2016 db
->ReleaseSnapshot(roptions
.snapshot
);
2019 write_thread
.join();
2020 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2024 // Test that we can change write policy from WriteCommitted to WritePrepared
2025 // after a clean shutdown (which would empty the WAL)
2026 TEST_P(WritePreparedTransactionTest
, WP_WC_DBBackwardCompatibility
) {
2027 bool empty_wal
= true;
2028 CrossCompatibilityTest(WRITE_COMMITTED
, WRITE_PREPARED
, empty_wal
);
2031 // Test that we fail fast if WAL is not emptied between changing the write
2032 // policy from WriteCommitted to WritePrepared
2033 TEST_P(WritePreparedTransactionTest
, WP_WC_WALBackwardIncompatibility
) {
2034 bool empty_wal
= true;
2035 CrossCompatibilityTest(WRITE_COMMITTED
, WRITE_PREPARED
, !empty_wal
);
2038 // Test that we can change write policy from WritePrepare back to WriteCommitted
2039 // after a clean shutdown (which would empty the WAL)
2040 TEST_P(WritePreparedTransactionTest
, WC_WP_ForwardCompatibility
) {
2041 bool empty_wal
= true;
2042 CrossCompatibilityTest(WRITE_PREPARED
, WRITE_COMMITTED
, empty_wal
);
2045 // Test that we fail fast if WAL is not emptied between changing the write
2046 // policy from WriteCommitted to WritePrepared
2047 TEST_P(WritePreparedTransactionTest
, WC_WP_WALForwardIncompatibility
) {
2048 bool empty_wal
= true;
2049 CrossCompatibilityTest(WRITE_PREPARED
, WRITE_COMMITTED
, !empty_wal
);
2052 } // namespace rocksdb
2054 int main(int argc
, char** argv
) {
2055 ::testing::InitGoogleTest(&argc
, argv
);
2056 return RUN_ALL_TESTS();
// ROCKSDB_LITE builds exclude transactions entirely; report the skip and
// exit successfully so harnesses do not treat the missing tests as failures.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
  return 0;
}
2068 #endif // ROCKSDB_LITE