import 14.2.4 nautilus point release
ceph/src/rocksdb/utilities/transactions/write_prepared_txn.cc
index cb20d143988bfc65fa10e9f26ebad63bc80fc79f..4100925c55ce5e17889569e0473af6a0c87551d8 100644
@@ -31,8 +31,13 @@ struct WriteOptions;
 WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                    const WriteOptions& write_options,
                                    const TransactionOptions& txn_options)
-    : PessimisticTransaction(txn_db, write_options, txn_options),
-      wpt_db_(txn_db) {}
+    : PessimisticTransaction(txn_db, write_options, txn_options, false),
+      wpt_db_(txn_db) {
+  // Call Initialize outside PessimisticTransaction constructor otherwise it
+  // would skip overridden functions in WritePreparedTxn since they are not
+  // defined yet in the constructor of PessimisticTransaction
+  Initialize(txn_options);
+}
 
 void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
   PessimisticTransaction::Initialize(txn_options);
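
The extra false argument and the explicit Initialize() call in the new constructor work around a standard C++ rule: a virtual call made from a base-class constructor dispatches to the base-class implementation, because the derived part of the object does not exist yet (the false presumably tells PessimisticTransaction to skip its own initialization). A minimal, self-contained sketch of the pitfall, with illustrative names that are not from RocksDB:

    #include <iostream>

    struct Base {
      Base() { Init(); }      // binds to Base::Init: Derived does not exist yet
      virtual ~Base() = default;
      virtual void Init() { std::cout << "Base::Init\n"; }
    };

    struct Derived : Base {
      Derived() { Init(); }   // so the derived ctor has to call Init itself
      void Init() override { std::cout << "Derived::Init\n"; }
    };

    int main() { Derived d; }  // prints "Base::Init" then "Derived::Init"
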
@@ -45,7 +50,8 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options,
   auto snapshot = read_options.snapshot;
   auto snap_seq =
       snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
-  SequenceNumber min_uncommitted = 0;  // by default disable the optimization
+  SequenceNumber min_uncommitted =
+      kMinUnCommittedSeq;  // by default disable the optimization
   if (snapshot != nullptr) {
     min_uncommitted =
         static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
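
The min_uncommitted value pulled from the snapshot above feeds the read callback: any sequence number below the smallest uncommitted sequence at the time the snapshot was taken is necessarily committed, so the commit cache never needs to be consulted for it (the real check lives in WritePreparedTxnDB::IsInSnapshot). A simplified, self-contained sketch of that shortcut; the std::set stands in for the commit cache and all names are illustrative:

    #include <cstdint>
    #include <set>

    // Sketch of the min_uncommitted shortcut on the WritePrepared read path.
    bool IsVisibleSketch(uint64_t seq, uint64_t snapshot_seq,
                         uint64_t min_uncommitted,
                         const std::set<uint64_t>& commit_cache) {
      if (seq > snapshot_seq) return false;    // written after the snapshot
      if (seq < min_uncommitted) return true;  // provably committed, skip lookup
      return commit_cache.count(seq) != 0;     // otherwise consult the cache
    }
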
@@ -78,19 +84,17 @@ Status WritePreparedTxn::PrepareInternal() {
   WriteOptions write_options = write_options_;
   write_options.disableWAL = false;
   const bool WRITE_AFTER_COMMIT = true;
+  const bool kFirstPrepareBatch = true;
   WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                      !WRITE_AFTER_COMMIT);
   // For each duplicate key we account for a new sub-batch
   prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
-  // AddPrepared better to be called in the pre-release callback otherwise there
-  // is a non-zero chance of max advancing prepare_seq and readers assume the
-  // data as committed.
-  // Also having it in the PreReleaseCallback allows in-order addition of
-  // prepared entries to PrepareHeap and hence enables an optimization. Refer to
+  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
+  // prepared entries to PreparedHeap and hence enables an optimization. Refer to
   // SmallestUnCommittedSeq for more details.
   AddPreparedCallback add_prepared_callback(
-      wpt_db_, prepare_batch_cnt_,
-      db_impl_->immutable_db_options().two_write_queues);
+      wpt_db_, db_impl_, prepare_batch_cnt_,
+      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
   const bool DISABLE_MEMTABLE = true;
   uint64_t seq_used = kMaxSequenceNumber;
   Status s = db_impl_->WriteImpl(
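
The AddPreparedCallback constructed above is a PreReleaseCallback: it runs on the write path after the sequence number has been allocated but before it is published to readers, which is what keeps PreparedHeap insertions in order. A bare-bones sketch of the pattern, reusing the two-argument Callback signature visible in the PublishSeqPreReleaseCallback removed further down; the real AddPreparedCallback takes the extra constructor arguments shown above, and later RocksDB versions extend the Callback signature:

    // Sketch only: register the just-allocated prepare sequence before it
    // becomes visible to readers. AddPrepared is the WritePreparedTxnDB hook
    // the comments above refer to; everything else here is illustrative.
    class AddPreparedSketch : public PreReleaseCallback {
     public:
      explicit AddPreparedSketch(WritePreparedTxnDB* db) : db_(db) {}
      Status Callback(SequenceNumber prepare_seq, bool is_mem_disabled) override {
        (void)is_mem_disabled;
        db_->AddPrepared(prepare_seq);  // called in write-queue order
        return Status::OK();
      }

     private:
      WritePreparedTxnDB* db_;
    };
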
@@ -146,14 +150,19 @@ Status WritePreparedTxn::CommitInternal() {
   const bool disable_memtable = !includes_data;
   const bool do_one_write =
       !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
-  const bool publish_seq = do_one_write;
-  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
-  // DB in one shot. min_uncommitted still works since it requires capturing
-  // data that is written to DB but not yet committed, while
-  // CommitTimeWriteBatch commits with PreReleaseCallback.
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
-      publish_seq);
+      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
+  // This is to call AddPrepared on CommitTimeWriteBatch
+  const bool kFirstPrepareBatch = true;
+  AddPreparedCallback add_prepared_callback(
+      wpt_db_, db_impl_, commit_batch_cnt,
+      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
+  PreReleaseCallback* pre_release_callback;
+  if (do_one_write) {
+    pre_release_callback = &update_commit_map;
+  } else {
+    pre_release_callback = &add_prepared_callback;
+  }
   uint64_t seq_used = kMaxSequenceNumber;
   // Since the prepared batch is directly written to memtable, there is already
   // a connection between the memtable and its WAL, so there is no need to
@@ -162,37 +171,29 @@ Status WritePreparedTxn::CommitInternal() {
   size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
   auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                                zero_log_number, disable_memtable, &seq_used,
-                               batch_cnt, &update_commit_map);
+                               batch_cnt, pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  const SequenceNumber commit_batch_seq = seq_used;
   if (LIKELY(do_one_write || !s.ok())) {
     if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after WriteImpl that published
       // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
       wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
     }
+    if (UNLIKELY(!do_one_write)) {
+      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+    }
     return s;
   }  // else do the 2nd write to publish seq
   // Note: the 2nd write comes with a performance penalty. So if we have too
   // many commits accompanied by CommitTimeWriteBatch and yet we cannot
   // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
   // two_write_queues should be disabled to avoid many additional writes here.
-  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
-   public:
-    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
-        : db_impl_(db_impl) {}
-    virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
-#ifdef NDEBUG
-      (void)is_mem_disabled;
-#endif
-      assert(is_mem_disabled);
-      assert(db_impl_->immutable_db_options().two_write_queues);
-      db_impl_->SetLastPublishedSequence(seq);
-      return Status::OK();
-    }
-
-   private:
-    DBImpl* db_impl_;
-  } publish_seq_callback(db_impl_);
+  const size_t kZeroData = 0;
+  // Update commit map only from the 2nd queue
+  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
+      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
+      commit_batch_seq, commit_batch_cnt);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
@@ -202,11 +203,12 @@ Status WritePreparedTxn::CommitInternal() {
   const uint64_t NO_REF_LOG = 0;
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          &publish_seq_callback);
+                          &update_commit_map_with_aux_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Note RemovePrepared should be called after WriteImpl that published the
   // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
   wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
+  wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
   return s;
 }
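
To summarize the bookkeeping CommitInternal now maintains when two_write_queues is on and a CommitTimeWriteBatch is present, here is an outline in comment form; the step names mirror the calls in the hunks above and are not compilable code:

    // write #1: the CommitTimeWriteBatch goes to the memtable, with
    //           AddPreparedCallback registering commit_batch_seq (for
    //           commit_batch_cnt sub-batches) as prepared.
    // write #2: an empty batch on the second queue, with
    //           WritePreparedCommitEntryPreReleaseCallback adding commit-map
    //           entries for both the prepare batch and the auxiliary batch.
    // after #2: RemovePrepared(prepare_seq, prepare_batch_cnt_) and
    //           RemovePrepared(commit_batch_seq, commit_batch_cnt), now that
    //           the commit sequence has been published.
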
 
@@ -218,8 +220,7 @@ Status WritePreparedTxn::RollbackInternal() {
   assert(GetId() > 0);
   auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
   auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
-  // In WritePrepared, the txn is is the same as prepare seq
-  auto last_visible_txn = GetId() - 1;
+  auto read_at_seq = kMaxSequenceNumber;
   struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
     DBImpl* db_;
     ReadOptions roptions;
@@ -237,8 +238,7 @@ Status WritePreparedTxn::RollbackInternal() {
         std::map<uint32_t, ColumnFamilyHandle*>& handles,
         bool rollback_merge_operands)
         : db_(db),
-          callback(wpt_db, snap_seq,
-                   0),  // 0 disables min_uncommitted optimization
+          callback(wpt_db, snap_seq),  // disable min_uncommitted optimization
           rollback_batch_(dst_batch),
           comparators_(comparators),
           handles_(handles),
@@ -307,8 +307,8 @@ Status WritePreparedTxn::RollbackInternal() {
     }
 
    protected:
-    virtual bool WriteAfterCommit() const override { return false; }
-  } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
+    bool WriteAfterCommit() const override { return false; }
+  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                      *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                      wpt_db_->txn_db_options_.rollback_merge_operands);
   auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
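
The rollback batch above is produced by walking the transaction's own write batch with a WriteBatch::Handler and, for every touched key, writing back whatever was visible before the transaction (read at read_at_seq with a callback that hides the still-prepared data). A stripped-down sketch of that handler shape: PutCF/DeleteCF are the real WriteBatch::Handler overrides, while GetBeforeTxn is a hypothetical stand-in for the ReadCallback-based Get that RollbackWriteBatchBuilder performs:

    // Sketch only: build a batch that undoes the keys touched by a prepared
    // transaction.
    struct RollbackBuilderSketch : public WriteBatch::Handler {
      WriteBatch* rollback_batch;

      Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
        return Rollback(cf, key);
      }
      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status Rollback(uint32_t /*cf*/, const Slice& key) {
        std::string prior_value;
        const bool existed = GetBeforeTxn(key, &prior_value);  // hypothetical
        // Restore the pre-transaction state: the old value if the key existed,
        // a Delete otherwise, so the prepared writes are cancelled out.
        return existed ? rollback_batch->Put(key, prior_value)
                       : rollback_batch->Delete(key);
      }
      bool GetBeforeTxn(const Slice& key, std::string* value);  // declared only
    };
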
@@ -323,22 +323,32 @@ Status WritePreparedTxn::RollbackInternal() {
   const uint64_t NO_REF_LOG = 0;
   uint64_t seq_used = kMaxSequenceNumber;
   const size_t ONE_BATCH = 1;
-  // We commit the rolled back prepared batches. ALthough this is
+  const bool kFirstPrepareBatch = true;
+  // We commit the rolled back prepared batches. Although this is
   // counter-intuitive, i) it is safe to do so, since the prepared batches are
   // already canceled out by the rollback batch, ii) adding the commit entry to
   // CommitCache will allow us to benefit from the existing mechanism in
   // CommitCache that keeps an entry evicted due to max advance and yet overlaps
   // with a live snapshot around so that the live snapshot properly skips the
   // entry even if its prepare seq is lower than max_evicted_seq_.
+  AddPreparedCallback add_prepared_callback(
+      wpt_db_, db_impl_, ONE_BATCH,
+      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
       wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
+  PreReleaseCallback* pre_release_callback;
+  if (do_one_write) {
+    pre_release_callback = &update_commit_map;
+  } else {
+    pre_release_callback = &add_prepared_callback;
+  }
   // Note: the rollback batch does not need AddPrepared since it is written to
   // DB in one shot. min_uncommitted still works since it requires capturing
   // data that is written to DB but not yet committed, while
-  // the roolback batch commits with PreReleaseCallback.
+  // the rollback batch commits with PreReleaseCallback.
   s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                           NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          do_one_write ? &update_commit_map : nullptr);
+                          pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (!s.ok()) {
     return s;
@@ -347,15 +357,14 @@ Status WritePreparedTxn::RollbackInternal() {
     wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
     return s;
   }  // else do the 2nd write for commit
-  uint64_t& prepare_seq = seq_used;
+  uint64_t rollback_seq = seq_used;
   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
-                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
-                    prepare_seq);
+                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
+                    rollback_seq);
   // Commit the batch by writing an empty batch to the queue that will release
   // the commit sequence number to readers.
-  const size_t ZERO_COMMITS = 0;
-  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
+  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
+      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
@@ -364,20 +373,13 @@ Status WritePreparedTxn::RollbackInternal() {
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
-  // Mark the txn as rolled back
-  uint64_t& rollback_seq = seq_used;
+  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
+                    "RollbackInternal (status=%s) commit: %" PRIu64,
+                    s.ToString().c_str(), GetId());
   if (s.ok()) {
-    // Note: it is safe to do it after PreReleaseCallback via WriteImpl since
-    // all the writes by the prpared batch are already blinded by the rollback
-    // batch. The only reason we commit the prepared batch here is to benefit
-    // from the existing mechanism in CommitCache that takes care of the rare
-    // cases that the prepare seq is visible to a snsapshot but max evicted seq
-    // advances that prepare seq.
-    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
-      wpt_db_->AddCommitted(GetId() + i, rollback_seq);
-    }
     wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
   }
+  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
 
   return s;
 }
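
As in CommitInternal, the two_write_queues rollback path is split into two writes. An outline in comment form of the sequence the hunks above implement (the behavior of WritePreparedRollbackPreReleaseCallback is inferred from its name and arguments):

    // write #1: the rollback batch goes to the memtable; when a second write
    //           is needed, AddPreparedCallback registers rollback_seq
    //           (ONE_BATCH) as prepared instead of updating the commit map.
    // write #2: an empty batch on the second queue, whose pre-release callback
    //           presumably commits both the original prepared batch
    //           (GetId() .. GetId()+prepare_batch_cnt_-1) and the rollback
    //           batch itself.
    // after #2: RemovePrepared(GetId(), prepare_batch_cnt_) on success, and
    //           RemovePrepared(rollback_seq, ONE_BATCH) unconditionally.
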
@@ -410,24 +412,12 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
   WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
   return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                                snap_seq, false /* cache_only */,
-                                               &snap_checker);
+                                               &snap_checker, min_uncommitted);
 }
 
 void WritePreparedTxn::SetSnapshot() {
-  // Note: for this optimization setting the last sequence number and obtaining
-  // the smallest uncommitted seq should be done atomically. However to avoid
-  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
-  // snapshot. Since we always updated the list of unprepared seq (via
-  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
-  // smallest uncommited seq that we pair with the snapshot is smaller or equal
-  // the value that would be obtained otherwise atomically. That is ok since
-  // this optimization works as long as min_uncommitted is less than or equal
-  // than the smallest uncommitted seq when the snapshot was taken.
-  auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
-  const bool FOR_WW_CONFLICT_CHECK = true;
-  SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
-  assert(snapshot);
-  wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
+  const bool kForWWConflictCheck = true;
+  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
   SetSnapshotInternal(snapshot);
 }