// Constructs a WritePrepared transaction over `txn_db`.
// Passes `false` to the base constructor so that PessimisticTransaction
// skips its own Initialize; the call is re-issued below once this object's
// vtable is in place, so the WritePreparedTxn override actually runs.
WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside PessimisticTransaction constructor otherwise it
  // would skip overridden functions in WritePreparedTxn since they are not
  // defined yet in the constructor of PessimisticTransaction
  Initialize(txn_options);
}
void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
PessimisticTransaction::Initialize(txn_options);
auto snapshot = read_options.snapshot;
auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
- SequenceNumber min_uncommitted = 0; // by default disable the optimization
+ SequenceNumber min_uncommitted =
+ kMinUnCommittedSeq; // by default disable the optimization
if (snapshot != nullptr) {
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
const bool WRITE_AFTER_COMMIT = true;
+ const bool kFirstPrepareBatch = true;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT);
// For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
- // AddPrepared better to be called in the pre-release callback otherwise there
- // is a non-zero chance of max advancing prepare_seq and readers assume the
- // data as committed.
- // Also having it in the PreReleaseCallback allows in-order addition of
- // prepared entries to PrepareHeap and hence enables an optimization. Refer to
+ // Having AddPrepared in the PreReleaseCallback allows in-order addition of
+ // prepared entries to PreparedHeap and hence enables an optimization. Refer to
// SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
- wpt_db_, prepare_batch_cnt_,
- db_impl_->immutable_db_options().two_write_queues);
+ wpt_db_, db_impl_, prepare_batch_cnt_,
+ db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
Status s = db_impl_->WriteImpl(
const bool disable_memtable = !includes_data;
const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable;
- const bool publish_seq = do_one_write;
- // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
- // DB in one shot. min_uncommitted still works since it requires capturing
- // data that is written to DB but not yet committed, while
- // CommitTimeWriteBatch commits with PreReleaseCallback.
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
- wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
- publish_seq);
+ wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
+ // This is to call AddPrepared on CommitTimeWriteBatch
+ const bool kFirstPrepareBatch = true;
+ AddPreparedCallback add_prepared_callback(
+ wpt_db_, db_impl_, commit_batch_cnt,
+ db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
+ PreReleaseCallback* pre_release_callback;
+ if (do_one_write) {
+ pre_release_callback = &update_commit_map;
+ } else {
+ pre_release_callback = &add_prepared_callback;
+ }
uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already
// a connection between the memtable and its WAL, so there is no need to
size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
- batch_cnt, &update_commit_map);
+ batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
+ const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after WriteImpl that published
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
+ if (UNLIKELY(!do_one_write)) {
+ wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+ }
return s;
} // else do the 2nd write to publish seq
// Note: the 2nd write comes with a performance penalty. So if we have too
// many of commits accompanied with CommitTimeWriteBatch and yet we cannot
// enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
// two_write_queues should be disabled to avoid many additional writes here.
- class PublishSeqPreReleaseCallback : public PreReleaseCallback {
- public:
- explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
- : db_impl_(db_impl) {}
- virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
-#ifdef NDEBUG
- (void)is_mem_disabled;
-#endif
- assert(is_mem_disabled);
- assert(db_impl_->immutable_db_options().two_write_queues);
- db_impl_->SetLastPublishedSequence(seq);
- return Status::OK();
- }
-
- private:
- DBImpl* db_impl_;
- } publish_seq_callback(db_impl_);
+ const size_t kZeroData = 0;
+ // Update commit map only from the 2nd queue
+ WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
+ wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
+ commit_batch_seq, commit_batch_cnt);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
const uint64_t NO_REF_LOG = 0;
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
- &publish_seq_callback);
+ &update_commit_map_with_aux_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that published the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
+ wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
return s;
}
assert(GetId() > 0);
auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
- // In WritePrepared, the txn is is the same as prepare seq
- auto last_visible_txn = GetId() - 1;
+ auto read_at_seq = kMaxSequenceNumber;
struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_;
ReadOptions roptions;
std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands)
: db_(db),
- callback(wpt_db, snap_seq,
- 0), // 0 disables min_uncommitted optimization
+ callback(wpt_db, snap_seq), // disable min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators),
handles_(handles),
}
protected:
- virtual bool WriteAfterCommit() const override { return false; }
- } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
+ bool WriteAfterCommit() const override { return false; }
+ } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
wpt_db_->txn_db_options_.rollback_merge_operands);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
const uint64_t NO_REF_LOG = 0;
uint64_t seq_used = kMaxSequenceNumber;
const size_t ONE_BATCH = 1;
- // We commit the rolled back prepared batches. ALthough this is
+ const bool kFirstPrepareBatch = true;
+ // We commit the rolled back prepared batches. Although this is
// counter-intuitive, i) it is safe to do so, since the prepared batches are
// already canceled out by the rollback batch, ii) adding the commit entry to
// CommitCache will allow us to benefit from the existing mechanism in
// CommitCache that keeps an entry evicted due to max advance and yet overlaps
// with a live snapshot around so that the live snapshot properly skips the
// entry even if its prepare seq is lower than max_evicted_seq_.
+ AddPreparedCallback add_prepared_callback(
+ wpt_db_, db_impl_, ONE_BATCH,
+ db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
+ PreReleaseCallback* pre_release_callback;
+ if (do_one_write) {
+ pre_release_callback = &update_commit_map;
+ } else {
+ pre_release_callback = &add_prepared_callback;
+ }
// Note: the rollback batch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while
- // the roolback batch commits with PreReleaseCallback.
+ // the rollback batch commits with PreReleaseCallback.
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
- do_one_write ? &update_commit_map : nullptr);
+ pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (!s.ok()) {
return s;
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
return s;
} // else do the 2nd write for commit
- uint64_t& prepare_seq = seq_used;
+ uint64_t rollback_seq = seq_used;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
- "RollbackInternal 2nd write prepare_seq: %" PRIu64,
- prepare_seq);
+ "RollbackInternal 2nd write rollback_seq: %" PRIu64,
+ rollback_seq);
// Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers.
- const size_t ZERO_COMMITS = 0;
- WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
- wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
+ WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
+ wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
- // Mark the txn as rolled back
- uint64_t& rollback_seq = seq_used;
+ ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
+ "RollbackInternal (status=%s) commit: %" PRIu64,
+ s.ToString().c_str(), GetId());
if (s.ok()) {
- // Note: it is safe to do it after PreReleaseCallback via WriteImpl since
- // all the writes by the prpared batch are already blinded by the rollback
- // batch. The only reason we commit the prepared batch here is to benefit
- // from the existing mechanism in CommitCache that takes care of the rare
- // cases that the prepare seq is visible to a snsapshot but max evicted seq
- // advances that prepare seq.
- for (size_t i = 0; i < prepare_batch_cnt_; i++) {
- wpt_db_->AddCommitted(GetId() + i, rollback_seq);
- }
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
}
+ wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
return s;
}
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
- &snap_checker);
+ &snap_checker, min_uncommitted);
}
// Takes a snapshot for this transaction for write-write conflict checking.
// GetSnapshotInternal (on the WritePrepared DB) both acquires the snapshot
// and enhances it with min_uncommitted, replacing the previous inline
// SmallestUnCommittedSeq/GetSnapshotImpl/EnhanceSnapshot sequence here.
void WritePreparedTxn::SetSnapshot() {
  const bool kForWWConflictCheck = true;
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  SetSnapshotInternal(snapshot);
}