#include "utilities/transactions/write_prepared_txn.h"
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
#include <map>
#include <set>
#include "db/column_family.h"
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
struct WriteOptions;
prepare_batch_cnt_ = 0;
}
-Status WritePreparedTxn::Get(const ReadOptions& read_options,
+void WritePreparedTxn::MultiGet(const ReadOptions& options,
+ ColumnFamilyHandle* column_family,
+ const size_t num_keys, const Slice* keys,
+ PinnableSlice* values, Status* statuses,
+ const bool sorted_input) {
+ SequenceNumber min_uncommitted, snap_seq;
+ const SnapshotBackup backed_by_snapshot =
+ wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
+ WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
+ backed_by_snapshot);
+ write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
+ keys, values, statuses, sorted_input,
+ &callback);
+ if (UNLIKELY(!callback.valid() ||
+ !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
+ wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
+ for (size_t i = 0; i < num_keys; i++) {
+ statuses[i] = Status::TryAgain();
+ }
+ }
+}
+
+Status WritePreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
- auto snapshot = read_options.snapshot;
- auto snap_seq =
- snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
- SequenceNumber min_uncommitted =
- kMinUnCommittedSeq; // by default disable the optimization
- if (snapshot != nullptr) {
- min_uncommitted =
- static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
- ->min_uncommitted_;
+ SequenceNumber min_uncommitted, snap_seq;
+ const SnapshotBackup backed_by_snapshot =
+ wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
+ WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
+ backed_by_snapshot);
+ auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
+ pinnable_val, &callback);
+ if (LIKELY(callback.valid() &&
+ wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
+ backed_by_snapshot))) {
+ return res;
+ } else {
+ wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
+ return Status::TryAgain();
}
-
- WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
- return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
- pinnable_val, &callback);
}
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
- if (LIKELY(s.ok())) {
- // Note RemovePrepared should be called after WriteImpl that publishsed
+ if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
+ s.ok())) {
+ // Note: RemovePrepared should be called after WriteImpl that publishsed
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
- }
+ } // else RemovePrepared is called from within PreReleaseCallback
if (UNLIKELY(!do_one_write)) {
+ assert(!s.ok());
+ // Cleanup the prepared entry we added with add_prepared_callback
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
}
return s;
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_aux_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
- // Note RemovePrepared should be called after WriteImpl that publishsed the
- // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
- wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
- wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+ if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
+ if (s.ok()) {
+ // Note: RemovePrepared should be called after WriteImpl that publishsed
+ // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
+ wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
+ }
+ wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+ } // else RemovePrepared is called from within PreReleaseCallback
return s;
}
auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
auto read_at_seq = kMaxSequenceNumber;
+ ReadOptions roptions;
+ // to prevent callback's seq to be overrriden inside DBImpk::Get
+ roptions.snapshot = wpt_db_->GetMaxSnapshot();
struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_;
- ReadOptions roptions;
WritePreparedTxnReadCallback callback;
WriteBatch* rollback_batch_;
std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_;
+ ReadOptions roptions_;
RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles,
- bool rollback_merge_operands)
+ bool rollback_merge_operands, ReadOptions _roptions)
: db_(db),
callback(wpt_db, snap_seq), // disable min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators),
handles_(handles),
- rollback_merge_operands_(rollback_merge_operands) {}
+ rollback_merge_operands_(rollback_merge_operands),
+ roptions_(_roptions) {}
Status Rollback(uint32_t cf, const Slice& key) {
Status s;
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = handles_[cf];
- s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used,
- &callback);
+ DBImpl::GetImplOptions get_impl_options;
+ get_impl_options.column_family = cf_handle;
+ get_impl_options.value = &pinnable_val;
+ get_impl_options.value_found = ¬_used;
+ get_impl_options.callback = &callback;
+ s = db_->GetImpl(roptions_, key, get_impl_options);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
s = rollback_batch_->Put(cf_handle, key, pinnable_val);
bool WriteAfterCommit() const override { return false; }
} rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
- wpt_db_->txn_db_options_.rollback_merge_operands);
+ wpt_db_->txn_db_options_.rollback_merge_operands,
+ roptions);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok());
if (!s.ok()) {
return s;
}
if (do_one_write) {
+ assert(!db_impl_->immutable_db_options().two_write_queues);
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
return s;
} // else do the 2nd write for commit
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal (status=%s) commit: %" PRIu64,
s.ToString().c_str(), GetId());
+ // TODO(lth): For WriteUnPrepared that rollback is called frequently,
+ // RemovePrepared could be moved to the callback to reduce lock contention.
if (s.ok()) {
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
}
+ // Note: RemovePrepared for prepared batch is called from within
+ // PreReleaseCallback
wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
return s;
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
- WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
+ WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
+ kBackedByDBSnapshot);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker, min_uncommitted);
return ret;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE