]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 TL |
8 | #include "utilities/transactions/transaction_test.h" |
9 | #include "utilities/transactions/write_unprepared_txn.h" | |
10 | #include "utilities/transactions/write_unprepared_txn_db.h" | |
11 | ||
f67539c2 | 12 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
13 | |
14 | class WriteUnpreparedTransactionTestBase : public TransactionTestBase { | |
15 | public: | |
16 | WriteUnpreparedTransactionTestBase(bool use_stackable_db, | |
17 | bool two_write_queue, | |
18 | TxnDBWritePolicy write_policy) | |
f67539c2 TL |
19 | : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, |
20 | kOrderedWrite) {} | |
11fdf7f2 TL |
21 | }; |
22 | ||
23 | class WriteUnpreparedTransactionTest | |
24 | : public WriteUnpreparedTransactionTestBase, | |
25 | virtual public ::testing::WithParamInterface< | |
26 | std::tuple<bool, bool, TxnDBWritePolicy>> { | |
27 | public: | |
28 | WriteUnpreparedTransactionTest() | |
29 | : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()), | |
30 | std::get<1>(GetParam()), | |
31 | std::get<2>(GetParam())){} | |
32 | }; | |
33 | ||
34 | INSTANTIATE_TEST_CASE_P( | |
35 | WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest, | |
36 | ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED), | |
37 | std::make_tuple(false, true, WRITE_UNPREPARED))); | |
38 | ||
f67539c2 TL |
39 | enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT }; |
40 | class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase, | |
41 | virtual public ::testing::WithParamInterface< | |
42 | std::tuple<bool, StressAction>> { | |
43 | public: | |
44 | WriteUnpreparedStressTest() | |
45 | : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()), | |
46 | WRITE_UNPREPARED), | |
47 | action_(std::get<1>(GetParam())) {} | |
48 | StressAction action_; | |
49 | }; | |
50 | ||
51 | INSTANTIATE_TEST_CASE_P( | |
52 | WriteUnpreparedStressTest, WriteUnpreparedStressTest, | |
53 | ::testing::Values(std::make_tuple(false, NO_SNAPSHOT), | |
54 | std::make_tuple(false, RO_SNAPSHOT), | |
55 | std::make_tuple(false, REFRESH_SNAPSHOT), | |
56 | std::make_tuple(true, NO_SNAPSHOT), | |
57 | std::make_tuple(true, RO_SNAPSHOT), | |
58 | std::make_tuple(true, REFRESH_SNAPSHOT))); | |
59 | ||
11fdf7f2 | 60 | TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { |
f67539c2 TL |
61 | // The following tests checks whether reading your own write for |
62 | // a transaction works for write unprepared, when there are uncommitted | |
63 | // values written into DB. | |
11fdf7f2 TL |
64 | auto verify_state = [](Iterator* iter, const std::string& key, |
65 | const std::string& value) { | |
66 | ASSERT_TRUE(iter->Valid()); | |
67 | ASSERT_OK(iter->status()); | |
68 | ASSERT_EQ(key, iter->key().ToString()); | |
69 | ASSERT_EQ(value, iter->value().ToString()); | |
70 | }; | |
71 | ||
f67539c2 TL |
72 | // Test always reseeking vs never reseeking. |
73 | for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) { | |
74 | options.max_sequential_skip_in_iterations = max_skip; | |
75 | options.disable_auto_compactions = true; | |
76 | ReOpen(); | |
11fdf7f2 | 77 | |
f67539c2 TL |
78 | TransactionOptions txn_options; |
79 | WriteOptions woptions; | |
80 | ReadOptions roptions; | |
11fdf7f2 | 81 | |
f67539c2 TL |
82 | ASSERT_OK(db->Put(woptions, "a", "")); |
83 | ASSERT_OK(db->Put(woptions, "b", "")); | |
11fdf7f2 | 84 | |
f67539c2 TL |
85 | Transaction* txn = db->BeginTransaction(woptions, txn_options); |
86 | WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); | |
87 | txn->SetSnapshot(); | |
11fdf7f2 | 88 | |
f67539c2 TL |
89 | for (int i = 0; i < 5; i++) { |
90 | std::string stored_value = "v" + ToString(i); | |
91 | ASSERT_OK(txn->Put("a", stored_value)); | |
92 | ASSERT_OK(txn->Put("b", stored_value)); | |
93 | wup_txn->FlushWriteBatchToDB(false); | |
11fdf7f2 | 94 | |
f67539c2 TL |
95 | // Test Get() |
96 | std::string value; | |
97 | ASSERT_OK(txn->Get(roptions, "a", &value)); | |
98 | ASSERT_EQ(value, stored_value); | |
99 | ASSERT_OK(txn->Get(roptions, "b", &value)); | |
100 | ASSERT_EQ(value, stored_value); | |
11fdf7f2 | 101 | |
f67539c2 TL |
102 | // Test Next() |
103 | auto iter = txn->GetIterator(roptions); | |
104 | iter->Seek("a"); | |
105 | verify_state(iter, "a", stored_value); | |
11fdf7f2 | 106 | |
f67539c2 TL |
107 | iter->Next(); |
108 | verify_state(iter, "b", stored_value); | |
11fdf7f2 | 109 | |
f67539c2 TL |
110 | iter->SeekToFirst(); |
111 | verify_state(iter, "a", stored_value); | |
11fdf7f2 | 112 | |
f67539c2 TL |
113 | iter->Next(); |
114 | verify_state(iter, "b", stored_value); | |
11fdf7f2 | 115 | |
f67539c2 | 116 | delete iter; |
11fdf7f2 | 117 | |
f67539c2 TL |
118 | // Test Prev() |
119 | iter = txn->GetIterator(roptions); | |
120 | iter->SeekForPrev("b"); | |
121 | verify_state(iter, "b", stored_value); | |
11fdf7f2 | 122 | |
f67539c2 TL |
123 | iter->Prev(); |
124 | verify_state(iter, "a", stored_value); | |
11fdf7f2 | 125 | |
f67539c2 TL |
126 | iter->SeekToLast(); |
127 | verify_state(iter, "b", stored_value); | |
11fdf7f2 | 128 | |
f67539c2 TL |
129 | iter->Prev(); |
130 | verify_state(iter, "a", stored_value); | |
11fdf7f2 | 131 | |
f67539c2 TL |
132 | delete iter; |
133 | } | |
11fdf7f2 | 134 | |
f67539c2 TL |
135 | delete txn; |
136 | } | |
137 | } | |
11fdf7f2 | 138 | |
f67539c2 TL |
139 | #ifndef ROCKSDB_VALGRIND_RUN |
140 | TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) { | |
141 | // This is a stress test where different threads are writing random keys, and | |
142 | // then before committing or aborting the transaction, it validates to see | |
143 | // that it can read the keys it wrote, and the keys it did not write respect | |
144 | // the snapshot. To avoid row lock contention (and simply stressing the | |
145 | // locking system), each thread is mostly only writing to its own set of keys. | |
146 | const uint32_t kNumIter = 1000; | |
147 | const uint32_t kNumThreads = 10; | |
148 | const uint32_t kNumKeys = 5; | |
149 | ||
f67539c2 TL |
150 | // Test with |
151 | // 1. no snapshots set | |
152 | // 2. snapshot set on ReadOptions | |
153 | // 3. snapshot set, and refreshing after every write. | |
154 | StressAction a = action_; | |
155 | WriteOptions write_options; | |
156 | txn_db_options.transaction_lock_timeout = -1; | |
157 | options.disable_auto_compactions = true; | |
158 | ReOpen(); | |
11fdf7f2 | 159 | |
f67539c2 TL |
160 | std::vector<std::string> keys; |
161 | for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) { | |
162 | keys.push_back("k" + ToString(k)); | |
163 | } | |
20effc67 | 164 | RandomShuffle(keys.begin(), keys.end()); |
f67539c2 TL |
165 | |
166 | // This counter will act as a "sequence number" to help us validate | |
167 | // visibility logic with snapshots. If we had direct access to the seqno of | |
168 | // snapshots and key/values, then we should directly compare those instead. | |
169 | std::atomic<int64_t> counter(0); | |
170 | ||
171 | std::function<void(uint32_t)> stress_thread = [&](int id) { | |
172 | size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id()); | |
173 | Random64 rnd(static_cast<uint32_t>(tid)); | |
174 | ||
175 | Transaction* txn; | |
176 | TransactionOptions txn_options; | |
177 | // batch_size of 1 causes writes to DB for every marker. | |
178 | txn_options.write_batch_flush_threshold = 1; | |
179 | ReadOptions read_options; | |
180 | ||
181 | for (uint32_t i = 0; i < kNumIter; i++) { | |
20effc67 TL |
182 | std::set<std::string> owned_keys(keys.begin() + id * kNumKeys, |
183 | keys.begin() + (id + 1) * kNumKeys); | |
f67539c2 TL |
184 | // Add unowned keys to make the workload more interesting, but this |
185 | // increases row lock contention, so just do it sometimes. | |
186 | if (rnd.OneIn(2)) { | |
187 | owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]); | |
188 | } | |
11fdf7f2 | 189 | |
f67539c2 TL |
190 | txn = db->BeginTransaction(write_options, txn_options); |
191 | txn->SetName(ToString(id)); | |
192 | txn->SetSnapshot(); | |
193 | if (a >= RO_SNAPSHOT) { | |
194 | read_options.snapshot = txn->GetSnapshot(); | |
195 | ASSERT_TRUE(read_options.snapshot != nullptr); | |
196 | } | |
11fdf7f2 | 197 | |
f67539c2 TL |
198 | uint64_t buf[2]; |
199 | buf[0] = id; | |
11fdf7f2 | 200 | |
f67539c2 TL |
201 | // When scanning through the database, make sure that all unprepared |
202 | // keys have value >= snapshot and all other keys have value < snapshot. | |
203 | int64_t snapshot_num = counter.fetch_add(1); | |
11fdf7f2 | 204 | |
f67539c2 TL |
205 | Status s; |
206 | for (const auto& key : owned_keys) { | |
207 | buf[1] = counter.fetch_add(1); | |
208 | s = txn->Put(key, Slice((const char*)buf, sizeof(buf))); | |
209 | if (!s.ok()) { | |
210 | break; | |
211 | } | |
212 | if (a == REFRESH_SNAPSHOT) { | |
213 | txn->SetSnapshot(); | |
214 | read_options.snapshot = txn->GetSnapshot(); | |
215 | snapshot_num = counter.fetch_add(1); | |
216 | } | |
217 | } | |
11fdf7f2 | 218 | |
f67539c2 TL |
219 | // Failure is possible due to snapshot validation. In this case, |
220 | // rollback and move onto next iteration. | |
221 | if (!s.ok()) { | |
222 | ASSERT_TRUE(s.IsBusy()); | |
223 | ASSERT_OK(txn->Rollback()); | |
224 | delete txn; | |
225 | continue; | |
226 | } | |
11fdf7f2 | 227 | |
f67539c2 TL |
228 | auto verify_key = [&owned_keys, &a, &id, &snapshot_num]( |
229 | const std::string& key, const std::string& value) { | |
230 | if (owned_keys.count(key) > 0) { | |
231 | ASSERT_EQ(value.size(), 16); | |
232 | ||
233 | // Since this key is part of owned_keys, then this key must be | |
234 | // unprepared by this transaction identified by 'id' | |
235 | ASSERT_EQ(((int64_t*)value.c_str())[0], id); | |
236 | if (a == REFRESH_SNAPSHOT) { | |
237 | // If refresh snapshot is true, then the snapshot is refreshed | |
238 | // after every Put(), meaning that the current snapshot in | |
239 | // snapshot_num must be greater than the "seqno" of any keys | |
240 | // written by the current transaction. | |
241 | ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num); | |
242 | } else { | |
243 | // If refresh snapshot is not on, then the snapshot was taken at | |
244 | // the beginning of the transaction, meaning all writes must come | |
245 | // after snapshot_num | |
246 | ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num); | |
247 | } | |
248 | } else if (a >= RO_SNAPSHOT) { | |
249 | // If this is not an unprepared key, just assert that the key | |
250 | // "seqno" is smaller than the snapshot seqno. | |
251 | ASSERT_EQ(value.size(), 16); | |
252 | ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num); | |
253 | } | |
254 | }; | |
255 | ||
256 | // Validate Get()/Next()/Prev(). Do only one of them to save time, and | |
257 | // reduce lock contention. | |
258 | switch (rnd.Uniform(3)) { | |
259 | case 0: // Validate Get() | |
260 | { | |
261 | for (const auto& key : keys) { | |
262 | std::string value; | |
263 | s = txn->Get(read_options, Slice(key), &value); | |
264 | if (!s.ok()) { | |
265 | ASSERT_TRUE(s.IsNotFound()); | |
266 | ASSERT_EQ(owned_keys.count(key), 0); | |
267 | } else { | |
268 | verify_key(key, value); | |
269 | } | |
270 | } | |
271 | break; | |
272 | } | |
273 | case 1: // Validate Next() | |
274 | { | |
275 | Iterator* iter = txn->GetIterator(read_options); | |
276 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
277 | verify_key(iter->key().ToString(), iter->value().ToString()); | |
278 | } | |
279 | delete iter; | |
280 | break; | |
281 | } | |
282 | case 2: // Validate Prev() | |
283 | { | |
284 | Iterator* iter = txn->GetIterator(read_options); | |
285 | for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { | |
286 | verify_key(iter->key().ToString(), iter->value().ToString()); | |
287 | } | |
288 | delete iter; | |
289 | break; | |
290 | } | |
291 | default: | |
292 | ASSERT_TRUE(false); | |
293 | } | |
11fdf7f2 | 294 | |
f67539c2 TL |
295 | if (rnd.OneIn(2)) { |
296 | ASSERT_OK(txn->Commit()); | |
297 | } else { | |
298 | ASSERT_OK(txn->Rollback()); | |
299 | } | |
300 | delete txn; | |
301 | } | |
302 | }; | |
11fdf7f2 | 303 | |
f67539c2 TL |
304 | std::vector<port::Thread> threads; |
305 | for (uint32_t i = 0; i < kNumThreads; i++) { | |
306 | threads.emplace_back(stress_thread, i); | |
307 | } | |
11fdf7f2 | 308 | |
f67539c2 TL |
309 | for (auto& t : threads) { |
310 | t.join(); | |
311 | } | |
11fdf7f2 | 312 | } |
f67539c2 | 313 | #endif // ROCKSDB_VALGRIND_RUN |
11fdf7f2 TL |
314 | |
315 | // This tests how write unprepared behaves during recovery when the DB crashes | |
316 | // after a transaction has either been unprepared or prepared, and tests if | |
317 | // the changes are correctly applied for prepared transactions if we decide to | |
318 | // rollback/commit. | |
319 | TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { | |
320 | WriteOptions write_options; | |
321 | write_options.disableWAL = false; | |
322 | TransactionOptions txn_options; | |
323 | std::vector<Transaction*> prepared_trans; | |
324 | WriteUnpreparedTxnDB* wup_db; | |
325 | options.disable_auto_compactions = true; | |
326 | ||
327 | enum Action { UNPREPARED, ROLLBACK, COMMIT }; | |
328 | ||
329 | // batch_size of 1 causes writes to DB for every marker. | |
330 | for (size_t batch_size : {1, 1000000}) { | |
f67539c2 | 331 | txn_options.write_batch_flush_threshold = batch_size; |
11fdf7f2 TL |
332 | for (bool empty : {true, false}) { |
333 | for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) { | |
334 | for (int num_batches = 1; num_batches < 10; num_batches++) { | |
335 | // Reset database. | |
336 | prepared_trans.clear(); | |
337 | ReOpen(); | |
338 | wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); | |
339 | if (!empty) { | |
340 | for (int i = 0; i < num_batches; i++) { | |
341 | ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i), | |
342 | "before value" + ToString(i))); | |
343 | } | |
344 | } | |
345 | ||
346 | // Write num_batches unprepared batches. | |
347 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
348 | WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); | |
349 | txn->SetName("xid"); | |
350 | for (int i = 0; i < num_batches; i++) { | |
351 | ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); | |
f67539c2 TL |
352 | if (txn_options.write_batch_flush_threshold == 1) { |
353 | // WriteUnprepared will check write_batch_flush_threshold and | |
354 | // possibly flush before appending to the write batch. No flush | |
355 | // will happen at the first write because the batch is still | |
356 | // empty, so after k puts, there should be k-1 flushed batches. | |
357 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i); | |
11fdf7f2 TL |
358 | } else { |
359 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); | |
360 | } | |
361 | } | |
362 | if (a == UNPREPARED) { | |
363 | // This is done to prevent the destructor from rolling back the | |
364 | // transaction for us, since we want to pretend we crashed and | |
365 | // test that recovery does the rollback. | |
366 | wup_txn->unprep_seqs_.clear(); | |
367 | } else { | |
368 | txn->Prepare(); | |
369 | } | |
370 | delete txn; | |
371 | ||
372 | // Crash and run recovery code paths. | |
373 | wup_db->db_impl_->FlushWAL(true); | |
374 | wup_db->TEST_Crash(); | |
375 | ReOpenNoDelete(); | |
376 | assert(db != nullptr); | |
377 | ||
378 | db->GetAllPreparedTransactions(&prepared_trans); | |
379 | ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1); | |
380 | if (a == ROLLBACK) { | |
381 | ASSERT_OK(prepared_trans[0]->Rollback()); | |
382 | delete prepared_trans[0]; | |
383 | } else if (a == COMMIT) { | |
384 | ASSERT_OK(prepared_trans[0]->Commit()); | |
385 | delete prepared_trans[0]; | |
386 | } | |
387 | ||
388 | Iterator* iter = db->NewIterator(ReadOptions()); | |
389 | iter->SeekToFirst(); | |
390 | // Check that DB has before values. | |
391 | if (!empty || a == COMMIT) { | |
392 | for (int i = 0; i < num_batches; i++) { | |
393 | ASSERT_TRUE(iter->Valid()); | |
394 | ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); | |
395 | if (a == COMMIT) { | |
396 | ASSERT_EQ(iter->value().ToString(), "value" + ToString(i)); | |
397 | } else { | |
398 | ASSERT_EQ(iter->value().ToString(), | |
399 | "before value" + ToString(i)); | |
400 | } | |
401 | iter->Next(); | |
402 | } | |
403 | } | |
404 | ASSERT_FALSE(iter->Valid()); | |
405 | delete iter; | |
406 | } | |
407 | } | |
408 | } | |
409 | } | |
410 | } | |
411 | ||
412 | // Basic test to see that unprepared batch gets written to DB when batch size | |
413 | // is exceeded. It also does some basic checks to see if commit/rollback works | |
414 | // as expected for write unprepared. | |
415 | TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { | |
416 | WriteOptions write_options; | |
417 | TransactionOptions txn_options; | |
418 | const int kNumKeys = 10; | |
419 | ||
420 | // batch_size of 1 causes writes to DB for every marker. | |
421 | for (size_t batch_size : {1, 1000000}) { | |
f67539c2 | 422 | txn_options.write_batch_flush_threshold = batch_size; |
11fdf7f2 TL |
423 | for (bool prepare : {false, true}) { |
424 | for (bool commit : {false, true}) { | |
425 | ReOpen(); | |
426 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
427 | WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); | |
428 | txn->SetName("xid"); | |
429 | ||
430 | for (int i = 0; i < kNumKeys; i++) { | |
431 | txn->Put("k" + ToString(i), "v" + ToString(i)); | |
f67539c2 TL |
432 | if (txn_options.write_batch_flush_threshold == 1) { |
433 | // WriteUnprepared will check write_batch_flush_threshold and | |
434 | // possibly flush before appending to the write batch. No flush will | |
435 | // happen at the first write because the batch is still empty, so | |
436 | // after k puts, there should be k-1 flushed batches. | |
437 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i); | |
11fdf7f2 TL |
438 | } else { |
439 | ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); | |
440 | } | |
441 | } | |
442 | ||
443 | if (prepare) { | |
444 | ASSERT_OK(txn->Prepare()); | |
445 | } | |
446 | ||
447 | Iterator* iter = db->NewIterator(ReadOptions()); | |
448 | iter->SeekToFirst(); | |
449 | assert(!iter->Valid()); | |
450 | ASSERT_FALSE(iter->Valid()); | |
451 | delete iter; | |
452 | ||
453 | if (commit) { | |
454 | ASSERT_OK(txn->Commit()); | |
455 | } else { | |
456 | ASSERT_OK(txn->Rollback()); | |
457 | } | |
458 | delete txn; | |
459 | ||
460 | iter = db->NewIterator(ReadOptions()); | |
461 | iter->SeekToFirst(); | |
462 | ||
463 | for (int i = 0; i < (commit ? kNumKeys : 0); i++) { | |
464 | ASSERT_TRUE(iter->Valid()); | |
465 | ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); | |
466 | ASSERT_EQ(iter->value().ToString(), "v" + ToString(i)); | |
467 | iter->Next(); | |
468 | } | |
469 | ASSERT_FALSE(iter->Valid()); | |
470 | delete iter; | |
471 | } | |
472 | } | |
473 | } | |
474 | } | |
475 | ||
476 | // Test whether logs containing unprepared/prepared batches are kept even | |
477 | // after memtable finishes flushing, and whether they are removed when | |
478 | // transaction commits/aborts. | |
479 | // | |
480 | // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests. | |
481 | TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) { | |
482 | WriteOptions write_options; | |
483 | TransactionOptions txn_options; | |
484 | // batch_size of 1 causes writes to DB for every marker. | |
f67539c2 | 485 | txn_options.write_batch_flush_threshold = 1; |
11fdf7f2 TL |
486 | const int kNumKeys = 10; |
487 | ||
488 | WriteOptions wopts; | |
489 | wopts.sync = true; | |
490 | ||
491 | for (bool prepare : {false, true}) { | |
492 | for (bool commit : {false, true}) { | |
493 | ReOpen(); | |
494 | auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); | |
495 | auto db_impl = wup_db->db_impl_; | |
496 | ||
497 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
498 | ASSERT_OK(txn1->SetName("xid1")); | |
499 | ||
500 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
501 | ASSERT_OK(txn2->SetName("xid2")); | |
502 | ||
503 | // Spread this transaction across multiple log files. | |
504 | for (int i = 0; i < kNumKeys; i++) { | |
505 | ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i))); | |
506 | if (i >= kNumKeys / 2) { | |
507 | ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i))); | |
508 | } | |
509 | ||
510 | if (i > 0) { | |
511 | db_impl->TEST_SwitchWAL(); | |
512 | } | |
513 | } | |
514 | ||
515 | ASSERT_GT(txn1->GetLogNumber(), 0); | |
516 | ASSERT_GT(txn2->GetLogNumber(), 0); | |
517 | ||
518 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), | |
519 | txn1->GetLogNumber()); | |
520 | ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); | |
521 | ||
522 | if (prepare) { | |
523 | ASSERT_OK(txn1->Prepare()); | |
524 | ASSERT_OK(txn2->Prepare()); | |
525 | } | |
526 | ||
527 | ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); | |
528 | ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); | |
529 | ||
530 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), | |
531 | txn1->GetLogNumber()); | |
532 | if (commit) { | |
533 | ASSERT_OK(txn1->Commit()); | |
534 | } else { | |
535 | ASSERT_OK(txn1->Rollback()); | |
536 | } | |
537 | ||
538 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), | |
539 | txn2->GetLogNumber()); | |
540 | ||
541 | if (commit) { | |
542 | ASSERT_OK(txn2->Commit()); | |
543 | } else { | |
544 | ASSERT_OK(txn2->Rollback()); | |
545 | } | |
546 | ||
547 | ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); | |
548 | ||
549 | delete txn1; | |
550 | delete txn2; | |
551 | } | |
552 | } | |
553 | } | |
554 | ||
f67539c2 TL |
555 | TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) { |
556 | WriteOptions woptions; | |
557 | TransactionOptions txn_options; | |
558 | txn_options.write_batch_flush_threshold = 1; | |
559 | ||
560 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
561 | ||
562 | // Do some writes with no snapshot | |
563 | ASSERT_OK(txn->Put("a", "a")); | |
564 | ASSERT_OK(txn->Put("b", "b")); | |
565 | ASSERT_OK(txn->Put("c", "c")); | |
566 | ||
567 | // Test that it is still possible to create iterators after writes with no | |
568 | // snapshot, if iterator snapshot is fresh enough. | |
569 | ReadOptions roptions; | |
570 | auto iter = txn->GetIterator(roptions); | |
571 | int keys = 0; | |
572 | for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) { | |
573 | ASSERT_OK(iter->status()); | |
574 | ASSERT_EQ(iter->key().ToString(), iter->value().ToString()); | |
575 | } | |
576 | ASSERT_EQ(keys, 3); | |
577 | ||
578 | delete iter; | |
579 | delete txn; | |
580 | } | |
581 | ||
582 | // Test whether write to a transaction while iterating is supported. | |
583 | TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) { | |
584 | WriteOptions woptions; | |
585 | TransactionOptions txn_options; | |
586 | txn_options.write_batch_flush_threshold = 1; | |
587 | ||
588 | enum Action { DO_DELETE, DO_UPDATE }; | |
589 | ||
590 | for (Action a : {DO_DELETE, DO_UPDATE}) { | |
591 | for (int i = 0; i < 100; i++) { | |
592 | ASSERT_OK(db->Put(woptions, ToString(i), ToString(i))); | |
593 | } | |
594 | ||
595 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
596 | // write_batch_ now contains 1 key. | |
597 | ASSERT_OK(txn->Put("9", "a")); | |
598 | ||
599 | ReadOptions roptions; | |
600 | auto iter = txn->GetIterator(roptions); | |
601 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
602 | ASSERT_OK(iter->status()); | |
603 | if (iter->key() == "9") { | |
604 | ASSERT_EQ(iter->value().ToString(), "a"); | |
605 | } else { | |
606 | ASSERT_EQ(iter->key().ToString(), iter->value().ToString()); | |
607 | } | |
608 | ||
609 | if (a == DO_DELETE) { | |
610 | ASSERT_OK(txn->Delete(iter->key())); | |
611 | } else { | |
612 | ASSERT_OK(txn->Put(iter->key(), "b")); | |
613 | } | |
614 | } | |
615 | ||
616 | delete iter; | |
617 | ASSERT_OK(txn->Commit()); | |
618 | ||
619 | iter = db->NewIterator(roptions); | |
620 | if (a == DO_DELETE) { | |
621 | // Check that db is empty. | |
622 | iter->SeekToFirst(); | |
623 | ASSERT_FALSE(iter->Valid()); | |
624 | } else { | |
625 | int keys = 0; | |
626 | // Check that all values are updated to b. | |
627 | for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) { | |
628 | ASSERT_OK(iter->status()); | |
629 | ASSERT_EQ(iter->value().ToString(), "b"); | |
630 | } | |
631 | ASSERT_EQ(keys, 100); | |
632 | } | |
633 | ||
634 | delete iter; | |
635 | delete txn; | |
636 | } | |
637 | } | |
638 | ||
639 | TEST_P(WriteUnpreparedTransactionTest, SavePoint) { | |
640 | WriteOptions woptions; | |
641 | TransactionOptions txn_options; | |
642 | txn_options.write_batch_flush_threshold = 1; | |
643 | ||
644 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
645 | txn->SetSavePoint(); | |
646 | ASSERT_OK(txn->Put("a", "a")); | |
647 | ASSERT_OK(txn->Put("b", "b")); | |
648 | ASSERT_OK(txn->Commit()); | |
649 | ||
650 | ReadOptions roptions; | |
651 | std::string value; | |
652 | ASSERT_OK(txn->Get(roptions, "a", &value)); | |
653 | ASSERT_EQ(value, "a"); | |
654 | ASSERT_OK(txn->Get(roptions, "b", &value)); | |
655 | ASSERT_EQ(value, "b"); | |
656 | delete txn; | |
657 | } | |
658 | ||
659 | TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) { | |
660 | WriteOptions woptions; | |
661 | TransactionOptions txn_options; | |
662 | txn_options.write_batch_flush_threshold = 1; | |
663 | ||
664 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
665 | auto wb = txn->GetWriteBatch()->GetWriteBatch(); | |
666 | ASSERT_OK(txn->Put("a", "a")); | |
667 | ASSERT_OK(wb->Put("a_untrack", "a_untrack")); | |
668 | txn->SetSavePoint(); | |
669 | ASSERT_OK(txn->Put("b", "b")); | |
670 | ASSERT_OK(txn->Put("b_untrack", "b_untrack")); | |
671 | ||
672 | ReadOptions roptions; | |
673 | std::string value; | |
674 | ASSERT_OK(txn->Get(roptions, "a", &value)); | |
675 | ASSERT_EQ(value, "a"); | |
676 | ASSERT_OK(txn->Get(roptions, "a_untrack", &value)); | |
677 | ASSERT_EQ(value, "a_untrack"); | |
678 | ASSERT_OK(txn->Get(roptions, "b", &value)); | |
679 | ASSERT_EQ(value, "b"); | |
680 | ASSERT_OK(txn->Get(roptions, "b_untrack", &value)); | |
681 | ASSERT_EQ(value, "b_untrack"); | |
682 | ||
683 | // b and b_untrack should be rolled back. | |
684 | ASSERT_OK(txn->RollbackToSavePoint()); | |
685 | ASSERT_OK(txn->Get(roptions, "a", &value)); | |
686 | ASSERT_EQ(value, "a"); | |
687 | ASSERT_OK(txn->Get(roptions, "a_untrack", &value)); | |
688 | ASSERT_EQ(value, "a_untrack"); | |
689 | auto s = txn->Get(roptions, "b", &value); | |
690 | ASSERT_TRUE(s.IsNotFound()); | |
691 | s = txn->Get(roptions, "b_untrack", &value); | |
692 | ASSERT_TRUE(s.IsNotFound()); | |
693 | ||
694 | // Everything should be rolled back. | |
695 | ASSERT_OK(txn->Rollback()); | |
696 | s = txn->Get(roptions, "a", &value); | |
697 | ASSERT_TRUE(s.IsNotFound()); | |
698 | s = txn->Get(roptions, "a_untrack", &value); | |
699 | ASSERT_TRUE(s.IsNotFound()); | |
700 | s = txn->Get(roptions, "b", &value); | |
701 | ASSERT_TRUE(s.IsNotFound()); | |
702 | s = txn->Get(roptions, "b_untrack", &value); | |
703 | ASSERT_TRUE(s.IsNotFound()); | |
704 | ||
705 | delete txn; | |
706 | } | |
707 | ||
708 | } // namespace ROCKSDB_NAMESPACE | |
11fdf7f2 TL |
709 | |
710 | int main(int argc, char** argv) { | |
711 | ::testing::InitGoogleTest(&argc, argv); | |
712 | return RUN_ALL_TESTS(); | |
713 | } | |
714 | ||
715 | #else | |
716 | #include <stdio.h> | |
717 | ||
// ROCKSDB_LITE entry point: no tests are built, so print a notice to stderr
// and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
723 | ||
724 | #endif // ROCKSDB_LITE |