]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_unprepared_transaction_test.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
11fdf7f2
TL
#include <cstring>
#include <memory>

#include "utilities/transactions/transaction_test.h"
#include "utilities/transactions/write_unprepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
11
f67539c2 12namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
13
14class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
15 public:
16 WriteUnpreparedTransactionTestBase(bool use_stackable_db,
17 bool two_write_queue,
18 TxnDBWritePolicy write_policy)
f67539c2
TL
19 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
20 kOrderedWrite) {}
11fdf7f2
TL
21};
22
23class WriteUnpreparedTransactionTest
24 : public WriteUnpreparedTransactionTestBase,
25 virtual public ::testing::WithParamInterface<
26 std::tuple<bool, bool, TxnDBWritePolicy>> {
27 public:
28 WriteUnpreparedTransactionTest()
29 : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
30 std::get<1>(GetParam()),
1e59de90 31 std::get<2>(GetParam())) {}
11fdf7f2
TL
32};
33
34INSTANTIATE_TEST_CASE_P(
35 WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
36 ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
37 std::make_tuple(false, true, WRITE_UNPREPARED)));
38
f67539c2
TL
// Snapshot mode used by WriteUnpreparedStressTest. The enumerator order
// matters: the stress test compares values with `>= RO_SNAPSHOT`.
enum StressAction {
  NO_SNAPSHOT,      // no snapshot is set on ReadOptions
  RO_SNAPSHOT,      // the transaction snapshot is set on ReadOptions
  REFRESH_SNAPSHOT  // snapshot is set and refreshed after every write
};
40class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
41 virtual public ::testing::WithParamInterface<
42 std::tuple<bool, StressAction>> {
43 public:
44 WriteUnpreparedStressTest()
45 : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
46 WRITE_UNPREPARED),
47 action_(std::get<1>(GetParam())) {}
48 StressAction action_;
49};
50
51INSTANTIATE_TEST_CASE_P(
52 WriteUnpreparedStressTest, WriteUnpreparedStressTest,
53 ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
54 std::make_tuple(false, RO_SNAPSHOT),
55 std::make_tuple(false, REFRESH_SNAPSHOT),
56 std::make_tuple(true, NO_SNAPSHOT),
57 std::make_tuple(true, RO_SNAPSHOT),
58 std::make_tuple(true, REFRESH_SNAPSHOT)));
59
11fdf7f2 60TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
f67539c2
TL
61 // The following tests checks whether reading your own write for
62 // a transaction works for write unprepared, when there are uncommitted
63 // values written into DB.
11fdf7f2
TL
64 auto verify_state = [](Iterator* iter, const std::string& key,
65 const std::string& value) {
66 ASSERT_TRUE(iter->Valid());
67 ASSERT_OK(iter->status());
68 ASSERT_EQ(key, iter->key().ToString());
69 ASSERT_EQ(value, iter->value().ToString());
70 };
71
f67539c2
TL
72 // Test always reseeking vs never reseeking.
73 for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
74 options.max_sequential_skip_in_iterations = max_skip;
75 options.disable_auto_compactions = true;
1e59de90 76 ASSERT_OK(ReOpen());
11fdf7f2 77
f67539c2
TL
78 TransactionOptions txn_options;
79 WriteOptions woptions;
80 ReadOptions roptions;
11fdf7f2 81
f67539c2
TL
82 ASSERT_OK(db->Put(woptions, "a", ""));
83 ASSERT_OK(db->Put(woptions, "b", ""));
11fdf7f2 84
f67539c2
TL
85 Transaction* txn = db->BeginTransaction(woptions, txn_options);
86 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
87 txn->SetSnapshot();
11fdf7f2 88
f67539c2 89 for (int i = 0; i < 5; i++) {
1e59de90 90 std::string stored_value = "v" + std::to_string(i);
f67539c2
TL
91 ASSERT_OK(txn->Put("a", stored_value));
92 ASSERT_OK(txn->Put("b", stored_value));
1e59de90 93 ASSERT_OK(wup_txn->FlushWriteBatchToDB(false));
11fdf7f2 94
f67539c2
TL
95 // Test Get()
96 std::string value;
97 ASSERT_OK(txn->Get(roptions, "a", &value));
98 ASSERT_EQ(value, stored_value);
99 ASSERT_OK(txn->Get(roptions, "b", &value));
100 ASSERT_EQ(value, stored_value);
11fdf7f2 101
f67539c2
TL
102 // Test Next()
103 auto iter = txn->GetIterator(roptions);
104 iter->Seek("a");
105 verify_state(iter, "a", stored_value);
11fdf7f2 106
f67539c2
TL
107 iter->Next();
108 verify_state(iter, "b", stored_value);
11fdf7f2 109
f67539c2
TL
110 iter->SeekToFirst();
111 verify_state(iter, "a", stored_value);
11fdf7f2 112
f67539c2
TL
113 iter->Next();
114 verify_state(iter, "b", stored_value);
11fdf7f2 115
f67539c2 116 delete iter;
11fdf7f2 117
f67539c2
TL
118 // Test Prev()
119 iter = txn->GetIterator(roptions);
120 iter->SeekForPrev("b");
121 verify_state(iter, "b", stored_value);
11fdf7f2 122
f67539c2
TL
123 iter->Prev();
124 verify_state(iter, "a", stored_value);
11fdf7f2 125
f67539c2
TL
126 iter->SeekToLast();
127 verify_state(iter, "b", stored_value);
11fdf7f2 128
f67539c2
TL
129 iter->Prev();
130 verify_state(iter, "a", stored_value);
11fdf7f2 131
f67539c2
TL
132 delete iter;
133 }
11fdf7f2 134
f67539c2
TL
135 delete txn;
136 }
137}
11fdf7f2 138
1e59de90 139#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
f67539c2
TL
140TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
141 // This is a stress test where different threads are writing random keys, and
142 // then before committing or aborting the transaction, it validates to see
143 // that it can read the keys it wrote, and the keys it did not write respect
144 // the snapshot. To avoid row lock contention (and simply stressing the
145 // locking system), each thread is mostly only writing to its own set of keys.
146 const uint32_t kNumIter = 1000;
147 const uint32_t kNumThreads = 10;
148 const uint32_t kNumKeys = 5;
149
f67539c2
TL
150 // Test with
151 // 1. no snapshots set
152 // 2. snapshot set on ReadOptions
153 // 3. snapshot set, and refreshing after every write.
154 StressAction a = action_;
155 WriteOptions write_options;
156 txn_db_options.transaction_lock_timeout = -1;
157 options.disable_auto_compactions = true;
1e59de90 158 ASSERT_OK(ReOpen());
11fdf7f2 159
f67539c2
TL
160 std::vector<std::string> keys;
161 for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
1e59de90 162 keys.push_back("k" + std::to_string(k));
f67539c2 163 }
20effc67 164 RandomShuffle(keys.begin(), keys.end());
f67539c2
TL
165
166 // This counter will act as a "sequence number" to help us validate
167 // visibility logic with snapshots. If we had direct access to the seqno of
168 // snapshots and key/values, then we should directly compare those instead.
169 std::atomic<int64_t> counter(0);
170
171 std::function<void(uint32_t)> stress_thread = [&](int id) {
172 size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
173 Random64 rnd(static_cast<uint32_t>(tid));
174
175 Transaction* txn;
176 TransactionOptions txn_options;
177 // batch_size of 1 causes writes to DB for every marker.
178 txn_options.write_batch_flush_threshold = 1;
179 ReadOptions read_options;
180
181 for (uint32_t i = 0; i < kNumIter; i++) {
20effc67
TL
182 std::set<std::string> owned_keys(keys.begin() + id * kNumKeys,
183 keys.begin() + (id + 1) * kNumKeys);
f67539c2
TL
184 // Add unowned keys to make the workload more interesting, but this
185 // increases row lock contention, so just do it sometimes.
186 if (rnd.OneIn(2)) {
187 owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
188 }
11fdf7f2 189
f67539c2 190 txn = db->BeginTransaction(write_options, txn_options);
1e59de90 191 ASSERT_OK(txn->SetName(std::to_string(id)));
f67539c2
TL
192 txn->SetSnapshot();
193 if (a >= RO_SNAPSHOT) {
194 read_options.snapshot = txn->GetSnapshot();
195 ASSERT_TRUE(read_options.snapshot != nullptr);
196 }
11fdf7f2 197
f67539c2
TL
198 uint64_t buf[2];
199 buf[0] = id;
11fdf7f2 200
f67539c2
TL
201 // When scanning through the database, make sure that all unprepared
202 // keys have value >= snapshot and all other keys have value < snapshot.
203 int64_t snapshot_num = counter.fetch_add(1);
11fdf7f2 204
f67539c2
TL
205 Status s;
206 for (const auto& key : owned_keys) {
207 buf[1] = counter.fetch_add(1);
208 s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
209 if (!s.ok()) {
210 break;
211 }
212 if (a == REFRESH_SNAPSHOT) {
213 txn->SetSnapshot();
214 read_options.snapshot = txn->GetSnapshot();
215 snapshot_num = counter.fetch_add(1);
216 }
217 }
11fdf7f2 218
f67539c2
TL
219 // Failure is possible due to snapshot validation. In this case,
220 // rollback and move onto next iteration.
221 if (!s.ok()) {
222 ASSERT_TRUE(s.IsBusy());
223 ASSERT_OK(txn->Rollback());
224 delete txn;
225 continue;
226 }
11fdf7f2 227
f67539c2
TL
228 auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
229 const std::string& key, const std::string& value) {
230 if (owned_keys.count(key) > 0) {
231 ASSERT_EQ(value.size(), 16);
232
233 // Since this key is part of owned_keys, then this key must be
234 // unprepared by this transaction identified by 'id'
235 ASSERT_EQ(((int64_t*)value.c_str())[0], id);
236 if (a == REFRESH_SNAPSHOT) {
237 // If refresh snapshot is true, then the snapshot is refreshed
238 // after every Put(), meaning that the current snapshot in
239 // snapshot_num must be greater than the "seqno" of any keys
240 // written by the current transaction.
241 ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
242 } else {
243 // If refresh snapshot is not on, then the snapshot was taken at
244 // the beginning of the transaction, meaning all writes must come
245 // after snapshot_num
246 ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
247 }
248 } else if (a >= RO_SNAPSHOT) {
249 // If this is not an unprepared key, just assert that the key
250 // "seqno" is smaller than the snapshot seqno.
251 ASSERT_EQ(value.size(), 16);
252 ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
253 }
254 };
255
256 // Validate Get()/Next()/Prev(). Do only one of them to save time, and
257 // reduce lock contention.
258 switch (rnd.Uniform(3)) {
259 case 0: // Validate Get()
260 {
261 for (const auto& key : keys) {
262 std::string value;
263 s = txn->Get(read_options, Slice(key), &value);
264 if (!s.ok()) {
265 ASSERT_TRUE(s.IsNotFound());
266 ASSERT_EQ(owned_keys.count(key), 0);
267 } else {
268 verify_key(key, value);
269 }
270 }
271 break;
272 }
273 case 1: // Validate Next()
274 {
275 Iterator* iter = txn->GetIterator(read_options);
1e59de90 276 ASSERT_OK(iter->status());
f67539c2
TL
277 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
278 verify_key(iter->key().ToString(), iter->value().ToString());
279 }
1e59de90 280 ASSERT_OK(iter->status());
f67539c2
TL
281 delete iter;
282 break;
283 }
284 case 2: // Validate Prev()
285 {
286 Iterator* iter = txn->GetIterator(read_options);
1e59de90 287 ASSERT_OK(iter->status());
f67539c2
TL
288 for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
289 verify_key(iter->key().ToString(), iter->value().ToString());
290 }
1e59de90 291 ASSERT_OK(iter->status());
f67539c2
TL
292 delete iter;
293 break;
294 }
295 default:
1e59de90 296 FAIL();
f67539c2 297 }
11fdf7f2 298
f67539c2
TL
299 if (rnd.OneIn(2)) {
300 ASSERT_OK(txn->Commit());
301 } else {
302 ASSERT_OK(txn->Rollback());
303 }
304 delete txn;
305 }
306 };
11fdf7f2 307
f67539c2
TL
308 std::vector<port::Thread> threads;
309 for (uint32_t i = 0; i < kNumThreads; i++) {
310 threads.emplace_back(stress_thread, i);
311 }
11fdf7f2 312
f67539c2
TL
313 for (auto& t : threads) {
314 t.join();
315 }
11fdf7f2 316}
1e59de90 317#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
11fdf7f2
TL
318
319// This tests how write unprepared behaves during recovery when the DB crashes
320// after a transaction has either been unprepared or prepared, and tests if
321// the changes are correctly applied for prepared transactions if we decide to
322// rollback/commit.
323TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
324 WriteOptions write_options;
325 write_options.disableWAL = false;
326 TransactionOptions txn_options;
327 std::vector<Transaction*> prepared_trans;
328 WriteUnpreparedTxnDB* wup_db;
329 options.disable_auto_compactions = true;
330
331 enum Action { UNPREPARED, ROLLBACK, COMMIT };
332
333 // batch_size of 1 causes writes to DB for every marker.
334 for (size_t batch_size : {1, 1000000}) {
f67539c2 335 txn_options.write_batch_flush_threshold = batch_size;
11fdf7f2
TL
336 for (bool empty : {true, false}) {
337 for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
338 for (int num_batches = 1; num_batches < 10; num_batches++) {
339 // Reset database.
340 prepared_trans.clear();
1e59de90 341 ASSERT_OK(ReOpen());
11fdf7f2
TL
342 wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
343 if (!empty) {
344 for (int i = 0; i < num_batches; i++) {
1e59de90
TL
345 ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i),
346 "before value" + std::to_string(i)));
11fdf7f2
TL
347 }
348 }
349
350 // Write num_batches unprepared batches.
351 Transaction* txn = db->BeginTransaction(write_options, txn_options);
352 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
1e59de90 353 ASSERT_OK(txn->SetName("xid"));
11fdf7f2 354 for (int i = 0; i < num_batches; i++) {
1e59de90
TL
355 ASSERT_OK(
356 txn->Put("k" + std::to_string(i), "value" + std::to_string(i)));
f67539c2
TL
357 if (txn_options.write_batch_flush_threshold == 1) {
358 // WriteUnprepared will check write_batch_flush_threshold and
359 // possibly flush before appending to the write batch. No flush
360 // will happen at the first write because the batch is still
361 // empty, so after k puts, there should be k-1 flushed batches.
362 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
11fdf7f2
TL
363 } else {
364 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
365 }
366 }
367 if (a == UNPREPARED) {
368 // This is done to prevent the destructor from rolling back the
369 // transaction for us, since we want to pretend we crashed and
370 // test that recovery does the rollback.
371 wup_txn->unprep_seqs_.clear();
372 } else {
1e59de90 373 ASSERT_OK(txn->Prepare());
11fdf7f2
TL
374 }
375 delete txn;
376
377 // Crash and run recovery code paths.
1e59de90 378 ASSERT_OK(wup_db->db_impl_->FlushWAL(true));
11fdf7f2 379 wup_db->TEST_Crash();
1e59de90 380 ASSERT_OK(ReOpenNoDelete());
11fdf7f2
TL
381 assert(db != nullptr);
382
383 db->GetAllPreparedTransactions(&prepared_trans);
384 ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
385 if (a == ROLLBACK) {
386 ASSERT_OK(prepared_trans[0]->Rollback());
387 delete prepared_trans[0];
388 } else if (a == COMMIT) {
389 ASSERT_OK(prepared_trans[0]->Commit());
390 delete prepared_trans[0];
391 }
392
393 Iterator* iter = db->NewIterator(ReadOptions());
1e59de90 394 ASSERT_OK(iter->status());
11fdf7f2
TL
395 iter->SeekToFirst();
396 // Check that DB has before values.
397 if (!empty || a == COMMIT) {
398 for (int i = 0; i < num_batches; i++) {
399 ASSERT_TRUE(iter->Valid());
1e59de90 400 ASSERT_EQ(iter->key().ToString(), "k" + std::to_string(i));
11fdf7f2 401 if (a == COMMIT) {
1e59de90
TL
402 ASSERT_EQ(iter->value().ToString(),
403 "value" + std::to_string(i));
11fdf7f2
TL
404 } else {
405 ASSERT_EQ(iter->value().ToString(),
1e59de90 406 "before value" + std::to_string(i));
11fdf7f2
TL
407 }
408 iter->Next();
409 }
410 }
411 ASSERT_FALSE(iter->Valid());
1e59de90 412 ASSERT_OK(iter->status());
11fdf7f2
TL
413 delete iter;
414 }
415 }
416 }
417 }
418}
419
420// Basic test to see that unprepared batch gets written to DB when batch size
421// is exceeded. It also does some basic checks to see if commit/rollback works
422// as expected for write unprepared.
423TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
424 WriteOptions write_options;
425 TransactionOptions txn_options;
426 const int kNumKeys = 10;
427
428 // batch_size of 1 causes writes to DB for every marker.
429 for (size_t batch_size : {1, 1000000}) {
f67539c2 430 txn_options.write_batch_flush_threshold = batch_size;
11fdf7f2
TL
431 for (bool prepare : {false, true}) {
432 for (bool commit : {false, true}) {
1e59de90 433 ASSERT_OK(ReOpen());
11fdf7f2
TL
434 Transaction* txn = db->BeginTransaction(write_options, txn_options);
435 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
1e59de90 436 ASSERT_OK(txn->SetName("xid"));
11fdf7f2
TL
437
438 for (int i = 0; i < kNumKeys; i++) {
1e59de90 439 ASSERT_OK(txn->Put("k" + std::to_string(i), "v" + std::to_string(i)));
f67539c2
TL
440 if (txn_options.write_batch_flush_threshold == 1) {
441 // WriteUnprepared will check write_batch_flush_threshold and
442 // possibly flush before appending to the write batch. No flush will
443 // happen at the first write because the batch is still empty, so
444 // after k puts, there should be k-1 flushed batches.
445 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
11fdf7f2
TL
446 } else {
447 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
448 }
449 }
450
451 if (prepare) {
452 ASSERT_OK(txn->Prepare());
453 }
454
455 Iterator* iter = db->NewIterator(ReadOptions());
1e59de90 456 ASSERT_OK(iter->status());
11fdf7f2
TL
457 iter->SeekToFirst();
458 assert(!iter->Valid());
459 ASSERT_FALSE(iter->Valid());
1e59de90 460 ASSERT_OK(iter->status());
11fdf7f2
TL
461 delete iter;
462
463 if (commit) {
464 ASSERT_OK(txn->Commit());
465 } else {
466 ASSERT_OK(txn->Rollback());
467 }
468 delete txn;
469
470 iter = db->NewIterator(ReadOptions());
1e59de90 471 ASSERT_OK(iter->status());
11fdf7f2
TL
472 iter->SeekToFirst();
473
474 for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
475 ASSERT_TRUE(iter->Valid());
1e59de90
TL
476 ASSERT_EQ(iter->key().ToString(), "k" + std::to_string(i));
477 ASSERT_EQ(iter->value().ToString(), "v" + std::to_string(i));
11fdf7f2
TL
478 iter->Next();
479 }
480 ASSERT_FALSE(iter->Valid());
1e59de90 481 ASSERT_OK(iter->status());
11fdf7f2
TL
482 delete iter;
483 }
484 }
485 }
486}
487
488// Test whether logs containing unprepared/prepared batches are kept even
489// after memtable finishes flushing, and whether they are removed when
490// transaction commits/aborts.
491//
492// TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
493TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
494 WriteOptions write_options;
495 TransactionOptions txn_options;
496 // batch_size of 1 causes writes to DB for every marker.
f67539c2 497 txn_options.write_batch_flush_threshold = 1;
11fdf7f2
TL
498 const int kNumKeys = 10;
499
500 WriteOptions wopts;
501 wopts.sync = true;
502
503 for (bool prepare : {false, true}) {
504 for (bool commit : {false, true}) {
1e59de90 505 ASSERT_OK(ReOpen());
11fdf7f2
TL
506 auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
507 auto db_impl = wup_db->db_impl_;
508
509 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
510 ASSERT_OK(txn1->SetName("xid1"));
511
512 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
513 ASSERT_OK(txn2->SetName("xid2"));
514
515 // Spread this transaction across multiple log files.
516 for (int i = 0; i < kNumKeys; i++) {
1e59de90 517 ASSERT_OK(txn1->Put("k1" + std::to_string(i), "v" + std::to_string(i)));
11fdf7f2 518 if (i >= kNumKeys / 2) {
1e59de90
TL
519 ASSERT_OK(
520 txn2->Put("k2" + std::to_string(i), "v" + std::to_string(i)));
11fdf7f2
TL
521 }
522
523 if (i > 0) {
1e59de90 524 ASSERT_OK(db_impl->TEST_SwitchWAL());
11fdf7f2
TL
525 }
526 }
527
528 ASSERT_GT(txn1->GetLogNumber(), 0);
529 ASSERT_GT(txn2->GetLogNumber(), 0);
530
531 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
532 txn1->GetLogNumber());
533 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
534
535 if (prepare) {
536 ASSERT_OK(txn1->Prepare());
537 ASSERT_OK(txn2->Prepare());
538 }
539
540 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
541 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
542
543 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
544 txn1->GetLogNumber());
545 if (commit) {
546 ASSERT_OK(txn1->Commit());
547 } else {
548 ASSERT_OK(txn1->Rollback());
549 }
550
551 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
552 txn2->GetLogNumber());
553
554 if (commit) {
555 ASSERT_OK(txn2->Commit());
556 } else {
557 ASSERT_OK(txn2->Rollback());
558 }
559
560 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
561
562 delete txn1;
563 delete txn2;
564 }
565 }
566}
567
f67539c2
TL
568TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
569 WriteOptions woptions;
570 TransactionOptions txn_options;
571 txn_options.write_batch_flush_threshold = 1;
572
573 Transaction* txn = db->BeginTransaction(woptions, txn_options);
574
575 // Do some writes with no snapshot
576 ASSERT_OK(txn->Put("a", "a"));
577 ASSERT_OK(txn->Put("b", "b"));
578 ASSERT_OK(txn->Put("c", "c"));
579
580 // Test that it is still possible to create iterators after writes with no
581 // snapshot, if iterator snapshot is fresh enough.
582 ReadOptions roptions;
583 auto iter = txn->GetIterator(roptions);
1e59de90 584 ASSERT_OK(iter->status());
f67539c2
TL
585 int keys = 0;
586 for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
587 ASSERT_OK(iter->status());
588 ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
589 }
590 ASSERT_EQ(keys, 3);
1e59de90 591 ASSERT_OK(iter->status());
f67539c2
TL
592
593 delete iter;
594 delete txn;
595}
596
597// Test whether write to a transaction while iterating is supported.
598TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
599 WriteOptions woptions;
600 TransactionOptions txn_options;
601 txn_options.write_batch_flush_threshold = 1;
602
603 enum Action { DO_DELETE, DO_UPDATE };
604
605 for (Action a : {DO_DELETE, DO_UPDATE}) {
606 for (int i = 0; i < 100; i++) {
1e59de90 607 ASSERT_OK(db->Put(woptions, std::to_string(i), std::to_string(i)));
f67539c2
TL
608 }
609
610 Transaction* txn = db->BeginTransaction(woptions, txn_options);
611 // write_batch_ now contains 1 key.
612 ASSERT_OK(txn->Put("9", "a"));
613
614 ReadOptions roptions;
615 auto iter = txn->GetIterator(roptions);
1e59de90 616 ASSERT_OK(iter->status());
f67539c2
TL
617 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
618 ASSERT_OK(iter->status());
619 if (iter->key() == "9") {
620 ASSERT_EQ(iter->value().ToString(), "a");
621 } else {
622 ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
623 }
624
625 if (a == DO_DELETE) {
626 ASSERT_OK(txn->Delete(iter->key()));
627 } else {
628 ASSERT_OK(txn->Put(iter->key(), "b"));
629 }
630 }
1e59de90 631 ASSERT_OK(iter->status());
f67539c2
TL
632
633 delete iter;
634 ASSERT_OK(txn->Commit());
635
636 iter = db->NewIterator(roptions);
1e59de90 637 ASSERT_OK(iter->status());
f67539c2
TL
638 if (a == DO_DELETE) {
639 // Check that db is empty.
640 iter->SeekToFirst();
641 ASSERT_FALSE(iter->Valid());
642 } else {
643 int keys = 0;
644 // Check that all values are updated to b.
645 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
646 ASSERT_OK(iter->status());
647 ASSERT_EQ(iter->value().ToString(), "b");
648 }
649 ASSERT_EQ(keys, 100);
650 }
1e59de90 651 ASSERT_OK(iter->status());
f67539c2
TL
652
653 delete iter;
654 delete txn;
655 }
656}
657
1e59de90
TL
658// Test that using an iterator after transaction clear is not supported
659TEST_P(WriteUnpreparedTransactionTest, IterateAfterClear) {
660 WriteOptions woptions;
661 TransactionOptions txn_options;
662 txn_options.write_batch_flush_threshold = 1;
663
664 enum Action { kCommit, kRollback };
665
666 for (Action a : {kCommit, kRollback}) {
667 for (int i = 0; i < 100; i++) {
668 ASSERT_OK(db->Put(woptions, std::to_string(i), std::to_string(i)));
669 }
670
671 Transaction* txn = db->BeginTransaction(woptions, txn_options);
672 ASSERT_OK(txn->Put("9", "a"));
673
674 ReadOptions roptions;
675 auto iter1 = txn->GetIterator(roptions);
676 auto iter2 = txn->GetIterator(roptions);
677 iter1->SeekToFirst();
678 iter2->Seek("9");
679
680 // Check that iterators are valid before transaction finishes.
681 ASSERT_TRUE(iter1->Valid());
682 ASSERT_TRUE(iter2->Valid());
683 ASSERT_OK(iter1->status());
684 ASSERT_OK(iter2->status());
685
686 if (a == kCommit) {
687 ASSERT_OK(txn->Commit());
688 } else {
689 ASSERT_OK(txn->Rollback());
690 }
691
692 // Check that iterators are invalidated after transaction finishes.
693 ASSERT_FALSE(iter1->Valid());
694 ASSERT_FALSE(iter2->Valid());
695 ASSERT_TRUE(iter1->status().IsInvalidArgument());
696 ASSERT_TRUE(iter2->status().IsInvalidArgument());
697
698 delete iter1;
699 delete iter2;
700 delete txn;
701 }
702}
703
f67539c2
TL
704TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
705 WriteOptions woptions;
706 TransactionOptions txn_options;
707 txn_options.write_batch_flush_threshold = 1;
708
709 Transaction* txn = db->BeginTransaction(woptions, txn_options);
710 txn->SetSavePoint();
711 ASSERT_OK(txn->Put("a", "a"));
712 ASSERT_OK(txn->Put("b", "b"));
713 ASSERT_OK(txn->Commit());
714
715 ReadOptions roptions;
716 std::string value;
717 ASSERT_OK(txn->Get(roptions, "a", &value));
718 ASSERT_EQ(value, "a");
719 ASSERT_OK(txn->Get(roptions, "b", &value));
720 ASSERT_EQ(value, "b");
721 delete txn;
722}
723
724TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) {
725 WriteOptions woptions;
726 TransactionOptions txn_options;
727 txn_options.write_batch_flush_threshold = 1;
728
729 Transaction* txn = db->BeginTransaction(woptions, txn_options);
730 auto wb = txn->GetWriteBatch()->GetWriteBatch();
731 ASSERT_OK(txn->Put("a", "a"));
732 ASSERT_OK(wb->Put("a_untrack", "a_untrack"));
733 txn->SetSavePoint();
734 ASSERT_OK(txn->Put("b", "b"));
735 ASSERT_OK(txn->Put("b_untrack", "b_untrack"));
736
737 ReadOptions roptions;
738 std::string value;
739 ASSERT_OK(txn->Get(roptions, "a", &value));
740 ASSERT_EQ(value, "a");
741 ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
742 ASSERT_EQ(value, "a_untrack");
743 ASSERT_OK(txn->Get(roptions, "b", &value));
744 ASSERT_EQ(value, "b");
745 ASSERT_OK(txn->Get(roptions, "b_untrack", &value));
746 ASSERT_EQ(value, "b_untrack");
747
748 // b and b_untrack should be rolled back.
749 ASSERT_OK(txn->RollbackToSavePoint());
750 ASSERT_OK(txn->Get(roptions, "a", &value));
751 ASSERT_EQ(value, "a");
752 ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
753 ASSERT_EQ(value, "a_untrack");
754 auto s = txn->Get(roptions, "b", &value);
755 ASSERT_TRUE(s.IsNotFound());
756 s = txn->Get(roptions, "b_untrack", &value);
757 ASSERT_TRUE(s.IsNotFound());
758
759 // Everything should be rolled back.
760 ASSERT_OK(txn->Rollback());
761 s = txn->Get(roptions, "a", &value);
762 ASSERT_TRUE(s.IsNotFound());
763 s = txn->Get(roptions, "a_untrack", &value);
764 ASSERT_TRUE(s.IsNotFound());
765 s = txn->Get(roptions, "b", &value);
766 ASSERT_TRUE(s.IsNotFound());
767 s = txn->Get(roptions, "b_untrack", &value);
768 ASSERT_TRUE(s.IsNotFound());
769
770 delete txn;
771}
772
773} // namespace ROCKSDB_NAMESPACE
11fdf7f2
TL
774
775int main(int argc, char** argv) {
1e59de90 776 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
11fdf7f2
TL
777 ::testing::InitGoogleTest(&argc, argv);
778 return RUN_ALL_TESTS();
779}
780
781#else
782#include <stdio.h>
783
// ROCKSDB_LITE build: there is nothing to run, so print a notice on stderr
// and report success.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
789
790#endif // ROCKSDB_LITE