git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/forward_iterator.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / forward_iterator.cc
index 013af04e997fd2c6d03bd8ba8d947f9ca190286d..3fbc2cf47066ae409c19f4fdaa54f126e2c61df7 100644 (file)
@@ -33,11 +33,11 @@ namespace ROCKSDB_NAMESPACE {
 //     iter.Next()
 class ForwardLevelIterator : public InternalIterator {
  public:
-  ForwardLevelIterator(const ColumnFamilyData* const cfd,
-                       const ReadOptions& read_options,
-                       const std::vector<FileMetaData*>& files,
-                       const SliceTransform* prefix_extractor,
-                       bool allow_unprepared_value)
+  ForwardLevelIterator(
+      const ColumnFamilyData* const cfd, const ReadOptions& read_options,
+      const std::vector<FileMetaData*>& files,
+      const std::shared_ptr<const SliceTransform>& prefix_extractor,
+      bool allow_unprepared_value)
       : cfd_(cfd),
         read_options_(read_options),
         files_(files),
@@ -46,7 +46,9 @@ class ForwardLevelIterator : public InternalIterator {
         file_iter_(nullptr),
         pinned_iters_mgr_(nullptr),
         prefix_extractor_(prefix_extractor),
-        allow_unprepared_value_(allow_unprepared_value) {}
+        allow_unprepared_value_(allow_unprepared_value) {
+    status_.PermitUncheckedError();  // Allow uninitialized status through
+  }
 
   ~ForwardLevelIterator() override {
     // Reset current pointer
@@ -102,9 +104,7 @@ class ForwardLevelIterator : public InternalIterator {
     status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
     valid_ = false;
   }
-  bool Valid() const override {
-    return valid_;
-  }
+  bool Valid() const override { return valid_; }
   void SeekToFirst() override {
     assert(file_iter_ != nullptr);
     if (!status_.ok()) {
@@ -209,7 +209,8 @@ class ForwardLevelIterator : public InternalIterator {
   Status status_;
   InternalIterator* file_iter_;
   PinnedIteratorsManager* pinned_iters_mgr_;
-  const SliceTransform* prefix_extractor_;
+  // Kept alive by ForwardIterator::sv_->mutable_cf_options
+  const std::shared_ptr<const SliceTransform>& prefix_extractor_;
   const bool allow_unprepared_value_;
 };
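Aside on the prefix_extractor_ change above: the member is now a const reference to a shared_ptr owned elsewhere (sv_->mutable_cf_options), so it does not extend the extractor's lifetime on its own. A minimal standalone sketch of that idiom, with toy stand-in types rather than RocksDB's SliceTransform:

#include <cassert>
#include <memory>
#include <string>

// Stand-in for SliceTransform (hypothetical toy type).
struct Transform {
  std::string name;
};

// Stand-in for the owning options object (plays the role of mutable_cf_options).
struct Owner {
  std::shared_ptr<const Transform> prefix_extractor =
      std::make_shared<const Transform>(Transform{"fixed:8"});
};

// Holder keeps only a const reference to the owner's shared_ptr, so it adds no
// refcount of its own; it relies on Owner outliving it, which is exactly the
// invariant the comment in the diff documents.
struct Holder {
  explicit Holder(const std::shared_ptr<const Transform>& pe)
      : prefix_extractor_(pe) {}
  const std::shared_ptr<const Transform>& prefix_extractor_;
};

int main() {
  Owner owner;                            // owns the extractor
  Holder holder(owner.prefix_extractor);  // borrows; does not extend lifetime
  assert(holder.prefix_extractor_->name == "fixed:8");
  // If `owner` were destroyed (or its shared_ptr reassigned) before `holder`,
  // the reference would dangle; hence "kept alive by sv_->mutable_cf_options".
  return 0;
}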
 
@@ -238,12 +239,16 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
   if (sv_) {
     RebuildIterators(false);
   }
-}
 
-ForwardIterator::~ForwardIterator() {
-  Cleanup(true);
+  // immutable_status_ is a local aggregation of the
+  // status of the immutable Iterators.
+  // We have to PermitUncheckedError in case it is never
+  // used, otherwise it will fail ASSERT_STATUS_CHECKED.
+  immutable_status_.PermitUncheckedError();
 }
 
+ForwardIterator::~ForwardIterator() { Cleanup(true); }
+
 void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                 bool background_purge_on_iterator_cleanup) {
   if (sv->Unref()) {
@@ -275,13 +280,13 @@ struct SVCleanupParams {
   SuperVersion* sv;
   bool background_purge_on_iterator_cleanup;
 };
-}
+}  // anonymous namespace
 
 // Used in PinnedIteratorsManager to release pinned SuperVersion
 void ForwardIterator::DeferredSVCleanup(void* arg) {
   auto d = reinterpret_cast<SVCleanupParams*>(arg);
-  ForwardIterator::SVCleanup(
-    d->db, d->sv, d->background_purge_on_iterator_cleanup);
+  ForwardIterator::SVCleanup(d->db, d->sv,
+                             d->background_purge_on_iterator_cleanup);
   delete d;
 }
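The PermitUncheckedError() calls introduced above exist for RocksDB's ASSERT_STATUS_CHECKED builds, where destroying a Status that was never inspected trips an assertion. A simplified standalone model of that contract (a toy class, not RocksDB's actual Status):

#include <cassert>
#include <string>

// Toy status type modeling the ASSERT_STATUS_CHECKED contract: destroying a
// status that was never looked at is treated as a bug.
class ToyStatus {
 public:
  ToyStatus() = default;
  explicit ToyStatus(std::string msg) : ok_(false), msg_(std::move(msg)) {}
  ~ToyStatus() { assert(checked_ && "status destroyed without being checked"); }

  bool ok() const {
    checked_ = true;
    return ok_;
  }
  // Opt out of the check for statuses that may legitimately never be read,
  // e.g. an aggregate like immutable_status_ that is only consulted on some
  // code paths.
  void PermitUncheckedError() const { checked_ = true; }

 private:
  bool ok_ = true;
  std::string msg_;
  mutable bool checked_ = false;
};

int main() {
  ToyStatus read_result;
  if (!read_result.ok()) {  // inspected: no assertion on destruction
    return 1;
  }

  ToyStatus aggregate;               // may never be inspected on this path
  aggregate.PermitUncheckedError();  // so mark it checked up front
  return 0;
}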
 
@@ -342,7 +347,7 @@ void ForwardIterator::SeekToFirst() {
   } else if (immutable_status_.IsIncomplete()) {
     ResetIncompleteIterators();
   }
-  SeekInternal(Slice(), true);
+  SeekInternal(Slice(), true, false);
 }
 
 bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
@@ -360,48 +365,60 @@ void ForwardIterator::Seek(const Slice& internal_key) {
   } else if (immutable_status_.IsIncomplete()) {
     ResetIncompleteIterators();
   }
-  SeekInternal(internal_key, false);
+
+  SeekInternal(internal_key, false, false);
+  if (read_options_.async_io) {
+    SeekInternal(internal_key, false, true);
+  }
 }
 
+// In case of async_io, SeekInternal is called twice with seek_after_async_io
+// enabled in second call which only does seeking part to retrieve the blocks.
 void ForwardIterator::SeekInternal(const Slice& internal_key,
-                                   bool seek_to_first) {
+                                   bool seek_to_first,
+                                   bool seek_after_async_io) {
   assert(mutable_iter_);
   // mutable
-  seek_to_first ? mutable_iter_->SeekToFirst() :
-                  mutable_iter_->Seek(internal_key);
+  if (!seek_after_async_io) {
+    seek_to_first ? mutable_iter_->SeekToFirst()
+                  : mutable_iter_->Seek(internal_key);
+  }
 
   // immutable
   // TODO(ljin): NeedToSeekImmutable has negative impact on performance
   // if it turns to need to seek immutable often. We probably want to have
   // an option to turn it off.
-  if (seek_to_first || NeedToSeekImmutable(internal_key)) {
-    immutable_status_ = Status::OK();
-    if (has_iter_trimmed_for_upper_bound_ &&
-        (
-            // prev_ is not set yet
-            is_prev_set_ == false ||
-            // We are doing SeekToFirst() and internal_key.size() = 0
-            seek_to_first ||
-            // prev_key_ > internal_key
-            cfd_->internal_comparator().InternalKeyComparator::Compare(
-                prev_key_.GetInternalKey(), internal_key) > 0)) {
-      // Some iterators are trimmed. Need to rebuild.
-      RebuildIterators(true);
-      // Already seeked mutable iter, so seek again
-      seek_to_first ? mutable_iter_->SeekToFirst()
-                    : mutable_iter_->Seek(internal_key);
-    }
-    {
-      auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
-      immutable_min_heap_.swap(tmp);
-    }
-    for (size_t i = 0; i < imm_iters_.size(); i++) {
-      auto* m = imm_iters_[i];
-      seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
-      if (!m->status().ok()) {
-        immutable_status_ = m->status();
-      } else if (m->Valid()) {
-        immutable_min_heap_.push(m);
+  if (seek_to_first || seek_after_async_io ||
+      NeedToSeekImmutable(internal_key)) {
+    if (!seek_after_async_io) {
+      immutable_status_ = Status::OK();
+      if (has_iter_trimmed_for_upper_bound_ &&
+          (
+              // prev_ is not set yet
+              is_prev_set_ == false ||
+              // We are doing SeekToFirst() and internal_key.size() = 0
+              seek_to_first ||
+              // prev_key_ > internal_key
+              cfd_->internal_comparator().InternalKeyComparator::Compare(
+                  prev_key_.GetInternalKey(), internal_key) > 0)) {
+        // Some iterators are trimmed. Need to rebuild.
+        RebuildIterators(true);
+        // Already seeked mutable iter, so seek again
+        seek_to_first ? mutable_iter_->SeekToFirst()
+                      : mutable_iter_->Seek(internal_key);
+      }
+      {
+        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
+        immutable_min_heap_.swap(tmp);
+      }
+      for (size_t i = 0; i < imm_iters_.size(); i++) {
+        auto* m = imm_iters_[i];
+        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
+        if (!m->status().ok()) {
+          immutable_status_ = m->status();
+        } else if (m->Valid()) {
+          immutable_min_heap_.push(m);
+        }
       }
     }
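A toy model of the two-phase seek described in the comment above, using hypothetical names and no RocksDB types: the first pass issues every seek and tolerates sources whose status is TryAgain because their block read was only submitted, and the second pass re-seeks exactly those sources once the reads can complete.

#include <cstdio>
#include <string>
#include <vector>

// Hypothetical data source modeling an SST-backed iterator under async_io:
// the first Seek() only submits the read and reports TryAgain; the second
// Seek() finds the data available and succeeds.
struct AsyncSource {
  std::string name;
  bool needs_io = true;    // block not in cache yet
  bool try_again = false;  // status after the last Seek()
  bool valid = false;

  void Seek(const std::string& /*key*/) {
    if (needs_io) {
      needs_io = false;  // read submitted asynchronously
      try_again = true;  // caller must come back later
      valid = false;
    } else {
      try_again = false;  // data available now
      valid = true;
    }
  }
};

// First pass: seek everything, tolerate TryAgain. Second pass: re-seek only
// the sources that reported TryAgain, then the caller can pick the current
// entry.
void SeekAll(std::vector<AsyncSource>& sources, const std::string& key,
             bool second_pass) {
  for (auto& s : sources) {
    if (second_pass && !s.try_again) {
      continue;  // already positioned in the first pass
    }
    s.Seek(key);
  }
}

int main() {
  std::vector<AsyncSource> sources(2);
  sources[0].name = "L0-file-1";
  sources[1].name = "L0-file-2";
  const std::string key = "k42";

  SeekAll(sources, key, /*second_pass=*/false);  // submits the reads
  SeekAll(sources, key, /*second_pass=*/true);   // completes the positioning
  // Only now (the analogue of UpdateCurrent) are all sources usable.
  for (const auto& s : sources) {
    std::printf("%s valid=%d\n", s.name.c_str(), s.valid ? 1 : 0);
  }
  return 0;
}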
 
@@ -415,12 +432,19 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
       if (!l0_iters_[i]) {
         continue;
       }
+      if (seek_after_async_io) {
+        if (!l0_iters_[i]->status().IsTryAgain()) {
+          continue;
+        }
+      }
+
       if (seek_to_first) {
         l0_iters_[i]->SeekToFirst();
       } else {
-        // If the target key passes over the larget key, we are sure Next()
+        // If the target key passes over the largest key, we are sure Next()
         // won't go over this file.
-        if (user_comparator_->Compare(target_user_key,
+        if (seek_after_async_io == false &&
+            user_comparator_->Compare(target_user_key,
                                       l0[i]->largest.user_key()) > 0) {
           if (read_options_.iterate_upper_bound != nullptr) {
             has_iter_trimmed_for_upper_bound_ = true;
@@ -432,7 +456,10 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
         l0_iters_[i]->Seek(internal_key);
       }
 
-      if (!l0_iters_[i]->status().ok()) {
+      if (l0_iters_[i]->status().IsTryAgain()) {
+        assert(!seek_after_async_io);
+        continue;
+      } else if (!l0_iters_[i]->status().ok()) {
         immutable_status_ = l0_iters_[i]->status();
       } else if (l0_iters_[i]->Valid() &&
                  !IsOverUpperBound(l0_iters_[i]->key())) {
@@ -453,19 +480,30 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
       if (level_iters_[level - 1] == nullptr) {
         continue;
       }
+
+      if (seek_after_async_io) {
+        if (!level_iters_[level - 1]->status().IsTryAgain()) {
+          continue;
+        }
+      }
       uint32_t f_idx = 0;
-      if (!seek_to_first) {
+      if (!seek_to_first && !seek_after_async_io) {
         f_idx = FindFileInRange(level_files, internal_key, 0,
                                 static_cast<uint32_t>(level_files.size()));
       }
 
       // Seek
-      if (f_idx < level_files.size()) {
-        level_iters_[level - 1]->SetFileIndex(f_idx);
-        seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
-                        level_iters_[level - 1]->Seek(internal_key);
+      if (seek_after_async_io || f_idx < level_files.size()) {
+        if (!seek_after_async_io) {
+          level_iters_[level - 1]->SetFileIndex(f_idx);
+        }
+        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
+                      : level_iters_[level - 1]->Seek(internal_key);
 
-        if (!level_iters_[level - 1]->status().ok()) {
+        if (level_iters_[level - 1]->status().IsTryAgain()) {
+          assert(!seek_after_async_io);
+          continue;
+        } else if (!level_iters_[level - 1]->status().ok()) {
           immutable_status_ = level_iters_[level - 1]->status();
         } else if (level_iters_[level - 1]->Valid() &&
                    !IsOverUpperBound(level_iters_[level - 1]->key())) {
@@ -493,7 +531,11 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
     immutable_min_heap_.push(current_);
   }
 
-  UpdateCurrent();
+  // For async_io, it should be updated when seek_after_async_io is true (in
+  // second call).
+  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
+    UpdateCurrent();
+  }
   TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
 }
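For context on immutable_min_heap_ and the deferred UpdateCurrent() above, a minimal sketch of the underlying idea, with a toy source type in place of the real iterators: valid sources are pushed into a min-heap keyed by their current entry, and the next result is always the heap top.

#include <cstdio>
#include <queue>
#include <string>
#include <vector>

// Toy sorted source standing in for a memtable or SST iterator.
struct ToySource {
  std::vector<std::string> keys;  // sorted ascending
  size_t pos = 0;
  bool Valid() const { return pos < keys.size(); }
  const std::string& key() const { return keys[pos]; }
  void Next() { ++pos; }
};

int main() {
  std::vector<ToySource> sources(3);
  sources[0].keys = {"a", "d", "g"};
  sources[1].keys = {"b", "e"};
  sources[2].keys = {"c", "f"};

  // Min-heap over sources ordered by their current key, the same role
  // MinIterHeap plays for the immutable iterators.
  auto cmp = [](const ToySource* a, const ToySource* b) {
    return a->key() > b->key();  // smallest key on top
  };
  std::priority_queue<ToySource*, std::vector<ToySource*>, decltype(cmp)> heap(
      cmp);

  for (auto& s : sources) {
    if (s.Valid()) heap.push(&s);
  }

  // Analogue of UpdateCurrent: repeatedly take the source with the smallest
  // key, advance it, and re-insert it while it remains valid.
  while (!heap.empty()) {
    ToySource* cur = heap.top();
    heap.pop();
    std::printf("%s\n", cur->key().c_str());
    cur->Next();
    if (cur->Valid()) heap.push(cur);
  }
  return 0;
}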
 
@@ -501,8 +543,7 @@ void ForwardIterator::Next() {
   assert(valid_);
   bool update_prev_key = false;
 
-  if (sv_ == nullptr ||
-      sv_->version_number != cfd_->GetSuperVersionNumber()) {
+  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
     std::string current_key = key().ToString();
     Slice old_key(current_key.data(), current_key.size());
 
@@ -511,7 +552,12 @@ void ForwardIterator::Next() {
     } else {
       RenewIterators();
     }
-    SeekInternal(old_key, false);
+
+    SeekInternal(old_key, false, false);
+    if (read_options_.async_io) {
+      SeekInternal(old_key, false, true);
+    }
+
     if (!valid_ || key().compare(old_key) != 0) {
       return;
     }
@@ -527,7 +573,6 @@ void ForwardIterator::Next() {
       update_prev_key = true;
     }
 
-
     if (update_prev_key) {
       prev_key_.SetInternalKey(current_->key());
       is_prev_set_ = true;
@@ -584,7 +629,7 @@ bool ForwardIterator::PrepareValue() {
 
   assert(!current_->Valid());
   assert(!current_->status().ok());
-  assert(current_ != mutable_iter_); // memtable iterator can't fail
+  assert(current_ != mutable_iter_);  // memtable iterator can't fail
   assert(immutable_status_.ok());
 
   valid_ = false;
@@ -595,7 +640,7 @@ bool ForwardIterator::PrepareValue() {
 Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
   assert(prop != nullptr);
   if (prop_name == "rocksdb.iterator.super-version-number") {
-    *prop = ToString(sv_->version_number);
+    *prop = std::to_string(sv_->version_number);
     return Status::OK();
   }
   return Status::InvalidArgument();
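The property handled above is reachable through the public Iterator::GetProperty() API. A hedged usage sketch, with an illustrative DB path, assuming a tailing iterator so that ForwardIterator sits behind the returned iterator:

#include <cassert>
#include <cstdio>
#include <memory>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/forward_iter_demo", &db);
  assert(s.ok());

  rocksdb::ReadOptions ropts;
  ropts.tailing = true;  // tailing iterators are backed by ForwardIterator
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ropts));

  std::string sv_number;
  s = it->GetProperty("rocksdb.iterator.super-version-number", &sv_number);
  if (s.ok()) {
    // Changes whenever the column family's SuperVersion changes (flush,
    // compaction, option change), which is what triggers the Renew/Rebuild
    // paths seen elsewhere in this file.
    std::printf("super-version-number: %s\n", sv_number.c_str());
  }

  it.reset();
  delete db;
  return 0;
}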
@@ -659,7 +704,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
   if (!read_options_.ignore_range_deletions) {
     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
         sv_->mem->NewRangeTombstoneIterator(
-            read_options_, sv_->current->version_set()->LastSequence()));
+            read_options_, sv_->current->version_set()->LastSequence(),
+            false /* immutable_memtable */));
     range_del_agg.AddTombstones(std::move(range_del_iter));
     // Always return Status::OK().
     Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
@@ -684,7 +730,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
     l0_iters_.push_back(cfd_->table_cache()->NewIterator(
         read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
         read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
-        sv_->mutable_cf_options.prefix_extractor.get(),
+        sv_->mutable_cf_options.prefix_extractor,
         /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
         TableReaderCaller::kUserIterator, /*arena=*/nullptr,
         /*skip_filters=*/false, /*level=*/-1,
@@ -692,7 +738,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
         /*smallest_compaction_key=*/nullptr,
         /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
   }
-  BuildLevelIterators(vstorage);
+  BuildLevelIterators(vstorage, sv_);
   current_ = nullptr;
   is_prev_set_ = false;
 
@@ -724,7 +770,8 @@ void ForwardIterator::RenewIterators() {
   if (!read_options_.ignore_range_deletions) {
     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
         svnew->mem->NewRangeTombstoneIterator(
-            read_options_, sv_->current->version_set()->LastSequence()));
+            read_options_, sv_->current->version_set()->LastSequence(),
+            false /* immutable_memtable */));
     range_del_agg.AddTombstones(std::move(range_del_iter));
     // Always return Status::OK().
     Status temp_s = svnew->imm->AddRangeTombstoneIterators(
@@ -764,7 +811,7 @@ void ForwardIterator::RenewIterators() {
         read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
         *l0_files_new[inew],
         read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
-        svnew->mutable_cf_options.prefix_extractor.get(),
+        svnew->mutable_cf_options.prefix_extractor,
         /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
         TableReaderCaller::kUserIterator, /*arena=*/nullptr,
         /*skip_filters=*/false, /*level=*/-1,
@@ -783,7 +830,7 @@ void ForwardIterator::RenewIterators() {
     DeleteIterator(l);
   }
   level_iters_.clear();
-  BuildLevelIterators(vstorage_new);
+  BuildLevelIterators(vstorage_new, svnew);
   current_ = nullptr;
   is_prev_set_ = false;
   SVCleanup();
@@ -797,7 +844,8 @@ void ForwardIterator::RenewIterators() {
   }
 }
 
-void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
+void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
+                                          SuperVersion* sv) {
   level_iters_.reserve(vstorage->num_levels() - 1);
   for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
     const auto& level_files = vstorage->LevelFiles(level);
@@ -813,8 +861,7 @@ void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
     } else {
       level_iters_.push_back(new ForwardLevelIterator(
           cfd_, read_options_, level_files,
-          sv_->mutable_cf_options.prefix_extractor.get(),
-          allow_unprepared_value_));
+          sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
     }
   }
 }
@@ -830,7 +877,7 @@ void ForwardIterator::ResetIncompleteIterators() {
     l0_iters_[i] = cfd_->table_cache()->NewIterator(
         read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
         *l0_files[i], /*range_del_agg=*/nullptr,
-        sv_->mutable_cf_options.prefix_extractor.get(),
+        sv_->mutable_cf_options.prefix_extractor,
         /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
         TableReaderCaller::kUserIterator, /*arena=*/nullptr,
         /*skip_filters=*/false, /*level=*/-1,
@@ -897,11 +944,11 @@ bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
   }
   Slice prev_key = prev_key_.GetInternalKey();
   if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
-    prefix_extractor_->Transform(prev_key)) != 0) {
+                               prefix_extractor_->Transform(prev_key)) != 0) {
     return true;
   }
   if (cfd_->internal_comparator().InternalKeyComparator::Compare(
-        prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
+          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
     return true;
   }
 
@@ -910,8 +957,8 @@ bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
     return false;
   }
   if (cfd_->internal_comparator().InternalKeyComparator::Compare(
-        target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
-                                          : current_->key()) > 0) {
+          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
+                                            : current_->key()) > 0) {
     return true;
   }
   return false;
@@ -985,13 +1032,13 @@ bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
 uint32_t ForwardIterator::FindFileInRange(
     const std::vector<FileMetaData*>& files, const Slice& internal_key,
     uint32_t left, uint32_t right) {
-  auto cmp = [&](const FileMetaData* f, const Slice& key) -> bool {
+  auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
     return cfd_->internal_comparator().InternalKeyComparator::Compare(
-            f->largest.Encode(), key) < 0;
+               f->largest.Encode(), k) < 0;
   };
-  const auto &b = files.begin();
-  return static_cast<uint32_t>(std::lower_bound(b + left,
-                                 b + right, internal_key, cmp) - b);
+  const auto& b = files.begin();
+  return static_cast<uint32_t>(
+      std::lower_bound(b + left, b + right, internal_key, cmp) - b);
 }
 
 void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
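The FindFileInRange() cleanup above is a plain std::lower_bound over files sorted by largest key. A standalone sketch with strings standing in for FileMetaData and the internal key comparator:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stand-in for FileMetaData: only the largest key matters for the search.
struct ToyFile {
  std::string largest;
};

// Returns the index of the first file in [left, right) whose largest key is
// >= the target, i.e. the first file that could contain the target.
uint32_t FindFileInRange(const std::vector<ToyFile>& files,
                         const std::string& target, uint32_t left,
                         uint32_t right) {
  auto cmp = [](const ToyFile& f, const std::string& k) {
    return f.largest < k;  // analogue of comparing f->largest.Encode() to k
  };
  const auto b = files.begin();
  return static_cast<uint32_t>(
      std::lower_bound(b + left, b + right, target, cmp) - b);
}

int main() {
  std::vector<ToyFile> level = {{"b"}, {"d"}, {"f"}, {"h"}};
  assert(FindFileInRange(level, "a", 0, 4) == 0);  // before every file
  assert(FindFileInRange(level, "d", 0, 4) == 1);  // equal to a largest key
  assert(FindFileInRange(level, "e", 0, 4) == 2);  // falls between files
  assert(FindFileInRange(level, "z", 0, 4) == 4);  // past the last file
  return 0;
}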