]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.h
index 10d1dbf607ea6c274512395c4cd3d1cdcdea0d54..964b72689f81010cfdc8e8198ad327b833f7b5e8 100644 (file)
@@ -6,11 +6,7 @@
 #pragma once
 #ifndef ROCKSDB_LITE
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
 #include <mutex>
 #include <queue>
 #include <set>
@@ -25,6 +21,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/utilities/transaction_db.h"
+#include "util/cast_util.h"
 #include "util/set_comparator.h"
 #include "util/string_util.h"
 #include "utilities/transactions/pessimistic_transaction.h"
@@ -32,7 +29,8 @@
 #include "utilities/transactions/transaction_lock_mgr.h"
 #include "utilities/transactions/write_prepared_txn.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
+enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };
 
 // A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
 // In this way some data in the DB might not be committed. The DB provides
@@ -71,6 +69,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                                 const TransactionOptions& txn_options,
                                 Transaction* old_txn) override;
 
+  using TransactionDB::Write;
+  Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
   // Optimized version of ::Write that receives more optimization request such
   // as skip_concurrency_control.
   using PessimisticTransactionDB::Write;
@@ -324,10 +325,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
 
   // Add the transaction with prepare sequence seq to the prepared list.
   // Note: must be called serially with increasing seq on each call.
-  void AddPrepared(uint64_t seq);
+  // locked is true if prepared_mutex_ is already locked.
+  void AddPrepared(uint64_t seq, bool locked = false);
   // Check if any of the prepared txns are less than new max_evicted_seq_. Must
   // be called with prepared_mutex_ write locked.
-  void CheckPreparedAgainstMax(SequenceNumber new_max);
+  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
   // Remove the transaction with prepare sequence seq from the prepared list
   void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
   // Add the transaction with prepare sequence prepare_seq and commit sequence
@@ -445,40 +447,60 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
  protected:
   virtual Status VerifyCFOptions(
       const ColumnFamilyOptions& cf_options) override;
+  // Assign the min and max sequence numbers for reading from the db. A seq >
+  // max is not valid, and a seq < min is valid, and a min <= seq < max requires
+  // further checking. Normally max is defined by the snapshot and min is by
+  // minimum uncommitted seq.
+  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
+                                         SequenceNumber* min,
+                                         SequenceNumber* max);
+  // Validate is a snapshot sequence number is still valid based on the latest
+  // db status. backed_by_snapshot specifies if the number is baked by an actual
+  // snapshot object. order specified the memory order with which we load the
+  // atomic variables: relax is enough for the default since we care about last
+  // value seen by same thread.
+  inline bool ValidateSnapshot(
+      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
+      std::memory_order order = std::memory_order_relaxed);
+  // Get a dummy snapshot that refers to kMaxSequenceNumber
+  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }
 
  private:
+  friend class AddPreparedCallback;
   friend class PreparedHeap_BasicsTest_Test;
   friend class PreparedHeap_Concurrent_Test;
   friend class PreparedHeap_EmptyAtTheEnd_Test;
-  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
+  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
   friend class WritePreparedCommitEntryPreReleaseCallback;
   friend class WritePreparedTransactionTestBase;
   friend class WritePreparedTxn;
   friend class WritePreparedTxnDBMock;
   friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
-  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
+  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
   friend class
-      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
+      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
   friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
-  friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
-  friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
+  friend class WritePreparedTransactionTest_BasicRecovery_Test;
+  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
   friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
-  friend class
-      WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
-  friend class WritePreparedTransactionTest_CommitMapTest_Test;
+  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
+  friend class WritePreparedTransactionTest_CommitMap_Test;
   friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
-  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
   friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
-  friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
+  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
   friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
   friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
+  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
   friend class
       WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
   friend class
       WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
   friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
   friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
-  friend class WritePreparedTransactionTest_RollbackTest_Test;
+  friend class WritePreparedTransactionTest_Rollback_Test;
+  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
+  friend class WriteUnpreparedTxn;
   friend class WriteUnpreparedTxnDB;
   friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
 
@@ -491,10 +513,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // A heap with the amortized O(1) complexity for erase. It uses one extra heap
   // to keep track of erased entries that are not yet on top of the main heap.
   class PreparedHeap {
-    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
-        heap_;
+    // The mutex is required for push and pop from PreparedHeap. ::erase will
+    // use external synchronization via prepared_mutex_.
+    port::Mutex push_pop_mutex_;
+    std::deque<uint64_t> heap_;
     std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
         erased_heap_;
+    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
     // True when testing crash recovery
     bool TEST_CRASH_ = false;
     friend class WritePreparedTxnDB;
@@ -506,18 +531,33 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
         assert(erased_heap_.empty());
       }
     }
-    bool empty() { return heap_.empty(); }
-    uint64_t top() { return heap_.top(); }
-    void push(uint64_t v) { heap_.push(v); }
-    void pop() {
-      heap_.pop();
+    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
+
+    inline bool empty() { return top() == kMaxSequenceNumber; }
+    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
+    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
+    inline void push(uint64_t v) {
+      push_pop_mutex_.AssertHeld();
+      if (heap_.empty()) {
+        heap_top_.store(v, std::memory_order_release);
+      } else {
+        assert(heap_top_.load() < v);
+      }
+      heap_.push_back(v);
+    }
+    void pop(bool locked = false) {
+      if (!locked) {
+        push_pop_mutex()->Lock();
+      }
+      push_pop_mutex_.AssertHeld();
+      heap_.pop_front();
       while (!heap_.empty() && !erased_heap_.empty() &&
              // heap_.top() > erased_heap_.top() could happen if we have erased
              // a non-existent entry. Ideally the user should not do that but we
              // should be resilient against it.
-             heap_.top() >= erased_heap_.top()) {
-        if (heap_.top() == erased_heap_.top()) {
-          heap_.pop();
+             heap_.front() >= erased_heap_.top()) {
+        if (heap_.front() == erased_heap_.top()) {
+          heap_.pop_front();
         }
         uint64_t erased __attribute__((__unused__));
         erased = erased_heap_.top();
@@ -528,15 +568,26 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
       while (heap_.empty() && !erased_heap_.empty()) {
         erased_heap_.pop();
       }
+      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
+                      std::memory_order_release);
+      if (!locked) {
+        push_pop_mutex()->Unlock();
+      }
     }
+    // Concurrrent calls needs external synchronization. It is safe to be called
+    // concurrent to push and pop though.
     void erase(uint64_t seq) {
-      if (!heap_.empty()) {
-        if (seq < heap_.top()) {
+      if (!empty()) {
+        auto top_seq = top();
+        if (seq < top_seq) {
           // Already popped, ignore it.
-        } else if (heap_.top() == seq) {
+        } else if (top_seq == seq) {
           pop();
-          assert(heap_.empty() || heap_.top() != seq);
-        } else {  // (heap_.top() > seq)
+#ifndef NDEBUG
+          MutexLock ml(push_pop_mutex());
+          assert(heap_.empty() || heap_.front() != seq);
+#endif
+        } else {  // top() > seq
           // Down the heap, remember to pop it later
           erased_heap_.push(seq);
         }
@@ -575,33 +626,49 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                             const SequenceNumber& new_max);
 
   inline SequenceNumber SmallestUnCommittedSeq() {
+    // Note: We have two lists to look into, but for performance reasons they
+    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
+    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
+    // that a prepared entry will not go unmissed, we look into them in opposite
+    // order: first read prepared_txns_ and then delayed_prepared_.
+
+    // This must be called before calling ::top. This is because the concurrent
+    // thread would call ::RemovePrepared before updating
+    // GetLatestSequenceNumber(). Reading then in opposite order here guarantees
+    // that the ::top that we read would be lower the ::top if we had otherwise
+    // update/read them atomically.
+    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
+    auto min_prepare = prepared_txns_.top();
     // Since we update the prepare_heap always from the main write queue via
     // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
     // prepared data in 2pc transactions. For non-2pc transactions that are
     // written in two steps, we also update prepared_txns_ at the first step
     // (via the same mechanism) so that their uncommitted data is reflected in
     // SmallestUnCommittedSeq.
-    ReadLock rl(&prepared_mutex_);
-    // Since we are holding the mutex, and GetLatestSequenceNumber is updated
-    // after prepared_txns_ are, the value of GetLatestSequenceNumber would
-    // reflect any uncommitted data that is not added to prepared_txns_ yet.
-    // Otherwise, if there is no concurrent txn, this value simply reflects that
-    // latest value in the memtable.
-    if (!delayed_prepared_.empty()) {
-      assert(!delayed_prepared_empty_.load());
-      return *delayed_prepared_.begin();
+    if (!delayed_prepared_empty_.load()) {
+      ReadLock rl(&prepared_mutex_);
+      if (!delayed_prepared_.empty()) {
+        return *delayed_prepared_.begin();
+      }
     }
-    if (prepared_txns_.empty()) {
-      return db_impl_->GetLatestSequenceNumber() + 1;
+    bool empty = min_prepare == kMaxSequenceNumber;
+    if (empty) {
+      // Since GetLatestSequenceNumber is updated
+      // after prepared_txns_ are, the value of GetLatestSequenceNumber would
+      // reflect any uncommitted data that is not added to prepared_txns_ yet.
+      // Otherwise, if there is no concurrent txn, this value simply reflects
+      // that latest value in the memtable.
+      return next_prepare;
     } else {
-      return std::min(prepared_txns_.top(),
-                      db_impl_->GetLatestSequenceNumber() + 1);
+      return std::min(min_prepare, next_prepare);
     }
   }
+
   // Enhance the snapshot object by recording in it the smallest uncommitted seq
   inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                               SequenceNumber min_uncommitted) {
     assert(snapshot);
+    assert(min_uncommitted <= snapshot->number_ + 1);
     snapshot->min_uncommitted_ = min_uncommitted;
   }
 
@@ -727,26 +794,55 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // Thread safety: since the handle is read-only object it is a const it is
   // safe to read it concurrently
   std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
+  // A dummy snapshot object that refers to kMaxSequenceNumber
+  SnapshotImpl dummy_max_snapshot_;
 };
 
 class WritePreparedTxnReadCallback : public ReadCallback {
  public:
   WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
-      : ReadCallback(snapshot), db_(db) {}
+      : ReadCallback(snapshot),
+        db_(db),
+        backed_by_snapshot_(kBackedByDBSnapshot) {}
   WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
-                               SequenceNumber min_uncommitted)
-      : ReadCallback(snapshot, min_uncommitted), db_(db) {}
+                               SequenceNumber min_uncommitted,
+                               SnapshotBackup backed_by_snapshot)
+      : ReadCallback(snapshot, min_uncommitted),
+        db_(db),
+        backed_by_snapshot_(backed_by_snapshot) {
+    (void)backed_by_snapshot_;  // to silence unused private field warning
+  }
+
+  virtual ~WritePreparedTxnReadCallback() {
+    // If it is not backed by snapshot, the caller must check validity
+    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
+  }
 
   // Will be called to see if the seq number visible; if not it moves on to
   // the next seq number.
   inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
     auto snapshot = max_visible_seq_;
-    return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
+    bool snap_released = false;
+    auto ret =
+        db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
+    assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
+    snap_released_ |= snap_released;
+    return ret;
+  }
+
+  inline bool valid() {
+    valid_checked_ = true;
+    return snap_released_ == false;
   }
 
   // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
  private:
   WritePreparedTxnDB* db_;
+  // Whether max_visible_seq_ is backed by a snapshot
+  const SnapshotBackup backed_by_snapshot_;
+  bool snap_released_ = false;
+  // Safety check to ensure that the caller has checked invalid statuses
+  bool valid_checked_ = false;
 };
 
 class AddPreparedCallback : public PreReleaseCallback {
@@ -763,12 +859,28 @@ class AddPreparedCallback : public PreReleaseCallback {
   }
   virtual Status Callback(SequenceNumber prepare_seq,
                           bool is_mem_disabled __attribute__((__unused__)),
-                          uint64_t log_number) override {
+                          uint64_t log_number, size_t index,
+                          size_t total) override {
+    assert(index < total);
+    // To reduce the cost of lock acquisition competing with the concurrent
+    // prepare requests, lock on the first callback and unlock on the last.
+    const bool do_lock = !two_write_queues_ || index == 0;
+    const bool do_unlock = !two_write_queues_ || index + 1 == total;
     // Always Prepare from the main queue
     assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
+    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
+    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
+    if (do_lock) {
+      db_->prepared_txns_.push_pop_mutex()->Lock();
+    }
+    const bool kLocked = true;
     for (size_t i = 0; i < sub_batch_cnt_; i++) {
-      db_->AddPrepared(prepare_seq + i);
+      db_->AddPrepared(prepare_seq + i, kLocked);
+    }
+    if (do_unlock) {
+      db_->prepared_txns_.push_pop_mutex()->Unlock();
     }
+    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
     if (first_prepare_batch_) {
       assert(log_number != 0);
       db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
@@ -811,7 +923,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 
   virtual Status Callback(SequenceNumber commit_seq,
                           bool is_mem_disabled __attribute__((__unused__)),
-                          uint64_t) override {
+                          uint64_t, size_t /*index*/,
+                          size_t /*total*/) override {
     // Always commit from the 2nd queue
     assert(!db_impl_->immutable_db_options().two_write_queues ||
            is_mem_disabled);
@@ -848,6 +961,14 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
       // publish sequence numbers will be in order, i.e., once a seq is
       // published all the seq prior to that are also publishable.
       db_impl_->SetLastPublishedSequence(last_commit_seq);
+      // Note RemovePrepared should be called after publishing the seq.
+      // Otherwise SmallestUnCommittedSeq optimization breaks.
+      if (prep_seq_ != kMaxSequenceNumber) {
+        db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
+      }  // else there was no prepare phase
+      if (includes_aux_batch_) {
+        db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
+      }
     }
     // else SequenceNumber that is updated as part of the write already does the
     // publishing
@@ -892,8 +1013,8 @@ class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
     assert(prep_batch_cnt_ > 0);
   }
 
-  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
-                  uint64_t) override {
+  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
+                  size_t /*index*/, size_t /*total*/) override {
     // Always commit from the 2nd queue
     assert(is_mem_disabled);  // implies the 2nd queue
     assert(db_impl_->immutable_db_options().two_write_queues);
@@ -953,5 +1074,38 @@ struct SubBatchCounter : public WriteBatch::Handler {
   bool WriteAfterCommit() const override { return false; }
 };
 
-}  //  namespace rocksdb
+SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
+                                                    SequenceNumber* min,
+                                                    SequenceNumber* max) {
+  if (snapshot != nullptr) {
+    *min = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+               ->min_uncommitted_;
+    *max = static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
+               ->number_;
+    return kBackedByDBSnapshot;
+  } else {
+    *min = SmallestUnCommittedSeq();
+    *max = 0;  // to be assigned later after sv is referenced.
+    return kUnbackedByDBSnapshot;
+  }
+}
+
+bool WritePreparedTxnDB::ValidateSnapshot(
+    const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
+    std::memory_order order) {
+  if (backed_by_snapshot == kBackedByDBSnapshot) {
+    return true;
+  } else {
+    SequenceNumber max = max_evicted_seq_.load(order);
+    // Validate that max has not advanced the snapshot seq that is not backed
+    // by a real snapshot. This is a very rare case that should not happen in
+    // real workloads.
+    if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
 #endif  // ROCKSDB_LITE