#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
+#include "logging/logging.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
+Status Transaction::CommitAndTryCreateSnapshot(
+ std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts,
+ std::shared_ptr<const Snapshot>* snapshot) {
+ if (snapshot) {
+ snapshot->reset();
+ }
+ TxnTimestamp commit_ts = GetCommitTimestamp();
+ if (commit_ts == kMaxTxnTimestamp) {
+ if (ts == kMaxTxnTimestamp) {
+ return Status::InvalidArgument("Commit timestamp unset");
+ } else {
+ const Status s = SetCommitTimestamp(ts);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ } else if (ts != kMaxTxnTimestamp) {
+ if (ts != commit_ts) {
+ // For now we treat this as error.
+ return Status::InvalidArgument("Different commit ts specified");
+ }
+ }
+ SetSnapshotOnNextOperation(notifier);
+ Status s = Commit();
+ if (!s.ok()) {
+ return s;
+ }
+ assert(s.ok());
+ // If we reach here, we must return ok status for this function.
+ std::shared_ptr<const Snapshot> new_snapshot = GetTimestampedSnapshot();
+
+ if (snapshot) {
+ *snapshot = new_snapshot;
+ }
+ return Status::OK();
+}
+
TransactionBaseImpl::TransactionBaseImpl(
DB* db, const WriteOptions& write_options,
const LockTrackerFactory& lock_tracker_factory)
write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
lock_tracker_factory_(lock_tracker_factory),
- start_time_(db_->GetEnv()->NowMicros()),
- write_batch_(cmp_, 0, true, 0),
+ start_time_(dbimpl_->GetSystemClock()->NowMicros()),
+ write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key),
tracked_locks_(lock_tracker_factory_.Create()),
+ commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */,
+ write_options.protection_bytes_per_key,
+ 0 /* default_cf_ts_sz */),
indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0;
name_.clear();
log_number_ = 0;
write_options_ = write_options;
- start_time_ = db_->GetEnv()->NowMicros();
+ start_time_ = dbimpl_->GetSystemClock()->NowMicros();
indexing_enabled_ = true;
cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
+ WriteBatchInternal::UpdateProtectionInfo(
+ write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key)
+ .PermitUncheckedError();
+ WriteBatchInternal::UpdateProtectionInfo(
+ &commit_time_batch_, write_options_.protection_bytes_per_key)
+ .PermitUncheckedError();
}
void TransactionBaseImpl::SetSnapshot() {
void TransactionBaseImpl::SetSavePoint() {
if (save_points_ == nullptr) {
- save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
+ save_points_.reset(
+ new std::stack<TransactionBaseImpl::SavePoint,
+ autovector<TransactionBaseImpl::SavePoint>>());
}
save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
num_puts_, num_deletes_, num_merges_,
}
Status TransactionBaseImpl::PopSavePoint() {
- if (save_points_ == nullptr ||
- save_points_->empty()) {
+ if (save_points_ == nullptr || save_points_->empty()) {
// No SavePoint yet.
assert(write_batch_.PopSavePoint().IsNotFound());
return Status::NotFound();
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);
- return write_batch_.NewIteratorWithBase(db_iter);
+ return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
+ &read_options);
}
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
}
uint64_t TransactionBaseImpl::GetElapsedTime() const {
- return (db_->GetEnv()->NowMicros() - start_time_) / 1000;
+ return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
}
uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
return Status::InvalidArgument();
}
+ Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
+ return Status::InvalidArgument();
+ }
+
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}