]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.h
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#pragma once
7#ifndef ROCKSDB_LITE
8
f67539c2 9#include <cinttypes>
11fdf7f2
TL
10#include <mutex>
11#include <queue>
12#include <set>
13#include <string>
14#include <unordered_map>
15#include <vector>
16
17#include "db/db_iter.h"
18#include "db/pre_release_callback.h"
19#include "db/read_callback.h"
20#include "db/snapshot_checker.h"
21#include "rocksdb/db.h"
22#include "rocksdb/options.h"
23#include "rocksdb/utilities/transaction_db.h"
f67539c2 24#include "util/cast_util.h"
11fdf7f2
TL
25#include "util/set_comparator.h"
26#include "util/string_util.h"
27#include "utilities/transactions/pessimistic_transaction.h"
28#include "utilities/transactions/pessimistic_transaction_db.h"
11fdf7f2
TL
29#include "utilities/transactions/write_prepared_txn.h"
30
f67539c2
TL
31namespace ROCKSDB_NAMESPACE {
// Whether a read sequence number is backed by an actual DB snapshot object
// (kBackedByDBSnapshot) or was taken without acquiring one.
enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };
11fdf7f2 33
11fdf7f2
TL
34// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
35// In this way some data in the DB might not be committed. The DB provides
36// mechanisms to tell such data apart from committed data.
37class WritePreparedTxnDB : public PessimisticTransactionDB {
38 public:
494da23a
TL
  // Construct over a base DB. Cache sizes are powers of two derived from the
  // wp_* options; FORMAT is parameterized by COMMIT_CACHE_BITS because the
  // commit-cache index supplies the low bits of an encoded prepare seq.
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }
49
494da23a
TL
  // Same as the DB* overload but wrapping an existing StackableDB.
  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }
60
  virtual ~WritePreparedTxnDB();

  virtual Status Initialize(
      const std::vector<size_t>& compaction_enabled_cf_indices,
      const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  using TransactionDB::Write;
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;

  // Optimized version of ::Write that receives more optimization request such
  // as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;

  // Write the batch to the underlying DB and mark it as committed. Could be
  // used by both directly from TxnDB or through a transaction.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);

  using DB::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;
11fdf7f2
TL
  // Check whether the transaction that wrote the value with sequence number seq
  // is visible to the snapshot with sequence number snapshot_seq.
  // Returns true if commit_seq <= snapshot_seq
  // If the snapshot_seq is already released and snapshot_seq <= max, sets
  // *snap_released to true and returns true as well.
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = kMinUnCommittedSeq,
                           bool* snap_released = nullptr) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    assert(min_uncommitted >= kMinUnCommittedSeq);
    // Caller is responsible to initialize snap_released.
    assert(snap_released == nullptr || *snap_released == false);
    // Here we try to infer the return value without looking into prepare list.
    // This would help avoiding synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but not necessary efficient
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (prep_seq < min_uncommitted) {
      // Anything below min_uncommitted is known committed, hence visible.
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    // Commit of delayed prepared has two non-atomic steps: add to commit cache,
    // remove from delayed prepared. Our reads from these two is also
    // non-atomic. By looking into commit cache first thus we might not find the
    // prep_seq neither in commit cache not in delayed_prepared_. To fix that i)
    // we check if there was any delayed prepared BEFORE looking into commit
    // cache, ii) if there was, we complete the search steps to be these: i)
    // commit cache, ii) delayed prepared, commit cache again. In this way if
    // the first query to commit cache missed the commit, the 2nd will catch it.
    bool was_empty;
    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
    CommitEntry64b dont_care;
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
    size_t repeats = 0;
    // Retry while max_evicted_seq_ changes underneath us (lb != ub); a bounded
    // number of retries guards against a livelock under pathological load.
    do {
      repeats++;
      assert(repeats < 100);
      if (UNLIKELY(repeats >= 100)) {
        throw std::runtime_error(
            "The read was intrupted 100 times by update to max_evicted_seq_. "
            "This is unexpected in all setups");
      }
      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
      CommitEntry cached;
      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
      if (exist && prep_seq == cached.prep_seq) {
        // It is committed and also not evicted from commit cache
        ROCKS_LOG_DETAILS(
            info_log_,
            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
        return cached.commit_seq <= snapshot_seq;
      }
      // else it could be committed but not inserted in the map which could
      // happen after recovery, or it could be committed and evicted by another
      // commit, or never committed.

      // At this point we don't know if it was committed or it is still prepared
      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
        continue;
      }
      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
      if (max_evicted_seq_ub < prep_seq) {
        // Not evicted from cache and also not present, so must be still
        // prepared
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
      if (!was_empty) {
        // We should not normally reach here
        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
        ReadLock rl(&prepared_mutex_);
        ROCKS_LOG_WARN(
            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
          // This is the order: 1) delayed_prepared_commits_ update, 2) publish
          // 3) delayed_prepared_ clean up. So check if it is the case of a late
          // cleanup.
          auto it = delayed_prepared_commits_.find(prep_seq);
          if (it == delayed_prepared_commits_.end()) {
            // Then it is not committed yet
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " returns %" PRId32,
                              prep_seq, snapshot_seq, 0);
            return false;
          } else {
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " commit: %" PRIu64 " returns %" PRId32,
                              prep_seq, snapshot_seq, it->second,
                              snapshot_seq <= it->second);
            return it->second <= snapshot_seq;
          }
        } else {
          // 2nd query to commit cache. Refer to was_empty comment above.
          exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
          if (exist && prep_seq == cached.prep_seq) {
            ROCKS_LOG_DETAILS(
                info_log_,
                "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
                prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
            return cached.commit_seq <= snapshot_seq;
          }
          max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
        }
      }
    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
    // When advancing max_evicted_seq_, we move older entries from prepared to
    // delayed_prepared_. Also we move evicted entries from commit cache to
    // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
    // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
    // old_commit_map_, iii) committed with no conflict with any snapshot. Case
    // (i) delayed_prepared_ is checked above
    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32 " released=1",
                        prep_seq, snapshot_seq, 0);
      assert(snap_released);
      // This snapshot is not valid anymore. We cannot tell if prep_seq is
      // committed before or after the snapshot. Return true but also set
      // snap_released to true.
      *snap_released = true;
      return true;
    }
    {
      // We should not normally reach here unless snapshot_seq is old. This is a
      // rare case and it is ok to pay the cost of mutex ReadLock for such old,
      // reading transactions.
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
      bool found = prep_set_entry != old_commit_map_.end();
      if (found) {
        auto& vec = prep_set_entry->second;
        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
      } else {
        // coming from compaction
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32 " released=1",
                          prep_seq, snapshot_seq, 0);
        // This snapshot is not valid anymore. We cannot tell if prep_seq is
        // committed before or after the snapshot. Return true but also set
        // snap_released to true.
        assert(snap_released);
        *snap_released = true;
        return true;
      }

      if (!found) {
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 1);
        return true;
      }
    }
    // (ii) it the case: it is committed but after the snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }
324
494da23a
TL
  // Add the transaction with prepare sequence seq to the prepared list.
  // Note: must be called serially with increasing seq on each call.
  // locked is true if prepared_mutex_ is already locked.
  void AddPrepared(uint64_t seq, bool locked = false);
  // Check if any of the prepared txns are less than new max_evicted_seq_. Must
  // be called with prepared_mutex_ write locked.
  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
  // Remove the transaction with prepare sequence seq from the prepared list
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  // Note: must be called serially.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);
339
340 struct CommitEntry {
341 uint64_t prep_seq;
342 uint64_t commit_seq;
343 CommitEntry() : prep_seq(0), commit_seq(0) {}
344 CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
345 bool operator==(const CommitEntry& rhs) const {
346 return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
347 }
348 };
349
350 struct CommitEntry64bFormat {
351 explicit CommitEntry64bFormat(size_t index_bits)
352 : INDEX_BITS(index_bits),
353 PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
354 COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
355 COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
356 DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
357 // Number of higher bits of a sequence number that is not used. They are
358 // used to encode the value type, ...
359 const size_t PAD_BITS = static_cast<size_t>(8);
360 // Number of lower bits from prepare seq that can be skipped as they are
361 // implied by the index of the entry in the array
362 const size_t INDEX_BITS;
363 // Number of bits we use to encode the prepare seq
364 const size_t PREP_BITS;
365 // Number of bits we use to encode the commit seq.
366 const size_t COMMIT_BITS;
367 // Filter to encode/decode commit seq
368 const uint64_t COMMIT_FILTER;
369 // The value of commit_seq - prepare_seq + 1 must be less than this bound
370 const uint64_t DELTA_UPPERBOUND;
371 };
372
  // 64-bit encoding of a CommitEntry. Layout:
  //   Prepare Seq (64 bits) = PAD..PAD PREP..PREP INDEX..INDEX
  //   Delta Seq   (64 bits) = 0 ... 0 DELTA..DELTA
  //   Encoded Value         = PREP..PREP DELTA..DELTA
  // PAD: first bits of a seq that are reserved for tagging and hence ignored.
  // PREP/INDEX: the used bits in a prepare seq number; the INDEX bits are not
  // stored since they are implied by the entry's slot in the commit cache.
  // DELTA: commit seq - prep seq + 1. Number of DELTA bits equals number of
  // index bits + PADs.
  struct CommitEntry64b {
    // Zero rep_ marks an empty/uninitialized entry (delta == 0 is reserved).
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
            ToString(cs) + " prepare_seq is " + ToString(ps));
      }
      // Upper field: prepare seq shifted past the PAD bits, masked so it does
      // not overlap the delta field; lower field: the delta itself.
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      // Reconstruct the prepare seq: upper bits come from rep_, lower bits are
      // the entry's index in the commit cache (provided by the caller).
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

   private:
    uint64_t rep_;
  };
429
  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  // Shared column-family id -> comparator map, refreshed by
  // UpdateCFComparatorMap.
  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  // Shared column-family id -> handle map.
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;

  virtual const Snapshot* GetSnapshot() override;
  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
11fdf7f2
TL
445
 protected:
  virtual Status VerifyCFOptions(
      const ColumnFamilyOptions& cf_options) override;
  // Assign the min and max sequence numbers for reading from the db. A seq >
  // max is not valid, and a seq < min is valid, and a min <= seq < max requires
  // further checking. Normally max is defined by the snapshot and min is by
  // minimum uncommitted seq.
  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
                                         SequenceNumber* min,
                                         SequenceNumber* max);
  // Validate if a snapshot sequence number is still valid based on the latest
  // db status. backed_by_snapshot specifies if the number is backed by an
  // actual snapshot object. order specifies the memory order with which we load
  // the atomic variables: relaxed is enough for the default since we care about
  // last value seen by same thread.
  inline bool ValidateSnapshot(
      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
      std::memory_order order = std::memory_order_relaxed);
  // Get a dummy snapshot that refers to kMaxSequenceNumber
  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
11fdf7f2
TL
466
 private:
  // Friends: production collaborators first, then the gtest-generated test
  // classes that poke at internal state.
  friend class AddPreparedCallback;
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
  friend class WritePreparedCommitEntryPreReleaseCallback;
  friend class WritePreparedTransactionTestBase;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  friend class WritePreparedTransactionTest_BasicRecovery_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
  friend class WritePreparedTransactionTest_CommitMap_Test;
  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_Rollback_Test;
  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
  friend class WriteUnpreparedTxn;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;

  // Shared post-construction setup used by both constructors.
  void Init(const TransactionDBOptions& /* unused */);

  // Record a write-prepared-specific statistics tick on the DB's statistics
  // object.
  void WPRecordTick(uint32_t ticker_type) const {
    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  }
511
  // A heap with the amortized O(1) complexity for erase. It uses one extra heap
  // to keep track of erased entries that are not yet on top of the main heap.
  class PreparedHeap {
    // The mutex is required for push and pop from PreparedHeap. ::erase will
    // use external synchronization via prepared_mutex_.
    port::Mutex push_pop_mutex_;
    // Main queue; kept sorted because push() requires increasing values.
    std::deque<uint64_t> heap_;
    // Entries erased while not yet at the front of heap_; drained lazily by
    // pop().
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
        erased_heap_;
    // Cached copy of the smallest entry, readable without holding
    // push_pop_mutex_.
    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
    // True when testing crash recovery
    bool TEST_CRASH_ = false;
    friend class WritePreparedTxnDB;

   public:
    ~PreparedHeap() {
      if (!TEST_CRASH_) {
        // Outside of crash tests, all prepared entries must have been removed
        // by the time the DB shuts down.
        assert(heap_.empty());
        assert(erased_heap_.empty());
      }
    }
    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }

    inline bool empty() { return top() == kMaxSequenceNumber; }
    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
    // Values must be pushed in strictly increasing order (asserted below).
    inline void push(uint64_t v) {
      push_pop_mutex_.AssertHeld();
      if (heap_.empty()) {
        heap_top_.store(v, std::memory_order_release);
      } else {
        assert(heap_top_.load() < v);
      }
      heap_.push_back(v);
    }
    void pop(bool locked = false) {
      if (!locked) {
        push_pop_mutex()->Lock();
      }
      push_pop_mutex_.AssertHeld();
      heap_.pop_front();
      // Drain any previously erased entries that have now reached the front.
      while (!heap_.empty() && !erased_heap_.empty() &&
             // heap_.top() > erased_heap_.top() could happen if we have erased
             // a non-existent entry. Ideally the user should not do that but we
             // should be resilient against it.
             heap_.front() >= erased_heap_.top()) {
        if (heap_.front() == erased_heap_.top()) {
          heap_.pop_front();
        }
        uint64_t erased __attribute__((__unused__));
        erased = erased_heap_.top();
        erased_heap_.pop();
        // No duplicate prepare sequence numbers
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
      }
      // If the main heap emptied, leftover erased entries are stale; discard.
      while (heap_.empty() && !erased_heap_.empty()) {
        erased_heap_.pop();
      }
      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
                      std::memory_order_release);
      if (!locked) {
        push_pop_mutex()->Unlock();
      }
    }
    // Concurrent calls need external synchronization. It is safe to be called
    // concurrent to push and pop though.
    void erase(uint64_t seq) {
      if (!empty()) {
        auto top_seq = top();
        if (seq < top_seq) {
          // Already popped, ignore it.
        } else if (top_seq == seq) {
          pop();
#ifndef NDEBUG
          MutexLock ml(push_pop_mutex());
          assert(heap_.empty() || heap_.front() != seq);
#endif
        } else {  // top() > seq
          // Down the heap, remember to pop it later
          erased_heap_.push(seq);
        }
      }
    }
  };
596
  // Test hook: flag a simulated crash so PreparedHeap's destructor skips its
  // emptiness assertions during recovery tests.
  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
  // sets the evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches the
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots at
  // the time of updating the max. Thread-safety: this function can be called
  // concurrently. The concurrent invocations of this function is equivalent to
  // a serial invocation in which the last invocation is the one with the
  // largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);
626
  // Return a sequence number below which every transaction is known to be
  // committed: the minimum of the smallest prepared seq and the next seq to be
  // published.
  inline SequenceNumber SmallestUnCommittedSeq() {
    // Note: We have two lists to look into, but for performance reasons they
    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
    // that a prepared entry will not go unmissed, we look into them in opposite
    // order: first read prepared_txns_ and then delayed_prepared_.

    // This must be called before calling ::top. This is because the concurrent
    // thread would call ::RemovePrepared before updating
    // GetLatestSequenceNumber(). Reading then in opposite order here guarantees
    // that the ::top that we read would be lower the ::top if we had otherwise
    // update/read them atomically.
    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
    auto min_prepare = prepared_txns_.top();
    // Since we update the prepare_heap always from the main write queue via
    // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    if (!delayed_prepared_empty_.load()) {
      ReadLock rl(&prepared_mutex_);
      if (!delayed_prepared_.empty()) {
        // delayed_prepared_ holds the oldest prepared entries, so its minimum
        // bounds everything else.
        return *delayed_prepared_.begin();
      }
    }
    bool empty = min_prepare == kMaxSequenceNumber;
    if (empty) {
      // Since GetLatestSequenceNumber is updated
      // after prepared_txns_ are, the value of GetLatestSequenceNumber would
      // reflect any uncommitted data that is not added to prepared_txns_ yet.
      // Otherwise, if there is no concurrent txn, this value simply reflects
      // that latest value in the memtable.
      return next_prepare;
    } else {
      return std::min(min_prepare, next_prepare);
    }
  }
f67539c2 665
11fdf7f2
TL
  // Enhance the snapshot object by recording in it the smallest uncommitted seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    // min_uncommitted can be at most one past the snapshot's own seq.
    assert(min_uncommitted <= snapshot->number_ + 1);
    snapshot->min_uncommitted_ = min_uncommitted;
  }
673
  // Fetch the list of live snapshots up to max from the DB (virtual so tests
  // can mock it).
  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // The concurrent invocations of this function is equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of new snapshots against the old one to see if any of
  // the snapshots are released and to do the cleanup for the released snapshot.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or it can be safely discarded (and hence assume committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If it
  // is called concurrently with multiple UpdateSnapshots, the result is the
  // same as checking the intersection of the snapshot list before updates with
  // the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();
714
11fdf7f2
TL
715 // The list of live snapshots at the last time that max_evicted_seq_ advanced.
 716 // The list is stored in two data structures: in snapshot_cache_, which is
 717 // efficient for concurrent reads, and in snapshots_ if the data does not fit
 718 // into snapshot_cache_. snapshots_total_ is the total number of snapshots in the two lists.
719 std::atomic<size_t> snapshots_total_ = {};
720 // The list sorted in ascending order. Thread-safety for writes is provided
721 // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
722 // each entry. In x86_64 architecture such reads are compiled to simple read
494da23a 723 // instructions.
11fdf7f2
TL
724 const size_t SNAPSHOT_CACHE_BITS;
725 const size_t SNAPSHOT_CACHE_SIZE;
494da23a 726 std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
11fdf7f2
TL
727 // 2nd list for storing snapshots. The list sorted in ascending order.
728 // Thread-safety is provided with snapshots_mutex_.
729 std::vector<SequenceNumber> snapshots_;
494da23a
TL
 730 // The list of all snapshots: snapshots_ + snapshot_cache_. Although this
 731 // list is redundant, it simplifies the CleanupOldSnapshots implementation.
732 // Thread-safety is provided with snapshots_mutex_.
733 std::vector<SequenceNumber> snapshots_all_;
11fdf7f2
TL
734 // The version of the latest list of snapshots. This can be used to avoid
735 // rewriting a list that is concurrently updated with a more recent version.
736 SequenceNumber snapshots_version_ = 0;
737
738 // A heap of prepared transactions. Thread-safety is provided with
739 // prepared_mutex_.
740 PreparedHeap prepared_txns_;
11fdf7f2
TL
741 const size_t COMMIT_CACHE_BITS;
742 const size_t COMMIT_CACHE_SIZE;
743 const CommitEntry64bFormat FORMAT;
744 // commit_cache_ must be initialized to zero to tell apart an empty index from
745 // a filled one. Thread-safety is provided with commit_cache_mutex_.
494da23a 746 std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
11fdf7f2
TL
747 // The largest evicted *commit* sequence number from the commit_cache_. If a
 748 // seq is smaller than max_evicted_seq_ it might or might not be present in
749 // commit_cache_. So commit_cache_ must first be checked before consulting
750 // with max_evicted_seq_.
751 std::atomic<uint64_t> max_evicted_seq_ = {};
494da23a
TL
752 // Order: 1) update future_max_evicted_seq_ = new_max, 2)
753 // GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since
 754 // GetSnapshotInternal guarantees that the snapshot seq is larger than
 755 // future_max_evicted_seq_. This guarantees that if a snapshot is not larger
 756 // than max, it has already been looked at via a GetSnapshotListFromDB(new_max).
757 std::atomic<uint64_t> future_max_evicted_seq_ = {};
11fdf7f2
TL
758 // Advance max_evicted_seq_ by this value each time it needs an update. The
759 // larger the value, the less frequent advances we would have. We do not want
760 // it to be too large either as it would cause stalls by doing too much
761 // maintenance work under the lock.
762 size_t INC_STEP_FOR_MAX_EVICTED = 1;
763 // A map from old snapshots (expected to be used by a few read-only txns) to
764 // prepared sequence number of the evicted entries from commit_cache_ that
765 // overlaps with such snapshot. These are the prepared sequence numbers that
766 // the snapshot, to which they are mapped, cannot assume to be committed just
767 // because it is no longer in the commit_cache_. The vector must be sorted
768 // after each update.
769 // Thread-safety is provided with old_commit_map_mutex_.
770 std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
771 // A set of long-running prepared transactions that are not finished by the
772 // time max_evicted_seq_ advances their sequence number. This is expected to
773 // be empty normally. Thread-safety is provided with prepared_mutex_.
774 std::set<uint64_t> delayed_prepared_;
494da23a
TL
775 // Commit of a delayed prepared: 1) update commit cache, 2) update
776 // delayed_prepared_commits_, 3) publish seq, 3) clean up delayed_prepared_.
777 // delayed_prepared_commits_ will help us tell apart the unprepared txns from
778 // the ones that are committed but not cleaned up yet.
779 std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
11fdf7f2
TL
780 // Update when delayed_prepared_.empty() changes. Expected to be true
781 // normally.
782 std::atomic<bool> delayed_prepared_empty_ = {true};
783 // Update when old_commit_map_.empty() changes. Expected to be true normally.
784 std::atomic<bool> old_commit_map_empty_ = {true};
785 mutable port::RWMutex prepared_mutex_;
786 mutable port::RWMutex old_commit_map_mutex_;
787 mutable port::RWMutex commit_cache_mutex_;
788 mutable port::RWMutex snapshots_mutex_;
789 // A cache of the cf comparators
790 // Thread safety: since it is a const it is safe to read it concurrently
791 std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
792 // A cache of the cf handles
793 // Thread safety: since the handle is read-only object it is a const it is
794 // safe to read it concurrently
795 std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
f67539c2
TL
796 // A dummy snapshot object that refers to kMaxSequenceNumber
797 SnapshotImpl dummy_max_snapshot_;
11fdf7f2
TL
798};
799
800class WritePreparedTxnReadCallback : public ReadCallback {
801 public:
494da23a 802 WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
f67539c2
TL
803 : ReadCallback(snapshot),
804 db_(db),
805 backed_by_snapshot_(kBackedByDBSnapshot) {}
11fdf7f2 806 WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
f67539c2
TL
807 SequenceNumber min_uncommitted,
808 SnapshotBackup backed_by_snapshot)
809 : ReadCallback(snapshot, min_uncommitted),
810 db_(db),
811 backed_by_snapshot_(backed_by_snapshot) {
812 (void)backed_by_snapshot_; // to silence unused private field warning
813 }
814
815 virtual ~WritePreparedTxnReadCallback() {
816 // If it is not backed by snapshot, the caller must check validity
817 assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
818 }
11fdf7f2
TL
819
820 // Will be called to see if the seq number visible; if not it moves on to
821 // the next seq number.
494da23a
TL
822 inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
823 auto snapshot = max_visible_seq_;
f67539c2
TL
824 bool snap_released = false;
825 auto ret =
826 db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
827 assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
828 snap_released_ |= snap_released;
829 return ret;
830 }
831
832 inline bool valid() {
833 valid_checked_ = true;
834 return snap_released_ == false;
11fdf7f2
TL
835 }
836
494da23a 837 // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
11fdf7f2
TL
838 private:
839 WritePreparedTxnDB* db_;
f67539c2
TL
840 // Whether max_visible_seq_ is backed by a snapshot
841 const SnapshotBackup backed_by_snapshot_;
842 bool snap_released_ = false;
843 // Safety check to ensure that the caller has checked invalid statuses
844 bool valid_checked_ = false;
11fdf7f2
TL
845};
846
847class AddPreparedCallback : public PreReleaseCallback {
848 public:
494da23a
TL
849 AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
850 size_t sub_batch_cnt, bool two_write_queues,
851 bool first_prepare_batch)
11fdf7f2 852 : db_(db),
494da23a 853 db_impl_(db_impl),
11fdf7f2 854 sub_batch_cnt_(sub_batch_cnt),
494da23a
TL
855 two_write_queues_(two_write_queues),
856 first_prepare_batch_(first_prepare_batch) {
11fdf7f2
TL
857 (void)two_write_queues_; // to silence unused private field warning
858 }
859 virtual Status Callback(SequenceNumber prepare_seq,
494da23a 860 bool is_mem_disabled __attribute__((__unused__)),
f67539c2
TL
861 uint64_t log_number, size_t index,
862 size_t total) override {
863 assert(index < total);
864 // To reduce the cost of lock acquisition competing with the concurrent
865 // prepare requests, lock on the first callback and unlock on the last.
866 const bool do_lock = !two_write_queues_ || index == 0;
867 const bool do_unlock = !two_write_queues_ || index + 1 == total;
494da23a 868 // Always Prepare from the main queue
11fdf7f2 869 assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
f67539c2
TL
870 TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
871 TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
872 if (do_lock) {
873 db_->prepared_txns_.push_pop_mutex()->Lock();
874 }
875 const bool kLocked = true;
11fdf7f2 876 for (size_t i = 0; i < sub_batch_cnt_; i++) {
f67539c2
TL
877 db_->AddPrepared(prepare_seq + i, kLocked);
878 }
879 if (do_unlock) {
880 db_->prepared_txns_.push_pop_mutex()->Unlock();
11fdf7f2 881 }
f67539c2 882 TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
494da23a
TL
883 if (first_prepare_batch_) {
884 assert(log_number != 0);
885 db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
886 log_number);
887 }
11fdf7f2
TL
888 return Status::OK();
889 }
890
891 private:
892 WritePreparedTxnDB* db_;
494da23a 893 DBImpl* db_impl_;
11fdf7f2
TL
894 size_t sub_batch_cnt_;
895 bool two_write_queues_;
494da23a
TL
896 // It is 2PC and this is the first prepare batch. Always the case in 2PC
897 // unless it is WriteUnPrepared.
898 bool first_prepare_batch_;
11fdf7f2
TL
899};
900
// PreReleaseCallback that performs the commit bookkeeping for a WritePrepared
// transaction: it marks the prepared/aux/data batches as committed in the
// commit cache and, with two write queues, publishes the commit seq and
// removes the entries from the prepared heap.
class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  // prep_seq/aux_seq are kMaxSequenceNumber when there is no corresponding
  // batch; the asserts below enforce that seq and count agree.
  WritePreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        aux_seq_(aux_seq),
        aux_batch_cnt_(aux_batch_cnt),
        includes_aux_batch_(aux_batch_cnt > 0) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
  }

  // Invoked before the write result becomes visible. NOTE(order): AddCommitted
  // for all batches happens first, then the seq is published, and only then
  // RemovePrepared runs -- see the in-body comments for why this order is
  // required.
  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t, size_t /*index*/,
                          size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(!db_impl_->immutable_db_options().two_write_queues ||
           is_mem_disabled);
    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
    // Data batch is what accompanied with the commit marker and affects the
    // last seq in the commit batch.
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    if (prep_seq_ != kMaxSequenceNumber) {
      for (size_t i = 0; i < prep_batch_cnt_; i++) {
        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
      }
    }  // else there was no prepare phase
    if (includes_aux_batch_) {
      for (size_t i = 0; i < aux_batch_cnt_; i++) {
        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
      }
    }
    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that is accompanied with the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For commit seq of each batch use the commit seq of the last batch.
        // This would make debugging easier by having all the batches having
        // the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // publish sequence numbers will be in order, i.e., once a seq is
      // published all the seq prior to that are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
      // Note RemovePrepared should be called after publishing the seq.
      // Otherwise SmallestUnCommittedSeq optimization breaks.
      if (prep_seq_ != kMaxSequenceNumber) {
        db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
      }  // else there was no prepare phase
      if (includes_aux_batch_) {
        db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
      }
    }
    // else SequenceNumber that is updated as part of the write already does the
    // publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Data here is the batch that is written with the commit marker, either
  // because it is commit without prepare or commit has a CommitTimeWriteBatch.
  bool includes_data_;
  // Auxiliary batch (if there is any) is a batch that is written before, but
  // gets the same commit seq as prepare batch or data batch. This is used in
  // two write queues where the CommitTimeWriteBatch becomes the aux batch and
  // we do a separate write to actually commit everything.
  SequenceNumber aux_seq_;
  size_t aux_batch_cnt_;
  bool includes_aux_batch_;
};
995
996// For two_write_queues commit both the aborted batch and the cleanup batch and
997// then published the seq
998class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
999 public:
1000 WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
1001 DBImpl* db_impl,
1002 SequenceNumber prep_seq,
1003 SequenceNumber rollback_seq,
1004 size_t prep_batch_cnt)
1005 : db_(db),
1006 db_impl_(db_impl),
1007 prep_seq_(prep_seq),
1008 rollback_seq_(rollback_seq),
1009 prep_batch_cnt_(prep_batch_cnt) {
1010 assert(prep_seq != kMaxSequenceNumber);
1011 assert(rollback_seq != kMaxSequenceNumber);
1012 assert(prep_batch_cnt_ > 0);
1013 }
1014
f67539c2
TL
1015 Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
1016 size_t /*index*/, size_t /*total*/) override {
494da23a
TL
1017 // Always commit from the 2nd queue
1018 assert(is_mem_disabled); // implies the 2nd queue
1019 assert(db_impl_->immutable_db_options().two_write_queues);
1020#ifdef NDEBUG
1021 (void)is_mem_disabled;
1022#endif
1023 const uint64_t last_commit_seq = commit_seq;
1024 db_->AddCommitted(rollback_seq_, last_commit_seq);
1025 for (size_t i = 0; i < prep_batch_cnt_; i++) {
1026 db_->AddCommitted(prep_seq_ + i, last_commit_seq);
1027 }
1028 db_impl_->SetLastPublishedSequence(last_commit_seq);
1029 return Status::OK();
1030 }
1031
1032 private:
1033 WritePreparedTxnDB* db_;
1034 DBImpl* db_impl_;
1035 SequenceNumber prep_seq_;
1036 SequenceNumber rollback_seq_;
1037 size_t prep_batch_cnt_;
11fdf7f2
TL
1038};
1039
1040// Count the number of sub-batches inside a batch. A sub-batch does not have
1041// duplicate keys.
1042struct SubBatchCounter : public WriteBatch::Handler {
1043 explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
1044 : comparators_(comparators), batches_(1) {}
1045 std::map<uint32_t, const Comparator*>& comparators_;
1046 using CFKeys = std::set<Slice, SetComparator>;
1047 std::map<uint32_t, CFKeys> keys_;
1048 size_t batches_;
1049 size_t BatchCount() { return batches_; }
1050 void AddKey(const uint32_t cf, const Slice& key);
1051 void InitWithComp(const uint32_t cf);
1052 Status MarkNoop(bool) override { return Status::OK(); }
1053 Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
1054 Status MarkCommit(const Slice&) override { return Status::OK(); }
1055 Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
1056 AddKey(cf, key);
1057 return Status::OK();
1058 }
1059 Status DeleteCF(uint32_t cf, const Slice& key) override {
1060 AddKey(cf, key);
1061 return Status::OK();
1062 }
1063 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
1064 AddKey(cf, key);
1065 return Status::OK();
1066 }
1067 Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
1068 AddKey(cf, key);
1069 return Status::OK();
1070 }
1071 Status MarkBeginPrepare(bool) override { return Status::OK(); }
1072 Status MarkRollback(const Slice&) override { return Status::OK(); }
1073 bool WriteAfterCommit() const override { return false; }
1074};
1075
f67539c2
TL
1076SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
1077 SequenceNumber* min,
1078 SequenceNumber* max) {
1079 if (snapshot != nullptr) {
20effc67
TL
1080 *min =
1081 static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
1082 *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
f67539c2
TL
1083 return kBackedByDBSnapshot;
1084 } else {
1085 *min = SmallestUnCommittedSeq();
1086 *max = 0; // to be assigned later after sv is referenced.
1087 return kUnbackedByDBSnapshot;
1088 }
1089}
1090
1091bool WritePreparedTxnDB::ValidateSnapshot(
1092 const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
1093 std::memory_order order) {
1094 if (backed_by_snapshot == kBackedByDBSnapshot) {
1095 return true;
1096 } else {
1097 SequenceNumber max = max_evicted_seq_.load(order);
1098 // Validate that max has not advanced the snapshot seq that is not backed
1099 // by a real snapshot. This is a very rare case that should not happen in
1100 // real workloads.
1101 if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
1102 return false;
1103 }
1104 }
1105 return true;
1106}
1107
1108} // namespace ROCKSDB_NAMESPACE
11fdf7f2 1109#endif // ROCKSDB_LITE