// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/db_iter.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"

namespace rocksdb {

#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
  ;  // Skipped by default due to the logging overhead
// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)

// A PessimisticTransactionDB that writes data to the DB after the prepare
// phase of 2PC. As a result, some data in the DB might not be committed yet.
// The DB provides mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WritePreparedTxnDB(
      DB* db, const TransactionDBOptions& txn_db_options,
      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  explicit WritePreparedTxnDB(
      StackableDB* db, const TransactionDBOptions& txn_db_options,
      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  virtual ~WritePreparedTxnDB();

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;
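
  // A hedged usage sketch, not part of the original header: applications
  // normally obtain a WritePreparedTxnDB via TransactionDB::Open with the
  // WRITE_PREPARED write policy rather than constructing it directly. The
  // path "/tmp/txn_db" is a placeholder.
  //
  //   Options options;
  //   options.create_if_missing = true;
  //   TransactionDBOptions txn_db_options;
  //   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
  //   TransactionDB* txn_db;
  //   Status s =
  //       TransactionDB::Open(options, txn_db_options, "/tmp/txn_db", &txn_db);
  //   Transaction* txn = txn_db->BeginTransaction(WriteOptions());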
81 | ||
82 | // Optimized version of ::Write that receives more optimization request such | |
83 | // as skip_concurrency_control. | |
84 | using PessimisticTransactionDB::Write; | |
85 | Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&, | |
86 | WriteBatch* updates) override; | |
87 | ||
88 | // Write the batch to the underlying DB and mark it as committed. Could be | |
89 | // used by both directly from TxnDB or through a transaction. | |
90 | Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch, | |
91 | size_t batch_cnt, WritePreparedTxn* txn); | |
92 | ||
  using DB::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;

  virtual void ReleaseSnapshot(const Snapshot* snapshot) override;

  // Check whether the transaction that wrote the value with prepare sequence
  // number prep_seq is visible to the snapshot with sequence number
  // snapshot_seq. Returns true if commit_seq <= snapshot_seq.
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = 0) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    // Here we try to infer the return value without looking into the prepare
    // list. This helps avoid synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but is not necessarily efficient.
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
      // We should not normally reach here
      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
      ReadLock rl(&prepared_mutex_);
      ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
                     static_cast<uint64_t>(delayed_prepared_.size()));
      if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
        // Then it is not committed yet
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
    }
    // Note: since min_uncommitted does not include the delayed_prepared_, we
    // should check delayed_prepared_ first before applying this optimization.
    // TODO(myabandeh): include delayed_prepared_ in min_uncommitted
    if (prep_seq < min_uncommitted) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
    CommitEntry64b dont_care;
    CommitEntry cached;
    bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
    if (exist && prep_seq == cached.prep_seq) {
      // It is committed and also not evicted from the commit cache
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
      return cached.commit_seq <= snapshot_seq;
    }
    // else it could be committed but not inserted into the map, which could
    // happen after recovery, or it could be committed and evicted by another
    // commit, or never committed.

    // At this point we don't know if it was committed or is still prepared
    auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
    // The value of max_evicted_seq_ when we did GetCommitEntry is <= its
    // value now.
    if (max_evicted_seq < prep_seq) {
      // Not evicted from cache and also not present, so it must still be
      // prepared
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    // When advancing max_evicted_seq_, we move older entries from prepared to
    // delayed_prepared_. Also we move evicted entries from the commit cache
    // to old_commit_map_ if they overlap with any snapshot. Since prep_seq <=
    // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
    // old_commit_map_, iii) committed with no conflict with any snapshot.
    // Case (i), delayed_prepared_, is checked above.
    if (max_evicted_seq < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    {
      // We should not normally reach here unless snapshot_seq is old. This is
      // a rare case and it is ok to pay the cost of mutex ReadLock for such
      // old, reading transactions.
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
      bool found = prep_set_entry != old_commit_map_.end();
      if (found) {
        auto& vec = prep_set_entry->second;
        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
      }
      if (!found) {
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 1);
        return true;
      }
    }
    // (ii) is the case: it is committed, but after snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }
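
  // For illustration only, not part of the original header: a hedged sketch
  // of the visibility rule, assuming `txn_db` is a WritePreparedTxnDB* and
  // the sequence numbers are hypothetical.
  //
  //   const uint64_t prep_seq = 10;       // prepared at seq 10
  //   txn_db->AddCommitted(prep_seq, /*commit_seq=*/12);
  //   // A snapshot at seq 11 does not see the write (commit_seq 12 > 11) ...
  //   assert(!txn_db->IsInSnapshot(prep_seq, /*snapshot_seq=*/11));
  //   // ... while a snapshot at seq 12 or later does (12 <= snapshot_seq).
  //   assert(txn_db->IsInSnapshot(prep_seq, /*snapshot_seq=*/12));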
247 | ||
248 | // Add the transaction with prepare sequence seq to the prepared list | |
249 | void AddPrepared(uint64_t seq); | |
250 | // Remove the transaction with prepare sequence seq from the prepared list | |
251 | void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1); | |
252 | // Add the transaction with prepare sequence prepare_seq and commit sequence | |
253 | // commit_seq to the commit map. loop_cnt is to detect infinite loops. | |
254 | void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, | |
255 | uint8_t loop_cnt = 0); | |
256 | ||
257 | struct CommitEntry { | |
258 | uint64_t prep_seq; | |
259 | uint64_t commit_seq; | |
260 | CommitEntry() : prep_seq(0), commit_seq(0) {} | |
261 | CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} | |
262 | bool operator==(const CommitEntry& rhs) const { | |
263 | return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; | |
264 | } | |
265 | }; | |
266 | ||
  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
    // Number of high bits of a sequence number that are not used by the
    // sequence number itself; they are used to encode the value type, etc.
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of low bits of a prepare seq that can be skipped, as they are
    // implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS;
    // Number of bits we use to encode the commit seq
    const size_t COMMIT_BITS;
    // Filter to encode/decode the commit seq
    const uint64_t COMMIT_FILTER;
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND;
  };

  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX ... INDEX
  // Delta Seq   (64 bits) = 0 0 ... 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA
  // Encoded Value         = PREP PREP ... PREP PREP DELTA DELTA ... DELTA
  // PAD: the first bits of a seq that are reserved for tagging and hence
  // ignored
  // PREP/INDEX: the used bits in a prepare seq number
  // INDEX: the bits that do not have to be encoded (will be provided
  // externally)
  // DELTA: commit seq - prepare seq + 1
  // The number of DELTA bits should be equal to the number of INDEX bits
  // plus the PAD bits
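  //
  // For illustration only (not in the original header): with the default
  // COMMIT_CACHE_BITS = 23 and PAD_BITS = 8, the format works out to
  // INDEX_BITS = 23, PREP_BITS = 64 - 8 - 23 = 33, and
  // COMMIT_BITS = 64 - 33 = 31, so DELTA_UPPERBOUND = 2^31. A commit entry
  // thus fits in 64 bits as long as commit_seq - prepare_seq + 1 < 2^31.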
  struct CommitEntry64b {
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
            ToString(cs) + " prepare_seq is " + ToString(ps));
      }
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

   private:
    uint64_t rep_;
  };
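
  // A hedged, illustrative round trip through the 64-bit encoding, not part
  // of the original header; the sequence numbers are hypothetical and the
  // format uses the default 23 index bits.
  //
  //   CommitEntry64bFormat format(/*index_bits=*/23);
  //   CommitEntry entry(/*ps=*/100, /*cs=*/105);      // delta = 6
  //   CommitEntry64b encoded(entry, format);
  //   // The low INDEX_BITS of prep_seq (here 100 % 2^23 == 100) are implied
  //   // by the slot index and must be supplied externally when decoding.
  //   CommitEntry decoded;
  //   bool ok = encoded.Parse(/*indexed_seq=*/100, &decoded, format);
  //   assert(ok && decoded == entry);  // commit_seq = 100 + 6 - 1 = 105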
346 | ||
347 | // Struct to hold ownership of snapshot and read callback for cleanup. | |
348 | struct IteratorState; | |
349 | ||
350 | std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() { | |
351 | return cf_map_; | |
352 | } | |
353 | std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() { | |
354 | return handle_map_; | |
355 | } | |
356 | void UpdateCFComparatorMap( | |
357 | const std::vector<ColumnFamilyHandle*>& handles) override; | |
358 | void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override; | |
359 | ||
360 | virtual const Snapshot* GetSnapshot() override; | |
361 | ||
362 | protected: | |
363 | virtual Status VerifyCFOptions( | |
364 | const ColumnFamilyOptions& cf_options) override; | |
365 | ||
366 | private: | |
367 | friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; | |
368 | friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; | |
369 | friend class WritePreparedTransactionTest_CommitMapTest_Test; | |
370 | friend class | |
371 | WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test; | |
372 | friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test; | |
373 | friend class WritePreparedTransactionTestBase; | |
374 | friend class PreparedHeap_BasicsTest_Test; | |
375 | friend class PreparedHeap_EmptyAtTheEnd_Test; | |
376 | friend class PreparedHeap_Concurrent_Test; | |
377 | friend class WritePreparedTxn; | |
378 | friend class WritePreparedTxnDBMock; | |
379 | friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; | |
380 | friend class | |
381 | WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; | |
382 | friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; | |
383 | friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; | |
384 | friend class WritePreparedTransactionTest_OldCommitMapGC_Test; | |
385 | friend class WritePreparedTransactionTest_RollbackTest_Test; | |
386 | friend class WriteUnpreparedTxnDB; | |
387 | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; | |
388 | ||
389 | void Init(const TransactionDBOptions& /* unused */); | |
390 | ||
391 | void WPRecordTick(uint32_t ticker_type) const { | |
392 | RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type); | |
393 | } | |
394 | ||
  // A heap with the amortized O(1) complexity for erase. It uses one extra
  // heap to keep track of erased entries that are not yet on top of the main
  // heap.
  class PreparedHeap {
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        heap_;
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    bool empty() { return heap_.empty(); }
    uint64_t top() { return heap_.top(); }
    void push(uint64_t v) { heap_.push(v); }
    void pop() {
      heap_.pop();
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.top() > erased_heap_.top() could happen if we have erased
             // a non-existent entry. Ideally the user should not do that but
             // we should be resilient against it.
             heap_.top() >= erased_heap_.top()) {
        if (heap_.top() == erased_heap_.top()) {
          heap_.pop();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
    }
    void erase(uint64_t seq) {
      if (!heap_.empty()) {
        if (seq < heap_.top()) {
          // Already popped, ignore it.
        } else if (heap_.top() == seq) {
          pop();
          assert(heap_.empty() || heap_.top() != seq);
        } else {  // (heap_.top() > seq)
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
  };
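
  // For illustration only, not part of the original header: erasing an entry
  // that is below the top defers its removal until it surfaces.
  //
  //   PreparedHeap h;
  //   h.push(1); h.push(3); h.push(5);
  //   h.erase(3);           // 3 is not on top: it goes to erased_heap_
  //   assert(h.top() == 1);
  //   h.pop();              // pops 1; 3 surfaces and is lazily dropped
  //   assert(h.top() == 5);
  //   h.pop();              // leave the heap empty; the destructor asserts it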
450 | ||
451 | void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; } | |
452 | ||
453 | // Get the commit entry with index indexed_seq from the commit table. It | |
454 | // returns true if such entry exists. | |
455 | bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, | |
456 | CommitEntry* entry) const; | |
457 | ||
458 | // Rewrite the entry with the index indexed_seq in the commit table with the | |
459 | // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction, | |
460 | // sets the evicted_entry and returns true. | |
461 | bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry, | |
462 | CommitEntry* evicted_entry); | |
463 | ||
464 | // Rewrite the entry with the index indexed_seq in the commit table with the | |
465 | // commit entry new_entry only if the existing entry matches the | |
466 | // expected_entry. Returns false otherwise. | |
467 | bool ExchangeCommitEntry(const uint64_t indexed_seq, | |
468 | CommitEntry64b& expected_entry, | |
469 | const CommitEntry& new_entry); | |
470 | ||
471 | // Increase max_evicted_seq_ from the previous value prev_max to the new | |
472 | // value. This also involves taking care of prepared txns that are not | |
473 | // committed before new_max, as well as updating the list of live snapshots at | |
474 | // the time of updating the max. Thread-safety: this function can be called | |
475 | // concurrently. The concurrent invocations of this function is equivalent to | |
476 | // a serial invocation in which the last invocation is the one with the | |
477 | // largest new_max value. | |
478 | void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, | |
479 | const SequenceNumber& new_max); | |
480 | ||
481 | inline SequenceNumber SmallestUnCommittedSeq() { | |
482 | // Since we update the prepare_heap always from the main write queue via | |
483 | // PreReleaseCallback, the prepared_txns_.top() indicates the smallest | |
484 | // prepared data in 2pc transactions. For non-2pc transactions that are | |
485 | // written in two steps, we also update prepared_txns_ at the first step | |
486 | // (via the same mechanism) so that their uncommitted data is reflected in | |
487 | // SmallestUnCommittedSeq. | |
488 | ReadLock rl(&prepared_mutex_); | |
489 | // Since we are holding the mutex, and GetLatestSequenceNumber is updated | |
490 | // after prepared_txns_ are, the value of GetLatestSequenceNumber would | |
491 | // reflect any uncommitted data that is not added to prepared_txns_ yet. | |
492 | // Otherwise, if there is no concurrent txn, this value simply reflects that | |
493 | // latest value in the memtable. | |
494 | if (prepared_txns_.empty()) { | |
495 | return db_impl_->GetLatestSequenceNumber() + 1; | |
496 | } else { | |
497 | return std::min(prepared_txns_.top(), | |
498 | db_impl_->GetLatestSequenceNumber() + 1); | |
499 | } | |
500 | } | |
  // Enhance the snapshot object by recording in it the smallest uncommitted
  // seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called
  // concurrently. The concurrent invocations of this function are equivalent
  // to a serial invocation in which the last invocation is the one with the
  // largest version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or can be safely discarded (and hence assumed committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If
  // it is called concurrently with multiple UpdateSnapshots, the result is
  // the same as checking the intersection of the snapshot list before the
  // updates with the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the
  // condition. next_is_larger: the next snapshot will be a larger value.
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // The list of live snapshots at the last time that max_evicted_seq_
  // advanced. The list is stored in two data structures: snapshot_cache_,
  // which is efficient for concurrent reads, and snapshots_, for the data
  // that does not fit into snapshot_cache_. snapshots_total_ is the total
  // number of snapshots in the two lists.
  std::atomic<size_t> snapshots_total_ = {};
  // The list is sorted in ascending order. Thread-safety for writes is
  // provided with snapshots_mutex_, and concurrent reads are safe due to
  // std::atomic for each entry. On the x86_64 architecture such reads compile
  // to simple read instructions.
  // 128 entries
  static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list is sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  // 8M entries, 64MB in size
  static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(23);
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index
  // from a filled one. Thread-safety is provided with commit_cache_mutex_.
  unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_, it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequently we advance it. We do not want it to
  // be too large either, as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // the prepare sequence numbers of the entries evicted from commit_cache_
  // that overlap with each such snapshot. These are the prepare sequence
  // numbers that the snapshot to which they are mapped cannot assume to be
  // committed just because they are no longer in commit_cache_. The vector
  // must be sorted after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances past their sequence numbers. This is
  // expected to be empty normally. Thread-safety is provided with
  // prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Updated when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Updated when old_commit_map_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators
  // Thread safety: since it is const, it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles
  // Thread safety: since the handle is a read-only object, it is safe to
  // read it concurrently
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
};
610 | ||
611 | class WritePreparedTxnReadCallback : public ReadCallback { | |
612 | public: | |
613 | WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, | |
614 | SequenceNumber min_uncommitted) | |
615 | : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} | |
616 | ||
617 | // Will be called to see if the seq number visible; if not it moves on to | |
618 | // the next seq number. | |
619 | inline virtual bool IsVisible(SequenceNumber seq) override { | |
620 | return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); | |
621 | } | |
622 | ||
623 | private: | |
624 | WritePreparedTxnDB* db_; | |
625 | SequenceNumber snapshot_; | |
626 | SequenceNumber min_uncommitted_; | |
627 | }; | |
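
// A hedged usage sketch, not part of the original header: readers construct
// this callback from a snapshot and pass it down so that only versions
// visible to that snapshot are surfaced. `wp_db` and `snap` (a SnapshotImpl*
// enhanced via EnhanceSnapshot) are hypothetical.
//
//   WritePreparedTxnReadCallback callback(wp_db, snap->GetSequenceNumber(),
//                                         snap->min_uncommitted_);
//   // Internally, reads would skip any version whose seq fails
//   // callback.IsVisible(seq).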
628 | ||
629 | class AddPreparedCallback : public PreReleaseCallback { | |
630 | public: | |
631 | AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt, | |
632 | bool two_write_queues) | |
633 | : db_(db), | |
634 | sub_batch_cnt_(sub_batch_cnt), | |
635 | two_write_queues_(two_write_queues) { | |
636 | (void)two_write_queues_; // to silence unused private field warning | |
637 | } | |
638 | virtual Status Callback(SequenceNumber prepare_seq, | |
639 | bool is_mem_disabled) override { | |
640 | #ifdef NDEBUG | |
641 | (void)is_mem_disabled; | |
642 | #endif | |
643 | assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue | |
644 | for (size_t i = 0; i < sub_batch_cnt_; i++) { | |
645 | db_->AddPrepared(prepare_seq + i); | |
646 | } | |
647 | return Status::OK(); | |
648 | } | |
649 | ||
650 | private: | |
651 | WritePreparedTxnDB* db_; | |
652 | size_t sub_batch_cnt_; | |
653 | bool two_write_queues_; | |
654 | }; | |
655 | ||
656 | class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { | |
657 | public: | |
658 | // includes_data indicates that the commit also writes non-empty | |
659 | // CommitTimeWriteBatch to memtable, which needs to be committed separately. | |
660 | WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db, | |
661 | DBImpl* db_impl, | |
662 | SequenceNumber prep_seq, | |
663 | size_t prep_batch_cnt, | |
664 | size_t data_batch_cnt = 0, | |
665 | bool publish_seq = true) | |
666 | : db_(db), | |
667 | db_impl_(db_impl), | |
668 | prep_seq_(prep_seq), | |
669 | prep_batch_cnt_(prep_batch_cnt), | |
670 | data_batch_cnt_(data_batch_cnt), | |
671 | includes_data_(data_batch_cnt_ > 0), | |
672 | publish_seq_(publish_seq) { | |
673 | assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor | |
674 | assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); | |
675 | } | |
676 | ||
677 | virtual Status Callback(SequenceNumber commit_seq, | |
678 | bool is_mem_disabled) override { | |
679 | #ifdef NDEBUG | |
680 | (void)is_mem_disabled; | |
681 | #endif | |
682 | assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); | |
683 | const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) | |
684 | ? commit_seq | |
685 | : commit_seq + data_batch_cnt_ - 1; | |
686 | if (prep_seq_ != kMaxSequenceNumber) { | |
687 | for (size_t i = 0; i < prep_batch_cnt_; i++) { | |
688 | db_->AddCommitted(prep_seq_ + i, last_commit_seq); | |
689 | } | |
690 | } // else there was no prepare phase | |
691 | if (includes_data_) { | |
692 | assert(data_batch_cnt_); | |
693 | // Commit the data that is accompanied with the commit request | |
694 | for (size_t i = 0; i < data_batch_cnt_; i++) { | |
695 | // For commit seq of each batch use the commit seq of the last batch. | |
696 | // This would make debugging easier by having all the batches having | |
697 | // the same sequence number. | |
698 | db_->AddCommitted(commit_seq + i, last_commit_seq); | |
699 | } | |
700 | } | |
701 | if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { | |
702 | assert(is_mem_disabled); // implies the 2nd queue | |
703 | // Publish the sequence number. We can do that here assuming the callback | |
704 | // is invoked only from one write queue, which would guarantee that the | |
705 | // publish sequence numbers will be in order, i.e., once a seq is | |
706 | // published all the seq prior to that are also publishable. | |
707 | db_impl_->SetLastPublishedSequence(last_commit_seq); | |
708 | } | |
709 | // else SequenceNumber that is updated as part of the write already does the | |
710 | // publishing | |
711 | return Status::OK(); | |
712 | } | |
713 | ||
714 | private: | |
715 | WritePreparedTxnDB* db_; | |
716 | DBImpl* db_impl_; | |
717 | // kMaxSequenceNumber if there was no prepare phase | |
718 | SequenceNumber prep_seq_; | |
719 | size_t prep_batch_cnt_; | |
720 | size_t data_batch_cnt_; | |
721 | // Either because it is commit without prepare or it has a | |
722 | // CommitTimeWriteBatch | |
723 | bool includes_data_; | |
724 | // Should the callback also publishes the commit seq number | |
725 | bool publish_seq_; | |
726 | }; | |
727 | ||
728 | // Count the number of sub-batches inside a batch. A sub-batch does not have | |
729 | // duplicate keys. | |
730 | struct SubBatchCounter : public WriteBatch::Handler { | |
731 | explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators) | |
732 | : comparators_(comparators), batches_(1) {} | |
733 | std::map<uint32_t, const Comparator*>& comparators_; | |
734 | using CFKeys = std::set<Slice, SetComparator>; | |
735 | std::map<uint32_t, CFKeys> keys_; | |
736 | size_t batches_; | |
737 | size_t BatchCount() { return batches_; } | |
738 | void AddKey(const uint32_t cf, const Slice& key); | |
739 | void InitWithComp(const uint32_t cf); | |
740 | Status MarkNoop(bool) override { return Status::OK(); } | |
741 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } | |
742 | Status MarkCommit(const Slice&) override { return Status::OK(); } | |
743 | Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { | |
744 | AddKey(cf, key); | |
745 | return Status::OK(); | |
746 | } | |
747 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |
748 | AddKey(cf, key); | |
749 | return Status::OK(); | |
750 | } | |
751 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |
752 | AddKey(cf, key); | |
753 | return Status::OK(); | |
754 | } | |
755 | Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { | |
756 | AddKey(cf, key); | |
757 | return Status::OK(); | |
758 | } | |
759 | Status MarkBeginPrepare(bool) override { return Status::OK(); } | |
760 | Status MarkRollback(const Slice&) override { return Status::OK(); } | |
761 | bool WriteAfterCommit() const override { return false; } | |
762 | }; | |
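
// For illustration only, not part of the original header: a batch with a
// duplicate key spans two sub-batches. `wp_db` is a hypothetical
// WritePreparedTxnDB* whose comparator map includes the default CF.
//
//   WriteBatch batch;
//   batch.Put("k1", "v1");
//   batch.Put("k2", "v2");
//   batch.Put("k1", "v3");  // duplicate of k1 starts a new sub-batch
//   SubBatchCounter counter(*wp_db->GetCFComparatorMap());
//   batch.Iterate(&counter);
//   assert(counter.BatchCount() == 2);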
763 | ||
764 | } // namespace rocksdb | |
765 | #endif // ROCKSDB_LITE |