]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/transaction_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
11fdf7f2
TL
8#include "utilities/transactions/transaction_test.h"
9
7c673cae
FG
10#include <algorithm>
11#include <functional>
12#include <string>
13#include <thread>
14
f67539c2 15#include "db/db_impl/db_impl.h"
20effc67 16#include "port/port.h"
7c673cae
FG
17#include "rocksdb/db.h"
18#include "rocksdb/options.h"
11fdf7f2 19#include "rocksdb/perf_context.h"
7c673cae
FG
20#include "rocksdb/utilities/transaction.h"
21#include "rocksdb/utilities/transaction_db.h"
22#include "table/mock_table.h"
f67539c2
TL
23#include "test_util/sync_point.h"
24#include "test_util/testharness.h"
25#include "test_util/testutil.h"
26#include "test_util/transaction_test_util.h"
7c673cae
FG
27#include "util/random.h"
28#include "util/string_util.h"
20effc67 29#include "utilities/fault_injection_env.h"
7c673cae
FG
30#include "utilities/merge_operators.h"
31#include "utilities/merge_operators/string_append/stringappend.h"
11fdf7f2 32#include "utilities/transactions/pessimistic_transaction_db.h"
7c673cae 33
7c673cae
FG
34using std::string;
35
f67539c2 36namespace ROCKSDB_NAMESPACE {
7c673cae 37
11fdf7f2
TL
// Parameter sets for the transaction test fixtures.
// NOTE(review): the tuple layout is defined by the fixtures in
// utilities/transactions/transaction_test.h, which is not visible here. Judging
// by the values it is presumably (use_stackable_db, two_write_queue,
// write_policy, write_ordering) — confirm against the header.
// kUnorderedWrite is only ever combined with WRITE_PREPARED and a `true`
// second field in the lists below.
INSTANTIATE_TEST_CASE_P(
    DBAsBaseDB, TransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
// The stress fixture runs over the same parameter list as TransactionTest.
INSTANTIATE_TEST_CASE_P(
    DBAsBaseDB, TransactionStressTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite)));
// NOTE(review): setting the first tuple field to true presumably selects a
// StackableDB wrapper as the base DB (inferred from the instantiation name).
INSTANTIATE_TEST_CASE_P(
    StackableDBAsBaseDB, TransactionTest,
    ::testing::Values(
        std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite),
        std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite)));

// MySQLStyleTransactionTest takes far too long for valgrind to run.
#ifndef ROCKSDB_VALGRIND_RUN
// NOTE(review): the fifth tuple field is specific to MySQLStyleTransactionTest
// and its meaning is not visible here. WRITE_COMMITTED runs only with it false.
// Every WRITE_PREPARED / WRITE_UNPREPARED combination runs with it both false
// and true.
INSTANTIATE_TEST_CASE_P(
    MySQLStyleTransactionTest, MySQLStyleTransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false),
        std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false),
        std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, false),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
#endif  // ROCKSDB_VALGRIND_RUN
7c673cae
FG
83
84TEST_P(TransactionTest, DoubleEmptyWrite) {
85 WriteOptions write_options;
86 write_options.sync = true;
87 write_options.disableWAL = false;
88
89 WriteBatch batch;
90
91 ASSERT_OK(db->Write(write_options, &batch));
92 ASSERT_OK(db->Write(write_options, &batch));
11fdf7f2
TL
93
94 // Also test committing empty transactions in 2PC
95 TransactionOptions txn_options;
96 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
97 ASSERT_OK(txn0->SetName("xid"));
98 ASSERT_OK(txn0->Prepare());
99 ASSERT_OK(txn0->Commit());
100 delete txn0;
101
102 // Also test that it works during recovery
103 txn0 = db->BeginTransaction(write_options, txn_options);
104 ASSERT_OK(txn0->SetName("xid2"));
105 txn0->Put(Slice("foo0"), Slice("bar0a"));
106 ASSERT_OK(txn0->Prepare());
107 delete txn0;
108 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
109 ASSERT_OK(ReOpenNoDelete());
110 assert(db != nullptr);
111 txn0 = db->GetTransactionByName("xid2");
112 ASSERT_OK(txn0->Commit());
113 delete txn0;
7c673cae
FG
114}
115
116TEST_P(TransactionTest, SuccessTest) {
117 ASSERT_OK(db->ResetStats());
118
119 WriteOptions write_options;
120 ReadOptions read_options;
11fdf7f2 121 std::string value;
7c673cae 122
11fdf7f2
TL
123 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
124 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
7c673cae
FG
125
126 Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
127 ASSERT_TRUE(txn);
128
129 ASSERT_EQ(0, txn->GetNumPuts());
130 ASSERT_LE(0, txn->GetID());
131
11fdf7f2 132 ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
7c673cae
FG
133 ASSERT_EQ(value, "bar");
134
11fdf7f2 135 ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
7c673cae
FG
136
137 ASSERT_EQ(1, txn->GetNumPuts());
138
11fdf7f2 139 ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
7c673cae
FG
140 ASSERT_EQ(value, "bar2");
141
11fdf7f2 142 ASSERT_OK(txn->Commit());
7c673cae 143
11fdf7f2 144 ASSERT_OK(db->Get(read_options, "foo", &value));
7c673cae
FG
145 ASSERT_EQ(value, "bar2");
146
147 delete txn;
148}
149
494da23a
TL
150// The test clarifies the contract of do_validate and assume_tracked
151// in GetForUpdate and Put/Merge/Delete
152TEST_P(TransactionTest, AssumeExclusiveTracked) {
153 WriteOptions write_options;
154 ReadOptions read_options;
155 std::string value;
156 Status s;
157 TransactionOptions txn_options;
158 txn_options.lock_timeout = 1;
159 const bool EXCLUSIVE = true;
160 const bool DO_VALIDATE = true;
161 const bool ASSUME_LOCKED = true;
162
163 Transaction* txn = db->BeginTransaction(write_options, txn_options);
164 ASSERT_TRUE(txn);
165 txn->SetSnapshot();
166
167 // commit a value after the snapshot is taken
168 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
169
170 // By default write should fail to the commit after our snapshot
171 s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE);
172 ASSERT_TRUE(s.IsBusy());
173 // But the user could direct the db to skip validating the snapshot. The read
174 // value then should be the most recently committed
175 ASSERT_OK(
176 txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE));
177 ASSERT_EQ(value, "bar");
178
179 // Although ValidateSnapshot is skipped the key must have still got locked
180 s = db->Put(write_options, Slice("foo"), Slice("bar"));
181 ASSERT_TRUE(s.IsTimedOut());
182
183 // By default the write operations should fail due to the commit after the
184 // snapshot
185 s = txn->Put(Slice("foo"), Slice("bar1"));
186 ASSERT_TRUE(s.IsBusy());
187 s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
188 !ASSUME_LOCKED);
189 ASSERT_TRUE(s.IsBusy());
190 // But the user could direct the db that it already assumes exclusive lock on
191 // the key due to the previous GetForUpdate call.
192 ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
193 ASSUME_LOCKED));
194 ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"),
195 ASSUME_LOCKED));
196 ASSERT_OK(
197 txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED));
198 ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"),
199 ASSUME_LOCKED));
200
20effc67 201 ASSERT_OK(txn->Rollback());
494da23a
TL
202 delete txn;
203}
204
11fdf7f2
TL
205// This test clarifies the contract of ValidateSnapshot
206TEST_P(TransactionTest, ValidateSnapshotTest) {
494da23a
TL
207 for (bool with_flush : {true}) {
208 for (bool with_2pc : {true}) {
209 ASSERT_OK(ReOpen());
210 WriteOptions write_options;
211 ReadOptions read_options;
212 std::string value;
11fdf7f2 213
494da23a
TL
214 assert(db != nullptr);
215 Transaction* txn1 =
216 db->BeginTransaction(write_options, TransactionOptions());
217 ASSERT_TRUE(txn1);
218 ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
219 if (with_2pc) {
220 ASSERT_OK(txn1->SetName("xid1"));
221 ASSERT_OK(txn1->Prepare());
222 }
223
224 if (with_flush) {
20effc67
TL
225 auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
226 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
494da23a
TL
227 // Make sure the flushed memtable is not kept in memory
228 int max_memtable_in_history =
f67539c2
TL
229 std::max(
230 options.max_write_buffer_number,
231 static_cast<int>(options.max_write_buffer_size_to_maintain) /
232 static_cast<int>(options.write_buffer_size)) +
494da23a
TL
233 1;
234 for (int i = 0; i < max_memtable_in_history; i++) {
20effc67
TL
235 ASSERT_OK(db->Put(write_options, Slice("key"), Slice("value")));
236 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
494da23a
TL
237 }
238 }
11fdf7f2 239
494da23a
TL
240 Transaction* txn2 =
241 db->BeginTransaction(write_options, TransactionOptions());
242 ASSERT_TRUE(txn2);
243 txn2->SetSnapshot();
11fdf7f2 244
494da23a
TL
245 ASSERT_OK(txn1->Commit());
246 delete txn1;
11fdf7f2 247
494da23a
TL
248 auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
249 // Test the simple case where the key is not tracked yet
250 auto trakced_seq = kMaxSequenceNumber;
251 auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
252 &trakced_seq);
253 ASSERT_TRUE(s.IsBusy());
254 delete txn2;
255 }
11fdf7f2
TL
256 }
257}
258
7c673cae
FG
259TEST_P(TransactionTest, WaitingTxn) {
260 WriteOptions write_options;
261 ReadOptions read_options;
262 TransactionOptions txn_options;
263 string value;
264 Status s;
265
266 txn_options.lock_timeout = 1;
267 s = db->Put(write_options, Slice("foo"), Slice("bar"));
268 ASSERT_OK(s);
269
270 /* create second cf */
271 ColumnFamilyHandle* cfa;
272 ColumnFamilyOptions cf_options;
273 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
274 ASSERT_OK(s);
275 s = db->Put(write_options, cfa, Slice("foo"), Slice("bar"));
276 ASSERT_OK(s);
277
278 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
279 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
280 TransactionID id1 = txn1->GetID();
281 ASSERT_TRUE(txn1);
282 ASSERT_TRUE(txn2);
283
f67539c2 284 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
20effc67 285 "PointLockManager::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
7c673cae
FG
286 std::string key;
287 uint32_t cf_id;
288 std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
289 ASSERT_EQ(key, "foo");
290 ASSERT_EQ(wait.size(), 1);
291 ASSERT_EQ(wait[0], id1);
f67539c2 292 ASSERT_EQ(cf_id, 0U);
7c673cae
FG
293 });
294
11fdf7f2 295 get_perf_context()->Reset();
7c673cae
FG
296 // lock key in default cf
297 s = txn1->GetForUpdate(read_options, "foo", &value);
298 ASSERT_OK(s);
299 ASSERT_EQ(value, "bar");
11fdf7f2 300 ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
7c673cae
FG
301
302 // lock key in cfa
303 s = txn1->GetForUpdate(read_options, cfa, "foo", &value);
304 ASSERT_OK(s);
305 ASSERT_EQ(value, "bar");
11fdf7f2 306 ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
7c673cae
FG
307
308 auto lock_data = db->GetLockStatusData();
309 // Locked keys exist in both column family.
310 ASSERT_EQ(lock_data.size(), 2);
311
312 auto cf_iterator = lock_data.begin();
313
314 // The iterator points to an unordered_multimap
315 // thus the test can not assume any particular order.
316
317 // Column family is 1 or 0 (cfa).
318 if (cf_iterator->first != 1 && cf_iterator->first != 0) {
11fdf7f2 319 FAIL();
7c673cae
FG
320 }
321 // The locked key is "foo" and is locked by txn1
322 ASSERT_EQ(cf_iterator->second.key, "foo");
323 ASSERT_EQ(cf_iterator->second.ids.size(), 1);
324 ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
325
326 cf_iterator++;
327
328 // Column family is 0 (default) or 1.
329 if (cf_iterator->first != 1 && cf_iterator->first != 0) {
11fdf7f2 330 FAIL();
7c673cae
FG
331 }
332 // The locked key is "foo" and is locked by txn1
333 ASSERT_EQ(cf_iterator->second.key, "foo");
334 ASSERT_EQ(cf_iterator->second.ids.size(), 1);
335 ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
336
f67539c2 337 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
338
339 s = txn2->GetForUpdate(read_options, "foo", &value);
340 ASSERT_TRUE(s.IsTimedOut());
341 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
11fdf7f2
TL
342 ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1);
343 ASSERT_GE(get_perf_context()->key_lock_wait_time, 0);
7c673cae 344
f67539c2
TL
345 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
346 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
7c673cae
FG
347
348 delete cfa;
349 delete txn1;
350 delete txn2;
351}
352
353TEST_P(TransactionTest, SharedLocks) {
354 WriteOptions write_options;
355 ReadOptions read_options;
356 TransactionOptions txn_options;
357 Status s;
358
359 txn_options.lock_timeout = 1;
360 s = db->Put(write_options, Slice("foo"), Slice("bar"));
361 ASSERT_OK(s);
362
363 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
364 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
365 Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
366 ASSERT_TRUE(txn1);
367 ASSERT_TRUE(txn2);
368 ASSERT_TRUE(txn3);
369
370 // Test shared access between txns
371 s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
372 ASSERT_OK(s);
373
374 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
375 ASSERT_OK(s);
376
377 s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
378 ASSERT_OK(s);
379
380 auto lock_data = db->GetLockStatusData();
381 ASSERT_EQ(lock_data.size(), 1);
382
383 auto cf_iterator = lock_data.begin();
384 ASSERT_EQ(cf_iterator->second.key, "foo");
385
386 // We compare whether the set of txns locking this key is the same. To do
387 // this, we need to sort both vectors so that the comparison is done
388 // correctly.
389 std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
390 txn3->GetID()};
391 std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
392 ASSERT_EQ(expected_txns, lock_txns);
393 ASSERT_FALSE(cf_iterator->second.exclusive);
394
20effc67
TL
395 ASSERT_OK(txn1->Rollback());
396 ASSERT_OK(txn2->Rollback());
397 ASSERT_OK(txn3->Rollback());
7c673cae
FG
398
399 // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
400 s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
401 ASSERT_OK(s);
402
403 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
404 ASSERT_OK(s);
405
406 s = txn3->GetForUpdate(read_options, "foo", nullptr);
407 ASSERT_TRUE(s.IsTimedOut());
408 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
409
410 txn1->UndoGetForUpdate("foo");
411 s = txn3->GetForUpdate(read_options, "foo", nullptr);
412 ASSERT_TRUE(s.IsTimedOut());
413 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
414
415 txn2->UndoGetForUpdate("foo");
416 s = txn3->GetForUpdate(read_options, "foo", nullptr);
417 ASSERT_OK(s);
418
20effc67
TL
419 ASSERT_OK(txn1->Rollback());
420 ASSERT_OK(txn2->Rollback());
421 ASSERT_OK(txn3->Rollback());
7c673cae
FG
422
423 // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
424 s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
425 ASSERT_OK(s);
426
427 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
428 ASSERT_OK(s);
429
430 s = txn2->GetForUpdate(read_options, "foo", nullptr);
431 ASSERT_TRUE(s.IsTimedOut());
432 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
433
434 txn1->UndoGetForUpdate("foo");
435 s = txn2->GetForUpdate(read_options, "foo", nullptr);
436 ASSERT_OK(s);
437
11fdf7f2
TL
438 ASSERT_OK(txn1->Rollback());
439 ASSERT_OK(txn2->Rollback());
7c673cae
FG
440
441 // Test txn1 trying to downgrade its lock.
442 s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
443 ASSERT_OK(s);
444
445 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
446 ASSERT_TRUE(s.IsTimedOut());
447 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
448
449 // Should still fail after "downgrading".
450 s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
451 ASSERT_OK(s);
452
453 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
454 ASSERT_TRUE(s.IsTimedOut());
455 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
456
20effc67
TL
457 ASSERT_OK(txn1->Rollback());
458 ASSERT_OK(txn2->Rollback());
7c673cae
FG
459
460 // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
461 // access.
462 s = txn1->GetForUpdate(read_options, "foo", nullptr);
463 ASSERT_OK(s);
464
465 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
466 ASSERT_TRUE(s.IsTimedOut());
467 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
468
469 txn1->UndoGetForUpdate("foo");
470 s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
471 ASSERT_OK(s);
472
473 delete txn1;
474 delete txn2;
475 delete txn3;
476}
477
// Test flow:
//  1. Build a binary wait-for tree of 31 transactions that first take shared
//     locks.
//  2. Close 16 cycles back to the root and verify, after each one, the
//     deadlock info buffer (size, newest-first timestamps, recorded path).
//  3. Verify resizing of the deadlock info buffer.
//  4. Verify that a shared lock request closing a 2-cycle is recorded as
//     non-exclusive.
TEST_P(TransactionTest, DeadlockCycleShared) {
  WriteOptions write_options;
  ReadOptions read_options;
  TransactionOptions txn_options;

  txn_options.lock_timeout = 1000000;
  txn_options.deadlock_detect = true;

  // Set up a wait for chain like this:
  //
  // Tn -> T(n*2)
  // Tn -> T(n*2 + 1)
  //
  // So we have:
  // T1 -> T2 -> T4 ...
  //    |     |> T5 ...
  //    |> T3 -> T6 ...
  //          |> T7 ...
  // up to T31, then T[16 - 31] -> T1.
  // Note that Tn holds lock on floor(n / 2).

  std::vector<Transaction*> txns(31);

  // txns[i] is T(i+1); it takes a shared lock on key (i + 1) / 2.
  for (uint32_t i = 0; i < 31; i++) {
    txns[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns[i]);
    auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
                                   false /* exclusive */);
    ASSERT_OK(s);
  }

  // Count how many lock requests reach the waiting sync point.
  std::atomic<uint32_t> checkpoints(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // We want the leaf transactions to block and hold everyone back.
  // Each of T1..T15 requests an exclusive lock on key i + 1 from its own
  // thread. Once that lock is granted, the thread rolls back and deletes the
  // transaction.
  std::vector<port::Thread> threads;
  for (uint32_t i = 0; i < 15; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
                                     true /* exclusive */);
      ASSERT_OK(s);
      ASSERT_OK(txns[i]->Rollback());
      delete txns[i];
    };
    threads.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints.load() != 15) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T[16 - 31] -> T1
  for (uint32_t i = 15; i < 31; i++) {
    auto s =
        txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
    ASSERT_TRUE(s.IsDeadlock());

    // Calculate next buffer len. It grows by one per detected deadlock and
    // plateaus at kInitialMaxDeadlocks.
    const uint32_t curr_dlock_buffer_len_ =
        (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);

    auto dlock_buffer = db->GetDeadlockInfoBuffer();
    ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
    auto dlock_entry = dlock_buffer[0].path;
    // NOTE(review): the expected path length is compared against
    // kInitialMaxDeadlocks. For this tree the cycle passes through 5 levels
    // (T16..31 -> T1 -> T2/3 -> T4..7 -> T8..15), so this presumably relies on
    // the constant also being 5 — confirm.
    ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
    // Deadlock times must be non-zero and non-increasing from index 0
    // onwards, i.e. the most recent deadlock comes first.
    int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
    int64_t cur_deadlock_time = 0;
    for (auto const& dl_path_rec : dlock_buffer) {
      cur_deadlock_time = dl_path_rec.deadlock_time;
      ASSERT_NE(cur_deadlock_time, 0);
      ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
      pre_deadlock_time = cur_deadlock_time;
    }

    int64_t curr_waiting_key = 0;

    // Offset of each txn id from the root of the shared dlock tree's txn id.
    int64_t offset_root = dlock_entry[0].m_txn_id - 1;
    // Offset of the final entry in the dlock path from the root's txn id.
    TransactionID leaf_id =
        dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;

    // Walk the path from leaf to root. Each step halves the tree position,
    // matching the "Tn holds a lock on floor(n / 2)" layout. The leaf waits on
    // key "0".
    for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
      auto dl_node = *it;
      ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
      ASSERT_EQ(dl_node.m_cf_id, 0U);
      ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
      ASSERT_EQ(dl_node.m_exclusive, true);

      if (curr_waiting_key == 0) {
        curr_waiting_key = leaf_id;
      }
      curr_waiting_key /= 2;
      leaf_id /= 2;
    }
  }

  // Rollback the leaf transaction.
  for (uint32_t i = 15; i < 31; i++) {
    ASSERT_OK(txns[i]->Rollback());
    delete txns[i];
  }

  for (auto& t : threads) {
    t.join();
  }

  // Downsize the buffer and verify the 3 latest deadlocks are preserved.
  auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Upsize the buffer and verify the 3 latest deadlocks are preserved.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(5);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 3);

  for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
    for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
      ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
                dlock_buffer_before_resize[i].path[j].m_txn_id);
    }
  }

  // Downsize to 0 and verify the size is consistent.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(0);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Upsize from 0 and verify the size stays 0: entries dropped by the shrink
  // are not restored.
  dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
  db->SetDeadlockInfoBufferSize(3);
  dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
  ASSERT_EQ(dlock_buffer_after_resize.size(), 0);

  // Contrived case of shared lock of cycle size 2 to verify that a shared
  // lock causing a deadlock is correctly reported as "shared" in the buffer.
  std::vector<Transaction*> txns_shared(2);

  // Create a cycle of size 2.
  for (uint32_t i = 0; i < 2; i++) {
    txns_shared[i] = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txns_shared[i]);
    auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
    ASSERT_OK(s);
  }

  std::atomic<uint32_t> checkpoints_shared(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PointLockManager::AcquireWithTimeout:WaitingTxn",
      [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Only txns_shared[0] (T1) blocks in a thread, waiting on key "1" held by
  // T2.
  std::vector<port::Thread> threads_shared;
  for (uint32_t i = 0; i < 1; i++) {
    std::function<void()> blocking_thread = [&, i] {
      auto s =
          txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
      ASSERT_OK(s);
      ASSERT_OK(txns_shared[i]->Rollback());
      delete txns_shared[i];
    };
    threads_shared.emplace_back(blocking_thread);
  }

  // Wait until all threads are waiting on each other.
  while (checkpoints_shared.load() != 1) {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  // Complete the cycle T2 -> T1 with a shared lock.
  auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
  ASSERT_TRUE(s.IsDeadlock());

  auto dlock_buffer = db->GetDeadlockInfoBuffer();

  // Verify the size of the buffer and the single path.
  ASSERT_EQ(dlock_buffer.size(), 1);
  ASSERT_EQ(dlock_buffer[0].path.size(), 2);

  // Verify the exclusivity field of the transactions in the deadlock path.
  ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
  ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
  ASSERT_OK(txns_shared[1]->Rollback());
  delete txns_shared[1];

  for (auto& t : threads_shared) {
    t.join();
  }
}
688
494da23a 689#ifndef ROCKSDB_VALGRIND_RUN
11fdf7f2 690TEST_P(TransactionStressTest, DeadlockCycle) {
7c673cae
FG
691 WriteOptions write_options;
692 ReadOptions read_options;
693 TransactionOptions txn_options;
694
11fdf7f2
TL
695 // offset by 2 from the max depth to test edge case
696 const uint32_t kMaxCycleLength = 52;
7c673cae
FG
697
698 txn_options.lock_timeout = 1000000;
699 txn_options.deadlock_detect = true;
700
701 for (uint32_t len = 2; len < kMaxCycleLength; len++) {
702 // Set up a long wait for chain like this:
703 //
704 // T1 -> T2 -> T3 -> ... -> Tlen
11fdf7f2 705
7c673cae
FG
706 std::vector<Transaction*> txns(len);
707
708 for (uint32_t i = 0; i < len; i++) {
709 txns[i] = db->BeginTransaction(write_options, txn_options);
710 ASSERT_TRUE(txns[i]);
711 auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
712 ASSERT_OK(s);
713 }
714
715 std::atomic<uint32_t> checkpoints(0);
f67539c2 716 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
20effc67 717 "PointLockManager::AcquireWithTimeout:WaitingTxn",
11fdf7f2 718 [&](void* /*arg*/) { checkpoints.fetch_add(1); });
f67539c2 719 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
720
721 // We want the last transaction in the chain to block and hold everyone
722 // back.
723 std::vector<port::Thread> threads;
20effc67 724 for (uint32_t i = 0; i + 1 < len; i++) {
7c673cae 725 std::function<void()> blocking_thread = [&, i] {
11fdf7f2 726 auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
7c673cae 727 ASSERT_OK(s);
20effc67 728 ASSERT_OK(txns[i]->Rollback());
7c673cae
FG
729 delete txns[i];
730 };
731 threads.emplace_back(blocking_thread);
732 }
733
734 // Wait until all threads are waiting on each other.
735 while (checkpoints.load() != len - 1) {
736 /* sleep override */
737 std::this_thread::sleep_for(std::chrono::milliseconds(100));
738 }
f67539c2
TL
739 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
740 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
7c673cae
FG
741
742 // Complete the cycle Tlen -> T1
743 auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
744 ASSERT_TRUE(s.IsDeadlock());
745
11fdf7f2
TL
746 const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
747 uint32_t curr_waiting_key = 0;
748 TransactionID curr_txn_id = txns[0]->GetID();
749
750 auto dlock_buffer = db->GetDeadlockInfoBuffer();
751 ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
752 uint32_t check_len = len;
753 bool check_limit_flag = false;
754
755 // Special case for a deadlock path that exceeds the maximum depth.
756 if (len > 50) {
757 check_len = 0;
758 check_limit_flag = true;
759 }
760 auto dlock_entry = dlock_buffer[0].path;
761 ASSERT_EQ(dlock_entry.size(), check_len);
762 ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);
763
764 int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
765 int64_t cur_deadlock_time = 0;
766 for (auto const& dl_path_rec : dlock_buffer) {
767 cur_deadlock_time = dl_path_rec.deadlock_time;
768 ASSERT_NE(cur_deadlock_time, 0);
769 ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
770 pre_deadlock_time = cur_deadlock_time;
771 }
772
773 // Iterates backwards over path verifying decreasing txn_ids.
f67539c2 774 for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); ++it) {
11fdf7f2
TL
775 auto dl_node = *it;
776 ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
f67539c2 777 ASSERT_EQ(dl_node.m_cf_id, 0u);
11fdf7f2
TL
778 ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
779 ASSERT_EQ(dl_node.m_exclusive, true);
780
781 curr_txn_id--;
782 if (curr_waiting_key == 0) {
783 curr_waiting_key = len;
784 }
785 curr_waiting_key--;
786 }
787
7c673cae 788 // Rollback the last transaction.
20effc67 789 ASSERT_OK(txns[len - 1]->Rollback());
7c673cae
FG
790 delete txns[len - 1];
791
792 for (auto& t : threads) {
793 t.join();
794 }
795 }
796}
797
11fdf7f2 798TEST_P(TransactionStressTest, DeadlockStress) {
7c673cae
FG
799 const uint32_t NUM_TXN_THREADS = 10;
800 const uint32_t NUM_KEYS = 100;
11fdf7f2 801 const uint32_t NUM_ITERS = 10000;
7c673cae
FG
802
803 WriteOptions write_options;
804 ReadOptions read_options;
805 TransactionOptions txn_options;
806
807 txn_options.lock_timeout = 1000000;
808 txn_options.deadlock_detect = true;
809 std::vector<std::string> keys;
810
811 for (uint32_t i = 0; i < NUM_KEYS; i++) {
20effc67 812 ASSERT_OK(db->Put(write_options, Slice(ToString(i)), Slice("")));
7c673cae
FG
813 keys.push_back(ToString(i));
814 }
815
816 size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
817 Random rnd(static_cast<uint32_t>(tid));
818 std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) {
819 std::default_random_engine g(seed);
820
821 Transaction* txn;
822 for (uint32_t i = 0; i < NUM_ITERS; i++) {
823 txn = db->BeginTransaction(write_options, txn_options);
824 auto random_keys = keys;
825 std::shuffle(random_keys.begin(), random_keys.end(), g);
826
827 // Lock keys in random order.
828 for (const auto& k : random_keys) {
829 // Lock mostly for shared access, but exclusive 1/4 of the time.
830 auto s =
831 txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
832 if (!s.ok()) {
833 ASSERT_TRUE(s.IsDeadlock());
20effc67 834 ASSERT_OK(txn->Rollback());
7c673cae
FG
835 break;
836 }
837 }
838
839 delete txn;
840 }
841 };
842
843 std::vector<port::Thread> threads;
844 for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
845 threads.emplace_back(stress_thread, rnd.Next());
846 }
847
848 for (auto& t : threads) {
849 t.join();
850 }
851}
494da23a 852#endif // ROCKSDB_VALGRIND_RUN
7c673cae
FG
853
854TEST_P(TransactionTest, CommitTimeBatchFailTest) {
855 WriteOptions write_options;
856 TransactionOptions txn_options;
857
11fdf7f2 858 std::string value;
7c673cae
FG
859 Status s;
860
861 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
862 ASSERT_TRUE(txn1);
863
11fdf7f2 864 ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"));
7c673cae
FG
865
866 s = txn1->Put("foo", "bar");
867 ASSERT_OK(s);
868
869 // fails due to non-empty commit-time batch
870 s = txn1->Commit();
871 ASSERT_EQ(s, Status::InvalidArgument());
872
873 delete txn1;
874}
875
11fdf7f2
TL
876TEST_P(TransactionTest, LogMarkLeakTest) {
877 TransactionOptions txn_options;
7c673cae 878 WriteOptions write_options;
11fdf7f2
TL
879 options.write_buffer_size = 1024;
880 ASSERT_OK(ReOpenNoDelete());
881 assert(db != nullptr);
882 Random rnd(47);
883 std::vector<Transaction*> txns;
20effc67 884 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
885 // At the beginning there should be no log containing prepare data
886 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
887 for (size_t i = 0; i < 100; i++) {
888 Transaction* txn = db->BeginTransaction(write_options, txn_options);
889 ASSERT_OK(txn->SetName("xid" + ToString(i)));
890 ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar")));
891 ASSERT_OK(txn->Prepare());
892 ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
893 if (rnd.OneIn(5)) {
894 txns.push_back(txn);
895 } else {
896 ASSERT_OK(txn->Commit());
897 delete txn;
898 }
20effc67 899 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
11fdf7f2
TL
900 }
901 for (auto txn : txns) {
902 ASSERT_OK(txn->Commit());
903 delete txn;
904 }
905 // At the end there should be no log left containing prepare data
906 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
907 // Make sure that the underlying data structures are properly truncated and
908 // cause not leak
909 ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0);
910 ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0);
911}
7c673cae 912
11fdf7f2
TL
913TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
914 for (bool cwb4recovery : {true, false}) {
915 ASSERT_OK(ReOpen());
916 WriteOptions write_options;
917 ReadOptions read_options;
7c673cae 918
11fdf7f2
TL
919 TransactionOptions txn_options;
920 txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;
7c673cae 921
11fdf7f2
TL
922 string value;
923 Status s;
7c673cae 924
20effc67 925 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae 926
11fdf7f2
TL
927 Transaction* txn = db->BeginTransaction(write_options, txn_options);
928 s = txn->SetName("xid");
929 ASSERT_OK(s);
7c673cae 930
11fdf7f2 931 ASSERT_EQ(db->GetTransactionByName("xid"), txn);
7c673cae 932
11fdf7f2
TL
933 // transaction put
934 s = txn->Put(Slice("foo"), Slice("bar"));
935 ASSERT_OK(s);
936 ASSERT_EQ(1, txn->GetNumPuts());
7c673cae 937
11fdf7f2
TL
938 // regular db put
939 s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
940 ASSERT_OK(s);
941 ASSERT_EQ(1, txn->GetNumPuts());
7c673cae 942
11fdf7f2 943 // regular db read
20effc67 944 ASSERT_OK(db->Get(read_options, "foo2", &value));
11fdf7f2 945 ASSERT_EQ(value, "bar2");
7c673cae 946
11fdf7f2 947 // commit time put
20effc67
TL
948 ASSERT_OK(
949 txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")));
950 ASSERT_OK(
951 txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")));
7c673cae 952
11fdf7f2
TL
953 // nothing has been prepped yet
954 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
7c673cae 955
11fdf7f2
TL
956 s = txn->Prepare();
957 ASSERT_OK(s);
7c673cae 958
11fdf7f2
TL
959 // data not im mem yet
960 s = db->Get(read_options, Slice("foo"), &value);
961 ASSERT_TRUE(s.IsNotFound());
962 s = db->Get(read_options, Slice("gtid"), &value);
963 ASSERT_TRUE(s.IsNotFound());
7c673cae 964
11fdf7f2
TL
965 // find trans in list of prepared transactions
966 std::vector<Transaction*> prepared_trans;
967 db->GetAllPreparedTransactions(&prepared_trans);
968 ASSERT_EQ(prepared_trans.size(), 1);
969 ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
7c673cae 970
11fdf7f2
TL
971 auto log_containing_prep =
972 db_impl->TEST_FindMinLogContainingOutstandingPrep();
973 ASSERT_GT(log_containing_prep, 0);
7c673cae 974
11fdf7f2
TL
975 // make commit
976 s = txn->Commit();
977 ASSERT_OK(s);
7c673cae 978
11fdf7f2
TL
979 // value is now available
980 s = db->Get(read_options, "foo", &value);
981 ASSERT_OK(s);
982 ASSERT_EQ(value, "bar");
7c673cae 983
11fdf7f2
TL
984 if (!cwb4recovery) {
985 s = db->Get(read_options, "gtid", &value);
986 ASSERT_OK(s);
987 ASSERT_EQ(value, "dogs");
7c673cae 988
11fdf7f2
TL
989 s = db->Get(read_options, "gtid2", &value);
990 ASSERT_OK(s);
991 ASSERT_EQ(value, "cats");
992 }
7c673cae 993
11fdf7f2
TL
994 // we already committed
995 s = txn->Commit();
996 ASSERT_EQ(s, Status::InvalidArgument());
997
998 // no longer is prepared results
999 db->GetAllPreparedTransactions(&prepared_trans);
1000 ASSERT_EQ(prepared_trans.size(), 0);
1001 ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1002
1003 // heap should not care about prepared section anymore
1004 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1005
1006 switch (txn_db_options.write_policy) {
1007 case WRITE_COMMITTED:
1008 // but now our memtable should be referencing the prep section
1009 ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1010 ASSERT_EQ(log_containing_prep,
1011 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1012 break;
1013 case WRITE_PREPARED:
1014 case WRITE_UNPREPARED:
1015 // In these modes memtable do not ref the prep sections
1016 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1017 break;
1018 default:
1019 assert(false);
1020 }
7c673cae 1021
20effc67 1022 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
11fdf7f2
TL
1023 // After flush the recoverable state must be visible
1024 if (cwb4recovery) {
1025 s = db->Get(read_options, "gtid", &value);
1026 ASSERT_OK(s);
1027 ASSERT_EQ(value, "dogs");
7c673cae 1028
11fdf7f2
TL
1029 s = db->Get(read_options, "gtid2", &value);
1030 ASSERT_OK(s);
1031 ASSERT_EQ(value, "cats");
1032 }
7c673cae 1033
11fdf7f2
TL
1034 // after memtable flush we can now relese the log
1035 ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
1036 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
7c673cae 1037
11fdf7f2 1038 delete txn;
7c673cae 1039
11fdf7f2
TL
1040 if (cwb4recovery) {
1041 // kill and reopen to trigger recovery
1042 s = ReOpenNoDelete();
1043 ASSERT_OK(s);
1044 assert(db != nullptr);
1045 s = db->Get(read_options, "gtid", &value);
1046 ASSERT_OK(s);
1047 ASSERT_EQ(value, "dogs");
1048
1049 s = db->Get(read_options, "gtid2", &value);
1050 ASSERT_OK(s);
1051 ASSERT_EQ(value, "cats");
1052 }
1053 }
7c673cae
FG
1054}
1055
1056TEST_P(TransactionTest, TwoPhaseNameTest) {
1057 Status s;
1058
1059 WriteOptions write_options;
1060 TransactionOptions txn_options;
1061 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1062 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1063 Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
1064 ASSERT_TRUE(txn3);
1065 delete txn3;
1066
1067 // cant prepare txn without name
1068 s = txn1->Prepare();
1069 ASSERT_EQ(s, Status::InvalidArgument());
1070
1071 // name too short
1072 s = txn1->SetName("");
1073 ASSERT_EQ(s, Status::InvalidArgument());
1074
1075 // name too long
1076 s = txn1->SetName(std::string(513, 'x'));
1077 ASSERT_EQ(s, Status::InvalidArgument());
1078
1079 // valid set name
1080 s = txn1->SetName("name1");
1081 ASSERT_OK(s);
1082
1083 // cant have duplicate name
1084 s = txn2->SetName("name1");
1085 ASSERT_EQ(s, Status::InvalidArgument());
1086
1087 // shouldn't be able to prepare
1088 s = txn2->Prepare();
1089 ASSERT_EQ(s, Status::InvalidArgument());
1090
1091 // valid name set
1092 s = txn2->SetName("name2");
1093 ASSERT_OK(s);
1094
1095 // cant reset name
1096 s = txn2->SetName("name3");
1097 ASSERT_EQ(s, Status::InvalidArgument());
1098
1099 ASSERT_EQ(txn1->GetName(), "name1");
1100 ASSERT_EQ(txn2->GetName(), "name2");
1101
1102 s = txn1->Prepare();
1103 ASSERT_OK(s);
1104
1105 // can't rename after prepare
1106 s = txn1->SetName("name4");
1107 ASSERT_EQ(s, Status::InvalidArgument());
1108
20effc67
TL
1109 ASSERT_OK(txn1->Rollback());
1110 ASSERT_OK(txn2->Rollback());
7c673cae
FG
1111 delete txn1;
1112 delete txn2;
1113}
1114
1115TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
11fdf7f2
TL
1116 for (bool cwb4recovery : {true, false}) {
1117 for (bool test_with_empty_wal : {true, false}) {
1118 if (!cwb4recovery && test_with_empty_wal) {
1119 continue;
1120 }
1121 ASSERT_OK(ReOpen());
1122 Status s;
1123 std::string value;
1124
1125 WriteOptions write_options;
1126 ReadOptions read_options;
1127 TransactionOptions txn_options;
1128 txn_options.use_only_the_last_commit_time_batch_for_recovery =
1129 cwb4recovery;
1130 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1131 ASSERT_TRUE(txn1);
1132 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1133 ASSERT_TRUE(txn2);
1134
1135 s = txn1->SetName("joe");
1136 ASSERT_OK(s);
7c673cae 1137
11fdf7f2
TL
1138 s = txn2->SetName("bob");
1139 ASSERT_OK(s);
7c673cae 1140
11fdf7f2
TL
1141 s = txn1->Prepare();
1142 ASSERT_OK(s);
7c673cae 1143
11fdf7f2
TL
1144 s = txn1->Commit();
1145 ASSERT_OK(s);
7c673cae 1146
11fdf7f2 1147 delete txn1;
7c673cae 1148
20effc67
TL
1149 ASSERT_OK(
1150 txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")));
7c673cae 1151
11fdf7f2
TL
1152 s = txn2->Prepare();
1153 ASSERT_OK(s);
7c673cae 1154
11fdf7f2
TL
1155 s = txn2->Commit();
1156 ASSERT_OK(s);
7c673cae 1157
11fdf7f2
TL
1158 delete txn2;
1159 if (!cwb4recovery) {
1160 s = db->Get(read_options, "foo", &value);
1161 ASSERT_OK(s);
1162 ASSERT_EQ(value, "bar");
1163 } else {
1164 if (test_with_empty_wal) {
20effc67
TL
1165 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1166 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
11fdf7f2
TL
1167 // After flush the state must be visible
1168 s = db->Get(read_options, "foo", &value);
1169 ASSERT_OK(s);
1170 ASSERT_EQ(value, "bar");
1171 }
20effc67 1172 ASSERT_OK(db->FlushWAL(true));
11fdf7f2
TL
1173 // kill and reopen to trigger recovery
1174 s = ReOpenNoDelete();
1175 ASSERT_OK(s);
1176 assert(db != nullptr);
1177 s = db->Get(read_options, "foo", &value);
1178 ASSERT_OK(s);
1179 ASSERT_EQ(value, "bar");
1180 }
1181 }
1182 }
7c673cae
FG
1183}
1184
494da23a 1185#ifndef ROCKSDB_VALGRIND_RUN
11fdf7f2 1186TEST_P(TransactionStressTest, TwoPhaseExpirationTest) {
7c673cae
FG
1187 Status s;
1188
1189 WriteOptions write_options;
1190 TransactionOptions txn_options;
1191 txn_options.expiration = 500; // 500ms
1192 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1193 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1194 ASSERT_TRUE(txn1);
1195 ASSERT_TRUE(txn1);
1196
1197 s = txn1->SetName("joe");
1198 ASSERT_OK(s);
1199 s = txn2->SetName("bob");
1200 ASSERT_OK(s);
1201
1202 s = txn1->Prepare();
1203 ASSERT_OK(s);
1204
1205 /* sleep override */
1206 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1207
1208 s = txn1->Commit();
1209 ASSERT_OK(s);
1210
1211 s = txn2->Prepare();
1212 ASSERT_EQ(s, Status::Expired());
1213
1214 delete txn1;
1215 delete txn2;
1216}
1217
1218TEST_P(TransactionTest, TwoPhaseRollbackTest) {
1219 WriteOptions write_options;
1220 ReadOptions read_options;
1221
1222 TransactionOptions txn_options;
1223
11fdf7f2 1224 std::string value;
7c673cae
FG
1225 Status s;
1226
20effc67 1227 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae
FG
1228 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1229 s = txn->SetName("xid");
1230 ASSERT_OK(s);
1231
1232 // transaction put
1233 s = txn->Put(Slice("tfoo"), Slice("tbar"));
1234 ASSERT_OK(s);
1235
1236 // value is readable form txn
1237 s = txn->Get(read_options, Slice("tfoo"), &value);
1238 ASSERT_OK(s);
1239 ASSERT_EQ(value, "tbar");
1240
1241 // issue rollback
1242 s = txn->Rollback();
1243 ASSERT_OK(s);
1244
1245 // value is nolonger readable
1246 s = txn->Get(read_options, Slice("tfoo"), &value);
1247 ASSERT_TRUE(s.IsNotFound());
1248 ASSERT_EQ(txn->GetNumPuts(), 0);
1249
1250 // put new txn values
1251 s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
1252 ASSERT_OK(s);
1253
1254 // new value is readable from txn
1255 s = txn->Get(read_options, Slice("tfoo2"), &value);
1256 ASSERT_OK(s);
1257 ASSERT_EQ(value, "tbar2");
1258
1259 s = txn->Prepare();
1260 ASSERT_OK(s);
1261
1262 // flush to next wal
1263 s = db->Put(write_options, Slice("foo"), Slice("bar"));
1264 ASSERT_OK(s);
20effc67 1265 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
7c673cae
FG
1266
1267 // issue rollback (marker written to WAL)
1268 s = txn->Rollback();
1269 ASSERT_OK(s);
1270
1271 // value is nolonger readable
1272 s = txn->Get(read_options, Slice("tfoo2"), &value);
1273 ASSERT_TRUE(s.IsNotFound());
1274 ASSERT_EQ(txn->GetNumPuts(), 0);
1275
1276 // make commit
1277 s = txn->Commit();
1278 ASSERT_EQ(s, Status::InvalidArgument());
1279
1280 // try rollback again
1281 s = txn->Rollback();
1282 ASSERT_EQ(s, Status::InvalidArgument());
1283
1284 delete txn;
1285}
1286
1287TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
1288 WriteOptions write_options;
1289 write_options.sync = true;
1290 write_options.disableWAL = false;
1291 ReadOptions read_options;
1292
1293 TransactionOptions txn_options;
1294
11fdf7f2 1295 std::string value;
7c673cae
FG
1296 Status s;
1297
20effc67 1298 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae
FG
1299
1300 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1301 s = txn->SetName("xid");
1302 ASSERT_OK(s);
1303
1304 ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1305
1306 // transaction put
1307 s = txn->Put(Slice("foo"), Slice("bar"));
1308 ASSERT_OK(s);
1309 ASSERT_EQ(1, txn->GetNumPuts());
1310
1311 // txn read
1312 s = txn->Get(read_options, "foo", &value);
1313 ASSERT_OK(s);
1314 ASSERT_EQ(value, "bar");
1315
1316 // regular db put
1317 s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
1318 ASSERT_OK(s);
1319 ASSERT_EQ(1, txn->GetNumPuts());
1320
20effc67 1321 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
7c673cae
FG
1322
1323 // regular db read
1324 db->Get(read_options, "foo2", &value);
1325 ASSERT_EQ(value, "bar2");
1326
1327 // nothing has been prepped yet
1328 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1329
1330 // prepare
1331 s = txn->Prepare();
1332 ASSERT_OK(s);
1333
1334 // still not available to db
1335 s = db->Get(read_options, Slice("foo"), &value);
1336 ASSERT_TRUE(s.IsNotFound());
1337
20effc67 1338 ASSERT_OK(db->FlushWAL(false));
7c673cae
FG
1339 delete txn;
1340 // kill and reopen
11fdf7f2 1341 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
7c673cae
FG
1342 s = ReOpenNoDelete();
1343 ASSERT_OK(s);
11fdf7f2 1344 assert(db != nullptr);
20effc67 1345 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae
FG
1346
1347 // find trans in list of prepared transactions
1348 std::vector<Transaction*> prepared_trans;
1349 db->GetAllPreparedTransactions(&prepared_trans);
1350 ASSERT_EQ(prepared_trans.size(), 1);
1351
1352 txn = prepared_trans.front();
1353 ASSERT_TRUE(txn);
1354 ASSERT_EQ(txn->GetName(), "xid");
1355 ASSERT_EQ(db->GetTransactionByName("xid"), txn);
1356
1357 // log has been marked
1358 auto log_containing_prep =
1359 db_impl->TEST_FindMinLogContainingOutstandingPrep();
1360 ASSERT_GT(log_containing_prep, 0);
1361
1362 // value is readable from txn
1363 s = txn->Get(read_options, "foo", &value);
1364 ASSERT_OK(s);
1365 ASSERT_EQ(value, "bar");
1366
1367 // make commit
1368 s = txn->Commit();
1369 ASSERT_OK(s);
1370
1371 // value is now available
1372 db->Get(read_options, "foo", &value);
1373 ASSERT_EQ(value, "bar");
1374
1375 // we already committed
1376 s = txn->Commit();
1377 ASSERT_EQ(s, Status::InvalidArgument());
1378
11fdf7f2 1379 // no longer is prepared results
7c673cae
FG
1380 prepared_trans.clear();
1381 db->GetAllPreparedTransactions(&prepared_trans);
1382 ASSERT_EQ(prepared_trans.size(), 0);
1383
1384 // transaction should no longer be visible
1385 ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1386
1387 // heap should not care about prepared section anymore
1388 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1389
11fdf7f2
TL
1390 switch (txn_db_options.write_policy) {
1391 case WRITE_COMMITTED:
1392 // but now our memtable should be referencing the prep section
1393 ASSERT_EQ(log_containing_prep,
1394 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1395 ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
1396
1397 break;
1398 case WRITE_PREPARED:
1399 case WRITE_UNPREPARED:
1400 // In these modes memtable do not ref the prep sections
1401 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1402 break;
1403 default:
1404 assert(false);
1405 }
1406
1407 // Add a dummy record to memtable before a flush. Otherwise, the
1408 // memtable will be empty and flush will be skipped.
1409 s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
1410 ASSERT_OK(s);
7c673cae 1411
20effc67 1412 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
7c673cae 1413
11fdf7f2
TL
1414 // after memtable flush we can now release the log
1415 ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
7c673cae
FG
1416 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1417
1418 delete txn;
1419
1420 // deleting transaction should unregister transaction
1421 ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
1422}
494da23a 1423#endif // ROCKSDB_VALGRIND_RUN
7c673cae 1424
11fdf7f2
TL
1425// TODO this test needs to be updated with serial commits
1426TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
7c673cae
FG
1427 // mix transaction writes and regular writes
1428 const uint32_t NUM_TXN_THREADS = 50;
1429 std::atomic<uint32_t> txn_thread_num(0);
1430
1431 std::function<void()> txn_write_thread = [&]() {
1432 uint32_t id = txn_thread_num.fetch_add(1);
1433
1434 WriteOptions write_options;
1435 write_options.sync = true;
1436 write_options.disableWAL = false;
1437 TransactionOptions txn_options;
1438 txn_options.lock_timeout = 1000000;
1439 if (id % 2 == 0) {
1440 txn_options.expiration = 1000000;
1441 }
11fdf7f2 1442 TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(id)));
7c673cae
FG
1443 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1444 ASSERT_OK(txn->SetName(name));
1445 for (int i = 0; i < 10; i++) {
11fdf7f2 1446 std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
7c673cae
FG
1447 ASSERT_OK(txn->Put(key, "val"));
1448 }
1449 ASSERT_OK(txn->Prepare());
1450 ASSERT_OK(txn->Commit());
1451 delete txn;
1452 };
1453
1454 // assure that all thread are in the same write group
1455 std::atomic<uint32_t> t_wait_on_prepare(0);
1456 std::atomic<uint32_t> t_wait_on_commit(0);
1457
f67539c2 1458 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae
FG
1459 "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
1460 auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
1461
1462 if (writer->ShouldWriteToWAL()) {
1463 t_wait_on_prepare.fetch_add(1);
1464 // wait for friends
1465 while (t_wait_on_prepare.load() < NUM_TXN_THREADS) {
1466 env->SleepForMicroseconds(10);
1467 }
1468 } else if (writer->ShouldWriteToMemtable()) {
1469 t_wait_on_commit.fetch_add(1);
1470 // wait for friends
1471 while (t_wait_on_commit.load() < NUM_TXN_THREADS) {
1472 env->SleepForMicroseconds(10);
1473 }
1474 } else {
11fdf7f2 1475 FAIL();
7c673cae
FG
1476 }
1477 });
1478
f67539c2 1479 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1480
1481 // do all the writes
1482 std::vector<port::Thread> threads;
1483 for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
1484 threads.emplace_back(txn_write_thread);
1485 }
1486 for (auto& t : threads) {
1487 t.join();
1488 }
1489
f67539c2
TL
1490 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1491 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
7c673cae
FG
1492
1493 ReadOptions read_options;
1494 std::string value;
1495 Status s;
1496 for (uint32_t t = 0; t < NUM_TXN_THREADS; t++) {
11fdf7f2 1497 TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(t)));
7c673cae 1498 for (int i = 0; i < 10; i++) {
11fdf7f2 1499 std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
7c673cae
FG
1500 s = db->Get(read_options, key, &value);
1501 ASSERT_OK(s);
1502 ASSERT_EQ(value, "val");
1503 }
1504 }
1505}
1506
11fdf7f2 1507TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) {
7c673cae
FG
1508 WriteOptions write_options;
1509 write_options.sync = true;
1510 write_options.disableWAL = false;
1511 ReadOptions read_options;
1512 TransactionOptions txn_options;
1513
1514 std::string value;
1515 Status s;
1516
1517 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1518 s = txn->SetName("bob");
1519 ASSERT_OK(s);
1520
1521 // transaction put
1522 s = txn->Put(Slice("foo"), Slice("bar"));
1523 ASSERT_OK(s);
1524
1525 // prepare
1526 s = txn->Prepare();
1527 ASSERT_OK(s);
1528
1529 delete txn;
1530
1531 for (int i = 0; i < 1000; i++) {
1532 std::string key(i, 'k');
1533 std::string val(1000, 'v');
11fdf7f2 1534 assert(db != nullptr);
7c673cae
FG
1535 s = db->Put(write_options, key, val);
1536 ASSERT_OK(s);
1537
1538 if (i % 29 == 0) {
1539 // crash
1540 env->SetFilesystemActive(false);
11fdf7f2 1541 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
7c673cae
FG
1542 ReOpenNoDelete();
1543 } else if (i % 37 == 0) {
1544 // close
1545 ReOpenNoDelete();
1546 }
1547 }
1548
1549 // commit old txn
1550 txn = db->GetTransactionByName("bob");
1551 ASSERT_TRUE(txn);
1552 s = txn->Commit();
1553 ASSERT_OK(s);
1554
1555 // verify data txn data
1556 s = db->Get(read_options, "foo", &value);
1557 ASSERT_EQ(s, Status::OK());
1558 ASSERT_EQ(value, "bar");
1559
1560 // verify non txn data
1561 for (int i = 0; i < 1000; i++) {
1562 std::string key(i, 'k');
1563 std::string val(1000, 'v');
1564 s = db->Get(read_options, key, &value);
1565 ASSERT_EQ(s, Status::OK());
1566 ASSERT_EQ(value, val);
1567 }
1568
1569 delete txn;
1570}
1571
1572TEST_P(TransactionTest, TwoPhaseSequenceTest) {
1573 WriteOptions write_options;
1574 write_options.sync = true;
1575 write_options.disableWAL = false;
1576 ReadOptions read_options;
1577
1578 TransactionOptions txn_options;
1579
1580 std::string value;
1581 Status s;
1582
1583 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1584 s = txn->SetName("xid");
1585 ASSERT_OK(s);
1586
1587 // transaction put
1588 s = txn->Put(Slice("foo"), Slice("bar"));
1589 ASSERT_OK(s);
1590 s = txn->Put(Slice("foo2"), Slice("bar2"));
1591 ASSERT_OK(s);
1592 s = txn->Put(Slice("foo3"), Slice("bar3"));
1593 ASSERT_OK(s);
1594 s = txn->Put(Slice("foo4"), Slice("bar4"));
1595 ASSERT_OK(s);
1596
1597 // prepare
1598 s = txn->Prepare();
1599 ASSERT_OK(s);
1600
1601 // make commit
1602 s = txn->Commit();
1603 ASSERT_OK(s);
1604
1605 delete txn;
1606
1607 // kill and reopen
1608 env->SetFilesystemActive(false);
1609 ReOpenNoDelete();
11fdf7f2 1610 assert(db != nullptr);
7c673cae
FG
1611
1612 // value is now available
1613 s = db->Get(read_options, "foo4", &value);
1614 ASSERT_EQ(s, Status::OK());
1615 ASSERT_EQ(value, "bar4");
1616}
1617
1618TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
1619 WriteOptions write_options;
1620 write_options.sync = true;
1621 write_options.disableWAL = false;
1622 ReadOptions read_options;
1623
1624 TransactionOptions txn_options;
1625
1626 std::string value;
1627 Status s;
1628
1629 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1630 s = txn->SetName("a");
1631 ASSERT_OK(s);
1632
1633 // transaction put
1634 s = txn->Put(Slice("foo"), Slice("bar"));
1635 ASSERT_OK(s);
1636
1637 // prepare
1638 s = txn->Prepare();
1639 ASSERT_OK(s);
1640
1641 delete txn;
1642
1643 // kill and reopen
1644 env->SetFilesystemActive(false);
11fdf7f2 1645 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
7c673cae
FG
1646 ReOpenNoDelete();
1647
1648 // commit old txn
20effc67 1649 assert(db != nullptr); // Make clang analyze happy.
7c673cae 1650 txn = db->GetTransactionByName("a");
20effc67 1651 assert(txn != nullptr);
7c673cae
FG
1652 s = txn->Commit();
1653 ASSERT_OK(s);
1654
1655 s = db->Get(read_options, "foo", &value);
1656 ASSERT_EQ(s, Status::OK());
1657 ASSERT_EQ(value, "bar");
1658
1659 delete txn;
1660
1661 txn = db->BeginTransaction(write_options, txn_options);
1662 s = txn->SetName("b");
1663 ASSERT_OK(s);
1664
1665 s = txn->Put(Slice("foo2"), Slice("bar2"));
1666 ASSERT_OK(s);
1667
1668 s = txn->Prepare();
1669 ASSERT_OK(s);
1670
1671 s = txn->Commit();
1672 ASSERT_OK(s);
1673
1674 delete txn;
1675
1676 // kill and reopen
1677 env->SetFilesystemActive(false);
20effc67 1678 ASSERT_OK(ReOpenNoDelete());
11fdf7f2 1679 assert(db != nullptr);
7c673cae
FG
1680
1681 // value is now available
1682 s = db->Get(read_options, "foo", &value);
1683 ASSERT_EQ(s, Status::OK());
1684 ASSERT_EQ(value, "bar");
1685
1686 s = db->Get(read_options, "foo2", &value);
1687 ASSERT_EQ(s, Status::OK());
1688 ASSERT_EQ(value, "bar2");
1689}
1690
1691TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
20effc67 1692 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae
FG
1693
1694 Status s;
11fdf7f2 1695 std::string v;
7c673cae
FG
1696 ColumnFamilyHandle *cfa, *cfb;
1697
1698 // Create 2 new column families
1699 ColumnFamilyOptions cf_options;
1700 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1701 ASSERT_OK(s);
1702 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1703 ASSERT_OK(s);
1704
1705 WriteOptions wopts;
1706 wopts.disableWAL = false;
1707 wopts.sync = true;
1708
1709 TransactionOptions topts1;
1710 Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1711 s = txn1->SetName("xid1");
1712 ASSERT_OK(s);
1713
1714 TransactionOptions topts2;
1715 Transaction* txn2 = db->BeginTransaction(wopts, topts2);
1716 s = txn2->SetName("xid2");
1717 ASSERT_OK(s);
1718
1719 // transaction put in two column families
1720 s = txn1->Put(cfa, "ka1", "va1");
1721 ASSERT_OK(s);
1722
1723 // transaction put in two column families
1724 s = txn2->Put(cfa, "ka2", "va2");
1725 ASSERT_OK(s);
1726 s = txn2->Put(cfb, "kb2", "vb2");
1727 ASSERT_OK(s);
1728
1729 // write prep section to wal
1730 s = txn1->Prepare();
1731 ASSERT_OK(s);
1732
1733 // our log should be in the heap
1734 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1735 txn1->GetLogNumber());
f67539c2 1736 ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
7c673cae
FG
1737
1738 // flush default cf to crate new log
1739 s = db->Put(wopts, "foo", "bar");
1740 ASSERT_OK(s);
1741 s = db_impl->TEST_FlushMemTable(true);
1742 ASSERT_OK(s);
1743
1744 // make sure we are on a new log
f67539c2 1745 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLastLogNumber());
7c673cae
FG
1746
1747 // put txn2 prep section in this log
1748 s = txn2->Prepare();
1749 ASSERT_OK(s);
f67539c2 1750 ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
7c673cae
FG
1751
1752 // heap should still see first log
1753 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1754 txn1->GetLogNumber());
1755
1756 // commit txn1
1757 s = txn1->Commit();
1758 ASSERT_OK(s);
1759
1760 // heap should now show txn2s log
1761 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
1762 txn2->GetLogNumber());
1763
11fdf7f2
TL
1764 switch (txn_db_options.write_policy) {
1765 case WRITE_COMMITTED:
1766 // we should see txn1s log refernced by the memtables
1767 ASSERT_EQ(txn1->GetLogNumber(),
1768 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1769 break;
1770 case WRITE_PREPARED:
1771 case WRITE_UNPREPARED:
1772 // In these modes memtable do not ref the prep sections
1773 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1774 break;
1775 default:
1776 assert(false);
1777 }
7c673cae
FG
1778
1779 // flush default cf to crate new log
1780 s = db->Put(wopts, "foo", "bar2");
1781 ASSERT_OK(s);
1782 s = db_impl->TEST_FlushMemTable(true);
1783 ASSERT_OK(s);
1784
1785 // make sure we are on a new log
f67539c2 1786 ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLastLogNumber());
7c673cae
FG
1787
1788 // commit txn2
1789 s = txn2->Commit();
1790 ASSERT_OK(s);
1791
1792 // heap should not show any logs
1793 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1794
11fdf7f2
TL
1795 switch (txn_db_options.write_policy) {
1796 case WRITE_COMMITTED:
1797 // should show the first txn log
1798 ASSERT_EQ(txn1->GetLogNumber(),
1799 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1800 break;
1801 case WRITE_PREPARED:
1802 case WRITE_UNPREPARED:
1803 // In these modes memtable do not ref the prep sections
1804 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1805 break;
1806 default:
1807 assert(false);
1808 }
7c673cae
FG
1809
1810 // flush only cfa memtable
11fdf7f2
TL
1811 s = db_impl->TEST_FlushMemTable(true, false, cfa);
1812 ASSERT_OK(s);
1813
1814 switch (txn_db_options.write_policy) {
1815 case WRITE_COMMITTED:
1816 // should show the first txn log
1817 ASSERT_EQ(txn2->GetLogNumber(),
1818 db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1819 break;
1820 case WRITE_PREPARED:
1821 case WRITE_UNPREPARED:
1822 // In these modes memtable do not ref the prep sections
1823 ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
1824 break;
1825 default:
1826 assert(false);
1827 }
7c673cae
FG
1828
1829 // flush only cfb memtable
11fdf7f2 1830 s = db_impl->TEST_FlushMemTable(true, false, cfb);
7c673cae
FG
1831 ASSERT_OK(s);
1832
1833 // should show not dependency on logs
1834 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1835 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
1836
1837 delete txn1;
1838 delete txn2;
1839 delete cfa;
1840 delete cfb;
1841}
1842
1843TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
20effc67 1844 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae
FG
1845
1846 Status s;
1847 ColumnFamilyHandle *cfa, *cfb;
1848
1849 ColumnFamilyOptions cf_options;
1850 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
1851 ASSERT_OK(s);
1852 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
1853 ASSERT_OK(s);
1854
1855 WriteOptions wopts;
1856 wopts.disableWAL = false;
1857 wopts.sync = true;
1858
20effc67
TL
1859 auto cfh_a = static_cast_with_check<ColumnFamilyHandleImpl>(cfa);
1860 auto cfh_b = static_cast_with_check<ColumnFamilyHandleImpl>(cfb);
7c673cae
FG
1861
1862 TransactionOptions topts1;
1863 Transaction* txn1 = db->BeginTransaction(wopts, topts1);
1864 s = txn1->SetName("xid1");
1865 ASSERT_OK(s);
1866 s = txn1->Put(cfa, "boys", "girls1");
1867 ASSERT_OK(s);
1868
1869 Transaction* txn2 = db->BeginTransaction(wopts, topts1);
1870 s = txn2->SetName("xid2");
1871 ASSERT_OK(s);
1872 s = txn2->Put(cfb, "up", "down1");
1873 ASSERT_OK(s);
1874
1875 // prepre transaction in LOG A
1876 s = txn1->Prepare();
1877 ASSERT_OK(s);
1878
1879 // prepre transaction in LOG A
1880 s = txn2->Prepare();
1881 ASSERT_OK(s);
1882
1883 // regular put so that mem table can actually be flushed for log rolling
1884 s = db->Put(wopts, "cats", "dogs1");
1885 ASSERT_OK(s);
1886
f67539c2 1887 auto prepare_log_no = txn1->GetLastLogNumber();
7c673cae
FG
1888
1889 // roll to LOG B
1890 s = db_impl->TEST_FlushMemTable(true);
1891 ASSERT_OK(s);
1892
1893 // now we pause background work so that
1894 // imm()s are not flushed before we can check their status
1895 s = db_impl->PauseBackgroundWork();
1896 ASSERT_OK(s);
1897
1898 ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
11fdf7f2
TL
1899 switch (txn_db_options.write_policy) {
1900 case WRITE_COMMITTED:
1901 // This cf is empty and should ref the latest log
1902 ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1903 ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
1904 break;
1905 case WRITE_PREPARED:
1906 case WRITE_UNPREPARED:
1907 // This cf is not flushed yet and should ref the log that has its data
1908 ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
1909 break;
1910 default:
1911 assert(false);
1912 }
7c673cae 1913 ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
f67539c2 1914 txn1->GetLogNumber());
7c673cae
FG
1915 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1916
1917 // commit in LOG B
1918 s = txn1->Commit();
1919 ASSERT_OK(s);
1920
11fdf7f2
TL
1921 switch (txn_db_options.write_policy) {
1922 case WRITE_COMMITTED:
1923 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
1924 prepare_log_no);
1925 break;
1926 case WRITE_PREPARED:
1927 case WRITE_UNPREPARED:
1928 // In these modes memtable do not ref the prep sections
1929 ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
1930 break;
1931 default:
1932 assert(false);
1933 }
7c673cae 1934
11fdf7f2 1935 ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
7c673cae
FG
1936
1937 // request a flush for all column families such that the earliest
1938 // alive log file can be killed
11fdf7f2 1939 db_impl->TEST_SwitchWAL();
7c673cae
FG
1940 // log cannot be flushed because txn2 has not been commited
1941 ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
11fdf7f2 1942 ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
7c673cae
FG
1943
1944 // assert that cfa has a flush requested
1945 ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
1946
11fdf7f2
TL
1947 switch (txn_db_options.write_policy) {
1948 case WRITE_COMMITTED:
1949 // cfb should not be flushed becuse it has no data from LOG A
1950 ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
1951 break;
1952 case WRITE_PREPARED:
1953 case WRITE_UNPREPARED:
1954 // cfb should be flushed becuse it has prepared data from LOG A
1955 ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1956 break;
1957 default:
1958 assert(false);
1959 }
7c673cae
FG
1960
1961 // cfb now has data from LOG A
1962 s = txn2->Commit();
1963 ASSERT_OK(s);
1964
11fdf7f2
TL
1965 db_impl->TEST_SwitchWAL();
1966 ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
7c673cae
FG
1967
1968 // we should see that cfb now has a flush requested
1969 ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
1970
1971 // all data in LOG A resides in a memtable that has been
1972 // requested for a flush
1973 ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
1974
1975 delete txn1;
1976 delete txn2;
1977 delete cfa;
1978 delete cfb;
1979}
1980/*
1981 * 1) use prepare to keep first log around to determine starting sequence
1982 * during recovery.
1983 * 2) insert many values, skipping wal, to increase seqid.
1984 * 3) insert final value into wal
1985 * 4) recover and see that final value was properly recovered - not
1986 * hidden behind improperly summed sequence ids
1987 */
1988TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
20effc67 1989 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae
FG
1990 WriteOptions wal_on, wal_off;
1991 wal_on.sync = true;
1992 wal_on.disableWAL = false;
1993 wal_off.disableWAL = true;
1994 ReadOptions read_options;
1995 TransactionOptions txn_options;
1996
1997 std::string value;
1998 Status s;
1999
2000 Transaction* txn1 = db->BeginTransaction(wal_on, txn_options);
2001
2002 s = txn1->SetName("1");
2003 ASSERT_OK(s);
2004
2005 s = db->Put(wal_on, "first", "first");
2006 ASSERT_OK(s);
2007
2008 s = txn1->Put(Slice("dummy"), Slice("dummy"));
2009 ASSERT_OK(s);
2010 s = txn1->Prepare();
2011 ASSERT_OK(s);
2012
2013 s = db->Put(wal_off, "cats", "dogs1");
2014 ASSERT_OK(s);
2015 s = db->Put(wal_off, "cats", "dogs2");
2016 ASSERT_OK(s);
2017 s = db->Put(wal_off, "cats", "dogs3");
2018 ASSERT_OK(s);
2019
2020 s = db_impl->TEST_FlushMemTable(true);
2021 ASSERT_OK(s);
2022
2023 s = db->Put(wal_on, "cats", "dogs4");
2024 ASSERT_OK(s);
2025
20effc67 2026 ASSERT_OK(db->FlushWAL(false));
11fdf7f2 2027
7c673cae
FG
2028 // kill and reopen
2029 env->SetFilesystemActive(false);
11fdf7f2 2030 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
20effc67 2031 ASSERT_OK(ReOpenNoDelete());
11fdf7f2 2032 assert(db != nullptr);
7c673cae
FG
2033
2034 s = db->Get(read_options, "first", &value);
2035 ASSERT_OK(s);
2036 ASSERT_EQ(value, "first");
2037
2038 s = db->Get(read_options, "cats", &value);
2039 ASSERT_OK(s);
2040 ASSERT_EQ(value, "dogs4");
2041}
2042
2043TEST_P(TransactionTest, FirstWriteTest) {
2044 WriteOptions write_options;
2045
2046 // Test conflict checking against the very first write to a db.
2047 // The transaction's snapshot will have seq 1 and the following write
2048 // will have sequence 1.
2049 Status s = db->Put(write_options, "A", "a");
2050
2051 Transaction* txn = db->BeginTransaction(write_options);
2052 txn->SetSnapshot();
2053
2054 ASSERT_OK(s);
2055
2056 s = txn->Put("A", "b");
2057 ASSERT_OK(s);
2058
2059 delete txn;
2060}
2061
2062TEST_P(TransactionTest, FirstWriteTest2) {
2063 WriteOptions write_options;
2064
2065 Transaction* txn = db->BeginTransaction(write_options);
2066 txn->SetSnapshot();
2067
2068 // Test conflict checking against the very first write to a db.
2069 // The transaction's snapshot is a seq 0 while the following write
2070 // will have sequence 1.
2071 Status s = db->Put(write_options, "A", "a");
2072 ASSERT_OK(s);
2073
2074 s = txn->Put("A", "b");
2075 ASSERT_TRUE(s.IsBusy());
2076
2077 delete txn;
2078}
2079
2080TEST_P(TransactionTest, WriteOptionsTest) {
2081 WriteOptions write_options;
2082 write_options.sync = true;
2083 write_options.disableWAL = true;
2084
2085 Transaction* txn = db->BeginTransaction(write_options);
2086 ASSERT_TRUE(txn);
2087
2088 ASSERT_TRUE(txn->GetWriteOptions()->sync);
2089
2090 write_options.sync = false;
2091 txn->SetWriteOptions(write_options);
2092 ASSERT_FALSE(txn->GetWriteOptions()->sync);
2093 ASSERT_TRUE(txn->GetWriteOptions()->disableWAL);
2094
2095 delete txn;
2096}
2097
2098TEST_P(TransactionTest, WriteConflictTest) {
2099 WriteOptions write_options;
2100 ReadOptions read_options;
2101 string value;
2102 Status s;
2103
20effc67
TL
2104 ASSERT_OK(db->Put(write_options, "foo", "A"));
2105 ASSERT_OK(db->Put(write_options, "foo2", "B"));
7c673cae
FG
2106
2107 Transaction* txn = db->BeginTransaction(write_options);
2108 ASSERT_TRUE(txn);
2109
2110 s = txn->Put("foo", "A2");
2111 ASSERT_OK(s);
2112
2113 s = txn->Put("foo2", "B2");
2114 ASSERT_OK(s);
2115
2116 // This Put outside of a transaction will conflict with the previous write
2117 s = db->Put(write_options, "foo", "xxx");
2118 ASSERT_TRUE(s.IsTimedOut());
2119
2120 s = db->Get(read_options, "foo", &value);
2121 ASSERT_EQ(value, "A");
2122
2123 s = txn->Commit();
2124 ASSERT_OK(s);
2125
2126 db->Get(read_options, "foo", &value);
2127 ASSERT_EQ(value, "A2");
2128 db->Get(read_options, "foo2", &value);
2129 ASSERT_EQ(value, "B2");
2130
2131 delete txn;
2132}
2133
2134TEST_P(TransactionTest, WriteConflictTest2) {
2135 WriteOptions write_options;
2136 ReadOptions read_options;
2137 TransactionOptions txn_options;
11fdf7f2 2138 std::string value;
7c673cae
FG
2139 Status s;
2140
20effc67 2141 ASSERT_OK(db->Put(write_options, "foo", "bar"));
7c673cae
FG
2142
2143 txn_options.set_snapshot = true;
2144 Transaction* txn = db->BeginTransaction(write_options, txn_options);
2145 ASSERT_TRUE(txn);
2146
2147 // This Put outside of a transaction will conflict with a later write
2148 s = db->Put(write_options, "foo", "barz");
2149 ASSERT_OK(s);
2150
2151 s = txn->Put("foo2", "X");
2152 ASSERT_OK(s);
2153
2154 s = txn->Put("foo",
2155 "bar2"); // Conflicts with write done after snapshot taken
2156 ASSERT_TRUE(s.IsBusy());
2157
2158 s = txn->Put("foo3", "Y");
2159 ASSERT_OK(s);
2160
2161 s = db->Get(read_options, "foo", &value);
2162 ASSERT_EQ(value, "barz");
2163
2164 ASSERT_EQ(2, txn->GetNumKeys());
2165
2166 s = txn->Commit();
2167 ASSERT_OK(s); // Txn should commit, but only write foo2 and foo3
2168
2169 // Verify that transaction wrote foo2 and foo3 but not foo
2170 db->Get(read_options, "foo", &value);
2171 ASSERT_EQ(value, "barz");
2172
2173 db->Get(read_options, "foo2", &value);
2174 ASSERT_EQ(value, "X");
2175
2176 db->Get(read_options, "foo3", &value);
2177 ASSERT_EQ(value, "Y");
2178
2179 delete txn;
2180}
2181
2182TEST_P(TransactionTest, ReadConflictTest) {
2183 WriteOptions write_options;
2184 ReadOptions read_options, snapshot_read_options;
2185 TransactionOptions txn_options;
11fdf7f2 2186 std::string value;
7c673cae
FG
2187 Status s;
2188
20effc67
TL
2189 ASSERT_OK(db->Put(write_options, "foo", "bar"));
2190 ASSERT_OK(db->Put(write_options, "foo2", "bar"));
7c673cae
FG
2191
2192 txn_options.set_snapshot = true;
2193 Transaction* txn = db->BeginTransaction(write_options, txn_options);
2194 ASSERT_TRUE(txn);
2195
2196 txn->SetSnapshot();
2197 snapshot_read_options.snapshot = txn->GetSnapshot();
2198
20effc67 2199 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
7c673cae
FG
2200 ASSERT_EQ(value, "bar");
2201
2202 // This Put outside of a transaction will conflict with the previous read
2203 s = db->Put(write_options, "foo", "barz");
2204 ASSERT_TRUE(s.IsTimedOut());
2205
2206 s = db->Get(read_options, "foo", &value);
2207 ASSERT_EQ(value, "bar");
2208
2209 s = txn->Get(read_options, "foo", &value);
2210 ASSERT_EQ(value, "bar");
2211
2212 s = txn->Commit();
2213 ASSERT_OK(s);
2214
2215 delete txn;
2216}
2217
2218TEST_P(TransactionTest, TxnOnlyTest) {
2219 // Test to make sure transactions work when there are no other writes in an
2220 // empty db.
2221
2222 WriteOptions write_options;
2223 ReadOptions read_options;
11fdf7f2 2224 std::string value;
7c673cae
FG
2225 Status s;
2226
2227 Transaction* txn = db->BeginTransaction(write_options);
2228 ASSERT_TRUE(txn);
2229
2230 s = txn->Put("x", "y");
2231 ASSERT_OK(s);
2232
2233 s = txn->Commit();
2234 ASSERT_OK(s);
2235
2236 delete txn;
2237}
2238
2239TEST_P(TransactionTest, FlushTest) {
2240 WriteOptions write_options;
2241 ReadOptions read_options, snapshot_read_options;
11fdf7f2 2242 std::string value;
7c673cae
FG
2243 Status s;
2244
20effc67
TL
2245 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
2246 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
7c673cae
FG
2247
2248 Transaction* txn = db->BeginTransaction(write_options);
2249 ASSERT_TRUE(txn);
2250
2251 snapshot_read_options.snapshot = txn->GetSnapshot();
2252
20effc67 2253 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
7c673cae
FG
2254 ASSERT_EQ(value, "bar");
2255
2256 s = txn->Put(Slice("foo"), Slice("bar2"));
2257 ASSERT_OK(s);
2258
20effc67 2259 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
7c673cae
FG
2260 ASSERT_EQ(value, "bar2");
2261
2262 // Put a random key so we have a memtable to flush
2263 s = db->Put(write_options, "dummy", "dummy");
2264 ASSERT_OK(s);
2265
2266 // force a memtable flush
2267 FlushOptions flush_ops;
2268 db->Flush(flush_ops);
2269
2270 s = txn->Commit();
2271 // txn should commit since the flushed table is still in MemtableList History
2272 ASSERT_OK(s);
2273
2274 db->Get(read_options, "foo", &value);
2275 ASSERT_EQ(value, "bar2");
2276
2277 delete txn;
2278}
2279
2280TEST_P(TransactionTest, FlushTest2) {
2281 const size_t num_tests = 3;
2282
2283 for (size_t n = 0; n < num_tests; n++) {
2284 // Test different table factories
2285 switch (n) {
2286 case 0:
2287 break;
2288 case 1:
2289 options.table_factory.reset(new mock::MockTableFactory());
2290 break;
2291 case 2: {
2292 PlainTableOptions pt_opts;
2293 pt_opts.hash_table_ratio = 0;
2294 options.table_factory.reset(NewPlainTableFactory(pt_opts));
2295 break;
2296 }
2297 }
2298
2299 Status s = ReOpen();
2300 ASSERT_OK(s);
11fdf7f2 2301 assert(db != nullptr);
7c673cae
FG
2302
2303 WriteOptions write_options;
2304 ReadOptions read_options, snapshot_read_options;
2305 TransactionOptions txn_options;
2306 string value;
2307
20effc67 2308 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
7c673cae 2309
20effc67
TL
2310 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
2311 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar2")));
2312 ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar3")));
7c673cae
FG
2313
2314 txn_options.set_snapshot = true;
2315 Transaction* txn = db->BeginTransaction(write_options, txn_options);
2316 ASSERT_TRUE(txn);
2317
2318 snapshot_read_options.snapshot = txn->GetSnapshot();
2319
20effc67 2320 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
7c673cae
FG
2321 ASSERT_EQ(value, "bar");
2322
2323 s = txn->Put(Slice("foo"), Slice("bar2"));
2324 ASSERT_OK(s);
2325
20effc67 2326 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
7c673cae
FG
2327 ASSERT_EQ(value, "bar2");
2328 // verify foo is locked by txn
2329 s = db->Delete(write_options, "foo");
2330 ASSERT_TRUE(s.IsTimedOut());
2331
2332 s = db->Put(write_options, "Z", "z");
2333 ASSERT_OK(s);
2334 s = db->Put(write_options, "dummy", "dummy");
2335 ASSERT_OK(s);
2336
2337 s = db->Put(write_options, "S", "s");
2338 ASSERT_OK(s);
2339 s = db->SingleDelete(write_options, "S");
2340 ASSERT_OK(s);
2341
2342 s = txn->Delete("S");
2343 // Should fail after encountering a write to S in memtable
2344 ASSERT_TRUE(s.IsBusy());
2345
2346 // force a memtable flush
2347 s = db_impl->TEST_FlushMemTable(true);
2348 ASSERT_OK(s);
2349
2350 // Put a random key so we have a MemTable to flush
2351 s = db->Put(write_options, "dummy", "dummy2");
2352 ASSERT_OK(s);
2353
2354 // force a memtable flush
2355 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
2356
2357 s = db->Put(write_options, "dummy", "dummy3");
2358 ASSERT_OK(s);
2359
2360 // force a memtable flush
2361 // Since our test db has max_write_buffer_number=2, this flush will cause
2362 // the first memtable to get purged from the MemtableList history.
2363 ASSERT_OK(db_impl->TEST_FlushMemTable(true));
2364
2365 s = txn->Put("X", "Y");
2366 // Should succeed after verifying there is no write to X in SST file
2367 ASSERT_OK(s);
2368
2369 s = txn->Put("Z", "zz");
2370 // Should fail after encountering a write to Z in SST file
2371 ASSERT_TRUE(s.IsBusy());
2372
2373 s = txn->GetForUpdate(read_options, "foo2", &value);
2374 // should succeed since key was written before txn started
2375 ASSERT_OK(s);
2376 // verify foo2 is locked by txn
2377 s = db->Delete(write_options, "foo2");
2378 ASSERT_TRUE(s.IsTimedOut());
2379
2380 s = txn->Delete("S");
2381 // Should fail after encountering a write to S in SST file
2382 ASSERT_TRUE(s.IsBusy());
2383
2384 // Write a bunch of keys to db to force a compaction
2385 Random rnd(47);
2386 for (int i = 0; i < 1000; i++) {
2387 s = db->Put(write_options, std::to_string(i),
2388 test::CompressibleString(&rnd, 0.8, 100, &value));
2389 ASSERT_OK(s);
2390 }
2391
2392 s = txn->Put("X", "yy");
2393 // Should succeed after verifying there is no write to X in SST file
2394 ASSERT_OK(s);
2395
2396 s = txn->Put("Z", "zzz");
2397 // Should fail after encountering a write to Z in SST file
2398 ASSERT_TRUE(s.IsBusy());
2399
2400 s = txn->Delete("S");
2401 // Should fail after encountering a write to S in SST file
2402 ASSERT_TRUE(s.IsBusy());
2403
2404 s = txn->GetForUpdate(read_options, "foo3", &value);
2405 // should succeed since key was written before txn started
2406 ASSERT_OK(s);
2407 // verify foo3 is locked by txn
2408 s = db->Delete(write_options, "foo3");
2409 ASSERT_TRUE(s.IsTimedOut());
2410
20effc67 2411 ASSERT_OK(db_impl->TEST_WaitForCompact());
7c673cae
FG
2412
2413 s = txn->Commit();
2414 ASSERT_OK(s);
2415
2416 // Transaction should only write the keys that succeeded.
2417 s = db->Get(read_options, "foo", &value);
2418 ASSERT_EQ(value, "bar2");
2419
2420 s = db->Get(read_options, "X", &value);
2421 ASSERT_OK(s);
2422 ASSERT_EQ("yy", value);
2423
2424 s = db->Get(read_options, "Z", &value);
2425 ASSERT_OK(s);
2426 ASSERT_EQ("z", value);
2427
2428 delete txn;
2429 }
2430}
2431
2432TEST_P(TransactionTest, NoSnapshotTest) {
2433 WriteOptions write_options;
2434 ReadOptions read_options;
11fdf7f2 2435 std::string value;
7c673cae
FG
2436 Status s;
2437
20effc67 2438 ASSERT_OK(db->Put(write_options, "AAA", "bar"));
7c673cae
FG
2439
2440 Transaction* txn = db->BeginTransaction(write_options);
2441 ASSERT_TRUE(txn);
2442
2443 // Modify key after transaction start
20effc67 2444 ASSERT_OK(db->Put(write_options, "AAA", "bar1"));
7c673cae
FG
2445
2446 // Read and write without a snap
20effc67 2447 ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
7c673cae
FG
2448 ASSERT_EQ(value, "bar1");
2449 s = txn->Put("AAA", "bar2");
2450 ASSERT_OK(s);
2451
2452 // Should commit since read/write was done after data changed
2453 s = txn->Commit();
2454 ASSERT_OK(s);
2455
20effc67 2456 ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
7c673cae
FG
2457 ASSERT_EQ(value, "bar2");
2458
2459 delete txn;
2460}
2461
2462TEST_P(TransactionTest, MultipleSnapshotTest) {
2463 WriteOptions write_options;
2464 ReadOptions read_options, snapshot_read_options;
11fdf7f2 2465 std::string value;
7c673cae
FG
2466 Status s;
2467
11fdf7f2
TL
2468 ASSERT_OK(db->Put(write_options, "AAA", "bar"));
2469 ASSERT_OK(db->Put(write_options, "BBB", "bar"));
2470 ASSERT_OK(db->Put(write_options, "CCC", "bar"));
7c673cae
FG
2471
2472 Transaction* txn = db->BeginTransaction(write_options);
2473 ASSERT_TRUE(txn);
2474
20effc67 2475 ASSERT_OK(db->Put(write_options, "AAA", "bar1"));
7c673cae
FG
2476
2477 // Read and write without a snapshot
11fdf7f2 2478 ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
7c673cae
FG
2479 ASSERT_EQ(value, "bar1");
2480 s = txn->Put("AAA", "bar2");
2481 ASSERT_OK(s);
2482
2483 // Modify BBB before snapshot is taken
11fdf7f2 2484 ASSERT_OK(db->Put(write_options, "BBB", "bar1"));
7c673cae
FG
2485
2486 txn->SetSnapshot();
2487 snapshot_read_options.snapshot = txn->GetSnapshot();
2488
2489 // Read and write with snapshot
11fdf7f2 2490 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
7c673cae
FG
2491 ASSERT_EQ(value, "bar1");
2492 s = txn->Put("BBB", "bar2");
2493 ASSERT_OK(s);
2494
11fdf7f2 2495 ASSERT_OK(db->Put(write_options, "CCC", "bar1"));
7c673cae
FG
2496
2497 // Set a new snapshot
2498 txn->SetSnapshot();
2499 snapshot_read_options.snapshot = txn->GetSnapshot();
2500
2501 // Read and write with snapshot
20effc67 2502 ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "CCC", &value));
7c673cae
FG
2503 ASSERT_EQ(value, "bar1");
2504 s = txn->Put("CCC", "bar2");
2505 ASSERT_OK(s);
2506
2507 s = txn->GetForUpdate(read_options, "AAA", &value);
2508 ASSERT_OK(s);
2509 ASSERT_EQ(value, "bar2");
2510 s = txn->GetForUpdate(read_options, "BBB", &value);
2511 ASSERT_OK(s);
2512 ASSERT_EQ(value, "bar2");
2513 s = txn->GetForUpdate(read_options, "CCC", &value);
2514 ASSERT_OK(s);
2515 ASSERT_EQ(value, "bar2");
2516
2517 s = db->Get(read_options, "AAA", &value);
2518 ASSERT_OK(s);
2519 ASSERT_EQ(value, "bar1");
2520 s = db->Get(read_options, "BBB", &value);
2521 ASSERT_OK(s);
2522 ASSERT_EQ(value, "bar1");
2523 s = db->Get(read_options, "CCC", &value);
2524 ASSERT_OK(s);
2525 ASSERT_EQ(value, "bar1");
2526
2527 s = txn->Commit();
2528 ASSERT_OK(s);
2529
2530 s = db->Get(read_options, "AAA", &value);
2531 ASSERT_OK(s);
2532 ASSERT_EQ(value, "bar2");
2533 s = db->Get(read_options, "BBB", &value);
2534 ASSERT_OK(s);
2535 ASSERT_EQ(value, "bar2");
2536 s = db->Get(read_options, "CCC", &value);
2537 ASSERT_OK(s);
2538 ASSERT_EQ(value, "bar2");
2539
2540 // verify that we track multiple writes to the same key at different snapshots
2541 delete txn;
2542 txn = db->BeginTransaction(write_options);
2543
2544 // Potentially conflicting writes
20effc67
TL
2545 ASSERT_OK(db->Put(write_options, "ZZZ", "zzz"));
2546 ASSERT_OK(db->Put(write_options, "XXX", "xxx"));
7c673cae
FG
2547
2548 txn->SetSnapshot();
2549
2550 TransactionOptions txn_options;
2551 txn_options.set_snapshot = true;
2552 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
2553 txn2->SetSnapshot();
2554
2555 // This should not conflict in txn since the snapshot is later than the
2556 // previous write (spoiler alert: it will later conflict with txn2).
2557 s = txn->Put("ZZZ", "zzzz");
2558 ASSERT_OK(s);
2559
2560 s = txn->Commit();
2561 ASSERT_OK(s);
2562
2563 delete txn;
2564
2565 // This will conflict since the snapshot is earlier than another write to ZZZ
2566 s = txn2->Put("ZZZ", "xxxxx");
2567 ASSERT_TRUE(s.IsBusy());
2568
2569 s = txn2->Commit();
2570 ASSERT_OK(s);
2571
2572 s = db->Get(read_options, "ZZZ", &value);
2573 ASSERT_OK(s);
2574 ASSERT_EQ(value, "zzzz");
2575
2576 delete txn2;
2577}
2578
2579TEST_P(TransactionTest, ColumnFamiliesTest) {
2580 WriteOptions write_options;
2581 ReadOptions read_options, snapshot_read_options;
2582 TransactionOptions txn_options;
2583 string value;
2584 Status s;
2585
2586 ColumnFamilyHandle *cfa, *cfb;
2587 ColumnFamilyOptions cf_options;
2588
2589 // Create 2 new column families
2590 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
2591 ASSERT_OK(s);
2592 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
2593 ASSERT_OK(s);
2594
2595 delete cfa;
2596 delete cfb;
2597 delete db;
11fdf7f2 2598 db = nullptr;
7c673cae
FG
2599
2600 // open DB with three column families
2601 std::vector<ColumnFamilyDescriptor> column_families;
2602 // have to open default column family
2603 column_families.push_back(
2604 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2605 // open the new column families
2606 column_families.push_back(
2607 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
2608 column_families.push_back(
2609 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
2610
2611 std::vector<ColumnFamilyHandle*> handles;
2612
f67539c2 2613 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
11fdf7f2 2614 assert(db != nullptr);
7c673cae
FG
2615
2616 Transaction* txn = db->BeginTransaction(write_options);
2617 ASSERT_TRUE(txn);
2618
2619 txn->SetSnapshot();
2620 snapshot_read_options.snapshot = txn->GetSnapshot();
2621
2622 txn_options.set_snapshot = true;
2623 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
2624 ASSERT_TRUE(txn2);
2625
2626 // Write some data to the db
2627 WriteBatch batch;
20effc67
TL
2628 ASSERT_OK(batch.Put("foo", "foo"));
2629 ASSERT_OK(batch.Put(handles[1], "AAA", "bar"));
2630 ASSERT_OK(batch.Put(handles[1], "AAAZZZ", "bar"));
7c673cae
FG
2631 s = db->Write(write_options, &batch);
2632 ASSERT_OK(s);
20effc67 2633 ASSERT_OK(db->Delete(write_options, handles[1], "AAAZZZ"));
7c673cae
FG
2634
2635 // These keys do not conflict with existing writes since they're in
2636 // different column families
2637 s = txn->Delete("AAA");
2638 ASSERT_OK(s);
2639 s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
2640 ASSERT_TRUE(s.IsNotFound());
2641 Slice key_slice("AAAZZZ");
2642 Slice value_slices[2] = {Slice("bar"), Slice("bar")};
2643 s = txn->Put(handles[2], SliceParts(&key_slice, 1),
2644 SliceParts(value_slices, 2));
2645 ASSERT_OK(s);
2646 ASSERT_EQ(3, txn->GetNumKeys());
2647
2648 s = txn->Commit();
2649 ASSERT_OK(s);
2650 s = db->Get(read_options, "AAA", &value);
2651 ASSERT_TRUE(s.IsNotFound());
2652 s = db->Get(read_options, handles[2], "AAAZZZ", &value);
2653 ASSERT_EQ(value, "barbar");
2654
2655 Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
2656 Slice value_slice("barbarbar");
2657
2658 s = txn2->Delete(handles[2], "XXX");
2659 ASSERT_OK(s);
2660 s = txn2->Delete(handles[1], "XXX");
2661 ASSERT_OK(s);
2662
2663 // This write will cause a conflict with the earlier batch write
2664 s = txn2->Put(handles[1], SliceParts(key_slices, 3),
2665 SliceParts(&value_slice, 1));
2666 ASSERT_TRUE(s.IsBusy());
2667
2668 s = txn2->Commit();
2669 ASSERT_OK(s);
2670 // In the above the latest change to AAAZZZ in handles[1] is delete.
2671 s = db->Get(read_options, handles[1], "AAAZZZ", &value);
2672 ASSERT_TRUE(s.IsNotFound());
2673
2674 delete txn;
2675 delete txn2;
2676
2677 txn = db->BeginTransaction(write_options, txn_options);
2678 snapshot_read_options.snapshot = txn->GetSnapshot();
2679
2680 txn2 = db->BeginTransaction(write_options, txn_options);
2681 ASSERT_TRUE(txn);
2682
2683 std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
2684 handles[0], handles[2]};
2685 std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
2686 std::vector<std::string> values(4);
7c673cae
FG
2687 std::vector<Status> results = txn->MultiGetForUpdate(
2688 snapshot_read_options, multiget_cfh, multiget_keys, &values);
2689 ASSERT_OK(results[0]);
2690 ASSERT_OK(results[1]);
2691 ASSERT_OK(results[2]);
2692 ASSERT_TRUE(results[3].IsNotFound());
2693 ASSERT_EQ(values[0], "bar");
2694 ASSERT_EQ(values[1], "barbar");
2695 ASSERT_EQ(values[2], "foo");
2696
2697 s = txn->SingleDelete(handles[2], "ZZZ");
2698 ASSERT_OK(s);
2699 s = txn->Put(handles[2], "ZZZ", "YYY");
2700 ASSERT_OK(s);
2701 s = txn->Put(handles[2], "ZZZ", "YYYY");
2702 ASSERT_OK(s);
2703 s = txn->Delete(handles[2], "ZZZ");
2704 ASSERT_OK(s);
2705 s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
2706 ASSERT_OK(s);
2707
2708 ASSERT_EQ(5, txn->GetNumKeys());
2709
2710 // Txn should commit
2711 s = txn->Commit();
2712 ASSERT_OK(s);
2713 s = db->Get(read_options, handles[2], "ZZZ", &value);
2714 ASSERT_TRUE(s.IsNotFound());
2715
2716 // Put a key which will conflict with the next txn using the previous snapshot
20effc67 2717 ASSERT_OK(db->Put(write_options, handles[2], "foo", "000"));
7c673cae
FG
2718
2719 results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
2720 multiget_keys, &values);
2721 // All results should fail since there was a conflict
2722 ASSERT_TRUE(results[0].IsBusy());
2723 ASSERT_TRUE(results[1].IsBusy());
2724 ASSERT_TRUE(results[2].IsBusy());
2725 ASSERT_TRUE(results[3].IsBusy());
2726
2727 s = db->Get(read_options, handles[2], "foo", &value);
2728 ASSERT_EQ(value, "000");
2729
2730 s = txn2->Commit();
2731 ASSERT_OK(s);
2732
2733 s = db->DropColumnFamily(handles[1]);
2734 ASSERT_OK(s);
2735 s = db->DropColumnFamily(handles[2]);
2736 ASSERT_OK(s);
2737
2738 delete txn;
2739 delete txn2;
2740
2741 for (auto handle : handles) {
2742 delete handle;
2743 }
2744}
2745
f67539c2
TL
2746TEST_P(TransactionTest, MultiGetBatchedTest) {
2747 WriteOptions write_options;
2748 ReadOptions read_options, snapshot_read_options;
2749 TransactionOptions txn_options;
2750 string value;
2751 Status s;
2752
2753 ColumnFamilyHandle* cf;
2754 ColumnFamilyOptions cf_options;
2755
2756 // Create a new column families
2757 s = db->CreateColumnFamily(cf_options, "CF", &cf);
2758 ASSERT_OK(s);
2759
2760 delete cf;
2761 delete db;
2762 db = nullptr;
2763
2764 // open DB with three column families
2765 std::vector<ColumnFamilyDescriptor> column_families;
2766 // have to open default column family
2767 column_families.push_back(
2768 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2769 // open the new column families
2770 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
2771 column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
2772
2773 std::vector<ColumnFamilyHandle*> handles;
2774
2775 options.merge_operator = MergeOperators::CreateStringAppendOperator();
2776 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2777 assert(db != nullptr);
2778
2779 // Write some data to the db
2780 WriteBatch batch;
20effc67
TL
2781 ASSERT_OK(batch.Put(handles[1], "aaa", "val1"));
2782 ASSERT_OK(batch.Put(handles[1], "bbb", "val2"));
2783 ASSERT_OK(batch.Put(handles[1], "ccc", "val3"));
2784 ASSERT_OK(batch.Put(handles[1], "ddd", "foo"));
2785 ASSERT_OK(batch.Put(handles[1], "eee", "val5"));
2786 ASSERT_OK(batch.Put(handles[1], "fff", "val6"));
2787 ASSERT_OK(batch.Merge(handles[1], "ggg", "foo"));
f67539c2
TL
2788 s = db->Write(write_options, &batch);
2789 ASSERT_OK(s);
2790
2791 Transaction* txn = db->BeginTransaction(write_options);
2792 ASSERT_TRUE(txn);
2793
2794 txn->SetSnapshot();
2795 snapshot_read_options.snapshot = txn->GetSnapshot();
2796
2797 txn_options.set_snapshot = true;
2798 // Write some data to the db
2799 s = txn->Delete(handles[1], "bbb");
2800 ASSERT_OK(s);
2801 s = txn->Put(handles[1], "ccc", "val3_new");
2802 ASSERT_OK(s);
2803 s = txn->Merge(handles[1], "ddd", "bar");
2804 ASSERT_OK(s);
2805
2806 std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
2807 std::vector<PinnableSlice> values(keys.size());
2808 std::vector<Status> statuses(keys.size());
2809
2810 txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
2811 values.data(), statuses.data());
2812 ASSERT_TRUE(statuses[0].ok());
2813 ASSERT_EQ(values[0], "val1");
2814 ASSERT_TRUE(statuses[1].IsNotFound());
2815 ASSERT_TRUE(statuses[2].ok());
2816 ASSERT_EQ(values[2], "val3_new");
2817 ASSERT_TRUE(statuses[3].IsMergeInProgress());
2818 ASSERT_TRUE(statuses[4].ok());
2819 ASSERT_EQ(values[4], "val5");
2820 ASSERT_TRUE(statuses[5].ok());
2821 ASSERT_EQ(values[5], "val6");
2822 ASSERT_TRUE(statuses[6].ok());
2823 ASSERT_EQ(values[6], "foo");
2824 delete txn;
2825 for (auto handle : handles) {
2826 delete handle;
2827 }
2828}
2829
2830// This test calls WriteBatchWithIndex::MultiGetFromBatchAndDB with a large
2831// number of keys, i.e greater than MultiGetContext::MAX_BATCH_SIZE, which is
2832// is 32. This forces autovector allocations in the MultiGet code paths
2833// to use std::vector in addition to stack allocations. The MultiGet keys
2834// includes Merges, which are handled specially in MultiGetFromBatchAndDB by
2835// allocating an autovector of MergeContexts
2836TEST_P(TransactionTest, MultiGetLargeBatchedTest) {
2837 WriteOptions write_options;
2838 ReadOptions read_options, snapshot_read_options;
2839 string value;
2840 Status s;
2841
2842 ColumnFamilyHandle* cf;
2843 ColumnFamilyOptions cf_options;
2844
2845 std::vector<std::string> key_str;
2846 for (int i = 0; i < 100; ++i) {
2847 key_str.emplace_back(std::to_string(i));
2848 }
2849 // Create a new column families
2850 s = db->CreateColumnFamily(cf_options, "CF", &cf);
2851 ASSERT_OK(s);
2852
2853 delete cf;
2854 delete db;
2855 db = nullptr;
2856
2857 // open DB with three column families
2858 std::vector<ColumnFamilyDescriptor> column_families;
2859 // have to open default column family
2860 column_families.push_back(
2861 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
2862 // open the new column families
2863 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
2864 column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));
2865
2866 std::vector<ColumnFamilyHandle*> handles;
2867
2868 options.merge_operator = MergeOperators::CreateStringAppendOperator();
2869 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
2870 assert(db != nullptr);
2871
2872 // Write some data to the db
2873 WriteBatch batch;
2874 for (int i = 0; i < 3 * MultiGetContext::MAX_BATCH_SIZE; ++i) {
2875 std::string val = "val" + std::to_string(i);
20effc67 2876 ASSERT_OK(batch.Put(handles[1], key_str[i], val));
f67539c2
TL
2877 }
2878 s = db->Write(write_options, &batch);
2879 ASSERT_OK(s);
2880
2881 WriteBatchWithIndex wb;
2882 // Write some data to the db
2883 s = wb.Delete(handles[1], std::to_string(1));
2884 ASSERT_OK(s);
2885 s = wb.Put(handles[1], std::to_string(2), "new_val" + std::to_string(2));
2886 ASSERT_OK(s);
2887 // Write a lot of merges so when we call MultiGetFromBatchAndDB later on,
2888 // it is forced to use std::vector in ROCKSDB_NAMESPACE::autovector to
2889 // allocate MergeContexts. The number of merges needs to be >
2890 // MultiGetContext::MAX_BATCH_SIZE
2891 for (int i = 8; i < MultiGetContext::MAX_BATCH_SIZE + 24; ++i) {
2892 s = wb.Merge(handles[1], std::to_string(i), "merge");
2893 ASSERT_OK(s);
2894 }
2895
2896 // MultiGet a lot of keys in order to force std::vector reallocations
2897 std::vector<Slice> keys;
2898 for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE + 32; ++i) {
2899 keys.emplace_back(key_str[i]);
2900 }
2901 std::vector<PinnableSlice> values(keys.size());
2902 std::vector<Status> statuses(keys.size());
2903
2904 wb.MultiGetFromBatchAndDB(db, snapshot_read_options, handles[1], keys.size(), keys.data(),
2905 values.data(), statuses.data(), false);
2906 for (size_t i =0; i < keys.size(); ++i) {
2907 if (i == 1) {
2908 ASSERT_TRUE(statuses[1].IsNotFound());
2909 } else if (i == 2) {
2910 ASSERT_TRUE(statuses[2].ok());
2911 ASSERT_EQ(values[2], "new_val" + std::to_string(2));
2912 } else if (i >= 8 && i < 56) {
2913 ASSERT_TRUE(statuses[i].ok());
2914 ASSERT_EQ(values[i], "val" + std::to_string(i) + ",merge");
2915 } else {
2916 ASSERT_TRUE(statuses[i].ok());
2917 if (values[i] != "val" + std::to_string(i)) {
2918 ASSERT_EQ(values[i], "val" + std::to_string(i));
2919 }
2920 }
2921 }
2922
2923 for (auto handle : handles) {
2924 delete handle;
2925 }
2926}
2927
7c673cae
FG
2928TEST_P(TransactionTest, ColumnFamiliesTest2) {
2929 WriteOptions write_options;
2930 ReadOptions read_options, snapshot_read_options;
7c673cae
FG
2931 string value;
2932 Status s;
2933
2934 ColumnFamilyHandle *one, *two;
2935 ColumnFamilyOptions cf_options;
2936
2937 // Create 2 new column families
2938 s = db->CreateColumnFamily(cf_options, "ONE", &one);
2939 ASSERT_OK(s);
2940 s = db->CreateColumnFamily(cf_options, "TWO", &two);
2941 ASSERT_OK(s);
2942
2943 Transaction* txn1 = db->BeginTransaction(write_options);
2944 ASSERT_TRUE(txn1);
2945 Transaction* txn2 = db->BeginTransaction(write_options);
2946 ASSERT_TRUE(txn2);
2947
2948 s = txn1->Put(one, "X", "1");
2949 ASSERT_OK(s);
2950 s = txn1->Put(two, "X", "2");
2951 ASSERT_OK(s);
2952 s = txn1->Put("X", "0");
2953 ASSERT_OK(s);
2954
2955 s = txn2->Put(one, "X", "11");
2956 ASSERT_TRUE(s.IsTimedOut());
2957
2958 s = txn1->Commit();
2959 ASSERT_OK(s);
2960
2961 // Drop first column family
2962 s = db->DropColumnFamily(one);
2963 ASSERT_OK(s);
2964
2965 // Should fail since column family was dropped.
2966 s = txn2->Commit();
2967 ASSERT_OK(s);
2968
2969 delete txn1;
2970 txn1 = db->BeginTransaction(write_options);
2971 ASSERT_TRUE(txn1);
2972
2973 // Should fail since column family was dropped
2974 s = txn1->Put(one, "X", "111");
2975 ASSERT_TRUE(s.IsInvalidArgument());
2976
2977 s = txn1->Put(two, "X", "222");
2978 ASSERT_OK(s);
2979
2980 s = txn1->Put("X", "000");
2981 ASSERT_OK(s);
2982
2983 s = txn1->Commit();
2984 ASSERT_OK(s);
2985
2986 s = db->Get(read_options, two, "X", &value);
2987 ASSERT_OK(s);
2988 ASSERT_EQ("222", value);
2989
2990 s = db->Get(read_options, "X", &value);
2991 ASSERT_OK(s);
2992 ASSERT_EQ("000", value);
2993
2994 s = db->DropColumnFamily(two);
2995 ASSERT_OK(s);
2996
2997 delete txn1;
2998 delete txn2;
2999
3000 delete one;
3001 delete two;
3002}
3003
3004TEST_P(TransactionTest, EmptyTest) {
3005 WriteOptions write_options;
3006 ReadOptions read_options;
3007 string value;
3008 Status s;
3009
3010 s = db->Put(write_options, "aaa", "aaa");
3011 ASSERT_OK(s);
3012
3013 Transaction* txn = db->BeginTransaction(write_options);
3014 s = txn->Commit();
3015 ASSERT_OK(s);
3016 delete txn;
3017
3018 txn = db->BeginTransaction(write_options);
20effc67 3019 ASSERT_OK(txn->Rollback());
7c673cae
FG
3020 delete txn;
3021
3022 txn = db->BeginTransaction(write_options);
3023 s = txn->GetForUpdate(read_options, "aaa", &value);
3024 ASSERT_EQ(value, "aaa");
3025
3026 s = txn->Commit();
3027 ASSERT_OK(s);
3028 delete txn;
3029
3030 txn = db->BeginTransaction(write_options);
3031 txn->SetSnapshot();
3032
3033 s = txn->GetForUpdate(read_options, "aaa", &value);
3034 ASSERT_EQ(value, "aaa");
3035
3036 // Conflicts with previous GetForUpdate
3037 s = db->Put(write_options, "aaa", "xxx");
3038 ASSERT_TRUE(s.IsTimedOut());
3039
3040 // transaction expired!
3041 s = txn->Commit();
3042 ASSERT_OK(s);
3043 delete txn;
3044}
3045
3046TEST_P(TransactionTest, PredicateManyPreceders) {
3047 WriteOptions write_options;
3048 ReadOptions read_options1, read_options2;
3049 TransactionOptions txn_options;
3050 string value;
3051 Status s;
3052
3053 txn_options.set_snapshot = true;
3054 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3055 read_options1.snapshot = txn1->GetSnapshot();
3056
3057 Transaction* txn2 = db->BeginTransaction(write_options);
3058 txn2->SetSnapshot();
3059 read_options2.snapshot = txn2->GetSnapshot();
3060
3061 std::vector<Slice> multiget_keys = {"1", "2", "3"};
3062 std::vector<std::string> multiget_values;
3063
3064 std::vector<Status> results =
3065 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
20effc67
TL
3066 ASSERT_EQ(results.size(), 3);
3067 ASSERT_TRUE(results[0].IsNotFound());
7c673cae 3068 ASSERT_TRUE(results[1].IsNotFound());
20effc67 3069 ASSERT_TRUE(results[2].IsNotFound());
7c673cae
FG
3070
3071 s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate
3072 ASSERT_TRUE(s.IsTimedOut());
3073
20effc67 3074 ASSERT_OK(txn2->Rollback());
7c673cae
FG
3075
3076 multiget_values.clear();
3077 results =
3078 txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
20effc67
TL
3079 ASSERT_EQ(results.size(), 3);
3080 ASSERT_TRUE(results[0].IsNotFound());
7c673cae 3081 ASSERT_TRUE(results[1].IsNotFound());
20effc67 3082 ASSERT_TRUE(results[2].IsNotFound());
7c673cae
FG
3083
3084 s = txn1->Commit();
3085 ASSERT_OK(s);
3086
3087 delete txn1;
3088 delete txn2;
3089
3090 txn1 = db->BeginTransaction(write_options, txn_options);
3091 read_options1.snapshot = txn1->GetSnapshot();
3092
3093 txn2 = db->BeginTransaction(write_options, txn_options);
3094 read_options2.snapshot = txn2->GetSnapshot();
3095
3096 s = txn1->Put("4", "x");
3097 ASSERT_OK(s);
3098
3099 s = txn2->Delete("4"); // conflict
3100 ASSERT_TRUE(s.IsTimedOut());
3101
3102 s = txn1->Commit();
3103 ASSERT_OK(s);
3104
3105 s = txn2->GetForUpdate(read_options2, "4", &value);
3106 ASSERT_TRUE(s.IsBusy());
3107
20effc67 3108 ASSERT_OK(txn2->Rollback());
7c673cae
FG
3109
3110 delete txn1;
3111 delete txn2;
3112}
3113
3114TEST_P(TransactionTest, LostUpdate) {
3115 WriteOptions write_options;
3116 ReadOptions read_options, read_options1, read_options2;
3117 TransactionOptions txn_options;
11fdf7f2 3118 std::string value;
7c673cae
FG
3119 Status s;
3120
3121 // Test 2 transactions writing to the same key in multiple orders and
3122 // with/without snapshots
3123
3124 Transaction* txn1 = db->BeginTransaction(write_options);
3125 Transaction* txn2 = db->BeginTransaction(write_options);
3126
3127 s = txn1->Put("1", "1");
3128 ASSERT_OK(s);
3129
3130 s = txn2->Put("1", "2"); // conflict
3131 ASSERT_TRUE(s.IsTimedOut());
3132
3133 s = txn2->Commit();
3134 ASSERT_OK(s);
3135
3136 s = txn1->Commit();
3137 ASSERT_OK(s);
3138
3139 s = db->Get(read_options, "1", &value);
3140 ASSERT_OK(s);
3141 ASSERT_EQ("1", value);
3142
3143 delete txn1;
3144 delete txn2;
3145
3146 txn_options.set_snapshot = true;
3147 txn1 = db->BeginTransaction(write_options, txn_options);
3148 read_options1.snapshot = txn1->GetSnapshot();
3149
3150 txn2 = db->BeginTransaction(write_options, txn_options);
3151 read_options2.snapshot = txn2->GetSnapshot();
3152
3153 s = txn1->Put("1", "3");
3154 ASSERT_OK(s);
3155 s = txn2->Put("1", "4"); // conflict
3156 ASSERT_TRUE(s.IsTimedOut());
3157
3158 s = txn1->Commit();
3159 ASSERT_OK(s);
3160
3161 s = txn2->Commit();
3162 ASSERT_OK(s);
3163
3164 s = db->Get(read_options, "1", &value);
3165 ASSERT_OK(s);
3166 ASSERT_EQ("3", value);
3167
3168 delete txn1;
3169 delete txn2;
3170
3171 txn1 = db->BeginTransaction(write_options, txn_options);
3172 read_options1.snapshot = txn1->GetSnapshot();
3173
3174 txn2 = db->BeginTransaction(write_options, txn_options);
3175 read_options2.snapshot = txn2->GetSnapshot();
3176
3177 s = txn1->Put("1", "5");
3178 ASSERT_OK(s);
3179
3180 s = txn1->Commit();
3181 ASSERT_OK(s);
3182
3183 s = txn2->Put("1", "6");
3184 ASSERT_TRUE(s.IsBusy());
3185 s = txn2->Commit();
3186 ASSERT_OK(s);
3187
3188 s = db->Get(read_options, "1", &value);
3189 ASSERT_OK(s);
3190 ASSERT_EQ("5", value);
3191
3192 delete txn1;
3193 delete txn2;
3194
3195 txn1 = db->BeginTransaction(write_options, txn_options);
3196 read_options1.snapshot = txn1->GetSnapshot();
3197
3198 txn2 = db->BeginTransaction(write_options, txn_options);
3199 read_options2.snapshot = txn2->GetSnapshot();
3200
3201 s = txn1->Put("1", "7");
3202 ASSERT_OK(s);
3203 s = txn1->Commit();
3204 ASSERT_OK(s);
3205
3206 txn2->SetSnapshot();
3207 s = txn2->Put("1", "8");
3208 ASSERT_OK(s);
3209 s = txn2->Commit();
3210 ASSERT_OK(s);
3211
3212 s = db->Get(read_options, "1", &value);
3213 ASSERT_OK(s);
3214 ASSERT_EQ("8", value);
3215
3216 delete txn1;
3217 delete txn2;
3218
3219 txn1 = db->BeginTransaction(write_options);
3220 txn2 = db->BeginTransaction(write_options);
3221
3222 s = txn1->Put("1", "9");
3223 ASSERT_OK(s);
3224 s = txn1->Commit();
3225 ASSERT_OK(s);
3226
3227 s = txn2->Put("1", "10");
3228 ASSERT_OK(s);
3229 s = txn2->Commit();
3230 ASSERT_OK(s);
3231
3232 delete txn1;
3233 delete txn2;
3234
3235 s = db->Get(read_options, "1", &value);
3236 ASSERT_OK(s);
3237 ASSERT_EQ(value, "10");
3238}
3239
3240TEST_P(TransactionTest, UntrackedWrites) {
f67539c2
TL
3241 if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3242 // TODO(lth): For WriteUnprepared, validate that untracked writes are
3243 // not supported.
3244 return;
3245 }
3246
7c673cae
FG
3247 WriteOptions write_options;
3248 ReadOptions read_options;
11fdf7f2 3249 std::string value;
7c673cae
FG
3250 Status s;
3251
3252 // Verify transaction rollback works for untracked keys.
3253 Transaction* txn = db->BeginTransaction(write_options);
3254 txn->SetSnapshot();
3255
3256 s = txn->PutUntracked("untracked", "0");
3257 ASSERT_OK(s);
20effc67 3258 ASSERT_OK(txn->Rollback());
7c673cae
FG
3259 s = db->Get(read_options, "untracked", &value);
3260 ASSERT_TRUE(s.IsNotFound());
3261
3262 delete txn;
3263 txn = db->BeginTransaction(write_options);
3264 txn->SetSnapshot();
3265
3266 s = db->Put(write_options, "untracked", "x");
3267 ASSERT_OK(s);
3268
3269 // Untracked writes should succeed even though key was written after snapshot
3270 s = txn->PutUntracked("untracked", "1");
3271 ASSERT_OK(s);
3272 s = txn->MergeUntracked("untracked", "2");
3273 ASSERT_OK(s);
3274 s = txn->DeleteUntracked("untracked");
3275 ASSERT_OK(s);
3276
3277 // Conflict
3278 s = txn->Put("untracked", "3");
3279 ASSERT_TRUE(s.IsBusy());
3280
3281 s = txn->Commit();
3282 ASSERT_OK(s);
3283
3284 s = db->Get(read_options, "untracked", &value);
3285 ASSERT_TRUE(s.IsNotFound());
3286
3287 delete txn;
3288}
3289
3290TEST_P(TransactionTest, ExpiredTransaction) {
3291 WriteOptions write_options;
3292 ReadOptions read_options;
3293 TransactionOptions txn_options;
3294 string value;
3295 Status s;
3296
3297 // Set txn expiration timeout to 0 microseconds (expires instantly)
3298 txn_options.expiration = 0;
3299 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3300
3301 s = txn1->Put("X", "1");
3302 ASSERT_OK(s);
3303
3304 s = txn1->Put("Y", "1");
3305 ASSERT_OK(s);
3306
3307 Transaction* txn2 = db->BeginTransaction(write_options);
3308
3309 // txn2 should be able to write to X since txn1 has expired
3310 s = txn2->Put("X", "2");
3311 ASSERT_OK(s);
3312
3313 s = txn2->Commit();
3314 ASSERT_OK(s);
3315 s = db->Get(read_options, "X", &value);
3316 ASSERT_OK(s);
3317 ASSERT_EQ("2", value);
3318
3319 s = txn1->Put("Z", "1");
3320 ASSERT_OK(s);
3321
3322 // txn1 should fail to commit since it is expired
3323 s = txn1->Commit();
3324 ASSERT_TRUE(s.IsExpired());
3325
3326 s = db->Get(read_options, "Y", &value);
3327 ASSERT_TRUE(s.IsNotFound());
3328
3329 s = db->Get(read_options, "Z", &value);
3330 ASSERT_TRUE(s.IsNotFound());
3331
3332 delete txn1;
3333 delete txn2;
3334}
3335
3336TEST_P(TransactionTest, ReinitializeTest) {
3337 WriteOptions write_options;
3338 ReadOptions read_options;
3339 TransactionOptions txn_options;
11fdf7f2 3340 std::string value;
7c673cae
FG
3341 Status s;
3342
3343 // Set txn expiration timeout to 0 microseconds (expires instantly)
3344 txn_options.expiration = 0;
3345 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3346
3347 // Reinitialize transaction to no long expire
3348 txn_options.expiration = -1;
3349 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3350
3351 s = txn1->Put("Z", "z");
3352 ASSERT_OK(s);
3353
3354 // Should commit since not expired
3355 s = txn1->Commit();
3356 ASSERT_OK(s);
3357
3358 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3359
3360 s = txn1->Put("Z", "zz");
3361 ASSERT_OK(s);
3362
3363 // Reinitilize txn1 and verify that Z gets unlocked
3364 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3365
3366 Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
3367 s = txn2->Put("Z", "zzz");
3368 ASSERT_OK(s);
3369 s = txn2->Commit();
3370 ASSERT_OK(s);
3371 delete txn2;
3372
3373 s = db->Get(read_options, "Z", &value);
3374 ASSERT_OK(s);
3375 ASSERT_EQ(value, "zzz");
3376
3377 // Verify snapshots get reinitialized correctly
3378 txn1->SetSnapshot();
3379 s = txn1->Put("Z", "zzzz");
3380 ASSERT_OK(s);
3381
3382 s = txn1->Commit();
3383 ASSERT_OK(s);
3384
3385 s = db->Get(read_options, "Z", &value);
3386 ASSERT_OK(s);
3387 ASSERT_EQ(value, "zzzz");
3388
3389 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3390 const Snapshot* snapshot = txn1->GetSnapshot();
3391 ASSERT_FALSE(snapshot);
3392
3393 txn_options.set_snapshot = true;
3394 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3395 snapshot = txn1->GetSnapshot();
3396 ASSERT_TRUE(snapshot);
3397
3398 s = txn1->Put("Z", "a");
3399 ASSERT_OK(s);
3400
20effc67 3401 ASSERT_OK(txn1->Rollback());
7c673cae
FG
3402
3403 s = txn1->Put("Y", "y");
3404 ASSERT_OK(s);
3405
3406 txn_options.set_snapshot = false;
3407 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3408 snapshot = txn1->GetSnapshot();
3409 ASSERT_FALSE(snapshot);
3410
3411 s = txn1->Put("X", "x");
3412 ASSERT_OK(s);
3413
3414 s = txn1->Commit();
3415 ASSERT_OK(s);
3416
3417 s = db->Get(read_options, "Z", &value);
3418 ASSERT_OK(s);
3419 ASSERT_EQ(value, "zzzz");
3420
3421 s = db->Get(read_options, "Y", &value);
3422 ASSERT_TRUE(s.IsNotFound());
3423
3424 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3425
3426 s = txn1->SetName("name");
3427 ASSERT_OK(s);
3428
3429 s = txn1->Prepare();
3430 ASSERT_OK(s);
3431 s = txn1->Commit();
3432 ASSERT_OK(s);
3433
3434 txn1 = db->BeginTransaction(write_options, txn_options, txn1);
3435
3436 s = txn1->SetName("name");
3437 ASSERT_OK(s);
3438
3439 delete txn1;
3440}
3441
3442TEST_P(TransactionTest, Rollback) {
3443 WriteOptions write_options;
3444 ReadOptions read_options;
3445 TransactionOptions txn_options;
11fdf7f2 3446 std::string value;
7c673cae
FG
3447 Status s;
3448
3449 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3450
3451 ASSERT_OK(s);
3452
3453 s = txn1->Put("X", "1");
3454 ASSERT_OK(s);
3455
3456 Transaction* txn2 = db->BeginTransaction(write_options);
3457
3458 // txn2 should not be able to write to X since txn1 has it locked
3459 s = txn2->Put("X", "2");
3460 ASSERT_TRUE(s.IsTimedOut());
3461
20effc67 3462 ASSERT_OK(txn1->Rollback());
7c673cae
FG
3463 delete txn1;
3464
3465 // txn2 should now be able to write to X
3466 s = txn2->Put("X", "3");
3467 ASSERT_OK(s);
3468
3469 s = txn2->Commit();
3470 ASSERT_OK(s);
3471
3472 s = db->Get(read_options, "X", &value);
3473 ASSERT_OK(s);
3474 ASSERT_EQ("3", value);
3475
3476 delete txn2;
3477}
3478
3479TEST_P(TransactionTest, LockLimitTest) {
3480 WriteOptions write_options;
3481 ReadOptions read_options, snapshot_read_options;
3482 TransactionOptions txn_options;
3483 string value;
3484 Status s;
3485
3486 delete db;
11fdf7f2 3487 db = nullptr;
7c673cae
FG
3488
3489 // Open DB with a lock limit of 3
3490 txn_db_options.max_num_locks = 3;
f67539c2 3491 ASSERT_OK(ReOpen());
11fdf7f2 3492 assert(db != nullptr);
7c673cae
FG
3493 ASSERT_OK(s);
3494
3495 // Create a txn and verify we can only lock up to 3 keys
11fdf7f2 3496 Transaction* txn = db->BeginTransaction(write_options, txn_options);
7c673cae
FG
3497 ASSERT_TRUE(txn);
3498
3499 s = txn->Put("X", "x");
3500 ASSERT_OK(s);
3501
3502 s = txn->Put("Y", "y");
3503 ASSERT_OK(s);
3504
3505 s = txn->Put("Z", "z");
3506 ASSERT_OK(s);
3507
3508 // lock limit reached
3509 s = txn->Put("W", "w");
3510 ASSERT_TRUE(s.IsBusy());
3511
3512 // re-locking same key shouldn't put us over the limit
3513 s = txn->Put("X", "xx");
3514 ASSERT_OK(s);
3515
3516 s = txn->GetForUpdate(read_options, "W", &value);
3517 ASSERT_TRUE(s.IsBusy());
3518 s = txn->GetForUpdate(read_options, "V", &value);
3519 ASSERT_TRUE(s.IsBusy());
3520
3521 // re-locking same key shouldn't put us over the limit
3522 s = txn->GetForUpdate(read_options, "Y", &value);
3523 ASSERT_OK(s);
3524 ASSERT_EQ("y", value);
3525
3526 s = txn->Get(read_options, "W", &value);
3527 ASSERT_TRUE(s.IsNotFound());
3528
11fdf7f2 3529 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
7c673cae
FG
3530 ASSERT_TRUE(txn2);
3531
3532 // "X" currently locked
3533 s = txn2->Put("X", "x");
3534 ASSERT_TRUE(s.IsTimedOut());
3535
3536 // lock limit reached
3537 s = txn2->Put("M", "m");
3538 ASSERT_TRUE(s.IsBusy());
3539
3540 s = txn->Commit();
3541 ASSERT_OK(s);
3542
3543 s = db->Get(read_options, "X", &value);
3544 ASSERT_OK(s);
3545 ASSERT_EQ("xx", value);
3546
3547 s = db->Get(read_options, "W", &value);
3548 ASSERT_TRUE(s.IsNotFound());
3549
3550 // Committing txn should release its locks and allow txn2 to proceed
3551 s = txn2->Put("X", "x2");
3552 ASSERT_OK(s);
3553
3554 s = txn2->Delete("X");
3555 ASSERT_OK(s);
3556
3557 s = txn2->Put("M", "m");
3558 ASSERT_OK(s);
3559
3560 s = txn2->Put("Z", "z2");
3561 ASSERT_OK(s);
3562
3563 // lock limit reached
3564 s = txn2->Delete("Y");
3565 ASSERT_TRUE(s.IsBusy());
3566
3567 s = txn2->Commit();
3568 ASSERT_OK(s);
3569
3570 s = db->Get(read_options, "Z", &value);
3571 ASSERT_OK(s);
3572 ASSERT_EQ("z2", value);
3573
3574 s = db->Get(read_options, "Y", &value);
3575 ASSERT_OK(s);
3576 ASSERT_EQ("y", value);
3577
3578 s = db->Get(read_options, "X", &value);
3579 ASSERT_TRUE(s.IsNotFound());
3580
3581 delete txn;
3582 delete txn2;
3583}
3584
3585TEST_P(TransactionTest, IteratorTest) {
f67539c2
TL
3586 // This test does writes without snapshot validation, and then tries to create
3587 // iterator later, which is unsupported in write unprepared.
3588 if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3589 return;
3590 }
3591
7c673cae
FG
3592 WriteOptions write_options;
3593 ReadOptions read_options, snapshot_read_options;
11fdf7f2 3594 std::string value;
7c673cae
FG
3595 Status s;
3596
3597 // Write some keys to the db
3598 s = db->Put(write_options, "A", "a");
3599 ASSERT_OK(s);
3600
3601 s = db->Put(write_options, "G", "g");
3602 ASSERT_OK(s);
3603
3604 s = db->Put(write_options, "F", "f");
3605 ASSERT_OK(s);
3606
3607 s = db->Put(write_options, "C", "c");
3608 ASSERT_OK(s);
3609
3610 s = db->Put(write_options, "D", "d");
3611 ASSERT_OK(s);
3612
3613 Transaction* txn = db->BeginTransaction(write_options);
3614 ASSERT_TRUE(txn);
3615
3616 // Write some keys in a txn
3617 s = txn->Put("B", "b");
3618 ASSERT_OK(s);
3619
3620 s = txn->Put("H", "h");
3621 ASSERT_OK(s);
3622
3623 s = txn->Delete("D");
3624 ASSERT_OK(s);
3625
3626 s = txn->Put("E", "e");
3627 ASSERT_OK(s);
3628
3629 txn->SetSnapshot();
3630 const Snapshot* snapshot = txn->GetSnapshot();
3631
3632 // Write some keys to the db after the snapshot
3633 s = db->Put(write_options, "BB", "xx");
3634 ASSERT_OK(s);
3635
3636 s = db->Put(write_options, "C", "xx");
3637 ASSERT_OK(s);
3638
3639 read_options.snapshot = snapshot;
3640 Iterator* iter = txn->GetIterator(read_options);
3641 ASSERT_OK(iter->status());
3642 iter->SeekToFirst();
3643
3644 // Read all keys via iter and lock them all
3645 std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
3646 for (int i = 0; i < 7; i++) {
3647 ASSERT_OK(iter->status());
3648 ASSERT_TRUE(iter->Valid());
3649 ASSERT_EQ(results[i], iter->value().ToString());
3650
3651 s = txn->GetForUpdate(read_options, iter->key(), nullptr);
3652 if (i == 2) {
3653 // "C" was modified after txn's snapshot
3654 ASSERT_TRUE(s.IsBusy());
3655 } else {
3656 ASSERT_OK(s);
3657 }
3658
3659 iter->Next();
3660 }
3661 ASSERT_FALSE(iter->Valid());
3662
3663 iter->Seek("G");
3664 ASSERT_OK(iter->status());
3665 ASSERT_TRUE(iter->Valid());
3666 ASSERT_EQ("g", iter->value().ToString());
3667
3668 iter->Prev();
3669 ASSERT_OK(iter->status());
3670 ASSERT_TRUE(iter->Valid());
3671 ASSERT_EQ("f", iter->value().ToString());
3672
3673 iter->Seek("D");
3674 ASSERT_OK(iter->status());
3675 ASSERT_TRUE(iter->Valid());
3676 ASSERT_EQ("e", iter->value().ToString());
3677
3678 iter->Seek("C");
3679 ASSERT_OK(iter->status());
3680 ASSERT_TRUE(iter->Valid());
3681 ASSERT_EQ("c", iter->value().ToString());
3682
3683 iter->Next();
3684 ASSERT_OK(iter->status());
3685 ASSERT_TRUE(iter->Valid());
3686 ASSERT_EQ("e", iter->value().ToString());
3687
3688 iter->Seek("");
3689 ASSERT_OK(iter->status());
3690 ASSERT_TRUE(iter->Valid());
3691 ASSERT_EQ("a", iter->value().ToString());
3692
3693 iter->Seek("X");
3694 ASSERT_OK(iter->status());
3695 ASSERT_FALSE(iter->Valid());
3696
3697 iter->SeekToLast();
3698 ASSERT_OK(iter->status());
3699 ASSERT_TRUE(iter->Valid());
3700 ASSERT_EQ("h", iter->value().ToString());
3701
3702 s = txn->Commit();
3703 ASSERT_OK(s);
3704
3705 delete iter;
3706 delete txn;
3707}
3708
3709TEST_P(TransactionTest, DisableIndexingTest) {
f67539c2
TL
3710 // Skip this test for write unprepared. It does not solely rely on WBWI for
3711 // read your own writes, so depending on whether batches are flushed or not,
3712 // only some writes will be visible.
3713 //
3714 // Also, write unprepared does not support creating iterators if there has
3715 // been txn->Put() without snapshot validation.
3716 if (txn_db_options.write_policy == WRITE_UNPREPARED) {
3717 return;
3718 }
3719
7c673cae
FG
3720 WriteOptions write_options;
3721 ReadOptions read_options;
11fdf7f2 3722 std::string value;
7c673cae
FG
3723 Status s;
3724
3725 Transaction* txn = db->BeginTransaction(write_options);
3726 ASSERT_TRUE(txn);
3727
3728 s = txn->Put("A", "a");
3729 ASSERT_OK(s);
3730
3731 s = txn->Get(read_options, "A", &value);
3732 ASSERT_OK(s);
3733 ASSERT_EQ("a", value);
3734
3735 txn->DisableIndexing();
3736
3737 s = txn->Put("B", "b");
3738 ASSERT_OK(s);
3739
3740 s = txn->Get(read_options, "B", &value);
3741 ASSERT_TRUE(s.IsNotFound());
3742
3743 Iterator* iter = txn->GetIterator(read_options);
3744 ASSERT_OK(iter->status());
3745
3746 iter->Seek("B");
3747 ASSERT_OK(iter->status());
3748 ASSERT_FALSE(iter->Valid());
3749
3750 s = txn->Delete("A");
3751
3752 s = txn->Get(read_options, "A", &value);
3753 ASSERT_OK(s);
3754 ASSERT_EQ("a", value);
3755
3756 txn->EnableIndexing();
3757
3758 s = txn->Put("B", "bb");
3759 ASSERT_OK(s);
3760
3761 iter->Seek("B");
3762 ASSERT_OK(iter->status());
3763 ASSERT_TRUE(iter->Valid());
3764 ASSERT_EQ("bb", iter->value().ToString());
3765
3766 s = txn->Get(read_options, "B", &value);
3767 ASSERT_OK(s);
3768 ASSERT_EQ("bb", value);
3769
3770 s = txn->Put("A", "aa");
3771 ASSERT_OK(s);
3772
3773 s = txn->Get(read_options, "A", &value);
3774 ASSERT_OK(s);
3775 ASSERT_EQ("aa", value);
3776
3777 delete iter;
3778 delete txn;
3779}
3780
3781TEST_P(TransactionTest, SavepointTest) {
3782 WriteOptions write_options;
3783 ReadOptions read_options, snapshot_read_options;
11fdf7f2 3784 std::string value;
7c673cae
FG
3785 Status s;
3786
3787 Transaction* txn = db->BeginTransaction(write_options);
3788 ASSERT_TRUE(txn);
3789
3790 ASSERT_EQ(0, txn->GetNumPuts());
3791
3792 s = txn->RollbackToSavePoint();
3793 ASSERT_TRUE(s.IsNotFound());
3794
3795 txn->SetSavePoint(); // 1
3796
3797 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
3798 s = txn->RollbackToSavePoint();
3799 ASSERT_TRUE(s.IsNotFound());
3800
3801 s = txn->Put("B", "b");
3802 ASSERT_OK(s);
3803
3804 ASSERT_EQ(1, txn->GetNumPuts());
3805 ASSERT_EQ(0, txn->GetNumDeletes());
3806
3807 s = txn->Commit();
3808 ASSERT_OK(s);
3809
3810 s = db->Get(read_options, "B", &value);
3811 ASSERT_OK(s);
3812 ASSERT_EQ("b", value);
3813
3814 delete txn;
3815 txn = db->BeginTransaction(write_options);
3816 ASSERT_TRUE(txn);
3817
3818 s = txn->Put("A", "a");
3819 ASSERT_OK(s);
3820
3821 s = txn->Put("B", "bb");
3822 ASSERT_OK(s);
3823
3824 s = txn->Put("C", "c");
3825 ASSERT_OK(s);
3826
3827 txn->SetSavePoint(); // 2
3828
3829 s = txn->Delete("B");
3830 ASSERT_OK(s);
3831
3832 s = txn->Put("C", "cc");
3833 ASSERT_OK(s);
3834
3835 s = txn->Put("D", "d");
3836 ASSERT_OK(s);
3837
3838 ASSERT_EQ(5, txn->GetNumPuts());
3839 ASSERT_EQ(1, txn->GetNumDeletes());
3840
3841 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
3842
3843 ASSERT_EQ(3, txn->GetNumPuts());
3844 ASSERT_EQ(0, txn->GetNumDeletes());
3845
3846 s = txn->Get(read_options, "A", &value);
3847 ASSERT_OK(s);
3848 ASSERT_EQ("a", value);
3849
3850 s = txn->Get(read_options, "B", &value);
3851 ASSERT_OK(s);
3852 ASSERT_EQ("bb", value);
3853
3854 s = txn->Get(read_options, "C", &value);
3855 ASSERT_OK(s);
3856 ASSERT_EQ("c", value);
3857
3858 s = txn->Get(read_options, "D", &value);
3859 ASSERT_TRUE(s.IsNotFound());
3860
3861 s = txn->Put("A", "a");
3862 ASSERT_OK(s);
3863
3864 s = txn->Put("E", "e");
3865 ASSERT_OK(s);
3866
3867 ASSERT_EQ(5, txn->GetNumPuts());
3868 ASSERT_EQ(0, txn->GetNumDeletes());
3869
3870 // Rollback to beginning of txn
3871 s = txn->RollbackToSavePoint();
3872 ASSERT_TRUE(s.IsNotFound());
20effc67 3873 ASSERT_OK(txn->Rollback());
7c673cae
FG
3874
3875 ASSERT_EQ(0, txn->GetNumPuts());
3876 ASSERT_EQ(0, txn->GetNumDeletes());
3877
3878 s = txn->Get(read_options, "A", &value);
3879 ASSERT_TRUE(s.IsNotFound());
3880
3881 s = txn->Get(read_options, "B", &value);
3882 ASSERT_OK(s);
3883 ASSERT_EQ("b", value);
3884
3885 s = txn->Get(read_options, "D", &value);
3886 ASSERT_TRUE(s.IsNotFound());
3887
3888 s = txn->Get(read_options, "D", &value);
3889 ASSERT_TRUE(s.IsNotFound());
3890
3891 s = txn->Get(read_options, "E", &value);
3892 ASSERT_TRUE(s.IsNotFound());
3893
3894 s = txn->Put("A", "aa");
3895 ASSERT_OK(s);
3896
3897 s = txn->Put("F", "f");
3898 ASSERT_OK(s);
3899
3900 ASSERT_EQ(2, txn->GetNumPuts());
3901 ASSERT_EQ(0, txn->GetNumDeletes());
3902
3903 txn->SetSavePoint(); // 3
3904 txn->SetSavePoint(); // 4
3905
3906 s = txn->Put("G", "g");
3907 ASSERT_OK(s);
3908
3909 s = txn->SingleDelete("F");
3910 ASSERT_OK(s);
3911
3912 s = txn->Delete("B");
3913 ASSERT_OK(s);
3914
3915 s = txn->Get(read_options, "A", &value);
3916 ASSERT_OK(s);
3917 ASSERT_EQ("aa", value);
3918
3919 s = txn->Get(read_options, "F", &value);
3920 // According to db.h, doing a SingleDelete on a key that has been
3921 // overwritten will have undefinied behavior. So it is unclear what the
3922 // result of fetching "F" should be. The current implementation will
3923 // return NotFound in this case.
3924 ASSERT_TRUE(s.IsNotFound());
3925
3926 s = txn->Get(read_options, "B", &value);
3927 ASSERT_TRUE(s.IsNotFound());
3928
3929 ASSERT_EQ(3, txn->GetNumPuts());
3930 ASSERT_EQ(2, txn->GetNumDeletes());
3931
3932 ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
3933
3934 ASSERT_EQ(2, txn->GetNumPuts());
3935 ASSERT_EQ(0, txn->GetNumDeletes());
3936
3937 s = txn->Get(read_options, "F", &value);
3938 ASSERT_OK(s);
3939 ASSERT_EQ("f", value);
3940
3941 s = txn->Get(read_options, "G", &value);
3942 ASSERT_TRUE(s.IsNotFound());
3943
3944 s = txn->Commit();
3945 ASSERT_OK(s);
3946
3947 s = db->Get(read_options, "F", &value);
3948 ASSERT_OK(s);
3949 ASSERT_EQ("f", value);
3950
3951 s = db->Get(read_options, "G", &value);
3952 ASSERT_TRUE(s.IsNotFound());
3953
3954 s = db->Get(read_options, "A", &value);
3955 ASSERT_OK(s);
3956 ASSERT_EQ("aa", value);
3957
3958 s = db->Get(read_options, "B", &value);
3959 ASSERT_OK(s);
3960 ASSERT_EQ("b", value);
3961
3962 s = db->Get(read_options, "C", &value);
3963 ASSERT_TRUE(s.IsNotFound());
3964
3965 s = db->Get(read_options, "D", &value);
3966 ASSERT_TRUE(s.IsNotFound());
3967
3968 s = db->Get(read_options, "E", &value);
3969 ASSERT_TRUE(s.IsNotFound());
3970
3971 delete txn;
3972}
3973
3974TEST_P(TransactionTest, SavepointTest2) {
3975 WriteOptions write_options;
3976 ReadOptions read_options;
3977 TransactionOptions txn_options;
3978 Status s;
3979
3980 txn_options.lock_timeout = 1; // 1 ms
3981 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
3982 ASSERT_TRUE(txn1);
3983
3984 s = txn1->Put("A", "");
3985 ASSERT_OK(s);
3986
3987 txn1->SetSavePoint(); // 1
3988
3989 s = txn1->Put("A", "a");
3990 ASSERT_OK(s);
3991
3992 s = txn1->Put("C", "c");
3993 ASSERT_OK(s);
3994
3995 txn1->SetSavePoint(); // 2
3996
3997 s = txn1->Put("A", "a");
3998 ASSERT_OK(s);
3999 s = txn1->Put("B", "b");
4000 ASSERT_OK(s);
4001
4002 ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2
4003
4004 // Verify that "A" and "C" is still locked while "B" is not
4005 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4006 ASSERT_TRUE(txn2);
4007
4008 s = txn2->Put("A", "a2");
4009 ASSERT_TRUE(s.IsTimedOut());
4010 s = txn2->Put("C", "c2");
4011 ASSERT_TRUE(s.IsTimedOut());
4012 s = txn2->Put("B", "b2");
4013 ASSERT_OK(s);
4014
4015 s = txn1->Put("A", "aa");
4016 ASSERT_OK(s);
4017 s = txn1->Put("B", "bb");
4018 ASSERT_TRUE(s.IsTimedOut());
4019
4020 s = txn2->Commit();
4021 ASSERT_OK(s);
4022 delete txn2;
4023
4024 s = txn1->Put("A", "aaa");
4025 ASSERT_OK(s);
4026 s = txn1->Put("B", "bbb");
4027 ASSERT_OK(s);
4028 s = txn1->Put("C", "ccc");
4029 ASSERT_OK(s);
4030
4031 txn1->SetSavePoint(); // 3
4032 ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3
4033
4034 // Verify that "A", "B", "C" are still locked
4035 txn2 = db->BeginTransaction(write_options, txn_options);
4036 ASSERT_TRUE(txn2);
4037
4038 s = txn2->Put("A", "a2");
4039 ASSERT_TRUE(s.IsTimedOut());
4040 s = txn2->Put("B", "b2");
4041 ASSERT_TRUE(s.IsTimedOut());
4042 s = txn2->Put("C", "c2");
4043 ASSERT_TRUE(s.IsTimedOut());
4044
4045 ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
4046
4047 // Verify that only "A" is locked
4048 s = txn2->Put("A", "a3");
4049 ASSERT_TRUE(s.IsTimedOut());
4050 s = txn2->Put("B", "b3");
4051 ASSERT_OK(s);
4052 s = txn2->Put("C", "c3po");
4053 ASSERT_OK(s);
4054
4055 s = txn1->Commit();
4056 ASSERT_OK(s);
4057 delete txn1;
4058
4059 // Verify "A" "C" "B" are no longer locked
4060 s = txn2->Put("A", "a4");
4061 ASSERT_OK(s);
4062 s = txn2->Put("B", "b4");
4063 ASSERT_OK(s);
4064 s = txn2->Put("C", "c4");
4065 ASSERT_OK(s);
4066
4067 s = txn2->Commit();
4068 ASSERT_OK(s);
4069 delete txn2;
4070}
4071
11fdf7f2
TL
4072TEST_P(TransactionTest, SavepointTest3) {
4073 WriteOptions write_options;
4074 ReadOptions read_options;
4075 TransactionOptions txn_options;
4076 Status s;
4077
4078 txn_options.lock_timeout = 1; // 1 ms
4079 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4080 ASSERT_TRUE(txn1);
4081
4082 s = txn1->PopSavePoint(); // No SavePoint present
4083 ASSERT_TRUE(s.IsNotFound());
4084
4085 s = txn1->Put("A", "");
4086 ASSERT_OK(s);
4087
4088 s = txn1->PopSavePoint(); // Still no SavePoint present
4089 ASSERT_TRUE(s.IsNotFound());
4090
4091 txn1->SetSavePoint(); // 1
4092
4093 s = txn1->Put("A", "a");
4094 ASSERT_OK(s);
4095
4096 s = txn1->PopSavePoint(); // Remove 1
4097 ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
4098
4099 // Verify that "A" is still locked
4100 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4101 ASSERT_TRUE(txn2);
4102
4103 s = txn2->Put("A", "a2");
4104 ASSERT_TRUE(s.IsTimedOut());
4105 delete txn2;
4106
4107 txn1->SetSavePoint(); // 2
4108
4109 s = txn1->Put("B", "b");
4110 ASSERT_OK(s);
4111
4112 txn1->SetSavePoint(); // 3
4113
4114 s = txn1->Put("B", "b2");
4115 ASSERT_OK(s);
4116
4117 ASSERT_OK(txn1->RollbackToSavePoint()); // Roll back to 2
4118
4119 s = txn1->PopSavePoint();
4120 ASSERT_OK(s);
4121
4122 s = txn1->PopSavePoint();
4123 ASSERT_TRUE(s.IsNotFound());
4124
4125 s = txn1->Commit();
4126 ASSERT_OK(s);
4127 delete txn1;
4128
4129 std::string value;
4130
4131 // tnx1 should have modified "A" to "a"
4132 s = db->Get(read_options, "A", &value);
4133 ASSERT_OK(s);
4134 ASSERT_EQ("a", value);
4135
4136 // tnx1 should have set "B" to just "b"
4137 s = db->Get(read_options, "B", &value);
4138 ASSERT_OK(s);
4139 ASSERT_EQ("b", value);
4140
4141 s = db->Get(read_options, "C", &value);
4142 ASSERT_TRUE(s.IsNotFound());
4143}
4144
f67539c2
TL
4145TEST_P(TransactionTest, SavepointTest4) {
4146 WriteOptions write_options;
4147 ReadOptions read_options;
4148 TransactionOptions txn_options;
4149 Status s;
4150
4151 txn_options.lock_timeout = 1; // 1 ms
4152 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4153 ASSERT_TRUE(txn1);
4154
4155 txn1->SetSavePoint(); // 1
4156 s = txn1->Put("A", "a");
4157 ASSERT_OK(s);
4158
4159 txn1->SetSavePoint(); // 2
4160 s = txn1->Put("B", "b");
4161 ASSERT_OK(s);
4162
4163 s = txn1->PopSavePoint(); // Remove 2
4164 ASSERT_OK(s);
4165
4166 // Verify that A/B still exists.
4167 std::string value;
4168 ASSERT_OK(txn1->Get(read_options, "A", &value));
4169 ASSERT_EQ("a", value);
4170
4171 ASSERT_OK(txn1->Get(read_options, "B", &value));
4172 ASSERT_EQ("b", value);
4173
4174 ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
4175
4176 // Verify that everything was rolled back.
4177 s = txn1->Get(read_options, "A", &value);
4178 ASSERT_TRUE(s.IsNotFound());
4179
4180 s = txn1->Get(read_options, "B", &value);
4181 ASSERT_TRUE(s.IsNotFound());
4182
4183 // Nothing should be locked
4184 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4185 ASSERT_TRUE(txn2);
4186
4187 s = txn2->Put("A", "");
4188 ASSERT_OK(s);
4189
4190 s = txn2->Put("B", "");
4191 ASSERT_OK(s);
4192
4193 delete txn2;
4194 delete txn1;
4195}
4196
7c673cae
FG
4197TEST_P(TransactionTest, UndoGetForUpdateTest) {
4198 WriteOptions write_options;
4199 ReadOptions read_options;
4200 TransactionOptions txn_options;
11fdf7f2 4201 std::string value;
7c673cae
FG
4202 Status s;
4203
4204 txn_options.lock_timeout = 1; // 1 ms
4205 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4206 ASSERT_TRUE(txn1);
4207
4208 txn1->UndoGetForUpdate("A");
4209
4210 s = txn1->Commit();
4211 ASSERT_OK(s);
4212 delete txn1;
4213
4214 txn1 = db->BeginTransaction(write_options, txn_options);
4215
4216 txn1->UndoGetForUpdate("A");
4217 s = txn1->GetForUpdate(read_options, "A", &value);
4218 ASSERT_TRUE(s.IsNotFound());
4219
4220 // Verify that A is locked
4221 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4222 s = txn2->Put("A", "a");
4223 ASSERT_TRUE(s.IsTimedOut());
4224
4225 txn1->UndoGetForUpdate("A");
4226
4227 // Verify that A is now unlocked
4228 s = txn2->Put("A", "a2");
4229 ASSERT_OK(s);
20effc67 4230 ASSERT_OK(txn2->Commit());
7c673cae
FG
4231 delete txn2;
4232 s = db->Get(read_options, "A", &value);
4233 ASSERT_OK(s);
4234 ASSERT_EQ("a2", value);
4235
4236 s = txn1->Delete("A");
4237 ASSERT_OK(s);
4238 s = txn1->GetForUpdate(read_options, "A", &value);
4239 ASSERT_TRUE(s.IsNotFound());
4240
4241 s = txn1->Put("B", "b3");
4242 ASSERT_OK(s);
4243 s = txn1->GetForUpdate(read_options, "B", &value);
4244 ASSERT_OK(s);
4245
4246 txn1->UndoGetForUpdate("A");
4247 txn1->UndoGetForUpdate("B");
4248
4249 // Verify that A and B are still locked
4250 txn2 = db->BeginTransaction(write_options, txn_options);
4251 s = txn2->Put("A", "a4");
4252 ASSERT_TRUE(s.IsTimedOut());
4253 s = txn2->Put("B", "b4");
4254 ASSERT_TRUE(s.IsTimedOut());
4255
20effc67 4256 ASSERT_OK(txn1->Rollback());
7c673cae
FG
4257 delete txn1;
4258
4259 // Verify that A and B are no longer locked
4260 s = txn2->Put("A", "a5");
4261 ASSERT_OK(s);
4262 s = txn2->Put("B", "b5");
4263 ASSERT_OK(s);
4264 s = txn2->Commit();
4265 delete txn2;
4266 ASSERT_OK(s);
4267
4268 txn1 = db->BeginTransaction(write_options, txn_options);
4269
4270 s = txn1->GetForUpdate(read_options, "A", &value);
4271 ASSERT_OK(s);
4272 s = txn1->GetForUpdate(read_options, "A", &value);
4273 ASSERT_OK(s);
4274 s = txn1->GetForUpdate(read_options, "C", &value);
4275 ASSERT_TRUE(s.IsNotFound());
4276 s = txn1->GetForUpdate(read_options, "A", &value);
4277 ASSERT_OK(s);
4278 s = txn1->GetForUpdate(read_options, "C", &value);
4279 ASSERT_TRUE(s.IsNotFound());
4280 s = txn1->GetForUpdate(read_options, "B", &value);
4281 ASSERT_OK(s);
4282 s = txn1->Put("B", "b5");
4283 s = txn1->GetForUpdate(read_options, "B", &value);
4284 ASSERT_OK(s);
4285
4286 txn1->UndoGetForUpdate("A");
4287 txn1->UndoGetForUpdate("B");
4288 txn1->UndoGetForUpdate("C");
4289 txn1->UndoGetForUpdate("X");
4290
4291 // Verify A,B,C are locked
4292 txn2 = db->BeginTransaction(write_options, txn_options);
4293 s = txn2->Put("A", "a6");
4294 ASSERT_TRUE(s.IsTimedOut());
4295 s = txn2->Delete("B");
4296 ASSERT_TRUE(s.IsTimedOut());
4297 s = txn2->Put("C", "c6");
4298 ASSERT_TRUE(s.IsTimedOut());
4299 s = txn2->Put("X", "x6");
4300 ASSERT_OK(s);
4301
4302 txn1->UndoGetForUpdate("A");
4303 txn1->UndoGetForUpdate("B");
4304 txn1->UndoGetForUpdate("C");
4305 txn1->UndoGetForUpdate("X");
4306
4307 // Verify A,B are locked and C is not
4308 s = txn2->Put("A", "a6");
4309 ASSERT_TRUE(s.IsTimedOut());
4310 s = txn2->Delete("B");
4311 ASSERT_TRUE(s.IsTimedOut());
4312 s = txn2->Put("C", "c6");
4313 ASSERT_OK(s);
4314 s = txn2->Put("X", "x6");
4315 ASSERT_OK(s);
4316
4317 txn1->UndoGetForUpdate("A");
4318 txn1->UndoGetForUpdate("B");
4319 txn1->UndoGetForUpdate("C");
4320 txn1->UndoGetForUpdate("X");
4321
4322 // Verify B is locked and A and C are not
4323 s = txn2->Put("A", "a7");
4324 ASSERT_OK(s);
4325 s = txn2->Delete("B");
4326 ASSERT_TRUE(s.IsTimedOut());
4327 s = txn2->Put("C", "c7");
4328 ASSERT_OK(s);
4329 s = txn2->Put("X", "x7");
4330 ASSERT_OK(s);
4331
4332 s = txn2->Commit();
4333 ASSERT_OK(s);
4334 delete txn2;
4335
4336 s = txn1->Commit();
4337 ASSERT_OK(s);
4338 delete txn1;
4339}
4340
4341TEST_P(TransactionTest, UndoGetForUpdateTest2) {
4342 WriteOptions write_options;
4343 ReadOptions read_options;
4344 TransactionOptions txn_options;
11fdf7f2 4345 std::string value;
7c673cae
FG
4346 Status s;
4347
4348 s = db->Put(write_options, "A", "");
4349 ASSERT_OK(s);
4350
4351 txn_options.lock_timeout = 1; // 1 ms
4352 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
4353 ASSERT_TRUE(txn1);
4354
4355 s = txn1->GetForUpdate(read_options, "A", &value);
4356 ASSERT_OK(s);
4357 s = txn1->GetForUpdate(read_options, "B", &value);
4358 ASSERT_TRUE(s.IsNotFound());
4359
4360 s = txn1->Put("F", "f");
4361 ASSERT_OK(s);
4362
4363 txn1->SetSavePoint(); // 1
4364
4365 txn1->UndoGetForUpdate("A");
4366
4367 s = txn1->GetForUpdate(read_options, "C", &value);
4368 ASSERT_TRUE(s.IsNotFound());
4369 s = txn1->GetForUpdate(read_options, "D", &value);
4370 ASSERT_TRUE(s.IsNotFound());
4371
4372 s = txn1->Put("E", "e");
4373 ASSERT_OK(s);
4374 s = txn1->GetForUpdate(read_options, "E", &value);
4375 ASSERT_OK(s);
4376
4377 s = txn1->GetForUpdate(read_options, "F", &value);
4378 ASSERT_OK(s);
4379
4380 // Verify A,B,C,D,E,F are still locked
4381 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4382 s = txn2->Put("A", "a1");
4383 ASSERT_TRUE(s.IsTimedOut());
4384 s = txn2->Put("B", "b1");
4385 ASSERT_TRUE(s.IsTimedOut());
4386 s = txn2->Put("C", "c1");
4387 ASSERT_TRUE(s.IsTimedOut());
4388 s = txn2->Put("D", "d1");
4389 ASSERT_TRUE(s.IsTimedOut());
4390 s = txn2->Put("E", "e1");
4391 ASSERT_TRUE(s.IsTimedOut());
4392 s = txn2->Put("F", "f1");
4393 ASSERT_TRUE(s.IsTimedOut());
4394
4395 txn1->UndoGetForUpdate("C");
4396 txn1->UndoGetForUpdate("E");
4397
4398 // Verify A,B,D,E,F are still locked and C is not.
4399 s = txn2->Put("A", "a2");
4400 ASSERT_TRUE(s.IsTimedOut());
4401 s = txn2->Put("B", "b2");
4402 ASSERT_TRUE(s.IsTimedOut());
4403 s = txn2->Put("D", "d2");
4404 ASSERT_TRUE(s.IsTimedOut());
4405 s = txn2->Put("E", "e2");
4406 ASSERT_TRUE(s.IsTimedOut());
4407 s = txn2->Put("F", "f2");
4408 ASSERT_TRUE(s.IsTimedOut());
4409 s = txn2->Put("C", "c2");
4410 ASSERT_OK(s);
4411
4412 txn1->SetSavePoint(); // 2
4413
4414 s = txn1->Put("H", "h");
4415 ASSERT_OK(s);
4416
4417 txn1->UndoGetForUpdate("A");
4418 txn1->UndoGetForUpdate("B");
4419 txn1->UndoGetForUpdate("C");
4420 txn1->UndoGetForUpdate("D");
4421 txn1->UndoGetForUpdate("E");
4422 txn1->UndoGetForUpdate("F");
4423 txn1->UndoGetForUpdate("G");
4424 txn1->UndoGetForUpdate("H");
4425
4426 // Verify A,B,D,E,F,H are still locked and C,G are not.
4427 s = txn2->Put("A", "a3");
4428 ASSERT_TRUE(s.IsTimedOut());
4429 s = txn2->Put("B", "b3");
4430 ASSERT_TRUE(s.IsTimedOut());
4431 s = txn2->Put("D", "d3");
4432 ASSERT_TRUE(s.IsTimedOut());
4433 s = txn2->Put("E", "e3");
4434 ASSERT_TRUE(s.IsTimedOut());
4435 s = txn2->Put("F", "f3");
4436 ASSERT_TRUE(s.IsTimedOut());
4437 s = txn2->Put("H", "h3");
4438 ASSERT_TRUE(s.IsTimedOut());
4439 s = txn2->Put("C", "c3");
4440 ASSERT_OK(s);
4441 s = txn2->Put("G", "g3");
4442 ASSERT_OK(s);
4443
20effc67 4444 ASSERT_OK(txn1->RollbackToSavePoint()); // rollback to 2
7c673cae
FG
4445
4446 // Verify A,B,D,E,F are still locked and C,G,H are not.
4447 s = txn2->Put("A", "a3");
4448 ASSERT_TRUE(s.IsTimedOut());
4449 s = txn2->Put("B", "b3");
4450 ASSERT_TRUE(s.IsTimedOut());
4451 s = txn2->Put("D", "d3");
4452 ASSERT_TRUE(s.IsTimedOut());
4453 s = txn2->Put("E", "e3");
4454 ASSERT_TRUE(s.IsTimedOut());
4455 s = txn2->Put("F", "f3");
4456 ASSERT_TRUE(s.IsTimedOut());
4457 s = txn2->Put("C", "c3");
4458 ASSERT_OK(s);
4459 s = txn2->Put("G", "g3");
4460 ASSERT_OK(s);
4461 s = txn2->Put("H", "h3");
4462 ASSERT_OK(s);
4463
4464 txn1->UndoGetForUpdate("A");
4465 txn1->UndoGetForUpdate("B");
4466 txn1->UndoGetForUpdate("C");
4467 txn1->UndoGetForUpdate("D");
4468 txn1->UndoGetForUpdate("E");
4469 txn1->UndoGetForUpdate("F");
4470 txn1->UndoGetForUpdate("G");
4471 txn1->UndoGetForUpdate("H");
4472
4473 // Verify A,B,E,F are still locked and C,D,G,H are not.
4474 s = txn2->Put("A", "a3");
4475 ASSERT_TRUE(s.IsTimedOut());
4476 s = txn2->Put("B", "b3");
4477 ASSERT_TRUE(s.IsTimedOut());
4478 s = txn2->Put("E", "e3");
4479 ASSERT_TRUE(s.IsTimedOut());
4480 s = txn2->Put("F", "f3");
4481 ASSERT_TRUE(s.IsTimedOut());
4482 s = txn2->Put("C", "c3");
4483 ASSERT_OK(s);
4484 s = txn2->Put("D", "d3");
4485 ASSERT_OK(s);
4486 s = txn2->Put("G", "g3");
4487 ASSERT_OK(s);
4488 s = txn2->Put("H", "h3");
4489 ASSERT_OK(s);
4490
20effc67 4491 ASSERT_OK(txn1->RollbackToSavePoint()); // rollback to 1
7c673cae
FG
4492
4493 // Verify A,B,F are still locked and C,D,E,G,H are not.
4494 s = txn2->Put("A", "a3");
4495 ASSERT_TRUE(s.IsTimedOut());
4496 s = txn2->Put("B", "b3");
4497 ASSERT_TRUE(s.IsTimedOut());
4498 s = txn2->Put("F", "f3");
4499 ASSERT_TRUE(s.IsTimedOut());
4500 s = txn2->Put("C", "c3");
4501 ASSERT_OK(s);
4502 s = txn2->Put("D", "d3");
4503 ASSERT_OK(s);
4504 s = txn2->Put("E", "e3");
4505 ASSERT_OK(s);
4506 s = txn2->Put("G", "g3");
4507 ASSERT_OK(s);
4508 s = txn2->Put("H", "h3");
4509 ASSERT_OK(s);
4510
4511 txn1->UndoGetForUpdate("A");
4512 txn1->UndoGetForUpdate("B");
4513 txn1->UndoGetForUpdate("C");
4514 txn1->UndoGetForUpdate("D");
4515 txn1->UndoGetForUpdate("E");
4516 txn1->UndoGetForUpdate("F");
4517 txn1->UndoGetForUpdate("G");
4518 txn1->UndoGetForUpdate("H");
4519
4520 // Verify F is still locked and A,B,C,D,E,G,H are not.
4521 s = txn2->Put("F", "f3");
4522 ASSERT_TRUE(s.IsTimedOut());
4523 s = txn2->Put("A", "a3");
4524 ASSERT_OK(s);
4525 s = txn2->Put("B", "b3");
4526 ASSERT_OK(s);
4527 s = txn2->Put("C", "c3");
4528 ASSERT_OK(s);
4529 s = txn2->Put("D", "d3");
4530 ASSERT_OK(s);
4531 s = txn2->Put("E", "e3");
4532 ASSERT_OK(s);
4533 s = txn2->Put("G", "g3");
4534 ASSERT_OK(s);
4535 s = txn2->Put("H", "h3");
4536 ASSERT_OK(s);
4537
4538 s = txn1->Commit();
4539 ASSERT_OK(s);
4540 s = txn2->Commit();
4541 ASSERT_OK(s);
4542
4543 delete txn1;
4544 delete txn2;
4545}
4546
4547TEST_P(TransactionTest, TimeoutTest) {
4548 WriteOptions write_options;
4549 ReadOptions read_options;
11fdf7f2 4550 std::string value;
7c673cae
FG
4551 Status s;
4552
4553 delete db;
11fdf7f2 4554 db = nullptr;
7c673cae
FG
4555
4556 // transaction writes have an infinite timeout,
4557 // but we will override this when we start a txn
4558 // db writes have infinite timeout
4559 txn_db_options.transaction_lock_timeout = -1;
4560 txn_db_options.default_lock_timeout = -1;
4561
4562 s = TransactionDB::Open(options, txn_db_options, dbname, &db);
11fdf7f2 4563 assert(db != nullptr);
7c673cae
FG
4564 ASSERT_OK(s);
4565
4566 s = db->Put(write_options, "aaa", "aaa");
4567 ASSERT_OK(s);
4568
4569 TransactionOptions txn_options0;
4570 txn_options0.expiration = 100; // 100ms
4571 txn_options0.lock_timeout = 50; // txn timeout no longer infinite
4572 Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
4573
4574 s = txn1->GetForUpdate(read_options, "aaa", nullptr);
4575 ASSERT_OK(s);
4576
4577 // Conflicts with previous GetForUpdate.
4578 // Since db writes do not have a timeout, this should eventually succeed when
4579 // the transaction expires.
4580 s = db->Put(write_options, "aaa", "xxx");
4581 ASSERT_OK(s);
4582
4583 ASSERT_GE(txn1->GetElapsedTime(),
4584 static_cast<uint64_t>(txn_options0.expiration));
4585
4586 s = txn1->Commit();
4587 ASSERT_TRUE(s.IsExpired()); // expired!
4588
4589 s = db->Get(read_options, "aaa", &value);
4590 ASSERT_OK(s);
4591 ASSERT_EQ("xxx", value);
4592
4593 delete txn1;
4594 delete db;
4595
4596 // transaction writes have 10ms timeout,
4597 // db writes have infinite timeout
4598 txn_db_options.transaction_lock_timeout = 50;
4599 txn_db_options.default_lock_timeout = -1;
4600
4601 s = TransactionDB::Open(options, txn_db_options, dbname, &db);
4602 ASSERT_OK(s);
4603
4604 s = db->Put(write_options, "aaa", "aaa");
4605 ASSERT_OK(s);
4606
4607 TransactionOptions txn_options;
4608 txn_options.expiration = 100; // 100ms
4609 txn1 = db->BeginTransaction(write_options, txn_options);
4610
4611 s = txn1->GetForUpdate(read_options, "aaa", nullptr);
4612 ASSERT_OK(s);
4613
4614 // Conflicts with previous GetForUpdate.
4615 // Since db writes do not have a timeout, this should eventually succeed when
4616 // the transaction expires.
4617 s = db->Put(write_options, "aaa", "xxx");
4618 ASSERT_OK(s);
4619
4620 s = txn1->Commit();
4621 ASSERT_NOK(s); // expired!
4622
4623 s = db->Get(read_options, "aaa", &value);
4624 ASSERT_OK(s);
4625 ASSERT_EQ("xxx", value);
4626
4627 delete txn1;
4628 txn_options.expiration = 6000000; // 100 minutes
4629 txn_options.lock_timeout = 1; // 1ms
4630 txn1 = db->BeginTransaction(write_options, txn_options);
4631 txn1->SetLockTimeout(100);
4632
4633 TransactionOptions txn_options2;
4634 txn_options2.expiration = 10; // 10ms
4635 Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
4636 ASSERT_OK(s);
4637
4638 s = txn2->Put("a", "2");
4639 ASSERT_OK(s);
4640
4641 // txn1 has a lock timeout longer than txn2's expiration, so it will win
4642 s = txn1->Delete("a");
4643 ASSERT_OK(s);
4644
4645 s = txn1->Commit();
4646 ASSERT_OK(s);
4647
4648 // txn2 should be expired out since txn1 waiting until its timeout expired.
4649 s = txn2->Commit();
4650 ASSERT_TRUE(s.IsExpired());
4651
4652 delete txn1;
4653 delete txn2;
4654 txn_options.expiration = 6000000; // 100 minutes
4655 txn1 = db->BeginTransaction(write_options, txn_options);
4656 txn_options2.expiration = 100000000;
4657 txn2 = db->BeginTransaction(write_options, txn_options2);
4658
4659 s = txn1->Delete("asdf");
4660 ASSERT_OK(s);
4661
4662 // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
4663 s = txn2->Delete("asdf");
4664 ASSERT_TRUE(s.IsTimedOut());
4665 ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
4666
4667 s = txn1->Commit();
4668 ASSERT_OK(s);
4669
4670 s = txn2->Put("asdf", "asdf");
4671 ASSERT_OK(s);
4672
4673 s = txn2->Commit();
4674 ASSERT_OK(s);
4675
4676 s = db->Get(read_options, "asdf", &value);
4677 ASSERT_OK(s);
4678 ASSERT_EQ("asdf", value);
4679
4680 delete txn1;
4681 delete txn2;
4682}
4683
4684TEST_P(TransactionTest, SingleDeleteTest) {
4685 WriteOptions write_options;
4686 ReadOptions read_options;
11fdf7f2 4687 std::string value;
7c673cae
FG
4688 Status s;
4689
4690 Transaction* txn = db->BeginTransaction(write_options);
4691 ASSERT_TRUE(txn);
4692
4693 s = txn->SingleDelete("A");
4694 ASSERT_OK(s);
4695
4696 s = txn->Get(read_options, "A", &value);
4697 ASSERT_TRUE(s.IsNotFound());
4698
4699 s = txn->Commit();
4700 ASSERT_OK(s);
4701 delete txn;
4702
4703 txn = db->BeginTransaction(write_options);
4704
4705 s = txn->SingleDelete("A");
4706 ASSERT_OK(s);
4707
4708 s = txn->Put("A", "a");
4709 ASSERT_OK(s);
4710
4711 s = txn->Get(read_options, "A", &value);
4712 ASSERT_OK(s);
4713 ASSERT_EQ("a", value);
4714
4715 s = txn->Commit();
4716 ASSERT_OK(s);
4717 delete txn;
4718
4719 s = db->Get(read_options, "A", &value);
4720 ASSERT_OK(s);
4721 ASSERT_EQ("a", value);
4722
4723 txn = db->BeginTransaction(write_options);
4724
4725 s = txn->SingleDelete("A");
4726 ASSERT_OK(s);
4727
4728 s = txn->Get(read_options, "A", &value);
4729 ASSERT_TRUE(s.IsNotFound());
4730
4731 s = txn->Commit();
4732 ASSERT_OK(s);
4733 delete txn;
4734
4735 s = db->Get(read_options, "A", &value);
4736 ASSERT_TRUE(s.IsNotFound());
4737
4738 txn = db->BeginTransaction(write_options);
4739 Transaction* txn2 = db->BeginTransaction(write_options);
4740 txn2->SetSnapshot();
4741
4742 s = txn->Put("A", "a");
4743 ASSERT_OK(s);
4744
4745 s = txn->Put("A", "a2");
4746 ASSERT_OK(s);
4747
4748 s = txn->SingleDelete("A");
4749 ASSERT_OK(s);
4750
4751 s = txn->SingleDelete("B");
4752 ASSERT_OK(s);
4753
4754 // According to db.h, doing a SingleDelete on a key that has been
4755 // overwritten will have undefinied behavior. So it is unclear what the
4756 // result of fetching "A" should be. The current implementation will
4757 // return NotFound in this case.
4758 s = txn->Get(read_options, "A", &value);
4759 ASSERT_TRUE(s.IsNotFound());
4760
4761 s = txn2->Put("B", "b");
4762 ASSERT_TRUE(s.IsTimedOut());
4763 s = txn2->Commit();
4764 ASSERT_OK(s);
4765 delete txn2;
4766
4767 s = txn->Commit();
4768 ASSERT_OK(s);
4769 delete txn;
4770
4771 // According to db.h, doing a SingleDelete on a key that has been
4772 // overwritten will have undefinied behavior. So it is unclear what the
4773 // result of fetching "A" should be. The current implementation will
4774 // return NotFound in this case.
4775 s = db->Get(read_options, "A", &value);
4776 ASSERT_TRUE(s.IsNotFound());
4777
4778 s = db->Get(read_options, "B", &value);
4779 ASSERT_TRUE(s.IsNotFound());
4780}
4781
4782TEST_P(TransactionTest, MergeTest) {
4783 WriteOptions write_options;
4784 ReadOptions read_options;
11fdf7f2 4785 std::string value;
7c673cae
FG
4786 Status s;
4787
4788 Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
4789 ASSERT_TRUE(txn);
4790
4791 s = db->Put(write_options, "A", "a0");
4792 ASSERT_OK(s);
4793
4794 s = txn->Merge("A", "1");
4795 ASSERT_OK(s);
4796
4797 s = txn->Merge("A", "2");
4798 ASSERT_OK(s);
4799
4800 s = txn->Get(read_options, "A", &value);
4801 ASSERT_TRUE(s.IsMergeInProgress());
4802
4803 s = txn->Put("A", "a");
4804 ASSERT_OK(s);
4805
4806 s = txn->Get(read_options, "A", &value);
4807 ASSERT_OK(s);
4808 ASSERT_EQ("a", value);
4809
4810 s = txn->Merge("A", "3");
4811 ASSERT_OK(s);
4812
4813 s = txn->Get(read_options, "A", &value);
4814 ASSERT_TRUE(s.IsMergeInProgress());
4815
4816 TransactionOptions txn_options;
4817 txn_options.lock_timeout = 1; // 1 ms
4818 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
4819 ASSERT_TRUE(txn2);
4820
4821 // verify that txn has "A" locked
4822 s = txn2->Merge("A", "4");
4823 ASSERT_TRUE(s.IsTimedOut());
4824
4825 s = txn2->Commit();
4826 ASSERT_OK(s);
4827 delete txn2;
4828
4829 s = txn->Commit();
4830 ASSERT_OK(s);
4831 delete txn;
4832
4833 s = db->Get(read_options, "A", &value);
4834 ASSERT_OK(s);
4835 ASSERT_EQ("a,3", value);
4836}
4837
20effc67
TL
4838TEST_P(TransactionTest, DeleteRangeSupportTest) {
4839 // The `DeleteRange()` API is banned everywhere.
4840 ASSERT_TRUE(
4841 db->DeleteRange(WriteOptions(), db->DefaultColumnFamily(), "a", "b")
4842 .IsNotSupported());
4843
4844 // But range deletions can be added via the `Write()` API by specifying the
4845 // proper flags to promise there are no conflicts according to the DB type
4846 // (see `TransactionDB::DeleteRange()` API doc for details).
4847 for (bool skip_concurrency_control : {false, true}) {
4848 for (bool skip_duplicate_key_check : {false, true}) {
4849 ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
4850 WriteBatch wb;
4851 ASSERT_OK(wb.DeleteRange("a", "b"));
4852 TransactionDBWriteOptimizations flags;
4853 flags.skip_concurrency_control = skip_concurrency_control;
4854 flags.skip_duplicate_key_check = skip_duplicate_key_check;
4855 Status s = db->Write(WriteOptions(), flags, &wb);
4856 std::string value;
4857 switch (txn_db_options.write_policy) {
4858 case WRITE_COMMITTED:
4859 if (skip_concurrency_control) {
4860 ASSERT_OK(s);
4861 ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
4862 } else {
4863 ASSERT_NOK(s);
4864 ASSERT_OK(db->Get(ReadOptions(), "a", &value));
4865 }
4866 break;
4867 case WRITE_PREPARED:
4868 // Intentional fall-through
4869 case WRITE_UNPREPARED:
4870 if (skip_concurrency_control && skip_duplicate_key_check) {
4871 ASSERT_OK(s);
4872 ASSERT_TRUE(db->Get(ReadOptions(), "a", &value).IsNotFound());
4873 } else {
4874 ASSERT_NOK(s);
4875 ASSERT_OK(db->Get(ReadOptions(), "a", &value));
4876 }
4877 break;
4878 }
4879 // Without any promises from the user, range deletion via other `Write()`
4880 // APIs are still banned.
4881 ASSERT_OK(db->Put(WriteOptions(), "a", "val"));
4882 ASSERT_NOK(db->Write(WriteOptions(), &wb));
4883 ASSERT_OK(db->Get(ReadOptions(), "a", &value));
4884 }
4885 }
4886}
4887
7c673cae
FG
4888TEST_P(TransactionTest, DeferSnapshotTest) {
4889 WriteOptions write_options;
4890 ReadOptions read_options;
11fdf7f2 4891 std::string value;
7c673cae
FG
4892 Status s;
4893
4894 s = db->Put(write_options, "A", "a0");
4895 ASSERT_OK(s);
4896
4897 Transaction* txn1 = db->BeginTransaction(write_options);
4898 Transaction* txn2 = db->BeginTransaction(write_options);
4899
4900 txn1->SetSnapshotOnNextOperation();
4901 auto snapshot = txn1->GetSnapshot();
4902 ASSERT_FALSE(snapshot);
4903
4904 s = txn2->Put("A", "a2");
4905 ASSERT_OK(s);
4906 s = txn2->Commit();
4907 ASSERT_OK(s);
4908 delete txn2;
4909
4910 s = txn1->GetForUpdate(read_options, "A", &value);
4911 // Should not conflict with txn2 since snapshot wasn't set until
4912 // GetForUpdate was called.
4913 ASSERT_OK(s);
4914 ASSERT_EQ("a2", value);
4915
4916 s = txn1->Put("A", "a1");
4917 ASSERT_OK(s);
4918
4919 s = db->Put(write_options, "B", "b0");
4920 ASSERT_OK(s);
4921
4922 // Cannot lock B since it was written after the snapshot was set
4923 s = txn1->Put("B", "b1");
4924 ASSERT_TRUE(s.IsBusy());
4925
4926 s = txn1->Commit();
4927 ASSERT_OK(s);
4928 delete txn1;
4929
4930 s = db->Get(read_options, "A", &value);
4931 ASSERT_OK(s);
4932 ASSERT_EQ("a1", value);
4933
4934 s = db->Get(read_options, "B", &value);
4935 ASSERT_OK(s);
4936 ASSERT_EQ("b0", value);
4937}
4938
4939TEST_P(TransactionTest, DeferSnapshotTest2) {
4940 WriteOptions write_options;
4941 ReadOptions read_options, snapshot_read_options;
11fdf7f2 4942 std::string value;
7c673cae
FG
4943 Status s;
4944
4945 Transaction* txn1 = db->BeginTransaction(write_options);
4946
4947 txn1->SetSnapshot();
4948
4949 s = txn1->Put("A", "a1");
4950 ASSERT_OK(s);
4951
4952 s = db->Put(write_options, "C", "c0");
4953 ASSERT_OK(s);
4954 s = db->Put(write_options, "D", "d0");
4955 ASSERT_OK(s);
4956
4957 snapshot_read_options.snapshot = txn1->GetSnapshot();
4958
4959 txn1->SetSnapshotOnNextOperation();
4960
4961 s = txn1->Get(snapshot_read_options, "C", &value);
4962 // Snapshot was set before C was written
4963 ASSERT_TRUE(s.IsNotFound());
4964 s = txn1->Get(snapshot_read_options, "D", &value);
4965 // Snapshot was set before D was written
4966 ASSERT_TRUE(s.IsNotFound());
4967
4968 // Snapshot should not have changed yet.
4969 snapshot_read_options.snapshot = txn1->GetSnapshot();
4970
4971 s = txn1->Get(snapshot_read_options, "C", &value);
4972 // Snapshot was set before C was written
4973 ASSERT_TRUE(s.IsNotFound());
4974 s = txn1->Get(snapshot_read_options, "D", &value);
4975 // Snapshot was set before D was written
4976 ASSERT_TRUE(s.IsNotFound());
4977
4978 s = txn1->GetForUpdate(read_options, "C", &value);
4979 ASSERT_OK(s);
4980 ASSERT_EQ("c0", value);
4981
4982 s = db->Put(write_options, "D", "d00");
4983 ASSERT_OK(s);
4984
4985 // Snapshot is now set
4986 snapshot_read_options.snapshot = txn1->GetSnapshot();
4987 s = txn1->Get(snapshot_read_options, "D", &value);
4988 ASSERT_OK(s);
4989 ASSERT_EQ("d0", value);
4990
4991 s = txn1->Commit();
4992 ASSERT_OK(s);
4993 delete txn1;
4994}
4995
4996TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
4997 WriteOptions write_options;
4998 ReadOptions read_options, snapshot_read_options;
11fdf7f2 4999 std::string value;
7c673cae
FG
5000 Status s;
5001
5002 Transaction* txn1 = db->BeginTransaction(write_options);
5003
5004 txn1->SetSavePoint(); // 1
5005
5006 s = db->Put(write_options, "T", "1");
5007 ASSERT_OK(s);
5008
5009 txn1->SetSnapshotOnNextOperation();
5010
5011 s = db->Put(write_options, "T", "2");
5012 ASSERT_OK(s);
5013
5014 txn1->SetSavePoint(); // 2
5015
5016 s = db->Put(write_options, "T", "3");
5017 ASSERT_OK(s);
5018
5019 s = txn1->Put("A", "a");
5020 ASSERT_OK(s);
5021
5022 txn1->SetSavePoint(); // 3
5023
5024 s = db->Put(write_options, "T", "4");
5025 ASSERT_OK(s);
5026
5027 txn1->SetSnapshot();
5028 txn1->SetSnapshotOnNextOperation();
5029
5030 txn1->SetSavePoint(); // 4
5031
5032 s = db->Put(write_options, "T", "5");
5033 ASSERT_OK(s);
5034
5035 snapshot_read_options.snapshot = txn1->GetSnapshot();
5036 s = txn1->Get(snapshot_read_options, "T", &value);
5037 ASSERT_OK(s);
5038 ASSERT_EQ("4", value);
5039
5040 s = txn1->Put("A", "a1");
5041 ASSERT_OK(s);
5042
5043 snapshot_read_options.snapshot = txn1->GetSnapshot();
5044 s = txn1->Get(snapshot_read_options, "T", &value);
5045 ASSERT_OK(s);
5046 ASSERT_EQ("5", value);
5047
5048 s = txn1->RollbackToSavePoint(); // Rollback to 4
5049 ASSERT_OK(s);
5050
5051 snapshot_read_options.snapshot = txn1->GetSnapshot();
5052 s = txn1->Get(snapshot_read_options, "T", &value);
5053 ASSERT_OK(s);
5054 ASSERT_EQ("4", value);
5055
5056 s = txn1->RollbackToSavePoint(); // Rollback to 3
5057 ASSERT_OK(s);
5058
5059 snapshot_read_options.snapshot = txn1->GetSnapshot();
5060 s = txn1->Get(snapshot_read_options, "T", &value);
5061 ASSERT_OK(s);
5062 ASSERT_EQ("3", value);
5063
5064 s = txn1->Get(read_options, "T", &value);
5065 ASSERT_OK(s);
5066 ASSERT_EQ("5", value);
5067
5068 s = txn1->RollbackToSavePoint(); // Rollback to 2
5069 ASSERT_OK(s);
5070
5071 snapshot_read_options.snapshot = txn1->GetSnapshot();
5072 ASSERT_FALSE(snapshot_read_options.snapshot);
5073 s = txn1->Get(snapshot_read_options, "T", &value);
5074 ASSERT_OK(s);
5075 ASSERT_EQ("5", value);
5076
5077 s = txn1->Delete("A");
5078 ASSERT_OK(s);
5079
5080 snapshot_read_options.snapshot = txn1->GetSnapshot();
5081 ASSERT_TRUE(snapshot_read_options.snapshot);
5082 s = txn1->Get(snapshot_read_options, "T", &value);
5083 ASSERT_OK(s);
5084 ASSERT_EQ("5", value);
5085
5086 s = txn1->RollbackToSavePoint(); // Rollback to 1
5087 ASSERT_OK(s);
5088
5089 s = txn1->Delete("A");
5090 ASSERT_OK(s);
5091
5092 snapshot_read_options.snapshot = txn1->GetSnapshot();
5093 ASSERT_FALSE(snapshot_read_options.snapshot);
5094 s = txn1->Get(snapshot_read_options, "T", &value);
5095 ASSERT_OK(s);
5096 ASSERT_EQ("5", value);
5097
5098 s = txn1->Commit();
5099 ASSERT_OK(s);
5100
5101 delete txn1;
5102}
5103
5104TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
5105 WriteOptions write_options;
5106 ReadOptions read_options;
11fdf7f2 5107 std::string value;
7c673cae
FG
5108
5109 class Notifier : public TransactionNotifier {
5110 private:
5111 const Snapshot** snapshot_ptr_;
5112
5113 public:
5114 explicit Notifier(const Snapshot** snapshot_ptr)
5115 : snapshot_ptr_(snapshot_ptr) {}
5116
494da23a 5117 void SnapshotCreated(const Snapshot* newSnapshot) override {
7c673cae
FG
5118 *snapshot_ptr_ = newSnapshot;
5119 }
5120 };
5121
5122 std::shared_ptr<Notifier> notifier =
5123 std::make_shared<Notifier>(&read_options.snapshot);
5124 Status s;
5125
5126 s = db->Put(write_options, "B", "0");
5127 ASSERT_OK(s);
5128
5129 Transaction* txn1 = db->BeginTransaction(write_options);
5130
5131 txn1->SetSnapshotOnNextOperation(notifier);
5132 ASSERT_FALSE(read_options.snapshot);
5133
5134 s = db->Put(write_options, "B", "1");
5135 ASSERT_OK(s);
5136
5137 // A Get does not generate the snapshot
5138 s = txn1->Get(read_options, "B", &value);
5139 ASSERT_OK(s);
5140 ASSERT_FALSE(read_options.snapshot);
5141 ASSERT_EQ(value, "1");
5142
5143 // Any other operation does
5144 s = txn1->Put("A", "0");
5145 ASSERT_OK(s);
5146
5147 // Now change "B".
5148 s = db->Put(write_options, "B", "2");
5149 ASSERT_OK(s);
5150
5151 // The original value should still be read
5152 s = txn1->Get(read_options, "B", &value);
5153 ASSERT_OK(s);
5154 ASSERT_TRUE(read_options.snapshot);
5155 ASSERT_EQ(value, "1");
5156
5157 s = txn1->Commit();
5158 ASSERT_OK(s);
5159
5160 delete txn1;
5161}
5162
5163TEST_P(TransactionTest, ClearSnapshotTest) {
5164 WriteOptions write_options;
5165 ReadOptions read_options, snapshot_read_options;
11fdf7f2 5166 std::string value;
7c673cae
FG
5167 Status s;
5168
5169 s = db->Put(write_options, "foo", "0");
5170 ASSERT_OK(s);
5171
5172 Transaction* txn = db->BeginTransaction(write_options);
5173 ASSERT_TRUE(txn);
5174
5175 s = db->Put(write_options, "foo", "1");
5176 ASSERT_OK(s);
5177
5178 snapshot_read_options.snapshot = txn->GetSnapshot();
5179 ASSERT_FALSE(snapshot_read_options.snapshot);
5180
5181 // No snapshot created yet
5182 s = txn->Get(snapshot_read_options, "foo", &value);
5183 ASSERT_EQ(value, "1");
5184
5185 txn->SetSnapshot();
5186 snapshot_read_options.snapshot = txn->GetSnapshot();
5187 ASSERT_TRUE(snapshot_read_options.snapshot);
5188
5189 s = db->Put(write_options, "foo", "2");
5190 ASSERT_OK(s);
5191
5192 // Snapshot was created before change to '2'
5193 s = txn->Get(snapshot_read_options, "foo", &value);
5194 ASSERT_EQ(value, "1");
5195
5196 txn->ClearSnapshot();
5197 snapshot_read_options.snapshot = txn->GetSnapshot();
5198 ASSERT_FALSE(snapshot_read_options.snapshot);
5199
5200 // Snapshot has now been cleared
5201 s = txn->Get(snapshot_read_options, "foo", &value);
5202 ASSERT_EQ(value, "2");
5203
5204 s = txn->Commit();
5205 ASSERT_OK(s);
5206
5207 delete txn;
5208}
5209
5210TEST_P(TransactionTest, ToggleAutoCompactionTest) {
5211 Status s;
5212
7c673cae
FG
5213 ColumnFamilyHandle *cfa, *cfb;
5214 ColumnFamilyOptions cf_options;
5215
5216 // Create 2 new column families
5217 s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
5218 ASSERT_OK(s);
5219 s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
5220 ASSERT_OK(s);
5221
5222 delete cfa;
5223 delete cfb;
5224 delete db;
5225
5226 // open DB with three column families
5227 std::vector<ColumnFamilyDescriptor> column_families;
5228 // have to open default column family
5229 column_families.push_back(
5230 ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
5231 // open the new column families
5232 column_families.push_back(
5233 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
5234 column_families.push_back(
5235 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
5236
5237 ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
5238 ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
5239 ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
5240 cf_opt_default->disable_auto_compactions = false;
5241 cf_opt_cfa->disable_auto_compactions = true;
5242 cf_opt_cfb->disable_auto_compactions = false;
5243
5244 std::vector<ColumnFamilyHandle*> handles;
5245
5246 s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
5247 &handles, &db);
5248 ASSERT_OK(s);
5249
20effc67 5250 auto cfh_default = static_cast_with_check<ColumnFamilyHandleImpl>(handles[0]);
7c673cae
FG
5251 auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();
5252
20effc67 5253 auto cfh_a = static_cast_with_check<ColumnFamilyHandleImpl>(handles[1]);
7c673cae
FG
5254 auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();
5255
20effc67 5256 auto cfh_b = static_cast_with_check<ColumnFamilyHandleImpl>(handles[2]);
7c673cae
FG
5257 auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();
5258
5259 ASSERT_EQ(opt_default.disable_auto_compactions, false);
5260 ASSERT_EQ(opt_a.disable_auto_compactions, true);
5261 ASSERT_EQ(opt_b.disable_auto_compactions, false);
5262
5263 for (auto handle : handles) {
5264 delete handle;
5265 }
5266}
5267
11fdf7f2 5268TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
7c673cae
FG
5269 // In this test, txn1 should succeed committing,
5270 // as the callback is called after txn1 starts committing.
f67539c2 5271 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
7c673cae 5272 {{"TransactionTest::ExpirableTransactionDataRace:1"}});
f67539c2 5273 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
11fdf7f2 5274 "TransactionTest::ExpirableTransactionDataRace:1", [&](void* /*arg*/) {
7c673cae
FG
5275 WriteOptions write_options;
5276 TransactionOptions txn_options;
5277
5278 // Force txn1 to expire
5279 /* sleep override */
5280 std::this_thread::sleep_for(std::chrono::milliseconds(150));
5281
5282 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
5283 Status s;
5284 s = txn2->Put("X", "2");
5285 ASSERT_TRUE(s.IsTimedOut());
5286 s = txn2->Commit();
5287 ASSERT_OK(s);
5288 delete txn2;
5289 });
5290
f67539c2 5291 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
5292
5293 WriteOptions write_options;
5294 TransactionOptions txn_options;
5295
5296 txn_options.expiration = 100;
5297 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
5298
5299 Status s;
5300 s = txn1->Put("X", "1");
5301 ASSERT_OK(s);
5302 s = txn1->Commit();
5303 ASSERT_OK(s);
5304
5305 ReadOptions read_options;
5306 string value;
5307 s = db->Get(read_options, "X", &value);
20effc67 5308 ASSERT_OK(s);
7c673cae
FG
5309 ASSERT_EQ("1", value);
5310
5311 delete txn1;
f67539c2 5312 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
5313}
5314
11fdf7f2 5315#ifndef ROCKSDB_VALGRIND_RUN
7c673cae 5316namespace {
494da23a
TL
5317// cmt_delay_ms is the delay between prepare and commit
5318// first_id is the id of the first transaction
5319Status TransactionStressTestInserter(
5320 TransactionDB* db, const size_t num_transactions, const size_t num_sets,
5321 const size_t num_keys_per_set, Random64* rand,
5322 const uint64_t cmt_delay_ms = 0, const uint64_t first_id = 0) {
7c673cae
FG
5323 WriteOptions write_options;
5324 ReadOptions read_options;
5325 TransactionOptions txn_options;
f67539c2
TL
5326 if (rand->OneIn(2)) {
5327 txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
5328 }
494da23a
TL
5329 // Inside the inserter we might also retake the snapshot. We do both since two
5330 // separte functions are engaged for each.
5331 txn_options.set_snapshot = rand->OneIn(2);
7c673cae 5332
494da23a
TL
5333 RandomTransactionInserter inserter(
5334 rand, write_options, read_options, num_keys_per_set,
5335 static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
7c673cae
FG
5336
5337 for (size_t t = 0; t < num_transactions; t++) {
5338 bool success = inserter.TransactionDBInsert(db, txn_options);
5339 if (!success) {
5340 // unexpected failure
5341 return inserter.GetLastStatus();
5342 }
5343 }
20effc67 5344 inserter.GetLastStatus().PermitUncheckedError();
7c673cae
FG
5345
5346 // Make sure at least some of the transactions succeeded. It's ok if
5347 // some failed due to write-conflicts.
494da23a
TL
5348 if (num_transactions != 1 &&
5349 inserter.GetFailureCount() > num_transactions / 2) {
7c673cae
FG
5350 return Status::TryAgain("Too many transactions failed! " +
5351 std::to_string(inserter.GetFailureCount()) + " / " +
5352 std::to_string(num_transactions));
5353 }
5354
5355 return Status::OK();
5356}
5357} // namespace
5358
11fdf7f2
TL
5359// Worker threads add a number to a key from each set of keys. The checker
5360// threads verify that the sum of all keys in each set are equal.
5361TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
5362 // Small write buffer to trigger more compactions
5363 options.write_buffer_size = 1024;
20effc67
TL
5364 ASSERT_OK(ReOpenNoDelete());
5365 constexpr size_t num_workers = 4; // worker threads count
5366 constexpr size_t num_checkers = 2; // checker threads count
5367 constexpr size_t num_slow_checkers = 2; // checker threads emulating backups
5368 constexpr size_t num_slow_workers = 1; // slow worker threads count
5369 constexpr size_t num_transactions_per_thread = 10000;
5370 constexpr uint16_t num_sets = 3;
5371 constexpr size_t num_keys_per_set = 100;
7c673cae
FG
5372 // Setting the key-space to be 100 keys should cause enough write-conflicts
5373 // to make this test interesting.
5374
5375 std::vector<port::Thread> threads;
11fdf7f2 5376 std::atomic<uint32_t> finished = {0};
20effc67 5377 constexpr bool TAKE_SNAPSHOT = true;
494da23a
TL
5378 uint64_t time_seed = env->NowMicros();
5379 printf("time_seed is %" PRIu64 "\n", time_seed); // would help to reproduce
7c673cae
FG
5380
5381 std::function<void()> call_inserter = [&] {
494da23a
TL
5382 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5383 Random64 rand(time_seed * thd_seed);
7c673cae 5384 ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,
494da23a 5385 num_sets, num_keys_per_set, &rand));
11fdf7f2
TL
5386 finished++;
5387 };
5388 std::function<void()> call_checker = [&] {
494da23a
TL
5389 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5390 Random64 rand(time_seed * thd_seed);
11fdf7f2
TL
5391 // Verify that data is consistent
5392 while (finished < num_workers) {
20effc67
TL
5393 ASSERT_OK(RandomTransactionInserter::Verify(
5394 db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand));
11fdf7f2 5395 }
7c673cae 5396 };
494da23a
TL
5397 std::function<void()> call_slow_checker = [&] {
5398 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5399 Random64 rand(time_seed * thd_seed);
5400 // Verify that data is consistent
5401 while (finished < num_workers) {
5402 uint64_t delay_ms = rand.Uniform(100) + 1;
5403 Status s = RandomTransactionInserter::Verify(
5404 db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
5405 ASSERT_OK(s);
5406 }
5407 };
5408 std::function<void()> call_slow_inserter = [&] {
5409 size_t thd_seed = std::hash<std::thread::id>()(std::this_thread::get_id());
5410 Random64 rand(time_seed * thd_seed);
5411 uint64_t id = 0;
5412 // Verify that data is consistent
5413 while (finished < num_workers) {
5414 uint64_t delay_ms = rand.Uniform(500) + 1;
5415 ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
5416 &rand, delay_ms, id++));
5417 }
5418 };
7c673cae 5419
11fdf7f2 5420 for (uint32_t i = 0; i < num_workers; i++) {
7c673cae
FG
5421 threads.emplace_back(call_inserter);
5422 }
11fdf7f2
TL
5423 for (uint32_t i = 0; i < num_checkers; i++) {
5424 threads.emplace_back(call_checker);
5425 }
494da23a
TL
5426 if (with_slow_threads_) {
5427 for (uint32_t i = 0; i < num_slow_checkers; i++) {
5428 threads.emplace_back(call_slow_checker);
5429 }
5430 for (uint32_t i = 0; i < num_slow_workers; i++) {
5431 threads.emplace_back(call_slow_inserter);
5432 }
5433 }
7c673cae 5434
11fdf7f2 5435 // Wait for all threads to finish
7c673cae
FG
5436 for (auto& t : threads) {
5437 t.join();
5438 }
5439
5440 // Verify that data is consistent
11fdf7f2
TL
5441 Status s = RandomTransactionInserter::Verify(db, num_sets, num_keys_per_set,
5442 !TAKE_SNAPSHOT);
7c673cae
FG
5443 ASSERT_OK(s);
5444}
11fdf7f2 5445#endif // ROCKSDB_VALGRIND_RUN
7c673cae
FG
5446
5447TEST_P(TransactionTest, MemoryLimitTest) {
5448 TransactionOptions txn_options;
5449 // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
5450 txn_options.max_write_batch_size = 29;
f67539c2
TL
5451 // Set threshold to unlimited so that the write batch does not get flushed,
5452 // and can hit the memory limit.
5453 txn_options.write_batch_flush_threshold = 0;
11fdf7f2 5454 std::string value;
7c673cae
FG
5455 Status s;
5456
5457 Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
5458 ASSERT_TRUE(txn);
5459
5460 ASSERT_EQ(0, txn->GetNumPuts());
5461 ASSERT_LE(0, txn->GetID());
5462
5463 s = txn->Put(Slice("a"), Slice("...."));
5464 ASSERT_OK(s);
5465 ASSERT_EQ(1, txn->GetNumPuts());
5466
5467 s = txn->Put(Slice("b"), Slice("...."));
5468 ASSERT_OK(s);
5469 ASSERT_EQ(2, txn->GetNumPuts());
5470
5471 s = txn->Put(Slice("b"), Slice("...."));
f67539c2
TL
5472 ASSERT_TRUE(s.IsMemoryLimit());
5473 ASSERT_EQ(2, txn->GetNumPuts());
7c673cae 5474
20effc67 5475 ASSERT_OK(txn->Rollback());
7c673cae
FG
5476 delete txn;
5477}
5478
11fdf7f2
TL
5479// This test clarifies the existing expectation from the sequence number
5480// algorithm. It could detect mistakes in updating the code but it is not
5481// necessarily the one acceptable way. If the algorithm is legitimately changed,
5482// this unit test should be updated as well.
5483TEST_P(TransactionStressTest, SeqAdvanceTest) {
5484 // TODO(myabandeh): must be test with false before new releases
5485 const bool short_test = true;
5486 WriteOptions wopts;
5487 FlushOptions fopt;
5488
5489 options.disable_auto_compactions = true;
5490 ASSERT_OK(ReOpen());
5491
5492 // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
5493 // of the branches. This is the same as counting a binary number where i-th
5494 // bit represents whether we take branch i in the represented by the number.
5495 const size_t NUM_BRANCHES = short_test ? 6 : 10;
5496 // Helper function that shows if the branch is to be taken in the run
5497 // represented by the number n.
5498 auto branch_do = [&](size_t n, size_t* branch) {
5499 assert(*branch < NUM_BRANCHES);
5500 const size_t filter = static_cast<size_t>(1) << *branch;
5501 return n & filter;
5502 };
5503 const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
5504 for (size_t n = 0; n < max_n; n++) {
20effc67 5505 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
5506 size_t branch = 0;
5507 auto seq = db_impl->GetLatestSequenceNumber();
5508 exp_seq = seq;
5509 txn_t0(0);
5510 seq = db_impl->TEST_GetLastVisibleSequence();
5511 ASSERT_EQ(exp_seq, seq);
5512
5513 if (branch_do(n, &branch)) {
5514 ASSERT_OK(db_impl->Flush(fopt));
5515 seq = db_impl->TEST_GetLastVisibleSequence();
5516 ASSERT_EQ(exp_seq, seq);
5517 }
5518 if (!short_test && branch_do(n, &branch)) {
5519 ASSERT_OK(db_impl->FlushWAL(true));
5520 ASSERT_OK(ReOpenNoDelete());
20effc67 5521 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
5522 seq = db_impl->GetLatestSequenceNumber();
5523 ASSERT_EQ(exp_seq, seq);
5524 }
5525
5526 // Doing it twice might detect some bugs
5527 txn_t0(1);
5528 seq = db_impl->TEST_GetLastVisibleSequence();
5529 ASSERT_EQ(exp_seq, seq);
5530
5531 txn_t1(0);
5532 seq = db_impl->TEST_GetLastVisibleSequence();
5533 ASSERT_EQ(exp_seq, seq);
5534
5535 if (branch_do(n, &branch)) {
5536 ASSERT_OK(db_impl->Flush(fopt));
5537 seq = db_impl->TEST_GetLastVisibleSequence();
5538 ASSERT_EQ(exp_seq, seq);
5539 }
5540 if (!short_test && branch_do(n, &branch)) {
5541 ASSERT_OK(db_impl->FlushWAL(true));
5542 ASSERT_OK(ReOpenNoDelete());
20effc67 5543 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
5544 seq = db_impl->GetLatestSequenceNumber();
5545 ASSERT_EQ(exp_seq, seq);
5546 }
5547
5548 txn_t3(0);
5549 seq = db_impl->TEST_GetLastVisibleSequence();
5550 ASSERT_EQ(exp_seq, seq);
5551
5552 if (branch_do(n, &branch)) {
5553 ASSERT_OK(db_impl->Flush(fopt));
5554 seq = db_impl->TEST_GetLastVisibleSequence();
5555 ASSERT_EQ(exp_seq, seq);
5556 }
5557 if (!short_test && branch_do(n, &branch)) {
5558 ASSERT_OK(db_impl->FlushWAL(true));
5559 ASSERT_OK(ReOpenNoDelete());
20effc67 5560 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
5561 seq = db_impl->GetLatestSequenceNumber();
5562 ASSERT_EQ(exp_seq, seq);
5563 }
5564
5565 txn_t4(0);
5566 seq = db_impl->TEST_GetLastVisibleSequence();
5567
5568 ASSERT_EQ(exp_seq, seq);
5569
5570 if (branch_do(n, &branch)) {
5571 ASSERT_OK(db_impl->Flush(fopt));
5572 seq = db_impl->TEST_GetLastVisibleSequence();
5573 ASSERT_EQ(exp_seq, seq);
5574 }
5575 if (!short_test && branch_do(n, &branch)) {
5576 ASSERT_OK(db_impl->FlushWAL(true));
5577 ASSERT_OK(ReOpenNoDelete());
20effc67 5578 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
5579 seq = db_impl->GetLatestSequenceNumber();
5580 ASSERT_EQ(exp_seq, seq);
5581 }
5582
5583 txn_t2(0);
5584 seq = db_impl->TEST_GetLastVisibleSequence();
5585 ASSERT_EQ(exp_seq, seq);
5586
5587 if (branch_do(n, &branch)) {
5588 ASSERT_OK(db_impl->Flush(fopt));
5589 seq = db_impl->TEST_GetLastVisibleSequence();
5590 ASSERT_EQ(exp_seq, seq);
5591 }
5592 if (!short_test && branch_do(n, &branch)) {
5593 ASSERT_OK(db_impl->FlushWAL(true));
5594 ASSERT_OK(ReOpenNoDelete());
20effc67 5595 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
11fdf7f2
TL
5596 seq = db_impl->GetLatestSequenceNumber();
5597 ASSERT_EQ(exp_seq, seq);
5598 }
5599 ASSERT_OK(ReOpen());
5600 }
5601}
5602
5603// Verify that the optimization would not compromize the correctness
5604TEST_P(TransactionTest, Optimizations) {
5605 size_t comb_cnt = size_t(1) << 2; // 2 is number of optimization vars
5606 for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
5607 TransactionDBWriteOptimizations optimizations;
5608 optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
5609 optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
5610
5611 ASSERT_OK(ReOpen());
5612 WriteOptions write_options;
5613 WriteBatch batch;
20effc67 5614 ASSERT_OK(batch.Put(Slice("k"), Slice("v1")));
11fdf7f2
TL
5615 ASSERT_OK(db->Write(write_options, &batch));
5616
5617 ReadOptions ropt;
5618 PinnableSlice pinnable_val;
5619 ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
5620 ASSERT_TRUE(pinnable_val == ("v1"));
5621 }
5622}
5623
5624// A comparator that uses only the first three bytes
5625class ThreeBytewiseComparator : public Comparator {
5626 public:
5627 ThreeBytewiseComparator() {}
494da23a
TL
5628 const char* Name() const override { return "test.ThreeBytewiseComparator"; }
5629 int Compare(const Slice& a, const Slice& b) const override {
11fdf7f2
TL
5630 Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5631 Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5632 return na.compare(nb);
5633 }
494da23a 5634 bool Equal(const Slice& a, const Slice& b) const override {
11fdf7f2
TL
5635 Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
5636 Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
5637 return na == nb;
5638 }
20effc67 5639 // These methods below don't seem relevant to this test. Implement them if
11fdf7f2
TL
5640 // proven othersize.
5641 void FindShortestSeparator(std::string* start,
5642 const Slice& limit) const override {
5643 const Comparator* bytewise_comp = BytewiseComparator();
5644 bytewise_comp->FindShortestSeparator(start, limit);
5645 }
5646 void FindShortSuccessor(std::string* key) const override {
5647 const Comparator* bytewise_comp = BytewiseComparator();
5648 bytewise_comp->FindShortSuccessor(key);
5649 }
5650};
5651
f67539c2
TL
5652#ifndef ROCKSDB_VALGRIND_RUN
5653TEST_P(TransactionTest, GetWithoutSnapshot) {
5654 WriteOptions write_options;
5655 std::atomic<bool> finish = {false};
20effc67 5656 ASSERT_OK(db->Put(write_options, "key", "value"));
f67539c2
TL
5657 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
5658 for (int i = 0; i < 100; i++) {
5659 TransactionOptions txn_options;
5660 Transaction* txn = db->BeginTransaction(write_options, txn_options);
5661 ASSERT_OK(txn->SetName("xid"));
5662 ASSERT_OK(txn->Put("key", "overridedvalue"));
5663 ASSERT_OK(txn->Put("key", "value"));
5664 ASSERT_OK(txn->Prepare());
5665 ASSERT_OK(txn->Commit());
5666 delete txn;
5667 }
5668 finish = true;
5669 });
5670 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
5671 while (!finish) {
5672 ReadOptions ropt;
5673 PinnableSlice pinnable_val;
5674 ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val));
5675 ASSERT_TRUE(pinnable_val == ("value"));
5676 }
5677 });
5678 commit_thread.join();
5679 read_thread.join();
5680}
5681#endif // ROCKSDB_VALGRIND_RUN
5682
11fdf7f2
TL
5683// Test that the transactional db can handle duplicate keys in the write batch
5684TEST_P(TransactionTest, DuplicateKeys) {
5685 ColumnFamilyOptions cf_options;
5686 std::string cf_name = "two";
5687 ColumnFamilyHandle* cf_handle = nullptr;
5688 {
5689 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5690 WriteOptions write_options;
5691 WriteBatch batch;
20effc67
TL
5692 ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
5693 ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
11fdf7f2 5694 // duplicate the keys
20effc67 5695 ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
11fdf7f2
TL
5696 // duplicate the 2nd key. It should not be counted duplicate since a
5697 // sub-patch is cut after the last duplicate.
20effc67 5698 ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
11fdf7f2
TL
5699 // duplicate the keys but in a different cf. It should not be counted as
5700 // duplicate keys
20effc67 5701 ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
11fdf7f2
TL
5702
5703 ASSERT_OK(db->Write(write_options, &batch));
5704
5705 ReadOptions ropt;
5706 PinnableSlice pinnable_val;
5707 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
5708 ASSERT_OK(s);
5709 ASSERT_TRUE(pinnable_val == ("value3"));
5710 s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
5711 ASSERT_OK(s);
5712 ASSERT_TRUE(pinnable_val == ("value4"));
5713 s = db->Get(ropt, cf_handle, "key", &pinnable_val);
5714 ASSERT_OK(s);
5715 ASSERT_TRUE(pinnable_val == ("value5"));
5716
5717 delete cf_handle;
5718 }
5719
5720 // Test with non-bytewise comparator
5721 {
5722 ASSERT_OK(ReOpen());
5723 std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5724 cf_options.comparator = comp_gc.get();
5725 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5726 WriteOptions write_options;
5727 WriteBatch batch;
20effc67 5728 ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value")));
11fdf7f2 5729 // The first three bytes are the same, do it must be counted as duplicate
20effc67 5730 ASSERT_OK(batch.Put(cf_handle, Slice("key2"), Slice("value2")));
11fdf7f2 5731 // check for 2nd duplicate key in cf with non-default comparator
20effc67 5732 ASSERT_OK(batch.Put(cf_handle, Slice("key2b"), Slice("value2b")));
11fdf7f2
TL
5733 ASSERT_OK(db->Write(write_options, &batch));
5734
5735 // The value must be the most recent value for all the keys equal to "key",
5736 // including "key2"
5737 ReadOptions ropt;
5738 PinnableSlice pinnable_val;
5739 ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
5740 ASSERT_TRUE(pinnable_val == ("value2b"));
5741
5742 // Test duplicate keys with rollback
5743 TransactionOptions txn_options;
5744 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5745 ASSERT_OK(txn0->SetName("xid"));
5746 ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
5747 ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
5748 ASSERT_OK(txn0->Rollback());
5749 ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
5750 ASSERT_TRUE(pinnable_val == ("value2b"));
5751 delete txn0;
5752
5753 delete cf_handle;
5754 cf_options.comparator = BytewiseComparator();
5755 }
5756
5757 for (bool do_prepare : {true, false}) {
5758 for (bool do_rollback : {true, false}) {
5759 for (bool with_commit_batch : {true, false}) {
5760 if (with_commit_batch && !do_prepare) {
5761 continue;
5762 }
5763 if (with_commit_batch && do_rollback) {
5764 continue;
5765 }
5766 ASSERT_OK(ReOpen());
5767 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5768 TransactionOptions txn_options;
5769 txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
5770 WriteOptions write_options;
5771 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5772 auto s = txn0->SetName("xid");
5773 ASSERT_OK(s);
5774 s = txn0->Put(Slice("foo0"), Slice("bar0a"));
5775 ASSERT_OK(s);
5776 s = txn0->Put(Slice("foo0"), Slice("bar0b"));
5777 ASSERT_OK(s);
5778 s = txn0->Put(Slice("foo1"), Slice("bar1"));
5779 ASSERT_OK(s);
5780 s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
5781 ASSERT_OK(s);
5782 // Repeat a key after the start of a sub-patch. This should not cause a
5783 // duplicate in the most recent sub-patch and hence not creating a new
5784 // sub-patch.
5785 s = txn0->Put(Slice("foo0"), Slice("bar0c"));
5786 ASSERT_OK(s);
5787 s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
5788 ASSERT_OK(s);
5789 // duplicate the keys but in a different cf. It should not be counted as
5790 // duplicate.
5791 s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
5792 ASSERT_OK(s);
5793 s = txn0->Put(Slice("foo3"), Slice("bar3"));
5794 ASSERT_OK(s);
5795 s = txn0->Merge(Slice("foo3"), Slice("bar3"));
5796 ASSERT_OK(s);
5797 s = txn0->Put(Slice("foo4"), Slice("bar4"));
5798 ASSERT_OK(s);
5799 s = txn0->Delete(Slice("foo4"));
5800 ASSERT_OK(s);
5801 s = txn0->SingleDelete(Slice("foo4"));
5802 ASSERT_OK(s);
5803 if (do_prepare) {
5804 s = txn0->Prepare();
5805 ASSERT_OK(s);
5806 }
5807 if (do_rollback) {
5808 // Test rolling back the batch with duplicates
5809 s = txn0->Rollback();
5810 ASSERT_OK(s);
5811 } else {
5812 if (with_commit_batch) {
5813 assert(do_prepare);
5814 auto cb = txn0->GetCommitTimeWriteBatch();
5815 // duplicate a key in the original batch
5816 // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
5817 // conflicting with the prepared batch is currently undefined and
5818 // gives different results in different implementations.
5819
5820 // s = cb->Put(Slice("foo0"), Slice("bar0d"));
5821 // ASSERT_OK(s);
5822 // add a new duplicate key
5823 s = cb->Put(Slice("foo6"), Slice("bar6a"));
5824 ASSERT_OK(s);
5825 s = cb->Put(Slice("foo6"), Slice("bar6b"));
5826 ASSERT_OK(s);
5827 // add a duplicate key that is removed in the same batch
5828 s = cb->Put(Slice("foo7"), Slice("bar7a"));
5829 ASSERT_OK(s);
5830 s = cb->Delete(Slice("foo7"));
5831 ASSERT_OK(s);
5832 }
5833 s = txn0->Commit();
5834 ASSERT_OK(s);
5835 }
5836 delete txn0;
5837 ReadOptions ropt;
5838 PinnableSlice pinnable_val;
5839
5840 if (do_rollback) {
5841 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5842 ASSERT_TRUE(s.IsNotFound());
5843 s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5844 ASSERT_TRUE(s.IsNotFound());
5845 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5846 ASSERT_TRUE(s.IsNotFound());
5847 s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5848 ASSERT_TRUE(s.IsNotFound());
5849 s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5850 ASSERT_TRUE(s.IsNotFound());
5851 s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5852 ASSERT_TRUE(s.IsNotFound());
5853 } else {
5854 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5855 ASSERT_OK(s);
5856 ASSERT_TRUE(pinnable_val == ("bar0c"));
5857 s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
5858 ASSERT_OK(s);
5859 ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
5860 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5861 ASSERT_OK(s);
5862 ASSERT_TRUE(pinnable_val == ("bar1"));
5863 s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
5864 ASSERT_OK(s);
5865 ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
5866 s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
5867 ASSERT_OK(s);
5868 ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
5869 s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
5870 ASSERT_TRUE(s.IsNotFound());
5871 if (with_commit_batch) {
5872 s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
5873 ASSERT_OK(s);
5874 ASSERT_TRUE(pinnable_val == ("bar6b"));
5875 s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
5876 ASSERT_TRUE(s.IsNotFound());
5877 }
5878 }
5879 delete cf_handle;
5880 } // with_commit_batch
5881 } // do_rollback
5882 } // do_prepare
5883
f67539c2 5884 if (!options.unordered_write) {
11fdf7f2
TL
5885 // Also test with max_successive_merges > 0. max_successive_merges will not
5886 // affect our algorithm for duplicate key insertion but we add the test to
5887 // verify that.
5888 cf_options.max_successive_merges = 2;
5889 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
5890 ASSERT_OK(ReOpen());
5891 db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
5892 WriteOptions write_options;
5893 // Ensure one value for the key
5894 ASSERT_OK(db->Put(write_options, cf_handle, Slice("key"), Slice("value")));
5895 WriteBatch batch;
5896 // Merge more than max_successive_merges times
20effc67
TL
5897 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("1")));
5898 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("2")));
5899 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("3")));
5900 ASSERT_OK(batch.Merge(cf_handle, Slice("key"), Slice("4")));
11fdf7f2
TL
5901 ASSERT_OK(db->Write(write_options, &batch));
5902 ReadOptions read_options;
5903 string value;
5904 ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
5905 ASSERT_EQ(value, "value,1,2,3,4");
5906 delete cf_handle;
5907 }
5908
5909 {
5910 // Test that the duplicate detection is not compromised after rolling back
5911 // to a save point
5912 TransactionOptions txn_options;
5913 WriteOptions write_options;
5914 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
5915 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5916 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5917 txn0->SetSavePoint();
5918 ASSERT_OK(txn0->RollbackToSavePoint());
5919 ASSERT_OK(txn0->Commit());
5920 delete txn0;
5921 }
5922
5923 // Test sucessfull recovery after a crash
5924 {
5925 ASSERT_OK(ReOpen());
5926 TransactionOptions txn_options;
5927 WriteOptions write_options;
5928 ReadOptions ropt;
5929 Transaction* txn0;
5930 PinnableSlice pinnable_val;
5931 Status s;
5932
5933 std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
5934 cf_options.comparator = comp_gc.get();
f67539c2 5935 cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
11fdf7f2
TL
5936 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
5937 delete cf_handle;
5938 std::vector<ColumnFamilyDescriptor> cfds{
5939 ColumnFamilyDescriptor(kDefaultColumnFamilyName,
5940 ColumnFamilyOptions(options)),
5941 ColumnFamilyDescriptor(cf_name, cf_options),
5942 };
5943 std::vector<ColumnFamilyHandle*> handles;
5944 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5945
5946 ASSERT_OK(db->Put(write_options, "foo0", "init"));
5947 ASSERT_OK(db->Put(write_options, "foo1", "init"));
5948 ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init"));
5949 ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init"));
5950
5951 // one entry
5952 txn0 = db->BeginTransaction(write_options, txn_options);
5953 ASSERT_OK(txn0->SetName("xid"));
5954 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
5955 ASSERT_OK(txn0->Prepare());
5956 delete txn0;
5957 // This will check the asserts inside recovery code
5958 ASSERT_OK(db->FlushWAL(true));
5959 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5960 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5961 txn0 = db->GetTransactionByName("xid");
5962 ASSERT_TRUE(txn0 != nullptr);
5963 ASSERT_OK(txn0->Commit());
5964 delete txn0;
5965 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5966 ASSERT_OK(s);
5967 ASSERT_TRUE(pinnable_val == ("bar0a"));
5968
5969 // two entries, no duplicate
5970 txn0 = db->BeginTransaction(write_options, txn_options);
5971 ASSERT_OK(txn0->SetName("xid"));
5972 ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b")));
5973 ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b")));
5974 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
5975 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b")));
5976 ASSERT_OK(txn0->Prepare());
5977 delete txn0;
5978 // This will check the asserts inside recovery code
20effc67 5979 ASSERT_OK(db->FlushWAL(true));
11fdf7f2 5980 // Flush only cf 1
20effc67
TL
5981 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
5982 ->TEST_FlushMemTable(true, false, handles[1]));
11fdf7f2
TL
5983 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
5984 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
5985 txn0 = db->GetTransactionByName("xid");
5986 ASSERT_TRUE(txn0 != nullptr);
5987 ASSERT_OK(txn0->Commit());
5988 delete txn0;
5989 pinnable_val.Reset();
5990 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
5991 ASSERT_OK(s);
5992 ASSERT_TRUE(pinnable_val == ("bar0b"));
5993 pinnable_val.Reset();
5994 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
5995 ASSERT_OK(s);
5996 ASSERT_TRUE(pinnable_val == ("bar1b"));
5997 pinnable_val.Reset();
5998 s = db->Get(ropt, handles[1], "foo0", &pinnable_val);
5999 ASSERT_OK(s);
6000 ASSERT_TRUE(pinnable_val == ("bar0b"));
6001 pinnable_val.Reset();
6002 s = db->Get(ropt, handles[1], "fol1", &pinnable_val);
6003 ASSERT_OK(s);
6004 ASSERT_TRUE(pinnable_val == ("bar1b"));
6005
6006 // one duplicate with ::Put
6007 txn0 = db->BeginTransaction(write_options, txn_options);
6008 ASSERT_OK(txn0->SetName("xid"));
6009 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c")));
6010 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d")));
6011 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c")));
6012 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c")));
6013 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d")));
6014 ASSERT_OK(txn0->Prepare());
6015 delete txn0;
6016 // This will check the asserts inside recovery code
6017 ASSERT_OK(db->FlushWAL(true));
6018 // Flush only cf 1
20effc67
TL
6019 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6020 ->TEST_FlushMemTable(true, false, handles[1]));
11fdf7f2
TL
6021 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6022 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6023 txn0 = db->GetTransactionByName("xid");
6024 ASSERT_TRUE(txn0 != nullptr);
6025 ASSERT_OK(txn0->Commit());
6026 delete txn0;
6027 pinnable_val.Reset();
6028 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6029 ASSERT_OK(s);
6030 ASSERT_TRUE(pinnable_val == ("bar0d"));
6031 pinnable_val.Reset();
6032 s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
6033 ASSERT_OK(s);
6034 ASSERT_TRUE(pinnable_val == ("bar1c"));
6035 pinnable_val.Reset();
6036 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6037 ASSERT_OK(s);
6038 ASSERT_TRUE(pinnable_val == ("bar1d"));
6039
6040 // Duplicate with ::Put, ::Delete
6041 txn0 = db->BeginTransaction(write_options, txn_options);
6042 ASSERT_OK(txn0->SetName("xid"));
6043 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e")));
6044 ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1")));
6045 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6046 ASSERT_OK(txn0->Delete(Slice("foo0")));
6047 ASSERT_OK(txn0->Prepare());
6048 delete txn0;
6049 // This will check the asserts inside recovery code
6050 ASSERT_OK(db->FlushWAL(true));
6051 // Flush only cf 1
20effc67
TL
6052 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6053 ->TEST_FlushMemTable(true, false, handles[1]));
11fdf7f2
TL
6054 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6055 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6056 txn0 = db->GetTransactionByName("xid");
6057 ASSERT_TRUE(txn0 != nullptr);
6058 ASSERT_OK(txn0->Commit());
6059 delete txn0;
6060 pinnable_val.Reset();
6061 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6062 ASSERT_TRUE(s.IsNotFound());
6063 pinnable_val.Reset();
6064 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6065 ASSERT_TRUE(s.IsNotFound());
6066
6067 // Duplicate with ::Put, ::SingleDelete
6068 txn0 = db->BeginTransaction(write_options, txn_options);
6069 ASSERT_OK(txn0->SetName("xid"));
6070 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g")));
6071 ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1")));
6072 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e")));
6073 ASSERT_OK(txn0->SingleDelete(Slice("foo0")));
6074 ASSERT_OK(txn0->Prepare());
6075 delete txn0;
6076 // This will check the asserts inside recovery code
6077 ASSERT_OK(db->FlushWAL(true));
6078 // Flush only cf 1
20effc67
TL
6079 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6080 ->TEST_FlushMemTable(true, false, handles[1]));
11fdf7f2
TL
6081 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6082 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6083 txn0 = db->GetTransactionByName("xid");
6084 ASSERT_TRUE(txn0 != nullptr);
6085 ASSERT_OK(txn0->Commit());
6086 delete txn0;
6087 pinnable_val.Reset();
6088 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6089 ASSERT_TRUE(s.IsNotFound());
6090 pinnable_val.Reset();
6091 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6092 ASSERT_TRUE(s.IsNotFound());
6093
6094 // Duplicate with ::Put, ::Merge
6095 txn0 = db->BeginTransaction(write_options, txn_options);
6096 ASSERT_OK(txn0->SetName("xid"));
6097 ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i")));
6098 ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j")));
6099 ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f")));
6100 ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g")));
6101 ASSERT_OK(txn0->Prepare());
6102 delete txn0;
6103 // This will check the asserts inside recovery code
6104 ASSERT_OK(db->FlushWAL(true));
6105 // Flush only cf 1
20effc67
TL
6106 ASSERT_OK(static_cast_with_check<DBImpl>(db->GetRootDB())
6107 ->TEST_FlushMemTable(true, false, handles[1]));
11fdf7f2
TL
6108 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6109 ASSERT_OK(ReOpenNoDelete(cfds, &handles));
6110 txn0 = db->GetTransactionByName("xid");
6111 ASSERT_TRUE(txn0 != nullptr);
6112 ASSERT_OK(txn0->Commit());
6113 delete txn0;
6114 pinnable_val.Reset();
6115 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
6116 ASSERT_OK(s);
6117 ASSERT_TRUE(pinnable_val == ("bar0f,bar0g"));
6118 pinnable_val.Reset();
6119 s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val);
6120 ASSERT_OK(s);
6121 ASSERT_TRUE(pinnable_val == ("bar1i,bar1j"));
6122
6123 for (auto h : handles) {
6124 delete h;
6125 }
6126 delete db;
6127 db = nullptr;
6128 }
6129}
6130
f67539c2
TL
6131// Test that the reseek optimization in iterators will not result in an infinite
6132// loop if there are too many uncommitted entries before the snapshot.
6133TEST_P(TransactionTest, ReseekOptimization) {
6134 WriteOptions write_options;
6135 write_options.sync = true;
6136 write_options.disableWAL = false;
6137 ColumnFamilyDescriptor cfd;
20effc67 6138 ASSERT_OK(db->DefaultColumnFamily()->GetDescriptor(&cfd));
f67539c2
TL
6139 auto max_skip = cfd.options.max_sequential_skip_in_iterations;
6140
6141 ASSERT_OK(db->Put(write_options, Slice("foo0"), Slice("initv")));
6142
6143 TransactionOptions txn_options;
6144 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
6145 ASSERT_OK(txn0->SetName("xid"));
6146 // Duplicate keys will result into separate sequence numbers in WritePrepared
6147 // and WriteUnPrepared
6148 for (size_t i = 0; i < 2 * max_skip; i++) {
6149 ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar")));
6150 }
6151 ASSERT_OK(txn0->Prepare());
6152 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("initv")));
6153
6154 ReadOptions read_options;
6155 // To avoid loops
6156 read_options.max_skippable_internal_keys = 10 * max_skip;
6157 Iterator* iter = db->NewIterator(read_options);
6158 ASSERT_OK(iter->status());
6159 size_t cnt = 0;
6160 iter->SeekToFirst();
6161 while (iter->Valid()) {
6162 iter->Next();
6163 ASSERT_OK(iter->status());
6164 cnt++;
6165 }
6166 ASSERT_EQ(cnt, 2);
6167 cnt = 0;
6168 iter->SeekToLast();
6169 while (iter->Valid()) {
6170 iter->Prev();
6171 ASSERT_OK(iter->status());
6172 cnt++;
6173 }
6174 ASSERT_EQ(cnt, 2);
6175 delete iter;
20effc67 6176 ASSERT_OK(txn0->Rollback());
f67539c2
TL
6177 delete txn0;
6178}
6179
6180// After recovery in kPointInTimeRecovery mode, the corrupted log file remains
6181// there. The new log files should be still read succesfully during recovery of
6182// the 2nd crash.
6183TEST_P(TransactionTest, DoubleCrashInRecovery) {
6184 for (const bool manual_wal_flush : {false, true}) {
6185 for (const bool write_after_recovery : {false, true}) {
6186 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
6187 options.manual_wal_flush = manual_wal_flush;
20effc67 6188 ASSERT_OK(ReOpen());
f67539c2
TL
6189 std::string cf_name = "two";
6190 ColumnFamilyOptions cf_options;
6191 ColumnFamilyHandle* cf_handle = nullptr;
6192 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
6193
6194 // Add a prepare entry to prevent the older logs from being deleted.
6195 WriteOptions write_options;
6196 TransactionOptions txn_options;
6197 Transaction* txn = db->BeginTransaction(write_options, txn_options);
6198 ASSERT_OK(txn->SetName("xid"));
6199 ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
6200 ASSERT_OK(txn->Prepare());
6201
6202 FlushOptions flush_ops;
20effc67 6203 ASSERT_OK(db->Flush(flush_ops));
f67539c2
TL
6204 // Now we have a log that cannot be deleted
6205
6206 ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
6207 // Flush only the 2nd cf
20effc67 6208 ASSERT_OK(db->Flush(flush_ops, cf_handle));
f67539c2
TL
6209
6210 // The value is large enough to be touched by the corruption we ingest
6211 // below.
6212 std::string large_value(400, ' ');
6213 // key/value not touched by corruption
6214 ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
6215 // key/value touched by corruption
6216 ASSERT_OK(db->Put(write_options, "foo3", large_value));
6217 // key/value not touched by corruption
6218 ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
6219
20effc67
TL
6220 ASSERT_OK(db->FlushWAL(true));
6221 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
f67539c2
TL
6222 uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
6223 std::string fname = LogFileName(dbname, wal_file_id);
6224 reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
6225 delete txn;
6226 delete cf_handle;
6227 delete db;
6228 db = nullptr;
6229
6230 // Corrupt the last log file in the middle, so that it is not corrupted
6231 // in the tail.
6232 std::string file_content;
6233 ASSERT_OK(ReadFileToString(env, fname, &file_content));
6234 file_content[400] = 'h';
6235 file_content[401] = 'a';
6236 ASSERT_OK(env->DeleteFile(fname));
6237 ASSERT_OK(WriteStringToFile(env, file_content, fname, true));
6238
6239 // Recover from corruption
6240 std::vector<ColumnFamilyHandle*> handles;
6241 std::vector<ColumnFamilyDescriptor> column_families;
6242 column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
6243 ColumnFamilyOptions()));
6244 column_families.push_back(
6245 ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
6246 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6247
6248 if (write_after_recovery) {
6249 // Write data to the log right after the corrupted log
6250 ASSERT_OK(db->Put(write_options, "foo5", large_value));
6251 }
6252
6253 // Persist data written to WAL during recovery or by the last Put
20effc67 6254 ASSERT_OK(db->FlushWAL(true));
f67539c2
TL
6255 // 2nd crash to recover while having a valid log after the corrupted one.
6256 ASSERT_OK(ReOpenNoDelete(column_families, &handles));
6257 assert(db != nullptr);
6258 txn = db->GetTransactionByName("xid");
6259 ASSERT_TRUE(txn != nullptr);
6260 ASSERT_OK(txn->Commit());
6261 delete txn;
6262 for (auto handle : handles) {
6263 delete handle;
6264 }
6265 }
6266 }
6267}
6268
20effc67
TL
6269TEST_P(TransactionTest, CommitWithoutPrepare) {
6270 {
6271 // skip_prepare = false.
6272 WriteOptions write_options;
6273 TransactionOptions txn_options;
6274 txn_options.skip_prepare = false;
6275 Transaction* txn = db->BeginTransaction(write_options, txn_options);
6276 ASSERT_TRUE(txn->Commit().IsTxnNotPrepared());
6277 delete txn;
6278 }
6279
6280 {
6281 // skip_prepare = true.
6282 WriteOptions write_options;
6283 TransactionOptions txn_options;
6284 txn_options.skip_prepare = true;
6285 Transaction* txn = db->BeginTransaction(write_options, txn_options);
6286 ASSERT_OK(txn->Commit());
6287 delete txn;
6288 }
6289}
6290
f67539c2 6291} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
6292
6293int main(int argc, char** argv) {
6294 ::testing::InitGoogleTest(&argc, argv);
6295 return RUN_ALL_TESTS();
6296}
6297
6298#else
6299#include <stdio.h>
6300
int main(int /*argc*/, char** /*argv*/) {
  // Transactions are not built in ROCKSDB_LITE, so report the skip on stderr
  // and exit with status 0.
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
6306
6307#endif // ROCKSDB_LITE