1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/transaction_test.h"
15 #include "db/db_impl/db_impl.h"
16 #include "port/port.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/options.h"
19 #include "rocksdb/perf_context.h"
20 #include "rocksdb/utilities/transaction.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "table/mock_table.h"
23 #include "test_util/sync_point.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "test_util/transaction_test_util.h"
27 #include "util/random.h"
28 #include "util/string_util.h"
29 #include "utilities/fault_injection_env.h"
30 #include "utilities/merge_operators.h"
31 #include "utilities/merge_operators/string_append/stringappend.h"
32 #include "utilities/transactions/pessimistic_transaction_db.h"
36 namespace ROCKSDB_NAMESPACE
{
// Registers the "DBAsBaseDB" parameter set for TransactionTest. Every tuple
// below has `false` as its first element and varies the second boolean, the
// write policy (WRITE_COMMITTED / WRITE_PREPARED / WRITE_UNPREPARED) and the
// write ordering; kUnorderedWrite appears only with WRITE_PREPARED.
// NOTE(review): the meaning of the two booleans is defined by the fixture,
// which is not visible here -- confirm against transaction_test.h.
38 INSTANTIATE_TEST_CASE_P(
39 DBAsBaseDB
, TransactionTest
,
41 std::make_tuple(false, false, WRITE_COMMITTED
, kOrderedWrite
),
42 std::make_tuple(false, true, WRITE_COMMITTED
, kOrderedWrite
),
43 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
),
44 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
),
45 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
),
46 std::make_tuple(false, false, WRITE_UNPREPARED
, kOrderedWrite
),
47 std::make_tuple(false, true, WRITE_UNPREPARED
, kOrderedWrite
)));
// Registers the "DBAsBaseDB" parameter set for TransactionStressTest. The
// tuples listed here are the same seven combinations used for the
// DBAsBaseDB/TransactionTest instantiation in this file.
48 INSTANTIATE_TEST_CASE_P(
49 DBAsBaseDB
, TransactionStressTest
,
51 std::make_tuple(false, false, WRITE_COMMITTED
, kOrderedWrite
),
52 std::make_tuple(false, true, WRITE_COMMITTED
, kOrderedWrite
),
53 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
),
54 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
),
55 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
),
56 std::make_tuple(false, false, WRITE_UNPREPARED
, kOrderedWrite
),
57 std::make_tuple(false, true, WRITE_UNPREPARED
, kOrderedWrite
)));
// Registers the "StackableDBAsBaseDB" parameter set for TransactionTest: one
// tuple per write policy, each with both booleans set to `true` and ordered
// writes.
// NOTE(review): presumably the first boolean selects a StackableDB as the
// base DB (per the instantiation name) -- confirm against the fixture.
58 INSTANTIATE_TEST_CASE_P(
59 StackableDBAsBaseDB
, TransactionTest
,
61 std::make_tuple(true, true, WRITE_COMMITTED
, kOrderedWrite
),
62 std::make_tuple(true, true, WRITE_PREPARED
, kOrderedWrite
),
63 std::make_tuple(true, true, WRITE_UNPREPARED
, kOrderedWrite
)));
65 // MySQLStyleTransactionTest takes far too long for valgrind to run.
66 #ifndef ROCKSDB_VALGRIND_RUN
// Registers the parameter set for MySQLStyleTransactionTest. Unlike the
// TransactionTest instantiations in this file, each tuple carries a fifth,
// boolean element. With WRITE_COMMITTED it is always `false`; with
// WRITE_PREPARED and WRITE_UNPREPARED both values are listed.
// NOTE(review): the meaning of the fifth element is defined by the fixture,
// which is not visible here -- confirm before relying on it.
67 INSTANTIATE_TEST_CASE_P(
68 MySQLStyleTransactionTest
, MySQLStyleTransactionTest
,
70 std::make_tuple(false, false, WRITE_COMMITTED
, kOrderedWrite
, false),
71 std::make_tuple(false, true, WRITE_COMMITTED
, kOrderedWrite
, false),
72 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, false),
73 std::make_tuple(false, false, WRITE_PREPARED
, kOrderedWrite
, true),
74 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, false),
75 std::make_tuple(false, true, WRITE_PREPARED
, kOrderedWrite
, true),
76 std::make_tuple(false, false, WRITE_UNPREPARED
, kOrderedWrite
, false),
77 std::make_tuple(false, false, WRITE_UNPREPARED
, kOrderedWrite
, true),
78 std::make_tuple(false, true, WRITE_UNPREPARED
, kOrderedWrite
, false),
79 std::make_tuple(false, true, WRITE_UNPREPARED
, kOrderedWrite
, true),
80 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, false),
81 std::make_tuple(false, true, WRITE_PREPARED
, kUnorderedWrite
, true)));
82 #endif // ROCKSDB_VALGRIND_RUN
// Writes the same batch twice with synced, WAL-enabled write options, then
// prepares and commits a named transaction that contains no writes. Finally
// prepares a transaction holding a single Put, calls TEST_Crash(), reopens
// the DB, looks the transaction up by name and commits it.
// NOTE(review): `batch` is declared on a line not visible here; presumably it
// is an empty WriteBatch, as the test name suggests.
84 TEST_P(TransactionTest
, DoubleEmptyWrite
) {
85 WriteOptions write_options
;
86 write_options
.sync
= true;
87 write_options
.disableWAL
= false;
91 ASSERT_OK(db
->Write(write_options
, &batch
));
92 ASSERT_OK(db
->Write(write_options
, &batch
));
94 // Also test committing empty transactions in 2PC
95 TransactionOptions txn_options
;
96 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
97 ASSERT_OK(txn0
->SetName("xid"));
98 ASSERT_OK(txn0
->Prepare());
99 ASSERT_OK(txn0
->Commit());
102 // Also test that it works during recovery
103 txn0
= db
->BeginTransaction(write_options
, txn_options
);
104 ASSERT_OK(txn0
->SetName("xid2"));
105 txn0
->Put(Slice("foo0"), Slice("bar0a"));
106 ASSERT_OK(txn0
->Prepare());
108 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
109 ASSERT_OK(ReOpenNoDelete());
110 assert(db
!= nullptr);
111 txn0
= db
->GetTransactionByName("xid2");
112 ASSERT_OK(txn0
->Commit());
// Happy-path transaction: seeds "foo" and "foo2" directly in the DB, begins a
// transaction, reads "foo" with GetForUpdate, overwrites it inside the
// transaction, checks that the transaction sees its own write and that
// GetNumPuts() goes from 0 to 1, then commits and checks that the new value
// is visible through db->Get().
116 TEST_P(TransactionTest
, SuccessTest
) {
117 ASSERT_OK(db
->ResetStats());
119 WriteOptions write_options
;
120 ReadOptions read_options
;
123 ASSERT_OK(db
->Put(write_options
, Slice("foo"), Slice("bar")));
124 ASSERT_OK(db
->Put(write_options
, Slice("foo2"), Slice("bar")));
126 Transaction
* txn
= db
->BeginTransaction(write_options
, TransactionOptions());
129 ASSERT_EQ(0, txn
->GetNumPuts());
130 ASSERT_LE(0, txn
->GetID());
132 ASSERT_OK(txn
->GetForUpdate(read_options
, "foo", &value
));
133 ASSERT_EQ(value
, "bar");
135 ASSERT_OK(txn
->Put(Slice("foo"), Slice("bar2")));
137 ASSERT_EQ(1, txn
->GetNumPuts());
139 ASSERT_OK(txn
->GetForUpdate(read_options
, "foo", &value
));
140 ASSERT_EQ(value
, "bar2");
142 ASSERT_OK(txn
->Commit());
144 ASSERT_OK(db
->Get(read_options
, "foo", &value
));
145 ASSERT_EQ(value
, "bar2");
150 // The test clarifies the contract of do_validate and assume_tracked
151 // in GetForUpdate and Put/Merge/Delete
// Flow visible here: a value for "foo" is committed directly to the DB while
// `txn` is open; a plain exclusive GetForUpdate and plain Puts on "foo" are
// expected to return Busy, a GetForUpdate with !DO_VALIDATE reads the newly
// committed value and still holds the lock (a concurrent db->Put times out),
// and the write calls that pass ASSUME_LOCKED are expected to succeed.
// NOTE(review): the snapshot setup and several trailing call arguments are on
// lines not visible here.
152 TEST_P(TransactionTest
, AssumeExclusiveTracked
) {
153 WriteOptions write_options
;
154 ReadOptions read_options
;
157 TransactionOptions txn_options
;
158 txn_options
.lock_timeout
= 1;
159 const bool EXCLUSIVE
= true;
160 const bool DO_VALIDATE
= true;
161 const bool ASSUME_LOCKED
= true;
163 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
167 // commit a value after the snapshot is taken
168 ASSERT_OK(db
->Put(write_options
, Slice("foo"), Slice("bar")));
170 // By default the write should fail due to the commit after our snapshot
171 s
= txn
->GetForUpdate(read_options
, "foo", &value
, EXCLUSIVE
);
172 ASSERT_TRUE(s
.IsBusy());
173 // But the user could direct the db to skip validating the snapshot. The read
174 // value then should be the most recently committed
176 txn
->GetForUpdate(read_options
, "foo", &value
, EXCLUSIVE
, !DO_VALIDATE
));
177 ASSERT_EQ(value
, "bar");
179 // Although ValidateSnapshot is skipped the key must have still got locked
180 s
= db
->Put(write_options
, Slice("foo"), Slice("bar"));
181 ASSERT_TRUE(s
.IsTimedOut());
183 // By default the write operations should fail due to the commit after the
185 s
= txn
->Put(Slice("foo"), Slice("bar1"));
186 ASSERT_TRUE(s
.IsBusy());
187 s
= txn
->Put(db
->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
189 ASSERT_TRUE(s
.IsBusy());
190 // But the user could direct the db that it already assumes exclusive lock on
191 // the key due to the previous GetForUpdate call.
192 ASSERT_OK(txn
->Put(db
->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
194 ASSERT_OK(txn
->Merge(db
->DefaultColumnFamily(), Slice("foo"), Slice("bar2"),
197 txn
->Delete(db
->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED
));
198 ASSERT_OK(txn
->SingleDelete(db
->DefaultColumnFamily(), Slice("foo"),
201 ASSERT_OK(txn
->Rollback());
205 // This test clarifies the contract of ValidateSnapshot
// Flow visible here: txn1 writes "foo", is named and prepared; the memtable is
// flushed and further put+flush rounds are issued so that the flushed
// memtable is not kept in memory; a second transaction is begun; txn1 is
// committed; then ValidateSnapshot() on "foo" through the second transaction
// is expected to return Busy.
// NOTE(review): both loops currently iterate over {true} only. The
// declarations of txn1/txn2/options and any with_flush / with_2pc
// conditionals are on lines not visible here.
206 TEST_P(TransactionTest
, ValidateSnapshotTest
) {
207 for (bool with_flush
: {true}) {
208 for (bool with_2pc
: {true}) {
210 WriteOptions write_options
;
211 ReadOptions read_options
;
214 assert(db
!= nullptr);
216 db
->BeginTransaction(write_options
, TransactionOptions());
218 ASSERT_OK(txn1
->Put(Slice("foo"), Slice("bar1")));
220 ASSERT_OK(txn1
->SetName("xid1"));
221 ASSERT_OK(txn1
->Prepare());
225 auto db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
226 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
227 // Make sure the flushed memtable is not kept in memory
228 int max_memtable_in_history
=
230 options
.max_write_buffer_number
,
231 static_cast<int>(options
.max_write_buffer_size_to_maintain
) /
232 static_cast<int>(options
.write_buffer_size
)) +
234 for (int i
= 0; i
< max_memtable_in_history
; i
++) {
235 ASSERT_OK(db
->Put(write_options
, Slice("key"), Slice("value")));
236 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
241 db
->BeginTransaction(write_options
, TransactionOptions());
245 ASSERT_OK(txn1
->Commit());
248 auto pes_txn2
= dynamic_cast<PessimisticTransaction
*>(txn2
);
249 // Test the simple case where the key is not tracked yet
250 auto trakced_seq
= kMaxSequenceNumber
;
251 auto s
= pes_txn2
->ValidateSnapshot(db
->DefaultColumnFamily(), "foo",
253 ASSERT_TRUE(s
.IsBusy());
// Checks lock-wait reporting. txn1 locks "foo" in both the default column
// family and a second one ("CFA"); GetLockStatusData() must then list two
// entries, each for key "foo" held by txn1. txn2 then requests "foo" with a
// lock_timeout of 1 and is expected to time out. While txn2 waits, a sync
// point callback asserts that GetWaitingTxns() reports txn1's id, key "foo"
// and column family 0. The perf context must show no lock waits for txn1 and
// exactly one for txn2.
259 TEST_P(TransactionTest
, WaitingTxn
) {
260 WriteOptions write_options
;
261 ReadOptions read_options
;
262 TransactionOptions txn_options
;
266 txn_options
.lock_timeout
= 1;
267 s
= db
->Put(write_options
, Slice("foo"), Slice("bar"));
270 /* create second cf */
271 ColumnFamilyHandle
* cfa
;
272 ColumnFamilyOptions cf_options
;
273 s
= db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
275 s
= db
->Put(write_options
, cfa
, Slice("foo"), Slice("bar"));
278 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
279 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
280 TransactionID id1
= txn1
->GetID();
284 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
285 "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
288 std::vector
<TransactionID
> wait
= txn2
->GetWaitingTxns(&cf_id
, &key
);
289 ASSERT_EQ(key
, "foo");
290 ASSERT_EQ(wait
.size(), 1);
291 ASSERT_EQ(wait
[0], id1
);
292 ASSERT_EQ(cf_id
, 0U);
295 get_perf_context()->Reset();
296 // lock key in default cf
297 s
= txn1
->GetForUpdate(read_options
, "foo", &value
);
299 ASSERT_EQ(value
, "bar");
300 ASSERT_EQ(get_perf_context()->key_lock_wait_count
, 0);
303 s
= txn1
->GetForUpdate(read_options
, cfa
, "foo", &value
);
305 ASSERT_EQ(value
, "bar");
306 ASSERT_EQ(get_perf_context()->key_lock_wait_count
, 0);
308 auto lock_data
= db
->GetLockStatusData();
309 // Locked keys exist in both column families.
310 ASSERT_EQ(lock_data
.size(), 2);
312 auto cf_iterator
= lock_data
.begin();
314 // The iterator points to an unordered_multimap
315 // thus the test can not assume any particular order.
317 // Column family is 1 or 0 (cfa).
318 if (cf_iterator
->first
!= 1 && cf_iterator
->first
!= 0) {
321 // The locked key is "foo" and is locked by txn1
322 ASSERT_EQ(cf_iterator
->second
.key
, "foo");
323 ASSERT_EQ(cf_iterator
->second
.ids
.size(), 1);
324 ASSERT_EQ(cf_iterator
->second
.ids
[0], txn1
->GetID());
328 // Column family is 0 (default) or 1.
329 if (cf_iterator
->first
!= 1 && cf_iterator
->first
!= 0) {
332 // The locked key is "foo" and is locked by txn1
333 ASSERT_EQ(cf_iterator
->second
.key
, "foo");
334 ASSERT_EQ(cf_iterator
->second
.ids
.size(), 1);
335 ASSERT_EQ(cf_iterator
->second
.ids
[0], txn1
->GetID());
337 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
339 s
= txn2
->GetForUpdate(read_options
, "foo", &value
);
340 ASSERT_TRUE(s
.IsTimedOut());
341 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
342 ASSERT_EQ(get_perf_context()->key_lock_wait_count
, 1);
343 ASSERT_GE(get_perf_context()->key_lock_wait_time
, 0);
345 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
346 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
// Exercises shared vs. exclusive key locks on "foo" with a lock_timeout of 1.
// Scenarios, in order: three transactions sharing the lock (checked through
// GetLockStatusData()); a third transaction requesting the key without the
// shared flag while two others share it; a sharer trying to upgrade its own
// lock; an exclusive holder re-requesting the key as shared ("downgrading")
// while another transaction still times out; and a shared request against an
// exclusive holder before and after UndoGetForUpdate().
// NOTE(review): GetForUpdate calls that omit the fourth argument are
// presumably exclusive by default; the status checks after several calls are
// on lines not visible here.
353 TEST_P(TransactionTest
, SharedLocks
) {
354 WriteOptions write_options
;
355 ReadOptions read_options
;
356 TransactionOptions txn_options
;
359 txn_options
.lock_timeout
= 1;
360 s
= db
->Put(write_options
, Slice("foo"), Slice("bar"));
363 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
364 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
365 Transaction
* txn3
= db
->BeginTransaction(write_options
, txn_options
);
370 // Test shared access between txns
371 s
= txn1
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
374 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
377 s
= txn3
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
380 auto lock_data
= db
->GetLockStatusData();
381 ASSERT_EQ(lock_data
.size(), 1);
383 auto cf_iterator
= lock_data
.begin();
384 ASSERT_EQ(cf_iterator
->second
.key
, "foo");
386 // We compare whether the set of txns locking this key is the same. To do
387 // this, we need to sort both vectors so that the comparison is done
389 std::vector
<TransactionID
> expected_txns
= {txn1
->GetID(), txn2
->GetID(),
391 std::vector
<TransactionID
> lock_txns
= cf_iterator
->second
.ids
;
392 ASSERT_EQ(expected_txns
, lock_txns
);
393 ASSERT_FALSE(cf_iterator
->second
.exclusive
);
395 ASSERT_OK(txn1
->Rollback());
396 ASSERT_OK(txn2
->Rollback());
397 ASSERT_OK(txn3
->Rollback());
399 // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
400 s
= txn1
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
403 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
406 s
= txn3
->GetForUpdate(read_options
, "foo", nullptr);
407 ASSERT_TRUE(s
.IsTimedOut());
408 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
410 txn1
->UndoGetForUpdate("foo");
411 s
= txn3
->GetForUpdate(read_options
, "foo", nullptr);
412 ASSERT_TRUE(s
.IsTimedOut());
413 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
415 txn2
->UndoGetForUpdate("foo");
416 s
= txn3
->GetForUpdate(read_options
, "foo", nullptr);
419 ASSERT_OK(txn1
->Rollback());
420 ASSERT_OK(txn2
->Rollback());
421 ASSERT_OK(txn3
->Rollback());
423 // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
424 s
= txn1
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
427 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
430 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr);
431 ASSERT_TRUE(s
.IsTimedOut());
432 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
434 txn1
->UndoGetForUpdate("foo");
435 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr);
438 ASSERT_OK(txn1
->Rollback());
439 ASSERT_OK(txn2
->Rollback());
441 // Test txn1 trying to downgrade its lock.
442 s
= txn1
->GetForUpdate(read_options
, "foo", nullptr, true /* exclusive */);
445 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
446 ASSERT_TRUE(s
.IsTimedOut());
447 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
449 // Should still fail after "downgrading".
450 s
= txn1
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
453 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
454 ASSERT_TRUE(s
.IsTimedOut());
455 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
457 ASSERT_OK(txn1
->Rollback());
458 ASSERT_OK(txn2
->Rollback());
460 // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
462 s
= txn1
->GetForUpdate(read_options
, "foo", nullptr);
465 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
466 ASSERT_TRUE(s
.IsTimedOut());
467 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
469 txn1
->UndoGetForUpdate("foo");
470 s
= txn2
->GetForUpdate(read_options
, "foo", nullptr, false /* exclusive */);
// Deadlock detection with shared locks. 31 transactions take shared locks
// (txns[i] locks key ToString((i + 1) / 2)); txns[0..14] then block in
// background threads requesting key ToString(i + 1) exclusively. Once all 15
// are waiting, each of txns[15..30] requests key "0" exclusively and must get
// a Deadlock status. After every such request the deadlock info buffer is
// checked: its size, the path length, non-increasing deadlock_time across
// entries, and the txn id / cf id / waiting key / exclusive flag of each path
// node. The test then resizes the buffer (3, 5, 0, 3) and checks the reported
// sizes and contents. It ends with a two-transaction cycle closed by a
// shared-lock request, checking the exclusive flags recorded on the path.
478 TEST_P(TransactionTest
, DeadlockCycleShared
) {
479 WriteOptions write_options
;
480 ReadOptions read_options
;
481 TransactionOptions txn_options
;
483 txn_options
.lock_timeout
= 1000000;
484 txn_options
.deadlock_detect
= true;
486 // Set up a wait for chain like this:
492 // T1 -> T2 -> T4 ...
496 // up to T31, then T[16 - 31] -> T1.
497 // Note that Tn holds lock on floor(n / 2).
499 std::vector
<Transaction
*> txns(31);
501 for (uint32_t i
= 0; i
< 31; i
++) {
502 txns
[i
] = db
->BeginTransaction(write_options
, txn_options
);
503 ASSERT_TRUE(txns
[i
]);
504 auto s
= txns
[i
]->GetForUpdate(read_options
, ToString((i
+ 1) / 2), nullptr,
505 false /* exclusive */);
509 std::atomic
<uint32_t> checkpoints(0);
510 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
511 "PointLockManager::AcquireWithTimeout:WaitingTxn",
512 [&](void* /*arg*/) { checkpoints
.fetch_add(1); });
513 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
515 // We want the leaf transactions to block and hold everyone back.
516 std::vector
<port::Thread
> threads
;
517 for (uint32_t i
= 0; i
< 15; i
++) {
518 std::function
<void()> blocking_thread
= [&, i
] {
519 auto s
= txns
[i
]->GetForUpdate(read_options
, ToString(i
+ 1), nullptr,
520 true /* exclusive */);
522 ASSERT_OK(txns
[i
]->Rollback());
525 threads
.emplace_back(blocking_thread
);
528 // Wait until all threads are waiting on each other.
529 while (checkpoints
.load() != 15) {
531 std::this_thread::sleep_for(std::chrono::milliseconds(100));
533 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
534 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
536 // Complete the cycle T[16 - 31] -> T1
537 for (uint32_t i
= 15; i
< 31; i
++) {
539 txns
[i
]->GetForUpdate(read_options
, "0", nullptr, true /* exclusive */);
540 ASSERT_TRUE(s
.IsDeadlock());
542 // Calculate next buffer len, plateau at 5 when 5 records are inserted.
543 const uint32_t curr_dlock_buffer_len_
=
544 (i
- 14 > kInitialMaxDeadlocks
) ? kInitialMaxDeadlocks
: (i
- 14);
546 auto dlock_buffer
= db
->GetDeadlockInfoBuffer();
547 ASSERT_EQ(dlock_buffer
.size(), curr_dlock_buffer_len_
);
548 auto dlock_entry
= dlock_buffer
[0].path
;
549 ASSERT_EQ(dlock_entry
.size(), kInitialMaxDeadlocks
);
550 int64_t pre_deadlock_time
= dlock_buffer
[0].deadlock_time
;
551 int64_t cur_deadlock_time
= 0;
552 for (auto const& dl_path_rec
: dlock_buffer
) {
553 cur_deadlock_time
= dl_path_rec
.deadlock_time
;
554 ASSERT_NE(cur_deadlock_time
, 0);
555 ASSERT_TRUE(cur_deadlock_time
<= pre_deadlock_time
);
556 pre_deadlock_time
= cur_deadlock_time
;
559 int64_t curr_waiting_key
= 0;
561 // Offset of each txn id from the root of the shared dlock tree's txn id.
562 int64_t offset_root
= dlock_entry
[0].m_txn_id
- 1;
563 // Offset of the final entry in the dlock path from the root's txn id.
564 TransactionID leaf_id
=
565 dlock_entry
[dlock_entry
.size() - 1].m_txn_id
- offset_root
;
567 for (auto it
= dlock_entry
.rbegin(); it
!= dlock_entry
.rend(); ++it
) {
569 ASSERT_EQ(dl_node
.m_txn_id
, offset_root
+ leaf_id
);
570 ASSERT_EQ(dl_node
.m_cf_id
, 0U);
571 ASSERT_EQ(dl_node
.m_waiting_key
, ToString(curr_waiting_key
));
572 ASSERT_EQ(dl_node
.m_exclusive
, true);
574 if (curr_waiting_key
== 0) {
575 curr_waiting_key
= leaf_id
;
577 curr_waiting_key
/= 2;
582 // Rollback the leaf transaction.
583 for (uint32_t i
= 15; i
< 31; i
++) {
584 ASSERT_OK(txns
[i
]->Rollback());
588 for (auto& t
: threads
) {
592 // Downsize the buffer and verify the 3 latest deadlocks are preserved.
593 auto dlock_buffer_before_resize
= db
->GetDeadlockInfoBuffer();
594 db
->SetDeadlockInfoBufferSize(3);
595 auto dlock_buffer_after_resize
= db
->GetDeadlockInfoBuffer();
596 ASSERT_EQ(dlock_buffer_after_resize
.size(), 3);
598 for (uint32_t i
= 0; i
< dlock_buffer_after_resize
.size(); i
++) {
599 for (uint32_t j
= 0; j
< dlock_buffer_after_resize
[i
].path
.size(); j
++) {
600 ASSERT_EQ(dlock_buffer_after_resize
[i
].path
[j
].m_txn_id
,
601 dlock_buffer_before_resize
[i
].path
[j
].m_txn_id
);
605 // Upsize the buffer and verify the 3 latest deadlocks are preserved.
606 dlock_buffer_before_resize
= db
->GetDeadlockInfoBuffer();
607 db
->SetDeadlockInfoBufferSize(5);
608 dlock_buffer_after_resize
= db
->GetDeadlockInfoBuffer();
609 ASSERT_EQ(dlock_buffer_after_resize
.size(), 3);
611 for (uint32_t i
= 0; i
< dlock_buffer_before_resize
.size(); i
++) {
612 for (uint32_t j
= 0; j
< dlock_buffer_before_resize
[i
].path
.size(); j
++) {
613 ASSERT_EQ(dlock_buffer_after_resize
[i
].path
[j
].m_txn_id
,
614 dlock_buffer_before_resize
[i
].path
[j
].m_txn_id
);
618 // Downsize to 0 and verify the size is consistent.
619 dlock_buffer_before_resize
= db
->GetDeadlockInfoBuffer();
620 db
->SetDeadlockInfoBufferSize(0);
621 dlock_buffer_after_resize
= db
->GetDeadlockInfoBuffer();
622 ASSERT_EQ(dlock_buffer_after_resize
.size(), 0);
624 // Upsize from 0 to verify the size is persistent.
625 dlock_buffer_before_resize
= db
->GetDeadlockInfoBuffer();
626 db
->SetDeadlockInfoBufferSize(3);
627 dlock_buffer_after_resize
= db
->GetDeadlockInfoBuffer();
628 ASSERT_EQ(dlock_buffer_after_resize
.size(), 0);
630 // Contrived case of shared lock of cycle size 2 to verify that a shared
631 // lock causing a deadlock is correctly reported as "shared" in the buffer.
632 std::vector
<Transaction
*> txns_shared(2);
634 // Create a cycle of size 2.
635 for (uint32_t i
= 0; i
< 2; i
++) {
636 txns_shared
[i
] = db
->BeginTransaction(write_options
, txn_options
);
637 ASSERT_TRUE(txns_shared
[i
]);
638 auto s
= txns_shared
[i
]->GetForUpdate(read_options
, ToString(i
), nullptr);
642 std::atomic
<uint32_t> checkpoints_shared(0);
643 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
644 "PointLockManager::AcquireWithTimeout:WaitingTxn",
645 [&](void* /*arg*/) { checkpoints_shared
.fetch_add(1); });
646 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
648 std::vector
<port::Thread
> threads_shared
;
649 for (uint32_t i
= 0; i
< 1; i
++) {
650 std::function
<void()> blocking_thread
= [&, i
] {
652 txns_shared
[i
]->GetForUpdate(read_options
, ToString(i
+ 1), nullptr);
654 ASSERT_OK(txns_shared
[i
]->Rollback());
655 delete txns_shared
[i
];
657 threads_shared
.emplace_back(blocking_thread
);
660 // Wait until all threads are waiting on each other.
661 while (checkpoints_shared
.load() != 1) {
663 std::this_thread::sleep_for(std::chrono::milliseconds(100));
665 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
666 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
668 // Complete the cycle T2 -> T1 with a shared lock.
669 auto s
= txns_shared
[1]->GetForUpdate(read_options
, "0", nullptr, false);
670 ASSERT_TRUE(s
.IsDeadlock());
672 auto dlock_buffer
= db
->GetDeadlockInfoBuffer();
674 // Verify the size of the buffer and the single path.
675 ASSERT_EQ(dlock_buffer
.size(), 1);
676 ASSERT_EQ(dlock_buffer
[0].path
.size(), 2);
678 // Verify the exclusivity field of the transactions in the deadlock path.
679 ASSERT_TRUE(dlock_buffer
[0].path
[0].m_exclusive
);
680 ASSERT_FALSE(dlock_buffer
[0].path
[1].m_exclusive
);
681 ASSERT_OK(txns_shared
[1]->Rollback());
682 delete txns_shared
[1];
684 for (auto& t
: threads_shared
) {
689 #ifndef ROCKSDB_VALGRIND_RUN
// Deadlock detection on a simple cycle, repeated for every cycle length from
// 2 up to (but excluding) kMaxCycleLength. For each length, `len`
// transactions each lock their own key ToString(i); all but the last then
// block in background threads requesting the next key. The last transaction
// finally requests key "0", which must return a Deadlock status. The test
// then inspects the deadlock info buffer: its size (capped at 5), the length
// of the newest path, the limit_exceeded flag, non-increasing deadlock_time
// across entries, and the per-node fields along the path.
690 TEST_P(TransactionStressTest
, DeadlockCycle
) {
691 WriteOptions write_options
;
692 ReadOptions read_options
;
693 TransactionOptions txn_options
;
695 // offset by 2 from the max depth to test edge case
696 const uint32_t kMaxCycleLength
= 52;
698 txn_options
.lock_timeout
= 1000000;
699 txn_options
.deadlock_detect
= true;
701 for (uint32_t len
= 2; len
< kMaxCycleLength
; len
++) {
702 // Set up a long wait for chain like this:
704 // T1 -> T2 -> T3 -> ... -> Tlen
706 std::vector
<Transaction
*> txns(len
);
708 for (uint32_t i
= 0; i
< len
; i
++) {
709 txns
[i
] = db
->BeginTransaction(write_options
, txn_options
);
710 ASSERT_TRUE(txns
[i
]);
711 auto s
= txns
[i
]->GetForUpdate(read_options
, ToString(i
), nullptr);
715 std::atomic
<uint32_t> checkpoints(0);
716 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
717 "PointLockManager::AcquireWithTimeout:WaitingTxn",
718 [&](void* /*arg*/) { checkpoints
.fetch_add(1); });
719 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
721 // We want the last transaction in the chain to block and hold everyone
723 std::vector
<port::Thread
> threads
;
724 for (uint32_t i
= 0; i
+ 1 < len
; i
++) {
725 std::function
<void()> blocking_thread
= [&, i
] {
726 auto s
= txns
[i
]->GetForUpdate(read_options
, ToString(i
+ 1), nullptr);
728 ASSERT_OK(txns
[i
]->Rollback());
731 threads
.emplace_back(blocking_thread
);
734 // Wait until all threads are waiting on each other.
735 while (checkpoints
.load() != len
- 1) {
737 std::this_thread::sleep_for(std::chrono::milliseconds(100));
739 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
740 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
742 // Complete the cycle Tlen -> T1
743 auto s
= txns
[len
- 1]->GetForUpdate(read_options
, "0", nullptr);
744 ASSERT_TRUE(s
.IsDeadlock());
746 const uint32_t dlock_buffer_size_
= (len
- 1 > 5) ? 5 : (len
- 1);
747 uint32_t curr_waiting_key
= 0;
748 TransactionID curr_txn_id
= txns
[0]->GetID();
750 auto dlock_buffer
= db
->GetDeadlockInfoBuffer();
751 ASSERT_EQ(dlock_buffer
.size(), dlock_buffer_size_
);
752 uint32_t check_len
= len
;
753 bool check_limit_flag
= false;
755 // Special case for a deadlock path that exceeds the maximum depth.
758 check_limit_flag
= true;
760 auto dlock_entry
= dlock_buffer
[0].path
;
761 ASSERT_EQ(dlock_entry
.size(), check_len
);
762 ASSERT_EQ(dlock_buffer
[0].limit_exceeded
, check_limit_flag
);
764 int64_t pre_deadlock_time
= dlock_buffer
[0].deadlock_time
;
765 int64_t cur_deadlock_time
= 0;
766 for (auto const& dl_path_rec
: dlock_buffer
) {
767 cur_deadlock_time
= dl_path_rec
.deadlock_time
;
768 ASSERT_NE(cur_deadlock_time
, 0);
769 ASSERT_TRUE(cur_deadlock_time
<= pre_deadlock_time
);
770 pre_deadlock_time
= cur_deadlock_time
;
773 // Iterates backwards over path verifying decreasing txn_ids.
774 for (auto it
= dlock_entry
.rbegin(); it
!= dlock_entry
.rend(); ++it
) {
776 ASSERT_EQ(dl_node
.m_txn_id
, len
+ curr_txn_id
- 1);
777 ASSERT_EQ(dl_node
.m_cf_id
, 0u);
778 ASSERT_EQ(dl_node
.m_waiting_key
, ToString(curr_waiting_key
));
779 ASSERT_EQ(dl_node
.m_exclusive
, true);
782 if (curr_waiting_key
== 0) {
783 curr_waiting_key
= len
;
788 // Rollback the last transaction.
789 ASSERT_OK(txns
[len
- 1]->Rollback());
790 delete txns
[len
- 1];
792 for (auto& t
: threads
) {
// Stress test for deadlock detection. Seeds NUM_KEYS keys, then starts
// NUM_TXN_THREADS threads. Each thread runs NUM_ITERS transactions that lock
// every key in a freshly shuffled order; a key is locked exclusively when the
// transaction id is a multiple of 4 and shared otherwise. A failed lock
// request is expected to be a Deadlock, after which the transaction is rolled
// back. Each thread's shuffle engine is seeded from a Random that is itself
// seeded with a hash of the main thread's id.
798 TEST_P(TransactionStressTest
, DeadlockStress
) {
799 const uint32_t NUM_TXN_THREADS
= 10;
800 const uint32_t NUM_KEYS
= 100;
801 const uint32_t NUM_ITERS
= 10000;
803 WriteOptions write_options
;
804 ReadOptions read_options
;
805 TransactionOptions txn_options
;
807 txn_options
.lock_timeout
= 1000000;
808 txn_options
.deadlock_detect
= true;
809 std::vector
<std::string
> keys
;
811 for (uint32_t i
= 0; i
< NUM_KEYS
; i
++) {
812 ASSERT_OK(db
->Put(write_options
, Slice(ToString(i
)), Slice("")));
813 keys
.push_back(ToString(i
));
816 size_t tid
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
817 Random
rnd(static_cast<uint32_t>(tid
));
818 std::function
<void(uint32_t)> stress_thread
= [&](uint32_t seed
) {
819 std::default_random_engine
g(seed
);
822 for (uint32_t i
= 0; i
< NUM_ITERS
; i
++) {
823 txn
= db
->BeginTransaction(write_options
, txn_options
);
824 auto random_keys
= keys
;
825 std::shuffle(random_keys
.begin(), random_keys
.end(), g
);
827 // Lock keys in random order.
828 for (const auto& k
: random_keys
) {
829 // Lock mostly for shared access, but exclusive 1/4 of the time.
831 txn
->GetForUpdate(read_options
, k
, nullptr, txn
->GetID() % 4 == 0);
833 ASSERT_TRUE(s
.IsDeadlock());
834 ASSERT_OK(txn
->Rollback());
843 std::vector
<port::Thread
> threads
;
844 for (uint32_t i
= 0; i
< NUM_TXN_THREADS
; i
++) {
845 threads
.emplace_back(stress_thread
, rnd
.Next());
848 for (auto& t
: threads
) {
852 #endif // ROCKSDB_VALGRIND_RUN
// Adds an entry to the transaction's commit-time write batch, performs a
// regular Put, and then expects a later operation to be rejected with
// InvalidArgument because the commit-time batch is not empty.
// NOTE(review): the call that produces the failing status is on a line not
// visible here -- confirm which operation it is.
854 TEST_P(TransactionTest
, CommitTimeBatchFailTest
) {
855 WriteOptions write_options
;
856 TransactionOptions txn_options
;
861 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
864 ASSERT_OK(txn1
->GetCommitTimeWriteBatch()->Put("cat", "dog"));
866 s
= txn1
->Put("foo", "bar");
869 // fails due to non-empty commit-time batch
871 ASSERT_EQ(s
, Status::InvalidArgument());
// Checks that the bookkeeping of logs containing prepared data does not leak.
// With a 1024-byte write buffer, 100 named transactions are prepared; after
// each Prepare() the minimum log containing outstanding prepared data must be
// non-zero. Once the transactions collected in `txns` are committed, that
// minimum must be back to 0 and the related DBImpl test counters
// (TEST_PreparedSectionCompletedSize, TEST_LogsWithPrepSize) must be 0.
// NOTE(review): the conditions deciding which transactions are committed
// inside the loop versus deferred into `txns` are on lines not visible here.
876 TEST_P(TransactionTest
, LogMarkLeakTest
) {
877 TransactionOptions txn_options
;
878 WriteOptions write_options
;
879 options
.write_buffer_size
= 1024;
880 ASSERT_OK(ReOpenNoDelete());
881 assert(db
!= nullptr);
883 std::vector
<Transaction
*> txns
;
884 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
885 // At the beginning there should be no log containing prepare data
886 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
887 for (size_t i
= 0; i
< 100; i
++) {
888 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
889 ASSERT_OK(txn
->SetName("xid" + ToString(i
)));
890 ASSERT_OK(txn
->Put(Slice("foo" + ToString(i
)), Slice("bar")));
891 ASSERT_OK(txn
->Prepare());
892 ASSERT_GT(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
896 ASSERT_OK(txn
->Commit());
899 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
901 for (auto txn
: txns
) {
902 ASSERT_OK(txn
->Commit());
905 // At the end there should be no log left containing prepare data
906 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
907 // Make sure that the underlying data structures are properly truncated and
909 ASSERT_EQ(db_impl
->TEST_PreparedSectionCompletedSize(), 0);
910 ASSERT_EQ(db_impl
->TEST_LogsWithPrepSize(), 0);
913 TEST_P(TransactionTest
, SimpleTwoPhaseTransactionTest
) {
914 for (bool cwb4recovery
: {true, false}) {
916 WriteOptions write_options
;
917 ReadOptions read_options
;
919 TransactionOptions txn_options
;
920 txn_options
.use_only_the_last_commit_time_batch_for_recovery
= cwb4recovery
;
925 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
927 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
928 s
= txn
->SetName("xid");
931 ASSERT_EQ(db
->GetTransactionByName("xid"), txn
);
934 s
= txn
->Put(Slice("foo"), Slice("bar"));
936 ASSERT_EQ(1, txn
->GetNumPuts());
939 s
= db
->Put(write_options
, Slice("foo2"), Slice("bar2"));
941 ASSERT_EQ(1, txn
->GetNumPuts());
944 ASSERT_OK(db
->Get(read_options
, "foo2", &value
));
945 ASSERT_EQ(value
, "bar2");
949 txn
->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")));
951 txn
->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")));
953 // nothing has been prepped yet
954 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
959 // data not im mem yet
960 s
= db
->Get(read_options
, Slice("foo"), &value
);
961 ASSERT_TRUE(s
.IsNotFound());
962 s
= db
->Get(read_options
, Slice("gtid"), &value
);
963 ASSERT_TRUE(s
.IsNotFound());
965 // find trans in list of prepared transactions
966 std::vector
<Transaction
*> prepared_trans
;
967 db
->GetAllPreparedTransactions(&prepared_trans
);
968 ASSERT_EQ(prepared_trans
.size(), 1);
969 ASSERT_EQ(prepared_trans
.front()->GetName(), "xid");
971 auto log_containing_prep
=
972 db_impl
->TEST_FindMinLogContainingOutstandingPrep();
973 ASSERT_GT(log_containing_prep
, 0);
979 // value is now available
980 s
= db
->Get(read_options
, "foo", &value
);
982 ASSERT_EQ(value
, "bar");
985 s
= db
->Get(read_options
, "gtid", &value
);
987 ASSERT_EQ(value
, "dogs");
989 s
= db
->Get(read_options
, "gtid2", &value
);
991 ASSERT_EQ(value
, "cats");
994 // we already committed
996 ASSERT_EQ(s
, Status::InvalidArgument());
998 // no longer is prepared results
999 db
->GetAllPreparedTransactions(&prepared_trans
);
1000 ASSERT_EQ(prepared_trans
.size(), 0);
1001 ASSERT_EQ(db
->GetTransactionByName("xid"), nullptr);
1003 // heap should not care about prepared section anymore
1004 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
1006 switch (txn_db_options
.write_policy
) {
1007 case WRITE_COMMITTED
:
1008 // but now our memtable should be referencing the prep section
1009 ASSERT_GE(log_containing_prep
, db_impl
->MinLogNumberToKeep());
1010 ASSERT_EQ(log_containing_prep
,
1011 db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1013 case WRITE_PREPARED
:
1014 case WRITE_UNPREPARED
:
1015 // In these modes memtables do not reference the prep sections
1016 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1022 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
1023 // After flush the recoverable state must be visible
1025 s
= db
->Get(read_options
, "gtid", &value
);
1027 ASSERT_EQ(value
, "dogs");
1029 s
= db
->Get(read_options
, "gtid2", &value
);
1031 ASSERT_EQ(value
, "cats");
1034 // after memtable flush we can now release the log
1035 ASSERT_GT(db_impl
->MinLogNumberToKeep(), log_containing_prep
);
1036 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1041 // kill and reopen to trigger recovery
1042 s
= ReOpenNoDelete();
1044 assert(db
!= nullptr);
1045 s
= db
->Get(read_options
, "gtid", &value
);
1047 ASSERT_EQ(value
, "dogs");
1049 s
= db
->Get(read_options
, "gtid2", &value
);
1051 ASSERT_EQ(value
, "cats");
1056 TEST_P(TransactionTest
, TwoPhaseNameTest
) {
1059 WriteOptions write_options
;
1060 TransactionOptions txn_options
;
1061 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1062 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
1063 Transaction
* txn3
= db
->BeginTransaction(write_options
, txn_options
);
1067 // can't prepare txn without name
1068 s
= txn1
->Prepare();
1069 ASSERT_EQ(s
, Status::InvalidArgument());
1072 s
= txn1
->SetName("");
1073 ASSERT_EQ(s
, Status::InvalidArgument());
1076 s
= txn1
->SetName(std::string(513, 'x'));
1077 ASSERT_EQ(s
, Status::InvalidArgument());
1080 s
= txn1
->SetName("name1");
1083 // can't have duplicate name
1084 s
= txn2
->SetName("name1");
1085 ASSERT_EQ(s
, Status::InvalidArgument());
1087 // shouldn't be able to prepare
1088 s
= txn2
->Prepare();
1089 ASSERT_EQ(s
, Status::InvalidArgument());
1092 s
= txn2
->SetName("name2");
1096 s
= txn2
->SetName("name3");
1097 ASSERT_EQ(s
, Status::InvalidArgument());
1099 ASSERT_EQ(txn1
->GetName(), "name1");
1100 ASSERT_EQ(txn2
->GetName(), "name2");
1102 s
= txn1
->Prepare();
1105 // can't rename after prepare
1106 s
= txn1
->SetName("name4");
1107 ASSERT_EQ(s
, Status::InvalidArgument());
1109 ASSERT_OK(txn1
->Rollback());
1110 ASSERT_OK(txn2
->Rollback());
1115 TEST_P(TransactionTest
, TwoPhaseEmptyWriteTest
) {
1116 for (bool cwb4recovery
: {true, false}) {
1117 for (bool test_with_empty_wal
: {true, false}) {
1118 if (!cwb4recovery
&& test_with_empty_wal
) {
1121 ASSERT_OK(ReOpen());
1125 WriteOptions write_options
;
1126 ReadOptions read_options
;
1127 TransactionOptions txn_options
;
1128 txn_options
.use_only_the_last_commit_time_batch_for_recovery
=
1130 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1132 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
1135 s
= txn1
->SetName("joe");
1138 s
= txn2
->SetName("bob");
1141 s
= txn1
->Prepare();
1150 txn2
->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")));
1152 s
= txn2
->Prepare();
1159 if (!cwb4recovery
) {
1160 s
= db
->Get(read_options
, "foo", &value
);
1162 ASSERT_EQ(value
, "bar");
1164 if (test_with_empty_wal
) {
1165 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1166 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
1167 // After flush the state must be visible
1168 s
= db
->Get(read_options
, "foo", &value
);
1170 ASSERT_EQ(value
, "bar");
1172 ASSERT_OK(db
->FlushWAL(true));
1173 // kill and reopen to trigger recovery
1174 s
= ReOpenNoDelete();
1176 assert(db
!= nullptr);
1177 s
= db
->Get(read_options
, "foo", &value
);
1179 ASSERT_EQ(value
, "bar");
1185 #ifndef ROCKSDB_VALGRIND_RUN
1186 TEST_P(TransactionStressTest
, TwoPhaseExpirationTest
) {
1189 WriteOptions write_options
;
1190 TransactionOptions txn_options
;
1191 txn_options
.expiration
= 500; // 500ms
1192 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
1193 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
1197 s
= txn1
->SetName("joe");
1199 s
= txn2
->SetName("bob");
1202 s
= txn1
->Prepare();
1205 /* sleep override */
1206 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1211 s
= txn2
->Prepare();
1212 ASSERT_EQ(s
, Status::Expired());
1218 TEST_P(TransactionTest
, TwoPhaseRollbackTest
) {
1219 WriteOptions write_options
;
1220 ReadOptions read_options
;
1222 TransactionOptions txn_options
;
1227 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1228 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1229 s
= txn
->SetName("xid");
1233 s
= txn
->Put(Slice("tfoo"), Slice("tbar"));
1236 // value is readable from txn
1237 s
= txn
->Get(read_options
, Slice("tfoo"), &value
);
1239 ASSERT_EQ(value
, "tbar");
1242 s
= txn
->Rollback();
1245 // value is no longer readable
1246 s
= txn
->Get(read_options
, Slice("tfoo"), &value
);
1247 ASSERT_TRUE(s
.IsNotFound());
1248 ASSERT_EQ(txn
->GetNumPuts(), 0);
1250 // put new txn values
1251 s
= txn
->Put(Slice("tfoo2"), Slice("tbar2"));
1254 // new value is readable from txn
1255 s
= txn
->Get(read_options
, Slice("tfoo2"), &value
);
1257 ASSERT_EQ(value
, "tbar2");
1262 // flush to next wal
1263 s
= db
->Put(write_options
, Slice("foo"), Slice("bar"));
1265 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
1267 // issue rollback (marker written to WAL)
1268 s
= txn
->Rollback();
1271 // value is no longer readable
1272 s
= txn
->Get(read_options
, Slice("tfoo2"), &value
);
1273 ASSERT_TRUE(s
.IsNotFound());
1274 ASSERT_EQ(txn
->GetNumPuts(), 0);
1278 ASSERT_EQ(s
, Status::InvalidArgument());
1280 // try rollback again
1281 s
= txn
->Rollback();
1282 ASSERT_EQ(s
, Status::InvalidArgument());
1287 TEST_P(TransactionTest
, PersistentTwoPhaseTransactionTest
) {
1288 WriteOptions write_options
;
1289 write_options
.sync
= true;
1290 write_options
.disableWAL
= false;
1291 ReadOptions read_options
;
1293 TransactionOptions txn_options
;
1298 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1300 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1301 s
= txn
->SetName("xid");
1304 ASSERT_EQ(db
->GetTransactionByName("xid"), txn
);
1307 s
= txn
->Put(Slice("foo"), Slice("bar"));
1309 ASSERT_EQ(1, txn
->GetNumPuts());
1312 s
= txn
->Get(read_options
, "foo", &value
);
1314 ASSERT_EQ(value
, "bar");
1317 s
= db
->Put(write_options
, Slice("foo2"), Slice("bar2"));
1319 ASSERT_EQ(1, txn
->GetNumPuts());
1321 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
1324 db
->Get(read_options
, "foo2", &value
);
1325 ASSERT_EQ(value
, "bar2");
1327 // nothing has been prepped yet
1328 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
1334 // still not available to db
1335 s
= db
->Get(read_options
, Slice("foo"), &value
);
1336 ASSERT_TRUE(s
.IsNotFound());
1338 ASSERT_OK(db
->FlushWAL(false));
1341 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
1342 s
= ReOpenNoDelete();
1344 assert(db
!= nullptr);
1345 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1347 // find trans in list of prepared transactions
1348 std::vector
<Transaction
*> prepared_trans
;
1349 db
->GetAllPreparedTransactions(&prepared_trans
);
1350 ASSERT_EQ(prepared_trans
.size(), 1);
1352 txn
= prepared_trans
.front();
1354 ASSERT_EQ(txn
->GetName(), "xid");
1355 ASSERT_EQ(db
->GetTransactionByName("xid"), txn
);
1357 // log has been marked
1358 auto log_containing_prep
=
1359 db_impl
->TEST_FindMinLogContainingOutstandingPrep();
1360 ASSERT_GT(log_containing_prep
, 0);
1362 // value is readable from txn
1363 s
= txn
->Get(read_options
, "foo", &value
);
1365 ASSERT_EQ(value
, "bar");
1371 // value is now available
1372 db
->Get(read_options
, "foo", &value
);
1373 ASSERT_EQ(value
, "bar");
1375 // we already committed
1377 ASSERT_EQ(s
, Status::InvalidArgument());
1379 // no longer in prepared results
1380 prepared_trans
.clear();
1381 db
->GetAllPreparedTransactions(&prepared_trans
);
1382 ASSERT_EQ(prepared_trans
.size(), 0);
1384 // transaction should no longer be visible
1385 ASSERT_EQ(db
->GetTransactionByName("xid"), nullptr);
1387 // heap should not care about prepared section anymore
1388 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
1390 switch (txn_db_options
.write_policy
) {
1391 case WRITE_COMMITTED
:
1392 // but now our memtable should be referencing the prep section
1393 ASSERT_EQ(log_containing_prep
,
1394 db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1395 ASSERT_GE(log_containing_prep
, db_impl
->MinLogNumberToKeep());
1398 case WRITE_PREPARED
:
1399 case WRITE_UNPREPARED
:
1400 // In these modes memtables do not reference the prep sections
1401 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1407 // Add a dummy record to memtable before a flush. Otherwise, the
1408 // memtable will be empty and flush will be skipped.
1409 s
= db
->Put(write_options
, Slice("foo3"), Slice("bar3"));
1412 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
1414 // after memtable flush we can now release the log
1415 ASSERT_GT(db_impl
->MinLogNumberToKeep(), log_containing_prep
);
1416 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1420 // deleting transaction should unregister transaction
1421 ASSERT_EQ(db
->GetTransactionByName("xid"), nullptr);
1423 #endif // ROCKSDB_VALGRIND_RUN
1425 // TODO this test needs to be updated with serial commits
1426 TEST_P(TransactionTest
, DISABLED_TwoPhaseMultiThreadTest
) {
1427 // mix transaction writes and regular writes
1428 const uint32_t NUM_TXN_THREADS
= 50;
1429 std::atomic
<uint32_t> txn_thread_num(0);
1431 std::function
<void()> txn_write_thread
= [&]() {
1432 uint32_t id
= txn_thread_num
.fetch_add(1);
1434 WriteOptions write_options
;
1435 write_options
.sync
= true;
1436 write_options
.disableWAL
= false;
1437 TransactionOptions txn_options
;
1438 txn_options
.lock_timeout
= 1000000;
1440 txn_options
.expiration
= 1000000;
1442 TransactionName
name("xid_" + std::string(1, 'A' + static_cast<char>(id
)));
1443 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1444 ASSERT_OK(txn
->SetName(name
));
1445 for (int i
= 0; i
< 10; i
++) {
1446 std::string
key(name
+ "_" + std::string(1, static_cast<char>('A' + i
)));
1447 ASSERT_OK(txn
->Put(key
, "val"));
1449 ASSERT_OK(txn
->Prepare());
1450 ASSERT_OK(txn
->Commit());
1454 // ensure that all threads are in the same write group
1455 std::atomic
<uint32_t> t_wait_on_prepare(0);
1456 std::atomic
<uint32_t> t_wait_on_commit(0);
1458 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1459 "WriteThread::JoinBatchGroup:Wait", [&](void* arg
) {
1460 auto* writer
= reinterpret_cast<WriteThread::Writer
*>(arg
);
1462 if (writer
->ShouldWriteToWAL()) {
1463 t_wait_on_prepare
.fetch_add(1);
1465 while (t_wait_on_prepare
.load() < NUM_TXN_THREADS
) {
1466 env
->SleepForMicroseconds(10);
1468 } else if (writer
->ShouldWriteToMemtable()) {
1469 t_wait_on_commit
.fetch_add(1);
1471 while (t_wait_on_commit
.load() < NUM_TXN_THREADS
) {
1472 env
->SleepForMicroseconds(10);
1479 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1481 // do all the writes
1482 std::vector
<port::Thread
> threads
;
1483 for (uint32_t i
= 0; i
< NUM_TXN_THREADS
; i
++) {
1484 threads
.emplace_back(txn_write_thread
);
1486 for (auto& t
: threads
) {
1490 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1491 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1493 ReadOptions read_options
;
1496 for (uint32_t t
= 0; t
< NUM_TXN_THREADS
; t
++) {
1497 TransactionName
name("xid_" + std::string(1, 'A' + static_cast<char>(t
)));
1498 for (int i
= 0; i
< 10; i
++) {
1499 std::string
key(name
+ "_" + std::string(1, static_cast<char>('A' + i
)));
1500 s
= db
->Get(read_options
, key
, &value
);
1502 ASSERT_EQ(value
, "val");
1507 TEST_P(TransactionStressTest
, TwoPhaseLongPrepareTest
) {
1508 WriteOptions write_options
;
1509 write_options
.sync
= true;
1510 write_options
.disableWAL
= false;
1511 ReadOptions read_options
;
1512 TransactionOptions txn_options
;
1517 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1518 s
= txn
->SetName("bob");
1522 s
= txn
->Put(Slice("foo"), Slice("bar"));
1531 for (int i
= 0; i
< 1000; i
++) {
1532 std::string
key(i
, 'k');
1533 std::string
val(1000, 'v');
1534 assert(db
!= nullptr);
1535 s
= db
->Put(write_options
, key
, val
);
1540 env
->SetFilesystemActive(false);
1541 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
1543 } else if (i
% 37 == 0) {
1550 txn
= db
->GetTransactionByName("bob");
1555 // verify txn data
1556 s
= db
->Get(read_options
, "foo", &value
);
1557 ASSERT_EQ(s
, Status::OK());
1558 ASSERT_EQ(value
, "bar");
1560 // verify non txn data
1561 for (int i
= 0; i
< 1000; i
++) {
1562 std::string
key(i
, 'k');
1563 std::string
val(1000, 'v');
1564 s
= db
->Get(read_options
, key
, &value
);
1565 ASSERT_EQ(s
, Status::OK());
1566 ASSERT_EQ(value
, val
);
1572 TEST_P(TransactionTest
, TwoPhaseSequenceTest
) {
1573 WriteOptions write_options
;
1574 write_options
.sync
= true;
1575 write_options
.disableWAL
= false;
1576 ReadOptions read_options
;
1578 TransactionOptions txn_options
;
1583 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1584 s
= txn
->SetName("xid");
1588 s
= txn
->Put(Slice("foo"), Slice("bar"));
1590 s
= txn
->Put(Slice("foo2"), Slice("bar2"));
1592 s
= txn
->Put(Slice("foo3"), Slice("bar3"));
1594 s
= txn
->Put(Slice("foo4"), Slice("bar4"));
1608 env
->SetFilesystemActive(false);
1610 assert(db
!= nullptr);
1612 // value is now available
1613 s
= db
->Get(read_options
, "foo4", &value
);
1614 ASSERT_EQ(s
, Status::OK());
1615 ASSERT_EQ(value
, "bar4");
1618 TEST_P(TransactionTest
, TwoPhaseDoubleRecoveryTest
) {
1619 WriteOptions write_options
;
1620 write_options
.sync
= true;
1621 write_options
.disableWAL
= false;
1622 ReadOptions read_options
;
1624 TransactionOptions txn_options
;
1629 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
1630 s
= txn
->SetName("a");
1634 s
= txn
->Put(Slice("foo"), Slice("bar"));
1644 env
->SetFilesystemActive(false);
1645 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
1649 assert(db
!= nullptr); // Make clang analyze happy.
1650 txn
= db
->GetTransactionByName("a");
1651 assert(txn
!= nullptr);
1655 s
= db
->Get(read_options
, "foo", &value
);
1656 ASSERT_EQ(s
, Status::OK());
1657 ASSERT_EQ(value
, "bar");
1661 txn
= db
->BeginTransaction(write_options
, txn_options
);
1662 s
= txn
->SetName("b");
1665 s
= txn
->Put(Slice("foo2"), Slice("bar2"));
1677 env
->SetFilesystemActive(false);
1678 ASSERT_OK(ReOpenNoDelete());
1679 assert(db
!= nullptr);
1681 // value is now available
1682 s
= db
->Get(read_options
, "foo", &value
);
1683 ASSERT_EQ(s
, Status::OK());
1684 ASSERT_EQ(value
, "bar");
1686 s
= db
->Get(read_options
, "foo2", &value
);
1687 ASSERT_EQ(s
, Status::OK());
1688 ASSERT_EQ(value
, "bar2");
1691 TEST_P(TransactionTest
, TwoPhaseLogRollingTest
) {
1692 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1696 ColumnFamilyHandle
*cfa
, *cfb
;
1698 // Create 2 new column families
1699 ColumnFamilyOptions cf_options
;
1700 s
= db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
1702 s
= db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
1706 wopts
.disableWAL
= false;
1709 TransactionOptions topts1
;
1710 Transaction
* txn1
= db
->BeginTransaction(wopts
, topts1
);
1711 s
= txn1
->SetName("xid1");
1714 TransactionOptions topts2
;
1715 Transaction
* txn2
= db
->BeginTransaction(wopts
, topts2
);
1716 s
= txn2
->SetName("xid2");
1719 // transaction put in two column families
1720 s
= txn1
->Put(cfa
, "ka1", "va1");
1723 // transaction put in two column families
1724 s
= txn2
->Put(cfa
, "ka2", "va2");
1726 s
= txn2
->Put(cfb
, "kb2", "vb2");
1729 // write prep section to wal
1730 s
= txn1
->Prepare();
1733 // our log should be in the heap
1734 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
1735 txn1
->GetLogNumber());
1736 ASSERT_EQ(db_impl
->TEST_LogfileNumber(), txn1
->GetLastLogNumber());
1738 // flush default cf to create new log
1739 s
= db
->Put(wopts
, "foo", "bar");
1741 s
= db_impl
->TEST_FlushMemTable(true);
1744 // make sure we are on a new log
1745 ASSERT_GT(db_impl
->TEST_LogfileNumber(), txn1
->GetLastLogNumber());
1747 // put txn2 prep section in this log
1748 s
= txn2
->Prepare();
1750 ASSERT_EQ(db_impl
->TEST_LogfileNumber(), txn2
->GetLastLogNumber());
1752 // heap should still see first log
1753 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
1754 txn1
->GetLogNumber());
1760 // heap should now show txn2's log
1761 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
1762 txn2
->GetLogNumber());
1764 switch (txn_db_options
.write_policy
) {
1765 case WRITE_COMMITTED
:
1766 // we should see txn1's log referenced by the memtables
1767 ASSERT_EQ(txn1
->GetLogNumber(),
1768 db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1770 case WRITE_PREPARED
:
1771 case WRITE_UNPREPARED
:
1772 // In these modes memtables do not reference the prep sections
1773 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1779 // flush default cf to create new log
1780 s
= db
->Put(wopts
, "foo", "bar2");
1782 s
= db_impl
->TEST_FlushMemTable(true);
1785 // make sure we are on a new log
1786 ASSERT_GT(db_impl
->TEST_LogfileNumber(), txn2
->GetLastLogNumber());
1792 // heap should not show any logs
1793 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
1795 switch (txn_db_options
.write_policy
) {
1796 case WRITE_COMMITTED
:
1797 // should show the first txn log
1798 ASSERT_EQ(txn1
->GetLogNumber(),
1799 db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1801 case WRITE_PREPARED
:
1802 case WRITE_UNPREPARED
:
1803 // In these modes memtables do not reference the prep sections
1804 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1810 // flush only cfa memtable
1811 s
= db_impl
->TEST_FlushMemTable(true, false, cfa
);
1814 switch (txn_db_options
.write_policy
) {
1815 case WRITE_COMMITTED
:
1816 // should show the second txn's log
1817 ASSERT_EQ(txn2
->GetLogNumber(),
1818 db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1820 case WRITE_PREPARED
:
1821 case WRITE_UNPREPARED
:
1822 // In these modes memtables do not reference the prep sections
1823 ASSERT_EQ(0, db_impl
->TEST_FindMinPrepLogReferencedByMemTable());
1829 // flush only cfb memtable
1830 s
= db_impl
->TEST_FlushMemTable(true, false, cfb
);
1833 // should show no dependency on logs
1834 ASSERT_EQ(db_impl
->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1835 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
1843 TEST_P(TransactionTest
, TwoPhaseLogRollingTest2
) {
1844 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1847 ColumnFamilyHandle
*cfa
, *cfb
;
1849 ColumnFamilyOptions cf_options
;
1850 s
= db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
1852 s
= db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
1856 wopts
.disableWAL
= false;
1859 auto cfh_a
= static_cast_with_check
<ColumnFamilyHandleImpl
>(cfa
);
1860 auto cfh_b
= static_cast_with_check
<ColumnFamilyHandleImpl
>(cfb
);
1862 TransactionOptions topts1
;
1863 Transaction
* txn1
= db
->BeginTransaction(wopts
, topts1
);
1864 s
= txn1
->SetName("xid1");
1866 s
= txn1
->Put(cfa
, "boys", "girls1");
1869 Transaction
* txn2
= db
->BeginTransaction(wopts
, topts1
);
1870 s
= txn2
->SetName("xid2");
1872 s
= txn2
->Put(cfb
, "up", "down1");
1875 // prepare transaction in LOG A
1876 s
= txn1
->Prepare();
1879 // prepare transaction in LOG A
1880 s
= txn2
->Prepare();
1883 // regular put so that mem table can actually be flushed for log rolling
1884 s
= db
->Put(wopts
, "cats", "dogs1");
1887 auto prepare_log_no
= txn1
->GetLastLogNumber();
1890 s
= db_impl
->TEST_FlushMemTable(true);
1893 // now we pause background work so that
1894 // imm()s are not flushed before we can check their status
1895 s
= db_impl
->PauseBackgroundWork();
1898 ASSERT_GT(db_impl
->TEST_LogfileNumber(), prepare_log_no
);
1899 switch (txn_db_options
.write_policy
) {
1900 case WRITE_COMMITTED
:
1901 // This cf is empty and should ref the latest log
1902 ASSERT_GT(cfh_a
->cfd()->GetLogNumber(), prepare_log_no
);
1903 ASSERT_EQ(cfh_a
->cfd()->GetLogNumber(), db_impl
->TEST_LogfileNumber());
1905 case WRITE_PREPARED
:
1906 case WRITE_UNPREPARED
:
1907 // This cf is not flushed yet and should ref the log that has its data
1908 ASSERT_EQ(cfh_a
->cfd()->GetLogNumber(), prepare_log_no
);
1913 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
1914 txn1
->GetLogNumber());
1915 ASSERT_EQ(db_impl
->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1921 switch (txn_db_options
.write_policy
) {
1922 case WRITE_COMMITTED
:
1923 ASSERT_EQ(db_impl
->TEST_FindMinPrepLogReferencedByMemTable(),
1926 case WRITE_PREPARED
:
1927 case WRITE_UNPREPARED
:
1928 // In these modes memtables do not reference the prep sections
1929 ASSERT_EQ(db_impl
->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1935 ASSERT_TRUE(!db_impl
->TEST_UnableToReleaseOldestLog());
1937 // request a flush for all column families such that the earliest
1938 // alive log file can be killed
1939 db_impl
->TEST_SwitchWAL();
1940 // log cannot be flushed because txn2 has not been committed
1941 ASSERT_TRUE(!db_impl
->TEST_IsLogGettingFlushed());
1942 ASSERT_TRUE(db_impl
->TEST_UnableToReleaseOldestLog());
1944 // assert that cfa has a flush requested
1945 ASSERT_TRUE(cfh_a
->cfd()->imm()->HasFlushRequested());
1947 switch (txn_db_options
.write_policy
) {
1948 case WRITE_COMMITTED
:
1949 // cfb should not be flushed because it has no data from LOG A
1950 ASSERT_TRUE(!cfh_b
->cfd()->imm()->HasFlushRequested());
1952 case WRITE_PREPARED
:
1953 case WRITE_UNPREPARED
:
1954 // cfb should be flushed because it has prepared data from LOG A
1955 ASSERT_TRUE(cfh_b
->cfd()->imm()->HasFlushRequested());
1961 // cfb now has data from LOG A
1965 db_impl
->TEST_SwitchWAL();
1966 ASSERT_TRUE(!db_impl
->TEST_UnableToReleaseOldestLog());
1968 // we should see that cfb now has a flush requested
1969 ASSERT_TRUE(cfh_b
->cfd()->imm()->HasFlushRequested());
1971 // all data in LOG A resides in a memtable that has been
1972 // requested for a flush
1973 ASSERT_TRUE(db_impl
->TEST_IsLogGettingFlushed());
1981 * 1) use prepare to keep first log around to determine starting sequence
1983 * 2) insert many values, skipping wal, to increase seqid.
1984 * 3) insert final value into wal
1985 * 4) recover and see that final value was properly recovered - not
1986 * hidden behind improperly summed sequence ids
1988 TEST_P(TransactionTest
, TwoPhaseOutOfOrderDelete
) {
1989 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
1990 WriteOptions wal_on
, wal_off
;
1992 wal_on
.disableWAL
= false;
1993 wal_off
.disableWAL
= true;
1994 ReadOptions read_options
;
1995 TransactionOptions txn_options
;
2000 Transaction
* txn1
= db
->BeginTransaction(wal_on
, txn_options
);
2002 s
= txn1
->SetName("1");
2005 s
= db
->Put(wal_on
, "first", "first");
2008 s
= txn1
->Put(Slice("dummy"), Slice("dummy"));
2010 s
= txn1
->Prepare();
2013 s
= db
->Put(wal_off
, "cats", "dogs1");
2015 s
= db
->Put(wal_off
, "cats", "dogs2");
2017 s
= db
->Put(wal_off
, "cats", "dogs3");
2020 s
= db_impl
->TEST_FlushMemTable(true);
2023 s
= db
->Put(wal_on
, "cats", "dogs4");
2026 ASSERT_OK(db
->FlushWAL(false));
2029 env
->SetFilesystemActive(false);
2030 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
2031 ASSERT_OK(ReOpenNoDelete());
2032 assert(db
!= nullptr);
2034 s
= db
->Get(read_options
, "first", &value
);
2036 ASSERT_EQ(value
, "first");
2038 s
= db
->Get(read_options
, "cats", &value
);
2040 ASSERT_EQ(value
, "dogs4");
2043 TEST_P(TransactionTest
, FirstWriteTest
) {
2044 WriteOptions write_options
;
2046 // Test conflict checking against the very first write to a db.
2047 // The transaction's snapshot will have seq 1 and the following write
2048 // will have sequence 1.
2049 Status s
= db
->Put(write_options
, "A", "a");
2051 Transaction
* txn
= db
->BeginTransaction(write_options
);
2056 s
= txn
->Put("A", "b");
2062 TEST_P(TransactionTest
, FirstWriteTest2
) {
2063 WriteOptions write_options
;
2065 Transaction
* txn
= db
->BeginTransaction(write_options
);
2068 // Test conflict checking against the very first write to a db.
2069 // The transaction's snapshot is a seq 0 while the following write
2070 // will have sequence 1.
2071 Status s
= db
->Put(write_options
, "A", "a");
2074 s
= txn
->Put("A", "b");
2075 ASSERT_TRUE(s
.IsBusy());
2080 TEST_P(TransactionTest
, WriteOptionsTest
) {
2081 WriteOptions write_options
;
2082 write_options
.sync
= true;
2083 write_options
.disableWAL
= true;
2085 Transaction
* txn
= db
->BeginTransaction(write_options
);
2088 ASSERT_TRUE(txn
->GetWriteOptions()->sync
);
2090 write_options
.sync
= false;
2091 txn
->SetWriteOptions(write_options
);
2092 ASSERT_FALSE(txn
->GetWriteOptions()->sync
);
2093 ASSERT_TRUE(txn
->GetWriteOptions()->disableWAL
);
2098 TEST_P(TransactionTest
, WriteConflictTest
) {
2099 WriteOptions write_options
;
2100 ReadOptions read_options
;
2104 ASSERT_OK(db
->Put(write_options
, "foo", "A"));
2105 ASSERT_OK(db
->Put(write_options
, "foo2", "B"));
2107 Transaction
* txn
= db
->BeginTransaction(write_options
);
2110 s
= txn
->Put("foo", "A2");
2113 s
= txn
->Put("foo2", "B2");
2116 // This Put outside of a transaction will conflict with the previous write
2117 s
= db
->Put(write_options
, "foo", "xxx");
2118 ASSERT_TRUE(s
.IsTimedOut());
2120 s
= db
->Get(read_options
, "foo", &value
);
2121 ASSERT_EQ(value
, "A");
2126 db
->Get(read_options
, "foo", &value
);
2127 ASSERT_EQ(value
, "A2");
2128 db
->Get(read_options
, "foo2", &value
);
2129 ASSERT_EQ(value
, "B2");
2134 TEST_P(TransactionTest
, WriteConflictTest2
) {
2135 WriteOptions write_options
;
2136 ReadOptions read_options
;
2137 TransactionOptions txn_options
;
2141 ASSERT_OK(db
->Put(write_options
, "foo", "bar"));
2143 txn_options
.set_snapshot
= true;
2144 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
2147 // This Put outside of a transaction will conflict with a later write
2148 s
= db
->Put(write_options
, "foo", "barz");
2151 s
= txn
->Put("foo2", "X");
2155 "bar2"); // Conflicts with write done after snapshot taken
2156 ASSERT_TRUE(s
.IsBusy());
2158 s
= txn
->Put("foo3", "Y");
2161 s
= db
->Get(read_options
, "foo", &value
);
2162 ASSERT_EQ(value
, "barz");
2164 ASSERT_EQ(2, txn
->GetNumKeys());
2167 ASSERT_OK(s
); // Txn should commit, but only write foo2 and foo3
2169 // Verify that transaction wrote foo2 and foo3 but not foo
2170 db
->Get(read_options
, "foo", &value
);
2171 ASSERT_EQ(value
, "barz");
2173 db
->Get(read_options
, "foo2", &value
);
2174 ASSERT_EQ(value
, "X");
2176 db
->Get(read_options
, "foo3", &value
);
2177 ASSERT_EQ(value
, "Y");
2182 TEST_P(TransactionTest
, ReadConflictTest
) {
2183 WriteOptions write_options
;
2184 ReadOptions read_options
, snapshot_read_options
;
2185 TransactionOptions txn_options
;
2189 ASSERT_OK(db
->Put(write_options
, "foo", "bar"));
2190 ASSERT_OK(db
->Put(write_options
, "foo2", "bar"));
2192 txn_options
.set_snapshot
= true;
2193 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
2197 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2199 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "foo", &value
));
2200 ASSERT_EQ(value
, "bar");
2202 // This Put outside of a transaction will conflict with the previous read
2203 s
= db
->Put(write_options
, "foo", "barz");
2204 ASSERT_TRUE(s
.IsTimedOut());
2206 s
= db
->Get(read_options
, "foo", &value
);
2207 ASSERT_EQ(value
, "bar");
2209 s
= txn
->Get(read_options
, "foo", &value
);
2210 ASSERT_EQ(value
, "bar");
2218 TEST_P(TransactionTest
, TxnOnlyTest
) {
2219 // Test to make sure transactions work when there are no other writes in an
2222 WriteOptions write_options
;
2223 ReadOptions read_options
;
2227 Transaction
* txn
= db
->BeginTransaction(write_options
);
2230 s
= txn
->Put("x", "y");
2239 TEST_P(TransactionTest
, FlushTest
) {
2240 WriteOptions write_options
;
2241 ReadOptions read_options
, snapshot_read_options
;
2245 ASSERT_OK(db
->Put(write_options
, Slice("foo"), Slice("bar")));
2246 ASSERT_OK(db
->Put(write_options
, Slice("foo2"), Slice("bar")));
2248 Transaction
* txn
= db
->BeginTransaction(write_options
);
2251 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2253 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "foo", &value
));
2254 ASSERT_EQ(value
, "bar");
2256 s
= txn
->Put(Slice("foo"), Slice("bar2"));
2259 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "foo", &value
));
2260 ASSERT_EQ(value
, "bar2");
2262 // Put a random key so we have a memtable to flush
2263 s
= db
->Put(write_options
, "dummy", "dummy");
2266 // force a memtable flush
2267 FlushOptions flush_ops
;
2268 db
->Flush(flush_ops
);
2271 // txn should commit since the flushed table is still in MemtableList History
2274 db
->Get(read_options
, "foo", &value
);
2275 ASSERT_EQ(value
, "bar2");
2280 TEST_P(TransactionTest
, FlushTest2
) {
2281 const size_t num_tests
= 3;
2283 for (size_t n
= 0; n
< num_tests
; n
++) {
2284 // Test different table factories
2289 options
.table_factory
.reset(new mock::MockTableFactory());
2292 PlainTableOptions pt_opts
;
2293 pt_opts
.hash_table_ratio
= 0;
2294 options
.table_factory
.reset(NewPlainTableFactory(pt_opts
));
2299 Status s
= ReOpen();
2301 assert(db
!= nullptr);
2303 WriteOptions write_options
;
2304 ReadOptions read_options
, snapshot_read_options
;
2305 TransactionOptions txn_options
;
2308 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
2310 ASSERT_OK(db
->Put(write_options
, Slice("foo"), Slice("bar")));
2311 ASSERT_OK(db
->Put(write_options
, Slice("foo2"), Slice("bar2")));
2312 ASSERT_OK(db
->Put(write_options
, Slice("foo3"), Slice("bar3")));
2314 txn_options
.set_snapshot
= true;
2315 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
2318 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2320 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "foo", &value
));
2321 ASSERT_EQ(value
, "bar");
2323 s
= txn
->Put(Slice("foo"), Slice("bar2"));
2326 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "foo", &value
));
2327 ASSERT_EQ(value
, "bar2");
2328 // verify foo is locked by txn
2329 s
= db
->Delete(write_options
, "foo");
2330 ASSERT_TRUE(s
.IsTimedOut());
2332 s
= db
->Put(write_options
, "Z", "z");
2334 s
= db
->Put(write_options
, "dummy", "dummy");
2337 s
= db
->Put(write_options
, "S", "s");
2339 s
= db
->SingleDelete(write_options
, "S");
2342 s
= txn
->Delete("S");
2343 // Should fail after encountering a write to S in memtable
2344 ASSERT_TRUE(s
.IsBusy());
2346 // force a memtable flush
2347 s
= db_impl
->TEST_FlushMemTable(true);
2350 // Put a random key so we have a MemTable to flush
2351 s
= db
->Put(write_options
, "dummy", "dummy2");
2354 // force a memtable flush
2355 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
2357 s
= db
->Put(write_options
, "dummy", "dummy3");
2360 // force a memtable flush
2361 // Since our test db has max_write_buffer_number=2, this flush will cause
2362 // the first memtable to get purged from the MemtableList history.
2363 ASSERT_OK(db_impl
->TEST_FlushMemTable(true));
2365 s
= txn
->Put("X", "Y");
2366 // Should succeed after verifying there is no write to X in SST file
2369 s
= txn
->Put("Z", "zz");
2370 // Should fail after encountering a write to Z in SST file
2371 ASSERT_TRUE(s
.IsBusy());
2373 s
= txn
->GetForUpdate(read_options
, "foo2", &value
);
2374 // should succeed since key was written before txn started
2376 // verify foo2 is locked by txn
2377 s
= db
->Delete(write_options
, "foo2");
2378 ASSERT_TRUE(s
.IsTimedOut());
2380 s
= txn
->Delete("S");
2381 // Should fail after encountering a write to S in SST file
2382 ASSERT_TRUE(s
.IsBusy());
2384 // Write a bunch of keys to db to force a compaction
2386 for (int i
= 0; i
< 1000; i
++) {
2387 s
= db
->Put(write_options
, std::to_string(i
),
2388 test::CompressibleString(&rnd
, 0.8, 100, &value
));
2392 s
= txn
->Put("X", "yy");
2393 // Should succeed after verifying there is no write to X in SST file
2396 s
= txn
->Put("Z", "zzz");
2397 // Should fail after encountering a write to Z in SST file
2398 ASSERT_TRUE(s
.IsBusy());
2400 s
= txn
->Delete("S");
2401 // Should fail after encountering a write to S in SST file
2402 ASSERT_TRUE(s
.IsBusy());
2404 s
= txn
->GetForUpdate(read_options
, "foo3", &value
);
2405 // should succeed since key was written before txn started
2407 // verify foo3 is locked by txn
2408 s
= db
->Delete(write_options
, "foo3");
2409 ASSERT_TRUE(s
.IsTimedOut());
2411 ASSERT_OK(db_impl
->TEST_WaitForCompact());
2416 // Transaction should only write the keys that succeeded.
2417 s
= db
->Get(read_options
, "foo", &value
);
2418 ASSERT_EQ(value
, "bar2");
2420 s
= db
->Get(read_options
, "X", &value
);
2422 ASSERT_EQ("yy", value
);
2424 s
= db
->Get(read_options
, "Z", &value
);
2426 ASSERT_EQ("z", value
);
// Exercises a transaction that never sets a snapshot: "AAA" is rewritten
// through the DB after the transaction begins, and the transaction then locks,
// reads and overwrites that key.
// NOTE(review): this listing appears to omit some lines (e.g. the declarations
// of `value` and `s`, and the commit) -- confirm against the full file.
2432 TEST_P(TransactionTest
, NoSnapshotTest
) {
2433 WriteOptions write_options
;
2434 ReadOptions read_options
;
2438 ASSERT_OK(db
->Put(write_options
, "AAA", "bar"));
2440 Transaction
* txn
= db
->BeginTransaction(write_options
);
2443 // Modify key after transaction start
2444 ASSERT_OK(db
->Put(write_options
, "AAA", "bar1"));
2446 // Read and write without a snapshot; the read sees the latest DB value.
2447 ASSERT_OK(txn
->GetForUpdate(read_options
, "AAA", &value
));
2448 ASSERT_EQ(value
, "bar1");
2449 s
= txn
->Put("AAA", "bar2");
2452 // Should commit since read/write was done after data changed
2456 ASSERT_OK(txn
->GetForUpdate(read_options
, "AAA", &value
));
2457 ASSERT_EQ(value
, "bar2");
// Writes AAA/BBB/CCC through one transaction while the same keys are modified
// directly through the DB at different points relative to the snapshots the
// transaction reads with, then checks the values seen via the transaction and
// via the DB. The second part uses two transactions writing "ZZZ" and expects
// txn2's write to be rejected with Busy.
// NOTE(review): some lines (SetSnapshot/Commit calls, declarations) are not
// visible in this listing -- confirm against the full file.
2462 TEST_P(TransactionTest
, MultipleSnapshotTest
) {
2463 WriteOptions write_options
;
2464 ReadOptions read_options
, snapshot_read_options
;
2468 ASSERT_OK(db
->Put(write_options
, "AAA", "bar"));
2469 ASSERT_OK(db
->Put(write_options
, "BBB", "bar"));
2470 ASSERT_OK(db
->Put(write_options
, "CCC", "bar"));
2472 Transaction
* txn
= db
->BeginTransaction(write_options
);
2475 ASSERT_OK(db
->Put(write_options
, "AAA", "bar1"));
2477 // Read and write without a snapshot
2478 ASSERT_OK(txn
->GetForUpdate(read_options
, "AAA", &value
));
2479 ASSERT_EQ(value
, "bar1");
2480 s
= txn
->Put("AAA", "bar2");
2483 // Modify BBB before snapshot is taken
2484 ASSERT_OK(db
->Put(write_options
, "BBB", "bar1"));
2487 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2489 // Read and write with snapshot
2490 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "BBB", &value
));
2491 ASSERT_EQ(value
, "bar1");
2492 s
= txn
->Put("BBB", "bar2");
2495 ASSERT_OK(db
->Put(write_options
, "CCC", "bar1"));
2497 // Set a new snapshot
2499 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2501 // Read and write with snapshot
2502 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "CCC", &value
));
2503 ASSERT_EQ(value
, "bar1");
2504 s
= txn
->Put("CCC", "bar2");
// Reads through the transaction observe its own pending writes.
2507 s
= txn
->GetForUpdate(read_options
, "AAA", &value
);
2509 ASSERT_EQ(value
, "bar2");
2510 s
= txn
->GetForUpdate(read_options
, "BBB", &value
);
2512 ASSERT_EQ(value
, "bar2");
2513 s
= txn
->GetForUpdate(read_options
, "CCC", &value
);
2515 ASSERT_EQ(value
, "bar2");
// Reads through the DB still observe the values written outside the txn.
2517 s
= db
->Get(read_options
, "AAA", &value
);
2519 ASSERT_EQ(value
, "bar1");
2520 s
= db
->Get(read_options
, "BBB", &value
);
2522 ASSERT_EQ(value
, "bar1");
2523 s
= db
->Get(read_options
, "CCC", &value
);
2525 ASSERT_EQ(value
, "bar1");
// From here the DB is expected to hold the transaction's values
// (presumably after a commit that is not visible in this listing).
2530 s
= db
->Get(read_options
, "AAA", &value
);
2532 ASSERT_EQ(value
, "bar2");
2533 s
= db
->Get(read_options
, "BBB", &value
);
2535 ASSERT_EQ(value
, "bar2");
2536 s
= db
->Get(read_options
, "CCC", &value
);
2538 ASSERT_EQ(value
, "bar2");
2540 // verify that we track multiple writes to the same key at different snapshots
2542 txn
= db
->BeginTransaction(write_options
);
2544 // Potentially conflicting writes
2545 ASSERT_OK(db
->Put(write_options
, "ZZZ", "zzz"));
2546 ASSERT_OK(db
->Put(write_options
, "XXX", "xxx"));
2550 TransactionOptions txn_options
;
2551 txn_options
.set_snapshot
= true;
2552 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
2553 txn2
->SetSnapshot();
2555 // This should not conflict in txn since the snapshot is later than the
2556 // previous write (spoiler alert: it will later conflict with txn2).
2557 s
= txn
->Put("ZZZ", "zzzz");
2565 // This will conflict since the snapshot is earlier than another write to ZZZ
2566 s
= txn2
->Put("ZZZ", "xxxxx");
2567 ASSERT_TRUE(s
.IsBusy());
2572 s
= db
->Get(read_options
, "ZZZ", &value
);
2574 ASSERT_EQ(value
, "zzzz");
// Runs transactional reads and writes across the default column family and two
// created families ("CFA", "CFB"), checking that the same key in different
// column families does not conflict, that a write to a key modified after the
// snapshot is rejected with Busy, and the per-key statuses returned by
// MultiGetForUpdate.
// NOTE(review): several lines (declarations, commits, deletes) are not visible
// in this listing -- confirm against the full file.
2579 TEST_P(TransactionTest
, ColumnFamiliesTest
) {
2580 WriteOptions write_options
;
2581 ReadOptions read_options
, snapshot_read_options
;
2582 TransactionOptions txn_options
;
2586 ColumnFamilyHandle
*cfa
, *cfb
;
2587 ColumnFamilyOptions cf_options
;
2589 // Create 2 new column families
2590 s
= db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
2592 s
= db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
2600 // open DB with three column families
2601 std::vector
<ColumnFamilyDescriptor
> column_families
;
2602 // have to open default column family
2603 column_families
.push_back(
2604 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
2605 // open the new column families
2606 column_families
.push_back(
2607 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
2608 column_families
.push_back(
2609 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
// handles are indexed in descriptor order: [0] default, [1] CFA, [2] CFB.
2611 std::vector
<ColumnFamilyHandle
*> handles
;
2613 ASSERT_OK(ReOpenNoDelete(column_families
, &handles
));
2614 assert(db
!= nullptr);
2616 Transaction
* txn
= db
->BeginTransaction(write_options
);
2620 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2622 txn_options
.set_snapshot
= true;
2623 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
2626 // Write some data to the db
2628 ASSERT_OK(batch
.Put("foo", "foo"));
2629 ASSERT_OK(batch
.Put(handles
[1], "AAA", "bar"));
2630 ASSERT_OK(batch
.Put(handles
[1], "AAAZZZ", "bar"));
2631 s
= db
->Write(write_options
, &batch
);
2633 ASSERT_OK(db
->Delete(write_options
, handles
[1], "AAAZZZ"));
2635 // These keys do not conflict with existing writes since they're in
2636 // different column families
2637 s
= txn
->Delete("AAA");
2639 s
= txn
->GetForUpdate(snapshot_read_options
, handles
[1], "foo", &value
);
2640 ASSERT_TRUE(s
.IsNotFound());
2641 Slice
key_slice("AAAZZZ");
2642 Slice value_slices
[2] = {Slice("bar"), Slice("bar")};
// SliceParts value: the two "bar" parts are read back below as "barbar".
2643 s
= txn
->Put(handles
[2], SliceParts(&key_slice
, 1),
2644 SliceParts(value_slices
, 2));
2646 ASSERT_EQ(3, txn
->GetNumKeys());
2650 s
= db
->Get(read_options
, "AAA", &value
);
2651 ASSERT_TRUE(s
.IsNotFound());
2652 s
= db
->Get(read_options
, handles
[2], "AAAZZZ", &value
);
2653 ASSERT_EQ(value
, "barbar");
// The three key parts concatenate to "AAAZZZ".
2655 Slice key_slices
[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
2656 Slice
value_slice("barbarbar");
2658 s
= txn2
->Delete(handles
[2], "XXX");
2660 s
= txn2
->Delete(handles
[1], "XXX");
2663 // This write will cause a conflict with the earlier batch write
2664 s
= txn2
->Put(handles
[1], SliceParts(key_slices
, 3),
2665 SliceParts(&value_slice
, 1));
2666 ASSERT_TRUE(s
.IsBusy());
2670 // In the above the latest change to AAAZZZ in handles[1] is delete.
2671 s
= db
->Get(read_options
, handles
[1], "AAAZZZ", &value
);
2672 ASSERT_TRUE(s
.IsNotFound());
2677 txn
= db
->BeginTransaction(write_options
, txn_options
);
2678 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2680 txn2
= db
->BeginTransaction(write_options
, txn_options
);
// multiget_cfh[i] is the column family for multiget_keys[i].
2683 std::vector
<ColumnFamilyHandle
*> multiget_cfh
= {handles
[1], handles
[2],
2684 handles
[0], handles
[2]};
2685 std::vector
<Slice
> multiget_keys
= {"AAA", "AAAZZZ", "foo", "foo"};
2686 std::vector
<std::string
> values(4);
2687 std::vector
<Status
> results
= txn
->MultiGetForUpdate(
2688 snapshot_read_options
, multiget_cfh
, multiget_keys
, &values
);
2689 ASSERT_OK(results
[0]);
2690 ASSERT_OK(results
[1]);
2691 ASSERT_OK(results
[2]);
2692 ASSERT_TRUE(results
[3].IsNotFound());
2693 ASSERT_EQ(values
[0], "bar");
2694 ASSERT_EQ(values
[1], "barbar");
2695 ASSERT_EQ(values
[2], "foo");
2697 s
= txn
->SingleDelete(handles
[2], "ZZZ");
2699 s
= txn
->Put(handles
[2], "ZZZ", "YYY");
2701 s
= txn
->Put(handles
[2], "ZZZ", "YYYY");
2703 s
= txn
->Delete(handles
[2], "ZZZ");
2705 s
= txn
->Put(handles
[2], "AAAZZZ", "barbarbar");
2708 ASSERT_EQ(5, txn
->GetNumKeys());
2710 // Txn should commit
2713 s
= db
->Get(read_options
, handles
[2], "ZZZ", &value
);
2714 ASSERT_TRUE(s
.IsNotFound());
2716 // Put a key which will conflict with the next txn using the previous snapshot
2717 ASSERT_OK(db
->Put(write_options
, handles
[2], "foo", "000"));
2719 results
= txn2
->MultiGetForUpdate(snapshot_read_options
, multiget_cfh
,
2720 multiget_keys
, &values
);
2721 // All results should fail since there was a conflict
2722 ASSERT_TRUE(results
[0].IsBusy());
2723 ASSERT_TRUE(results
[1].IsBusy());
2724 ASSERT_TRUE(results
[2].IsBusy());
2725 ASSERT_TRUE(results
[3].IsBusy());
2727 s
= db
->Get(read_options
, handles
[2], "foo", &value
);
2728 ASSERT_EQ(value
, "000");
2733 s
= db
->DropColumnFamily(handles
[1]);
2735 s
= db
->DropColumnFamily(handles
[2]);
2741 for (auto handle
: handles
) {
// Checks the batched Transaction::MultiGet overload against a column family
// that holds committed Puts and a Merge while the transaction has a pending
// Delete, Put and Merge on some of the same keys.
// NOTE(review): some lines (declarations of `s`/`batch`, cleanup) are not
// visible in this listing -- confirm against the full file.
2746 TEST_P(TransactionTest
, MultiGetBatchedTest
) {
2747 WriteOptions write_options
;
2748 ReadOptions read_options
, snapshot_read_options
;
2749 TransactionOptions txn_options
;
2753 ColumnFamilyHandle
* cf
;
2754 ColumnFamilyOptions cf_options
;
2756 // Create a new column family
2757 s
= db
->CreateColumnFamily(cf_options
, "CF", &cf
);
2764 // open DB with two column families (default and "CF")
2765 std::vector
<ColumnFamilyDescriptor
> column_families
;
2766 // have to open default column family
2767 column_families
.push_back(
2768 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
2769 // open the new column family with a string-append merge operator
2770 cf_options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
2771 column_families
.push_back(ColumnFamilyDescriptor("CF", cf_options
));
2773 std::vector
<ColumnFamilyHandle
*> handles
;
2775 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
2776 ASSERT_OK(ReOpenNoDelete(column_families
, &handles
));
2777 assert(db
!= nullptr);
2779 // Write some data to the db
2781 ASSERT_OK(batch
.Put(handles
[1], "aaa", "val1"));
2782 ASSERT_OK(batch
.Put(handles
[1], "bbb", "val2"));
2783 ASSERT_OK(batch
.Put(handles
[1], "ccc", "val3"));
2784 ASSERT_OK(batch
.Put(handles
[1], "ddd", "foo"));
2785 ASSERT_OK(batch
.Put(handles
[1], "eee", "val5"));
2786 ASSERT_OK(batch
.Put(handles
[1], "fff", "val6"));
2787 ASSERT_OK(batch
.Merge(handles
[1], "ggg", "foo"));
2788 s
= db
->Write(write_options
, &batch
);
2791 Transaction
* txn
= db
->BeginTransaction(write_options
);
2795 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
2797 txn_options
.set_snapshot
= true;
2798 // Apply some writes through the transaction
2799 s
= txn
->Delete(handles
[1], "bbb");
2801 s
= txn
->Put(handles
[1], "ccc", "val3_new");
2803 s
= txn
->Merge(handles
[1], "ddd", "bar");
2806 std::vector
<Slice
> keys
= {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
2807 std::vector
<PinnableSlice
> values(keys
.size());
2808 std::vector
<Status
> statuses(keys
.size());
2810 txn
->MultiGet(snapshot_read_options
, handles
[1], keys
.size(), keys
.data(),
2811 values
.data(), statuses
.data());
// Expected per key: "bbb" is deleted in the txn (NotFound), "ccc" shows the
// txn's new value, "ddd" has a pending merge in the txn (MergeInProgress),
// and the remaining keys return the values written through the DB.
2812 ASSERT_TRUE(statuses
[0].ok());
2813 ASSERT_EQ(values
[0], "val1");
2814 ASSERT_TRUE(statuses
[1].IsNotFound());
2815 ASSERT_TRUE(statuses
[2].ok());
2816 ASSERT_EQ(values
[2], "val3_new");
2817 ASSERT_TRUE(statuses
[3].IsMergeInProgress());
2818 ASSERT_TRUE(statuses
[4].ok());
2819 ASSERT_EQ(values
[4], "val5");
2820 ASSERT_TRUE(statuses
[5].ok());
2821 ASSERT_EQ(values
[5], "val6");
2822 ASSERT_TRUE(statuses
[6].ok());
2823 ASSERT_EQ(values
[6], "foo");
2825 for (auto handle
: handles
) {
2830 // This test calls WriteBatchWithIndex::MultiGetFromBatchAndDB with a large
2831 // number of keys, i.e. greater than MultiGetContext::MAX_BATCH_SIZE, which
2832 // is 32. This forces autovector allocations in the MultiGet code paths
2833 // to use std::vector in addition to stack allocations. The MultiGet keys
2834 // include Merges, which are handled specially in MultiGetFromBatchAndDB by
2835 // allocating an autovector of MergeContexts.
// Writes 3 * MultiGetContext::MAX_BATCH_SIZE keys to column family "CF", layers
// a Delete, a Put and many Merges on top in a WriteBatchWithIndex, and checks
// the per-key results of MultiGetFromBatchAndDB over MAX_BATCH_SIZE + 32 keys.
// NOTE(review): some lines (e.g. declarations and the first `if` of the
// verification chain) are not visible in this listing -- confirm against the
// full file.
2836 TEST_P(TransactionTest
, MultiGetLargeBatchedTest
) {
2837 WriteOptions write_options
;
2838 ReadOptions read_options
, snapshot_read_options
;
2842 ColumnFamilyHandle
* cf
;
2843 ColumnFamilyOptions cf_options
;
// key_str owns the key strings; the Slices built from it later only
// reference this storage.
2845 std::vector
<std::string
> key_str
;
2846 for (int i
= 0; i
< 100; ++i
) {
2847 key_str
.emplace_back(std::to_string(i
));
2849 // Create a new column family
2850 s
= db
->CreateColumnFamily(cf_options
, "CF", &cf
);
2857 // open DB with two column families (default and "CF")
2858 std::vector
<ColumnFamilyDescriptor
> column_families
;
2859 // have to open default column family
2860 column_families
.push_back(
2861 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
2862 // open the new column family with a string-append merge operator
2863 cf_options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
2864 column_families
.push_back(ColumnFamilyDescriptor("CF", cf_options
));
2866 std::vector
<ColumnFamilyHandle
*> handles
;
2868 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
2869 ASSERT_OK(ReOpenNoDelete(column_families
, &handles
));
2870 assert(db
!= nullptr);
2872 // Write some data to the db
2874 for (int i
= 0; i
< 3 * MultiGetContext::MAX_BATCH_SIZE
; ++i
) {
2875 std::string val
= "val" + std::to_string(i
);
2876 ASSERT_OK(batch
.Put(handles
[1], key_str
[i
], val
));
2878 s
= db
->Write(write_options
, &batch
);
2881 WriteBatchWithIndex wb
;
2882 // Add some updates to the indexed write batch
2883 s
= wb
.Delete(handles
[1], std::to_string(1));
2885 s
= wb
.Put(handles
[1], std::to_string(2), "new_val" + std::to_string(2));
2887 // Write a lot of merges so when we call MultiGetFromBatchAndDB later on,
2888 // it is forced to use std::vector in ROCKSDB_NAMESPACE::autovector to
2889 // allocate MergeContexts. The number of merges needs to be >
2890 // MultiGetContext::MAX_BATCH_SIZE
2891 for (int i
= 8; i
< MultiGetContext::MAX_BATCH_SIZE
+ 24; ++i
) {
2892 s
= wb
.Merge(handles
[1], std::to_string(i
), "merge");
2896 // MultiGet a lot of keys in order to force std::vector reallocations
2897 std::vector
<Slice
> keys
;
2898 for (int i
= 0; i
< MultiGetContext::MAX_BATCH_SIZE
+ 32; ++i
) {
2899 keys
.emplace_back(key_str
[i
]);
2901 std::vector
<PinnableSlice
> values(keys
.size());
2902 std::vector
<Status
> statuses(keys
.size());
2904 wb
.MultiGetFromBatchAndDB(db
, snapshot_read_options
, handles
[1], keys
.size(), keys
.data(),
2905 values
.data(), statuses
.data(), false);
// Verify per key: key 1 was deleted in the batch, key 2 was overwritten,
// keys in [8, 56) carry an appended ",merge", everything else is unchanged.
// NOTE(review): the literal 56 matches MAX_BATCH_SIZE + 24 only if
// MAX_BATCH_SIZE is 32 -- confirm.
2906 for (size_t i
=0; i
< keys
.size(); ++i
) {
2908 ASSERT_TRUE(statuses
[1].IsNotFound());
2909 } else if (i
== 2) {
2910 ASSERT_TRUE(statuses
[2].ok());
2911 ASSERT_EQ(values
[2], "new_val" + std::to_string(2));
2912 } else if (i
>= 8 && i
< 56) {
2913 ASSERT_TRUE(statuses
[i
].ok());
2914 ASSERT_EQ(values
[i
], "val" + std::to_string(i
) + ",merge");
2916 ASSERT_TRUE(statuses
[i
].ok());
2917 if (values
[i
] != "val" + std::to_string(i
)) {
2918 ASSERT_EQ(values
[i
], "val" + std::to_string(i
));
2923 for (auto handle
: handles
) {
// Checks that two transactions contend for the same key within a column family
// ("ONE"), and that writing to a dropped column family fails with
// InvalidArgument while writes to the remaining families still land.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
2928 TEST_P(TransactionTest
, ColumnFamiliesTest2
) {
2929 WriteOptions write_options
;
2930 ReadOptions read_options
, snapshot_read_options
;
2934 ColumnFamilyHandle
*one
, *two
;
2935 ColumnFamilyOptions cf_options
;
2937 // Create 2 new column families
2938 s
= db
->CreateColumnFamily(cf_options
, "ONE", &one
);
2940 s
= db
->CreateColumnFamily(cf_options
, "TWO", &two
);
2943 Transaction
* txn1
= db
->BeginTransaction(write_options
);
2945 Transaction
* txn2
= db
->BeginTransaction(write_options
);
// txn1 writes key "X" in each of the three column families.
2948 s
= txn1
->Put(one
, "X", "1");
2950 s
= txn1
->Put(two
, "X", "2");
2952 s
= txn1
->Put("X", "0");
// txn2 cannot take the lock on "X" in column family ONE held by txn1.
2955 s
= txn2
->Put(one
, "X", "11");
2956 ASSERT_TRUE(s
.IsTimedOut());
2961 // Drop first column family
2962 s
= db
->DropColumnFamily(one
);
2965 // Should fail since column family was dropped.
2970 txn1
= db
->BeginTransaction(write_options
);
2973 // Should fail since column family was dropped
2974 s
= txn1
->Put(one
, "X", "111");
2975 ASSERT_TRUE(s
.IsInvalidArgument());
2977 s
= txn1
->Put(two
, "X", "222");
2980 s
= txn1
->Put("X", "000");
2986 s
= db
->Get(read_options
, two
, "X", &value
);
2988 ASSERT_EQ("222", value
);
2990 s
= db
->Get(read_options
, "X", &value
);
2992 ASSERT_EQ("000", value
);
2994 s
= db
->DropColumnFamily(two
);
// Exercises transactions that perform no writes: beginning one, rolling one
// back, and ones that only call GetForUpdate. The last part checks that a
// GetForUpdate lock makes a direct DB write to the same key time out.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
3004 TEST_P(TransactionTest
, EmptyTest
) {
3005 WriteOptions write_options
;
3006 ReadOptions read_options
;
3010 s
= db
->Put(write_options
, "aaa", "aaa");
3013 Transaction
* txn
= db
->BeginTransaction(write_options
);
3018 txn
= db
->BeginTransaction(write_options
);
3019 ASSERT_OK(txn
->Rollback());
3022 txn
= db
->BeginTransaction(write_options
);
3023 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
3024 ASSERT_EQ(value
, "aaa");
3030 txn
= db
->BeginTransaction(write_options
);
3033 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
3034 ASSERT_EQ(value
, "aaa");
3036 // Conflicts with previous GetForUpdate
3037 s
= db
->Put(write_options
, "aaa", "xxx");
3038 ASSERT_TRUE(s
.IsTimedOut());
3040 // transaction expired!
// Checks that keys locked by one transaction's MultiGetForUpdate / Put block
// another transaction's writes (TimedOut), and that a later GetForUpdate on a
// key changed after the reader's snapshot reports Busy.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
3046 TEST_P(TransactionTest
, PredicateManyPreceders
) {
3047 WriteOptions write_options
;
3048 ReadOptions read_options1
, read_options2
;
3049 TransactionOptions txn_options
;
3053 txn_options
.set_snapshot
= true;
3054 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
3055 read_options1
.snapshot
= txn1
->GetSnapshot();
3057 Transaction
* txn2
= db
->BeginTransaction(write_options
);
3058 txn2
->SetSnapshot();
3059 read_options2
.snapshot
= txn2
->GetSnapshot();
3061 std::vector
<Slice
> multiget_keys
= {"1", "2", "3"};
3062 std::vector
<std::string
> multiget_values
;
3064 std::vector
<Status
> results
=
3065 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
3066 ASSERT_EQ(results
.size(), 3);
3067 ASSERT_TRUE(results
[0].IsNotFound());
3068 ASSERT_TRUE(results
[1].IsNotFound());
3069 ASSERT_TRUE(results
[2].IsNotFound());
3071 s
= txn2
->Put("2", "x"); // Conflicts with txn1's MultiGetForUpdate
3072 ASSERT_TRUE(s
.IsTimedOut());
3074 ASSERT_OK(txn2
->Rollback());
3076 multiget_values
.clear();
// NOTE(review): in this listing the return value of the call below is not
// assigned to `results`, so the assertions that follow would re-check the
// statuses from the first call. A line may be missing from the listing --
// confirm against the full file.
3078 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
3079 ASSERT_EQ(results
.size(), 3);
3080 ASSERT_TRUE(results
[0].IsNotFound());
3081 ASSERT_TRUE(results
[1].IsNotFound());
3082 ASSERT_TRUE(results
[2].IsNotFound());
3090 txn1
= db
->BeginTransaction(write_options
, txn_options
);
3091 read_options1
.snapshot
= txn1
->GetSnapshot();
3093 txn2
= db
->BeginTransaction(write_options
, txn_options
);
3094 read_options2
.snapshot
= txn2
->GetSnapshot();
3096 s
= txn1
->Put("4", "x");
3099 s
= txn2
->Delete("4"); // conflict
3100 ASSERT_TRUE(s
.IsTimedOut());
3105 s
= txn2
->GetForUpdate(read_options2
, "4", &value
);
3106 ASSERT_TRUE(s
.IsBusy());
3108 ASSERT_OK(txn2
->Rollback());
// Runs five rounds of two transactions writing key "1", with and without
// snapshots and in different orders, and checks which write is rejected
// (TimedOut or Busy) and which value ends up in the DB.
// NOTE(review): commits, rollbacks, deletes and declarations are not visible
// in this listing -- confirm against the full file.
3114 TEST_P(TransactionTest
, LostUpdate
) {
3115 WriteOptions write_options
;
3116 ReadOptions read_options
, read_options1
, read_options2
;
3117 TransactionOptions txn_options
;
3121 // Test 2 transactions writing to the same key in multiple orders and
3122 // with/without snapshots
3124 Transaction
* txn1
= db
->BeginTransaction(write_options
);
3125 Transaction
* txn2
= db
->BeginTransaction(write_options
);
3127 s
= txn1
->Put("1", "1");
3130 s
= txn2
->Put("1", "2"); // conflict
3131 ASSERT_TRUE(s
.IsTimedOut());
3139 s
= db
->Get(read_options
, "1", &value
);
3141 ASSERT_EQ("1", value
);
// Round 2: both transactions take a snapshot when they begin.
3146 txn_options
.set_snapshot
= true;
3147 txn1
= db
->BeginTransaction(write_options
, txn_options
);
3148 read_options1
.snapshot
= txn1
->GetSnapshot();
3150 txn2
= db
->BeginTransaction(write_options
, txn_options
);
3151 read_options2
.snapshot
= txn2
->GetSnapshot();
3153 s
= txn1
->Put("1", "3");
3155 s
= txn2
->Put("1", "4"); // conflict
3156 ASSERT_TRUE(s
.IsTimedOut());
3164 s
= db
->Get(read_options
, "1", &value
);
3166 ASSERT_EQ("3", value
);
// Round 3: txn2's write is rejected with Busy rather than TimedOut.
3171 txn1
= db
->BeginTransaction(write_options
, txn_options
);
3172 read_options1
.snapshot
= txn1
->GetSnapshot();
3174 txn2
= db
->BeginTransaction(write_options
, txn_options
);
3175 read_options2
.snapshot
= txn2
->GetSnapshot();
3177 s
= txn1
->Put("1", "5");
3183 s
= txn2
->Put("1", "6");
3184 ASSERT_TRUE(s
.IsBusy());
3188 s
= db
->Get(read_options
, "1", &value
);
3190 ASSERT_EQ("5", value
);
// Round 4: txn2 refreshes its snapshot before writing.
3195 txn1
= db
->BeginTransaction(write_options
, txn_options
);
3196 read_options1
.snapshot
= txn1
->GetSnapshot();
3198 txn2
= db
->BeginTransaction(write_options
, txn_options
);
3199 read_options2
.snapshot
= txn2
->GetSnapshot();
3201 s
= txn1
->Put("1", "7");
3206 txn2
->SetSnapshot();
3207 s
= txn2
->Put("1", "8");
3212 s
= db
->Get(read_options
, "1", &value
);
3214 ASSERT_EQ("8", value
);
// Round 5: neither transaction is started with txn_options.
3219 txn1
= db
->BeginTransaction(write_options
);
3220 txn2
= db
->BeginTransaction(write_options
);
3222 s
= txn1
->Put("1", "9");
3227 s
= txn2
->Put("1", "10");
3235 s
= db
->Get(read_options
, "1", &value
);
3237 ASSERT_EQ(value
, "10");
// Exercises the *Untracked write APIs (PutUntracked, MergeUntracked,
// DeleteUntracked): they are undone by Rollback, and they succeed on a key
// that was modified through the DB after the transaction started, whereas a
// regular Put on that key is rejected with Busy.
// NOTE(review): some lines (the WRITE_UNPREPARED early exit, commits,
// declarations) are not visible in this listing -- confirm against the full
// file.
3240 TEST_P(TransactionTest
, UntrackedWrites
) {
3241 if (txn_db_options
.write_policy
== WRITE_UNPREPARED
) {
3242 // TODO(lth): For WriteUnprepared, validate that untracked writes are
3247 WriteOptions write_options
;
3248 ReadOptions read_options
;
3252 // Verify transaction rollback works for untracked keys.
3253 Transaction
* txn
= db
->BeginTransaction(write_options
);
3256 s
= txn
->PutUntracked("untracked", "0");
3258 ASSERT_OK(txn
->Rollback());
3259 s
= db
->Get(read_options
, "untracked", &value
);
3260 ASSERT_TRUE(s
.IsNotFound());
3263 txn
= db
->BeginTransaction(write_options
);
3266 s
= db
->Put(write_options
, "untracked", "x");
3269 // Untracked writes should succeed even though key was written after snapshot
3270 s
= txn
->PutUntracked("untracked", "1");
3272 s
= txn
->MergeUntracked("untracked", "2");
3274 s
= txn
->DeleteUntracked("untracked");
// A regular (tracked) Put on the same key is rejected.
3278 s
= txn
->Put("untracked", "3");
3279 ASSERT_TRUE(s
.IsBusy());
3284 s
= db
->Get(read_options
, "untracked", &value
);
3285 ASSERT_TRUE(s
.IsNotFound());
// Checks transaction expiration: txn1 is created with expiration 0, so txn2
// can write a key txn1 had written, and txn1's own keys ("Y", "Z") never reach
// the DB because its final status is Expired.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
3290 TEST_P(TransactionTest
, ExpiredTransaction
) {
3291 WriteOptions write_options
;
3292 ReadOptions read_options
;
3293 TransactionOptions txn_options
;
3297 // Set txn expiration timeout to 0 microseconds (expires instantly)
3298 txn_options
.expiration
= 0;
3299 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
3301 s
= txn1
->Put("X", "1");
3304 s
= txn1
->Put("Y", "1");
3307 Transaction
* txn2
= db
->BeginTransaction(write_options
);
3309 // txn2 should be able to write to X since txn1 has expired
3310 s
= txn2
->Put("X", "2");
3315 s
= db
->Get(read_options
, "X", &value
);
3317 ASSERT_EQ("2", value
);
3319 s
= txn1
->Put("Z", "1");
3322 // txn1 should fail to commit since it is expired
3324 ASSERT_TRUE(s
.IsExpired());
3326 s
= db
->Get(read_options
, "Y", &value
);
3327 ASSERT_TRUE(s
.IsNotFound());
3329 s
= db
->Get(read_options
, "Z", &value
);
3330 ASSERT_TRUE(s
.IsNotFound());
// Exercises re-use of a Transaction object by passing the old transaction as
// the third argument of BeginTransaction: expiration, locks and snapshot state
// are checked after each reinitialization, and the name "name" is set again
// after a further reinitialization.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
3336 TEST_P(TransactionTest
, ReinitializeTest
) {
3337 WriteOptions write_options
;
3338 ReadOptions read_options
;
3339 TransactionOptions txn_options
;
3343 // Set txn expiration timeout to 0 microseconds (expires instantly)
3344 txn_options
.expiration
= 0;
3345 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
3347 // Reinitialize transaction to no longer expire
3348 txn_options
.expiration
= -1;
3349 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3351 s
= txn1
->Put("Z", "z");
3354 // Should commit since not expired
3358 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3360 s
= txn1
->Put("Z", "zz");
3363 // Reinitialize txn1 and verify that Z gets unlocked
3364 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3366 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
, nullptr);
3367 s
= txn2
->Put("Z", "zzz");
3373 s
= db
->Get(read_options
, "Z", &value
);
3375 ASSERT_EQ(value
, "zzz");
3377 // Verify snapshots get reinitialized correctly
3378 txn1
->SetSnapshot();
3379 s
= txn1
->Put("Z", "zzzz");
3385 s
= db
->Get(read_options
, "Z", &value
);
3387 ASSERT_EQ(value
, "zzzz");
// After reinitializing without set_snapshot, no snapshot is carried over.
3389 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3390 const Snapshot
* snapshot
= txn1
->GetSnapshot();
3391 ASSERT_FALSE(snapshot
);
3393 txn_options
.set_snapshot
= true;
3394 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3395 snapshot
= txn1
->GetSnapshot();
3396 ASSERT_TRUE(snapshot
);
3398 s
= txn1
->Put("Z", "a");
3401 ASSERT_OK(txn1
->Rollback());
3403 s
= txn1
->Put("Y", "y");
3406 txn_options
.set_snapshot
= false;
3407 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3408 snapshot
= txn1
->GetSnapshot();
3409 ASSERT_FALSE(snapshot
);
3411 s
= txn1
->Put("X", "x");
// The rolled-back Put("Z", "a") and the Put("Y", "y") discarded by the
// reinitialization must not be visible in the DB.
3417 s
= db
->Get(read_options
, "Z", &value
);
3419 ASSERT_EQ(value
, "zzzz");
3421 s
= db
->Get(read_options
, "Y", &value
);
3422 ASSERT_TRUE(s
.IsNotFound());
3424 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3426 s
= txn1
->SetName("name");
3429 s
= txn1
->Prepare();
3434 txn1
= db
->BeginTransaction(write_options
, txn_options
, txn1
);
3436 s
= txn1
->SetName("name");
// Checks that Rollback releases a transaction's locks: txn2's write to "X"
// times out while txn1 holds the key, and txn2's later write is the value
// found in the DB after txn1 rolls back.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
3442 TEST_P(TransactionTest
, Rollback
) {
3443 WriteOptions write_options
;
3444 ReadOptions read_options
;
3445 TransactionOptions txn_options
;
3449 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
3453 s
= txn1
->Put("X", "1");
3456 Transaction
* txn2
= db
->BeginTransaction(write_options
);
3458 // txn2 should not be able to write to X since txn1 has it locked
3459 s
= txn2
->Put("X", "2");
3460 ASSERT_TRUE(s
.IsTimedOut());
3462 ASSERT_OK(txn1
->Rollback());
3465 // txn2 should now be able to write to X
3466 s
= txn2
->Put("X", "3");
3472 s
= db
->Get(read_options
, "X", &value
);
3474 ASSERT_EQ("3", value
);
// Reopens the DB with txn_db_options.max_num_locks = 3 and checks that lock
// acquisition beyond the limit fails with Busy, that re-locking an
// already-held key still works, and that a second transaction can take locks
// once the first one is finished.
// NOTE(review): commits, deletes and declarations are not visible in this
// listing -- confirm against the full file.
3479 TEST_P(TransactionTest
, LockLimitTest
) {
3480 WriteOptions write_options
;
3481 ReadOptions read_options
, snapshot_read_options
;
3482 TransactionOptions txn_options
;
3489 // Open DB with a lock limit of 3
3490 txn_db_options
.max_num_locks
= 3;
3491 ASSERT_OK(ReOpen());
3492 assert(db
!= nullptr);
3495 // Create a txn and verify we can only lock up to 3 keys
3496 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
3499 s
= txn
->Put("X", "x");
3502 s
= txn
->Put("Y", "y");
3505 s
= txn
->Put("Z", "z");
3508 // lock limit reached
3509 s
= txn
->Put("W", "w");
3510 ASSERT_TRUE(s
.IsBusy());
3512 // re-locking same key shouldn't put us over the limit
3513 s
= txn
->Put("X", "xx");
// GetForUpdate on a key not yet locked is also refused at the limit.
3516 s
= txn
->GetForUpdate(read_options
, "W", &value
);
3517 ASSERT_TRUE(s
.IsBusy());
3518 s
= txn
->GetForUpdate(read_options
, "V", &value
);
3519 ASSERT_TRUE(s
.IsBusy());
3521 // re-locking same key shouldn't put us over the limit
3522 s
= txn
->GetForUpdate(read_options
, "Y", &value
);
3524 ASSERT_EQ("y", value
);
// A plain Get is not refused at the lock limit; "W" simply does not exist.
3526 s
= txn
->Get(read_options
, "W", &value
);
3527 ASSERT_TRUE(s
.IsNotFound());
3529 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
3532 // "X" currently locked
3533 s
= txn2
->Put("X", "x");
3534 ASSERT_TRUE(s
.IsTimedOut());
3536 // lock limit reached
3537 s
= txn2
->Put("M", "m");
3538 ASSERT_TRUE(s
.IsBusy());
3543 s
= db
->Get(read_options
, "X", &value
);
3545 ASSERT_EQ("xx", value
);
3547 s
= db
->Get(read_options
, "W", &value
);
3548 ASSERT_TRUE(s
.IsNotFound());
3550 // Committing txn should release its locks and allow txn2 to proceed
3551 s
= txn2
->Put("X", "x2");
3554 s
= txn2
->Delete("X");
3557 s
= txn2
->Put("M", "m");
3560 s
= txn2
->Put("Z", "z2");
3563 // lock limit reached
3564 s
= txn2
->Delete("Y");
3565 ASSERT_TRUE(s
.IsBusy());
3570 s
= db
->Get(read_options
, "Z", &value
);
3572 ASSERT_EQ("z2", value
);
3574 s
= db
->Get(read_options
, "Y", &value
);
3576 ASSERT_EQ("y", value
);
3578 s
= db
->Get(read_options
, "X", &value
);
3579 ASSERT_TRUE(s
.IsNotFound());
// Iterates with Transaction::GetIterator over a view that combines keys written
// directly to the DB with the transaction's own pending Puts and Delete, and
// calls GetForUpdate on each key visited; "C", which was rewritten after the
// snapshot, is expected to report Busy.
// NOTE(review): the iterator movement calls (Next/Prev/Seek...), the early
// return for WRITE_UNPREPARED, commits and declarations are not visible in
// this listing -- confirm against the full file.
3585 TEST_P(TransactionTest
, IteratorTest
) {
3586 // This test does writes without snapshot validation, and then tries to create
3587 // iterator later, which is unsupported in write unprepared.
3588 if (txn_db_options
.write_policy
== WRITE_UNPREPARED
) {
3592 WriteOptions write_options
;
3593 ReadOptions read_options
, snapshot_read_options
;
3597 // Write some keys to the db
3598 s
= db
->Put(write_options
, "A", "a");
3601 s
= db
->Put(write_options
, "G", "g");
3604 s
= db
->Put(write_options
, "F", "f");
3607 s
= db
->Put(write_options
, "C", "c");
3610 s
= db
->Put(write_options
, "D", "d");
3613 Transaction
* txn
= db
->BeginTransaction(write_options
);
3616 // Write some keys in a txn
3617 s
= txn
->Put("B", "b");
3620 s
= txn
->Put("H", "h");
3623 s
= txn
->Delete("D");
3626 s
= txn
->Put("E", "e");
3630 const Snapshot
* snapshot
= txn
->GetSnapshot();
3632 // Write some keys to the db after the snapshot
3633 s
= db
->Put(write_options
, "BB", "xx");
3636 s
= db
->Put(write_options
, "C", "xx");
3639 read_options
.snapshot
= snapshot
;
3640 Iterator
* iter
= txn
->GetIterator(read_options
);
3641 ASSERT_OK(iter
->status());
3642 iter
->SeekToFirst();
3644 // Read all keys via iter and lock them all
// Expected values in key order: "D" is deleted in the txn, and the
// post-snapshot writes ("BB", and "C" = "xx") are not expected to show up.
3645 std::string results
[] = {"a", "b", "c", "e", "f", "g", "h"};
3646 for (int i
= 0; i
< 7; i
++) {
3647 ASSERT_OK(iter
->status());
3648 ASSERT_TRUE(iter
->Valid());
3649 ASSERT_EQ(results
[i
], iter
->value().ToString());
3651 s
= txn
->GetForUpdate(read_options
, iter
->key(), nullptr);
3653 // "C" was modified after txn's snapshot
3654 ASSERT_TRUE(s
.IsBusy());
3661 ASSERT_FALSE(iter
->Valid());
3664 ASSERT_OK(iter
->status());
3665 ASSERT_TRUE(iter
->Valid());
3666 ASSERT_EQ("g", iter
->value().ToString());
3669 ASSERT_OK(iter
->status());
3670 ASSERT_TRUE(iter
->Valid());
3671 ASSERT_EQ("f", iter
->value().ToString());
3674 ASSERT_OK(iter
->status());
3675 ASSERT_TRUE(iter
->Valid());
3676 ASSERT_EQ("e", iter
->value().ToString());
3679 ASSERT_OK(iter
->status());
3680 ASSERT_TRUE(iter
->Valid());
3681 ASSERT_EQ("c", iter
->value().ToString());
3684 ASSERT_OK(iter
->status());
3685 ASSERT_TRUE(iter
->Valid());
3686 ASSERT_EQ("e", iter
->value().ToString());
3689 ASSERT_OK(iter
->status());
3690 ASSERT_TRUE(iter
->Valid());
3691 ASSERT_EQ("a", iter
->value().ToString());
3694 ASSERT_OK(iter
->status());
3695 ASSERT_FALSE(iter
->Valid());
3698 ASSERT_OK(iter
->status());
3699 ASSERT_TRUE(iter
->Valid());
3700 ASSERT_EQ("h", iter
->value().ToString());
// Checks Transaction::DisableIndexing / EnableIndexing: writes made while
// indexing is disabled are not visible to the transaction's own Get or
// iterator, and writes made after indexing is re-enabled are visible again.
// NOTE(review): the early return for WRITE_UNPREPARED, the iterator movement
// calls, commits and declarations are not visible in this listing -- confirm
// against the full file.
3709 TEST_P(TransactionTest
, DisableIndexingTest
) {
3710 // Skip this test for write unprepared. It does not solely rely on WBWI for
3711 // read your own writes, so depending on whether batches are flushed or not,
3712 // only some writes will be visible.
3714 // Also, write unprepared does not support creating iterators if there has
3715 // been txn->Put() without snapshot validation.
3716 if (txn_db_options
.write_policy
== WRITE_UNPREPARED
) {
3720 WriteOptions write_options
;
3721 ReadOptions read_options
;
3725 Transaction
* txn
= db
->BeginTransaction(write_options
);
3728 s
= txn
->Put("A", "a");
3731 s
= txn
->Get(read_options
, "A", &value
);
3733 ASSERT_EQ("a", value
);
3735 txn
->DisableIndexing();
// With indexing disabled, the Put of "B" is not found by the txn's own Get.
3737 s
= txn
->Put("B", "b");
3740 s
= txn
->Get(read_options
, "B", &value
);
3741 ASSERT_TRUE(s
.IsNotFound());
3743 Iterator
* iter
= txn
->GetIterator(read_options
);
3744 ASSERT_OK(iter
->status());
3747 ASSERT_OK(iter
->status());
3748 ASSERT_FALSE(iter
->Valid());
// The unindexed Delete of "A" is likewise not reflected: Get still sees "a".
3750 s
= txn
->Delete("A");
3752 s
= txn
->Get(read_options
, "A", &value
);
3754 ASSERT_EQ("a", value
);
3756 txn
->EnableIndexing();
3758 s
= txn
->Put("B", "bb");
3762 ASSERT_OK(iter
->status());
3763 ASSERT_TRUE(iter
->Valid());
3764 ASSERT_EQ("bb", iter
->value().ToString());
3766 s
= txn
->Get(read_options
, "B", &value
);
3768 ASSERT_EQ("bb", value
);
3770 s
= txn
->Put("A", "aa");
3773 s
= txn
->Get(read_options
, "A", &value
);
3775 ASSERT_EQ("aa", value
);
// Exercises SetSavePoint()/RollbackToSavePoint() on a single transaction and
// checks GetNumPuts()/GetNumDeletes() together with the values visible through
// the transaction and through the DB after each rollback.
// RollbackToSavePoint() is expected to return NotFound when no savepoint is
// set.
3781 TEST_P(TransactionTest
, SavepointTest
) {
3782 WriteOptions write_options
;
3783 ReadOptions read_options
, snapshot_read_options
;
3787 Transaction
* txn
= db
->BeginTransaction(write_options
);
3790 ASSERT_EQ(0, txn
->GetNumPuts());
3792 s
= txn
->RollbackToSavePoint();
3793 ASSERT_TRUE(s
.IsNotFound());
3795 txn
->SetSavePoint(); // 1
3797 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to beginning of txn
3798 s
= txn
->RollbackToSavePoint();
3799 ASSERT_TRUE(s
.IsNotFound());
3801 s
= txn
->Put("B", "b");
3804 ASSERT_EQ(1, txn
->GetNumPuts());
3805 ASSERT_EQ(0, txn
->GetNumDeletes());
3810 s
= db
->Get(read_options
, "B", &value
);
3812 ASSERT_EQ("b", value
);
3815 txn
= db
->BeginTransaction(write_options
);
3818 s
= txn
->Put("A", "a");
3821 s
= txn
->Put("B", "bb");
3824 s
= txn
->Put("C", "c");
3827 txn
->SetSavePoint(); // 2
3829 s
= txn
->Delete("B");
3832 s
= txn
->Put("C", "cc");
3835 s
= txn
->Put("D", "d");
3838 ASSERT_EQ(5, txn
->GetNumPuts());
3839 ASSERT_EQ(1, txn
->GetNumDeletes());
3841 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 2
// The counters drop back to what they were when savepoint 2 was set.
3843 ASSERT_EQ(3, txn
->GetNumPuts());
3844 ASSERT_EQ(0, txn
->GetNumDeletes());
3846 s
= txn
->Get(read_options
, "A", &value
);
3848 ASSERT_EQ("a", value
);
3850 s
= txn
->Get(read_options
, "B", &value
);
3852 ASSERT_EQ("bb", value
);
3854 s
= txn
->Get(read_options
, "C", &value
);
3856 ASSERT_EQ("c", value
);
3858 s
= txn
->Get(read_options
, "D", &value
);
3859 ASSERT_TRUE(s
.IsNotFound());
3861 s
= txn
->Put("A", "a");
3864 s
= txn
->Put("E", "e");
3867 ASSERT_EQ(5, txn
->GetNumPuts());
3868 ASSERT_EQ(0, txn
->GetNumDeletes());
3870 // Rollback to beginning of txn
3871 s
= txn
->RollbackToSavePoint();
3872 ASSERT_TRUE(s
.IsNotFound());
3873 ASSERT_OK(txn
->Rollback());
3875 ASSERT_EQ(0, txn
->GetNumPuts());
3876 ASSERT_EQ(0, txn
->GetNumDeletes());
3878 s
= txn
->Get(read_options
, "A", &value
);
3879 ASSERT_TRUE(s
.IsNotFound());
3881 s
= txn
->Get(read_options
, "B", &value
);
3883 ASSERT_EQ("b", value
);
3885 s
= txn
->Get(read_options
, "D", &value
);
3886 ASSERT_TRUE(s
.IsNotFound());
// NOTE(review): "D" is fetched twice in a row here; one of the two lookups may
// have been meant for "C" -- confirm.
3888 s
= txn
->Get(read_options
, "D", &value
);
3889 ASSERT_TRUE(s
.IsNotFound());
3891 s
= txn
->Get(read_options
, "E", &value
);
3892 ASSERT_TRUE(s
.IsNotFound());
3894 s
= txn
->Put("A", "aa");
3897 s
= txn
->Put("F", "f");
3900 ASSERT_EQ(2, txn
->GetNumPuts());
3901 ASSERT_EQ(0, txn
->GetNumDeletes());
3903 txn
->SetSavePoint(); // 3
3904 txn
->SetSavePoint(); // 4
3906 s
= txn
->Put("G", "g");
3909 s
= txn
->SingleDelete("F");
3912 s
= txn
->Delete("B");
3915 s
= txn
->Get(read_options
, "A", &value
);
3917 ASSERT_EQ("aa", value
);
3919 s
= txn
->Get(read_options
, "F", &value
);
3920 // According to db.h, doing a SingleDelete on a key that has been
3921 // overwritten will have undefined behavior. So it is unclear what the
3922 // result of fetching "F" should be. The current implementation will
3923 // return NotFound in this case.
3924 ASSERT_TRUE(s
.IsNotFound());
3926 s
= txn
->Get(read_options
, "B", &value
);
3927 ASSERT_TRUE(s
.IsNotFound());
3929 ASSERT_EQ(3, txn
->GetNumPuts());
3930 ASSERT_EQ(2, txn
->GetNumDeletes());
3932 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 3
3934 ASSERT_EQ(2, txn
->GetNumPuts());
3935 ASSERT_EQ(0, txn
->GetNumDeletes());
3937 s
= txn
->Get(read_options
, "F", &value
);
3939 ASSERT_EQ("f", value
);
3941 s
= txn
->Get(read_options
, "G", &value
);
3942 ASSERT_TRUE(s
.IsNotFound());
// Final state as read directly from the DB (not through the transaction).
3947 s
= db
->Get(read_options
, "F", &value
);
3949 ASSERT_EQ("f", value
);
3951 s
= db
->Get(read_options
, "G", &value
);
3952 ASSERT_TRUE(s
.IsNotFound());
3954 s
= db
->Get(read_options
, "A", &value
);
3956 ASSERT_EQ("aa", value
);
3958 s
= db
->Get(read_options
, "B", &value
);
3960 ASSERT_EQ("b", value
);
3962 s
= db
->Get(read_options
, "C", &value
);
3963 ASSERT_TRUE(s
.IsNotFound());
3965 s
= db
->Get(read_options
, "D", &value
);
3966 ASSERT_TRUE(s
.IsNotFound());
3968 s
= db
->Get(read_options
, "E", &value
);
3969 ASSERT_TRUE(s
.IsNotFound());
// Checks which keys stay locked after RollbackToSavePoint(). A second
// transaction using the same 1 ms lock_timeout probes each key: a Put() that
// returns TimedOut means the key is still locked by txn1.
3974 TEST_P(TransactionTest
, SavepointTest2
) {
3975 WriteOptions write_options
;
3976 ReadOptions read_options
;
3977 TransactionOptions txn_options
;
3980 txn_options
.lock_timeout
= 1; // 1 ms
3981 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
3984 s
= txn1
->Put("A", "");
3987 txn1
->SetSavePoint(); // 1
3989 s
= txn1
->Put("A", "a");
3992 s
= txn1
->Put("C", "c");
3995 txn1
->SetSavePoint(); // 2
3997 s
= txn1
->Put("A", "a");
3999 s
= txn1
->Put("B", "b");
4002 ASSERT_OK(txn1
->RollbackToSavePoint()); // Rollback to 2
4004 // Verify that "A" and "C" are still locked while "B" is not
4005 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
4008 s
= txn2
->Put("A", "a2");
4009 ASSERT_TRUE(s
.IsTimedOut());
4010 s
= txn2
->Put("C", "c2");
4011 ASSERT_TRUE(s
.IsTimedOut());
4012 s
= txn2
->Put("B", "b2");
4015 s
= txn1
->Put("A", "aa");
// txn2 wrote "B" above, so txn1's write to "B" is expected to time out.
4017 s
= txn1
->Put("B", "bb");
4018 ASSERT_TRUE(s
.IsTimedOut());
4024 s
= txn1
->Put("A", "aaa");
4026 s
= txn1
->Put("B", "bbb");
4028 s
= txn1
->Put("C", "ccc");
4031 txn1
->SetSavePoint(); // 3
4032 ASSERT_OK(txn1
->RollbackToSavePoint()); // Rollback to 3
4034 // Verify that "A", "B", "C" are still locked
4035 txn2
= db
->BeginTransaction(write_options
, txn_options
);
4038 s
= txn2
->Put("A", "a2");
4039 ASSERT_TRUE(s
.IsTimedOut());
4040 s
= txn2
->Put("B", "b2");
4041 ASSERT_TRUE(s
.IsTimedOut());
4042 s
= txn2
->Put("C", "c2");
4043 ASSERT_TRUE(s
.IsTimedOut());
4045 ASSERT_OK(txn1
->RollbackToSavePoint()); // Rollback to 1
4047 // Verify that only "A" is locked
4048 s
= txn2
->Put("A", "a3");
4049 ASSERT_TRUE(s
.IsTimedOut());
4050 s
= txn2
->Put("B", "b3");
4052 s
= txn2
->Put("C", "c3po");
4059 // Verify "A" "C" "B" are no longer locked
4060 s
= txn2
->Put("A", "a4");
4062 s
= txn2
->Put("B", "b4");
4064 s
= txn2
->Put("C", "c4");
// Exercises PopSavePoint(): it is expected to return NotFound when no
// savepoint exists, and popping a savepoint removes it without undoing the
// writes made after it (a later RollbackToSavePoint() then finds nothing).
4072 TEST_P(TransactionTest
, SavepointTest3
) {
4073 WriteOptions write_options
;
4074 ReadOptions read_options
;
4075 TransactionOptions txn_options
;
4078 txn_options
.lock_timeout
= 1; // 1 ms
4079 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
4082 s
= txn1
->PopSavePoint(); // No SavePoint present
4083 ASSERT_TRUE(s
.IsNotFound());
4085 s
= txn1
->Put("A", "");
4088 s
= txn1
->PopSavePoint(); // Still no SavePoint present
4089 ASSERT_TRUE(s
.IsNotFound());
4091 txn1
->SetSavePoint(); // 1
4093 s
= txn1
->Put("A", "a");
4096 s
= txn1
->PopSavePoint(); // Remove 1
4097 ASSERT_TRUE(txn1
->RollbackToSavePoint().IsNotFound());
4099 // Verify that "A" is still locked
4100 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
4103 s
= txn2
->Put("A", "a2");
4104 ASSERT_TRUE(s
.IsTimedOut());
4107 txn1
->SetSavePoint(); // 2
4109 s
= txn1
->Put("B", "b");
4112 txn1
->SetSavePoint(); // 3
4114 s
= txn1
->Put("B", "b2");
4117 ASSERT_OK(txn1
->RollbackToSavePoint()); // Roll back to 2
4119 s
= txn1
->PopSavePoint();
4122 s
= txn1
->PopSavePoint();
4123 ASSERT_TRUE(s
.IsNotFound());
4131 // txn1 should have modified "A" to "a"
4132 s
= db
->Get(read_options
, "A", &value
);
4134 ASSERT_EQ("a", value
);
4136 // txn1 should have set "B" to just "b"
4137 s
= db
->Get(read_options
, "B", &value
);
4139 ASSERT_EQ("b", value
);
4141 s
= db
->Get(read_options
, "C", &value
);
4142 ASSERT_TRUE(s
.IsNotFound());
// Checks that popping an inner savepoint keeps the writes made after it, and
// that a subsequent RollbackToSavePoint() to the outer savepoint undoes all of
// them, after which a second transaction can write the same keys.
4145 TEST_P(TransactionTest
, SavepointTest4
) {
4146 WriteOptions write_options
;
4147 ReadOptions read_options
;
4148 TransactionOptions txn_options
;
4151 txn_options
.lock_timeout
= 1; // 1 ms
4152 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
4155 txn1
->SetSavePoint(); // 1
4156 s
= txn1
->Put("A", "a");
4159 txn1
->SetSavePoint(); // 2
4160 s
= txn1
->Put("B", "b");
4163 s
= txn1
->PopSavePoint(); // Remove 2
4166 // Verify that A/B still exists.
4168 ASSERT_OK(txn1
->Get(read_options
, "A", &value
));
4169 ASSERT_EQ("a", value
);
4171 ASSERT_OK(txn1
->Get(read_options
, "B", &value
));
4172 ASSERT_EQ("b", value
);
4174 ASSERT_OK(txn1
->RollbackToSavePoint()); // Rollback to 1
4176 // Verify that everything was rolled back.
4177 s
= txn1
->Get(read_options
, "A", &value
);
4178 ASSERT_TRUE(s
.IsNotFound());
4180 s
= txn1
->Get(read_options
, "B", &value
);
4181 ASSERT_TRUE(s
.IsNotFound());
4183 // Nothing should be locked
4184 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
4187 s
= txn2
->Put("A", "");
4190 s
= txn2
->Put("B", "");
// Exercises Transaction::UndoGetForUpdate(). A second transaction with a 1 ms
// lock_timeout probes each key; a TimedOut status means txn1 still holds the
// lock. Covered cases: undo on a key that was never read, undo after a single
// GetForUpdate(), undo on keys that txn1 also wrote (Put/Delete), and keys
// read with GetForUpdate() several times.
4197 TEST_P(TransactionTest
, UndoGetForUpdateTest
) {
4198 WriteOptions write_options
;
4199 ReadOptions read_options
;
4200 TransactionOptions txn_options
;
4204 txn_options
.lock_timeout
= 1; // 1 ms
4205 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
4208 txn1
->UndoGetForUpdate("A");
4214 txn1
= db
->BeginTransaction(write_options
, txn_options
);
4216 txn1
->UndoGetForUpdate("A");
4217 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4218 ASSERT_TRUE(s
.IsNotFound());
4220 // Verify that A is locked
4221 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
4222 s
= txn2
->Put("A", "a");
4223 ASSERT_TRUE(s
.IsTimedOut());
4225 txn1
->UndoGetForUpdate("A");
4227 // Verify that A is now unlocked
4228 s
= txn2
->Put("A", "a2");
4230 ASSERT_OK(txn2
->Commit());
4232 s
= db
->Get(read_options
, "A", &value
);
4234 ASSERT_EQ("a2", value
);
// A and B are written by txn1 before being read with GetForUpdate(); the
// checks below expect both to stay locked after UndoGetForUpdate().
4236 s
= txn1
->Delete("A");
4238 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4239 ASSERT_TRUE(s
.IsNotFound());
4241 s
= txn1
->Put("B", "b3");
4243 s
= txn1
->GetForUpdate(read_options
, "B", &value
);
4246 txn1
->UndoGetForUpdate("A");
4247 txn1
->UndoGetForUpdate("B");
4249 // Verify that A and B are still locked
4250 txn2
= db
->BeginTransaction(write_options
, txn_options
);
4251 s
= txn2
->Put("A", "a4");
4252 ASSERT_TRUE(s
.IsTimedOut());
4253 s
= txn2
->Put("B", "b4");
4254 ASSERT_TRUE(s
.IsTimedOut());
4256 ASSERT_OK(txn1
->Rollback());
4259 // Verify that A and B are no longer locked
4260 s
= txn2
->Put("A", "a5");
4262 s
= txn2
->Put("B", "b5");
4268 txn1
= db
->BeginTransaction(write_options
, txn_options
);
// Below, A is read with GetForUpdate() three times, C twice, and B is read,
// written, and read again. The later checks show A being released only after
// the third round of undo calls and C after the second.
4270 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4272 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4274 s
= txn1
->GetForUpdate(read_options
, "C", &value
);
4275 ASSERT_TRUE(s
.IsNotFound());
4276 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4278 s
= txn1
->GetForUpdate(read_options
, "C", &value
);
4279 ASSERT_TRUE(s
.IsNotFound());
4280 s
= txn1
->GetForUpdate(read_options
, "B", &value
);
4282 s
= txn1
->Put("B", "b5");
4283 s
= txn1
->GetForUpdate(read_options
, "B", &value
);
4286 txn1
->UndoGetForUpdate("A");
4287 txn1
->UndoGetForUpdate("B");
4288 txn1
->UndoGetForUpdate("C");
4289 txn1
->UndoGetForUpdate("X");
4291 // Verify A,B,C are locked
4292 txn2
= db
->BeginTransaction(write_options
, txn_options
);
4293 s
= txn2
->Put("A", "a6");
4294 ASSERT_TRUE(s
.IsTimedOut());
4295 s
= txn2
->Delete("B");
4296 ASSERT_TRUE(s
.IsTimedOut());
4297 s
= txn2
->Put("C", "c6");
4298 ASSERT_TRUE(s
.IsTimedOut());
4299 s
= txn2
->Put("X", "x6");
4302 txn1
->UndoGetForUpdate("A");
4303 txn1
->UndoGetForUpdate("B");
4304 txn1
->UndoGetForUpdate("C");
4305 txn1
->UndoGetForUpdate("X");
4307 // Verify A,B are locked and C is not
4308 s
= txn2
->Put("A", "a6");
4309 ASSERT_TRUE(s
.IsTimedOut());
4310 s
= txn2
->Delete("B");
4311 ASSERT_TRUE(s
.IsTimedOut());
4312 s
= txn2
->Put("C", "c6");
4314 s
= txn2
->Put("X", "x6");
4317 txn1
->UndoGetForUpdate("A");
4318 txn1
->UndoGetForUpdate("B");
4319 txn1
->UndoGetForUpdate("C");
4320 txn1
->UndoGetForUpdate("X");
4322 // Verify B is locked and A and C are not
4323 s
= txn2
->Put("A", "a7");
4325 s
= txn2
->Delete("B");
4326 ASSERT_TRUE(s
.IsTimedOut());
4327 s
= txn2
->Put("C", "c7");
4329 s
= txn2
->Put("X", "x7");
// Exercises UndoGetForUpdate() in combination with savepoints. After each
// group of undo calls or RollbackToSavePoint(), a second transaction with a
// 1 ms lock_timeout probes keys A..H; a TimedOut status means txn1 still holds
// the lock on that key.
4341 TEST_P(TransactionTest
, UndoGetForUpdateTest2
) {
4342 WriteOptions write_options
;
4343 ReadOptions read_options
;
4344 TransactionOptions txn_options
;
4348 s
= db
->Put(write_options
, "A", "");
4351 txn_options
.lock_timeout
= 1; // 1 ms
4352 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
4355 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4357 s
= txn1
->GetForUpdate(read_options
, "B", &value
);
4358 ASSERT_TRUE(s
.IsNotFound());
4360 s
= txn1
->Put("F", "f");
4363 txn1
->SetSavePoint(); // 1
// A was read before savepoint 1 was set; the check below expects this undo to
// leave A locked.
4365 txn1
->UndoGetForUpdate("A");
4367 s
= txn1
->GetForUpdate(read_options
, "C", &value
);
4368 ASSERT_TRUE(s
.IsNotFound());
4369 s
= txn1
->GetForUpdate(read_options
, "D", &value
);
4370 ASSERT_TRUE(s
.IsNotFound());
4372 s
= txn1
->Put("E", "e");
4374 s
= txn1
->GetForUpdate(read_options
, "E", &value
);
4377 s
= txn1
->GetForUpdate(read_options
, "F", &value
);
4380 // Verify A,B,C,D,E,F are still locked
4381 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
4382 s
= txn2
->Put("A", "a1");
4383 ASSERT_TRUE(s
.IsTimedOut());
4384 s
= txn2
->Put("B", "b1");
4385 ASSERT_TRUE(s
.IsTimedOut());
4386 s
= txn2
->Put("C", "c1");
4387 ASSERT_TRUE(s
.IsTimedOut());
4388 s
= txn2
->Put("D", "d1");
4389 ASSERT_TRUE(s
.IsTimedOut());
4390 s
= txn2
->Put("E", "e1");
4391 ASSERT_TRUE(s
.IsTimedOut());
4392 s
= txn2
->Put("F", "f1");
4393 ASSERT_TRUE(s
.IsTimedOut());
4395 txn1
->UndoGetForUpdate("C");
4396 txn1
->UndoGetForUpdate("E");
4398 // Verify A,B,D,E,F are still locked and C is not.
4399 s
= txn2
->Put("A", "a2");
4400 ASSERT_TRUE(s
.IsTimedOut());
4401 s
= txn2
->Put("B", "b2");
4402 ASSERT_TRUE(s
.IsTimedOut());
4403 s
= txn2
->Put("D", "d2");
4404 ASSERT_TRUE(s
.IsTimedOut());
4405 s
= txn2
->Put("E", "e2");
4406 ASSERT_TRUE(s
.IsTimedOut());
4407 s
= txn2
->Put("F", "f2");
4408 ASSERT_TRUE(s
.IsTimedOut());
4409 s
= txn2
->Put("C", "c2");
4412 txn1
->SetSavePoint(); // 2
4414 s
= txn1
->Put("H", "h");
4417 txn1
->UndoGetForUpdate("A");
4418 txn1
->UndoGetForUpdate("B");
4419 txn1
->UndoGetForUpdate("C");
4420 txn1
->UndoGetForUpdate("D");
4421 txn1
->UndoGetForUpdate("E");
4422 txn1
->UndoGetForUpdate("F");
4423 txn1
->UndoGetForUpdate("G");
4424 txn1
->UndoGetForUpdate("H");
4426 // Verify A,B,D,E,F,H are still locked and C,G are not.
4427 s
= txn2
->Put("A", "a3");
4428 ASSERT_TRUE(s
.IsTimedOut());
4429 s
= txn2
->Put("B", "b3");
4430 ASSERT_TRUE(s
.IsTimedOut());
4431 s
= txn2
->Put("D", "d3");
4432 ASSERT_TRUE(s
.IsTimedOut());
4433 s
= txn2
->Put("E", "e3");
4434 ASSERT_TRUE(s
.IsTimedOut());
4435 s
= txn2
->Put("F", "f3");
4436 ASSERT_TRUE(s
.IsTimedOut());
4437 s
= txn2
->Put("H", "h3");
4438 ASSERT_TRUE(s
.IsTimedOut());
4439 s
= txn2
->Put("C", "c3");
4441 s
= txn2
->Put("G", "g3");
4444 ASSERT_OK(txn1
->RollbackToSavePoint()); // rollback to 2
4446 // Verify A,B,D,E,F are still locked and C,G,H are not.
4447 s
= txn2
->Put("A", "a3");
4448 ASSERT_TRUE(s
.IsTimedOut());
4449 s
= txn2
->Put("B", "b3");
4450 ASSERT_TRUE(s
.IsTimedOut());
4451 s
= txn2
->Put("D", "d3");
4452 ASSERT_TRUE(s
.IsTimedOut());
4453 s
= txn2
->Put("E", "e3");
4454 ASSERT_TRUE(s
.IsTimedOut());
4455 s
= txn2
->Put("F", "f3");
4456 ASSERT_TRUE(s
.IsTimedOut());
4457 s
= txn2
->Put("C", "c3");
4459 s
= txn2
->Put("G", "g3");
4461 s
= txn2
->Put("H", "h3");
4464 txn1
->UndoGetForUpdate("A");
4465 txn1
->UndoGetForUpdate("B");
4466 txn1
->UndoGetForUpdate("C");
4467 txn1
->UndoGetForUpdate("D");
4468 txn1
->UndoGetForUpdate("E");
4469 txn1
->UndoGetForUpdate("F");
4470 txn1
->UndoGetForUpdate("G");
4471 txn1
->UndoGetForUpdate("H");
4473 // Verify A,B,E,F are still locked and C,D,G,H are not.
4474 s
= txn2
->Put("A", "a3");
4475 ASSERT_TRUE(s
.IsTimedOut());
4476 s
= txn2
->Put("B", "b3");
4477 ASSERT_TRUE(s
.IsTimedOut());
4478 s
= txn2
->Put("E", "e3");
4479 ASSERT_TRUE(s
.IsTimedOut());
4480 s
= txn2
->Put("F", "f3");
4481 ASSERT_TRUE(s
.IsTimedOut());
4482 s
= txn2
->Put("C", "c3");
4484 s
= txn2
->Put("D", "d3");
4486 s
= txn2
->Put("G", "g3");
4488 s
= txn2
->Put("H", "h3");
4491 ASSERT_OK(txn1
->RollbackToSavePoint()); // rollback to 1
4493 // Verify A,B,F are still locked and C,D,E,G,H are not.
4494 s
= txn2
->Put("A", "a3");
4495 ASSERT_TRUE(s
.IsTimedOut());
4496 s
= txn2
->Put("B", "b3");
4497 ASSERT_TRUE(s
.IsTimedOut());
4498 s
= txn2
->Put("F", "f3");
4499 ASSERT_TRUE(s
.IsTimedOut());
4500 s
= txn2
->Put("C", "c3");
4502 s
= txn2
->Put("D", "d3");
4504 s
= txn2
->Put("E", "e3");
4506 s
= txn2
->Put("G", "g3");
4508 s
= txn2
->Put("H", "h3");
4511 txn1
->UndoGetForUpdate("A");
4512 txn1
->UndoGetForUpdate("B");
4513 txn1
->UndoGetForUpdate("C");
4514 txn1
->UndoGetForUpdate("D");
4515 txn1
->UndoGetForUpdate("E");
4516 txn1
->UndoGetForUpdate("F");
4517 txn1
->UndoGetForUpdate("G");
4518 txn1
->UndoGetForUpdate("H");
4520 // Verify F is still locked and A,B,C,D,E,G,H are not.
4521 s
= txn2
->Put("F", "f3");
4522 ASSERT_TRUE(s
.IsTimedOut());
4523 s
= txn2
->Put("A", "a3");
4525 s
= txn2
->Put("B", "b3");
4527 s
= txn2
->Put("C", "c3");
4529 s
= txn2
->Put("D", "d3");
4531 s
= txn2
->Put("E", "e3");
4533 s
= txn2
->Put("G", "g3");
4535 s
= txn2
->Put("H", "h3");
// Exercises lock timeouts and transaction expiration. The DB is opened with
// different transaction_lock_timeout/default_lock_timeout settings, and
// individual transactions override them via TransactionOptions::lock_timeout,
// TransactionOptions::expiration and Transaction::SetLockTimeout().
4547 TEST_P(TransactionTest
, TimeoutTest
) {
4548 WriteOptions write_options
;
4549 ReadOptions read_options
;
4556 // transaction writes have an infinite timeout,
4557 // but we will override this when we start a txn
4558 // db writes have infinite timeout
4559 txn_db_options
.transaction_lock_timeout
= -1;
4560 txn_db_options
.default_lock_timeout
= -1;
4562 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &db
);
4563 assert(db
!= nullptr);
4566 s
= db
->Put(write_options
, "aaa", "aaa");
4569 TransactionOptions txn_options0
;
4570 txn_options0
.expiration
= 100; // 100ms
4571 txn_options0
.lock_timeout
= 50; // txn timeout no longer infinite
4572 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options0
);
4574 s
= txn1
->GetForUpdate(read_options
, "aaa", nullptr);
4577 // Conflicts with previous GetForUpdate.
4578 // Since db writes do not have a timeout, this should eventually succeed when
4579 // the transaction expires.
4580 s
= db
->Put(write_options
, "aaa", "xxx");
4583 ASSERT_GE(txn1
->GetElapsedTime(),
4584 static_cast<uint64_t>(txn_options0
.expiration
));
4587 ASSERT_TRUE(s
.IsExpired()); // expired!
4589 s
= db
->Get(read_options
, "aaa", &value
);
4591 ASSERT_EQ("xxx", value
);
4596 // transaction writes have 50ms timeout,
4597 // db writes have infinite timeout
4598 txn_db_options
.transaction_lock_timeout
= 50;
4599 txn_db_options
.default_lock_timeout
= -1;
4601 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &db
);
4604 s
= db
->Put(write_options
, "aaa", "aaa");
4607 TransactionOptions txn_options
;
4608 txn_options
.expiration
= 100; // 100ms
4609 txn1
= db
->BeginTransaction(write_options
, txn_options
);
4611 s
= txn1
->GetForUpdate(read_options
, "aaa", nullptr);
4614 // Conflicts with previous GetForUpdate.
4615 // Since db writes do not have a timeout, this should eventually succeed when
4616 // the transaction expires.
4617 s
= db
->Put(write_options
, "aaa", "xxx");
4621 ASSERT_NOK(s
); // expired!
4623 s
= db
->Get(read_options
, "aaa", &value
);
4625 ASSERT_EQ("xxx", value
);
4628 txn_options
.expiration
= 6000000; // 100 minutes
4629 txn_options
.lock_timeout
= 1; // 1ms
4630 txn1
= db
->BeginTransaction(write_options
, txn_options
);
// Raise txn1's lock timeout from the 1 ms set in txn_options to 100.
4631 txn1
->SetLockTimeout(100);
4633 TransactionOptions txn_options2
;
4634 txn_options2
.expiration
= 10; // 10ms
4635 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options2
);
4638 s
= txn2
->Put("a", "2");
4641 // txn1 has a lock timeout longer than txn2's expiration, so it will win
4642 s
= txn1
->Delete("a");
4648 // txn2 should be expired out since txn1 was waiting until its timeout expired.
4650 ASSERT_TRUE(s
.IsExpired());
4654 txn_options
.expiration
= 6000000; // 100 minutes
4655 txn1
= db
->BeginTransaction(write_options
, txn_options
);
4656 txn_options2
.expiration
= 100000000;
4657 txn2
= db
->BeginTransaction(write_options
, txn_options2
);
4659 s
= txn1
->Delete("asdf");
4662 // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
4663 s
= txn2
->Delete("asdf");
4664 ASSERT_TRUE(s
.IsTimedOut());
4665 ASSERT_EQ(s
.ToString(), "Operation timed out: Timeout waiting to lock key");
4670 s
= txn2
->Put("asdf", "asdf");
4676 s
= db
->Get(read_options
, "asdf", &value
);
4678 ASSERT_EQ("asdf", value
);
// Exercises Transaction::SingleDelete(): on a key that was never written,
// followed by a Put() in the same transaction, on a key already present in the
// DB, and after the key was overwritten within the transaction. It also checks
// that a key touched by SingleDelete() is locked against a second transaction.
4684 TEST_P(TransactionTest
, SingleDeleteTest
) {
4685 WriteOptions write_options
;
4686 ReadOptions read_options
;
4690 Transaction
* txn
= db
->BeginTransaction(write_options
);
4693 s
= txn
->SingleDelete("A");
4696 s
= txn
->Get(read_options
, "A", &value
);
4697 ASSERT_TRUE(s
.IsNotFound());
4703 txn
= db
->BeginTransaction(write_options
);
4705 s
= txn
->SingleDelete("A");
4708 s
= txn
->Put("A", "a");
4711 s
= txn
->Get(read_options
, "A", &value
);
4713 ASSERT_EQ("a", value
);
4719 s
= db
->Get(read_options
, "A", &value
);
4721 ASSERT_EQ("a", value
);
4723 txn
= db
->BeginTransaction(write_options
);
4725 s
= txn
->SingleDelete("A");
4728 s
= txn
->Get(read_options
, "A", &value
);
4729 ASSERT_TRUE(s
.IsNotFound());
4735 s
= db
->Get(read_options
, "A", &value
);
4736 ASSERT_TRUE(s
.IsNotFound());
4738 txn
= db
->BeginTransaction(write_options
);
4739 Transaction
* txn2
= db
->BeginTransaction(write_options
);
4740 txn2
->SetSnapshot();
4742 s
= txn
->Put("A", "a");
4745 s
= txn
->Put("A", "a2");
4748 s
= txn
->SingleDelete("A");
4751 s
= txn
->SingleDelete("B");
4754 // According to db.h, doing a SingleDelete on a key that has been
4755 // overwritten will have undefined behavior. So it is unclear what the
4756 // result of fetching "A" should be. The current implementation will
4757 // return NotFound in this case.
4758 s
= txn
->Get(read_options
, "A", &value
);
4759 ASSERT_TRUE(s
.IsNotFound());
// "B" was touched by txn's SingleDelete above, so txn2's write is expected to
// time out waiting for the lock.
4761 s
= txn2
->Put("B", "b");
4762 ASSERT_TRUE(s
.IsTimedOut());
4771 // According to db.h, doing a SingleDelete on a key that has been
4772 // overwritten will have undefined behavior. So it is unclear what the
4773 // result of fetching "A" should be. The current implementation will
4774 // return NotFound in this case.
4775 s
= db
->Get(read_options
, "A", &value
);
4776 ASSERT_TRUE(s
.IsNotFound());
4778 s
= db
->Get(read_options
, "B", &value
);
4779 ASSERT_TRUE(s
.IsNotFound());
// Exercises Transaction::Merge(): a Get() through the transaction is expected
// to return MergeInProgress while merge operands are pending on the key, a
// Put() makes the key readable again, and a competing transaction's Merge() on
// the same key times out. The value finally read from the DB is "a,3".
4782 TEST_P(TransactionTest
, MergeTest
) {
4783 WriteOptions write_options
;
4784 ReadOptions read_options
;
4788 Transaction
* txn
= db
->BeginTransaction(write_options
, TransactionOptions());
4791 s
= db
->Put(write_options
, "A", "a0");
4794 s
= txn
->Merge("A", "1");
4797 s
= txn
->Merge("A", "2");
4800 s
= txn
->Get(read_options
, "A", &value
);
4801 ASSERT_TRUE(s
.IsMergeInProgress());
4803 s
= txn
->Put("A", "a");
4806 s
= txn
->Get(read_options
, "A", &value
);
4808 ASSERT_EQ("a", value
);
4810 s
= txn
->Merge("A", "3");
4813 s
= txn
->Get(read_options
, "A", &value
);
4814 ASSERT_TRUE(s
.IsMergeInProgress());
4816 TransactionOptions txn_options
;
4817 txn_options
.lock_timeout
= 1; // 1 ms
4818 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
4821 // verify that txn has "A" locked
4822 s
= txn2
->Merge("A", "4");
4823 ASSERT_TRUE(s
.IsTimedOut());
4833 s
= db
->Get(read_options
, "A", &value
);
4835 ASSERT_EQ("a,3", value
);
// Checks range-deletion support on a TransactionDB. For every combination of
// the skip_concurrency_control and skip_duplicate_key_check flags, a batch
// containing DeleteRange("a", "b") is written through the flag-taking Write()
// overload, and the test then checks per write policy whether key "a" is gone
// (NotFound) or still readable.
4838 TEST_P(TransactionTest
, DeleteRangeSupportTest
) {
4839 // The `DeleteRange()` API is banned everywhere.
4841 db
->DeleteRange(WriteOptions(), db
->DefaultColumnFamily(), "a", "b")
4844 // But range deletions can be added via the `Write()` API by specifying the
4845 // proper flags to promise there are no conflicts according to the DB type
4846 // (see `TransactionDB::DeleteRange()` API doc for details).
4847 for (bool skip_concurrency_control
: {false, true}) {
4848 for (bool skip_duplicate_key_check
: {false, true}) {
4849 ASSERT_OK(db
->Put(WriteOptions(), "a", "val"));
4851 ASSERT_OK(wb
.DeleteRange("a", "b"));
4852 TransactionDBWriteOptimizations flags
;
4853 flags
.skip_concurrency_control
= skip_concurrency_control
;
4854 flags
.skip_duplicate_key_check
= skip_duplicate_key_check
;
4855 Status s
= db
->Write(WriteOptions(), flags
, &wb
);
4857 switch (txn_db_options
.write_policy
) {
4858 case WRITE_COMMITTED
:
// For WRITE_COMMITTED the outcome depends on skip_concurrency_control only.
4859 if (skip_concurrency_control
) {
4861 ASSERT_TRUE(db
->Get(ReadOptions(), "a", &value
).IsNotFound());
4864 ASSERT_OK(db
->Get(ReadOptions(), "a", &value
));
4867 case WRITE_PREPARED
:
4868 // Intentional fall-through
4869 case WRITE_UNPREPARED
:
// For these two policies both flags must be set for "a" to be deleted.
4870 if (skip_concurrency_control
&& skip_duplicate_key_check
) {
4872 ASSERT_TRUE(db
->Get(ReadOptions(), "a", &value
).IsNotFound());
4875 ASSERT_OK(db
->Get(ReadOptions(), "a", &value
));
4879 // Without any promises from the user, range deletion via other `Write()`
4880 // APIs are still banned.
4881 ASSERT_OK(db
->Put(WriteOptions(), "a", "val"));
4882 ASSERT_NOK(db
->Write(WriteOptions(), &wb
));
4883 ASSERT_OK(db
->Get(ReadOptions(), "a", &value
));
// Checks SetSnapshotOnNextOperation(): GetSnapshot() returns null right after
// the call, txn1's GetForUpdate("A") still sees the "a2" written by txn2 in the
// meantime, and a key written to the DB after that point ("B") cannot be
// written by txn1 (Busy).
4888 TEST_P(TransactionTest
, DeferSnapshotTest
) {
4889 WriteOptions write_options
;
4890 ReadOptions read_options
;
4894 s
= db
->Put(write_options
, "A", "a0");
4897 Transaction
* txn1
= db
->BeginTransaction(write_options
);
4898 Transaction
* txn2
= db
->BeginTransaction(write_options
);
4900 txn1
->SetSnapshotOnNextOperation();
4901 auto snapshot
= txn1
->GetSnapshot();
4902 ASSERT_FALSE(snapshot
);
4904 s
= txn2
->Put("A", "a2");
4910 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
4911 // Should not conflict with txn2 since snapshot wasn't set until
4912 // GetForUpdate was called.
4914 ASSERT_EQ("a2", value
);
4916 s
= txn1
->Put("A", "a1");
4919 s
= db
->Put(write_options
, "B", "b0");
4922 // Cannot lock B since it was written after the snapshot was set
4923 s
= txn1
->Put("B", "b1");
4924 ASSERT_TRUE(s
.IsBusy());
4930 s
= db
->Get(read_options
, "A", &value
);
4932 ASSERT_EQ("a1", value
);
4934 s
= db
->Get(read_options
, "B", &value
);
4936 ASSERT_EQ("b0", value
);
// Checks that SetSnapshotOnNextOperation() does not replace an existing
// snapshot right away: plain Get() calls keep reading through the snapshot
// taken by SetSnapshot(), and only after GetForUpdate() does GetSnapshot()
// reflect the writes ("C", "D") made to the DB in between.
4939 TEST_P(TransactionTest
, DeferSnapshotTest2
) {
4940 WriteOptions write_options
;
4941 ReadOptions read_options
, snapshot_read_options
;
4945 Transaction
* txn1
= db
->BeginTransaction(write_options
);
4947 txn1
->SetSnapshot();
4949 s
= txn1
->Put("A", "a1");
4952 s
= db
->Put(write_options
, "C", "c0");
4954 s
= db
->Put(write_options
, "D", "d0");
4957 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
4959 txn1
->SetSnapshotOnNextOperation();
4961 s
= txn1
->Get(snapshot_read_options
, "C", &value
);
4962 // Snapshot was set before C was written
4963 ASSERT_TRUE(s
.IsNotFound());
4964 s
= txn1
->Get(snapshot_read_options
, "D", &value
);
4965 // Snapshot was set before D was written
4966 ASSERT_TRUE(s
.IsNotFound());
4968 // Snapshot should not have changed yet.
4969 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
4971 s
= txn1
->Get(snapshot_read_options
, "C", &value
);
4972 // Snapshot was set before C was written
4973 ASSERT_TRUE(s
.IsNotFound());
4974 s
= txn1
->Get(snapshot_read_options
, "D", &value
);
4975 // Snapshot was set before D was written
4976 ASSERT_TRUE(s
.IsNotFound());
4978 s
= txn1
->GetForUpdate(read_options
, "C", &value
);
4980 ASSERT_EQ("c0", value
);
4982 s
= db
->Put(write_options
, "D", "d00");
4985 // Snapshot is now set
4986 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
4987 s
= txn1
->Get(snapshot_read_options
, "D", &value
);
4989 ASSERT_EQ("d0", value
);
// Checks how SetSnapshot()/SetSnapshotOnNextOperation() interact with
// SetSavePoint()/RollbackToSavePoint(). Key "T" is rewritten directly through
// the DB between steps ("1".."5"), so the value read through the transaction's
// current snapshot shows which snapshot is in effect after each rollback.
4996 TEST_P(TransactionTest
, DeferSnapshotSavePointTest
) {
4997 WriteOptions write_options
;
4998 ReadOptions read_options
, snapshot_read_options
;
5002 Transaction
* txn1
= db
->BeginTransaction(write_options
);
5004 txn1
->SetSavePoint(); // 1
5006 s
= db
->Put(write_options
, "T", "1");
5009 txn1
->SetSnapshotOnNextOperation();
5011 s
= db
->Put(write_options
, "T", "2");
5014 txn1
->SetSavePoint(); // 2
5016 s
= db
->Put(write_options
, "T", "3");
5019 s
= txn1
->Put("A", "a");
5022 txn1
->SetSavePoint(); // 3
5024 s
= db
->Put(write_options
, "T", "4");
5027 txn1
->SetSnapshot();
5028 txn1
->SetSnapshotOnNextOperation();
5030 txn1
->SetSavePoint(); // 4
5032 s
= db
->Put(write_options
, "T", "5");
5035 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5036 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5038 ASSERT_EQ("4", value
);
5040 s
= txn1
->Put("A", "a1");
5043 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5044 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5046 ASSERT_EQ("5", value
);
5048 s
= txn1
->RollbackToSavePoint(); // Rollback to 4
5051 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5052 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5054 ASSERT_EQ("4", value
);
5056 s
= txn1
->RollbackToSavePoint(); // Rollback to 3
5059 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5060 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5062 ASSERT_EQ("3", value
);
5064 s
= txn1
->Get(read_options
, "T", &value
);
5066 ASSERT_EQ("5", value
);
5068 s
= txn1
->RollbackToSavePoint(); // Rollback to 2
// After rolling back to savepoint 2 the transaction has no snapshot, so the
// read below sees the latest value of "T".
5071 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5072 ASSERT_FALSE(snapshot_read_options
.snapshot
);
5073 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5075 ASSERT_EQ("5", value
);
5077 s
= txn1
->Delete("A");
5080 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5081 ASSERT_TRUE(snapshot_read_options
.snapshot
);
5082 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5084 ASSERT_EQ("5", value
);
5086 s
= txn1
->RollbackToSavePoint(); // Rollback to 1
5089 s
= txn1
->Delete("A");
5092 snapshot_read_options
.snapshot
= txn1
->GetSnapshot();
5093 ASSERT_FALSE(snapshot_read_options
.snapshot
);
5094 s
= txn1
->Get(snapshot_read_options
, "T", &value
);
5096 ASSERT_EQ("5", value
);
// Checks the notifier overload of SetSnapshotOnNextOperation(). The local
// Notifier stores the snapshot it is handed into read_options.snapshot, so the
// test can observe when the snapshot is created: not on a Get(), but on the
// following Put().
5104 TEST_P(TransactionTest
, SetSnapshotOnNextOperationWithNotification
) {
5105 WriteOptions write_options
;
5106 ReadOptions read_options
;
// TransactionNotifier that writes each newly created snapshot through the
// pointer it was constructed with.
5109 class Notifier
: public TransactionNotifier
{
5111 const Snapshot
** snapshot_ptr_
;
5114 explicit Notifier(const Snapshot
** snapshot_ptr
)
5115 : snapshot_ptr_(snapshot_ptr
) {}
5117 void SnapshotCreated(const Snapshot
* newSnapshot
) override
{
5118 *snapshot_ptr_
= newSnapshot
;
5122 std::shared_ptr
<Notifier
> notifier
=
5123 std::make_shared
<Notifier
>(&read_options
.snapshot
);
5126 s
= db
->Put(write_options
, "B", "0");
5129 Transaction
* txn1
= db
->BeginTransaction(write_options
);
5131 txn1
->SetSnapshotOnNextOperation(notifier
);
5132 ASSERT_FALSE(read_options
.snapshot
);
5134 s
= db
->Put(write_options
, "B", "1");
5137 // A Get does not generate the snapshot
5138 s
= txn1
->Get(read_options
, "B", &value
);
5140 ASSERT_FALSE(read_options
.snapshot
);
5141 ASSERT_EQ(value
, "1");
5143 // Any other operation does
5144 s
= txn1
->Put("A", "0");
5148 s
= db
->Put(write_options
, "B", "2");
5151 // The original value should still be read
5152 s
= txn1
->Get(read_options
, "B", &value
);
5154 ASSERT_TRUE(read_options
.snapshot
);
5155 ASSERT_EQ(value
, "1");
// Checks GetSnapshot() before a snapshot exists, while one is held, and after
// ClearSnapshot(). Key "foo" is rewritten through the DB between steps so the
// value read via snapshot_read_options shows whether a snapshot is in effect.
5163 TEST_P(TransactionTest
, ClearSnapshotTest
) {
5164 WriteOptions write_options
;
5165 ReadOptions read_options
, snapshot_read_options
;
5169 s
= db
->Put(write_options
, "foo", "0");
5172 Transaction
* txn
= db
->BeginTransaction(write_options
);
5175 s
= db
->Put(write_options
, "foo", "1");
5178 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
5179 ASSERT_FALSE(snapshot_read_options
.snapshot
);
5181 // No snapshot created yet
5182 s
= txn
->Get(snapshot_read_options
, "foo", &value
);
5183 ASSERT_EQ(value
, "1");
5186 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
5187 ASSERT_TRUE(snapshot_read_options
.snapshot
);
5189 s
= db
->Put(write_options
, "foo", "2");
5192 // Snapshot was created before change to '2'
5193 s
= txn
->Get(snapshot_read_options
, "foo", &value
);
5194 ASSERT_EQ(value
, "1");
5196 txn
->ClearSnapshot();
5197 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
5198 ASSERT_FALSE(snapshot_read_options
.snapshot
);
5200 // Snapshot has now been cleared
5201 s
= txn
->Get(snapshot_read_options
, "foo", &value
);
5202 ASSERT_EQ(value
, "2");
// Opens the TransactionDB with three column families whose
// disable_auto_compactions settings differ (default: false, CFA: true,
// CFB: false) and asserts that the latest mutable CF options of each opened
// handle report the same values.
// NOTE(review): `s`, the close/reopen steps and the tail of the
// TransactionDB::Open() call are on lines not visible in this excerpt.
5210 TEST_P(TransactionTest
, ToggleAutoCompactionTest
) {
5213 ColumnFamilyHandle
*cfa
, *cfb
;
5214 ColumnFamilyOptions cf_options
;
5216 // Create 2 new column families
5217 s
= db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
5219 s
= db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
5226 // open DB with three column families
5227 std::vector
<ColumnFamilyDescriptor
> column_families
;
5228 // have to open default column family
5229 column_families
.push_back(
5230 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
5231 // open the new column families
5232 column_families
.push_back(
5233 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
5234 column_families
.push_back(
5235 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
// Index order matches the push_back order above: 0 = default, 1 = CFA,
// 2 = CFB.
5237 ColumnFamilyOptions
* cf_opt_default
= &column_families
[0].options
;
5238 ColumnFamilyOptions
* cf_opt_cfa
= &column_families
[1].options
;
5239 ColumnFamilyOptions
* cf_opt_cfb
= &column_families
[2].options
;
// Only CFA disables auto compactions.
5240 cf_opt_default
->disable_auto_compactions
= false;
5241 cf_opt_cfa
->disable_auto_compactions
= true;
5242 cf_opt_cfb
->disable_auto_compactions
= false;
5244 std::vector
<ColumnFamilyHandle
*> handles
;
5246 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, column_families
,
// Read back the latest mutable options of each column family through its
// handle.
5250 auto cfh_default
= static_cast_with_check
<ColumnFamilyHandleImpl
>(handles
[0]);
5251 auto opt_default
= *cfh_default
->cfd()->GetLatestMutableCFOptions();
5253 auto cfh_a
= static_cast_with_check
<ColumnFamilyHandleImpl
>(handles
[1]);
5254 auto opt_a
= *cfh_a
->cfd()->GetLatestMutableCFOptions();
5256 auto cfh_b
= static_cast_with_check
<ColumnFamilyHandleImpl
>(handles
[2]);
5257 auto opt_b
= *cfh_b
->cfd()->GetLatestMutableCFOptions();
// The per-CF settings given at open time must be the ones in effect.
5259 ASSERT_EQ(opt_default
.disable_auto_compactions
, false);
5260 ASSERT_EQ(opt_a
.disable_auto_compactions
, true);
5261 ASSERT_EQ(opt_b
.disable_auto_compactions
, false);
// NOTE(review): the loop body is not visible here; presumably it releases each
// handle.
5263 for (auto handle
: handles
) {
// Uses a sync-point callback to start a second transaction (txn2) that writes
// key "X" while txn1, which has an expiration set, is being committed. txn2's
// Put is expected to time out and the committed value of "X" must be "1".
// NOTE(review): `s`, `value` and txn1's commit call are on lines not visible
// in this excerpt.
5268 TEST_P(TransactionStressTest
, ExpiredTransactionDataRace1
) {
5269 // In this test, txn1 should succeed committing,
5270 // as the callback is called after txn1 starts committing.
5271 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5272 {{"TransactionTest::ExpirableTransactionDataRace:1"}});
5273 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5274 "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) {
5275 WriteOptions write_options
;
5276 TransactionOptions txn_options
;
5278 // Force txn1 to expire
5279 /* sleep override */
5280 std::this_thread::sleep_for(std::chrono::milliseconds(150));
5282 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
// txn2 must not be able to take over "X" from txn1.
5284 s
= txn2
->Put("X", "2");
5285 ASSERT_TRUE(s
.IsTimedOut());
5291 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5293 WriteOptions write_options
;
5294 TransactionOptions txn_options
;
// NOTE(review): presumably milliseconds, i.e. shorter than the 150 ms sleep in
// the callback above -- confirm against TransactionOptions::expiration.
5296 txn_options
.expiration
= 100;
5297 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
5300 s
= txn1
->Put("X", "1");
5305 ReadOptions read_options
;
// txn1's write must be the committed value.
5307 s
= db
->Get(read_options
, "X", &value
);
5309 ASSERT_EQ("1", value
);
5312 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5315 #ifndef ROCKSDB_VALGRIND_RUN
// Runs `num_transactions` random transactions against `db` through a
// RandomTransactionInserter and reports whether enough of them succeeded.
//
// db               - database the transactions are executed on
// num_transactions - number of TransactionDBInsert() attempts
// num_sets         - forwarded to the inserter (narrowed to uint16_t)
// num_keys_per_set - forwarded to the inserter
// rand             - random source; also used to pick transaction options
// 5317 // cmt_delay_ms is the delay between prepare and commit
5317 // cmt_delay_ms is the delay between prepare and commit
5318 // first_id is the id of the first transaction
//
// Returns Status::TryAgain when more than half of the transactions failed
// (skipped when num_transactions == 1), the inserter's last status on an
// unexpected failure, and Status::OK() otherwise.
5319 Status
TransactionStressTestInserter(
5320 TransactionDB
* db
, const size_t num_transactions
, const size_t num_sets
,
5321 const size_t num_keys_per_set
, Random64
* rand
,
5322 const uint64_t cmt_delay_ms
= 0, const uint64_t first_id
= 0) {
5323 WriteOptions write_options
;
5324 ReadOptions read_options
;
5325 TransactionOptions txn_options
;
// Randomly enable use_only_the_last_commit_time_batch_for_recovery.
5326 if (rand
->OneIn(2)) {
5327 txn_options
.use_only_the_last_commit_time_batch_for_recovery
= true;
5329 // Inside the inserter we might also retake the snapshot. We do both since two
5330 // separate functions are engaged for each.
5331 txn_options
.set_snapshot
= rand
->OneIn(2);
5333 RandomTransactionInserter
inserter(
5334 rand
, write_options
, read_options
, num_keys_per_set
,
5335 static_cast<uint16_t>(num_sets
), cmt_delay_ms
, first_id
);
5337 for (size_t t
= 0; t
< num_transactions
; t
++) {
5338 bool success
= inserter
.TransactionDBInsert(db
, txn_options
);
// NOTE(review): the condition guarding this early return is on a line not
// visible in this excerpt.
5340 // unexpected failure
5341 return inserter
.GetLastStatus();
5344 inserter
.GetLastStatus().PermitUncheckedError();
5346 // Make sure at least some of the transactions succeeded. It's ok if
5347 // some failed due to write-conflicts.
5348 if (num_transactions
!= 1 &&
5349 inserter
.GetFailureCount() > num_transactions
/ 2) {
5350 return Status::TryAgain("Too many transactions failed! " +
5351 std::to_string(inserter
.GetFailureCount()) + " / " +
5352 std::to_string(num_transactions
));
5355 return Status::OK();
5359 // Worker threads add a number to a key from each set of keys. The checker
5360 // threads verify that the sum of all keys in each set are equal.
// Slow checker and slow inserter threads are added only when
// with_slow_threads_ is set.
// NOTE(review): the updates of `finished`, the declaration of `id`, the thread
// joins and the final status check are on lines not visible in this excerpt.
5361 TEST_P(MySQLStyleTransactionTest
, TransactionStressTest
) {
5362 // Small write buffer to trigger more compactions
5363 options
.write_buffer_size
= 1024;
5364 ASSERT_OK(ReOpenNoDelete());
5365 constexpr size_t num_workers
= 4; // worker threads count
5366 constexpr size_t num_checkers
= 2; // checker threads count
5367 constexpr size_t num_slow_checkers
= 2; // checker threads emulating backups
5368 constexpr size_t num_slow_workers
= 1; // slow worker threads count
5369 constexpr size_t num_transactions_per_thread
= 10000;
5370 constexpr uint16_t num_sets
= 3;
5371 constexpr size_t num_keys_per_set
= 100;
5372 // Setting the key-space to be 100 keys should cause enough write-conflicts
5373 // to make this test interesting.
5375 std::vector
<port::Thread
> threads
;
// Compared against num_workers by the checker and slow-thread loops below.
5376 std::atomic
<uint32_t> finished
= {0};
5377 constexpr bool TAKE_SNAPSHOT
= true;
// Every thread seeds its Random64 with time_seed multiplied by a hash of its
// thread id.
5378 uint64_t time_seed
= env
->NowMicros();
5379 printf("time_seed is %" PRIu64
"\n", time_seed
); // would help to reproduce
// Worker: runs num_transactions_per_thread random transactions.
5381 std::function
<void()> call_inserter
= [&] {
5382 size_t thd_seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
5383 Random64
rand(time_seed
* thd_seed
);
5384 ASSERT_OK(TransactionStressTestInserter(db
, num_transactions_per_thread
,
5385 num_sets
, num_keys_per_set
, &rand
));
// Checker: verifies repeatedly while finished < num_workers.
5388 std::function
<void()> call_checker
= [&] {
5389 size_t thd_seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
5390 Random64
rand(time_seed
* thd_seed
);
5391 // Verify that data is consistent
5392 while (finished
< num_workers
) {
5393 ASSERT_OK(RandomTransactionInserter::Verify(
5394 db
, num_sets
, num_keys_per_set
, TAKE_SNAPSHOT
, &rand
));
// Slow checker: same verification, passing a random delay of 1..100 to
// Verify().
5397 std::function
<void()> call_slow_checker
= [&] {
5398 size_t thd_seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
5399 Random64
rand(time_seed
* thd_seed
);
5400 // Verify that data is consistent
5401 while (finished
< num_workers
) {
5402 uint64_t delay_ms
= rand
.Uniform(100) + 1;
5403 Status s
= RandomTransactionInserter::Verify(
5404 db
, num_sets
, num_keys_per_set
, TAKE_SNAPSHOT
, &rand
, delay_ms
);
// Slow inserter: one transaction per iteration with a random commit delay of
// 1..500 and an increasing first id.
5408 std::function
<void()> call_slow_inserter
= [&] {
5409 size_t thd_seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
5410 Random64
rand(time_seed
* thd_seed
);
5412 // Keep inserting single delayed transactions while the workers are running
5413 while (finished
< num_workers
) {
5414 uint64_t delay_ms
= rand
.Uniform(500) + 1;
5415 ASSERT_OK(TransactionStressTestInserter(db
, 1, num_sets
, num_keys_per_set
,
5416 &rand
, delay_ms
, id
++));
// Launch the worker and checker threads.
5420 for (uint32_t i
= 0; i
< num_workers
; i
++) {
5421 threads
.emplace_back(call_inserter
);
5423 for (uint32_t i
= 0; i
< num_checkers
; i
++) {
5424 threads
.emplace_back(call_checker
);
// The slow variants run only when the test parameter enables them.
5426 if (with_slow_threads_
) {
5427 for (uint32_t i
= 0; i
< num_slow_checkers
; i
++) {
5428 threads
.emplace_back(call_slow_checker
);
5430 for (uint32_t i
= 0; i
< num_slow_workers
; i
++) {
5431 threads
.emplace_back(call_slow_inserter
);
5435 // Wait for all threads to finish
5436 for (auto& t
: threads
) {
5440 // Verify that data is consistent
5441 Status s
= RandomTransactionInserter::Verify(db
, num_sets
, num_keys_per_set
,
5445 #endif // ROCKSDB_VALGRIND_RUN
// Checks that a transaction with max_write_batch_size = 29 accepts two Puts
// and rejects the third with a MemoryLimit status, leaving GetNumPuts() at 2,
// and that the transaction can still be rolled back afterwards.
// NOTE(review): `s` is declared on a line not visible in this excerpt.
5447 TEST_P(TransactionTest
, MemoryLimitTest
) {
5448 TransactionOptions txn_options
;
5449 // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
5450 txn_options
.max_write_batch_size
= 29;
5451 // Set threshold to unlimited so that the write batch does not get flushed,
5452 // and can hit the memory limit.
5453 txn_options
.write_batch_flush_threshold
= 0;
5457 Transaction
* txn
= db
->BeginTransaction(WriteOptions(), txn_options
);
// A fresh transaction has no puts recorded.
5460 ASSERT_EQ(0, txn
->GetNumPuts());
5461 ASSERT_LE(0, txn
->GetID());
5463 s
= txn
->Put(Slice("a"), Slice("...."));
5465 ASSERT_EQ(1, txn
->GetNumPuts());
5467 s
= txn
->Put(Slice("b"), Slice("...."));
5469 ASSERT_EQ(2, txn
->GetNumPuts());
// The third Put must fail with MemoryLimit and must not be counted.
5471 s
= txn
->Put(Slice("b"), Slice("...."));
5472 ASSERT_TRUE(s
.IsMemoryLimit());
5473 ASSERT_EQ(2, txn
->GetNumPuts());
5475 ASSERT_OK(txn
->Rollback());
5479 // This test clarifies the existing expectation from the sequence number
5480 // algorithm. It could detect mistakes in updating the code but it is not
5481 // necessarily the one acceptable way. If the algorithm is legitimately changed,
5482 // this unit test should be updated as well.
//
// Every run n in [0, 2^NUM_BRANCHES) repeats the same pattern: after each
// write step the last visible sequence must equal exp_seq, and it must still
// equal exp_seq after an optional memtable flush and, in the long test only,
// after an optional WAL flush plus reopen.
// NOTE(review): `exp_seq`, `branch`, `fopt`, the write steps that advance
// exp_seq and the body of branch_do are on lines not visible in this excerpt.
5483 TEST_P(TransactionStressTest
, SeqAdvanceTest
) {
5484 // TODO(myabandeh): must be tested with false before new releases
5485 const bool short_test
= true;
5489 options
.disable_auto_compactions
= true;
5490 ASSERT_OK(ReOpen());
5492 // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
5493 // of the branches. This is the same as counting a binary number where i-th
5494 // bit represents whether we take branch i in the run represented by the number.
5495 const size_t NUM_BRANCHES
= short_test
? 6 : 10;
5496 // Helper function that shows if the branch is to be taken in the run
5497 // represented by the number n.
5498 auto branch_do
= [&](size_t n
, size_t* branch
) {
5499 assert(*branch
< NUM_BRANCHES
);
5500 const size_t filter
= static_cast<size_t>(1) << *branch
;
// One run per combination of taken branches.
5503 const size_t max_n
= static_cast<size_t>(1) << NUM_BRANCHES
;
5504 for (size_t n
= 0; n
< max_n
; n
++) {
5505 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
5507 auto seq
= db_impl
->GetLatestSequenceNumber();
5510 seq
= db_impl
->TEST_GetLastVisibleSequence();
5511 ASSERT_EQ(exp_seq
, seq
);
// Optional flush: the visible sequence must be unchanged.
5513 if (branch_do(n
, &branch
)) {
5514 ASSERT_OK(db_impl
->Flush(fopt
));
5515 seq
= db_impl
->TEST_GetLastVisibleSequence();
5516 ASSERT_EQ(exp_seq
, seq
);
// Long test only: optional WAL flush and reopen; db_impl is re-fetched
// because the DB object was reopened.
5518 if (!short_test
&& branch_do(n
, &branch
)) {
5519 ASSERT_OK(db_impl
->FlushWAL(true));
5520 ASSERT_OK(ReOpenNoDelete());
5521 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
5522 seq
= db_impl
->GetLatestSequenceNumber();
5523 ASSERT_EQ(exp_seq
, seq
);
5526 // Doing it twice might detect some bugs
5528 seq
= db_impl
->TEST_GetLastVisibleSequence();
5529 ASSERT_EQ(exp_seq
, seq
);
5532 seq
= db_impl
->TEST_GetLastVisibleSequence();
5533 ASSERT_EQ(exp_seq
, seq
);
// Same optional flush / reopen pattern as above.
5535 if (branch_do(n
, &branch
)) {
5536 ASSERT_OK(db_impl
->Flush(fopt
));
5537 seq
= db_impl
->TEST_GetLastVisibleSequence();
5538 ASSERT_EQ(exp_seq
, seq
);
5540 if (!short_test
&& branch_do(n
, &branch
)) {
5541 ASSERT_OK(db_impl
->FlushWAL(true));
5542 ASSERT_OK(ReOpenNoDelete());
5543 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
5544 seq
= db_impl
->GetLatestSequenceNumber();
5545 ASSERT_EQ(exp_seq
, seq
);
5549 seq
= db_impl
->TEST_GetLastVisibleSequence();
5550 ASSERT_EQ(exp_seq
, seq
);
// Same optional flush / reopen pattern as above.
5552 if (branch_do(n
, &branch
)) {
5553 ASSERT_OK(db_impl
->Flush(fopt
));
5554 seq
= db_impl
->TEST_GetLastVisibleSequence();
5555 ASSERT_EQ(exp_seq
, seq
);
5557 if (!short_test
&& branch_do(n
, &branch
)) {
5558 ASSERT_OK(db_impl
->FlushWAL(true));
5559 ASSERT_OK(ReOpenNoDelete());
5560 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
5561 seq
= db_impl
->GetLatestSequenceNumber();
5562 ASSERT_EQ(exp_seq
, seq
);
5566 seq
= db_impl
->TEST_GetLastVisibleSequence();
5568 ASSERT_EQ(exp_seq
, seq
);
// Same optional flush / reopen pattern as above.
5570 if (branch_do(n
, &branch
)) {
5571 ASSERT_OK(db_impl
->Flush(fopt
));
5572 seq
= db_impl
->TEST_GetLastVisibleSequence();
5573 ASSERT_EQ(exp_seq
, seq
);
5575 if (!short_test
&& branch_do(n
, &branch
)) {
5576 ASSERT_OK(db_impl
->FlushWAL(true));
5577 ASSERT_OK(ReOpenNoDelete());
5578 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
5579 seq
= db_impl
->GetLatestSequenceNumber();
5580 ASSERT_EQ(exp_seq
, seq
);
5584 seq
= db_impl
->TEST_GetLastVisibleSequence();
5585 ASSERT_EQ(exp_seq
, seq
);
// Same optional flush / reopen pattern as above.
5587 if (branch_do(n
, &branch
)) {
5588 ASSERT_OK(db_impl
->Flush(fopt
));
5589 seq
= db_impl
->TEST_GetLastVisibleSequence();
5590 ASSERT_EQ(exp_seq
, seq
);
5592 if (!short_test
&& branch_do(n
, &branch
)) {
5593 ASSERT_OK(db_impl
->FlushWAL(true));
5594 ASSERT_OK(ReOpenNoDelete());
5595 db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
5596 seq
= db_impl
->GetLatestSequenceNumber();
5597 ASSERT_EQ(exp_seq
, seq
);
5599 ASSERT_OK(ReOpen());
5603 // Verify that the optimization would not compromise the correctness
// Iterates over all four on/off combinations of skip_concurrency_control and
// skip_duplicate_key_check; for each one it reopens the DB, writes "k" -> "v1"
// through a write batch and reads it back.
// NOTE(review): `batch`, `ropt` and the place where `optimizations` is passed
// to the write are on lines not visible in this excerpt.
5604 TEST_P(TransactionTest
, Optimizations
) {
5605 size_t comb_cnt
= size_t(1) << 2; // 2 is number of optimization vars
5606 for (size_t new_comb
= 0; new_comb
< comb_cnt
; new_comb
++) {
5607 TransactionDBWriteOptimizations optimizations
;
// Each flag is derived from new_comb via IsInCombination with its own index.
5608 optimizations
.skip_concurrency_control
= IsInCombination(0, new_comb
);
5609 optimizations
.skip_duplicate_key_check
= IsInCombination(1, new_comb
);
5611 ASSERT_OK(ReOpen());
5612 WriteOptions write_options
;
5614 ASSERT_OK(batch
.Put(Slice("k"), Slice("v1")));
5615 ASSERT_OK(db
->Write(write_options
, &batch
));
// The written value must be readable afterwards.
5618 PinnableSlice pinnable_val
;
5619 ASSERT_OK(db
->Get(ropt
, db
->DefaultColumnFamily(), "k", &pinnable_val
));
5620 ASSERT_TRUE(pinnable_val
== ("v1"));
5624 // A comparator that uses only the first three bytes
// Keys shorter than three bytes are compared in full. Keys that share their
// first three bytes compare as equal.
5625 class ThreeBytewiseComparator
: public Comparator
{
5627 ThreeBytewiseComparator() {}
5628 const char* Name() const override
{ return "test.ThreeBytewiseComparator"; }
// Orders a and b by their prefixes of at most three bytes.
5629 int Compare(const Slice
& a
, const Slice
& b
) const override
{
5630 Slice na
= Slice(a
.data(), a
.size() < 3 ? a
.size() : 3);
5631 Slice nb
= Slice(b
.data(), b
.size() < 3 ? b
.size() : 3);
5632 return na
.compare(nb
);
// Builds the same truncated prefixes as Compare().
// NOTE(review): the return statement is on a line not visible in this excerpt.
5634 bool Equal(const Slice
& a
, const Slice
& b
) const override
{
5635 Slice na
= Slice(a
.data(), a
.size() < 3 ? a
.size() : 3);
5636 Slice nb
= Slice(b
.data(), b
.size() < 3 ? b
.size() : 3);
5639 // These methods below don't seem relevant to this test. Implement them if
5640 // proven otherwise.
// Both simply forward to BytewiseComparator().
5641 void FindShortestSeparator(std::string
* start
,
5642 const Slice
& limit
) const override
{
5643 const Comparator
* bytewise_comp
= BytewiseComparator();
5644 bytewise_comp
->FindShortestSeparator(start
, limit
);
5646 void FindShortSuccessor(std::string
* key
) const override
{
5647 const Comparator
* bytewise_comp
= BytewiseComparator();
5648 bytewise_comp
->FindShortSuccessor(key
);
5652 #ifndef ROCKSDB_VALGRIND_RUN
// Runs a committing thread and a reading thread against the same key. Each
// transaction writes "overridedvalue" and then "value" to "key" before
// preparing and committing; the reader expects to see only "value".
// NOTE(review): the reader's loop, `ropt`, the use of `finish` and the join of
// read_thread are on lines not visible in this excerpt.
5653 TEST_P(TransactionTest
, GetWithoutSnapshot
) {
5654 WriteOptions write_options
;
5655 std::atomic
<bool> finish
= {false};
5656 ASSERT_OK(db
->Put(write_options
, "key", "value"));
// Writer: 100 transactions, each ending with "key" -> "value".
5657 ROCKSDB_NAMESPACE::port::Thread
commit_thread([&]() {
5658 for (int i
= 0; i
< 100; i
++) {
5659 TransactionOptions txn_options
;
5660 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
5661 ASSERT_OK(txn
->SetName("xid"));
5662 ASSERT_OK(txn
->Put("key", "overridedvalue"));
5663 ASSERT_OK(txn
->Put("key", "value"));
5664 ASSERT_OK(txn
->Prepare());
5665 ASSERT_OK(txn
->Commit());
// Reader: the intermediate "overridedvalue" must never be observed.
5670 ROCKSDB_NAMESPACE::port::Thread
read_thread([&]() {
5673 PinnableSlice pinnable_val
;
5674 ASSERT_OK(db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
));
5675 ASSERT_TRUE(pinnable_val
== ("value"));
5678 commit_thread
.join();
5681 #endif // ROCKSDB_VALGRIND_RUN
5683 // Test that the transactional db can handle duplicate keys in the write batch
// Scenarios visible below: a plain write batch with repeated keys; a column
// family using ThreeBytewiseComparator, where keys sharing their first three
// bytes count as the same key; transactions over the prepare / rollback /
// commit-time-batch combinations; max_successive_merges > 0; rollback to a
// save point; and recovery of prepared transactions after a simulated crash.
// NOTE(review): `batch`, `ropt`, `s`, `value`, `txn0` (in the recovery part),
// the scope braces and several status checks are on lines not visible in this
// excerpt.
5684 TEST_P(TransactionTest
, DuplicateKeys
) {
5685 ColumnFamilyOptions cf_options
;
5686 std::string cf_name
= "two";
5687 ColumnFamilyHandle
* cf_handle
= nullptr;
5689 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
5690 WriteOptions write_options
;
5692 ASSERT_OK(batch
.Put(Slice("key"), Slice("value")));
5693 ASSERT_OK(batch
.Put(Slice("key2"), Slice("value2")));
5694 // duplicate the keys
5695 ASSERT_OK(batch
.Put(Slice("key"), Slice("value3")));
5696 // duplicate the 2nd key. It should not be counted duplicate since a
5697 // sub-patch is cut after the last duplicate.
5698 ASSERT_OK(batch
.Put(Slice("key2"), Slice("value4")));
5699 // duplicate the keys but in a different cf. It should not be counted as
5701 ASSERT_OK(batch
.Put(cf_handle
, Slice("key"), Slice("value5")));
5703 ASSERT_OK(db
->Write(write_options
, &batch
));
// The last value written for each key must win.
5706 PinnableSlice pinnable_val
;
5707 auto s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key", &pinnable_val
);
5709 ASSERT_TRUE(pinnable_val
== ("value3"));
5710 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "key2", &pinnable_val
);
5712 ASSERT_TRUE(pinnable_val
== ("value4"));
5713 s
= db
->Get(ropt
, cf_handle
, "key", &pinnable_val
);
5715 ASSERT_TRUE(pinnable_val
== ("value5"));
5720 // Test with non-bytewise comparator
5722 ASSERT_OK(ReOpen());
5723 std::unique_ptr
<const Comparator
> comp_gc(new ThreeBytewiseComparator());
5724 cf_options
.comparator
= comp_gc
.get();
5725 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
5726 WriteOptions write_options
;
5728 ASSERT_OK(batch
.Put(cf_handle
, Slice("key"), Slice("value")));
5729 // The first three bytes are the same, so it must be counted as duplicate
5730 ASSERT_OK(batch
.Put(cf_handle
, Slice("key2"), Slice("value2")));
5731 // check for 2nd duplicate key in cf with non-default comparator
5732 ASSERT_OK(batch
.Put(cf_handle
, Slice("key2b"), Slice("value2b")));
5733 ASSERT_OK(db
->Write(write_options
, &batch
));
5735 // The value must be the most recent value for all the keys equal to "key",
5738 PinnableSlice pinnable_val
;
5739 ASSERT_OK(db
->Get(ropt
, cf_handle
, "key", &pinnable_val
));
5740 ASSERT_TRUE(pinnable_val
== ("value2b"));
5742 // Test duplicate keys with rollback
5743 TransactionOptions txn_options
;
5744 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
5745 ASSERT_OK(txn0
->SetName("xid"));
5746 ASSERT_OK(txn0
->Put(cf_handle
, Slice("key3"), Slice("value3")));
5747 ASSERT_OK(txn0
->Merge(cf_handle
, Slice("key4"), Slice("value4")));
5748 ASSERT_OK(txn0
->Rollback());
// "key5" shares the three-byte prefix "key", so under this comparator it reads
// the same entry; the rolled-back writes must not have changed it.
5749 ASSERT_OK(db
->Get(ropt
, cf_handle
, "key5", &pinnable_val
));
5750 ASSERT_TRUE(pinnable_val
== ("value2b"));
// Back to the bytewise comparator for the remaining scenarios.
5754 cf_options
.comparator
= BytewiseComparator();
// NOTE(review): the bodies of the two `if` checks below are not visible here;
// presumably they skip combinations that are not applicable.
5757 for (bool do_prepare
: {true, false}) {
5758 for (bool do_rollback
: {true, false}) {
5759 for (bool with_commit_batch
: {true, false}) {
5760 if (with_commit_batch
&& !do_prepare
) {
5763 if (with_commit_batch
&& do_rollback
) {
5766 ASSERT_OK(ReOpen());
5767 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
5768 TransactionOptions txn_options
;
5769 txn_options
.use_only_the_last_commit_time_batch_for_recovery
= false;
5770 WriteOptions write_options
;
5771 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
5772 auto s
= txn0
->SetName("xid");
5774 s
= txn0
->Put(Slice("foo0"), Slice("bar0a"));
5776 s
= txn0
->Put(Slice("foo0"), Slice("bar0b"));
5778 s
= txn0
->Put(Slice("foo1"), Slice("bar1"));
5780 s
= txn0
->Merge(Slice("foo2"), Slice("bar2a"));
5782 // Repeat a key after the start of a sub-patch. This should not cause a
5783 // duplicate in the most recent sub-patch and hence not creating a new
5785 s
= txn0
->Put(Slice("foo0"), Slice("bar0c"));
5787 s
= txn0
->Merge(Slice("foo2"), Slice("bar2b"));
5789 // duplicate the keys but in a different cf. It should not be counted as
5791 s
= txn0
->Put(cf_handle
, Slice("foo0"), Slice("bar0-cf1"));
5793 s
= txn0
->Put(Slice("foo3"), Slice("bar3"));
5795 s
= txn0
->Merge(Slice("foo3"), Slice("bar3"));
5797 s
= txn0
->Put(Slice("foo4"), Slice("bar4"));
5799 s
= txn0
->Delete(Slice("foo4"));
5801 s
= txn0
->SingleDelete(Slice("foo4"));
5804 s
= txn0
->Prepare();
5808 // Test rolling back the batch with duplicates
5809 s
= txn0
->Rollback();
5812 if (with_commit_batch
) {
5814 auto cb
= txn0
->GetCommitTimeWriteBatch();
5815 // duplicate a key in the original batch
5816 // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
5817 // conflicting with the prepared batch is currently undefined and
5818 // gives different results in different implementations.
5820 // s = cb->Put(Slice("foo0"), Slice("bar0d"));
5822 // add a new duplicate key
5823 s
= cb
->Put(Slice("foo6"), Slice("bar6a"));
5825 s
= cb
->Put(Slice("foo6"), Slice("bar6b"));
5827 // add a duplicate key that is removed in the same batch
5828 s
= cb
->Put(Slice("foo7"), Slice("bar7a"));
5830 s
= cb
->Delete(Slice("foo7"));
5838 PinnableSlice pinnable_val
;
// First group of checks: none of the transaction's keys may be visible.
// NOTE(review): presumably the rollback case; the branch that selects between
// this group and the next is not visible here.
5841 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
5842 ASSERT_TRUE(s
.IsNotFound());
5843 s
= db
->Get(ropt
, cf_handle
, "foo0", &pinnable_val
);
5844 ASSERT_TRUE(s
.IsNotFound());
5845 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo1", &pinnable_val
);
5846 ASSERT_TRUE(s
.IsNotFound());
5847 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo2", &pinnable_val
);
5848 ASSERT_TRUE(s
.IsNotFound());
5849 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo3", &pinnable_val
);
5850 ASSERT_TRUE(s
.IsNotFound());
5851 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo4", &pinnable_val
);
5852 ASSERT_TRUE(s
.IsNotFound());
// Second group of checks: every key holds the result of its last operation,
// with merge operands joined by ','.
5854 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
5856 ASSERT_TRUE(pinnable_val
== ("bar0c"));
5857 s
= db
->Get(ropt
, cf_handle
, "foo0", &pinnable_val
);
5859 ASSERT_TRUE(pinnable_val
== ("bar0-cf1"));
5860 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo1", &pinnable_val
);
5862 ASSERT_TRUE(pinnable_val
== ("bar1"));
5863 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo2", &pinnable_val
);
5865 ASSERT_TRUE(pinnable_val
== ("bar2a,bar2b"));
5866 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo3", &pinnable_val
);
5868 ASSERT_TRUE(pinnable_val
== ("bar3,bar3"));
5869 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo4", &pinnable_val
);
5870 ASSERT_TRUE(s
.IsNotFound());
5871 if (with_commit_batch
) {
5872 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo6", &pinnable_val
);
5874 ASSERT_TRUE(pinnable_val
== ("bar6b"));
5875 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo7", &pinnable_val
);
5876 ASSERT_TRUE(s
.IsNotFound());
5880 } // with_commit_batch
5884 if (!options
.unordered_write
) {
5885 // Also test with max_successive_merges > 0. max_successive_merges will not
5886 // affect our algorithm for duplicate key insertion but we add the test to
5888 cf_options
.max_successive_merges
= 2;
5889 cf_options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
5890 ASSERT_OK(ReOpen());
// NOTE(review): unlike the other CreateColumnFamily calls in this test, the
// returned status is not checked here -- consider wrapping in ASSERT_OK.
5891 db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
);
5892 WriteOptions write_options
;
5893 // Ensure one value for the key
5894 ASSERT_OK(db
->Put(write_options
, cf_handle
, Slice("key"), Slice("value")));
5896 // Merge more than max_successive_merges times
5897 ASSERT_OK(batch
.Merge(cf_handle
, Slice("key"), Slice("1")));
5898 ASSERT_OK(batch
.Merge(cf_handle
, Slice("key"), Slice("2")));
5899 ASSERT_OK(batch
.Merge(cf_handle
, Slice("key"), Slice("3")));
5900 ASSERT_OK(batch
.Merge(cf_handle
, Slice("key"), Slice("4")));
5901 ASSERT_OK(db
->Write(write_options
, &batch
));
5902 ReadOptions read_options
;
// All four operands must be appended to the initial value, in order.
5904 ASSERT_OK(db
->Get(read_options
, cf_handle
, "key", &value
));
5905 ASSERT_EQ(value
, "value,1,2,3,4");
5910 // Test that the duplicate detection is not compromised after rolling back
5912 TransactionOptions txn_options
;
5913 WriteOptions write_options
;
5914 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
5915 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0a")));
5916 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0b")));
5917 txn0
->SetSavePoint();
5918 ASSERT_OK(txn0
->RollbackToSavePoint());
5919 ASSERT_OK(txn0
->Commit());
5923 // Test successful recovery after a crash
5925 ASSERT_OK(ReOpen());
5926 TransactionOptions txn_options
;
5927 WriteOptions write_options
;
5930 PinnableSlice pinnable_val
;
// The second column family uses ThreeBytewiseComparator plus a string-append
// merge operator, so keys sharing their first three bytes collide in it.
5933 std::unique_ptr
<const Comparator
> comp_gc(new ThreeBytewiseComparator());
5934 cf_options
.comparator
= comp_gc
.get();
5935 cf_options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
5936 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
5938 std::vector
<ColumnFamilyDescriptor
> cfds
{
5939 ColumnFamilyDescriptor(kDefaultColumnFamilyName
,
5940 ColumnFamilyOptions(options
)),
5941 ColumnFamilyDescriptor(cf_name
, cf_options
),
5943 std::vector
<ColumnFamilyHandle
*> handles
;
5944 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
// Seed both column families; handles[1] is the CF described by cf_name above.
5946 ASSERT_OK(db
->Put(write_options
, "foo0", "init"));
5947 ASSERT_OK(db
->Put(write_options
, "foo1", "init"));
5948 ASSERT_OK(db
->Put(write_options
, handles
[1], "foo0", "init"));
5949 ASSERT_OK(db
->Put(write_options
, handles
[1], "foo1", "init"));
// Every recovery scenario below follows the same steps: prepare a named
// transaction, flush the WAL, crash via TEST_Crash(), reopen, look the
// transaction up by name and commit it.
5952 txn0
= db
->BeginTransaction(write_options
, txn_options
);
5953 ASSERT_OK(txn0
->SetName("xid"));
5954 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0a")));
5955 ASSERT_OK(txn0
->Prepare());
5957 // This will check the asserts inside recovery code
5958 ASSERT_OK(db
->FlushWAL(true));
5959 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
5960 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
5961 txn0
= db
->GetTransactionByName("xid");
5962 ASSERT_TRUE(txn0
!= nullptr);
5963 ASSERT_OK(txn0
->Commit());
5965 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
5967 ASSERT_TRUE(pinnable_val
== ("bar0a"));
5969 // two entries, no duplicate
5970 txn0
= db
->BeginTransaction(write_options
, txn_options
);
5971 ASSERT_OK(txn0
->SetName("xid"));
5972 ASSERT_OK(txn0
->Put(handles
[1], Slice("foo0"), Slice("bar0b")));
5973 ASSERT_OK(txn0
->Put(handles
[1], Slice("fol1"), Slice("bar1b")));
5974 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0b")));
5975 ASSERT_OK(txn0
->Put(Slice("foo1"), Slice("bar1b")));
5976 ASSERT_OK(txn0
->Prepare());
5978 // This will check the asserts inside recovery code
5979 ASSERT_OK(db
->FlushWAL(true));
// The second column family's memtable is also flushed before the crash.
5981 ASSERT_OK(static_cast_with_check
<DBImpl
>(db
->GetRootDB())
5982 ->TEST_FlushMemTable(true, false, handles
[1]));
5983 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
5984 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
5985 txn0
= db
->GetTransactionByName("xid");
5986 ASSERT_TRUE(txn0
!= nullptr);
5987 ASSERT_OK(txn0
->Commit());
5989 pinnable_val
.Reset();
5990 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
5992 ASSERT_TRUE(pinnable_val
== ("bar0b"));
5993 pinnable_val
.Reset();
5994 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo1", &pinnable_val
);
5996 ASSERT_TRUE(pinnable_val
== ("bar1b"));
5997 pinnable_val
.Reset();
5998 s
= db
->Get(ropt
, handles
[1], "foo0", &pinnable_val
);
6000 ASSERT_TRUE(pinnable_val
== ("bar0b"));
6001 pinnable_val
.Reset();
6002 s
= db
->Get(ropt
, handles
[1], "fol1", &pinnable_val
);
6004 ASSERT_TRUE(pinnable_val
== ("bar1b"));
6006 // one duplicate with ::Put
// In handles[1] the "key-nonkey*" keys share the prefix "key" and therefore
// compare equal under the three-byte comparator.
6007 txn0
= db
->BeginTransaction(write_options
, txn_options
);
6008 ASSERT_OK(txn0
->SetName("xid"));
6009 ASSERT_OK(txn0
->Put(handles
[1], Slice("key-nonkey0"), Slice("bar0c")));
6010 ASSERT_OK(txn0
->Put(handles
[1], Slice("key-nonkey1"), Slice("bar1d")));
6011 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0c")));
6012 ASSERT_OK(txn0
->Put(Slice("foo1"), Slice("bar1c")));
6013 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0d")));
6014 ASSERT_OK(txn0
->Prepare());
6016 // This will check the asserts inside recovery code
6017 ASSERT_OK(db
->FlushWAL(true));
6019 ASSERT_OK(static_cast_with_check
<DBImpl
>(db
->GetRootDB())
6020 ->TEST_FlushMemTable(true, false, handles
[1]));
6021 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
6022 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
6023 txn0
= db
->GetTransactionByName("xid");
6024 ASSERT_TRUE(txn0
!= nullptr);
6025 ASSERT_OK(txn0
->Commit());
6027 pinnable_val
.Reset();
6028 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
6030 ASSERT_TRUE(pinnable_val
== ("bar0d"));
6031 pinnable_val
.Reset();
6032 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo1", &pinnable_val
);
6034 ASSERT_TRUE(pinnable_val
== ("bar1c"));
6035 pinnable_val
.Reset();
// "key-nonkey2" was never written literally; it resolves through the shared
// three-byte prefix to the most recent "key-nonkey*" write.
6036 s
= db
->Get(ropt
, handles
[1], "key-nonkey2", &pinnable_val
);
6038 ASSERT_TRUE(pinnable_val
== ("bar1d"));
6040 // Duplicate with ::Put, ::Delete
6041 txn0
= db
->BeginTransaction(write_options
, txn_options
);
6042 ASSERT_OK(txn0
->SetName("xid"));
6043 ASSERT_OK(txn0
->Put(handles
[1], Slice("key-nonkey0"), Slice("bar0e")));
6044 ASSERT_OK(txn0
->Delete(handles
[1], Slice("key-nonkey1")));
6045 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0e")));
6046 ASSERT_OK(txn0
->Delete(Slice("foo0")));
6047 ASSERT_OK(txn0
->Prepare());
6049 // This will check the asserts inside recovery code
6050 ASSERT_OK(db
->FlushWAL(true));
6052 ASSERT_OK(static_cast_with_check
<DBImpl
>(db
->GetRootDB())
6053 ->TEST_FlushMemTable(true, false, handles
[1]));
6054 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
6055 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
6056 txn0
= db
->GetTransactionByName("xid");
6057 ASSERT_TRUE(txn0
!= nullptr);
6058 ASSERT_OK(txn0
->Commit());
// The Delete came last in both column families, so both reads must miss.
6060 pinnable_val
.Reset();
6061 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
6062 ASSERT_TRUE(s
.IsNotFound());
6063 pinnable_val
.Reset();
6064 s
= db
->Get(ropt
, handles
[1], "key-nonkey2", &pinnable_val
);
6065 ASSERT_TRUE(s
.IsNotFound());
6067 // Duplicate with ::Put, ::SingleDelete
6068 txn0
= db
->BeginTransaction(write_options
, txn_options
);
6069 ASSERT_OK(txn0
->SetName("xid"));
6070 ASSERT_OK(txn0
->Put(handles
[1], Slice("key-nonkey0"), Slice("bar0g")));
6071 ASSERT_OK(txn0
->SingleDelete(handles
[1], Slice("key-nonkey1")));
6072 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0e")));
6073 ASSERT_OK(txn0
->SingleDelete(Slice("foo0")));
6074 ASSERT_OK(txn0
->Prepare());
6076 // This will check the asserts inside recovery code
6077 ASSERT_OK(db
->FlushWAL(true));
6079 ASSERT_OK(static_cast_with_check
<DBImpl
>(db
->GetRootDB())
6080 ->TEST_FlushMemTable(true, false, handles
[1]));
6081 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
6082 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
6083 txn0
= db
->GetTransactionByName("xid");
6084 ASSERT_TRUE(txn0
!= nullptr);
6085 ASSERT_OK(txn0
->Commit());
6087 pinnable_val
.Reset();
6088 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
6089 ASSERT_TRUE(s
.IsNotFound());
6090 pinnable_val
.Reset();
6091 s
= db
->Get(ropt
, handles
[1], "key-nonkey2", &pinnable_val
);
6092 ASSERT_TRUE(s
.IsNotFound());
6094 // Duplicate with ::Put, ::Merge
6095 txn0
= db
->BeginTransaction(write_options
, txn_options
);
6096 ASSERT_OK(txn0
->SetName("xid"));
6097 ASSERT_OK(txn0
->Put(handles
[1], Slice("key-nonkey0"), Slice("bar1i")));
6098 ASSERT_OK(txn0
->Merge(handles
[1], Slice("key-nonkey1"), Slice("bar1j")));
6099 ASSERT_OK(txn0
->Put(Slice("foo0"), Slice("bar0f")));
6100 ASSERT_OK(txn0
->Merge(Slice("foo0"), Slice("bar0g")));
6101 ASSERT_OK(txn0
->Prepare());
6103 // This will check the asserts inside recovery code
6104 ASSERT_OK(db
->FlushWAL(true));
6106 ASSERT_OK(static_cast_with_check
<DBImpl
>(db
->GetRootDB())
6107 ->TEST_FlushMemTable(true, false, handles
[1]));
6108 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
6109 ASSERT_OK(ReOpenNoDelete(cfds
, &handles
));
6110 txn0
= db
->GetTransactionByName("xid");
6111 ASSERT_TRUE(txn0
!= nullptr);
6112 ASSERT_OK(txn0
->Commit());
// The merge operand must be appended to the value put in the same transaction.
6114 pinnable_val
.Reset();
6115 s
= db
->Get(ropt
, db
->DefaultColumnFamily(), "foo0", &pinnable_val
);
6117 ASSERT_TRUE(pinnable_val
== ("bar0f,bar0g"));
6118 pinnable_val
.Reset();
6119 s
= db
->Get(ropt
, handles
[1], "key-nonkey2", &pinnable_val
);
6121 ASSERT_TRUE(pinnable_val
== ("bar1i,bar1j"));
// NOTE(review): the loop body is not visible here; presumably it releases each
// handle.
6123 for (auto h
: handles
) {
6131 // Test that the reseek optimization in iterators will not result in an infinite
6132 // loop if there are too many uncommitted entries before the snapshot.
// Setup: one key is written directly through the DB, then a single key is
// written 2 * max_skip times inside a transaction that is prepared but not
// committed, then another key is written directly through the DB. Iterating
// over the DB must keep reporting an OK status.
6133 TEST_P(TransactionTest
, ReseekOptimization
) {
// Synchronous writes with the WAL enabled.
6134 WriteOptions write_options
;
6135 write_options
.sync
= true;
6136 write_options
.disableWAL
= false;
// Read max_sequential_skip_in_iterations from the default column family's
// descriptor; it sizes both the write loop and the skip limit below.
6137 ColumnFamilyDescriptor cfd
;
6138 ASSERT_OK(db
->DefaultColumnFamily()->GetDescriptor(&cfd
));
6139 auto max_skip
= cfd
.options
.max_sequential_skip_in_iterations
;
// Write "foo0" directly through the DB, outside of txn0.
6141 ASSERT_OK(db
->Put(write_options
, Slice("foo0"), Slice("initv")));
// Begin a named transaction that will hold the repeated writes to "foo1".
6143 TransactionOptions txn_options
;
6144 Transaction
* txn0
= db
->BeginTransaction(write_options
, txn_options
);
6145 ASSERT_OK(txn0
->SetName("xid"));
6146 // Duplicate keys will result in separate sequence numbers in WritePrepared
6147 // and WriteUnPrepared
6148 for (size_t i
= 0; i
< 2 * max_skip
; i
++) {
6149 ASSERT_OK(txn0
->Put(Slice("foo1"), Slice("bar")));
// Prepare txn0 (it is rolled back at the end, never committed), then write
// "foo2" directly through the DB.
6151 ASSERT_OK(txn0
->Prepare());
6152 ASSERT_OK(db
->Put(write_options
, Slice("foo2"), Slice("initv")));
// Set max_skippable_internal_keys to 10 * max_skip before creating the
// iterator. NOTE(review): presumably this bounds how many internal keys the
// iterator may skip so a regression fails instead of hanging - confirm.
6154 ReadOptions read_options
;
6156 read_options
.max_skippable_internal_keys
= 10 * max_skip
;
6157 Iterator
* iter
= db
->NewIterator(read_options
);
6158 ASSERT_OK(iter
->status());
// First pass: start from the first entry and check the iterator status while
// it remains valid.
6160 iter
->SeekToFirst();
6161 while (iter
->Valid()) {
6163 ASSERT_OK(iter
->status());
// Second pass over the iterator, again checking its status while valid.
6169 while (iter
->Valid()) {
6171 ASSERT_OK(iter
->status());
// Discard the prepared transaction.
6176 ASSERT_OK(txn0
->Rollback());
6180 // After recovery in kPointInTimeRecovery mode, the corrupted log file remains
6181 // there. The new log files should still be read successfully during recovery of
// Scenario: prepare a transaction, write more data, crash, corrupt two bytes of
// the WAL file, reopen, optionally write again, reopen a second time, and then
// verify that the prepared transaction "xid" can still be found and committed.
6183 TEST_P(TransactionTest
, DoubleCrashInRecovery
) {
// Run the scenario for every combination of manual_wal_flush and
// write_after_recovery.
6184 for (const bool manual_wal_flush
: {false, true}) {
6185 for (const bool write_after_recovery
: {false, true}) {
// Reopen the DB in point-in-time WAL recovery mode with the manual_wal_flush
// setting under test.
6186 options
.wal_recovery_mode
= WALRecoveryMode::kPointInTimeRecovery
;
6187 options
.manual_wal_flush
= manual_wal_flush
;
6188 ASSERT_OK(ReOpen());
// Create a second column family named "two".
6189 std::string cf_name
= "two";
6190 ColumnFamilyOptions cf_options
;
6191 ColumnFamilyHandle
* cf_handle
= nullptr;
6192 ASSERT_OK(db
->CreateColumnFamily(cf_options
, cf_name
, &cf_handle
));
6194 // Add a prepare entry to prevent the older logs from being deleted.
6195 WriteOptions write_options
;
6196 TransactionOptions txn_options
;
6197 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
6198 ASSERT_OK(txn
->SetName("xid"));
6199 ASSERT_OK(txn
->Put(Slice("foo-prepare"), Slice("bar-prepare")));
6200 ASSERT_OK(txn
->Prepare());
// Flush without naming a column family.
6202 FlushOptions flush_ops
;
6203 ASSERT_OK(db
->Flush(flush_ops
));
6204 // Now we have a log that cannot be deleted
6206 ASSERT_OK(db
->Put(write_options
, cf_handle
, "foo1", "bar1"));
6207 // Flush only the 2nd cf
6208 ASSERT_OK(db
->Flush(flush_ops
, cf_handle
));
6210 // The value is large enough to be touched by the corruption we inject
// large_value is 400 space characters.
6212 std::string
large_value(400, ' ');
6213 // key/value not touched by corruption
6214 ASSERT_OK(db
->Put(write_options
, "foo2", "bar2"));
6215 // key/value touched by corruption
6216 ASSERT_OK(db
->Put(write_options
, "foo3", large_value
));
6217 // key/value not touched by corruption
6218 ASSERT_OK(db
->Put(write_options
, "foo4", "bar4"));
// Flush the WAL, resolve the path of the log file reported by
// TEST_LogfileNumber(), and only then simulate a crash.
6220 ASSERT_OK(db
->FlushWAL(true));
6221 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
->GetRootDB());
6222 uint64_t wal_file_id
= db_impl
->TEST_LogfileNumber();
6223 std::string fname
= LogFileName(dbname
, wal_file_id
);
6224 reinterpret_cast<PessimisticTransactionDB
*>(db
)->TEST_Crash();
6230 // Corrupt the last log file in the middle, so that it is not corrupted
// Read the whole log, overwrite the bytes at offsets 400 and 401, then delete
// the file and write the modified contents back under the same name.
// NOTE(review): the trailing `true` passed to WriteStringToFile is presumably
// its sync flag - confirm against the helper's signature.
6232 std::string file_content
;
6233 ASSERT_OK(ReadFileToString(env
, fname
, &file_content
));
6234 file_content
[400] = 'h';
6235 file_content
[401] = 'a';
6236 ASSERT_OK(env
->DeleteFile(fname
));
6237 ASSERT_OK(WriteStringToFile(env
, file_content
, fname
, true));
6239 // Recover from corruption
// Reopen with both column families: the default one and "two".
6240 std::vector
<ColumnFamilyHandle
*> handles
;
6241 std::vector
<ColumnFamilyDescriptor
> column_families
;
6242 column_families
.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName
,
6243 ColumnFamilyOptions()));
6244 column_families
.push_back(
6245 ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
6246 ASSERT_OK(ReOpenNoDelete(column_families
, &handles
));
6248 if (write_after_recovery
) {
6249 // Write data to the log right after the corrupted log
6250 ASSERT_OK(db
->Put(write_options
, "foo5", large_value
));
6253 // Persist data written to WAL during recovery or by the last Put
6254 ASSERT_OK(db
->FlushWAL(true));
6255 // 2nd crash to recover while having a valid log after the corrupted one.
6256 ASSERT_OK(ReOpenNoDelete(column_families
, &handles
));
// Plain assert (not a gtest assertion) that the reopened db is non-null.
6257 assert(db
!= nullptr);
// The prepared transaction "xid" must be found again after the second reopen
// and must commit successfully.
6258 txn
= db
->GetTransactionByName("xid");
6259 ASSERT_TRUE(txn
!= nullptr);
6260 ASSERT_OK(txn
->Commit());
// Iterate over the column family handles returned by the last reopen.
6262 for (auto handle
: handles
) {
// Checks how Commit() on a freshly begun transaction that was never prepared
// depends on TransactionOptions::skip_prepare: with skip_prepare = false the
// returned status must satisfy IsTxnNotPrepared(); with skip_prepare = true the
// commit must succeed.
6269 TEST_P(TransactionTest
, CommitWithoutPrepare
) {
6271 // skip_prepare = false.
6272 WriteOptions write_options
;
6273 TransactionOptions txn_options
;
6274 txn_options
.skip_prepare
= false;
6275 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
// Commit without a preceding Prepare() must report "transaction not prepared".
6276 ASSERT_TRUE(txn
->Commit().IsTxnNotPrepared());
6281 // skip_prepare = true.
6282 WriteOptions write_options
;
6283 TransactionOptions txn_options
;
6284 txn_options
.skip_prepare
= true;
6285 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
// With skip_prepare set, committing without Prepare() is expected to succeed.
6286 ASSERT_OK(txn
->Commit());
6291 } // namespace ROCKSDB_NAMESPACE
// Test binary entry point: hands the command-line arguments to GoogleTest and
// returns the result of running all registered tests.
6293 int main(int argc
, char** argv
) {
6294 ::testing::InitGoogleTest(&argc
, argv
);
6295 return RUN_ALL_TESTS();
// Alternative entry point that ignores its arguments; its message reports that
// the tests are skipped because transactions are not supported in ROCKSDB_LITE.
6301 int main(int /*argc*/, char** /*argv*/) {
6303 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
6307 #endif // ROCKSDB_LITE