]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/transactions/transaction_base.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_base.cc
index 212c822429240a0c982602d9018400dfa92bd308..1482dc961d887949d026c1ce716dcff7f3abf391 100644 (file)
@@ -7,34 +7,35 @@
 
 #include "utilities/transactions/transaction_base.h"
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
+#include <cinttypes>
 
-#include <inttypes.h>
-
-#include "db/db_impl.h"
 #include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
 #include "rocksdb/status.h"
+#include "util/cast_util.h"
 #include "util/string_util.h"
+#include "utilities/transactions/lock/lock_tracker.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
-TransactionBaseImpl::TransactionBaseImpl(DB* db,
-                                         const WriteOptions& write_options)
+TransactionBaseImpl::TransactionBaseImpl(
+    DB* db, const WriteOptions& write_options,
+    const LockTrackerFactory& lock_tracker_factory)
     : db_(db),
-      dbimpl_(reinterpret_cast<DBImpl*>(db)),
+      dbimpl_(static_cast_with_check<DBImpl>(db)),
       write_options_(write_options),
       cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
+      lock_tracker_factory_(lock_tracker_factory),
       start_time_(db_->GetEnv()->NowMicros()),
       write_batch_(cmp_, 0, true, 0),
+      tracked_locks_(lock_tracker_factory_.Create()),
       indexing_enabled_(true) {
   assert(dynamic_cast<DBImpl*>(db_) != nullptr);
   log_number_ = 0;
   if (dbimpl_->allow_2pc()) {
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    InitWriteBatch();
   }
 }
 
@@ -47,13 +48,13 @@ void TransactionBaseImpl::Clear() {
   save_points_.reset(nullptr);
   write_batch_.Clear();
   commit_time_batch_.Clear();
-  tracked_keys_.clear();
+  tracked_locks_->Clear();
   num_puts_ = 0;
   num_deletes_ = 0;
   num_merges_ = 0;
 
   if (dbimpl_->allow_2pc()) {
-    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
+    InitWriteBatch();
   }
 }
 
@@ -123,10 +124,11 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
 
 void TransactionBaseImpl::SetSavePoint() {
   if (save_points_ == nullptr) {
-    save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>());
+    save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint, autovector<TransactionBaseImpl::SavePoint>>());
   }
   save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
-                        num_puts_, num_deletes_, num_merges_);
+                        num_puts_, num_deletes_, num_merges_,
+                        lock_tracker_factory_);
   write_batch_.SetSavePoint();
 }
 
@@ -146,37 +148,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
     assert(s.ok());
 
     // Rollback any keys that were tracked since the last savepoint
-    const TransactionKeyMap& key_map = save_point.new_keys_;
-    for (const auto& key_map_iter : key_map) {
-      uint32_t column_family_id = key_map_iter.first;
-      auto& keys = key_map_iter.second;
-
-      auto& cf_tracked_keys = tracked_keys_[column_family_id];
-
-      for (const auto& key_iter : keys) {
-        const std::string& key = key_iter.first;
-        uint32_t num_reads = key_iter.second.num_reads;
-        uint32_t num_writes = key_iter.second.num_writes;
-
-        auto tracked_keys_iter = cf_tracked_keys.find(key);
-        assert(tracked_keys_iter != cf_tracked_keys.end());
-
-        // Decrement the total reads/writes of this key by the number of
-        // reads/writes done since the last SavePoint.
-        if (num_reads > 0) {
-          assert(tracked_keys_iter->second.num_reads >= num_reads);
-          tracked_keys_iter->second.num_reads -= num_reads;
-        }
-        if (num_writes > 0) {
-          assert(tracked_keys_iter->second.num_writes >= num_writes);
-          tracked_keys_iter->second.num_writes -= num_writes;
-        }
-        if (tracked_keys_iter->second.num_reads == 0 &&
-            tracked_keys_iter->second.num_writes == 0) {
-          tracked_keys_[column_family_id].erase(tracked_keys_iter);
-        }
-      }
-    }
+    tracked_locks_->Subtract(*save_point.new_locks_);
 
     save_points_->pop();
 
@@ -196,7 +168,20 @@ Status TransactionBaseImpl::PopSavePoint() {
   }
 
   assert(!save_points_->empty());
-  save_points_->pop();
+  // If there is another savepoint A below the current savepoint B, then A needs
+  // to inherit tracked_keys in B so that if we rollback to savepoint A, we
+  // remember to unlock keys in B. If there is no other savepoint below, then we
+  // can safely discard savepoint info.
+  if (save_points_->size() == 1) {
+    save_points_->pop();
+  } else {
+    TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
+    std::swap(top, save_points_->top());
+    save_points_->pop();
+
+    save_points_->top().new_locks_->Merge(*top.new_locks_);
+  }
+
   return write_batch_.PopSavePoint();
 }
 
@@ -274,13 +259,22 @@ std::vector<Status> TransactionBaseImpl::MultiGet(
 
   std::vector<Status> stat_list(num_keys);
   for (size_t i = 0; i < num_keys; ++i) {
-    std::string* value = values ? &(*values)[i] : nullptr;
-    stat_list[i] = Get(read_options, column_family[i], keys[i], value);
+    stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
   }
 
   return stat_list;
 }
 
+void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
+                                   ColumnFamilyHandle* column_family,
+                                   const size_t num_keys, const Slice* keys,
+                                   PinnableSlice* values, Status* statuses,
+                                   const bool sorted_input) {
+  write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
+                                      num_keys, keys, values, statuses,
+                                      sorted_input);
+}
+
 std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
     const ReadOptions& read_options,
     const std::vector<ColumnFamilyHandle*>& column_family,
@@ -302,8 +296,7 @@ std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
   // TODO(agiardullo): optimize multiget?
   std::vector<Status> stat_list(num_keys);
   for (size_t i = 0; i < num_keys; ++i) {
-    std::string* value = values ? &(*values)[i] : nullptr;
-    stat_list[i] = Get(read_options, column_family[i], keys[i], value);
+    stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
   }
 
   return stat_list;
@@ -321,7 +314,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
   Iterator* db_iter = db_->NewIterator(read_options, column_family);
   assert(db_iter);
 
-  return write_batch_.NewIteratorWithBase(column_family, db_iter);
+  return write_batch_.NewIteratorWithBase(column_family, db_iter,
+                                          &read_options);
 }
 
 Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
@@ -536,7 +530,9 @@ Status TransactionBaseImpl::SingleDeleteUntracked(
 }
 
 void TransactionBaseImpl::PutLogData(const Slice& blob) {
-  write_batch_.PutLogData(blob);
+  auto s = write_batch_.PutLogData(blob);
+  (void)s;
+  assert(s.ok());
 }
 
 WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
@@ -554,95 +550,26 @@ uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
 uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
 
 uint64_t TransactionBaseImpl::GetNumKeys() const {
-  uint64_t count = 0;
-
-  // sum up locked keys in all column families
-  for (const auto& key_map_iter : tracked_keys_) {
-    const auto& keys = key_map_iter.second;
-    count += keys.size();
-  }
-
-  return count;
+  return tracked_locks_->GetNumPointLocks();
 }
 
 void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
                                    SequenceNumber seq, bool read_only,
                                    bool exclusive) {
+  PointLockRequest r;
+  r.column_family_id = cfh_id;
+  r.key = key;
+  r.seq = seq;
+  r.read_only = read_only;
+  r.exclusive = exclusive;
+
   // Update map of all tracked keys for this transaction
-  TrackKey(&tracked_keys_, cfh_id, key, seq, read_only, exclusive);
+  tracked_locks_->Track(r);
 
   if (save_points_ != nullptr && !save_points_->empty()) {
     // Update map of tracked keys in this SavePoint
-    TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only,
-             exclusive);
-  }
-}
-
-// Add a key to the given TransactionKeyMap
-// seq for pessimistic transactions is the sequence number from which we know
-// there has not been a concurrent update to the key.
-void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
-                                   const std::string& key, SequenceNumber seq,
-                                   bool read_only, bool exclusive) {
-  auto& cf_key_map = (*key_map)[cfh_id];
-  auto iter = cf_key_map.find(key);
-  if (iter == cf_key_map.end()) {
-    auto result = cf_key_map.insert({key, TransactionKeyMapInfo(seq)});
-    iter = result.first;
-  } else if (seq < iter->second.seq) {
-    // Now tracking this key with an earlier sequence number
-    iter->second.seq = seq;
-  }
-  // else we do not update the seq. The smaller the tracked seq, the stronger it
-  // the guarantee since it implies from the seq onward there has not been a
-  // concurrent update to the key. So we update the seq if it implies stronger
-  // guarantees, i.e., if it is smaller than the existing trakced seq.
-
-  if (read_only) {
-    iter->second.num_reads++;
-  } else {
-    iter->second.num_writes++;
-  }
-  iter->second.exclusive |= exclusive;
-}
-
-std::unique_ptr<TransactionKeyMap>
-TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
-  if (save_points_ != nullptr && !save_points_->empty()) {
-    // Examine the number of reads/writes performed on all keys written
-    // since the last SavePoint and compare to the total number of reads/writes
-    // for each key.
-    TransactionKeyMap* result = new TransactionKeyMap();
-    for (const auto& key_map_iter : save_points_->top().new_keys_) {
-      uint32_t column_family_id = key_map_iter.first;
-      auto& keys = key_map_iter.second;
-
-      auto& cf_tracked_keys = tracked_keys_[column_family_id];
-
-      for (const auto& key_iter : keys) {
-        const std::string& key = key_iter.first;
-        uint32_t num_reads = key_iter.second.num_reads;
-        uint32_t num_writes = key_iter.second.num_writes;
-
-        auto total_key_info = cf_tracked_keys.find(key);
-        assert(total_key_info != cf_tracked_keys.end());
-        assert(total_key_info->second.num_reads >= num_reads);
-        assert(total_key_info->second.num_writes >= num_writes);
-
-        if (total_key_info->second.num_reads == num_reads &&
-            total_key_info->second.num_writes == num_writes) {
-          // All the reads/writes to this key were done in the last savepoint.
-          bool read_only = (num_writes == 0);
-          TrackKey(result, column_family_id, key, key_iter.second.seq,
-                   read_only, key_iter.second.exclusive);
-        }
-      }
-    }
-    return std::unique_ptr<TransactionKeyMap>(result);
+    save_points_->top().new_locks_->Track(r);
   }
-
-  // No SavePoint
-  return nullptr;
 }
 
 // Gets the write batch that should be used for Put/Merge/Deletes.
@@ -670,54 +597,28 @@ void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
 
 void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
                                            const Slice& key) {
-  uint32_t column_family_id = GetColumnFamilyID(column_family);
-  auto& cf_tracked_keys = tracked_keys_[column_family_id];
-  std::string key_str = key.ToString();
-  bool can_decrement = false;
-  bool can_unlock __attribute__((__unused__)) = false;
+  PointLockRequest r;
+  r.column_family_id = GetColumnFamilyID(column_family);
+  r.key = key.ToString();
+  r.read_only = true;
 
+  bool can_untrack = false;
   if (save_points_ != nullptr && !save_points_->empty()) {
-    // Check if this key was fetched ForUpdate in this SavePoint
-    auto& cf_savepoint_keys = save_points_->top().new_keys_[column_family_id];
-
-    auto savepoint_iter = cf_savepoint_keys.find(key_str);
-    if (savepoint_iter != cf_savepoint_keys.end()) {
-      if (savepoint_iter->second.num_reads > 0) {
-        savepoint_iter->second.num_reads--;
-        can_decrement = true;
-
-        if (savepoint_iter->second.num_reads == 0 &&
-            savepoint_iter->second.num_writes == 0) {
-          // No other GetForUpdates or write on this key in this SavePoint
-          cf_savepoint_keys.erase(savepoint_iter);
-          can_unlock = true;
-        }
-      }
-    }
+    // If there is no GetForUpdate of the key in this save point,
+    // then cannot untrack from the global lock tracker.
+    UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
+    can_untrack = (s != UntrackStatus::NOT_TRACKED);
   } else {
-    // No SavePoint set
-    can_decrement = true;
-    can_unlock = true;
-  }
-
-  // We can only decrement the read count for this key if we were able to
-  // decrement the read count in the current SavePoint, OR if there is no
-  // SavePoint set.
-  if (can_decrement) {
-    auto key_iter = cf_tracked_keys.find(key_str);
-
-    if (key_iter != cf_tracked_keys.end()) {
-      if (key_iter->second.num_reads > 0) {
-        key_iter->second.num_reads--;
-
-        if (key_iter->second.num_reads == 0 &&
-            key_iter->second.num_writes == 0) {
-          // No other GetForUpdates or writes on this key
-          assert(can_unlock);
-          cf_tracked_keys.erase(key_iter);
-          UnlockGetForUpdate(column_family, key);
-        }
-      }
+    // No save point, so can untrack from the global lock tracker.
+    can_untrack = true;
+  }
+
+  if (can_untrack) {
+    // If erased from the global tracker, then can unlock the key.
+    UntrackStatus s = tracked_locks_->Untrack(r);
+    bool can_unlock = (s == UntrackStatus::REMOVED);
+    if (can_unlock) {
+      UnlockGetForUpdate(column_family, key);
     }
   }
 }
@@ -772,6 +673,6 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
 WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
   return &commit_time_batch_;
 }
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 
 #endif  // ROCKSDB_LITE