]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_iter.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_iter.cc
index f619d64f803da1a8160e993428206b9a40b80ba9..701d0d8fcaf9a3d58874cd9b00374ee960cda858 100644 (file)
@@ -1,16 +1,15 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/db_iter.h"
-#include <stdexcept>
-#include <deque>
 #include <string>
+#include <iostream>
 #include <limits>
 
 #include "db/dbformat.h"
@@ -18,7 +17,6 @@
 #include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
 #include "monitoring/perf_context_imp.h"
-#include "port/port.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/merge_operator.h"
@@ -29,6 +27,7 @@
 #include "util/logging.h"
 #include "util/mutexlock.h"
 #include "util/string_util.h"
+#include "util/trace_replay.h"
 
 namespace rocksdb {
 
@@ -50,12 +49,17 @@ static void DumpInternalIter(Iterator* iter) {
 // combines multiple entries for the same userkey found in the DB
 // representation into a single entry while accounting for sequence
 // numbers, deletion markers, overwrites, etc.
-class DBIter: public Iterator {
+class DBIter final: public Iterator {
  public:
   // The following is grossly complicated. TODO: clean it up
   // Which direction is the iterator currently moving?
-  // (1) When moving forward, the internal iterator is positioned at
-  //     the exact entry that yields this->key(), this->value()
+  // (1) When moving forward:
+  //   (1a) if current_entry_is_merged_ = false, the internal iterator is
+  //        positioned at the exact entry that yields this->key(), this->value()
+  //   (1b) if current_entry_is_merged_ = true, the internal iterator is
+  //        positioned immediately after the last entry that contributed to the
+  //        current this->value(). That entry may or may not have key equal to
+  //        this->key().
   // (2) When moving backwards, the internal iterator is positioned
   //     just before all entries whose user key == this->key().
   enum Direction {
@@ -78,6 +82,7 @@ class DBIter: public Iterator {
       prev_count_ = 0;
       prev_found_count_ = 0;
       bytes_read_ = 0;
+      skip_count_ = 0;
     }
 
     void BumpGlobalStatistics(Statistics* global_statistics) {
@@ -86,6 +91,8 @@ class DBIter: public Iterator {
       RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
       RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
       RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
+      RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
+      PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
       ResetCounters();
     }
 
@@ -99,14 +106,19 @@ class DBIter: public Iterator {
     uint64_t prev_found_count_;
     // Map to Tickers::ITER_BYTES_READ
     uint64_t bytes_read_;
+    // Map to Tickers::NUMBER_ITER_SKIP
+    uint64_t skip_count_;
   };
 
-  DBIter(Env* env, const ReadOptions& read_options,
-         const ImmutableCFOptions& cf_options, const Comparator* cmp,
+  DBIter(Env* _env, const ReadOptions& read_options,
+         const ImmutableCFOptions& cf_options,
+         const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
          InternalIterator* iter, SequenceNumber s, bool arena_mode,
-         uint64_t max_sequential_skip_in_iterations, uint64_t version_number)
+         uint64_t max_sequential_skip_in_iterations,
+         ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
+         bool allow_blob)
       : arena_mode_(arena_mode),
-        env_(env),
+        env_(_env),
         logger_(cf_options.info_log),
         user_comparator_(cmp),
         merge_operator_(cf_options.merge_operator),
@@ -116,15 +128,22 @@ class DBIter: public Iterator {
         valid_(false),
         current_entry_is_merged_(false),
         statistics_(cf_options.statistics),
-        version_number_(version_number),
+        num_internal_keys_skipped_(0),
+        iterate_lower_bound_(read_options.iterate_lower_bound),
         iterate_upper_bound_(read_options.iterate_upper_bound),
         prefix_same_as_start_(read_options.prefix_same_as_start),
         pin_thru_lifetime_(read_options.pin_data),
         total_order_seek_(read_options.total_order_seek),
         range_del_agg_(cf_options.internal_comparator, s,
-                       true /* collapse_deletions */) {
+                       true /* collapse_deletions */),
+        read_callback_(read_callback),
+        db_impl_(db_impl),
+        cfd_(cfd),
+        allow_blob_(allow_blob),
+        is_blob_(false),
+        start_seqnum_(read_options.iter_start_seqnum) {
     RecordTick(statistics_, NO_ITERATORS);
-    prefix_extractor_ = cf_options.prefix_extractor;
+    prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
     max_skip_ = max_sequential_skip_in_iterations;
     max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
     if (pin_thru_lifetime_) {
@@ -139,7 +158,10 @@ class DBIter: public Iterator {
     if (pinned_iters_mgr_.PinningEnabled()) {
       pinned_iters_mgr_.ReleasePinnedData();
     }
-    RecordTick(statistics_, NO_ITERATORS, -1);
+    // Compiler warning issue filed:
+    // https://github.com/facebook/rocksdb/issues/3013
+    RecordTick(statistics_, NO_ITERATORS, uint64_t(-1));
+    ResetInternalKeysSkippedCounter();
     local_stats_.BumpGlobalStatistics(statistics_);
     if (!arena_mode_) {
       delete iter_;
@@ -159,7 +181,12 @@ class DBIter: public Iterator {
   virtual bool Valid() const override { return valid_; }
   virtual Slice key() const override {
     assert(valid_);
-    return saved_key_.GetUserKey();
+    if(start_seqnum_ > 0) {
+      return saved_key_.GetInternalKey();
+    } else {
+      return saved_key_.GetUserKey();
+    }
+
   }
   virtual Slice value() const override {
     assert(valid_);
@@ -177,9 +204,14 @@ class DBIter: public Iterator {
     if (status_.ok()) {
       return iter_->status();
     } else {
+      assert(!valid_);
       return status_;
     }
   }
+  bool IsBlob() const {
+    assert(valid_ && (allow_blob_ || !is_blob_));
+    return is_blob_;
+  }
 
   virtual Status GetProperty(std::string prop_name,
                              std::string* prop) override {
@@ -188,10 +220,7 @@ class DBIter: public Iterator {
     }
     if (prop_name == "rocksdb.iterator.super-version-number") {
       // First try to pass the value returned from inner iterator.
-      if (!iter_->GetProperty(prop_name, prop).ok()) {
-        *prop = ToString(version_number_);
-      }
-      return Status::OK();
+      return iter_->GetProperty(prop_name, prop);
     } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
       if (valid_) {
         *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
@@ -199,6 +228,9 @@ class DBIter: public Iterator {
         *prop = "Iterator is not valid.";
       }
       return Status::OK();
+    } else if (prop_name == "rocksdb.iterator.internal-key") {
+      *prop = saved_key_.GetUserKey().ToString();
+      return Status::OK();
     }
     return Status::InvalidArgument("Undentified property.");
   }
@@ -209,21 +241,40 @@ class DBIter: public Iterator {
   virtual void SeekForPrev(const Slice& target) override;
   virtual void SeekToFirst() override;
   virtual void SeekToLast() override;
+  Env* env() { return env_; }
+  void set_sequence(uint64_t s) { sequence_ = s; }
+  void set_valid(bool v) { valid_ = v; }
 
  private:
-  void ReverseToForward();
-  void ReverseToBackward();
-  void PrevInternal();
-  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
+  // For all methods in this block:
+  // PRE: iter_->Valid() && status_.ok()
+  // Return false if there was an error, and status() is non-ok, valid_ = false;
+  // in this case callers would usually stop what they were doing and return.
+  bool ReverseToForward();
+  bool ReverseToBackward();
   bool FindValueForCurrentKey();
   bool FindValueForCurrentKeyUsingSeek();
-  void FindPrevUserKey();
-  void FindNextUserKey();
-  inline void FindNextUserEntry(bool skipping, bool prefix_check);
-  void FindNextUserEntryInternal(bool skipping, bool prefix_check);
+  bool FindUserKeyBeforeSavedKey();
+  inline bool FindNextUserEntry(bool skipping, bool prefix_check);
+  bool FindNextUserEntryInternal(bool skipping, bool prefix_check);
   bool ParseKey(ParsedInternalKey* key);
-  void MergeValuesNewToOld();
+  bool MergeValuesNewToOld();
+
+  void PrevInternal();
   bool TooManyInternalKeysSkipped(bool increment = true);
+  bool IsVisible(SequenceNumber sequence);
+
+  // CanReseekToSkip() returns whether the iterator can use the optimization
+  // where it reseek by sequence number to get the next key when there are too
+  // many versions. This is disabled for write unprepared because seeking to
+  // sequence number does not guarantee that it is visible.
+  inline bool CanReseekToSkip();
+
+  // MaxVisibleSequenceNumber() returns the maximum visible sequence number
+  // for this snapshot. This sequence number may be greater than snapshot
+  // seqno because uncommitted data written to DB for write unprepared will
+  // have a higher sequence number.
+  inline SequenceNumber MaxVisibleSequenceNumber();
 
   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
   // is called
@@ -250,6 +301,10 @@ class DBIter: public Iterator {
   }
 
   inline void ResetInternalKeysSkippedCounter() {
+    local_stats_.skip_count_ += num_internal_keys_skipped_;
+    if (valid_) {
+      local_stats_.skip_count_--;
+    }
     num_internal_keys_skipped_ = 0;
   }
 
@@ -260,10 +315,14 @@ class DBIter: public Iterator {
   const Comparator* const user_comparator_;
   const MergeOperator* const merge_operator_;
   InternalIterator* iter_;
-  SequenceNumber const sequence_;
+  SequenceNumber sequence_;
 
   Status status_;
   IterKey saved_key_;
+  // Reusable internal key data structure. This is only used inside one function
+  // and should not be used across functions. Reusing this object can reduce
+  // overhead of calling construction of the function if creating it each time.
+  ParsedInternalKey ikey_;
   std::string saved_value_;
   Slice pinned_value_;
   Direction direction_;
@@ -274,7 +333,7 @@ class DBIter: public Iterator {
   uint64_t max_skip_;
   uint64_t max_skippable_internal_keys_;
   uint64_t num_internal_keys_skipped_;
-  uint64_t version_number_;
+  const Slice* iterate_lower_bound_;
   const Slice* iterate_upper_bound_;
   IterKey prefix_start_buf_;
   Slice prefix_start_key_;
@@ -288,6 +347,14 @@ class DBIter: public Iterator {
   RangeDelAggregator range_del_agg_;
   LocalStatistics local_stats_;
   PinnedIteratorsManager pinned_iters_mgr_;
+  ReadCallback* read_callback_;
+  DBImpl* db_impl_;
+  ColumnFamilyData* cfd_;
+  bool allow_blob_;
+  bool is_blob_;
+  // for diff snapshots we want the lower bound on the seqnum;
+  // if this value > 0 iterator will return internal keys
+  SequenceNumber start_seqnum_;
 
   // No copying allowed
   DBIter(const DBIter&);
@@ -297,6 +364,7 @@ class DBIter: public Iterator {
 inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
   if (!ParseInternalKey(iter_->key(), ikey)) {
     status_ = Status::Corruption("corrupted internal key in DBIter");
+    valid_ = false;
     ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
                     iter_->key().ToString(true).c_str());
     return false;
@@ -307,12 +375,16 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
 
 void DBIter::Next() {
   assert(valid_);
+  assert(status_.ok());
 
   // Release temporarily pinned blocks from last operation
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
+  bool ok = true;
   if (direction_ == kReverse) {
-    ReverseToForward();
+    if (!ReverseToForward()) {
+      ok = false;
+    }
   } else if (iter_->Valid() && !current_entry_is_merged_) {
     // If the current value is not a merge, the iter position is the
     // current key, which is already returned. We can safely issue a
@@ -326,13 +398,12 @@ void DBIter::Next() {
   if (statistics_ != nullptr) {
     local_stats_.next_count_++;
   }
-  // Now we point to the next internal position, for both of merge and
-  // not merge cases.
-  if (!iter_->Valid()) {
+  if (ok && iter_->Valid()) {
+    FindNextUserEntry(true /* skipping the current user key */,
+                      prefix_same_as_start_);
+  } else {
     valid_ = false;
-    return;
   }
-  FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
   if (statistics_ != nullptr && valid_) {
     local_stats_.next_found_count_++;
     local_stats_.bytes_read_ += (key().size() + value().size());
@@ -353,15 +424,16 @@ void DBIter::Next() {
 // keys against the prefix of the seeked key. Set to false when
 // performing a seek without a key (e.g. SeekToFirst). Set to
 // prefix_same_as_start_ for other iterations.
-inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
+inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
   PERF_TIMER_GUARD(find_next_user_entry_time);
-  FindNextUserEntryInternal(skipping, prefix_check);
+  return FindNextUserEntryInternal(skipping, prefix_check);
 }
 
 // Actual implementation of DBIter::FindNextUserEntry()
-void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
+bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
   // Loop until we hit an acceptable entry to yield
   assert(iter_->Valid());
+  assert(status_.ok());
   assert(direction_ == kForward);
   current_entry_is_merged_ = false;
 
@@ -377,73 +449,112 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
   //  - none of the above  : saved_key_ can contain anything, it doesn't matter.
   uint64_t num_skipped = 0;
 
-  do {
-    ParsedInternalKey ikey;
+  is_blob_ = false;
 
-    if (!ParseKey(&ikey)) {
-      // Skip corrupted keys.
-      iter_->Next();
-      continue;
+  do {
+    if (!ParseKey(&ikey_)) {
+      return false;
     }
 
     if (iterate_upper_bound_ != nullptr &&
-        user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
+        user_comparator_->Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
       break;
     }
 
     if (prefix_extractor_ && prefix_check &&
-        prefix_extractor_->Transform(ikey.user_key)
-          .compare(prefix_start_key_) != 0) {
+        prefix_extractor_->Transform(ikey_.user_key)
+                .compare(prefix_start_key_) != 0) {
       break;
     }
 
     if (TooManyInternalKeysSkipped()) {
-      return;
+      return false;
     }
 
-    if (ikey.sequence <= sequence_) {
-      if (skipping &&
-          user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
-              0) {
+    if (IsVisible(ikey_.sequence)) {
+      if (skipping && user_comparator_->Compare(ikey_.user_key,
+                                                saved_key_.GetUserKey()) <= 0) {
         num_skipped++;  // skip this entry
         PERF_COUNTER_ADD(internal_key_skipped_count, 1);
       } else {
         num_skipped = 0;
-        switch (ikey.type) {
+        switch (ikey_.type) {
           case kTypeDeletion:
           case kTypeSingleDeletion:
             // Arrange to skip all upcoming entries for this key since
             // they are hidden by this deletion.
-            saved_key_.SetUserKey(
-                ikey.user_key,
-                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-            skipping = true;
-            PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-            break;
-          case kTypeValue:
-            saved_key_.SetUserKey(
-                ikey.user_key,
-                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
-            if (range_del_agg_.ShouldDelete(
-                    ikey, RangeDelAggregator::RangePositioningMode::
-                              kForwardTraversal)) {
-              // Arrange to skip all upcoming entries for this key since
-              // they are hidden by this deletion.
+            // if iterator specified start_seqnum we
+            // 1) return internal key, including the type
+            // 2) return ikey only if ikey.seqnum >= start_seqnum_
+            // note that if deletion seqnum is < start_seqnum_ we
+            // just skip it like in normal iterator.
+            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_)  {
+              saved_key_.SetInternalKey(ikey_);
+              valid_ = true;
+              return true;
+            } else {
+              saved_key_.SetUserKey(
+                ikey_.user_key,
+                !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
               skipping = true;
-              num_skipped = 0;
               PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+            }
+            break;
+          case kTypeValue:
+          case kTypeBlobIndex:
+            if (start_seqnum_ > 0) {
+              // we are taking incremental snapshot here
+              // incremental snapshots aren't supported on DB with range deletes
+              assert(!(
+                (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0)
+              ));
+              if (ikey_.sequence >= start_seqnum_) {
+                saved_key_.SetInternalKey(ikey_);
+                valid_ = true;
+                return true;
+              } else {
+                // this key and all previous versions shouldn't be included,
+                // skipping
+                saved_key_.SetUserKey(ikey_.user_key,
+                  !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
+                skipping = true;
+              }
             } else {
-              valid_ = true;
-              return;
+              saved_key_.SetUserKey(
+                  ikey_.user_key,
+                  !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
+              if (range_del_agg_.ShouldDelete(
+                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
+                // Arrange to skip all upcoming entries for this key since
+                // they are hidden by this deletion.
+                skipping = true;
+                num_skipped = 0;
+                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+              } else if (ikey_.type == kTypeBlobIndex) {
+                if (!allow_blob_) {
+                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+                  status_ = Status::NotSupported(
+                      "Encounter unexpected blob index. Please open DB with "
+                      "rocksdb::blob_db::BlobDB instead.");
+                  valid_ = false;
+                  return false;
+                }
+
+                is_blob_ = true;
+                valid_ = true;
+                return true;
+              } else {
+                valid_ = true;
+                return true;
+              }
             }
             break;
           case kTypeMerge:
             saved_key_.SetUserKey(
-                ikey.user_key,
-                !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
+                ikey_.user_key,
+                !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
             if (range_del_agg_.ShouldDelete(
-                    ikey, RangeDelAggregator::RangePositioningMode::
-                              kForwardTraversal)) {
+                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
               // Arrange to skip all upcoming entries for this key since
               // they are hidden by this deletion.
               skipping = true;
@@ -454,8 +565,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
               // value
               current_entry_is_merged_ = true;
               valid_ = true;
-              MergeValuesNewToOld();  // Go to a different state machine
-              return;
+              return MergeValuesNewToOld();  // Go to a different state machine
             }
             break;
           default:
@@ -464,17 +574,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
         }
       }
     } else {
-      // This key was inserted after our snapshot was taken.
       PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
 
-      // Here saved_key_ may contain some old key, or the default empty key, or
-      // key assigned by some random other method. We don't care.
-      if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
-          0) {
+      // This key was inserted after our snapshot was taken.
+      // If this happens too many times in a row for the same user key, we want
+      // to seek to the target sequence number.
+      int cmp =
+          user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
+      if (cmp == 0 || (skipping && cmp <= 0)) {
         num_skipped++;
       } else {
         saved_key_.SetUserKey(
-            ikey.user_key,
+            ikey_.user_key,
             !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
         skipping = false;
         num_skipped = 0;
@@ -483,7 +594,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
 
     // If we have sequentially iterated via numerous equal keys, then it's
     // better to seek so that we can avoid too many key comparisons.
-    if (num_skipped > max_skip_) {
+    if (num_skipped > max_skip_ && CanReseekToSkip()) {
       num_skipped = 0;
       std::string last_key;
       if (skipping) {
@@ -510,7 +621,9 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
       iter_->Next();
     }
   } while (iter_->Valid());
+
   valid_ = false;
+  return iter_->status().ok();
 }
 
 // Merge values of the same user key starting from the current iter_ position
@@ -519,12 +632,12 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
 //      saved_key_ stores the user key
 // POST: saved_value_ has the merged value for the user key
 //       iter_ points to the next entry (or invalid)
-void DBIter::MergeValuesNewToOld() {
+bool DBIter::MergeValuesNewToOld() {
   if (!merge_operator_) {
     ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
     status_ = Status::InvalidArgument("merge_operator_ must be set.");
     valid_ = false;
-    return;
+    return false;
   }
 
   // Temporarily pin the blocks that hold merge operands
@@ -533,13 +646,14 @@ void DBIter::MergeValuesNewToOld() {
   // Start the merge process by pushing the first operand
   merge_context_.PushOperand(iter_->value(),
                              iter_->IsValuePinned() /* operand_pinned */);
+  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
 
   ParsedInternalKey ikey;
   Status s;
   for (iter_->Next(); iter_->Valid(); iter_->Next()) {
+    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
     if (!ParseKey(&ikey)) {
-      // skip corrupted key
-      continue;
+      return false;
     }
 
     if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
@@ -547,8 +661,7 @@ void DBIter::MergeValuesNewToOld() {
       break;
     } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
                range_del_agg_.ShouldDelete(
-                   ikey, RangeDelAggregator::RangePositioningMode::
-                             kForwardTraversal)) {
+                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
       // hit a delete with the same user key, stop right here
       // iter_ is positioned after delete
       iter_->Next();
@@ -556,28 +669,50 @@ void DBIter::MergeValuesNewToOld() {
     } else if (kTypeValue == ikey.type) {
       // hit a put, merge the put value with operands and store the
       // final result in saved_value_. We are done!
-      // ignore corruption if there is any.
       const Slice val = iter_->value();
       s = MergeHelper::TimedFullMerge(
           merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
-          &saved_value_, logger_, statistics_, env_, &pinned_value_);
+          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
       if (!s.ok()) {
+        valid_ = false;
         status_ = s;
+        return false;
       }
       // iter_ is positioned after put
       iter_->Next();
-      return;
+      if (!iter_->status().ok()) {
+        valid_ = false;
+        return false;
+      }
+      return true;
     } else if (kTypeMerge == ikey.type) {
       // hit a merge, add the value as an operand and run associative merge.
       // when complete, add result to operands and continue.
       merge_context_.PushOperand(iter_->value(),
                                  iter_->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
+    } else if (kTypeBlobIndex == ikey.type) {
+      if (!allow_blob_) {
+        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+        status_ = Status::NotSupported(
+            "Encounter unexpected blob index. Please open DB with "
+            "rocksdb::blob_db::BlobDB instead.");
+      } else {
+        status_ =
+            Status::NotSupported("Blob DB does not support merge operator.");
+      }
+      valid_ = false;
+      return false;
     } else {
       assert(false);
     }
   }
 
+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
+  }
+
   // we either exhausted all internal keys under this user key, or hit
   // a deletion marker.
   // feed null as the existing value to the merge operator, such that
@@ -585,20 +720,31 @@ void DBIter::MergeValuesNewToOld() {
   s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                   nullptr, merge_context_.GetOperands(),
                                   &saved_value_, logger_, statistics_, env_,
-                                  &pinned_value_);
+                                  &pinned_value_, true);
   if (!s.ok()) {
+    valid_ = false;
     status_ = s;
+    return false;
   }
+
+  assert(status_.ok());
+  return true;
 }
 
 void DBIter::Prev() {
   assert(valid_);
+  assert(status_.ok());
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
+  bool ok = true;
   if (direction_ == kForward) {
-    ReverseToBackward();
+    if (!ReverseToBackward()) {
+      ok = false;
+    }
+  }
+  if (ok) {
+    PrevInternal();
   }
-  PrevInternal();
   if (statistics_ != nullptr) {
     local_stats_.prev_count_++;
     if (valid_) {
@@ -608,113 +754,132 @@ void DBIter::Prev() {
   }
 }
 
-void DBIter::ReverseToForward() {
-  if (prefix_extractor_ != nullptr && !total_order_seek_) {
+bool DBIter::ReverseToForward() {
+  assert(iter_->status().ok());
+
+  // When moving backwards, iter_ is positioned on _previous_ key, which may
+  // not exist or may have different prefix than the current key().
+  // If that's the case, seek iter_ to current key.
+  if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
     IterKey last_key;
     last_key.SetInternalKey(ParsedInternalKey(
         saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
     iter_->Seek(last_key.GetInternalKey());
   }
-  FindNextUserKey();
+
   direction_ = kForward;
-  if (!iter_->Valid()) {
-    iter_->SeekToFirst();
-    range_del_agg_.InvalidateTombstoneMapPositions();
+  // Skip keys less than the current key() (a.k.a. saved_key_).
+  while (iter_->Valid()) {
+    ParsedInternalKey ikey;
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >=
+        0) {
+      return true;
+    }
+    iter_->Next();
+  }
+
+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
   }
+
+  return true;
 }
 
-void DBIter::ReverseToBackward() {
-  if (prefix_extractor_ != nullptr && !total_order_seek_) {
+// Move iter_ to the key before saved_key_.
+bool DBIter::ReverseToBackward() {
+  assert(iter_->status().ok());
+
+  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
+  // key, which may not exist or may have prefix different from current.
+  // If that's the case, seek to saved_key_.
+  if (current_entry_is_merged_ &&
+      ((prefix_extractor_ != nullptr && !total_order_seek_) ||
+       !iter_->Valid())) {
     IterKey last_key;
-    last_key.SetInternalKey(ParsedInternalKey(saved_key_.GetUserKey(), 0,
-                                              kValueTypeForSeekForPrev));
-    iter_->SeekForPrev(last_key.GetInternalKey());
-  }
-  if (current_entry_is_merged_) {
-    // Not placed in the same key. Need to call Prev() until finding the
-    // previous key.
-    if (!iter_->Valid()) {
-      iter_->SeekToLast();
-      range_del_agg_.InvalidateTombstoneMapPositions();
-    }
-    ParsedInternalKey ikey;
-    FindParseableKey(&ikey, kReverse);
-    while (iter_->Valid() &&
-           user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >
-               0) {
-      if (ikey.sequence > sequence_) {
-        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
-      } else {
-        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
+    // Using kMaxSequenceNumber and kValueTypeForSeek
+    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
+    // than saved_key_.
+    last_key.SetInternalKey(ParsedInternalKey(
+        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+    if (prefix_extractor_ != nullptr && !total_order_seek_) {
+      iter_->SeekForPrev(last_key.GetInternalKey());
+    } else {
+      // Some iterators may not support SeekForPrev(), so we avoid using it
+      // when prefix seek mode is disabled. This is somewhat expensive
+      // (an extra Prev(), as well as an extra change of direction of iter_),
+      // so we may need to reconsider it later.
+      iter_->Seek(last_key.GetInternalKey());
+      if (!iter_->Valid() && iter_->status().ok()) {
+        iter_->SeekToLast();
       }
-      iter_->Prev();
-      FindParseableKey(&ikey, kReverse);
     }
   }
-#ifndef NDEBUG
-  if (iter_->Valid()) {
-    ParsedInternalKey ikey;
-    assert(ParseKey(&ikey));
-    assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
-           0);
-  }
-#endif
 
-  FindPrevUserKey();
   direction_ = kReverse;
+  return FindUserKeyBeforeSavedKey();
 }
 
 void DBIter::PrevInternal() {
-  if (!iter_->Valid()) {
-    valid_ = false;
-    return;
-  }
-
-  ParsedInternalKey ikey;
-
   while (iter_->Valid()) {
     saved_key_.SetUserKey(
         ExtractUserKey(iter_->key()),
         !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
 
-    if (FindValueForCurrentKey()) {
-      valid_ = true;
-      if (!iter_->Valid()) {
-        return;
-      }
-      FindParseableKey(&ikey, kReverse);
-      if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-        FindPrevUserKey();
-      }
-      if (valid_ && prefix_extractor_ && prefix_same_as_start_ &&
-          prefix_extractor_->Transform(saved_key_.GetUserKey())
-                  .compare(prefix_start_key_) != 0) {
-        valid_ = false;
-      }
+    if (prefix_extractor_ && prefix_same_as_start_ &&
+        prefix_extractor_->Transform(saved_key_.GetUserKey())
+                .compare(prefix_start_key_) != 0) {
+      // Current key does not have the same prefix as start
+      valid_ = false;
       return;
     }
 
-    if (TooManyInternalKeysSkipped(false)) {
+    if (iterate_lower_bound_ != nullptr &&
+        user_comparator_->Compare(saved_key_.GetUserKey(),
+                                  *iterate_lower_bound_) < 0) {
+      // We've iterated earlier than the user-specified lower bound.
+      valid_ = false;
       return;
     }
 
-    if (!iter_->Valid()) {
-      break;
+    if (!FindValueForCurrentKey()) {  // assigns valid_
+      return;
+    }
+
+    // Whether or not we found a value for current key, we need iter_ to end up
+    // on a smaller key.
+    if (!FindUserKeyBeforeSavedKey()) {
+      return;
     }
-    FindParseableKey(&ikey, kReverse);
-    if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-      FindPrevUserKey();
+
+    if (valid_) {
+      // Found the value.
+      return;
+    }
+
+    if (TooManyInternalKeysSkipped(false)) {
+      return;
     }
   }
+
   // We haven't found any key - iterator is not valid
-  // Or the prefix is different than start prefix
-  assert(!iter_->Valid());
   valid_ = false;
 }
 
-// This function checks, if the entry with biggest sequence_number <= sequence_
-// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
-// saved_value_
+// Used for backwards iteration.
+// Looks at the entries with user key saved_key_ and finds the most up-to-date
+// value for it, or executes a merge, or determines that the value was deleted.
+// Sets valid_ to true if the value is found and is ready to be presented to
+// the user through value().
+// Sets valid_ to false if the value was deleted, and we should try another key.
+// Returns false if an error occurred, and !status().ok() and !valid_.
+//
+// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
+// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
+//       the entry just before them, or on the entry just after them.
 bool DBIter::FindValueForCurrentKey() {
   assert(iter_->Valid());
   merge_context_.Clear();
@@ -724,30 +889,37 @@ bool DBIter::FindValueForCurrentKey() {
   ValueType last_not_merge_type = kTypeDeletion;
   ValueType last_key_entry_type = kTypeDeletion;
 
-  ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kReverse);
-
   // Temporarily pin blocks that hold (merge operands / the value)
   ReleaseTempPinnedData();
   TempPinData();
   size_t num_skipped = 0;
-  while (iter_->Valid() && ikey.sequence <= sequence_ &&
-         user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+  while (iter_->Valid()) {
+    ParsedInternalKey ikey;
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+
+    if (!IsVisible(ikey.sequence) ||
+        !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+      break;
+    }
     if (TooManyInternalKeysSkipped()) {
       return false;
     }
 
-    // We iterate too much: let's use Seek() to avoid too much key comparisons
-    if (num_skipped >= max_skip_) {
+    // This user key has lots of entries.
+    // We're going from old to new, and it's taking too long. Let's do a Seek()
+    // and go from new to old. This helps when a key was overwritten many times.
+    if (num_skipped >= max_skip_ && CanReseekToSkip()) {
       return FindValueForCurrentKeyUsingSeek();
     }
 
     last_key_entry_type = ikey.type;
     switch (last_key_entry_type) {
       case kTypeValue:
+      case kTypeBlobIndex:
         if (range_del_agg_.ShouldDelete(
-                ikey,
-                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
           last_key_entry_type = kTypeRangeDeletion;
           PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         } else {
@@ -765,8 +937,7 @@ bool DBIter::FindValueForCurrentKey() {
         break;
       case kTypeMerge:
         if (range_del_agg_.ShouldDelete(
-                ikey,
-                RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
           merge_context_.Clear();
           last_key_entry_type = kTypeRangeDeletion;
           last_not_merge_type = last_key_entry_type;
@@ -783,19 +954,23 @@ bool DBIter::FindValueForCurrentKey() {
     }
 
     PERF_COUNTER_ADD(internal_key_skipped_count, 1);
-    assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
     iter_->Prev();
     ++num_skipped;
-    FindParseableKey(&ikey, kReverse);
+  }
+
+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
   }
 
   Status s;
+  is_blob_ = false;
   switch (last_key_entry_type) {
     case kTypeDeletion:
     case kTypeSingleDeletion:
     case kTypeRangeDeletion:
       valid_ = false;
-      return false;
+      return true;
     case kTypeMerge:
       current_entry_is_merged_ = true;
       if (last_not_merge_type == kTypeDeletion ||
@@ -804,31 +979,58 @@ bool DBIter::FindValueForCurrentKey() {
         s = MergeHelper::TimedFullMerge(
             merge_operator_, saved_key_.GetUserKey(), nullptr,
             merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-            env_, &pinned_value_);
+            env_, &pinned_value_, true);
+      } else if (last_not_merge_type == kTypeBlobIndex) {
+        if (!allow_blob_) {
+          ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+          status_ = Status::NotSupported(
+              "Encounter unexpected blob index. Please open DB with "
+              "rocksdb::blob_db::BlobDB instead.");
+        } else {
+          status_ =
+              Status::NotSupported("Blob DB does not support merge operator.");
+        }
+        valid_ = false;
+        return false;
       } else {
         assert(last_not_merge_type == kTypeValue);
         s = MergeHelper::TimedFullMerge(
             merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
             merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-            env_, &pinned_value_);
+            env_, &pinned_value_, true);
       }
       break;
     case kTypeValue:
-      // do nothing - we've already has value in saved_value_
+      // do nothing - we already have the value in pinned_value_
+      break;
+    case kTypeBlobIndex:
+      if (!allow_blob_) {
+        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+        status_ = Status::NotSupported(
+            "Encounter unexpected blob index. Please open DB with "
+            "rocksdb::blob_db::BlobDB instead.");
+        valid_ = false;
+        return false;
+      }
+      is_blob_ = true;
       break;
     default:
       assert(false);
       break;
   }
-  valid_ = true;
   if (!s.ok()) {
+    valid_ = false;
     status_ = s;
+    return false;
   }
+  valid_ = true;
   return true;
 }
 
 // This function is used in FindValueForCurrentKey.
 // We use Seek() function instead of Prev() to find necessary value
+// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
+//       Would be nice to reuse some code.
 bool DBIter::FindValueForCurrentKeyUsingSeek() {
   // FindValueForCurrentKey will enable pinning before calling
   // FindValueForCurrentKeyUsingSeek()
@@ -839,17 +1041,48 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   iter_->Seek(last_key);
   RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
 
-  // assume there is at least one parseable key for this user key
+  // In case a read_callback is present, the value we seek to may not be
+  // Find the next value that's visible.
   ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kForward);
+  while (true) {
+    if (!iter_->Valid()) {
+      valid_ = false;
+      return iter_->status().ok();
+    }
+
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+      // No visible values for this key, even though FindValueForCurrentKey()
+      // has seen some. This is possible if we're using a tailing iterator, and
+      // the entries were discarded in a compaction.
+      valid_ = false;
+      return true;
+    }
+
+    if (IsVisible(ikey.sequence)) {
+      break;
+    }
+
+    iter_->Next();
+  }
 
   if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
       range_del_agg_.ShouldDelete(
-          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
+          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
+    valid_ = false;
+    return true;
+  }
+  if (ikey.type == kTypeBlobIndex && !allow_blob_) {
+    ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+    status_ = Status::NotSupported(
+        "Encounter unexpected blob index. Please open DB with "
+        "rocksdb::blob_db::BlobDB instead.");
     valid_ = false;
     return false;
   }
-  if (ikey.type == kTypeValue) {
+  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
     assert(iter_->IsValuePinned());
     pinned_value_ = iter_->value();
     valid_ = true;
@@ -858,110 +1091,146 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
 
   // kTypeMerge. We need to collect all kTypeMerge values and save them
   // in operands
+  assert(ikey.type == kTypeMerge);
   current_entry_is_merged_ = true;
   merge_context_.Clear();
-  while (
-      iter_->Valid() &&
-      user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) &&
-      ikey.type == kTypeMerge &&
-      !range_del_agg_.ShouldDelete(
-          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
-    merge_context_.PushOperand(iter_->value(),
-                               iter_->IsValuePinned() /* operand_pinned */);
-    PERF_COUNTER_ADD(internal_merge_count, 1);
+  merge_context_.PushOperand(iter_->value(),
+                             iter_->IsValuePinned() /* operand_pinned */);
+  while (true) {
     iter_->Next();
-    FindParseableKey(&ikey, kForward);
-  }
 
-  Status s;
-  if (!iter_->Valid() ||
-      !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) ||
-      ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
-      range_del_agg_.ShouldDelete(
-          ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
-    s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
-                                    nullptr, merge_context_.GetOperands(),
-                                    &saved_value_, logger_, statistics_, env_,
-                                    &pinned_value_);
-    // Make iter_ valid and point to saved_key_
-    if (!iter_->Valid() ||
-        !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-      iter_->Seek(last_key);
-      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
+    if (!iter_->Valid()) {
+      if (!iter_->status().ok()) {
+        valid_ = false;
+        return false;
+      }
+      break;
     }
-    valid_ = true;
-    if (!s.ok()) {
-      status_ = s;
+    if (!ParseKey(&ikey)) {
+      return false;
+    }
+    if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+      break;
+    }
+
+    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
+        range_del_agg_.ShouldDelete(
+            ikey, RangeDelPositioningMode::kBackwardTraversal)) {
+      break;
+    } else if (ikey.type == kTypeValue) {
+      const Slice val = iter_->value();
+      Status s = MergeHelper::TimedFullMerge(
+          merge_operator_, saved_key_.GetUserKey(), &val,
+          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
+          env_, &pinned_value_, true);
+      if (!s.ok()) {
+        valid_ = false;
+        status_ = s;
+        return false;
+      }
+      valid_ = true;
+      return true;
+    } else if (ikey.type == kTypeMerge) {
+      merge_context_.PushOperand(iter_->value(),
+                                 iter_->IsValuePinned() /* operand_pinned */);
+      PERF_COUNTER_ADD(internal_merge_count, 1);
+    } else if (ikey.type == kTypeBlobIndex) {
+      if (!allow_blob_) {
+        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
+        status_ = Status::NotSupported(
+            "Encounter unexpected blob index. Please open DB with "
+            "rocksdb::blob_db::BlobDB instead.");
+      } else {
+        status_ =
+            Status::NotSupported("Blob DB does not support merge operator.");
+      }
+      valid_ = false;
+      return false;
+    } else {
+      assert(false);
     }
-    return true;
   }
 
-  const Slice& val = iter_->value();
-  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
-                                  &val, merge_context_.GetOperands(),
-                                  &saved_value_, logger_, statistics_, env_,
-                                  &pinned_value_);
-  valid_ = true;
+  Status s = MergeHelper::TimedFullMerge(
+      merge_operator_, saved_key_.GetUserKey(), nullptr,
+      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
+      &pinned_value_, true);
   if (!s.ok()) {
+    valid_ = false;
     status_ = s;
+    return false;
   }
-  return true;
-}
 
-// Used in Next to change directions
-// Go to next user key
-// Don't use Seek(),
-// because next user key will be very close
-void DBIter::FindNextUserKey() {
-  if (!iter_->Valid()) {
-    return;
-  }
-  ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kForward);
-  while (iter_->Valid() &&
-         !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
-    iter_->Next();
-    FindParseableKey(&ikey, kForward);
+  // Make sure we leave iter_ in a good state. If it's valid and we don't care
+  // about prefixes, that's already good enough. Otherwise it needs to be
+  // seeked to the current key.
+  if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
+    if (prefix_extractor_ != nullptr && !total_order_seek_) {
+      iter_->SeekForPrev(last_key);
+    } else {
+      iter_->Seek(last_key);
+      if (!iter_->Valid() && iter_->status().ok()) {
+        iter_->SeekToLast();
+      }
+    }
+    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
   }
+
+  valid_ = true;
+  return true;
 }
 
-// Go to previous user_key
-void DBIter::FindPrevUserKey() {
-  if (!iter_->Valid()) {
-    return;
-  }
+// Move backwards until we reach a key smaller than saved_key_.
+// Changes valid_ only if return value is false.
+bool DBIter::FindUserKeyBeforeSavedKey() {
+  assert(status_.ok());
   size_t num_skipped = 0;
-  ParsedInternalKey ikey;
-  FindParseableKey(&ikey, kReverse);
-  int cmp;
-  while (iter_->Valid() &&
-         ((cmp = user_comparator_->Compare(ikey.user_key,
-                                           saved_key_.GetUserKey())) == 0 ||
-          (cmp > 0 && ikey.sequence > sequence_))) {
-    if (TooManyInternalKeysSkipped()) {
-      return;
+  while (iter_->Valid()) {
+    ParsedInternalKey ikey;
+    if (!ParseKey(&ikey)) {
+      return false;
     }
 
-    if (cmp == 0) {
-      if (num_skipped >= max_skip_) {
-        num_skipped = 0;
-        IterKey last_key;
-        last_key.SetInternalKey(ParsedInternalKey(
-            saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
-        iter_->Seek(last_key.GetInternalKey());
-        RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
-      } else {
-        ++num_skipped;
-      }
+    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
+      return true;
     }
-    if (ikey.sequence > sequence_) {
+
+    if (TooManyInternalKeysSkipped()) {
+      return false;
+    }
+
+    assert(ikey.sequence != kMaxSequenceNumber);
+    if (!IsVisible(ikey.sequence)) {
       PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
     } else {
       PERF_COUNTER_ADD(internal_key_skipped_count, 1);
     }
+
+    if (num_skipped >= max_skip_ && CanReseekToSkip()) {
+      num_skipped = 0;
+      IterKey last_key;
+      last_key.SetInternalKey(ParsedInternalKey(
+          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+      // It would be more efficient to use SeekForPrev() here, but some
+      // iterators may not support it.
+      iter_->Seek(last_key.GetInternalKey());
+      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
+      if (!iter_->Valid()) {
+        break;
+      }
+    } else {
+      ++num_skipped;
+    }
+
     iter_->Prev();
-    FindParseableKey(&ikey, kReverse);
   }
+
+  if (!iter_->status().ok()) {
+    valid_ = false;
+    return false;
+  }
+
+  return true;
 }
 
 bool DBIter::TooManyInternalKeysSkipped(bool increment) {
@@ -976,28 +1245,51 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
   return false;
 }
 
-// Skip all unparseable keys
-void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
-  while (iter_->Valid() && !ParseKey(ikey)) {
-    if (direction == kReverse) {
-      iter_->Prev();
-    } else {
-      iter_->Next();
-    }
+bool DBIter::IsVisible(SequenceNumber sequence) {
+  return sequence <= MaxVisibleSequenceNumber() &&
+         (read_callback_ == nullptr || read_callback_->IsVisible(sequence));
+}
+
+bool DBIter::CanReseekToSkip() {
+  return read_callback_ == nullptr ||
+         read_callback_->MaxUnpreparedSequenceNumber() == 0;
+}
+
+SequenceNumber DBIter::MaxVisibleSequenceNumber() {
+  if (read_callback_ == nullptr) {
+    return sequence_;
   }
+
+  return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber());
 }
 
 void DBIter::Seek(const Slice& target) {
   StopWatch sw(env_, statistics_, DB_SEEK);
+  status_ = Status::OK();
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
+
+  SequenceNumber seq = MaxVisibleSequenceNumber();
   saved_key_.Clear();
-  saved_key_.SetInternalKey(target, sequence_);
+  saved_key_.SetInternalKey(target, seq);
+
+#ifndef ROCKSDB_LITE
+  if (db_impl_ != nullptr && cfd_ != nullptr) {
+    db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
+  }
+#endif  // ROCKSDB_LITE
+
+  if (iterate_lower_bound_ != nullptr &&
+      user_comparator_->Compare(saved_key_.GetUserKey(),
+                                *iterate_lower_bound_) < 0) {
+    saved_key_.Clear();
+    saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
+  }
 
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->Seek(saved_key_.GetInternalKey());
-    range_del_agg_.InvalidateTombstoneMapPositions();
+    range_del_agg_.InvalidateRangeDelMapPositions();
   }
   RecordTick(statistics_, NUMBER_DB_SEEK);
   if (iter_->Valid()) {
@@ -1012,8 +1304,10 @@ void DBIter::Seek(const Slice& target) {
     }
     if (statistics_ != nullptr) {
       if (valid_) {
+        // Decrement since we don't want to count this key as skipped
         RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
         RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
+        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
       }
     }
   } else {
@@ -1028,6 +1322,7 @@ void DBIter::Seek(const Slice& target) {
 
 void DBIter::SeekForPrev(const Slice& target) {
   StopWatch sw(env_, statistics_, DB_SEEK);
+  status_ = Status::OK();
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
   saved_key_.Clear();
@@ -1035,12 +1330,25 @@ void DBIter::SeekForPrev(const Slice& target) {
   saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                             kValueTypeForSeekForPrev);
 
+  if (iterate_upper_bound_ != nullptr &&
+      user_comparator_->Compare(saved_key_.GetUserKey(),
+                                *iterate_upper_bound_) >= 0) {
+    saved_key_.Clear();
+    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
+  }
+
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->SeekForPrev(saved_key_.GetInternalKey());
-    range_del_agg_.InvalidateTombstoneMapPositions();
+    range_del_agg_.InvalidateRangeDelMapPositions();
   }
 
+#ifndef ROCKSDB_LITE
+  if (db_impl_ != nullptr && cfd_ != nullptr) {
+    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
+  }
+#endif  // ROCKSDB_LITE
+
   RecordTick(statistics_, NUMBER_DB_SEEK);
   if (iter_->Valid()) {
     if (prefix_extractor_ && prefix_same_as_start_) {
@@ -1056,6 +1364,7 @@ void DBIter::SeekForPrev(const Slice& target) {
       if (valid_) {
         RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
         RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
+        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
       }
     }
   } else {
@@ -1068,11 +1377,16 @@ void DBIter::SeekForPrev(const Slice& target) {
 }
 
 void DBIter::SeekToFirst() {
+  if (iterate_lower_bound_ != nullptr) {
+    Seek(*iterate_lower_bound_);
+    return;
+  }
   // Don't use iter_::Seek() if we set a prefix extractor
   // because prefix seek will be used.
-  if (prefix_extractor_ != nullptr) {
+  if (prefix_extractor_ != nullptr && !total_order_seek_) {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
+  status_ = Status::OK();
   direction_ = kForward;
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
@@ -1081,7 +1395,7 @@ void DBIter::SeekToFirst() {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->SeekToFirst();
-    range_del_agg_.InvalidateTombstoneMapPositions();
+    range_del_agg_.InvalidateRangeDelMapPositions();
   }
 
   RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1094,6 +1408,7 @@ void DBIter::SeekToFirst() {
       if (valid_) {
         RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
         RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
+        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
       }
     }
   } else {
@@ -1107,11 +1422,22 @@ void DBIter::SeekToFirst() {
 }
 
 void DBIter::SeekToLast() {
+  if (iterate_upper_bound_ != nullptr) {
+    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
+    SeekForPrev(*iterate_upper_bound_);
+    if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) {
+      ReleaseTempPinnedData();
+      PrevInternal();
+    }
+    return;
+  }
+
   // Don't use iter_::Seek() if we set a prefix extractor
   // because prefix seek will be used.
-  if (prefix_extractor_ != nullptr) {
+  if (prefix_extractor_ != nullptr && !total_order_seek_) {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
+  status_ = Status::OK();
   direction_ = kReverse;
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
@@ -1120,27 +1446,15 @@ void DBIter::SeekToLast() {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_->SeekToLast();
-    range_del_agg_.InvalidateTombstoneMapPositions();
-  }
-  // When the iterate_upper_bound is set to a value,
-  // it will seek to the last key before the
-  // ReadOptions.iterate_upper_bound
-  if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
-    SeekForPrev(*iterate_upper_bound_);
-    range_del_agg_.InvalidateTombstoneMapPositions();
-    if (!Valid()) {
-      return;
-    } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
-      Prev();
-    }
-  } else {
-    PrevInternal();
+    range_del_agg_.InvalidateRangeDelMapPositions();
   }
+  PrevInternal();
   if (statistics_ != nullptr) {
     RecordTick(statistics_, NUMBER_DB_SEEK);
     if (valid_) {
       RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
       RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
+      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
     }
   }
   if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
@@ -1152,21 +1466,22 @@ void DBIter::SeekToLast() {
 
 Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
                         const ImmutableCFOptions& cf_options,
+                        const MutableCFOptions& mutable_cf_options,
                         const Comparator* user_key_comparator,
                         InternalIterator* internal_iter,
                         const SequenceNumber& sequence,
                         uint64_t max_sequential_skip_in_iterations,
-                        uint64_t version_number) {
+                        ReadCallback* read_callback, DBImpl* db_impl,
+                        ColumnFamilyData* cfd, bool allow_blob) {
   DBIter* db_iter = new DBIter(
-      env, read_options, cf_options, user_key_comparator, internal_iter,
-      sequence, false, max_sequential_skip_in_iterations, version_number);
+      env, read_options, cf_options, mutable_cf_options, user_key_comparator,
+      internal_iter, sequence, false, max_sequential_skip_in_iterations,
+      read_callback, db_impl, cfd, allow_blob);
   return db_iter;
 }
 
 ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
 
-void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }
-
 RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
   return db_iter_->GetRangeDelAggregator();
 }
@@ -1189,28 +1504,84 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
 inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
 inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
 inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
+bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
 inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                               std::string* prop) {
+  if (prop_name == "rocksdb.iterator.super-version-number") {
+    // First try to pass the value returned from inner iterator.
+    if (!db_iter_->GetProperty(prop_name, prop).ok()) {
+      *prop = ToString(sv_number_);
+    }
+    return Status::OK();
+  }
   return db_iter_->GetProperty(prop_name, prop);
 }
-void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
-                                         void* arg2) {
-  db_iter_->RegisterCleanup(function, arg1, arg2);
+
+void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
+                              const ImmutableCFOptions& cf_options,
+                              const MutableCFOptions& mutable_cf_options,
+                              const SequenceNumber& sequence,
+                              uint64_t max_sequential_skip_in_iteration,
+                              uint64_t version_number,
+                              ReadCallback* read_callback, DBImpl* db_impl,
+                              ColumnFamilyData* cfd, bool allow_blob,
+                              bool allow_refresh) {
+  auto mem = arena_.AllocateAligned(sizeof(DBIter));
+  db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
+                              cf_options.user_comparator, nullptr, sequence,
+                              true, max_sequential_skip_in_iteration,
+                              read_callback, db_impl, cfd, allow_blob);
+  sv_number_ = version_number;
+  allow_refresh_ = allow_refresh;
+}
+
+Status ArenaWrappedDBIter::Refresh() {
+  if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
+    return Status::NotSupported("Creating renew iterator is not allowed.");
+  }
+  assert(db_iter_ != nullptr);
+  // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
+  // correct behavior. Will be corrected automatically when we take a snapshot
+  // here for the case of WritePreparedTxnDB.
+  SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
+  uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
+  if (sv_number_ != cur_sv_number) {
+    Env* env = db_iter_->env();
+    db_iter_->~DBIter();
+    arena_.~Arena();
+    new (&arena_) Arena();
+
+    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
+    Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
+         latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
+         cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
+         allow_refresh_);
+
+    InternalIterator* internal_iter = db_impl_->NewInternalIterator(
+        read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
+    SetIterUnderDBIter(internal_iter);
+  } else {
+    db_iter_->set_sequence(latest_seq);
+    db_iter_->set_valid(false);
+  }
+  return Status::OK();
 }
 
 ArenaWrappedDBIter* NewArenaWrappedDbIterator(
     Env* env, const ReadOptions& read_options,
-    const ImmutableCFOptions& cf_options, const Comparator* user_key_comparator,
-    const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
-    uint64_t version_number) {
+    const ImmutableCFOptions& cf_options,
+    const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
+    uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
+    ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
+    bool allow_blob, bool allow_refresh) {
   ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
-  Arena* arena = iter->GetArena();
-  auto mem = arena->AllocateAligned(sizeof(DBIter));
-  DBIter* db_iter = new (mem)
-      DBIter(env, read_options, cf_options, user_key_comparator, nullptr,
-             sequence, true, max_sequential_skip_in_iterations, version_number);
-
-  iter->SetDBIter(db_iter);
+  iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
+             max_sequential_skip_in_iterations, version_number, read_callback,
+             db_impl, cfd, allow_blob, allow_refresh);
+  if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
+    iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
+                           allow_blob);
+  }
 
   return iter;
 }