1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
15 #include "db/db_impl/db_impl.h"
16 #include "db/dbformat.h"
17 #include "port/port.h"
18 #include "port/stack_trace.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/types.h"
22 #include "rocksdb/utilities/debug.h"
23 #include "rocksdb/utilities/transaction.h"
24 #include "rocksdb/utilities/transaction_db.h"
25 #include "table/mock_table.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "test_util/transaction_test_util.h"
30 #include "util/mutexlock.h"
31 #include "util/random.h"
32 #include "util/string_util.h"
33 #include "utilities/fault_injection_env.h"
34 #include "utilities/merge_operators.h"
35 #include "utilities/merge_operators/string_append/stringappend.h"
36 #include "utilities/transactions/pessimistic_transaction_db.h"
37 #include "utilities/transactions/transaction_test.h"
38 #include "utilities/transactions/write_prepared_txn_db.h"
42 namespace ROCKSDB_NAMESPACE
{
// Unqualified aliases for types nested inside WritePreparedTxnDB, so the tests
// in this file can write CommitEntry (a prepare/commit sequence pair stored via
// AddCommitEntry), CommitEntry64b (a CommitEntry encoded into a 64-bit word and
// decoded back with Parse), and CommitEntry64bFormat (the format object,
// constructed from a number of index bits, that parameterizes that encoding)
// without the WritePreparedTxnDB:: prefix.
44 using CommitEntry
= WritePreparedTxnDB::CommitEntry
;
45 using CommitEntry64b
= WritePreparedTxnDB::CommitEntry64b
;
46 using CommitEntry64bFormat
= WritePreparedTxnDB::CommitEntry64bFormat
;
48 TEST(PreparedHeap
, BasicsTest
) {
49 WritePreparedTxnDB::PreparedHeap heap
;
51 MutexLock
ml(heap
.push_pop_mutex());
53 // Test with one element
54 ASSERT_EQ(14l, heap
.top());
57 // Test that old min is still on top
58 ASSERT_EQ(14l, heap
.top());
65 // Test that old min is still on top
66 ASSERT_EQ(14l, heap
.top());
68 // Test that old min is still on top
69 ASSERT_EQ(14l, heap
.top());
71 // Test that the new comes to the top after multiple erase
72 ASSERT_EQ(34l, heap
.top());
74 // Test that the new comes to the top after single erase
75 ASSERT_EQ(44l, heap
.top());
77 ASSERT_EQ(44l, heap
.top());
78 heap
.pop(); // pop 44l
79 // Test that the erased items are ignored after pop
80 ASSERT_EQ(64l, heap
.top());
82 // Test that erasing an already popped item would work
83 ASSERT_EQ(64l, heap
.top());
85 ASSERT_EQ(64l, heap
.top());
87 MutexLock
ml(heap
.push_pop_mutex());
99 // Test top remains the same after a random order of many erases
100 ASSERT_EQ(64l, heap
.top());
102 // Test that pop works with a series of random pending erases
103 ASSERT_EQ(74l, heap
.top());
104 ASSERT_FALSE(heap
.empty());
106 // Test that empty works
107 ASSERT_TRUE(heap
.empty());
110 // This is a scenario reconstructed from a buggy trace. Test that the bug does
111 // not resurface again.
112 TEST(PreparedHeap
, EmptyAtTheEnd
) {
113 WritePreparedTxnDB::PreparedHeap heap
;
115 MutexLock
ml(heap
.push_pop_mutex());
118 ASSERT_EQ(40l, heap
.top());
119 // Although not a recommended scenario, we must be resilient against erase
120 // without a prior push.
122 ASSERT_EQ(40l, heap
.top());
124 MutexLock
ml(heap
.push_pop_mutex());
127 ASSERT_EQ(40l, heap
.top());
130 ASSERT_EQ(40l, heap
.top());
132 ASSERT_TRUE(heap
.empty());
135 MutexLock
ml(heap
.push_pop_mutex());
138 ASSERT_EQ(40l, heap
.top());
140 ASSERT_EQ(40l, heap
.top());
142 MutexLock
ml(heap
.push_pop_mutex());
145 ASSERT_EQ(40l, heap
.top());
148 // Test that the erase has not emptied the heap (we had a bug doing that)
149 ASSERT_FALSE(heap
.empty());
150 ASSERT_EQ(60l, heap
.top());
152 ASSERT_TRUE(heap
.empty());
155 // Generate random order of PreparedHeap access and test that the heap will be
156 // successfully emptied at the end.
157 TEST(PreparedHeap
, Concurrent
) {
158 const size_t t_cnt
= 10;
159 ROCKSDB_NAMESPACE::port::Thread t
[t_cnt
+ 1];
160 WritePreparedTxnDB::PreparedHeap heap
;
161 port::RWMutex prepared_mutex
;
162 std::atomic
<size_t> last
;
164 for (size_t n
= 0; n
< 100; n
++) {
166 t
[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
168 for (size_t seq
= 1; seq
<= t_cnt
; seq
++) {
169 // This is not recommended usage but we should be resilient against it.
170 bool skip_push
= rnd
.OneIn(5);
172 MutexLock
ml(heap
.push_pop_mutex());
173 std::this_thread::yield();
179 for (size_t i
= 1; i
<= t_cnt
; i
++) {
181 ROCKSDB_NAMESPACE::port::Thread([&heap
, &prepared_mutex
, &last
, i
]() {
184 std::this_thread::yield();
185 } while (last
.load() < seq
);
186 WriteLock
wl(&prepared_mutex
);
190 for (size_t i
= 0; i
<= t_cnt
; i
++) {
193 ASSERT_TRUE(heap
.empty());
197 // Test that WriteBatchWithIndex correctly counts the number of sub-batches
198 TEST(WriteBatchWithIndex
, SubBatchCnt
) {
199 ColumnFamilyOptions cf_options
;
200 std::string cf_name
= "two";
203 options
.create_if_missing
= true;
204 const std::string dbname
= test::PerThreadDBPath("transaction_testdb");
205 EXPECT_OK(DestroyDB(dbname
, options
));
206 ASSERT_OK(DB::Open(options
, dbname
, &db
));
207 ColumnFamilyHandle
* cf_handle
= nullptr;
208 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
209 WriteOptions write_options
;
210 size_t batch_cnt
= 1;
211 size_t save_points
= 0;
212 std::vector
<size_t> batch_cnt_at
;
213 WriteBatchWithIndex
batch(db
->DefaultColumnFamily()->GetComparator(), 0, true,
215 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
216 batch_cnt_at
.push_back(batch_cnt
);
217 batch
.SetSavePoint();
219 ASSERT_OK(batch
.Put(Slice("key"), Slice("value")));
220 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
221 batch_cnt_at
.push_back(batch_cnt
);
222 batch
.SetSavePoint();
224 ASSERT_OK(batch
.Put(Slice("key2"), Slice("value2")));
225 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
226 // duplicate the keys
227 batch_cnt_at
.push_back(batch_cnt
);
228 batch
.SetSavePoint();
230 ASSERT_OK(batch
.Put(Slice("key"), Slice("value3")));
232 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
233 // duplicate the 2nd key. It should not be counted duplicate since a
234 // sub-patch is cut after the last duplicate.
235 batch_cnt_at
.push_back(batch_cnt
);
236 batch
.SetSavePoint();
238 ASSERT_OK(batch
.Put(Slice("key2"), Slice("value4")));
239 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
240 // duplicate the keys but in a different cf. It should not be counted as
242 batch_cnt_at
.push_back(batch_cnt
);
243 batch
.SetSavePoint();
245 ASSERT_OK(batch
.Put(cf_handle
, Slice("key"), Slice("value5")));
246 ASSERT_EQ(batch_cnt
, batch
.SubBatchCnt());
248 // Test that the number of sub-batches matches what we count with
250 std::map
<uint32_t, const Comparator
*> comparators
;
251 comparators
[0] = db
->DefaultColumnFamily()->GetComparator();
252 comparators
[cf_handle
->GetID()] = cf_handle
->GetComparator();
253 SubBatchCounter
counter(comparators
);
254 ASSERT_OK(batch
.GetWriteBatch()->Iterate(&counter
));
255 ASSERT_EQ(batch_cnt
, counter
.BatchCount());
257 // Test that RollbackToSavePoint will properly resets the number of
259 for (size_t i
= save_points
; i
> 0; i
--) {
260 ASSERT_OK(batch
.RollbackToSavePoint());
261 ASSERT_EQ(batch_cnt_at
[i
- 1], batch
.SubBatchCnt());
264 // Test the count is right with random batches
266 const size_t TOTAL_KEYS
= 20; // 20 ~= 10 to cause a few randoms
268 std::string keys
[TOTAL_KEYS
];
269 for (size_t k
= 0; k
< TOTAL_KEYS
; k
++) {
270 int len
= static_cast<int>(rnd
.Uniform(50));
271 keys
[k
] = test::RandomKey(&rnd
, len
);
273 for (size_t i
= 0; i
< 1000; i
++) { // 1000 random batches
274 WriteBatchWithIndex
rndbatch(db
->DefaultColumnFamily()->GetComparator(),
276 for (size_t k
= 0; k
< 10; k
++) { // 10 key per batch
277 size_t ki
= static_cast<size_t>(rnd
.Uniform(TOTAL_KEYS
));
278 Slice key
= Slice(keys
[ki
]);
279 std::string tmp
= rnd
.RandomString(16);
280 Slice value
= Slice(tmp
);
281 ASSERT_OK(rndbatch
.Put(key
, value
));
283 SubBatchCounter
batch_counter(comparators
);
284 ASSERT_OK(rndbatch
.GetWriteBatch()->Iterate(&batch_counter
));
285 ASSERT_EQ(rndbatch
.SubBatchCnt(), batch_counter
.BatchCount());
293 TEST(CommitEntry64b
, BasicTest
) {
294 const size_t INDEX_BITS
= static_cast<size_t>(21);
295 const size_t INDEX_SIZE
= static_cast<size_t>(1ull << INDEX_BITS
);
296 const CommitEntry64bFormat
FORMAT(static_cast<size_t>(INDEX_BITS
));
298 // zero-initialized CommitEntry64b should indicate an empty entry
299 CommitEntry64b empty_entry64b
;
300 uint64_t empty_index
= 11ul;
301 CommitEntry empty_entry
;
302 bool ok
= empty_entry64b
.Parse(empty_index
, &empty_entry
, FORMAT
);
305 // the zero entry is reserved for un-initialized entries
306 const size_t MAX_COMMIT
= (1 << FORMAT
.COMMIT_BITS
) - 1 - 1;
307 // Samples over the numbers that are covered by that many index bits
308 std::array
<uint64_t, 4> is
= {{0, 1, INDEX_SIZE
/ 2 + 1, INDEX_SIZE
- 1}};
309 // Samples over the numbers that are covered by that many commit bits
310 std::array
<uint64_t, 4> ds
= {{0, 1, MAX_COMMIT
/ 2 + 1, MAX_COMMIT
}};
311 // Iterate over prepare numbers that have i) cover all bits of a sequence
312 // number, and ii) include some bits that fall into the range of index or
314 for (uint64_t base
= 1; base
< kMaxSequenceNumber
; base
*= 2) {
315 for (uint64_t i
: is
) {
316 for (uint64_t d
: ds
) {
317 uint64_t p
= base
+ i
+ d
;
318 for (uint64_t c
: {p
, p
+ d
/ 2, p
+ d
}) {
319 uint64_t index
= p
% INDEX_SIZE
;
320 CommitEntry
before(p
, c
), after
;
321 CommitEntry64b
entry64b(before
, FORMAT
);
322 ok
= entry64b
.Parse(index
, &after
, FORMAT
);
324 if (!(before
== after
)) {
325 printf("base %" PRIu64
" i %" PRIu64
" d %" PRIu64
" p %" PRIu64
326 " c %" PRIu64
" index %" PRIu64
"\n",
327 base
, i
, d
, p
, c
, index
);
329 ASSERT_EQ(before
, after
);
336 class WritePreparedTxnDBMock
: public WritePreparedTxnDB
{
338 WritePreparedTxnDBMock(DBImpl
* db_impl
, TransactionDBOptions
& opt
)
339 : WritePreparedTxnDB(db_impl
, opt
) {}
340 void SetDBSnapshots(const std::vector
<SequenceNumber
>& snapshots
) {
341 snapshots_
= snapshots
;
343 void TakeSnapshot(SequenceNumber seq
) { snapshots_
.push_back(seq
); }
346 const std::vector
<SequenceNumber
> GetSnapshotListFromDB(
347 SequenceNumber
/* unused */) override
{
352 std::vector
<SequenceNumber
> snapshots_
;
355 class WritePreparedTransactionTestBase
: public TransactionTestBase
{
357 WritePreparedTransactionTestBase(bool use_stackable_db
, bool two_write_queue
,
358 TxnDBWritePolicy write_policy
,
359 WriteOrdering write_ordering
)
360 : TransactionTestBase(use_stackable_db
, two_write_queue
, write_policy
,
364 void UpdateTransactionDBOptions(size_t snapshot_cache_bits
,
365 size_t commit_cache_bits
) {
366 txn_db_options
.wp_snapshot_cache_bits
= snapshot_cache_bits
;
367 txn_db_options
.wp_commit_cache_bits
= commit_cache_bits
;
369 void UpdateTransactionDBOptions(size_t snapshot_cache_bits
) {
370 txn_db_options
.wp_snapshot_cache_bits
= snapshot_cache_bits
;
372 // If expect_update is set, check if it actually updated old_commit_map_. If
373 // it did not and yet suggested not to check the next snapshot, do the
374 // opposite to check if it was not a bad suggestion.
375 void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare
, uint64_t commit
,
377 uint64_t next_snapshot
,
378 bool expect_update
) {
379 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
380 // reset old_commit_map_empty_ so that its value indicate whether
381 // old_commit_map_ was updated
382 wp_db
->old_commit_map_empty_
= true;
383 bool check_next
= wp_db
->MaybeUpdateOldCommitMap(prepare
, commit
, snapshot
,
384 snapshot
< next_snapshot
);
385 if (expect_update
== wp_db
->old_commit_map_empty_
) {
386 printf("prepare: %" PRIu64
" commit: %" PRIu64
" snapshot: %" PRIu64
387 " next: %" PRIu64
"\n",
388 prepare
, commit
, snapshot
, next_snapshot
);
390 EXPECT_EQ(!expect_update
, wp_db
->old_commit_map_empty_
);
391 if (!check_next
&& wp_db
->old_commit_map_empty_
) {
392 // do the opposite to make sure it was not a bad suggestion
393 const bool dont_care_bool
= true;
394 wp_db
->MaybeUpdateOldCommitMap(prepare
, commit
, next_snapshot
,
396 if (!wp_db
->old_commit_map_empty_
) {
397 printf("prepare: %" PRIu64
" commit: %" PRIu64
" snapshot: %" PRIu64
398 " next: %" PRIu64
"\n",
399 prepare
, commit
, snapshot
, next_snapshot
);
401 EXPECT_TRUE(wp_db
->old_commit_map_empty_
);
405 // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
406 // miss a snapshot because of a concurrent update by UpdateSnapshots that is
407 // writing new_snapshots. Both threads are broken at two points. The sync
408 // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
409 // entry is expected to be vital for one of the snapshots that is common
410 // between the old and new list of snapshots.
411 void SnapshotConcurrentAccessTestInternal(
412 WritePreparedTxnDB
* wp_db
,
413 const std::vector
<SequenceNumber
>& old_snapshots
,
414 const std::vector
<SequenceNumber
>& new_snapshots
, CommitEntry
& entry
,
415 SequenceNumber
& version
, size_t a1
, size_t a2
, size_t b1
, size_t b2
) {
416 // First reset the snapshot list
417 const std::vector
<SequenceNumber
> empty_snapshots
;
418 wp_db
->old_commit_map_empty_
= true;
419 wp_db
->UpdateSnapshots(empty_snapshots
, ++version
);
420 // Then initialize it with the old_snapshots
421 wp_db
->UpdateSnapshots(old_snapshots
, ++version
);
423 // Starting from the first thread, cut each thread at two points
424 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
425 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1
),
426 "WritePreparedTxnDB::UpdateSnapshots:s:start"},
427 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1
),
428 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1
)},
429 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2
),
430 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1
)},
431 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2
),
432 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2
)},
433 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
434 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2
)},
436 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
438 ASSERT_TRUE(wp_db
->old_commit_map_empty_
);
439 ROCKSDB_NAMESPACE::port::Thread
t1(
440 [&]() { wp_db
->UpdateSnapshots(new_snapshots
, version
); });
441 wp_db
->CheckAgainstSnapshots(entry
);
443 ASSERT_FALSE(wp_db
->old_commit_map_empty_
);
445 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
447 wp_db
->old_commit_map_empty_
= true;
448 wp_db
->UpdateSnapshots(empty_snapshots
, ++version
);
449 wp_db
->UpdateSnapshots(old_snapshots
, ++version
);
450 // Starting from the second thread, cut each thread at two points
451 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
452 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1
),
453 "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
454 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1
),
455 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1
)},
456 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2
),
457 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1
)},
458 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2
),
459 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2
)},
460 {"WritePreparedTxnDB::UpdateSnapshots:p:end",
461 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2
)},
463 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
465 ASSERT_TRUE(wp_db
->old_commit_map_empty_
);
466 ROCKSDB_NAMESPACE::port::Thread
t1(
467 [&]() { wp_db
->UpdateSnapshots(new_snapshots
, version
); });
468 wp_db
->CheckAgainstSnapshots(entry
);
470 ASSERT_FALSE(wp_db
->old_commit_map_empty_
);
472 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
475 // Verify value of keys.
476 void VerifyKeys(const std::unordered_map
<std::string
, std::string
>& data
,
477 const Snapshot
* snapshot
= nullptr) {
479 ReadOptions read_options
;
480 read_options
.snapshot
= snapshot
;
481 for (auto& kv
: data
) {
482 auto s
= db
->Get(read_options
, kv
.first
, &value
);
483 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
485 if (kv
.second
!= value
) {
486 printf("key = %s\n", kv
.first
.c_str());
488 ASSERT_EQ(kv
.second
, value
);
490 ASSERT_EQ(kv
.second
, "NOT_FOUND");
493 // Try with MultiGet API too
494 std::vector
<std::string
> values
;
495 auto s_vec
= db
->MultiGet(read_options
, {db
->DefaultColumnFamily()},
496 {kv
.first
}, &values
);
497 ASSERT_EQ(1, values
.size());
498 ASSERT_EQ(1, s_vec
.size());
500 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
502 ASSERT_TRUE(kv
.second
== values
[0]);
504 ASSERT_EQ(kv
.second
, "NOT_FOUND");
509 // Verify all versions of keys.
510 void VerifyInternalKeys(const std::vector
<KeyVersion
>& expected_versions
) {
511 std::vector
<KeyVersion
> versions
;
512 const size_t kMaxKeys
= 100000;
513 ASSERT_OK(GetAllKeyVersions(db
, expected_versions
.front().user_key
,
514 expected_versions
.back().user_key
, kMaxKeys
,
516 ASSERT_EQ(expected_versions
.size(), versions
.size());
517 for (size_t i
= 0; i
< versions
.size(); i
++) {
518 ASSERT_EQ(expected_versions
[i
].user_key
, versions
[i
].user_key
);
519 ASSERT_EQ(expected_versions
[i
].sequence
, versions
[i
].sequence
);
520 ASSERT_EQ(expected_versions
[i
].type
, versions
[i
].type
);
521 if (versions
[i
].type
!= kTypeDeletion
&&
522 versions
[i
].type
!= kTypeSingleDeletion
) {
523 ASSERT_EQ(expected_versions
[i
].value
, versions
[i
].value
);
525 // Range delete not supported.
526 ASSERT_NE(expected_versions
[i
].type
, kTypeRangeDeletion
);
531 class WritePreparedTransactionTest
532 : public WritePreparedTransactionTestBase
,
533 virtual public ::testing::WithParamInterface
<
534 std::tuple
<bool, bool, TxnDBWritePolicy
, WriteOrdering
>> {
536 WritePreparedTransactionTest()
537 : WritePreparedTransactionTestBase(
538 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
539 std::get
<2>(GetParam()), std::get
<3>(GetParam())){};
542 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
543 class SnapshotConcurrentAccessTest
544 : public WritePreparedTransactionTestBase
,
545 virtual public ::testing::WithParamInterface
<std::tuple
<
546 bool, bool, TxnDBWritePolicy
, WriteOrdering
, size_t, size_t>> {
548 SnapshotConcurrentAccessTest()
549 : WritePreparedTransactionTestBase(
550 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
551 std::get
<2>(GetParam()), std::get
<3>(GetParam())),
552 split_id_(std::get
<4>(GetParam())),
553 split_cnt_(std::get
<5>(GetParam())){};
556 // A test is split into split_cnt_ tests, each identified with split_id_ where
557 // 0 <= split_id_ < split_cnt_
561 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
563 class SeqAdvanceConcurrentTest
564 : public WritePreparedTransactionTestBase
,
565 virtual public ::testing::WithParamInterface
<std::tuple
<
566 bool, bool, TxnDBWritePolicy
, WriteOrdering
, size_t, size_t>> {
568 SeqAdvanceConcurrentTest()
569 : WritePreparedTransactionTestBase(
570 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
571 std::get
<2>(GetParam()), std::get
<3>(GetParam())),
572 split_id_(std::get
<4>(GetParam())),
573 split_cnt_(std::get
<5>(GetParam())) {
574 special_env
.skip_fsync_
= true;
578 // A test is split into split_cnt_ tests, each identified with split_id_ where
579 // 0 <= split_id_ < split_cnt_
584 INSTANTIATE_TEST_CASE_P(
585 WritePreparedTransaction
, WritePreparedTransactionTest
,
587 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
),
588 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
),
589 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
)));
591 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
592 INSTANTIATE_TEST_CASE_P(
593 TwoWriteQueues
, SnapshotConcurrentAccessTest
,
595 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 0, 20),
596 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 1, 20),
597 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 2, 20),
598 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 3, 20),
599 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 4, 20),
600 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 5, 20),
601 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 6, 20),
602 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 7, 20),
603 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 8, 20),
604 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 9, 20),
605 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 10, 20),
606 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 11, 20),
607 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 12, 20),
608 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 13, 20),
609 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 14, 20),
610 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 15, 20),
611 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 16, 20),
612 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 17, 20),
613 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 18, 20),
614 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 19, 20),
616 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 0, 20),
617 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 1, 20),
618 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 2, 20),
619 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 3, 20),
620 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 4, 20),
621 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 5, 20),
622 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 6, 20),
623 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 7, 20),
624 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 8, 20),
625 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 9, 20),
626 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 10, 20),
627 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 11, 20),
628 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 12, 20),
629 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 13, 20),
630 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 14, 20),
631 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 15, 20),
632 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 16, 20),
633 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 17, 20),
634 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 18, 20),
635 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 19, 20)));
637 INSTANTIATE_TEST_CASE_P(
638 OneWriteQueue
, SnapshotConcurrentAccessTest
,
640 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 0, 20),
641 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 1, 20),
642 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 2, 20),
643 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 3, 20),
644 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 4, 20),
645 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 5, 20),
646 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 6, 20),
647 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 7, 20),
648 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 8, 20),
649 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 9, 20),
650 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 10, 20),
651 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 11, 20),
652 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 12, 20),
653 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 13, 20),
654 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 14, 20),
655 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 15, 20),
656 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 16, 20),
657 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 17, 20),
658 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 18, 20),
659 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 19, 20)));
661 INSTANTIATE_TEST_CASE_P(
662 TwoWriteQueues
, SeqAdvanceConcurrentTest
,
664 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 0, 10),
665 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 1, 10),
666 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 2, 10),
667 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 3, 10),
668 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 4, 10),
669 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 5, 10),
670 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 6, 10),
671 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 7, 10),
672 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 8, 10),
673 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, 9, 10),
674 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 0, 10),
675 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 1, 10),
676 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 2, 10),
677 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 3, 10),
678 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 4, 10),
679 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 5, 10),
680 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 6, 10),
681 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 7, 10),
682 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 8, 10),
683 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, 9, 10)));
685 INSTANTIATE_TEST_CASE_P(
686 OneWriteQueue
, SeqAdvanceConcurrentTest
,
688 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 0, 10),
689 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 1, 10),
690 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 2, 10),
691 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 3, 10),
692 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 4, 10),
693 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 5, 10),
694 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 6, 10),
695 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 7, 10),
696 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 8, 10),
697 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, 9, 10)));
698 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
700 TEST_P(WritePreparedTransactionTest
, CommitMap
) {
701 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
702 ASSERT_NE(wp_db
, nullptr);
703 ASSERT_NE(wp_db
->db_impl_
, nullptr);
704 size_t size
= wp_db
->COMMIT_CACHE_SIZE
;
705 CommitEntry c
= {5, 12}, e
;
706 bool evicted
= wp_db
->AddCommitEntry(c
.prep_seq
% size
, c
, &e
);
707 ASSERT_FALSE(evicted
);
709 // Should be able to read the same value
710 CommitEntry64b dont_care
;
711 bool found
= wp_db
->GetCommitEntry(c
.prep_seq
% size
, &dont_care
, &e
);
714 // Should be able to distinguish between overlapping entries
715 found
= wp_db
->GetCommitEntry((c
.prep_seq
+ size
) % size
, &dont_care
, &e
);
717 ASSERT_NE(c
.prep_seq
+ size
, e
.prep_seq
);
718 // Should be able to detect non-existent entry
719 found
= wp_db
->GetCommitEntry((c
.prep_seq
+ 1) % size
, &dont_care
, &e
);
722 // Reject an invalid exchange
723 CommitEntry e2
= {c
.prep_seq
+ size
, c
.commit_seq
+ size
};
724 CommitEntry64b
e2_64b(e2
, wp_db
->FORMAT
);
725 bool exchanged
= wp_db
->ExchangeCommitEntry(e2
.prep_seq
% size
, e2_64b
, e
);
726 ASSERT_FALSE(exchanged
);
727 // check whether it did actually reject that
728 found
= wp_db
->GetCommitEntry(e2
.prep_seq
% size
, &dont_care
, &e
);
732 // Accept a valid exchange
733 CommitEntry64b
c_64b(c
, wp_db
->FORMAT
);
734 CommitEntry e3
= {c
.prep_seq
+ size
, c
.commit_seq
+ size
+ 1};
735 exchanged
= wp_db
->ExchangeCommitEntry(c
.prep_seq
% size
, c_64b
, e3
);
736 ASSERT_TRUE(exchanged
);
737 // check whether it did actually accepted that
738 found
= wp_db
->GetCommitEntry(c
.prep_seq
% size
, &dont_care
, &e
);
743 CommitEntry e4
= {e3
.prep_seq
+ size
, e3
.commit_seq
+ size
+ 1};
744 evicted
= wp_db
->AddCommitEntry(e4
.prep_seq
% size
, e4
, &e
);
745 ASSERT_TRUE(evicted
);
747 found
= wp_db
->GetCommitEntry(e4
.prep_seq
% size
, &dont_care
, &e
);
752 TEST_P(WritePreparedTransactionTest
, MaybeUpdateOldCommitMap
) {
753 // If prepare <= snapshot < commit we should keep the entry around since its
754 // nonexistence could be interpreted as committed in the snapshot while it is
755 // not true. We keep such entries around by adding them to the
757 uint64_t p
/*prepare*/, c
/*commit*/, s
/*snapshot*/, ns
/*next_snapshot*/;
758 p
= 10l, c
= 15l, s
= 20l, ns
= 21l;
759 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
760 // If we do not expect the old commit map to be updated, try also with a next
761 // snapshot that is expected to update the old commit map. This would test
762 // that MaybeUpdateOldCommitMap would not prevent us from checking the next
763 // snapshot that must be checked.
764 p
= 10l, c
= 15l, s
= 20l, ns
= 11l;
765 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
767 p
= 10l, c
= 20l, s
= 20l, ns
= 19l;
768 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
769 p
= 10l, c
= 20l, s
= 20l, ns
= 21l;
770 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
772 p
= 20l, c
= 20l, s
= 20l, ns
= 21l;
773 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
774 p
= 20l, c
= 20l, s
= 20l, ns
= 19l;
775 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
777 p
= 10l, c
= 25l, s
= 20l, ns
= 21l;
778 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, true);
780 p
= 20l, c
= 25l, s
= 20l, ns
= 21l;
781 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, true);
783 p
= 21l, c
= 25l, s
= 20l, ns
= 22l;
784 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
785 p
= 21l, c
= 25l, s
= 20l, ns
= 19l;
786 MaybeUpdateOldCommitMapTestWithNext(p
, c
, s
, ns
, false);
789 // Trigger the condition where some old memtables are skipped when doing
790 // TransactionUtil::CheckKey(), and make sure the result is still correct.
791 TEST_P(WritePreparedTransactionTest
, CheckKeySkipOldMemtable
) {
792 const int kAttemptHistoryMemtable
= 0;
793 const int kAttemptImmMemTable
= 1;
794 for (int attempt
= kAttemptHistoryMemtable
; attempt
<= kAttemptImmMemTable
;
796 options
.max_write_buffer_number_to_maintain
= 3;
799 WriteOptions write_options
;
800 ReadOptions read_options
;
801 TransactionOptions txn_options
;
802 txn_options
.set_snapshot
= true;
805 ASSERT_OK(db
->Put(write_options
, Slice("foo"), Slice("bar")));
806 ASSERT_OK(db
->Put(write_options
, Slice("foo2"), Slice("bar")));
808 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
809 ASSERT_TRUE(txn
!= nullptr);
810 ASSERT_OK(txn
->SetName("txn"));
812 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
813 ASSERT_TRUE(txn2
!= nullptr);
814 ASSERT_OK(txn2
->SetName("txn2"));
816 // This transaction is created to cause potential conflict.
817 Transaction
* txn_x
= db
->BeginTransaction(write_options
);
818 ASSERT_OK(txn_x
->SetName("txn_x"));
819 ASSERT_OK(txn_x
->Put(Slice("foo"), Slice("bar3")));
820 ASSERT_OK(txn_x
->Prepare());
822 // Create snapshots after the prepare, but there should still
823 // be a conflict when trying to read "foo".
825 if (attempt
== kAttemptImmMemTable
) {
826 // For the second attempt, hold flush from beginning. The memtable
827 // will be switched to immutable after calling TEST_SwitchMemtable()
828 // while CheckKey() is called.
829 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
830 {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
831 "FlushJob::Start"}});
832 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
835 // force a memtable flush. The memtable should still be kept
836 FlushOptions flush_ops
;
837 if (attempt
== kAttemptHistoryMemtable
) {
838 ASSERT_OK(db
->Flush(flush_ops
));
840 ASSERT_EQ(attempt
, kAttemptImmMemTable
);
841 DBImpl
* db_impl
= static_cast<DBImpl
*>(db
->GetRootDB());
842 ASSERT_OK(db_impl
->TEST_SwitchMemtable());
844 uint64_t num_imm_mems
;
845 ASSERT_TRUE(db
->GetIntProperty(DB::Properties::kNumImmutableMemTable
,
847 if (attempt
== kAttemptHistoryMemtable
) {
848 ASSERT_EQ(0, num_imm_mems
);
850 ASSERT_EQ(attempt
, kAttemptImmMemTable
);
851 ASSERT_EQ(1, num_imm_mems
);
854 // Put something in active memtable
855 ASSERT_OK(db
->Put(write_options
, Slice("foo3"), Slice("bar")));
857 // Create txn3 after flushing, but this transaction also needs to
858 // check all memtables because of they contains uncommitted data.
859 Transaction
* txn3
= db
->BeginTransaction(write_options
, txn_options
);
860 ASSERT_TRUE(txn3
!= nullptr);
861 ASSERT_OK(txn3
->SetName("txn3"));
863 // Commit the pending write
864 ASSERT_OK(txn_x
->Commit());
866 // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will
867 // pass. In all cases, both memtables are queried.
868 SetPerfLevel(PerfLevel::kEnableCount
);
869 get_perf_context()->Reset();
870 ASSERT_TRUE(txn3
->GetForUpdate(read_options
, "foo", &value
).IsBusy());
871 // We should have checked two memtables, active and either immutable
872 // or history memtable, depending on the test case.
873 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
875 get_perf_context()->Reset();
876 ASSERT_TRUE(txn
->GetForUpdate(read_options
, "foo", &value
).IsBusy());
877 // We should have checked two memtables, active and either immutable
878 // or history memtable, depending on the test case.
879 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
881 get_perf_context()->Reset();
882 ASSERT_OK(txn2
->GetForUpdate(read_options
, "foo2", &value
));
883 ASSERT_EQ(value
, "bar");
884 // We should have checked two memtables, and since there is no
885 // conflict, another Get() will be made and fetch the data from
886 // DB. If it is in immutable memtable, two extra memtable reads
887 // will be issued. If it is not (in history), only one will
888 // be made, which is to the active memtable.
889 if (attempt
== kAttemptHistoryMemtable
) {
890 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count
);
892 ASSERT_EQ(attempt
, kAttemptImmMemTable
);
893 ASSERT_EQ(4, get_perf_context()->get_from_memtable_count
);
896 Transaction
* txn4
= db
->BeginTransaction(write_options
, txn_options
);
897 ASSERT_TRUE(txn4
!= nullptr);
898 ASSERT_OK(txn4
->SetName("txn4"));
899 get_perf_context()->Reset();
900 ASSERT_OK(txn4
->GetForUpdate(read_options
, "foo", &value
));
901 if (attempt
== kAttemptHistoryMemtable
) {
902 // Active memtable will be checked in snapshot validation and when
903 // getting the value.
904 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
906 // Only active memtable will be checked in snapshot validation but
907 // both of active and immutable snapshot will be queried when
908 // getting the value.
909 ASSERT_EQ(attempt
, kAttemptImmMemTable
);
910 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count
);
913 ASSERT_OK(txn2
->Commit());
914 ASSERT_OK(txn4
->Commit());
916 TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
917 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
919 SetPerfLevel(PerfLevel::kDisable
);
929 // Reproduce the bug with two snapshots with the same seuqence number and test
930 // that the release of the first snapshot will not affect the reads by the other
932 TEST_P(WritePreparedTransactionTest
, DoubleSnapshot
) {
933 TransactionOptions txn_options
;
936 // Insert initial value
937 ASSERT_OK(db
->Put(WriteOptions(), "key", "value1"));
939 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
941 wp_db
->BeginTransaction(WriteOptions(), txn_options
, nullptr);
942 ASSERT_OK(txn
->SetName("txn"));
943 ASSERT_OK(txn
->Put("key", "value2"));
944 ASSERT_OK(txn
->Prepare());
945 // Three snapshots with the same seq number
946 const Snapshot
* snapshot0
= wp_db
->GetSnapshot();
947 const Snapshot
* snapshot1
= wp_db
->GetSnapshot();
948 const Snapshot
* snapshot2
= wp_db
->GetSnapshot();
949 ASSERT_OK(txn
->Commit());
950 SequenceNumber cache_size
= wp_db
->COMMIT_CACHE_SIZE
;
951 SequenceNumber overlap_seq
= txn
->GetId() + cache_size
;
954 // 4th snapshot with a larger seq
955 const Snapshot
* snapshot3
= wp_db
->GetSnapshot();
956 // Cause an eviction to advance max evicted seq number
957 // This also fetches the 4 snapshots from db since their seq is lower than the
959 wp_db
->AddCommitted(overlap_seq
, overlap_seq
);
962 // It should see the value before commit
963 ropt
.snapshot
= snapshot2
;
964 PinnableSlice pinnable_val
;
965 s
= wp_db
->Get(ropt
, wp_db
->DefaultColumnFamily(), "key", &pinnable_val
);
967 ASSERT_TRUE(pinnable_val
== "value1");
968 pinnable_val
.Reset();
970 wp_db
->ReleaseSnapshot(snapshot1
);
972 // It should still see the value before commit
973 s
= wp_db
->Get(ropt
, wp_db
->DefaultColumnFamily(), "key", &pinnable_val
);
975 ASSERT_TRUE(pinnable_val
== "value1");
976 pinnable_val
.Reset();
978 // Cause an eviction to advance max evicted seq number and trigger updating
980 overlap_seq
+= cache_size
;
981 wp_db
->AddCommitted(overlap_seq
, overlap_seq
);
983 // It should still see the value before commit
984 s
= wp_db
->Get(ropt
, wp_db
->DefaultColumnFamily(), "key", &pinnable_val
);
986 ASSERT_TRUE(pinnable_val
== "value1");
987 pinnable_val
.Reset();
989 wp_db
->ReleaseSnapshot(snapshot0
);
990 wp_db
->ReleaseSnapshot(snapshot2
);
991 wp_db
->ReleaseSnapshot(snapshot3
);
994 size_t UniqueCnt(std::vector
<SequenceNumber
> vec
) {
995 std::set
<SequenceNumber
> aset
;
1001 // Test that the entries in old_commit_map_ get garbage collected properly
1002 TEST_P(WritePreparedTransactionTest
, OldCommitMapGC
) {
1003 const size_t snapshot_cache_bits
= 0;
1004 const size_t commit_cache_bits
= 0;
1005 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1006 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1007 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1008 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1010 SequenceNumber seq
= 0;
1011 // Take the first snapshot that overlaps with two txn
1012 auto prep_seq
= ++seq
;
1013 wp_db
->AddPrepared(prep_seq
);
1014 auto prep_seq2
= ++seq
;
1015 wp_db
->AddPrepared(prep_seq2
);
1016 auto snap_seq1
= seq
;
1017 wp_db
->TakeSnapshot(snap_seq1
);
1018 auto commit_seq
= ++seq
;
1019 wp_db
->AddCommitted(prep_seq
, commit_seq
);
1020 wp_db
->RemovePrepared(prep_seq
);
1021 auto commit_seq2
= ++seq
;
1022 wp_db
->AddCommitted(prep_seq2
, commit_seq2
);
1023 wp_db
->RemovePrepared(prep_seq2
);
1024 // Take the 2nd and 3rd snapshot that overlap with the same txn
1026 wp_db
->AddPrepared(prep_seq
);
1027 auto snap_seq2
= seq
;
1028 wp_db
->TakeSnapshot(snap_seq2
);
1030 auto snap_seq3
= seq
;
1031 wp_db
->TakeSnapshot(snap_seq3
);
1034 wp_db
->AddCommitted(prep_seq
, commit_seq
);
1035 wp_db
->RemovePrepared(prep_seq
);
1036 // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
1037 // only item in the commit_cache_ via another commit.
1039 wp_db
->AddPrepared(prep_seq
);
1041 wp_db
->AddCommitted(prep_seq
, commit_seq
);
1042 wp_db
->RemovePrepared(prep_seq
);
1044 // Verify that the evicted commit entries for all snapshots are in the
1047 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
1048 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1049 ASSERT_EQ(3, wp_db
->old_commit_map_
.size());
1050 ASSERT_EQ(2, UniqueCnt(wp_db
->old_commit_map_
[snap_seq1
]));
1051 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq2
]));
1052 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq3
]));
1055 // Verify that the 2nd snapshot is cleaned up after the release
1056 wp_db
->ReleaseSnapshotInternal(snap_seq2
);
1058 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
1059 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1060 ASSERT_EQ(2, wp_db
->old_commit_map_
.size());
1061 ASSERT_EQ(2, UniqueCnt(wp_db
->old_commit_map_
[snap_seq1
]));
1062 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq3
]));
1065 // Verify that the 1st snapshot is cleaned up after the release
1066 wp_db
->ReleaseSnapshotInternal(snap_seq1
);
1068 ASSERT_FALSE(wp_db
->old_commit_map_empty_
.load());
1069 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1070 ASSERT_EQ(1, wp_db
->old_commit_map_
.size());
1071 ASSERT_EQ(1, UniqueCnt(wp_db
->old_commit_map_
[snap_seq3
]));
1074 // Verify that the 3rd snapshot is cleaned up after the release
1075 wp_db
->ReleaseSnapshotInternal(snap_seq3
);
1077 ASSERT_TRUE(wp_db
->old_commit_map_empty_
.load());
1078 ReadLock
rl(&wp_db
->old_commit_map_mutex_
);
1079 ASSERT_EQ(0, wp_db
->old_commit_map_
.size());
1083 TEST_P(WritePreparedTransactionTest
, CheckAgainstSnapshots
) {
1084 std::vector
<SequenceNumber
> snapshots
= {100l, 200l, 300l, 400l, 500l,
1085 600l, 700l, 800l, 900l};
1086 const size_t snapshot_cache_bits
= 2;
1087 const uint64_t cache_size
= 1ul << snapshot_cache_bits
;
1088 // Safety check to express the intended size in the test. Can be adjusted if
1089 // the snapshots lists changed.
1090 ASSERT_EQ((1ul << snapshot_cache_bits
) * 2 + 1, snapshots
.size());
1091 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1092 UpdateTransactionDBOptions(snapshot_cache_bits
);
1093 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1094 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1095 SequenceNumber version
= 1000l;
1096 ASSERT_EQ(0, wp_db
->snapshots_total_
);
1097 wp_db
->UpdateSnapshots(snapshots
, version
);
1098 ASSERT_EQ(snapshots
.size(), wp_db
->snapshots_total_
);
1099 // seq numbers are chosen so that we have two of them between each two
1100 // snapshots. If the diff of two consecutive seq is more than 5, there is a
1101 // snapshot between them.
1102 std::vector
<SequenceNumber
> seqs
= {50l, 55l, 150l, 155l, 250l, 255l, 350l,
1103 355l, 450l, 455l, 550l, 555l, 650l, 655l,
1104 750l, 755l, 850l, 855l, 950l, 955l};
1105 ASSERT_GT(seqs
.size(), 1);
1106 for (size_t i
= 0; i
+ 1 < seqs
.size(); i
++) {
1107 wp_db
->old_commit_map_empty_
= true; // reset
1108 CommitEntry commit_entry
= {seqs
[i
], seqs
[i
+ 1]};
1109 wp_db
->CheckAgainstSnapshots(commit_entry
);
1110 // Expect update if there is snapshot in between the prepare and commit
1111 bool expect_update
= commit_entry
.commit_seq
- commit_entry
.prep_seq
> 5 &&
1112 commit_entry
.commit_seq
>= snapshots
.front() &&
1113 commit_entry
.prep_seq
<= snapshots
.back();
1114 ASSERT_EQ(expect_update
, !wp_db
->old_commit_map_empty_
);
1117 // Test that search will include multiple snapshot from snapshot cache
1119 // exclude first and last item in the cache
1120 CommitEntry commit_entry
= {snapshots
.front() + 1,
1121 snapshots
[cache_size
- 1] - 1};
1122 wp_db
->old_commit_map_empty_
= true; // reset
1123 wp_db
->old_commit_map_
.clear();
1124 wp_db
->CheckAgainstSnapshots(commit_entry
);
1125 ASSERT_EQ(wp_db
->old_commit_map_
.size(), cache_size
- 2);
1128 // Test that search will include multiple snapshot from old snapshots
1130 // include two in the middle
1131 CommitEntry commit_entry
= {snapshots
[cache_size
] + 1,
1132 snapshots
[cache_size
+ 2] + 1};
1133 wp_db
->old_commit_map_empty_
= true; // reset
1134 wp_db
->old_commit_map_
.clear();
1135 wp_db
->CheckAgainstSnapshots(commit_entry
);
1136 ASSERT_EQ(wp_db
->old_commit_map_
.size(), 2);
1139 // Test that search will include both snapshot cache and old snapshots
1140 // Case 1: includes all in snapshot cache
1142 CommitEntry commit_entry
= {snapshots
.front() - 1, snapshots
.back() + 1};
1143 wp_db
->old_commit_map_empty_
= true; // reset
1144 wp_db
->old_commit_map_
.clear();
1145 wp_db
->CheckAgainstSnapshots(commit_entry
);
1146 ASSERT_EQ(wp_db
->old_commit_map_
.size(), snapshots
.size());
1149 // Case 2: includes all snapshot caches except the smallest
1151 CommitEntry commit_entry
= {snapshots
.front() + 1, snapshots
.back() + 1};
1152 wp_db
->old_commit_map_empty_
= true; // reset
1153 wp_db
->old_commit_map_
.clear();
1154 wp_db
->CheckAgainstSnapshots(commit_entry
);
1155 ASSERT_EQ(wp_db
->old_commit_map_
.size(), snapshots
.size() - 1);
1158 // Case 3: includes only the largest of snapshot cache
1160 CommitEntry commit_entry
= {snapshots
[cache_size
- 1] - 1,
1161 snapshots
.back() + 1};
1162 wp_db
->old_commit_map_empty_
= true; // reset
1163 wp_db
->old_commit_map_
.clear();
1164 wp_db
->CheckAgainstSnapshots(commit_entry
);
1165 ASSERT_EQ(wp_db
->old_commit_map_
.size(), snapshots
.size() - cache_size
+ 1);
1169 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1170 // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
1171 // parallel with UpdateSnapshots.
1172 TEST_P(SnapshotConcurrentAccessTest
, SnapshotConcurrentAccess
) {
1173 // We have a sync point in the method under test after checking each snapshot.
1174 // If you increase the max number of snapshots in this test, more sync points
1175 // in the methods must also be added.
1176 const std::vector
<SequenceNumber
> snapshots
= {10l, 20l, 30l, 40l, 50l,
1177 60l, 70l, 80l, 90l, 100l};
1178 const size_t snapshot_cache_bits
= 2;
1179 // Safety check to express the intended size in the test. Can be adjusted if
1180 // the snapshots lists changed.
1181 ASSERT_EQ((1ul << snapshot_cache_bits
) * 2 + 2, snapshots
.size());
1182 SequenceNumber version
= 1000l;
1183 // Choose the cache size so that the new snapshot list could replace all the
1184 // existing items in the cache and also have some overflow.
1185 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1186 UpdateTransactionDBOptions(snapshot_cache_bits
);
1187 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1188 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1189 const size_t extra
= 2;
1191 // Add up to extra items that do not fit into the cache
1192 for (size_t old_size
= 1; old_size
<= wp_db
->SNAPSHOT_CACHE_SIZE
+ extra
;
1194 const std::vector
<SequenceNumber
> old_snapshots(
1195 snapshots
.begin(), snapshots
.begin() + old_size
);
1197 // Each member of old snapshot might or might not appear in the new list. We
1198 // create a common_snapshots for each combination.
1199 size_t new_comb_cnt
= size_t(1) << old_size
;
1200 for (size_t new_comb
= 0; new_comb
< new_comb_cnt
; new_comb
++, loop_id
++) {
1201 if (loop_id
% split_cnt_
!= split_id_
) continue;
1202 printf("."); // To signal progress
1204 std::vector
<SequenceNumber
> common_snapshots
;
1205 for (size_t i
= 0; i
< old_snapshots
.size(); i
++) {
1206 if (IsInCombination(i
, new_comb
)) {
1207 common_snapshots
.push_back(old_snapshots
[i
]);
1210 // And add some new snapshots to the common list
1211 for (size_t added_snapshots
= 0;
1212 added_snapshots
<= snapshots
.size() - old_snapshots
.size();
1213 added_snapshots
++) {
1214 std::vector
<SequenceNumber
> new_snapshots
= common_snapshots
;
1215 for (size_t i
= 0; i
< added_snapshots
; i
++) {
1216 new_snapshots
.push_back(snapshots
[old_snapshots
.size() + i
]);
1218 for (auto it
= common_snapshots
.begin(); it
!= common_snapshots
.end();
1220 auto snapshot
= *it
;
1221 // Create a commit entry that is around the snapshot and thus should
1222 // be not be discarded
1223 CommitEntry entry
= {static_cast<uint64_t>(snapshot
- 1),
1225 // The critical part is when iterating the snapshot cache. Afterwards,
1226 // we are operating under the lock
1228 std::min(old_snapshots
.size(), wp_db
->SNAPSHOT_CACHE_SIZE
) + 1;
1230 std::min(new_snapshots
.size(), wp_db
->SNAPSHOT_CACHE_SIZE
) + 1;
1231 // Break each thread at two points
1232 for (size_t a1
= 1; a1
<= a_range
; a1
++) {
1233 for (size_t a2
= a1
+ 1; a2
<= a_range
; a2
++) {
1234 for (size_t b1
= 1; b1
<= b_range
; b1
++) {
1235 for (size_t b2
= b1
+ 1; b2
<= b_range
; b2
++) {
1236 SnapshotConcurrentAccessTestInternal(
1237 wp_db
.get(), old_snapshots
, new_snapshots
, entry
, version
,
1249 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1251 // This test clarifies the contract of AdvanceMaxEvictedSeq method
1252 TEST_P(WritePreparedTransactionTest
, AdvanceMaxEvictedSeqBasic
) {
1253 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
1254 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
1255 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
1257 // 1. Set the initial values for max, prepared, and snapshots
1258 SequenceNumber zero_max
= 0l;
1259 // Set the initial list of prepared txns
1260 const std::vector
<SequenceNumber
> initial_prepared
= {10, 30, 50, 100,
1262 for (auto p
: initial_prepared
) {
1263 wp_db
->AddPrepared(p
);
1265 // This updates the max value and also set old prepared
1266 SequenceNumber init_max
= 100;
1267 wp_db
->AdvanceMaxEvictedSeq(zero_max
, init_max
);
1268 const std::vector
<SequenceNumber
> initial_snapshots
= {20, 40};
1269 wp_db
->SetDBSnapshots(initial_snapshots
);
1270 // This will update the internal cache of snapshots from the DB
1271 wp_db
->UpdateSnapshots(initial_snapshots
, init_max
);
1273 // 2. Invoke AdvanceMaxEvictedSeq
1274 const std::vector
<SequenceNumber
> latest_snapshots
= {20, 110, 220, 300};
1275 wp_db
->SetDBSnapshots(latest_snapshots
);
1276 SequenceNumber new_max
= 200;
1277 wp_db
->AdvanceMaxEvictedSeq(init_max
, new_max
);
1279 // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
1280 // a. max should be updated to new_max
1281 ASSERT_EQ(wp_db
->max_evicted_seq_
, new_max
);
1282 // b. delayed prepared should contain every txn <= max and prepared should
1283 // only contain txns > max
1284 auto it
= initial_prepared
.begin();
1285 for (; it
!= initial_prepared
.end() && *it
<= new_max
; ++it
) {
1286 ASSERT_EQ(1, wp_db
->delayed_prepared_
.erase(*it
));
1288 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
1289 for (; it
!= initial_prepared
.end() && !wp_db
->prepared_txns_
.empty();
1290 ++it
, wp_db
->prepared_txns_
.pop()) {
1291 ASSERT_EQ(*it
, wp_db
->prepared_txns_
.top());
1293 ASSERT_TRUE(it
== initial_prepared
.end());
1294 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1295 // c. snapshots should contain everything below new_max
1296 auto sit
= latest_snapshots
.begin();
1297 for (size_t i
= 0; sit
!= latest_snapshots
.end() && *sit
<= new_max
&&
1298 i
< wp_db
->snapshots_total_
;
1300 ASSERT_TRUE(i
< wp_db
->snapshots_total_
);
1301 // This test is in small scale and the list of snapshots are assumed to be
1302 // within the cache size limit. This is just a safety check to double check
1304 ASSERT_TRUE(i
< wp_db
->SNAPSHOT_CACHE_SIZE
);
1305 ASSERT_EQ(*sit
, wp_db
->snapshot_cache_
[i
]);
1309 // A new snapshot should always be always larger than max_evicted_seq_
1310 // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
1311 TEST_P(WritePreparedTransactionTest
, NewSnapshotLargerThanMax
) {
1312 WriteOptions woptions
;
1313 TransactionOptions txn_options
;
1314 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1315 Transaction
* txn0
= db
->BeginTransaction(woptions
, txn_options
);
1316 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value")));
1317 ASSERT_OK(txn0
->Commit());
1318 const SequenceNumber seq
= txn0
->GetId(); // is also prepare seq
1320 std::vector
<Transaction
*> txns
;
1321 // Inc seq without committing anything
1322 for (int i
= 0; i
< 10; i
++) {
1323 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1324 ASSERT_OK(txn
->SetName("xid" + std::to_string(i
)));
1325 ASSERT_OK(txn
->Put(Slice("key" + std::to_string(i
)), Slice("value")));
1326 ASSERT_OK(txn
->Prepare());
1327 txns
.push_back(txn
);
1330 // The new commit is seq + 10
1331 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1332 auto snap
= wp_db
->GetSnapshot();
1333 const SequenceNumber last_seq
= snap
->GetSequenceNumber();
1334 wp_db
->ReleaseSnapshot(snap
);
1335 ASSERT_LT(seq
, last_seq
);
1336 // Otherwise our test is not effective
1337 ASSERT_LT(last_seq
- seq
, wp_db
->INC_STEP_FOR_MAX_EVICTED
);
1339 // Evict seq out of commit cache
1340 const SequenceNumber overwrite_seq
= seq
+ wp_db
->COMMIT_CACHE_SIZE
;
1341 // Check that the next write could make max go beyond last
1342 auto last_max
= wp_db
->max_evicted_seq_
.load();
1343 wp_db
->AddCommitted(overwrite_seq
, overwrite_seq
);
1344 // Check that eviction has advanced the max
1345 ASSERT_LT(last_max
, wp_db
->max_evicted_seq_
.load());
1346 // Check that the new max has not advanced the last seq
1347 ASSERT_LT(wp_db
->max_evicted_seq_
.load(), last_seq
);
1348 for (auto txn
: txns
) {
1354 // A new snapshot should always be always larger than max_evicted_seq_
1355 // In very rare cases max could be below last published seq. Test that
1356 // taking snapshot will wait for max to catch up.
1357 TEST_P(WritePreparedTransactionTest
, MaxCatchupWithNewSnapshot
) {
1358 const size_t snapshot_cache_bits
= 7; // same as default
1359 const size_t commit_cache_bits
= 0; // only 1 entry => frequent eviction
1360 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1361 ASSERT_OK(ReOpen());
1362 WriteOptions woptions
;
1363 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1365 const int writes
= 50;
1366 const int batch_cnt
= 4;
1367 ROCKSDB_NAMESPACE::port::Thread
t1([&]() {
1368 for (int i
= 0; i
< writes
; i
++) {
1370 // For duplicate keys cause 4 commit entries, each evicting an entry that
1371 // is not published yet, thus causing max evicted seq go higher than last
1373 for (int b
= 0; b
< batch_cnt
; b
++) {
1374 ASSERT_OK(batch
.Put("foo", "foo"));
1376 ASSERT_OK(db
->Write(woptions
, &batch
));
1380 ROCKSDB_NAMESPACE::port::Thread
t2([&]() {
1381 while (wp_db
->max_evicted_seq_
== 0) { // wait for insert thread
1382 std::this_thread::yield();
1384 for (int i
= 0; i
< 10; i
++) {
1385 SequenceNumber max_lower_bound
= wp_db
->max_evicted_seq_
;
1386 auto snap
= db
->GetSnapshot();
1387 if (snap
->GetSequenceNumber() != 0) {
1388 // Value of max_evicted_seq_ when snapshot was taken in unknown. We thus
1389 // compare with the lower bound instead as an approximation.
1390 ASSERT_LT(max_lower_bound
, snap
->GetSequenceNumber());
1391 } // seq 0 is ok to be less than max since nothing is visible to it
1392 db
->ReleaseSnapshot(snap
);
1399 // Make sure that the test has worked and seq number has advanced as we
1401 auto snap
= db
->GetSnapshot();
1402 ASSERT_GT(snap
->GetSequenceNumber(), batch_cnt
* writes
- 1);
1403 db
->ReleaseSnapshot(snap
);
1406 // Test that reads without snapshots would not hit an undefined state
1407 TEST_P(WritePreparedTransactionTest
, MaxCatchupWithUnbackedSnapshot
) {
1408 const size_t snapshot_cache_bits
= 7; // same as default
1409 const size_t commit_cache_bits
= 0; // only 1 entry => frequent eviction
1410 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1411 ASSERT_OK(ReOpen());
1412 WriteOptions woptions
;
1413 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1415 const int writes
= 50;
1416 ROCKSDB_NAMESPACE::port::Thread
t1([&]() {
1417 for (int i
= 0; i
< writes
; i
++) {
1419 ASSERT_OK(batch
.Put("key", "foo"));
1420 ASSERT_OK(db
->Write(woptions
, &batch
));
1424 ROCKSDB_NAMESPACE::port::Thread
t2([&]() {
1425 while (wp_db
->max_evicted_seq_
== 0) { // wait for insert thread
1426 std::this_thread::yield();
1429 PinnableSlice pinnable_val
;
1430 TransactionOptions txn_options
;
1431 for (int i
= 0; i
< 10; i
++) {
1432 auto s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1433 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1434 pinnable_val
.Reset();
1435 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1436 s
= txn
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1437 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1438 pinnable_val
.Reset();
1439 std::vector
<std::string
> values
;
1441 txn
->MultiGet(ropt
, {db
->DefaultColumnFamily()}, {"key"}, &values
);
1442 ASSERT_EQ(1, values
.size());
1443 ASSERT_EQ(1, s_vec
.size());
1445 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1447 txn
->MultiGet(ropt
, db
->DefaultColumnFamily(), 1, &key
, &pinnable_val
, &s
,
1449 ASSERT_TRUE(s
.ok() || s
.IsTryAgain());
1457 // Make sure that the test has worked and seq number has advanced as we
1459 auto snap
= db
->GetSnapshot();
1460 ASSERT_GT(snap
->GetSequenceNumber(), writes
- 1);
1461 db
->ReleaseSnapshot(snap
);
1464 // Check that old_commit_map_ cleanup works correctly if the snapshot equals
1465 // max_evicted_seq_.
1466 TEST_P(WritePreparedTransactionTest
, CleanupSnapshotEqualToMax
) {
1467 const size_t snapshot_cache_bits
= 7; // same as default
1468 const size_t commit_cache_bits
= 0; // only 1 entry => frequent eviction
1469 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1470 ASSERT_OK(ReOpen());
1471 WriteOptions woptions
;
1472 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1473 // Insert something to increase seq
1474 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1475 auto snap
= db
->GetSnapshot();
1476 auto snap_seq
= snap
->GetSequenceNumber();
1477 // Another insert should trigger eviction + load snapshot from db
1478 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1479 // This is the scenario that we check agaisnt
1480 ASSERT_EQ(snap_seq
, wp_db
->max_evicted_seq_
);
1481 // old_commit_map_ now has some data that needs gc
1482 ASSERT_EQ(1, wp_db
->snapshots_total_
);
1483 ASSERT_EQ(1, wp_db
->old_commit_map_
.size());
1485 db
->ReleaseSnapshot(snap
);
1487 // Another insert should trigger eviction + load snapshot from db
1488 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1490 // the snapshot and related metadata must be properly garbage collected
1491 ASSERT_EQ(0, wp_db
->snapshots_total_
);
1492 ASSERT_TRUE(wp_db
->snapshots_all_
.empty());
1493 ASSERT_EQ(0, wp_db
->old_commit_map_
.size());
1496 TEST_P(WritePreparedTransactionTest
, AdvanceSeqByOne
) {
1497 auto snap
= db
->GetSnapshot();
1498 auto seq1
= snap
->GetSequenceNumber();
1499 db
->ReleaseSnapshot(snap
);
1501 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1502 wp_db
->AdvanceSeqByOne();
1504 snap
= db
->GetSnapshot();
1505 auto seq2
= snap
->GetSequenceNumber();
1506 db
->ReleaseSnapshot(snap
);
1508 ASSERT_LT(seq1
, seq2
);
1511 // Test that the txn Initilize calls the overridden functions
1512 TEST_P(WritePreparedTransactionTest
, TxnInitialize
) {
1513 TransactionOptions txn_options
;
1514 WriteOptions write_options
;
1515 ASSERT_OK(db
->Put(write_options
, "key", "value"));
1516 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1517 ASSERT_OK(txn0
->SetName("xid"));
1518 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value1")));
1519 ASSERT_OK(txn0
->Prepare());
1521 // SetSnapshot is overridden to update min_uncommitted_
1522 txn_options
.set_snapshot
= true;
1523 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1524 auto snap
= txn1
->GetSnapshot();
1525 auto snap_impl
= reinterpret_cast<const SnapshotImpl
*>(snap
);
1526 // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
1528 ASSERT_GT(snap_impl
->min_uncommitted_
, kMinUnCommittedSeq
);
1530 ASSERT_OK(txn0
->Rollback());
1531 ASSERT_OK(txn1
->Rollback());
1536 // This tests that transactions with duplicate keys perform correctly after max
1537 // is advancing their prepared sequence numbers. This will not be the case if
1538 // for example the txn does not add the prepared seq for the second sub-batch to
1539 // the PreparedHeap structure.
1540 TEST_P(WritePreparedTransactionTest
, AdvanceMaxEvictedSeqWithDuplicates
) {
1541 const size_t snapshot_cache_bits
= 7; // same as default
1542 const size_t commit_cache_bits
= 1; // disable commit cache
1543 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1544 ASSERT_OK(ReOpen());
1547 PinnableSlice pinnable_val
;
1548 WriteOptions write_options
;
1549 TransactionOptions txn_options
;
1550 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1551 ASSERT_OK(txn0
->SetName("xid"));
1552 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value1")));
1553 ASSERT_OK(txn0
->Put(Slice("key"), Slice("value2")));
1554 ASSERT_OK(txn0
->Prepare());
1556 ASSERT_OK(db
->Put(write_options
, "key2", "value"));
1557 // Will cause max advance due to disabled commit cache
1558 ASSERT_OK(db
->Put(write_options
, "key3", "value"));
1560 auto s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1561 ASSERT_TRUE(s
.IsNotFound());
1564 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1565 ASSERT_OK(wp_db
->db_impl_
->FlushWAL(true));
1566 wp_db
->TEST_Crash();
1567 ASSERT_OK(ReOpenNoDelete());
1568 ASSERT_NE(db
, nullptr);
1569 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
1570 ASSERT_TRUE(s
.IsNotFound());
1572 txn0
= db
->GetTransactionByName("xid");
1573 ASSERT_OK(txn0
->Rollback());
1577 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1578 // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
1579 // delayed_prepared_, when is run concurrently with advancing max_evicted_seq,
1580 // which moves prepared txns from prepared_txns_ to delayed_prepared_.
1581 TEST_P(WritePreparedTransactionTest
, SmallestUnCommittedSeq
) {
1582 const size_t snapshot_cache_bits
= 7; // same as default
1583 const size_t commit_cache_bits
= 1; // disable commit cache
1584 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
1585 ASSERT_OK(ReOpen());
1586 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1588 PinnableSlice pinnable_val
;
1589 WriteOptions write_options
;
1590 TransactionOptions txn_options
;
1591 std::vector
<Transaction
*> txns
, committed_txns
;
1593 const int cnt
= 100;
1594 for (int i
= 0; i
< cnt
; i
++) {
1595 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1596 ASSERT_OK(txn
->SetName("xid" + std::to_string(i
)));
1597 auto key
= "key1" + std::to_string(i
);
1598 auto value
= "value1" + std::to_string(i
);
1599 ASSERT_OK(txn
->Put(Slice(key
), Slice(value
)));
1600 ASSERT_OK(txn
->Prepare());
1601 txns
.push_back(txn
);
1606 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
1607 for (int i
= 0; i
< cnt
; i
++) {
1608 uint32_t index
= rnd
.Uniform(cnt
- i
);
1611 MutexLock
l(&mutex
);
1613 txns
.erase(txns
.begin() + index
);
1615 // Since commit cache is practically disabled, commit results in immediate
1616 // advance in max_evicted_seq_ and subsequently moving some prepared txns
1617 // to delayed_prepared_.
1618 ASSERT_OK(txn
->Commit());
1619 committed_txns
.push_back(txn
);
1622 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
1624 MutexLock
l(&mutex
);
1628 auto min_uncommitted
= wp_db
->SmallestUnCommittedSeq();
1629 ASSERT_LE(min_uncommitted
, (*txns
.begin())->GetId());
1633 commit_thread
.join();
1635 for (auto txn
: committed_txns
) {
1639 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1641 TEST_P(SeqAdvanceConcurrentTest
, SeqAdvanceConcurrent
) {
1642 // Given the sequential run of txns, with this timeout we should never see a
1643 // deadlock nor a timeout unless we have a key conflict, which should be
1644 // almost infeasible.
1645 txn_db_options
.transaction_lock_timeout
= 1000;
1646 txn_db_options
.default_lock_timeout
= 1000;
1647 ASSERT_OK(ReOpen());
1650 // Number of different txn types we use in this test
1651 const size_t type_cnt
= 5;
1652 // The size of the first write group
1653 // TODO(myabandeh): This should be increase for pre-release tests
1654 const size_t first_group_size
= 2;
1655 // Total number of txns we run in each test
1656 // TODO(myabandeh): This should be increase for pre-release tests
1657 const size_t txn_cnt
= first_group_size
+ 1;
1659 size_t base
[txn_cnt
+ 1] = {
1662 for (size_t bi
= 1; bi
<= txn_cnt
; bi
++) {
1663 base
[bi
] = base
[bi
- 1] * type_cnt
;
1665 const size_t max_n
= static_cast<size_t>(std::pow(type_cnt
, txn_cnt
));
1666 printf("Number of cases being tested is %" ROCKSDB_PRIszt
"\n", max_n
);
1667 for (size_t n
= 0; n
< max_n
; n
++) {
1669 ASSERT_OK(ReOpen());
1672 if (n
% split_cnt_
!= split_id_
) continue;
1673 if (n
% 1000 == 0) {
1674 printf("Tested %" ROCKSDB_PRIszt
" cases so far\n", n
);
1676 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1677 auto seq
= db_impl
->TEST_GetLastVisibleSequence();
1678 with_empty_commits
= 0;
1680 // This is increased before writing the batch for commit
1682 // This is increased before txn starts linking if it expects to do a commit
1684 expected_commits
= 0;
1685 std::vector
<port::Thread
> threads
;
1687 linked
.store(0, std::memory_order_release
);
1688 std::atomic
<bool> batch_formed(false);
1689 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1690 "WriteThread::EnterAsBatchGroupLeader:End",
1691 [&](void* /*arg*/) { batch_formed
= true; });
1692 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1693 "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
1694 size_t orig_linked
= linked
.fetch_add(1, std::memory_order_acq_rel
);
1695 if (orig_linked
== 0) {
1696 // Wait until the others are linked too.
1697 while (linked
.load(std::memory_order_acquire
) < first_group_size
) {
1699 } else if (orig_linked
== first_group_size
) {
1700 // Make the 2nd batch of the rest of writes plus any followup
1701 // commits from the first batch
1702 while (linked
.load(std::memory_order_acquire
) <
1703 txn_cnt
+ commit_writes
) {
1706 // Then we will have one or more batches consisting of follow-up
1707 // commits from the 2nd batch. There is a bit of non-determinism here
1708 // but it should be tolerable.
1711 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1712 for (size_t bi
= 0; bi
< txn_cnt
; bi
++) {
1713 // get the bi-th digit in number system based on type_cnt
1714 size_t d
= (n
% base
[bi
+ 1]) / base
[bi
];
1717 threads
.emplace_back(&TransactionTestBase::TestTxn0
, this, bi
);
1720 threads
.emplace_back(&TransactionTestBase::TestTxn1
, this, bi
);
1723 threads
.emplace_back(&TransactionTestBase::TestTxn2
, this, bi
);
1726 threads
.emplace_back(&TransactionTestBase::TestTxn3
, this, bi
);
1729 threads
.emplace_back(&TransactionTestBase::TestTxn3
, this, bi
);
1734 // wait to be linked
1735 while (linked
.load(std::memory_order_acquire
) <= bi
) {
1737 // after a queue of size first_group_size
1738 if (bi
+ 1 == first_group_size
) {
1739 while (!batch_formed
) {
1741 // to make it more deterministic, wait until the commits are linked
1742 while (linked
.load(std::memory_order_acquire
) <=
1743 bi
+ expected_commits
) {
1747 for (auto& t
: threads
) {
1750 if (options
.two_write_queues
) {
1751 // In this case none of the above scheduling tricks to deterministically
1752 // form merged batches works because the writes go to separate queues.
1753 // This would result in different write groups in each run of the test. We
1754 // still keep the test since although non-deterministic and hard to debug,
1755 // it is still useful to have.
1756 // TODO(myabandeh): Add a deterministic unit test for two_write_queues
1759 // Check if memtable inserts advanced seq number as expected
1760 seq
= db_impl
->TEST_GetLastVisibleSequence();
1761 ASSERT_EQ(exp_seq
, seq
);
1763 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1764 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1766 // Check if recovery preserves the last sequence number
1767 ASSERT_OK(db_impl
->FlushWAL(true));
1768 ASSERT_OK(ReOpenNoDelete());
1769 ASSERT_NE(db
, nullptr);
1770 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1771 seq
= db_impl
->TEST_GetLastVisibleSequence();
1772 ASSERT_LE(exp_seq
, seq
+ with_empty_commits
);
1774 // Check if flush preserves the last sequence number
1775 ASSERT_OK(db_impl
->Flush(fopt
));
1776 seq
= db_impl
->GetLatestSequenceNumber();
1777 ASSERT_LE(exp_seq
, seq
+ with_empty_commits
);
1779 // Check if recovery after flush preserves the last sequence number
1780 ASSERT_OK(db_impl
->FlushWAL(true));
1781 ASSERT_OK(ReOpenNoDelete());
1782 ASSERT_NE(db
, nullptr);
1783 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1784 seq
= db_impl
->GetLatestSequenceNumber();
1785 ASSERT_LE(exp_seq
, seq
+ with_empty_commits
);
1789 // Run a couple of different txns among them some uncommitted. Restart the db at
1790 // a couple points to check whether the list of uncommitted txns are recovered
1792 TEST_P(WritePreparedTransactionTest
, BasicRecovery
) {
1793 options
.disable_auto_compactions
= true;
1794 ASSERT_OK(ReOpen());
1795 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1799 TransactionOptions txn_options
;
1800 WriteOptions write_options
;
1801 size_t index
= 1000;
1802 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
1803 auto istr0
= std::to_string(index
);
1804 auto s
= txn0
->SetName("xid" + istr0
);
1806 s
= txn0
->Put(Slice("foo0" + istr0
), Slice("bar0" + istr0
));
1808 s
= txn0
->Prepare();
1810 auto prep_seq_0
= txn0
->GetId();
1815 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1816 auto istr1
= std::to_string(index
);
1817 s
= txn1
->SetName("xid" + istr1
);
1819 s
= txn1
->Put(Slice("foo1" + istr1
), Slice("bar"));
1821 s
= txn1
->Prepare();
1823 auto prep_seq_1
= txn1
->GetId();
1828 PinnableSlice pinnable_val
;
1829 // Check the value is not committed before restart
1830 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1831 ASSERT_TRUE(s
.IsNotFound());
1832 pinnable_val
.Reset();
1836 ASSERT_OK(wp_db
->db_impl_
->FlushWAL(true));
1837 wp_db
->TEST_Crash();
1838 ASSERT_OK(ReOpenNoDelete());
1839 ASSERT_NE(db
, nullptr);
1840 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1841 // After recovery, all the uncommitted txns (0 and 1) should be inserted into
1842 // delayed_prepared_
1843 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1844 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1845 ASSERT_LE(prep_seq_0
, wp_db
->max_evicted_seq_
);
1846 ASSERT_LE(prep_seq_1
, wp_db
->max_evicted_seq_
);
1848 ReadLock
rl(&wp_db
->prepared_mutex_
);
1849 ASSERT_EQ(2, wp_db
->delayed_prepared_
.size());
1850 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(prep_seq_0
) !=
1851 wp_db
->delayed_prepared_
.end());
1852 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(prep_seq_1
) !=
1853 wp_db
->delayed_prepared_
.end());
1856 // Check the value is still not committed after restart
1857 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1858 ASSERT_TRUE(s
.IsNotFound());
1859 pinnable_val
.Reset();
1863 // Test that a recovered txns will be properly marked committed for the next
1865 txn1
= db
->GetTransactionByName("xid" + istr1
);
1866 ASSERT_NE(txn1
, nullptr);
1867 ASSERT_OK(txn1
->Commit());
1871 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
1872 auto istr2
= std::to_string(index
);
1873 s
= txn2
->SetName("xid" + istr2
);
1875 s
= txn2
->Put(Slice("foo2" + istr2
), Slice("bar"));
1877 s
= txn2
->Prepare();
1879 auto prep_seq_2
= txn2
->GetId();
1882 ASSERT_OK(wp_db
->db_impl_
->FlushWAL(true));
1883 wp_db
->TEST_Crash();
1884 ASSERT_OK(ReOpenNoDelete());
1885 ASSERT_NE(db
, nullptr);
1886 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1887 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1888 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
1890 // 0 and 2 are prepared and 1 is committed
1892 ReadLock
rl(&wp_db
->prepared_mutex_
);
1893 ASSERT_EQ(2, wp_db
->delayed_prepared_
.size());
1894 const auto& end
= wp_db
->delayed_prepared_
.end();
1895 ASSERT_NE(wp_db
->delayed_prepared_
.find(prep_seq_0
), end
);
1896 ASSERT_EQ(wp_db
->delayed_prepared_
.find(prep_seq_1
), end
);
1897 ASSERT_NE(wp_db
->delayed_prepared_
.find(prep_seq_2
), end
);
1899 ASSERT_LE(prep_seq_0
, wp_db
->max_evicted_seq_
);
1900 ASSERT_LE(prep_seq_2
, wp_db
->max_evicted_seq_
);
1902 // Commit all the remaining txns
1903 txn0
= db
->GetTransactionByName("xid" + istr0
);
1904 ASSERT_NE(txn0
, nullptr);
1905 ASSERT_OK(txn0
->Commit());
1906 txn2
= db
->GetTransactionByName("xid" + istr2
);
1907 ASSERT_NE(txn2
, nullptr);
1908 ASSERT_OK(txn2
->Commit());
1910 // Check the value is committed after commit
1911 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1912 ASSERT_TRUE(s
.ok());
1913 ASSERT_TRUE(pinnable_val
== ("bar0" + istr0
));
1914 pinnable_val
.Reset();
1918 ASSERT_OK(wp_db
->db_impl_
->FlushWAL(true));
1919 ASSERT_OK(ReOpenNoDelete());
1920 ASSERT_NE(db
, nullptr);
1921 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1922 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
1923 ASSERT_TRUE(wp_db
->delayed_prepared_empty_
);
1925 // Check the value is still committed after recovery
1926 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0" + istr0
, &pinnable_val
);
1927 ASSERT_TRUE(s
.ok());
1928 ASSERT_TRUE(pinnable_val
== ("bar0" + istr0
));
1929 pinnable_val
.Reset();
1932 // After recovery the commit map is empty while the max is set. The code would
1933 // go through a different path which requires a separate test. Test that the
1934 // committed data before the restart is visible to all snapshots.
1935 TEST_P(WritePreparedTransactionTest
, IsInSnapshotEmptyMap
) {
1936 for (bool end_with_prepare
: {false, true}) {
1937 ASSERT_OK(ReOpen());
1938 WriteOptions woptions
;
1939 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1940 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1941 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1942 SequenceNumber prepare_seq
= kMaxSequenceNumber
;
1943 if (end_with_prepare
) {
1944 TransactionOptions txn_options
;
1945 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
1946 ASSERT_OK(txn
->SetName("xid0"));
1947 ASSERT_OK(txn
->Prepare());
1948 prepare_seq
= txn
->GetId();
1951 dynamic_cast<WritePreparedTxnDB
*>(db
)->TEST_Crash();
1952 auto db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1953 ASSERT_OK(db_impl
->FlushWAL(true));
1954 ASSERT_OK(ReOpenNoDelete());
1955 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1956 ASSERT_NE(wp_db
, nullptr);
1957 ASSERT_GT(wp_db
->max_evicted_seq_
, 0); // max after recovery
1958 // Take a snapshot right after recovery
1959 const Snapshot
* snap
= db
->GetSnapshot();
1960 auto snap_seq
= snap
->GetSequenceNumber();
1961 ASSERT_GT(snap_seq
, 0);
1963 for (SequenceNumber seq
= 0;
1964 seq
<= wp_db
->max_evicted_seq_
&& seq
!= prepare_seq
; seq
++) {
1965 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
));
1967 if (end_with_prepare
) {
1968 ASSERT_FALSE(wp_db
->IsInSnapshot(prepare_seq
, snap_seq
));
1971 ASSERT_FALSE(wp_db
->IsInSnapshot(snap_seq
+ 1, snap_seq
));
1973 db
->ReleaseSnapshot(snap
);
1975 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1976 // Take a snapshot after some writes
1977 snap
= db
->GetSnapshot();
1978 snap_seq
= snap
->GetSequenceNumber();
1979 for (SequenceNumber seq
= 0;
1980 seq
<= wp_db
->max_evicted_seq_
&& seq
!= prepare_seq
; seq
++) {
1981 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
));
1983 if (end_with_prepare
) {
1984 ASSERT_FALSE(wp_db
->IsInSnapshot(prepare_seq
, snap_seq
));
1987 ASSERT_FALSE(wp_db
->IsInSnapshot(snap_seq
+ 1, snap_seq
));
1989 db
->ReleaseSnapshot(snap
);
1993 // Shows the contract of IsInSnapshot when called on invalid/released snapshots
1994 TEST_P(WritePreparedTransactionTest
, IsInSnapshotReleased
) {
1995 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
1996 WriteOptions woptions
;
1997 ASSERT_OK(db
->Put(woptions
, "key", "value"));
1999 const Snapshot
* snap1
= db
->GetSnapshot();
2000 ASSERT_OK(db
->Put(woptions
, "key", "value"));
2001 ASSERT_OK(db
->Put(woptions
, "key", "value"));
2003 const Snapshot
* snap2
= db
->GetSnapshot();
2004 const SequenceNumber seq
= 1;
2005 // Evict seq out of commit cache
2006 size_t overwrite_seq
= wp_db
->COMMIT_CACHE_SIZE
+ seq
;
2007 wp_db
->AddCommitted(overwrite_seq
, overwrite_seq
);
2008 SequenceNumber snap_seq
;
2009 uint64_t min_uncommitted
= kMinUnCommittedSeq
;
2013 snap_seq
= snap1
->GetSequenceNumber();
2014 ASSERT_LE(seq
, snap_seq
);
2015 // Valid snapshot lower than max
2016 ASSERT_LE(snap_seq
, wp_db
->max_evicted_seq_
);
2017 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2018 ASSERT_FALSE(released
);
2021 snap_seq
= snap1
->GetSequenceNumber();
2022 // Invaid snapshot lower than max
2023 ASSERT_LE(snap_seq
+ 1, wp_db
->max_evicted_seq_
);
2025 wp_db
->IsInSnapshot(seq
, snap_seq
+ 1, min_uncommitted
, &released
));
2026 ASSERT_TRUE(released
);
2028 db
->ReleaseSnapshot(snap1
);
2031 // Released snapshot lower than max
2032 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2033 // The release does not take affect until the next max advance
2034 ASSERT_FALSE(released
);
2037 // Invaid snapshot lower than max
2039 wp_db
->IsInSnapshot(seq
, snap_seq
+ 1, min_uncommitted
, &released
));
2040 ASSERT_TRUE(released
);
2042 // This make the snapshot release to reflect in txn db structures
2043 wp_db
->AdvanceMaxEvictedSeq(wp_db
->max_evicted_seq_
,
2044 wp_db
->max_evicted_seq_
+ 1);
2047 // Released snapshot lower than max
2048 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2049 ASSERT_TRUE(released
);
2052 // Invaid snapshot lower than max
2054 wp_db
->IsInSnapshot(seq
, snap_seq
+ 1, min_uncommitted
, &released
));
2055 ASSERT_TRUE(released
);
2057 snap_seq
= snap2
->GetSequenceNumber();
2060 // Unreleased snapshot lower than max
2061 ASSERT_TRUE(wp_db
->IsInSnapshot(seq
, snap_seq
, min_uncommitted
, &released
));
2062 ASSERT_FALSE(released
);
2064 db
->ReleaseSnapshot(snap2
);
2067 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
2068 // snapshot, max_committed_seq_, prepared, and commit entries.
2069 TEST_P(WritePreparedTransactionTest
, IsInSnapshot
) {
2071 // Use small commit cache to trigger lots of eviction and fast advance of
2073 const size_t commit_cache_bits
= 3;
2074 // Same for snapshot cache size
2075 const size_t snapshot_cache_bits
= 2;
2077 // Take some preliminary snapshots first. This is to stress the data structure
2078 // that holds the old snapshots as it will be designed to be efficient when
2079 // only a few snapshots are below the max_evicted_seq_.
2080 for (int max_snapshots
= 1; max_snapshots
< 20; max_snapshots
++) {
2081 // Leave some gap between the preliminary snapshots and the final snapshot
2082 // that we check. This should test for also different overlapping scenarios
2083 // between the last snapshot and the commits.
2084 for (int max_gap
= 1; max_gap
< 10; max_gap
++) {
2085 // Since we do not actually write to db, we mock the seq as it would be
2086 // increased by the db. The only exception is that we need db seq to
2087 // advance for our snapshots. for which we apply a dummy put each time we
2088 // increase our mock of seq.
2090 // At each step we prepare a txn and then we commit it in the next txn.
2091 // This emulates the consecutive transactions that write to the same key
2092 uint64_t cur_txn
= 0;
2093 // Number of snapshots taken so far
2094 int num_snapshots
= 0;
2095 // Number of gaps applied so far
2097 // The final snapshot that we will inspect
2098 uint64_t snapshot
= 0;
2099 bool found_committed
= false;
2100 // To stress the data structure that maintain prepared txns, at each cycle
2101 // we add a new prepare txn. These do not mean to be committed for
2102 // snapshot inspection.
2103 std::set
<uint64_t> prepared
;
2104 // We keep the list of txns committed before we take the last snapshot.
2105 // These should be the only seq numbers that will be found in the snapshot
2106 std::set
<uint64_t> committed_before
;
2107 // The set of commit seq numbers to be excluded from IsInSnapshot queries
2108 std::set
<uint64_t> commit_seqs
;
2109 DBImpl
* mock_db
= new DBImpl(options
, dbname
);
2110 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2111 std::unique_ptr
<WritePreparedTxnDBMock
> wp_db(
2112 new WritePreparedTxnDBMock(mock_db
, txn_db_options
));
2113 // We continue until max advances a bit beyond the snapshot.
2114 while (!snapshot
|| wp_db
->max_evicted_seq_
< snapshot
+ 100) {
2115 // do prepare for a transaction
2117 wp_db
->AddPrepared(seq
);
2118 prepared
.insert(seq
);
2120 // If cur_txn is not started, do prepare for it.
2124 wp_db
->AddPrepared(cur_txn
);
2125 } else { // else commit it
2127 wp_db
->AddCommitted(cur_txn
, seq
);
2128 wp_db
->RemovePrepared(cur_txn
);
2129 commit_seqs
.insert(seq
);
2131 committed_before
.insert(cur_txn
);
2136 if (num_snapshots
< max_snapshots
- 1) {
2137 // Take preliminary snapshots
2138 wp_db
->TakeSnapshot(seq
);
2140 } else if (gap_cnt
< max_gap
) {
2141 // Wait for some gap before taking the final snapshot
2143 } else if (!snapshot
) {
2144 // Take the final snapshot if it is not already taken
2146 wp_db
->TakeSnapshot(snapshot
);
2150 // If the snapshot is taken, verify seq numbers visible to it. We redo
2151 // it at each cycle to test that the system is still sound when
2152 // max_evicted_seq_ advances.
2154 for (uint64_t s
= 1;
2155 s
<= seq
&& commit_seqs
.find(s
) == commit_seqs
.end(); s
++) {
2156 bool was_committed
=
2157 (committed_before
.find(s
) != committed_before
.end());
2158 bool is_in_snapshot
= wp_db
->IsInSnapshot(s
, snapshot
);
2159 if (was_committed
!= is_in_snapshot
) {
2160 printf("max_snapshots %d max_gap %d seq %" PRIu64
" max %" PRIu64
2161 " snapshot %" PRIu64
2162 " gap_cnt %d num_snapshots %d s %" PRIu64
"\n",
2163 max_snapshots
, max_gap
, seq
,
2164 wp_db
->max_evicted_seq_
.load(), snapshot
, gap_cnt
,
2167 ASSERT_EQ(was_committed
, is_in_snapshot
);
2168 found_committed
= found_committed
|| is_in_snapshot
;
2172 // Safety check to make sure the test actually ran
2173 ASSERT_TRUE(found_committed
);
2174 // As an extra check, check if prepared set will be properly empty after
2175 // they are committed.
2177 wp_db
->AddCommitted(cur_txn
, seq
);
2178 wp_db
->RemovePrepared(cur_txn
);
2180 for (auto p
: prepared
) {
2181 wp_db
->AddCommitted(p
, seq
);
2182 wp_db
->RemovePrepared(p
);
2184 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
2185 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
2190 void ASSERT_SAME(ReadOptions roptions
, TransactionDB
* db
, Status exp_s
,
2191 PinnableSlice
& exp_v
, Slice key
) {
2194 s
= db
->Get(roptions
, db
->DefaultColumnFamily(), key
, &v
);
2195 ASSERT_EQ(exp_s
, s
);
2196 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
2198 ASSERT_TRUE(exp_v
== v
);
2201 // Try with MultiGet API too
2202 std::vector
<std::string
> values
;
2204 db
->MultiGet(roptions
, {db
->DefaultColumnFamily()}, {key
}, &values
);
2205 ASSERT_EQ(1, values
.size());
2206 ASSERT_EQ(1, s_vec
.size());
2208 ASSERT_EQ(exp_s
, s
);
2209 ASSERT_TRUE(s
.ok() || s
.IsNotFound());
2211 ASSERT_TRUE(exp_v
== values
[0]);
2215 void ASSERT_SAME(TransactionDB
* db
, Status exp_s
, PinnableSlice
& exp_v
,
2217 ASSERT_SAME(ReadOptions(), db
, exp_s
, exp_v
, key
);
2220 TEST_P(WritePreparedTransactionTest
, Rollback
) {
2221 ReadOptions roptions
;
2222 WriteOptions woptions
;
2223 TransactionOptions txn_options
;
2224 const size_t num_keys
= 4;
2225 const size_t num_values
= 5;
2226 for (size_t ikey
= 1; ikey
<= num_keys
; ikey
++) {
2227 for (size_t ivalue
= 0; ivalue
< num_values
; ivalue
++) {
2228 for (bool crash
: {false, true}) {
2229 ASSERT_OK(ReOpen());
2230 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
2231 std::string key_str
= "key" + std::to_string(ikey
);
2236 ASSERT_OK(db
->Put(woptions
, key_str
, "initvalue1"));
2239 ASSERT_OK(db
->Merge(woptions
, key_str
, "initvalue2"));
2242 ASSERT_OK(db
->Delete(woptions
, key_str
));
2245 ASSERT_OK(db
->SingleDelete(woptions
, key_str
));
2253 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key1"), &v1
);
2256 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key2"), &v2
);
2259 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key3"), &v3
);
2262 db
->Get(roptions
, db
->DefaultColumnFamily(), Slice("key4"), &v4
);
2263 Transaction
* txn
= db
->BeginTransaction(woptions
, txn_options
);
2264 auto s
= txn
->SetName("xid0");
2266 s
= txn
->Put(Slice("key1"), Slice("value1"));
2268 s
= txn
->Merge(Slice("key2"), Slice("value2"));
2270 s
= txn
->Delete(Slice("key3"));
2272 s
= txn
->SingleDelete(Slice("key4"));
2278 ReadLock
rl(&wp_db
->prepared_mutex_
);
2279 ASSERT_FALSE(wp_db
->prepared_txns_
.empty());
2280 ASSERT_EQ(txn
->GetId(), wp_db
->prepared_txns_
.top());
2283 ASSERT_SAME(db
, s1
, v1
, "key1");
2284 ASSERT_SAME(db
, s2
, v2
, "key2");
2285 ASSERT_SAME(db
, s3
, v3
, "key3");
2286 ASSERT_SAME(db
, s4
, v4
, "key4");
2290 auto db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2291 ASSERT_OK(db_impl
->FlushWAL(true));
2292 dynamic_cast<WritePreparedTxnDB
*>(db
)->TEST_Crash();
2293 ASSERT_OK(ReOpenNoDelete());
2294 ASSERT_NE(db
, nullptr);
2295 wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
2296 txn
= db
->GetTransactionByName("xid0");
2297 ASSERT_FALSE(wp_db
->delayed_prepared_empty_
);
2298 ReadLock
rl(&wp_db
->prepared_mutex_
);
2299 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
2300 ASSERT_FALSE(wp_db
->delayed_prepared_
.empty());
2301 ASSERT_TRUE(wp_db
->delayed_prepared_
.find(txn
->GetId()) !=
2302 wp_db
->delayed_prepared_
.end());
2305 ASSERT_SAME(db
, s1
, v1
, "key1");
2306 ASSERT_SAME(db
, s2
, v2
, "key2");
2307 ASSERT_SAME(db
, s3
, v3
, "key3");
2308 ASSERT_SAME(db
, s4
, v4
, "key4");
2310 s
= txn
->Rollback();
2314 ASSERT_TRUE(wp_db
->delayed_prepared_empty_
);
2315 ReadLock
rl(&wp_db
->prepared_mutex_
);
2316 ASSERT_TRUE(wp_db
->prepared_txns_
.empty());
2317 ASSERT_TRUE(wp_db
->delayed_prepared_
.empty());
2320 ASSERT_SAME(db
, s1
, v1
, "key1");
2321 ASSERT_SAME(db
, s2
, v2
, "key2");
2322 ASSERT_SAME(db
, s3
, v3
, "key3");
2323 ASSERT_SAME(db
, s4
, v4
, "key4");
2330 TEST_P(WritePreparedTransactionTest
, DisableGCDuringRecovery
) {
2331 // Use large buffer to avoid memtable flush after 1024 insertions
2332 options
.write_buffer_size
= 1024 * 1024;
2333 ASSERT_OK(ReOpen());
2334 std::vector
<KeyVersion
> versions
;
2336 for (uint64_t i
= 1; i
<= 1024; i
++) {
2337 std::string v
= "bar" + std::to_string(i
);
2338 ASSERT_OK(db
->Put(WriteOptions(), "foo", v
));
2339 VerifyKeys({{"foo", v
}});
2340 seq
++; // one for the key/value
2341 KeyVersion kv
= {"foo", v
, seq
, kTypeValue
};
2342 if (options
.two_write_queues
) {
2343 seq
++; // one for the commit
2345 versions
.emplace_back(kv
);
2347 std::reverse(std::begin(versions
), std::end(versions
));
2348 VerifyInternalKeys(versions
);
2349 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2350 ASSERT_OK(db_impl
->FlushWAL(true));
2351 // Use small buffer to ensure memtable flush during recovery
2352 options
.write_buffer_size
= 1024;
2353 ASSERT_OK(ReOpenNoDelete());
2354 VerifyInternalKeys(versions
);
2357 TEST_P(WritePreparedTransactionTest
, SequenceNumberZero
) {
2358 ASSERT_OK(db
->Put(WriteOptions(), "foo", "bar"));
2359 VerifyKeys({{"foo", "bar"}});
2360 const Snapshot
* snapshot
= db
->GetSnapshot();
2361 ASSERT_OK(db
->Flush(FlushOptions()));
2362 // Dummy keys to avoid compaction trivially move files and get around actual
2363 // compaction logic.
2364 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2365 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2366 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2367 // Compaction will output keys with sequence number 0, if it is visible to
2368 // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
2369 // visible to any snapshot.
2370 VerifyKeys({{"foo", "bar"}});
2371 VerifyKeys({{"foo", "bar"}}, snapshot
);
2372 VerifyInternalKeys({{"foo", "bar", 0, kTypeValue
}});
2373 db
->ReleaseSnapshot(snapshot
);
2376 // Compaction should not remove a key if it is not committed, and should
2377 // proceed with older versions of the key as-if the new version doesn't exist.
2378 TEST_P(WritePreparedTransactionTest
, CompactionShouldKeepUncommittedKeys
) {
2379 options
.disable_auto_compactions
= true;
2380 ASSERT_OK(ReOpen());
2381 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2382 // Snapshots to avoid keys get evicted.
2383 std::vector
<const Snapshot
*> snapshots
;
2384 // Keep track of expected sequence number.
2385 SequenceNumber expected_seq
= 0;
2387 auto add_key
= [&](std::function
<Status()> func
) {
2390 if (options
.two_write_queues
) {
2391 expected_seq
++; // 1 for commit
2393 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2394 snapshots
.push_back(db
->GetSnapshot());
2397 // Each key here represent a standalone test case.
2398 add_key([&]() { return db
->Put(WriteOptions(), "key1", "value1_1"); });
2399 add_key([&]() { return db
->Put(WriteOptions(), "key2", "value2_1"); });
2400 add_key([&]() { return db
->Put(WriteOptions(), "key3", "value3_1"); });
2401 add_key([&]() { return db
->Put(WriteOptions(), "key4", "value4_1"); });
2402 add_key([&]() { return db
->Merge(WriteOptions(), "key5", "value5_1"); });
2403 add_key([&]() { return db
->Merge(WriteOptions(), "key5", "value5_2"); });
2404 add_key([&]() { return db
->Put(WriteOptions(), "key6", "value6_1"); });
2405 add_key([&]() { return db
->Put(WriteOptions(), "key7", "value7_1"); });
2406 ASSERT_OK(db
->Flush(FlushOptions()));
2407 add_key([&]() { return db
->Delete(WriteOptions(), "key6"); });
2408 add_key([&]() { return db
->SingleDelete(WriteOptions(), "key7"); });
2410 auto* transaction
= db
->BeginTransaction(WriteOptions());
2411 ASSERT_OK(transaction
->SetName("txn"));
2412 ASSERT_OK(transaction
->Put("key1", "value1_2"));
2413 ASSERT_OK(transaction
->Delete("key2"));
2414 ASSERT_OK(transaction
->SingleDelete("key3"));
2415 ASSERT_OK(transaction
->Merge("key4", "value4_2"));
2416 ASSERT_OK(transaction
->Merge("key5", "value5_3"));
2417 ASSERT_OK(transaction
->Put("key6", "value6_2"));
2418 ASSERT_OK(transaction
->Put("key7", "value7_2"));
2419 // Prepare but not commit.
2420 ASSERT_OK(transaction
->Prepare());
2421 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
2422 ASSERT_OK(db
->Flush(FlushOptions()));
2423 for (auto* s
: snapshots
) {
2424 db
->ReleaseSnapshot(s
);
2426 // Dummy keys to avoid compaction trivially move files and get around actual
2427 // compaction logic.
2428 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2429 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2430 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2432 {"key1", "value1_1"},
2433 {"key2", "value2_1"},
2434 {"key3", "value3_1"},
2435 {"key4", "value4_1"},
2436 {"key5", "value5_1,value5_2"},
2437 {"key6", "NOT_FOUND"},
2438 {"key7", "NOT_FOUND"},
2440 VerifyInternalKeys({
2441 {"key1", "value1_2", expected_seq
, kTypeValue
},
2442 {"key1", "value1_1", 0, kTypeValue
},
2443 {"key2", "", expected_seq
, kTypeDeletion
},
2444 {"key2", "value2_1", 0, kTypeValue
},
2445 {"key3", "", expected_seq
, kTypeSingleDeletion
},
2446 {"key3", "value3_1", 0, kTypeValue
},
2447 {"key4", "value4_2", expected_seq
, kTypeMerge
},
2448 {"key4", "value4_1", 0, kTypeValue
},
2449 {"key5", "value5_3", expected_seq
, kTypeMerge
},
2450 {"key5", "value5_1,value5_2", 0, kTypeValue
},
2451 {"key6", "value6_2", expected_seq
, kTypeValue
},
2452 {"key7", "value7_2", expected_seq
, kTypeValue
},
2454 ASSERT_OK(transaction
->Commit());
2456 {"key1", "value1_2"},
2457 {"key2", "NOT_FOUND"},
2458 {"key3", "NOT_FOUND"},
2459 {"key4", "value4_1,value4_2"},
2460 {"key5", "value5_1,value5_2,value5_3"},
2461 {"key6", "value6_2"},
2462 {"key7", "value7_2"},
2467 // Compaction should keep keys visible to a snapshot based on commit sequence,
2468 // not just prepare sequence.
2469 TEST_P(WritePreparedTransactionTest
, CompactionShouldKeepSnapshotVisibleKeys
) {
2470 options
.disable_auto_compactions
= true;
2471 ASSERT_OK(ReOpen());
2472 // Keep track of expected sequence number.
2473 SequenceNumber expected_seq
= 0;
2474 auto* txn1
= db
->BeginTransaction(WriteOptions());
2475 ASSERT_OK(txn1
->SetName("txn1"));
2476 ASSERT_OK(txn1
->Put("key1", "value1_1"));
2477 ASSERT_OK(txn1
->Prepare());
2478 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
2479 ASSERT_OK(txn1
->Commit());
2480 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2481 ASSERT_EQ(++expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2483 // Take a snapshots to avoid keys get evicted before compaction.
2484 const Snapshot
* snapshot1
= db
->GetSnapshot();
2485 auto* txn2
= db
->BeginTransaction(WriteOptions());
2486 ASSERT_OK(txn2
->SetName("txn2"));
2487 ASSERT_OK(txn2
->Put("key2", "value2_1"));
2488 ASSERT_OK(txn2
->Prepare());
2489 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
2490 // txn1 commit before snapshot2 and it is visible to snapshot2.
2491 // txn2 commit after snapshot2 and it is not visible.
2492 const Snapshot
* snapshot2
= db
->GetSnapshot();
2493 ASSERT_OK(txn2
->Commit());
2494 ASSERT_EQ(++expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2496 // Take a snapshots to avoid keys get evicted before compaction.
2497 const Snapshot
* snapshot3
= db
->GetSnapshot();
2498 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1_2"));
2499 expected_seq
++; // 1 for write
2500 SequenceNumber seq1
= expected_seq
;
2501 if (options
.two_write_queues
) {
2502 expected_seq
++; // 1 for commit
2504 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2505 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2_2"));
2506 expected_seq
++; // 1 for write
2507 SequenceNumber seq2
= expected_seq
;
2508 if (options
.two_write_queues
) {
2509 expected_seq
++; // 1 for commit
2511 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
2512 ASSERT_OK(db
->Flush(FlushOptions()));
2513 db
->ReleaseSnapshot(snapshot1
);
2514 db
->ReleaseSnapshot(snapshot3
);
2515 // Dummy keys to avoid compaction trivially move files and get around actual
2516 // compaction logic.
2517 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2518 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2519 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2520 VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
2521 VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2
);
2522 VerifyInternalKeys({
2523 {"key1", "value1_2", seq1
, kTypeValue
},
2524 // "value1_1" is visible to snapshot2. Also keys at bottom level visible
2525 // to earliest snapshot will output with seq = 0.
2526 {"key1", "value1_1", 0, kTypeValue
},
2527 {"key2", "value2_2", seq2
, kTypeValue
},
2529 db
->ReleaseSnapshot(snapshot2
);
2532 TEST_P(WritePreparedTransactionTest
, SmallestUncommittedOptimization
) {
2533 const size_t snapshot_cache_bits
= 7; // same as default
2534 const size_t commit_cache_bits
= 0; // disable commit cache
2535 for (bool has_recent_prepare
: {true, false}) {
2536 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2537 ASSERT_OK(ReOpen());
2539 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2541 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2542 ASSERT_OK(transaction
->SetName("txn"));
2543 ASSERT_OK(transaction
->Delete("key1"));
2544 ASSERT_OK(transaction
->Prepare());
2545 // snapshot1 should get min_uncommitted from prepared_txns_ heap.
2546 auto snapshot1
= db
->GetSnapshot();
2547 ASSERT_EQ(transaction
->GetId(),
2548 ((SnapshotImpl
*)snapshot1
)->min_uncommitted_
);
2549 // Add a commit to advance max_evicted_seq and move the prepared transaction
2550 // into delayed_prepared_ set.
2551 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2552 Transaction
* txn2
= nullptr;
2553 if (has_recent_prepare
) {
2555 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2556 ASSERT_OK(txn2
->SetName("txn2"));
2557 ASSERT_OK(txn2
->Put("key3", "value3"));
2558 ASSERT_OK(txn2
->Prepare());
2560 // snapshot2 should get min_uncommitted from delayed_prepared_ set.
2561 auto snapshot2
= db
->GetSnapshot();
2562 ASSERT_EQ(transaction
->GetId(),
2563 ((SnapshotImpl
*)snapshot1
)->min_uncommitted_
);
2564 ASSERT_OK(transaction
->Commit());
2566 if (has_recent_prepare
) {
2567 ASSERT_OK(txn2
->Commit());
2570 VerifyKeys({{"key1", "NOT_FOUND"}});
2571 VerifyKeys({{"key1", "value1"}}, snapshot1
);
2572 VerifyKeys({{"key1", "value1"}}, snapshot2
);
2573 db
->ReleaseSnapshot(snapshot1
);
2574 db
->ReleaseSnapshot(snapshot2
);
2578 // Insert two values, v1 and v2, for a key. Between the prepare and commit of
2579 // v2, take two snapshots, s1 and s2. Release s1 during compaction.
2580 // Test to make sure compaction doesn't get confused and think s1 can see both
2581 // values, and thus compact out the older value by mistake.
2582 TEST_P(WritePreparedTransactionTest
, ReleaseSnapshotDuringCompaction
) {
2583 const size_t snapshot_cache_bits
= 7; // same as default
2584 const size_t commit_cache_bits
= 0; // minimum commit cache
2585 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2586 options
.disable_auto_compactions
= true;
2587 ASSERT_OK(ReOpen());
2589 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1_1"));
2591 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2592 ASSERT_OK(transaction
->SetName("txn"));
2593 ASSERT_OK(transaction
->Put("key1", "value1_2"));
2594 ASSERT_OK(transaction
->Prepare());
2595 auto snapshot1
= db
->GetSnapshot();
2596 // Increment sequence number.
2597 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2598 auto snapshot2
= db
->GetSnapshot();
2599 ASSERT_OK(transaction
->Commit());
2601 VerifyKeys({{"key1", "value1_2"}});
2602 VerifyKeys({{"key1", "value1_1"}}, snapshot1
);
2603 VerifyKeys({{"key1", "value1_1"}}, snapshot2
);
2604 // Add a flush to avoid compaction to fallback to trivial move.
2606 // The callback might be called twice, record the calling state to
2607 // prevent double calling.
2608 bool callback_finished
= false;
2609 auto callback
= [&](void*) {
2610 if (callback_finished
) {
2613 // Release snapshot1 after CompactionIterator init.
2614 // CompactionIterator need to figure out the earliest snapshot
2615 // that can see key1:value1_2 is kMaxSequenceNumber, not
2616 // snapshot1 or snapshot2.
2617 db
->ReleaseSnapshot(snapshot1
);
2618 // Add some keys to advance max_evicted_seq.
2619 ASSERT_OK(db
->Put(WriteOptions(), "key3", "value3"));
2620 ASSERT_OK(db
->Put(WriteOptions(), "key4", "value4"));
2621 callback_finished
= true;
2623 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2625 SyncPoint::GetInstance()->EnableProcessing();
2627 ASSERT_OK(db
->Flush(FlushOptions()));
2628 VerifyKeys({{"key1", "value1_2"}});
2629 VerifyKeys({{"key1", "value1_1"}}, snapshot2
);
2630 db
->ReleaseSnapshot(snapshot2
);
2631 SyncPoint::GetInstance()->ClearAllCallBacks();
2634 // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
2635 // after committing v2. Release s1 during compaction, right after compaction
2636 // processes v2 and before it processes v1. Test to make sure compaction
2637 // doesn't get confused and believe v1 and v2 are visible to different
2638 // snapshots (v1 by s2, v2 by s1) and refuse to compact out v1.
2639 TEST_P(WritePreparedTransactionTest
, ReleaseSnapshotDuringCompaction2
) {
2640 const size_t snapshot_cache_bits
= 7; // same as default
2641 const size_t commit_cache_bits
= 0; // minimum commit cache
2642 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2643 options
.disable_auto_compactions
= true;
2644 ASSERT_OK(ReOpen());
2646 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2647 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value2"));
2648 SequenceNumber v2_seq
= db
->GetLatestSequenceNumber();
2649 auto* s1
= db
->GetSnapshot();
2650 // Advance sequence number.
2651 ASSERT_OK(db
->Put(WriteOptions(), "key2", "dummy"));
2652 auto* s2
= db
->GetSnapshot();
2654 int count_value
= 0;
2655 auto callback
= [&](void* arg
) {
2656 auto* ikey
= reinterpret_cast<ParsedInternalKey
*>(arg
);
2657 if (ikey
->user_key
== "key1") {
2659 if (count_value
== 2) {
2661 db
->ReleaseSnapshot(s1
);
2662 // Add some keys to advance max_evicted_seq and update
2664 ASSERT_OK(db
->Put(WriteOptions(), "key3", "dummy"));
2665 ASSERT_OK(db
->Put(WriteOptions(), "key4", "dummy"));
2669 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
2671 SyncPoint::GetInstance()->EnableProcessing();
2673 ASSERT_OK(db
->Flush(FlushOptions()));
2674 // value1 should be compact out.
2675 VerifyInternalKeys({{"key1", "value2", v2_seq
, kTypeValue
}});
2678 db
->ReleaseSnapshot(s2
);
2679 SyncPoint::GetInstance()->ClearAllCallBacks();
2682 // Insert two values, v1 and v2, for a key. Insert another dummy key so as
2683 // to evict the commit cache entry for v2 while v1 is still in the cache.
2684 // Take two snapshots, s1 and s2. Release s1 during compaction.
2685 // Since the commit cache entry for v2 is evicted, and old_commit_map doesn't
2686 // have s1 (it is released), verify that compaction still compacts out v1.
2687 // TODO(myabandeh): how can we be sure that v2's commit info is evicted
2688 // (and not v1's)? Instead of putting a dummy, we can directly call
2689 // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
2690 TEST_P(WritePreparedTransactionTest
, ReleaseSnapshotDuringCompaction3
) {
2691 const size_t snapshot_cache_bits
= 7; // same as default
2692 const size_t commit_cache_bits
= 1; // commit cache size = 2
2693 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2694 options
.disable_auto_compactions
= true;
2695 ASSERT_OK(ReOpen());
2697 // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
2698 // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
2699 auto add_dummy
= [&]() {
2701 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2702 ASSERT_OK(txn_dummy
->SetName("txn_dummy"));
2703 ASSERT_OK(txn_dummy
->Put("dummy", "dummy"));
2704 ASSERT_OK(txn_dummy
->Prepare());
2705 ASSERT_OK(txn_dummy
->Commit());
2709 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2711 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2712 ASSERT_OK(txn
->SetName("txn"));
2713 ASSERT_OK(txn
->Put("key1", "value2"));
2714 ASSERT_OK(txn
->Prepare());
2715 // TODO(myabandeh): replace it with GetId()?
2716 auto v2_seq
= db
->GetLatestSequenceNumber();
2717 ASSERT_OK(txn
->Commit());
2719 auto* s1
= db
->GetSnapshot();
2720 // Dummy key to advance sequence number.
2722 auto* s2
= db
->GetSnapshot();
2724 // The callback might be called twice, record the calling state to
2725 // prevent double calling.
2726 bool callback_finished
= false;
2727 auto callback
= [&](void*) {
2728 if (callback_finished
) {
2731 db
->ReleaseSnapshot(s1
);
2732 // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
2735 callback_finished
= true;
2737 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2739 SyncPoint::GetInstance()->EnableProcessing();
2741 ASSERT_OK(db
->Flush(FlushOptions()));
2742 // value1 should be compact out.
2743 VerifyInternalKeys({{"key1", "value2", v2_seq
, kTypeValue
}});
2745 db
->ReleaseSnapshot(s2
);
2746 SyncPoint::GetInstance()->ClearAllCallBacks();
2749 TEST_P(WritePreparedTransactionTest
, ReleaseEarliestSnapshotDuringCompaction
) {
2750 const size_t snapshot_cache_bits
= 7; // same as default
2751 const size_t commit_cache_bits
= 0; // minimum commit cache
2752 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
2753 options
.disable_auto_compactions
= true;
2754 ASSERT_OK(ReOpen());
2756 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
2757 SequenceNumber put_seq
= db
->GetLatestSequenceNumber();
2759 db
->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2760 ASSERT_OK(transaction
->SetName("txn"));
2761 ASSERT_OK(transaction
->Delete("key1"));
2762 ASSERT_OK(transaction
->Prepare());
2763 SequenceNumber del_seq
= db
->GetLatestSequenceNumber();
2764 auto snapshot1
= db
->GetSnapshot();
2765 // Increment sequence number.
2766 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
2767 auto snapshot2
= db
->GetSnapshot();
2768 ASSERT_OK(transaction
->Commit());
2770 VerifyKeys({{"key1", "NOT_FOUND"}});
2771 VerifyKeys({{"key1", "value1"}}, snapshot1
);
2772 VerifyKeys({{"key1", "value1"}}, snapshot2
);
2773 ASSERT_OK(db
->Flush(FlushOptions()));
2775 auto callback
= [&](void* compaction
) {
2776 // Release snapshot1 after CompactionIterator init.
2777 // CompactionIterator need to double check and find out snapshot2 is now
2778 // the earliest existing snapshot.
2779 if (compaction
!= nullptr) {
2780 db
->ReleaseSnapshot(snapshot1
);
2781 // Add some keys to advance max_evicted_seq.
2782 ASSERT_OK(db
->Put(WriteOptions(), "key3", "value3"));
2783 ASSERT_OK(db
->Put(WriteOptions(), "key4", "value4"));
2786 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2788 SyncPoint::GetInstance()->EnableProcessing();
2790 // Dummy keys to avoid compaction trivially move files and get around actual
2791 // compaction logic.
2792 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
2793 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
2794 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2795 // Only verify for key1. Both the put and delete for the key should be kept.
2796 // Since the delete tombstone is not visible to snapshot2, we need to keep
2797 // at least one version of the key, for write-conflict check.
2798 VerifyInternalKeys({{"key1", "", del_seq
, kTypeDeletion
},
2799 {"key1", "value1", put_seq
, kTypeValue
}});
2800 db
->ReleaseSnapshot(snapshot2
);
2801 SyncPoint::GetInstance()->ClearAllCallBacks();
2804 TEST_P(WritePreparedTransactionTest
,
2805 ReleaseEarliestSnapshotDuringCompaction_WithSD
) {
2806 constexpr size_t kSnapshotCacheBits
= 7; // same as default
2807 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
2808 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
2809 options
.disable_auto_compactions
= true;
2810 ASSERT_OK(ReOpen());
2812 ASSERT_OK(db
->Put(WriteOptions(), "key", "value"));
2813 ASSERT_OK(db
->Put(WriteOptions(), "foo", "value"));
2814 ASSERT_OK(db
->Flush(FlushOptions()));
2816 auto* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions(),
2817 /*old_txn=*/nullptr);
2818 ASSERT_OK(txn
->SingleDelete("key"));
2819 ASSERT_OK(txn
->Put("wow", "value"));
2820 ASSERT_OK(txn
->SetName("txn"));
2821 ASSERT_OK(txn
->Prepare());
2822 ASSERT_OK(db
->Flush(FlushOptions()));
2824 const bool two_write_queues
= std::get
<1>(GetParam());
2825 if (two_write_queues
) {
2826 // In the case of two queues, commit another txn just to bump
2827 // last_published_seq so that a subsequent GetSnapshot() call can return
2828 // a snapshot with higher sequence.
2829 auto* dummy_txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions(),
2830 /*old_txn=*/nullptr);
2831 ASSERT_OK(dummy_txn
->Put("haha", "value"));
2832 ASSERT_OK(dummy_txn
->Commit());
2835 auto* snapshot
= db
->GetSnapshot();
2837 ASSERT_OK(txn
->Commit());
2840 SyncPoint::GetInstance()->SetCallBack(
2841 "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg
) {
2845 db
->ReleaseSnapshot(snapshot
);
2847 // Advance max_evicted_seq
2848 ASSERT_OK(db
->Put(WriteOptions(), "bar", "value"));
2850 SyncPoint::GetInstance()->EnableProcessing();
2851 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
2853 SyncPoint::GetInstance()->ClearAllCallBacks();
2856 TEST_P(WritePreparedTransactionTest
,
2857 ReleaseEarliestSnapshotDuringCompaction_WithSD2
) {
2858 constexpr size_t kSnapshotCacheBits
= 7; // same as default
2859 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
2860 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
2861 options
.disable_auto_compactions
= true;
2862 ASSERT_OK(ReOpen());
2864 ASSERT_OK(db
->Put(WriteOptions(), "foo", "value"));
2865 ASSERT_OK(db
->Put(WriteOptions(), "key", "value"));
2866 ASSERT_OK(db
->Flush(FlushOptions()));
2868 auto* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions(),
2869 /*old_txn=*/nullptr);
2870 ASSERT_OK(txn
->Put("bar", "value"));
2871 ASSERT_OK(txn
->SingleDelete("key"));
2872 ASSERT_OK(txn
->SetName("txn"));
2873 ASSERT_OK(txn
->Prepare());
2874 ASSERT_OK(db
->Flush(FlushOptions()));
2876 ASSERT_OK(txn
->Commit());
2879 ASSERT_OK(db
->Put(WriteOptions(), "haha", "value"));
2881 // Create a dummy transaction to take a snapshot for ww-conflict detection.
2882 TransactionOptions txn_opts
;
2883 txn_opts
.set_snapshot
= true;
2885 db
->BeginTransaction(WriteOptions(), txn_opts
, /*old_txn=*/nullptr);
2887 SyncPoint::GetInstance()->SetCallBack(
2888 "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
2889 ASSERT_OK(dummy_txn
->Rollback());
2892 ASSERT_OK(db
->Put(WriteOptions(), "dontcare", "value"));
2894 SyncPoint::GetInstance()->EnableProcessing();
2896 ASSERT_OK(db
->Put(WriteOptions(), "haha2", "value"));
2897 auto* snapshot
= db
->GetSnapshot();
2899 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2900 db
->ReleaseSnapshot(snapshot
);
2901 SyncPoint::GetInstance()->ClearAllCallBacks();
2904 TEST_P(WritePreparedTransactionTest
,
2905 ReleaseEarliestSnapshotDuringCompaction_WithDelete
) {
2906 constexpr size_t kSnapshotCacheBits
= 7; // same as default
2907 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
2908 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
2909 options
.disable_auto_compactions
= true;
2910 ASSERT_OK(ReOpen());
2912 ASSERT_OK(db
->Put(WriteOptions(), "a", "value"));
2913 ASSERT_OK(db
->Put(WriteOptions(), "b", "value"));
2914 ASSERT_OK(db
->Put(WriteOptions(), "c", "value"));
2915 ASSERT_OK(db
->Flush(FlushOptions()));
2917 auto* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions(),
2918 /*old_txn=*/nullptr);
2919 ASSERT_OK(txn
->Delete("b"));
2920 ASSERT_OK(txn
->SetName("txn"));
2921 ASSERT_OK(txn
->Prepare());
2923 const bool two_write_queues
= std::get
<1>(GetParam());
2924 if (two_write_queues
) {
2925 // In the case of two queues, commit another txn just to bump
2926 // last_published_seq so that a subsequent GetSnapshot() call can return
2927 // a snapshot with higher sequence.
2928 auto* dummy_txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions(),
2929 /*old_txn=*/nullptr);
2930 ASSERT_OK(dummy_txn
->Put("haha", "value"));
2931 ASSERT_OK(dummy_txn
->Commit());
2934 auto* snapshot1
= db
->GetSnapshot();
2935 ASSERT_OK(txn
->Commit());
2937 auto* snapshot2
= db
->GetSnapshot();
2939 SyncPoint::GetInstance()->SetCallBack(
2940 "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg
) {
2944 db
->ReleaseSnapshot(snapshot1
);
2946 // Advance max_evicted_seq
2947 ASSERT_OK(db
->Put(WriteOptions(), "dummy1", "value"));
2949 SyncPoint::GetInstance()->EnableProcessing();
2951 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
2953 db
->ReleaseSnapshot(snapshot2
);
2954 SyncPoint::GetInstance()->ClearAllCallBacks();
2957 TEST_P(WritePreparedTransactionTest
,
2958 ReleaseSnapshotBetweenSDAndPutDuringCompaction
) {
2959 constexpr size_t kSnapshotCacheBits
= 7; // same as default
2960 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
2961 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
2962 options
.disable_auto_compactions
= true;
2963 ASSERT_OK(ReOpen());
2965 // Create a dummy transaction to take a snapshot for ww-conflict detection.
2966 TransactionOptions txn_opts
;
2967 txn_opts
.set_snapshot
= true;
2969 db
->BeginTransaction(WriteOptions(), txn_opts
, /*old_txn=*/nullptr);
2971 ASSERT_OK(db
->Put(WriteOptions(), "bar", "value"));
2973 ASSERT_OK(db
->Put(WriteOptions(), "foo", "value"));
2974 ASSERT_OK(db
->SingleDelete(WriteOptions(), "foo"));
2975 auto* snapshot1
= db
->GetSnapshot();
2977 ASSERT_OK(db
->Put(WriteOptions(), "dontcare", "value"));
2978 auto* snapshot2
= db
->GetSnapshot();
2980 SyncPoint::GetInstance()->SetCallBack(
2981 "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
2982 db
->ReleaseSnapshot(snapshot1
);
2984 ASSERT_OK(db
->Put(WriteOptions(), "dontcare2", "value2"));
2986 SyncPoint::GetInstance()->EnableProcessing();
2988 ASSERT_OK(db
->Flush(FlushOptions()));
2989 db
->ReleaseSnapshot(snapshot2
);
2990 ASSERT_OK(dummy_txn
->Commit());
2992 SyncPoint::GetInstance()->ClearAllCallBacks();
2995 TEST_P(WritePreparedTransactionTest
,
2996 ReleaseEarliestWriteConflictSnapshot_SingleDelete
) {
2997 constexpr size_t kSnapshotCacheBits
= 7; // same as default
2998 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
2999 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
3000 options
.disable_auto_compactions
= true;
3001 ASSERT_OK(ReOpen());
3003 ASSERT_OK(db
->Put(WriteOptions(), "a", "value"));
3004 ASSERT_OK(db
->Put(WriteOptions(), "b", "value"));
3005 ASSERT_OK(db
->Put(WriteOptions(), "c", "value"));
3006 ASSERT_OK(db
->Flush(FlushOptions()));
3009 CompactRangeOptions cro
;
3010 cro
.change_level
= true;
3011 cro
.target_level
= 2;
3012 ASSERT_OK(db
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
3015 std::unique_ptr
<Transaction
> txn
;
3016 txn
.reset(db
->BeginTransaction(WriteOptions(), TransactionOptions(),
3017 /*old_txn=*/nullptr));
3018 ASSERT_OK(txn
->SetName("txn1"));
3019 ASSERT_OK(txn
->SingleDelete("b"));
3020 ASSERT_OK(txn
->Prepare());
3021 ASSERT_OK(txn
->Commit());
3023 auto* snapshot1
= db
->GetSnapshot();
3025 // Bump seq of the db by performing writes so that
3026 // earliest_snapshot_ < earliest_write_conflict_snapshot_ in
3027 // CompactionIterator.
3028 ASSERT_OK(db
->Put(WriteOptions(), "z", "dontcare"));
3030 // Create another snapshot for write conflict checking
3031 std::unique_ptr
<Transaction
> txn2
;
3033 TransactionOptions txn_opts
;
3034 txn_opts
.set_snapshot
= true;
3036 db
->BeginTransaction(WriteOptions(), txn_opts
, /*old_txn=*/nullptr));
3039 // Bump seq so that the subsequent bg flush won't create a snapshot with the
3040 // same seq as the previous snapshot for conflict checking.
3041 ASSERT_OK(db
->Put(WriteOptions(), "y", "dont"));
3043 ASSERT_OK(db
->Flush(FlushOptions()));
3045 SyncPoint::GetInstance()->DisableProcessing();
3046 SyncPoint::GetInstance()->ClearAllCallBacks();
3047 SyncPoint::GetInstance()->SetCallBack(
3048 "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* /*arg*/) {
3049 // Rolling back txn2 should release its snapshot(for ww checking).
3050 ASSERT_OK(txn2
->Rollback());
3052 // Advance max_evicted_seq
3053 ASSERT_OK(db
->Put(WriteOptions(), "x", "value"));
3055 SyncPoint::GetInstance()->EnableProcessing();
3057 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
3060 SyncPoint::GetInstance()->DisableProcessing();
3061 SyncPoint::GetInstance()->ClearAllCallBacks();
3063 db
->ReleaseSnapshot(snapshot1
);
3066 TEST_P(WritePreparedTransactionTest
, ReleaseEarliestSnapshotAfterSeqZeroing
) {
3067 constexpr size_t kSnapshotCacheBits
= 7; // same as default
3068 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
3069 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
3070 options
.disable_auto_compactions
= true;
3071 ASSERT_OK(ReOpen());
3073 ASSERT_OK(db
->Put(WriteOptions(), "a", "value"));
3074 ASSERT_OK(db
->Put(WriteOptions(), "b", "value"));
3075 ASSERT_OK(db
->Put(WriteOptions(), "c", "value"));
3076 ASSERT_OK(db
->Flush(FlushOptions()));
3079 CompactRangeOptions cro
;
3080 cro
.change_level
= true;
3081 cro
.target_level
= 2;
3082 ASSERT_OK(db
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
3085 ASSERT_OK(db
->SingleDelete(WriteOptions(), "b"));
3087 // Take a snapshot so that the SD won't be dropped during flush.
3088 auto* tmp_snapshot
= db
->GetSnapshot();
3090 ASSERT_OK(db
->Put(WriteOptions(), "b", "value2"));
3091 auto* snapshot
= db
->GetSnapshot();
3092 ASSERT_OK(db
->Flush(FlushOptions()));
3094 db
->ReleaseSnapshot(tmp_snapshot
);
3096 // Bump the sequence so that the below bg compaction job's snapshot will be
3097 // different from snapshot's sequence.
3098 ASSERT_OK(db
->Put(WriteOptions(), "z", "foo"));
3100 SyncPoint::GetInstance()->DisableProcessing();
3101 SyncPoint::GetInstance()->ClearAllCallBacks();
3102 SyncPoint::GetInstance()->SetCallBack(
3103 "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg
) {
3104 const auto* const ikey
=
3105 reinterpret_cast<const ParsedInternalKey
*>(arg
);
3107 if (ikey
->user_key
== "b") {
3108 assert(ikey
->type
== kTypeValue
);
3109 db
->ReleaseSnapshot(snapshot
);
3111 // Bump max_evicted_seq.
3112 ASSERT_OK(db
->Put(WriteOptions(), "z", "dontcare"));
3115 SyncPoint::GetInstance()->EnableProcessing();
3117 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
3120 SyncPoint::GetInstance()->DisableProcessing();
3121 SyncPoint::GetInstance()->ClearAllCallBacks();
3124 TEST_P(WritePreparedTransactionTest
, ReleaseEarliestSnapshotAfterSeqZeroing2
) {
3125 constexpr size_t kSnapshotCacheBits
= 7; // same as default
3126 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
3127 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
3128 options
.disable_auto_compactions
= true;
3129 ASSERT_OK(ReOpen());
3131 // Generate an L0 with only SD for one key "b".
3132 ASSERT_OK(db
->Put(WriteOptions(), "a", "value"));
3133 ASSERT_OK(db
->Put(WriteOptions(), "b", "value"));
3134 // Take a snapshot so that subsequent flush outputs the SD for "b".
3135 auto* tmp_snapshot
= db
->GetSnapshot();
3136 ASSERT_OK(db
->SingleDelete(WriteOptions(), "b"));
3137 ASSERT_OK(db
->Put(WriteOptions(), "c", "value"));
3139 SyncPoint::GetInstance()->DisableProcessing();
3140 SyncPoint::GetInstance()->ClearAllCallBacks();
3141 SyncPoint::GetInstance()->SetCallBack(
3142 "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg
) {
3144 db
->ReleaseSnapshot(tmp_snapshot
);
3145 // Bump max_evicted_seq
3146 ASSERT_OK(db
->Put(WriteOptions(), "x", "dontcare"));
3149 SyncPoint::GetInstance()->EnableProcessing();
3151 ASSERT_OK(db
->Flush(FlushOptions()));
3152 // Finish generating L0 with only SD for "b".
3154 SyncPoint::GetInstance()->DisableProcessing();
3155 SyncPoint::GetInstance()->ClearAllCallBacks();
3157 // Move the L0 to L2.
3159 CompactRangeOptions cro
;
3160 cro
.change_level
= true;
3161 cro
.target_level
= 2;
3162 ASSERT_OK(db
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
3165 ASSERT_OK(db
->Put(WriteOptions(), "b", "value1"));
3167 auto* snapshot
= db
->GetSnapshot();
3169 // Bump seq so that a subsequent flush/compaction job's snapshot is larger
3170 // than the above snapshot's seq.
3171 ASSERT_OK(db
->Put(WriteOptions(), "x", "dontcare"));
3173 // Generate a second L0.
3174 ASSERT_OK(db
->Flush(FlushOptions()));
3176 SyncPoint::GetInstance()->SetCallBack(
3177 "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg
) {
3178 const auto* const ikey
=
3179 reinterpret_cast<const ParsedInternalKey
*>(arg
);
3181 if (ikey
->user_key
== "b") {
3182 assert(ikey
->type
== kTypeValue
);
3183 db
->ReleaseSnapshot(snapshot
);
3185 // Bump max_evicted_seq.
3186 ASSERT_OK(db
->Put(WriteOptions(), "z", "dontcare"));
3189 SyncPoint::GetInstance()->EnableProcessing();
3191 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
3194 SyncPoint::GetInstance()->DisableProcessing();
3195 SyncPoint::GetInstance()->ClearAllCallBacks();
3198 // Although the user contract states that an SD can only be issued for a key
3199 // that exists and has not been overwritten, it is still possible for a Delete
3200 // to be present when a write-prepared transaction is rolled back.
3201 TEST_P(WritePreparedTransactionTest
, SingleDeleteAfterRollback
) {
3202 constexpr size_t kSnapshotCacheBits
= 7; // same as default
3203 constexpr size_t kCommitCacheBits
= 0; // minimum commit cache
3204 txn_db_options
.rollback_deletion_type_callback
=
3205 [](TransactionDB
*, ColumnFamilyHandle
*, const Slice
&) { return true; };
3206 UpdateTransactionDBOptions(kSnapshotCacheBits
, kCommitCacheBits
);
3207 options
.disable_auto_compactions
= true;
3208 ASSERT_OK(ReOpen());
3210 // Get a write conflict snapshot by creating a transaction with
3211 // set_snapshot=true.
3212 TransactionOptions txn_opts
;
3213 txn_opts
.set_snapshot
= true;
3214 std::unique_ptr
<Transaction
> dummy_txn(
3215 db
->BeginTransaction(WriteOptions(), txn_opts
));
3217 std::unique_ptr
<Transaction
> txn0(
3218 db
->BeginTransaction(WriteOptions(), TransactionOptions()));
3219 ASSERT_OK(txn0
->Put("foo", "value"));
3220 ASSERT_OK(txn0
->SetName("xid0"));
3221 ASSERT_OK(txn0
->Prepare());
3223 // Create an SST with only {"foo": "value"}.
3224 ASSERT_OK(db
->Flush(FlushOptions()));
3226 // Insert a Delete to cancel out the prior Put by txn0.
3227 ASSERT_OK(txn0
->Rollback());
3230 // Create a second SST.
3231 ASSERT_OK(db
->Flush(FlushOptions()));
3233 ASSERT_OK(db
->Put(WriteOptions(), "foo", "value1"));
3235 auto* snapshot
= db
->GetSnapshot();
3237 ASSERT_OK(db
->SingleDelete(WriteOptions(), "foo"));
3240 SyncPoint::GetInstance()->DisableProcessing();
3241 SyncPoint::GetInstance()->ClearAllCallBacks();
3242 SyncPoint::GetInstance()->SetCallBack(
3243 "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg
) {
3244 const auto* const c
= reinterpret_cast<const Compaction
*>(arg
);
3246 // Trigger once only for SingleDelete during flush.
3249 db
->ReleaseSnapshot(snapshot
);
3250 // Bump max_evicted_seq
3251 ASSERT_OK(db
->Put(WriteOptions(), "x", "dontcare"));
3254 SyncPoint::GetInstance()->EnableProcessing();
3256 // Create a third SST containing a SD without its matching PUT.
3257 ASSERT_OK(db
->Flush(FlushOptions()));
3259 SyncPoint::GetInstance()->DisableProcessing();
3260 SyncPoint::GetInstance()->ClearAllCallBacks();
3261 SyncPoint::GetInstance()->EnableProcessing();
3263 DBImpl
* dbimpl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
3265 ASSERT_OK(dbimpl
->TEST_CompactRange(
3266 /*level=*/0, /*begin=*/nullptr, /*end=*/nullptr,
3267 /*column_family=*/nullptr, /*disallow_trivial_mode=*/true));
3269 SyncPoint::GetInstance()->DisableProcessing();
3270 SyncPoint::GetInstance()->ClearAllCallBacks();
3272 // Release the conflict-checking snapshot.
3273 ASSERT_OK(dummy_txn
->Rollback());
3276 // A more complex, randomized test to verify that compaction/flush keep keys visible to snapshots.
3278 TEST_P(WritePreparedTransactionTest
,
3279 CompactionKeepSnapshotVisibleKeysRandomized
) {
3280 constexpr size_t kNumTransactions
= 10;
3281 constexpr size_t kNumIterations
= 1000;
3283 std::vector
<Transaction
*> transactions(kNumTransactions
, nullptr);
3284 std::vector
<size_t> versions(kNumTransactions
, 0);
3285 std::unordered_map
<std::string
, std::string
> current_data
;
3286 std::vector
<const Snapshot
*> snapshots
;
3287 std::vector
<std::unordered_map
<std::string
, std::string
>> snapshot_data
;
3290 options
.disable_auto_compactions
= true;
3291 ASSERT_OK(ReOpen());
3293 for (size_t i
= 0; i
< kNumTransactions
; i
++) {
3294 std::string key
= "key" + std::to_string(i
);
3295 std::string value
= "value0";
3296 ASSERT_OK(db
->Put(WriteOptions(), key
, value
));
3297 current_data
[key
] = value
;
3299 VerifyKeys(current_data
);
3301 for (size_t iter
= 0; iter
< kNumIterations
; iter
++) {
3302 auto r
= rnd
.Next() % (kNumTransactions
+ 1);
3303 if (r
< kNumTransactions
) {
3304 std::string key
= "key" + std::to_string(r
);
3305 if (transactions
[r
] == nullptr) {
3306 std::string value
= "value" + std::to_string(versions
[r
] + 1);
3307 auto* txn
= db
->BeginTransaction(WriteOptions());
3308 ASSERT_OK(txn
->SetName("txn" + std::to_string(r
)));
3309 ASSERT_OK(txn
->Put(key
, value
));
3310 ASSERT_OK(txn
->Prepare());
3311 transactions
[r
] = txn
;
3313 std::string value
= "value" + std::to_string(++versions
[r
]);
3314 ASSERT_OK(transactions
[r
]->Commit());
3315 delete transactions
[r
];
3316 transactions
[r
] = nullptr;
3317 current_data
[key
] = value
;
3320 auto* snapshot
= db
->GetSnapshot();
3321 VerifyKeys(current_data
, snapshot
);
3322 snapshots
.push_back(snapshot
);
3323 snapshot_data
.push_back(current_data
);
3325 VerifyKeys(current_data
);
3327 // Take a last snapshot to test compaction with uncommitted prepared
3329 snapshots
.push_back(db
->GetSnapshot());
3330 snapshot_data
.push_back(current_data
);
3332 ASSERT_EQ(snapshots
.size(), snapshot_data
.size());
3333 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
3334 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
3336 ASSERT_OK(db
->Flush(FlushOptions()));
3337 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
3338 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
3340 // Dummy keys to avoid compaction trivially move files and get around actual
3341 // compaction logic.
3342 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
3343 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
3344 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3345 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
3346 VerifyKeys(snapshot_data
[i
], snapshots
[i
]);
3349 for (size_t i
= 0; i
< kNumTransactions
; i
++) {
3350 if (transactions
[i
] == nullptr) {
3353 ASSERT_OK(transactions
[i
]->Commit());
3354 delete transactions
[i
];
3356 for (size_t i
= 0; i
< snapshots
.size(); i
++) {
3357 db
->ReleaseSnapshot(snapshots
[i
]);
3361 // Compaction should not apply the optimization of outputting a key with
3362 // sequence number 0 if, based on its commit sequence number, the key is not
3363 // visible to the earliest snapshot.
3364 TEST_P(WritePreparedTransactionTest
,
3365 CompactionShouldKeepSequenceForUncommittedKeys
) {
3366 options
.disable_auto_compactions
= true;
3367 ASSERT_OK(ReOpen());
3368 // Keep track of expected sequence number.
3369 SequenceNumber expected_seq
= 0;
3370 auto* transaction
= db
->BeginTransaction(WriteOptions());
3371 ASSERT_OK(transaction
->SetName("txn"));
3372 ASSERT_OK(transaction
->Put("key1", "value1"));
3373 ASSERT_OK(transaction
->Prepare());
3374 ASSERT_EQ(++expected_seq
, db
->GetLatestSequenceNumber());
3375 SequenceNumber seq1
= expected_seq
;
3376 ASSERT_OK(db
->Put(WriteOptions(), "key2", "value2"));
3377 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
3378 expected_seq
++; // one for data
3379 if (options
.two_write_queues
) {
3380 expected_seq
++; // one for commit
3382 ASSERT_EQ(expected_seq
, db_impl
->TEST_GetLastVisibleSequence());
3383 ASSERT_OK(db
->Flush(FlushOptions()));
3384 // Dummy keys to avoid compaction trivially move files and get around actual
3385 // compaction logic.
3386 ASSERT_OK(db
->Put(WriteOptions(), "a", "dummy"));
3387 ASSERT_OK(db
->Put(WriteOptions(), "z", "dummy"));
3388 ASSERT_OK(db
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3390 {"key1", "NOT_FOUND"},
3393 VerifyInternalKeys({
3394 // "key1" has not been committed. It keeps its sequence number.
3395 {"key1", "value1", seq1
, kTypeValue
},
3396 // "key2" is committed and output with seq = 0.
3397 {"key2", "value2", 0, kTypeValue
},
3399 ASSERT_OK(transaction
->Commit());
3407 TEST_P(WritePreparedTransactionTest
, CommitAndSnapshotDuringCompaction
) {
3408 options
.disable_auto_compactions
= true;
3409 ASSERT_OK(ReOpen());
3411 const Snapshot
* snapshot
= nullptr;
3412 ASSERT_OK(db
->Put(WriteOptions(), "key1", "value1"));
3413 auto* txn
= db
->BeginTransaction(WriteOptions());
3414 ASSERT_OK(txn
->SetName("txn"));
3415 ASSERT_OK(txn
->Put("key1", "value2"));
3416 ASSERT_OK(txn
->Prepare());
3418 auto callback
= [&](void*) {
3419 // Snapshot is taken after compaction start. It should be taken into
3420 // consideration for whether to compact out value1.
3421 snapshot
= db
->GetSnapshot();
3422 ASSERT_OK(txn
->Commit());
3425 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
3427 SyncPoint::GetInstance()->EnableProcessing();
3428 ASSERT_OK(db
->Flush(FlushOptions()));
3429 ASSERT_NE(nullptr, snapshot
);
3430 VerifyKeys({{"key1", "value2"}});
3431 VerifyKeys({{"key1", "value1"}}, snapshot
);
3432 db
->ReleaseSnapshot(snapshot
);
3435 TEST_P(WritePreparedTransactionTest
, Iterate
) {
3436 auto verify_state
= [](Iterator
* iter
, const std::string
& key
,
3437 const std::string
& value
) {
3438 ASSERT_TRUE(iter
->Valid());
3439 ASSERT_OK(iter
->status());
3440 ASSERT_EQ(key
, iter
->key().ToString());
3441 ASSERT_EQ(value
, iter
->value().ToString());
3444 auto verify_iter
= [&](const std::string
& expected_val
) {
3445 // Get iterator from a concurrent transaction and make sure it has the
3446 // same view as an iterator from the DB.
3447 auto* txn
= db
->BeginTransaction(WriteOptions());
3449 for (int i
= 0; i
< 2; i
++) {
3450 Iterator
* iter
= (i
== 0) ? db
->NewIterator(ReadOptions())
3451 : txn
->GetIterator(ReadOptions());
3454 verify_state(iter
, "foo", expected_val
);
3457 verify_state(iter
, "a", "va");
3459 verify_state(iter
, "foo", expected_val
);
3461 iter
->SeekForPrev("y");
3462 verify_state(iter
, "foo", expected_val
);
3464 iter
->SeekForPrev("z");
3465 verify_state(iter
, "z", "vz");
3467 verify_state(iter
, "foo", expected_val
);
3473 ASSERT_OK(db
->Put(WriteOptions(), "foo", "v1"));
3474 auto* transaction
= db
->BeginTransaction(WriteOptions());
3475 ASSERT_OK(transaction
->SetName("txn"));
3476 ASSERT_OK(transaction
->Put("foo", "v2"));
3477 ASSERT_OK(transaction
->Prepare());
3478 VerifyKeys({{"foo", "v1"}});
3480 ASSERT_OK(db
->Put(WriteOptions(), "a", "va"));
3481 ASSERT_OK(db
->Put(WriteOptions(), "z", "vz"));
3483 ASSERT_OK(transaction
->Commit());
3484 VerifyKeys({{"foo", "v2"}});
3489 TEST_P(WritePreparedTransactionTest
, IteratorRefreshNotSupported
) {
3490 Iterator
* iter
= db
->NewIterator(ReadOptions());
3491 ASSERT_OK(iter
->status());
3492 ASSERT_TRUE(iter
->Refresh().IsNotSupported());
3496 // Committing a delayed prepared has two non-atomic steps: update commit cache,
3497 // remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
3498 // non-atomic steps of checking these two data structures. This test breaks each
3499 // in the middle to ensure correctness in spite of non-atomic execution.
3500 // Note: This test is limited to the case where snapshot is larger than the
3501 // max_evicted_seq_.
3502 TEST_P(WritePreparedTransactionTest
, NonAtomicCommitOfDelayedPrepared
) {
3503 const size_t snapshot_cache_bits
= 7; // same as default
3504 const size_t commit_cache_bits
= 3; // 8 entries
3505 for (auto split_read
: {true, false}) {
3506 std::vector
<bool> split_options
= {false};
3508 // Also test for break before mutex
3509 split_options
.push_back(true);
3511 for (auto split_before_mutex
: split_options
) {
3512 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3513 ASSERT_OK(ReOpen());
3514 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3515 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
3516 // Fill up the commit cache
3517 std::string
init_value("value1");
3518 for (int i
= 0; i
< 10; i
++) {
3519 ASSERT_OK(db
->Put(WriteOptions(), Slice("key1"), Slice(init_value
)));
3521 // Prepare a transaction but do not commit it
3523 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3524 ASSERT_OK(txn
->SetName("xid"));
3525 ASSERT_OK(txn
->Put(Slice("key1"), Slice("value2")));
3526 ASSERT_OK(txn
->Prepare());
3527 // Commit a bunch of entries to advance max evicted seq and make the
3528 // prepared a delayed prepared
3529 for (int i
= 0; i
< 10; i
++) {
3530 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3532 // The snapshot should not see the delayed prepared entry
3533 auto snap
= db
->GetSnapshot();
3536 if (split_before_mutex
) {
3537 // split before acquiring prepare_mutex_
3538 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3539 {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
3540 "AtomicCommitOfDelayedPrepared:Commit:before"},
3541 {"AtomicCommitOfDelayedPrepared:Commit:after",
3542 "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
3544 // split right after reading from the commit cache
3545 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3546 {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
3547 "AtomicCommitOfDelayedPrepared:Commit:before"},
3548 {"AtomicCommitOfDelayedPrepared:Commit:after",
3549 "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
3551 } else { // split commit
3552 // split right before removing from delayed_prepared_
3553 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3554 {{"WritePreparedTxnDB::RemovePrepared:pause",
3555 "AtomicCommitOfDelayedPrepared:Read:before"},
3556 {"AtomicCommitOfDelayedPrepared:Read:after",
3557 "WritePreparedTxnDB::RemovePrepared:resume"}});
3559 SyncPoint::GetInstance()->EnableProcessing();
3561 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
3562 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
3563 ASSERT_OK(txn
->Commit());
3564 if (split_before_mutex
) {
3565 // Do bunch of inserts to evict the commit entry from the cache. This
3566 // would prevent the 2nd look into commit cache under prepare_mutex_
3567 // to see the commit entry.
3568 auto seq
= db_impl
->TEST_GetLastVisibleSequence();
3570 while (wp_db
->max_evicted_seq_
< seq
&& tries
< 50) {
3571 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3574 ASSERT_LT(tries
, 50);
3576 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
3580 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3581 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
3582 ReadOptions roptions
;
3583 roptions
.snapshot
= snap
;
3584 PinnableSlice value
;
3585 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key1", &value
);
3587 // It should not see the commit of delayed prepared
3588 ASSERT_TRUE(value
== init_value
);
3589 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
3590 db
->ReleaseSnapshot(snap
);
3594 commit_thread
.join();
3595 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3596 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3597 } // for split_before_mutex
3601 // When max evicted seq advances a prepared seq, it involves two updates: i)
3602 // adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
3603 // ::IsInSnapshot also reads these two values in a non-atomic way. This test
3604 // ensures correctness if the update occurs after ::IsInSnapshot reads
3605 // delayed_prepared_empty_ and before it reads max_evicted_seq_.
3606 // Note: this test focuses on read snapshot larger than max_evicted_seq_.
3607 TEST_P(WritePreparedTransactionTest
, NonAtomicUpdateOfDelayedPrepared
) {
3608 const size_t snapshot_cache_bits
= 7; // same as default
3609 const size_t commit_cache_bits
= 3; // 8 entries
3610 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3611 ASSERT_OK(ReOpen());
3612 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3613 // Fill up the commit cache
3614 std::string
init_value("value1");
3615 for (int i
= 0; i
< 10; i
++) {
3616 ASSERT_OK(db
->Put(WriteOptions(), Slice("key1"), Slice(init_value
)));
3618 // Prepare a transaction but do not commit it
3619 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions());
3620 ASSERT_OK(txn
->SetName("xid"));
3621 ASSERT_OK(txn
->Put(Slice("key1"), Slice("value2")));
3622 ASSERT_OK(txn
->Prepare());
3623 // Create a gap between prepare seq and snapshot seq
3624 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3625 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3626 // The snapshot should not see the delayed prepared entry
3627 auto snap
= db
->GetSnapshot();
3628 ASSERT_LT(txn
->GetId(), snap
->GetSequenceNumber());
3630 // split right after reading delayed_prepared_empty_
3631 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3632 {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
3633 "AtomicUpdateOfDelayedPrepared:before"},
3634 {"AtomicUpdateOfDelayedPrepared:after",
3635 "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
3636 SyncPoint::GetInstance()->EnableProcessing();
3638 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
3639 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
3640 // Commit a bunch of entries to advance max evicted seq and make the
3641 // prepared a delayed prepared
3643 while (wp_db
->max_evicted_seq_
< txn
->GetId() && tries
< 50) {
3644 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3647 ASSERT_LT(tries
, 50);
3648 // This is the case on which the test focuses
3649 ASSERT_LT(wp_db
->max_evicted_seq_
, snap
->GetSequenceNumber());
3650 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
3653 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3654 ReadOptions roptions
;
3655 roptions
.snapshot
= snap
;
3656 PinnableSlice value
;
3657 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key1", &value
);
3659 // It should not see the uncommitted value of delayed prepared
3660 ASSERT_TRUE(value
== init_value
);
3661 db
->ReleaseSnapshot(snap
);
3665 commit_thread
.join();
3666 ASSERT_OK(txn
->Commit());
3668 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3669 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3672 // Eviction from commit cache and update of max evicted seq are two non-atomic
3673 // steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
3674 // from commit cache are two non-atomic steps. This tests if the update occurs
3675 // after reading max_evicted_seq_ and before reading the commit cache.
3676 // Note: the test focuses on snapshot larger than max_evicted_seq_
3677 TEST_P(WritePreparedTransactionTest
, NonAtomicUpdateOfMaxEvictedSeq
) {
3678 const size_t snapshot_cache_bits
= 7; // same as default
3679 const size_t commit_cache_bits
= 3; // 8 entries
3680 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3681 ASSERT_OK(ReOpen());
3682 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3683 // Fill up the commit cache
3684 std::string
init_value("value1");
3685 std::string
last_value("value_final");
3686 for (int i
= 0; i
< 10; i
++) {
3687 ASSERT_OK(db
->Put(WriteOptions(), Slice("key1"), Slice(init_value
)));
3689 // Do an uncommitted write to prevent min_uncommitted optimization
3691 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3692 ASSERT_OK(txn1
->SetName("xid1"));
3693 ASSERT_OK(txn1
->Put(Slice("key0"), last_value
));
3694 ASSERT_OK(txn1
->Prepare());
3695 // Do a write with prepare to get the prepare seq
3696 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions());
3697 ASSERT_OK(txn
->SetName("xid"));
3698 ASSERT_OK(txn
->Put(Slice("key1"), last_value
));
3699 ASSERT_OK(txn
->Prepare());
3700 ASSERT_OK(txn
->Commit());
3701 // Create a gap between commit entry and snapshot seq
3702 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3703 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3704 // The snapshot should see the last commit
3705 auto snap
= db
->GetSnapshot();
3706 ASSERT_LE(txn
->GetId(), snap
->GetSequenceNumber());
3708 // split right after reading max_evicted_seq_
3709 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3710 {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
3711 "NonAtomicUpdateOfMaxEvictedSeq:before"},
3712 {"NonAtomicUpdateOfMaxEvictedSeq:after",
3713 "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
3714 SyncPoint::GetInstance()->EnableProcessing();
3716 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
3717 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
3718 // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
3720 while (wp_db
->max_evicted_seq_
< txn
->GetId() && tries
< 50) {
3721 ASSERT_OK(db
->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3724 ASSERT_LT(tries
, 50);
3725 // This is the case on which the test focuses
3726 ASSERT_LT(wp_db
->max_evicted_seq_
, snap
->GetSequenceNumber());
3727 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
3730 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3731 ReadOptions roptions
;
3732 roptions
.snapshot
= snap
;
3733 PinnableSlice value
;
3734 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key1", &value
);
3736 // It should see the committed value of the evicted entry
3737 ASSERT_TRUE(value
== last_value
);
3738 db
->ReleaseSnapshot(snap
);
3742 commit_thread
.join();
3744 ASSERT_OK(txn1
->Commit());
3746 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3747 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3750 // Test adding a prepared seq when max_evicted_seq_ has already advanced beyond
3751 // it. The test focuses on a race condition between the AddPrepared and
3752 // AdvanceMaxEvictedSeq functions.
3753 TEST_P(WritePreparedTransactionTest
, AddPreparedBeforeMax
) {
3754 if (!options
.two_write_queues
) {
3755 // This test is only for two write queues
3758 const size_t snapshot_cache_bits
= 7; // same as default
3759 // 1 entry to advance max after the 2nd commit
3760 const size_t commit_cache_bits
= 0;
3761 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3762 ASSERT_OK(ReOpen());
3763 WritePreparedTxnDB
* wp_db
= dynamic_cast<WritePreparedTxnDB
*>(db
);
3764 std::string
some_value("value_some");
3765 std::string
uncommitted_value("value_uncommitted");
3766 // Prepare two uncommitted transactions
3768 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3769 ASSERT_OK(txn1
->SetName("xid1"));
3770 ASSERT_OK(txn1
->Put(Slice("key1"), some_value
));
3771 ASSERT_OK(txn1
->Prepare());
3773 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3774 ASSERT_OK(txn2
->SetName("xid2"));
3775 ASSERT_OK(txn2
->Put(Slice("key2"), some_value
));
3776 ASSERT_OK(txn2
->Prepare());
3777 // Start the txn here so the other thread could get its id
3778 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), TransactionOptions());
3779 ASSERT_OK(txn
->SetName("xid"));
3780 ASSERT_OK(txn
->Put(Slice("key0"), uncommitted_value
));
3781 port::Mutex txn_mutex_
;
3783 // t1) Insert prepared entry, t2) commit other entries to advance max
3784 // evicted sec and finish checking the existing prepared entries, t1)
3785 // AddPrepared, t2) update max_evicted_seq_
3786 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3787 {"AddPreparedCallback::AddPrepared::begin:pause",
3788 "AddPreparedBeforeMax::read_thread:start"},
3789 {"AdvanceMaxEvictedSeq::update_max:pause",
3790 "AddPreparedCallback::AddPrepared::begin:resume"},
3791 {"AddPreparedCallback::AddPrepared::end",
3792 "AdvanceMaxEvictedSeq::update_max:resume"},
3794 SyncPoint::GetInstance()->EnableProcessing();
3796 ROCKSDB_NAMESPACE::port::Thread
write_thread([&]() {
3798 ASSERT_OK(txn
->Prepare());
3799 txn_mutex_
.Unlock();
3802 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3803 TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
3804 // Publish seq number with a commit
3805 ASSERT_OK(txn1
->Commit());
3806 // Since the commit cache size is one the 2nd commit evict the 1st one and
3807 // invokes AdcanceMaxEvictedSeq
3808 ASSERT_OK(txn2
->Commit());
3810 ReadOptions roptions
;
3811 PinnableSlice value
;
3812 // The snapshot should not see the uncommitted value from write_thread
3813 auto snap
= db
->GetSnapshot();
3814 ASSERT_LT(wp_db
->max_evicted_seq_
, snap
->GetSequenceNumber());
3815 // This is the scenario that we test for
3817 ASSERT_GT(wp_db
->max_evicted_seq_
, txn
->GetId());
3818 txn_mutex_
.Unlock();
3819 roptions
.snapshot
= snap
;
3820 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key0", &value
);
3821 ASSERT_TRUE(s
.IsNotFound());
3822 db
->ReleaseSnapshot(snap
);
3826 write_thread
.join();
3829 ASSERT_OK(txn
->Commit());
3831 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3832 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3835 // When an old prepared entry gets committed, there is a gap between the time
3836 // that it is published and when it is cleaned up from delayed_prepared_. This
3837 // test stresses such cases.
3838 TEST_P(WritePreparedTransactionTest
, CommitOfDelayedPrepared
) {
3839 const size_t snapshot_cache_bits
= 7; // same as default
3840 for (const size_t commit_cache_bits
: {0, 2, 3}) {
3841 for (const size_t sub_batch_cnt
: {1, 2, 3}) {
3842 UpdateTransactionDBOptions(snapshot_cache_bits
, commit_cache_bits
);
3843 ASSERT_OK(ReOpen());
3844 std::atomic
<const Snapshot
*> snap
= {nullptr};
3845 std::atomic
<SequenceNumber
> exp_prepare
= {0};
3846 ROCKSDB_NAMESPACE::port::Thread callback_thread
;
3847 // Value is synchronized via snap
3848 PinnableSlice value
;
3849 // Take a snapshot after publish and before RemovePrepared:Start
3850 auto snap_callback
= [&]() {
3851 ASSERT_EQ(nullptr, snap
.load());
3852 snap
.store(db
->GetSnapshot());
3853 ReadOptions roptions
;
3854 roptions
.snapshot
= snap
.load();
3855 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key2", &value
);
3858 auto callback
= [&](void* param
) {
3859 SequenceNumber prep_seq
= *((SequenceNumber
*)param
);
3860 if (prep_seq
== exp_prepare
.load()) { // only for write_thread
3861 // We need to spawn a thread to avoid deadlock since getting a
3862 // snpashot might end up calling AdvanceSeqByOne which needs joining
3864 callback_thread
= ROCKSDB_NAMESPACE::port::Thread(snap_callback
);
3865 TEST_SYNC_POINT("callback:end");
3868 // Wait for the first snapshot be taken in GetSnapshotInternal. Although
3869 // it might be updated before GetSnapshotInternal finishes but this should
3870 // cover most of the cases.
3871 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3872 {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
3874 SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback
);
3875 SyncPoint::GetInstance()->EnableProcessing();
3876 // Thread to cause frequent evictions
3877 ROCKSDB_NAMESPACE::port::Thread
eviction_thread([&]() {
3878 // Too many txns might cause commit_seq - prepare_seq in another thread
3879 // to go beyond DELTA_UPPERBOUND
3880 for (int i
= 0; i
< 25 * (1 << commit_cache_bits
); i
++) {
3881 ASSERT_OK(db
->Put(WriteOptions(), Slice("key1"), Slice("value1")));
3884 ROCKSDB_NAMESPACE::port::Thread
write_thread([&]() {
3885 for (int i
= 0; i
< 25 * (1 << commit_cache_bits
); i
++) {
3887 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3888 ASSERT_OK(txn
->SetName("xid"));
3889 std::string val_str
= "value" + std::to_string(i
);
3890 for (size_t b
= 0; b
< sub_batch_cnt
; b
++) {
3891 ASSERT_OK(txn
->Put(Slice("key2"), val_str
));
3893 ASSERT_OK(txn
->Prepare());
3894 // Let an eviction to kick in
3895 std::this_thread::yield();
3897 exp_prepare
.store(txn
->GetId());
3898 ASSERT_OK(txn
->Commit());
3900 // Wait for the snapshot taking that is triggered by
3901 // RemovePrepared:Start callback
3902 callback_thread
.join();
3904 // Read with the snapshot taken before delayed_prepared_ cleanup
3905 ReadOptions roptions
;
3906 roptions
.snapshot
= snap
.load();
3907 ASSERT_NE(nullptr, roptions
.snapshot
);
3908 PinnableSlice value2
;
3910 db
->Get(roptions
, db
->DefaultColumnFamily(), "key2", &value2
);
3912 // It should see its own write
3913 ASSERT_TRUE(val_str
== value2
);
3914 // The value read by snapshot should not change
3915 ASSERT_STREQ(value2
.ToString().c_str(), value
.ToString().c_str());
3917 db
->ReleaseSnapshot(roptions
.snapshot
);
3918 snap
.store(nullptr);
3921 write_thread
.join();
3922 eviction_thread
.join();
3923 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3924 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3929 // Test that updating the commit map will not affect the existing snapshots
3930 TEST_P(WritePreparedTransactionTest
, AtomicCommit
) {
3931 for (bool skip_prepare
: {true, false}) {
3932 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3933 {"WritePreparedTxnDB::AddCommitted:start",
3934 "AtomicCommit::GetSnapshot:start"},
3935 {"AtomicCommit::Get:end",
3936 "WritePreparedTxnDB::AddCommitted:start:pause"},
3937 {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
3938 {"AtomicCommit::Get2:end",
3939 "WritePreparedTxnDB::AddCommitted:end:pause:"},
3941 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3942 ROCKSDB_NAMESPACE::port::Thread
write_thread([&]() {
3944 ASSERT_OK(db
->Put(WriteOptions(), Slice("key"), Slice("value")));
3947 db
->BeginTransaction(WriteOptions(), TransactionOptions());
3948 ASSERT_OK(txn
->SetName("xid"));
3949 ASSERT_OK(txn
->Put(Slice("key"), Slice("value")));
3950 ASSERT_OK(txn
->Prepare());
3951 ASSERT_OK(txn
->Commit());
3955 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
3956 ReadOptions roptions
;
3957 TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
3958 roptions
.snapshot
= db
->GetSnapshot();
3960 auto s
= db
->Get(roptions
, db
->DefaultColumnFamily(), "key", &val
);
3961 TEST_SYNC_POINT("AtomicCommit::Get:end");
3962 TEST_SYNC_POINT("AtomicCommit::Get2:start");
3963 ASSERT_SAME(roptions
, db
, s
, val
, "key");
3964 TEST_SYNC_POINT("AtomicCommit::Get2:end");
3965 db
->ReleaseSnapshot(roptions
.snapshot
);
3968 write_thread
.join();
3969 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3973 TEST_P(WritePreparedTransactionTest
, BasicRollbackDeletionTypeCb
) {
3974 options
.level0_file_num_compaction_trigger
= 2;
3975 // Always use SingleDelete to rollback Put.
3976 txn_db_options
.rollback_deletion_type_callback
=
3977 [](TransactionDB
*, ColumnFamilyHandle
*, const Slice
&) { return true; };
3979 const auto write_to_db
= [&]() {
3981 std::unique_ptr
<Transaction
> txn0(
3982 db
->BeginTransaction(WriteOptions(), TransactionOptions()));
3983 ASSERT_OK(txn0
->SetName("txn0"));
3984 ASSERT_OK(txn0
->Put("a", "v0"));
3985 ASSERT_OK(txn0
->Prepare());
3987 // Generate sst1: [PUT('a')]
3988 ASSERT_OK(db
->Flush(FlushOptions()));
3991 CompactRangeOptions cro
;
3992 cro
.change_level
= true;
3993 cro
.target_level
= options
.num_levels
- 1;
3994 cro
.bottommost_level_compaction
= BottommostLevelCompaction::kForce
;
3995 ASSERT_OK(db
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
3998 ASSERT_OK(txn0
->Rollback());
4001 ASSERT_OK(db
->Put(WriteOptions(), "a", "v1"));
4003 ASSERT_OK(db
->SingleDelete(WriteOptions(), "a"));
4004 // Generate another SST with a SD to cover the oldest PUT('a')
4005 ASSERT_OK(db
->Flush(FlushOptions()));
4007 auto* dbimpl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
4009 ASSERT_OK(dbimpl
->TEST_WaitForCompact());
4012 CompactRangeOptions cro
;
4013 cro
.bottommost_level_compaction
= BottommostLevelCompaction::kForce
;
4014 ASSERT_OK(db
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
4019 const Status s
= db
->Get(ReadOptions(), "a", &value
);
4020 ASSERT_TRUE(s
.IsNotFound());
4024 // Destroy and reopen
4025 ASSERT_OK(ReOpen());
4029 // Test that we can change write policy from WriteCommitted to WritePrepared
4030 // after a clean shutdown (which would empty the WAL)
4031 TEST_P(WritePreparedTransactionTest
, WP_WC_DBBackwardCompatibility
) {
4032 bool empty_wal
= true;
4033 CrossCompatibilityTest(WRITE_COMMITTED
, WRITE_PREPARED
, empty_wal
);
4036 // Test that we fail fast if WAL is not emptied between changing the write
4037 // policy from WriteCommitted to WritePrepared
4038 TEST_P(WritePreparedTransactionTest
, WP_WC_WALBackwardIncompatibility
) {
4039 bool empty_wal
= true;
4040 CrossCompatibilityTest(WRITE_COMMITTED
, WRITE_PREPARED
, !empty_wal
);
4043 // Test that we can change write policy from WritePrepared back to
4044 // WriteCommitted after a clean shutdown (which would empty the WAL)
4045 TEST_P(WritePreparedTransactionTest
, WC_WP_ForwardCompatibility
) {
4046 bool empty_wal
= true;
4047 CrossCompatibilityTest(WRITE_PREPARED
, WRITE_COMMITTED
, empty_wal
);
4050 // Test that we fail fast if WAL is not emptied between changing the write
4051 // policy from WritePrepared to WriteCommitted
4052 TEST_P(WritePreparedTransactionTest
, WC_WP_WALForwardIncompatibility
) {
4053 bool empty_wal
= true;
4054 CrossCompatibilityTest(WRITE_PREPARED
, WRITE_COMMITTED
, !empty_wal
);
4057 } // namespace ROCKSDB_NAMESPACE
4059 int main(int argc
, char** argv
) {
4060 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
4061 ::testing::InitGoogleTest(&argc
, argv
);
4062 if (getenv("CIRCLECI")) {
4063 // Looking for backtrace on "Resource temporarily unavailable" exceptions
4064 ::testing::FLAGS_gtest_catch_exceptions
= false;
4066 return RUN_ALL_TESTS();
4072 int main(int /*argc*/, char** /*argv*/) {
4074 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
4078 #endif // ROCKSDB_LITE