#include "utilities/transactions/transaction_base.h"
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
+#include <cinttypes>
-#include <inttypes.h>
-
-#include "db/db_impl.h"
#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
+#include "util/cast_util.h"
#include "util/string_util.h"
+#include "utilities/transactions/lock/lock_tracker.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
-TransactionBaseImpl::TransactionBaseImpl(DB* db,
- const WriteOptions& write_options)
+TransactionBaseImpl::TransactionBaseImpl(
+ DB* db, const WriteOptions& write_options,
+ const LockTrackerFactory& lock_tracker_factory)
: db_(db),
- dbimpl_(reinterpret_cast<DBImpl*>(db)),
+ dbimpl_(static_cast_with_check<DBImpl>(db)),
write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
+ lock_tracker_factory_(lock_tracker_factory),
start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true, 0),
+ tracked_locks_(lock_tracker_factory_.Create()),
indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0;
if (dbimpl_->allow_2pc()) {
- WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+ InitWriteBatch();
}
}
save_points_.reset(nullptr);
write_batch_.Clear();
commit_time_batch_.Clear();
- tracked_keys_.clear();
+ tracked_locks_->Clear();
num_puts_ = 0;
num_deletes_ = 0;
num_merges_ = 0;
if (dbimpl_->allow_2pc()) {
- WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+ InitWriteBatch();
}
}
// Record a savepoint capturing the current snapshot, operation counters and
// a fresh lock tracker, so RollbackToSavePoint() can later restore this
// state. Also sets a savepoint on the indexed write batch.
void TransactionBaseImpl::SetSavePoint() {
  if (save_points_ == nullptr) {
    // Lazily allocate the savepoint stack on first use; backed by an
    // autovector to avoid heap allocation for shallow stacks.
    save_points_.reset(
        new std::stack<TransactionBaseImpl::SavePoint,
                       autovector<TransactionBaseImpl::SavePoint>>());
  }
  save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
                        num_puts_, num_deletes_, num_merges_,
                        lock_tracker_factory_);
  write_batch_.SetSavePoint();
}
assert(s.ok());
// Rollback any keys that were tracked since the last savepoint
- const TransactionKeyMap& key_map = save_point.new_keys_;
- for (const auto& key_map_iter : key_map) {
- uint32_t column_family_id = key_map_iter.first;
- auto& keys = key_map_iter.second;
-
- auto& cf_tracked_keys = tracked_keys_[column_family_id];
-
- for (const auto& key_iter : keys) {
- const std::string& key = key_iter.first;
- uint32_t num_reads = key_iter.second.num_reads;
- uint32_t num_writes = key_iter.second.num_writes;
-
- auto tracked_keys_iter = cf_tracked_keys.find(key);
- assert(tracked_keys_iter != cf_tracked_keys.end());
-
- // Decrement the total reads/writes of this key by the number of
- // reads/writes done since the last SavePoint.
- if (num_reads > 0) {
- assert(tracked_keys_iter->second.num_reads >= num_reads);
- tracked_keys_iter->second.num_reads -= num_reads;
- }
- if (num_writes > 0) {
- assert(tracked_keys_iter->second.num_writes >= num_writes);
- tracked_keys_iter->second.num_writes -= num_writes;
- }
- if (tracked_keys_iter->second.num_reads == 0 &&
- tracked_keys_iter->second.num_writes == 0) {
- tracked_keys_[column_family_id].erase(tracked_keys_iter);
- }
- }
- }
+ tracked_locks_->Subtract(*save_point.new_locks_);
save_points_->pop();
}
assert(!save_points_->empty());
- save_points_->pop();
+ // If there is another savepoint A below the current savepoint B, then A needs
+ // to inherit tracked_keys in B so that if we rollback to savepoint A, we
+ // remember to unlock keys in B. If there is no other savepoint below, then we
+ // can safely discard savepoint info.
+ if (save_points_->size() == 1) {
+ save_points_->pop();
+ } else {
+ TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
+ std::swap(top, save_points_->top());
+ save_points_->pop();
+
+ save_points_->top().new_locks_->Merge(*top.new_locks_);
+ }
+
return write_batch_.PopSavePoint();
}
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
- std::string* value = values ? &(*values)[i] : nullptr;
- stat_list[i] = Get(read_options, column_family[i], keys[i], value);
+ stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
}
return stat_list;
}
+void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const size_t num_keys, const Slice* keys,
+ PinnableSlice* values, Status* statuses,
+ const bool sorted_input) {
+ write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
+ num_keys, keys, values, statuses,
+ sorted_input);
+}
+
std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
- std::string* value = values ? &(*values)[i] : nullptr;
- stat_list[i] = Get(read_options, column_family[i], keys[i], value);
+ stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
}
return stat_list;
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);
- return write_batch_.NewIteratorWithBase(column_family, db_iter);
+ return write_batch_.NewIteratorWithBase(column_family, db_iter,
+ &read_options);
}
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
}
// Append an out-of-band log-data blob to this transaction's write batch.
// The underlying call is expected to succeed; failure is asserted in debug
// builds and the status is deliberately discarded in release builds.
void TransactionBaseImpl::PutLogData(const Slice& blob) {
  auto s = write_batch_.PutLogData(blob);
  (void)s;  // avoid unused-variable warning when asserts are compiled out
  assert(s.ok());
}
WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
// Number of Merge operations issued by this transaction so far.
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
uint64_t TransactionBaseImpl::GetNumKeys() const {
- uint64_t count = 0;
-
- // sum up locked keys in all column families
- for (const auto& key_map_iter : tracked_keys_) {
- const auto& keys = key_map_iter.second;
- count += keys.size();
- }
-
- return count;
+ return tracked_locks_->GetNumPointLocks();
}
// Record that this transaction holds (or has read) a lock on `key` in column
// family `cfh_id`. `seq` is the sequence number from which we know there has
// not been a concurrent update to the key. The request is registered in the
// transaction-wide tracker and, when a savepoint is active, also in the top
// savepoint's tracker so the lock can be released on rollback.
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
                                   SequenceNumber seq, bool read_only,
                                   bool exclusive) {
  PointLockRequest r;
  r.column_family_id = cfh_id;
  r.key = key;
  r.seq = seq;
  r.read_only = read_only;
  r.exclusive = exclusive;

  // Update map of all tracked keys for this transaction
  tracked_locks_->Track(r);

  if (save_points_ != nullptr && !save_points_->empty()) {
    // Update map of tracked keys in this SavePoint
    save_points_->top().new_locks_->Track(r);
  }
}
// Gets the write batch that should be used for Put/Merge/Deletes.
// Undo one prior GetForUpdate on `key`: decrement its read tracking and, if
// that removes the last reference to the key, release the lock via
// UnlockGetForUpdate. When a savepoint is active, the key may only be
// untracked globally if this savepoint itself tracked a read of it.
void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
                                           const Slice& key) {
  PointLockRequest r;
  r.column_family_id = GetColumnFamilyID(column_family);
  r.key = key.ToString();
  r.read_only = true;

  bool can_untrack = false;
  if (save_points_ != nullptr && !save_points_->empty()) {
    // If there is no GetForUpdate of the key in this save point,
    // then cannot untrack from the global lock tracker.
    UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
    can_untrack = (s != UntrackStatus::NOT_TRACKED);
  } else {
    // No save point, so can untrack from the global lock tracker.
    can_untrack = true;
  }

  if (can_untrack) {
    // If erased from the global tracker, then can unlock the key.
    UntrackStatus s = tracked_locks_->Untrack(r);
    bool can_unlock = (s == UntrackStatus::REMOVED);
    if (can_unlock) {
      UnlockGetForUpdate(column_family, key);
    }
  }
}
WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
  // Expose the auxiliary write batch for direct population by the caller.
  // NOTE(review): name suggests these writes are applied at commit time —
  // confirm against the commit path, which is outside this view.
  return &commit_time_batch_;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE