]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/transaction_base.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_base.cc
index 1482dc961d887949d026c1ce716dcff7f3abf391..83fd94ac8557e78e1a42db265585d0ab707bcf36 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "db/column_family.h"
 #include "db/db_impl/db_impl.h"
+#include "logging/logging.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/status.h"
 
 namespace ROCKSDB_NAMESPACE {
 
+Status Transaction::CommitAndTryCreateSnapshot(
+    std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts,
+    std::shared_ptr<const Snapshot>* snapshot) {
+  if (snapshot) {
+    snapshot->reset();
+  }
+  TxnTimestamp commit_ts = GetCommitTimestamp();
+  if (commit_ts == kMaxTxnTimestamp) {
+    if (ts == kMaxTxnTimestamp) {
+      return Status::InvalidArgument("Commit timestamp unset");
+    } else {
+      const Status s = SetCommitTimestamp(ts);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+  } else if (ts != kMaxTxnTimestamp) {
+    if (ts != commit_ts) {
+      // For now we treat this as error.
+      return Status::InvalidArgument("Different commit ts specified");
+    }
+  }
+  SetSnapshotOnNextOperation(notifier);
+  Status s = Commit();
+  if (!s.ok()) {
+    return s;
+  }
+  assert(s.ok());
+  // If we reach here, we must return ok status for this function.
+  std::shared_ptr<const Snapshot> new_snapshot = GetTimestampedSnapshot();
+
+  if (snapshot) {
+    *snapshot = new_snapshot;
+  }
+  return Status::OK();
+}
+
 TransactionBaseImpl::TransactionBaseImpl(
     DB* db, const WriteOptions& write_options,
     const LockTrackerFactory& lock_tracker_factory)
@@ -28,9 +66,12 @@ TransactionBaseImpl::TransactionBaseImpl(
       write_options_(write_options),
       cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
       lock_tracker_factory_(lock_tracker_factory),
-      start_time_(db_->GetEnv()->NowMicros()),
-      write_batch_(cmp_, 0, true, 0),
+      start_time_(dbimpl_->GetSystemClock()->NowMicros()),
+      write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key),
       tracked_locks_(lock_tracker_factory_.Create()),
+      commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */,
+                         write_options.protection_bytes_per_key,
+                         0 /* default_cf_ts_sz */),
       indexing_enabled_(true) {
   assert(dynamic_cast<DBImpl*>(db_) != nullptr);
   log_number_ = 0;
@@ -67,9 +108,15 @@ void TransactionBaseImpl::Reinitialize(DB* db,
   name_.clear();
   log_number_ = 0;
   write_options_ = write_options;
-  start_time_ = db_->GetEnv()->NowMicros();
+  start_time_ = dbimpl_->GetSystemClock()->NowMicros();
   indexing_enabled_ = true;
   cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
+  WriteBatchInternal::UpdateProtectionInfo(
+      write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key)
+      .PermitUncheckedError();
+  WriteBatchInternal::UpdateProtectionInfo(
+      &commit_time_batch_, write_options_.protection_bytes_per_key)
+      .PermitUncheckedError();
 }
 
 void TransactionBaseImpl::SetSnapshot() {
@@ -124,7 +171,9 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
 
 void TransactionBaseImpl::SetSavePoint() {
   if (save_points_ == nullptr) {
-    save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
+    save_points_.reset(
+        new std::stack<TransactionBaseImpl::SavePoint,
+                       autovector<TransactionBaseImpl::SavePoint>>());
   }
   save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
                         num_puts_, num_deletes_, num_merges_,
@@ -160,8 +209,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
 }
 
 Status TransactionBaseImpl::PopSavePoint() {
-  if (save_points_ == nullptr ||
-      save_points_->empty()) {
+  if (save_points_ == nullptr || save_points_->empty()) {
     // No SavePoint yet.
     assert(write_batch_.PopSavePoint().IsNotFound());
     return Status::NotFound();
@@ -306,7 +354,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
   Iterator* db_iter = db_->NewIterator(read_options);
   assert(db_iter);
 
-  return write_batch_.NewIteratorWithBase(db_iter);
+  return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
+                                          &read_options);
 }
 
 Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
@@ -540,7 +589,7 @@ WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
 }
 
 uint64_t TransactionBaseImpl::GetElapsedTime() const {
-  return (db_->GetEnv()->NowMicros() - start_time_) / 1000;
+  return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
 }
 
 uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
@@ -661,6 +710,10 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
       return Status::InvalidArgument();
     }
 
+    Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
+      return Status::InvalidArgument();
+    }
+
     Status MarkRollback(const Slice&) override {
       return Status::InvalidArgument();
     }