]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_prepared_transaction_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_transaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <algorithm>
9 #include <atomic>
10 #include <cinttypes>
11 #include <functional>
12 #include <string>
13 #include <thread>
14
15 #include "db/db_impl/db_impl.h"
16 #include "db/dbformat.h"
17 #include "port/port.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/types.h"
21 #include "rocksdb/utilities/debug.h"
22 #include "rocksdb/utilities/transaction.h"
23 #include "rocksdb/utilities/transaction_db.h"
24 #include "table/mock_table.h"
25 #include "test_util/sync_point.h"
26 #include "test_util/testharness.h"
27 #include "test_util/testutil.h"
28 #include "test_util/transaction_test_util.h"
29 #include "util/mutexlock.h"
30 #include "util/random.h"
31 #include "util/string_util.h"
32 #include "utilities/fault_injection_env.h"
33 #include "utilities/merge_operators.h"
34 #include "utilities/merge_operators/string_append/stringappend.h"
35 #include "utilities/transactions/pessimistic_transaction_db.h"
36 #include "utilities/transactions/transaction_test.h"
37 #include "utilities/transactions/write_prepared_txn_db.h"
38
39 using std::string;
40
41 namespace ROCKSDB_NAMESPACE {
42
43 using CommitEntry = WritePreparedTxnDB::CommitEntry;
44 using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
45 using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
46
47 TEST(PreparedHeap, BasicsTest) {
48 WritePreparedTxnDB::PreparedHeap heap;
49 {
50 MutexLock ml(heap.push_pop_mutex());
51 heap.push(14l);
52 // Test with one element
53 ASSERT_EQ(14l, heap.top());
54 heap.push(24l);
55 heap.push(34l);
56 // Test that old min is still on top
57 ASSERT_EQ(14l, heap.top());
58 heap.push(44l);
59 heap.push(54l);
60 heap.push(64l);
61 heap.push(74l);
62 heap.push(84l);
63 }
64 // Test that old min is still on top
65 ASSERT_EQ(14l, heap.top());
66 heap.erase(24l);
67 // Test that old min is still on top
68 ASSERT_EQ(14l, heap.top());
69 heap.erase(14l);
70 // Test that the new comes to the top after multiple erase
71 ASSERT_EQ(34l, heap.top());
72 heap.erase(34l);
73 // Test that the new comes to the top after single erase
74 ASSERT_EQ(44l, heap.top());
75 heap.erase(54l);
76 ASSERT_EQ(44l, heap.top());
77 heap.pop(); // pop 44l
78 // Test that the erased items are ignored after pop
79 ASSERT_EQ(64l, heap.top());
80 heap.erase(44l);
81 // Test that erasing an already popped item would work
82 ASSERT_EQ(64l, heap.top());
83 heap.erase(84l);
84 ASSERT_EQ(64l, heap.top());
85 {
86 MutexLock ml(heap.push_pop_mutex());
87 heap.push(85l);
88 heap.push(86l);
89 heap.push(87l);
90 heap.push(88l);
91 heap.push(89l);
92 }
93 heap.erase(87l);
94 heap.erase(85l);
95 heap.erase(89l);
96 heap.erase(86l);
97 heap.erase(88l);
98 // Test top remains the same after a random order of many erases
99 ASSERT_EQ(64l, heap.top());
100 heap.pop();
101 // Test that pop works with a series of random pending erases
102 ASSERT_EQ(74l, heap.top());
103 ASSERT_FALSE(heap.empty());
104 heap.pop();
105 // Test that empty works
106 ASSERT_TRUE(heap.empty());
107 }
108
109 // This is a scenario reconstructed from a buggy trace. Test that the bug does
110 // not resurface again.
111 TEST(PreparedHeap, EmptyAtTheEnd) {
112 WritePreparedTxnDB::PreparedHeap heap;
113 {
114 MutexLock ml(heap.push_pop_mutex());
115 heap.push(40l);
116 }
117 ASSERT_EQ(40l, heap.top());
118 // Although not a recommended scenario, we must be resilient against erase
119 // without a prior push.
120 heap.erase(50l);
121 ASSERT_EQ(40l, heap.top());
122 {
123 MutexLock ml(heap.push_pop_mutex());
124 heap.push(60l);
125 }
126 ASSERT_EQ(40l, heap.top());
127
128 heap.erase(60l);
129 ASSERT_EQ(40l, heap.top());
130 heap.erase(40l);
131 ASSERT_TRUE(heap.empty());
132
133 {
134 MutexLock ml(heap.push_pop_mutex());
135 heap.push(40l);
136 }
137 ASSERT_EQ(40l, heap.top());
138 heap.erase(50l);
139 ASSERT_EQ(40l, heap.top());
140 {
141 MutexLock ml(heap.push_pop_mutex());
142 heap.push(60l);
143 }
144 ASSERT_EQ(40l, heap.top());
145
146 heap.erase(40l);
147 // Test that the erase has not emptied the heap (we had a bug doing that)
148 ASSERT_FALSE(heap.empty());
149 ASSERT_EQ(60l, heap.top());
150 heap.erase(60l);
151 ASSERT_TRUE(heap.empty());
152 }
153
154 // Generate random order of PreparedHeap access and test that the heap will be
155 // successfully emptied at the end.
156 TEST(PreparedHeap, Concurrent) {
157 const size_t t_cnt = 10;
158 ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1];
159 WritePreparedTxnDB::PreparedHeap heap;
160 port::RWMutex prepared_mutex;
161 std::atomic<size_t> last;
162
163 for (size_t n = 0; n < 100; n++) {
164 last = 0;
165 t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
166 Random rnd(1103);
167 for (size_t seq = 1; seq <= t_cnt; seq++) {
168 // This is not recommended usage but we should be resilient against it.
169 bool skip_push = rnd.OneIn(5);
170 if (!skip_push) {
171 MutexLock ml(heap.push_pop_mutex());
172 std::this_thread::yield();
173 heap.push(seq);
174 last.store(seq);
175 }
176 }
177 });
178 for (size_t i = 1; i <= t_cnt; i++) {
179 t[i] =
180 ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() {
181 auto seq = i;
182 do {
183 std::this_thread::yield();
184 } while (last.load() < seq);
185 WriteLock wl(&prepared_mutex);
186 heap.erase(seq);
187 });
188 }
189 for (size_t i = 0; i <= t_cnt; i++) {
190 t[i].join();
191 }
192 ASSERT_TRUE(heap.empty());
193 }
194 }
195
196 // Test that WriteBatchWithIndex correctly counts the number of sub-batches
197 TEST(WriteBatchWithIndex, SubBatchCnt) {
198 ColumnFamilyOptions cf_options;
199 std::string cf_name = "two";
200 DB* db;
201 Options options;
202 options.create_if_missing = true;
203 const std::string dbname = test::PerThreadDBPath("transaction_testdb");
204 DestroyDB(dbname, options);
205 ASSERT_OK(DB::Open(options, dbname, &db));
206 ColumnFamilyHandle* cf_handle = nullptr;
207 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
208 WriteOptions write_options;
209 size_t batch_cnt = 1;
210 size_t save_points = 0;
211 std::vector<size_t> batch_cnt_at;
212 WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
213 0);
214 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
215 batch_cnt_at.push_back(batch_cnt);
216 batch.SetSavePoint();
217 save_points++;
218 batch.Put(Slice("key"), Slice("value"));
219 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
220 batch_cnt_at.push_back(batch_cnt);
221 batch.SetSavePoint();
222 save_points++;
223 batch.Put(Slice("key2"), Slice("value2"));
224 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
225 // duplicate the keys
226 batch_cnt_at.push_back(batch_cnt);
227 batch.SetSavePoint();
228 save_points++;
229 batch.Put(Slice("key"), Slice("value3"));
230 batch_cnt++;
231 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
232 // duplicate the 2nd key. It should not be counted duplicate since a
233 // sub-patch is cut after the last duplicate.
234 batch_cnt_at.push_back(batch_cnt);
235 batch.SetSavePoint();
236 save_points++;
237 batch.Put(Slice("key2"), Slice("value4"));
238 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
239 // duplicate the keys but in a different cf. It should not be counted as
240 // duplicate keys
241 batch_cnt_at.push_back(batch_cnt);
242 batch.SetSavePoint();
243 save_points++;
244 batch.Put(cf_handle, Slice("key"), Slice("value5"));
245 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
246
247 // Test that the number of sub-batches matches what we count with
248 // SubBatchCounter
249 std::map<uint32_t, const Comparator*> comparators;
250 comparators[0] = db->DefaultColumnFamily()->GetComparator();
251 comparators[cf_handle->GetID()] = cf_handle->GetComparator();
252 SubBatchCounter counter(comparators);
253 ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
254 ASSERT_EQ(batch_cnt, counter.BatchCount());
255
256 // Test that RollbackToSavePoint will properly resets the number of
257 // sub-batches
258 for (size_t i = save_points; i > 0; i--) {
259 batch.RollbackToSavePoint();
260 ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
261 }
262
263 // Test the count is right with random batches
264 {
265 const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms
266 Random rnd(1131);
267 std::string keys[TOTAL_KEYS];
268 for (size_t k = 0; k < TOTAL_KEYS; k++) {
269 int len = static_cast<int>(rnd.Uniform(50));
270 keys[k] = test::RandomKey(&rnd, len);
271 }
272 for (size_t i = 0; i < 1000; i++) { // 1000 random batches
273 WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
274 0, true, 0);
275 for (size_t k = 0; k < 10; k++) { // 10 key per batch
276 size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
277 Slice key = Slice(keys[ki]);
278 std::string tmp = rnd.RandomString(16);
279 Slice value = Slice(tmp);
280 rndbatch.Put(key, value);
281 }
282 SubBatchCounter batch_counter(comparators);
283 ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
284 ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
285 }
286 }
287
288 delete cf_handle;
289 delete db;
290 }
291
292 TEST(CommitEntry64b, BasicTest) {
293 const size_t INDEX_BITS = static_cast<size_t>(21);
294 const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
295 const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
296
297 // zero-initialized CommitEntry64b should indicate an empty entry
298 CommitEntry64b empty_entry64b;
299 uint64_t empty_index = 11ul;
300 CommitEntry empty_entry;
301 bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
302 ASSERT_FALSE(ok);
303
304 // the zero entry is reserved for un-initialized entries
305 const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
306 // Samples over the numbers that are covered by that many index bits
307 std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
308 // Samples over the numbers that are covered by that many commit bits
309 std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
310 // Iterate over prepare numbers that have i) cover all bits of a sequence
311 // number, and ii) include some bits that fall into the range of index or
312 // commit bits
313 for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
314 for (uint64_t i : is) {
315 for (uint64_t d : ds) {
316 uint64_t p = base + i + d;
317 for (uint64_t c : {p, p + d / 2, p + d}) {
318 uint64_t index = p % INDEX_SIZE;
319 CommitEntry before(p, c), after;
320 CommitEntry64b entry64b(before, FORMAT);
321 ok = entry64b.Parse(index, &after, FORMAT);
322 ASSERT_TRUE(ok);
323 if (!(before == after)) {
324 printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
325 " c %" PRIu64 " index %" PRIu64 "\n",
326 base, i, d, p, c, index);
327 }
328 ASSERT_EQ(before, after);
329 }
330 }
331 }
332 }
333 }
334
335 class WritePreparedTxnDBMock : public WritePreparedTxnDB {
336 public:
337 WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
338 : WritePreparedTxnDB(db_impl, opt) {}
339 void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
340 snapshots_ = snapshots;
341 }
342 void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
343
344 protected:
345 const std::vector<SequenceNumber> GetSnapshotListFromDB(
346 SequenceNumber /* unused */) override {
347 return snapshots_;
348 }
349
350 private:
351 std::vector<SequenceNumber> snapshots_;
352 };
353
354 class WritePreparedTransactionTestBase : public TransactionTestBase {
355 public:
356 WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
357 TxnDBWritePolicy write_policy,
358 WriteOrdering write_ordering)
359 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
360 write_ordering){};
361
362 protected:
363 void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
364 size_t commit_cache_bits) {
365 txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
366 txn_db_options.wp_commit_cache_bits = commit_cache_bits;
367 }
368 void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
369 txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
370 }
371 // If expect_update is set, check if it actually updated old_commit_map_. If
372 // it did not and yet suggested not to check the next snapshot, do the
373 // opposite to check if it was not a bad suggestion.
374 void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
375 uint64_t snapshot,
376 uint64_t next_snapshot,
377 bool expect_update) {
378 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
379 // reset old_commit_map_empty_ so that its value indicate whether
380 // old_commit_map_ was updated
381 wp_db->old_commit_map_empty_ = true;
382 bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
383 snapshot < next_snapshot);
384 if (expect_update == wp_db->old_commit_map_empty_) {
385 printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
386 " next: %" PRIu64 "\n",
387 prepare, commit, snapshot, next_snapshot);
388 }
389 EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
390 if (!check_next && wp_db->old_commit_map_empty_) {
391 // do the opposite to make sure it was not a bad suggestion
392 const bool dont_care_bool = true;
393 wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
394 dont_care_bool);
395 if (!wp_db->old_commit_map_empty_) {
396 printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
397 " next: %" PRIu64 "\n",
398 prepare, commit, snapshot, next_snapshot);
399 }
400 EXPECT_TRUE(wp_db->old_commit_map_empty_);
401 }
402 }
403
404 // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
405 // miss a snapshot because of a concurrent update by UpdateSnapshots that is
406 // writing new_snapshots. Both threads are broken at two points. The sync
407 // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
408 // entry is expected to be vital for one of the snapshots that is common
409 // between the old and new list of snapshots.
410 void SnapshotConcurrentAccessTestInternal(
411 WritePreparedTxnDB* wp_db,
412 const std::vector<SequenceNumber>& old_snapshots,
413 const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
414 SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
415 // First reset the snapshot list
416 const std::vector<SequenceNumber> empty_snapshots;
417 wp_db->old_commit_map_empty_ = true;
418 wp_db->UpdateSnapshots(empty_snapshots, ++version);
419 // Then initialize it with the old_snapshots
420 wp_db->UpdateSnapshots(old_snapshots, ++version);
421
422 // Starting from the first thread, cut each thread at two points
423 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
424 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
425 "WritePreparedTxnDB::UpdateSnapshots:s:start"},
426 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
427 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
428 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
429 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
430 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
431 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
432 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
433 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
434 });
435 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
436 {
437 ASSERT_TRUE(wp_db->old_commit_map_empty_);
438 ROCKSDB_NAMESPACE::port::Thread t1(
439 [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
440 ROCKSDB_NAMESPACE::port::Thread t2(
441 [&]() { wp_db->CheckAgainstSnapshots(entry); });
442 t1.join();
443 t2.join();
444 ASSERT_FALSE(wp_db->old_commit_map_empty_);
445 }
446 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
447
448 wp_db->old_commit_map_empty_ = true;
449 wp_db->UpdateSnapshots(empty_snapshots, ++version);
450 wp_db->UpdateSnapshots(old_snapshots, ++version);
451 // Starting from the second thread, cut each thread at two points
452 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
453 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
454 "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
455 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
456 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
457 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
458 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
459 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
460 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
461 {"WritePreparedTxnDB::UpdateSnapshots:p:end",
462 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
463 });
464 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
465 {
466 ASSERT_TRUE(wp_db->old_commit_map_empty_);
467 ROCKSDB_NAMESPACE::port::Thread t1(
468 [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
469 ROCKSDB_NAMESPACE::port::Thread t2(
470 [&]() { wp_db->CheckAgainstSnapshots(entry); });
471 t1.join();
472 t2.join();
473 ASSERT_FALSE(wp_db->old_commit_map_empty_);
474 }
475 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
476 }
477
478 // Verify value of keys.
479 void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
480 const Snapshot* snapshot = nullptr) {
481 std::string value;
482 ReadOptions read_options;
483 read_options.snapshot = snapshot;
484 for (auto& kv : data) {
485 auto s = db->Get(read_options, kv.first, &value);
486 ASSERT_TRUE(s.ok() || s.IsNotFound());
487 if (s.ok()) {
488 if (kv.second != value) {
489 printf("key = %s\n", kv.first.c_str());
490 }
491 ASSERT_EQ(kv.second, value);
492 } else {
493 ASSERT_EQ(kv.second, "NOT_FOUND");
494 }
495
496 // Try with MultiGet API too
497 std::vector<std::string> values;
498 auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
499 {kv.first}, &values);
500 ASSERT_EQ(1, values.size());
501 ASSERT_EQ(1, s_vec.size());
502 s = s_vec[0];
503 ASSERT_TRUE(s.ok() || s.IsNotFound());
504 if (s.ok()) {
505 ASSERT_TRUE(kv.second == values[0]);
506 } else {
507 ASSERT_EQ(kv.second, "NOT_FOUND");
508 }
509 }
510 }
511
512 // Verify all versions of keys.
513 void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
514 std::vector<KeyVersion> versions;
515 const size_t kMaxKeys = 100000;
516 ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
517 expected_versions.back().user_key, kMaxKeys,
518 &versions));
519 ASSERT_EQ(expected_versions.size(), versions.size());
520 for (size_t i = 0; i < versions.size(); i++) {
521 ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
522 ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
523 ASSERT_EQ(expected_versions[i].type, versions[i].type);
524 if (versions[i].type != kTypeDeletion &&
525 versions[i].type != kTypeSingleDeletion) {
526 ASSERT_EQ(expected_versions[i].value, versions[i].value);
527 }
528 // Range delete not supported.
529 assert(expected_versions[i].type != kTypeRangeDeletion);
530 }
531 }
532 };
533
534 class WritePreparedTransactionTest
535 : public WritePreparedTransactionTestBase,
536 virtual public ::testing::WithParamInterface<
537 std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> {
538 public:
539 WritePreparedTransactionTest()
540 : WritePreparedTransactionTestBase(
541 std::get<0>(GetParam()), std::get<1>(GetParam()),
542 std::get<2>(GetParam()), std::get<3>(GetParam())){};
543 };
544
545 #ifndef ROCKSDB_VALGRIND_RUN
546 class SnapshotConcurrentAccessTest
547 : public WritePreparedTransactionTestBase,
548 virtual public ::testing::WithParamInterface<std::tuple<
549 bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
550 public:
551 SnapshotConcurrentAccessTest()
552 : WritePreparedTransactionTestBase(
553 std::get<0>(GetParam()), std::get<1>(GetParam()),
554 std::get<2>(GetParam()), std::get<3>(GetParam())),
555 split_id_(std::get<4>(GetParam())),
556 split_cnt_(std::get<5>(GetParam())){};
557
558 protected:
559 // A test is split into split_cnt_ tests, each identified with split_id_ where
560 // 0 <= split_id_ < split_cnt_
561 size_t split_id_;
562 size_t split_cnt_;
563 };
564 #endif // ROCKSDB_VALGRIND_RUN
565
566 class SeqAdvanceConcurrentTest
567 : public WritePreparedTransactionTestBase,
568 virtual public ::testing::WithParamInterface<std::tuple<
569 bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
570 public:
571 SeqAdvanceConcurrentTest()
572 : WritePreparedTransactionTestBase(
573 std::get<0>(GetParam()), std::get<1>(GetParam()),
574 std::get<2>(GetParam()), std::get<3>(GetParam())),
575 split_id_(std::get<4>(GetParam())),
576 split_cnt_(std::get<5>(GetParam())) {
577 special_env.skip_fsync_ = true;
578 };
579
580 protected:
581 // A test is split into split_cnt_ tests, each identified with split_id_ where
582 // 0 <= split_id_ < split_cnt_
583 size_t split_id_;
584 size_t split_cnt_;
585 };
586
587 INSTANTIATE_TEST_CASE_P(
588 WritePreparedTransaction, WritePreparedTransactionTest,
589 ::testing::Values(
590 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
591 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
592 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)));
593
594 #ifndef ROCKSDB_VALGRIND_RUN
595 INSTANTIATE_TEST_CASE_P(
596 TwoWriteQueues, SnapshotConcurrentAccessTest,
597 ::testing::Values(
598 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
599 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
600 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
601 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
602 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
603 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
604 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
605 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
606 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
607 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
608 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
609 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
610 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
611 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
612 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
613 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
614 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
615 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
616 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
617 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),
618
619 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
620 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
621 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
622 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
623 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
624 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
625 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
626 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
627 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
628 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
629 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
630 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
631 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
632 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
633 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
634 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
635 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
636 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
637 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
638 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)));
639
640 INSTANTIATE_TEST_CASE_P(
641 OneWriteQueue, SnapshotConcurrentAccessTest,
642 ::testing::Values(
643 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
644 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
645 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
646 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
647 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
648 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
649 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
650 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
651 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
652 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
653 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
654 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
655 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
656 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
657 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
658 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
659 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
660 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
661 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
662 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20)));
663
664 INSTANTIATE_TEST_CASE_P(
665 TwoWriteQueues, SeqAdvanceConcurrentTest,
666 ::testing::Values(
667 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
668 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
669 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
670 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
671 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
672 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
673 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
674 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
675 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
676 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
677 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
678 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
679 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
680 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
681 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
682 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
683 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
684 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
685 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
686 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)));
687
688 INSTANTIATE_TEST_CASE_P(
689 OneWriteQueue, SeqAdvanceConcurrentTest,
690 ::testing::Values(
691 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
692 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
693 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
694 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
695 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
696 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
697 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
698 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
699 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
700 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)));
701 #endif // ROCKSDB_VALGRIND_RUN
702
703 TEST_P(WritePreparedTransactionTest, CommitMap) {
704 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
705 assert(wp_db);
706 assert(wp_db->db_impl_);
707 size_t size = wp_db->COMMIT_CACHE_SIZE;
708 CommitEntry c = {5, 12}, e;
709 bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
710 ASSERT_FALSE(evicted);
711
712 // Should be able to read the same value
713 CommitEntry64b dont_care;
714 bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
715 ASSERT_TRUE(found);
716 ASSERT_EQ(c, e);
717 // Should be able to distinguish between overlapping entries
718 found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
719 ASSERT_TRUE(found);
720 ASSERT_NE(c.prep_seq + size, e.prep_seq);
721 // Should be able to detect non-existent entry
722 found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
723 ASSERT_FALSE(found);
724
725 // Reject an invalid exchange
726 CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
727 CommitEntry64b e2_64b(e2, wp_db->FORMAT);
728 bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
729 ASSERT_FALSE(exchanged);
730 // check whether it did actually reject that
731 found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
732 ASSERT_TRUE(found);
733 ASSERT_EQ(c, e);
734
735 // Accept a valid exchange
736 CommitEntry64b c_64b(c, wp_db->FORMAT);
737 CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
738 exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
739 ASSERT_TRUE(exchanged);
740 // check whether it did actually accepted that
741 found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
742 ASSERT_TRUE(found);
743 ASSERT_EQ(e3, e);
744
745 // Rewrite an entry
746 CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
747 evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
748 ASSERT_TRUE(evicted);
749 ASSERT_EQ(e3, e);
750 found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
751 ASSERT_TRUE(found);
752 ASSERT_EQ(e4, e);
753 }
754
755 TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
756 // If prepare <= snapshot < commit we should keep the entry around since its
757 // nonexistence could be interpreted as committed in the snapshot while it is
758 // not true. We keep such entries around by adding them to the
759 // old_commit_map_.
760 uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
761 p = 10l, c = 15l, s = 20l, ns = 21l;
762 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
763 // If we do not expect the old commit map to be updated, try also with a next
764 // snapshot that is expected to update the old commit map. This would test
765 // that MaybeUpdateOldCommitMap would not prevent us from checking the next
766 // snapshot that must be checked.
767 p = 10l, c = 15l, s = 20l, ns = 11l;
768 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
769
770 p = 10l, c = 20l, s = 20l, ns = 19l;
771 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
772 p = 10l, c = 20l, s = 20l, ns = 21l;
773 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
774
775 p = 20l, c = 20l, s = 20l, ns = 21l;
776 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
777 p = 20l, c = 20l, s = 20l, ns = 19l;
778 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
779
780 p = 10l, c = 25l, s = 20l, ns = 21l;
781 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
782
783 p = 20l, c = 25l, s = 20l, ns = 21l;
784 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
785
786 p = 21l, c = 25l, s = 20l, ns = 22l;
787 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
788 p = 21l, c = 25l, s = 20l, ns = 19l;
789 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
790 }
791
792 // Trigger the condition where some old memtables are skipped when doing
793 // TransactionUtil::CheckKey(), and make sure the result is still correct.
794 TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
795 const int kAttemptHistoryMemtable = 0;
796 const int kAttemptImmMemTable = 1;
797 for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
798 attempt++) {
799 options.max_write_buffer_number_to_maintain = 3;
800 ReOpen();
801
802 WriteOptions write_options;
803 ReadOptions read_options;
804 TransactionOptions txn_options;
805 txn_options.set_snapshot = true;
806 string value;
807 Status s;
808
809 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
810 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
811
812 Transaction* txn = db->BeginTransaction(write_options, txn_options);
813 ASSERT_TRUE(txn != nullptr);
814 ASSERT_OK(txn->SetName("txn"));
815
816 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
817 ASSERT_TRUE(txn2 != nullptr);
818 ASSERT_OK(txn2->SetName("txn2"));
819
820 // This transaction is created to cause potential conflict.
821 Transaction* txn_x = db->BeginTransaction(write_options);
822 ASSERT_OK(txn_x->SetName("txn_x"));
823 ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
824 ASSERT_OK(txn_x->Prepare());
825
826 // Create snapshots after the prepare, but there should still
827 // be a conflict when trying to read "foo".
828
829 if (attempt == kAttemptImmMemTable) {
830 // For the second attempt, hold flush from beginning. The memtable
831 // will be switched to immutable after calling TEST_SwitchMemtable()
832 // while CheckKey() is called.
833 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
834 {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
835 "FlushJob::Start"}});
836 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
837 }
838
839 // force a memtable flush. The memtable should still be kept
840 FlushOptions flush_ops;
841 if (attempt == kAttemptHistoryMemtable) {
842 ASSERT_OK(db->Flush(flush_ops));
843 } else {
844 assert(attempt == kAttemptImmMemTable);
845 DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
846 db_impl->TEST_SwitchMemtable();
847 }
848 uint64_t num_imm_mems;
849 ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
850 &num_imm_mems));
851 if (attempt == kAttemptHistoryMemtable) {
852 ASSERT_EQ(0, num_imm_mems);
853 } else {
854 assert(attempt == kAttemptImmMemTable);
855 ASSERT_EQ(1, num_imm_mems);
856 }
857
858 // Put something in active memtable
859 ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
860
861 // Create txn3 after flushing, but this transaction also needs to
862 // check all memtables because of they contains uncommitted data.
863 Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
864 ASSERT_TRUE(txn3 != nullptr);
865 ASSERT_OK(txn3->SetName("txn3"));
866
867 // Commit the pending write
868 ASSERT_OK(txn_x->Commit());
869
870 // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will
871 // pass. In all cases, both memtables are queried.
872 SetPerfLevel(PerfLevel::kEnableCount);
873 get_perf_context()->Reset();
874 ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
875 // We should have checked two memtables, active and either immutable
876 // or history memtable, depending on the test case.
877 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
878
879 get_perf_context()->Reset();
880 ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
881 // We should have checked two memtables, active and either immutable
882 // or history memtable, depending on the test case.
883 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
884
885 get_perf_context()->Reset();
886 ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
887 ASSERT_EQ(value, "bar");
888 // We should have checked two memtables, and since there is no
889 // conflict, another Get() will be made and fetch the data from
890 // DB. If it is in immutable memtable, two extra memtable reads
891 // will be issued. If it is not (in history), only one will
892 // be made, which is to the active memtable.
893 if (attempt == kAttemptHistoryMemtable) {
894 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
895 } else {
896 assert(attempt == kAttemptImmMemTable);
897 ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
898 }
899
900 Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
901 ASSERT_TRUE(txn4 != nullptr);
902 ASSERT_OK(txn4->SetName("txn4"));
903 get_perf_context()->Reset();
904 ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
905 if (attempt == kAttemptHistoryMemtable) {
906 // Active memtable will be checked in snapshot validation and when
907 // getting the value.
908 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
909 } else {
910 // Only active memtable will be checked in snapshot validation but
911 // both of active and immutable snapshot will be queried when
912 // getting the value.
913 assert(attempt == kAttemptImmMemTable);
914 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
915 }
916
917 ASSERT_OK(txn2->Commit());
918 ASSERT_OK(txn4->Commit());
919
920 TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
921 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
922
923 SetPerfLevel(PerfLevel::kDisable);
924
925 delete txn;
926 delete txn2;
927 delete txn3;
928 delete txn4;
929 delete txn_x;
930 }
931 }
932
933 // Reproduce the bug with two snapshots with the same seuqence number and test
934 // that the release of the first snapshot will not affect the reads by the other
935 // snapshot
936 TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
937 TransactionOptions txn_options;
938 Status s;
939
940 // Insert initial value
941 ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
942
943 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
944 Transaction* txn =
945 wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
946 ASSERT_OK(txn->SetName("txn"));
947 ASSERT_OK(txn->Put("key", "value2"));
948 ASSERT_OK(txn->Prepare());
949 // Three snapshots with the same seq number
950 const Snapshot* snapshot0 = wp_db->GetSnapshot();
951 const Snapshot* snapshot1 = wp_db->GetSnapshot();
952 const Snapshot* snapshot2 = wp_db->GetSnapshot();
953 ASSERT_OK(txn->Commit());
954 SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
955 SequenceNumber overlap_seq = txn->GetId() + cache_size;
956 delete txn;
957
958 // 4th snapshot with a larger seq
959 const Snapshot* snapshot3 = wp_db->GetSnapshot();
960 // Cause an eviction to advance max evicted seq number
961 // This also fetches the 4 snapshots from db since their seq is lower than the
962 // new max
963 wp_db->AddCommitted(overlap_seq, overlap_seq);
964
965 ReadOptions ropt;
966 // It should see the value before commit
967 ropt.snapshot = snapshot2;
968 PinnableSlice pinnable_val;
969 s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
970 ASSERT_OK(s);
971 ASSERT_TRUE(pinnable_val == "value1");
972 pinnable_val.Reset();
973
974 wp_db->ReleaseSnapshot(snapshot1);
975
976 // It should still see the value before commit
977 s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
978 ASSERT_OK(s);
979 ASSERT_TRUE(pinnable_val == "value1");
980 pinnable_val.Reset();
981
982 // Cause an eviction to advance max evicted seq number and trigger updating
983 // the snapshot list
984 overlap_seq += cache_size;
985 wp_db->AddCommitted(overlap_seq, overlap_seq);
986
987 // It should still see the value before commit
988 s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
989 ASSERT_OK(s);
990 ASSERT_TRUE(pinnable_val == "value1");
991 pinnable_val.Reset();
992
993 wp_db->ReleaseSnapshot(snapshot0);
994 wp_db->ReleaseSnapshot(snapshot2);
995 wp_db->ReleaseSnapshot(snapshot3);
996 }
997
998 size_t UniqueCnt(std::vector<SequenceNumber> vec) {
999 std::set<SequenceNumber> aset;
1000 for (auto i : vec) {
1001 aset.insert(i);
1002 }
1003 return aset.size();
1004 }
1005 // Test that the entries in old_commit_map_ get garbage collected properly
1006 TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
1007 const size_t snapshot_cache_bits = 0;
1008 const size_t commit_cache_bits = 0;
1009 DBImpl* mock_db = new DBImpl(options, dbname);
1010 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1011 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1012 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1013
1014 SequenceNumber seq = 0;
1015 // Take the first snapshot that overlaps with two txn
1016 auto prep_seq = ++seq;
1017 wp_db->AddPrepared(prep_seq);
1018 auto prep_seq2 = ++seq;
1019 wp_db->AddPrepared(prep_seq2);
1020 auto snap_seq1 = seq;
1021 wp_db->TakeSnapshot(snap_seq1);
1022 auto commit_seq = ++seq;
1023 wp_db->AddCommitted(prep_seq, commit_seq);
1024 wp_db->RemovePrepared(prep_seq);
1025 auto commit_seq2 = ++seq;
1026 wp_db->AddCommitted(prep_seq2, commit_seq2);
1027 wp_db->RemovePrepared(prep_seq2);
1028 // Take the 2nd and 3rd snapshot that overlap with the same txn
1029 prep_seq = ++seq;
1030 wp_db->AddPrepared(prep_seq);
1031 auto snap_seq2 = seq;
1032 wp_db->TakeSnapshot(snap_seq2);
1033 seq++;
1034 auto snap_seq3 = seq;
1035 wp_db->TakeSnapshot(snap_seq3);
1036 seq++;
1037 commit_seq = ++seq;
1038 wp_db->AddCommitted(prep_seq, commit_seq);
1039 wp_db->RemovePrepared(prep_seq);
1040 // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
1041 // only item in the commit_cache_ via another commit.
1042 prep_seq = ++seq;
1043 wp_db->AddPrepared(prep_seq);
1044 commit_seq = ++seq;
1045 wp_db->AddCommitted(prep_seq, commit_seq);
1046 wp_db->RemovePrepared(prep_seq);
1047
1048 // Verify that the evicted commit entries for all snapshots are in the
1049 // old_commit_map_
1050 {
1051 ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
1052 ReadLock rl(&wp_db->old_commit_map_mutex_);
1053 ASSERT_EQ(3, wp_db->old_commit_map_.size());
1054 ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
1055 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
1056 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
1057 }
1058
1059 // Verify that the 2nd snapshot is cleaned up after the release
1060 wp_db->ReleaseSnapshotInternal(snap_seq2);
1061 {
1062 ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
1063 ReadLock rl(&wp_db->old_commit_map_mutex_);
1064 ASSERT_EQ(2, wp_db->old_commit_map_.size());
1065 ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
1066 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
1067 }
1068
1069 // Verify that the 1st snapshot is cleaned up after the release
1070 wp_db->ReleaseSnapshotInternal(snap_seq1);
1071 {
1072 ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
1073 ReadLock rl(&wp_db->old_commit_map_mutex_);
1074 ASSERT_EQ(1, wp_db->old_commit_map_.size());
1075 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
1076 }
1077
1078 // Verify that the 3rd snapshot is cleaned up after the release
1079 wp_db->ReleaseSnapshotInternal(snap_seq3);
1080 {
1081 ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
1082 ReadLock rl(&wp_db->old_commit_map_mutex_);
1083 ASSERT_EQ(0, wp_db->old_commit_map_.size());
1084 }
1085 }
1086
1087 TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
1088 std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
1089 600l, 700l, 800l, 900l};
1090 const size_t snapshot_cache_bits = 2;
1091 const uint64_t cache_size = 1ul << snapshot_cache_bits;
1092 // Safety check to express the intended size in the test. Can be adjusted if
1093 // the snapshots lists changed.
1094 assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
1095 DBImpl* mock_db = new DBImpl(options, dbname);
1096 UpdateTransactionDBOptions(snapshot_cache_bits);
1097 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1098 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1099 SequenceNumber version = 1000l;
1100 ASSERT_EQ(0, wp_db->snapshots_total_);
1101 wp_db->UpdateSnapshots(snapshots, version);
1102 ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
1103 // seq numbers are chosen so that we have two of them between each two
1104 // snapshots. If the diff of two consecutive seq is more than 5, there is a
1105 // snapshot between them.
1106 std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l,
1107 355l, 450l, 455l, 550l, 555l, 650l, 655l,
1108 750l, 755l, 850l, 855l, 950l, 955l};
1109 assert(seqs.size() > 1);
1110 for (size_t i = 0; i + 1 < seqs.size(); i++) {
1111 wp_db->old_commit_map_empty_ = true; // reset
1112 CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
1113 wp_db->CheckAgainstSnapshots(commit_entry);
1114 // Expect update if there is snapshot in between the prepare and commit
1115 bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
1116 commit_entry.commit_seq >= snapshots.front() &&
1117 commit_entry.prep_seq <= snapshots.back();
1118 ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
1119 }
1120
1121 // Test that search will include multiple snapshot from snapshot cache
1122 {
1123 // exclude first and last item in the cache
1124 CommitEntry commit_entry = {snapshots.front() + 1,
1125 snapshots[cache_size - 1] - 1};
1126 wp_db->old_commit_map_empty_ = true; // reset
1127 wp_db->old_commit_map_.clear();
1128 wp_db->CheckAgainstSnapshots(commit_entry);
1129 ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
1130 }
1131
1132 // Test that search will include multiple snapshot from old snapshots
1133 {
1134 // include two in the middle
1135 CommitEntry commit_entry = {snapshots[cache_size] + 1,
1136 snapshots[cache_size + 2] + 1};
1137 wp_db->old_commit_map_empty_ = true; // reset
1138 wp_db->old_commit_map_.clear();
1139 wp_db->CheckAgainstSnapshots(commit_entry);
1140 ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
1141 }
1142
1143 // Test that search will include both snapshot cache and old snapshots
1144 // Case 1: includes all in snapshot cache
1145 {
1146 CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
1147 wp_db->old_commit_map_empty_ = true; // reset
1148 wp_db->old_commit_map_.clear();
1149 wp_db->CheckAgainstSnapshots(commit_entry);
1150 ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
1151 }
1152
1153 // Case 2: includes all snapshot caches except the smallest
1154 {
1155 CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
1156 wp_db->old_commit_map_empty_ = true; // reset
1157 wp_db->old_commit_map_.clear();
1158 wp_db->CheckAgainstSnapshots(commit_entry);
1159 ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
1160 }
1161
1162 // Case 3: includes only the largest of snapshot cache
1163 {
1164 CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
1165 snapshots.back() + 1};
1166 wp_db->old_commit_map_empty_ = true; // reset
1167 wp_db->old_commit_map_.clear();
1168 wp_db->CheckAgainstSnapshots(commit_entry);
1169 ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
1170 }
1171 }
1172
1173 // This test is too slow for travis
1174 #ifndef TRAVIS
1175 #ifndef ROCKSDB_VALGRIND_RUN
1176 // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
1177 // parallel with UpdateSnapshots.
1178 TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
1179 // We have a sync point in the method under test after checking each snapshot.
1180 // If you increase the max number of snapshots in this test, more sync points
1181 // in the methods must also be added.
1182 const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
1183 60l, 70l, 80l, 90l, 100l};
1184 const size_t snapshot_cache_bits = 2;
1185 // Safety check to express the intended size in the test. Can be adjusted if
1186 // the snapshots lists changed.
1187 assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
1188 SequenceNumber version = 1000l;
1189 // Choose the cache size so that the new snapshot list could replace all the
1190 // existing items in the cache and also have some overflow.
1191 DBImpl* mock_db = new DBImpl(options, dbname);
1192 UpdateTransactionDBOptions(snapshot_cache_bits);
1193 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1194 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1195 const size_t extra = 2;
1196 size_t loop_id = 0;
1197 // Add up to extra items that do not fit into the cache
1198 for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
1199 old_size++) {
1200 const std::vector<SequenceNumber> old_snapshots(
1201 snapshots.begin(), snapshots.begin() + old_size);
1202
1203 // Each member of old snapshot might or might not appear in the new list. We
1204 // create a common_snapshots for each combination.
1205 size_t new_comb_cnt = size_t(1) << old_size;
1206 for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
1207 if (loop_id % split_cnt_ != split_id_) continue;
1208 printf("."); // To signal progress
1209 fflush(stdout);
1210 std::vector<SequenceNumber> common_snapshots;
1211 for (size_t i = 0; i < old_snapshots.size(); i++) {
1212 if (IsInCombination(i, new_comb)) {
1213 common_snapshots.push_back(old_snapshots[i]);
1214 }
1215 }
1216 // And add some new snapshots to the common list
1217 for (size_t added_snapshots = 0;
1218 added_snapshots <= snapshots.size() - old_snapshots.size();
1219 added_snapshots++) {
1220 std::vector<SequenceNumber> new_snapshots = common_snapshots;
1221 for (size_t i = 0; i < added_snapshots; i++) {
1222 new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
1223 }
1224 for (auto it = common_snapshots.begin(); it != common_snapshots.end();
1225 ++it) {
1226 auto snapshot = *it;
1227 // Create a commit entry that is around the snapshot and thus should
1228 // be not be discarded
1229 CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
1230 snapshot + 1};
1231 // The critical part is when iterating the snapshot cache. Afterwards,
1232 // we are operating under the lock
1233 size_t a_range =
1234 std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
1235 size_t b_range =
1236 std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
1237 // Break each thread at two points
1238 for (size_t a1 = 1; a1 <= a_range; a1++) {
1239 for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
1240 for (size_t b1 = 1; b1 <= b_range; b1++) {
1241 for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
1242 SnapshotConcurrentAccessTestInternal(
1243 wp_db.get(), old_snapshots, new_snapshots, entry, version,
1244 a1, a2, b1, b2);
1245 }
1246 }
1247 }
1248 }
1249 }
1250 }
1251 }
1252 }
1253 printf("\n");
1254 }
1255 #endif // ROCKSDB_VALGRIND_RUN
1256 #endif // TRAVIS
1257
1258 // This test clarifies the contract of AdvanceMaxEvictedSeq method
1259 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
1260 DBImpl* mock_db = new DBImpl(options, dbname);
1261 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1262 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1263
1264 // 1. Set the initial values for max, prepared, and snapshots
1265 SequenceNumber zero_max = 0l;
1266 // Set the initial list of prepared txns
1267 const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100,
1268 150, 200, 250};
1269 for (auto p : initial_prepared) {
1270 wp_db->AddPrepared(p);
1271 }
1272 // This updates the max value and also set old prepared
1273 SequenceNumber init_max = 100;
1274 wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
1275 const std::vector<SequenceNumber> initial_snapshots = {20, 40};
1276 wp_db->SetDBSnapshots(initial_snapshots);
1277 // This will update the internal cache of snapshots from the DB
1278 wp_db->UpdateSnapshots(initial_snapshots, init_max);
1279
1280 // 2. Invoke AdvanceMaxEvictedSeq
1281 const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
1282 wp_db->SetDBSnapshots(latest_snapshots);
1283 SequenceNumber new_max = 200;
1284 wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
1285
1286 // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
1287 // a. max should be updated to new_max
1288 ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
1289 // b. delayed prepared should contain every txn <= max and prepared should
1290 // only contain txns > max
1291 auto it = initial_prepared.begin();
1292 for (; it != initial_prepared.end() && *it <= new_max; ++it) {
1293 ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
1294 }
1295 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
1296 for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
1297 ++it, wp_db->prepared_txns_.pop()) {
1298 ASSERT_EQ(*it, wp_db->prepared_txns_.top());
1299 }
1300 ASSERT_TRUE(it == initial_prepared.end());
1301 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1302 // c. snapshots should contain everything below new_max
1303 auto sit = latest_snapshots.begin();
1304 for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
1305 i < wp_db->snapshots_total_;
1306 sit++, i++) {
1307 ASSERT_TRUE(i < wp_db->snapshots_total_);
1308 // This test is in small scale and the list of snapshots are assumed to be
1309 // within the cache size limit. This is just a safety check to double check
1310 // that assumption.
1311 ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
1312 ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
1313 }
1314 }
1315
1316 // A new snapshot should always be always larger than max_evicted_seq_
1317 // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
1318 TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
1319 WriteOptions woptions;
1320 TransactionOptions txn_options;
1321 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1322 Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
1323 ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
1324 ASSERT_OK(txn0->Commit());
1325 const SequenceNumber seq = txn0->GetId(); // is also prepare seq
1326 delete txn0;
1327 std::vector<Transaction*> txns;
1328 // Inc seq without committing anything
1329 for (int i = 0; i < 10; i++) {
1330 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1331 ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
1332 ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
1333 ASSERT_OK(txn->Prepare());
1334 txns.push_back(txn);
1335 }
1336
1337 // The new commit is seq + 10
1338 ASSERT_OK(db->Put(woptions, "key", "value"));
1339 auto snap = wp_db->GetSnapshot();
1340 const SequenceNumber last_seq = snap->GetSequenceNumber();
1341 wp_db->ReleaseSnapshot(snap);
1342 ASSERT_LT(seq, last_seq);
1343 // Otherwise our test is not effective
1344 ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
1345
1346 // Evict seq out of commit cache
1347 const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
1348 // Check that the next write could make max go beyond last
1349 auto last_max = wp_db->max_evicted_seq_.load();
1350 wp_db->AddCommitted(overwrite_seq, overwrite_seq);
1351 // Check that eviction has advanced the max
1352 ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
1353 // Check that the new max has not advanced the last seq
1354 ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
1355 for (auto txn : txns) {
1356 txn->Rollback();
1357 delete txn;
1358 }
1359 }
1360
1361 // A new snapshot should always be always larger than max_evicted_seq_
1362 // In very rare cases max could be below last published seq. Test that
1363 // taking snapshot will wait for max to catch up.
1364 TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
1365 const size_t snapshot_cache_bits = 7; // same as default
1366 const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
1367 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1368 ReOpen();
1369 WriteOptions woptions;
1370 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1371
1372 const int writes = 50;
1373 const int batch_cnt = 4;
1374 ROCKSDB_NAMESPACE::port::Thread t1([&]() {
1375 for (int i = 0; i < writes; i++) {
1376 WriteBatch batch;
1377 // For duplicate keys cause 4 commit entries, each evicting an entry that
1378 // is not published yet, thus causing max evicted seq go higher than last
1379 // published.
1380 for (int b = 0; b < batch_cnt; b++) {
1381 batch.Put("foo", "foo");
1382 }
1383 db->Write(woptions, &batch);
1384 }
1385 });
1386
1387 ROCKSDB_NAMESPACE::port::Thread t2([&]() {
1388 while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
1389 std::this_thread::yield();
1390 }
1391 for (int i = 0; i < 10; i++) {
1392 SequenceNumber max_lower_bound = wp_db->max_evicted_seq_;
1393 auto snap = db->GetSnapshot();
1394 if (snap->GetSequenceNumber() != 0) {
1395 // Value of max_evicted_seq_ when snapshot was taken in unknown. We thus
1396 // compare with the lower bound instead as an approximation.
1397 ASSERT_LT(max_lower_bound, snap->GetSequenceNumber());
1398 } // seq 0 is ok to be less than max since nothing is visible to it
1399 db->ReleaseSnapshot(snap);
1400 }
1401 });
1402
1403 t1.join();
1404 t2.join();
1405
1406 // Make sure that the test has worked and seq number has advanced as we
1407 // thought
1408 auto snap = db->GetSnapshot();
1409 ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
1410 db->ReleaseSnapshot(snap);
1411 }
1412
1413 // Test that reads without snapshots would not hit an undefined state
1414 TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
1415 const size_t snapshot_cache_bits = 7; // same as default
1416 const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
1417 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1418 ReOpen();
1419 WriteOptions woptions;
1420 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1421
1422 const int writes = 50;
1423 ROCKSDB_NAMESPACE::port::Thread t1([&]() {
1424 for (int i = 0; i < writes; i++) {
1425 WriteBatch batch;
1426 batch.Put("key", "foo");
1427 db->Write(woptions, &batch);
1428 }
1429 });
1430
1431 ROCKSDB_NAMESPACE::port::Thread t2([&]() {
1432 while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
1433 std::this_thread::yield();
1434 }
1435 ReadOptions ropt;
1436 PinnableSlice pinnable_val;
1437 TransactionOptions txn_options;
1438 for (int i = 0; i < 10; i++) {
1439 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1440 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1441 pinnable_val.Reset();
1442 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1443 s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1444 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1445 pinnable_val.Reset();
1446 std::vector<std::string> values;
1447 auto s_vec =
1448 txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
1449 ASSERT_EQ(1, values.size());
1450 ASSERT_EQ(1, s_vec.size());
1451 s = s_vec[0];
1452 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1453 Slice key("key");
1454 txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s,
1455 true);
1456 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1457 delete txn;
1458 }
1459 });
1460
1461 t1.join();
1462 t2.join();
1463
1464 // Make sure that the test has worked and seq number has advanced as we
1465 // thought
1466 auto snap = db->GetSnapshot();
1467 ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
1468 db->ReleaseSnapshot(snap);
1469 }
1470
1471 // Check that old_commit_map_ cleanup works correctly if the snapshot equals
1472 // max_evicted_seq_.
1473 TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
1474 const size_t snapshot_cache_bits = 7; // same as default
1475 const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
1476 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1477 ReOpen();
1478 WriteOptions woptions;
1479 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1480 // Insert something to increase seq
1481 ASSERT_OK(db->Put(woptions, "key", "value"));
1482 auto snap = db->GetSnapshot();
1483 auto snap_seq = snap->GetSequenceNumber();
1484 // Another insert should trigger eviction + load snapshot from db
1485 ASSERT_OK(db->Put(woptions, "key", "value"));
1486 // This is the scenario that we check agaisnt
1487 ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
1488 // old_commit_map_ now has some data that needs gc
1489 ASSERT_EQ(1, wp_db->snapshots_total_);
1490 ASSERT_EQ(1, wp_db->old_commit_map_.size());
1491
1492 db->ReleaseSnapshot(snap);
1493
1494 // Another insert should trigger eviction + load snapshot from db
1495 ASSERT_OK(db->Put(woptions, "key", "value"));
1496
1497 // the snapshot and related metadata must be properly garbage collected
1498 ASSERT_EQ(0, wp_db->snapshots_total_);
1499 ASSERT_TRUE(wp_db->snapshots_all_.empty());
1500 ASSERT_EQ(0, wp_db->old_commit_map_.size());
1501 }
1502
1503 TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
1504 auto snap = db->GetSnapshot();
1505 auto seq1 = snap->GetSequenceNumber();
1506 db->ReleaseSnapshot(snap);
1507
1508 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1509 wp_db->AdvanceSeqByOne();
1510
1511 snap = db->GetSnapshot();
1512 auto seq2 = snap->GetSequenceNumber();
1513 db->ReleaseSnapshot(snap);
1514
1515 ASSERT_LT(seq1, seq2);
1516 }
1517
1518 // Test that the txn Initilize calls the overridden functions
1519 TEST_P(WritePreparedTransactionTest, TxnInitialize) {
1520 TransactionOptions txn_options;
1521 WriteOptions write_options;
1522 ASSERT_OK(db->Put(write_options, "key", "value"));
1523 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1524 ASSERT_OK(txn0->SetName("xid"));
1525 ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
1526 ASSERT_OK(txn0->Prepare());
1527
1528 // SetSnapshot is overridden to update min_uncommitted_
1529 txn_options.set_snapshot = true;
1530 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1531 auto snap = txn1->GetSnapshot();
1532 auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
1533 // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
1534 // udpated
1535 ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
1536
1537 txn0->Rollback();
1538 txn1->Rollback();
1539 delete txn0;
1540 delete txn1;
1541 }
1542
1543 // This tests that transactions with duplicate keys perform correctly after max
1544 // is advancing their prepared sequence numbers. This will not be the case if
1545 // for example the txn does not add the prepared seq for the second sub-batch to
1546 // the PreparedHeap structure.
1547 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
1548 const size_t snapshot_cache_bits = 7; // same as default
1549 const size_t commit_cache_bits = 1; // disable commit cache
1550 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1551 ReOpen();
1552
1553 ReadOptions ropt;
1554 PinnableSlice pinnable_val;
1555 WriteOptions write_options;
1556 TransactionOptions txn_options;
1557 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1558 ASSERT_OK(txn0->SetName("xid"));
1559 ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
1560 ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
1561 ASSERT_OK(txn0->Prepare());
1562
1563 ASSERT_OK(db->Put(write_options, "key2", "value"));
1564 // Will cause max advance due to disabled commit cache
1565 ASSERT_OK(db->Put(write_options, "key3", "value"));
1566
1567 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1568 ASSERT_TRUE(s.IsNotFound());
1569 delete txn0;
1570
1571 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1572 wp_db->db_impl_->FlushWAL(true);
1573 wp_db->TEST_Crash();
1574 ReOpenNoDelete();
1575 assert(db != nullptr);
1576 s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1577 ASSERT_TRUE(s.IsNotFound());
1578
1579 txn0 = db->GetTransactionByName("xid");
1580 ASSERT_OK(txn0->Rollback());
1581 delete txn0;
1582 }
1583
1584 #ifndef ROCKSDB_VALGRIND_RUN
1585 // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
1586 // delayed_prepared_, when is run concurrently with advancing max_evicted_seq,
1587 // which moves prepared txns from prepared_txns_ to delayed_prepared_.
1588 TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
1589 const size_t snapshot_cache_bits = 7; // same as default
1590 const size_t commit_cache_bits = 1; // disable commit cache
1591 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1592 ReOpen();
1593 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1594 ReadOptions ropt;
1595 PinnableSlice pinnable_val;
1596 WriteOptions write_options;
1597 TransactionOptions txn_options;
1598 std::vector<Transaction*> txns, committed_txns;
1599
1600 const int cnt = 100;
1601 for (int i = 0; i < cnt; i++) {
1602 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1603 ASSERT_OK(txn->SetName("xid" + ToString(i)));
1604 auto key = "key1" + ToString(i);
1605 auto value = "value1" + ToString(i);
1606 ASSERT_OK(txn->Put(Slice(key), Slice(value)));
1607 ASSERT_OK(txn->Prepare());
1608 txns.push_back(txn);
1609 }
1610
1611 port::Mutex mutex;
1612 Random rnd(1103);
1613 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
1614 for (int i = 0; i < cnt; i++) {
1615 uint32_t index = rnd.Uniform(cnt - i);
1616 Transaction* txn;
1617 {
1618 MutexLock l(&mutex);
1619 txn = txns[index];
1620 txns.erase(txns.begin() + index);
1621 }
1622 // Since commit cache is practically disabled, commit results in immediate
1623 // advance in max_evicted_seq_ and subsequently moving some prepared txns
1624 // to delayed_prepared_.
1625 txn->Commit();
1626 committed_txns.push_back(txn);
1627 }
1628 });
1629 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
1630 while (1) {
1631 MutexLock l(&mutex);
1632 if (txns.empty()) {
1633 break;
1634 }
1635 auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
1636 ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
1637 }
1638 });
1639
1640 commit_thread.join();
1641 read_thread.join();
1642 for (auto txn : committed_txns) {
1643 delete txn;
1644 }
1645 }
1646 #endif // ROCKSDB_VALGRIND_RUN
1647
1648 TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
1649 // Given the sequential run of txns, with this timeout we should never see a
1650 // deadlock nor a timeout unless we have a key conflict, which should be
1651 // almost infeasible.
1652 txn_db_options.transaction_lock_timeout = 1000;
1653 txn_db_options.default_lock_timeout = 1000;
1654 ReOpen();
1655 FlushOptions fopt;
1656
1657 // Number of different txn types we use in this test
1658 const size_t type_cnt = 5;
1659 // The size of the first write group
1660 // TODO(myabandeh): This should be increase for pre-release tests
1661 const size_t first_group_size = 2;
1662 // Total number of txns we run in each test
1663 // TODO(myabandeh): This should be increase for pre-release tests
1664 const size_t txn_cnt = first_group_size + 1;
1665
1666 size_t base[txn_cnt + 1] = {
1667 1,
1668 };
1669 for (size_t bi = 1; bi <= txn_cnt; bi++) {
1670 base[bi] = base[bi - 1] * type_cnt;
1671 }
1672 const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
1673 printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
1674 for (size_t n = 0; n < max_n; n++, ReOpen()) {
1675 if (n % split_cnt_ != split_id_) continue;
1676 if (n % 1000 == 0) {
1677 printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
1678 }
1679 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1680 auto seq = db_impl->TEST_GetLastVisibleSequence();
1681 with_empty_commits = 0;
1682 exp_seq = seq;
1683 // This is increased before writing the batch for commit
1684 commit_writes = 0;
1685 // This is increased before txn starts linking if it expects to do a commit
1686 // eventually
1687 expected_commits = 0;
1688 std::vector<port::Thread> threads;
1689
1690 linked = 0;
1691 std::atomic<bool> batch_formed(false);
1692 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1693 "WriteThread::EnterAsBatchGroupLeader:End",
1694 [&](void* /*arg*/) { batch_formed = true; });
1695 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1696 "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
1697 linked++;
1698 if (linked == 1) {
1699 // Wait until the others are linked too.
1700 while (linked < first_group_size) {
1701 }
1702 } else if (linked == 1 + first_group_size) {
1703 // Make the 2nd batch of the rest of writes plus any followup
1704 // commits from the first batch
1705 while (linked < txn_cnt + commit_writes) {
1706 }
1707 }
1708 // Then we will have one or more batches consisting of follow-up
1709 // commits from the 2nd batch. There is a bit of non-determinism here
1710 // but it should be tolerable.
1711 });
1712
1713 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1714 for (size_t bi = 0; bi < txn_cnt; bi++) {
1715 // get the bi-th digit in number system based on type_cnt
1716 size_t d = (n % base[bi + 1]) / base[bi];
1717 switch (d) {
1718 case 0:
1719 threads.emplace_back(txn_t0, bi);
1720 break;
1721 case 1:
1722 threads.emplace_back(txn_t1, bi);
1723 break;
1724 case 2:
1725 threads.emplace_back(txn_t2, bi);
1726 break;
1727 case 3:
1728 threads.emplace_back(txn_t3, bi);
1729 break;
1730 case 4:
1731 threads.emplace_back(txn_t3, bi);
1732 break;
1733 default:
1734 assert(false);
1735 }
1736 // wait to be linked
1737 while (linked.load() <= bi) {
1738 }
1739 // after a queue of size first_group_size
1740 if (bi + 1 == first_group_size) {
1741 while (!batch_formed) {
1742 }
1743 // to make it more deterministic, wait until the commits are linked
1744 while (linked.load() <= bi + expected_commits) {
1745 }
1746 }
1747 }
1748 for (auto& t : threads) {
1749 t.join();
1750 }
1751 if (options.two_write_queues) {
1752 // In this case none of the above scheduling tricks to deterministically
1753 // form merged batches works because the writes go to separate queues.
1754 // This would result in different write groups in each run of the test. We
1755 // still keep the test since although non-deterministic and hard to debug,
1756 // it is still useful to have.
1757 // TODO(myabandeh): Add a deterministic unit test for two_write_queues
1758 }
1759
1760 // Check if memtable inserts advanced seq number as expected
1761 seq = db_impl->TEST_GetLastVisibleSequence();
1762 ASSERT_EQ(exp_seq, seq);
1763
1764 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1765 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1766
1767 // Check if recovery preserves the last sequence number
1768 db_impl->FlushWAL(true);
1769 ReOpenNoDelete();
1770 assert(db != nullptr);
1771 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1772 seq = db_impl->TEST_GetLastVisibleSequence();
1773 ASSERT_LE(exp_seq, seq + with_empty_commits);
1774
1775 // Check if flush preserves the last sequence number
1776 db_impl->Flush(fopt);
1777 seq = db_impl->GetLatestSequenceNumber();
1778 ASSERT_LE(exp_seq, seq + with_empty_commits);
1779
1780 // Check if recovery after flush preserves the last sequence number
1781 db_impl->FlushWAL(true);
1782 ReOpenNoDelete();
1783 assert(db != nullptr);
1784 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1785 seq = db_impl->GetLatestSequenceNumber();
1786 ASSERT_LE(exp_seq, seq + with_empty_commits);
1787 }
1788 }
1789
1790 // Run a couple of different txns among them some uncommitted. Restart the db at
1791 // a couple points to check whether the list of uncommitted txns are recovered
1792 // properly.
1793 TEST_P(WritePreparedTransactionTest, BasicRecovery) {
1794 options.disable_auto_compactions = true;
1795 ReOpen();
1796 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1797
1798 txn_t0(0);
1799
1800 TransactionOptions txn_options;
1801 WriteOptions write_options;
1802 size_t index = 1000;
1803 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1804 auto istr0 = std::to_string(index);
1805 auto s = txn0->SetName("xid" + istr0);
1806 ASSERT_OK(s);
1807 s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
1808 ASSERT_OK(s);
1809 s = txn0->Prepare();
1810 auto prep_seq_0 = txn0->GetId();
1811
1812 txn_t1(0);
1813
1814 index++;
1815 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1816 auto istr1 = std::to_string(index);
1817 s = txn1->SetName("xid" + istr1);
1818 ASSERT_OK(s);
1819 s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
1820 ASSERT_OK(s);
1821 s = txn1->Prepare();
1822 auto prep_seq_1 = txn1->GetId();
1823
1824 txn_t2(0);
1825
1826 ReadOptions ropt;
1827 PinnableSlice pinnable_val;
1828 // Check the value is not committed before restart
1829 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1830 ASSERT_TRUE(s.IsNotFound());
1831 pinnable_val.Reset();
1832
1833 delete txn0;
1834 delete txn1;
1835 wp_db->db_impl_->FlushWAL(true);
1836 wp_db->TEST_Crash();
1837 ReOpenNoDelete();
1838 assert(db != nullptr);
1839 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1840 // After recovery, all the uncommitted txns (0 and 1) should be inserted into
1841 // delayed_prepared_
1842 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1843 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1844 ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
1845 ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
1846 {
1847 ReadLock rl(&wp_db->prepared_mutex_);
1848 ASSERT_EQ(2, wp_db->delayed_prepared_.size());
1849 ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
1850 wp_db->delayed_prepared_.end());
1851 ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
1852 wp_db->delayed_prepared_.end());
1853 }
1854
1855 // Check the value is still not committed after restart
1856 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1857 ASSERT_TRUE(s.IsNotFound());
1858 pinnable_val.Reset();
1859
1860 txn_t3(0);
1861
1862 // Test that a recovered txns will be properly marked committed for the next
1863 // recovery
1864 txn1 = db->GetTransactionByName("xid" + istr1);
1865 ASSERT_NE(txn1, nullptr);
1866 txn1->Commit();
1867 delete txn1;
1868
1869 index++;
1870 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1871 auto istr2 = std::to_string(index);
1872 s = txn2->SetName("xid" + istr2);
1873 ASSERT_OK(s);
1874 s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
1875 ASSERT_OK(s);
1876 s = txn2->Prepare();
1877 auto prep_seq_2 = txn2->GetId();
1878
1879 delete txn2;
1880 wp_db->db_impl_->FlushWAL(true);
1881 wp_db->TEST_Crash();
1882 ReOpenNoDelete();
1883 assert(db != nullptr);
1884 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1885 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1886 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1887
1888 // 0 and 2 are prepared and 1 is committed
1889 {
1890 ReadLock rl(&wp_db->prepared_mutex_);
1891 ASSERT_EQ(2, wp_db->delayed_prepared_.size());
1892 const auto& end = wp_db->delayed_prepared_.end();
1893 ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
1894 ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
1895 ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
1896 }
1897 ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
1898 ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);
1899
1900 // Commit all the remaining txns
1901 txn0 = db->GetTransactionByName("xid" + istr0);
1902 ASSERT_NE(txn0, nullptr);
1903 txn0->Commit();
1904 txn2 = db->GetTransactionByName("xid" + istr2);
1905 ASSERT_NE(txn2, nullptr);
1906 txn2->Commit();
1907
1908 // Check the value is committed after commit
1909 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1910 ASSERT_TRUE(s.ok());
1911 ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
1912 pinnable_val.Reset();
1913
1914 delete txn0;
1915 delete txn2;
1916 wp_db->db_impl_->FlushWAL(true);
1917 ReOpenNoDelete();
1918 assert(db != nullptr);
1919 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1920 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1921 ASSERT_TRUE(wp_db->delayed_prepared_empty_);
1922
1923 // Check the value is still committed after recovery
1924 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1925 ASSERT_TRUE(s.ok());
1926 ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
1927 pinnable_val.Reset();
1928 }
1929
1930 // After recovery the commit map is empty while the max is set. The code would
1931 // go through a different path which requires a separate test. Test that the
1932 // committed data before the restart is visible to all snapshots.
1933 TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
1934 for (bool end_with_prepare : {false, true}) {
1935 ReOpen();
1936 WriteOptions woptions;
1937 ASSERT_OK(db->Put(woptions, "key", "value"));
1938 ASSERT_OK(db->Put(woptions, "key", "value"));
1939 ASSERT_OK(db->Put(woptions, "key", "value"));
1940 SequenceNumber prepare_seq = kMaxSequenceNumber;
1941 if (end_with_prepare) {
1942 TransactionOptions txn_options;
1943 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1944 ASSERT_OK(txn->SetName("xid0"));
1945 ASSERT_OK(txn->Prepare());
1946 prepare_seq = txn->GetId();
1947 delete txn;
1948 }
1949 dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
1950 auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1951 db_impl->FlushWAL(true);
1952 ReOpenNoDelete();
1953 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1954 assert(wp_db != nullptr);
1955 ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery
1956 // Take a snapshot right after recovery
1957 const Snapshot* snap = db->GetSnapshot();
1958 auto snap_seq = snap->GetSequenceNumber();
1959 ASSERT_GT(snap_seq, 0);
1960
1961 for (SequenceNumber seq = 0;
1962 seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
1963 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
1964 }
1965 if (end_with_prepare) {
1966 ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
1967 }
1968 // trivial check
1969 ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
1970
1971 db->ReleaseSnapshot(snap);
1972
1973 ASSERT_OK(db->Put(woptions, "key", "value"));
1974 // Take a snapshot after some writes
1975 snap = db->GetSnapshot();
1976 snap_seq = snap->GetSequenceNumber();
1977 for (SequenceNumber seq = 0;
1978 seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
1979 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
1980 }
1981 if (end_with_prepare) {
1982 ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
1983 }
1984 // trivial check
1985 ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
1986
1987 db->ReleaseSnapshot(snap);
1988 }
1989 }
1990
1991 // Shows the contract of IsInSnapshot when called on invalid/released snapshots
1992 TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
1993 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1994 WriteOptions woptions;
1995 ASSERT_OK(db->Put(woptions, "key", "value"));
1996 // snap seq = 1
1997 const Snapshot* snap1 = db->GetSnapshot();
1998 ASSERT_OK(db->Put(woptions, "key", "value"));
1999 ASSERT_OK(db->Put(woptions, "key", "value"));
2000 // snap seq = 3
2001 const Snapshot* snap2 = db->GetSnapshot();
2002 const SequenceNumber seq = 1;
2003 // Evict seq out of commit cache
2004 size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
2005 wp_db->AddCommitted(overwrite_seq, overwrite_seq);
2006 SequenceNumber snap_seq;
2007 uint64_t min_uncommitted = kMinUnCommittedSeq;
2008 bool released;
2009
2010 released = false;
2011 snap_seq = snap1->GetSequenceNumber();
2012 ASSERT_LE(seq, snap_seq);
2013 // Valid snapshot lower than max
2014 ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
2015 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2016 ASSERT_FALSE(released);
2017
2018 released = false;
2019 snap_seq = snap1->GetSequenceNumber();
2020 // Invaid snapshot lower than max
2021 ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
2022 ASSERT_TRUE(
2023 wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2024 ASSERT_TRUE(released);
2025
2026 db->ReleaseSnapshot(snap1);
2027
2028 released = false;
2029 // Released snapshot lower than max
2030 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2031 // The release does not take affect until the next max advance
2032 ASSERT_FALSE(released);
2033
2034 released = false;
2035 // Invaid snapshot lower than max
2036 ASSERT_TRUE(
2037 wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2038 ASSERT_TRUE(released);
2039
2040 // This make the snapshot release to reflect in txn db structures
2041 wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
2042 wp_db->max_evicted_seq_ + 1);
2043
2044 released = false;
2045 // Released snapshot lower than max
2046 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2047 ASSERT_TRUE(released);
2048
2049 released = false;
2050 // Invaid snapshot lower than max
2051 ASSERT_TRUE(
2052 wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2053 ASSERT_TRUE(released);
2054
2055 snap_seq = snap2->GetSequenceNumber();
2056
2057 released = false;
2058 // Unreleased snapshot lower than max
2059 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2060 ASSERT_FALSE(released);
2061
2062 db->ReleaseSnapshot(snap2);
2063 }
2064
2065 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
2066 // snapshot, max_committed_seq_, prepared, and commit entries.
2067 TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
2068 WriteOptions wo;
2069 // Use small commit cache to trigger lots of eviction and fast advance of
2070 // max_evicted_seq_
2071 const size_t commit_cache_bits = 3;
2072 // Same for snapshot cache size
2073 const size_t snapshot_cache_bits = 2;
2074
2075 // Take some preliminary snapshots first. This is to stress the data structure
2076 // that holds the old snapshots as it will be designed to be efficient when
2077 // only a few snapshots are below the max_evicted_seq_.
2078 for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
2079 // Leave some gap between the preliminary snapshots and the final snapshot
2080 // that we check. This should test for also different overlapping scenarios
2081 // between the last snapshot and the commits.
2082 for (int max_gap = 1; max_gap < 10; max_gap++) {
2083 // Since we do not actually write to db, we mock the seq as it would be
2084 // increased by the db. The only exception is that we need db seq to
2085 // advance for our snapshots. for which we apply a dummy put each time we
2086 // increase our mock of seq.
2087 uint64_t seq = 0;
2088 // At each step we prepare a txn and then we commit it in the next txn.
2089 // This emulates the consecutive transactions that write to the same key
2090 uint64_t cur_txn = 0;
2091 // Number of snapshots taken so far
2092 int num_snapshots = 0;
2093 // Number of gaps applied so far
2094 int gap_cnt = 0;
2095 // The final snapshot that we will inspect
2096 uint64_t snapshot = 0;
2097 bool found_committed = false;
2098 // To stress the data structure that maintain prepared txns, at each cycle
2099 // we add a new prepare txn. These do not mean to be committed for
2100 // snapshot inspection.
2101 std::set<uint64_t> prepared;
2102 // We keep the list of txns committed before we take the last snapshot.
2103 // These should be the only seq numbers that will be found in the snapshot
2104 std::set<uint64_t> committed_before;
2105 // The set of commit seq numbers to be excluded from IsInSnapshot queries
2106 std::set<uint64_t> commit_seqs;
2107 DBImpl* mock_db = new DBImpl(options, dbname);
2108 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2109 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
2110 new WritePreparedTxnDBMock(mock_db, txn_db_options));
2111 // We continue until max advances a bit beyond the snapshot.
2112 while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
2113 // do prepare for a transaction
2114 seq++;
2115 wp_db->AddPrepared(seq);
2116 prepared.insert(seq);
2117
2118 // If cur_txn is not started, do prepare for it.
2119 if (!cur_txn) {
2120 seq++;
2121 cur_txn = seq;
2122 wp_db->AddPrepared(cur_txn);
2123 } else { // else commit it
2124 seq++;
2125 wp_db->AddCommitted(cur_txn, seq);
2126 wp_db->RemovePrepared(cur_txn);
2127 commit_seqs.insert(seq);
2128 if (!snapshot) {
2129 committed_before.insert(cur_txn);
2130 }
2131 cur_txn = 0;
2132 }
2133
2134 if (num_snapshots < max_snapshots - 1) {
2135 // Take preliminary snapshots
2136 wp_db->TakeSnapshot(seq);
2137 num_snapshots++;
2138 } else if (gap_cnt < max_gap) {
2139 // Wait for some gap before taking the final snapshot
2140 gap_cnt++;
2141 } else if (!snapshot) {
2142 // Take the final snapshot if it is not already taken
2143 snapshot = seq;
2144 wp_db->TakeSnapshot(snapshot);
2145 num_snapshots++;
2146 }
2147
2148 // If the snapshot is taken, verify seq numbers visible to it. We redo
2149 // it at each cycle to test that the system is still sound when
2150 // max_evicted_seq_ advances.
2151 if (snapshot) {
2152 for (uint64_t s = 1;
2153 s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
2154 bool was_committed =
2155 (committed_before.find(s) != committed_before.end());
2156 bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
2157 if (was_committed != is_in_snapshot) {
2158 printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
2159 " snapshot %" PRIu64
2160 " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
2161 max_snapshots, max_gap, seq,
2162 wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
2163 num_snapshots, s);
2164 }
2165 ASSERT_EQ(was_committed, is_in_snapshot);
2166 found_committed = found_committed || is_in_snapshot;
2167 }
2168 }
2169 }
2170 // Safety check to make sure the test actually ran
2171 ASSERT_TRUE(found_committed);
2172 // As an extra check, check if prepared set will be properly empty after
2173 // they are committed.
2174 if (cur_txn) {
2175 wp_db->AddCommitted(cur_txn, seq);
2176 wp_db->RemovePrepared(cur_txn);
2177 }
2178 for (auto p : prepared) {
2179 wp_db->AddCommitted(p, seq);
2180 wp_db->RemovePrepared(p);
2181 }
2182 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
2183 ASSERT_TRUE(wp_db->prepared_txns_.empty());
2184 }
2185 }
2186 }
2187
2188 void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
2189 PinnableSlice& exp_v, Slice key) {
2190 Status s;
2191 PinnableSlice v;
2192 s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
2193 ASSERT_TRUE(exp_s == s);
2194 ASSERT_TRUE(s.ok() || s.IsNotFound());
2195 if (s.ok()) {
2196 ASSERT_TRUE(exp_v == v);
2197 }
2198
2199 // Try with MultiGet API too
2200 std::vector<std::string> values;
2201 auto s_vec =
2202 db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
2203 ASSERT_EQ(1, values.size());
2204 ASSERT_EQ(1, s_vec.size());
2205 s = s_vec[0];
2206 ASSERT_TRUE(exp_s == s);
2207 ASSERT_TRUE(s.ok() || s.IsNotFound());
2208 if (s.ok()) {
2209 ASSERT_TRUE(exp_v == values[0]);
2210 }
2211 }
2212
2213 void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
2214 Slice key) {
2215 ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
2216 }
2217
2218 TEST_P(WritePreparedTransactionTest, Rollback) {
2219 ReadOptions roptions;
2220 WriteOptions woptions;
2221 TransactionOptions txn_options;
2222 const size_t num_keys = 4;
2223 const size_t num_values = 5;
2224 for (size_t ikey = 1; ikey <= num_keys; ikey++) {
2225 for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
2226 for (bool crash : {false, true}) {
2227 ReOpen();
2228 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
2229 std::string key_str = "key" + ToString(ikey);
2230 switch (ivalue) {
2231 case 0:
2232 break;
2233 case 1:
2234 ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
2235 break;
2236 case 2:
2237 ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
2238 break;
2239 case 3:
2240 ASSERT_OK(db->Delete(woptions, key_str));
2241 break;
2242 case 4:
2243 ASSERT_OK(db->SingleDelete(woptions, key_str));
2244 break;
2245 default:
2246 assert(0);
2247 }
2248
2249 PinnableSlice v1;
2250 auto s1 =
2251 db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
2252 PinnableSlice v2;
2253 auto s2 =
2254 db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
2255 PinnableSlice v3;
2256 auto s3 =
2257 db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
2258 PinnableSlice v4;
2259 auto s4 =
2260 db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
2261 Transaction* txn = db->BeginTransaction(woptions, txn_options);
2262 auto s = txn->SetName("xid0");
2263 ASSERT_OK(s);
2264 s = txn->Put(Slice("key1"), Slice("value1"));
2265 ASSERT_OK(s);
2266 s = txn->Merge(Slice("key2"), Slice("value2"));
2267 ASSERT_OK(s);
2268 s = txn->Delete(Slice("key3"));
2269 ASSERT_OK(s);
2270 s = txn->SingleDelete(Slice("key4"));
2271 ASSERT_OK(s);
2272 s = txn->Prepare();
2273 ASSERT_OK(s);
2274
2275 {
2276 ReadLock rl(&wp_db->prepared_mutex_);
2277 ASSERT_FALSE(wp_db->prepared_txns_.empty());
2278 ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
2279 }
2280
2281 ASSERT_SAME(db, s1, v1, "key1");
2282 ASSERT_SAME(db, s2, v2, "key2");
2283 ASSERT_SAME(db, s3, v3, "key3");
2284 ASSERT_SAME(db, s4, v4, "key4");
2285
2286 if (crash) {
2287 delete txn;
2288 auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2289 db_impl->FlushWAL(true);
2290 dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
2291 ReOpenNoDelete();
2292 assert(db != nullptr);
2293 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
2294 txn = db->GetTransactionByName("xid0");
2295 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
2296 ReadLock rl(&wp_db->prepared_mutex_);
2297 ASSERT_TRUE(wp_db->prepared_txns_.empty());
2298 ASSERT_FALSE(wp_db->delayed_prepared_.empty());
2299 ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
2300 wp_db->delayed_prepared_.end());
2301 }
2302
2303 ASSERT_SAME(db, s1, v1, "key1");
2304 ASSERT_SAME(db, s2, v2, "key2");
2305 ASSERT_SAME(db, s3, v3, "key3");
2306 ASSERT_SAME(db, s4, v4, "key4");
2307
2308 s = txn->Rollback();
2309 ASSERT_OK(s);
2310
2311 {
2312 ASSERT_TRUE(wp_db->delayed_prepared_empty_);
2313 ReadLock rl(&wp_db->prepared_mutex_);
2314 ASSERT_TRUE(wp_db->prepared_txns_.empty());
2315 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
2316 }
2317
2318 ASSERT_SAME(db, s1, v1, "key1");
2319 ASSERT_SAME(db, s2, v2, "key2");
2320 ASSERT_SAME(db, s3, v3, "key3");
2321 ASSERT_SAME(db, s4, v4, "key4");
2322 delete txn;
2323 }
2324 }
2325 }
2326 }
2327
2328 TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
2329 // Use large buffer to avoid memtable flush after 1024 insertions
2330 options.write_buffer_size = 1024 * 1024;
2331 ReOpen();
2332 std::vector<KeyVersion> versions;
2333 uint64_t seq = 0;
2334 for (uint64_t i = 1; i <= 1024; i++) {
2335 std::string v = "bar" + ToString(i);
2336 ASSERT_OK(db->Put(WriteOptions(), "foo", v));
2337 VerifyKeys({{"foo", v}});
2338 seq++; // one for the key/value
2339 KeyVersion kv = {"foo", v, seq, kTypeValue};
2340 if (options.two_write_queues) {
2341 seq++; // one for the commit
2342 }
2343 versions.emplace_back(kv);
2344 }
2345 std::reverse(std::begin(versions), std::end(versions));
2346 VerifyInternalKeys(versions);
2347 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2348 db_impl->FlushWAL(true);
2349 // Use small buffer to ensure memtable flush during recovery
2350 options.write_buffer_size = 1024;
2351 ReOpenNoDelete();
2352 VerifyInternalKeys(versions);
2353 }
2354
2355 TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
2356 ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
2357 VerifyKeys({{"foo", "bar"}});
2358 const Snapshot* snapshot = db->GetSnapshot();
2359 ASSERT_OK(db->Flush(FlushOptions()));
2360 // Dummy keys to avoid compaction trivially move files and get around actual
2361 // compaction logic.
2362 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2363 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2364 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2365 // Compaction will output keys with sequence number 0, if it is visible to
2366 // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
2367 // visible to any snapshot.
2368 VerifyKeys({{"foo", "bar"}});
2369 VerifyKeys({{"foo", "bar"}}, snapshot);
2370 VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
2371 db->ReleaseSnapshot(snapshot);
2372 }
2373
2374 // Compaction should not remove a key if it is not committed, and should
2375 // proceed with older versions of the key as-if the new version doesn't exist.
2376 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
2377 options.disable_auto_compactions = true;
2378 ReOpen();
2379 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2380 // Snapshots to avoid keys get evicted.
2381 std::vector<const Snapshot*> snapshots;
2382 // Keep track of expected sequence number.
2383 SequenceNumber expected_seq = 0;
2384
2385 auto add_key = [&](std::function<Status()> func) {
2386 ASSERT_OK(func());
2387 expected_seq++;
2388 if (options.two_write_queues) {
2389 expected_seq++; // 1 for commit
2390 }
2391 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2392 snapshots.push_back(db->GetSnapshot());
2393 };
2394
2395 // Each key here represent a standalone test case.
2396 add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
2397 add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
2398 add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
2399 add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
2400 add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
2401 add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
2402 add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
2403 add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
2404 ASSERT_OK(db->Flush(FlushOptions()));
2405 add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
2406 add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
2407
2408 auto* transaction = db->BeginTransaction(WriteOptions());
2409 ASSERT_OK(transaction->SetName("txn"));
2410 ASSERT_OK(transaction->Put("key1", "value1_2"));
2411 ASSERT_OK(transaction->Delete("key2"));
2412 ASSERT_OK(transaction->SingleDelete("key3"));
2413 ASSERT_OK(transaction->Merge("key4", "value4_2"));
2414 ASSERT_OK(transaction->Merge("key5", "value5_3"));
2415 ASSERT_OK(transaction->Put("key6", "value6_2"));
2416 ASSERT_OK(transaction->Put("key7", "value7_2"));
2417 // Prepare but not commit.
2418 ASSERT_OK(transaction->Prepare());
2419 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2420 ASSERT_OK(db->Flush(FlushOptions()));
2421 for (auto* s : snapshots) {
2422 db->ReleaseSnapshot(s);
2423 }
2424 // Dummy keys to avoid compaction trivially move files and get around actual
2425 // compaction logic.
2426 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2427 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2428 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2429 VerifyKeys({
2430 {"key1", "value1_1"},
2431 {"key2", "value2_1"},
2432 {"key3", "value3_1"},
2433 {"key4", "value4_1"},
2434 {"key5", "value5_1,value5_2"},
2435 {"key6", "NOT_FOUND"},
2436 {"key7", "NOT_FOUND"},
2437 });
2438 VerifyInternalKeys({
2439 {"key1", "value1_2", expected_seq, kTypeValue},
2440 {"key1", "value1_1", 0, kTypeValue},
2441 {"key2", "", expected_seq, kTypeDeletion},
2442 {"key2", "value2_1", 0, kTypeValue},
2443 {"key3", "", expected_seq, kTypeSingleDeletion},
2444 {"key3", "value3_1", 0, kTypeValue},
2445 {"key4", "value4_2", expected_seq, kTypeMerge},
2446 {"key4", "value4_1", 0, kTypeValue},
2447 {"key5", "value5_3", expected_seq, kTypeMerge},
2448 {"key5", "value5_1,value5_2", 0, kTypeValue},
2449 {"key6", "value6_2", expected_seq, kTypeValue},
2450 {"key7", "value7_2", expected_seq, kTypeValue},
2451 });
2452 ASSERT_OK(transaction->Commit());
2453 VerifyKeys({
2454 {"key1", "value1_2"},
2455 {"key2", "NOT_FOUND"},
2456 {"key3", "NOT_FOUND"},
2457 {"key4", "value4_1,value4_2"},
2458 {"key5", "value5_1,value5_2,value5_3"},
2459 {"key6", "value6_2"},
2460 {"key7", "value7_2"},
2461 });
2462 delete transaction;
2463 }
2464
2465 // Compaction should keep keys visible to a snapshot based on commit sequence,
2466 // not just prepare sequence.
2467 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
2468 options.disable_auto_compactions = true;
2469 ReOpen();
2470 // Keep track of expected sequence number.
2471 SequenceNumber expected_seq = 0;
2472 auto* txn1 = db->BeginTransaction(WriteOptions());
2473 ASSERT_OK(txn1->SetName("txn1"));
2474 ASSERT_OK(txn1->Put("key1", "value1_1"));
2475 ASSERT_OK(txn1->Prepare());
2476 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2477 ASSERT_OK(txn1->Commit());
2478 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2479 ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
2480 delete txn1;
2481 // Take a snapshots to avoid keys get evicted before compaction.
2482 const Snapshot* snapshot1 = db->GetSnapshot();
2483 auto* txn2 = db->BeginTransaction(WriteOptions());
2484 ASSERT_OK(txn2->SetName("txn2"));
2485 ASSERT_OK(txn2->Put("key2", "value2_1"));
2486 ASSERT_OK(txn2->Prepare());
2487 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2488 // txn1 commit before snapshot2 and it is visible to snapshot2.
2489 // txn2 commit after snapshot2 and it is not visible.
2490 const Snapshot* snapshot2 = db->GetSnapshot();
2491 ASSERT_OK(txn2->Commit());
2492 ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
2493 delete txn2;
2494 // Take a snapshots to avoid keys get evicted before compaction.
2495 const Snapshot* snapshot3 = db->GetSnapshot();
2496 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
2497 expected_seq++; // 1 for write
2498 SequenceNumber seq1 = expected_seq;
2499 if (options.two_write_queues) {
2500 expected_seq++; // 1 for commit
2501 }
2502 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2503 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
2504 expected_seq++; // 1 for write
2505 SequenceNumber seq2 = expected_seq;
2506 if (options.two_write_queues) {
2507 expected_seq++; // 1 for commit
2508 }
2509 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2510 ASSERT_OK(db->Flush(FlushOptions()));
2511 db->ReleaseSnapshot(snapshot1);
2512 db->ReleaseSnapshot(snapshot3);
2513 // Dummy keys to avoid compaction trivially move files and get around actual
2514 // compaction logic.
2515 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2516 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2517 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2518 VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
2519 VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
2520 VerifyInternalKeys({
2521 {"key1", "value1_2", seq1, kTypeValue},
2522 // "value1_1" is visible to snapshot2. Also keys at bottom level visible
2523 // to earliest snapshot will output with seq = 0.
2524 {"key1", "value1_1", 0, kTypeValue},
2525 {"key2", "value2_2", seq2, kTypeValue},
2526 });
2527 db->ReleaseSnapshot(snapshot2);
2528 }
2529
2530 TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
2531 const size_t snapshot_cache_bits = 7; // same as default
2532 const size_t commit_cache_bits = 0; // disable commit cache
2533 for (bool has_recent_prepare : {true, false}) {
2534 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2535 ReOpen();
2536
2537 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2538 auto* transaction =
2539 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2540 ASSERT_OK(transaction->SetName("txn"));
2541 ASSERT_OK(transaction->Delete("key1"));
2542 ASSERT_OK(transaction->Prepare());
2543 // snapshot1 should get min_uncommitted from prepared_txns_ heap.
2544 auto snapshot1 = db->GetSnapshot();
2545 ASSERT_EQ(transaction->GetId(),
2546 ((SnapshotImpl*)snapshot1)->min_uncommitted_);
2547 // Add a commit to advance max_evicted_seq and move the prepared transaction
2548 // into delayed_prepared_ set.
2549 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2550 Transaction* txn2 = nullptr;
2551 if (has_recent_prepare) {
2552 txn2 =
2553 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2554 ASSERT_OK(txn2->SetName("txn2"));
2555 ASSERT_OK(txn2->Put("key3", "value3"));
2556 ASSERT_OK(txn2->Prepare());
2557 }
2558 // snapshot2 should get min_uncommitted from delayed_prepared_ set.
2559 auto snapshot2 = db->GetSnapshot();
2560 ASSERT_EQ(transaction->GetId(),
2561 ((SnapshotImpl*)snapshot1)->min_uncommitted_);
2562 ASSERT_OK(transaction->Commit());
2563 delete transaction;
2564 if (has_recent_prepare) {
2565 ASSERT_OK(txn2->Commit());
2566 delete txn2;
2567 }
2568 VerifyKeys({{"key1", "NOT_FOUND"}});
2569 VerifyKeys({{"key1", "value1"}}, snapshot1);
2570 VerifyKeys({{"key1", "value1"}}, snapshot2);
2571 db->ReleaseSnapshot(snapshot1);
2572 db->ReleaseSnapshot(snapshot2);
2573 }
2574 }
2575
2576 // Insert two values, v1 and v2, for a key. Between prepare and commit of v2
2577 // take two snapshots, s1 and s2. Release s1 during compaction.
2578 // Test to make sure compaction doesn't get confused and think s1 can see both
2579 // values, and thus compact out the older value by mistake.
2580 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
2581 const size_t snapshot_cache_bits = 7; // same as default
2582 const size_t commit_cache_bits = 0; // minimum commit cache
2583 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2584 ReOpen();
2585
2586 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
2587 auto* transaction =
2588 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2589 ASSERT_OK(transaction->SetName("txn"));
2590 ASSERT_OK(transaction->Put("key1", "value1_2"));
2591 ASSERT_OK(transaction->Prepare());
2592 auto snapshot1 = db->GetSnapshot();
2593 // Increment sequence number.
2594 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2595 auto snapshot2 = db->GetSnapshot();
2596 ASSERT_OK(transaction->Commit());
2597 delete transaction;
2598 VerifyKeys({{"key1", "value1_2"}});
2599 VerifyKeys({{"key1", "value1_1"}}, snapshot1);
2600 VerifyKeys({{"key1", "value1_1"}}, snapshot2);
2601 // Add a flush to avoid compaction to fallback to trivial move.
2602
2603 auto callback = [&](void*) {
2604 // Release snapshot1 after CompactionIterator init.
2605 // CompactionIterator need to figure out the earliest snapshot
2606 // that can see key1:value1_2 is kMaxSequenceNumber, not
2607 // snapshot1 or snapshot2.
2608 db->ReleaseSnapshot(snapshot1);
2609 // Add some keys to advance max_evicted_seq.
2610 ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
2611 ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
2612 };
2613 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2614 callback);
2615 SyncPoint::GetInstance()->EnableProcessing();
2616
2617 ASSERT_OK(db->Flush(FlushOptions()));
2618 VerifyKeys({{"key1", "value1_2"}});
2619 VerifyKeys({{"key1", "value1_1"}}, snapshot2);
2620 db->ReleaseSnapshot(snapshot2);
2621 SyncPoint::GetInstance()->ClearAllCallBacks();
2622 }
2623
2624 // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
2625 // after committing v2. Release s1 during compaction, right after compaction
2626 // processes v2 and before processes v1. Test to make sure compaction doesn't
2627 // get confused and believe v1 and v2 are visible to different snapshot
2628 // (v1 by s2, v2 by s1) and refuse to compact out v1.
2629 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
2630 const size_t snapshot_cache_bits = 7; // same as default
2631 const size_t commit_cache_bits = 0; // minimum commit cache
2632 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2633 ReOpen();
2634
2635 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2636 ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
2637 SequenceNumber v2_seq = db->GetLatestSequenceNumber();
2638 auto* s1 = db->GetSnapshot();
2639 // Advance sequence number.
2640 ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
2641 auto* s2 = db->GetSnapshot();
2642
2643 int count_value = 0;
2644 auto callback = [&](void* arg) {
2645 auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
2646 if (ikey->user_key == "key1") {
2647 count_value++;
2648 if (count_value == 2) {
2649 // Processing v1.
2650 db->ReleaseSnapshot(s1);
2651 // Add some keys to advance max_evicted_seq and update
2652 // old_commit_map.
2653 ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
2654 ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
2655 }
2656 }
2657 };
2658 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
2659 callback);
2660 SyncPoint::GetInstance()->EnableProcessing();
2661
2662 ASSERT_OK(db->Flush(FlushOptions()));
2663 // value1 should be compact out.
2664 VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
2665
2666 // cleanup
2667 db->ReleaseSnapshot(s2);
2668 SyncPoint::GetInstance()->ClearAllCallBacks();
2669 }
2670
2671 // Insert two values, v1 and v2, for a key. Insert another dummy key
2672 // so to evict the commit cache for v2, while v1 is still in commit cache.
2673 // Take two snapshots, s1 and s2. Release s1 during compaction.
2674 // Since commit cache for v2 is evicted, and old_commit_map don't have
2675 // s1 (it is released),
2676 // TODO(myabandeh): how can we be sure that the v2's commit info is evicted
2677 // (and not v1's)? Instead of putting a dummy, we can directly call
2678 // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
2679 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
2680 const size_t snapshot_cache_bits = 7; // same as default
2681 const size_t commit_cache_bits = 1; // commit cache size = 2
2682 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2683 ReOpen();
2684
2685 // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
2686 // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
2687 auto add_dummy = [&]() {
2688 auto* txn_dummy =
2689 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2690 ASSERT_OK(txn_dummy->SetName("txn_dummy"));
2691 ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
2692 ASSERT_OK(txn_dummy->Prepare());
2693 ASSERT_OK(txn_dummy->Commit());
2694 delete txn_dummy;
2695 };
2696
2697 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2698 auto* txn =
2699 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2700 ASSERT_OK(txn->SetName("txn"));
2701 ASSERT_OK(txn->Put("key1", "value2"));
2702 ASSERT_OK(txn->Prepare());
2703 // TODO(myabandeh): replace it with GetId()?
2704 auto v2_seq = db->GetLatestSequenceNumber();
2705 ASSERT_OK(txn->Commit());
2706 delete txn;
2707 auto* s1 = db->GetSnapshot();
2708 // Dummy key to advance sequence number.
2709 add_dummy();
2710 auto* s2 = db->GetSnapshot();
2711
2712 auto callback = [&](void*) {
2713 db->ReleaseSnapshot(s1);
2714 // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
2715 add_dummy();
2716 add_dummy();
2717 };
2718 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2719 callback);
2720 SyncPoint::GetInstance()->EnableProcessing();
2721
2722 ASSERT_OK(db->Flush(FlushOptions()));
2723 // value1 should be compact out.
2724 VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
2725
2726 db->ReleaseSnapshot(s2);
2727 SyncPoint::GetInstance()->ClearAllCallBacks();
2728 }
2729
2730 TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
2731 const size_t snapshot_cache_bits = 7; // same as default
2732 const size_t commit_cache_bits = 0; // minimum commit cache
2733 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2734 ReOpen();
2735
2736 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2737 auto* transaction =
2738 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2739 ASSERT_OK(transaction->SetName("txn"));
2740 ASSERT_OK(transaction->Delete("key1"));
2741 ASSERT_OK(transaction->Prepare());
2742 SequenceNumber del_seq = db->GetLatestSequenceNumber();
2743 auto snapshot1 = db->GetSnapshot();
2744 // Increment sequence number.
2745 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2746 auto snapshot2 = db->GetSnapshot();
2747 ASSERT_OK(transaction->Commit());
2748 delete transaction;
2749 VerifyKeys({{"key1", "NOT_FOUND"}});
2750 VerifyKeys({{"key1", "value1"}}, snapshot1);
2751 VerifyKeys({{"key1", "value1"}}, snapshot2);
2752 ASSERT_OK(db->Flush(FlushOptions()));
2753
2754 auto callback = [&](void* compaction) {
2755 // Release snapshot1 after CompactionIterator init.
2756 // CompactionIterator need to double check and find out snapshot2 is now
2757 // the earliest existing snapshot.
2758 if (compaction != nullptr) {
2759 db->ReleaseSnapshot(snapshot1);
2760 // Add some keys to advance max_evicted_seq.
2761 ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
2762 ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
2763 }
2764 };
2765 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2766 callback);
2767 SyncPoint::GetInstance()->EnableProcessing();
2768
2769 // Dummy keys to avoid compaction trivially move files and get around actual
2770 // compaction logic.
2771 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2772 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2773 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2774 // Only verify for key1. Both the put and delete for the key should be kept.
2775 // Since the delete tombstone is not visible to snapshot2, we need to keep
2776 // at least one version of the key, for write-conflict check.
2777 VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
2778 {"key1", "value1", 0, kTypeValue}});
2779 db->ReleaseSnapshot(snapshot2);
2780 SyncPoint::GetInstance()->ClearAllCallBacks();
2781 }
2782
2783 // A more complex test to verify compaction/flush should keep keys visible
2784 // to snapshots.
2785 TEST_P(WritePreparedTransactionTest,
2786 CompactionKeepSnapshotVisibleKeysRandomized) {
2787 constexpr size_t kNumTransactions = 10;
2788 constexpr size_t kNumIterations = 1000;
2789
2790 std::vector<Transaction*> transactions(kNumTransactions, nullptr);
2791 std::vector<size_t> versions(kNumTransactions, 0);
2792 std::unordered_map<std::string, std::string> current_data;
2793 std::vector<const Snapshot*> snapshots;
2794 std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
2795
2796 Random rnd(1103);
2797 options.disable_auto_compactions = true;
2798 ReOpen();
2799
2800 for (size_t i = 0; i < kNumTransactions; i++) {
2801 std::string key = "key" + ToString(i);
2802 std::string value = "value0";
2803 ASSERT_OK(db->Put(WriteOptions(), key, value));
2804 current_data[key] = value;
2805 }
2806 VerifyKeys(current_data);
2807
2808 for (size_t iter = 0; iter < kNumIterations; iter++) {
2809 auto r = rnd.Next() % (kNumTransactions + 1);
2810 if (r < kNumTransactions) {
2811 std::string key = "key" + ToString(r);
2812 if (transactions[r] == nullptr) {
2813 std::string value = "value" + ToString(versions[r] + 1);
2814 auto* txn = db->BeginTransaction(WriteOptions());
2815 ASSERT_OK(txn->SetName("txn" + ToString(r)));
2816 ASSERT_OK(txn->Put(key, value));
2817 ASSERT_OK(txn->Prepare());
2818 transactions[r] = txn;
2819 } else {
2820 std::string value = "value" + ToString(++versions[r]);
2821 ASSERT_OK(transactions[r]->Commit());
2822 delete transactions[r];
2823 transactions[r] = nullptr;
2824 current_data[key] = value;
2825 }
2826 } else {
2827 auto* snapshot = db->GetSnapshot();
2828 VerifyKeys(current_data, snapshot);
2829 snapshots.push_back(snapshot);
2830 snapshot_data.push_back(current_data);
2831 }
2832 VerifyKeys(current_data);
2833 }
2834 // Take a last snapshot to test compaction with uncommitted prepared
2835 // transaction.
2836 snapshots.push_back(db->GetSnapshot());
2837 snapshot_data.push_back(current_data);
2838
2839 assert(snapshots.size() == snapshot_data.size());
2840 for (size_t i = 0; i < snapshots.size(); i++) {
2841 VerifyKeys(snapshot_data[i], snapshots[i]);
2842 }
2843 ASSERT_OK(db->Flush(FlushOptions()));
2844 for (size_t i = 0; i < snapshots.size(); i++) {
2845 VerifyKeys(snapshot_data[i], snapshots[i]);
2846 }
2847 // Dummy keys to avoid compaction trivially move files and get around actual
2848 // compaction logic.
2849 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2850 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2851 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2852 for (size_t i = 0; i < snapshots.size(); i++) {
2853 VerifyKeys(snapshot_data[i], snapshots[i]);
2854 }
2855 // cleanup
2856 for (size_t i = 0; i < kNumTransactions; i++) {
2857 if (transactions[i] == nullptr) {
2858 continue;
2859 }
2860 ASSERT_OK(transactions[i]->Commit());
2861 delete transactions[i];
2862 }
2863 for (size_t i = 0; i < snapshots.size(); i++) {
2864 db->ReleaseSnapshot(snapshots[i]);
2865 }
2866 }
2867
2868 // Compaction should not apply the optimization to output key with sequence
2869 // number equal to 0 if the key is not visible to earliest snapshot, based on
2870 // commit sequence number.
2871 TEST_P(WritePreparedTransactionTest,
2872 CompactionShouldKeepSequenceForUncommittedKeys) {
2873 options.disable_auto_compactions = true;
2874 ReOpen();
2875 // Keep track of expected sequence number.
2876 SequenceNumber expected_seq = 0;
2877 auto* transaction = db->BeginTransaction(WriteOptions());
2878 ASSERT_OK(transaction->SetName("txn"));
2879 ASSERT_OK(transaction->Put("key1", "value1"));
2880 ASSERT_OK(transaction->Prepare());
2881 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2882 SequenceNumber seq1 = expected_seq;
2883 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2884 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2885 expected_seq++; // one for data
2886 if (options.two_write_queues) {
2887 expected_seq++; // one for commit
2888 }
2889 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2890 ASSERT_OK(db->Flush(FlushOptions()));
2891 // Dummy keys to avoid compaction trivially move files and get around actual
2892 // compaction logic.
2893 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2894 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2895 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2896 VerifyKeys({
2897 {"key1", "NOT_FOUND"},
2898 {"key2", "value2"},
2899 });
2900 VerifyInternalKeys({
2901 // "key1" has not been committed. It keeps its sequence number.
2902 {"key1", "value1", seq1, kTypeValue},
2903 // "key2" is committed and output with seq = 0.
2904 {"key2", "value2", 0, kTypeValue},
2905 });
2906 ASSERT_OK(transaction->Commit());
2907 VerifyKeys({
2908 {"key1", "value1"},
2909 {"key2", "value2"},
2910 });
2911 delete transaction;
2912 }
2913
2914 TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
2915 options.disable_auto_compactions = true;
2916 ReOpen();
2917
2918 const Snapshot* snapshot = nullptr;
2919 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2920 auto* txn = db->BeginTransaction(WriteOptions());
2921 ASSERT_OK(txn->SetName("txn"));
2922 ASSERT_OK(txn->Put("key1", "value2"));
2923 ASSERT_OK(txn->Prepare());
2924
2925 auto callback = [&](void*) {
2926 // Snapshot is taken after compaction start. It should be taken into
2927 // consideration for whether to compact out value1.
2928 snapshot = db->GetSnapshot();
2929 ASSERT_OK(txn->Commit());
2930 delete txn;
2931 };
2932 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2933 callback);
2934 SyncPoint::GetInstance()->EnableProcessing();
2935 ASSERT_OK(db->Flush(FlushOptions()));
2936 ASSERT_NE(nullptr, snapshot);
2937 VerifyKeys({{"key1", "value2"}});
2938 VerifyKeys({{"key1", "value1"}}, snapshot);
2939 db->ReleaseSnapshot(snapshot);
2940 }
2941
2942 TEST_P(WritePreparedTransactionTest, Iterate) {
2943 auto verify_state = [](Iterator* iter, const std::string& key,
2944 const std::string& value) {
2945 ASSERT_TRUE(iter->Valid());
2946 ASSERT_OK(iter->status());
2947 ASSERT_EQ(key, iter->key().ToString());
2948 ASSERT_EQ(value, iter->value().ToString());
2949 };
2950
2951 auto verify_iter = [&](const std::string& expected_val) {
2952 // Get iterator from a concurrent transaction and make sure it has the
2953 // same view as an iterator from the DB.
2954 auto* txn = db->BeginTransaction(WriteOptions());
2955
2956 for (int i = 0; i < 2; i++) {
2957 Iterator* iter = (i == 0)
2958 ? db->NewIterator(ReadOptions())
2959 : txn->GetIterator(ReadOptions());
2960 // Seek
2961 iter->Seek("foo");
2962 verify_state(iter, "foo", expected_val);
2963 // Next
2964 iter->Seek("a");
2965 verify_state(iter, "a", "va");
2966 iter->Next();
2967 verify_state(iter, "foo", expected_val);
2968 // SeekForPrev
2969 iter->SeekForPrev("y");
2970 verify_state(iter, "foo", expected_val);
2971 // Prev
2972 iter->SeekForPrev("z");
2973 verify_state(iter, "z", "vz");
2974 iter->Prev();
2975 verify_state(iter, "foo", expected_val);
2976 delete iter;
2977 }
2978 delete txn;
2979 };
2980
2981 ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
2982 auto* transaction = db->BeginTransaction(WriteOptions());
2983 ASSERT_OK(transaction->SetName("txn"));
2984 ASSERT_OK(transaction->Put("foo", "v2"));
2985 ASSERT_OK(transaction->Prepare());
2986 VerifyKeys({{"foo", "v1"}});
2987 // dummy keys
2988 ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
2989 ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
2990 verify_iter("v1");
2991 ASSERT_OK(transaction->Commit());
2992 VerifyKeys({{"foo", "v2"}});
2993 verify_iter("v2");
2994 delete transaction;
2995 }
2996
2997 TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
2998 Iterator* iter = db->NewIterator(ReadOptions());
2999 ASSERT_TRUE(iter->Refresh().IsNotSupported());
3000 delete iter;
3001 }
3002
3003 // Committing an delayed prepared has two non-atomic steps: update commit cache,
3004 // remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
3005 // non-atomic steps of checking these two data structures. This test breaks each
3006 // in the middle to ensure correctness in spite of non-atomic execution.
3007 // Note: This test is limitted to the case where snapshot is larger than the
3008 // max_evicted_seq_.
3009 TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
3010 const size_t snapshot_cache_bits = 7; // same as default
3011 const size_t commit_cache_bits = 3; // 8 entries
3012 for (auto split_read : {true, false}) {
3013 std::vector<bool> split_options = {false};
3014 if (split_read) {
3015 // Also test for break before mutex
3016 split_options.push_back(true);
3017 }
3018 for (auto split_before_mutex : split_options) {
3019 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3020 ReOpen();
3021 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3022 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
3023 // Fill up the commit cache
3024 std::string init_value("value1");
3025 for (int i = 0; i < 10; i++) {
3026 db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
3027 }
3028 // Prepare a transaction but do not commit it
3029 Transaction* txn =
3030 db->BeginTransaction(WriteOptions(), TransactionOptions());
3031 ASSERT_OK(txn->SetName("xid"));
3032 ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
3033 ASSERT_OK(txn->Prepare());
3034 // Commit a bunch of entries to advance max evicted seq and make the
3035 // prepared a delayed prepared
3036 for (int i = 0; i < 10; i++) {
3037 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3038 }
3039 // The snapshot should not see the delayed prepared entry
3040 auto snap = db->GetSnapshot();
3041
3042 if (split_read) {
3043 if (split_before_mutex) {
3044 // split before acquiring prepare_mutex_
3045 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3046 {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
3047 "AtomicCommitOfDelayedPrepared:Commit:before"},
3048 {"AtomicCommitOfDelayedPrepared:Commit:after",
3049 "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
3050 } else {
3051 // split right after reading from the commit cache
3052 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3053 {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
3054 "AtomicCommitOfDelayedPrepared:Commit:before"},
3055 {"AtomicCommitOfDelayedPrepared:Commit:after",
3056 "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
3057 }
3058 } else { // split commit
3059 // split right before removing from delayed_prepared_
3060 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3061 {{"WritePreparedTxnDB::RemovePrepared:pause",
3062 "AtomicCommitOfDelayedPrepared:Read:before"},
3063 {"AtomicCommitOfDelayedPrepared:Read:after",
3064 "WritePreparedTxnDB::RemovePrepared:resume"}});
3065 }
3066 SyncPoint::GetInstance()->EnableProcessing();
3067
3068 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3069 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
3070 ASSERT_OK(txn->Commit());
3071 if (split_before_mutex) {
3072 // Do bunch of inserts to evict the commit entry from the cache. This
3073 // would prevent the 2nd look into commit cache under prepare_mutex_
3074 // to see the commit entry.
3075 auto seq = db_impl->TEST_GetLastVisibleSequence();
3076 size_t tries = 0;
3077 while (wp_db->max_evicted_seq_ < seq && tries < 50) {
3078 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3079 tries++;
3080 };
3081 ASSERT_LT(tries, 50);
3082 }
3083 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
3084 delete txn;
3085 });
3086
3087 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3088 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
3089 ReadOptions roptions;
3090 roptions.snapshot = snap;
3091 PinnableSlice value;
3092 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3093 ASSERT_OK(s);
3094 // It should not see the commit of delayed prepared
3095 ASSERT_TRUE(value == init_value);
3096 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
3097 db->ReleaseSnapshot(snap);
3098 });
3099
3100 read_thread.join();
3101 commit_thread.join();
3102 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3103 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3104 } // for split_before_mutex
3105 } // for split_read
3106 }
3107
3108 // When max evicted seq advances a prepared seq, it involves two updates: i)
3109 // adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
3110 // ::IsInSnapshot also reads these two values in a non-atomic way. This test
3111 // ensures correctness if the update occurs after ::IsInSnapshot reads
3112 // delayed_prepared_empty_ and before it reads max_evicted_seq_.
3113 // Note: this test focuses on read snapshot larger than max_evicted_seq_.
3114 TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
3115 const size_t snapshot_cache_bits = 7; // same as default
3116 const size_t commit_cache_bits = 3; // 8 entries
3117 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3118 ReOpen();
3119 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3120 // Fill up the commit cache
3121 std::string init_value("value1");
3122 for (int i = 0; i < 10; i++) {
3123 db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
3124 }
3125 // Prepare a transaction but do not commit it
3126 Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3127 ASSERT_OK(txn->SetName("xid"));
3128 ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
3129 ASSERT_OK(txn->Prepare());
3130 // Create a gap between prepare seq and snapshot seq
3131 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3132 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3133 // The snapshot should not see the delayed prepared entry
3134 auto snap = db->GetSnapshot();
3135 ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
3136
3137 // split right after reading delayed_prepared_empty_
3138 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3139 {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
3140 "AtomicUpdateOfDelayedPrepared:before"},
3141 {"AtomicUpdateOfDelayedPrepared:after",
3142 "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
3143 SyncPoint::GetInstance()->EnableProcessing();
3144
3145 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3146 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
3147 // Commit a bunch of entries to advance max evicted seq and make the
3148 // prepared a delayed prepared
3149 size_t tries = 0;
3150 while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
3151 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3152 tries++;
3153 };
3154 ASSERT_LT(tries, 50);
3155 // This is the case on which the test focuses
3156 ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3157 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
3158 });
3159
3160 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3161 ReadOptions roptions;
3162 roptions.snapshot = snap;
3163 PinnableSlice value;
3164 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3165 ASSERT_OK(s);
3166 // It should not see the uncommitted value of delayed prepared
3167 ASSERT_TRUE(value == init_value);
3168 db->ReleaseSnapshot(snap);
3169 });
3170
3171 read_thread.join();
3172 commit_thread.join();
3173 ASSERT_OK(txn->Commit());
3174 delete txn;
3175 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3176 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3177 }
3178
3179 // Eviction from commit cache and update of max evicted seq are two non-atomic
3180 // steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
3181 // from commit cache are two non-atomic steps. This tests if the update occurs
3182 // after reading max_evicted_seq_ and before reading the commit cache.
3183 // Note: the test focuses on snapshot larger than max_evicted_seq_
3184 TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
3185 const size_t snapshot_cache_bits = 7; // same as default
3186 const size_t commit_cache_bits = 3; // 8 entries
3187 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3188 ReOpen();
3189 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3190 // Fill up the commit cache
3191 std::string init_value("value1");
3192 std::string last_value("value_final");
3193 for (int i = 0; i < 10; i++) {
3194 db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
3195 }
3196 // Do an uncommitted write to prevent min_uncommitted optimization
3197 Transaction* txn1 =
3198 db->BeginTransaction(WriteOptions(), TransactionOptions());
3199 ASSERT_OK(txn1->SetName("xid1"));
3200 ASSERT_OK(txn1->Put(Slice("key0"), last_value));
3201 ASSERT_OK(txn1->Prepare());
3202 // Do a write with prepare to get the prepare seq
3203 Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3204 ASSERT_OK(txn->SetName("xid"));
3205 ASSERT_OK(txn->Put(Slice("key1"), last_value));
3206 ASSERT_OK(txn->Prepare());
3207 ASSERT_OK(txn->Commit());
3208 // Create a gap between commit entry and snapshot seq
3209 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3210 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3211 // The snapshot should see the last commit
3212 auto snap = db->GetSnapshot();
3213 ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
3214
3215 // split right after reading max_evicted_seq_
3216 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3217 {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
3218 "NonAtomicUpdateOfMaxEvictedSeq:before"},
3219 {"NonAtomicUpdateOfMaxEvictedSeq:after",
3220 "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
3221 SyncPoint::GetInstance()->EnableProcessing();
3222
3223 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3224 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
3225 // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
3226 size_t tries = 0;
3227 while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
3228 db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
3229 tries++;
3230 };
3231 ASSERT_LT(tries, 50);
3232 // This is the case on which the test focuses
3233 ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3234 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
3235 });
3236
3237 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3238 ReadOptions roptions;
3239 roptions.snapshot = snap;
3240 PinnableSlice value;
3241 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3242 ASSERT_OK(s);
3243 // It should see the committed value of the evicted entry
3244 ASSERT_TRUE(value == last_value);
3245 db->ReleaseSnapshot(snap);
3246 });
3247
3248 read_thread.join();
3249 commit_thread.join();
3250 delete txn;
3251 txn1->Commit();
3252 delete txn1;
3253 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3254 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3255 }
3256
3257 // Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
3258 // that. The test focuses on a race condition between AddPrepared and
3259 // AdvanceMaxEvictedSeq functions.
3260 TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
3261 if (!options.two_write_queues) {
3262 // This test is only for two write queues
3263 return;
3264 }
3265 const size_t snapshot_cache_bits = 7; // same as default
3266 // 1 entry to advance max after the 2nd commit
3267 const size_t commit_cache_bits = 0;
3268 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3269 ReOpen();
3270 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3271 std::string some_value("value_some");
3272 std::string uncommitted_value("value_uncommitted");
3273 // Prepare two uncommitted transactions
3274 Transaction* txn1 =
3275 db->BeginTransaction(WriteOptions(), TransactionOptions());
3276 ASSERT_OK(txn1->SetName("xid1"));
3277 ASSERT_OK(txn1->Put(Slice("key1"), some_value));
3278 ASSERT_OK(txn1->Prepare());
3279 Transaction* txn2 =
3280 db->BeginTransaction(WriteOptions(), TransactionOptions());
3281 ASSERT_OK(txn2->SetName("xid2"));
3282 ASSERT_OK(txn2->Put(Slice("key2"), some_value));
3283 ASSERT_OK(txn2->Prepare());
3284 // Start the txn here so the other thread could get its id
3285 Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3286 ASSERT_OK(txn->SetName("xid"));
3287 ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
3288 port::Mutex txn_mutex_;
3289
3290 // t1) Insert prepared entry, t2) commit other entries to advance max
3291 // evicted sec and finish checking the existing prepared entries, t1)
3292 // AddPrepared, t2) update max_evicted_seq_
3293 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3294 {"AddPreparedCallback::AddPrepared::begin:pause",
3295 "AddPreparedBeforeMax::read_thread:start"},
3296 {"AdvanceMaxEvictedSeq::update_max:pause",
3297 "AddPreparedCallback::AddPrepared::begin:resume"},
3298 {"AddPreparedCallback::AddPrepared::end",
3299 "AdvanceMaxEvictedSeq::update_max:resume"},
3300 });
3301 SyncPoint::GetInstance()->EnableProcessing();
3302
3303 ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3304 txn_mutex_.Lock();
3305 ASSERT_OK(txn->Prepare());
3306 txn_mutex_.Unlock();
3307 });
3308
3309 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3310 TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
3311 // Publish seq number with a commit
3312 ASSERT_OK(txn1->Commit());
3313 // Since the commit cache size is one the 2nd commit evict the 1st one and
3314 // invokes AdcanceMaxEvictedSeq
3315 ASSERT_OK(txn2->Commit());
3316
3317 ReadOptions roptions;
3318 PinnableSlice value;
3319 // The snapshot should not see the uncommitted value from write_thread
3320 auto snap = db->GetSnapshot();
3321 ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3322 // This is the scenario that we test for
3323 txn_mutex_.Lock();
3324 ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
3325 txn_mutex_.Unlock();
3326 roptions.snapshot = snap;
3327 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
3328 ASSERT_TRUE(s.IsNotFound());
3329 db->ReleaseSnapshot(snap);
3330 });
3331
3332 read_thread.join();
3333 write_thread.join();
3334 delete txn1;
3335 delete txn2;
3336 ASSERT_OK(txn->Commit());
3337 delete txn;
3338 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3339 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3340 }
3341
3342 // When an old prepared entry gets committed, there is a gap between the time
3343 // that it is published and when it is cleaned up from old_prepared_. This test
3344 // stresses such cases.
3345 TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
3346 const size_t snapshot_cache_bits = 7; // same as default
3347 for (const size_t commit_cache_bits : {0, 2, 3}) {
3348 for (const size_t sub_batch_cnt : {1, 2, 3}) {
3349 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3350 ReOpen();
3351 std::atomic<const Snapshot*> snap = {nullptr};
3352 std::atomic<SequenceNumber> exp_prepare = {0};
3353 ROCKSDB_NAMESPACE::port::Thread callback_thread;
3354 // Value is synchronized via snap
3355 PinnableSlice value;
3356 // Take a snapshot after publish and before RemovePrepared:Start
3357 auto snap_callback = [&]() {
3358 ASSERT_EQ(nullptr, snap.load());
3359 snap.store(db->GetSnapshot());
3360 ReadOptions roptions;
3361 roptions.snapshot = snap.load();
3362 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
3363 ASSERT_OK(s);
3364 };
3365 auto callback = [&](void* param) {
3366 SequenceNumber prep_seq = *((SequenceNumber*)param);
3367 if (prep_seq == exp_prepare.load()) { // only for write_thread
3368 // We need to spawn a thread to avoid deadlock since getting a
3369 // snpashot might end up calling AdvanceSeqByOne which needs joining
3370 // the write queue.
3371 callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
3372 TEST_SYNC_POINT("callback:end");
3373 }
3374 };
3375 // Wait for the first snapshot be taken in GetSnapshotInternal. Although
3376 // it might be updated before GetSnapshotInternal finishes but this should
3377 // cover most of the cases.
3378 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3379 {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
3380 });
3381 SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
3382 SyncPoint::GetInstance()->EnableProcessing();
3383 // Thread to cause frequent evictions
3384 ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
3385 // Too many txns might cause commit_seq - prepare_seq in another thread
3386 // to go beyond DELTA_UPPERBOUND
3387 for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
3388 db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
3389 }
3390 });
3391 ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3392 for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
3393 Transaction* txn =
3394 db->BeginTransaction(WriteOptions(), TransactionOptions());
3395 ASSERT_OK(txn->SetName("xid"));
3396 std::string val_str = "value" + ToString(i);
3397 for (size_t b = 0; b < sub_batch_cnt; b++) {
3398 ASSERT_OK(txn->Put(Slice("key2"), val_str));
3399 }
3400 ASSERT_OK(txn->Prepare());
3401 // Let an eviction to kick in
3402 std::this_thread::yield();
3403
3404 exp_prepare.store(txn->GetId());
3405 ASSERT_OK(txn->Commit());
3406 delete txn;
3407 // Wait for the snapshot taking that is triggered by
3408 // RemovePrepared:Start callback
3409 callback_thread.join();
3410
3411 // Read with the snapshot taken before delayed_prepared_ cleanup
3412 ReadOptions roptions;
3413 roptions.snapshot = snap.load();
3414 ASSERT_NE(nullptr, roptions.snapshot);
3415 PinnableSlice value2;
3416 auto s =
3417 db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
3418 ASSERT_OK(s);
3419 // It should see its own write
3420 ASSERT_TRUE(val_str == value2);
3421 // The value read by snapshot should not change
3422 ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
3423
3424 db->ReleaseSnapshot(roptions.snapshot);
3425 snap.store(nullptr);
3426 }
3427 });
3428 write_thread.join();
3429 eviction_thread.join();
3430 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3431 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3432 }
3433 }
3434 }
3435
3436 // Test that updating the commit map will not affect the existing snapshots
3437 TEST_P(WritePreparedTransactionTest, AtomicCommit) {
3438 for (bool skip_prepare : {true, false}) {
3439 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3440 {"WritePreparedTxnDB::AddCommitted:start",
3441 "AtomicCommit::GetSnapshot:start"},
3442 {"AtomicCommit::Get:end",
3443 "WritePreparedTxnDB::AddCommitted:start:pause"},
3444 {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
3445 {"AtomicCommit::Get2:end",
3446 "WritePreparedTxnDB::AddCommitted:end:pause:"},
3447 });
3448 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3449 ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3450 if (skip_prepare) {
3451 db->Put(WriteOptions(), Slice("key"), Slice("value"));
3452 } else {
3453 Transaction* txn =
3454 db->BeginTransaction(WriteOptions(), TransactionOptions());
3455 ASSERT_OK(txn->SetName("xid"));
3456 ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
3457 ASSERT_OK(txn->Prepare());
3458 ASSERT_OK(txn->Commit());
3459 delete txn;
3460 }
3461 });
3462 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3463 ReadOptions roptions;
3464 TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
3465 roptions.snapshot = db->GetSnapshot();
3466 PinnableSlice val;
3467 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
3468 TEST_SYNC_POINT("AtomicCommit::Get:end");
3469 TEST_SYNC_POINT("AtomicCommit::Get2:start");
3470 ASSERT_SAME(roptions, db, s, val, "key");
3471 TEST_SYNC_POINT("AtomicCommit::Get2:end");
3472 db->ReleaseSnapshot(roptions.snapshot);
3473 });
3474 read_thread.join();
3475 write_thread.join();
3476 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3477 }
3478 }
3479
3480 // Test that we can change write policy from WriteCommitted to WritePrepared
3481 // after a clean shutdown (which would empty the WAL)
3482 TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
3483 bool empty_wal = true;
3484 CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
3485 }
3486
3487 // Test that we fail fast if WAL is not emptied between changing the write
3488 // policy from WriteCommitted to WritePrepared
3489 TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
3490 bool empty_wal = true;
3491 CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
3492 }
3493
3494 // Test that we can change write policy from WritePrepare back to WriteCommitted
3495 // after a clean shutdown (which would empty the WAL)
3496 TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
3497 bool empty_wal = true;
3498 CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
3499 }
3500
3501 // Test that we fail fast if WAL is not emptied between changing the write
3502 // policy from WriteCommitted to WritePrepared
3503 TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
3504 bool empty_wal = true;
3505 CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
3506 }
3507
3508 } // namespace ROCKSDB_NAMESPACE
3509
3510 int main(int argc, char** argv) {
3511 ::testing::InitGoogleTest(&argc, argv);
3512 return RUN_ALL_TESTS();
3513 }
3514
3515 #else
3516 #include <stdio.h>
3517
int main(int /*argc*/, char** /*argv*/) {
  // This file's tests are compiled only when ROCKSDB_LITE is not defined;
  // report the skip on stderr and exit successfully.
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
3523
3524 #endif // ROCKSDB_LITE