git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction.cc
index 5ae5fed082238777a61b16ff7a2c3829d7ef756b..8c71c1de790c0f13256f16f0b59a6bd28af29567 100644 (file)
@@ -38,7 +38,10 @@ TransactionID PessimisticTransaction::GenTxnID() {
 PessimisticTransaction::PessimisticTransaction(
     TransactionDB* txn_db, const WriteOptions& write_options,
     const TransactionOptions& txn_options, const bool init)
-    : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
+    : TransactionBaseImpl(
+          txn_db->GetRootDB(), write_options,
+          static_cast_with_check<PessimisticTransactionDB>(txn_db)
+              ->GetLockTrackerFactory()),
       txn_db_impl_(nullptr),
       expiration_time_(0),
       txn_id_(0),
@@ -48,9 +51,8 @@ PessimisticTransaction::PessimisticTransaction(
       deadlock_detect_(false),
       deadlock_detect_depth_(0),
       skip_concurrency_control_(false) {
-  txn_db_impl_ =
-      static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
-  db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
+  txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db);
+  db_impl_ = static_cast_with_check<DBImpl>(db_);
   if (init) {
     Initialize(txn_options);
   }
@@ -88,27 +90,28 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
   }
   use_only_the_last_commit_time_batch_for_recovery_ =
       txn_options.use_only_the_last_commit_time_batch_for_recovery;
+  skip_prepare_ = txn_options.skip_prepare;
 }
 
 PessimisticTransaction::~PessimisticTransaction() {
-  txn_db_impl_->UnLock(this, &GetTrackedKeys());
+  txn_db_impl_->UnLock(this, *tracked_locks_);
   if (expiration_time_ > 0) {
     txn_db_impl_->RemoveExpirableTransaction(txn_id_);
   }
-  if (!name_.empty() && txn_state_ != COMMITED) {
+  if (!name_.empty() && txn_state_ != COMMITTED) {
     txn_db_impl_->UnregisterTransaction(this);
   }
 }
 
 void PessimisticTransaction::Clear() {
-  txn_db_impl_->UnLock(this, &GetTrackedKeys());
+  txn_db_impl_->UnLock(this, *tracked_locks_);
   TransactionBaseImpl::Clear();
 }
 
 void PessimisticTransaction::Reinitialize(
     TransactionDB* txn_db, const WriteOptions& write_options,
     const TransactionOptions& txn_options) {
-  if (!name_.empty() && txn_state_ != COMMITED) {
+  if (!name_.empty() && txn_state_ != COMMITTED) {
     txn_db_impl_->UnregisterTransaction(this);
   }
   TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
@@ -132,8 +135,8 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
     : PessimisticTransaction(txn_db, write_options, txn_options){};
 
 Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
-  TransactionKeyMap keys_to_unlock;
-  Status s = LockBatch(batch, &keys_to_unlock);
+  std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
+  Status s = LockBatch(batch, keys_to_unlock.get());
 
   if (!s.ok()) {
     return s;
@@ -156,7 +159,7 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
     txn_state_.store(AWAITING_COMMIT);
     s = CommitBatchInternal(batch);
     if (s.ok()) {
-      txn_state_.store(COMMITED);
+      txn_state_.store(COMMITTED);
     }
   } else if (txn_state_ == LOCKS_STOLEN) {
     s = Status::Expired();
@@ -164,13 +167,12 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
     s = Status::InvalidArgument("Transaction is not in state for commit.");
   }
 
-  txn_db_impl_->UnLock(this, &keys_to_unlock);
+  txn_db_impl_->UnLock(this, *keys_to_unlock);
 
   return s;
 }
 
 Status PessimisticTransaction::Prepare() {
-  Status s;
 
   if (name_.empty()) {
     return Status::InvalidArgument(
@@ -181,6 +183,7 @@ Status PessimisticTransaction::Prepare() {
     return Status::Expired();
   }
 
+  Status s;
   bool can_prepare = false;
 
   if (expiration_time_ > 0) {
@@ -191,11 +194,11 @@ Status PessimisticTransaction::Prepare() {
                                                       AWAITING_PREPARE);
   } else if (txn_state_ == STARTED) {
     // expiration and lock stealing is not possible
+    txn_state_.store(AWAITING_PREPARE);
     can_prepare = true;
   }
 
   if (can_prepare) {
-    txn_state_.store(AWAITING_PREPARE);
     // transaction can't expire after preparation
     expiration_time_ = 0;
     assert(log_number_ == 0 ||
@@ -209,7 +212,7 @@ Status PessimisticTransaction::Prepare() {
     s = Status::Expired();
   } else if (txn_state_ == PREPARED) {
     s = Status::InvalidArgument("Transaction has already been prepared.");
-  } else if (txn_state_ == COMMITED) {
+  } else if (txn_state_ == COMMITTED) {
     s = Status::InvalidArgument("Transaction has already been committed.");
   } else if (txn_state_ == ROLLEDBACK) {
     s = Status::InvalidArgument("Transaction has already been rolledback.");
@@ -223,7 +226,9 @@ Status PessimisticTransaction::Prepare() {
 Status WriteCommittedTxn::PrepareInternal() {
   WriteOptions write_options = write_options_;
   write_options.disableWAL = false;
-  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
+  auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
+                                              name_);
+  assert(s.ok());
   class MarkLogCallback : public PreReleaseCallback {
    public:
     MarkLogCallback(DBImpl* db, bool two_write_queues)
@@ -253,15 +258,14 @@ Status WriteCommittedTxn::PrepareInternal() {
   const bool kDisableMemtable = true;
   SequenceNumber* const KIgnoreSeqUsed = nullptr;
   const size_t kNoBatchCount = 0;
-  Status s = db_impl_->WriteImpl(
-      write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
-      &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
-      &mark_log_callback);
+  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
+                          kNoWriteCallback, &log_number_, kRefNoLog,
+                          kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
+                          &mark_log_callback);
   return s;
 }
 
 Status PessimisticTransaction::Commit() {
-  Status s;
   bool commit_without_prepare = false;
   bool commit_prepared = false;
 
@@ -284,12 +288,14 @@ Status PessimisticTransaction::Commit() {
     commit_prepared = true;
   } else if (txn_state_ == STARTED) {
     // expiration and lock stealing is not a concern
-    commit_without_prepare = true;
-    // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
-    // add an option so that the user explictly express the intention of
-    // skipping the prepare phase.
+    if (skip_prepare_) {
+      commit_without_prepare = true;
+    } else {
+      return Status::TxnNotPrepared();
+    }
   }
 
+  Status s;
   if (commit_without_prepare) {
     assert(!commit_prepared);
     if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
@@ -307,7 +313,7 @@ Status PessimisticTransaction::Commit() {
       }
       Clear();
       if (s.ok()) {
-        txn_state_.store(COMMITED);
+        txn_state_.store(COMMITTED);
       }
     }
   } else if (commit_prepared) {
@@ -330,10 +336,10 @@ Status PessimisticTransaction::Commit() {
     txn_db_impl_->UnregisterTransaction(this);
 
     Clear();
-    txn_state_.store(COMMITED);
+    txn_state_.store(COMMITTED);
   } else if (txn_state_ == LOCKS_STOLEN) {
     s = Status::Expired();
-  } else if (txn_state_ == COMMITED) {
+  } else if (txn_state_ == COMMITTED) {
     s = Status::InvalidArgument("Transaction has already been committed.");
   } else if (txn_state_ == ROLLEDBACK) {
     s = Status::InvalidArgument("Transaction has already been rolledback.");
@@ -373,7 +379,8 @@ Status WriteCommittedTxn::CommitInternal() {
   // We take the commit-time batch and append the Commit marker.
   // The Memtable will ignore the Commit marker in non-recovery mode
   WriteBatch* working_batch = GetCommitTimeWriteBatch();
-  WriteBatchInternal::MarkCommit(working_batch, name_);
+  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
+  assert(s.ok());
 
   // any operations appended to this working_batch will be ignored from WAL
   working_batch->MarkWalTerminationPoint();
@@ -381,13 +388,14 @@ Status WriteCommittedTxn::CommitInternal() {
   // insert prepared batch into Memtable only skipping WAL.
   // Memtable will ignore BeginPrepare/EndPrepare markers
   // in non recovery mode and simply insert the values
-  WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
+  s = WriteBatchInternal::Append(working_batch,
+                                 GetWriteBatch()->GetWriteBatch());
+  assert(s.ok());
 
   uint64_t seq_used = kMaxSequenceNumber;
-  auto s =
-      db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
+  s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
                           /*log_used*/ nullptr, /*log_ref*/ log_number_,
-                          /*disable_memtable*/ false, &seq_used);  
+                          /*disable_memtable*/ false, &seq_used);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (s.ok()) {
     SetId(seq_used);
@@ -423,7 +431,7 @@ Status PessimisticTransaction::Rollback() {
     }
     // prepare couldn't have taken place
     Clear();
-  } else if (txn_state_ == COMMITED) {
+  } else if (txn_state_ == COMMITTED) {
     s = Status::InvalidArgument("This transaction has already been committed.");
   } else {
     s = Status::InvalidArgument(
@@ -435,8 +443,9 @@ Status PessimisticTransaction::Rollback() {
 
 Status WriteCommittedTxn::RollbackInternal() {
   WriteBatch rollback_marker;
-  WriteBatchInternal::MarkRollback(&rollback_marker, name_);
-  auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);
+  auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_);
+  assert(s.ok());
+  s = db_impl_->WriteImpl(write_options_, &rollback_marker);
   return s;
 }
 
@@ -445,12 +454,14 @@ Status PessimisticTransaction::RollbackToSavePoint() {
     return Status::InvalidArgument("Transaction is beyond state for rollback.");
   }
 
-  // Unlock any keys locked since last transaction
-  const std::unique_ptr<TransactionKeyMap>& keys =
-      GetTrackedKeysSinceSavePoint();
-
-  if (keys) {
-    txn_db_impl_->UnLock(this, keys.get());
+  if (save_points_ != nullptr && !save_points_->empty()) {
+    // Unlock any keys locked since last transaction
+    auto& save_point_tracker = *save_points_->top().new_locks_;
+    std::unique_ptr<LockTracker> t(
+        tracked_locks_->GetTrackedLocksSinceSavePoint(save_point_tracker));
+    if (t) {
+      txn_db_impl_->UnLock(this, *t);
+    }
   }
 
   return TransactionBaseImpl::RollbackToSavePoint();
@@ -459,7 +470,7 @@ Status PessimisticTransaction::RollbackToSavePoint() {
 // Lock all keys in this batch.
 // On success, caller should unlock keys_to_unlock
 Status PessimisticTransaction::LockBatch(WriteBatch* batch,
-                                         TransactionKeyMap* keys_to_unlock) {
+                                         LockTracker* keys_to_unlock) {
   class Handler : public WriteBatch::Handler {
    public:
     // Sorted map of column_family_id to sorted set of keys.
@@ -499,9 +510,10 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
 
   // Iterating on this handler will add all keys in this batch into keys
   Handler handler;
-  batch->Iterate(&handler);
-
-  Status s;
+  Status s = batch->Iterate(&handler);
+  if (!s.ok()) {
+    return s;
+  }
 
   // Attempt to lock all keys
   for (const auto& cf_iter : handler.keys_) {
@@ -515,8 +527,13 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
       if (!s.ok()) {
         break;
       }
-      TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
-               false, true /* exclusive */);
+      PointLockRequest r;
+      r.column_family_id = cfh_id;
+      r.key = key;
+      r.seq = kMaxSequenceNumber;
+      r.read_only = false;
+      r.exclusive = true;
+      keys_to_unlock->Track(r);
     }
 
     if (!s.ok()) {
@@ -525,7 +542,7 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
   }
 
   if (!s.ok()) {
-    txn_db_impl_->UnLock(this, keys_to_unlock);
+    txn_db_impl_->UnLock(this, *keys_to_unlock);
   }
 
   return s;
@@ -547,28 +564,9 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
   }
   uint32_t cfh_id = GetColumnFamilyID(column_family);
   std::string key_str = key.ToString();
-  bool previously_locked;
-  bool lock_upgrade = false;
-
-  // lock this key if this transactions hasn't already locked it
-  SequenceNumber tracked_at_seq = kMaxSequenceNumber;
-
-  const auto& tracked_keys = GetTrackedKeys();
-  const auto tracked_keys_cf = tracked_keys.find(cfh_id);
-  if (tracked_keys_cf == tracked_keys.end()) {
-    previously_locked = false;
-  } else {
-    auto iter = tracked_keys_cf->second.find(key_str);
-    if (iter == tracked_keys_cf->second.end()) {
-      previously_locked = false;
-    } else {
-      if (!iter->second.exclusive && exclusive) {
-        lock_upgrade = true;
-      }
-      previously_locked = true;
-      tracked_at_seq = iter->second.seq;
-    }
-  }
+  PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+  bool previously_locked = status.locked;
+  bool lock_upgrade = previously_locked && exclusive && !status.exclusive;
 
   // Lock this key if this transactions hasn't already locked it or we require
   // an upgrade.
@@ -584,6 +582,8 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
   // any writes since this transaction's snapshot.
   // TODO(agiardullo): could optimize by supporting shared txn locks in the
   // future
+  SequenceNumber tracked_at_seq =
+      status.locked ? status.seq : kMaxSequenceNumber;
   if (!do_validate || snapshot_ == nullptr) {
     if (assume_tracked && !previously_locked) {
       s = Status::InvalidArgument(
@@ -613,15 +613,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
 
       if (!s.ok()) {
         // Failed to validate key
-        if (!previously_locked) {
-          // Unlock key we just locked
-          if (lock_upgrade) {
-            s = txn_db_impl_->TryLock(this, cfh_id, key_str,
-                                      false /* exclusive */);
-            assert(s.ok());
-          } else {
-            txn_db_impl_->UnLock(this, cfh_id, key.ToString());
-          }
+        // Unlock key we just locked
+        if (lock_upgrade) {
+          s = txn_db_impl_->TryLock(this, cfh_id, key_str,
+                                    false /* exclusive */);
+          assert(s.ok());
+        } else if (!previously_locked) {
+          txn_db_impl_->UnLock(this, cfh_id, key.ToString());
         }
       }
     }
@@ -644,10 +642,11 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
       TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
     } else {
 #ifndef NDEBUG
-      assert(tracked_keys_cf->second.count(key_str) > 0);
-      const auto& info = tracked_keys_cf->second.find(key_str)->second;
-      assert(info.seq <= tracked_at_seq);
-      assert(info.exclusive == exclusive);
+      PointLockStatus lock_status =
+          tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+      assert(lock_status.locked);
+      assert(lock_status.seq <= tracked_at_seq);
+      assert(lock_status.exclusive == exclusive);
 #endif
     }
   }