#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
+#include "logging/logging.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/snapshot.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_util.h"
+#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace ROCKSDB_NAMESPACE {
}
void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
- txn_id_ = GenTxnID();
+ // Range lock manager uses address of transaction object as TXNID
+ const TransactionDBOptions& db_options = txn_db_impl_->GetTxnDBOptions();
+ if (db_options.lock_mgr_handle &&
+ db_options.lock_mgr_handle->getLockManager()->IsRangeLockSupported()) {
+ txn_id_ = reinterpret_cast<TransactionID>(this);
+ } else {
+ txn_id_ = GenTxnID();
+ }
txn_state_ = STARTED;
use_only_the_last_commit_time_batch_for_recovery_ =
txn_options.use_only_the_last_commit_time_batch_for_recovery;
skip_prepare_ = txn_options.skip_prepare;
+
+ read_timestamp_ = kMaxTxnTimestamp;
+ commit_timestamp_ = kMaxTxnTimestamp;
}
PessimisticTransaction::~PessimisticTransaction() {
bool PessimisticTransaction::IsExpired() const {
if (expiration_time_ > 0) {
- if (db_->GetEnv()->NowMicros() >= expiration_time_) {
+ if (dbimpl_->GetSystemClock()->NowMicros() >= expiration_time_) {
// Transaction is expired.
return true;
}
WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
- : PessimisticTransaction(txn_db, write_options, txn_options){};
+ : PessimisticTransaction(txn_db, write_options, txn_options) {}
+
+Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key, std::string* value,
+ bool exclusive, const bool do_validate) {
+ return GetForUpdateImpl(read_options, column_family, key, value, exclusive,
+ do_validate);
+}
+
+Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key,
+ PinnableSlice* pinnable_val,
+ bool exclusive, const bool do_validate) {
+ return GetForUpdateImpl(read_options, column_family, key, pinnable_val,
+ exclusive, do_validate);
+}
+
+template <typename TValue>
+inline Status WriteCommittedTxn::GetForUpdateImpl(
+ const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, TValue* value, bool exclusive, const bool do_validate) {
+ column_family =
+ column_family ? column_family : db_impl_->DefaultColumnFamily();
+ assert(column_family);
+ if (!read_options.timestamp) {
+ const Comparator* const ucmp = column_family->GetComparator();
+ assert(ucmp);
+ size_t ts_sz = ucmp->timestamp_size();
+ if (0 == ts_sz) {
+ return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
+ value, exclusive, do_validate);
+ }
+ } else {
+ Status s = db_impl_->FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (!do_validate) {
+ return Status::InvalidArgument(
+ "If do_validate is false then GetForUpdate with read_timestamp is not "
+ "defined.");
+ } else if (kMaxTxnTimestamp == read_timestamp_) {
+ return Status::InvalidArgument("read_timestamp must be set for validation");
+ }
+
+ if (!read_options.timestamp) {
+ ReadOptions read_opts_copy = read_options;
+ char ts_buf[sizeof(kMaxTxnTimestamp)];
+ EncodeFixed64(ts_buf, read_timestamp_);
+ Slice ts(ts_buf, sizeof(ts_buf));
+ read_opts_copy.timestamp = &ts;
+ return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key,
+ value, exclusive, do_validate);
+ }
+ assert(read_options.timestamp);
+ const char* const ts_buf = read_options.timestamp->data();
+ assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp));
+ TxnTimestamp ts = DecodeFixed64(ts_buf);
+ if (ts != read_timestamp_) {
+ return Status::InvalidArgument("Must read from the same read_timestamp");
+ }
+ return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
+ value, exclusive, do_validate);
+}
+
+Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
+ const Slice& key, const Slice& value,
+ const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, &value, this]() {
+ Status s =
+ GetBatchForWrite()->Put(column_family, key, value);
+ if (s.ok()) {
+ ++num_puts_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
+ const SliceParts& key, const SliceParts& value,
+ const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, &value, this]() {
+ Status s =
+ GetBatchForWrite()->Put(column_family, key, value);
+ if (s.ok()) {
+ ++num_puts_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
+ const Slice& key, const Slice& value) {
+ return Operate(
+ column_family, key, /*do_validate=*/false,
+ /*assume_tracked=*/false, [column_family, &key, &value, this]() {
+ Status s = GetBatchForWrite()->Put(column_family, key, value);
+ if (s.ok()) {
+ ++num_puts_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
+ const SliceParts& key,
+ const SliceParts& value) {
+ return Operate(
+ column_family, key, /*do_validate=*/false,
+ /*assume_tracked=*/false, [column_family, &key, &value, this]() {
+ Status s = GetBatchForWrite()->Put(column_family, key, value);
+ if (s.ok()) {
+ ++num_puts_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
+ const Slice& key, const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, this]() {
+ Status s = GetBatchForWrite()->Delete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
+ const SliceParts& key,
+ const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, this]() {
+ Status s = GetBatchForWrite()->Delete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
+ const Slice& key) {
+ return Operate(column_family, key, /*do_validate=*/false,
+ /*assume_tracked=*/false, [column_family, &key, this]() {
+ Status s = GetBatchForWrite()->Delete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
+ const SliceParts& key) {
+ return Operate(column_family, key, /*do_validate=*/false,
+ /*assume_tracked=*/false, [column_family, &key, this]() {
+ Status s = GetBatchForWrite()->Delete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
+ const Slice& key,
+ const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, this]() {
+ Status s =
+ GetBatchForWrite()->SingleDelete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
+ const SliceParts& key,
+ const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, this]() {
+ Status s =
+ GetBatchForWrite()->SingleDelete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::SingleDeleteUntracked(
+ ColumnFamilyHandle* column_family, const Slice& key) {
+ return Operate(column_family, key, /*do_validate=*/false,
+ /*assume_tracked=*/false, [column_family, &key, this]() {
+ Status s =
+ GetBatchForWrite()->SingleDelete(column_family, key);
+ if (s.ok()) {
+ ++num_deletes_;
+ }
+ return s;
+ });
+}
+
+Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family,
+ const Slice& key, const Slice& value,
+ const bool assume_tracked) {
+ const bool do_validate = !assume_tracked;
+ return Operate(column_family, key, do_validate, assume_tracked,
+ [column_family, &key, &value, this]() {
+ Status s =
+ GetBatchForWrite()->Merge(column_family, key, value);
+ if (s.ok()) {
+ ++num_merges_;
+ }
+ return s;
+ });
+}
+
+template <typename TKey, typename TOperation>
+Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family,
+ const TKey& key, const bool do_validate,
+ const bool assume_tracked,
+ TOperation&& operation) {
+ Status s;
+ if constexpr (std::is_same_v<Slice, TKey>) {
+ s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true,
+ do_validate, assume_tracked);
+ } else if constexpr (std::is_same_v<SliceParts, TKey>) {
+ std::string key_buf;
+ Slice contiguous_key(key, &key_buf);
+ s = TryLock(column_family, contiguous_key, /*read_only=*/false,
+ /*exclusive=*/true, do_validate, assume_tracked);
+ }
+ if (!s.ok()) {
+ return s;
+ }
+ column_family =
+ column_family ? column_family : db_impl_->DefaultColumnFamily();
+ assert(column_family);
+ const Comparator* const ucmp = column_family->GetComparator();
+ assert(ucmp);
+ size_t ts_sz = ucmp->timestamp_size();
+ if (ts_sz > 0) {
+ assert(ts_sz == sizeof(TxnTimestamp));
+ if (!IndexingEnabled()) {
+ cfs_with_ts_tracked_when_indexing_disabled_.insert(
+ column_family->GetID());
+ }
+ }
+ return operation();
+}
+
+Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) {
+ if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) {
+ return Status::InvalidArgument(
+ "Cannot decrease read timestamp for validation");
+ }
+ read_timestamp_ = ts;
+ return Status::OK();
+}
+
+Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) {
+ if (read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) {
+ return Status::InvalidArgument(
+ "Cannot commit at timestamp smaller than or equal to read timestamp");
+ }
+ commit_timestamp_ = ts;
+ return Status::OK();
+}
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
+ if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
+ // CommitBatch() needs to lock the keys in the batch.
+ // However, the application also needs to specify the timestamp for the
+ // keys in batch before calling this API.
+ // This means timestamp order may violate the order of locking, thus
+ // violate the sequence number order for the same user key.
+ // Therefore, we disallow this operation for now.
+ return Status::NotSupported(
+ "Batch to commit includes timestamp assigned before locking");
+ }
+
std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
Status s = LockBatch(batch, keys_to_unlock.get());
}
Status PessimisticTransaction::Prepare() {
-
if (name_.empty()) {
return Status::InvalidArgument(
"Cannot prepare a transaction that has not been named.");
}
Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
+ WriteBatchWithIndex* wbwi = GetWriteBatch();
+ assert(wbwi);
+ WriteBatch* wb = wbwi->GetWriteBatch();
+ assert(wb);
+
+ const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
+ if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
+ return Status::InvalidArgument("Must assign a commit timestamp");
+ }
+
+ if (needs_ts) {
+ assert(commit_timestamp_ != kMaxTxnTimestamp);
+ char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
+ EncodeFixed64(commit_ts_buf, commit_timestamp_);
+ Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
+
+ Status s =
+ wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
+ auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf);
+ if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) {
+ return sizeof(kMaxTxnTimestamp);
+ }
+ const Comparator* ucmp =
+ WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
+ return ucmp ? ucmp->timestamp_size()
+ : std::numeric_limits<uint64_t>::max();
+ });
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
uint64_t seq_used = kMaxSequenceNumber;
- auto s =
- db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(),
- /*callback*/ nullptr, /*log_used*/ nullptr,
- /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
+ SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
+ snapshot_notifier_, snapshot_);
+ PostMemTableCallback* post_mem_cb = nullptr;
+ if (snapshot_needed_) {
+ if (commit_timestamp_ == kMaxTxnTimestamp) {
+ return Status::InvalidArgument("Must set transaction commit timestamp");
+ } else {
+ post_mem_cb = &snapshot_creation_cb;
+ }
+ }
+ auto s = db_impl_->WriteImpl(write_options_, wb,
+ /*callback*/ nullptr, /*log_used*/ nullptr,
+ /*log_ref*/ 0, /*disable_memtable*/ false,
+ &seq_used, /*batch_cnt=*/0,
+ /*pre_release_callback=*/nullptr, post_mem_cb);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
}
Status WriteCommittedTxn::CommitInternal() {
+ WriteBatchWithIndex* wbwi = GetWriteBatch();
+ assert(wbwi);
+ WriteBatch* wb = wbwi->GetWriteBatch();
+ assert(wb);
+
+ const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
+ if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
+ return Status::InvalidArgument("Must assign a commit timestamp");
+ }
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
- auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
- assert(s.ok());
+
+ Status s;
+ if (!needs_ts) {
+ s = WriteBatchInternal::MarkCommit(working_batch, name_);
+ } else {
+ assert(commit_timestamp_ != kMaxTxnTimestamp);
+ char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
+ EncodeFixed64(commit_ts_buf, commit_timestamp_);
+ Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
+ s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_,
+ commit_ts);
+ if (s.ok()) {
+ s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
+ if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) !=
+ cfs_with_ts_tracked_when_indexing_disabled_.end()) {
+ return sizeof(kMaxTxnTimestamp);
+ }
+ const Comparator* ucmp =
+ WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
+ return ucmp ? ucmp->timestamp_size()
+ : std::numeric_limits<uint64_t>::max();
+ });
+ }
+ }
+
+ if (!s.ok()) {
+ return s;
+ }
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values
- s = WriteBatchInternal::Append(working_batch,
- GetWriteBatch()->GetWriteBatch());
+ s = WriteBatchInternal::Append(working_batch, wb);
assert(s.ok());
uint64_t seq_used = kMaxSequenceNumber;
+ SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_,
+ snapshot_notifier_, snapshot_);
+ PostMemTableCallback* post_mem_cb = nullptr;
+ if (snapshot_needed_) {
+ if (commit_timestamp_ == kMaxTxnTimestamp) {
+ s = Status::InvalidArgument("Must set transaction commit timestamp");
+ return s;
+ } else {
+ post_mem_cb = &snapshot_creation_cb;
+ }
+ }
s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
/*log_used*/ nullptr, /*log_ref*/ log_number_,
- /*disable_memtable*/ false, &seq_used);
+ /*disable_memtable*/ false, &seq_used,
+ /*batch_cnt=*/0, /*pre_release_callback=*/nullptr,
+ post_mem_cb);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
// On success, caller should unlock keys_to_unlock
Status PessimisticTransaction::LockBatch(WriteBatch* batch,
LockTracker* keys_to_unlock) {
+ if (!batch) {
+ return Status::InvalidArgument("batch is nullptr");
+ }
+
class Handler : public WriteBatch::Handler {
public:
// Sorted map of column_family_id to sorted set of keys.
}
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
- PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
- bool previously_locked = status.locked;
- bool lock_upgrade = previously_locked && exclusive && !status.exclusive;
+
+ PointLockStatus status;
+ bool lock_upgrade;
+ bool previously_locked;
+ if (tracked_locks_->IsPointLockSupported()) {
+ status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+ previously_locked = status.locked;
+ lock_upgrade = previously_locked && exclusive && !status.exclusive;
+ } else {
+ // If the record is tracked, we can assume it was locked, too.
+ previously_locked = assume_tracked;
+ status.locked = false;
+ lock_upgrade = false;
+ }
// Lock this key if this transactions hasn't already locked it or we require
// an upgrade.
s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
}
+ const ColumnFamilyHandle* const cfh =
+ column_family ? column_family : db_impl_->DefaultColumnFamily();
+ assert(cfh);
+ const Comparator* const ucmp = cfh->GetComparator();
+ assert(ucmp);
+ size_t ts_sz = ucmp->timestamp_size();
+
SetSnapshotIfNeeded();
// Even though we do not care about doing conflict checking for this write,
// some other write. However, we do not need to check if there have been
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
- // future
+ // future.
SequenceNumber tracked_at_seq =
status.locked ? status.seq : kMaxSequenceNumber;
- if (!do_validate || snapshot_ == nullptr) {
- if (assume_tracked && !previously_locked) {
+ if (!do_validate || (snapshot_ == nullptr &&
+ (0 == ts_sz || kMaxTxnTimestamp == read_timestamp_))) {
+ if (assume_tracked && !previously_locked &&
+ tracked_locks_->IsPointLockSupported()) {
s = Status::InvalidArgument(
"assume_tracked is set but it is not tracked yet");
}
// Need to remember the earliest sequence number that we know that this
// key has not been modified after. This is useful if this same
- // transaction
- // later tries to lock this key again.
+ // transaction later tries to lock this key again.
if (tracked_at_seq == kMaxSequenceNumber) {
// Since we haven't checked a snapshot, we only know this key has not
// been modified since after we locked it.
// lock, which would be an unusual sequence.
tracked_at_seq = db_->GetLatestSequenceNumber();
}
- } else {
+ } else if (s.ok()) {
// If a snapshot is set, we need to make sure the key hasn't been modified
// since the snapshot. This must be done after we locked the key.
// If we already have validated an earilier snapshot it must has been
// reflected in tracked_at_seq and ValidateSnapshot will return OK.
- if (s.ok()) {
- s = ValidateSnapshot(column_family, key, &tracked_at_seq);
+ s = ValidateSnapshot(column_family, key, &tracked_at_seq);
- if (!s.ok()) {
- // Failed to validate key
- // Unlock key we just locked
- if (lock_upgrade) {
- s = txn_db_impl_->TryLock(this, cfh_id, key_str,
- false /* exclusive */);
- assert(s.ok());
- } else if (!previously_locked) {
- txn_db_impl_->UnLock(this, cfh_id, key.ToString());
- }
+ if (!s.ok()) {
+ // Failed to validate key
+ // Unlock key we just locked
+ if (lock_upgrade) {
+ s = txn_db_impl_->TryLock(this, cfh_id, key_str, false /* exclusive */);
+ assert(s.ok());
+ } else if (!previously_locked) {
+ txn_db_impl_->UnLock(this, cfh_id, key.ToString());
}
}
}
TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
} else {
#ifndef NDEBUG
- PointLockStatus lock_status =
- tracked_locks_->GetPointLockStatus(cfh_id, key_str);
- assert(lock_status.locked);
- assert(lock_status.seq <= tracked_at_seq);
- assert(lock_status.exclusive == exclusive);
+ if (tracked_locks_->IsPointLockSupported()) {
+ PointLockStatus lock_status =
+ tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+ assert(lock_status.locked);
+ assert(lock_status.seq <= tracked_at_seq);
+ assert(lock_status.exclusive == exclusive);
+ }
#endif
}
}
return s;
}
+Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
+ const Endpoint& start_endp,
+ const Endpoint& end_endp) {
+ ColumnFamilyHandle* cfh =
+ column_family ? column_family : db_impl_->DefaultColumnFamily();
+ uint32_t cfh_id = GetColumnFamilyID(cfh);
+
+ Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp);
+
+ if (s.ok()) {
+ RangeLockRequest req{cfh_id, start_endp, end_endp};
+ tracked_locks_->Track(req);
+ }
+ return s;
+}
+
// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
// tracked_at_seq is the global seq at which we either locked the key or already
Status PessimisticTransaction::ValidateSnapshot(
ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber* tracked_at_seq) {
- assert(snapshot_);
-
- SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
- if (*tracked_at_seq <= snap_seq) {
- // If the key has been previous validated (or locked) at a sequence number
- // earlier than the current snapshot's sequence number, we already know it
- // has not been modified aftter snap_seq either.
- return Status::OK();
+ assert(snapshot_ || read_timestamp_ < kMaxTxnTimestamp);
+
+ SequenceNumber snap_seq = 0;
+ if (snapshot_) {
+ snap_seq = snapshot_->GetSequenceNumber();
+ if (*tracked_at_seq <= snap_seq) {
+ // If the key has been previous validated (or locked) at a sequence number
+ // earlier than the current snapshot's sequence number, we already know it
+ // has not been modified aftter snap_seq either.
+ return Status::OK();
+ }
+ } else {
+ snap_seq = db_impl_->GetLatestSequenceNumber();
}
+
// Otherwise we have either
// 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
// 2: snap_seq < tracked_at_seq: last time we lock the key was via
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
+ assert(cfh);
+ const Comparator* const ucmp = cfh->GetComparator();
+ assert(ucmp);
+ size_t ts_sz = ucmp->timestamp_size();
+ std::string ts_buf;
+ if (ts_sz > 0 && read_timestamp_ < kMaxTxnTimestamp) {
+ assert(ts_sz == sizeof(read_timestamp_));
+ PutFixed64(&ts_buf, read_timestamp_);
+ }
+
return TransactionUtil::CheckKeyForConflicts(
- db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
+ db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf,
+ false /* cache_only */);
}
bool PessimisticTransaction::TryStealingLocks() {