git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction_db.h
index 619a83e97e37afbdf69d2b4380dc393f71b198e7..25cd11054090803e3e7644aea6ca346327e50521 100644 (file)
@@ -21,6 +21,7 @@
 #include "rocksdb/utilities/transaction_db.h"
 #include "util/cast_util.h"
 #include "utilities/transactions/lock/lock_manager.h"
+#include "utilities/transactions/lock/range/range_lock_manager.h"
 #include "utilities/transactions/pessimistic_transaction.h"
 #include "utilities/transactions/write_prepared_txn.h"
 
@@ -70,20 +71,27 @@ class PessimisticTransactionDB : public TransactionDB {
   virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
   inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
                                             WriteBatch* updates) {
-    // Need to lock all keys in this batch to prevent write conflicts with
-    // concurrent transactions.
-    Transaction* txn = BeginInternalTransaction(opts);
-    txn->DisableIndexing();
-
-    auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
-
-    // Since commitBatch sorts the keys before locking, concurrent Write()
-    // operations will not cause a deadlock.
-    // In order to avoid a deadlock with a concurrent Transaction, Transactions
-    // should use a lock timeout.
-    Status s = txn_impl->CommitBatch(updates);
-
-    delete txn;
+    Status s;
+    if (opts.protection_bytes_per_key > 0) {
+      s = WriteBatchInternal::UpdateProtectionInfo(
+          updates, opts.protection_bytes_per_key);
+    }
+    if (s.ok()) {
+      // Need to lock all keys in this batch to prevent write conflicts with
+      // concurrent transactions.
+      Transaction* txn = BeginInternalTransaction(opts);
+      txn->DisableIndexing();
+
+      auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
+
+      // Since CommitBatch sorts the keys before locking, concurrent Write()
+      // operations will not cause a deadlock.
+      // In order to avoid a deadlock with a concurrent Transaction,
+      // Transactions should use a lock timeout.
+      s = txn_impl->CommitBatch(updates);
+
+      delete txn;
+    }
 
     return s;
   }
@@ -93,11 +101,25 @@ class PessimisticTransactionDB : public TransactionDB {
                                     const std::string& column_family_name,
                                     ColumnFamilyHandle** handle) override;
 
+  Status CreateColumnFamilies(
+      const ColumnFamilyOptions& options,
+      const std::vector<std::string>& column_family_names,
+      std::vector<ColumnFamilyHandle*>* handles) override;
+
+  Status CreateColumnFamilies(
+      const std::vector<ColumnFamilyDescriptor>& column_families,
+      std::vector<ColumnFamilyHandle*>* handles) override;
+
   using StackableDB::DropColumnFamily;
   virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
 
+  Status DropColumnFamilies(
+      const std::vector<ColumnFamilyHandle*>& column_families) override;
+
   Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
                  const std::string& key, bool exclusive);
+  Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
+                      const Endpoint& start_endp, const Endpoint& end_endp);
 
   void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
   void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
@@ -147,11 +169,28 @@ class PessimisticTransactionDB : public TransactionDB {
     return lock_manager_->GetLockTrackerFactory();
   }
 
+  std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
+      TxnTimestamp ts) override;
+
+  std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
+      TxnTimestamp ts) const override;
+
+  void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
+
+  Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
+                                 std::vector<std::shared_ptr<const Snapshot>>&
+                                     timestamped_snapshots) const override;
+
  protected:
   DBImpl* db_impl_;
   std::shared_ptr<Logger> info_log_;
   const TransactionDBOptions txn_db_options_;
 
+  static Status FailIfBatchHasTs(const WriteBatch* wb);
+
+  static Status FailIfCfEnablesTs(const DB* db,
+                                  const ColumnFamilyHandle* column_family);
+
   void ReinitializeTransaction(
       Transaction* txn, const WriteOptions& write_options,
       const TransactionOptions& txn_options = TransactionOptions());
@@ -172,11 +211,12 @@ class PessimisticTransactionDB : public TransactionDB {
   friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
   friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
 
-  std::unique_ptr<LockManager> lock_manager_;
+  Transaction* BeginInternalTransaction(const WriteOptions& options);
+
+  std::shared_ptr<LockManager> lock_manager_;
 
   // Must be held when adding/dropping column families.
   InstrumentedMutex column_family_mutex_;
-  Transaction* BeginInternalTransaction(const WriteOptions& options);
 
   // Used to ensure that no locks are stolen from an expirable transaction
   // that has started a commit. Only transactions with an expiration time
@@ -221,5 +261,58 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB {
   virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
 };
 
+// Guard that refuses write batches for which
+// WriteBatchInternal::HasKeyWithTimestamp() reports true.
+//
+// Returns Status::NotSupported in that case. Returns Status::OK() otherwise,
+// including when `batch` is nullptr (a null batch is never dereferenced).
+inline Status PessimisticTransactionDB::FailIfBatchHasTs(
+    const WriteBatch* batch) {
+  // The null check comes first so that *batch is only evaluated for a
+  // non-null pointer.
+  if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
+    return Status::NotSupported(
+        "Writes with timestamp must go through transaction API instead of "
+        "TransactionDB.");
+  }
+  return Status::OK();
+}
+
+// Guard that refuses column families whose comparator reports a non-zero
+// timestamp_size().
+//
+// `db` must be non-null (asserted). `column_family` may be nullptr, in which
+// case db->DefaultColumnFamily() is checked instead.
+//
+// Returns Status::NotSupported when the comparator's timestamp_size() is
+// greater than zero, Status::OK() otherwise.
+inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
+    const DB* db, const ColumnFamilyHandle* column_family) {
+  assert(db);
+  // Fall back to the DB's default column family when none was supplied.
+  column_family = column_family ? column_family : db->DefaultColumnFamily();
+  assert(column_family);
+  const Comparator* const ucmp = column_family->GetComparator();
+  assert(ucmp);
+  if (ucmp->timestamp_size() > 0) {
+    return Status::NotSupported(
+        "Write operation with user timestamp must go through the transaction "
+        "API instead of TransactionDB.");
+  }
+  return Status::OK();
+}
+
+// PostMemTableCallback implementation that bundles a DBImpl pointer, a commit
+// timestamp, a transaction notifier and a reference to a caller-owned
+// snapshot pointer.
+//
+// operator() is only declared here; its definition is not part of this
+// excerpt. NOTE(review): judging by the class and member names it presumably
+// creates a snapshot tagged with commit_ts_ and stores it through snapshot_ --
+// confirm against the definition.
+class SnapshotCreationCallback : public PostMemTableCallback {
+ public:
+  // Stores the arguments as-is. `dbi` must be non-null (asserted).
+  // `snapshot` is bound by reference, so the referenced shared_ptr must
+  // outlive this callback object.
+  explicit SnapshotCreationCallback(
+      DBImpl* dbi, TxnTimestamp commit_ts,
+      const std::shared_ptr<TransactionNotifier>& notifier,
+      std::shared_ptr<const Snapshot>& snapshot)
+      : db_impl_(dbi),
+        commit_ts_(commit_ts),
+        snapshot_notifier_(notifier),
+        snapshot_(snapshot) {
+    assert(db_impl_);
+  }
+
+  // Marks the stored status as permitted to go unchecked before it is
+  // destroyed along with this object.
+  ~SnapshotCreationCallback() override {
+    snapshot_creation_status_.PermitUncheckedError();
+  }
+
+  // Declared only; see the class comment for the hedged description.
+  Status operator()(SequenceNumber seq, bool disable_memtable) override;
+
+ private:
+  DBImpl* const db_impl_;           // never null (asserted in the constructor)
+  const TxnTimestamp commit_ts_;    // timestamp passed to the constructor
+  std::shared_ptr<TransactionNotifier> snapshot_notifier_;  // shared copy
+  std::shared_ptr<const Snapshot>& snapshot_;  // reference to caller's pointer
+
+  // Default-constructed here; presumably assigned by operator() -- TODO
+  // confirm against the definition.
+  Status snapshot_creation_status_;
+};
+
 }  // namespace ROCKSDB_NAMESPACE
 #endif  // ROCKSDB_LITE