PessimisticTransaction::PessimisticTransaction(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options, const bool init)
- : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
+ : TransactionBaseImpl(
+ txn_db->GetRootDB(), write_options,
+ static_cast_with_check<PessimisticTransactionDB>(txn_db)
+ ->GetLockTrackerFactory()),
txn_db_impl_(nullptr),
expiration_time_(0),
txn_id_(0),
deadlock_detect_(false),
deadlock_detect_depth_(0),
skip_concurrency_control_(false) {
- txn_db_impl_ =
- static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
- db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
+ txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB>(txn_db);
+ db_impl_ = static_cast_with_check<DBImpl>(db_);
if (init) {
Initialize(txn_options);
}
}
use_only_the_last_commit_time_batch_for_recovery_ =
txn_options.use_only_the_last_commit_time_batch_for_recovery;
+ skip_prepare_ = txn_options.skip_prepare;
}
PessimisticTransaction::~PessimisticTransaction() {
- txn_db_impl_->UnLock(this, &GetTrackedKeys());
+ txn_db_impl_->UnLock(this, *tracked_locks_);
if (expiration_time_ > 0) {
txn_db_impl_->RemoveExpirableTransaction(txn_id_);
}
- if (!name_.empty() && txn_state_ != COMMITED) {
+ if (!name_.empty() && txn_state_ != COMMITTED) {
txn_db_impl_->UnregisterTransaction(this);
}
}
void PessimisticTransaction::Clear() {
- txn_db_impl_->UnLock(this, &GetTrackedKeys());
+ txn_db_impl_->UnLock(this, *tracked_locks_);
TransactionBaseImpl::Clear();
}
void PessimisticTransaction::Reinitialize(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
- if (!name_.empty() && txn_state_ != COMMITED) {
+ if (!name_.empty() && txn_state_ != COMMITTED) {
txn_db_impl_->UnregisterTransaction(this);
}
TransactionBaseImpl::Reinitialize(txn_db->GetRootDB(), write_options);
: PessimisticTransaction(txn_db, write_options, txn_options){};
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
- TransactionKeyMap keys_to_unlock;
- Status s = LockBatch(batch, &keys_to_unlock);
+ std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
+ Status s = LockBatch(batch, keys_to_unlock.get());
if (!s.ok()) {
return s;
txn_state_.store(AWAITING_COMMIT);
s = CommitBatchInternal(batch);
if (s.ok()) {
- txn_state_.store(COMMITED);
+ txn_state_.store(COMMITTED);
}
} else if (txn_state_ == LOCKS_STOLEN) {
s = Status::Expired();
s = Status::InvalidArgument("Transaction is not in state for commit.");
}
- txn_db_impl_->UnLock(this, &keys_to_unlock);
+ txn_db_impl_->UnLock(this, *keys_to_unlock);
return s;
}
Status PessimisticTransaction::Prepare() {
- Status s;
if (name_.empty()) {
return Status::InvalidArgument(
return Status::Expired();
}
+ Status s;
bool can_prepare = false;
if (expiration_time_ > 0) {
AWAITING_PREPARE);
} else if (txn_state_ == STARTED) {
// expiration and lock stealing is not possible
+ txn_state_.store(AWAITING_PREPARE);
can_prepare = true;
}
if (can_prepare) {
- txn_state_.store(AWAITING_PREPARE);
// transaction can't expire after preparation
expiration_time_ = 0;
assert(log_number_ == 0 ||
s = Status::Expired();
} else if (txn_state_ == PREPARED) {
s = Status::InvalidArgument("Transaction has already been prepared.");
- } else if (txn_state_ == COMMITED) {
+ } else if (txn_state_ == COMMITTED) {
s = Status::InvalidArgument("Transaction has already been committed.");
} else if (txn_state_ == ROLLEDBACK) {
s = Status::InvalidArgument("Transaction has already been rolledback.");
Status WriteCommittedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
- WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
+ auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
+ name_);
+ assert(s.ok());
class MarkLogCallback : public PreReleaseCallback {
public:
MarkLogCallback(DBImpl* db, bool two_write_queues)
const bool kDisableMemtable = true;
SequenceNumber* const KIgnoreSeqUsed = nullptr;
const size_t kNoBatchCount = 0;
- Status s = db_impl_->WriteImpl(
- write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
- &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
- &mark_log_callback);
+ s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
+ kNoWriteCallback, &log_number_, kRefNoLog,
+ kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
+ &mark_log_callback);
return s;
}
Status PessimisticTransaction::Commit() {
- Status s;
bool commit_without_prepare = false;
bool commit_prepared = false;
commit_prepared = true;
} else if (txn_state_ == STARTED) {
// expiration and lock stealing is not a concern
- commit_without_prepare = true;
- // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
- // add an option so that the user explictly express the intention of
- // skipping the prepare phase.
+ if (skip_prepare_) {
+ commit_without_prepare = true;
+ } else {
+ return Status::TxnNotPrepared();
+ }
}
+ Status s;
if (commit_without_prepare) {
assert(!commit_prepared);
if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
}
Clear();
if (s.ok()) {
- txn_state_.store(COMMITED);
+ txn_state_.store(COMMITTED);
}
}
} else if (commit_prepared) {
txn_db_impl_->UnregisterTransaction(this);
Clear();
- txn_state_.store(COMMITED);
+ txn_state_.store(COMMITTED);
} else if (txn_state_ == LOCKS_STOLEN) {
s = Status::Expired();
- } else if (txn_state_ == COMMITED) {
+ } else if (txn_state_ == COMMITTED) {
s = Status::InvalidArgument("Transaction has already been committed.");
} else if (txn_state_ == ROLLEDBACK) {
s = Status::InvalidArgument("Transaction has already been rolledback.");
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
- WriteBatchInternal::MarkCommit(working_batch, name_);
+ auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
+ assert(s.ok());
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values
- WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
+ s = WriteBatchInternal::Append(working_batch,
+ GetWriteBatch()->GetWriteBatch());
+ assert(s.ok());
uint64_t seq_used = kMaxSequenceNumber;
- auto s =
- db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
+ s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
/*log_used*/ nullptr, /*log_ref*/ log_number_,
- /*disable_memtable*/ false, &seq_used);
+ /*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (s.ok()) {
SetId(seq_used);
}
// prepare couldn't have taken place
Clear();
- } else if (txn_state_ == COMMITED) {
+ } else if (txn_state_ == COMMITTED) {
s = Status::InvalidArgument("This transaction has already been committed.");
} else {
s = Status::InvalidArgument(
Status WriteCommittedTxn::RollbackInternal() {
WriteBatch rollback_marker;
- WriteBatchInternal::MarkRollback(&rollback_marker, name_);
- auto s = db_impl_->WriteImpl(write_options_, &rollback_marker);
+ auto s = WriteBatchInternal::MarkRollback(&rollback_marker, name_);
+ assert(s.ok());
+ s = db_impl_->WriteImpl(write_options_, &rollback_marker);
return s;
}
return Status::InvalidArgument("Transaction is beyond state for rollback.");
}
- // Unlock any keys locked since last transaction
- const std::unique_ptr<TransactionKeyMap>& keys =
- GetTrackedKeysSinceSavePoint();
-
- if (keys) {
- txn_db_impl_->UnLock(this, keys.get());
+ if (save_points_ != nullptr && !save_points_->empty()) {
+ // Unlock any keys locked since last transaction
+ auto& save_point_tracker = *save_points_->top().new_locks_;
+ std::unique_ptr<LockTracker> t(
+ tracked_locks_->GetTrackedLocksSinceSavePoint(save_point_tracker));
+ if (t) {
+ txn_db_impl_->UnLock(this, *t);
+ }
}
return TransactionBaseImpl::RollbackToSavePoint();
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock
Status PessimisticTransaction::LockBatch(WriteBatch* batch,
- TransactionKeyMap* keys_to_unlock) {
+ LockTracker* keys_to_unlock) {
class Handler : public WriteBatch::Handler {
public:
// Sorted map of column_family_id to sorted set of keys.
// Iterating on this handler will add all keys in this batch into keys
Handler handler;
- batch->Iterate(&handler);
-
- Status s;
+ Status s = batch->Iterate(&handler);
+ if (!s.ok()) {
+ return s;
+ }
// Attempt to lock all keys
for (const auto& cf_iter : handler.keys_) {
if (!s.ok()) {
break;
}
- TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
- false, true /* exclusive */);
+ PointLockRequest r;
+ r.column_family_id = cfh_id;
+ r.key = key;
+ r.seq = kMaxSequenceNumber;
+ r.read_only = false;
+ r.exclusive = true;
+ keys_to_unlock->Track(r);
}
if (!s.ok()) {
}
if (!s.ok()) {
- txn_db_impl_->UnLock(this, keys_to_unlock);
+ txn_db_impl_->UnLock(this, *keys_to_unlock);
}
return s;
}
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
- bool previously_locked;
- bool lock_upgrade = false;
-
- // lock this key if this transactions hasn't already locked it
- SequenceNumber tracked_at_seq = kMaxSequenceNumber;
-
- const auto& tracked_keys = GetTrackedKeys();
- const auto tracked_keys_cf = tracked_keys.find(cfh_id);
- if (tracked_keys_cf == tracked_keys.end()) {
- previously_locked = false;
- } else {
- auto iter = tracked_keys_cf->second.find(key_str);
- if (iter == tracked_keys_cf->second.end()) {
- previously_locked = false;
- } else {
- if (!iter->second.exclusive && exclusive) {
- lock_upgrade = true;
- }
- previously_locked = true;
- tracked_at_seq = iter->second.seq;
- }
- }
+ PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+ bool previously_locked = status.locked;
+ bool lock_upgrade = previously_locked && exclusive && !status.exclusive;
// Lock this key if this transactions hasn't already locked it or we require
// an upgrade.
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
+ SequenceNumber tracked_at_seq =
+ status.locked ? status.seq : kMaxSequenceNumber;
if (!do_validate || snapshot_ == nullptr) {
if (assume_tracked && !previously_locked) {
s = Status::InvalidArgument(
if (!s.ok()) {
// Failed to validate key
- if (!previously_locked) {
- // Unlock key we just locked
- if (lock_upgrade) {
- s = txn_db_impl_->TryLock(this, cfh_id, key_str,
- false /* exclusive */);
- assert(s.ok());
- } else {
- txn_db_impl_->UnLock(this, cfh_id, key.ToString());
- }
+ // Unlock key we just locked
+ if (lock_upgrade) {
+ s = txn_db_impl_->TryLock(this, cfh_id, key_str,
+ false /* exclusive */);
+ assert(s.ok());
+ } else if (!previously_locked) {
+ txn_db_impl_->UnLock(this, cfh_id, key.ToString());
}
}
}
TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
} else {
#ifndef NDEBUG
- assert(tracked_keys_cf->second.count(key_str) > 0);
- const auto& info = tracked_keys_cf->second.find(key_str)->second;
- assert(info.seq <= tracked_at_seq);
- assert(info.exclusive == exclusive);
+ PointLockStatus lock_status =
+ tracked_locks_->GetPointLockStatus(cfh_id, key_str);
+ assert(lock_status.locked);
+ assert(lock_status.seq <= tracked_at_seq);
+ assert(lock_status.exclusive == exclusive);
#endif
}
}