]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_unprepared_transaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
10 #endif
11
12 #include "utilities/transactions/transaction_test.h"
13 #include "utilities/transactions/write_unprepared_txn.h"
14 #include "utilities/transactions/write_unprepared_txn_db.h"
15
16 namespace rocksdb {
17
18 class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
19 public:
20 WriteUnpreparedTransactionTestBase(bool use_stackable_db,
21 bool two_write_queue,
22 TxnDBWritePolicy write_policy)
23 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}
24 };
25
26 class WriteUnpreparedTransactionTest
27 : public WriteUnpreparedTransactionTestBase,
28 virtual public ::testing::WithParamInterface<
29 std::tuple<bool, bool, TxnDBWritePolicy>> {
30 public:
31 WriteUnpreparedTransactionTest()
32 : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
33 std::get<1>(GetParam()),
34 std::get<2>(GetParam())){}
35 };
36
37 INSTANTIATE_TEST_CASE_P(
38 WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
39 ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
40 std::make_tuple(false, true, WRITE_UNPREPARED)));
41
42 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
43 auto verify_state = [](Iterator* iter, const std::string& key,
44 const std::string& value) {
45 ASSERT_TRUE(iter->Valid());
46 ASSERT_OK(iter->status());
47 ASSERT_EQ(key, iter->key().ToString());
48 ASSERT_EQ(value, iter->value().ToString());
49 };
50
51 options.disable_auto_compactions = true;
52 ReOpen();
53
54 // The following tests checks whether reading your own write for
55 // a transaction works for write unprepared, when there are uncommitted
56 // values written into DB.
57 //
58 // Although the values written by DB::Put are technically committed, we add
59 // their seq num to unprep_seqs_ to pretend that they were written into DB
60 // as part of an unprepared batch, and then check if they are visible to the
61 // transaction.
62 auto snapshot0 = db->GetSnapshot();
63 ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
64 ASSERT_OK(db->Put(WriteOptions(), "b", "v2"));
65 auto snapshot2 = db->GetSnapshot();
66 ASSERT_OK(db->Put(WriteOptions(), "a", "v3"));
67 ASSERT_OK(db->Put(WriteOptions(), "b", "v4"));
68 auto snapshot4 = db->GetSnapshot();
69 ASSERT_OK(db->Put(WriteOptions(), "a", "v5"));
70 ASSERT_OK(db->Put(WriteOptions(), "b", "v6"));
71 auto snapshot6 = db->GetSnapshot();
72 ASSERT_OK(db->Put(WriteOptions(), "a", "v7"));
73 ASSERT_OK(db->Put(WriteOptions(), "b", "v8"));
74 auto snapshot8 = db->GetSnapshot();
75
76 TransactionOptions txn_options;
77 WriteOptions write_options;
78 Transaction* txn = db->BeginTransaction(write_options, txn_options);
79 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
80
81 ReadOptions roptions;
82 roptions.snapshot = snapshot0;
83
84 auto iter = txn->GetIterator(roptions);
85
86 // Test Get().
87 std::string value;
88 wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
89 snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
90
91 ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
92 ASSERT_EQ(value, "v3");
93
94 ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
95 ASSERT_EQ(value, "v4");
96
97 wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
98 snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
99
100 ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
101 ASSERT_EQ(value, "v7");
102
103 ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
104 ASSERT_EQ(value, "v8");
105
106 wup_txn->unprep_seqs_.clear();
107
108 // Test Next().
109 wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
110 snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
111
112 iter->Seek("a");
113 verify_state(iter, "a", "v3");
114
115 iter->Next();
116 verify_state(iter, "b", "v4");
117
118 iter->SeekToFirst();
119 verify_state(iter, "a", "v3");
120
121 iter->Next();
122 verify_state(iter, "b", "v4");
123
124 wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
125 snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
126
127 iter->Seek("a");
128 verify_state(iter, "a", "v7");
129
130 iter->Next();
131 verify_state(iter, "b", "v8");
132
133 iter->SeekToFirst();
134 verify_state(iter, "a", "v7");
135
136 iter->Next();
137 verify_state(iter, "b", "v8");
138
139 wup_txn->unprep_seqs_.clear();
140
141 // Test Prev(). For Prev(), we need to adjust the snapshot to match what is
142 // possible in WriteUnpreparedTxn.
143 //
144 // Because of row locks and ValidateSnapshot, there cannot be any committed
145 // entries after snapshot, but before the first prepared key.
146 delete iter;
147 roptions.snapshot = snapshot2;
148 iter = txn->GetIterator(roptions);
149 wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
150 snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
151
152 iter->SeekForPrev("b");
153 verify_state(iter, "b", "v4");
154
155 iter->Prev();
156 verify_state(iter, "a", "v3");
157
158 iter->SeekToLast();
159 verify_state(iter, "b", "v4");
160
161 iter->Prev();
162 verify_state(iter, "a", "v3");
163
164 delete iter;
165 roptions.snapshot = snapshot6;
166 iter = txn->GetIterator(roptions);
167 wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
168 snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
169
170 iter->SeekForPrev("b");
171 verify_state(iter, "b", "v8");
172
173 iter->Prev();
174 verify_state(iter, "a", "v7");
175
176 iter->SeekToLast();
177 verify_state(iter, "b", "v8");
178
179 iter->Prev();
180 verify_state(iter, "a", "v7");
181
182 // Since the unprep_seqs_ data were faked for testing, we do not want the
183 // destructor for the transaction to be rolling back data that did not
184 // exist.
185 wup_txn->unprep_seqs_.clear();
186
187 db->ReleaseSnapshot(snapshot0);
188 db->ReleaseSnapshot(snapshot2);
189 db->ReleaseSnapshot(snapshot4);
190 db->ReleaseSnapshot(snapshot6);
191 db->ReleaseSnapshot(snapshot8);
192 delete iter;
193 delete txn;
194 }
195
196 // This tests how write unprepared behaves during recovery when the DB crashes
197 // after a transaction has either been unprepared or prepared, and tests if
198 // the changes are correctly applied for prepared transactions if we decide to
199 // rollback/commit.
200 TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
201 WriteOptions write_options;
202 write_options.disableWAL = false;
203 TransactionOptions txn_options;
204 std::vector<Transaction*> prepared_trans;
205 WriteUnpreparedTxnDB* wup_db;
206 options.disable_auto_compactions = true;
207
208 enum Action { UNPREPARED, ROLLBACK, COMMIT };
209
210 // batch_size of 1 causes writes to DB for every marker.
211 for (size_t batch_size : {1, 1000000}) {
212 txn_options.max_write_batch_size = batch_size;
213 for (bool empty : {true, false}) {
214 for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
215 for (int num_batches = 1; num_batches < 10; num_batches++) {
216 // Reset database.
217 prepared_trans.clear();
218 ReOpen();
219 wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
220 if (!empty) {
221 for (int i = 0; i < num_batches; i++) {
222 ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
223 "before value" + ToString(i)));
224 }
225 }
226
227 // Write num_batches unprepared batches.
228 Transaction* txn = db->BeginTransaction(write_options, txn_options);
229 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
230 txn->SetName("xid");
231 for (int i = 0; i < num_batches; i++) {
232 ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
233 if (txn_options.max_write_batch_size == 1) {
234 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1);
235 } else {
236 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
237 }
238 }
239 if (a == UNPREPARED) {
240 // This is done to prevent the destructor from rolling back the
241 // transaction for us, since we want to pretend we crashed and
242 // test that recovery does the rollback.
243 wup_txn->unprep_seqs_.clear();
244 } else {
245 txn->Prepare();
246 }
247 delete txn;
248
249 // Crash and run recovery code paths.
250 wup_db->db_impl_->FlushWAL(true);
251 wup_db->TEST_Crash();
252 ReOpenNoDelete();
253 assert(db != nullptr);
254
255 db->GetAllPreparedTransactions(&prepared_trans);
256 ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
257 if (a == ROLLBACK) {
258 ASSERT_OK(prepared_trans[0]->Rollback());
259 delete prepared_trans[0];
260 } else if (a == COMMIT) {
261 ASSERT_OK(prepared_trans[0]->Commit());
262 delete prepared_trans[0];
263 }
264
265 Iterator* iter = db->NewIterator(ReadOptions());
266 iter->SeekToFirst();
267 // Check that DB has before values.
268 if (!empty || a == COMMIT) {
269 for (int i = 0; i < num_batches; i++) {
270 ASSERT_TRUE(iter->Valid());
271 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
272 if (a == COMMIT) {
273 ASSERT_EQ(iter->value().ToString(), "value" + ToString(i));
274 } else {
275 ASSERT_EQ(iter->value().ToString(),
276 "before value" + ToString(i));
277 }
278 iter->Next();
279 }
280 }
281 ASSERT_FALSE(iter->Valid());
282 delete iter;
283 }
284 }
285 }
286 }
287 }
288
289 // Basic test to see that unprepared batch gets written to DB when batch size
290 // is exceeded. It also does some basic checks to see if commit/rollback works
291 // as expected for write unprepared.
292 TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
293 WriteOptions write_options;
294 TransactionOptions txn_options;
295 const int kNumKeys = 10;
296
297 // batch_size of 1 causes writes to DB for every marker.
298 for (size_t batch_size : {1, 1000000}) {
299 txn_options.max_write_batch_size = batch_size;
300 for (bool prepare : {false, true}) {
301 for (bool commit : {false, true}) {
302 ReOpen();
303 Transaction* txn = db->BeginTransaction(write_options, txn_options);
304 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
305 txn->SetName("xid");
306
307 for (int i = 0; i < kNumKeys; i++) {
308 txn->Put("k" + ToString(i), "v" + ToString(i));
309 if (txn_options.max_write_batch_size == 1) {
310 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1);
311 } else {
312 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
313 }
314 }
315
316 if (prepare) {
317 ASSERT_OK(txn->Prepare());
318 }
319
320 Iterator* iter = db->NewIterator(ReadOptions());
321 iter->SeekToFirst();
322 assert(!iter->Valid());
323 ASSERT_FALSE(iter->Valid());
324 delete iter;
325
326 if (commit) {
327 ASSERT_OK(txn->Commit());
328 } else {
329 ASSERT_OK(txn->Rollback());
330 }
331 delete txn;
332
333 iter = db->NewIterator(ReadOptions());
334 iter->SeekToFirst();
335
336 for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
337 ASSERT_TRUE(iter->Valid());
338 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
339 ASSERT_EQ(iter->value().ToString(), "v" + ToString(i));
340 iter->Next();
341 }
342 ASSERT_FALSE(iter->Valid());
343 delete iter;
344 }
345 }
346 }
347 }
348
349 // Test whether logs containing unprepared/prepared batches are kept even
350 // after memtable finishes flushing, and whether they are removed when
351 // transaction commits/aborts.
352 //
353 // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
354 TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
355 WriteOptions write_options;
356 TransactionOptions txn_options;
357 // batch_size of 1 causes writes to DB for every marker.
358 txn_options.max_write_batch_size = 1;
359 const int kNumKeys = 10;
360
361 WriteOptions wopts;
362 wopts.sync = true;
363
364 for (bool prepare : {false, true}) {
365 for (bool commit : {false, true}) {
366 ReOpen();
367 auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
368 auto db_impl = wup_db->db_impl_;
369
370 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
371 ASSERT_OK(txn1->SetName("xid1"));
372
373 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
374 ASSERT_OK(txn2->SetName("xid2"));
375
376 // Spread this transaction across multiple log files.
377 for (int i = 0; i < kNumKeys; i++) {
378 ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i)));
379 if (i >= kNumKeys / 2) {
380 ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i)));
381 }
382
383 if (i > 0) {
384 db_impl->TEST_SwitchWAL();
385 }
386 }
387
388 ASSERT_GT(txn1->GetLogNumber(), 0);
389 ASSERT_GT(txn2->GetLogNumber(), 0);
390
391 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
392 txn1->GetLogNumber());
393 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
394
395 if (prepare) {
396 ASSERT_OK(txn1->Prepare());
397 ASSERT_OK(txn2->Prepare());
398 }
399
400 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
401 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
402
403 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
404 txn1->GetLogNumber());
405 if (commit) {
406 ASSERT_OK(txn1->Commit());
407 } else {
408 ASSERT_OK(txn1->Rollback());
409 }
410
411 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
412 txn2->GetLogNumber());
413
414 if (commit) {
415 ASSERT_OK(txn2->Commit());
416 } else {
417 ASSERT_OK(txn2->Rollback());
418 }
419
420 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
421
422 delete txn1;
423 delete txn2;
424 }
425 }
426 }
427
428 } // namespace rocksdb
429
430 int main(int argc, char** argv) {
431 ::testing::InitGoogleTest(&argc, argv);
432 return RUN_ALL_TESTS();
433 }
434
435 #else
436 #include <stdio.h>
437
438 int main(int /*argc*/, char** /*argv*/) {
439 fprintf(stderr,
440 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
441 return 0;
442 }
443
444 #endif // ROCKSDB_LITE