]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include <functional> | |
9 | #include <string> | |
10 | #include <thread> | |
11 | ||
f67539c2 | 12 | #include "db/db_impl/db_impl.h" |
f67539c2 | 13 | #include "port/port.h" |
7c673cae | 14 | #include "rocksdb/db.h" |
f67539c2 | 15 | #include "rocksdb/perf_context.h" |
7c673cae FG |
16 | #include "rocksdb/utilities/optimistic_transaction_db.h" |
17 | #include "rocksdb/utilities/transaction.h" | |
f67539c2 TL |
18 | #include "test_util/sync_point.h" |
19 | #include "test_util/testharness.h" | |
20 | #include "test_util/transaction_test_util.h" | |
7c673cae | 21 | #include "util/crc32c.h" |
7c673cae | 22 | #include "util/random.h" |
7c673cae FG |
23 | |
24 | using std::string; | |
25 | ||
f67539c2 | 26 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 27 | |
f67539c2 TL |
28 | class OptimisticTransactionTest |
29 | : public testing::Test, | |
30 | public testing::WithParamInterface<OccValidationPolicy> { | |
7c673cae FG |
31 | public: |
32 | OptimisticTransactionDB* txn_db; | |
7c673cae FG |
33 | string dbname; |
34 | Options options; | |
35 | ||
36 | OptimisticTransactionTest() { | |
37 | options.create_if_missing = true; | |
38 | options.max_write_buffer_number = 2; | |
f67539c2 | 39 | options.max_write_buffer_size_to_maintain = 1600; |
11fdf7f2 | 40 | dbname = test::PerThreadDBPath("optimistic_transaction_testdb"); |
7c673cae FG |
41 | |
42 | DestroyDB(dbname, options); | |
43 | Open(); | |
44 | } | |
494da23a | 45 | ~OptimisticTransactionTest() override { |
7c673cae FG |
46 | delete txn_db; |
47 | DestroyDB(dbname, options); | |
48 | } | |
49 | ||
50 | void Reopen() { | |
51 | delete txn_db; | |
11fdf7f2 | 52 | txn_db = nullptr; |
7c673cae FG |
53 | Open(); |
54 | } | |
55 | ||
56 | private: | |
57 | void Open() { | |
f67539c2 TL |
58 | ColumnFamilyOptions cf_options(options); |
59 | OptimisticTransactionDBOptions occ_opts; | |
60 | occ_opts.validate_policy = GetParam(); | |
61 | std::vector<ColumnFamilyDescriptor> column_families; | |
62 | std::vector<ColumnFamilyHandle*> handles; | |
63 | column_families.push_back( | |
64 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); | |
65 | Status s = | |
66 | OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname, | |
67 | column_families, &handles, &txn_db); | |
68 | ||
7c673cae | 69 | assert(s.ok()); |
11fdf7f2 | 70 | assert(txn_db != nullptr); |
f67539c2 TL |
71 | assert(handles.size() == 1); |
72 | delete handles[0]; | |
7c673cae FG |
73 | } |
74 | }; | |
75 | ||
f67539c2 | 76 | TEST_P(OptimisticTransactionTest, SuccessTest) { |
7c673cae FG |
77 | WriteOptions write_options; |
78 | ReadOptions read_options; | |
79 | string value; | |
80 | Status s; | |
81 | ||
11fdf7f2 TL |
82 | txn_db->Put(write_options, Slice("foo"), Slice("bar")); |
83 | txn_db->Put(write_options, Slice("foo2"), Slice("bar")); | |
7c673cae FG |
84 | |
85 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
86 | ASSERT_TRUE(txn); | |
87 | ||
88 | txn->GetForUpdate(read_options, "foo", &value); | |
89 | ASSERT_EQ(value, "bar"); | |
90 | ||
91 | txn->Put(Slice("foo"), Slice("bar2")); | |
92 | ||
93 | txn->GetForUpdate(read_options, "foo", &value); | |
94 | ASSERT_EQ(value, "bar2"); | |
95 | ||
96 | s = txn->Commit(); | |
97 | ASSERT_OK(s); | |
98 | ||
11fdf7f2 | 99 | txn_db->Get(read_options, "foo", &value); |
7c673cae FG |
100 | ASSERT_EQ(value, "bar2"); |
101 | ||
102 | delete txn; | |
103 | } | |
104 | ||
f67539c2 | 105 | TEST_P(OptimisticTransactionTest, WriteConflictTest) { |
7c673cae FG |
106 | WriteOptions write_options; |
107 | ReadOptions read_options; | |
108 | string value; | |
109 | Status s; | |
110 | ||
11fdf7f2 TL |
111 | txn_db->Put(write_options, "foo", "bar"); |
112 | txn_db->Put(write_options, "foo2", "bar"); | |
7c673cae FG |
113 | |
114 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
115 | ASSERT_TRUE(txn); | |
116 | ||
117 | txn->Put("foo", "bar2"); | |
118 | ||
119 | // This Put outside of a transaction will conflict with the previous write | |
11fdf7f2 | 120 | s = txn_db->Put(write_options, "foo", "barz"); |
7c673cae FG |
121 | ASSERT_OK(s); |
122 | ||
11fdf7f2 | 123 | s = txn_db->Get(read_options, "foo", &value); |
7c673cae FG |
124 | ASSERT_EQ(value, "barz"); |
125 | ASSERT_EQ(1, txn->GetNumKeys()); | |
126 | ||
127 | s = txn->Commit(); | |
128 | ASSERT_TRUE(s.IsBusy()); // Txn should not commit | |
129 | ||
130 | // Verify that transaction did not write anything | |
11fdf7f2 | 131 | txn_db->Get(read_options, "foo", &value); |
7c673cae | 132 | ASSERT_EQ(value, "barz"); |
11fdf7f2 | 133 | txn_db->Get(read_options, "foo2", &value); |
7c673cae FG |
134 | ASSERT_EQ(value, "bar"); |
135 | ||
136 | delete txn; | |
137 | } | |
138 | ||
f67539c2 | 139 | TEST_P(OptimisticTransactionTest, WriteConflictTest2) { |
7c673cae FG |
140 | WriteOptions write_options; |
141 | ReadOptions read_options; | |
142 | OptimisticTransactionOptions txn_options; | |
143 | string value; | |
144 | Status s; | |
145 | ||
11fdf7f2 TL |
146 | txn_db->Put(write_options, "foo", "bar"); |
147 | txn_db->Put(write_options, "foo2", "bar"); | |
7c673cae FG |
148 | |
149 | txn_options.set_snapshot = true; | |
150 | Transaction* txn = txn_db->BeginTransaction(write_options, txn_options); | |
151 | ASSERT_TRUE(txn); | |
152 | ||
153 | // This Put outside of a transaction will conflict with a later write | |
11fdf7f2 | 154 | s = txn_db->Put(write_options, "foo", "barz"); |
7c673cae FG |
155 | ASSERT_OK(s); |
156 | ||
157 | txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken | |
158 | ||
11fdf7f2 | 159 | s = txn_db->Get(read_options, "foo", &value); |
7c673cae FG |
160 | ASSERT_EQ(value, "barz"); |
161 | ||
162 | s = txn->Commit(); | |
163 | ASSERT_TRUE(s.IsBusy()); // Txn should not commit | |
164 | ||
165 | // Verify that transaction did not write anything | |
11fdf7f2 | 166 | txn_db->Get(read_options, "foo", &value); |
7c673cae | 167 | ASSERT_EQ(value, "barz"); |
11fdf7f2 | 168 | txn_db->Get(read_options, "foo2", &value); |
7c673cae FG |
169 | ASSERT_EQ(value, "bar"); |
170 | ||
171 | delete txn; | |
172 | } | |
173 | ||
f67539c2 | 174 | TEST_P(OptimisticTransactionTest, ReadConflictTest) { |
7c673cae FG |
175 | WriteOptions write_options; |
176 | ReadOptions read_options, snapshot_read_options; | |
177 | OptimisticTransactionOptions txn_options; | |
178 | string value; | |
179 | Status s; | |
180 | ||
11fdf7f2 TL |
181 | txn_db->Put(write_options, "foo", "bar"); |
182 | txn_db->Put(write_options, "foo2", "bar"); | |
7c673cae FG |
183 | |
184 | txn_options.set_snapshot = true; | |
185 | Transaction* txn = txn_db->BeginTransaction(write_options, txn_options); | |
186 | ASSERT_TRUE(txn); | |
187 | ||
188 | txn->SetSnapshot(); | |
189 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
190 | ||
191 | txn->GetForUpdate(snapshot_read_options, "foo", &value); | |
192 | ASSERT_EQ(value, "bar"); | |
193 | ||
194 | // This Put outside of a transaction will conflict with the previous read | |
11fdf7f2 | 195 | s = txn_db->Put(write_options, "foo", "barz"); |
7c673cae FG |
196 | ASSERT_OK(s); |
197 | ||
11fdf7f2 | 198 | s = txn_db->Get(read_options, "foo", &value); |
7c673cae FG |
199 | ASSERT_EQ(value, "barz"); |
200 | ||
201 | s = txn->Commit(); | |
202 | ASSERT_TRUE(s.IsBusy()); // Txn should not commit | |
203 | ||
204 | // Verify that transaction did not write anything | |
205 | txn->GetForUpdate(read_options, "foo", &value); | |
206 | ASSERT_EQ(value, "barz"); | |
207 | txn->GetForUpdate(read_options, "foo2", &value); | |
208 | ASSERT_EQ(value, "bar"); | |
209 | ||
210 | delete txn; | |
211 | } | |
212 | ||
f67539c2 | 213 | TEST_P(OptimisticTransactionTest, TxnOnlyTest) { |
7c673cae FG |
214 | // Test to make sure transactions work when there are no other writes in an |
215 | // empty db. | |
216 | ||
217 | WriteOptions write_options; | |
218 | ReadOptions read_options; | |
219 | string value; | |
220 | Status s; | |
221 | ||
222 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
223 | ASSERT_TRUE(txn); | |
224 | ||
225 | txn->Put("x", "y"); | |
226 | ||
227 | s = txn->Commit(); | |
228 | ASSERT_OK(s); | |
229 | ||
230 | delete txn; | |
231 | } | |
232 | ||
f67539c2 | 233 | TEST_P(OptimisticTransactionTest, FlushTest) { |
7c673cae FG |
234 | WriteOptions write_options; |
235 | ReadOptions read_options, snapshot_read_options; | |
236 | string value; | |
237 | Status s; | |
238 | ||
11fdf7f2 TL |
239 | txn_db->Put(write_options, Slice("foo"), Slice("bar")); |
240 | txn_db->Put(write_options, Slice("foo2"), Slice("bar")); | |
7c673cae FG |
241 | |
242 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
243 | ASSERT_TRUE(txn); | |
244 | ||
245 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
246 | ||
247 | txn->GetForUpdate(snapshot_read_options, "foo", &value); | |
248 | ASSERT_EQ(value, "bar"); | |
249 | ||
250 | txn->Put(Slice("foo"), Slice("bar2")); | |
251 | ||
252 | txn->GetForUpdate(snapshot_read_options, "foo", &value); | |
253 | ASSERT_EQ(value, "bar2"); | |
254 | ||
255 | // Put a random key so we have a memtable to flush | |
11fdf7f2 | 256 | s = txn_db->Put(write_options, "dummy", "dummy"); |
7c673cae FG |
257 | ASSERT_OK(s); |
258 | ||
259 | // force a memtable flush | |
260 | FlushOptions flush_ops; | |
11fdf7f2 | 261 | txn_db->Flush(flush_ops); |
7c673cae FG |
262 | |
263 | s = txn->Commit(); | |
264 | // txn should commit since the flushed table is still in MemtableList History | |
265 | ASSERT_OK(s); | |
266 | ||
11fdf7f2 | 267 | txn_db->Get(read_options, "foo", &value); |
7c673cae FG |
268 | ASSERT_EQ(value, "bar2"); |
269 | ||
270 | delete txn; | |
271 | } | |
272 | ||
f67539c2 | 273 | TEST_P(OptimisticTransactionTest, FlushTest2) { |
7c673cae FG |
274 | WriteOptions write_options; |
275 | ReadOptions read_options, snapshot_read_options; | |
276 | string value; | |
277 | Status s; | |
278 | ||
11fdf7f2 TL |
279 | txn_db->Put(write_options, Slice("foo"), Slice("bar")); |
280 | txn_db->Put(write_options, Slice("foo2"), Slice("bar")); | |
7c673cae FG |
281 | |
282 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
283 | ASSERT_TRUE(txn); | |
284 | ||
285 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
286 | ||
287 | txn->GetForUpdate(snapshot_read_options, "foo", &value); | |
288 | ASSERT_EQ(value, "bar"); | |
289 | ||
290 | txn->Put(Slice("foo"), Slice("bar2")); | |
291 | ||
292 | txn->GetForUpdate(snapshot_read_options, "foo", &value); | |
293 | ASSERT_EQ(value, "bar2"); | |
294 | ||
295 | // Put a random key so we have a MemTable to flush | |
11fdf7f2 | 296 | s = txn_db->Put(write_options, "dummy", "dummy"); |
7c673cae FG |
297 | ASSERT_OK(s); |
298 | ||
299 | // force a memtable flush | |
300 | FlushOptions flush_ops; | |
11fdf7f2 | 301 | txn_db->Flush(flush_ops); |
7c673cae FG |
302 | |
303 | // Put a random key so we have a MemTable to flush | |
11fdf7f2 | 304 | s = txn_db->Put(write_options, "dummy", "dummy2"); |
7c673cae FG |
305 | ASSERT_OK(s); |
306 | ||
307 | // force a memtable flush | |
11fdf7f2 | 308 | txn_db->Flush(flush_ops); |
7c673cae | 309 | |
11fdf7f2 | 310 | s = txn_db->Put(write_options, "dummy", "dummy3"); |
7c673cae FG |
311 | ASSERT_OK(s); |
312 | ||
313 | // force a memtable flush | |
314 | // Since our test db has max_write_buffer_number=2, this flush will cause | |
315 | // the first memtable to get purged from the MemtableList history. | |
11fdf7f2 | 316 | txn_db->Flush(flush_ops); |
7c673cae FG |
317 | |
318 | s = txn->Commit(); | |
319 | // txn should not commit since MemTableList History is not large enough | |
320 | ASSERT_TRUE(s.IsTryAgain()); | |
321 | ||
11fdf7f2 | 322 | txn_db->Get(read_options, "foo", &value); |
7c673cae FG |
323 | ASSERT_EQ(value, "bar"); |
324 | ||
325 | delete txn; | |
326 | } | |
327 | ||
f67539c2 TL |
328 | // Trigger the condition where some old memtables are skipped when doing |
329 | // TransactionUtil::CheckKey(), and make sure the result is still correct. | |
330 | TEST_P(OptimisticTransactionTest, CheckKeySkipOldMemtable) { | |
331 | const int kAttemptHistoryMemtable = 0; | |
332 | const int kAttemptImmMemTable = 1; | |
333 | for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable; | |
334 | attempt++) { | |
335 | options.max_write_buffer_number_to_maintain = 3; | |
336 | Reopen(); | |
337 | ||
338 | WriteOptions write_options; | |
339 | ReadOptions read_options; | |
340 | ReadOptions snapshot_read_options; | |
341 | ReadOptions snapshot_read_options2; | |
342 | string value; | |
343 | Status s; | |
344 | ||
345 | ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar"))); | |
346 | ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar"))); | |
347 | ||
348 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
349 | ASSERT_TRUE(txn != nullptr); | |
350 | ||
351 | Transaction* txn2 = txn_db->BeginTransaction(write_options); | |
352 | ASSERT_TRUE(txn2 != nullptr); | |
353 | ||
354 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
355 | ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value)); | |
356 | ASSERT_EQ(value, "bar"); | |
357 | ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2"))); | |
358 | ||
359 | snapshot_read_options2.snapshot = txn2->GetSnapshot(); | |
360 | ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value)); | |
361 | ASSERT_EQ(value, "bar"); | |
362 | ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2"))); | |
363 | ||
364 | // txn updates "foo" and txn2 updates "foo2", and now a write is | |
365 | // issued for "foo", which conflicts with txn but not txn2 | |
366 | ASSERT_OK(txn_db->Put(write_options, "foo", "bar")); | |
367 | ||
368 | if (attempt == kAttemptImmMemTable) { | |
369 | // For the second attempt, hold flush from beginning. The memtable | |
370 | // will be switched to immutable after calling TEST_SwitchMemtable() | |
371 | // while CheckKey() is called. | |
372 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( | |
373 | {{"OptimisticTransactionTest.CheckKeySkipOldMemtable", | |
374 | "FlushJob::Start"}}); | |
375 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); | |
376 | } | |
377 | ||
378 | // force a memtable flush. The memtable should still be kept | |
379 | FlushOptions flush_ops; | |
380 | if (attempt == kAttemptHistoryMemtable) { | |
381 | ASSERT_OK(txn_db->Flush(flush_ops)); | |
382 | } else { | |
383 | assert(attempt == kAttemptImmMemTable); | |
384 | DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB()); | |
385 | db_impl->TEST_SwitchMemtable(); | |
386 | } | |
387 | uint64_t num_imm_mems; | |
388 | ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable, | |
389 | &num_imm_mems)); | |
390 | if (attempt == kAttemptHistoryMemtable) { | |
391 | ASSERT_EQ(0, num_imm_mems); | |
392 | } else { | |
393 | assert(attempt == kAttemptImmMemTable); | |
394 | ASSERT_EQ(1, num_imm_mems); | |
395 | } | |
396 | ||
397 | // Put something in active memtable | |
398 | ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar"))); | |
399 | ||
400 | // Create txn3 after flushing, when this transaction is commited, | |
401 | // only need to check the active memtable | |
402 | Transaction* txn3 = txn_db->BeginTransaction(write_options); | |
403 | ASSERT_TRUE(txn3 != nullptr); | |
404 | ||
405 | // Commit both of txn and txn2. txn will conflict but txn2 will | |
406 | // pass. In both ways, both memtables are queried. | |
407 | SetPerfLevel(PerfLevel::kEnableCount); | |
408 | ||
409 | get_perf_context()->Reset(); | |
410 | s = txn->Commit(); | |
411 | // We should have checked two memtables | |
412 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
413 | // txn should fail because of conflict, even if the memtable | |
414 | // has flushed, because it is still preserved in history. | |
415 | ASSERT_TRUE(s.IsBusy()); | |
416 | ||
417 | get_perf_context()->Reset(); | |
418 | s = txn2->Commit(); | |
419 | // We should have checked two memtables | |
420 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
421 | ASSERT_TRUE(s.ok()); | |
422 | ||
423 | txn3->Put(Slice("foo2"), Slice("bar2")); | |
424 | get_perf_context()->Reset(); | |
425 | s = txn3->Commit(); | |
426 | // txn3 is created after the active memtable is created, so that is the only | |
427 | // memtable to check. | |
428 | ASSERT_EQ(1, get_perf_context()->get_from_memtable_count); | |
429 | ASSERT_TRUE(s.ok()); | |
430 | ||
431 | TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable"); | |
432 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); | |
433 | ||
434 | SetPerfLevel(PerfLevel::kDisable); | |
435 | ||
436 | delete txn; | |
437 | delete txn2; | |
438 | delete txn3; | |
439 | } | |
440 | } | |
441 | ||
442 | TEST_P(OptimisticTransactionTest, NoSnapshotTest) { | |
7c673cae FG |
443 | WriteOptions write_options; |
444 | ReadOptions read_options; | |
445 | string value; | |
446 | Status s; | |
447 | ||
11fdf7f2 | 448 | txn_db->Put(write_options, "AAA", "bar"); |
7c673cae FG |
449 | |
450 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
451 | ASSERT_TRUE(txn); | |
452 | ||
453 | // Modify key after transaction start | |
11fdf7f2 | 454 | txn_db->Put(write_options, "AAA", "bar1"); |
7c673cae FG |
455 | |
456 | // Read and write without a snapshot | |
457 | txn->GetForUpdate(read_options, "AAA", &value); | |
458 | ASSERT_EQ(value, "bar1"); | |
459 | txn->Put("AAA", "bar2"); | |
460 | ||
461 | // Should commit since read/write was done after data changed | |
462 | s = txn->Commit(); | |
463 | ASSERT_OK(s); | |
464 | ||
465 | txn->GetForUpdate(read_options, "AAA", &value); | |
466 | ASSERT_EQ(value, "bar2"); | |
467 | ||
468 | delete txn; | |
469 | } | |
470 | ||
f67539c2 | 471 | TEST_P(OptimisticTransactionTest, MultipleSnapshotTest) { |
7c673cae FG |
472 | WriteOptions write_options; |
473 | ReadOptions read_options, snapshot_read_options; | |
474 | string value; | |
475 | Status s; | |
476 | ||
11fdf7f2 TL |
477 | txn_db->Put(write_options, "AAA", "bar"); |
478 | txn_db->Put(write_options, "BBB", "bar"); | |
479 | txn_db->Put(write_options, "CCC", "bar"); | |
7c673cae FG |
480 | |
481 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
482 | ASSERT_TRUE(txn); | |
483 | ||
11fdf7f2 | 484 | txn_db->Put(write_options, "AAA", "bar1"); |
7c673cae FG |
485 | |
486 | // Read and write without a snapshot | |
487 | txn->GetForUpdate(read_options, "AAA", &value); | |
488 | ASSERT_EQ(value, "bar1"); | |
489 | txn->Put("AAA", "bar2"); | |
490 | ||
491 | // Modify BBB before snapshot is taken | |
11fdf7f2 | 492 | txn_db->Put(write_options, "BBB", "bar1"); |
7c673cae FG |
493 | |
494 | txn->SetSnapshot(); | |
495 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
496 | ||
497 | // Read and write with snapshot | |
498 | txn->GetForUpdate(snapshot_read_options, "BBB", &value); | |
499 | ASSERT_EQ(value, "bar1"); | |
500 | txn->Put("BBB", "bar2"); | |
501 | ||
11fdf7f2 | 502 | txn_db->Put(write_options, "CCC", "bar1"); |
7c673cae FG |
503 | |
504 | // Set a new snapshot | |
505 | txn->SetSnapshot(); | |
506 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
507 | ||
508 | // Read and write with snapshot | |
509 | txn->GetForUpdate(snapshot_read_options, "CCC", &value); | |
510 | ASSERT_EQ(value, "bar1"); | |
511 | txn->Put("CCC", "bar2"); | |
512 | ||
513 | s = txn->GetForUpdate(read_options, "AAA", &value); | |
514 | ASSERT_OK(s); | |
515 | ASSERT_EQ(value, "bar2"); | |
516 | s = txn->GetForUpdate(read_options, "BBB", &value); | |
517 | ASSERT_OK(s); | |
518 | ASSERT_EQ(value, "bar2"); | |
519 | s = txn->GetForUpdate(read_options, "CCC", &value); | |
520 | ASSERT_OK(s); | |
521 | ASSERT_EQ(value, "bar2"); | |
522 | ||
11fdf7f2 | 523 | s = txn_db->Get(read_options, "AAA", &value); |
7c673cae FG |
524 | ASSERT_OK(s); |
525 | ASSERT_EQ(value, "bar1"); | |
11fdf7f2 | 526 | s = txn_db->Get(read_options, "BBB", &value); |
7c673cae FG |
527 | ASSERT_OK(s); |
528 | ASSERT_EQ(value, "bar1"); | |
11fdf7f2 | 529 | s = txn_db->Get(read_options, "CCC", &value); |
7c673cae FG |
530 | ASSERT_OK(s); |
531 | ASSERT_EQ(value, "bar1"); | |
532 | ||
533 | s = txn->Commit(); | |
534 | ASSERT_OK(s); | |
535 | ||
11fdf7f2 | 536 | s = txn_db->Get(read_options, "AAA", &value); |
7c673cae FG |
537 | ASSERT_OK(s); |
538 | ASSERT_EQ(value, "bar2"); | |
11fdf7f2 | 539 | s = txn_db->Get(read_options, "BBB", &value); |
7c673cae FG |
540 | ASSERT_OK(s); |
541 | ASSERT_EQ(value, "bar2"); | |
11fdf7f2 | 542 | s = txn_db->Get(read_options, "CCC", &value); |
7c673cae FG |
543 | ASSERT_OK(s); |
544 | ASSERT_EQ(value, "bar2"); | |
545 | ||
546 | // verify that we track multiple writes to the same key at different snapshots | |
547 | delete txn; | |
548 | txn = txn_db->BeginTransaction(write_options); | |
549 | ||
550 | // Potentially conflicting writes | |
11fdf7f2 TL |
551 | txn_db->Put(write_options, "ZZZ", "zzz"); |
552 | txn_db->Put(write_options, "XXX", "xxx"); | |
7c673cae FG |
553 | |
554 | txn->SetSnapshot(); | |
555 | ||
556 | OptimisticTransactionOptions txn_options; | |
557 | txn_options.set_snapshot = true; | |
558 | Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
559 | txn2->SetSnapshot(); | |
560 | ||
561 | // This should not conflict in txn since the snapshot is later than the | |
562 | // previous write (spoiler alert: it will later conflict with txn2). | |
563 | txn->Put("ZZZ", "zzzz"); | |
564 | s = txn->Commit(); | |
565 | ASSERT_OK(s); | |
566 | ||
567 | delete txn; | |
568 | ||
569 | // This will conflict since the snapshot is earlier than another write to ZZZ | |
570 | txn2->Put("ZZZ", "xxxxx"); | |
571 | ||
572 | s = txn2->Commit(); | |
573 | ASSERT_TRUE(s.IsBusy()); | |
574 | ||
575 | delete txn2; | |
576 | } | |
577 | ||
f67539c2 | 578 | TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) { |
7c673cae FG |
579 | WriteOptions write_options; |
580 | ReadOptions read_options, snapshot_read_options; | |
581 | OptimisticTransactionOptions txn_options; | |
582 | string value; | |
583 | Status s; | |
584 | ||
585 | ColumnFamilyHandle *cfa, *cfb; | |
586 | ColumnFamilyOptions cf_options; | |
587 | ||
588 | // Create 2 new column families | |
11fdf7f2 | 589 | s = txn_db->CreateColumnFamily(cf_options, "CFA", &cfa); |
7c673cae | 590 | ASSERT_OK(s); |
11fdf7f2 | 591 | s = txn_db->CreateColumnFamily(cf_options, "CFB", &cfb); |
7c673cae FG |
592 | ASSERT_OK(s); |
593 | ||
594 | delete cfa; | |
595 | delete cfb; | |
596 | delete txn_db; | |
11fdf7f2 | 597 | txn_db = nullptr; |
7c673cae FG |
598 | |
599 | // open DB with three column families | |
600 | std::vector<ColumnFamilyDescriptor> column_families; | |
601 | // have to open default column family | |
602 | column_families.push_back( | |
603 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); | |
604 | // open the new column families | |
605 | column_families.push_back( | |
606 | ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); | |
607 | column_families.push_back( | |
608 | ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); | |
609 | std::vector<ColumnFamilyHandle*> handles; | |
610 | s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles, | |
611 | &txn_db); | |
612 | ASSERT_OK(s); | |
11fdf7f2 | 613 | assert(txn_db != nullptr); |
7c673cae FG |
614 | |
615 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
616 | ASSERT_TRUE(txn); | |
617 | ||
618 | txn->SetSnapshot(); | |
619 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
620 | ||
621 | txn_options.set_snapshot = true; | |
622 | Transaction* txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
623 | ASSERT_TRUE(txn2); | |
624 | ||
625 | // Write some data to the db | |
626 | WriteBatch batch; | |
627 | batch.Put("foo", "foo"); | |
628 | batch.Put(handles[1], "AAA", "bar"); | |
629 | batch.Put(handles[1], "AAAZZZ", "bar"); | |
11fdf7f2 | 630 | s = txn_db->Write(write_options, &batch); |
7c673cae | 631 | ASSERT_OK(s); |
11fdf7f2 | 632 | txn_db->Delete(write_options, handles[1], "AAAZZZ"); |
7c673cae FG |
633 | |
634 | // These keys do no conflict with existing writes since they're in | |
635 | // different column families | |
636 | txn->Delete("AAA"); | |
637 | txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value); | |
638 | Slice key_slice("AAAZZZ"); | |
639 | Slice value_slices[2] = {Slice("bar"), Slice("bar")}; | |
640 | txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2)); | |
641 | ||
642 | ASSERT_EQ(3, txn->GetNumKeys()); | |
643 | ||
644 | // Txn should commit | |
645 | s = txn->Commit(); | |
646 | ASSERT_OK(s); | |
11fdf7f2 | 647 | s = txn_db->Get(read_options, "AAA", &value); |
7c673cae | 648 | ASSERT_TRUE(s.IsNotFound()); |
11fdf7f2 | 649 | s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value); |
7c673cae FG |
650 | ASSERT_EQ(value, "barbar"); |
651 | ||
652 | Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")}; | |
653 | Slice value_slice("barbarbar"); | |
654 | // This write will cause a conflict with the earlier batch write | |
655 | txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1)); | |
656 | ||
657 | txn2->Delete(handles[2], "XXX"); | |
658 | txn2->Delete(handles[1], "XXX"); | |
659 | s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value); | |
660 | ASSERT_TRUE(s.IsNotFound()); | |
661 | ||
662 | // Verify txn did not commit | |
663 | s = txn2->Commit(); | |
664 | ASSERT_TRUE(s.IsBusy()); | |
11fdf7f2 | 665 | s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value); |
7c673cae FG |
666 | ASSERT_EQ(value, "barbar"); |
667 | ||
668 | delete txn; | |
669 | delete txn2; | |
670 | ||
671 | txn = txn_db->BeginTransaction(write_options, txn_options); | |
672 | snapshot_read_options.snapshot = txn->GetSnapshot(); | |
673 | ||
674 | txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
675 | ASSERT_TRUE(txn); | |
676 | ||
677 | std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2], | |
678 | handles[0], handles[2]}; | |
679 | std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"}; | |
680 | std::vector<std::string> values(4); | |
681 | ||
682 | std::vector<Status> results = txn->MultiGetForUpdate( | |
683 | snapshot_read_options, multiget_cfh, multiget_keys, &values); | |
684 | ASSERT_OK(results[0]); | |
685 | ASSERT_OK(results[1]); | |
686 | ASSERT_OK(results[2]); | |
687 | ASSERT_TRUE(results[3].IsNotFound()); | |
688 | ASSERT_EQ(values[0], "bar"); | |
689 | ASSERT_EQ(values[1], "barbar"); | |
690 | ASSERT_EQ(values[2], "foo"); | |
691 | ||
692 | txn->Delete(handles[2], "ZZZ"); | |
693 | txn->Put(handles[2], "ZZZ", "YYY"); | |
694 | txn->Put(handles[2], "ZZZ", "YYYY"); | |
695 | txn->Delete(handles[2], "ZZZ"); | |
696 | txn->Put(handles[2], "AAAZZZ", "barbarbar"); | |
697 | ||
698 | ASSERT_EQ(5, txn->GetNumKeys()); | |
699 | ||
700 | // Txn should commit | |
701 | s = txn->Commit(); | |
702 | ASSERT_OK(s); | |
11fdf7f2 | 703 | s = txn_db->Get(read_options, handles[2], "ZZZ", &value); |
7c673cae FG |
704 | ASSERT_TRUE(s.IsNotFound()); |
705 | ||
706 | // Put a key which will conflict with the next txn using the previous snapshot | |
11fdf7f2 | 707 | txn_db->Put(write_options, handles[2], "foo", "000"); |
7c673cae FG |
708 | |
709 | results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, | |
710 | multiget_keys, &values); | |
711 | ASSERT_OK(results[0]); | |
712 | ASSERT_OK(results[1]); | |
713 | ASSERT_OK(results[2]); | |
714 | ASSERT_TRUE(results[3].IsNotFound()); | |
715 | ASSERT_EQ(values[0], "bar"); | |
716 | ASSERT_EQ(values[1], "barbar"); | |
717 | ASSERT_EQ(values[2], "foo"); | |
718 | ||
719 | // Verify Txn Did not Commit | |
720 | s = txn2->Commit(); | |
721 | ASSERT_TRUE(s.IsBusy()); | |
722 | ||
11fdf7f2 | 723 | s = txn_db->DropColumnFamily(handles[1]); |
7c673cae | 724 | ASSERT_OK(s); |
11fdf7f2 | 725 | s = txn_db->DropColumnFamily(handles[2]); |
7c673cae FG |
726 | ASSERT_OK(s); |
727 | ||
728 | delete txn; | |
729 | delete txn2; | |
730 | ||
731 | for (auto handle : handles) { | |
732 | delete handle; | |
733 | } | |
734 | } | |
735 | ||
f67539c2 | 736 | TEST_P(OptimisticTransactionTest, EmptyTest) { |
7c673cae FG |
737 | WriteOptions write_options; |
738 | ReadOptions read_options; | |
739 | string value; | |
740 | Status s; | |
741 | ||
11fdf7f2 | 742 | s = txn_db->Put(write_options, "aaa", "aaa"); |
7c673cae FG |
743 | ASSERT_OK(s); |
744 | ||
745 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
746 | s = txn->Commit(); | |
747 | ASSERT_OK(s); | |
748 | delete txn; | |
749 | ||
750 | txn = txn_db->BeginTransaction(write_options); | |
751 | txn->Rollback(); | |
752 | delete txn; | |
753 | ||
754 | txn = txn_db->BeginTransaction(write_options); | |
755 | s = txn->GetForUpdate(read_options, "aaa", &value); | |
756 | ASSERT_EQ(value, "aaa"); | |
757 | ||
758 | s = txn->Commit(); | |
759 | ASSERT_OK(s); | |
760 | delete txn; | |
761 | ||
762 | txn = txn_db->BeginTransaction(write_options); | |
763 | txn->SetSnapshot(); | |
764 | s = txn->GetForUpdate(read_options, "aaa", &value); | |
765 | ASSERT_EQ(value, "aaa"); | |
766 | ||
11fdf7f2 | 767 | s = txn_db->Put(write_options, "aaa", "xxx"); |
7c673cae FG |
768 | s = txn->Commit(); |
769 | ASSERT_TRUE(s.IsBusy()); | |
770 | delete txn; | |
771 | } | |
772 | ||
f67539c2 | 773 | TEST_P(OptimisticTransactionTest, PredicateManyPreceders) { |
7c673cae FG |
774 | WriteOptions write_options; |
775 | ReadOptions read_options1, read_options2; | |
776 | OptimisticTransactionOptions txn_options; | |
777 | string value; | |
778 | Status s; | |
779 | ||
780 | txn_options.set_snapshot = true; | |
781 | Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options); | |
782 | read_options1.snapshot = txn1->GetSnapshot(); | |
783 | ||
784 | Transaction* txn2 = txn_db->BeginTransaction(write_options); | |
785 | txn2->SetSnapshot(); | |
786 | read_options2.snapshot = txn2->GetSnapshot(); | |
787 | ||
788 | std::vector<Slice> multiget_keys = {"1", "2", "3"}; | |
789 | std::vector<std::string> multiget_values; | |
790 | ||
791 | std::vector<Status> results = | |
792 | txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); | |
793 | ASSERT_TRUE(results[1].IsNotFound()); | |
794 | ||
795 | txn2->Put("2", "x"); | |
796 | ||
797 | s = txn2->Commit(); | |
798 | ASSERT_OK(s); | |
799 | ||
800 | multiget_values.clear(); | |
801 | results = | |
802 | txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values); | |
803 | ASSERT_TRUE(results[1].IsNotFound()); | |
804 | ||
805 | // should not commit since txn2 wrote a key txn has read | |
806 | s = txn1->Commit(); | |
807 | ASSERT_TRUE(s.IsBusy()); | |
808 | ||
809 | delete txn1; | |
810 | delete txn2; | |
811 | ||
812 | txn1 = txn_db->BeginTransaction(write_options, txn_options); | |
813 | read_options1.snapshot = txn1->GetSnapshot(); | |
814 | ||
815 | txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
816 | read_options2.snapshot = txn2->GetSnapshot(); | |
817 | ||
818 | txn1->Put("4", "x"); | |
819 | ||
820 | txn2->Delete("4"); | |
821 | ||
822 | // txn1 can commit since txn2's delete hasn't happened yet (it's just batched) | |
823 | s = txn1->Commit(); | |
824 | ASSERT_OK(s); | |
825 | ||
826 | s = txn2->GetForUpdate(read_options2, "4", &value); | |
827 | ASSERT_TRUE(s.IsNotFound()); | |
828 | ||
829 | // txn2 cannot commit since txn1 changed "4" | |
830 | s = txn2->Commit(); | |
831 | ASSERT_TRUE(s.IsBusy()); | |
832 | ||
833 | delete txn1; | |
834 | delete txn2; | |
835 | } | |
836 | ||
f67539c2 | 837 | TEST_P(OptimisticTransactionTest, LostUpdate) { |
7c673cae FG |
838 | WriteOptions write_options; |
839 | ReadOptions read_options, read_options1, read_options2; | |
840 | OptimisticTransactionOptions txn_options; | |
841 | string value; | |
842 | Status s; | |
843 | ||
844 | // Test 2 transactions writing to the same key in multiple orders and | |
845 | // with/without snapshots | |
846 | ||
847 | Transaction* txn1 = txn_db->BeginTransaction(write_options); | |
848 | Transaction* txn2 = txn_db->BeginTransaction(write_options); | |
849 | ||
850 | txn1->Put("1", "1"); | |
851 | txn2->Put("1", "2"); | |
852 | ||
853 | s = txn1->Commit(); | |
854 | ASSERT_OK(s); | |
855 | ||
856 | s = txn2->Commit(); | |
857 | ASSERT_TRUE(s.IsBusy()); | |
858 | ||
859 | delete txn1; | |
860 | delete txn2; | |
861 | ||
862 | txn_options.set_snapshot = true; | |
863 | txn1 = txn_db->BeginTransaction(write_options, txn_options); | |
864 | read_options1.snapshot = txn1->GetSnapshot(); | |
865 | ||
866 | txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
867 | read_options2.snapshot = txn2->GetSnapshot(); | |
868 | ||
869 | txn1->Put("1", "3"); | |
870 | txn2->Put("1", "4"); | |
871 | ||
872 | s = txn1->Commit(); | |
873 | ASSERT_OK(s); | |
874 | ||
875 | s = txn2->Commit(); | |
876 | ASSERT_TRUE(s.IsBusy()); | |
877 | ||
878 | delete txn1; | |
879 | delete txn2; | |
880 | ||
881 | txn1 = txn_db->BeginTransaction(write_options, txn_options); | |
882 | read_options1.snapshot = txn1->GetSnapshot(); | |
883 | ||
884 | txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
885 | read_options2.snapshot = txn2->GetSnapshot(); | |
886 | ||
887 | txn1->Put("1", "5"); | |
888 | s = txn1->Commit(); | |
889 | ASSERT_OK(s); | |
890 | ||
891 | txn2->Put("1", "6"); | |
892 | s = txn2->Commit(); | |
893 | ASSERT_TRUE(s.IsBusy()); | |
894 | ||
895 | delete txn1; | |
896 | delete txn2; | |
897 | ||
898 | txn1 = txn_db->BeginTransaction(write_options, txn_options); | |
899 | read_options1.snapshot = txn1->GetSnapshot(); | |
900 | ||
901 | txn2 = txn_db->BeginTransaction(write_options, txn_options); | |
902 | read_options2.snapshot = txn2->GetSnapshot(); | |
903 | ||
904 | txn1->Put("1", "5"); | |
905 | s = txn1->Commit(); | |
906 | ASSERT_OK(s); | |
907 | ||
908 | txn2->SetSnapshot(); | |
909 | txn2->Put("1", "6"); | |
910 | s = txn2->Commit(); | |
911 | ASSERT_OK(s); | |
912 | ||
913 | delete txn1; | |
914 | delete txn2; | |
915 | ||
916 | txn1 = txn_db->BeginTransaction(write_options); | |
917 | txn2 = txn_db->BeginTransaction(write_options); | |
918 | ||
919 | txn1->Put("1", "7"); | |
920 | s = txn1->Commit(); | |
921 | ASSERT_OK(s); | |
922 | ||
923 | txn2->Put("1", "8"); | |
924 | s = txn2->Commit(); | |
925 | ASSERT_OK(s); | |
926 | ||
927 | delete txn1; | |
928 | delete txn2; | |
929 | ||
11fdf7f2 | 930 | s = txn_db->Get(read_options, "1", &value); |
7c673cae FG |
931 | ASSERT_OK(s); |
932 | ASSERT_EQ(value, "8"); | |
933 | } | |
934 | ||
f67539c2 | 935 | TEST_P(OptimisticTransactionTest, UntrackedWrites) { |
7c673cae FG |
936 | WriteOptions write_options; |
937 | ReadOptions read_options; | |
938 | string value; | |
939 | Status s; | |
940 | ||
941 | // Verify transaction rollback works for untracked keys. | |
942 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
943 | txn->PutUntracked("untracked", "0"); | |
944 | txn->Rollback(); | |
11fdf7f2 | 945 | s = txn_db->Get(read_options, "untracked", &value); |
7c673cae FG |
946 | ASSERT_TRUE(s.IsNotFound()); |
947 | ||
948 | delete txn; | |
949 | txn = txn_db->BeginTransaction(write_options); | |
950 | ||
951 | txn->Put("tracked", "1"); | |
952 | txn->PutUntracked("untracked", "1"); | |
953 | txn->MergeUntracked("untracked", "2"); | |
954 | txn->DeleteUntracked("untracked"); | |
955 | ||
956 | // Write to the untracked key outside of the transaction and verify | |
957 | // it doesn't prevent the transaction from committing. | |
11fdf7f2 | 958 | s = txn_db->Put(write_options, "untracked", "x"); |
7c673cae FG |
959 | ASSERT_OK(s); |
960 | ||
961 | s = txn->Commit(); | |
962 | ASSERT_OK(s); | |
963 | ||
11fdf7f2 | 964 | s = txn_db->Get(read_options, "untracked", &value); |
7c673cae FG |
965 | ASSERT_TRUE(s.IsNotFound()); |
966 | ||
967 | delete txn; | |
968 | txn = txn_db->BeginTransaction(write_options); | |
969 | ||
970 | txn->Put("tracked", "10"); | |
971 | txn->PutUntracked("untracked", "A"); | |
972 | ||
973 | // Write to tracked key outside of the transaction and verify that the | |
974 | // untracked keys are not written when the commit fails. | |
11fdf7f2 | 975 | s = txn_db->Delete(write_options, "tracked"); |
7c673cae FG |
976 | |
977 | s = txn->Commit(); | |
978 | ASSERT_TRUE(s.IsBusy()); | |
979 | ||
11fdf7f2 | 980 | s = txn_db->Get(read_options, "untracked", &value); |
7c673cae FG |
981 | ASSERT_TRUE(s.IsNotFound()); |
982 | ||
983 | delete txn; | |
984 | } | |
985 | ||
f67539c2 | 986 | TEST_P(OptimisticTransactionTest, IteratorTest) { |
7c673cae FG |
987 | WriteOptions write_options; |
988 | ReadOptions read_options, snapshot_read_options; | |
989 | OptimisticTransactionOptions txn_options; | |
990 | string value; | |
991 | Status s; | |
992 | ||
993 | // Write some keys to the db | |
11fdf7f2 | 994 | s = txn_db->Put(write_options, "A", "a"); |
7c673cae FG |
995 | ASSERT_OK(s); |
996 | ||
11fdf7f2 | 997 | s = txn_db->Put(write_options, "G", "g"); |
7c673cae FG |
998 | ASSERT_OK(s); |
999 | ||
11fdf7f2 | 1000 | s = txn_db->Put(write_options, "F", "f"); |
7c673cae FG |
1001 | ASSERT_OK(s); |
1002 | ||
11fdf7f2 | 1003 | s = txn_db->Put(write_options, "C", "c"); |
7c673cae FG |
1004 | ASSERT_OK(s); |
1005 | ||
11fdf7f2 | 1006 | s = txn_db->Put(write_options, "D", "d"); |
7c673cae FG |
1007 | ASSERT_OK(s); |
1008 | ||
1009 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
1010 | ASSERT_TRUE(txn); | |
1011 | ||
1012 | // Write some keys in a txn | |
1013 | s = txn->Put("B", "b"); | |
1014 | ASSERT_OK(s); | |
1015 | ||
1016 | s = txn->Put("H", "h"); | |
1017 | ASSERT_OK(s); | |
1018 | ||
1019 | s = txn->Delete("D"); | |
1020 | ASSERT_OK(s); | |
1021 | ||
1022 | s = txn->Put("E", "e"); | |
1023 | ASSERT_OK(s); | |
1024 | ||
1025 | txn->SetSnapshot(); | |
1026 | const Snapshot* snapshot = txn->GetSnapshot(); | |
1027 | ||
1028 | // Write some keys to the db after the snapshot | |
11fdf7f2 | 1029 | s = txn_db->Put(write_options, "BB", "xx"); |
7c673cae FG |
1030 | ASSERT_OK(s); |
1031 | ||
11fdf7f2 | 1032 | s = txn_db->Put(write_options, "C", "xx"); |
7c673cae FG |
1033 | ASSERT_OK(s); |
1034 | ||
1035 | read_options.snapshot = snapshot; | |
1036 | Iterator* iter = txn->GetIterator(read_options); | |
1037 | ASSERT_OK(iter->status()); | |
1038 | iter->SeekToFirst(); | |
1039 | ||
1040 | // Read all keys via iter and lock them all | |
1041 | std::string results[] = {"a", "b", "c", "e", "f", "g", "h"}; | |
1042 | for (int i = 0; i < 7; i++) { | |
1043 | ASSERT_OK(iter->status()); | |
1044 | ASSERT_TRUE(iter->Valid()); | |
1045 | ASSERT_EQ(results[i], iter->value().ToString()); | |
1046 | ||
1047 | s = txn->GetForUpdate(read_options, iter->key(), nullptr); | |
1048 | ASSERT_OK(s); | |
1049 | ||
1050 | iter->Next(); | |
1051 | } | |
1052 | ASSERT_FALSE(iter->Valid()); | |
1053 | ||
1054 | iter->Seek("G"); | |
1055 | ASSERT_OK(iter->status()); | |
1056 | ASSERT_TRUE(iter->Valid()); | |
1057 | ASSERT_EQ("g", iter->value().ToString()); | |
1058 | ||
1059 | iter->Prev(); | |
1060 | ASSERT_OK(iter->status()); | |
1061 | ASSERT_TRUE(iter->Valid()); | |
1062 | ASSERT_EQ("f", iter->value().ToString()); | |
1063 | ||
1064 | iter->Seek("D"); | |
1065 | ASSERT_OK(iter->status()); | |
1066 | ASSERT_TRUE(iter->Valid()); | |
1067 | ASSERT_EQ("e", iter->value().ToString()); | |
1068 | ||
1069 | iter->Seek("C"); | |
1070 | ASSERT_OK(iter->status()); | |
1071 | ASSERT_TRUE(iter->Valid()); | |
1072 | ASSERT_EQ("c", iter->value().ToString()); | |
1073 | ||
1074 | iter->Next(); | |
1075 | ASSERT_OK(iter->status()); | |
1076 | ASSERT_TRUE(iter->Valid()); | |
1077 | ASSERT_EQ("e", iter->value().ToString()); | |
1078 | ||
1079 | iter->Seek(""); | |
1080 | ASSERT_OK(iter->status()); | |
1081 | ASSERT_TRUE(iter->Valid()); | |
1082 | ASSERT_EQ("a", iter->value().ToString()); | |
1083 | ||
1084 | iter->Seek("X"); | |
1085 | ASSERT_OK(iter->status()); | |
1086 | ASSERT_FALSE(iter->Valid()); | |
1087 | ||
1088 | iter->SeekToLast(); | |
1089 | ASSERT_OK(iter->status()); | |
1090 | ASSERT_TRUE(iter->Valid()); | |
1091 | ASSERT_EQ("h", iter->value().ToString()); | |
1092 | ||
1093 | // key "C" was modified in the db after txn's snapshot. txn will not commit. | |
1094 | s = txn->Commit(); | |
1095 | ASSERT_TRUE(s.IsBusy()); | |
1096 | ||
1097 | delete iter; | |
1098 | delete txn; | |
1099 | } | |
1100 | ||
20effc67 TL |
1101 | TEST_P(OptimisticTransactionTest, DeleteRangeSupportTest) { |
1102 | // `OptimisticTransactionDB` does not allow range deletion in any API. | |
1103 | ASSERT_TRUE( | |
1104 | txn_db | |
1105 | ->DeleteRange(WriteOptions(), txn_db->DefaultColumnFamily(), "a", "b") | |
1106 | .IsNotSupported()); | |
1107 | WriteBatch wb; | |
1108 | ASSERT_OK(wb.DeleteRange("a", "b")); | |
1109 | ASSERT_NOK(txn_db->Write(WriteOptions(), &wb)); | |
1110 | } | |
1111 | ||
f67539c2 | 1112 | TEST_P(OptimisticTransactionTest, SavepointTest) { |
7c673cae FG |
1113 | WriteOptions write_options; |
1114 | ReadOptions read_options, snapshot_read_options; | |
1115 | OptimisticTransactionOptions txn_options; | |
1116 | string value; | |
1117 | Status s; | |
1118 | ||
1119 | Transaction* txn = txn_db->BeginTransaction(write_options); | |
1120 | ASSERT_TRUE(txn); | |
1121 | ||
1122 | s = txn->RollbackToSavePoint(); | |
1123 | ASSERT_TRUE(s.IsNotFound()); | |
1124 | ||
1125 | txn->SetSavePoint(); // 1 | |
1126 | ||
1127 | ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn | |
1128 | s = txn->RollbackToSavePoint(); | |
1129 | ASSERT_TRUE(s.IsNotFound()); | |
1130 | ||
1131 | s = txn->Put("B", "b"); | |
1132 | ASSERT_OK(s); | |
1133 | ||
1134 | s = txn->Commit(); | |
1135 | ASSERT_OK(s); | |
1136 | ||
11fdf7f2 | 1137 | s = txn_db->Get(read_options, "B", &value); |
7c673cae FG |
1138 | ASSERT_OK(s); |
1139 | ASSERT_EQ("b", value); | |
1140 | ||
1141 | delete txn; | |
1142 | txn = txn_db->BeginTransaction(write_options); | |
1143 | ASSERT_TRUE(txn); | |
1144 | ||
1145 | s = txn->Put("A", "a"); | |
1146 | ASSERT_OK(s); | |
1147 | ||
1148 | s = txn->Put("B", "bb"); | |
1149 | ASSERT_OK(s); | |
1150 | ||
1151 | s = txn->Put("C", "c"); | |
1152 | ASSERT_OK(s); | |
1153 | ||
1154 | txn->SetSavePoint(); // 2 | |
1155 | ||
1156 | s = txn->Delete("B"); | |
1157 | ASSERT_OK(s); | |
1158 | ||
1159 | s = txn->Put("C", "cc"); | |
1160 | ASSERT_OK(s); | |
1161 | ||
1162 | s = txn->Put("D", "d"); | |
1163 | ASSERT_OK(s); | |
1164 | ||
1165 | ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2 | |
1166 | ||
1167 | s = txn->Get(read_options, "A", &value); | |
1168 | ASSERT_OK(s); | |
1169 | ASSERT_EQ("a", value); | |
1170 | ||
1171 | s = txn->Get(read_options, "B", &value); | |
1172 | ASSERT_OK(s); | |
1173 | ASSERT_EQ("bb", value); | |
1174 | ||
1175 | s = txn->Get(read_options, "C", &value); | |
1176 | ASSERT_OK(s); | |
1177 | ASSERT_EQ("c", value); | |
1178 | ||
1179 | s = txn->Get(read_options, "D", &value); | |
1180 | ASSERT_TRUE(s.IsNotFound()); | |
1181 | ||
1182 | s = txn->Put("A", "a"); | |
1183 | ASSERT_OK(s); | |
1184 | ||
1185 | s = txn->Put("E", "e"); | |
1186 | ASSERT_OK(s); | |
1187 | ||
1188 | // Rollback to beginning of txn | |
1189 | s = txn->RollbackToSavePoint(); | |
1190 | ASSERT_TRUE(s.IsNotFound()); | |
1191 | txn->Rollback(); | |
1192 | ||
1193 | s = txn->Get(read_options, "A", &value); | |
1194 | ASSERT_TRUE(s.IsNotFound()); | |
1195 | ||
1196 | s = txn->Get(read_options, "B", &value); | |
1197 | ASSERT_OK(s); | |
1198 | ASSERT_EQ("b", value); | |
1199 | ||
1200 | s = txn->Get(read_options, "D", &value); | |
1201 | ASSERT_TRUE(s.IsNotFound()); | |
1202 | ||
1203 | s = txn->Get(read_options, "D", &value); | |
1204 | ASSERT_TRUE(s.IsNotFound()); | |
1205 | ||
1206 | s = txn->Get(read_options, "E", &value); | |
1207 | ASSERT_TRUE(s.IsNotFound()); | |
1208 | ||
1209 | s = txn->Put("A", "aa"); | |
1210 | ASSERT_OK(s); | |
1211 | ||
1212 | s = txn->Put("F", "f"); | |
1213 | ASSERT_OK(s); | |
1214 | ||
1215 | txn->SetSavePoint(); // 3 | |
1216 | txn->SetSavePoint(); // 4 | |
1217 | ||
1218 | s = txn->Put("G", "g"); | |
1219 | ASSERT_OK(s); | |
1220 | ||
1221 | s = txn->Delete("F"); | |
1222 | ASSERT_OK(s); | |
1223 | ||
1224 | s = txn->Delete("B"); | |
1225 | ASSERT_OK(s); | |
1226 | ||
1227 | s = txn->Get(read_options, "A", &value); | |
1228 | ASSERT_OK(s); | |
1229 | ASSERT_EQ("aa", value); | |
1230 | ||
1231 | s = txn->Get(read_options, "F", &value); | |
1232 | ASSERT_TRUE(s.IsNotFound()); | |
1233 | ||
1234 | s = txn->Get(read_options, "B", &value); | |
1235 | ASSERT_TRUE(s.IsNotFound()); | |
1236 | ||
1237 | ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3 | |
1238 | ||
1239 | s = txn->Get(read_options, "F", &value); | |
1240 | ASSERT_OK(s); | |
1241 | ASSERT_EQ("f", value); | |
1242 | ||
1243 | s = txn->Get(read_options, "G", &value); | |
1244 | ASSERT_TRUE(s.IsNotFound()); | |
1245 | ||
1246 | s = txn->Commit(); | |
1247 | ASSERT_OK(s); | |
1248 | ||
11fdf7f2 | 1249 | s = txn_db->Get(read_options, "F", &value); |
7c673cae FG |
1250 | ASSERT_OK(s); |
1251 | ASSERT_EQ("f", value); | |
1252 | ||
11fdf7f2 | 1253 | s = txn_db->Get(read_options, "G", &value); |
7c673cae FG |
1254 | ASSERT_TRUE(s.IsNotFound()); |
1255 | ||
11fdf7f2 | 1256 | s = txn_db->Get(read_options, "A", &value); |
7c673cae FG |
1257 | ASSERT_OK(s); |
1258 | ASSERT_EQ("aa", value); | |
1259 | ||
11fdf7f2 | 1260 | s = txn_db->Get(read_options, "B", &value); |
7c673cae FG |
1261 | ASSERT_OK(s); |
1262 | ASSERT_EQ("b", value); | |
1263 | ||
11fdf7f2 | 1264 | s = txn_db->Get(read_options, "C", &value); |
7c673cae FG |
1265 | ASSERT_TRUE(s.IsNotFound()); |
1266 | ||
11fdf7f2 | 1267 | s = txn_db->Get(read_options, "D", &value); |
7c673cae FG |
1268 | ASSERT_TRUE(s.IsNotFound()); |
1269 | ||
11fdf7f2 | 1270 | s = txn_db->Get(read_options, "E", &value); |
7c673cae FG |
1271 | ASSERT_TRUE(s.IsNotFound()); |
1272 | ||
1273 | delete txn; | |
1274 | } | |
1275 | ||
f67539c2 | 1276 | TEST_P(OptimisticTransactionTest, UndoGetForUpdateTest) { |
7c673cae FG |
1277 | WriteOptions write_options; |
1278 | ReadOptions read_options, snapshot_read_options; | |
1279 | OptimisticTransactionOptions txn_options; | |
1280 | string value; | |
1281 | Status s; | |
1282 | ||
11fdf7f2 | 1283 | txn_db->Put(write_options, "A", ""); |
7c673cae FG |
1284 | |
1285 | Transaction* txn1 = txn_db->BeginTransaction(write_options); | |
1286 | ASSERT_TRUE(txn1); | |
1287 | ||
1288 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1289 | ASSERT_OK(s); | |
1290 | ||
1291 | txn1->UndoGetForUpdate("A"); | |
1292 | ||
1293 | Transaction* txn2 = txn_db->BeginTransaction(write_options); | |
1294 | txn2->Put("A", "x"); | |
1295 | s = txn2->Commit(); | |
1296 | ASSERT_OK(s); | |
1297 | delete txn2; | |
1298 | ||
1299 | // Verify that txn1 can commit since A isn't conflict checked | |
1300 | s = txn1->Commit(); | |
1301 | ASSERT_OK(s); | |
1302 | delete txn1; | |
1303 | ||
1304 | txn1 = txn_db->BeginTransaction(write_options); | |
1305 | txn1->Put("A", "a"); | |
1306 | ||
1307 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1308 | ASSERT_OK(s); | |
1309 | ||
1310 | txn1->UndoGetForUpdate("A"); | |
1311 | ||
1312 | txn2 = txn_db->BeginTransaction(write_options); | |
1313 | txn2->Put("A", "x"); | |
1314 | s = txn2->Commit(); | |
1315 | ASSERT_OK(s); | |
1316 | delete txn2; | |
1317 | ||
1318 | // Verify that txn1 cannot commit since A will still be conflict checked | |
1319 | s = txn1->Commit(); | |
1320 | ASSERT_TRUE(s.IsBusy()); | |
1321 | delete txn1; | |
1322 | ||
1323 | txn1 = txn_db->BeginTransaction(write_options); | |
1324 | ||
1325 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1326 | ASSERT_OK(s); | |
1327 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1328 | ASSERT_OK(s); | |
1329 | ||
1330 | txn1->UndoGetForUpdate("A"); | |
1331 | ||
1332 | txn2 = txn_db->BeginTransaction(write_options); | |
1333 | txn2->Put("A", "x"); | |
1334 | s = txn2->Commit(); | |
1335 | ASSERT_OK(s); | |
1336 | delete txn2; | |
1337 | ||
1338 | // Verify that txn1 cannot commit since A will still be conflict checked | |
1339 | s = txn1->Commit(); | |
1340 | ASSERT_TRUE(s.IsBusy()); | |
1341 | delete txn1; | |
1342 | ||
1343 | txn1 = txn_db->BeginTransaction(write_options); | |
1344 | ||
1345 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1346 | ASSERT_OK(s); | |
1347 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1348 | ASSERT_OK(s); | |
1349 | ||
1350 | txn1->UndoGetForUpdate("A"); | |
1351 | txn1->UndoGetForUpdate("A"); | |
1352 | ||
1353 | txn2 = txn_db->BeginTransaction(write_options); | |
1354 | txn2->Put("A", "x"); | |
1355 | s = txn2->Commit(); | |
1356 | ASSERT_OK(s); | |
1357 | delete txn2; | |
1358 | ||
1359 | // Verify that txn1 can commit since A isn't conflict checked | |
1360 | s = txn1->Commit(); | |
1361 | ASSERT_OK(s); | |
1362 | delete txn1; | |
1363 | ||
1364 | txn1 = txn_db->BeginTransaction(write_options); | |
1365 | ||
1366 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1367 | ASSERT_OK(s); | |
1368 | ||
1369 | txn1->SetSavePoint(); | |
1370 | txn1->UndoGetForUpdate("A"); | |
1371 | ||
1372 | txn2 = txn_db->BeginTransaction(write_options); | |
1373 | txn2->Put("A", "x"); | |
1374 | s = txn2->Commit(); | |
1375 | ASSERT_OK(s); | |
1376 | delete txn2; | |
1377 | ||
1378 | // Verify that txn1 cannot commit since A will still be conflict checked | |
1379 | s = txn1->Commit(); | |
1380 | ASSERT_TRUE(s.IsBusy()); | |
1381 | delete txn1; | |
1382 | ||
1383 | txn1 = txn_db->BeginTransaction(write_options); | |
1384 | ||
1385 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1386 | ASSERT_OK(s); | |
1387 | ||
1388 | txn1->SetSavePoint(); | |
1389 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1390 | ASSERT_OK(s); | |
1391 | txn1->UndoGetForUpdate("A"); | |
1392 | ||
1393 | txn2 = txn_db->BeginTransaction(write_options); | |
1394 | txn2->Put("A", "x"); | |
1395 | s = txn2->Commit(); | |
1396 | ASSERT_OK(s); | |
1397 | delete txn2; | |
1398 | ||
1399 | // Verify that txn1 cannot commit since A will still be conflict checked | |
1400 | s = txn1->Commit(); | |
1401 | ASSERT_TRUE(s.IsBusy()); | |
1402 | delete txn1; | |
1403 | ||
1404 | txn1 = txn_db->BeginTransaction(write_options); | |
1405 | ||
1406 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1407 | ASSERT_OK(s); | |
1408 | ||
1409 | txn1->SetSavePoint(); | |
1410 | s = txn1->GetForUpdate(read_options, "A", &value); | |
1411 | ASSERT_OK(s); | |
1412 | txn1->UndoGetForUpdate("A"); | |
1413 | ||
1414 | txn1->RollbackToSavePoint(); | |
1415 | txn1->UndoGetForUpdate("A"); | |
1416 | ||
1417 | txn2 = txn_db->BeginTransaction(write_options); | |
1418 | txn2->Put("A", "x"); | |
1419 | s = txn2->Commit(); | |
1420 | ASSERT_OK(s); | |
1421 | delete txn2; | |
1422 | ||
1423 | // Verify that txn1 can commit since A isn't conflict checked | |
1424 | s = txn1->Commit(); | |
1425 | ASSERT_OK(s); | |
1426 | delete txn1; | |
1427 | } | |
1428 | ||
1429 | namespace { | |
1430 | Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db, | |
1431 | const size_t num_transactions, | |
1432 | const size_t num_sets, | |
1433 | const size_t num_keys_per_set) { | |
1434 | size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id()); | |
1435 | Random64 _rand(seed); | |
1436 | WriteOptions write_options; | |
1437 | ReadOptions read_options; | |
1438 | OptimisticTransactionOptions txn_options; | |
1439 | txn_options.set_snapshot = true; | |
1440 | ||
1441 | RandomTransactionInserter inserter(&_rand, write_options, read_options, | |
1442 | num_keys_per_set, | |
1443 | static_cast<uint16_t>(num_sets)); | |
1444 | ||
1445 | for (size_t t = 0; t < num_transactions; t++) { | |
1446 | bool success = inserter.OptimisticTransactionDBInsert(db, txn_options); | |
1447 | if (!success) { | |
1448 | // unexpected failure | |
1449 | return inserter.GetLastStatus(); | |
1450 | } | |
1451 | } | |
1452 | ||
1453 | // Make sure at least some of the transactions succeeded. It's ok if | |
1454 | // some failed due to write-conflicts. | |
1455 | if (inserter.GetFailureCount() > num_transactions / 2) { | |
1456 | return Status::TryAgain("Too many transactions failed! " + | |
1457 | std::to_string(inserter.GetFailureCount()) + " / " + | |
1458 | std::to_string(num_transactions)); | |
1459 | } | |
1460 | ||
1461 | return Status::OK(); | |
1462 | } | |
1463 | } // namespace | |
1464 | ||
f67539c2 | 1465 | TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) { |
7c673cae FG |
1466 | const size_t num_threads = 4; |
1467 | const size_t num_transactions_per_thread = 10000; | |
1468 | const size_t num_sets = 3; | |
1469 | const size_t num_keys_per_set = 100; | |
1470 | // Setting the key-space to be 100 keys should cause enough write-conflicts | |
1471 | // to make this test interesting. | |
1472 | ||
1473 | std::vector<port::Thread> threads; | |
1474 | ||
1475 | std::function<void()> call_inserter = [&] { | |
1476 | ASSERT_OK(OptimisticTransactionStressTestInserter( | |
1477 | txn_db, num_transactions_per_thread, num_sets, num_keys_per_set)); | |
1478 | }; | |
1479 | ||
1480 | // Create N threads that use RandomTransactionInserter to write | |
1481 | // many transactions. | |
1482 | for (uint32_t i = 0; i < num_threads; i++) { | |
1483 | threads.emplace_back(call_inserter); | |
1484 | } | |
1485 | ||
1486 | // Wait for all threads to run | |
1487 | for (auto& t : threads) { | |
1488 | t.join(); | |
1489 | } | |
1490 | ||
1491 | // Verify that data is consistent | |
11fdf7f2 | 1492 | Status s = RandomTransactionInserter::Verify(txn_db, num_sets); |
7c673cae FG |
1493 | ASSERT_OK(s); |
1494 | } | |
1495 | ||
f67539c2 | 1496 | TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) { |
7c673cae FG |
1497 | WriteOptions write_options; |
1498 | OptimisticTransactionOptions transaction_options; | |
1499 | ||
1500 | Transaction* transaction(txn_db->BeginTransaction(write_options, transaction_options)); | |
1501 | Status s = transaction->Put("foo", "val"); | |
1502 | ASSERT_OK(s); | |
1503 | s = transaction->Put("foo2", "val"); | |
1504 | ASSERT_OK(s); | |
1505 | s = transaction->Put("foo3", "val"); | |
1506 | ASSERT_OK(s); | |
1507 | s = transaction->Commit(); | |
1508 | ASSERT_OK(s); | |
1509 | delete transaction; | |
1510 | ||
1511 | Reopen(); | |
1512 | transaction = txn_db->BeginTransaction(write_options, transaction_options); | |
1513 | s = transaction->Put("bar", "val"); | |
1514 | ASSERT_OK(s); | |
1515 | s = transaction->Put("bar2", "val"); | |
1516 | ASSERT_OK(s); | |
1517 | s = transaction->Commit(); | |
1518 | ASSERT_OK(s); | |
1519 | ||
1520 | delete transaction; | |
1521 | } | |
1522 | ||
f67539c2 TL |
1523 | INSTANTIATE_TEST_CASE_P( |
1524 | InstanceOccGroup, OptimisticTransactionTest, | |
1525 | testing::Values(OccValidationPolicy::kValidateSerial, | |
1526 | OccValidationPolicy::kValidateParallel)); | |
1527 | ||
1528 | } // namespace ROCKSDB_NAMESPACE | |
7c673cae FG |
1529 | |
1530 | int main(int argc, char** argv) { | |
1531 | ::testing::InitGoogleTest(&argc, argv); | |
1532 | return RUN_ALL_TESTS(); | |
1533 | } | |
1534 | ||
1535 | #else | |
1536 | #include <stdio.h> | |
1537 | ||
// ROCKSDB_LITE entry point: optimistic transactions are not available in
// this configuration, so report the skip on stderr and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as optimistic_transaction is not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
1544 | ||
1545 | #endif // !ROCKSDB_LITE |