1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include "rocksdb/db.h"
13 #include "rocksdb/utilities/optimistic_transaction_db.h"
14 #include "rocksdb/utilities/transaction.h"
15 #include "util/crc32c.h"
16 #include "util/logging.h"
17 #include "util/random.h"
18 #include "util/testharness.h"
19 #include "util/transaction_test_util.h"
20 #include "port/port.h"
26 class OptimisticTransactionTest
: public testing::Test
{
28 OptimisticTransactionDB
* txn_db
;
32 OptimisticTransactionTest() {
33 options
.create_if_missing
= true;
34 options
.max_write_buffer_number
= 2;
35 dbname
= test::PerThreadDBPath("optimistic_transaction_testdb");
37 DestroyDB(dbname
, options
);
40 ~OptimisticTransactionTest() override
{
42 DestroyDB(dbname
, options
);
53 Status s
= OptimisticTransactionDB::Open(options
, dbname
, &txn_db
);
55 assert(txn_db
!= nullptr);
59 TEST_F(OptimisticTransactionTest
, SuccessTest
) {
60 WriteOptions write_options
;
61 ReadOptions read_options
;
65 txn_db
->Put(write_options
, Slice("foo"), Slice("bar"));
66 txn_db
->Put(write_options
, Slice("foo2"), Slice("bar"));
68 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
71 txn
->GetForUpdate(read_options
, "foo", &value
);
72 ASSERT_EQ(value
, "bar");
74 txn
->Put(Slice("foo"), Slice("bar2"));
76 txn
->GetForUpdate(read_options
, "foo", &value
);
77 ASSERT_EQ(value
, "bar2");
82 txn_db
->Get(read_options
, "foo", &value
);
83 ASSERT_EQ(value
, "bar2");
88 TEST_F(OptimisticTransactionTest
, WriteConflictTest
) {
89 WriteOptions write_options
;
90 ReadOptions read_options
;
94 txn_db
->Put(write_options
, "foo", "bar");
95 txn_db
->Put(write_options
, "foo2", "bar");
97 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
100 txn
->Put("foo", "bar2");
102 // This Put outside of a transaction will conflict with the previous write
103 s
= txn_db
->Put(write_options
, "foo", "barz");
106 s
= txn_db
->Get(read_options
, "foo", &value
);
107 ASSERT_EQ(value
, "barz");
108 ASSERT_EQ(1, txn
->GetNumKeys());
111 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
113 // Verify that transaction did not write anything
114 txn_db
->Get(read_options
, "foo", &value
);
115 ASSERT_EQ(value
, "barz");
116 txn_db
->Get(read_options
, "foo2", &value
);
117 ASSERT_EQ(value
, "bar");
122 TEST_F(OptimisticTransactionTest
, WriteConflictTest2
) {
123 WriteOptions write_options
;
124 ReadOptions read_options
;
125 OptimisticTransactionOptions txn_options
;
129 txn_db
->Put(write_options
, "foo", "bar");
130 txn_db
->Put(write_options
, "foo2", "bar");
132 txn_options
.set_snapshot
= true;
133 Transaction
* txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
136 // This Put outside of a transaction will conflict with a later write
137 s
= txn_db
->Put(write_options
, "foo", "barz");
140 txn
->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
142 s
= txn_db
->Get(read_options
, "foo", &value
);
143 ASSERT_EQ(value
, "barz");
146 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
148 // Verify that transaction did not write anything
149 txn_db
->Get(read_options
, "foo", &value
);
150 ASSERT_EQ(value
, "barz");
151 txn_db
->Get(read_options
, "foo2", &value
);
152 ASSERT_EQ(value
, "bar");
157 TEST_F(OptimisticTransactionTest
, ReadConflictTest
) {
158 WriteOptions write_options
;
159 ReadOptions read_options
, snapshot_read_options
;
160 OptimisticTransactionOptions txn_options
;
164 txn_db
->Put(write_options
, "foo", "bar");
165 txn_db
->Put(write_options
, "foo2", "bar");
167 txn_options
.set_snapshot
= true;
168 Transaction
* txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
172 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
174 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
175 ASSERT_EQ(value
, "bar");
177 // This Put outside of a transaction will conflict with the previous read
178 s
= txn_db
->Put(write_options
, "foo", "barz");
181 s
= txn_db
->Get(read_options
, "foo", &value
);
182 ASSERT_EQ(value
, "barz");
185 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
187 // Verify that transaction did not write anything
188 txn
->GetForUpdate(read_options
, "foo", &value
);
189 ASSERT_EQ(value
, "barz");
190 txn
->GetForUpdate(read_options
, "foo2", &value
);
191 ASSERT_EQ(value
, "bar");
196 TEST_F(OptimisticTransactionTest
, TxnOnlyTest
) {
197 // Test to make sure transactions work when there are no other writes in an
200 WriteOptions write_options
;
201 ReadOptions read_options
;
205 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
216 TEST_F(OptimisticTransactionTest
, FlushTest
) {
217 WriteOptions write_options
;
218 ReadOptions read_options
, snapshot_read_options
;
222 txn_db
->Put(write_options
, Slice("foo"), Slice("bar"));
223 txn_db
->Put(write_options
, Slice("foo2"), Slice("bar"));
225 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
228 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
230 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
231 ASSERT_EQ(value
, "bar");
233 txn
->Put(Slice("foo"), Slice("bar2"));
235 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
236 ASSERT_EQ(value
, "bar2");
238 // Put a random key so we have a memtable to flush
239 s
= txn_db
->Put(write_options
, "dummy", "dummy");
242 // force a memtable flush
243 FlushOptions flush_ops
;
244 txn_db
->Flush(flush_ops
);
247 // txn should commit since the flushed table is still in MemtableList History
250 txn_db
->Get(read_options
, "foo", &value
);
251 ASSERT_EQ(value
, "bar2");
256 TEST_F(OptimisticTransactionTest
, FlushTest2
) {
257 WriteOptions write_options
;
258 ReadOptions read_options
, snapshot_read_options
;
262 txn_db
->Put(write_options
, Slice("foo"), Slice("bar"));
263 txn_db
->Put(write_options
, Slice("foo2"), Slice("bar"));
265 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
268 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
270 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
271 ASSERT_EQ(value
, "bar");
273 txn
->Put(Slice("foo"), Slice("bar2"));
275 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
276 ASSERT_EQ(value
, "bar2");
278 // Put a random key so we have a MemTable to flush
279 s
= txn_db
->Put(write_options
, "dummy", "dummy");
282 // force a memtable flush
283 FlushOptions flush_ops
;
284 txn_db
->Flush(flush_ops
);
286 // Put a random key so we have a MemTable to flush
287 s
= txn_db
->Put(write_options
, "dummy", "dummy2");
290 // force a memtable flush
291 txn_db
->Flush(flush_ops
);
293 s
= txn_db
->Put(write_options
, "dummy", "dummy3");
296 // force a memtable flush
297 // Since our test db has max_write_buffer_number=2, this flush will cause
298 // the first memtable to get purged from the MemtableList history.
299 txn_db
->Flush(flush_ops
);
302 // txn should not commit since MemTableList History is not large enough
303 ASSERT_TRUE(s
.IsTryAgain());
305 txn_db
->Get(read_options
, "foo", &value
);
306 ASSERT_EQ(value
, "bar");
311 TEST_F(OptimisticTransactionTest
, NoSnapshotTest
) {
312 WriteOptions write_options
;
313 ReadOptions read_options
;
317 txn_db
->Put(write_options
, "AAA", "bar");
319 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
322 // Modify key after transaction start
323 txn_db
->Put(write_options
, "AAA", "bar1");
325 // Read and write without a snapshot
326 txn
->GetForUpdate(read_options
, "AAA", &value
);
327 ASSERT_EQ(value
, "bar1");
328 txn
->Put("AAA", "bar2");
330 // Should commit since read/write was done after data changed
334 txn
->GetForUpdate(read_options
, "AAA", &value
);
335 ASSERT_EQ(value
, "bar2");
340 TEST_F(OptimisticTransactionTest
, MultipleSnapshotTest
) {
341 WriteOptions write_options
;
342 ReadOptions read_options
, snapshot_read_options
;
346 txn_db
->Put(write_options
, "AAA", "bar");
347 txn_db
->Put(write_options
, "BBB", "bar");
348 txn_db
->Put(write_options
, "CCC", "bar");
350 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
353 txn_db
->Put(write_options
, "AAA", "bar1");
355 // Read and write without a snapshot
356 txn
->GetForUpdate(read_options
, "AAA", &value
);
357 ASSERT_EQ(value
, "bar1");
358 txn
->Put("AAA", "bar2");
360 // Modify BBB before snapshot is taken
361 txn_db
->Put(write_options
, "BBB", "bar1");
364 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
366 // Read and write with snapshot
367 txn
->GetForUpdate(snapshot_read_options
, "BBB", &value
);
368 ASSERT_EQ(value
, "bar1");
369 txn
->Put("BBB", "bar2");
371 txn_db
->Put(write_options
, "CCC", "bar1");
373 // Set a new snapshot
375 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
377 // Read and write with snapshot
378 txn
->GetForUpdate(snapshot_read_options
, "CCC", &value
);
379 ASSERT_EQ(value
, "bar1");
380 txn
->Put("CCC", "bar2");
382 s
= txn
->GetForUpdate(read_options
, "AAA", &value
);
384 ASSERT_EQ(value
, "bar2");
385 s
= txn
->GetForUpdate(read_options
, "BBB", &value
);
387 ASSERT_EQ(value
, "bar2");
388 s
= txn
->GetForUpdate(read_options
, "CCC", &value
);
390 ASSERT_EQ(value
, "bar2");
392 s
= txn_db
->Get(read_options
, "AAA", &value
);
394 ASSERT_EQ(value
, "bar1");
395 s
= txn_db
->Get(read_options
, "BBB", &value
);
397 ASSERT_EQ(value
, "bar1");
398 s
= txn_db
->Get(read_options
, "CCC", &value
);
400 ASSERT_EQ(value
, "bar1");
405 s
= txn_db
->Get(read_options
, "AAA", &value
);
407 ASSERT_EQ(value
, "bar2");
408 s
= txn_db
->Get(read_options
, "BBB", &value
);
410 ASSERT_EQ(value
, "bar2");
411 s
= txn_db
->Get(read_options
, "CCC", &value
);
413 ASSERT_EQ(value
, "bar2");
415 // verify that we track multiple writes to the same key at different snapshots
417 txn
= txn_db
->BeginTransaction(write_options
);
419 // Potentially conflicting writes
420 txn_db
->Put(write_options
, "ZZZ", "zzz");
421 txn_db
->Put(write_options
, "XXX", "xxx");
425 OptimisticTransactionOptions txn_options
;
426 txn_options
.set_snapshot
= true;
427 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
430 // This should not conflict in txn since the snapshot is later than the
431 // previous write (spoiler alert: it will later conflict with txn2).
432 txn
->Put("ZZZ", "zzzz");
438 // This will conflict since the snapshot is earlier than another write to ZZZ
439 txn2
->Put("ZZZ", "xxxxx");
442 ASSERT_TRUE(s
.IsBusy());
447 TEST_F(OptimisticTransactionTest
, ColumnFamiliesTest
) {
448 WriteOptions write_options
;
449 ReadOptions read_options
, snapshot_read_options
;
450 OptimisticTransactionOptions txn_options
;
454 ColumnFamilyHandle
*cfa
, *cfb
;
455 ColumnFamilyOptions cf_options
;
457 // Create 2 new column families
458 s
= txn_db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
460 s
= txn_db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
468 // open DB with three column families
469 std::vector
<ColumnFamilyDescriptor
> column_families
;
470 // have to open default column family
471 column_families
.push_back(
472 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
473 // open the new column families
474 column_families
.push_back(
475 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
476 column_families
.push_back(
477 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
478 std::vector
<ColumnFamilyHandle
*> handles
;
479 s
= OptimisticTransactionDB::Open(options
, dbname
, column_families
, &handles
,
482 assert(txn_db
!= nullptr);
484 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
488 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
490 txn_options
.set_snapshot
= true;
491 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
494 // Write some data to the db
496 batch
.Put("foo", "foo");
497 batch
.Put(handles
[1], "AAA", "bar");
498 batch
.Put(handles
[1], "AAAZZZ", "bar");
499 s
= txn_db
->Write(write_options
, &batch
);
501 txn_db
->Delete(write_options
, handles
[1], "AAAZZZ");
503 // These keys do not conflict with existing writes since they're in
504 // different column families
506 txn
->GetForUpdate(snapshot_read_options
, handles
[1], "foo", &value
);
507 Slice
key_slice("AAAZZZ");
508 Slice value_slices
[2] = {Slice("bar"), Slice("bar")};
509 txn
->Put(handles
[2], SliceParts(&key_slice
, 1), SliceParts(value_slices
, 2));
511 ASSERT_EQ(3, txn
->GetNumKeys());
516 s
= txn_db
->Get(read_options
, "AAA", &value
);
517 ASSERT_TRUE(s
.IsNotFound());
518 s
= txn_db
->Get(read_options
, handles
[2], "AAAZZZ", &value
);
519 ASSERT_EQ(value
, "barbar");
521 Slice key_slices
[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
522 Slice
value_slice("barbarbar");
523 // This write will cause a conflict with the earlier batch write
524 txn2
->Put(handles
[1], SliceParts(key_slices
, 3), SliceParts(&value_slice
, 1));
526 txn2
->Delete(handles
[2], "XXX");
527 txn2
->Delete(handles
[1], "XXX");
528 s
= txn2
->GetForUpdate(snapshot_read_options
, handles
[1], "AAA", &value
);
529 ASSERT_TRUE(s
.IsNotFound());
531 // Verify txn did not commit
533 ASSERT_TRUE(s
.IsBusy());
534 s
= txn_db
->Get(read_options
, handles
[1], "AAAZZZ", &value
);
535 ASSERT_EQ(value
, "barbar");
540 txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
541 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
543 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
546 std::vector
<ColumnFamilyHandle
*> multiget_cfh
= {handles
[1], handles
[2],
547 handles
[0], handles
[2]};
548 std::vector
<Slice
> multiget_keys
= {"AAA", "AAAZZZ", "foo", "foo"};
549 std::vector
<std::string
> values(4);
551 std::vector
<Status
> results
= txn
->MultiGetForUpdate(
552 snapshot_read_options
, multiget_cfh
, multiget_keys
, &values
);
553 ASSERT_OK(results
[0]);
554 ASSERT_OK(results
[1]);
555 ASSERT_OK(results
[2]);
556 ASSERT_TRUE(results
[3].IsNotFound());
557 ASSERT_EQ(values
[0], "bar");
558 ASSERT_EQ(values
[1], "barbar");
559 ASSERT_EQ(values
[2], "foo");
561 txn
->Delete(handles
[2], "ZZZ");
562 txn
->Put(handles
[2], "ZZZ", "YYY");
563 txn
->Put(handles
[2], "ZZZ", "YYYY");
564 txn
->Delete(handles
[2], "ZZZ");
565 txn
->Put(handles
[2], "AAAZZZ", "barbarbar");
567 ASSERT_EQ(5, txn
->GetNumKeys());
572 s
= txn_db
->Get(read_options
, handles
[2], "ZZZ", &value
);
573 ASSERT_TRUE(s
.IsNotFound());
575 // Put a key which will conflict with the next txn using the previous snapshot
576 txn_db
->Put(write_options
, handles
[2], "foo", "000");
578 results
= txn2
->MultiGetForUpdate(snapshot_read_options
, multiget_cfh
,
579 multiget_keys
, &values
);
580 ASSERT_OK(results
[0]);
581 ASSERT_OK(results
[1]);
582 ASSERT_OK(results
[2]);
583 ASSERT_TRUE(results
[3].IsNotFound());
584 ASSERT_EQ(values
[0], "bar");
585 ASSERT_EQ(values
[1], "barbar");
586 ASSERT_EQ(values
[2], "foo");
588 // Verify Txn Did not Commit
590 ASSERT_TRUE(s
.IsBusy());
592 s
= txn_db
->DropColumnFamily(handles
[1]);
594 s
= txn_db
->DropColumnFamily(handles
[2]);
600 for (auto handle
: handles
) {
605 TEST_F(OptimisticTransactionTest
, EmptyTest
) {
606 WriteOptions write_options
;
607 ReadOptions read_options
;
611 s
= txn_db
->Put(write_options
, "aaa", "aaa");
614 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
619 txn
= txn_db
->BeginTransaction(write_options
);
623 txn
= txn_db
->BeginTransaction(write_options
);
624 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
625 ASSERT_EQ(value
, "aaa");
631 txn
= txn_db
->BeginTransaction(write_options
);
633 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
634 ASSERT_EQ(value
, "aaa");
636 s
= txn_db
->Put(write_options
, "aaa", "xxx");
638 ASSERT_TRUE(s
.IsBusy());
642 TEST_F(OptimisticTransactionTest
, PredicateManyPreceders
) {
643 WriteOptions write_options
;
644 ReadOptions read_options1
, read_options2
;
645 OptimisticTransactionOptions txn_options
;
649 txn_options
.set_snapshot
= true;
650 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
651 read_options1
.snapshot
= txn1
->GetSnapshot();
653 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
655 read_options2
.snapshot
= txn2
->GetSnapshot();
657 std::vector
<Slice
> multiget_keys
= {"1", "2", "3"};
658 std::vector
<std::string
> multiget_values
;
660 std::vector
<Status
> results
=
661 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
662 ASSERT_TRUE(results
[1].IsNotFound());
669 multiget_values
.clear();
671 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
672 ASSERT_TRUE(results
[1].IsNotFound());
674 // should not commit since txn2 wrote a key txn has read
676 ASSERT_TRUE(s
.IsBusy());
681 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
682 read_options1
.snapshot
= txn1
->GetSnapshot();
684 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
685 read_options2
.snapshot
= txn2
->GetSnapshot();
691 // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
695 s
= txn2
->GetForUpdate(read_options2
, "4", &value
);
696 ASSERT_TRUE(s
.IsNotFound());
698 // txn2 cannot commit since txn1 changed "4"
700 ASSERT_TRUE(s
.IsBusy());
706 TEST_F(OptimisticTransactionTest
, LostUpdate
) {
707 WriteOptions write_options
;
708 ReadOptions read_options
, read_options1
, read_options2
;
709 OptimisticTransactionOptions txn_options
;
713 // Test 2 transactions writing to the same key in multiple orders and
714 // with/without snapshots
716 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
);
717 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
726 ASSERT_TRUE(s
.IsBusy());
731 txn_options
.set_snapshot
= true;
732 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
733 read_options1
.snapshot
= txn1
->GetSnapshot();
735 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
736 read_options2
.snapshot
= txn2
->GetSnapshot();
745 ASSERT_TRUE(s
.IsBusy());
750 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
751 read_options1
.snapshot
= txn1
->GetSnapshot();
753 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
754 read_options2
.snapshot
= txn2
->GetSnapshot();
762 ASSERT_TRUE(s
.IsBusy());
767 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
768 read_options1
.snapshot
= txn1
->GetSnapshot();
770 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
771 read_options2
.snapshot
= txn2
->GetSnapshot();
785 txn1
= txn_db
->BeginTransaction(write_options
);
786 txn2
= txn_db
->BeginTransaction(write_options
);
799 s
= txn_db
->Get(read_options
, "1", &value
);
801 ASSERT_EQ(value
, "8");
804 TEST_F(OptimisticTransactionTest
, UntrackedWrites
) {
805 WriteOptions write_options
;
806 ReadOptions read_options
;
810 // Verify transaction rollback works for untracked keys.
811 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
812 txn
->PutUntracked("untracked", "0");
814 s
= txn_db
->Get(read_options
, "untracked", &value
);
815 ASSERT_TRUE(s
.IsNotFound());
818 txn
= txn_db
->BeginTransaction(write_options
);
820 txn
->Put("tracked", "1");
821 txn
->PutUntracked("untracked", "1");
822 txn
->MergeUntracked("untracked", "2");
823 txn
->DeleteUntracked("untracked");
825 // Write to the untracked key outside of the transaction and verify
826 // it doesn't prevent the transaction from committing.
827 s
= txn_db
->Put(write_options
, "untracked", "x");
833 s
= txn_db
->Get(read_options
, "untracked", &value
);
834 ASSERT_TRUE(s
.IsNotFound());
837 txn
= txn_db
->BeginTransaction(write_options
);
839 txn
->Put("tracked", "10");
840 txn
->PutUntracked("untracked", "A");
842 // Write to tracked key outside of the transaction and verify that the
843 // untracked keys are not written when the commit fails.
844 s
= txn_db
->Delete(write_options
, "tracked");
847 ASSERT_TRUE(s
.IsBusy());
849 s
= txn_db
->Get(read_options
, "untracked", &value
);
850 ASSERT_TRUE(s
.IsNotFound());
855 TEST_F(OptimisticTransactionTest
, IteratorTest
) {
856 WriteOptions write_options
;
857 ReadOptions read_options
, snapshot_read_options
;
858 OptimisticTransactionOptions txn_options
;
862 // Write some keys to the db
863 s
= txn_db
->Put(write_options
, "A", "a");
866 s
= txn_db
->Put(write_options
, "G", "g");
869 s
= txn_db
->Put(write_options
, "F", "f");
872 s
= txn_db
->Put(write_options
, "C", "c");
875 s
= txn_db
->Put(write_options
, "D", "d");
878 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
881 // Write some keys in a txn
882 s
= txn
->Put("B", "b");
885 s
= txn
->Put("H", "h");
888 s
= txn
->Delete("D");
891 s
= txn
->Put("E", "e");
895 const Snapshot
* snapshot
= txn
->GetSnapshot();
897 // Write some keys to the db after the snapshot
898 s
= txn_db
->Put(write_options
, "BB", "xx");
901 s
= txn_db
->Put(write_options
, "C", "xx");
904 read_options
.snapshot
= snapshot
;
905 Iterator
* iter
= txn
->GetIterator(read_options
);
906 ASSERT_OK(iter
->status());
909 // Read all keys via iter and lock them all
910 std::string results
[] = {"a", "b", "c", "e", "f", "g", "h"};
911 for (int i
= 0; i
< 7; i
++) {
912 ASSERT_OK(iter
->status());
913 ASSERT_TRUE(iter
->Valid());
914 ASSERT_EQ(results
[i
], iter
->value().ToString());
916 s
= txn
->GetForUpdate(read_options
, iter
->key(), nullptr);
921 ASSERT_FALSE(iter
->Valid());
924 ASSERT_OK(iter
->status());
925 ASSERT_TRUE(iter
->Valid());
926 ASSERT_EQ("g", iter
->value().ToString());
929 ASSERT_OK(iter
->status());
930 ASSERT_TRUE(iter
->Valid());
931 ASSERT_EQ("f", iter
->value().ToString());
934 ASSERT_OK(iter
->status());
935 ASSERT_TRUE(iter
->Valid());
936 ASSERT_EQ("e", iter
->value().ToString());
939 ASSERT_OK(iter
->status());
940 ASSERT_TRUE(iter
->Valid());
941 ASSERT_EQ("c", iter
->value().ToString());
944 ASSERT_OK(iter
->status());
945 ASSERT_TRUE(iter
->Valid());
946 ASSERT_EQ("e", iter
->value().ToString());
949 ASSERT_OK(iter
->status());
950 ASSERT_TRUE(iter
->Valid());
951 ASSERT_EQ("a", iter
->value().ToString());
954 ASSERT_OK(iter
->status());
955 ASSERT_FALSE(iter
->Valid());
958 ASSERT_OK(iter
->status());
959 ASSERT_TRUE(iter
->Valid());
960 ASSERT_EQ("h", iter
->value().ToString());
962 // key "C" was modified in the db after txn's snapshot. txn will not commit.
964 ASSERT_TRUE(s
.IsBusy());
970 TEST_F(OptimisticTransactionTest
, SavepointTest
) {
971 WriteOptions write_options
;
972 ReadOptions read_options
, snapshot_read_options
;
973 OptimisticTransactionOptions txn_options
;
977 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
980 s
= txn
->RollbackToSavePoint();
981 ASSERT_TRUE(s
.IsNotFound());
983 txn
->SetSavePoint(); // 1
985 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to beginning of txn
986 s
= txn
->RollbackToSavePoint();
987 ASSERT_TRUE(s
.IsNotFound());
989 s
= txn
->Put("B", "b");
995 s
= txn_db
->Get(read_options
, "B", &value
);
997 ASSERT_EQ("b", value
);
1000 txn
= txn_db
->BeginTransaction(write_options
);
1003 s
= txn
->Put("A", "a");
1006 s
= txn
->Put("B", "bb");
1009 s
= txn
->Put("C", "c");
1012 txn
->SetSavePoint(); // 2
1014 s
= txn
->Delete("B");
1017 s
= txn
->Put("C", "cc");
1020 s
= txn
->Put("D", "d");
1023 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 2
1025 s
= txn
->Get(read_options
, "A", &value
);
1027 ASSERT_EQ("a", value
);
1029 s
= txn
->Get(read_options
, "B", &value
);
1031 ASSERT_EQ("bb", value
);
1033 s
= txn
->Get(read_options
, "C", &value
);
1035 ASSERT_EQ("c", value
);
1037 s
= txn
->Get(read_options
, "D", &value
);
1038 ASSERT_TRUE(s
.IsNotFound());
1040 s
= txn
->Put("A", "a");
1043 s
= txn
->Put("E", "e");
1046 // Rollback to beginning of txn
1047 s
= txn
->RollbackToSavePoint();
1048 ASSERT_TRUE(s
.IsNotFound());
1051 s
= txn
->Get(read_options
, "A", &value
);
1052 ASSERT_TRUE(s
.IsNotFound());
1054 s
= txn
->Get(read_options
, "B", &value
);
1056 ASSERT_EQ("b", value
);
1058 s
= txn
->Get(read_options
, "D", &value
);
1059 ASSERT_TRUE(s
.IsNotFound());
1061 s
= txn
->Get(read_options
, "D", &value
);
1062 ASSERT_TRUE(s
.IsNotFound());
1064 s
= txn
->Get(read_options
, "E", &value
);
1065 ASSERT_TRUE(s
.IsNotFound());
1067 s
= txn
->Put("A", "aa");
1070 s
= txn
->Put("F", "f");
1073 txn
->SetSavePoint(); // 3
1074 txn
->SetSavePoint(); // 4
1076 s
= txn
->Put("G", "g");
1079 s
= txn
->Delete("F");
1082 s
= txn
->Delete("B");
1085 s
= txn
->Get(read_options
, "A", &value
);
1087 ASSERT_EQ("aa", value
);
1089 s
= txn
->Get(read_options
, "F", &value
);
1090 ASSERT_TRUE(s
.IsNotFound());
1092 s
= txn
->Get(read_options
, "B", &value
);
1093 ASSERT_TRUE(s
.IsNotFound());
1095 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 3
1097 s
= txn
->Get(read_options
, "F", &value
);
1099 ASSERT_EQ("f", value
);
1101 s
= txn
->Get(read_options
, "G", &value
);
1102 ASSERT_TRUE(s
.IsNotFound());
1107 s
= txn_db
->Get(read_options
, "F", &value
);
1109 ASSERT_EQ("f", value
);
1111 s
= txn_db
->Get(read_options
, "G", &value
);
1112 ASSERT_TRUE(s
.IsNotFound());
1114 s
= txn_db
->Get(read_options
, "A", &value
);
1116 ASSERT_EQ("aa", value
);
1118 s
= txn_db
->Get(read_options
, "B", &value
);
1120 ASSERT_EQ("b", value
);
1122 s
= txn_db
->Get(read_options
, "C", &value
);
1123 ASSERT_TRUE(s
.IsNotFound());
1125 s
= txn_db
->Get(read_options
, "D", &value
);
1126 ASSERT_TRUE(s
.IsNotFound());
1128 s
= txn_db
->Get(read_options
, "E", &value
);
1129 ASSERT_TRUE(s
.IsNotFound());
1134 TEST_F(OptimisticTransactionTest
, UndoGetForUpdateTest
) {
1135 WriteOptions write_options
;
1136 ReadOptions read_options
, snapshot_read_options
;
1137 OptimisticTransactionOptions txn_options
;
1141 txn_db
->Put(write_options
, "A", "");
1143 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
);
1146 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1149 txn1
->UndoGetForUpdate("A");
1151 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
1152 txn2
->Put("A", "x");
1157 // Verify that txn1 can commit since A isn't conflict checked
1162 txn1
= txn_db
->BeginTransaction(write_options
);
1163 txn1
->Put("A", "a");
1165 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1168 txn1
->UndoGetForUpdate("A");
1170 txn2
= txn_db
->BeginTransaction(write_options
);
1171 txn2
->Put("A", "x");
1176 // Verify that txn1 cannot commit since A will still be conflict checked
1178 ASSERT_TRUE(s
.IsBusy());
1181 txn1
= txn_db
->BeginTransaction(write_options
);
1183 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1185 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1188 txn1
->UndoGetForUpdate("A");
1190 txn2
= txn_db
->BeginTransaction(write_options
);
1191 txn2
->Put("A", "x");
1196 // Verify that txn1 cannot commit since A will still be conflict checked
1198 ASSERT_TRUE(s
.IsBusy());
1201 txn1
= txn_db
->BeginTransaction(write_options
);
1203 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1205 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1208 txn1
->UndoGetForUpdate("A");
1209 txn1
->UndoGetForUpdate("A");
1211 txn2
= txn_db
->BeginTransaction(write_options
);
1212 txn2
->Put("A", "x");
1217 // Verify that txn1 can commit since A isn't conflict checked
1222 txn1
= txn_db
->BeginTransaction(write_options
);
1224 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1227 txn1
->SetSavePoint();
1228 txn1
->UndoGetForUpdate("A");
1230 txn2
= txn_db
->BeginTransaction(write_options
);
1231 txn2
->Put("A", "x");
1236 // Verify that txn1 cannot commit since A will still be conflict checked
1238 ASSERT_TRUE(s
.IsBusy());
1241 txn1
= txn_db
->BeginTransaction(write_options
);
1243 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1246 txn1
->SetSavePoint();
1247 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1249 txn1
->UndoGetForUpdate("A");
1251 txn2
= txn_db
->BeginTransaction(write_options
);
1252 txn2
->Put("A", "x");
1257 // Verify that txn1 cannot commit since A will still be conflict checked
1259 ASSERT_TRUE(s
.IsBusy());
1262 txn1
= txn_db
->BeginTransaction(write_options
);
1264 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1267 txn1
->SetSavePoint();
1268 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1270 txn1
->UndoGetForUpdate("A");
1272 txn1
->RollbackToSavePoint();
1273 txn1
->UndoGetForUpdate("A");
1275 txn2
= txn_db
->BeginTransaction(write_options
);
1276 txn2
->Put("A", "x");
1281 // Verify that txn1 can commit since A isn't conflict checked
1288 Status
OptimisticTransactionStressTestInserter(OptimisticTransactionDB
* db
,
1289 const size_t num_transactions
,
1290 const size_t num_sets
,
1291 const size_t num_keys_per_set
) {
1292 size_t seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
1293 Random64
_rand(seed
);
1294 WriteOptions write_options
;
1295 ReadOptions read_options
;
1296 OptimisticTransactionOptions txn_options
;
1297 txn_options
.set_snapshot
= true;
1299 RandomTransactionInserter
inserter(&_rand
, write_options
, read_options
,
1301 static_cast<uint16_t>(num_sets
));
1303 for (size_t t
= 0; t
< num_transactions
; t
++) {
1304 bool success
= inserter
.OptimisticTransactionDBInsert(db
, txn_options
);
1306 // unexpected failure
1307 return inserter
.GetLastStatus();
1311 // Make sure at least some of the transactions succeeded. It's ok if
1312 // some failed due to write-conflicts.
1313 if (inserter
.GetFailureCount() > num_transactions
/ 2) {
1314 return Status::TryAgain("Too many transactions failed! " +
1315 std::to_string(inserter
.GetFailureCount()) + " / " +
1316 std::to_string(num_transactions
));
1319 return Status::OK();
1323 TEST_F(OptimisticTransactionTest
, OptimisticTransactionStressTest
) {
1324 const size_t num_threads
= 4;
1325 const size_t num_transactions_per_thread
= 10000;
1326 const size_t num_sets
= 3;
1327 const size_t num_keys_per_set
= 100;
1328 // Setting the key-space to be 100 keys should cause enough write-conflicts
1329 // to make this test interesting.
1331 std::vector
<port::Thread
> threads
;
1333 std::function
<void()> call_inserter
= [&] {
1334 ASSERT_OK(OptimisticTransactionStressTestInserter(
1335 txn_db
, num_transactions_per_thread
, num_sets
, num_keys_per_set
));
1338 // Create N threads that use RandomTransactionInserter to write
1339 // many transactions.
1340 for (uint32_t i
= 0; i
< num_threads
; i
++) {
1341 threads
.emplace_back(call_inserter
);
1344 // Wait for all threads to run
1345 for (auto& t
: threads
) {
1349 // Verify that data is consistent
1350 Status s
= RandomTransactionInserter::Verify(txn_db
, num_sets
);
1354 TEST_F(OptimisticTransactionTest
, SequenceNumberAfterRecoverTest
) {
1355 WriteOptions write_options
;
1356 OptimisticTransactionOptions transaction_options
;
1358 Transaction
* transaction(txn_db
->BeginTransaction(write_options
, transaction_options
));
1359 Status s
= transaction
->Put("foo", "val");
1361 s
= transaction
->Put("foo2", "val");
1363 s
= transaction
->Put("foo3", "val");
1365 s
= transaction
->Commit();
1370 transaction
= txn_db
->BeginTransaction(write_options
, transaction_options
);
1371 s
= transaction
->Put("bar", "val");
1373 s
= transaction
->Put("bar2", "val");
1375 s
= transaction
->Commit();
1381 } // namespace rocksdb
1383 int main(int argc
, char** argv
) {
1384 ::testing::InitGoogleTest(&argc
, argv
);
1385 return RUN_ALL_TESTS();
1391 int main(int /*argc*/, char** /*argv*/) {
1394 "SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n");
1398 #endif // !ROCKSDB_LITE