#pragma once
#ifndef ROCKSDB_LITE
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
#include <mutex>
#include <queue>
#include <set>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
+#include "util/cast_util.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_lock_mgr.h"
#include "utilities/transactions/write_prepared_txn.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
+enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
const TransactionOptions& txn_options,
Transaction* old_txn) override;
+ using TransactionDB::Write;
+ Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
// Optimized version of ::Write that receives more optimization requests such
// as skip_concurrency_control.
using PessimisticTransactionDB::Write;
// Add the transaction with prepare sequence seq to the prepared list.
// Note: must be called serially with increasing seq on each call.
- void AddPrepared(uint64_t seq);
+ // locked is true if prepared_mutex_ is already locked.
+ void AddPrepared(uint64_t seq, bool locked = false);
// Check if any of the prepared txns are less than new max_evicted_seq_. Must
// be called with prepared_mutex_ write locked.
- void CheckPreparedAgainstMax(SequenceNumber new_max);
+ void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Add the transaction with prepare sequence prepare_seq and commit sequence
protected:
virtual Status VerifyCFOptions(
const ColumnFamilyOptions& cf_options) override;
+ // Assign the min and max sequence numbers for reading from the db. A seq >
+ // max is not valid, a seq < min is valid, and min <= seq < max requires
+ // further checking. Normally max is defined by the snapshot and min is by
+ // the minimum uncommitted seq.
+ inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
+ SequenceNumber* min,
+ SequenceNumber* max);
+ // Validate if a snapshot sequence number is still valid based on the latest
+ // db status. backed_by_snapshot specifies if the number is backed by an
+ // actual snapshot object. order specifies the memory order with which we
+ // load the atomic variables: relaxed is enough for the default since we
+ // care about the last value seen by the same thread.
+ inline bool ValidateSnapshot(
+ const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
+ std::memory_order order = std::memory_order_relaxed);
+ // Get a dummy snapshot that refers to kMaxSequenceNumber
+ Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
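+ // A usage sketch for the helpers above (hypothetical caller modeled after a
+ // Get-like read path; names other than the helpers and the callback are
+ // illustrative, and max_visible_seq() is assumed from ReadCallback):
+ //   SequenceNumber min_uncommitted, snap_seq;
+ //   const SnapshotBackup backed_by_snapshot =
+ //       AssignMinMaxSeqs(read_options.snapshot, &min_uncommitted, &snap_seq);
+ //   WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
+ //                                         backed_by_snapshot);
+ //   auto res = /* read with &callback */;
+ //   if (LIKELY(callback.valid() &&
+ //              ValidateSnapshot(callback.max_visible_seq(),
+ //                               backed_by_snapshot))) {
+ //     return res;
+ //   } else {
+ //     return Status::TryAgain();  // unbacked snapshot was invalidated
+ //   }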
private:
+ friend class AddPreparedCallback;
friend class PreparedHeap_BasicsTest_Test;
friend class PreparedHeap_Concurrent_Test;
friend class PreparedHeap_EmptyAtTheEnd_Test;
- friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
+ friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
friend class WritePreparedCommitEntryPreReleaseCallback;
friend class WritePreparedTransactionTestBase;
friend class WritePreparedTxn;
friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
- friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
+ friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
friend class
- WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
+ WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
- friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
- friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
+ friend class WritePreparedTransactionTest_BasicRecovery_Test;
+ friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
- friend class
- WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
- friend class WritePreparedTransactionTest_CommitMapTest_Test;
+ friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
+ friend class WritePreparedTransactionTest_CommitMap_Test;
friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
- friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
+ friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
- friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
+ friend class WritePreparedTransactionTest_IsInSnapshot_Test;
friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
+ friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
friend class
WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
friend class
WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
- friend class WritePreparedTransactionTest_RollbackTest_Test;
+ friend class WritePreparedTransactionTest_Rollback_Test;
+ friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
+ friend class WriteUnpreparedTxn;
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
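+ // For example (hypothetical sequence): after push(1), push(3), push(5),
+ // erase(3) cannot remove 3 from the middle of heap_, so 3 is recorded in
+ // erased_heap_ instead and top() still returns 1. A later pop() drops 1 and
+ // then lazily discards 3 as it reaches the front, leaving top() == 5.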
class PreparedHeap {
- std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
- heap_;
+ // The mutex is required for push and pop from PreparedHeap. ::erase will
+ // use external synchronization via prepared_mutex_.
+ port::Mutex push_pop_mutex_;
+ std::deque<uint64_t> heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
erased_heap_;
+ std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
// True when testing crash recovery
bool TEST_CRASH_ = false;
friend class WritePreparedTxnDB;
assert(erased_heap_.empty());
}
}
- bool empty() { return heap_.empty(); }
- uint64_t top() { return heap_.top(); }
- void push(uint64_t v) { heap_.push(v); }
- void pop() {
- heap_.pop();
+ port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
+
+ inline bool empty() { return top() == kMaxSequenceNumber; }
+ // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
+ inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
+ inline void push(uint64_t v) {
+ push_pop_mutex_.AssertHeld();
+ if (heap_.empty()) {
+ heap_top_.store(v, std::memory_order_release);
+ } else {
+ assert(heap_top_.load() < v);
+ }
+ heap_.push_back(v);
+ }
+ void pop(bool locked = false) {
+ if (!locked) {
+ push_pop_mutex()->Lock();
+ }
+ push_pop_mutex_.AssertHeld();
+ heap_.pop_front();
while (!heap_.empty() && !erased_heap_.empty() &&
// heap_.top() > erased_heap_.top() could happen if we have erased
// a non-existent entry. Ideally the user should not do that but we
// should be resilient against it.
- heap_.top() >= erased_heap_.top()) {
- if (heap_.top() == erased_heap_.top()) {
- heap_.pop();
+ heap_.front() >= erased_heap_.top()) {
+ if (heap_.front() == erased_heap_.top()) {
+ heap_.pop_front();
}
uint64_t erased __attribute__((__unused__));
erased = erased_heap_.top();
while (heap_.empty() && !erased_heap_.empty()) {
erased_heap_.pop();
}
+ heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
+ std::memory_order_release);
+ if (!locked) {
+ push_pop_mutex()->Unlock();
+ }
}
+ // Concurrent calls need external synchronization. It is safe to call this
+ // concurrently with push and pop though.
void erase(uint64_t seq) {
- if (!heap_.empty()) {
- if (seq < heap_.top()) {
+ if (!empty()) {
+ auto top_seq = top();
+ if (seq < top_seq) {
// Already popped, ignore it.
- } else if (heap_.top() == seq) {
+ } else if (top_seq == seq) {
pop();
- assert(heap_.empty() || heap_.top() != seq);
- } else { // (heap_.top() > seq)
+#ifndef NDEBUG
+ MutexLock ml(push_pop_mutex());
+ assert(heap_.empty() || heap_.front() != seq);
+#endif
+ } else { // top() > seq
// Down the heap, remember to pop it later
erased_heap_.push(seq);
}
const SequenceNumber& new_max);
inline SequenceNumber SmallestUnCommittedSeq() {
+ // Note: We have two lists to look into, but for performance reasons they
+ // are not read atomically. Since CheckPreparedAgainstMax copies the entry
+ // to delayed_prepared_ before removing it from prepared_txns_, to ensure
+ // that a prepared entry will not go unnoticed, we look into them in the
+ // opposite order: first read prepared_txns_ and then delayed_prepared_.
+
+ // GetLatestSequenceNumber must be read before calling ::top. This is because
+ // a concurrent thread would call ::RemovePrepared before updating
+ // GetLatestSequenceNumber(). Reading them in the opposite order here
+ // guarantees that the ::top we read would be lower than the ::top we would
+ // have read had we otherwise updated/read them atomically. (See the
+ // interleaving example after this function.)
+ auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
+ auto min_prepare = prepared_txns_.top();
// Since we update the prepare_heap always from the main write queue via
// PreReleaseCallback, the prepared_txns_.top() indicates the smallest
// prepared data in 2pc transactions. For non-2pc transactions that are
// written in two steps, we also update prepared_txns_ at the first step
// (via the same mechanism) so that their uncommitted data is reflected in
// SmallestUnCommittedSeq.
- ReadLock rl(&prepared_mutex_);
- // Since we are holding the mutex, and GetLatestSequenceNumber is updated
- // after prepared_txns_ are, the value of GetLatestSequenceNumber would
- // reflect any uncommitted data that is not added to prepared_txns_ yet.
- // Otherwise, if there is no concurrent txn, this value simply reflects that
- // latest value in the memtable.
- if (!delayed_prepared_.empty()) {
- assert(!delayed_prepared_empty_.load());
- return *delayed_prepared_.begin();
+ if (!delayed_prepared_empty_.load()) {
+ ReadLock rl(&prepared_mutex_);
+ if (!delayed_prepared_.empty()) {
+ return *delayed_prepared_.begin();
+ }
}
- if (prepared_txns_.empty()) {
- return db_impl_->GetLatestSequenceNumber() + 1;
+ bool empty = min_prepare == kMaxSequenceNumber;
+ if (empty) {
+ // Since GetLatestSequenceNumber is updated
+ // after prepared_txns_ are, the value of GetLatestSequenceNumber would
+ // reflect any uncommitted data that is not added to prepared_txns_ yet.
+ // Otherwise, if there is no concurrent txn, this value simply reflects
+ // the latest value in the memtable.
+ return next_prepare;
} else {
- return std::min(prepared_txns_.top(),
- db_impl_->GetLatestSequenceNumber() + 1);
+ return std::min(min_prepare, next_prepare);
}
}
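+
+ // Example of the tolerated interleaving (hypothetical numbers): with a txn
+ // prepared at seq 9 and latest published seq 9, a reader loads
+ // next_prepare = 10; a committer then runs RemovePrepared(9) and publishes
+ // the commit at seq 10. Even if the reader now sees prepared_txns_ empty
+ // (top() == kMaxSequenceNumber), it returns min(kMaxSequenceNumber, 10) =
+ // 10, which is never higher than the true smallest uncommitted seq, so no
+ // uncommitted data is wrongly reported as committed.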
+
// Enhance the snapshot object by recording in it the smallest uncommitted seq
inline void EnhanceSnapshot(SnapshotImpl* snapshot,
SequenceNumber min_uncommitted) {
assert(snapshot);
+ assert(min_uncommitted <= snapshot->number_ + 1);
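+ // E.g., a snapshot at seq 100 can have min_uncommitted of at most 101: at
+ // best everything up to and including 100 was committed when the snapshot
+ // was taken, and nothing above it can be uncommitted data it must track.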
snapshot->min_uncommitted_ = min_uncommitted;
}
// Thread safety: since the handle is a read-only const object, it is safe to
// read it concurrently
std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
+ // A dummy snapshot object that refers to kMaxSequenceNumber
+ SnapshotImpl dummy_max_snapshot_;
};
class WritePreparedTxnReadCallback : public ReadCallback {
public:
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
- : ReadCallback(snapshot), db_(db) {}
+ : ReadCallback(snapshot),
+ db_(db),
+ backed_by_snapshot_(kBackedByDBSnapshot) {}
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
- SequenceNumber min_uncommitted)
- : ReadCallback(snapshot, min_uncommitted), db_(db) {}
+ SequenceNumber min_uncommitted,
+ SnapshotBackup backed_by_snapshot)
+ : ReadCallback(snapshot, min_uncommitted),
+ db_(db),
+ backed_by_snapshot_(backed_by_snapshot) {
+ (void)backed_by_snapshot_; // to silence unused private field warning
+ }
+
+ virtual ~WritePreparedTxnReadCallback() {
+ // If it is not backed by a snapshot, the caller must check validity
+ assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
+ }
// Will be called to see if the seq number is visible; if not, it moves on to
// the next seq number.
inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
auto snapshot = max_visible_seq_;
- return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
+ bool snap_released = false;
+ auto ret =
+ db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
+ assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
+ snap_released_ |= snap_released;
+ return ret;
+ }
+
+ inline bool valid() {
+ valid_checked_ = true;
+ return snap_released_ == false;
}
// TODO(myabandeh): override Refresh when Iterator::Refresh is supported
private:
WritePreparedTxnDB* db_;
+ // Whether max_visible_seq_ is backed by a snapshot
+ const SnapshotBackup backed_by_snapshot_;
+ bool snap_released_ = false;
+ // Safety check to ensure that the caller has checked the validity status
+ bool valid_checked_ = false;
};
class AddPreparedCallback : public PreReleaseCallback {
}
virtual Status Callback(SequenceNumber prepare_seq,
bool is_mem_disabled __attribute__((__unused__)),
- uint64_t log_number) override {
+ uint64_t log_number, size_t index,
+ size_t total) override {
+ assert(index < total);
+ // To reduce the cost of lock acquisition competing with the concurrent
+ // prepare requests, lock on the first callback and unlock on the last.
+ const bool do_lock = !two_write_queues_ || index == 0;
+ const bool do_unlock = !two_write_queues_ || index + 1 == total;
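+ // E.g., with two_write_queues_ and a prepare group of total == 3, only the
+ // callback with index 0 locks and only the one with index 2 unlocks; with
+ // a single write queue every callback locks and unlocks individually.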
// Always Prepare from the main queue
assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
+ TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
+ TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
+ if (do_lock) {
+ db_->prepared_txns_.push_pop_mutex()->Lock();
+ }
+ const bool kLocked = true;
for (size_t i = 0; i < sub_batch_cnt_; i++) {
- db_->AddPrepared(prepare_seq + i);
+ db_->AddPrepared(prepare_seq + i, kLocked);
+ }
+ if (do_unlock) {
+ db_->prepared_txns_.push_pop_mutex()->Unlock();
}
+ TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
if (first_prepare_batch_) {
assert(log_number != 0);
db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
- uint64_t) override {
+ uint64_t, size_t /*index*/,
+ size_t /*total*/) override {
// Always commit from the 2nd queue
assert(!db_impl_->immutable_db_options().two_write_queues ||
is_mem_disabled);
// publish sequence numbers will be in order, i.e., once a seq is
// published all the seq prior to that are also publishable.
db_impl_->SetLastPublishedSequence(last_commit_seq);
+ // Note RemovePrepared should be called after publishing the seq.
+ // Otherwise SmallestUnCommittedSeq optimization breaks.
+ if (prep_seq_ != kMaxSequenceNumber) {
+ db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
+ } // else there was no prepare phase
+ if (includes_aux_batch_) {
+ db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
+ }
}
// else SequenceNumber that is updated as part of the write already does the
// publishing
assert(prep_batch_cnt_ > 0);
}
- Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
- uint64_t) override {
+ Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
+ size_t /*index*/, size_t /*total*/) override {
// Always commit from the 2nd queue
assert(is_mem_disabled); // implies the 2nd queue
assert(db_impl_->immutable_db_options().two_write_queues);
bool WriteAfterCommit() const override { return false; }
};
-} // namespace rocksdb
+SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
+ SequenceNumber* min,
+ SequenceNumber* max) {
+ if (snapshot != nullptr) {
+ *min = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+ ->min_uncommitted_;
+ *max = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+ ->number_;
+ return kBackedByDBSnapshot;
+ } else {
+ *min = SmallestUnCommittedSeq();
+ *max = 0; // to be assigned later after sv is referenced.
+ return kUnbackedByDBSnapshot;
+ }
+}
+
+bool WritePreparedTxnDB::ValidateSnapshot(
+ const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
+ std::memory_order order) {
+ if (backed_by_snapshot == kBackedByDBSnapshot) {
+ return true;
+ } else {
+ SequenceNumber max = max_evicted_seq_.load(order);
+ // Validate that max has not advanced past the snapshot seq, which is not
+ // backed by a real snapshot. This is a very rare case that should not
+ // happen in real workloads.
+ if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE