]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 | 8 | #include <algorithm> |
494da23a | 9 | #include <atomic> |
f67539c2 | 10 | #include <cinttypes> |
11fdf7f2 TL |
11 | #include <functional> |
12 | #include <string> | |
13 | #include <thread> | |
14 | ||
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
11fdf7f2 | 16 | #include "db/dbformat.h" |
20effc67 | 17 | #include "port/port.h" |
11fdf7f2 TL |
18 | #include "rocksdb/db.h" |
19 | #include "rocksdb/options.h" | |
20 | #include "rocksdb/types.h" | |
21 | #include "rocksdb/utilities/debug.h" | |
22 | #include "rocksdb/utilities/transaction.h" | |
23 | #include "rocksdb/utilities/transaction_db.h" | |
24 | #include "table/mock_table.h" | |
f67539c2 TL |
25 | #include "test_util/sync_point.h" |
26 | #include "test_util/testharness.h" | |
27 | #include "test_util/testutil.h" | |
28 | #include "test_util/transaction_test_util.h" | |
494da23a | 29 | #include "util/mutexlock.h" |
11fdf7f2 TL |
30 | #include "util/random.h" |
31 | #include "util/string_util.h" | |
20effc67 | 32 | #include "utilities/fault_injection_env.h" |
11fdf7f2 TL |
33 | #include "utilities/merge_operators.h" |
34 | #include "utilities/merge_operators/string_append/stringappend.h" | |
35 | #include "utilities/transactions/pessimistic_transaction_db.h" | |
20effc67 | 36 | #include "utilities/transactions/transaction_test.h" |
11fdf7f2 TL |
37 | #include "utilities/transactions/write_prepared_txn_db.h" |
38 | ||
11fdf7f2 TL |
39 | using std::string; |
40 | ||
f67539c2 | 41 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
42 | |
43 | using CommitEntry = WritePreparedTxnDB::CommitEntry; | |
44 | using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b; | |
45 | using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat; | |
46 | ||
47 | TEST(PreparedHeap, BasicsTest) { | |
48 | WritePreparedTxnDB::PreparedHeap heap; | |
f67539c2 TL |
49 | { |
50 | MutexLock ml(heap.push_pop_mutex()); | |
51 | heap.push(14l); | |
52 | // Test with one element | |
53 | ASSERT_EQ(14l, heap.top()); | |
54 | heap.push(24l); | |
55 | heap.push(34l); | |
56 | // Test that old min is still on top | |
57 | ASSERT_EQ(14l, heap.top()); | |
58 | heap.push(44l); | |
59 | heap.push(54l); | |
60 | heap.push(64l); | |
61 | heap.push(74l); | |
62 | heap.push(84l); | |
63 | } | |
11fdf7f2 TL |
64 | // Test that old min is still on top |
65 | ASSERT_EQ(14l, heap.top()); | |
11fdf7f2 TL |
66 | heap.erase(24l); |
67 | // Test that old min is still on top | |
f67539c2 | 68 | ASSERT_EQ(14l, heap.top()); |
11fdf7f2 | 69 | heap.erase(14l); |
11fdf7f2 TL |
70 | // Test that the new comes to the top after multiple erase |
71 | ASSERT_EQ(34l, heap.top()); | |
72 | heap.erase(34l); | |
73 | // Test that the new comes to the top after single erase | |
74 | ASSERT_EQ(44l, heap.top()); | |
75 | heap.erase(54l); | |
76 | ASSERT_EQ(44l, heap.top()); | |
77 | heap.pop(); // pop 44l | |
78 | // Test that the erased items are ignored after pop | |
79 | ASSERT_EQ(64l, heap.top()); | |
80 | heap.erase(44l); | |
81 | // Test that erasing an already popped item would work | |
82 | ASSERT_EQ(64l, heap.top()); | |
83 | heap.erase(84l); | |
84 | ASSERT_EQ(64l, heap.top()); | |
f67539c2 TL |
85 | { |
86 | MutexLock ml(heap.push_pop_mutex()); | |
87 | heap.push(85l); | |
88 | heap.push(86l); | |
89 | heap.push(87l); | |
90 | heap.push(88l); | |
91 | heap.push(89l); | |
92 | } | |
11fdf7f2 TL |
93 | heap.erase(87l); |
94 | heap.erase(85l); | |
95 | heap.erase(89l); | |
96 | heap.erase(86l); | |
97 | heap.erase(88l); | |
98 | // Test top remains the same after a random order of many erases | |
99 | ASSERT_EQ(64l, heap.top()); | |
100 | heap.pop(); | |
101 | // Test that pop works with a series of random pending erases | |
102 | ASSERT_EQ(74l, heap.top()); | |
103 | ASSERT_FALSE(heap.empty()); | |
104 | heap.pop(); | |
105 | // Test that empty works | |
106 | ASSERT_TRUE(heap.empty()); | |
107 | } | |
108 | ||
109 | // This is a scenario reconstructed from a buggy trace. Test that the bug does | |
110 | // not resurface again. | |
111 | TEST(PreparedHeap, EmptyAtTheEnd) { | |
112 | WritePreparedTxnDB::PreparedHeap heap; | |
f67539c2 TL |
113 | { |
114 | MutexLock ml(heap.push_pop_mutex()); | |
115 | heap.push(40l); | |
116 | } | |
11fdf7f2 TL |
117 | ASSERT_EQ(40l, heap.top()); |
118 | // Although not a recommended scenario, we must be resilient against erase | |
119 | // without a prior push. | |
120 | heap.erase(50l); | |
121 | ASSERT_EQ(40l, heap.top()); | |
f67539c2 TL |
122 | { |
123 | MutexLock ml(heap.push_pop_mutex()); | |
124 | heap.push(60l); | |
125 | } | |
11fdf7f2 TL |
126 | ASSERT_EQ(40l, heap.top()); |
127 | ||
128 | heap.erase(60l); | |
129 | ASSERT_EQ(40l, heap.top()); | |
130 | heap.erase(40l); | |
131 | ASSERT_TRUE(heap.empty()); | |
132 | ||
f67539c2 TL |
133 | { |
134 | MutexLock ml(heap.push_pop_mutex()); | |
135 | heap.push(40l); | |
136 | } | |
11fdf7f2 TL |
137 | ASSERT_EQ(40l, heap.top()); |
138 | heap.erase(50l); | |
139 | ASSERT_EQ(40l, heap.top()); | |
f67539c2 TL |
140 | { |
141 | MutexLock ml(heap.push_pop_mutex()); | |
142 | heap.push(60l); | |
143 | } | |
11fdf7f2 TL |
144 | ASSERT_EQ(40l, heap.top()); |
145 | ||
146 | heap.erase(40l); | |
147 | // Test that the erase has not emptied the heap (we had a bug doing that) | |
148 | ASSERT_FALSE(heap.empty()); | |
149 | ASSERT_EQ(60l, heap.top()); | |
150 | heap.erase(60l); | |
151 | ASSERT_TRUE(heap.empty()); | |
152 | } | |
153 | ||
154 | // Generate random order of PreparedHeap access and test that the heap will be | |
155 | // successfully emptied at the end. | |
156 | TEST(PreparedHeap, Concurrent) { | |
157 | const size_t t_cnt = 10; | |
f67539c2 | 158 | ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1]; |
11fdf7f2 TL |
159 | WritePreparedTxnDB::PreparedHeap heap; |
160 | port::RWMutex prepared_mutex; | |
f67539c2 | 161 | std::atomic<size_t> last; |
11fdf7f2 TL |
162 | |
163 | for (size_t n = 0; n < 100; n++) { | |
f67539c2 TL |
164 | last = 0; |
165 | t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() { | |
166 | Random rnd(1103); | |
167 | for (size_t seq = 1; seq <= t_cnt; seq++) { | |
168 | // This is not recommended usage but we should be resilient against it. | |
169 | bool skip_push = rnd.OneIn(5); | |
11fdf7f2 | 170 | if (!skip_push) { |
f67539c2 TL |
171 | MutexLock ml(heap.push_pop_mutex()); |
172 | std::this_thread::yield(); | |
11fdf7f2 | 173 | heap.push(seq); |
f67539c2 | 174 | last.store(seq); |
11fdf7f2 | 175 | } |
f67539c2 TL |
176 | } |
177 | }); | |
178 | for (size_t i = 1; i <= t_cnt; i++) { | |
179 | t[i] = | |
180 | ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() { | |
181 | auto seq = i; | |
182 | do { | |
183 | std::this_thread::yield(); | |
184 | } while (last.load() < seq); | |
185 | WriteLock wl(&prepared_mutex); | |
186 | heap.erase(seq); | |
187 | }); | |
11fdf7f2 | 188 | } |
f67539c2 | 189 | for (size_t i = 0; i <= t_cnt; i++) { |
11fdf7f2 TL |
190 | t[i].join(); |
191 | } | |
192 | ASSERT_TRUE(heap.empty()); | |
193 | } | |
194 | } | |
195 | ||
196 | // Test that WriteBatchWithIndex correctly counts the number of sub-batches | |
197 | TEST(WriteBatchWithIndex, SubBatchCnt) { | |
198 | ColumnFamilyOptions cf_options; | |
199 | std::string cf_name = "two"; | |
200 | DB* db; | |
201 | Options options; | |
202 | options.create_if_missing = true; | |
203 | const std::string dbname = test::PerThreadDBPath("transaction_testdb"); | |
204 | DestroyDB(dbname, options); | |
205 | ASSERT_OK(DB::Open(options, dbname, &db)); | |
206 | ColumnFamilyHandle* cf_handle = nullptr; | |
207 | ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); | |
208 | WriteOptions write_options; | |
209 | size_t batch_cnt = 1; | |
210 | size_t save_points = 0; | |
211 | std::vector<size_t> batch_cnt_at; | |
212 | WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true, | |
213 | 0); | |
214 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
215 | batch_cnt_at.push_back(batch_cnt); | |
216 | batch.SetSavePoint(); | |
217 | save_points++; | |
218 | batch.Put(Slice("key"), Slice("value")); | |
219 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
220 | batch_cnt_at.push_back(batch_cnt); | |
221 | batch.SetSavePoint(); | |
222 | save_points++; | |
223 | batch.Put(Slice("key2"), Slice("value2")); | |
224 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
225 | // duplicate the keys | |
226 | batch_cnt_at.push_back(batch_cnt); | |
227 | batch.SetSavePoint(); | |
228 | save_points++; | |
229 | batch.Put(Slice("key"), Slice("value3")); | |
230 | batch_cnt++; | |
231 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
232 | // duplicate the 2nd key. It should not be counted duplicate since a | |
233 | // sub-patch is cut after the last duplicate. | |
234 | batch_cnt_at.push_back(batch_cnt); | |
235 | batch.SetSavePoint(); | |
236 | save_points++; | |
237 | batch.Put(Slice("key2"), Slice("value4")); | |
238 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
239 | // duplicate the keys but in a different cf. It should not be counted as | |
240 | // duplicate keys | |
241 | batch_cnt_at.push_back(batch_cnt); | |
242 | batch.SetSavePoint(); | |
243 | save_points++; | |
244 | batch.Put(cf_handle, Slice("key"), Slice("value5")); | |
245 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
246 | ||
247 | // Test that the number of sub-batches matches what we count with | |
248 | // SubBatchCounter | |
249 | std::map<uint32_t, const Comparator*> comparators; | |
250 | comparators[0] = db->DefaultColumnFamily()->GetComparator(); | |
251 | comparators[cf_handle->GetID()] = cf_handle->GetComparator(); | |
252 | SubBatchCounter counter(comparators); | |
253 | ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter)); | |
254 | ASSERT_EQ(batch_cnt, counter.BatchCount()); | |
255 | ||
256 | // Test that RollbackToSavePoint will properly resets the number of | |
257 | // sub-batches | |
258 | for (size_t i = save_points; i > 0; i--) { | |
259 | batch.RollbackToSavePoint(); | |
260 | ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt()); | |
261 | } | |
262 | ||
263 | // Test the count is right with random batches | |
264 | { | |
265 | const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms | |
266 | Random rnd(1131); | |
267 | std::string keys[TOTAL_KEYS]; | |
268 | for (size_t k = 0; k < TOTAL_KEYS; k++) { | |
269 | int len = static_cast<int>(rnd.Uniform(50)); | |
270 | keys[k] = test::RandomKey(&rnd, len); | |
271 | } | |
272 | for (size_t i = 0; i < 1000; i++) { // 1000 random batches | |
273 | WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(), | |
274 | 0, true, 0); | |
275 | for (size_t k = 0; k < 10; k++) { // 10 key per batch | |
276 | size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS)); | |
277 | Slice key = Slice(keys[ki]); | |
20effc67 TL |
278 | std::string tmp = rnd.RandomString(16); |
279 | Slice value = Slice(tmp); | |
11fdf7f2 TL |
280 | rndbatch.Put(key, value); |
281 | } | |
282 | SubBatchCounter batch_counter(comparators); | |
283 | ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter)); | |
284 | ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount()); | |
285 | } | |
286 | } | |
287 | ||
288 | delete cf_handle; | |
289 | delete db; | |
290 | } | |
291 | ||
292 | TEST(CommitEntry64b, BasicTest) { | |
293 | const size_t INDEX_BITS = static_cast<size_t>(21); | |
294 | const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS); | |
295 | const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS)); | |
296 | ||
297 | // zero-initialized CommitEntry64b should indicate an empty entry | |
298 | CommitEntry64b empty_entry64b; | |
299 | uint64_t empty_index = 11ul; | |
300 | CommitEntry empty_entry; | |
301 | bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT); | |
302 | ASSERT_FALSE(ok); | |
303 | ||
304 | // the zero entry is reserved for un-initialized entries | |
305 | const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1; | |
306 | // Samples over the numbers that are covered by that many index bits | |
307 | std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}}; | |
308 | // Samples over the numbers that are covered by that many commit bits | |
309 | std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}}; | |
310 | // Iterate over prepare numbers that have i) cover all bits of a sequence | |
311 | // number, and ii) include some bits that fall into the range of index or | |
312 | // commit bits | |
313 | for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) { | |
314 | for (uint64_t i : is) { | |
315 | for (uint64_t d : ds) { | |
316 | uint64_t p = base + i + d; | |
317 | for (uint64_t c : {p, p + d / 2, p + d}) { | |
318 | uint64_t index = p % INDEX_SIZE; | |
319 | CommitEntry before(p, c), after; | |
320 | CommitEntry64b entry64b(before, FORMAT); | |
321 | ok = entry64b.Parse(index, &after, FORMAT); | |
322 | ASSERT_TRUE(ok); | |
323 | if (!(before == after)) { | |
324 | printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64 | |
325 | " c %" PRIu64 " index %" PRIu64 "\n", | |
326 | base, i, d, p, c, index); | |
327 | } | |
328 | ASSERT_EQ(before, after); | |
329 | } | |
330 | } | |
331 | } | |
332 | } | |
333 | } | |
334 | ||
335 | class WritePreparedTxnDBMock : public WritePreparedTxnDB { | |
336 | public: | |
337 | WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) | |
338 | : WritePreparedTxnDB(db_impl, opt) {} | |
11fdf7f2 TL |
339 | void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) { |
340 | snapshots_ = snapshots; | |
341 | } | |
342 | void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); } | |
343 | ||
344 | protected: | |
494da23a | 345 | const std::vector<SequenceNumber> GetSnapshotListFromDB( |
11fdf7f2 TL |
346 | SequenceNumber /* unused */) override { |
347 | return snapshots_; | |
348 | } | |
349 | ||
350 | private: | |
351 | std::vector<SequenceNumber> snapshots_; | |
352 | }; | |
353 | ||
354 | class WritePreparedTransactionTestBase : public TransactionTestBase { | |
355 | public: | |
356 | WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue, | |
f67539c2 TL |
357 | TxnDBWritePolicy write_policy, |
358 | WriteOrdering write_ordering) | |
359 | : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, | |
360 | write_ordering){}; | |
11fdf7f2 TL |
361 | |
362 | protected: | |
494da23a TL |
363 | void UpdateTransactionDBOptions(size_t snapshot_cache_bits, |
364 | size_t commit_cache_bits) { | |
365 | txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits; | |
366 | txn_db_options.wp_commit_cache_bits = commit_cache_bits; | |
367 | } | |
368 | void UpdateTransactionDBOptions(size_t snapshot_cache_bits) { | |
369 | txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits; | |
370 | } | |
11fdf7f2 TL |
371 | // If expect_update is set, check if it actually updated old_commit_map_. If |
372 | // it did not and yet suggested not to check the next snapshot, do the | |
373 | // opposite to check if it was not a bad suggestion. | |
374 | void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit, | |
375 | uint64_t snapshot, | |
376 | uint64_t next_snapshot, | |
377 | bool expect_update) { | |
378 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
379 | // reset old_commit_map_empty_ so that its value indicate whether | |
380 | // old_commit_map_ was updated | |
381 | wp_db->old_commit_map_empty_ = true; | |
382 | bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot, | |
383 | snapshot < next_snapshot); | |
384 | if (expect_update == wp_db->old_commit_map_empty_) { | |
385 | printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64 | |
386 | " next: %" PRIu64 "\n", | |
387 | prepare, commit, snapshot, next_snapshot); | |
388 | } | |
389 | EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_); | |
390 | if (!check_next && wp_db->old_commit_map_empty_) { | |
391 | // do the opposite to make sure it was not a bad suggestion | |
392 | const bool dont_care_bool = true; | |
393 | wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot, | |
394 | dont_care_bool); | |
395 | if (!wp_db->old_commit_map_empty_) { | |
396 | printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64 | |
397 | " next: %" PRIu64 "\n", | |
398 | prepare, commit, snapshot, next_snapshot); | |
399 | } | |
400 | EXPECT_TRUE(wp_db->old_commit_map_empty_); | |
401 | } | |
402 | } | |
403 | ||
404 | // Test that a CheckAgainstSnapshots thread reading old_snapshots will not | |
405 | // miss a snapshot because of a concurrent update by UpdateSnapshots that is | |
406 | // writing new_snapshots. Both threads are broken at two points. The sync | |
407 | // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry | |
408 | // entry is expected to be vital for one of the snapshots that is common | |
409 | // between the old and new list of snapshots. | |
410 | void SnapshotConcurrentAccessTestInternal( | |
411 | WritePreparedTxnDB* wp_db, | |
412 | const std::vector<SequenceNumber>& old_snapshots, | |
413 | const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry, | |
414 | SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) { | |
415 | // First reset the snapshot list | |
416 | const std::vector<SequenceNumber> empty_snapshots; | |
417 | wp_db->old_commit_map_empty_ = true; | |
418 | wp_db->UpdateSnapshots(empty_snapshots, ++version); | |
419 | // Then initialize it with the old_snapshots | |
420 | wp_db->UpdateSnapshots(old_snapshots, ++version); | |
421 | ||
422 | // Starting from the first thread, cut each thread at two points | |
f67539c2 | 423 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
11fdf7f2 TL |
424 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1), |
425 | "WritePreparedTxnDB::UpdateSnapshots:s:start"}, | |
426 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1), | |
427 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)}, | |
428 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2), | |
429 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)}, | |
430 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2), | |
431 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)}, | |
432 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end", | |
433 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)}, | |
434 | }); | |
f67539c2 | 435 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
436 | { |
437 | ASSERT_TRUE(wp_db->old_commit_map_empty_); | |
f67539c2 | 438 | ROCKSDB_NAMESPACE::port::Thread t1( |
11fdf7f2 | 439 | [&]() { wp_db->UpdateSnapshots(new_snapshots, version); }); |
f67539c2 TL |
440 | ROCKSDB_NAMESPACE::port::Thread t2( |
441 | [&]() { wp_db->CheckAgainstSnapshots(entry); }); | |
11fdf7f2 TL |
442 | t1.join(); |
443 | t2.join(); | |
444 | ASSERT_FALSE(wp_db->old_commit_map_empty_); | |
445 | } | |
f67539c2 | 446 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
447 | |
448 | wp_db->old_commit_map_empty_ = true; | |
449 | wp_db->UpdateSnapshots(empty_snapshots, ++version); | |
450 | wp_db->UpdateSnapshots(old_snapshots, ++version); | |
451 | // Starting from the second thread, cut each thread at two points | |
f67539c2 | 452 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
11fdf7f2 TL |
453 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1), |
454 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"}, | |
455 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1), | |
456 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)}, | |
457 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2), | |
458 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)}, | |
459 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2), | |
460 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)}, | |
461 | {"WritePreparedTxnDB::UpdateSnapshots:p:end", | |
462 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)}, | |
463 | }); | |
f67539c2 | 464 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
465 | { |
466 | ASSERT_TRUE(wp_db->old_commit_map_empty_); | |
f67539c2 | 467 | ROCKSDB_NAMESPACE::port::Thread t1( |
11fdf7f2 | 468 | [&]() { wp_db->UpdateSnapshots(new_snapshots, version); }); |
f67539c2 TL |
469 | ROCKSDB_NAMESPACE::port::Thread t2( |
470 | [&]() { wp_db->CheckAgainstSnapshots(entry); }); | |
11fdf7f2 TL |
471 | t1.join(); |
472 | t2.join(); | |
473 | ASSERT_FALSE(wp_db->old_commit_map_empty_); | |
474 | } | |
f67539c2 | 475 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
476 | } |
477 | ||
478 | // Verify value of keys. | |
479 | void VerifyKeys(const std::unordered_map<std::string, std::string>& data, | |
480 | const Snapshot* snapshot = nullptr) { | |
481 | std::string value; | |
482 | ReadOptions read_options; | |
483 | read_options.snapshot = snapshot; | |
484 | for (auto& kv : data) { | |
485 | auto s = db->Get(read_options, kv.first, &value); | |
486 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
487 | if (s.ok()) { | |
488 | if (kv.second != value) { | |
489 | printf("key = %s\n", kv.first.c_str()); | |
490 | } | |
491 | ASSERT_EQ(kv.second, value); | |
492 | } else { | |
493 | ASSERT_EQ(kv.second, "NOT_FOUND"); | |
494 | } | |
495 | ||
496 | // Try with MultiGet API too | |
497 | std::vector<std::string> values; | |
498 | auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()}, | |
499 | {kv.first}, &values); | |
500 | ASSERT_EQ(1, values.size()); | |
501 | ASSERT_EQ(1, s_vec.size()); | |
502 | s = s_vec[0]; | |
503 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
504 | if (s.ok()) { | |
505 | ASSERT_TRUE(kv.second == values[0]); | |
506 | } else { | |
507 | ASSERT_EQ(kv.second, "NOT_FOUND"); | |
508 | } | |
509 | } | |
510 | } | |
511 | ||
512 | // Verify all versions of keys. | |
513 | void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) { | |
514 | std::vector<KeyVersion> versions; | |
515 | const size_t kMaxKeys = 100000; | |
516 | ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key, | |
517 | expected_versions.back().user_key, kMaxKeys, | |
518 | &versions)); | |
519 | ASSERT_EQ(expected_versions.size(), versions.size()); | |
520 | for (size_t i = 0; i < versions.size(); i++) { | |
521 | ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key); | |
522 | ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence); | |
523 | ASSERT_EQ(expected_versions[i].type, versions[i].type); | |
524 | if (versions[i].type != kTypeDeletion && | |
525 | versions[i].type != kTypeSingleDeletion) { | |
526 | ASSERT_EQ(expected_versions[i].value, versions[i].value); | |
527 | } | |
528 | // Range delete not supported. | |
529 | assert(expected_versions[i].type != kTypeRangeDeletion); | |
530 | } | |
531 | } | |
532 | }; | |
533 | ||
534 | class WritePreparedTransactionTest | |
535 | : public WritePreparedTransactionTestBase, | |
536 | virtual public ::testing::WithParamInterface< | |
f67539c2 | 537 | std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> { |
11fdf7f2 TL |
538 | public: |
539 | WritePreparedTransactionTest() | |
f67539c2 TL |
540 | : WritePreparedTransactionTestBase( |
541 | std::get<0>(GetParam()), std::get<1>(GetParam()), | |
542 | std::get<2>(GetParam()), std::get<3>(GetParam())){}; | |
11fdf7f2 TL |
543 | }; |
544 | ||
545 | #ifndef ROCKSDB_VALGRIND_RUN | |
546 | class SnapshotConcurrentAccessTest | |
547 | : public WritePreparedTransactionTestBase, | |
f67539c2 TL |
548 | virtual public ::testing::WithParamInterface<std::tuple< |
549 | bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> { | |
11fdf7f2 TL |
550 | public: |
551 | SnapshotConcurrentAccessTest() | |
f67539c2 TL |
552 | : WritePreparedTransactionTestBase( |
553 | std::get<0>(GetParam()), std::get<1>(GetParam()), | |
554 | std::get<2>(GetParam()), std::get<3>(GetParam())), | |
555 | split_id_(std::get<4>(GetParam())), | |
556 | split_cnt_(std::get<5>(GetParam())){}; | |
11fdf7f2 TL |
557 | |
558 | protected: | |
559 | // A test is split into split_cnt_ tests, each identified with split_id_ where | |
560 | // 0 <= split_id_ < split_cnt_ | |
561 | size_t split_id_; | |
562 | size_t split_cnt_; | |
563 | }; | |
564 | #endif // ROCKSDB_VALGRIND_RUN | |
565 | ||
566 | class SeqAdvanceConcurrentTest | |
567 | : public WritePreparedTransactionTestBase, | |
f67539c2 TL |
568 | virtual public ::testing::WithParamInterface<std::tuple< |
569 | bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> { | |
11fdf7f2 TL |
570 | public: |
571 | SeqAdvanceConcurrentTest() | |
f67539c2 TL |
572 | : WritePreparedTransactionTestBase( |
573 | std::get<0>(GetParam()), std::get<1>(GetParam()), | |
574 | std::get<2>(GetParam()), std::get<3>(GetParam())), | |
575 | split_id_(std::get<4>(GetParam())), | |
20effc67 TL |
576 | split_cnt_(std::get<5>(GetParam())) { |
577 | special_env.skip_fsync_ = true; | |
578 | }; | |
11fdf7f2 TL |
579 | |
580 | protected: | |
581 | // A test is split into split_cnt_ tests, each identified with split_id_ where | |
582 | // 0 <= split_id_ < split_cnt_ | |
583 | size_t split_id_; | |
584 | size_t split_cnt_; | |
585 | }; | |
586 | ||
587 | INSTANTIATE_TEST_CASE_P( | |
f67539c2 TL |
588 | WritePreparedTransaction, WritePreparedTransactionTest, |
589 | ::testing::Values( | |
590 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), | |
591 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), | |
592 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite))); | |
11fdf7f2 TL |
593 | |
594 | #ifndef ROCKSDB_VALGRIND_RUN | |
595 | INSTANTIATE_TEST_CASE_P( | |
596 | TwoWriteQueues, SnapshotConcurrentAccessTest, | |
f67539c2 TL |
597 | ::testing::Values( |
598 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20), | |
599 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20), | |
600 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20), | |
601 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20), | |
602 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20), | |
603 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20), | |
604 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20), | |
605 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20), | |
606 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20), | |
607 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20), | |
608 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20), | |
609 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20), | |
610 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20), | |
611 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20), | |
612 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20), | |
613 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20), | |
614 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20), | |
615 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20), | |
616 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20), | |
617 | std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20), | |
618 | ||
619 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20), | |
620 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20), | |
621 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20), | |
622 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20), | |
623 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20), | |
624 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20), | |
625 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20), | |
626 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20), | |
627 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20), | |
628 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20), | |
629 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20), | |
630 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20), | |
631 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20), | |
632 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20), | |
633 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20), | |
634 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20), | |
635 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20), | |
636 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20), | |
637 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20), | |
638 | std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20))); | |
11fdf7f2 TL |
639 | |
640 | INSTANTIATE_TEST_CASE_P( | |
641 | OneWriteQueue, SnapshotConcurrentAccessTest, | |
f67539c2 TL |
642 | ::testing::Values( |
643 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20), | |
644 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20), | |
645 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20), | |
646 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20), | |
647 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20), | |
648 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20), | |
649 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20), | |
650 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20), | |
651 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20), | |
652 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20), | |
653 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20), | |
654 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20), | |
655 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20), | |
656 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20), | |
657 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20), | |
658 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20), | |
659 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20), | |
660 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20), | |
661 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20), | |
662 | std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20))); | |
11fdf7f2 TL |
663 | |
// Instantiations of SeqAdvanceConcurrentTest with two write queues enabled
// (2nd tuple field = true), covering both ordered and unordered writes.
// NOTE(review): the last two tuple fields appear to be split_id/split_cnt
// (0..9 of 10) used to shard the test across instantiations — confirm against
// the fixture's parameter unpacking.
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::Values(
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)));
11fdf7f2 TL |
687 | |
// Instantiations of SeqAdvanceConcurrentTest with a single write queue
// (2nd tuple field = false), ordered writes only. The last two tuple fields
// shard the test into 10 splits (0..9) — presumably split_id/split_cnt;
// confirm against the fixture.
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)));
11fdf7f2 TL |
701 | #endif // ROCKSDB_VALGRIND_RUN |
702 | ||
// Exercises the basic operations of the commit cache (commit map):
// Add, Get, conditional Exchange, and eviction on overwrite. Entries are
// indexed by prep_seq modulo the cache size, so entries whose prep_seq
// differ by a multiple of the cache size collide in the same slot.
TEST_P(WritePreparedTransactionTest, CommitMap) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  assert(wp_db);
  assert(wp_db->db_impl_);
  size_t size = wp_db->COMMIT_CACHE_SIZE;
  CommitEntry c = {5, 12}, e;
  // First insertion into an empty slot must not evict anything.
  bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
  ASSERT_FALSE(evicted);

  // Should be able to read the same value back
  CommitEntry64b dont_care;
  bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);
  // Should be able to distinguish between overlapping entries: a prep_seq
  // that maps to the same slot must not be mistaken for the stored entry.
  found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_NE(c.prep_seq + size, e.prep_seq);
  // Should be able to detect a non-existent entry
  found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
  ASSERT_FALSE(found);

  // Reject an invalid exchange: the expected value (e2) does not match the
  // slot's current content (c), so the CAS must fail.
  CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
  CommitEntry64b e2_64b(e2, wp_db->FORMAT);
  bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
  ASSERT_FALSE(exchanged);
  // check that it actually rejected the exchange: slot still holds c
  found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);

  // Accept a valid exchange: expected value matches the slot content.
  CommitEntry64b c_64b(c, wp_db->FORMAT);
  CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
  exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
  ASSERT_TRUE(exchanged);
  // check that it actually accepted the exchange: slot now holds e3
  found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e3, e);

  // Rewrite an occupied slot: the previous entry (e3) must be reported as
  // evicted and the new entry (e4) must be readable afterwards.
  CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
  evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
  ASSERT_TRUE(evicted);
  ASSERT_EQ(e3, e);
  found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e4, e);
}
754 | ||
755 | TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) { | |
756 | // If prepare <= snapshot < commit we should keep the entry around since its | |
757 | // nonexistence could be interpreted as committed in the snapshot while it is | |
758 | // not true. We keep such entries around by adding them to the | |
759 | // old_commit_map_. | |
760 | uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/; | |
761 | p = 10l, c = 15l, s = 20l, ns = 21l; | |
762 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
763 | // If we do not expect the old commit map to be updated, try also with a next | |
764 | // snapshot that is expected to update the old commit map. This would test | |
765 | // that MaybeUpdateOldCommitMap would not prevent us from checking the next | |
766 | // snapshot that must be checked. | |
767 | p = 10l, c = 15l, s = 20l, ns = 11l; | |
768 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
769 | ||
770 | p = 10l, c = 20l, s = 20l, ns = 19l; | |
771 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
772 | p = 10l, c = 20l, s = 20l, ns = 21l; | |
773 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
774 | ||
775 | p = 20l, c = 20l, s = 20l, ns = 21l; | |
776 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
777 | p = 20l, c = 20l, s = 20l, ns = 19l; | |
778 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
779 | ||
780 | p = 10l, c = 25l, s = 20l, ns = 21l; | |
781 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true); | |
782 | ||
783 | p = 20l, c = 25l, s = 20l, ns = 21l; | |
784 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true); | |
785 | ||
786 | p = 21l, c = 25l, s = 20l, ns = 22l; | |
787 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
788 | p = 21l, c = 25l, s = 20l, ns = 19l; | |
789 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
790 | } | |
791 | ||
f67539c2 TL |
792 | // Trigger the condition where some old memtables are skipped when doing |
793 | // TransactionUtil::CheckKey(), and make sure the result is still correct. | |
794 | TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) { | |
795 | const int kAttemptHistoryMemtable = 0; | |
796 | const int kAttemptImmMemTable = 1; | |
797 | for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable; | |
798 | attempt++) { | |
799 | options.max_write_buffer_number_to_maintain = 3; | |
800 | ReOpen(); | |
801 | ||
802 | WriteOptions write_options; | |
803 | ReadOptions read_options; | |
804 | TransactionOptions txn_options; | |
805 | txn_options.set_snapshot = true; | |
806 | string value; | |
807 | Status s; | |
808 | ||
809 | ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); | |
810 | ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar"))); | |
811 | ||
812 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
813 | ASSERT_TRUE(txn != nullptr); | |
814 | ASSERT_OK(txn->SetName("txn")); | |
815 | ||
816 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
817 | ASSERT_TRUE(txn2 != nullptr); | |
818 | ASSERT_OK(txn2->SetName("txn2")); | |
819 | ||
820 | // This transaction is created to cause potential conflict. | |
821 | Transaction* txn_x = db->BeginTransaction(write_options); | |
822 | ASSERT_OK(txn_x->SetName("txn_x")); | |
823 | ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3"))); | |
824 | ASSERT_OK(txn_x->Prepare()); | |
825 | ||
826 | // Create snapshots after the prepare, but there should still | |
827 | // be a conflict when trying to read "foo". | |
828 | ||
829 | if (attempt == kAttemptImmMemTable) { | |
830 | // For the second attempt, hold flush from beginning. The memtable | |
831 | // will be switched to immutable after calling TEST_SwitchMemtable() | |
832 | // while CheckKey() is called. | |
833 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( | |
834 | {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable", | |
835 | "FlushJob::Start"}}); | |
836 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); | |
837 | } | |
838 | ||
839 | // force a memtable flush. The memtable should still be kept | |
840 | FlushOptions flush_ops; | |
841 | if (attempt == kAttemptHistoryMemtable) { | |
842 | ASSERT_OK(db->Flush(flush_ops)); | |
843 | } else { | |
844 | assert(attempt == kAttemptImmMemTable); | |
845 | DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB()); | |
846 | db_impl->TEST_SwitchMemtable(); | |
847 | } | |
848 | uint64_t num_imm_mems; | |
849 | ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable, | |
850 | &num_imm_mems)); | |
851 | if (attempt == kAttemptHistoryMemtable) { | |
852 | ASSERT_EQ(0, num_imm_mems); | |
853 | } else { | |
854 | assert(attempt == kAttemptImmMemTable); | |
855 | ASSERT_EQ(1, num_imm_mems); | |
856 | } | |
857 | ||
858 | // Put something in active memtable | |
859 | ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar"))); | |
860 | ||
861 | // Create txn3 after flushing, but this transaction also needs to | |
862 | // check all memtables because of they contains uncommitted data. | |
863 | Transaction* txn3 = db->BeginTransaction(write_options, txn_options); | |
864 | ASSERT_TRUE(txn3 != nullptr); | |
865 | ASSERT_OK(txn3->SetName("txn3")); | |
866 | ||
867 | // Commit the pending write | |
868 | ASSERT_OK(txn_x->Commit()); | |
869 | ||
870 | // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will | |
871 | // pass. In all cases, both memtables are queried. | |
872 | SetPerfLevel(PerfLevel::kEnableCount); | |
873 | get_perf_context()->Reset(); | |
874 | ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy()); | |
875 | // We should have checked two memtables, active and either immutable | |
876 | // or history memtable, depending on the test case. | |
877 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
878 | ||
879 | get_perf_context()->Reset(); | |
880 | ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy()); | |
881 | // We should have checked two memtables, active and either immutable | |
882 | // or history memtable, depending on the test case. | |
883 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
884 | ||
885 | get_perf_context()->Reset(); | |
886 | ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value)); | |
887 | ASSERT_EQ(value, "bar"); | |
888 | // We should have checked two memtables, and since there is no | |
889 | // conflict, another Get() will be made and fetch the data from | |
890 | // DB. If it is in immutable memtable, two extra memtable reads | |
891 | // will be issued. If it is not (in history), only one will | |
892 | // be made, which is to the active memtable. | |
893 | if (attempt == kAttemptHistoryMemtable) { | |
894 | ASSERT_EQ(3, get_perf_context()->get_from_memtable_count); | |
895 | } else { | |
896 | assert(attempt == kAttemptImmMemTable); | |
897 | ASSERT_EQ(4, get_perf_context()->get_from_memtable_count); | |
898 | } | |
899 | ||
900 | Transaction* txn4 = db->BeginTransaction(write_options, txn_options); | |
901 | ASSERT_TRUE(txn4 != nullptr); | |
902 | ASSERT_OK(txn4->SetName("txn4")); | |
903 | get_perf_context()->Reset(); | |
904 | ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value)); | |
905 | if (attempt == kAttemptHistoryMemtable) { | |
906 | // Active memtable will be checked in snapshot validation and when | |
907 | // getting the value. | |
908 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
909 | } else { | |
910 | // Only active memtable will be checked in snapshot validation but | |
911 | // both of active and immutable snapshot will be queried when | |
912 | // getting the value. | |
913 | assert(attempt == kAttemptImmMemTable); | |
914 | ASSERT_EQ(3, get_perf_context()->get_from_memtable_count); | |
915 | } | |
916 | ||
917 | ASSERT_OK(txn2->Commit()); | |
918 | ASSERT_OK(txn4->Commit()); | |
919 | ||
920 | TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable"); | |
921 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); | |
922 | ||
923 | SetPerfLevel(PerfLevel::kDisable); | |
924 | ||
925 | delete txn; | |
926 | delete txn2; | |
927 | delete txn3; | |
928 | delete txn4; | |
929 | delete txn_x; | |
930 | } | |
931 | } | |
932 | ||
// Reproduce the bug with two snapshots with the same sequence number and test
// that the release of the first snapshot will not affect the reads by the
// other snapshot
TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
  TransactionOptions txn_options;
  Status s;

  // Insert initial value
  ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));

  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  Transaction* txn =
      wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key", "value2"));
  ASSERT_OK(txn->Prepare());
  // Three snapshots taken while the txn is prepared but not yet committed;
  // they share the same sequence number.
  const Snapshot* snapshot0 = wp_db->GetSnapshot();
  const Snapshot* snapshot1 = wp_db->GetSnapshot();
  const Snapshot* snapshot2 = wp_db->GetSnapshot();
  ASSERT_OK(txn->Commit());
  // A seq exactly COMMIT_CACHE_SIZE ahead collides with the txn's slot in the
  // commit cache, guaranteeing an eviction when committed below.
  SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
  SequenceNumber overlap_seq = txn->GetId() + cache_size;
  delete txn;

  // 4th snapshot with a larger seq
  const Snapshot* snapshot3 = wp_db->GetSnapshot();
  // Cause an eviction to advance max evicted seq number
  // This also fetches the 4 snapshots from db since their seq is lower than
  // the new max
  wp_db->AddCommitted(overlap_seq, overlap_seq);

  ReadOptions ropt;
  // It should see the value before commit
  ropt.snapshot = snapshot2;
  PinnableSlice pinnable_val;
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();

  // Releasing a same-seq sibling snapshot must not invalidate snapshot2.
  wp_db->ReleaseSnapshot(snapshot1);

  // It should still see the value before commit
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();

  // Cause an eviction to advance max evicted seq number and trigger updating
  // the snapshot list
  overlap_seq += cache_size;
  wp_db->AddCommitted(overlap_seq, overlap_seq);

  // It should still see the value before commit
  s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_OK(s);
  ASSERT_TRUE(pinnable_val == "value1");
  pinnable_val.Reset();

  wp_db->ReleaseSnapshot(snapshot0);
  wp_db->ReleaseSnapshot(snapshot2);
  wp_db->ReleaseSnapshot(snapshot3);
}
997 | ||
998 | size_t UniqueCnt(std::vector<SequenceNumber> vec) { | |
999 | std::set<SequenceNumber> aset; | |
1000 | for (auto i : vec) { | |
1001 | aset.insert(i); | |
1002 | } | |
1003 | return aset.size(); | |
1004 | } | |
11fdf7f2 TL |
// Test that the entries in old_commit_map_ get garbage collected properly.
// Uses 1-entry snapshot and commit caches (0 bits) so every commit evicts and
// every snapshot overflows into the old lists, making GC observable.
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
  const size_t snapshot_cache_bits = 0;
  const size_t commit_cache_bits = 0;
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  SequenceNumber seq = 0;
  // Take the first snapshot that overlaps with two txns: both are prepared
  // before the snapshot and committed after it.
  auto prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto prep_seq2 = ++seq;
  wp_db->AddPrepared(prep_seq2);
  auto snap_seq1 = seq;
  wp_db->TakeSnapshot(snap_seq1);
  auto commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  auto commit_seq2 = ++seq;
  wp_db->AddCommitted(prep_seq2, commit_seq2);
  wp_db->RemovePrepared(prep_seq2);
  // Take the 2nd and 3rd snapshot that overlap with the same txn
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto snap_seq2 = seq;
  wp_db->TakeSnapshot(snap_seq2);
  seq++;
  auto snap_seq3 = seq;
  wp_db->TakeSnapshot(snap_seq3);
  seq++;
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting
  // the only item in the commit_cache_ via another commit.
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);

  // Verify that the evicted commit entries for all snapshots are in the
  // old_commit_map_: snap1 overlapped two txns, snap2/snap3 one each.
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(3, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 2nd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq2);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(2, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 1st snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq1);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(1, wp_db->old_commit_map_.size());
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 3rd snapshot is cleaned up after the release; the map is
  // now empty and the empty flag must be set.
  wp_db->ReleaseSnapshotInternal(snap_seq3);
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(0, wp_db->old_commit_map_.size());
  }
}
1086 | ||
// Verifies CheckAgainstSnapshots: a commit entry must be recorded in
// old_commit_map_ for every live snapshot s with prep_seq <= s < commit_seq,
// whether the snapshot lives in the snapshot cache or in the overflow list.
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
  std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                           600l, 700l, 800l, 900l};
  const size_t snapshot_cache_bits = 2;
  const uint64_t cache_size = 1ul << snapshot_cache_bits;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  SequenceNumber version = 1000l;
  ASSERT_EQ(0, wp_db->snapshots_total_);
  wp_db->UpdateSnapshots(snapshots, version);
  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);
  // seq numbers are chosen so that we have two of them between each two
  // snapshots. If the diff of two consecutive seq is more than 5, there is a
  // snapshot between them.
  std::vector<SequenceNumber> seqs = {50l,  55l,  150l, 155l, 250l, 255l, 350l,
                                      355l, 450l, 455l, 550l, 555l, 650l, 655l,
                                      750l, 755l, 850l, 855l, 950l, 955l};
  assert(seqs.size() > 1);
  for (size_t i = 0; i + 1 < seqs.size(); i++) {
    wp_db->old_commit_map_empty_ = true;  // reset
    CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
    wp_db->CheckAgainstSnapshots(commit_entry);
    // Expect update iff there is a snapshot between the prepare and commit
    bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 &&
                         commit_entry.commit_seq >= snapshots.front() &&
                         commit_entry.prep_seq <= snapshots.back();
    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
  }

  // Test that search will include multiple snapshots from the snapshot cache
  {
    // exclude first and last item in the cache
    CommitEntry commit_entry = {snapshots.front() + 1,
                                snapshots[cache_size - 1] - 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
  }

  // Test that search will include multiple snapshots from the overflow
  // (old snapshots) list
  {
    // include two in the middle
    CommitEntry commit_entry = {snapshots[cache_size] + 1,
                                snapshots[cache_size + 2] + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
  }

  // Test that search will include both snapshot cache and old snapshots
  // Case 1: includes all in snapshot cache
  {
    CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
  }

  // Case 2: includes all snapshot caches except the smallest
  {
    CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
  }

  // Case 3: includes only the largest of snapshot cache
  {
    CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
                                snapshots.back() + 1};
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(commit_entry);
    ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
  }
}
1172 | ||
1173 | // This test is too slow for travis | |
1174 | #ifndef TRAVIS | |
1175 | #ifndef ROCKSDB_VALGRIND_RUN | |
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run
// in parallel with UpdateSnapshots. Enumerates every combination of old/new
// snapshot lists and every pair of sync-point break positions in each thread,
// delegating each scenario to SnapshotConcurrentAccessTestInternal.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
  // We have a sync point in the method under test after checking each
  // snapshot. If you increase the max number of snapshots in this test, more
  // sync points in the methods must also be added.
  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
                                                 60l, 70l, 80l, 90l, 100l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
  SequenceNumber version = 1000l;
  // Choose the cache size so that the new snapshot list could replace all the
  // existing items in the cache and also have some overflow.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  const size_t extra = 2;
  size_t loop_id = 0;
  // Add up to `extra` items that do not fit into the cache
  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
       old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);

    // Each member of old snapshot might or might not appear in the new list.
    // We create a common_snapshots for each combination.
    size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      // Shard the combinations across parameterized test instances.
      if (loop_id % split_cnt_ != split_id_) continue;
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t i = 0; i < old_snapshots.size(); i++) {
        if (IsInCombination(i, new_comb)) {
          common_snapshots.push_back(old_snapshots[i]);
        }
      }
      // And add some new snapshots to the common list
      for (size_t added_snapshots = 0;
           added_snapshots <= snapshots.size() - old_snapshots.size();
           added_snapshots++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        for (size_t i = 0; i < added_snapshots; i++) {
          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
        }
        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
             ++it) {
          auto snapshot = *it;
          // Create a commit entry that is around the snapshot and thus should
          // not be discarded
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // The critical part is when iterating the snapshot cache.
          // Afterwards, we are operating under the lock
          size_t a_range =
              std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          size_t b_range =
              std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          // Break each thread at two points
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
1255 | #endif // ROCKSDB_VALGRIND_RUN | |
1256 | #endif // TRAVIS | |
1257 | ||
// This test clarifies the contract of the AdvanceMaxEvictedSeq method:
// after advancing, (a) max_evicted_seq_ equals the new max, (b) prepared txns
// <= max move to delayed_prepared_ while the rest stay in prepared_txns_, and
// (c) the snapshot cache holds the live snapshots <= the new max.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  // 1. Set the initial values for max, prepared, and snapshots
  SequenceNumber zero_max = 0l;
  // Set the initial list of prepared txns
  const std::vector<SequenceNumber> initial_prepared = {10,  30,  50, 100,
                                                        150, 200, 250};
  for (auto p : initial_prepared) {
    wp_db->AddPrepared(p);
  }
  // This updates the max value and also sets old prepared
  SequenceNumber init_max = 100;
  wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
  const std::vector<SequenceNumber> initial_snapshots = {20, 40};
  wp_db->SetDBSnapshots(initial_snapshots);
  // This will update the internal cache of snapshots from the DB
  wp_db->UpdateSnapshots(initial_snapshots, init_max);

  // 2. Invoke AdvanceMaxEvictedSeq
  const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
  wp_db->SetDBSnapshots(latest_snapshots);
  SequenceNumber new_max = 200;
  wp_db->AdvanceMaxEvictedSeq(init_max, new_max);

  // 3. Verify that the state matches the AdvanceMaxEvictedSeq contract
  // a. max should be updated to new_max
  ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
  // b. delayed prepared should contain every txn <= max and prepared should
  // only contain txns > max
  auto it = initial_prepared.begin();
  for (; it != initial_prepared.end() && *it <= new_max; ++it) {
    // erase() returns 1 iff the element was present in delayed_prepared_.
    ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
  }
  ASSERT_TRUE(wp_db->delayed_prepared_.empty());
  // Drain prepared_txns_ (a heap ordered by seq) and match against the
  // remaining expected entries.
  for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
       ++it, wp_db->prepared_txns_.pop()) {
    ASSERT_EQ(*it, wp_db->prepared_txns_.top());
  }
  ASSERT_TRUE(it == initial_prepared.end());
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  // c. snapshots should contain everything below new_max
  auto sit = latest_snapshots.begin();
  for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
                     i < wp_db->snapshots_total_;
       sit++, i++) {
    ASSERT_TRUE(i < wp_db->snapshots_total_);
    // This test is in small scale and the list of snapshots are assumed to be
    // within the cache size limit. This is just a safety check to double
    // check that assumption.
    ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
    ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
  }
}
1315 | ||
494da23a TL |
1316 | // A new snapshot should always be always larger than max_evicted_seq_ |
1317 | // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq | |
1318 | TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) { | |
1319 | WriteOptions woptions; | |
1320 | TransactionOptions txn_options; | |
1321 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1322 | Transaction* txn0 = db->BeginTransaction(woptions, txn_options); | |
1323 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value"))); | |
1324 | ASSERT_OK(txn0->Commit()); | |
1325 | const SequenceNumber seq = txn0->GetId(); // is also prepare seq | |
1326 | delete txn0; | |
1327 | std::vector<Transaction*> txns; | |
1328 | // Inc seq without committing anything | |
1329 | for (int i = 0; i < 10; i++) { | |
1330 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
1331 | ASSERT_OK(txn->SetName("xid" + std::to_string(i))); | |
1332 | ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value"))); | |
1333 | ASSERT_OK(txn->Prepare()); | |
1334 | txns.push_back(txn); | |
1335 | } | |
1336 | ||
1337 | // The new commit is seq + 10 | |
1338 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1339 | auto snap = wp_db->GetSnapshot(); | |
1340 | const SequenceNumber last_seq = snap->GetSequenceNumber(); | |
1341 | wp_db->ReleaseSnapshot(snap); | |
1342 | ASSERT_LT(seq, last_seq); | |
1343 | // Otherwise our test is not effective | |
1344 | ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED); | |
1345 | ||
1346 | // Evict seq out of commit cache | |
1347 | const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE; | |
1348 | // Check that the next write could make max go beyond last | |
1349 | auto last_max = wp_db->max_evicted_seq_.load(); | |
1350 | wp_db->AddCommitted(overwrite_seq, overwrite_seq); | |
1351 | // Check that eviction has advanced the max | |
1352 | ASSERT_LT(last_max, wp_db->max_evicted_seq_.load()); | |
1353 | // Check that the new max has not advanced the last seq | |
1354 | ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq); | |
1355 | for (auto txn : txns) { | |
1356 | txn->Rollback(); | |
1357 | delete txn; | |
1358 | } | |
1359 | } | |
1360 | ||
1361 | // A new snapshot should always be always larger than max_evicted_seq_ | |
1362 | // In very rare cases max could be below last published seq. Test that | |
1363 | // taking snapshot will wait for max to catch up. | |
1364 | TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { | |
1365 | const size_t snapshot_cache_bits = 7; // same as default | |
1366 | const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction | |
1367 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1368 | ReOpen(); | |
1369 | WriteOptions woptions; | |
1370 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1371 | ||
1372 | const int writes = 50; | |
1373 | const int batch_cnt = 4; | |
f67539c2 | 1374 | ROCKSDB_NAMESPACE::port::Thread t1([&]() { |
494da23a TL |
1375 | for (int i = 0; i < writes; i++) { |
1376 | WriteBatch batch; | |
1377 | // For duplicate keys cause 4 commit entries, each evicting an entry that | |
f67539c2 | 1378 | // is not published yet, thus causing max evicted seq go higher than last |
494da23a TL |
1379 | // published. |
1380 | for (int b = 0; b < batch_cnt; b++) { | |
1381 | batch.Put("foo", "foo"); | |
1382 | } | |
1383 | db->Write(woptions, &batch); | |
1384 | } | |
1385 | }); | |
1386 | ||
f67539c2 | 1387 | ROCKSDB_NAMESPACE::port::Thread t2([&]() { |
494da23a TL |
1388 | while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread |
1389 | std::this_thread::yield(); | |
1390 | } | |
1391 | for (int i = 0; i < 10; i++) { | |
f67539c2 | 1392 | SequenceNumber max_lower_bound = wp_db->max_evicted_seq_; |
494da23a TL |
1393 | auto snap = db->GetSnapshot(); |
1394 | if (snap->GetSequenceNumber() != 0) { | |
f67539c2 TL |
1395 | // Value of max_evicted_seq_ when snapshot was taken in unknown. We thus |
1396 | // compare with the lower bound instead as an approximation. | |
1397 | ASSERT_LT(max_lower_bound, snap->GetSequenceNumber()); | |
494da23a TL |
1398 | } // seq 0 is ok to be less than max since nothing is visible to it |
1399 | db->ReleaseSnapshot(snap); | |
1400 | } | |
1401 | }); | |
1402 | ||
1403 | t1.join(); | |
1404 | t2.join(); | |
1405 | ||
1406 | // Make sure that the test has worked and seq number has advanced as we | |
1407 | // thought | |
1408 | auto snap = db->GetSnapshot(); | |
1409 | ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1); | |
1410 | db->ReleaseSnapshot(snap); | |
1411 | } | |
1412 | ||
f67539c2 TL |
1413 | // Test that reads without snapshots would not hit an undefined state |
1414 | TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) { | |
1415 | const size_t snapshot_cache_bits = 7; // same as default | |
1416 | const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction | |
1417 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1418 | ReOpen(); | |
1419 | WriteOptions woptions; | |
1420 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1421 | ||
1422 | const int writes = 50; | |
1423 | ROCKSDB_NAMESPACE::port::Thread t1([&]() { | |
1424 | for (int i = 0; i < writes; i++) { | |
1425 | WriteBatch batch; | |
1426 | batch.Put("key", "foo"); | |
1427 | db->Write(woptions, &batch); | |
1428 | } | |
1429 | }); | |
1430 | ||
1431 | ROCKSDB_NAMESPACE::port::Thread t2([&]() { | |
1432 | while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread | |
1433 | std::this_thread::yield(); | |
1434 | } | |
1435 | ReadOptions ropt; | |
1436 | PinnableSlice pinnable_val; | |
1437 | TransactionOptions txn_options; | |
1438 | for (int i = 0; i < 10; i++) { | |
1439 | auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); | |
1440 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1441 | pinnable_val.Reset(); | |
1442 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
1443 | s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); | |
1444 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1445 | pinnable_val.Reset(); | |
1446 | std::vector<std::string> values; | |
1447 | auto s_vec = | |
1448 | txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values); | |
1449 | ASSERT_EQ(1, values.size()); | |
1450 | ASSERT_EQ(1, s_vec.size()); | |
1451 | s = s_vec[0]; | |
1452 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1453 | Slice key("key"); | |
1454 | txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s, | |
1455 | true); | |
1456 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1457 | delete txn; | |
1458 | } | |
1459 | }); | |
1460 | ||
1461 | t1.join(); | |
1462 | t2.join(); | |
1463 | ||
1464 | // Make sure that the test has worked and seq number has advanced as we | |
1465 | // thought | |
1466 | auto snap = db->GetSnapshot(); | |
1467 | ASSERT_GT(snap->GetSequenceNumber(), writes - 1); | |
1468 | db->ReleaseSnapshot(snap); | |
1469 | } | |
1470 | ||
494da23a TL |
1471 | // Check that old_commit_map_ cleanup works correctly if the snapshot equals |
1472 | // max_evicted_seq_. | |
1473 | TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) { | |
1474 | const size_t snapshot_cache_bits = 7; // same as default | |
1475 | const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction | |
1476 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1477 | ReOpen(); | |
1478 | WriteOptions woptions; | |
1479 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1480 | // Insert something to increase seq | |
1481 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1482 | auto snap = db->GetSnapshot(); | |
1483 | auto snap_seq = snap->GetSequenceNumber(); | |
1484 | // Another insert should trigger eviction + load snapshot from db | |
1485 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1486 | // This is the scenario that we check agaisnt | |
1487 | ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_); | |
1488 | // old_commit_map_ now has some data that needs gc | |
1489 | ASSERT_EQ(1, wp_db->snapshots_total_); | |
1490 | ASSERT_EQ(1, wp_db->old_commit_map_.size()); | |
1491 | ||
1492 | db->ReleaseSnapshot(snap); | |
1493 | ||
1494 | // Another insert should trigger eviction + load snapshot from db | |
1495 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1496 | ||
1497 | // the snapshot and related metadata must be properly garbage collected | |
1498 | ASSERT_EQ(0, wp_db->snapshots_total_); | |
1499 | ASSERT_TRUE(wp_db->snapshots_all_.empty()); | |
1500 | ASSERT_EQ(0, wp_db->old_commit_map_.size()); | |
1501 | } | |
1502 | ||
1503 | TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) { | |
1504 | auto snap = db->GetSnapshot(); | |
1505 | auto seq1 = snap->GetSequenceNumber(); | |
1506 | db->ReleaseSnapshot(snap); | |
1507 | ||
1508 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1509 | wp_db->AdvanceSeqByOne(); | |
1510 | ||
1511 | snap = db->GetSnapshot(); | |
1512 | auto seq2 = snap->GetSequenceNumber(); | |
1513 | db->ReleaseSnapshot(snap); | |
1514 | ||
1515 | ASSERT_LT(seq1, seq2); | |
1516 | } | |
1517 | ||
1518 | // Test that the txn Initilize calls the overridden functions | |
1519 | TEST_P(WritePreparedTransactionTest, TxnInitialize) { | |
1520 | TransactionOptions txn_options; | |
1521 | WriteOptions write_options; | |
1522 | ASSERT_OK(db->Put(write_options, "key", "value")); | |
1523 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1524 | ASSERT_OK(txn0->SetName("xid")); | |
1525 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); | |
1526 | ASSERT_OK(txn0->Prepare()); | |
1527 | ||
1528 | // SetSnapshot is overridden to update min_uncommitted_ | |
1529 | txn_options.set_snapshot = true; | |
1530 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
1531 | auto snap = txn1->GetSnapshot(); | |
1532 | auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap); | |
1533 | // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be | |
1534 | // udpated | |
1535 | ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq); | |
1536 | ||
1537 | txn0->Rollback(); | |
1538 | txn1->Rollback(); | |
1539 | delete txn0; | |
1540 | delete txn1; | |
1541 | } | |
1542 | ||
11fdf7f2 TL |
1543 | // This tests that transactions with duplicate keys perform correctly after max |
1544 | // is advancing their prepared sequence numbers. This will not be the case if | |
1545 | // for example the txn does not add the prepared seq for the second sub-batch to | |
494da23a | 1546 | // the PreparedHeap structure. |
f67539c2 TL |
1547 | TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) { |
1548 | const size_t snapshot_cache_bits = 7; // same as default | |
1549 | const size_t commit_cache_bits = 1; // disable commit cache | |
1550 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1551 | ReOpen(); | |
1552 | ||
1553 | ReadOptions ropt; | |
1554 | PinnableSlice pinnable_val; | |
11fdf7f2 TL |
1555 | WriteOptions write_options; |
1556 | TransactionOptions txn_options; | |
1557 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1558 | ASSERT_OK(txn0->SetName("xid")); | |
1559 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); | |
1560 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value2"))); | |
1561 | ASSERT_OK(txn0->Prepare()); | |
1562 | ||
f67539c2 TL |
1563 | ASSERT_OK(db->Put(write_options, "key2", "value")); |
1564 | // Will cause max advance due to disabled commit cache | |
1565 | ASSERT_OK(db->Put(write_options, "key3", "value")); | |
11fdf7f2 | 1566 | |
11fdf7f2 TL |
1567 | auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); |
1568 | ASSERT_TRUE(s.IsNotFound()); | |
1569 | delete txn0; | |
1570 | ||
f67539c2 | 1571 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
11fdf7f2 TL |
1572 | wp_db->db_impl_->FlushWAL(true); |
1573 | wp_db->TEST_Crash(); | |
1574 | ReOpenNoDelete(); | |
1575 | assert(db != nullptr); | |
11fdf7f2 TL |
1576 | s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); |
1577 | ASSERT_TRUE(s.IsNotFound()); | |
1578 | ||
1579 | txn0 = db->GetTransactionByName("xid"); | |
1580 | ASSERT_OK(txn0->Rollback()); | |
1581 | delete txn0; | |
1582 | } | |
1583 | ||
f67539c2 TL |
1584 | #ifndef ROCKSDB_VALGRIND_RUN |
1585 | // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and | |
1586 | // delayed_prepared_, when is run concurrently with advancing max_evicted_seq, | |
1587 | // which moves prepared txns from prepared_txns_ to delayed_prepared_. | |
1588 | TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) { | |
1589 | const size_t snapshot_cache_bits = 7; // same as default | |
1590 | const size_t commit_cache_bits = 1; // disable commit cache | |
1591 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1592 | ReOpen(); | |
1593 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1594 | ReadOptions ropt; | |
1595 | PinnableSlice pinnable_val; | |
1596 | WriteOptions write_options; | |
1597 | TransactionOptions txn_options; | |
1598 | std::vector<Transaction*> txns, committed_txns; | |
1599 | ||
1600 | const int cnt = 100; | |
1601 | for (int i = 0; i < cnt; i++) { | |
1602 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
1603 | ASSERT_OK(txn->SetName("xid" + ToString(i))); | |
1604 | auto key = "key1" + ToString(i); | |
1605 | auto value = "value1" + ToString(i); | |
1606 | ASSERT_OK(txn->Put(Slice(key), Slice(value))); | |
1607 | ASSERT_OK(txn->Prepare()); | |
1608 | txns.push_back(txn); | |
1609 | } | |
1610 | ||
1611 | port::Mutex mutex; | |
1612 | Random rnd(1103); | |
1613 | ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() { | |
1614 | for (int i = 0; i < cnt; i++) { | |
1615 | uint32_t index = rnd.Uniform(cnt - i); | |
1616 | Transaction* txn; | |
1617 | { | |
1618 | MutexLock l(&mutex); | |
1619 | txn = txns[index]; | |
1620 | txns.erase(txns.begin() + index); | |
1621 | } | |
1622 | // Since commit cache is practically disabled, commit results in immediate | |
1623 | // advance in max_evicted_seq_ and subsequently moving some prepared txns | |
1624 | // to delayed_prepared_. | |
1625 | txn->Commit(); | |
1626 | committed_txns.push_back(txn); | |
1627 | } | |
1628 | }); | |
1629 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { | |
1630 | while (1) { | |
1631 | MutexLock l(&mutex); | |
1632 | if (txns.empty()) { | |
1633 | break; | |
1634 | } | |
1635 | auto min_uncommitted = wp_db->SmallestUnCommittedSeq(); | |
1636 | ASSERT_LE(min_uncommitted, (*txns.begin())->GetId()); | |
1637 | } | |
1638 | }); | |
1639 | ||
1640 | commit_thread.join(); | |
1641 | read_thread.join(); | |
1642 | for (auto txn : committed_txns) { | |
1643 | delete txn; | |
1644 | } | |
1645 | } | |
1646 | #endif // ROCKSDB_VALGRIND_RUN | |
1647 | ||
1648 | TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) { | |
11fdf7f2 TL |
1649 | // Given the sequential run of txns, with this timeout we should never see a |
1650 | // deadlock nor a timeout unless we have a key conflict, which should be | |
1651 | // almost infeasible. | |
1652 | txn_db_options.transaction_lock_timeout = 1000; | |
1653 | txn_db_options.default_lock_timeout = 1000; | |
1654 | ReOpen(); | |
1655 | FlushOptions fopt; | |
1656 | ||
1657 | // Number of different txn types we use in this test | |
1658 | const size_t type_cnt = 5; | |
1659 | // The size of the first write group | |
1660 | // TODO(myabandeh): This should be increase for pre-release tests | |
1661 | const size_t first_group_size = 2; | |
1662 | // Total number of txns we run in each test | |
1663 | // TODO(myabandeh): This should be increase for pre-release tests | |
1664 | const size_t txn_cnt = first_group_size + 1; | |
1665 | ||
1666 | size_t base[txn_cnt + 1] = { | |
1667 | 1, | |
1668 | }; | |
1669 | for (size_t bi = 1; bi <= txn_cnt; bi++) { | |
1670 | base[bi] = base[bi - 1] * type_cnt; | |
1671 | } | |
1672 | const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt)); | |
1673 | printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n); | |
1674 | for (size_t n = 0; n < max_n; n++, ReOpen()) { | |
1675 | if (n % split_cnt_ != split_id_) continue; | |
1676 | if (n % 1000 == 0) { | |
1677 | printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n); | |
1678 | } | |
20effc67 | 1679 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 | 1680 | auto seq = db_impl->TEST_GetLastVisibleSequence(); |
f67539c2 | 1681 | with_empty_commits = 0; |
11fdf7f2 TL |
1682 | exp_seq = seq; |
1683 | // This is increased before writing the batch for commit | |
1684 | commit_writes = 0; | |
1685 | // This is increased before txn starts linking if it expects to do a commit | |
1686 | // eventually | |
1687 | expected_commits = 0; | |
1688 | std::vector<port::Thread> threads; | |
1689 | ||
1690 | linked = 0; | |
1691 | std::atomic<bool> batch_formed(false); | |
f67539c2 | 1692 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 TL |
1693 | "WriteThread::EnterAsBatchGroupLeader:End", |
1694 | [&](void* /*arg*/) { batch_formed = true; }); | |
f67539c2 | 1695 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 TL |
1696 | "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) { |
1697 | linked++; | |
1698 | if (linked == 1) { | |
1699 | // Wait until the others are linked too. | |
1700 | while (linked < first_group_size) { | |
1701 | } | |
1702 | } else if (linked == 1 + first_group_size) { | |
1703 | // Make the 2nd batch of the rest of writes plus any followup | |
1704 | // commits from the first batch | |
1705 | while (linked < txn_cnt + commit_writes) { | |
1706 | } | |
1707 | } | |
1708 | // Then we will have one or more batches consisting of follow-up | |
1709 | // commits from the 2nd batch. There is a bit of non-determinism here | |
1710 | // but it should be tolerable. | |
1711 | }); | |
1712 | ||
f67539c2 | 1713 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
1714 | for (size_t bi = 0; bi < txn_cnt; bi++) { |
1715 | // get the bi-th digit in number system based on type_cnt | |
1716 | size_t d = (n % base[bi + 1]) / base[bi]; | |
1717 | switch (d) { | |
1718 | case 0: | |
1719 | threads.emplace_back(txn_t0, bi); | |
1720 | break; | |
1721 | case 1: | |
1722 | threads.emplace_back(txn_t1, bi); | |
1723 | break; | |
1724 | case 2: | |
1725 | threads.emplace_back(txn_t2, bi); | |
1726 | break; | |
1727 | case 3: | |
1728 | threads.emplace_back(txn_t3, bi); | |
1729 | break; | |
1730 | case 4: | |
1731 | threads.emplace_back(txn_t3, bi); | |
1732 | break; | |
1733 | default: | |
1734 | assert(false); | |
1735 | } | |
1736 | // wait to be linked | |
1737 | while (linked.load() <= bi) { | |
1738 | } | |
1739 | // after a queue of size first_group_size | |
1740 | if (bi + 1 == first_group_size) { | |
1741 | while (!batch_formed) { | |
1742 | } | |
1743 | // to make it more deterministic, wait until the commits are linked | |
1744 | while (linked.load() <= bi + expected_commits) { | |
1745 | } | |
1746 | } | |
1747 | } | |
1748 | for (auto& t : threads) { | |
1749 | t.join(); | |
1750 | } | |
1751 | if (options.two_write_queues) { | |
1752 | // In this case none of the above scheduling tricks to deterministically | |
1753 | // form merged batches works because the writes go to separate queues. | |
1754 | // This would result in different write groups in each run of the test. We | |
1755 | // still keep the test since although non-deterministic and hard to debug, | |
1756 | // it is still useful to have. | |
1757 | // TODO(myabandeh): Add a deterministic unit test for two_write_queues | |
1758 | } | |
1759 | ||
1760 | // Check if memtable inserts advanced seq number as expected | |
1761 | seq = db_impl->TEST_GetLastVisibleSequence(); | |
1762 | ASSERT_EQ(exp_seq, seq); | |
1763 | ||
f67539c2 TL |
1764 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
1765 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
11fdf7f2 TL |
1766 | |
1767 | // Check if recovery preserves the last sequence number | |
1768 | db_impl->FlushWAL(true); | |
1769 | ReOpenNoDelete(); | |
1770 | assert(db != nullptr); | |
20effc67 | 1771 | db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 | 1772 | seq = db_impl->TEST_GetLastVisibleSequence(); |
f67539c2 | 1773 | ASSERT_LE(exp_seq, seq + with_empty_commits); |
11fdf7f2 TL |
1774 | |
1775 | // Check if flush preserves the last sequence number | |
1776 | db_impl->Flush(fopt); | |
1777 | seq = db_impl->GetLatestSequenceNumber(); | |
f67539c2 | 1778 | ASSERT_LE(exp_seq, seq + with_empty_commits); |
11fdf7f2 TL |
1779 | |
1780 | // Check if recovery after flush preserves the last sequence number | |
1781 | db_impl->FlushWAL(true); | |
1782 | ReOpenNoDelete(); | |
1783 | assert(db != nullptr); | |
20effc67 | 1784 | db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 | 1785 | seq = db_impl->GetLatestSequenceNumber(); |
f67539c2 | 1786 | ASSERT_LE(exp_seq, seq + with_empty_commits); |
11fdf7f2 TL |
1787 | } |
1788 | } | |
1789 | ||
1790 | // Run a couple of different txns among them some uncommitted. Restart the db at | |
1791 | // a couple points to check whether the list of uncommitted txns are recovered | |
1792 | // properly. | |
f67539c2 | 1793 | TEST_P(WritePreparedTransactionTest, BasicRecovery) { |
11fdf7f2 TL |
1794 | options.disable_auto_compactions = true; |
1795 | ReOpen(); | |
1796 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1797 | ||
1798 | txn_t0(0); | |
1799 | ||
1800 | TransactionOptions txn_options; | |
1801 | WriteOptions write_options; | |
1802 | size_t index = 1000; | |
1803 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1804 | auto istr0 = std::to_string(index); | |
1805 | auto s = txn0->SetName("xid" + istr0); | |
1806 | ASSERT_OK(s); | |
1807 | s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0)); | |
1808 | ASSERT_OK(s); | |
1809 | s = txn0->Prepare(); | |
1810 | auto prep_seq_0 = txn0->GetId(); | |
1811 | ||
1812 | txn_t1(0); | |
1813 | ||
1814 | index++; | |
1815 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
1816 | auto istr1 = std::to_string(index); | |
1817 | s = txn1->SetName("xid" + istr1); | |
1818 | ASSERT_OK(s); | |
1819 | s = txn1->Put(Slice("foo1" + istr1), Slice("bar")); | |
1820 | ASSERT_OK(s); | |
1821 | s = txn1->Prepare(); | |
1822 | auto prep_seq_1 = txn1->GetId(); | |
1823 | ||
1824 | txn_t2(0); | |
1825 | ||
1826 | ReadOptions ropt; | |
1827 | PinnableSlice pinnable_val; | |
1828 | // Check the value is not committed before restart | |
1829 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1830 | ASSERT_TRUE(s.IsNotFound()); | |
1831 | pinnable_val.Reset(); | |
1832 | ||
1833 | delete txn0; | |
1834 | delete txn1; | |
1835 | wp_db->db_impl_->FlushWAL(true); | |
1836 | wp_db->TEST_Crash(); | |
1837 | ReOpenNoDelete(); | |
1838 | assert(db != nullptr); | |
1839 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1840 | // After recovery, all the uncommitted txns (0 and 1) should be inserted into | |
1841 | // delayed_prepared_ | |
1842 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1843 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
1844 | ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); | |
1845 | ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_); | |
1846 | { | |
1847 | ReadLock rl(&wp_db->prepared_mutex_); | |
1848 | ASSERT_EQ(2, wp_db->delayed_prepared_.size()); | |
1849 | ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) != | |
1850 | wp_db->delayed_prepared_.end()); | |
1851 | ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) != | |
1852 | wp_db->delayed_prepared_.end()); | |
1853 | } | |
1854 | ||
1855 | // Check the value is still not committed after restart | |
1856 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1857 | ASSERT_TRUE(s.IsNotFound()); | |
1858 | pinnable_val.Reset(); | |
1859 | ||
1860 | txn_t3(0); | |
1861 | ||
1862 | // Test that a recovered txns will be properly marked committed for the next | |
1863 | // recovery | |
1864 | txn1 = db->GetTransactionByName("xid" + istr1); | |
1865 | ASSERT_NE(txn1, nullptr); | |
1866 | txn1->Commit(); | |
1867 | delete txn1; | |
1868 | ||
1869 | index++; | |
1870 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
1871 | auto istr2 = std::to_string(index); | |
1872 | s = txn2->SetName("xid" + istr2); | |
1873 | ASSERT_OK(s); | |
1874 | s = txn2->Put(Slice("foo2" + istr2), Slice("bar")); | |
1875 | ASSERT_OK(s); | |
1876 | s = txn2->Prepare(); | |
1877 | auto prep_seq_2 = txn2->GetId(); | |
1878 | ||
1879 | delete txn2; | |
1880 | wp_db->db_impl_->FlushWAL(true); | |
1881 | wp_db->TEST_Crash(); | |
1882 | ReOpenNoDelete(); | |
1883 | assert(db != nullptr); | |
1884 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1885 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1886 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
1887 | ||
1888 | // 0 and 2 are prepared and 1 is committed | |
1889 | { | |
1890 | ReadLock rl(&wp_db->prepared_mutex_); | |
1891 | ASSERT_EQ(2, wp_db->delayed_prepared_.size()); | |
1892 | const auto& end = wp_db->delayed_prepared_.end(); | |
1893 | ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end); | |
1894 | ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end); | |
1895 | ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end); | |
1896 | } | |
1897 | ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); | |
1898 | ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_); | |
1899 | ||
1900 | // Commit all the remaining txns | |
1901 | txn0 = db->GetTransactionByName("xid" + istr0); | |
1902 | ASSERT_NE(txn0, nullptr); | |
1903 | txn0->Commit(); | |
1904 | txn2 = db->GetTransactionByName("xid" + istr2); | |
1905 | ASSERT_NE(txn2, nullptr); | |
1906 | txn2->Commit(); | |
1907 | ||
1908 | // Check the value is committed after commit | |
1909 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1910 | ASSERT_TRUE(s.ok()); | |
1911 | ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); | |
1912 | pinnable_val.Reset(); | |
1913 | ||
1914 | delete txn0; | |
1915 | delete txn2; | |
1916 | wp_db->db_impl_->FlushWAL(true); | |
1917 | ReOpenNoDelete(); | |
1918 | assert(db != nullptr); | |
1919 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1920 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1921 | ASSERT_TRUE(wp_db->delayed_prepared_empty_); | |
1922 | ||
1923 | // Check the value is still committed after recovery | |
1924 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1925 | ASSERT_TRUE(s.ok()); | |
1926 | ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); | |
1927 | pinnable_val.Reset(); | |
1928 | } | |
1929 | ||
1930 | // After recovery the commit map is empty while the max is set. The code would | |
494da23a TL |
1931 | // go through a different path which requires a separate test. Test that the |
1932 | // committed data before the restart is visible to all snapshots. | |
f67539c2 | 1933 | TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) { |
494da23a TL |
1934 | for (bool end_with_prepare : {false, true}) { |
1935 | ReOpen(); | |
1936 | WriteOptions woptions; | |
1937 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1938 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1939 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1940 | SequenceNumber prepare_seq = kMaxSequenceNumber; | |
1941 | if (end_with_prepare) { | |
1942 | TransactionOptions txn_options; | |
1943 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
1944 | ASSERT_OK(txn->SetName("xid0")); | |
1945 | ASSERT_OK(txn->Prepare()); | |
1946 | prepare_seq = txn->GetId(); | |
1947 | delete txn; | |
1948 | } | |
1949 | dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash(); | |
20effc67 | 1950 | auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
494da23a TL |
1951 | db_impl->FlushWAL(true); |
1952 | ReOpenNoDelete(); | |
1953 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1954 | assert(wp_db != nullptr); | |
1955 | ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery | |
1956 | // Take a snapshot right after recovery | |
1957 | const Snapshot* snap = db->GetSnapshot(); | |
1958 | auto snap_seq = snap->GetSequenceNumber(); | |
1959 | ASSERT_GT(snap_seq, 0); | |
1960 | ||
1961 | for (SequenceNumber seq = 0; | |
1962 | seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) { | |
1963 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq)); | |
1964 | } | |
1965 | if (end_with_prepare) { | |
1966 | ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq)); | |
1967 | } | |
1968 | // trivial check | |
1969 | ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq)); | |
1970 | ||
1971 | db->ReleaseSnapshot(snap); | |
1972 | ||
1973 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1974 | // Take a snapshot after some writes | |
1975 | snap = db->GetSnapshot(); | |
1976 | snap_seq = snap->GetSequenceNumber(); | |
1977 | for (SequenceNumber seq = 0; | |
1978 | seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) { | |
1979 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq)); | |
1980 | } | |
1981 | if (end_with_prepare) { | |
1982 | ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq)); | |
1983 | } | |
1984 | // trivial check | |
1985 | ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq)); | |
1986 | ||
1987 | db->ReleaseSnapshot(snap); | |
1988 | } | |
1989 | } | |
1990 | ||
1991 | // Shows the contract of IsInSnapshot when called on invalid/released snapshots | |
1992 | TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) { | |
11fdf7f2 | 1993 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
494da23a TL |
1994 | WriteOptions woptions; |
1995 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1996 | // snap seq = 1 | |
1997 | const Snapshot* snap1 = db->GetSnapshot(); | |
1998 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1999 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
2000 | // snap seq = 3 | |
2001 | const Snapshot* snap2 = db->GetSnapshot(); | |
2002 | const SequenceNumber seq = 1; | |
2003 | // Evict seq out of commit cache | |
2004 | size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq; | |
2005 | wp_db->AddCommitted(overwrite_seq, overwrite_seq); | |
2006 | SequenceNumber snap_seq; | |
2007 | uint64_t min_uncommitted = kMinUnCommittedSeq; | |
2008 | bool released; | |
2009 | ||
2010 | released = false; | |
2011 | snap_seq = snap1->GetSequenceNumber(); | |
2012 | ASSERT_LE(seq, snap_seq); | |
2013 | // Valid snapshot lower than max | |
2014 | ASSERT_LE(snap_seq, wp_db->max_evicted_seq_); | |
2015 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2016 | ASSERT_FALSE(released); | |
2017 | ||
2018 | released = false; | |
2019 | snap_seq = snap1->GetSequenceNumber(); | |
2020 | // Invaid snapshot lower than max | |
2021 | ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_); | |
2022 | ASSERT_TRUE( | |
2023 | wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); | |
2024 | ASSERT_TRUE(released); | |
2025 | ||
2026 | db->ReleaseSnapshot(snap1); | |
2027 | ||
2028 | released = false; | |
2029 | // Released snapshot lower than max | |
2030 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2031 | // The release does not take affect until the next max advance | |
2032 | ASSERT_FALSE(released); | |
2033 | ||
2034 | released = false; | |
2035 | // Invaid snapshot lower than max | |
2036 | ASSERT_TRUE( | |
2037 | wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); | |
2038 | ASSERT_TRUE(released); | |
2039 | ||
2040 | // This make the snapshot release to reflect in txn db structures | |
2041 | wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_, | |
2042 | wp_db->max_evicted_seq_ + 1); | |
2043 | ||
2044 | released = false; | |
2045 | // Released snapshot lower than max | |
2046 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2047 | ASSERT_TRUE(released); | |
2048 | ||
2049 | released = false; | |
2050 | // Invaid snapshot lower than max | |
2051 | ASSERT_TRUE( | |
2052 | wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); | |
2053 | ASSERT_TRUE(released); | |
2054 | ||
2055 | snap_seq = snap2->GetSequenceNumber(); | |
2056 | ||
2057 | released = false; | |
2058 | // Unreleased snapshot lower than max | |
2059 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2060 | ASSERT_FALSE(released); | |
2061 | ||
2062 | db->ReleaseSnapshot(snap2); | |
11fdf7f2 TL |
2063 | } |
2064 | ||
2065 | // Test WritePreparedTxnDB's IsInSnapshot against different ordering of | |
2066 | // snapshot, max_committed_seq_, prepared, and commit entries. | |
// Stress IsInSnapshot() with mocked sequence numbers: interleave prepares,
// commits, and snapshots, and verify at every cycle that a seq is reported
// visible to the final snapshot iff it committed before that snapshot was
// taken -- including after max_evicted_seq_ has advanced past the snapshot.
TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
  WriteOptions wo;
  // Use small commit cache to trigger lots of eviction and fast advance of
  // max_evicted_seq_
  const size_t commit_cache_bits = 3;
  // Same for snapshot cache size
  const size_t snapshot_cache_bits = 2;

  // Take some preliminary snapshots first. This is to stress the data structure
  // that holds the old snapshots as it will be designed to be efficient when
  // only a few snapshots are below the max_evicted_seq_.
  for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
    // Leave some gap between the preliminary snapshots and the final snapshot
    // that we check. This should test for also different overlapping scenarios
    // between the last snapshot and the commits.
    for (int max_gap = 1; max_gap < 10; max_gap++) {
      // Since we do not actually write to db, we mock the seq as it would be
      // increased by the db. The only exception is that we need db seq to
      // advance for our snapshots. for which we apply a dummy put each time we
      // increase our mock of seq.
      uint64_t seq = 0;
      // At each step we prepare a txn and then we commit it in the next txn.
      // This emulates the consecutive transactions that write to the same key
      uint64_t cur_txn = 0;
      // Number of snapshots taken so far
      int num_snapshots = 0;
      // Number of gaps applied so far
      int gap_cnt = 0;
      // The final snapshot that we will inspect
      uint64_t snapshot = 0;
      bool found_committed = false;
      // To stress the data structure that maintain prepared txns, at each cycle
      // we add a new prepare txn. These do not mean to be committed for
      // snapshot inspection.
      std::set<uint64_t> prepared;
      // We keep the list of txns committed before we take the last snapshot.
      // These should be the only seq numbers that will be found in the snapshot
      std::set<uint64_t> committed_before;
      // The set of commit seq numbers to be excluded from IsInSnapshot queries
      std::set<uint64_t> commit_seqs;
      // NOTE(review): mock_db appears to be owned by the mock txn db below;
      // no explicit delete here -- confirm against WritePreparedTxnDBMock.
      DBImpl* mock_db = new DBImpl(options, dbname);
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      std::unique_ptr<WritePreparedTxnDBMock> wp_db(
          new WritePreparedTxnDBMock(mock_db, txn_db_options));
      // We continue until max advances a bit beyond the snapshot.
      while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
        // do prepare for a transaction
        seq++;
        wp_db->AddPrepared(seq);
        prepared.insert(seq);

        // If cur_txn is not started, do prepare for it.
        if (!cur_txn) {
          seq++;
          cur_txn = seq;
          wp_db->AddPrepared(cur_txn);
        } else {  // else commit it
          seq++;
          wp_db->AddCommitted(cur_txn, seq);
          wp_db->RemovePrepared(cur_txn);
          commit_seqs.insert(seq);
          if (!snapshot) {
            // Only commits that land before the final snapshot are expected
            // to be visible to it.
            committed_before.insert(cur_txn);
          }
          cur_txn = 0;
        }

        if (num_snapshots < max_snapshots - 1) {
          // Take preliminary snapshots
          wp_db->TakeSnapshot(seq);
          num_snapshots++;
        } else if (gap_cnt < max_gap) {
          // Wait for some gap before taking the final snapshot
          gap_cnt++;
        } else if (!snapshot) {
          // Take the final snapshot if it is not already taken
          snapshot = seq;
          wp_db->TakeSnapshot(snapshot);
          num_snapshots++;
        }

        // If the snapshot is taken, verify seq numbers visible to it. We redo
        // it at each cycle to test that the system is still sound when
        // max_evicted_seq_ advances.
        if (snapshot) {
          // Skip commit markers themselves; only query prepare seq numbers.
          for (uint64_t s = 1;
               s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
            bool was_committed =
                (committed_before.find(s) != committed_before.end());
            bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
            if (was_committed != is_in_snapshot) {
              // Dump the full state before the assertion fires, to ease
              // debugging of a failure.
              printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
                     " snapshot %" PRIu64
                     " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
                     max_snapshots, max_gap, seq,
                     wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
                     num_snapshots, s);
            }
            ASSERT_EQ(was_committed, is_in_snapshot);
            found_committed = found_committed || is_in_snapshot;
          }
        }
      }
      // Safety check to make sure the test actually ran
      ASSERT_TRUE(found_committed);
      // As an extra check, check if prepared set will be properly empty after
      // they are committed.
      if (cur_txn) {
        wp_db->AddCommitted(cur_txn, seq);
        wp_db->RemovePrepared(cur_txn);
      }
      for (auto p : prepared) {
        wp_db->AddCommitted(p, seq);
        wp_db->RemovePrepared(p);
      }
      ASSERT_TRUE(wp_db->delayed_prepared_.empty());
      ASSERT_TRUE(wp_db->prepared_txns_.empty());
    }
  }
}
2187 | ||
2188 | void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s, | |
2189 | PinnableSlice& exp_v, Slice key) { | |
2190 | Status s; | |
2191 | PinnableSlice v; | |
2192 | s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); | |
2193 | ASSERT_TRUE(exp_s == s); | |
2194 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
2195 | if (s.ok()) { | |
2196 | ASSERT_TRUE(exp_v == v); | |
2197 | } | |
2198 | ||
2199 | // Try with MultiGet API too | |
2200 | std::vector<std::string> values; | |
2201 | auto s_vec = | |
2202 | db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values); | |
2203 | ASSERT_EQ(1, values.size()); | |
2204 | ASSERT_EQ(1, s_vec.size()); | |
2205 | s = s_vec[0]; | |
2206 | ASSERT_TRUE(exp_s == s); | |
2207 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
2208 | if (s.ok()) { | |
2209 | ASSERT_TRUE(exp_v == values[0]); | |
2210 | } | |
2211 | } | |
2212 | ||
2213 | void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, | |
2214 | Slice key) { | |
2215 | ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key); | |
2216 | } | |
2217 | ||
f67539c2 | 2218 | TEST_P(WritePreparedTransactionTest, Rollback) { |
11fdf7f2 TL |
2219 | ReadOptions roptions; |
2220 | WriteOptions woptions; | |
2221 | TransactionOptions txn_options; | |
2222 | const size_t num_keys = 4; | |
2223 | const size_t num_values = 5; | |
2224 | for (size_t ikey = 1; ikey <= num_keys; ikey++) { | |
2225 | for (size_t ivalue = 0; ivalue < num_values; ivalue++) { | |
2226 | for (bool crash : {false, true}) { | |
2227 | ReOpen(); | |
2228 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
2229 | std::string key_str = "key" + ToString(ikey); | |
2230 | switch (ivalue) { | |
2231 | case 0: | |
2232 | break; | |
2233 | case 1: | |
2234 | ASSERT_OK(db->Put(woptions, key_str, "initvalue1")); | |
2235 | break; | |
2236 | case 2: | |
2237 | ASSERT_OK(db->Merge(woptions, key_str, "initvalue2")); | |
2238 | break; | |
2239 | case 3: | |
2240 | ASSERT_OK(db->Delete(woptions, key_str)); | |
2241 | break; | |
2242 | case 4: | |
2243 | ASSERT_OK(db->SingleDelete(woptions, key_str)); | |
2244 | break; | |
2245 | default: | |
2246 | assert(0); | |
2247 | } | |
2248 | ||
2249 | PinnableSlice v1; | |
2250 | auto s1 = | |
2251 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1); | |
2252 | PinnableSlice v2; | |
2253 | auto s2 = | |
2254 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2); | |
2255 | PinnableSlice v3; | |
2256 | auto s3 = | |
2257 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3); | |
2258 | PinnableSlice v4; | |
2259 | auto s4 = | |
2260 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4); | |
2261 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
2262 | auto s = txn->SetName("xid0"); | |
2263 | ASSERT_OK(s); | |
2264 | s = txn->Put(Slice("key1"), Slice("value1")); | |
2265 | ASSERT_OK(s); | |
2266 | s = txn->Merge(Slice("key2"), Slice("value2")); | |
2267 | ASSERT_OK(s); | |
2268 | s = txn->Delete(Slice("key3")); | |
2269 | ASSERT_OK(s); | |
2270 | s = txn->SingleDelete(Slice("key4")); | |
2271 | ASSERT_OK(s); | |
2272 | s = txn->Prepare(); | |
2273 | ASSERT_OK(s); | |
2274 | ||
2275 | { | |
2276 | ReadLock rl(&wp_db->prepared_mutex_); | |
2277 | ASSERT_FALSE(wp_db->prepared_txns_.empty()); | |
2278 | ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top()); | |
2279 | } | |
2280 | ||
2281 | ASSERT_SAME(db, s1, v1, "key1"); | |
2282 | ASSERT_SAME(db, s2, v2, "key2"); | |
2283 | ASSERT_SAME(db, s3, v3, "key3"); | |
2284 | ASSERT_SAME(db, s4, v4, "key4"); | |
2285 | ||
2286 | if (crash) { | |
2287 | delete txn; | |
20effc67 | 2288 | auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 TL |
2289 | db_impl->FlushWAL(true); |
2290 | dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash(); | |
2291 | ReOpenNoDelete(); | |
2292 | assert(db != nullptr); | |
2293 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
2294 | txn = db->GetTransactionByName("xid0"); | |
2295 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
2296 | ReadLock rl(&wp_db->prepared_mutex_); | |
2297 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
2298 | ASSERT_FALSE(wp_db->delayed_prepared_.empty()); | |
2299 | ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) != | |
2300 | wp_db->delayed_prepared_.end()); | |
2301 | } | |
2302 | ||
2303 | ASSERT_SAME(db, s1, v1, "key1"); | |
2304 | ASSERT_SAME(db, s2, v2, "key2"); | |
2305 | ASSERT_SAME(db, s3, v3, "key3"); | |
2306 | ASSERT_SAME(db, s4, v4, "key4"); | |
2307 | ||
2308 | s = txn->Rollback(); | |
2309 | ASSERT_OK(s); | |
2310 | ||
2311 | { | |
2312 | ASSERT_TRUE(wp_db->delayed_prepared_empty_); | |
2313 | ReadLock rl(&wp_db->prepared_mutex_); | |
2314 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
2315 | ASSERT_TRUE(wp_db->delayed_prepared_.empty()); | |
2316 | } | |
2317 | ||
2318 | ASSERT_SAME(db, s1, v1, "key1"); | |
2319 | ASSERT_SAME(db, s2, v2, "key2"); | |
2320 | ASSERT_SAME(db, s3, v3, "key3"); | |
2321 | ASSERT_SAME(db, s4, v4, "key4"); | |
2322 | delete txn; | |
2323 | } | |
2324 | } | |
2325 | } | |
2326 | } | |
2327 | ||
f67539c2 | 2328 | TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) { |
11fdf7f2 TL |
2329 | // Use large buffer to avoid memtable flush after 1024 insertions |
2330 | options.write_buffer_size = 1024 * 1024; | |
2331 | ReOpen(); | |
2332 | std::vector<KeyVersion> versions; | |
2333 | uint64_t seq = 0; | |
2334 | for (uint64_t i = 1; i <= 1024; i++) { | |
2335 | std::string v = "bar" + ToString(i); | |
2336 | ASSERT_OK(db->Put(WriteOptions(), "foo", v)); | |
2337 | VerifyKeys({{"foo", v}}); | |
2338 | seq++; // one for the key/value | |
2339 | KeyVersion kv = {"foo", v, seq, kTypeValue}; | |
2340 | if (options.two_write_queues) { | |
2341 | seq++; // one for the commit | |
2342 | } | |
2343 | versions.emplace_back(kv); | |
2344 | } | |
2345 | std::reverse(std::begin(versions), std::end(versions)); | |
2346 | VerifyInternalKeys(versions); | |
20effc67 | 2347 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 TL |
2348 | db_impl->FlushWAL(true); |
2349 | // Use small buffer to ensure memtable flush during recovery | |
2350 | options.write_buffer_size = 1024; | |
2351 | ReOpenNoDelete(); | |
2352 | VerifyInternalKeys(versions); | |
2353 | } | |
2354 | ||
// Compaction zeroes out the sequence number of keys visible to the earliest
// snapshot. Verify that such seq-0 keys are still considered visible by
// IsInSnapshot() for any snapshot.
TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
  ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
  VerifyKeys({{"foo", "bar"}});
  const Snapshot* snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Compaction will output keys with sequence number 0, if it is visible to
  // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
  // visible to any snapshot.
  VerifyKeys({{"foo", "bar"}});
  VerifyKeys({{"foo", "bar"}}, snapshot);
  VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
  db->ReleaseSnapshot(snapshot);
}
2373 | ||
2374 | // Compaction should not remove a key if it is not committed, and should | |
2375 | // proceed with older versions of the key as-if the new version doesn't exist. | |
// Compaction should not remove a key if it is not committed, and should
// proceed with older versions of the key as-if the new version doesn't exist:
// prepared-but-uncommitted writes must survive compaction with their original
// sequence numbers, while the committed versions below them compact normally.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  // Snapshots to avoid keys get evicted.
  std::vector<const Snapshot*> snapshots;
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;

  // Applies a write, advances the expected seq accordingly (two_write_queues
  // consumes one extra seq for the commit), and pins a snapshot.
  auto add_key = [&](std::function<Status()> func) {
    ASSERT_OK(func());
    expected_seq++;
    if (options.two_write_queues) {
      expected_seq++;  // 1 for commit
    }
    ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
    snapshots.push_back(db->GetSnapshot());
  };

  // Each key here represent a standalone test case.
  add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
  add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
  ASSERT_OK(db->Flush(FlushOptions()));
  add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
  add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });

  // Overwrite every key inside a transaction that is prepared but NOT
  // committed before the compaction below runs.
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Delete("key2"));
  ASSERT_OK(transaction->SingleDelete("key3"));
  ASSERT_OK(transaction->Merge("key4", "value4_2"));
  ASSERT_OK(transaction->Merge("key5", "value5_3"));
  ASSERT_OK(transaction->Put("key6", "value6_2"));
  ASSERT_OK(transaction->Put("key7", "value7_2"));
  // Prepare but not commit.
  ASSERT_OK(transaction->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(db->Flush(FlushOptions()));
  for (auto* s : snapshots) {
    db->ReleaseSnapshot(s);
  }
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Readers still see only the committed (old) versions.
  VerifyKeys({
      {"key1", "value1_1"},
      {"key2", "value2_1"},
      {"key3", "value3_1"},
      {"key4", "value4_1"},
      {"key5", "value5_1,value5_2"},
      {"key6", "NOT_FOUND"},
      {"key7", "NOT_FOUND"},
  });
  // Internally both the uncommitted version (at its prepare seq) and the
  // committed version (compacted to seq 0) must be retained.
  VerifyInternalKeys({
      {"key1", "value1_2", expected_seq, kTypeValue},
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "", expected_seq, kTypeDeletion},
      {"key2", "value2_1", 0, kTypeValue},
      {"key3", "", expected_seq, kTypeSingleDeletion},
      {"key3", "value3_1", 0, kTypeValue},
      {"key4", "value4_2", expected_seq, kTypeMerge},
      {"key4", "value4_1", 0, kTypeValue},
      {"key5", "value5_3", expected_seq, kTypeMerge},
      {"key5", "value5_1,value5_2", 0, kTypeValue},
      {"key6", "value6_2", expected_seq, kTypeValue},
      {"key7", "value7_2", expected_seq, kTypeValue},
  });
  // After commit the transaction's versions become the visible ones.
  ASSERT_OK(transaction->Commit());
  VerifyKeys({
      {"key1", "value1_2"},
      {"key2", "NOT_FOUND"},
      {"key3", "NOT_FOUND"},
      {"key4", "value4_1,value4_2"},
      {"key5", "value5_1,value5_2,value5_3"},
      {"key6", "value6_2"},
      {"key7", "value7_2"},
  });
  delete transaction;
}
2464 | ||
2465 | // Compaction should keep keys visible to a snapshot based on commit sequence, | |
2466 | // not just prepare sequence. | |
// Compaction should keep keys visible to a snapshot based on commit sequence,
// not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;
  auto* txn1 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn1->SetName("txn1"));
  ASSERT_OK(txn1->Put("key1", "value1_1"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(txn1->Commit());
  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn1;
  // Take a snapshot to avoid keys getting evicted before compaction.
  const Snapshot* snapshot1 = db->GetSnapshot();
  auto* txn2 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn2->SetName("txn2"));
  ASSERT_OK(txn2->Put("key2", "value2_1"));
  ASSERT_OK(txn2->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  // txn1 commit before snapshot2 and it is visible to snapshot2.
  // txn2 commit after snapshot2 and it is not visible.
  const Snapshot* snapshot2 = db->GetSnapshot();
  ASSERT_OK(txn2->Commit());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn2;
  // Take a snapshot to avoid keys getting evicted before compaction.
  const Snapshot* snapshot3 = db->GetSnapshot();
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq1 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq2 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(snapshot1);
  db->ReleaseSnapshot(snapshot3);
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
  VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
  VerifyInternalKeys({
      {"key1", "value1_2", seq1, kTypeValue},
      // "value1_1" is visible to snapshot2. Also keys at bottom level visible
      // to earliest snapshot will output with seq = 0.
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "value2_2", seq2, kTypeValue},
  });
  db->ReleaseSnapshot(snapshot2);
}
2529 | ||
494da23a TL |
2530 | TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { |
2531 | const size_t snapshot_cache_bits = 7; // same as default | |
2532 | const size_t commit_cache_bits = 0; // disable commit cache | |
2533 | for (bool has_recent_prepare : {true, false}) { | |
2534 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
2535 | ReOpen(); | |
2536 | ||
2537 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); | |
2538 | auto* transaction = | |
2539 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2540 | ASSERT_OK(transaction->SetName("txn")); | |
2541 | ASSERT_OK(transaction->Delete("key1")); | |
2542 | ASSERT_OK(transaction->Prepare()); | |
2543 | // snapshot1 should get min_uncommitted from prepared_txns_ heap. | |
2544 | auto snapshot1 = db->GetSnapshot(); | |
2545 | ASSERT_EQ(transaction->GetId(), | |
2546 | ((SnapshotImpl*)snapshot1)->min_uncommitted_); | |
2547 | // Add a commit to advance max_evicted_seq and move the prepared transaction | |
2548 | // into delayed_prepared_ set. | |
2549 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
2550 | Transaction* txn2 = nullptr; | |
2551 | if (has_recent_prepare) { | |
2552 | txn2 = | |
2553 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2554 | ASSERT_OK(txn2->SetName("txn2")); | |
2555 | ASSERT_OK(txn2->Put("key3", "value3")); | |
2556 | ASSERT_OK(txn2->Prepare()); | |
2557 | } | |
2558 | // snapshot2 should get min_uncommitted from delayed_prepared_ set. | |
2559 | auto snapshot2 = db->GetSnapshot(); | |
2560 | ASSERT_EQ(transaction->GetId(), | |
2561 | ((SnapshotImpl*)snapshot1)->min_uncommitted_); | |
2562 | ASSERT_OK(transaction->Commit()); | |
2563 | delete transaction; | |
2564 | if (has_recent_prepare) { | |
2565 | ASSERT_OK(txn2->Commit()); | |
2566 | delete txn2; | |
2567 | } | |
2568 | VerifyKeys({{"key1", "NOT_FOUND"}}); | |
2569 | VerifyKeys({{"key1", "value1"}}, snapshot1); | |
2570 | VerifyKeys({{"key1", "value1"}}, snapshot2); | |
2571 | db->ReleaseSnapshot(snapshot1); | |
2572 | db->ReleaseSnapshot(snapshot2); | |
2573 | } | |
2574 | } | |
2575 | ||
2576 | // Insert two values, v1 and v2, for a key. Between prepare and commit of v2 | |
2577 | // take two snapshots, s1 and s2. Release s1 during compaction. | |
2578 | // Test to make sure compaction doesn't get confused and think s1 can see both | |
2579 | // values, and thus compact out the older value by mistake. | |
// Insert two values, v1 and v2, for a key. Between prepare and commit of v2
// take two snapshots, s1 and s2. Release s1 during compaction.
// Test to make sure compaction doesn't get confused and think s1 can see both
// values, and thus compact out the older value by mistake.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
  auto* transaction =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Prepare());
  auto snapshot1 = db->GetSnapshot();
  // Increment sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  auto snapshot2 = db->GetSnapshot();
  ASSERT_OK(transaction->Commit());
  delete transaction;
  // value1_2 committed after both snapshots, so neither can see it.
  VerifyKeys({{"key1", "value1_2"}});
  VerifyKeys({{"key1", "value1_1"}}, snapshot1);
  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
  // Add a flush to avoid compaction falling back to trivial move.

  auto callback = [&](void*) {
    // Release snapshot1 after CompactionIterator init.
    // CompactionIterator need to figure out the earliest snapshot
    // that can see key1:value1_2 is kMaxSequenceNumber, not
    // snapshot1 or snapshot2.
    db->ReleaseSnapshot(snapshot1);
    // Add some keys to advance max_evicted_seq.
    ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
    ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  // The flush triggers the compaction-iterator path under the sync point.
  ASSERT_OK(db->Flush(FlushOptions()));
  // The old value must survive: snapshot2 still reads value1_1.
  VerifyKeys({{"key1", "value1_2"}});
  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
  db->ReleaseSnapshot(snapshot2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2623 | ||
2624 | // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2, | |
2625 | // after committing v2. Release s1 during compaction, right after compaction | |
2626 | // processes v2 and before processes v1. Test to make sure compaction doesn't | |
2627 | // get confused and believe v1 and v2 are visible to different snapshot | |
2628 | // (v1 by s2, v2 by s1) and refuse to compact out v1. | |
// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
// after committing v2. Release s1 during compaction, right after compaction
// processes v2 and before processes v1. Test to make sure compaction doesn't
// get confused and believe v1 and v2 are visible to different snapshot
// (v1 by s2, v2 by s1) and refuse to compact out v1.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
  SequenceNumber v2_seq = db->GetLatestSequenceNumber();
  auto* s1 = db->GetSnapshot();
  // Advance sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
  auto* s2 = db->GetSnapshot();

  // Counts visits to "key1" versions (newest first): the second visit is v1.
  int count_value = 0;
  auto callback = [&](void* arg) {
    auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
    if (ikey->user_key == "key1") {
      count_value++;
      if (count_value == 2) {
        // Processing v1.
        db->ReleaseSnapshot(s1);
        // Add some keys to advance max_evicted_seq and update
        // old_commit_map.
        ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
        ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
      }
    }
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compact out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});

  // cleanup
  db->ReleaseSnapshot(s2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2670 | ||
2671 | // Insert two values, v1 and v2, for a key. Insert another dummy key | |
2672 | // so to evict the commit cache for v2, while v1 is still in commit cache. | |
2673 | // Take two snapshots, s1 and s2. Release s1 during compaction. | |
2674 | // Since commit cache for v2 is evicted, and old_commit_map don't have | |
2675 | // s1 (it is released), | |
2676 | // TODO(myabandeh): how can we be sure that the v2's commit info is evicted | |
2677 | // (and not v1's)? Instead of putting a dummy, we can directly call | |
2678 | // AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache. | |
// Insert two values, v1 and v2, for a key. Insert another dummy key
// so to evict the commit cache for v2, while v1 is still in commit cache.
// Take two snapshots, s1 and s2. Release s1 during compaction.
// Since commit cache for v2 is evicted, and old_commit_map don't have
// s1 (it is released),
// TODO(myabandeh): how can we be sure that the v2's commit info is evicted
// (and not v1's)? Instead of putting a dummy, we can directly call
// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // commit cache size = 2
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
  // It also advance max_evicted_seq and can trigger old_commit_map cleanup.
  auto add_dummy = [&]() {
    auto* txn_dummy =
        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
    ASSERT_OK(txn_dummy->SetName("txn_dummy"));
    ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
    ASSERT_OK(txn_dummy->Prepare());
    ASSERT_OK(txn_dummy->Commit());
    delete txn_dummy;
  };

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* txn =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());
  // TODO(myabandeh): replace it with GetId()?
  auto v2_seq = db->GetLatestSequenceNumber();
  ASSERT_OK(txn->Commit());
  delete txn;
  auto* s1 = db->GetSnapshot();
  // Dummy key to advance sequence number.
  add_dummy();
  auto* s2 = db->GetSnapshot();

  auto callback = [&](void*) {
    db->ReleaseSnapshot(s1);
    // Add some dummy entries to trigger s1 being cleanup from old_commit_map.
    add_dummy();
    add_dummy();
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compact out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});

  db->ReleaseSnapshot(s2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2729 | ||
2730 | TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { | |
2731 | const size_t snapshot_cache_bits = 7; // same as default | |
2732 | const size_t commit_cache_bits = 0; // minimum commit cache | |
2733 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
2734 | ReOpen(); | |
2735 | ||
2736 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); | |
2737 | auto* transaction = | |
2738 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2739 | ASSERT_OK(transaction->SetName("txn")); | |
2740 | ASSERT_OK(transaction->Delete("key1")); | |
2741 | ASSERT_OK(transaction->Prepare()); | |
2742 | SequenceNumber del_seq = db->GetLatestSequenceNumber(); | |
2743 | auto snapshot1 = db->GetSnapshot(); | |
2744 | // Increment sequence number. | |
2745 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
2746 | auto snapshot2 = db->GetSnapshot(); | |
2747 | ASSERT_OK(transaction->Commit()); | |
2748 | delete transaction; | |
2749 | VerifyKeys({{"key1", "NOT_FOUND"}}); | |
2750 | VerifyKeys({{"key1", "value1"}}, snapshot1); | |
2751 | VerifyKeys({{"key1", "value1"}}, snapshot2); | |
2752 | ASSERT_OK(db->Flush(FlushOptions())); | |
2753 | ||
2754 | auto callback = [&](void* compaction) { | |
2755 | // Release snapshot1 after CompactionIterator init. | |
2756 | // CompactionIterator needs to double check and find out snapshot2 is now | |
2757 | // the earliest existing snapshot. | |
2758 | if (compaction != nullptr) { | |
2759 | db->ReleaseSnapshot(snapshot1); | |
2760 | // Add some keys to advance max_evicted_seq. | |
2761 | ASSERT_OK(db->Put(WriteOptions(), "key3", "value3")); | |
2762 | ASSERT_OK(db->Put(WriteOptions(), "key4", "value4")); | |
2763 | } | |
2764 | }; | |
2765 | SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", | |
2766 | callback); | |
2767 | SyncPoint::GetInstance()->EnableProcessing(); | |
2768 | ||
2769 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2770 | // compaction logic. | |
2771 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2772 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2773 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2774 | // Only verify for key1. Both the put and delete for the key should be kept. | |
2775 | // Since the delete tombstone is not visible to snapshot2, we need to keep | |
2776 | // at least one version of the key, for write-conflict check. | |
2777 | VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion}, | |
2778 | {"key1", "value1", 0, kTypeValue}}); | |
2779 | db->ReleaseSnapshot(snapshot2); | |
2780 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2781 | } | |
2782 | ||
11fdf7f2 TL |
2783 | // A more complex test to verify compaction/flush should keep keys visible |
2784 | // to snapshots. | |
2785 | TEST_P(WritePreparedTransactionTest, | |
f67539c2 | 2786 | CompactionKeepSnapshotVisibleKeysRandomized) { |
11fdf7f2 TL |
2787 | constexpr size_t kNumTransactions = 10; |
2788 | constexpr size_t kNumIterations = 1000; | |
2789 | ||
2790 | std::vector<Transaction*> transactions(kNumTransactions, nullptr); | |
2791 | std::vector<size_t> versions(kNumTransactions, 0); | |
2792 | std::unordered_map<std::string, std::string> current_data; | |
2793 | std::vector<const Snapshot*> snapshots; | |
2794 | std::vector<std::unordered_map<std::string, std::string>> snapshot_data; | |
2795 | ||
2796 | Random rnd(1103); | |
2797 | options.disable_auto_compactions = true; | |
2798 | ReOpen(); | |
2799 | ||
2800 | for (size_t i = 0; i < kNumTransactions; i++) { | |
2801 | std::string key = "key" + ToString(i); | |
2802 | std::string value = "value0"; | |
2803 | ASSERT_OK(db->Put(WriteOptions(), key, value)); | |
2804 | current_data[key] = value; | |
2805 | } | |
2806 | VerifyKeys(current_data); | |
2807 | ||
2808 | for (size_t iter = 0; iter < kNumIterations; iter++) { | |
2809 | auto r = rnd.Next() % (kNumTransactions + 1); | |
2810 | if (r < kNumTransactions) { | |
2811 | std::string key = "key" + ToString(r); | |
2812 | if (transactions[r] == nullptr) { | |
2813 | std::string value = "value" + ToString(versions[r] + 1); | |
2814 | auto* txn = db->BeginTransaction(WriteOptions()); | |
2815 | ASSERT_OK(txn->SetName("txn" + ToString(r))); | |
2816 | ASSERT_OK(txn->Put(key, value)); | |
2817 | ASSERT_OK(txn->Prepare()); | |
2818 | transactions[r] = txn; | |
2819 | } else { | |
2820 | std::string value = "value" + ToString(++versions[r]); | |
2821 | ASSERT_OK(transactions[r]->Commit()); | |
2822 | delete transactions[r]; | |
2823 | transactions[r] = nullptr; | |
2824 | current_data[key] = value; | |
2825 | } | |
2826 | } else { | |
2827 | auto* snapshot = db->GetSnapshot(); | |
2828 | VerifyKeys(current_data, snapshot); | |
2829 | snapshots.push_back(snapshot); | |
2830 | snapshot_data.push_back(current_data); | |
2831 | } | |
2832 | VerifyKeys(current_data); | |
2833 | } | |
2834 | // Take a last snapshot to test compaction with uncommitted prepared | |
2835 | // transaction. | |
2836 | snapshots.push_back(db->GetSnapshot()); | |
2837 | snapshot_data.push_back(current_data); | |
2838 | ||
2839 | assert(snapshots.size() == snapshot_data.size()); | |
2840 | for (size_t i = 0; i < snapshots.size(); i++) { | |
2841 | VerifyKeys(snapshot_data[i], snapshots[i]); | |
2842 | } | |
2843 | ASSERT_OK(db->Flush(FlushOptions())); | |
2844 | for (size_t i = 0; i < snapshots.size(); i++) { | |
2845 | VerifyKeys(snapshot_data[i], snapshots[i]); | |
2846 | } | |
2847 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2848 | // compaction logic. | |
2849 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2850 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2851 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2852 | for (size_t i = 0; i < snapshots.size(); i++) { | |
2853 | VerifyKeys(snapshot_data[i], snapshots[i]); | |
2854 | } | |
2855 | // cleanup | |
2856 | for (size_t i = 0; i < kNumTransactions; i++) { | |
2857 | if (transactions[i] == nullptr) { | |
2858 | continue; | |
2859 | } | |
2860 | ASSERT_OK(transactions[i]->Commit()); | |
2861 | delete transactions[i]; | |
2862 | } | |
2863 | for (size_t i = 0; i < snapshots.size(); i++) { | |
2864 | db->ReleaseSnapshot(snapshots[i]); | |
2865 | } | |
2866 | } | |
2867 | ||
2868 | // Compaction should not apply the optimization to output key with sequence | |
2869 | // number equal to 0 if the key is not visible to earliest snapshot, based on | |
2870 | // commit sequence number. | |
2871 | TEST_P(WritePreparedTransactionTest, | |
2872 | CompactionShouldKeepSequenceForUncommittedKeys) { | |
2873 | options.disable_auto_compactions = true; | |
2874 | ReOpen(); | |
2875 | // Keep track of expected sequence number. | |
2876 | SequenceNumber expected_seq = 0; | |
2877 | auto* transaction = db->BeginTransaction(WriteOptions()); | |
2878 | ASSERT_OK(transaction->SetName("txn")); | |
2879 | ASSERT_OK(transaction->Put("key1", "value1")); | |
2880 | ASSERT_OK(transaction->Prepare()); | |
2881 | ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); | |
2882 | SequenceNumber seq1 = expected_seq; | |
2883 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
20effc67 | 2884 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 TL |
2885 | expected_seq++; // one for data |
2886 | if (options.two_write_queues) { | |
2887 | expected_seq++; // one for commit | |
2888 | } | |
2889 | ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); | |
2890 | ASSERT_OK(db->Flush(FlushOptions())); | |
2891 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2892 | // compaction logic. | |
2893 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2894 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2895 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2896 | VerifyKeys({ | |
2897 | {"key1", "NOT_FOUND"}, | |
2898 | {"key2", "value2"}, | |
2899 | }); | |
2900 | VerifyInternalKeys({ | |
2901 | // "key1" has not been committed. It keeps its sequence number. | |
2902 | {"key1", "value1", seq1, kTypeValue}, | |
2903 | // "key2" is committed and output with seq = 0. | |
2904 | {"key2", "value2", 0, kTypeValue}, | |
2905 | }); | |
2906 | ASSERT_OK(transaction->Commit()); | |
2907 | VerifyKeys({ | |
2908 | {"key1", "value1"}, | |
2909 | {"key2", "value2"}, | |
2910 | }); | |
2911 | delete transaction; | |
2912 | } | |
2913 | ||
494da23a TL |
2914 | TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) { |
2915 | options.disable_auto_compactions = true; | |
2916 | ReOpen(); | |
2917 | ||
2918 | const Snapshot* snapshot = nullptr; | |
2919 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); | |
2920 | auto* txn = db->BeginTransaction(WriteOptions()); | |
2921 | ASSERT_OK(txn->SetName("txn")); | |
2922 | ASSERT_OK(txn->Put("key1", "value2")); | |
2923 | ASSERT_OK(txn->Prepare()); | |
2924 | ||
2925 | auto callback = [&](void*) { | |
2926 | // Snapshot is taken after compaction start. It should be taken into | |
2927 | // consideration for whether to compact out value1. | |
2928 | snapshot = db->GetSnapshot(); | |
2929 | ASSERT_OK(txn->Commit()); | |
2930 | delete txn; | |
2931 | }; | |
2932 | SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", | |
2933 | callback); | |
2934 | SyncPoint::GetInstance()->EnableProcessing(); | |
2935 | ASSERT_OK(db->Flush(FlushOptions())); | |
2936 | ASSERT_NE(nullptr, snapshot); | |
2937 | VerifyKeys({{"key1", "value2"}}); | |
2938 | VerifyKeys({{"key1", "value1"}}, snapshot); | |
2939 | db->ReleaseSnapshot(snapshot); | |
2940 | } | |
2941 | ||
11fdf7f2 TL |
2942 | TEST_P(WritePreparedTransactionTest, Iterate) { |
2943 | auto verify_state = [](Iterator* iter, const std::string& key, | |
2944 | const std::string& value) { | |
2945 | ASSERT_TRUE(iter->Valid()); | |
2946 | ASSERT_OK(iter->status()); | |
2947 | ASSERT_EQ(key, iter->key().ToString()); | |
2948 | ASSERT_EQ(value, iter->value().ToString()); | |
2949 | }; | |
2950 | ||
2951 | auto verify_iter = [&](const std::string& expected_val) { | |
2952 | // Get iterator from a concurrent transaction and make sure it has the | |
2953 | // same view as an iterator from the DB. | |
2954 | auto* txn = db->BeginTransaction(WriteOptions()); | |
2955 | ||
2956 | for (int i = 0; i < 2; i++) { | |
2957 | Iterator* iter = (i == 0) | |
2958 | ? db->NewIterator(ReadOptions()) | |
2959 | : txn->GetIterator(ReadOptions()); | |
2960 | // Seek | |
2961 | iter->Seek("foo"); | |
2962 | verify_state(iter, "foo", expected_val); | |
2963 | // Next | |
2964 | iter->Seek("a"); | |
2965 | verify_state(iter, "a", "va"); | |
2966 | iter->Next(); | |
2967 | verify_state(iter, "foo", expected_val); | |
2968 | // SeekForPrev | |
2969 | iter->SeekForPrev("y"); | |
2970 | verify_state(iter, "foo", expected_val); | |
2971 | // Prev | |
2972 | iter->SeekForPrev("z"); | |
2973 | verify_state(iter, "z", "vz"); | |
2974 | iter->Prev(); | |
2975 | verify_state(iter, "foo", expected_val); | |
2976 | delete iter; | |
2977 | } | |
2978 | delete txn; | |
2979 | }; | |
2980 | ||
2981 | ASSERT_OK(db->Put(WriteOptions(), "foo", "v1")); | |
2982 | auto* transaction = db->BeginTransaction(WriteOptions()); | |
2983 | ASSERT_OK(transaction->SetName("txn")); | |
2984 | ASSERT_OK(transaction->Put("foo", "v2")); | |
2985 | ASSERT_OK(transaction->Prepare()); | |
2986 | VerifyKeys({{"foo", "v1"}}); | |
2987 | // dummy keys | |
2988 | ASSERT_OK(db->Put(WriteOptions(), "a", "va")); | |
2989 | ASSERT_OK(db->Put(WriteOptions(), "z", "vz")); | |
2990 | verify_iter("v1"); | |
2991 | ASSERT_OK(transaction->Commit()); | |
2992 | VerifyKeys({{"foo", "v2"}}); | |
2993 | verify_iter("v2"); | |
2994 | delete transaction; | |
2995 | } | |
2996 | ||
2997 | TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) { | |
2998 | Iterator* iter = db->NewIterator(ReadOptions()); | |
2999 | ASSERT_TRUE(iter->Refresh().IsNotSupported()); | |
3000 | delete iter; | |
3001 | } | |
3002 | ||
494da23a TL |
3003 | // Committing a delayed prepared has two non-atomic steps: update commit cache, | |
3004 | // remove seq from delayed_prepared_. The read in IsInSnapshot also involves two | |
3005 | // non-atomic steps of checking these two data structures. This test breaks each | |
3006 | // in the middle to ensure correctness in spite of non-atomic execution. | |
3007 | // Note: This test is limited to the case where snapshot is larger than the | |
3008 | // max_evicted_seq_. | |
3009 | TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) { | |
3010 | const size_t snapshot_cache_bits = 7; // same as default | |
3011 | const size_t commit_cache_bits = 3; // 8 entries | |
3012 | for (auto split_read : {true, false}) { | |
3013 | std::vector<bool> split_options = {false}; | |
3014 | if (split_read) { | |
3015 | // Also test for break before mutex | |
3016 | split_options.push_back(true); | |
3017 | } | |
3018 | for (auto split_before_mutex : split_options) { | |
3019 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
3020 | ReOpen(); | |
3021 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
20effc67 | 3022 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
494da23a TL |
3023 | // Fill up the commit cache |
3024 | std::string init_value("value1"); | |
3025 | for (int i = 0; i < 10; i++) { | |
3026 | db->Put(WriteOptions(), Slice("key1"), Slice(init_value)); | |
3027 | } | |
3028 | // Prepare a transaction but do not commit it | |
3029 | Transaction* txn = | |
3030 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3031 | ASSERT_OK(txn->SetName("xid")); | |
3032 | ASSERT_OK(txn->Put(Slice("key1"), Slice("value2"))); | |
3033 | ASSERT_OK(txn->Prepare()); | |
3034 | // Commit a bunch of entries to advance max evicted seq and make the | |
3035 | // prepared a delayed prepared | |
3036 | for (int i = 0; i < 10; i++) { | |
3037 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3038 | } | |
3039 | // The snapshot should not see the delayed prepared entry | |
3040 | auto snap = db->GetSnapshot(); | |
3041 | ||
3042 | if (split_read) { | |
3043 | if (split_before_mutex) { | |
3044 | // split before acquiring prepare_mutex_ | |
f67539c2 | 3045 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
494da23a TL |
3046 | {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause", |
3047 | "AtomicCommitOfDelayedPrepared:Commit:before"}, | |
3048 | {"AtomicCommitOfDelayedPrepared:Commit:after", | |
3049 | "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}}); | |
3050 | } else { | |
3051 | // split right after reading from the commit cache | |
f67539c2 | 3052 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
494da23a TL |
3053 | {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause", |
3054 | "AtomicCommitOfDelayedPrepared:Commit:before"}, | |
3055 | {"AtomicCommitOfDelayedPrepared:Commit:after", | |
3056 | "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}}); | |
3057 | } | |
3058 | } else { // split commit | |
3059 | // split right before removing from delayed_prepared_ | |
f67539c2 | 3060 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
494da23a TL |
3061 | {{"WritePreparedTxnDB::RemovePrepared:pause", |
3062 | "AtomicCommitOfDelayedPrepared:Read:before"}, | |
3063 | {"AtomicCommitOfDelayedPrepared:Read:after", | |
3064 | "WritePreparedTxnDB::RemovePrepared:resume"}}); | |
3065 | } | |
3066 | SyncPoint::GetInstance()->EnableProcessing(); | |
3067 | ||
f67539c2 | 3068 | ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() { |
494da23a TL |
3069 | TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before"); |
3070 | ASSERT_OK(txn->Commit()); | |
3071 | if (split_before_mutex) { | |
3072 | // Do bunch of inserts to evict the commit entry from the cache. This | |
3073 | // would prevent the 2nd look into commit cache under prepare_mutex_ | |
3074 | // to see the commit entry. | |
3075 | auto seq = db_impl->TEST_GetLastVisibleSequence(); | |
3076 | size_t tries = 0; | |
3077 | while (wp_db->max_evicted_seq_ < seq && tries < 50) { | |
3078 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3079 | tries++; | |
3080 | }; | |
3081 | ASSERT_LT(tries, 50); | |
3082 | } | |
3083 | TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after"); | |
3084 | delete txn; | |
3085 | }); | |
3086 | ||
f67539c2 | 3087 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { |
494da23a TL |
3088 | TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before"); |
3089 | ReadOptions roptions; | |
3090 | roptions.snapshot = snap; | |
3091 | PinnableSlice value; | |
3092 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value); | |
3093 | ASSERT_OK(s); | |
3094 | // It should not see the commit of delayed prepared | |
3095 | ASSERT_TRUE(value == init_value); | |
3096 | TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after"); | |
3097 | db->ReleaseSnapshot(snap); | |
3098 | }); | |
3099 | ||
3100 | read_thread.join(); | |
3101 | commit_thread.join(); | |
f67539c2 TL |
3102 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
3103 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
494da23a TL |
3104 | } // for split_before_mutex |
3105 | } // for split_read | |
3106 | } | |
3107 | ||
3108 | // When max evicted seq advances a prepared seq, it involves two updates: i) | |
3109 | // adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_. | |
3110 | // ::IsInSnapshot also reads these two values in a non-atomic way. This test | |
3111 | // ensures correctness if the update occurs after ::IsInSnapshot reads | |
3112 | // delayed_prepared_empty_ and before it reads max_evicted_seq_. | |
3113 | // Note: this test focuses on read snapshot larger than max_evicted_seq_. | |
3114 | TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) { | |
3115 | const size_t snapshot_cache_bits = 7; // same as default | |
3116 | const size_t commit_cache_bits = 3; // 8 entries | |
3117 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
3118 | ReOpen(); | |
3119 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
3120 | // Fill up the commit cache | |
3121 | std::string init_value("value1"); | |
3122 | for (int i = 0; i < 10; i++) { | |
3123 | db->Put(WriteOptions(), Slice("key1"), Slice(init_value)); | |
3124 | } | |
3125 | // Prepare a transaction but do not commit it | |
3126 | Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3127 | ASSERT_OK(txn->SetName("xid")); | |
3128 | ASSERT_OK(txn->Put(Slice("key1"), Slice("value2"))); | |
3129 | ASSERT_OK(txn->Prepare()); | |
3130 | // Create a gap between prepare seq and snapshot seq | |
3131 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3132 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3133 | // The snapshot should not see the delayed prepared entry | |
3134 | auto snap = db->GetSnapshot(); | |
3135 | ASSERT_LT(txn->GetId(), snap->GetSequenceNumber()); | |
3136 | ||
3137 | // split right after reading delayed_prepared_empty_ | |
f67539c2 | 3138 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
494da23a TL |
3139 | {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause", |
3140 | "AtomicUpdateOfDelayedPrepared:before"}, | |
3141 | {"AtomicUpdateOfDelayedPrepared:after", | |
3142 | "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}}); | |
3143 | SyncPoint::GetInstance()->EnableProcessing(); | |
3144 | ||
f67539c2 | 3145 | ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() { |
494da23a TL |
3146 | TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before"); |
3147 | // Commit a bunch of entries to advance max evicted seq and make the | |
3148 | // prepared a delayed prepared | |
3149 | size_t tries = 0; | |
3150 | while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) { | |
3151 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3152 | tries++; | |
3153 | }; | |
3154 | ASSERT_LT(tries, 50); | |
3155 | // This is the case on which the test focuses | |
3156 | ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber()); | |
3157 | TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after"); | |
3158 | }); | |
3159 | ||
f67539c2 | 3160 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { |
494da23a TL |
3161 | ReadOptions roptions; |
3162 | roptions.snapshot = snap; | |
3163 | PinnableSlice value; | |
3164 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value); | |
3165 | ASSERT_OK(s); | |
3166 | // It should not see the uncommitted value of delayed prepared | |
3167 | ASSERT_TRUE(value == init_value); | |
3168 | db->ReleaseSnapshot(snap); | |
3169 | }); | |
3170 | ||
3171 | read_thread.join(); | |
3172 | commit_thread.join(); | |
3173 | ASSERT_OK(txn->Commit()); | |
3174 | delete txn; | |
f67539c2 TL |
3175 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
3176 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
494da23a TL |
3177 | } |
3178 | ||
3179 | // Eviction from commit cache and update of max evicted seq are two non-atomic | |
3180 | // steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading | |
3181 | // from commit cache are two non-atomic steps. This tests if the update occurs | |
3182 | // after reading max_evicted_seq_ and before reading the commit cache. | |
3183 | // Note: the test focuses on snapshot larger than max_evicted_seq_ | |
3184 | TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) { | |
3185 | const size_t snapshot_cache_bits = 7; // same as default | |
3186 | const size_t commit_cache_bits = 3; // 8 entries | |
3187 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
3188 | ReOpen(); | |
3189 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
3190 | // Fill up the commit cache | |
3191 | std::string init_value("value1"); | |
3192 | std::string last_value("value_final"); | |
3193 | for (int i = 0; i < 10; i++) { | |
3194 | db->Put(WriteOptions(), Slice("key1"), Slice(init_value)); | |
3195 | } | |
3196 | // Do an uncommitted write to prevent min_uncommitted optimization | |
3197 | Transaction* txn1 = | |
3198 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3199 | ASSERT_OK(txn1->SetName("xid1")); | |
3200 | ASSERT_OK(txn1->Put(Slice("key0"), last_value)); | |
3201 | ASSERT_OK(txn1->Prepare()); | |
3202 | // Do a write with prepare to get the prepare seq | |
3203 | Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3204 | ASSERT_OK(txn->SetName("xid")); | |
3205 | ASSERT_OK(txn->Put(Slice("key1"), last_value)); | |
3206 | ASSERT_OK(txn->Prepare()); | |
3207 | ASSERT_OK(txn->Commit()); | |
3208 | // Create a gap between commit entry and snapshot seq | |
3209 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3210 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3211 | // The snapshot should see the last commit | |
3212 | auto snap = db->GetSnapshot(); | |
3213 | ASSERT_LE(txn->GetId(), snap->GetSequenceNumber()); | |
3214 | ||
3215 | // split right after reading max_evicted_seq_ | |
f67539c2 | 3216 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
494da23a TL |
3217 | {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause", |
3218 | "NonAtomicUpdateOfMaxEvictedSeq:before"}, | |
3219 | {"NonAtomicUpdateOfMaxEvictedSeq:after", | |
3220 | "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}}); | |
3221 | SyncPoint::GetInstance()->EnableProcessing(); | |
3222 | ||
f67539c2 | 3223 | ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() { |
494da23a TL |
3224 | TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before"); |
3225 | // Commit a bunch of entries to advance max evicted seq beyond txn->GetId() | |
3226 | size_t tries = 0; | |
3227 | while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) { | |
3228 | db->Put(WriteOptions(), Slice("key3"), Slice("value3")); | |
3229 | tries++; | |
3230 | }; | |
3231 | ASSERT_LT(tries, 50); | |
3232 | // This is the case on which the test focuses | |
3233 | ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber()); | |
3234 | TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after"); | |
3235 | }); | |
3236 | ||
f67539c2 | 3237 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { |
494da23a TL |
3238 | ReadOptions roptions; |
3239 | roptions.snapshot = snap; | |
3240 | PinnableSlice value; | |
3241 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value); | |
3242 | ASSERT_OK(s); | |
3243 | // It should see the committed value of the evicted entry | |
3244 | ASSERT_TRUE(value == last_value); | |
3245 | db->ReleaseSnapshot(snap); | |
3246 | }); | |
3247 | ||
3248 | read_thread.join(); | |
3249 | commit_thread.join(); | |
3250 | delete txn; | |
3251 | txn1->Commit(); | |
3252 | delete txn1; | |
f67539c2 TL |
3253 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
3254 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
494da23a TL |
3255 | } |
3256 | ||
3257 | // Test when we add a prepared seq when the max_evicted_seq_ already goes beyond | |
3258 | // that. The test focuses on a race condition between AddPrepared and | |
3259 | // AdvanceMaxEvictedSeq functions. | |
3260 | TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) { | |
3261 | if (!options.two_write_queues) { | |
3262 | // This test is only for two write queues | |
3263 | return; | |
3264 | } | |
3265 | const size_t snapshot_cache_bits = 7; // same as default | |
3266 | // 1 entry to advance max after the 2nd commit | |
3267 | const size_t commit_cache_bits = 0; | |
3268 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
3269 | ReOpen(); | |
3270 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
3271 | std::string some_value("value_some"); | |
3272 | std::string uncommitted_value("value_uncommitted"); | |
3273 | // Prepare two uncommitted transactions | |
3274 | Transaction* txn1 = | |
3275 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3276 | ASSERT_OK(txn1->SetName("xid1")); | |
3277 | ASSERT_OK(txn1->Put(Slice("key1"), some_value)); | |
3278 | ASSERT_OK(txn1->Prepare()); | |
3279 | Transaction* txn2 = | |
3280 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3281 | ASSERT_OK(txn2->SetName("xid2")); | |
3282 | ASSERT_OK(txn2->Put(Slice("key2"), some_value)); | |
3283 | ASSERT_OK(txn2->Prepare()); | |
3284 | // Start the txn here so the other thread could get its id | |
3285 | Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3286 | ASSERT_OK(txn->SetName("xid")); | |
3287 | ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value)); | |
3288 | port::Mutex txn_mutex_; | |
3289 | ||
f67539c2 TL |
3290 | // t1) Insert prepared entry, t2) commit other entries to advance max |
3291 | // evicted seq and finish checking the existing prepared entries, t1) | |
494da23a | 3292 | // AddPrepared, t2) update max_evicted_seq_ |
f67539c2 TL |
3293 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
3294 | {"AddPreparedCallback::AddPrepared::begin:pause", | |
3295 | "AddPreparedBeforeMax::read_thread:start"}, | |
3296 | {"AdvanceMaxEvictedSeq::update_max:pause", | |
3297 | "AddPreparedCallback::AddPrepared::begin:resume"}, | |
3298 | {"AddPreparedCallback::AddPrepared::end", | |
3299 | "AdvanceMaxEvictedSeq::update_max:resume"}, | |
494da23a TL |
3300 | }); |
3301 | SyncPoint::GetInstance()->EnableProcessing(); | |
3302 | ||
f67539c2 | 3303 | ROCKSDB_NAMESPACE::port::Thread write_thread([&]() { |
494da23a TL |
3304 | txn_mutex_.Lock(); |
3305 | ASSERT_OK(txn->Prepare()); | |
3306 | txn_mutex_.Unlock(); | |
3307 | }); | |
3308 | ||
f67539c2 | 3309 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { |
494da23a TL |
3310 | TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start"); |
3311 | // Publish seq number with a commit | |
3312 | ASSERT_OK(txn1->Commit()); | |
3313 | // Since the commit cache size is one the 2nd commit evict the 1st one and | |
3314 | // invokes AdvanceMaxEvictedSeq | |
3315 | ASSERT_OK(txn2->Commit()); | |
3316 | ||
3317 | ReadOptions roptions; | |
3318 | PinnableSlice value; | |
3319 | // The snapshot should not see the uncommitted value from write_thread | |
3320 | auto snap = db->GetSnapshot(); | |
3321 | ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber()); | |
3322 | // This is the scenario that we test for | |
3323 | txn_mutex_.Lock(); | |
3324 | ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId()); | |
3325 | txn_mutex_.Unlock(); | |
3326 | roptions.snapshot = snap; | |
3327 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value); | |
3328 | ASSERT_TRUE(s.IsNotFound()); | |
3329 | db->ReleaseSnapshot(snap); | |
3330 | }); | |
3331 | ||
3332 | read_thread.join(); | |
3333 | write_thread.join(); | |
3334 | delete txn1; | |
3335 | delete txn2; | |
3336 | ASSERT_OK(txn->Commit()); | |
3337 | delete txn; | |
f67539c2 TL |
3338 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
3339 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
494da23a TL |
3340 | } |
3341 | ||
3342 | // When an old prepared entry gets committed, there is a gap between the time | |
3343 | // that it is published and when it is cleaned up from old_prepared_. This test | |
3344 | // stresses such cases. | |
3345 | TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { | |
3346 | const size_t snapshot_cache_bits = 7; // same as default | |
3347 | for (const size_t commit_cache_bits : {0, 2, 3}) { | |
3348 | for (const size_t sub_batch_cnt : {1, 2, 3}) { | |
3349 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
3350 | ReOpen(); | |
3351 | std::atomic<const Snapshot*> snap = {nullptr}; | |
3352 | std::atomic<SequenceNumber> exp_prepare = {0}; | |
f67539c2 | 3353 | ROCKSDB_NAMESPACE::port::Thread callback_thread; |
494da23a TL |
3354 | // Value is synchronized via snap |
3355 | PinnableSlice value; | |
3356 | // Take a snapshot after publish and before RemovePrepared:Start | |
f67539c2 TL |
3357 | auto snap_callback = [&]() { |
3358 | ASSERT_EQ(nullptr, snap.load()); | |
3359 | snap.store(db->GetSnapshot()); | |
3360 | ReadOptions roptions; | |
3361 | roptions.snapshot = snap.load(); | |
3362 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value); | |
3363 | ASSERT_OK(s); | |
3364 | }; | |
494da23a TL |
3365 | auto callback = [&](void* param) { |
3366 | SequenceNumber prep_seq = *((SequenceNumber*)param); | |
3367 | if (prep_seq == exp_prepare.load()) { // only for write_thread | |
f67539c2 TL |
3368 | // We need to spawn a thread to avoid deadlock since getting a |
3369 | // snapshot might end up calling AdvanceSeqByOne which needs joining | |
3370 | // the write queue. | |
3371 | callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback); | |
3372 | TEST_SYNC_POINT("callback:end"); | |
494da23a TL |
3373 | } |
3374 | }; | |
f67539c2 TL |
3375 | // Wait for the first snapshot be taken in GetSnapshotInternal. Although |
3376 | // it might be updated before GetSnapshotInternal finishes but this should | |
3377 | // cover most of the cases. | |
3378 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ | |
3379 | {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"}, | |
3380 | }); | |
494da23a TL |
3381 | SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback); |
3382 | SyncPoint::GetInstance()->EnableProcessing(); | |
3383 | // Thread to cause frequent evictions | |
f67539c2 | 3384 | ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() { |
494da23a TL |
3385 | // Too many txns might cause commit_seq - prepare_seq in another thread |
3386 | // to go beyond DELTA_UPPERBOUND | |
3387 | for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) { | |
3388 | db->Put(WriteOptions(), Slice("key1"), Slice("value1")); | |
3389 | } | |
3390 | }); | |
f67539c2 | 3391 | ROCKSDB_NAMESPACE::port::Thread write_thread([&]() { |
494da23a TL |
3392 | for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) { |
3393 | Transaction* txn = | |
3394 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3395 | ASSERT_OK(txn->SetName("xid")); | |
3396 | std::string val_str = "value" + ToString(i); | |
3397 | for (size_t b = 0; b < sub_batch_cnt; b++) { | |
f67539c2 | 3398 | ASSERT_OK(txn->Put(Slice("key2"), val_str)); |
494da23a TL |
3399 | } |
3400 | ASSERT_OK(txn->Prepare()); | |
3401 | // Let an eviction to kick in | |
3402 | std::this_thread::yield(); | |
3403 | ||
3404 | exp_prepare.store(txn->GetId()); | |
3405 | ASSERT_OK(txn->Commit()); | |
3406 | delete txn; | |
f67539c2 TL |
3407 | // Wait for the snapshot taking that is triggered by |
3408 | // RemovePrepared:Start callback | |
3409 | callback_thread.join(); | |
494da23a TL |
3410 | |
3411 | // Read with the snapshot taken before delayed_prepared_ cleanup | |
3412 | ReadOptions roptions; | |
3413 | roptions.snapshot = snap.load(); | |
3414 | ASSERT_NE(nullptr, roptions.snapshot); | |
3415 | PinnableSlice value2; | |
f67539c2 TL |
3416 | auto s = |
3417 | db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2); | |
494da23a TL |
3418 | ASSERT_OK(s); |
3419 | // It should see its own write | |
3420 | ASSERT_TRUE(val_str == value2); | |
3421 | // The value read by snapshot should not change | |
3422 | ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str()); | |
3423 | ||
3424 | db->ReleaseSnapshot(roptions.snapshot); | |
3425 | snap.store(nullptr); | |
3426 | } | |
3427 | }); | |
3428 | write_thread.join(); | |
3429 | eviction_thread.join(); | |
f67539c2 TL |
3430 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
3431 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
494da23a | 3432 | } |
494da23a TL |
3433 | } |
3434 | } | |
3435 | ||
11fdf7f2 TL |
3436 | // Test that updating the commit map will not affect the existing snapshots |
3437 | TEST_P(WritePreparedTransactionTest, AtomicCommit) { | |
3438 | for (bool skip_prepare : {true, false}) { | |
f67539c2 | 3439 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
11fdf7f2 TL |
3440 | {"WritePreparedTxnDB::AddCommitted:start", |
3441 | "AtomicCommit::GetSnapshot:start"}, | |
3442 | {"AtomicCommit::Get:end", | |
3443 | "WritePreparedTxnDB::AddCommitted:start:pause"}, | |
3444 | {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"}, | |
3445 | {"AtomicCommit::Get2:end", | |
3446 | "WritePreparedTxnDB::AddCommitted:end:pause:"}, | |
3447 | }); | |
f67539c2 TL |
3448 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
3449 | ROCKSDB_NAMESPACE::port::Thread write_thread([&]() { | |
11fdf7f2 TL |
3450 | if (skip_prepare) { |
3451 | db->Put(WriteOptions(), Slice("key"), Slice("value")); | |
3452 | } else { | |
3453 | Transaction* txn = | |
3454 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3455 | ASSERT_OK(txn->SetName("xid")); | |
3456 | ASSERT_OK(txn->Put(Slice("key"), Slice("value"))); | |
3457 | ASSERT_OK(txn->Prepare()); | |
3458 | ASSERT_OK(txn->Commit()); | |
3459 | delete txn; | |
3460 | } | |
3461 | }); | |
f67539c2 | 3462 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { |
11fdf7f2 TL |
3463 | ReadOptions roptions; |
3464 | TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start"); | |
3465 | roptions.snapshot = db->GetSnapshot(); | |
3466 | PinnableSlice val; | |
3467 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val); | |
3468 | TEST_SYNC_POINT("AtomicCommit::Get:end"); | |
3469 | TEST_SYNC_POINT("AtomicCommit::Get2:start"); | |
3470 | ASSERT_SAME(roptions, db, s, val, "key"); | |
3471 | TEST_SYNC_POINT("AtomicCommit::Get2:end"); | |
3472 | db->ReleaseSnapshot(roptions.snapshot); | |
3473 | }); | |
3474 | read_thread.join(); | |
3475 | write_thread.join(); | |
f67539c2 | 3476 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
3477 | } |
3478 | } | |
3479 | ||
3480 | // Test that we can change write policy from WriteCommitted to WritePrepared | |
3481 | // after a clean shutdown (which would empty the WAL) | |
3482 | TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { | |
3483 | bool empty_wal = true; | |
3484 | CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal); | |
3485 | } | |
3486 | ||
3487 | // Test that we fail fast if WAL is not emptied between changing the write | |
3488 | // policy from WriteCommitted to WritePrepared | |
3489 | TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) { | |
3490 | bool empty_wal = true; | |
3491 | CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal); | |
3492 | } | |
3493 | ||
// Test that we can change write policy from WritePrepared back to
// WriteCommitted after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
  bool empty_wal = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal);
}
3500 | ||
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WritePrepared to WriteCommitted
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
  bool empty_wal = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
}
3507 | ||
f67539c2 | 3508 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
3509 | |
// Standard gtest entry point: discovers and runs every test in this file.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
3514 | ||
3515 | #else | |
3516 | #include <stdio.h> | |
3517 | ||
// ROCKSDB_LITE build: transactions are unavailable, so report the skip on
// stderr and exit successfully so the test harness does not flag a failure.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
  return 0;
}
3523 | ||
3524 | #endif // ROCKSDB_LITE |