git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_unprepared_transaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/transaction_test.h"
9 #include "utilities/transactions/write_unprepared_txn.h"
10 #include "utilities/transactions/write_unprepared_txn_db.h"
11
12 namespace ROCKSDB_NAMESPACE {
13
14 class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
15 public:
16 WriteUnpreparedTransactionTestBase(bool use_stackable_db,
17 bool two_write_queue,
18 TxnDBWritePolicy write_policy)
19 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
20 kOrderedWrite) {}
21 };
22
23 class WriteUnpreparedTransactionTest
24 : public WriteUnpreparedTransactionTestBase,
25 virtual public ::testing::WithParamInterface<
26 std::tuple<bool, bool, TxnDBWritePolicy>> {
27 public:
28 WriteUnpreparedTransactionTest()
29 : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
30 std::get<1>(GetParam()),
31 std::get<2>(GetParam())){}
32 };
33
34 INSTANTIATE_TEST_CASE_P(
35 WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
36 ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
37 std::make_tuple(false, true, WRITE_UNPREPARED)));
38
// Snapshot mode exercised by the stress test. The declaration order matters:
// the test compares values with `>= RO_SNAPSHOT`.
enum StressAction {
  NO_SNAPSHOT,      // no snapshot set on the ReadOptions
  RO_SNAPSHOT,      // snapshot set on the ReadOptions
  REFRESH_SNAPSHOT  // snapshot set, and refreshed after every write
};
40 class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
41 virtual public ::testing::WithParamInterface<
42 std::tuple<bool, StressAction>> {
43 public:
44 WriteUnpreparedStressTest()
45 : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
46 WRITE_UNPREPARED),
47 action_(std::get<1>(GetParam())) {}
48 StressAction action_;
49 };
50
51 INSTANTIATE_TEST_CASE_P(
52 WriteUnpreparedStressTest, WriteUnpreparedStressTest,
53 ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
54 std::make_tuple(false, RO_SNAPSHOT),
55 std::make_tuple(false, REFRESH_SNAPSHOT),
56 std::make_tuple(true, NO_SNAPSHOT),
57 std::make_tuple(true, RO_SNAPSHOT),
58 std::make_tuple(true, REFRESH_SNAPSHOT)));
59
60 TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
61 // The following tests checks whether reading your own write for
62 // a transaction works for write unprepared, when there are uncommitted
63 // values written into DB.
64 auto verify_state = [](Iterator* iter, const std::string& key,
65 const std::string& value) {
66 ASSERT_TRUE(iter->Valid());
67 ASSERT_OK(iter->status());
68 ASSERT_EQ(key, iter->key().ToString());
69 ASSERT_EQ(value, iter->value().ToString());
70 };
71
72 // Test always reseeking vs never reseeking.
73 for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
74 options.max_sequential_skip_in_iterations = max_skip;
75 options.disable_auto_compactions = true;
76 ReOpen();
77
78 TransactionOptions txn_options;
79 WriteOptions woptions;
80 ReadOptions roptions;
81
82 ASSERT_OK(db->Put(woptions, "a", ""));
83 ASSERT_OK(db->Put(woptions, "b", ""));
84
85 Transaction* txn = db->BeginTransaction(woptions, txn_options);
86 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
87 txn->SetSnapshot();
88
89 for (int i = 0; i < 5; i++) {
90 std::string stored_value = "v" + ToString(i);
91 ASSERT_OK(txn->Put("a", stored_value));
92 ASSERT_OK(txn->Put("b", stored_value));
93 wup_txn->FlushWriteBatchToDB(false);
94
95 // Test Get()
96 std::string value;
97 ASSERT_OK(txn->Get(roptions, "a", &value));
98 ASSERT_EQ(value, stored_value);
99 ASSERT_OK(txn->Get(roptions, "b", &value));
100 ASSERT_EQ(value, stored_value);
101
102 // Test Next()
103 auto iter = txn->GetIterator(roptions);
104 iter->Seek("a");
105 verify_state(iter, "a", stored_value);
106
107 iter->Next();
108 verify_state(iter, "b", stored_value);
109
110 iter->SeekToFirst();
111 verify_state(iter, "a", stored_value);
112
113 iter->Next();
114 verify_state(iter, "b", stored_value);
115
116 delete iter;
117
118 // Test Prev()
119 iter = txn->GetIterator(roptions);
120 iter->SeekForPrev("b");
121 verify_state(iter, "b", stored_value);
122
123 iter->Prev();
124 verify_state(iter, "a", stored_value);
125
126 iter->SeekToLast();
127 verify_state(iter, "b", stored_value);
128
129 iter->Prev();
130 verify_state(iter, "a", stored_value);
131
132 delete iter;
133 }
134
135 delete txn;
136 }
137 }
138
139 #ifndef ROCKSDB_VALGRIND_RUN
140 TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
141 // This is a stress test where different threads are writing random keys, and
142 // then before committing or aborting the transaction, it validates to see
143 // that it can read the keys it wrote, and the keys it did not write respect
144 // the snapshot. To avoid row lock contention (and simply stressing the
145 // locking system), each thread is mostly only writing to its own set of keys.
146 const uint32_t kNumIter = 1000;
147 const uint32_t kNumThreads = 10;
148 const uint32_t kNumKeys = 5;
149
150 std::default_random_engine rand(static_cast<uint32_t>(
151 std::hash<std::thread::id>()(std::this_thread::get_id())));
152
153 // Test with
154 // 1. no snapshots set
155 // 2. snapshot set on ReadOptions
156 // 3. snapshot set, and refreshing after every write.
157 StressAction a = action_;
158 WriteOptions write_options;
159 txn_db_options.transaction_lock_timeout = -1;
160 options.disable_auto_compactions = true;
161 ReOpen();
162
163 std::vector<std::string> keys;
164 for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
165 keys.push_back("k" + ToString(k));
166 }
167 std::shuffle(keys.begin(), keys.end(), rand);
168
169 // This counter will act as a "sequence number" to help us validate
170 // visibility logic with snapshots. If we had direct access to the seqno of
171 // snapshots and key/values, then we should directly compare those instead.
172 std::atomic<int64_t> counter(0);
173
174 std::function<void(uint32_t)> stress_thread = [&](int id) {
175 size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
176 Random64 rnd(static_cast<uint32_t>(tid));
177
178 Transaction* txn;
179 TransactionOptions txn_options;
180 // batch_size of 1 causes writes to DB for every marker.
181 txn_options.write_batch_flush_threshold = 1;
182 ReadOptions read_options;
183
184 for (uint32_t i = 0; i < kNumIter; i++) {
185 std::set<std::string> owned_keys(&keys[id * kNumKeys],
186 &keys[(id + 1) * kNumKeys]);
187 // Add unowned keys to make the workload more interesting, but this
188 // increases row lock contention, so just do it sometimes.
189 if (rnd.OneIn(2)) {
190 owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
191 }
192
193 txn = db->BeginTransaction(write_options, txn_options);
194 txn->SetName(ToString(id));
195 txn->SetSnapshot();
196 if (a >= RO_SNAPSHOT) {
197 read_options.snapshot = txn->GetSnapshot();
198 ASSERT_TRUE(read_options.snapshot != nullptr);
199 }
200
201 uint64_t buf[2];
202 buf[0] = id;
203
204 // When scanning through the database, make sure that all unprepared
205 // keys have value >= snapshot and all other keys have value < snapshot.
206 int64_t snapshot_num = counter.fetch_add(1);
207
208 Status s;
209 for (const auto& key : owned_keys) {
210 buf[1] = counter.fetch_add(1);
211 s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
212 if (!s.ok()) {
213 break;
214 }
215 if (a == REFRESH_SNAPSHOT) {
216 txn->SetSnapshot();
217 read_options.snapshot = txn->GetSnapshot();
218 snapshot_num = counter.fetch_add(1);
219 }
220 }
221
222 // Failure is possible due to snapshot validation. In this case,
223 // rollback and move onto next iteration.
224 if (!s.ok()) {
225 ASSERT_TRUE(s.IsBusy());
226 ASSERT_OK(txn->Rollback());
227 delete txn;
228 continue;
229 }
230
231 auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
232 const std::string& key, const std::string& value) {
233 if (owned_keys.count(key) > 0) {
234 ASSERT_EQ(value.size(), 16);
235
236 // Since this key is part of owned_keys, then this key must be
237 // unprepared by this transaction identified by 'id'
238 ASSERT_EQ(((int64_t*)value.c_str())[0], id);
239 if (a == REFRESH_SNAPSHOT) {
240 // If refresh snapshot is true, then the snapshot is refreshed
241 // after every Put(), meaning that the current snapshot in
242 // snapshot_num must be greater than the "seqno" of any keys
243 // written by the current transaction.
244 ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
245 } else {
246 // If refresh snapshot is not on, then the snapshot was taken at
247 // the beginning of the transaction, meaning all writes must come
248 // after snapshot_num
249 ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
250 }
251 } else if (a >= RO_SNAPSHOT) {
252 // If this is not an unprepared key, just assert that the key
253 // "seqno" is smaller than the snapshot seqno.
254 ASSERT_EQ(value.size(), 16);
255 ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
256 }
257 };
258
259 // Validate Get()/Next()/Prev(). Do only one of them to save time, and
260 // reduce lock contention.
261 switch (rnd.Uniform(3)) {
262 case 0: // Validate Get()
263 {
264 for (const auto& key : keys) {
265 std::string value;
266 s = txn->Get(read_options, Slice(key), &value);
267 if (!s.ok()) {
268 ASSERT_TRUE(s.IsNotFound());
269 ASSERT_EQ(owned_keys.count(key), 0);
270 } else {
271 verify_key(key, value);
272 }
273 }
274 break;
275 }
276 case 1: // Validate Next()
277 {
278 Iterator* iter = txn->GetIterator(read_options);
279 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
280 verify_key(iter->key().ToString(), iter->value().ToString());
281 }
282 delete iter;
283 break;
284 }
285 case 2: // Validate Prev()
286 {
287 Iterator* iter = txn->GetIterator(read_options);
288 for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
289 verify_key(iter->key().ToString(), iter->value().ToString());
290 }
291 delete iter;
292 break;
293 }
294 default:
295 ASSERT_TRUE(false);
296 }
297
298 if (rnd.OneIn(2)) {
299 ASSERT_OK(txn->Commit());
300 } else {
301 ASSERT_OK(txn->Rollback());
302 }
303 delete txn;
304 }
305 };
306
307 std::vector<port::Thread> threads;
308 for (uint32_t i = 0; i < kNumThreads; i++) {
309 threads.emplace_back(stress_thread, i);
310 }
311
312 for (auto& t : threads) {
313 t.join();
314 }
315 }
316 #endif // ROCKSDB_VALGRIND_RUN
317
318 // This tests how write unprepared behaves during recovery when the DB crashes
319 // after a transaction has either been unprepared or prepared, and tests if
320 // the changes are correctly applied for prepared transactions if we decide to
321 // rollback/commit.
322 TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
323 WriteOptions write_options;
324 write_options.disableWAL = false;
325 TransactionOptions txn_options;
326 std::vector<Transaction*> prepared_trans;
327 WriteUnpreparedTxnDB* wup_db;
328 options.disable_auto_compactions = true;
329
330 enum Action { UNPREPARED, ROLLBACK, COMMIT };
331
332 // batch_size of 1 causes writes to DB for every marker.
333 for (size_t batch_size : {1, 1000000}) {
334 txn_options.write_batch_flush_threshold = batch_size;
335 for (bool empty : {true, false}) {
336 for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
337 for (int num_batches = 1; num_batches < 10; num_batches++) {
338 // Reset database.
339 prepared_trans.clear();
340 ReOpen();
341 wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
342 if (!empty) {
343 for (int i = 0; i < num_batches; i++) {
344 ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
345 "before value" + ToString(i)));
346 }
347 }
348
349 // Write num_batches unprepared batches.
350 Transaction* txn = db->BeginTransaction(write_options, txn_options);
351 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
352 txn->SetName("xid");
353 for (int i = 0; i < num_batches; i++) {
354 ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
355 if (txn_options.write_batch_flush_threshold == 1) {
356 // WriteUnprepared will check write_batch_flush_threshold and
357 // possibly flush before appending to the write batch. No flush
358 // will happen at the first write because the batch is still
359 // empty, so after k puts, there should be k-1 flushed batches.
360 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
361 } else {
362 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
363 }
364 }
365 if (a == UNPREPARED) {
366 // This is done to prevent the destructor from rolling back the
367 // transaction for us, since we want to pretend we crashed and
368 // test that recovery does the rollback.
369 wup_txn->unprep_seqs_.clear();
370 } else {
371 txn->Prepare();
372 }
373 delete txn;
374
375 // Crash and run recovery code paths.
376 wup_db->db_impl_->FlushWAL(true);
377 wup_db->TEST_Crash();
378 ReOpenNoDelete();
379 assert(db != nullptr);
380
381 db->GetAllPreparedTransactions(&prepared_trans);
382 ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
383 if (a == ROLLBACK) {
384 ASSERT_OK(prepared_trans[0]->Rollback());
385 delete prepared_trans[0];
386 } else if (a == COMMIT) {
387 ASSERT_OK(prepared_trans[0]->Commit());
388 delete prepared_trans[0];
389 }
390
391 Iterator* iter = db->NewIterator(ReadOptions());
392 iter->SeekToFirst();
393 // Check that DB has before values.
394 if (!empty || a == COMMIT) {
395 for (int i = 0; i < num_batches; i++) {
396 ASSERT_TRUE(iter->Valid());
397 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
398 if (a == COMMIT) {
399 ASSERT_EQ(iter->value().ToString(), "value" + ToString(i));
400 } else {
401 ASSERT_EQ(iter->value().ToString(),
402 "before value" + ToString(i));
403 }
404 iter->Next();
405 }
406 }
407 ASSERT_FALSE(iter->Valid());
408 delete iter;
409 }
410 }
411 }
412 }
413 }
414
415 // Basic test to see that unprepared batch gets written to DB when batch size
416 // is exceeded. It also does some basic checks to see if commit/rollback works
417 // as expected for write unprepared.
418 TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
419 WriteOptions write_options;
420 TransactionOptions txn_options;
421 const int kNumKeys = 10;
422
423 // batch_size of 1 causes writes to DB for every marker.
424 for (size_t batch_size : {1, 1000000}) {
425 txn_options.write_batch_flush_threshold = batch_size;
426 for (bool prepare : {false, true}) {
427 for (bool commit : {false, true}) {
428 ReOpen();
429 Transaction* txn = db->BeginTransaction(write_options, txn_options);
430 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
431 txn->SetName("xid");
432
433 for (int i = 0; i < kNumKeys; i++) {
434 txn->Put("k" + ToString(i), "v" + ToString(i));
435 if (txn_options.write_batch_flush_threshold == 1) {
436 // WriteUnprepared will check write_batch_flush_threshold and
437 // possibly flush before appending to the write batch. No flush will
438 // happen at the first write because the batch is still empty, so
439 // after k puts, there should be k-1 flushed batches.
440 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
441 } else {
442 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
443 }
444 }
445
446 if (prepare) {
447 ASSERT_OK(txn->Prepare());
448 }
449
450 Iterator* iter = db->NewIterator(ReadOptions());
451 iter->SeekToFirst();
452 assert(!iter->Valid());
453 ASSERT_FALSE(iter->Valid());
454 delete iter;
455
456 if (commit) {
457 ASSERT_OK(txn->Commit());
458 } else {
459 ASSERT_OK(txn->Rollback());
460 }
461 delete txn;
462
463 iter = db->NewIterator(ReadOptions());
464 iter->SeekToFirst();
465
466 for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
467 ASSERT_TRUE(iter->Valid());
468 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
469 ASSERT_EQ(iter->value().ToString(), "v" + ToString(i));
470 iter->Next();
471 }
472 ASSERT_FALSE(iter->Valid());
473 delete iter;
474 }
475 }
476 }
477 }
478
479 // Test whether logs containing unprepared/prepared batches are kept even
480 // after memtable finishes flushing, and whether they are removed when
481 // transaction commits/aborts.
482 //
483 // TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
484 TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
485 WriteOptions write_options;
486 TransactionOptions txn_options;
487 // batch_size of 1 causes writes to DB for every marker.
488 txn_options.write_batch_flush_threshold = 1;
489 const int kNumKeys = 10;
490
491 WriteOptions wopts;
492 wopts.sync = true;
493
494 for (bool prepare : {false, true}) {
495 for (bool commit : {false, true}) {
496 ReOpen();
497 auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
498 auto db_impl = wup_db->db_impl_;
499
500 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
501 ASSERT_OK(txn1->SetName("xid1"));
502
503 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
504 ASSERT_OK(txn2->SetName("xid2"));
505
506 // Spread this transaction across multiple log files.
507 for (int i = 0; i < kNumKeys; i++) {
508 ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i)));
509 if (i >= kNumKeys / 2) {
510 ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i)));
511 }
512
513 if (i > 0) {
514 db_impl->TEST_SwitchWAL();
515 }
516 }
517
518 ASSERT_GT(txn1->GetLogNumber(), 0);
519 ASSERT_GT(txn2->GetLogNumber(), 0);
520
521 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
522 txn1->GetLogNumber());
523 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
524
525 if (prepare) {
526 ASSERT_OK(txn1->Prepare());
527 ASSERT_OK(txn2->Prepare());
528 }
529
530 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
531 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
532
533 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
534 txn1->GetLogNumber());
535 if (commit) {
536 ASSERT_OK(txn1->Commit());
537 } else {
538 ASSERT_OK(txn1->Rollback());
539 }
540
541 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
542 txn2->GetLogNumber());
543
544 if (commit) {
545 ASSERT_OK(txn2->Commit());
546 } else {
547 ASSERT_OK(txn2->Rollback());
548 }
549
550 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
551
552 delete txn1;
553 delete txn2;
554 }
555 }
556 }
557
558 TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
559 WriteOptions woptions;
560 TransactionOptions txn_options;
561 txn_options.write_batch_flush_threshold = 1;
562
563 Transaction* txn = db->BeginTransaction(woptions, txn_options);
564
565 // Do some writes with no snapshot
566 ASSERT_OK(txn->Put("a", "a"));
567 ASSERT_OK(txn->Put("b", "b"));
568 ASSERT_OK(txn->Put("c", "c"));
569
570 // Test that it is still possible to create iterators after writes with no
571 // snapshot, if iterator snapshot is fresh enough.
572 ReadOptions roptions;
573 auto iter = txn->GetIterator(roptions);
574 int keys = 0;
575 for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
576 ASSERT_OK(iter->status());
577 ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
578 }
579 ASSERT_EQ(keys, 3);
580
581 delete iter;
582 delete txn;
583 }
584
585 // Test whether write to a transaction while iterating is supported.
586 TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
587 WriteOptions woptions;
588 TransactionOptions txn_options;
589 txn_options.write_batch_flush_threshold = 1;
590
591 enum Action { DO_DELETE, DO_UPDATE };
592
593 for (Action a : {DO_DELETE, DO_UPDATE}) {
594 for (int i = 0; i < 100; i++) {
595 ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
596 }
597
598 Transaction* txn = db->BeginTransaction(woptions, txn_options);
599 // write_batch_ now contains 1 key.
600 ASSERT_OK(txn->Put("9", "a"));
601
602 ReadOptions roptions;
603 auto iter = txn->GetIterator(roptions);
604 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
605 ASSERT_OK(iter->status());
606 if (iter->key() == "9") {
607 ASSERT_EQ(iter->value().ToString(), "a");
608 } else {
609 ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
610 }
611
612 if (a == DO_DELETE) {
613 ASSERT_OK(txn->Delete(iter->key()));
614 } else {
615 ASSERT_OK(txn->Put(iter->key(), "b"));
616 }
617 }
618
619 delete iter;
620 ASSERT_OK(txn->Commit());
621
622 iter = db->NewIterator(roptions);
623 if (a == DO_DELETE) {
624 // Check that db is empty.
625 iter->SeekToFirst();
626 ASSERT_FALSE(iter->Valid());
627 } else {
628 int keys = 0;
629 // Check that all values are updated to b.
630 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
631 ASSERT_OK(iter->status());
632 ASSERT_EQ(iter->value().ToString(), "b");
633 }
634 ASSERT_EQ(keys, 100);
635 }
636
637 delete iter;
638 delete txn;
639 }
640 }
641
642 TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
643 WriteOptions woptions;
644 TransactionOptions txn_options;
645 txn_options.write_batch_flush_threshold = 1;
646
647 Transaction* txn = db->BeginTransaction(woptions, txn_options);
648 txn->SetSavePoint();
649 ASSERT_OK(txn->Put("a", "a"));
650 ASSERT_OK(txn->Put("b", "b"));
651 ASSERT_OK(txn->Commit());
652
653 ReadOptions roptions;
654 std::string value;
655 ASSERT_OK(txn->Get(roptions, "a", &value));
656 ASSERT_EQ(value, "a");
657 ASSERT_OK(txn->Get(roptions, "b", &value));
658 ASSERT_EQ(value, "b");
659 delete txn;
660 }
661
662 TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) {
663 WriteOptions woptions;
664 TransactionOptions txn_options;
665 txn_options.write_batch_flush_threshold = 1;
666
667 Transaction* txn = db->BeginTransaction(woptions, txn_options);
668 auto wb = txn->GetWriteBatch()->GetWriteBatch();
669 ASSERT_OK(txn->Put("a", "a"));
670 ASSERT_OK(wb->Put("a_untrack", "a_untrack"));
671 txn->SetSavePoint();
672 ASSERT_OK(txn->Put("b", "b"));
673 ASSERT_OK(txn->Put("b_untrack", "b_untrack"));
674
675 ReadOptions roptions;
676 std::string value;
677 ASSERT_OK(txn->Get(roptions, "a", &value));
678 ASSERT_EQ(value, "a");
679 ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
680 ASSERT_EQ(value, "a_untrack");
681 ASSERT_OK(txn->Get(roptions, "b", &value));
682 ASSERT_EQ(value, "b");
683 ASSERT_OK(txn->Get(roptions, "b_untrack", &value));
684 ASSERT_EQ(value, "b_untrack");
685
686 // b and b_untrack should be rolled back.
687 ASSERT_OK(txn->RollbackToSavePoint());
688 ASSERT_OK(txn->Get(roptions, "a", &value));
689 ASSERT_EQ(value, "a");
690 ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
691 ASSERT_EQ(value, "a_untrack");
692 auto s = txn->Get(roptions, "b", &value);
693 ASSERT_TRUE(s.IsNotFound());
694 s = txn->Get(roptions, "b_untrack", &value);
695 ASSERT_TRUE(s.IsNotFound());
696
697 // Everything should be rolled back.
698 ASSERT_OK(txn->Rollback());
699 s = txn->Get(roptions, "a", &value);
700 ASSERT_TRUE(s.IsNotFound());
701 s = txn->Get(roptions, "a_untrack", &value);
702 ASSERT_TRUE(s.IsNotFound());
703 s = txn->Get(roptions, "b", &value);
704 ASSERT_TRUE(s.IsNotFound());
705 s = txn->Get(roptions, "b_untrack", &value);
706 ASSERT_TRUE(s.IsNotFound());
707
708 delete txn;
709 }
710
711 } // namespace ROCKSDB_NAMESPACE
712
713 int main(int argc, char** argv) {
714 ::testing::InitGoogleTest(&argc, argv);
715 return RUN_ALL_TESTS();
716 }
717
718 #else
719 #include <stdio.h>
720
// Entry point for ROCKSDB_LITE builds: prints a notice to stderr that the
// tests were skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  static const char kSkipMessage[] =
      "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n";
  fputs(kSkipMessage, stderr);
  return 0;
}
726
727 #endif // ROCKSDB_LITE