]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/optimistic_transaction_test.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / transactions / optimistic_transaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5
6 #ifndef ROCKSDB_LITE
7
8 #include <functional>
9 #include <string>
10 #include <thread>
11
12 #include "rocksdb/db.h"
13 #include "rocksdb/utilities/optimistic_transaction_db.h"
14 #include "rocksdb/utilities/transaction.h"
15 #include "util/crc32c.h"
16 #include "util/logging.h"
17 #include "util/random.h"
18 #include "util/testharness.h"
19 #include "util/transaction_test_util.h"
20 #include "port/port.h"
21
22 using std::string;
23
24 namespace rocksdb {
25
26 class OptimisticTransactionTest : public testing::Test {
27 public:
28 OptimisticTransactionDB* txn_db;
29 DB* db;
30 string dbname;
31 Options options;
32
33 OptimisticTransactionTest() {
34 options.create_if_missing = true;
35 options.max_write_buffer_number = 2;
36 dbname = test::TmpDir() + "/optimistic_transaction_testdb";
37
38 DestroyDB(dbname, options);
39 Open();
40 }
41 ~OptimisticTransactionTest() {
42 delete txn_db;
43 DestroyDB(dbname, options);
44 }
45
46 void Reopen() {
47 delete txn_db;
48 Open();
49 }
50
51 private:
52 void Open() {
53 Status s = OptimisticTransactionDB::Open(options, dbname, &txn_db);
54 assert(s.ok());
55 db = txn_db->GetBaseDB();
56 }
57 };
58
59 TEST_F(OptimisticTransactionTest, SuccessTest) {
60 WriteOptions write_options;
61 ReadOptions read_options;
62 string value;
63 Status s;
64
65 db->Put(write_options, Slice("foo"), Slice("bar"));
66 db->Put(write_options, Slice("foo2"), Slice("bar"));
67
68 Transaction* txn = txn_db->BeginTransaction(write_options);
69 ASSERT_TRUE(txn);
70
71 txn->GetForUpdate(read_options, "foo", &value);
72 ASSERT_EQ(value, "bar");
73
74 txn->Put(Slice("foo"), Slice("bar2"));
75
76 txn->GetForUpdate(read_options, "foo", &value);
77 ASSERT_EQ(value, "bar2");
78
79 s = txn->Commit();
80 ASSERT_OK(s);
81
82 db->Get(read_options, "foo", &value);
83 ASSERT_EQ(value, "bar2");
84
85 delete txn;
86 }
87
88 TEST_F(OptimisticTransactionTest, WriteConflictTest) {
89 WriteOptions write_options;
90 ReadOptions read_options;
91 string value;
92 Status s;
93
94 db->Put(write_options, "foo", "bar");
95 db->Put(write_options, "foo2", "bar");
96
97 Transaction* txn = txn_db->BeginTransaction(write_options);
98 ASSERT_TRUE(txn);
99
100 txn->Put("foo", "bar2");
101
102 // This Put outside of a transaction will conflict with the previous write
103 s = db->Put(write_options, "foo", "barz");
104 ASSERT_OK(s);
105
106 s = db->Get(read_options, "foo", &value);
107 ASSERT_EQ(value, "barz");
108 ASSERT_EQ(1, txn->GetNumKeys());
109
110 s = txn->Commit();
111 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
112
113 // Verify that transaction did not write anything
114 db->Get(read_options, "foo", &value);
115 ASSERT_EQ(value, "barz");
116 db->Get(read_options, "foo2", &value);
117 ASSERT_EQ(value, "bar");
118
119 delete txn;
120 }
121
122 TEST_F(OptimisticTransactionTest, WriteConflictTest2) {
123 WriteOptions write_options;
124 ReadOptions read_options;
125 OptimisticTransactionOptions txn_options;
126 string value;
127 Status s;
128
129 db->Put(write_options, "foo", "bar");
130 db->Put(write_options, "foo2", "bar");
131
132 txn_options.set_snapshot = true;
133 Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
134 ASSERT_TRUE(txn);
135
136 // This Put outside of a transaction will conflict with a later write
137 s = db->Put(write_options, "foo", "barz");
138 ASSERT_OK(s);
139
140 txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
141
142 s = db->Get(read_options, "foo", &value);
143 ASSERT_EQ(value, "barz");
144
145 s = txn->Commit();
146 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
147
148 // Verify that transaction did not write anything
149 db->Get(read_options, "foo", &value);
150 ASSERT_EQ(value, "barz");
151 db->Get(read_options, "foo2", &value);
152 ASSERT_EQ(value, "bar");
153
154 delete txn;
155 }
156
157 TEST_F(OptimisticTransactionTest, ReadConflictTest) {
158 WriteOptions write_options;
159 ReadOptions read_options, snapshot_read_options;
160 OptimisticTransactionOptions txn_options;
161 string value;
162 Status s;
163
164 db->Put(write_options, "foo", "bar");
165 db->Put(write_options, "foo2", "bar");
166
167 txn_options.set_snapshot = true;
168 Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
169 ASSERT_TRUE(txn);
170
171 txn->SetSnapshot();
172 snapshot_read_options.snapshot = txn->GetSnapshot();
173
174 txn->GetForUpdate(snapshot_read_options, "foo", &value);
175 ASSERT_EQ(value, "bar");
176
177 // This Put outside of a transaction will conflict with the previous read
178 s = db->Put(write_options, "foo", "barz");
179 ASSERT_OK(s);
180
181 s = db->Get(read_options, "foo", &value);
182 ASSERT_EQ(value, "barz");
183
184 s = txn->Commit();
185 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
186
187 // Verify that transaction did not write anything
188 txn->GetForUpdate(read_options, "foo", &value);
189 ASSERT_EQ(value, "barz");
190 txn->GetForUpdate(read_options, "foo2", &value);
191 ASSERT_EQ(value, "bar");
192
193 delete txn;
194 }
195
196 TEST_F(OptimisticTransactionTest, TxnOnlyTest) {
197 // Test to make sure transactions work when there are no other writes in an
198 // empty db.
199
200 WriteOptions write_options;
201 ReadOptions read_options;
202 string value;
203 Status s;
204
205 Transaction* txn = txn_db->BeginTransaction(write_options);
206 ASSERT_TRUE(txn);
207
208 txn->Put("x", "y");
209
210 s = txn->Commit();
211 ASSERT_OK(s);
212
213 delete txn;
214 }
215
216 TEST_F(OptimisticTransactionTest, FlushTest) {
217 WriteOptions write_options;
218 ReadOptions read_options, snapshot_read_options;
219 string value;
220 Status s;
221
222 db->Put(write_options, Slice("foo"), Slice("bar"));
223 db->Put(write_options, Slice("foo2"), Slice("bar"));
224
225 Transaction* txn = txn_db->BeginTransaction(write_options);
226 ASSERT_TRUE(txn);
227
228 snapshot_read_options.snapshot = txn->GetSnapshot();
229
230 txn->GetForUpdate(snapshot_read_options, "foo", &value);
231 ASSERT_EQ(value, "bar");
232
233 txn->Put(Slice("foo"), Slice("bar2"));
234
235 txn->GetForUpdate(snapshot_read_options, "foo", &value);
236 ASSERT_EQ(value, "bar2");
237
238 // Put a random key so we have a memtable to flush
239 s = db->Put(write_options, "dummy", "dummy");
240 ASSERT_OK(s);
241
242 // force a memtable flush
243 FlushOptions flush_ops;
244 db->Flush(flush_ops);
245
246 s = txn->Commit();
247 // txn should commit since the flushed table is still in MemtableList History
248 ASSERT_OK(s);
249
250 db->Get(read_options, "foo", &value);
251 ASSERT_EQ(value, "bar2");
252
253 delete txn;
254 }
255
256 TEST_F(OptimisticTransactionTest, FlushTest2) {
257 WriteOptions write_options;
258 ReadOptions read_options, snapshot_read_options;
259 string value;
260 Status s;
261
262 db->Put(write_options, Slice("foo"), Slice("bar"));
263 db->Put(write_options, Slice("foo2"), Slice("bar"));
264
265 Transaction* txn = txn_db->BeginTransaction(write_options);
266 ASSERT_TRUE(txn);
267
268 snapshot_read_options.snapshot = txn->GetSnapshot();
269
270 txn->GetForUpdate(snapshot_read_options, "foo", &value);
271 ASSERT_EQ(value, "bar");
272
273 txn->Put(Slice("foo"), Slice("bar2"));
274
275 txn->GetForUpdate(snapshot_read_options, "foo", &value);
276 ASSERT_EQ(value, "bar2");
277
278 // Put a random key so we have a MemTable to flush
279 s = db->Put(write_options, "dummy", "dummy");
280 ASSERT_OK(s);
281
282 // force a memtable flush
283 FlushOptions flush_ops;
284 db->Flush(flush_ops);
285
286 // Put a random key so we have a MemTable to flush
287 s = db->Put(write_options, "dummy", "dummy2");
288 ASSERT_OK(s);
289
290 // force a memtable flush
291 db->Flush(flush_ops);
292
293 s = db->Put(write_options, "dummy", "dummy3");
294 ASSERT_OK(s);
295
296 // force a memtable flush
297 // Since our test db has max_write_buffer_number=2, this flush will cause
298 // the first memtable to get purged from the MemtableList history.
299 db->Flush(flush_ops);
300
301 s = txn->Commit();
302 // txn should not commit since MemTableList History is not large enough
303 ASSERT_TRUE(s.IsTryAgain());
304
305 db->Get(read_options, "foo", &value);
306 ASSERT_EQ(value, "bar");
307
308 delete txn;
309 }
310
311 TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
312 WriteOptions write_options;
313 ReadOptions read_options;
314 string value;
315 Status s;
316
317 db->Put(write_options, "AAA", "bar");
318
319 Transaction* txn = txn_db->BeginTransaction(write_options);
320 ASSERT_TRUE(txn);
321
322 // Modify key after transaction start
323 db->Put(write_options, "AAA", "bar1");
324
325 // Read and write without a snapshot
326 txn->GetForUpdate(read_options, "AAA", &value);
327 ASSERT_EQ(value, "bar1");
328 txn->Put("AAA", "bar2");
329
330 // Should commit since read/write was done after data changed
331 s = txn->Commit();
332 ASSERT_OK(s);
333
334 txn->GetForUpdate(read_options, "AAA", &value);
335 ASSERT_EQ(value, "bar2");
336
337 delete txn;
338 }
339
340 TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
341 WriteOptions write_options;
342 ReadOptions read_options, snapshot_read_options;
343 string value;
344 Status s;
345
346 db->Put(write_options, "AAA", "bar");
347 db->Put(write_options, "BBB", "bar");
348 db->Put(write_options, "CCC", "bar");
349
350 Transaction* txn = txn_db->BeginTransaction(write_options);
351 ASSERT_TRUE(txn);
352
353 db->Put(write_options, "AAA", "bar1");
354
355 // Read and write without a snapshot
356 txn->GetForUpdate(read_options, "AAA", &value);
357 ASSERT_EQ(value, "bar1");
358 txn->Put("AAA", "bar2");
359
360 // Modify BBB before snapshot is taken
361 db->Put(write_options, "BBB", "bar1");
362
363 txn->SetSnapshot();
364 snapshot_read_options.snapshot = txn->GetSnapshot();
365
366 // Read and write with snapshot
367 txn->GetForUpdate(snapshot_read_options, "BBB", &value);
368 ASSERT_EQ(value, "bar1");
369 txn->Put("BBB", "bar2");
370
371 db->Put(write_options, "CCC", "bar1");
372
373 // Set a new snapshot
374 txn->SetSnapshot();
375 snapshot_read_options.snapshot = txn->GetSnapshot();
376
377 // Read and write with snapshot
378 txn->GetForUpdate(snapshot_read_options, "CCC", &value);
379 ASSERT_EQ(value, "bar1");
380 txn->Put("CCC", "bar2");
381
382 s = txn->GetForUpdate(read_options, "AAA", &value);
383 ASSERT_OK(s);
384 ASSERT_EQ(value, "bar2");
385 s = txn->GetForUpdate(read_options, "BBB", &value);
386 ASSERT_OK(s);
387 ASSERT_EQ(value, "bar2");
388 s = txn->GetForUpdate(read_options, "CCC", &value);
389 ASSERT_OK(s);
390 ASSERT_EQ(value, "bar2");
391
392 s = db->Get(read_options, "AAA", &value);
393 ASSERT_OK(s);
394 ASSERT_EQ(value, "bar1");
395 s = db->Get(read_options, "BBB", &value);
396 ASSERT_OK(s);
397 ASSERT_EQ(value, "bar1");
398 s = db->Get(read_options, "CCC", &value);
399 ASSERT_OK(s);
400 ASSERT_EQ(value, "bar1");
401
402 s = txn->Commit();
403 ASSERT_OK(s);
404
405 s = db->Get(read_options, "AAA", &value);
406 ASSERT_OK(s);
407 ASSERT_EQ(value, "bar2");
408 s = db->Get(read_options, "BBB", &value);
409 ASSERT_OK(s);
410 ASSERT_EQ(value, "bar2");
411 s = db->Get(read_options, "CCC", &value);
412 ASSERT_OK(s);
413 ASSERT_EQ(value, "bar2");
414
415 // verify that we track multiple writes to the same key at different snapshots
416 delete txn;
417 txn = txn_db->BeginTransaction(write_options);
418
419 // Potentially conflicting writes
420 db->Put(write_options, "ZZZ", "zzz");
421 db->Put(write_options, "XXX", "xxx");
422
423 txn->SetSnapshot();
424
425 OptimisticTransactionOptions txn_options;
426 txn_options.set_snapshot = true;
427 Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
428 txn2->SetSnapshot();
429
430 // This should not conflict in txn since the snapshot is later than the
431 // previous write (spoiler alert: it will later conflict with txn2).
432 txn->Put("ZZZ", "zzzz");
433 s = txn->Commit();
434 ASSERT_OK(s);
435
436 delete txn;
437
438 // This will conflict since the snapshot is earlier than another write to ZZZ
439 txn2->Put("ZZZ", "xxxxx");
440
441 s = txn2->Commit();
442 ASSERT_TRUE(s.IsBusy());
443
444 delete txn2;
445 }
446
447 TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
448 WriteOptions write_options;
449 ReadOptions read_options, snapshot_read_options;
450 OptimisticTransactionOptions txn_options;
451 string value;
452 Status s;
453
454 ColumnFamilyHandle *cfa, *cfb;
455 ColumnFamilyOptions cf_options;
456
457 // Create 2 new column families
458 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
459 ASSERT_OK(s);
460 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
461 ASSERT_OK(s);
462
463 delete cfa;
464 delete cfb;
465 delete txn_db;
466
467 // open DB with three column families
468 std::vector<ColumnFamilyDescriptor> column_families;
469 // have to open default column family
470 column_families.push_back(
471 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
472 // open the new column families
473 column_families.push_back(
474 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
475 column_families.push_back(
476 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
477 std::vector<ColumnFamilyHandle*> handles;
478 s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles,
479 &txn_db);
480 ASSERT_OK(s);
481 db = txn_db->GetBaseDB();
482
483 Transaction* txn = txn_db->BeginTransaction(write_options);
484 ASSERT_TRUE(txn);
485
486 txn->SetSnapshot();
487 snapshot_read_options.snapshot = txn->GetSnapshot();
488
489 txn_options.set_snapshot = true;
490 Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
491 ASSERT_TRUE(txn2);
492
493 // Write some data to the db
494 WriteBatch batch;
495 batch.Put("foo", "foo");
496 batch.Put(handles[1], "AAA", "bar");
497 batch.Put(handles[1], "AAAZZZ", "bar");
498 s = db->Write(write_options, &batch);
499 ASSERT_OK(s);
500 db->Delete(write_options, handles[1], "AAAZZZ");
501
502 // These keys do no conflict with existing writes since they're in
503 // different column families
504 txn->Delete("AAA");
505 txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
506 Slice key_slice("AAAZZZ");
507 Slice value_slices[2] = {Slice("bar"), Slice("bar")};
508 txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2));
509
510 ASSERT_EQ(3, txn->GetNumKeys());
511
512 // Txn should commit
513 s = txn->Commit();
514 ASSERT_OK(s);
515 s = db->Get(read_options, "AAA", &value);
516 ASSERT_TRUE(s.IsNotFound());
517 s = db->Get(read_options, handles[2], "AAAZZZ", &value);
518 ASSERT_EQ(value, "barbar");
519
520 Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
521 Slice value_slice("barbarbar");
522 // This write will cause a conflict with the earlier batch write
523 txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1));
524
525 txn2->Delete(handles[2], "XXX");
526 txn2->Delete(handles[1], "XXX");
527 s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value);
528 ASSERT_TRUE(s.IsNotFound());
529
530 // Verify txn did not commit
531 s = txn2->Commit();
532 ASSERT_TRUE(s.IsBusy());
533 s = db->Get(read_options, handles[1], "AAAZZZ", &value);
534 ASSERT_EQ(value, "barbar");
535
536 delete txn;
537 delete txn2;
538
539 txn = txn_db->BeginTransaction(write_options, txn_options);
540 snapshot_read_options.snapshot = txn->GetSnapshot();
541
542 txn2 = txn_db->BeginTransaction(write_options, txn_options);
543 ASSERT_TRUE(txn);
544
545 std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
546 handles[0], handles[2]};
547 std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
548 std::vector<std::string> values(4);
549
550 std::vector<Status> results = txn->MultiGetForUpdate(
551 snapshot_read_options, multiget_cfh, multiget_keys, &values);
552 ASSERT_OK(results[0]);
553 ASSERT_OK(results[1]);
554 ASSERT_OK(results[2]);
555 ASSERT_TRUE(results[3].IsNotFound());
556 ASSERT_EQ(values[0], "bar");
557 ASSERT_EQ(values[1], "barbar");
558 ASSERT_EQ(values[2], "foo");
559
560 txn->Delete(handles[2], "ZZZ");
561 txn->Put(handles[2], "ZZZ", "YYY");
562 txn->Put(handles[2], "ZZZ", "YYYY");
563 txn->Delete(handles[2], "ZZZ");
564 txn->Put(handles[2], "AAAZZZ", "barbarbar");
565
566 ASSERT_EQ(5, txn->GetNumKeys());
567
568 // Txn should commit
569 s = txn->Commit();
570 ASSERT_OK(s);
571 s = db->Get(read_options, handles[2], "ZZZ", &value);
572 ASSERT_TRUE(s.IsNotFound());
573
574 // Put a key which will conflict with the next txn using the previous snapshot
575 db->Put(write_options, handles[2], "foo", "000");
576
577 results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
578 multiget_keys, &values);
579 ASSERT_OK(results[0]);
580 ASSERT_OK(results[1]);
581 ASSERT_OK(results[2]);
582 ASSERT_TRUE(results[3].IsNotFound());
583 ASSERT_EQ(values[0], "bar");
584 ASSERT_EQ(values[1], "barbar");
585 ASSERT_EQ(values[2], "foo");
586
587 // Verify Txn Did not Commit
588 s = txn2->Commit();
589 ASSERT_TRUE(s.IsBusy());
590
591 s = db->DropColumnFamily(handles[1]);
592 ASSERT_OK(s);
593 s = db->DropColumnFamily(handles[2]);
594 ASSERT_OK(s);
595
596 delete txn;
597 delete txn2;
598
599 for (auto handle : handles) {
600 delete handle;
601 }
602 }
603
604 TEST_F(OptimisticTransactionTest, EmptyTest) {
605 WriteOptions write_options;
606 ReadOptions read_options;
607 string value;
608 Status s;
609
610 s = db->Put(write_options, "aaa", "aaa");
611 ASSERT_OK(s);
612
613 Transaction* txn = txn_db->BeginTransaction(write_options);
614 s = txn->Commit();
615 ASSERT_OK(s);
616 delete txn;
617
618 txn = txn_db->BeginTransaction(write_options);
619 txn->Rollback();
620 delete txn;
621
622 txn = txn_db->BeginTransaction(write_options);
623 s = txn->GetForUpdate(read_options, "aaa", &value);
624 ASSERT_EQ(value, "aaa");
625
626 s = txn->Commit();
627 ASSERT_OK(s);
628 delete txn;
629
630 txn = txn_db->BeginTransaction(write_options);
631 txn->SetSnapshot();
632 s = txn->GetForUpdate(read_options, "aaa", &value);
633 ASSERT_EQ(value, "aaa");
634
635 s = db->Put(write_options, "aaa", "xxx");
636 s = txn->Commit();
637 ASSERT_TRUE(s.IsBusy());
638 delete txn;
639 }
640
641 TEST_F(OptimisticTransactionTest, PredicateManyPreceders) {
642 WriteOptions write_options;
643 ReadOptions read_options1, read_options2;
644 OptimisticTransactionOptions txn_options;
645 string value;
646 Status s;
647
648 txn_options.set_snapshot = true;
649 Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options);
650 read_options1.snapshot = txn1->GetSnapshot();
651
652 Transaction* txn2 = txn_db->BeginTransaction(write_options);
653 txn2->SetSnapshot();
654 read_options2.snapshot = txn2->GetSnapshot();
655
656 std::vector<Slice> multiget_keys = {"1", "2", "3"};
657 std::vector<std::string> multiget_values;
658
659 std::vector<Status> results =
660 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
661 ASSERT_TRUE(results[1].IsNotFound());
662
663 txn2->Put("2", "x");
664
665 s = txn2->Commit();
666 ASSERT_OK(s);
667
668 multiget_values.clear();
669 results =
670 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
671 ASSERT_TRUE(results[1].IsNotFound());
672
673 // should not commit since txn2 wrote a key txn has read
674 s = txn1->Commit();
675 ASSERT_TRUE(s.IsBusy());
676
677 delete txn1;
678 delete txn2;
679
680 txn1 = txn_db->BeginTransaction(write_options, txn_options);
681 read_options1.snapshot = txn1->GetSnapshot();
682
683 txn2 = txn_db->BeginTransaction(write_options, txn_options);
684 read_options2.snapshot = txn2->GetSnapshot();
685
686 txn1->Put("4", "x");
687
688 txn2->Delete("4");
689
690 // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
691 s = txn1->Commit();
692 ASSERT_OK(s);
693
694 s = txn2->GetForUpdate(read_options2, "4", &value);
695 ASSERT_TRUE(s.IsNotFound());
696
697 // txn2 cannot commit since txn1 changed "4"
698 s = txn2->Commit();
699 ASSERT_TRUE(s.IsBusy());
700
701 delete txn1;
702 delete txn2;
703 }
704
705 TEST_F(OptimisticTransactionTest, LostUpdate) {
706 WriteOptions write_options;
707 ReadOptions read_options, read_options1, read_options2;
708 OptimisticTransactionOptions txn_options;
709 string value;
710 Status s;
711
712 // Test 2 transactions writing to the same key in multiple orders and
713 // with/without snapshots
714
715 Transaction* txn1 = txn_db->BeginTransaction(write_options);
716 Transaction* txn2 = txn_db->BeginTransaction(write_options);
717
718 txn1->Put("1", "1");
719 txn2->Put("1", "2");
720
721 s = txn1->Commit();
722 ASSERT_OK(s);
723
724 s = txn2->Commit();
725 ASSERT_TRUE(s.IsBusy());
726
727 delete txn1;
728 delete txn2;
729
730 txn_options.set_snapshot = true;
731 txn1 = txn_db->BeginTransaction(write_options, txn_options);
732 read_options1.snapshot = txn1->GetSnapshot();
733
734 txn2 = txn_db->BeginTransaction(write_options, txn_options);
735 read_options2.snapshot = txn2->GetSnapshot();
736
737 txn1->Put("1", "3");
738 txn2->Put("1", "4");
739
740 s = txn1->Commit();
741 ASSERT_OK(s);
742
743 s = txn2->Commit();
744 ASSERT_TRUE(s.IsBusy());
745
746 delete txn1;
747 delete txn2;
748
749 txn1 = txn_db->BeginTransaction(write_options, txn_options);
750 read_options1.snapshot = txn1->GetSnapshot();
751
752 txn2 = txn_db->BeginTransaction(write_options, txn_options);
753 read_options2.snapshot = txn2->GetSnapshot();
754
755 txn1->Put("1", "5");
756 s = txn1->Commit();
757 ASSERT_OK(s);
758
759 txn2->Put("1", "6");
760 s = txn2->Commit();
761 ASSERT_TRUE(s.IsBusy());
762
763 delete txn1;
764 delete txn2;
765
766 txn1 = txn_db->BeginTransaction(write_options, txn_options);
767 read_options1.snapshot = txn1->GetSnapshot();
768
769 txn2 = txn_db->BeginTransaction(write_options, txn_options);
770 read_options2.snapshot = txn2->GetSnapshot();
771
772 txn1->Put("1", "5");
773 s = txn1->Commit();
774 ASSERT_OK(s);
775
776 txn2->SetSnapshot();
777 txn2->Put("1", "6");
778 s = txn2->Commit();
779 ASSERT_OK(s);
780
781 delete txn1;
782 delete txn2;
783
784 txn1 = txn_db->BeginTransaction(write_options);
785 txn2 = txn_db->BeginTransaction(write_options);
786
787 txn1->Put("1", "7");
788 s = txn1->Commit();
789 ASSERT_OK(s);
790
791 txn2->Put("1", "8");
792 s = txn2->Commit();
793 ASSERT_OK(s);
794
795 delete txn1;
796 delete txn2;
797
798 s = db->Get(read_options, "1", &value);
799 ASSERT_OK(s);
800 ASSERT_EQ(value, "8");
801 }
802
803 TEST_F(OptimisticTransactionTest, UntrackedWrites) {
804 WriteOptions write_options;
805 ReadOptions read_options;
806 string value;
807 Status s;
808
809 // Verify transaction rollback works for untracked keys.
810 Transaction* txn = txn_db->BeginTransaction(write_options);
811 txn->PutUntracked("untracked", "0");
812 txn->Rollback();
813 s = db->Get(read_options, "untracked", &value);
814 ASSERT_TRUE(s.IsNotFound());
815
816 delete txn;
817 txn = txn_db->BeginTransaction(write_options);
818
819 txn->Put("tracked", "1");
820 txn->PutUntracked("untracked", "1");
821 txn->MergeUntracked("untracked", "2");
822 txn->DeleteUntracked("untracked");
823
824 // Write to the untracked key outside of the transaction and verify
825 // it doesn't prevent the transaction from committing.
826 s = db->Put(write_options, "untracked", "x");
827 ASSERT_OK(s);
828
829 s = txn->Commit();
830 ASSERT_OK(s);
831
832 s = db->Get(read_options, "untracked", &value);
833 ASSERT_TRUE(s.IsNotFound());
834
835 delete txn;
836 txn = txn_db->BeginTransaction(write_options);
837
838 txn->Put("tracked", "10");
839 txn->PutUntracked("untracked", "A");
840
841 // Write to tracked key outside of the transaction and verify that the
842 // untracked keys are not written when the commit fails.
843 s = db->Delete(write_options, "tracked");
844
845 s = txn->Commit();
846 ASSERT_TRUE(s.IsBusy());
847
848 s = db->Get(read_options, "untracked", &value);
849 ASSERT_TRUE(s.IsNotFound());
850
851 delete txn;
852 }
853
854 TEST_F(OptimisticTransactionTest, IteratorTest) {
855 WriteOptions write_options;
856 ReadOptions read_options, snapshot_read_options;
857 OptimisticTransactionOptions txn_options;
858 string value;
859 Status s;
860
861 // Write some keys to the db
862 s = db->Put(write_options, "A", "a");
863 ASSERT_OK(s);
864
865 s = db->Put(write_options, "G", "g");
866 ASSERT_OK(s);
867
868 s = db->Put(write_options, "F", "f");
869 ASSERT_OK(s);
870
871 s = db->Put(write_options, "C", "c");
872 ASSERT_OK(s);
873
874 s = db->Put(write_options, "D", "d");
875 ASSERT_OK(s);
876
877 Transaction* txn = txn_db->BeginTransaction(write_options);
878 ASSERT_TRUE(txn);
879
880 // Write some keys in a txn
881 s = txn->Put("B", "b");
882 ASSERT_OK(s);
883
884 s = txn->Put("H", "h");
885 ASSERT_OK(s);
886
887 s = txn->Delete("D");
888 ASSERT_OK(s);
889
890 s = txn->Put("E", "e");
891 ASSERT_OK(s);
892
893 txn->SetSnapshot();
894 const Snapshot* snapshot = txn->GetSnapshot();
895
896 // Write some keys to the db after the snapshot
897 s = db->Put(write_options, "BB", "xx");
898 ASSERT_OK(s);
899
900 s = db->Put(write_options, "C", "xx");
901 ASSERT_OK(s);
902
903 read_options.snapshot = snapshot;
904 Iterator* iter = txn->GetIterator(read_options);
905 ASSERT_OK(iter->status());
906 iter->SeekToFirst();
907
908 // Read all keys via iter and lock them all
909 std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
910 for (int i = 0; i < 7; i++) {
911 ASSERT_OK(iter->status());
912 ASSERT_TRUE(iter->Valid());
913 ASSERT_EQ(results[i], iter->value().ToString());
914
915 s = txn->GetForUpdate(read_options, iter->key(), nullptr);
916 ASSERT_OK(s);
917
918 iter->Next();
919 }
920 ASSERT_FALSE(iter->Valid());
921
922 iter->Seek("G");
923 ASSERT_OK(iter->status());
924 ASSERT_TRUE(iter->Valid());
925 ASSERT_EQ("g", iter->value().ToString());
926
927 iter->Prev();
928 ASSERT_OK(iter->status());
929 ASSERT_TRUE(iter->Valid());
930 ASSERT_EQ("f", iter->value().ToString());
931
932 iter->Seek("D");
933 ASSERT_OK(iter->status());
934 ASSERT_TRUE(iter->Valid());
935 ASSERT_EQ("e", iter->value().ToString());
936
937 iter->Seek("C");
938 ASSERT_OK(iter->status());
939 ASSERT_TRUE(iter->Valid());
940 ASSERT_EQ("c", iter->value().ToString());
941
942 iter->Next();
943 ASSERT_OK(iter->status());
944 ASSERT_TRUE(iter->Valid());
945 ASSERT_EQ("e", iter->value().ToString());
946
947 iter->Seek("");
948 ASSERT_OK(iter->status());
949 ASSERT_TRUE(iter->Valid());
950 ASSERT_EQ("a", iter->value().ToString());
951
952 iter->Seek("X");
953 ASSERT_OK(iter->status());
954 ASSERT_FALSE(iter->Valid());
955
956 iter->SeekToLast();
957 ASSERT_OK(iter->status());
958 ASSERT_TRUE(iter->Valid());
959 ASSERT_EQ("h", iter->value().ToString());
960
961 // key "C" was modified in the db after txn's snapshot. txn will not commit.
962 s = txn->Commit();
963 ASSERT_TRUE(s.IsBusy());
964
965 delete iter;
966 delete txn;
967 }
968
969 TEST_F(OptimisticTransactionTest, SavepointTest) {
970 WriteOptions write_options;
971 ReadOptions read_options, snapshot_read_options;
972 OptimisticTransactionOptions txn_options;
973 string value;
974 Status s;
975
976 Transaction* txn = txn_db->BeginTransaction(write_options);
977 ASSERT_TRUE(txn);
978
979 s = txn->RollbackToSavePoint();
980 ASSERT_TRUE(s.IsNotFound());
981
982 txn->SetSavePoint(); // 1
983
984 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
985 s = txn->RollbackToSavePoint();
986 ASSERT_TRUE(s.IsNotFound());
987
988 s = txn->Put("B", "b");
989 ASSERT_OK(s);
990
991 s = txn->Commit();
992 ASSERT_OK(s);
993
994 s = db->Get(read_options, "B", &value);
995 ASSERT_OK(s);
996 ASSERT_EQ("b", value);
997
998 delete txn;
999 txn = txn_db->BeginTransaction(write_options);
1000 ASSERT_TRUE(txn);
1001
1002 s = txn->Put("A", "a");
1003 ASSERT_OK(s);
1004
1005 s = txn->Put("B", "bb");
1006 ASSERT_OK(s);
1007
1008 s = txn->Put("C", "c");
1009 ASSERT_OK(s);
1010
1011 txn->SetSavePoint(); // 2
1012
1013 s = txn->Delete("B");
1014 ASSERT_OK(s);
1015
1016 s = txn->Put("C", "cc");
1017 ASSERT_OK(s);
1018
1019 s = txn->Put("D", "d");
1020 ASSERT_OK(s);
1021
1022 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
1023
1024 s = txn->Get(read_options, "A", &value);
1025 ASSERT_OK(s);
1026 ASSERT_EQ("a", value);
1027
1028 s = txn->Get(read_options, "B", &value);
1029 ASSERT_OK(s);
1030 ASSERT_EQ("bb", value);
1031
1032 s = txn->Get(read_options, "C", &value);
1033 ASSERT_OK(s);
1034 ASSERT_EQ("c", value);
1035
1036 s = txn->Get(read_options, "D", &value);
1037 ASSERT_TRUE(s.IsNotFound());
1038
1039 s = txn->Put("A", "a");
1040 ASSERT_OK(s);
1041
1042 s = txn->Put("E", "e");
1043 ASSERT_OK(s);
1044
1045 // Rollback to beginning of txn
1046 s = txn->RollbackToSavePoint();
1047 ASSERT_TRUE(s.IsNotFound());
1048 txn->Rollback();
1049
1050 s = txn->Get(read_options, "A", &value);
1051 ASSERT_TRUE(s.IsNotFound());
1052
1053 s = txn->Get(read_options, "B", &value);
1054 ASSERT_OK(s);
1055 ASSERT_EQ("b", value);
1056
1057 s = txn->Get(read_options, "D", &value);
1058 ASSERT_TRUE(s.IsNotFound());
1059
1060 s = txn->Get(read_options, "D", &value);
1061 ASSERT_TRUE(s.IsNotFound());
1062
1063 s = txn->Get(read_options, "E", &value);
1064 ASSERT_TRUE(s.IsNotFound());
1065
1066 s = txn->Put("A", "aa");
1067 ASSERT_OK(s);
1068
1069 s = txn->Put("F", "f");
1070 ASSERT_OK(s);
1071
1072 txn->SetSavePoint(); // 3
1073 txn->SetSavePoint(); // 4
1074
1075 s = txn->Put("G", "g");
1076 ASSERT_OK(s);
1077
1078 s = txn->Delete("F");
1079 ASSERT_OK(s);
1080
1081 s = txn->Delete("B");
1082 ASSERT_OK(s);
1083
1084 s = txn->Get(read_options, "A", &value);
1085 ASSERT_OK(s);
1086 ASSERT_EQ("aa", value);
1087
1088 s = txn->Get(read_options, "F", &value);
1089 ASSERT_TRUE(s.IsNotFound());
1090
1091 s = txn->Get(read_options, "B", &value);
1092 ASSERT_TRUE(s.IsNotFound());
1093
1094 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
1095
1096 s = txn->Get(read_options, "F", &value);
1097 ASSERT_OK(s);
1098 ASSERT_EQ("f", value);
1099
1100 s = txn->Get(read_options, "G", &value);
1101 ASSERT_TRUE(s.IsNotFound());
1102
1103 s = txn->Commit();
1104 ASSERT_OK(s);
1105
1106 s = db->Get(read_options, "F", &value);
1107 ASSERT_OK(s);
1108 ASSERT_EQ("f", value);
1109
1110 s = db->Get(read_options, "G", &value);
1111 ASSERT_TRUE(s.IsNotFound());
1112
1113 s = db->Get(read_options, "A", &value);
1114 ASSERT_OK(s);
1115 ASSERT_EQ("aa", value);
1116
1117 s = db->Get(read_options, "B", &value);
1118 ASSERT_OK(s);
1119 ASSERT_EQ("b", value);
1120
1121 s = db->Get(read_options, "C", &value);
1122 ASSERT_TRUE(s.IsNotFound());
1123
1124 s = db->Get(read_options, "D", &value);
1125 ASSERT_TRUE(s.IsNotFound());
1126
1127 s = db->Get(read_options, "E", &value);
1128 ASSERT_TRUE(s.IsNotFound());
1129
1130 delete txn;
1131 }
1132
1133 TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) {
1134 WriteOptions write_options;
1135 ReadOptions read_options, snapshot_read_options;
1136 OptimisticTransactionOptions txn_options;
1137 string value;
1138 Status s;
1139
1140 db->Put(write_options, "A", "");
1141
1142 Transaction* txn1 = txn_db->BeginTransaction(write_options);
1143 ASSERT_TRUE(txn1);
1144
1145 s = txn1->GetForUpdate(read_options, "A", &value);
1146 ASSERT_OK(s);
1147
1148 txn1->UndoGetForUpdate("A");
1149
1150 Transaction* txn2 = txn_db->BeginTransaction(write_options);
1151 txn2->Put("A", "x");
1152 s = txn2->Commit();
1153 ASSERT_OK(s);
1154 delete txn2;
1155
1156 // Verify that txn1 can commit since A isn't conflict checked
1157 s = txn1->Commit();
1158 ASSERT_OK(s);
1159 delete txn1;
1160
1161 txn1 = txn_db->BeginTransaction(write_options);
1162 txn1->Put("A", "a");
1163
1164 s = txn1->GetForUpdate(read_options, "A", &value);
1165 ASSERT_OK(s);
1166
1167 txn1->UndoGetForUpdate("A");
1168
1169 txn2 = txn_db->BeginTransaction(write_options);
1170 txn2->Put("A", "x");
1171 s = txn2->Commit();
1172 ASSERT_OK(s);
1173 delete txn2;
1174
1175 // Verify that txn1 cannot commit since A will still be conflict checked
1176 s = txn1->Commit();
1177 ASSERT_TRUE(s.IsBusy());
1178 delete txn1;
1179
1180 txn1 = txn_db->BeginTransaction(write_options);
1181
1182 s = txn1->GetForUpdate(read_options, "A", &value);
1183 ASSERT_OK(s);
1184 s = txn1->GetForUpdate(read_options, "A", &value);
1185 ASSERT_OK(s);
1186
1187 txn1->UndoGetForUpdate("A");
1188
1189 txn2 = txn_db->BeginTransaction(write_options);
1190 txn2->Put("A", "x");
1191 s = txn2->Commit();
1192 ASSERT_OK(s);
1193 delete txn2;
1194
1195 // Verify that txn1 cannot commit since A will still be conflict checked
1196 s = txn1->Commit();
1197 ASSERT_TRUE(s.IsBusy());
1198 delete txn1;
1199
1200 txn1 = txn_db->BeginTransaction(write_options);
1201
1202 s = txn1->GetForUpdate(read_options, "A", &value);
1203 ASSERT_OK(s);
1204 s = txn1->GetForUpdate(read_options, "A", &value);
1205 ASSERT_OK(s);
1206
1207 txn1->UndoGetForUpdate("A");
1208 txn1->UndoGetForUpdate("A");
1209
1210 txn2 = txn_db->BeginTransaction(write_options);
1211 txn2->Put("A", "x");
1212 s = txn2->Commit();
1213 ASSERT_OK(s);
1214 delete txn2;
1215
1216 // Verify that txn1 can commit since A isn't conflict checked
1217 s = txn1->Commit();
1218 ASSERT_OK(s);
1219 delete txn1;
1220
1221 txn1 = txn_db->BeginTransaction(write_options);
1222
1223 s = txn1->GetForUpdate(read_options, "A", &value);
1224 ASSERT_OK(s);
1225
1226 txn1->SetSavePoint();
1227 txn1->UndoGetForUpdate("A");
1228
1229 txn2 = txn_db->BeginTransaction(write_options);
1230 txn2->Put("A", "x");
1231 s = txn2->Commit();
1232 ASSERT_OK(s);
1233 delete txn2;
1234
1235 // Verify that txn1 cannot commit since A will still be conflict checked
1236 s = txn1->Commit();
1237 ASSERT_TRUE(s.IsBusy());
1238 delete txn1;
1239
1240 txn1 = txn_db->BeginTransaction(write_options);
1241
1242 s = txn1->GetForUpdate(read_options, "A", &value);
1243 ASSERT_OK(s);
1244
1245 txn1->SetSavePoint();
1246 s = txn1->GetForUpdate(read_options, "A", &value);
1247 ASSERT_OK(s);
1248 txn1->UndoGetForUpdate("A");
1249
1250 txn2 = txn_db->BeginTransaction(write_options);
1251 txn2->Put("A", "x");
1252 s = txn2->Commit();
1253 ASSERT_OK(s);
1254 delete txn2;
1255
1256 // Verify that txn1 cannot commit since A will still be conflict checked
1257 s = txn1->Commit();
1258 ASSERT_TRUE(s.IsBusy());
1259 delete txn1;
1260
1261 txn1 = txn_db->BeginTransaction(write_options);
1262
1263 s = txn1->GetForUpdate(read_options, "A", &value);
1264 ASSERT_OK(s);
1265
1266 txn1->SetSavePoint();
1267 s = txn1->GetForUpdate(read_options, "A", &value);
1268 ASSERT_OK(s);
1269 txn1->UndoGetForUpdate("A");
1270
1271 txn1->RollbackToSavePoint();
1272 txn1->UndoGetForUpdate("A");
1273
1274 txn2 = txn_db->BeginTransaction(write_options);
1275 txn2->Put("A", "x");
1276 s = txn2->Commit();
1277 ASSERT_OK(s);
1278 delete txn2;
1279
1280 // Verify that txn1 can commit since A isn't conflict checked
1281 s = txn1->Commit();
1282 ASSERT_OK(s);
1283 delete txn1;
1284 }
1285
1286 namespace {
1287 Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db,
1288 const size_t num_transactions,
1289 const size_t num_sets,
1290 const size_t num_keys_per_set) {
1291 size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
1292 Random64 _rand(seed);
1293 WriteOptions write_options;
1294 ReadOptions read_options;
1295 OptimisticTransactionOptions txn_options;
1296 txn_options.set_snapshot = true;
1297
1298 RandomTransactionInserter inserter(&_rand, write_options, read_options,
1299 num_keys_per_set,
1300 static_cast<uint16_t>(num_sets));
1301
1302 for (size_t t = 0; t < num_transactions; t++) {
1303 bool success = inserter.OptimisticTransactionDBInsert(db, txn_options);
1304 if (!success) {
1305 // unexpected failure
1306 return inserter.GetLastStatus();
1307 }
1308 }
1309
1310 // Make sure at least some of the transactions succeeded. It's ok if
1311 // some failed due to write-conflicts.
1312 if (inserter.GetFailureCount() > num_transactions / 2) {
1313 return Status::TryAgain("Too many transactions failed! " +
1314 std::to_string(inserter.GetFailureCount()) + " / " +
1315 std::to_string(num_transactions));
1316 }
1317
1318 return Status::OK();
1319 }
1320 } // namespace
1321
1322 TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) {
1323 const size_t num_threads = 4;
1324 const size_t num_transactions_per_thread = 10000;
1325 const size_t num_sets = 3;
1326 const size_t num_keys_per_set = 100;
1327 // Setting the key-space to be 100 keys should cause enough write-conflicts
1328 // to make this test interesting.
1329
1330 std::vector<port::Thread> threads;
1331
1332 std::function<void()> call_inserter = [&] {
1333 ASSERT_OK(OptimisticTransactionStressTestInserter(
1334 txn_db, num_transactions_per_thread, num_sets, num_keys_per_set));
1335 };
1336
1337 // Create N threads that use RandomTransactionInserter to write
1338 // many transactions.
1339 for (uint32_t i = 0; i < num_threads; i++) {
1340 threads.emplace_back(call_inserter);
1341 }
1342
1343 // Wait for all threads to run
1344 for (auto& t : threads) {
1345 t.join();
1346 }
1347
1348 // Verify that data is consistent
1349 Status s = RandomTransactionInserter::Verify(db, num_sets);
1350 ASSERT_OK(s);
1351 }
1352
1353 TEST_F(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
1354 WriteOptions write_options;
1355 OptimisticTransactionOptions transaction_options;
1356
1357 Transaction* transaction(txn_db->BeginTransaction(write_options, transaction_options));
1358 Status s = transaction->Put("foo", "val");
1359 ASSERT_OK(s);
1360 s = transaction->Put("foo2", "val");
1361 ASSERT_OK(s);
1362 s = transaction->Put("foo3", "val");
1363 ASSERT_OK(s);
1364 s = transaction->Commit();
1365 ASSERT_OK(s);
1366 delete transaction;
1367
1368 Reopen();
1369 transaction = txn_db->BeginTransaction(write_options, transaction_options);
1370 s = transaction->Put("bar", "val");
1371 ASSERT_OK(s);
1372 s = transaction->Put("bar2", "val");
1373 ASSERT_OK(s);
1374 s = transaction->Commit();
1375 ASSERT_OK(s);
1376
1377 delete transaction;
1378 }
1379
1380 } // namespace rocksdb
1381
1382 int main(int argc, char** argv) {
1383 ::testing::InitGoogleTest(&argc, argv);
1384 return RUN_ALL_TESTS();
1385 }
1386
1387 #else
1388 #include <stdio.h>
1389
// Entry point for ROCKSDB_LITE builds: there is nothing to run, so report the
// skip on stderr and exit successfully.
int main(int argc, char** argv) {
  fputs("SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
1396
1397 #endif // !ROCKSDB_LITE