]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_unprepared_transaction_test.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
8#ifndef __STDC_FORMAT_MACROS
9#define __STDC_FORMAT_MACROS
10#endif
11
12#include "utilities/transactions/transaction_test.h"
13#include "utilities/transactions/write_unprepared_txn.h"
14#include "utilities/transactions/write_unprepared_txn_db.h"
15
16namespace rocksdb {
17
18class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
19 public:
20 WriteUnpreparedTransactionTestBase(bool use_stackable_db,
21 bool two_write_queue,
22 TxnDBWritePolicy write_policy)
23 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}
24};
25
26class WriteUnpreparedTransactionTest
27 : public WriteUnpreparedTransactionTestBase,
28 virtual public ::testing::WithParamInterface<
29 std::tuple<bool, bool, TxnDBWritePolicy>> {
30 public:
31 WriteUnpreparedTransactionTest()
32 : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
33 std::get<1>(GetParam()),
34 std::get<2>(GetParam())){}
35};
36
37INSTANTIATE_TEST_CASE_P(
38 WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
39 ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
40 std::make_tuple(false, true, WRITE_UNPREPARED)));
41
42TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
43 auto verify_state = [](Iterator* iter, const std::string& key,
44 const std::string& value) {
45 ASSERT_TRUE(iter->Valid());
46 ASSERT_OK(iter->status());
47 ASSERT_EQ(key, iter->key().ToString());
48 ASSERT_EQ(value, iter->value().ToString());
49 };
50
51 options.disable_auto_compactions = true;
52 ReOpen();
53
54 // The following tests checks whether reading your own write for
55 // a transaction works for write unprepared, when there are uncommitted
56 // values written into DB.
57 //
58 // Although the values written by DB::Put are technically committed, we add
59 // their seq num to unprep_seqs_ to pretend that they were written into DB
60 // as part of an unprepared batch, and then check if they are visible to the
61 // transaction.
62 auto snapshot0 = db->GetSnapshot();
63 ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
64 ASSERT_OK(db->Put(WriteOptions(), "b", "v2"));
65 auto snapshot2 = db->GetSnapshot();
66 ASSERT_OK(db->Put(WriteOptions(), "a", "v3"));
67 ASSERT_OK(db->Put(WriteOptions(), "b", "v4"));
68 auto snapshot4 = db->GetSnapshot();
69 ASSERT_OK(db->Put(WriteOptions(), "a", "v5"));
70 ASSERT_OK(db->Put(WriteOptions(), "b", "v6"));
71 auto snapshot6 = db->GetSnapshot();
72 ASSERT_OK(db->Put(WriteOptions(), "a", "v7"));
73 ASSERT_OK(db->Put(WriteOptions(), "b", "v8"));
74 auto snapshot8 = db->GetSnapshot();
75
76 TransactionOptions txn_options;
77 WriteOptions write_options;
78 Transaction* txn = db->BeginTransaction(write_options, txn_options);
79 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
80
81 ReadOptions roptions;
82 roptions.snapshot = snapshot0;
83
494da23a
TL
84 wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
85 snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
11fdf7f2
TL
86 auto iter = txn->GetIterator(roptions);
87
88 // Test Get().
89 std::string value;
11fdf7f2
TL
90
91 ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
92 ASSERT_EQ(value, "v3");
93
94 ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
95 ASSERT_EQ(value, "v4");
96
97 wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
98 snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
494da23a
TL
99 delete iter;
100 iter = txn->GetIterator(roptions);
11fdf7f2
TL
101
102 ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
103 ASSERT_EQ(value, "v7");
104
105 ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
106 ASSERT_EQ(value, "v8");
107
108 wup_txn->unprep_seqs_.clear();
109
110 // Test Next().
111 wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
112 snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
494da23a
TL
113 delete iter;
114 iter = txn->GetIterator(roptions);
11fdf7f2
TL
115
116 iter->Seek("a");
117 verify_state(iter, "a", "v3");
118
119 iter->Next();
120 verify_state(iter, "b", "v4");
121
122 iter->SeekToFirst();
123 verify_state(iter, "a", "v3");
124
125 iter->Next();
126 verify_state(iter, "b", "v4");
127
128 wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
129 snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
494da23a
TL
130 delete iter;
131 iter = txn->GetIterator(roptions);
11fdf7f2
TL
132
133 iter->Seek("a");
134 verify_state(iter, "a", "v7");
135
136 iter->Next();
137 verify_state(iter, "b", "v8");
138
139 iter->SeekToFirst();
140 verify_state(iter, "a", "v7");
141
142 iter->Next();
143 verify_state(iter, "b", "v8");
144
145 wup_txn->unprep_seqs_.clear();
146
147 // Test Prev(). For Prev(), we need to adjust the snapshot to match what is
148 // possible in WriteUnpreparedTxn.
149 //
150 // Because of row locks and ValidateSnapshot, there cannot be any committed
151 // entries after snapshot, but before the first prepared key.
11fdf7f2 152 roptions.snapshot = snapshot2;
11fdf7f2
TL
153 wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
154 snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
494da23a
TL
155 delete iter;
156 iter = txn->GetIterator(roptions);
11fdf7f2
TL
157
158 iter->SeekForPrev("b");
159 verify_state(iter, "b", "v4");
160
161 iter->Prev();
162 verify_state(iter, "a", "v3");
163
164 iter->SeekToLast();
165 verify_state(iter, "b", "v4");
166
167 iter->Prev();
168 verify_state(iter, "a", "v3");
169
11fdf7f2 170 roptions.snapshot = snapshot6;
11fdf7f2
TL
171 wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
172 snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
494da23a
TL
173 delete iter;
174 iter = txn->GetIterator(roptions);
11fdf7f2
TL
175
176 iter->SeekForPrev("b");
177 verify_state(iter, "b", "v8");
178
179 iter->Prev();
180 verify_state(iter, "a", "v7");
181
182 iter->SeekToLast();
183 verify_state(iter, "b", "v8");
184
185 iter->Prev();
186 verify_state(iter, "a", "v7");
187
188 // Since the unprep_seqs_ data were faked for testing, we do not want the
189 // destructor for the transaction to be rolling back data that did not
190 // exist.
191 wup_txn->unprep_seqs_.clear();
192
193 db->ReleaseSnapshot(snapshot0);
194 db->ReleaseSnapshot(snapshot2);
195 db->ReleaseSnapshot(snapshot4);
196 db->ReleaseSnapshot(snapshot6);
197 db->ReleaseSnapshot(snapshot8);
198 delete iter;
199 delete txn;
200}
201
202// This tests how write unprepared behaves during recovery when the DB crashes
203// after a transaction has either been unprepared or prepared, and tests if
204// the changes are correctly applied for prepared transactions if we decide to
205// rollback/commit.
206TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
207 WriteOptions write_options;
208 write_options.disableWAL = false;
209 TransactionOptions txn_options;
210 std::vector<Transaction*> prepared_trans;
211 WriteUnpreparedTxnDB* wup_db;
212 options.disable_auto_compactions = true;
213
214 enum Action { UNPREPARED, ROLLBACK, COMMIT };
215
216 // batch_size of 1 causes writes to DB for every marker.
217 for (size_t batch_size : {1, 1000000}) {
218 txn_options.max_write_batch_size = batch_size;
219 for (bool empty : {true, false}) {
220 for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
221 for (int num_batches = 1; num_batches < 10; num_batches++) {
222 // Reset database.
223 prepared_trans.clear();
224 ReOpen();
225 wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
226 if (!empty) {
227 for (int i = 0; i < num_batches; i++) {
228 ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
229 "before value" + ToString(i)));
230 }
231 }
232
233 // Write num_batches unprepared batches.
234 Transaction* txn = db->BeginTransaction(write_options, txn_options);
235 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
236 txn->SetName("xid");
237 for (int i = 0; i < num_batches; i++) {
238 ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
239 if (txn_options.max_write_batch_size == 1) {
240 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1);
241 } else {
242 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
243 }
244 }
245 if (a == UNPREPARED) {
246 // This is done to prevent the destructor from rolling back the
247 // transaction for us, since we want to pretend we crashed and
248 // test that recovery does the rollback.
249 wup_txn->unprep_seqs_.clear();
250 } else {
251 txn->Prepare();
252 }
253 delete txn;
254
255 // Crash and run recovery code paths.
256 wup_db->db_impl_->FlushWAL(true);
257 wup_db->TEST_Crash();
258 ReOpenNoDelete();
259 assert(db != nullptr);
260
261 db->GetAllPreparedTransactions(&prepared_trans);
262 ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
263 if (a == ROLLBACK) {
264 ASSERT_OK(prepared_trans[0]->Rollback());
265 delete prepared_trans[0];
266 } else if (a == COMMIT) {
267 ASSERT_OK(prepared_trans[0]->Commit());
268 delete prepared_trans[0];
269 }
270
271 Iterator* iter = db->NewIterator(ReadOptions());
272 iter->SeekToFirst();
273 // Check that DB has before values.
274 if (!empty || a == COMMIT) {
275 for (int i = 0; i < num_batches; i++) {
276 ASSERT_TRUE(iter->Valid());
277 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
278 if (a == COMMIT) {
279 ASSERT_EQ(iter->value().ToString(), "value" + ToString(i));
280 } else {
281 ASSERT_EQ(iter->value().ToString(),
282 "before value" + ToString(i));
283 }
284 iter->Next();
285 }
286 }
287 ASSERT_FALSE(iter->Valid());
288 delete iter;
289 }
290 }
291 }
292 }
293}
294
295// Basic test to see that unprepared batch gets written to DB when batch size
296// is exceeded. It also does some basic checks to see if commit/rollback works
297// as expected for write unprepared.
298TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
299 WriteOptions write_options;
300 TransactionOptions txn_options;
301 const int kNumKeys = 10;
302
303 // batch_size of 1 causes writes to DB for every marker.
304 for (size_t batch_size : {1, 1000000}) {
305 txn_options.max_write_batch_size = batch_size;
306 for (bool prepare : {false, true}) {
307 for (bool commit : {false, true}) {
308 ReOpen();
309 Transaction* txn = db->BeginTransaction(write_options, txn_options);
310 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
311 txn->SetName("xid");
312
313 for (int i = 0; i < kNumKeys; i++) {
314 txn->Put("k" + ToString(i), "v" + ToString(i));
315 if (txn_options.max_write_batch_size == 1) {
316 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1);
317 } else {
318 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
319 }
320 }
321
322 if (prepare) {
323 ASSERT_OK(txn->Prepare());
324 }
325
326 Iterator* iter = db->NewIterator(ReadOptions());
327 iter->SeekToFirst();
328 assert(!iter->Valid());
329 ASSERT_FALSE(iter->Valid());
330 delete iter;
331
332 if (commit) {
333 ASSERT_OK(txn->Commit());
334 } else {
335 ASSERT_OK(txn->Rollback());
336 }
337 delete txn;
338
339 iter = db->NewIterator(ReadOptions());
340 iter->SeekToFirst();
341
342 for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
343 ASSERT_TRUE(iter->Valid());
344 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
345 ASSERT_EQ(iter->value().ToString(), "v" + ToString(i));
346 iter->Next();
347 }
348 ASSERT_FALSE(iter->Valid());
349 delete iter;
350 }
351 }
352 }
353}
354
355// Test whether logs containing unprepared/prepared batches are kept even
356// after memtable finishes flushing, and whether they are removed when
357// transaction commits/aborts.
358//
359// TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
360TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
361 WriteOptions write_options;
362 TransactionOptions txn_options;
363 // batch_size of 1 causes writes to DB for every marker.
364 txn_options.max_write_batch_size = 1;
365 const int kNumKeys = 10;
366
367 WriteOptions wopts;
368 wopts.sync = true;
369
370 for (bool prepare : {false, true}) {
371 for (bool commit : {false, true}) {
372 ReOpen();
373 auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
374 auto db_impl = wup_db->db_impl_;
375
376 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
377 ASSERT_OK(txn1->SetName("xid1"));
378
379 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
380 ASSERT_OK(txn2->SetName("xid2"));
381
382 // Spread this transaction across multiple log files.
383 for (int i = 0; i < kNumKeys; i++) {
384 ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i)));
385 if (i >= kNumKeys / 2) {
386 ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i)));
387 }
388
389 if (i > 0) {
390 db_impl->TEST_SwitchWAL();
391 }
392 }
393
394 ASSERT_GT(txn1->GetLogNumber(), 0);
395 ASSERT_GT(txn2->GetLogNumber(), 0);
396
397 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
398 txn1->GetLogNumber());
399 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
400
401 if (prepare) {
402 ASSERT_OK(txn1->Prepare());
403 ASSERT_OK(txn2->Prepare());
404 }
405
406 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
407 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
408
409 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
410 txn1->GetLogNumber());
411 if (commit) {
412 ASSERT_OK(txn1->Commit());
413 } else {
414 ASSERT_OK(txn1->Rollback());
415 }
416
417 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
418 txn2->GetLogNumber());
419
420 if (commit) {
421 ASSERT_OK(txn2->Commit());
422 } else {
423 ASSERT_OK(txn2->Rollback());
424 }
425
426 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
427
428 delete txn1;
429 delete txn2;
430 }
431 }
432}
433
434} // namespace rocksdb
435
436int main(int argc, char** argv) {
437 ::testing::InitGoogleTest(&argc, argv);
438 return RUN_ALL_TESTS();
439}
440
441#else
442#include <stdio.h>
443
// Stub entry point used when ROCKSDB_LITE is defined: reports on stderr that
// the tests were skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
449
450#endif // ROCKSDB_LITE