// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <cinttypes>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/db_iter.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_prepared_txn.h"

namespace ROCKSDB_NAMESPACE {
enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };

// A PessimisticTransactionDB that writes data to the DB after the prepare
// phase of 2PC. In this way some data in the DB might not be committed. The
// DB provides mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  virtual ~WritePreparedTxnDB();

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  using TransactionDB::Write;
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;

  // Optimized version of ::Write that accepts additional optimization requests
  // such as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;

  // Write the batch to the underlying DB and mark it as committed. Can be
  // used both directly from the TxnDB and through a transaction.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);

  using DB::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;

  // Check whether the transaction that wrote the value with sequence number
  // seq is visible to the snapshot with sequence number snapshot_seq.
  // Returns true if commit_seq <= snapshot_seq.
  // If the snapshot_seq is already released and snapshot_seq <= max, sets
  // *snap_released to true and returns true as well.
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = kMinUnCommittedSeq,
                           bool* snap_released = nullptr) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    assert(min_uncommitted >= kMinUnCommittedSeq);
    // Caller is responsible for initializing snap_released.
    assert(snap_released == nullptr || *snap_released == false);
    // Here we try to infer the return value without looking into the prepare
    // list. This helps avoid synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but not necessarily efficient.
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (prep_seq < min_uncommitted) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    // Commit of a delayed prepared txn has two non-atomic steps: add to the
    // commit cache, remove from delayed_prepared_. Our reads of these two are
    // also non-atomic. By looking into the commit cache first, we might thus
    // find prep_seq neither in the commit cache nor in delayed_prepared_. To
    // fix that, i) we check if there was any delayed prepared BEFORE looking
    // into the commit cache, and ii) if there was, we complete the search
    // steps to be these: i) commit cache, ii) delayed prepared, iii) commit
    // cache again. In this way if the first query to the commit cache missed
    // the commit, the 2nd will catch it.
    bool was_empty;
    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
    CommitEntry64b dont_care;
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
    size_t repeats = 0;
    do {
      repeats++;
      assert(repeats < 100);
      if (UNLIKELY(repeats >= 100)) {
        throw std::runtime_error(
            "The read was interrupted 100 times by updates to "
            "max_evicted_seq_. This is unexpected in all setups");
      }
      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
      CommitEntry cached;
      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
      if (exist && prep_seq == cached.prep_seq) {
        // It is committed and also not evicted from commit cache
        ROCKS_LOG_DETAILS(
            info_log_,
            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
        return cached.commit_seq <= snapshot_seq;
      }
      // else it could be committed but not inserted into the map, which could
      // happen after recovery, or it could be committed and evicted by another
      // commit, or never committed.

      // At this point we don't know if it was committed or it is still
      // prepared.
      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
        continue;
      }
      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
      if (max_evicted_seq_ub < prep_seq) {
        // Not evicted from cache and also not present, so must be still
        // prepared
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
      if (!was_empty) {
        // We should not normally reach here
        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
        ReadLock rl(&prepared_mutex_);
        ROCKS_LOG_WARN(
            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
          // This is the order: 1) delayed_prepared_commits_ update, 2) publish,
          // 3) delayed_prepared_ cleanup. So check if it is the case of a late
          // cleanup.
          auto it = delayed_prepared_commits_.find(prep_seq);
          if (it == delayed_prepared_commits_.end()) {
            // Then it is not committed yet
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " returns %" PRId32,
                              prep_seq, snapshot_seq, 0);
            return false;
          } else {
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " commit: %" PRIu64 " returns %" PRId32,
                              prep_seq, snapshot_seq, it->second,
                              snapshot_seq <= it->second);
            return it->second <= snapshot_seq;
          }
        } else {
          // 2nd query to commit cache. Refer to the was_empty comment above.
          exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
          if (exist && prep_seq == cached.prep_seq) {
            ROCKS_LOG_DETAILS(
                info_log_,
                "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
                prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
            return cached.commit_seq <= snapshot_seq;
          }
          max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
        }
      }
    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
    // When advancing max_evicted_seq_, we move older entries from prepared to
    // delayed_prepared_. Also we move evicted entries from commit cache to
    // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
    // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
    // old_commit_map_, iii) committed with no conflict with any snapshot. Case
    // (i) delayed_prepared_ is checked above.
    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32 " released=1",
                        prep_seq, snapshot_seq, 0);
      assert(snap_released);
      // This snapshot is not valid anymore. We cannot tell if prep_seq is
      // committed before or after the snapshot. Return true but also set
      // snap_released to true.
      *snap_released = true;
      return true;
    }
    {
      // We should not normally reach here unless snapshot_seq is old. This is
      // a rare case and it is ok to pay the cost of mutex ReadLock for such
      // old, reading transactions.
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
      bool found = prep_set_entry != old_commit_map_.end();
      if (found) {
        auto& vec = prep_set_entry->second;
        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
      } else {
        // coming from compaction
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32 " released=1",
                          prep_seq, snapshot_seq, 0);
        // This snapshot is not valid anymore. We cannot tell if prep_seq is
        // committed before or after the snapshot. Return true but also set
        // snap_released to true.
        assert(snap_released);
        *snap_released = true;
        return true;
      }

      if (!found) {
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 1);
        return true;
      }
    }
    // (ii) is the case: it is committed but after the snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }
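
  // Illustrative example (hypothetical sequence numbers): a txn prepares at
  // seq 5 and commits at seq 8, so AddCommitted maps 5 -> 8 in the commit
  // cache. While the entry is still cached, IsInSnapshot(5, 7) returns
  // (8 <= 7) == false and IsInSnapshot(5, 9) returns (8 <= 9) == true.
  // Separately, any prep_seq below the snapshot's min_uncommitted was already
  // committed when the snapshot was taken, so that fast path returns true
  // without consulting the commit cache at all.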

  // Add the transaction with prepare sequence seq to the prepared list.
  // Note: must be called serially with increasing seq on each call.
  // locked is true if prepared_mutex_ is already locked.
  void AddPrepared(uint64_t seq, bool locked = false);
  // Check if any of the prepared txns are less than new max_evicted_seq_. Must
  // be called with prepared_mutex_ write locked.
  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
  // Remove the transaction with prepare sequence seq from the prepared list
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  // Note: must be called serially.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);

  struct CommitEntry {
    uint64_t prep_seq;
    uint64_t commit_seq;
    CommitEntry() : prep_seq(0), commit_seq(0) {}
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
    bool operator==(const CommitEntry& rhs) const {
      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
    }
  };

  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
    // Number of higher bits of a sequence number that are not used. They are
    // used to encode the value type, ...
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of lower bits from prepare seq that can be skipped as they are
    // implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS;
    // Number of bits we use to encode the commit seq.
    const size_t COMMIT_BITS;
    // Filter to encode/decode commit seq
    const uint64_t COMMIT_FILTER;
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND;
  };

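  // Worked example (illustrative): with a hypothetical index_bits of 23,
  //   PREP_BITS   = 64 - PAD_BITS - INDEX_BITS = 64 - 8 - 23 = 33
  //   COMMIT_BITS = 64 - PREP_BITS             = 31
  //   COMMIT_FILTER = (1ull << 31) - 1
  // so an entry can encode a commit up to about 2^31 sequence numbers after
  // its prepare, while the low 23 bits of the prepare seq are implied by the
  // entry's slot in the commit cache:
  //
  //   CommitEntry64bFormat format(/*index_bits=*/23);  // hypothetical value
  //   assert(format.PREP_BITS == 33 && format.COMMIT_BITS == 31);
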
  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
  // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA
  // PAD: first bits of a seq that are reserved for tagging and hence ignored
  // PREP/INDEX: the used bits in a prepare seq number
  // INDEX: the bits that do not have to be encoded (will be provided externally)
  // DELTA: commit seq - prep seq + 1
  // Number of DELTA bits should be equal to number of index bits + PADs
  struct CommitEntry64b {
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
            ToString(cs) + " prepare_seq is " + ToString(ps));
      }
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

   private:
    uint64_t rep_;
  };

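  // Round-trip sketch (illustrative, hypothetical values):
  //
  //   CommitEntry64bFormat format(/*index_bits=*/23);
  //   CommitEntry in(/*ps=*/100, /*cs=*/105);  // a delta of 6 is stored
  //   CommitEntry64b packed(in, format);
  //   CommitEntry out;
  //   // The low INDEX_BITS of prep_seq come from the cache slot, which for
  //   // seq 100 is 100 % COMMIT_CACHE_SIZE == 100 here.
  //   bool ok = packed.Parse(/*indexed_seq=*/100, &out, format);
  //   assert(ok && out == in);  // decodes back to prep 100 / commit 105
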
  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;

  virtual const Snapshot* GetSnapshot() override;
  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);

 protected:
  virtual Status VerifyCFOptions(
      const ColumnFamilyOptions& cf_options) override;
  // Assign the min and max sequence numbers for reading from the db. A seq >
  // max is not valid, a seq < min is valid, and min <= seq < max requires
  // further checking. Normally max is defined by the snapshot and min by the
  // minimum uncommitted seq.
  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
                                         SequenceNumber* min,
                                         SequenceNumber* max);
  // Validate whether a snapshot sequence number is still valid based on the
  // latest db status. backed_by_snapshot specifies if the number is backed by
  // an actual snapshot object. order specifies the memory order with which we
  // load the atomic variables: relaxed is enough for the default since we care
  // about the last value seen by the same thread.
  inline bool ValidateSnapshot(
      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
      std::memory_order order = std::memory_order_relaxed);
  // Get a dummy snapshot that refers to kMaxSequenceNumber
  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }

 private:
  friend class AddPreparedCallback;
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
  friend class WritePreparedCommitEntryPreReleaseCallback;
  friend class WritePreparedTransactionTestBase;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  friend class WritePreparedTransactionTest_BasicRecovery_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
  friend class WritePreparedTransactionTest_CommitMap_Test;
  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_Rollback_Test;
  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
  friend class WriteUnpreparedTxn;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;

  void Init(const TransactionDBOptions& /* unused */);

  void WPRecordTick(uint32_t ticker_type) const {
    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  }

  // A heap with the amortized O(1) complexity for erase. It uses one extra heap
  // to keep track of erased entries that are not yet on top of the main heap.
  class PreparedHeap {
    // The mutex is required for push and pop from PreparedHeap. ::erase will
    // use external synchronization via prepared_mutex_.
    port::Mutex push_pop_mutex_;
    std::deque<uint64_t> heap_;
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }

    inline bool empty() { return top() == kMaxSequenceNumber; }
    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
    inline void push(uint64_t v) {
      push_pop_mutex_.AssertHeld();
      if (heap_.empty()) {
        heap_top_.store(v, std::memory_order_release);
      } else {
        assert(heap_top_.load() < v);
      }
      heap_.push_back(v);
    }
    void pop(bool locked = false) {
      if (!locked) {
        push_pop_mutex()->Lock();
      }
      push_pop_mutex_.AssertHeld();
      heap_.pop_front();
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.top() > erased_heap_.top() could happen if we have erased
             // a non-existent entry. Ideally the user should not do that but we
             // should be resilient against it.
             heap_.front() >= erased_heap_.top()) {
        if (heap_.front() == erased_heap_.top()) {
          heap_.pop_front();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
                      std::memory_order_release);
      if (!locked) {
        push_pop_mutex()->Unlock();
      }
    }
    // Concurrent calls need external synchronization. It is safe to be called
    // concurrently with push and pop though.
    void erase(uint64_t seq) {
      if (!empty()) {
        auto top_seq = top();
        if (seq < top_seq) {
          // Already popped, ignore it.
        } else if (top_seq == seq) {
          pop();
#ifndef NDEBUG
          MutexLock ml(push_pop_mutex());
          assert(heap_.empty() || heap_.front() != seq);
#endif
        } else {  // top() > seq
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
  };
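
  // Illustrative trace of the two-heap erase (hypothetical sequence numbers):
  // pushing 10, 20, 30 and then erasing 20 does not touch heap_; 20 is parked
  // in erased_heap_ and is skipped when pop() reaches it, which is what makes
  // erase amortized O(1):
  //
  //   PreparedHeap h;
  //   { MutexLock ml(h.push_pop_mutex()); h.push(10); h.push(20); h.push(30); }
  //   h.erase(20);            // below the top: parked in erased_heap_
  //   assert(h.top() == 10);  // main heap unchanged
  //   h.pop();                // removes 10, then discards the parked 20
  //   assert(h.top() == 30);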

  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results in an eviction,
  // sets the evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches the
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots at
  // the time of updating the max. Thread-safety: this function can be called
  // concurrently. The concurrent invocations of this function are equivalent to
  // a serial invocation in which the last invocation is the one with the
  // largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);

  inline SequenceNumber SmallestUnCommittedSeq() {
    // Note: We have two lists to look into, but for performance reasons they
    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
    // that a prepared entry will not be missed, we look into them in the
    // opposite order: first read prepared_txns_ and then delayed_prepared_.

    // This must be called before calling ::top. This is because the concurrent
    // thread would call ::RemovePrepared before updating
    // GetLatestSequenceNumber(). Reading them in the opposite order here
    // guarantees that the ::top that we read would be no higher than the ::top
    // we would have seen had we updated/read them atomically.
    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
    auto min_prepare = prepared_txns_.top();
    // Since we always update the prepare_heap from the main write queue via
    // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    if (!delayed_prepared_empty_.load()) {
      ReadLock rl(&prepared_mutex_);
      if (!delayed_prepared_.empty()) {
        return *delayed_prepared_.begin();
      }
    }
    bool empty = min_prepare == kMaxSequenceNumber;
    if (empty) {
      // Since GetLatestSequenceNumber is updated
      // after prepared_txns_ are, the value of GetLatestSequenceNumber would
      // reflect any uncommitted data that is not added to prepared_txns_ yet.
      // Otherwise, if there is no concurrent txn, this value simply reflects
      // the latest value in the memtable.
      return next_prepare;
    } else {
      return std::min(min_prepare, next_prepare);
    }
  }
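
  // Usage sketch (illustrative): the value computed here is recorded in a
  // snapshot via EnhanceSnapshot below and later passed to IsInSnapshot as
  // min_uncommitted, letting reads skip the prepared-list lookup for any seq
  // below it:
  //
  //   SequenceNumber min_uncommitted = SmallestUnCommittedSeq();
  //   // every seq < min_uncommitted is already committed, so IsInSnapshot
  //   // can take its fast path for such sequence numbers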

  // Enhance the snapshot object by recording in it the smallest uncommitted seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    assert(min_uncommitted <= snapshot->number_ + 1);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // The concurrent invocations of this function are equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of the
  // snapshots are released and to do the cleanup for the released snapshots.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or it can be safely discarded (and hence assumed committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If it
  // is called concurrently with multiple UpdateSnapshots, the result is the
  // same as checking the intersection of the snapshot list before updates with
  // the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();

  // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  // The list is stored in two data structures: in snapshot_cache_, which is
  // efficient for concurrent reads, and in snapshots_ if the data does not fit
  // into snapshot_cache_. snapshots_total_ is the total number of snapshots in
  // the two lists.
  std::atomic<size_t> snapshots_total_ = {};
  // The list sorted in ascending order. Thread-safety for writes is provided
  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
  // each entry. On the x86_64 architecture such reads are compiled to simple
  // read instructions.
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
  // although redundant, simplifies the CleanupOldSnapshots implementation.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_all_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index from
  // a filled one. Thread-safety is provided with commit_cache_mutex_.
  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
  // GetSnapshotInternal guarantees that the snapshot seq is larger than
  // future_max_evicted_seq_, this guarantees that a snapshot that is not larger
  // than max has already been looked at via a GetSnapshotListFromDB(new_max).
  std::atomic<uint64_t> future_max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent advances we would have. We do not want
  // it to be too large either as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // prepared sequence numbers of the evicted entries from commit_cache_ that
  // overlap with such snapshot. These are the prepared sequence numbers that
  // the snapshot, to which they are mapped, cannot assume to be committed just
  // because they are no longer in the commit_cache_. The vector must be sorted
  // after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances their sequence number. This is expected to
  // be empty normally. Thread-safety is provided with prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Commit of a delayed prepared: 1) update commit cache, 2) update
  // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
  // delayed_prepared_commits_ will help us tell apart the unprepared txns from
  // the ones that are committed but not cleaned up yet.
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  // Updated when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Updated when old_commit_map_.empty() changes. Expected to be true normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators
  // Thread safety: since it is const, it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles
  // Thread safety: since the handle is a read-only object, it is safe to read
  // it concurrently
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
  // A dummy snapshot object that refers to kMaxSequenceNumber
  SnapshotImpl dummy_max_snapshot_;
};

class WritePreparedTxnReadCallback : public ReadCallback {
 public:
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
      : ReadCallback(snapshot),
        db_(db),
        backed_by_snapshot_(kBackedByDBSnapshot) {}
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
                               SequenceNumber min_uncommitted,
                               SnapshotBackup backed_by_snapshot)
      : ReadCallback(snapshot, min_uncommitted),
        db_(db),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WritePreparedTxnReadCallback() {
    // If it is not backed by snapshot, the caller must check validity
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  // Will be called to see if the seq number is visible; if not, it moves on to
  // the next seq number.
  inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
    auto snapshot = max_visible_seq_;
    bool snap_released = false;
    auto ret =
        db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
    assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
    snap_released_ |= snap_released;
    return ret;
  }

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
 private:
  WritePreparedTxnDB* db_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};
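
// Usage sketch (illustrative, simplified from the read path): a read that is
// not backed by a real snapshot object must consult valid() afterwards and
// retry if the inferred snapshot turned out to be released concurrently:
//
//   SequenceNumber min_uncommitted, snap_seq;
//   const SnapshotBackup backed_by_snapshot = txn_db->AssignMinMaxSeqs(
//       read_options.snapshot, &min_uncommitted, &snap_seq);
//   WritePreparedTxnReadCallback callback(txn_db, snap_seq, min_uncommitted,
//                                         backed_by_snapshot);
//   // ... perform the read with the callback ...
//   if (!callback.valid() ||
//       !txn_db->ValidateSnapshot(snap_seq, backed_by_snapshot)) {
//     // the read must be retried, e.g., with Status::TryAgain()
//   }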

class AddPreparedCallback : public PreReleaseCallback {
 public:
  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
                      size_t sub_batch_cnt, bool two_write_queues,
                      bool first_prepare_batch)
      : db_(db),
        db_impl_(db_impl),
        sub_batch_cnt_(sub_batch_cnt),
        two_write_queues_(two_write_queues),
        first_prepare_batch_(first_prepare_batch) {
    (void)two_write_queues_;  // to silence unused private field warning
  }
  virtual Status Callback(SequenceNumber prepare_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t log_number, size_t index,
                          size_t total) override {
    assert(index < total);
    // To reduce the cost of lock acquisition competing with the concurrent
    // prepare requests, lock on the first callback and unlock on the last.
    const bool do_lock = !two_write_queues_ || index == 0;
    const bool do_unlock = !two_write_queues_ || index + 1 == total;
    // Always Prepare from the main queue
    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
    if (do_lock) {
      db_->prepared_txns_.push_pop_mutex()->Lock();
    }
    const bool kLocked = true;
    for (size_t i = 0; i < sub_batch_cnt_; i++) {
      db_->AddPrepared(prepare_seq + i, kLocked);
    }
    if (do_unlock) {
      db_->prepared_txns_.push_pop_mutex()->Unlock();
    }
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
    if (first_prepare_batch_) {
      assert(log_number != 0);
      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
          log_number);
    }
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  size_t sub_batch_cnt_;
  bool two_write_queues_;
  // It is 2PC and this is the first prepare batch. Always the case in 2PC
  // unless it is WriteUnPrepared.
  bool first_prepare_batch_;
};
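
// Illustrative note on the locking in AddPreparedCallback::Callback above
// (hypothetical numbers): with two_write_queues and a write group of three
// sub-callbacks, only index 0 acquires push_pop_mutex() and only index 2
// (where index + 1 == total) releases it, so the group pays for a single
// lock/unlock pair instead of three.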

class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes a non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WritePreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        aux_seq_(aux_seq),
        aux_batch_cnt_(aux_batch_cnt),
        includes_aux_batch_(aux_batch_cnt > 0) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(!db_impl_->immutable_db_options().two_write_queues ||
           is_mem_disabled);
    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
    // The data batch is what accompanies the commit marker and affects the
    // last seq in the commit batch.
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    if (prep_seq_ != kMaxSequenceNumber) {
      for (size_t i = 0; i < prep_batch_cnt_; i++) {
        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
      }
    }  // else there was no prepare phase
    if (includes_aux_batch_) {
      for (size_t i = 0; i < aux_batch_cnt_; i++) {
        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
      }
    }
    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that accompanies the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For the commit seq of each batch use the commit seq of the last
        // batch. This makes debugging easier by having all the batches carry
        // the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // published sequence numbers will be in order, i.e., once a seq is
      // published all the seq prior to that are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
      // Note RemovePrepared should be called after publishing the seq.
      // Otherwise the SmallestUnCommittedSeq optimization breaks.
      if (prep_seq_ != kMaxSequenceNumber) {
        db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
      }  // else there was no prepare phase
      if (includes_aux_batch_) {
        db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
      }
    }
    // else the SequenceNumber that is updated as part of the write already does
    // the publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Data here is the batch that is written with the commit marker, either
  // because it is a commit without prepare or because the commit has a
  // CommitTimeWriteBatch.
  bool includes_data_;
  // The auxiliary batch (if there is any) is a batch that is written before,
  // but gets the same commit seq as the prepare batch or data batch. This is
  // used in two write queues where the CommitTimeWriteBatch becomes the aux
  // batch and we do a separate write to actually commit everything.
  SequenceNumber aux_seq_;
  size_t aux_batch_cnt_;
  bool includes_aux_batch_;
};
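
// Illustrative example (hypothetical sequence numbers): a 2PC txn prepared at
// seq 10 with prep_batch_cnt = 2 and no commit-time batch commits with a
// commit marker at seq 12. The callback above maps both prepared sub-batches
// to the commit seq, i.e., AddCommitted(10, 12) and AddCommitted(11, 12),
// after which IsInSnapshot(10, s) and IsInSnapshot(11, s) hold for any
// snapshot s >= 12.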

// For two_write_queues commit both the aborted batch and the cleanup batch and
// then publish the seq
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
 public:
  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
                                          DBImpl* db_impl,
                                          SequenceNumber prep_seq,
                                          SequenceNumber rollback_seq,
                                          size_t prep_batch_cnt)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        rollback_seq_(rollback_seq),
        prep_batch_cnt_(prep_batch_cnt) {
    assert(prep_seq != kMaxSequenceNumber);
    assert(rollback_seq != kMaxSequenceNumber);
    assert(prep_batch_cnt_ > 0);
  }

  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
                  size_t /*index*/, size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(is_mem_disabled);  // implies the 2nd queue
    assert(db_impl_->immutable_db_options().two_write_queues);
#ifdef NDEBUG
    (void)is_mem_disabled;
#endif
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    for (size_t i = 0; i < prep_batch_cnt_; i++) {
      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  SequenceNumber prep_seq_;
  SequenceNumber rollback_seq_;
  size_t prep_batch_cnt_;
};

// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
      : comparators_(comparators), batches_(1) {}
  std::map<uint32_t, const Comparator*>& comparators_;
  using CFKeys = std::set<Slice, SetComparator>;
  std::map<uint32_t, CFKeys> keys_;
  size_t batches_;
  size_t BatchCount() { return batches_; }
  void AddKey(const uint32_t cf, const Slice& key);
  void InitWithComp(const uint32_t cf);
  Status MarkNoop(bool) override { return Status::OK(); }
  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  Status MarkCommit(const Slice&) override { return Status::OK(); }
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status DeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MarkBeginPrepare(bool) override { return Status::OK(); }
  Status MarkRollback(const Slice&) override { return Status::OK(); }
  bool WriteAfterCommit() const override { return false; }
};
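
// Usage sketch (illustrative, hypothetical batch contents): a batch with
// Put(k1), Put(k2), Put(k1) counts as two sub-batches, since the second write
// to k1 must not share a sub-batch with the first:
//
//   SubBatchCounter counter(*txn_db->GetCFComparatorMap());
//   Status s = batch->Iterate(&counter);
//   assert(s.ok() && counter.BatchCount() == 2);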

SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
                                                    SequenceNumber* min,
                                                    SequenceNumber* max) {
  if (snapshot != nullptr) {
    *min =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
    return kBackedByDBSnapshot;
  } else {
    *min = SmallestUnCommittedSeq();
    *max = 0;  // to be assigned later after sv is referenced.
    return kUnbackedByDBSnapshot;
  }
}

bool WritePreparedTxnDB::ValidateSnapshot(
    const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
    std::memory_order order) {
  if (backed_by_snapshot == kBackedByDBSnapshot) {
    return true;
  } else {
    SequenceNumber max = max_evicted_seq_.load(order);
    // Validate that max has not advanced past the snapshot seq that is not
    // backed by a real snapshot. This is a very rare case that should not
    // happen in real workloads.
    if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
      return false;
    }
  }
  return true;
}
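
// Illustrative example (hypothetical sequence numbers): an unbacked read
// infers snap_seq = 50 from the current state. If a background eviction then
// advances max_evicted_seq_ to 60, ValidateSnapshot(50, kUnbackedByDBSnapshot)
// returns false, since commit info at or below 60 may have left the commit
// cache; the caller is expected to retry the read.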

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE