]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/optimistic_transaction_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / optimistic_transaction_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include <functional>
9#include <string>
10#include <thread>
11
f67539c2 12#include "db/db_impl/db_impl.h"
f67539c2 13#include "port/port.h"
7c673cae 14#include "rocksdb/db.h"
f67539c2 15#include "rocksdb/perf_context.h"
7c673cae
FG
16#include "rocksdb/utilities/optimistic_transaction_db.h"
17#include "rocksdb/utilities/transaction.h"
f67539c2
TL
18#include "test_util/sync_point.h"
19#include "test_util/testharness.h"
20#include "test_util/transaction_test_util.h"
7c673cae 21#include "util/crc32c.h"
7c673cae 22#include "util/random.h"
7c673cae
FG
23
24using std::string;
25
f67539c2 26namespace ROCKSDB_NAMESPACE {
7c673cae 27
f67539c2
TL
28class OptimisticTransactionTest
29 : public testing::Test,
30 public testing::WithParamInterface<OccValidationPolicy> {
7c673cae
FG
31 public:
32 OptimisticTransactionDB* txn_db;
7c673cae
FG
33 string dbname;
34 Options options;
35
36 OptimisticTransactionTest() {
37 options.create_if_missing = true;
38 options.max_write_buffer_number = 2;
f67539c2 39 options.max_write_buffer_size_to_maintain = 1600;
11fdf7f2 40 dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
7c673cae
FG
41
42 DestroyDB(dbname, options);
43 Open();
44 }
494da23a 45 ~OptimisticTransactionTest() override {
7c673cae
FG
46 delete txn_db;
47 DestroyDB(dbname, options);
48 }
49
50 void Reopen() {
51 delete txn_db;
11fdf7f2 52 txn_db = nullptr;
7c673cae
FG
53 Open();
54 }
55
56private:
57 void Open() {
f67539c2
TL
58 ColumnFamilyOptions cf_options(options);
59 OptimisticTransactionDBOptions occ_opts;
60 occ_opts.validate_policy = GetParam();
61 std::vector<ColumnFamilyDescriptor> column_families;
62 std::vector<ColumnFamilyHandle*> handles;
63 column_families.push_back(
64 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
65 Status s =
66 OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname,
67 column_families, &handles, &txn_db);
68
7c673cae 69 assert(s.ok());
11fdf7f2 70 assert(txn_db != nullptr);
f67539c2
TL
71 assert(handles.size() == 1);
72 delete handles[0];
7c673cae
FG
73 }
74};
75
f67539c2 76TEST_P(OptimisticTransactionTest, SuccessTest) {
7c673cae
FG
77 WriteOptions write_options;
78 ReadOptions read_options;
79 string value;
80 Status s;
81
11fdf7f2
TL
82 txn_db->Put(write_options, Slice("foo"), Slice("bar"));
83 txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
7c673cae
FG
84
85 Transaction* txn = txn_db->BeginTransaction(write_options);
86 ASSERT_TRUE(txn);
87
88 txn->GetForUpdate(read_options, "foo", &value);
89 ASSERT_EQ(value, "bar");
90
91 txn->Put(Slice("foo"), Slice("bar2"));
92
93 txn->GetForUpdate(read_options, "foo", &value);
94 ASSERT_EQ(value, "bar2");
95
96 s = txn->Commit();
97 ASSERT_OK(s);
98
11fdf7f2 99 txn_db->Get(read_options, "foo", &value);
7c673cae
FG
100 ASSERT_EQ(value, "bar2");
101
102 delete txn;
103}
104
f67539c2 105TEST_P(OptimisticTransactionTest, WriteConflictTest) {
7c673cae
FG
106 WriteOptions write_options;
107 ReadOptions read_options;
108 string value;
109 Status s;
110
11fdf7f2
TL
111 txn_db->Put(write_options, "foo", "bar");
112 txn_db->Put(write_options, "foo2", "bar");
7c673cae
FG
113
114 Transaction* txn = txn_db->BeginTransaction(write_options);
115 ASSERT_TRUE(txn);
116
117 txn->Put("foo", "bar2");
118
119 // This Put outside of a transaction will conflict with the previous write
11fdf7f2 120 s = txn_db->Put(write_options, "foo", "barz");
7c673cae
FG
121 ASSERT_OK(s);
122
11fdf7f2 123 s = txn_db->Get(read_options, "foo", &value);
7c673cae
FG
124 ASSERT_EQ(value, "barz");
125 ASSERT_EQ(1, txn->GetNumKeys());
126
127 s = txn->Commit();
128 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
129
130 // Verify that transaction did not write anything
11fdf7f2 131 txn_db->Get(read_options, "foo", &value);
7c673cae 132 ASSERT_EQ(value, "barz");
11fdf7f2 133 txn_db->Get(read_options, "foo2", &value);
7c673cae
FG
134 ASSERT_EQ(value, "bar");
135
136 delete txn;
137}
138
f67539c2 139TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
7c673cae
FG
140 WriteOptions write_options;
141 ReadOptions read_options;
142 OptimisticTransactionOptions txn_options;
143 string value;
144 Status s;
145
11fdf7f2
TL
146 txn_db->Put(write_options, "foo", "bar");
147 txn_db->Put(write_options, "foo2", "bar");
7c673cae
FG
148
149 txn_options.set_snapshot = true;
150 Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
151 ASSERT_TRUE(txn);
152
153 // This Put outside of a transaction will conflict with a later write
11fdf7f2 154 s = txn_db->Put(write_options, "foo", "barz");
7c673cae
FG
155 ASSERT_OK(s);
156
157 txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
158
11fdf7f2 159 s = txn_db->Get(read_options, "foo", &value);
7c673cae
FG
160 ASSERT_EQ(value, "barz");
161
162 s = txn->Commit();
163 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
164
165 // Verify that transaction did not write anything
11fdf7f2 166 txn_db->Get(read_options, "foo", &value);
7c673cae 167 ASSERT_EQ(value, "barz");
11fdf7f2 168 txn_db->Get(read_options, "foo2", &value);
7c673cae
FG
169 ASSERT_EQ(value, "bar");
170
171 delete txn;
172}
173
f67539c2 174TEST_P(OptimisticTransactionTest, ReadConflictTest) {
7c673cae
FG
175 WriteOptions write_options;
176 ReadOptions read_options, snapshot_read_options;
177 OptimisticTransactionOptions txn_options;
178 string value;
179 Status s;
180
11fdf7f2
TL
181 txn_db->Put(write_options, "foo", "bar");
182 txn_db->Put(write_options, "foo2", "bar");
7c673cae
FG
183
184 txn_options.set_snapshot = true;
185 Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
186 ASSERT_TRUE(txn);
187
188 txn->SetSnapshot();
189 snapshot_read_options.snapshot = txn->GetSnapshot();
190
191 txn->GetForUpdate(snapshot_read_options, "foo", &value);
192 ASSERT_EQ(value, "bar");
193
194 // This Put outside of a transaction will conflict with the previous read
11fdf7f2 195 s = txn_db->Put(write_options, "foo", "barz");
7c673cae
FG
196 ASSERT_OK(s);
197
11fdf7f2 198 s = txn_db->Get(read_options, "foo", &value);
7c673cae
FG
199 ASSERT_EQ(value, "barz");
200
201 s = txn->Commit();
202 ASSERT_TRUE(s.IsBusy()); // Txn should not commit
203
204 // Verify that transaction did not write anything
205 txn->GetForUpdate(read_options, "foo", &value);
206 ASSERT_EQ(value, "barz");
207 txn->GetForUpdate(read_options, "foo2", &value);
208 ASSERT_EQ(value, "bar");
209
210 delete txn;
211}
212
f67539c2 213TEST_P(OptimisticTransactionTest, TxnOnlyTest) {
7c673cae
FG
214 // Test to make sure transactions work when there are no other writes in an
215 // empty db.
216
217 WriteOptions write_options;
218 ReadOptions read_options;
219 string value;
220 Status s;
221
222 Transaction* txn = txn_db->BeginTransaction(write_options);
223 ASSERT_TRUE(txn);
224
225 txn->Put("x", "y");
226
227 s = txn->Commit();
228 ASSERT_OK(s);
229
230 delete txn;
231}
232
f67539c2 233TEST_P(OptimisticTransactionTest, FlushTest) {
7c673cae
FG
234 WriteOptions write_options;
235 ReadOptions read_options, snapshot_read_options;
236 string value;
237 Status s;
238
11fdf7f2
TL
239 txn_db->Put(write_options, Slice("foo"), Slice("bar"));
240 txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
7c673cae
FG
241
242 Transaction* txn = txn_db->BeginTransaction(write_options);
243 ASSERT_TRUE(txn);
244
245 snapshot_read_options.snapshot = txn->GetSnapshot();
246
247 txn->GetForUpdate(snapshot_read_options, "foo", &value);
248 ASSERT_EQ(value, "bar");
249
250 txn->Put(Slice("foo"), Slice("bar2"));
251
252 txn->GetForUpdate(snapshot_read_options, "foo", &value);
253 ASSERT_EQ(value, "bar2");
254
255 // Put a random key so we have a memtable to flush
11fdf7f2 256 s = txn_db->Put(write_options, "dummy", "dummy");
7c673cae
FG
257 ASSERT_OK(s);
258
259 // force a memtable flush
260 FlushOptions flush_ops;
11fdf7f2 261 txn_db->Flush(flush_ops);
7c673cae
FG
262
263 s = txn->Commit();
264 // txn should commit since the flushed table is still in MemtableList History
265 ASSERT_OK(s);
266
11fdf7f2 267 txn_db->Get(read_options, "foo", &value);
7c673cae
FG
268 ASSERT_EQ(value, "bar2");
269
270 delete txn;
271}
272
f67539c2 273TEST_P(OptimisticTransactionTest, FlushTest2) {
7c673cae
FG
274 WriteOptions write_options;
275 ReadOptions read_options, snapshot_read_options;
276 string value;
277 Status s;
278
11fdf7f2
TL
279 txn_db->Put(write_options, Slice("foo"), Slice("bar"));
280 txn_db->Put(write_options, Slice("foo2"), Slice("bar"));
7c673cae
FG
281
282 Transaction* txn = txn_db->BeginTransaction(write_options);
283 ASSERT_TRUE(txn);
284
285 snapshot_read_options.snapshot = txn->GetSnapshot();
286
287 txn->GetForUpdate(snapshot_read_options, "foo", &value);
288 ASSERT_EQ(value, "bar");
289
290 txn->Put(Slice("foo"), Slice("bar2"));
291
292 txn->GetForUpdate(snapshot_read_options, "foo", &value);
293 ASSERT_EQ(value, "bar2");
294
295 // Put a random key so we have a MemTable to flush
11fdf7f2 296 s = txn_db->Put(write_options, "dummy", "dummy");
7c673cae
FG
297 ASSERT_OK(s);
298
299 // force a memtable flush
300 FlushOptions flush_ops;
11fdf7f2 301 txn_db->Flush(flush_ops);
7c673cae
FG
302
303 // Put a random key so we have a MemTable to flush
11fdf7f2 304 s = txn_db->Put(write_options, "dummy", "dummy2");
7c673cae
FG
305 ASSERT_OK(s);
306
307 // force a memtable flush
11fdf7f2 308 txn_db->Flush(flush_ops);
7c673cae 309
11fdf7f2 310 s = txn_db->Put(write_options, "dummy", "dummy3");
7c673cae
FG
311 ASSERT_OK(s);
312
313 // force a memtable flush
314 // Since our test db has max_write_buffer_number=2, this flush will cause
315 // the first memtable to get purged from the MemtableList history.
11fdf7f2 316 txn_db->Flush(flush_ops);
7c673cae
FG
317
318 s = txn->Commit();
319 // txn should not commit since MemTableList History is not large enough
320 ASSERT_TRUE(s.IsTryAgain());
321
11fdf7f2 322 txn_db->Get(read_options, "foo", &value);
7c673cae
FG
323 ASSERT_EQ(value, "bar");
324
325 delete txn;
326}
327
f67539c2
TL
328// Trigger the condition where some old memtables are skipped when doing
329// TransactionUtil::CheckKey(), and make sure the result is still correct.
330TEST_P(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
331 const int kAttemptHistoryMemtable = 0;
332 const int kAttemptImmMemTable = 1;
333 for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
334 attempt++) {
335 options.max_write_buffer_number_to_maintain = 3;
336 Reopen();
337
338 WriteOptions write_options;
339 ReadOptions read_options;
340 ReadOptions snapshot_read_options;
341 ReadOptions snapshot_read_options2;
342 string value;
343 Status s;
344
345 ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
346 ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
347
348 Transaction* txn = txn_db->BeginTransaction(write_options);
349 ASSERT_TRUE(txn != nullptr);
350
351 Transaction* txn2 = txn_db->BeginTransaction(write_options);
352 ASSERT_TRUE(txn2 != nullptr);
353
354 snapshot_read_options.snapshot = txn->GetSnapshot();
355 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
356 ASSERT_EQ(value, "bar");
357 ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
358
359 snapshot_read_options2.snapshot = txn2->GetSnapshot();
360 ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value));
361 ASSERT_EQ(value, "bar");
362 ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2")));
363
364 // txn updates "foo" and txn2 updates "foo2", and now a write is
365 // issued for "foo", which conflicts with txn but not txn2
366 ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
367
368 if (attempt == kAttemptImmMemTable) {
369 // For the second attempt, hold flush from beginning. The memtable
370 // will be switched to immutable after calling TEST_SwitchMemtable()
371 // while CheckKey() is called.
372 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
373 {{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
374 "FlushJob::Start"}});
375 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
376 }
377
378 // force a memtable flush. The memtable should still be kept
379 FlushOptions flush_ops;
380 if (attempt == kAttemptHistoryMemtable) {
381 ASSERT_OK(txn_db->Flush(flush_ops));
382 } else {
383 assert(attempt == kAttemptImmMemTable);
384 DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB());
385 db_impl->TEST_SwitchMemtable();
386 }
387 uint64_t num_imm_mems;
388 ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
389 &num_imm_mems));
390 if (attempt == kAttemptHistoryMemtable) {
391 ASSERT_EQ(0, num_imm_mems);
392 } else {
393 assert(attempt == kAttemptImmMemTable);
394 ASSERT_EQ(1, num_imm_mems);
395 }
396
397 // Put something in active memtable
398 ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar")));
399
400 // Create txn3 after flushing, when this transaction is commited,
401 // only need to check the active memtable
402 Transaction* txn3 = txn_db->BeginTransaction(write_options);
403 ASSERT_TRUE(txn3 != nullptr);
404
405 // Commit both of txn and txn2. txn will conflict but txn2 will
406 // pass. In both ways, both memtables are queried.
407 SetPerfLevel(PerfLevel::kEnableCount);
408
409 get_perf_context()->Reset();
410 s = txn->Commit();
411 // We should have checked two memtables
412 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
413 // txn should fail because of conflict, even if the memtable
414 // has flushed, because it is still preserved in history.
415 ASSERT_TRUE(s.IsBusy());
416
417 get_perf_context()->Reset();
418 s = txn2->Commit();
419 // We should have checked two memtables
420 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
421 ASSERT_TRUE(s.ok());
422
423 txn3->Put(Slice("foo2"), Slice("bar2"));
424 get_perf_context()->Reset();
425 s = txn3->Commit();
426 // txn3 is created after the active memtable is created, so that is the only
427 // memtable to check.
428 ASSERT_EQ(1, get_perf_context()->get_from_memtable_count);
429 ASSERT_TRUE(s.ok());
430
431 TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
432 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
433
434 SetPerfLevel(PerfLevel::kDisable);
435
436 delete txn;
437 delete txn2;
438 delete txn3;
439 }
440}
441
442TEST_P(OptimisticTransactionTest, NoSnapshotTest) {
7c673cae
FG
443 WriteOptions write_options;
444 ReadOptions read_options;
445 string value;
446 Status s;
447
11fdf7f2 448 txn_db->Put(write_options, "AAA", "bar");
7c673cae
FG
449
450 Transaction* txn = txn_db->BeginTransaction(write_options);
451 ASSERT_TRUE(txn);
452
453 // Modify key after transaction start
11fdf7f2 454 txn_db->Put(write_options, "AAA", "bar1");
7c673cae
FG
455
456 // Read and write without a snapshot
457 txn->GetForUpdate(read_options, "AAA", &value);
458 ASSERT_EQ(value, "bar1");
459 txn->Put("AAA", "bar2");
460
461 // Should commit since read/write was done after data changed
462 s = txn->Commit();
463 ASSERT_OK(s);
464
465 txn->GetForUpdate(read_options, "AAA", &value);
466 ASSERT_EQ(value, "bar2");
467
468 delete txn;
469}
470
f67539c2 471TEST_P(OptimisticTransactionTest, MultipleSnapshotTest) {
7c673cae
FG
472 WriteOptions write_options;
473 ReadOptions read_options, snapshot_read_options;
474 string value;
475 Status s;
476
11fdf7f2
TL
477 txn_db->Put(write_options, "AAA", "bar");
478 txn_db->Put(write_options, "BBB", "bar");
479 txn_db->Put(write_options, "CCC", "bar");
7c673cae
FG
480
481 Transaction* txn = txn_db->BeginTransaction(write_options);
482 ASSERT_TRUE(txn);
483
11fdf7f2 484 txn_db->Put(write_options, "AAA", "bar1");
7c673cae
FG
485
486 // Read and write without a snapshot
487 txn->GetForUpdate(read_options, "AAA", &value);
488 ASSERT_EQ(value, "bar1");
489 txn->Put("AAA", "bar2");
490
491 // Modify BBB before snapshot is taken
11fdf7f2 492 txn_db->Put(write_options, "BBB", "bar1");
7c673cae
FG
493
494 txn->SetSnapshot();
495 snapshot_read_options.snapshot = txn->GetSnapshot();
496
497 // Read and write with snapshot
498 txn->GetForUpdate(snapshot_read_options, "BBB", &value);
499 ASSERT_EQ(value, "bar1");
500 txn->Put("BBB", "bar2");
501
11fdf7f2 502 txn_db->Put(write_options, "CCC", "bar1");
7c673cae
FG
503
504 // Set a new snapshot
505 txn->SetSnapshot();
506 snapshot_read_options.snapshot = txn->GetSnapshot();
507
508 // Read and write with snapshot
509 txn->GetForUpdate(snapshot_read_options, "CCC", &value);
510 ASSERT_EQ(value, "bar1");
511 txn->Put("CCC", "bar2");
512
513 s = txn->GetForUpdate(read_options, "AAA", &value);
514 ASSERT_OK(s);
515 ASSERT_EQ(value, "bar2");
516 s = txn->GetForUpdate(read_options, "BBB", &value);
517 ASSERT_OK(s);
518 ASSERT_EQ(value, "bar2");
519 s = txn->GetForUpdate(read_options, "CCC", &value);
520 ASSERT_OK(s);
521 ASSERT_EQ(value, "bar2");
522
11fdf7f2 523 s = txn_db->Get(read_options, "AAA", &value);
7c673cae
FG
524 ASSERT_OK(s);
525 ASSERT_EQ(value, "bar1");
11fdf7f2 526 s = txn_db->Get(read_options, "BBB", &value);
7c673cae
FG
527 ASSERT_OK(s);
528 ASSERT_EQ(value, "bar1");
11fdf7f2 529 s = txn_db->Get(read_options, "CCC", &value);
7c673cae
FG
530 ASSERT_OK(s);
531 ASSERT_EQ(value, "bar1");
532
533 s = txn->Commit();
534 ASSERT_OK(s);
535
11fdf7f2 536 s = txn_db->Get(read_options, "AAA", &value);
7c673cae
FG
537 ASSERT_OK(s);
538 ASSERT_EQ(value, "bar2");
11fdf7f2 539 s = txn_db->Get(read_options, "BBB", &value);
7c673cae
FG
540 ASSERT_OK(s);
541 ASSERT_EQ(value, "bar2");
11fdf7f2 542 s = txn_db->Get(read_options, "CCC", &value);
7c673cae
FG
543 ASSERT_OK(s);
544 ASSERT_EQ(value, "bar2");
545
546 // verify that we track multiple writes to the same key at different snapshots
547 delete txn;
548 txn = txn_db->BeginTransaction(write_options);
549
550 // Potentially conflicting writes
11fdf7f2
TL
551 txn_db->Put(write_options, "ZZZ", "zzz");
552 txn_db->Put(write_options, "XXX", "xxx");
7c673cae
FG
553
554 txn->SetSnapshot();
555
556 OptimisticTransactionOptions txn_options;
557 txn_options.set_snapshot = true;
558 Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
559 txn2->SetSnapshot();
560
561 // This should not conflict in txn since the snapshot is later than the
562 // previous write (spoiler alert: it will later conflict with txn2).
563 txn->Put("ZZZ", "zzzz");
564 s = txn->Commit();
565 ASSERT_OK(s);
566
567 delete txn;
568
569 // This will conflict since the snapshot is earlier than another write to ZZZ
570 txn2->Put("ZZZ", "xxxxx");
571
572 s = txn2->Commit();
573 ASSERT_TRUE(s.IsBusy());
574
575 delete txn2;
576}
577
f67539c2 578TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
7c673cae
FG
579 WriteOptions write_options;
580 ReadOptions read_options, snapshot_read_options;
581 OptimisticTransactionOptions txn_options;
582 string value;
583 Status s;
584
585 ColumnFamilyHandle *cfa, *cfb;
586 ColumnFamilyOptions cf_options;
587
588 // Create 2 new column families
11fdf7f2 589 s = txn_db->CreateColumnFamily(cf_options, "CFA", &cfa);
7c673cae 590 ASSERT_OK(s);
11fdf7f2 591 s = txn_db->CreateColumnFamily(cf_options, "CFB", &cfb);
7c673cae
FG
592 ASSERT_OK(s);
593
594 delete cfa;
595 delete cfb;
596 delete txn_db;
11fdf7f2 597 txn_db = nullptr;
7c673cae
FG
598
599 // open DB with three column families
600 std::vector<ColumnFamilyDescriptor> column_families;
601 // have to open default column family
602 column_families.push_back(
603 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
604 // open the new column families
605 column_families.push_back(
606 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
607 column_families.push_back(
608 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
609 std::vector<ColumnFamilyHandle*> handles;
610 s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles,
611 &txn_db);
612 ASSERT_OK(s);
11fdf7f2 613 assert(txn_db != nullptr);
7c673cae
FG
614
615 Transaction* txn = txn_db->BeginTransaction(write_options);
616 ASSERT_TRUE(txn);
617
618 txn->SetSnapshot();
619 snapshot_read_options.snapshot = txn->GetSnapshot();
620
621 txn_options.set_snapshot = true;
622 Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options);
623 ASSERT_TRUE(txn2);
624
625 // Write some data to the db
626 WriteBatch batch;
627 batch.Put("foo", "foo");
628 batch.Put(handles[1], "AAA", "bar");
629 batch.Put(handles[1], "AAAZZZ", "bar");
11fdf7f2 630 s = txn_db->Write(write_options, &batch);
7c673cae 631 ASSERT_OK(s);
11fdf7f2 632 txn_db->Delete(write_options, handles[1], "AAAZZZ");
7c673cae
FG
633
634 // These keys do no conflict with existing writes since they're in
635 // different column families
636 txn->Delete("AAA");
637 txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
638 Slice key_slice("AAAZZZ");
639 Slice value_slices[2] = {Slice("bar"), Slice("bar")};
640 txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2));
641
642 ASSERT_EQ(3, txn->GetNumKeys());
643
644 // Txn should commit
645 s = txn->Commit();
646 ASSERT_OK(s);
11fdf7f2 647 s = txn_db->Get(read_options, "AAA", &value);
7c673cae 648 ASSERT_TRUE(s.IsNotFound());
11fdf7f2 649 s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value);
7c673cae
FG
650 ASSERT_EQ(value, "barbar");
651
652 Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
653 Slice value_slice("barbarbar");
654 // This write will cause a conflict with the earlier batch write
655 txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1));
656
657 txn2->Delete(handles[2], "XXX");
658 txn2->Delete(handles[1], "XXX");
659 s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value);
660 ASSERT_TRUE(s.IsNotFound());
661
662 // Verify txn did not commit
663 s = txn2->Commit();
664 ASSERT_TRUE(s.IsBusy());
11fdf7f2 665 s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value);
7c673cae
FG
666 ASSERT_EQ(value, "barbar");
667
668 delete txn;
669 delete txn2;
670
671 txn = txn_db->BeginTransaction(write_options, txn_options);
672 snapshot_read_options.snapshot = txn->GetSnapshot();
673
674 txn2 = txn_db->BeginTransaction(write_options, txn_options);
675 ASSERT_TRUE(txn);
676
677 std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
678 handles[0], handles[2]};
679 std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
680 std::vector<std::string> values(4);
681
682 std::vector<Status> results = txn->MultiGetForUpdate(
683 snapshot_read_options, multiget_cfh, multiget_keys, &values);
684 ASSERT_OK(results[0]);
685 ASSERT_OK(results[1]);
686 ASSERT_OK(results[2]);
687 ASSERT_TRUE(results[3].IsNotFound());
688 ASSERT_EQ(values[0], "bar");
689 ASSERT_EQ(values[1], "barbar");
690 ASSERT_EQ(values[2], "foo");
691
692 txn->Delete(handles[2], "ZZZ");
693 txn->Put(handles[2], "ZZZ", "YYY");
694 txn->Put(handles[2], "ZZZ", "YYYY");
695 txn->Delete(handles[2], "ZZZ");
696 txn->Put(handles[2], "AAAZZZ", "barbarbar");
697
698 ASSERT_EQ(5, txn->GetNumKeys());
699
700 // Txn should commit
701 s = txn->Commit();
702 ASSERT_OK(s);
11fdf7f2 703 s = txn_db->Get(read_options, handles[2], "ZZZ", &value);
7c673cae
FG
704 ASSERT_TRUE(s.IsNotFound());
705
706 // Put a key which will conflict with the next txn using the previous snapshot
11fdf7f2 707 txn_db->Put(write_options, handles[2], "foo", "000");
7c673cae
FG
708
709 results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
710 multiget_keys, &values);
711 ASSERT_OK(results[0]);
712 ASSERT_OK(results[1]);
713 ASSERT_OK(results[2]);
714 ASSERT_TRUE(results[3].IsNotFound());
715 ASSERT_EQ(values[0], "bar");
716 ASSERT_EQ(values[1], "barbar");
717 ASSERT_EQ(values[2], "foo");
718
719 // Verify Txn Did not Commit
720 s = txn2->Commit();
721 ASSERT_TRUE(s.IsBusy());
722
11fdf7f2 723 s = txn_db->DropColumnFamily(handles[1]);
7c673cae 724 ASSERT_OK(s);
11fdf7f2 725 s = txn_db->DropColumnFamily(handles[2]);
7c673cae
FG
726 ASSERT_OK(s);
727
728 delete txn;
729 delete txn2;
730
731 for (auto handle : handles) {
732 delete handle;
733 }
734}
735
f67539c2 736TEST_P(OptimisticTransactionTest, EmptyTest) {
7c673cae
FG
737 WriteOptions write_options;
738 ReadOptions read_options;
739 string value;
740 Status s;
741
11fdf7f2 742 s = txn_db->Put(write_options, "aaa", "aaa");
7c673cae
FG
743 ASSERT_OK(s);
744
745 Transaction* txn = txn_db->BeginTransaction(write_options);
746 s = txn->Commit();
747 ASSERT_OK(s);
748 delete txn;
749
750 txn = txn_db->BeginTransaction(write_options);
751 txn->Rollback();
752 delete txn;
753
754 txn = txn_db->BeginTransaction(write_options);
755 s = txn->GetForUpdate(read_options, "aaa", &value);
756 ASSERT_EQ(value, "aaa");
757
758 s = txn->Commit();
759 ASSERT_OK(s);
760 delete txn;
761
762 txn = txn_db->BeginTransaction(write_options);
763 txn->SetSnapshot();
764 s = txn->GetForUpdate(read_options, "aaa", &value);
765 ASSERT_EQ(value, "aaa");
766
11fdf7f2 767 s = txn_db->Put(write_options, "aaa", "xxx");
7c673cae
FG
768 s = txn->Commit();
769 ASSERT_TRUE(s.IsBusy());
770 delete txn;
771}
772
f67539c2 773TEST_P(OptimisticTransactionTest, PredicateManyPreceders) {
7c673cae
FG
774 WriteOptions write_options;
775 ReadOptions read_options1, read_options2;
776 OptimisticTransactionOptions txn_options;
777 string value;
778 Status s;
779
780 txn_options.set_snapshot = true;
781 Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options);
782 read_options1.snapshot = txn1->GetSnapshot();
783
784 Transaction* txn2 = txn_db->BeginTransaction(write_options);
785 txn2->SetSnapshot();
786 read_options2.snapshot = txn2->GetSnapshot();
787
788 std::vector<Slice> multiget_keys = {"1", "2", "3"};
789 std::vector<std::string> multiget_values;
790
791 std::vector<Status> results =
792 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
793 ASSERT_TRUE(results[1].IsNotFound());
794
795 txn2->Put("2", "x");
796
797 s = txn2->Commit();
798 ASSERT_OK(s);
799
800 multiget_values.clear();
801 results =
802 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
803 ASSERT_TRUE(results[1].IsNotFound());
804
805 // should not commit since txn2 wrote a key txn has read
806 s = txn1->Commit();
807 ASSERT_TRUE(s.IsBusy());
808
809 delete txn1;
810 delete txn2;
811
812 txn1 = txn_db->BeginTransaction(write_options, txn_options);
813 read_options1.snapshot = txn1->GetSnapshot();
814
815 txn2 = txn_db->BeginTransaction(write_options, txn_options);
816 read_options2.snapshot = txn2->GetSnapshot();
817
818 txn1->Put("4", "x");
819
820 txn2->Delete("4");
821
822 // txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
823 s = txn1->Commit();
824 ASSERT_OK(s);
825
826 s = txn2->GetForUpdate(read_options2, "4", &value);
827 ASSERT_TRUE(s.IsNotFound());
828
829 // txn2 cannot commit since txn1 changed "4"
830 s = txn2->Commit();
831 ASSERT_TRUE(s.IsBusy());
832
833 delete txn1;
834 delete txn2;
835}
836
f67539c2 837TEST_P(OptimisticTransactionTest, LostUpdate) {
7c673cae
FG
838 WriteOptions write_options;
839 ReadOptions read_options, read_options1, read_options2;
840 OptimisticTransactionOptions txn_options;
841 string value;
842 Status s;
843
844 // Test 2 transactions writing to the same key in multiple orders and
845 // with/without snapshots
846
847 Transaction* txn1 = txn_db->BeginTransaction(write_options);
848 Transaction* txn2 = txn_db->BeginTransaction(write_options);
849
850 txn1->Put("1", "1");
851 txn2->Put("1", "2");
852
853 s = txn1->Commit();
854 ASSERT_OK(s);
855
856 s = txn2->Commit();
857 ASSERT_TRUE(s.IsBusy());
858
859 delete txn1;
860 delete txn2;
861
862 txn_options.set_snapshot = true;
863 txn1 = txn_db->BeginTransaction(write_options, txn_options);
864 read_options1.snapshot = txn1->GetSnapshot();
865
866 txn2 = txn_db->BeginTransaction(write_options, txn_options);
867 read_options2.snapshot = txn2->GetSnapshot();
868
869 txn1->Put("1", "3");
870 txn2->Put("1", "4");
871
872 s = txn1->Commit();
873 ASSERT_OK(s);
874
875 s = txn2->Commit();
876 ASSERT_TRUE(s.IsBusy());
877
878 delete txn1;
879 delete txn2;
880
881 txn1 = txn_db->BeginTransaction(write_options, txn_options);
882 read_options1.snapshot = txn1->GetSnapshot();
883
884 txn2 = txn_db->BeginTransaction(write_options, txn_options);
885 read_options2.snapshot = txn2->GetSnapshot();
886
887 txn1->Put("1", "5");
888 s = txn1->Commit();
889 ASSERT_OK(s);
890
891 txn2->Put("1", "6");
892 s = txn2->Commit();
893 ASSERT_TRUE(s.IsBusy());
894
895 delete txn1;
896 delete txn2;
897
898 txn1 = txn_db->BeginTransaction(write_options, txn_options);
899 read_options1.snapshot = txn1->GetSnapshot();
900
901 txn2 = txn_db->BeginTransaction(write_options, txn_options);
902 read_options2.snapshot = txn2->GetSnapshot();
903
904 txn1->Put("1", "5");
905 s = txn1->Commit();
906 ASSERT_OK(s);
907
908 txn2->SetSnapshot();
909 txn2->Put("1", "6");
910 s = txn2->Commit();
911 ASSERT_OK(s);
912
913 delete txn1;
914 delete txn2;
915
916 txn1 = txn_db->BeginTransaction(write_options);
917 txn2 = txn_db->BeginTransaction(write_options);
918
919 txn1->Put("1", "7");
920 s = txn1->Commit();
921 ASSERT_OK(s);
922
923 txn2->Put("1", "8");
924 s = txn2->Commit();
925 ASSERT_OK(s);
926
927 delete txn1;
928 delete txn2;
929
11fdf7f2 930 s = txn_db->Get(read_options, "1", &value);
7c673cae
FG
931 ASSERT_OK(s);
932 ASSERT_EQ(value, "8");
933}
934
f67539c2 935TEST_P(OptimisticTransactionTest, UntrackedWrites) {
7c673cae
FG
936 WriteOptions write_options;
937 ReadOptions read_options;
938 string value;
939 Status s;
940
941 // Verify transaction rollback works for untracked keys.
942 Transaction* txn = txn_db->BeginTransaction(write_options);
943 txn->PutUntracked("untracked", "0");
944 txn->Rollback();
11fdf7f2 945 s = txn_db->Get(read_options, "untracked", &value);
7c673cae
FG
946 ASSERT_TRUE(s.IsNotFound());
947
948 delete txn;
949 txn = txn_db->BeginTransaction(write_options);
950
951 txn->Put("tracked", "1");
952 txn->PutUntracked("untracked", "1");
953 txn->MergeUntracked("untracked", "2");
954 txn->DeleteUntracked("untracked");
955
956 // Write to the untracked key outside of the transaction and verify
957 // it doesn't prevent the transaction from committing.
11fdf7f2 958 s = txn_db->Put(write_options, "untracked", "x");
7c673cae
FG
959 ASSERT_OK(s);
960
961 s = txn->Commit();
962 ASSERT_OK(s);
963
11fdf7f2 964 s = txn_db->Get(read_options, "untracked", &value);
7c673cae
FG
965 ASSERT_TRUE(s.IsNotFound());
966
967 delete txn;
968 txn = txn_db->BeginTransaction(write_options);
969
970 txn->Put("tracked", "10");
971 txn->PutUntracked("untracked", "A");
972
973 // Write to tracked key outside of the transaction and verify that the
974 // untracked keys are not written when the commit fails.
11fdf7f2 975 s = txn_db->Delete(write_options, "tracked");
7c673cae
FG
976
977 s = txn->Commit();
978 ASSERT_TRUE(s.IsBusy());
979
11fdf7f2 980 s = txn_db->Get(read_options, "untracked", &value);
7c673cae
FG
981 ASSERT_TRUE(s.IsNotFound());
982
983 delete txn;
984}
985
f67539c2 986TEST_P(OptimisticTransactionTest, IteratorTest) {
7c673cae
FG
987 WriteOptions write_options;
988 ReadOptions read_options, snapshot_read_options;
989 OptimisticTransactionOptions txn_options;
990 string value;
991 Status s;
992
993 // Write some keys to the db
11fdf7f2 994 s = txn_db->Put(write_options, "A", "a");
7c673cae
FG
995 ASSERT_OK(s);
996
11fdf7f2 997 s = txn_db->Put(write_options, "G", "g");
7c673cae
FG
998 ASSERT_OK(s);
999
11fdf7f2 1000 s = txn_db->Put(write_options, "F", "f");
7c673cae
FG
1001 ASSERT_OK(s);
1002
11fdf7f2 1003 s = txn_db->Put(write_options, "C", "c");
7c673cae
FG
1004 ASSERT_OK(s);
1005
11fdf7f2 1006 s = txn_db->Put(write_options, "D", "d");
7c673cae
FG
1007 ASSERT_OK(s);
1008
1009 Transaction* txn = txn_db->BeginTransaction(write_options);
1010 ASSERT_TRUE(txn);
1011
1012 // Write some keys in a txn
1013 s = txn->Put("B", "b");
1014 ASSERT_OK(s);
1015
1016 s = txn->Put("H", "h");
1017 ASSERT_OK(s);
1018
1019 s = txn->Delete("D");
1020 ASSERT_OK(s);
1021
1022 s = txn->Put("E", "e");
1023 ASSERT_OK(s);
1024
1025 txn->SetSnapshot();
1026 const Snapshot* snapshot = txn->GetSnapshot();
1027
1028 // Write some keys to the db after the snapshot
11fdf7f2 1029 s = txn_db->Put(write_options, "BB", "xx");
7c673cae
FG
1030 ASSERT_OK(s);
1031
11fdf7f2 1032 s = txn_db->Put(write_options, "C", "xx");
7c673cae
FG
1033 ASSERT_OK(s);
1034
1035 read_options.snapshot = snapshot;
1036 Iterator* iter = txn->GetIterator(read_options);
1037 ASSERT_OK(iter->status());
1038 iter->SeekToFirst();
1039
1040 // Read all keys via iter and lock them all
1041 std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
1042 for (int i = 0; i < 7; i++) {
1043 ASSERT_OK(iter->status());
1044 ASSERT_TRUE(iter->Valid());
1045 ASSERT_EQ(results[i], iter->value().ToString());
1046
1047 s = txn->GetForUpdate(read_options, iter->key(), nullptr);
1048 ASSERT_OK(s);
1049
1050 iter->Next();
1051 }
1052 ASSERT_FALSE(iter->Valid());
1053
1054 iter->Seek("G");
1055 ASSERT_OK(iter->status());
1056 ASSERT_TRUE(iter->Valid());
1057 ASSERT_EQ("g", iter->value().ToString());
1058
1059 iter->Prev();
1060 ASSERT_OK(iter->status());
1061 ASSERT_TRUE(iter->Valid());
1062 ASSERT_EQ("f", iter->value().ToString());
1063
1064 iter->Seek("D");
1065 ASSERT_OK(iter->status());
1066 ASSERT_TRUE(iter->Valid());
1067 ASSERT_EQ("e", iter->value().ToString());
1068
1069 iter->Seek("C");
1070 ASSERT_OK(iter->status());
1071 ASSERT_TRUE(iter->Valid());
1072 ASSERT_EQ("c", iter->value().ToString());
1073
1074 iter->Next();
1075 ASSERT_OK(iter->status());
1076 ASSERT_TRUE(iter->Valid());
1077 ASSERT_EQ("e", iter->value().ToString());
1078
1079 iter->Seek("");
1080 ASSERT_OK(iter->status());
1081 ASSERT_TRUE(iter->Valid());
1082 ASSERT_EQ("a", iter->value().ToString());
1083
1084 iter->Seek("X");
1085 ASSERT_OK(iter->status());
1086 ASSERT_FALSE(iter->Valid());
1087
1088 iter->SeekToLast();
1089 ASSERT_OK(iter->status());
1090 ASSERT_TRUE(iter->Valid());
1091 ASSERT_EQ("h", iter->value().ToString());
1092
1093 // key "C" was modified in the db after txn's snapshot. txn will not commit.
1094 s = txn->Commit();
1095 ASSERT_TRUE(s.IsBusy());
1096
1097 delete iter;
1098 delete txn;
1099}
1100
20effc67
TL
1101TEST_P(OptimisticTransactionTest, DeleteRangeSupportTest) {
1102 // `OptimisticTransactionDB` does not allow range deletion in any API.
1103 ASSERT_TRUE(
1104 txn_db
1105 ->DeleteRange(WriteOptions(), txn_db->DefaultColumnFamily(), "a", "b")
1106 .IsNotSupported());
1107 WriteBatch wb;
1108 ASSERT_OK(wb.DeleteRange("a", "b"));
1109 ASSERT_NOK(txn_db->Write(WriteOptions(), &wb));
1110}
1111
f67539c2 1112TEST_P(OptimisticTransactionTest, SavepointTest) {
7c673cae
FG
1113 WriteOptions write_options;
1114 ReadOptions read_options, snapshot_read_options;
1115 OptimisticTransactionOptions txn_options;
1116 string value;
1117 Status s;
1118
1119 Transaction* txn = txn_db->BeginTransaction(write_options);
1120 ASSERT_TRUE(txn);
1121
1122 s = txn->RollbackToSavePoint();
1123 ASSERT_TRUE(s.IsNotFound());
1124
1125 txn->SetSavePoint(); // 1
1126
1127 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
1128 s = txn->RollbackToSavePoint();
1129 ASSERT_TRUE(s.IsNotFound());
1130
1131 s = txn->Put("B", "b");
1132 ASSERT_OK(s);
1133
1134 s = txn->Commit();
1135 ASSERT_OK(s);
1136
11fdf7f2 1137 s = txn_db->Get(read_options, "B", &value);
7c673cae
FG
1138 ASSERT_OK(s);
1139 ASSERT_EQ("b", value);
1140
1141 delete txn;
1142 txn = txn_db->BeginTransaction(write_options);
1143 ASSERT_TRUE(txn);
1144
1145 s = txn->Put("A", "a");
1146 ASSERT_OK(s);
1147
1148 s = txn->Put("B", "bb");
1149 ASSERT_OK(s);
1150
1151 s = txn->Put("C", "c");
1152 ASSERT_OK(s);
1153
1154 txn->SetSavePoint(); // 2
1155
1156 s = txn->Delete("B");
1157 ASSERT_OK(s);
1158
1159 s = txn->Put("C", "cc");
1160 ASSERT_OK(s);
1161
1162 s = txn->Put("D", "d");
1163 ASSERT_OK(s);
1164
1165 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
1166
1167 s = txn->Get(read_options, "A", &value);
1168 ASSERT_OK(s);
1169 ASSERT_EQ("a", value);
1170
1171 s = txn->Get(read_options, "B", &value);
1172 ASSERT_OK(s);
1173 ASSERT_EQ("bb", value);
1174
1175 s = txn->Get(read_options, "C", &value);
1176 ASSERT_OK(s);
1177 ASSERT_EQ("c", value);
1178
1179 s = txn->Get(read_options, "D", &value);
1180 ASSERT_TRUE(s.IsNotFound());
1181
1182 s = txn->Put("A", "a");
1183 ASSERT_OK(s);
1184
1185 s = txn->Put("E", "e");
1186 ASSERT_OK(s);
1187
1188 // Rollback to beginning of txn
1189 s = txn->RollbackToSavePoint();
1190 ASSERT_TRUE(s.IsNotFound());
1191 txn->Rollback();
1192
1193 s = txn->Get(read_options, "A", &value);
1194 ASSERT_TRUE(s.IsNotFound());
1195
1196 s = txn->Get(read_options, "B", &value);
1197 ASSERT_OK(s);
1198 ASSERT_EQ("b", value);
1199
1200 s = txn->Get(read_options, "D", &value);
1201 ASSERT_TRUE(s.IsNotFound());
1202
1203 s = txn->Get(read_options, "D", &value);
1204 ASSERT_TRUE(s.IsNotFound());
1205
1206 s = txn->Get(read_options, "E", &value);
1207 ASSERT_TRUE(s.IsNotFound());
1208
1209 s = txn->Put("A", "aa");
1210 ASSERT_OK(s);
1211
1212 s = txn->Put("F", "f");
1213 ASSERT_OK(s);
1214
1215 txn->SetSavePoint(); // 3
1216 txn->SetSavePoint(); // 4
1217
1218 s = txn->Put("G", "g");
1219 ASSERT_OK(s);
1220
1221 s = txn->Delete("F");
1222 ASSERT_OK(s);
1223
1224 s = txn->Delete("B");
1225 ASSERT_OK(s);
1226
1227 s = txn->Get(read_options, "A", &value);
1228 ASSERT_OK(s);
1229 ASSERT_EQ("aa", value);
1230
1231 s = txn->Get(read_options, "F", &value);
1232 ASSERT_TRUE(s.IsNotFound());
1233
1234 s = txn->Get(read_options, "B", &value);
1235 ASSERT_TRUE(s.IsNotFound());
1236
1237 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
1238
1239 s = txn->Get(read_options, "F", &value);
1240 ASSERT_OK(s);
1241 ASSERT_EQ("f", value);
1242
1243 s = txn->Get(read_options, "G", &value);
1244 ASSERT_TRUE(s.IsNotFound());
1245
1246 s = txn->Commit();
1247 ASSERT_OK(s);
1248
11fdf7f2 1249 s = txn_db->Get(read_options, "F", &value);
7c673cae
FG
1250 ASSERT_OK(s);
1251 ASSERT_EQ("f", value);
1252
11fdf7f2 1253 s = txn_db->Get(read_options, "G", &value);
7c673cae
FG
1254 ASSERT_TRUE(s.IsNotFound());
1255
11fdf7f2 1256 s = txn_db->Get(read_options, "A", &value);
7c673cae
FG
1257 ASSERT_OK(s);
1258 ASSERT_EQ("aa", value);
1259
11fdf7f2 1260 s = txn_db->Get(read_options, "B", &value);
7c673cae
FG
1261 ASSERT_OK(s);
1262 ASSERT_EQ("b", value);
1263
11fdf7f2 1264 s = txn_db->Get(read_options, "C", &value);
7c673cae
FG
1265 ASSERT_TRUE(s.IsNotFound());
1266
11fdf7f2 1267 s = txn_db->Get(read_options, "D", &value);
7c673cae
FG
1268 ASSERT_TRUE(s.IsNotFound());
1269
11fdf7f2 1270 s = txn_db->Get(read_options, "E", &value);
7c673cae
FG
1271 ASSERT_TRUE(s.IsNotFound());
1272
1273 delete txn;
1274}
1275
f67539c2 1276TEST_P(OptimisticTransactionTest, UndoGetForUpdateTest) {
7c673cae
FG
1277 WriteOptions write_options;
1278 ReadOptions read_options, snapshot_read_options;
1279 OptimisticTransactionOptions txn_options;
1280 string value;
1281 Status s;
1282
11fdf7f2 1283 txn_db->Put(write_options, "A", "");
7c673cae
FG
1284
1285 Transaction* txn1 = txn_db->BeginTransaction(write_options);
1286 ASSERT_TRUE(txn1);
1287
1288 s = txn1->GetForUpdate(read_options, "A", &value);
1289 ASSERT_OK(s);
1290
1291 txn1->UndoGetForUpdate("A");
1292
1293 Transaction* txn2 = txn_db->BeginTransaction(write_options);
1294 txn2->Put("A", "x");
1295 s = txn2->Commit();
1296 ASSERT_OK(s);
1297 delete txn2;
1298
1299 // Verify that txn1 can commit since A isn't conflict checked
1300 s = txn1->Commit();
1301 ASSERT_OK(s);
1302 delete txn1;
1303
1304 txn1 = txn_db->BeginTransaction(write_options);
1305 txn1->Put("A", "a");
1306
1307 s = txn1->GetForUpdate(read_options, "A", &value);
1308 ASSERT_OK(s);
1309
1310 txn1->UndoGetForUpdate("A");
1311
1312 txn2 = txn_db->BeginTransaction(write_options);
1313 txn2->Put("A", "x");
1314 s = txn2->Commit();
1315 ASSERT_OK(s);
1316 delete txn2;
1317
1318 // Verify that txn1 cannot commit since A will still be conflict checked
1319 s = txn1->Commit();
1320 ASSERT_TRUE(s.IsBusy());
1321 delete txn1;
1322
1323 txn1 = txn_db->BeginTransaction(write_options);
1324
1325 s = txn1->GetForUpdate(read_options, "A", &value);
1326 ASSERT_OK(s);
1327 s = txn1->GetForUpdate(read_options, "A", &value);
1328 ASSERT_OK(s);
1329
1330 txn1->UndoGetForUpdate("A");
1331
1332 txn2 = txn_db->BeginTransaction(write_options);
1333 txn2->Put("A", "x");
1334 s = txn2->Commit();
1335 ASSERT_OK(s);
1336 delete txn2;
1337
1338 // Verify that txn1 cannot commit since A will still be conflict checked
1339 s = txn1->Commit();
1340 ASSERT_TRUE(s.IsBusy());
1341 delete txn1;
1342
1343 txn1 = txn_db->BeginTransaction(write_options);
1344
1345 s = txn1->GetForUpdate(read_options, "A", &value);
1346 ASSERT_OK(s);
1347 s = txn1->GetForUpdate(read_options, "A", &value);
1348 ASSERT_OK(s);
1349
1350 txn1->UndoGetForUpdate("A");
1351 txn1->UndoGetForUpdate("A");
1352
1353 txn2 = txn_db->BeginTransaction(write_options);
1354 txn2->Put("A", "x");
1355 s = txn2->Commit();
1356 ASSERT_OK(s);
1357 delete txn2;
1358
1359 // Verify that txn1 can commit since A isn't conflict checked
1360 s = txn1->Commit();
1361 ASSERT_OK(s);
1362 delete txn1;
1363
1364 txn1 = txn_db->BeginTransaction(write_options);
1365
1366 s = txn1->GetForUpdate(read_options, "A", &value);
1367 ASSERT_OK(s);
1368
1369 txn1->SetSavePoint();
1370 txn1->UndoGetForUpdate("A");
1371
1372 txn2 = txn_db->BeginTransaction(write_options);
1373 txn2->Put("A", "x");
1374 s = txn2->Commit();
1375 ASSERT_OK(s);
1376 delete txn2;
1377
1378 // Verify that txn1 cannot commit since A will still be conflict checked
1379 s = txn1->Commit();
1380 ASSERT_TRUE(s.IsBusy());
1381 delete txn1;
1382
1383 txn1 = txn_db->BeginTransaction(write_options);
1384
1385 s = txn1->GetForUpdate(read_options, "A", &value);
1386 ASSERT_OK(s);
1387
1388 txn1->SetSavePoint();
1389 s = txn1->GetForUpdate(read_options, "A", &value);
1390 ASSERT_OK(s);
1391 txn1->UndoGetForUpdate("A");
1392
1393 txn2 = txn_db->BeginTransaction(write_options);
1394 txn2->Put("A", "x");
1395 s = txn2->Commit();
1396 ASSERT_OK(s);
1397 delete txn2;
1398
1399 // Verify that txn1 cannot commit since A will still be conflict checked
1400 s = txn1->Commit();
1401 ASSERT_TRUE(s.IsBusy());
1402 delete txn1;
1403
1404 txn1 = txn_db->BeginTransaction(write_options);
1405
1406 s = txn1->GetForUpdate(read_options, "A", &value);
1407 ASSERT_OK(s);
1408
1409 txn1->SetSavePoint();
1410 s = txn1->GetForUpdate(read_options, "A", &value);
1411 ASSERT_OK(s);
1412 txn1->UndoGetForUpdate("A");
1413
1414 txn1->RollbackToSavePoint();
1415 txn1->UndoGetForUpdate("A");
1416
1417 txn2 = txn_db->BeginTransaction(write_options);
1418 txn2->Put("A", "x");
1419 s = txn2->Commit();
1420 ASSERT_OK(s);
1421 delete txn2;
1422
1423 // Verify that txn1 can commit since A isn't conflict checked
1424 s = txn1->Commit();
1425 ASSERT_OK(s);
1426 delete txn1;
1427}
1428
1429namespace {
1430Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db,
1431 const size_t num_transactions,
1432 const size_t num_sets,
1433 const size_t num_keys_per_set) {
1434 size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
1435 Random64 _rand(seed);
1436 WriteOptions write_options;
1437 ReadOptions read_options;
1438 OptimisticTransactionOptions txn_options;
1439 txn_options.set_snapshot = true;
1440
1441 RandomTransactionInserter inserter(&_rand, write_options, read_options,
1442 num_keys_per_set,
1443 static_cast<uint16_t>(num_sets));
1444
1445 for (size_t t = 0; t < num_transactions; t++) {
1446 bool success = inserter.OptimisticTransactionDBInsert(db, txn_options);
1447 if (!success) {
1448 // unexpected failure
1449 return inserter.GetLastStatus();
1450 }
1451 }
1452
1453 // Make sure at least some of the transactions succeeded. It's ok if
1454 // some failed due to write-conflicts.
1455 if (inserter.GetFailureCount() > num_transactions / 2) {
1456 return Status::TryAgain("Too many transactions failed! " +
1457 std::to_string(inserter.GetFailureCount()) + " / " +
1458 std::to_string(num_transactions));
1459 }
1460
1461 return Status::OK();
1462}
1463} // namespace
1464
f67539c2 1465TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
7c673cae
FG
1466 const size_t num_threads = 4;
1467 const size_t num_transactions_per_thread = 10000;
1468 const size_t num_sets = 3;
1469 const size_t num_keys_per_set = 100;
1470 // Setting the key-space to be 100 keys should cause enough write-conflicts
1471 // to make this test interesting.
1472
1473 std::vector<port::Thread> threads;
1474
1475 std::function<void()> call_inserter = [&] {
1476 ASSERT_OK(OptimisticTransactionStressTestInserter(
1477 txn_db, num_transactions_per_thread, num_sets, num_keys_per_set));
1478 };
1479
1480 // Create N threads that use RandomTransactionInserter to write
1481 // many transactions.
1482 for (uint32_t i = 0; i < num_threads; i++) {
1483 threads.emplace_back(call_inserter);
1484 }
1485
1486 // Wait for all threads to run
1487 for (auto& t : threads) {
1488 t.join();
1489 }
1490
1491 // Verify that data is consistent
11fdf7f2 1492 Status s = RandomTransactionInserter::Verify(txn_db, num_sets);
7c673cae
FG
1493 ASSERT_OK(s);
1494}
1495
f67539c2 1496TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
7c673cae
FG
1497 WriteOptions write_options;
1498 OptimisticTransactionOptions transaction_options;
1499
1500 Transaction* transaction(txn_db->BeginTransaction(write_options, transaction_options));
1501 Status s = transaction->Put("foo", "val");
1502 ASSERT_OK(s);
1503 s = transaction->Put("foo2", "val");
1504 ASSERT_OK(s);
1505 s = transaction->Put("foo3", "val");
1506 ASSERT_OK(s);
1507 s = transaction->Commit();
1508 ASSERT_OK(s);
1509 delete transaction;
1510
1511 Reopen();
1512 transaction = txn_db->BeginTransaction(write_options, transaction_options);
1513 s = transaction->Put("bar", "val");
1514 ASSERT_OK(s);
1515 s = transaction->Put("bar2", "val");
1516 ASSERT_OK(s);
1517 s = transaction->Commit();
1518 ASSERT_OK(s);
1519
1520 delete transaction;
1521}
1522
f67539c2
TL
1523INSTANTIATE_TEST_CASE_P(
1524 InstanceOccGroup, OptimisticTransactionTest,
1525 testing::Values(OccValidationPolicy::kValidateSerial,
1526 OccValidationPolicy::kValidateParallel));
1527
1528} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
1529
1530int main(int argc, char** argv) {
1531 ::testing::InitGoogleTest(&argc, argv);
1532 return RUN_ALL_TESTS();
1533}
1534
1535#else
1536#include <stdio.h>
1537
// Entry point used when ROCKSDB_LITE is defined: no tests are compiled in
// that configuration, so report the skip on stderr and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  static const char kSkipNotice[] =
      "SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n";
  fputs(kSkipNotice, stderr);
  return 0;
}
1544
1545#endif // !ROCKSDB_LITE