1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
12 #include "rocksdb/db.h"
13 #include "rocksdb/utilities/optimistic_transaction_db.h"
14 #include "rocksdb/utilities/transaction.h"
15 #include "util/crc32c.h"
16 #include "util/logging.h"
17 #include "util/random.h"
18 #include "util/testharness.h"
19 #include "util/transaction_test_util.h"
20 #include "port/port.h"
26 class OptimisticTransactionTest
: public testing::Test
{
28 OptimisticTransactionDB
* txn_db
;
33 OptimisticTransactionTest() {
34 options
.create_if_missing
= true;
35 options
.max_write_buffer_number
= 2;
36 dbname
= test::TmpDir() + "/optimistic_transaction_testdb";
38 DestroyDB(dbname
, options
);
41 ~OptimisticTransactionTest() {
43 DestroyDB(dbname
, options
);
53 Status s
= OptimisticTransactionDB::Open(options
, dbname
, &txn_db
);
55 db
= txn_db
->GetBaseDB();
59 TEST_F(OptimisticTransactionTest
, SuccessTest
) {
60 WriteOptions write_options
;
61 ReadOptions read_options
;
65 db
->Put(write_options
, Slice("foo"), Slice("bar"));
66 db
->Put(write_options
, Slice("foo2"), Slice("bar"));
68 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
71 txn
->GetForUpdate(read_options
, "foo", &value
);
72 ASSERT_EQ(value
, "bar");
74 txn
->Put(Slice("foo"), Slice("bar2"));
76 txn
->GetForUpdate(read_options
, "foo", &value
);
77 ASSERT_EQ(value
, "bar2");
82 db
->Get(read_options
, "foo", &value
);
83 ASSERT_EQ(value
, "bar2");
88 TEST_F(OptimisticTransactionTest
, WriteConflictTest
) {
89 WriteOptions write_options
;
90 ReadOptions read_options
;
94 db
->Put(write_options
, "foo", "bar");
95 db
->Put(write_options
, "foo2", "bar");
97 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
100 txn
->Put("foo", "bar2");
102 // This Put outside of a transaction will conflict with the previous write
103 s
= db
->Put(write_options
, "foo", "barz");
106 s
= db
->Get(read_options
, "foo", &value
);
107 ASSERT_EQ(value
, "barz");
108 ASSERT_EQ(1, txn
->GetNumKeys());
111 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
113 // Verify that transaction did not write anything
114 db
->Get(read_options
, "foo", &value
);
115 ASSERT_EQ(value
, "barz");
116 db
->Get(read_options
, "foo2", &value
);
117 ASSERT_EQ(value
, "bar");
122 TEST_F(OptimisticTransactionTest
, WriteConflictTest2
) {
123 WriteOptions write_options
;
124 ReadOptions read_options
;
125 OptimisticTransactionOptions txn_options
;
129 db
->Put(write_options
, "foo", "bar");
130 db
->Put(write_options
, "foo2", "bar");
132 txn_options
.set_snapshot
= true;
133 Transaction
* txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
136 // This Put outside of a transaction will conflict with a later write
137 s
= db
->Put(write_options
, "foo", "barz");
140 txn
->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
142 s
= db
->Get(read_options
, "foo", &value
);
143 ASSERT_EQ(value
, "barz");
146 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
148 // Verify that transaction did not write anything
149 db
->Get(read_options
, "foo", &value
);
150 ASSERT_EQ(value
, "barz");
151 db
->Get(read_options
, "foo2", &value
);
152 ASSERT_EQ(value
, "bar");
157 TEST_F(OptimisticTransactionTest
, ReadConflictTest
) {
158 WriteOptions write_options
;
159 ReadOptions read_options
, snapshot_read_options
;
160 OptimisticTransactionOptions txn_options
;
164 db
->Put(write_options
, "foo", "bar");
165 db
->Put(write_options
, "foo2", "bar");
167 txn_options
.set_snapshot
= true;
168 Transaction
* txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
172 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
174 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
175 ASSERT_EQ(value
, "bar");
177 // This Put outside of a transaction will conflict with the previous read
178 s
= db
->Put(write_options
, "foo", "barz");
181 s
= db
->Get(read_options
, "foo", &value
);
182 ASSERT_EQ(value
, "barz");
185 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
187 // Verify that transaction did not write anything
188 txn
->GetForUpdate(read_options
, "foo", &value
);
189 ASSERT_EQ(value
, "barz");
190 txn
->GetForUpdate(read_options
, "foo2", &value
);
191 ASSERT_EQ(value
, "bar");
196 TEST_F(OptimisticTransactionTest
, TxnOnlyTest
) {
197 // Test to make sure transactions work when there are no other writes in an
200 WriteOptions write_options
;
201 ReadOptions read_options
;
205 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
216 TEST_F(OptimisticTransactionTest
, FlushTest
) {
217 WriteOptions write_options
;
218 ReadOptions read_options
, snapshot_read_options
;
222 db
->Put(write_options
, Slice("foo"), Slice("bar"));
223 db
->Put(write_options
, Slice("foo2"), Slice("bar"));
225 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
228 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
230 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
231 ASSERT_EQ(value
, "bar");
233 txn
->Put(Slice("foo"), Slice("bar2"));
235 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
236 ASSERT_EQ(value
, "bar2");
238 // Put a random key so we have a memtable to flush
239 s
= db
->Put(write_options
, "dummy", "dummy");
242 // force a memtable flush
243 FlushOptions flush_ops
;
244 db
->Flush(flush_ops
);
247 // txn should commit since the flushed table is still in MemtableList History
250 db
->Get(read_options
, "foo", &value
);
251 ASSERT_EQ(value
, "bar2");
256 TEST_F(OptimisticTransactionTest
, FlushTest2
) {
257 WriteOptions write_options
;
258 ReadOptions read_options
, snapshot_read_options
;
262 db
->Put(write_options
, Slice("foo"), Slice("bar"));
263 db
->Put(write_options
, Slice("foo2"), Slice("bar"));
265 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
268 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
270 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
271 ASSERT_EQ(value
, "bar");
273 txn
->Put(Slice("foo"), Slice("bar2"));
275 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
276 ASSERT_EQ(value
, "bar2");
278 // Put a random key so we have a MemTable to flush
279 s
= db
->Put(write_options
, "dummy", "dummy");
282 // force a memtable flush
283 FlushOptions flush_ops
;
284 db
->Flush(flush_ops
);
286 // Put a random key so we have a MemTable to flush
287 s
= db
->Put(write_options
, "dummy", "dummy2");
290 // force a memtable flush
291 db
->Flush(flush_ops
);
293 s
= db
->Put(write_options
, "dummy", "dummy3");
296 // force a memtable flush
297 // Since our test db has max_write_buffer_number=2, this flush will cause
298 // the first memtable to get purged from the MemtableList history.
299 db
->Flush(flush_ops
);
302 // txn should not commit since MemTableList History is not large enough
303 ASSERT_TRUE(s
.IsTryAgain());
305 db
->Get(read_options
, "foo", &value
);
306 ASSERT_EQ(value
, "bar");
311 TEST_F(OptimisticTransactionTest
, NoSnapshotTest
) {
312 WriteOptions write_options
;
313 ReadOptions read_options
;
317 db
->Put(write_options
, "AAA", "bar");
319 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
322 // Modify key after transaction start
323 db
->Put(write_options
, "AAA", "bar1");
325 // Read and write without a snapshot
326 txn
->GetForUpdate(read_options
, "AAA", &value
);
327 ASSERT_EQ(value
, "bar1");
328 txn
->Put("AAA", "bar2");
330 // Should commit since read/write was done after data changed
334 txn
->GetForUpdate(read_options
, "AAA", &value
);
335 ASSERT_EQ(value
, "bar2");
340 TEST_F(OptimisticTransactionTest
, MultipleSnapshotTest
) {
341 WriteOptions write_options
;
342 ReadOptions read_options
, snapshot_read_options
;
346 db
->Put(write_options
, "AAA", "bar");
347 db
->Put(write_options
, "BBB", "bar");
348 db
->Put(write_options
, "CCC", "bar");
350 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
353 db
->Put(write_options
, "AAA", "bar1");
355 // Read and write without a snapshot
356 txn
->GetForUpdate(read_options
, "AAA", &value
);
357 ASSERT_EQ(value
, "bar1");
358 txn
->Put("AAA", "bar2");
360 // Modify BBB before snapshot is taken
361 db
->Put(write_options
, "BBB", "bar1");
364 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
366 // Read and write with snapshot
367 txn
->GetForUpdate(snapshot_read_options
, "BBB", &value
);
368 ASSERT_EQ(value
, "bar1");
369 txn
->Put("BBB", "bar2");
371 db
->Put(write_options
, "CCC", "bar1");
373 // Set a new snapshot
375 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
377 // Read and write with snapshot
378 txn
->GetForUpdate(snapshot_read_options
, "CCC", &value
);
379 ASSERT_EQ(value
, "bar1");
380 txn
->Put("CCC", "bar2");
382 s
= txn
->GetForUpdate(read_options
, "AAA", &value
);
384 ASSERT_EQ(value
, "bar2");
385 s
= txn
->GetForUpdate(read_options
, "BBB", &value
);
387 ASSERT_EQ(value
, "bar2");
388 s
= txn
->GetForUpdate(read_options
, "CCC", &value
);
390 ASSERT_EQ(value
, "bar2");
392 s
= db
->Get(read_options
, "AAA", &value
);
394 ASSERT_EQ(value
, "bar1");
395 s
= db
->Get(read_options
, "BBB", &value
);
397 ASSERT_EQ(value
, "bar1");
398 s
= db
->Get(read_options
, "CCC", &value
);
400 ASSERT_EQ(value
, "bar1");
405 s
= db
->Get(read_options
, "AAA", &value
);
407 ASSERT_EQ(value
, "bar2");
408 s
= db
->Get(read_options
, "BBB", &value
);
410 ASSERT_EQ(value
, "bar2");
411 s
= db
->Get(read_options
, "CCC", &value
);
413 ASSERT_EQ(value
, "bar2");
415 // verify that we track multiple writes to the same key at different snapshots
417 txn
= txn_db
->BeginTransaction(write_options
);
419 // Potentially conflicting writes
420 db
->Put(write_options
, "ZZZ", "zzz");
421 db
->Put(write_options
, "XXX", "xxx");
425 OptimisticTransactionOptions txn_options
;
426 txn_options
.set_snapshot
= true;
427 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
430 // This should not conflict in txn since the snapshot is later than the
431 // previous write (spoiler alert: it will later conflict with txn2).
432 txn
->Put("ZZZ", "zzzz");
438 // This will conflict since the snapshot is earlier than another write to ZZZ
439 txn2
->Put("ZZZ", "xxxxx");
442 ASSERT_TRUE(s
.IsBusy());
447 TEST_F(OptimisticTransactionTest
, ColumnFamiliesTest
) {
448 WriteOptions write_options
;
449 ReadOptions read_options
, snapshot_read_options
;
450 OptimisticTransactionOptions txn_options
;
454 ColumnFamilyHandle
*cfa
, *cfb
;
455 ColumnFamilyOptions cf_options
;
457 // Create 2 new column families
458 s
= db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
460 s
= db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
467 // open DB with three column families
468 std::vector
<ColumnFamilyDescriptor
> column_families
;
469 // have to open default column family
470 column_families
.push_back(
471 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
472 // open the new column families
473 column_families
.push_back(
474 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
475 column_families
.push_back(
476 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
477 std::vector
<ColumnFamilyHandle
*> handles
;
478 s
= OptimisticTransactionDB::Open(options
, dbname
, column_families
, &handles
,
481 db
= txn_db
->GetBaseDB();
483 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
487 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
489 txn_options
.set_snapshot
= true;
490 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
493 // Write some data to the db
495 batch
.Put("foo", "foo");
496 batch
.Put(handles
[1], "AAA", "bar");
497 batch
.Put(handles
[1], "AAAZZZ", "bar");
498 s
= db
->Write(write_options
, &batch
);
500 db
->Delete(write_options
, handles
[1], "AAAZZZ");
502 // These keys do no conflict with existing writes since they're in
503 // different column families
505 txn
->GetForUpdate(snapshot_read_options
, handles
[1], "foo", &value
);
506 Slice
key_slice("AAAZZZ");
507 Slice value_slices
[2] = {Slice("bar"), Slice("bar")};
508 txn
->Put(handles
[2], SliceParts(&key_slice
, 1), SliceParts(value_slices
, 2));
510 ASSERT_EQ(3, txn
->GetNumKeys());
515 s
= db
->Get(read_options
, "AAA", &value
);
516 ASSERT_TRUE(s
.IsNotFound());
517 s
= db
->Get(read_options
, handles
[2], "AAAZZZ", &value
);
518 ASSERT_EQ(value
, "barbar");
520 Slice key_slices
[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
521 Slice
value_slice("barbarbar");
522 // This write will cause a conflict with the earlier batch write
523 txn2
->Put(handles
[1], SliceParts(key_slices
, 3), SliceParts(&value_slice
, 1));
525 txn2
->Delete(handles
[2], "XXX");
526 txn2
->Delete(handles
[1], "XXX");
527 s
= txn2
->GetForUpdate(snapshot_read_options
, handles
[1], "AAA", &value
);
528 ASSERT_TRUE(s
.IsNotFound());
530 // Verify txn did not commit
532 ASSERT_TRUE(s
.IsBusy());
533 s
= db
->Get(read_options
, handles
[1], "AAAZZZ", &value
);
534 ASSERT_EQ(value
, "barbar");
539 txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
540 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
542 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
545 std::vector
<ColumnFamilyHandle
*> multiget_cfh
= {handles
[1], handles
[2],
546 handles
[0], handles
[2]};
547 std::vector
<Slice
> multiget_keys
= {"AAA", "AAAZZZ", "foo", "foo"};
548 std::vector
<std::string
> values(4);
550 std::vector
<Status
> results
= txn
->MultiGetForUpdate(
551 snapshot_read_options
, multiget_cfh
, multiget_keys
, &values
);
552 ASSERT_OK(results
[0]);
553 ASSERT_OK(results
[1]);
554 ASSERT_OK(results
[2]);
555 ASSERT_TRUE(results
[3].IsNotFound());
556 ASSERT_EQ(values
[0], "bar");
557 ASSERT_EQ(values
[1], "barbar");
558 ASSERT_EQ(values
[2], "foo");
560 txn
->Delete(handles
[2], "ZZZ");
561 txn
->Put(handles
[2], "ZZZ", "YYY");
562 txn
->Put(handles
[2], "ZZZ", "YYYY");
563 txn
->Delete(handles
[2], "ZZZ");
564 txn
->Put(handles
[2], "AAAZZZ", "barbarbar");
566 ASSERT_EQ(5, txn
->GetNumKeys());
571 s
= db
->Get(read_options
, handles
[2], "ZZZ", &value
);
572 ASSERT_TRUE(s
.IsNotFound());
574 // Put a key which will conflict with the next txn using the previous snapshot
575 db
->Put(write_options
, handles
[2], "foo", "000");
577 results
= txn2
->MultiGetForUpdate(snapshot_read_options
, multiget_cfh
,
578 multiget_keys
, &values
);
579 ASSERT_OK(results
[0]);
580 ASSERT_OK(results
[1]);
581 ASSERT_OK(results
[2]);
582 ASSERT_TRUE(results
[3].IsNotFound());
583 ASSERT_EQ(values
[0], "bar");
584 ASSERT_EQ(values
[1], "barbar");
585 ASSERT_EQ(values
[2], "foo");
587 // Verify Txn Did not Commit
589 ASSERT_TRUE(s
.IsBusy());
591 s
= db
->DropColumnFamily(handles
[1]);
593 s
= db
->DropColumnFamily(handles
[2]);
599 for (auto handle
: handles
) {
604 TEST_F(OptimisticTransactionTest
, EmptyTest
) {
605 WriteOptions write_options
;
606 ReadOptions read_options
;
610 s
= db
->Put(write_options
, "aaa", "aaa");
613 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
618 txn
= txn_db
->BeginTransaction(write_options
);
622 txn
= txn_db
->BeginTransaction(write_options
);
623 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
624 ASSERT_EQ(value
, "aaa");
630 txn
= txn_db
->BeginTransaction(write_options
);
632 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
633 ASSERT_EQ(value
, "aaa");
635 s
= db
->Put(write_options
, "aaa", "xxx");
637 ASSERT_TRUE(s
.IsBusy());
641 TEST_F(OptimisticTransactionTest
, PredicateManyPreceders
) {
642 WriteOptions write_options
;
643 ReadOptions read_options1
, read_options2
;
644 OptimisticTransactionOptions txn_options
;
648 txn_options
.set_snapshot
= true;
649 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
650 read_options1
.snapshot
= txn1
->GetSnapshot();
652 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
654 read_options2
.snapshot
= txn2
->GetSnapshot();
656 std::vector
<Slice
> multiget_keys
= {"1", "2", "3"};
657 std::vector
<std::string
> multiget_values
;
659 std::vector
<Status
> results
=
660 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
661 ASSERT_TRUE(results
[1].IsNotFound());
668 multiget_values
.clear();
670 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
671 ASSERT_TRUE(results
[1].IsNotFound());
673 // should not commit since txn2 wrote a key txn has read
675 ASSERT_TRUE(s
.IsBusy());
680 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
681 read_options1
.snapshot
= txn1
->GetSnapshot();
683 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
684 read_options2
.snapshot
= txn2
->GetSnapshot();
690 // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
694 s
= txn2
->GetForUpdate(read_options2
, "4", &value
);
695 ASSERT_TRUE(s
.IsNotFound());
697 // txn2 cannot commit since txn1 changed "4"
699 ASSERT_TRUE(s
.IsBusy());
705 TEST_F(OptimisticTransactionTest
, LostUpdate
) {
706 WriteOptions write_options
;
707 ReadOptions read_options
, read_options1
, read_options2
;
708 OptimisticTransactionOptions txn_options
;
712 // Test 2 transactions writing to the same key in multiple orders and
713 // with/without snapshots
715 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
);
716 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
725 ASSERT_TRUE(s
.IsBusy());
730 txn_options
.set_snapshot
= true;
731 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
732 read_options1
.snapshot
= txn1
->GetSnapshot();
734 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
735 read_options2
.snapshot
= txn2
->GetSnapshot();
744 ASSERT_TRUE(s
.IsBusy());
749 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
750 read_options1
.snapshot
= txn1
->GetSnapshot();
752 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
753 read_options2
.snapshot
= txn2
->GetSnapshot();
761 ASSERT_TRUE(s
.IsBusy());
766 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
767 read_options1
.snapshot
= txn1
->GetSnapshot();
769 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
770 read_options2
.snapshot
= txn2
->GetSnapshot();
784 txn1
= txn_db
->BeginTransaction(write_options
);
785 txn2
= txn_db
->BeginTransaction(write_options
);
798 s
= db
->Get(read_options
, "1", &value
);
800 ASSERT_EQ(value
, "8");
803 TEST_F(OptimisticTransactionTest
, UntrackedWrites
) {
804 WriteOptions write_options
;
805 ReadOptions read_options
;
809 // Verify transaction rollback works for untracked keys.
810 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
811 txn
->PutUntracked("untracked", "0");
813 s
= db
->Get(read_options
, "untracked", &value
);
814 ASSERT_TRUE(s
.IsNotFound());
817 txn
= txn_db
->BeginTransaction(write_options
);
819 txn
->Put("tracked", "1");
820 txn
->PutUntracked("untracked", "1");
821 txn
->MergeUntracked("untracked", "2");
822 txn
->DeleteUntracked("untracked");
824 // Write to the untracked key outside of the transaction and verify
825 // it doesn't prevent the transaction from committing.
826 s
= db
->Put(write_options
, "untracked", "x");
832 s
= db
->Get(read_options
, "untracked", &value
);
833 ASSERT_TRUE(s
.IsNotFound());
836 txn
= txn_db
->BeginTransaction(write_options
);
838 txn
->Put("tracked", "10");
839 txn
->PutUntracked("untracked", "A");
841 // Write to tracked key outside of the transaction and verify that the
842 // untracked keys are not written when the commit fails.
843 s
= db
->Delete(write_options
, "tracked");
846 ASSERT_TRUE(s
.IsBusy());
848 s
= db
->Get(read_options
, "untracked", &value
);
849 ASSERT_TRUE(s
.IsNotFound());
854 TEST_F(OptimisticTransactionTest
, IteratorTest
) {
855 WriteOptions write_options
;
856 ReadOptions read_options
, snapshot_read_options
;
857 OptimisticTransactionOptions txn_options
;
861 // Write some keys to the db
862 s
= db
->Put(write_options
, "A", "a");
865 s
= db
->Put(write_options
, "G", "g");
868 s
= db
->Put(write_options
, "F", "f");
871 s
= db
->Put(write_options
, "C", "c");
874 s
= db
->Put(write_options
, "D", "d");
877 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
880 // Write some keys in a txn
881 s
= txn
->Put("B", "b");
884 s
= txn
->Put("H", "h");
887 s
= txn
->Delete("D");
890 s
= txn
->Put("E", "e");
894 const Snapshot
* snapshot
= txn
->GetSnapshot();
896 // Write some keys to the db after the snapshot
897 s
= db
->Put(write_options
, "BB", "xx");
900 s
= db
->Put(write_options
, "C", "xx");
903 read_options
.snapshot
= snapshot
;
904 Iterator
* iter
= txn
->GetIterator(read_options
);
905 ASSERT_OK(iter
->status());
908 // Read all keys via iter and lock them all
909 std::string results
[] = {"a", "b", "c", "e", "f", "g", "h"};
910 for (int i
= 0; i
< 7; i
++) {
911 ASSERT_OK(iter
->status());
912 ASSERT_TRUE(iter
->Valid());
913 ASSERT_EQ(results
[i
], iter
->value().ToString());
915 s
= txn
->GetForUpdate(read_options
, iter
->key(), nullptr);
920 ASSERT_FALSE(iter
->Valid());
923 ASSERT_OK(iter
->status());
924 ASSERT_TRUE(iter
->Valid());
925 ASSERT_EQ("g", iter
->value().ToString());
928 ASSERT_OK(iter
->status());
929 ASSERT_TRUE(iter
->Valid());
930 ASSERT_EQ("f", iter
->value().ToString());
933 ASSERT_OK(iter
->status());
934 ASSERT_TRUE(iter
->Valid());
935 ASSERT_EQ("e", iter
->value().ToString());
938 ASSERT_OK(iter
->status());
939 ASSERT_TRUE(iter
->Valid());
940 ASSERT_EQ("c", iter
->value().ToString());
943 ASSERT_OK(iter
->status());
944 ASSERT_TRUE(iter
->Valid());
945 ASSERT_EQ("e", iter
->value().ToString());
948 ASSERT_OK(iter
->status());
949 ASSERT_TRUE(iter
->Valid());
950 ASSERT_EQ("a", iter
->value().ToString());
953 ASSERT_OK(iter
->status());
954 ASSERT_FALSE(iter
->Valid());
957 ASSERT_OK(iter
->status());
958 ASSERT_TRUE(iter
->Valid());
959 ASSERT_EQ("h", iter
->value().ToString());
961 // key "C" was modified in the db after txn's snapshot. txn will not commit.
963 ASSERT_TRUE(s
.IsBusy());
969 TEST_F(OptimisticTransactionTest
, SavepointTest
) {
970 WriteOptions write_options
;
971 ReadOptions read_options
, snapshot_read_options
;
972 OptimisticTransactionOptions txn_options
;
976 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
979 s
= txn
->RollbackToSavePoint();
980 ASSERT_TRUE(s
.IsNotFound());
982 txn
->SetSavePoint(); // 1
984 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to beginning of txn
985 s
= txn
->RollbackToSavePoint();
986 ASSERT_TRUE(s
.IsNotFound());
988 s
= txn
->Put("B", "b");
994 s
= db
->Get(read_options
, "B", &value
);
996 ASSERT_EQ("b", value
);
999 txn
= txn_db
->BeginTransaction(write_options
);
1002 s
= txn
->Put("A", "a");
1005 s
= txn
->Put("B", "bb");
1008 s
= txn
->Put("C", "c");
1011 txn
->SetSavePoint(); // 2
1013 s
= txn
->Delete("B");
1016 s
= txn
->Put("C", "cc");
1019 s
= txn
->Put("D", "d");
1022 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 2
1024 s
= txn
->Get(read_options
, "A", &value
);
1026 ASSERT_EQ("a", value
);
1028 s
= txn
->Get(read_options
, "B", &value
);
1030 ASSERT_EQ("bb", value
);
1032 s
= txn
->Get(read_options
, "C", &value
);
1034 ASSERT_EQ("c", value
);
1036 s
= txn
->Get(read_options
, "D", &value
);
1037 ASSERT_TRUE(s
.IsNotFound());
1039 s
= txn
->Put("A", "a");
1042 s
= txn
->Put("E", "e");
1045 // Rollback to beginning of txn
1046 s
= txn
->RollbackToSavePoint();
1047 ASSERT_TRUE(s
.IsNotFound());
1050 s
= txn
->Get(read_options
, "A", &value
);
1051 ASSERT_TRUE(s
.IsNotFound());
1053 s
= txn
->Get(read_options
, "B", &value
);
1055 ASSERT_EQ("b", value
);
1057 s
= txn
->Get(read_options
, "D", &value
);
1058 ASSERT_TRUE(s
.IsNotFound());
1060 s
= txn
->Get(read_options
, "D", &value
);
1061 ASSERT_TRUE(s
.IsNotFound());
1063 s
= txn
->Get(read_options
, "E", &value
);
1064 ASSERT_TRUE(s
.IsNotFound());
1066 s
= txn
->Put("A", "aa");
1069 s
= txn
->Put("F", "f");
1072 txn
->SetSavePoint(); // 3
1073 txn
->SetSavePoint(); // 4
1075 s
= txn
->Put("G", "g");
1078 s
= txn
->Delete("F");
1081 s
= txn
->Delete("B");
1084 s
= txn
->Get(read_options
, "A", &value
);
1086 ASSERT_EQ("aa", value
);
1088 s
= txn
->Get(read_options
, "F", &value
);
1089 ASSERT_TRUE(s
.IsNotFound());
1091 s
= txn
->Get(read_options
, "B", &value
);
1092 ASSERT_TRUE(s
.IsNotFound());
1094 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 3
1096 s
= txn
->Get(read_options
, "F", &value
);
1098 ASSERT_EQ("f", value
);
1100 s
= txn
->Get(read_options
, "G", &value
);
1101 ASSERT_TRUE(s
.IsNotFound());
1106 s
= db
->Get(read_options
, "F", &value
);
1108 ASSERT_EQ("f", value
);
1110 s
= db
->Get(read_options
, "G", &value
);
1111 ASSERT_TRUE(s
.IsNotFound());
1113 s
= db
->Get(read_options
, "A", &value
);
1115 ASSERT_EQ("aa", value
);
1117 s
= db
->Get(read_options
, "B", &value
);
1119 ASSERT_EQ("b", value
);
1121 s
= db
->Get(read_options
, "C", &value
);
1122 ASSERT_TRUE(s
.IsNotFound());
1124 s
= db
->Get(read_options
, "D", &value
);
1125 ASSERT_TRUE(s
.IsNotFound());
1127 s
= db
->Get(read_options
, "E", &value
);
1128 ASSERT_TRUE(s
.IsNotFound());
1133 TEST_F(OptimisticTransactionTest
, UndoGetForUpdateTest
) {
1134 WriteOptions write_options
;
1135 ReadOptions read_options
, snapshot_read_options
;
1136 OptimisticTransactionOptions txn_options
;
1140 db
->Put(write_options
, "A", "");
1142 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
);
1145 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1148 txn1
->UndoGetForUpdate("A");
1150 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
1151 txn2
->Put("A", "x");
1156 // Verify that txn1 can commit since A isn't conflict checked
1161 txn1
= txn_db
->BeginTransaction(write_options
);
1162 txn1
->Put("A", "a");
1164 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1167 txn1
->UndoGetForUpdate("A");
1169 txn2
= txn_db
->BeginTransaction(write_options
);
1170 txn2
->Put("A", "x");
1175 // Verify that txn1 cannot commit since A will still be conflict checked
1177 ASSERT_TRUE(s
.IsBusy());
1180 txn1
= txn_db
->BeginTransaction(write_options
);
1182 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1184 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1187 txn1
->UndoGetForUpdate("A");
1189 txn2
= txn_db
->BeginTransaction(write_options
);
1190 txn2
->Put("A", "x");
1195 // Verify that txn1 cannot commit since A will still be conflict checked
1197 ASSERT_TRUE(s
.IsBusy());
1200 txn1
= txn_db
->BeginTransaction(write_options
);
1202 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1204 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1207 txn1
->UndoGetForUpdate("A");
1208 txn1
->UndoGetForUpdate("A");
1210 txn2
= txn_db
->BeginTransaction(write_options
);
1211 txn2
->Put("A", "x");
1216 // Verify that txn1 can commit since A isn't conflict checked
1221 txn1
= txn_db
->BeginTransaction(write_options
);
1223 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1226 txn1
->SetSavePoint();
1227 txn1
->UndoGetForUpdate("A");
1229 txn2
= txn_db
->BeginTransaction(write_options
);
1230 txn2
->Put("A", "x");
1235 // Verify that txn1 cannot commit since A will still be conflict checked
1237 ASSERT_TRUE(s
.IsBusy());
1240 txn1
= txn_db
->BeginTransaction(write_options
);
1242 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1245 txn1
->SetSavePoint();
1246 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1248 txn1
->UndoGetForUpdate("A");
1250 txn2
= txn_db
->BeginTransaction(write_options
);
1251 txn2
->Put("A", "x");
1256 // Verify that txn1 cannot commit since A will still be conflict checked
1258 ASSERT_TRUE(s
.IsBusy());
1261 txn1
= txn_db
->BeginTransaction(write_options
);
1263 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1266 txn1
->SetSavePoint();
1267 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1269 txn1
->UndoGetForUpdate("A");
1271 txn1
->RollbackToSavePoint();
1272 txn1
->UndoGetForUpdate("A");
1274 txn2
= txn_db
->BeginTransaction(write_options
);
1275 txn2
->Put("A", "x");
1280 // Verify that txn1 can commit since A isn't conflict checked
1287 Status
OptimisticTransactionStressTestInserter(OptimisticTransactionDB
* db
,
1288 const size_t num_transactions
,
1289 const size_t num_sets
,
1290 const size_t num_keys_per_set
) {
1291 size_t seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
1292 Random64
_rand(seed
);
1293 WriteOptions write_options
;
1294 ReadOptions read_options
;
1295 OptimisticTransactionOptions txn_options
;
1296 txn_options
.set_snapshot
= true;
1298 RandomTransactionInserter
inserter(&_rand
, write_options
, read_options
,
1300 static_cast<uint16_t>(num_sets
));
1302 for (size_t t
= 0; t
< num_transactions
; t
++) {
1303 bool success
= inserter
.OptimisticTransactionDBInsert(db
, txn_options
);
1305 // unexpected failure
1306 return inserter
.GetLastStatus();
1310 // Make sure at least some of the transactions succeeded. It's ok if
1311 // some failed due to write-conflicts.
1312 if (inserter
.GetFailureCount() > num_transactions
/ 2) {
1313 return Status::TryAgain("Too many transactions failed! " +
1314 std::to_string(inserter
.GetFailureCount()) + " / " +
1315 std::to_string(num_transactions
));
1318 return Status::OK();
1322 TEST_F(OptimisticTransactionTest
, OptimisticTransactionStressTest
) {
1323 const size_t num_threads
= 4;
1324 const size_t num_transactions_per_thread
= 10000;
1325 const size_t num_sets
= 3;
1326 const size_t num_keys_per_set
= 100;
1327 // Setting the key-space to be 100 keys should cause enough write-conflicts
1328 // to make this test interesting.
1330 std::vector
<port::Thread
> threads
;
1332 std::function
<void()> call_inserter
= [&] {
1333 ASSERT_OK(OptimisticTransactionStressTestInserter(
1334 txn_db
, num_transactions_per_thread
, num_sets
, num_keys_per_set
));
1337 // Create N threads that use RandomTransactionInserter to write
1338 // many transactions.
1339 for (uint32_t i
= 0; i
< num_threads
; i
++) {
1340 threads
.emplace_back(call_inserter
);
1343 // Wait for all threads to run
1344 for (auto& t
: threads
) {
1348 // Verify that data is consistent
1349 Status s
= RandomTransactionInserter::Verify(db
, num_sets
);
1353 TEST_F(OptimisticTransactionTest
, SequenceNumberAfterRecoverTest
) {
1354 WriteOptions write_options
;
1355 OptimisticTransactionOptions transaction_options
;
1357 Transaction
* transaction(txn_db
->BeginTransaction(write_options
, transaction_options
));
1358 Status s
= transaction
->Put("foo", "val");
1360 s
= transaction
->Put("foo2", "val");
1362 s
= transaction
->Put("foo3", "val");
1364 s
= transaction
->Commit();
1369 transaction
= txn_db
->BeginTransaction(write_options
, transaction_options
);
1370 s
= transaction
->Put("bar", "val");
1372 s
= transaction
->Put("bar2", "val");
1374 s
= transaction
->Commit();
1380 } // namespace rocksdb
1382 int main(int argc
, char** argv
) {
1383 ::testing::InitGoogleTest(&argc
, argv
);
1384 return RUN_ALL_TESTS();
1390 int main(int argc
, char** argv
) {
1393 "SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n");
1397 #endif // !ROCKSDB_LITE