]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
11fdf7f2 | 8 | #include <algorithm> |
494da23a | 9 | #include <atomic> |
f67539c2 | 10 | #include <cinttypes> |
11fdf7f2 TL |
11 | #include <functional> |
12 | #include <string> | |
13 | #include <thread> | |
14 | ||
f67539c2 | 15 | #include "db/db_impl/db_impl.h" |
11fdf7f2 | 16 | #include "db/dbformat.h" |
20effc67 | 17 | #include "port/port.h" |
1e59de90 | 18 | #include "port/stack_trace.h" |
11fdf7f2 TL |
19 | #include "rocksdb/db.h" |
20 | #include "rocksdb/options.h" | |
21 | #include "rocksdb/types.h" | |
22 | #include "rocksdb/utilities/debug.h" | |
23 | #include "rocksdb/utilities/transaction.h" | |
24 | #include "rocksdb/utilities/transaction_db.h" | |
25 | #include "table/mock_table.h" | |
f67539c2 TL |
26 | #include "test_util/sync_point.h" |
27 | #include "test_util/testharness.h" | |
28 | #include "test_util/testutil.h" | |
29 | #include "test_util/transaction_test_util.h" | |
494da23a | 30 | #include "util/mutexlock.h" |
11fdf7f2 TL |
31 | #include "util/random.h" |
32 | #include "util/string_util.h" | |
20effc67 | 33 | #include "utilities/fault_injection_env.h" |
11fdf7f2 TL |
34 | #include "utilities/merge_operators.h" |
35 | #include "utilities/merge_operators/string_append/stringappend.h" | |
36 | #include "utilities/transactions/pessimistic_transaction_db.h" | |
20effc67 | 37 | #include "utilities/transactions/transaction_test.h" |
11fdf7f2 TL |
38 | #include "utilities/transactions/write_prepared_txn_db.h" |
39 | ||
11fdf7f2 TL |
40 | using std::string; |
41 | ||
f67539c2 | 42 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
43 | |
// Short aliases for types nested in WritePreparedTxnDB, used by the tests
// below (e.g. CommitEntry64b/BasicTest).
using CommitEntry = WritePreparedTxnDB::CommitEntry;
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
47 | ||
48 | TEST(PreparedHeap, BasicsTest) { | |
49 | WritePreparedTxnDB::PreparedHeap heap; | |
f67539c2 TL |
50 | { |
51 | MutexLock ml(heap.push_pop_mutex()); | |
52 | heap.push(14l); | |
53 | // Test with one element | |
54 | ASSERT_EQ(14l, heap.top()); | |
55 | heap.push(24l); | |
56 | heap.push(34l); | |
57 | // Test that old min is still on top | |
58 | ASSERT_EQ(14l, heap.top()); | |
59 | heap.push(44l); | |
60 | heap.push(54l); | |
61 | heap.push(64l); | |
62 | heap.push(74l); | |
63 | heap.push(84l); | |
64 | } | |
11fdf7f2 TL |
65 | // Test that old min is still on top |
66 | ASSERT_EQ(14l, heap.top()); | |
11fdf7f2 TL |
67 | heap.erase(24l); |
68 | // Test that old min is still on top | |
f67539c2 | 69 | ASSERT_EQ(14l, heap.top()); |
11fdf7f2 | 70 | heap.erase(14l); |
11fdf7f2 TL |
71 | // Test that the new comes to the top after multiple erase |
72 | ASSERT_EQ(34l, heap.top()); | |
73 | heap.erase(34l); | |
74 | // Test that the new comes to the top after single erase | |
75 | ASSERT_EQ(44l, heap.top()); | |
76 | heap.erase(54l); | |
77 | ASSERT_EQ(44l, heap.top()); | |
78 | heap.pop(); // pop 44l | |
79 | // Test that the erased items are ignored after pop | |
80 | ASSERT_EQ(64l, heap.top()); | |
81 | heap.erase(44l); | |
82 | // Test that erasing an already popped item would work | |
83 | ASSERT_EQ(64l, heap.top()); | |
84 | heap.erase(84l); | |
85 | ASSERT_EQ(64l, heap.top()); | |
f67539c2 TL |
86 | { |
87 | MutexLock ml(heap.push_pop_mutex()); | |
88 | heap.push(85l); | |
89 | heap.push(86l); | |
90 | heap.push(87l); | |
91 | heap.push(88l); | |
92 | heap.push(89l); | |
93 | } | |
11fdf7f2 TL |
94 | heap.erase(87l); |
95 | heap.erase(85l); | |
96 | heap.erase(89l); | |
97 | heap.erase(86l); | |
98 | heap.erase(88l); | |
99 | // Test top remains the same after a random order of many erases | |
100 | ASSERT_EQ(64l, heap.top()); | |
101 | heap.pop(); | |
102 | // Test that pop works with a series of random pending erases | |
103 | ASSERT_EQ(74l, heap.top()); | |
104 | ASSERT_FALSE(heap.empty()); | |
105 | heap.pop(); | |
106 | // Test that empty works | |
107 | ASSERT_TRUE(heap.empty()); | |
108 | } | |
109 | ||
110 | // This is a scenario reconstructed from a buggy trace. Test that the bug does | |
111 | // not resurface again. | |
112 | TEST(PreparedHeap, EmptyAtTheEnd) { | |
113 | WritePreparedTxnDB::PreparedHeap heap; | |
f67539c2 TL |
114 | { |
115 | MutexLock ml(heap.push_pop_mutex()); | |
116 | heap.push(40l); | |
117 | } | |
11fdf7f2 TL |
118 | ASSERT_EQ(40l, heap.top()); |
119 | // Although not a recommended scenario, we must be resilient against erase | |
120 | // without a prior push. | |
121 | heap.erase(50l); | |
122 | ASSERT_EQ(40l, heap.top()); | |
f67539c2 TL |
123 | { |
124 | MutexLock ml(heap.push_pop_mutex()); | |
125 | heap.push(60l); | |
126 | } | |
11fdf7f2 TL |
127 | ASSERT_EQ(40l, heap.top()); |
128 | ||
129 | heap.erase(60l); | |
130 | ASSERT_EQ(40l, heap.top()); | |
131 | heap.erase(40l); | |
132 | ASSERT_TRUE(heap.empty()); | |
133 | ||
f67539c2 TL |
134 | { |
135 | MutexLock ml(heap.push_pop_mutex()); | |
136 | heap.push(40l); | |
137 | } | |
11fdf7f2 TL |
138 | ASSERT_EQ(40l, heap.top()); |
139 | heap.erase(50l); | |
140 | ASSERT_EQ(40l, heap.top()); | |
f67539c2 TL |
141 | { |
142 | MutexLock ml(heap.push_pop_mutex()); | |
143 | heap.push(60l); | |
144 | } | |
11fdf7f2 TL |
145 | ASSERT_EQ(40l, heap.top()); |
146 | ||
147 | heap.erase(40l); | |
148 | // Test that the erase has not emptied the heap (we had a bug doing that) | |
149 | ASSERT_FALSE(heap.empty()); | |
150 | ASSERT_EQ(60l, heap.top()); | |
151 | heap.erase(60l); | |
152 | ASSERT_TRUE(heap.empty()); | |
153 | } | |
154 | ||
155 | // Generate random order of PreparedHeap access and test that the heap will be | |
156 | // successfully emptied at the end. | |
157 | TEST(PreparedHeap, Concurrent) { | |
158 | const size_t t_cnt = 10; | |
f67539c2 | 159 | ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1]; |
11fdf7f2 TL |
160 | WritePreparedTxnDB::PreparedHeap heap; |
161 | port::RWMutex prepared_mutex; | |
f67539c2 | 162 | std::atomic<size_t> last; |
11fdf7f2 TL |
163 | |
164 | for (size_t n = 0; n < 100; n++) { | |
f67539c2 TL |
165 | last = 0; |
166 | t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() { | |
167 | Random rnd(1103); | |
168 | for (size_t seq = 1; seq <= t_cnt; seq++) { | |
169 | // This is not recommended usage but we should be resilient against it. | |
170 | bool skip_push = rnd.OneIn(5); | |
11fdf7f2 | 171 | if (!skip_push) { |
f67539c2 TL |
172 | MutexLock ml(heap.push_pop_mutex()); |
173 | std::this_thread::yield(); | |
11fdf7f2 | 174 | heap.push(seq); |
f67539c2 | 175 | last.store(seq); |
11fdf7f2 | 176 | } |
f67539c2 TL |
177 | } |
178 | }); | |
179 | for (size_t i = 1; i <= t_cnt; i++) { | |
180 | t[i] = | |
181 | ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() { | |
182 | auto seq = i; | |
183 | do { | |
184 | std::this_thread::yield(); | |
185 | } while (last.load() < seq); | |
186 | WriteLock wl(&prepared_mutex); | |
187 | heap.erase(seq); | |
188 | }); | |
11fdf7f2 | 189 | } |
f67539c2 | 190 | for (size_t i = 0; i <= t_cnt; i++) { |
11fdf7f2 TL |
191 | t[i].join(); |
192 | } | |
193 | ASSERT_TRUE(heap.empty()); | |
194 | } | |
195 | } | |
196 | ||
197 | // Test that WriteBatchWithIndex correctly counts the number of sub-batches | |
198 | TEST(WriteBatchWithIndex, SubBatchCnt) { | |
199 | ColumnFamilyOptions cf_options; | |
200 | std::string cf_name = "two"; | |
201 | DB* db; | |
202 | Options options; | |
203 | options.create_if_missing = true; | |
204 | const std::string dbname = test::PerThreadDBPath("transaction_testdb"); | |
1e59de90 | 205 | EXPECT_OK(DestroyDB(dbname, options)); |
11fdf7f2 TL |
206 | ASSERT_OK(DB::Open(options, dbname, &db)); |
207 | ColumnFamilyHandle* cf_handle = nullptr; | |
208 | ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); | |
209 | WriteOptions write_options; | |
210 | size_t batch_cnt = 1; | |
211 | size_t save_points = 0; | |
212 | std::vector<size_t> batch_cnt_at; | |
213 | WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true, | |
214 | 0); | |
215 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
216 | batch_cnt_at.push_back(batch_cnt); | |
217 | batch.SetSavePoint(); | |
218 | save_points++; | |
1e59de90 | 219 | ASSERT_OK(batch.Put(Slice("key"), Slice("value"))); |
11fdf7f2 TL |
220 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); |
221 | batch_cnt_at.push_back(batch_cnt); | |
222 | batch.SetSavePoint(); | |
223 | save_points++; | |
1e59de90 | 224 | ASSERT_OK(batch.Put(Slice("key2"), Slice("value2"))); |
11fdf7f2 TL |
225 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); |
226 | // duplicate the keys | |
227 | batch_cnt_at.push_back(batch_cnt); | |
228 | batch.SetSavePoint(); | |
229 | save_points++; | |
1e59de90 | 230 | ASSERT_OK(batch.Put(Slice("key"), Slice("value3"))); |
11fdf7f2 TL |
231 | batch_cnt++; |
232 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); | |
233 | // duplicate the 2nd key. It should not be counted duplicate since a | |
234 | // sub-patch is cut after the last duplicate. | |
235 | batch_cnt_at.push_back(batch_cnt); | |
236 | batch.SetSavePoint(); | |
237 | save_points++; | |
1e59de90 | 238 | ASSERT_OK(batch.Put(Slice("key2"), Slice("value4"))); |
11fdf7f2 TL |
239 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); |
240 | // duplicate the keys but in a different cf. It should not be counted as | |
241 | // duplicate keys | |
242 | batch_cnt_at.push_back(batch_cnt); | |
243 | batch.SetSavePoint(); | |
244 | save_points++; | |
1e59de90 | 245 | ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5"))); |
11fdf7f2 TL |
246 | ASSERT_EQ(batch_cnt, batch.SubBatchCnt()); |
247 | ||
248 | // Test that the number of sub-batches matches what we count with | |
249 | // SubBatchCounter | |
250 | std::map<uint32_t, const Comparator*> comparators; | |
251 | comparators[0] = db->DefaultColumnFamily()->GetComparator(); | |
252 | comparators[cf_handle->GetID()] = cf_handle->GetComparator(); | |
253 | SubBatchCounter counter(comparators); | |
254 | ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter)); | |
255 | ASSERT_EQ(batch_cnt, counter.BatchCount()); | |
256 | ||
257 | // Test that RollbackToSavePoint will properly resets the number of | |
258 | // sub-batches | |
259 | for (size_t i = save_points; i > 0; i--) { | |
1e59de90 | 260 | ASSERT_OK(batch.RollbackToSavePoint()); |
11fdf7f2 TL |
261 | ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt()); |
262 | } | |
263 | ||
264 | // Test the count is right with random batches | |
265 | { | |
266 | const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms | |
267 | Random rnd(1131); | |
268 | std::string keys[TOTAL_KEYS]; | |
269 | for (size_t k = 0; k < TOTAL_KEYS; k++) { | |
270 | int len = static_cast<int>(rnd.Uniform(50)); | |
271 | keys[k] = test::RandomKey(&rnd, len); | |
272 | } | |
273 | for (size_t i = 0; i < 1000; i++) { // 1000 random batches | |
274 | WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(), | |
275 | 0, true, 0); | |
276 | for (size_t k = 0; k < 10; k++) { // 10 key per batch | |
277 | size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS)); | |
278 | Slice key = Slice(keys[ki]); | |
20effc67 TL |
279 | std::string tmp = rnd.RandomString(16); |
280 | Slice value = Slice(tmp); | |
1e59de90 | 281 | ASSERT_OK(rndbatch.Put(key, value)); |
11fdf7f2 TL |
282 | } |
283 | SubBatchCounter batch_counter(comparators); | |
284 | ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter)); | |
285 | ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount()); | |
286 | } | |
287 | } | |
288 | ||
289 | delete cf_handle; | |
290 | delete db; | |
291 | } | |
292 | ||
293 | TEST(CommitEntry64b, BasicTest) { | |
294 | const size_t INDEX_BITS = static_cast<size_t>(21); | |
295 | const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS); | |
296 | const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS)); | |
297 | ||
298 | // zero-initialized CommitEntry64b should indicate an empty entry | |
299 | CommitEntry64b empty_entry64b; | |
300 | uint64_t empty_index = 11ul; | |
301 | CommitEntry empty_entry; | |
302 | bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT); | |
303 | ASSERT_FALSE(ok); | |
304 | ||
305 | // the zero entry is reserved for un-initialized entries | |
306 | const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1; | |
307 | // Samples over the numbers that are covered by that many index bits | |
308 | std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}}; | |
309 | // Samples over the numbers that are covered by that many commit bits | |
310 | std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}}; | |
311 | // Iterate over prepare numbers that have i) cover all bits of a sequence | |
312 | // number, and ii) include some bits that fall into the range of index or | |
313 | // commit bits | |
314 | for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) { | |
315 | for (uint64_t i : is) { | |
316 | for (uint64_t d : ds) { | |
317 | uint64_t p = base + i + d; | |
318 | for (uint64_t c : {p, p + d / 2, p + d}) { | |
319 | uint64_t index = p % INDEX_SIZE; | |
320 | CommitEntry before(p, c), after; | |
321 | CommitEntry64b entry64b(before, FORMAT); | |
322 | ok = entry64b.Parse(index, &after, FORMAT); | |
323 | ASSERT_TRUE(ok); | |
324 | if (!(before == after)) { | |
325 | printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64 | |
326 | " c %" PRIu64 " index %" PRIu64 "\n", | |
327 | base, i, d, p, c, index); | |
328 | } | |
329 | ASSERT_EQ(before, after); | |
330 | } | |
331 | } | |
332 | } | |
333 | } | |
334 | } | |
335 | ||
336 | class WritePreparedTxnDBMock : public WritePreparedTxnDB { | |
337 | public: | |
338 | WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) | |
339 | : WritePreparedTxnDB(db_impl, opt) {} | |
11fdf7f2 TL |
340 | void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) { |
341 | snapshots_ = snapshots; | |
342 | } | |
343 | void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); } | |
344 | ||
345 | protected: | |
494da23a | 346 | const std::vector<SequenceNumber> GetSnapshotListFromDB( |
11fdf7f2 TL |
347 | SequenceNumber /* unused */) override { |
348 | return snapshots_; | |
349 | } | |
350 | ||
351 | private: | |
352 | std::vector<SequenceNumber> snapshots_; | |
353 | }; | |
354 | ||
355 | class WritePreparedTransactionTestBase : public TransactionTestBase { | |
356 | public: | |
357 | WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue, | |
f67539c2 TL |
358 | TxnDBWritePolicy write_policy, |
359 | WriteOrdering write_ordering) | |
360 | : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, | |
361 | write_ordering){}; | |
11fdf7f2 TL |
362 | |
363 | protected: | |
494da23a TL |
364 | void UpdateTransactionDBOptions(size_t snapshot_cache_bits, |
365 | size_t commit_cache_bits) { | |
366 | txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits; | |
367 | txn_db_options.wp_commit_cache_bits = commit_cache_bits; | |
368 | } | |
369 | void UpdateTransactionDBOptions(size_t snapshot_cache_bits) { | |
370 | txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits; | |
371 | } | |
11fdf7f2 TL |
372 | // If expect_update is set, check if it actually updated old_commit_map_. If |
373 | // it did not and yet suggested not to check the next snapshot, do the | |
374 | // opposite to check if it was not a bad suggestion. | |
375 | void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit, | |
376 | uint64_t snapshot, | |
377 | uint64_t next_snapshot, | |
378 | bool expect_update) { | |
379 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
380 | // reset old_commit_map_empty_ so that its value indicate whether | |
381 | // old_commit_map_ was updated | |
382 | wp_db->old_commit_map_empty_ = true; | |
383 | bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot, | |
384 | snapshot < next_snapshot); | |
385 | if (expect_update == wp_db->old_commit_map_empty_) { | |
386 | printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64 | |
387 | " next: %" PRIu64 "\n", | |
388 | prepare, commit, snapshot, next_snapshot); | |
389 | } | |
390 | EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_); | |
391 | if (!check_next && wp_db->old_commit_map_empty_) { | |
392 | // do the opposite to make sure it was not a bad suggestion | |
393 | const bool dont_care_bool = true; | |
394 | wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot, | |
395 | dont_care_bool); | |
396 | if (!wp_db->old_commit_map_empty_) { | |
397 | printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64 | |
398 | " next: %" PRIu64 "\n", | |
399 | prepare, commit, snapshot, next_snapshot); | |
400 | } | |
401 | EXPECT_TRUE(wp_db->old_commit_map_empty_); | |
402 | } | |
403 | } | |
404 | ||
405 | // Test that a CheckAgainstSnapshots thread reading old_snapshots will not | |
406 | // miss a snapshot because of a concurrent update by UpdateSnapshots that is | |
407 | // writing new_snapshots. Both threads are broken at two points. The sync | |
408 | // points to enforce them are specified by a1, a2, b1, and b2. CommitEntry | |
409 | // entry is expected to be vital for one of the snapshots that is common | |
410 | // between the old and new list of snapshots. | |
411 | void SnapshotConcurrentAccessTestInternal( | |
412 | WritePreparedTxnDB* wp_db, | |
413 | const std::vector<SequenceNumber>& old_snapshots, | |
414 | const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry, | |
415 | SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) { | |
416 | // First reset the snapshot list | |
417 | const std::vector<SequenceNumber> empty_snapshots; | |
418 | wp_db->old_commit_map_empty_ = true; | |
419 | wp_db->UpdateSnapshots(empty_snapshots, ++version); | |
420 | // Then initialize it with the old_snapshots | |
421 | wp_db->UpdateSnapshots(old_snapshots, ++version); | |
422 | ||
423 | // Starting from the first thread, cut each thread at two points | |
f67539c2 | 424 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
11fdf7f2 TL |
425 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1), |
426 | "WritePreparedTxnDB::UpdateSnapshots:s:start"}, | |
427 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1), | |
428 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)}, | |
429 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2), | |
430 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)}, | |
431 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2), | |
432 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)}, | |
433 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:end", | |
434 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)}, | |
435 | }); | |
f67539c2 | 436 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
437 | { |
438 | ASSERT_TRUE(wp_db->old_commit_map_empty_); | |
f67539c2 | 439 | ROCKSDB_NAMESPACE::port::Thread t1( |
11fdf7f2 | 440 | [&]() { wp_db->UpdateSnapshots(new_snapshots, version); }); |
1e59de90 | 441 | wp_db->CheckAgainstSnapshots(entry); |
11fdf7f2 | 442 | t1.join(); |
11fdf7f2 TL |
443 | ASSERT_FALSE(wp_db->old_commit_map_empty_); |
444 | } | |
f67539c2 | 445 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
446 | |
447 | wp_db->old_commit_map_empty_ = true; | |
448 | wp_db->UpdateSnapshots(empty_snapshots, ++version); | |
449 | wp_db->UpdateSnapshots(old_snapshots, ++version); | |
450 | // Starting from the second thread, cut each thread at two points | |
f67539c2 | 451 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
11fdf7f2 TL |
452 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1), |
453 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:start"}, | |
454 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1), | |
455 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)}, | |
456 | {"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2), | |
457 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)}, | |
458 | {"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2), | |
459 | "WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)}, | |
460 | {"WritePreparedTxnDB::UpdateSnapshots:p:end", | |
461 | "WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)}, | |
462 | }); | |
f67539c2 | 463 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
464 | { |
465 | ASSERT_TRUE(wp_db->old_commit_map_empty_); | |
f67539c2 | 466 | ROCKSDB_NAMESPACE::port::Thread t1( |
11fdf7f2 | 467 | [&]() { wp_db->UpdateSnapshots(new_snapshots, version); }); |
1e59de90 | 468 | wp_db->CheckAgainstSnapshots(entry); |
11fdf7f2 | 469 | t1.join(); |
11fdf7f2 TL |
470 | ASSERT_FALSE(wp_db->old_commit_map_empty_); |
471 | } | |
f67539c2 | 472 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
473 | } |
474 | ||
475 | // Verify value of keys. | |
476 | void VerifyKeys(const std::unordered_map<std::string, std::string>& data, | |
477 | const Snapshot* snapshot = nullptr) { | |
478 | std::string value; | |
479 | ReadOptions read_options; | |
480 | read_options.snapshot = snapshot; | |
481 | for (auto& kv : data) { | |
482 | auto s = db->Get(read_options, kv.first, &value); | |
483 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
484 | if (s.ok()) { | |
485 | if (kv.second != value) { | |
486 | printf("key = %s\n", kv.first.c_str()); | |
487 | } | |
488 | ASSERT_EQ(kv.second, value); | |
489 | } else { | |
490 | ASSERT_EQ(kv.second, "NOT_FOUND"); | |
491 | } | |
492 | ||
493 | // Try with MultiGet API too | |
494 | std::vector<std::string> values; | |
495 | auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()}, | |
496 | {kv.first}, &values); | |
497 | ASSERT_EQ(1, values.size()); | |
498 | ASSERT_EQ(1, s_vec.size()); | |
499 | s = s_vec[0]; | |
500 | ASSERT_TRUE(s.ok() || s.IsNotFound()); | |
501 | if (s.ok()) { | |
502 | ASSERT_TRUE(kv.second == values[0]); | |
503 | } else { | |
504 | ASSERT_EQ(kv.second, "NOT_FOUND"); | |
505 | } | |
506 | } | |
507 | } | |
508 | ||
509 | // Verify all versions of keys. | |
510 | void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) { | |
511 | std::vector<KeyVersion> versions; | |
512 | const size_t kMaxKeys = 100000; | |
513 | ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key, | |
514 | expected_versions.back().user_key, kMaxKeys, | |
515 | &versions)); | |
516 | ASSERT_EQ(expected_versions.size(), versions.size()); | |
517 | for (size_t i = 0; i < versions.size(); i++) { | |
518 | ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key); | |
519 | ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence); | |
520 | ASSERT_EQ(expected_versions[i].type, versions[i].type); | |
521 | if (versions[i].type != kTypeDeletion && | |
522 | versions[i].type != kTypeSingleDeletion) { | |
523 | ASSERT_EQ(expected_versions[i].value, versions[i].value); | |
524 | } | |
525 | // Range delete not supported. | |
1e59de90 | 526 | ASSERT_NE(expected_versions[i].type, kTypeRangeDeletion); |
11fdf7f2 TL |
527 | } |
528 | } | |
529 | }; | |
530 | ||
531 | class WritePreparedTransactionTest | |
532 | : public WritePreparedTransactionTestBase, | |
533 | virtual public ::testing::WithParamInterface< | |
f67539c2 | 534 | std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> { |
11fdf7f2 TL |
535 | public: |
536 | WritePreparedTransactionTest() | |
f67539c2 TL |
537 | : WritePreparedTransactionTestBase( |
538 | std::get<0>(GetParam()), std::get<1>(GetParam()), | |
539 | std::get<2>(GetParam()), std::get<3>(GetParam())){}; | |
11fdf7f2 TL |
540 | }; |
541 | ||
1e59de90 | 542 | #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) |
11fdf7f2 TL |
543 | class SnapshotConcurrentAccessTest |
544 | : public WritePreparedTransactionTestBase, | |
f67539c2 TL |
545 | virtual public ::testing::WithParamInterface<std::tuple< |
546 | bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> { | |
11fdf7f2 TL |
547 | public: |
548 | SnapshotConcurrentAccessTest() | |
f67539c2 TL |
549 | : WritePreparedTransactionTestBase( |
550 | std::get<0>(GetParam()), std::get<1>(GetParam()), | |
551 | std::get<2>(GetParam()), std::get<3>(GetParam())), | |
552 | split_id_(std::get<4>(GetParam())), | |
553 | split_cnt_(std::get<5>(GetParam())){}; | |
11fdf7f2 TL |
554 | |
555 | protected: | |
556 | // A test is split into split_cnt_ tests, each identified with split_id_ where | |
557 | // 0 <= split_id_ < split_cnt_ | |
558 | size_t split_id_; | |
559 | size_t split_cnt_; | |
560 | }; | |
1e59de90 | 561 | #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) |
11fdf7f2 TL |
562 | |
563 | class SeqAdvanceConcurrentTest | |
564 | : public WritePreparedTransactionTestBase, | |
f67539c2 TL |
565 | virtual public ::testing::WithParamInterface<std::tuple< |
566 | bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> { | |
11fdf7f2 TL |
567 | public: |
568 | SeqAdvanceConcurrentTest() | |
f67539c2 TL |
569 | : WritePreparedTransactionTestBase( |
570 | std::get<0>(GetParam()), std::get<1>(GetParam()), | |
571 | std::get<2>(GetParam()), std::get<3>(GetParam())), | |
572 | split_id_(std::get<4>(GetParam())), | |
20effc67 TL |
573 | split_cnt_(std::get<5>(GetParam())) { |
574 | special_env.skip_fsync_ = true; | |
575 | }; | |
11fdf7f2 TL |
576 | |
577 | protected: | |
578 | // A test is split into split_cnt_ tests, each identified with split_id_ where | |
579 | // 0 <= split_id_ < split_cnt_ | |
580 | size_t split_id_; | |
581 | size_t split_cnt_; | |
582 | }; | |
583 | ||
// Instantiate WritePreparedTransactionTest with one write queue (ordered
// writes) and with two write queues (ordered and unordered writes).
// Tuple fields: (use_stackable_db, two_write_queue, write_policy,
// write_ordering).
INSTANTIATE_TEST_CASE_P(
    WritePreparedTransaction, WritePreparedTransactionTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)));
11fdf7f2 | 590 | |
1e59de90 | 591 | #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) |
11fdf7f2 TL |
// Two write queues: shard SnapshotConcurrentAccessTest 20 ways (split_id
// 0..19 of split_cnt 20), first with ordered and then with unordered writes.
// Tuple fields: (use_stackable_db, two_write_queue, write_policy,
// write_ordering, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SnapshotConcurrentAccessTest,
    ::testing::Values(
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),

        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)));
11fdf7f2 TL |
636 | |
// One write queue: shard SnapshotConcurrentAccessTest 20 ways (split_id 0..19
// of split_cnt 20) with ordered writes only.
// Tuple fields: (use_stackable_db, two_write_queue, write_policy,
// write_ordering, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SnapshotConcurrentAccessTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20)));
11fdf7f2 TL |
660 | |
// Two write queues: shard SeqAdvanceConcurrentTest 10 ways (split_id 0..9 of
// split_cnt 10), first with ordered and then with unordered writes.
// Tuple fields: (use_stackable_db, two_write_queue, write_policy,
// write_ordering, split_id, split_cnt).
INSTANTIATE_TEST_CASE_P(
    TwoWriteQueues, SeqAdvanceConcurrentTest,
    ::testing::Values(
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
        std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)));
11fdf7f2 TL |
684 | |
// Instantiates SeqAdvanceConcurrentTest 10 times with the second tuple field
// set to false (presumably "one write queue", matching the instance name).
// Only kOrderedWrite is instantiated here.
// NOTE(review): the last two fields appear to be (split_id, split_cnt) = (i, 10);
// confirm against the fixture.
INSTANTIATE_TEST_CASE_P(
    OneWriteQueue, SeqAdvanceConcurrentTest,
    ::testing::Values(
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
        std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)));
1e59de90 | 698 | #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) |
11fdf7f2 | 699 | |
f67539c2 | 700 | TEST_P(WritePreparedTransactionTest, CommitMap) { |
11fdf7f2 | 701 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1e59de90 TL |
702 | ASSERT_NE(wp_db, nullptr); |
703 | ASSERT_NE(wp_db->db_impl_, nullptr); | |
11fdf7f2 TL |
704 | size_t size = wp_db->COMMIT_CACHE_SIZE; |
705 | CommitEntry c = {5, 12}, e; | |
706 | bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e); | |
707 | ASSERT_FALSE(evicted); | |
708 | ||
709 | // Should be able to read the same value | |
710 | CommitEntry64b dont_care; | |
711 | bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e); | |
712 | ASSERT_TRUE(found); | |
713 | ASSERT_EQ(c, e); | |
714 | // Should be able to distinguish between overlapping entries | |
715 | found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e); | |
716 | ASSERT_TRUE(found); | |
717 | ASSERT_NE(c.prep_seq + size, e.prep_seq); | |
718 | // Should be able to detect non-existent entry | |
719 | found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e); | |
720 | ASSERT_FALSE(found); | |
721 | ||
722 | // Reject an invalid exchange | |
723 | CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size}; | |
724 | CommitEntry64b e2_64b(e2, wp_db->FORMAT); | |
725 | bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e); | |
726 | ASSERT_FALSE(exchanged); | |
727 | // check whether it did actually reject that | |
728 | found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e); | |
729 | ASSERT_TRUE(found); | |
730 | ASSERT_EQ(c, e); | |
731 | ||
732 | // Accept a valid exchange | |
733 | CommitEntry64b c_64b(c, wp_db->FORMAT); | |
734 | CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1}; | |
735 | exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3); | |
736 | ASSERT_TRUE(exchanged); | |
737 | // check whether it did actually accepted that | |
738 | found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e); | |
739 | ASSERT_TRUE(found); | |
740 | ASSERT_EQ(e3, e); | |
741 | ||
742 | // Rewrite an entry | |
743 | CommitEntry e4 = {e3.prep_seq + size, e3.commit_seq + size + 1}; | |
744 | evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e); | |
745 | ASSERT_TRUE(evicted); | |
746 | ASSERT_EQ(e3, e); | |
747 | found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e); | |
748 | ASSERT_TRUE(found); | |
749 | ASSERT_EQ(e4, e); | |
750 | } | |
751 | ||
752 | TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) { | |
753 | // If prepare <= snapshot < commit we should keep the entry around since its | |
754 | // nonexistence could be interpreted as committed in the snapshot while it is | |
755 | // not true. We keep such entries around by adding them to the | |
756 | // old_commit_map_. | |
757 | uint64_t p /*prepare*/, c /*commit*/, s /*snapshot*/, ns /*next_snapshot*/; | |
758 | p = 10l, c = 15l, s = 20l, ns = 21l; | |
759 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
760 | // If we do not expect the old commit map to be updated, try also with a next | |
761 | // snapshot that is expected to update the old commit map. This would test | |
762 | // that MaybeUpdateOldCommitMap would not prevent us from checking the next | |
763 | // snapshot that must be checked. | |
764 | p = 10l, c = 15l, s = 20l, ns = 11l; | |
765 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
766 | ||
767 | p = 10l, c = 20l, s = 20l, ns = 19l; | |
768 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
769 | p = 10l, c = 20l, s = 20l, ns = 21l; | |
770 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
771 | ||
772 | p = 20l, c = 20l, s = 20l, ns = 21l; | |
773 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
774 | p = 20l, c = 20l, s = 20l, ns = 19l; | |
775 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
776 | ||
777 | p = 10l, c = 25l, s = 20l, ns = 21l; | |
778 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true); | |
779 | ||
780 | p = 20l, c = 25l, s = 20l, ns = 21l; | |
781 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, true); | |
782 | ||
783 | p = 21l, c = 25l, s = 20l, ns = 22l; | |
784 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
785 | p = 21l, c = 25l, s = 20l, ns = 19l; | |
786 | MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false); | |
787 | } | |
788 | ||
f67539c2 TL |
789 | // Trigger the condition where some old memtables are skipped when doing |
790 | // TransactionUtil::CheckKey(), and make sure the result is still correct. | |
791 | TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) { | |
792 | const int kAttemptHistoryMemtable = 0; | |
793 | const int kAttemptImmMemTable = 1; | |
794 | for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable; | |
795 | attempt++) { | |
796 | options.max_write_buffer_number_to_maintain = 3; | |
1e59de90 | 797 | ASSERT_OK(ReOpen()); |
f67539c2 TL |
798 | |
799 | WriteOptions write_options; | |
800 | ReadOptions read_options; | |
801 | TransactionOptions txn_options; | |
802 | txn_options.set_snapshot = true; | |
803 | string value; | |
f67539c2 TL |
804 | |
805 | ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); | |
806 | ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar"))); | |
807 | ||
808 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
809 | ASSERT_TRUE(txn != nullptr); | |
810 | ASSERT_OK(txn->SetName("txn")); | |
811 | ||
812 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
813 | ASSERT_TRUE(txn2 != nullptr); | |
814 | ASSERT_OK(txn2->SetName("txn2")); | |
815 | ||
816 | // This transaction is created to cause potential conflict. | |
817 | Transaction* txn_x = db->BeginTransaction(write_options); | |
818 | ASSERT_OK(txn_x->SetName("txn_x")); | |
819 | ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3"))); | |
820 | ASSERT_OK(txn_x->Prepare()); | |
821 | ||
822 | // Create snapshots after the prepare, but there should still | |
823 | // be a conflict when trying to read "foo". | |
824 | ||
825 | if (attempt == kAttemptImmMemTable) { | |
826 | // For the second attempt, hold flush from beginning. The memtable | |
827 | // will be switched to immutable after calling TEST_SwitchMemtable() | |
828 | // while CheckKey() is called. | |
829 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( | |
830 | {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable", | |
831 | "FlushJob::Start"}}); | |
832 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); | |
833 | } | |
834 | ||
835 | // force a memtable flush. The memtable should still be kept | |
836 | FlushOptions flush_ops; | |
837 | if (attempt == kAttemptHistoryMemtable) { | |
838 | ASSERT_OK(db->Flush(flush_ops)); | |
839 | } else { | |
1e59de90 | 840 | ASSERT_EQ(attempt, kAttemptImmMemTable); |
f67539c2 | 841 | DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB()); |
1e59de90 | 842 | ASSERT_OK(db_impl->TEST_SwitchMemtable()); |
f67539c2 TL |
843 | } |
844 | uint64_t num_imm_mems; | |
845 | ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable, | |
846 | &num_imm_mems)); | |
847 | if (attempt == kAttemptHistoryMemtable) { | |
848 | ASSERT_EQ(0, num_imm_mems); | |
849 | } else { | |
1e59de90 | 850 | ASSERT_EQ(attempt, kAttemptImmMemTable); |
f67539c2 TL |
851 | ASSERT_EQ(1, num_imm_mems); |
852 | } | |
853 | ||
854 | // Put something in active memtable | |
855 | ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar"))); | |
856 | ||
857 | // Create txn3 after flushing, but this transaction also needs to | |
858 | // check all memtables because of they contains uncommitted data. | |
859 | Transaction* txn3 = db->BeginTransaction(write_options, txn_options); | |
860 | ASSERT_TRUE(txn3 != nullptr); | |
861 | ASSERT_OK(txn3->SetName("txn3")); | |
862 | ||
863 | // Commit the pending write | |
864 | ASSERT_OK(txn_x->Commit()); | |
865 | ||
866 | // Commit txn, txn2 and tx3. txn and tx3 will conflict but txn2 will | |
867 | // pass. In all cases, both memtables are queried. | |
868 | SetPerfLevel(PerfLevel::kEnableCount); | |
869 | get_perf_context()->Reset(); | |
870 | ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy()); | |
871 | // We should have checked two memtables, active and either immutable | |
872 | // or history memtable, depending on the test case. | |
873 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
874 | ||
875 | get_perf_context()->Reset(); | |
876 | ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy()); | |
877 | // We should have checked two memtables, active and either immutable | |
878 | // or history memtable, depending on the test case. | |
879 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
880 | ||
881 | get_perf_context()->Reset(); | |
882 | ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value)); | |
883 | ASSERT_EQ(value, "bar"); | |
884 | // We should have checked two memtables, and since there is no | |
885 | // conflict, another Get() will be made and fetch the data from | |
886 | // DB. If it is in immutable memtable, two extra memtable reads | |
887 | // will be issued. If it is not (in history), only one will | |
888 | // be made, which is to the active memtable. | |
889 | if (attempt == kAttemptHistoryMemtable) { | |
890 | ASSERT_EQ(3, get_perf_context()->get_from_memtable_count); | |
891 | } else { | |
1e59de90 | 892 | ASSERT_EQ(attempt, kAttemptImmMemTable); |
f67539c2 TL |
893 | ASSERT_EQ(4, get_perf_context()->get_from_memtable_count); |
894 | } | |
895 | ||
896 | Transaction* txn4 = db->BeginTransaction(write_options, txn_options); | |
897 | ASSERT_TRUE(txn4 != nullptr); | |
898 | ASSERT_OK(txn4->SetName("txn4")); | |
899 | get_perf_context()->Reset(); | |
900 | ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value)); | |
901 | if (attempt == kAttemptHistoryMemtable) { | |
902 | // Active memtable will be checked in snapshot validation and when | |
903 | // getting the value. | |
904 | ASSERT_EQ(2, get_perf_context()->get_from_memtable_count); | |
905 | } else { | |
906 | // Only active memtable will be checked in snapshot validation but | |
907 | // both of active and immutable snapshot will be queried when | |
908 | // getting the value. | |
1e59de90 | 909 | ASSERT_EQ(attempt, kAttemptImmMemTable); |
f67539c2 TL |
910 | ASSERT_EQ(3, get_perf_context()->get_from_memtable_count); |
911 | } | |
912 | ||
913 | ASSERT_OK(txn2->Commit()); | |
914 | ASSERT_OK(txn4->Commit()); | |
915 | ||
916 | TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable"); | |
917 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); | |
918 | ||
919 | SetPerfLevel(PerfLevel::kDisable); | |
920 | ||
921 | delete txn; | |
922 | delete txn2; | |
923 | delete txn3; | |
924 | delete txn4; | |
925 | delete txn_x; | |
926 | } | |
927 | } | |
928 | ||
494da23a TL |
929 | // Reproduce the bug with two snapshots with the same seuqence number and test |
930 | // that the release of the first snapshot will not affect the reads by the other | |
931 | // snapshot | |
932 | TEST_P(WritePreparedTransactionTest, DoubleSnapshot) { | |
933 | TransactionOptions txn_options; | |
934 | Status s; | |
935 | ||
936 | // Insert initial value | |
937 | ASSERT_OK(db->Put(WriteOptions(), "key", "value1")); | |
938 | ||
939 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
940 | Transaction* txn = | |
941 | wp_db->BeginTransaction(WriteOptions(), txn_options, nullptr); | |
942 | ASSERT_OK(txn->SetName("txn")); | |
943 | ASSERT_OK(txn->Put("key", "value2")); | |
944 | ASSERT_OK(txn->Prepare()); | |
945 | // Three snapshots with the same seq number | |
946 | const Snapshot* snapshot0 = wp_db->GetSnapshot(); | |
947 | const Snapshot* snapshot1 = wp_db->GetSnapshot(); | |
948 | const Snapshot* snapshot2 = wp_db->GetSnapshot(); | |
949 | ASSERT_OK(txn->Commit()); | |
950 | SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE; | |
951 | SequenceNumber overlap_seq = txn->GetId() + cache_size; | |
952 | delete txn; | |
953 | ||
954 | // 4th snapshot with a larger seq | |
955 | const Snapshot* snapshot3 = wp_db->GetSnapshot(); | |
956 | // Cause an eviction to advance max evicted seq number | |
957 | // This also fetches the 4 snapshots from db since their seq is lower than the | |
958 | // new max | |
959 | wp_db->AddCommitted(overlap_seq, overlap_seq); | |
960 | ||
961 | ReadOptions ropt; | |
962 | // It should see the value before commit | |
963 | ropt.snapshot = snapshot2; | |
964 | PinnableSlice pinnable_val; | |
965 | s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val); | |
966 | ASSERT_OK(s); | |
967 | ASSERT_TRUE(pinnable_val == "value1"); | |
968 | pinnable_val.Reset(); | |
969 | ||
970 | wp_db->ReleaseSnapshot(snapshot1); | |
971 | ||
972 | // It should still see the value before commit | |
973 | s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val); | |
974 | ASSERT_OK(s); | |
975 | ASSERT_TRUE(pinnable_val == "value1"); | |
976 | pinnable_val.Reset(); | |
977 | ||
978 | // Cause an eviction to advance max evicted seq number and trigger updating | |
979 | // the snapshot list | |
980 | overlap_seq += cache_size; | |
981 | wp_db->AddCommitted(overlap_seq, overlap_seq); | |
982 | ||
983 | // It should still see the value before commit | |
984 | s = wp_db->Get(ropt, wp_db->DefaultColumnFamily(), "key", &pinnable_val); | |
985 | ASSERT_OK(s); | |
986 | ASSERT_TRUE(pinnable_val == "value1"); | |
987 | pinnable_val.Reset(); | |
988 | ||
989 | wp_db->ReleaseSnapshot(snapshot0); | |
990 | wp_db->ReleaseSnapshot(snapshot2); | |
991 | wp_db->ReleaseSnapshot(snapshot3); | |
992 | } | |
993 | ||
994 | size_t UniqueCnt(std::vector<SequenceNumber> vec) { | |
995 | std::set<SequenceNumber> aset; | |
996 | for (auto i : vec) { | |
997 | aset.insert(i); | |
998 | } | |
999 | return aset.size(); | |
1000 | } | |
11fdf7f2 TL |
1001 | // Test that the entries in old_commit_map_ get garbage collected properly |
1002 | TEST_P(WritePreparedTransactionTest, OldCommitMapGC) { | |
1003 | const size_t snapshot_cache_bits = 0; | |
1004 | const size_t commit_cache_bits = 0; | |
1005 | DBImpl* mock_db = new DBImpl(options, dbname); | |
494da23a TL |
1006 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); |
1007 | std::unique_ptr<WritePreparedTxnDBMock> wp_db( | |
1008 | new WritePreparedTxnDBMock(mock_db, txn_db_options)); | |
11fdf7f2 TL |
1009 | |
1010 | SequenceNumber seq = 0; | |
1011 | // Take the first snapshot that overlaps with two txn | |
1012 | auto prep_seq = ++seq; | |
1013 | wp_db->AddPrepared(prep_seq); | |
1014 | auto prep_seq2 = ++seq; | |
1015 | wp_db->AddPrepared(prep_seq2); | |
1016 | auto snap_seq1 = seq; | |
1017 | wp_db->TakeSnapshot(snap_seq1); | |
1018 | auto commit_seq = ++seq; | |
1019 | wp_db->AddCommitted(prep_seq, commit_seq); | |
1020 | wp_db->RemovePrepared(prep_seq); | |
1021 | auto commit_seq2 = ++seq; | |
1022 | wp_db->AddCommitted(prep_seq2, commit_seq2); | |
1023 | wp_db->RemovePrepared(prep_seq2); | |
1024 | // Take the 2nd and 3rd snapshot that overlap with the same txn | |
1025 | prep_seq = ++seq; | |
1026 | wp_db->AddPrepared(prep_seq); | |
1027 | auto snap_seq2 = seq; | |
1028 | wp_db->TakeSnapshot(snap_seq2); | |
1029 | seq++; | |
1030 | auto snap_seq3 = seq; | |
1031 | wp_db->TakeSnapshot(snap_seq3); | |
1032 | seq++; | |
1033 | commit_seq = ++seq; | |
1034 | wp_db->AddCommitted(prep_seq, commit_seq); | |
1035 | wp_db->RemovePrepared(prep_seq); | |
1036 | // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the | |
1037 | // only item in the commit_cache_ via another commit. | |
1038 | prep_seq = ++seq; | |
1039 | wp_db->AddPrepared(prep_seq); | |
1040 | commit_seq = ++seq; | |
1041 | wp_db->AddCommitted(prep_seq, commit_seq); | |
1042 | wp_db->RemovePrepared(prep_seq); | |
1043 | ||
1044 | // Verify that the evicted commit entries for all snapshots are in the | |
1045 | // old_commit_map_ | |
1046 | { | |
1047 | ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); | |
1048 | ReadLock rl(&wp_db->old_commit_map_mutex_); | |
1049 | ASSERT_EQ(3, wp_db->old_commit_map_.size()); | |
494da23a TL |
1050 | ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1])); |
1051 | ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2])); | |
1052 | ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3])); | |
11fdf7f2 TL |
1053 | } |
1054 | ||
1055 | // Verify that the 2nd snapshot is cleaned up after the release | |
1056 | wp_db->ReleaseSnapshotInternal(snap_seq2); | |
1057 | { | |
1058 | ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); | |
1059 | ReadLock rl(&wp_db->old_commit_map_mutex_); | |
1060 | ASSERT_EQ(2, wp_db->old_commit_map_.size()); | |
494da23a TL |
1061 | ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1])); |
1062 | ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3])); | |
11fdf7f2 TL |
1063 | } |
1064 | ||
1065 | // Verify that the 1st snapshot is cleaned up after the release | |
1066 | wp_db->ReleaseSnapshotInternal(snap_seq1); | |
1067 | { | |
1068 | ASSERT_FALSE(wp_db->old_commit_map_empty_.load()); | |
1069 | ReadLock rl(&wp_db->old_commit_map_mutex_); | |
1070 | ASSERT_EQ(1, wp_db->old_commit_map_.size()); | |
494da23a | 1071 | ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3])); |
11fdf7f2 TL |
1072 | } |
1073 | ||
1074 | // Verify that the 3rd snapshot is cleaned up after the release | |
1075 | wp_db->ReleaseSnapshotInternal(snap_seq3); | |
1076 | { | |
1077 | ASSERT_TRUE(wp_db->old_commit_map_empty_.load()); | |
1078 | ReadLock rl(&wp_db->old_commit_map_mutex_); | |
1079 | ASSERT_EQ(0, wp_db->old_commit_map_.size()); | |
1080 | } | |
1081 | } | |
1082 | ||
f67539c2 | 1083 | TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) { |
11fdf7f2 TL |
1084 | std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l, |
1085 | 600l, 700l, 800l, 900l}; | |
1086 | const size_t snapshot_cache_bits = 2; | |
494da23a | 1087 | const uint64_t cache_size = 1ul << snapshot_cache_bits; |
11fdf7f2 TL |
1088 | // Safety check to express the intended size in the test. Can be adjusted if |
1089 | // the snapshots lists changed. | |
1e59de90 | 1090 | ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 1, snapshots.size()); |
11fdf7f2 | 1091 | DBImpl* mock_db = new DBImpl(options, dbname); |
494da23a | 1092 | UpdateTransactionDBOptions(snapshot_cache_bits); |
11fdf7f2 | 1093 | std::unique_ptr<WritePreparedTxnDBMock> wp_db( |
494da23a | 1094 | new WritePreparedTxnDBMock(mock_db, txn_db_options)); |
11fdf7f2 TL |
1095 | SequenceNumber version = 1000l; |
1096 | ASSERT_EQ(0, wp_db->snapshots_total_); | |
1097 | wp_db->UpdateSnapshots(snapshots, version); | |
1098 | ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_); | |
1099 | // seq numbers are chosen so that we have two of them between each two | |
1100 | // snapshots. If the diff of two consecutive seq is more than 5, there is a | |
1101 | // snapshot between them. | |
1102 | std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l, | |
1103 | 355l, 450l, 455l, 550l, 555l, 650l, 655l, | |
1104 | 750l, 755l, 850l, 855l, 950l, 955l}; | |
1e59de90 | 1105 | ASSERT_GT(seqs.size(), 1); |
20effc67 | 1106 | for (size_t i = 0; i + 1 < seqs.size(); i++) { |
11fdf7f2 TL |
1107 | wp_db->old_commit_map_empty_ = true; // reset |
1108 | CommitEntry commit_entry = {seqs[i], seqs[i + 1]}; | |
1109 | wp_db->CheckAgainstSnapshots(commit_entry); | |
1110 | // Expect update if there is snapshot in between the prepare and commit | |
1111 | bool expect_update = commit_entry.commit_seq - commit_entry.prep_seq > 5 && | |
1112 | commit_entry.commit_seq >= snapshots.front() && | |
1113 | commit_entry.prep_seq <= snapshots.back(); | |
1114 | ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_); | |
1115 | } | |
494da23a TL |
1116 | |
1117 | // Test that search will include multiple snapshot from snapshot cache | |
1118 | { | |
1119 | // exclude first and last item in the cache | |
1120 | CommitEntry commit_entry = {snapshots.front() + 1, | |
1121 | snapshots[cache_size - 1] - 1}; | |
1122 | wp_db->old_commit_map_empty_ = true; // reset | |
1123 | wp_db->old_commit_map_.clear(); | |
1124 | wp_db->CheckAgainstSnapshots(commit_entry); | |
1125 | ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2); | |
1126 | } | |
1127 | ||
1128 | // Test that search will include multiple snapshot from old snapshots | |
1129 | { | |
1130 | // include two in the middle | |
1131 | CommitEntry commit_entry = {snapshots[cache_size] + 1, | |
1132 | snapshots[cache_size + 2] + 1}; | |
1133 | wp_db->old_commit_map_empty_ = true; // reset | |
1134 | wp_db->old_commit_map_.clear(); | |
1135 | wp_db->CheckAgainstSnapshots(commit_entry); | |
1136 | ASSERT_EQ(wp_db->old_commit_map_.size(), 2); | |
1137 | } | |
1138 | ||
1139 | // Test that search will include both snapshot cache and old snapshots | |
1140 | // Case 1: includes all in snapshot cache | |
1141 | { | |
1142 | CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1}; | |
1143 | wp_db->old_commit_map_empty_ = true; // reset | |
1144 | wp_db->old_commit_map_.clear(); | |
1145 | wp_db->CheckAgainstSnapshots(commit_entry); | |
1146 | ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size()); | |
1147 | } | |
1148 | ||
1149 | // Case 2: includes all snapshot caches except the smallest | |
1150 | { | |
1151 | CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1}; | |
1152 | wp_db->old_commit_map_empty_ = true; // reset | |
1153 | wp_db->old_commit_map_.clear(); | |
1154 | wp_db->CheckAgainstSnapshots(commit_entry); | |
1155 | ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1); | |
1156 | } | |
1157 | ||
1158 | // Case 3: includes only the largest of snapshot cache | |
1159 | { | |
1160 | CommitEntry commit_entry = {snapshots[cache_size - 1] - 1, | |
1161 | snapshots.back() + 1}; | |
1162 | wp_db->old_commit_map_empty_ = true; // reset | |
1163 | wp_db->old_commit_map_.clear(); | |
1164 | wp_db->CheckAgainstSnapshots(commit_entry); | |
1165 | ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1); | |
1166 | } | |
11fdf7f2 TL |
1167 | } |
1168 | ||
#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
//
// The test enumerates every combination of:
//  - old snapshot list size,
//  - which old snapshots survive into the new list,
//  - how many new snapshots are appended,
//  - which surviving snapshot the commit entry surrounds,
//  - two break points for each of the two concurrent operations.
// Each combination is delegated to SnapshotConcurrentAccessTestInternal.
// The enumeration is sharded across test instances via loop_id: only
// iterations with loop_id % split_cnt_ == split_id_ are executed here.
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
  // We have a sync point in the method under test after checking each snapshot.
  // If you increase the max number of snapshots in this test, more sync points
  // in the methods must also be added.
  const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
                                                 60l, 70l, 80l, 90l, 100l};
  const size_t snapshot_cache_bits = 2;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 2, snapshots.size());
  SequenceNumber version = 1000l;
  // Choose the cache size so that the new snapshot list could replace all the
  // existing items in the cache and also have some overflow.
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  const size_t extra = 2;
  // Global iteration counter used for sharding; it advances on every
  // new_comb iteration, including the ones this shard skips.
  size_t loop_id = 0;
  // Add up to extra items that do not fit into the cache
  for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
       old_size++) {
    const std::vector<SequenceNumber> old_snapshots(
        snapshots.begin(), snapshots.begin() + old_size);

    // Each member of old snapshot might or might not appear in the new list. We
    // create a common_snapshots for each combination.
    size_t new_comb_cnt = size_t(1) << old_size;
    for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
      // Skip combinations that belong to another shard.
      if (loop_id % split_cnt_ != split_id_) continue;
      printf(".");  // To signal progress
      fflush(stdout);
      std::vector<SequenceNumber> common_snapshots;
      for (size_t i = 0; i < old_snapshots.size(); i++) {
        if (IsInCombination(i, new_comb)) {
          common_snapshots.push_back(old_snapshots[i]);
        }
      }
      // And add some new snapshots to the common list
      for (size_t added_snapshots = 0;
           added_snapshots <= snapshots.size() - old_snapshots.size();
           added_snapshots++) {
        std::vector<SequenceNumber> new_snapshots = common_snapshots;
        for (size_t i = 0; i < added_snapshots; i++) {
          new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
        }
        for (auto it = common_snapshots.begin(); it != common_snapshots.end();
             ++it) {
          auto snapshot = *it;
          // Create a commit entry that is around the snapshot and thus should
          // not be discarded
          CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
                               snapshot + 1};
          // The critical part is when iterating the snapshot cache. Afterwards,
          // we are operating under the lock
          size_t a_range =
              std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          size_t b_range =
              std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
          // Break each thread at two points
          for (size_t a1 = 1; a1 <= a_range; a1++) {
            for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
              for (size_t b1 = 1; b1 <= b_range; b1++) {
                for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
                  SnapshotConcurrentAccessTestInternal(
                      wp_db.get(), old_snapshots, new_snapshots, entry, version,
                      a1, a2, b1, b2);
                }
              }
            }
          }
        }
      }
    }
  }
  printf("\n");
}
#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
11fdf7f2 TL |
1250 | |
1251 | // This test clarifies the contract of AdvanceMaxEvictedSeq method | |
f67539c2 | 1252 | TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) { |
11fdf7f2 TL |
1253 | DBImpl* mock_db = new DBImpl(options, dbname); |
1254 | std::unique_ptr<WritePreparedTxnDBMock> wp_db( | |
1255 | new WritePreparedTxnDBMock(mock_db, txn_db_options)); | |
1256 | ||
1257 | // 1. Set the initial values for max, prepared, and snapshots | |
1258 | SequenceNumber zero_max = 0l; | |
1259 | // Set the initial list of prepared txns | |
1260 | const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100, | |
1261 | 150, 200, 250}; | |
1262 | for (auto p : initial_prepared) { | |
1263 | wp_db->AddPrepared(p); | |
1264 | } | |
1265 | // This updates the max value and also set old prepared | |
1266 | SequenceNumber init_max = 100; | |
1267 | wp_db->AdvanceMaxEvictedSeq(zero_max, init_max); | |
1268 | const std::vector<SequenceNumber> initial_snapshots = {20, 40}; | |
1269 | wp_db->SetDBSnapshots(initial_snapshots); | |
1270 | // This will update the internal cache of snapshots from the DB | |
1271 | wp_db->UpdateSnapshots(initial_snapshots, init_max); | |
1272 | ||
1273 | // 2. Invoke AdvanceMaxEvictedSeq | |
1274 | const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300}; | |
1275 | wp_db->SetDBSnapshots(latest_snapshots); | |
1276 | SequenceNumber new_max = 200; | |
1277 | wp_db->AdvanceMaxEvictedSeq(init_max, new_max); | |
1278 | ||
1279 | // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract | |
1280 | // a. max should be updated to new_max | |
1281 | ASSERT_EQ(wp_db->max_evicted_seq_, new_max); | |
1282 | // b. delayed prepared should contain every txn <= max and prepared should | |
1283 | // only contain txns > max | |
1284 | auto it = initial_prepared.begin(); | |
f67539c2 | 1285 | for (; it != initial_prepared.end() && *it <= new_max; ++it) { |
11fdf7f2 TL |
1286 | ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it)); |
1287 | } | |
1288 | ASSERT_TRUE(wp_db->delayed_prepared_.empty()); | |
1289 | for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty(); | |
f67539c2 | 1290 | ++it, wp_db->prepared_txns_.pop()) { |
11fdf7f2 TL |
1291 | ASSERT_EQ(*it, wp_db->prepared_txns_.top()); |
1292 | } | |
1293 | ASSERT_TRUE(it == initial_prepared.end()); | |
1294 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1295 | // c. snapshots should contain everything below new_max | |
1296 | auto sit = latest_snapshots.begin(); | |
1297 | for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max && | |
1298 | i < wp_db->snapshots_total_; | |
1299 | sit++, i++) { | |
1300 | ASSERT_TRUE(i < wp_db->snapshots_total_); | |
1301 | // This test is in small scale and the list of snapshots are assumed to be | |
1302 | // within the cache size limit. This is just a safety check to double check | |
1303 | // that assumption. | |
1304 | ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE); | |
1305 | ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]); | |
1306 | } | |
1307 | } | |
1308 | ||
494da23a TL |
1309 | // A new snapshot should always be always larger than max_evicted_seq_ |
1310 | // Otherwise the snapshot does not go through AdvanceMaxEvictedSeq | |
1311 | TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) { | |
1312 | WriteOptions woptions; | |
1313 | TransactionOptions txn_options; | |
1314 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1315 | Transaction* txn0 = db->BeginTransaction(woptions, txn_options); | |
1316 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value"))); | |
1317 | ASSERT_OK(txn0->Commit()); | |
1318 | const SequenceNumber seq = txn0->GetId(); // is also prepare seq | |
1319 | delete txn0; | |
1320 | std::vector<Transaction*> txns; | |
1321 | // Inc seq without committing anything | |
1322 | for (int i = 0; i < 10; i++) { | |
1323 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
1324 | ASSERT_OK(txn->SetName("xid" + std::to_string(i))); | |
1325 | ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value"))); | |
1326 | ASSERT_OK(txn->Prepare()); | |
1327 | txns.push_back(txn); | |
1328 | } | |
1329 | ||
1330 | // The new commit is seq + 10 | |
1331 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1332 | auto snap = wp_db->GetSnapshot(); | |
1333 | const SequenceNumber last_seq = snap->GetSequenceNumber(); | |
1334 | wp_db->ReleaseSnapshot(snap); | |
1335 | ASSERT_LT(seq, last_seq); | |
1336 | // Otherwise our test is not effective | |
1337 | ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED); | |
1338 | ||
1339 | // Evict seq out of commit cache | |
1340 | const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE; | |
1341 | // Check that the next write could make max go beyond last | |
1342 | auto last_max = wp_db->max_evicted_seq_.load(); | |
1343 | wp_db->AddCommitted(overwrite_seq, overwrite_seq); | |
1344 | // Check that eviction has advanced the max | |
1345 | ASSERT_LT(last_max, wp_db->max_evicted_seq_.load()); | |
1346 | // Check that the new max has not advanced the last seq | |
1347 | ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq); | |
1348 | for (auto txn : txns) { | |
1349 | txn->Rollback(); | |
1350 | delete txn; | |
1351 | } | |
1352 | } | |
1353 | ||
1354 | // A new snapshot should always be always larger than max_evicted_seq_ | |
1355 | // In very rare cases max could be below last published seq. Test that | |
1356 | // taking snapshot will wait for max to catch up. | |
1357 | TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) { | |
1358 | const size_t snapshot_cache_bits = 7; // same as default | |
1359 | const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction | |
1360 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 1361 | ASSERT_OK(ReOpen()); |
494da23a TL |
1362 | WriteOptions woptions; |
1363 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1364 | ||
1365 | const int writes = 50; | |
1366 | const int batch_cnt = 4; | |
f67539c2 | 1367 | ROCKSDB_NAMESPACE::port::Thread t1([&]() { |
494da23a TL |
1368 | for (int i = 0; i < writes; i++) { |
1369 | WriteBatch batch; | |
1370 | // For duplicate keys cause 4 commit entries, each evicting an entry that | |
f67539c2 | 1371 | // is not published yet, thus causing max evicted seq go higher than last |
494da23a TL |
1372 | // published. |
1373 | for (int b = 0; b < batch_cnt; b++) { | |
1e59de90 | 1374 | ASSERT_OK(batch.Put("foo", "foo")); |
494da23a | 1375 | } |
1e59de90 | 1376 | ASSERT_OK(db->Write(woptions, &batch)); |
494da23a TL |
1377 | } |
1378 | }); | |
1379 | ||
f67539c2 | 1380 | ROCKSDB_NAMESPACE::port::Thread t2([&]() { |
494da23a TL |
1381 | while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread |
1382 | std::this_thread::yield(); | |
1383 | } | |
1384 | for (int i = 0; i < 10; i++) { | |
f67539c2 | 1385 | SequenceNumber max_lower_bound = wp_db->max_evicted_seq_; |
494da23a TL |
1386 | auto snap = db->GetSnapshot(); |
1387 | if (snap->GetSequenceNumber() != 0) { | |
f67539c2 TL |
1388 | // Value of max_evicted_seq_ when snapshot was taken in unknown. We thus |
1389 | // compare with the lower bound instead as an approximation. | |
1390 | ASSERT_LT(max_lower_bound, snap->GetSequenceNumber()); | |
494da23a TL |
1391 | } // seq 0 is ok to be less than max since nothing is visible to it |
1392 | db->ReleaseSnapshot(snap); | |
1393 | } | |
1394 | }); | |
1395 | ||
1396 | t1.join(); | |
1397 | t2.join(); | |
1398 | ||
1399 | // Make sure that the test has worked and seq number has advanced as we | |
1400 | // thought | |
1401 | auto snap = db->GetSnapshot(); | |
1402 | ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1); | |
1403 | db->ReleaseSnapshot(snap); | |
1404 | } | |
1405 | ||
f67539c2 TL |
1406 | // Test that reads without snapshots would not hit an undefined state |
1407 | TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) { | |
1408 | const size_t snapshot_cache_bits = 7; // same as default | |
1409 | const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction | |
1410 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 1411 | ASSERT_OK(ReOpen()); |
f67539c2 TL |
1412 | WriteOptions woptions; |
1413 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1414 | ||
1415 | const int writes = 50; | |
1416 | ROCKSDB_NAMESPACE::port::Thread t1([&]() { | |
1417 | for (int i = 0; i < writes; i++) { | |
1418 | WriteBatch batch; | |
1e59de90 TL |
1419 | ASSERT_OK(batch.Put("key", "foo")); |
1420 | ASSERT_OK(db->Write(woptions, &batch)); | |
f67539c2 TL |
1421 | } |
1422 | }); | |
1423 | ||
1424 | ROCKSDB_NAMESPACE::port::Thread t2([&]() { | |
1425 | while (wp_db->max_evicted_seq_ == 0) { // wait for insert thread | |
1426 | std::this_thread::yield(); | |
1427 | } | |
1428 | ReadOptions ropt; | |
1429 | PinnableSlice pinnable_val; | |
1430 | TransactionOptions txn_options; | |
1431 | for (int i = 0; i < 10; i++) { | |
1432 | auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); | |
1433 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1434 | pinnable_val.Reset(); | |
1435 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
1436 | s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); | |
1437 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1438 | pinnable_val.Reset(); | |
1439 | std::vector<std::string> values; | |
1440 | auto s_vec = | |
1441 | txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values); | |
1442 | ASSERT_EQ(1, values.size()); | |
1443 | ASSERT_EQ(1, s_vec.size()); | |
1444 | s = s_vec[0]; | |
1445 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1446 | Slice key("key"); | |
1447 | txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s, | |
1448 | true); | |
1449 | ASSERT_TRUE(s.ok() || s.IsTryAgain()); | |
1450 | delete txn; | |
1451 | } | |
1452 | }); | |
1453 | ||
1454 | t1.join(); | |
1455 | t2.join(); | |
1456 | ||
1457 | // Make sure that the test has worked and seq number has advanced as we | |
1458 | // thought | |
1459 | auto snap = db->GetSnapshot(); | |
1460 | ASSERT_GT(snap->GetSequenceNumber(), writes - 1); | |
1461 | db->ReleaseSnapshot(snap); | |
1462 | } | |
1463 | ||
494da23a TL |
1464 | // Check that old_commit_map_ cleanup works correctly if the snapshot equals |
1465 | // max_evicted_seq_. | |
1466 | TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) { | |
1467 | const size_t snapshot_cache_bits = 7; // same as default | |
1468 | const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction | |
1469 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 1470 | ASSERT_OK(ReOpen()); |
494da23a TL |
1471 | WriteOptions woptions; |
1472 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1473 | // Insert something to increase seq | |
1474 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1475 | auto snap = db->GetSnapshot(); | |
1476 | auto snap_seq = snap->GetSequenceNumber(); | |
1477 | // Another insert should trigger eviction + load snapshot from db | |
1478 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1479 | // This is the scenario that we check agaisnt | |
1480 | ASSERT_EQ(snap_seq, wp_db->max_evicted_seq_); | |
1481 | // old_commit_map_ now has some data that needs gc | |
1482 | ASSERT_EQ(1, wp_db->snapshots_total_); | |
1483 | ASSERT_EQ(1, wp_db->old_commit_map_.size()); | |
1484 | ||
1485 | db->ReleaseSnapshot(snap); | |
1486 | ||
1487 | // Another insert should trigger eviction + load snapshot from db | |
1488 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1489 | ||
1490 | // the snapshot and related metadata must be properly garbage collected | |
1491 | ASSERT_EQ(0, wp_db->snapshots_total_); | |
1492 | ASSERT_TRUE(wp_db->snapshots_all_.empty()); | |
1493 | ASSERT_EQ(0, wp_db->old_commit_map_.size()); | |
1494 | } | |
1495 | ||
1496 | TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) { | |
1497 | auto snap = db->GetSnapshot(); | |
1498 | auto seq1 = snap->GetSequenceNumber(); | |
1499 | db->ReleaseSnapshot(snap); | |
1500 | ||
1501 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); | |
1502 | wp_db->AdvanceSeqByOne(); | |
1503 | ||
1504 | snap = db->GetSnapshot(); | |
1505 | auto seq2 = snap->GetSequenceNumber(); | |
1506 | db->ReleaseSnapshot(snap); | |
1507 | ||
1508 | ASSERT_LT(seq1, seq2); | |
1509 | } | |
1510 | ||
1511 | // Test that the txn Initilize calls the overridden functions | |
1512 | TEST_P(WritePreparedTransactionTest, TxnInitialize) { | |
1513 | TransactionOptions txn_options; | |
1514 | WriteOptions write_options; | |
1515 | ASSERT_OK(db->Put(write_options, "key", "value")); | |
1516 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1517 | ASSERT_OK(txn0->SetName("xid")); | |
1518 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); | |
1519 | ASSERT_OK(txn0->Prepare()); | |
1520 | ||
1521 | // SetSnapshot is overridden to update min_uncommitted_ | |
1522 | txn_options.set_snapshot = true; | |
1523 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
1524 | auto snap = txn1->GetSnapshot(); | |
1525 | auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap); | |
1526 | // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be | |
1527 | // udpated | |
1528 | ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq); | |
1529 | ||
1e59de90 TL |
1530 | ASSERT_OK(txn0->Rollback()); |
1531 | ASSERT_OK(txn1->Rollback()); | |
494da23a TL |
1532 | delete txn0; |
1533 | delete txn1; | |
1534 | } | |
1535 | ||
11fdf7f2 TL |
1536 | // This tests that transactions with duplicate keys perform correctly after max |
1537 | // is advancing their prepared sequence numbers. This will not be the case if | |
1538 | // for example the txn does not add the prepared seq for the second sub-batch to | |
494da23a | 1539 | // the PreparedHeap structure. |
f67539c2 TL |
1540 | TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) { |
1541 | const size_t snapshot_cache_bits = 7; // same as default | |
1542 | const size_t commit_cache_bits = 1; // disable commit cache | |
1543 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 1544 | ASSERT_OK(ReOpen()); |
f67539c2 TL |
1545 | |
1546 | ReadOptions ropt; | |
1547 | PinnableSlice pinnable_val; | |
11fdf7f2 TL |
1548 | WriteOptions write_options; |
1549 | TransactionOptions txn_options; | |
1550 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1551 | ASSERT_OK(txn0->SetName("xid")); | |
1552 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); | |
1553 | ASSERT_OK(txn0->Put(Slice("key"), Slice("value2"))); | |
1554 | ASSERT_OK(txn0->Prepare()); | |
1555 | ||
f67539c2 TL |
1556 | ASSERT_OK(db->Put(write_options, "key2", "value")); |
1557 | // Will cause max advance due to disabled commit cache | |
1558 | ASSERT_OK(db->Put(write_options, "key3", "value")); | |
11fdf7f2 | 1559 | |
11fdf7f2 TL |
1560 | auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); |
1561 | ASSERT_TRUE(s.IsNotFound()); | |
1562 | delete txn0; | |
1563 | ||
f67539c2 | 1564 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1e59de90 | 1565 | ASSERT_OK(wp_db->db_impl_->FlushWAL(true)); |
11fdf7f2 | 1566 | wp_db->TEST_Crash(); |
1e59de90 TL |
1567 | ASSERT_OK(ReOpenNoDelete()); |
1568 | ASSERT_NE(db, nullptr); | |
11fdf7f2 TL |
1569 | s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); |
1570 | ASSERT_TRUE(s.IsNotFound()); | |
1571 | ||
1572 | txn0 = db->GetTransactionByName("xid"); | |
1573 | ASSERT_OK(txn0->Rollback()); | |
1574 | delete txn0; | |
1575 | } | |
1576 | ||
1e59de90 | 1577 | #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) |
f67539c2 TL |
1578 | // Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and |
1579 | // delayed_prepared_, when is run concurrently with advancing max_evicted_seq, | |
1580 | // which moves prepared txns from prepared_txns_ to delayed_prepared_. | |
1581 | TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) { | |
1582 | const size_t snapshot_cache_bits = 7; // same as default | |
1583 | const size_t commit_cache_bits = 1; // disable commit cache | |
1584 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 1585 | ASSERT_OK(ReOpen()); |
f67539c2 TL |
1586 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1587 | ReadOptions ropt; | |
1588 | PinnableSlice pinnable_val; | |
1589 | WriteOptions write_options; | |
1590 | TransactionOptions txn_options; | |
1591 | std::vector<Transaction*> txns, committed_txns; | |
1592 | ||
1593 | const int cnt = 100; | |
1594 | for (int i = 0; i < cnt; i++) { | |
1595 | Transaction* txn = db->BeginTransaction(write_options, txn_options); | |
1e59de90 TL |
1596 | ASSERT_OK(txn->SetName("xid" + std::to_string(i))); |
1597 | auto key = "key1" + std::to_string(i); | |
1598 | auto value = "value1" + std::to_string(i); | |
f67539c2 TL |
1599 | ASSERT_OK(txn->Put(Slice(key), Slice(value))); |
1600 | ASSERT_OK(txn->Prepare()); | |
1601 | txns.push_back(txn); | |
1602 | } | |
1603 | ||
1604 | port::Mutex mutex; | |
1605 | Random rnd(1103); | |
1606 | ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() { | |
1607 | for (int i = 0; i < cnt; i++) { | |
1608 | uint32_t index = rnd.Uniform(cnt - i); | |
1609 | Transaction* txn; | |
1610 | { | |
1611 | MutexLock l(&mutex); | |
1612 | txn = txns[index]; | |
1613 | txns.erase(txns.begin() + index); | |
1614 | } | |
1615 | // Since commit cache is practically disabled, commit results in immediate | |
1616 | // advance in max_evicted_seq_ and subsequently moving some prepared txns | |
1617 | // to delayed_prepared_. | |
1e59de90 | 1618 | ASSERT_OK(txn->Commit()); |
f67539c2 TL |
1619 | committed_txns.push_back(txn); |
1620 | } | |
1621 | }); | |
1622 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { | |
1623 | while (1) { | |
1624 | MutexLock l(&mutex); | |
1625 | if (txns.empty()) { | |
1626 | break; | |
1627 | } | |
1628 | auto min_uncommitted = wp_db->SmallestUnCommittedSeq(); | |
1629 | ASSERT_LE(min_uncommitted, (*txns.begin())->GetId()); | |
1630 | } | |
1631 | }); | |
1632 | ||
1633 | commit_thread.join(); | |
1634 | read_thread.join(); | |
1635 | for (auto txn : committed_txns) { | |
1636 | delete txn; | |
1637 | } | |
1638 | } | |
1e59de90 | 1639 | #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) |
f67539c2 TL |
1640 | |
1641 | TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) { | |
11fdf7f2 TL |
1642 | // Given the sequential run of txns, with this timeout we should never see a |
1643 | // deadlock nor a timeout unless we have a key conflict, which should be | |
1644 | // almost infeasible. | |
1645 | txn_db_options.transaction_lock_timeout = 1000; | |
1646 | txn_db_options.default_lock_timeout = 1000; | |
1e59de90 | 1647 | ASSERT_OK(ReOpen()); |
11fdf7f2 TL |
1648 | FlushOptions fopt; |
1649 | ||
1650 | // Number of different txn types we use in this test | |
1651 | const size_t type_cnt = 5; | |
1652 | // The size of the first write group | |
1653 | // TODO(myabandeh): This should be increase for pre-release tests | |
1654 | const size_t first_group_size = 2; | |
1655 | // Total number of txns we run in each test | |
1656 | // TODO(myabandeh): This should be increase for pre-release tests | |
1657 | const size_t txn_cnt = first_group_size + 1; | |
1658 | ||
1659 | size_t base[txn_cnt + 1] = { | |
1660 | 1, | |
1661 | }; | |
1662 | for (size_t bi = 1; bi <= txn_cnt; bi++) { | |
1663 | base[bi] = base[bi - 1] * type_cnt; | |
1664 | } | |
1665 | const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt)); | |
1666 | printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n); | |
1e59de90 TL |
1667 | for (size_t n = 0; n < max_n; n++) { |
1668 | if (n > 0) { | |
1669 | ASSERT_OK(ReOpen()); | |
1670 | } | |
1671 | ||
11fdf7f2 TL |
1672 | if (n % split_cnt_ != split_id_) continue; |
1673 | if (n % 1000 == 0) { | |
1674 | printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n); | |
1675 | } | |
20effc67 | 1676 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 | 1677 | auto seq = db_impl->TEST_GetLastVisibleSequence(); |
f67539c2 | 1678 | with_empty_commits = 0; |
11fdf7f2 TL |
1679 | exp_seq = seq; |
1680 | // This is increased before writing the batch for commit | |
1681 | commit_writes = 0; | |
1682 | // This is increased before txn starts linking if it expects to do a commit | |
1683 | // eventually | |
1684 | expected_commits = 0; | |
1685 | std::vector<port::Thread> threads; | |
1686 | ||
1e59de90 | 1687 | linked.store(0, std::memory_order_release); |
11fdf7f2 | 1688 | std::atomic<bool> batch_formed(false); |
f67539c2 | 1689 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 TL |
1690 | "WriteThread::EnterAsBatchGroupLeader:End", |
1691 | [&](void* /*arg*/) { batch_formed = true; }); | |
f67539c2 | 1692 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 | 1693 | "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) { |
1e59de90 TL |
1694 | size_t orig_linked = linked.fetch_add(1, std::memory_order_acq_rel); |
1695 | if (orig_linked == 0) { | |
11fdf7f2 | 1696 | // Wait until the others are linked too. |
1e59de90 | 1697 | while (linked.load(std::memory_order_acquire) < first_group_size) { |
11fdf7f2 | 1698 | } |
1e59de90 | 1699 | } else if (orig_linked == first_group_size) { |
11fdf7f2 TL |
1700 | // Make the 2nd batch of the rest of writes plus any followup |
1701 | // commits from the first batch | |
1e59de90 TL |
1702 | while (linked.load(std::memory_order_acquire) < |
1703 | txn_cnt + commit_writes) { | |
11fdf7f2 TL |
1704 | } |
1705 | } | |
1706 | // Then we will have one or more batches consisting of follow-up | |
1707 | // commits from the 2nd batch. There is a bit of non-determinism here | |
1708 | // but it should be tolerable. | |
1709 | }); | |
1710 | ||
f67539c2 | 1711 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
1712 | for (size_t bi = 0; bi < txn_cnt; bi++) { |
1713 | // get the bi-th digit in number system based on type_cnt | |
1714 | size_t d = (n % base[bi + 1]) / base[bi]; | |
1715 | switch (d) { | |
1716 | case 0: | |
1e59de90 | 1717 | threads.emplace_back(&TransactionTestBase::TestTxn0, this, bi); |
11fdf7f2 TL |
1718 | break; |
1719 | case 1: | |
1e59de90 | 1720 | threads.emplace_back(&TransactionTestBase::TestTxn1, this, bi); |
11fdf7f2 TL |
1721 | break; |
1722 | case 2: | |
1e59de90 | 1723 | threads.emplace_back(&TransactionTestBase::TestTxn2, this, bi); |
11fdf7f2 TL |
1724 | break; |
1725 | case 3: | |
1e59de90 | 1726 | threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi); |
11fdf7f2 TL |
1727 | break; |
1728 | case 4: | |
1e59de90 | 1729 | threads.emplace_back(&TransactionTestBase::TestTxn3, this, bi); |
11fdf7f2 TL |
1730 | break; |
1731 | default: | |
1e59de90 | 1732 | FAIL(); |
11fdf7f2 TL |
1733 | } |
1734 | // wait to be linked | |
1e59de90 | 1735 | while (linked.load(std::memory_order_acquire) <= bi) { |
11fdf7f2 TL |
1736 | } |
1737 | // after a queue of size first_group_size | |
1738 | if (bi + 1 == first_group_size) { | |
1739 | while (!batch_formed) { | |
1740 | } | |
1741 | // to make it more deterministic, wait until the commits are linked | |
1e59de90 TL |
1742 | while (linked.load(std::memory_order_acquire) <= |
1743 | bi + expected_commits) { | |
11fdf7f2 TL |
1744 | } |
1745 | } | |
1746 | } | |
1747 | for (auto& t : threads) { | |
1748 | t.join(); | |
1749 | } | |
1750 | if (options.two_write_queues) { | |
1751 | // In this case none of the above scheduling tricks to deterministically | |
1752 | // form merged batches works because the writes go to separate queues. | |
1753 | // This would result in different write groups in each run of the test. We | |
1754 | // still keep the test since although non-deterministic and hard to debug, | |
1755 | // it is still useful to have. | |
1756 | // TODO(myabandeh): Add a deterministic unit test for two_write_queues | |
1757 | } | |
1758 | ||
1759 | // Check if memtable inserts advanced seq number as expected | |
1760 | seq = db_impl->TEST_GetLastVisibleSequence(); | |
1761 | ASSERT_EQ(exp_seq, seq); | |
1762 | ||
f67539c2 TL |
1763 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
1764 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
11fdf7f2 TL |
1765 | |
1766 | // Check if recovery preserves the last sequence number | |
1e59de90 TL |
1767 | ASSERT_OK(db_impl->FlushWAL(true)); |
1768 | ASSERT_OK(ReOpenNoDelete()); | |
1769 | ASSERT_NE(db, nullptr); | |
20effc67 | 1770 | db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 | 1771 | seq = db_impl->TEST_GetLastVisibleSequence(); |
f67539c2 | 1772 | ASSERT_LE(exp_seq, seq + with_empty_commits); |
11fdf7f2 TL |
1773 | |
1774 | // Check if flush preserves the last sequence number | |
1e59de90 | 1775 | ASSERT_OK(db_impl->Flush(fopt)); |
11fdf7f2 | 1776 | seq = db_impl->GetLatestSequenceNumber(); |
f67539c2 | 1777 | ASSERT_LE(exp_seq, seq + with_empty_commits); |
11fdf7f2 TL |
1778 | |
1779 | // Check if recovery after flush preserves the last sequence number | |
1e59de90 TL |
1780 | ASSERT_OK(db_impl->FlushWAL(true)); |
1781 | ASSERT_OK(ReOpenNoDelete()); | |
1782 | ASSERT_NE(db, nullptr); | |
20effc67 | 1783 | db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 | 1784 | seq = db_impl->GetLatestSequenceNumber(); |
f67539c2 | 1785 | ASSERT_LE(exp_seq, seq + with_empty_commits); |
11fdf7f2 TL |
1786 | } |
1787 | } | |
1788 | ||
1789 | // Run a couple of different txns among them some uncommitted. Restart the db at | |
1790 | // a couple points to check whether the list of uncommitted txns are recovered | |
1791 | // properly. | |
f67539c2 | 1792 | TEST_P(WritePreparedTransactionTest, BasicRecovery) { |
11fdf7f2 | 1793 | options.disable_auto_compactions = true; |
1e59de90 | 1794 | ASSERT_OK(ReOpen()); |
11fdf7f2 TL |
1795 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1796 | ||
1e59de90 | 1797 | TestTxn0(0); |
11fdf7f2 TL |
1798 | |
1799 | TransactionOptions txn_options; | |
1800 | WriteOptions write_options; | |
1801 | size_t index = 1000; | |
1802 | Transaction* txn0 = db->BeginTransaction(write_options, txn_options); | |
1803 | auto istr0 = std::to_string(index); | |
1804 | auto s = txn0->SetName("xid" + istr0); | |
1805 | ASSERT_OK(s); | |
1806 | s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0)); | |
1807 | ASSERT_OK(s); | |
1808 | s = txn0->Prepare(); | |
1e59de90 | 1809 | ASSERT_OK(s); |
11fdf7f2 TL |
1810 | auto prep_seq_0 = txn0->GetId(); |
1811 | ||
1e59de90 | 1812 | TestTxn1(0); |
11fdf7f2 TL |
1813 | |
1814 | index++; | |
1815 | Transaction* txn1 = db->BeginTransaction(write_options, txn_options); | |
1816 | auto istr1 = std::to_string(index); | |
1817 | s = txn1->SetName("xid" + istr1); | |
1818 | ASSERT_OK(s); | |
1819 | s = txn1->Put(Slice("foo1" + istr1), Slice("bar")); | |
1820 | ASSERT_OK(s); | |
1821 | s = txn1->Prepare(); | |
1e59de90 | 1822 | ASSERT_OK(s); |
11fdf7f2 TL |
1823 | auto prep_seq_1 = txn1->GetId(); |
1824 | ||
1e59de90 | 1825 | TestTxn2(0); |
11fdf7f2 TL |
1826 | |
1827 | ReadOptions ropt; | |
1828 | PinnableSlice pinnable_val; | |
1829 | // Check the value is not committed before restart | |
1830 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1831 | ASSERT_TRUE(s.IsNotFound()); | |
1832 | pinnable_val.Reset(); | |
1833 | ||
1834 | delete txn0; | |
1835 | delete txn1; | |
1e59de90 | 1836 | ASSERT_OK(wp_db->db_impl_->FlushWAL(true)); |
11fdf7f2 | 1837 | wp_db->TEST_Crash(); |
1e59de90 TL |
1838 | ASSERT_OK(ReOpenNoDelete()); |
1839 | ASSERT_NE(db, nullptr); | |
11fdf7f2 TL |
1840 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1841 | // After recovery, all the uncommitted txns (0 and 1) should be inserted into | |
1842 | // delayed_prepared_ | |
1843 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1844 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
1845 | ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); | |
1846 | ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_); | |
1847 | { | |
1848 | ReadLock rl(&wp_db->prepared_mutex_); | |
1849 | ASSERT_EQ(2, wp_db->delayed_prepared_.size()); | |
1850 | ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) != | |
1851 | wp_db->delayed_prepared_.end()); | |
1852 | ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) != | |
1853 | wp_db->delayed_prepared_.end()); | |
1854 | } | |
1855 | ||
1856 | // Check the value is still not committed after restart | |
1857 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1858 | ASSERT_TRUE(s.IsNotFound()); | |
1859 | pinnable_val.Reset(); | |
1860 | ||
1e59de90 | 1861 | TestTxn3(0); |
11fdf7f2 TL |
1862 | |
1863 | // Test that a recovered txns will be properly marked committed for the next | |
1864 | // recovery | |
1865 | txn1 = db->GetTransactionByName("xid" + istr1); | |
1866 | ASSERT_NE(txn1, nullptr); | |
1e59de90 | 1867 | ASSERT_OK(txn1->Commit()); |
11fdf7f2 TL |
1868 | delete txn1; |
1869 | ||
1870 | index++; | |
1871 | Transaction* txn2 = db->BeginTransaction(write_options, txn_options); | |
1872 | auto istr2 = std::to_string(index); | |
1873 | s = txn2->SetName("xid" + istr2); | |
1874 | ASSERT_OK(s); | |
1875 | s = txn2->Put(Slice("foo2" + istr2), Slice("bar")); | |
1876 | ASSERT_OK(s); | |
1877 | s = txn2->Prepare(); | |
1e59de90 | 1878 | ASSERT_OK(s); |
11fdf7f2 TL |
1879 | auto prep_seq_2 = txn2->GetId(); |
1880 | ||
1881 | delete txn2; | |
1e59de90 | 1882 | ASSERT_OK(wp_db->db_impl_->FlushWAL(true)); |
11fdf7f2 | 1883 | wp_db->TEST_Crash(); |
1e59de90 TL |
1884 | ASSERT_OK(ReOpenNoDelete()); |
1885 | ASSERT_NE(db, nullptr); | |
11fdf7f2 TL |
1886 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1887 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1888 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
1889 | ||
1890 | // 0 and 2 are prepared and 1 is committed | |
1891 | { | |
1892 | ReadLock rl(&wp_db->prepared_mutex_); | |
1893 | ASSERT_EQ(2, wp_db->delayed_prepared_.size()); | |
1894 | const auto& end = wp_db->delayed_prepared_.end(); | |
1895 | ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end); | |
1896 | ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end); | |
1897 | ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end); | |
1898 | } | |
1899 | ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_); | |
1900 | ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_); | |
1901 | ||
1902 | // Commit all the remaining txns | |
1903 | txn0 = db->GetTransactionByName("xid" + istr0); | |
1904 | ASSERT_NE(txn0, nullptr); | |
1e59de90 | 1905 | ASSERT_OK(txn0->Commit()); |
11fdf7f2 TL |
1906 | txn2 = db->GetTransactionByName("xid" + istr2); |
1907 | ASSERT_NE(txn2, nullptr); | |
1e59de90 | 1908 | ASSERT_OK(txn2->Commit()); |
11fdf7f2 TL |
1909 | |
1910 | // Check the value is committed after commit | |
1911 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1912 | ASSERT_TRUE(s.ok()); | |
1913 | ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); | |
1914 | pinnable_val.Reset(); | |
1915 | ||
1916 | delete txn0; | |
1917 | delete txn2; | |
1e59de90 TL |
1918 | ASSERT_OK(wp_db->db_impl_->FlushWAL(true)); |
1919 | ASSERT_OK(ReOpenNoDelete()); | |
1920 | ASSERT_NE(db, nullptr); | |
11fdf7f2 TL |
1921 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1922 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
1923 | ASSERT_TRUE(wp_db->delayed_prepared_empty_); | |
1924 | ||
1925 | // Check the value is still committed after recovery | |
1926 | s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val); | |
1927 | ASSERT_TRUE(s.ok()); | |
1928 | ASSERT_TRUE(pinnable_val == ("bar0" + istr0)); | |
1929 | pinnable_val.Reset(); | |
1930 | } | |
1931 | ||
1932 | // After recovery the commit map is empty while the max is set. The code would | |
494da23a TL |
1933 | // go through a different path which requires a separate test. Test that the |
1934 | // committed data before the restart is visible to all snapshots. | |
f67539c2 | 1935 | TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) { |
494da23a | 1936 | for (bool end_with_prepare : {false, true}) { |
1e59de90 | 1937 | ASSERT_OK(ReOpen()); |
494da23a TL |
1938 | WriteOptions woptions; |
1939 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1940 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1941 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1942 | SequenceNumber prepare_seq = kMaxSequenceNumber; | |
1943 | if (end_with_prepare) { | |
1944 | TransactionOptions txn_options; | |
1945 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
1946 | ASSERT_OK(txn->SetName("xid0")); | |
1947 | ASSERT_OK(txn->Prepare()); | |
1948 | prepare_seq = txn->GetId(); | |
1949 | delete txn; | |
1950 | } | |
1951 | dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash(); | |
20effc67 | 1952 | auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
1e59de90 TL |
1953 | ASSERT_OK(db_impl->FlushWAL(true)); |
1954 | ASSERT_OK(ReOpenNoDelete()); | |
494da23a | 1955 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1e59de90 | 1956 | ASSERT_NE(wp_db, nullptr); |
494da23a TL |
1957 | ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery |
1958 | // Take a snapshot right after recovery | |
1959 | const Snapshot* snap = db->GetSnapshot(); | |
1960 | auto snap_seq = snap->GetSequenceNumber(); | |
1961 | ASSERT_GT(snap_seq, 0); | |
1962 | ||
1963 | for (SequenceNumber seq = 0; | |
1964 | seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) { | |
1965 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq)); | |
1966 | } | |
1967 | if (end_with_prepare) { | |
1968 | ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq)); | |
1969 | } | |
1970 | // trivial check | |
1971 | ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq)); | |
1972 | ||
1973 | db->ReleaseSnapshot(snap); | |
1974 | ||
1975 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1976 | // Take a snapshot after some writes | |
1977 | snap = db->GetSnapshot(); | |
1978 | snap_seq = snap->GetSequenceNumber(); | |
1979 | for (SequenceNumber seq = 0; | |
1980 | seq <= wp_db->max_evicted_seq_ && seq != prepare_seq; seq++) { | |
1981 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq)); | |
1982 | } | |
1983 | if (end_with_prepare) { | |
1984 | ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq)); | |
1985 | } | |
1986 | // trivial check | |
1987 | ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq)); | |
1988 | ||
1989 | db->ReleaseSnapshot(snap); | |
1990 | } | |
1991 | } | |
1992 | ||
1993 | // Shows the contract of IsInSnapshot when called on invalid/released snapshots | |
1994 | TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) { | |
11fdf7f2 | 1995 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
494da23a TL |
1996 | WriteOptions woptions; |
1997 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
1998 | // snap seq = 1 | |
1999 | const Snapshot* snap1 = db->GetSnapshot(); | |
2000 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
2001 | ASSERT_OK(db->Put(woptions, "key", "value")); | |
2002 | // snap seq = 3 | |
2003 | const Snapshot* snap2 = db->GetSnapshot(); | |
2004 | const SequenceNumber seq = 1; | |
2005 | // Evict seq out of commit cache | |
2006 | size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq; | |
2007 | wp_db->AddCommitted(overwrite_seq, overwrite_seq); | |
2008 | SequenceNumber snap_seq; | |
2009 | uint64_t min_uncommitted = kMinUnCommittedSeq; | |
2010 | bool released; | |
2011 | ||
2012 | released = false; | |
2013 | snap_seq = snap1->GetSequenceNumber(); | |
2014 | ASSERT_LE(seq, snap_seq); | |
2015 | // Valid snapshot lower than max | |
2016 | ASSERT_LE(snap_seq, wp_db->max_evicted_seq_); | |
2017 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2018 | ASSERT_FALSE(released); | |
2019 | ||
2020 | released = false; | |
2021 | snap_seq = snap1->GetSequenceNumber(); | |
2022 | // Invaid snapshot lower than max | |
2023 | ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_); | |
2024 | ASSERT_TRUE( | |
2025 | wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); | |
2026 | ASSERT_TRUE(released); | |
2027 | ||
2028 | db->ReleaseSnapshot(snap1); | |
2029 | ||
2030 | released = false; | |
2031 | // Released snapshot lower than max | |
2032 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2033 | // The release does not take affect until the next max advance | |
2034 | ASSERT_FALSE(released); | |
2035 | ||
2036 | released = false; | |
2037 | // Invaid snapshot lower than max | |
2038 | ASSERT_TRUE( | |
2039 | wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); | |
2040 | ASSERT_TRUE(released); | |
2041 | ||
2042 | // This make the snapshot release to reflect in txn db structures | |
2043 | wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_, | |
2044 | wp_db->max_evicted_seq_ + 1); | |
2045 | ||
2046 | released = false; | |
2047 | // Released snapshot lower than max | |
2048 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2049 | ASSERT_TRUE(released); | |
2050 | ||
2051 | released = false; | |
2052 | // Invaid snapshot lower than max | |
2053 | ASSERT_TRUE( | |
2054 | wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released)); | |
2055 | ASSERT_TRUE(released); | |
2056 | ||
2057 | snap_seq = snap2->GetSequenceNumber(); | |
2058 | ||
2059 | released = false; | |
2060 | // Unreleased snapshot lower than max | |
2061 | ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released)); | |
2062 | ASSERT_FALSE(released); | |
2063 | ||
2064 | db->ReleaseSnapshot(snap2); | |
11fdf7f2 TL |
2065 | } |
2066 | ||
2067 | // Test WritePreparedTxnDB's IsInSnapshot against different ordering of | |
2068 | // snapshot, max_committed_seq_, prepared, and commit entries. | |
f67539c2 | 2069 | TEST_P(WritePreparedTransactionTest, IsInSnapshot) { |
11fdf7f2 TL |
2070 | WriteOptions wo; |
2071 | // Use small commit cache to trigger lots of eviction and fast advance of | |
2072 | // max_evicted_seq_ | |
2073 | const size_t commit_cache_bits = 3; | |
2074 | // Same for snapshot cache size | |
2075 | const size_t snapshot_cache_bits = 2; | |
2076 | ||
2077 | // Take some preliminary snapshots first. This is to stress the data structure | |
2078 | // that holds the old snapshots as it will be designed to be efficient when | |
2079 | // only a few snapshots are below the max_evicted_seq_. | |
2080 | for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) { | |
2081 | // Leave some gap between the preliminary snapshots and the final snapshot | |
2082 | // that we check. This should test for also different overlapping scenarios | |
2083 | // between the last snapshot and the commits. | |
2084 | for (int max_gap = 1; max_gap < 10; max_gap++) { | |
2085 | // Since we do not actually write to db, we mock the seq as it would be | |
2086 | // increased by the db. The only exception is that we need db seq to | |
2087 | // advance for our snapshots. for which we apply a dummy put each time we | |
2088 | // increase our mock of seq. | |
2089 | uint64_t seq = 0; | |
2090 | // At each step we prepare a txn and then we commit it in the next txn. | |
2091 | // This emulates the consecutive transactions that write to the same key | |
2092 | uint64_t cur_txn = 0; | |
2093 | // Number of snapshots taken so far | |
2094 | int num_snapshots = 0; | |
2095 | // Number of gaps applied so far | |
2096 | int gap_cnt = 0; | |
2097 | // The final snapshot that we will inspect | |
2098 | uint64_t snapshot = 0; | |
2099 | bool found_committed = false; | |
2100 | // To stress the data structure that maintain prepared txns, at each cycle | |
2101 | // we add a new prepare txn. These do not mean to be committed for | |
2102 | // snapshot inspection. | |
2103 | std::set<uint64_t> prepared; | |
2104 | // We keep the list of txns committed before we take the last snapshot. | |
2105 | // These should be the only seq numbers that will be found in the snapshot | |
2106 | std::set<uint64_t> committed_before; | |
2107 | // The set of commit seq numbers to be excluded from IsInSnapshot queries | |
2108 | std::set<uint64_t> commit_seqs; | |
2109 | DBImpl* mock_db = new DBImpl(options, dbname); | |
494da23a TL |
2110 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); |
2111 | std::unique_ptr<WritePreparedTxnDBMock> wp_db( | |
2112 | new WritePreparedTxnDBMock(mock_db, txn_db_options)); | |
11fdf7f2 TL |
2113 | // We continue until max advances a bit beyond the snapshot. |
2114 | while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { | |
2115 | // do prepare for a transaction | |
2116 | seq++; | |
2117 | wp_db->AddPrepared(seq); | |
2118 | prepared.insert(seq); | |
2119 | ||
2120 | // If cur_txn is not started, do prepare for it. | |
2121 | if (!cur_txn) { | |
2122 | seq++; | |
2123 | cur_txn = seq; | |
2124 | wp_db->AddPrepared(cur_txn); | |
1e59de90 | 2125 | } else { // else commit it |
11fdf7f2 TL |
2126 | seq++; |
2127 | wp_db->AddCommitted(cur_txn, seq); | |
2128 | wp_db->RemovePrepared(cur_txn); | |
2129 | commit_seqs.insert(seq); | |
2130 | if (!snapshot) { | |
2131 | committed_before.insert(cur_txn); | |
2132 | } | |
2133 | cur_txn = 0; | |
2134 | } | |
2135 | ||
2136 | if (num_snapshots < max_snapshots - 1) { | |
2137 | // Take preliminary snapshots | |
2138 | wp_db->TakeSnapshot(seq); | |
2139 | num_snapshots++; | |
2140 | } else if (gap_cnt < max_gap) { | |
2141 | // Wait for some gap before taking the final snapshot | |
2142 | gap_cnt++; | |
2143 | } else if (!snapshot) { | |
2144 | // Take the final snapshot if it is not already taken | |
2145 | snapshot = seq; | |
2146 | wp_db->TakeSnapshot(snapshot); | |
2147 | num_snapshots++; | |
2148 | } | |
2149 | ||
2150 | // If the snapshot is taken, verify seq numbers visible to it. We redo | |
2151 | // it at each cycle to test that the system is still sound when | |
2152 | // max_evicted_seq_ advances. | |
2153 | if (snapshot) { | |
2154 | for (uint64_t s = 1; | |
2155 | s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) { | |
2156 | bool was_committed = | |
2157 | (committed_before.find(s) != committed_before.end()); | |
2158 | bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot); | |
2159 | if (was_committed != is_in_snapshot) { | |
2160 | printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64 | |
2161 | " snapshot %" PRIu64 | |
2162 | " gap_cnt %d num_snapshots %d s %" PRIu64 "\n", | |
2163 | max_snapshots, max_gap, seq, | |
2164 | wp_db->max_evicted_seq_.load(), snapshot, gap_cnt, | |
2165 | num_snapshots, s); | |
2166 | } | |
2167 | ASSERT_EQ(was_committed, is_in_snapshot); | |
2168 | found_committed = found_committed || is_in_snapshot; | |
2169 | } | |
2170 | } | |
2171 | } | |
2172 | // Safety check to make sure the test actually ran | |
2173 | ASSERT_TRUE(found_committed); | |
2174 | // As an extra check, check if prepared set will be properly empty after | |
2175 | // they are committed. | |
2176 | if (cur_txn) { | |
2177 | wp_db->AddCommitted(cur_txn, seq); | |
2178 | wp_db->RemovePrepared(cur_txn); | |
2179 | } | |
2180 | for (auto p : prepared) { | |
2181 | wp_db->AddCommitted(p, seq); | |
2182 | wp_db->RemovePrepared(p); | |
2183 | } | |
2184 | ASSERT_TRUE(wp_db->delayed_prepared_.empty()); | |
2185 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
2186 | } | |
2187 | } | |
2188 | } | |
2189 | ||
2190 | void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s, | |
2191 | PinnableSlice& exp_v, Slice key) { | |
2192 | Status s; | |
2193 | PinnableSlice v; | |
2194 | s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); | |
1e59de90 | 2195 | ASSERT_EQ(exp_s, s); |
11fdf7f2 TL |
2196 | ASSERT_TRUE(s.ok() || s.IsNotFound()); |
2197 | if (s.ok()) { | |
2198 | ASSERT_TRUE(exp_v == v); | |
2199 | } | |
2200 | ||
2201 | // Try with MultiGet API too | |
2202 | std::vector<std::string> values; | |
2203 | auto s_vec = | |
2204 | db->MultiGet(roptions, {db->DefaultColumnFamily()}, {key}, &values); | |
2205 | ASSERT_EQ(1, values.size()); | |
2206 | ASSERT_EQ(1, s_vec.size()); | |
2207 | s = s_vec[0]; | |
1e59de90 | 2208 | ASSERT_EQ(exp_s, s); |
11fdf7f2 TL |
2209 | ASSERT_TRUE(s.ok() || s.IsNotFound()); |
2210 | if (s.ok()) { | |
2211 | ASSERT_TRUE(exp_v == values[0]); | |
2212 | } | |
2213 | } | |
2214 | ||
2215 | void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, | |
2216 | Slice key) { | |
2217 | ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key); | |
2218 | } | |
2219 | ||
f67539c2 | 2220 | TEST_P(WritePreparedTransactionTest, Rollback) { |
11fdf7f2 TL |
2221 | ReadOptions roptions; |
2222 | WriteOptions woptions; | |
2223 | TransactionOptions txn_options; | |
2224 | const size_t num_keys = 4; | |
2225 | const size_t num_values = 5; | |
2226 | for (size_t ikey = 1; ikey <= num_keys; ikey++) { | |
2227 | for (size_t ivalue = 0; ivalue < num_values; ivalue++) { | |
2228 | for (bool crash : {false, true}) { | |
1e59de90 | 2229 | ASSERT_OK(ReOpen()); |
11fdf7f2 | 2230 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
1e59de90 | 2231 | std::string key_str = "key" + std::to_string(ikey); |
11fdf7f2 TL |
2232 | switch (ivalue) { |
2233 | case 0: | |
2234 | break; | |
2235 | case 1: | |
2236 | ASSERT_OK(db->Put(woptions, key_str, "initvalue1")); | |
2237 | break; | |
2238 | case 2: | |
2239 | ASSERT_OK(db->Merge(woptions, key_str, "initvalue2")); | |
2240 | break; | |
2241 | case 3: | |
2242 | ASSERT_OK(db->Delete(woptions, key_str)); | |
2243 | break; | |
2244 | case 4: | |
2245 | ASSERT_OK(db->SingleDelete(woptions, key_str)); | |
2246 | break; | |
2247 | default: | |
1e59de90 | 2248 | FAIL(); |
11fdf7f2 TL |
2249 | } |
2250 | ||
2251 | PinnableSlice v1; | |
2252 | auto s1 = | |
2253 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1); | |
2254 | PinnableSlice v2; | |
2255 | auto s2 = | |
2256 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2); | |
2257 | PinnableSlice v3; | |
2258 | auto s3 = | |
2259 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3); | |
2260 | PinnableSlice v4; | |
2261 | auto s4 = | |
2262 | db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4); | |
2263 | Transaction* txn = db->BeginTransaction(woptions, txn_options); | |
2264 | auto s = txn->SetName("xid0"); | |
2265 | ASSERT_OK(s); | |
2266 | s = txn->Put(Slice("key1"), Slice("value1")); | |
2267 | ASSERT_OK(s); | |
2268 | s = txn->Merge(Slice("key2"), Slice("value2")); | |
2269 | ASSERT_OK(s); | |
2270 | s = txn->Delete(Slice("key3")); | |
2271 | ASSERT_OK(s); | |
2272 | s = txn->SingleDelete(Slice("key4")); | |
2273 | ASSERT_OK(s); | |
2274 | s = txn->Prepare(); | |
2275 | ASSERT_OK(s); | |
2276 | ||
2277 | { | |
2278 | ReadLock rl(&wp_db->prepared_mutex_); | |
2279 | ASSERT_FALSE(wp_db->prepared_txns_.empty()); | |
2280 | ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top()); | |
2281 | } | |
2282 | ||
2283 | ASSERT_SAME(db, s1, v1, "key1"); | |
2284 | ASSERT_SAME(db, s2, v2, "key2"); | |
2285 | ASSERT_SAME(db, s3, v3, "key3"); | |
2286 | ASSERT_SAME(db, s4, v4, "key4"); | |
2287 | ||
2288 | if (crash) { | |
2289 | delete txn; | |
20effc67 | 2290 | auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
1e59de90 | 2291 | ASSERT_OK(db_impl->FlushWAL(true)); |
11fdf7f2 | 2292 | dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash(); |
1e59de90 TL |
2293 | ASSERT_OK(ReOpenNoDelete()); |
2294 | ASSERT_NE(db, nullptr); | |
11fdf7f2 TL |
2295 | wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
2296 | txn = db->GetTransactionByName("xid0"); | |
2297 | ASSERT_FALSE(wp_db->delayed_prepared_empty_); | |
2298 | ReadLock rl(&wp_db->prepared_mutex_); | |
2299 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
2300 | ASSERT_FALSE(wp_db->delayed_prepared_.empty()); | |
2301 | ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) != | |
2302 | wp_db->delayed_prepared_.end()); | |
2303 | } | |
2304 | ||
2305 | ASSERT_SAME(db, s1, v1, "key1"); | |
2306 | ASSERT_SAME(db, s2, v2, "key2"); | |
2307 | ASSERT_SAME(db, s3, v3, "key3"); | |
2308 | ASSERT_SAME(db, s4, v4, "key4"); | |
2309 | ||
2310 | s = txn->Rollback(); | |
2311 | ASSERT_OK(s); | |
2312 | ||
2313 | { | |
2314 | ASSERT_TRUE(wp_db->delayed_prepared_empty_); | |
2315 | ReadLock rl(&wp_db->prepared_mutex_); | |
2316 | ASSERT_TRUE(wp_db->prepared_txns_.empty()); | |
2317 | ASSERT_TRUE(wp_db->delayed_prepared_.empty()); | |
2318 | } | |
2319 | ||
2320 | ASSERT_SAME(db, s1, v1, "key1"); | |
2321 | ASSERT_SAME(db, s2, v2, "key2"); | |
2322 | ASSERT_SAME(db, s3, v3, "key3"); | |
2323 | ASSERT_SAME(db, s4, v4, "key4"); | |
2324 | delete txn; | |
2325 | } | |
2326 | } | |
2327 | } | |
2328 | } | |
2329 | ||
f67539c2 | 2330 | TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) { |
11fdf7f2 TL |
2331 | // Use large buffer to avoid memtable flush after 1024 insertions |
2332 | options.write_buffer_size = 1024 * 1024; | |
1e59de90 | 2333 | ASSERT_OK(ReOpen()); |
11fdf7f2 TL |
2334 | std::vector<KeyVersion> versions; |
2335 | uint64_t seq = 0; | |
2336 | for (uint64_t i = 1; i <= 1024; i++) { | |
1e59de90 | 2337 | std::string v = "bar" + std::to_string(i); |
11fdf7f2 TL |
2338 | ASSERT_OK(db->Put(WriteOptions(), "foo", v)); |
2339 | VerifyKeys({{"foo", v}}); | |
2340 | seq++; // one for the key/value | |
2341 | KeyVersion kv = {"foo", v, seq, kTypeValue}; | |
2342 | if (options.two_write_queues) { | |
2343 | seq++; // one for the commit | |
2344 | } | |
2345 | versions.emplace_back(kv); | |
2346 | } | |
2347 | std::reverse(std::begin(versions), std::end(versions)); | |
2348 | VerifyInternalKeys(versions); | |
20effc67 | 2349 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
1e59de90 | 2350 | ASSERT_OK(db_impl->FlushWAL(true)); |
11fdf7f2 TL |
2351 | // Use small buffer to ensure memtable flush during recovery |
2352 | options.write_buffer_size = 1024; | |
1e59de90 | 2353 | ASSERT_OK(ReOpenNoDelete()); |
11fdf7f2 TL |
2354 | VerifyInternalKeys(versions); |
2355 | } | |
2356 | ||
f67539c2 | 2357 | TEST_P(WritePreparedTransactionTest, SequenceNumberZero) { |
11fdf7f2 TL |
2358 | ASSERT_OK(db->Put(WriteOptions(), "foo", "bar")); |
2359 | VerifyKeys({{"foo", "bar"}}); | |
2360 | const Snapshot* snapshot = db->GetSnapshot(); | |
2361 | ASSERT_OK(db->Flush(FlushOptions())); | |
2362 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2363 | // compaction logic. | |
2364 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2365 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2366 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2367 | // Compaction will output keys with sequence number 0, if it is visible to | |
2368 | // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is | |
2369 | // visible to any snapshot. | |
2370 | VerifyKeys({{"foo", "bar"}}); | |
2371 | VerifyKeys({{"foo", "bar"}}, snapshot); | |
2372 | VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}}); | |
2373 | db->ReleaseSnapshot(snapshot); | |
2374 | } | |
2375 | ||
2376 | // Compaction should not remove a key if it is not committed, and should | |
2377 | // proceed with older versions of the key as-if the new version doesn't exist. | |
2378 | TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { | |
2379 | options.disable_auto_compactions = true; | |
1e59de90 | 2380 | ASSERT_OK(ReOpen()); |
20effc67 | 2381 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 TL |
2382 | // Snapshots to avoid keys get evicted. |
2383 | std::vector<const Snapshot*> snapshots; | |
2384 | // Keep track of expected sequence number. | |
2385 | SequenceNumber expected_seq = 0; | |
2386 | ||
2387 | auto add_key = [&](std::function<Status()> func) { | |
2388 | ASSERT_OK(func()); | |
2389 | expected_seq++; | |
2390 | if (options.two_write_queues) { | |
2391 | expected_seq++; // 1 for commit | |
2392 | } | |
2393 | ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); | |
2394 | snapshots.push_back(db->GetSnapshot()); | |
2395 | }; | |
2396 | ||
2397 | // Each key here represent a standalone test case. | |
2398 | add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); }); | |
2399 | add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); }); | |
2400 | add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); }); | |
2401 | add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); }); | |
2402 | add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); }); | |
2403 | add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); }); | |
2404 | add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); }); | |
2405 | add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); }); | |
2406 | ASSERT_OK(db->Flush(FlushOptions())); | |
2407 | add_key([&]() { return db->Delete(WriteOptions(), "key6"); }); | |
2408 | add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); }); | |
2409 | ||
2410 | auto* transaction = db->BeginTransaction(WriteOptions()); | |
2411 | ASSERT_OK(transaction->SetName("txn")); | |
2412 | ASSERT_OK(transaction->Put("key1", "value1_2")); | |
2413 | ASSERT_OK(transaction->Delete("key2")); | |
2414 | ASSERT_OK(transaction->SingleDelete("key3")); | |
2415 | ASSERT_OK(transaction->Merge("key4", "value4_2")); | |
2416 | ASSERT_OK(transaction->Merge("key5", "value5_3")); | |
2417 | ASSERT_OK(transaction->Put("key6", "value6_2")); | |
2418 | ASSERT_OK(transaction->Put("key7", "value7_2")); | |
2419 | // Prepare but not commit. | |
2420 | ASSERT_OK(transaction->Prepare()); | |
2421 | ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); | |
2422 | ASSERT_OK(db->Flush(FlushOptions())); | |
2423 | for (auto* s : snapshots) { | |
2424 | db->ReleaseSnapshot(s); | |
2425 | } | |
2426 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2427 | // compaction logic. | |
2428 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2429 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2430 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2431 | VerifyKeys({ | |
2432 | {"key1", "value1_1"}, | |
2433 | {"key2", "value2_1"}, | |
2434 | {"key3", "value3_1"}, | |
2435 | {"key4", "value4_1"}, | |
2436 | {"key5", "value5_1,value5_2"}, | |
2437 | {"key6", "NOT_FOUND"}, | |
2438 | {"key7", "NOT_FOUND"}, | |
2439 | }); | |
2440 | VerifyInternalKeys({ | |
2441 | {"key1", "value1_2", expected_seq, kTypeValue}, | |
2442 | {"key1", "value1_1", 0, kTypeValue}, | |
2443 | {"key2", "", expected_seq, kTypeDeletion}, | |
2444 | {"key2", "value2_1", 0, kTypeValue}, | |
2445 | {"key3", "", expected_seq, kTypeSingleDeletion}, | |
2446 | {"key3", "value3_1", 0, kTypeValue}, | |
2447 | {"key4", "value4_2", expected_seq, kTypeMerge}, | |
2448 | {"key4", "value4_1", 0, kTypeValue}, | |
2449 | {"key5", "value5_3", expected_seq, kTypeMerge}, | |
2450 | {"key5", "value5_1,value5_2", 0, kTypeValue}, | |
2451 | {"key6", "value6_2", expected_seq, kTypeValue}, | |
2452 | {"key7", "value7_2", expected_seq, kTypeValue}, | |
2453 | }); | |
2454 | ASSERT_OK(transaction->Commit()); | |
2455 | VerifyKeys({ | |
2456 | {"key1", "value1_2"}, | |
2457 | {"key2", "NOT_FOUND"}, | |
2458 | {"key3", "NOT_FOUND"}, | |
2459 | {"key4", "value4_1,value4_2"}, | |
2460 | {"key5", "value5_1,value5_2,value5_3"}, | |
2461 | {"key6", "value6_2"}, | |
2462 | {"key7", "value7_2"}, | |
2463 | }); | |
2464 | delete transaction; | |
2465 | } | |
2466 | ||
2467 | // Compaction should keep keys visible to a snapshot based on commit sequence, | |
2468 | // not just prepare sequence. | |
2469 | TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { | |
2470 | options.disable_auto_compactions = true; | |
1e59de90 | 2471 | ASSERT_OK(ReOpen()); |
11fdf7f2 TL |
2472 | // Keep track of expected sequence number. |
2473 | SequenceNumber expected_seq = 0; | |
2474 | auto* txn1 = db->BeginTransaction(WriteOptions()); | |
2475 | ASSERT_OK(txn1->SetName("txn1")); | |
2476 | ASSERT_OK(txn1->Put("key1", "value1_1")); | |
2477 | ASSERT_OK(txn1->Prepare()); | |
2478 | ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); | |
2479 | ASSERT_OK(txn1->Commit()); | |
20effc67 | 2480 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 TL |
2481 | ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence()); |
2482 | delete txn1; | |
2483 | // Take a snapshots to avoid keys get evicted before compaction. | |
2484 | const Snapshot* snapshot1 = db->GetSnapshot(); | |
2485 | auto* txn2 = db->BeginTransaction(WriteOptions()); | |
2486 | ASSERT_OK(txn2->SetName("txn2")); | |
2487 | ASSERT_OK(txn2->Put("key2", "value2_1")); | |
2488 | ASSERT_OK(txn2->Prepare()); | |
2489 | ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); | |
2490 | // txn1 commit before snapshot2 and it is visible to snapshot2. | |
2491 | // txn2 commit after snapshot2 and it is not visible. | |
2492 | const Snapshot* snapshot2 = db->GetSnapshot(); | |
2493 | ASSERT_OK(txn2->Commit()); | |
2494 | ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence()); | |
2495 | delete txn2; | |
2496 | // Take a snapshots to avoid keys get evicted before compaction. | |
2497 | const Snapshot* snapshot3 = db->GetSnapshot(); | |
2498 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); | |
2499 | expected_seq++; // 1 for write | |
2500 | SequenceNumber seq1 = expected_seq; | |
2501 | if (options.two_write_queues) { | |
2502 | expected_seq++; // 1 for commit | |
2503 | } | |
2504 | ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); | |
2505 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); | |
2506 | expected_seq++; // 1 for write | |
2507 | SequenceNumber seq2 = expected_seq; | |
2508 | if (options.two_write_queues) { | |
2509 | expected_seq++; // 1 for commit | |
2510 | } | |
2511 | ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); | |
2512 | ASSERT_OK(db->Flush(FlushOptions())); | |
2513 | db->ReleaseSnapshot(snapshot1); | |
2514 | db->ReleaseSnapshot(snapshot3); | |
2515 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2516 | // compaction logic. | |
2517 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2518 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2519 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2520 | VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}}); | |
2521 | VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2); | |
2522 | VerifyInternalKeys({ | |
2523 | {"key1", "value1_2", seq1, kTypeValue}, | |
2524 | // "value1_1" is visible to snapshot2. Also keys at bottom level visible | |
2525 | // to earliest snapshot will output with seq = 0. | |
2526 | {"key1", "value1_1", 0, kTypeValue}, | |
2527 | {"key2", "value2_2", seq2, kTypeValue}, | |
2528 | }); | |
2529 | db->ReleaseSnapshot(snapshot2); | |
2530 | } | |
2531 | ||
494da23a TL |
2532 | TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { |
2533 | const size_t snapshot_cache_bits = 7; // same as default | |
2534 | const size_t commit_cache_bits = 0; // disable commit cache | |
2535 | for (bool has_recent_prepare : {true, false}) { | |
2536 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 2537 | ASSERT_OK(ReOpen()); |
494da23a TL |
2538 | |
2539 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); | |
2540 | auto* transaction = | |
2541 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2542 | ASSERT_OK(transaction->SetName("txn")); | |
2543 | ASSERT_OK(transaction->Delete("key1")); | |
2544 | ASSERT_OK(transaction->Prepare()); | |
2545 | // snapshot1 should get min_uncommitted from prepared_txns_ heap. | |
2546 | auto snapshot1 = db->GetSnapshot(); | |
2547 | ASSERT_EQ(transaction->GetId(), | |
2548 | ((SnapshotImpl*)snapshot1)->min_uncommitted_); | |
2549 | // Add a commit to advance max_evicted_seq and move the prepared transaction | |
2550 | // into delayed_prepared_ set. | |
2551 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
2552 | Transaction* txn2 = nullptr; | |
2553 | if (has_recent_prepare) { | |
2554 | txn2 = | |
2555 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2556 | ASSERT_OK(txn2->SetName("txn2")); | |
2557 | ASSERT_OK(txn2->Put("key3", "value3")); | |
2558 | ASSERT_OK(txn2->Prepare()); | |
2559 | } | |
2560 | // snapshot2 should get min_uncommitted from delayed_prepared_ set. | |
2561 | auto snapshot2 = db->GetSnapshot(); | |
2562 | ASSERT_EQ(transaction->GetId(), | |
2563 | ((SnapshotImpl*)snapshot1)->min_uncommitted_); | |
2564 | ASSERT_OK(transaction->Commit()); | |
2565 | delete transaction; | |
2566 | if (has_recent_prepare) { | |
2567 | ASSERT_OK(txn2->Commit()); | |
2568 | delete txn2; | |
2569 | } | |
2570 | VerifyKeys({{"key1", "NOT_FOUND"}}); | |
2571 | VerifyKeys({{"key1", "value1"}}, snapshot1); | |
2572 | VerifyKeys({{"key1", "value1"}}, snapshot2); | |
2573 | db->ReleaseSnapshot(snapshot1); | |
2574 | db->ReleaseSnapshot(snapshot2); | |
2575 | } | |
2576 | } | |
2577 | ||
2578 | // Insert two values, v1 and v2, for a key. Between prepare and commit of v2 | |
2579 | // take two snapshots, s1 and s2. Release s1 during compaction. | |
2580 | // Test to make sure compaction doesn't get confused and think s1 can see both | |
2581 | // values, and thus compact out the older value by mistake. | |
2582 | TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { | |
2583 | const size_t snapshot_cache_bits = 7; // same as default | |
2584 | const size_t commit_cache_bits = 0; // minimum commit cache | |
2585 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 TL |
2586 | options.disable_auto_compactions = true; |
2587 | ASSERT_OK(ReOpen()); | |
494da23a TL |
2588 | |
2589 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1")); | |
2590 | auto* transaction = | |
2591 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2592 | ASSERT_OK(transaction->SetName("txn")); | |
2593 | ASSERT_OK(transaction->Put("key1", "value1_2")); | |
2594 | ASSERT_OK(transaction->Prepare()); | |
2595 | auto snapshot1 = db->GetSnapshot(); | |
2596 | // Increment sequence number. | |
2597 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
2598 | auto snapshot2 = db->GetSnapshot(); | |
2599 | ASSERT_OK(transaction->Commit()); | |
2600 | delete transaction; | |
2601 | VerifyKeys({{"key1", "value1_2"}}); | |
2602 | VerifyKeys({{"key1", "value1_1"}}, snapshot1); | |
2603 | VerifyKeys({{"key1", "value1_1"}}, snapshot2); | |
2604 | // Add a flush to avoid compaction to fallback to trivial move. | |
2605 | ||
1e59de90 TL |
2606 | // The callback might be called twice, record the calling state to |
2607 | // prevent double calling. | |
2608 | bool callback_finished = false; | |
494da23a | 2609 | auto callback = [&](void*) { |
1e59de90 TL |
2610 | if (callback_finished) { |
2611 | return; | |
2612 | } | |
494da23a TL |
2613 | // Release snapshot1 after CompactionIterator init. |
2614 | // CompactionIterator need to figure out the earliest snapshot | |
2615 | // that can see key1:value1_2 is kMaxSequenceNumber, not | |
2616 | // snapshot1 or snapshot2. | |
2617 | db->ReleaseSnapshot(snapshot1); | |
2618 | // Add some keys to advance max_evicted_seq. | |
2619 | ASSERT_OK(db->Put(WriteOptions(), "key3", "value3")); | |
2620 | ASSERT_OK(db->Put(WriteOptions(), "key4", "value4")); | |
1e59de90 | 2621 | callback_finished = true; |
494da23a TL |
2622 | }; |
2623 | SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", | |
2624 | callback); | |
2625 | SyncPoint::GetInstance()->EnableProcessing(); | |
2626 | ||
2627 | ASSERT_OK(db->Flush(FlushOptions())); | |
2628 | VerifyKeys({{"key1", "value1_2"}}); | |
2629 | VerifyKeys({{"key1", "value1_1"}}, snapshot2); | |
2630 | db->ReleaseSnapshot(snapshot2); | |
2631 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2632 | } | |
2633 | ||
2634 | // Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2, | |
2635 | // after committing v2. Release s1 during compaction, right after compaction | |
2636 | // processes v2 and before processes v1. Test to make sure compaction doesn't | |
2637 | // get confused and believe v1 and v2 are visible to different snapshot | |
2638 | // (v1 by s2, v2 by s1) and refuse to compact out v1. | |
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  // No automatic compactions: the CompactionIterator exercised by this test is
  // the one run by the explicit Flush() below.
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // v1 ("value1") and v2 ("value2") of "key1"; v2_seq is v2's sequence.
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
  SequenceNumber v2_seq = db->GetLatestSequenceNumber();
  // s1 is taken right after v2 is written.
  auto* s1 = db->GetSnapshot();
  // Advance sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
  // s2 is taken after the dummy write.
  auto* s2 = db->GetSnapshot();

  // Number of "key1" entries the iterator has processed so far. The second
  // one is v1 (v2 is processed first).
  int count_value = 0;
  auto callback = [&](void* arg) {
    auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
    if (ikey->user_key == "key1") {
      count_value++;
      if (count_value == 2) {
        // Processing v1.
        db->ReleaseSnapshot(s1);
        // Add some keys to advance max_evicted_seq and update
        // old_commit_map.
        ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
        ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
      }
    }
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compacted out; only v2 remains.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});

  // cleanup
  // NOTE(review): s1 is released only inside the callback; if the sync point
  // never reaches v1, s1 is not released by this test.
  db->ReleaseSnapshot(s2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2681 | ||
// Insert two values, v1 and v2, for a key. Insert another dummy key
// so as to evict the commit cache entry for v2, while v1 is still in the
// commit cache. Take two snapshots, s1 and s2. Release s1 during compaction.
// Since the commit cache entry for v2 is evicted, and old_commit_map doesn't
// have s1 (it is released), the test checks that compaction still drops v1,
// leaving only v2.
// TODO(myabandeh): how can we be sure that the v2's commit info is evicted
// (and not v1's)? Instead of putting a dummy, we can directly call
// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // commit cache size = 2
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
  // It also advances max_evicted_seq and can trigger old_commit_map cleanup.
  // The dummy write goes through a full SetName/Put/Prepare/Commit cycle.
  // NOTE(review): if an ASSERT_OK here fails, the lambda returns early and
  // txn_dummy is not deleted.
  auto add_dummy = [&]() {
    auto* txn_dummy =
        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
    ASSERT_OK(txn_dummy->SetName("txn_dummy"));
    ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
    ASSERT_OK(txn_dummy->Prepare());
    ASSERT_OK(txn_dummy->Commit());
    delete txn_dummy;
  };

  // v1 is a plain Put; v2 is written by a prepared-then-committed transaction.
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* txn =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());
  // TODO(myabandeh): replace it with GetId()?
  auto v2_seq = db->GetLatestSequenceNumber();
  ASSERT_OK(txn->Commit());
  delete txn;
  auto* s1 = db->GetSnapshot();
  // Dummy key to advance sequence number.
  add_dummy();
  auto* s2 = db->GetSnapshot();

  // The callback might be called twice, record the calling state to
  // prevent double calling.
  bool callback_finished = false;
  auto callback = [&](void*) {
    if (callback_finished) {
      return;
    }
    db->ReleaseSnapshot(s1);
    // Add some dummy entries to trigger s1 being cleaned up from
    // old_commit_map.
    add_dummy();
    add_dummy();
    callback_finished = true;
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compacted out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});

  db->ReleaseSnapshot(s2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2748 | ||
2749 | TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { | |
2750 | const size_t snapshot_cache_bits = 7; // same as default | |
2751 | const size_t commit_cache_bits = 0; // minimum commit cache | |
2752 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 TL |
2753 | options.disable_auto_compactions = true; |
2754 | ASSERT_OK(ReOpen()); | |
494da23a TL |
2755 | |
2756 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); | |
1e59de90 | 2757 | SequenceNumber put_seq = db->GetLatestSequenceNumber(); |
494da23a TL |
2758 | auto* transaction = |
2759 | db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); | |
2760 | ASSERT_OK(transaction->SetName("txn")); | |
2761 | ASSERT_OK(transaction->Delete("key1")); | |
2762 | ASSERT_OK(transaction->Prepare()); | |
2763 | SequenceNumber del_seq = db->GetLatestSequenceNumber(); | |
2764 | auto snapshot1 = db->GetSnapshot(); | |
2765 | // Increment sequence number. | |
2766 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
2767 | auto snapshot2 = db->GetSnapshot(); | |
2768 | ASSERT_OK(transaction->Commit()); | |
2769 | delete transaction; | |
2770 | VerifyKeys({{"key1", "NOT_FOUND"}}); | |
2771 | VerifyKeys({{"key1", "value1"}}, snapshot1); | |
2772 | VerifyKeys({{"key1", "value1"}}, snapshot2); | |
2773 | ASSERT_OK(db->Flush(FlushOptions())); | |
2774 | ||
2775 | auto callback = [&](void* compaction) { | |
2776 | // Release snapshot1 after CompactionIterator init. | |
2777 | // CompactionIterator need to double check and find out snapshot2 is now | |
2778 | // the earliest existing snapshot. | |
2779 | if (compaction != nullptr) { | |
2780 | db->ReleaseSnapshot(snapshot1); | |
2781 | // Add some keys to advance max_evicted_seq. | |
2782 | ASSERT_OK(db->Put(WriteOptions(), "key3", "value3")); | |
2783 | ASSERT_OK(db->Put(WriteOptions(), "key4", "value4")); | |
2784 | } | |
2785 | }; | |
2786 | SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", | |
2787 | callback); | |
2788 | SyncPoint::GetInstance()->EnableProcessing(); | |
2789 | ||
2790 | // Dummy keys to avoid compaction trivially move files and get around actual | |
2791 | // compaction logic. | |
2792 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
2793 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
2794 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
2795 | // Only verify for key1. Both the put and delete for the key should be kept. | |
2796 | // Since the delete tombstone is not visible to snapshot2, we need to keep | |
2797 | // at least one version of the key, for write-conflict check. | |
2798 | VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion}, | |
1e59de90 | 2799 | {"key1", "value1", put_seq, kTypeValue}}); |
494da23a TL |
2800 | db->ReleaseSnapshot(snapshot2); |
2801 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2802 | } | |
2803 | ||
1e59de90 TL |
// "key" has a Put in the first SST. A prepared transaction SingleDeletes it,
// and that is flushed into a second SST. `snapshot` is taken before the
// transaction commits. When CompactRange() hits the SingleDelete:1 sync point
// with a non-null argument, `snapshot` is released and max_evicted_seq is
// advanced; the compaction must still succeed.
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestSnapshotDuringCompaction_WithSD) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // First SST: plain Puts for "key" and "foo".
  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Second SST: the prepared (not yet committed) SingleDelete of "key".
  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                   /*old_txn=*/nullptr);
  ASSERT_OK(txn->SingleDelete("key"));
  ASSERT_OK(txn->Put("wow", "value"));
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(db->Flush(FlushOptions()));

  const bool two_write_queues = std::get<1>(GetParam());
  if (two_write_queues) {
    // In the case of two queues, commit another txn just to bump
    // last_published_seq so that a subsequent GetSnapshot() call can return
    // a snapshot with higher sequence.
    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                           /*old_txn=*/nullptr);
    ASSERT_OK(dummy_txn->Put("haha", "value"));
    ASSERT_OK(dummy_txn->Commit());
    delete dummy_txn;
  }
  // Taken before txn commits.
  auto* snapshot = db->GetSnapshot();

  ASSERT_OK(txn->Commit());
  delete txn;

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
        // Ignore invocations with a null argument.
        if (!arg) {
          return;
        }
        db->ReleaseSnapshot(snapshot);

        // Advance max_evicted_seq
        ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2855 | ||
// "key" has a Put in the first SST and a SingleDelete (from the prepared
// transaction "txn") in the second SST. A write-conflict snapshot is held by
// dummy_txn. When CompactRange() hits the SingleDelete:2 sync point, dummy_txn
// is rolled back (dropping that snapshot) and another write is issued; the
// compaction must still succeed.
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // First SST: plain Puts.
  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Second SST: the prepared SingleDelete of "key".
  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                   /*old_txn=*/nullptr);
  ASSERT_OK(txn->Put("bar", "value"));
  ASSERT_OK(txn->SingleDelete("key"));
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(db->Flush(FlushOptions()));

  ASSERT_OK(txn->Commit());
  delete txn;

  ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));

  // Create a dummy transaction to take a snapshot for ww-conflict detection.
  TransactionOptions txn_opts;
  txn_opts.set_snapshot = true;
  auto* dummy_txn =
      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);

  // NOTE(review): dummy_txn is deleted only inside this callback. If the sync
  // point never fires it leaks; if it fires more than once it would be
  // deleted twice.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
        ASSERT_OK(dummy_txn->Rollback());
        delete dummy_txn;

        ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
  auto* snapshot = db->GetSnapshot();

  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  db->ReleaseSnapshot(snapshot);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2903 | ||
// "b" exists in an SST and is then deleted by a prepared transaction.
// snapshot1 is taken before the transaction commits and snapshot2 after.
// When CompactRange() hits the BottommostDelete:1 sync point with a non-null
// argument, snapshot1 is released and max_evicted_seq is advanced; the
// compaction must still succeed.
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Prepared (not yet committed) Delete of "b".
  auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                   /*old_txn=*/nullptr);
  ASSERT_OK(txn->Delete("b"));
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Prepare());

  const bool two_write_queues = std::get<1>(GetParam());
  if (two_write_queues) {
    // In the case of two queues, commit another txn just to bump
    // last_published_seq so that a subsequent GetSnapshot() call can return
    // a snapshot with higher sequence.
    auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                           /*old_txn=*/nullptr);
    ASSERT_OK(dummy_txn->Put("haha", "value"));
    ASSERT_OK(dummy_txn->Commit());
    delete dummy_txn;
  }
  // snapshot1: before the Delete commits; snapshot2: after.
  auto* snapshot1 = db->GetSnapshot();
  ASSERT_OK(txn->Commit());
  delete txn;
  auto* snapshot2 = db->GetSnapshot();

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
        // Ignore invocations with a null argument.
        if (!arg) {
          return;
        }
        db->ReleaseSnapshot(snapshot1);

        // Advance max_evicted_seq
        ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));
  db->ReleaseSnapshot(snapshot2);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2956 | ||
// "foo" is Put and then SingleDeleted. snapshot1 is taken right after the
// SingleDelete and snapshot2 after one more write; dummy_txn holds an older
// write-conflict snapshot. During the Flush(), at the KeepSDForWW sync point,
// snapshot1 is released and another write is issued; the flush must succeed.
TEST_P(WritePreparedTransactionTest,
       ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // Create a dummy transaction to take a snapshot for ww-conflict detection.
  TransactionOptions txn_opts;
  txn_opts.set_snapshot = true;
  auto* dummy_txn =
      db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
  // Increment seq
  ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));

  ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
  ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
  auto* snapshot1 = db->GetSnapshot();
  // Increment seq
  ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
  auto* snapshot2 = db->GetSnapshot();

  // NOTE(review): the release below is not guarded; if the sync point fired
  // more than once, snapshot1 would be released twice.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
        db->ReleaseSnapshot(snapshot1);

        ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(snapshot2);
  ASSERT_OK(dummy_txn->Commit());
  delete dummy_txn;
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
2994 | ||
// "b" is Put and moved to L2, then SingleDeleted by a committed transaction.
// snapshot1 and a newer write-conflict snapshot (owned by txn2) exist when the
// L0 is compacted. At the SingleDelete:1 sync point, txn2 is rolled back,
// releasing the write-conflict snapshot, and max_evicted_seq is advanced; the
// compaction must still succeed.
TEST_P(WritePreparedTransactionTest,
       ReleaseEarliestWriteConflictSnapshot_SingleDelete) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Move the flushed file down to L2.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  }

  // Committed SingleDelete of "b".
  std::unique_ptr<Transaction> txn;
  txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions(),
                                 /*old_txn=*/nullptr));
  ASSERT_OK(txn->SetName("txn1"));
  ASSERT_OK(txn->SingleDelete("b"));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(txn->Commit());

  auto* snapshot1 = db->GetSnapshot();

  // Bump seq of the db by performing writes so that
  // earliest_snapshot_ < earliest_write_conflict_snapshot_ in
  // CompactionIterator.
  ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));

  // Create another snapshot for write conflict checking
  std::unique_ptr<Transaction> txn2;
  {
    TransactionOptions txn_opts;
    txn_opts.set_snapshot = true;
    txn2.reset(
        db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr));
  }

  // Bump seq so that the subsequent bg flush won't create a snapshot with the
  // same seq as the previous snapshot for conflict checking.
  ASSERT_OK(db->Put(WriteOptions(), "y", "dont"));

  ASSERT_OK(db->Flush(FlushOptions()));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // NOTE(review): txn2 is reset after the first invocation; a second
  // invocation would dereference a null txn2.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* /*arg*/) {
        // Rolling back txn2 should release its snapshot(for ww checking).
        ASSERT_OK(txn2->Rollback());
        txn2.reset();
        // Advance max_evicted_seq
        ASSERT_OK(db->Put(WriteOptions(), "x", "value"));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  db->ReleaseSnapshot(snapshot1);
}
3065 | ||
// "b" is Put and moved to L2; a SingleDelete of "b" followed by a newer Put of
// "b" is then flushed to L0. During the final CompactRange(), when the
// iterator reaches the ZeroingSeq sync point for "b", `snapshot` is released
// and max_evicted_seq is bumped; the compaction must still succeed.
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Move the flushed file down to L2.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  }

  ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));

  // Take a snapshot so that the SD won't be dropped during flush.
  auto* tmp_snapshot = db->GetSnapshot();

  ASSERT_OK(db->Put(WriteOptions(), "b", "value2"));
  auto* snapshot = db->GetSnapshot();
  ASSERT_OK(db->Flush(FlushOptions()));

  db->ReleaseSnapshot(tmp_snapshot);

  // Bump the sequence so that the below bg compaction job's snapshot will be
  // different from snapshot's sequence.
  ASSERT_OK(db->Put(WriteOptions(), "z", "foo"));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
        // The sync point passes the ParsedInternalKey being output. These are
        // plain assert()s, so they are compiled out under NDEBUG.
        const auto* const ikey =
            reinterpret_cast<const ParsedInternalKey*>(arg);
        assert(ikey);
        if (ikey->user_key == "b") {
          assert(ikey->type == kTypeValue);
          db->ReleaseSnapshot(snapshot);

          // Bump max_evicted_seq.
          ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
3123 | ||
// First build an L0 containing only a SingleDelete for "b" and move it to L2.
// Then write a new Put of "b", take `snapshot`, and flush a second L0. During
// the final CompactRange(), when the iterator reaches the ZeroingSeq sync
// point for "b", `snapshot` is released and max_evicted_seq is bumped; the
// compaction must still succeed.
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // Generate an L0 with only SD for one key "b".
  ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
  ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
  // Take a snapshot so that subsequent flush outputs the SD for "b".
  auto* tmp_snapshot = db->GetSnapshot();
  ASSERT_OK(db->SingleDelete(WriteOptions(), "b"));
  ASSERT_OK(db->Put(WriteOptions(), "c", "value"));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) {
        // Only act on invocations with a null argument.
        if (!arg) {
          db->ReleaseSnapshot(tmp_snapshot);
          // Bump max_evicted_seq
          ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // Finish generating L0 with only SD for "b".

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Move the L0 to L2.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  }

  ASSERT_OK(db->Put(WriteOptions(), "b", "value1"));

  auto* snapshot = db->GetSnapshot();

  // Bump seq so that a subsequent flush/compaction job's snapshot is larger
  // than the above snapshot's seq.
  ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));

  // Generate a second L0.
  ASSERT_OK(db->Flush(FlushOptions()));

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) {
        // The sync point passes the ParsedInternalKey being output. These are
        // plain assert()s, so they are compiled out under NDEBUG.
        const auto* const ikey =
            reinterpret_cast<const ParsedInternalKey*>(arg);
        assert(ikey);
        if (ikey->user_key == "b") {
          assert(ikey->type == kTypeValue);
          db->ReleaseSnapshot(snapshot);

          // Bump max_evicted_seq.
          ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                             /*end=*/nullptr));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
3197 | ||
// Although the user contract indicates that an SD can only be issued for a key
// that exists and has not been overwritten, it is still possible for a Delete
// to be present when a write-prepared transaction is rolled back.
// The test builds three L0 SSTs for "foo": a prepared Put, the Delete written
// by rolling it back, and a SingleDelete without its matching Put. It then
// compacts L0 with trivial moves disallowed; the compaction must succeed.
TEST_P(WritePreparedTransactionTest, SingleDeleteAfterRollback) {
  constexpr size_t kSnapshotCacheBits = 7;  // same as default
  constexpr size_t kCommitCacheBits = 0;    // minimum commit cache
  // NOTE(review): returning true here presumably makes rollback write a
  // Delete (rather than a SingleDelete) for the rolled-back key — confirm.
  txn_db_options.rollback_deletion_type_callback =
      [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; };
  UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
  options.disable_auto_compactions = true;
  ASSERT_OK(ReOpen());

  // Get a write conflict snapshot by creating a transaction with
  // set_snapshot=true.
  TransactionOptions txn_opts;
  txn_opts.set_snapshot = true;
  std::unique_ptr<Transaction> dummy_txn(
      db->BeginTransaction(WriteOptions(), txn_opts));

  std::unique_ptr<Transaction> txn0(
      db->BeginTransaction(WriteOptions(), TransactionOptions()));
  ASSERT_OK(txn0->Put("foo", "value"));
  ASSERT_OK(txn0->SetName("xid0"));
  ASSERT_OK(txn0->Prepare());

  // Create an SST with only {"foo": "value"}.
  ASSERT_OK(db->Flush(FlushOptions()));

  // Insert a Delete to cancel out the prior Put by txn0.
  ASSERT_OK(txn0->Rollback());
  txn0.reset();

  // Create a second SST.
  ASSERT_OK(db->Flush(FlushOptions()));

  ASSERT_OK(db->Put(WriteOptions(), "foo", "value1"));

  auto* snapshot = db->GetSnapshot();

  ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));

  // Guards the callback so that it acts only on its first invocation.
  int count = 0;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
        // Expected to be reached only from the flush, where the Compaction
        // argument is null.
        const auto* const c = reinterpret_cast<const Compaction*>(arg);
        assert(!c);
        // Trigger once only for SingleDelete during flush.
        if (0 == count) {
          ++count;
          db->ReleaseSnapshot(snapshot);
          // Bump max_evicted_seq
          ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare"));
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Create a third SST containing a SD without its matching PUT.
  ASSERT_OK(db->Flush(FlushOptions()));

  // Drop the callback; processing is re-enabled with no callbacks registered.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();

  // Compact L0 with trivial moves disallowed so the compaction iterator runs.
  DBImpl* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
  assert(dbimpl);
  ASSERT_OK(dbimpl->TEST_CompactRange(
      /*level=*/0, /*begin=*/nullptr, /*end=*/nullptr,
      /*column_family=*/nullptr, /*disallow_trivial_mode=*/true));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Release the conflict-checking snapshot.
  ASSERT_OK(dummy_txn->Rollback());
}
3275 | ||
11fdf7f2 TL |
3276 | // A more complex test to verify compaction/flush should keep keys visible |
3277 | // to snapshots. | |
3278 | TEST_P(WritePreparedTransactionTest, | |
f67539c2 | 3279 | CompactionKeepSnapshotVisibleKeysRandomized) { |
11fdf7f2 TL |
3280 | constexpr size_t kNumTransactions = 10; |
3281 | constexpr size_t kNumIterations = 1000; | |
3282 | ||
3283 | std::vector<Transaction*> transactions(kNumTransactions, nullptr); | |
3284 | std::vector<size_t> versions(kNumTransactions, 0); | |
3285 | std::unordered_map<std::string, std::string> current_data; | |
3286 | std::vector<const Snapshot*> snapshots; | |
3287 | std::vector<std::unordered_map<std::string, std::string>> snapshot_data; | |
3288 | ||
3289 | Random rnd(1103); | |
3290 | options.disable_auto_compactions = true; | |
1e59de90 | 3291 | ASSERT_OK(ReOpen()); |
11fdf7f2 TL |
3292 | |
3293 | for (size_t i = 0; i < kNumTransactions; i++) { | |
1e59de90 | 3294 | std::string key = "key" + std::to_string(i); |
11fdf7f2 TL |
3295 | std::string value = "value0"; |
3296 | ASSERT_OK(db->Put(WriteOptions(), key, value)); | |
3297 | current_data[key] = value; | |
3298 | } | |
3299 | VerifyKeys(current_data); | |
3300 | ||
3301 | for (size_t iter = 0; iter < kNumIterations; iter++) { | |
3302 | auto r = rnd.Next() % (kNumTransactions + 1); | |
3303 | if (r < kNumTransactions) { | |
1e59de90 | 3304 | std::string key = "key" + std::to_string(r); |
11fdf7f2 | 3305 | if (transactions[r] == nullptr) { |
1e59de90 | 3306 | std::string value = "value" + std::to_string(versions[r] + 1); |
11fdf7f2 | 3307 | auto* txn = db->BeginTransaction(WriteOptions()); |
1e59de90 | 3308 | ASSERT_OK(txn->SetName("txn" + std::to_string(r))); |
11fdf7f2 TL |
3309 | ASSERT_OK(txn->Put(key, value)); |
3310 | ASSERT_OK(txn->Prepare()); | |
3311 | transactions[r] = txn; | |
3312 | } else { | |
1e59de90 | 3313 | std::string value = "value" + std::to_string(++versions[r]); |
11fdf7f2 TL |
3314 | ASSERT_OK(transactions[r]->Commit()); |
3315 | delete transactions[r]; | |
3316 | transactions[r] = nullptr; | |
3317 | current_data[key] = value; | |
3318 | } | |
3319 | } else { | |
3320 | auto* snapshot = db->GetSnapshot(); | |
3321 | VerifyKeys(current_data, snapshot); | |
3322 | snapshots.push_back(snapshot); | |
3323 | snapshot_data.push_back(current_data); | |
3324 | } | |
3325 | VerifyKeys(current_data); | |
3326 | } | |
3327 | // Take a last snapshot to test compaction with uncommitted prepared | |
3328 | // transaction. | |
3329 | snapshots.push_back(db->GetSnapshot()); | |
3330 | snapshot_data.push_back(current_data); | |
3331 | ||
1e59de90 | 3332 | ASSERT_EQ(snapshots.size(), snapshot_data.size()); |
11fdf7f2 TL |
3333 | for (size_t i = 0; i < snapshots.size(); i++) { |
3334 | VerifyKeys(snapshot_data[i], snapshots[i]); | |
3335 | } | |
3336 | ASSERT_OK(db->Flush(FlushOptions())); | |
3337 | for (size_t i = 0; i < snapshots.size(); i++) { | |
3338 | VerifyKeys(snapshot_data[i], snapshots[i]); | |
3339 | } | |
3340 | // Dummy keys to avoid compaction trivially move files and get around actual | |
3341 | // compaction logic. | |
3342 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
3343 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
3344 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
3345 | for (size_t i = 0; i < snapshots.size(); i++) { | |
3346 | VerifyKeys(snapshot_data[i], snapshots[i]); | |
3347 | } | |
3348 | // cleanup | |
3349 | for (size_t i = 0; i < kNumTransactions; i++) { | |
3350 | if (transactions[i] == nullptr) { | |
3351 | continue; | |
3352 | } | |
3353 | ASSERT_OK(transactions[i]->Commit()); | |
3354 | delete transactions[i]; | |
3355 | } | |
3356 | for (size_t i = 0; i < snapshots.size(); i++) { | |
3357 | db->ReleaseSnapshot(snapshots[i]); | |
3358 | } | |
3359 | } | |
3360 | ||
3361 | // Compaction should not apply the optimization to output key with sequence | |
3362 | // number equal to 0 if the key is not visible to earliest snapshot, based on | |
3363 | // commit sequence number. | |
3364 | TEST_P(WritePreparedTransactionTest, | |
3365 | CompactionShouldKeepSequenceForUncommittedKeys) { | |
3366 | options.disable_auto_compactions = true; | |
1e59de90 | 3367 | ASSERT_OK(ReOpen()); |
11fdf7f2 TL |
3368 | // Keep track of expected sequence number. |
3369 | SequenceNumber expected_seq = 0; | |
3370 | auto* transaction = db->BeginTransaction(WriteOptions()); | |
3371 | ASSERT_OK(transaction->SetName("txn")); | |
3372 | ASSERT_OK(transaction->Put("key1", "value1")); | |
3373 | ASSERT_OK(transaction->Prepare()); | |
3374 | ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); | |
3375 | SequenceNumber seq1 = expected_seq; | |
3376 | ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); | |
20effc67 | 3377 | DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); |
11fdf7f2 TL |
3378 | expected_seq++; // one for data |
3379 | if (options.two_write_queues) { | |
3380 | expected_seq++; // one for commit | |
3381 | } | |
3382 | ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); | |
3383 | ASSERT_OK(db->Flush(FlushOptions())); | |
3384 | // Dummy keys to avoid compaction trivially move files and get around actual | |
3385 | // compaction logic. | |
3386 | ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); | |
3387 | ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); | |
3388 | ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
3389 | VerifyKeys({ | |
3390 | {"key1", "NOT_FOUND"}, | |
3391 | {"key2", "value2"}, | |
3392 | }); | |
3393 | VerifyInternalKeys({ | |
3394 | // "key1" has not been committed. It keeps its sequence number. | |
3395 | {"key1", "value1", seq1, kTypeValue}, | |
3396 | // "key2" is committed and output with seq = 0. | |
3397 | {"key2", "value2", 0, kTypeValue}, | |
3398 | }); | |
3399 | ASSERT_OK(transaction->Commit()); | |
3400 | VerifyKeys({ | |
3401 | {"key1", "value1"}, | |
3402 | {"key2", "value2"}, | |
3403 | }); | |
3404 | delete transaction; | |
3405 | } | |
3406 | ||
494da23a TL |
3407 | TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) { |
3408 | options.disable_auto_compactions = true; | |
1e59de90 | 3409 | ASSERT_OK(ReOpen()); |
494da23a TL |
3410 | |
3411 | const Snapshot* snapshot = nullptr; | |
3412 | ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); | |
3413 | auto* txn = db->BeginTransaction(WriteOptions()); | |
3414 | ASSERT_OK(txn->SetName("txn")); | |
3415 | ASSERT_OK(txn->Put("key1", "value2")); | |
3416 | ASSERT_OK(txn->Prepare()); | |
3417 | ||
3418 | auto callback = [&](void*) { | |
3419 | // Snapshot is taken after compaction start. It should be taken into | |
3420 | // consideration for whether to compact out value1. | |
3421 | snapshot = db->GetSnapshot(); | |
3422 | ASSERT_OK(txn->Commit()); | |
3423 | delete txn; | |
3424 | }; | |
3425 | SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", | |
3426 | callback); | |
3427 | SyncPoint::GetInstance()->EnableProcessing(); | |
3428 | ASSERT_OK(db->Flush(FlushOptions())); | |
3429 | ASSERT_NE(nullptr, snapshot); | |
3430 | VerifyKeys({{"key1", "value2"}}); | |
3431 | VerifyKeys({{"key1", "value1"}}, snapshot); | |
3432 | db->ReleaseSnapshot(snapshot); | |
3433 | } | |
3434 | ||
11fdf7f2 TL |
3435 | TEST_P(WritePreparedTransactionTest, Iterate) { |
3436 | auto verify_state = [](Iterator* iter, const std::string& key, | |
3437 | const std::string& value) { | |
3438 | ASSERT_TRUE(iter->Valid()); | |
3439 | ASSERT_OK(iter->status()); | |
3440 | ASSERT_EQ(key, iter->key().ToString()); | |
3441 | ASSERT_EQ(value, iter->value().ToString()); | |
3442 | }; | |
3443 | ||
3444 | auto verify_iter = [&](const std::string& expected_val) { | |
3445 | // Get iterator from a concurrent transaction and make sure it has the | |
3446 | // same view as an iterator from the DB. | |
3447 | auto* txn = db->BeginTransaction(WriteOptions()); | |
3448 | ||
3449 | for (int i = 0; i < 2; i++) { | |
1e59de90 TL |
3450 | Iterator* iter = (i == 0) ? db->NewIterator(ReadOptions()) |
3451 | : txn->GetIterator(ReadOptions()); | |
11fdf7f2 TL |
3452 | // Seek |
3453 | iter->Seek("foo"); | |
3454 | verify_state(iter, "foo", expected_val); | |
3455 | // Next | |
3456 | iter->Seek("a"); | |
3457 | verify_state(iter, "a", "va"); | |
3458 | iter->Next(); | |
3459 | verify_state(iter, "foo", expected_val); | |
3460 | // SeekForPrev | |
3461 | iter->SeekForPrev("y"); | |
3462 | verify_state(iter, "foo", expected_val); | |
3463 | // Prev | |
3464 | iter->SeekForPrev("z"); | |
3465 | verify_state(iter, "z", "vz"); | |
3466 | iter->Prev(); | |
3467 | verify_state(iter, "foo", expected_val); | |
3468 | delete iter; | |
3469 | } | |
3470 | delete txn; | |
3471 | }; | |
3472 | ||
3473 | ASSERT_OK(db->Put(WriteOptions(), "foo", "v1")); | |
3474 | auto* transaction = db->BeginTransaction(WriteOptions()); | |
3475 | ASSERT_OK(transaction->SetName("txn")); | |
3476 | ASSERT_OK(transaction->Put("foo", "v2")); | |
3477 | ASSERT_OK(transaction->Prepare()); | |
3478 | VerifyKeys({{"foo", "v1"}}); | |
3479 | // dummy keys | |
3480 | ASSERT_OK(db->Put(WriteOptions(), "a", "va")); | |
3481 | ASSERT_OK(db->Put(WriteOptions(), "z", "vz")); | |
3482 | verify_iter("v1"); | |
3483 | ASSERT_OK(transaction->Commit()); | |
3484 | VerifyKeys({{"foo", "v2"}}); | |
3485 | verify_iter("v2"); | |
3486 | delete transaction; | |
3487 | } | |
3488 | ||
3489 | TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) { | |
3490 | Iterator* iter = db->NewIterator(ReadOptions()); | |
1e59de90 | 3491 | ASSERT_OK(iter->status()); |
11fdf7f2 TL |
3492 | ASSERT_TRUE(iter->Refresh().IsNotSupported()); |
3493 | delete iter; | |
3494 | } | |
3495 | ||
494da23a TL |
// Committing a delayed prepared entry takes two non-atomic steps: update the
// commit cache, then remove the seq from delayed_prepared_. The read in
// IsInSnapshot likewise checks these two data structures in two non-atomic
// steps. This test pauses each side in the middle to ensure correctness in
// spite of the non-atomic execution.
// Note: This test is limited to the case where the snapshot is larger than
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  // split_read == true: the reader (IsInSnapshot) is paused mid-way while the
  // whole commit runs. split_read == false: the committer (RemovePrepared) is
  // paused mid-way while the whole read runs.
  for (auto split_read : {true, false}) {
    std::vector<bool> split_options = {false};
    if (split_read) {
      // Also test for break before mutex
      split_options.push_back(true);
    }
    for (auto split_before_mutex : split_options) {
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      ASSERT_OK(ReOpen());
      WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
      DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
      // Fill up the commit cache
      std::string init_value("value1");
      for (int i = 0; i < 10; i++) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
      }
      // Prepare a transaction but do not commit it
      Transaction* txn =
          db->BeginTransaction(WriteOptions(), TransactionOptions());
      ASSERT_OK(txn->SetName("xid"));
      ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
      ASSERT_OK(txn->Prepare());
      // Commit a bunch of entries to advance max evicted seq and make the
      // prepared a delayed prepared
      for (int i = 0; i < 10; i++) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      }
      // The snapshot should not see the delayed prepared entry
      auto snap = db->GetSnapshot();

      // Each {A, B} pair below makes sync point B wait until A has been hit.
      if (split_read) {
        if (split_before_mutex) {
          // split before acquiring prepare_mutex_
          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
              {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
                "AtomicCommitOfDelayedPrepared:Commit:before"},
               {"AtomicCommitOfDelayedPrepared:Commit:after",
                "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
        } else {
          // split right after reading from the commit cache
          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
              {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
                "AtomicCommitOfDelayedPrepared:Commit:before"},
               {"AtomicCommitOfDelayedPrepared:Commit:after",
                "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
        }
      } else {  // split commit
        // split right before removing from delayed_prepared_
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
            {{"WritePreparedTxnDB::RemovePrepared:pause",
              "AtomicCommitOfDelayedPrepared:Read:before"},
             {"AtomicCommitOfDelayedPrepared:Read:after",
              "WritePreparedTxnDB::RemovePrepared:resume"}});
      }
      SyncPoint::GetInstance()->EnableProcessing();

      // Commits the delayed prepared txn between the Commit:before/after
      // sync points and then deletes it.
      ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
        ASSERT_OK(txn->Commit());
        if (split_before_mutex) {
          // Do bunch of inserts to evict the commit entry from the cache. This
          // would prevent the 2nd look into commit cache under prepare_mutex_
          // to see the commit entry.
          auto seq = db_impl->TEST_GetLastVisibleSequence();
          size_t tries = 0;
          // Bounded retry: keep writing until max_evicted_seq_ reaches seq.
          while (wp_db->max_evicted_seq_ < seq && tries < 50) {
            ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
            tries++;
          };
          ASSERT_LT(tries, 50);
        }
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
        delete txn;
      });

      // Reads key1 through the snapshot taken before the commit, between the
      // Read:before/after sync points, then releases that snapshot.
      ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
        ReadOptions roptions;
        roptions.snapshot = snap;
        PinnableSlice value;
        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
        ASSERT_OK(s);
        // It should not see the commit of delayed prepared
        ASSERT_TRUE(value == init_value);
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
        db->ReleaseSnapshot(snap);
      });

      read_thread.join();
      commit_thread.join();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }  // for split_before_mutex
  }    // for split_read
}
3600 | ||
// When max evicted seq advances past a prepared seq, it involves two updates:
// i) adding the prepared seq to delayed_prepared_, ii) updating
// max_evicted_seq_. ::IsInSnapshot also reads these two values in a non-atomic
// way. This test ensures correctness if the update occurs after ::IsInSnapshot
// reads delayed_prepared_empty_ and before it reads max_evicted_seq_.
// Note: this test focuses on a read snapshot larger than max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Fill up the commit cache
  std::string init_value("value1");
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
  }
  // Prepare a transaction but do not commit it
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
  ASSERT_OK(txn->Prepare());
  // Create a gap between prepare seq and snapshot seq
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  // The snapshot should not see the delayed prepared entry
  auto snap = db->GetSnapshot();
  ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());

  // split right after reading delayed_prepared_empty_
  // (each {A, B} pair makes sync point B wait until A has been hit)
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
        "AtomicUpdateOfDelayedPrepared:before"},
       {"AtomicUpdateOfDelayedPrepared:after",
        "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
  SyncPoint::GetInstance()->EnableProcessing();

  // Runs while the reader is paused inside IsInSnapshot. It pushes
  // max_evicted_seq_ past the prepared txn but keeps it below the snapshot.
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
    // Commit a bunch of entries to advance max evicted seq and make the
    // prepared a delayed prepared
    size_t tries = 0;
    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
      ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      tries++;
    };
    ASSERT_LT(tries, 50);
    // This is the case on which the test focuses
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
  });

  // Reads key1 through the snapshot. The read must still ignore the
  // uncommitted prepared value.
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    ReadOptions roptions;
    roptions.snapshot = snap;
    PinnableSlice value;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
    ASSERT_OK(s);
    // It should not see the uncommitted value of delayed prepared
    ASSERT_TRUE(value == init_value);
    db->ReleaseSnapshot(snap);
  });

  read_thread.join();
  commit_thread.join();
  ASSERT_OK(txn->Commit());
  delete txn;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
3671 | ||
// Eviction from the commit cache and the update of max evicted seq are two
// non-atomic steps. Similarly, the read of max_evicted_seq_ in ::IsInSnapshot
// and the read from the commit cache are two non-atomic steps. This tests the
// case where the update occurs after reading max_evicted_seq_ and before
// reading the commit cache.
// Note: the test focuses on a snapshot larger than max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ASSERT_OK(ReOpen());
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Fill up the commit cache
  std::string init_value("value1");
  std::string last_value("value_final");
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
  }
  // Do an uncommitted write to prevent min_uncommitted optimization
  Transaction* txn1 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put(Slice("key0"), last_value));
  ASSERT_OK(txn1->Prepare());
  // Do a write with prepare to get the prepare seq
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key1"), last_value));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(txn->Commit());
  // Create a gap between commit entry and snapshot seq
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  // The snapshot should see the last commit
  auto snap = db->GetSnapshot();
  ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());

  // split right after reading max_evicted_seq_
  // (each {A, B} pair makes sync point B wait until A has been hit)
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
        "NonAtomicUpdateOfMaxEvictedSeq:before"},
       {"NonAtomicUpdateOfMaxEvictedSeq:after",
        "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
  SyncPoint::GetInstance()->EnableProcessing();

  // Runs while the reader is paused right after loading max_evicted_seq_. It
  // evicts txn's commit entry by advancing max_evicted_seq_ past txn->GetId().
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
    // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
    size_t tries = 0;
    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
      ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      tries++;
    };
    ASSERT_LT(tries, 50);
    // This is the case on which the test focuses
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
  });

  // Reads key1 through the snapshot. The read must still see txn's committed
  // value even though its commit entry was evicted mid-read.
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    ReadOptions roptions;
    roptions.snapshot = snap;
    PinnableSlice value;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
    ASSERT_OK(s);
    // It should see the committed value of the evicted entry
    ASSERT_TRUE(value == last_value);
    db->ReleaseSnapshot(snap);
  });

  read_thread.join();
  commit_thread.join();
  delete txn;
  ASSERT_OK(txn1->Commit());
  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
3749 | ||
3750 | // Test when we add a prepared seq when the max_evicted_seq_ already goes beyond | |
3751 | // that. The test focuses on a race condition between AddPrepared and | |
3752 | // AdvanceMaxEvictedSeq functions. | |
3753 | TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) { | |
3754 | if (!options.two_write_queues) { | |
3755 | // This test is only for two write queues | |
3756 | return; | |
3757 | } | |
3758 | const size_t snapshot_cache_bits = 7; // same as default | |
3759 | // 1 entry to advance max after the 2nd commit | |
3760 | const size_t commit_cache_bits = 0; | |
3761 | UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); | |
1e59de90 | 3762 | ASSERT_OK(ReOpen()); |
494da23a TL |
3763 | WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); |
3764 | std::string some_value("value_some"); | |
3765 | std::string uncommitted_value("value_uncommitted"); | |
3766 | // Prepare two uncommitted transactions | |
3767 | Transaction* txn1 = | |
3768 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3769 | ASSERT_OK(txn1->SetName("xid1")); | |
3770 | ASSERT_OK(txn1->Put(Slice("key1"), some_value)); | |
3771 | ASSERT_OK(txn1->Prepare()); | |
3772 | Transaction* txn2 = | |
3773 | db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3774 | ASSERT_OK(txn2->SetName("xid2")); | |
3775 | ASSERT_OK(txn2->Put(Slice("key2"), some_value)); | |
3776 | ASSERT_OK(txn2->Prepare()); | |
3777 | // Start the txn here so the other thread could get its id | |
3778 | Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); | |
3779 | ASSERT_OK(txn->SetName("xid")); | |
3780 | ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value)); | |
3781 | port::Mutex txn_mutex_; | |
3782 | ||
f67539c2 TL |
3783 | // t1) Insert prepared entry, t2) commit other entries to advance max |
3784 | // evicted sec and finish checking the existing prepared entries, t1) | |
494da23a | 3785 | // AddPrepared, t2) update max_evicted_seq_ |
f67539c2 TL |
3786 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
3787 | {"AddPreparedCallback::AddPrepared::begin:pause", | |
3788 | "AddPreparedBeforeMax::read_thread:start"}, | |
3789 | {"AdvanceMaxEvictedSeq::update_max:pause", | |
3790 | "AddPreparedCallback::AddPrepared::begin:resume"}, | |
3791 | {"AddPreparedCallback::AddPrepared::end", | |
3792 | "AdvanceMaxEvictedSeq::update_max:resume"}, | |
494da23a TL |
3793 | }); |
3794 | SyncPoint::GetInstance()->EnableProcessing(); | |
3795 | ||
f67539c2 | 3796 | ROCKSDB_NAMESPACE::port::Thread write_thread([&]() { |
494da23a TL |
3797 | txn_mutex_.Lock(); |
3798 | ASSERT_OK(txn->Prepare()); | |
3799 | txn_mutex_.Unlock(); | |
3800 | }); | |
3801 | ||
f67539c2 | 3802 | ROCKSDB_NAMESPACE::port::Thread read_thread([&]() { |
494da23a TL |
3803 | TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start"); |
3804 | // Publish seq number with a commit | |
3805 | ASSERT_OK(txn1->Commit()); | |
3806 | // Since the commit cache size is one the 2nd commit evict the 1st one and | |
3807 | // invokes AdcanceMaxEvictedSeq | |
3808 | ASSERT_OK(txn2->Commit()); | |
3809 | ||
3810 | ReadOptions roptions; | |
3811 | PinnableSlice value; | |
3812 | // The snapshot should not see the uncommitted value from write_thread | |
3813 | auto snap = db->GetSnapshot(); | |
3814 | ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber()); | |
3815 | // This is the scenario that we test for | |
3816 | txn_mutex_.Lock(); | |
3817 | ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId()); | |
3818 | txn_mutex_.Unlock(); | |
3819 | roptions.snapshot = snap; | |
3820 | auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value); | |
3821 | ASSERT_TRUE(s.IsNotFound()); | |
3822 | db->ReleaseSnapshot(snap); | |
3823 | }); | |
3824 | ||
3825 | read_thread.join(); | |
3826 | write_thread.join(); | |
3827 | delete txn1; | |
3828 | delete txn2; | |
3829 | ASSERT_OK(txn->Commit()); | |
3830 | delete txn; | |
f67539c2 TL |
3831 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
3832 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
494da23a TL |
3833 | } |
3834 | ||
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from old_prepared_. This test
// stresses such cases.
// NOTE(review): "old_prepared_" presumably refers to delayed_prepared_ (the
// name used in the comments below) — confirm.
TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  for (const size_t commit_cache_bits : {0, 2, 3}) {
    for (const size_t sub_batch_cnt : {1, 2, 3}) {
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      ASSERT_OK(ReOpen());
      // Snapshot taken by snap_callback; reset to nullptr after each round.
      std::atomic<const Snapshot*> snap = {nullptr};
      // Prepare seq of write_thread's current txn; the callback only reacts
      // to RemovePrepared:Start for this seq.
      std::atomic<SequenceNumber> exp_prepare = {0};
      ROCKSDB_NAMESPACE::port::Thread callback_thread;
      // Value is synchronized via snap
      PinnableSlice value;
      // Take a snapshot after publish and before RemovePrepared:Start
      auto snap_callback = [&]() {
        ASSERT_EQ(nullptr, snap.load());
        snap.store(db->GetSnapshot());
        ReadOptions roptions;
        roptions.snapshot = snap.load();
        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
        ASSERT_OK(s);
      };
      auto callback = [&](void* param) {
        SequenceNumber prep_seq = *((SequenceNumber*)param);
        if (prep_seq == exp_prepare.load()) {  // only for write_thread
          // We need to spawn a thread to avoid deadlock since getting a
          // snapshot might end up calling AdvanceSeqByOne which needs joining
          // the write queue.
          callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
          TEST_SYNC_POINT("callback:end");
        }
      };
      // Wait for the first snapshot be taken in GetSnapshotInternal. Although
      // it might be updated before GetSnapshotInternal finishes but this should
      // cover most of the cases.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
          {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
      });
      SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
      SyncPoint::GetInstance()->EnableProcessing();
      // Thread to cause frequent evictions
      ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
        // Too many txns might cause commit_seq - prepare_seq in another thread
        // to go beyond DELTA_UPPERBOUND
        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
          ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("value1")));
        }
      });
      // Repeatedly prepares/commits a txn writing "key2" (sub_batch_cnt Puts
      // of the same value), then checks that the snapshot taken during the
      // commit reads the same value before and after the commit completes.
      ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
          Transaction* txn =
              db->BeginTransaction(WriteOptions(), TransactionOptions());
          ASSERT_OK(txn->SetName("xid"));
          std::string val_str = "value" + std::to_string(i);
          for (size_t b = 0; b < sub_batch_cnt; b++) {
            ASSERT_OK(txn->Put(Slice("key2"), val_str));
          }
          ASSERT_OK(txn->Prepare());
          // Let an eviction to kick in
          std::this_thread::yield();

          exp_prepare.store(txn->GetId());
          ASSERT_OK(txn->Commit());
          delete txn;
          // Wait for the snapshot taking that is triggered by
          // RemovePrepared:Start callback
          // NOTE(review): this assumes the callback fired for this txn; if
          // port::Thread is std::thread, join() on a non-joinable thread
          // throws — confirm.
          callback_thread.join();

          // Read with the snapshot taken before delayed_prepared_ cleanup
          ReadOptions roptions;
          roptions.snapshot = snap.load();
          ASSERT_NE(nullptr, roptions.snapshot);
          PinnableSlice value2;
          auto s =
              db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
          ASSERT_OK(s);
          // It should see its own write
          ASSERT_TRUE(val_str == value2);
          // The value read by snapshot should not change
          ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());

          db->ReleaseSnapshot(roptions.snapshot);
          snap.store(nullptr);
        }
      });
      write_thread.join();
      eviction_thread.join();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  }
}
3928 | ||
11fdf7f2 TL |
// Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
  // skip_prepare == true commits via a plain Put; otherwise via a 2PC txn.
  for (bool skip_prepare : {true, false}) {
    // Each {A, B} pair makes sync point B wait until A has been hit:
    //  - the reader takes its snapshot only after AddCommitted has started;
    //  - AddCommitted stays paused at its start until the first Get is done;
    //  - the second read starts only after AddCommitted has finished;
    //  - AddCommitted stays paused at its end until the second read is done.
    // NOTE(review): the trailing ':' in "...AddCommitted:end:pause:" must match
    // the TEST_SYNC_POINT name in the implementation exactly — confirm.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::AddCommitted:start",
         "AtomicCommit::GetSnapshot:start"},
        {"AtomicCommit::Get:end",
         "WritePreparedTxnDB::AddCommitted:start:pause"},
        {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
        {"AtomicCommit::Get2:end",
         "WritePreparedTxnDB::AddCommitted:end:pause:"},
    });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
      if (skip_prepare) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
      } else {
        Transaction* txn =
            db->BeginTransaction(WriteOptions(), TransactionOptions());
        ASSERT_OK(txn->SetName("xid"));
        ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    });
    ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
      ReadOptions roptions;
      TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
      roptions.snapshot = db->GetSnapshot();
      PinnableSlice val;
      // Read once before the commit map is updated...
      auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
      TEST_SYNC_POINT("AtomicCommit::Get:end");
      TEST_SYNC_POINT("AtomicCommit::Get2:start");
      // ...and check the result through the same snapshot after the update.
      // (ASSERT_SAME is defined outside this view; presumably it re-reads
      // "key" with roptions and compares against s/val — confirm.)
      ASSERT_SAME(roptions, db, s, val, "key");
      TEST_SYNC_POINT("AtomicCommit::Get2:end");
      db->ReleaseSnapshot(roptions.snapshot);
    });
    read_thread.join();
    write_thread.join();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
3972 | ||
1e59de90 TL |
3973 | TEST_P(WritePreparedTransactionTest, BasicRollbackDeletionTypeCb) { |
3974 | options.level0_file_num_compaction_trigger = 2; | |
3975 | // Always use SingleDelete to rollback Put. | |
3976 | txn_db_options.rollback_deletion_type_callback = | |
3977 | [](TransactionDB*, ColumnFamilyHandle*, const Slice&) { return true; }; | |
3978 | ||
3979 | const auto write_to_db = [&]() { | |
3980 | assert(db); | |
3981 | std::unique_ptr<Transaction> txn0( | |
3982 | db->BeginTransaction(WriteOptions(), TransactionOptions())); | |
3983 | ASSERT_OK(txn0->SetName("txn0")); | |
3984 | ASSERT_OK(txn0->Put("a", "v0")); | |
3985 | ASSERT_OK(txn0->Prepare()); | |
3986 | ||
3987 | // Generate sst1: [PUT('a')] | |
3988 | ASSERT_OK(db->Flush(FlushOptions())); | |
3989 | ||
3990 | { | |
3991 | CompactRangeOptions cro; | |
3992 | cro.change_level = true; | |
3993 | cro.target_level = options.num_levels - 1; | |
3994 | cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; | |
3995 | ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); | |
3996 | } | |
3997 | ||
3998 | ASSERT_OK(txn0->Rollback()); | |
3999 | txn0.reset(); | |
4000 | ||
4001 | ASSERT_OK(db->Put(WriteOptions(), "a", "v1")); | |
4002 | ||
4003 | ASSERT_OK(db->SingleDelete(WriteOptions(), "a")); | |
4004 | // Generate another SST with a SD to cover the oldest PUT('a') | |
4005 | ASSERT_OK(db->Flush(FlushOptions())); | |
4006 | ||
4007 | auto* dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB()); | |
4008 | assert(dbimpl); | |
4009 | ASSERT_OK(dbimpl->TEST_WaitForCompact()); | |
4010 | ||
4011 | { | |
4012 | CompactRangeOptions cro; | |
4013 | cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; | |
4014 | ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); | |
4015 | } | |
4016 | ||
4017 | { | |
4018 | std::string value; | |
4019 | const Status s = db->Get(ReadOptions(), "a", &value); | |
4020 | ASSERT_TRUE(s.IsNotFound()); | |
4021 | } | |
4022 | }; | |
4023 | ||
4024 | // Destroy and reopen | |
4025 | ASSERT_OK(ReOpen()); | |
4026 | write_to_db(); | |
4027 | } | |
4028 | ||
11fdf7f2 TL |
4029 | // Test that we can change write policy from WriteCommitted to WritePrepared |
4030 | // after a clean shutdown (which would empty the WAL) | |
4031 | TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { | |
4032 | bool empty_wal = true; | |
4033 | CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, empty_wal); | |
4034 | } | |
4035 | ||
4036 | // Test that we fail fast if WAL is not emptied between changing the write | |
4037 | // policy from WriteCommitted to WritePrepared | |
4038 | TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) { | |
4039 | bool empty_wal = true; | |
4040 | CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, !empty_wal); | |
4041 | } | |
4042 | ||
4043 | // Test that we can change write policy from WritePrepare back to WriteCommitted | |
4044 | // after a clean shutdown (which would empty the WAL) | |
4045 | TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) { | |
4046 | bool empty_wal = true; | |
4047 | CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, empty_wal); | |
4048 | } | |
4049 | ||
4050 | // Test that we fail fast if WAL is not emptied between changing the write | |
4051 | // policy from WriteCommitted to WritePrepared | |
4052 | TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) { | |
4053 | bool empty_wal = true; | |
4054 | CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, !empty_wal); | |
4055 | } | |
4056 | ||
f67539c2 | 4057 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
4058 | |
4059 | int main(int argc, char** argv) { | |
1e59de90 | 4060 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
11fdf7f2 | 4061 | ::testing::InitGoogleTest(&argc, argv); |
1e59de90 TL |
4062 | if (getenv("CIRCLECI")) { |
4063 | // Looking for backtrace on "Resource temporarily unavailable" exceptions | |
4064 | ::testing::FLAGS_gtest_catch_exceptions = false; | |
4065 | } | |
11fdf7f2 TL |
4066 | return RUN_ALL_TESTS(); |
4067 | } | |
4068 | ||
4069 | #else | |
4070 | #include <stdio.h> | |
4071 | ||
int main(int /*argc*/, char** /*argv*/) {
  // ROCKSDB_LITE builds have no transaction support: report the skip on
  // stderr and exit successfully.
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
4077 | ||
4078 | #endif // ROCKSDB_LITE |