]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/table/table_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / table / table_test.cc
index 2a24c99eb274c959b4f10db4310af4f1b7648b39..4ff4c7fa9d589963094927fb76c879cf6bd9143a 100644 (file)
@@ -70,7 +70,7 @@ const std::string kDummyValue(10000, 'o');
 // DummyPropertiesCollector used to test BlockBasedTableProperties
 class DummyPropertiesCollector : public TablePropertiesCollector {
  public:
-  const char* Name() const override { return ""; }
+  const char* Name() const override { return "DummyPropertiesCollector"; }
 
   Status Finish(UserCollectedProperties* /*properties*/) override {
     return Status::OK();
@@ -92,7 +92,9 @@ class DummyPropertiesCollectorFactory1
       TablePropertiesCollectorFactory::Context /*context*/) override {
     return new DummyPropertiesCollector();
   }
-  const char* Name() const override { return "DummyPropertiesCollector1"; }
+  const char* Name() const override {
+    return "DummyPropertiesCollectorFactory1";
+  }
 };
 
 class DummyPropertiesCollectorFactory2
@@ -102,7 +104,9 @@ class DummyPropertiesCollectorFactory2
       TablePropertiesCollectorFactory::Context /*context*/) override {
     return new DummyPropertiesCollector();
   }
-  const char* Name() const override { return "DummyPropertiesCollector2"; }
+  const char* Name() const override {
+    return "DummyPropertiesCollectorFactory2";
+  }
 };
 
 // Return reverse of "key".
@@ -211,46 +215,6 @@ class Constructor {
   stl_wrappers::KVMap data_;
 };
 
-class BlockConstructor: public Constructor {
- public:
-  explicit BlockConstructor(const Comparator* cmp)
-      : Constructor(cmp),
-        comparator_(cmp),
-        block_(nullptr) { }
-  ~BlockConstructor() override { delete block_; }
-  Status FinishImpl(const Options& /*options*/,
-                    const ImmutableCFOptions& /*ioptions*/,
-                    const MutableCFOptions& /*moptions*/,
-                    const BlockBasedTableOptions& table_options,
-                    const InternalKeyComparator& /*internal_comparator*/,
-                    const stl_wrappers::KVMap& kv_map) override {
-    delete block_;
-    block_ = nullptr;
-    BlockBuilder builder(table_options.block_restart_interval);
-
-    for (const auto kv : kv_map) {
-      builder.Add(kv.first, kv.second);
-    }
-    // Open the block
-    data_ = builder.Finish().ToString();
-    BlockContents contents;
-    contents.data = data_;
-    block_ = new Block(std::move(contents), kDisableGlobalSequenceNumber);
-    return Status::OK();
-  }
-  InternalIterator* NewIterator(
-      const SliceTransform* /*prefix_extractor*/) const override {
-    return block_->NewDataIterator(comparator_, comparator_);
-  }
-
- private:
-  const Comparator* comparator_;
-  std::string data_;
-  Block* block_;
-
-  BlockConstructor();
-};
-
 // A helper class that converts internal format keys into user keys
 class KeyConvertingIterator : public InternalIterator {
  public:
@@ -281,14 +245,18 @@ class KeyConvertingIterator : public InternalIterator {
   void SeekToLast() override { iter_->SeekToLast(); }
   void Next() override { iter_->Next(); }
   void Prev() override { iter_->Prev(); }
-  bool IsOutOfBound() override { return iter_->IsOutOfBound(); }
+  IterBoundCheck UpperBoundCheckResult() override {
+    return iter_->UpperBoundCheckResult();
+  }
 
   Slice key() const override {
     assert(Valid());
     ParsedInternalKey parsed_key;
-    if (!ParseInternalKey(iter_->key(), &parsed_key)) {
-      status_ = Status::Corruption("malformed internal key");
-      return Slice("corrupted key");
+    Status pik_status =
+        ParseInternalKey(iter_->key(), &parsed_key, true /* log_err_key */);
+    if (!pik_status.ok()) {
+      status_ = pik_status;
+      return Slice(status_.getState());
     }
     return parsed_key.user_key;
   }
@@ -308,7 +276,56 @@ class KeyConvertingIterator : public InternalIterator {
   void operator=(const KeyConvertingIterator&);
 };
 
-class TableConstructor: public Constructor {
+// `BlockConstructor` APIs always accept/return user keys.
+class BlockConstructor : public Constructor {
+ public:
+  explicit BlockConstructor(const Comparator* cmp)
+      : Constructor(cmp), comparator_(cmp), block_(nullptr) {}
+  ~BlockConstructor() override { delete block_; }
+  Status FinishImpl(const Options& /*options*/,
+                    const ImmutableCFOptions& /*ioptions*/,
+                    const MutableCFOptions& /*moptions*/,
+                    const BlockBasedTableOptions& table_options,
+                    const InternalKeyComparator& /*internal_comparator*/,
+                    const stl_wrappers::KVMap& kv_map) override {
+    delete block_;
+    block_ = nullptr;
+    BlockBuilder builder(table_options.block_restart_interval);
+
+    for (const auto& kv : kv_map) {
+      // `DataBlockIter` assumes it reads only internal keys. `BlockConstructor`
+      // clients provide user keys, so we need to convert to internal key format
+      // before writing the data block.
+      ParsedInternalKey ikey(kv.first, kMaxSequenceNumber, kTypeValue);
+      std::string encoded;
+      AppendInternalKey(&encoded, ikey);
+      builder.Add(encoded, kv.second);
+    }
+    // Open the block
+    data_ = builder.Finish().ToString();
+    BlockContents contents;
+    contents.data = data_;
+    block_ = new Block(std::move(contents));
+    return Status::OK();
+  }
+  InternalIterator* NewIterator(
+      const SliceTransform* /*prefix_extractor*/) const override {
+    // `DataBlockIter` returns the internal keys it reads.
+    // `KeyConvertingIterator` converts them to user keys before they are
+    // exposed to the `BlockConstructor` clients.
+    return new KeyConvertingIterator(
+        block_->NewDataIterator(comparator_, kDisableGlobalSequenceNumber));
+  }
+
+ private:
+  const Comparator* comparator_;
+  std::string data_;
+  Block* block_;
+
+  BlockConstructor();
+};
+
+class TableConstructor : public Constructor {
  public:
   explicit TableConstructor(const Comparator* cmp,
                             bool convert_to_internal_key = false,
@@ -351,7 +368,7 @@ class TableConstructor: public Constructor {
         TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
         file_writer_.get()));
 
-    for (const auto kv : kv_map) {
+    for (const auto& kv : kv_map) {
       if (convert_to_internal_key_) {
         ParsedInternalKey ikey(kv.first, kMaxSequenceNumber, kTypeValue);
         std::string encoded;
@@ -377,17 +394,17 @@ class TableConstructor: public Constructor {
     return ioptions.table_factory->NewTableReader(
         TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
                            internal_comparator, !kSkipFilters, !kImmortal,
-                           level_, largest_seqno_, &block_cache_tracer_),
+                           false, level_, largest_seqno_, &block_cache_tracer_,
+                           moptions.write_buffer_size),
         std::move(file_reader_), TEST_GetSink()->contents().size(),
         &table_reader_);
   }
 
   InternalIterator* NewIterator(
       const SliceTransform* prefix_extractor) const override {
-    ReadOptions ro;
     InternalIterator* iter = table_reader_->NewIterator(
-        ro, prefix_extractor, /*arena=*/nullptr, /*skip_filters=*/false,
-        TableReaderCaller::kUncategorized);
+        read_options_, prefix_extractor, /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized);
     if (convert_to_internal_key_) {
       return new KeyConvertingIterator(iter);
     } else {
@@ -442,6 +459,7 @@ class TableConstructor: public Constructor {
     file_reader_.reset();
   }
 
+  const ReadOptions read_options_;
   uint64_t uniq_id_;
   std::unique_ptr<WritableFileWriter> file_writer_;
   std::unique_ptr<RandomAccessFileReader> file_reader_;
@@ -485,7 +503,7 @@ class MemTableConstructor: public Constructor {
                              kMaxSequenceNumber, 0 /* column_family_id */);
     memtable_->Ref();
     int seq = 1;
-    for (const auto kv : kv_map) {
+    for (const auto& kv : kv_map) {
       memtable_->Add(seq, kTypeValue, kv.first, kv.second);
       seq++;
     }
@@ -546,9 +564,9 @@ class DBConstructor: public Constructor {
     delete db_;
     db_ = nullptr;
     NewDB();
-    for (const auto kv : kv_map) {
+    for (const auto& kv : kv_map) {
       WriteBatch batch;
-      batch.Put(kv.first, kv.second);
+      EXPECT_OK(batch.Put(kv.first, kv.second));
       EXPECT_TRUE(db_->Write(WriteOptions(), &batch).ok());
     }
     return Status::OK();
@@ -598,10 +616,22 @@ struct TestArgs {
   bool reverse_compare;
   int restart_interval;
   CompressionType compression;
+  uint32_t compression_parallel_threads;
   uint32_t format_version;
   bool use_mmap;
 };
 
+std::ostream& operator<<(std::ostream& os, const TestArgs& args) {
+  os << "type: " << args.type << " reverse_compare: " << args.reverse_compare
+     << " restart_interval: " << args.restart_interval
+     << " compression: " << args.compression
+     << " compression_parallel_threads: " << args.compression_parallel_threads
+     << " format_version: " << args.format_version
+     << " use_mmap: " << args.use_mmap;
+
+  return os;
+}
+
 static std::vector<TestArgs> GenerateArgList() {
   std::vector<TestArgs> test_args;
   std::vector<TestType> test_types = {
@@ -615,6 +645,7 @@ static std::vector<TestArgs> GenerateArgList() {
       MEMTABLE_TEST, DB_TEST};
   std::vector<bool> reverse_compare_types = {false, true};
   std::vector<int> restart_intervals = {16, 1, 1024};
+  std::vector<uint32_t> compression_parallel_threads = {1, 4};
 
   // Only add compression if it is supported
   std::vector<std::pair<CompressionType, bool>> compression_types;
@@ -657,6 +688,8 @@ static std::vector<TestArgs> GenerateArgList() {
         one_arg.reverse_compare = reverse_compare;
         one_arg.restart_interval = restart_intervals[0];
         one_arg.compression = compression_types[0].first;
+        one_arg.compression_parallel_threads = 1;
+        one_arg.format_version = 0;
         one_arg.use_mmap = true;
         test_args.push_back(one_arg);
         one_arg.use_mmap = false;
@@ -667,14 +700,17 @@ static std::vector<TestArgs> GenerateArgList() {
 
       for (auto restart_interval : restart_intervals) {
         for (auto compression_type : compression_types) {
-          TestArgs one_arg;
-          one_arg.type = test_type;
-          one_arg.reverse_compare = reverse_compare;
-          one_arg.restart_interval = restart_interval;
-          one_arg.compression = compression_type.first;
-          one_arg.format_version = compression_type.second ? 2 : 1;
-          one_arg.use_mmap = false;
-          test_args.push_back(one_arg);
+          for (auto num_threads : compression_parallel_threads) {
+            TestArgs one_arg;
+            one_arg.type = test_type;
+            one_arg.reverse_compare = reverse_compare;
+            one_arg.restart_interval = restart_interval;
+            one_arg.compression = compression_type.first;
+            one_arg.compression_parallel_threads = num_threads;
+            one_arg.format_version = compression_type.second ? 2 : 1;
+            one_arg.use_mmap = false;
+            test_args.push_back(one_arg);
+          }
         }
       }
     }
@@ -715,41 +751,38 @@ class FixedOrLessPrefixTransform : public SliceTransform {
 
 class HarnessTest : public testing::Test {
  public:
-  HarnessTest()
-      : ioptions_(options_),
+  explicit HarnessTest(const TestArgs& args)
+      : args_(args),
+        ioptions_(options_),
         moptions_(options_),
-        constructor_(nullptr),
-        write_buffer_(options_.db_write_buffer_size) {}
-
-  void Init(const TestArgs& args) {
-    delete constructor_;
-    constructor_ = nullptr;
-    options_ = Options();
-    options_.compression = args.compression;
+        write_buffer_(options_.db_write_buffer_size),
+        support_prev_(true),
+        only_support_prefix_seek_(false) {
+    options_.compression = args_.compression;
+    options_.compression_opts.parallel_threads =
+        args_.compression_parallel_threads;
     // Use shorter block size for tests to exercise block boundary
     // conditions more.
-    if (args.reverse_compare) {
+    if (args_.reverse_compare) {
       options_.comparator = &reverse_key_comparator;
     }
 
     internal_comparator_.reset(
         new test::PlainInternalKeyComparator(options_.comparator));
 
-    support_prev_ = true;
-    only_support_prefix_seek_ = false;
-    options_.allow_mmap_reads = args.use_mmap;
-    switch (args.type) {
+    options_.allow_mmap_reads = args_.use_mmap;
+    switch (args_.type) {
       case BLOCK_BASED_TABLE_TEST:
         table_options_.flush_block_policy_factory.reset(
             new FlushBlockBySizePolicyFactory());
         table_options_.block_size = 256;
-        table_options_.block_restart_interval = args.restart_interval;
-        table_options_.index_block_restart_interval = args.restart_interval;
-        table_options_.format_version = args.format_version;
+        table_options_.block_restart_interval = args_.restart_interval;
+        table_options_.index_block_restart_interval = args_.restart_interval;
+        table_options_.format_version = args_.format_version;
         options_.table_factory.reset(
             new BlockBasedTableFactory(table_options_));
-        constructor_ = new TableConstructor(
-            options_.comparator, true /* convert_to_internal_key_ */);
+        constructor_.reset(new TableConstructor(
+            options_.comparator, true /* convert_to_internal_key_ */));
         internal_comparator_.reset(
             new InternalKeyComparator(options_.comparator));
         break;
@@ -760,8 +793,8 @@ class HarnessTest : public testing::Test {
         only_support_prefix_seek_ = true;
         options_.prefix_extractor.reset(new FixedOrLessPrefixTransform(2));
         options_.table_factory.reset(NewPlainTableFactory());
-        constructor_ = new TableConstructor(
-            options_.comparator, true /* convert_to_internal_key_ */);
+        constructor_.reset(new TableConstructor(
+            options_.comparator, true /* convert_to_internal_key_ */));
         internal_comparator_.reset(
             new InternalKeyComparator(options_.comparator));
         break;
@@ -770,8 +803,8 @@ class HarnessTest : public testing::Test {
         only_support_prefix_seek_ = true;
         options_.prefix_extractor.reset(NewNoopTransform());
         options_.table_factory.reset(NewPlainTableFactory());
-        constructor_ = new TableConstructor(
-            options_.comparator, true /* convert_to_internal_key_ */);
+        constructor_.reset(new TableConstructor(
+            options_.comparator, true /* convert_to_internal_key_ */));
         internal_comparator_.reset(
             new InternalKeyComparator(options_.comparator));
         break;
@@ -789,8 +822,8 @@ class HarnessTest : public testing::Test {
           options_.table_factory.reset(
               NewPlainTableFactory(plain_table_options));
         }
-        constructor_ = new TableConstructor(
-            options_.comparator, true /* convert_to_internal_key_ */);
+        constructor_.reset(new TableConstructor(
+            options_.comparator, true /* convert_to_internal_key_ */));
         internal_comparator_.reset(
             new InternalKeyComparator(options_.comparator));
         break;
@@ -799,28 +832,26 @@ class HarnessTest : public testing::Test {
         table_options_.block_size = 256;
         options_.table_factory.reset(
             new BlockBasedTableFactory(table_options_));
-        constructor_ = new BlockConstructor(options_.comparator);
+        constructor_.reset(new BlockConstructor(options_.comparator));
         break;
       case MEMTABLE_TEST:
         table_options_.block_size = 256;
         options_.table_factory.reset(
             new BlockBasedTableFactory(table_options_));
-        constructor_ = new MemTableConstructor(options_.comparator,
-                                               &write_buffer_);
+        constructor_.reset(
+            new MemTableConstructor(options_.comparator, &write_buffer_));
         break;
       case DB_TEST:
         table_options_.block_size = 256;
         options_.table_factory.reset(
             new BlockBasedTableFactory(table_options_));
-        constructor_ = new DBConstructor(options_.comparator);
+        constructor_.reset(new DBConstructor(options_.comparator));
         break;
     }
     ioptions_ = ImmutableCFOptions(options_);
     moptions_ = MutableCFOptions(options_);
   }
 
-  ~HarnessTest() override { delete constructor_; }
-
   void Add(const std::string& key, const std::string& value) {
     constructor_->Add(key, value);
   }
@@ -843,12 +874,15 @@ class HarnessTest : public testing::Test {
     InternalIterator* iter = constructor_->NewIterator();
     ASSERT_TRUE(!iter->Valid());
     iter->SeekToFirst();
+    ASSERT_OK(iter->status());
     for (stl_wrappers::KVMap::const_iterator model_iter = data.begin();
          model_iter != data.end(); ++model_iter) {
       ASSERT_EQ(ToString(data, model_iter), ToString(iter));
       iter->Next();
+      ASSERT_OK(iter->status());
     }
     ASSERT_TRUE(!iter->Valid());
+    ASSERT_OK(iter->status());
     if (constructor_->IsArenaMode() && !constructor_->AnywayDeleteIterator()) {
       iter->~InternalIterator();
     } else {
@@ -861,12 +895,15 @@ class HarnessTest : public testing::Test {
     InternalIterator* iter = constructor_->NewIterator();
     ASSERT_TRUE(!iter->Valid());
     iter->SeekToLast();
+    ASSERT_OK(iter->status());
     for (stl_wrappers::KVMap::const_reverse_iterator model_iter = data.rbegin();
          model_iter != data.rend(); ++model_iter) {
       ASSERT_EQ(ToString(data, model_iter), ToString(iter));
       iter->Prev();
+      ASSERT_OK(iter->status());
     }
     ASSERT_TRUE(!iter->Valid());
+    ASSERT_OK(iter->status());
     if (constructor_->IsArenaMode() && !constructor_->AnywayDeleteIterator()) {
       iter->~InternalIterator();
     } else {
@@ -888,6 +925,7 @@ class HarnessTest : public testing::Test {
           if (iter->Valid()) {
             if (kVerbose) fprintf(stderr, "Next\n");
             iter->Next();
+            ASSERT_OK(iter->status());
             ++model_iter;
             ASSERT_EQ(ToString(data, model_iter), ToString(iter));
           }
@@ -897,6 +935,7 @@ class HarnessTest : public testing::Test {
         case 1: {
           if (kVerbose) fprintf(stderr, "SeekToFirst\n");
           iter->SeekToFirst();
+          ASSERT_OK(iter->status());
           model_iter = data.begin();
           ASSERT_EQ(ToString(data, model_iter), ToString(iter));
           break;
@@ -908,6 +947,7 @@ class HarnessTest : public testing::Test {
           if (kVerbose) fprintf(stderr, "Seek '%s'\n",
                                 EscapeString(key).c_str());
           iter->Seek(Slice(key));
+          ASSERT_OK(iter->status());
           ASSERT_EQ(ToString(data, model_iter), ToString(iter));
           break;
         }
@@ -916,6 +956,7 @@ class HarnessTest : public testing::Test {
           if (iter->Valid()) {
             if (kVerbose) fprintf(stderr, "Prev\n");
             iter->Prev();
+            ASSERT_OK(iter->status());
             if (model_iter == data.begin()) {
               model_iter = data.end();   // Wrap around to invalid value
             } else {
@@ -929,6 +970,7 @@ class HarnessTest : public testing::Test {
         case 4: {
           if (kVerbose) fprintf(stderr, "SeekToLast\n");
           iter->SeekToLast();
+          ASSERT_OK(iter->status());
           if (keys.empty()) {
             model_iter = data.end();
           } else {
@@ -1006,40 +1048,37 @@ class HarnessTest : public testing::Test {
   // Returns nullptr if not running against a DB
   DB* db() const { return constructor_->db(); }
 
-  void RandomizedHarnessTest(size_t part, size_t total) {
-    std::vector<TestArgs> args = GenerateArgList();
-    assert(part);
-    assert(part <= total);
-    for (size_t i = 0; i < args.size(); i++) {
-      if ((i % total) + 1 != part) {
-        continue;
-      }
-      Init(args[i]);
-      Random rnd(test::RandomSeed() + 5);
-      for (int num_entries = 0; num_entries < 2000;
-           num_entries += (num_entries < 50 ? 1 : 200)) {
-        for (int e = 0; e < num_entries; e++) {
-          std::string v;
-          Add(test::RandomKey(&rnd, rnd.Skewed(4)),
-              test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
-        }
-        Test(&rnd);
-      }
-    }
-  }
-
  private:
-  Options options_ = Options();
+  TestArgs args_;
+  Options options_;
   ImmutableCFOptions ioptions_;
   MutableCFOptions moptions_;
-  BlockBasedTableOptions table_options_ = BlockBasedTableOptions();
-  Constructor* constructor_;
+  BlockBasedTableOptions table_options_;
+  std::unique_ptr<Constructor> constructor_;
   WriteBufferManager write_buffer_;
   bool support_prev_;
   bool only_support_prefix_seek_;
   std::shared_ptr<InternalKeyComparator> internal_comparator_;
 };
 
+class ParameterizedHarnessTest : public HarnessTest,
+                                 public testing::WithParamInterface<TestArgs> {
+ public:
+  ParameterizedHarnessTest() : HarnessTest(GetParam()) {}
+};
+
+INSTANTIATE_TEST_CASE_P(TableTest, ParameterizedHarnessTest,
+                        ::testing::ValuesIn(GenerateArgList()));
+
+class DBHarnessTest : public HarnessTest {
+ public:
+  DBHarnessTest()
+      : HarnessTest(TestArgs{DB_TEST, /* reverse_compare */ false,
+                             /* restart_interval */ 16, kNoCompression,
+                             /* compression_parallel_threads */ 1,
+                             /* format_version */ 0, /* use_mmap */ false}) {}
+};
+
 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
   bool result = (val >= low) && (val <= high);
   if (!result) {
@@ -1091,7 +1130,10 @@ class BlockBasedTableTest
     std::unique_ptr<TraceWriter> trace_writer;
     EXPECT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_file_path_,
                                  &trace_writer));
-    c->block_cache_tracer_.StartTrace(env_, trace_opt, std::move(trace_writer));
+    // Always return Status::OK().
+    assert(c->block_cache_tracer_
+               .StartTrace(env_, trace_opt, std::move(trace_writer))
+               .ok());
     {
       std::string user_key = "k01";
       InternalKey internal_key(user_key, 0, kTypeValue);
@@ -1111,51 +1153,53 @@ class BlockBasedTableTest
       const std::vector<BlockCacheTraceRecord>& expected_records) {
     c->block_cache_tracer_.EndTrace();
 
-    std::unique_ptr<TraceReader> trace_reader;
-    Status s =
-        NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader);
-    EXPECT_OK(s);
-    BlockCacheTraceReader reader(std::move(trace_reader));
-    BlockCacheTraceHeader header;
-    EXPECT_OK(reader.ReadHeader(&header));
-    uint32_t index = 0;
-    while (s.ok()) {
-      BlockCacheTraceRecord access;
-      s = reader.ReadAccess(&access);
-      if (!s.ok()) {
-        break;
-      }
-      ASSERT_LT(index, expected_records.size());
-      EXPECT_NE("", access.block_key);
-      EXPECT_EQ(access.block_type, expected_records[index].block_type);
-      EXPECT_GT(access.block_size, 0);
-      EXPECT_EQ(access.caller, expected_records[index].caller);
-      EXPECT_EQ(access.no_insert, expected_records[index].no_insert);
-      EXPECT_EQ(access.is_cache_hit, expected_records[index].is_cache_hit);
-      // Get
-      if (access.caller == TableReaderCaller::kUserGet) {
-        EXPECT_EQ(access.referenced_key,
-                  expected_records[index].referenced_key);
-        EXPECT_EQ(access.get_id, expected_records[index].get_id);
-        EXPECT_EQ(access.get_from_user_specified_snapshot,
-                  expected_records[index].get_from_user_specified_snapshot);
-        if (access.block_type == TraceType::kBlockTraceDataBlock) {
-          EXPECT_GT(access.referenced_data_size, 0);
-          EXPECT_GT(access.num_keys_in_block, 0);
-          EXPECT_EQ(access.referenced_key_exist_in_block,
-                    expected_records[index].referenced_key_exist_in_block);
+    {
+      std::unique_ptr<TraceReader> trace_reader;
+      Status s =
+          NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader);
+      EXPECT_OK(s);
+      BlockCacheTraceReader reader(std::move(trace_reader));
+      BlockCacheTraceHeader header;
+      EXPECT_OK(reader.ReadHeader(&header));
+      uint32_t index = 0;
+      while (s.ok()) {
+        BlockCacheTraceRecord access;
+        s = reader.ReadAccess(&access);
+        if (!s.ok()) {
+          break;
         }
-      } else {
-        EXPECT_EQ(access.referenced_key, "");
-        EXPECT_EQ(access.get_id, 0);
-        EXPECT_TRUE(access.get_from_user_specified_snapshot == Boolean::kFalse);
-        EXPECT_EQ(access.referenced_data_size, 0);
-        EXPECT_EQ(access.num_keys_in_block, 0);
-        EXPECT_TRUE(access.referenced_key_exist_in_block == Boolean::kFalse);
+        ASSERT_LT(index, expected_records.size());
+        EXPECT_NE("", access.block_key);
+        EXPECT_EQ(access.block_type, expected_records[index].block_type);
+        EXPECT_GT(access.block_size, 0);
+        EXPECT_EQ(access.caller, expected_records[index].caller);
+        EXPECT_EQ(access.no_insert, expected_records[index].no_insert);
+        EXPECT_EQ(access.is_cache_hit, expected_records[index].is_cache_hit);
+        // Get
+        if (access.caller == TableReaderCaller::kUserGet) {
+          EXPECT_EQ(access.referenced_key,
+                    expected_records[index].referenced_key);
+          EXPECT_EQ(access.get_id, expected_records[index].get_id);
+          EXPECT_EQ(access.get_from_user_specified_snapshot,
+                    expected_records[index].get_from_user_specified_snapshot);
+          if (access.block_type == TraceType::kBlockTraceDataBlock) {
+            EXPECT_GT(access.referenced_data_size, 0);
+            EXPECT_GT(access.num_keys_in_block, 0);
+            EXPECT_EQ(access.referenced_key_exist_in_block,
+                      expected_records[index].referenced_key_exist_in_block);
+          }
+        } else {
+          EXPECT_EQ(access.referenced_key, "");
+          EXPECT_EQ(access.get_id, 0);
+          EXPECT_TRUE(access.get_from_user_specified_snapshot == Boolean::kFalse);
+          EXPECT_EQ(access.referenced_data_size, 0);
+          EXPECT_EQ(access.num_keys_in_block, 0);
+          EXPECT_TRUE(access.referenced_key_exist_in_block == Boolean::kFalse);
+        }
+        index++;
       }
-      index++;
+      EXPECT_EQ(index, expected_records.size());
     }
-    EXPECT_EQ(index, expected_records.size());
     EXPECT_OK(env_->DeleteFile(trace_file_path_));
     EXPECT_OK(env_->DeleteDir(test_path_));
   }
@@ -1178,17 +1222,19 @@ class FileChecksumTestHelper {
  public:
   FileChecksumTestHelper(bool convert_to_internal_key = false)
       : convert_to_internal_key_(convert_to_internal_key) {
-    sink_ = new test::StringSink();
   }
   ~FileChecksumTestHelper() {}
 
   void CreateWriteableFile() {
+    sink_ = new test::StringSink();
     file_writer_.reset(test::GetWritableFileWriter(sink_, "" /* don't care */));
   }
 
-  void SetFileChecksumFunc(FileChecksumFunc* checksum_func) {
+  void SetFileChecksumGenerator(FileChecksumGenerator* checksum_generator) {
     if (file_writer_ != nullptr) {
-      file_writer_->TEST_SetFileChecksumFunc(checksum_func);
+      file_writer_->TEST_SetFileChecksumGenerator(checksum_generator);
+    } else {
+      delete checksum_generator;
     }
   }
 
@@ -1203,14 +1249,13 @@ class FileChecksumTestHelper {
   void AddKVtoKVMap(int num_entries) {
     Random rnd(test::RandomSeed());
     for (int i = 0; i < num_entries; i++) {
-      std::string v;
-      test::RandomString(&rnd, 100, &v);
+      std::string v = rnd.RandomString(100);
       kv_map_[test::RandomKey(&rnd, 20)] = v;
     }
   }
 
   Status WriteKVAndFlushTable() {
-    for (const auto kv : kv_map_) {
+    for (const auto& kv : kv_map_) {
       if (convert_to_internal_key_) {
         ParsedInternalKey ikey(kv.first, kMaxSequenceNumber, kTypeValue);
         std::string encoded;
@@ -1229,15 +1274,18 @@ class FileChecksumTestHelper {
     return s;
   }
 
-  std::string GetFileChecksum() { return table_builder_->GetFileChecksum(); }
+  std::string GetFileChecksum() {
+    file_writer_->Close();
+    return table_builder_->GetFileChecksum();
+  }
 
   const char* GetFileChecksumFuncName() {
     return table_builder_->GetFileChecksumFuncName();
   }
 
-  Status CalculateFileChecksum(FileChecksumFunc* file_checksum_func,
+  Status CalculateFileChecksum(FileChecksumGenerator* file_checksum_generator,
                                std::string* checksum) {
-    assert(file_checksum_func != nullptr);
+    assert(file_checksum_generator != nullptr);
     cur_uniq_id_ = checksum_uniq_id_++;
     test::StringSink* ss_rw =
         ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(
@@ -1247,29 +1295,24 @@ class FileChecksumTestHelper {
     std::unique_ptr<char[]> scratch(new char[2048]);
     Slice result;
     uint64_t offset = 0;
-    std::string tmp_checksum;
-    bool first_read = true;
     Status s;
-    s = file_reader_->Read(offset, 2048, &result, scratch.get(), false);
+    s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(),
+                           nullptr, false);
     if (!s.ok()) {
       return s;
     }
     while (result.size() != 0) {
-      if (first_read) {
-        first_read = false;
-        tmp_checksum = file_checksum_func->Value(scratch.get(), result.size());
-      } else {
-        tmp_checksum = file_checksum_func->Extend(tmp_checksum, scratch.get(),
-                                                  result.size());
-      }
+      file_checksum_generator->Update(scratch.get(), result.size());
       offset += static_cast<uint64_t>(result.size());
-      s = file_reader_->Read(offset, 2048, &result, scratch.get(), false);
+      s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(),
+                             nullptr, false);
       if (!s.ok()) {
         return s;
       }
     }
     EXPECT_EQ(offset, static_cast<uint64_t>(table_builder_->FileSize()));
-    *checksum = tmp_checksum;
+    file_checksum_generator->Finalize();
+    *checksum = file_checksum_generator->GetChecksum();
     return Status::OK();
   }
 
@@ -1280,7 +1323,7 @@ class FileChecksumTestHelper {
   std::unique_ptr<RandomAccessFileReader> file_reader_;
   std::unique_ptr<TableBuilder> table_builder_;
   stl_wrappers::KVMap kv_map_;
-  test::StringSink* sink_;
+  test::StringSink* sink_ = nullptr;
 
   static uint64_t checksum_uniq_id_;
 };
@@ -1475,8 +1518,9 @@ TEST_P(BlockBasedTableTest, BlockBasedTableProperties2) {
     ASSERT_EQ("rocksdb.ReverseBytewiseComparator", props.comparator_name);
     ASSERT_EQ("UInt64AddOperator", props.merge_operator_name);
     ASSERT_EQ("rocksdb.Noop", props.prefix_extractor_name);
-    ASSERT_EQ("[DummyPropertiesCollector1,DummyPropertiesCollector2]",
-              props.property_collectors_names);
+    ASSERT_EQ(
+        "[DummyPropertiesCollectorFactory1,DummyPropertiesCollectorFactory2]",
+        props.property_collectors_names);
     ASSERT_EQ("", props.filter_policy_name);  // no filter policy is used
     c.ResetTableReader();
   }
@@ -1529,7 +1573,8 @@ TEST_P(BlockBasedTableTest, RangeDelBlock) {
     for (size_t i = 0; i < expected_tombstones.size(); i++) {
       ASSERT_TRUE(iter->Valid());
       ParsedInternalKey parsed_key;
-      ASSERT_TRUE(ParseInternalKey(iter->key(), &parsed_key));
+      ASSERT_OK(
+          ParseInternalKey(iter->key(), &parsed_key, true /* log_err_key */));
       RangeTombstone t(parsed_key, iter->value());
       const auto& expected_t = expected_tombstones[i];
       ASSERT_EQ(t.start_key_, expected_t.start_key_);
@@ -1860,10 +1905,11 @@ TEST_P(BlockBasedTableTest, SkipPrefixBloomFilter) {
   options.prefix_extractor.reset(NewFixedPrefixTransform(9));
   const ImmutableCFOptions new_ioptions(options);
   const MutableCFOptions new_moptions(options);
-  c.Reopen(new_ioptions, new_moptions);
+  ASSERT_OK(c.Reopen(new_ioptions, new_moptions));
   auto reader = c.GetTableReader();
+  ReadOptions read_options;
   std::unique_ptr<InternalIterator> db_iter(reader->NewIterator(
-      ReadOptions(), new_moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      read_options, new_moptions.prefix_extractor.get(), /*arena=*/nullptr,
       /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   // Test point lookup
@@ -1877,16 +1923,10 @@ TEST_P(BlockBasedTableTest, SkipPrefixBloomFilter) {
   }
 }
 
-static std::string RandomString(Random* rnd, int len) {
-  std::string r;
-  test::RandomString(rnd, len, &r);
-  return r;
-}
-
 void AddInternalKey(TableConstructor* c, const std::string& prefix,
                     std::string value = "v", int /*suffix_len*/ = 800) {
   static Random rnd(1023);
-  InternalKey k(prefix + RandomString(&rnd, 800), 0, kTypeValue);
+  InternalKey k(prefix + rnd.RandomString(800), 0, kTypeValue);
   c->Add(k.Encode().ToString(), value);
 }
 
@@ -1930,14 +1970,16 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) {
   ASSERT_EQ(5u, props->num_data_blocks);
 
   // TODO(Zhongyi): update test to use MutableCFOptions
+  ReadOptions read_options;
   std::unique_ptr<InternalIterator> index_iter(reader->NewIterator(
-      ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
       /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   // -- Find keys do not exist, but have common prefix.
   std::vector<std::string> prefixes = {"001", "003", "005", "007", "009"};
-  std::vector<std::string> lower_bound = {keys[0], keys[1], keys[2],
-                                          keys[7], keys[9], };
+  std::vector<std::string> lower_bound = {
+      keys[0], keys[1], keys[2], keys[7], keys[9],
+  };
 
   // find the lower bound of the prefix
   for (size_t i = 0; i < prefixes.size(); ++i) {
@@ -2014,6 +2056,80 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) {
       ASSERT_TRUE(BytewiseComparator()->Compare(prefix, ukey_prefix) > 0);
     }
   }
+
+  {
+    // Test reseek case. It should impact partitioned index more.
+    ReadOptions ro;
+    ro.total_order_seek = true;
+    std::unique_ptr<InternalIterator> index_iter2(reader->NewIterator(
+        ro, moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized));
+
+    // Things to cover in partitioned index:
+    // 1. Both of Seek() and SeekToLast() has optimization to prevent
+    //    rereek leaf index block if it remains to the same one, and
+    //    they reuse the same variable.
+    // 2. When Next() or Prev() is called, the block moves, so the
+    //    optimization should kick in only with the current one.
+    index_iter2->Seek(InternalKey("0055", 0, kTypeValue).Encode());
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0055", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->SeekToLast();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->Seek(InternalKey("0055", 0, kTypeValue).Encode());
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0055", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->SeekToLast();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+    index_iter2->Prev();
+    ASSERT_TRUE(index_iter2->Valid());
+    index_iter2->Prev();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0075", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->Seek(InternalKey("0095", 0, kTypeValue).Encode());
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+    index_iter2->Prev();
+    ASSERT_TRUE(index_iter2->Valid());
+    index_iter2->Prev();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0075", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->SeekToLast();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->Seek(InternalKey("0095", 0, kTypeValue).Encode());
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->Prev();
+    ASSERT_TRUE(index_iter2->Valid());
+    index_iter2->Prev();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0075", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->Seek(InternalKey("0075", 0, kTypeValue).Encode());
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0075", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->Next();
+    ASSERT_TRUE(index_iter2->Valid());
+    index_iter2->Next();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+
+    index_iter2->SeekToLast();
+    ASSERT_TRUE(index_iter2->Valid());
+    ASSERT_EQ("0095", index_iter2->key().ToString().substr(0, 4));
+  }
+
   c.ResetTableReader();
 }
 
@@ -2092,7 +2208,8 @@ class CustomFlushBlockPolicy : public FlushBlockPolicyFactory,
   explicit CustomFlushBlockPolicy(std::vector<int> keys_per_block)
       : keys_per_block_(keys_per_block) {}
 
-  const char* Name() const override { return "table_test"; }
+  const char* Name() const override { return "CustomFlushBlockPolicy"; }
+
   FlushBlockPolicy* NewFlushBlockPolicy(const BlockBasedTableOptions&,
                                         const BlockBuilder&) const override {
     return new CustomFlushBlockPolicy(keys_per_block_);
@@ -2164,9 +2281,11 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     auto reader = c.GetTableReader();
     auto props = reader->GetTableProperties();
     ASSERT_EQ(4u, props->num_data_blocks);
+    ReadOptions read_options;
     std::unique_ptr<InternalIterator> iter(reader->NewIterator(
-        ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
-        /*skip_filters=*/false, TableReaderCaller::kUncategorized));
+        read_options, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized,
+        /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true));
 
     // Shouldn't have read data blocks before iterator is seeked.
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
@@ -2183,6 +2302,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     EXPECT_EQ(keys[2], iter->key().ToString());
     EXPECT_EQ(use_first_key ? 0 : 1,
               stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v2", iter->value().ToString());
     EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
@@ -2193,6 +2313,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     EXPECT_EQ(keys[4], iter->key().ToString());
     EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v4", iter->value().ToString());
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
 
@@ -2208,6 +2329,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     ASSERT_TRUE(iter->Valid());
     EXPECT_EQ(keys[5], iter->key().ToString());
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v5", iter->value().ToString());
     EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
@@ -2225,6 +2347,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     ASSERT_TRUE(iter->Valid());
     EXPECT_EQ(keys[7], iter->key().ToString());
     EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v7", iter->value().ToString());
     EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
 
@@ -2246,6 +2369,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     EXPECT_EQ(keys[3], iter->key().ToString());
     EXPECT_EQ(use_first_key ? 1 : 2,
               stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v3", iter->value().ToString());
     EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
     EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
@@ -2265,6 +2389,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
               stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
     // All blocks are in cache now, there'll be no more misses ever.
     EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v1", iter->value().ToString());
 
     // Next into the next block again.
@@ -2292,6 +2417,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     EXPECT_EQ(keys[4], iter->key().ToString());
     EXPECT_EQ(use_first_key ? 3 : 6,
               stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v4", iter->value().ToString());
     EXPECT_EQ(use_first_key ? 3 : 6,
               stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
@@ -2301,6 +2427,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
     EXPECT_EQ(keys[7], iter->key().ToString());
     EXPECT_EQ(use_first_key ? 4 : 7,
               stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    ASSERT_TRUE(iter->PrepareValue());
     EXPECT_EQ("v7", iter->value().ToString());
     EXPECT_EQ(use_first_key ? 4 : 7,
               stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
@@ -2339,9 +2466,11 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) {
   auto reader = c.GetTableReader();
   auto props = reader->GetTableProperties();
   ASSERT_EQ(1u, props->num_data_blocks);
+  ReadOptions read_options;
   std::unique_ptr<InternalIterator> iter(reader->NewIterator(
-      ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
-      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
+      read_options, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized,
+      /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true));
 
   iter->Seek(InternalKey("a", 0, kTypeValue).Encode().ToString());
   ASSERT_TRUE(iter->Valid());
@@ -2351,6 +2480,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) {
   // Key should have been served from index, without reading data blocks.
   EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
 
+  ASSERT_TRUE(iter->PrepareValue());
   EXPECT_EQ("x", iter->value().ToString());
   EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
   EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
@@ -2373,7 +2503,7 @@ TEST_P(BlockBasedTableTest, IndexSizeStat) {
   std::vector<std::string> keys;
 
   for (int i = 0; i < 100; ++i) {
-    keys.push_back(RandomString(&rnd, 10000));
+    keys.push_back(rnd.RandomString(10000));
   }
 
   // Each time we load one more key to the table. the table index block
@@ -2417,7 +2547,7 @@ TEST_P(BlockBasedTableTest, NumBlockStat) {
   for (int i = 0; i < 10; ++i) {
     // the key/val are slightly smaller than block size, so that each block
     // holds roughly one key/value pair.
-    c.Add(RandomString(&rnd, 900), "val");
+    c.Add(rnd.RandomString(900), "val");
   }
 
   std::vector<std::string> ks;
@@ -2567,8 +2697,9 @@ TEST_P(BlockBasedTableTest, TracingIterator) {
            GetPlainInternalComparator(options.comparator), &keys, &kvmap);
 
   for (uint32_t i = 1; i <= 2; i++) {
+    ReadOptions read_options;
     std::unique_ptr<InternalIterator> iter(c.GetTableReader()->NewIterator(
-        ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
         /*skip_filters=*/false, TableReaderCaller::kUserIterator));
     iter->SeekToFirst();
     while (iter->Valid()) {
@@ -2712,8 +2843,8 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) {
                            GetContext::kNotFound, Slice(), nullptr, nullptr,
                            nullptr, true, nullptr, nullptr);
     // a hack that just to trigger BlockBasedTable::GetFilter.
-    reader->Get(ReadOptions(), "non-exist-key", &get_context,
-                moptions.prefix_extractor.get());
+    ASSERT_OK(reader->Get(ReadOptions(), "non-exist-key", &get_context,
+                          moptions.prefix_extractor.get()));
     BlockCachePropertiesSnapshot props(options.statistics.get());
     props.AssertIndexBlockStat(0, 0);
     props.AssertFilterBlockStat(0, 0);
@@ -2787,6 +2918,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
   // Only data block will be accessed
   {
     iter->SeekToFirst();
+    ASSERT_OK(iter->status());
     BlockCachePropertiesSnapshot props(options.statistics.get());
     props.AssertEqual(1, 1, 0 + 1,  // data block miss
                       0);
@@ -2801,6 +2933,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
   {
     iter.reset(c.NewIterator(moptions.prefix_extractor.get()));
     iter->SeekToFirst();
+    ASSERT_OK(iter->status());
     BlockCachePropertiesSnapshot props(options.statistics.get());
     props.AssertEqual(1, 1 + 1, /* index block hit */
                       1, 0 + 1 /* data block hit */);
@@ -2822,7 +2955,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
   options.table_factory.reset(new BlockBasedTableFactory(table_options));
   const ImmutableCFOptions ioptions2(options);
   const MutableCFOptions moptions2(options);
-  c.Reopen(ioptions2, moptions2);
+  ASSERT_OK(c.Reopen(ioptions2, moptions2));
   {
     BlockCachePropertiesSnapshot props(options.statistics.get());
     props.AssertEqual(1,  // index block miss
@@ -2848,6 +2981,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
     // SeekToFirst() accesses data block. With similar reason, we expect data
     // block's cache miss.
     iter->SeekToFirst();
+    ASSERT_OK(iter->status());
     BlockCachePropertiesSnapshot props(options.statistics.get());
     props.AssertEqual(2, 0, 0 + 1,  // data block miss
                       0);
@@ -2900,7 +3034,7 @@ void ValidateBlockSizeDeviation(int value, int expected) {
   BlockBasedTableFactory* factory = new BlockBasedTableFactory(table_options);
 
   const BlockBasedTableOptions* normalized_table_options =
-      (const BlockBasedTableOptions*)factory->GetOptions();
+      factory->GetOptions<BlockBasedTableOptions>();
   ASSERT_EQ(normalized_table_options->block_size_deviation, expected);
 
   delete factory;
@@ -2912,7 +3046,7 @@ void ValidateBlockRestartInterval(int value, int expected) {
   BlockBasedTableFactory* factory = new BlockBasedTableFactory(table_options);
 
   const BlockBasedTableOptions* normalized_table_options =
-      (const BlockBasedTableOptions*)factory->GetOptions();
+      factory->GetOptions<BlockBasedTableOptions>();
   ASSERT_EQ(normalized_table_options->block_restart_interval, expected);
 
   delete factory;
@@ -3168,17 +3302,9 @@ TEST_P(BlockBasedTableTest, NoFileChecksum) {
   BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
   std::unique_ptr<InternalKeyComparator> comparator(
       new InternalKeyComparator(BytewiseComparator()));
-  SequenceNumber largest_seqno = 0;
   int level = 0;
   std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
       int_tbl_prop_collector_factories;
-
-  if (largest_seqno != 0) {
-    // Pretend that it's an external file written by SstFileWriter.
-    int_tbl_prop_collector_factories.emplace_back(
-        new SstFileWriterPropertiesCollectorFactory(2 /* version */,
-                                                    0 /* global_seqno*/));
-  }
   std::string column_family_name;
 
   FileChecksumTestHelper f(true);
@@ -3192,39 +3318,36 @@ TEST_P(BlockBasedTableTest, NoFileChecksum) {
                           column_family_name, level),
       TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
       f.GetFileWriter()));
-  f.ResetTableBuilder(std::move(builder));
+  ASSERT_OK(f.ResetTableBuilder(std::move(builder)));
   f.AddKVtoKVMap(1000);
   f.WriteKVAndFlushTable();
-  ASSERT_STREQ(f.GetFileChecksumFuncName(),
-               kUnknownFileChecksumFuncName.c_str());
-  ASSERT_STREQ(f.GetFileChecksum().c_str(), kUnknownFileChecksum.c_str());
+  ASSERT_STREQ(f.GetFileChecksumFuncName(), kUnknownFileChecksumFuncName);
+  ASSERT_STREQ(f.GetFileChecksum().c_str(), kUnknownFileChecksum);
 }
 
-TEST_P(BlockBasedTableTest, Crc32FileChecksum) {
+TEST_P(BlockBasedTableTest, Crc32cFileChecksum) {
+  FileChecksumGenCrc32cFactory* file_checksum_gen_factory =
+      new FileChecksumGenCrc32cFactory();
   Options options;
-  options.sst_file_checksum_func =
-      std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
+  options.file_checksum_gen_factory.reset(file_checksum_gen_factory);
   ImmutableCFOptions ioptions(options);
   MutableCFOptions moptions(options);
   BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
   std::unique_ptr<InternalKeyComparator> comparator(
       new InternalKeyComparator(BytewiseComparator()));
-  SequenceNumber largest_seqno = 0;
   int level = 0;
   std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
       int_tbl_prop_collector_factories;
-
-  if (largest_seqno != 0) {
-    // Pretend that it's an external file written by SstFileWriter.
-    int_tbl_prop_collector_factories.emplace_back(
-        new SstFileWriterPropertiesCollectorFactory(2 /* version */,
-                                                    0 /* global_seqno*/));
-  }
   std::string column_family_name;
 
+  FileChecksumGenContext gen_context;
+  gen_context.file_name = "db/tmp";
+  std::unique_ptr<FileChecksumGenerator> checksum_crc32c_gen1 =
+      options.file_checksum_gen_factory->CreateFileChecksumGenerator(
+          gen_context);
   FileChecksumTestHelper f(true);
   f.CreateWriteableFile();
-  f.SetFileChecksumFunc(options.sst_file_checksum_func.get());
+  f.SetFileChecksumGenerator(checksum_crc32c_gen1.release());
   std::unique_ptr<TableBuilder> builder;
   builder.reset(ioptions.table_factory->NewTableBuilder(
       TableBuilderOptions(ioptions, moptions, *comparator,
@@ -3234,14 +3357,27 @@ TEST_P(BlockBasedTableTest, Crc32FileChecksum) {
                           column_family_name, level),
       TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
       f.GetFileWriter()));
-  f.ResetTableBuilder(std::move(builder));
+  ASSERT_OK(f.ResetTableBuilder(std::move(builder)));
   f.AddKVtoKVMap(1000);
   f.WriteKVAndFlushTable();
   ASSERT_STREQ(f.GetFileChecksumFuncName(), "FileChecksumCrc32c");
+
+  std::unique_ptr<FileChecksumGenerator> checksum_crc32c_gen2 =
+      options.file_checksum_gen_factory->CreateFileChecksumGenerator(
+          gen_context);
   std::string checksum;
-  ASSERT_OK(
-      f.CalculateFileChecksum(options.sst_file_checksum_func.get(), &checksum));
+  ASSERT_OK(f.CalculateFileChecksum(checksum_crc32c_gen2.get(), &checksum));
   ASSERT_STREQ(f.GetFileChecksum().c_str(), checksum.c_str());
+
+  // Unit test the generator itself for schema stability
+  std::unique_ptr<FileChecksumGenerator> checksum_crc32c_gen3 =
+      options.file_checksum_gen_factory->CreateFileChecksumGenerator(
+          gen_context);
+  const char data[] = "here is some data";
+  checksum_crc32c_gen3->Update(data, sizeof(data));
+  checksum_crc32c_gen3->Finalize();
+  checksum = checksum_crc32c_gen3->GetChecksum();
+  ASSERT_STREQ(checksum.c_str(), "\345\245\277\110");
 }
 
 // Plain table is not supported in ROCKSDB_LITE
@@ -3327,24 +3463,24 @@ TEST_F(PlainTableTest, NoFileChecksum) {
           false /* skip_filters */, column_family_name, unknown_level),
       TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
       f.GetFileWriter()));
-  f.ResetTableBuilder(std::move(builder));
+  ASSERT_OK(f.ResetTableBuilder(std::move(builder)));
   f.AddKVtoKVMap(1000);
   f.WriteKVAndFlushTable();
-  ASSERT_STREQ(f.GetFileChecksumFuncName(),
-               kUnknownFileChecksumFuncName.c_str());
-  EXPECT_EQ(f.GetFileChecksum(), kUnknownFileChecksum.c_str());
+  ASSERT_STREQ(f.GetFileChecksumFuncName(), kUnknownFileChecksumFuncName);
+  EXPECT_EQ(f.GetFileChecksum(), kUnknownFileChecksum);
 }
 
-TEST_F(PlainTableTest, Crc32FileChecksum) {
+TEST_F(PlainTableTest, Crc32cFileChecksum) {
   PlainTableOptions plain_table_options;
   plain_table_options.user_key_len = 20;
   plain_table_options.bloom_bits_per_key = 8;
   plain_table_options.hash_table_ratio = 0;
   PlainTableFactory factory(plain_table_options);
 
+  FileChecksumGenCrc32cFactory* file_checksum_gen_factory =
+      new FileChecksumGenCrc32cFactory();
   Options options;
-  options.sst_file_checksum_func =
-      std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
+  options.file_checksum_gen_factory.reset(file_checksum_gen_factory);
   const ImmutableCFOptions ioptions(options);
   const MutableCFOptions moptions(options);
   InternalKeyComparator ikc(options.comparator);
@@ -3352,9 +3488,15 @@ TEST_F(PlainTableTest, Crc32FileChecksum) {
       int_tbl_prop_collector_factories;
   std::string column_family_name;
   int unknown_level = -1;
+
+  FileChecksumGenContext gen_context;
+  gen_context.file_name = "db/tmp";
+  std::unique_ptr<FileChecksumGenerator> checksum_crc32c_gen1 =
+      options.file_checksum_gen_factory->CreateFileChecksumGenerator(
+          gen_context);
   FileChecksumTestHelper f(true);
   f.CreateWriteableFile();
-  f.SetFileChecksumFunc(options.sst_file_checksum_func.get());
+  f.SetFileChecksumGenerator(checksum_crc32c_gen1.release());
 
   std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
       TableBuilderOptions(
@@ -3363,13 +3505,16 @@ TEST_F(PlainTableTest, Crc32FileChecksum) {
           false /* skip_filters */, column_family_name, unknown_level),
       TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
       f.GetFileWriter()));
-  f.ResetTableBuilder(std::move(builder));
+  ASSERT_OK(f.ResetTableBuilder(std::move(builder)));
   f.AddKVtoKVMap(1000);
   f.WriteKVAndFlushTable();
   ASSERT_STREQ(f.GetFileChecksumFuncName(), "FileChecksumCrc32c");
+
+  std::unique_ptr<FileChecksumGenerator> checksum_crc32c_gen2 =
+      options.file_checksum_gen_factory->CreateFileChecksumGenerator(
+          gen_context);
   std::string checksum;
-  ASSERT_OK(
-      f.CalculateFileChecksum(options.sst_file_checksum_func.get(), &checksum));
+  ASSERT_OK(f.CalculateFileChecksum(checksum_crc32c_gen2.get(), &checksum));
   EXPECT_STREQ(f.GetFileChecksum().c_str(), checksum.c_str());
 }
 
@@ -3387,6 +3532,7 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfPlain) {
   std::vector<std::string> keys;
   stl_wrappers::KVMap kvmap;
   Options options;
+  options.db_host_id = "";
   test::PlainInternalKeyComparator internal_comparator(options.comparator);
   options.compression = kNoCompression;
   BlockBasedTableOptions table_options;
@@ -3431,12 +3577,12 @@ static void DoCompressionTest(CompressionType comp) {
   const MutableCFOptions moptions(options);
   c.Finish(options, ioptions, moptions, table_options, ikc, &keys, &kvmap);
 
-  ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"),       0,      0));
-  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"),       0,      0));
-  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"),       0,      0));
-  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"),    2000,   3500));
-  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"),    2000,   3500));
-  ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"),    4000,   6500));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, 0));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 3500));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 3500));
+  ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 7000));
   c.ResetTableReader();
 }
 
@@ -3483,74 +3629,25 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) {
 }
 
 #ifndef ROCKSDB_VALGRIND_RUN
-// RandomizedHarnessTest is very slow for certain combination of arguments
-// Split into 8 pieces to reduce the time individual tests take.
-TEST_F(HarnessTest, Randomized1) {
-  // part 1 out of 8
-  const size_t part = 1;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized2) {
-  // part 2 out of 8
-  const size_t part = 2;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized3) {
-  // part 3 out of 8
-  const size_t part = 3;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized4) {
-  // part 4 out of 8
-  const size_t part = 4;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized5) {
-  // part 5 out of 8
-  const size_t part = 5;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized6) {
-  // part 6 out of 8
-  const size_t part = 6;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized7) {
-  // part 7 out of 8
-  const size_t part = 7;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
-}
-
-TEST_F(HarnessTest, Randomized8) {
-  // part 8 out of 8
-  const size_t part = 8;
-  const size_t total = 8;
-  RandomizedHarnessTest(part, total);
+TEST_P(ParameterizedHarnessTest, RandomizedHarnessTest) {
+  Random rnd(test::RandomSeed() + 5);
+  for (int num_entries = 0; num_entries < 2000;
+       num_entries += (num_entries < 50 ? 1 : 200)) {
+    for (int e = 0; e < num_entries; e++) {
+      Add(test::RandomKey(&rnd, rnd.Skewed(4)),
+          rnd.RandomString(rnd.Skewed(5)));
+    }
+    Test(&rnd);
+  }
 }
 
 #ifndef ROCKSDB_LITE
-TEST_F(HarnessTest, RandomizedLongDB) {
+TEST_F(DBHarnessTest, RandomizedLongDB) {
   Random rnd(test::RandomSeed());
-  TestArgs args = {DB_TEST, false, 16, kNoCompression, 0, false};
-  Init(args);
   int num_entries = 100000;
   for (int e = 0; e < num_entries; e++) {
     std::string v;
-    Add(test::RandomKey(&rnd, rnd.Skewed(4)),
-        test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
+    Add(test::RandomKey(&rnd, rnd.Skewed(4)), rnd.RandomString(rnd.Skewed(5)));
   }
   Test(&rnd);
 
@@ -3568,28 +3665,42 @@ TEST_F(HarnessTest, RandomizedLongDB) {
 #endif  // ROCKSDB_LITE
 #endif  // ROCKSDB_VALGRIND_RUN
 
-class MemTableTest : public testing::Test {};
+class MemTableTest : public testing::Test {
+ public:
+  MemTableTest() {
+    InternalKeyComparator cmp(BytewiseComparator());
+    auto table_factory = std::make_shared<SkipListFactory>();
+    options_.memtable_factory = table_factory;
+    ImmutableCFOptions ioptions(options_);
+    wb_ = new WriteBufferManager(options_.db_write_buffer_size);
+    memtable_ = new MemTable(cmp, ioptions, MutableCFOptions(options_), wb_,
+                             kMaxSequenceNumber, 0 /* column_family_id */);
+    memtable_->Ref();
+  }
+
+  ~MemTableTest() {
+    delete memtable_->Unref();
+    delete wb_;
+  }
+
+  MemTable* GetMemTable() { return memtable_; }
+
+ private:
+  MemTable* memtable_;
+  Options options_;
+  WriteBufferManager* wb_;
+};
 
 TEST_F(MemTableTest, Simple) {
-  InternalKeyComparator cmp(BytewiseComparator());
-  auto table_factory = std::make_shared<SkipListFactory>();
-  Options options;
-  options.memtable_factory = table_factory;
-  ImmutableCFOptions ioptions(options);
-  WriteBufferManager wb(options.db_write_buffer_size);
-  MemTable* memtable =
-      new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
-                   kMaxSequenceNumber, 0 /* column_family_id */);
-  memtable->Ref();
   WriteBatch batch;
   WriteBatchInternal::SetSequence(&batch, 100);
-  batch.Put(std::string("k1"), std::string("v1"));
-  batch.Put(std::string("k2"), std::string("v2"));
-  batch.Put(std::string("k3"), std::string("v3"));
-  batch.Put(std::string("largekey"), std::string("vlarge"));
-  batch.DeleteRange(std::string("chi"), std::string("xigua"));
-  batch.DeleteRange(std::string("begin"), std::string("end"));
-  ColumnFamilyMemTablesDefault cf_mems_default(memtable);
+  ASSERT_OK(batch.Put(std::string("k1"), std::string("v1")));
+  ASSERT_OK(batch.Put(std::string("k2"), std::string("v2")));
+  ASSERT_OK(batch.Put(std::string("k3"), std::string("v3")));
+  ASSERT_OK(batch.Put(std::string("largekey"), std::string("vlarge")));
+  ASSERT_OK(batch.DeleteRange(std::string("chi"), std::string("xigua")));
+  ASSERT_OK(batch.DeleteRange(std::string("begin"), std::string("end")));
+  ColumnFamilyMemTablesDefault cf_mems_default(GetMemTable());
   ASSERT_TRUE(
       WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr, nullptr)
           .ok());
@@ -3600,10 +3711,10 @@ TEST_F(MemTableTest, Simple) {
     std::unique_ptr<InternalIterator> iter_guard;
     InternalIterator* iter;
     if (i == 0) {
-      iter = memtable->NewIterator(ReadOptions(), &arena);
+      iter = GetMemTable()->NewIterator(ReadOptions(), &arena);
       arena_iter_guard.set(iter);
     } else {
-      iter = memtable->NewRangeTombstoneIterator(
+      iter = GetMemTable()->NewRangeTombstoneIterator(
           ReadOptions(), kMaxSequenceNumber /* read_seq */);
       iter_guard.reset(iter);
     }
@@ -3617,54 +3728,36 @@ TEST_F(MemTableTest, Simple) {
       iter->Next();
     }
   }
-
-  delete memtable->Unref();
 }
 
 // Test the empty key
-TEST_F(HarnessTest, SimpleEmptyKey) {
-  auto args = GenerateArgList();
-  for (const auto& arg : args) {
-    Init(arg);
-    Random rnd(test::RandomSeed() + 1);
-    Add("", "v");
-    Test(&rnd);
-  }
+TEST_P(ParameterizedHarnessTest, SimpleEmptyKey) {
+  Random rnd(test::RandomSeed() + 1);
+  Add("", "v");
+  Test(&rnd);
 }
 
-TEST_F(HarnessTest, SimpleSingle) {
-  auto args = GenerateArgList();
-  for (const auto& arg : args) {
-    Init(arg);
-    Random rnd(test::RandomSeed() + 2);
-    Add("abc", "v");
-    Test(&rnd);
-  }
+TEST_P(ParameterizedHarnessTest, SimpleSingle) {
+  Random rnd(test::RandomSeed() + 2);
+  Add("abc", "v");
+  Test(&rnd);
 }
 
-TEST_F(HarnessTest, SimpleMulti) {
-  auto args = GenerateArgList();
-  for (const auto& arg : args) {
-    Init(arg);
-    Random rnd(test::RandomSeed() + 3);
-    Add("abc", "v");
-    Add("abcd", "v");
-    Add("ac", "v2");
-    Test(&rnd);
-  }
+TEST_P(ParameterizedHarnessTest, SimpleMulti) {
+  Random rnd(test::RandomSeed() + 3);
+  Add("abc", "v");
+  Add("abcd", "v");
+  Add("ac", "v2");
+  Test(&rnd);
 }
 
-TEST_F(HarnessTest, SimpleSpecialKey) {
-  auto args = GenerateArgList();
-  for (const auto& arg : args) {
-    Init(arg);
-    Random rnd(test::RandomSeed() + 4);
-    Add("\xff\xff", "v3");
-    Test(&rnd);
-  }
+TEST_P(ParameterizedHarnessTest, SimpleSpecialKey) {
+  Random rnd(test::RandomSeed() + 4);
+  Add("\xff\xff", "v3");
+  Test(&rnd);
 }
 
-TEST_F(HarnessTest, FooterTests) {
+TEST(TableTest, FooterTests) {
   {
     // upconvert legacy block based
     std::string encoded;
@@ -3675,7 +3768,7 @@ TEST_F(HarnessTest, FooterTests) {
     footer.EncodeTo(&encoded);
     Footer decoded_footer;
     Slice encoded_slice(encoded);
-    decoded_footer.DecodeFrom(&encoded_slice);
+    ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
     ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber);
     ASSERT_EQ(decoded_footer.checksum(), kCRC32c);
     ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
@@ -3695,7 +3788,7 @@ TEST_F(HarnessTest, FooterTests) {
     footer.EncodeTo(&encoded);
     Footer decoded_footer;
     Slice encoded_slice(encoded);
-    decoded_footer.DecodeFrom(&encoded_slice);
+    ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
     ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber);
     ASSERT_EQ(decoded_footer.checksum(), kxxHash);
     ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
@@ -3715,7 +3808,7 @@ TEST_F(HarnessTest, FooterTests) {
     footer.EncodeTo(&encoded);
     Footer decoded_footer;
     Slice encoded_slice(encoded);
-    decoded_footer.DecodeFrom(&encoded_slice);
+    ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
     ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber);
     ASSERT_EQ(decoded_footer.checksum(), kxxHash64);
     ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
@@ -3736,7 +3829,7 @@ TEST_F(HarnessTest, FooterTests) {
     footer.EncodeTo(&encoded);
     Footer decoded_footer;
     Slice encoded_slice(encoded);
-    decoded_footer.DecodeFrom(&encoded_slice);
+    ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
     ASSERT_EQ(decoded_footer.table_magic_number(), kPlainTableMagicNumber);
     ASSERT_EQ(decoded_footer.checksum(), kCRC32c);
     ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
@@ -3756,7 +3849,7 @@ TEST_F(HarnessTest, FooterTests) {
     footer.EncodeTo(&encoded);
     Footer decoded_footer;
     Slice encoded_slice(encoded);
-    decoded_footer.DecodeFrom(&encoded_slice);
+    ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
     ASSERT_EQ(decoded_footer.table_magic_number(), kPlainTableMagicNumber);
     ASSERT_EQ(decoded_footer.checksum(), kxxHash);
     ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
@@ -3776,7 +3869,7 @@ TEST_F(HarnessTest, FooterTests) {
     footer.EncodeTo(&encoded);
     Footer decoded_footer;
     Slice encoded_slice(encoded);
-    decoded_footer.DecodeFrom(&encoded_slice);
+    ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
     ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber);
     ASSERT_EQ(decoded_footer.checksum(), kCRC32c);
     ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
@@ -3822,8 +3915,8 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) {
   TableConstructor c(BytewiseComparator());
   static Random rnd(301);
   for (int i = 0; i < kKeysInTable; i++) {
-    InternalKey k(RandomString(&rnd, kKeySize), 0, kTypeValue);
-    c.Add(k.Encode().ToString(), RandomString(&rnd, kValSize));
+    InternalKey k(rnd.RandomString(kKeySize), 0, kTypeValue);
+    c.Add(k.Encode().ToString(), rnd.RandomString(kValSize));
   }
 
   std::vector<std::string> keys;
@@ -3836,8 +3929,9 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) {
            &kvmap);
   auto reader = c.GetTableReader();
 
+  ReadOptions read_options;
   std::unique_ptr<InternalIterator> db_iter(reader->NewIterator(
-      ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
       /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   // Test point lookup
@@ -3934,12 +4028,12 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) {
     std::string prefix = "[" + std::to_string(i) + "]";
     for (int j = 0; j < 10; j++) {
       std::string key = prefix + std::to_string(j);
-      db->Put(ROCKSDB_NAMESPACE::WriteOptions(), key, "1");
+      ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), key, "1"));
     }
   }
 
   // Trigger compaction.
-  db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   delete db;
   // In the second round, turn whole_key_filtering off and expect
   // rocksdb still works.
@@ -4023,6 +4117,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
 
   // Helper function to get the contents of the table InternalIterator
   std::unique_ptr<TableReader> table_reader;
+  const ReadOptions read_options;
   std::function<InternalIterator*()> GetTableInternalIter = [&]() {
     std::unique_ptr<RandomAccessFileReader> file_reader(
         test::GetRandomAccessFileReader(
@@ -4034,7 +4129,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
         std::move(file_reader), ss_rw.contents().size(), &table_reader);
 
     return table_reader->NewIterator(
-        ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
         /*skip_filters=*/false, TableReaderCaller::kUncategorized);
   };
 
@@ -4046,7 +4141,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
   char current_c = 'a';
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     ParsedInternalKey pik;
-    ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
+    ASSERT_OK(ParseInternalKey(iter->key(), &pik, true /* log_err_key */));
 
     ASSERT_EQ(pik.type, ValueType::kTypeValue);
     ASSERT_EQ(pik.sequence, 0);
@@ -4067,7 +4162,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
   current_c = 'a';
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     ParsedInternalKey pik;
-    ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
+    ASSERT_OK(ParseInternalKey(iter->key(), &pik, true /* log_err_key */));
 
     ASSERT_EQ(pik.type, ValueType::kTypeValue);
     ASSERT_EQ(pik.sequence, 10);
@@ -4085,7 +4180,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
     ASSERT_TRUE(iter->Valid());
 
     ParsedInternalKey pik;
-    ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
+    ASSERT_OK(ParseInternalKey(iter->key(), &pik, true /* log_err_key */));
 
     ASSERT_EQ(pik.type, ValueType::kTypeValue);
     ASSERT_EQ(pik.sequence, 10);
@@ -4104,7 +4199,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
   current_c = 'a';
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     ParsedInternalKey pik;
-    ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
+    ASSERT_OK(ParseInternalKey(iter->key(), &pik, true /* log_err_key */));
 
     ASSERT_EQ(pik.type, ValueType::kTypeValue);
     ASSERT_EQ(pik.sequence, 3);
@@ -4123,7 +4218,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
     ASSERT_TRUE(iter->Valid());
 
     ParsedInternalKey pik;
-    ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
+    ASSERT_OK(ParseInternalKey(iter->key(), &pik, true /* log_err_key */));
 
     ASSERT_EQ(pik.type, ValueType::kTypeValue);
     ASSERT_EQ(pik.sequence, 3);
@@ -4205,8 +4300,9 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
                          GetPlainInternalComparator(options2.comparator)),
       std::move(file_reader), ss_rw.contents().size(), &table_reader));
 
+  ReadOptions read_options;
   std::unique_ptr<InternalIterator> db_iter(table_reader->NewIterator(
-      ReadOptions(), moptions2.prefix_extractor.get(), /*arena=*/nullptr,
+      read_options, moptions2.prefix_extractor.get(), /*arena=*/nullptr,
       /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   int expected_key = 1;
@@ -4273,8 +4369,10 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
     uint64_t file_size = ss_rw.contents().size();
 
     Footer footer;
-    ASSERT_OK(ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size,
-                                 &footer, kBlockBasedTableMagicNumber));
+    IOOptions opts;
+    ASSERT_OK(ReadFooterFromFile(opts, file, nullptr /* prefetch_buffer */,
+                                 file_size, &footer,
+                                 kBlockBasedTableMagicNumber));
 
     auto BlockFetchHelper = [&](const BlockHandle& handle, BlockType block_type,
                                 BlockContents* contents) {
@@ -4297,11 +4395,10 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
 
     BlockFetchHelper(metaindex_handle, BlockType::kMetaIndex,
                      &metaindex_contents);
-    Block metaindex_block(std::move(metaindex_contents),
-                          kDisableGlobalSequenceNumber);
+    Block metaindex_block(std::move(metaindex_contents));
 
     std::unique_ptr<InternalIterator> meta_iter(metaindex_block.NewDataIterator(
-        BytewiseComparator(), BytewiseComparator()));
+        BytewiseComparator(), kDisableGlobalSequenceNumber));
     bool found_properties_block = true;
     ASSERT_OK(SeekToPropertiesBlock(meta_iter.get(), &found_properties_block));
     ASSERT_TRUE(found_properties_block);
@@ -4314,8 +4411,7 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
 
     BlockFetchHelper(properties_handle, BlockType::kProperties,
                      &properties_contents);
-    Block properties_block(std::move(properties_contents),
-                           kDisableGlobalSequenceNumber);
+    Block properties_block(std::move(properties_contents));
 
     ASSERT_EQ(properties_block.NumRestarts(), 1u);
   }
@@ -4361,7 +4457,8 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
 
   // read footer
   Footer footer;
-  ASSERT_OK(ReadFooterFromFile(table_reader.get(),
+  IOOptions opts;
+  ASSERT_OK(ReadFooterFromFile(opts, table_reader.get(),
                                nullptr /* prefetch_buffer */, table_size,
                                &footer, kBlockBasedTableMagicNumber));
 
@@ -4376,12 +4473,12 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
       UncompressionDict::GetEmptyDict(), pcache_opts,
       nullptr /*memory_allocator*/);
   ASSERT_OK(block_fetcher.ReadBlockContents());
-  Block metaindex_block(std::move(metaindex_contents),
-                        kDisableGlobalSequenceNumber);
+  Block metaindex_block(std::move(metaindex_contents));
 
   // verify properties block comes last
   std::unique_ptr<InternalIterator> metaindex_iter{
-      metaindex_block.NewDataIterator(options.comparator, options.comparator)};
+      metaindex_block.NewDataIterator(options.comparator,
+                                      kDisableGlobalSequenceNumber)};
   uint64_t max_offset = 0;
   std::string key_at_max_offset;
   for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();
@@ -4458,9 +4555,10 @@ TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) {
 TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) {
   TailPrefetchStats tpstats;
   FilePrefetchBuffer buffer(nullptr, 0, 0, false, true);
-  buffer.TryReadFromCache(500, 10, nullptr);
-  buffer.TryReadFromCache(480, 10, nullptr);
-  buffer.TryReadFromCache(490, 10, nullptr);
+  IOOptions opts;
+  buffer.TryReadFromCache(opts, 500, 10, nullptr);
+  buffer.TryReadFromCache(opts, 480, 10, nullptr);
+  buffer.TryReadFromCache(opts, 490, 10, nullptr);
   ASSERT_EQ(480, buffer.min_offset_read());
 }
 
@@ -4483,9 +4581,9 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
   static Random rnd(1048);
   for (int i = 0; i < kNumKeys; i++) {
     // padding one "0" to mark existent keys.
-    std::string random_key(RandomString(&rnd, kKeySize - 1) + "1");
+    std::string random_key(rnd.RandomString(kKeySize - 1) + "1");
     InternalKey k(random_key, 0, kTypeValue);
-    c.Add(k.Encode().ToString(), RandomString(&rnd, kValSize));
+    c.Add(k.Encode().ToString(), rnd.RandomString(kValSize));
   }
 
   std::vector<std::string> keys;
@@ -4499,8 +4597,9 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
   auto reader = c.GetTableReader();
 
   std::unique_ptr<InternalIterator> seek_iter;
+  ReadOptions read_options;
   seek_iter.reset(reader->NewIterator(
-      ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
       /*skip_filters=*/false, TableReaderCaller::kUncategorized));
   for (int i = 0; i < 2; ++i) {
     ReadOptions ro;
@@ -4587,13 +4686,15 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnSeek) {
       /*skip_filters=*/false, TableReaderCaller::kUncategorized)));
   iter->SeekToFirst();
   ASSERT_FALSE(iter->Valid());
-  ASSERT_TRUE(iter->IsOutOfBound());
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
   iter.reset(new KeyConvertingIterator(reader->NewIterator(
       read_opt, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
       /*skip_filters=*/false, TableReaderCaller::kUncategorized)));
   iter->Seek("foo");
   ASSERT_FALSE(iter->Valid());
-  ASSERT_TRUE(iter->IsOutOfBound());
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
 }
 
 // BlockBasedTableIterator should invalidate itself and return
@@ -4628,7 +4729,7 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
   ASSERT_EQ("bar", iter->key());
   iter->Next();
   ASSERT_FALSE(iter->Valid());
-  ASSERT_TRUE(iter->IsOutOfBound());
+  ASSERT_TRUE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
   std::string ub2 = "foo_after";
   Slice ub_slice2(ub2);
   read_opt.iterate_upper_bound = &ub_slice2;
@@ -4640,7 +4741,7 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
   ASSERT_EQ("foo", iter->key());
   iter->Next();
   ASSERT_FALSE(iter->Valid());
-  ASSERT_FALSE(iter->IsOutOfBound());
+  ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
 }
 
 }  // namespace ROCKSDB_NAMESPACE