1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
12 #include "utilities/transactions/transaction_test.h"
13 #include "utilities/transactions/write_unprepared_txn.h"
14 #include "utilities/transactions/write_unprepared_txn_db.h"
18 class WriteUnpreparedTransactionTestBase
: public TransactionTestBase
{
20 WriteUnpreparedTransactionTestBase(bool use_stackable_db
,
22 TxnDBWritePolicy write_policy
)
23 : TransactionTestBase(use_stackable_db
, two_write_queue
, write_policy
){}
26 class WriteUnpreparedTransactionTest
27 : public WriteUnpreparedTransactionTestBase
,
28 virtual public ::testing::WithParamInterface
<
29 std::tuple
<bool, bool, TxnDBWritePolicy
>> {
31 WriteUnpreparedTransactionTest()
32 : WriteUnpreparedTransactionTestBase(std::get
<0>(GetParam()),
33 std::get
<1>(GetParam()),
34 std::get
<2>(GetParam())){}
37 INSTANTIATE_TEST_CASE_P(
38 WriteUnpreparedTransactionTest
, WriteUnpreparedTransactionTest
,
39 ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED
),
40 std::make_tuple(false, true, WRITE_UNPREPARED
)));
42 TEST_P(WriteUnpreparedTransactionTest
, ReadYourOwnWrite
) {
43 auto verify_state
= [](Iterator
* iter
, const std::string
& key
,
44 const std::string
& value
) {
45 ASSERT_TRUE(iter
->Valid());
46 ASSERT_OK(iter
->status());
47 ASSERT_EQ(key
, iter
->key().ToString());
48 ASSERT_EQ(value
, iter
->value().ToString());
51 options
.disable_auto_compactions
= true;
54 // The following tests checks whether reading your own write for
55 // a transaction works for write unprepared, when there are uncommitted
56 // values written into DB.
58 // Although the values written by DB::Put are technically committed, we add
59 // their seq num to unprep_seqs_ to pretend that they were written into DB
60 // as part of an unprepared batch, and then check if they are visible to the
62 auto snapshot0
= db
->GetSnapshot();
63 ASSERT_OK(db
->Put(WriteOptions(), "a", "v1"));
64 ASSERT_OK(db
->Put(WriteOptions(), "b", "v2"));
65 auto snapshot2
= db
->GetSnapshot();
66 ASSERT_OK(db
->Put(WriteOptions(), "a", "v3"));
67 ASSERT_OK(db
->Put(WriteOptions(), "b", "v4"));
68 auto snapshot4
= db
->GetSnapshot();
69 ASSERT_OK(db
->Put(WriteOptions(), "a", "v5"));
70 ASSERT_OK(db
->Put(WriteOptions(), "b", "v6"));
71 auto snapshot6
= db
->GetSnapshot();
72 ASSERT_OK(db
->Put(WriteOptions(), "a", "v7"));
73 ASSERT_OK(db
->Put(WriteOptions(), "b", "v8"));
74 auto snapshot8
= db
->GetSnapshot();
76 TransactionOptions txn_options
;
77 WriteOptions write_options
;
78 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
79 WriteUnpreparedTxn
* wup_txn
= dynamic_cast<WriteUnpreparedTxn
*>(txn
);
82 roptions
.snapshot
= snapshot0
;
84 auto iter
= txn
->GetIterator(roptions
);
88 wup_txn
->unprep_seqs_
[snapshot2
->GetSequenceNumber() + 1] =
89 snapshot4
->GetSequenceNumber() - snapshot2
->GetSequenceNumber();
91 ASSERT_OK(txn
->Get(roptions
, Slice("a"), &value
));
92 ASSERT_EQ(value
, "v3");
94 ASSERT_OK(txn
->Get(roptions
, Slice("b"), &value
));
95 ASSERT_EQ(value
, "v4");
97 wup_txn
->unprep_seqs_
[snapshot6
->GetSequenceNumber() + 1] =
98 snapshot8
->GetSequenceNumber() - snapshot6
->GetSequenceNumber();
100 ASSERT_OK(txn
->Get(roptions
, Slice("a"), &value
));
101 ASSERT_EQ(value
, "v7");
103 ASSERT_OK(txn
->Get(roptions
, Slice("b"), &value
));
104 ASSERT_EQ(value
, "v8");
106 wup_txn
->unprep_seqs_
.clear();
109 wup_txn
->unprep_seqs_
[snapshot2
->GetSequenceNumber() + 1] =
110 snapshot4
->GetSequenceNumber() - snapshot2
->GetSequenceNumber();
113 verify_state(iter
, "a", "v3");
116 verify_state(iter
, "b", "v4");
119 verify_state(iter
, "a", "v3");
122 verify_state(iter
, "b", "v4");
124 wup_txn
->unprep_seqs_
[snapshot6
->GetSequenceNumber() + 1] =
125 snapshot8
->GetSequenceNumber() - snapshot6
->GetSequenceNumber();
128 verify_state(iter
, "a", "v7");
131 verify_state(iter
, "b", "v8");
134 verify_state(iter
, "a", "v7");
137 verify_state(iter
, "b", "v8");
139 wup_txn
->unprep_seqs_
.clear();
141 // Test Prev(). For Prev(), we need to adjust the snapshot to match what is
142 // possible in WriteUnpreparedTxn.
144 // Because of row locks and ValidateSnapshot, there cannot be any committed
145 // entries after snapshot, but before the first prepared key.
147 roptions
.snapshot
= snapshot2
;
148 iter
= txn
->GetIterator(roptions
);
149 wup_txn
->unprep_seqs_
[snapshot2
->GetSequenceNumber() + 1] =
150 snapshot4
->GetSequenceNumber() - snapshot2
->GetSequenceNumber();
152 iter
->SeekForPrev("b");
153 verify_state(iter
, "b", "v4");
156 verify_state(iter
, "a", "v3");
159 verify_state(iter
, "b", "v4");
162 verify_state(iter
, "a", "v3");
165 roptions
.snapshot
= snapshot6
;
166 iter
= txn
->GetIterator(roptions
);
167 wup_txn
->unprep_seqs_
[snapshot6
->GetSequenceNumber() + 1] =
168 snapshot8
->GetSequenceNumber() - snapshot6
->GetSequenceNumber();
170 iter
->SeekForPrev("b");
171 verify_state(iter
, "b", "v8");
174 verify_state(iter
, "a", "v7");
177 verify_state(iter
, "b", "v8");
180 verify_state(iter
, "a", "v7");
182 // Since the unprep_seqs_ data were faked for testing, we do not want the
183 // destructor for the transaction to be rolling back data that did not
185 wup_txn
->unprep_seqs_
.clear();
187 db
->ReleaseSnapshot(snapshot0
);
188 db
->ReleaseSnapshot(snapshot2
);
189 db
->ReleaseSnapshot(snapshot4
);
190 db
->ReleaseSnapshot(snapshot6
);
191 db
->ReleaseSnapshot(snapshot8
);
196 // This tests how write unprepared behaves during recovery when the DB crashes
197 // after a transaction has either been unprepared or prepared, and tests if
198 // the changes are correctly applied for prepared transactions if we decide to
200 TEST_P(WriteUnpreparedTransactionTest
, RecoveryTest
) {
201 WriteOptions write_options
;
202 write_options
.disableWAL
= false;
203 TransactionOptions txn_options
;
204 std::vector
<Transaction
*> prepared_trans
;
205 WriteUnpreparedTxnDB
* wup_db
;
206 options
.disable_auto_compactions
= true;
208 enum Action
{ UNPREPARED
, ROLLBACK
, COMMIT
};
210 // batch_size of 1 causes writes to DB for every marker.
211 for (size_t batch_size
: {1, 1000000}) {
212 txn_options
.max_write_batch_size
= batch_size
;
213 for (bool empty
: {true, false}) {
214 for (Action a
: {UNPREPARED
, ROLLBACK
, COMMIT
}) {
215 for (int num_batches
= 1; num_batches
< 10; num_batches
++) {
217 prepared_trans
.clear();
219 wup_db
= dynamic_cast<WriteUnpreparedTxnDB
*>(db
);
221 for (int i
= 0; i
< num_batches
; i
++) {
222 ASSERT_OK(db
->Put(WriteOptions(), "k" + ToString(i
),
223 "before value" + ToString(i
)));
227 // Write num_batches unprepared batches.
228 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
229 WriteUnpreparedTxn
* wup_txn
= dynamic_cast<WriteUnpreparedTxn
*>(txn
);
231 for (int i
= 0; i
< num_batches
; i
++) {
232 ASSERT_OK(txn
->Put("k" + ToString(i
), "value" + ToString(i
)));
233 if (txn_options
.max_write_batch_size
== 1) {
234 ASSERT_EQ(wup_txn
->GetUnpreparedSequenceNumbers().size(), i
+ 1);
236 ASSERT_EQ(wup_txn
->GetUnpreparedSequenceNumbers().size(), 0);
239 if (a
== UNPREPARED
) {
240 // This is done to prevent the destructor from rolling back the
241 // transaction for us, since we want to pretend we crashed and
242 // test that recovery does the rollback.
243 wup_txn
->unprep_seqs_
.clear();
249 // Crash and run recovery code paths.
250 wup_db
->db_impl_
->FlushWAL(true);
251 wup_db
->TEST_Crash();
253 assert(db
!= nullptr);
255 db
->GetAllPreparedTransactions(&prepared_trans
);
256 ASSERT_EQ(prepared_trans
.size(), a
== UNPREPARED
? 0 : 1);
258 ASSERT_OK(prepared_trans
[0]->Rollback());
259 delete prepared_trans
[0];
260 } else if (a
== COMMIT
) {
261 ASSERT_OK(prepared_trans
[0]->Commit());
262 delete prepared_trans
[0];
265 Iterator
* iter
= db
->NewIterator(ReadOptions());
267 // Check that DB has before values.
268 if (!empty
|| a
== COMMIT
) {
269 for (int i
= 0; i
< num_batches
; i
++) {
270 ASSERT_TRUE(iter
->Valid());
271 ASSERT_EQ(iter
->key().ToString(), "k" + ToString(i
));
273 ASSERT_EQ(iter
->value().ToString(), "value" + ToString(i
));
275 ASSERT_EQ(iter
->value().ToString(),
276 "before value" + ToString(i
));
281 ASSERT_FALSE(iter
->Valid());
289 // Basic test to see that unprepared batch gets written to DB when batch size
290 // is exceeded. It also does some basic checks to see if commit/rollback works
291 // as expected for write unprepared.
292 TEST_P(WriteUnpreparedTransactionTest
, UnpreparedBatch
) {
293 WriteOptions write_options
;
294 TransactionOptions txn_options
;
295 const int kNumKeys
= 10;
297 // batch_size of 1 causes writes to DB for every marker.
298 for (size_t batch_size
: {1, 1000000}) {
299 txn_options
.max_write_batch_size
= batch_size
;
300 for (bool prepare
: {false, true}) {
301 for (bool commit
: {false, true}) {
303 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
304 WriteUnpreparedTxn
* wup_txn
= dynamic_cast<WriteUnpreparedTxn
*>(txn
);
307 for (int i
= 0; i
< kNumKeys
; i
++) {
308 txn
->Put("k" + ToString(i
), "v" + ToString(i
));
309 if (txn_options
.max_write_batch_size
== 1) {
310 ASSERT_EQ(wup_txn
->GetUnpreparedSequenceNumbers().size(), i
+ 1);
312 ASSERT_EQ(wup_txn
->GetUnpreparedSequenceNumbers().size(), 0);
317 ASSERT_OK(txn
->Prepare());
320 Iterator
* iter
= db
->NewIterator(ReadOptions());
322 assert(!iter
->Valid());
323 ASSERT_FALSE(iter
->Valid());
327 ASSERT_OK(txn
->Commit());
329 ASSERT_OK(txn
->Rollback());
333 iter
= db
->NewIterator(ReadOptions());
336 for (int i
= 0; i
< (commit
? kNumKeys
: 0); i
++) {
337 ASSERT_TRUE(iter
->Valid());
338 ASSERT_EQ(iter
->key().ToString(), "k" + ToString(i
));
339 ASSERT_EQ(iter
->value().ToString(), "v" + ToString(i
));
342 ASSERT_FALSE(iter
->Valid());
349 // Test whether logs containing unprepared/prepared batches are kept even
350 // after memtable finishes flushing, and whether they are removed when
351 // transaction commits/aborts.
353 // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
354 TEST_P(WriteUnpreparedTransactionTest
, MarkLogWithPrepSection
) {
355 WriteOptions write_options
;
356 TransactionOptions txn_options
;
357 // batch_size of 1 causes writes to DB for every marker.
358 txn_options
.max_write_batch_size
= 1;
359 const int kNumKeys
= 10;
364 for (bool prepare
: {false, true}) {
365 for (bool commit
: {false, true}) {
367 auto wup_db
= dynamic_cast<WriteUnpreparedTxnDB
*>(db
);
368 auto db_impl
= wup_db
->db_impl_
;
370 Transaction
* txn1
= db
->BeginTransaction(write_options
, txn_options
);
371 ASSERT_OK(txn1
->SetName("xid1"));
373 Transaction
* txn2
= db
->BeginTransaction(write_options
, txn_options
);
374 ASSERT_OK(txn2
->SetName("xid2"));
376 // Spread this transaction across multiple log files.
377 for (int i
= 0; i
< kNumKeys
; i
++) {
378 ASSERT_OK(txn1
->Put("k1" + ToString(i
), "v" + ToString(i
)));
379 if (i
>= kNumKeys
/ 2) {
380 ASSERT_OK(txn2
->Put("k2" + ToString(i
), "v" + ToString(i
)));
384 db_impl
->TEST_SwitchWAL();
388 ASSERT_GT(txn1
->GetLogNumber(), 0);
389 ASSERT_GT(txn2
->GetLogNumber(), 0);
391 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
392 txn1
->GetLogNumber());
393 ASSERT_GT(db_impl
->TEST_LogfileNumber(), txn1
->GetLogNumber());
396 ASSERT_OK(txn1
->Prepare());
397 ASSERT_OK(txn2
->Prepare());
400 ASSERT_GE(db_impl
->TEST_LogfileNumber(), txn1
->GetLogNumber());
401 ASSERT_GE(db_impl
->TEST_LogfileNumber(), txn2
->GetLogNumber());
403 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
404 txn1
->GetLogNumber());
406 ASSERT_OK(txn1
->Commit());
408 ASSERT_OK(txn1
->Rollback());
411 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(),
412 txn2
->GetLogNumber());
415 ASSERT_OK(txn2
->Commit());
417 ASSERT_OK(txn2
->Rollback());
420 ASSERT_EQ(db_impl
->TEST_FindMinLogContainingOutstandingPrep(), 0);
428 } // namespace rocksdb
430 int main(int argc
, char** argv
) {
431 ::testing::InitGoogleTest(&argc
, argv
);
432 return RUN_ALL_TESTS();
438 int main(int /*argc*/, char** /*argv*/) {
440 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
444 #endif // ROCKSDB_LITE