]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction.cc
index 8c71c1de790c0f13256f16f0b59a6bd28af29567..cb8fd3bb61cf5aea9ded13dd9b2bf8782a800a14 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "db/column_family.h"
 #include "db/db_impl/db_impl.h"
+#include "logging/logging.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/snapshot.h"
@@ -24,6 +25,7 @@
 #include "util/string_util.h"
 #include "utilities/transactions/pessimistic_transaction_db.h"
 #include "utilities/transactions/transaction_util.h"
+#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -59,7 +61,14 @@ PessimisticTransaction::PessimisticTransaction(
 }
 
 void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
-  txn_id_ = GenTxnID();
+  // Range lock manager uses address of transaction object as TXNID
+  const TransactionDBOptions& db_options = txn_db_impl_->GetTxnDBOptions();
+  if (db_options.lock_mgr_handle &&
+      db_options.lock_mgr_handle->getLockManager()->IsRangeLockSupported()) {
+    txn_id_ = reinterpret_cast<TransactionID>(this);
+  } else {
+    txn_id_ = GenTxnID();
+  }
 
   txn_state_ = STARTED;
 
@@ -91,6 +100,9 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
   use_only_the_last_commit_time_batch_for_recovery_ =
       txn_options.use_only_the_last_commit_time_batch_for_recovery;
   skip_prepare_ = txn_options.skip_prepare;
+
+  read_timestamp_ = kMaxTxnTimestamp;
+  commit_timestamp_ = kMaxTxnTimestamp;
 }
 
 PessimisticTransaction::~PessimisticTransaction() {
@@ -120,7 +132,7 @@ void PessimisticTransaction::Reinitialize(
 
 bool PessimisticTransaction::IsExpired() const {
   if (expiration_time_ > 0) {
-    if (db_->GetEnv()->NowMicros() >= expiration_time_) {
+    if (dbimpl_->GetSystemClock()->NowMicros() >= expiration_time_) {
       // Transaction is expired.
       return true;
     }
@@ -132,9 +144,306 @@ bool PessimisticTransaction::IsExpired() const {
 WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
                                      const WriteOptions& write_options,
                                      const TransactionOptions& txn_options)
-    : PessimisticTransaction(txn_db, write_options, txn_options){};
+    : PessimisticTransaction(txn_db, write_options, txn_options) {}
+
+Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
+                                       ColumnFamilyHandle* column_family,
+                                       const Slice& key, std::string* value,
+                                       bool exclusive, const bool do_validate) {
+  return GetForUpdateImpl(read_options, column_family, key, value, exclusive,
+                          do_validate);
+}
+
+Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
+                                       ColumnFamilyHandle* column_family,
+                                       const Slice& key,
+                                       PinnableSlice* pinnable_val,
+                                       bool exclusive, const bool do_validate) {
+  return GetForUpdateImpl(read_options, column_family, key, pinnable_val,
+                          exclusive, do_validate);
+}
+
+template <typename TValue>
+inline Status WriteCommittedTxn::GetForUpdateImpl(
+    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+    const Slice& key, TValue* value, bool exclusive, const bool do_validate) {
+  column_family =
+      column_family ? column_family : db_impl_->DefaultColumnFamily();
+  assert(column_family);
+  if (!read_options.timestamp) {
+    const Comparator* const ucmp = column_family->GetComparator();
+    assert(ucmp);
+    size_t ts_sz = ucmp->timestamp_size();
+    if (0 == ts_sz) {
+      return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
+                                               value, exclusive, do_validate);
+    }
+  } else {
+    Status s = db_impl_->FailIfTsMismatchCf(
+        column_family, *(read_options.timestamp), /*ts_for_read=*/true);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  if (!do_validate) {
+    return Status::InvalidArgument(
+        "If do_validate is false then GetForUpdate with read_timestamp is not "
+        "defined.");
+  } else if (kMaxTxnTimestamp == read_timestamp_) {
+    return Status::InvalidArgument("read_timestamp must be set for validation");
+  }
+
+  if (!read_options.timestamp) {
+    ReadOptions read_opts_copy = read_options;
+    char ts_buf[sizeof(kMaxTxnTimestamp)];
+    EncodeFixed64(ts_buf, read_timestamp_);
+    Slice ts(ts_buf, sizeof(ts_buf));
+    read_opts_copy.timestamp = &ts;
+    return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key,
+                                             value, exclusive, do_validate);
+  }
+  assert(read_options.timestamp);
+  const char* const ts_buf = read_options.timestamp->data();
+  assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp));
+  TxnTimestamp ts = DecodeFixed64(ts_buf);
+  if (ts != read_timestamp_) {
+    return Status::InvalidArgument("Must read from the same read_timestamp");
+  }
+  return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
+                                           value, exclusive, do_validate);
+}
+
+Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
+                              const Slice& key, const Slice& value,
+                              const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, &value, this]() {
+                   Status s =
+                       GetBatchForWrite()->Put(column_family, key, value);
+                   if (s.ok()) {
+                     ++num_puts_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
+                              const SliceParts& key, const SliceParts& value,
+                              const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, &value, this]() {
+                   Status s =
+                       GetBatchForWrite()->Put(column_family, key, value);
+                   if (s.ok()) {
+                     ++num_puts_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
+                                       const Slice& key, const Slice& value) {
+  return Operate(
+      column_family, key, /*do_validate=*/false,
+      /*assume_tracked=*/false, [column_family, &key, &value, this]() {
+        Status s = GetBatchForWrite()->Put(column_family, key, value);
+        if (s.ok()) {
+          ++num_puts_;
+        }
+        return s;
+      });
+}
+
+Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
+                                       const SliceParts& key,
+                                       const SliceParts& value) {
+  return Operate(
+      column_family, key, /*do_validate=*/false,
+      /*assume_tracked=*/false, [column_family, &key, &value, this]() {
+        Status s = GetBatchForWrite()->Put(column_family, key, value);
+        if (s.ok()) {
+          ++num_puts_;
+        }
+        return s;
+      });
+}
+
+Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
+                                 const Slice& key, const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, this]() {
+                   Status s = GetBatchForWrite()->Delete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
+                                 const SliceParts& key,
+                                 const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, this]() {
+                   Status s = GetBatchForWrite()->Delete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
+                                          const Slice& key) {
+  return Operate(column_family, key, /*do_validate=*/false,
+                 /*assume_tracked=*/false, [column_family, &key, this]() {
+                   Status s = GetBatchForWrite()->Delete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
+                                          const SliceParts& key) {
+  return Operate(column_family, key, /*do_validate=*/false,
+                 /*assume_tracked=*/false, [column_family, &key, this]() {
+                   Status s = GetBatchForWrite()->Delete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
+                                       const Slice& key,
+                                       const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, this]() {
+                   Status s =
+                       GetBatchForWrite()->SingleDelete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
+                                       const SliceParts& key,
+                                       const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, this]() {
+                   Status s =
+                       GetBatchForWrite()->SingleDelete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::SingleDeleteUntracked(
+    ColumnFamilyHandle* column_family, const Slice& key) {
+  return Operate(column_family, key, /*do_validate=*/false,
+                 /*assume_tracked=*/false, [column_family, &key, this]() {
+                   Status s =
+                       GetBatchForWrite()->SingleDelete(column_family, key);
+                   if (s.ok()) {
+                     ++num_deletes_;
+                   }
+                   return s;
+                 });
+}
+
+Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family,
+                                const Slice& key, const Slice& value,
+                                const bool assume_tracked) {
+  const bool do_validate = !assume_tracked;
+  return Operate(column_family, key, do_validate, assume_tracked,
+                 [column_family, &key, &value, this]() {
+                   Status s =
+                       GetBatchForWrite()->Merge(column_family, key, value);
+                   if (s.ok()) {
+                     ++num_merges_;
+                   }
+                   return s;
+                 });
+}
+
+template <typename TKey, typename TOperation>
+Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family,
+                                  const TKey& key, const bool do_validate,
+                                  const bool assume_tracked,
+                                  TOperation&& operation) {
+  Status s;
+  if constexpr (std::is_same_v<Slice, TKey>) {
+    s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true,
+                do_validate, assume_tracked);
+  } else if constexpr (std::is_same_v<SliceParts, TKey>) {
+    std::string key_buf;
+    Slice contiguous_key(key, &key_buf);
+    s = TryLock(column_family, contiguous_key, /*read_only=*/false,
+                /*exclusive=*/true, do_validate, assume_tracked);
+  }
+  if (!s.ok()) {
+    return s;
+  }
+  column_family =
+      column_family ? column_family : db_impl_->DefaultColumnFamily();
+  assert(column_family);
+  const Comparator* const ucmp = column_family->GetComparator();
+  assert(ucmp);
+  size_t ts_sz = ucmp->timestamp_size();
+  if (ts_sz > 0) {
+    assert(ts_sz == sizeof(TxnTimestamp));
+    if (!IndexingEnabled()) {
+      cfs_with_ts_tracked_when_indexing_disabled_.insert(
+          column_family->GetID());
+    }
+  }
+  return operation();
+}
+
+Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) {
+  if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) {
+    return Status::InvalidArgument(
+        "Cannot decrease read timestamp for validation");
+  }
+  read_timestamp_ = ts;
+  return Status::OK();
+}
+
+Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) {
+  if (read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) {
+    return Status::InvalidArgument(
+        "Cannot commit at timestamp smaller than or equal to read timestamp");
+  }
+  commit_timestamp_ = ts;
+  return Status::OK();
+}
 
 Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
+  if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
+    // CommitBatch() needs to lock the keys in the batch.
+    // However, the application also needs to specify the timestamp for the
+    // keys in batch before calling this API.
+    // This means timestamp order may violate the order of locking, thus
+    // violate the sequence number order for the same user key.
+    // Therefore, we disallow this operation for now.
+    return Status::NotSupported(
+        "Batch to commit includes timestamp assigned before locking");
+  }
+
   std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
   Status s = LockBatch(batch, keys_to_unlock.get());
 
@@ -173,7 +482,6 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
 }
 
 Status PessimisticTransaction::Prepare() {
-
   if (name_.empty()) {
     return Status::InvalidArgument(
         "Cannot prepare a transaction that has not been named.");
@@ -351,11 +659,54 @@ Status PessimisticTransaction::Commit() {
 }
 
 Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
+  WriteBatchWithIndex* wbwi = GetWriteBatch();
+  assert(wbwi);
+  WriteBatch* wb = wbwi->GetWriteBatch();
+  assert(wb);
+
+  const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
+  if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
+    return Status::InvalidArgument("Must assign a commit timestamp");
+  }
+
+  if (needs_ts) {
+    assert(commit_timestamp_ != kMaxTxnTimestamp);
+    char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
+    EncodeFixed64(commit_ts_buf, commit_timestamp_);
+    Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
+
+    Status s =
+        wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
+          auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf);
+          if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) {
+            return sizeof(kMaxTxnTimestamp);
+          }
+          const Comparator* ucmp =
+              WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
+          return ucmp ? ucmp->timestamp_size()
+                      : std::numeric_limits<uint64_t>::max();
+        });
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
   uint64_t seq_used = kMaxSequenceNumber;
-  auto s =
-      db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
-                          /*callback*/ nullptr, /*log_used*/ nullptr,
-                          /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
+  SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
+                                                snapshot_notifier_, snapshot_);
+  PostMemTableCallback* post_mem_cb = nullptr;
+  if (snapshot_needed_) {
+    if (commit_timestamp_ == kMaxTxnTimestamp) {
+      return Status::InvalidArgument("Must set transaction commit timestamp");
+    } else {
+      post_mem_cb = &snapshot_creation_cb;
+    }
+  }
+  auto s = db_impl_->WriteImpl(write_options_, wb,
+                               /*callback*/ nullptr, /*log_used*/ nullptr,
+                               /*log_ref*/ 0, /*disable_memtable*/ false,
+                               &seq_used, /*batch_cnt=*/0,
+                               /*pre_release_callback=*/nullptr, post_mem_cb);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (s.ok()) {
     SetId(seq_used);
@@ -376,11 +727,46 @@ Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
 }
 
 Status WriteCommittedTxn::CommitInternal() {
+  WriteBatchWithIndex* wbwi = GetWriteBatch();
+  assert(wbwi);
+  WriteBatch* wb = wbwi->GetWriteBatch();
+  assert(wb);
+
+  const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
+  if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
+    return Status::InvalidArgument("Must assign a commit timestamp");
+  }
   // We take the commit-time batch and append the Commit marker.
   // The Memtable will ignore the Commit marker in non-recovery mode
   WriteBatch* working_batch = GetCommitTimeWriteBatch();
-  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
-  assert(s.ok());
+
+  Status s;
+  if (!needs_ts) {
+    s = WriteBatchInternal::MarkCommit(working_batch, name_);
+  } else {
+    assert(commit_timestamp_ != kMaxTxnTimestamp);
+    char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
+    EncodeFixed64(commit_ts_buf, commit_timestamp_);
+    Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
+    s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_,
+                                                    commit_ts);
+    if (s.ok()) {
+      s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
+        if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) !=
+            cfs_with_ts_tracked_when_indexing_disabled_.end()) {
+          return sizeof(kMaxTxnTimestamp);
+        }
+        const Comparator* ucmp =
+            WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
+        return ucmp ? ucmp->timestamp_size()
+                    : std::numeric_limits<uint64_t>::max();
+      });
+    }
+  }
+
+  if (!s.ok()) {
+    return s;
+  }
 
   // any operations appended to this working_batch will be ignored from WAL
   working_batch->MarkWalTerminationPoint();
@@ -388,14 +774,26 @@ Status WriteCommittedTxn::CommitInternal() {
   // insert prepared batch into Memtable only skipping WAL.
   // Memtable will ignore BeginPrepare/EndPrepare markers
   // in non recovery mode and simply insert the values
-  s = WriteBatchInternal::Append(working_batch,
-                                 GetWriteBatch()->GetWriteBatch());
+  s = WriteBatchInternal::Append(working_batch, wb);
   assert(s.ok());
 
   uint64_t seq_used = kMaxSequenceNumber;
+  SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
+                                                snapshot_notifier_, snapshot_);
+  PostMemTableCallback* post_mem_cb = nullptr;
+  if (snapshot_needed_) {
+    if (commit_timestamp_ == kMaxTxnTimestamp) {
+      s = Status::InvalidArgument("Must set transaction commit timestamp");
+      return s;
+    } else {
+      post_mem_cb = &snapshot_creation_cb;
+    }
+  }
   s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
                           /*log_used*/ nullptr, /*log_ref*/ log_number_,
-                          /*disable_memtable*/ false, &seq_used);
+                          /*disable_memtable*/ false, &seq_used,
+                          /*batch_cnt=*/0, /*pre_release_callback=*/nullptr,
+                          post_mem_cb);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (s.ok()) {
     SetId(seq_used);
@@ -471,6 +869,10 @@ Status PessimisticTransaction::RollbackToSavePoint() {
 // On success, caller should unlock keys_to_unlock
 Status PessimisticTransaction::LockBatch(WriteBatch* batch,
                                          LockTracker* keys_to_unlock) {
+  if (!batch) {
+    return Status::InvalidArgument("batch is nullptr");
+  }
+
   class Handler : public WriteBatch::Handler {
    public:
     // Sorted map of column_family_id to sorted set of keys.
@@ -564,9 +966,20 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
   }
   uint32_t cfh_id = GetColumnFamilyID(column_family);
   std::string key_str = key.ToString();
-  PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
-  bool previously_locked = status.locked;
-  bool lock_upgrade = previously_locked && exclusive && !status.exclusive;
+
+  PointLockStatus status;
+  bool lock_upgrade;
+  bool previously_locked;
+  if (tracked_locks_->IsPointLockSupported()) {
+    status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+    previously_locked = status.locked;
+    lock_upgrade = previously_locked && exclusive && !status.exclusive;
+  } else {
+    // If the record is tracked, we can assume it was locked, too.
+    previously_locked = assume_tracked;
+    status.locked = false;
+    lock_upgrade = false;
+  }
 
   // Lock this key if this transactions hasn't already locked it or we require
   // an upgrade.
@@ -574,6 +987,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
     s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
   }
 
+  const ColumnFamilyHandle* const cfh =
+      column_family ? column_family : db_impl_->DefaultColumnFamily();
+  assert(cfh);
+  const Comparator* const ucmp = cfh->GetComparator();
+  assert(ucmp);
+  size_t ts_sz = ucmp->timestamp_size();
+
   SetSnapshotIfNeeded();
 
   // Even though we do not care about doing conflict checking for this write,
@@ -581,18 +1001,19 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
   // some other write.  However, we do not need to check if there have been
   // any writes since this transaction's snapshot.
   // TODO(agiardullo): could optimize by supporting shared txn locks in the
-  // future
+  // future.
   SequenceNumber tracked_at_seq =
       status.locked ? status.seq : kMaxSequenceNumber;
-  if (!do_validate || snapshot_ == nullptr) {
-    if (assume_tracked && !previously_locked) {
+  if (!do_validate || (snapshot_ == nullptr &&
+                       (0 == ts_sz || kMaxTxnTimestamp == read_timestamp_))) {
+    if (assume_tracked && !previously_locked &&
+        tracked_locks_->IsPointLockSupported()) {
       s = Status::InvalidArgument(
           "assume_tracked is set but it is not tracked yet");
     }
     // Need to remember the earliest sequence number that we know that this
     // key has not been modified after.  This is useful if this same
-    // transaction
-    // later tries to lock this key again.
+    // transaction later tries to lock this key again.
     if (tracked_at_seq == kMaxSequenceNumber) {
       // Since we haven't checked a snapshot, we only know this key has not
       // been modified since after we locked it.
@@ -603,24 +1024,21 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
       // lock, which would be an unusual sequence.
       tracked_at_seq = db_->GetLatestSequenceNumber();
     }
-  } else {
+  } else if (s.ok()) {
     // If a snapshot is set, we need to make sure the key hasn't been modified
     // since the snapshot.  This must be done after we locked the key.
     // If we already have validated an earilier snapshot it must has been
     // reflected in tracked_at_seq and ValidateSnapshot will return OK.
-    if (s.ok()) {
-      s = ValidateSnapshot(column_family, key, &tracked_at_seq);
+    s = ValidateSnapshot(column_family, key, &tracked_at_seq);
 
-      if (!s.ok()) {
-        // Failed to validate key
-        // Unlock key we just locked
-        if (lock_upgrade) {
-          s = txn_db_impl_->TryLock(this, cfh_id, key_str,
-                                    false /* exclusive */);
-          assert(s.ok());
-        } else if (!previously_locked) {
-          txn_db_impl_->UnLock(this, cfh_id, key.ToString());
-        }
+    if (!s.ok()) {
+      // Failed to validate key
+      // Unlock key we just locked
+      if (lock_upgrade) {
+        s = txn_db_impl_->TryLock(this, cfh_id, key_str, false /* exclusive */);
+        assert(s.ok());
+      } else if (!previously_locked) {
+        txn_db_impl_->UnLock(this, cfh_id, key.ToString());
       }
     }
   }
@@ -642,11 +1060,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
       TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
     } else {
 #ifndef NDEBUG
-      PointLockStatus lock_status =
-          tracked_locks_->GetPointLockStatus(cfh_id, key_str);
-      assert(lock_status.locked);
-      assert(lock_status.seq <= tracked_at_seq);
-      assert(lock_status.exclusive == exclusive);
+      if (tracked_locks_->IsPointLockSupported()) {
+        PointLockStatus lock_status =
+            tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+        assert(lock_status.locked);
+        assert(lock_status.seq <= tracked_at_seq);
+        assert(lock_status.exclusive == exclusive);
+      }
 #endif
     }
   }
@@ -654,6 +1074,22 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
   return s;
 }
 
+Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
+                                            const Endpoint& start_endp,
+                                            const Endpoint& end_endp) {
+  ColumnFamilyHandle* cfh =
+      column_family ? column_family : db_impl_->DefaultColumnFamily();
+  uint32_t cfh_id = GetColumnFamilyID(cfh);
+
+  Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp);
+
+  if (s.ok()) {
+    RangeLockRequest req{cfh_id, start_endp, end_endp};
+    tracked_locks_->Track(req);
+  }
+  return s;
+}
+
 // Return OK() if this key has not been modified more recently than the
 // transaction snapshot_.
 // tracked_at_seq is the global seq at which we either locked the key or already
@@ -661,15 +1097,21 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
 Status PessimisticTransaction::ValidateSnapshot(
     ColumnFamilyHandle* column_family, const Slice& key,
     SequenceNumber* tracked_at_seq) {
-  assert(snapshot_);
-
-  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
-  if (*tracked_at_seq <= snap_seq) {
-    // If the key has been previous validated (or locked) at a sequence number
-    // earlier than the current snapshot's sequence number, we already know it
-    // has not been modified aftter snap_seq either.
-    return Status::OK();
+  assert(snapshot_ || read_timestamp_ < kMaxTxnTimestamp);
+
+  SequenceNumber snap_seq = 0;
+  if (snapshot_) {
+    snap_seq = snapshot_->GetSequenceNumber();
+    if (*tracked_at_seq <= snap_seq) {
+      // If the key has been previous validated (or locked) at a sequence number
+      // earlier than the current snapshot's sequence number, we already know it
+      // has not been modified aftter snap_seq either.
+      return Status::OK();
+    }
+  } else {
+    snap_seq = db_impl_->GetLatestSequenceNumber();
   }
+
   // Otherwise we have either
   // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
   // 2: snap_seq < tracked_at_seq: last time we lock the key was via
@@ -681,8 +1123,19 @@ Status PessimisticTransaction::ValidateSnapshot(
   ColumnFamilyHandle* cfh =
       column_family ? column_family : db_impl_->DefaultColumnFamily();
 
+  assert(cfh);
+  const Comparator* const ucmp = cfh->GetComparator();
+  assert(ucmp);
+  size_t ts_sz = ucmp->timestamp_size();
+  std::string ts_buf;
+  if (ts_sz > 0 && read_timestamp_ < kMaxTxnTimestamp) {
+    assert(ts_sz == sizeof(read_timestamp_));
+    PutFixed64(&ts_buf, read_timestamp_);
+  }
+
   return TransactionUtil::CheckKeyForConflicts(
-      db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
+      db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf,
+      false /* cache_only */);
 }
 
 bool PessimisticTransaction::TryStealingLocks() {