// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <unordered_map>

#include "db/db_iter.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"
// A PessimisticTransactionDB that writes data to the DB after the prepare
// phase of 2PC. In this way some data in the DB might not be committed. The DB
// provides mechanisms to tell such data apart from committed data.
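//
// A minimal usage sketch (error handling omitted): a WritePreparedTxnDB is
// obtained by opening a TransactionDB with write_policy set to WRITE_PREPARED
// rather than by constructing it directly:
//
//   Options options;
//   options.create_if_missing = true;
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
//   TransactionDB* db;
//   TransactionDB::Open(options, txn_db_options, "/tmp/txn_db", &db);
//   Transaction* txn = db->BeginTransaction(WriteOptions());
//   txn->SetName("xid1");
//   txn->Put("key", "value");
//   txn->Prepare();  // the data is written to the DB but not yet committed
//   txn->Commit();
//   delete txn;
//   delete db;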
class WritePreparedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  virtual ~WritePreparedTxnDB();

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  // Optimized version of ::Write that receives further optimization requests
  // such as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;

  // Write the batch to the underlying DB and mark it as committed. Could be
  // used either directly from the TxnDB or through a transaction.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);

  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;

  // Check whether the transaction that wrote the value with sequence number
  // prep_seq is visible to the snapshot with sequence number snapshot_seq.
  // Returns true if commit_seq <= snapshot_seq.
  // If snapshot_seq is already released and snapshot_seq <= max, sets
  // *snap_released to true and returns true as well.
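  // For example, a write prepared at prep_seq = 10 and committed at
  // commit_seq = 15 is visible to a snapshot with snapshot_seq = 20
  // (15 <= 20) but not to a snapshot with snapshot_seq = 12.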
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = kMinUnCommittedSeq,
                           bool* snap_released = nullptr) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    assert(min_uncommitted >= kMinUnCommittedSeq);
    // Caller is responsible for initializing snap_released.
    assert(snap_released == nullptr || *snap_released == false);
    // Here we try to infer the return value without looking into prepare list.
    // This helps avoid synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but not necessarily efficient.
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (prep_seq < min_uncommitted) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    // Commit of a delayed prepared has two non-atomic steps: add to commit
    // cache, remove from delayed_prepared_. Our reads from these two are also
    // non-atomic. By looking into the commit cache first, we might thus find
    // the prep_seq neither in the commit cache nor in delayed_prepared_. To
    // fix that, i) we check if there was any delayed prepared BEFORE looking
    // into the commit cache, and ii) if there was, we complete the search
    // steps to be: i) commit cache, ii) delayed prepared, iii) commit cache
    // again. In this way if the first query to the commit cache missed the
    // commit, the 2nd will catch it.
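    // For example: the reader misses prep_seq in the commit cache, the
    // committing thread then adds prep_seq to the commit cache and removes it
    // from delayed_prepared_, and the reader then misses it in
    // delayed_prepared_ as well; the second commit cache lookup is what
    // catches this interleaving.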
    bool was_empty;
    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
    CommitEntry64b dont_care;
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
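    // E.g., with COMMIT_CACHE_BITS = 23, COMMIT_CACHE_SIZE is 2^23, so
    // prep_seq 2^23 + 5 maps to index 5, replacing whatever older entry
    // occupied that slot.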
    size_t repeats = 0;
    do {
      repeats++;
      assert(repeats < 100);
      if (UNLIKELY(repeats >= 100)) {
        throw std::runtime_error(
            "The read was interrupted 100 times by update to max_evicted_seq_. "
            "This is unexpected in all setups");
      }
      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
      CommitEntry cached;
      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
      if (exist && prep_seq == cached.prep_seq) {
        // It is committed and also not evicted from commit cache
        ROCKS_LOG_DETAILS(
            info_log_,
            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
        return cached.commit_seq <= snapshot_seq;
      }
      // else it could be committed but not inserted into the map, which could
      // happen after recovery, or it could be committed and evicted by another
      // commit, or never committed.

      // At this point we don't know if it was committed or if it is still
      // prepared
      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
        continue;
      }
      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
      if (max_evicted_seq_ub < prep_seq) {
        // Not evicted from cache and also not present, so must be still
        // prepared
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
      if (!was_empty) {
        // We should not normally reach here
        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
        ReadLock rl(&prepared_mutex_);
        ROCKS_LOG_WARN(
            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
          // This is the order: 1) delayed_prepared_commits_ update, 2) publish,
          // 3) delayed_prepared_ clean up. So check if it is the case of a late
          // clean up.
          auto it = delayed_prepared_commits_.find(prep_seq);
          if (it == delayed_prepared_commits_.end()) {
            // Then it is not committed yet
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " returns %" PRId32,
                              prep_seq, snapshot_seq, 0);
            return false;
          } else {
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " commit: %" PRIu64 " returns %" PRId32,
                              prep_seq, snapshot_seq, it->second,
                              snapshot_seq <= it->second);
            return it->second <= snapshot_seq;
          }
        }
        // 2nd query to commit cache. Refer to was_empty comment above.
        exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
        if (exist && prep_seq == cached.prep_seq) {
          ROCKS_LOG_DETAILS(
              info_log_,
              "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
              prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
          return cached.commit_seq <= snapshot_seq;
        }
        max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
      }
    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
    // When advancing max_evicted_seq_, we move older entries from the prepared
    // list to delayed_prepared_. Also we move evicted entries from the commit
    // cache to old_commit_map_ if it overlaps with any snapshot. Since
    // prep_seq <= max_evicted_seq_, we have three cases: i) in
    // delayed_prepared_, ii) in old_commit_map_, iii) committed with no
    // conflict with any snapshot. Case (i) delayed_prepared_ is checked above.
    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32 " released=1",
                        prep_seq, snapshot_seq, 0);
      assert(snap_released);
      // This snapshot is not valid anymore. We cannot tell if prep_seq is
      // committed before or after the snapshot. Return true but also set
      // snap_released to true.
      *snap_released = true;
      return true;
    }
    // We should not normally reach here unless snapshot_seq is old. This is a
    // rare case and it is ok to pay the cost of mutex ReadLock for such old,
    // reading transactions.
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ReadLock rl(&old_commit_map_mutex_);
    auto prep_set_entry = old_commit_map_.find(snapshot_seq);
    bool found = prep_set_entry != old_commit_map_.end();
    if (found) {
      auto& vec = prep_set_entry->second;
      found = std::binary_search(vec.begin(), vec.end(), prep_seq);
    } else {
      // coming from compaction
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32 " released=1",
                        prep_seq, snapshot_seq, 0);
      // This snapshot is not valid anymore. We cannot tell if prep_seq is
      // committed before or after the snapshot. Return true but also set
      // snap_released to true.
      assert(snap_released);
      *snap_released = true;
      return true;
    }
    if (!found) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32,
                        prep_seq, snapshot_seq, 1);
      return true;
    }
    // (ii) is the case: it is committed but after the snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }

  // Add the transaction with prepare sequence seq to the prepared list.
  // Note: must be called serially with increasing seq on each call.
  void AddPrepared(uint64_t seq);
  // Check if any of the prepared txns are less than new max_evicted_seq_. Must
  // be called with prepared_mutex_ write locked.
  void CheckPreparedAgainstMax(SequenceNumber new_max);
  // Remove the transaction with prepare sequence seq from the prepared list
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  // Note: must be called serially.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);

  struct CommitEntry {
    uint64_t prep_seq;
    uint64_t commit_seq;
    CommitEntry() : prep_seq(0), commit_seq(0) {}
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
    bool operator==(const CommitEntry& rhs) const {
      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
    }
  };

  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
    // Number of higher bits of a sequence number that are not used. They are
    // used to encode the value type, ...
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of lower bits from prepare seq that can be skipped as they are
    // implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS;
    // Number of bits we use to encode the commit seq.
    const size_t COMMIT_BITS;
    // Filter to encode/decode commit seq
    const uint64_t COMMIT_FILTER;
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND;
  };

  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0  DELTA DELTA ... DELTA DELTA
  // Encoded Value         = PREP PREP ... PREP PREP  DELTA DELTA ... DELTA DELTA
  // PAD: the first bits of a seq that are reserved for tagging and hence ignored
  // PREP/INDEX: the used bits in a prepare seq number
  // INDEX: the bits that do not have to be encoded (will be provided externally)
  // DELTA: commit seq - prepare seq + 1
  // Number of DELTA bits should be equal to the number of INDEX bits + PADs
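  //
  // Worked example (for illustration only, with INDEX_BITS = 3): PREP_BITS =
  // 64 - 8 - 3 = 53 and COMMIT_BITS = 64 - 53 = 11. For prep_seq = 46
  // (0b101110), the entry lands at index 46 % 8 = 6, only the upper bits
  // 0b101000 are stored, and Parse() restores the low 3 bits (0b110) from the
  // entry's index in the array.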
  struct CommitEntry64b {
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
            ToString(cs) + " prepare_seq is " + ToString(ps));
      }
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

    uint64_t rep_;
  };

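  // Round-trip sketch (hypothetical values): with CommitEntry64bFormat
  // format(3), CommitEntry64b e(/*ps=*/46, /*cs=*/50, format) stores
  // delta = 50 - 46 + 1 = 5; e.Parse(/*indexed_seq=*/46 % 8, &entry, format)
  // then yields entry.prep_seq == 46 and entry.commit_seq == 46 + 5 - 1 == 50.
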
  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;

  virtual const Snapshot* GetSnapshot() override;
  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);

 protected:
  virtual Status VerifyCFOptions(
      const ColumnFamilyOptions& cf_options) override;

 private:
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
  friend class WritePreparedCommitEntryPreReleaseCallback;
  friend class WritePreparedTransactionTestBase;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  friend class
      WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
  friend class WritePreparedTransactionTest_CommitMapTest_Test;
  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_RollbackTest_Test;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;

  void Init(const TransactionDBOptions& /* unused */);

  void WPRecordTick(uint32_t ticker_type) const {
    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  }

  // A heap with the amortized O(1) complexity for erase. It uses one extra heap
  // to keep track of erased entries that are not yet on top of the main heap.
  class PreparedHeap {
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        heap_;
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    bool empty() { return heap_.empty(); }
    uint64_t top() { return heap_.top(); }
    void push(uint64_t v) { heap_.push(v); }
    void pop() {
      heap_.pop();
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.top() > erased_heap_.top() could happen if we have erased
             // a non-existent entry. Ideally the user should not do that but we
             // should be resilient against it.
             heap_.top() >= erased_heap_.top()) {
        if (heap_.top() == erased_heap_.top()) {
          heap_.pop();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
    }
    void erase(uint64_t seq) {
      if (!heap_.empty()) {
        if (seq < heap_.top()) {
          // Already popped, ignore it.
        } else if (heap_.top() == seq) {
          pop();
          assert(heap_.empty() || heap_.top() != seq);
        } else {  // (heap_.top() > seq)
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
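    // Example: after push(1), push(2), push(3), a call to erase(2) only
    // records 2 in erased_heap_ since it is not on top; the next pop() removes
    // 1 and then lazily discards 2, leaving top() == 3.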
  };

  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results in eviction,
  // sets the evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches the
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots at
  // the time of updating the max. Thread-safety: this function can be called
  // concurrently. The concurrent invocations of this function are equivalent to
  // a serial invocation in which the last invocation is the one with the
  // largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);

  inline SequenceNumber SmallestUnCommittedSeq() {
    // Since we update the prepare_heap always from the main write queue via
    // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    ReadLock rl(&prepared_mutex_);
    // Since we are holding the mutex, and GetLatestSequenceNumber is updated
    // after prepared_txns_ are, the value of GetLatestSequenceNumber would
    // reflect any uncommitted data that is not added to prepared_txns_ yet.
    // Otherwise, if there is no concurrent txn, this value simply reflects the
    // latest value in the memtable.
    if (!delayed_prepared_.empty()) {
      assert(!delayed_prepared_empty_.load());
      return *delayed_prepared_.begin();
    }
    if (prepared_txns_.empty()) {
      return db_impl_->GetLatestSequenceNumber() + 1;
    } else {
      return std::min(prepared_txns_.top(),
                      db_impl_->GetLatestSequenceNumber() + 1);
    }
  }
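  // For instance, with delayed_prepared_ = {7, 12} the result is 7; with both
  // delayed_prepared_ and prepared_txns_ empty and GetLatestSequenceNumber()
  // == 20, the result is 21.
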
  // Enhance the snapshot object by recording in it the smallest uncommitted seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // The concurrent invocations of this function are equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of the
  // snapshots are released and to do the cleanup for the released snapshots.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or if it can be safely discarded (and hence assumed committed for
  // all snapshots). Thread-safety: this function can be called concurrently.
  // If it is called concurrently with multiple UpdateSnapshots, the result is
  // the same as checking the intersection of the snapshot list before updates
  // with the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();

  // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  // The list is stored in two data structures: in snapshot_cache_, which is
  // efficient for concurrent reads, and in snapshots_ if the data does not fit
  // into snapshot_cache_. The total number of snapshots in the two lists is
  // kept in snapshots_total_.
  std::atomic<size_t> snapshots_total_ = {};
  // The list sorted in ascending order. Thread-safety for writes is provided
  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
  // each entry. On the x86_64 architecture such reads are compiled to simple
  // read instructions.
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list is sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
  // although redundant, simplifies the CleanupOldSnapshots implementation.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_all_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index from
  // a filled one. Thread-safety is provided with commit_cache_mutex_.
  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
  // GetSnapshotInternal guarantees that the snapshot seq is larger than
  // future_max_evicted_seq_, this guarantees that if a snapshot is not larger
  // than max it has already been looked at via a GetSnapshotListFromDB(new_max).
  std::atomic<uint64_t> future_max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent advances we would have. We do not want
  // it to be too large either as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // prepared sequence numbers of the evicted entries from commit_cache_ that
  // overlap with such snapshot. These are the prepared sequence numbers that
  // the snapshot, to which they are mapped, cannot assume to be committed just
  // because they are no longer in the commit_cache_. The vector must be sorted
  // after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
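  // For example, old_commit_map_[50] = {20, 35} records that entries prepared
  // at 20 and 35 were evicted with commit seq numbers larger than snapshot 50,
  // so a reader at snapshot 50 must not assume them committed.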
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances their sequence number. This is expected to
  // be empty normally. Thread-safety is provided with prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Commit of a delayed prepared: 1) update commit cache, 2) update
  // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
  // delayed_prepared_commits_ will help us tell apart the unprepared txns from
  // the ones that are committed but not cleaned up yet.
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  // Update when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Update when old_commit_map_.empty() changes. Expected to be true normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators
  // Thread safety: since it is a const it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles
  // Thread safety: since the handle is a read-only object it is safe to read
  // it concurrently
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
};

class WritePreparedTxnReadCallback : public ReadCallback {
 public:
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
      : ReadCallback(snapshot), db_(db) {}
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
                               SequenceNumber min_uncommitted)
      : ReadCallback(snapshot, min_uncommitted), db_(db) {}

  // Will be called to see if the seq number is visible; if not it moves on to
  // the next seq number.
  inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
    auto snapshot = max_visible_seq_;
    return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
  }

  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported

 private:
  WritePreparedTxnDB* db_;
};

class AddPreparedCallback : public PreReleaseCallback {
 public:
  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
                      size_t sub_batch_cnt, bool two_write_queues,
                      bool first_prepare_batch)
      : db_(db),
        db_impl_(db_impl),
        sub_batch_cnt_(sub_batch_cnt),
        two_write_queues_(two_write_queues),
        first_prepare_batch_(first_prepare_batch) {
    (void)two_write_queues_;  // to silence unused private field warning
  }
  virtual Status Callback(SequenceNumber prepare_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t log_number) override {
    // Always Prepare from the main queue
    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
    for (size_t i = 0; i < sub_batch_cnt_; i++) {
      db_->AddPrepared(prepare_seq + i);
    }
    if (first_prepare_batch_) {
      assert(log_number != 0);
      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
          log_number);
    }
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  size_t sub_batch_cnt_;
  bool two_write_queues_;
  // It is 2PC and this is the first prepare batch. Always the case in 2PC
  // unless it is WriteUnPrepared.
  bool first_prepare_batch_;
};

class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes a non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WritePreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        aux_seq_(aux_seq),
        aux_batch_cnt_(aux_batch_cnt),
        includes_aux_batch_(aux_batch_cnt > 0) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
  }

Callback(SequenceNumber commit_seq
,
813 bool is_mem_disabled
__attribute__((__unused__
)),
815 // Always commit from the 2nd queue
816 assert(!db_impl_
->immutable_db_options().two_write_queues
||
818 assert(includes_data_
|| prep_seq_
!= kMaxSequenceNumber
);
819 // Data batch is what accompanied with the commit marker and affects the
820 // last seq in the commit batch.
821 const uint64_t last_commit_seq
= LIKELY(data_batch_cnt_
<= 1)
823 : commit_seq
+ data_batch_cnt_
- 1;
824 if (prep_seq_
!= kMaxSequenceNumber
) {
825 for (size_t i
= 0; i
< prep_batch_cnt_
; i
++) {
826 db_
->AddCommitted(prep_seq_
+ i
, last_commit_seq
);
828 } // else there was no prepare phase
829 if (includes_aux_batch_
) {
830 for (size_t i
= 0; i
< aux_batch_cnt_
; i
++) {
831 db_
->AddCommitted(aux_seq_
+ i
, last_commit_seq
);
834 if (includes_data_
) {
835 assert(data_batch_cnt_
);
836 // Commit the data that is accompanied with the commit request
837 for (size_t i
= 0; i
< data_batch_cnt_
; i
++) {
838 // For commit seq of each batch use the commit seq of the last batch.
839 // This would make debugging easier by having all the batches having
840 // the same sequence number.
841 db_
->AddCommitted(commit_seq
+ i
, last_commit_seq
);
844 if (db_impl_
->immutable_db_options().two_write_queues
) {
845 assert(is_mem_disabled
); // implies the 2nd queue
846 // Publish the sequence number. We can do that here assuming the callback
847 // is invoked only from one write queue, which would guarantee that the
848 // publish sequence numbers will be in order, i.e., once a seq is
849 // published all the seq prior to that are also publishable.
850 db_impl_
->SetLastPublishedSequence(last_commit_seq
);
852 // else SequenceNumber that is updated as part of the write already does the
 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Data here is the batch that is written with the commit marker, either
  // because it is commit without prepare or commit has a CommitTimeWriteBatch.
  bool includes_data_;
  // Auxiliary batch (if there is any) is a batch that is written before, but
  // gets the same commit seq as the prepare batch or data batch. This is used
  // in two write queues where the CommitTimeWriteBatch becomes the aux batch
  // and we do a separate write to actually commit everything.
  SequenceNumber aux_seq_;
  size_t aux_batch_cnt_;
  bool includes_aux_batch_;
};

// For two_write_queues, commit both the aborted batch and the cleanup batch
// and then publish the seq
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
 public:
  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
                                          DBImpl* db_impl,
                                          SequenceNumber prep_seq,
                                          SequenceNumber rollback_seq,
                                          size_t prep_batch_cnt)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        rollback_seq_(rollback_seq),
        prep_batch_cnt_(prep_batch_cnt) {
    assert(prep_seq != kMaxSequenceNumber);
    assert(rollback_seq != kMaxSequenceNumber);
    assert(prep_batch_cnt_ > 0);
  }

  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
                  uint64_t) override {
    // Always commit from the 2nd queue
    assert(is_mem_disabled);  // implies the 2nd queue
    assert(db_impl_->immutable_db_options().two_write_queues);
    (void)is_mem_disabled;
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    for (size_t i = 0; i < prep_batch_cnt_; i++) {
      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  SequenceNumber prep_seq_;
  SequenceNumber rollback_seq_;
  size_t prep_batch_cnt_;
};

// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
      : comparators_(comparators), batches_(1) {}
  std::map<uint32_t, const Comparator*>& comparators_;
  using CFKeys = std::set<Slice, SetComparator>;
  std::map<uint32_t, CFKeys> keys_;
  size_t batches_;
  size_t BatchCount() { return batches_; }
  void AddKey(const uint32_t cf, const Slice& key);
  void InitWithComp(const uint32_t cf);
  Status MarkNoop(bool) override { return Status::OK(); }
  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  Status MarkCommit(const Slice&) override { return Status::OK(); }
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status DeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MarkBeginPrepare(bool) override { return Status::OK(); }
  Status MarkRollback(const Slice&) override { return Status::OK(); }
  bool WriteAfterCommit() const override { return false; }
};
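
// For example, a WriteBatch with Put(cf, "a"), Put(cf, "b"), Put(cf, "a")
// iterates to BatchCount() == 2: the duplicate key "a" cannot be part of the
// first sub-batch and thus starts a second one.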
}  // namespace rocksdb
#endif  // ROCKSDB_LITE