]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/write_unprepared_transaction_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_unprepared_transaction_test.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
11fdf7f2
TL
8#include "utilities/transactions/transaction_test.h"
9#include "utilities/transactions/write_unprepared_txn.h"
10#include "utilities/transactions/write_unprepared_txn_db.h"
11
f67539c2 12namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
13
14class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
15 public:
16 WriteUnpreparedTransactionTestBase(bool use_stackable_db,
17 bool two_write_queue,
18 TxnDBWritePolicy write_policy)
f67539c2
TL
19 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
20 kOrderedWrite) {}
11fdf7f2
TL
21};
22
23class WriteUnpreparedTransactionTest
24 : public WriteUnpreparedTransactionTestBase,
25 virtual public ::testing::WithParamInterface<
26 std::tuple<bool, bool, TxnDBWritePolicy>> {
27 public:
28 WriteUnpreparedTransactionTest()
29 : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
30 std::get<1>(GetParam()),
31 std::get<2>(GetParam())){}
32};
33
34INSTANTIATE_TEST_CASE_P(
35 WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
36 ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
37 std::make_tuple(false, true, WRITE_UNPREPARED)));
38
f67539c2
TL
39enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT };
40class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase,
41 virtual public ::testing::WithParamInterface<
42 std::tuple<bool, StressAction>> {
43 public:
44 WriteUnpreparedStressTest()
45 : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()),
46 WRITE_UNPREPARED),
47 action_(std::get<1>(GetParam())) {}
48 StressAction action_;
49};
50
51INSTANTIATE_TEST_CASE_P(
52 WriteUnpreparedStressTest, WriteUnpreparedStressTest,
53 ::testing::Values(std::make_tuple(false, NO_SNAPSHOT),
54 std::make_tuple(false, RO_SNAPSHOT),
55 std::make_tuple(false, REFRESH_SNAPSHOT),
56 std::make_tuple(true, NO_SNAPSHOT),
57 std::make_tuple(true, RO_SNAPSHOT),
58 std::make_tuple(true, REFRESH_SNAPSHOT)));
59
11fdf7f2 60TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
f67539c2
TL
61 // The following tests checks whether reading your own write for
62 // a transaction works for write unprepared, when there are uncommitted
63 // values written into DB.
11fdf7f2
TL
64 auto verify_state = [](Iterator* iter, const std::string& key,
65 const std::string& value) {
66 ASSERT_TRUE(iter->Valid());
67 ASSERT_OK(iter->status());
68 ASSERT_EQ(key, iter->key().ToString());
69 ASSERT_EQ(value, iter->value().ToString());
70 };
71
f67539c2
TL
72 // Test always reseeking vs never reseeking.
73 for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
74 options.max_sequential_skip_in_iterations = max_skip;
75 options.disable_auto_compactions = true;
76 ReOpen();
11fdf7f2 77
f67539c2
TL
78 TransactionOptions txn_options;
79 WriteOptions woptions;
80 ReadOptions roptions;
11fdf7f2 81
f67539c2
TL
82 ASSERT_OK(db->Put(woptions, "a", ""));
83 ASSERT_OK(db->Put(woptions, "b", ""));
11fdf7f2 84
f67539c2
TL
85 Transaction* txn = db->BeginTransaction(woptions, txn_options);
86 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
87 txn->SetSnapshot();
11fdf7f2 88
f67539c2
TL
89 for (int i = 0; i < 5; i++) {
90 std::string stored_value = "v" + ToString(i);
91 ASSERT_OK(txn->Put("a", stored_value));
92 ASSERT_OK(txn->Put("b", stored_value));
93 wup_txn->FlushWriteBatchToDB(false);
11fdf7f2 94
f67539c2
TL
95 // Test Get()
96 std::string value;
97 ASSERT_OK(txn->Get(roptions, "a", &value));
98 ASSERT_EQ(value, stored_value);
99 ASSERT_OK(txn->Get(roptions, "b", &value));
100 ASSERT_EQ(value, stored_value);
11fdf7f2 101
f67539c2
TL
102 // Test Next()
103 auto iter = txn->GetIterator(roptions);
104 iter->Seek("a");
105 verify_state(iter, "a", stored_value);
11fdf7f2 106
f67539c2
TL
107 iter->Next();
108 verify_state(iter, "b", stored_value);
11fdf7f2 109
f67539c2
TL
110 iter->SeekToFirst();
111 verify_state(iter, "a", stored_value);
11fdf7f2 112
f67539c2
TL
113 iter->Next();
114 verify_state(iter, "b", stored_value);
11fdf7f2 115
f67539c2 116 delete iter;
11fdf7f2 117
f67539c2
TL
118 // Test Prev()
119 iter = txn->GetIterator(roptions);
120 iter->SeekForPrev("b");
121 verify_state(iter, "b", stored_value);
11fdf7f2 122
f67539c2
TL
123 iter->Prev();
124 verify_state(iter, "a", stored_value);
11fdf7f2 125
f67539c2
TL
126 iter->SeekToLast();
127 verify_state(iter, "b", stored_value);
11fdf7f2 128
f67539c2
TL
129 iter->Prev();
130 verify_state(iter, "a", stored_value);
11fdf7f2 131
f67539c2
TL
132 delete iter;
133 }
11fdf7f2 134
f67539c2
TL
135 delete txn;
136 }
137}
11fdf7f2 138
f67539c2
TL
139#ifndef ROCKSDB_VALGRIND_RUN
140TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
141 // This is a stress test where different threads are writing random keys, and
142 // then before committing or aborting the transaction, it validates to see
143 // that it can read the keys it wrote, and the keys it did not write respect
144 // the snapshot. To avoid row lock contention (and simply stressing the
145 // locking system), each thread is mostly only writing to its own set of keys.
146 const uint32_t kNumIter = 1000;
147 const uint32_t kNumThreads = 10;
148 const uint32_t kNumKeys = 5;
149
f67539c2
TL
150 // Test with
151 // 1. no snapshots set
152 // 2. snapshot set on ReadOptions
153 // 3. snapshot set, and refreshing after every write.
154 StressAction a = action_;
155 WriteOptions write_options;
156 txn_db_options.transaction_lock_timeout = -1;
157 options.disable_auto_compactions = true;
158 ReOpen();
11fdf7f2 159
f67539c2
TL
160 std::vector<std::string> keys;
161 for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
162 keys.push_back("k" + ToString(k));
163 }
20effc67 164 RandomShuffle(keys.begin(), keys.end());
f67539c2
TL
165
166 // This counter will act as a "sequence number" to help us validate
167 // visibility logic with snapshots. If we had direct access to the seqno of
168 // snapshots and key/values, then we should directly compare those instead.
169 std::atomic<int64_t> counter(0);
170
171 std::function<void(uint32_t)> stress_thread = [&](int id) {
172 size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
173 Random64 rnd(static_cast<uint32_t>(tid));
174
175 Transaction* txn;
176 TransactionOptions txn_options;
177 // batch_size of 1 causes writes to DB for every marker.
178 txn_options.write_batch_flush_threshold = 1;
179 ReadOptions read_options;
180
181 for (uint32_t i = 0; i < kNumIter; i++) {
20effc67
TL
182 std::set<std::string> owned_keys(keys.begin() + id * kNumKeys,
183 keys.begin() + (id + 1) * kNumKeys);
f67539c2
TL
184 // Add unowned keys to make the workload more interesting, but this
185 // increases row lock contention, so just do it sometimes.
186 if (rnd.OneIn(2)) {
187 owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]);
188 }
11fdf7f2 189
f67539c2
TL
190 txn = db->BeginTransaction(write_options, txn_options);
191 txn->SetName(ToString(id));
192 txn->SetSnapshot();
193 if (a >= RO_SNAPSHOT) {
194 read_options.snapshot = txn->GetSnapshot();
195 ASSERT_TRUE(read_options.snapshot != nullptr);
196 }
11fdf7f2 197
f67539c2
TL
198 uint64_t buf[2];
199 buf[0] = id;
11fdf7f2 200
f67539c2
TL
201 // When scanning through the database, make sure that all unprepared
202 // keys have value >= snapshot and all other keys have value < snapshot.
203 int64_t snapshot_num = counter.fetch_add(1);
11fdf7f2 204
f67539c2
TL
205 Status s;
206 for (const auto& key : owned_keys) {
207 buf[1] = counter.fetch_add(1);
208 s = txn->Put(key, Slice((const char*)buf, sizeof(buf)));
209 if (!s.ok()) {
210 break;
211 }
212 if (a == REFRESH_SNAPSHOT) {
213 txn->SetSnapshot();
214 read_options.snapshot = txn->GetSnapshot();
215 snapshot_num = counter.fetch_add(1);
216 }
217 }
11fdf7f2 218
f67539c2
TL
219 // Failure is possible due to snapshot validation. In this case,
220 // rollback and move onto next iteration.
221 if (!s.ok()) {
222 ASSERT_TRUE(s.IsBusy());
223 ASSERT_OK(txn->Rollback());
224 delete txn;
225 continue;
226 }
11fdf7f2 227
f67539c2
TL
228 auto verify_key = [&owned_keys, &a, &id, &snapshot_num](
229 const std::string& key, const std::string& value) {
230 if (owned_keys.count(key) > 0) {
231 ASSERT_EQ(value.size(), 16);
232
233 // Since this key is part of owned_keys, then this key must be
234 // unprepared by this transaction identified by 'id'
235 ASSERT_EQ(((int64_t*)value.c_str())[0], id);
236 if (a == REFRESH_SNAPSHOT) {
237 // If refresh snapshot is true, then the snapshot is refreshed
238 // after every Put(), meaning that the current snapshot in
239 // snapshot_num must be greater than the "seqno" of any keys
240 // written by the current transaction.
241 ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
242 } else {
243 // If refresh snapshot is not on, then the snapshot was taken at
244 // the beginning of the transaction, meaning all writes must come
245 // after snapshot_num
246 ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num);
247 }
248 } else if (a >= RO_SNAPSHOT) {
249 // If this is not an unprepared key, just assert that the key
250 // "seqno" is smaller than the snapshot seqno.
251 ASSERT_EQ(value.size(), 16);
252 ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num);
253 }
254 };
255
256 // Validate Get()/Next()/Prev(). Do only one of them to save time, and
257 // reduce lock contention.
258 switch (rnd.Uniform(3)) {
259 case 0: // Validate Get()
260 {
261 for (const auto& key : keys) {
262 std::string value;
263 s = txn->Get(read_options, Slice(key), &value);
264 if (!s.ok()) {
265 ASSERT_TRUE(s.IsNotFound());
266 ASSERT_EQ(owned_keys.count(key), 0);
267 } else {
268 verify_key(key, value);
269 }
270 }
271 break;
272 }
273 case 1: // Validate Next()
274 {
275 Iterator* iter = txn->GetIterator(read_options);
276 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
277 verify_key(iter->key().ToString(), iter->value().ToString());
278 }
279 delete iter;
280 break;
281 }
282 case 2: // Validate Prev()
283 {
284 Iterator* iter = txn->GetIterator(read_options);
285 for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
286 verify_key(iter->key().ToString(), iter->value().ToString());
287 }
288 delete iter;
289 break;
290 }
291 default:
292 ASSERT_TRUE(false);
293 }
11fdf7f2 294
f67539c2
TL
295 if (rnd.OneIn(2)) {
296 ASSERT_OK(txn->Commit());
297 } else {
298 ASSERT_OK(txn->Rollback());
299 }
300 delete txn;
301 }
302 };
11fdf7f2 303
f67539c2
TL
304 std::vector<port::Thread> threads;
305 for (uint32_t i = 0; i < kNumThreads; i++) {
306 threads.emplace_back(stress_thread, i);
307 }
11fdf7f2 308
f67539c2
TL
309 for (auto& t : threads) {
310 t.join();
311 }
11fdf7f2 312}
f67539c2 313#endif // ROCKSDB_VALGRIND_RUN
11fdf7f2
TL
314
315// This tests how write unprepared behaves during recovery when the DB crashes
316// after a transaction has either been unprepared or prepared, and tests if
317// the changes are correctly applied for prepared transactions if we decide to
318// rollback/commit.
319TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
320 WriteOptions write_options;
321 write_options.disableWAL = false;
322 TransactionOptions txn_options;
323 std::vector<Transaction*> prepared_trans;
324 WriteUnpreparedTxnDB* wup_db;
325 options.disable_auto_compactions = true;
326
327 enum Action { UNPREPARED, ROLLBACK, COMMIT };
328
329 // batch_size of 1 causes writes to DB for every marker.
330 for (size_t batch_size : {1, 1000000}) {
f67539c2 331 txn_options.write_batch_flush_threshold = batch_size;
11fdf7f2
TL
332 for (bool empty : {true, false}) {
333 for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) {
334 for (int num_batches = 1; num_batches < 10; num_batches++) {
335 // Reset database.
336 prepared_trans.clear();
337 ReOpen();
338 wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
339 if (!empty) {
340 for (int i = 0; i < num_batches; i++) {
341 ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
342 "before value" + ToString(i)));
343 }
344 }
345
346 // Write num_batches unprepared batches.
347 Transaction* txn = db->BeginTransaction(write_options, txn_options);
348 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
349 txn->SetName("xid");
350 for (int i = 0; i < num_batches; i++) {
351 ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
f67539c2
TL
352 if (txn_options.write_batch_flush_threshold == 1) {
353 // WriteUnprepared will check write_batch_flush_threshold and
354 // possibly flush before appending to the write batch. No flush
355 // will happen at the first write because the batch is still
356 // empty, so after k puts, there should be k-1 flushed batches.
357 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
11fdf7f2
TL
358 } else {
359 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
360 }
361 }
362 if (a == UNPREPARED) {
363 // This is done to prevent the destructor from rolling back the
364 // transaction for us, since we want to pretend we crashed and
365 // test that recovery does the rollback.
366 wup_txn->unprep_seqs_.clear();
367 } else {
368 txn->Prepare();
369 }
370 delete txn;
371
372 // Crash and run recovery code paths.
373 wup_db->db_impl_->FlushWAL(true);
374 wup_db->TEST_Crash();
375 ReOpenNoDelete();
376 assert(db != nullptr);
377
378 db->GetAllPreparedTransactions(&prepared_trans);
379 ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1);
380 if (a == ROLLBACK) {
381 ASSERT_OK(prepared_trans[0]->Rollback());
382 delete prepared_trans[0];
383 } else if (a == COMMIT) {
384 ASSERT_OK(prepared_trans[0]->Commit());
385 delete prepared_trans[0];
386 }
387
388 Iterator* iter = db->NewIterator(ReadOptions());
389 iter->SeekToFirst();
390 // Check that DB has before values.
391 if (!empty || a == COMMIT) {
392 for (int i = 0; i < num_batches; i++) {
393 ASSERT_TRUE(iter->Valid());
394 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
395 if (a == COMMIT) {
396 ASSERT_EQ(iter->value().ToString(), "value" + ToString(i));
397 } else {
398 ASSERT_EQ(iter->value().ToString(),
399 "before value" + ToString(i));
400 }
401 iter->Next();
402 }
403 }
404 ASSERT_FALSE(iter->Valid());
405 delete iter;
406 }
407 }
408 }
409 }
410}
411
412// Basic test to see that unprepared batch gets written to DB when batch size
413// is exceeded. It also does some basic checks to see if commit/rollback works
414// as expected for write unprepared.
415TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
416 WriteOptions write_options;
417 TransactionOptions txn_options;
418 const int kNumKeys = 10;
419
420 // batch_size of 1 causes writes to DB for every marker.
421 for (size_t batch_size : {1, 1000000}) {
f67539c2 422 txn_options.write_batch_flush_threshold = batch_size;
11fdf7f2
TL
423 for (bool prepare : {false, true}) {
424 for (bool commit : {false, true}) {
425 ReOpen();
426 Transaction* txn = db->BeginTransaction(write_options, txn_options);
427 WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
428 txn->SetName("xid");
429
430 for (int i = 0; i < kNumKeys; i++) {
431 txn->Put("k" + ToString(i), "v" + ToString(i));
f67539c2
TL
432 if (txn_options.write_batch_flush_threshold == 1) {
433 // WriteUnprepared will check write_batch_flush_threshold and
434 // possibly flush before appending to the write batch. No flush will
435 // happen at the first write because the batch is still empty, so
436 // after k puts, there should be k-1 flushed batches.
437 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i);
11fdf7f2
TL
438 } else {
439 ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0);
440 }
441 }
442
443 if (prepare) {
444 ASSERT_OK(txn->Prepare());
445 }
446
447 Iterator* iter = db->NewIterator(ReadOptions());
448 iter->SeekToFirst();
449 assert(!iter->Valid());
450 ASSERT_FALSE(iter->Valid());
451 delete iter;
452
453 if (commit) {
454 ASSERT_OK(txn->Commit());
455 } else {
456 ASSERT_OK(txn->Rollback());
457 }
458 delete txn;
459
460 iter = db->NewIterator(ReadOptions());
461 iter->SeekToFirst();
462
463 for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
464 ASSERT_TRUE(iter->Valid());
465 ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
466 ASSERT_EQ(iter->value().ToString(), "v" + ToString(i));
467 iter->Next();
468 }
469 ASSERT_FALSE(iter->Valid());
470 delete iter;
471 }
472 }
473 }
474}
475
476// Test whether logs containing unprepared/prepared batches are kept even
477// after memtable finishes flushing, and whether they are removed when
478// transaction commits/aborts.
479//
480// TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests.
481TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
482 WriteOptions write_options;
483 TransactionOptions txn_options;
484 // batch_size of 1 causes writes to DB for every marker.
f67539c2 485 txn_options.write_batch_flush_threshold = 1;
11fdf7f2
TL
486 const int kNumKeys = 10;
487
488 WriteOptions wopts;
489 wopts.sync = true;
490
491 for (bool prepare : {false, true}) {
492 for (bool commit : {false, true}) {
493 ReOpen();
494 auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
495 auto db_impl = wup_db->db_impl_;
496
497 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
498 ASSERT_OK(txn1->SetName("xid1"));
499
500 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
501 ASSERT_OK(txn2->SetName("xid2"));
502
503 // Spread this transaction across multiple log files.
504 for (int i = 0; i < kNumKeys; i++) {
505 ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i)));
506 if (i >= kNumKeys / 2) {
507 ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i)));
508 }
509
510 if (i > 0) {
511 db_impl->TEST_SwitchWAL();
512 }
513 }
514
515 ASSERT_GT(txn1->GetLogNumber(), 0);
516 ASSERT_GT(txn2->GetLogNumber(), 0);
517
518 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
519 txn1->GetLogNumber());
520 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
521
522 if (prepare) {
523 ASSERT_OK(txn1->Prepare());
524 ASSERT_OK(txn2->Prepare());
525 }
526
527 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
528 ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
529
530 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
531 txn1->GetLogNumber());
532 if (commit) {
533 ASSERT_OK(txn1->Commit());
534 } else {
535 ASSERT_OK(txn1->Rollback());
536 }
537
538 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
539 txn2->GetLogNumber());
540
541 if (commit) {
542 ASSERT_OK(txn2->Commit());
543 } else {
544 ASSERT_OK(txn2->Rollback());
545 }
546
547 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
548
549 delete txn1;
550 delete txn2;
551 }
552 }
553}
554
f67539c2
TL
555TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
556 WriteOptions woptions;
557 TransactionOptions txn_options;
558 txn_options.write_batch_flush_threshold = 1;
559
560 Transaction* txn = db->BeginTransaction(woptions, txn_options);
561
562 // Do some writes with no snapshot
563 ASSERT_OK(txn->Put("a", "a"));
564 ASSERT_OK(txn->Put("b", "b"));
565 ASSERT_OK(txn->Put("c", "c"));
566
567 // Test that it is still possible to create iterators after writes with no
568 // snapshot, if iterator snapshot is fresh enough.
569 ReadOptions roptions;
570 auto iter = txn->GetIterator(roptions);
571 int keys = 0;
572 for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
573 ASSERT_OK(iter->status());
574 ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
575 }
576 ASSERT_EQ(keys, 3);
577
578 delete iter;
579 delete txn;
580}
581
582// Test whether write to a transaction while iterating is supported.
583TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
584 WriteOptions woptions;
585 TransactionOptions txn_options;
586 txn_options.write_batch_flush_threshold = 1;
587
588 enum Action { DO_DELETE, DO_UPDATE };
589
590 for (Action a : {DO_DELETE, DO_UPDATE}) {
591 for (int i = 0; i < 100; i++) {
592 ASSERT_OK(db->Put(woptions, ToString(i), ToString(i)));
593 }
594
595 Transaction* txn = db->BeginTransaction(woptions, txn_options);
596 // write_batch_ now contains 1 key.
597 ASSERT_OK(txn->Put("9", "a"));
598
599 ReadOptions roptions;
600 auto iter = txn->GetIterator(roptions);
601 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
602 ASSERT_OK(iter->status());
603 if (iter->key() == "9") {
604 ASSERT_EQ(iter->value().ToString(), "a");
605 } else {
606 ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
607 }
608
609 if (a == DO_DELETE) {
610 ASSERT_OK(txn->Delete(iter->key()));
611 } else {
612 ASSERT_OK(txn->Put(iter->key(), "b"));
613 }
614 }
615
616 delete iter;
617 ASSERT_OK(txn->Commit());
618
619 iter = db->NewIterator(roptions);
620 if (a == DO_DELETE) {
621 // Check that db is empty.
622 iter->SeekToFirst();
623 ASSERT_FALSE(iter->Valid());
624 } else {
625 int keys = 0;
626 // Check that all values are updated to b.
627 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), keys++) {
628 ASSERT_OK(iter->status());
629 ASSERT_EQ(iter->value().ToString(), "b");
630 }
631 ASSERT_EQ(keys, 100);
632 }
633
634 delete iter;
635 delete txn;
636 }
637}
638
639TEST_P(WriteUnpreparedTransactionTest, SavePoint) {
640 WriteOptions woptions;
641 TransactionOptions txn_options;
642 txn_options.write_batch_flush_threshold = 1;
643
644 Transaction* txn = db->BeginTransaction(woptions, txn_options);
645 txn->SetSavePoint();
646 ASSERT_OK(txn->Put("a", "a"));
647 ASSERT_OK(txn->Put("b", "b"));
648 ASSERT_OK(txn->Commit());
649
650 ReadOptions roptions;
651 std::string value;
652 ASSERT_OK(txn->Get(roptions, "a", &value));
653 ASSERT_EQ(value, "a");
654 ASSERT_OK(txn->Get(roptions, "b", &value));
655 ASSERT_EQ(value, "b");
656 delete txn;
657}
658
659TEST_P(WriteUnpreparedTransactionTest, UntrackedKeys) {
660 WriteOptions woptions;
661 TransactionOptions txn_options;
662 txn_options.write_batch_flush_threshold = 1;
663
664 Transaction* txn = db->BeginTransaction(woptions, txn_options);
665 auto wb = txn->GetWriteBatch()->GetWriteBatch();
666 ASSERT_OK(txn->Put("a", "a"));
667 ASSERT_OK(wb->Put("a_untrack", "a_untrack"));
668 txn->SetSavePoint();
669 ASSERT_OK(txn->Put("b", "b"));
670 ASSERT_OK(txn->Put("b_untrack", "b_untrack"));
671
672 ReadOptions roptions;
673 std::string value;
674 ASSERT_OK(txn->Get(roptions, "a", &value));
675 ASSERT_EQ(value, "a");
676 ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
677 ASSERT_EQ(value, "a_untrack");
678 ASSERT_OK(txn->Get(roptions, "b", &value));
679 ASSERT_EQ(value, "b");
680 ASSERT_OK(txn->Get(roptions, "b_untrack", &value));
681 ASSERT_EQ(value, "b_untrack");
682
683 // b and b_untrack should be rolled back.
684 ASSERT_OK(txn->RollbackToSavePoint());
685 ASSERT_OK(txn->Get(roptions, "a", &value));
686 ASSERT_EQ(value, "a");
687 ASSERT_OK(txn->Get(roptions, "a_untrack", &value));
688 ASSERT_EQ(value, "a_untrack");
689 auto s = txn->Get(roptions, "b", &value);
690 ASSERT_TRUE(s.IsNotFound());
691 s = txn->Get(roptions, "b_untrack", &value);
692 ASSERT_TRUE(s.IsNotFound());
693
694 // Everything should be rolled back.
695 ASSERT_OK(txn->Rollback());
696 s = txn->Get(roptions, "a", &value);
697 ASSERT_TRUE(s.IsNotFound());
698 s = txn->Get(roptions, "a_untrack", &value);
699 ASSERT_TRUE(s.IsNotFound());
700 s = txn->Get(roptions, "b", &value);
701 ASSERT_TRUE(s.IsNotFound());
702 s = txn->Get(roptions, "b_untrack", &value);
703 ASSERT_TRUE(s.IsNotFound());
704
705 delete txn;
706}
707
708} // namespace ROCKSDB_NAMESPACE
11fdf7f2
TL
709
710int main(int argc, char** argv) {
711 ::testing::InitGoogleTest(&argc, argv);
712 return RUN_ALL_TESTS();
713}
714
715#else
716#include <stdio.h>
717
718int main(int /*argc*/, char** /*argv*/) {
719 fprintf(stderr,
720 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
721 return 0;
722}
723
724#endif // ROCKSDB_LITE