update source to Ceph Pacific 16.2.2

diff --git a/ceph/src/rocksdb/table/table_test.cc b/ceph/src/rocksdb/table/table_test.cc
index f217fe50aa9282210dcc223f61e22368da3018d1..2a24c99eb274c959b4f10db4310af4f1b7648b39 100644
--- a/ceph/src/rocksdb/table/table_test.cc
+++ b/ceph/src/rocksdb/table/table_test.cc
@@ -8,7 +8,6 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include <stdio.h>
-
 #include <algorithm>
 #include <iostream>
 #include <map>
 #include <string>
 #include <vector>
 
+#include "block_fetcher.h"
 #include "cache/lru_cache.h"
 #include "db/dbformat.h"
 #include "db/memtable.h"
 #include "db/write_batch_internal.h"
 #include "memtable/stl_wrappers.h"
+#include "meta_blocks.h"
 #include "monitoring/statistics.h"
 #include "port/port.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "rocksdb/file_checksum.h"
+#include "rocksdb/file_system.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/memtablerep.h"
 #include "rocksdb/perf_context.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/write_buffer_manager.h"
-#include "table/block.h"
-#include "table/block_based_table_builder.h"
-#include "table/block_based_table_factory.h"
-#include "table/block_based_table_reader.h"
-#include "table/block_builder.h"
-#include "table/block_fetcher.h"
+#include "table/block_based/block.h"
+#include "table/block_based/block_based_table_builder.h"
+#include "table/block_based/block_based_table_factory.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "table/block_based/block_builder.h"
+#include "table/block_based/flush_block_policy.h"
 #include "table/format.h"
 #include "table/get_context.h"
 #include "table/internal_iterator.h"
-#include "table/meta_blocks.h"
-#include "table/plain_table_factory.h"
+#include "table/plain/plain_table_factory.h"
 #include "table/scoped_arena_iterator.h"
 #include "table/sst_file_writer_collectors.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
 #include "util/compression.h"
+#include "util/file_checksum_helper.h"
 #include "util/random.h"
 #include "util/string_util.h"
-#include "util/sync_point.h"
-#include "util/testharness.h"
-#include "util/testutil.h"
 #include "utilities/merge_operators.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 extern const uint64_t kLegacyBlockBasedTableMagicNumber;
 extern const uint64_t kLegacyPlainTableMagicNumber;
@@ -62,6 +65,8 @@ extern const uint64_t kPlainTableMagicNumber;
 
 namespace {
 
+const std::string kDummyValue(10000, 'o');
+
 // DummyPropertiesCollector used to test BlockBasedTableProperties
 class DummyPropertiesCollector : public TablePropertiesCollector {
  public:
@@ -235,7 +240,7 @@ class BlockConstructor: public Constructor {
   }
   InternalIterator* NewIterator(
       const SliceTransform* /*prefix_extractor*/) const override {
-    return block_->NewIterator<DataBlockIter>(comparator_, comparator_);
+    return block_->NewDataIterator(comparator_, comparator_);
   }
 
  private:
@@ -276,6 +281,7 @@ class KeyConvertingIterator : public InternalIterator {
   void SeekToLast() override { iter_->SeekToLast(); }
   void Next() override { iter_->Next(); }
   void Prev() override { iter_->Prev(); }
+  bool IsOutOfBound() override { return iter_->IsOutOfBound(); }
 
   Slice key() const override {
     assert(Valid());
@@ -306,10 +312,13 @@ class TableConstructor: public Constructor {
  public:
   explicit TableConstructor(const Comparator* cmp,
                             bool convert_to_internal_key = false,
-                            int level = -1)
+                            int level = -1, SequenceNumber largest_seqno = 0)
       : Constructor(cmp),
+        largest_seqno_(largest_seqno),
         convert_to_internal_key_(convert_to_internal_key),
-        level_(level) {}
+        level_(level) {
+    env_ = ROCKSDB_NAMESPACE::Env::Default();
+  }
   ~TableConstructor() override { Reset(); }
 
   Status FinishImpl(const Options& options, const ImmutableCFOptions& ioptions,
@@ -324,6 +333,14 @@ class TableConstructor: public Constructor {
     std::unique_ptr<TableBuilder> builder;
     std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
         int_tbl_prop_collector_factories;
+
+    if (largest_seqno_ != 0) {
+      // Pretend that it's an external file written by SstFileWriter.
+      int_tbl_prop_collector_factories.emplace_back(
+          new SstFileWriterPropertiesCollectorFactory(2 /* version */,
+                                                      0 /* global_seqno*/));
+    }
+
     std::string column_family_name;
     builder.reset(ioptions.table_factory->NewTableBuilder(
         TableBuilderOptions(ioptions, moptions, internal_comparator,
@@ -360,7 +377,7 @@ class TableConstructor: public Constructor {
     return ioptions.table_factory->NewTableReader(
         TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
                            internal_comparator, !kSkipFilters, !kImmortal,
-                           level_),
+                           level_, largest_seqno_, &block_cache_tracer_),
         std::move(file_reader_), TEST_GetSink()->contents().size(),
         &table_reader_);
   }
@@ -368,7 +385,9 @@ class TableConstructor: public Constructor {
   InternalIterator* NewIterator(
       const SliceTransform* prefix_extractor) const override {
     ReadOptions ro;
-    InternalIterator* iter = table_reader_->NewIterator(ro, prefix_extractor);
+    InternalIterator* iter = table_reader_->NewIterator(
+        ro, prefix_extractor, /*arena=*/nullptr, /*skip_filters=*/false,
+        TableReaderCaller::kUncategorized);
     if (convert_to_internal_key_) {
       return new KeyConvertingIterator(iter);
     } else {
@@ -380,9 +399,11 @@ class TableConstructor: public Constructor {
     if (convert_to_internal_key_) {
       InternalKey ikey(key, kMaxSequenceNumber, kTypeValue);
       const Slice skey = ikey.Encode();
-      return table_reader_->ApproximateOffsetOf(skey);
+      return table_reader_->ApproximateOffsetOf(
+          skey, TableReaderCaller::kUncategorized);
     }
-    return table_reader_->ApproximateOffsetOf(key);
+    return table_reader_->ApproximateOffsetOf(
+        key, TableReaderCaller::kUncategorized);
   }
 
   virtual Status Reopen(const ImmutableCFOptions& ioptions,
@@ -407,9 +428,12 @@ class TableConstructor: public Constructor {
   bool ConvertToInternalKey() { return convert_to_internal_key_; }
 
   test::StringSink* TEST_GetSink() {
-    return static_cast<test::StringSink*>(file_writer_->writable_file());
+    return ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(
+        file_writer_.get());
   }
 
+  BlockCacheTracer block_cache_tracer_;
+
  private:
   void Reset() {
     uniq_id_ = 0;
@@ -422,6 +446,7 @@ class TableConstructor: public Constructor {
   std::unique_ptr<WritableFileWriter> file_writer_;
   std::unique_ptr<RandomAccessFileReader> file_reader_;
   std::unique_ptr<TableReader> table_reader_;
+  SequenceNumber largest_seqno_;
   bool convert_to_internal_key_;
   int level_;
 
@@ -429,6 +454,7 @@ class TableConstructor: public Constructor {
 
   static uint64_t cur_uniq_id_;
   EnvOptions soptions;
+  Env* env_;
 };
 uint64_t TableConstructor::cur_uniq_id_ = 1;
 
@@ -1047,7 +1073,9 @@ class BlockBasedTableTest
     : public TableTest,
       virtual public ::testing::WithParamInterface<uint32_t> {
  public:
-  BlockBasedTableTest() : format_(GetParam()) {}
+  BlockBasedTableTest() : format_(GetParam()) {
+    env_ = ROCKSDB_NAMESPACE::Env::Default();
+  }
 
   BlockBasedTableOptions GetBlockBasedTableOptions() {
     BlockBasedTableOptions options;
@@ -1055,16 +1083,210 @@ class BlockBasedTableTest
     return options;
   }
 
+  void SetupTracingTest(TableConstructor* c) {
+    test_path_ = test::PerThreadDBPath("block_based_table_tracing_test");
+    EXPECT_OK(env_->CreateDir(test_path_));
+    trace_file_path_ = test_path_ + "/block_cache_trace_file";
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    EXPECT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_file_path_,
+                                 &trace_writer));
+    c->block_cache_tracer_.StartTrace(env_, trace_opt, std::move(trace_writer));
+    {
+      std::string user_key = "k01";
+      InternalKey internal_key(user_key, 0, kTypeValue);
+      std::string encoded_key = internal_key.Encode().ToString();
+      c->Add(encoded_key, kDummyValue);
+    }
+    {
+      std::string user_key = "k02";
+      InternalKey internal_key(user_key, 0, kTypeValue);
+      std::string encoded_key = internal_key.Encode().ToString();
+      c->Add(encoded_key, kDummyValue);
+    }
+  }
+
+  void VerifyBlockAccessTrace(
+      TableConstructor* c,
+      const std::vector<BlockCacheTraceRecord>& expected_records) {
+    c->block_cache_tracer_.EndTrace();
+
+    std::unique_ptr<TraceReader> trace_reader;
+    Status s =
+        NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader);
+    EXPECT_OK(s);
+    BlockCacheTraceReader reader(std::move(trace_reader));
+    BlockCacheTraceHeader header;
+    EXPECT_OK(reader.ReadHeader(&header));
+    uint32_t index = 0;
+    while (s.ok()) {
+      BlockCacheTraceRecord access;
+      s = reader.ReadAccess(&access);
+      if (!s.ok()) {
+        break;
+      }
+      ASSERT_LT(index, expected_records.size());
+      EXPECT_NE("", access.block_key);
+      EXPECT_EQ(access.block_type, expected_records[index].block_type);
+      EXPECT_GT(access.block_size, 0);
+      EXPECT_EQ(access.caller, expected_records[index].caller);
+      EXPECT_EQ(access.no_insert, expected_records[index].no_insert);
+      EXPECT_EQ(access.is_cache_hit, expected_records[index].is_cache_hit);
+      // Get
+      if (access.caller == TableReaderCaller::kUserGet) {
+        EXPECT_EQ(access.referenced_key,
+                  expected_records[index].referenced_key);
+        EXPECT_EQ(access.get_id, expected_records[index].get_id);
+        EXPECT_EQ(access.get_from_user_specified_snapshot,
+                  expected_records[index].get_from_user_specified_snapshot);
+        if (access.block_type == TraceType::kBlockTraceDataBlock) {
+          EXPECT_GT(access.referenced_data_size, 0);
+          EXPECT_GT(access.num_keys_in_block, 0);
+          EXPECT_EQ(access.referenced_key_exist_in_block,
+                    expected_records[index].referenced_key_exist_in_block);
+        }
+      } else {
+        EXPECT_EQ(access.referenced_key, "");
+        EXPECT_EQ(access.get_id, 0);
+        EXPECT_TRUE(access.get_from_user_specified_snapshot == Boolean::kFalse);
+        EXPECT_EQ(access.referenced_data_size, 0);
+        EXPECT_EQ(access.num_keys_in_block, 0);
+        EXPECT_TRUE(access.referenced_key_exist_in_block == Boolean::kFalse);
+      }
+      index++;
+    }
+    EXPECT_EQ(index, expected_records.size());
+    EXPECT_OK(env_->DeleteFile(trace_file_path_));
+    EXPECT_OK(env_->DeleteDir(test_path_));
+  }
+
  protected:
   uint64_t IndexUncompressedHelper(bool indexCompress);
 
  private:
   uint32_t format_;
+  Env* env_;
+  std::string trace_file_path_;
+  std::string test_path_;
 };
 class PlainTableTest : public TableTest {};
 class TablePropertyTest : public testing::Test {};
 class BBTTailPrefetchTest : public TableTest {};
 
+// The helper class to test the file checksum
+class FileChecksumTestHelper {
+ public:
+  FileChecksumTestHelper(bool convert_to_internal_key = false)
+      : convert_to_internal_key_(convert_to_internal_key) {
+    sink_ = new test::StringSink();
+  }
+  ~FileChecksumTestHelper() {}
+
+  void CreateWriteableFile() {
+    file_writer_.reset(test::GetWritableFileWriter(sink_, "" /* don't care */));
+  }
+
+  void SetFileChecksumFunc(FileChecksumFunc* checksum_func) {
+    if (file_writer_ != nullptr) {
+      file_writer_->TEST_SetFileChecksumFunc(checksum_func);
+    }
+  }
+
+  WritableFileWriter* GetFileWriter() { return file_writer_.get(); }
+
+  Status ResetTableBuilder(std::unique_ptr<TableBuilder>&& builder) {
+    assert(builder != nullptr);
+    table_builder_ = std::move(builder);
+    return Status::OK();
+  }
+
+  void AddKVtoKVMap(int num_entries) {
+    Random rnd(test::RandomSeed());
+    for (int i = 0; i < num_entries; i++) {
+      std::string v;
+      test::RandomString(&rnd, 100, &v);
+      kv_map_[test::RandomKey(&rnd, 20)] = v;
+    }
+  }
+
+  Status WriteKVAndFlushTable() {
+    for (const auto kv : kv_map_) {
+      if (convert_to_internal_key_) {
+        ParsedInternalKey ikey(kv.first, kMaxSequenceNumber, kTypeValue);
+        std::string encoded;
+        AppendInternalKey(&encoded, ikey);
+        table_builder_->Add(encoded, kv.second);
+      } else {
+        table_builder_->Add(kv.first, kv.second);
+      }
+      EXPECT_TRUE(table_builder_->status().ok());
+    }
+    Status s = table_builder_->Finish();
+    file_writer_->Flush();
+    EXPECT_TRUE(s.ok());
+
+    EXPECT_EQ(sink_->contents().size(), table_builder_->FileSize());
+    return s;
+  }
+
+  std::string GetFileChecksum() { return table_builder_->GetFileChecksum(); }
+
+  const char* GetFileChecksumFuncName() {
+    return table_builder_->GetFileChecksumFuncName();
+  }
+
+  Status CalculateFileChecksum(FileChecksumFunc* file_checksum_func,
+                               std::string* checksum) {
+    assert(file_checksum_func != nullptr);
+    cur_uniq_id_ = checksum_uniq_id_++;
+    test::StringSink* ss_rw =
+        ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(
+            file_writer_.get());
+    file_reader_.reset(test::GetRandomAccessFileReader(
+        new test::StringSource(ss_rw->contents())));
+    std::unique_ptr<char[]> scratch(new char[2048]);
+    Slice result;
+    uint64_t offset = 0;
+    std::string tmp_checksum;
+    bool first_read = true;
+    Status s;
+    s = file_reader_->Read(offset, 2048, &result, scratch.get(), false);
+    if (!s.ok()) {
+      return s;
+    }
+    while (result.size() != 0) {
+      if (first_read) {
+        first_read = false;
+        tmp_checksum = file_checksum_func->Value(scratch.get(), result.size());
+      } else {
+        tmp_checksum = file_checksum_func->Extend(tmp_checksum, scratch.get(),
+                                                  result.size());
+      }
+      offset += static_cast<uint64_t>(result.size());
+      s = file_reader_->Read(offset, 2048, &result, scratch.get(), false);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+    EXPECT_EQ(offset, static_cast<uint64_t>(table_builder_->FileSize()));
+    *checksum = tmp_checksum;
+    return Status::OK();
+  }
+
+ private:
+  bool convert_to_internal_key_;
+  uint64_t cur_uniq_id_;
+  std::unique_ptr<WritableFileWriter> file_writer_;
+  std::unique_ptr<RandomAccessFileReader> file_reader_;
+  std::unique_ptr<TableBuilder> table_builder_;
+  stl_wrappers::KVMap kv_map_;
+  test::StringSink* sink_;
+
+  static uint64_t checksum_uniq_id_;
+};
+
+uint64_t FileChecksumTestHelper::checksum_uniq_id_ = 1;
+
 INSTANTIATE_TEST_CASE_P(FormatDef, BlockBasedTableTest,
                         testing::Values(test::kDefaultFormatVersion));
 INSTANTIATE_TEST_CASE_P(FormatLatest, BlockBasedTableTest,
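As an aside, CalculateFileChecksum() above re-derives the checksum by streaming the finished file in 2048-byte chunks: the first chunk goes through FileChecksumFunc::Value() and every later chunk is folded in with Extend(). Below is a minimal sketch of that chunked pattern, using a toy XOR checksum purely for illustration; ToyXorChecksum and ChecksumOf are hypothetical names, not RocksDB APIs.

#include <algorithm>
#include <cstddef>
#include <string>

// Toy checksum mirroring the Value()/Extend() contract that
// FileChecksumTestHelper::CalculateFileChecksum() relies on (illustrative only).
struct ToyXorChecksum {
  // Checksum of the first chunk.
  std::string Value(const char* data, size_t n) {
    return Extend(std::string(1, '\0'), data, n);
  }
  // Fold another chunk into an existing checksum.
  std::string Extend(const std::string& init, const char* data, size_t n) {
    char acc = init.empty() ? '\0' : init[0];
    for (size_t i = 0; i < n; ++i) {
      acc = static_cast<char>(acc ^ data[i]);
    }
    return std::string(1, acc);
  }
};

// Feed a buffer in fixed-size chunks, exactly like the helper's read loop.
std::string ChecksumOf(const std::string& file_contents, size_t chunk = 2048) {
  ToyXorChecksum func;
  std::string sum;
  for (size_t off = 0; off < file_contents.size(); off += chunk) {
    size_t n = std::min(chunk, file_contents.size() - off);
    sum = (off == 0) ? func.Value(file_contents.data() + off, n)
                     : func.Extend(sum, file_contents.data() + off, n);
  }
  return sum;
}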
@@ -1478,7 +1700,7 @@ TEST_P(BlockBasedTableTest, PrefetchTest) {
 
 TEST_P(BlockBasedTableTest, TotalOrderSeekOnHashIndex) {
   BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
-  for (int i = 0; i < 4; ++i) {
+  for (int i = 0; i <= 5; ++i) {
     Options options;
     // Make each key/value an individual block
     table_options.block_size = 64;
@@ -1509,11 +1731,16 @@ TEST_P(BlockBasedTableTest, TotalOrderSeekOnHashIndex) {
       options.prefix_extractor.reset(NewFixedPrefixTransform(4));
       break;
     case 4:
-    default:
-      // Binary search index
+      // Two-level index
       table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
       options.table_factory.reset(new BlockBasedTableFactory(table_options));
       break;
+    case 5:
+      // Binary search with first key
+      table_options.index_type =
+          BlockBasedTableOptions::kBinarySearchWithFirstKey;
+      options.table_factory.reset(new BlockBasedTableFactory(table_options));
+      break;
     }
 
     TableConstructor c(BytewiseComparator(),
@@ -1536,8 +1763,9 @@ TEST_P(BlockBasedTableTest, TotalOrderSeekOnHashIndex) {
     auto* reader = c.GetTableReader();
     ReadOptions ro;
     ro.total_order_seek = true;
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
+    std::unique_ptr<InternalIterator> iter(reader->NewIterator(
+        ro, moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
     iter->Seek(InternalKey("b", 0, kTypeValue).Encode());
     ASSERT_OK(iter->status());
@@ -1595,8 +1823,9 @@ TEST_P(BlockBasedTableTest, NoopTransformSeek) {
   for (int i = 0; i < 2; ++i) {
     ReadOptions ro;
     ro.total_order_seek = (i == 0);
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
+    std::unique_ptr<InternalIterator> iter(reader->NewIterator(
+        ro, moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
     iter->Seek(key.Encode());
     ASSERT_OK(iter->status());
@@ -1633,8 +1862,9 @@ TEST_P(BlockBasedTableTest, SkipPrefixBloomFilter) {
   const MutableCFOptions new_moptions(options);
   c.Reopen(new_ioptions, new_moptions);
   auto reader = c.GetTableReader();
-  std::unique_ptr<InternalIterator> db_iter(
-      reader->NewIterator(ReadOptions(), new_moptions.prefix_extractor.get()));
+  std::unique_ptr<InternalIterator> db_iter(reader->NewIterator(
+      ReadOptions(), new_moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   // Test point lookup
   // only one kv
@@ -1654,10 +1884,10 @@ static std::string RandomString(Random* rnd, int len) {
 }
 
 void AddInternalKey(TableConstructor* c, const std::string& prefix,
-                    int /*suffix_len*/ = 800) {
+                    std::string value = "v", int /*suffix_len*/ = 800) {
   static Random rnd(1023);
   InternalKey k(prefix + RandomString(&rnd, 800), 0, kTypeValue);
-  c->Add(k.Encode().ToString(), "v");
+  c->Add(k.Encode().ToString(), value);
 }
 
 void TableTest::IndexTest(BlockBasedTableOptions table_options) {
@@ -1700,8 +1930,9 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) {
   ASSERT_EQ(5u, props->num_data_blocks);
 
   // TODO(Zhongyi): update test to use MutableCFOptions
-  std::unique_ptr<InternalIterator> index_iter(
-      reader->NewIterator(ReadOptions(), moptions.prefix_extractor.get()));
+  std::unique_ptr<InternalIterator> index_iter(reader->NewIterator(
+      ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   // -- Find keys do not exist, but have common prefix.
   std::vector<std::string> prefixes = {"001", "003", "005", "007", "009"};
@@ -1742,7 +1973,8 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) {
     auto key = prefixes[i] + "9";
     index_iter->Seek(InternalKey(key, 0, kTypeValue).Encode());
 
-    ASSERT_OK(index_iter->status());
+    ASSERT_TRUE(index_iter->status().ok() || index_iter->status().IsNotFound());
+    ASSERT_TRUE(!index_iter->status().IsNotFound() || !index_iter->Valid());
     if (i == prefixes.size() - 1) {
       // last key
       ASSERT_TRUE(!index_iter->Valid());
@@ -1769,6 +2001,19 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) {
       ASSERT_TRUE(BytewiseComparator()->Compare(prefix, ukey_prefix) < 0);
     }
   }
+  for (const auto& prefix : non_exist_prefixes) {
+    index_iter->SeekForPrev(InternalKey(prefix, 0, kTypeValue).Encode());
+    // regular_iter->Seek(prefix);
+
+    ASSERT_OK(index_iter->status());
+    // SeekForPrev to non-existing prefixes should yield either an invalid
+    // iterator, or a key whose prefix is smaller than the target.
+    if (index_iter->Valid()) {
+      Slice ukey = ExtractUserKey(index_iter->key());
+      Slice ukey_prefix = options.prefix_extractor->Transform(ukey);
+      ASSERT_TRUE(BytewiseComparator()->Compare(prefix, ukey_prefix) > 0);
+    }
+  }
   c.ResetTableReader();
 }
 
@@ -1796,6 +2041,325 @@ TEST_P(BlockBasedTableTest, PartitionIndexTest) {
   }
 }
 
+TEST_P(BlockBasedTableTest, IndexSeekOptimizationIncomplete) {
+  std::unique_ptr<InternalKeyComparator> comparator(
+      new InternalKeyComparator(BytewiseComparator()));
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  Options options;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  const ImmutableCFOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+
+  TableConstructor c(BytewiseComparator());
+  AddInternalKey(&c, "pika");
+
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  c.Finish(options, ioptions, moptions, table_options, *comparator, &keys,
+           &kvmap);
+  ASSERT_EQ(1, keys.size());
+
+  auto reader = c.GetTableReader();
+  ReadOptions ropt;
+  ropt.read_tier = ReadTier::kBlockCacheTier;
+  std::unique_ptr<InternalIterator> iter(reader->NewIterator(
+      ropt, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
+
+  auto ikey = [](Slice user_key) {
+    return InternalKey(user_key, 0, kTypeValue).Encode().ToString();
+  };
+
+  iter->Seek(ikey("pika"));
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_TRUE(iter->status().IsIncomplete());
+
+  // This used to crash at some point.
+  iter->Seek(ikey("pika"));
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_TRUE(iter->status().IsIncomplete());
+}
+
+TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey1) {
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  table_options.index_type = BlockBasedTableOptions::kBinarySearchWithFirstKey;
+  IndexTest(table_options);
+}
+
+class CustomFlushBlockPolicy : public FlushBlockPolicyFactory,
+                               public FlushBlockPolicy {
+ public:
+  explicit CustomFlushBlockPolicy(std::vector<int> keys_per_block)
+      : keys_per_block_(keys_per_block) {}
+
+  const char* Name() const override { return "table_test"; }
+  FlushBlockPolicy* NewFlushBlockPolicy(const BlockBasedTableOptions&,
+                                        const BlockBuilder&) const override {
+    return new CustomFlushBlockPolicy(keys_per_block_);
+  }
+
+  bool Update(const Slice&, const Slice&) override {
+    if (keys_in_current_block_ >= keys_per_block_.at(current_block_idx_)) {
+      ++current_block_idx_;
+      keys_in_current_block_ = 1;
+      return true;
+    }
+
+    ++keys_in_current_block_;
+    return false;
+  }
+
+  std::vector<int> keys_per_block_;
+
+  int current_block_idx_ = 0;
+  int keys_in_current_block_ = 0;
+};
+
+TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
+  for (int use_first_key = 0; use_first_key < 2; ++use_first_key) {
+    SCOPED_TRACE("use_first_key = " + std::to_string(use_first_key));
+    BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+    table_options.index_type =
+        use_first_key ? BlockBasedTableOptions::kBinarySearchWithFirstKey
+                      : BlockBasedTableOptions::kBinarySearch;
+    table_options.block_cache = NewLRUCache(10000);  // fits all blocks
+    table_options.index_shortening =
+        BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
+    table_options.flush_block_policy_factory =
+        std::make_shared<CustomFlushBlockPolicy>(std::vector<int>{2, 1, 3, 2});
+    Options options;
+    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+    options.statistics = CreateDBStatistics();
+    Statistics* stats = options.statistics.get();
+    std::unique_ptr<InternalKeyComparator> comparator(
+        new InternalKeyComparator(BytewiseComparator()));
+    const ImmutableCFOptions ioptions(options);
+    const MutableCFOptions moptions(options);
+
+    TableConstructor c(BytewiseComparator());
+
+    // Block 0.
+    AddInternalKey(&c, "aaaa", "v0");
+    AddInternalKey(&c, "aaac", "v1");
+
+    // Block 1.
+    AddInternalKey(&c, "aaca", "v2");
+
+    // Block 2.
+    AddInternalKey(&c, "caaa", "v3");
+    AddInternalKey(&c, "caac", "v4");
+    AddInternalKey(&c, "caae", "v5");
+
+    // Block 3.
+    AddInternalKey(&c, "ccaa", "v6");
+    AddInternalKey(&c, "ccac", "v7");
+
+    // Write the file.
+    std::vector<std::string> keys;
+    stl_wrappers::KVMap kvmap;
+    c.Finish(options, ioptions, moptions, table_options, *comparator, &keys,
+             &kvmap);
+    ASSERT_EQ(8, keys.size());
+
+    auto reader = c.GetTableReader();
+    auto props = reader->GetTableProperties();
+    ASSERT_EQ(4u, props->num_data_blocks);
+    std::unique_ptr<InternalIterator> iter(reader->NewIterator(
+        ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized));
+
+    // Shouldn't have read data blocks before iterator is seeked.
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    auto ikey = [](Slice user_key) {
+      return InternalKey(user_key, 0, kTypeValue).Encode().ToString();
+    };
+
+    // Seek to a key between blocks. If index contains first key, we shouldn't
+    // read any data blocks until value is requested.
+    iter->Seek(ikey("aaba"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[2], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 0 : 1,
+              stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ("v2", iter->value().ToString());
+    EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Seek to the middle of a block. The block should be read right away.
+    iter->Seek(ikey("caab"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[4], iter->key().ToString());
+    EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ("v4", iter->value().ToString());
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Seek to just before the same block and don't access value.
+    // The iterator should keep pinning the block contents.
+    iter->Seek(ikey("baaa"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[3], iter->key().ToString());
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Seek to the same block again to check that the block is still pinned.
+    iter->Seek(ikey("caae"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[5], iter->key().ToString());
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ("v5", iter->value().ToString());
+    EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Step forward and fall through to the next block. Don't access value.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[6], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 2 : 3,
+              stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Step forward again. Block should be read.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[7], iter->key().ToString());
+    EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ("v7", iter->value().ToString());
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Step forward and reach the end.
+    iter->Next();
+    EXPECT_FALSE(iter->Valid());
+    EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Seek to a single-key block and step forward without accessing value.
+    iter->Seek(ikey("aaca"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[2], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 0 : 1,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[3], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 1 : 2,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ("v3", iter->value().ToString());
+    EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+
+    // Seek between blocks and step back without accessing value.
+    iter->Seek(ikey("aaca"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[2], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 2 : 3,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+
+    iter->Prev();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[1], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 2 : 3,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    // All blocks are in cache now, there'll be no more misses ever.
+    EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    EXPECT_EQ("v1", iter->value().ToString());
+
+    // Next into the next block again.
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[2], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 2 : 4,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Seek to first and step back without accessing value.
+    iter->SeekToFirst();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[0], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 2 : 5,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    iter->Prev();
+    EXPECT_FALSE(iter->Valid());
+    EXPECT_EQ(use_first_key ? 2 : 5,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    // Do some SeekForPrev() and SeekToLast() just to cover all methods.
+    iter->SeekForPrev(ikey("caad"));
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[4], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 3 : 6,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ("v4", iter->value().ToString());
+    EXPECT_EQ(use_first_key ? 3 : 6,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    iter->SeekToLast();
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(keys[7], iter->key().ToString());
+    EXPECT_EQ(use_first_key ? 4 : 7,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+    EXPECT_EQ("v7", iter->value().ToString());
+    EXPECT_EQ(use_first_key ? 4 : 7,
+              stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+
+    c.ResetTableReader();
+  }
+}
+
+TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) {
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  table_options.index_type = BlockBasedTableOptions::kBinarySearchWithFirstKey;
+  table_options.block_cache = NewLRUCache(10000);
+  Options options;
+  options.statistics = CreateDBStatistics();
+  Statistics* stats = options.statistics.get();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  std::unique_ptr<InternalKeyComparator> comparator(
+      new InternalKeyComparator(BytewiseComparator()));
+  const ImmutableCFOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+
+  TableConstructor c(BytewiseComparator(), /* convert_to_internal_key */ false,
+                     /* level */ -1, /* largest_seqno */ 42);
+
+  c.Add(InternalKey("b", 0, kTypeValue).Encode().ToString(), "x");
+  c.Add(InternalKey("c", 0, kTypeValue).Encode().ToString(), "y");
+
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  c.Finish(options, ioptions, moptions, table_options, *comparator, &keys,
+           &kvmap);
+  ASSERT_EQ(2, keys.size());
+
+  auto reader = c.GetTableReader();
+  auto props = reader->GetTableProperties();
+  ASSERT_EQ(1u, props->num_data_blocks);
+  std::unique_ptr<InternalIterator> iter(reader->NewIterator(
+      ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
+
+  iter->Seek(InternalKey("a", 0, kTypeValue).Encode().ToString());
+  ASSERT_TRUE(iter->Valid());
+  EXPECT_EQ(InternalKey("b", 42, kTypeValue).Encode().ToString(),
+            iter->key().ToString());
+  EXPECT_NE(keys[0], iter->key().ToString());
+  // Key should have been served from index, without reading data blocks.
+  EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+
+  EXPECT_EQ("x", iter->value().ToString());
+  EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
+  EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
+  EXPECT_EQ(InternalKey("b", 42, kTypeValue).Encode().ToString(),
+            iter->key().ToString());
+
+  c.ResetTableReader();
+}
+
 // It's very hard to figure out the index block size of a block accurately.
 // To make sure we get the index size, we just make sure as key number
 // grows, the filter block size also grows.
@@ -1867,6 +2431,187 @@ TEST_P(BlockBasedTableTest, NumBlockStat) {
   c.ResetTableReader();
 }
 
+TEST_P(BlockBasedTableTest, TracingGetTest) {
+  TableConstructor c(BytewiseComparator());
+  Options options;
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  options.create_if_missing = true;
+  table_options.block_cache = NewLRUCache(1024 * 1024, 0);
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  SetupTracingTest(&c);
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  ImmutableCFOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  c.Finish(options, ioptions, moptions, table_options,
+           GetPlainInternalComparator(options.comparator), &keys, &kvmap);
+  std::string user_key = "k01";
+  InternalKey internal_key(user_key, 0, kTypeValue);
+  std::string encoded_key = internal_key.Encode().ToString();
+  for (uint32_t i = 1; i <= 2; i++) {
+    PinnableSlice value;
+    GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
+                           GetContext::kNotFound, user_key, &value, nullptr,
+                           nullptr, true, nullptr, nullptr, nullptr, nullptr,
+                           nullptr, nullptr, /*tracing_get_id=*/i);
+    get_perf_context()->Reset();
+    ASSERT_OK(c.GetTableReader()->Get(ReadOptions(), encoded_key, &get_context,
+                                      moptions.prefix_extractor.get()));
+    ASSERT_EQ(get_context.State(), GetContext::kFound);
+    ASSERT_EQ(value.ToString(), kDummyValue);
+  }
+
+  // Verify traces.
+  std::vector<BlockCacheTraceRecord> expected_records;
+  // The first two records should be prefetching index and filter blocks.
+  BlockCacheTraceRecord record;
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kPrefetch;
+  record.is_cache_hit = Boolean::kFalse;
+  record.no_insert = Boolean::kFalse;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceFilterBlock;
+  expected_records.push_back(record);
+  // Then we should have three records for one index, one filter, and one data
+  // block access.
+  record.get_id = 1;
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kUserGet;
+  record.get_from_user_specified_snapshot = Boolean::kFalse;
+  record.referenced_key = encoded_key;
+  record.referenced_key_exist_in_block = Boolean::kTrue;
+  record.is_cache_hit = Boolean::kTrue;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceFilterBlock;
+  expected_records.push_back(record);
+  record.is_cache_hit = Boolean::kFalse;
+  record.block_type = TraceType::kBlockTraceDataBlock;
+  expected_records.push_back(record);
+  // The second get should all observe cache hits.
+  record.is_cache_hit = Boolean::kTrue;
+  record.get_id = 2;
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kUserGet;
+  record.get_from_user_specified_snapshot = Boolean::kFalse;
+  record.referenced_key = encoded_key;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceFilterBlock;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceDataBlock;
+  expected_records.push_back(record);
+  VerifyBlockAccessTrace(&c, expected_records);
+  c.ResetTableReader();
+}
+
+TEST_P(BlockBasedTableTest, TracingApproximateOffsetOfTest) {
+  TableConstructor c(BytewiseComparator());
+  Options options;
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  options.create_if_missing = true;
+  table_options.block_cache = NewLRUCache(1024 * 1024, 0);
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  SetupTracingTest(&c);
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  ImmutableCFOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  c.Finish(options, ioptions, moptions, table_options,
+           GetPlainInternalComparator(options.comparator), &keys, &kvmap);
+  for (uint32_t i = 1; i <= 2; i++) {
+    std::string user_key = "k01";
+    InternalKey internal_key(user_key, 0, kTypeValue);
+    std::string encoded_key = internal_key.Encode().ToString();
+    c.GetTableReader()->ApproximateOffsetOf(
+        encoded_key, TableReaderCaller::kUserApproximateSize);
+  }
+  // Verify traces.
+  std::vector<BlockCacheTraceRecord> expected_records;
+  // The first two records should be prefetching index and filter blocks.
+  BlockCacheTraceRecord record;
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kPrefetch;
+  record.is_cache_hit = Boolean::kFalse;
+  record.no_insert = Boolean::kFalse;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceFilterBlock;
+  expected_records.push_back(record);
+  // Then we should have two records for only index blocks.
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kUserApproximateSize;
+  record.is_cache_hit = Boolean::kTrue;
+  expected_records.push_back(record);
+  expected_records.push_back(record);
+  VerifyBlockAccessTrace(&c, expected_records);
+  c.ResetTableReader();
+}
+
+TEST_P(BlockBasedTableTest, TracingIterator) {
+  TableConstructor c(BytewiseComparator());
+  Options options;
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  options.create_if_missing = true;
+  table_options.block_cache = NewLRUCache(1024 * 1024, 0);
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  SetupTracingTest(&c);
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  ImmutableCFOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  c.Finish(options, ioptions, moptions, table_options,
+           GetPlainInternalComparator(options.comparator), &keys, &kvmap);
+
+  for (uint32_t i = 1; i <= 2; i++) {
+    std::unique_ptr<InternalIterator> iter(c.GetTableReader()->NewIterator(
+        ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUserIterator));
+    iter->SeekToFirst();
+    while (iter->Valid()) {
+      iter->key();
+      iter->value();
+      iter->Next();
+    }
+    ASSERT_OK(iter->status());
+    iter.reset();
+  }
+
+  // Verify traces.
+  std::vector<BlockCacheTraceRecord> expected_records;
+  // The first two records should be prefetching index and filter blocks.
+  BlockCacheTraceRecord record;
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kPrefetch;
+  record.is_cache_hit = Boolean::kFalse;
+  record.no_insert = Boolean::kFalse;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceFilterBlock;
+  expected_records.push_back(record);
+  // Then we should have three records for index and two data block access.
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.caller = TableReaderCaller::kUserIterator;
+  record.is_cache_hit = Boolean::kTrue;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceDataBlock;
+  record.is_cache_hit = Boolean::kFalse;
+  expected_records.push_back(record);
+  expected_records.push_back(record);
+  // When we iterate this file for the second time, we should observe all cache
+  // hits.
+  record.block_type = TraceType::kBlockTraceIndexBlock;
+  record.is_cache_hit = Boolean::kTrue;
+  expected_records.push_back(record);
+  record.block_type = TraceType::kBlockTraceDataBlock;
+  expected_records.push_back(record);
+  expected_records.push_back(record);
+  VerifyBlockAccessTrace(&c, expected_records);
+  c.ResetTableReader();
+}
+
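
The three tracing tests above share the same write-then-verify flow: StartTrace() with a writer from NewFileTraceWriter(), perform the reads, EndTrace(), then replay the file with BlockCacheTraceReader. The following is a minimal reader-side sketch using only the calls exercised above, assuming the same includes and ROCKSDB_NAMESPACE context as this test file; DumpBlockCacheTrace itself is a hypothetical helper shown only for illustration.

#include <cstdio>
#include <memory>
#include <string>

// Sketch: print every access record from a block cache trace file produced by
// BlockCacheTracer, as in SetupTracingTest()/VerifyBlockAccessTrace() above.
void DumpBlockCacheTrace(Env* env, const std::string& trace_file_path) {
  std::unique_ptr<TraceReader> trace_reader;
  Status s =
      NewFileTraceReader(env, EnvOptions(), trace_file_path, &trace_reader);
  if (!s.ok()) {
    fprintf(stderr, "open failed: %s\n", s.ToString().c_str());
    return;
  }
  BlockCacheTraceReader reader(std::move(trace_reader));
  BlockCacheTraceHeader header;
  if (!reader.ReadHeader(&header).ok()) {
    return;
  }
  BlockCacheTraceRecord access;
  // ReadAccess() stops returning OK once the trace file is exhausted.
  while (reader.ReadAccess(&access).ok()) {
    fprintf(stdout, "block_key=%s type=%d caller=%d cache_hit=%d\n",
            access.block_key.c_str(), static_cast<int>(access.block_type),
            static_cast<int>(access.caller),
            access.is_cache_hit == Boolean::kTrue ? 1 : 0);
  }
}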
 // A simple tool that takes the snapshot of block cache statistics.
 class BlockCachePropertiesSnapshot {
  public:
@@ -1952,8 +2697,8 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) {
 
   // preloading filter/index blocks is enabled.
   auto reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
-  ASSERT_TRUE(reader->TEST_filter_block_preloaded());
-  ASSERT_TRUE(reader->TEST_index_reader_preloaded());
+  ASSERT_FALSE(reader->TEST_FilterBlockInCache());
+  ASSERT_FALSE(reader->TEST_IndexBlockInCache());
 
   {
     // nothing happens in the beginning
@@ -1965,7 +2710,7 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) {
   {
     GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
                            GetContext::kNotFound, Slice(), nullptr, nullptr,
-                           nullptr, nullptr, nullptr);
+                           nullptr, true, nullptr, nullptr);
     // a hack that just to trigger BlockBasedTable::GetFilter.
     reader->Get(ReadOptions(), "non-exist-key", &get_context,
                 moptions.prefix_extractor.get());
@@ -1985,7 +2730,11 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
 
   // Enable the cache for index/filter blocks
   BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
-  table_options.block_cache = NewLRUCache(2048, 2);
+  LRUCacheOptions co;
+  co.capacity = 2048;
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  table_options.block_cache = NewLRUCache(co);
   table_options.cache_index_and_filter_blocks = true;
   options.table_factory.reset(new BlockBasedTableFactory(table_options));
   std::vector<std::string> keys;
@@ -1999,8 +2748,8 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
            GetPlainInternalComparator(options.comparator), &keys, &kvmap);
   // preloading filter/index blocks is prohibited.
   auto* reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
-  ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
-  ASSERT_TRUE(!reader->TEST_index_reader_preloaded());
+  ASSERT_FALSE(reader->TEST_FilterBlockInCache());
+  ASSERT_TRUE(reader->TEST_IndexBlockInCache());
 
   // -- PART 1: Open with regular block cache.
   // Since block_cache is disabled, no cache activities will be involved.
@@ -2015,7 +2764,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
                       0, 0, 0);
     ASSERT_EQ(props.GetCacheBytesRead(), 0);
     ASSERT_EQ(props.GetCacheBytesWrite(),
-              table_options.block_cache->GetUsage());
+              static_cast<int64_t>(table_options.block_cache->GetUsage()));
     last_cache_bytes_read = props.GetCacheBytesRead();
   }
 
@@ -2031,7 +2780,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
     // Cache hit, bytes read from cache should increase
     ASSERT_GT(props.GetCacheBytesRead(), last_cache_bytes_read);
     ASSERT_EQ(props.GetCacheBytesWrite(),
-              table_options.block_cache->GetUsage());
+              static_cast<int64_t>(table_options.block_cache->GetUsage()));
     last_cache_bytes_read = props.GetCacheBytesRead();
   }
 
@@ -2044,7 +2793,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
     // Cache miss, Bytes read from cache should not change
     ASSERT_EQ(props.GetCacheBytesRead(), last_cache_bytes_read);
     ASSERT_EQ(props.GetCacheBytesWrite(),
-              table_options.block_cache->GetUsage());
+              static_cast<int64_t>(table_options.block_cache->GetUsage()));
     last_cache_bytes_read = props.GetCacheBytesRead();
   }
 
@@ -2058,7 +2807,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
     // Cache hit, bytes read from cache should increase
     ASSERT_GT(props.GetCacheBytesRead(), last_cache_bytes_read);
     ASSERT_EQ(props.GetCacheBytesWrite(),
-              table_options.block_cache->GetUsage());
+              static_cast<int64_t>(table_options.block_cache->GetUsage()));
   }
   // release the iterator so that the block cache can reset correctly.
   iter.reset();
@@ -2132,11 +2881,11 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
   MutableCFOptions moptions4(options);
   ASSERT_OK(c3.Reopen(ioptions4, moptions4));
   reader = dynamic_cast<BlockBasedTable*>(c3.GetTableReader());
-  ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
+  ASSERT_FALSE(reader->TEST_FilterBlockInCache());
   PinnableSlice value;
   GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
                          GetContext::kNotFound, user_key, &value, nullptr,
-                         nullptr, nullptr, nullptr);
+                         nullptr, true, nullptr, nullptr);
   ASSERT_OK(reader->Get(ReadOptions(), internal_key.Encode(), &get_context,
                         moptions4.prefix_extractor.get()));
   ASSERT_STREQ(value.data(), "hello");
@@ -2219,21 +2968,25 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
                GetPlainInternalComparator(options.comparator), &keys, &kvmap);
       auto reader = c.GetTableReader();
       PinnableSlice value;
-      GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
-                             GetContext::kNotFound, user_key, &value, nullptr,
-                             nullptr, nullptr, nullptr);
-      get_perf_context()->Reset();
-      ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context,
-                            moptions.prefix_extractor.get()));
-      if (index_and_filter_in_cache) {
-        // data, index and filter block
-        ASSERT_EQ(get_perf_context()->block_read_count, 3);
-      } else {
-        // just the data block
-        ASSERT_EQ(get_perf_context()->block_read_count, 1);
+      {
+        GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
+                               GetContext::kNotFound, user_key, &value, nullptr,
+                               nullptr, true, nullptr, nullptr);
+        get_perf_context()->Reset();
+        ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context,
+                              moptions.prefix_extractor.get()));
+        if (index_and_filter_in_cache) {
+          // data, index and filter block
+          ASSERT_EQ(get_perf_context()->block_read_count, 3);
+          ASSERT_EQ(get_perf_context()->index_block_read_count, 1);
+          ASSERT_EQ(get_perf_context()->filter_block_read_count, 1);
+        } else {
+          // just the data block
+          ASSERT_EQ(get_perf_context()->block_read_count, 1);
+        }
+        ASSERT_EQ(get_context.State(), GetContext::kFound);
+        ASSERT_STREQ(value.data(), "hello");
       }
-      ASSERT_EQ(get_context.State(), GetContext::kFound);
-      ASSERT_STREQ(value.data(), "hello");
 
       // Get non-existing key
       user_key = "does-not-exist";
@@ -2241,21 +2994,26 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
       encoded_key = internal_key.Encode().ToString();
 
       value.Reset();
-      get_context = GetContext(options.comparator, nullptr, nullptr, nullptr,
+      {
+        GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
                                GetContext::kNotFound, user_key, &value, nullptr,
-                               nullptr, nullptr, nullptr);
-      get_perf_context()->Reset();
-      ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context,
-                            moptions.prefix_extractor.get()));
-      ASSERT_EQ(get_context.State(), GetContext::kNotFound);
+                               nullptr, true, nullptr, nullptr);
+        get_perf_context()->Reset();
+        ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context,
+                              moptions.prefix_extractor.get()));
+        ASSERT_EQ(get_context.State(), GetContext::kNotFound);
+      }
 
       if (index_and_filter_in_cache) {
         if (bloom_filter_type == 0) {
           // with block-based, we read index and then the filter
           ASSERT_EQ(get_perf_context()->block_read_count, 2);
+          ASSERT_EQ(get_perf_context()->index_block_read_count, 1);
+          ASSERT_EQ(get_perf_context()->filter_block_read_count, 1);
         } else {
           // with full-filter, we read filter first and then we stop
           ASSERT_EQ(get_perf_context()->block_read_count, 1);
+          ASSERT_EQ(get_perf_context()->filter_block_read_count, 1);
         }
       } else {
         // filter is already in memory and it figures out that the key doesn't
@@ -2266,176 +3024,6 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
   }
 }
 
-// A wrapper around LRICache that also keeps track of data blocks (in contrast
-// with the objects) in the cache. The class is very simple and can be used only
-// for trivial tests.
-class MockCache : public LRUCache {
- public:
-  MockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-            double high_pri_pool_ratio)
-      : LRUCache(capacity, num_shard_bits, strict_capacity_limit,
-                 high_pri_pool_ratio) {}
-  Status Insert(const Slice& key, void* value, size_t charge,
-                void (*deleter)(const Slice& key, void* value),
-                Handle** handle = nullptr,
-                Priority priority = Priority::LOW) override {
-    // Replace the deleter with our own so that we keep track of data blocks
-    // erased from the cache
-    deleters_[key.ToString()] = deleter;
-    return ShardedCache::Insert(key, value, charge, &MockDeleter, handle,
-                                priority);
-  }
-  // This is called by the application right after inserting a data block
-  void TEST_mark_as_data_block(const Slice& key, size_t charge) override {
-    marked_data_in_cache_[key.ToString()] = charge;
-    marked_size_ += charge;
-  }
-  using DeleterFunc = void (*)(const Slice& key, void* value);
-  static std::map<std::string, DeleterFunc> deleters_;
-  static std::map<std::string, size_t> marked_data_in_cache_;
-  static size_t marked_size_;
-  static void MockDeleter(const Slice& key, void* value) {
-    // If the item was marked for being data block, decrease its usage from  the
-    // total data block usage of the cache
-    if (marked_data_in_cache_.find(key.ToString()) !=
-        marked_data_in_cache_.end()) {
-      marked_size_ -= marked_data_in_cache_[key.ToString()];
-    }
-    // Then call the origianl deleter
-    assert(deleters_.find(key.ToString()) != deleters_.end());
-    auto deleter = deleters_[key.ToString()];
-    deleter(key, value);
-  }
-};
-
-size_t MockCache::marked_size_ = 0;
-std::map<std::string, MockCache::DeleterFunc> MockCache::deleters_;
-std::map<std::string, size_t> MockCache::marked_data_in_cache_;
-
-// Block cache can contain raw data blocks as well as general objects. If an
-// object depends on the table to be live, it then must be destructed before the
-// table is closed. This test makes sure that the only items remains in the
-// cache after the table is closed are raw data blocks.
-TEST_P(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
-  std::vector<CompressionType> compression_types{kNoCompression};
-
-  // The following are the compression library versions supporting compression
-  // dictionaries. See the test case CacheCompressionDict in the
-  // DBBlockCacheTest suite.
-#ifdef ZLIB
-  compression_types.push_back(kZlibCompression);
-#endif  // ZLIB
-#if LZ4_VERSION_NUMBER >= 10400
-  compression_types.push_back(kLZ4Compression);
-  compression_types.push_back(kLZ4HCCompression);
-#endif  // LZ4_VERSION_NUMBER >= 10400
-#if ZSTD_VERSION_NUMBER >= 500
-  compression_types.push_back(kZSTD);
-#endif  // ZSTD_VERSION_NUMBER >= 500
-
-  for (int level: {-1, 0, 1, 10}) {
-    for (auto index_type :
-        {BlockBasedTableOptions::IndexType::kBinarySearch,
-        BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) {
-      for (bool block_based_filter : {true, false}) {
-        for (bool partition_filter : {true, false}) {
-          if (partition_filter &&
-              (block_based_filter ||
-               index_type !=
-               BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch)) {
-            continue;
-          }
-          for (bool index_and_filter_in_cache : {true, false}) {
-            for (bool pin_l0 : {true, false}) {
-              for (bool pin_top_level : {true, false}) {
-                if (pin_l0 && !index_and_filter_in_cache) {
-                  continue;
-                }
-
-                for (auto compression_type : compression_types) {
-                  for (uint32_t max_dict_bytes : {0, 1 << 14}) {
-                    if (compression_type == kNoCompression && max_dict_bytes)
-                      continue;
-
-                    // Create a table
-                    Options opt;
-                    std::unique_ptr<InternalKeyComparator> ikc;
-                    ikc.reset(new test::PlainInternalKeyComparator(
-                      opt.comparator));
-                    opt.compression = compression_type;
-                    opt.compression_opts.max_dict_bytes = max_dict_bytes;
-                    BlockBasedTableOptions table_options =
-                      GetBlockBasedTableOptions();
-                    table_options.block_size = 1024;
-                    table_options.index_type = index_type;
-                    table_options.pin_l0_filter_and_index_blocks_in_cache =
-                      pin_l0;
-                    table_options.pin_top_level_index_and_filter =
-                      pin_top_level;
-                    table_options.partition_filters = partition_filter;
-                    table_options.cache_index_and_filter_blocks =
-                      index_and_filter_in_cache;
-                    // big enough so we don't ever lose cached values.
-                    table_options.block_cache = std::make_shared<MockCache>(
-                      16 * 1024 * 1024, 4, false, 0.0);
-                    table_options.filter_policy.reset(
-                      rocksdb::NewBloomFilterPolicy(10, block_based_filter));
-                    opt.table_factory.reset(NewBlockBasedTableFactory(
-                      table_options));
-
-                    bool convert_to_internal_key = false;
-                    TableConstructor c(BytewiseComparator(),
-                      convert_to_internal_key, level);
-                    std::string user_key = "k01";
-                    std::string key =
-                      InternalKey(user_key, 0, kTypeValue).Encode().ToString();
-                    c.Add(key, "hello");
-                    std::vector<std::string> keys;
-                    stl_wrappers::KVMap kvmap;
-                    const ImmutableCFOptions ioptions(opt);
-                    const MutableCFOptions moptions(opt);
-                    c.Finish(opt, ioptions, moptions, table_options, *ikc,
-                      &keys, &kvmap);
-
-                    // Doing a read to make index/filter loaded into the cache
-                    auto table_reader =
-                      dynamic_cast<BlockBasedTable*>(c.GetTableReader());
-                    PinnableSlice value;
-                    GetContext get_context(opt.comparator, nullptr, nullptr,
-                      nullptr, GetContext::kNotFound, user_key, &value,
-                      nullptr, nullptr, nullptr, nullptr);
-                    InternalKey ikey(user_key, 0, kTypeValue);
-                    auto s = table_reader->Get(ReadOptions(), key, &get_context,
-                      moptions.prefix_extractor.get());
-                    ASSERT_EQ(get_context.State(), GetContext::kFound);
-                    ASSERT_STREQ(value.data(), "hello");
-
-                    // Close the table
-                    c.ResetTableReader();
-
-                    auto usage = table_options.block_cache->GetUsage();
-                    auto pinned_usage =
-                      table_options.block_cache->GetPinnedUsage();
-                    // The only usage must be for marked data blocks
-                    ASSERT_EQ(usage, MockCache::marked_size_);
-                    // There must be some pinned data since PinnableSlice has
-                    // not released them yet
-                    ASSERT_GT(pinned_usage, 0);
-                    // Release pinnable slice resources
-                    value.Reset();
-                    pinned_usage = table_options.block_cache->GetPinnedUsage();
-                    ASSERT_EQ(pinned_usage, 0);
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  } // level
-}
-
 TEST_P(BlockBasedTableTest, BlockCacheLeak) {
   // Check that when we reopen a table we don't lose access to blocks already
   // in the cache. This test checks whether the Table actually makes use of the
@@ -2572,67 +3160,88 @@ TEST_P(BlockBasedTableTest, MemoryAllocator) {
   EXPECT_GT(custom_memory_allocator->numAllocations.load(), 0);
 }
 
-TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
-  // A regression test for the data race described in
-  // https://github.com/facebook/rocksdb/issues/1267
-  TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
-  std::vector<std::string> keys;
-  stl_wrappers::KVMap kvmap;
-  c.Add("a1", "val1");
+// Test the file checksum of a block-based table
+TEST_P(BlockBasedTableTest, NoFileChecksum) {
   Options options;
-  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+  ImmutableCFOptions ioptions(options);
+  MutableCFOptions moptions(options);
   BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
-  table_options.index_type = BlockBasedTableOptions::kHashSearch;
-  table_options.cache_index_and_filter_blocks = true;
-  table_options.block_cache = NewLRUCache(0);
-  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-  const ImmutableCFOptions ioptions(options);
-  const MutableCFOptions moptions(options);
-  c.Finish(options, ioptions, moptions, table_options,
-           GetPlainInternalComparator(options.comparator), &keys, &kvmap);
+  std::unique_ptr<InternalKeyComparator> comparator(
+      new InternalKeyComparator(BytewiseComparator()));
+  SequenceNumber largest_seqno = 0;
+  int level = 0;
+  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+      int_tbl_prop_collector_factories;
 
-  rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
-      {
-          {"BlockBasedTable::NewIndexIterator::thread1:1",
-           "BlockBasedTable::NewIndexIterator::thread2:2"},
-          {"BlockBasedTable::NewIndexIterator::thread2:3",
-           "BlockBasedTable::NewIndexIterator::thread1:4"},
-      },
-      {
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
-           "BlockBasedTable::NewIndexIterator::thread1:1"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
-           "BlockBasedTable::NewIndexIterator::thread1:4"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
-           "BlockBasedTable::NewIndexIterator::thread2:2"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
-           "BlockBasedTable::NewIndexIterator::thread2:3"},
-      });
-
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-  ReadOptions ro;
-  auto* reader = c.GetTableReader();
+  if (largest_seqno != 0) {
+    // Pretend that it's an external file written by SstFileWriter.
+    int_tbl_prop_collector_factories.emplace_back(
+        new SstFileWriterPropertiesCollectorFactory(2 /* version */,
+                                                    0 /* global_seqno*/));
+  }
+  std::string column_family_name;
 
-  std::function<void()> func1 = [&]() {
-    TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker");
-    // TODO(Zhongyi): update test to use MutableCFOptions
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
-    iter->Seek(InternalKey("a1", 0, kTypeValue).Encode());
-  };
+  FileChecksumTestHelper f(true);
+  f.CreateWriteableFile();
+  std::unique_ptr<TableBuilder> builder;
+  builder.reset(ioptions.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, *comparator,
+                          &int_tbl_prop_collector_factories,
+                          options.compression, options.sample_for_compression,
+                          options.compression_opts, false /* skip_filters */,
+                          column_family_name, level),
+      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
+      f.GetFileWriter()));
+  f.ResetTableBuilder(std::move(builder));
+  f.AddKVtoKVMap(1000);
+  f.WriteKVAndFlushTable();
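+  // No checksum function was configured, so the recorded checksum name and
+  // value should stay at the kUnknown* placeholders.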
+  ASSERT_STREQ(f.GetFileChecksumFuncName(),
+               kUnknownFileChecksumFuncName.c_str());
+  ASSERT_STREQ(f.GetFileChecksum().c_str(), kUnknownFileChecksum.c_str());
+}
 
-  std::function<void()> func2 = [&]() {
-    TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker");
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
-  };
+TEST_P(BlockBasedTableTest, Crc32FileChecksum) {
+  Options options;
+  options.sst_file_checksum_func =
+      std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
+  ImmutableCFOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  std::unique_ptr<InternalKeyComparator> comparator(
+      new InternalKeyComparator(BytewiseComparator()));
+  SequenceNumber largest_seqno = 0;
+  int level = 0;
+  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+      int_tbl_prop_collector_factories;
 
-  auto thread1 = port::Thread(func1);
-  auto thread2 = port::Thread(func2);
-  thread1.join();
-  thread2.join();
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-  c.ResetTableReader();
+  if (largest_seqno != 0) {
+    // Pretend that it's an external file written by SstFileWriter.
+    int_tbl_prop_collector_factories.emplace_back(
+        new SstFileWriterPropertiesCollectorFactory(2 /* version */,
+                                                    0 /* global_seqno*/));
+  }
+  std::string column_family_name;
+
+  FileChecksumTestHelper f(true);
+  f.CreateWriteableFile();
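+  // Attach the CRC32c checksum function so the writer records a file
+  // checksum while the table is built.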
+  f.SetFileChecksumFunc(options.sst_file_checksum_func.get());
+  std::unique_ptr<TableBuilder> builder;
+  builder.reset(ioptions.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, *comparator,
+                          &int_tbl_prop_collector_factories,
+                          options.compression, options.sample_for_compression,
+                          options.compression_opts, false /* skip_filters */,
+                          column_family_name, level),
+      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
+      f.GetFileWriter()));
+  f.ResetTableBuilder(std::move(builder));
+  f.AddKVtoKVMap(1000);
+  f.WriteKVAndFlushTable();
+  ASSERT_STREQ(f.GetFileChecksumFuncName(), "FileChecksumCrc32c");
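+  // Recompute the checksum from the finished file and verify that it matches
+  // the one recorded at write time.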
+  std::string checksum;
+  ASSERT_OK(
+      f.CalculateFileChecksum(options.sst_file_checksum_func.get(), &checksum));
+  ASSERT_STREQ(f.GetFileChecksum().c_str(), checksum.c_str());
 }
 
 // Plain table is not supported in ROCKSDB_LITE
@@ -2673,7 +3282,7 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
   file_writer->Flush();
 
   test::StringSink* ss =
-    static_cast<test::StringSink*>(file_writer->writable_file());
+      ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(file_writer.get());
   std::unique_ptr<RandomAccessFileReader> file_reader(
       test::GetRandomAccessFileReader(
           new test::StringSource(ss->contents(), 72242, true)));
@@ -2692,6 +3301,78 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
   ASSERT_EQ(26ul, props->num_entries);
   ASSERT_EQ(1ul, props->num_data_blocks);
 }
+
+TEST_F(PlainTableTest, NoFileChecksum) {
+  PlainTableOptions plain_table_options;
+  plain_table_options.user_key_len = 20;
+  plain_table_options.bloom_bits_per_key = 8;
+  plain_table_options.hash_table_ratio = 0;
+  PlainTableFactory factory(plain_table_options);
+
+  Options options;
+  const ImmutableCFOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+      int_tbl_prop_collector_factories;
+  std::string column_family_name;
+  int unknown_level = -1;
+  FileChecksumTestHelper f(true);
+  f.CreateWriteableFile();
+
+  std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
+      TableBuilderOptions(
+          ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
+          kNoCompression, 0 /* sample_for_compression */, CompressionOptions(),
+          false /* skip_filters */, column_family_name, unknown_level),
+      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
+      f.GetFileWriter()));
+  f.ResetTableBuilder(std::move(builder));
+  f.AddKVtoKVMap(1000);
+  f.WriteKVAndFlushTable();
+  ASSERT_STREQ(f.GetFileChecksumFuncName(),
+               kUnknownFileChecksumFuncName.c_str());
+  EXPECT_EQ(f.GetFileChecksum(), kUnknownFileChecksum.c_str());
+}
+
+TEST_F(PlainTableTest, Crc32FileChecksum) {
+  PlainTableOptions plain_table_options;
+  plain_table_options.user_key_len = 20;
+  plain_table_options.bloom_bits_per_key = 8;
+  plain_table_options.hash_table_ratio = 0;
+  PlainTableFactory factory(plain_table_options);
+
+  Options options;
+  options.sst_file_checksum_func =
+      std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
+  const ImmutableCFOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+      int_tbl_prop_collector_factories;
+  std::string column_family_name;
+  int unknown_level = -1;
+  FileChecksumTestHelper f(true);
+  f.CreateWriteableFile();
+  f.SetFileChecksumFunc(options.sst_file_checksum_func.get());
+
+  std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
+      TableBuilderOptions(
+          ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
+          kNoCompression, 0 /* sample_for_compression */, CompressionOptions(),
+          false /* skip_filters */, column_family_name, unknown_level),
+      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
+      f.GetFileWriter()));
+  f.ResetTableBuilder(std::move(builder));
+  f.AddKVtoKVMap(1000);
+  f.WriteKVAndFlushTable();
+  ASSERT_STREQ(f.GetFileChecksumFuncName(), "FileChecksumCrc32c");
+  std::string checksum;
+  ASSERT_OK(
+      f.CalculateFileChecksum(options.sst_file_checksum_func.get(), &checksum));
+  EXPECT_STREQ(f.GetFileChecksum().c_str(), checksum.c_str());
+}
+
 #endif  // !ROCKSDB_LITE
 
 TEST_F(GeneralTableTest, ApproximateOffsetOfPlain) {
@@ -2910,7 +3591,8 @@ TEST_F(MemTableTest, Simple) {
   batch.DeleteRange(std::string("begin"), std::string("end"));
   ColumnFamilyMemTablesDefault cf_mems_default(memtable);
   ASSERT_TRUE(
-      WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok());
+      WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr, nullptr)
+          .ok());
 
   for (int i = 0; i < 2; ++i) {
     Arena arena;
@@ -3154,8 +3836,9 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) {
            &kvmap);
   auto reader = c.GetTableReader();
 
-  std::unique_ptr<InternalIterator> db_iter(
-      reader->NewIterator(ReadOptions(), moptions.prefix_extractor.get()));
+  std::unique_ptr<InternalIterator> db_iter(reader->NewIterator(
+      ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   // Test point lookup
   for (auto& kv : kvmap) {
@@ -3186,24 +3869,27 @@ class PrefixTest : public testing::Test {
 
 namespace {
 // A simple PrefixExtractor that only works for test PrefixAndWholeKeyTest
-class TestPrefixExtractor : public rocksdb::SliceTransform {
+class TestPrefixExtractor : public ROCKSDB_NAMESPACE::SliceTransform {
  public:
  ~TestPrefixExtractor() override {}
   const char* Name() const override { return "TestPrefixExtractor"; }
 
-  rocksdb::Slice Transform(const rocksdb::Slice& src) const override {
+  ROCKSDB_NAMESPACE::Slice Transform(
+      const ROCKSDB_NAMESPACE::Slice& src) const override {
     assert(IsValid(src));
-    return rocksdb::Slice(src.data(), 3);
+    return ROCKSDB_NAMESPACE::Slice(src.data(), 3);
   }
 
-  bool InDomain(const rocksdb::Slice& src) const override {
+  bool InDomain(const ROCKSDB_NAMESPACE::Slice& src) const override {
     assert(IsValid(src));
     return true;
   }
 
-  bool InRange(const rocksdb::Slice& /*dst*/) const override { return true; }
+  bool InRange(const ROCKSDB_NAMESPACE::Slice& /*dst*/) const override {
+    return true;
+  }
 
-  bool IsValid(const rocksdb::Slice& src) const {
+  bool IsValid(const ROCKSDB_NAMESPACE::Slice& src) const {
     if (src.size() != 4) {
       return false;
     }
@@ -3225,30 +3911,30 @@ class TestPrefixExtractor : public rocksdb::SliceTransform {
 }  // namespace
 
 TEST_F(PrefixTest, PrefixAndWholeKeyTest) {
-  rocksdb::Options options;
-  options.compaction_style = rocksdb::kCompactionStyleUniversal;
+  ROCKSDB_NAMESPACE::Options options;
+  options.compaction_style = ROCKSDB_NAMESPACE::kCompactionStyleUniversal;
   options.num_levels = 20;
   options.create_if_missing = true;
   options.optimize_filters_for_hits = false;
   options.target_file_size_base = 268435456;
   options.prefix_extractor = std::make_shared<TestPrefixExtractor>();
-  rocksdb::BlockBasedTableOptions bbto;
-  bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));
+  ROCKSDB_NAMESPACE::BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(ROCKSDB_NAMESPACE::NewBloomFilterPolicy(10));
   bbto.block_size = 262144;
   bbto.whole_key_filtering = true;
 
   const std::string kDBPath = test::PerThreadDBPath("table_prefix_test");
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   DestroyDB(kDBPath, options);
-  rocksdb::DB* db;
-  ASSERT_OK(rocksdb::DB::Open(options, kDBPath, &db));
+  ROCKSDB_NAMESPACE::DB* db;
+  ASSERT_OK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db));
 
   // Create a bunch of keys with 10 filters.
   for (int i = 0; i < 10; i++) {
     std::string prefix = "[" + std::to_string(i) + "]";
     for (int j = 0; j < 10; j++) {
       std::string key = prefix + std::to_string(j);
-      db->Put(rocksdb::WriteOptions(), key, "1");
+      db->Put(ROCKSDB_NAMESPACE::WriteOptions(), key, "1");
     }
   }
 
@@ -3347,13 +4033,14 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
                            EnvOptions(), ikc),
         std::move(file_reader), ss_rw.contents().size(), &table_reader);
 
-    return table_reader->NewIterator(ReadOptions(),
-                                     moptions.prefix_extractor.get());
+    return table_reader->NewIterator(
+        ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+        /*skip_filters=*/false, TableReaderCaller::kUncategorized);
   };
 
   GetVersionAndGlobalSeqno();
-  ASSERT_EQ(2, version);
-  ASSERT_EQ(0, global_seqno);
+  ASSERT_EQ(2u, version);
+  ASSERT_EQ(0u, global_seqno);
 
   InternalIterator* iter = GetTableInternalIter();
   char current_c = 'a';
@@ -3373,8 +4060,8 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
   // Update global sequence number to 10
   SetGlobalSeqno(10);
   GetVersionAndGlobalSeqno();
-  ASSERT_EQ(2, version);
-  ASSERT_EQ(10, global_seqno);
+  ASSERT_EQ(2u, version);
+  ASSERT_EQ(10u, global_seqno);
 
   iter = GetTableInternalIter();
   current_c = 'a';
@@ -3410,8 +4097,8 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
   // Update global sequence number to 3
   SetGlobalSeqno(3);
   GetVersionAndGlobalSeqno();
-  ASSERT_EQ(2, version);
-  ASSERT_EQ(3, global_seqno);
+  ASSERT_EQ(2u, version);
+  ASSERT_EQ(3u, global_seqno);
 
   iter = GetTableInternalIter();
   current_c = 'a';
@@ -3519,7 +4206,8 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
       std::move(file_reader), ss_rw.contents().size(), &table_reader));
 
   std::unique_ptr<InternalIterator> db_iter(table_reader->NewIterator(
-      ReadOptions(), moptions2.prefix_extractor.get()));
+      ReadOptions(), moptions2.prefix_extractor.get(), /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
 
   int expected_key = 1;
   for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
@@ -3588,7 +4276,7 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
     ASSERT_OK(ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size,
                                  &footer, kBlockBasedTableMagicNumber));
 
-    auto BlockFetchHelper = [&](const BlockHandle& handle,
+    auto BlockFetchHelper = [&](const BlockHandle& handle, BlockType block_type,
                                 BlockContents* contents) {
       ReadOptions read_options;
       read_options.verify_checksums = false;
@@ -3597,8 +4285,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
       BlockFetcher block_fetcher(
           file, nullptr /* prefetch_buffer */, footer, read_options, handle,
           contents, ioptions, false /* decompress */,
-          false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
-          cache_options);
+          false /*maybe_compressed*/, block_type,
+          UncompressionDict::GetEmptyDict(), cache_options);
 
       ASSERT_OK(block_fetcher.ReadBlockContents());
     };
@@ -3607,13 +4295,13 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
     auto metaindex_handle = footer.metaindex_handle();
     BlockContents metaindex_contents;
 
-    BlockFetchHelper(metaindex_handle, &metaindex_contents);
+    BlockFetchHelper(metaindex_handle, BlockType::kMetaIndex,
+                     &metaindex_contents);
     Block metaindex_block(std::move(metaindex_contents),
                           kDisableGlobalSequenceNumber);
 
-    std::unique_ptr<InternalIterator> meta_iter(
-        metaindex_block.NewIterator<DataBlockIter>(BytewiseComparator(),
-                                                   BytewiseComparator()));
+    std::unique_ptr<InternalIterator> meta_iter(metaindex_block.NewDataIterator(
+        BytewiseComparator(), BytewiseComparator()));
     bool found_properties_block = true;
     ASSERT_OK(SeekToPropertiesBlock(meta_iter.get(), &found_properties_block));
     ASSERT_TRUE(found_properties_block);
@@ -3624,11 +4312,12 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
     ASSERT_OK(properties_handle.DecodeFrom(&v));
     BlockContents properties_contents;
 
-    BlockFetchHelper(properties_handle, &properties_contents);
+    BlockFetchHelper(properties_handle, BlockType::kProperties,
+                     &properties_contents);
     Block properties_block(std::move(properties_contents),
                            kDisableGlobalSequenceNumber);
 
-    ASSERT_EQ(properties_block.NumRestarts(), 1);
+    ASSERT_EQ(properties_block.NumRestarts(), 1u);
   }
 }
 
@@ -3683,16 +4372,16 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
   BlockFetcher block_fetcher(
       table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(),
       metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
-      false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
-      pcache_opts, nullptr /*memory_allocator*/);
+      false /*maybe_compressed*/, BlockType::kMetaIndex,
+      UncompressionDict::GetEmptyDict(), pcache_opts,
+      nullptr /*memory_allocator*/);
   ASSERT_OK(block_fetcher.ReadBlockContents());
   Block metaindex_block(std::move(metaindex_contents),
                         kDisableGlobalSequenceNumber);
 
   // Verify that the properties block comes last
   std::unique_ptr<InternalIterator> metaindex_iter{
-      metaindex_block.NewIterator<DataBlockIter>(options.comparator,
-                                                 options.comparator)};
+      metaindex_block.NewDataIterator(options.comparator, options.comparator)};
   uint64_t max_offset = 0;
   std::string key_at_max_offset;
   for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();
@@ -3713,7 +4402,7 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
 }
 
 TEST_P(BlockBasedTableTest, BadOptions) {
-  rocksdb::Options options;
+  ROCKSDB_NAMESPACE::Options options;
   options.compression = kNoCompression;
   BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
   bbto.block_size = 4000;
@@ -3723,13 +4412,13 @@ TEST_P(BlockBasedTableTest, BadOptions) {
       test::PerThreadDBPath("block_based_table_bad_options_test");
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   DestroyDB(kDBPath, options);
-  rocksdb::DB* db;
-  ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
+  ROCKSDB_NAMESPACE::DB* db;
+  ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db));
 
   bbto.block_size = 4096;
   options.compression = kSnappyCompression;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-  ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
+  ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db));
 }
 
 TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) {
@@ -3810,8 +4499,9 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
   auto reader = c.GetTableReader();
 
   std::unique_ptr<InternalIterator> seek_iter;
-  seek_iter.reset(
-      reader->NewIterator(ReadOptions(), moptions.prefix_extractor.get()));
+  seek_iter.reset(reader->NewIterator(
+      ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized));
   for (int i = 0; i < 2; ++i) {
     ReadOptions ro;
     // For every kv, we query it in two ways: Get() and Seek()
@@ -3835,7 +4525,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
         std::string user_key = ExtractUserKey(kv.first).ToString();
         GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
                                GetContext::kNotFound, user_key, &value, nullptr,
-                               nullptr, nullptr, nullptr);
+                               nullptr, true, nullptr, nullptr);
         ASSERT_OK(reader->Get(ro, kv.first, &get_context,
                               moptions.prefix_extractor.get()));
         ASSERT_EQ(get_context.State(), GetContext::kFound);
@@ -3861,7 +4551,7 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
         PinnableSlice value;
         GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
                                GetContext::kNotFound, user_key, &value, nullptr,
-                               nullptr, nullptr, nullptr);
+                               nullptr, true, nullptr, nullptr);
         ASSERT_OK(reader->Get(ro, encoded_key, &get_context,
                               moptions.prefix_extractor.get()));
         ASSERT_EQ(get_context.State(), GetContext::kNotFound);
@@ -3871,7 +4561,89 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
   }
 }
 
-}  // namespace rocksdb
+// BlockBasedTableIterator should invalidate itself and report
+// IsOutOfBound()=true immediately after Seek(), to allow LevelIterator to
+// filter out the corresponding level.
+TEST_P(BlockBasedTableTest, OutOfBoundOnSeek) {
+  TableConstructor c(BytewiseComparator(), true /*convert_to_internal_key*/);
+  c.Add("foo", "v1");
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  Options options;
+  BlockBasedTableOptions table_opt(GetBlockBasedTableOptions());
+  options.table_factory.reset(NewBlockBasedTableFactory(table_opt));
+  const ImmutableCFOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+  c.Finish(options, ioptions, moptions, table_opt,
+           GetPlainInternalComparator(BytewiseComparator()), &keys, &kvmap);
+  auto* reader = c.GetTableReader();
+  ReadOptions read_opt;
+  std::string upper_bound = "bar";
+  Slice upper_bound_slice(upper_bound);
+  read_opt.iterate_upper_bound = &upper_bound_slice;
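+  // The upper bound "bar" sorts before the only key "foo", so both
+  // SeekToFirst() and Seek("foo") should immediately go out of bound.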
+  std::unique_ptr<InternalIterator> iter;
+  iter.reset(new KeyConvertingIterator(reader->NewIterator(
+      read_opt, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized)));
+  iter->SeekToFirst();
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_TRUE(iter->IsOutOfBound());
+  iter.reset(new KeyConvertingIterator(reader->NewIterator(
+      read_opt, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized)));
+  iter->Seek("foo");
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_TRUE(iter->IsOutOfBound());
+}
+
+// BlockBasedTableIterator should invalidate itself and report
+// IsOutOfBound()=true after Next() if it finds the current index key is no
+// smaller than the upper bound, unless it is pointing to the last data block.
+TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
+  TableConstructor c(BytewiseComparator(), true /*convert_to_internal_key*/);
+  c.Add("bar", "v");
+  c.Add("foo", "v");
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  Options options;
+  BlockBasedTableOptions table_opt(GetBlockBasedTableOptions());
+  table_opt.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+  options.table_factory.reset(NewBlockBasedTableFactory(table_opt));
+  const ImmutableCFOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+  c.Finish(options, ioptions, moptions, table_opt,
+           GetPlainInternalComparator(BytewiseComparator()), &keys, &kvmap);
+  auto* reader = c.GetTableReader();
+  ReadOptions read_opt;
+  std::string ub1 = "bar_after";
+  Slice ub_slice1(ub1);
+  read_opt.iterate_upper_bound = &ub_slice1;
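+  // Every key gets its own data block (FlushBlockEveryKeyPolicyFactory), so
+  // with upper bound "bar_after", stepping past "bar" reaches an index key at
+  // or above the bound and the iterator should report out of bound.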
+  std::unique_ptr<InternalIterator> iter;
+  iter.reset(new KeyConvertingIterator(reader->NewIterator(
+      read_opt, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized)));
+  iter->Seek("bar");
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ("bar", iter->key());
+  iter->Next();
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_TRUE(iter->IsOutOfBound());
+  std::string ub2 = "foo_after";
+  Slice ub_slice2(ub2);
+  read_opt.iterate_upper_bound = &ub_slice2;
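+  // With upper bound "foo_after", Next() past "foo" exhausts the last data
+  // block: the iterator becomes invalid but is not flagged out of bound.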
+  iter.reset(new KeyConvertingIterator(reader->NewIterator(
+      read_opt, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kUncategorized)));
+  iter->Seek("foo");
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ("foo", iter->key());
+  iter->Next();
+  ASSERT_FALSE(iter->Valid());
+  ASSERT_FALSE(iter->IsOutOfBound());
+}
+
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);