namespace rocksdb {
-#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
- ; // due to overhead by default skip such lines
-// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
-
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data.
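+// A hedged usage sketch (public TransactionDB API only; error handling
+// elided): a WritePreparedTxnDB is obtained by opening a TransactionDB with
+// write_policy set to WRITE_PREPARED:
+//   TransactionDBOptions txn_db_options;
+//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
+//   TransactionDB* txn_db;
+//   Status s = TransactionDB::Open(options, txn_db_options, path, &txn_db);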
class WritePreparedTxnDB : public PessimisticTransactionDB {
public:
- explicit WritePreparedTxnDB(
- DB* db, const TransactionDBOptions& txn_db_options,
- size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
- size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
+ explicit WritePreparedTxnDB(DB* db,
+ const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options),
- SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
+ SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
- COMMIT_CACHE_BITS(commit_cache_bits),
+ COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options);
}
- explicit WritePreparedTxnDB(
- StackableDB* db, const TransactionDBOptions& txn_db_options,
- size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS,
- size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS)
+ explicit WritePreparedTxnDB(StackableDB* db,
+ const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options),
- SNAPSHOT_CACHE_BITS(snapshot_cache_bits),
+ SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
- COMMIT_CACHE_BITS(commit_cache_bits),
+ COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
Init(txn_db_options);
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
- virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
-
// Check whether the transaction that wrote the value with sequence number
// prep_seq is visible to the snapshot with sequence number snapshot_seq.
// Returns true if commit_seq <= snapshot_seq
+ // If snapshot_seq has already been released and snapshot_seq <=
+ // max_evicted_seq_, sets *snap_released to true and returns true as well.
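+ // Worked example (illustrative): if prep_seq=10 was committed with
+ // commit_seq=12, then IsInSnapshot(10, 15) returns true (12 <= 15) while
+ // IsInSnapshot(10, 11) returns false.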
inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
- uint64_t min_uncommitted = 0) const {
+ uint64_t min_uncommitted = kMinUnCommittedSeq,
+ bool* snap_released = nullptr) const {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, min_uncommitted);
+ assert(min_uncommitted >= kMinUnCommittedSeq);
+ // Caller is responsible for initializing snap_released.
+ assert(snap_released == nullptr || *snap_released == false);
// Here we try to infer the return value without looking into the prepared
// list. This helps avoid synchronization over a shared map.
// TODO(myabandeh): optimize this. This sequence of checks must be correct
prep_seq, snapshot_seq, 0);
return false;
}
- if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
- // We should not normally reach here
- WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
- ReadLock rl(&prepared_mutex_);
- ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
- static_cast<uint64_t>(delayed_prepared_.size()));
- if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
- // Then it is not committed yet
- ROCKS_LOG_DETAILS(info_log_,
- "IsInSnapshot %" PRIu64 " in %" PRIu64
- " returns %" PRId32,
- prep_seq, snapshot_seq, 0);
- return false;
- }
- }
- // Note: since min_uncommitted does not include the delayed_prepared_ we
- // should check delayed_prepared_ first before applying this optimization.
- // TODO(myabandeh): include delayed_prepared_ in min_uncommitted
if (prep_seq < min_uncommitted) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
- auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
+ // Commit of a delayed prepared txn has two non-atomic steps: add to commit
+ // cache, then remove from delayed_prepared_. Our reads of these two are
+ // also non-atomic, so by looking into the commit cache first we might find
+ // the prep_seq in neither the commit cache nor delayed_prepared_. To fix
+ // that, i) we check whether there were any delayed prepared txns BEFORE
+ // looking into the commit cache, and ii) if there were, we extend the
+ // search to these steps: a) commit cache, b) delayed_prepared_, c) commit
+ // cache again. This way, if the first query to the commit cache missed the
+ // commit, the 2nd will catch it.
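+ // An interleaving that the 2nd commit-cache query catches (illustrative):
+ //   reader:    1st commit cache query for p -> miss (not yet committed)
+ //   committer: adds p to commit cache
+ //   committer: removes p from delayed_prepared_
+ //   reader:    delayed_prepared_ lookup for p -> miss
+ //   reader:    2nd commit cache query for p -> hit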
+ bool was_empty;
+ SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
CommitEntry64b dont_care;
- CommitEntry cached;
- bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
- if (exist && prep_seq == cached.prep_seq) {
- // It is committed and also not evicted from commit cache
- ROCKS_LOG_DETAILS(
- info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
- prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
- return cached.commit_seq <= snapshot_seq;
- }
- // else it could be committed but not inserted in the map which could happen
- // after recovery, or it could be committed and evicted by another commit,
- // or never committed.
-
- // At this point we dont know if it was committed or it is still prepared
- auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
- // max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
- if (max_evicted_seq < prep_seq) {
- // Not evicted from cache and also not present, so must be still prepared
- ROCKS_LOG_DETAILS(
- info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
- prep_seq, snapshot_seq, 0);
- return false;
- }
+ auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
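+ // e.g., with wp_commit_cache_bits = 23 (the former default) the cache has
+ // 2^23 slots, so two prep_seqs that are a multiple of 2^23 apart map to
+ // the same slot and evict each other.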
+ size_t repeats = 0;
+ do {
+ repeats++;
+ assert(repeats < 100);
+ if (UNLIKELY(repeats >= 100)) {
+ throw std::runtime_error(
+ "The read was intrupted 100 times by update to max_evicted_seq_. "
+ "This is unexpected in all setups");
+ }
+ max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
+ TEST_SYNC_POINT(
+ "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
+ TEST_SYNC_POINT(
+ "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
+ was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
+ TEST_SYNC_POINT(
+ "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
+ TEST_SYNC_POINT(
+ "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
+ CommitEntry cached;
+ bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+ TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
+ TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
+ if (exist && prep_seq == cached.prep_seq) {
+ // It is committed and also not evicted from commit cache
+ ROCKS_LOG_DETAILS(
+ info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+ prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+ return cached.commit_seq <= snapshot_seq;
+ }
+ // else it could be committed but not inserted in the map, which could
+ // happen after recovery, or it could be committed and evicted by another
+ // commit, or never committed.
+
+ // At this point we don't know if it was committed or is still prepared
+ max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
+ if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
+ continue;
+ }
+ // Note: max_evicted_seq_ at the time of GetCommitEntry was <= max_evicted_seq_ub
+ if (max_evicted_seq_ub < prep_seq) {
+ // Not evicted from cache and also not present, so must be still
+ // prepared
+ ROCKS_LOG_DETAILS(info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64
+ " returns %" PRId32,
+ prep_seq, snapshot_seq, 0);
+ return false;
+ }
+ TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
+ TEST_SYNC_POINT(
+ "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
+ if (!was_empty) {
+ // We should not normally reach here
+ WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
+ ReadLock rl(&prepared_mutex_);
+ ROCKS_LOG_WARN(
+ info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
+ static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
+ if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
+ // This is the order: 1) delayed_prepared_commits_ update, 2) publish seq,
+ // 3) delayed_prepared_ clean up. So check if this is the case of a late
+ // cleanup.
+ auto it = delayed_prepared_commits_.find(prep_seq);
+ if (it == delayed_prepared_commits_.end()) {
+ // Then it is not committed yet
+ ROCKS_LOG_DETAILS(info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64
+ " returns %" PRId32,
+ prep_seq, snapshot_seq, 0);
+ return false;
+ } else {
+ ROCKS_LOG_DETAILS(info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64
+ " commit: %" PRIu64 " returns %" PRId32,
+ prep_seq, snapshot_seq, it->second,
+ it->second <= snapshot_seq);
+ return it->second <= snapshot_seq;
+ }
+ } else {
+ // 2nd query to commit cache. Refer to was_empty comment above.
+ exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
+ if (exist && prep_seq == cached.prep_seq) {
+ ROCKS_LOG_DETAILS(
+ info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
+ prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
+ return cached.commit_seq <= snapshot_seq;
+ }
+ max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
+ }
+ }
+ } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
// When advancing max_evicted_seq_, we move older entries from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot. Case
// (i) delayed_prepared_ is checked above
- if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
+ if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
// snapshot. If there was no overlapping commit entry, then it is committed
// with a commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
- ROCKS_LOG_DETAILS(
- info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
- prep_seq, snapshot_seq, 1);
+ ROCKS_LOG_DETAILS(info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64
+ " returns %" PRId32 " released=1",
+ prep_seq, snapshot_seq, 0);
+ assert(snap_released);
+ // This snapshot is not valid anymore. We cannot tell if prep_seq is
+ // committed before or after the snapshot. Return true but also set
+ // snap_released to true.
+ *snap_released = true;
return true;
}
{
// This is a rare case and it is ok to pay the cost of mutex ReadLock for
// such old reading transactions.
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
- ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snapshot_seq);
bool found = prep_set_entry != old_commit_map_.end();
if (found) {
auto& vec = prep_set_entry->second;
found = std::binary_search(vec.begin(), vec.end(), prep_seq);
+ } else {
+ // coming from compaction
+ ROCKS_LOG_DETAILS(info_log_,
+ "IsInSnapshot %" PRIu64 " in %" PRIu64
+ " returns %" PRId32 " released=1",
+ prep_seq, snapshot_seq, 0);
+ // This snapshot is not valid anymore. We cannot tell if prep_seq is
+ // committed before or after the snapshot. Return true but also set
+ // snap_released to true.
+ assert(snap_released);
+ *snap_released = true;
+ return true;
}
+
if (!found) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
return false;
}
- // Add the transaction with prepare sequence seq to the prepared list
+ // Add the transaction with prepare sequence seq to the prepared list.
+ // Note: must be called serially with increasing seq on each call.
void AddPrepared(uint64_t seq);
+ // Check if any of the prepared txns are less than the new max_evicted_seq_.
+ // Must
+ // be called with prepared_mutex_ write locked.
+ void CheckPreparedAgainstMax(SequenceNumber new_max);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map. loop_cnt is to detect infinite loops.
+ // Note: must be called serially.
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
uint8_t loop_cnt = 0);
void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
virtual const Snapshot* GetSnapshot() override;
+ SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
protected:
virtual Status VerifyCFOptions(
const ColumnFamilyOptions& cf_options) override;
private:
- friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
- friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
- friend class WritePreparedTransactionTest_CommitMapTest_Test;
- friend class
- WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
- friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
- friend class WritePreparedTransactionTestBase;
friend class PreparedHeap_BasicsTest_Test;
- friend class PreparedHeap_EmptyAtTheEnd_Test;
friend class PreparedHeap_Concurrent_Test;
+ friend class PreparedHeap_EmptyAtTheEnd_Test;
+ friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
+ friend class WritePreparedCommitEntryPreReleaseCallback;
+ friend class WritePreparedTransactionTestBase;
friend class WritePreparedTxn;
friend class WritePreparedTxnDBMock;
+ friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class
WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
+ friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
+ friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
+ friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
+ friend class WritePreparedTransactionTest_CommitMapTest_Test;
+ friend class
+ WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
+ friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
+ friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
+ friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
+ friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
+ friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
+ friend class
+ WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
+ friend class
+ WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
+ friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
friend class WriteUnpreparedTxnDB;
// reflect any uncommitted data that is not added to prepared_txns_ yet.
// Otherwise, if there is no concurrent txn, this value simply reflects the
// latest value in the memtable.
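+ // Entries in delayed_prepared_ are by construction older than
+ // max_evicted_seq_ and hence smaller than anything in prepared_txns_, so
+ // the smallest of them (if any) is the answer.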
+ if (!delayed_prepared_.empty()) {
+ assert(!delayed_prepared_empty_.load());
+ return *delayed_prepared_.begin();
+ }
if (prepared_txns_.empty()) {
return db_impl_->GetLatestSequenceNumber() + 1;
} else {
// version value.
void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
const SequenceNumber& version);
+ // Check the new list of snapshots against the old one to see if any of the
+ // snapshots were released, and do the cleanup for the released snapshots.
+ void CleanupReleasedSnapshots(
+ const std::vector<SequenceNumber>& new_snapshots,
+ const std::vector<SequenceNumber>& old_snapshots);
// Check an evicted entry against live snapshots to see if it should be kept
// around or it can be safely discarded (and hence assume committed for all
const uint64_t& snapshot_seq,
const bool next_is_larger);
+ // A trick to increase the last visible sequence number by one and also wait
+ // for the in-flight commits to be visible.
+ void AdvanceSeqByOne();
+
// The list of live snapshots at the last time that max_evicted_seq_ advanced.
// The list is stored in two data structures: in snapshot_cache_, which is
// efficient for concurrent reads, and in snapshots_ if the data does not fit
// The list is sorted in ascending order. Thread-safety for writes is provided
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read
- // instructions. 128 entries
- static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast<size_t>(7);
+ // instructions.
const size_t SNAPSHOT_CACHE_BITS;
const size_t SNAPSHOT_CACHE_SIZE;
- unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
+ std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
// 2nd list for storing snapshots. The list is sorted in ascending order.
// Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_;
+ // The list of all snapshots: snapshots_ + snapshot_cache_. Although
+ // redundant, this list simplifies the CleanupReleasedSnapshots implementation.
+ // Thread-safety is provided with snapshots_mutex_.
+ std::vector<SequenceNumber> snapshots_all_;
// The version of the latest list of snapshots. This can be used to avoid
// rewriting a list that is concurrently updated with a more recent version.
SequenceNumber snapshots_version_ = 0;
// A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_.
PreparedHeap prepared_txns_;
- // 8m entry, 64MB size
- static const size_t DEF_COMMIT_CACHE_BITS = static_cast<size_t>(23);
const size_t COMMIT_CACHE_BITS;
const size_t COMMIT_CACHE_SIZE;
const CommitEntry64bFormat FORMAT;
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
- unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
+ std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_. If a
// seq is smaller than max_evicted_seq_, it might or might not be present in
// commit_cache_. So commit_cache_ must first be checked before consulting
// with max_evicted_seq_.
std::atomic<uint64_t> max_evicted_seq_ = {};
+ // Order: 1) update future_max_evicted_seq_ = new_max, 2)
+ // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
+ // GetSnapshotInternal guarantees that the snapshot seq is larger than
+ // future_max_evicted_seq_, this guarantees that any snapshot not larger than
+ // max has already been looked at via a GetSnapshotListFromDB(new_max).
+ std::atomic<uint64_t> future_max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The
// larger the value, the less frequent the advances would be. We do not want
// it to be too large either as it would cause stalls by doing too much
// time max_evicted_seq_ advances their sequence number. This is expected to
// be empty normally. Thread-safety is provided with prepared_mutex_.
std::set<uint64_t> delayed_prepared_;
+ // Commit of a delayed prepared: 1) update commit cache, 2) update
+ // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
+ // delayed_prepared_commits_ will help us tell apart the still-uncommitted
+ // txns from the ones that are committed but not cleaned up yet.
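+ // e.g., a reader that still finds p in delayed_prepared_ (step 4 pending)
+ // consults delayed_prepared_commits_ to learn p's commit seq, if any.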
+ std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
// Update when delayed_prepared_.empty() changes. Expected to be true
// normally.
std::atomic<bool> delayed_prepared_empty_ = {true};
class WritePreparedTxnReadCallback : public ReadCallback {
public:
+ WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
+ : ReadCallback(snapshot), db_(db) {}
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
SequenceNumber min_uncommitted)
- : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {}
+ : ReadCallback(snapshot, min_uncommitted), db_(db) {}
// Will be called to see if the seq number is visible; if not, it moves on
// to the next seq number.
- inline virtual bool IsVisible(SequenceNumber seq) override {
- return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
+ inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
+ auto snapshot = max_visible_seq_;
+ return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
}
+ // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
private:
WritePreparedTxnDB* db_;
- SequenceNumber snapshot_;
- SequenceNumber min_uncommitted_;
};
class AddPreparedCallback : public PreReleaseCallback {
public:
- AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt,
- bool two_write_queues)
+ AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
+ size_t sub_batch_cnt, bool two_write_queues,
+ bool first_prepare_batch)
: db_(db),
+ db_impl_(db_impl),
sub_batch_cnt_(sub_batch_cnt),
- two_write_queues_(two_write_queues) {
+ two_write_queues_(two_write_queues),
+ first_prepare_batch_(first_prepare_batch) {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber prepare_seq,
- bool is_mem_disabled) override {
-#ifdef NDEBUG
- (void)is_mem_disabled;
-#endif
+ bool is_mem_disabled __attribute__((__unused__)),
+ uint64_t log_number) override {
+ // Always Prepare from the main queue
assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
for (size_t i = 0; i < sub_batch_cnt_; i++) {
db_->AddPrepared(prepare_seq + i);
}
+ if (first_prepare_batch_) {
+ assert(log_number != 0);
+ db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
+ log_number);
+ }
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
+ DBImpl* db_impl_;
size_t sub_batch_cnt_;
bool two_write_queues_;
+ // True when it is 2PC and this is the txn's first prepare batch. Always the
+ // case in 2PC unless it is WriteUnprepared.
+ bool first_prepare_batch_;
};
class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
public:
// includes_data indicates that the commit also writes non-empty
// CommitTimeWriteBatch to memtable, which needs to be committed separately.
- WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
- DBImpl* db_impl,
- SequenceNumber prep_seq,
- size_t prep_batch_cnt,
- size_t data_batch_cnt = 0,
- bool publish_seq = true)
+ WritePreparedCommitEntryPreReleaseCallback(
+ WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
+ size_t prep_batch_cnt, size_t data_batch_cnt = 0,
+ SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
prep_batch_cnt_(prep_batch_cnt),
data_batch_cnt_(data_batch_cnt),
includes_data_(data_batch_cnt_ > 0),
- publish_seq_(publish_seq) {
+ aux_seq_(aux_seq),
+ aux_batch_cnt_(aux_batch_cnt),
+ includes_aux_batch_(aux_batch_cnt > 0) {
assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor
assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
+ assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber)); // xor
}
virtual Status Callback(SequenceNumber commit_seq,
- bool is_mem_disabled) override {
-#ifdef NDEBUG
- (void)is_mem_disabled;
-#endif
+ bool is_mem_disabled __attribute__((__unused__)),
+ uint64_t) override {
+ // Always commit from the 2nd queue
+ assert(!db_impl_->immutable_db_options().two_write_queues ||
+ is_mem_disabled);
assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
+ // The data batch is the batch accompanying the commit marker; it affects
+ // the last seq of the commit batch.
const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
? commit_seq
: commit_seq + data_batch_cnt_ - 1;
db_->AddCommitted(prep_seq_ + i, last_commit_seq);
}
} // else there was no prepare phase
+ if (includes_aux_batch_) {
+ for (size_t i = 0; i < aux_batch_cnt_; i++) {
+ db_->AddCommitted(aux_seq_ + i, last_commit_seq);
+ }
+ }
if (includes_data_) {
assert(data_batch_cnt_);
// Commit the data that is accompanied with the commit request
db_->AddCommitted(commit_seq + i, last_commit_seq);
}
}
- if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
+ if (db_impl_->immutable_db_options().two_write_queues) {
assert(is_mem_disabled); // implies the 2nd queue
// Publish the sequence number. We can do that here assuming the callback
// is invoked only from one write queue, which would guarantee that the
SequenceNumber prep_seq_;
size_t prep_batch_cnt_;
size_t data_batch_cnt_;
- // Either because it is commit without prepare or it has a
- // CommitTimeWriteBatch
+ // Data here is the batch that is written with the commit marker, either
+ // because it is a commit without prepare or because the commit has a
+ // CommitTimeWriteBatch.
bool includes_data_;
- // Should the callback also publishes the commit seq number
- bool publish_seq_;
+ // The auxiliary batch (if there is any) is a batch written earlier that
+ // gets the same commit seq as the prepare batch or data batch. This is used
+ // with two write queues, where the CommitTimeWriteBatch becomes the aux
+ // batch and we do a separate write to actually commit everything.
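+ // Illustrative scenario (following the comment above): with two write
+ // queues, a CommitTimeWriteBatch may be written first at seq A (the aux
+ // batch) and the commit marker later at seq C; the callback then maps
+ // A..A+aux_batch_cnt_-1, like the prepare batch, to commit seq C.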
+ SequenceNumber aux_seq_;
+ size_t aux_batch_cnt_;
+ bool includes_aux_batch_;
+};
+
+// For two_write_queues, commits both the aborted batch and the cleanup batch
+// and then publishes the seq
+class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
+ public:
+ WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
+ DBImpl* db_impl,
+ SequenceNumber prep_seq,
+ SequenceNumber rollback_seq,
+ size_t prep_batch_cnt)
+ : db_(db),
+ db_impl_(db_impl),
+ prep_seq_(prep_seq),
+ rollback_seq_(rollback_seq),
+ prep_batch_cnt_(prep_batch_cnt) {
+ assert(prep_seq != kMaxSequenceNumber);
+ assert(rollback_seq != kMaxSequenceNumber);
+ assert(prep_batch_cnt_ > 0);
+ }
+
+ Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
+ uint64_t) override {
+ // Always commit from the 2nd queue
+ assert(is_mem_disabled); // implies the 2nd queue
+ assert(db_impl_->immutable_db_options().two_write_queues);
+#ifdef NDEBUG
+ (void)is_mem_disabled;
+#endif
+ const uint64_t last_commit_seq = commit_seq;
+ db_->AddCommitted(rollback_seq_, last_commit_seq);
+ for (size_t i = 0; i < prep_batch_cnt_; i++) {
+ db_->AddCommitted(prep_seq_ + i, last_commit_seq);
+ }
+ db_impl_->SetLastPublishedSequence(last_commit_seq);
+ return Status::OK();
+ }
+
+ private:
+ WritePreparedTxnDB* db_;
+ DBImpl* db_impl_;
+ SequenceNumber prep_seq_;
+ SequenceNumber rollback_seq_;
+ size_t prep_batch_cnt_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have