]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.h
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#pragma once
7#ifndef ROCKSDB_LITE
8
9#ifndef __STDC_FORMAT_MACROS
10#define __STDC_FORMAT_MACROS
11#endif
12
13#include <inttypes.h>
14#include <mutex>
15#include <queue>
16#include <set>
17#include <string>
18#include <unordered_map>
19#include <vector>
20
21#include "db/db_iter.h"
22#include "db/pre_release_callback.h"
23#include "db/read_callback.h"
24#include "db/snapshot_checker.h"
25#include "rocksdb/db.h"
26#include "rocksdb/options.h"
27#include "rocksdb/utilities/transaction_db.h"
28#include "util/set_comparator.h"
29#include "util/string_util.h"
30#include "utilities/transactions/pessimistic_transaction.h"
31#include "utilities/transactions/pessimistic_transaction_db.h"
32#include "utilities/transactions/transaction_lock_mgr.h"
33#include "utilities/transactions/write_prepared_txn.h"
34
35namespace rocksdb {
36
// Detailed (per-key) logging is compiled out by default: the macro expands to
// a bare `;` so the format arguments are never evaluated, avoiding their
// runtime overhead on hot paths. To re-enable for debugging, forward to
// ROCKS_LOG_DEBUG as shown in the commented-out line below.
#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
  ;  // due to overhead by default skip such lines
// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
40
// A PessimisticTransactionDB that writes data to DB after prepare phase of
// 2PC. In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data: a fixed-size commit
// cache maps prepare sequence numbers to commit sequence numbers, with
// overflow structures (delayed_prepared_, old_commit_map_) for entries that
// fall out of the cache. See IsInSnapshot() for the read-side protocol.
class WritePreparedTxnDB : public PessimisticTransactionDB {
 public:
  // snapshot_cache_bits/commit_cache_bits are log2 of the respective cache
  // sizes; the caches are sized as 1 << bits below.
  explicit WritePreparedTxnDB(
      DB* db, const TransactionDBOptions& txn_db_options,
      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  explicit WritePreparedTxnDB(
      StackableDB* db, const TransactionDBOptions& txn_db_options,
      size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
      size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  virtual ~WritePreparedTxnDB();

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  // Optimized version of ::Write that receives more optimization request such
  // as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;

  // Write the batch to the underlying DB and mark it as committed. Could be
  // used by both directly from TxnDB or through a transaction.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);

  using DB::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;

  virtual void ReleaseSnapshot(const Snapshot* snapshot) override;

  // Check whether the transaction that wrote the value with sequence number
  // prep_seq is visible to the snapshot with sequence number snapshot_seq.
  // Returns true if commit_seq <= snapshot_seq.
  // min_uncommitted is the smallest sequence number that might have been
  // uncommitted when the snapshot was taken; any prep_seq below it (that is
  // not in delayed_prepared_) can be assumed committed.
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = 0) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    // Here we try to infer the return value without looking into prepare list.
    // This would help avoiding synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but not necessarily efficient
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
      // We should not normally reach here: delayed_prepared_ is expected to be
      // empty; taking prepared_mutex_ is the slow path.
      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
      ReadLock rl(&prepared_mutex_);
      ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
                     static_cast<uint64_t>(delayed_prepared_.size()));
      if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
        // Then it is not committed yet
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
    }
    // Note: since min_uncommitted does not include the delayed_prepared_ we
    // should check delayed_prepared_ first before applying this optimization.
    // TODO(myabandeh): include delayed_prepared_ in min_uncommitted
    if (prep_seq < min_uncommitted) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    // Consult the commit cache: entries are indexed by prep_seq modulo the
    // cache size, so a hit must also be verified by comparing prep_seq.
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
    CommitEntry64b dont_care;
    CommitEntry cached;
    bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
    if (exist && prep_seq == cached.prep_seq) {
      // It is committed and also not evicted from commit cache
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
      return cached.commit_seq <= snapshot_seq;
    }
    // else it could be committed but not inserted in the map which could happen
    // after recovery, or it could be committed and evicted by another commit,
    // or never committed.

    // At this point we dont know if it was committed or it is still prepared
    auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
    // max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
    if (max_evicted_seq < prep_seq) {
      // Not evicted from cache and also not present, so must be still prepared
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    // When advancing max_evicted_seq_, we move older entries from prepared to
    // delayed_prepared_. Also we move evicted entries from commit cache to
    // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
    // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
    // old_commit_map_, iii) committed with no conflict with any snapshot. Case
    // (i) delayed_prepared_ is checked above
    if (max_evicted_seq < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    {
      // We should not normally reach here unless snapshot_seq is old. This is a
      // rare case and it is ok to pay the cost of mutex ReadLock for such old,
      // reading transactions.
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
      bool found = prep_set_entry != old_commit_map_.end();
      if (found) {
        auto& vec = prep_set_entry->second;
        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
      }
      if (!found) {
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 1);
        return true;
      }
    }
    // (ii) is the case: it is committed but after the snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }

  // Add the transaction with prepare sequence seq to the prepared list
  void AddPrepared(uint64_t seq);
  // Remove the transaction with prepare sequence seq from the prepared list
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);

  // A <prepare_seq, commit_seq> pair as stored (logically) in the commit
  // cache.
  struct CommitEntry {
    uint64_t prep_seq;
    uint64_t commit_seq;
    CommitEntry() : prep_seq(0), commit_seq(0) {}
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
    bool operator==(const CommitEntry& rhs) const {
      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
    }
  };

  // Describes how a CommitEntry is packed into 64 bits given the number of
  // index bits of the commit cache (see CommitEntry64b below).
  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
    // Number of higher bits of a sequence number that is not used. They are
    // used to encode the value type, ...
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of lower bits from prepare seq that can be skipped as they are
    // implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS;
    // Number of bits we use to encode the commit seq.
    const size_t COMMIT_BITS;
    // Filter to encode/decode commit seq
    const uint64_t COMMIT_FILTER;
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND;
  };

  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
  // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA
  // PAD: first bits of a seq that is reserved for tagging and hence ignored
  // PREP/INDEX: the used bits in a prepare seq number
  // INDEX: the bits that do not have to be encoded (will be provided
  // externally, i.e., by the slot index in the commit cache)
  // DELTA: commit seq - prepare seq + 1
  // Number of DELTA bits should be equal to number of index bits + PADs
  struct CommitEntry64b {
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      // NOTE(review): in debug builds the assert fires before the throw below
      // can run; the throw is effectively the release-build guard.
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
            ToString(cs) + " prepare_seq is " + ToString(ps));
      }
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      // Reassemble prep_seq: upper bits from rep_, lower bits from the slot
      // index the entry was stored at.
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

   private:
    uint64_t rep_;
  };

  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;

  virtual const Snapshot* GetSnapshot() override;

 protected:
  virtual Status VerifyCFOptions(
      const ColumnFamilyOptions& cf_options) override;

 private:
  friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
  friend class WritePreparedTransactionTest_CommitMapTest_Test;
  friend class
      WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
  friend class WritePreparedTransactionTestBase;
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
  friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_RollbackTest_Test;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;

  void Init(const TransactionDBOptions& /* unused */);

  // Record a statistics ticker against the underlying DB's statistics object.
  void WPRecordTick(uint32_t ticker_type) const {
    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  }

  // A heap with the amortized O(1) complexity for erase. It uses one extra heap
  // to keep track of erased entries that are not yet on top of the main heap.
  class PreparedHeap {
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        heap_;
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    bool empty() { return heap_.empty(); }
    uint64_t top() { return heap_.top(); }
    void push(uint64_t v) { heap_.push(v); }
    // Pop the top and then drain any erased entries that have now reached the
    // top of the main heap.
    void pop() {
      heap_.pop();
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.top() > erased_heap_.top() could happen if we have erased
             // a non-existent entry. Ideally the user should not do that but we
             // should be resilient against it.
             heap_.top() >= erased_heap_.top()) {
        if (heap_.top() == erased_heap_.top()) {
          heap_.pop();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
    }
    // Lazily erase seq: entries below the current top were already popped,
    // the top itself is popped eagerly, and deeper entries are remembered in
    // erased_heap_ to be discarded when they surface in pop().
    void erase(uint64_t seq) {
      if (!heap_.empty()) {
        if (seq < heap_.top()) {
          // Already popped, ignore it.
        } else if (heap_.top() == seq) {
          pop();
          assert(heap_.empty() || heap_.top() != seq);
        } else {  // (heap_.top() > seq)
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
  };

  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
  // sets the evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches the
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots at
  // the time of updating the max. Thread-safety: this function can be called
  // concurrently. The concurrent invocations of this function is equivalent to
  // a serial invocation in which the last invocation is the one with the
  // largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);

  // Smallest sequence number that might still be uncommitted; used to seed the
  // min_uncommitted of new snapshots.
  inline SequenceNumber SmallestUnCommittedSeq() {
    // Since we update the prepare_heap always from the main write queue via
    // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    ReadLock rl(&prepared_mutex_);
    // Since we are holding the mutex, and GetLatestSequenceNumber is updated
    // after prepared_txns_ are, the value of GetLatestSequenceNumber would
    // reflect any uncommitted data that is not added to prepared_txns_ yet.
    // Otherwise, if there is no concurrent txn, this value simply reflects that
    // latest value in the memtable.
    if (prepared_txns_.empty()) {
      return db_impl_->GetLatestSequenceNumber() + 1;
    } else {
      return std::min(prepared_txns_.top(),
                      db_impl_->GetLatestSequenceNumber() + 1);
    }
  }
  // Enhance the snapshot object by recording in it the smallest uncommitted seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // The concurrent invocations of this function is equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or it can be safely discarded (and hence assume committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If it
  // is called concurrently with multiple UpdateSnapshots, the result is the
  // same as checking the intersection of the snapshot list before updates with
  // the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  // The list stored into two data structures: in snapshot_cache_ that is
  // efficient for concurrent reads, and in snapshots_ if the data does not fit
  // into snapshot_cache_. The total number of snapshots in the two lists
  std::atomic<size_t> snapshots_total_ = {};
  // The list sorted in ascending order. Thread-safety for writes is provided
  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
  // each entry. In x86_64 architecture such reads are compiled to simple read
  // instructions. 128 entries
  static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  // 8m entry, 64MB size
  static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(23);
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index from
  // a filled one. Thread-safety is provided with commit_cache_mutex_.
  unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // with max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent advances we would have. We do not want
  // it to be too large either as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // prepared sequence number of the evicted entries from commit_cache_ that
  // overlaps with such snapshot. These are the prepared sequence numbers that
  // the snapshot, to which they are mapped, cannot assume to be committed just
  // because it is no longer in the commit_cache_. The vector must be sorted
  // after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances their sequence number. This is expected to
  // be empty normally. Thread-safety is provided with prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Update when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Update when old_commit_map_.empty() changes. Expected to be true normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators
  // Thread safety: since it is a const it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles
  // Thread safety: since the handle is read-only object it is a const it is
  // safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
};
610
611class WritePreparedTxnReadCallback : public ReadCallback {
612 public:
613 WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
614 SequenceNumber min_uncommitted)
615 : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {}
616
617 // Will be called to see if the seq number visible; if not it moves on to
618 // the next seq number.
619 inline virtual bool IsVisible(SequenceNumber seq) override {
620 return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
621 }
622
623 private:
624 WritePreparedTxnDB* db_;
625 SequenceNumber snapshot_;
626 SequenceNumber min_uncommitted_;
627};
628
629class AddPreparedCallback : public PreReleaseCallback {
630 public:
631 AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt,
632 bool two_write_queues)
633 : db_(db),
634 sub_batch_cnt_(sub_batch_cnt),
635 two_write_queues_(two_write_queues) {
636 (void)two_write_queues_; // to silence unused private field warning
637 }
638 virtual Status Callback(SequenceNumber prepare_seq,
639 bool is_mem_disabled) override {
640#ifdef NDEBUG
641 (void)is_mem_disabled;
642#endif
643 assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
644 for (size_t i = 0; i < sub_batch_cnt_; i++) {
645 db_->AddPrepared(prepare_seq + i);
646 }
647 return Status::OK();
648 }
649
650 private:
651 WritePreparedTxnDB* db_;
652 size_t sub_batch_cnt_;
653 bool two_write_queues_;
654};
655
// PreReleaseCallback that marks a transaction committed: it inserts the
// <prepare_seq, commit_seq> entries into the commit map right before the
// commit sequence number becomes visible to readers.
class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  // prep_seq == kMaxSequenceNumber signals a commit without a prepare phase;
  // then only the accompanying data batch (data_batch_cnt > 0) is committed.
  WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
                                             DBImpl* db_impl,
                                             SequenceNumber prep_seq,
                                             size_t prep_batch_cnt,
                                             size_t data_batch_cnt = 0,
                                             bool publish_seq = true)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        publish_seq_(publish_seq) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled) override {
#ifdef NDEBUG
    (void)is_mem_disabled;
#endif
    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
    // All sub-batches share the commit seq of the last sub-batch.
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    if (prep_seq_ != kMaxSequenceNumber) {
      // Map every prepared sub-batch seq to the shared commit seq.
      for (size_t i = 0; i < prep_batch_cnt_; i++) {
        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
      }
    }  // else there was no prepare phase
    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that is accompanied with the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For commit seq of each batch use the commit seq of the last batch.
        // This would make debugging easier by having all the batches having
        // the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // publish sequence numbers will be in order, i.e., once a seq is
      // published all the seq prior to that are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
    }
    // else SequenceNumber that is updated as part of the write already does the
    // publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;  // not owned
  DBImpl* db_impl_;         // not owned
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Either because it is commit without prepare or it has a
  // CommitTimeWriteBatch
  bool includes_data_;
  // Should the callback also publish the commit seq number
  bool publish_seq_;
};
727
728// Count the number of sub-batches inside a batch. A sub-batch does not have
729// duplicate keys.
730struct SubBatchCounter : public WriteBatch::Handler {
731 explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
732 : comparators_(comparators), batches_(1) {}
733 std::map<uint32_t, const Comparator*>& comparators_;
734 using CFKeys = std::set<Slice, SetComparator>;
735 std::map<uint32_t, CFKeys> keys_;
736 size_t batches_;
737 size_t BatchCount() { return batches_; }
738 void AddKey(const uint32_t cf, const Slice& key);
739 void InitWithComp(const uint32_t cf);
740 Status MarkNoop(bool) override { return Status::OK(); }
741 Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
742 Status MarkCommit(const Slice&) override { return Status::OK(); }
743 Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
744 AddKey(cf, key);
745 return Status::OK();
746 }
747 Status DeleteCF(uint32_t cf, const Slice& key) override {
748 AddKey(cf, key);
749 return Status::OK();
750 }
751 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
752 AddKey(cf, key);
753 return Status::OK();
754 }
755 Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
756 AddKey(cf, key);
757 return Status::OK();
758 }
759 Status MarkBeginPrepare(bool) override { return Status::OK(); }
760 Status MarkRollback(const Slice&) override { return Status::OK(); }
761 bool WriteAfterCommit() const override { return false; }
762};
763
764} // namespace rocksdb
765#endif // ROCKSDB_LITE