]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_test2.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_test2.cc
index 9efdcbd22cc091a0331d2dd76985f435e8377597..8adde36806a08a48abbfe5134dd58aeac95c8e5e 100644 (file)
@@ -6,17 +6,25 @@
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
+
 #include <atomic>
 #include <cstdlib>
 #include <functional>
+#include <memory>
 
 #include "db/db_test_util.h"
 #include "db/read_callback.h"
 #include "options/options_helper.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
+#include "rocksdb/experimental.h"
+#include "rocksdb/iostats_context.h"
 #include "rocksdb/persistent_cache.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
+#include "rocksdb/utilities/replayer.h"
 #include "rocksdb/wal_filter.h"
+#include "test_util/testutil.h"
 #include "util/random.h"
 #include "utilities/fault_injection_env.h"
 
@@ -24,7 +32,7 @@ namespace ROCKSDB_NAMESPACE {
 
 class DBTest2 : public DBTestBase {
  public:
-  DBTest2() : DBTestBase("/db_test2", /*env_do_fsync=*/true) {}
+  DBTest2() : DBTestBase("db_test2", /*env_do_fsync=*/true) {}
 };
 
 #ifndef ROCKSDB_LITE
@@ -41,9 +49,7 @@ TEST_F(DBTest2, OpenForReadOnly) {
   std::vector<std::string> files;
   ASSERT_OK(env_->GetChildren(dbname, &files));
   for (auto& f : files) {
-    if (f != "." && f != "..") {
-      ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
-    }
+    ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
   }
   // <dbname> should be empty now and we should be able to delete it
   ASSERT_OK(env_->DeleteDir(dbname));
@@ -75,9 +81,7 @@ TEST_F(DBTest2, OpenForReadOnlyWithColumnFamilies) {
   std::vector<std::string> files;
   ASSERT_OK(env_->GetChildren(dbname, &files));
   for (auto& f : files) {
-    if (f != "." && f != "..") {
-      ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
-    }
+    ASSERT_OK(env_->DeleteFile(dbname + "/" + f));
   }
   // <dbname> should be empty now and we should be able to delete it
   ASSERT_OK(env_->DeleteDir(dbname));
@@ -94,7 +98,7 @@ class TestReadOnlyWithCompressedCache
       public testing::WithParamInterface<std::tuple<int, bool>> {
  public:
   TestReadOnlyWithCompressedCache()
-      : DBTestBase("/test_readonly_with_compressed_cache",
+      : DBTestBase("test_readonly_with_compressed_cache",
                    /*env_do_fsync=*/true) {
     max_open_files_ = std::get<0>(GetParam());
     use_mmap_ = std::get<1>(GetParam());
@@ -158,8 +162,14 @@ class PartitionedIndexTestListener : public EventListener {
 };
 
 TEST_F(DBTest2, PartitionedIndexUserToInternalKey) {
+  const int kValueSize = 10500;
+  const int kNumEntriesPerFile = 1000;
+  const int kNumFiles = 3;
+  const int kNumDistinctKeys = 30;
+
   BlockBasedTableOptions table_options;
   Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
   table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
   PartitionedIndexTestListener* listener = new PartitionedIndexTestListener();
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@@ -168,13 +178,16 @@ TEST_F(DBTest2, PartitionedIndexUserToInternalKey) {
   Reopen(options);
   Random rnd(301);
 
-  for (int i = 0; i < 3000; i++) {
-    int j = i % 30;
-    std::string value = rnd.RandomString(10500);
-    ASSERT_OK(Put("keykey_" + std::to_string(j), value));
-    snapshots.push_back(db_->GetSnapshot());
+  for (int i = 0; i < kNumFiles; i++) {
+    for (int j = 0; j < kNumEntriesPerFile; j++) {
+      int key_id = (i * kNumEntriesPerFile + j) % kNumDistinctKeys;
+      std::string value = rnd.RandomString(kValueSize);
+      ASSERT_OK(Put("keykey_" + std::to_string(key_id), value));
+      snapshots.push_back(db_->GetSnapshot());
+    }
+    ASSERT_OK(Flush());
   }
-  Flush();
+
   for (auto s : snapshots) {
     db_->ReleaseSnapshot(s);
   }
@@ -187,7 +200,7 @@ class PrefixFullBloomWithReverseComparator
       public ::testing::WithParamInterface<bool> {
  public:
   PrefixFullBloomWithReverseComparator()
-      : DBTestBase("/prefix_bloom_reverse", /*env_do_fsync=*/true) {}
+      : DBTestBase("prefix_bloom_reverse", /*env_do_fsync=*/true) {}
   void SetUp() override { if_cache_filter_ = GetParam(); }
   bool if_cache_filter_;
 };
@@ -213,7 +226,7 @@ TEST_P(PrefixFullBloomWithReverseComparator,
   ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
   ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));
 
-  dbfull()->Flush(FlushOptions());
+  ASSERT_OK(dbfull()->Flush(FlushOptions()));
 
   if (bbto.block_cache) {
     bbto.block_cache->EraseUnRefEntries();
@@ -245,18 +258,20 @@ INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
                         PrefixFullBloomWithReverseComparator, testing::Bool());
 
 TEST_F(DBTest2, IteratorPropertyVersionNumber) {
-  Put("", "");
+  ASSERT_OK(Put("", ""));
   Iterator* iter1 = db_->NewIterator(ReadOptions());
+  ASSERT_OK(iter1->status());
   std::string prop_value;
   ASSERT_OK(
       iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
   uint64_t version_number1 =
       static_cast<uint64_t>(std::atoi(prop_value.c_str()));
 
-  Put("", "");
-  Flush();
+  ASSERT_OK(Put("", ""));
+  ASSERT_OK(Flush());
 
   Iterator* iter2 = db_->NewIterator(ReadOptions());
+  ASSERT_OK(iter2->status());
   ASSERT_OK(
       iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
   uint64_t version_number2 =
@@ -264,9 +279,10 @@ TEST_F(DBTest2, IteratorPropertyVersionNumber) {
 
   ASSERT_GT(version_number2, version_number1);
 
-  Put("", "");
+  ASSERT_OK(Put("", ""));
 
   Iterator* iter3 = db_->NewIterator(ReadOptions());
+  ASSERT_OK(iter3->status());
   ASSERT_OK(
       iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
   uint64_t version_number3 =
@@ -296,8 +312,8 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
   CreateAndReopenWithCF({"pikachu"}, options);
 
-  Put(1, "a", "begin");
-  Put(1, "z", "end");
+  ASSERT_OK(Put(1, "a", "begin"));
+  ASSERT_OK(Put(1, "z", "end"));
   ASSERT_OK(Flush(1));
   TryReopenWithColumnFamilies({"default", "pikachu"}, options);
 
@@ -313,10 +329,10 @@ TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
   options.merge_operator = MergeOperators::CreatePutOperator();
   options.disable_auto_compactions = true;
   DestroyAndReopen(options);
-  Put("poi", "Finch");
-  db_->Merge(WriteOptions(), "poi", "Reese");
-  db_->Merge(WriteOptions(), "poi", "Shaw");
-  db_->Merge(WriteOptions(), "poi", "Root");
+  ASSERT_OK(Put("poi", "Finch"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "poi", "Reese"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "poi", "Shaw"));
+  ASSERT_OK(db_->Merge(WriteOptions(), "poi", "Root"));
   options.max_successive_merges = 2;
   Reopen(options);
 }
@@ -327,7 +343,7 @@ class DBTestSharedWriteBufferAcrossCFs
       public testing::WithParamInterface<std::tuple<bool, bool>> {
  public:
   DBTestSharedWriteBufferAcrossCFs()
-      : DBTestBase("/db_test_shared_write_buffer", /*env_do_fsync=*/true) {}
+      : DBTestBase("db_test_shared_write_buffer", /*env_do_fsync=*/true) {}
   void SetUp() override {
     use_old_interface_ = std::get<0>(GetParam());
     cost_cache_ = std::get<1>(GetParam());
@@ -339,6 +355,10 @@ class DBTestSharedWriteBufferAcrossCFs
 TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  options.listeners.push_back(flush_listener);
+  // Don't trip the listener at shutdown.
+  options.avoid_flush_during_shutdown = true;
 
   // Avoid undeterministic value by malloc_usable_size();
   // Force arena block size to 1
@@ -374,14 +394,18 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   wo.disableWAL = true;
 
   std::function<void()> wait_flush = [&]() {
-    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
-    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
-    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
-    dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
   };
 
   // Create some data and flush "default" and "nikitich" so that they
   // are newer CFs created.
+  flush_listener->expected_flush_reason = FlushReason::kManualFlush;
   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
   Flush(3);
   ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
@@ -392,6 +416,7 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
   ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
             static_cast<uint64_t>(1));
 
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
   ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
   if (cost_cache_) {
     ASSERT_GE(cache->GetUsage(), 256 * 1024);
@@ -516,6 +541,10 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
+  auto flush_listener = std::make_shared<FlushCounterListener>();
+  options.listeners.push_back(flush_listener);
+  // Don't trip the listener at shutdown.
+  options.avoid_flush_during_shutdown = true;
   // Avoid undeterministic value by malloc_usable_size();
   // Force arena block size to 1
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
@@ -546,13 +575,19 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   wo.disableWAL = true;
 
   std::function<void()> wait_flush = [&]() {
-    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
-    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
-    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
-    static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
+    ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
+    ASSERT_OK(
+        static_cast_with_check<DBImpl>(db2)->TEST_WaitForBackgroundWork());
   };
 
   // Trigger a flush on cf2
+  flush_listener->expected_flush_reason = FlushReason::kWriteBufferManager;
   ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
   wait_flush();
   ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
@@ -564,7 +599,7 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
 
   ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
   wait_flush();
-  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+  ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
   {
     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
                   GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
@@ -595,7 +630,7 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
   wait_flush();
   ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
   wait_flush();
-  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+  ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
   {
     ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
               static_cast<uint64_t>(1));
@@ -616,8 +651,12 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
 TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
-  std::shared_ptr<Cache> cache =
-      NewLRUCache(LRUCacheOptions(10000000, 1, false, 0.0));
+  std::shared_ptr<Cache> cache = NewLRUCache(LRUCacheOptions(
+      10000000 /* capacity */, 1 /* num_shard_bits */,
+      false /* strict_capacity_limit */, 0.0 /* high_pri_pool_ratio */,
+      nullptr /* memory_allocator */, kDefaultToAdaptiveMutex,
+      kDontChargeCacheMetadata));
+
   options.write_buffer_size = 50000;  // this is never hit
   // Use a write buffer total size so that the soft limit is about
   // 105000.
@@ -630,33 +669,33 @@ TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
 }
 
 namespace {
-  void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
-    const std::vector<Slice>& keys_must_not_exist) {
-    // Ensure that expected keys exist
-    std::vector<std::string> values;
-    if (keys_must_exist.size() > 0) {
-      std::vector<Status> status_list =
+void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
+                          const std::vector<Slice>& keys_must_not_exist) {
+  // Ensure that expected keys exist
+  std::vector<std::string> values;
+  if (keys_must_exist.size() > 0) {
+    std::vector<Status> status_list =
         db->MultiGet(ReadOptions(), keys_must_exist, &values);
-      for (size_t i = 0; i < keys_must_exist.size(); i++) {
-        ASSERT_OK(status_list[i]);
-      }
+    for (size_t i = 0; i < keys_must_exist.size(); i++) {
+      ASSERT_OK(status_list[i]);
     }
+  }
 
-    // Ensure that given keys don't exist
-    if (keys_must_not_exist.size() > 0) {
-      std::vector<Status> status_list =
+  // Ensure that given keys don't exist
+  if (keys_must_not_exist.size() > 0) {
+    std::vector<Status> status_list =
         db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
-      for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
-        ASSERT_TRUE(status_list[i].IsNotFound());
-      }
+    for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
+      ASSERT_TRUE(status_list[i].IsNotFound());
     }
   }
+}
 
-}  // namespace
+}  // anonymous namespace
 
 TEST_F(DBTest2, WalFilterTest) {
   class TestWalFilter : public WalFilter {
-  private:
+   private:
     // Processing option that is requested to be applied at the given index
     WalFilter::WalProcessingOption wal_processing_option_;
     // Index at which to apply wal_processing_option_
@@ -666,12 +705,12 @@ TEST_F(DBTest2, WalFilterTest) {
     // Current record index, incremented with each record encountered.
     size_t current_record_index_;
 
-  public:
+   public:
     TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
-      size_t apply_option_for_record_index)
-      : wal_processing_option_(wal_processing_option),
-      apply_option_at_record_index_(apply_option_for_record_index),
-      current_record_index_(0) {}
+                  size_t apply_option_for_record_index)
+        : wal_processing_option_(wal_processing_option),
+          apply_option_at_record_index_(apply_option_for_record_index),
+          current_record_index_(0) {}
 
     WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
                                   WriteBatch* /*new_batch*/,
@@ -680,8 +719,7 @@ TEST_F(DBTest2, WalFilterTest) {
 
       if (current_record_index_ == apply_option_at_record_index_) {
         option_to_return = wal_processing_option_;
-      }
-      else {
+      } else {
         option_to_return = WalProcessingOption::kContinueProcessing;
       }
 
@@ -708,46 +746,45 @@ TEST_F(DBTest2, WalFilterTest) {
 
   // Test with all WAL processing options
   for (int option = 0;
-    option < static_cast<int>(
-    WalFilter::WalProcessingOption::kWalProcessingOptionMax);
-  option++) {
+       option < static_cast<int>(
+                    WalFilter::WalProcessingOption::kWalProcessingOptionMax);
+       option++) {
     Options options = OptionsForLogIterTest();
     DestroyAndReopen(options);
-    CreateAndReopenWithCF({ "pikachu" }, options);
+    CreateAndReopenWithCF({"pikachu"}, options);
 
     // Write given keys in given batches
     for (size_t i = 0; i < batch_keys.size(); i++) {
       WriteBatch batch;
       for (size_t j = 0; j < batch_keys[i].size(); j++) {
-        batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+        ASSERT_OK(batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)));
       }
-      dbfull()->Write(WriteOptions(), &batch);
+      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
     }
 
     WalFilter::WalProcessingOption wal_processing_option =
-      static_cast<WalFilter::WalProcessingOption>(option);
+        static_cast<WalFilter::WalProcessingOption>(option);
 
     // Create a test filter that would apply wal_processing_option at the first
     // record
     size_t apply_option_for_record_index = 1;
     TestWalFilter test_wal_filter(wal_processing_option,
-      apply_option_for_record_index);
+                                  apply_option_for_record_index);
 
     // Reopen database with option to use WAL filter
     options = OptionsForLogIterTest();
     options.wal_filter = &test_wal_filter;
     Status status =
-      TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
     if (wal_processing_option ==
-      WalFilter::WalProcessingOption::kCorruptedRecord) {
-      assert(!status.ok());
+        WalFilter::WalProcessingOption::kCorruptedRecord) {
+      ASSERT_NOK(status);
       // In case of corruption we can turn off paranoid_checks to reopen
       // databse
       options.paranoid_checks = false;
-      ReopenWithColumnFamilies({ "default", "pikachu" }, options);
-    }
-    else {
-      assert(status.ok());
+      ReopenWithColumnFamilies({"default", "pikachu"}, options);
+    } else {
+      ASSERT_OK(status);
     }
 
     // Compute which keys we expect to be found
@@ -755,56 +792,54 @@ TEST_F(DBTest2, WalFilterTest) {
     std::vector<Slice> keys_must_exist;
     std::vector<Slice> keys_must_not_exist;
     switch (wal_processing_option) {
-    case WalFilter::WalProcessingOption::kCorruptedRecord:
-    case WalFilter::WalProcessingOption::kContinueProcessing: {
-      fprintf(stderr, "Testing with complete WAL processing\n");
-      // we expect all records to be processed
-      for (size_t i = 0; i < batch_keys.size(); i++) {
-        for (size_t j = 0; j < batch_keys[i].size(); j++) {
-          keys_must_exist.push_back(Slice(batch_keys[i][j]));
-        }
-      }
-      break;
-    }
-    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
-      fprintf(stderr,
-        "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
-        apply_option_for_record_index);
-      // We expect the record with apply_option_for_record_index to be not
-      // found.
-      for (size_t i = 0; i < batch_keys.size(); i++) {
-        for (size_t j = 0; j < batch_keys[i].size(); j++) {
-          if (i == apply_option_for_record_index) {
-            keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
-          }
-          else {
+      case WalFilter::WalProcessingOption::kCorruptedRecord:
+      case WalFilter::WalProcessingOption::kContinueProcessing: {
+        fprintf(stderr, "Testing with complete WAL processing\n");
+        // we expect all records to be processed
+        for (size_t i = 0; i < batch_keys.size(); i++) {
+          for (size_t j = 0; j < batch_keys[i].size(); j++) {
             keys_must_exist.push_back(Slice(batch_keys[i][j]));
           }
         }
+        break;
       }
-      break;
-    }
-    case WalFilter::WalProcessingOption::kStopReplay: {
-      fprintf(stderr,
-        "Testing with stopping replay from record %" ROCKSDB_PRIszt
-        "\n",
-        apply_option_for_record_index);
-      // We expect records beyond apply_option_for_record_index to be not
-      // found.
-      for (size_t i = 0; i < batch_keys.size(); i++) {
-        for (size_t j = 0; j < batch_keys[i].size(); j++) {
-          if (i >= apply_option_for_record_index) {
-            keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
+        fprintf(stderr,
+                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
+                apply_option_for_record_index);
+        // We expect the record with apply_option_for_record_index to be not
+        // found.
+        for (size_t i = 0; i < batch_keys.size(); i++) {
+          for (size_t j = 0; j < batch_keys[i].size(); j++) {
+            if (i == apply_option_for_record_index) {
+              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+            } else {
+              keys_must_exist.push_back(Slice(batch_keys[i][j]));
+            }
           }
-          else {
-            keys_must_exist.push_back(Slice(batch_keys[i][j]));
+        }
+        break;
+      }
+      case WalFilter::WalProcessingOption::kStopReplay: {
+        fprintf(stderr,
+                "Testing with stopping replay from record %" ROCKSDB_PRIszt
+                "\n",
+                apply_option_for_record_index);
+        // We expect records beyond apply_option_for_record_index to be not
+        // found.
+        for (size_t i = 0; i < batch_keys.size(); i++) {
+          for (size_t j = 0; j < batch_keys[i].size(); j++) {
+            if (i >= apply_option_for_record_index) {
+              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+            } else {
+              keys_must_exist.push_back(Slice(batch_keys[i][j]));
+            }
           }
         }
+        break;
       }
-      break;
-    }
-    default:
-      assert(false);  // unhandled case
+      default:
+        FAIL();  // unhandled case
     }
 
     bool checked_after_reopen = false;
@@ -822,7 +857,7 @@ TEST_F(DBTest2, WalFilterTest) {
       //(even if they were skipped)
       // reopn database with option to use WAL filter
       options = OptionsForLogIterTest();
-      ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+      ReopenWithColumnFamilies({"default", "pikachu"}, options);
 
       checked_after_reopen = true;
     }
@@ -831,7 +866,7 @@ TEST_F(DBTest2, WalFilterTest) {
 
 TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
   class ChangeBatchHandler : public WriteBatch::Handler {
-  private:
+   private:
     // Batch to insert keys in
     WriteBatch* new_write_batch_;
     // Number of keys to add in the new batch
@@ -839,22 +874,22 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
     // Number of keys added to new batch
     size_t num_keys_added_;
 
-  public:
+   public:
     ChangeBatchHandler(WriteBatch* new_write_batch,
-      size_t num_keys_to_add_in_new_batch)
-      : new_write_batch_(new_write_batch),
-      num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
-      num_keys_added_(0) {}
+                       size_t num_keys_to_add_in_new_batch)
+        : new_write_batch_(new_write_batch),
+          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+          num_keys_added_(0) {}
     void Put(const Slice& key, const Slice& value) override {
       if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
-        new_write_batch_->Put(key, value);
+        ASSERT_OK(new_write_batch_->Put(key, value));
         ++num_keys_added_;
       }
     }
   };
 
   class TestWalFilterWithChangeBatch : public WalFilter {
-  private:
+   private:
     // Index at which to start changing records
     size_t change_records_from_index_;
     // Number of keys to add in the new batch
@@ -862,27 +897,31 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
     // Current record index, incremented with each record encountered.
     size_t current_record_index_;
 
-  public:
+   public:
     TestWalFilterWithChangeBatch(size_t change_records_from_index,
-      size_t num_keys_to_add_in_new_batch)
-      : change_records_from_index_(change_records_from_index),
-      num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
-      current_record_index_(0) {}
+                                 size_t num_keys_to_add_in_new_batch)
+        : change_records_from_index_(change_records_from_index),
+          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+          current_record_index_(0) {}
 
     WalProcessingOption LogRecord(const WriteBatch& batch,
                                   WriteBatch* new_batch,
                                   bool* batch_changed) const override {
       if (current_record_index_ >= change_records_from_index_) {
         ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
-        batch.Iterate(&handler);
-        *batch_changed = true;
+        Status s = batch.Iterate(&handler);
+        if (s.ok()) {
+          *batch_changed = true;
+        } else {
+          assert(false);
+        }
       }
 
       // Filter is passed as a const object for RocksDB to not modify the
       // object, however we modify it for our own purpose here and hence
       // cast the constness away.
       (const_cast<TestWalFilterWithChangeBatch*>(this)
-        ->current_record_index_)++;
+           ->current_record_index_)++;
 
       return WalProcessingOption::kContinueProcessing;
     }
@@ -901,15 +940,15 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
 
   Options options = OptionsForLogIterTest();
   DestroyAndReopen(options);
-  CreateAndReopenWithCF({ "pikachu" }, options);
+  CreateAndReopenWithCF({"pikachu"}, options);
 
   // Write given keys in given batches
   for (size_t i = 0; i < batch_keys.size(); i++) {
     WriteBatch batch;
     for (size_t j = 0; j < batch_keys[i].size(); j++) {
-      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+      ASSERT_OK(batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)));
     }
-    dbfull()->Write(WriteOptions(), &batch);
+    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
   }
 
   // Create a test filter that would apply wal_processing_option at the first
@@ -917,12 +956,12 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
   size_t change_records_from_index = 1;
   size_t num_keys_to_add_in_new_batch = 1;
   TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
-    change_records_from_index, num_keys_to_add_in_new_batch);
+      change_records_from_index, num_keys_to_add_in_new_batch);
 
   // Reopen database with option to use WAL filter
   options = OptionsForLogIterTest();
   options.wal_filter = &test_wal_filter_with_change_batch;
-  ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
 
   // Ensure that all keys exist before change_records_from_index_
   // And after that index only single key exists
@@ -934,8 +973,7 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
     for (size_t j = 0; j < batch_keys[i].size(); j++) {
       if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
         keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
-      }
-      else {
+      } else {
         keys_must_exist.push_back(Slice(batch_keys[i][j]));
       }
     }
@@ -956,7 +994,7 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
     //(even if they were skipped)
     // reopn database with option to use WAL filter
     options = OptionsForLogIterTest();
-    ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+    ReopenWithColumnFamilies({"default", "pikachu"}, options);
 
     checked_after_reopen = true;
   }
@@ -964,18 +1002,23 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
 
 TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
   class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
-  public:
-   WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch,
-                                 bool* batch_changed) const override {
-     *new_batch = batch;
-     new_batch->Put("key_extra", "value_extra");
-     *batch_changed = true;
-     return WalProcessingOption::kContinueProcessing;
-   }
-
-   const char* Name() const override {
-     return "WalFilterTestWithChangeBatchExtraKeys";
-   }
+   public:
+    WalProcessingOption LogRecord(const WriteBatch& batch,
+                                  WriteBatch* new_batch,
+                                  bool* batch_changed) const override {
+      *new_batch = batch;
+      Status s = new_batch->Put("key_extra", "value_extra");
+      if (s.ok()) {
+        *batch_changed = true;
+      } else {
+        assert(false);
+      }
+      return WalProcessingOption::kContinueProcessing;
+    }
+
+    const char* Name() const override {
+      return "WalFilterTestWithChangeBatchExtraKeys";
+    }
   };
 
   std::vector<std::vector<std::string>> batch_keys(3);
@@ -989,15 +1032,15 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
 
   Options options = OptionsForLogIterTest();
   DestroyAndReopen(options);
-  CreateAndReopenWithCF({ "pikachu" }, options);
+  CreateAndReopenWithCF({"pikachu"}, options);
 
   // Write given keys in given batches
   for (size_t i = 0; i < batch_keys.size(); i++) {
     WriteBatch batch;
     for (size_t j = 0; j < batch_keys[i].size(); j++) {
-      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+      ASSERT_OK(batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)));
     }
-    dbfull()->Write(WriteOptions(), &batch);
+    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
   }
 
   // Create a test filter that would add extra keys
@@ -1012,7 +1055,7 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
   // Reopen without filter, now reopen should succeed - previous
   // attempt to open must not have altered the db.
   options = OptionsForLogIterTest();
-  ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
 
   std::vector<Slice> keys_must_exist;
   std::vector<Slice> keys_must_not_exist;  // empty vector
@@ -1028,7 +1071,7 @@ TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
 
 TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
   class TestWalFilterWithColumnFamilies : public WalFilter {
-  private:
+   private:
     // column_family_id -> log_number map (provided to WALFilter)
     std::map<uint32_t, uint64_t> cf_log_number_map_;
     // column_family_name -> column_family_id map (provided to WALFilter)
@@ -1038,31 +1081,34 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
     // during recovery (i.e. aren't already flushed to SST file(s))
     // for verification against the keys we expect.
     std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
-  public:
-   void ColumnFamilyLogNumberMap(
-       const std::map<uint32_t, uint64_t>& cf_lognumber_map,
-       const std::map<std::string, uint32_t>& cf_name_id_map) override {
-     cf_log_number_map_ = cf_lognumber_map;
-     cf_name_id_map_ = cf_name_id_map;
-   }
-
-   WalProcessingOption LogRecordFound(unsigned long long log_number,
-                                      const std::string& /*log_file_name*/,
-                                      const WriteBatch& batch,
-                                      WriteBatch* /*new_batch*/,
-                                      bool* /*batch_changed*/) override {
-     class LogRecordBatchHandler : public WriteBatch::Handler {
-      private:
-        const std::map<uint32_t, uint64_t> & cf_log_number_map_;
-        std::map<uint32_t, std::vector<std::string>> & cf_wal_keys_;
+
+   public:
+    void ColumnFamilyLogNumberMap(
+        const std::map<uint32_t, uint64_t>& cf_lognumber_map,
+        const std::map<std::string, uint32_t>& cf_name_id_map) override {
+      cf_log_number_map_ = cf_lognumber_map;
+      cf_name_id_map_ = cf_name_id_map;
+    }
+
+    WalProcessingOption LogRecordFound(unsigned long long log_number,
+                                       const std::string& /*log_file_name*/,
+                                       const WriteBatch& batch,
+                                       WriteBatch* /*new_batch*/,
+                                       bool* /*batch_changed*/) override {
+      class LogRecordBatchHandler : public WriteBatch::Handler {
+       private:
+        const std::map<uint32_t, uint64_t>& cf_log_number_map_;
+        std::map<uint32_t, std::vector<std::string>>& cf_wal_keys_;
         unsigned long long log_number_;
-      public:
-        LogRecordBatchHandler(unsigned long long current_log_number,
-          const std::map<uint32_t, uint64_t> & cf_log_number_map,
-          std::map<uint32_t, std::vector<std::string>> & cf_wal_keys) :
-          cf_log_number_map_(cf_log_number_map),
-          cf_wal_keys_(cf_wal_keys),
-          log_number_(current_log_number){}
+
+       public:
+        LogRecordBatchHandler(
+            unsigned long long current_log_number,
+            const std::map<uint32_t, uint64_t>& cf_log_number_map,
+            std::map<uint32_t, std::vector<std::string>>& cf_wal_keys)
+            : cf_log_number_map_(cf_log_number_map),
+              cf_wal_keys_(cf_wal_keys),
+              log_number_(current_log_number) {}
 
         Status PutCF(uint32_t column_family_id, const Slice& key,
                      const Slice& /*value*/) override {
@@ -1073,27 +1119,31 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
           // (i.e. isn't flushed to SST file(s) for column_family_id)
           // add it to the cf_wal_keys_ map for verification.
           if (log_number_ >= log_number_for_cf) {
-            cf_wal_keys_[column_family_id].push_back(std::string(key.data(),
-              key.size()));
+            cf_wal_keys_[column_family_id].push_back(
+                std::string(key.data(), key.size()));
           }
           return Status::OK();
         }
       } handler(log_number, cf_log_number_map_, cf_wal_keys_);
 
-      batch.Iterate(&handler);
+      Status s = batch.Iterate(&handler);
+      if (!s.ok()) {
+        // TODO(AR) is this ok?
+        return WalProcessingOption::kCorruptedRecord;
+      }
 
       return WalProcessingOption::kContinueProcessing;
-   }
+    }
 
-   const char* Name() const override {
-     return "WalFilterTestWithColumnFamilies";
-   }
+    const char* Name() const override {
+      return "WalFilterTestWithColumnFamilies";
+    }
 
     const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
       return cf_wal_keys_;
     }
 
-    const std::map<std::string, uint32_t> & GetColumnFamilyNameIdMap() {
+    const std::map<std::string, uint32_t>& GetColumnFamilyNameIdMap() {
       return cf_name_id_map_;
     }
   };
@@ -1109,20 +1159,22 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
 
   Options options = OptionsForLogIterTest();
   DestroyAndReopen(options);
-  CreateAndReopenWithCF({ "pikachu" }, options);
+  CreateAndReopenWithCF({"pikachu"}, options);
 
   // Write given keys in given batches
   for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
     WriteBatch batch;
     for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
-      batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
-      batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
+      ASSERT_OK(batch.Put(handles_[0], batch_keys_pre_flush[i][j],
+                          DummyString(1024)));
+      ASSERT_OK(batch.Put(handles_[1], batch_keys_pre_flush[i][j],
+                          DummyString(1024)));
     }
-    dbfull()->Write(WriteOptions(), &batch);
+    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
   }
 
-  //Flush default column-family
-  db_->Flush(FlushOptions(), handles_[0]);
+  // Flush default column-family
+  ASSERT_OK(db_->Flush(FlushOptions(), handles_[0]));
 
   // Do some more writes
   std::vector<std::vector<std::string>> batch_keys_post_flush(3);
@@ -1138,10 +1190,12 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
   for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
     WriteBatch batch;
     for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
-      batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
-      batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
+      ASSERT_OK(batch.Put(handles_[0], batch_keys_post_flush[i][j],
+                          DummyString(1024)));
+      ASSERT_OK(batch.Put(handles_[1], batch_keys_post_flush[i][j],
+                          DummyString(1024)));
     }
-    dbfull()->Write(WriteOptions(), &batch);
+    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
   }
 
   // On Recovery we should only find the second batch applicable to default CF
@@ -1153,8 +1207,7 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
   // Reopen database with option to use WAL filter
   options = OptionsForLogIterTest();
   options.wal_filter = &test_wal_filter_column_families;
-  Status status =
-    TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
   ASSERT_TRUE(status.ok());
 
   // verify that handles_[0] only has post_flush keys
@@ -1163,24 +1216,24 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
   auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
   size_t index = 0;
   auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
-  //default column-family, only post_flush keys are expected
+  // default column-family, only post_flush keys are expected
   for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
     for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
       Slice key_from_the_log(keys_cf[index++]);
       Slice batch_key(batch_keys_post_flush[i][j]);
-      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+      ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
     }
   }
-  ASSERT_TRUE(index == keys_cf.size());
+  ASSERT_EQ(index, keys_cf.size());
 
   index = 0;
   keys_cf = cf_wal_keys[name_id_map["pikachu"]];
-  //pikachu column-family, all keys are expected
+  // pikachu column-family, all keys are expected
   for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
     for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
       Slice key_from_the_log(keys_cf[index++]);
       Slice batch_key(batch_keys_pre_flush[i][j]);
-      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+      ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
     }
   }
 
@@ -1188,10 +1241,10 @@ TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
     for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
       Slice key_from_the_log(keys_cf[index++]);
       Slice batch_key(batch_keys_post_flush[i][j]);
-      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+      ASSERT_EQ(key_from_the_log.compare(batch_key), 0);
     }
   }
-  ASSERT_TRUE(index == keys_cf.size());
+  ASSERT_EQ(index, keys_cf.size());
 }
 
 TEST_F(DBTest2, PresetCompressionDict) {
@@ -1211,7 +1264,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
   options.disable_auto_compactions = true;
   options.level0_file_num_compaction_trigger = kNumL0Files;
   options.memtable_factory.reset(
-      new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
+      test::NewSpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
   options.num_levels = 2;
   options.target_file_size_base = kL0FileBytes;
   options.target_file_size_multiplier = 2;
@@ -1225,7 +1278,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
 #if LZ4_VERSION_NUMBER >= 10400  // r124+
   compression_types.push_back(kLZ4Compression);
   compression_types.push_back(kLZ4HCCompression);
-#endif                          // LZ4_VERSION_NUMBER >= 10400
+#endif  // LZ4_VERSION_NUMBER >= 10400
   if (ZSTD_Supported()) {
     compression_types.push_back(kZSTD);
   }
@@ -1233,6 +1286,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
   enum DictionaryTypes : int {
     kWithoutDict,
     kWithDict,
+    kWithZSTDfinalizeDict,
     kWithZSTDTrainedDict,
     kDictEnd,
   };
@@ -1241,6 +1295,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
     options.compression = compression_type;
     size_t bytes_without_dict = 0;
     size_t bytes_with_dict = 0;
+    size_t bytes_with_zstd_finalize_dict = 0;
     size_t bytes_with_zstd_trained_dict = 0;
     for (int i = kWithoutDict; i < kDictEnd; i++) {
       // First iteration: compress without preset dictionary
@@ -1260,12 +1315,22 @@ TEST_F(DBTest2, PresetCompressionDict) {
           options.compression_opts.max_dict_bytes = kBlockSizeBytes;
           options.compression_opts.zstd_max_train_bytes = 0;
           break;
+        case kWithZSTDfinalizeDict:
+          if (compression_type != kZSTD ||
+              !ZSTD_FinalizeDictionarySupported()) {
+            continue;
+          }
+          options.compression_opts.max_dict_bytes = kBlockSizeBytes;
+          options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
+          options.compression_opts.use_zstd_dict_trainer = false;
+          break;
         case kWithZSTDTrainedDict:
-          if (compression_type != kZSTD) {
+          if (compression_type != kZSTD || !ZSTD_TrainDictionarySupported()) {
             continue;
           }
           options.compression_opts.max_dict_bytes = kBlockSizeBytes;
           options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
+          options.compression_opts.use_zstd_dict_trainer = true;
           break;
         default:
           assert(false);
@@ -1288,11 +1353,11 @@ TEST_F(DBTest2, PresetCompressionDict) {
           ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
                         seq_datas[(key_num / 10) % 10]));
         }
-        dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
         ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
       }
-      dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
-                                  true /* disallow_trivial_move */);
+      ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+                                            true /* disallow_trivial_move */));
       ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
       ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
 
@@ -1302,6 +1367,8 @@ TEST_F(DBTest2, PresetCompressionDict) {
         bytes_without_dict = total_sst_bytes;
       } else if (i == kWithDict) {
         bytes_with_dict = total_sst_bytes;
+      } else if (i == kWithZSTDfinalizeDict) {
+        bytes_with_zstd_finalize_dict = total_sst_bytes;
       } else if (i == kWithZSTDTrainedDict) {
         bytes_with_zstd_trained_dict = total_sst_bytes;
       }
@@ -1312,6 +1379,13 @@ TEST_F(DBTest2, PresetCompressionDict) {
       }
       if (i == kWithDict) {
         ASSERT_GT(bytes_without_dict, bytes_with_dict);
+      } else if (i == kWithZSTDTrainedDict) {
+        // In zstd compression, it is sometimes possible that using a finalized
+        // dictionary does not get as good a compression ratio as raw content
+        // dictionary. But using a dictionary should always get better
+        // compression ratio than not using one.
+        ASSERT_TRUE(bytes_with_dict > bytes_with_zstd_finalize_dict ||
+                    bytes_without_dict > bytes_with_zstd_finalize_dict);
       } else if (i == kWithZSTDTrainedDict) {
         // In zstd compression, it is sometimes possible that using a trained
         // dictionary does not get as good a compression ratio as without
@@ -1396,7 +1470,7 @@ class PresetCompressionDictTest
       public testing::WithParamInterface<std::tuple<CompressionType, bool>> {
  public:
   PresetCompressionDictTest()
-      : DBTestBase("/db_test2", false /* env_do_fsync */),
+      : DBTestBase("db_test2", false /* env_do_fsync */),
         compression_type_(std::get<0>(GetParam())),
         bottommost_(std::get<1>(GetParam())) {}
 
@@ -1412,67 +1486,94 @@ INSTANTIATE_TEST_CASE_P(
 
 TEST_P(PresetCompressionDictTest, Flush) {
   // Verifies that dictionary is generated and written during flush only when
-  // `ColumnFamilyOptions::compression` enables dictionary.
+  // `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
+  // size of the dictionary is within expectations according to the limit on
+  // buffering set by `CompressionOptions::max_dict_buffer_bytes`.
   const size_t kValueLen = 256;
   const size_t kKeysPerFile = 1 << 10;
-  const size_t kDictLen = 4 << 10;
+  const size_t kDictLen = 16 << 10;
+  const size_t kBlockLen = 4 << 10;
 
   Options options = CurrentOptions();
   if (bottommost_) {
     options.bottommost_compression = compression_type_;
     options.bottommost_compression_opts.enabled = true;
     options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
   } else {
     options.compression = compression_type_;
     options.compression_opts.max_dict_bytes = kDictLen;
+    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
   }
-  options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(kKeysPerFile));
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions bbto;
+  bbto.block_size = kBlockLen;
   bbto.cache_index_and_filter_blocks = true;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   Reopen(options);
 
-  uint64_t prev_compression_dict_misses =
-      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
   Random rnd(301);
   for (size_t i = 0; i <= kKeysPerFile; ++i) {
     ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
   }
   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
-  // If there's a compression dictionary, it should have been loaded when the
-  // flush finished, incurring a cache miss.
-  uint64_t expected_compression_dict_misses;
+  // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
+  // compression dictionary exists since dictionaries would be preloaded when
+  // the flush finishes.
   if (bottommost_) {
-    expected_compression_dict_misses = prev_compression_dict_misses;
+    // Flush is never considered bottommost. This should change in the future
+    // since flushed files may have nothing underneath them, like the one in
+    // this test case.
+    ASSERT_EQ(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        0);
   } else {
-    expected_compression_dict_misses = prev_compression_dict_misses + 1;
+    ASSERT_GT(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        0);
+    // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
+    // number of bytes needs to be adjusted in case the cached block is in
+    // ZSTD's digested dictionary format.
+    if (compression_type_ != kZSTD &&
+        compression_type_ != kZSTDNotFinalCompression) {
+      // Although we limited buffering to `kBlockLen`, there may be up to two
+      // blocks of data included in the dictionary since we only check limit
+      // after each block is built.
+      ASSERT_LE(TestGetTickerCount(options,
+                                   BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+                2 * kBlockLen);
+    }
   }
-  ASSERT_EQ(expected_compression_dict_misses,
-            TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
 }
 
 TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
   // Verifies that dictionary is generated and written during compaction to
   // non-bottommost level only when `ColumnFamilyOptions::compression` enables
-  // dictionary.
+  // dictionary. Also verifies the size of the dictionary is within expectations
+  // according to the limit on buffering set by
+  // `CompressionOptions::max_dict_buffer_bytes`.
   const size_t kValueLen = 256;
   const size_t kKeysPerFile = 1 << 10;
-  const size_t kDictLen = 4 << 10;
+  const size_t kDictLen = 16 << 10;
+  const size_t kBlockLen = 4 << 10;
 
   Options options = CurrentOptions();
   if (bottommost_) {
     options.bottommost_compression = compression_type_;
     options.bottommost_compression_opts.enabled = true;
     options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
   } else {
     options.compression = compression_type_;
     options.compression_opts.max_dict_bytes = kDictLen;
+    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
   }
   options.disable_auto_compactions = true;
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions bbto;
+  bbto.block_size = kBlockLen;
   bbto.cache_index_and_filter_blocks = true;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   Reopen(options);
@@ -1494,8 +1595,8 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
   ASSERT_EQ("2,0,1", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
 
-  uint64_t prev_compression_dict_misses =
-      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
+  uint64_t prev_compression_dict_bytes_inserted =
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
   // This L0->L1 compaction merges the two L0 files into L1. The produced L1
   // file is not bottommost due to the existing L2 file covering the same key-
   // range.
@@ -1503,38 +1604,58 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,1,1", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
-  // If there's a compression dictionary, it should have been loaded when the
-  // compaction finished, incurring a cache miss.
-  uint64_t expected_compression_dict_misses;
+  // We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
+  // compression dictionary exists since dictionaries would be preloaded when
+  // the compaction finishes.
   if (bottommost_) {
-    expected_compression_dict_misses = prev_compression_dict_misses;
+    ASSERT_EQ(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        prev_compression_dict_bytes_inserted);
   } else {
-    expected_compression_dict_misses = prev_compression_dict_misses + 1;
+    ASSERT_GT(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        prev_compression_dict_bytes_inserted);
+    // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
+    // number of bytes needs to be adjusted in case the cached block is in
+    // ZSTD's digested dictionary format.
+    if (compression_type_ != kZSTD &&
+        compression_type_ != kZSTDNotFinalCompression) {
+      // Although we limited buffering to `kBlockLen`, there may be up to two
+      // blocks of data included in the dictionary since we only check limit
+      // after each block is built.
+      ASSERT_LE(TestGetTickerCount(options,
+                                   BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+                prev_compression_dict_bytes_inserted + 2 * kBlockLen);
+    }
   }
-  ASSERT_EQ(expected_compression_dict_misses,
-            TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
 }
 
 TEST_P(PresetCompressionDictTest, CompactBottommost) {
   // Verifies that dictionary is generated and written during compaction to
   // non-bottommost level only when either `ColumnFamilyOptions::compression` or
-  // `ColumnFamilyOptions::bottommost_compression` enables dictionary.
+  // `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
+  // verifies the size of the dictionary is within expectations according to the
+  // limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
   const size_t kValueLen = 256;
   const size_t kKeysPerFile = 1 << 10;
-  const size_t kDictLen = 4 << 10;
+  const size_t kDictLen = 16 << 10;
+  const size_t kBlockLen = 4 << 10;
 
   Options options = CurrentOptions();
   if (bottommost_) {
     options.bottommost_compression = compression_type_;
     options.bottommost_compression_opts.enabled = true;
     options.bottommost_compression_opts.max_dict_bytes = kDictLen;
+    options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
   } else {
     options.compression = compression_type_;
     options.compression_opts.max_dict_bytes = kDictLen;
+    options.compression_opts.max_dict_buffer_bytes = kBlockLen;
   }
   options.disable_auto_compactions = true;
   options.statistics = CreateDBStatistics();
   BlockBasedTableOptions bbto;
+  bbto.block_size = kBlockLen;
   bbto.cache_index_and_filter_blocks = true;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   Reopen(options);
@@ -1550,17 +1671,28 @@ TEST_P(PresetCompressionDictTest, CompactBottommost) {
   ASSERT_EQ("2", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
 
-  uint64_t prev_compression_dict_misses =
-      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
+  uint64_t prev_compression_dict_bytes_inserted =
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
   CompactRangeOptions cro;
   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,1", FilesPerLevel(0));
 #endif  // ROCKSDB_LITE
-  // If there's a compression dictionary, it should have been loaded when the
-  // compaction finished, incurring a cache miss.
-  ASSERT_EQ(prev_compression_dict_misses + 1,
-            TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
+  ASSERT_GT(
+      TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+      prev_compression_dict_bytes_inserted);
+  // TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
+  // number of bytes needs to be adjusted in case the cached block is in ZSTD's
+  // digested dictionary format.
+  if (compression_type_ != kZSTD &&
+      compression_type_ != kZSTDNotFinalCompression) {
+    // Although we limited buffering to `kBlockLen`, there may be up to two
+    // blocks of data included in the dictionary since we only check limit after
+    // each block is built.
+    ASSERT_LE(
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
+        prev_compression_dict_bytes_inserted + 2 * kBlockLen);
+  }
 }
 
 class CompactionCompressionListener : public EventListener {
@@ -1574,7 +1706,7 @@ class CompactionCompressionListener : public EventListener {
     for (int level = 0; level < db->NumberLevels(); level++) {
       std::string files_at_level;
       ASSERT_TRUE(
-          db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
+          db->GetProperty("rocksdb.num-files-at-level" + std::to_string(level),
                           &files_at_level));
       if (files_at_level != "0") {
         bottommost_level = level;
@@ -1662,15 +1794,14 @@ TEST_P(CompressionFailuresTest, CompressionFailures) {
         });
   } else if (compression_failure_type_ == kTestDecompressionFail) {
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-        "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
-        [](void* arg) {
+        "UncompressBlockData:TamperWithReturnValue", [](void* arg) {
           Status* ret = static_cast<Status*>(arg);
           ASSERT_OK(*ret);
           *ret = Status::Corruption("kTestDecompressionFail");
         });
   } else if (compression_failure_type_ == kTestDecompressionCorruption) {
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-        "UncompressBlockContentsForCompressionType:"
+        "UncompressBlockData:"
         "TamperWithDecompressionOutput",
         [](void* arg) {
           BlockContents* contents = static_cast<BlockContents*>(arg);
@@ -1738,7 +1869,7 @@ TEST_P(CompressionFailuresTest, CompressionFailures) {
               "Could not decompress: kTestDecompressionFail");
   } else if (compression_failure_type_ == kTestDecompressionCorruption) {
     ASSERT_EQ(std::string(s.getState()),
-              "Decompressed block did not match raw block");
+              "Decompressed block did not match pre-compression block");
   }
 }
 
@@ -1804,7 +1935,7 @@ TEST_F(DBTest2, CompressionOptions) {
           ASSERT_OK(Put(key, value));
         }
         ASSERT_OK(Flush());
-        dbfull()->TEST_WaitForCompact();
+        ASSERT_OK(dbfull()->TEST_WaitForCompact());
       }
 
       // Make sure that we wrote enough to check all 7 levels
@@ -1819,6 +1950,7 @@ TEST_F(DBTest2, CompressionOptions) {
         ASSERT_EQ(key_value_written[key], value);
         key_value_written.erase(key);
       }
+      ASSERT_OK(db_iter->status());
       ASSERT_EQ(0, key_value_written.size());
     }
   }
@@ -1826,7 +1958,8 @@ TEST_F(DBTest2, CompressionOptions) {
 
 class CompactionStallTestListener : public EventListener {
  public:
-  CompactionStallTestListener() : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
+  CompactionStallTestListener()
+      : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
 
   void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
     ASSERT_EQ(ci.cf_name, "default");
@@ -1900,12 +2033,13 @@ TEST_F(DBTest2, CompactionStall) {
   // Hold NotifyOnCompactionCompleted in the unlock mutex section
   TEST_SYNC_POINT("DBTest2::CompactionStall:3");
 
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_LT(NumTableFilesAtLevel(0),
             options.level0_file_num_compaction_trigger);
   ASSERT_GT(listener->compacted_files_cnt_.load(),
             10 - options.level0_file_num_compaction_trigger);
-  ASSERT_EQ(listener->compacting_files_cnt_.load(), listener->compacted_files_cnt_.load());
+  ASSERT_EQ(listener->compacting_files_cnt_.load(),
+            listener->compacted_files_cnt_.load());
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
@@ -1921,8 +2055,8 @@ TEST_F(DBTest2, FirstSnapshotTest) {
   // This snapshot will have sequence number 0 what is expected behaviour.
   const Snapshot* s1 = db_->GetSnapshot();
 
-  Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
-  Put(1, "k2", std::string(100000, 'y'));  // Trigger flush
+  ASSERT_OK(Put(1, "k1", std::string(100000, 'x')));  // Fill memtable
+  ASSERT_OK(Put(1, "k2", std::string(100000, 'y')));  // Trigger flush
 
   db_->ReleaseSnapshot(s1);
 }
@@ -1935,17 +2069,17 @@ TEST_F(DBTest2, DuplicateSnapshot) {
   DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
   SequenceNumber oldest_ww_snap, first_ww_snap;
 
-  Put("k", "v");  // inc seq
+  ASSERT_OK(Put("k", "v"));  // inc seq
   snapshots.push_back(db_->GetSnapshot());
   snapshots.push_back(db_->GetSnapshot());
-  Put("k", "v");  // inc seq
+  ASSERT_OK(Put("k", "v"));  // inc seq
   snapshots.push_back(db_->GetSnapshot());
   snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
   first_ww_snap = snapshots.back()->GetSequenceNumber();
-  Put("k", "v");  // inc seq
+  ASSERT_OK(Put("k", "v"));  // inc seq
   snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
   snapshots.push_back(db_->GetSnapshot());
-  Put("k", "v");  // inc seq
+  ASSERT_OK(Put("k", "v"));  // inc seq
   snapshots.push_back(db_->GetSnapshot());
 
   {
@@ -1966,7 +2100,7 @@ class PinL0IndexAndFilterBlocksTest
       public testing::WithParamInterface<std::tuple<bool, bool>> {
  public:
   PinL0IndexAndFilterBlocksTest()
-      : DBTestBase("/db_pin_l0_index_bloom_test", /*env_do_fsync=*/true) {}
+      : DBTestBase("db_pin_l0_index_bloom_test", /*env_do_fsync=*/true) {}
   void SetUp() override {
     infinite_max_files_ = std::get<0>(GetParam());
     disallow_preload_ = std::get<1>(GetParam());
@@ -1985,19 +2119,19 @@ class PinL0IndexAndFilterBlocksTest
     options->table_factory.reset(NewBlockBasedTableFactory(table_options));
     CreateAndReopenWithCF({"pikachu"}, *options);
 
-    Put(1, "a", "begin");
-    Put(1, "z", "end");
+    ASSERT_OK(Put(1, "a", "begin"));
+    ASSERT_OK(Put(1, "z", "end"));
     ASSERT_OK(Flush(1));
     // move this table to L1
-    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+    ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
 
     // reset block cache
     table_options.block_cache = NewLRUCache(64 * 1024);
     options->table_factory.reset(NewBlockBasedTableFactory(table_options));
     TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
     // create new table at L0
-    Put(1, "a2", "begin2");
-    Put(1, "z2", "end2");
+    ASSERT_OK(Put(1, "a2", "begin2"));
+    ASSERT_OK(Put(1, "z2", "end2"));
     ASSERT_OK(Flush(1));
 
     if (close_afterwards) {
@@ -2041,7 +2175,7 @@ TEST_P(PinL0IndexAndFilterBlocksTest,
 
   std::string value;
   // Miss and hit count should remain the same, they're all pinned.
-  db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
+  ASSERT_TRUE(db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value));
   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
   ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
   ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
@@ -2169,7 +2303,7 @@ TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
   // cache read for both of index and filter. If prefetch doesn't explicitly
   // happen, it will happen when verifying the file.
   Compact(1, "a", "zzzzz");
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   if (!disallow_preload_) {
     ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
@@ -2207,8 +2341,8 @@ INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest2, MaxCompactionBytesTest) {
   Options options = CurrentOptions();
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
   options.compaction_style = kCompactionStyleLevel;
   options.write_buffer_size = 200 << 10;
   options.arena_block_size = 4 << 10;
@@ -2228,7 +2362,7 @@ TEST_F(DBTest2, MaxCompactionBytesTest) {
     GenerateNewRandomFile(&rnd);
   }
   CompactRangeOptions cro;
-  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
   ASSERT_EQ("0,0,8", FilesPerLevel(0));
 
@@ -2240,14 +2374,20 @@ TEST_F(DBTest2, MaxCompactionBytesTest) {
   GenerateNewRandomFile(&rnd);
   // Add three more small files that overlap with the previous file
   for (int i = 0; i < 3; i++) {
-    Put("a", "z");
+    ASSERT_OK(Put("a", "z"));
     ASSERT_OK(Flush());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
-  // Output files to L1 are cut to three pieces, according to
-  // options.max_compaction_bytes
-  ASSERT_EQ("0,3,8", FilesPerLevel(0));
+  // Output files to L1 are cut to 4 pieces, according to
+  // options.max_compaction_bytes (300K)
+  // There are 8 files on L2 (grandparents level), each one is 100K. The first
+  // file overlaps with a, b which max_compaction_bytes is less than 300K, the
+  // second one overlaps with d, e, which is also less than 300K. Including any
+  // extra grandparent file will make the future compaction larger than 300K.
+  // L1: [  1  ] [  2 ]  [  3  ] [ 4 ]
+  // L2: [a] [b] [c] [d] [e] [f] [g] [h]
+  ASSERT_EQ("0,4,8", FilesPerLevel(0));
 }
 
 static void UniqueIdCallback(void* arg) {
@@ -2370,14 +2510,14 @@ TEST_F(DBTest2, TestPerfContextIterCpuTime) {
 
   const size_t kNumEntries = 10;
   for (size_t i = 0; i < kNumEntries; ++i) {
-    ASSERT_OK(Put("k" + ToString(i), "v" + ToString(i)));
+    ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
   }
   ASSERT_OK(Flush());
   for (size_t i = 0; i < kNumEntries; ++i) {
-    ASSERT_EQ("v" + ToString(i), Get("k" + ToString(i)));
+    ASSERT_EQ("v" + std::to_string(i), Get("k" + std::to_string(i)));
   }
-  std::string last_key = "k" + ToString(kNumEntries - 1);
-  std::string last_value = "v" + ToString(kNumEntries - 1);
+  std::string last_key = "k" + std::to_string(kNumEntries - 1);
+  std::string last_value = "v" + std::to_string(kNumEntries - 1);
   env_->now_cpu_count_.store(0);
   env_->SetMockSleep();
 
@@ -2404,6 +2544,7 @@ TEST_F(DBTest2, TestPerfContextIterCpuTime) {
   ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
   iter->Prev();
   ASSERT_TRUE(iter->Valid());
+  ASSERT_OK(iter->status());
   ASSERT_EQ("v0", iter->value().ToString());
   ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
   ASSERT_EQ(0, env_->now_cpu_count_.load());
@@ -2440,6 +2581,7 @@ TEST_F(DBTest2, TestPerfContextIterCpuTime) {
   ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonNanos);
   iter->Prev();
   ASSERT_TRUE(iter->Valid());
+  ASSERT_OK(iter->status());
   ASSERT_EQ("v0", iter->value().ToString());
   ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
   ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonNanos);
@@ -2522,7 +2664,7 @@ namespace {
 void CountSyncPoint() {
   TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
 }
-}  // namespace
+}  // anonymous namespace
 
 TEST_F(DBTest2, SyncPointMarker) {
   std::atomic<int> sync_point_called(0);
@@ -2637,6 +2779,7 @@ TEST_F(DBTest2, ReadAmpBitmap) {
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
       ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
     }
+    ASSERT_OK(iter->status());
     delete iter;
 
     // Read amp is on average 100% since we read all what we loaded in memory
@@ -2654,7 +2797,7 @@ TEST_F(DBTest2, ReadAmpBitmap) {
   }
 }
 
-#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
+#ifndef OS_SOLARIS  // GetUniqueIdFromFile is not implemented
 TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
   {
     const int kIdBufLen = 100;
@@ -2711,7 +2854,6 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
     Close();
     Reopen(options);
 
-    uint64_t total_useful_bytes = 0;
     std::set<int> read_keys;
     std::string value;
     // Iter1: Read half the DB, Read even keys
@@ -2722,8 +2864,6 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
 
       if (read_keys.find(i) == read_keys.end()) {
         auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
-        total_useful_bytes +=
-            GetEncodedEntrySize(internal_key.size(), value.size());
         read_keys.insert(i);
       }
     }
@@ -2750,8 +2890,6 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
 
       if (read_keys.find(i) == read_keys.end()) {
         auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
-        total_useful_bytes +=
-            GetEncodedEntrySize(internal_key.size(), value.size());
         read_keys.insert(i);
       }
     }
@@ -2761,7 +2899,6 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
     size_t total_loaded_bytes_iter2 =
         options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
 
-
     // Read amp is on average 100% since we read all what we loaded in memory
     if (k == 0) {
       ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
@@ -2773,7 +2910,7 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
     }
   }
 }
-#endif // !OS_SOLARIS
+#endif  // !OS_SOLARIS
 
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
@@ -2940,10 +3077,21 @@ TEST_F(DBTest2, PausingManualCompaction1) {
   int manual_compactions_paused = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
+        auto canceled = static_cast<std::atomic<bool>*>(arg);
+        // CompactRange triggers manual compaction and cancel the compaction
+        // by set *canceled as true
+        if (canceled != nullptr) {
+          canceled->store(true, std::memory_order_release);
+        }
+        manual_compactions_paused += 1;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
         auto paused = static_cast<std::atomic<int>*>(arg);
+        // CompactFiles() relies on manual_compactions_paused to
+        // determine if thie compaction should be paused or not
         ASSERT_EQ(0, paused->load(std::memory_order_acquire));
         paused->fetch_add(1, std::memory_order_release);
-        manual_compactions_paused += 1;
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
@@ -2956,10 +3104,12 @@ TEST_F(DBTest2, PausingManualCompaction1) {
   }
 
   // OK, now trigger a manual compaction
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsManualCompactionPaused());
 
   // Wait for compactions to get scheduled and stopped
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   // Get file names after compaction is stopped
   files_meta.clear();
@@ -2974,10 +3124,12 @@ TEST_F(DBTest2, PausingManualCompaction1) {
 
   manual_compactions_paused = 0;
   // Now make sure CompactFiles also not run
-  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
-                         files_before_compact, 0);
+  ASSERT_TRUE(dbfull()
+                  ->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
+                                 files_before_compact, 0)
+                  .IsManualCompactionPaused());
   // Wait for manual compaction to get scheduled and finish
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   files_meta.clear();
   files_after_compact.clear();
@@ -3004,7 +3156,7 @@ TEST_F(DBTest2, PausingManualCompaction2) {
 
   Random rnd(301);
   for (int i = 0; i < 2; i++) {
-    // Generate a file containing 10 keys.
+    // Generate a file containing 100 keys.
     for (int j = 0; j < 100; j++) {
       ASSERT_OK(Put(Key(j), rnd.RandomString(50)));
     }
@@ -3030,7 +3182,7 @@ TEST_F(DBTest2, PausingManualCompaction3) {
         for (int k = 0; k < 1000; k++) {
           ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
         }
-        Flush();
+        ASSERT_OK(Flush());
       }
 
       for (int l = 1; l < options.num_levels - i; l++) {
@@ -3051,8 +3203,10 @@ TEST_F(DBTest2, PausingManualCompaction3) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
   dbfull()->DisableManualCompaction();
-  dbfull()->CompactRange(compact_options, nullptr, nullptr);
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
   // As manual compaction disabled, not even reach sync point
   ASSERT_EQ(run_manual_compactions, 0);
 #ifndef ROCKSDB_LITE
@@ -3062,8 +3216,8 @@ TEST_F(DBTest2, PausingManualCompaction3) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
       "CompactionJob::Run():PausingManualCompaction:1");
   dbfull()->EnableManualCompaction();
-  dbfull()->CompactRange(compact_options, nullptr, nullptr);
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
 #endif  // !ROCKSDB_LITE
@@ -3084,7 +3238,7 @@ TEST_F(DBTest2, PausingManualCompaction4) {
         for (int k = 0; k < 1000; k++) {
           ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
         }
-        Flush();
+        ASSERT_OK(Flush());
       }
 
       for (int l = 1; l < options.num_levels - i; l++) {
@@ -3101,15 +3255,28 @@ TEST_F(DBTest2, PausingManualCompaction4) {
   int run_manual_compactions = 0;
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
+        auto canceled = static_cast<std::atomic<bool>*>(arg);
+        // CompactRange triggers manual compaction and cancel the compaction
+        // by set *canceled as true
+        if (canceled != nullptr) {
+          canceled->store(true, std::memory_order_release);
+        }
+        run_manual_compactions++;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "TestCompactFiles:PausingManualCompaction:3", [&](void* arg) {
         auto paused = static_cast<std::atomic<int>*>(arg);
+        // CompactFiles() relies on manual_compactions_paused to
+        // determine if thie compaction should be paused or not
         ASSERT_EQ(0, paused->load(std::memory_order_acquire));
         paused->fetch_add(1, std::memory_order_release);
-        run_manual_compactions++;
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  dbfull()->CompactRange(compact_options, nullptr, nullptr);
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
   ASSERT_EQ(run_manual_compactions, 1);
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
@@ -3117,9 +3284,8 @@ TEST_F(DBTest2, PausingManualCompaction4) {
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
       "CompactionJob::Run():PausingManualCompaction:2");
-  dbfull()->EnableManualCompaction();
-  dbfull()->CompactRange(compact_options, nullptr, nullptr);
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
 #endif  // !ROCKSDB_LITE
@@ -3127,114 +3293,474 @@ TEST_F(DBTest2, PausingManualCompaction4) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
-TEST_F(DBTest2, OptimizeForPointLookup) {
-  Options options = CurrentOptions();
-  Close();
-  options.OptimizeForPointLookup(2);
-  ASSERT_OK(DB::Open(options, dbname_, &db_));
-
-  ASSERT_OK(Put("foo", "v1"));
-  ASSERT_EQ("v1", Get("foo"));
-  Flush();
-  ASSERT_EQ("v1", Get("foo"));
-}
+TEST_F(DBTest2, CancelManualCompaction1) {
+  CompactRangeOptions compact_options;
+  auto canceledPtr =
+      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
+  compact_options.canceled = canceledPtr.get();
 
-TEST_F(DBTest2, OptimizeForSmallDB) {
   Options options = CurrentOptions();
-  Close();
-  options.OptimizeForSmallDb();
+  options.disable_auto_compactions = true;
+  options.num_levels = 7;
 
-  // Find the cache object
-  ASSERT_TRUE(options.table_factory->IsInstanceOf(
-      TableFactory::kBlockBasedTableName()));
-  auto table_options =
-      options.table_factory->GetOptions<BlockBasedTableOptions>();
+  Random rnd(301);
+  auto generate_files = [&]() {
+    for (int i = 0; i < options.num_levels; i++) {
+      for (int j = 0; j < options.num_levels - i + 1; j++) {
+        for (int k = 0; k < 1000; k++) {
+          ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
+        }
+        ASSERT_OK(Flush());
+      }
 
-  ASSERT_TRUE(table_options != nullptr);
-  std::shared_ptr<Cache> cache = table_options->block_cache;
+      for (int l = 1; l < options.num_levels - i; l++) {
+        MoveFilesToLevel(l);
+      }
+    }
+  };
 
-  ASSERT_EQ(0, cache->GetUsage());
-  ASSERT_OK(DB::Open(options, dbname_, &db_));
-  ASSERT_OK(Put("foo", "v1"));
+  DestroyAndReopen(options);
+  generate_files();
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
 
-  // memtable size is costed to the block cache
-  ASSERT_NE(0, cache->GetUsage());
+  int run_manual_compactions = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():PausingManualCompaction:1",
+      [&](void* /*arg*/) { run_manual_compactions++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  ASSERT_EQ("v1", Get("foo"));
-  Flush();
+  // Setup a callback to disable compactions after a couple of levels are
+  // compacted
+  int compactions_run = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::RunManualCompaction()::1",
+      [&](void* /*arg*/) { ++compactions_run; });
 
-  size_t prev_size = cache->GetUsage();
-  // Remember block cache size, so that we can find that
-  // it is filled after Get().
-  // Use pinnable slice so that it can ping the block so that
-  // when we check the size it is not evicted.
-  PinnableSlice value;
-  ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &value));
-  ASSERT_GT(cache->GetUsage(), prev_size);
-  value.Reset();
-}
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
-#endif  // ROCKSDB_LITE
+  // Since compactions are disabled, we shouldn't start compacting.
+  // E.g. we should call the compaction function exactly one time.
+  ASSERT_EQ(compactions_run, 0);
+  ASSERT_EQ(run_manual_compactions, 0);
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
 
-TEST_F(DBTest2, IterRaceFlush1) {
-  ASSERT_OK(Put("foo", "v1"));
+  compactions_run = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "DBImpl::RunManualCompaction()::1");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) {
+        ++compactions_run;
+        // After 3 compactions disable
+        if (compactions_run == 3) {
+          compact_options.canceled->store(true, std::memory_order_release);
+        }
+      });
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
-       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_EQ(compactions_run, 3);
 
-  ROCKSDB_NAMESPACE::port::Thread t1([&] {
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
-    ASSERT_OK(Put("foo", "v2"));
-    Flush();
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
-  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "DBImpl::RunManualCompaction()::1");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionJob::Run():PausingManualCompaction:1");
 
-  // iterator is created after the first Put(), so it should see either
-  // "v1" or "v2".
-  {
-    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
-    it->Seek("foo");
-    ASSERT_TRUE(it->Valid());
-    ASSERT_EQ("foo", it->key().ToString());
-  }
+  // Compactions should work again if we re-enable them..
+  compact_options.canceled->store(false, std::memory_order_relaxed);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
 
-  t1.join();
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
-TEST_F(DBTest2, IterRaceFlush2) {
-  ASSERT_OK(Put("foo", "v1"));
+TEST_F(DBTest2, CancelManualCompaction2) {
+  CompactRangeOptions compact_options;
+  auto canceledPtr =
+      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
+  compact_options.canceled = canceledPtr.get();
+  compact_options.max_subcompactions = 1;
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
-       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.num_levels = 7;
 
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  Random rnd(301);
+  auto generate_files = [&]() {
+    for (int i = 0; i < options.num_levels; i++) {
+      for (int j = 0; j < options.num_levels - i + 1; j++) {
+        for (int k = 0; k < 1000; k++) {
+          ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50)));
+        }
+        ASSERT_OK(Flush());
+      }
 
-  ROCKSDB_NAMESPACE::port::Thread t1([&] {
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
-    ASSERT_OK(Put("foo", "v2"));
-    Flush();
-    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
-  });
+      for (int l = 1; l < options.num_levels - i; l++) {
+        MoveFilesToLevel(l);
+      }
+    }
+  };
 
-  // iterator is created after the first Put(), so it should see either
-  // "v1" or "v2".
-  {
-    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
-    it->Seek("foo");
-    ASSERT_TRUE(it->Valid());
-    ASSERT_EQ("foo", it->key().ToString());
-  }
+  DestroyAndReopen(options);
+  generate_files();
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
 
-  t1.join();
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
-}
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-TEST_F(DBTest2, IterRefreshRaceFlush) {
+  int compactions_run = 0;
+  std::atomic<int> kv_compactions{0};
+  int compactions_stopped_at = 0;
+  int kv_compactions_stopped_at = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) {
+        ++compactions_run;
+        // After 3 compactions disable
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator:ProcessKV", [&](void* /*arg*/) {
+        int kv_compactions_run =
+            kv_compactions.fetch_add(1, std::memory_order_release);
+        if (kv_compactions_run == 5) {
+          compact_options.canceled->store(true, std::memory_order_release);
+          kv_compactions_stopped_at = kv_compactions_run;
+          compactions_stopped_at = compactions_run;
+        }
+      });
+
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  // NOTE: as we set compact_options.max_subcompacitons = 1, and store true to
+  // the canceled variable from the single compacting thread (via callback),
+  // this value is deterministically kv_compactions_stopped_at + 1.
+  ASSERT_EQ(kv_compactions, kv_compactions_stopped_at + 1);
+  ASSERT_EQ(compactions_run, compactions_stopped_at);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionIterator::ProcessKV");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "DBImpl::RunManualCompaction()::1");
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionJob::Run():PausingManualCompaction:1");
+
+  // Compactions should work again if we re-enable them..
+  compact_options.canceled->store(false, std::memory_order_relaxed);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+#endif  // !ROCKSDB_LITE
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+class CancelCompactionListener : public EventListener {
+ public:
+  CancelCompactionListener()
+      : num_compaction_started_(0), num_compaction_ended_(0) {}
+
+  void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
+    ASSERT_EQ(ci.cf_name, "default");
+    ASSERT_EQ(ci.base_input_level, 0);
+    num_compaction_started_++;
+  }
+
+  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
+    ASSERT_EQ(ci.cf_name, "default");
+    ASSERT_EQ(ci.base_input_level, 0);
+    ASSERT_EQ(ci.status.code(), code_);
+    ASSERT_EQ(ci.status.subcode(), subcode_);
+    num_compaction_ended_++;
+  }
+
+  std::atomic<size_t> num_compaction_started_;
+  std::atomic<size_t> num_compaction_ended_;
+  Status::Code code_;
+  Status::SubCode subcode_;
+};
+
+TEST_F(DBTest2, CancelManualCompactionWithListener) {
+  CompactRangeOptions compact_options;
+  auto canceledPtr =
+      std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>{true});
+  compact_options.canceled = canceledPtr.get();
+  compact_options.max_subcompactions = 1;
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  CancelCompactionListener* listener = new CancelCompactionListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  for (int i = 0; i < 10; i++) {
+    for (int j = 0; j < 10; j++) {
+      ASSERT_OK(Put(Key(i + j * 10), rnd.RandomString(50)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator:ProcessKV", [&](void* /*arg*/) {
+        compact_options.canceled->store(true, std::memory_order_release);
+      });
+
+  int running_compaction = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::FinishCompactionOutputFile1",
+      [&](void* /*arg*/) { running_compaction++; });
+
+  // Case I: 1 Notify begin compaction, 2 Set *canceled as true to disable
+  // manual compaction in the callback function, 3 Compaction not run,
+  // 4 Notify compaction end.
+  listener->code_ = Status::kIncomplete;
+  listener->subcode_ = Status::SubCode::kManualCompactionPaused;
+
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  ASSERT_GT(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+  ASSERT_EQ(running_compaction, 0);
+
+  listener->num_compaction_started_ = 0;
+  listener->num_compaction_ended_ = 0;
+
+  // Case II: 1 Set *canceled as true in the callback function to disable manual
+  // compaction, 2 Notify begin compaction (return without notifying), 3 Notify
+  // compaction end (return without notifying).
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(compact_options, nullptr, nullptr)
+                  .IsManualCompactionPaused());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  ASSERT_EQ(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+  ASSERT_EQ(running_compaction, 0);
+
+  // Case III: 1 Notify begin compaction, 2 Compaction in between
+  // 3. Set *canceled as true in the callback function to disable manual
+  // compaction, 4 Notify compaction end.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "CompactionIterator:ProcessKV");
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run:BeforeVerify", [&](void* /*arg*/) {
+        compact_options.canceled->store(true, std::memory_order_release);
+      });
+
+  listener->code_ = Status::kOk;
+  listener->subcode_ = Status::SubCode::kNone;
+
+  compact_options.canceled->store(false, std::memory_order_release);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  ASSERT_GT(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+
+  // Compaction job will succeed.
+  ASSERT_GT(running_compaction, 0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, CompactionOnBottomPriorityWithListener) {
+  int num_levels = 3;
+  const int kNumFilesTrigger = 4;
+
+  Options options = CurrentOptions();
+  env_->SetBackgroundThreads(0, Env::Priority::HIGH);
+  env_->SetBackgroundThreads(0, Env::Priority::LOW);
+  env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
+  options.env = env_;
+  options.compaction_style = kCompactionStyleUniversal;
+  options.num_levels = num_levels;
+  options.write_buffer_size = 100 << 10;     // 100KB
+  options.target_file_size_base = 32 << 10;  // 32KB
+  options.level0_file_num_compaction_trigger = kNumFilesTrigger;
+  // Trigger compaction if size amplification exceeds 110%
+  options.compaction_options_universal.max_size_amplification_percent = 110;
+
+  CancelCompactionListener* listener = new CancelCompactionListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+
+  int num_bottom_thread_compaction_scheduled = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
+      [&](void* /*arg*/) { num_bottom_thread_compaction_scheduled++; });
+
+  int num_compaction_jobs = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():End",
+      [&](void* /*arg*/) { num_compaction_jobs++; });
+
+  listener->code_ = Status::kOk;
+  listener->subcode_ = Status::SubCode::kNone;
+
+  Random rnd(301);
+  for (int i = 0; i < 1; ++i) {
+    for (int num = 0; num < kNumFilesTrigger; num++) {
+      int key_idx = 0;
+      GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
+      // use no_wait above because that one waits for flush and compaction. We
+      // don't want to wait for compaction because the full compaction is
+      // intentionally blocked while more files are flushed.
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    }
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_GT(num_bottom_thread_compaction_scheduled, 0);
+  ASSERT_EQ(num_compaction_jobs, 1);
+  ASSERT_GT(listener->num_compaction_started_, 0);
+  ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, OptimizeForPointLookup) {
+  Options options = CurrentOptions();
+  Close();
+  options.OptimizeForPointLookup(2);
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("v1", Get("foo"));
+}
+
+TEST_F(DBTest2, OptimizeForSmallDB) {
+  Options options = CurrentOptions();
+  Close();
+  options.OptimizeForSmallDb();
+
+  // Find the cache object
+  ASSERT_TRUE(options.table_factory->IsInstanceOf(
+      TableFactory::kBlockBasedTableName()));
+  auto table_options =
+      options.table_factory->GetOptions<BlockBasedTableOptions>();
+
+  ASSERT_TRUE(table_options != nullptr);
+  std::shared_ptr<Cache> cache = table_options->block_cache;
+
+  ASSERT_EQ(0, cache->GetUsage());
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+  ASSERT_OK(Put("foo", "v1"));
+
+  // memtable size is costed to the block cache
+  ASSERT_NE(0, cache->GetUsage());
+
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_OK(Flush());
+
+  size_t prev_size = cache->GetUsage();
+  // Remember block cache size, so that we can find that
+  // it is filled after Get().
+  // Use pinnable slice so that it can ping the block so that
+  // when we check the size it is not evicted.
+  PinnableSlice value;
+  ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), "foo", &value));
+  ASSERT_GT(cache->GetUsage(), prev_size);
+  value.Reset();
+}
+
+#endif  // ROCKSDB_LITE
+
+TEST_F(DBTest2, IterRaceFlush1) {
+  ASSERT_OK(Put("foo", "v1"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
+       {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::port::Thread t1([&] {
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
+    ASSERT_OK(Put("foo", "v2"));
+    ASSERT_OK(Flush());
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
+  });
+
+  // iterator is created after the first Put(), and its snapshot sequence is
+  // assigned after second Put(), so it must see v2.
+  {
+    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
+    it->Seek("foo");
+    ASSERT_TRUE(it->Valid());
+    ASSERT_OK(it->status());
+    ASSERT_EQ("foo", it->key().ToString());
+    ASSERT_EQ("v2", it->value().ToString());
+  }
+
+  t1.join();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, IterRaceFlush2) {
+  ASSERT_OK(Put("foo", "v1"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
+       {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ROCKSDB_NAMESPACE::port::Thread t1([&] {
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
+    ASSERT_OK(Put("foo", "v2"));
+    ASSERT_OK(Flush());
+    TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
+  });
+
+  // iterator is created after the first Put(), and its snapshot sequence is
+  // assigned before second Put(), thus it must see v1.
+  {
+    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
+    it->Seek("foo");
+    ASSERT_TRUE(it->Valid());
+    ASSERT_OK(it->status());
+    ASSERT_EQ("foo", it->key().ToString());
+    ASSERT_EQ("v1", it->value().ToString());
+  }
+
+  t1.join();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, IterRefreshRaceFlush) {
   ASSERT_OK(Put("foo", "v1"));
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
@@ -3246,18 +3772,21 @@ TEST_F(DBTest2, IterRefreshRaceFlush) {
   ROCKSDB_NAMESPACE::port::Thread t1([&] {
     TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
     ASSERT_OK(Put("foo", "v2"));
-    Flush();
+    ASSERT_OK(Flush());
     TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
   });
 
-  // iterator is created after the first Put(), so it should see either
-  // "v1" or "v2".
+  // iterator is refreshed after the first Put(), and its sequence number is
+  // assigned after second Put(), thus it must see v2.
   {
     std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
-    it->Refresh();
+    ASSERT_OK(it->status());
+    ASSERT_OK(it->Refresh());
     it->Seek("foo");
     ASSERT_TRUE(it->Valid());
+    ASSERT_OK(it->status());
     ASSERT_EQ("foo", it->key().ToString());
+    ASSERT_EQ("v2", it->value().ToString());
   }
 
   t1.join();
@@ -3276,7 +3805,7 @@ TEST_F(DBTest2, GetRaceFlush1) {
   ROCKSDB_NAMESPACE::port::Thread t1([&] {
     TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
     ASSERT_OK(Put("foo", "v2"));
-    Flush();
+    ASSERT_OK(Flush());
     TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
   });
 
@@ -3299,7 +3828,7 @@ TEST_F(DBTest2, GetRaceFlush2) {
   port::Thread t1([&] {
     TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
     ASSERT_OK(Put("foo", "v2"));
-    Flush();
+    ASSERT_OK(Flush());
     TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
   });
 
@@ -3372,6 +3901,7 @@ TEST_F(DBTest2, MemtableOnlyIterator) {
   ASSERT_EQ("second", value);
   // nothing should be returned using memtable-only iterator after flushing.
   it = db_->NewIterator(ropt, handles_[1]);
+  ASSERT_OK(it->status());
   count = 0;
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     ASSERT_TRUE(it->Valid());
@@ -3379,11 +3909,13 @@ TEST_F(DBTest2, MemtableOnlyIterator) {
   }
   ASSERT_TRUE(!it->Valid());
   ASSERT_EQ(0, count);
+  ASSERT_OK(it->status());
   delete it;
 
   // Add a key to memtable
   ASSERT_OK(Put(1, "foobar", "third"));
   it = db_->NewIterator(ropt, handles_[1]);
+  ASSERT_OK(it->status());
   count = 0;
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     ASSERT_TRUE(it->Valid());
@@ -3393,6 +3925,7 @@ TEST_F(DBTest2, MemtableOnlyIterator) {
   }
   ASSERT_TRUE(!it->Valid());
   ASSERT_EQ(1, count);
+  ASSERT_OK(it->status());
   delete it;
 }
 
@@ -3421,28 +3954,28 @@ TEST_F(DBTest2, LowPriWrite) {
   WriteOptions wo;
   for (int i = 0; i < 6; i++) {
     wo.low_pri = false;
-    Put("", "", wo);
+    ASSERT_OK(Put("", "", wo));
     wo.low_pri = true;
-    Put("", "", wo);
-    Flush();
+    ASSERT_OK(Put("", "", wo));
+    ASSERT_OK(Flush());
   }
   ASSERT_EQ(0, rate_limit_count.load());
   wo.low_pri = true;
-  Put("", "", wo);
+  ASSERT_OK(Put("", "", wo));
   ASSERT_EQ(1, rate_limit_count.load());
   wo.low_pri = false;
-  Put("", "", wo);
+  ASSERT_OK(Put("", "", wo));
   ASSERT_EQ(1, rate_limit_count.load());
 
   TEST_SYNC_POINT("DBTest.LowPriWrite:0");
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   wo.low_pri = true;
-  Put("", "", wo);
+  ASSERT_OK(Put("", "", wo));
   ASSERT_EQ(1, rate_limit_count.load());
   wo.low_pri = false;
-  Put("", "", wo);
+  ASSERT_OK(Put("", "", wo));
   ASSERT_EQ(1, rate_limit_count.load());
 }
 
@@ -3453,65 +3986,78 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
   const int kBytesPerKey = 1024;
   const int kNumL0Files = 4;
 
-  for (auto use_direct_io : {false, true}) {
-    if (use_direct_io && !IsDirectIOSupported()) {
-      continue;
-    }
-    Options options = CurrentOptions();
-    options.compression = kNoCompression;
-    options.level0_file_num_compaction_trigger = kNumL0Files;
-    options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
-    options.new_table_reader_for_compaction_inputs = true;
-    // takes roughly one second, split into 100 x 10ms intervals. Each interval
-    // permits 5.12KB, which is smaller than the block size, so this test
-    // exercises the code for chunking reads.
-    options.rate_limiter.reset(NewGenericRateLimiter(
-        static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
-                             kBytesPerKey) /* rate_bytes_per_sec */,
-        10 * 1000 /* refill_period_us */, 10 /* fairness */,
-        RateLimiter::Mode::kReadsOnly));
-    options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
-        use_direct_io;
-    BlockBasedTableOptions bbto;
-    bbto.block_size = 16384;
-    bbto.no_block_cache = true;
-    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-    DestroyAndReopen(options);
-
-    for (int i = 0; i < kNumL0Files; ++i) {
-      for (int j = 0; j <= kNumKeysPerFile; ++j) {
-        ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+  for (int compaction_readahead_size : {0, 32 << 10}) {
+    for (auto use_direct_io : {false, true}) {
+      if (use_direct_io && !IsDirectIOSupported()) {
+        continue;
       }
-      dbfull()->TEST_WaitForFlushMemTable();
-      ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
-    }
-    dbfull()->TEST_WaitForCompact();
-    ASSERT_EQ(0, NumTableFilesAtLevel(0));
-
-    ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
-    // should be slightly above 512KB due to non-data blocks read. Arbitrarily
-    // chose 1MB as the upper bound on the total bytes read.
-    size_t rate_limited_bytes =
-        options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
-    // Include the explicit prefetch of the footer in direct I/O case.
-    size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
-    ASSERT_GE(
-        rate_limited_bytes,
-        static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
-    ASSERT_LT(
-        rate_limited_bytes,
-        static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
-                            direct_io_extra));
+      Options options = CurrentOptions();
+      options.compaction_readahead_size = compaction_readahead_size;
+      options.compression = kNoCompression;
+      options.level0_file_num_compaction_trigger = kNumL0Files;
+      options.memtable_factory.reset(
+          test::NewSpecialSkipListFactory(kNumKeysPerFile));
+      // takes roughly one second, split into 100 x 10ms intervals. Each
+      // interval permits 5.12KB, which is smaller than the block size, so this
+      // test exercises the code for chunking reads.
+      options.rate_limiter.reset(NewGenericRateLimiter(
+          static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
+                               kBytesPerKey) /* rate_bytes_per_sec */,
+          10 * 1000 /* refill_period_us */, 10 /* fairness */,
+          RateLimiter::Mode::kReadsOnly));
+      options.use_direct_reads =
+          options.use_direct_io_for_flush_and_compaction = use_direct_io;
+      BlockBasedTableOptions bbto;
+      bbto.block_size = 16384;
+      bbto.no_block_cache = true;
+      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+      DestroyAndReopen(options);
 
-    Iterator* iter = db_->NewIterator(ReadOptions());
-    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-      ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+      for (int i = 0; i < kNumL0Files; ++i) {
+        for (int j = 0; j <= kNumKeysPerFile; ++j) {
+          ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+        }
+        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+        if (i + 1 < kNumL0Files) {
+          ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+        }
+      }
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+      ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+      // should be slightly above 512KB due to non-data blocks read. Arbitrarily
+      // chose 1MB as the upper bound on the total bytes read.
+      size_t rate_limited_bytes = static_cast<size_t>(
+          options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL));
+      // The charges can exist for `IO_LOW` and `IO_USER` priorities.
+      size_t rate_limited_bytes_by_pri =
+          options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
+          options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
+      ASSERT_EQ(rate_limited_bytes,
+                static_cast<size_t>(rate_limited_bytes_by_pri));
+      // Include the explicit prefetch of the footer in direct I/O case.
+      size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
+      ASSERT_GE(
+          rate_limited_bytes,
+          static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
+      ASSERT_LT(
+          rate_limited_bytes,
+          static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
+                              direct_io_extra));
+
+      Iterator* iter = db_->NewIterator(ReadOptions());
+      ASSERT_OK(iter->status());
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+      }
+      delete iter;
+      // bytes read for user iterator shouldn't count against the rate limit.
+      rate_limited_bytes_by_pri =
+          options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
+          options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
+      ASSERT_EQ(rate_limited_bytes,
+                static_cast<size_t>(rate_limited_bytes_by_pri));
     }
-    delete iter;
-    // bytes read for user iterator shouldn't count against the rate limit.
-    ASSERT_EQ(rate_limited_bytes,
-              static_cast<size_t>(
-                  options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
   }
 }
 #endif  // ROCKSDB_LITE
@@ -3524,8 +4070,8 @@ TEST_F(DBTest2, ReduceLevel) {
   options.disable_auto_compactions = true;
   options.num_levels = 7;
   Reopen(options);
-  Put("foo", "bar");
-  Flush();
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Flush());
   MoveFilesToLevel(6);
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
@@ -3533,7 +4079,7 @@ TEST_F(DBTest2, ReduceLevel) {
   CompactRangeOptions compact_options;
   compact_options.change_level = true;
   compact_options.target_level = 1;
-  dbfull()->CompactRange(compact_options, nullptr, nullptr);
+  ASSERT_OK(dbfull()->CompactRange(compact_options, nullptr, nullptr));
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,1", FilesPerLevel());
 #endif  // !ROCKSDB_LITE
@@ -3562,35 +4108,35 @@ TEST_F(DBTest2, ReadCallbackTest) {
   // the DB instead of assuming what seq the DB used.
   int i = 1;
   for (; i < 10; i++) {
-    Put(key, value + std::to_string(i));
+    ASSERT_OK(Put(key, value + std::to_string(i)));
     // Take a snapshot to avoid the value being removed during compaction
     auto snapshot = dbfull()->GetSnapshot();
     snapshots.push_back(snapshot);
   }
-  Flush();
+  ASSERT_OK(Flush());
   for (; i < 20; i++) {
-    Put(key, value + std::to_string(i));
+    ASSERT_OK(Put(key, value + std::to_string(i)));
     // Take a snapshot to avoid the value being removed during compaction
     auto snapshot = dbfull()->GetSnapshot();
     snapshots.push_back(snapshot);
   }
-  Flush();
+  ASSERT_OK(Flush());
   MoveFilesToLevel(6);
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
 #endif  // !ROCKSDB_LITE
   for (; i < 30; i++) {
-    Put(key, value + std::to_string(i));
+    ASSERT_OK(Put(key, value + std::to_string(i)));
     auto snapshot = dbfull()->GetSnapshot();
     snapshots.push_back(snapshot);
   }
-  Flush();
+  ASSERT_OK(Flush());
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());
 #endif  // !ROCKSDB_LITE
   // And also add some values to the memtable
   for (; i < 40; i++) {
-    Put(key, value + std::to_string(i));
+    ASSERT_OK(Put(key, value + std::to_string(i)));
     auto snapshot = dbfull()->GetSnapshot();
     snapshots.push_back(snapshot);
   }
@@ -3663,40 +4209,46 @@ TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
       [&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  Put("key", "val");
+  ASSERT_OK(Put("key", "val"));
   FlushOptions flush_opts;
   flush_opts.wait = false;
   db_->Flush(flush_opts);
   TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
 
-  db_->DisableFileDeletions();
+  ASSERT_OK(db_->DisableFileDeletions());
   VectorLogPtr log_files;
-  db_->GetSortedWalFiles(log_files);
+  ASSERT_OK(db_->GetSortedWalFiles(log_files));
   TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
   for (const auto& log_file : log_files) {
     ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
   }
 
-  db_->EnableFileDeletions();
+  ASSERT_OK(db_->EnableFileDeletions());
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
 TEST_F(DBTest2, TestNumPread) {
   Options options = CurrentOptions();
+  bool prefetch_supported =
+      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
   // disable block cache
   BlockBasedTableOptions table_options;
   table_options.no_block_cache = true;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
   Reopen(options);
   env_->count_random_reads_ = true;
-
   env_->random_file_open_counter_.store(0);
   ASSERT_OK(Put("bar", "foo"));
   ASSERT_OK(Put("foo", "bar"));
   ASSERT_OK(Flush());
-  // After flush, we'll open the file and read footer, meta block,
-  // property block and index block.
-  ASSERT_EQ(4, env_->random_read_counter_.Read());
+  if (prefetch_supported) {
+    // After flush, we'll open the file and read footer, meta block,
+    // property block and index block.
+    ASSERT_EQ(4, env_->random_read_counter_.Read());
+  } else {
+    // With prefetch not supported, we will do a single read into a buffer
+    ASSERT_EQ(1, env_->random_read_counter_.Read());
+  }
   ASSERT_EQ(1, env_->random_file_open_counter_.load());
 
   // One pread per a normal data block read
@@ -3712,19 +4264,30 @@ TEST_F(DBTest2, TestNumPread) {
   ASSERT_OK(Put("bar2", "foo2"));
   ASSERT_OK(Put("foo2", "bar2"));
   ASSERT_OK(Flush());
-  // After flush, we'll open the file and read footer, meta block,
-  // property block and index block.
-  ASSERT_EQ(4, env_->random_read_counter_.Read());
+  if (prefetch_supported) {
+    // After flush, we'll open the file and read footer, meta block,
+    // property block and index block.
+    ASSERT_EQ(4, env_->random_read_counter_.Read());
+  } else {
+    // With prefetch not supported, we will do a single read into a buffer
+    ASSERT_EQ(1, env_->random_read_counter_.Read());
+  }
   ASSERT_EQ(1, env_->random_file_open_counter_.load());
 
-  // Compaction needs two input blocks, which requires 2 preads, and
-  // generate a new SST file which needs 4 preads (footer, meta block,
-  // property block and index block). In total 6.
   env_->random_file_open_counter_.store(0);
   env_->random_read_counter_.Reset();
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
-  ASSERT_EQ(6, env_->random_read_counter_.Read());
-  // All compactin input files should have already been opened.
+  if (prefetch_supported) {
+    // Compaction needs two input blocks, which requires 2 preads, and
+    // generate a new SST file which needs 4 preads (footer, meta block,
+    // property block and index block). In total 6.
+    ASSERT_EQ(6, env_->random_read_counter_.Read());
+  } else {
+    // With prefetch off, compaction needs two input blocks,
+    // followed by a single buffered read.  In total 3.
+    ASSERT_EQ(3, env_->random_read_counter_.Read());
+  }
+  // All compaction input files should have already been opened.
   ASSERT_EQ(1, env_->random_file_open_counter_.load());
 
   // One pread per a normal data block read
@@ -3736,22 +4299,298 @@ TEST_F(DBTest2, TestNumPread) {
   ASSERT_EQ(0, env_->random_file_open_counter_.load());
 }
 
-TEST_F(DBTest2, TraceAndReplay) {
-  Options options = CurrentOptions();
-  options.merge_operator = MergeOperators::CreatePutOperator();
-  ReadOptions ro;
-  WriteOptions wo;
-  TraceOptions trace_opts;
-  EnvOptions env_opts;
-  CreateAndReopenWithCF({"pikachu"}, options);
-  Random rnd(301);
-  Iterator* single_iter = nullptr;
-
-  ASSERT_TRUE(db_->EndTrace().IsIOError());
+class TraceExecutionResultHandler : public TraceRecordResult::Handler {
+ public:
+  TraceExecutionResultHandler() {}
+  ~TraceExecutionResultHandler() override {}
 
-  std::string trace_filename = dbname_ + "/rocksdb.trace";
-  std::unique_ptr<TraceWriter> trace_writer;
-  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+  virtual Status Handle(const StatusOnlyTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceWrite: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        writes_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  virtual Status Handle(
+      const SingleValueTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceGet: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        gets_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  virtual Status Handle(
+      const MultiValuesTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    for (const Status& s : result.GetMultiStatus()) {
+      s.PermitUncheckedError();
+    }
+    switch (result.GetTraceType()) {
+      case kTraceMultiGet: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        multigets_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  virtual Status Handle(const IteratorTraceExecutionResult& result) override {
+    if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
+      return Status::InvalidArgument("Invalid timestamps.");
+    }
+    result.GetStatus().PermitUncheckedError();
+    switch (result.GetTraceType()) {
+      case kTraceIteratorSeek:
+      case kTraceIteratorSeekForPrev: {
+        total_latency_ += result.GetLatency();
+        cnt_++;
+        seeks_++;
+        break;
+      }
+      default:
+        return Status::Corruption("Type mismatch.");
+    }
+    return Status::OK();
+  }
+
+  void Reset() {
+    total_latency_ = 0;
+    cnt_ = 0;
+    writes_ = 0;
+    gets_ = 0;
+    seeks_ = 0;
+    multigets_ = 0;
+  }
+
+  double GetAvgLatency() const {
+    return cnt_ == 0 ? 0.0 : 1.0 * total_latency_ / cnt_;
+  }
+
+  int GetNumWrites() const { return writes_; }
+
+  int GetNumGets() const { return gets_; }
+
+  int GetNumIterSeeks() const { return seeks_; }
+
+  int GetNumMultiGets() const { return multigets_; }
+
+ private:
+  std::atomic<uint64_t> total_latency_{0};
+  std::atomic<uint32_t> cnt_{0};
+  std::atomic<int> writes_{0};
+  std::atomic<int> gets_{0};
+  std::atomic<int> seeks_{0};
+  std::atomic<int> multigets_{0};
+};
+
+TEST_F(DBTest2, TraceAndReplay) {
+  Options options = CurrentOptions();
+  options.merge_operator = MergeOperators::CreatePutOperator();
+  ReadOptions ro;
+  WriteOptions wo;
+  TraceOptions trace_opts;
+  EnvOptions env_opts;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Random rnd(301);
+  Iterator* single_iter = nullptr;
+
+  ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+  std::string trace_filename = dbname_ + "/rocksdb.trace";
+  std::unique_ptr<TraceWriter> trace_writer;
+  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+
+  // 5 Writes
+  ASSERT_OK(Put(0, "a", "1"));
+  ASSERT_OK(Merge(0, "b", "2"));
+  ASSERT_OK(Delete(0, "c"));
+  ASSERT_OK(SingleDelete(0, "d"));
+  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+
+  // 6th Write
+  WriteBatch batch;
+  ASSERT_OK(batch.Put("f", "11"));
+  ASSERT_OK(batch.Merge("g", "12"));
+  ASSERT_OK(batch.Delete("h"));
+  ASSERT_OK(batch.SingleDelete("i"));
+  ASSERT_OK(batch.DeleteRange("j", "k"));
+  ASSERT_OK(db_->Write(wo, &batch));
+
+  // 2 Seek(ForPrev)s
+  single_iter = db_->NewIterator(ro);
+  single_iter->Seek("f");  // Seek 1
+  single_iter->SeekForPrev("g");
+  ASSERT_OK(single_iter->status());
+  delete single_iter;
+
+  // 2 Gets
+  ASSERT_EQ("1", Get(0, "a"));
+  ASSERT_EQ("12", Get(0, "g"));
+
+  // 7th and 8th Write, 3rd Get
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(1, "rocksdb", "rocks"));
+  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+
+  // Total Write x 8, Get x 3, Seek x 2.
+  ASSERT_OK(db_->EndTrace());
+  // These should not get into the trace file as it is after EndTrace.
+  ASSERT_OK(Put("hello", "world"));
+  ASSERT_OK(Merge("foo", "bar"));
+
+  // Open another db, replay, and verify the data
+  std::string value;
+  std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay");
+  ASSERT_OK(DestroyDB(dbname2, options));
+
+  // Using a different name than db2, to pacify infer's use-after-lifetime
+  // warnings (http://fbinfer.com).
+  DB* db2_init = nullptr;
+  options.create_if_missing = true;
+  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+  ColumnFamilyHandle* cf;
+  ASSERT_OK(
+      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+  delete cf;
+  delete db2_init;
+
+  DB* db2 = nullptr;
+  std::vector<ColumnFamilyDescriptor> column_families;
+  ColumnFamilyOptions cf_options;
+  cf_options.merge_operator = MergeOperators::CreatePutOperator();
+  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+  column_families.push_back(
+      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+  std::vector<ColumnFamilyHandle*> handles;
+  DBOptions db_opts;
+  db_opts.env = env_;
+  ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));
+
+  env_->SleepForMicroseconds(100);
+  // Verify that the keys don't already exist
+  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+
+  std::unique_ptr<TraceReader> trace_reader;
+  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+  TraceExecutionResultHandler res_handler;
+  std::function<void(Status, std::unique_ptr<TraceRecordResult> &&)> res_cb =
+      [&res_handler](Status exec_s, std::unique_ptr<TraceRecordResult>&& res) {
+        ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported());
+        if (res != nullptr) {
+          ASSERT_OK(res->Accept(&res_handler));
+          res.reset();
+        }
+      };
+
+  // Unprepared replay should fail with Status::Incomplete()
+  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
+  ASSERT_OK(replayer->Prepare());
+  // Ok to repeatedly Prepare().
+  ASSERT_OK(replayer->Prepare());
+  // Replay using 1 thread, 1x speed.
+  ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
+  ASSERT_EQ("1", value);
+  ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
+  ASSERT_EQ("12", value);
+  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
+
+  ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
+  ASSERT_EQ("bar", value);
+  ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
+  ASSERT_EQ("rocks", value);
+
+  // Re-replay should fail with Status::Incomplete() if Prepare() was not
+  // called. Currently we don't distinguish between unprepared and trace end.
+  ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
+
+  // Re-replay using 2 threads, 2x speed.
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  // Re-replay using 2 threads, 1/2 speed.
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb));
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 8);
+  ASSERT_EQ(res_handler.GetNumGets(), 3);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  replayer.reset();
+
+  for (auto handle : handles) {
+    delete handle;
+  }
+  delete db2;
+  ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceAndManualReplay) {
+  Options options = CurrentOptions();
+  options.merge_operator = MergeOperators::CreatePutOperator();
+  ReadOptions ro;
+  WriteOptions wo;
+  TraceOptions trace_opts;
+  EnvOptions env_opts;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Random rnd(301);
+  Iterator* single_iter = nullptr;
+
+  ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+  std::string trace_filename = dbname_ + "/rocksdb.trace";
+  std::unique_ptr<TraceWriter> trace_writer;
+  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
   ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
 
   ASSERT_OK(Put(0, "a", "1"));
@@ -3771,6 +4610,37 @@ TEST_F(DBTest2, TraceAndReplay) {
   single_iter = db_->NewIterator(ro);
   single_iter->Seek("f");
   single_iter->SeekForPrev("g");
+  ASSERT_OK(single_iter->status());
+  delete single_iter;
+
+  // Write some sequenced keys for testing lower/upper bounds of iterator.
+  batch.Clear();
+  ASSERT_OK(batch.Put("iter-0", "iter-0"));
+  ASSERT_OK(batch.Put("iter-1", "iter-1"));
+  ASSERT_OK(batch.Put("iter-2", "iter-2"));
+  ASSERT_OK(batch.Put("iter-3", "iter-3"));
+  ASSERT_OK(batch.Put("iter-4", "iter-4"));
+  ASSERT_OK(db_->Write(wo, &batch));
+
+  ReadOptions bounded_ro = ro;
+  Slice lower_bound("iter-1");
+  Slice upper_bound("iter-3");
+  bounded_ro.iterate_lower_bound = &lower_bound;
+  bounded_ro.iterate_upper_bound = &upper_bound;
+  single_iter = db_->NewIterator(bounded_ro);
+  single_iter->Seek("iter-0");
+  ASSERT_EQ(single_iter->key().ToString(), "iter-1");
+  single_iter->Seek("iter-2");
+  ASSERT_EQ(single_iter->key().ToString(), "iter-2");
+  single_iter->Seek("iter-4");
+  ASSERT_FALSE(single_iter->Valid());
+  single_iter->SeekForPrev("iter-0");
+  ASSERT_FALSE(single_iter->Valid());
+  single_iter->SeekForPrev("iter-2");
+  ASSERT_EQ(single_iter->key().ToString(), "iter-2");
+  single_iter->SeekForPrev("iter-4");
+  ASSERT_EQ(single_iter->key().ToString(), "iter-2");
+  ASSERT_OK(single_iter->status());
   delete single_iter;
 
   ASSERT_EQ("1", Get(0, "a"));
@@ -3780,10 +4650,14 @@ TEST_F(DBTest2, TraceAndReplay) {
   ASSERT_OK(Put(1, "rocksdb", "rocks"));
   ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
 
+  // Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2.
+  // Plus 1 WriteBatch for iterator with lower/upper bounds, and 6
+  // Seek(ForPrev)s.
+  // Total Write x 9, Get x 3, Seek x 8
   ASSERT_OK(db_->EndTrace());
   // These should not get into the trace file as it is after EndTrace.
-  Put("hello", "world");
-  Merge("foo", "bar");
+  ASSERT_OK(Put("hello", "world"));
+  ASSERT_OK(Merge("foo", "bar"));
 
   // Open another db, replay, and verify the data
   std::string value;
@@ -3820,8 +4694,76 @@ TEST_F(DBTest2, TraceAndReplay) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+
+  TraceExecutionResultHandler res_handler;
+
+  // Manual replay for 2 times. The 2nd checks if the replay can restart.
+  std::unique_ptr<TraceRecord> record;
+  std::unique_ptr<TraceRecordResult> result;
+  for (int i = 0; i < 2; i++) {
+    // Next should fail if unprepared.
+    ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+    ASSERT_OK(replayer->Prepare());
+    Status s = Status::OK();
+    // Looping until trace end.
+    while (s.ok()) {
+      s = replayer->Next(&record);
+      // Skip unsupported operations.
+      if (s.IsNotSupported()) {
+        continue;
+      }
+      if (s.ok()) {
+        ASSERT_OK(replayer->Execute(record, &result));
+        if (result != nullptr) {
+          ASSERT_OK(result->Accept(&res_handler));
+          if (record->GetTraceType() == kTraceIteratorSeek ||
+              record->GetTraceType() == kTraceIteratorSeekForPrev) {
+            IteratorSeekQueryTraceRecord* iter_rec =
+                dynamic_cast<IteratorSeekQueryTraceRecord*>(record.get());
+            IteratorTraceExecutionResult* iter_res =
+                dynamic_cast<IteratorTraceExecutionResult*>(result.get());
+            // Check if lower/upper bounds are correctly saved and decoded.
+            std::string lower_str = iter_rec->GetLowerBound().ToString();
+            std::string upper_str = iter_rec->GetUpperBound().ToString();
+            std::string iter_key = iter_res->GetKey().ToString();
+            std::string iter_value = iter_res->GetValue().ToString();
+            if (!lower_str.empty() && !upper_str.empty()) {
+              ASSERT_EQ(lower_str, "iter-1");
+              ASSERT_EQ(upper_str, "iter-3");
+              if (iter_res->GetValid()) {
+                // If iterator is valid, then lower_bound <= key < upper_bound.
+                ASSERT_GE(iter_key, lower_str);
+                ASSERT_LT(iter_key, upper_str);
+              } else {
+                // If iterator is invalid, then
+                //   key < lower_bound or key >= upper_bound.
+                ASSERT_TRUE(iter_key < lower_str || iter_key >= upper_str);
+              }
+            }
+            // If iterator is invalid, the key and value should be empty.
+            if (!iter_res->GetValid()) {
+              ASSERT_TRUE(iter_key.empty());
+              ASSERT_TRUE(iter_value.empty());
+            }
+          }
+          result.reset();
+        }
+      }
+    }
+    // Status::Incomplete() will be returned when manually reading the trace
+    // end, or Prepare() was not called.
+    ASSERT_TRUE(s.IsIncomplete());
+    ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
+    ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+    ASSERT_EQ(res_handler.GetNumWrites(), 9);
+    ASSERT_EQ(res_handler.GetNumGets(), 3);
+    ASSERT_EQ(res_handler.GetNumIterSeeks(), 8);
+    ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+    res_handler.Reset();
+  }
 
   ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
   ASSERT_EQ("1", value);
@@ -3835,6 +4777,138 @@ TEST_F(DBTest2, TraceAndReplay) {
   ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
   ASSERT_EQ("rocks", value);
 
+  // Test execution of artificially created TraceRecords.
+  uint64_t fake_ts = 1U;
+  // Write
+  batch.Clear();
+  ASSERT_OK(batch.Put("trace-record-write1", "write1"));
+  ASSERT_OK(batch.Put("trace-record-write2", "write2"));
+  record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // Write x 1
+  ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
+  ASSERT_EQ("write1", value);
+  ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
+  ASSERT_EQ("write2", value);
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 1);
+  ASSERT_EQ(res_handler.GetNumGets(), 0);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  // Get related
+  // Get an existing key.
+  record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
+                                       "trace-record-write1", fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // Get x 1
+  // Get an non-existing key, should still return Status::OK().
+  record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
+                                       fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // Get x 2
+  // Get from an invalid (non-existing) cf_id.
+  uint32_t invalid_cf_id = handles[1]->GetID() + 1;
+  record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
+  ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+  ASSERT_TRUE(result == nullptr);
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 0);
+  ASSERT_EQ(res_handler.GetNumGets(), 2);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  // Iteration related
+  for (IteratorSeekQueryTraceRecord::SeekType seekType :
+       {IteratorSeekQueryTraceRecord::kSeek,
+        IteratorSeekQueryTraceRecord::kSeekForPrev}) {
+    // Seek to an existing key.
+    record.reset(new IteratorSeekQueryTraceRecord(
+        seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
+    ASSERT_OK(replayer->Execute(record, &result));
+    ASSERT_TRUE(result != nullptr);
+    ASSERT_OK(result->Accept(&res_handler));  // Seek x 1 in one iteration
+    // Seek to an non-existing key, should still return Status::OK().
+    record.reset(new IteratorSeekQueryTraceRecord(
+        seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
+    ASSERT_OK(replayer->Execute(record, &result));
+    ASSERT_TRUE(result != nullptr);
+    ASSERT_OK(result->Accept(&res_handler));  // Seek x 2 in one iteration
+    // Seek from an invalid cf_id.
+    record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
+                                                  "whatever", fake_ts++));
+    ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+    ASSERT_TRUE(result == nullptr);
+  }
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 0);
+  ASSERT_EQ(res_handler.GetNumGets(), 0);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 4);  // Seek x 2 in two iterations
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
+  res_handler.Reset();
+
+  // MultiGet related
+  // Get existing keys.
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"a", "foo"}), fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // MultiGet x 1
+  // Get all non-existing keys, should still return Status::OK().
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"no1", "no2"}), fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  ASSERT_OK(result->Accept(&res_handler));  // MultiGet x 2
+  // Get mixed of existing and non-existing keys, should still return
+  // Status::OK().
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"a", "no2"}), fake_ts++));
+  ASSERT_OK(replayer->Execute(record, &result));
+  ASSERT_TRUE(result != nullptr);
+  MultiValuesTraceExecutionResult* mvr =
+      dynamic_cast<MultiValuesTraceExecutionResult*>(result.get());
+  ASSERT_TRUE(mvr != nullptr);
+  ASSERT_OK(mvr->GetMultiStatus()[0]);
+  ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound());
+  ASSERT_EQ(mvr->GetValues()[0], "1");
+  ASSERT_EQ(mvr->GetValues()[1], "");
+  ASSERT_OK(result->Accept(&res_handler));  // MultiGet x 3
+  // Get from an invalid (non-existing) cf_id.
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>(
+          {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
+      std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
+  ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
+  ASSERT_TRUE(result == nullptr);
+  // Empty MultiGet
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
+  ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+  ASSERT_TRUE(result == nullptr);
+  // MultiGet size mismatch
+  record.reset(new MultiGetQueryTraceRecord(
+      std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
+      std::vector<std::string>({"a"}), fake_ts++));
+  ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
+  ASSERT_TRUE(result == nullptr);
+  ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
+  ASSERT_EQ(res_handler.GetNumWrites(), 0);
+  ASSERT_EQ(res_handler.GetNumGets(), 0);
+  ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
+  ASSERT_EQ(res_handler.GetNumMultiGets(), 3);
+  res_handler.Reset();
+
+  replayer.reset();
+
   for (auto handle : handles) {
     delete handle;
   }
@@ -3898,8 +4972,12 @@ TEST_F(DBTest2, TraceWithLimit) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
+  replayer.reset();
 
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
   ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
@@ -3969,8 +5047,12 @@ TEST_F(DBTest2, TraceWithSampling) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
+  replayer.reset();
 
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
   ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
@@ -4031,12 +5113,12 @@ TEST_F(DBTest2, TraceWithFilter) {
 
   ASSERT_OK(db_->EndTrace());
   // These should not get into the trace file as it is after EndTrace.
-  Put("hello", "world");
-  Merge("foo", "bar");
+  ASSERT_OK(Put("hello", "world"));
+  ASSERT_OK(Merge("foo", "bar"));
 
   // Open another db, replay, and verify the data
   std::string value;
-  std::string dbname2 = test::TmpDir(env_) + "/db_replay";
+  std::string dbname2 = test::PerThreadDBPath(env_, "db_replay");
   ASSERT_OK(DestroyDB(dbname2, options));
 
   // Using a different name than db2, to pacify infer's use-after-lifetime
@@ -4069,8 +5151,12 @@ TEST_F(DBTest2, TraceWithFilter) {
 
   std::unique_ptr<TraceReader> trace_reader;
   ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
-  Replayer replayer(db2, handles_, std::move(trace_reader));
-  ASSERT_OK(replayer.Replay());
+  std::unique_ptr<Replayer> replayer;
+  ASSERT_OK(
+      db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
+  ASSERT_OK(replayer->Prepare());
+  ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
+  replayer.reset();
 
   // All the key-values should not present since we filter out the WRITE ops.
   ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
@@ -4087,7 +5173,7 @@ TEST_F(DBTest2, TraceWithFilter) {
   ASSERT_OK(DestroyDB(dbname2, options));
 
   // Set up a new db.
-  std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
+  std::string dbname3 = test::PerThreadDBPath(env_, "db_not_trace_read");
   ASSERT_OK(DestroyDB(dbname3, options));
 
   DB* db3_init = nullptr;
@@ -4105,7 +5191,7 @@ TEST_F(DBTest2, TraceWithFilter) {
       ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
   handles.clear();
 
-  DB* db3 =  nullptr;
+  DB* db3 = nullptr;
   ASSERT_OK(DB::Open(db_opts, dbname3, column_families, &handles, &db3));
 
   env_->SleepForMicroseconds(100);
@@ -4113,12 +5199,12 @@ TEST_F(DBTest2, TraceWithFilter) {
   ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
   ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());
 
-  //The tracer will not record the READ ops.
+  // The tracer will not record the READ ops.
   trace_opts.filter = TraceFilterType::kTraceFilterGet;
   std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
   std::unique_ptr<TraceWriter> trace_writer3;
   ASSERT_OK(
-    NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
+      NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
   ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));
 
   ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
@@ -4140,7 +5226,7 @@ TEST_F(DBTest2, TraceWithFilter) {
 
   std::unique_ptr<TraceReader> trace_reader3;
   ASSERT_OK(
-    NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
+      NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
 
   // Count the number of records in the trace file;
   int count = 0;
@@ -4181,9 +5267,9 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
   ASSERT_FALSE(pinned_value.IsPinned());
   ASSERT_EQ(pinned_value.ToString(), "bar");
 
-  dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
-                              nullptr /* end */, nullptr /* column_family */,
-                              true /* disallow_trivial_move */);
+  ASSERT_OK(dbfull()->TEST_CompactRange(
+      0 /* level */, nullptr /* begin */, nullptr /* end */,
+      nullptr /* column_family */, true /* disallow_trivial_move */));
 
   // Ensure pinned_value doesn't rely on memory munmap'd by the above
   // compaction. It crashes if it does.
@@ -4227,10 +5313,10 @@ TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
 
   // Since v is the size of a block, each key should take a block
   // of 400+ bytes.
-  Put("1", v);
-  Put("3", v);
-  Put("5", v);
-  Put("7", v);
+  ASSERT_OK(Put("1", v));
+  ASSERT_OK(Put("3", v));
+  ASSERT_OK(Put("5", v));
+  ASSERT_OK(Put("7", v));
   ASSERT_OK(Flush());
 
   ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
@@ -4259,16 +5345,18 @@ TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
     iter->Seek("3");
     ASSERT_TRUE(iter->Valid());
 
+    ASSERT_OK(iter->status());
+
     ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
     ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
   }
   ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
 
   // Test compaction case
-  Put("2", v);
-  Put("5", v);
-  Put("6", v);
-  Put("8", v);
+  ASSERT_OK(Put("2", v));
+  ASSERT_OK(Put("5", v));
+  ASSERT_OK(Put("6", v));
+  ASSERT_OK(Put("8", v));
   ASSERT_OK(Flush());
 
   // Clear existing data in block cache
@@ -4327,20 +5415,20 @@ TEST_F(DBTest2, TestBBTTailPrefetch) {
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  Put("1", "1");
-  Put("9", "1");
-  Flush();
+  ASSERT_OK(Put("1", "1"));
+  ASSERT_OK(Put("9", "1"));
+  ASSERT_OK(Flush());
 
   expected_lower_bound = 0;
   expected_higher_bound = 8 * 1024;
 
-  Put("1", "1");
-  Put("9", "1");
-  Flush();
+  ASSERT_OK(Put("1", "1"));
+  ASSERT_OK(Put("9", "1"));
+  ASSERT_OK(Flush());
 
-  Put("1", "1");
-  Put("9", "1");
-  Flush();
+  ASSERT_OK(Put("1", "1"));
+  ASSERT_OK(Put("9", "1"));
+  ASSERT_OK(Flush());
 
   // Full compaction to make sure there is no L0 file after the open.
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
@@ -4373,13 +5461,13 @@ TEST_F(DBTest2, TestBBTTailPrefetch) {
   options.max_open_files = -1;
   Reopen(options);
 
-  Put("1", "1");
-  Put("9", "1");
-  Flush();
+  ASSERT_OK(Put("1", "1"));
+  ASSERT_OK(Put("9", "1"));
+  ASSERT_OK(Flush());
 
-  Put("1", "1");
-  Put("9", "1");
-  Flush();
+  ASSERT_OK(Put("1", "1"));
+  ASSERT_OK(Put("9", "1"));
+  ASSERT_OK(Flush());
 
   ASSERT_TRUE(called.load());
   called = false;
@@ -4414,16 +5502,20 @@ TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
   port::Thread user_thread1([&]() {
     auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
     ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
-    TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
-    TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
+    TEST_SYNC_POINT(
+        "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
+    TEST_SYNC_POINT(
+        "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
     ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
   });
 
   port::Thread user_thread2([&]() {
-    TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
+    TEST_SYNC_POINT(
+        "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
     auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
     ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
-    TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
+    TEST_SYNC_POINT(
+        "TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
     ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
   });
 
@@ -4480,31 +5572,36 @@ TEST_F(DBTest2, TestCompactFiles) {
   GetSstFiles(env_, dbname_, &files);
   ASSERT_EQ(files.size(), 2);
 
-  port::Thread user_thread1(
-      [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });
+  Status user_thread1_status;
+  port::Thread user_thread1([&]() {
+    user_thread1_status =
+        db_->CompactFiles(CompactionOptions(), handle, files, 1);
+  });
 
+  Status user_thread2_status;
   port::Thread user_thread2([&]() {
-    ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
-                                      IngestExternalFileOptions()));
+    user_thread2_status = db_->IngestExternalFile(handle, {external_file2},
+                                                  IngestExternalFileOptions());
     TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
   });
 
   user_thread1.join();
   user_thread2.join();
 
+  ASSERT_OK(user_thread1_status);
+  ASSERT_OK(user_thread2_status);
+
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 #endif  // ROCKSDB_LITE
 
-// TODO: figure out why this test fails in appveyor
-#ifndef OS_WIN
 TEST_F(DBTest2, MultiDBParallelOpenTest) {
   const int kNumDbs = 2;
   Options options = CurrentOptions();
   std::vector<std::string> dbnames;
   for (int i = 0; i < kNumDbs; ++i) {
-    dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
+    dbnames.emplace_back(test::PerThreadDBPath(env_, "db" + std::to_string(i)));
     ASSERT_OK(DestroyDB(dbnames.back(), options));
   }
 
@@ -4529,7 +5626,6 @@ TEST_F(DBTest2, MultiDBParallelOpenTest) {
   }
 
   // Verify non-empty DBs can be recovered in parallel
-  dbs.clear();
   open_threads.clear();
   for (int i = 0; i < kNumDbs; ++i) {
     open_threads.emplace_back(
@@ -4546,11 +5642,11 @@ TEST_F(DBTest2, MultiDBParallelOpenTest) {
     ASSERT_OK(DestroyDB(dbnames[i], options));
   }
 }
-#endif  // OS_WIN
 
 namespace {
 class DummyOldStats : public Statistics {
  public:
+  const char* Name() const override { return "DummyOldStats"; }
   uint64_t getTickerCount(uint32_t /*ticker_type*/) const override { return 0; }
   void recordTick(uint32_t /* ticker_type */, uint64_t /* count */) override {
     num_rt++;
@@ -4573,7 +5669,7 @@ class DummyOldStats : public Statistics {
   std::atomic<int> num_rt{0};
   std::atomic<int> num_mt{0};
 };
-}  // namespace
+}  // anonymous namespace
 
 TEST_F(DBTest2, OldStatsInterface) {
   DummyOldStats* dos = new DummyOldStats();
@@ -4583,7 +5679,7 @@ TEST_F(DBTest2, OldStatsInterface) {
   options.statistics = stats;
   Reopen(options);
 
-  Put("foo", "bar");
+  ASSERT_OK(Put("foo", "bar"));
   ASSERT_EQ("bar", Get("foo"));
   ASSERT_OK(Flush());
   ASSERT_EQ("bar", Get("foo"));
@@ -4631,6 +5727,7 @@ TEST_F(DBTest2, PrefixBloomReseek) {
   ASSERT_OK(Put("bbb1", ""));
 
   Iterator* iter = db_->NewIterator(ReadOptions());
+  ASSERT_OK(iter->status());
 
   // Seeking into f1, the iterator will check bloom filter which returns the
   // file iterator ot be invalidate, and the cursor will put into f2, with
@@ -4669,6 +5766,7 @@ TEST_F(DBTest2, PrefixBloomFilteredOut) {
   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
 
   Iterator* iter = db_->NewIterator(ReadOptions());
+  ASSERT_OK(iter->status());
 
   // Bloom filter is filterd out by f1.
   // This is just one of several valid position following the contract.
@@ -4676,6 +5774,7 @@ TEST_F(DBTest2, PrefixBloomFilteredOut) {
   // the behavior of the current implementation. If underlying implementation
   // changes, the test might fail here.
   iter->Seek("bbb1");
+  ASSERT_OK(iter->status());
   ASSERT_FALSE(iter->Valid());
 
   delete iter;
@@ -4822,6 +5921,7 @@ TEST_F(DBTest2, SeekFileRangeDeleteTail) {
     ReadOptions ro;
     ro.total_order_seek = true;
     std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
+    ASSERT_OK(iter->status());
     iter->Seek("e");
     ASSERT_TRUE(iter->Valid());
     ASSERT_EQ("x", iter->key().ToString());
@@ -4839,6 +5939,7 @@ TEST_F(DBTest2, BackgroundPurgeTest) {
 
   ASSERT_OK(Put("a", "a"));
   Iterator* iter = db_->NewIterator(ReadOptions());
+  ASSERT_OK(iter->status());
   ASSERT_OK(Flush());
   size_t value = options.write_buffer_manager->memory_usage();
   ASSERT_GT(value, base_value);
@@ -4897,7 +5998,7 @@ TEST_F(DBTest2, SameSmallestInSameLevel) {
   ASSERT_OK(Put("key", "2"));
   ASSERT_OK(db_->Merge(WriteOptions(), "key", "3"));
   ASSERT_OK(db_->Merge(WriteOptions(), "key", "4"));
-  Flush();
+  ASSERT_OK(Flush());
   CompactRangeOptions cro;
   cro.change_level = true;
   cro.target_level = 2;
@@ -4905,14 +6006,14 @@ TEST_F(DBTest2, SameSmallestInSameLevel) {
                                    nullptr));
 
   ASSERT_OK(db_->Merge(WriteOptions(), "key", "5"));
-  Flush();
+  ASSERT_OK(Flush());
   ASSERT_OK(db_->Merge(WriteOptions(), "key", "6"));
-  Flush();
+  ASSERT_OK(Flush());
   ASSERT_OK(db_->Merge(WriteOptions(), "key", "7"));
-  Flush();
+  ASSERT_OK(Flush());
   ASSERT_OK(db_->Merge(WriteOptions(), "key", "8"));
-  Flush();
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("0,4,1", FilesPerLevel());
 #endif  // ROCKSDB_LITE
@@ -4921,8 +6022,8 @@ TEST_F(DBTest2, SameSmallestInSameLevel) {
 }
 
 TEST_F(DBTest2, FileConsistencyCheckInOpen) {
-  Put("foo", "bar");
-  Flush();
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Flush());
 
   SyncPoint::GetInstance()->SetCallBack(
       "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
@@ -4957,10 +6058,11 @@ TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
   ASSERT_OK(Put("a1", large_value));
   ASSERT_OK(Put("x1", large_value));
   ASSERT_OK(Put("y1", large_value));
-  Flush();
+  ASSERT_OK(Flush());
 
   {
     std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
+    ASSERT_OK(iterator->status());
     iterator->SeekForPrev("x3");
     ASSERT_TRUE(iterator->Valid());
     ASSERT_EQ("x1", iterator->key().ToString());
@@ -5072,7 +6174,7 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
     ASSERT_OK(Put("xx1", ""));
     ASSERT_OK(Put("xz1", ""));
     ASSERT_OK(Put("zz", ""));
-    Flush();
+    ASSERT_OK(Flush());
 
     // After reopening DB with prefix size 2 => 1, prefix extractor
     // won't take effective unless it won't change results based
@@ -5082,6 +6184,7 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
 
     {
       std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
+      ASSERT_OK(iterator->status());
       iterator->Seek("xa");
       ASSERT_TRUE(iterator->Valid());
       ASSERT_EQ("xb", iterator->key().ToString());
@@ -5106,6 +6209,7 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
 
     {
       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      ASSERT_OK(iterator->status());
 
       // SeekForPrev() never uses prefix bloom if it is changed.
       iterator->SeekForPrev("xg0");
@@ -5120,6 +6224,7 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
     ub = Slice(ub_str);
     {
       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      ASSERT_OK(iterator->status());
 
       iterator->Seek("x");
       ASSERT_TRUE(iterator->Valid());
@@ -5166,6 +6271,8 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
       if (expect_filter_check) {
         ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
       }
+
+      ASSERT_OK(iterator->status());
     }
     {
       std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
@@ -5183,6 +6290,8 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
       if (expect_filter_check) {
         ASSERT_EQ(6, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
       }
+
+      ASSERT_OK(iterator->status());
     }
 
     ub_str = "xg9";
@@ -5195,6 +6304,7 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
       if (expect_filter_check) {
         ASSERT_EQ(7, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
       }
+      ASSERT_OK(iterator->status());
     }
   }
 }
@@ -5214,139 +6324,950 @@ TEST_F(DBTest2, BlockBasedTablePrefixGetIndexNotFound) {
   Reopen(options);
 
   ASSERT_OK(Put("b1", "ok"));
-  Flush();
+  ASSERT_OK(Flush());
 
   // Flushing several files so that the chance that hash bucket
   // is empty fo "b" in at least one of the files is high.
   ASSERT_OK(Put("a1", ""));
   ASSERT_OK(Put("c1", ""));
-  Flush();
+  ASSERT_OK(Flush());
 
   ASSERT_OK(Put("a2", ""));
   ASSERT_OK(Put("c2", ""));
-  Flush();
+  ASSERT_OK(Flush());
 
   ASSERT_OK(Put("a3", ""));
   ASSERT_OK(Put("c3", ""));
-  Flush();
+  ASSERT_OK(Flush());
 
   ASSERT_OK(Put("a4", ""));
   ASSERT_OK(Put("c4", ""));
-  Flush();
+  ASSERT_OK(Flush());
 
   ASSERT_OK(Put("a5", ""));
   ASSERT_OK(Put("c5", ""));
-  Flush();
+  ASSERT_OK(Flush());
 
   ASSERT_EQ("ok", Get("b1"));
 }
 
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest2, AutoPrefixMode1) {
-  // create a DB with block prefix index
-  BlockBasedTableOptions table_options;
-  Options options = CurrentOptions();
-  table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
-  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
-  options.statistics = CreateDBStatistics();
-
-  Reopen(options);
-
-  Random rnd(301);
-  std::string large_value = rnd.RandomString(500);
-
-  ASSERT_OK(Put("a1", large_value));
-  ASSERT_OK(Put("x1", large_value));
-  ASSERT_OK(Put("y1", large_value));
-  Flush();
+  do {
+    // create a DB with block prefix index
+    Options options = CurrentOptions();
+    BlockBasedTableOptions table_options =
+        *options.table_factory->GetOptions<BlockBasedTableOptions>();
+    table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+    options.statistics = CreateDBStatistics();
 
-  ReadOptions ro;
-  ro.total_order_seek = false;
-  ro.auto_prefix_mode = true;
-  {
-    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
-    iterator->Seek("b1");
-    ASSERT_TRUE(iterator->Valid());
-    ASSERT_EQ("x1", iterator->key().ToString());
-    ASSERT_EQ(0, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
-  }
+    Reopen(options);
 
-  std::string ub_str = "b9";
-  Slice ub(ub_str);
-  ro.iterate_upper_bound = &ub;
+    Random rnd(301);
+    std::string large_value = rnd.RandomString(500);
 
-  {
-    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
-    iterator->Seek("b1");
-    ASSERT_FALSE(iterator->Valid());
-    ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
-  }
+    ASSERT_OK(Put("a1", large_value));
+    ASSERT_OK(Put("x1", large_value));
+    ASSERT_OK(Put("y1", large_value));
+    ASSERT_OK(Flush());
 
-  ub_str = "z";
-  ub = Slice(ub_str);
-  {
-    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
-    iterator->Seek("b1");
-    ASSERT_TRUE(iterator->Valid());
-    ASSERT_EQ("x1", iterator->key().ToString());
-    ASSERT_EQ(1, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
-  }
+    ReadOptions ro;
+    ro.total_order_seek = false;
+    ro.auto_prefix_mode = true;
 
-  ub_str = "c";
-  ub = Slice(ub_str);
-  {
-    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
-    iterator->Seek("b1");
-    ASSERT_FALSE(iterator->Valid());
-    ASSERT_EQ(2, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
-  }
+    const auto stat = BLOOM_FILTER_PREFIX_CHECKED;
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      iterator->Seek("b1");
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("x1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+    }
 
-  // The same queries without recreating iterator
-  {
-    ub_str = "b9";
-    ub = Slice(ub_str);
+    Slice ub;
     ro.iterate_upper_bound = &ub;
 
-    std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
-    iterator->Seek("b1");
-    ASSERT_FALSE(iterator->Valid());
-    ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+    ub = "b9";
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      iterator->Seek("b1");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+    }
 
-    ub_str = "z";
-    ub = Slice(ub_str);
+    ub = "z";
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      iterator->Seek("b1");
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("x1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+    }
 
-    iterator->Seek("b1");
-    ASSERT_TRUE(iterator->Valid());
-    ASSERT_EQ("x1", iterator->key().ToString());
-    ASSERT_EQ(3, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+    ub = "c";
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      iterator->Seek("b1");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+    }
 
-    ub_str = "c";
-    ub = Slice(ub_str);
+    ub = "c1";
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      iterator->Seek("b1");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+    }
 
-    iterator->Seek("b1");
-    ASSERT_FALSE(iterator->Valid());
-    ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+    // The same queries without recreating iterator
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
 
-    ub_str = "b9";
-    ub = Slice(ub_str);
-    ro.iterate_upper_bound = &ub;
-    iterator->SeekForPrev("b1");
-    ASSERT_TRUE(iterator->Valid());
-    ASSERT_EQ("a1", iterator->key().ToString());
-    ASSERT_EQ(4, TestGetTickerCount(options, BLOOM_FILTER_PREFIX_CHECKED));
+      ub = "b9";
+      iterator->Seek("b1");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
 
-    ub_str = "zz";
-    ub = Slice(ub_str);
-    ro.iterate_upper_bound = &ub;
-    iterator->SeekToLast();
-    ASSERT_TRUE(iterator->Valid());
-    ASSERT_EQ("y1", iterator->key().ToString());
+      ub = "z";
+      iterator->Seek("b1");
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("x1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
 
-    iterator->SeekToFirst();
-    ASSERT_TRUE(iterator->Valid());
-    ASSERT_EQ("a1", iterator->key().ToString());
+      ub = "c";
+      iterator->Seek("b1");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+
+      ub = "b9";
+      iterator->SeekForPrev("b1");
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("a1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "zz";
+      iterator->SeekToLast();
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("y1", iterator->key().ToString());
+
+      iterator->SeekToFirst();
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("a1", iterator->key().ToString());
+    }
+
+    // Similar, now with reverse comparator
+    // Technically, we are violating axiom 2 of prefix_extractors, but
+    // it should be revised because of major use-cases using
+    // ReverseBytewiseComparator with capped/fixed prefix Seek. (FIXME)
+    options.comparator = ReverseBytewiseComparator();
+    options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+
+    DestroyAndReopen(options);
+
+    ASSERT_OK(Put("a1", large_value));
+    ASSERT_OK(Put("x1", large_value));
+    ASSERT_OK(Put("y1", large_value));
+    ASSERT_OK(Flush());
+
+    {
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+
+      ub = "b1";
+      iterator->Seek("b9");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+
+      ub = "b1";
+      iterator->Seek("z");
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("y1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "b1";
+      iterator->Seek("c");
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "b";
+      iterator->Seek("c9");
+      ASSERT_FALSE(iterator->Valid());
+      // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+      // is "correctly" implemented.
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "a";
+      iterator->Seek("b9");
+      // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+      // is "correctly" implemented.
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("a1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "b";
+      iterator->Seek("a");
+      ASSERT_FALSE(iterator->Valid());
+      // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+      // matches BytewiseComparator::IsSameLengthImmediateSuccessor. Upper
+      // comparing before seek key prevents a real bug from surfacing.
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "b1";
+      iterator->SeekForPrev("b9");
+      ASSERT_TRUE(iterator->Valid());
+      // Fails if ReverseBytewiseComparator::IsSameLengthImmediateSuccessor
+      // is "correctly" implemented.
+      ASSERT_EQ("x1", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+
+      ub = "a";
+      iterator->SeekToLast();
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("a1", iterator->key().ToString());
+
+      iterator->SeekToFirst();
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("y1", iterator->key().ToString());
+    }
+
+    // Now something a bit different, related to "short" keys that
+    // auto_prefix_mode can omit. See "BUG" section of auto_prefix_mode.
+    options.comparator = BytewiseComparator();
+    for (const auto config : {"fixed:2", "capped:2"}) {
+      ASSERT_OK(SliceTransform::CreateFromString(ConfigOptions(), config,
+                                                 &options.prefix_extractor));
+
+      // FIXME: kHashSearch, etc. requires all keys be InDomain
+      if (StartsWith(config, "fixed") &&
+          (table_options.index_type == BlockBasedTableOptions::kHashSearch ||
+           StartsWith(options.memtable_factory->Name(), "Hash"))) {
+        continue;
+      }
+      DestroyAndReopen(options);
+
+      const char* a_end_stuff = "a\xffXYZ";
+      const char* b_begin_stuff = "b\x00XYZ";
+      ASSERT_OK(Put("a", large_value));
+      ASSERT_OK(Put("b", large_value));
+      ASSERT_OK(Put(Slice(b_begin_stuff, 3), large_value));
+      ASSERT_OK(Put("c", large_value));
+      ASSERT_OK(Flush());
+
+      // control showing valid optimization with auto_prefix mode
+      ub = Slice(a_end_stuff, 4);
+      ro.iterate_upper_bound = &ub;
+
+      std::unique_ptr<Iterator> iterator(db_->NewIterator(ro));
+      iterator->Seek(Slice(a_end_stuff, 2));
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+
+      // test, cannot be validly optimized with auto_prefix_mode
+      ub = Slice(b_begin_stuff, 2);
+      ro.iterate_upper_bound = &ub;
+
+      iterator->Seek(Slice(a_end_stuff, 2));
+      // !!! BUG !!! See "BUG" section of auto_prefix_mode.
+      ASSERT_FALSE(iterator->Valid());
+      EXPECT_EQ(1, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+
+      // To prove that is the wrong result, now use total order seek
+      ReadOptions tos_ro = ro;
+      tos_ro.total_order_seek = true;
+      tos_ro.auto_prefix_mode = false;
+      iterator.reset(db_->NewIterator(tos_ro));
+      iterator->Seek(Slice(a_end_stuff, 2));
+      ASSERT_TRUE(iterator->Valid());
+      ASSERT_EQ("b", iterator->key().ToString());
+      EXPECT_EQ(0, TestGetAndResetTickerCount(options, stat));
+      ASSERT_OK(iterator->status());
+    }
+  } while (ChangeOptions(kSkipPlainTable));
+}
+
+class RenameCurrentTest : public DBTestBase,
+                          public testing::WithParamInterface<std::string> {
+ public:
+  RenameCurrentTest()
+      : DBTestBase("rename_current_test", /*env_do_fsync=*/true),
+        sync_point_(GetParam()) {}
+
+  ~RenameCurrentTest() override {}
+
+  void SetUp() override {
+    env_->no_file_overwrite_.store(true, std::memory_order_release);
+  }
+
+  void TearDown() override {
+    env_->no_file_overwrite_.store(false, std::memory_order_release);
+  }
+
+  void SetupSyncPoints() {
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->SetCallBack(sync_point_, [&](void* arg) {
+      Status* s = reinterpret_cast<Status*>(arg);
+      assert(s);
+      *s = Status::IOError("Injected IO error.");
+    });
+  }
+
+  const std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
+                        ::testing::Values("SetCurrentFile:BeforeRename",
+                                          "SetCurrentFile:AfterRename"));
+
+TEST_P(RenameCurrentTest, Open) {
+  Destroy(last_options_);
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  SetupSyncPoints();
+  SyncPoint::GetInstance()->EnableProcessing();
+  Status s = TryReopen(options);
+  ASSERT_NOK(s);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  Reopen(options);
+}
+
+TEST_P(RenameCurrentTest, Flush) {
+  Destroy(last_options_);
+  Options options = GetDefaultOptions();
+  options.max_manifest_file_size = 1;
+  options.create_if_missing = true;
+  Reopen(options);
+  ASSERT_OK(Put("key", "value"));
+  SetupSyncPoints();
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_NOK(Flush());
+
+  ASSERT_NOK(Put("foo", "value"));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  Reopen(options);
+  ASSERT_EQ("value", Get("key"));
+  ASSERT_EQ("NOT_FOUND", Get("foo"));
+}
+
+TEST_P(RenameCurrentTest, Compaction) {
+  Destroy(last_options_);
+  Options options = GetDefaultOptions();
+  options.max_manifest_file_size = 1;
+  options.create_if_missing = true;
+  Reopen(options);
+  ASSERT_OK(Put("a", "a_value"));
+  ASSERT_OK(Put("c", "c_value"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("b", "b_value"));
+  ASSERT_OK(Put("d", "d_value"));
+  ASSERT_OK(Flush());
+
+  SetupSyncPoints();
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                               /*end=*/nullptr));
+
+  ASSERT_NOK(Put("foo", "value"));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  Reopen(options);
+  ASSERT_EQ("NOT_FOUND", Get("foo"));
+  ASSERT_EQ("d_value", Get("d"));
+}
+
+TEST_F(DBTest2, LastLevelTemperature) {
+  class TestListener : public EventListener {
+   public:
+    void OnFileReadFinish(const FileOperationInfo& info) override {
+      UpdateFileTemperature(info);
+    }
+
+    void OnFileWriteFinish(const FileOperationInfo& info) override {
+      UpdateFileTemperature(info);
+    }
+
+    void OnFileFlushFinish(const FileOperationInfo& info) override {
+      UpdateFileTemperature(info);
+    }
+
+    void OnFileSyncFinish(const FileOperationInfo& info) override {
+      UpdateFileTemperature(info);
+    }
+
+    void OnFileCloseFinish(const FileOperationInfo& info) override {
+      UpdateFileTemperature(info);
+    }
+
+    bool ShouldBeNotifiedOnFileIO() override { return true; }
+
+    std::unordered_map<uint64_t, Temperature> file_temperatures;
+
+   private:
+    void UpdateFileTemperature(const FileOperationInfo& info) {
+      auto filename = GetFileName(info.path);
+      uint64_t number;
+      FileType type;
+      ASSERT_TRUE(ParseFileName(filename, &number, &type));
+      if (type == kTableFile) {
+        MutexLock l(&mutex_);
+        auto ret = file_temperatures.insert({number, info.temperature});
+        if (!ret.second) {
+          // the same file temperature should always be the same for all events
+          ASSERT_TRUE(ret.first->second == info.temperature);
+        }
+      }
+    }
+
+    std::string GetFileName(const std::string& fname) {
+      auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
+      // workaround only for Windows that the file path could contain both
+      // Windows FilePathSeparator and '/'
+      filename = filename.substr(filename.find_last_of('/') + 1);
+      return filename;
+    }
+
+    port::Mutex mutex_;
+  };
+
+  const int kNumLevels = 7;
+  const int kLastLevel = kNumLevels - 1;
+
+  auto* listener = new TestListener();
+
+  Options options = CurrentOptions();
+  options.bottommost_temperature = Temperature::kWarm;
+  options.level0_file_num_compaction_trigger = 2;
+  options.level_compaction_dynamic_level_bytes = true;
+  options.num_levels = kNumLevels;
+  options.statistics = CreateDBStatistics();
+  options.listeners.emplace_back(listener);
+  Reopen(options);
+
+  auto size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kHot);
+  ASSERT_EQ(size, 0);
+
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  get_iostats_context()->Reset();
+  IOStatsContext* iostats = get_iostats_context();
+
+  ColumnFamilyMetaData metadata;
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(1, metadata.file_count);
+  SstFileMetaData meta = metadata.levels[kLastLevel].files[0];
+  ASSERT_EQ(Temperature::kWarm, meta.temperature);
+  uint64_t number;
+  FileType type;
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+
+  ASSERT_EQ("bar", Get("foo"));
+
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 1);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
+  ASSERT_GT(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+  // non-bottommost file still has unknown temperature
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("bar", Get("bar"));
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 1);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
+  ASSERT_GT(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(2, metadata.file_count);
+  meta = metadata.levels[0].files[0];
+  ASSERT_EQ(Temperature::kUnknown, meta.temperature);
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+  meta = metadata.levels[kLastLevel].files[0];
+  ASSERT_EQ(Temperature::kWarm, meta.temperature);
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+
+  // reopen and check the information is persisted
+  Reopen(options);
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(2, metadata.file_count);
+  meta = metadata.levels[0].files[0];
+  ASSERT_EQ(Temperature::kUnknown, meta.temperature);
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+  meta = metadata.levels[kLastLevel].files[0];
+  ASSERT_EQ(Temperature::kWarm, meta.temperature);
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+
+  // check other non-exist temperatures
+  size = GetSstSizeHelper(Temperature::kHot);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kCold);
+  ASSERT_EQ(size, 0);
+  std::string prop;
+  ASSERT_TRUE(dbfull()->GetProperty(
+      DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
+      &prop));
+  ASSERT_EQ(std::atoi(prop.c_str()), 0);
+
+  Reopen(options);
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(2, metadata.file_count);
+  meta = metadata.levels[0].files[0];
+  ASSERT_EQ(Temperature::kUnknown, meta.temperature);
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+
+  meta = metadata.levels[kLastLevel].files[0];
+  ASSERT_EQ(Temperature::kWarm, meta.temperature);
+  ASSERT_TRUE(ParseFileName(meta.name, &number, &type));
+  ASSERT_EQ(listener->file_temperatures.at(number), meta.temperature);
+}
+
+TEST_F(DBTest2, LastLevelTemperatureUniversal) {
+  const int kTriggerNum = 3;
+  const int kNumLevels = 5;
+  const int kBottommostLevel = kNumLevels - 1;
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.level0_file_num_compaction_trigger = kTriggerNum;
+  options.num_levels = kNumLevels;
+  options.statistics = CreateDBStatistics();
+  DestroyAndReopen(options);
+
+  auto size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kHot);
+  ASSERT_EQ(size, 0);
+  get_iostats_context()->Reset();
+  IOStatsContext* iostats = get_iostats_context();
+
+  for (int i = 0; i < kTriggerNum; i++) {
+    ASSERT_OK(Put("foo", "bar"));
+    ASSERT_OK(Put("bar", "bar"));
+    ASSERT_OK(Flush());
   }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ColumnFamilyMetaData metadata;
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(1, metadata.file_count);
+  ASSERT_EQ(Temperature::kUnknown,
+            metadata.levels[kBottommostLevel].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_EQ(size, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+  ASSERT_EQ("bar", Get("foo"));
+
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_bytes_read, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_bytes_read, 0);
+  ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(2, metadata.file_count);
+  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_EQ(size, 0);
+
+  // Update bottommost temperature
+  options.bottommost_temperature = Temperature::kWarm;
+  Reopen(options);
+  db_->GetColumnFamilyMetaData(&metadata);
+  // Should not impact existing ones
+  ASSERT_EQ(Temperature::kUnknown,
+            metadata.levels[kBottommostLevel].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_EQ(size, 0);
+
+  // new generated file should have the new settings
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(1, metadata.file_count);
+  ASSERT_EQ(Temperature::kWarm,
+            metadata.levels[kBottommostLevel].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_COUNT), 0);
+  ASSERT_GT(options.statistics->getTickerCount(WARM_FILE_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_COUNT), 0);
+
+  // non-bottommost file still has unknown temperature
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(2, metadata.file_count);
+  ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+
+  // check other non-exist temperatures
+  size = GetSstSizeHelper(Temperature::kHot);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kCold);
+  ASSERT_EQ(size, 0);
+  std::string prop;
+  ASSERT_TRUE(dbfull()->GetProperty(
+      DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
+      &prop));
+  ASSERT_EQ(std::atoi(prop.c_str()), 0);
+
+  // Update bottommost temperature dynamically with SetOptions
+  auto s = db_->SetOptions({{"last_level_temperature", "kCold"}});
+  ASSERT_OK(s);
+  ASSERT_EQ(db_->GetOptions().bottommost_temperature, Temperature::kCold);
+  db_->GetColumnFamilyMetaData(&metadata);
+  // Should not impact the existing files
+  ASSERT_EQ(Temperature::kWarm,
+            metadata.levels[kBottommostLevel].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+  size = GetSstSizeHelper(Temperature::kCold);
+  ASSERT_EQ(size, 0);
+
+  // new generated files should have the new settings
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  db_->GetColumnFamilyMetaData(&metadata);
+  ASSERT_EQ(1, metadata.file_count);
+  ASSERT_EQ(Temperature::kCold,
+            metadata.levels[kBottommostLevel].files[0].temperature);
+  size = GetSstSizeHelper(Temperature::kUnknown);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_EQ(size, 0);
+  size = GetSstSizeHelper(Temperature::kCold);
+  ASSERT_GT(size, 0);
+
+  // kLastTemperature is an invalid temperature
+  options.bottommost_temperature = Temperature::kLastTemperature;
+  s = TryReopen(options);
+  ASSERT_TRUE(s.IsIOError());
+}
+
+TEST_F(DBTest2, LastLevelStatistics) {
+  Options options = CurrentOptions();
+  options.bottommost_temperature = Temperature::kWarm;
+  options.level0_file_num_compaction_trigger = 2;
+  options.level_compaction_dynamic_level_bytes = true;
+  options.statistics = CreateDBStatistics();
+  Reopen(options);
+
+  // generate 1 sst on level 0
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("bar", Get("bar"));
+
+  ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_BYTES), 0);
+  ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_BYTES), 0);
+  ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT), 0);
+
+  // 2nd flush to trigger compaction
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ("bar", Get("bar"));
+
+  ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_BYTES),
+            options.statistics->getTickerCount(WARM_FILE_READ_BYTES));
+  ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT),
+            options.statistics->getTickerCount(WARM_FILE_READ_COUNT));
+
+  auto pre_bytes =
+      options.statistics->getTickerCount(NON_LAST_LEVEL_READ_BYTES);
+  auto pre_count =
+      options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT);
+
+  // 3rd flush to generate 1 sst on level 0
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("bar", Get("bar"));
+
+  ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_BYTES),
+            pre_bytes);
+  ASSERT_GT(options.statistics->getTickerCount(NON_LAST_LEVEL_READ_COUNT),
+            pre_count);
+  ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_BYTES),
+            options.statistics->getTickerCount(WARM_FILE_READ_BYTES));
+  ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT),
+            options.statistics->getTickerCount(WARM_FILE_READ_COUNT));
+}
+
+TEST_F(DBTest2, CheckpointFileTemperature) {
+  class NoLinkTestFS : public FileTemperatureTestFS {
+    using FileTemperatureTestFS::FileTemperatureTestFS;
+
+    IOStatus LinkFile(const std::string&, const std::string&, const IOOptions&,
+                      IODebugContext*) override {
+      // return not supported to force checkpoint copy the file instead of just
+      // link
+      return IOStatus::NotSupported();
+    }
+  };
+  auto test_fs = std::make_shared<NoLinkTestFS>(env_->GetFileSystem());
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
+  Options options = CurrentOptions();
+  options.bottommost_temperature = Temperature::kWarm;
+  // set dynamic_level to true so the compaction would compact the data to the
+  // last level directly which will have the last_level_temperature
+  options.level_compaction_dynamic_level_bytes = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.env = env.get();
+  Reopen(options);
+
+  // generate a bottommost file and a non-bottommost file
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("bar", "bar"));
+  ASSERT_OK(Flush());
+  auto size = GetSstSizeHelper(Temperature::kWarm);
+  ASSERT_GT(size, 0);
+
+  std::map<uint64_t, Temperature> temperatures;
+  std::vector<LiveFileStorageInfo> infos;
+  ASSERT_OK(
+      dbfull()->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
+  for (auto info : infos) {
+    temperatures.emplace(info.file_number, info.temperature);
+  }
+
+  test_fs->PopRequestedSstFileTemperatures();
+  Checkpoint* checkpoint;
+  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+  ASSERT_OK(
+      checkpoint->CreateCheckpoint(dbname_ + kFilePathSeparator + "tempcp"));
+
+  // checking src file src_temperature hints: 2 sst files: 1 sst is kWarm,
+  // another is kUnknown
+  std::vector<std::pair<uint64_t, Temperature>> requested_temps;
+  test_fs->PopRequestedSstFileTemperatures(&requested_temps);
+  // Two requests
+  ASSERT_EQ(requested_temps.size(), 2);
+  std::set<uint64_t> distinct_requests;
+  for (const auto& requested_temp : requested_temps) {
+    // Matching manifest temperatures
+    ASSERT_EQ(temperatures.at(requested_temp.first), requested_temp.second);
+    distinct_requests.insert(requested_temp.first);
+  }
+  // Each request to distinct file
+  ASSERT_EQ(distinct_requests.size(), requested_temps.size());
+
+  delete checkpoint;
+  Close();
+}
+
+TEST_F(DBTest2, FileTemperatureManifestFixup) {
+  auto test_fs = std::make_shared<FileTemperatureTestFS>(env_->GetFileSystem());
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
+  Options options = CurrentOptions();
+  options.bottommost_temperature = Temperature::kWarm;
+  // set dynamic_level to true so the compaction would compact the data to the
+  // last level directly which will have the last_level_temperature
+  options.level_compaction_dynamic_level_bytes = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.env = env.get();
+  std::vector<std::string> cfs = {/*"default",*/ "test1", "test2"};
+  CreateAndReopenWithCF(cfs, options);
+  // Needed for later re-opens (weird)
+  cfs.insert(cfs.begin(), kDefaultColumnFamilyName);
+
+  // Generate a bottommost file in all CFs
+  for (int cf = 0; cf < 3; ++cf) {
+    ASSERT_OK(Put(cf, "a", "val"));
+    ASSERT_OK(Put(cf, "c", "val"));
+    ASSERT_OK(Flush(cf));
+    ASSERT_OK(Put(cf, "b", "val"));
+    ASSERT_OK(Put(cf, "d", "val"));
+    ASSERT_OK(Flush(cf));
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // verify
+  ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
+
+  // Generate a non-bottommost file in all CFs
+  for (int cf = 0; cf < 3; ++cf) {
+    ASSERT_OK(Put(cf, "e", "val"));
+    ASSERT_OK(Flush(cf));
+  }
+
+  // re-verify
+  ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
+  // Not supported: ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
+
+  // Now change FS temperature on bottommost file(s) to kCold
+  std::map<uint64_t, Temperature> current_temps;
+  test_fs->CopyCurrentSstFileTemperatures(&current_temps);
+  for (auto e : current_temps) {
+    if (e.second == Temperature::kWarm) {
+      test_fs->OverrideSstFileTemperature(e.first, Temperature::kCold);
+    }
+  }
+  // Metadata not yet updated
+  ASSERT_EQ(Get("a"), "val");
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+
+  // Update with Close and UpdateManifestForFilesState, but first save cf
+  // descriptors
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (size_t i = 0; i < handles_.size(); ++i) {
+    ColumnFamilyDescriptor cfdescriptor;
+    // GetDescriptor is not implemented for ROCKSDB_LITE
+    handles_[i]->GetDescriptor(&cfdescriptor).PermitUncheckedError();
+    column_families.push_back(cfdescriptor);
+  }
+  Close();
+  experimental::UpdateManifestForFilesStateOptions update_opts;
+  update_opts.update_temperatures = true;
+
+  ASSERT_OK(experimental::UpdateManifestForFilesState(
+      options, dbname_, column_families, update_opts));
+
+  // Re-open and re-verify after update
+  ReopenWithColumnFamilies(cfs, options);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
+  // Not supported: ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
+
+  // Change kUnknown to kHot
+  test_fs->CopyCurrentSstFileTemperatures(&current_temps);
+  for (auto e : current_temps) {
+    if (e.second == Temperature::kUnknown) {
+      test_fs->OverrideSstFileTemperature(e.first, Temperature::kHot);
+    }
+  }
+
+  // Update with Close and UpdateManifestForFilesState
+  Close();
+  ASSERT_OK(experimental::UpdateManifestForFilesState(
+      options, dbname_, column_families, update_opts));
+
+  // Re-open and re-verify after update
+  ReopenWithColumnFamilies(cfs, options);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
+
+  Close();
 }
 #endif  // ROCKSDB_LITE
 
@@ -5375,15 +7296,353 @@ TEST_F(DBTest2, PointInTimeRecoveryWithIOErrorWhileReadingWal) {
   Status s = TryReopen(options);
   ASSERT_TRUE(s.IsIOError());
 }
-}  // namespace ROCKSDB_NAMESPACE
 
-#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
-extern "C" {
-void RegisterCustomObjects(int argc, char** argv);
+TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BackgroundCallFlush:Start:1",
+        "PointInTimeRecoveryWithSyncFailureInCFCreation:1"},
+       {"PointInTimeRecoveryWithSyncFailureInCFCreation:2",
+        "DBImpl::BackgroundCallFlush:Start:2"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateColumnFamilies({"test1"}, Options());
+  ASSERT_OK(Put("foo", "bar"));
+
+  // Creating a CF when a flush is going on, log is synced but the
+  // closed log file is not synced and corrupted.
+  port::Thread flush_thread([&]() { ASSERT_NOK(Flush()); });
+  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:1");
+  CreateColumnFamilies({"test2"}, Options());
+  env_->corrupt_in_sync_ = true;
+  TEST_SYNC_POINT("PointInTimeRecoveryWithSyncFailureInCFCreation:2");
+  flush_thread.join();
+  env_->corrupt_in_sync_ = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  // Reopening the DB should not corrupt anything
+  Options options = CurrentOptions();
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
 }
-#else
-void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
-#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
+
+TEST_F(DBTest2, RenameDirectory) {
+  Options options = CurrentOptions();
+  DestroyAndReopen(options);
+  ASSERT_OK(Put("foo", "value0"));
+  Close();
+  auto old_dbname = dbname_;
+  auto new_dbname = dbname_ + "_2";
+  EXPECT_OK(env_->RenameFile(dbname_, new_dbname));
+  options.create_if_missing = false;
+  dbname_ = new_dbname;
+  ASSERT_OK(TryReopen(options));
+  ASSERT_EQ("value0", Get("foo"));
+  Destroy(options);
+  dbname_ = old_dbname;
+}
+
+TEST_F(DBTest2, SstUniqueIdVerifyBackwardCompatible) {
+  const int kNumSst = 3;
+  const int kLevel0Trigger = 4;
+  auto options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kLevel0Trigger;
+  options.statistics = CreateDBStatistics();
+  // Skip for now
+  options.verify_sst_unique_id_in_manifest = false;
+  Reopen(options);
+
+  std::atomic_int skipped = 0;
+  std::atomic_int passed = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTable::Open::SkippedVerifyUniqueId",
+      [&](void* /*arg*/) { skipped++; });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTable::Open::PassedVerifyUniqueId",
+      [&](void* /*arg*/) { passed++; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // generate a few SSTs
+  for (int i = 0; i < kNumSst; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  // Verification has been skipped on files so far
+  EXPECT_EQ(skipped, kNumSst);
+  EXPECT_EQ(passed, 0);
+
+  // Reopen with verification
+  options.verify_sst_unique_id_in_manifest = true;
+  skipped = 0;
+  passed = 0;
+  Reopen(options);
+  EXPECT_EQ(skipped, 0);
+  EXPECT_EQ(passed, kNumSst);
+
+  // Now simulate no unique id in manifest for next file
+  // NOTE: this only works for loading manifest from disk,
+  // not in-memory manifest, so we need to re-open below.
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionEdit::EncodeTo:UniqueId", [&](void* arg) {
+        auto unique_id = static_cast<UniqueId64x2*>(arg);
+        // remove id before writing it to manifest
+        (*unique_id)[0] = 0;
+        (*unique_id)[1] = 0;
+      });
+
+  // test compaction generated Sst
+  for (int i = kNumSst; i < kLevel0Trigger; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+#endif  // ROCKSDB_LITE
+
+  // Reopen (with verification)
+  ASSERT_TRUE(options.verify_sst_unique_id_in_manifest);
+  skipped = 0;
+  passed = 0;
+  Reopen(options);
+  EXPECT_EQ(skipped, 1);
+  EXPECT_EQ(passed, 0);
+}
+
+TEST_F(DBTest2, SstUniqueIdVerify) {
+  const int kNumSst = 3;
+  const int kLevel0Trigger = 4;
+  auto options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kLevel0Trigger;
+  // Allow mismatch for now
+  options.verify_sst_unique_id_in_manifest = false;
+  Reopen(options);
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) {
+        auto props = static_cast<TableProperties*>(props_vs);
+        // update table property session_id to a different one, which
+        // changes unique ID
+        props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // generate a few SSTs
+  for (int i = 0; i < kNumSst; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  // Reopen with verification should report corruption
+  options.verify_sst_unique_id_in_manifest = true;
+  auto s = TryReopen(options);
+  ASSERT_TRUE(s.IsCorruption());
+
+  // Reopen without verification should be fine
+  options.verify_sst_unique_id_in_manifest = false;
+  Reopen(options);
+
+  // test compaction generated Sst
+  for (int i = kNumSst; i < kLevel0Trigger; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+#ifndef ROCKSDB_LITE
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+#endif  // ROCKSDB_LITE
+
+  // Reopen with verification should fail
+  options.verify_sst_unique_id_in_manifest = true;
+  s = TryReopen(options);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+TEST_F(DBTest2, SstUniqueIdVerifyMultiCFs) {
+  const int kNumSst = 3;
+  const int kLevel0Trigger = 4;
+  auto options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kLevel0Trigger;
+  // Allow mismatch for now
+  options.verify_sst_unique_id_in_manifest = false;
+
+  CreateAndReopenWithCF({"one", "two"}, options);
+
+  // generate good SSTs
+  for (int cf_num : {0, 2}) {
+    for (int i = 0; i < kNumSst; i++) {
+      for (int j = 0; j < 100; j++) {
+        ASSERT_OK(Put(cf_num, Key(i * 10 + j), "value"));
+      }
+      ASSERT_OK(Flush(cf_num));
+    }
+  }
+
+  // generate SSTs with bad unique id
+  SyncPoint::GetInstance()->SetCallBack(
+      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* props_vs) {
+        auto props = static_cast<TableProperties*>(props_vs);
+        // update table property session_id to a different one
+        props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  for (int i = 0; i < kNumSst; i++) {
+    for (int j = 0; j < 100; j++) {
+      ASSERT_OK(Put(1, Key(i * 10 + j), "value"));
+    }
+    ASSERT_OK(Flush(1));
+  }
+
+  // Reopen with verification should report corruption
+  options.verify_sst_unique_id_in_manifest = true;
+  auto s = TryReopenWithColumnFamilies({"default", "one", "two"}, options);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+TEST_F(DBTest2, BestEffortsRecoveryWithSstUniqueIdVerification) {
+  const auto tamper_with_uniq_id = [&](void* arg) {
+    auto props = static_cast<TableProperties*>(arg);
+    assert(props);
+    // update table property session_id to a different one
+    props->db_session_id = DBImpl::GenerateDbSessionId(nullptr);
+  };
+
+  const auto assert_db = [&](size_t expected_count,
+                             const std::string& expected_v) {
+    std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
+    size_t cnt = 0;
+    for (it->SeekToFirst(); it->Valid(); it->Next(), ++cnt) {
+      ASSERT_EQ(std::to_string(cnt), it->key());
+      ASSERT_EQ(expected_v, it->value());
+    }
+    ASSERT_EQ(expected_count, cnt);
+  };
+
+  const int num_l0_compaction_trigger = 8;
+  const int num_l0 = num_l0_compaction_trigger - 1;
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = num_l0_compaction_trigger;
+
+  for (int k = 0; k < num_l0; ++k) {
+    // Allow mismatch for now
+    options.verify_sst_unique_id_in_manifest = false;
+
+    DestroyAndReopen(options);
+
+    constexpr size_t num_keys_per_file = 10;
+    for (int i = 0; i < num_l0; ++i) {
+      for (size_t j = 0; j < num_keys_per_file; ++j) {
+        ASSERT_OK(Put(std::to_string(j), "v" + std::to_string(i)));
+      }
+      if (i == k) {
+        SyncPoint::GetInstance()->DisableProcessing();
+        SyncPoint::GetInstance()->SetCallBack(
+            "PropertyBlockBuilder::AddTableProperty:Start",
+            tamper_with_uniq_id);
+        SyncPoint::GetInstance()->EnableProcessing();
+      }
+      ASSERT_OK(Flush());
+    }
+
+    options.verify_sst_unique_id_in_manifest = true;
+    Status s = TryReopen(options);
+    ASSERT_TRUE(s.IsCorruption());
+
+    options.best_efforts_recovery = true;
+    Reopen(options);
+    assert_db(k == 0 ? 0 : num_keys_per_file, "v" + std::to_string(k - 1));
+
+    // Reopen with regular recovery
+    options.best_efforts_recovery = false;
+    Reopen(options);
+    assert_db(k == 0 ? 0 : num_keys_per_file, "v" + std::to_string(k - 1));
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+
+    for (size_t i = 0; i < num_keys_per_file; ++i) {
+      ASSERT_OK(Put(std::to_string(i), "v"));
+    }
+    ASSERT_OK(Flush());
+    Reopen(options);
+    {
+      for (size_t i = 0; i < num_keys_per_file; ++i) {
+        ASSERT_EQ("v", Get(std::to_string(i)));
+      }
+    }
+  }
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest2, GetLatestSeqAndTsForKey) {
+  Destroy(last_options_);
+
+  Options options = CurrentOptions();
+  options.max_write_buffer_size_to_maintain = 64 << 10;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  options.statistics = CreateDBStatistics();
+
+  Reopen(options);
+
+  constexpr uint64_t kTsU64Value = 12;
+
+  for (uint64_t key = 0; key < 100; ++key) {
+    std::string ts;
+    PutFixed64(&ts, kTsU64Value);
+
+    std::string key_str;
+    PutFixed64(&key_str, key);
+    std::reverse(key_str.begin(), key_str.end());
+    ASSERT_OK(db_->Put(WriteOptions(), key_str, ts, "value"));
+  }
+
+  ASSERT_OK(Flush());
+
+  constexpr bool cache_only = true;
+  constexpr SequenceNumber lower_bound_seq = 0;
+  auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(
+      dbfull()->DefaultColumnFamily());
+  assert(cfhi);
+  assert(cfhi->cfd());
+  SuperVersion* sv = cfhi->cfd()->GetSuperVersion();
+  for (uint64_t key = 0; key < 100; ++key) {
+    std::string key_str;
+    PutFixed64(&key_str, key);
+    std::reverse(key_str.begin(), key_str.end());
+    std::string ts;
+    SequenceNumber seq = kMaxSequenceNumber;
+    bool found_record_for_key = false;
+    bool is_blob_index = false;
+
+    const Status s = dbfull()->GetLatestSequenceForKey(
+        sv, key_str, cache_only, lower_bound_seq, &seq, &ts,
+        &found_record_for_key, &is_blob_index);
+    ASSERT_OK(s);
+    std::string expected_ts;
+    PutFixed64(&expected_ts, kTsU64Value);
+    ASSERT_EQ(expected_ts, ts);
+    ASSERT_TRUE(found_record_for_key);
+    ASSERT_FALSE(is_blob_index);
+  }
+
+  // Verify that no read to SST files.
+  ASSERT_EQ(0, options.statistics->getTickerCount(GET_HIT_L0));
+}
+#endif  // ROCKSDB_LITE
+
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();