]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #ifndef __STDC_FORMAT_MACROS | |
9 | #define __STDC_FORMAT_MACROS | |
10 | #endif | |
11 | ||
12 | #include "utilities/transactions/transaction_test.h" | |
13 | #include "utilities/transactions/write_unprepared_txn.h" | |
14 | #include "utilities/transactions/write_unprepared_txn_db.h" | |
15 | ||
16 | namespace rocksdb { | |
17 | ||
18 | class WriteUnpreparedTransactionTestBase : public TransactionTestBase { | |
19 | public: | |
20 | WriteUnpreparedTransactionTestBase(bool use_stackable_db, | |
21 | bool two_write_queue, | |
22 | TxnDBWritePolicy write_policy) | |
23 | : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){} | |
24 | }; | |
25 | ||
26 | class WriteUnpreparedTransactionTest | |
27 | : public WriteUnpreparedTransactionTestBase, | |
28 | virtual public ::testing::WithParamInterface< | |
29 | std::tuple<bool, bool, TxnDBWritePolicy>> { | |
30 | public: | |
31 | WriteUnpreparedTransactionTest() | |
32 | : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()), | |
33 | std::get<1>(GetParam()), | |
34 | std::get<2>(GetParam())){} | |
35 | }; | |
36 | ||
37 | INSTANTIATE_TEST_CASE_P( | |
38 | WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest, | |
39 | ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED), | |
40 | std::make_tuple(false, true, WRITE_UNPREPARED))); | |
41 | ||
42 | TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { | |
43 | auto verify_state = [](Iterator* iter, const std::string& key, | |
44 | const std::string& value) { | |
45 | ASSERT_TRUE(iter->Valid()); | |
46 | ASSERT_OK(iter->status()); | |
47 | ASSERT_EQ(key, iter->key().ToString()); | |
48 | ASSERT_EQ(value, iter->value().ToString()); | |
49 | }; | |
50 | ||
51 | options.disable_auto_compactions = true; | |
52 | ReOpen(); | |
53 | ||
54 | // The following tests checks whether reading your own write for | |
55 | // a transaction works for write unprepared, when there are uncommitted | |
56 | // values written into DB. | |
57 | // | |
58 | // Although the values written by DB::Put are technically committed, we add | |
59 | // their seq num to unprep_seqs_ to pretend that they were written into DB | |
60 | // as part of an unprepared batch, and then check if they are visible to the | |
61 | // transaction. | |
62 | auto snapshot0 = db->GetSnapshot(); | |
63 | ASSERT_OK(db->Put(WriteOptions(), "a", "v1")); | |
64 | ASSERT_OK(db->Put(WriteOptions(), "b", "v2")); | |
65 | auto snapshot2 = db->GetSnapshot(); | |
66 | ASSERT_OK(db->Put(WriteOptions(), "a", "v3")); | |
67 | ASSERT_OK(db->Put(WriteOptions(), "b", "v4")); | |
68 | auto snapshot4 = db->GetSnapshot(); | |
69 | ASSERT_OK(db->Put(WriteOptions(), "a", "v5")); | |
70 | ASSERT_OK(db->Put(WriteOptions(), "b", "v6")); | |
71 | auto snapshot6 = db->GetSnapshot(); | |
72 | ASSERT_OK(db->Put(WriteOptions(), "a", "v7")); | |
73 | ASSERT_OK(db->Put(WriteOptions(), "b", "v8")); | |
74 | auto snapshot8 = db->GetSnapshot(); | |
75 | ||
76 | TransactionOptions txn_options; | |
77 | WriteOptions write_options; | |
78 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
79 | WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); | |
80 | ||
81 | ReadOptions roptions; | |
82 | roptions.snapshot = snapshot0; | |
83 | ||
494da23a TL |
84 | wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = |
85 | snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); | |
11fdf7f2 TL |
86 | auto iter = txn->GetIterator(roptions); |
87 | ||
88 | // Test Get(). | |
89 | std::string value; | |
11fdf7f2 TL |
90 | |
91 | ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); | |
92 | ASSERT_EQ(value, "v3"); | |
93 | ||
94 | ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); | |
95 | ASSERT_EQ(value, "v4"); | |
96 | ||
97 | wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = | |
98 | snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); | |
494da23a TL |
99 | delete iter; |
100 | iter = txn->GetIterator(roptions); | |
11fdf7f2 TL |
101 | |
102 | ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); | |
103 | ASSERT_EQ(value, "v7"); | |
104 | ||
105 | ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); | |
106 | ASSERT_EQ(value, "v8"); | |
107 | ||
108 | wup_txn->unprep_seqs_.clear(); | |
109 | ||
110 | // Test Next(). | |
111 | wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = | |
112 | snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); | |
494da23a TL |
113 | delete iter; |
114 | iter = txn->GetIterator(roptions); | |
11fdf7f2 TL |
115 | |
116 | iter->Seek("a"); | |
117 | verify_state(iter, "a", "v3"); | |
118 | ||
119 | iter->Next(); | |
120 | verify_state(iter, "b", "v4"); | |
121 | ||
122 | iter->SeekToFirst(); | |
123 | verify_state(iter, "a", "v3"); | |
124 | ||
125 | iter->Next(); | |
126 | verify_state(iter, "b", "v4"); | |
127 | ||
128 | wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = | |
129 | snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); | |
494da23a TL |
130 | delete iter; |
131 | iter = txn->GetIterator(roptions); | |
11fdf7f2 TL |
132 | |
133 | iter->Seek("a"); | |
134 | verify_state(iter, "a", "v7"); | |
135 | ||
136 | iter->Next(); | |
137 | verify_state(iter, "b", "v8"); | |
138 | ||
139 | iter->SeekToFirst(); | |
140 | verify_state(iter, "a", "v7"); | |
141 | ||
142 | iter->Next(); | |
143 | verify_state(iter, "b", "v8"); | |
144 | ||
145 | wup_txn->unprep_seqs_.clear(); | |
146 | ||
147 | // Test Prev(). For Prev(), we need to adjust the snapshot to match what is | |
148 | // possible in WriteUnpreparedTxn. | |
149 | // | |
150 | // Because of row locks and ValidateSnapshot, there cannot be any committed | |
151 | // entries after snapshot, but before the first prepared key. | |
11fdf7f2 | 152 | roptions.snapshot = snapshot2; |
11fdf7f2 TL |
153 | wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = |
154 | snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); | |
494da23a TL |
155 | delete iter; |
156 | iter = txn->GetIterator(roptions); | |
11fdf7f2 TL |
157 | |
158 | iter->SeekForPrev("b"); | |
159 | verify_state(iter, "b", "v4"); | |
160 | ||
161 | iter->Prev(); | |
162 | verify_state(iter, "a", "v3"); | |
163 | ||
164 | iter->SeekToLast(); | |
165 | verify_state(iter, "b", "v4"); | |
166 | ||
167 | iter->Prev(); | |
168 | verify_state(iter, "a", "v3"); | |
169 | ||
11fdf7f2 | 170 | roptions.snapshot = snapshot6; |
11fdf7f2 TL |
171 | wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = |
172 | snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); | |
494da23a TL |
173 | delete iter; |
174 | iter = txn->GetIterator(roptions); | |
11fdf7f2 TL |
175 | |
176 | iter->SeekForPrev("b"); | |
177 | verify_state(iter, "b", "v8"); | |
178 | ||
179 | iter->Prev(); | |
180 | verify_state(iter, "a", "v7"); | |
181 | ||
182 | iter->SeekToLast(); | |
183 | verify_state(iter, "b", "v8"); | |
184 | ||
185 | iter->Prev(); | |
186 | verify_state(iter, "a", "v7"); | |
187 | ||
188 | // Since the unprep_seqs_ data were faked for testing, we do not want the | |
189 | // destructor for the transaction to be rolling back data that did not | |
190 | // exist. | |
191 | wup_txn->unprep_seqs_.clear(); | |
192 | ||
193 | db->ReleaseSnapshot(snapshot0); | |
194 | db->ReleaseSnapshot(snapshot2); | |
195 | db->ReleaseSnapshot(snapshot4); | |
196 | db->ReleaseSnapshot(snapshot6); | |
197 | db->ReleaseSnapshot(snapshot8); | |
198 | delete iter; | |
199 | delete txn; | |
200 | } | |
201 | ||
202 | // This tests how write unprepared behaves during recovery when the DB crashes | |
203 | // after a transaction has either been unprepared or prepared, and tests if | |
204 | // the changes are correctly applied for prepared transactions if we decide to | |
205 | // rollback/commit. | |
206 | TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { | |
207 | WriteOptions write_options; | |
208 | write_options.disableWAL = false; | |
209 | TransactionOptions txn_options; | |
210 | std::vector<Transaction*> prepared_trans; | |
211 | WriteUnpreparedTxnDB* wup_db; | |
212 | options.disable_auto_compactions = true; | |
213 | ||
214 | enum Action { UNPREPARED, ROLLBACK, COMMIT }; | |
215 | ||
216 | // batch_size of 1 causes writes to DB for every marker. | |
217 | for (size_t batch_size : {1, 1000000}) { | |
218 | txn_options.max_write_batch_size = batch_size; | |
219 | for (bool empty : {true, false}) { | |
220 | for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) { | |
221 | for (int num_batches = 1; num_batches < 10; num_batches++) { | |
222 | // Reset database. | |
223 | prepared_trans.clear(); | |
224 | ReOpen(); | |
225 | wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); | |
226 | if (!empty) { | |
227 | for (int i = 0; i < num_batches; i++) { | |
228 | ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i), | |
229 | "before value" + ToString(i))); | |
230 | } | |
231 | } | |
232 | ||
233 | // Write num_batches unprepared batches. | |
234 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
235 | WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); | |
236 | txn->SetName("xid"); | |
237 | for (int i = 0; i < num_batches; i++) { | |
238 | ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); | |
239 | if (txn_options.max_write_batch_size == 1) { | |
240 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); | |
241 | } else { | |
242 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); | |
243 | } | |
244 | } | |
245 | if (a == UNPREPARED) { | |
246 | // This is done to prevent the destructor from rolling back the | |
247 | // transaction for us, since we want to pretend we crashed and | |
248 | // test that recovery does the rollback. | |
249 | wup_txn->unprep_seqs_.clear(); | |
250 | } else { | |
251 | txn->Prepare(); | |
252 | } | |
253 | delete txn; | |
254 | ||
255 | // Crash and run recovery code paths. | |
256 | wup_db->db_impl_->FlushWAL(true); | |
257 | wup_db->TEST_Crash(); | |
258 | ReOpenNoDelete(); | |
259 | assert(db != nullptr); | |
260 | ||
261 | db->GetAllPreparedTransactions(&prepared_trans); | |
262 | ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1); | |
263 | if (a == ROLLBACK) { | |
264 | ASSERT_OK(prepared_trans[0]->Rollback()); | |
265 | delete prepared_trans[0]; | |
266 | } else if (a == COMMIT) { | |
267 | ASSERT_OK(prepared_trans[0]->Commit()); | |
268 | delete prepared_trans[0]; | |
269 | } | |
270 | ||
271 | Iterator* iter = db->NewIterator(ReadOptions()); | |
272 | iter->SeekToFirst(); | |
273 | // Check that DB has before values. | |
274 | if (!empty || a == COMMIT) { | |
275 | for (int i = 0; i < num_batches; i++) { | |
276 | ASSERT_TRUE(iter->Valid()); | |
277 | ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); | |
278 | if (a == COMMIT) { | |
279 | ASSERT_EQ(iter->value().ToString(), "value" + ToString(i)); | |
280 | } else { | |
281 | ASSERT_EQ(iter->value().ToString(), | |
282 | "before value" + ToString(i)); | |
283 | } | |
284 | iter->Next(); | |
285 | } | |
286 | } | |
287 | ASSERT_FALSE(iter->Valid()); | |
288 | delete iter; | |
289 | } | |
290 | } | |
291 | } | |
292 | } | |
293 | } | |
294 | ||
295 | // Basic test to see that unprepared batch gets written to DB when batch size | |
296 | // is exceeded. It also does some basic checks to see if commit/rollback works | |
297 | // as expected for write unprepared. | |
298 | TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { | |
299 | WriteOptions write_options; | |
300 | TransactionOptions txn_options; | |
301 | const int kNumKeys = 10; | |
302 | ||
303 | // batch_size of 1 causes writes to DB for every marker. | |
304 | for (size_t batch_size : {1, 1000000}) { | |
305 | txn_options.max_write_batch_size = batch_size; | |
306 | for (bool prepare : {false, true}) { | |
307 | for (bool commit : {false, true}) { | |
308 | ReOpen(); | |
309 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
310 | WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); | |
311 | txn->SetName("xid"); | |
312 | ||
313 | for (int i = 0; i < kNumKeys; i++) { | |
314 | txn->Put("k" + ToString(i), "v" + ToString(i)); | |
315 | if (txn_options.max_write_batch_size == 1) { | |
316 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); | |
317 | } else { | |
318 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); | |
319 | } | |
320 | } | |
321 | ||
322 | if (prepare) { | |
323 | ASSERT_OK(txn->Prepare()); | |
324 | } | |
325 | ||
326 | Iterator* iter = db->NewIterator(ReadOptions()); | |
327 | iter->SeekToFirst(); | |
328 | assert(!iter->Valid()); | |
329 | ASSERT_FALSE(iter->Valid()); | |
330 | delete iter; | |
331 | ||
332 | if (commit) { | |
333 | ASSERT_OK(txn->Commit()); | |
334 | } else { | |
335 | ASSERT_OK(txn->Rollback()); | |
336 | } | |
337 | delete txn; | |
338 | ||
339 | iter = db->NewIterator(ReadOptions()); | |
340 | iter->SeekToFirst(); | |
341 | ||
342 | for (int i = 0; i < (commit ? kNumKeys : 0); i++) { | |
343 | ASSERT_TRUE(iter->Valid()); | |
344 | ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); | |
345 | ASSERT_EQ(iter->value().ToString(), "v" + ToString(i)); | |
346 | iter->Next(); | |
347 | } | |
348 | ASSERT_FALSE(iter->Valid()); | |
349 | delete iter; | |
350 | } | |
351 | } | |
352 | } | |
353 | } | |
354 | ||
355 | // Test whether logs containing unprepared/prepared batches are kept even | |
356 | // after memtable finishes flushing, and whether they are removed when | |
357 | // transaction commits/aborts. | |
358 | // | |
359 | // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests. | |
360 | TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) { | |
361 | WriteOptions write_options; | |
362 | TransactionOptions txn_options; | |
363 | // batch_size of 1 causes writes to DB for every marker. | |
364 | txn_options.max_write_batch_size = 1; | |
365 | const int kNumKeys = 10; | |
366 | ||
367 | WriteOptions wopts; | |
368 | wopts.sync = true; | |
369 | ||
370 | for (bool prepare : {false, true}) { | |
371 | for (bool commit : {false, true}) { | |
372 | ReOpen(); | |
373 | auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); | |
374 | auto db_impl = wup_db->db_impl_; | |
375 | ||
376 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
377 | ASSERT_OK(txn1->SetName("xid1")); | |
378 | ||
379 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
380 | ASSERT_OK(txn2->SetName("xid2")); | |
381 | ||
382 | // Spread this transaction across multiple log files. | |
383 | for (int i = 0; i < kNumKeys; i++) { | |
384 | ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i))); | |
385 | if (i >= kNumKeys / 2) { | |
386 | ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i))); | |
387 | } | |
388 | ||
389 | if (i > 0) { | |
390 | db_impl->TEST_SwitchWAL(); | |
391 | } | |
392 | } | |
393 | ||
394 | ASSERT_GT(txn1->GetLogNumber(), 0); | |
395 | ASSERT_GT(txn2->GetLogNumber(), 0); | |
396 | ||
397 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), | |
398 | txn1->GetLogNumber()); | |
399 | ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); | |
400 | ||
401 | if (prepare) { | |
402 | ASSERT_OK(txn1->Prepare()); | |
403 | ASSERT_OK(txn2->Prepare()); | |
404 | } | |
405 | ||
406 | ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); | |
407 | ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); | |
408 | ||
409 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), | |
410 | txn1->GetLogNumber()); | |
411 | if (commit) { | |
412 | ASSERT_OK(txn1->Commit()); | |
413 | } else { | |
414 | ASSERT_OK(txn1->Rollback()); | |
415 | } | |
416 | ||
417 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), | |
418 | txn2->GetLogNumber()); | |
419 | ||
420 | if (commit) { | |
421 | ASSERT_OK(txn2->Commit()); | |
422 | } else { | |
423 | ASSERT_OK(txn2->Rollback()); | |
424 | } | |
425 | ||
426 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); | |
427 | ||
428 | delete txn1; | |
429 | delete txn2; | |
430 | } | |
431 | } | |
432 | } | |
433 | ||
434 | } // namespace rocksdb | |
435 | ||
436 | int main(int argc, char** argv) { | |
437 | ::testing::InitGoogleTest(&argc, argv); | |
438 | return RUN_ALL_TESTS(); | |
439 | } | |
440 | ||
441 | #else | |
442 | #include <stdio.h> | |
443 | ||
444 | int main(int /*argc*/, char** /*argv*/) { | |
445 | fprintf(stderr, | |
446 | "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n"); | |
447 | return 0; | |
448 | } | |
449 | ||
450 | #endif // ROCKSDB_LITE |