]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_iter.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_iter.cc
index 701d0d8fcaf9a3d58874cd9b00374ee960cda858..541a5fbed9fdf30f329eb35641982bd91b2bf4f9 100644 (file)
@@ -28,6 +28,7 @@
 #include "util/mutexlock.h"
 #include "util/string_util.h"
 #include "util/trace_replay.h"
+#include "util/user_comparator_wrapper.h"
 
 namespace rocksdb {
 
@@ -117,32 +118,31 @@ class DBIter final: public Iterator {
          uint64_t max_sequential_skip_in_iterations,
          ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
          bool allow_blob)
-      : arena_mode_(arena_mode),
-        env_(_env),
+      : env_(_env),
         logger_(cf_options.info_log),
         user_comparator_(cmp),
         merge_operator_(cf_options.merge_operator),
         iter_(iter),
+        read_callback_(read_callback),
         sequence_(s),
-        direction_(kForward),
-        valid_(false),
-        current_entry_is_merged_(false),
         statistics_(cf_options.statistics),
         num_internal_keys_skipped_(0),
         iterate_lower_bound_(read_options.iterate_lower_bound),
         iterate_upper_bound_(read_options.iterate_upper_bound),
+        direction_(kForward),
+        valid_(false),
+        current_entry_is_merged_(false),
         prefix_same_as_start_(read_options.prefix_same_as_start),
         pin_thru_lifetime_(read_options.pin_data),
         total_order_seek_(read_options.total_order_seek),
-        range_del_agg_(cf_options.internal_comparator, s,
-                       true /* collapse_deletions */),
-        read_callback_(read_callback),
-        db_impl_(db_impl),
-        cfd_(cfd),
         allow_blob_(allow_blob),
         is_blob_(false),
+        arena_mode_(arena_mode),
+        range_del_agg_(&cf_options.internal_comparator, s),
+        db_impl_(db_impl),
+        cfd_(cfd),
         start_seqnum_(read_options.iter_start_seqnum) {
-    RecordTick(statistics_, NO_ITERATORS);
+    RecordTick(statistics_, NO_ITERATOR_CREATED);
     prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
     max_skip_ = max_sequential_skip_in_iterations;
     max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
@@ -153,14 +153,12 @@ class DBIter final: public Iterator {
       iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
     }
   }
-  virtual ~DBIter() {
+  ~DBIter() override {
     // Release pinned data if any
     if (pinned_iters_mgr_.PinningEnabled()) {
       pinned_iters_mgr_.ReleasePinnedData();
     }
-    // Compiler warning issue filed:
-    // https://github.com/facebook/rocksdb/issues/3013
-    RecordTick(statistics_, NO_ITERATORS, uint64_t(-1));
+    RecordTick(statistics_, NO_ITERATOR_DELETED);
     ResetInternalKeysSkippedCounter();
     local_stats_.BumpGlobalStatistics(statistics_);
     if (!arena_mode_) {
@@ -174,21 +172,20 @@ class DBIter final: public Iterator {
     iter_ = iter;
     iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
   }
-  virtual RangeDelAggregator* GetRangeDelAggregator() {
+  virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
     return &range_del_agg_;
   }
 
-  virtual bool Valid() const override { return valid_; }
-  virtual Slice key() const override {
+  bool Valid() const override { return valid_; }
+  Slice key() const override {
     assert(valid_);
     if(start_seqnum_ > 0) {
       return saved_key_.GetInternalKey();
     } else {
       return saved_key_.GetUserKey();
     }
-
   }
-  virtual Slice value() const override {
+  Slice value() const override {
     assert(valid_);
     if (current_entry_is_merged_) {
       // If pinned_value_ is set then the result of merge operator is one of
@@ -200,7 +197,7 @@ class DBIter final: public Iterator {
       return iter_->value();
     }
   }
-  virtual Status status() const override {
+  Status status() const override {
     if (status_.ok()) {
       return iter_->status();
     } else {
@@ -213,8 +210,7 @@ class DBIter final: public Iterator {
     return is_blob_;
   }
 
-  virtual Status GetProperty(std::string prop_name,
-                             std::string* prop) override {
+  Status GetProperty(std::string prop_name, std::string* prop) override {
     if (prop == nullptr) {
       return Status::InvalidArgument("prop is nullptr");
     }
@@ -232,17 +228,22 @@ class DBIter final: public Iterator {
       *prop = saved_key_.GetUserKey().ToString();
       return Status::OK();
     }
-    return Status::InvalidArgument("Undentified property.");
+    return Status::InvalidArgument("Unidentified property.");
   }
 
-  virtual void Next() override;
-  virtual void Prev() override;
-  virtual void Seek(const Slice& target) override;
-  virtual void SeekForPrev(const Slice& target) override;
-  virtual void SeekToFirst() override;
-  virtual void SeekToLast() override;
+  void Next() override;
+  void Prev() override;
+  void Seek(const Slice& target) override;
+  void SeekForPrev(const Slice& target) override;
+  void SeekToFirst() override;
+  void SeekToLast() override;
   Env* env() { return env_; }
-  void set_sequence(uint64_t s) { sequence_ = s; }
+  void set_sequence(uint64_t s) {
+    sequence_ = s;
+    if (read_callback_) {
+      read_callback_->Refresh(s);
+    }
+  }
   void set_valid(bool v) { valid_ = v; }
 
  private:
@@ -262,7 +263,7 @@ class DBIter final: public Iterator {
 
   void PrevInternal();
   bool TooManyInternalKeysSkipped(bool increment = true);
-  bool IsVisible(SequenceNumber sequence);
+  inline bool IsVisible(SequenceNumber sequence);
 
   // CanReseekToSkip() returns whether the iterator can use the optimization
   // where it reseek by sequence number to get the next key when there are too
@@ -270,12 +271,6 @@ class DBIter final: public Iterator {
   // sequence number does not guarantee that it is visible.
   inline bool CanReseekToSkip();
 
-  // MaxVisibleSequenceNumber() returns the maximum visible sequence number
-  // for this snapshot. This sequence number may be greater than snapshot
-  // seqno because uncommitted data written to DB for write unprepared will
-  // have a higher sequence number.
-  inline SequenceNumber MaxVisibleSequenceNumber();
-
   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
   // is called
   void TempPinData() {
@@ -309,15 +304,16 @@ class DBIter final: public Iterator {
   }
 
   const SliceTransform* prefix_extractor_;
-  bool arena_mode_;
   Env* const env_;
   Logger* logger_;
-  const Comparator* const user_comparator_;
+  UserComparatorWrapper user_comparator_;
   const MergeOperator* const merge_operator_;
   InternalIterator* iter_;
+  ReadCallback* read_callback_;
+  // Max visible sequence number. It is normally the snapshot seq unless we have
+  // uncommitted data in db as in WriteUnCommitted.
   SequenceNumber sequence_;
 
-  Status status_;
   IterKey saved_key_;
   // Reusable internal key data structure. This is only used inside one function
   // and should not be used across functions. Reusing this object can reduce
@@ -325,9 +321,6 @@ class DBIter final: public Iterator {
   ParsedInternalKey ikey_;
   std::string saved_value_;
   Slice pinned_value_;
-  Direction direction_;
-  bool valid_;
-  bool current_entry_is_merged_;
   // for prefix seek mode to support prev()
   Statistics* statistics_;
   uint64_t max_skip_;
@@ -335,23 +328,29 @@ class DBIter final: public Iterator {
   uint64_t num_internal_keys_skipped_;
   const Slice* iterate_lower_bound_;
   const Slice* iterate_upper_bound_;
+
   IterKey prefix_start_buf_;
+
+  Status status_;
   Slice prefix_start_key_;
+  Direction direction_;
+  bool valid_;
+  bool current_entry_is_merged_;
   const bool prefix_same_as_start_;
   // Means that we will pin all data blocks we read as long the Iterator
   // is not deleted, will be true if ReadOptions::pin_data is true
   const bool pin_thru_lifetime_;
   const bool total_order_seek_;
+  bool allow_blob_;
+  bool is_blob_;
+  bool arena_mode_;
   // List of operands for merge operator.
   MergeContext merge_context_;
-  RangeDelAggregator range_del_agg_;
+  ReadRangeDelAggregator range_del_agg_;
   LocalStatistics local_stats_;
   PinnedIteratorsManager pinned_iters_mgr_;
-  ReadCallback* read_callback_;
   DBImpl* db_impl_;
   ColumnFamilyData* cfd_;
-  bool allow_blob_;
-  bool is_blob_;
   // for diff snapshots we want the lower bound on the seqnum;
   // if this value > 0 iterator will return internal keys
   SequenceNumber start_seqnum_;
@@ -377,6 +376,7 @@ void DBIter::Next() {
   assert(valid_);
   assert(status_.ok());
 
+  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
   // Release temporarily pinned blocks from last operation
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
@@ -457,7 +457,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
     }
 
     if (iterate_upper_bound_ != nullptr &&
-        user_comparator_->Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
+        user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
       break;
     }
 
@@ -472,8 +472,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
     }
 
     if (IsVisible(ikey_.sequence)) {
-      if (skipping && user_comparator_->Compare(ikey_.user_key,
-                                                saved_key_.GetUserKey()) <= 0) {
+      if (skipping && user_comparator_.Compare(ikey_.user_key,
+                                               saved_key_.GetUserKey()) <= 0) {
         num_skipped++;  // skip this entry
         PERF_COUNTER_ADD(internal_key_skipped_count, 1);
       } else {
@@ -580,7 +580,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
       // If this happens too many times in a row for the same user key, we want
       // to seek to the target sequence number.
       int cmp =
-          user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
+          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
       if (cmp == 0 || (skipping && cmp <= 0)) {
         num_skipped++;
       } else {
@@ -656,7 +656,7 @@ bool DBIter::MergeValuesNewToOld() {
       return false;
     }
 
-    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       // hit the next user key, stop right here
       break;
     } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
@@ -734,6 +734,8 @@ bool DBIter::MergeValuesNewToOld() {
 void DBIter::Prev() {
   assert(valid_);
   assert(status_.ok());
+
+  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
   bool ok = true;
@@ -774,8 +776,7 @@ bool DBIter::ReverseToForward() {
     if (!ParseKey(&ikey)) {
       return false;
     }
-    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >=
-        0) {
+    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
       return true;
     }
     iter_->Next();
@@ -838,8 +839,8 @@ void DBIter::PrevInternal() {
     }
 
     if (iterate_lower_bound_ != nullptr &&
-        user_comparator_->Compare(saved_key_.GetUserKey(),
-                                  *iterate_lower_bound_) < 0) {
+        user_comparator_.Compare(saved_key_.GetUserKey(),
+                                 *iterate_lower_bound_) < 0) {
       // We've iterated earlier than the user-specified lower bound.
       valid_ = false;
       return;
@@ -900,7 +901,7 @@ bool DBIter::FindValueForCurrentKey() {
     }
 
     if (!IsVisible(ikey.sequence) ||
-        !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       break;
     }
     if (TooManyInternalKeysSkipped()) {
@@ -1053,7 +1054,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     if (!ParseKey(&ikey)) {
       return false;
     }
-    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       // No visible values for this key, even though FindValueForCurrentKey()
       // has seen some. This is possible if we're using a tailing iterator, and
       // the entries were discarded in a compaction.
@@ -1109,13 +1110,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     if (!ParseKey(&ikey)) {
       return false;
     }
-    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       break;
     }
 
     if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
         range_del_agg_.ShouldDelete(
-            ikey, RangeDelPositioningMode::kBackwardTraversal)) {
+            ikey, RangeDelPositioningMode::kForwardTraversal)) {
       break;
     } else if (ikey.type == kTypeValue) {
       const Slice val = iter_->value();
@@ -1191,7 +1192,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
       return false;
     }
 
-    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
+    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
       return true;
     }
 
@@ -1246,30 +1247,25 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
 }
 
 bool DBIter::IsVisible(SequenceNumber sequence) {
-  return sequence <= MaxVisibleSequenceNumber() &&
-         (read_callback_ == nullptr || read_callback_->IsVisible(sequence));
-}
-
-bool DBIter::CanReseekToSkip() {
-  return read_callback_ == nullptr ||
-         read_callback_->MaxUnpreparedSequenceNumber() == 0;
-}
-
-SequenceNumber DBIter::MaxVisibleSequenceNumber() {
   if (read_callback_ == nullptr) {
-    return sequence_;
+    return sequence <= sequence_;
+  } else {
+    return read_callback_->IsVisible(sequence);
   }
+}
 
-  return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber());
+bool DBIter::CanReseekToSkip() {
+  return read_callback_ == nullptr || read_callback_->CanReseekToSkip();
 }
 
 void DBIter::Seek(const Slice& target) {
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
   StopWatch sw(env_, statistics_, DB_SEEK);
   status_ = Status::OK();
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
 
-  SequenceNumber seq = MaxVisibleSequenceNumber();
+  SequenceNumber seq = sequence_;
   saved_key_.Clear();
   saved_key_.SetInternalKey(target, seq);
 
@@ -1280,8 +1276,8 @@ void DBIter::Seek(const Slice& target) {
 #endif  // ROCKSDB_LITE
 
   if (iterate_lower_bound_ != nullptr &&
-      user_comparator_->Compare(saved_key_.GetUserKey(),
-                                *iterate_lower_bound_) < 0) {
+      user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
+          0) {
     saved_key_.Clear();
     saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
   }
@@ -1321,6 +1317,7 @@ void DBIter::Seek(const Slice& target) {
 }
 
 void DBIter::SeekForPrev(const Slice& target) {
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
   StopWatch sw(env_, statistics_, DB_SEEK);
   status_ = Status::OK();
   ReleaseTempPinnedData();
@@ -1331,8 +1328,8 @@ void DBIter::SeekForPrev(const Slice& target) {
                             kValueTypeForSeekForPrev);
 
   if (iterate_upper_bound_ != nullptr &&
-      user_comparator_->Compare(saved_key_.GetUserKey(),
-                                *iterate_upper_bound_) >= 0) {
+      user_comparator_.Compare(saved_key_.GetUserKey(),
+                               *iterate_upper_bound_) >= 0) {
     saved_key_.Clear();
     saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
   }
@@ -1381,6 +1378,7 @@ void DBIter::SeekToFirst() {
     Seek(*iterate_lower_bound_);
     return;
   }
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
   // Don't use iter_::Seek() if we set a prefix extractor
   // because prefix seek will be used.
   if (prefix_extractor_ != nullptr && !total_order_seek_) {
@@ -1425,13 +1423,14 @@ void DBIter::SeekToLast() {
   if (iterate_upper_bound_ != nullptr) {
     // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
     SeekForPrev(*iterate_upper_bound_);
-    if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) {
+    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
       ReleaseTempPinnedData();
       PrevInternal();
     }
     return;
   }
 
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
   // Don't use iter_::Seek() if we set a prefix extractor
   // because prefix seek will be used.
   if (prefix_extractor_ != nullptr && !total_order_seek_) {
@@ -1482,7 +1481,7 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
 
 ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
 
-RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
+ReadRangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
   return db_iter_->GetRangeDelAggregator();
 }
 
@@ -1552,13 +1551,17 @@ Status ArenaWrappedDBIter::Refresh() {
     new (&arena_) Arena();
 
     SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
+    if (read_callback_) {
+      read_callback_->Refresh(latest_seq);
+    }
     Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
          latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
          cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
          allow_refresh_);
 
     InternalIterator* internal_iter = db_impl_->NewInternalIterator(
-        read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
+        read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
+        latest_seq);
     SetIterUnderDBIter(internal_iter);
   } else {
     db_iter_->set_sequence(latest_seq);