]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/optimistic_transaction_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / optimistic_transaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <functional>
9 #include <string>
10 #include <thread>
11
12 #include "db/db_impl/db_impl.h"
13 #include "logging/logging.h"
14 #include "port/port.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/perf_context.h"
17 #include "rocksdb/utilities/optimistic_transaction_db.h"
18 #include "rocksdb/utilities/transaction.h"
19 #include "test_util/sync_point.h"
20 #include "test_util/testharness.h"
21 #include "test_util/transaction_test_util.h"
22 #include "util/crc32c.h"
23 #include "util/random.h"
24
25 using std::string;
26
27 namespace ROCKSDB_NAMESPACE {
28
29 class OptimisticTransactionTest
30 : public testing::Test,
31 public testing::WithParamInterface<OccValidationPolicy> {
32 public:
33 OptimisticTransactionDB* txn_db;
34 string dbname;
35 Options options;
36
37 OptimisticTransactionTest() {
38 options.create_if_missing = true;
39 options.max_write_buffer_number = 2;
40 options.max_write_buffer_size_to_maintain = 1600;
41 dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
42
43 DestroyDB(dbname, options);
44 Open();
45 }
46 ~OptimisticTransactionTest() override {
47 delete txn_db;
48 DestroyDB(dbname, options);
49 }
50
51 void Reopen() {
52 delete txn_db;
53 txn_db = nullptr;
54 Open();
55 }
56
57 private:
58 void Open() {
59 ColumnFamilyOptions cf_options(options);
60 OptimisticTransactionDBOptions occ_opts;
61 occ_opts.validate_policy = GetParam();
62 std::vector<ColumnFamilyDescriptor> column_families;
63 std::vector<ColumnFamilyHandle*> handles;
64 column_families.push_back(
65 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
66 Status s =
67 OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname,
68 column_families, &handles, &txn_db);
69
70 assert(s.ok());
71 assert(txn_db != nullptr);
72 assert(handles.size() == 1);
73 delete handles[0];
74 }
75 };
76
77 TEST_P(OptimisticTransactionTest, SuccessTest) {
78 WriteOptions write_options;
79 ReadOptions read_options;
80 string value;
81 Status s;
82
83 txn_db->Put(write_options, Slice("foo"), Slice("bar"));
84 txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
85
86 Transaction* txn = txn_db->BeginTransaction(write_options);
87 ASSERT_TRUE(txn);
88
89 txn->GetForUpdate(read_options, "foo", &value);
90 ASSERT_EQ(value, "bar");
91
92 txn->Put(Slice("foo"), Slice("bar2"));
93
94 txn->GetForUpdate(read_options, "foo", &value);
95 ASSERT_EQ(value, "bar2");
96
97 s = txn->Commit();
98 ASSERT_OK(s);
99
100 txn_db->Get(read_options, "foo", &value);
101 ASSERT_EQ(value, "bar2");
102
103 delete txn;
104 }
105
106 TEST_P(OptimisticTransactionTest, WriteConflictTest) {
107 WriteOptions write_options;
108 ReadOptions read_options;
109 string value;
110 Status s;
111
112 txn_db->Put(write_options, "foo", "bar");
113 txn_db->Put(write_options, "foo2", "bar");
114
115 Transaction* txn = txn_db->BeginTransaction(write_options);
116 ASSERT_TRUE(txn);
117
118 txn->Put("foo", "bar2");
119
120 // This Put outside of a transaction will conflict with the previous write
121 s = txn_db->Put(write_options, "foo", "barz");
122 ASSERT_OK(s);
123
124 s = txn_db->Get(read_options, "foo", &value);
125 ASSERT_EQ(value, "barz");
126 ASSERT_EQ(1, txn->GetNumKeys());
127
128 s = txn->Commit();
129 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
130
131 // Verify that transaction did not write anything
132 txn_db->Get(read_options, "foo", &value);
133 ASSERT_EQ(value, "barz");
134 txn_db->Get(read_options, "foo2", &value);
135 ASSERT_EQ(value, "bar");
136
137 delete txn;
138 }
139
140 TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
141 WriteOptions write_options;
142 ReadOptions read_options;
143 OptimisticTransactionOptions txn_options;
144 string value;
145 Status s;
146
147 txn_db->Put(write_options, "foo", "bar");
148 txn_db->Put(write_options, "foo2", "bar");
149
150 txn_options.set_snapshot = true;
151 Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
152 ASSERT_TRUE(txn);
153
154 // This Put outside of a transaction will conflict with a later write
155 s = txn_db->Put(write_options, "foo", "barz");
156 ASSERT_OK(s);
157
158 txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
159
160 s = txn_db->Get(read_options, "foo", &value);
161 ASSERT_EQ(value, "barz");
162
163 s = txn->Commit();
164 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
165
166 // Verify that transaction did not write anything
167 txn_db->Get(read_options, "foo", &value);
168 ASSERT_EQ(value, "barz");
169 txn_db->Get(read_options, "foo2", &value);
170 ASSERT_EQ(value, "bar");
171
172 delete txn;
173 }
174
175 TEST_P(OptimisticTransactionTest, ReadConflictTest) {
176 WriteOptions write_options;
177 ReadOptions read_options, snapshot_read_options;
178 OptimisticTransactionOptions txn_options;
179 string value;
180 Status s;
181
182 txn_db->Put(write_options, "foo", "bar");
183 txn_db->Put(write_options, "foo2", "bar");
184
185 txn_options.set_snapshot = true;
186 Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
187 ASSERT_TRUE(txn);
188
189 txn->SetSnapshot();
190 snapshot_read_options.snapshot = txn->GetSnapshot();
191
192 txn->GetForUpdate(snapshot_read_options, "foo", &value);
193 ASSERT_EQ(value, "bar");
194
195 // This Put outside of a transaction will conflict with the previous read
196 s = txn_db->Put(write_options, "foo", "barz");
197 ASSERT_OK(s);
198
199 s = txn_db->Get(read_options, "foo", &value);
200 ASSERT_EQ(value, "barz");
201
202 s = txn->Commit();
203 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
204
205 // Verify that transaction did not write anything
206 txn->GetForUpdate(read_options, "foo", &value);
207 ASSERT_EQ(value, "barz");
208 txn->GetForUpdate(read_options, "foo2", &value);
209 ASSERT_EQ(value, "bar");
210
211 delete txn;
212 }
213
214 TEST_P(OptimisticTransactionTest, TxnOnlyTest) {
215 // Test to make sure transactions work when there are no other writes in an
216 // empty db.
217
218 WriteOptions write_options;
219 ReadOptions read_options;
220 string value;
221 Status s;
222
223 Transaction* txn = txn_db->BeginTransaction(write_options);
224 ASSERT_TRUE(txn);
225
226 txn->Put("x", "y");
227
228 s = txn->Commit();
229 ASSERT_OK(s);
230
231 delete txn;
232 }
233
234 TEST_P(OptimisticTransactionTest, FlushTest) {
235 WriteOptions write_options;
236 ReadOptions read_options, snapshot_read_options;
237 string value;
238 Status s;
239
240 txn_db->Put(write_options, Slice("foo"), Slice("bar"));
241 txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
242
243 Transaction* txn = txn_db->BeginTransaction(write_options);
244 ASSERT_TRUE(txn);
245
246 snapshot_read_options.snapshot = txn->GetSnapshot();
247
248 txn->GetForUpdate(snapshot_read_options, "foo", &value);
249 ASSERT_EQ(value, "bar");
250
251 txn->Put(Slice("foo"), Slice("bar2"));
252
253 txn->GetForUpdate(snapshot_read_options, "foo", &value);
254 ASSERT_EQ(value, "bar2");
255
256 // Put a random key so we have a memtable to flush
257 s = txn_db->Put(write_options, "dummy", "dummy");
258 ASSERT_OK(s);
259
260 // force a memtable flush
261 FlushOptions flush_ops;
262 txn_db->Flush(flush_ops);
263
264 s = txn->Commit();
265 // txn should commit since the flushed table is still in MemtableList History
266 ASSERT_OK(s);
267
268 txn_db->Get(read_options, "foo", &value);
269 ASSERT_EQ(value, "bar2");
270
271 delete txn;
272 }
273
274 TEST_P(OptimisticTransactionTest, FlushTest2) {
275 WriteOptions write_options;
276 ReadOptions read_options, snapshot_read_options;
277 string value;
278 Status s;
279
280 txn_db->Put(write_options, Slice("foo"), Slice("bar"));
281 txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
282
283 Transaction* txn = txn_db->BeginTransaction(write_options);
284 ASSERT_TRUE(txn);
285
286 snapshot_read_options.snapshot = txn->GetSnapshot();
287
288 txn->GetForUpdate(snapshot_read_options, "foo", &value);
289 ASSERT_EQ(value, "bar");
290
291 txn->Put(Slice("foo"), Slice("bar2"));
292
293 txn->GetForUpdate(snapshot_read_options, "foo", &value);
294 ASSERT_EQ(value, "bar2");
295
296 // Put a random key so we have a MemTable to flush
297 s = txn_db->Put(write_options, "dummy", "dummy");
298 ASSERT_OK(s);
299
300 // force a memtable flush
301 FlushOptions flush_ops;
302 txn_db->Flush(flush_ops);
303
304 // Put a random key so we have a MemTable to flush
305 s = txn_db->Put(write_options, "dummy", "dummy2");
306 ASSERT_OK(s);
307
308 // force a memtable flush
309 txn_db->Flush(flush_ops);
310
311 s = txn_db->Put(write_options, "dummy", "dummy3");
312 ASSERT_OK(s);
313
314 // force a memtable flush
315 // Since our test db has max_write_buffer_number=2, this flush will cause
316 // the first memtable to get purged from the MemtableList history.
317 txn_db->Flush(flush_ops);
318
319 s = txn->Commit();
320 // txn should not commit since MemTableList History is not large enough
321 ASSERT_TRUE(s.IsTryAgain());
322
323 txn_db->Get(read_options, "foo", &value);
324 ASSERT_EQ(value, "bar");
325
326 delete txn;
327 }
328
329 // Trigger the condition where some old memtables are skipped when doing
330 // TransactionUtil::CheckKey(), and make sure the result is still correct.
331 TEST_P(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
332 const int kAttemptHistoryMemtable = 0;
333 const int kAttemptImmMemTable = 1;
334 for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
335 attempt++) {
336 options.max_write_buffer_number_to_maintain = 3;
337 Reopen();
338
339 WriteOptions write_options;
340 ReadOptions read_options;
341 ReadOptions snapshot_read_options;
342 ReadOptions snapshot_read_options2;
343 string value;
344 Status s;
345
346 ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
347 ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
348
349 Transaction* txn = txn_db->BeginTransaction(write_options);
350 ASSERT_TRUE(txn != nullptr);
351
352 Transaction* txn2 = txn_db->BeginTransaction(write_options);
353 ASSERT_TRUE(txn2 != nullptr);
354
355 snapshot_read_options.snapshot = txn->GetSnapshot();
356 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
357 ASSERT_EQ(value, "bar");
358 ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
359
360 snapshot_read_options2.snapshot = txn2->GetSnapshot();
361 ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value));
362 ASSERT_EQ(value, "bar");
363 ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2")));
364
365 // txn updates "foo" and txn2 updates "foo2", and now a write is
366 // issued for "foo", which conflicts with txn but not txn2
367 ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
368
369 if (attempt == kAttemptImmMemTable) {
370 // For the second attempt, hold flush from beginning. The memtable
371 // will be switched to immutable after calling TEST_SwitchMemtable()
372 // while CheckKey() is called.
373 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
374 {{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
375 "FlushJob::Start"}});
376 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
377 }
378
379 // force a memtable flush. The memtable should still be kept
380 FlushOptions flush_ops;
381 if (attempt == kAttemptHistoryMemtable) {
382 ASSERT_OK(txn_db->Flush(flush_ops));
383 } else {
384 assert(attempt == kAttemptImmMemTable);
385 DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB());
386 db_impl->TEST_SwitchMemtable();
387 }
388 uint64_t num_imm_mems;
389 ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
390 &num_imm_mems));
391 if (attempt == kAttemptHistoryMemtable) {
392 ASSERT_EQ(0, num_imm_mems);
393 } else {
394 assert(attempt == kAttemptImmMemTable);
395 ASSERT_EQ(1, num_imm_mems);
396 }
397
398 // Put something in active memtable
399 ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar")));
400
401 // Create txn3 after flushing, when this transaction is commited,
402 // only need to check the active memtable
403 Transaction* txn3 = txn_db->BeginTransaction(write_options);
404 ASSERT_TRUE(txn3 != nullptr);
405
406 // Commit both of txn and txn2. txn will conflict but txn2 will
407 // pass. In both ways, both memtables are queried.
408 SetPerfLevel(PerfLevel::kEnableCount);
409
410 get_perf_context()->Reset();
411 s = txn->Commit();
412 // We should have checked two memtables
413 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
414 // txn should fail because of conflict, even if the memtable
415 // has flushed, because it is still preserved in history.
416 ASSERT_TRUE(s.IsBusy());
417
418 get_perf_context()->Reset();
419 s = txn2->Commit();
420 // We should have checked two memtables
421 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
422 ASSERT_TRUE(s.ok());
423
424 txn3->Put(Slice("foo2"), Slice("bar2"));
425 get_perf_context()->Reset();
426 s = txn3->Commit();
427 // txn3 is created after the active memtable is created, so that is the only
428 // memtable to check.
429 ASSERT_EQ(1, get_perf_context()->get_from_memtable_count);
430 ASSERT_TRUE(s.ok());
431
432 TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
433 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
434
435 SetPerfLevel(PerfLevel::kDisable);
436
437 delete txn;
438 delete txn2;
439 delete txn3;
440 }
441 }
442
443 TEST_P(OptimisticTransactionTest, NoSnapshotTest) {
444 WriteOptions write_options;
445 ReadOptions read_options;
446 string value;
447 Status s;
448
449 txn_db->Put(write_options, "AAA", "bar");
450
451 Transaction* txn = txn_db->BeginTransaction(write_options);
452 ASSERT_TRUE(txn);
453
454 // Modify key after transaction start
455 txn_db->Put(write_options, "AAA", "bar1");
456
457 // Read and write without a snapshot
458 txn->GetForUpdate(read_options, "AAA", &value);
459 ASSERT_EQ(value, "bar1");
460 txn->Put("AAA", "bar2");
461
462 // Should commit since read/write was done after data changed
463 s = txn->Commit();
464 ASSERT_OK(s);
465
466 txn->GetForUpdate(read_options, "AAA", &value);
467 ASSERT_EQ(value, "bar2");
468
469 delete txn;
470 }
471
472 TEST_P(OptimisticTransactionTest, MultipleSnapshotTest) {
473 WriteOptions write_options;
474 ReadOptions read_options, snapshot_read_options;
475 string value;
476 Status s;
477
478 txn_db->Put(write_options, "AAA", "bar");
479 txn_db->Put(write_options, "BBB", "bar");
480 txn_db->Put(write_options, "CCC", "bar");
481
482 Transaction* txn = txn_db->BeginTransaction(write_options);
483 ASSERT_TRUE(txn);
484
485 txn_db->Put(write_options, "AAA", "bar1");
486
487 // Read and write without a snapshot
488 txn->GetForUpdate(read_options, "AAA", &value);
489 ASSERT_EQ(value, "bar1");
490 txn->Put("AAA", "bar2");
491
492 // Modify BBB before snapshot is taken
493 txn_db->Put(write_options, "BBB", "bar1");
494
495 txn->SetSnapshot();
496 snapshot_read_options.snapshot = txn->GetSnapshot();
497
498 // Read and write with snapshot
499 txn->GetForUpdate(snapshot_read_options, "BBB", &value);
500 ASSERT_EQ(value, "bar1");
501 txn->Put("BBB", "bar2");
502
503 txn_db->Put(write_options, "CCC", "bar1");
504
505 // Set a new snapshot
506 txn->SetSnapshot();
507 snapshot_read_options.snapshot = txn->GetSnapshot();
508
509 // Read and write with snapshot
510 txn->GetForUpdate(snapshot_read_options, "CCC", &value);
511 ASSERT_EQ(value, "bar1");
512 txn->Put("CCC", "bar2");
513
514 s = txn->GetForUpdate(read_options, "AAA", &value);
515 ASSERT_OK(s);
516 ASSERT_EQ(value, "bar2");
517 s = txn->GetForUpdate(read_options, "BBB", &value);
518 ASSERT_OK(s);
519 ASSERT_EQ(value, "bar2");
520 s = txn->GetForUpdate(read_options, "CCC", &value);
521 ASSERT_OK(s);
522 ASSERT_EQ(value, "bar2");
523
524 s = txn_db->Get(read_options, "AAA", &value);
525 ASSERT_OK(s);
526 ASSERT_EQ(value, "bar1");
527 s = txn_db->Get(read_options, "BBB", &value);
528 ASSERT_OK(s);
529 ASSERT_EQ(value, "bar1");
530 s = txn_db->Get(read_options, "CCC", &value);
531 ASSERT_OK(s);
532 ASSERT_EQ(value, "bar1");
533
534 s = txn->Commit();
535 ASSERT_OK(s);
536
537 s = txn_db->Get(read_options, "AAA", &value);
538 ASSERT_OK(s);
539 ASSERT_EQ(value, "bar2");
540 s = txn_db->Get(read_options, "BBB", &value);
541 ASSERT_OK(s);
542 ASSERT_EQ(value, "bar2");
543 s = txn_db->Get(read_options, "CCC", &value);
544 ASSERT_OK(s);
545 ASSERT_EQ(value, "bar2");
546
547 // verify that we track multiple writes to the same key at different snapshots
548 delete txn;
549 txn = txn_db->BeginTransaction(write_options);
550
551 // Potentially conflicting writes
552 txn_db->Put(write_options, "ZZZ", "zzz");
553 txn_db->Put(write_options, "XXX", "xxx");
554
555 txn->SetSnapshot();
556
557 OptimisticTransactionOptions txn_options;
558 txn_options.set_snapshot = true;
559 Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
560 txn2->SetSnapshot();
561
562 // This should not conflict in txn since the snapshot is later than the
563 // previous write (spoiler alert: it will later conflict with txn2).
564 txn->Put("ZZZ", "zzzz");
565 s = txn->Commit();
566 ASSERT_OK(s);
567
568 delete txn;
569
570 // This will conflict since the snapshot is earlier than another write to ZZZ
571 txn2->Put("ZZZ", "xxxxx");
572
573 s = txn2->Commit();
574 ASSERT_TRUE(s.IsBusy());
575
576 delete txn2;
577 }
578
579 TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
580 WriteOptions write_options;
581 ReadOptions read_options, snapshot_read_options;
582 OptimisticTransactionOptions txn_options;
583 string value;
584 Status s;
585
586 ColumnFamilyHandle *cfa, *cfb;
587 ColumnFamilyOptions cf_options;
588
589 // Create 2 new column families
590 s = txn_db->CreateColumnFamily(cf_options, "CFA", &cfa);
591 ASSERT_OK(s);
592 s = txn_db->CreateColumnFamily(cf_options, "CFB", &cfb);
593 ASSERT_OK(s);
594
595 delete cfa;
596 delete cfb;
597 delete txn_db;
598 txn_db = nullptr;
599
600 // open DB with three column families
601 std::vector<ColumnFamilyDescriptor> column_families;
602 // have to open default column family
603 column_families.push_back(
604 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
605 // open the new column families
606 column_families.push_back(
607 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
608 column_families.push_back(
609 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
610 std::vector<ColumnFamilyHandle*> handles;
611 s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles,
612 &txn_db);
613 ASSERT_OK(s);
614 assert(txn_db != nullptr);
615
616 Transaction* txn = txn_db->BeginTransaction(write_options);
617 ASSERT_TRUE(txn);
618
619 txn->SetSnapshot();
620 snapshot_read_options.snapshot = txn->GetSnapshot();
621
622 txn_options.set_snapshot = true;
623 Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
624 ASSERT_TRUE(txn2);
625
626 // Write some data to the db
627 WriteBatch batch;
628 batch.Put("foo", "foo");
629 batch.Put(handles[1], "AAA", "bar");
630 batch.Put(handles[1], "AAAZZZ", "bar");
631 s = txn_db->Write(write_options, &batch);
632 ASSERT_OK(s);
633 txn_db->Delete(write_options, handles[1], "AAAZZZ");
634
635 // These keys do no conflict with existing writes since they're in
636 // different column families
637 txn->Delete("AAA");
638 txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
639 Slice key_slice("AAAZZZ");
640 Slice value_slices[2] = {Slice("bar"), Slice("bar")};
641 txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2));
642
643 ASSERT_EQ(3, txn->GetNumKeys());
644
645 // Txn should commit
646 s = txn->Commit();
647 ASSERT_OK(s);
648 s = txn_db->Get(read_options, "AAA", &value);
649 ASSERT_TRUE(s.IsNotFound());
650 s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value);
651 ASSERT_EQ(value, "barbar");
652
653 Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
654 Slice value_slice("barbarbar");
655 // This write will cause a conflict with the earlier batch write
656 txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1));
657
658 txn2->Delete(handles[2], "XXX");
659 txn2->Delete(handles[1], "XXX");
660 s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value);
661 ASSERT_TRUE(s.IsNotFound());
662
663 // Verify txn did not commit
664 s = txn2->Commit();
665 ASSERT_TRUE(s.IsBusy());
666 s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value);
667 ASSERT_EQ(value, "barbar");
668
669 delete txn;
670 delete txn2;
671
672 txn = txn_db->BeginTransaction(write_options, txn_options);
673 snapshot_read_options.snapshot = txn->GetSnapshot();
674
675 txn2 = txn_db->BeginTransaction(write_options, txn_options);
676 ASSERT_TRUE(txn);
677
678 std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
679 handles[0], handles[2]};
680 std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
681 std::vector<std::string> values(4);
682
683 std::vector<Status> results = txn->MultiGetForUpdate(
684 snapshot_read_options, multiget_cfh, multiget_keys, &values);
685 ASSERT_OK(results[0]);
686 ASSERT_OK(results[1]);
687 ASSERT_OK(results[2]);
688 ASSERT_TRUE(results[3].IsNotFound());
689 ASSERT_EQ(values[0], "bar");
690 ASSERT_EQ(values[1], "barbar");
691 ASSERT_EQ(values[2], "foo");
692
693 txn->Delete(handles[2], "ZZZ");
694 txn->Put(handles[2], "ZZZ", "YYY");
695 txn->Put(handles[2], "ZZZ", "YYYY");
696 txn->Delete(handles[2], "ZZZ");
697 txn->Put(handles[2], "AAAZZZ", "barbarbar");
698
699 ASSERT_EQ(5, txn->GetNumKeys());
700
701 // Txn should commit
702 s = txn->Commit();
703 ASSERT_OK(s);
704 s = txn_db->Get(read_options, handles[2], "ZZZ", &value);
705 ASSERT_TRUE(s.IsNotFound());
706
707 // Put a key which will conflict with the next txn using the previous snapshot
708 txn_db->Put(write_options, handles[2], "foo", "000");
709
710 results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
711 multiget_keys, &values);
712 ASSERT_OK(results[0]);
713 ASSERT_OK(results[1]);
714 ASSERT_OK(results[2]);
715 ASSERT_TRUE(results[3].IsNotFound());
716 ASSERT_EQ(values[0], "bar");
717 ASSERT_EQ(values[1], "barbar");
718 ASSERT_EQ(values[2], "foo");
719
720 // Verify Txn Did not Commit
721 s = txn2->Commit();
722 ASSERT_TRUE(s.IsBusy());
723
724 s = txn_db->DropColumnFamily(handles[1]);
725 ASSERT_OK(s);
726 s = txn_db->DropColumnFamily(handles[2]);
727 ASSERT_OK(s);
728
729 delete txn;
730 delete txn2;
731
732 for (auto handle : handles) {
733 delete handle;
734 }
735 }
736
737 TEST_P(OptimisticTransactionTest, EmptyTest) {
738 WriteOptions write_options;
739 ReadOptions read_options;
740 string value;
741 Status s;
742
743 s = txn_db->Put(write_options, "aaa", "aaa");
744 ASSERT_OK(s);
745
746 Transaction* txn = txn_db->BeginTransaction(write_options);
747 s = txn->Commit();
748 ASSERT_OK(s);
749 delete txn;
750
751 txn = txn_db->BeginTransaction(write_options);
752 txn->Rollback();
753 delete txn;
754
755 txn = txn_db->BeginTransaction(write_options);
756 s = txn->GetForUpdate(read_options, "aaa", &value);
757 ASSERT_EQ(value, "aaa");
758
759 s = txn->Commit();
760 ASSERT_OK(s);
761 delete txn;
762
763 txn = txn_db->BeginTransaction(write_options);
764 txn->SetSnapshot();
765 s = txn->GetForUpdate(read_options, "aaa", &value);
766 ASSERT_EQ(value, "aaa");
767
768 s = txn_db->Put(write_options, "aaa", "xxx");
769 s = txn->Commit();
770 ASSERT_TRUE(s.IsBusy());
771 delete txn;
772 }
773
774 TEST_P(OptimisticTransactionTest, PredicateManyPreceders) {
775 WriteOptions write_options;
776 ReadOptions read_options1, read_options2;
777 OptimisticTransactionOptions txn_options;
778 string value;
779 Status s;
780
781 txn_options.set_snapshot = true;
782 Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options);
783 read_options1.snapshot = txn1->GetSnapshot();
784
785 Transaction* txn2 = txn_db->BeginTransaction(write_options);
786 txn2->SetSnapshot();
787 read_options2.snapshot = txn2->GetSnapshot();
788
789 std::vector<Slice> multiget_keys = {"1", "2", "3"};
790 std::vector<std::string> multiget_values;
791
792 std::vector<Status> results =
793 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
794 ASSERT_TRUE(results[1].IsNotFound());
795
796 txn2->Put("2", "x");
797
798 s = txn2->Commit();
799 ASSERT_OK(s);
800
801 multiget_values.clear();
802 results =
803 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
804 ASSERT_TRUE(results[1].IsNotFound());
805
806 // should not commit since txn2 wrote a key txn has read
807 s = txn1->Commit();
808 ASSERT_TRUE(s.IsBusy());
809
810 delete txn1;
811 delete txn2;
812
813 txn1 = txn_db->BeginTransaction(write_options, txn_options);
814 read_options1.snapshot = txn1->GetSnapshot();
815
816 txn2 = txn_db->BeginTransaction(write_options, txn_options);
817 read_options2.snapshot = txn2->GetSnapshot();
818
819 txn1->Put("4", "x");
820
821 txn2->Delete("4");
822
823 // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
824 s = txn1->Commit();
825 ASSERT_OK(s);
826
827 s = txn2->GetForUpdate(read_options2, "4", &value);
828 ASSERT_TRUE(s.IsNotFound());
829
830 // txn2 cannot commit since txn1 changed "4"
831 s = txn2->Commit();
832 ASSERT_TRUE(s.IsBusy());
833
834 delete txn1;
835 delete txn2;
836 }
837
838 TEST_P(OptimisticTransactionTest, LostUpdate) {
839 WriteOptions write_options;
840 ReadOptions read_options, read_options1, read_options2;
841 OptimisticTransactionOptions txn_options;
842 string value;
843 Status s;
844
845 // Test 2 transactions writing to the same key in multiple orders and
846 // with/without snapshots
847
848 Transaction* txn1 = txn_db->BeginTransaction(write_options);
849 Transaction* txn2 = txn_db->BeginTransaction(write_options);
850
851 txn1->Put("1", "1");
852 txn2->Put("1", "2");
853
854 s = txn1->Commit();
855 ASSERT_OK(s);
856
857 s = txn2->Commit();
858 ASSERT_TRUE(s.IsBusy());
859
860 delete txn1;
861 delete txn2;
862
863 txn_options.set_snapshot = true;
864 txn1 = txn_db->BeginTransaction(write_options, txn_options);
865 read_options1.snapshot = txn1->GetSnapshot();
866
867 txn2 = txn_db->BeginTransaction(write_options, txn_options);
868 read_options2.snapshot = txn2->GetSnapshot();
869
870 txn1->Put("1", "3");
871 txn2->Put("1", "4");
872
873 s = txn1->Commit();
874 ASSERT_OK(s);
875
876 s = txn2->Commit();
877 ASSERT_TRUE(s.IsBusy());
878
879 delete txn1;
880 delete txn2;
881
882 txn1 = txn_db->BeginTransaction(write_options, txn_options);
883 read_options1.snapshot = txn1->GetSnapshot();
884
885 txn2 = txn_db->BeginTransaction(write_options, txn_options);
886 read_options2.snapshot = txn2->GetSnapshot();
887
888 txn1->Put("1", "5");
889 s = txn1->Commit();
890 ASSERT_OK(s);
891
892 txn2->Put("1", "6");
893 s = txn2->Commit();
894 ASSERT_TRUE(s.IsBusy());
895
896 delete txn1;
897 delete txn2;
898
899 txn1 = txn_db->BeginTransaction(write_options, txn_options);
900 read_options1.snapshot = txn1->GetSnapshot();
901
902 txn2 = txn_db->BeginTransaction(write_options, txn_options);
903 read_options2.snapshot = txn2->GetSnapshot();
904
905 txn1->Put("1", "5");
906 s = txn1->Commit();
907 ASSERT_OK(s);
908
909 txn2->SetSnapshot();
910 txn2->Put("1", "6");
911 s = txn2->Commit();
912 ASSERT_OK(s);
913
914 delete txn1;
915 delete txn2;
916
917 txn1 = txn_db->BeginTransaction(write_options);
918 txn2 = txn_db->BeginTransaction(write_options);
919
920 txn1->Put("1", "7");
921 s = txn1->Commit();
922 ASSERT_OK(s);
923
924 txn2->Put("1", "8");
925 s = txn2->Commit();
926 ASSERT_OK(s);
927
928 delete txn1;
929 delete txn2;
930
931 s = txn_db->Get(read_options, "1", &value);
932 ASSERT_OK(s);
933 ASSERT_EQ(value, "8");
934 }
935
936 TEST_P(OptimisticTransactionTest, UntrackedWrites) {
937 WriteOptions write_options;
938 ReadOptions read_options;
939 string value;
940 Status s;
941
942 // Verify transaction rollback works for untracked keys.
943 Transaction* txn = txn_db->BeginTransaction(write_options);
944 txn->PutUntracked("untracked", "0");
945 txn->Rollback();
946 s = txn_db->Get(read_options, "untracked", &value);
947 ASSERT_TRUE(s.IsNotFound());
948
949 delete txn;
950 txn = txn_db->BeginTransaction(write_options);
951
952 txn->Put("tracked", "1");
953 txn->PutUntracked("untracked", "1");
954 txn->MergeUntracked("untracked", "2");
955 txn->DeleteUntracked("untracked");
956
957 // Write to the untracked key outside of the transaction and verify
958 // it doesn't prevent the transaction from committing.
959 s = txn_db->Put(write_options, "untracked", "x");
960 ASSERT_OK(s);
961
962 s = txn->Commit();
963 ASSERT_OK(s);
964
965 s = txn_db->Get(read_options, "untracked", &value);
966 ASSERT_TRUE(s.IsNotFound());
967
968 delete txn;
969 txn = txn_db->BeginTransaction(write_options);
970
971 txn->Put("tracked", "10");
972 txn->PutUntracked("untracked", "A");
973
974 // Write to tracked key outside of the transaction and verify that the
975 // untracked keys are not written when the commit fails.
976 s = txn_db->Delete(write_options, "tracked");
977
978 s = txn->Commit();
979 ASSERT_TRUE(s.IsBusy());
980
981 s = txn_db->Get(read_options, "untracked", &value);
982 ASSERT_TRUE(s.IsNotFound());
983
984 delete txn;
985 }
986
987 TEST_P(OptimisticTransactionTest, IteratorTest) {
988 WriteOptions write_options;
989 ReadOptions read_options, snapshot_read_options;
990 OptimisticTransactionOptions txn_options;
991 string value;
992 Status s;
993
994 // Write some keys to the db
995 s = txn_db->Put(write_options, "A", "a");
996 ASSERT_OK(s);
997
998 s = txn_db->Put(write_options, "G", "g");
999 ASSERT_OK(s);
1000
1001 s = txn_db->Put(write_options, "F", "f");
1002 ASSERT_OK(s);
1003
1004 s = txn_db->Put(write_options, "C", "c");
1005 ASSERT_OK(s);
1006
1007 s = txn_db->Put(write_options, "D", "d");
1008 ASSERT_OK(s);
1009
1010 Transaction* txn = txn_db->BeginTransaction(write_options);
1011 ASSERT_TRUE(txn);
1012
1013 // Write some keys in a txn
1014 s = txn->Put("B", "b");
1015 ASSERT_OK(s);
1016
1017 s = txn->Put("H", "h");
1018 ASSERT_OK(s);
1019
1020 s = txn->Delete("D");
1021 ASSERT_OK(s);
1022
1023 s = txn->Put("E", "e");
1024 ASSERT_OK(s);
1025
1026 txn->SetSnapshot();
1027 const Snapshot* snapshot = txn->GetSnapshot();
1028
1029 // Write some keys to the db after the snapshot
1030 s = txn_db->Put(write_options, "BB", "xx");
1031 ASSERT_OK(s);
1032
1033 s = txn_db->Put(write_options, "C", "xx");
1034 ASSERT_OK(s);
1035
1036 read_options.snapshot = snapshot;
1037 Iterator* iter = txn->GetIterator(read_options);
1038 ASSERT_OK(iter->status());
1039 iter->SeekToFirst();
1040
1041 // Read all keys via iter and lock them all
1042 std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
1043 for (int i = 0; i < 7; i++) {
1044 ASSERT_OK(iter->status());
1045 ASSERT_TRUE(iter->Valid());
1046 ASSERT_EQ(results[i], iter->value().ToString());
1047
1048 s = txn->GetForUpdate(read_options, iter->key(), nullptr);
1049 ASSERT_OK(s);
1050
1051 iter->Next();
1052 }
1053 ASSERT_FALSE(iter->Valid());
1054
1055 iter->Seek("G");
1056 ASSERT_OK(iter->status());
1057 ASSERT_TRUE(iter->Valid());
1058 ASSERT_EQ("g", iter->value().ToString());
1059
1060 iter->Prev();
1061 ASSERT_OK(iter->status());
1062 ASSERT_TRUE(iter->Valid());
1063 ASSERT_EQ("f", iter->value().ToString());
1064
1065 iter->Seek("D");
1066 ASSERT_OK(iter->status());
1067 ASSERT_TRUE(iter->Valid());
1068 ASSERT_EQ("e", iter->value().ToString());
1069
1070 iter->Seek("C");
1071 ASSERT_OK(iter->status());
1072 ASSERT_TRUE(iter->Valid());
1073 ASSERT_EQ("c", iter->value().ToString());
1074
1075 iter->Next();
1076 ASSERT_OK(iter->status());
1077 ASSERT_TRUE(iter->Valid());
1078 ASSERT_EQ("e", iter->value().ToString());
1079
1080 iter->Seek("");
1081 ASSERT_OK(iter->status());
1082 ASSERT_TRUE(iter->Valid());
1083 ASSERT_EQ("a", iter->value().ToString());
1084
1085 iter->Seek("X");
1086 ASSERT_OK(iter->status());
1087 ASSERT_FALSE(iter->Valid());
1088
1089 iter->SeekToLast();
1090 ASSERT_OK(iter->status());
1091 ASSERT_TRUE(iter->Valid());
1092 ASSERT_EQ("h", iter->value().ToString());
1093
1094 // key "C" was modified in the db after txn's snapshot. txn will not commit.
1095 s = txn->Commit();
1096 ASSERT_TRUE(s.IsBusy());
1097
1098 delete iter;
1099 delete txn;
1100 }
1101
1102 TEST_P(OptimisticTransactionTest, SavepointTest) {
1103 WriteOptions write_options;
1104 ReadOptions read_options, snapshot_read_options;
1105 OptimisticTransactionOptions txn_options;
1106 string value;
1107 Status s;
1108
1109 Transaction* txn = txn_db->BeginTransaction(write_options);
1110 ASSERT_TRUE(txn);
1111
1112 s = txn->RollbackToSavePoint();
1113 ASSERT_TRUE(s.IsNotFound());
1114
1115 txn->SetSavePoint(); // 1
1116
1117 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
1118 s = txn->RollbackToSavePoint();
1119 ASSERT_TRUE(s.IsNotFound());
1120
1121 s = txn->Put("B", "b");
1122 ASSERT_OK(s);
1123
1124 s = txn->Commit();
1125 ASSERT_OK(s);
1126
1127 s = txn_db->Get(read_options, "B", &value);
1128 ASSERT_OK(s);
1129 ASSERT_EQ("b", value);
1130
1131 delete txn;
1132 txn = txn_db->BeginTransaction(write_options);
1133 ASSERT_TRUE(txn);
1134
1135 s = txn->Put("A", "a");
1136 ASSERT_OK(s);
1137
1138 s = txn->Put("B", "bb");
1139 ASSERT_OK(s);
1140
1141 s = txn->Put("C", "c");
1142 ASSERT_OK(s);
1143
1144 txn->SetSavePoint(); // 2
1145
1146 s = txn->Delete("B");
1147 ASSERT_OK(s);
1148
1149 s = txn->Put("C", "cc");
1150 ASSERT_OK(s);
1151
1152 s = txn->Put("D", "d");
1153 ASSERT_OK(s);
1154
1155 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
1156
1157 s = txn->Get(read_options, "A", &value);
1158 ASSERT_OK(s);
1159 ASSERT_EQ("a", value);
1160
1161 s = txn->Get(read_options, "B", &value);
1162 ASSERT_OK(s);
1163 ASSERT_EQ("bb", value);
1164
1165 s = txn->Get(read_options, "C", &value);
1166 ASSERT_OK(s);
1167 ASSERT_EQ("c", value);
1168
1169 s = txn->Get(read_options, "D", &value);
1170 ASSERT_TRUE(s.IsNotFound());
1171
1172 s = txn->Put("A", "a");
1173 ASSERT_OK(s);
1174
1175 s = txn->Put("E", "e");
1176 ASSERT_OK(s);
1177
1178 // Rollback to beginning of txn
1179 s = txn->RollbackToSavePoint();
1180 ASSERT_TRUE(s.IsNotFound());
1181 txn->Rollback();
1182
1183 s = txn->Get(read_options, "A", &value);
1184 ASSERT_TRUE(s.IsNotFound());
1185
1186 s = txn->Get(read_options, "B", &value);
1187 ASSERT_OK(s);
1188 ASSERT_EQ("b", value);
1189
1190 s = txn->Get(read_options, "D", &value);
1191 ASSERT_TRUE(s.IsNotFound());
1192
1193 s = txn->Get(read_options, "D", &value);
1194 ASSERT_TRUE(s.IsNotFound());
1195
1196 s = txn->Get(read_options, "E", &value);
1197 ASSERT_TRUE(s.IsNotFound());
1198
1199 s = txn->Put("A", "aa");
1200 ASSERT_OK(s);
1201
1202 s = txn->Put("F", "f");
1203 ASSERT_OK(s);
1204
1205 txn->SetSavePoint(); // 3
1206 txn->SetSavePoint(); // 4
1207
1208 s = txn->Put("G", "g");
1209 ASSERT_OK(s);
1210
1211 s = txn->Delete("F");
1212 ASSERT_OK(s);
1213
1214 s = txn->Delete("B");
1215 ASSERT_OK(s);
1216
1217 s = txn->Get(read_options, "A", &value);
1218 ASSERT_OK(s);
1219 ASSERT_EQ("aa", value);
1220
1221 s = txn->Get(read_options, "F", &value);
1222 ASSERT_TRUE(s.IsNotFound());
1223
1224 s = txn->Get(read_options, "B", &value);
1225 ASSERT_TRUE(s.IsNotFound());
1226
1227 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
1228
1229 s = txn->Get(read_options, "F", &value);
1230 ASSERT_OK(s);
1231 ASSERT_EQ("f", value);
1232
1233 s = txn->Get(read_options, "G", &value);
1234 ASSERT_TRUE(s.IsNotFound());
1235
1236 s = txn->Commit();
1237 ASSERT_OK(s);
1238
1239 s = txn_db->Get(read_options, "F", &value);
1240 ASSERT_OK(s);
1241 ASSERT_EQ("f", value);
1242
1243 s = txn_db->Get(read_options, "G", &value);
1244 ASSERT_TRUE(s.IsNotFound());
1245
1246 s = txn_db->Get(read_options, "A", &value);
1247 ASSERT_OK(s);
1248 ASSERT_EQ("aa", value);
1249
1250 s = txn_db->Get(read_options, "B", &value);
1251 ASSERT_OK(s);
1252 ASSERT_EQ("b", value);
1253
1254 s = txn_db->Get(read_options, "C", &value);
1255 ASSERT_TRUE(s.IsNotFound());
1256
1257 s = txn_db->Get(read_options, "D", &value);
1258 ASSERT_TRUE(s.IsNotFound());
1259
1260 s = txn_db->Get(read_options, "E", &value);
1261 ASSERT_TRUE(s.IsNotFound());
1262
1263 delete txn;
1264 }
1265
1266 TEST_P(OptimisticTransactionTest, UndoGetForUpdateTest) {
1267 WriteOptions write_options;
1268 ReadOptions read_options, snapshot_read_options;
1269 OptimisticTransactionOptions txn_options;
1270 string value;
1271 Status s;
1272
1273 txn_db->Put(write_options, "A", "");
1274
1275 Transaction* txn1 = txn_db->BeginTransaction(write_options);
1276 ASSERT_TRUE(txn1);
1277
1278 s = txn1->GetForUpdate(read_options, "A", &value);
1279 ASSERT_OK(s);
1280
1281 txn1->UndoGetForUpdate("A");
1282
1283 Transaction* txn2 = txn_db->BeginTransaction(write_options);
1284 txn2->Put("A", "x");
1285 s = txn2->Commit();
1286 ASSERT_OK(s);
1287 delete txn2;
1288
1289 // Verify that txn1 can commit since A isn't conflict checked
1290 s = txn1->Commit();
1291 ASSERT_OK(s);
1292 delete txn1;
1293
1294 txn1 = txn_db->BeginTransaction(write_options);
1295 txn1->Put("A", "a");
1296
1297 s = txn1->GetForUpdate(read_options, "A", &value);
1298 ASSERT_OK(s);
1299
1300 txn1->UndoGetForUpdate("A");
1301
1302 txn2 = txn_db->BeginTransaction(write_options);
1303 txn2->Put("A", "x");
1304 s = txn2->Commit();
1305 ASSERT_OK(s);
1306 delete txn2;
1307
1308 // Verify that txn1 cannot commit since A will still be conflict checked
1309 s = txn1->Commit();
1310 ASSERT_TRUE(s.IsBusy());
1311 delete txn1;
1312
1313 txn1 = txn_db->BeginTransaction(write_options);
1314
1315 s = txn1->GetForUpdate(read_options, "A", &value);
1316 ASSERT_OK(s);
1317 s = txn1->GetForUpdate(read_options, "A", &value);
1318 ASSERT_OK(s);
1319
1320 txn1->UndoGetForUpdate("A");
1321
1322 txn2 = txn_db->BeginTransaction(write_options);
1323 txn2->Put("A", "x");
1324 s = txn2->Commit();
1325 ASSERT_OK(s);
1326 delete txn2;
1327
1328 // Verify that txn1 cannot commit since A will still be conflict checked
1329 s = txn1->Commit();
1330 ASSERT_TRUE(s.IsBusy());
1331 delete txn1;
1332
1333 txn1 = txn_db->BeginTransaction(write_options);
1334
1335 s = txn1->GetForUpdate(read_options, "A", &value);
1336 ASSERT_OK(s);
1337 s = txn1->GetForUpdate(read_options, "A", &value);
1338 ASSERT_OK(s);
1339
1340 txn1->UndoGetForUpdate("A");
1341 txn1->UndoGetForUpdate("A");
1342
1343 txn2 = txn_db->BeginTransaction(write_options);
1344 txn2->Put("A", "x");
1345 s = txn2->Commit();
1346 ASSERT_OK(s);
1347 delete txn2;
1348
1349 // Verify that txn1 can commit since A isn't conflict checked
1350 s = txn1->Commit();
1351 ASSERT_OK(s);
1352 delete txn1;
1353
1354 txn1 = txn_db->BeginTransaction(write_options);
1355
1356 s = txn1->GetForUpdate(read_options, "A", &value);
1357 ASSERT_OK(s);
1358
1359 txn1->SetSavePoint();
1360 txn1->UndoGetForUpdate("A");
1361
1362 txn2 = txn_db->BeginTransaction(write_options);
1363 txn2->Put("A", "x");
1364 s = txn2->Commit();
1365 ASSERT_OK(s);
1366 delete txn2;
1367
1368 // Verify that txn1 cannot commit since A will still be conflict checked
1369 s = txn1->Commit();
1370 ASSERT_TRUE(s.IsBusy());
1371 delete txn1;
1372
1373 txn1 = txn_db->BeginTransaction(write_options);
1374
1375 s = txn1->GetForUpdate(read_options, "A", &value);
1376 ASSERT_OK(s);
1377
1378 txn1->SetSavePoint();
1379 s = txn1->GetForUpdate(read_options, "A", &value);
1380 ASSERT_OK(s);
1381 txn1->UndoGetForUpdate("A");
1382
1383 txn2 = txn_db->BeginTransaction(write_options);
1384 txn2->Put("A", "x");
1385 s = txn2->Commit();
1386 ASSERT_OK(s);
1387 delete txn2;
1388
1389 // Verify that txn1 cannot commit since A will still be conflict checked
1390 s = txn1->Commit();
1391 ASSERT_TRUE(s.IsBusy());
1392 delete txn1;
1393
1394 txn1 = txn_db->BeginTransaction(write_options);
1395
1396 s = txn1->GetForUpdate(read_options, "A", &value);
1397 ASSERT_OK(s);
1398
1399 txn1->SetSavePoint();
1400 s = txn1->GetForUpdate(read_options, "A", &value);
1401 ASSERT_OK(s);
1402 txn1->UndoGetForUpdate("A");
1403
1404 txn1->RollbackToSavePoint();
1405 txn1->UndoGetForUpdate("A");
1406
1407 txn2 = txn_db->BeginTransaction(write_options);
1408 txn2->Put("A", "x");
1409 s = txn2->Commit();
1410 ASSERT_OK(s);
1411 delete txn2;
1412
1413 // Verify that txn1 can commit since A isn't conflict checked
1414 s = txn1->Commit();
1415 ASSERT_OK(s);
1416 delete txn1;
1417 }
1418
1419 namespace {
1420 Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db,
1421 const size_t num_transactions,
1422 const size_t num_sets,
1423 const size_t num_keys_per_set) {
1424 size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
1425 Random64 _rand(seed);
1426 WriteOptions write_options;
1427 ReadOptions read_options;
1428 OptimisticTransactionOptions txn_options;
1429 txn_options.set_snapshot = true;
1430
1431 RandomTransactionInserter inserter(&_rand, write_options, read_options,
1432 num_keys_per_set,
1433 static_cast<uint16_t>(num_sets));
1434
1435 for (size_t t = 0; t < num_transactions; t++) {
1436 bool success = inserter.OptimisticTransactionDBInsert(db, txn_options);
1437 if (!success) {
1438 // unexpected failure
1439 return inserter.GetLastStatus();
1440 }
1441 }
1442
1443 // Make sure at least some of the transactions succeeded. It's ok if
1444 // some failed due to write-conflicts.
1445 if (inserter.GetFailureCount() > num_transactions / 2) {
1446 return Status::TryAgain("Too many transactions failed! " +
1447 std::to_string(inserter.GetFailureCount()) + " / " +
1448 std::to_string(num_transactions));
1449 }
1450
1451 return Status::OK();
1452 }
1453 } // namespace
1454
1455 TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
1456 const size_t num_threads = 4;
1457 const size_t num_transactions_per_thread = 10000;
1458 const size_t num_sets = 3;
1459 const size_t num_keys_per_set = 100;
1460 // Setting the key-space to be 100 keys should cause enough write-conflicts
1461 // to make this test interesting.
1462
1463 std::vector<port::Thread> threads;
1464
1465 std::function<void()> call_inserter = [&] {
1466 ASSERT_OK(OptimisticTransactionStressTestInserter(
1467 txn_db, num_transactions_per_thread, num_sets, num_keys_per_set));
1468 };
1469
1470 // Create N threads that use RandomTransactionInserter to write
1471 // many transactions.
1472 for (uint32_t i = 0; i < num_threads; i++) {
1473 threads.emplace_back(call_inserter);
1474 }
1475
1476 // Wait for all threads to run
1477 for (auto& t : threads) {
1478 t.join();
1479 }
1480
1481 // Verify that data is consistent
1482 Status s = RandomTransactionInserter::Verify(txn_db, num_sets);
1483 ASSERT_OK(s);
1484 }
1485
1486 TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
1487 WriteOptions write_options;
1488 OptimisticTransactionOptions transaction_options;
1489
1490 Transaction* transaction(txn_db->BeginTransaction(write_options, transaction_options));
1491 Status s = transaction->Put("foo", "val");
1492 ASSERT_OK(s);
1493 s = transaction->Put("foo2", "val");
1494 ASSERT_OK(s);
1495 s = transaction->Put("foo3", "val");
1496 ASSERT_OK(s);
1497 s = transaction->Commit();
1498 ASSERT_OK(s);
1499 delete transaction;
1500
1501 Reopen();
1502 transaction = txn_db->BeginTransaction(write_options, transaction_options);
1503 s = transaction->Put("bar", "val");
1504 ASSERT_OK(s);
1505 s = transaction->Put("bar2", "val");
1506 ASSERT_OK(s);
1507 s = transaction->Commit();
1508 ASSERT_OK(s);
1509
1510 delete transaction;
1511 }
1512
1513 INSTANTIATE_TEST_CASE_P(
1514 InstanceOccGroup, OptimisticTransactionTest,
1515 testing::Values(OccValidationPolicy::kValidateSerial,
1516 OccValidationPolicy::kValidateParallel));
1517
1518 } // namespace ROCKSDB_NAMESPACE
1519
1520 int main(int argc, char** argv) {
1521 ::testing::InitGoogleTest(&argc, argv);
1522 return RUN_ALL_TESTS();
1523 }
1524
1525 #else
1526 #include <stdio.h>
1527
// ROCKSDB_LITE build: there is nothing to test, so report the skip on stderr
// and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
1534
1535 #endif // !ROCKSDB_LITE