1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include "db/db_impl/db_impl.h"
13 #include "logging/logging.h"
14 #include "port/port.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/perf_context.h"
17 #include "rocksdb/utilities/optimistic_transaction_db.h"
18 #include "rocksdb/utilities/transaction.h"
19 #include "test_util/sync_point.h"
20 #include "test_util/testharness.h"
21 #include "test_util/transaction_test_util.h"
22 #include "util/crc32c.h"
23 #include "util/random.h"
27 namespace ROCKSDB_NAMESPACE
{
29 class OptimisticTransactionTest
30 : public testing::Test
,
31 public testing::WithParamInterface
<OccValidationPolicy
> {
33 OptimisticTransactionDB
* txn_db
;
37 OptimisticTransactionTest() {
38 options
.create_if_missing
= true;
39 options
.max_write_buffer_number
= 2;
40 options
.max_write_buffer_size_to_maintain
= 1600;
41 dbname
= test::PerThreadDBPath("optimistic_transaction_testdb");
43 DestroyDB(dbname
, options
);
46 ~OptimisticTransactionTest() override
{
48 DestroyDB(dbname
, options
);
59 ColumnFamilyOptions
cf_options(options
);
60 OptimisticTransactionDBOptions occ_opts
;
61 occ_opts
.validate_policy
= GetParam();
62 std::vector
<ColumnFamilyDescriptor
> column_families
;
63 std::vector
<ColumnFamilyHandle
*> handles
;
64 column_families
.push_back(
65 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
67 OptimisticTransactionDB::Open(DBOptions(options
), occ_opts
, dbname
,
68 column_families
, &handles
, &txn_db
);
71 assert(txn_db
!= nullptr);
72 assert(handles
.size() == 1);
77 TEST_P(OptimisticTransactionTest
, SuccessTest
) {
78 WriteOptions write_options
;
79 ReadOptions read_options
;
83 txn_db
->Put(write_options
, Slice("foo"), Slice("bar"));
84 txn_db
->Put(write_options
, Slice("foo2"), Slice("bar"));
86 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
89 txn
->GetForUpdate(read_options
, "foo", &value
);
90 ASSERT_EQ(value
, "bar");
92 txn
->Put(Slice("foo"), Slice("bar2"));
94 txn
->GetForUpdate(read_options
, "foo", &value
);
95 ASSERT_EQ(value
, "bar2");
100 txn_db
->Get(read_options
, "foo", &value
);
101 ASSERT_EQ(value
, "bar2");
106 TEST_P(OptimisticTransactionTest
, WriteConflictTest
) {
107 WriteOptions write_options
;
108 ReadOptions read_options
;
112 txn_db
->Put(write_options
, "foo", "bar");
113 txn_db
->Put(write_options
, "foo2", "bar");
115 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
118 txn
->Put("foo", "bar2");
120 // This Put outside of a transaction will conflict with the previous write
121 s
= txn_db
->Put(write_options
, "foo", "barz");
124 s
= txn_db
->Get(read_options
, "foo", &value
);
125 ASSERT_EQ(value
, "barz");
126 ASSERT_EQ(1, txn
->GetNumKeys());
129 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
131 // Verify that transaction did not write anything
132 txn_db
->Get(read_options
, "foo", &value
);
133 ASSERT_EQ(value
, "barz");
134 txn_db
->Get(read_options
, "foo2", &value
);
135 ASSERT_EQ(value
, "bar");
140 TEST_P(OptimisticTransactionTest
, WriteConflictTest2
) {
141 WriteOptions write_options
;
142 ReadOptions read_options
;
143 OptimisticTransactionOptions txn_options
;
147 txn_db
->Put(write_options
, "foo", "bar");
148 txn_db
->Put(write_options
, "foo2", "bar");
150 txn_options
.set_snapshot
= true;
151 Transaction
* txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
154 // This Put outside of a transaction will conflict with a later write
155 s
= txn_db
->Put(write_options
, "foo", "barz");
158 txn
->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
160 s
= txn_db
->Get(read_options
, "foo", &value
);
161 ASSERT_EQ(value
, "barz");
164 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
166 // Verify that transaction did not write anything
167 txn_db
->Get(read_options
, "foo", &value
);
168 ASSERT_EQ(value
, "barz");
169 txn_db
->Get(read_options
, "foo2", &value
);
170 ASSERT_EQ(value
, "bar");
175 TEST_P(OptimisticTransactionTest
, ReadConflictTest
) {
176 WriteOptions write_options
;
177 ReadOptions read_options
, snapshot_read_options
;
178 OptimisticTransactionOptions txn_options
;
182 txn_db
->Put(write_options
, "foo", "bar");
183 txn_db
->Put(write_options
, "foo2", "bar");
185 txn_options
.set_snapshot
= true;
186 Transaction
* txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
190 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
192 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
193 ASSERT_EQ(value
, "bar");
195 // This Put outside of a transaction will conflict with the previous read
196 s
= txn_db
->Put(write_options
, "foo", "barz");
199 s
= txn_db
->Get(read_options
, "foo", &value
);
200 ASSERT_EQ(value
, "barz");
203 ASSERT_TRUE(s
.IsBusy()); // Txn should not commit
205 // Verify that transaction did not write anything
206 txn
->GetForUpdate(read_options
, "foo", &value
);
207 ASSERT_EQ(value
, "barz");
208 txn
->GetForUpdate(read_options
, "foo2", &value
);
209 ASSERT_EQ(value
, "bar");
214 TEST_P(OptimisticTransactionTest
, TxnOnlyTest
) {
215 // Test to make sure transactions work when there are no other writes in an
218 WriteOptions write_options
;
219 ReadOptions read_options
;
223 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
234 TEST_P(OptimisticTransactionTest
, FlushTest
) {
235 WriteOptions write_options
;
236 ReadOptions read_options
, snapshot_read_options
;
240 txn_db
->Put(write_options
, Slice("foo"), Slice("bar"));
241 txn_db
->Put(write_options
, Slice("foo2"), Slice("bar"));
243 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
246 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
248 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
249 ASSERT_EQ(value
, "bar");
251 txn
->Put(Slice("foo"), Slice("bar2"));
253 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
254 ASSERT_EQ(value
, "bar2");
256 // Put a random key so we have a memtable to flush
257 s
= txn_db
->Put(write_options
, "dummy", "dummy");
260 // force a memtable flush
261 FlushOptions flush_ops
;
262 txn_db
->Flush(flush_ops
);
265 // txn should commit since the flushed table is still in MemtableList History
268 txn_db
->Get(read_options
, "foo", &value
);
269 ASSERT_EQ(value
, "bar2");
274 TEST_P(OptimisticTransactionTest
, FlushTest2
) {
275 WriteOptions write_options
;
276 ReadOptions read_options
, snapshot_read_options
;
280 txn_db
->Put(write_options
, Slice("foo"), Slice("bar"));
281 txn_db
->Put(write_options
, Slice("foo2"), Slice("bar"));
283 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
286 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
288 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
289 ASSERT_EQ(value
, "bar");
291 txn
->Put(Slice("foo"), Slice("bar2"));
293 txn
->GetForUpdate(snapshot_read_options
, "foo", &value
);
294 ASSERT_EQ(value
, "bar2");
296 // Put a random key so we have a MemTable to flush
297 s
= txn_db
->Put(write_options
, "dummy", "dummy");
300 // force a memtable flush
301 FlushOptions flush_ops
;
302 txn_db
->Flush(flush_ops
);
304 // Put a random key so we have a MemTable to flush
305 s
= txn_db
->Put(write_options
, "dummy", "dummy2");
308 // force a memtable flush
309 txn_db
->Flush(flush_ops
);
311 s
= txn_db
->Put(write_options
, "dummy", "dummy3");
314 // force a memtable flush
315 // Since our test db has max_write_buffer_number=2, this flush will cause
316 // the first memtable to get purged from the MemtableList history.
317 txn_db
->Flush(flush_ops
);
320 // txn should not commit since MemTableList History is not large enough
321 ASSERT_TRUE(s
.IsTryAgain());
323 txn_db
->Get(read_options
, "foo", &value
);
324 ASSERT_EQ(value
, "bar");
329 // Trigger the condition where some old memtables are skipped when doing
330 // TransactionUtil::CheckKey(), and make sure the result is still correct.
331 TEST_P(OptimisticTransactionTest
, CheckKeySkipOldMemtable
) {
332 const int kAttemptHistoryMemtable
= 0;
333 const int kAttemptImmMemTable
= 1;
334 for (int attempt
= kAttemptHistoryMemtable
; attempt
<= kAttemptImmMemTable
;
336 options
.max_write_buffer_number_to_maintain
= 3;
339 WriteOptions write_options
;
340 ReadOptions read_options
;
341 ReadOptions snapshot_read_options
;
342 ReadOptions snapshot_read_options2
;
346 ASSERT_OK(txn_db
->Put(write_options
, Slice("foo"), Slice("bar")));
347 ASSERT_OK(txn_db
->Put(write_options
, Slice("foo2"), Slice("bar")));
349 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
350 ASSERT_TRUE(txn
!= nullptr);
352 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
353 ASSERT_TRUE(txn2
!= nullptr);
355 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
356 ASSERT_OK(txn
->GetForUpdate(snapshot_read_options
, "foo", &value
));
357 ASSERT_EQ(value
, "bar");
358 ASSERT_OK(txn
->Put(Slice("foo"), Slice("bar2")));
360 snapshot_read_options2
.snapshot
= txn2
->GetSnapshot();
361 ASSERT_OK(txn2
->GetForUpdate(snapshot_read_options2
, "foo2", &value
));
362 ASSERT_EQ(value
, "bar");
363 ASSERT_OK(txn2
->Put(Slice("foo2"), Slice("bar2")));
365 // txn updates "foo" and txn2 updates "foo2", and now a write is
366 // issued for "foo", which conflicts with txn but not txn2
367 ASSERT_OK(txn_db
->Put(write_options
, "foo", "bar"));
369 if (attempt
== kAttemptImmMemTable
) {
370 // For the second attempt, hold flush from beginning. The memtable
371 // will be switched to immutable after calling TEST_SwitchMemtable()
372 // while CheckKey() is called.
373 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
374 {{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
375 "FlushJob::Start"}});
376 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
379 // force a memtable flush. The memtable should still be kept
380 FlushOptions flush_ops
;
381 if (attempt
== kAttemptHistoryMemtable
) {
382 ASSERT_OK(txn_db
->Flush(flush_ops
));
384 assert(attempt
== kAttemptImmMemTable
);
385 DBImpl
* db_impl
= static_cast<DBImpl
*>(txn_db
->GetRootDB());
386 db_impl
->TEST_SwitchMemtable();
388 uint64_t num_imm_mems
;
389 ASSERT_TRUE(txn_db
->GetIntProperty(DB::Properties::kNumImmutableMemTable
,
391 if (attempt
== kAttemptHistoryMemtable
) {
392 ASSERT_EQ(0, num_imm_mems
);
394 assert(attempt
== kAttemptImmMemTable
);
395 ASSERT_EQ(1, num_imm_mems
);
398 // Put something in active memtable
399 ASSERT_OK(txn_db
->Put(write_options
, Slice("foo3"), Slice("bar")));
401 // Create txn3 after flushing, when this transaction is committed,
402 // only need to check the active memtable
403 Transaction
* txn3
= txn_db
->BeginTransaction(write_options
);
404 ASSERT_TRUE(txn3
!= nullptr);
406 // Commit both txn and txn2. txn will conflict but txn2 will
407 // pass. In both cases, both memtables are queried.
408 SetPerfLevel(PerfLevel::kEnableCount
);
410 get_perf_context()->Reset();
412 // We should have checked two memtables
413 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
414 // txn should fail because of conflict, even if the memtable
415 // has flushed, because it is still preserved in history.
416 ASSERT_TRUE(s
.IsBusy());
418 get_perf_context()->Reset();
420 // We should have checked two memtables
421 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count
);
424 txn3
->Put(Slice("foo2"), Slice("bar2"));
425 get_perf_context()->Reset();
427 // txn3 is created after the active memtable is created, so that is the only
428 // memtable to check.
429 ASSERT_EQ(1, get_perf_context()->get_from_memtable_count
);
432 TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
433 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
435 SetPerfLevel(PerfLevel::kDisable
);
443 TEST_P(OptimisticTransactionTest
, NoSnapshotTest
) {
444 WriteOptions write_options
;
445 ReadOptions read_options
;
449 txn_db
->Put(write_options
, "AAA", "bar");
451 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
454 // Modify key after transaction start
455 txn_db
->Put(write_options
, "AAA", "bar1");
457 // Read and write without a snapshot
458 txn
->GetForUpdate(read_options
, "AAA", &value
);
459 ASSERT_EQ(value
, "bar1");
460 txn
->Put("AAA", "bar2");
462 // Should commit since read/write was done after data changed
466 txn
->GetForUpdate(read_options
, "AAA", &value
);
467 ASSERT_EQ(value
, "bar2");
472 TEST_P(OptimisticTransactionTest
, MultipleSnapshotTest
) {
473 WriteOptions write_options
;
474 ReadOptions read_options
, snapshot_read_options
;
478 txn_db
->Put(write_options
, "AAA", "bar");
479 txn_db
->Put(write_options
, "BBB", "bar");
480 txn_db
->Put(write_options
, "CCC", "bar");
482 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
485 txn_db
->Put(write_options
, "AAA", "bar1");
487 // Read and write without a snapshot
488 txn
->GetForUpdate(read_options
, "AAA", &value
);
489 ASSERT_EQ(value
, "bar1");
490 txn
->Put("AAA", "bar2");
492 // Modify BBB before snapshot is taken
493 txn_db
->Put(write_options
, "BBB", "bar1");
496 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
498 // Read and write with snapshot
499 txn
->GetForUpdate(snapshot_read_options
, "BBB", &value
);
500 ASSERT_EQ(value
, "bar1");
501 txn
->Put("BBB", "bar2");
503 txn_db
->Put(write_options
, "CCC", "bar1");
505 // Set a new snapshot
507 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
509 // Read and write with snapshot
510 txn
->GetForUpdate(snapshot_read_options
, "CCC", &value
);
511 ASSERT_EQ(value
, "bar1");
512 txn
->Put("CCC", "bar2");
514 s
= txn
->GetForUpdate(read_options
, "AAA", &value
);
516 ASSERT_EQ(value
, "bar2");
517 s
= txn
->GetForUpdate(read_options
, "BBB", &value
);
519 ASSERT_EQ(value
, "bar2");
520 s
= txn
->GetForUpdate(read_options
, "CCC", &value
);
522 ASSERT_EQ(value
, "bar2");
524 s
= txn_db
->Get(read_options
, "AAA", &value
);
526 ASSERT_EQ(value
, "bar1");
527 s
= txn_db
->Get(read_options
, "BBB", &value
);
529 ASSERT_EQ(value
, "bar1");
530 s
= txn_db
->Get(read_options
, "CCC", &value
);
532 ASSERT_EQ(value
, "bar1");
537 s
= txn_db
->Get(read_options
, "AAA", &value
);
539 ASSERT_EQ(value
, "bar2");
540 s
= txn_db
->Get(read_options
, "BBB", &value
);
542 ASSERT_EQ(value
, "bar2");
543 s
= txn_db
->Get(read_options
, "CCC", &value
);
545 ASSERT_EQ(value
, "bar2");
547 // verify that we track multiple writes to the same key at different snapshots
549 txn
= txn_db
->BeginTransaction(write_options
);
551 // Potentially conflicting writes
552 txn_db
->Put(write_options
, "ZZZ", "zzz");
553 txn_db
->Put(write_options
, "XXX", "xxx");
557 OptimisticTransactionOptions txn_options
;
558 txn_options
.set_snapshot
= true;
559 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
562 // This should not conflict in txn since the snapshot is later than the
563 // previous write (spoiler alert: it will later conflict with txn2).
564 txn
->Put("ZZZ", "zzzz");
570 // This will conflict since the snapshot is earlier than another write to ZZZ
571 txn2
->Put("ZZZ", "xxxxx");
574 ASSERT_TRUE(s
.IsBusy());
579 TEST_P(OptimisticTransactionTest
, ColumnFamiliesTest
) {
580 WriteOptions write_options
;
581 ReadOptions read_options
, snapshot_read_options
;
582 OptimisticTransactionOptions txn_options
;
586 ColumnFamilyHandle
*cfa
, *cfb
;
587 ColumnFamilyOptions cf_options
;
589 // Create 2 new column families
590 s
= txn_db
->CreateColumnFamily(cf_options
, "CFA", &cfa
);
592 s
= txn_db
->CreateColumnFamily(cf_options
, "CFB", &cfb
);
600 // open DB with three column families
601 std::vector
<ColumnFamilyDescriptor
> column_families
;
602 // have to open default column family
603 column_families
.push_back(
604 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
605 // open the new column families
606 column_families
.push_back(
607 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
608 column_families
.push_back(
609 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
610 std::vector
<ColumnFamilyHandle
*> handles
;
611 s
= OptimisticTransactionDB::Open(options
, dbname
, column_families
, &handles
,
614 assert(txn_db
!= nullptr);
616 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
620 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
622 txn_options
.set_snapshot
= true;
623 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
626 // Write some data to the db
628 batch
.Put("foo", "foo");
629 batch
.Put(handles
[1], "AAA", "bar");
630 batch
.Put(handles
[1], "AAAZZZ", "bar");
631 s
= txn_db
->Write(write_options
, &batch
);
633 txn_db
->Delete(write_options
, handles
[1], "AAAZZZ");
635 // These keys do not conflict with existing writes since they're in
636 // different column families
638 txn
->GetForUpdate(snapshot_read_options
, handles
[1], "foo", &value
);
639 Slice
key_slice("AAAZZZ");
640 Slice value_slices
[2] = {Slice("bar"), Slice("bar")};
641 txn
->Put(handles
[2], SliceParts(&key_slice
, 1), SliceParts(value_slices
, 2));
643 ASSERT_EQ(3, txn
->GetNumKeys());
648 s
= txn_db
->Get(read_options
, "AAA", &value
);
649 ASSERT_TRUE(s
.IsNotFound());
650 s
= txn_db
->Get(read_options
, handles
[2], "AAAZZZ", &value
);
651 ASSERT_EQ(value
, "barbar");
653 Slice key_slices
[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
654 Slice
value_slice("barbarbar");
655 // This write will cause a conflict with the earlier batch write
656 txn2
->Put(handles
[1], SliceParts(key_slices
, 3), SliceParts(&value_slice
, 1));
658 txn2
->Delete(handles
[2], "XXX");
659 txn2
->Delete(handles
[1], "XXX");
660 s
= txn2
->GetForUpdate(snapshot_read_options
, handles
[1], "AAA", &value
);
661 ASSERT_TRUE(s
.IsNotFound());
663 // Verify txn did not commit
665 ASSERT_TRUE(s
.IsBusy());
666 s
= txn_db
->Get(read_options
, handles
[1], "AAAZZZ", &value
);
667 ASSERT_EQ(value
, "barbar");
672 txn
= txn_db
->BeginTransaction(write_options
, txn_options
);
673 snapshot_read_options
.snapshot
= txn
->GetSnapshot();
675 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
678 std::vector
<ColumnFamilyHandle
*> multiget_cfh
= {handles
[1], handles
[2],
679 handles
[0], handles
[2]};
680 std::vector
<Slice
> multiget_keys
= {"AAA", "AAAZZZ", "foo", "foo"};
681 std::vector
<std::string
> values(4);
683 std::vector
<Status
> results
= txn
->MultiGetForUpdate(
684 snapshot_read_options
, multiget_cfh
, multiget_keys
, &values
);
685 ASSERT_OK(results
[0]);
686 ASSERT_OK(results
[1]);
687 ASSERT_OK(results
[2]);
688 ASSERT_TRUE(results
[3].IsNotFound());
689 ASSERT_EQ(values
[0], "bar");
690 ASSERT_EQ(values
[1], "barbar");
691 ASSERT_EQ(values
[2], "foo");
693 txn
->Delete(handles
[2], "ZZZ");
694 txn
->Put(handles
[2], "ZZZ", "YYY");
695 txn
->Put(handles
[2], "ZZZ", "YYYY");
696 txn
->Delete(handles
[2], "ZZZ");
697 txn
->Put(handles
[2], "AAAZZZ", "barbarbar");
699 ASSERT_EQ(5, txn
->GetNumKeys());
704 s
= txn_db
->Get(read_options
, handles
[2], "ZZZ", &value
);
705 ASSERT_TRUE(s
.IsNotFound());
707 // Put a key which will conflict with the next txn using the previous snapshot
708 txn_db
->Put(write_options
, handles
[2], "foo", "000");
710 results
= txn2
->MultiGetForUpdate(snapshot_read_options
, multiget_cfh
,
711 multiget_keys
, &values
);
712 ASSERT_OK(results
[0]);
713 ASSERT_OK(results
[1]);
714 ASSERT_OK(results
[2]);
715 ASSERT_TRUE(results
[3].IsNotFound());
716 ASSERT_EQ(values
[0], "bar");
717 ASSERT_EQ(values
[1], "barbar");
718 ASSERT_EQ(values
[2], "foo");
720 // Verify txn did not commit
722 ASSERT_TRUE(s
.IsBusy());
724 s
= txn_db
->DropColumnFamily(handles
[1]);
726 s
= txn_db
->DropColumnFamily(handles
[2]);
732 for (auto handle
: handles
) {
737 TEST_P(OptimisticTransactionTest
, EmptyTest
) {
738 WriteOptions write_options
;
739 ReadOptions read_options
;
743 s
= txn_db
->Put(write_options
, "aaa", "aaa");
746 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
751 txn
= txn_db
->BeginTransaction(write_options
);
755 txn
= txn_db
->BeginTransaction(write_options
);
756 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
757 ASSERT_EQ(value
, "aaa");
763 txn
= txn_db
->BeginTransaction(write_options
);
765 s
= txn
->GetForUpdate(read_options
, "aaa", &value
);
766 ASSERT_EQ(value
, "aaa");
768 s
= txn_db
->Put(write_options
, "aaa", "xxx");
770 ASSERT_TRUE(s
.IsBusy());
774 TEST_P(OptimisticTransactionTest
, PredicateManyPreceders
) {
775 WriteOptions write_options
;
776 ReadOptions read_options1
, read_options2
;
777 OptimisticTransactionOptions txn_options
;
781 txn_options
.set_snapshot
= true;
782 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
783 read_options1
.snapshot
= txn1
->GetSnapshot();
785 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
787 read_options2
.snapshot
= txn2
->GetSnapshot();
789 std::vector
<Slice
> multiget_keys
= {"1", "2", "3"};
790 std::vector
<std::string
> multiget_values
;
792 std::vector
<Status
> results
=
793 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
794 ASSERT_TRUE(results
[1].IsNotFound());
801 multiget_values
.clear();
803 txn1
->MultiGetForUpdate(read_options1
, multiget_keys
, &multiget_values
);
804 ASSERT_TRUE(results
[1].IsNotFound());
806 // should not commit since txn2 wrote a key txn1 has read
808 ASSERT_TRUE(s
.IsBusy());
813 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
814 read_options1
.snapshot
= txn1
->GetSnapshot();
816 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
817 read_options2
.snapshot
= txn2
->GetSnapshot();
823 // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
827 s
= txn2
->GetForUpdate(read_options2
, "4", &value
);
828 ASSERT_TRUE(s
.IsNotFound());
830 // txn2 cannot commit since txn1 changed "4"
832 ASSERT_TRUE(s
.IsBusy());
838 TEST_P(OptimisticTransactionTest
, LostUpdate
) {
839 WriteOptions write_options
;
840 ReadOptions read_options
, read_options1
, read_options2
;
841 OptimisticTransactionOptions txn_options
;
845 // Test 2 transactions writing to the same key in multiple orders and
846 // with/without snapshots
848 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
);
849 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
858 ASSERT_TRUE(s
.IsBusy());
863 txn_options
.set_snapshot
= true;
864 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
865 read_options1
.snapshot
= txn1
->GetSnapshot();
867 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
868 read_options2
.snapshot
= txn2
->GetSnapshot();
877 ASSERT_TRUE(s
.IsBusy());
882 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
883 read_options1
.snapshot
= txn1
->GetSnapshot();
885 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
886 read_options2
.snapshot
= txn2
->GetSnapshot();
894 ASSERT_TRUE(s
.IsBusy());
899 txn1
= txn_db
->BeginTransaction(write_options
, txn_options
);
900 read_options1
.snapshot
= txn1
->GetSnapshot();
902 txn2
= txn_db
->BeginTransaction(write_options
, txn_options
);
903 read_options2
.snapshot
= txn2
->GetSnapshot();
917 txn1
= txn_db
->BeginTransaction(write_options
);
918 txn2
= txn_db
->BeginTransaction(write_options
);
931 s
= txn_db
->Get(read_options
, "1", &value
);
933 ASSERT_EQ(value
, "8");
936 TEST_P(OptimisticTransactionTest
, UntrackedWrites
) {
937 WriteOptions write_options
;
938 ReadOptions read_options
;
942 // Verify transaction rollback works for untracked keys.
943 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
944 txn
->PutUntracked("untracked", "0");
946 s
= txn_db
->Get(read_options
, "untracked", &value
);
947 ASSERT_TRUE(s
.IsNotFound());
950 txn
= txn_db
->BeginTransaction(write_options
);
952 txn
->Put("tracked", "1");
953 txn
->PutUntracked("untracked", "1");
954 txn
->MergeUntracked("untracked", "2");
955 txn
->DeleteUntracked("untracked");
957 // Write to the untracked key outside of the transaction and verify
958 // it doesn't prevent the transaction from committing.
959 s
= txn_db
->Put(write_options
, "untracked", "x");
965 s
= txn_db
->Get(read_options
, "untracked", &value
);
966 ASSERT_TRUE(s
.IsNotFound());
969 txn
= txn_db
->BeginTransaction(write_options
);
971 txn
->Put("tracked", "10");
972 txn
->PutUntracked("untracked", "A");
974 // Write to tracked key outside of the transaction and verify that the
975 // untracked keys are not written when the commit fails.
976 s
= txn_db
->Delete(write_options
, "tracked");
979 ASSERT_TRUE(s
.IsBusy());
981 s
= txn_db
->Get(read_options
, "untracked", &value
);
982 ASSERT_TRUE(s
.IsNotFound());
987 TEST_P(OptimisticTransactionTest
, IteratorTest
) {
988 WriteOptions write_options
;
989 ReadOptions read_options
, snapshot_read_options
;
990 OptimisticTransactionOptions txn_options
;
994 // Write some keys to the db
995 s
= txn_db
->Put(write_options
, "A", "a");
998 s
= txn_db
->Put(write_options
, "G", "g");
1001 s
= txn_db
->Put(write_options
, "F", "f");
1004 s
= txn_db
->Put(write_options
, "C", "c");
1007 s
= txn_db
->Put(write_options
, "D", "d");
1010 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
1013 // Write some keys in a txn
1014 s
= txn
->Put("B", "b");
1017 s
= txn
->Put("H", "h");
1020 s
= txn
->Delete("D");
1023 s
= txn
->Put("E", "e");
1027 const Snapshot
* snapshot
= txn
->GetSnapshot();
1029 // Write some keys to the db after the snapshot
1030 s
= txn_db
->Put(write_options
, "BB", "xx");
1033 s
= txn_db
->Put(write_options
, "C", "xx");
1036 read_options
.snapshot
= snapshot
;
1037 Iterator
* iter
= txn
->GetIterator(read_options
);
1038 ASSERT_OK(iter
->status());
1039 iter
->SeekToFirst();
1041 // Read all keys via iter and lock them all
1042 std::string results
[] = {"a", "b", "c", "e", "f", "g", "h"};
1043 for (int i
= 0; i
< 7; i
++) {
1044 ASSERT_OK(iter
->status());
1045 ASSERT_TRUE(iter
->Valid());
1046 ASSERT_EQ(results
[i
], iter
->value().ToString());
1048 s
= txn
->GetForUpdate(read_options
, iter
->key(), nullptr);
1053 ASSERT_FALSE(iter
->Valid());
1056 ASSERT_OK(iter
->status());
1057 ASSERT_TRUE(iter
->Valid());
1058 ASSERT_EQ("g", iter
->value().ToString());
1061 ASSERT_OK(iter
->status());
1062 ASSERT_TRUE(iter
->Valid());
1063 ASSERT_EQ("f", iter
->value().ToString());
1066 ASSERT_OK(iter
->status());
1067 ASSERT_TRUE(iter
->Valid());
1068 ASSERT_EQ("e", iter
->value().ToString());
1071 ASSERT_OK(iter
->status());
1072 ASSERT_TRUE(iter
->Valid());
1073 ASSERT_EQ("c", iter
->value().ToString());
1076 ASSERT_OK(iter
->status());
1077 ASSERT_TRUE(iter
->Valid());
1078 ASSERT_EQ("e", iter
->value().ToString());
1081 ASSERT_OK(iter
->status());
1082 ASSERT_TRUE(iter
->Valid());
1083 ASSERT_EQ("a", iter
->value().ToString());
1086 ASSERT_OK(iter
->status());
1087 ASSERT_FALSE(iter
->Valid());
1090 ASSERT_OK(iter
->status());
1091 ASSERT_TRUE(iter
->Valid());
1092 ASSERT_EQ("h", iter
->value().ToString());
1094 // key "C" was modified in the db after txn's snapshot. txn will not commit.
1096 ASSERT_TRUE(s
.IsBusy());
1102 TEST_P(OptimisticTransactionTest
, SavepointTest
) {
1103 WriteOptions write_options
;
1104 ReadOptions read_options
, snapshot_read_options
;
1105 OptimisticTransactionOptions txn_options
;
1109 Transaction
* txn
= txn_db
->BeginTransaction(write_options
);
1112 s
= txn
->RollbackToSavePoint();
1113 ASSERT_TRUE(s
.IsNotFound());
1115 txn
->SetSavePoint(); // 1
1117 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to beginning of txn
1118 s
= txn
->RollbackToSavePoint();
1119 ASSERT_TRUE(s
.IsNotFound());
1121 s
= txn
->Put("B", "b");
1127 s
= txn_db
->Get(read_options
, "B", &value
);
1129 ASSERT_EQ("b", value
);
1132 txn
= txn_db
->BeginTransaction(write_options
);
1135 s
= txn
->Put("A", "a");
1138 s
= txn
->Put("B", "bb");
1141 s
= txn
->Put("C", "c");
1144 txn
->SetSavePoint(); // 2
1146 s
= txn
->Delete("B");
1149 s
= txn
->Put("C", "cc");
1152 s
= txn
->Put("D", "d");
1155 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 2
1157 s
= txn
->Get(read_options
, "A", &value
);
1159 ASSERT_EQ("a", value
);
1161 s
= txn
->Get(read_options
, "B", &value
);
1163 ASSERT_EQ("bb", value
);
1165 s
= txn
->Get(read_options
, "C", &value
);
1167 ASSERT_EQ("c", value
);
1169 s
= txn
->Get(read_options
, "D", &value
);
1170 ASSERT_TRUE(s
.IsNotFound());
1172 s
= txn
->Put("A", "a");
1175 s
= txn
->Put("E", "e");
1178 // Rollback to beginning of txn
1179 s
= txn
->RollbackToSavePoint();
1180 ASSERT_TRUE(s
.IsNotFound());
1183 s
= txn
->Get(read_options
, "A", &value
);
1184 ASSERT_TRUE(s
.IsNotFound());
1186 s
= txn
->Get(read_options
, "B", &value
);
1188 ASSERT_EQ("b", value
);
1190 s
= txn
->Get(read_options
, "D", &value
);
1191 ASSERT_TRUE(s
.IsNotFound());
1193 s
= txn
->Get(read_options
, "D", &value
);
1194 ASSERT_TRUE(s
.IsNotFound());
1196 s
= txn
->Get(read_options
, "E", &value
);
1197 ASSERT_TRUE(s
.IsNotFound());
1199 s
= txn
->Put("A", "aa");
1202 s
= txn
->Put("F", "f");
1205 txn
->SetSavePoint(); // 3
1206 txn
->SetSavePoint(); // 4
1208 s
= txn
->Put("G", "g");
1211 s
= txn
->Delete("F");
1214 s
= txn
->Delete("B");
1217 s
= txn
->Get(read_options
, "A", &value
);
1219 ASSERT_EQ("aa", value
);
1221 s
= txn
->Get(read_options
, "F", &value
);
1222 ASSERT_TRUE(s
.IsNotFound());
1224 s
= txn
->Get(read_options
, "B", &value
);
1225 ASSERT_TRUE(s
.IsNotFound());
1227 ASSERT_OK(txn
->RollbackToSavePoint()); // Rollback to 3
1229 s
= txn
->Get(read_options
, "F", &value
);
1231 ASSERT_EQ("f", value
);
1233 s
= txn
->Get(read_options
, "G", &value
);
1234 ASSERT_TRUE(s
.IsNotFound());
1239 s
= txn_db
->Get(read_options
, "F", &value
);
1241 ASSERT_EQ("f", value
);
1243 s
= txn_db
->Get(read_options
, "G", &value
);
1244 ASSERT_TRUE(s
.IsNotFound());
1246 s
= txn_db
->Get(read_options
, "A", &value
);
1248 ASSERT_EQ("aa", value
);
1250 s
= txn_db
->Get(read_options
, "B", &value
);
1252 ASSERT_EQ("b", value
);
1254 s
= txn_db
->Get(read_options
, "C", &value
);
1255 ASSERT_TRUE(s
.IsNotFound());
1257 s
= txn_db
->Get(read_options
, "D", &value
);
1258 ASSERT_TRUE(s
.IsNotFound());
1260 s
= txn_db
->Get(read_options
, "E", &value
);
1261 ASSERT_TRUE(s
.IsNotFound());
// Exercises Transaction::UndoGetForUpdate on an optimistic transaction.
// For several combinations of GetForUpdate, Put, save points and undo calls
// on key "A" in txn1, a competing transaction (txn2) writes "A", and the test
// then checks whether txn1 is still conflict checked on that key.
1266 TEST_P(OptimisticTransactionTest
, UndoGetForUpdateTest
) {
1267 WriteOptions write_options
;
1268 ReadOptions read_options
, snapshot_read_options
;
1269 OptimisticTransactionOptions txn_options
;
// Seed key "A" with an empty value directly through the DB.
1273 txn_db
->Put(write_options
, "A", "");
// Case 1: a single GetForUpdate("A") undone by a single UndoGetForUpdate("A").
1275 Transaction
* txn1
= txn_db
->BeginTransaction(write_options
);
1278 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1281 txn1
->UndoGetForUpdate("A");
// A competing transaction writes "A" while txn1 is still open.
1283 Transaction
* txn2
= txn_db
->BeginTransaction(write_options
);
1284 txn2
->Put("A", "x");
1289 // Verify that txn1 can commit since A isn't conflict checked
// Case 2: txn1 itself writes "A" before the GetForUpdate/UndoGetForUpdate
// pair, so the undo alone is not expected to drop the conflict check.
1294 txn1
= txn_db
->BeginTransaction(write_options
);
1295 txn1
->Put("A", "a");
1297 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1300 txn1
->UndoGetForUpdate("A");
1302 txn2
= txn_db
->BeginTransaction(write_options
);
1303 txn2
->Put("A", "x");
1308 // Verify that txn1 cannot commit since A will still be conflict checked
1310 ASSERT_TRUE(s
.IsBusy());
// Case 3: two GetForUpdate("A") calls but only one UndoGetForUpdate("A").
1313 txn1
= txn_db
->BeginTransaction(write_options
);
1315 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1317 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1320 txn1
->UndoGetForUpdate("A");
1322 txn2
= txn_db
->BeginTransaction(write_options
);
1323 txn2
->Put("A", "x");
1328 // Verify that txn1 cannot commit since A will still be conflict checked
1330 ASSERT_TRUE(s
.IsBusy());
// Case 4: two GetForUpdate("A") calls matched by two UndoGetForUpdate("A")
// calls.
1333 txn1
= txn_db
->BeginTransaction(write_options
);
1335 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1337 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1340 txn1
->UndoGetForUpdate("A");
1341 txn1
->UndoGetForUpdate("A");
1343 txn2
= txn_db
->BeginTransaction(write_options
);
1344 txn2
->Put("A", "x");
1349 // Verify that txn1 can commit since A isn't conflict checked
// Case 5: GetForUpdate("A") happens before SetSavePoint(), and the undo is
// issued after the save point.
1354 txn1
= txn_db
->BeginTransaction(write_options
);
1356 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1359 txn1
->SetSavePoint();
1360 txn1
->UndoGetForUpdate("A");
1362 txn2
= txn_db
->BeginTransaction(write_options
);
1363 txn2
->Put("A", "x");
1368 // Verify that txn1 cannot commit since A will still be conflict checked
1370 ASSERT_TRUE(s
.IsBusy());
// Case 6: GetForUpdate("A") both before and after SetSavePoint(), followed by
// a single UndoGetForUpdate("A").
1373 txn1
= txn_db
->BeginTransaction(write_options
);
1375 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1378 txn1
->SetSavePoint();
1379 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1381 txn1
->UndoGetForUpdate("A");
1383 txn2
= txn_db
->BeginTransaction(write_options
);
1384 txn2
->Put("A", "x");
1389 // Verify that txn1 cannot commit since A will still be conflict checked
1391 ASSERT_TRUE(s
.IsBusy());
// Case 7: same as case 6, but the save point is then rolled back and a second
// UndoGetForUpdate("A") is issued for the read taken before the save point.
1394 txn1
= txn_db
->BeginTransaction(write_options
);
1396 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1399 txn1
->SetSavePoint();
1400 s
= txn1
->GetForUpdate(read_options
, "A", &value
);
1402 txn1
->UndoGetForUpdate("A");
1404 txn1
->RollbackToSavePoint();
1405 txn1
->UndoGetForUpdate("A");
1407 txn2
= txn_db
->BeginTransaction(write_options
);
1408 txn2
->Put("A", "x");
1413 // Verify that txn1 can commit since A isn't conflict checked
// Runs `num_transactions` random optimistic transactions against `db` through
// a RandomTransactionInserter.
//
// Returns the inserter's last status when an insert reports an unexpected
// failure, Status::TryAgain when more than half of the transactions failed,
// and Status::OK otherwise.
1420 Status
OptimisticTransactionStressTestInserter(OptimisticTransactionDB
* db
,
1421 const size_t num_transactions
,
1422 const size_t num_sets
,
1423 const size_t num_keys_per_set
) {
// Seed the random generator from a hash of the calling thread's id.
1424 size_t seed
= std::hash
<std::thread::id
>()(std::this_thread::get_id());
1425 Random64
_rand(seed
);
1426 WriteOptions write_options
;
1427 ReadOptions read_options
;
1428 OptimisticTransactionOptions txn_options
;
// set_snapshot is enabled for every transaction the inserter starts with
// these options.
1429 txn_options
.set_snapshot
= true;
1431 RandomTransactionInserter
inserter(&_rand
, write_options
, read_options
,
1433 static_cast<uint16_t>(num_sets
));
// One random transaction per iteration; `success` records its outcome.
1435 for (size_t t
= 0; t
< num_transactions
; t
++) {
1436 bool success
= inserter
.OptimisticTransactionDBInsert(db
, txn_options
);
1438 // unexpected failure
1439 return inserter
.GetLastStatus();
1443 // Make sure at least some of the transactions succeeded. It's ok if
1444 // some failed due to write-conflicts.
1445 if (inserter
.GetFailureCount() > num_transactions
/ 2) {
1446 return Status::TryAgain("Too many transactions failed! " +
1447 std::to_string(inserter
.GetFailureCount()) + " / " +
1448 std::to_string(num_transactions
));
1451 return Status::OK();
// Multi-threaded stress test: several threads each run
// OptimisticTransactionStressTestInserter against the shared txn_db, and
// RandomTransactionInserter::Verify is then run over the written data.
1455 TEST_P(OptimisticTransactionTest
, OptimisticTransactionStressTest
) {
1456 const size_t num_threads
= 4;
1457 const size_t num_transactions_per_thread
= 10000;
1458 const size_t num_sets
= 3;
1459 const size_t num_keys_per_set
= 100;
1460 // Setting the key-space to be 100 keys should cause enough write-conflicts
1461 // to make this test interesting.
1463 std::vector
<port::Thread
> threads
;
// Per-thread body. It captures txn_db and the constants above by reference
// and asserts that the inserter helper returned an OK status.
1465 std::function
<void()> call_inserter
= [&] {
1466 ASSERT_OK(OptimisticTransactionStressTestInserter(
1467 txn_db
, num_transactions_per_thread
, num_sets
, num_keys_per_set
));
1470 // Create N threads that use RandomTransactionInserter to write
1471 // many transactions.
1472 for (uint32_t i
= 0; i
< num_threads
; i
++) {
1473 threads
.emplace_back(call_inserter
);
1476 // Wait for all threads to run
1477 for (auto& t
: threads
) {
1481 // Verify that data is consistent
1482 Status s
= RandomTransactionInserter::Verify(txn_db
, num_sets
);
// Commits one transaction that writes "foo", "foo2" and "foo3", then starts a
// second transaction that writes "bar" and "bar2" and commits it as well.
// NOTE(review): the test name implies a recovery/reopen step between the two
// transactions; confirm it happens before the second BeginTransaction.
1486 TEST_P(OptimisticTransactionTest
, SequenceNumberAfterRecoverTest
) {
1487 WriteOptions write_options
;
1488 OptimisticTransactionOptions transaction_options
;
// First transaction: three puts followed by a commit.
1490 Transaction
* transaction(txn_db
->BeginTransaction(write_options
, transaction_options
));
1491 Status s
= transaction
->Put("foo", "val");
1493 s
= transaction
->Put("foo2", "val");
1495 s
= transaction
->Put("foo3", "val");
1497 s
= transaction
->Commit();
// Second transaction: reuses the `transaction` pointer for a new transaction
// with two puts and a commit.
1502 transaction
= txn_db
->BeginTransaction(write_options
, transaction_options
);
1503 s
= transaction
->Put("bar", "val");
1505 s
= transaction
->Put("bar2", "val");
1507 s
= transaction
->Commit();
// Instantiates every TEST_P in OptimisticTransactionTest once per OCC
// validation policy: kValidateSerial and kValidateParallel. The fixture reads
// the policy through GetParam() when it opens the database.
1513 INSTANTIATE_TEST_CASE_P(
1514 InstanceOccGroup
, OptimisticTransactionTest
,
1515 testing::Values(OccValidationPolicy::kValidateSerial
,
1516 OccValidationPolicy::kValidateParallel
));
1518 } // namespace ROCKSDB_NAMESPACE
// Test binary entry point: hands the command-line arguments to GoogleTest and
// returns the result of running all registered tests.
1520 int main(int argc
, char** argv
) {
1521 ::testing::InitGoogleTest(&argc
, argv
);
1522 return RUN_ALL_TESTS();
// Fallback entry point for ROCKSDB_LITE builds. Both arguments are unused;
// the message below states that the test is skipped in that configuration.
1528 int main(int /*argc*/, char** /*argv*/) {
1531 "SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n");
1535 #endif // !ROCKSDB_LITE