]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/write_prepared_txn.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn.cc
index 4100925c55ce5e17889569e0473af6a0c87551d8..216d83555ce494384f69ce1efe99375e35529ea7 100644 (file)
@@ -7,16 +7,12 @@
 
 #include "utilities/transactions/write_prepared_txn.h"
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
 #include <map>
 #include <set>
 
 #include "db/column_family.h"
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
 #include "rocksdb/db.h"
 #include "rocksdb/status.h"
 #include "rocksdb/utilities/transaction_db.h"
@@ -24,7 +20,7 @@
 #include "utilities/transactions/pessimistic_transaction.h"
 #include "utilities/transactions/write_prepared_txn_db.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 struct WriteOptions;
 
@@ -44,23 +40,46 @@ void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
   prepare_batch_cnt_ = 0;
 }
 
-Status WritePreparedTxn::Get(const ReadOptions& read_options,
+void WritePreparedTxn::MultiGet(const ReadOptions& options,
+                                ColumnFamilyHandle* column_family,
+                                const size_t num_keys, const Slice* keys,
+                                PinnableSlice* values, Status* statuses,
+                                const bool sorted_input) {
+  SequenceNumber min_uncommitted, snap_seq;
+  const SnapshotBackup backed_by_snapshot =
+      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
+  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
+                                        backed_by_snapshot);
+  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
+                                      keys, values, statuses, sorted_input,
+                                      &callback);
+  if (UNLIKELY(!callback.valid() ||
+               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
+    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
+    for (size_t i = 0; i < num_keys; i++) {
+      statuses[i] = Status::TryAgain();
+    }
+  }
+}
+
+Status WritePreparedTxn::Get(const ReadOptions& options,
                              ColumnFamilyHandle* column_family,
                              const Slice& key, PinnableSlice* pinnable_val) {
-  auto snapshot = read_options.snapshot;
-  auto snap_seq =
-      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
-  SequenceNumber min_uncommitted =
-      kMinUnCommittedSeq;  // by default disable the optimization
-  if (snapshot != nullptr) {
-    min_uncommitted =
-        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
-            ->min_uncommitted_;
+  SequenceNumber min_uncommitted, snap_seq;
+  const SnapshotBackup backed_by_snapshot =
+      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
+  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
+                                        backed_by_snapshot);
+  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
+                                            pinnable_val, &callback);
+  if (LIKELY(callback.valid() &&
+             wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
+                                       backed_by_snapshot))) {
+    return res;
+  } else {
+    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
+    return Status::TryAgain();
   }
-
-  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
-  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
-                                        pinnable_val, &callback);
 }
 
 Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
@@ -175,12 +194,15 @@ Status WritePreparedTxn::CommitInternal() {
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   const SequenceNumber commit_batch_seq = seq_used;
   if (LIKELY(do_one_write || !s.ok())) {
-    if (LIKELY(s.ok())) {
-      // Note RemovePrepared should be called after WriteImpl that publishsed
+    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
+                 s.ok())) {
+      // Note: RemovePrepared should be called after WriteImpl that published
       // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
       wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
-    }
+    }  // else RemovePrepared is called from within PreReleaseCallback
     if (UNLIKELY(!do_one_write)) {
+      assert(!s.ok());
+      // Cleanup the prepared entry we added with add_prepared_callback
       wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
     }
     return s;
@@ -205,10 +227,14 @@ Status WritePreparedTxn::CommitInternal() {
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_aux_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
-  // Note RemovePrepared should be called after WriteImpl that publishsed the
-  // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
-  wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
-  wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+  if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
+    if (s.ok()) {
+      // Note: RemovePrepared should be called after WriteImpl that published
+      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
+      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
+    }
+    wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+  }  // else RemovePrepared is called from within PreReleaseCallback
   return s;
 }
 
@@ -221,9 +247,11 @@ Status WritePreparedTxn::RollbackInternal() {
   auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
   auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
   auto read_at_seq = kMaxSequenceNumber;
+  ReadOptions roptions;
+  // to prevent callback's seq from being overridden inside DBImpl::Get
+  roptions.snapshot = wpt_db_->GetMaxSnapshot();
   struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
     DBImpl* db_;
-    ReadOptions roptions;
     WritePreparedTxnReadCallback callback;
     WriteBatch* rollback_batch_;
     std::map<uint32_t, const Comparator*>& comparators_;
@@ -231,18 +259,20 @@ Status WritePreparedTxn::RollbackInternal() {
     using CFKeys = std::set<Slice, SetComparator>;
     std::map<uint32_t, CFKeys> keys_;
     bool rollback_merge_operands_;
+    ReadOptions roptions_;
     RollbackWriteBatchBuilder(
         DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
         WriteBatch* dst_batch,
         std::map<uint32_t, const Comparator*>& comparators,
         std::map<uint32_t, ColumnFamilyHandle*>& handles,
-        bool rollback_merge_operands)
+        bool rollback_merge_operands, ReadOptions _roptions)
         : db_(db),
           callback(wpt_db, snap_seq),  // disable min_uncommitted optimization
           rollback_batch_(dst_batch),
           comparators_(comparators),
           handles_(handles),
-          rollback_merge_operands_(rollback_merge_operands) {}
+          rollback_merge_operands_(rollback_merge_operands),
+          roptions_(_roptions) {}
 
     Status Rollback(uint32_t cf, const Slice& key) {
       Status s;
@@ -260,8 +290,12 @@ Status WritePreparedTxn::RollbackInternal() {
       PinnableSlice pinnable_val;
       bool not_used;
       auto cf_handle = handles_[cf];
-      s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
-                       &callback);
+      DBImpl::GetImplOptions get_impl_options;
+      get_impl_options.column_family = cf_handle;
+      get_impl_options.value = &pinnable_val;
+      get_impl_options.value_found = &not_used;
+      get_impl_options.callback = &callback;
+      s = db_->GetImpl(roptions_, key, get_impl_options);
       assert(s.ok() || s.IsNotFound());
       if (s.ok()) {
         s = rollback_batch_->Put(cf_handle, key, pinnable_val);
@@ -310,7 +344,8 @@ Status WritePreparedTxn::RollbackInternal() {
     bool WriteAfterCommit() const override { return false; }
   } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                      *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
-                     wpt_db_->txn_db_options_.rollback_merge_operands);
+                     wpt_db_->txn_db_options_.rollback_merge_operands,
+                     roptions);
   auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
   assert(s.ok());
   if (!s.ok()) {
@@ -354,6 +389,7 @@ Status WritePreparedTxn::RollbackInternal() {
     return s;
   }
   if (do_one_write) {
+    assert(!db_impl_->immutable_db_options().two_write_queues);
     wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
     return s;
   }  // else do the 2nd write for commit
@@ -376,9 +412,13 @@ Status WritePreparedTxn::RollbackInternal() {
   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                     "RollbackInternal (status=%s) commit: %" PRIu64,
                     s.ToString().c_str(), GetId());
+  // TODO(lth): For WriteUnPrepared, where rollback is called frequently,
+  // RemovePrepared could be moved to the callback to reduce lock contention.
   if (s.ok()) {
     wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
   }
+  // Note: RemovePrepared for prepared batch is called from within
+  // PreReleaseCallback
   wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
 
   return s;
@@ -409,7 +449,8 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
   ColumnFamilyHandle* cfh =
       column_family ? column_family : db_impl_->DefaultColumnFamily();
 
-  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
+  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
+                                            kBackedByDBSnapshot);
   return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                                snap_seq, false /* cache_only */,
                                                &snap_checker, min_uncommitted);
@@ -427,6 +468,6 @@ Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
   return ret;
 }
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 
 #endif  // ROCKSDB_LITE