// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/transaction_test.h"

#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <string>
#include <thread>

#include "db/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_prepared_txn_db.h"

#include "port/port.h"

using std::string;

namespace rocksdb {

using CommitEntry = WritePreparedTxnDB::CommitEntry;
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;

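// PreparedHeap is a min-heap of prepared sequence numbers that also supports
// erasing arbitrary elements. As the tests below exercise, erasing an element
// that is not currently the minimum is deferred and applied lazily, so top()
// and pop() transparently skip over pending erases.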
TEST(PreparedHeap, BasicsTest) {
  WritePreparedTxnDB::PreparedHeap heap;
  heap.push(14l);
  // Test with one element
  ASSERT_EQ(14l, heap.top());
  heap.push(24l);
  heap.push(34l);
  // Test that old min is still on top
  ASSERT_EQ(14l, heap.top());
  heap.push(13l);
  // Test that the new min will be on top
  ASSERT_EQ(13l, heap.top());
  // Test that it is persistent
  ASSERT_EQ(13l, heap.top());
  heap.push(44l);
  heap.push(54l);
  heap.push(64l);
  heap.push(74l);
  heap.push(84l);
  // Test that old min is still on top
  ASSERT_EQ(13l, heap.top());
  heap.erase(24l);
  // Test that old min is still on top
  ASSERT_EQ(13l, heap.top());
  heap.erase(14l);
  // Test that old min is still on top
  ASSERT_EQ(13l, heap.top());
  heap.erase(13l);
  // Test that the new min comes to the top after multiple erases
  ASSERT_EQ(34l, heap.top());
  heap.erase(34l);
  // Test that the new min comes to the top after a single erase
  ASSERT_EQ(44l, heap.top());
  heap.erase(54l);
  ASSERT_EQ(44l, heap.top());
  heap.pop();  // pop 44l
  // Test that the erased items are ignored after pop
  ASSERT_EQ(64l, heap.top());
  heap.erase(44l);
  // Test that erasing an already popped item works
  ASSERT_EQ(64l, heap.top());
  heap.erase(84l);
  ASSERT_EQ(64l, heap.top());
  heap.push(85l);
  heap.push(86l);
  heap.push(87l);
  heap.push(88l);
  heap.push(89l);
  heap.erase(87l);
  heap.erase(85l);
  heap.erase(89l);
  heap.erase(86l);
  heap.erase(88l);
  // Test that the top remains the same after a random order of many erases
  ASSERT_EQ(64l, heap.top());
  heap.pop();
  // Test that pop works with a series of random pending erases
  ASSERT_EQ(74l, heap.top());
  ASSERT_FALSE(heap.empty());
  heap.pop();
  // Test that empty works
  ASSERT_TRUE(heap.empty());
}

// This is a scenario reconstructed from a buggy trace. Test that the bug does
// not resurface.
TEST(PreparedHeap, EmptyAtTheEnd) {
  WritePreparedTxnDB::PreparedHeap heap;
  heap.push(40l);
  ASSERT_EQ(40l, heap.top());
  // Although not a recommended scenario, we must be resilient against erase
  // without a prior push.
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  heap.push(60l);
  ASSERT_EQ(40l, heap.top());

  heap.erase(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  ASSERT_TRUE(heap.empty());

  heap.push(40l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  heap.push(60l);
  ASSERT_EQ(40l, heap.top());

  heap.erase(40l);
  // Test that the erase has not emptied the heap (we had a bug doing that)
  ASSERT_FALSE(heap.empty());
  ASSERT_EQ(60l, heap.top());
  heap.erase(60l);
  ASSERT_TRUE(heap.empty());
}

// Generate a random order of PreparedHeap accesses and test that the heap is
// successfully emptied at the end.
TEST(PreparedHeap, Concurrent) {
  const size_t t_cnt = 10;
  rocksdb::port::Thread t[t_cnt];
  Random rnd(1103);
  WritePreparedTxnDB::PreparedHeap heap;
  port::RWMutex prepared_mutex;

  for (size_t n = 0; n < 100; n++) {
    for (size_t i = 0; i < t_cnt; i++) {
      // This is not recommended usage but we should be resilient against it.
      bool skip_push = rnd.OneIn(5);
      t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() {
        auto seq = i;
        std::this_thread::yield();
        if (!skip_push) {
          WriteLock wl(&prepared_mutex);
          heap.push(seq);
        }
        std::this_thread::yield();
        {
          WriteLock wl(&prepared_mutex);
          heap.erase(seq);
        }
      });
    }
    for (size_t i = 0; i < t_cnt; i++) {
      t[i].join();
    }
    ASSERT_TRUE(heap.empty());
  }
}

// Test that WriteBatchWithIndex correctly counts the number of sub-batches.
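// A sub-batch is the unit into which WritePrepared splits a write batch: a new
// sub-batch is cut whenever the batch is about to write a key it has already
// written, since a memtable cannot hold two entries for the same key at the
// same sequence number.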
TEST(WriteBatchWithIndex, SubBatchCnt) {
  ColumnFamilyOptions cf_options;
  std::string cf_name = "two";
  DB* db;
  Options options;
  options.create_if_missing = true;
  const std::string dbname = test::PerThreadDBPath("transaction_testdb");
  DestroyDB(dbname, options);
  ASSERT_OK(DB::Open(options, dbname, &db));
  ColumnFamilyHandle* cf_handle = nullptr;
  ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
  WriteOptions write_options;
  size_t batch_cnt = 1;
  size_t save_points = 0;
  std::vector<size_t> batch_cnt_at;
  WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
                            0);
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key"), Slice("value"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key2"), Slice("value2"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the keys
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key"), Slice("value3"));
  batch_cnt++;
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the 2nd key. It should not be counted as a duplicate since a
  // sub-batch is cut after the last duplicate.
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key2"), Slice("value4"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the keys but in a different cf. It should not be counted as
  // duplicate keys
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(cf_handle, Slice("key"), Slice("value5"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  // Test that the number of sub-batches matches what we count with
  // SubBatchCounter
  std::map<uint32_t, const Comparator*> comparators;
  comparators[0] = db->DefaultColumnFamily()->GetComparator();
  comparators[cf_handle->GetID()] = cf_handle->GetComparator();
  SubBatchCounter counter(comparators);
  ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
  ASSERT_EQ(batch_cnt, counter.BatchCount());

  // Test that RollbackToSavePoint properly resets the number of sub-batches
  for (size_t i = save_points; i > 0; i--) {
    batch.RollbackToSavePoint();
    ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
  }

  // Test that the count is right with random batches
  {
    const size_t TOTAL_KEYS = 20;  // small key space to cause a few duplicates
    Random rnd(1131);
    std::string keys[TOTAL_KEYS];
    for (size_t k = 0; k < TOTAL_KEYS; k++) {
      int len = static_cast<int>(rnd.Uniform(50));
      keys[k] = test::RandomKey(&rnd, len);
    }
    for (size_t i = 0; i < 1000; i++) {  // 1000 random batches
      WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
                                   0, true, 0);
      for (size_t k = 0; k < 10; k++) {  // 10 keys per batch
        size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
        Slice key = Slice(keys[ki]);
        std::string buffer;
        Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
        rndbatch.Put(key, value);
      }
      SubBatchCounter batch_counter(comparators);
      ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
      ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
    }
  }

  delete cf_handle;
  delete db;
}

TEST(CommitEntry64b, BasicTest) {
  const size_t INDEX_BITS = static_cast<size_t>(21);
  const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
  const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));
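  // Roughly, CommitEntry64b packs a (prepare seq, commit seq) pair into 64
  // bits: the low INDEX_BITS of the prepare seq are implied by the slot index
  // in the commit cache, and the stored word combines the remaining prepare
  // bits with the commit delta (commit seq - prepare seq). An all-zero word is
  // reserved to mean an empty slot, which the first check below verifies.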

  // zero-initialized CommitEntry64b should indicate an empty entry
  CommitEntry64b empty_entry64b;
  uint64_t empty_index = 11ul;
  CommitEntry empty_entry;
  bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
  ASSERT_FALSE(ok);

  // the zero entry is reserved for uninitialized entries
  const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
  // Samples over the numbers that are covered by that many index bits
  std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
  // Samples over the numbers that are covered by that many commit bits
  std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
  // Iterate over prepare numbers that i) cover all bits of a sequence
  // number, and ii) include some bits that fall into the range of index or
  // commit bits
  for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
    for (uint64_t i : is) {
      for (uint64_t d : ds) {
        uint64_t p = base + i + d;
        for (uint64_t c : {p, p + d / 2, p + d}) {
          uint64_t index = p % INDEX_SIZE;
          CommitEntry before(p, c), after;
          CommitEntry64b entry64b(before, FORMAT);
          ok = entry64b.Parse(index, &after, FORMAT);
          ASSERT_TRUE(ok);
          if (!(before == after)) {
            printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
                   " c %" PRIu64 " index %" PRIu64 "\n",
                   base, i, d, p, c, index);
          }
          ASSERT_EQ(before, after);
        }
      }
    }
  }
}

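// A mock of WritePreparedTxnDB that lets tests inject the snapshot list
// instead of reading it from the DB, and exposes the constructor overloads
// that shrink the snapshot and commit caches to test-friendly sizes.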
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
 public:
  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
      : WritePreparedTxnDB(db_impl, opt) {}
  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
                         size_t snapshot_cache_size)
      : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
                         size_t snapshot_cache_size, size_t commit_cache_size)
      : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
                           commit_cache_size) {}
  void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
    snapshots_ = snapshots;
  }
  void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }

 protected:
  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber /* unused */) override {
    return snapshots_;
  }

 private:
  std::vector<SequenceNumber> snapshots_;
};

class WritePreparedTransactionTestBase : public TransactionTestBase {
 public:
  WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
                                   TxnDBWritePolicy write_policy)
      : TransactionTestBase(use_stackable_db, two_write_queue, write_policy) {}

 protected:
  // If expect_update is set, check if it actually updated old_commit_map_. If
  // it did not and yet suggested not to check the next snapshot, do the
  // opposite to check if it was not a bad suggestion.
  void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
                                           uint64_t snapshot,
                                           uint64_t next_snapshot,
                                           bool expect_update) {
    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
    // reset old_commit_map_empty_ so that its value indicates whether
    // old_commit_map_ was updated
    wp_db->old_commit_map_empty_ = true;
    bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
                                                     snapshot < next_snapshot);
    if (expect_update == wp_db->old_commit_map_empty_) {
      printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
             " next: %" PRIu64 "\n",
             prepare, commit, snapshot, next_snapshot);
    }
    EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
    if (!check_next && wp_db->old_commit_map_empty_) {
      // do the opposite to make sure it was not a bad suggestion
      const bool dont_care_bool = true;
      wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
                                     dont_care_bool);
      if (!wp_db->old_commit_map_empty_) {
        printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
               " next: %" PRIu64 "\n",
               prepare, commit, snapshot, next_snapshot);
      }
      EXPECT_TRUE(wp_db->old_commit_map_empty_);
    }
  }

  // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
  // miss a snapshot because of a concurrent update by UpdateSnapshots that is
  // writing new_snapshots. Both threads are paused at two points each; the
  // sync points to enforce them are specified by a1, a2, b1, and b2. The
  // CommitEntry entry is expected to be vital for one of the snapshots that is
  // common between the old and new lists of snapshots.
  void SnapshotConcurrentAccessTestInternal(
      WritePreparedTxnDB* wp_db,
      const std::vector<SequenceNumber>& old_snapshots,
      const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
      SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
    // First reset the snapshot list
    const std::vector<SequenceNumber> empty_snapshots;
    wp_db->old_commit_map_empty_ = true;
    wp_db->UpdateSnapshots(empty_snapshots, ++version);
    // Then initialize it with the old_snapshots
    wp_db->UpdateSnapshots(old_snapshots, ++version);

    // Starting from the first thread, cut each thread at two points
    rocksdb::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
         "WritePreparedTxnDB::UpdateSnapshots:s:start"},
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
    });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    {
      ASSERT_TRUE(wp_db->old_commit_map_empty_);
      rocksdb::port::Thread t1(
          [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
      rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
      t1.join();
      t2.join();
      ASSERT_FALSE(wp_db->old_commit_map_empty_);
    }
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();

    wp_db->old_commit_map_empty_ = true;
    wp_db->UpdateSnapshots(empty_snapshots, ++version);
    wp_db->UpdateSnapshots(old_snapshots, ++version);
    // Starting from the second thread, cut each thread at two points
    rocksdb::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
        {"WritePreparedTxnDB::UpdateSnapshots:p:end",
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
    });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    {
      ASSERT_TRUE(wp_db->old_commit_map_empty_);
      rocksdb::port::Thread t1(
          [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
      rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
      t1.join();
      t2.join();
      ASSERT_FALSE(wp_db->old_commit_map_empty_);
    }
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  }

  // Verify the values of the given keys.
  void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
                  const Snapshot* snapshot = nullptr) {
    std::string value;
    ReadOptions read_options;
    read_options.snapshot = snapshot;
    for (auto& kv : data) {
      auto s = db->Get(read_options, kv.first, &value);
      ASSERT_TRUE(s.ok() || s.IsNotFound());
      if (s.ok()) {
        if (kv.second != value) {
          printf("key = %s\n", kv.first.c_str());
        }
        ASSERT_EQ(kv.second, value);
      } else {
        ASSERT_EQ(kv.second, "NOT_FOUND");
      }

      // Try with the MultiGet API too
      std::vector<std::string> values;
      auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
                                {kv.first}, &values);
      ASSERT_EQ(1, values.size());
      ASSERT_EQ(1, s_vec.size());
      s = s_vec[0];
      ASSERT_TRUE(s.ok() || s.IsNotFound());
      if (s.ok()) {
        ASSERT_TRUE(kv.second == values[0]);
      } else {
        ASSERT_EQ(kv.second, "NOT_FOUND");
      }
    }
  }

  // Verify all versions of the given keys.
  void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
    std::vector<KeyVersion> versions;
    const size_t kMaxKeys = 100000;
    ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
                                expected_versions.back().user_key, kMaxKeys,
                                &versions));
    ASSERT_EQ(expected_versions.size(), versions.size());
    for (size_t i = 0; i < versions.size(); i++) {
      ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
      ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
      ASSERT_EQ(expected_versions[i].type, versions[i].type);
      if (versions[i].type != kTypeDeletion &&
          versions[i].type != kTypeSingleDeletion) {
        ASSERT_EQ(expected_versions[i].value, versions[i].value);
      }
      // Range delete is not supported.
      assert(expected_versions[i].type != kTypeRangeDeletion);
    }
  }
};

class WritePreparedTransactionTest
    : public WritePreparedTransactionTestBase,
      virtual public ::testing::WithParamInterface<
          std::tuple<bool, bool, TxnDBWritePolicy>> {
 public:
  WritePreparedTransactionTest()
      : WritePreparedTransactionTestBase(std::get<0>(GetParam()),
                                         std::get<1>(GetParam()),
                                         std::get<2>(GetParam())) {}
};

#ifndef ROCKSDB_VALGRIND_RUN
class SnapshotConcurrentAccessTest
    : public WritePreparedTransactionTestBase,
      virtual public ::testing::WithParamInterface<
          std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> {
 public:
  SnapshotConcurrentAccessTest()
      : WritePreparedTransactionTestBase(std::get<0>(GetParam()),
                                         std::get<1>(GetParam()),
                                         std::get<2>(GetParam())),
        split_id_(std::get<3>(GetParam())),
        split_cnt_(std::get<4>(GetParam())) {}

 protected:
  // A test is split into split_cnt_ tests, each identified with split_id_
  // where 0 <= split_id_ < split_cnt_
  size_t split_id_;
  size_t split_cnt_;
};
#endif  // ROCKSDB_VALGRIND_RUN

class SeqAdvanceConcurrentTest
    : public WritePreparedTransactionTestBase,
      virtual public ::testing::WithParamInterface<
          std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> {
 public:
  SeqAdvanceConcurrentTest()
      : WritePreparedTransactionTestBase(std::get<0>(GetParam()),
                                         std::get<1>(GetParam()),
                                         std::get<2>(GetParam())),
        split_id_(std::get<3>(GetParam())),
        split_cnt_(std::get<4>(GetParam())) {}

 protected:
  // A test is split into split_cnt_ tests, each identified with split_id_
  // where 0 <= split_id_ < split_cnt_
  size_t split_id_;
  size_t split_cnt_;
};

INSTANTIATE_TEST_CASE_P(
    WritePreparedTransactionTest, WritePreparedTransactionTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED),
                      std::make_tuple(false, true, WRITE_PREPARED)));

#ifndef ROCKSDB_VALGRIND_RUN
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SnapshotConcurrentAccessTest,
    ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 1, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 2, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 3, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 4, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 5, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 6, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 7, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 8, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 9, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 10, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 11, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 12, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 13, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 14, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 15, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 16, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 17, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 18, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 19, 20)));

INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SnapshotConcurrentAccessTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 1, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 2, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 3, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 4, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 5, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 6, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 7, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 8, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 9, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 10, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 11, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 12, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 13, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 14, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 15, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 16, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 17, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 18, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 19, 20)));

INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 1, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 2, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 3, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 4, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 5, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 6, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 7, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 8, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 9, 10)));

INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 1, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 2, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 3, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 4, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 5, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 6, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 7, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 8, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 9, 10)));
#endif  // ROCKSDB_VALGRIND_RUN

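// The commit cache maps prep_seq % COMMIT_CACHE_SIZE to a packed
// CommitEntry64b; adding to an occupied slot evicts the previous entry, which
// is what advances max_evicted_seq_. The test below exercises add, get, and
// exchange on that structure.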
TEST_P(WritePreparedTransactionTest, CommitMapTest) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  assert(wp_db);
  assert(wp_db->db_impl_);
  size_t size = wp_db->COMMIT_CACHE_SIZE;
  CommitEntry c = {5, 12}, e;
  bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
  ASSERT_FALSE(evicted);

  // Should be able to read the same value
  CommitEntry64b dont_care;
  bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);
  // Should be able to distinguish between overlapping entries
  found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_NE(c.prep_seq + size, e.prep_seq);
  // Should be able to detect a non-existent entry
  found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
  ASSERT_FALSE(found);

  // Reject an invalid exchange
  CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
  CommitEntry64b e2_64b(e2, wp_db->FORMAT);
  bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
  ASSERT_FALSE(exchanged);
  // check that the exchange was actually rejected
  found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);

  // Accept a valid exchange
  CommitEntry64b c_64b(c, wp_db->FORMAT);
  CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
  exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
  ASSERT_TRUE(exchanged);
  // check that the exchange was actually accepted
  found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e3, e);

  // Rewrite an entry
  CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
  evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
  ASSERT_TRUE(evicted);
  ASSERT_EQ(e3, e);
  found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e4, e);
}

TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
  // If prepare <= snapshot < commit we should keep the entry around since its
  // absence could be interpreted as the txn being committed in the snapshot,
  // which is not true. We keep such entries around by adding them to the
  // old_commit_map_.
  uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
  p = 10l, c = 15l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  // If we do not expect the old commit map to be updated, try also with a next
  // snapshot that is expected to update the old commit map. This would test
  // that MaybeUpdateOldCommitMap would not prevent us from checking the next
  // snapshot that must be checked.
  p = 10l, c = 15l, s = 20l, ns = 11l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);

  p = 10l, c = 20l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 10l, c = 20l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);

  p = 20l, c = 20l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 20l, c = 20l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);

  p = 10l, c = 25l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);

  p = 20l, c = 25l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);

  p = 21l, c = 25l, s = 20l, ns = 22l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 21l, c = 25l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
}

// Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
  const size_t snapshot_cache_bits = 0;
  const size_t commit_cache_bits = 0;
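  // With zero cache bits each cache holds a single entry (1 << 0), so every
  // AddCommitted evicts the previous entry and max_evicted_seq_ advances
  // quickly.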
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
      mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));

  SequenceNumber seq = 0;
  // Take the first snapshot that overlaps with two txns
  auto prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto prep_seq2 = ++seq;
  wp_db->AddPrepared(prep_seq2);
  auto snap_seq1 = seq;
  wp_db->TakeSnapshot(snap_seq1);
  auto commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  auto commit_seq2 = ++seq;
  wp_db->AddCommitted(prep_seq2, commit_seq2);
  wp_db->RemovePrepared(prep_seq2);
  // Take the 2nd and 3rd snapshots that overlap with the same txn
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto snap_seq2 = seq;
  wp_db->TakeSnapshot(snap_seq2);
  seq++;
  auto snap_seq3 = seq;
  wp_db->TakeSnapshot(snap_seq3);
  seq++;
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  // Make sure max_evicted_seq_ will be larger than the 2nd snapshot by
  // evicting the only item in the commit_cache_ via another commit.
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);

  // Verify that the evicted commit entries for all snapshots are in the
  // old_commit_map_
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(3, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
  }

  // Verify that the 2nd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq2);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(2, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
  }

  // Verify that the 1st snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq1);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(1, wp_db->old_commit_map_.size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
  }

  // Verify that the 3rd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq3);
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(0, wp_db->old_commit_map_.size());
  }
}

TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
  std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                           600l, 700l, 800l, 900l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshot list changes.
  assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
  SequenceNumber version = 1000l;
  ASSERT_EQ(0, wp_db->snapshots_total_);
  wp_db->UpdateSnapshots(snapshots, version);
  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
  // seq numbers are chosen so that we have two of them between every two
  // snapshots. If the diff of two consecutive seqs is more than 5, there is a
  // snapshot between them.
  std::vector<SequenceNumber> seqs = {50l,  55l,  150l, 155l, 250l, 255l, 350l,
                                      355l, 450l, 455l, 550l, 555l, 650l, 655l,
                                      750l, 755l, 850l, 855l, 950l, 955l};
  assert(seqs.size() > 1);
  for (size_t i = 0; i < seqs.size() - 1; i++) {
    wp_db->old_commit_map_empty_ = true;  // reset
    CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
    wp_db->CheckAgainstSnapshots(commit_entry);
    // Expect an update if there is a snapshot between the prepare and commit
    bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
                         commit_entry.commit_seq >= snapshots.front() &&
                         commit_entry.prep_seq <= snapshots.back();
    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
  }
}

// This test is too slow for travis
#ifndef TRAVIS
#ifndef ROCKSDB_VALGRIND_RUN
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run
// in parallel with UpdateSnapshots.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) {
  // We have a sync point in the method under test after checking each
  // snapshot. If you increase the max number of snapshots in this test, more
  // sync points in the methods must also be added.
  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
                                                 60l, 70l, 80l, 90l, 100l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshot list changes.
  assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
  SequenceNumber version = 1000l;
  // Choose the cache size so that the new snapshot list could replace all the
  // existing items in the cache and also have some overflow.
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
  const size_t extra = 2;
  size_t loop_id = 0;
  // Add up to extra items that do not fit into the cache
  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
       old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);

    // Each member of the old snapshot list might or might not appear in the
    // new list. We create a common_snapshots for each combination.
    size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      if (loop_id % split_cnt_ != split_id_) continue;
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t i = 0; i < old_snapshots.size(); i++) {
        if (IsInCombination(i, new_comb)) {
          common_snapshots.push_back(old_snapshots[i]);
        }
      }
      // And add some new snapshots to the common list
      for (size_t added_snapshots = 0;
           added_snapshots <= snapshots.size() - old_snapshots.size();
           added_snapshots++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        for (size_t i = 0; i < added_snapshots; i++) {
          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
        }
        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
             it++) {
          auto snapshot = *it;
          // Create a commit entry that is around the snapshot and thus should
          // not be discarded
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // The critical part is when iterating the snapshot cache.
          // Afterwards, we are operating under the lock
          size_t a_range =
              std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          size_t b_range =
              std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          // Break each thread at two points
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
#endif  // ROCKSDB_VALGRIND_RUN
#endif  // TRAVIS

// This test clarifies the contract of the AdvanceMaxEvictedSeq method.
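// The contract, as exercised below: after AdvanceMaxEvictedSeq(from, to),
// max_evicted_seq_ equals to, every prepared txn with seq <= to has moved from
// prepared_txns_ to delayed_prepared_, and the snapshots at or below to are
// fetched from the DB and cached.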
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  // 1. Set the initial values for max, prepared, and snapshots
  SequenceNumber zero_max = 0l;
  // Set the initial list of prepared txns
  const std::vector<SequenceNumber> initial_prepared = {10,  30,  50, 100,
                                                        150, 200, 250};
  for (auto p : initial_prepared) {
    wp_db->AddPrepared(p);
  }
  // This updates the max value and also sets the old prepared txns
  SequenceNumber init_max = 100;
  wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
  const std::vector<SequenceNumber> initial_snapshots = {20, 40};
  wp_db->SetDBSnapshots(initial_snapshots);
  // This will update the internal cache of snapshots from the DB
  wp_db->UpdateSnapshots(initial_snapshots, init_max);

  // 2. Invoke AdvanceMaxEvictedSeq
  const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
  wp_db->SetDBSnapshots(latest_snapshots);
  SequenceNumber new_max = 200;
  wp_db->AdvanceMaxEvictedSeq(init_max, new_max);

  // 3. Verify that the state matches the AdvanceMaxEvictedSeq contract
  // a. max should be updated to new_max
  ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
  // b. delayed prepared should contain every txn <= max and prepared should
  // only contain txns > max
  auto it = initial_prepared.begin();
  for (; it != initial_prepared.end() && *it <= new_max; it++) {
    ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
  }
  ASSERT_TRUE(wp_db->delayed_prepared_.empty());
  for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
       it++, wp_db->prepared_txns_.pop()) {
    ASSERT_EQ(*it, wp_db->prepared_txns_.top());
  }
  ASSERT_TRUE(it == initial_prepared.end());
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  // c. snapshots should contain everything below new_max
  auto sit = latest_snapshots.begin();
  for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
                     i < wp_db->snapshots_total_;
       sit++, i++) {
    ASSERT_TRUE(i < wp_db->snapshots_total_);
    // This test is small in scale and the list of snapshots is assumed to be
    // within the cache size limit. This is just a safety check to double check
    // that assumption.
    ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
    ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
  }
}

// This tests that transactions with duplicate keys perform correctly after max
// advances past their prepared sequence numbers. This will not be the case if,
// for example, the txn does not add the prepared seq for the second sub-batch
// to the PrepareHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
  WriteOptions write_options;
  TransactionOptions txn_options;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
  ASSERT_OK(txn0->Prepare());

  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Ensure that all the prepared sequence numbers will be removed from the
  // PrepareHeap.
  SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
  wp_db->AdvanceMaxEvictedSeq(0, new_max);

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  delete txn0;

  wp_db->db_impl_->FlushWAL(true);
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  wp_db->AdvanceMaxEvictedSeq(0, new_max);
  s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());

  txn0 = db->GetTransactionByName("xid");
  ASSERT_OK(txn0->Rollback());
  delete txn0;
}

TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
  // Given the sequential run of txns, with this timeout we should never see a
  // deadlock nor a timeout unless we have a key conflict, which should be
  // almost impossible.
  txn_db_options.transaction_lock_timeout = 1000;
  txn_db_options.default_lock_timeout = 1000;
  ReOpen();
  FlushOptions fopt;

  // Number of different txn types we use in this test
  const size_t type_cnt = 5;
  // The size of the first write group
  // TODO(myabandeh): This should be increased for pre-release tests
  const size_t first_group_size = 2;
  // Total number of txns we run in each test
  // TODO(myabandeh): This should be increased for pre-release tests
  const size_t txn_cnt = first_group_size + 1;

  size_t base[txn_cnt + 1] = {
      1,
  };
  for (size_t bi = 1; bi <= txn_cnt; bi++) {
    base[bi] = base[bi - 1] * type_cnt;
  }
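  // base[bi] equals type_cnt^bi, so (n % base[bi + 1]) / base[bi] below
  // extracts the bi-th digit of n written in base type_cnt. Each n in
  // [0, type_cnt^txn_cnt) thus enumerates one assignment of txn types to the
  // txn_cnt threads.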
  const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
  printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
  for (size_t n = 0; n < max_n; n++, ReOpen()) {
    if (n % split_cnt_ != split_id_) continue;
    if (n % 1000 == 0) {
      printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
    }
    DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
    auto seq = db_impl->TEST_GetLastVisibleSequence();
    exp_seq = seq;
    // This is increased before writing the batch for commit
    commit_writes = 0;
    // This is increased before a txn starts linking if it expects to do a
    // commit eventually
    expected_commits = 0;
    std::vector<port::Thread> threads;

    linked = 0;
    std::atomic<bool> batch_formed(false);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::EnterAsBatchGroupLeader:End",
        [&](void* /*arg*/) { batch_formed = true; });
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
          linked++;
          if (linked == 1) {
            // Wait until the others are linked too.
            while (linked < first_group_size) {
            }
          } else if (linked == 1 + first_group_size) {
            // Make the 2nd batch of the rest of the writes plus any follow-up
            // commits from the first batch
            while (linked < txn_cnt + commit_writes) {
            }
          }
          // Then we will have one or more batches consisting of follow-up
          // commits from the 2nd batch. There is a bit of non-determinism here
          // but it should be tolerable.
        });

    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    for (size_t bi = 0; bi < txn_cnt; bi++) {
      // get the bi-th digit of n in the base-type_cnt number system
      size_t d = (n % base[bi + 1]) / base[bi];
      switch (d) {
        case 0:
          threads.emplace_back(txn_t0, bi);
          break;
        case 1:
          threads.emplace_back(txn_t1, bi);
          break;
        case 2:
          threads.emplace_back(txn_t2, bi);
          break;
        case 3:
          threads.emplace_back(txn_t3, bi);
          break;
        case 4:
          threads.emplace_back(txn_t4, bi);
          break;
        default:
          assert(false);
      }
      // wait to be linked
      while (linked.load() <= bi) {
      }
      // after a queue of size first_group_size
      if (bi + 1 == first_group_size) {
        while (!batch_formed) {
        }
        // to make it more deterministic, wait until the commits are linked
        while (linked.load() <= bi + expected_commits) {
        }
      }
    }
    for (auto& t : threads) {
      t.join();
    }
    if (options.two_write_queues) {
      // In this case none of the above scheduling tricks to deterministically
      // form merged batches work because the writes go to separate queues.
      // This would result in different write groups in each run of the test.
      // We still keep the test since, although non-deterministic and hard to
      // debug, it is still useful to have.
      // TODO(myabandeh): Add a deterministic unit test for two_write_queues
    }

    // Check if memtable inserts advanced the seq number as expected
    seq = db_impl->TEST_GetLastVisibleSequence();
    ASSERT_EQ(exp_seq, seq);

    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

    // Check if recovery preserves the last sequence number
    db_impl->FlushWAL(true);
    ReOpenNoDelete();
    assert(db != nullptr);
    db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
    seq = db_impl->TEST_GetLastVisibleSequence();
    ASSERT_EQ(exp_seq, seq);

    // Check if flush preserves the last sequence number
    db_impl->Flush(fopt);
    seq = db_impl->GetLatestSequenceNumber();
    ASSERT_EQ(exp_seq, seq);

    // Check if recovery after flush preserves the last sequence number
    db_impl->FlushWAL(true);
    ReOpenNoDelete();
    assert(db != nullptr);
    db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
    seq = db_impl->GetLatestSequenceNumber();
    ASSERT_EQ(exp_seq, seq);
  }
}

// Run a couple of different txns, some of them uncommitted. Restart the db at
// a couple of points to check whether the list of uncommitted txns is
// recovered properly.
TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
  options.disable_auto_compactions = true;
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  txn_t0(0);

  TransactionOptions txn_options;
  WriteOptions write_options;
  size_t index = 1000;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  auto istr0 = std::to_string(index);
  auto s = txn0->SetName("xid" + istr0);
  ASSERT_OK(s);
  s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
  ASSERT_OK(s);
  s = txn0->Prepare();
  auto prep_seq_0 = txn0->GetId();

  txn_t1(0);

  index++;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  auto istr1 = std::to_string(index);
  s = txn1->SetName("xid" + istr1);
  ASSERT_OK(s);
  s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  auto prep_seq_1 = txn1->GetId();

  txn_t2(0);

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  // Check that the value is not committed before restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  delete txn0;
  delete txn1;
  wp_db->db_impl_->FlushWAL(true);
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // After recovery, all the uncommitted txns (0 and 1) should be inserted into
  // delayed_prepared_
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
                wp_db->delayed_prepared_.end());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
                wp_db->delayed_prepared_.end());
  }

  // Check that the value is still not committed after restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  txn_t3(0);

  // Test that recovered txns will be properly marked committed for the next
  // recovery
  txn1 = db->GetTransactionByName("xid" + istr1);
  ASSERT_NE(txn1, nullptr);
  txn1->Commit();
  delete txn1;

  index++;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  auto istr2 = std::to_string(index);
  s = txn2->SetName("xid" + istr2);
  ASSERT_OK(s);
  s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
  ASSERT_OK(s);
  s = txn2->Prepare();
  auto prep_seq_2 = txn2->GetId();

  delete txn2;
  wp_db->db_impl_->FlushWAL(true);
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);

  // 0 and 2 are prepared and 1 is committed
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    const auto& end = wp_db->delayed_prepared_.end();
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
    ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
  }
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);

  // Commit all the remaining txns
  txn0 = db->GetTransactionByName("xid" + istr0);
  ASSERT_NE(txn0, nullptr);
  txn0->Commit();
  txn2 = db->GetTransactionByName("xid" + istr2);
  ASSERT_NE(txn2, nullptr);
  txn2->Commit();

  // Check that the value is committed after commit
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();

  delete txn0;
  delete txn2;
  wp_db->db_impl_->FlushWAL(true);
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_TRUE(wp_db->delayed_prepared_empty_);

  // Check that the value is still committed after recovery
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();
}

// After recovery the commit map is empty while the max is set. The code would
// go through a different path which requires a separate test.
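// With an empty commit map, any seq <= max_evicted_seq_ is assumed committed
// with a commit seq equal to its prepare seq, so visibility reduces to
// seq <= snapshot, which is what the assertions below verify.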
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  wp_db->max_evicted_seq_ = 100;
  ASSERT_FALSE(wp_db->IsInSnapshot(50, 40));
  ASSERT_TRUE(wp_db->IsInSnapshot(50, 50));
  ASSERT_TRUE(wp_db->IsInSnapshot(50, 100));
  ASSERT_TRUE(wp_db->IsInSnapshot(50, 150));
  ASSERT_FALSE(wp_db->IsInSnapshot(100, 80));
  ASSERT_TRUE(wp_db->IsInSnapshot(100, 100));
  ASSERT_TRUE(wp_db->IsInSnapshot(100, 150));
}
1323
1324 // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
1325 // snapshot, max_committed_seq_, prepared, and commit entries.
1326 TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
1327 WriteOptions wo;
1328 // Use small commit cache to trigger lots of eviction and fast advance of
1329 // max_evicted_seq_
1330 const size_t commit_cache_bits = 3;
1331 // Same for snapshot cache size
1332 const size_t snapshot_cache_bits = 2;
1333
1334 // Take some preliminary snapshots first. This is to stress the data structure
1335 // that holds the old snapshots as it will be designed to be efficient when
1336 // only a few snapshots are below the max_evicted_seq_.
1337 for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
1338 // Leave some gap between the preliminary snapshots and the final snapshot
1339 // that we check. This should test for also different overlapping scenarios
1340 // between the last snapshot and the commits.
1341 for (int max_gap = 1; max_gap < 10; max_gap++) {
1342 // Since we do not actually write to db, we mock the seq as it would be
1343 // increased by the db. The only exception is that we need db seq to
1344 // advance for our snapshots. for which we apply a dummy put each time we
1345 // increase our mock of seq.
1346 uint64_t seq = 0;
1347 // At each step we prepare a txn and then we commit it in the next txn.
1348 // This emulates the consecutive transactions that write to the same key
1349 uint64_t cur_txn = 0;
1350 // Number of snapshots taken so far
1351 int num_snapshots = 0;
1352 // Number of gaps applied so far
1353 int gap_cnt = 0;
1354 // The final snapshot that we will inspect
1355 uint64_t snapshot = 0;
1356 bool found_committed = false;
1357 // To stress the data structure that maintain prepared txns, at each cycle
1358 // we add a new prepare txn. These do not mean to be committed for
1359 // snapshot inspection.
1360 std::set<uint64_t> prepared;
1361 // We keep the list of txns committed before we take the last snapshot.
1362 // These should be the only seq numbers that will be found in the snapshot
1363 std::set<uint64_t> committed_before;
1364 // The set of commit seq numbers to be excluded from IsInSnapshot queries
1365 std::set<uint64_t> commit_seqs;
1366 DBImpl* mock_db = new DBImpl(options, dbname);
1367 std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
1368 mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
1369 // We continue until max advances a bit beyond the snapshot.
1370 while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
1371 // do prepare for a transaction
1372 seq++;
1373 wp_db->AddPrepared(seq);
1374 prepared.insert(seq);
1375
1376 // If cur_txn is not started, do prepare for it.
1377 if (!cur_txn) {
1378 seq++;
1379 cur_txn = seq;
1380 wp_db->AddPrepared(cur_txn);
1381 } else { // else commit it
1382 seq++;
1383 wp_db->AddCommitted(cur_txn, seq);
1384 wp_db->RemovePrepared(cur_txn);
1385 commit_seqs.insert(seq);
1386 if (!snapshot) {
1387 committed_before.insert(cur_txn);
1388 }
1389 cur_txn = 0;
1390 }
1391
1392 if (num_snapshots < max_snapshots - 1) {
1393 // Take preliminary snapshots
1394 wp_db->TakeSnapshot(seq);
1395 num_snapshots++;
1396 } else if (gap_cnt < max_gap) {
1397 // Wait for some gap before taking the final snapshot
1398 gap_cnt++;
1399 } else if (!snapshot) {
1400 // Take the final snapshot if it is not already taken
1401 snapshot = seq;
1402 wp_db->TakeSnapshot(snapshot);
1403 num_snapshots++;
1404 }
1405
1406 // If the snapshot is taken, verify seq numbers visible to it. We redo
1407 // it at each cycle to test that the system is still sound when
1408 // max_evicted_seq_ advances.
1409 if (snapshot) {
1410 for (uint64_t s = 1;
1411 s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
1412 bool was_committed =
1413 (committed_before.find(s) != committed_before.end());
1414 bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
1415 if (was_committed != is_in_snapshot) {
1416 printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
1417 " snapshot %" PRIu64
1418 " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
1419 max_snapshots, max_gap, seq,
1420 wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
1421 num_snapshots, s);
1422 }
1423 ASSERT_EQ(was_committed, is_in_snapshot);
1424 found_committed = found_committed || is_in_snapshot;
1425 }
1426 }
1427 }
1428 // Safety check to make sure the test actually ran
1429 ASSERT_TRUE(found_committed);
1430 // As an extra check, check if prepared set will be properly empty after
1431 // they are committed.
1432 if (cur_txn) {
1433 wp_db->AddCommitted(cur_txn, seq);
1434 wp_db->RemovePrepared(cur_txn);
1435 }
1436 for (auto p : prepared) {
1437 wp_db->AddCommitted(p, seq);
1438 wp_db->RemovePrepared(p);
1439 }
1440 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
1441 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1442 }
1443 }
1444 }
1445
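// A minimal sketch of the visibility rule that the loop above cross-checks
// against IsInSnapshot(): a write is visible to a snapshot iff it has been
// committed and its commit seq is <= the snapshot seq. The helper name and
// the 0-means-uncommitted encoding are illustrative only, not part of the
// WritePreparedTxnDB API.
static inline bool ExpectedVisible(uint64_t commit_seq,
                                   uint64_t snapshot_seq) {
  // commit_seq == 0 encodes "not committed yet" in this sketch.
  return commit_seq != 0 && commit_seq <= snapshot_seq;
}
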
1446 void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
1447 PinnableSlice& exp_v, Slice key) {
1448 Status s;
1449 PinnableSlice v;
1450 s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
1451 ASSERT_TRUE(exp_s == s);
1452 ASSERT_TRUE(s.ok() || s.IsNotFound());
1453 if (s.ok()) {
1454 ASSERT_TRUE(exp_v == v);
1455 }
1456
1457 // Try with MultiGet API too
1458 std::vector<std::string> values;
1459 auto s_vec =
1460 db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values);
1461 ASSERT_EQ(1, values.size());
1462 ASSERT_EQ(1, s_vec.size());
1463 s = s_vec[0];
1464 ASSERT_TRUE(exp_s == s);
1465 ASSERT_TRUE(s.ok() || s.IsNotFound());
1466 if (s.ok()) {
1467 ASSERT_TRUE(exp_v == values[0]);
1468 }
1469 }
1470
1471 void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
1472 Slice key) {
1473 ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
1474 }
1475
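// Typical use of ASSERT_SAME: capture the (status, value) view of a key
// once, then re-assert that a concurrent prepare, crash/recovery, or
// rollback did not change it. A sketch (the key name is illustrative):
//
//   PinnableSlice v;
//   Status s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "k", &v);
//   // ... prepare a txn that writes "k", crash and recover, or roll back ...
//   ASSERT_SAME(db, s, v, "k");  // the uncommitted write stays invisible
//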
1476 TEST_P(WritePreparedTransactionTest, RollbackTest) {
1477 ReadOptions roptions;
1478 WriteOptions woptions;
1479 TransactionOptions txn_options;
1480 const size_t num_keys = 4;
1481 const size_t num_values = 5;
1482 for (size_t ikey = 1; ikey <= num_keys; ikey++) {
1483 for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
1484 for (bool crash : {false, true}) {
1485 ReOpen();
1486 WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1487 std::string key_str = "key" + ToString(ikey);
1488 switch (ivalue) {
1489 case 0:
1490 break;
1491 case 1:
1492 ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
1493 break;
1494 case 2:
1495 ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
1496 break;
1497 case 3:
1498 ASSERT_OK(db->Delete(woptions, key_str));
1499 break;
1500 case 4:
1501 ASSERT_OK(db->SingleDelete(woptions, key_str));
1502 break;
1503 default:
1504 assert(0);
1505 }
1506
1507 PinnableSlice v1;
1508 auto s1 =
1509 db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
1510 PinnableSlice v2;
1511 auto s2 =
1512 db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
1513 PinnableSlice v3;
1514 auto s3 =
1515 db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
1516 PinnableSlice v4;
1517 auto s4 =
1518 db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
1519 Transaction* txn = db->BeginTransaction(woptions, txn_options);
1520 auto s = txn->SetName("xid0");
1521 ASSERT_OK(s);
1522 s = txn->Put(Slice("key1"), Slice("value1"));
1523 ASSERT_OK(s);
1524 s = txn->Merge(Slice("key2"), Slice("value2"));
1525 ASSERT_OK(s);
1526 s = txn->Delete(Slice("key3"));
1527 ASSERT_OK(s);
1528 s = txn->SingleDelete(Slice("key4"));
1529 ASSERT_OK(s);
1530 s = txn->Prepare();
1531 ASSERT_OK(s);
1532
1533 {
1534 ReadLock rl(&wp_db->prepared_mutex_);
1535 ASSERT_FALSE(wp_db->prepared_txns_.empty());
1536 ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
1537 }
1538
1539 ASSERT_SAME(db, s1, v1, "key1");
1540 ASSERT_SAME(db, s2, v2, "key2");
1541 ASSERT_SAME(db, s3, v3, "key3");
1542 ASSERT_SAME(db, s4, v4, "key4");
1543
1544 if (crash) {
1545 delete txn;
1546 auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1547 ASSERT_OK(db_impl->FlushWAL(true));
1548 dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
1549 ReOpenNoDelete();
1550 assert(db != nullptr);
1551 wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
1552 txn = db->GetTransactionByName("xid0");
1553 ASSERT_FALSE(wp_db->delayed_prepared_empty_);
1554 ReadLock rl(&wp_db->prepared_mutex_);
1555 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1556 ASSERT_FALSE(wp_db->delayed_prepared_.empty());
1557 ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
1558 wp_db->delayed_prepared_.end());
1559 }
1560
1561 ASSERT_SAME(db, s1, v1, "key1");
1562 ASSERT_SAME(db, s2, v2, "key2");
1563 ASSERT_SAME(db, s3, v3, "key3");
1564 ASSERT_SAME(db, s4, v4, "key4");
1565
1566 s = txn->Rollback();
1567 ASSERT_OK(s);
1568
1569 {
1570 ASSERT_TRUE(wp_db->delayed_prepared_empty_);
1571 ReadLock rl(&wp_db->prepared_mutex_);
1572 ASSERT_TRUE(wp_db->prepared_txns_.empty());
1573 ASSERT_TRUE(wp_db->delayed_prepared_.empty());
1574 }
1575
1576 ASSERT_SAME(db, s1, v1, "key1");
1577 ASSERT_SAME(db, s2, v2, "key2");
1578 ASSERT_SAME(db, s3, v3, "key3");
1579 ASSERT_SAME(db, s4, v4, "key4");
1580 delete txn;
1581 }
1582 }
1583 }
1584 }
1585
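// The crash/recovery idiom used in RollbackTest, in isolation: flush the
// WAL so the prepared entry is durable, simulate a crash, reopen without
// deleting the db, and look the txn up again by name. A sketch built from
// the fixture helpers above (db, ReOpenNoDelete):
//
//   ASSERT_OK(txn->Prepare());
//   ASSERT_OK(reinterpret_cast<DBImpl*>(db->GetRootDB())->FlushWAL(true));
//   dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
//   ReOpenNoDelete();
//   Transaction* recovered = db->GetTransactionByName("xid0");
//   ASSERT_NE(nullptr, recovered);  // then commit or roll it back
//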
1586 TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
1587 // Use large buffer to avoid memtable flush after 1024 insertions
1588 options.write_buffer_size = 1024 * 1024;
1589 ReOpen();
1590 std::vector<KeyVersion> versions;
1591 uint64_t seq = 0;
1592 for (uint64_t i = 1; i <= 1024; i++) {
1593 std::string v = "bar" + ToString(i);
1594 ASSERT_OK(db->Put(WriteOptions(), "foo", v));
1595 VerifyKeys({{"foo", v}});
1596 seq++; // one for the key/value
1597 KeyVersion kv = {"foo", v, seq, kTypeValue};
1598 if (options.two_write_queues) {
1599 seq++; // one for the commit
1600 }
1601 versions.emplace_back(kv);
1602 }
1603 std::reverse(std::begin(versions), std::end(versions));
1604 VerifyInternalKeys(versions);
1605 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1606 ASSERT_OK(db_impl->FlushWAL(true));
1607 // Use small buffer to ensure memtable flush during recovery
1608 options.write_buffer_size = 1024;
1609 ReOpenNoDelete();
1610 VerifyInternalKeys(versions);
1611 }
1612
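// The seq accounting used above, factored out: each write consumes one seq
// for the data, and with two_write_queues one more for the commit marker.
// The helper name is illustrative only.
static inline uint64_t SeqPerWrite(bool two_write_queues) {
  return two_write_queues ? 2 : 1;  // data entry (+ commit marker)
}
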
1613 TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
1614 ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
1615 VerifyKeys({{"foo", "bar"}});
1616 const Snapshot* snapshot = db->GetSnapshot();
1617 ASSERT_OK(db->Flush(FlushOptions()));
1618 // Dummy keys to prevent compaction from trivially moving files and to
1619 // exercise the actual compaction logic.
1620 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
1621 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
1622 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1623 // Compaction will output a key with sequence number 0 if it is visible
1624 // to the earliest snapshot. Make sure IsInSnapshot() reports that
1625 // sequence number 0 is visible to any snapshot.
1626 VerifyKeys({{"foo", "bar"}});
1627 VerifyKeys({{"foo", "bar"}}, snapshot);
1628 VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
1629 db->ReleaseSnapshot(snapshot);
1630 }
1631
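// The seq-zeroing rule exercised above, as a predicate: compaction may
// rewrite a key's seq to 0 only if the key is committed and visible to the
// earliest snapshot. The names here are illustrative only.
static inline bool CanZeroOutSeq(bool committed, uint64_t commit_seq,
                                 uint64_t earliest_snapshot_seq) {
  return committed && commit_seq <= earliest_snapshot_seq;
}
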
1632 // Compaction should not remove a key if it is not committed, and should
1633 // proceed with older versions of the key as if the new version doesn't exist.
1634 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
1635 options.disable_auto_compactions = true;
1636 ReOpen();
1637 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1638 // Snapshots to prevent keys from being evicted.
1639 std::vector<const Snapshot*> snapshots;
1640 // Keep track of expected sequence number.
1641 SequenceNumber expected_seq = 0;
1642
1643 auto add_key = [&](std::function<Status()> func) {
1644 ASSERT_OK(func());
1645 expected_seq++;
1646 if (options.two_write_queues) {
1647 expected_seq++; // 1 for commit
1648 }
1649 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
1650 snapshots.push_back(db->GetSnapshot());
1651 };
1652
1653 // Each key here represents a standalone test case.
1654 add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
1655 add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
1656 add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
1657 add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
1658 add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
1659 add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
1660 add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
1661 add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
1662 ASSERT_OK(db->Flush(FlushOptions()));
1663 add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
1664 add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
1665
1666 auto* transaction = db->BeginTransaction(WriteOptions());
1667 ASSERT_OK(transaction->SetName("txn"));
1668 ASSERT_OK(transaction->Put("key1", "value1_2"));
1669 ASSERT_OK(transaction->Delete("key2"));
1670 ASSERT_OK(transaction->SingleDelete("key3"));
1671 ASSERT_OK(transaction->Merge("key4", "value4_2"));
1672 ASSERT_OK(transaction->Merge("key5", "value5_3"));
1673 ASSERT_OK(transaction->Put("key6", "value6_2"));
1674 ASSERT_OK(transaction->Put("key7", "value7_2"));
1675 // Prepare but not commit.
1676 ASSERT_OK(transaction->Prepare());
1677 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
1678 ASSERT_OK(db->Flush(FlushOptions()));
1679 for (auto* s : snapshots) {
1680 db->ReleaseSnapshot(s);
1681 }
1682 // Dummy keys to prevent compaction from trivially moving files and to
1683 // exercise the actual compaction logic.
1684 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
1685 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
1686 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1687 VerifyKeys({
1688 {"key1", "value1_1"},
1689 {"key2", "value2_1"},
1690 {"key3", "value3_1"},
1691 {"key4", "value4_1"},
1692 {"key5", "value5_1,value5_2"},
1693 {"key6", "NOT_FOUND"},
1694 {"key7", "NOT_FOUND"},
1695 });
1696 VerifyInternalKeys({
1697 {"key1", "value1_2", expected_seq, kTypeValue},
1698 {"key1", "value1_1", 0, kTypeValue},
1699 {"key2", "", expected_seq, kTypeDeletion},
1700 {"key2", "value2_1", 0, kTypeValue},
1701 {"key3", "", expected_seq, kTypeSingleDeletion},
1702 {"key3", "value3_1", 0, kTypeValue},
1703 {"key4", "value4_2", expected_seq, kTypeMerge},
1704 {"key4", "value4_1", 0, kTypeValue},
1705 {"key5", "value5_3", expected_seq, kTypeMerge},
1706 {"key5", "value5_1,value5_2", 0, kTypeValue},
1707 {"key6", "value6_2", expected_seq, kTypeValue},
1708 {"key7", "value7_2", expected_seq, kTypeValue},
1709 });
1710 ASSERT_OK(transaction->Commit());
1711 VerifyKeys({
1712 {"key1", "value1_2"},
1713 {"key2", "NOT_FOUND"},
1714 {"key3", "NOT_FOUND"},
1715 {"key4", "value4_1,value4_2"},
1716 {"key5", "value5_1,value5_2,value5_3"},
1717 {"key6", "value6_2"},
1718 {"key7", "value7_2"},
1719 });
1720 delete transaction;
1721 }
1722
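// VerifyInternalKeys above inspects raw (user_key, seq, type) entries. The
// same view is available directly via rocksdb/utilities/debug.h; the exact
// GetAllKeyVersions signature varies across RocksDB releases (newer ones
// add a max-keys cap), so treat this as a sketch:
//
//   std::vector<KeyVersion> key_versions;
//   ASSERT_OK(GetAllKeyVersions(db, Slice("key1"), Slice("key8"),
//                               &key_versions));
//   for (const auto& kv : key_versions) {
//     fprintf(stderr, "%s seq=%" PRIu64 " type=%d\n", kv.user_key.c_str(),
//             kv.sequence, kv.type);
//   }
//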
1723 // Compaction should keep keys visible to a snapshot based on commit sequence,
1724 // not just prepare sequence.
1725 TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
1726 options.disable_auto_compactions = true;
1727 ReOpen();
1728 // Keep track of expected sequence number.
1729 SequenceNumber expected_seq = 0;
1730 auto* txn1 = db->BeginTransaction(WriteOptions());
1731 ASSERT_OK(txn1->SetName("txn1"));
1732 ASSERT_OK(txn1->Put("key1", "value1_1"));
1733 ASSERT_OK(txn1->Prepare());
1734 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
1735 ASSERT_OK(txn1->Commit());
1736 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1737 ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
1738 delete txn1;
1739 // Take a snapshot to prevent keys from being evicted before compaction.
1740 const Snapshot* snapshot1 = db->GetSnapshot();
1741 auto* txn2 = db->BeginTransaction(WriteOptions());
1742 ASSERT_OK(txn2->SetName("txn2"));
1743 ASSERT_OK(txn2->Put("key2", "value2_1"));
1744 ASSERT_OK(txn2->Prepare());
1745 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
1746 // txn1 commits before snapshot2, so it is visible to snapshot2.
1747 // txn2 commits after snapshot2, so it is not visible.
1748 const Snapshot* snapshot2 = db->GetSnapshot();
1749 ASSERT_OK(txn2->Commit());
1750 ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
1751 delete txn2;
1752 // Take a snapshot to prevent keys from being evicted before compaction.
1753 const Snapshot* snapshot3 = db->GetSnapshot();
1754 ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
1755 expected_seq++; // 1 for write
1756 SequenceNumber seq1 = expected_seq;
1757 if (options.two_write_queues) {
1758 expected_seq++; // 1 for commit
1759 }
1760 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
1761 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
1762 expected_seq++; // 1 for write
1763 SequenceNumber seq2 = expected_seq;
1764 if (options.two_write_queues) {
1765 expected_seq++; // 1 for commit
1766 }
1767 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
1768 ASSERT_OK(db->Flush(FlushOptions()));
1769 db->ReleaseSnapshot(snapshot1);
1770 db->ReleaseSnapshot(snapshot3);
1771 // Dummy keys to prevent compaction from trivially moving files and to
1772 // exercise the actual compaction logic.
1773 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
1774 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
1775 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1776 VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
1777 VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
1778 VerifyInternalKeys({
1779 {"key1", "value1_2", seq1, kTypeValue},
1780 // "value1_1" is visible to snapshot2. Also keys at bottom level visible
1781 // to earliest snapshot will output with seq = 0.
1782 {"key1", "value1_1", 0, kTypeValue},
1783 {"key2", "value2_2", seq2, kTypeValue},
1784 });
1785 db->ReleaseSnapshot(snapshot2);
1786 }
1787
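// Timeline of the scenario above, with single-queue seq numbers (with
// two_write_queues the numbers shift but the ordering is unchanged):
//
//   txn1.prepare=1, txn1.commit=2, snapshot1, txn2.prepare=3, snapshot2,
//   txn2.commit=4, snapshot3, ...
//
// snapshot2 falls between txn2's prepare and commit, so visibility must be
// decided by the commit seq (4 > snapshot2), not the prepare seq
// (3 < snapshot2).
//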
1788 // A more complex test to verify that compaction/flush keeps keys visible
1789 // to snapshots.
1790 TEST_P(WritePreparedTransactionTest,
1791 CompactionShouldKeepSnapshotVisibleKeysRandomized) {
1792 constexpr size_t kNumTransactions = 10;
1793 constexpr size_t kNumIterations = 1000;
1794
1795 std::vector<Transaction*> transactions(kNumTransactions, nullptr);
1796 std::vector<size_t> versions(kNumTransactions, 0);
1797 std::unordered_map<std::string, std::string> current_data;
1798 std::vector<const Snapshot*> snapshots;
1799 std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
1800
1801 Random rnd(1103);
1802 options.disable_auto_compactions = true;
1803 ReOpen();
1804
1805 for (size_t i = 0; i < kNumTransactions; i++) {
1806 std::string key = "key" + ToString(i);
1807 std::string value = "value0";
1808 ASSERT_OK(db->Put(WriteOptions(), key, value));
1809 current_data[key] = value;
1810 }
1811 VerifyKeys(current_data);
1812
1813 for (size_t iter = 0; iter < kNumIterations; iter++) {
1814 auto r = rnd.Next() % (kNumTransactions + 1);
1815 if (r < kNumTransactions) {
1816 std::string key = "key" + ToString(r);
1817 if (transactions[r] == nullptr) {
1818 std::string value = "value" + ToString(versions[r] + 1);
1819 auto* txn = db->BeginTransaction(WriteOptions());
1820 ASSERT_OK(txn->SetName("txn" + ToString(r)));
1821 ASSERT_OK(txn->Put(key, value));
1822 ASSERT_OK(txn->Prepare());
1823 transactions[r] = txn;
1824 } else {
1825 std::string value = "value" + ToString(++versions[r]);
1826 ASSERT_OK(transactions[r]->Commit());
1827 delete transactions[r];
1828 transactions[r] = nullptr;
1829 current_data[key] = value;
1830 }
1831 } else {
1832 auto* snapshot = db->GetSnapshot();
1833 VerifyKeys(current_data, snapshot);
1834 snapshots.push_back(snapshot);
1835 snapshot_data.push_back(current_data);
1836 }
1837 VerifyKeys(current_data);
1838 }
1839 // Take a last snapshot to test compaction with an uncommitted prepared
1840 // transaction.
1841 snapshots.push_back(db->GetSnapshot());
1842 snapshot_data.push_back(current_data);
1843
1844 assert(snapshots.size() == snapshot_data.size());
1845 for (size_t i = 0; i < snapshots.size(); i++) {
1846 VerifyKeys(snapshot_data[i], snapshots[i]);
1847 }
1848 ASSERT_OK(db->Flush(FlushOptions()));
1849 for (size_t i = 0; i < snapshots.size(); i++) {
1850 VerifyKeys(snapshot_data[i], snapshots[i]);
1851 }
1852 // Dummy keys to prevent compaction from trivially moving files and to
1853 // exercise the actual compaction logic.
1854 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
1855 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
1856 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1857 for (size_t i = 0; i < snapshots.size(); i++) {
1858 VerifyKeys(snapshot_data[i], snapshots[i]);
1859 }
1860 // cleanup
1861 for (size_t i = 0; i < kNumTransactions; i++) {
1862 if (transactions[i] == nullptr) {
1863 continue;
1864 }
1865 ASSERT_OK(transactions[i]->Commit());
1866 delete transactions[i];
1867 }
1868 for (size_t i = 0; i < snapshots.size(); i++) {
1869 db->ReleaseSnapshot(snapshots[i]);
1870 }
1871 }
1872
1873 // Compaction should not apply the optimization of outputting a key with
1874 // sequence number 0 if the key is not visible to the earliest snapshot,
1875 // based on its commit sequence number.
1876 TEST_P(WritePreparedTransactionTest,
1877 CompactionShouldKeepSequenceForUncommittedKeys) {
1878 options.disable_auto_compactions = true;
1879 ReOpen();
1880 // Keep track of expected sequence number.
1881 SequenceNumber expected_seq = 0;
1882 auto* transaction = db->BeginTransaction(WriteOptions());
1883 ASSERT_OK(transaction->SetName("txn"));
1884 ASSERT_OK(transaction->Put("key1", "value1"));
1885 ASSERT_OK(transaction->Prepare());
1886 ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
1887 SequenceNumber seq1 = expected_seq;
1888 ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
1889 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
1890 expected_seq++; // one for data
1891 if (options.two_write_queues) {
1892 expected_seq++; // one for commit
1893 }
1894 ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
1895 ASSERT_OK(db->Flush(FlushOptions()));
1896 // Dummy keys to prevent compaction from trivially moving files and to
1897 // exercise the actual compaction logic.
1898 ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
1899 ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
1900 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1901 VerifyKeys({
1902 {"key1", "NOT_FOUND"},
1903 {"key2", "value2"},
1904 });
1905 VerifyInternalKeys({
1906 // "key1" has not been committed. It keeps its sequence number.
1907 {"key1", "value1", seq1, kTypeValue},
1908 // "key2" is committed and output with seq = 0.
1909 {"key2", "value2", 0, kTypeValue},
1910 });
1911 ASSERT_OK(transaction->Commit());
1912 VerifyKeys({
1913 {"key1", "value1"},
1914 {"key2", "value2"},
1915 });
1916 delete transaction;
1917 }
1918
1919 TEST_P(WritePreparedTransactionTest, Iterate) {
1920 auto verify_state = [](Iterator* iter, const std::string& key,
1921 const std::string& value) {
1922 ASSERT_TRUE(iter->Valid());
1923 ASSERT_OK(iter->status());
1924 ASSERT_EQ(key, iter->key().ToString());
1925 ASSERT_EQ(value, iter->value().ToString());
1926 };
1927
1928 auto verify_iter = [&](const std::string& expected_val) {
1929 // Get an iterator from a concurrent transaction and make sure it has the
1930 // same view as an iterator from the DB.
1931 auto* txn = db->BeginTransaction(WriteOptions());
1932
1933 for (int i = 0; i < 2; i++) {
1934 Iterator* iter = (i == 0)
1935 ? db->NewIterator(ReadOptions())
1936 : txn->GetIterator(ReadOptions());
1937 // Seek
1938 iter->Seek("foo");
1939 verify_state(iter, "foo", expected_val);
1940 // Next
1941 iter->Seek("a");
1942 verify_state(iter, "a", "va");
1943 iter->Next();
1944 verify_state(iter, "foo", expected_val);
1945 // SeekForPrev
1946 iter->SeekForPrev("y");
1947 verify_state(iter, "foo", expected_val);
1948 // Prev
1949 iter->SeekForPrev("z");
1950 verify_state(iter, "z", "vz");
1951 iter->Prev();
1952 verify_state(iter, "foo", expected_val);
1953 delete iter;
1954 }
1955 delete txn;
1956 };
1957
1958 ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
1959 auto* transaction = db->BeginTransaction(WriteOptions());
1960 ASSERT_OK(transaction->SetName("txn"));
1961 ASSERT_OK(transaction->Put("foo", "v2"));
1962 ASSERT_OK(transaction->Prepare());
1963 VerifyKeys({{"foo", "v1"}});
1964 // dummy keys
1965 ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
1966 ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
1967 verify_iter("v1");
1968 ASSERT_OK(transaction->Commit());
1969 VerifyKeys({{"foo", "v2"}});
1970 verify_iter("v2");
1971 delete transaction;
1972 }
1973
1974 TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
1975 Iterator* iter = db->NewIterator(ReadOptions());
1976 ASSERT_TRUE(iter->Refresh().IsNotSupported());
1977 delete iter;
1978 }
1979
1980 // Test that updating the commit map will not affect the existing snapshots
1981 TEST_P(WritePreparedTransactionTest, AtomicCommit) {
1982 for (bool skip_prepare : {true, false}) {
1983 rocksdb::SyncPoint::GetInstance()->LoadDependency({
1984 {"WritePreparedTxnDB::AddCommitted:start",
1985 "AtomicCommit::GetSnapshot:start"},
1986 {"AtomicCommit::Get:end",
1987 "WritePreparedTxnDB::AddCommitted:start:pause"},
1988 {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
1989 {"AtomicCommit::Get2:end",
1990 "WritePreparedTxnDB::AddCommitted:end:pause:"},
1991 });
1992 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1993 rocksdb::port::Thread write_thread([&]() {
1994 if (skip_prepare) {
1995 ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
1996 } else {
1997 Transaction* txn =
1998 db->BeginTransaction(WriteOptions(), TransactionOptions());
1999 ASSERT_OK(txn->SetName("xid"));
2000 ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
2001 ASSERT_OK(txn->Prepare());
2002 ASSERT_OK(txn->Commit());
2003 delete txn;
2004 }
2005 });
2006 rocksdb::port::Thread read_thread([&]() {
2007 ReadOptions roptions;
2008 TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
2009 roptions.snapshot = db->GetSnapshot();
2010 PinnableSlice val;
2011 auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
2012 TEST_SYNC_POINT("AtomicCommit::Get:end");
2013 TEST_SYNC_POINT("AtomicCommit::Get2:start");
2014 ASSERT_SAME(roptions, db, s, val, "key");
2015 TEST_SYNC_POINT("AtomicCommit::Get2:end");
2016 db->ReleaseSnapshot(roptions.snapshot);
2017 });
2018 read_thread.join();
2019 write_thread.join();
2020 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2021 }
2022 }
2023
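// The SyncPoint dependency pattern used above, reduced to its core: each
// {predecessor, successor} pair makes the successor point block until the
// predecessor has fired, across threads. A sketch with illustrative point
// names:
//
//   rocksdb::SyncPoint::GetInstance()->LoadDependency(
//       {{"Writer::AfterWrite", "Reader::BeforeRead"}});
//   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
//   // writer thread: TEST_SYNC_POINT("Writer::AfterWrite");
//   // reader thread: TEST_SYNC_POINT("Reader::BeforeRead");  // waits
//   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
//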
2024 // Test that we can change the write policy from WriteCommitted to
2025 // WritePrepared after a clean shutdown (which would empty the WAL)
2026 TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
2027 bool empty_wal = true;
2028 CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal);
2029 }
2030
2031 // Test that we fail fast if the WAL is not emptied before changing the
2032 // write policy from WriteCommitted to WritePrepared
2033 TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
2034 bool empty_wal = true;
2035 CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal);
2036 }
2037
2038 // Test that we can change the write policy from WritePrepared back to
2039 // WriteCommitted after a clean shutdown (which would empty the WAL)
2040 TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
2041 bool empty_wal = true;
2042 CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
2043 }
2044
2045 // Test that we fail fast if the WAL is not emptied before changing the
2046 // write policy from WritePrepared back to WriteCommitted
2047 TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
2048 bool empty_wal = true;
2049 CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
2050 }
2051
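// For reference, the write policy crossed by the tests above is picked at
// open time through TransactionDBOptions. A minimal sketch (the db path is
// illustrative):
//
//   Options options;
//   options.create_if_missing = true;
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = WRITE_PREPARED;  // or WRITE_COMMITTED
//   TransactionDB* txn_db = nullptr;
//   Status s =
//       TransactionDB::Open(options, txn_db_options, "/tmp/testdb", &txn_db);
//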
2052 } // namespace rocksdb
2053
2054 int main(int argc, char** argv) {
2055 ::testing::InitGoogleTest(&argc, argv);
2056 return RUN_ALL_TESTS();
2057 }
2058
2059 #else
2060 #include <stdio.h>
2061
2062 int main(int /*argc*/, char** /*argv*/) {
2063 fprintf(stderr,
2064 "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
2065 return 0;
2066 }
2067
2068 #endif // ROCKSDB_LITE