]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #ifndef __STDC_FORMAT_MACROS | |
9 | #define __STDC_FORMAT_MACROS | |
10 | #endif | |
11 | ||
12 | #include "utilities/transactions/transaction_test.h" | |
13 | ||
14 | #include <inttypes.h> | |
15 | #include <algorithm> | |
16 | #include <functional> | |
17 | #include <string> | |
18 | #include <thread> | |
19 | ||
20 | #include "db/db_impl.h" | |
21 | #include "db/dbformat.h" | |
22 | #include "rocksdb/db.h" | |
23 | #include "rocksdb/options.h" | |
24 | #include "rocksdb/types.h" | |
25 | #include "rocksdb/utilities/debug.h" | |
26 | #include "rocksdb/utilities/transaction.h" | |
27 | #include "rocksdb/utilities/transaction_db.h" | |
28 | #include "table/mock_table.h" | |
29 | #include "util/fault_injection_test_env.h" | |
30 | #include "util/random.h" | |
31 | #include "util/string_util.h" | |
32 | #include "util/sync_point.h" | |
33 | #include "util/testharness.h" | |
34 | #include "util/testutil.h" | |
35 | #include "util/transaction_test_util.h" | |
36 | #include "utilities/merge_operators.h" | |
37 | #include "utilities/merge_operators/string_append/stringappend.h" | |
38 | #include "utilities/transactions/pessimistic_transaction_db.h" | |
39 | #include "utilities/transactions/write_prepared_txn_db.h" | |
40 | ||
41 | #include "port/port.h" | |
42 | ||
43 | using std::string; | |
44 | ||
45 | namespace rocksdb { | |
46 | ||
47 | using CommitEntry = WritePreparedTxnDB::CommitEntry; | |
48 | using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b; | |
49 | using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat; | |
50 | ||
// Exercises the basic contract of WritePreparedTxnDB::PreparedHeap: top()
// must always return the smallest sequence number that has been pushed but
// not yet erased, no matter in which order push/erase/pop are interleaved.
TEST(PreparedHeap, BasicsTest) {
  WritePreparedTxnDB::PreparedHeap heap;
  heap.push(14l);
  // Test with one element
  ASSERT_EQ(14l, heap.top());
  heap.push(24l);
  heap.push(34l);
  // Test that old min is still on top
  ASSERT_EQ(14l, heap.top());
  heap.push(13l);
  // Test that the new min will be on top
  ASSERT_EQ(13l, heap.top());
  // Test that it is persistent
  ASSERT_EQ(13l, heap.top());
  heap.push(44l);
  heap.push(54l);
  heap.push(64l);
  heap.push(74l);
  heap.push(84l);
  // Test that old min is still on top
  ASSERT_EQ(13l, heap.top());
  heap.erase(24l);
  // Test that old min is still on top
  ASSERT_EQ(13l, heap.top());
  heap.erase(14l);
  // Test that old min is still on top
  ASSERT_EQ(13l, heap.top());
  heap.erase(13l);
  // Test that the new comes to the top after multiple erase
  ASSERT_EQ(34l, heap.top());
  heap.erase(34l);
  // Test that the new comes to the top after single erase
  ASSERT_EQ(44l, heap.top());
  heap.erase(54l);
  // Erasing a non-top entry must leave the current top unchanged
  ASSERT_EQ(44l, heap.top());
  heap.pop();  // pop 44l
  // Test that the erased items are ignored after pop
  ASSERT_EQ(64l, heap.top());
  heap.erase(44l);
  // Test that erasing an already popped item would work
  ASSERT_EQ(64l, heap.top());
  heap.erase(84l);
  ASSERT_EQ(64l, heap.top());
  heap.push(85l);
  heap.push(86l);
  heap.push(87l);
  heap.push(88l);
  heap.push(89l);
  heap.erase(87l);
  heap.erase(85l);
  heap.erase(89l);
  heap.erase(86l);
  heap.erase(88l);
  // Test top remains the same after a random order of many erases
  ASSERT_EQ(64l, heap.top());
  heap.pop();
  // Test that pop works with a series of random pending erases
  ASSERT_EQ(74l, heap.top());
  ASSERT_FALSE(heap.empty());
  heap.pop();
  // Test that empty works
  ASSERT_TRUE(heap.empty());
}
114 | ||
// This is a scenario reconstructed from a buggy trace. Test that the bug does
// not resurface again. The sequence mixes erase-without-push and erase of the
// current minimum; the heap must neither lose entries nor report empty while
// a live entry remains.
TEST(PreparedHeap, EmptyAtTheEnd) {
  WritePreparedTxnDB::PreparedHeap heap;
  heap.push(40l);
  ASSERT_EQ(40l, heap.top());
  // Although not a recommended scenario, we must be resilient against erase
  // without a prior push.
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  heap.push(60l);
  ASSERT_EQ(40l, heap.top());

  heap.erase(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  ASSERT_TRUE(heap.empty());

  // Repeat the same scenario but erase the minimum (40) before the pending
  // erase of 50 is resolved.
  heap.push(40l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  heap.push(60l);
  ASSERT_EQ(40l, heap.top());

  heap.erase(40l);
  // Test that the erase has not emptied the heap (we had a bug doing that)
  ASSERT_FALSE(heap.empty());
  ASSERT_EQ(60l, heap.top());
  heap.erase(60l);
  ASSERT_TRUE(heap.empty());
}
147 | ||
148 | // Generate random order of PreparedHeap access and test that the heap will be | |
149 | // successfully emptied at the end. | |
150 | TEST(PreparedHeap, Concurrent) { | |
151 | const size_t t_cnt = 10; | |
152 | rocksdb::port::Thread t[t_cnt]; | |
153 | Random rnd(1103); | |
154 | WritePreparedTxnDB::PreparedHeap heap; | |
155 | port::RWMutex prepared_mutex; | |
156 | ||
157 | for (size_t n = 0; n < 100; n++) { | |
158 | for (size_t i = 0; i < t_cnt; i++) { | |
159 | // This is not recommended usage but we should be resilient against it. | |
160 | bool skip_push = rnd.OneIn(5); | |
161 | t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() { | |
162 | auto seq = i; | |
163 | std::this_thread::yield(); | |
164 | if (!skip_push) { | |
165 | WriteLock wl(&prepared_mutex); | |
166 | heap.push(seq); | |
167 | } | |
168 | std::this_thread::yield(); | |
169 | { | |
170 | WriteLock wl(&prepared_mutex); | |
171 | heap.erase(seq); | |
172 | } | |
173 | }); | |
174 | } | |
175 | for (size_t i = 0; i < t_cnt; i++) { | |
176 | t[i].join(); | |
177 | } | |
178 | ASSERT_TRUE(heap.empty()); | |
179 | } | |
180 | } | |
181 | ||
// Test that WriteBatchWithIndex correctly counts the number of sub-batches.
// A new sub-batch is cut whenever a key is written that duplicates a key
// already present in the current sub-batch (same column family).
TEST(WriteBatchWithIndex, SubBatchCnt) {
  ColumnFamilyOptions cf_options;
  std::string cf_name = "two";
  DB* db;
  Options options;
  options.create_if_missing = true;
  const std::string dbname = test::PerThreadDBPath("transaction_testdb");
  DestroyDB(dbname, options);
  ASSERT_OK(DB::Open(options, dbname, &db));
  ColumnFamilyHandle* cf_handle = nullptr;
  ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
  WriteOptions write_options;
  // An empty batch counts as one (empty) sub-batch.
  size_t batch_cnt = 1;
  size_t save_points = 0;
  // Expected sub-batch count at each save point, for rollback verification.
  std::vector<size_t> batch_cnt_at;
  WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
                            0);
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key"), Slice("value"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key2"), Slice("value2"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the keys
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key"), Slice("value3"));
  batch_cnt++;
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the 2nd key. It should not be counted duplicate since a
  // sub-patch is cut after the last duplicate.
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(Slice("key2"), Slice("value4"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  // duplicate the keys but in a different cf. It should not be counted as
  // duplicate keys
  batch_cnt_at.push_back(batch_cnt);
  batch.SetSavePoint();
  save_points++;
  batch.Put(cf_handle, Slice("key"), Slice("value5"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  // Test that the number of sub-batches matches what we count with
  // SubBatchCounter
  std::map<uint32_t, const Comparator*> comparators;
  comparators[0] = db->DefaultColumnFamily()->GetComparator();
  comparators[cf_handle->GetID()] = cf_handle->GetComparator();
  SubBatchCounter counter(comparators);
  ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
  ASSERT_EQ(batch_cnt, counter.BatchCount());

  // Test that RollbackToSavePoint will properly resets the number of
  // sub-batches
  for (size_t i = save_points; i > 0; i--) {
    batch.RollbackToSavePoint();
    ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
  }

  // Test the count is right with random batches
  {
    const size_t TOTAL_KEYS = 20;  // 20 ~= 10 to cause a few randoms
    Random rnd(1131);
    std::string keys[TOTAL_KEYS];
    for (size_t k = 0; k < TOTAL_KEYS; k++) {
      int len = static_cast<int>(rnd.Uniform(50));
      keys[k] = test::RandomKey(&rnd, len);
    }
    for (size_t i = 0; i < 1000; i++) {  // 1000 random batches
      WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
                                   0, true, 0);
      for (size_t k = 0; k < 10; k++) {  // 10 key per batch
        size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
        Slice key = Slice(keys[ki]);
        std::string buffer;
        Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
        rndbatch.Put(key, value);
      }
      // The incrementally-maintained count must equal a full recount.
      SubBatchCounter batch_counter(comparators);
      ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
      ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
    }
  }

  delete cf_handle;
  delete db;
}
277 | ||
// Verifies that CommitEntry64b's packed 64-bit encoding round-trips
// (prep_seq, commit_seq) pairs: an entry encoded for a given index must parse
// back to the exact same CommitEntry, and a zero-initialized encoding must be
// reported as empty.
TEST(CommitEntry64b, BasicTest) {
  const size_t INDEX_BITS = static_cast<size_t>(21);
  const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
  const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));

  // zero-initialized CommitEntry64b should indicate an empty entry
  CommitEntry64b empty_entry64b;
  uint64_t empty_index = 11ul;
  CommitEntry empty_entry;
  bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
  ASSERT_FALSE(ok);

  // the zero entry is reserved for un-initialized entries
  const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1;
  // Samples over the numbers that are covered by that many index bits
  std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
  // Samples over the numbers that are covered by that many commit bits
  std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
  // Iterate over prepare numbers that have i) cover all bits of a sequence
  // number, and ii) include some bits that fall into the range of index or
  // commit bits
  for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
    for (uint64_t i : is) {
      for (uint64_t d : ds) {
        uint64_t p = base + i + d;
        // Commit seq is sampled at and after the prepare seq.
        for (uint64_t c : {p, p + d / 2, p + d}) {
          uint64_t index = p % INDEX_SIZE;
          CommitEntry before(p, c), after;
          CommitEntry64b entry64b(before, FORMAT);
          ok = entry64b.Parse(index, &after, FORMAT);
          ASSERT_TRUE(ok);
          // On mismatch print the full scenario before failing, to make the
          // failing combination reproducible.
          if (!(before == after)) {
            printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
                   " c %" PRIu64 " index %" PRIu64 "\n",
                   base, i, d, p, c, index);
          }
          ASSERT_EQ(before, after);
        }
      }
    }
  }
}
320 | ||
// A WritePreparedTxnDB whose snapshot list is injected by the test instead of
// being read from the DB, so tests can simulate arbitrary snapshot histories.
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
 public:
  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
      : WritePreparedTxnDB(db_impl, opt) {}
  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
                         size_t snapshot_cache_size)
      : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
  WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
                         size_t snapshot_cache_size, size_t commit_cache_size)
      : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
                          commit_cache_size) {}
  // Replace the whole injected snapshot list.
  void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
    snapshots_ = snapshots;
  }
  // Append one snapshot to the injected list.
  void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }

 protected:
  // Overridden to return the injected list rather than querying the DB.
  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber /* unused */) override {
    return snapshots_;
  }

 private:
  std::vector<SequenceNumber> snapshots_;
};
346 | ||
// Shared fixture helpers for WritePrepared transaction tests: scenario
// drivers for old_commit_map_ maintenance, snapshot-list race reproduction
// via sync points, and key/version verification utilities.
class WritePreparedTransactionTestBase : public TransactionTestBase {
 public:
  WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
                                   TxnDBWritePolicy write_policy)
      : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};

 protected:
  // If expect_update is set, check if it actually updated old_commit_map_. If
  // it did not and yet suggested not to check the next snapshot, do the
  // opposite to check if it was not a bad suggestion.
  void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
                                           uint64_t snapshot,
                                           uint64_t next_snapshot,
                                           bool expect_update) {
    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
    // reset old_commit_map_empty_ so that its value indicate whether
    // old_commit_map_ was updated
    wp_db->old_commit_map_empty_ = true;
    bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
                                                     snapshot < next_snapshot);
    // Print the scenario before the (less informative) EXPECT failure.
    if (expect_update == wp_db->old_commit_map_empty_) {
      printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
             " next: %" PRIu64 "\n",
             prepare, commit, snapshot, next_snapshot);
    }
    EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
    if (!check_next && wp_db->old_commit_map_empty_) {
      // do the opposite to make sure it was not a bad suggestion
      const bool dont_care_bool = true;
      wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
                                     dont_care_bool);
      if (!wp_db->old_commit_map_empty_) {
        printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
               " next: %" PRIu64 "\n",
               prepare, commit, snapshot, next_snapshot);
      }
      EXPECT_TRUE(wp_db->old_commit_map_empty_);
    }
  }

  // Test that a CheckAgainstSnapshots thread reading old_snapshots will not
  // miss a snapshot because of a concurrent update by UpdateSnapshots that is
  // writing new_snapshots. Both threads are broken at two points. The sync
  // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
  // entry is expected to be vital for one of the snapshots that is common
  // between the old and new list of snapshots.
  void SnapshotConcurrentAccessTestInternal(
      WritePreparedTxnDB* wp_db,
      const std::vector<SequenceNumber>& old_snapshots,
      const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
      SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
    // First reset the snapshot list
    const std::vector<SequenceNumber> empty_snapshots;
    wp_db->old_commit_map_empty_ = true;
    wp_db->UpdateSnapshots(empty_snapshots, ++version);
    // Then initialize it with the old_snapshots
    wp_db->UpdateSnapshots(old_snapshots, ++version);

    // Starting from the first thread, cut each thread at two points
    rocksdb::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
         "WritePreparedTxnDB::UpdateSnapshots:s:start"},
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
    });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    {
      ASSERT_TRUE(wp_db->old_commit_map_empty_);
      rocksdb::port::Thread t1(
          [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
      rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
      t1.join();
      t2.join();
      // Despite the race, the entry must have been recorded against the
      // overlapping snapshot.
      ASSERT_FALSE(wp_db->old_commit_map_empty_);
    }
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();

    // Repeat with the interleaving mirrored: reset, then cut the threads in
    // the opposite order.
    wp_db->old_commit_map_empty_ = true;
    wp_db->UpdateSnapshots(empty_snapshots, ++version);
    wp_db->UpdateSnapshots(old_snapshots, ++version);
    // Starting from the second thread, cut each thread at two points
    rocksdb::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
        {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
        {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
         "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
        {"WritePreparedTxnDB::UpdateSnapshots:p:end",
         "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
    });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    {
      ASSERT_TRUE(wp_db->old_commit_map_empty_);
      rocksdb::port::Thread t1(
          [&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
      rocksdb::port::Thread t2([&]() { wp_db->CheckAgainstSnapshots(entry); });
      t1.join();
      t2.join();
      ASSERT_FALSE(wp_db->old_commit_map_empty_);
    }
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  }

  // Verify value of keys. Expected absence is encoded as the sentinel value
  // "NOT_FOUND" in `data`. Checks both Get and MultiGet paths.
  void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
                  const Snapshot* snapshot = nullptr) {
    std::string value;
    ReadOptions read_options;
    read_options.snapshot = snapshot;
    for (auto& kv : data) {
      auto s = db->Get(read_options, kv.first, &value);
      ASSERT_TRUE(s.ok() || s.IsNotFound());
      if (s.ok()) {
        // Print the offending key before the assertion failure for context.
        if (kv.second != value) {
          printf("key = %s\n", kv.first.c_str());
        }
        ASSERT_EQ(kv.second, value);
      } else {
        ASSERT_EQ(kv.second, "NOT_FOUND");
      }

      // Try with MultiGet API too
      std::vector<std::string> values;
      auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
                                {kv.first}, &values);
      ASSERT_EQ(1, values.size());
      ASSERT_EQ(1, s_vec.size());
      s = s_vec[0];
      ASSERT_TRUE(s.ok() || s.IsNotFound());
      if (s.ok()) {
        ASSERT_TRUE(kv.second == values[0]);
      } else {
        ASSERT_EQ(kv.second, "NOT_FOUND");
      }
    }
  }

  // Verify all versions of keys. `expected_versions` must be sorted so that
  // front()/back() bound the user-key range to scan.
  void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
    std::vector<KeyVersion> versions;
    const size_t kMaxKeys = 100000;
    ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
                                expected_versions.back().user_key, kMaxKeys,
                                &versions));
    ASSERT_EQ(expected_versions.size(), versions.size());
    for (size_t i = 0; i < versions.size(); i++) {
      ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
      ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
      ASSERT_EQ(expected_versions[i].type, versions[i].type);
      // Deletion markers carry no meaningful value, so skip value comparison.
      if (versions[i].type != kTypeDeletion &&
          versions[i].type != kTypeSingleDeletion) {
        ASSERT_EQ(expected_versions[i].value, versions[i].value);
      }
      // Range delete not supported.
      assert(expected_versions[i].type != kTypeRangeDeletion);
    }
  }
};
514 | ||
515 | class WritePreparedTransactionTest | |
516 | : public WritePreparedTransactionTestBase, | |
517 | virtual public ::testing::WithParamInterface< | |
518 | std::tuple<bool, bool, TxnDBWritePolicy>> { | |
519 | public: | |
520 | WritePreparedTransactionTest() | |
521 | : WritePreparedTransactionTestBase(std::get<0>(GetParam()), | |
522 | std::get<1>(GetParam()), | |
523 | std::get<2>(GetParam())){}; | |
524 | }; | |
525 | ||
526 | #ifndef ROCKSDB_VALGRIND_RUN | |
527 | class SnapshotConcurrentAccessTest | |
528 | : public WritePreparedTransactionTestBase, | |
529 | virtual public ::testing::WithParamInterface< | |
530 | std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> { | |
531 | public: | |
532 | SnapshotConcurrentAccessTest() | |
533 | : WritePreparedTransactionTestBase(std::get<0>(GetParam()), | |
534 | std::get<1>(GetParam()), | |
535 | std::get<2>(GetParam())), | |
536 | split_id_(std::get<3>(GetParam())), | |
537 | split_cnt_(std::get<4>(GetParam())){}; | |
538 | ||
539 | protected: | |
540 | // A test is split into split_cnt_ tests, each identified with split_id_ where | |
541 | // 0 <= split_id_ < split_cnt_ | |
542 | size_t split_id_; | |
543 | size_t split_cnt_; | |
544 | }; | |
545 | #endif // ROCKSDB_VALGRIND_RUN | |
546 | ||
547 | class SeqAdvanceConcurrentTest | |
548 | : public WritePreparedTransactionTestBase, | |
549 | virtual public ::testing::WithParamInterface< | |
550 | std::tuple<bool, bool, TxnDBWritePolicy, size_t, size_t>> { | |
551 | public: | |
552 | SeqAdvanceConcurrentTest() | |
553 | : WritePreparedTransactionTestBase(std::get<0>(GetParam()), | |
554 | std::get<1>(GetParam()), | |
555 | std::get<2>(GetParam())), | |
556 | split_id_(std::get<3>(GetParam())), | |
557 | split_cnt_(std::get<4>(GetParam())){}; | |
558 | ||
559 | protected: | |
560 | // A test is split into split_cnt_ tests, each identified with split_id_ where | |
561 | // 0 <= split_id_ < split_cnt_ | |
562 | size_t split_id_; | |
563 | size_t split_cnt_; | |
564 | }; | |
565 | ||
// Instantiations. Tuple fields are <use_stackable_db, two_write_queue,
// write_policy[, split_id, split_cnt]>. The split tests enumerate every
// split_id in [0, split_cnt) so that all shards of the long tests run.
INSTANTIATE_TEST_CASE_P(
    WritePreparedTransactionTest, WritePreparedTransactionTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED),
                      std::make_tuple(false, true, WRITE_PREPARED)));

#ifndef ROCKSDB_VALGRIND_RUN
// 20 shards of the snapshot-concurrent-access test with two write queues.
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SnapshotConcurrentAccessTest,
    ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 1, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 2, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 3, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 4, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 5, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 6, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 7, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 8, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 9, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 10, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 11, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 12, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 13, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 14, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 15, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 16, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 17, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 18, 20),
                      std::make_tuple(false, true, WRITE_PREPARED, 19, 20)));

// 20 shards of the snapshot-concurrent-access test with a single write queue.
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SnapshotConcurrentAccessTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 1, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 2, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 3, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 4, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 5, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 6, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 7, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 8, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 9, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 10, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 11, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 12, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 13, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 14, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 15, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 16, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 17, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 18, 20),
                      std::make_tuple(false, false, WRITE_PREPARED, 19, 20)));

// 10 shards of the sequence-advance test with two write queues.
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 1, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 2, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 3, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 4, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 5, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 6, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 7, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 8, 10),
                      std::make_tuple(false, true, WRITE_PREPARED, 9, 10)));

// 10 shards of the sequence-advance test with a single write queue.
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 1, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 2, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 3, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 4, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 5, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 6, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 7, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 8, 10),
                      std::make_tuple(false, false, WRITE_PREPARED, 9, 10)));
#endif  // ROCKSDB_VALGRIND_RUN
644 | ||
// Exercises the fixed-size commit cache of WritePreparedTxnDB: add, lookup,
// conditional exchange, and eviction when an index slot is overwritten.
TEST_P(WritePreparedTransactionTest, CommitMapTest) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  assert(wp_db);
  assert(wp_db->db_impl_);
  size_t size = wp_db->COMMIT_CACHE_SIZE;
  CommitEntry c = {5, 12}, e;
  // First insertion into an empty slot must not evict anything.
  bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
  ASSERT_FALSE(evicted);

  // Should be able to read the same value
  CommitEntry64b dont_care;
  bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);
  // Should be able to distinguish between overlapping entries
  found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_NE(c.prep_seq + size, e.prep_seq);
  // Should be able to detect non-existent entry
  found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e);
  ASSERT_FALSE(found);

  // Reject an invalid exchange: the expected value (e2_64b) does not match
  // what is currently stored in the slot.
  CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size};
  CommitEntry64b e2_64b(e2, wp_db->FORMAT);
  bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e);
  ASSERT_FALSE(exchanged);
  // check whether it did actually reject that
  found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(c, e);

  // Accept a valid exchange
  CommitEntry64b c_64b(c, wp_db->FORMAT);
  CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1};
  exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3);
  ASSERT_TRUE(exchanged);
  // check whether it did actually accepted that
  found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e3, e);

  // Rewrite an entry: adding to an occupied slot must report the evicted
  // previous occupant.
  CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1};
  evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e);
  ASSERT_TRUE(evicted);
  ASSERT_EQ(e3, e);
  found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e);
  ASSERT_TRUE(found);
  ASSERT_EQ(e4, e);
}
696 | ||
// Table of scenarios for MaybeUpdateOldCommitMap, covering the boundary
// orderings of prepare/commit relative to the snapshot.
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
  // If prepare <= snapshot < commit we should keep the entry around since its
  // nonexistence could be interpreted as committed in the snapshot while it is
  // not true. We keep such entries around by adding them to the
  // old_commit_map_.
  uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/;
  // commit before the snapshot: visible to the snapshot, no entry needed
  p = 10l, c = 15l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  // If we do not expect the old commit map to be updated, try also with a next
  // snapshot that is expected to update the old commit map. This would test
  // that MaybeUpdateOldCommitMap would not prevent us from checking the next
  // snapshot that must be checked.
  p = 10l, c = 15l, s = 20l, ns = 11l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);

  // commit exactly at the snapshot: treated as visible, no entry needed
  p = 10l, c = 20l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 10l, c = 20l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);

  // prepare == commit == snapshot
  p = 20l, c = 20l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 20l, c = 20l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);

  // prepare <= snapshot < commit: the entry must be kept in old_commit_map_
  p = 10l, c = 25l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);

  p = 20l, c = 25l, s = 20l, ns = 21l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true);

  // prepare after the snapshot: invisible to the snapshot, no entry needed
  p = 21l, c = 25l, s = 20l, ns = 22l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
  p = 21l, c = 25l, s = 20l, ns = 19l;
  MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
}
733 | ||
// Test that the entries in old_commit_map_ get garbage collected properly
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
  // Both caches are sized 1 (1 << 0) so that every AddCommitted evicts the
  // previous commit entry and advances max_evicted_seq_ quickly.
  const size_t snapshot_cache_bits = 0;
  const size_t commit_cache_bits = 0;
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
      mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));

  SequenceNumber seq = 0;
  // Take the first snapshot that overlaps with two txn
  auto prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto prep_seq2 = ++seq;
  wp_db->AddPrepared(prep_seq2);
  auto snap_seq1 = seq;
  wp_db->TakeSnapshot(snap_seq1);
  // Commit both txns after the snapshot, so both evicted entries must be
  // remembered on behalf of snap_seq1.
  auto commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  auto commit_seq2 = ++seq;
  wp_db->AddCommitted(prep_seq2, commit_seq2);
  wp_db->RemovePrepared(prep_seq2);
  // Take the 2nd and 3rd snapshot that overlap with the same txn
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  auto snap_seq2 = seq;
  wp_db->TakeSnapshot(snap_seq2);
  seq++;
  auto snap_seq3 = seq;
  wp_db->TakeSnapshot(snap_seq3);
  seq++;
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);
  // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
  // only item in the commit_cache_ via another commit.
  prep_seq = ++seq;
  wp_db->AddPrepared(prep_seq);
  commit_seq = ++seq;
  wp_db->AddCommitted(prep_seq, commit_seq);
  wp_db->RemovePrepared(prep_seq);

  // Verify that the evicted commit entries for all snapshots are in the
  // old_commit_map_
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(3, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq2].size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
  }

  // Verify that the 2nd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq2);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(2, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, wp_db->old_commit_map_[snap_seq1].size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
  }

  // Verify that the 1st snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq1);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(1, wp_db->old_commit_map_.size());
    ASSERT_EQ(1, wp_db->old_commit_map_[snap_seq3].size());
  }

  // Verify that the 3rd snapshot is cleaned up after the release; the map is
  // now fully garbage collected and the empty flag is restored.
  wp_db->ReleaseSnapshotInternal(snap_seq3);
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(0, wp_db->old_commit_map_.size());
  }
}
814 | ||
815 | TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { | |
816 | std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l, | |
817 | 600l, 700l, 800l, 900l}; | |
818 | const size_t snapshot_cache_bits = 2; | |
819 | // Safety check to express the intended size in the test. Can be adjusted if | |
820 | // the snapshots lists changed. | |
821 | assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size()); | |
822 | DBImpl* mock_db = new DBImpl(options, dbname); | |
823 | std::unique_ptr<WritePreparedTxnDBMock> wp_db( | |
824 | new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); | |
825 | SequenceNumber version = 1000l; | |
826 | ASSERT_EQ(0, wp_db->snapshots_total_); | |
827 | wp_db->UpdateSnapshots(snapshots, version); | |
828 | ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_); | |
829 | // seq numbers are chosen so that we have two of them between each two | |
830 | // snapshots. If the diff of two consecutive seq is more than 5, there is a | |
831 | // snapshot between them. | |
832 | std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l, | |
833 | 355l, 450l, 455l, 550l, 555l, 650l, 655l, | |
834 | 750l, 755l, 850l, 855l, 950l, 955l}; | |
835 | assert(seqs.size() > 1); | |
836 | for (size_t i = 0; i < seqs.size() - 1; i++) { | |
837 | wp_db->old_commit_map_empty_ = true; // reset | |
838 | CommitEntry commit_entry = {seqs[i], seqs[i + 1]}; | |
839 | wp_db->CheckAgainstSnapshots(commit_entry); | |
840 | // Expect update if there is snapshot in between the prepare and commit | |
841 | bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 && | |
842 | commit_entry.commit_seq >= snapshots.front() && | |
843 | commit_entry.prep_seq <= snapshots.back(); | |
844 | ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_); | |
845 | } | |
846 | } | |
847 | ||
// This test is too slow for travis
#ifndef TRAVIS
#ifndef ROCKSDB_VALGRIND_RUN
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccessTest) {
  // We have a sync point in the method under test after checking each snapshot.
  // If you increase the max number of snapshots in this test, more sync points
  // in the methods must also be added.
  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
                                                 60l, 70l, 80l, 90l, 100l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
  SequenceNumber version = 1000l;
  // Choose the cache size so that the new snapshot list could replace all the
  // existing items in the cache and also have some overflow.
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits));
  const size_t extra = 2;
  size_t loop_id = 0;
  // Add up to extra items that do not fit into the cache
  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
       old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);

    // Each member of old snapshot might or might not appear in the new list. We
    // create a common_snapshots for each combination.
    size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      // loop_id is used to shard the (deterministic) case enumeration across
      // parallel test instances (split_cnt_ / split_id_ come from the param).
      if (loop_id % split_cnt_ != split_id_) continue;
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t i = 0; i < old_snapshots.size(); i++) {
        // The new_comb bitmask selects which members of the old list survive
        // into the new list.
        if (IsInCombination(i, new_comb)) {
          common_snapshots.push_back(old_snapshots[i]);
        }
      }
      // And add some new snapshots to the common list
      for (size_t added_snapshots = 0;
           added_snapshots <= snapshots.size() - old_snapshots.size();
           added_snapshots++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        for (size_t i = 0; i < added_snapshots; i++) {
          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
        }
        // Only snapshots common to both lists are live through the whole
        // interleaving, so those are the ones that must never be missed.
        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
             it++) {
          auto snapshot = *it;
          // Create a commit entry that is around the snapshot and thus should
          // be not be discarded
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // The critical part is when iterating the snapshot cache. Afterwards,
          // we are operating under the lock
          size_t a_range =
              std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          size_t b_range =
              std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          // Break each thread at two points: (a1, a2) are the pause points for
          // one thread and (b1, b2) for the other, covering every interleaving
          // of the two methods at the sync points.
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
#endif  // ROCKSDB_VALGRIND_RUN
#endif  // TRAVIS
931 | ||
// This test clarifies the contract of AdvanceMaxEvictedSeq method
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
  DBImpl* mock_db = new DBImpl(options, dbname);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  // 1. Set the initial values for max, prepared, and snapshots
  SequenceNumber zero_max = 0l;
  // Set the initial list of prepared txns
  const std::vector<SequenceNumber> initial_prepared = {10,  30,  50, 100,
                                                        150, 200, 250};
  for (auto p : initial_prepared) {
    wp_db->AddPrepared(p);
  }
  // This updates the max value and also set old prepared
  SequenceNumber init_max = 100;
  wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
  const std::vector<SequenceNumber> initial_snapshots = {20, 40};
  wp_db->SetDBSnapshots(initial_snapshots);
  // This will update the internal cache of snapshots from the DB
  wp_db->UpdateSnapshots(initial_snapshots, init_max);

  // 2. Invoke AdvanceMaxEvictedSeq
  const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
  wp_db->SetDBSnapshots(latest_snapshots);
  SequenceNumber new_max = 200;
  wp_db->AdvanceMaxEvictedSeq(init_max, new_max);

  // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
  // a. max should be updated to new_max
  ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
  // b. delayed prepared should contain every txn <= max and prepared should
  // only contain txns > max
  auto it = initial_prepared.begin();
  // NOTE: the loop erases as it verifies, so a successful erase of each
  // expected member followed by the empty() check proves exact set equality.
  for (; it != initial_prepared.end() && *it <= new_max; it++) {
    ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
  }
  ASSERT_TRUE(wp_db->delayed_prepared_.empty());
  // Similarly drain prepared_txns_ (a heap) and verify it pops the remaining
  // prepared seqs in the expected ascending order.
  for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
       it++, wp_db->prepared_txns_.pop()) {
    ASSERT_EQ(*it, wp_db->prepared_txns_.top());
  }
  ASSERT_TRUE(it == initial_prepared.end());
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  // c. snapshots should contain everything below new_max
  auto sit = latest_snapshots.begin();
  for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
                     i < wp_db->snapshots_total_;
       sit++, i++) {
    ASSERT_TRUE(i < wp_db->snapshots_total_);
    // This test is in small scale and the list of snapshots are assumed to be
    // within the cache size limit. This is just a safety check to double check
    // that assumption.
    ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
    ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
  }
}
989 | ||
990 | // This tests that transactions with duplicate keys perform correctly after max | |
991 | // is advancing their prepared sequence numbers. This will not be the case if | |
992 | // for example the txn does not add the prepared seq for the second sub-batch to | |
993 | // the PrepareHeap structure. | |
994 | TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) { | |
995 | WriteOptions write_options; | |
996 | TransactionOptions txn_options; | |
997 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
998 | ASSERT_OK(txn0->SetName("xid")); | |
999 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); | |
1000 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value2"))); | |
1001 | ASSERT_OK(txn0->Prepare()); | |
1002 | ||
1003 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1004 | // Ensure that all the prepared sequence numbers will be removed from the | |
1005 | // PrepareHeap. | |
1006 | SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE; | |
1007 | wp_db->AdvanceMaxEvictedSeq(0, new_max); | |
1008 | ||
1009 | ReadOptions ropt; | |
1010 | PinnableSlice pinnable_val; | |
1011 | auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); | |
1012 | ASSERT_TRUE(s.IsNotFound()); | |
1013 | delete txn0; | |
1014 | ||
1015 | wp_db->db_impl_->FlushWAL(true); | |
1016 | wp_db->TEST_Crash(); | |
1017 | ReOpenNoDelete(); | |
1018 | assert(db != nullptr); | |
1019 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1020 | wp_db->AdvanceMaxEvictedSeq(0, new_max); | |
1021 | s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); | |
1022 | ASSERT_TRUE(s.IsNotFound()); | |
1023 | ||
1024 | txn0 = db->GetTransactionByName("xid"); | |
1025 | ASSERT_OK(txn0->Rollback()); | |
1026 | delete txn0; | |
1027 | } | |
1028 | ||
1029 | TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) { | |
1030 | // Given the sequential run of txns, with this timeout we should never see a | |
1031 | // deadlock nor a timeout unless we have a key conflict, which should be | |
1032 | // almost infeasible. | |
1033 | txn_db_options.transaction_lock_timeout = 1000; | |
1034 | txn_db_options.default_lock_timeout = 1000; | |
1035 | ReOpen(); | |
1036 | FlushOptions fopt; | |
1037 | ||
1038 | // Number of different txn types we use in this test | |
1039 | const size_t type_cnt = 5; | |
1040 | // The size of the first write group | |
1041 | // TODO(myabandeh): This should be increase for pre-release tests | |
1042 | const size_t first_group_size = 2; | |
1043 | // Total number of txns we run in each test | |
1044 | // TODO(myabandeh): This should be increase for pre-release tests | |
1045 | const size_t txn_cnt = first_group_size + 1; | |
1046 | ||
1047 | size_t base[txn_cnt + 1] = { | |
1048 | 1, | |
1049 | }; | |
1050 | for (size_t bi = 1; bi <= txn_cnt; bi++) { | |
1051 | base[bi] = base[bi - 1] * type_cnt; | |
1052 | } | |
1053 | const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt)); | |
1054 | printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n); | |
1055 | for (size_t n = 0; n < max_n; n++, ReOpen()) { | |
1056 | if (n % split_cnt_ != split_id_) continue; | |
1057 | if (n % 1000 == 0) { | |
1058 | printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n); | |
1059 | } | |
1060 | DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); | |
1061 | auto seq = db_impl->TEST_GetLastVisibleSequence(); | |
1062 | exp_seq = seq; | |
1063 | // This is increased before writing the batch for commit | |
1064 | commit_writes = 0; | |
1065 | // This is increased before txn starts linking if it expects to do a commit | |
1066 | // eventually | |
1067 | expected_commits = 0; | |
1068 | std::vector<port::Thread> threads; | |
1069 | ||
1070 | linked = 0; | |
1071 | std::atomic<bool> batch_formed(false); | |
1072 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1073 | "WriteThread::EnterAsBatchGroupLeader:End", | |
1074 | [&](void* /*arg*/) { batch_formed = true; }); | |
1075 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1076 | "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) { | |
1077 | linked++; | |
1078 | if (linked == 1) { | |
1079 | // Wait until the others are linked too. | |
1080 | while (linked < first_group_size) { | |
1081 | } | |
1082 | } else if (linked == 1 + first_group_size) { | |
1083 | // Make the 2nd batch of the rest of writes plus any followup | |
1084 | // commits from the first batch | |
1085 | while (linked < txn_cnt + commit_writes) { | |
1086 | } | |
1087 | } | |
1088 | // Then we will have one or more batches consisting of follow-up | |
1089 | // commits from the 2nd batch. There is a bit of non-determinism here | |
1090 | // but it should be tolerable. | |
1091 | }); | |
1092 | ||
1093 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
1094 | for (size_t bi = 0; bi < txn_cnt; bi++) { | |
1095 | // get the bi-th digit in number system based on type_cnt | |
1096 | size_t d = (n % base[bi + 1]) / base[bi]; | |
1097 | switch (d) { | |
1098 | case 0: | |
1099 | threads.emplace_back(txn_t0, bi); | |
1100 | break; | |
1101 | case 1: | |
1102 | threads.emplace_back(txn_t1, bi); | |
1103 | break; | |
1104 | case 2: | |
1105 | threads.emplace_back(txn_t2, bi); | |
1106 | break; | |
1107 | case 3: | |
1108 | threads.emplace_back(txn_t3, bi); | |
1109 | break; | |
1110 | case 4: | |
1111 | threads.emplace_back(txn_t3, bi); | |
1112 | break; | |
1113 | default: | |
1114 | assert(false); | |
1115 | } | |
1116 | // wait to be linked | |
1117 | while (linked.load() <= bi) { | |
1118 | } | |
1119 | // after a queue of size first_group_size | |
1120 | if (bi + 1 == first_group_size) { | |
1121 | while (!batch_formed) { | |
1122 | } | |
1123 | // to make it more deterministic, wait until the commits are linked | |
1124 | while (linked.load() <= bi + expected_commits) { | |
1125 | } | |
1126 | } | |
1127 | } | |
1128 | for (auto& t : threads) { | |
1129 | t.join(); | |
1130 | } | |
1131 | if (options.two_write_queues) { | |
1132 | // In this case none of the above scheduling tricks to deterministically | |
1133 | // form merged batches works because the writes go to separate queues. | |
1134 | // This would result in different write groups in each run of the test. We | |
1135 | // still keep the test since although non-deterministic and hard to debug, | |
1136 | // it is still useful to have. | |
1137 | // TODO(myabandeh): Add a deterministic unit test for two_write_queues | |
1138 | } | |
1139 | ||
1140 | // Check if memtable inserts advanced seq number as expected | |
1141 | seq = db_impl->TEST_GetLastVisibleSequence(); | |
1142 | ASSERT_EQ(exp_seq, seq); | |
1143 | ||
1144 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
1145 | rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
1146 | ||
1147 | // Check if recovery preserves the last sequence number | |
1148 | db_impl->FlushWAL(true); | |
1149 | ReOpenNoDelete(); | |
1150 | assert(db != nullptr); | |
1151 | db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); | |
1152 | seq = db_impl->TEST_GetLastVisibleSequence(); | |
1153 | ASSERT_EQ(exp_seq, seq); | |
1154 | ||
1155 | // Check if flush preserves the last sequence number | |
1156 | db_impl->Flush(fopt); | |
1157 | seq = db_impl->GetLatestSequenceNumber(); | |
1158 | ASSERT_EQ(exp_seq, seq); | |
1159 | ||
1160 | // Check if recovery after flush preserves the last sequence number | |
1161 | db_impl->FlushWAL(true); | |
1162 | ReOpenNoDelete(); | |
1163 | assert(db != nullptr); | |
1164 | db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); | |
1165 | seq = db_impl->GetLatestSequenceNumber(); | |
1166 | ASSERT_EQ(exp_seq, seq); | |
1167 | } | |
1168 | } | |
1169 | ||
1170 | // Run a couple of different txns among them some uncommitted. Restart the db at | |
1171 | // a couple points to check whether the list of uncommitted txns are recovered | |
1172 | // properly. | |
1173 | TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) { | |
1174 | options.disable_auto_compactions = true; | |
1175 | ReOpen(); | |
1176 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1177 | ||
1178 | txn_t0(0); | |
1179 | ||
1180 | TransactionOptions txn_options; | |
1181 | WriteOptions write_options; | |
1182 | size_t index = 1000; | |
1183 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1184 | auto istr0 = std::to_string(index); | |
1185 | auto s = txn0->SetName("xid" + istr0); | |
1186 | ASSERT_OK(s); | |
1187 | s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0)); | |
1188 | ASSERT_OK(s); | |
1189 | s = txn0->Prepare(); | |
1190 | auto prep_seq_0 = txn0->GetId(); | |
1191 | ||
1192 | txn_t1(0); | |
1193 | ||
1194 | index++; | |
1195 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
1196 | auto istr1 = std::to_string(index); | |
1197 | s = txn1->SetName("xid" + istr1); | |
1198 | ASSERT_OK(s); | |
1199 | s = txn1->Put(Slice("foo1" + istr1), Slice("bar")); | |
1200 | ASSERT_OK(s); | |
1201 | s = txn1->Prepare(); | |
1202 | auto prep_seq_1 = txn1->GetId(); | |
1203 | ||
1204 | txn_t2(0); | |
1205 | ||
1206 | ReadOptions ropt; | |
1207 | PinnableSlice pinnable_val; | |
1208 | // Check the value is not committed before restart | |
1209 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1210 | ASSERT_TRUE(s.IsNotFound()); | |
1211 | pinnable_val.Reset(); | |
1212 | ||
1213 | delete txn0; | |
1214 | delete txn1; | |
1215 | wp_db->db_impl_->FlushWAL(true); | |
1216 | wp_db->TEST_Crash(); | |
1217 | ReOpenNoDelete(); | |
1218 | assert(db != nullptr); | |
1219 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1220 | // After recovery, all the uncommitted txns (0 and 1) should be inserted into | |
1221 | // delayed_prepared_ | |
1222 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1223 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
1224 | ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); | |
1225 | ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_); | |
1226 | { | |
1227 | ReadLock rl(&wp_db->prepared_mutex_); | |
1228 | ASSERT_EQ(2, wp_db->delayed_prepared_.size()); | |
1229 | ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) != | |
1230 | wp_db->delayed_prepared_.end()); | |
1231 | ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) != | |
1232 | wp_db->delayed_prepared_.end()); | |
1233 | } | |
1234 | ||
1235 | // Check the value is still not committed after restart | |
1236 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1237 | ASSERT_TRUE(s.IsNotFound()); | |
1238 | pinnable_val.Reset(); | |
1239 | ||
1240 | txn_t3(0); | |
1241 | ||
1242 | // Test that a recovered txns will be properly marked committed for the next | |
1243 | // recovery | |
1244 | txn1 = db->GetTransactionByName("xid" + istr1); | |
1245 | ASSERT_NE(txn1, nullptr); | |
1246 | txn1->Commit(); | |
1247 | delete txn1; | |
1248 | ||
1249 | index++; | |
1250 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
1251 | auto istr2 = std::to_string(index); | |
1252 | s = txn2->SetName("xid" + istr2); | |
1253 | ASSERT_OK(s); | |
1254 | s = txn2->Put(Slice("foo2" + istr2), Slice("bar")); | |
1255 | ASSERT_OK(s); | |
1256 | s = txn2->Prepare(); | |
1257 | auto prep_seq_2 = txn2->GetId(); | |
1258 | ||
1259 | delete txn2; | |
1260 | wp_db->db_impl_->FlushWAL(true); | |
1261 | wp_db->TEST_Crash(); | |
1262 | ReOpenNoDelete(); | |
1263 | assert(db != nullptr); | |
1264 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1265 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1266 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
1267 | ||
1268 | // 0 and 2 are prepared and 1 is committed | |
1269 | { | |
1270 | ReadLock rl(&wp_db->prepared_mutex_); | |
1271 | ASSERT_EQ(2, wp_db->delayed_prepared_.size()); | |
1272 | const auto& end = wp_db->delayed_prepared_.end(); | |
1273 | ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end); | |
1274 | ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end); | |
1275 | ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end); | |
1276 | } | |
1277 | ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); | |
1278 | ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_); | |
1279 | ||
1280 | // Commit all the remaining txns | |
1281 | txn0 = db->GetTransactionByName("xid" + istr0); | |
1282 | ASSERT_NE(txn0, nullptr); | |
1283 | txn0->Commit(); | |
1284 | txn2 = db->GetTransactionByName("xid" + istr2); | |
1285 | ASSERT_NE(txn2, nullptr); | |
1286 | txn2->Commit(); | |
1287 | ||
1288 | // Check the value is committed after commit | |
1289 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1290 | ASSERT_TRUE(s.ok()); | |
1291 | ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); | |
1292 | pinnable_val.Reset(); | |
1293 | ||
1294 | delete txn0; | |
1295 | delete txn2; | |
1296 | wp_db->db_impl_->FlushWAL(true); | |
1297 | ReOpenNoDelete(); | |
1298 | assert(db != nullptr); | |
1299 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1300 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1301 | ASSERT_TRUE(wp_db->delayed_prepared_empty_); | |
1302 | ||
1303 | // Check the value is still committed after recovery | |
1304 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1305 | ASSERT_TRUE(s.ok()); | |
1306 | ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); | |
1307 | pinnable_val.Reset(); | |
1308 | } | |
1309 | ||
1310 | // After recovery the commit map is empty while the max is set. The code would | |
1311 | // go through a different path which requires a separate test. | |
1312 | TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) { | |
1313 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1314 | wp_db->max_evicted_seq_ = 100; | |
1315 | ASSERT_FALSE(wp_db->IsInSnapshot(50, 40)); | |
1316 | ASSERT_TRUE(wp_db->IsInSnapshot(50, 50)); | |
1317 | ASSERT_TRUE(wp_db->IsInSnapshot(50, 100)); | |
1318 | ASSERT_TRUE(wp_db->IsInSnapshot(50, 150)); | |
1319 | ASSERT_FALSE(wp_db->IsInSnapshot(100, 80)); | |
1320 | ASSERT_TRUE(wp_db->IsInSnapshot(100, 100)); | |
1321 | ASSERT_TRUE(wp_db->IsInSnapshot(100, 150)); | |
1322 | } | |
1323 | ||
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
  WriteOptions wo;
  // Use small commit cache to trigger lots of eviction and fast advance of
  // max_evicted_seq_
  const size_t commit_cache_bits = 3;
  // Same for snapshot cache size
  const size_t snapshot_cache_bits = 2;

  // Take some preliminary snapshots first. This is to stress the data structure
  // that holds the old snapshots as it will be designed to be efficient when
  // only a few snapshots are below the max_evicted_seq_.
  for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
    // Leave some gap between the preliminary snapshots and the final snapshot
    // that we check. This should test for also different overlapping scenarios
    // between the last snapshot and the commits.
    for (int max_gap = 1; max_gap < 10; max_gap++) {
      // Since we do not actually write to db, we mock the seq as it would be
      // increased by the db. The only exception is that we need db seq to
      // advance for our snapshots. for which we apply a dummy put each time we
      // increase our mock of seq.
      uint64_t seq = 0;
      // At each step we prepare a txn and then we commit it in the next txn.
      // This emulates the consecutive transactions that write to the same key
      uint64_t cur_txn = 0;
      // Number of snapshots taken so far
      int num_snapshots = 0;
      // Number of gaps applied so far
      int gap_cnt = 0;
      // The final snapshot that we will inspect
      uint64_t snapshot = 0;
      bool found_committed = false;
      // To stress the data structure that maintain prepared txns, at each cycle
      // we add a new prepare txn. These do not mean to be committed for
      // snapshot inspection.
      std::set<uint64_t> prepared;
      // We keep the list of txns committed before we take the last snapshot.
      // These should be the only seq numbers that will be found in the snapshot
      std::set<uint64_t> committed_before;
      // The set of commit seq numbers to be excluded from IsInSnapshot queries
      std::set<uint64_t> commit_seqs;
      DBImpl* mock_db = new DBImpl(options, dbname);
      std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
          mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
      // We continue until max advances a bit beyond the snapshot.
      while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
        // do prepare for a transaction
        seq++;
        wp_db->AddPrepared(seq);
        prepared.insert(seq);

        // If cur_txn is not started, do prepare for it.
        if (!cur_txn) {
          seq++;
          cur_txn = seq;
          wp_db->AddPrepared(cur_txn);
        } else {  // else commit it
          seq++;
          wp_db->AddCommitted(cur_txn, seq);
          wp_db->RemovePrepared(cur_txn);
          commit_seqs.insert(seq);
          // Only txns committed before the final snapshot is taken should be
          // visible to it.
          if (!snapshot) {
            committed_before.insert(cur_txn);
          }
          cur_txn = 0;
        }

        if (num_snapshots < max_snapshots - 1) {
          // Take preliminary snapshots
          wp_db->TakeSnapshot(seq);
          num_snapshots++;
        } else if (gap_cnt < max_gap) {
          // Wait for some gap before taking the final snapshot
          gap_cnt++;
        } else if (!snapshot) {
          // Take the final snapshot if it is not already taken
          snapshot = seq;
          wp_db->TakeSnapshot(snapshot);
          num_snapshots++;
        }

        // If the snapshot is taken, verify seq numbers visible to it. We redo
        // it at each cycle to test that the system is still sound when
        // max_evicted_seq_ advances.
        if (snapshot) {
          for (uint64_t s = 1;
               s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
            bool was_committed =
                (committed_before.find(s) != committed_before.end());
            bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
            // Print the full context before the assert fires to aid debugging
            // a failure of this randomized-looking (but deterministic) loop.
            if (was_committed != is_in_snapshot) {
              printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
                     " snapshot %" PRIu64
                     " gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
                     max_snapshots, max_gap, seq,
                     wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
                     num_snapshots, s);
            }
            ASSERT_EQ(was_committed, is_in_snapshot);
            found_committed = found_committed || is_in_snapshot;
          }
        }
      }
      // Safety check to make sure the test actually ran
      ASSERT_TRUE(found_committed);
      // As an extra check, check if prepared set will be properly empty after
      // they are committed.
      if (cur_txn) {
        wp_db->AddCommitted(cur_txn, seq);
        wp_db->RemovePrepared(cur_txn);
      }
      for (auto p : prepared) {
        wp_db->AddCommitted(p, seq);
        wp_db->RemovePrepared(p);
      }
      ASSERT_TRUE(wp_db->delayed_prepared_.empty());
      ASSERT_TRUE(wp_db->prepared_txns_.empty());
    }
  }
}
1445 | ||
1446 | void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s, | |
1447 | PinnableSlice& exp_v, Slice key) { | |
1448 | Status s; | |
1449 | PinnableSlice v; | |
1450 | s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); | |
1451 | ASSERT_TRUE(exp_s == s); | |
1452 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
1453 | if (s.ok()) { | |
1454 | ASSERT_TRUE(exp_v == v); | |
1455 | } | |
1456 | ||
1457 | // Try with MultiGet API too | |
1458 | std::vector<std::string> values; | |
1459 | auto s_vec = | |
1460 | db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values); | |
1461 | ASSERT_EQ(1, values.size()); | |
1462 | ASSERT_EQ(1, s_vec.size()); | |
1463 | s = s_vec[0]; | |
1464 | ASSERT_TRUE(exp_s == s); | |
1465 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
1466 | if (s.ok()) { | |
1467 | ASSERT_TRUE(exp_v == values[0]); | |
1468 | } | |
1469 | } | |
1470 | ||
1471 | void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, | |
1472 | Slice key) { | |
1473 | ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key); | |
1474 | } | |
1475 | ||
// For every initial write type of a key, with and without a simulated crash
// after Prepare, verify that rolling back a prepared transaction restores the
// pre-transaction view of the DB and leaves the prepared-txn bookkeeping
// (prepared_txns_, delayed_prepared_) empty.
TEST_P(WritePreparedTransactionTest, RollbackTest) {
  ReadOptions roptions;
  WriteOptions woptions;
  TransactionOptions txn_options;
  const size_t num_keys = 4;
  const size_t num_values = 5;
  for (size_t ikey = 1; ikey <= num_keys; ikey++) {
    for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
      for (bool crash : {false, true}) {
        ReOpen();
        WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
        std::string key_str = "key" + ToString(ikey);
        // Seed the target key with a different write type per iteration:
        // absent, Put, Merge, Delete, or SingleDelete.
        switch (ivalue) {
          case 0:
            break;
          case 1:
            ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
            break;
          case 2:
            ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
            break;
          case 3:
            ASSERT_OK(db->Delete(woptions, key_str));
            break;
          case 4:
            ASSERT_OK(db->SingleDelete(woptions, key_str));
            break;
          default:
            assert(0);
        }

        // Record the pre-transaction status/value of each key; these are the
        // expected results before, during, and after the rollback.
        PinnableSlice v1;
        auto s1 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
        PinnableSlice v2;
        auto s2 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
        PinnableSlice v3;
        auto s3 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
        PinnableSlice v4;
        auto s4 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);
        // Write all four keys with a mix of operation types and prepare
        // without committing.
        Transaction* txn = db->BeginTransaction(woptions, txn_options);
        auto s = txn->SetName("xid0");
        ASSERT_OK(s);
        s = txn->Put(Slice("key1"), Slice("value1"));
        ASSERT_OK(s);
        s = txn->Merge(Slice("key2"), Slice("value2"));
        ASSERT_OK(s);
        s = txn->Delete(Slice("key3"));
        ASSERT_OK(s);
        s = txn->SingleDelete(Slice("key4"));
        ASSERT_OK(s);
        s = txn->Prepare();
        ASSERT_OK(s);

        // The prepared txn must be tracked in prepared_txns_.
        {
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_FALSE(wp_db->prepared_txns_.empty());
          ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
        }

        // The uncommitted prepared data must not be visible to readers.
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");

        if (crash) {
          // Simulate a crash/restart: after recovery the prepared txn is
          // old enough to be tracked in delayed_prepared_ instead of
          // prepared_txns_.
          delete txn;
          auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
          db_impl->FlushWAL(true);
          dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
          ReOpenNoDelete();
          assert(db != nullptr);
          wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
          txn = db->GetTransactionByName("xid0");
          ASSERT_FALSE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_FALSE(wp_db->delayed_prepared_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
                      wp_db->delayed_prepared_.end());
        }

        // Still invisible after the (optional) recovery.
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");

        s = txn->Rollback();
        ASSERT_OK(s);

        // Rollback must clear all prepared-txn tracking structures.
        {
          ASSERT_TRUE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.empty());
        }

        // And the DB view must be unchanged from before the transaction.
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");
        delete txn;
      }
    }
  }
}
1585 | ||
1586 | TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { | |
1587 | // Use large buffer to avoid memtable flush after 1024 insertions | |
1588 | options.write_buffer_size = 1024 * 1024; | |
1589 | ReOpen(); | |
1590 | std::vector<KeyVersion> versions; | |
1591 | uint64_t seq = 0; | |
1592 | for (uint64_t i = 1; i <= 1024; i++) { | |
1593 | std::string v = "bar" + ToString(i); | |
1594 | ASSERT_OK(db->Put(WriteOptions(), "foo", v)); | |
1595 | VerifyKeys({{"foo", v}}); | |
1596 | seq++; // one for the key/value | |
1597 | KeyVersion kv = {"foo", v, seq, kTypeValue}; | |
1598 | if (options.two_write_queues) { | |
1599 | seq++; // one for the commit | |
1600 | } | |
1601 | versions.emplace_back(kv); | |
1602 | } | |
1603 | std::reverse(std::begin(versions), std::end(versions)); | |
1604 | VerifyInternalKeys(versions); | |
1605 | DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); | |
1606 | db_impl->FlushWAL(true); | |
1607 | // Use small buffer to ensure memtable flush during recovery | |
1608 | options.write_buffer_size = 1024; | |
1609 | ReOpenNoDelete(); | |
1610 | VerifyInternalKeys(versions); | |
1611 | } | |
1612 | ||
// After compaction zeroes out the sequence number of a key visible to the
// earliest snapshot, the key must remain visible to every snapshot, i.e.
// IsInSnapshot() must treat seq 0 as always committed.
TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
  ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
  VerifyKeys({{"foo", "bar"}});
  const Snapshot* snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Compaction will output keys with sequence number 0, if it is visible to
  // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
  // visible to any snapshot.
  VerifyKeys({{"foo", "bar"}});
  VerifyKeys({{"foo", "bar"}}, snapshot);
  VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
  db->ReleaseSnapshot(snapshot);
}
1631 | ||
// Compaction should not remove a key if it is not committed, and should
// proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  // Snapshots to avoid keys get evicted.
  std::vector<const Snapshot*> snapshots;
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;

  // Apply `func`, verify the sequence-number bookkeeping, and pin the new
  // version with a snapshot so compaction cannot drop it.
  auto add_key = [&](std::function<Status()> func) {
    ASSERT_OK(func());
    expected_seq++;
    if (options.two_write_queues) {
      expected_seq++;  // 1 for commit
    }
    ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
    snapshots.push_back(db->GetSnapshot());
  };

  // Each key here represent a standalone test case.
  add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
  add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
  add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
  add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
  ASSERT_OK(db->Flush(FlushOptions()));
  add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
  add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });

  // Overwrite every key inside a transaction, using each write type once.
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Delete("key2"));
  ASSERT_OK(transaction->SingleDelete("key3"));
  ASSERT_OK(transaction->Merge("key4", "value4_2"));
  ASSERT_OK(transaction->Merge("key5", "value5_3"));
  ASSERT_OK(transaction->Put("key6", "value6_2"));
  ASSERT_OK(transaction->Put("key7", "value7_2"));
  // Prepare but not commit.
  ASSERT_OK(transaction->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(db->Flush(FlushOptions()));
  for (auto* s : snapshots) {
    db->ReleaseSnapshot(s);
  }
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Readers still see the pre-transaction values.
  VerifyKeys({
      {"key1", "value1_1"},
      {"key2", "value2_1"},
      {"key3", "value3_1"},
      {"key4", "value4_1"},
      {"key5", "value5_1,value5_2"},
      {"key6", "NOT_FOUND"},
      {"key7", "NOT_FOUND"},
  });
  // Uncommitted versions keep their sequence number (expected_seq); the old
  // committed versions at the bottom level are output with seq = 0.
  VerifyInternalKeys({
      {"key1", "value1_2", expected_seq, kTypeValue},
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "", expected_seq, kTypeDeletion},
      {"key2", "value2_1", 0, kTypeValue},
      {"key3", "", expected_seq, kTypeSingleDeletion},
      {"key3", "value3_1", 0, kTypeValue},
      {"key4", "value4_2", expected_seq, kTypeMerge},
      {"key4", "value4_1", 0, kTypeValue},
      {"key5", "value5_3", expected_seq, kTypeMerge},
      {"key5", "value5_1,value5_2", 0, kTypeValue},
      {"key6", "value6_2", expected_seq, kTypeValue},
      {"key7", "value7_2", expected_seq, kTypeValue},
  });
  ASSERT_OK(transaction->Commit());
  // After commit the transactional writes become visible.
  VerifyKeys({
      {"key1", "value1_2"},
      {"key2", "NOT_FOUND"},
      {"key3", "NOT_FOUND"},
      {"key4", "value4_1,value4_2"},
      {"key5", "value5_1,value5_2,value5_3"},
      {"key6", "value6_2"},
      {"key7", "value7_2"},
  });
  delete transaction;
}
1722 | ||
// Compaction should keep keys visible to a snapshot based on commit sequence,
// not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;
  // txn1: prepared and committed before snapshot2 is taken.
  auto* txn1 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn1->SetName("txn1"));
  ASSERT_OK(txn1->Put("key1", "value1_1"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(txn1->Commit());
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn1;
  // Take a snapshots to avoid keys get evicted before compaction.
  const Snapshot* snapshot1 = db->GetSnapshot();
  // txn2: prepared before snapshot2 but committed after it.
  auto* txn2 = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn2->SetName("txn2"));
  ASSERT_OK(txn2->Put("key2", "value2_1"));
  ASSERT_OK(txn2->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  // txn1 commit before snapshot2 and it is visible to snapshot2.
  // txn2 commit after snapshot2 and it is not visible.
  const Snapshot* snapshot2 = db->GetSnapshot();
  ASSERT_OK(txn2->Commit());
  ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete txn2;
  // Take a snapshots to avoid keys get evicted before compaction.
  const Snapshot* snapshot3 = db->GetSnapshot();
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq1 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
  expected_seq++;  // 1 for write
  SequenceNumber seq2 = expected_seq;
  if (options.two_write_queues) {
    expected_seq++;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(snapshot1);
  db->ReleaseSnapshot(snapshot3);
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
  // snapshot2 sees txn1's write (committed before it) but not txn2's
  // (prepared before it, committed after it).
  VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
  VerifyInternalKeys({
      {"key1", "value1_2", seq1, kTypeValue},
      // "value1_1" is visible to snapshot2. Also keys at bottom level visible
      // to earliest snapshot will output with seq = 0.
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "value2_2", seq2, kTypeValue},
  });
  db->ReleaseSnapshot(snapshot2);
}
1787 | ||
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSnapshotVisibleKeysRandomized) {
  constexpr size_t kNumTransactions = 10;
  constexpr size_t kNumIterations = 1000;

  // transactions[i] is the in-flight prepared txn for key i (or nullptr).
  std::vector<Transaction*> transactions(kNumTransactions, nullptr);
  // versions[i] counts how many commits key i has received.
  std::vector<size_t> versions(kNumTransactions, 0);
  // The committed (visible) state of the DB, maintained alongside writes.
  std::unordered_map<std::string, std::string> current_data;
  std::vector<const Snapshot*> snapshots;
  // snapshot_data[i] is the expected view under snapshots[i].
  std::vector<std::unordered_map<std::string, std::string>> snapshot_data;

  Random rnd(1103);
  options.disable_auto_compactions = true;
  ReOpen();

  for (size_t i = 0; i < kNumTransactions; i++) {
    std::string key = "key" + ToString(i);
    std::string value = "value0";
    ASSERT_OK(db->Put(WriteOptions(), key, value));
    current_data[key] = value;
  }
  VerifyKeys(current_data);

  // Randomly interleave prepare/commit of per-key transactions with snapshot
  // creation, checking visibility after every step.
  for (size_t iter = 0; iter < kNumIterations; iter++) {
    auto r = rnd.Next() % (kNumTransactions + 1);
    if (r < kNumTransactions) {
      std::string key = "key" + ToString(r);
      if (transactions[r] == nullptr) {
        // No txn in flight for this key: start one and prepare it. The write
        // is not visible until the later commit.
        std::string value = "value" + ToString(versions[r] + 1);
        auto* txn = db->BeginTransaction(WriteOptions());
        ASSERT_OK(txn->SetName("txn" + ToString(r)));
        ASSERT_OK(txn->Put(key, value));
        ASSERT_OK(txn->Prepare());
        transactions[r] = txn;
      } else {
        // Commit the in-flight txn; its value becomes visible now.
        std::string value = "value" + ToString(++versions[r]);
        ASSERT_OK(transactions[r]->Commit());
        delete transactions[r];
        transactions[r] = nullptr;
        current_data[key] = value;
      }
    } else {
      auto* snapshot = db->GetSnapshot();
      VerifyKeys(current_data, snapshot);
      snapshots.push_back(snapshot);
      snapshot_data.push_back(current_data);
    }
    VerifyKeys(current_data);
  }
  // Take a last snapshot to test compaction with uncommitted prepared
  // transaction.
  snapshots.push_back(db->GetSnapshot());
  snapshot_data.push_back(current_data);

  assert(snapshots.size() == snapshot_data.size());
  // Every snapshot must keep its view before flush, after flush, and after
  // a full compaction.
  for (size_t i = 0; i < snapshots.size(); i++) {
    VerifyKeys(snapshot_data[i], snapshots[i]);
  }
  ASSERT_OK(db->Flush(FlushOptions()));
  for (size_t i = 0; i < snapshots.size(); i++) {
    VerifyKeys(snapshot_data[i], snapshots[i]);
  }
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  for (size_t i = 0; i < snapshots.size(); i++) {
    VerifyKeys(snapshot_data[i], snapshots[i]);
  }
  // cleanup
  for (size_t i = 0; i < kNumTransactions; i++) {
    if (transactions[i] == nullptr) {
      continue;
    }
    ASSERT_OK(transactions[i]->Commit());
    delete transactions[i];
  }
  for (size_t i = 0; i < snapshots.size(); i++) {
    db->ReleaseSnapshot(snapshots[i]);
  }
}
1872 | ||
// Compaction should not apply the optimization to output key with sequence
// number equal to 0 if the key is not visible to earliest snapshot, based on
// commit sequence number.
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSequenceForUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  // Keep track of expected sequence number.
  SequenceNumber expected_seq = 0;
  // "key1" is written by a transaction that stays prepared (uncommitted)
  // across the compaction.
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1"));
  ASSERT_OK(transaction->Prepare());
  ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
  SequenceNumber seq1 = expected_seq;
  // "key2" is a plain committed write.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  expected_seq++;  // one for data
  if (options.two_write_queues) {
    expected_seq++;  // one for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  ASSERT_OK(db->Flush(FlushOptions()));
  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyKeys({
      {"key1", "NOT_FOUND"},
      {"key2", "value2"},
  });
  VerifyInternalKeys({
      // "key1" has not been committed. It keeps its sequence number.
      {"key1", "value1", seq1, kTypeValue},
      // "key2" is committed and output with seq = 0.
      {"key2", "value2", 0, kTypeValue},
  });
  ASSERT_OK(transaction->Commit());
  VerifyKeys({
      {"key1", "value1"},
      {"key2", "value2"},
  });
  delete transaction;
}
1918 | ||
// DB iterators and transaction iterators must expose the same view: the
// prepared value is invisible before commit and visible after, across Seek,
// Next, SeekForPrev, and Prev.
TEST_P(WritePreparedTransactionTest, Iterate) {
  // Assert that `iter` is positioned on `key` with value `value`.
  auto verify_state = [](Iterator* iter, const std::string& key,
                         const std::string& value) {
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_EQ(key, iter->key().ToString());
    ASSERT_EQ(value, iter->value().ToString());
  };

  auto verify_iter = [&](const std::string& expected_val) {
    // Get iterator from a concurrent transaction and make sure it has the
    // same view as an iterator from the DB.
    auto* txn = db->BeginTransaction(WriteOptions());

    // i == 0: DB iterator; i == 1: transaction iterator.
    for (int i = 0; i < 2; i++) {
      Iterator* iter = (i == 0)
          ? db->NewIterator(ReadOptions())
          : txn->GetIterator(ReadOptions());
      // Seek
      iter->Seek("foo");
      verify_state(iter, "foo", expected_val);
      // Next
      iter->Seek("a");
      verify_state(iter, "a", "va");
      iter->Next();
      verify_state(iter, "foo", expected_val);
      // SeekForPrev
      iter->SeekForPrev("y");
      verify_state(iter, "foo", expected_val);
      // Prev
      iter->SeekForPrev("z");
      verify_state(iter, "z", "vz");
      iter->Prev();
      verify_state(iter, "foo", expected_val);
      delete iter;
    }
    delete txn;
  };

  ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
  auto* transaction = db->BeginTransaction(WriteOptions());
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("foo", "v2"));
  ASSERT_OK(transaction->Prepare());
  // Prepared but uncommitted: iterators must still see "v1".
  VerifyKeys({{"foo", "v1"}});
  // dummy keys
  ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));
  verify_iter("v1");
  ASSERT_OK(transaction->Commit());
  // After commit both iterator kinds must see "v2".
  VerifyKeys({{"foo", "v2"}});
  verify_iter("v2");
  delete transaction;
}
1973 | ||
1974 | TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) { | |
1975 | Iterator* iter = db->NewIterator(ReadOptions()); | |
1976 | ASSERT_TRUE(iter->Refresh().IsNotSupported()); | |
1977 | delete iter; | |
1978 | } | |
1979 | ||
1980 | // Test that updating the commit map will not affect the existing snapshots | |
1981 | TEST_P(WritePreparedTransactionTest, AtomicCommit) { | |
1982 | for (bool skip_prepare : {true, false}) { | |
1983 | rocksdb::SyncPoint::GetInstance()->LoadDependency({ | |
1984 | {"WritePreparedTxnDB::AddCommitted:start", | |
1985 | "AtomicCommit::GetSnapshot:start"}, | |
1986 | {"AtomicCommit::Get:end", | |
1987 | "WritePreparedTxnDB::AddCommitted:start:pause"}, | |
1988 | {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"}, | |
1989 | {"AtomicCommit::Get2:end", | |
1990 | "WritePreparedTxnDB::AddCommitted:end:pause:"}, | |
1991 | }); | |
1992 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
1993 | rocksdb::port::Thread write_thread([&]() { | |
1994 | if (skip_prepare) { | |
1995 | db->Put(WriteOptions(), Slice("key"), Slice("value")); | |
1996 | } else { | |
1997 | Transaction* txn = | |
1998 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
1999 | ASSERT_OK(txn->SetName("xid")); | |
2000 | ASSERT_OK(txn->Put(Slice("key"), Slice("value"))); | |
2001 | ASSERT_OK(txn->Prepare()); | |
2002 | ASSERT_OK(txn->Commit()); | |
2003 | delete txn; | |
2004 | } | |
2005 | }); | |
2006 | rocksdb::port::Thread read_thread([&]() { | |
2007 | ReadOptions roptions; | |
2008 | TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start"); | |
2009 | roptions.snapshot = db->GetSnapshot(); | |
2010 | PinnableSlice val; | |
2011 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val); | |
2012 | TEST_SYNC_POINT("AtomicCommit::Get:end"); | |
2013 | TEST_SYNC_POINT("AtomicCommit::Get2:start"); | |
2014 | ASSERT_SAME(roptions, db, s, val, "key"); | |
2015 | TEST_SYNC_POINT("AtomicCommit::Get2:end"); | |
2016 | db->ReleaseSnapshot(roptions.snapshot); | |
2017 | }); | |
2018 | read_thread.join(); | |
2019 | write_thread.join(); | |
2020 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
2021 | } | |
2022 | } | |
2023 | ||
2024 | // Test that we can change write policy from WriteCommitted to WritePrepared | |
2025 | // after a clean shutdown (which would empty the WAL) | |
2026 | TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { | |
2027 | bool empty_wal = true; | |
2028 | CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal); | |
2029 | } | |
2030 | ||
2031 | // Test that we fail fast if WAL is not emptied between changing the write | |
2032 | // policy from WriteCommitted to WritePrepared | |
2033 | TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) { | |
2034 | bool empty_wal = true; | |
2035 | CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal); | |
2036 | } | |
2037 | ||
2038 | // Test that we can change write policy from WritePrepare back to WriteCommitted | |
2039 | // after a clean shutdown (which would empty the WAL) | |
2040 | TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) { | |
2041 | bool empty_wal = true; | |
2042 | CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal); | |
2043 | } | |
2044 | ||
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WritePrepared to WriteCommitted
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
  bool empty_wal = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal);
}
2051 | ||
2052 | } // namespace rocksdb | |
2053 | ||
// Entry point: run all registered (including value-parameterized) tests.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
2058 | ||
2059 | #else | |
2060 | #include <stdio.h> | |
2061 | ||
// ROCKSDB_LITE build: transactions are unavailable, so report the skip and
// exit successfully instead of running the suite.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
  return 0;
}
2067 | ||
2068 | #endif // ROCKSDB_LITE |