update ceph source to reef 18.1.2

diff --git a/ceph/src/rocksdb/db/db_iter.cc b/ceph/src/rocksdb/db/db_iter.cc
index bfe7c721382c252cf7bc0f612e37118b02c9b517..e1375deb7d2130730fb33a092e79d1c826d243d7 100644
--- a/ceph/src/rocksdb/db/db_iter.cc
+++ b/ceph/src/rocksdb/db/db_iter.cc
@@ -8,14 +8,16 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/db_iter.h"
-#include <string>
+
 #include <iostream>
 #include <limits>
+#include <string>
 
 #include "db/dbformat.h"
 #include "db/merge_context.h"
 #include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
+#include "db/wide/wide_column_serialization.h"
 #include "file/filename.h"
 #include "logging/logging.h"
 #include "memory/arena.h"
@@ -24,6 +26,7 @@
 #include "rocksdb/iterator.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/options.h"
+#include "rocksdb/system_clock.h"
 #include "table/internal_iterator.h"
 #include "table/iterator_wrapper.h"
 #include "trace_replay/trace_replay.h"
 namespace ROCKSDB_NAMESPACE {
 
 DBIter::DBIter(Env* _env, const ReadOptions& read_options,
-               const ImmutableCFOptions& cf_options,
+               const ImmutableOptions& ioptions,
                const MutableCFOptions& mutable_cf_options,
-               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
-               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
+               const Comparator* cmp, InternalIterator* iter,
+               const Version* version, SequenceNumber s, bool arena_mode,
+               uint64_t max_sequential_skip_in_iterations,
                ReadCallback* read_callback, DBImpl* db_impl,
-               ColumnFamilyData* cfd, bool allow_blob)
+               ColumnFamilyData* cfd, bool expose_blob_index)
     : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
       env_(_env),
-      logger_(cf_options.info_log),
+      clock_(ioptions.clock),
+      logger_(ioptions.logger),
       user_comparator_(cmp),
-      merge_operator_(cf_options.merge_operator),
+      merge_operator_(ioptions.merge_operator.get()),
       iter_(iter),
+      version_(version),
       read_callback_(read_callback),
       sequence_(s),
-      statistics_(cf_options.statistics),
+      statistics_(ioptions.stats),
       max_skip_(max_sequential_skip_in_iterations),
       max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
       num_internal_keys_skipped_(0),
@@ -65,13 +71,14 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
       expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                      read_options.total_order_seek ||
                                      read_options.auto_prefix_mode),
-      allow_blob_(allow_blob),
+      read_tier_(read_options.read_tier),
+      fill_cache_(read_options.fill_cache),
+      verify_checksums_(read_options.verify_checksums),
+      expose_blob_index_(expose_blob_index),
       is_blob_(false),
       arena_mode_(arena_mode),
-      range_del_agg_(&cf_options.internal_comparator, s),
       db_impl_(db_impl),
       cfd_(cfd),
-      start_seqnum_(read_options.iter_start_seqnum),
       timestamp_ub_(read_options.timestamp),
       timestamp_lb_(read_options.iter_start_ts),
       timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
@@ -82,7 +89,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
   if (iter_.iter()) {
     iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
   }
-  assert(timestamp_size_ == user_comparator_.timestamp_size());
+  status_.PermitUncheckedError();
+  assert(timestamp_size_ ==
+         user_comparator_.user_comparator()->timestamp_size());
 }
 
 Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
@@ -107,8 +116,7 @@ Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
 }
 
 bool DBIter::ParseKey(ParsedInternalKey* ikey) {
-  Status s =
-      ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);  // TODO
+  Status s = ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);
   if (!s.ok()) {
     status_ = Status::Corruption("In DBIter: ", s.getState());
     valid_ = false;
@@ -123,9 +131,11 @@ void DBIter::Next() {
   assert(valid_);
   assert(status_.ok());
 
-  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
+  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
   // Release temporarily pinned blocks from last operation
   ReleaseTempPinnedData();
+  ResetBlobValue();
+  ResetValueAndColumns();
   local_stats_.skip_count_ += num_internal_keys_skipped_;
   local_stats_.skip_count_--;
   num_internal_keys_skipped_ = 0;
@@ -148,6 +158,8 @@ void DBIter::Next() {
 
   local_stats_.next_count_++;
   if (ok && iter_.Valid()) {
+    ClearSavedValue();
+
     if (prefix_same_as_start_) {
       assert(prefix_extractor_ != nullptr);
       const Slice prefix = prefix_.GetUserKey();
@@ -165,6 +177,65 @@ void DBIter::Next() {
   }
 }
 
+bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
+                                  const Slice& blob_index) {
+  assert(!is_blob_);
+  assert(blob_value_.empty());
+
+  if (expose_blob_index_) {  // Stacked BlobDB implementation
+    is_blob_ = true;
+    return true;
+  }
+
+  if (!version_) {
+    status_ = Status::Corruption("Encountered unexpected blob index.");
+    valid_ = false;
+    return false;
+  }
+
+  // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
+  // avoid having to copy options back and forth.
+  ReadOptions read_options;
+  read_options.read_tier = read_tier_;
+  read_options.fill_cache = fill_cache_;
+  read_options.verify_checksums = verify_checksums_;
+
+  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+  constexpr uint64_t* bytes_read = nullptr;
+
+  const Status s = version_->GetBlob(read_options, user_key, blob_index,
+                                     prefetch_buffer, &blob_value_, bytes_read);
+
+  if (!s.ok()) {
+    status_ = s;
+    valid_ = false;
+    return false;
+  }
+
+  is_blob_ = true;
+  return true;
+}
+
+bool DBIter::SetValueAndColumnsFromEntity(Slice slice) {
+  assert(value_.empty());
+  assert(wide_columns_.empty());
+
+  const Status s = WideColumnSerialization::Deserialize(slice, wide_columns_);
+
+  if (!s.ok()) {
+    status_ = s;
+    valid_ = false;
+    return false;
+  }
+
+  if (!wide_columns_.empty() &&
+      wide_columns_[0].name() == kDefaultWideColumnName) {
+    value_ = wide_columns_[0].value();
+  }
+
+  return true;
+}
+
 // PRE: saved_key_ has the current user key if skipping_saved_key
 // POST: saved_key_ should have the next user key if valid_,
 //       if the current entry is a result of merge
@@ -211,8 +282,6 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
   // to one.
   bool reseek_done = false;
 
-  is_blob_ = false;
-
   do {
     // Will update is_key_seqnum_zero_ as soon as we parsed the current key
     // but we need to save the previous value to be used in the loop.
@@ -221,25 +290,28 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
       is_key_seqnum_zero_ = false;
       return false;
     }
+    Slice user_key_without_ts =
+        StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);
 
     is_key_seqnum_zero_ = (ikey_.sequence == 0);
 
     assert(iterate_upper_bound_ == nullptr ||
            iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
            user_comparator_.CompareWithoutTimestamp(
-               ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
+               user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
                /*b_has_ts=*/false) < 0);
     if (iterate_upper_bound_ != nullptr &&
         iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
         user_comparator_.CompareWithoutTimestamp(
-            ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
+            user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
             /*b_has_ts=*/false) >= 0) {
       break;
     }
 
     assert(prefix == nullptr || prefix_extractor_ != nullptr);
     if (prefix != nullptr &&
-        prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
+        prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
+            0) {
       assert(prefix_same_as_start_);
       break;
     }
@@ -282,25 +354,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
           case kTypeSingleDeletion:
             // Arrange to skip all upcoming entries for this key since
             // they are hidden by this deletion.
-            // if iterartor specified start_seqnum we
-            // 1) return internal key, including the type
-            // 2) return ikey only if ikey.seqnum >= start_seqnum_
-            // note that if deletion seqnum is < start_seqnum_ we
-            // just skip it like in normal iterator.
-            if (start_seqnum_ > 0) {
-              if (ikey_.sequence >= start_seqnum_) {
-                saved_key_.SetInternalKey(ikey_);
-                valid_ = true;
-                return true;
-              } else {
-                saved_key_.SetUserKey(
-                    ikey_.user_key,
-                    !pin_thru_lifetime_ ||
-                        !iter_.iter()->IsKeyPinned() /* copy */);
-                skipping_saved_key = true;
-                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-              }
-            } else if (timestamp_lb_) {
+            if (timestamp_lb_) {
               saved_key_.SetInternalKey(ikey_);
               valid_ = true;
               return true;
@@ -314,75 +368,42 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
             break;
           case kTypeValue:
           case kTypeBlobIndex:
-            if (start_seqnum_ > 0) {
-              if (ikey_.sequence >= start_seqnum_) {
-                assert(ikey_.type != kTypeBlobIndex);
-                saved_key_.SetInternalKey(ikey_);
-                valid_ = true;
-                return true;
-              } else {
-                // this key and all previous versions shouldn't be included,
-                // skipping_saved_key
-                saved_key_.SetUserKey(
-                    ikey_.user_key,
-                    !pin_thru_lifetime_ ||
-                        !iter_.iter()->IsKeyPinned() /* copy */);
-                skipping_saved_key = true;
-              }
-            } else if (timestamp_lb_) {
+          case kTypeWideColumnEntity:
+            if (timestamp_lb_) {
               saved_key_.SetInternalKey(ikey_);
-              valid_ = true;
-              return true;
             } else {
               saved_key_.SetUserKey(
                   ikey_.user_key, !pin_thru_lifetime_ ||
                                       !iter_.iter()->IsKeyPinned() /* copy */);
-              if (range_del_agg_.ShouldDelete(
-                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
-                // Arrange to skip all upcoming entries for this key since
-                // they are hidden by this deletion.
-                skipping_saved_key = true;
-                num_skipped = 0;
-                reseek_done = false;
-                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-              } else if (ikey_.type == kTypeBlobIndex) {
-                if (!allow_blob_) {
-                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
-                  status_ = Status::NotSupported(
-                      "Encounter unexpected blob index. Please open DB with "
-                      "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-                  valid_ = false;
-                  return false;
-                }
-
-                is_blob_ = true;
-                valid_ = true;
-                return true;
-              } else {
-                valid_ = true;
-                return true;
+            }
+
+            if (ikey_.type == kTypeBlobIndex) {
+              if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
+                return false;
+              }
+
+              SetValueAndColumnsFromPlain(expose_blob_index_ ? iter_.value()
+                                                             : blob_value_);
+            } else if (ikey_.type == kTypeWideColumnEntity) {
+              if (!SetValueAndColumnsFromEntity(iter_.value())) {
+                return false;
               }
+            } else {
+              assert(ikey_.type == kTypeValue);
+              SetValueAndColumnsFromPlain(iter_.value());
             }
+
+            valid_ = true;
+            return true;
             break;
           case kTypeMerge:
             saved_key_.SetUserKey(
                 ikey_.user_key,
                 !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
-            if (range_del_agg_.ShouldDelete(
-                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
-              // Arrange to skip all upcoming entries for this key since
-              // they are hidden by this deletion.
-              skipping_saved_key = true;
-              num_skipped = 0;
-              reseek_done = false;
-              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-            } else {
-              // By now, we are sure the current ikey is going to yield a
-              // value
-              current_entry_is_merged_ = true;
-              valid_ = true;
-              return MergeValuesNewToOld();  // Go to a different state machine
-            }
+            // By now, we are sure the current ikey is going to yield a value
+            current_entry_is_merged_ = true;
+            valid_ = true;
+            return MergeValuesNewToOld();  // Go to a different state machine
             break;
           default:
             valid_ = false;
@@ -437,7 +458,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
               &last_key,
               ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
         } else {
-          const std::string kTsMin(timestamp_size_, static_cast<char>(0));
+          const std::string kTsMin(timestamp_size_, '\0');
           AppendInternalKeyWithDifferentTimestamp(
               &last_key,
               ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
@@ -498,20 +519,19 @@ bool DBIter::MergeValuesNewToOld() {
   TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
 
   ParsedInternalKey ikey;
-  Status s;
   for (iter_.Next(); iter_.Valid(); iter_.Next()) {
     TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
     if (!ParseKey(&ikey)) {
       return false;
     }
 
-    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+                                                saved_key_.GetUserKey())) {
       // hit the next user key, stop right here
       break;
     }
     if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
-               range_del_agg_.ShouldDelete(
-                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
+        kTypeDeletionWithTimestamp == ikey.type) {
       // hit a delete with the same user key, stop right here
       // iter_ is positioned after delete
       iter_.Next();
@@ -526,12 +546,7 @@ bool DBIter::MergeValuesNewToOld() {
       // hit a put, merge the put value with operands and store the
       // final result in saved_value_. We are done!
       const Slice val = iter_.value();
-      s = MergeHelper::TimedFullMerge(
-          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
-          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
-      if (!s.ok()) {
-        valid_ = false;
-        status_ = s;
+      if (!Merge(&val, ikey.user_key)) {
         return false;
       }
       // iter_ is positioned after put
@@ -548,17 +563,44 @@ bool DBIter::MergeValuesNewToOld() {
           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
     } else if (kTypeBlobIndex == ikey.type) {
-      if (!allow_blob_) {
-        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
-        status_ = Status::NotSupported(
-            "Encounter unexpected blob index. Please open DB with "
-            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-      } else {
+      if (expose_blob_index_) {
         status_ =
-            Status::NotSupported("Blob DB does not support merge operator.");
+            Status::NotSupported("BlobDB does not support merge operator.");
+        valid_ = false;
+        return false;
       }
-      valid_ = false;
-      return false;
+      // hit a put, merge the put value with operands and store the
+      // final result in saved_value_. We are done!
+      if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
+        return false;
+      }
+      valid_ = true;
+      if (!Merge(&blob_value_, ikey.user_key)) {
+        return false;
+      }
+
+      ResetBlobValue();
+
+      // iter_ is positioned after put
+      iter_.Next();
+      if (!iter_.status().ok()) {
+        valid_ = false;
+        return false;
+      }
+      return true;
+    } else if (kTypeWideColumnEntity == ikey.type) {
+      if (!MergeEntity(iter_.value(), ikey.user_key)) {
+        return false;
+      }
+
+      // iter_ is positioned after put
+      iter_.Next();
+      if (!iter_.status().ok()) {
+        valid_ = false;
+        return false;
+      }
+
+      return true;
     } else {
       valid_ = false;
       status_ = Status::Corruption(
@@ -577,33 +619,21 @@ bool DBIter::MergeValuesNewToOld() {
   // a deletion marker.
   // feed null as the existing value to the merge operator, such that
   // client can differentiate this scenario and do things accordingly.
-  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
-                                  nullptr, merge_context_.GetOperands(),
-                                  &saved_value_, logger_, statistics_, env_,
-                                  &pinned_value_, true);
-  if (!s.ok()) {
-    valid_ = false;
-    status_ = s;
+  if (!Merge(nullptr, saved_key_.GetUserKey())) {
     return false;
   }
-
   assert(status_.ok());
   return true;
 }
 
 void DBIter::Prev() {
-  if (timestamp_size_ > 0) {
-    valid_ = false;
-    status_ = Status::NotSupported(
-        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
-    return;
-  }
-
   assert(valid_);
   assert(status_.ok());
 
-  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
+  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
   ReleaseTempPinnedData();
+  ResetBlobValue();
+  ResetValueAndColumns();
   ResetInternalKeysSkippedCounter();
   bool ok = true;
   if (direction_ == kForward) {
@@ -612,6 +642,8 @@ void DBIter::Prev() {
     }
   }
   if (ok) {
+    ClearSavedValue();
+
     Slice prefix;
     if (prefix_same_as_start_) {
       assert(prefix_extractor_ != nullptr);
@@ -637,9 +669,16 @@ bool DBIter::ReverseToForward() {
   // If that's the case, seek iter_ to current key.
   if (!expect_total_order_inner_iter() || !iter_.Valid()) {
     IterKey last_key;
-    last_key.SetInternalKey(ParsedInternalKey(
-        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+    ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
+                            kValueTypeForSeek);
+    if (timestamp_size_ > 0) {
+      // TODO: pre-create kTsMax.
+      const std::string kTsMax(timestamp_size_, '\xff');
+      pikey.SetTimestamp(kTsMax);
+    }
+    last_key.SetInternalKey(pikey);
     iter_.Seek(last_key.GetInternalKey());
+    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
   }
 
   direction_ = kForward;
@@ -690,6 +729,7 @@ bool DBIter::ReverseToBackward() {
         iter_.SeekToLast();
       }
     }
+    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
   }
 
   direction_ = kReverse;
@@ -704,7 +744,9 @@ void DBIter::PrevInternal(const Slice* prefix) {
 
     assert(prefix == nullptr || prefix_extractor_ != nullptr);
     if (prefix != nullptr &&
-        prefix_extractor_->Transform(saved_key_.GetUserKey())
+        prefix_extractor_
+                ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
+                                                      timestamp_size_))
                 .compare(*prefix) != 0) {
       assert(prefix_same_as_start_);
       // Current key does not have the same prefix as start
@@ -713,11 +755,13 @@ void DBIter::PrevInternal(const Slice* prefix) {
     }
 
     assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
-           user_comparator_.Compare(saved_key_.GetUserKey(),
-                                    *iterate_lower_bound_) >= 0);
+           user_comparator_.CompareWithoutTimestamp(
+               saved_key_.GetUserKey(), /*a_has_ts=*/true,
+               *iterate_lower_bound_, /*b_has_ts=*/false) >= 0);
     if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
-        user_comparator_.Compare(saved_key_.GetUserKey(),
-                                 *iterate_lower_bound_) < 0) {
+        user_comparator_.CompareWithoutTimestamp(
+            saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
+            /*b_has_ts=*/false) < 0) {
       // We've iterated earlier than the user-specified lower bound.
       valid_ = false;
       return;
@@ -762,11 +806,16 @@ bool DBIter::FindValueForCurrentKey() {
   assert(iter_.Valid());
   merge_context_.Clear();
   current_entry_is_merged_ = false;
-  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
-  // kTypeValue)
+  // last entry before merge (could be kTypeDeletion,
+  // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue,
+  // kTypeBlobIndex, or kTypeWideColumnEntity)
   ValueType last_not_merge_type = kTypeDeletion;
   ValueType last_key_entry_type = kTypeDeletion;
 
+  // If false, it indicates that we have not seen any valid entry, even though
+  // last_key_entry_type is initialized to kTypeDeletion.
+  bool valid_entry_seen = false;
+
   // Temporarily pin blocks that hold (merge operands / the value)
   ReleaseTempPinnedData();
   TempPinData();
@@ -777,16 +826,33 @@ bool DBIter::FindValueForCurrentKey() {
       return false;
     }
 
+    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+                                                saved_key_.GetUserKey())) {
+      // Found a smaller user key, thus we are done with current user key.
+      break;
+    }
+
     assert(ikey.user_key.size() >= timestamp_size_);
     Slice ts;
     if (timestamp_size_ > 0) {
       ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                  timestamp_size_);
     }
-    if (!IsVisible(ikey.sequence, ts) ||
-        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+
+    bool visible = IsVisible(ikey.sequence, ts);
+    if (!visible &&
+        (timestamp_lb_ == nullptr ||
+         user_comparator_.CompareTimestamp(ts, *timestamp_ub_) > 0)) {
+      // Found an invisible version of the current user key, and it must have
+      // a higher sequence number or timestamp. Therefore, we are done with the
+      // current user key.
       break;
     }
+
+    if (!ts.empty()) {
+      saved_timestamp_.assign(ts.data(), ts.size());
+    }
+
     if (TooManyInternalKeysSkipped()) {
       return false;
     }
@@ -803,15 +869,21 @@ bool DBIter::FindValueForCurrentKey() {
       return false;
     }
 
+    if (timestamp_lb_ != nullptr) {
+      // Only needed when timestamp_lb_ is not null
+      [[maybe_unused]] const bool ret = ParseKey(&ikey_);
+      saved_ikey_.assign(iter_.key().data(), iter_.key().size());
+      // Since the preceding ParseKey(&ikey) succeeds, so must this.
+      assert(ret);
+    }
+
+    valid_entry_seen = true;
     last_key_entry_type = ikey.type;
     switch (last_key_entry_type) {
       case kTypeValue:
       case kTypeBlobIndex:
-        if (range_del_agg_.ShouldDelete(
-                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
-          last_key_entry_type = kTypeRangeDeletion;
-          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-        } else if (iter_.iter()->IsValuePinned()) {
+      case kTypeWideColumnEntity:
+        if (iter_.iter()->IsValuePinned()) {
           pinned_value_ = iter_.value();
         } else {
           valid_ = false;
@@ -826,26 +898,18 @@ bool DBIter::FindValueForCurrentKey() {
         }
         break;
       case kTypeDeletion:
+      case kTypeDeletionWithTimestamp:
       case kTypeSingleDeletion:
         merge_context_.Clear();
         last_not_merge_type = last_key_entry_type;
         PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
         break;
-      case kTypeMerge:
-        if (range_del_agg_.ShouldDelete(
-                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
-          merge_context_.Clear();
-          last_key_entry_type = kTypeRangeDeletion;
-          last_not_merge_type = last_key_entry_type;
-          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-        } else {
-          assert(merge_operator_ != nullptr);
-          merge_context_.PushOperandBack(
-              iter_.value(),
-              iter_.iter()->IsValuePinned() /* operand_pinned */);
-          PERF_COUNTER_ADD(internal_merge_count, 1);
-        }
-        break;
+      case kTypeMerge: {
+        assert(merge_operator_ != nullptr);
+        merge_context_.PushOperandBack(
+            iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
+        PERF_COUNTER_ADD(internal_merge_count, 1);
+      } break;
       default:
         valid_ = false;
         status_ = Status::Corruption(
@@ -857,6 +921,14 @@ bool DBIter::FindValueForCurrentKey() {
     PERF_COUNTER_ADD(internal_key_skipped_count, 1);
     iter_.Prev();
     ++num_skipped;
+
+    if (visible && timestamp_lb_ != nullptr) {
+      // If timestamp_lb_ is not nullptr, we do not have to look further for
+      // another internal key. We can return this current internal key. Yet we
+      // still keep the invariant that iter_ is positioned before the returned
+      // key.
+      break;
+    }
   }
 
   if (!iter_.status().ok()) {
@@ -864,57 +936,95 @@ bool DBIter::FindValueForCurrentKey() {
     return false;
   }
 
+  if (!valid_entry_seen) {
+    // Since we haven't seen any valid entry, last_key_entry_type remains
+    // unchanged and the same as its initial value.
+    assert(last_key_entry_type == kTypeDeletion);
+    assert(last_not_merge_type == kTypeDeletion);
+    valid_ = false;
+    return true;
+  }
+
+  if (timestamp_lb_ != nullptr) {
+    assert(last_key_entry_type == ikey_.type);
+  }
+
   Status s;
   s.PermitUncheckedError();
-  is_blob_ = false;
+
   switch (last_key_entry_type) {
     case kTypeDeletion:
+    case kTypeDeletionWithTimestamp:
     case kTypeSingleDeletion:
-    case kTypeRangeDeletion:
-      valid_ = false;
+      if (timestamp_lb_ == nullptr) {
+        valid_ = false;
+      } else {
+        saved_key_.SetInternalKey(saved_ikey_);
+        valid_ = true;
+      }
       return true;
     case kTypeMerge:
       current_entry_is_merged_ = true;
       if (last_not_merge_type == kTypeDeletion ||
           last_not_merge_type == kTypeSingleDeletion ||
-          last_not_merge_type == kTypeRangeDeletion) {
-        s = MergeHelper::TimedFullMerge(
-            merge_operator_, saved_key_.GetUserKey(), nullptr,
-            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-            env_, &pinned_value_, true);
+          last_not_merge_type == kTypeDeletionWithTimestamp) {
+        if (!Merge(nullptr, saved_key_.GetUserKey())) {
+          return false;
+        }
+        return true;
       } else if (last_not_merge_type == kTypeBlobIndex) {
-        if (!allow_blob_) {
-          ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
-          status_ = Status::NotSupported(
-              "Encounter unexpected blob index. Please open DB with "
-              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-        } else {
+        if (expose_blob_index_) {
           status_ =
-              Status::NotSupported("Blob DB does not support merge operator.");
+              Status::NotSupported("BlobDB does not support merge operator.");
+          valid_ = false;
+          return false;
         }
-        valid_ = false;
-        return false;
+        if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
+          return false;
+        }
+        valid_ = true;
+        if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
+          return false;
+        }
+
+        ResetBlobValue();
+
+        return true;
+      } else if (last_not_merge_type == kTypeWideColumnEntity) {
+        if (!MergeEntity(pinned_value_, saved_key_.GetUserKey())) {
+          return false;
+        }
+
+        return true;
       } else {
         assert(last_not_merge_type == kTypeValue);
-        s = MergeHelper::TimedFullMerge(
-            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
-            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-            env_, &pinned_value_, true);
+        if (!Merge(&pinned_value_, saved_key_.GetUserKey())) {
+          return false;
+        }
+        return true;
       }
       break;
     case kTypeValue:
-      // do nothing - we've already has value in pinned_value_
+      if (timestamp_lb_ != nullptr) {
+        saved_key_.SetInternalKey(saved_ikey_);
+      }
+
+      SetValueAndColumnsFromPlain(pinned_value_);
+
       break;
     case kTypeBlobIndex:
-      if (!allow_blob_) {
-        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
-        status_ = Status::NotSupported(
-            "Encounter unexpected blob index. Please open DB with "
-            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-        valid_ = false;
+      if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
+        return false;
+      }
+
+      SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
+                                                     : blob_value_);
+
+      break;
+    case kTypeWideColumnEntity:
+      if (!SetValueAndColumnsFromEntity(pinned_value_)) {
         return false;
       }
-      is_blob_ = true;
       break;
     default:
       valid_ = false;
@@ -941,15 +1051,24 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   // FindValueForCurrentKeyUsingSeek()
   assert(pinned_iters_mgr_.PinningEnabled());
   std::string last_key;
-  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
-                                                 sequence_, kValueTypeForSeek));
+  if (0 == timestamp_size_) {
+    AppendInternalKey(&last_key,
+                      ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
+                                        kValueTypeForSeek));
+  } else {
+    AppendInternalKeyWithDifferentTimestamp(
+        &last_key,
+        ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
+                          kValueTypeForSeek),
+        timestamp_lb_ == nullptr ? *timestamp_ub_ : *timestamp_lb_);
+  }
   iter_.Seek(last_key);
   RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
 
   // In case read_callback presents, the value we seek to may not be visible.
   // Find the next value that's visible.
   ParsedInternalKey ikey;
-  is_blob_ = false;
+
   while (true) {
     if (!iter_.Valid()) {
       valid_ = false;
@@ -966,7 +1085,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
                  timestamp_size_);
     }
 
-    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+                                                saved_key_.GetUserKey())) {
       // No visible values for this key, even though FindValueForCurrentKey()
       // has seen some. This is possible if we're using a tailing iterator, and
       // the entries were discarded in a compaction.
@@ -982,27 +1102,47 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   }
 
   if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
-      range_del_agg_.ShouldDelete(
-          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
-    valid_ = false;
+      kTypeDeletionWithTimestamp == ikey.type) {
+    if (timestamp_lb_ == nullptr) {
+      valid_ = false;
+    } else {
+      valid_ = true;
+      saved_key_.SetInternalKey(ikey);
+    }
     return true;
   }
-  if (ikey.type == kTypeBlobIndex && !allow_blob_) {
-    ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
-    status_ = Status::NotSupported(
-        "Encounter unexpected blob index. Please open DB with "
-        "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-    valid_ = false;
-    return false;
-  }
   if (!iter_.PrepareValue()) {
     valid_ = false;
     return false;
   }
-  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
+  if (timestamp_size_ > 0) {
+    Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
+    saved_timestamp_.assign(ts.data(), ts.size());
+  }
+  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
+      ikey.type == kTypeWideColumnEntity) {
     assert(iter_.iter()->IsValuePinned());
     pinned_value_ = iter_.value();
-    is_blob_ = (ikey.type == kTypeBlobIndex);
+    if (ikey.type == kTypeBlobIndex) {
+      if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
+        return false;
+      }
+
+      SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
+                                                     : blob_value_);
+    } else if (ikey.type == kTypeWideColumnEntity) {
+      if (!SetValueAndColumnsFromEntity(pinned_value_)) {
+        return false;
+      }
+    } else {
+      assert(ikey.type == kTypeValue);
+      SetValueAndColumnsFromPlain(pinned_value_);
+    }
+
+    if (timestamp_lb_ != nullptr) {
+      saved_key_.SetInternalKey(ikey);
+    }
+
     valid_ = true;
     return true;
   }
@@ -1027,12 +1167,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     if (!ParseKey(&ikey)) {
       return false;
     }
-    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+                                                saved_key_.GetUserKey())) {
       break;
     }
     if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
-        range_del_agg_.ShouldDelete(
-            ikey, RangeDelPositioningMode::kForwardTraversal)) {
+        ikey.type == kTypeDeletionWithTimestamp) {
       break;
     }
     if (!iter_.PrepareValue()) {
@@ -1042,33 +1182,38 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
 
     if (ikey.type == kTypeValue) {
       const Slice val = iter_.value();
-      Status s = MergeHelper::TimedFullMerge(
-          merge_operator_, saved_key_.GetUserKey(), &val,
-          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
-          env_, &pinned_value_, true);
-      if (!s.ok()) {
-        valid_ = false;
-        status_ = s;
+      if (!Merge(&val, saved_key_.GetUserKey())) {
         return false;
       }
-      valid_ = true;
       return true;
     } else if (ikey.type == kTypeMerge) {
       merge_context_.PushOperand(
           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
     } else if (ikey.type == kTypeBlobIndex) {
-      if (!allow_blob_) {
-        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
-        status_ = Status::NotSupported(
-            "Encounter unexpected blob index. Please open DB with "
-            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
-      } else {
+      if (expose_blob_index_) {
         status_ =
-            Status::NotSupported("Blob DB does not support merge operator.");
+            Status::NotSupported("BlobDB does not support merge operator.");
+        valid_ = false;
+        return false;
       }
-      valid_ = false;
-      return false;
+      if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
+        return false;
+      }
+      valid_ = true;
+      if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
+        return false;
+      }
+
+      ResetBlobValue();
+
+      return true;
+    } else if (ikey.type == kTypeWideColumnEntity) {
+      if (!MergeEntity(iter_.value(), saved_key_.GetUserKey())) {
+        return false;
+      }
+
+      return true;
     } else {
       valid_ = false;
       status_ = Status::Corruption(
@@ -1078,13 +1223,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     }
   }
 
-  Status s = MergeHelper::TimedFullMerge(
-      merge_operator_, saved_key_.GetUserKey(), nullptr,
-      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
-      &pinned_value_, true);
-  if (!s.ok()) {
-    valid_ = false;
-    status_ = s;
+  if (!Merge(nullptr, saved_key_.GetUserKey())) {
     return false;
   }
 
@@ -1107,6 +1246,43 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
   return true;
 }
 
+bool DBIter::Merge(const Slice* val, const Slice& user_key) {
+  Status s = MergeHelper::TimedFullMerge(
+      merge_operator_, user_key, val, merge_context_.GetOperands(),
+      &saved_value_, logger_, statistics_, clock_, &pinned_value_,
+      /* update_num_ops_stats */ true);
+  if (!s.ok()) {
+    valid_ = false;
+    status_ = s;
+    return false;
+  }
+
+  SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_
+                                                   : saved_value_);
+
+  valid_ = true;
+  return true;
+}
+
+bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
+  Status s = MergeHelper::TimedFullMergeWithEntity(
+      merge_operator_, user_key, entity, merge_context_.GetOperands(),
+      &saved_value_, logger_, statistics_, clock_,
+      /* update_num_ops_stats */ true);
+  if (!s.ok()) {
+    valid_ = false;
+    status_ = s;
+    return false;
+  }
+
+  if (!SetValueAndColumnsFromEntity(saved_value_)) {
+    return false;
+  }
+
+  valid_ = true;
+  return true;
+}
+
 // Move backwards until the key smaller than saved_key_.
 // Changes valid_ only if return value is false.
 bool DBIter::FindUserKeyBeforeSavedKey() {
@@ -1118,7 +1294,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
       return false;
     }
 
-    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
+    if (CompareKeyForSkip(ikey.user_key, saved_key_.GetUserKey()) < 0) {
       return true;
     }
 
@@ -1142,8 +1318,14 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
     if (num_skipped >= max_skip_) {
       num_skipped = 0;
       IterKey last_key;
-      last_key.SetInternalKey(ParsedInternalKey(
-          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+      ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
+                              kValueTypeForSeek);
+      if (timestamp_size_ > 0) {
+        // TODO: pre-create kTsMax.
+        const std::string kTsMax(timestamp_size_, '\xff');
+        pikey.SetTimestamp(kTsMax);
+      }
+      last_key.SetInternalKey(pikey);
       // It would be more efficient to use SeekForPrev() here, but some
       // iterators may not support it.
       iter_.Seek(last_key.GetInternalKey());
@@ -1220,29 +1402,60 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
   saved_key_.Clear();
   // now saved_key is used to store internal key.
   saved_key_.SetInternalKey(target, 0 /* sequence_number */,
-                            kValueTypeForSeekForPrev);
+                            kValueTypeForSeekForPrev, timestamp_ub_);
+
+  if (timestamp_size_ > 0) {
+    const std::string kTsMin(timestamp_size_, '\0');
+    Slice ts = kTsMin;
+    saved_key_.UpdateInternalKey(
+        /*seq=*/0, kValueTypeForSeekForPrev,
+        timestamp_lb_ == nullptr ? &ts : timestamp_lb_);
+  }
 
   if (iterate_upper_bound_ != nullptr &&
-      user_comparator_.Compare(saved_key_.GetUserKey(),
-                               *iterate_upper_bound_) >= 0) {
+      user_comparator_.CompareWithoutTimestamp(
+          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_upper_bound_,
+          /*b_has_ts=*/false) >= 0) {
     saved_key_.Clear();
-    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
+    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber,
+                              kValueTypeForSeekForPrev, timestamp_ub_);
+    if (timestamp_size_ > 0) {
+      const std::string kTsMax(timestamp_size_, '\xff');
+      Slice ts = kTsMax;
+      saved_key_.UpdateInternalKey(
+          kMaxSequenceNumber, kValueTypeForSeekForPrev,
+          timestamp_lb_ != nullptr ? timestamp_lb_ : &ts);
+    }
   }
 }
 
 void DBIter::Seek(const Slice& target) {
-  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
-  StopWatch sw(env_, statistics_, DB_SEEK);
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
+  StopWatch sw(clock_, statistics_, DB_SEEK);
 
 #ifndef ROCKSDB_LITE
   if (db_impl_ != nullptr && cfd_ != nullptr) {
     // TODO: What do we do if this returns an error?
-    db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError();
+    Slice lower_bound, upper_bound;
+    if (iterate_lower_bound_ != nullptr) {
+      lower_bound = *iterate_lower_bound_;
+    } else {
+      lower_bound = Slice("");
+    }
+    if (iterate_upper_bound_ != nullptr) {
+      upper_bound = *iterate_upper_bound_;
+    } else {
+      upper_bound = Slice("");
+    }
+    db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound)
+        .PermitUncheckedError();
   }
 #endif  // ROCKSDB_LITE
 
   status_ = Status::OK();
   ReleaseTempPinnedData();
+  ResetBlobValue();
+  ResetValueAndColumns();
   ResetInternalKeysSkippedCounter();
 
   // Seek the inner iterator based on the target key.
@@ -1252,7 +1465,6 @@ void DBIter::Seek(const Slice& target) {
     SetSavedKeyToSeekTarget(target);
     iter_.Seek(saved_key_.GetInternalKey());
 
-    range_del_agg_.InvalidateRangeDelMapPositions();
     RecordTick(statistics_, NUMBER_DB_SEEK);
   }
   if (!iter_.Valid()) {
@@ -1265,7 +1477,7 @@ void DBIter::Seek(const Slice& target) {
   // we need to find out the next key that is visible to the user.
   ClearSavedValue();
   if (prefix_same_as_start_) {
-    // The case where the iterator needs to be invalidated if it has exausted
+    // The case where the iterator needs to be invalidated if it has exhausted
     // keys within the same prefix of the seek key.
     assert(prefix_extractor_ != nullptr);
     Slice target_prefix = prefix_extractor_->Transform(target);
@@ -1293,26 +1505,34 @@ void DBIter::Seek(const Slice& target) {
 }
 
 void DBIter::SeekForPrev(const Slice& target) {
-  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
-  StopWatch sw(env_, statistics_, DB_SEEK);
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
+  StopWatch sw(clock_, statistics_, DB_SEEK);
 
 #ifndef ROCKSDB_LITE
   if (db_impl_ != nullptr && cfd_ != nullptr) {
     // TODO: What do we do if this returns an error?
-    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target)
+    Slice lower_bound, upper_bound;
+    if (iterate_lower_bound_ != nullptr) {
+      lower_bound = *iterate_lower_bound_;
+    } else {
+      lower_bound = Slice("");
+    }
+    if (iterate_upper_bound_ != nullptr) {
+      upper_bound = *iterate_upper_bound_;
+    } else {
+      upper_bound = Slice("");
+    }
+    db_impl_
+        ->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound,
+                                   upper_bound)
         .PermitUncheckedError();
   }
 #endif  // ROCKSDB_LITE
 
-  if (timestamp_size_ > 0) {
-    valid_ = false;
-    status_ = Status::NotSupported(
-        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
-    return;
-  }
-
   status_ = Status::OK();
   ReleaseTempPinnedData();
+  ResetBlobValue();
+  ResetValueAndColumns();
   ResetInternalKeysSkippedCounter();
 
   // Seek the inner iterator based on the target key.
@@ -1320,7 +1540,6 @@ void DBIter::SeekForPrev(const Slice& target) {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     SetSavedKeyToSeekForPrevTarget(target);
     iter_.SeekForPrev(saved_key_.GetInternalKey());
-    range_del_agg_.InvalidateRangeDelMapPositions();
     RecordTick(statistics_, NUMBER_DB_SEEK);
   }
   if (!iter_.Valid()) {
@@ -1334,7 +1553,7 @@ void DBIter::SeekForPrev(const Slice& target) {
   // backward direction.
   ClearSavedValue();
   if (prefix_same_as_start_) {
-    // The case where the iterator needs to be invalidated if it has exausted
+    // The case where the iterator needs to be invalidated if it has exhausted
     // keys within the same prefix of the seek key.
     assert(prefix_extractor_ != nullptr);
     Slice target_prefix = prefix_extractor_->Transform(target);
@@ -1361,15 +1580,19 @@ void DBIter::SeekToFirst() {
     Seek(*iterate_lower_bound_);
     return;
   }
-  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
   // Don't use iter_::Seek() if we set a prefix extractor
   // because prefix seek will be used.
   if (!expect_total_order_inner_iter()) {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
   status_ = Status::OK();
+  // if iterator is empty, this status_ could be unchecked.
+  status_.PermitUncheckedError();
   direction_ = kForward;
   ReleaseTempPinnedData();
+  ResetBlobValue();
+  ResetValueAndColumns();
   ResetInternalKeysSkippedCounter();
   ClearSavedValue();
   is_key_seqnum_zero_ = false;
@@ -1377,7 +1600,6 @@ void DBIter::SeekToFirst() {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_.SeekToFirst();
-    range_del_agg_.InvalidateRangeDelMapPositions();
   }
 
   RecordTick(statistics_, NUMBER_DB_SEEK);
@@ -1399,37 +1621,49 @@ void DBIter::SeekToFirst() {
   }
   if (valid_ && prefix_same_as_start_) {
     assert(prefix_extractor_ != nullptr);
-    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
+    prefix_.SetUserKey(prefix_extractor_->Transform(
+        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
   }
 }
 
 void DBIter::SeekToLast() {
-  if (timestamp_size_ > 0) {
-    valid_ = false;
-    status_ = Status::NotSupported(
-        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
-    return;
-  }
-
   if (iterate_upper_bound_ != nullptr) {
     // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
     SeekForPrev(*iterate_upper_bound_);
-    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
+    const bool is_ikey = (timestamp_size_ > 0 && timestamp_lb_ != nullptr);
+    Slice k = Valid() ? key() : Slice();
+    if (is_ikey && Valid()) {
+      k.remove_suffix(kNumInternalBytes + timestamp_size_);
+    }
+    while (Valid() && 0 == user_comparator_.CompareWithoutTimestamp(
+                               *iterate_upper_bound_, /*a_has_ts=*/false, k,
+                               /*b_has_ts=*/false)) {
       ReleaseTempPinnedData();
+      ResetBlobValue();
+      ResetValueAndColumns();
       PrevInternal(nullptr);
+
+      k = key();
+      if (is_ikey) {
+        k.remove_suffix(kNumInternalBytes + timestamp_size_);
+      }
     }
     return;
   }
 
-  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
+  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
   // Don't use iter_::Seek() if we set a prefix extractor
   // because prefix seek will be used.
   if (!expect_total_order_inner_iter()) {
     max_skip_ = std::numeric_limits<uint64_t>::max();
   }
   status_ = Status::OK();
+  // if iterator is empty, this status_ could be unchecked.
+  status_.PermitUncheckedError();
   direction_ = kReverse;
   ReleaseTempPinnedData();
+  ResetBlobValue();
+  ResetValueAndColumns();
   ResetInternalKeysSkippedCounter();
   ClearSavedValue();
   is_key_seqnum_zero_ = false;
@@ -1437,7 +1671,6 @@ void DBIter::SeekToLast() {
   {
     PERF_TIMER_GUARD(seek_internal_seek_time);
     iter_.SeekToLast();
-    range_del_agg_.InvalidateRangeDelMapPositions();
   }
   PrevInternal(nullptr);
   if (statistics_ != nullptr) {
@@ -1450,23 +1683,25 @@ void DBIter::SeekToLast() {
   }
   if (valid_ && prefix_same_as_start_) {
     assert(prefix_extractor_ != nullptr);
-    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
+    prefix_.SetUserKey(prefix_extractor_->Transform(
+        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
   }
 }
 
 Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
-                        const ImmutableCFOptions& cf_options,
+                        const ImmutableOptions& ioptions,
                         const MutableCFOptions& mutable_cf_options,
                         const Comparator* user_key_comparator,
-                        InternalIterator* internal_iter,
+                        InternalIterator* internal_iter, const Version* version,
                         const SequenceNumber& sequence,
                         uint64_t max_sequential_skip_in_iterations,
                         ReadCallback* read_callback, DBImpl* db_impl,
-                        ColumnFamilyData* cfd, bool allow_blob) {
-  DBIter* db_iter = new DBIter(
-      env, read_options, cf_options, mutable_cf_options, user_key_comparator,
-      internal_iter, sequence, false, max_sequential_skip_in_iterations,
-      read_callback, db_impl, cfd, allow_blob);
+                        ColumnFamilyData* cfd, bool expose_blob_index) {
+  DBIter* db_iter =
+      new DBIter(env, read_options, ioptions, mutable_cf_options,
+                 user_key_comparator, internal_iter, version, sequence, false,
+                 max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
+                 expose_blob_index);
   return db_iter;
 }
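
Note: the hunks above add wide-column entity support to DBIter (kTypeWideColumnEntity is deserialized via WideColumnSerialization::Deserialize, and value() surfaces only the default column, kDefaultWideColumnName, when present). The following is a minimal usage sketch, not part of the diff, showing how those columns become visible through an iterator via the public RocksDB API. It assumes the bundled RocksDB exposes DB::PutEntity() and Iterator::columns() (as upstream 7.5+ does); the database path and column names are illustrative only.

// Sketch only: assumes a RocksDB build with wide-column support
// (DB::PutEntity / Iterator::columns()); path and column names are made up.
#include <cassert>
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/wide_columns.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/wide_column_demo", &db);
  assert(s.ok());

  // Write one wide-column entity under key "item-1".
  rocksdb::WideColumns columns{{"price", "42"}, {"stock", "7"}};
  s = db->PutEntity(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                    "item-1", columns);
  assert(s.ok());

  // Iterate. DBIter deserializes the entity, so columns() exposes all
  // columns; value() would return only the default column's value
  // (kDefaultWideColumnName), which is empty for this entity.
  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions()));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    std::cout << it->key().ToString() << ":";
    for (const auto& col : it->columns()) {
      std::cout << ' ' << col.name().ToString() << '='
                << col.value().ToString();
    }
    std::cout << '\n';
  }
  assert(it->status().ok());

  delete db;
  return 0;
}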