]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_with_timestamp_basic_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_with_timestamp_basic_test.cc
index 226e425013052f13cb81a54edca5d5bd18a5714d..6ea1aaf461baece3ce2063786fd08869a508c590 100644 (file)
@@ -7,7 +7,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
-#include "db/db_test_util.h"
+#include "db/db_with_timestamp_test_util.h"
 #include "port/stack_trace.h"
 #include "rocksdb/perf_context.h"
 #include "rocksdb/utilities/debug.h"
 #if !defined(ROCKSDB_LITE)
 #include "test_util/sync_point.h"
 #endif
+#include "test_util/testutil.h"
 #include "utilities/fault_injection_env.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
 
 namespace ROCKSDB_NAMESPACE {
-class DBBasicTestWithTimestampBase : public DBTestBase {
+class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
  public:
-  explicit DBBasicTestWithTimestampBase(const std::string& dbname)
-      : DBTestBase(dbname, /*env_do_fsync=*/true) {}
+  DBBasicTestWithTimestamp()
+      : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
+};
 
- protected:
-  static std::string Key1(uint64_t k) {
-    std::string ret;
-    PutFixed64(&ret, k);
-    std::reverse(ret.begin(), ret.end());
-    return ret;
-  }
+TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.avoid_flush_during_shutdown = true;
+  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
+  DestroyAndReopen(options);
 
-  class TestComparator : public Comparator {
-   private:
-    const Comparator* cmp_without_ts_;
+  Options options1 = CurrentOptions();
+  options1.env = env_;
+  options1.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  options1.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
+  assert(options1.comparator &&
+         options1.comparator->timestamp_size() == sizeof(uint64_t));
+  ColumnFamilyHandle* handle = nullptr;
+  Status s = db_->CreateColumnFamily(options1, "data", &handle);
+  ASSERT_OK(s);
 
-   public:
-    explicit TestComparator(size_t ts_sz)
-        : Comparator(ts_sz), cmp_without_ts_(nullptr) {
-      cmp_without_ts_ = BytewiseComparator();
-    }
+  std::string dummy_ts(sizeof(uint64_t), '\0');
+  // Perform timestamp operations on default cf.
+  ASSERT_TRUE(
+      db_->Put(WriteOptions(), "key", dummy_ts, "value").IsInvalidArgument());
+  ASSERT_TRUE(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), "key",
+                         dummy_ts, "value")
+                  .IsInvalidArgument());
+  ASSERT_TRUE(db_->Delete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
+  ASSERT_TRUE(
+      db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
+  ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               "begin_key", "end_key", dummy_ts)
+                  .IsInvalidArgument());
+
+  // Perform non-timestamp operations on "data" cf.
+  ASSERT_TRUE(
+      db_->Put(WriteOptions(), handle, "key", "value").IsInvalidArgument());
+  ASSERT_TRUE(db_->Delete(WriteOptions(), handle, "key").IsInvalidArgument());
+  ASSERT_TRUE(
+      db_->SingleDelete(WriteOptions(), handle, "key").IsInvalidArgument());
+
+  ASSERT_TRUE(
+      db_->Merge(WriteOptions(), handle, "key", "value").IsInvalidArgument());
+  ASSERT_TRUE(db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key")
+                  .IsInvalidArgument());
 
-    const char* Name() const override { return "TestComparator"; }
+  {
+    WriteBatch wb;
+    ASSERT_OK(wb.Put(handle, "key", "value"));
+    ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
+  }
+  {
+    WriteBatch wb;
+    ASSERT_OK(wb.Delete(handle, "key"));
+    ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
+  }
+  {
+    WriteBatch wb;
+    ASSERT_OK(wb.SingleDelete(handle, "key"));
+    ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
+  }
+  {
+    WriteBatch wb;
+    ASSERT_OK(wb.DeleteRange(handle, "begin_key", "end_key"));
+    ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
+  }
 
-    void FindShortSuccessor(std::string*) const override {}
+  // Perform timestamp operations with timestamps of incorrect size.
+  const std::string wrong_ts(sizeof(uint32_t), '\0');
+  ASSERT_TRUE(db_->Put(WriteOptions(), handle, "key", wrong_ts, "value")
+                  .IsInvalidArgument());
+  ASSERT_TRUE(db_->Merge(WriteOptions(), handle, "key", wrong_ts, "value")
+                  .IsInvalidArgument());
+  ASSERT_TRUE(
+      db_->Delete(WriteOptions(), handle, "key", wrong_ts).IsInvalidArgument());
+  ASSERT_TRUE(db_->SingleDelete(WriteOptions(), handle, "key", wrong_ts)
+                  .IsInvalidArgument());
+  ASSERT_TRUE(
+      db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key", wrong_ts)
+          .IsInvalidArgument());
+
+  delete handle;
+}
 
-    void FindShortestSeparator(std::string*, const Slice&) const override {}
+TEST_F(DBBasicTestWithTimestamp, MixedCfs) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.avoid_flush_during_shutdown = true;
+  DestroyAndReopen(options);
 
-    int Compare(const Slice& a, const Slice& b) const override {
-      int r = CompareWithoutTimestamp(a, b);
-      if (r != 0 || 0 == timestamp_size()) {
-        return r;
-      }
-      return -CompareTimestamp(
-          Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
-          Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
-    }
+  Options options1 = CurrentOptions();
+  options1.env = env_;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options1.comparator = &test_cmp;
+  ColumnFamilyHandle* handle = nullptr;
+  Status s = db_->CreateColumnFamily(options1, "data", &handle);
+  ASSERT_OK(s);
 
-    using Comparator::CompareWithoutTimestamp;
-    int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
-                                bool b_has_ts) const override {
-      if (a_has_ts) {
-        assert(a.size() >= timestamp_size());
-      }
-      if (b_has_ts) {
-        assert(b.size() >= timestamp_size());
-      }
-      Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a;
-      Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b;
-      return cmp_without_ts_->Compare(lhs, rhs);
-    }
-
-    int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
-      if (!ts1.data() && !ts2.data()) {
-        return 0;
-      } else if (ts1.data() && !ts2.data()) {
-        return 1;
-      } else if (!ts1.data() && ts2.data()) {
-        return -1;
-      }
-      assert(ts1.size() == ts2.size());
-      uint64_t low1 = 0;
-      uint64_t low2 = 0;
-      uint64_t high1 = 0;
-      uint64_t high2 = 0;
-      const size_t kSize = ts1.size();
-      std::unique_ptr<char[]> ts1_buf(new char[kSize]);
-      memcpy(ts1_buf.get(), ts1.data(), ts1.size());
-      std::unique_ptr<char[]> ts2_buf(new char[kSize]);
-      memcpy(ts2_buf.get(), ts2.data(), ts2.size());
-      Slice ts1_copy = Slice(ts1_buf.get(), kSize);
-      Slice ts2_copy = Slice(ts2_buf.get(), kSize);
-      auto* ptr1 = const_cast<Slice*>(&ts1_copy);
-      auto* ptr2 = const_cast<Slice*>(&ts2_copy);
-      if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
-          !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
+  WriteBatch wb;
+  ASSERT_OK(wb.Put("a", "value"));
+  ASSERT_OK(wb.Put(handle, "a", "value"));
+  {
+    std::string ts = Timestamp(1, 0);
+    const auto ts_sz_func = [kTimestampSize, handle](uint32_t cf_id) {
+      assert(handle);
+      if (cf_id == 0) {
+        return static_cast<size_t>(0);
+      } else if (cf_id == handle->GetID()) {
+        return kTimestampSize;
+      } else {
         assert(false);
+        return std::numeric_limits<size_t>::max();
       }
-      if (high1 < high2) {
-        return -1;
-      } else if (high1 > high2) {
-        return 1;
-      }
-      if (low1 < low2) {
-        return -1;
-      } else if (low1 > low2) {
-        return 1;
-      }
-      return 0;
-    }
+    };
+    ASSERT_OK(wb.UpdateTimestamps(ts, ts_sz_func));
+    ASSERT_OK(db_->Write(WriteOptions(), &wb));
+  }
+
+  const auto verify_db = [this](ColumnFamilyHandle* h, const std::string& key,
+                                const std::string& ts,
+                                const std::string& expected_value) {
+    ASSERT_EQ(expected_value, Get(key));
+    Slice read_ts_slice(ts);
+    ReadOptions read_opts;
+    read_opts.timestamp = &read_ts_slice;
+    std::string value;
+    ASSERT_OK(db_->Get(read_opts, h, key, &value));
+    ASSERT_EQ(expected_value, value);
   };
 
-  std::string Timestamp(uint64_t low, uint64_t high) {
-    std::string ts;
-    PutFixed64(&ts, low);
-    PutFixed64(&ts, high);
-    return ts;
+  verify_db(handle, "a", Timestamp(1, 0), "value");
+
+  delete handle;
+  Close();
+
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+  cf_descs.emplace_back("data", options1);
+  options.create_if_missing = false;
+  s = DB::Open(options, dbname_, cf_descs, &handles_, &db_);
+  ASSERT_OK(s);
+
+  verify_db(handles_[1], "a", Timestamp(1, 0), "value");
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+
+  WriteOptions write_opts;
+  std::string ts = Timestamp(1, 0);
+
+  ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar"));
+  ASSERT_OK(Flush());
+
+  std::string start_str = "foo";
+  std::string end_str = "foo2";
+  Slice start(start_str), end(end_str);
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+
+  std::string ts_str = Timestamp(1, 0);
+  WriteOptions wopts;
+  ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1"));
+  ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2"));
+  ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3"));
+
+  ts_str = Timestamp(2, 0);
+  ASSERT_OK(db_->Delete(wopts, "k3", ts_str));
+
+  ts_str = Timestamp(4, 0);
+  ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v5"));
+
+  ts_str = Timestamp(5, 0);
+  ASSERT_OK(
+      db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k0", "k9", ts_str));
+
+  ts_str = Timestamp(3, 0);
+  Slice ts = ts_str;
+  CompactRangeOptions cro;
+  cro.full_history_ts_low = &ts;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  ASSERT_OK(Flush());
+
+  ReadOptions ropts;
+  ropts.timestamp = &ts;
+  std::string value;
+  Status s = db_->Get(ropts, "k1", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("v1", value);
+
+  std::string key_ts;
+  ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound());
+  ASSERT_EQ(Timestamp(2, 0), key_ts);
+
+  ts_str = Timestamp(5, 0);
+  ts = ts_str;
+  ropts.timestamp = &ts;
+  ASSERT_TRUE(db_->Get(ropts, "k2", &value, &key_ts).IsNotFound());
+  ASSERT_EQ(Timestamp(5, 0), key_ts);
+  ASSERT_TRUE(db_->Get(ropts, "k2", &value).IsNotFound());
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+
+  const std::string kKey = "test kKey";
+
+  // Test set ts_low first and flush()
+  int current_ts_low = 5;
+  std::string ts_low_str = Timestamp(current_ts_low, 0);
+  Slice ts_low = ts_low_str;
+  CompactRangeOptions comp_opts;
+  comp_opts.full_history_ts_low = &ts_low;
+  comp_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+
+  ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr));
+
+  auto* cfd =
+      static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
+          ->cfd();
+  auto result_ts_low = cfd->GetFullHistoryTsLow();
+
+  ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
+
+  for (int i = 0; i < 10; i++) {
+    WriteOptions write_opts;
+    std::string ts = Timestamp(i, 0);
+    ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i)));
   }
+  ASSERT_OK(Flush());
 
-  void CheckIterUserEntry(const Iterator* it, const Slice& expected_key,
-                          ValueType expected_value_type,
-                          const Slice& expected_value,
-                          const Slice& expected_ts) const {
-    ASSERT_TRUE(it->Valid());
-    ASSERT_OK(it->status());
-    ASSERT_EQ(expected_key, it->key());
-    if (kTypeValue == expected_value_type) {
-      ASSERT_EQ(expected_value, it->value());
+  for (int i = 0; i < 10; i++) {
+    ReadOptions read_opts;
+    std::string ts_str = Timestamp(i, 0);
+    Slice ts = ts_str;
+    read_opts.timestamp = &ts;
+    std::string value;
+    Status status = db_->Get(read_opts, kKey, &value);
+    if (i < current_ts_low) {
+      ASSERT_TRUE(status.IsInvalidArgument());
+    } else {
+      ASSERT_OK(status);
+      ASSERT_TRUE(value.compare(Key(i)) == 0);
     }
-    ASSERT_EQ(expected_ts, it->timestamp());
   }
 
-  void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
-                      SequenceNumber expected_seq, ValueType expected_val_type,
-                      const Slice& expected_value, const Slice& expected_ts) {
-    ASSERT_TRUE(it->Valid());
-    ASSERT_OK(it->status());
-    std::string ukey_and_ts;
-    ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
-    ukey_and_ts.append(expected_ts.data(), expected_ts.size());
-    ParsedInternalKey parsed_ikey;
-    ASSERT_OK(
-        ParseInternalKey(it->key(), &parsed_ikey, true /* log_err_key */));
-    ASSERT_EQ(ukey_and_ts, parsed_ikey.user_key);
-    ASSERT_EQ(expected_val_type, parsed_ikey.type);
-    ASSERT_EQ(expected_seq, parsed_ikey.sequence);
-    if (expected_val_type == kTypeValue) {
-      ASSERT_EQ(expected_value, it->value());
-    }
-    ASSERT_EQ(expected_ts, it->timestamp());
-  }
-
-  void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
-                      ValueType expected_val_type, const Slice& expected_value,
-                      const Slice& expected_ts) {
-    ASSERT_TRUE(it->Valid());
-    ASSERT_OK(it->status());
-    std::string ukey_and_ts;
-    ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
-    ukey_and_ts.append(expected_ts.data(), expected_ts.size());
+  // Test set ts_low and then trigger compaction
+  for (int i = 10; i < 20; i++) {
+    WriteOptions write_opts;
+    std::string ts = Timestamp(i, 0);
+    ASSERT_OK(db_->Put(write_opts, kKey, ts, Key(i)));
+  }
 
-    ParsedInternalKey parsed_ikey;
-    ASSERT_OK(
-        ParseInternalKey(it->key(), &parsed_ikey, true /* log_err_key */));
-    ASSERT_EQ(expected_val_type, parsed_ikey.type);
-    ASSERT_EQ(Slice(ukey_and_ts), parsed_ikey.user_key);
-    if (expected_val_type == kTypeValue) {
-      ASSERT_EQ(expected_value, it->value());
-    }
-    ASSERT_EQ(expected_ts, it->timestamp());
+  ASSERT_OK(Flush());
+
+  current_ts_low = 15;
+  ts_low_str = Timestamp(current_ts_low, 0);
+  ts_low = ts_low_str;
+  comp_opts.full_history_ts_low = &ts_low;
+  ASSERT_OK(db_->CompactRange(comp_opts, nullptr, nullptr));
+  result_ts_low = cfd->GetFullHistoryTsLow();
+  ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
+
+  for (int i = current_ts_low; i < 20; i++) {
+    ReadOptions read_opts;
+    std::string ts_str = Timestamp(i, 0);
+    Slice ts = ts_str;
+    read_opts.timestamp = &ts;
+    std::string value;
+    Status status = db_->Get(read_opts, kKey, &value);
+    ASSERT_OK(status);
+    ASSERT_TRUE(value.compare(Key(i)) == 0);
   }
-};
 
-class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
- public:
-  DBBasicTestWithTimestamp()
-      : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
-};
+  // Test invalid compaction with range
+  Slice start(kKey), end(kKey);
+  Status s = db_->CompactRange(comp_opts, &start, &end);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  s = db_->CompactRange(comp_opts, &start, nullptr);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  s = db_->CompactRange(comp_opts, nullptr, &end);
+  ASSERT_TRUE(s.IsInvalidArgument());
+
+  // Test invalid compaction with the decreasing ts_low
+  ts_low_str = Timestamp(current_ts_low - 1, 0);
+  ts_low = ts_low_str;
+  comp_opts.full_history_ts_low = &ts_low;
+  s = db_->CompactRange(comp_opts, nullptr, nullptr);
+  ASSERT_TRUE(s.IsInvalidArgument());
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLowWithPublicAPI) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  std::string ts_low_str = Timestamp(9, 0);
+  ASSERT_OK(
+      db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str));
+  std::string result_ts_low;
+  ASSERT_OK(db_->GetFullHistoryTsLow(nullptr, &result_ts_low));
+  ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low_str, result_ts_low) == 0);
+  // test increase full_history_low backward
+  std::string ts_low_str_back = Timestamp(8, 0);
+  auto s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
+                                         ts_low_str_back);
+  ASSERT_EQ(s, Status::InvalidArgument());
+  // test IncreaseFullHistoryTsLow with a timestamp whose length is longger
+  // than the cf's timestamp size
+  std::string ts_low_str_long(Timestamp(0, 0).size() + 1, 'a');
+  s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
+                                    ts_low_str_long);
+  ASSERT_EQ(s, Status::InvalidArgument());
+  // test IncreaseFullHistoryTsLow with a timestamp which is null
+  std::string ts_low_str_null = "";
+  s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
+                                    ts_low_str_null);
+  ASSERT_EQ(s, Status::InvalidArgument());
+  // test IncreaseFullHistoryTsLow for a column family that does not enable
+  // timestamp
+  options.comparator = BytewiseComparator();
+  DestroyAndReopen(options);
+  ts_low_str = Timestamp(10, 0);
+  s = db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), ts_low_str);
+  ASSERT_EQ(s, Status::InvalidArgument());
+  // test GetFullHistoryTsLow for a column family that does not enable
+  // timestamp
+  std::string current_ts_low;
+  s = db_->GetFullHistoryTsLow(db_->DefaultColumnFamily(), &current_ts_low);
+  ASSERT_EQ(s, Status::InvalidArgument());
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, GetApproximateSizes) {
+  Options options = CurrentOptions();
+  options.write_buffer_size = 100000000;  // Large write buffer
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  auto default_cf = db_->DefaultColumnFamily();
+
+  WriteOptions write_opts;
+  std::string ts = Timestamp(1, 0);
+
+  const int N = 128;
+  Random rnd(301);
+  for (int i = 0; i < N; i++) {
+    ASSERT_OK(db_->Put(write_opts, Key(i), ts, rnd.RandomString(1024)));
+  }
+
+  uint64_t size;
+  std::string start = Key(50);
+  std::string end = Key(60);
+  Range r(start, end);
+  SizeApproximationOptions size_approx_options;
+  size_approx_options.include_memtables = true;
+  size_approx_options.include_files = true;
+  ASSERT_OK(
+      db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
+  ASSERT_GT(size, 6000);
+  ASSERT_LT(size, 204800);
+
+  // test multiple ranges
+  std::vector<Range> ranges;
+  std::string start_tmp = Key(10);
+  std::string end_tmp = Key(20);
+  ranges.emplace_back(Range(start_tmp, end_tmp));
+  ranges.emplace_back(Range(start, end));
+  uint64_t range_sizes[2];
+  ASSERT_OK(db_->GetApproximateSizes(size_approx_options, default_cf,
+                                     ranges.data(), 2, range_sizes));
+
+  ASSERT_EQ(range_sizes[1], size);
+
+  // Zero if not including mem table
+  ASSERT_OK(db_->GetApproximateSizes(&r, 1, &size));
+  ASSERT_EQ(size, 0);
+
+  start = Key(500);
+  end = Key(600);
+  r = Range(start, end);
+  ASSERT_OK(
+      db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
+  ASSERT_EQ(size, 0);
+
+  // Test range boundaries
+  ASSERT_OK(db_->Put(write_opts, Key(1000), ts, rnd.RandomString(1024)));
+  // Should include start key
+  start = Key(1000);
+  end = Key(1100);
+  r = Range(start, end);
+  ASSERT_OK(
+      db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
+  ASSERT_GT(size, 0);
+
+  // Should exclude end key
+  start = Key(900);
+  end = Key(1000);
+  r = Range(start, end);
+  ASSERT_OK(
+      db_->GetApproximateSizes(size_approx_options, default_cf, &r, 1, &size));
+  ASSERT_EQ(size, 0);
+
+  Close();
+}
 
-TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
+TEST_F(DBBasicTestWithTimestamp, SimpleIterate) {
   const int kNumKeysPerFile = 128;
   const uint64_t kMaxKey = 1024;
   Options options = CurrentOptions();
@@ -188,7 +481,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
   const size_t kTimestampSize = Timestamp(0, 0).size();
   TestComparator test_cmp(kTimestampSize);
   options.comparator = &test_cmp;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   DestroyAndReopen(options);
   const std::vector<uint64_t> start_keys = {1, 0};
   const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
@@ -197,10 +491,9 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
                                                     Timestamp(4, 0)};
   for (size_t i = 0; i < write_timestamps.size(); ++i) {
     WriteOptions write_opts;
-    Slice write_ts = write_timestamps[i];
-    write_opts.timestamp = &write_ts;
     for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
-      Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
+      Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
+                          "value" + std::to_string(i));
       ASSERT_OK(s);
     }
   }
@@ -211,6 +504,7 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
     std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
     int count = 0;
     uint64_t key = 0;
+    // Forward iterate.
     for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
          it->Next(), ++count, ++key) {
       CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
@@ -219,7 +513,16 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
     size_t expected_count = kMaxKey - start_keys[i] + 1;
     ASSERT_EQ(expected_count, count);
 
-    // SeekToFirst() with lower bound.
+    // Backward iterate.
+    count = 0;
+    for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
+         it->Prev(), ++count, --key) {
+      CheckIterUserEntry(it.get(), Key1(key), kTypeValue,
+                         "value" + std::to_string(i), write_timestamps[i]);
+    }
+    ASSERT_EQ(static_cast<size_t>(kMaxKey) - start_keys[i] + 1, count);
+
+    // SeekToFirst()/SeekToLast() with lower/upper bounds.
     // Then iter with lower and upper bounds.
     uint64_t l = 0;
     uint64_t r = kMaxKey + 1;
@@ -237,6 +540,12 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
                            "value" + std::to_string(i), write_timestamps[i]);
       }
       ASSERT_EQ(r - std::max(l, start_keys[i]), count);
+
+      for (it->SeekToLast(), key = std::min(r, kMaxKey + 1), count = 0;
+           it->Valid(); it->Prev(), --key, ++count) {
+        CheckIterUserEntry(it.get(), Key1(key - 1), kTypeValue,
+                           "value" + std::to_string(i), write_timestamps[i]);
+      }
       l += (kMaxKey / 100);
       r -= (kMaxKey / 100);
     }
@@ -244,15 +553,226 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLessThanKey) {
+TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  auto check_value_by_ts = [](DB* db, Slice key, std::string readTs,
+                              Status status, std::string checkValue,
+                              std::string expected_ts) {
+    ReadOptions ropts;
+    Slice ts = readTs;
+    ropts.timestamp = &ts;
+    std::string value;
+    std::string key_ts;
+    Status s = db->Get(ropts, key, &value, &key_ts);
+    ASSERT_TRUE(s == status);
+    if (s.ok()) {
+      ASSERT_EQ(checkValue, value);
+    }
+    if (s.ok() || s.IsNotFound()) {
+      ASSERT_EQ(expected_ts, key_ts);
+    }
+  };
+  // Construct data of different versions with different ts
+  ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(2, 0), "v1"));
+  ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(4, 0), "v2"));
+  ASSERT_OK(db_->Delete(WriteOptions(), "k1", Timestamp(5, 0)));
+  ASSERT_OK(db_->Put(WriteOptions(), "k1", Timestamp(6, 0), "v3"));
+  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v3",
+                    Timestamp(6, 0));
+  ASSERT_OK(Flush());
+  Close();
+
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  DBOptions db_options(options);
+
+  // Trim data whose version > Timestamp(5, 0), read(k1, ts(7)) <- NOT_FOUND.
+  ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
+                                   &handles_, &db_, Timestamp(5, 0)));
+  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::NotFound(), "",
+                    Timestamp(5, 0));
+  Close();
+
+  // Trim data whose timestamp > Timestamp(4, 0), read(k1, ts(7)) <- v2
+  ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
+                                   &handles_, &db_, Timestamp(4, 0)));
+  check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2",
+                    Timestamp(4, 0));
+  Close();
+
+  Reopen(options);
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "k1",
+                             "k3", Timestamp(7, 0)));
+  check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::NotFound(), "",
+                    Timestamp(7, 0));
+  Close();
+  // Trim data whose timestamp > Timestamp(6, 0), read(k1, ts(8)) <- v2
+  ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
+                                   &handles_, &db_, Timestamp(6, 0)));
+  check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::OK(), "v2",
+                    Timestamp(4, 0));
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) {
+  Destroy(last_options_);
+
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  DBOptions db_options(options);
+
+  // OpenAndTrimHistory should not work with avoid_flush_during_recovery
+  db_options.avoid_flush_during_recovery = true;
+  ASSERT_TRUE(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
+                                     &handles_, &db_, Timestamp(0, 0))
+                  .IsInvalidArgument());
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBBasicTestWithTimestamp, GetTimestampTableProperties) {
+  Options options = CurrentOptions();
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  // Create 2 tables
+  for (int table = 0; table < 2; ++table) {
+    for (int i = 0; i < 10; i++) {
+      std::string ts = Timestamp(i, 0);
+      ASSERT_OK(db_->Put(WriteOptions(), "key", ts, Key(i)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  TablePropertiesCollection props;
+  ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
+  ASSERT_EQ(2U, props.size());
+  for (const auto& item : props) {
+    auto& user_collected = item.second->user_collected_properties;
+    ASSERT_TRUE(user_collected.find("rocksdb.timestamp_min") !=
+                user_collected.end());
+    ASSERT_TRUE(user_collected.find("rocksdb.timestamp_max") !=
+                user_collected.end());
+    ASSERT_EQ(user_collected.at("rocksdb.timestamp_min"), Timestamp(0, 0));
+    ASSERT_EQ(user_collected.at("rocksdb.timestamp_max"), Timestamp(9, 0));
+  }
+  Close();
+}
+#endif  // !ROCKSDB_LITE
+
+class DBBasicTestWithTimestampTableOptions
+    : public DBBasicTestWithTimestampBase,
+      public testing::WithParamInterface<BlockBasedTableOptions::IndexType> {
+ public:
+  explicit DBBasicTestWithTimestampTableOptions()
+      : DBBasicTestWithTimestampBase(
+            "db_basic_test_with_timestamp_table_options") {}
+};
+
+INSTANTIATE_TEST_CASE_P(
+    Timestamp, DBBasicTestWithTimestampTableOptions,
+    testing::Values(
+        BlockBasedTableOptions::IndexType::kBinarySearch,
+        BlockBasedTableOptions::IndexType::kHashSearch,
+        BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
+        BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey));
+
+TEST_P(DBBasicTestWithTimestampTableOptions, GetAndMultiGet) {
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.prefix_extractor.reset(NewFixedPrefixTransform(3));
+  options.compression = kNoCompression;
+  BlockBasedTableOptions bbto;
+  bbto.index_type = GetParam();
+  bbto.block_size = 100;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator cmp(kTimestampSize);
+  options.comparator = &cmp;
+  DestroyAndReopen(options);
+  constexpr uint64_t kNumKeys = 1024;
+  for (uint64_t k = 0; k < kNumKeys; ++k) {
+    WriteOptions write_opts;
+    ASSERT_OK(db_->Put(write_opts, Key1(k), Timestamp(1, 0),
+                       "value" + std::to_string(k)));
+  }
+  ASSERT_OK(Flush());
+  {
+    ReadOptions read_opts;
+    read_opts.total_order_seek = true;
+    std::string ts_str = Timestamp(2, 0);
+    Slice ts = ts_str;
+    read_opts.timestamp = &ts;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    // verify Get()
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      std::string value_from_get;
+      std::string key_str(it->key().data(), it->key().size());
+      std::string timestamp;
+      ASSERT_OK(db_->Get(read_opts, key_str, &value_from_get, &timestamp));
+      ASSERT_EQ(it->value(), value_from_get);
+      ASSERT_EQ(Timestamp(1, 0), timestamp);
+    }
+
+    // verify MultiGet()
+    constexpr uint64_t step = 2;
+    static_assert(0 == (kNumKeys % step),
+                  "kNumKeys must be a multiple of step");
+    for (uint64_t k = 0; k < kNumKeys; k += 2) {
+      std::vector<std::string> key_strs;
+      std::vector<Slice> keys;
+      for (size_t i = 0; i < step; ++i) {
+        key_strs.push_back(Key1(k + i));
+      }
+      for (size_t i = 0; i < step; ++i) {
+        keys.emplace_back(key_strs[i]);
+      }
+      std::vector<std::string> values;
+      std::vector<std::string> timestamps;
+      std::vector<Status> statuses =
+          db_->MultiGet(read_opts, keys, &values, &timestamps);
+      ASSERT_EQ(step, statuses.size());
+      ASSERT_EQ(step, values.size());
+      ASSERT_EQ(step, timestamps.size());
+      for (uint64_t i = 0; i < step; ++i) {
+        ASSERT_OK(statuses[i]);
+        ASSERT_EQ("value" + std::to_string(k + i), values[i]);
+        ASSERT_EQ(Timestamp(1, 0), timestamps[i]);
+      }
+    }
+  }
+  Close();
+}
+
+TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLessThanKey) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
   options.prefix_extractor.reset(NewFixedPrefixTransform(3));
+  options.memtable_whole_key_filtering = true;
+  options.memtable_prefix_bloom_size_ratio = 0.1;
   BlockBasedTableOptions bbto;
   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
   bbto.cache_index_and_filter_blocks = true;
   bbto.whole_key_filtering = true;
+  bbto.index_type = GetParam();
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   const size_t kTimestampSize = Timestamp(0, 0).size();
   TestComparator test_cmp(kTimestampSize);
@@ -260,31 +780,32 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLessThanKey) {
   DestroyAndReopen(options);
 
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
-  ASSERT_OK(db_->Put(write_opts, "foo1", "bar"));
-  Flush();
+  ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar"));
+  ASSERT_OK(Flush());
 
-  ASSERT_OK(db_->Put(write_opts, "foo2", "bar"));
-  Flush();
+  ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar"));
+  ASSERT_OK(Flush());
 
   // Move sst file to next level
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
-  ASSERT_OK(db_->Put(write_opts, "foo3", "bar"));
-  Flush();
+  ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar"));
+  ASSERT_OK(Flush());
 
   ReadOptions read_opts;
-  std::string read_ts = Timestamp(1, 0);
-  ts = read_ts;
-  read_opts.timestamp = &ts;
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
   {
     std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
     iter->Seek("foo");
     ASSERT_TRUE(iter->Valid());
     ASSERT_OK(iter->status());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+
     iter->Seek("bbb");
     ASSERT_FALSE(iter->Valid());
     ASSERT_OK(iter->status());
@@ -293,15 +814,20 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLessThanKey) {
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLargerThanKey) {
+TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithCappedPrefix) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
-  options.prefix_extractor.reset(NewFixedPrefixTransform(20));
+  // All of the keys or this test must be longer than 3 characters
+  constexpr int kMinKeyLen = 3;
+  options.prefix_extractor.reset(NewCappedPrefixTransform(kMinKeyLen));
+  options.memtable_whole_key_filtering = true;
+  options.memtable_prefix_bloom_size_ratio = 0.1;
   BlockBasedTableOptions bbto;
   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
   bbto.cache_index_and_filter_blocks = true;
   bbto.whole_key_filtering = true;
+  bbto.index_type = GetParam();
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   const size_t kTimestampSize = Timestamp(0, 0).size();
   TestComparator test_cmp(kTimestampSize);
@@ -309,26 +835,24 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLargerThanKey) {
   DestroyAndReopen(options);
 
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
-  ASSERT_OK(db_->Put(write_opts, "foo1", "bar"));
-  Flush();
+  ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar"));
+  ASSERT_OK(Flush());
 
-  ASSERT_OK(db_->Put(write_opts, "foo2", "bar"));
-  Flush();
+  ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar"));
+  ASSERT_OK(Flush());
 
   // Move sst file to next level
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
-  ASSERT_OK(db_->Put(write_opts, "foo3", "bar"));
-  Flush();
+  ASSERT_OK(db_->Put(write_opts, "foo3", ts, "bar"));
+  ASSERT_OK(Flush());
 
   ReadOptions read_opts;
-  std::string read_ts = Timestamp(2, 0);
-  ts = read_ts;
-  read_opts.timestamp = &ts;
+  ts = Timestamp(2, 0);
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
   {
     std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
     // Make sure the prefix extractor doesn't include timestamp, otherwise it
@@ -336,12 +860,242 @@ TEST_F(DBBasicTestWithTimestamp, SeekWithPrefixLargerThanKey) {
     iter->Seek("foo");
     ASSERT_TRUE(iter->Valid());
     ASSERT_OK(iter->status());
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+  }
+
+  Close();
+}
+
+TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithBound) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  bbto.cache_index_and_filter_blocks = true;
+  bbto.whole_key_filtering = true;
+  bbto.index_type = GetParam();
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+
+  WriteOptions write_opts;
+  std::string ts = Timestamp(1, 0);
+
+  ASSERT_OK(db_->Put(write_opts, "foo1", ts, "bar1"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(db_->Put(write_opts, "foo2", ts, "bar2"));
+  ASSERT_OK(Flush());
+
+  // Move sst file to next level
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  for (int i = 3; i < 9; ++i) {
+    ASSERT_OK(db_->Put(write_opts, "foo" + std::to_string(i), ts,
+                       "bar" + std::to_string(i)));
+  }
+  ASSERT_OK(Flush());
+
+  ReadOptions read_opts;
+  ts = Timestamp(2, 0);
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
+  std::string up_bound = "foo5";  // exclusive
+  Slice up_bound_slice = up_bound;
+  std::string lo_bound = "foo2";  // inclusive
+  Slice lo_bound_slice = lo_bound;
+  read_opts.iterate_upper_bound = &up_bound_slice;
+  read_opts.iterate_lower_bound = &lo_bound_slice;
+  read_opts.auto_prefix_mode = true;
+  {
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
+    // Make sure the prefix extractor doesn't include timestamp, otherwise it
+    // may return invalid result.
+    iter->Seek("foo");
+    CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2",
+                       Timestamp(1, 0));
+    iter->SeekToFirst();
+    CheckIterUserEntry(iter.get(), lo_bound, kTypeValue, "bar2",
+                       Timestamp(1, 0));
+    iter->SeekForPrev("g");
+    CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0));
+    iter->SeekToLast();
+    CheckIterUserEntry(iter.get(), "foo4", kTypeValue, "bar4", Timestamp(1, 0));
+  }
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) {
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.env = env_;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  DestroyAndReopen(options);
+  const std::vector<std::string> timestamps = {Timestamp(1, 1), Timestamp(0, 2),
+                                               Timestamp(4, 3)};
+  const std::vector<std::tuple<std::string, std::string>> kvs = {
+      std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")};
+  for (const auto& ts : timestamps) {
+    WriteBatch wb(0, 0, 0, kTimestampSize);
+    for (const auto& kv : kvs) {
+      const std::string& key = std::get<0>(kv);
+      const std::string& value = std::get<1>(kv);
+      ASSERT_OK(wb.Put(key, value));
+    }
+
+    ASSERT_OK(wb.UpdateTimestamps(
+        ts, [kTimestampSize](uint32_t) { return kTimestampSize; }));
+    ASSERT_OK(db_->Write(WriteOptions(), &wb));
+  }
+  std::string read_ts_str = Timestamp(5, 3);
+  Slice read_ts = read_ts_str;
+  ReadOptions read_opts;
+  read_opts.timestamp = &read_ts;
+  std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+
+  it->SeekToFirst();
+  ASSERT_TRUE(it->Valid());
+  it->Prev();
+  ASSERT_FALSE(it->Valid());
+
+  it->SeekToLast();
+  ASSERT_TRUE(it->Valid());
+  uint64_t prev_reseek_count =
+      options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
+  ASSERT_EQ(0, prev_reseek_count);
+  it->Next();
+  ASSERT_FALSE(it->Valid());
+  ASSERT_EQ(1 + prev_reseek_count,
+            options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
+
+  it->Seek(std::get<0>(kvs[0]));
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue,
+                     std::get<1>(kvs[0]), Timestamp(4, 3));
+  it->Next();
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
+                     std::get<1>(kvs[1]), Timestamp(4, 3));
+  it->Prev();
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue,
+                     std::get<1>(kvs[0]), Timestamp(4, 3));
+
+  prev_reseek_count =
+      options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
+  ASSERT_EQ(1, prev_reseek_count);
+  it->Next();
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
+                     std::get<1>(kvs[1]), Timestamp(4, 3));
+  ASSERT_EQ(1 + prev_reseek_count,
+            options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
+
+  it->SeekForPrev(std::get<0>(kvs[1]));
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
+                     std::get<1>(kvs[1]), Timestamp(4, 3));
+  it->Prev();
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[0]), kTypeValue,
+                     std::get<1>(kvs[0]), Timestamp(4, 3));
+
+  prev_reseek_count =
+      options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
+  it->Next();
+  CheckIterUserEntry(it.get(), std::get<0>(kvs[1]), kTypeValue,
+                     std::get<1>(kvs[1]), Timestamp(4, 3));
+  ASSERT_EQ(1 + prev_reseek_count,
+            options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
+
+  it.reset();
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
+  constexpr int kNumKeysPerFile = 128;
+  constexpr uint64_t kMaxKey = 1024;
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
+  DestroyAndReopen(options);
+  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
+                                                     Timestamp(3, 0)};
+  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
+                                                    Timestamp(4, 0)};
+  const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0),
+                                                       Timestamp(1, 0)};
+  for (size_t i = 0; i < write_timestamps.size(); ++i) {
+    WriteOptions write_opts;
+    for (uint64_t key = 0; key <= kMaxKey; ++key) {
+      Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
+                          "value" + std::to_string(i));
+      ASSERT_OK(s);
+    }
+  }
+  for (size_t i = 0; i < read_timestamps.size(); ++i) {
+    ReadOptions read_opts;
+    Slice read_ts = read_timestamps[i];
+    Slice read_ts_lb = read_timestamps_lb[i];
+    read_opts.timestamp = &read_ts;
+    read_opts.iter_start_ts = &read_ts_lb;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    int count = 0;
+    uint64_t key = 0;
+    for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
+      CheckIterEntry(it.get(), Key1(key), kTypeValue,
+                     "value" + std::to_string(i), write_timestamps[i]);
+      if (i > 0) {
+        it->Next();
+        CheckIterEntry(it.get(), Key1(key), kTypeValue,
+                       "value" + std::to_string(i - 1),
+                       write_timestamps[i - 1]);
+      }
+    }
+    size_t expected_count = kMaxKey + 1;
+    ASSERT_EQ(expected_count, count);
+  }
+  // Delete all keys@ts=5 and check iteration result with start ts set
+  {
+    std::string write_timestamp = Timestamp(5, 0);
+    WriteOptions write_opts;
+    for (uint64_t key = 0; key < kMaxKey + 1; ++key) {
+      Status s = db_->Delete(write_opts, Key1(key), write_timestamp);
+      ASSERT_OK(s);
+    }
+
+    std::string read_timestamp = Timestamp(6, 0);
+    ReadOptions read_opts;
+    Slice read_ts = read_timestamp;
+    read_opts.timestamp = &read_ts;
+    std::string read_timestamp_lb = Timestamp(2, 0);
+    Slice read_ts_lb = read_timestamp_lb;
+    read_opts.iter_start_ts = &read_ts_lb;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    int count = 0;
+    uint64_t key = 0;
+    for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
+      CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(),
+                     write_timestamp);
+      // Skip key@ts=3 and land on tombstone key@ts=5
+      it->Next();
+    }
+    ASSERT_EQ(kMaxKey + 1, count);
   }
-
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
+TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound) {
   constexpr int kNumKeysPerFile = 128;
   constexpr uint64_t kMaxKey = 1024;
   Options options = CurrentOptions();
@@ -350,7 +1104,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
   const size_t kTimestampSize = Timestamp(0, 0).size();
   TestComparator test_cmp(kTimestampSize);
   options.comparator = &test_cmp;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   DestroyAndReopen(options);
   const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                      Timestamp(3, 0)};
@@ -360,10 +1115,9 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
                                                        Timestamp(1, 0)};
   for (size_t i = 0; i < write_timestamps.size(); ++i) {
     WriteOptions write_opts;
-    Slice write_ts = write_timestamps[i];
-    write_opts.timestamp = &write_ts;
     for (uint64_t key = 0; key <= kMaxKey; ++key) {
-      Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
+      Status s = db_->Put(write_opts, Key1(key), write_timestamps[i],
+                          "value" + std::to_string(i));
       ASSERT_OK(s);
     }
   }
@@ -376,14 +1130,14 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
     std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
     int count = 0;
     uint64_t key = 0;
-    for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
-      CheckIterEntry(it.get(), Key1(key), kTypeValue,
-                     "value" + std::to_string(i), write_timestamps[i]);
+    for (it->SeekForPrev(Key1(kMaxKey)), key = kMaxKey; it->Valid();
+         it->Prev(), ++count, --key) {
+      CheckIterEntry(it.get(), Key1(key), kTypeValue, "value0",
+                     write_timestamps[0]);
       if (i > 0) {
-        it->Next();
-        CheckIterEntry(it.get(), Key1(key), kTypeValue,
-                       "value" + std::to_string(i - 1),
-                       write_timestamps[i - 1]);
+        it->Prev();
+        CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1",
+                       write_timestamps[1]);
       }
     }
     size_t expected_count = kMaxKey + 1;
@@ -393,10 +1147,8 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
   {
     std::string write_timestamp = Timestamp(5, 0);
     WriteOptions write_opts;
-    Slice write_ts = write_timestamp;
-    write_opts.timestamp = &write_ts;
     for (uint64_t key = 0; key < kMaxKey + 1; ++key) {
-      Status s = db_->Delete(write_opts, Key1(key));
+      Status s = db_->Delete(write_opts, Key1(key), write_timestamp);
       ASSERT_OK(s);
     }
 
@@ -409,102 +1161,122 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
     read_opts.iter_start_ts = &read_ts_lb;
     std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
     int count = 0;
-    uint64_t key = 0;
-    for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) {
+    uint64_t key = kMaxKey;
+    for (it->SeekForPrev(Key1(key)), key = kMaxKey; it->Valid();
+         it->Prev(), ++count, --key) {
+      CheckIterEntry(it.get(), Key1(key), kTypeValue, "value1",
+                     Timestamp(3, 0));
+      it->Prev();
       CheckIterEntry(it.get(), Key1(key), kTypeDeletionWithTimestamp, Slice(),
-                     write_ts);
-      // Skip key@ts=3 and land on tombstone key@ts=5
-      it->Next();
+                     write_timestamp);
     }
     ASSERT_EQ(kMaxKey + 1, count);
   }
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
-  const int kNumKeysPerFile = 128;
-  const uint64_t kMaxKey = 0xffffffffffffffff;
-  const uint64_t kMinKey = kMaxKey - 1023;
+TEST_F(DBBasicTestWithTimestamp, SimpleBackwardIterateLowerTsBound) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
-  // Need to disable compaction to bottommost level when sequence number will be
-  // zeroed out, causing the verification of sequence number to fail in this
-  // test.
-  options.disable_auto_compactions = true;
   const size_t kTimestampSize = Timestamp(0, 0).size();
   TestComparator test_cmp(kTimestampSize);
   options.comparator = &test_cmp;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
   DestroyAndReopen(options);
-  std::vector<SequenceNumber> start_seqs;
 
-  const int kNumTimestamps = 4;
-  std::vector<std::string> write_ts_list;
-  for (int t = 0; t != kNumTimestamps; ++t) {
-    write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
+  std::string ts_ub_buf = Timestamp(5, 0);
+  Slice ts_ub = ts_ub_buf;
+  std::string ts_lb_buf = Timestamp(1, 0);
+  Slice ts_lb = ts_lb_buf;
+
+  {
+    ReadOptions read_opts;
+    read_opts.timestamp = &ts_ub;
+    read_opts.iter_start_ts = &ts_lb;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    it->SeekToLast();
+    ASSERT_FALSE(it->Valid());
+    ASSERT_OK(it->status());
+
+    it->SeekForPrev("foo");
+    ASSERT_FALSE(it->Valid());
+    ASSERT_OK(it->status());
   }
-  WriteOptions write_opts;
-  for (size_t i = 0; i != write_ts_list.size(); ++i) {
-    Slice write_ts = write_ts_list[i];
-    write_opts.timestamp = &write_ts;
-    for (uint64_t k = kMaxKey; k >= kMinKey; --k) {
-      Status s;
-      if (k % 2) {
-        s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
-      } else {
-        s = db_->Delete(write_opts, Key1(k));
-      }
-      ASSERT_OK(s);
-    }
-    start_seqs.push_back(db_->GetLatestSequenceNumber());
+
+  // Test iterate_upper_bound
+  ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(0, 0), "v0"));
+  ASSERT_OK(db_->SingleDelete(WriteOptions(), "a", Timestamp(1, 0)));
+
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0),
+                       "v" + std::to_string(i)));
   }
-  std::vector<std::string> read_ts_list;
-  for (int t = 0; t != kNumTimestamps - 1; ++t) {
-    read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17));
+
+  {
+    ReadOptions read_opts;
+    read_opts.timestamp = &ts_ub;
+    read_opts.iter_start_ts = &ts_lb;
+    std::string key_ub_str = "b";  // exclusive
+    Slice key_ub = key_ub_str;
+    read_opts.iterate_upper_bound = &key_ub;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    it->SeekToLast();
+    CheckIterEntry(it.get(), "a", kTypeSingleDeletion, Slice(),
+                   Timestamp(1, 0));
+
+    key_ub_str = "a";  // exclusive
+    key_ub = key_ub_str;
+    read_opts.iterate_upper_bound = &key_ub;
+    it.reset(db_->NewIterator(read_opts));
+    it->SeekToLast();
+    ASSERT_FALSE(it->Valid());
+    ASSERT_OK(it->status());
   }
 
-  ReadOptions read_opts;
-  // Scan with only read_opts.iter_start_seqnum set.
-  for (size_t i = 0; i != read_ts_list.size(); ++i) {
-    Slice read_ts = read_ts_list[i];
-    read_opts.timestamp = &read_ts;
-    read_opts.iter_start_seqnum = start_seqs[i] + 1;
-    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
-    SequenceNumber expected_seq = start_seqs[i] + (kMaxKey - kMinKey) + 1;
-    uint64_t key = kMinKey;
-    for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) {
-      CheckIterEntry(
-          iter.get(), Key1(key), expected_seq,
-          (key % 2) ? kTypeValue : kTypeDeletionWithTimestamp,
-          (key % 2) ? "value" + std::to_string(i + 1) : std::string(),
-          write_ts_list[i + 1]);
-      ++key;
-      --expected_seq;
-    }
-  }
-  // Scan with both read_opts.iter_start_seqnum and read_opts.iter_start_ts set.
-  std::vector<std::string> read_ts_lb_list;
-  for (int t = 0; t < kNumTimestamps - 1; ++t) {
-    read_ts_lb_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
-  }
-  for (size_t i = 0; i < read_ts_list.size(); ++i) {
-    Slice read_ts = read_ts_list[i];
-    Slice read_ts_lb = read_ts_lb_list[i];
-    read_opts.timestamp = &read_ts;
-    read_opts.iter_start_ts = &read_ts_lb;
-    read_opts.iter_start_seqnum = start_seqs[i] + 1;
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, BackwardIterateLowerTsBound_Reseek) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.max_sequential_skip_in_iterations = 2;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < 10; ++i) {
+    ASSERT_OK(db_->Put(WriteOptions(), "a", Timestamp(i, 0),
+                       "v" + std::to_string(i)));
+  }
+
+  for (int i = 0; i < 10; ++i) {
+    ASSERT_OK(db_->Put(WriteOptions(), "b", Timestamp(i, 0),
+                       "v" + std::to_string(i)));
+  }
+
+  {
+    std::string ts_ub_buf = Timestamp(6, 0);
+    Slice ts_ub = ts_ub_buf;
+    std::string ts_lb_buf = Timestamp(4, 0);
+    Slice ts_lb = ts_lb_buf;
+
+    ReadOptions read_opts;
+    read_opts.timestamp = &ts_ub;
+    read_opts.iter_start_ts = &ts_lb;
     std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
-    uint64_t key = kMinKey;
-    SequenceNumber expected_seq = start_seqs[i] + (kMaxKey - kMinKey) + 1;
-    for (it->Seek(Key1(kMinKey)); it->Valid(); it->Next()) {
-      CheckIterEntry(it.get(), Key1(key), expected_seq,
-                     (key % 2) ? kTypeValue : kTypeDeletionWithTimestamp,
-                     "value" + std::to_string(i + 1), write_ts_list[i + 1]);
-      ++key;
-      --expected_seq;
+    it->SeekToLast();
+    for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) {
+      CheckIterEntry(it.get(), "b", kTypeValue, "v" + std::to_string(4 + i),
+                     Timestamp(4 + i, 0));
+    }
+    for (int i = 0; i < 3 && it->Valid(); it->Prev(), ++i) {
+      CheckIterEntry(it.get(), "a", kTypeValue, "v" + std::to_string(4 + i),
+                     Timestamp(4 + i, 0));
     }
   }
+
   Close();
 }
 
@@ -523,10 +1295,8 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
   WriteOptions write_opts;
   Status s;
   for (size_t i = 0; i != kNumKeys; ++i) {
-    std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
-    Slice ts = ts_str;
-    write_opts.timestamp = &ts;
-    s = db_->Put(write_opts, "foo", "value" + std::to_string(i));
+    std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
+    s = db_->Put(write_opts, "foo", ts, "value" + std::to_string(i));
     ASSERT_OK(s);
   }
   {
@@ -539,6 +1309,16 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
     CheckIterUserEntry(iter.get(), "foo", kTypeValue, "value0", ts_str);
     ASSERT_EQ(
         1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
+
+    ts_str = Timestamp(kNumKeys, 0);
+    ts = ts_str;
+    read_opts.timestamp = &ts;
+    iter.reset(db_->NewIterator(read_opts));
+    iter->SeekToLast();
+    CheckIterUserEntry(iter.get(), "foo", kTypeValue,
+                       "value" + std::to_string(kNumKeys - 1), ts_str);
+    ASSERT_EQ(
+        2, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
   }
   Close();
 }
@@ -558,18 +1338,17 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
   WriteOptions write_opts;
   Status s;
   for (size_t i = 0; i != kNumKeys; ++i) {
-    std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
-    Slice ts = ts_str;
-    write_opts.timestamp = &ts;
-    s = db_->Put(write_opts, "a", "value" + std::to_string(i));
+    std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
+    s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i));
     ASSERT_OK(s);
   }
   {
     std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
-    WriteBatch batch(0, 0, kTimestampSize);
-    ASSERT_OK(batch.Put("a", "new_value"));
-    ASSERT_OK(batch.Put("b", "new_value"));
-    s = batch.AssignTimestamp(ts_str);
+    WriteBatch batch(0, 0, 0, kTimestampSize);
+    { ASSERT_OK(batch.Put("a", "new_value")); }
+    { ASSERT_OK(batch.Put("b", "new_value")); }
+    s = batch.UpdateTimestamps(
+        ts_str, [kTimestampSize](uint32_t) { return kTimestampSize; });
     ASSERT_OK(s);
     s = db_->Write(write_opts, &batch);
     ASSERT_OK(s);
@@ -589,6 +1368,43 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
   Close();
 }
 
+TEST_F(DBBasicTestWithTimestamp, ReseekToUserKeyBeforeSavedKey) {
+  Options options = GetDefaultOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  constexpr size_t kNumKeys = 16;
+  options.max_sequential_skip_in_iterations = kNumKeys / 2;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  for (size_t i = 0; i < kNumKeys; ++i) {
+    std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
+    WriteOptions write_opts;
+    Status s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i));
+    ASSERT_OK(s);
+  }
+  {
+    std::string ts = Timestamp(1, 0);
+    WriteOptions write_opts;
+    ASSERT_OK(db_->Put(write_opts, "a", ts, "value"));
+  }
+  {
+    ReadOptions read_opts;
+    std::string ts_str = Timestamp(1, 0);
+    Slice ts = ts_str;
+    read_opts.timestamp = &ts;
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
+    iter->SeekToLast();
+    iter->Prev();
+    CheckIterUserEntry(iter.get(), "a", kTypeValue, "value", ts_str);
+    ASSERT_EQ(
+        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
+  }
+  Close();
+}
+
 TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) {
   Options options = CurrentOptions();
   options.env = env_;
@@ -605,31 +1421,45 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) {
 
   // Write any value
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
-  ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
+  ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
 
-  Flush();
+  ASSERT_OK(Flush());
 
   // Read with MultiGet
   ReadOptions read_opts;
-  read_opts.timestamp = &ts;
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
   size_t batch_size = 1;
   std::vector<Slice> keys(batch_size);
   std::vector<PinnableSlice> values(batch_size);
   std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
   keys[0] = "foo";
   ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
   db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
-                statuses.data());
+                timestamps.data(), statuses.data(), true);
 
   ASSERT_OK(statuses[0]);
+  ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
+  for (auto& elem : values) {
+    elem.Reset();
+  }
+
+  ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
+  ts = Timestamp(3, 0);
+  read_ts = ts;
+  read_opts.timestamp = &read_ts;
+  db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+                timestamps.data(), statuses.data(), true);
+  ASSERT_TRUE(statuses[0].IsNotFound());
+  ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
+
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, MultiGetWithPrefix) {
+TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithPrefix) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
@@ -638,6 +1468,7 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithPrefix) {
   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
   bbto.cache_index_and_filter_blocks = true;
   bbto.whole_key_filtering = false;
+  bbto.index_type = GetParam();
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   const size_t kTimestampSize = Timestamp(0, 0).size();
   TestComparator test_cmp(kTimestampSize);
@@ -646,31 +1477,49 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithPrefix) {
 
   // Write any value
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
-  ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
+  ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
 
-  Flush();
+  ASSERT_OK(Flush());
 
   // Read with MultiGet
   ReadOptions read_opts;
-  read_opts.timestamp = &ts;
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
   size_t batch_size = 1;
   std::vector<Slice> keys(batch_size);
   std::vector<PinnableSlice> values(batch_size);
   std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
   keys[0] = "foo";
   ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
   db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
-                statuses.data());
+                timestamps.data(), statuses.data(), true);
 
   ASSERT_OK(statuses[0]);
+  ASSERT_EQ(Timestamp(1, 0), timestamps[0]);
+  for (auto& elem : values) {
+    elem.Reset();
+  }
+
+  ASSERT_OK(db_->SingleDelete(WriteOptions(), "foo", Timestamp(2, 0)));
+  // TODO re-enable after fixing a bug of kHashSearch
+  if (GetParam() != BlockBasedTableOptions::IndexType::kHashSearch) {
+    ASSERT_OK(Flush());
+  }
+
+  ts = Timestamp(3, 0);
+  read_ts = ts;
+  db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+                timestamps.data(), statuses.data(), true);
+  ASSERT_TRUE(statuses[0].IsNotFound());
+  ASSERT_EQ(Timestamp(2, 0), timestamps[0]);
+
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, MultiGetWithMemBloomFilter) {
+TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetWithMemBloomFilter) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
@@ -679,6 +1528,7 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithMemBloomFilter) {
   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
   bbto.cache_index_and_filter_blocks = true;
   bbto.whole_key_filtering = false;
+  bbto.index_type = GetParam();
   options.memtable_prefix_bloom_size_ratio = 0.1;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   const size_t kTimestampSize = Timestamp(0, 0).size();
@@ -688,17 +1538,15 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetWithMemBloomFilter) {
 
   // Write any value
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
-  ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
+  ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
 
   // Read with MultiGet
-  ts_str = Timestamp(2, 0);
-  ts = ts_str;
+  ts = Timestamp(2, 0);
+  Slice read_ts = ts;
   ReadOptions read_opts;
-  read_opts.timestamp = &ts;
+  read_opts.timestamp = &read_ts;
   size_t batch_size = 1;
   std::vector<Slice> keys(batch_size);
   std::vector<PinnableSlice> values(batch_size);
@@ -729,32 +1577,30 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetRangeFiltering) {
 
   // Write any value
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
   // random data
   for (int i = 0; i < 3; i++) {
-    auto key = ToString(i * 10);
-    auto value = ToString(i * 10);
+    auto key = std::to_string(i * 10);
+    auto value = std::to_string(i * 10);
     Slice key_slice = key;
     Slice value_slice = value;
-    ASSERT_OK(db_->Put(write_opts, key_slice, value_slice));
-    Flush();
+    ASSERT_OK(db_->Put(write_opts, key_slice, ts, value_slice));
+    ASSERT_OK(Flush());
   }
 
   // Make num_levels to 2 to do key range filtering of sst files
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
-  ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
+  ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
 
-  Flush();
+  ASSERT_OK(Flush());
 
   // Read with MultiGet
-  ts_str = Timestamp(2, 0);
-  ts = ts_str;
+  ts = Timestamp(2, 0);
+  Slice read_ts = ts;
   ReadOptions read_opts;
-  read_opts.timestamp = &ts;
+  read_opts.timestamp = &read_ts;
   size_t batch_size = 1;
   std::vector<Slice> keys(batch_size);
   std::vector<PinnableSlice> values(batch_size);
@@ -768,15 +1614,16 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetRangeFiltering) {
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, MultiGetPrefixFilter) {
+TEST_P(DBBasicTestWithTimestampTableOptions, MultiGetPrefixFilter) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
-  options.prefix_extractor.reset(NewCappedPrefixTransform(5));
+  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
   BlockBasedTableOptions bbto;
   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
   bbto.cache_index_and_filter_blocks = true;
   bbto.whole_key_filtering = false;
+  bbto.index_type = GetParam();
   options.memtable_prefix_bloom_size_ratio = 0.1;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   const size_t kTimestampSize = Timestamp(0, 0).size();
@@ -785,18 +1632,16 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetPrefixFilter) {
   DestroyAndReopen(options);
 
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
+  std::string ts = Timestamp(1, 0);
 
-  ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
+  ASSERT_OK(db_->Put(write_opts, "foo", ts, "bar"));
 
-  Flush();
+  ASSERT_OK(Flush());
   // Read with MultiGet
-  ts_str = Timestamp(2, 0);
-  ts = ts_str;
+  ts = Timestamp(2, 0);
+  Slice read_ts = ts;
   ReadOptions read_opts;
-  read_opts.timestamp = &ts;
+  read_opts.timestamp = &read_ts;
   size_t batch_size = 1;
   std::vector<Slice> keys(batch_size);
   std::vector<std::string> values(batch_size);
@@ -811,7 +1656,7 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetPrefixFilter) {
   Close();
 }
 
-TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
+TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringNext) {
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
@@ -824,16 +1669,12 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
   WriteOptions write_opts;
   Status s;
   {
-    std::string ts_str = Timestamp(1, 0);
-    Slice ts = ts_str;
-    write_opts.timestamp = &ts;
-    ASSERT_OK(db_->Put(write_opts, "a", "value"));
+    std::string ts = Timestamp(1, 0);
+    ASSERT_OK(db_->Put(write_opts, "a", ts, "value"));
   }
   for (size_t i = 0; i < kNumKeys; ++i) {
-    std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
-    Slice ts = ts_str;
-    write_opts.timestamp = &ts;
-    s = db_->Put(write_opts, "b", "value" + std::to_string(i));
+    std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
+    s = db_->Put(write_opts, "b", ts, "value" + std::to_string(i));
     ASSERT_OK(s);
   }
   {
@@ -850,6 +1691,41 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
   Close();
 }
 
+TEST_F(DBBasicTestWithTimestamp, MaxKeysSkippedDuringPrev) {
+  Options options = GetDefaultOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  constexpr size_t max_skippable_internal_keys = 2;
+  const size_t kNumKeys = max_skippable_internal_keys + 2;
+  WriteOptions write_opts;
+  Status s;
+  {
+    std::string ts = Timestamp(1, 0);
+    ASSERT_OK(db_->Put(write_opts, "b", ts, "value"));
+  }
+  for (size_t i = 0; i < kNumKeys; ++i) {
+    std::string ts = Timestamp(static_cast<uint64_t>(i + 1), 0);
+    s = db_->Put(write_opts, "a", ts, "value" + std::to_string(i));
+    ASSERT_OK(s);
+  }
+  {
+    ReadOptions read_opts;
+    read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
+    std::string ts_str = Timestamp(1, 0);
+    Slice ts = ts_str;
+    read_opts.timestamp = &ts;
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
+    iter->SeekToLast();
+    iter->Prev();
+    ASSERT_TRUE(iter->status().IsIncomplete());
+  }
+  Close();
+}
+
 // Create two L0, and compact them to a new L1. In this test, L1 is L_bottom.
 // Two L0s:
 //       f1                                  f2
@@ -867,55 +1743,179 @@ TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
   options.level0_file_num_compaction_trigger = 2;
   DestroyAndReopen(options);
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
-  ASSERT_OK(db_->Put(write_opts, "a", "value0"));
+  std::string ts = Timestamp(1, 0);
+  ASSERT_OK(db_->Put(write_opts, "a", ts, "value0"));
   ASSERT_OK(Flush());
 
-  ts_str = Timestamp(2, 0);
-  ts = ts_str;
-  write_opts.timestamp = &ts;
-  ASSERT_OK(db_->Put(write_opts, "b", "value0"));
-  ts_str = Timestamp(3, 0);
-  ts = ts_str;
-  write_opts.timestamp = &ts;
-  ASSERT_OK(db_->Delete(write_opts, "a"));
+  ts = Timestamp(2, 0);
+  ASSERT_OK(db_->Put(write_opts, "b", ts, "value0"));
+  ts = Timestamp(3, 0);
+  ASSERT_OK(db_->Delete(write_opts, "a", ts));
   ASSERT_OK(Flush());
   ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ReadOptions read_opts;
-  ts_str = Timestamp(1, 0);
-  ts = ts_str;
-  read_opts.timestamp = &ts;
+  ts = Timestamp(1, 0);
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
   std::string value;
   Status s = db_->Get(read_opts, "a", &value);
   ASSERT_OK(s);
   ASSERT_EQ("value0", value);
 
-  ts_str = Timestamp(3, 0);
-  ts = ts_str;
-  read_opts.timestamp = &ts;
-  s = db_->Get(read_opts, "a", &value);
+  ts = Timestamp(3, 0);
+  read_ts = ts;
+  read_opts.timestamp = &read_ts;
+  std::string key_ts;
+  s = db_->Get(read_opts, "a", &value, &key_ts);
   ASSERT_TRUE(s.IsNotFound());
+  ASSERT_EQ(Timestamp(3, 0), key_ts);
 
   // Time-travel to the past before deletion
-  ts_str = Timestamp(2, 0);
-  ts = ts_str;
-  read_opts.timestamp = &ts;
+  ts = Timestamp(2, 0);
+  read_ts = ts;
+  read_opts.timestamp = &read_ts;
   s = db_->Get(read_opts, "a", &value);
   ASSERT_OK(s);
   ASSERT_EQ("value0", value);
   Close();
 }
 
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+class DBBasicTestWithTimestampFilterPrefixSettings
+    : public DBBasicTestWithTimestampBase,
+      public testing::WithParamInterface<
+          std::tuple<std::shared_ptr<const FilterPolicy>, bool, bool,
+                     std::shared_ptr<const SliceTransform>, bool, double,
+                     BlockBasedTableOptions::IndexType>> {
+ public:
+  DBBasicTestWithTimestampFilterPrefixSettings()
+      : DBBasicTestWithTimestampBase(
+            "db_basic_test_with_timestamp_filter_prefix") {}
+};
+
+TEST_P(DBBasicTestWithTimestampFilterPrefixSettings, GetAndMultiGet) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy = std::get<0>(GetParam());
+  bbto.whole_key_filtering = std::get<1>(GetParam());
+  bbto.cache_index_and_filter_blocks = std::get<2>(GetParam());
+  bbto.index_type = std::get<6>(GetParam());
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  options.prefix_extractor = std::get<3>(GetParam());
+  options.memtable_whole_key_filtering = std::get<4>(GetParam());
+  options.memtable_prefix_bloom_size_ratio = std::get<5>(GetParam());
+
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  const int kMaxKey = 1000;
+
+  // Write any value
+  WriteOptions write_opts;
+  std::string ts = Timestamp(1, 0);
+
+  int idx = 0;
+  for (; idx < kMaxKey / 4; idx++) {
+    ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar"));
+    ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar"));
+  }
+
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  for (; idx < kMaxKey / 2; idx++) {
+    ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar"));
+    ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar"));
+  }
+
+  ASSERT_OK(Flush());
+
+  for (; idx < kMaxKey; idx++) {
+    ASSERT_OK(db_->Put(write_opts, Key1(idx), ts, "bar"));
+    ASSERT_OK(db_->Put(write_opts, KeyWithPrefix("foo", idx), ts, "bar"));
+  }
+
+  // Read with MultiGet
+  ReadOptions read_opts;
+  Slice read_ts = ts;
+  read_opts.timestamp = &read_ts;
+
+  for (idx = 0; idx < kMaxKey; idx++) {
+    size_t batch_size = 4;
+    std::vector<std::string> keys_str(batch_size);
+    std::vector<PinnableSlice> values(batch_size);
+    std::vector<Status> statuses(batch_size);
+    ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
+
+    keys_str[0] = Key1(idx);
+    keys_str[1] = KeyWithPrefix("foo", idx);
+    keys_str[2] = Key1(kMaxKey + idx);
+    keys_str[3] = KeyWithPrefix("foo", kMaxKey + idx);
+
+    auto keys = ConvertStrToSlice(keys_str);
+
+    db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
+                  statuses.data());
+
+    for (int i = 0; i < 2; i++) {
+      ASSERT_OK(statuses[i]);
+    }
+    for (int i = 2; i < 4; i++) {
+      ASSERT_TRUE(statuses[i].IsNotFound());
+    }
+
+    for (int i = 0; i < 2; i++) {
+      std::string value;
+      ASSERT_OK(db_->Get(read_opts, keys[i], &value));
+      std::unique_ptr<Iterator> it1(db_->NewIterator(read_opts));
+      ASSERT_NE(nullptr, it1);
+      ASSERT_OK(it1->status());
+      it1->Seek(keys[i]);
+      ASSERT_TRUE(it1->Valid());
+    }
+
+    for (int i = 2; i < 4; i++) {
+      std::string value;
+      Status s = db_->Get(read_opts, keys[i], &value);
+      ASSERT_TRUE(s.IsNotFound());
+    }
+  }
+  Close();
+}
+
+INSTANTIATE_TEST_CASE_P(
+    Timestamp, DBBasicTestWithTimestampFilterPrefixSettings,
+    ::testing::Combine(
+        ::testing::Values(
+            std::shared_ptr<const FilterPolicy>(nullptr),
+            std::shared_ptr<const FilterPolicy>(NewBloomFilterPolicy(10, true)),
+            std::shared_ptr<const FilterPolicy>(NewBloomFilterPolicy(10,
+                                                                     false))),
+        ::testing::Bool(), ::testing::Bool(),
+        ::testing::Values(
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(1)),
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
+        ::testing::Bool(), ::testing::Values(0, 0.1),
+        ::testing::Values(
+            BlockBasedTableOptions::IndexType::kBinarySearch,
+            BlockBasedTableOptions::IndexType::kHashSearch,
+            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
+            BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
+#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
+
 class DataVisibilityTest : public DBBasicTestWithTimestampBase {
  public:
   DataVisibilityTest() : DBBasicTestWithTimestampBase("data_visibility_test") {
     // Initialize test data
     for (int i = 0; i < kTestDataSize; i++) {
-      test_data_[i].key = "key" + ToString(i);
-      test_data_[i].value = "value" + ToString(i);
+      test_data_[i].key = "key" + std::to_string(i);
+      test_data_[i].value = "value" + std::to_string(i);
       test_data_[i].timestamp = Timestamp(i, 0);
       test_data_[i].ts = i;
       test_data_[i].seq_num = kMaxSequenceNumber;
@@ -937,12 +1937,10 @@ class DataVisibilityTest : public DBBasicTestWithTimestampBase {
   void PutTestData(int index, ColumnFamilyHandle* cfh = nullptr) {
     ASSERT_LE(index, kTestDataSize);
     WriteOptions write_opts;
-    Slice ts_slice = test_data_[index].timestamp;
-    write_opts.timestamp = &ts_slice;
 
     if (cfh == nullptr) {
-      ASSERT_OK(
-          db_->Put(write_opts, test_data_[index].key, test_data_[index].value));
+      ASSERT_OK(db_->Put(write_opts, test_data_[index].key,
+                         test_data_[index].timestamp, test_data_[index].value));
       const Snapshot* snap = db_->GetSnapshot();
       test_data_[index].seq_num = snap->GetSequenceNumber();
       if (index > 0) {
@@ -951,7 +1949,7 @@ class DataVisibilityTest : public DBBasicTestWithTimestampBase {
       db_->ReleaseSnapshot(snap);
     } else {
       ASSERT_OK(db_->Put(write_opts, cfh, test_data_[index].key,
-                         test_data_[index].value));
+                         test_data_[index].timestamp, test_data_[index].value));
     }
   }
 
@@ -1046,7 +2044,7 @@ constexpr int DataVisibilityTest::kTestDataSize;
 //                               seq'=11
 //                               write finishes
 //         GetImpl(ts,seq)
-// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=1t1 but seq<s1,
+// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=t1 but seq<s1,
 // the key should not be returned.
 TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) {
   Options options = CurrentOptions();
@@ -1063,13 +2061,11 @@ TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) {
   });
   SyncPoint::GetInstance()->EnableProcessing();
   port::Thread writer_thread([this]() {
-    std::string write_ts_str = Timestamp(1, 0);
-    Slice write_ts = write_ts_str;
+    std::string write_ts = Timestamp(1, 0);
     WriteOptions write_opts;
-    write_opts.timestamp = &write_ts;
     TEST_SYNC_POINT(
         "DataVisibilityTest::PointLookupWithoutSnapshot1:BeforePut");
-    Status s = db_->Put(write_opts, "foo", "value");
+    Status s = db_->Put(write_opts, "foo", write_ts, "value");
     ASSERT_OK(s);
     TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot1:AfterPut");
   });
@@ -1111,20 +2107,16 @@ TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot2) {
   });
   SyncPoint::GetInstance()->EnableProcessing();
   port::Thread writer_thread([this]() {
-    std::string write_ts_str = Timestamp(1, 0);
-    Slice write_ts = write_ts_str;
+    std::string write_ts = Timestamp(1, 0);
     WriteOptions write_opts;
-    write_opts.timestamp = &write_ts;
     TEST_SYNC_POINT(
         "DataVisibilityTest::PointLookupWithoutSnapshot2:BeforePut");
-    Status s = db_->Put(write_opts, "foo", "value");
+    Status s = db_->Put(write_opts, "foo", write_ts, "value");
     ASSERT_OK(s);
     ASSERT_OK(Flush());
 
-    write_ts_str = Timestamp(2, 0);
-    write_ts = write_ts_str;
-    write_opts.timestamp = &write_ts;
-    s = db_->Put(write_opts, "bar", "value");
+    write_ts = Timestamp(2, 0);
+    s = db_->Put(write_opts, "bar", write_ts, "value");
     ASSERT_OK(s);
     TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithoutSnapshot2:AfterPut");
   });
@@ -1165,12 +2157,10 @@ TEST_F(DataVisibilityTest, PointLookupWithSnapshot1) {
   });
   SyncPoint::GetInstance()->EnableProcessing();
   port::Thread writer_thread([this]() {
-    std::string write_ts_str = Timestamp(1, 0);
-    Slice write_ts = write_ts_str;
+    std::string write_ts = Timestamp(1, 0);
     WriteOptions write_opts;
-    write_opts.timestamp = &write_ts;
     TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:BeforePut");
-    Status s = db_->Put(write_opts, "foo", "value");
+    Status s = db_->Put(write_opts, "foo", write_ts, "value");
     TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot1:AfterPut");
     ASSERT_OK(s);
   });
@@ -1217,19 +2207,15 @@ TEST_F(DataVisibilityTest, PointLookupWithSnapshot2) {
   });
   SyncPoint::GetInstance()->EnableProcessing();
   port::Thread writer_thread([this]() {
-    std::string write_ts_str = Timestamp(1, 0);
-    Slice write_ts = write_ts_str;
+    std::string write_ts = Timestamp(1, 0);
     WriteOptions write_opts;
-    write_opts.timestamp = &write_ts;
     TEST_SYNC_POINT("DataVisibilityTest::PointLookupWithSnapshot2:BeforePut");
-    Status s = db_->Put(write_opts, "foo", "value1");
+    Status s = db_->Put(write_opts, "foo", write_ts, "value1");
     ASSERT_OK(s);
     ASSERT_OK(Flush());
 
-    write_ts_str = Timestamp(2, 0);
-    write_ts = write_ts_str;
-    write_opts.timestamp = &write_ts;
-    s = db_->Put(write_opts, "bar", "value2");
+    write_ts = Timestamp(2, 0);
+    s = db_->Put(write_opts, "bar", write_ts, "value2");
     ASSERT_OK(s);
   });
   const Snapshot* snap = db_->GetSnapshot();
@@ -1274,10 +2260,8 @@ TEST_F(DataVisibilityTest, RangeScanWithoutSnapshot) {
     WriteOptions write_opts;
     TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithoutSnapshot:BeforePut");
     for (int i = 0; i < 3; ++i) {
-      std::string write_ts_str = Timestamp(i + 1, 0);
-      Slice write_ts = write_ts_str;
-      write_opts.timestamp = &write_ts;
-      Status s = db_->Put(write_opts, "key" + std::to_string(i),
+      std::string write_ts = Timestamp(i + 1, 0);
+      Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts,
                           "value" + std::to_string(i));
       ASSERT_OK(s);
     }
@@ -1321,10 +2305,8 @@ TEST_F(DataVisibilityTest, RangeScanWithSnapshot) {
     WriteOptions write_opts;
     TEST_SYNC_POINT("DataVisibilityTest::RangeScanWithSnapshot:BeforePut");
     for (int i = 0; i < 3; ++i) {
-      std::string write_ts_str = Timestamp(i + 1, 0);
-      Slice write_ts = write_ts_str;
-      write_opts.timestamp = &write_ts;
-      Status s = db_->Put(write_opts, "key" + std::to_string(i),
+      std::string write_ts = Timestamp(i + 1, 0);
+      Status s = db_->Put(write_opts, "key" + std::to_string(i), write_ts,
                           "value" + std::to_string(i));
       ASSERT_OK(s);
     }
@@ -1372,7 +2354,7 @@ TEST_F(DataVisibilityTest, MultiGetWithTimestamp) {
   VerifyDefaultCF(snap0);
   VerifyDefaultCF(snap1);
 
-  Flush();
+  ASSERT_OK(Flush());
 
   const Snapshot* snap2 = db_->GetSnapshot();
   PutTestData(2);
@@ -1458,7 +2440,7 @@ TEST_F(DataVisibilityTest, MultiGetCrossCF) {
   VerifyDefaultCF(snap0);
   VerifyDefaultCF(snap1);
 
-  Flush();
+  ASSERT_OK(Flush());
 
   const Snapshot* snap2 = db_->GetSnapshot();
   PutTestData(2);
@@ -1498,6 +2480,7 @@ TEST_F(DataVisibilityTest, MultiGetCrossCF) {
   Close();
 }
 
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 class DBBasicTestWithTimestampCompressionSettings
     : public DBBasicTestWithTimestampBase,
       public testing::WithParamInterface<
@@ -1515,7 +2498,8 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
   options.env = env_;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   size_t ts_sz = Timestamp(0, 0).size();
   TestComparator test_cmp(ts_sz);
   options.comparator = &test_cmp;
@@ -1556,12 +2540,11 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
     const Slice write_ts = write_ts_list.back();
     WriteOptions wopts;
-    wopts.timestamp = &write_ts;
     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
       for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
-        ASSERT_OK(Put(cf, Key1(j),
-                      "value_" + std::to_string(j) + "_" + std::to_string(i),
-                      wopts));
+        ASSERT_OK(
+            db_->Put(wopts, handles_[cf], Key1(j), write_ts,
+                     "value_" + std::to_string(j) + "_" + std::to_string(i)));
       }
     }
   }
@@ -1593,7 +2576,8 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
   TestComparator test_cmp(kTimestampSize);
   options.comparator = &test_cmp;
   const int kNumKeysPerFile = 1024;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   BlockBasedTableOptions bbto;
   bbto.filter_policy = std::get<0>(GetParam());
   bbto.whole_key_filtering = true;
@@ -1625,43 +2609,47 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
   const size_t kNumL0Files =
       static_cast<size_t>(Options().level0_file_num_compaction_trigger);
   {
-    // Generate enough L0 files with ts=1 to trigger compaction to L1
-    std::string ts_str = Timestamp(1, 0);
-    Slice ts = ts_str;
+    // Half of the keys will go through Deletion and remaining half with
+    // SingleDeletion. Generate enough L0 files with ts=1 to trigger compaction
+    // to L1
+    std::string ts = Timestamp(1, 0);
     WriteOptions wopts;
-    wopts.timestamp = &ts;
-    for (size_t i = 0; i != kNumL0Files; ++i) {
-      for (int j = 0; j != kNumKeysPerFile; ++j) {
-        ASSERT_OK(db_->Put(wopts, Key1(j), "value" + std::to_string(i)));
+    for (size_t i = 0; i < kNumL0Files; ++i) {
+      for (int j = 0; j < kNumKeysPerFile; ++j) {
+        ASSERT_OK(db_->Put(wopts, Key1(j), ts, "value" + std::to_string(i)));
       }
       ASSERT_OK(db_->Flush(FlushOptions()));
     }
     ASSERT_OK(dbfull()->TEST_WaitForCompact());
     // Generate another L0 at ts=3
-    ts_str = Timestamp(3, 0);
-    ts = ts_str;
-    wopts.timestamp = &ts;
-    for (int i = 0; i != kNumKeysPerFile; ++i) {
+    ts = Timestamp(3, 0);
+    for (int i = 0; i < kNumKeysPerFile; ++i) {
       std::string key_str = Key1(i);
       Slice key(key_str);
       if ((i % 3) == 0) {
-        ASSERT_OK(db_->Delete(wopts, key));
+        if (i < kNumKeysPerFile / 2) {
+          ASSERT_OK(db_->Delete(wopts, key, ts));
+        } else {
+          ASSERT_OK(db_->SingleDelete(wopts, key, ts));
+        }
       } else {
-        ASSERT_OK(db_->Put(wopts, key, "new_value"));
+        ASSERT_OK(db_->Put(wopts, key, ts, "new_value"));
       }
     }
     ASSERT_OK(db_->Flush(FlushOptions()));
     // Populate memtable at ts=5
-    ts_str = Timestamp(5, 0);
-    ts = ts_str;
-    wopts.timestamp = &ts;
+    ts = Timestamp(5, 0);
     for (int i = 0; i != kNumKeysPerFile; ++i) {
       std::string key_str = Key1(i);
       Slice key(key_str);
       if ((i % 3) == 1) {
-        ASSERT_OK(db_->Delete(wopts, key));
+        if (i < kNumKeysPerFile / 2) {
+          ASSERT_OK(db_->Delete(wopts, key, ts));
+        } else {
+          ASSERT_OK(db_->SingleDelete(wopts, key, ts));
+        }
       } else if ((i % 3) == 2) {
-        ASSERT_OK(db_->Put(wopts, key, "new_value_2"));
+        ASSERT_OK(db_->Put(wopts, key, ts, "new_value_2"));
       }
     }
   }
@@ -1672,12 +2660,18 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
     ropts.timestamp = &ts;
     for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
       std::string value;
-      Status s = db_->Get(ropts, Key1(i), &value);
+      std::string key_ts;
+      Status s = db_->Get(ropts, Key1(i), &value, &key_ts);
       if ((i % 3) == 2) {
         ASSERT_OK(s);
         ASSERT_EQ("new_value_2", value);
+        ASSERT_EQ(Timestamp(5, 0), key_ts);
+      } else if ((i % 3) == 1) {
+        ASSERT_TRUE(s.IsNotFound());
+        ASSERT_EQ(Timestamp(5, 0), key_ts);
       } else {
         ASSERT_TRUE(s.IsNotFound());
+        ASSERT_EQ(Timestamp(3, 0), key_ts);
       }
     }
   }
@@ -1722,7 +2716,8 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
   options.env = env_;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
 
   FlushedFileCollector* collector = new FlushedFileCollector();
   options.listeners.emplace_back(collector);
@@ -1785,13 +2780,12 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
     const Slice write_ts = write_ts_list.back();
     WriteOptions wopts;
-    wopts.timestamp = &write_ts;
     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
       size_t memtable_get_start = 0;
       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
-        ASSERT_OK(Put(cf, Key1(j),
-                      "value_" + std::to_string(j) + "_" + std::to_string(i),
-                      wopts));
+        ASSERT_OK(
+            db_->Put(wopts, handles_[cf], Key1(j), write_ts,
+                     "value_" + std::to_string(j) + "_" + std::to_string(i)));
         if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
           verify_records_func(i, memtable_get_start, j, handles_[cf]);
           memtable_get_start = j + 1;
@@ -1800,6 +2794,8 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
           // incremental positions such that lowerlevel[1].smallest.userkey ==
           // higherlevel[0].largest.userkey
           ASSERT_OK(Flush(cf));
+          ASSERT_OK(dbfull()->TEST_WaitForCompact());  // wait for flush (which
+                                                       // is also a compaction)
 
           // compact files (2 at each level) to a lower level such that all
           // keys with the same timestamp is at one level, with newer versions
@@ -1838,7 +2834,10 @@ TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
   options.env = env_;
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_prefix_bloom_size_ratio = 0.1;
+  options.memtable_whole_key_filtering = true;
 
   size_t ts_sz = Timestamp(0, 0).size();
   TestComparator test_cmp(ts_sz);
@@ -1885,19 +2884,22 @@ TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
     }
   };
 
+  const std::string dummy_ts(ts_sz, '\0');
   for (size_t i = 0; i != kNumTimestamps; ++i) {
     write_ts_list.push_back(Timestamp(i * 2, 0));
     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
     const Slice& write_ts = write_ts_list.back();
     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
       WriteOptions wopts;
-      WriteBatch batch(0, 0, ts_sz);
+      WriteBatch batch(0, 0, 0, ts_sz);
       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
-        ASSERT_OK(
-            batch.Put(handles_[cf], Key1(j),
-                      "value_" + std::to_string(j) + "_" + std::to_string(i)));
+        const std::string key = Key1(j);
+        const std::string value =
+            "value_" + std::to_string(j) + "_" + std::to_string(i);
+        ASSERT_OK(batch.Put(handles_[cf], key, value));
       }
-      ASSERT_OK(batch.AssignTimestamp(write_ts));
+      ASSERT_OK(batch.UpdateTimestamps(write_ts,
+                                       [ts_sz](uint32_t) { return ts_sz; }));
       ASSERT_OK(db_->Write(wopts, &batch));
 
       verify_records_func(i, handles_[cf]);
@@ -1929,18 +2931,16 @@ TEST_F(DBBasicTestWithTimestamp, MultiGetNoReturnTs) {
   options.comparator = &test_cmp;
   DestroyAndReopen(options);
   WriteOptions write_opts;
-  std::string ts_str = Timestamp(1, 0);
-  Slice ts = ts_str;
-  write_opts.timestamp = &ts;
-  ASSERT_OK(db_->Put(write_opts, "foo", "value"));
-  ASSERT_OK(db_->Put(write_opts, "bar", "value"));
-  ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", "value"));
-  ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", "value"));
+  std::string ts = Timestamp(1, 0);
+  ASSERT_OK(db_->Put(write_opts, "foo", ts, "value"));
+  ASSERT_OK(db_->Put(write_opts, "bar", ts, "value"));
+  ASSERT_OK(db_->Put(write_opts, "fooxxxxxxxxxxxxxxxx", ts, "value"));
+  ASSERT_OK(db_->Put(write_opts, "barxxxxxxxxxxxxxxxx", ts, "value"));
   ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
-  ts_str = Timestamp(2, 0);
-  ts = ts_str;
+  ts = Timestamp(2, 0);
+  Slice read_ts = ts;
   ReadOptions read_opts;
-  read_opts.timestamp = &ts;
+  read_opts.timestamp = &read_ts;
   {
     ColumnFamilyHandle* column_families[] = {cfh, cfh};
     Slice keys[] = {"foo", "bar"};
@@ -1986,14 +2986,15 @@ class DBBasicTestWithTimestampPrefixSeek
     : public DBBasicTestWithTimestampBase,
       public testing::WithParamInterface<
           std::tuple<std::shared_ptr<const SliceTransform>,
-                     std::shared_ptr<const FilterPolicy>, bool>> {
+                     std::shared_ptr<const FilterPolicy>, bool,
+                     BlockBasedTableOptions::IndexType>> {
  public:
   DBBasicTestWithTimestampPrefixSeek()
       : DBBasicTestWithTimestampBase(
             "/db_basic_test_with_timestamp_prefix_seek") {}
 };
 
-TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
+TEST_P(DBBasicTestWithTimestampPrefixSeek, IterateWithPrefix) {
   const size_t kNumKeysPerFile = 128;
   Options options = CurrentOptions();
   options.env = env_;
@@ -2002,9 +3003,11 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
   TestComparator test_cmp(kTimestampSize);
   options.comparator = &test_cmp;
   options.prefix_extractor = std::get<0>(GetParam());
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   BlockBasedTableOptions bbto;
   bbto.filter_policy = std::get<1>(GetParam());
+  bbto.index_type = std::get<3>(GetParam());
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   DestroyAndReopen(options);
 
@@ -2015,10 +3018,9 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
   WriteOptions write_opts;
   {
     for (size_t i = 0; i != write_ts_list.size(); ++i) {
-      Slice write_ts = write_ts_list[i];
-      write_opts.timestamp = &write_ts;
       for (uint64_t key = kMaxKey; key >= kMinKey; --key) {
-        Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
+        Status s = db_->Put(write_opts, Key1(key), write_ts_list[i],
+                            "value" + std::to_string(i));
         ASSERT_OK(s);
       }
     }
@@ -2043,6 +3045,13 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
                          "value" + std::to_string(i), write_ts_list[i]);
       iter->Next();
       ASSERT_FALSE(iter->Valid());
+
+      // Seek to kMinKey
+      iter->Seek(Key1(kMinKey));
+      CheckIterUserEntry(iter.get(), Key1(kMinKey), kTypeValue,
+                         "value" + std::to_string(i), write_ts_list[i]);
+      iter->Prev();
+      ASSERT_FALSE(iter->Valid());
     }
     const std::vector<uint64_t> targets = {kMinKey, kMinKey + 0x10,
                                            kMinKey + 0x100, kMaxKey};
@@ -2058,6 +3067,7 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
       Slice read_ts = read_ts_list[i];
       read_opts.timestamp = &read_ts;
       std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+      // Forward and backward iterate.
       for (size_t j = 0; j != targets.size(); ++j) {
         std::string start_key = Key1(targets[j]);
         uint64_t expected_ub =
@@ -2081,6 +3091,24 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
           it->Next();
         }
         ASSERT_EQ(expected_ub - targets[j] + 1, count);
+
+        count = 0;
+        expected_key = targets[j];
+        it->SeekForPrev(start_key);
+        uint64_t expected_lb = (targets[j] & kPrefixMask);
+        while (it->Valid()) {
+          // Out of prefix
+          if (!read_opts.prefix_same_as_start &&
+              pe->Transform(it->key()) != pe->Transform(start_key)) {
+            break;
+          }
+          CheckIterUserEntry(it.get(), Key1(expected_key), kTypeValue,
+                             "value" + std::to_string(i), write_ts_list[i]);
+          ++count;
+          --expected_key;
+          it->Prev();
+        }
+        ASSERT_EQ(targets[j] - std::max(expected_lb, kMinKey) + 1, count);
       }
     }
   }
@@ -2093,6 +3121,7 @@ INSTANTIATE_TEST_CASE_P(
     Timestamp, DBBasicTestWithTimestampPrefixSeek,
     ::testing::Combine(
         ::testing::Values(
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(1)),
             std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
             std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
             std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
@@ -2102,19 +3131,25 @@ INSTANTIATE_TEST_CASE_P(
                           std::shared_ptr<const FilterPolicy>(
                               NewBloomFilterPolicy(20 /*bits_per_key*/,
                                                    false))),
-        ::testing::Bool()));
+        ::testing::Bool(),
+        ::testing::Values(
+            BlockBasedTableOptions::IndexType::kBinarySearch,
+            BlockBasedTableOptions::IndexType::kHashSearch,
+            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
+            BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
 
 class DBBasicTestWithTsIterTombstones
     : public DBBasicTestWithTimestampBase,
       public testing::WithParamInterface<
           std::tuple<std::shared_ptr<const SliceTransform>,
-                     std::shared_ptr<const FilterPolicy>, int>> {
+                     std::shared_ptr<const FilterPolicy>, int,
+                     BlockBasedTableOptions::IndexType>> {
  public:
   DBBasicTestWithTsIterTombstones()
       : DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {}
 };
 
-TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) {
+TEST_P(DBBasicTestWithTsIterTombstones, IterWithDelete) {
   constexpr size_t kNumKeysPerFile = 128;
   Options options = CurrentOptions();
   options.env = env_;
@@ -2122,9 +3157,11 @@ TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) {
   TestComparator test_cmp(kTimestampSize);
   options.comparator = &test_cmp;
   options.prefix_extractor = std::get<0>(GetParam());
-  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumKeysPerFile));
   BlockBasedTableOptions bbto;
   bbto.filter_policy = std::get<1>(GetParam());
+  bbto.index_type = std::get<3>(GetParam());
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   options.num_levels = std::get<2>(GetParam());
   DestroyAndReopen(options);
@@ -2135,24 +3172,23 @@ TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) {
   uint64_t key = kMinKey;
   WriteOptions write_opts;
   Slice ts = write_ts_strs[0];
-  write_opts.timestamp = &ts;
   do {
-    Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key));
+    Status s = db_->Put(write_opts, Key1(key), write_ts_strs[0],
+                        "value" + std::to_string(key));
     ASSERT_OK(s);
     if (kMaxKey == key) {
       break;
     }
     ++key;
   } while (true);
-  // Delete them all
-  ts = write_ts_strs[1];
-  write_opts.timestamp = &ts;
+
   for (key = kMaxKey; key >= kMinKey; --key) {
     Status s;
     if (0 != (key % 2)) {
-      s = db_->Put(write_opts, Key1(key), "value1" + std::to_string(key));
+      s = db_->Put(write_opts, Key1(key), write_ts_strs[1],
+                   "value1" + std::to_string(key));
     } else {
-      s = db_->Delete(write_opts, Key1(key));
+      s = db_->Delete(write_opts, Key1(key), write_ts_strs[1]);
     }
     ASSERT_OK(s);
   }
@@ -2171,6 +3207,13 @@ TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) {
       ASSERT_EQ("value1" + std::to_string(key), iter->value());
     }
     ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count);
+
+    for (iter->SeekToLast(), count = 0, key = kMaxKey; iter->Valid();
+         key -= 2, ++count, iter->Prev()) {
+      ASSERT_EQ(Key1(key), iter->key());
+      ASSERT_EQ("value1" + std::to_string(key), iter->value());
+    }
+    ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count);
   }
   Close();
 }
@@ -2186,17 +3229,648 @@ INSTANTIATE_TEST_CASE_P(
                               NewBloomFilterPolicy(10, false)),
                           std::shared_ptr<const FilterPolicy>(
                               NewBloomFilterPolicy(20, false))),
-        ::testing::Values(2, 6)));
+        ::testing::Values(2, 6),
+        ::testing::Values(
+            BlockBasedTableOptions::IndexType::kBinarySearch,
+            BlockBasedTableOptions::IndexType::kHashSearch,
+            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
+            BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey)));
+#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 
-}  // namespace ROCKSDB_NAMESPACE
+class UpdateFullHistoryTsLowTest : public DBBasicTestWithTimestampBase {
+ public:
+  UpdateFullHistoryTsLowTest()
+      : DBBasicTestWithTimestampBase("/update_full_history_ts_low_test") {}
+};
+
+TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  std::string lower_ts_low = Timestamp(10, 0);
+  std::string higher_ts_low = Timestamp(25, 0);
+  const size_t kTimestampSize = lower_ts_low.size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+
+  DestroyAndReopen(options);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  // This workaround swaps `lower_ts_low` originally used for update by the
+  // caller to `higher_ts_low` after its writer is queued to make sure
+  // the caller will always get a TryAgain error.
+  // It mimics cases where two threads update full_history_ts_low concurrently
+  // with one thread writing a higher ts_low and one thread writing a lower
+  // ts_low.
+  VersionEdit* version_edit;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::IncreaseFullHistoryTsLowImpl:BeforeEdit",
+      [&](void* arg) { version_edit = reinterpret_cast<VersionEdit*>(arg); });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:BeforeWriterWaiting",
+      [&](void* /*arg*/) { version_edit->SetFullHistoryTsLow(higher_ts_low); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_TRUE(
+      db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(), lower_ts_low)
+          .IsTryAgain());
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp,
+       GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+
+  std::string ts_str = Timestamp(1, 0);
+  WriteOptions wopts;
+  ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1"));
+  ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2"));
+  ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3"));
+  ts_str = Timestamp(2, 0);
+  ASSERT_OK(
+      db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", ts_str));
+
+  ts_str = Timestamp(3, 0);
+  Slice ts = ts_str;
+  ReadOptions ropts;
+  ropts.timestamp = &ts;
+  CompactRangeOptions cro;
+  cro.full_history_ts_low = nullptr;
+  std::string value, key_ts;
+  Status s;
+  auto verify = [&] {
+    s = db_->Get(ropts, "k1", &value);
+    ASSERT_TRUE(s.IsNotFound());
+
+    s = db_->Get(ropts, "k2", &value, &key_ts);
+    ASSERT_TRUE(s.IsNotFound());
+    ASSERT_EQ(key_ts, Timestamp(2, 0));
+
+    ASSERT_OK(db_->Get(ropts, "k3", &value, &key_ts));
+    ASSERT_EQ(value, "v3");
+    ASSERT_EQ(Timestamp(1, 0), key_ts);
+
+    size_t batch_size = 3;
+    std::vector<std::string> key_strs = {"k1", "k2", "k3"};
+    std::vector<Slice> keys{key_strs.begin(), key_strs.end()};
+    std::vector<PinnableSlice> values(batch_size);
+    std::vector<Status> statuses(batch_size);
+    db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
+                  values.data(), statuses.data(), true /* sorted_input */);
+    ASSERT_TRUE(statuses[0].IsNotFound());
+    ASSERT_TRUE(statuses[1].IsNotFound());
+    ASSERT_OK(statuses[2]);
+    ;
+    ASSERT_EQ(values[2], "v3");
+  };
+  verify();
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  verify();
+  std::string lb = Timestamp(0, 0);
+  Slice lb_slice = lb;
+  cro.full_history_ts_low = &lb_slice;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  verify();
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp,
+       GCRangeTombstonesAndCoveredKeysRespectingTslow) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  bbto.cache_index_and_filter_blocks = true;
+  bbto.whole_key_filtering = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.num_levels = 2;
+  DestroyAndReopen(options);
+
+  WriteOptions wopts;
+  ASSERT_OK(db_->Put(wopts, "k1", Timestamp(1, 0), "v1"));
+  ASSERT_OK(db_->Delete(wopts, "k2", Timestamp(2, 0)));
+  ASSERT_OK(db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3",
+                             Timestamp(3, 0)));
+  ASSERT_OK(db_->Put(wopts, "k3", Timestamp(4, 0), "v3"));
+
+  ReadOptions ropts;
+  std::string read_ts = Timestamp(5, 0);
+  Slice read_ts_slice = read_ts;
+  ropts.timestamp = &read_ts_slice;
+  size_t batch_size = 3;
+  std::vector<std::string> key_strs = {"k1", "k2", "k3"};
+  std::vector<Slice> keys = {key_strs.begin(), key_strs.end()};
+  std::vector<PinnableSlice> values(batch_size);
+  std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
+  db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
+                values.data(), timestamps.data(), statuses.data(),
+                true /* sorted_input */);
+  ASSERT_TRUE(statuses[0].IsNotFound());
+  ASSERT_EQ(timestamps[0], Timestamp(3, 0));
+  ASSERT_TRUE(statuses[1].IsNotFound());
+  // DeleteRange has a higher timestamp than Delete for "k2"
+  ASSERT_EQ(timestamps[1], Timestamp(3, 0));
+  ASSERT_OK(statuses[2]);
+  ASSERT_EQ(values[2], "v3");
+  ASSERT_EQ(timestamps[2], Timestamp(4, 0));
+
+  CompactRangeOptions cro;
+  // Range tombstone has timestamp >= full_history_ts_low, covered keys
+  // are not dropped.
+  std::string compaction_ts_str = Timestamp(2, 0);
+  Slice compaction_ts = compaction_ts_str;
+  cro.full_history_ts_low = &compaction_ts;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ropts.timestamp = &compaction_ts;
+  std::string value, ts;
+  ASSERT_OK(db_->Get(ropts, "k1", &value, &ts));
+  ASSERT_EQ(value, "v1");
+  // timestamp is below full_history_ts_low, zeroed out as the key goes into
+  // bottommost level
+  ASSERT_EQ(ts, Timestamp(0, 0));
+  ASSERT_TRUE(db_->Get(ropts, "k2", &value, &ts).IsNotFound());
+  ASSERT_EQ(ts, Timestamp(2, 0));
+
+  compaction_ts_str = Timestamp(4, 0);
+  compaction_ts = compaction_ts_str;
+  cro.full_history_ts_low = &compaction_ts;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ropts.timestamp = &read_ts_slice;
+  // k1, k2 and the range tombstone should be dropped
+  // k3 should still exist
+  db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
+                values.data(), timestamps.data(), statuses.data(),
+                true /* sorted_input */);
+  ASSERT_TRUE(statuses[0].IsNotFound());
+  ASSERT_TRUE(timestamps[0].empty());
+  ASSERT_TRUE(statuses[1].IsNotFound());
+  ASSERT_TRUE(timestamps[1].empty());
+  ASSERT_OK(statuses[2]);
+  ASSERT_EQ(values[2], "v3");
+  ASSERT_EQ(timestamps[2], Timestamp(4, 0));
+
+  Close();
+}
+
+TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
+  const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
+  Options options = CurrentOptions();
+  options.prefix_extractor.reset(NewFixedPrefixTransform(3));
+  options.compression = kNoCompression;
+  BlockBasedTableOptions bbto;
+  bbto.index_type = GetParam();
+  bbto.block_size = 100;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
+  DestroyAndReopen(options);
+
+  // Write half of the keys before the tombstone and half after the tombstone.
+  // Only covered keys (i.e., within the range and older than the tombstone)
+  // should be deleted.
+  for (int i = 0; i < kNum; ++i) {
+    if (i == kNum / 2) {
+      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                                 Key1(kRangeBegin), Key1(kRangeEnd),
+                                 Timestamp(i, 0)));
+    }
+    ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
+                       "val" + std::to_string(i)));
+    if (i == kNum - kNumPerFile) {
+      ASSERT_OK(Flush());
+    }
+  }
+
+  ReadOptions read_opts;
+  read_opts.total_order_seek = true;
+  std::string read_ts = Timestamp(kNum, 0);
+  Slice read_ts_slice = read_ts;
+  read_opts.timestamp = &read_ts_slice;
+  {
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
+    ASSERT_OK(iter->status());
+
+    int expected = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_EQ(Key1(expected), iter->key());
+      if (expected == kRangeBegin - 1) {
+        expected = kNum / 2;
+      } else {
+        ++expected;
+      }
+    }
+    ASSERT_EQ(kNum, expected);
+
+    expected = kNum / 2;
+    for (iter->Seek(Key1(kNum / 2)); iter->Valid(); iter->Next()) {
+      ASSERT_EQ(Key1(expected), iter->key());
+      ++expected;
+    }
+    ASSERT_EQ(kNum, expected);
+
+    expected = kRangeBegin - 1;
+    for (iter->SeekForPrev(Key1(kNum / 2 - 1)); iter->Valid(); iter->Prev()) {
+      ASSERT_EQ(Key1(expected), iter->key());
+      --expected;
+    }
+    ASSERT_EQ(-1, expected);
+
+    read_ts = Timestamp(0, 0);
+    read_ts_slice = read_ts;
+    read_opts.timestamp = &read_ts_slice;
+    iter.reset(db_->NewIterator(read_opts));
+    iter->SeekToFirst();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(iter->key(), Key1(0));
+    iter->Next();
+    ASSERT_FALSE(iter->Valid());
+    ASSERT_OK(iter->status());
+  }
+
+  read_ts = Timestamp(kNum, 0);
+  read_ts_slice = read_ts;
+  read_opts.timestamp = &read_ts_slice;
+  std::string value, timestamp;
+  Status s;
+  for (int i = 0; i < kNum; ++i) {
+    s = db_->Get(read_opts, Key1(i), &value, &timestamp);
+    if (i >= kRangeBegin && i < kNum / 2) {
+      ASSERT_TRUE(s.IsNotFound());
+      ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0));
+    } else {
+      ASSERT_OK(s);
+      ASSERT_EQ(value, "val" + std::to_string(i));
+      ASSERT_EQ(timestamp, Timestamp(i, 0));
+    }
+  }
+
+  size_t batch_size = kNum;
+  std::vector<std::string> key_strs(batch_size);
+  std::vector<Slice> keys(batch_size);
+  std::vector<PinnableSlice> values(batch_size);
+  std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
+  for (int i = 0; i < kNum; ++i) {
+    key_strs[i] = Key1(i);
+    keys[i] = key_strs[i];
+  }
+  db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, keys.data(),
+                values.data(), timestamps.data(), statuses.data(),
+                true /* sorted_input */);
+  for (int i = 0; i < kNum; ++i) {
+    if (i >= kRangeBegin && i < kNum / 2) {
+      ASSERT_TRUE(statuses[i].IsNotFound());
+      ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0));
+    } else {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(values[i], "val" + std::to_string(i));
+      ASSERT_EQ(timestamps[i], Timestamp(i, 0));
+    }
+  }
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) {
+  // 4 keys 0, 1, 2, 3 at timestamps 0, 1, 2, 3 respectively.
+  // A range tombstone [1, 3) at timestamp 1 and has a sequence number between
+  // key 1 and 2.
+  Options options = CurrentOptions();
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  DestroyAndReopen(options);
+  WriteOptions write_opts;
+  std::string put_ts = Timestamp(0, 0);
+  const int kNum = 4, kNumPerFile = 1, kRangeBegin = 1, kRangeEnd = 3;
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
+  const Snapshot* before_tombstone = nullptr;
+  const Snapshot* after_tombstone = nullptr;
+  for (int i = 0; i < kNum; ++i) {
+    ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
+                       "val" + std::to_string(i)));
+    if (i == kRangeBegin) {
+      before_tombstone = db_->GetSnapshot();
+      ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                                 Key1(kRangeBegin), Key1(kRangeEnd),
+                                 Timestamp(kRangeBegin, 0)));
+    }
+    if (i == kNum / 2) {
+      ASSERT_OK(Flush());
+    }
+  }
+  assert(before_tombstone);
+  after_tombstone = db_->GetSnapshot();
+  // snapshot and ts before tombstone
+  std::string read_ts_str = Timestamp(kRangeBegin - 1, 0);  // (0, 0)
+  Slice read_ts = read_ts_str;
+  ReadOptions read_opts;
+  read_opts.timestamp = &read_ts;
+  read_opts.snapshot = before_tombstone;
+  std::vector<Status> expected_status = {
+      Status::OK(), Status::NotFound(), Status::NotFound(), Status::NotFound()};
+  std::vector<std::string> expected_values(kNum);
+  expected_values[0] = "val" + std::to_string(0);
+  std::vector<std::string> expected_timestamps(kNum);
+  expected_timestamps[0] = Timestamp(0, 0);
+
+  size_t batch_size = kNum;
+  std::vector<std::string> key_strs(batch_size);
+  std::vector<Slice> keys(batch_size);
+  std::vector<PinnableSlice> values(batch_size);
+  std::vector<Status> statuses(batch_size);
+  std::vector<std::string> timestamps(batch_size);
+  for (int i = 0; i < kNum; ++i) {
+    key_strs[i] = Key1(i);
+    keys[i] = key_strs[i];
+  }
+
+  auto verify = [&] {
+    db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size,
+                  keys.data(), values.data(), timestamps.data(),
+                  statuses.data(), true /* sorted_input */);
+    std::string value, timestamp;
+    Status s;
+    for (int i = 0; i < kNum; ++i) {
+      s = db_->Get(read_opts, Key1(i), &value, &timestamp);
+      ASSERT_EQ(s, expected_status[i]);
+      ASSERT_EQ(statuses[i], expected_status[i]);
+      if (s.ok()) {
+        ASSERT_EQ(value, expected_values[i]);
+        ASSERT_EQ(values[i], expected_values[i]);
+      }
+      if (!timestamp.empty()) {
+        ASSERT_EQ(timestamp, expected_timestamps[i]);
+        ASSERT_EQ(timestamps[i], expected_timestamps[i]);
+      } else {
+        ASSERT_TRUE(timestamps[i].empty());
+      }
+    }
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
+    std::unique_ptr<Iterator> iter_for_seek(db_->NewIterator(read_opts));
+    iter->SeekToFirst();
+    for (int i = 0; i < kNum; ++i) {
+      if (expected_status[i].ok()) {
+        auto verify_iter = [&](Iterator* iter_ptr) {
+          ASSERT_TRUE(iter_ptr->Valid());
+          ASSERT_EQ(iter_ptr->key(), keys[i]);
+          ASSERT_EQ(iter_ptr->value(), expected_values[i]);
+          ASSERT_EQ(iter_ptr->timestamp(), expected_timestamps[i]);
+        };
+        verify_iter(iter.get());
+        iter->Next();
+
+        iter_for_seek->Seek(keys[i]);
+        verify_iter(iter_for_seek.get());
+
+        iter_for_seek->SeekForPrev(keys[i]);
+        verify_iter(iter_for_seek.get());
+      }
+    }
+    ASSERT_FALSE(iter->Valid());
+    ASSERT_OK(iter->status());
+  };
+
+  verify();
+
+  // snapshot before tombstone and ts after tombstone
+  read_ts_str = Timestamp(kNum, 0);  // (4, 0)
+  read_ts = read_ts_str;
+  read_opts.timestamp = &read_ts;
+  read_opts.snapshot = before_tombstone;
+  expected_status[1] = Status::OK();
+  expected_timestamps[1] = Timestamp(1, 0);
+  expected_values[1] = "val" + std::to_string(1);
+  verify();
+
+  // snapshot after tombstone and ts before tombstone
+  read_ts_str = Timestamp(kRangeBegin - 1, 0);  // (0, 0)
+  read_ts = read_ts_str;
+  read_opts.timestamp = &read_ts;
+  read_opts.snapshot = after_tombstone;
+  expected_status[1] = Status::NotFound();
+  expected_timestamps[1].clear();
+  expected_values[1].clear();
+  verify();
+
+  // snapshot and ts after tombstone
+  read_ts_str = Timestamp(kNum, 0);  // (4, 0)
+  read_ts = read_ts_str;
+  read_opts.timestamp = &read_ts;
+  read_opts.snapshot = after_tombstone;
+  for (int i = 0; i < kNum; ++i) {
+    if (i == kRangeBegin) {
+      expected_status[i] = Status::NotFound();
+      expected_values[i].clear();
+    } else {
+      expected_status[i] = Status::OK();
+      expected_values[i] = "val" + std::to_string(i);
+    }
+    expected_timestamps[i] = Timestamp(i, 0);
+  }
+  verify();
+
+  db_->ReleaseSnapshot(before_tombstone);
+  db_->ReleaseSnapshot(after_tombstone);
+  Close();
+}
 
-#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
-extern "C" {
-void RegisterCustomObjects(int argc, char** argv);
+TEST_F(DBBasicTestWithTimestamp, MergeBasic) {
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
+  DestroyAndReopen(options);
+
+  const std::array<std::string, 3> write_ts_strs = {
+      Timestamp(100, 0), Timestamp(200, 0), Timestamp(300, 0)};
+  constexpr size_t kNumOfUniqKeys = 100;
+  ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
+
+  for (size_t i = 0; i < write_ts_strs.size(); ++i) {
+    for (size_t j = 0; j < kNumOfUniqKeys; ++j) {
+      Status s;
+      if (i == 0) {
+        const std::string val = "v" + std::to_string(j) + "_0";
+        s = db_->Put(WriteOptions(), Key1(j), write_ts_strs[i], val);
+      } else {
+        const std::string merge_op = std::to_string(i);
+        s = db_->Merge(WriteOptions(), default_cf, Key1(j), write_ts_strs[i],
+                       merge_op);
+      }
+      ASSERT_OK(s);
+    }
+  }
+
+  std::array<std::string, 3> read_ts_strs = {
+      Timestamp(150, 0), Timestamp(250, 0), Timestamp(350, 0)};
+
+  const auto verify_db_with_get = [&]() {
+    for (size_t i = 0; i < kNumOfUniqKeys; ++i) {
+      const std::string base_val = "v" + std::to_string(i) + "_0";
+      const std::array<std::string, 3> expected_values = {
+          base_val, base_val + ".1", base_val + ".1.2"};
+      const std::array<std::string, 3>& expected_ts = write_ts_strs;
+      ReadOptions read_opts;
+      for (size_t j = 0; j < read_ts_strs.size(); ++j) {
+        Slice read_ts = read_ts_strs[j];
+        read_opts.timestamp = &read_ts;
+        std::string value;
+        std::string ts;
+        const Status s = db_->Get(read_opts, Key1(i), &value, &ts);
+        ASSERT_OK(s);
+        ASSERT_EQ(expected_values[j], value);
+        ASSERT_EQ(expected_ts[j], ts);
+
+        // Do Seek/SeekForPrev
+        std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+        it->Seek(Key1(i));
+        ASSERT_TRUE(it->Valid());
+        ASSERT_EQ(expected_values[j], it->value());
+        ASSERT_EQ(expected_ts[j], it->timestamp());
+
+        it->SeekForPrev(Key1(i));
+        ASSERT_TRUE(it->Valid());
+        ASSERT_EQ(expected_values[j], it->value());
+        ASSERT_EQ(expected_ts[j], it->timestamp());
+      }
+    }
+  };
+
+  const auto verify_db_with_iterator = [&]() {
+    std::string value_suffix;
+    for (size_t i = 0; i < read_ts_strs.size(); ++i) {
+      ReadOptions read_opts;
+      Slice read_ts = read_ts_strs[i];
+      read_opts.timestamp = &read_ts;
+      std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+      size_t key_int_val = 0;
+      for (it->SeekToFirst(); it->Valid(); it->Next(), ++key_int_val) {
+        const std::string key = Key1(key_int_val);
+        const std::string value =
+            "v" + std::to_string(key_int_val) + "_0" + value_suffix;
+        ASSERT_EQ(key, it->key());
+        ASSERT_EQ(value, it->value());
+        ASSERT_EQ(write_ts_strs[i], it->timestamp());
+      }
+      ASSERT_EQ(kNumOfUniqKeys, key_int_val);
+
+      key_int_val = kNumOfUniqKeys - 1;
+      for (it->SeekToLast(); it->Valid(); it->Prev(), --key_int_val) {
+        const std::string key = Key1(key_int_val);
+        const std::string value =
+            "v" + std::to_string(key_int_val) + "_0" + value_suffix;
+        ASSERT_EQ(key, it->key());
+        ASSERT_EQ(value, it->value());
+        ASSERT_EQ(write_ts_strs[i], it->timestamp());
+      }
+      ASSERT_EQ(std::numeric_limits<size_t>::max(), key_int_val);
+
+      value_suffix = value_suffix + "." + std::to_string(i + 1);
+    }
+  };
+
+  verify_db_with_get();
+  verify_db_with_iterator();
+
+  ASSERT_OK(db_->Flush(FlushOptions()));
+
+  verify_db_with_get();
+  verify_db_with_iterator();
+
+  Close();
+}
+
+TEST_F(DBBasicTestWithTimestamp, MergeAfterDeletion) {
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
+  DestroyAndReopen(options);
+
+  ColumnFamilyHandle* const column_family = db_->DefaultColumnFamily();
+
+  const size_t num_keys_per_file = 10;
+  const size_t num_merges_per_key = 2;
+  for (size_t i = 0; i < num_keys_per_file; ++i) {
+    std::string ts = Timestamp(i + 10000, 0);
+    Status s = db_->Delete(WriteOptions(), Key1(i), ts);
+    ASSERT_OK(s);
+    for (size_t j = 1; j <= num_merges_per_key; ++j) {
+      ts = Timestamp(i + 10000 + j, 0);
+      s = db_->Merge(WriteOptions(), column_family, Key1(i), ts,
+                     std::to_string(j));
+      ASSERT_OK(s);
+    }
+  }
+
+  const auto verify_db = [&]() {
+    ReadOptions read_opts;
+    std::string read_ts_str = Timestamp(20000, 0);
+    Slice ts = read_ts_str;
+    read_opts.timestamp = &ts;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    size_t count = 0;
+    for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) {
+      std::string key = Key1(count);
+      ASSERT_EQ(key, it->key());
+      std::string value;
+      for (size_t j = 1; j <= num_merges_per_key; ++j) {
+        value.append(std::to_string(j));
+        if (j < num_merges_per_key) {
+          value.push_back('.');
+        }
+      }
+      ASSERT_EQ(value, it->value());
+      std::string ts1 = Timestamp(count + 10000 + num_merges_per_key, 0);
+      ASSERT_EQ(ts1, it->timestamp());
+    }
+    ASSERT_OK(it->status());
+    ASSERT_EQ(num_keys_per_file, count);
+    for (it->SeekToLast(); it->Valid(); it->Prev(), --count) {
+      std::string key = Key1(count - 1);
+      ASSERT_EQ(key, it->key());
+      std::string value;
+      for (size_t j = 1; j <= num_merges_per_key; ++j) {
+        value.append(std::to_string(j));
+        if (j < num_merges_per_key) {
+          value.push_back('.');
+        }
+      }
+      ASSERT_EQ(value, it->value());
+      std::string ts1 = Timestamp(count - 1 + 10000 + num_merges_per_key, 0);
+      ASSERT_EQ(ts1, it->timestamp());
+    }
+    ASSERT_OK(it->status());
+    ASSERT_EQ(0, count);
+  };
+
+  verify_db();
+
+  Close();
 }
-#else
-void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
-#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();