]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_prepared_transaction_test.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_transaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <algorithm>
9 #include <atomic>
10 #include <cinttypes>
11 #include <functional>
12 #include <string>
13 #include <thread>
14
15 #include "db/db_impl/db_impl.h"
16 #include "db/dbformat.h"
17 #include "port/port.h"
18 #include "port/stack_trace.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/types.h"
22 #include "rocksdb/utilities/debug.h"
23 #include "rocksdb/utilities/transaction.h"
24 #include "rocksdb/utilities/transaction_db.h"
25 #include "table/mock_table.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "test_util/transaction_test_util.h"
30 #include "util/mutexlock.h"
31 #include "util/random.h"
32 #include "util/string_util.h"
33 #include "utilities/fault_injection_env.h"
34 #include "utilities/merge_operators.h"
35 #include "utilities/merge_operators/string_append/stringappend.h"
36 #include "utilities/transactions/pessimistic_transaction_db.h"
37 #include "utilities/transactions/transaction_test.h"
38 #include "utilities/transactions/write_prepared_txn_db.h"
39
40 using std::string;
41
42 namespace ROCKSDB_NAMESPACE {
43
44 using CommitEntry = WritePreparedTxnDB::CommitEntry;
45 using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
46 using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
47
48 TEST(PreparedHeap, BasicsTest) {
49 WritePreparedTxnDB::PreparedHeap heap;
50 {
51 MutexLock ml(heap.push_pop_mutex());
52 heap.push(14l);
53 // Test with one element
54 ASSERT_EQ(14l, heap.top());
55 heap.push(24l);
56 heap.push(34l);
57 // Test that old min is still on top
58 ASSERT_EQ(14l, heap.top());
59 heap.push(44l);
60 heap.push(54l);
61 heap.push(64l);
62 heap.push(74l);
63 heap.push(84l);
64 }
65 // Test that old min is still on top
66 ASSERT_EQ(14l, heap.top());
67 heap.erase(24l);
68 // Test that old min is still on top
69 ASSERT_EQ(14l, heap.top());
70 heap.erase(14l);
71 // Test that the new comes to the top after multiple erase
72 ASSERT_EQ(34l, heap.top());
73 heap.erase(34l);
74 // Test that the new comes to the top after single erase
75 ASSERT_EQ(44l, heap.top());
76 heap.erase(54l);
77 ASSERT_EQ(44l, heap.top());
78 heap.pop(); // pop 44l
79 // Test that the erased items are ignored after pop
80 ASSERT_EQ(64l, heap.top());
81 heap.erase(44l);
82 // Test that erasing an already popped item would work
83 ASSERT_EQ(64l, heap.top());
84 heap.erase(84l);
85 ASSERT_EQ(64l, heap.top());
86 {
87 MutexLock ml(heap.push_pop_mutex());
88 heap.push(85l);
89 heap.push(86l);
90 heap.push(87l);
91 heap.push(88l);
92 heap.push(89l);
93 }
94 heap.erase(87l);
95 heap.erase(85l);
96 heap.erase(89l);
97 heap.erase(86l);
98 heap.erase(88l);
99 // Test top remains the same after a random order of many erases
100 ASSERT_EQ(64l, heap.top());
101 heap.pop();
102 // Test that pop works with a series of random pending erases
103 ASSERT_EQ(74l, heap.top());
104 ASSERT_FALSE(heap.empty());
105 heap.pop();
106 // Test that empty works
107 ASSERT_TRUE(heap.empty());
108 }
109
110 // This is a scenario reconstructed from a buggy trace. Test that the bug does
111 // not resurface again.
112 TEST(PreparedHeap, EmptyAtTheEnd) {
113 WritePreparedTxnDB::PreparedHeap heap;
114 {
115 MutexLock ml(heap.push_pop_mutex());
116 heap.push(40l);
117 }
118 ASSERT_EQ(40l, heap.top());
119 // Although not a recommended scenario, we must be resilient against erase
120 // without a prior push.
121 heap.erase(50l);
122 ASSERT_EQ(40l, heap.top());
123 {
124 MutexLock ml(heap.push_pop_mutex());
125 heap.push(60l);
126 }
127 ASSERT_EQ(40l, heap.top());
128
129 heap.erase(60l);
130 ASSERT_EQ(40l, heap.top());
131 heap.erase(40l);
132 ASSERT_TRUE(heap.empty());
133
134 {
135 MutexLock ml(heap.push_pop_mutex());
136 heap.push(40l);
137 }
138 ASSERT_EQ(40l, heap.top());
139 heap.erase(50l);
140 ASSERT_EQ(40l, heap.top());
141 {
142 MutexLock ml(heap.push_pop_mutex());
143 heap.push(60l);
144 }
145 ASSERT_EQ(40l, heap.top());
146
147 heap.erase(40l);
148 // Test that the erase has not emptied the heap (we had a bug doing that)
149 ASSERT_FALSE(heap.empty());
150 ASSERT_EQ(60l, heap.top());
151 heap.erase(60l);
152 ASSERT_TRUE(heap.empty());
153 }
154
155 // Generate random order of PreparedHeap access and test that the heap will be
156 // successfully emptied at the end.
157 TEST(PreparedHeap, Concurrent) {
158 const size_t t_cnt = 10;
159 ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1];
160 WritePreparedTxnDB::PreparedHeap heap;
161 port::RWMutex prepared_mutex;
162 std::atomic<size_t> last;
163
164 for (size_t n = 0; n < 100; n++) {
165 last = 0;
166 t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
167 Random rnd(1103);
168 for (size_t seq = 1; seq <= t_cnt; seq++) {
169 // This is not recommended usage but we should be resilient against it.
170 bool skip_push = rnd.OneIn(5);
171 if (!skip_push) {
172 MutexLock ml(heap.push_pop_mutex());
173 std::this_thread::yield();
174 heap.push(seq);
175 last.store(seq);
176 }
177 }
178 });
179 for (size_t i = 1; i <= t_cnt; i++) {
180 t[i] =
181 ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() {
182 auto seq = i;
183 do {
184 std::this_thread::yield();
185 } while (last.load() < seq);
186 WriteLock wl(&prepared_mutex);
187 heap.erase(seq);
188 });
189 }
190 for (size_t i = 0; i <= t_cnt; i++) {
191 t[i].join();
192 }
193 ASSERT_TRUE(heap.empty());
194 }
195 }
196
197 // Test that WriteBatchWithIndex correctly counts the number of sub-batches
198 TEST(WriteBatchWithIndex, SubBatchCnt) {
199 ColumnFamilyOptions cf_options;
200 std::string cf_name = "two";
201 DB* db;
202 Options options;
203 options.create_if_missing = true;
204 const std::string dbname = test::PerThreadDBPath("transaction_testdb");
205 EXPECT_OK(DestroyDB(dbname, options));
206 ASSERT_OK(DB::Open(options, dbname, &db));
207 ColumnFamilyHandle* cf_handle = nullptr;
208 ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
209 WriteOptions write_options;
210 size_t batch_cnt = 1;
211 size_t save_points = 0;
212 std::vector<size_t> batch_cnt_at;
213 WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
214 0);
215 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
216 batch_cnt_at.push_back(batch_cnt);
217 batch.SetSavePoint();
218 save_points++;
219 ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
220 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
221 batch_cnt_at.push_back(batch_cnt);
222 batch.SetSavePoint();
223 save_points++;
224 ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
225 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
226 // duplicate the keys
227 batch_cnt_at.push_back(batch_cnt);
228 batch.SetSavePoint();
229 save_points++;
230 ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
231 batch_cnt++;
232 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
233 // duplicate the 2nd key. It should not be counted duplicate since a
234 // sub-patch is cut after the last duplicate.
235 batch_cnt_at.push_back(batch_cnt);
236 batch.SetSavePoint();
237 save_points++;
238 ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
239 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
240 // duplicate the keys but in a different cf. It should not be counted as
241 // duplicate keys
242 batch_cnt_at.push_back(batch_cnt);
243 batch.SetSavePoint();
244 save_points++;
245 ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
246 ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
247
248 // Test that the number of sub-batches matches what we count with
249 // SubBatchCounter
250 std::map<uint32_t, const Comparator*> comparators;
251 comparators[0] = db->DefaultColumnFamily()->GetComparator();
252 comparators[cf_handle->GetID()] = cf_handle->GetComparator();
253 SubBatchCounter counter(comparators);
254 ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
255 ASSERT_EQ(batch_cnt, counter.BatchCount());
256
257 // Test that RollbackToSavePoint will properly resets the number of
258 // sub-batches
259 for (size_t i = save_points; i > 0; i--) {
260 ASSERT_OK(batch.RollbackToSavePoint());
261 ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
262 }
263
264 // Test the count is right with random batches
265 {
266 const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms
267 Random rnd(1131);
268 std::string keys[TOTAL_KEYS];
269 for (size_t k = 0; k < TOTAL_KEYS; k++) {
270 int len = static_cast<int>(rnd.Uniform(50));
271 keys[k] = test::RandomKey(&rnd, len);
272 }
273 for (size_t i = 0; i < 1000; i++) { // 1000 random batches
274 WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
275 0, true, 0);
276 for (size_t k = 0; k < 10; k++) { // 10 key per batch
277 size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
278 Slice key = Slice(keys[ki]);
279 std::string tmp = rnd.RandomString(16);
280 Slice value = Slice(tmp);
281 ASSERT_OK(rndbatch.Put(key, value));
282 }
283 SubBatchCounter batch_counter(comparators);
284 ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
285 ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
286 }
287 }
288
289 delete cf_handle;
290 delete db;
291 }
292
293 TEST(CommitEntry64b, BasicTest) {
294 const size_t INDEX_BITS = static_cast<size_t>(21);
295 const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
296 const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
297
298 // zero-initialized CommitEntry64b should indicate an empty entry
299 CommitEntry64b empty_entry64b;
300 uint64_t empty_index = 11ul;
301 CommitEntry empty_entry;
302 bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
303 ASSERT_FALSE(ok);
304
305 // the zero entry is reserved for un-initialized entries
306 const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
307 // Samples over the numbers that are covered by that many index bits
308 std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
309 // Samples over the numbers that are covered by that many commit bits
310 std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
311 // Iterate over prepare numbers that have i) cover all bits of a sequence
312 // number, and ii) include some bits that fall into the range of index or
313 // commit bits
314 for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
315 for (uint64_t i : is) {
316 for (uint64_t d : ds) {
317 uint64_t p = base + i + d;
318 for (uint64_t c : {p, p + d / 2, p + d}) {
319 uint64_t index = p % INDEX_SIZE;
320 CommitEntry before(p, c), after;
321 CommitEntry64b entry64b(before, FORMAT);
322 ok = entry64b.Parse(index, &after, FORMAT);
323 ASSERT_TRUE(ok);
324 if (!(before == after)) {
325 printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
326 " c %" PRIu64 " index %" PRIu64 "\n",
327 base, i, d, p, c, index);
328 }
329 ASSERT_EQ(before, after);
330 }
331 }
332 }
333 }
334 }
335
336 class WritePreparedTxnDBMock : public WritePreparedTxnDB {
337 public:
338 WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
339 : WritePreparedTxnDB(db_impl, opt) {}
340 void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
341 snapshots_ = snapshots;
342 }
343 void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
344
345 protected:
346 const std::vector<SequenceNumber> GetSnapshotListFromDB(
347 SequenceNumber /* unused */) override {
348 return snapshots_;
349 }
350
351 private:
352 std::vector<SequenceNumber> snapshots_;
353 };
354
355 class WritePreparedTransactionTestBase : public TransactionTestBase {
356 public:
357 WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
358 TxnDBWritePolicy write_policy,
359 WriteOrdering write_ordering)
360 : TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
361 write_ordering){};
362
363 protected:
364 void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
365 size_t commit_cache_bits) {
366 txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
367 txn_db_options.wp_commit_cache_bits = commit_cache_bits;
368 }
369 void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
370 txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
371 }
372 // If expect_update is set, check if it actually updated old_commit_map_. If
373 // it did not and yet suggested not to check the next snapshot, do the
374 // opposite to check if it was not a bad suggestion.
375 void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
376 uint64_t snapshot,
377 uint64_t next_snapshot,
378 bool expect_update) {
379 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
380 // reset old_commit_map_empty_ so that its value indicate whether
381 // old_commit_map_ was updated
382 wp_db->old_commit_map_empty_ = true;
383 bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
384 snapshot < next_snapshot);
385 if (expect_update == wp_db->old_commit_map_empty_) {
386 printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
387 " next: %" PRIu64 "\n",
388 prepare, commit, snapshot, next_snapshot);
389 }
390 EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
391 if (!check_next && wp_db->old_commit_map_empty_) {
392 // do the opposite to make sure it was not a bad suggestion
393 const bool dont_care_bool = true;
394 wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
395 dont_care_bool);
396 if (!wp_db->old_commit_map_empty_) {
397 printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
398 " next: %" PRIu64 "\n",
399 prepare, commit, snapshot, next_snapshot);
400 }
401 EXPECT_TRUE(wp_db->old_commit_map_empty_);
402 }
403 }
404
405 // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
406 // miss a snapshot because of a concurrent update by UpdateSnapshots that is
407 // writing new_snapshots. Both threads are broken at two points. The sync
408 // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
409 // entry is expected to be vital for one of the snapshots that is common
410 // between the old and new list of snapshots.
411 void SnapshotConcurrentAccessTestInternal(
412 WritePreparedTxnDB* wp_db,
413 const std::vector<SequenceNumber>& old_snapshots,
414 const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
415 SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
416 // First reset the snapshot list
417 const std::vector<SequenceNumber> empty_snapshots;
418 wp_db->old_commit_map_empty_ = true;
419 wp_db->UpdateSnapshots(empty_snapshots, ++version);
420 // Then initialize it with the old_snapshots
421 wp_db->UpdateSnapshots(old_snapshots, ++version);
422
423 // Starting from the first thread, cut each thread at two points
424 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
425 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
426 "WritePreparedTxnDB::UpdateSnapshots:s:start"},
427 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
428 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
429 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
430 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
431 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
432 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
433 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
434 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
435 });
436 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
437 {
438 ASSERT_TRUE(wp_db->old_commit_map_empty_);
439 ROCKSDB_NAMESPACE::port::Thread t1(
440 [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
441 wp_db->CheckAgainstSnapshots(entry);
442 t1.join();
443 ASSERT_FALSE(wp_db->old_commit_map_empty_);
444 }
445 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
446
447 wp_db->old_commit_map_empty_ = true;
448 wp_db->UpdateSnapshots(empty_snapshots, ++version);
449 wp_db->UpdateSnapshots(old_snapshots, ++version);
450 // Starting from the second thread, cut each thread at two points
451 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
452 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
453 "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
454 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
455 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
456 {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
457 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
458 {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
459 "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
460 {"WritePreparedTxnDB::UpdateSnapshots:p:end",
461 "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
462 });
463 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
464 {
465 ASSERT_TRUE(wp_db->old_commit_map_empty_);
466 ROCKSDB_NAMESPACE::port::Thread t1(
467 [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
468 wp_db->CheckAgainstSnapshots(entry);
469 t1.join();
470 ASSERT_FALSE(wp_db->old_commit_map_empty_);
471 }
472 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
473 }
474
475 // Verify value of keys.
476 void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
477 const Snapshot* snapshot = nullptr) {
478 std::string value;
479 ReadOptions read_options;
480 read_options.snapshot = snapshot;
481 for (auto& kv : data) {
482 auto s = db->Get(read_options, kv.first, &value);
483 ASSERT_TRUE(s.ok() || s.IsNotFound());
484 if (s.ok()) {
485 if (kv.second != value) {
486 printf("key = %s\n", kv.first.c_str());
487 }
488 ASSERT_EQ(kv.second, value);
489 } else {
490 ASSERT_EQ(kv.second, "NOT_FOUND");
491 }
492
493 // Try with MultiGet API too
494 std::vector<std::string> values;
495 auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
496 {kv.first}, &values);
497 ASSERT_EQ(1, values.size());
498 ASSERT_EQ(1, s_vec.size());
499 s = s_vec[0];
500 ASSERT_TRUE(s.ok() || s.IsNotFound());
501 if (s.ok()) {
502 ASSERT_TRUE(kv.second == values[0]);
503 } else {
504 ASSERT_EQ(kv.second, "NOT_FOUND");
505 }
506 }
507 }
508
509 // Verify all versions of keys.
510 void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
511 std::vector<KeyVersion> versions;
512 const size_t kMaxKeys = 100000;
513 ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
514 expected_versions.back().user_key, kMaxKeys,
515 &versions));
516 ASSERT_EQ(expected_versions.size(), versions.size());
517 for (size_t i = 0; i < versions.size(); i++) {
518 ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
519 ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
520 ASSERT_EQ(expected_versions[i].type, versions[i].type);
521 if (versions[i].type != kTypeDeletion &&
522 versions[i].type != kTypeSingleDeletion) {
523 ASSERT_EQ(expected_versions[i].value, versions[i].value);
524 }
525 // Range delete not supported.
526 ASSERT_NE(expected_versions[i].type, kTypeRangeDeletion);
527 }
528 }
529 };
530
531 class WritePreparedTransactionTest
532 : public WritePreparedTransactionTestBase,
533 virtual public ::testing::WithParamInterface<
534 std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> {
535 public:
536 WritePreparedTransactionTest()
537 : WritePreparedTransactionTestBase(
538 std::get<0>(GetParam()), std::get<1>(GetParam()),
539 std::get<2>(GetParam()), std::get<3>(GetParam())){};
540 };
541
542 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
543 class SnapshotConcurrentAccessTest
544 : public WritePreparedTransactionTestBase,
545 virtual public ::testing::WithParamInterface<std::tuple<
546 bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
547 public:
548 SnapshotConcurrentAccessTest()
549 : WritePreparedTransactionTestBase(
550 std::get<0>(GetParam()), std::get<1>(GetParam()),
551 std::get<2>(GetParam()), std::get<3>(GetParam())),
552 split_id_(std::get<4>(GetParam())),
553 split_cnt_(std::get<5>(GetParam())){};
554
555 protected:
556 // A test is split into split_cnt_ tests, each identified with split_id_ where
557 // 0 <= split_id_ < split_cnt_
558 size_t split_id_;
559 size_t split_cnt_;
560 };
561 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
562
563 class SeqAdvanceConcurrentTest
564 : public WritePreparedTransactionTestBase,
565 virtual public ::testing::WithParamInterface<std::tuple<
566 bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
567 public:
568 SeqAdvanceConcurrentTest()
569 : WritePreparedTransactionTestBase(
570 std::get<0>(GetParam()), std::get<1>(GetParam()),
571 std::get<2>(GetParam()), std::get<3>(GetParam())),
572 split_id_(std::get<4>(GetParam())),
573 split_cnt_(std::get<5>(GetParam())) {
574 special_env.skip_fsync_ = true;
575 };
576
577 protected:
578 // A test is split into split_cnt_ tests, each identified with split_id_ where
579 // 0 <= split_id_ < split_cnt_
580 size_t split_id_;
581 size_t split_cnt_;
582 };
583
584 INSTANTIATE_TEST_CASE_P(
585 WritePreparedTransaction, WritePreparedTransactionTest,
586 ::testing::Values(
587 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
588 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
589 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)));
590
591 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
592 INSTANTIATE_TEST_CASE_P(
593 TwoWriteQueues, SnapshotConcurrentAccessTest,
594 ::testing::Values(
595 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
596 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
597 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
598 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
599 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
600 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
601 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
602 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
603 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
604 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
605 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
606 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
607 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
608 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
609 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
610 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
611 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
612 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
613 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
614 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),
615
616 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
617 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
618 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
619 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
620 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
621 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
622 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
623 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
624 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
625 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
626 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
627 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
628 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
629 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
630 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
631 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
632 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
633 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
634 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
635 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)));
636
637 INSTANTIATE_TEST_CASE_P(
638 OneWriteQueue, SnapshotConcurrentAccessTest,
639 ::testing::Values(
640 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
641 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
642 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
643 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
644 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
645 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
646 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
647 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
648 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
649 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
650 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
651 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
652 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
653 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
654 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
655 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
656 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
657 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
658 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
659 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20)));
660
661 INSTANTIATE_TEST_CASE_P(
662 TwoWriteQueues, SeqAdvanceConcurrentTest,
663 ::testing::Values(
664 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
665 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
666 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
667 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
668 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
669 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
670 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
671 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
672 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
673 std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
674 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
675 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
676 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
677 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
678 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
679 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
680 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
681 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
682 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
683 std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)));
684
685 INSTANTIATE_TEST_CASE_P(
686 OneWriteQueue, SeqAdvanceConcurrentTest,
687 ::testing::Values(
688 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
689 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
690 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
691 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
692 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
693 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
694 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
695 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
696 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
697 std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)));
698 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
699
700 TEST_P(WritePreparedTransactionTest, CommitMap) {
701 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
702 ASSERT_NE(wp_db, nullptr);
703 ASSERT_NE(wp_db->db_impl_, nullptr);
704 size_t size = wp_db->COMMIT_CACHE_SIZE;
705 CommitEntry c = {5, 12}, e;
706 bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
707 ASSERT_FALSE(evicted);
708
709 // Should be able to read the same value
710 CommitEntry64b dont_care;
711 bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
712 ASSERT_TRUE(found);
713 ASSERT_EQ(c, e);
714 // Should be able to distinguish between overlapping entries
715 found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
716 ASSERT_TRUE(found);
717 ASSERT_NE(c.prep_seq + size, e.prep_seq);
718 // Should be able to detect non-existent entry
719 found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
720 ASSERT_FALSE(found);
721
722 // Reject an invalid exchange
723 CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
724 CommitEntry64b e2_64b(e2, wp_db->FORMAT);
725 bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
726 ASSERT_FALSE(exchanged);
727 // check whether it did actually reject that
728 found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
729 ASSERT_TRUE(found);
730 ASSERT_EQ(c, e);
731
732 // Accept a valid exchange
733 CommitEntry64b c_64b(c, wp_db->FORMAT);
734 CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
735 exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
736 ASSERT_TRUE(exchanged);
737 // check whether it did actually accepted that
738 found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
739 ASSERT_TRUE(found);
740 ASSERT_EQ(e3, e);
741
742 // Rewrite an entry
743 CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
744 evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
745 ASSERT_TRUE(evicted);
746 ASSERT_EQ(e3, e);
747 found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
748 ASSERT_TRUE(found);
749 ASSERT_EQ(e4, e);
750 }
751
752 TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
753 // If prepare <= snapshot < commit we should keep the entry around since its
754 // nonexistence could be interpreted as committed in the snapshot while it is
755 // not true. We keep such entries around by adding them to the
756 // old_commit_map_.
757 uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
758 p = 10l, c = 15l, s = 20l, ns = 21l;
759 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
760 // If we do not expect the old commit map to be updated, try also with a next
761 // snapshot that is expected to update the old commit map. This would test
762 // that MaybeUpdateOldCommitMap would not prevent us from checking the next
763 // snapshot that must be checked.
764 p = 10l, c = 15l, s = 20l, ns = 11l;
765 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
766
767 p = 10l, c = 20l, s = 20l, ns = 19l;
768 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
769 p = 10l, c = 20l, s = 20l, ns = 21l;
770 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
771
772 p = 20l, c = 20l, s = 20l, ns = 21l;
773 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
774 p = 20l, c = 20l, s = 20l, ns = 19l;
775 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
776
777 p = 10l, c = 25l, s = 20l, ns = 21l;
778 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
779
780 p = 20l, c = 25l, s = 20l, ns = 21l;
781 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);
782
783 p = 21l, c = 25l, s = 20l, ns = 22l;
784 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
785 p = 21l, c = 25l, s = 20l, ns = 19l;
786 MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
787 }
788
789 // Trigger the condition where some old memtables are skipped when doing
790 // TransactionUtil::CheckKey(), and make sure the result is still correct.
791 TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
792 const int kAttemptHistoryMemtable = 0;
793 const int kAttemptImmMemTable = 1;
794 for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
795 attempt++) {
796 options.max_write_buffer_number_to_maintain = 3;
797 ASSERT_OK(ReOpen());
798
799 WriteOptions write_options;
800 ReadOptions read_options;
801 TransactionOptions txn_options;
802 txn_options.set_snapshot = true;
803 string value;
804
805 ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
806 ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
807
808 Transaction* txn = db->BeginTransaction(write_options, txn_options);
809 ASSERT_TRUE(txn != nullptr);
810 ASSERT_OK(txn->SetName("txn"));
811
812 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
813 ASSERT_TRUE(txn2 != nullptr);
814 ASSERT_OK(txn2->SetName("txn2"));
815
816 // This transaction is created to cause potential conflict.
817 Transaction* txn_x = db->BeginTransaction(write_options);
818 ASSERT_OK(txn_x->SetName("txn_x"));
819 ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
820 ASSERT_OK(txn_x->Prepare());
821
822 // Create snapshots after the prepare, but there should still
823 // be a conflict when trying to read "foo".
824
825 if (attempt == kAttemptImmMemTable) {
826 // For the second attempt, hold flush from beginning. The memtable
827 // will be switched to immutable after calling TEST_SwitchMemtable()
828 // while CheckKey() is called.
829 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
830 {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
831 "FlushJob::Start"}});
832 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
833 }
834
835 // force a memtable flush. The memtable should still be kept
836 FlushOptions flush_ops;
837 if (attempt == kAttemptHistoryMemtable) {
838 ASSERT_OK(db->Flush(flush_ops));
839 } else {
840 ASSERT_EQ(attempt, kAttemptImmMemTable);
841 DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
842 ASSERT_OK(db_impl->TEST_SwitchMemtable());
843 }
844 uint64_t num_imm_mems;
845 ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
846 &num_imm_mems));
847 if (attempt == kAttemptHistoryMemtable) {
848 ASSERT_EQ(0, num_imm_mems);
849 } else {
850 ASSERT_EQ(attempt, kAttemptImmMemTable);
851 ASSERT_EQ(1, num_imm_mems);
852 }
853
854 // Put something in active memtable
855 ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
856
857 // Create txn3 after flushing, but this transaction also needs to
858 // check all memtables because of they contains uncommitted data.
859 Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
860 ASSERT_TRUE(txn3 != nullptr);
861 ASSERT_OK(txn3->SetName("txn3"));
862
863 // Commit the pending write
864 ASSERT_OK(txn_x->Commit());
865
866 // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will
867 // pass. In all cases, both memtables are queried.
868 SetPerfLevel(PerfLevel::kEnableCount);
869 get_perf_context()->Reset();
870 ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
871 // We should have checked two memtables, active and either immutable
872 // or history memtable, depending on the test case.
873 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
874
875 get_perf_context()->Reset();
876 ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
877 // We should have checked two memtables, active and either immutable
878 // or history memtable, depending on the test case.
879 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
880
881 get_perf_context()->Reset();
882 ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
883 ASSERT_EQ(value, "bar");
884 // We should have checked two memtables, and since there is no
885 // conflict, another Get() will be made and fetch the data from
886 // DB. If it is in immutable memtable, two extra memtable reads
887 // will be issued. If it is not (in history), only one will
888 // be made, which is to the active memtable.
889 if (attempt == kAttemptHistoryMemtable) {
890 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
891 } else {
892 ASSERT_EQ(attempt, kAttemptImmMemTable);
893 ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
894 }
895
896 Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
897 ASSERT_TRUE(txn4 != nullptr);
898 ASSERT_OK(txn4->SetName("txn4"));
899 get_perf_context()->Reset();
900 ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
901 if (attempt == kAttemptHistoryMemtable) {
902 // Active memtable will be checked in snapshot validation and when
903 // getting the value.
904 ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
905 } else {
906 // Only active memtable will be checked in snapshot validation but
907 // both of active and immutable snapshot will be queried when
908 // getting the value.
909 ASSERT_EQ(attempt, kAttemptImmMemTable);
910 ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
911 }
912
913 ASSERT_OK(txn2->Commit());
914 ASSERT_OK(txn4->Commit());
915
916 TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
917 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
918
919 SetPerfLevel(PerfLevel::kDisable);
920
921 delete txn;
922 delete txn2;
923 delete txn3;
924 delete txn4;
925 delete txn_x;
926 }
927 }
928
929 // Reproduce the bug with two snapshots with the same seuqence number and test
930 // that the release of the first snapshot will not affect the reads by the other
931 // snapshot
932 TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
933 TransactionOptions txn_options;
934 Status s;
935
936 // Insert initial value
937 ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));
938
939 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
940 Transaction* txn =
941 wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
942 ASSERT_OK(txn->SetName("txn"));
943 ASSERT_OK(txn->Put("key", "value2"));
944 ASSERT_OK(txn->Prepare());
945 // Three snapshots with the same seq number
946 const Snapshot* snapshot0 = wp_db->GetSnapshot();
947 const Snapshot* snapshot1 = wp_db->GetSnapshot();
948 const Snapshot* snapshot2 = wp_db->GetSnapshot();
949 ASSERT_OK(txn->Commit());
950 SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
951 SequenceNumber overlap_seq = txn->GetId() + cache_size;
952 delete txn;
953
954 // 4th snapshot with a larger seq
955 const Snapshot* snapshot3 = wp_db->GetSnapshot();
956 // Cause an eviction to advance max evicted seq number
957 // This also fetches the 4 snapshots from db since their seq is lower than the
958 // new max
959 wp_db->AddCommitted(overlap_seq, overlap_seq);
960
961 ReadOptions ropt;
962 // It should see the value before commit
963 ropt.snapshot = snapshot2;
964 PinnableSlice pinnable_val;
965 s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
966 ASSERT_OK(s);
967 ASSERT_TRUE(pinnable_val == "value1");
968 pinnable_val.Reset();
969
970 wp_db->ReleaseSnapshot(snapshot1);
971
972 // It should still see the value before commit
973 s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
974 ASSERT_OK(s);
975 ASSERT_TRUE(pinnable_val == "value1");
976 pinnable_val.Reset();
977
978 // Cause an eviction to advance max evicted seq number and trigger updating
979 // the snapshot list
980 overlap_seq += cache_size;
981 wp_db->AddCommitted(overlap_seq, overlap_seq);
982
983 // It should still see the value before commit
984 s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
985 ASSERT_OK(s);
986 ASSERT_TRUE(pinnable_val == "value1");
987 pinnable_val.Reset();
988
989 wp_db->ReleaseSnapshot(snapshot0);
990 wp_db->ReleaseSnapshot(snapshot2);
991 wp_db->ReleaseSnapshot(snapshot3);
992 }
993
// Returns the number of distinct sequence numbers contained in `vec`.
// Duplicates are counted once; an empty vector yields 0. The input is taken
// by const reference so that callers' vectors are not copied.
size_t UniqueCnt(const std::vector<SequenceNumber>& vec) {
  // Building a set from the range drops the duplicates.
  return std::set<SequenceNumber>(vec.begin(), vec.end()).size();
}
1001 // Test that the entries in old_commit_map_ get garbage collected properly
1002 TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
1003 const size_t snapshot_cache_bits = 0;
1004 const size_t commit_cache_bits = 0;
1005 DBImpl* mock_db = new DBImpl(options, dbname);
1006 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1007 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1008 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1009
1010 SequenceNumber seq = 0;
1011 // Take the first snapshot that overlaps with two txn
1012 auto prep_seq = ++seq;
1013 wp_db->AddPrepared(prep_seq);
1014 auto prep_seq2 = ++seq;
1015 wp_db->AddPrepared(prep_seq2);
1016 auto snap_seq1 = seq;
1017 wp_db->TakeSnapshot(snap_seq1);
1018 auto commit_seq = ++seq;
1019 wp_db->AddCommitted(prep_seq, commit_seq);
1020 wp_db->RemovePrepared(prep_seq);
1021 auto commit_seq2 = ++seq;
1022 wp_db->AddCommitted(prep_seq2, commit_seq2);
1023 wp_db->RemovePrepared(prep_seq2);
1024 // Take the 2nd and 3rd snapshot that overlap with the same txn
1025 prep_seq = ++seq;
1026 wp_db->AddPrepared(prep_seq);
1027 auto snap_seq2 = seq;
1028 wp_db->TakeSnapshot(snap_seq2);
1029 seq++;
1030 auto snap_seq3 = seq;
1031 wp_db->TakeSnapshot(snap_seq3);
1032 seq++;
1033 commit_seq = ++seq;
1034 wp_db->AddCommitted(prep_seq, commit_seq);
1035 wp_db->RemovePrepared(prep_seq);
1036 // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
1037 // only item in the commit_cache_ via another commit.
1038 prep_seq = ++seq;
1039 wp_db->AddPrepared(prep_seq);
1040 commit_seq = ++seq;
1041 wp_db->AddCommitted(prep_seq, commit_seq);
1042 wp_db->RemovePrepared(prep_seq);
1043
1044 // Verify that the evicted commit entries for all snapshots are in the
1045 // old_commit_map_
1046 {
1047 ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
1048 ReadLock rl(&wp_db->old_commit_map_mutex_);
1049 ASSERT_EQ(3, wp_db->old_commit_map_.size());
1050 ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
1051 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
1052 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
1053 }
1054
1055 // Verify that the 2nd snapshot is cleaned up after the release
1056 wp_db->ReleaseSnapshotInternal(snap_seq2);
1057 {
1058 ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
1059 ReadLock rl(&wp_db->old_commit_map_mutex_);
1060 ASSERT_EQ(2, wp_db->old_commit_map_.size());
1061 ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
1062 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
1063 }
1064
1065 // Verify that the 1st snapshot is cleaned up after the release
1066 wp_db->ReleaseSnapshotInternal(snap_seq1);
1067 {
1068 ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
1069 ReadLock rl(&wp_db->old_commit_map_mutex_);
1070 ASSERT_EQ(1, wp_db->old_commit_map_.size());
1071 ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
1072 }
1073
1074 // Verify that the 3rd snapshot is cleaned up after the release
1075 wp_db->ReleaseSnapshotInternal(snap_seq3);
1076 {
1077 ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
1078 ReadLock rl(&wp_db->old_commit_map_mutex_);
1079 ASSERT_EQ(0, wp_db->old_commit_map_.size());
1080 }
1081 }
1082
1083 TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
1084 std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
1085 600l, 700l, 800l, 900l};
1086 const size_t snapshot_cache_bits = 2;
1087 const uint64_t cache_size = 1ul << snapshot_cache_bits;
1088 // Safety check to express the intended size in the test. Can be adjusted if
1089 // the snapshots lists changed.
1090 ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 1, snapshots.size());
1091 DBImpl* mock_db = new DBImpl(options, dbname);
1092 UpdateTransactionDBOptions(snapshot_cache_bits);
1093 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1094 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1095 SequenceNumber version = 1000l;
1096 ASSERT_EQ(0, wp_db->snapshots_total_);
1097 wp_db->UpdateSnapshots(snapshots, version);
1098 ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
1099 // seq numbers are chosen so that we have two of them between each two
1100 // snapshots. If the diff of two consecutive seq is more than 5, there is a
1101 // snapshot between them.
1102 std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l,
1103 355l, 450l, 455l, 550l, 555l, 650l, 655l,
1104 750l, 755l, 850l, 855l, 950l, 955l};
1105 ASSERT_GT(seqs.size(), 1);
1106 for (size_t i = 0; i + 1 < seqs.size(); i++) {
1107 wp_db->old_commit_map_empty_ = true; // reset
1108 CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
1109 wp_db->CheckAgainstSnapshots(commit_entry);
1110 // Expect update if there is snapshot in between the prepare and commit
1111 bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
1112 commit_entry.commit_seq >= snapshots.front() &&
1113 commit_entry.prep_seq <= snapshots.back();
1114 ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
1115 }
1116
1117 // Test that search will include multiple snapshot from snapshot cache
1118 {
1119 // exclude first and last item in the cache
1120 CommitEntry commit_entry = {snapshots.front() + 1,
1121 snapshots[cache_size - 1] - 1};
1122 wp_db->old_commit_map_empty_ = true; // reset
1123 wp_db->old_commit_map_.clear();
1124 wp_db->CheckAgainstSnapshots(commit_entry);
1125 ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
1126 }
1127
1128 // Test that search will include multiple snapshot from old snapshots
1129 {
1130 // include two in the middle
1131 CommitEntry commit_entry = {snapshots[cache_size] + 1,
1132 snapshots[cache_size + 2] + 1};
1133 wp_db->old_commit_map_empty_ = true; // reset
1134 wp_db->old_commit_map_.clear();
1135 wp_db->CheckAgainstSnapshots(commit_entry);
1136 ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
1137 }
1138
1139 // Test that search will include both snapshot cache and old snapshots
1140 // Case 1: includes all in snapshot cache
1141 {
1142 CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
1143 wp_db->old_commit_map_empty_ = true; // reset
1144 wp_db->old_commit_map_.clear();
1145 wp_db->CheckAgainstSnapshots(commit_entry);
1146 ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
1147 }
1148
1149 // Case 2: includes all snapshot caches except the smallest
1150 {
1151 CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
1152 wp_db->old_commit_map_empty_ = true; // reset
1153 wp_db->old_commit_map_.clear();
1154 wp_db->CheckAgainstSnapshots(commit_entry);
1155 ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
1156 }
1157
1158 // Case 3: includes only the largest of snapshot cache
1159 {
1160 CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
1161 snapshots.back() + 1};
1162 wp_db->old_commit_map_empty_ = true; // reset
1163 wp_db->old_commit_map_.clear();
1164 wp_db->CheckAgainstSnapshots(commit_entry);
1165 ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
1166 }
1167 }
1168
1169 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1170 // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
1171 // parallel with UpdateSnapshots.
1172 TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
1173 // We have a sync point in the method under test after checking each snapshot.
1174 // If you increase the max number of snapshots in this test, more sync points
1175 // in the methods must also be added.
1176 const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
1177 60l, 70l, 80l, 90l, 100l};
1178 const size_t snapshot_cache_bits = 2;
1179 // Safety check to express the intended size in the test. Can be adjusted if
1180 // the snapshots lists changed.
1181 ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 2, snapshots.size());
1182 SequenceNumber version = 1000l;
1183 // Choose the cache size so that the new snapshot list could replace all the
1184 // existing items in the cache and also have some overflow.
1185 DBImpl* mock_db = new DBImpl(options, dbname);
1186 UpdateTransactionDBOptions(snapshot_cache_bits);
1187 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1188 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1189 const size_t extra = 2;
1190 size_t loop_id = 0;
1191 // Add up to extra items that do not fit into the cache
1192 for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
1193 old_size++) {
1194 const std::vector<SequenceNumber> old_snapshots(
1195 snapshots.begin(), snapshots.begin() + old_size);
1196
1197 // Each member of old snapshot might or might not appear in the new list. We
1198 // create a common_snapshots for each combination.
1199 size_t new_comb_cnt = size_t(1) << old_size;
1200 for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
1201 if (loop_id % split_cnt_ != split_id_) continue;
1202 printf("."); // To signal progress
1203 fflush(stdout);
1204 std::vector<SequenceNumber> common_snapshots;
1205 for (size_t i = 0; i < old_snapshots.size(); i++) {
1206 if (IsInCombination(i, new_comb)) {
1207 common_snapshots.push_back(old_snapshots[i]);
1208 }
1209 }
1210 // And add some new snapshots to the common list
1211 for (size_t added_snapshots = 0;
1212 added_snapshots <= snapshots.size() - old_snapshots.size();
1213 added_snapshots++) {
1214 std::vector<SequenceNumber> new_snapshots = common_snapshots;
1215 for (size_t i = 0; i < added_snapshots; i++) {
1216 new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
1217 }
1218 for (auto it = common_snapshots.begin(); it != common_snapshots.end();
1219 ++it) {
1220 auto snapshot = *it;
1221 // Create a commit entry that is around the snapshot and thus should
1222 // be not be discarded
1223 CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
1224 snapshot + 1};
1225 // The critical part is when iterating the snapshot cache. Afterwards,
1226 // we are operating under the lock
1227 size_t a_range =
1228 std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
1229 size_t b_range =
1230 std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
1231 // Break each thread at two points
1232 for (size_t a1 = 1; a1 <= a_range; a1++) {
1233 for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
1234 for (size_t b1 = 1; b1 <= b_range; b1++) {
1235 for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
1236 SnapshotConcurrentAccessTestInternal(
1237 wp_db.get(), old_snapshots, new_snapshots, entry, version,
1238 a1, a2, b1, b2);
1239 }
1240 }
1241 }
1242 }
1243 }
1244 }
1245 }
1246 }
1247 printf("\n");
1248 }
1249 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1250
1251 // This test clarifies the contract of AdvanceMaxEvictedSeq method
1252 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
1253 DBImpl* mock_db = new DBImpl(options, dbname);
1254 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
1255 new WritePreparedTxnDBMock(mock_db, txn_db_options));
1256
1257 // 1. Set the initial values for max, prepared, and snapshots
1258 SequenceNumber zero_max = 0l;
1259 // Set the initial list of prepared txns
1260 const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100,
1261 150, 200, 250};
1262 for (auto p : initial_prepared) {
1263 wp_db->AddPrepared(p);
1264 }
1265 // This updates the max value and also set old prepared
1266 SequenceNumber init_max = 100;
1267 wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
1268 const std::vector<SequenceNumber> initial_snapshots = {20, 40};
1269 wp_db->SetDBSnapshots(initial_snapshots);
1270 // This will update the internal cache of snapshots from the DB
1271 wp_db->UpdateSnapshots(initial_snapshots, init_max);
1272
1273 // 2. Invoke AdvanceMaxEvictedSeq
1274 const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
1275 wp_db->SetDBSnapshots(latest_snapshots);
1276 SequenceNumber new_max = 200;
1277 wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
1278
1279 // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
1280 // a. max should be updated to new_max
1281 ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
1282 // b. delayed prepared should contain every txn <= max and prepared should
1283 // only contain txns > max
1284 auto it = initial_prepared.begin();
1285 for (; it != initial_prepared.end() && *it <= new_max; ++it) {
1286 ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
1287 }
1288 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
1289 for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
1290 ++it, wp_db->prepared_txns_.pop()) {
1291 ASSERT_EQ(*it, wp_db->prepared_txns_.top());
1292 }
1293 ASSERT_TRUE(it == initial_prepared.end());
1294 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1295 // c. snapshots should contain everything below new_max
1296 auto sit = latest_snapshots.begin();
1297 for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
1298 i < wp_db->snapshots_total_;
1299 sit++, i++) {
1300 ASSERT_TRUE(i < wp_db->snapshots_total_);
1301 // This test is in small scale and the list of snapshots are assumed to be
1302 // within the cache size limit. This is just a safety check to double check
1303 // that assumption.
1304 ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
1305 ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
1306 }
1307 }
1308
1309 // A new snapshot should always be always larger than max_evicted_seq_
1310 // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
1311 TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
1312 WriteOptions woptions;
1313 TransactionOptions txn_options;
1314 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1315 Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
1316 ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
1317 ASSERT_OK(txn0->Commit());
1318 const SequenceNumber seq = txn0->GetId(); // is also prepare seq
1319 delete txn0;
1320 std::vector<Transaction*> txns;
1321 // Inc seq without committing anything
1322 for (int i = 0; i < 10; i++) {
1323 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1324 ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
1325 ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
1326 ASSERT_OK(txn->Prepare());
1327 txns.push_back(txn);
1328 }
1329
1330 // The new commit is seq + 10
1331 ASSERT_OK(db->Put(woptions, "key", "value"));
1332 auto snap = wp_db->GetSnapshot();
1333 const SequenceNumber last_seq = snap->GetSequenceNumber();
1334 wp_db->ReleaseSnapshot(snap);
1335 ASSERT_LT(seq, last_seq);
1336 // Otherwise our test is not effective
1337 ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);
1338
1339 // Evict seq out of commit cache
1340 const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
1341 // Check that the next write could make max go beyond last
1342 auto last_max = wp_db->max_evicted_seq_.load();
1343 wp_db->AddCommitted(overwrite_seq, overwrite_seq);
1344 // Check that eviction has advanced the max
1345 ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
1346 // Check that the new max has not advanced the last seq
1347 ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);
1348 for (auto txn : txns) {
1349 txn->Rollback();
1350 delete txn;
1351 }
1352 }
1353
1354 // A new snapshot should always be always larger than max_evicted_seq_
1355 // In very rare cases max could be below last published seq. Test that
1356 // taking snapshot will wait for max to catch up.
1357 TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
1358 const size_t snapshot_cache_bits = 7; // same as default
1359 const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
1360 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1361 ASSERT_OK(ReOpen());
1362 WriteOptions woptions;
1363 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1364
1365 const int writes = 50;
1366 const int batch_cnt = 4;
1367 ROCKSDB_NAMESPACE::port::Thread t1([&]() {
1368 for (int i = 0; i < writes; i++) {
1369 WriteBatch batch;
1370 // For duplicate keys cause 4 commit entries, each evicting an entry that
1371 // is not published yet, thus causing max evicted seq go higher than last
1372 // published.
1373 for (int b = 0; b < batch_cnt; b++) {
1374 ASSERT_OK(batch.Put("foo", "foo"));
1375 }
1376 ASSERT_OK(db->Write(woptions, &batch));
1377 }
1378 });
1379
1380 ROCKSDB_NAMESPACE::port::Thread t2([&]() {
1381 while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
1382 std::this_thread::yield();
1383 }
1384 for (int i = 0; i < 10; i++) {
1385 SequenceNumber max_lower_bound = wp_db->max_evicted_seq_;
1386 auto snap = db->GetSnapshot();
1387 if (snap->GetSequenceNumber() != 0) {
1388 // Value of max_evicted_seq_ when snapshot was taken in unknown. We thus
1389 // compare with the lower bound instead as an approximation.
1390 ASSERT_LT(max_lower_bound, snap->GetSequenceNumber());
1391 } // seq 0 is ok to be less than max since nothing is visible to it
1392 db->ReleaseSnapshot(snap);
1393 }
1394 });
1395
1396 t1.join();
1397 t2.join();
1398
1399 // Make sure that the test has worked and seq number has advanced as we
1400 // thought
1401 auto snap = db->GetSnapshot();
1402 ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
1403 db->ReleaseSnapshot(snap);
1404 }
1405
1406 // Test that reads without snapshots would not hit an undefined state
1407 TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
1408 const size_t snapshot_cache_bits = 7; // same as default
1409 const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
1410 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1411 ASSERT_OK(ReOpen());
1412 WriteOptions woptions;
1413 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1414
1415 const int writes = 50;
1416 ROCKSDB_NAMESPACE::port::Thread t1([&]() {
1417 for (int i = 0; i < writes; i++) {
1418 WriteBatch batch;
1419 ASSERT_OK(batch.Put("key", "foo"));
1420 ASSERT_OK(db->Write(woptions, &batch));
1421 }
1422 });
1423
1424 ROCKSDB_NAMESPACE::port::Thread t2([&]() {
1425 while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread
1426 std::this_thread::yield();
1427 }
1428 ReadOptions ropt;
1429 PinnableSlice pinnable_val;
1430 TransactionOptions txn_options;
1431 for (int i = 0; i < 10; i++) {
1432 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1433 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1434 pinnable_val.Reset();
1435 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1436 s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1437 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1438 pinnable_val.Reset();
1439 std::vector<std::string> values;
1440 auto s_vec =
1441 txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
1442 ASSERT_EQ(1, values.size());
1443 ASSERT_EQ(1, s_vec.size());
1444 s = s_vec[0];
1445 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1446 Slice key("key");
1447 txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s,
1448 true);
1449 ASSERT_TRUE(s.ok() || s.IsTryAgain());
1450 delete txn;
1451 }
1452 });
1453
1454 t1.join();
1455 t2.join();
1456
1457 // Make sure that the test has worked and seq number has advanced as we
1458 // thought
1459 auto snap = db->GetSnapshot();
1460 ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
1461 db->ReleaseSnapshot(snap);
1462 }
1463
1464 // Check that old_commit_map_ cleanup works correctly if the snapshot equals
1465 // max_evicted_seq_.
1466 TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
1467 const size_t snapshot_cache_bits = 7; // same as default
1468 const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
1469 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1470 ASSERT_OK(ReOpen());
1471 WriteOptions woptions;
1472 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1473 // Insert something to increase seq
1474 ASSERT_OK(db->Put(woptions, "key", "value"));
1475 auto snap = db->GetSnapshot();
1476 auto snap_seq = snap->GetSequenceNumber();
1477 // Another insert should trigger eviction + load snapshot from db
1478 ASSERT_OK(db->Put(woptions, "key", "value"));
1479 // This is the scenario that we check agaisnt
1480 ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_);
1481 // old_commit_map_ now has some data that needs gc
1482 ASSERT_EQ(1, wp_db->snapshots_total_);
1483 ASSERT_EQ(1, wp_db->old_commit_map_.size());
1484
1485 db->ReleaseSnapshot(snap);
1486
1487 // Another insert should trigger eviction + load snapshot from db
1488 ASSERT_OK(db->Put(woptions, "key", "value"));
1489
1490 // the snapshot and related metadata must be properly garbage collected
1491 ASSERT_EQ(0, wp_db->snapshots_total_);
1492 ASSERT_TRUE(wp_db->snapshots_all_.empty());
1493 ASSERT_EQ(0, wp_db->old_commit_map_.size());
1494 }
1495
1496 TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
1497 auto snap = db->GetSnapshot();
1498 auto seq1 = snap->GetSequenceNumber();
1499 db->ReleaseSnapshot(snap);
1500
1501 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1502 wp_db->AdvanceSeqByOne();
1503
1504 snap = db->GetSnapshot();
1505 auto seq2 = snap->GetSequenceNumber();
1506 db->ReleaseSnapshot(snap);
1507
1508 ASSERT_LT(seq1, seq2);
1509 }
1510
1511 // Test that the txn Initilize calls the overridden functions
1512 TEST_P(WritePreparedTransactionTest, TxnInitialize) {
1513 TransactionOptions txn_options;
1514 WriteOptions write_options;
1515 ASSERT_OK(db->Put(write_options, "key", "value"));
1516 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1517 ASSERT_OK(txn0->SetName("xid"));
1518 ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
1519 ASSERT_OK(txn0->Prepare());
1520
1521 // SetSnapshot is overridden to update min_uncommitted_
1522 txn_options.set_snapshot = true;
1523 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1524 auto snap = txn1->GetSnapshot();
1525 auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
1526 // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
1527 // udpated
1528 ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
1529
1530 ASSERT_OK(txn0->Rollback());
1531 ASSERT_OK(txn1->Rollback());
1532 delete txn0;
1533 delete txn1;
1534 }
1535
1536 // This tests that transactions with duplicate keys perform correctly after max
1537 // is advancing their prepared sequence numbers. This will not be the case if
1538 // for example the txn does not add the prepared seq for the second sub-batch to
1539 // the PreparedHeap structure.
1540 TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
1541 const size_t snapshot_cache_bits = 7; // same as default
1542 const size_t commit_cache_bits = 1; // disable commit cache
1543 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1544 ASSERT_OK(ReOpen());
1545
1546 ReadOptions ropt;
1547 PinnableSlice pinnable_val;
1548 WriteOptions write_options;
1549 TransactionOptions txn_options;
1550 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1551 ASSERT_OK(txn0->SetName("xid"));
1552 ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
1553 ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
1554 ASSERT_OK(txn0->Prepare());
1555
1556 ASSERT_OK(db->Put(write_options, "key2", "value"));
1557 // Will cause max advance due to disabled commit cache
1558 ASSERT_OK(db->Put(write_options, "key3", "value"));
1559
1560 auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1561 ASSERT_TRUE(s.IsNotFound());
1562 delete txn0;
1563
1564 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1565 ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
1566 wp_db->TEST_Crash();
1567 ASSERT_OK(ReOpenNoDelete());
1568 ASSERT_NE(db, nullptr);
1569 s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
1570 ASSERT_TRUE(s.IsNotFound());
1571
1572 txn0 = db->GetTransactionByName("xid");
1573 ASSERT_OK(txn0->Rollback());
1574 delete txn0;
1575 }
1576
1577 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1578 // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
1579 // delayed_prepared_, when is run concurrently with advancing max_evicted_seq,
1580 // which moves prepared txns from prepared_txns_ to delayed_prepared_.
1581 TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
1582 const size_t snapshot_cache_bits = 7; // same as default
1583 const size_t commit_cache_bits = 1; // disable commit cache
1584 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
1585 ASSERT_OK(ReOpen());
1586 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1587 ReadOptions ropt;
1588 PinnableSlice pinnable_val;
1589 WriteOptions write_options;
1590 TransactionOptions txn_options;
1591 std::vector<Transaction*> txns, committed_txns;
1592
1593 const int cnt = 100;
1594 for (int i = 0; i < cnt; i++) {
1595 Transaction* txn = db->BeginTransaction(write_options, txn_options);
1596 ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
1597 auto key = "key1" + std::to_string(i);
1598 auto value = "value1" + std::to_string(i);
1599 ASSERT_OK(txn->Put(Slice(key), Slice(value)));
1600 ASSERT_OK(txn->Prepare());
1601 txns.push_back(txn);
1602 }
1603
1604 port::Mutex mutex;
1605 Random rnd(1103);
1606 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
1607 for (int i = 0; i < cnt; i++) {
1608 uint32_t index = rnd.Uniform(cnt - i);
1609 Transaction* txn;
1610 {
1611 MutexLock l(&mutex);
1612 txn = txns[index];
1613 txns.erase(txns.begin() + index);
1614 }
1615 // Since commit cache is practically disabled, commit results in immediate
1616 // advance in max_evicted_seq_ and subsequently moving some prepared txns
1617 // to delayed_prepared_.
1618 ASSERT_OK(txn->Commit());
1619 committed_txns.push_back(txn);
1620 }
1621 });
1622 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
1623 while (1) {
1624 MutexLock l(&mutex);
1625 if (txns.empty()) {
1626 break;
1627 }
1628 auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
1629 ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
1630 }
1631 });
1632
1633 commit_thread.join();
1634 read_thread.join();
1635 for (auto txn : committed_txns) {
1636 delete txn;
1637 }
1638 }
1639 #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
1640
1641 TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
1642 // Given the sequential run of txns, with this timeout we should never see a
1643 // deadlock nor a timeout unless we have a key conflict, which should be
1644 // almost infeasible.
1645 txn_db_options.transaction_lock_timeout = 1000;
1646 txn_db_options.default_lock_timeout = 1000;
1647 ASSERT_OK(ReOpen());
1648 FlushOptions fopt;
1649
1650 // Number of different txn types we use in this test
1651 const size_t type_cnt = 5;
1652 // The size of the first write group
1653 // TODO(myabandeh): This should be increase for pre-release tests
1654 const size_t first_group_size = 2;
1655 // Total number of txns we run in each test
1656 // TODO(myabandeh): This should be increase for pre-release tests
1657 const size_t txn_cnt = first_group_size + 1;
1658
1659 size_t base[txn_cnt + 1] = {
1660 1,
1661 };
1662 for (size_t bi = 1; bi <= txn_cnt; bi++) {
1663 base[bi] = base[bi - 1] * type_cnt;
1664 }
1665 const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
1666 printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
1667 for (size_t n = 0; n < max_n; n++) {
1668 if (n > 0) {
1669 ASSERT_OK(ReOpen());
1670 }
1671
1672 if (n % split_cnt_ != split_id_) continue;
1673 if (n % 1000 == 0) {
1674 printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
1675 }
1676 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1677 auto seq = db_impl->TEST_GetLastVisibleSequence();
1678 with_empty_commits = 0;
1679 exp_seq = seq;
1680 // This is increased before writing the batch for commit
1681 commit_writes = 0;
1682 // This is increased before txn starts linking if it expects to do a commit
1683 // eventually
1684 expected_commits = 0;
1685 std::vector<port::Thread> threads;
1686
1687 linked.store(0, std::memory_order_release);
1688 std::atomic<bool> batch_formed(false);
1689 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1690 "WriteThread::EnterAsBatchGroupLeader:End",
1691 [&](void* /*arg*/) { batch_formed = true; });
1692 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1693 "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
1694 size_t orig_linked = linked.fetch_add(1, std::memory_order_acq_rel);
1695 if (orig_linked == 0) {
1696 // Wait until the others are linked too.
1697 while (linked.load(std::memory_order_acquire) < first_group_size) {
1698 }
1699 } else if (orig_linked == first_group_size) {
1700 // Make the 2nd batch of the rest of writes plus any followup
1701 // commits from the first batch
1702 while (linked.load(std::memory_order_acquire) <
1703 txn_cnt + commit_writes) {
1704 }
1705 }
1706 // Then we will have one or more batches consisting of follow-up
1707 // commits from the 2nd batch. There is a bit of non-determinism here
1708 // but it should be tolerable.
1709 });
1710
1711 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1712 for (size_t bi = 0; bi < txn_cnt; bi++) {
1713 // get the bi-th digit in number system based on type_cnt
1714 size_t d = (n % base[bi + 1]) / base[bi];
1715 switch (d) {
1716 case 0:
1717 threads.emplace_back(&TransactionTestBase::TestTxn0, this, bi);
1718 break;
1719 case 1:
1720 threads.emplace_back(&TransactionTestBase::TestTxn1, this, bi);
1721 break;
1722 case 2:
1723 threads.emplace_back(&TransactionTestBase::TestTxn2, this, bi);
1724 break;
1725 case 3:
1726 threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi);
1727 break;
1728 case 4:
1729 threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi);
1730 break;
1731 default:
1732 FAIL();
1733 }
1734 // wait to be linked
1735 while (linked.load(std::memory_order_acquire) <= bi) {
1736 }
1737 // after a queue of size first_group_size
1738 if (bi + 1 == first_group_size) {
1739 while (!batch_formed) {
1740 }
1741 // to make it more deterministic, wait until the commits are linked
1742 while (linked.load(std::memory_order_acquire) <=
1743 bi + expected_commits) {
1744 }
1745 }
1746 }
1747 for (auto& t : threads) {
1748 t.join();
1749 }
1750 if (options.two_write_queues) {
1751 // In this case none of the above scheduling tricks to deterministically
1752 // form merged batches works because the writes go to separate queues.
1753 // This would result in different write groups in each run of the test. We
1754 // still keep the test since although non-deterministic and hard to debug,
1755 // it is still useful to have.
1756 // TODO(myabandeh): Add a deterministic unit test for two_write_queues
1757 }
1758
1759 // Check if memtable inserts advanced seq number as expected
1760 seq = db_impl->TEST_GetLastVisibleSequence();
1761 ASSERT_EQ(exp_seq, seq);
1762
1763 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1764 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1765
1766 // Check if recovery preserves the last sequence number
1767 ASSERT_OK(db_impl->FlushWAL(true));
1768 ASSERT_OK(ReOpenNoDelete());
1769 ASSERT_NE(db, nullptr);
1770 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1771 seq = db_impl->TEST_GetLastVisibleSequence();
1772 ASSERT_LE(exp_seq, seq + with_empty_commits);
1773
1774 // Check if flush preserves the last sequence number
1775 ASSERT_OK(db_impl->Flush(fopt));
1776 seq = db_impl->GetLatestSequenceNumber();
1777 ASSERT_LE(exp_seq, seq + with_empty_commits);
1778
1779 // Check if recovery after flush preserves the last sequence number
1780 ASSERT_OK(db_impl->FlushWAL(true));
1781 ASSERT_OK(ReOpenNoDelete());
1782 ASSERT_NE(db, nullptr);
1783 db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1784 seq = db_impl->GetLatestSequenceNumber();
1785 ASSERT_LE(exp_seq, seq + with_empty_commits);
1786 }
1787 }
1788
1789 // Run a couple of different txns among them some uncommitted. Restart the db at
1790 // a couple points to check whether the list of uncommitted txns are recovered
1791 // properly.
1792 TEST_P(WritePreparedTransactionTest, BasicRecovery) {
1793 options.disable_auto_compactions = true;
1794 ASSERT_OK(ReOpen());
1795 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1796
1797 TestTxn0(0);
1798
1799 TransactionOptions txn_options;
1800 WriteOptions write_options;
1801 size_t index = 1000;
1802 Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
1803 auto istr0 = std::to_string(index);
1804 auto s = txn0->SetName("xid" + istr0);
1805 ASSERT_OK(s);
1806 s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
1807 ASSERT_OK(s);
1808 s = txn0->Prepare();
1809 ASSERT_OK(s);
1810 auto prep_seq_0 = txn0->GetId();
1811
1812 TestTxn1(0);
1813
1814 index++;
1815 Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
1816 auto istr1 = std::to_string(index);
1817 s = txn1->SetName("xid" + istr1);
1818 ASSERT_OK(s);
1819 s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
1820 ASSERT_OK(s);
1821 s = txn1->Prepare();
1822 ASSERT_OK(s);
1823 auto prep_seq_1 = txn1->GetId();
1824
1825 TestTxn2(0);
1826
1827 ReadOptions ropt;
1828 PinnableSlice pinnable_val;
1829 // Check the value is not committed before restart
1830 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1831 ASSERT_TRUE(s.IsNotFound());
1832 pinnable_val.Reset();
1833
1834 delete txn0;
1835 delete txn1;
1836 ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
1837 wp_db->TEST_Crash();
1838 ASSERT_OK(ReOpenNoDelete());
1839 ASSERT_NE(db, nullptr);
1840 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1841 // After recovery, all the uncommitted txns (0 and 1) should be inserted into
1842 // delayed_prepared_
1843 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1844 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1845 ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
1846 ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
1847 {
1848 ReadLock rl(&wp_db->prepared_mutex_);
1849 ASSERT_EQ(2, wp_db->delayed_prepared_.size());
1850 ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
1851 wp_db->delayed_prepared_.end());
1852 ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
1853 wp_db->delayed_prepared_.end());
1854 }
1855
1856 // Check the value is still not committed after restart
1857 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1858 ASSERT_TRUE(s.IsNotFound());
1859 pinnable_val.Reset();
1860
1861 TestTxn3(0);
1862
1863 // Test that a recovered txns will be properly marked committed for the next
1864 // recovery
1865 txn1 = db->GetTransactionByName("xid" + istr1);
1866 ASSERT_NE(txn1, nullptr);
1867 ASSERT_OK(txn1->Commit());
1868 delete txn1;
1869
1870 index++;
1871 Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
1872 auto istr2 = std::to_string(index);
1873 s = txn2->SetName("xid" + istr2);
1874 ASSERT_OK(s);
1875 s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
1876 ASSERT_OK(s);
1877 s = txn2->Prepare();
1878 ASSERT_OK(s);
1879 auto prep_seq_2 = txn2->GetId();
1880
1881 delete txn2;
1882 ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
1883 wp_db->TEST_Crash();
1884 ASSERT_OK(ReOpenNoDelete());
1885 ASSERT_NE(db, nullptr);
1886 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1887 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1888 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1889
1890 // 0 and 2 are prepared and 1 is committed
1891 {
1892 ReadLock rl(&wp_db->prepared_mutex_);
1893 ASSERT_EQ(2, wp_db->delayed_prepared_.size());
1894 const auto& end = wp_db->delayed_prepared_.end();
1895 ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
1896 ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
1897 ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
1898 }
1899 ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
1900 ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);
1901
1902 // Commit all the remaining txns
1903 txn0 = db->GetTransactionByName("xid" + istr0);
1904 ASSERT_NE(txn0, nullptr);
1905 ASSERT_OK(txn0->Commit());
1906 txn2 = db->GetTransactionByName("xid" + istr2);
1907 ASSERT_NE(txn2, nullptr);
1908 ASSERT_OK(txn2->Commit());
1909
1910 // Check the value is committed after commit
1911 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1912 ASSERT_TRUE(s.ok());
1913 ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
1914 pinnable_val.Reset();
1915
1916 delete txn0;
1917 delete txn2;
1918 ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
1919 ASSERT_OK(ReOpenNoDelete());
1920 ASSERT_NE(db, nullptr);
1921 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1922 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1923 ASSERT_TRUE(wp_db->delayed_prepared_empty_);
1924
1925 // Check the value is still committed after recovery
1926 s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
1927 ASSERT_TRUE(s.ok());
1928 ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
1929 pinnable_val.Reset();
1930 }
1931
1932 // After recovery the commit map is empty while the max is set. The code would
1933 // go through a different path which requires a separate test. Test that the
1934 // committed data before the restart is visible to all snapshots.
1935 TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
1936 for (bool end_with_prepare : {false, true}) {
1937 ASSERT_OK(ReOpen());
1938 WriteOptions woptions;
1939 ASSERT_OK(db->Put(woptions, "key", "value"));
1940 ASSERT_OK(db->Put(woptions, "key", "value"));
1941 ASSERT_OK(db->Put(woptions, "key", "value"));
1942 SequenceNumber prepare_seq = kMaxSequenceNumber;
1943 if (end_with_prepare) {
1944 TransactionOptions txn_options;
1945 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1946 ASSERT_OK(txn->SetName("xid0"));
1947 ASSERT_OK(txn->Prepare());
1948 prepare_seq = txn->GetId();
1949 delete txn;
1950 }
1951 dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
1952 auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
1953 ASSERT_OK(db_impl->FlushWAL(true));
1954 ASSERT_OK(ReOpenNoDelete());
1955 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1956 ASSERT_NE(wp_db, nullptr);
1957 ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery
1958 // Take a snapshot right after recovery
1959 const Snapshot* snap = db->GetSnapshot();
1960 auto snap_seq = snap->GetSequenceNumber();
1961 ASSERT_GT(snap_seq, 0);
1962
1963 for (SequenceNumber seq = 0;
1964 seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
1965 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
1966 }
1967 if (end_with_prepare) {
1968 ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
1969 }
1970 // trivial check
1971 ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
1972
1973 db->ReleaseSnapshot(snap);
1974
1975 ASSERT_OK(db->Put(woptions, "key", "value"));
1976 // Take a snapshot after some writes
1977 snap = db->GetSnapshot();
1978 snap_seq = snap->GetSequenceNumber();
1979 for (SequenceNumber seq = 0;
1980 seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) {
1981 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq));
1982 }
1983 if (end_with_prepare) {
1984 ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
1985 }
1986 // trivial check
1987 ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
1988
1989 db->ReleaseSnapshot(snap);
1990 }
1991 }
1992
1993 // Shows the contract of IsInSnapshot when called on invalid/released snapshots
1994 TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
1995 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1996 WriteOptions woptions;
1997 ASSERT_OK(db->Put(woptions, "key", "value"));
1998 // snap seq = 1
1999 const Snapshot* snap1 = db->GetSnapshot();
2000 ASSERT_OK(db->Put(woptions, "key", "value"));
2001 ASSERT_OK(db->Put(woptions, "key", "value"));
2002 // snap seq = 3
2003 const Snapshot* snap2 = db->GetSnapshot();
2004 const SequenceNumber seq = 1;
2005 // Evict seq out of commit cache
2006 size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
2007 wp_db->AddCommitted(overwrite_seq, overwrite_seq);
2008 SequenceNumber snap_seq;
2009 uint64_t min_uncommitted = kMinUnCommittedSeq;
2010 bool released;
2011
2012 released = false;
2013 snap_seq = snap1->GetSequenceNumber();
2014 ASSERT_LE(seq, snap_seq);
2015 // Valid snapshot lower than max
2016 ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
2017 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2018 ASSERT_FALSE(released);
2019
2020 released = false;
2021 snap_seq = snap1->GetSequenceNumber();
2022 // Invaid snapshot lower than max
2023 ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
2024 ASSERT_TRUE(
2025 wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2026 ASSERT_TRUE(released);
2027
2028 db->ReleaseSnapshot(snap1);
2029
2030 released = false;
2031 // Released snapshot lower than max
2032 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2033 // The release does not take affect until the next max advance
2034 ASSERT_FALSE(released);
2035
2036 released = false;
2037 // Invaid snapshot lower than max
2038 ASSERT_TRUE(
2039 wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2040 ASSERT_TRUE(released);
2041
2042 // This make the snapshot release to reflect in txn db structures
2043 wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
2044 wp_db->max_evicted_seq_ + 1);
2045
2046 released = false;
2047 // Released snapshot lower than max
2048 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2049 ASSERT_TRUE(released);
2050
2051 released = false;
2052 // Invaid snapshot lower than max
2053 ASSERT_TRUE(
2054 wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
2055 ASSERT_TRUE(released);
2056
2057 snap_seq = snap2->GetSequenceNumber();
2058
2059 released = false;
2060 // Unreleased snapshot lower than max
2061 ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
2062 ASSERT_FALSE(released);
2063
2064 db->ReleaseSnapshot(snap2);
2065 }
2066
2067 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
2068 // snapshot, max_committed_seq_, prepared, and commit entries.
2069 TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
2070 WriteOptions wo;
2071 // Use small commit cache to trigger lots of eviction and fast advance of
2072 // max_evicted_seq_
2073 const size_t commit_cache_bits = 3;
2074 // Same for snapshot cache size
2075 const size_t snapshot_cache_bits = 2;
2076
2077 // Take some preliminary snapshots first. This is to stress the data structure
2078 // that holds the old snapshots as it will be designed to be efficient when
2079 // only a few snapshots are below the max_evicted_seq_.
2080 for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
2081 // Leave some gap between the preliminary snapshots and the final snapshot
2082 // that we check. This should test for also different overlapping scenarios
2083 // between the last snapshot and the commits.
2084 for (int max_gap = 1; max_gap < 10; max_gap++) {
2085 // Since we do not actually write to db, we mock the seq as it would be
2086 // increased by the db. The only exception is that we need db seq to
2087 // advance for our snapshots. for which we apply a dummy put each time we
2088 // increase our mock of seq.
2089 uint64_t seq = 0;
2090 // At each step we prepare a txn and then we commit it in the next txn.
2091 // This emulates the consecutive transactions that write to the same key
2092 uint64_t cur_txn = 0;
2093 // Number of snapshots taken so far
2094 int num_snapshots = 0;
2095 // Number of gaps applied so far
2096 int gap_cnt = 0;
2097 // The final snapshot that we will inspect
2098 uint64_t snapshot = 0;
2099 bool found_committed = false;
2100 // To stress the data structure that maintain prepared txns, at each cycle
2101 // we add a new prepare txn. These do not mean to be committed for
2102 // snapshot inspection.
2103 std::set<uint64_t> prepared;
2104 // We keep the list of txns committed before we take the last snapshot.
2105 // These should be the only seq numbers that will be found in the snapshot
2106 std::set<uint64_t> committed_before;
2107 // The set of commit seq numbers to be excluded from IsInSnapshot queries
2108 std::set<uint64_t> commit_seqs;
2109 DBImpl* mock_db = new DBImpl(options, dbname);
2110 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2111 std::unique_ptr<WritePreparedTxnDBMock> wp_db(
2112 new WritePreparedTxnDBMock(mock_db, txn_db_options));
2113 // We continue until max advances a bit beyond the snapshot.
2114 while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
2115 // do prepare for a transaction
2116 seq++;
2117 wp_db->AddPrepared(seq);
2118 prepared.insert(seq);
2119
2120 // If cur_txn is not started, do prepare for it.
2121 if (!cur_txn) {
2122 seq++;
2123 cur_txn = seq;
2124 wp_db->AddPrepared(cur_txn);
2125 } else { // else commit it
2126 seq++;
2127 wp_db->AddCommitted(cur_txn, seq);
2128 wp_db->RemovePrepared(cur_txn);
2129 commit_seqs.insert(seq);
2130 if (!snapshot) {
2131 committed_before.insert(cur_txn);
2132 }
2133 cur_txn = 0;
2134 }
2135
2136 if (num_snapshots < max_snapshots - 1) {
2137 // Take preliminary snapshots
2138 wp_db->TakeSnapshot(seq);
2139 num_snapshots++;
2140 } else if (gap_cnt < max_gap) {
2141 // Wait for some gap before taking the final snapshot
2142 gap_cnt++;
2143 } else if (!snapshot) {
2144 // Take the final snapshot if it is not already taken
2145 snapshot = seq;
2146 wp_db->TakeSnapshot(snapshot);
2147 num_snapshots++;
2148 }
2149
2150 // If the snapshot is taken, verify seq numbers visible to it. We redo
2151 // it at each cycle to test that the system is still sound when
2152 // max_evicted_seq_ advances.
2153 if (snapshot) {
2154 for (uint64_t s = 1;
2155 s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
2156 bool was_committed =
2157 (committed_before.find(s) != committed_before.end());
2158 bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
2159 if (was_committed != is_in_snapshot) {
2160 printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
2161 " snapshot %" PRIu64
2162 " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
2163 max_snapshots, max_gap, seq,
2164 wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
2165 num_snapshots, s);
2166 }
2167 ASSERT_EQ(was_committed, is_in_snapshot);
2168 found_committed = found_committed || is_in_snapshot;
2169 }
2170 }
2171 }
2172 // Safety check to make sure the test actually ran
2173 ASSERT_TRUE(found_committed);
2174 // As an extra check, check if prepared set will be properly empty after
2175 // they are committed.
2176 if (cur_txn) {
2177 wp_db->AddCommitted(cur_txn, seq);
2178 wp_db->RemovePrepared(cur_txn);
2179 }
2180 for (auto p : prepared) {
2181 wp_db->AddCommitted(p, seq);
2182 wp_db->RemovePrepared(p);
2183 }
2184 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
2185 ASSERT_TRUE(wp_db->prepared_txns_.empty());
2186 }
2187 }
2188 }
2189
2190 void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
2191 PinnableSlice& exp_v, Slice key) {
2192 Status s;
2193 PinnableSlice v;
2194 s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
2195 ASSERT_EQ(exp_s, s);
2196 ASSERT_TRUE(s.ok() || s.IsNotFound());
2197 if (s.ok()) {
2198 ASSERT_TRUE(exp_v == v);
2199 }
2200
2201 // Try with MultiGet API too
2202 std::vector<std::string> values;
2203 auto s_vec =
2204 db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
2205 ASSERT_EQ(1, values.size());
2206 ASSERT_EQ(1, s_vec.size());
2207 s = s_vec[0];
2208 ASSERT_EQ(exp_s, s);
2209 ASSERT_TRUE(s.ok() || s.IsNotFound());
2210 if (s.ok()) {
2211 ASSERT_TRUE(exp_v == values[0]);
2212 }
2213 }
2214
2215 void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
2216 Slice key) {
2217 ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
2218 }
2219
2220 TEST_P(WritePreparedTransactionTest, Rollback) {
2221 ReadOptions roptions;
2222 WriteOptions woptions;
2223 TransactionOptions txn_options;
2224 const size_t num_keys = 4;
2225 const size_t num_values = 5;
2226 for (size_t ikey = 1; ikey <= num_keys; ikey++) {
2227 for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
2228 for (bool crash : {false, true}) {
2229 ASSERT_OK(ReOpen());
2230 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
2231 std::string key_str = "key" + std::to_string(ikey);
2232 switch (ivalue) {
2233 case 0:
2234 break;
2235 case 1:
2236 ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
2237 break;
2238 case 2:
2239 ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
2240 break;
2241 case 3:
2242 ASSERT_OK(db->Delete(woptions, key_str));
2243 break;
2244 case 4:
2245 ASSERT_OK(db->SingleDelete(woptions, key_str));
2246 break;
2247 default:
2248 FAIL();
2249 }
2250
2251 PinnableSlice v1;
2252 auto s1 =
2253 db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
2254 PinnableSlice v2;
2255 auto s2 =
2256 db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
2257 PinnableSlice v3;
2258 auto s3 =
2259 db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
2260 PinnableSlice v4;
2261 auto s4 =
2262 db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
2263 Transaction* txn = db->BeginTransaction(woptions, txn_options);
2264 auto s = txn->SetName("xid0");
2265 ASSERT_OK(s);
2266 s = txn->Put(Slice("key1"), Slice("value1"));
2267 ASSERT_OK(s);
2268 s = txn->Merge(Slice("key2"), Slice("value2"));
2269 ASSERT_OK(s);
2270 s = txn->Delete(Slice("key3"));
2271 ASSERT_OK(s);
2272 s = txn->SingleDelete(Slice("key4"));
2273 ASSERT_OK(s);
2274 s = txn->Prepare();
2275 ASSERT_OK(s);
2276
2277 {
2278 ReadLock rl(&wp_db->prepared_mutex_);
2279 ASSERT_FALSE(wp_db->prepared_txns_.empty());
2280 ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
2281 }
2282
2283 ASSERT_SAME(db, s1, v1, "key1");
2284 ASSERT_SAME(db, s2, v2, "key2");
2285 ASSERT_SAME(db, s3, v3, "key3");
2286 ASSERT_SAME(db, s4, v4, "key4");
2287
2288 if (crash) {
2289 delete txn;
2290 auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2291 ASSERT_OK(db_impl->FlushWAL(true));
2292 dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
2293 ASSERT_OK(ReOpenNoDelete());
2294 ASSERT_NE(db, nullptr);
2295 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
2296 txn = db->GetTransactionByName("xid0");
2297 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
2298 ReadLock rl(&wp_db->prepared_mutex_);
2299 ASSERT_TRUE(wp_db->prepared_txns_.empty());
2300 ASSERT_FALSE(wp_db->delayed_prepared_.empty());
2301 ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
2302 wp_db->delayed_prepared_.end());
2303 }
2304
2305 ASSERT_SAME(db, s1, v1, "key1");
2306 ASSERT_SAME(db, s2, v2, "key2");
2307 ASSERT_SAME(db, s3, v3, "key3");
2308 ASSERT_SAME(db, s4, v4, "key4");
2309
2310 s = txn->Rollback();
2311 ASSERT_OK(s);
2312
2313 {
2314 ASSERT_TRUE(wp_db->delayed_prepared_empty_);
2315 ReadLock rl(&wp_db->prepared_mutex_);
2316 ASSERT_TRUE(wp_db->prepared_txns_.empty());
2317 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
2318 }
2319
2320 ASSERT_SAME(db, s1, v1, "key1");
2321 ASSERT_SAME(db, s2, v2, "key2");
2322 ASSERT_SAME(db, s3, v3, "key3");
2323 ASSERT_SAME(db, s4, v4, "key4");
2324 delete txn;
2325 }
2326 }
2327 }
2328 }
2329
2330 TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
2331 // Use large buffer to avoid memtable flush after 1024 insertions
2332 options.write_buffer_size = 1024 * 1024;
2333 ASSERT_OK(ReOpen());
2334 std::vector<KeyVersion> versions;
2335 uint64_t seq = 0;
2336 for (uint64_t i = 1; i <= 1024; i++) {
2337 std::string v = "bar" + std::to_string(i);
2338 ASSERT_OK(db->Put(WriteOptions(), "foo", v));
2339 VerifyKeys({{"foo", v}});
2340 seq++; // one for the key/value
2341 KeyVersion kv = {"foo", v, seq, kTypeValue};
2342 if (options.two_write_queues) {
2343 seq++; // one for the commit
2344 }
2345 versions.emplace_back(kv);
2346 }
2347 std::reverse(std::begin(versions), std::end(versions));
2348 VerifyInternalKeys(versions);
2349 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2350 ASSERT_OK(db_impl->FlushWAL(true));
2351 // Use small buffer to ensure memtable flush during recovery
2352 options.write_buffer_size = 1024;
2353 ASSERT_OK(ReOpenNoDelete());
2354 VerifyInternalKeys(versions);
2355 }
2356
2357 TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
2358 ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
2359 VerifyKeys({{"foo", "bar"}});
2360 const Snapshot* snapshot = db->GetSnapshot();
2361 ASSERT_OK(db->Flush(FlushOptions()));
2362 // Dummy keys to avoid compaction trivially move files and get around actual
2363 // compaction logic.
2364 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2365 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2366 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2367 // Compaction will output keys with sequence number 0, if it is visible to
2368 // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
2369 // visible to any snapshot.
2370 VerifyKeys({{"foo", "bar"}});
2371 VerifyKeys({{"foo", "bar"}}, snapshot);
2372 VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
2373 db->ReleaseSnapshot(snapshot);
2374 }
2375
2376 // Compaction should not remove a key if it is not committed, and should
2377 // proceed with older versions of the key as-if the new version doesn't exist.
2378 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
2379 options.disable_auto_compactions = true;
2380 ASSERT_OK(ReOpen());
2381 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2382 // Snapshots to avoid keys get evicted.
2383 std::vector<const Snapshot*> snapshots;
2384 // Keep track of expected sequence number.
2385 SequenceNumber expected_seq = 0;
2386
2387 auto add_key = [&](std::function<Status()> func) {
2388 ASSERT_OK(func());
2389 expected_seq++;
2390 if (options.two_write_queues) {
2391 expected_seq++; // 1 for commit
2392 }
2393 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2394 snapshots.push_back(db->GetSnapshot());
2395 };
2396
2397 // Each key here represent a standalone test case.
2398 add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
2399 add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
2400 add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
2401 add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
2402 add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
2403 add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
2404 add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
2405 add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
2406 ASSERT_OK(db->Flush(FlushOptions()));
2407 add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
2408 add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
2409
2410 auto* transaction = db->BeginTransaction(WriteOptions());
2411 ASSERT_OK(transaction->SetName("txn"));
2412 ASSERT_OK(transaction->Put("key1", "value1_2"));
2413 ASSERT_OK(transaction->Delete("key2"));
2414 ASSERT_OK(transaction->SingleDelete("key3"));
2415 ASSERT_OK(transaction->Merge("key4", "value4_2"));
2416 ASSERT_OK(transaction->Merge("key5", "value5_3"));
2417 ASSERT_OK(transaction->Put("key6", "value6_2"));
2418 ASSERT_OK(transaction->Put("key7", "value7_2"));
2419 // Prepare but not commit.
2420 ASSERT_OK(transaction->Prepare());
2421 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2422 ASSERT_OK(db->Flush(FlushOptions()));
2423 for (auto* s : snapshots) {
2424 db->ReleaseSnapshot(s);
2425 }
2426 // Dummy keys to avoid compaction trivially move files and get around actual
2427 // compaction logic.
2428 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2429 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2430 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2431 VerifyKeys({
2432 {"key1", "value1_1"},
2433 {"key2", "value2_1"},
2434 {"key3", "value3_1"},
2435 {"key4", "value4_1"},
2436 {"key5", "value5_1,value5_2"},
2437 {"key6", "NOT_FOUND"},
2438 {"key7", "NOT_FOUND"},
2439 });
2440 VerifyInternalKeys({
2441 {"key1", "value1_2", expected_seq, kTypeValue},
2442 {"key1", "value1_1", 0, kTypeValue},
2443 {"key2", "", expected_seq, kTypeDeletion},
2444 {"key2", "value2_1", 0, kTypeValue},
2445 {"key3", "", expected_seq, kTypeSingleDeletion},
2446 {"key3", "value3_1", 0, kTypeValue},
2447 {"key4", "value4_2", expected_seq, kTypeMerge},
2448 {"key4", "value4_1", 0, kTypeValue},
2449 {"key5", "value5_3", expected_seq, kTypeMerge},
2450 {"key5", "value5_1,value5_2", 0, kTypeValue},
2451 {"key6", "value6_2", expected_seq, kTypeValue},
2452 {"key7", "value7_2", expected_seq, kTypeValue},
2453 });
2454 ASSERT_OK(transaction->Commit());
2455 VerifyKeys({
2456 {"key1", "value1_2"},
2457 {"key2", "NOT_FOUND"},
2458 {"key3", "NOT_FOUND"},
2459 {"key4", "value4_1,value4_2"},
2460 {"key5", "value5_1,value5_2,value5_3"},
2461 {"key6", "value6_2"},
2462 {"key7", "value7_2"},
2463 });
2464 delete transaction;
2465 }
2466
2467 // Compaction should keep keys visible to a snapshot based on commit sequence,
2468 // not just prepare sequence.
2469 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
2470 options.disable_auto_compactions = true;
2471 ASSERT_OK(ReOpen());
2472 // Keep track of expected sequence number.
2473 SequenceNumber expected_seq = 0;
2474 auto* txn1 = db->BeginTransaction(WriteOptions());
2475 ASSERT_OK(txn1->SetName("txn1"));
2476 ASSERT_OK(txn1->Put("key1", "value1_1"));
2477 ASSERT_OK(txn1->Prepare());
2478 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2479 ASSERT_OK(txn1->Commit());
2480 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
2481 ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
2482 delete txn1;
2483 // Take a snapshots to avoid keys get evicted before compaction.
2484 const Snapshot* snapshot1 = db->GetSnapshot();
2485 auto* txn2 = db->BeginTransaction(WriteOptions());
2486 ASSERT_OK(txn2->SetName("txn2"));
2487 ASSERT_OK(txn2->Put("key2", "value2_1"));
2488 ASSERT_OK(txn2->Prepare());
2489 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
2490 // txn1 commit before snapshot2 and it is visible to snapshot2.
2491 // txn2 commit after snapshot2 and it is not visible.
2492 const Snapshot* snapshot2 = db->GetSnapshot();
2493 ASSERT_OK(txn2->Commit());
2494 ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
2495 delete txn2;
2496 // Take a snapshots to avoid keys get evicted before compaction.
2497 const Snapshot* snapshot3 = db->GetSnapshot();
2498 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
2499 expected_seq++; // 1 for write
2500 SequenceNumber seq1 = expected_seq;
2501 if (options.two_write_queues) {
2502 expected_seq++; // 1 for commit
2503 }
2504 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2505 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
2506 expected_seq++; // 1 for write
2507 SequenceNumber seq2 = expected_seq;
2508 if (options.two_write_queues) {
2509 expected_seq++; // 1 for commit
2510 }
2511 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
2512 ASSERT_OK(db->Flush(FlushOptions()));
2513 db->ReleaseSnapshot(snapshot1);
2514 db->ReleaseSnapshot(snapshot3);
2515 // Dummy keys to avoid compaction trivially move files and get around actual
2516 // compaction logic.
2517 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2518 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2519 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2520 VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
2521 VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
2522 VerifyInternalKeys({
2523 {"key1", "value1_2", seq1, kTypeValue},
2524 // "value1_1" is visible to snapshot2. Also keys at bottom level visible
2525 // to earliest snapshot will output with seq = 0.
2526 {"key1", "value1_1", 0, kTypeValue},
2527 {"key2", "value2_2", seq2, kTypeValue},
2528 });
2529 db->ReleaseSnapshot(snapshot2);
2530 }
2531
2532 TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
2533 const size_t snapshot_cache_bits = 7; // same as default
2534 const size_t commit_cache_bits = 0; // disable commit cache
2535 for (bool has_recent_prepare : {true, false}) {
2536 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2537 ASSERT_OK(ReOpen());
2538
2539 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2540 auto* transaction =
2541 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2542 ASSERT_OK(transaction->SetName("txn"));
2543 ASSERT_OK(transaction->Delete("key1"));
2544 ASSERT_OK(transaction->Prepare());
2545 // snapshot1 should get min_uncommitted from prepared_txns_ heap.
2546 auto snapshot1 = db->GetSnapshot();
2547 ASSERT_EQ(transaction->GetId(),
2548 ((SnapshotImpl*)snapshot1)->min_uncommitted_);
2549 // Add a commit to advance max_evicted_seq and move the prepared transaction
2550 // into delayed_prepared_ set.
2551 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2552 Transaction* txn2 = nullptr;
2553 if (has_recent_prepare) {
2554 txn2 =
2555 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2556 ASSERT_OK(txn2->SetName("txn2"));
2557 ASSERT_OK(txn2->Put("key3", "value3"));
2558 ASSERT_OK(txn2->Prepare());
2559 }
2560 // snapshot2 should get min_uncommitted from delayed_prepared_ set.
2561 auto snapshot2 = db->GetSnapshot();
2562 ASSERT_EQ(transaction->GetId(),
2563 ((SnapshotImpl*)snapshot1)->min_uncommitted_);
2564 ASSERT_OK(transaction->Commit());
2565 delete transaction;
2566 if (has_recent_prepare) {
2567 ASSERT_OK(txn2->Commit());
2568 delete txn2;
2569 }
2570 VerifyKeys({{"key1", "NOT_FOUND"}});
2571 VerifyKeys({{"key1", "value1"}}, snapshot1);
2572 VerifyKeys({{"key1", "value1"}}, snapshot2);
2573 db->ReleaseSnapshot(snapshot1);
2574 db->ReleaseSnapshot(snapshot2);
2575 }
2576 }
2577
2578 // Insert two values, v1 and v2, for a key. Between prepare and commit of v2
2579 // take two snapshots, s1 and s2. Release s1 during compaction.
2580 // Test to make sure compaction doesn't get confused and think s1 can see both
2581 // values, and thus compact out the older value by mistake.
2582 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
2583 const size_t snapshot_cache_bits = 7; // same as default
2584 const size_t commit_cache_bits = 0; // minimum commit cache
2585 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2586 options.disable_auto_compactions = true;
2587 ASSERT_OK(ReOpen());
2588
2589 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
2590 auto* transaction =
2591 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2592 ASSERT_OK(transaction->SetName("txn"));
2593 ASSERT_OK(transaction->Put("key1", "value1_2"));
2594 ASSERT_OK(transaction->Prepare());
2595 auto snapshot1 = db->GetSnapshot();
2596 // Increment sequence number.
2597 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2598 auto snapshot2 = db->GetSnapshot();
2599 ASSERT_OK(transaction->Commit());
2600 delete transaction;
2601 VerifyKeys({{"key1", "value1_2"}});
2602 VerifyKeys({{"key1", "value1_1"}}, snapshot1);
2603 VerifyKeys({{"key1", "value1_1"}}, snapshot2);
2604 // Add a flush to avoid compaction to fallback to trivial move.
2605
2606 // The callback might be called twice, record the calling state to
2607 // prevent double calling.
2608 bool callback_finished = false;
2609 auto callback = [&](void*) {
2610 if (callback_finished) {
2611 return;
2612 }
2613 // Release snapshot1 after CompactionIterator init.
2614 // CompactionIterator need to figure out the earliest snapshot
2615 // that can see key1:value1_2 is kMaxSequenceNumber, not
2616 // snapshot1 or snapshot2.
2617 db->ReleaseSnapshot(snapshot1);
2618 // Add some keys to advance max_evicted_seq.
2619 ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
2620 ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
2621 callback_finished = true;
2622 };
2623 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2624 callback);
2625 SyncPoint::GetInstance()->EnableProcessing();
2626
2627 ASSERT_OK(db->Flush(FlushOptions()));
2628 VerifyKeys({{"key1", "value1_2"}});
2629 VerifyKeys({{"key1", "value1_1"}}, snapshot2);
2630 db->ReleaseSnapshot(snapshot2);
2631 SyncPoint::GetInstance()->ClearAllCallBacks();
2632 }
2633
2634 // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
2635 // after committing v2. Release s1 during compaction, right after compaction
2636 // processes v2 and before processes v1. Test to make sure compaction doesn't
2637 // get confused and believe v1 and v2 are visible to different snapshot
2638 // (v1 by s2, v2 by s1) and refuse to compact out v1.
2639 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
2640 const size_t snapshot_cache_bits = 7; // same as default
2641 const size_t commit_cache_bits = 0; // minimum commit cache
2642 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2643 options.disable_auto_compactions = true;
2644 ASSERT_OK(ReOpen());
2645
2646 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2647 ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
2648 SequenceNumber v2_seq = db->GetLatestSequenceNumber();
2649 auto* s1 = db->GetSnapshot();
2650 // Advance sequence number.
2651 ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
2652 auto* s2 = db->GetSnapshot();
2653
2654 int count_value = 0;
2655 auto callback = [&](void* arg) {
2656 auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
2657 if (ikey->user_key == "key1") {
2658 count_value++;
2659 if (count_value == 2) {
2660 // Processing v1.
2661 db->ReleaseSnapshot(s1);
2662 // Add some keys to advance max_evicted_seq and update
2663 // old_commit_map.
2664 ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
2665 ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
2666 }
2667 }
2668 };
2669 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
2670 callback);
2671 SyncPoint::GetInstance()->EnableProcessing();
2672
2673 ASSERT_OK(db->Flush(FlushOptions()));
2674 // value1 should be compact out.
2675 VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
2676
2677 // cleanup
2678 db->ReleaseSnapshot(s2);
2679 SyncPoint::GetInstance()->ClearAllCallBacks();
2680 }
2681
2682 // Insert two values, v1 and v2, for a key. Insert another dummy key
2683 // so to evict the commit cache for v2, while v1 is still in commit cache.
2684 // Take two snapshots, s1 and s2. Release s1 during compaction.
2685 // Since commit cache for v2 is evicted, and old_commit_map don't have
2686 // s1 (it is released),
2687 // TODO(myabandeh): how can we be sure that the v2's commit info is evicted
2688 // (and not v1's)? Instead of putting a dummy, we can directly call
2689 // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
2690 TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
2691 const size_t snapshot_cache_bits = 7; // same as default
2692 const size_t commit_cache_bits = 1; // commit cache size = 2
2693 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2694 options.disable_auto_compactions = true;
2695 ASSERT_OK(ReOpen());
2696
2697 // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
2698 // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
2699 auto add_dummy = [&]() {
2700 auto* txn_dummy =
2701 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2702 ASSERT_OK(txn_dummy->SetName("txn_dummy"));
2703 ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
2704 ASSERT_OK(txn_dummy->Prepare());
2705 ASSERT_OK(txn_dummy->Commit());
2706 delete txn_dummy;
2707 };
2708
2709 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2710 auto* txn =
2711 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2712 ASSERT_OK(txn->SetName("txn"));
2713 ASSERT_OK(txn->Put("key1", "value2"));
2714 ASSERT_OK(txn->Prepare());
2715 // TODO(myabandeh): replace it with GetId()?
2716 auto v2_seq = db->GetLatestSequenceNumber();
2717 ASSERT_OK(txn->Commit());
2718 delete txn;
2719 auto* s1 = db->GetSnapshot();
2720 // Dummy key to advance sequence number.
2721 add_dummy();
2722 auto* s2 = db->GetSnapshot();
2723
2724 // The callback might be called twice, record the calling state to
2725 // prevent double calling.
2726 bool callback_finished = false;
2727 auto callback = [&](void*) {
2728 if (callback_finished) {
2729 return;
2730 }
2731 db->ReleaseSnapshot(s1);
2732 // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
2733 add_dummy();
2734 add_dummy();
2735 callback_finished = true;
2736 };
2737 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2738 callback);
2739 SyncPoint::GetInstance()->EnableProcessing();
2740
2741 ASSERT_OK(db->Flush(FlushOptions()));
2742 // value1 should be compact out.
2743 VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
2744
2745 db->ReleaseSnapshot(s2);
2746 SyncPoint::GetInstance()->ClearAllCallBacks();
2747 }
2748
2749 TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
2750 const size_t snapshot_cache_bits = 7; // same as default
2751 const size_t commit_cache_bits = 0; // minimum commit cache
2752 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
2753 options.disable_auto_compactions = true;
2754 ASSERT_OK(ReOpen());
2755
2756 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
2757 SequenceNumber put_seq = db->GetLatestSequenceNumber();
2758 auto* transaction =
2759 db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
2760 ASSERT_OK(transaction->SetName("txn"));
2761 ASSERT_OK(transaction->Delete("key1"));
2762 ASSERT_OK(transaction->Prepare());
2763 SequenceNumber del_seq = db->GetLatestSequenceNumber();
2764 auto snapshot1 = db->GetSnapshot();
2765 // Increment sequence number.
2766 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
2767 auto snapshot2 = db->GetSnapshot();
2768 ASSERT_OK(transaction->Commit());
2769 delete transaction;
2770 VerifyKeys({{"key1", "NOT_FOUND"}});
2771 VerifyKeys({{"key1", "value1"}}, snapshot1);
2772 VerifyKeys({{"key1", "value1"}}, snapshot2);
2773 ASSERT_OK(db->Flush(FlushOptions()));
2774
2775 auto callback = [&](void* compaction) {
2776 // Release snapshot1 after CompactionIterator init.
2777 // CompactionIterator need to double check and find out snapshot2 is now
2778 // the earliest existing snapshot.
2779 if (compaction != nullptr) {
2780 db->ReleaseSnapshot(snapshot1);
2781 // Add some keys to advance max_evicted_seq.
2782 ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
2783 ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
2784 }
2785 };
2786 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
2787 callback);
2788 SyncPoint::GetInstance()->EnableProcessing();
2789
2790 // Dummy keys to avoid compaction trivially move files and get around actual
2791 // compaction logic.
2792 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
2793 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
2794 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2795 // Only verify for key1. Both the put and delete for the key should be kept.
2796 // Since the delete tombstone is not visible to snapshot2, we need to keep
2797 // at least one version of the key, for write-conflict check.
2798 VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
2799 {"key1", "value1", put_seq, kTypeValue}});
2800 db->ReleaseSnapshot(snapshot2);
2801 SyncPoint::GetInstance()->ClearAllCallBacks();
2802 }
2803
2804 TEST_P(WritePreparedTransactionTest,
2805 ReleaseEarliestSnapshotDuringCompaction_WithSD) {
2806 constexpr size_t kSnapshotCacheBits = 7; // same as default
2807 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
2808 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
2809 options.disable_auto_compactions = true;
2810 ASSERT_OK(ReOpen());
2811
2812 ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
2813 ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
2814 ASSERT_OK(db->Flush(FlushOptions()));
2815
2816 auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
2817 /*old_txn=*/nullptr);
2818 ASSERT_OK(txn->SingleDelete("key"));
2819 ASSERT_OK(txn->Put("wow", "value"));
2820 ASSERT_OK(txn->SetName("txn"));
2821 ASSERT_OK(txn->Prepare());
2822 ASSERT_OK(db->Flush(FlushOptions()));
2823
2824 const bool two_write_queues = std::get<1>(GetParam());
2825 if (two_write_queues) {
2826 // In the case of two queues, commit another txn just to bump
2827 // last_published_seq so that a subsequent GetSnapshot() call can return
2828 // a snapshot with higher sequence.
2829 auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
2830 /*old_txn=*/nullptr);
2831 ASSERT_OK(dummy_txn->Put("haha", "value"));
2832 ASSERT_OK(dummy_txn->Commit());
2833 delete dummy_txn;
2834 }
2835 auto* snapshot = db->GetSnapshot();
2836
2837 ASSERT_OK(txn->Commit());
2838 delete txn;
2839
2840 SyncPoint::GetInstance()->SetCallBack(
2841 "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
2842 if (!arg) {
2843 return;
2844 }
2845 db->ReleaseSnapshot(snapshot);
2846
2847 // Advance max_evicted_seq
2848 ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
2849 });
2850 SyncPoint::GetInstance()->EnableProcessing();
2851 ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
2852 /*end=*/nullptr));
2853 SyncPoint::GetInstance()->ClearAllCallBacks();
2854 }
2855
2856 TEST_P(WritePreparedTransactionTest,
2857 ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
2858 constexpr size_t kSnapshotCacheBits = 7; // same as default
2859 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
2860 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
2861 options.disable_auto_compactions = true;
2862 ASSERT_OK(ReOpen());
2863
2864 ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
2865 ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
2866 ASSERT_OK(db->Flush(FlushOptions()));
2867
2868 auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
2869 /*old_txn=*/nullptr);
2870 ASSERT_OK(txn->Put("bar", "value"));
2871 ASSERT_OK(txn->SingleDelete("key"));
2872 ASSERT_OK(txn->SetName("txn"));
2873 ASSERT_OK(txn->Prepare());
2874 ASSERT_OK(db->Flush(FlushOptions()));
2875
2876 ASSERT_OK(txn->Commit());
2877 delete txn;
2878
2879 ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
2880
2881 // Create a dummy transaction to take a snapshot for ww-conflict detection.
2882 TransactionOptions txn_opts;
2883 txn_opts.set_snapshot = true;
2884 auto* dummy_txn =
2885 db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
2886
2887 SyncPoint::GetInstance()->SetCallBack(
2888 "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
2889 ASSERT_OK(dummy_txn->Rollback());
2890 delete dummy_txn;
2891
2892 ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
2893 });
2894 SyncPoint::GetInstance()->EnableProcessing();
2895
2896 ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
2897 auto* snapshot = db->GetSnapshot();
2898
2899 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
2900 db->ReleaseSnapshot(snapshot);
2901 SyncPoint::GetInstance()->ClearAllCallBacks();
2902 }
2903
2904 TEST_P(WritePreparedTransactionTest,
2905 ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
2906 constexpr size_t kSnapshotCacheBits = 7; // same as default
2907 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
2908 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
2909 options.disable_auto_compactions = true;
2910 ASSERT_OK(ReOpen());
2911
2912 ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
2913 ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
2914 ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
2915 ASSERT_OK(db->Flush(FlushOptions()));
2916
2917 auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
2918 /*old_txn=*/nullptr);
2919 ASSERT_OK(txn->Delete("b"));
2920 ASSERT_OK(txn->SetName("txn"));
2921 ASSERT_OK(txn->Prepare());
2922
2923 const bool two_write_queues = std::get<1>(GetParam());
2924 if (two_write_queues) {
2925 // In the case of two queues, commit another txn just to bump
2926 // last_published_seq so that a subsequent GetSnapshot() call can return
2927 // a snapshot with higher sequence.
2928 auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
2929 /*old_txn=*/nullptr);
2930 ASSERT_OK(dummy_txn->Put("haha", "value"));
2931 ASSERT_OK(dummy_txn->Commit());
2932 delete dummy_txn;
2933 }
2934 auto* snapshot1 = db->GetSnapshot();
2935 ASSERT_OK(txn->Commit());
2936 delete txn;
2937 auto* snapshot2 = db->GetSnapshot();
2938
2939 SyncPoint::GetInstance()->SetCallBack(
2940 "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
2941 if (!arg) {
2942 return;
2943 }
2944 db->ReleaseSnapshot(snapshot1);
2945
2946 // Advance max_evicted_seq
2947 ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
2948 });
2949 SyncPoint::GetInstance()->EnableProcessing();
2950
2951 ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
2952 /*end=*/nullptr));
2953 db->ReleaseSnapshot(snapshot2);
2954 SyncPoint::GetInstance()->ClearAllCallBacks();
2955 }
2956
2957 TEST_P(WritePreparedTransactionTest,
2958 ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
2959 constexpr size_t kSnapshotCacheBits = 7; // same as default
2960 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
2961 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
2962 options.disable_auto_compactions = true;
2963 ASSERT_OK(ReOpen());
2964
2965 // Create a dummy transaction to take a snapshot for ww-conflict detection.
2966 TransactionOptions txn_opts;
2967 txn_opts.set_snapshot = true;
2968 auto* dummy_txn =
2969 db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
2970 // Increment seq
2971 ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
2972
2973 ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
2974 ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
2975 auto* snapshot1 = db->GetSnapshot();
2976 // Increment seq
2977 ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
2978 auto* snapshot2 = db->GetSnapshot();
2979
2980 SyncPoint::GetInstance()->SetCallBack(
2981 "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
2982 db->ReleaseSnapshot(snapshot1);
2983
2984 ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
2985 });
2986 SyncPoint::GetInstance()->EnableProcessing();
2987
2988 ASSERT_OK(db->Flush(FlushOptions()));
2989 db->ReleaseSnapshot(snapshot2);
2990 ASSERT_OK(dummy_txn->Commit());
2991 delete dummy_txn;
2992 SyncPoint::GetInstance()->ClearAllCallBacks();
2993 }
2994
2995 TEST_P(WritePreparedTransactionTest,
2996 ReleaseEarliestWriteConflictSnapshot_SingleDelete) {
2997 constexpr size_t kSnapshotCacheBits = 7; // same as default
2998 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
2999 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
3000 options.disable_auto_compactions = true;
3001 ASSERT_OK(ReOpen());
3002
3003 ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
3004 ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
3005 ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
3006 ASSERT_OK(db->Flush(FlushOptions()));
3007
3008 {
3009 CompactRangeOptions cro;
3010 cro.change_level = true;
3011 cro.target_level = 2;
3012 ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
3013 }
3014
3015 std::unique_ptr<Transaction> txn;
3016 txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions(),
3017 /*old_txn=*/nullptr));
3018 ASSERT_OK(txn->SetName("txn1"));
3019 ASSERT_OK(txn->SingleDelete("b"));
3020 ASSERT_OK(txn->Prepare());
3021 ASSERT_OK(txn->Commit());
3022
3023 auto* snapshot1 = db->GetSnapshot();
3024
3025 // Bump seq of the db by performing writes so that
3026 // earliest_snapshot_ < earliest_write_conflict_snapshot_ in
3027 // CompactionIterator.
3028 ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
3029
3030 // Create another snapshot for write conflict checking
3031 std::unique_ptr<Transaction> txn2;
3032 {
3033 TransactionOptions txn_opts;
3034 txn_opts.set_snapshot = true;
3035 txn2.reset(
3036 db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr));
3037 }
3038
3039 // Bump seq so that the subsequent bg flush won't create a snapshot with the
3040 // same seq as the previous snapshot for conflict checking.
3041 ASSERT_OK(db->Put(WriteOptions(), "y", "dont"));
3042
3043 ASSERT_OK(db->Flush(FlushOptions()));
3044
3045 SyncPoint::GetInstance()->DisableProcessing();
3046 SyncPoint::GetInstance()->ClearAllCallBacks();
3047 SyncPoint::GetInstance()->SetCallBack(
3048 "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* /*arg*/) {
3049 // Rolling back txn2 should release its snapshot(for ww checking).
3050 ASSERT_OK(txn2->Rollback());
3051 txn2.reset();
3052 // Advance max_evicted_seq
3053 ASSERT_OK(db->Put(WriteOptions(), "x", "value"));
3054 });
3055 SyncPoint::GetInstance()->EnableProcessing();
3056
3057 ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
3058 /*end=*/nullptr));
3059
3060 SyncPoint::GetInstance()->DisableProcessing();
3061 SyncPoint::GetInstance()->ClearAllCallBacks();
3062
3063 db->ReleaseSnapshot(snapshot1);
3064 }
3065
3066 TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) {
3067 constexpr size_t kSnapshotCacheBits = 7; // same as default
3068 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
3069 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
3070 options.disable_auto_compactions = true;
3071 ASSERT_OK(ReOpen());
3072
3073 ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
3074 ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
3075 ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
3076 ASSERT_OK(db->Flush(FlushOptions()));
3077
3078 {
3079 CompactRangeOptions cro;
3080 cro.change_level = true;
3081 cro.target_level = 2;
3082 ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
3083 }
3084
3085 ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
3086
3087 // Take a snapshot so that the SD won't be dropped during flush.
3088 auto* tmp_snapshot = db->GetSnapshot();
3089
3090 ASSERT_OK(db->Put(WriteOptions(), "b", "value2"));
3091 auto* snapshot = db->GetSnapshot();
3092 ASSERT_OK(db->Flush(FlushOptions()));
3093
3094 db->ReleaseSnapshot(tmp_snapshot);
3095
3096 // Bump the sequence so that the below bg compaction job's snapshot will be
3097 // different from snapshot's sequence.
3098 ASSERT_OK(db->Put(WriteOptions(), "z", "foo"));
3099
3100 SyncPoint::GetInstance()->DisableProcessing();
3101 SyncPoint::GetInstance()->ClearAllCallBacks();
3102 SyncPoint::GetInstance()->SetCallBack(
3103 "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
3104 const auto* const ikey =
3105 reinterpret_cast<const ParsedInternalKey*>(arg);
3106 assert(ikey);
3107 if (ikey->user_key == "b") {
3108 assert(ikey->type == kTypeValue);
3109 db->ReleaseSnapshot(snapshot);
3110
3111 // Bump max_evicted_seq.
3112 ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
3113 }
3114 });
3115 SyncPoint::GetInstance()->EnableProcessing();
3116
3117 ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
3118 /*end=*/nullptr));
3119
3120 SyncPoint::GetInstance()->DisableProcessing();
3121 SyncPoint::GetInstance()->ClearAllCallBacks();
3122 }
3123
3124 TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) {
3125 constexpr size_t kSnapshotCacheBits = 7; // same as default
3126 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
3127 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
3128 options.disable_auto_compactions = true;
3129 ASSERT_OK(ReOpen());
3130
3131 // Generate an L0 with only SD for one key "b".
3132 ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
3133 ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
3134 // Take a snapshot so that subsequent flush outputs the SD for "b".
3135 auto* tmp_snapshot = db->GetSnapshot();
3136 ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
3137 ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
3138
3139 SyncPoint::GetInstance()->DisableProcessing();
3140 SyncPoint::GetInstance()->ClearAllCallBacks();
3141 SyncPoint::GetInstance()->SetCallBack(
3142 "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) {
3143 if (!arg) {
3144 db->ReleaseSnapshot(tmp_snapshot);
3145 // Bump max_evicted_seq
3146 ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
3147 }
3148 });
3149 SyncPoint::GetInstance()->EnableProcessing();
3150
3151 ASSERT_OK(db->Flush(FlushOptions()));
3152 // Finish generating L0 with only SD for "b".
3153
3154 SyncPoint::GetInstance()->DisableProcessing();
3155 SyncPoint::GetInstance()->ClearAllCallBacks();
3156
3157 // Move the L0 to L2.
3158 {
3159 CompactRangeOptions cro;
3160 cro.change_level = true;
3161 cro.target_level = 2;
3162 ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
3163 }
3164
3165 ASSERT_OK(db->Put(WriteOptions(), "b", "value1"));
3166
3167 auto* snapshot = db->GetSnapshot();
3168
3169 // Bump seq so that a subsequent flush/compaction job's snapshot is larger
3170 // than the above snapshot's seq.
3171 ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
3172
3173 // Generate a second L0.
3174 ASSERT_OK(db->Flush(FlushOptions()));
3175
3176 SyncPoint::GetInstance()->SetCallBack(
3177 "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
3178 const auto* const ikey =
3179 reinterpret_cast<const ParsedInternalKey*>(arg);
3180 assert(ikey);
3181 if (ikey->user_key == "b") {
3182 assert(ikey->type == kTypeValue);
3183 db->ReleaseSnapshot(snapshot);
3184
3185 // Bump max_evicted_seq.
3186 ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
3187 }
3188 });
3189 SyncPoint::GetInstance()->EnableProcessing();
3190
3191 ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
3192 /*end=*/nullptr));
3193
3194 SyncPoint::GetInstance()->DisableProcessing();
3195 SyncPoint::GetInstance()->ClearAllCallBacks();
3196 }
3197
3198 // Although the user-contract indicates that a SD can only be issued for a key
3199 // that exists and has not been overwritten, it is still possible for a Delete
3200 // to be present when write-prepared transaction is rolled back.
3201 TEST_P(WritePreparedTransactionTest, SingleDeleteAfterRollback) {
3202 constexpr size_t kSnapshotCacheBits = 7; // same as default
3203 constexpr size_t kCommitCacheBits = 0; // minimum commit cache
3204 txn_db_options.rollback_deletion_type_callback =
3205 [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
3206 UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
3207 options.disable_auto_compactions = true;
3208 ASSERT_OK(ReOpen());
3209
3210 // Get a write conflict snapshot by creating a transaction with
3211 // set_snapshot=true.
3212 TransactionOptions txn_opts;
3213 txn_opts.set_snapshot = true;
3214 std::unique_ptr<Transaction> dummy_txn(
3215 db->BeginTransaction(WriteOptions(), txn_opts));
3216
3217 std::unique_ptr<Transaction> txn0(
3218 db->BeginTransaction(WriteOptions(), TransactionOptions()));
3219 ASSERT_OK(txn0->Put("foo", "value"));
3220 ASSERT_OK(txn0->SetName("xid0"));
3221 ASSERT_OK(txn0->Prepare());
3222
3223 // Create an SST with only {"foo": "value"}.
3224 ASSERT_OK(db->Flush(FlushOptions()));
3225
3226 // Insert a Delete to cancel out the prior Put by txn0.
3227 ASSERT_OK(txn0->Rollback());
3228 txn0.reset();
3229
3230 // Create a second SST.
3231 ASSERT_OK(db->Flush(FlushOptions()));
3232
3233 ASSERT_OK(db->Put(WriteOptions(), "foo", "value1"));
3234
3235 auto* snapshot = db->GetSnapshot();
3236
3237 ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
3238
3239 int count = 0;
3240 SyncPoint::GetInstance()->DisableProcessing();
3241 SyncPoint::GetInstance()->ClearAllCallBacks();
3242 SyncPoint::GetInstance()->SetCallBack(
3243 "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
3244 const auto* const c = reinterpret_cast<const Compaction*>(arg);
3245 assert(!c);
3246 // Trigger once only for SingleDelete during flush.
3247 if (0 == count) {
3248 ++count;
3249 db->ReleaseSnapshot(snapshot);
3250 // Bump max_evicted_seq
3251 ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
3252 }
3253 });
3254 SyncPoint::GetInstance()->EnableProcessing();
3255
3256 // Create a third SST containing a SD without its matching PUT.
3257 ASSERT_OK(db->Flush(FlushOptions()));
3258
3259 SyncPoint::GetInstance()->DisableProcessing();
3260 SyncPoint::GetInstance()->ClearAllCallBacks();
3261 SyncPoint::GetInstance()->EnableProcessing();
3262
3263 DBImpl* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
3264 assert(dbimpl);
3265 ASSERT_OK(dbimpl->TEST_CompactRange(
3266 /*level=*/0, /*begin=*/nullptr, /*end=*/nullptr,
3267 /*column_family=*/nullptr, /*disallow_trivial_mode=*/true));
3268
3269 SyncPoint::GetInstance()->DisableProcessing();
3270 SyncPoint::GetInstance()->ClearAllCallBacks();
3271
3272 // Release the conflict-checking snapshot.
3273 ASSERT_OK(dummy_txn->Rollback());
3274 }
3275
3276 // A more complex test to verify compaction/flush should keep keys visible
3277 // to snapshots.
3278 TEST_P(WritePreparedTransactionTest,
3279 CompactionKeepSnapshotVisibleKeysRandomized) {
3280 constexpr size_t kNumTransactions = 10;
3281 constexpr size_t kNumIterations = 1000;
3282
3283 std::vector<Transaction*> transactions(kNumTransactions, nullptr);
3284 std::vector<size_t> versions(kNumTransactions, 0);
3285 std::unordered_map<std::string, std::string> current_data;
3286 std::vector<const Snapshot*> snapshots;
3287 std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
3288
3289 Random rnd(1103);
3290 options.disable_auto_compactions = true;
3291 ASSERT_OK(ReOpen());
3292
3293 for (size_t i = 0; i < kNumTransactions; i++) {
3294 std::string key = "key" + std::to_string(i);
3295 std::string value = "value0";
3296 ASSERT_OK(db->Put(WriteOptions(), key, value));
3297 current_data[key] = value;
3298 }
3299 VerifyKeys(current_data);
3300
3301 for (size_t iter = 0; iter < kNumIterations; iter++) {
3302 auto r = rnd.Next() % (kNumTransactions + 1);
3303 if (r < kNumTransactions) {
3304 std::string key = "key" + std::to_string(r);
3305 if (transactions[r] == nullptr) {
3306 std::string value = "value" + std::to_string(versions[r] + 1);
3307 auto* txn = db->BeginTransaction(WriteOptions());
3308 ASSERT_OK(txn->SetName("txn" + std::to_string(r)));
3309 ASSERT_OK(txn->Put(key, value));
3310 ASSERT_OK(txn->Prepare());
3311 transactions[r] = txn;
3312 } else {
3313 std::string value = "value" + std::to_string(++versions[r]);
3314 ASSERT_OK(transactions[r]->Commit());
3315 delete transactions[r];
3316 transactions[r] = nullptr;
3317 current_data[key] = value;
3318 }
3319 } else {
3320 auto* snapshot = db->GetSnapshot();
3321 VerifyKeys(current_data, snapshot);
3322 snapshots.push_back(snapshot);
3323 snapshot_data.push_back(current_data);
3324 }
3325 VerifyKeys(current_data);
3326 }
3327 // Take a last snapshot to test compaction with uncommitted prepared
3328 // transaction.
3329 snapshots.push_back(db->GetSnapshot());
3330 snapshot_data.push_back(current_data);
3331
3332 ASSERT_EQ(snapshots.size(), snapshot_data.size());
3333 for (size_t i = 0; i < snapshots.size(); i++) {
3334 VerifyKeys(snapshot_data[i], snapshots[i]);
3335 }
3336 ASSERT_OK(db->Flush(FlushOptions()));
3337 for (size_t i = 0; i < snapshots.size(); i++) {
3338 VerifyKeys(snapshot_data[i], snapshots[i]);
3339 }
3340 // Dummy keys to avoid compaction trivially move files and get around actual
3341 // compaction logic.
3342 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
3343 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
3344 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3345 for (size_t i = 0; i < snapshots.size(); i++) {
3346 VerifyKeys(snapshot_data[i], snapshots[i]);
3347 }
3348 // cleanup
3349 for (size_t i = 0; i < kNumTransactions; i++) {
3350 if (transactions[i] == nullptr) {
3351 continue;
3352 }
3353 ASSERT_OK(transactions[i]->Commit());
3354 delete transactions[i];
3355 }
3356 for (size_t i = 0; i < snapshots.size(); i++) {
3357 db->ReleaseSnapshot(snapshots[i]);
3358 }
3359 }
3360
3361 // Compaction should not apply the optimization to output key with sequence
3362 // number equal to 0 if the key is not visible to earliest snapshot, based on
3363 // commit sequence number.
3364 TEST_P(WritePreparedTransactionTest,
3365 CompactionShouldKeepSequenceForUncommittedKeys) {
3366 options.disable_auto_compactions = true;
3367 ASSERT_OK(ReOpen());
3368 // Keep track of expected sequence number.
3369 SequenceNumber expected_seq = 0;
3370 auto* transaction = db->BeginTransaction(WriteOptions());
3371 ASSERT_OK(transaction->SetName("txn"));
3372 ASSERT_OK(transaction->Put("key1", "value1"));
3373 ASSERT_OK(transaction->Prepare());
3374 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
3375 SequenceNumber seq1 = expected_seq;
3376 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
3377 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
3378 expected_seq++; // one for data
3379 if (options.two_write_queues) {
3380 expected_seq++; // one for commit
3381 }
3382 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
3383 ASSERT_OK(db->Flush(FlushOptions()));
3384 // Dummy keys to avoid compaction trivially move files and get around actual
3385 // compaction logic.
3386 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
3387 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
3388 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
3389 VerifyKeys({
3390 {"key1", "NOT_FOUND"},
3391 {"key2", "value2"},
3392 });
3393 VerifyInternalKeys({
3394 // "key1" has not been committed. It keeps its sequence number.
3395 {"key1", "value1", seq1, kTypeValue},
3396 // "key2" is committed and output with seq = 0.
3397 {"key2", "value2", 0, kTypeValue},
3398 });
3399 ASSERT_OK(transaction->Commit());
3400 VerifyKeys({
3401 {"key1", "value1"},
3402 {"key2", "value2"},
3403 });
3404 delete transaction;
3405 }
3406
3407 TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
3408 options.disable_auto_compactions = true;
3409 ASSERT_OK(ReOpen());
3410
3411 const Snapshot* snapshot = nullptr;
3412 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
3413 auto* txn = db->BeginTransaction(WriteOptions());
3414 ASSERT_OK(txn->SetName("txn"));
3415 ASSERT_OK(txn->Put("key1", "value2"));
3416 ASSERT_OK(txn->Prepare());
3417
3418 auto callback = [&](void*) {
3419 // Snapshot is taken after compaction start. It should be taken into
3420 // consideration for whether to compact out value1.
3421 snapshot = db->GetSnapshot();
3422 ASSERT_OK(txn->Commit());
3423 delete txn;
3424 };
3425 SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
3426 callback);
3427 SyncPoint::GetInstance()->EnableProcessing();
3428 ASSERT_OK(db->Flush(FlushOptions()));
3429 ASSERT_NE(nullptr, snapshot);
3430 VerifyKeys({{"key1", "value2"}});
3431 VerifyKeys({{"key1", "value1"}}, snapshot);
3432 db->ReleaseSnapshot(snapshot);
3433 }
3434
3435 TEST_P(WritePreparedTransactionTest, Iterate) {
3436 auto verify_state = [](Iterator* iter, const std::string& key,
3437 const std::string& value) {
3438 ASSERT_TRUE(iter->Valid());
3439 ASSERT_OK(iter->status());
3440 ASSERT_EQ(key, iter->key().ToString());
3441 ASSERT_EQ(value, iter->value().ToString());
3442 };
3443
3444 auto verify_iter = [&](const std::string& expected_val) {
3445 // Get iterator from a concurrent transaction and make sure it has the
3446 // same view as an iterator from the DB.
3447 auto* txn = db->BeginTransaction(WriteOptions());
3448
3449 for (int i = 0; i < 2; i++) {
3450 Iterator* iter = (i == 0) ? db->NewIterator(ReadOptions())
3451 : txn->GetIterator(ReadOptions());
3452 // Seek
3453 iter->Seek("foo");
3454 verify_state(iter, "foo", expected_val);
3455 // Next
3456 iter->Seek("a");
3457 verify_state(iter, "a", "va");
3458 iter->Next();
3459 verify_state(iter, "foo", expected_val);
3460 // SeekForPrev
3461 iter->SeekForPrev("y");
3462 verify_state(iter, "foo", expected_val);
3463 // Prev
3464 iter->SeekForPrev("z");
3465 verify_state(iter, "z", "vz");
3466 iter->Prev();
3467 verify_state(iter, "foo", expected_val);
3468 delete iter;
3469 }
3470 delete txn;
3471 };
3472
3473 ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
3474 auto* transaction = db->BeginTransaction(WriteOptions());
3475 ASSERT_OK(transaction->SetName("txn"));
3476 ASSERT_OK(transaction->Put("foo", "v2"));
3477 ASSERT_OK(transaction->Prepare());
3478 VerifyKeys({{"foo", "v1"}});
3479 // dummy keys
3480 ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
3481 ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
3482 verify_iter("v1");
3483 ASSERT_OK(transaction->Commit());
3484 VerifyKeys({{"foo", "v2"}});
3485 verify_iter("v2");
3486 delete transaction;
3487 }
3488
3489 TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
3490 Iterator* iter = db->NewIterator(ReadOptions());
3491 ASSERT_OK(iter->status());
3492 ASSERT_TRUE(iter->Refresh().IsNotSupported());
3493 delete iter;
3494 }
3495
3496 // Committing an delayed prepared has two non-atomic steps: update commit cache,
3497 // remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
3498 // non-atomic steps of checking these two data structures. This test breaks each
3499 // in the middle to ensure correctness in spite of non-atomic execution.
3500 // Note: This test is limitted to the case where snapshot is larger than the
3501 // max_evicted_seq_.
3502 TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
3503 const size_t snapshot_cache_bits = 7; // same as default
3504 const size_t commit_cache_bits = 3; // 8 entries
3505 for (auto split_read : {true, false}) {
3506 std::vector<bool> split_options = {false};
3507 if (split_read) {
3508 // Also test for break before mutex
3509 split_options.push_back(true);
3510 }
3511 for (auto split_before_mutex : split_options) {
3512 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3513 ASSERT_OK(ReOpen());
3514 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3515 DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
3516 // Fill up the commit cache
3517 std::string init_value("value1");
3518 for (int i = 0; i < 10; i++) {
3519 ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
3520 }
3521 // Prepare a transaction but do not commit it
3522 Transaction* txn =
3523 db->BeginTransaction(WriteOptions(), TransactionOptions());
3524 ASSERT_OK(txn->SetName("xid"));
3525 ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
3526 ASSERT_OK(txn->Prepare());
3527 // Commit a bunch of entries to advance max evicted seq and make the
3528 // prepared a delayed prepared
3529 for (int i = 0; i < 10; i++) {
3530 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3531 }
3532 // The snapshot should not see the delayed prepared entry
3533 auto snap = db->GetSnapshot();
3534
3535 if (split_read) {
3536 if (split_before_mutex) {
3537 // split before acquiring prepare_mutex_
3538 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3539 {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
3540 "AtomicCommitOfDelayedPrepared:Commit:before"},
3541 {"AtomicCommitOfDelayedPrepared:Commit:after",
3542 "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
3543 } else {
3544 // split right after reading from the commit cache
3545 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3546 {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
3547 "AtomicCommitOfDelayedPrepared:Commit:before"},
3548 {"AtomicCommitOfDelayedPrepared:Commit:after",
3549 "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
3550 }
3551 } else { // split commit
3552 // split right before removing from delayed_prepared_
3553 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3554 {{"WritePreparedTxnDB::RemovePrepared:pause",
3555 "AtomicCommitOfDelayedPrepared:Read:before"},
3556 {"AtomicCommitOfDelayedPrepared:Read:after",
3557 "WritePreparedTxnDB::RemovePrepared:resume"}});
3558 }
3559 SyncPoint::GetInstance()->EnableProcessing();
3560
3561 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3562 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
3563 ASSERT_OK(txn->Commit());
3564 if (split_before_mutex) {
3565 // Do bunch of inserts to evict the commit entry from the cache. This
3566 // would prevent the 2nd look into commit cache under prepare_mutex_
3567 // to see the commit entry.
3568 auto seq = db_impl->TEST_GetLastVisibleSequence();
3569 size_t tries = 0;
3570 while (wp_db->max_evicted_seq_ < seq && tries < 50) {
3571 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3572 tries++;
3573 };
3574 ASSERT_LT(tries, 50);
3575 }
3576 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
3577 delete txn;
3578 });
3579
3580 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3581 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
3582 ReadOptions roptions;
3583 roptions.snapshot = snap;
3584 PinnableSlice value;
3585 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3586 ASSERT_OK(s);
3587 // It should not see the commit of delayed prepared
3588 ASSERT_TRUE(value == init_value);
3589 TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
3590 db->ReleaseSnapshot(snap);
3591 });
3592
3593 read_thread.join();
3594 commit_thread.join();
3595 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3596 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3597 } // for split_before_mutex
3598 } // for split_read
3599 }
3600
3601 // When max evicted seq advances a prepared seq, it involves two updates: i)
3602 // adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
3603 // ::IsInSnapshot also reads these two values in a non-atomic way. This test
3604 // ensures correctness if the update occurs after ::IsInSnapshot reads
3605 // delayed_prepared_empty_ and before it reads max_evicted_seq_.
3606 // Note: this test focuses on read snapshot larger than max_evicted_seq_.
3607 TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
3608 const size_t snapshot_cache_bits = 7; // same as default
3609 const size_t commit_cache_bits = 3; // 8 entries
3610 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3611 ASSERT_OK(ReOpen());
3612 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3613 // Fill up the commit cache
3614 std::string init_value("value1");
3615 for (int i = 0; i < 10; i++) {
3616 ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
3617 }
3618 // Prepare a transaction but do not commit it
3619 Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3620 ASSERT_OK(txn->SetName("xid"));
3621 ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
3622 ASSERT_OK(txn->Prepare());
3623 // Create a gap between prepare seq and snapshot seq
3624 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3625 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3626 // The snapshot should not see the delayed prepared entry
3627 auto snap = db->GetSnapshot();
3628 ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
3629
3630 // split right after reading delayed_prepared_empty_
3631 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3632 {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
3633 "AtomicUpdateOfDelayedPrepared:before"},
3634 {"AtomicUpdateOfDelayedPrepared:after",
3635 "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
3636 SyncPoint::GetInstance()->EnableProcessing();
3637
3638 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3639 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
3640 // Commit a bunch of entries to advance max evicted seq and make the
3641 // prepared a delayed prepared
3642 size_t tries = 0;
3643 while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
3644 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3645 tries++;
3646 };
3647 ASSERT_LT(tries, 50);
3648 // This is the case on which the test focuses
3649 ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3650 TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
3651 });
3652
3653 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3654 ReadOptions roptions;
3655 roptions.snapshot = snap;
3656 PinnableSlice value;
3657 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3658 ASSERT_OK(s);
3659 // It should not see the uncommitted value of delayed prepared
3660 ASSERT_TRUE(value == init_value);
3661 db->ReleaseSnapshot(snap);
3662 });
3663
3664 read_thread.join();
3665 commit_thread.join();
3666 ASSERT_OK(txn->Commit());
3667 delete txn;
3668 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3669 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3670 }
3671
3672 // Eviction from commit cache and update of max evicted seq are two non-atomic
3673 // steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
3674 // from commit cache are two non-atomic steps. This tests if the update occurs
3675 // after reading max_evicted_seq_ and before reading the commit cache.
3676 // Note: the test focuses on snapshot larger than max_evicted_seq_
3677 TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
3678 const size_t snapshot_cache_bits = 7; // same as default
3679 const size_t commit_cache_bits = 3; // 8 entries
3680 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3681 ASSERT_OK(ReOpen());
3682 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3683 // Fill up the commit cache
3684 std::string init_value("value1");
3685 std::string last_value("value_final");
3686 for (int i = 0; i < 10; i++) {
3687 ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
3688 }
3689 // Do an uncommitted write to prevent min_uncommitted optimization
3690 Transaction* txn1 =
3691 db->BeginTransaction(WriteOptions(), TransactionOptions());
3692 ASSERT_OK(txn1->SetName("xid1"));
3693 ASSERT_OK(txn1->Put(Slice("key0"), last_value));
3694 ASSERT_OK(txn1->Prepare());
3695 // Do a write with prepare to get the prepare seq
3696 Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3697 ASSERT_OK(txn->SetName("xid"));
3698 ASSERT_OK(txn->Put(Slice("key1"), last_value));
3699 ASSERT_OK(txn->Prepare());
3700 ASSERT_OK(txn->Commit());
3701 // Create a gap between commit entry and snapshot seq
3702 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3703 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3704 // The snapshot should see the last commit
3705 auto snap = db->GetSnapshot();
3706 ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
3707
3708 // split right after reading max_evicted_seq_
3709 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3710 {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
3711 "NonAtomicUpdateOfMaxEvictedSeq:before"},
3712 {"NonAtomicUpdateOfMaxEvictedSeq:after",
3713 "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
3714 SyncPoint::GetInstance()->EnableProcessing();
3715
3716 ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
3717 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
3718 // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
3719 size_t tries = 0;
3720 while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
3721 ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
3722 tries++;
3723 };
3724 ASSERT_LT(tries, 50);
3725 // This is the case on which the test focuses
3726 ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3727 TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
3728 });
3729
3730 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3731 ReadOptions roptions;
3732 roptions.snapshot = snap;
3733 PinnableSlice value;
3734 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
3735 ASSERT_OK(s);
3736 // It should see the committed value of the evicted entry
3737 ASSERT_TRUE(value == last_value);
3738 db->ReleaseSnapshot(snap);
3739 });
3740
3741 read_thread.join();
3742 commit_thread.join();
3743 delete txn;
3744 ASSERT_OK(txn1->Commit());
3745 delete txn1;
3746 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3747 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3748 }
3749
3750 // Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
3751 // that. The test focuses on a race condition between AddPrepared and
3752 // AdvanceMaxEvictedSeq functions.
3753 TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
3754 if (!options.two_write_queues) {
3755 // This test is only for two write queues
3756 return;
3757 }
3758 const size_t snapshot_cache_bits = 7; // same as default
3759 // 1 entry to advance max after the 2nd commit
3760 const size_t commit_cache_bits = 0;
3761 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3762 ASSERT_OK(ReOpen());
3763 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
3764 std::string some_value("value_some");
3765 std::string uncommitted_value("value_uncommitted");
3766 // Prepare two uncommitted transactions
3767 Transaction* txn1 =
3768 db->BeginTransaction(WriteOptions(), TransactionOptions());
3769 ASSERT_OK(txn1->SetName("xid1"));
3770 ASSERT_OK(txn1->Put(Slice("key1"), some_value));
3771 ASSERT_OK(txn1->Prepare());
3772 Transaction* txn2 =
3773 db->BeginTransaction(WriteOptions(), TransactionOptions());
3774 ASSERT_OK(txn2->SetName("xid2"));
3775 ASSERT_OK(txn2->Put(Slice("key2"), some_value));
3776 ASSERT_OK(txn2->Prepare());
3777 // Start the txn here so the other thread could get its id
3778 Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
3779 ASSERT_OK(txn->SetName("xid"));
3780 ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
3781 port::Mutex txn_mutex_;
3782
3783 // t1) Insert prepared entry, t2) commit other entries to advance max
3784 // evicted sec and finish checking the existing prepared entries, t1)
3785 // AddPrepared, t2) update max_evicted_seq_
3786 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3787 {"AddPreparedCallback::AddPrepared::begin:pause",
3788 "AddPreparedBeforeMax::read_thread:start"},
3789 {"AdvanceMaxEvictedSeq::update_max:pause",
3790 "AddPreparedCallback::AddPrepared::begin:resume"},
3791 {"AddPreparedCallback::AddPrepared::end",
3792 "AdvanceMaxEvictedSeq::update_max:resume"},
3793 });
3794 SyncPoint::GetInstance()->EnableProcessing();
3795
3796 ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3797 txn_mutex_.Lock();
3798 ASSERT_OK(txn->Prepare());
3799 txn_mutex_.Unlock();
3800 });
3801
3802 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3803 TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
3804 // Publish seq number with a commit
3805 ASSERT_OK(txn1->Commit());
3806 // Since the commit cache size is one the 2nd commit evict the 1st one and
3807 // invokes AdcanceMaxEvictedSeq
3808 ASSERT_OK(txn2->Commit());
3809
3810 ReadOptions roptions;
3811 PinnableSlice value;
3812 // The snapshot should not see the uncommitted value from write_thread
3813 auto snap = db->GetSnapshot();
3814 ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
3815 // This is the scenario that we test for
3816 txn_mutex_.Lock();
3817 ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
3818 txn_mutex_.Unlock();
3819 roptions.snapshot = snap;
3820 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
3821 ASSERT_TRUE(s.IsNotFound());
3822 db->ReleaseSnapshot(snap);
3823 });
3824
3825 read_thread.join();
3826 write_thread.join();
3827 delete txn1;
3828 delete txn2;
3829 ASSERT_OK(txn->Commit());
3830 delete txn;
3831 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3832 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3833 }
3834
3835 // When an old prepared entry gets committed, there is a gap between the time
3836 // that it is published and when it is cleaned up from old_prepared_. This test
3837 // stresses such cases.
3838 TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
3839 const size_t snapshot_cache_bits = 7; // same as default
3840 for (const size_t commit_cache_bits : {0, 2, 3}) {
3841 for (const size_t sub_batch_cnt : {1, 2, 3}) {
3842 UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
3843 ASSERT_OK(ReOpen());
3844 std::atomic<const Snapshot*> snap = {nullptr};
3845 std::atomic<SequenceNumber> exp_prepare = {0};
3846 ROCKSDB_NAMESPACE::port::Thread callback_thread;
3847 // Value is synchronized via snap
3848 PinnableSlice value;
3849 // Take a snapshot after publish and before RemovePrepared:Start
3850 auto snap_callback = [&]() {
3851 ASSERT_EQ(nullptr, snap.load());
3852 snap.store(db->GetSnapshot());
3853 ReadOptions roptions;
3854 roptions.snapshot = snap.load();
3855 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
3856 ASSERT_OK(s);
3857 };
3858 auto callback = [&](void* param) {
3859 SequenceNumber prep_seq = *((SequenceNumber*)param);
3860 if (prep_seq == exp_prepare.load()) { // only for write_thread
3861 // We need to spawn a thread to avoid deadlock since getting a
3862 // snpashot might end up calling AdvanceSeqByOne which needs joining
3863 // the write queue.
3864 callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
3865 TEST_SYNC_POINT("callback:end");
3866 }
3867 };
3868 // Wait for the first snapshot be taken in GetSnapshotInternal. Although
3869 // it might be updated before GetSnapshotInternal finishes but this should
3870 // cover most of the cases.
3871 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3872 {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
3873 });
3874 SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
3875 SyncPoint::GetInstance()->EnableProcessing();
3876 // Thread to cause frequent evictions
3877 ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
3878 // Too many txns might cause commit_seq - prepare_seq in another thread
3879 // to go beyond DELTA_UPPERBOUND
3880 for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
3881 ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("value1")));
3882 }
3883 });
3884 ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3885 for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
3886 Transaction* txn =
3887 db->BeginTransaction(WriteOptions(), TransactionOptions());
3888 ASSERT_OK(txn->SetName("xid"));
3889 std::string val_str = "value" + std::to_string(i);
3890 for (size_t b = 0; b < sub_batch_cnt; b++) {
3891 ASSERT_OK(txn->Put(Slice("key2"), val_str));
3892 }
3893 ASSERT_OK(txn->Prepare());
3894 // Let an eviction to kick in
3895 std::this_thread::yield();
3896
3897 exp_prepare.store(txn->GetId());
3898 ASSERT_OK(txn->Commit());
3899 delete txn;
3900 // Wait for the snapshot taking that is triggered by
3901 // RemovePrepared:Start callback
3902 callback_thread.join();
3903
3904 // Read with the snapshot taken before delayed_prepared_ cleanup
3905 ReadOptions roptions;
3906 roptions.snapshot = snap.load();
3907 ASSERT_NE(nullptr, roptions.snapshot);
3908 PinnableSlice value2;
3909 auto s =
3910 db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
3911 ASSERT_OK(s);
3912 // It should see its own write
3913 ASSERT_TRUE(val_str == value2);
3914 // The value read by snapshot should not change
3915 ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());
3916
3917 db->ReleaseSnapshot(roptions.snapshot);
3918 snap.store(nullptr);
3919 }
3920 });
3921 write_thread.join();
3922 eviction_thread.join();
3923 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3924 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
3925 }
3926 }
3927 }
3928
3929 // Test that updating the commit map will not affect the existing snapshots
3930 TEST_P(WritePreparedTransactionTest, AtomicCommit) {
3931 for (bool skip_prepare : {true, false}) {
3932 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3933 {"WritePreparedTxnDB::AddCommitted:start",
3934 "AtomicCommit::GetSnapshot:start"},
3935 {"AtomicCommit::Get:end",
3936 "WritePreparedTxnDB::AddCommitted:start:pause"},
3937 {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
3938 {"AtomicCommit::Get2:end",
3939 "WritePreparedTxnDB::AddCommitted:end:pause:"},
3940 });
3941 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3942 ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
3943 if (skip_prepare) {
3944 ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
3945 } else {
3946 Transaction* txn =
3947 db->BeginTransaction(WriteOptions(), TransactionOptions());
3948 ASSERT_OK(txn->SetName("xid"));
3949 ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
3950 ASSERT_OK(txn->Prepare());
3951 ASSERT_OK(txn->Commit());
3952 delete txn;
3953 }
3954 });
3955 ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
3956 ReadOptions roptions;
3957 TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
3958 roptions.snapshot = db->GetSnapshot();
3959 PinnableSlice val;
3960 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
3961 TEST_SYNC_POINT("AtomicCommit::Get:end");
3962 TEST_SYNC_POINT("AtomicCommit::Get2:start");
3963 ASSERT_SAME(roptions, db, s, val, "key");
3964 TEST_SYNC_POINT("AtomicCommit::Get2:end");
3965 db->ReleaseSnapshot(roptions.snapshot);
3966 });
3967 read_thread.join();
3968 write_thread.join();
3969 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3970 }
3971 }
3972
3973 TEST_P(WritePreparedTransactionTest, BasicRollbackDeletionTypeCb) {
3974 options.level0_file_num_compaction_trigger = 2;
3975 // Always use SingleDelete to rollback Put.
3976 txn_db_options.rollback_deletion_type_callback =
3977 [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
3978
3979 const auto write_to_db = [&]() {
3980 assert(db);
3981 std::unique_ptr<Transaction> txn0(
3982 db->BeginTransaction(WriteOptions(), TransactionOptions()));
3983 ASSERT_OK(txn0->SetName("txn0"));
3984 ASSERT_OK(txn0->Put("a", "v0"));
3985 ASSERT_OK(txn0->Prepare());
3986
3987 // Generate sst1: [PUT('a')]
3988 ASSERT_OK(db->Flush(FlushOptions()));
3989
3990 {
3991 CompactRangeOptions cro;
3992 cro.change_level = true;
3993 cro.target_level = options.num_levels - 1;
3994 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
3995 ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
3996 }
3997
3998 ASSERT_OK(txn0->Rollback());
3999 txn0.reset();
4000
4001 ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
4002
4003 ASSERT_OK(db->SingleDelete(WriteOptions(), "a"));
4004 // Generate another SST with a SD to cover the oldest PUT('a')
4005 ASSERT_OK(db->Flush(FlushOptions()));
4006
4007 auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
4008 assert(dbimpl);
4009 ASSERT_OK(dbimpl->TEST_WaitForCompact());
4010
4011 {
4012 CompactRangeOptions cro;
4013 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
4014 ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
4015 }
4016
4017 {
4018 std::string value;
4019 const Status s = db->Get(ReadOptions(), "a", &value);
4020 ASSERT_TRUE(s.IsNotFound());
4021 }
4022 };
4023
4024 // Destroy and reopen
4025 ASSERT_OK(ReOpen());
4026 write_to_db();
4027 }
4028
4029 // Test that we can change write policy from WriteCommitted to WritePrepared
4030 // after a clean shutdown (which would empty the WAL)
4031 TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
4032 bool empty_wal = true;
4033 CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
4034 }
4035
4036 // Test that we fail fast if WAL is not emptied between changing the write
4037 // policy from WriteCommitted to WritePrepared
4038 TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
4039 bool empty_wal = true;
4040 CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
4041 }
4042
4043 // Test that we can change write policy from WritePrepare back to WriteCommitted
4044 // after a clean shutdown (which would empty the WAL)
4045 TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
4046 bool empty_wal = true;
4047 CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
4048 }
4049
4050 // Test that we fail fast if WAL is not emptied between changing the write
4051 // policy from WriteCommitted to WritePrepared
4052 TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
4053 bool empty_wal = true;
4054 CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
4055 }
4056
4057 } // namespace ROCKSDB_NAMESPACE
4058
4059 int main(int argc, char** argv) {
4060 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
4061 ::testing::InitGoogleTest(&argc, argv);
4062 if (getenv("CIRCLECI")) {
4063 // Looking for backtrace on "Resource temporarily unavailable" exceptions
4064 ::testing::FLAGS_gtest_catch_exceptions = false;
4065 }
4066 return RUN_ALL_TESTS();
4067 }
4068
4069 #else
4070 #include <stdio.h>
4071
// ROCKSDB_LITE build: there is nothing to run, so report the skip on stderr
// and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
4077
4078 #endif // ROCKSDB_LITE