update sources to ceph Nautilus 14.2.1

diff --git a/ceph/src/rocksdb/table/block_based_table_reader.cc b/ceph/src/rocksdb/table/block_based_table_reader.cc
index abb476578fc48c53ee88d6ca1f71bd3ab83629e7..9f2e02d68063e4cd3a66b9fa038351ae22c895b8 100644
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
@@ -9,6 +9,7 @@
 #include "table/block_based_table_reader.h"
 
 #include <algorithm>
+#include <array>
 #include <limits>
 #include <string>
 #include <utility>
@@ -30,6 +31,7 @@
 #include "table/block.h"
 #include "table/block_based_filter_block.h"
 #include "table/block_based_table_factory.h"
+#include "table/block_fetcher.h"
 #include "table/block_prefix_index.h"
 #include "table/filter_block.h"
 #include "table/format.h"
@@ -63,6 +65,8 @@ BlockBasedTable::~BlockBasedTable() {
   delete rep_;
 }
 
+std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);
+
 namespace {
 // Read the block identified by "handle" from "file".
 // The only relevant option is options.verify_checksums for now.
@@ -70,17 +74,18 @@ namespace {
 // On success fill *result and return OK - caller owns *result
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
-                         const ReadOptions& options, const BlockHandle& handle,
-                         std::unique_ptr<Block>* result,
-                         const ImmutableCFOptions& ioptions, bool do_uncompress,
-                         const Slice& compression_dict,
-                         const PersistentCacheOptions& cache_options,
-                         SequenceNumber global_seqno,
-                         size_t read_amp_bytes_per_bit) {
+Status ReadBlockFromFile(
+    RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
+    const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
+    std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
+    bool do_uncompress, const Slice& compression_dict,
+    const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
+    size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
   BlockContents contents;
-  Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions,
-                               do_uncompress, compression_dict, cache_options);
+  BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
+                             &contents, ioptions, do_uncompress,
+                             compression_dict, cache_options, immortal_file);
+  Status s = block_fetcher.ReadBlockContents();
   if (s.ok()) {
     result->reset(new Block(std::move(contents), global_seqno,
                             read_amp_bytes_per_bit, ioptions.statistics));
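
The refactor above routes all reads through BlockFetcher so a shared FilePrefetchBuffer can satisfy reads from memory instead of issuing extra I/O. A minimal calling sketch, assuming file, footer, handle, ioptions, and cache_options are already in scope; passing nullptr for the prefetch buffer falls back to a plain file read:

    // Hedged sketch of a call to the new helper; all names are from this diff.
    std::unique_ptr<Block> block;
    Status s = ReadBlockFromFile(
        file, nullptr /* prefetch_buffer */, footer, ReadOptions(), handle,
        &block, ioptions, true /* do_uncompress */,
        Slice() /* compression_dict */, cache_options,
        kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
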
@@ -91,13 +96,13 @@ Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
 
 // Delete the resource that is held by the iterator.
 template <class ResourceType>
-void DeleteHeldResource(void* arg, void* ignored) {
+void DeleteHeldResource(void* arg, void* /*ignored*/) {
   delete reinterpret_cast<ResourceType*>(arg);
 }
 
 // Delete the entry resided in the cache.
 template <class Entry>
-void DeleteCachedEntry(const Slice& key, void* value) {
+void DeleteCachedEntry(const Slice& /*key*/, void* value) {
   auto entry = reinterpret_cast<Entry*>(value);
   delete entry;
 }
@@ -112,6 +117,13 @@ void ReleaseCachedEntry(void* arg, void* h) {
   cache->Release(handle);
 }
 
+// Release the cached entry and decrement its ref count. With force_erase set,
+// the entry is also erased from the cache once it is no longer referenced.
+void ForceReleaseCachedEntry(void* arg, void* h) {
+  Cache* cache = reinterpret_cast<Cache*>(arg);
+  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
+  cache->Release(handle, true /* force_erase */);
+}
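
In this version of RocksDB, Cache::Release takes an optional force_erase flag: when true, an entry whose refcount drops to zero is evicted immediately instead of lingering in the LRU list. A short sketch of the two modes; each applies to a separately acquired handle, and a handle must be released exactly once:

    // handle_a / handle_b are hypothetical handles from prior Lookup() calls.
    cache->Release(handle_a);                          // entry stays cached
    cache->Release(handle_b, true /* force_erase */);  // evict once unreferenced
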
+
 Slice GetCacheKeyFromOffset(const char* cache_key_prefix,
                             size_t cache_key_prefix_size, uint64_t offset,
                             char* cache_key) {
@@ -126,27 +138,66 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix,
 Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
                                  Tickers block_cache_miss_ticker,
                                  Tickers block_cache_hit_ticker,
-                                 Statistics* statistics) {
+                                 uint64_t* block_cache_miss_stats,
+                                 uint64_t* block_cache_hit_stats,
+                                 Statistics* statistics,
+                                 GetContext* get_context) {
   auto cache_handle = block_cache->Lookup(key, statistics);
   if (cache_handle != nullptr) {
     PERF_COUNTER_ADD(block_cache_hit_count, 1);
-    // overall cache hit
-    RecordTick(statistics, BLOCK_CACHE_HIT);
-    // total bytes read from cache
-    RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
-               block_cache->GetUsage(cache_handle));
-    // block-type specific cache hit
-    RecordTick(statistics, block_cache_hit_ticker);
+    if (get_context != nullptr) {
+      // overall cache hit
+      get_context->get_context_stats_.num_cache_hit++;
+      // total bytes read from cache
+      get_context->get_context_stats_.num_cache_bytes_read +=
+          block_cache->GetUsage(cache_handle);
+      // block-type specific cache hit
+      (*block_cache_hit_stats)++;
+    } else {
+      // overall cache hit
+      RecordTick(statistics, BLOCK_CACHE_HIT);
+      // total bytes read from cache
+      RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
+                 block_cache->GetUsage(cache_handle));
+      RecordTick(statistics, block_cache_hit_ticker);
+    }
   } else {
-    // overall cache miss
-    RecordTick(statistics, BLOCK_CACHE_MISS);
-    // block-type specific cache miss
-    RecordTick(statistics, block_cache_miss_ticker);
+    if (get_context != nullptr) {
+      // overall cache miss
+      get_context->get_context_stats_.num_cache_miss++;
+      // block-type specific cache miss
+      (*block_cache_miss_stats)++;
+    } else {
+      RecordTick(statistics, BLOCK_CACHE_MISS);
+      RecordTick(statistics, block_cache_miss_ticker);
+    }
   }
 
   return cache_handle;
 }
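
The get_context branch above defers ticker updates: counters accumulate in plain per-lookup fields and are reported to the Statistics object once per Get, avoiding one atomic RecordTick per block access. A hedged sketch of the pattern; GetContextStats here is illustrative, not the exact RocksDB struct:

    struct GetContextStats {  // hypothetical mirror of get_context_stats_
      uint64_t num_cache_hit = 0;
      uint64_t num_cache_bytes_read = 0;
    };

    void FlushStats(Statistics* statistics, const GetContextStats& s) {
      // One atomic update per ticker per lookup, instead of per block.
      RecordTick(statistics, BLOCK_CACHE_HIT, s.num_cache_hit);
      RecordTick(statistics, BLOCK_CACHE_BYTES_READ, s.num_cache_bytes_read);
    }
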
 
+// For hash based index, return true if prefix_extractor and
+// prefix_extractor_block mismatch, false otherwise. This flag will be used
+// as total_order_seek via NewIndexIterator
+bool PrefixExtractorChanged(const TableProperties* table_properties,
+                            const SliceTransform* prefix_extractor) {
+  // BlockBasedTableOptions::kHashSearch requires prefix_extractor to be set.
+  // Turn off hash index if prefix_extractor is not set; if prefix_extractor
+  // is set but prefix_extractor_block is not set, also disable hash index
+  if (prefix_extractor == nullptr || table_properties == nullptr ||
+      table_properties->prefix_extractor_name.empty()) {
+    return true;
+  }
+
+  // prefix_extractor and prefix_extractor_block are both non-empty
+  return table_properties->prefix_extractor_name.compare(
+             prefix_extractor->Name()) != 0;
+}
+
 }  // namespace
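
Later in this diff, Open() feeds the result of PrefixExtractorChanged into NewIndexIterator as need_upper_bound_check, forcing total-order seeks when the reader's prefix extractor differs from the one the file was built with. A small illustration; the property value is made up for the example, and NewFixedPrefixTransform is the stock fixed-prefix extractor:

    TableProperties props;
    props.prefix_extractor_name = "rocksdb.FixedPrefix.8";  // as written
    std::unique_ptr<const SliceTransform> reader_extractor(
        NewFixedPrefixTransform(4));  // Name() == "rocksdb.FixedPrefix.4"
    // Mismatch: hash-index seeks must fall back to total-order seeks.
    assert(PrefixExtractorChanged(&props, reader_extractor.get()));
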
 
 // Index that allows binary search lookup in a two-level index structure.
@@ -157,48 +208,129 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
   // On success, index_reader will be populated; otherwise it will remain
   // unmodified.
   static Status Create(BlockBasedTable* table, RandomAccessFileReader* file,
+                       FilePrefetchBuffer* prefetch_buffer,
                        const Footer& footer, const BlockHandle& index_handle,
                        const ImmutableCFOptions& ioptions,
-                       const Comparator* comparator, IndexReader** index_reader,
+                       const InternalKeyComparator* icomparator,
+                       IndexReader** index_reader,
                        const PersistentCacheOptions& cache_options,
-                       const int level) {
+                       const int level, const bool index_key_includes_seq,
+                       const bool index_value_is_full) {
     std::unique_ptr<Block> index_block;
     auto s = ReadBlockFromFile(
-        file, footer, ReadOptions(), index_handle, &index_block, ioptions,
-        true /* decompress */, Slice() /*compression dict*/, cache_options,
+        file, prefetch_buffer, footer, ReadOptions(), index_handle,
+        &index_block, ioptions, true /* decompress */,
+        Slice() /*compression dict*/, cache_options,
         kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
 
     if (s.ok()) {
-      *index_reader =
-          new PartitionIndexReader(table, comparator, std::move(index_block),
-                                   ioptions.statistics, level);
+      *index_reader = new PartitionIndexReader(
+          table, icomparator, std::move(index_block), ioptions.statistics,
+          level, index_key_includes_seq, index_value_is_full);
     }
 
     return s;
   }
 
   // return a two-level iterator: first level is on the partition index
-  virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
-                                        bool dont_care = true) override {
+  virtual InternalIteratorBase<BlockHandle>* NewIterator(
+      IndexBlockIter* /*iter*/ = nullptr, bool /*dont_care*/ = true,
+      bool fill_cache = true) override {
+    Statistics* kNullStats = nullptr;
     // Filters are already checked before seeking the index
-    const bool skip_filters = true;
-    const bool is_index = true;
-    Cleanable* block_cache_cleaner = nullptr;
-    const bool pin_cached_indexes =
-        level_ == 0 &&
-        table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache;
-    if (pin_cached_indexes) {
-      // Keep partition indexes into the cache as long as the partition index
-      // reader object is alive
-      block_cache_cleaner = this;
-    }
-    return NewTwoLevelIterator(
-        new BlockBasedTable::BlockEntryIteratorState(
-            table_, ReadOptions(), skip_filters, is_index, block_cache_cleaner),
-        index_block_->NewIterator(comparator_, nullptr, true));
+    if (!partition_map_.empty()) {
+      return NewTwoLevelIterator(
+          new BlockBasedTable::PartitionedIndexIteratorState(
+              table_, &partition_map_, index_key_includes_seq_,
+              index_value_is_full_),
+          index_block_->NewIterator<IndexBlockIter>(
+              icomparator_, icomparator_->user_comparator(), nullptr,
+              kNullStats, true, index_key_includes_seq_, index_value_is_full_));
+    } else {
+      auto ro = ReadOptions();
+      ro.fill_cache = fill_cache;
+      bool kIsIndex = true;
+      return new BlockBasedTableIterator<IndexBlockIter, BlockHandle>(
+          table_, ro, *icomparator_,
+          index_block_->NewIterator<IndexBlockIter>(
+              icomparator_, icomparator_->user_comparator(), nullptr,
+              kNullStats, true, index_key_includes_seq_, index_value_is_full_),
+          false, true, /* prefix_extractor */ nullptr, kIsIndex,
+          index_key_includes_seq_, index_value_is_full_);
+    }
     // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
-    // on-stack
-    // BlockIter while the state is on heap
+    // on-stack BlockIter while the state is on heap. Currently it assumes
+    // the first level iter is always on heap and will attempt to delete it
+    // in its destructor.
+  }
+
+  virtual void CacheDependencies(bool pin) override {
+    // Before reading the partitions, prefetch them to avoid lots of IOs
+    auto rep = table_->rep_;
+    IndexBlockIter biter;
+    BlockHandle handle;
+    Statistics* kNullStats = nullptr;
+    index_block_->NewIterator<IndexBlockIter>(
+        icomparator_, icomparator_->user_comparator(), &biter, kNullStats, true,
+        index_key_includes_seq_, index_value_is_full_);
+    // Index partitions are assumed to be consecutive. Prefetch them all.
+    // Read the first block offset
+    biter.SeekToFirst();
+    if (!biter.Valid()) {
+      // Empty index.
+      return;
+    }
+    handle = biter.value();
+    uint64_t prefetch_off = handle.offset();
+
+    // Read the last block's offset
+    biter.SeekToLast();
+    if (!biter.Valid()) {
+      // Empty index.
+      return;
+    }
+    handle = biter.value();
+    uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
+    uint64_t prefetch_len = last_off - prefetch_off;
+    std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
+    auto& file = table_->rep_->file;
+    prefetch_buffer.reset(new FilePrefetchBuffer());
+    Status s = prefetch_buffer->Prefetch(file.get(), prefetch_off,
+                                         static_cast<size_t>(prefetch_len));
+
+    // After prefetch, read the partitions one by one
+    biter.SeekToFirst();
+    auto ro = ReadOptions();
+    Cache* block_cache = rep->table_options.block_cache.get();
+    for (; biter.Valid(); biter.Next()) {
+      handle = biter.value();
+      BlockBasedTable::CachableEntry<Block> block;
+      Slice compression_dict;
+      if (rep->compression_dict_block) {
+        compression_dict = rep->compression_dict_block->data;
+      }
+      const bool is_index = true;
+      // TODO: Support counter batch update for partitioned index and
+      // filter blocks
+      s = table_->MaybeLoadDataBlockToCache(
+          prefetch_buffer.get(), rep, ro, handle, compression_dict, &block,
+          is_index, nullptr /* get_context */);
+
+      assert(s.ok() || block.value == nullptr);
+      if (s.ok() && block.value != nullptr) {
+        if (block.cache_handle != nullptr) {
+          if (pin) {
+            partition_map_[handle.offset()] = block;
+            RegisterCleanup(&ReleaseCachedEntry, block_cache,
+                            block.cache_handle);
+          } else {
+            block_cache->Release(block.cache_handle);
+          }
+        } else {
+          delete block.value;
+        }
+      }
+    }
   }
 
   virtual size_t size() const override { return index_block_->size(); }
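
CacheDependencies() above can issue a single contiguous read because index partitions are written back to back; the span must include the last block's trailer. A worked sketch of the arithmetic, where first and last are the BlockHandles of the first and last partition and kBlockTrailerSize is the 5-byte trailer (1 compression-type byte plus a 4-byte checksum):

    uint64_t prefetch_off = first.offset();
    uint64_t last_off = last.offset() + last.size() + kBlockTrailerSize;
    uint64_t prefetch_len = last_off - prefetch_off;  // one contiguous read
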
@@ -208,22 +340,35 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
 
   virtual size_t ApproximateMemoryUsage() const override {
     assert(index_block_);
-    return index_block_->ApproximateMemoryUsage();
+    size_t usage = index_block_->ApproximateMemoryUsage();
+#ifdef ROCKSDB_MALLOC_USABLE_SIZE
+    usage += malloc_usable_size((void*)this);
+#else
+    usage += sizeof(*this);
+#endif  // ROCKSDB_MALLOC_USABLE_SIZE
+    // TODO(myabandeh): more accurate estimate of partition_map_ mem usage
+    return usage;
   }
 
  private:
-  PartitionIndexReader(BlockBasedTable* table, const Comparator* comparator,
+  PartitionIndexReader(BlockBasedTable* table,
+                       const InternalKeyComparator* icomparator,
                        std::unique_ptr<Block>&& index_block, Statistics* stats,
-                       const int level)
-      : IndexReader(comparator, stats),
+                       const int /*level*/, const bool index_key_includes_seq,
+                       const bool index_value_is_full)
+      : IndexReader(icomparator, stats),
         table_(table),
         index_block_(std::move(index_block)),
-        level_(level) {
+        index_key_includes_seq_(index_key_includes_seq),
+        index_value_is_full_(index_value_is_full) {
     assert(index_block_ != nullptr);
   }
   BlockBasedTable* table_;
   std::unique_ptr<Block> index_block_;
-  int level_;
+  std::unordered_map<uint64_t, BlockBasedTable::CachableEntry<Block>>
+      partition_map_;
+  const bool index_key_includes_seq_;
+  const bool index_value_is_full_;
 };
 
 // Index that allows binary search lookup for the first key of each block.
@@ -235,28 +380,38 @@ class BinarySearchIndexReader : public IndexReader {
   // `BinarySearchIndexReader`.
   // On success, index_reader will be populated; otherwise it will remain
   // unmodified.
-  static Status Create(RandomAccessFileReader* file, const Footer& footer,
-                       const BlockHandle& index_handle,
-                       const ImmutableCFOptions &ioptions,
-                       const Comparator* comparator, IndexReader** index_reader,
-                       const PersistentCacheOptions& cache_options) {
+  static Status Create(RandomAccessFileReader* file,
+                       FilePrefetchBuffer* prefetch_buffer,
+                       const Footer& footer, const BlockHandle& index_handle,
+                       const ImmutableCFOptions& ioptions,
+                       const InternalKeyComparator* icomparator,
+                       IndexReader** index_reader,
+                       const PersistentCacheOptions& cache_options,
+                       const bool index_key_includes_seq,
+                       const bool index_value_is_full) {
     std::unique_ptr<Block> index_block;
     auto s = ReadBlockFromFile(
-        file, footer, ReadOptions(), index_handle, &index_block, ioptions,
-        true /* decompress */, Slice() /*compression dict*/, cache_options,
+        file, prefetch_buffer, footer, ReadOptions(), index_handle,
+        &index_block, ioptions, true /* decompress */,
+        Slice() /*compression dict*/, cache_options,
         kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
 
     if (s.ok()) {
       *index_reader = new BinarySearchIndexReader(
-          comparator, std::move(index_block), ioptions.statistics);
+          icomparator, std::move(index_block), ioptions.statistics,
+          index_key_includes_seq, index_value_is_full);
     }
 
     return s;
   }
 
-  virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
-                                        bool dont_care = true) override {
-    return index_block_->NewIterator(comparator_, iter, true);
+  virtual InternalIteratorBase<BlockHandle>* NewIterator(
+      IndexBlockIter* iter = nullptr, bool /*dont_care*/ = true,
+      bool /*dont_care*/ = true) override {
+    Statistics* kNullStats = nullptr;
+    return index_block_->NewIterator<IndexBlockIter>(
+        icomparator_, icomparator_->user_comparator(), iter, kNullStats, true,
+        index_key_includes_seq_, index_value_is_full_);
   }
 
   virtual size_t size() const override { return index_block_->size(); }
@@ -266,36 +421,49 @@ class BinarySearchIndexReader : public IndexReader {
 
   virtual size_t ApproximateMemoryUsage() const override {
     assert(index_block_);
-    return index_block_->ApproximateMemoryUsage();
+    size_t usage = index_block_->ApproximateMemoryUsage();
+#ifdef ROCKSDB_MALLOC_USABLE_SIZE
+    usage += malloc_usable_size((void*)this);
+#else
+    usage += sizeof(*this);
+#endif  // ROCKSDB_MALLOC_USABLE_SIZE
+    return usage;
   }
 
  private:
-  BinarySearchIndexReader(const Comparator* comparator,
+  BinarySearchIndexReader(const InternalKeyComparator* icomparator,
                           std::unique_ptr<Block>&& index_block,
-                          Statistics* stats)
-      : IndexReader(comparator, stats), index_block_(std::move(index_block)) {
+                          Statistics* stats, const bool index_key_includes_seq,
+                          const bool index_value_is_full)
+      : IndexReader(icomparator, stats),
+        index_block_(std::move(index_block)),
+        index_key_includes_seq_(index_key_includes_seq),
+        index_value_is_full_(index_value_is_full) {
     assert(index_block_ != nullptr);
   }
   std::unique_ptr<Block> index_block_;
+  const bool index_key_includes_seq_;
+  const bool index_value_is_full_;
 };
 
 // Index that leverages an internal hash table to quicken the lookup for a given
 // key.
 class HashIndexReader : public IndexReader {
  public:
-  static Status Create(const SliceTransform* hash_key_extractor,
-                       const Footer& footer, RandomAccessFileReader* file,
-                       const ImmutableCFOptions& ioptions,
-                       const Comparator* comparator,
-                       const BlockHandle& index_handle,
-                       InternalIterator* meta_index_iter,
-                       IndexReader** index_reader,
-                       bool hash_index_allow_collision,
-                       const PersistentCacheOptions& cache_options) {
+  static Status Create(
+      const SliceTransform* hash_key_extractor, const Footer& footer,
+      RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
+      const ImmutableCFOptions& ioptions,
+      const InternalKeyComparator* icomparator, const BlockHandle& index_handle,
+      InternalIterator* meta_index_iter, IndexReader** index_reader,
+      bool /*hash_index_allow_collision*/,
+      const PersistentCacheOptions& cache_options,
+      const bool index_key_includes_seq, const bool index_value_is_full) {
     std::unique_ptr<Block> index_block;
     auto s = ReadBlockFromFile(
-        file, footer, ReadOptions(), index_handle, &index_block, ioptions,
-        true /* decompress */, Slice() /*compression dict*/, cache_options,
+        file, prefetch_buffer, footer, ReadOptions(), index_handle,
+        &index_block, ioptions, true /* decompress */,
+        Slice() /*compression dict*/, cache_options,
         kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
 
     if (!s.ok()) {
@@ -306,9 +474,9 @@ class HashIndexReader : public IndexReader {
     // hard error. We can still fall back to the original binary search index.
     // So, Create will succeed regardless, from this point on.
 
-    auto new_index_reader =
-        new HashIndexReader(comparator, std::move(index_block),
-          ioptions.statistics);
+    auto new_index_reader = new HashIndexReader(
+        icomparator, std::move(index_block), ioptions.statistics,
+        index_key_includes_seq, index_value_is_full);
     *index_reader = new_index_reader;
 
     // Get prefixes block
@@ -329,18 +497,23 @@ class HashIndexReader : public IndexReader {
       return Status::OK();
     }
 
+    Slice dummy_comp_dict;
     // Read contents for the blocks
     BlockContents prefixes_contents;
-    s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle,
-                          &prefixes_contents, ioptions, true /* decompress */,
-                          Slice() /*compression dict*/, cache_options);
+    BlockFetcher prefixes_block_fetcher(
+        file, prefetch_buffer, footer, ReadOptions(), prefixes_handle,
+        &prefixes_contents, ioptions, true /* decompress */,
+        dummy_comp_dict /*compression dict*/, cache_options);
+    s = prefixes_block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       return s;
     }
     BlockContents prefixes_meta_contents;
-    s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle,
-                          &prefixes_meta_contents, ioptions, true /* decompress */,
-                          Slice() /*compression dict*/, cache_options);
+    BlockFetcher prefixes_meta_block_fetcher(
+        file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle,
+        &prefixes_meta_contents, ioptions, true /* decompress */,
+        dummy_comp_dict /*compression dict*/, cache_options);
+    s = prefixes_meta_block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       // TODO: log error
       return Status::OK();
@@ -351,15 +524,20 @@ class HashIndexReader : public IndexReader {
                                  prefixes_meta_contents.data, &prefix_index);
     // TODO: log error
     if (s.ok()) {
-      new_index_reader->index_block_->SetBlockPrefixIndex(prefix_index);
+      new_index_reader->prefix_index_.reset(prefix_index);
     }
 
     return Status::OK();
   }
 
-  virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
-                                        bool total_order_seek = true) override {
-    return index_block_->NewIterator(comparator_, iter, total_order_seek);
+  virtual InternalIteratorBase<BlockHandle>* NewIterator(
+      IndexBlockIter* iter = nullptr, bool total_order_seek = true,
+      bool /*dont_care*/ = true) override {
+    Statistics* kNullStats = nullptr;
+    return index_block_->NewIterator<IndexBlockIter>(
+        icomparator_, icomparator_->user_comparator(), iter, kNullStats,
+        total_order_seek, index_key_includes_seq_, index_value_is_full_,
+        prefix_index_.get());
   }
 
   virtual size_t size() const override { return index_block_->size(); }
@@ -369,14 +547,28 @@ class HashIndexReader : public IndexReader {
 
   virtual size_t ApproximateMemoryUsage() const override {
     assert(index_block_);
-    return index_block_->ApproximateMemoryUsage() +
-           prefixes_contents_.data.size();
+    size_t usage = index_block_->ApproximateMemoryUsage();
+    usage += prefixes_contents_.usable_size();
+#ifdef ROCKSDB_MALLOC_USABLE_SIZE
+    usage += malloc_usable_size((void*)this);
+#else
+    if (prefix_index_) {
+      usage += prefix_index_->ApproximateMemoryUsage();
+    }
+    usage += sizeof(*this);
+#endif  // ROCKSDB_MALLOC_USABLE_SIZE
+    return usage;
   }
 
  private:
-  HashIndexReader(const Comparator* comparator,
-                  std::unique_ptr<Block>&& index_block, Statistics* stats)
-      : IndexReader(comparator, stats), index_block_(std::move(index_block)) {
+  HashIndexReader(const InternalKeyComparator* icomparator,
+                  std::unique_ptr<Block>&& index_block, Statistics* stats,
+                  const bool index_key_includes_seq,
+                  const bool index_value_is_full)
+      : IndexReader(icomparator, stats),
+        index_block_(std::move(index_block)),
+        index_key_includes_seq_(index_key_includes_seq),
+        index_value_is_full_(index_value_is_full) {
     assert(index_block_ != nullptr);
   }
 
@@ -384,7 +576,10 @@ class HashIndexReader : public IndexReader {
   }
 
   std::unique_ptr<Block> index_block_;
+  std::unique_ptr<BlockPrefixIndex> prefix_index_;
   BlockContents prefixes_contents_;
+  const bool index_key_includes_seq_;
+  const bool index_value_is_full_;
 };
 
 // Helper function to setup the cache key's prefix for the Table.
@@ -458,51 +653,71 @@ bool IsFeatureSupported(const TableProperties& table_properties,
   return true;
 }
 
-SequenceNumber GetGlobalSequenceNumber(const TableProperties& table_properties,
-                                       Logger* info_log) {
-  auto& props = table_properties.user_collected_properties;
-
-  auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
-  auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
+// Caller has to ensure seqno is not nullptr.
+Status GetGlobalSequenceNumber(const TableProperties& table_properties,
+                               SequenceNumber largest_seqno,
+                               SequenceNumber* seqno) {
+  const auto& props = table_properties.user_collected_properties;
+  const auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
+  const auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
 
+  *seqno = kDisableGlobalSequenceNumber;
   if (version_pos == props.end()) {
     if (seqno_pos != props.end()) {
+      std::array<char, 200> msg_buf;
       // This is not an external sst file, global_seqno is not supported.
-      assert(false);
-      ROCKS_LOG_ERROR(
-          info_log,
+      snprintf(
+          msg_buf.data(), msg_buf.max_size(),
           "A non-external sst file have global seqno property with value %s",
           seqno_pos->second.c_str());
+      return Status::Corruption(msg_buf.data());
     }
-    return kDisableGlobalSequenceNumber;
+    return Status::OK();
   }
 
   uint32_t version = DecodeFixed32(version_pos->second.c_str());
   if (version < 2) {
     if (seqno_pos != props.end() || version != 1) {
+      std::array<char, 200> msg_buf;
       // This is a v1 external sst file, global_seqno is not supported.
-      assert(false);
-      ROCKS_LOG_ERROR(
-          info_log,
-          "An external sst file with version %u have global seqno property "
-          "with value %s",
-          version, seqno_pos->second.c_str());
+      snprintf(msg_buf.data(), msg_buf.max_size(),
+               "An external sst file with version %u have global seqno "
+               "property with value %s",
+               version, seqno_pos->second.c_str());
+      return Status::Corruption(msg_buf.data());
     }
-    return kDisableGlobalSequenceNumber;
+    return Status::OK();
   }
 
-  SequenceNumber global_seqno = DecodeFixed64(seqno_pos->second.c_str());
+  // Since we have a plan to deprecate global_seqno, we do not return failure
+  // if seqno_pos == props.end(). We rely on version_pos to detect whether the
+  // SST is external.
+  SequenceNumber global_seqno(0);
+  if (seqno_pos != props.end()) {
+    global_seqno = DecodeFixed64(seqno_pos->second.c_str());
+  }
+  if (global_seqno != 0 && global_seqno != largest_seqno) {
+    std::array<char, 200> msg_buf;
+    snprintf(msg_buf.data(), msg_buf.max_size(),
+             "An external sst file with version %u have global seqno property "
+             "with value %s, while largest seqno in the file is %llu",
+             version, seqno_pos->second.c_str(),
+             static_cast<unsigned long long>(largest_seqno));
+    return Status::Corruption(msg_buf.data());
+  }
+  global_seqno = largest_seqno;
+  *seqno = largest_seqno;
 
   if (global_seqno > kMaxSequenceNumber) {
-    assert(false);
-    ROCKS_LOG_ERROR(
-        info_log,
-        "An external sst file with version %u have global seqno property "
-        "with value %llu, which is greater than kMaxSequenceNumber",
-        version, global_seqno);
+    std::array<char, 200> msg_buf;
+    snprintf(msg_buf.data(), msg_buf.max_size(),
+             "An external sst file with version %u have global seqno property "
+             "with value %llu, which is greater than kMaxSequenceNumber",
+             version, static_cast<unsigned long long>(global_seqno));
+    return Status::Corruption(msg_buf.data());
   }
 
-  return global_seqno;
+  return Status::OK();
 }
 }  // namespace
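
For reference, the global seqno property is a fixed-width little-endian uint64 stored under ExternalSstFilePropertyNames::kGlobalSeqno, and the function above treats a stored 0 as "not yet assigned" while any other value must match the file's largest seqno. A minimal decode sketch; DecodeFixed64 is RocksDB's internal helper from util/coding.h:

    // props is assumed to be table_properties.user_collected_properties.
    const auto it = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
    SequenceNumber stored_seqno =
        (it == props.end()) ? 0 : DecodeFixed64(it->second.c_str());
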
 
@@ -525,18 +740,59 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
                              unique_ptr<RandomAccessFileReader>&& file,
                              uint64_t file_size,
                              unique_ptr<TableReader>* table_reader,
+                             const SliceTransform* prefix_extractor,
                              const bool prefetch_index_and_filter_in_cache,
-                             const bool skip_filters, const int level) {
+                             const bool skip_filters, const int level,
+                             const bool immortal_table,
+                             const SequenceNumber largest_seqno,
+                             TailPrefetchStats* tail_prefetch_stats) {
   table_reader->reset();
 
   Footer footer;
 
-  // Before read footer, readahead backwards to prefetch data
-  Status s =
-      file->Prefetch((file_size < 512 * 1024 ? 0 : file_size - 512 * 1024),
-                     512 * 1024 /* 512 KB prefetching */);
-  s = ReadFooterFromFile(file.get(), file_size, &footer,
-                              kBlockBasedTableMagicNumber);
+  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
+
+  // prefetch both index and filters, down to all partitions
+  const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
+  const bool preload_all = !table_options.cache_index_and_filter_blocks;
+
+  size_t tail_prefetch_size = 0;
+  if (tail_prefetch_stats != nullptr) {
+    // Multiple threads may get a 0 (no history) when running in parallel,
+    // but it will get cleared after the first of them finishes.
+    tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
+  }
+  if (tail_prefetch_size == 0) {
+    // Before reading the footer, readahead backwards to prefetch data. Do
+    // more readahead if we're going to read the index/filter.
+    // TODO: This may incorrectly select small readahead in case partitioned
+    // index/filter is enabled and top-level partition pinning is enabled.
+    // That's because we need to issue readahead before we read the properties,
+    // at which point we don't yet know the index type.
+    tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
+  }
+  size_t prefetch_off;
+  size_t prefetch_len;
+  if (file_size < tail_prefetch_size) {
+    prefetch_off = 0;
+    prefetch_len = static_cast<size_t>(file_size);
+  } else {
+    prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
+    prefetch_len = tail_prefetch_size;
+  }
+  TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
+                           &tail_prefetch_size);
+  Status s;
+  // TODO should not have this special logic in the future.
+  if (!file->use_direct_io()) {
+    prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true));
+    s = file->Prefetch(prefetch_off, prefetch_len);
+  } else {
+    prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
+    s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
+  }
+  s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer,
+                         kBlockBasedTableMagicNumber);
   if (!s.ok()) {
     return s;
   }
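
The adaptive tail prefetch replaces the old fixed 512 KB readahead: TailPrefetchStats remembers how much of the tail past opens actually consumed, and the hardcoded sizes are only a fallback when no history exists. A sketch of the feedback loop, using only names that appear in this diff:

    // After a successful Open, record how many tail bytes were really read...
    size_t effective = static_cast<size_t>(file_size) -
                       prefetch_buffer->min_offset_read();
    tail_prefetch_stats->RecordEffectiveSize(effective);
    // ...so the next Open can size its readahead from history.
    size_t suggested = tail_prefetch_stats->GetSuggestedPrefetchSize();
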
@@ -551,7 +807,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   // raw pointer will be used to create HashIndexReader, whose reset may
   // access a dangling pointer.
   Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
-                                      internal_comparator, skip_filters);
+                                      internal_comparator, skip_filters,
+                                      immortal_table);
   rep->file = std::move(file);
   rep->footer = footer;
   rep->index_type = table_options.index_type;
@@ -559,7 +816,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   // We need to wrap data with internal_prefix_transform to make sure it can
   // handle prefix correctly.
   rep->internal_prefix_transform.reset(
-      new InternalKeySliceTransform(rep->ioptions.prefix_extractor));
+      new InternalKeySliceTransform(prefix_extractor));
   SetupCacheKeyPrefix(rep, file_size);
   unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
 
@@ -573,7 +830,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   // Read meta index
   std::unique_ptr<Block> meta;
   std::unique_ptr<InternalIterator> meta_iter;
-  s = ReadMetaBlock(rep, &meta, &meta_iter);
+  s = ReadMetaBlock(rep, prefetch_buffer.get(), &meta, &meta_iter);
   if (!s.ok()) {
     return s;
   }
@@ -619,8 +876,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
     s = meta_iter->status();
     TableProperties* table_properties = nullptr;
     if (s.ok()) {
-      s = ReadProperties(meta_iter->value(), rep->file.get(), rep->footer,
-                         rep->ioptions, &table_properties);
+      s = ReadProperties(meta_iter->value(), rep->file.get(),
+                         prefetch_buffer.get(), rep->footer, rep->ioptions,
+                         &table_properties, false /* compression_type_missing */);
     }
 
     if (!s.ok()) {
@@ -629,32 +887,45 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
                      "block %s",
                      s.ToString().c_str());
     } else {
+      assert(table_properties != nullptr);
       rep->table_properties.reset(table_properties);
+      rep->blocks_maybe_compressed = rep->table_properties->compression_name !=
+                                     CompressionTypeToString(kNoCompression);
     }
   } else {
     ROCKS_LOG_ERROR(rep->ioptions.info_log,
                     "Cannot find Properties block from file.");
   }
+#ifndef ROCKSDB_LITE
+  if (rep->table_properties) {
+    ParseSliceTransform(rep->table_properties->prefix_extractor_name,
+                        &(rep->table_prefix_extractor));
+  }
+#endif  // ROCKSDB_LITE
 
   // Read the compression dictionary meta block
   bool found_compression_dict;
-  s = SeekToCompressionDictBlock(meta_iter.get(), &found_compression_dict);
+  BlockHandle compression_dict_handle;
+  s = SeekToCompressionDictBlock(meta_iter.get(), &found_compression_dict,
+    &compression_dict_handle);
   if (!s.ok()) {
     ROCKS_LOG_WARN(
         rep->ioptions.info_log,
         "Error when seeking to compression dictionary block from file: %s",
         s.ToString().c_str());
-  } else if (found_compression_dict) {
+  } else if (found_compression_dict && !compression_dict_handle.IsNull()) {
     // TODO(andrewkr): Add to block cache if cache_index_and_filter_blocks is
     // true.
-    unique_ptr<BlockContents> compression_dict_block{new BlockContents()};
-    // TODO(andrewkr): ReadMetaBlock repeats SeekToCompressionDictBlock().
-    // maybe decode a handle from meta_iter
-    // and do ReadBlockContents(handle) instead
-    s = rocksdb::ReadMetaBlock(rep->file.get(), file_size,
-                               kBlockBasedTableMagicNumber, rep->ioptions,
-                               rocksdb::kCompressionDictBlock,
-                               compression_dict_block.get());
+    std::unique_ptr<BlockContents> compression_dict_cont{new BlockContents()};
+    PersistentCacheOptions cache_options;
+    ReadOptions read_options;
+    read_options.verify_checksums = false;
+    BlockFetcher compression_block_fetcher(
+        rep->file.get(), prefetch_buffer.get(), rep->footer, read_options,
+        compression_dict_handle, compression_dict_cont.get(), rep->ioptions,
+        false /* decompress */, Slice() /*compression dict*/, cache_options);
+    s = compression_block_fetcher.ReadBlockContents();
+
     if (!s.ok()) {
       ROCKS_LOG_WARN(
           rep->ioptions.info_log,
@@ -662,7 +933,25 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
           "block %s",
           s.ToString().c_str());
     } else {
-      rep->compression_dict_block = std::move(compression_dict_block);
+      rep->compression_dict_block = std::move(compression_dict_cont);
+    }
+  }
+
+  // Read the table properties, if provided.
+  if (rep->table_properties) {
+    rep->whole_key_filtering &=
+        IsFeatureSupported(*(rep->table_properties),
+                           BlockBasedTablePropertyNames::kWholeKeyFiltering,
+                           rep->ioptions.info_log);
+    rep->prefix_filtering &= IsFeatureSupported(
+        *(rep->table_properties),
+        BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
+
+    s = GetGlobalSequenceNumber(*(rep->table_properties), largest_seqno,
+                                &(rep->global_seqno));
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str());
+      return s;
     }
   }
 
@@ -678,9 +967,10 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   } else {
     if (found_range_del_block && !rep->range_del_handle.IsNull()) {
       ReadOptions read_options;
-      s = MaybeLoadDataBlockToCache(rep, read_options, rep->range_del_handle,
-                                    Slice() /* compression_dict */,
-                                    &rep->range_del_entry);
+      s = MaybeLoadDataBlockToCache(
+          prefetch_buffer.get(), rep, read_options, rep->range_del_handle,
+          Slice() /* compression_dict */, &rep->range_del_entry,
+          false /* is_index */, nullptr /* get_context */);
       if (!s.ok()) {
         ROCKS_LOG_WARN(
             rep->ioptions.info_log,
@@ -690,77 +980,108 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
     }
   }
 
-  // Determine whether whole key filtering is supported.
-  if (rep->table_properties) {
-    rep->whole_key_filtering &=
-        IsFeatureSupported(*(rep->table_properties),
-                           BlockBasedTablePropertyNames::kWholeKeyFiltering,
-                           rep->ioptions.info_log);
-    rep->prefix_filtering &= IsFeatureSupported(
-        *(rep->table_properties),
-        BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
-
-    rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties),
-                                                rep->ioptions.info_log);
-  }
-
-    // pre-fetching of blocks is turned on
+  bool need_upper_bound_check =
+      PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor);
+
+  BlockBasedTableOptions::IndexType index_type = new_table->UpdateIndexType();
+  // prefetch the first level of index
+  const bool prefetch_index =
+      prefetch_all ||
+      (table_options.pin_top_level_index_and_filter &&
+       index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
+  // prefetch the first level of filter
+  const bool prefetch_filter =
+      prefetch_all || (table_options.pin_top_level_index_and_filter &&
+                       rep->filter_type == Rep::FilterType::kPartitionedFilter);
+  // Partitioned filters cannot be enabled without a partitioned index
+  assert(!prefetch_filter || prefetch_index);
+  // pin both index and filters, down to all partitions
+  const bool pin_all =
+      rep->table_options.pin_l0_filter_and_index_blocks_in_cache && level == 0;
+  // pin the first level of index
+  const bool pin_index =
+      pin_all || (table_options.pin_top_level_index_and_filter &&
+                  index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
+  // pin the first level of filter
+  const bool pin_filter =
+      pin_all || (table_options.pin_top_level_index_and_filter &&
+                  rep->filter_type == Rep::FilterType::kPartitionedFilter);
+  // pre-fetching of blocks is turned on
   // Will use block cache for index/filter blocks access
   // Always prefetch index and filter for level 0
   if (table_options.cache_index_and_filter_blocks) {
-    if (prefetch_index_and_filter_in_cache || level == 0) {
-      assert(table_options.block_cache != nullptr);
+    assert(table_options.block_cache != nullptr);
+    if (prefetch_index) {
       // Hack: Call NewIndexIterator() to implicitly add index to the
       // block_cache
-
-      // if pin_l0_filter_and_index_blocks_in_cache is true and this is
-      // a level0 file, then we will pass in this pointer to rep->index
-      // to NewIndexIterator(), which will save the index block in there
-      // else it's a nullptr and nothing special happens
-      CachableEntry<IndexReader>* index_entry = nullptr;
-      if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
-          level == 0) {
-        index_entry = &rep->index_entry;
-      }
-      unique_ptr<InternalIterator> iter(
-          new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry));
+      CachableEntry<IndexReader> index_entry;
+      // check prefix_extractor match only if hash based index is used
+      bool disable_prefix_seek =
+          rep->index_type == BlockBasedTableOptions::kHashSearch &&
+          need_upper_bound_check;
+      unique_ptr<InternalIteratorBase<BlockHandle>> iter(
+          new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek,
+                                      nullptr, &index_entry));
       s = iter->status();
-
       if (s.ok()) {
-        // Hack: Call GetFilter() to implicitly add filter to the block_cache
-        auto filter_entry = new_table->GetFilter();
-        // if pin_l0_filter_and_index_blocks_in_cache is true, and this is
-        // a level0 file, then save it in rep_->filter_entry; it will be
-        // released in the destructor only, hence it will be pinned in the
-        // cache while this reader is alive
-        if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
-            level == 0) {
-          rep->filter_entry = filter_entry;
-          if (rep->filter_entry.value != nullptr) {
-            rep->filter_entry.value->SetLevel(level);
-          }
+        // This is the first call to NewIndexIterator() since we're in Open().
+        // On success it should give us ownership of the `CachableEntry` by
+        // populating `index_entry`.
+        assert(index_entry.value != nullptr);
+        if (prefetch_all) {
+          index_entry.value->CacheDependencies(pin_all);
+        }
+        if (pin_index) {
+          rep->index_entry = std::move(index_entry);
         } else {
-          filter_entry.Release(table_options.block_cache.get());
+          index_entry.Release(table_options.block_cache.get());
         }
       }
     }
+    if (s.ok() && prefetch_filter) {
+      // Hack: Call GetFilter() to implicitly add filter to the block_cache
+      auto filter_entry =
+          new_table->GetFilter(rep->table_prefix_extractor.get());
+      if (filter_entry.value != nullptr && prefetch_all) {
+        filter_entry.value->CacheDependencies(
+            pin_all, rep->table_prefix_extractor.get());
+      }
+      // if pin_filter is true then save it in rep_->filter_entry; it will be
+      // released in the destructor only, hence it will be pinned in the
+      // cache while this reader is alive
+      if (pin_filter) {
+        rep->filter_entry = filter_entry;
+      } else {
+        filter_entry.Release(table_options.block_cache.get());
+      }
+    }
   } else {
     // If we don't use block cache for index/filter blocks access, we'll
     // pre-load these blocks, which will kept in member variables in Rep
     // and with a same life-time as this table object.
     IndexReader* index_reader = nullptr;
-    s = new_table->CreateIndexReader(&index_reader, meta_iter.get(), level);
-
+    s = new_table->CreateIndexReader(prefetch_buffer.get(), &index_reader,
+                                     meta_iter.get(), level);
     if (s.ok()) {
       rep->index_reader.reset(index_reader);
+      // The partitions of a partitioned index are always stored in the block
+      // cache. They hence follow the pin and prefetch configuration regardless
+      // of the value of cache_index_and_filter_blocks.
+      if (prefetch_index_and_filter_in_cache || level == 0) {
+        rep->index_reader->CacheDependencies(pin_all);
+      }
 
       // Set filter block
       if (rep->filter_policy) {
         const bool is_a_filter_partition = true;
-        rep->filter.reset(
-            new_table->ReadFilter(rep->filter_handle, !is_a_filter_partition));
-        if (rep->filter.get()) {
-          rep->filter->SetLevel(level);
+        auto filter = new_table->ReadFilter(
+            prefetch_buffer.get(), rep->filter_handle, !is_a_filter_partition,
+            rep->table_prefix_extractor.get());
+        rep->filter.reset(filter);
+        // Refer to the comment above about partitioned indexes always being
+        // cached
+        if (filter && (prefetch_index_and_filter_in_cache || level == 0)) {
+          filter->CacheDependencies(pin_all, rep->table_prefix_extractor.get());
         }
       }
     } else {
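
All of the prefetch/pin decisions above key off user-visible table options. A hedged configuration sketch showing the knobs involved; these are standard rocksdb::BlockBasedTableOptions fields in this RocksDB version:

    rocksdb::BlockBasedTableOptions topts;
    topts.block_cache = rocksdb::NewLRUCache(512 << 20);
    // Route index/filter reads through the block cache...
    topts.cache_index_and_filter_blocks = true;
    // ...but keep L0 index/filter blocks pinned while the reader is alive.
    topts.pin_l0_filter_and_index_blocks_in_cache = true;
    // With a partitioned index/filter, pin only the top level of each.
    topts.index_type = rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch;
    topts.partition_filters = true;
    topts.pin_top_level_index_and_filter = true;
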
@@ -769,6 +1090,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
   }
 
   if (s.ok()) {
+    assert(prefetch_buffer.get() != nullptr);
+    if (tail_prefetch_stats != nullptr) {
+      assert(prefetch_buffer->min_offset_read() < file_size);
+      tail_prefetch_stats->RecordEffectiveSize(
+          static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read());
+    }
     *table_reader = std::move(new_table);
   }
 
@@ -791,7 +1118,6 @@ void BlockBasedTable::SetupForCompaction() {
     default:
       assert(false);
   }
-  compaction_optimized_ = true;
 }
 
 std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
@@ -813,14 +1139,14 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const {
 // Load the meta-block from the file. On success, return the loaded meta block
 // and its iterator.
 Status BlockBasedTable::ReadMetaBlock(Rep* rep,
+                                      FilePrefetchBuffer* prefetch_buffer,
                                       std::unique_ptr<Block>* meta_block,
                                       std::unique_ptr<InternalIterator>* iter) {
   // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
   // it is an empty block.
-  //  TODO: we never really verify check sum for meta index block
   std::unique_ptr<Block> meta;
   Status s = ReadBlockFromFile(
-      rep->file.get(), rep->footer, ReadOptions(),
+      rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(),
       rep->footer.metaindex_handle(), &meta, rep->ioptions,
       true /* decompress */, Slice() /*compression dict*/,
       rep->persistent_cache_options, kDisableGlobalSequenceNumber,
@@ -836,7 +1162,8 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
 
   *meta_block = std::move(meta);
   // meta block uses bytewise comparator.
-  iter->reset(meta_block->get()->NewIterator(BytewiseComparator()));
+  iter->reset(meta_block->get()->NewIterator<DataBlockIter>(
+      BytewiseComparator(), BytewiseComparator()));
   return Status::OK();
 }
 
@@ -845,8 +1172,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
     Cache* block_cache, Cache* block_cache_compressed,
     const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
     BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
-    const Slice& compression_dict, size_t read_amp_bytes_per_bit,
-    bool is_index) {
+    const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
+    GetContext* get_context) {
   Status s;
   Block* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -857,7 +1184,16 @@ Status BlockBasedTable::GetDataBlockFromCache(
     block->cache_handle = GetEntryFromCache(
         block_cache, block_cache_key,
         is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
-        is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics);
+        is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT,
+        get_context
+            ? (is_index ? &get_context->get_context_stats_.num_cache_index_miss
+                        : &get_context->get_context_stats_.num_cache_data_miss)
+            : nullptr,
+        get_context
+            ? (is_index ? &get_context->get_context_stats_.num_cache_index_hit
+                        : &get_context->get_context_stats_.num_cache_data_hit)
+            : nullptr,
+        statistics, get_context);
     if (block->cache_handle != nullptr) {
       block->value =
           reinterpret_cast<Block*>(block_cache->Value(block->cache_handle));
@@ -890,10 +1226,11 @@ Status BlockBasedTable::GetDataBlockFromCache(
 
   // Retrieve the uncompressed contents into a new buffer
   BlockContents contents;
-  s = UncompressBlockContents(compressed_block->data(),
+  UncompressionContext uncompression_ctx(compressed_block->compression_type(),
+                                         compression_dict);
+  s = UncompressBlockContents(uncompression_ctx, compressed_block->data(),
                               compressed_block->size(), &contents,
-                              format_version, compression_dict,
-                              ioptions);
+                              format_version, ioptions);
 
   // Insert uncompressed block into block cache
   if (s.ok()) {
@@ -904,22 +1241,38 @@ Status BlockBasedTable::GetDataBlockFromCache(
     assert(block->value->compression_type() == kNoCompression);
     if (block_cache != nullptr && block->value->cachable() &&
         read_options.fill_cache) {
-      s = block_cache->Insert(
-          block_cache_key, block->value, block->value->usable_size(),
-          &DeleteCachedEntry<Block>, &(block->cache_handle));
+      size_t charge = block->value->ApproximateMemoryUsage();
+      s = block_cache->Insert(block_cache_key, block->value, charge,
+                              &DeleteCachedEntry<Block>,
+                              &(block->cache_handle));
+      block_cache->TEST_mark_as_data_block(block_cache_key, charge);
       if (s.ok()) {
-        RecordTick(statistics, BLOCK_CACHE_ADD);
+        if (get_context != nullptr) {
+          get_context->get_context_stats_.num_cache_add++;
+          get_context->get_context_stats_.num_cache_bytes_write += charge;
+        } else {
+          RecordTick(statistics, BLOCK_CACHE_ADD);
+          RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge);
+        }
         if (is_index) {
-          RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
-          RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
-                     block->value->usable_size());
+          if (get_context != nullptr) {
+            get_context->get_context_stats_.num_cache_index_add++;
+            get_context->get_context_stats_.num_cache_index_bytes_insert +=
+                charge;
+          } else {
+            RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
+            RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge);
+          }
         } else {
-          RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
-          RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
-                     block->value->usable_size());
+          if (get_context != nullptr) {
+            get_context->get_context_stats_.num_cache_data_add++;
+            get_context->get_context_stats_.num_cache_data_bytes_insert +=
+                charge;
+          } else {
+            RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
+            RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge);
+          }
         }
-        RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
-                   block->value->usable_size());
       } else {
         RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
         delete block->value;
@@ -936,10 +1289,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
 Status BlockBasedTable::PutDataBlockToCache(
     const Slice& block_cache_key, const Slice& compressed_block_cache_key,
     Cache* block_cache, Cache* block_cache_compressed,
-    const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
+    const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
     CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    Cache::Priority priority) {
+    Cache::Priority priority, GetContext* get_context) {
   assert(raw_block->compression_type() == kNoCompression ||
          block_cache_compressed != nullptr);
 
@@ -948,8 +1301,11 @@ Status BlockBasedTable::PutDataBlockToCache(
   BlockContents contents;
   Statistics* statistics = ioptions.statistics;
   if (raw_block->compression_type() != kNoCompression) {
-    s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents,
-                                format_version, compression_dict, ioptions);
+    UncompressionContext uncompression_ctx(raw_block->compression_type(),
+                                           compression_dict);
+    s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
+                                raw_block->size(), &contents, format_version,
+                                ioptions);
   }
   if (!s.ok()) {
     delete raw_block;
@@ -970,7 +1326,7 @@ Status BlockBasedTable::PutDataBlockToCache(
   if (block_cache_compressed != nullptr && raw_block != nullptr &&
       raw_block->cachable()) {
     s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block,
-                                       raw_block->usable_size(),
+                                       raw_block->ApproximateMemoryUsage(),
                                        &DeleteCachedEntry<Block>);
     if (s.ok()) {
       // Avoid the following code to delete this cached block.
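
PutDataBlockToCache maintains two tiers: the still-compressed raw block may be inserted into block_cache_compressed, while the uncompressed copy goes into block_cache, now charged by ApproximateMemoryUsage() rather than the block's usable_size(). A hedged setup sketch for the two tiers:

    rocksdb::BlockBasedTableOptions topts;
    // Uncompressed tier: fast to serve, charged per ApproximateMemoryUsage().
    topts.block_cache = rocksdb::NewLRUCache(256 << 20);
    // Compressed tier: holds raw blocks; cheaper per entry, needs decompression.
    topts.block_cache_compressed = rocksdb::NewLRUCache(64 << 20);
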
@@ -985,23 +1341,38 @@ Status BlockBasedTable::PutDataBlockToCache(
   // insert into uncompressed block cache
   assert((block->value->compression_type() == kNoCompression));
   if (block_cache != nullptr && block->value->cachable()) {
-    s = block_cache->Insert(
-        block_cache_key, block->value, block->value->usable_size(),
-        &DeleteCachedEntry<Block>, &(block->cache_handle), priority);
+    size_t charge = block->value->ApproximateMemoryUsage();
+    s = block_cache->Insert(block_cache_key, block->value, charge,
+                            &DeleteCachedEntry<Block>, &(block->cache_handle),
+                            priority);
+    block_cache->TEST_mark_as_data_block(block_cache_key, charge);
     if (s.ok()) {
       assert(block->cache_handle != nullptr);
-      RecordTick(statistics, BLOCK_CACHE_ADD);
+      if (get_context != nullptr) {
+        get_context->get_context_stats_.num_cache_add++;
+        get_context->get_context_stats_.num_cache_bytes_write += charge;
+      } else {
+        RecordTick(statistics, BLOCK_CACHE_ADD);
+        RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge);
+      }
       if (is_index) {
-        RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
-        RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
-                   block->value->usable_size());
+        if (get_context != nullptr) {
+          get_context->get_context_stats_.num_cache_index_add++;
+          get_context->get_context_stats_.num_cache_index_bytes_insert +=
+              charge;
+        } else {
+          RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
+          RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge);
+        }
       } else {
-        RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
-        RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
-                   block->value->usable_size());
+        if (get_context != nullptr) {
+          get_context->get_context_stats_.num_cache_data_add++;
+          get_context->get_context_stats_.num_cache_data_bytes_insert += charge;
+        } else {
+          RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
+          RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge);
+        }
       }
-      RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
-                 block->value->usable_size());
       assert(reinterpret_cast<Block*>(
                  block_cache->Value(block->cache_handle)) == block->value);
     } else {
@@ -1015,7 +1386,9 @@ Status BlockBasedTable::PutDataBlockToCache(
 }
 
 FilterBlockReader* BlockBasedTable::ReadFilter(
-    const BlockHandle& filter_handle, const bool is_a_filter_partition) const {
+    FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_handle,
+    const bool is_a_filter_partition,
+    const SliceTransform* prefix_extractor) const {
   auto& rep = rep_;
   // TODO: We might want to unify with ReadBlockFromFile() if we start
   // requiring checksum verification in Table::Open.
@@ -1023,11 +1396,16 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
     return nullptr;
   }
   BlockContents block;
-  if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(),
-                         filter_handle, &block, rep->ioptions,
-                         false /* decompress */, Slice() /*compression dict*/,
-                         rep->persistent_cache_options)
-           .ok()) {
+
+  Slice dummy_comp_dict;
+
+  BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
+                             ReadOptions(), filter_handle, &block,
+                             rep->ioptions, false /* decompress */,
+                             dummy_comp_dict, rep->persistent_cache_options);
+  Status s = block_fetcher.ReadBlockContents();
+
+  if (!s.ok()) {
     // Error reading the block
     return nullptr;
   }
@@ -1043,14 +1421,18 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
   switch (filter_type) {
     case Rep::FilterType::kPartitionedFilter: {
       return new PartitionedFilterBlockReader(
-          rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr,
+          rep->prefix_filtering ? prefix_extractor : nullptr,
           rep->whole_key_filtering, std::move(block), nullptr,
-          rep->ioptions.statistics, rep->internal_comparator, this);
+          rep->ioptions.statistics, rep->internal_comparator, this,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_key_is_user_key == 0,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_value_is_delta_encoded == 0);
     }
 
     case Rep::FilterType::kBlockFilter:
       return new BlockBasedFilterBlockReader(
-          rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr,
+          rep->prefix_filtering ? prefix_extractor : nullptr,
           rep->table_options, rep->whole_key_filtering, std::move(block),
           rep->ioptions.statistics);
 
@@ -1059,7 +1441,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
           rep->filter_policy->GetFilterBitsReader(block.data);
       assert(filter_bits_reader != nullptr);
       return new FullFilterBlockReader(
-          rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr,
+          rep->prefix_filtering ? prefix_extractor : nullptr,
           rep->whole_key_filtering, std::move(block), filter_bits_reader,
           rep->ioptions.statistics);
     }
@@ -1073,15 +1455,18 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
 }
 
 BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
-                                                          bool no_io) const {
+    const SliceTransform* prefix_extractor, FilePrefetchBuffer* prefetch_buffer,
+    bool no_io, GetContext* get_context) const {
   const BlockHandle& filter_blk_handle = rep_->filter_handle;
   const bool is_a_filter_partition = true;
-  return GetFilter(filter_blk_handle, !is_a_filter_partition, no_io);
+  return GetFilter(prefetch_buffer, filter_blk_handle, !is_a_filter_partition,
+                   no_io, get_context, prefix_extractor);
 }
 
 BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
-    const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
-    bool no_io) const {
+    FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
+    const bool is_a_filter_partition, bool no_io, GetContext* get_context,
+    const SliceTransform* prefix_extractor) const {
   // If cache_index_and_filter_blocks is false, filter should be pre-populated.
   // We will return rep_->filter anyway. rep_->filter can be nullptr if filter
   // read fails at Open() time. We don't want to reload again since it will
@@ -1109,9 +1494,13 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
                          filter_blk_handle, cache_key);
 
   Statistics* statistics = rep_->ioptions.statistics;
-  auto cache_handle =
-      GetEntryFromCache(block_cache, key, BLOCK_CACHE_FILTER_MISS,
-                        BLOCK_CACHE_FILTER_HIT, statistics);
+  auto cache_handle = GetEntryFromCache(
+      block_cache, key, BLOCK_CACHE_FILTER_MISS, BLOCK_CACHE_FILTER_HIT,
+      get_context ? &get_context->get_context_stats_.num_cache_filter_miss
+                  : nullptr,
+      get_context ? &get_context->get_context_stats_.num_cache_filter_hit
+                  : nullptr,
+      statistics, get_context);
 
   FilterBlockReader* filter = nullptr;
   if (cache_handle != nullptr) {
@@ -1121,19 +1510,28 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
     // Do not invoke any io.
     return CachableEntry<FilterBlockReader>();
   } else {
-    filter = ReadFilter(filter_blk_handle, is_a_filter_partition);
+    filter = ReadFilter(prefetch_buffer, filter_blk_handle,
+                        is_a_filter_partition, prefix_extractor);
     if (filter != nullptr) {
-      assert(filter->size() > 0);
+      size_t usage = filter->ApproximateMemoryUsage();
       Status s = block_cache->Insert(
-          key, filter, filter->size(), &DeleteCachedFilterEntry, &cache_handle,
+          key, filter, usage, &DeleteCachedFilterEntry, &cache_handle,
           rep_->table_options.cache_index_and_filter_blocks_with_high_priority
               ? Cache::Priority::HIGH
               : Cache::Priority::LOW);
       if (s.ok()) {
-        RecordTick(statistics, BLOCK_CACHE_ADD);
-        RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
-        RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, filter->size());
-        RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter->size());
+        if (get_context != nullptr) {
+          get_context->get_context_stats_.num_cache_add++;
+          get_context->get_context_stats_.num_cache_bytes_write += usage;
+          get_context->get_context_stats_.num_cache_filter_add++;
+          get_context->get_context_stats_.num_cache_filter_bytes_insert +=
+              usage;
+        } else {
+          RecordTick(statistics, BLOCK_CACHE_ADD);
+          RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage);
+          RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
+          RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage);
+        }
       } else {
         RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
         delete filter;
@@ -1145,18 +1543,23 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
   return { filter, cache_handle };
 }
 
-InternalIterator* BlockBasedTable::NewIndexIterator(
-    const ReadOptions& read_options, BlockIter* input_iter,
-    CachableEntry<IndexReader>* index_entry) {
+// disable_prefix_seek should be set to true when the prefix_extractor found
+// in the SST file differs from the one in mutable_cf_options and the index
+// type is HashBasedIndex
+InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
+    const ReadOptions& read_options, bool disable_prefix_seek,
+    IndexBlockIter* input_iter, CachableEntry<IndexReader>* index_entry,
+    GetContext* get_context) {
   // index reader has already been pre-populated.
   if (rep_->index_reader) {
     return rep_->index_reader->NewIterator(
-        input_iter, read_options.total_order_seek);
+        input_iter, read_options.total_order_seek || disable_prefix_seek,
+        read_options.fill_cache);
   }
   // we have a pinned index block
   if (rep_->index_entry.IsSet()) {
-    return rep_->index_entry.value->NewIterator(input_iter,
-                                                read_options.total_order_seek);
+    return rep_->index_entry.value->NewIterator(
+        input_iter, read_options.total_order_seek || disable_prefix_seek,
+        read_options.fill_cache);
   }
 
   PERF_TIMER_GUARD(read_index_block_nanos);
@@ -1168,16 +1571,21 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
       GetCacheKeyFromOffset(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
                             rep_->dummy_index_reader_offset, cache_key);
   Statistics* statistics = rep_->ioptions.statistics;
-  auto cache_handle =
-      GetEntryFromCache(block_cache, key, BLOCK_CACHE_INDEX_MISS,
-                        BLOCK_CACHE_INDEX_HIT, statistics);
+  auto cache_handle = GetEntryFromCache(
+      block_cache, key, BLOCK_CACHE_INDEX_MISS, BLOCK_CACHE_INDEX_HIT,
+      get_context ? &get_context->get_context_stats_.num_cache_index_miss
+                  : nullptr,
+      get_context ? &get_context->get_context_stats_.num_cache_index_hit
+                  : nullptr,
+      statistics, get_context);
 
   if (cache_handle == nullptr && no_io) {
     if (input_iter != nullptr) {
-      input_iter->SetStatus(Status::Incomplete("no blocking io"));
+      input_iter->Invalidate(Status::Incomplete("no blocking io"));
       return input_iter;
     } else {
-      return NewErrorInternalIterator(Status::Incomplete("no blocking io"));
+      return NewErrorInternalIterator<BlockHandle>(
+          Status::Incomplete("no blocking io"));
     }
   }
 
@@ -1189,26 +1597,31 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
     // Create index reader and put it in the cache.
     Status s;
     TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2");
-    s = CreateIndexReader(&index_reader);
+    s = CreateIndexReader(nullptr /* prefetch_buffer */, &index_reader);
     TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1");
     TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3");
     TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4");
+    size_t charge = 0;
     if (s.ok()) {
       assert(index_reader != nullptr);
+      charge = index_reader->ApproximateMemoryUsage();
       s = block_cache->Insert(
-          key, index_reader, index_reader->usable_size(),
-          &DeleteCachedIndexEntry, &cache_handle,
+          key, index_reader, charge, &DeleteCachedIndexEntry, &cache_handle,
           rep_->table_options.cache_index_and_filter_blocks_with_high_priority
               ? Cache::Priority::HIGH
               : Cache::Priority::LOW);
     }
 
     if (s.ok()) {
-      size_t usable_size = index_reader->usable_size();
-      RecordTick(statistics, BLOCK_CACHE_ADD);
+      if (get_context != nullptr) {
+        get_context->get_context_stats_.num_cache_add++;
+        get_context->get_context_stats_.num_cache_bytes_write += charge;
+      } else {
+        RecordTick(statistics, BLOCK_CACHE_ADD);
+        RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge);
+      }
       RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
-      RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usable_size);
-      RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usable_size);
+      RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge);
     } else {
       if (index_reader != nullptr) {
         delete index_reader;
@@ -1216,10 +1629,10 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
       RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
       // make sure that if something goes wrong, index_reader remains intact.
       if (input_iter != nullptr) {
-        input_iter->SetStatus(s);
+        input_iter->Invalidate(s);
         return input_iter;
       } else {
-        return NewErrorInternalIterator(s);
+        return NewErrorInternalIterator<BlockHandle>(s);
       }
     }
 
@@ -1227,7 +1640,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
 
   assert(cache_handle);
   auto* iter = index_reader->NewIterator(
-      input_iter, read_options.total_order_seek);
+      input_iter, read_options.total_order_seek || disable_prefix_seek);
 
   // the caller would like to take ownership of the index block
   // don't call RegisterCleanup() in this case, the caller will take care of it
@@ -1240,24 +1653,16 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
   return iter;
 }
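
A minimal sketch of the table options that drive the Insert() priority above (public RocksDB API; the cache capacity and the 0.5 high-priority pool ratio are illustrative values, not taken from this diff):

    #include "rocksdb/cache.h"
    #include "rocksdb/table.h"

    rocksdb::BlockBasedTableOptions topts;
    topts.cache_index_and_filter_blocks = true;
    // Selects Cache::Priority::HIGH in the Insert() call above:
    topts.cache_index_and_filter_blocks_with_high_priority = true;
    // Reserve part of the LRU cache for high-priority entries:
    topts.block_cache = rocksdb::NewLRUCache(
        512 << 20 /*capacity*/, -1 /*num_shard_bits*/,
        false /*strict_capacity_limit*/, 0.5 /*high_pri_pool_ratio*/);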
 
-InternalIterator* BlockBasedTable::NewDataBlockIterator(
-    Rep* rep, const ReadOptions& ro, const Slice& index_value,
-    BlockIter* input_iter, bool is_index) {
-  BlockHandle handle;
-  Slice input = index_value;
-  // We intentionally allow extra stuff in index_value so that we
-  // can add more features in the future.
-  Status s = handle.DecodeFrom(&input);
-  return NewDataBlockIterator(rep, ro, handle, input_iter, is_index, s);
-}
-
 // Convert an index iterator value (i.e., an encoded BlockHandle)
 // into an iterator over the contents of the corresponding block.
 // If input_iter is null, allocate and return a new iterator.
 // If input_iter is not null, update this iter and return it.
-InternalIterator* BlockBasedTable::NewDataBlockIterator(
+template <typename TBlockIter>
+TBlockIter* BlockBasedTable::NewDataBlockIterator(
     Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
-    BlockIter* input_iter, bool is_index, Status s) {
+    TBlockIter* input_iter, bool is_index, bool key_includes_seq,
+    bool index_key_is_full, GetContext* get_context, Status s,
+    FilePrefetchBuffer* prefetch_buffer) {
   PERF_TIMER_GUARD(new_table_block_iter_nanos);
 
   const bool no_io = (ro.read_tier == kBlockCacheTier);
@@ -1268,57 +1673,96 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
     if (rep->compression_dict_block) {
       compression_dict = rep->compression_dict_block->data;
     }
-    s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block,
-                                  is_index);
+    s = MaybeLoadDataBlockToCache(prefetch_buffer, rep, ro, handle,
+                                  compression_dict, &block, is_index,
+                                  get_context);
   }
 
+  TBlockIter* iter;
+  if (input_iter != nullptr) {
+    iter = input_iter;
+  } else {
+    iter = new TBlockIter;
+  }
   // Didn't get any data from block caches.
   if (s.ok() && block.value == nullptr) {
     if (no_io) {
       // Could not read from block_cache and can't do IO
-      if (input_iter != nullptr) {
-        input_iter->SetStatus(Status::Incomplete("no blocking io"));
-        return input_iter;
-      } else {
-        return NewErrorInternalIterator(Status::Incomplete("no blocking io"));
-      }
+      iter->Invalidate(Status::Incomplete("no blocking io"));
+      return iter;
     }
     std::unique_ptr<Block> block_value;
-    s = ReadBlockFromFile(
-        rep->file.get(), rep->footer, ro, handle, &block_value, rep->ioptions,
-        true /* compress */, compression_dict, rep->persistent_cache_options,
-        rep->global_seqno, rep->table_options.read_amp_bytes_per_bit);
+    {
+      StopWatch sw(rep->ioptions.env, rep->ioptions.statistics,
+                   READ_BLOCK_GET_MICROS);
+      s = ReadBlockFromFile(
+          rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
+          &block_value, rep->ioptions, rep->blocks_maybe_compressed,
+          compression_dict, rep->persistent_cache_options,
+          is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
+          rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+    }
     if (s.ok()) {
       block.value = block_value.release();
     }
   }
 
-  InternalIterator* iter;
   if (s.ok()) {
     assert(block.value != nullptr);
-    iter = block.value->NewIterator(&rep->internal_comparator, input_iter, true,
-                                    rep->ioptions.statistics);
+    const bool kTotalOrderSeek = true;
+    iter = block.value->NewIterator<TBlockIter>(
+        &rep->internal_comparator, rep->internal_comparator.user_comparator(),
+        iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq,
+        index_key_is_full);
     if (block.cache_handle != nullptr) {
       iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
                             block.cache_handle);
     } else {
+      if (!ro.fill_cache && rep->cache_key_prefix_size != 0) {
+        // insert a dummy record to block cache to track the memory usage
+        Cache::Handle* cache_handle;
+        // There are two other types of cache keys: 1) SST cache key added in
+        // `MaybeLoadDataBlockToCache` and 2) dummy cache key added in
+        // `write_buffer_manager`. Use a longer prefix (41 bytes) to
+        // differentiate from the SST cache key (31 bytes), and a non-zero
+        // prefix to differentiate from `write_buffer_manager`.
+        const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
+        char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
+        // Prefix: use rep->cache_key_prefix padded by 0s
+        memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
+        assert(rep->cache_key_prefix_size != 0);
+        assert(rep->cache_key_prefix_size <= kExtraCacheKeyPrefix);
+        memcpy(cache_key, rep->cache_key_prefix, rep->cache_key_prefix_size);
+        char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
+                                   next_cache_key_id_++);
+        assert(end - cache_key <=
+               static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
+        Slice unique_key =
+            Slice(cache_key, static_cast<size_t>(end - cache_key));
+        s = block_cache->Insert(unique_key, nullptr,
+                                block.value->ApproximateMemoryUsage(), nullptr,
+                                &cache_handle);
+        if (s.ok()) {
+          if (cache_handle != nullptr) {
+            iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
+                                  cache_handle);
+          }
+        }
+      }
       iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
     }
   } else {
     assert(block.value == nullptr);
-    if (input_iter != nullptr) {
-      input_iter->SetStatus(s);
-      iter = input_iter;
-    } else {
-      iter = NewErrorInternalIterator(s);
-    }
+    iter->Invalidate(s);
   }
   return iter;
 }
 
 Status BlockBasedTable::MaybeLoadDataBlockToCache(
-    Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
-    Slice compression_dict, CachableEntry<Block>* block_entry, bool is_index) {
+    FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
+    const BlockHandle& handle, Slice compression_dict,
+    CachableEntry<Block>* block_entry, bool is_index, GetContext* get_context) {
+  assert(block_entry != nullptr);
   const bool no_io = (ro.read_tier == kBlockCacheTier);
   Cache* block_cache = rep->table_options.block_cache.get();
   Cache* block_cache_compressed =
@@ -1348,17 +1792,19 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
     s = GetDataBlockFromCache(
         key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
         block_entry, rep->table_options.format_version, compression_dict,
-        rep->table_options.read_amp_bytes_per_bit, is_index);
+        rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
 
     if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
       std::unique_ptr<Block> raw_block;
       {
         StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
         s = ReadBlockFromFile(
-            rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions,
-            block_cache_compressed == nullptr, compression_dict,
-            rep->persistent_cache_options, rep->global_seqno,
-            rep->table_options.read_amp_bytes_per_bit);
+            rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
+            &raw_block, rep->ioptions,
+            block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
+            compression_dict, rep->persistent_cache_options,
+            is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
+            rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
       }
 
       if (s.ok()) {
@@ -1367,59 +1813,54 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
             block_entry, raw_block.release(), rep->table_options.format_version,
             compression_dict, rep->table_options.read_amp_bytes_per_bit,
             is_index,
-            is_index &&
-                    rep->table_options
-                        .cache_index_and_filter_blocks_with_high_priority
+            is_index && rep->table_options
+                            .cache_index_and_filter_blocks_with_high_priority
                 ? Cache::Priority::HIGH
-                : Cache::Priority::LOW);
+                : Cache::Priority::LOW,
+            get_context);
       }
     }
   }
+  assert(s.ok() || block_entry->value == nullptr);
   return s;
 }
 
-BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState(
-    BlockBasedTable* table, const ReadOptions& read_options, bool skip_filters,
-    bool is_index, Cleanable* block_cache_cleaner)
-    : TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr),
-      table_(table),
-      read_options_(read_options),
-      skip_filters_(skip_filters),
-      is_index_(is_index),
-      block_cache_cleaner_(block_cache_cleaner) {}
-
-InternalIterator*
-BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
-    const Slice& index_value) {
+BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
+    BlockBasedTable* table,
+    std::unordered_map<uint64_t, CachableEntry<Block>>* block_map,
+    bool index_key_includes_seq, bool index_key_is_full)
+    : table_(table),
+      block_map_(block_map),
+      index_key_includes_seq_(index_key_includes_seq),
+      index_key_is_full_(index_key_is_full) {}
+
+template <class TBlockIter, typename TValue>
+const size_t BlockBasedTableIterator<TBlockIter, TValue>::kMaxReadaheadSize =
+    256 * 1024;
+
+InternalIteratorBase<BlockHandle>*
+BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
+    const BlockHandle& handle) {
   // Return a block iterator on the index partition
-  BlockHandle handle;
-  Slice input = index_value;
-  Status s = handle.DecodeFrom(&input);
-  auto iter = NewDataBlockIterator(table_->rep_, read_options_, handle, nullptr,
-                                   is_index_, s);
-  if (block_cache_cleaner_) {
-    uint64_t offset = handle.offset();
-    {
-      ReadLock rl(&cleaner_mu);
-      if (cleaner_set.find(offset) != cleaner_set.end()) {
-        // already have a refernce to the block cache objects
-        return iter;
-      }
-    }
-    WriteLock wl(&cleaner_mu);
-    cleaner_set.insert(offset);
-    // Keep the data into cache until the cleaner cleansup
-    iter->DelegateCleanupsTo(block_cache_cleaner_);
-  }
-  return iter;
-}
-
-bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
-    const Slice& internal_key) {
-  if (read_options_.total_order_seek || skip_filters_) {
-    return true;
-  }
-  return table_->PrefixMayMatch(internal_key);
+  auto rep = table_->get_rep();
+  auto block = block_map_->find(handle.offset());
+  // The handle may be absent from block_map_ if the block cache did not have
+  // space for the partition; in that case fall through and return an empty
+  // iterator below.
+  if (block != block_map_->end()) {
+    PERF_COUNTER_ADD(block_cache_hit_count, 1);
+    RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT);
+    RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT);
+    Cache* block_cache = rep->table_options.block_cache.get();
+    assert(block_cache);
+    RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
+               block_cache->GetUsage(block->second.cache_handle));
+    Statistics* kNullStats = nullptr;
+    return block->second.value->NewIterator<IndexBlockIter>(
+        &rep->internal_comparator, rep->internal_comparator.user_comparator(),
+        nullptr, kNullStats, true, index_key_includes_seq_, index_key_is_full_);
+  }
+  // Create an empty iterator
+  return new IndexBlockIter();
 }
 
 // This will be broken if the user specifies an unusual implementation
@@ -1434,33 +1875,52 @@ bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
 // Otherwise, this method guarantees no I/O will be incurred.
 //
 // REQUIRES: this method shouldn't be called while the DB lock is held.
-bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
+bool BlockBasedTable::PrefixMayMatch(
+    const Slice& internal_key, const ReadOptions& read_options,
+    const SliceTransform* options_prefix_extractor,
+    const bool need_upper_bound_check) {
   if (!rep_->filter_policy) {
     return true;
   }
 
-  assert(rep_->ioptions.prefix_extractor != nullptr);
+  const SliceTransform* prefix_extractor;
+
+  if (rep_->table_prefix_extractor == nullptr) {
+    if (need_upper_bound_check) {
+      return true;
+    }
+    prefix_extractor = options_prefix_extractor;
+  } else {
+    prefix_extractor = rep_->table_prefix_extractor.get();
+  }
   auto user_key = ExtractUserKey(internal_key);
-  if (!rep_->ioptions.prefix_extractor->InDomain(user_key) ||
-      rep_->table_properties->prefix_extractor_name.compare(
-          rep_->ioptions.prefix_extractor->Name()) != 0) {
+  if (!prefix_extractor->InDomain(user_key)) {
     return true;
   }
-  auto prefix = rep_->ioptions.prefix_extractor->Transform(user_key);
 
   bool may_match = true;
   Status s;
 
   // First, try checking with the full filter
-  const bool no_io = true;
-  auto filter_entry = GetFilter(no_io);
+  auto filter_entry = GetFilter(prefix_extractor);
   FilterBlockReader* filter = filter_entry.value;
+  bool filter_checked = true;
   if (filter != nullptr) {
     if (!filter->IsBlockBased()) {
       const Slice* const const_ikey_ptr = &internal_key;
-      may_match =
-          filter->PrefixMayMatch(prefix, kNotValid, no_io, const_ikey_ptr);
+      may_match = filter->RangeMayExist(
+          read_options.iterate_upper_bound, user_key, prefix_extractor,
+          rep_->internal_comparator.user_comparator(), const_ikey_ptr,
+          &filter_checked, need_upper_bound_check);
     } else {
+      // if prefix_extractor changed for block based filter, skip filter
+      if (need_upper_bound_check) {
+        if (!rep_->filter_entry.IsSet()) {
+          filter_entry.Release(rep_->table_options.block_cache.get());
+        }
+        return true;
+      }
+      auto prefix = prefix_extractor->Transform(user_key);
       InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue);
       auto internal_prefix = internal_key_prefix.Encode();
 
@@ -1471,7 +1931,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
       no_io_read_options.read_tier = kBlockCacheTier;
 
       // Then, try to find it within each block
-      unique_ptr<InternalIterator> iiter(NewIndexIterator(no_io_read_options));
+      // we already know prefix_extractor and prefix_extractor_name must match
+      // because `CheckPrefixMayMatch` first checks `check_filter_ == true`
+      unique_ptr<InternalIteratorBase<BlockHandle>> iiter(
+          NewIndexIterator(no_io_read_options,
+                           /* need_upper_bound_check */ false));
       iiter->Seek(internal_prefix);
 
       if (!iiter->Valid()) {
@@ -1480,7 +1944,10 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
         // and we're not really sure that we're past the end
         // of the file
         may_match = iiter->status().IsIncomplete();
-      } else if (ExtractUserKey(iiter->key())
+      } else if ((rep_->table_properties &&
+                          rep_->table_properties->index_key_is_user_key
+                      ? iiter->key()
+                      : ExtractUserKey(iiter->key()))
                      .starts_with(ExtractUserKey(internal_prefix))) {
         // we need to check for this subtle case because our only
         // guarantee is that "the key is a string >= last key in that data
@@ -1499,19 +1966,19 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
         // after the data block corresponding to iiter->key() cannot
         // possibly contain the key.  Thus, the corresponding data block
         // is the only one that could potentially contain the prefix.
-        Slice handle_value = iiter->value();
-        BlockHandle handle;
-        s = handle.DecodeFrom(&handle_value);
-        assert(s.ok());
-        may_match = filter->PrefixMayMatch(prefix, handle.offset());
+        BlockHandle handle = iiter->value();
+        may_match =
+            filter->PrefixMayMatch(prefix, prefix_extractor, handle.offset());
       }
     }
   }
 
-  Statistics* statistics = rep_->ioptions.statistics;
-  RecordTick(statistics, BLOOM_FILTER_PREFIX_CHECKED);
-  if (!may_match) {
-    RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL);
+  if (filter_checked) {
+    Statistics* statistics = rep_->ioptions.statistics;
+    RecordTick(statistics, BLOOM_FILTER_PREFIX_CHECKED);
+    if (!may_match) {
+      RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL);
+    }
   }
 
   // if rep_->filter_entry is not set, we should call Release(); otherwise
@@ -1520,16 +1987,266 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
   if (!rep_->filter_entry.IsSet()) {
     filter_entry.Release(rep_->table_options.block_cache.get());
   }
-
   return may_match;
 }
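
A short sketch of the prefix that feeds the checks above, using the public NewFixedPrefixTransform (the key and prefix length are illustrative):

    #include <cassert>
    #include "rocksdb/slice.h"
    #include "rocksdb/slice_transform.h"

    void PrefixSketch() {
      // Fixed 3-byte prefixes; "abcdef" is an arbitrary user key.
      const rocksdb::SliceTransform* pe = rocksdb::NewFixedPrefixTransform(3);
      rocksdb::Slice user_key("abcdef");
      assert(pe->InDomain(user_key));
      rocksdb::Slice prefix = pe->Transform(user_key);  // "abc"
      // PrefixMayMatch() probes the filter with `prefix`: a negative answer
      // proves no key with this prefix exists in the table, while a positive
      // answer only means "may match".
      (void)prefix;
      delete pe;
    }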
 
-InternalIterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
-                                               Arena* arena,
-                                               bool skip_filters) {
-  return NewTwoLevelIterator(
-      new BlockEntryIteratorState(this, read_options, skip_filters),
-      NewIndexIterator(read_options), arena);
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::Seek(const Slice& target) {
+  is_out_of_bound_ = false;
+  if (!CheckPrefixMayMatch(target)) {
+    ResetDataIter();
+    return;
+  }
+
+  SavePrevIndexValue();
+
+  index_iter_->Seek(target);
+
+  if (!index_iter_->Valid()) {
+    ResetDataIter();
+    return;
+  }
+
+  InitDataBlock();
+
+  block_iter_.Seek(target);
+
+  FindKeyForward();
+  assert(
+      !block_iter_.Valid() ||
+      (key_includes_seq_ && icomp_.Compare(target, block_iter_.key()) <= 0) ||
+      (!key_includes_seq_ &&
+       icomp_.user_comparator()->Compare(ExtractUserKey(target),
+                                         block_iter_.key()) <= 0));
+}
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::SeekForPrev(
+    const Slice& target) {
+  is_out_of_bound_ = false;
+  if (!CheckPrefixMayMatch(target)) {
+    ResetDataIter();
+    return;
+  }
+
+  SavePrevIndexValue();
+
+  // Call Seek() rather than SeekForPrev() in the index block, because the
+  // target data block is likely to contain the position for `target`, the
+  // same block as Seek() would choose, rather than the one before it.
+  // For example, if we have three data blocks, each containing two keys:
+  //   [2, 4]  [6, 8] [10, 12]
+  //  (the keys in the index block would be [4, 8, 12])
+  // and the user calls SeekForPrev(7), we need to go to the second block,
+  // just as if they had called Seek(7).
+  // The only case where the block is different is when they seek to a
+  // position at a block boundary. For example, if they SeekForPrev(5), we
+  // should go to the first block, rather than the second. However, we don't
+  // have the information to distinguish the two cases unless we read the
+  // second block. In this case, we'll end up reading two blocks.
+  index_iter_->Seek(target);
+
+  if (!index_iter_->Valid()) {
+    index_iter_->SeekToLast();
+    if (!index_iter_->Valid()) {
+      ResetDataIter();
+      block_iter_points_to_real_block_ = false;
+      return;
+    }
+  }
+
+  InitDataBlock();
+
+  block_iter_.SeekForPrev(target);
+
+  FindKeyBackward();
+  assert(!block_iter_.Valid() ||
+         icomp_.Compare(target, block_iter_.key()) >= 0);
+}
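
Tracing the boundary case from the comment above, with the same illustrative keys:

    Data blocks: [2, 4] [6, 8] [10, 12]; index keys: 4, 8, 12.

    SeekForPrev(7):  index_iter_->Seek(7) lands on index key 8 (second block);
                     block_iter_.SeekForPrev(7) finds 6 -- one block read.

    SeekForPrev(5):  index_iter_->Seek(5) also lands on index key 8, but
                     block_iter_.SeekForPrev(5) goes invalid (5 < 6), so
                     FindKeyBackward() steps index_iter_ back to the first
                     block and SeekToLast() returns 4 -- two block reads.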
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::SeekToFirst() {
+  is_out_of_bound_ = false;
+  SavePrevIndexValue();
+  index_iter_->SeekToFirst();
+  if (!index_iter_->Valid()) {
+    ResetDataIter();
+    return;
+  }
+  InitDataBlock();
+  block_iter_.SeekToFirst();
+  FindKeyForward();
+}
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::SeekToLast() {
+  is_out_of_bound_ = false;
+  SavePrevIndexValue();
+  index_iter_->SeekToLast();
+  if (!index_iter_->Valid()) {
+    ResetDataIter();
+    return;
+  }
+  InitDataBlock();
+  block_iter_.SeekToLast();
+  FindKeyBackward();
+}
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::Next() {
+  assert(block_iter_points_to_real_block_);
+  block_iter_.Next();
+  FindKeyForward();
+}
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::Prev() {
+  assert(block_iter_points_to_real_block_);
+  block_iter_.Prev();
+  FindKeyBackward();
+}
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
+  BlockHandle data_block_handle = index_iter_->value();
+  if (!block_iter_points_to_real_block_ ||
+      data_block_handle.offset() != prev_index_value_.offset() ||
+      // if previous attempt of reading the block missed cache, try again
+      block_iter_.status().IsIncomplete()) {
+    if (block_iter_points_to_real_block_) {
+      ResetDataIter();
+    }
+    auto* rep = table_->get_rep();
+
+    // Automatically prefetch additional data when a range scan (iterator) does
+    // more than 2 sequential IOs. This is enabled only for user reads and when
+    // ReadOptions.readahead_size is 0.
+    if (!for_compaction_ && read_options_.readahead_size == 0) {
+      num_file_reads_++;
+      if (num_file_reads_ > 2) {
+        if (!rep->file->use_direct_io() &&
+            (data_block_handle.offset() +
+                 static_cast<size_t>(data_block_handle.size()) +
+                 kBlockTrailerSize >
+             readahead_limit_)) {
+          // Buffered I/O
+          // Intentionally discarding the return status of Prefetch calls, as
+          // we can fall back to reading from disk if Prefetch fails.
+          rep->file->Prefetch(data_block_handle.offset(), readahead_size_);
+          readahead_limit_ =
+              static_cast<size_t>(data_block_handle.offset() + readahead_size_);
+          // Keep exponentially increasing readahead size until
+          // kMaxReadaheadSize.
+          readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
+        } else if (rep->file->use_direct_io() && !prefetch_buffer_) {
+          // Direct I/O
+          // Let FilePrefetchBuffer take care of the readahead.
+          prefetch_buffer_.reset(new FilePrefetchBuffer(
+              rep->file.get(), kInitReadaheadSize, kMaxReadaheadSize));
+        }
+      }
+    }
+
+    Status s;
+    BlockBasedTable::NewDataBlockIterator<TBlockIter>(
+        rep, read_options_, data_block_handle, &block_iter_, is_index_,
+        key_includes_seq_, index_key_is_full_,
+        /* get_context */ nullptr, s, prefetch_buffer_.get());
+    block_iter_points_to_real_block_ = true;
+  }
+}
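
A sketch of the buffered-I/O readahead schedule implemented above, assuming kInitReadaheadSize is 8 KB (kMaxReadaheadSize is 256 KB per the definition earlier in this diff); the doubling happens only on reads that pass the current readahead limit:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
      const size_t kInitReadaheadSize = 8 * 1024;   // assumed initial value
      const size_t kMaxReadaheadSize = 256 * 1024;  // from this diff
      size_t readahead_size = kInitReadaheadSize;
      // Sequential reads 1 and 2 do no readahead; from the 3rd read on,
      // each qualifying read prefetches and then doubles the window:
      for (int read = 3; read <= 10; ++read) {
        std::printf("read %d: prefetch %zu bytes\n", read, readahead_size);
        readahead_size = std::min(kMaxReadaheadSize, readahead_size * 2);
      }
      return 0;  // prints 8K, 16K, 32K, 64K, 128K, then 256K capped
    }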
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::FindKeyForward() {
+  assert(!is_out_of_bound_);
+  // TODO: the while loop is inherited from two-level-iterator. We don't know
+  // whether a block can ever be empty, so we cannot yet replace it with an
+  // "if".
+  while (!block_iter_.Valid()) {
+    if (!block_iter_.status().ok()) {
+      return;
+    }
+    ResetDataIter();
+    // We used to check the current index key against the upper bound here.
+    // It only saves a data-block read for a small percentage of use cases,
+    // so for code simplicity we removed it. We can add it back if there is a
+    // significant performance regression.
+    index_iter_->Next();
+
+    if (index_iter_->Valid()) {
+      InitDataBlock();
+      block_iter_.SeekToFirst();
+    } else {
+      return;
+    }
+  }
+
+  // Check upper bound on the current key
+  bool reached_upper_bound =
+      (read_options_.iterate_upper_bound != nullptr &&
+       block_iter_points_to_real_block_ && block_iter_.Valid() &&
+       icomp_.user_comparator()->Compare(ExtractUserKey(block_iter_.key()),
+                                         *read_options_.iterate_upper_bound) >=
+           0);
+  TEST_SYNC_POINT_CALLBACK(
+      "BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound",
+      &reached_upper_bound);
+  if (reached_upper_bound) {
+    is_out_of_bound_ = true;
+    return;
+  }
+}
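
The upper-bound check above clips forward scans; traced with illustrative keys:

    Keys in table: k1 k3 k5 k7; read_options.iterate_upper_bound -> "k5".

    Seek("k1") yields k1, then k3. The next FindKeyForward() sees
    Compare(k5, *iterate_upper_bound) >= 0, sets is_out_of_bound_, and the
    iterator reports !Valid() -- k5 and k7 are never returned.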
+
+template <class TBlockIter, typename TValue>
+void BlockBasedTableIterator<TBlockIter, TValue>::FindKeyBackward() {
+  assert(!is_out_of_bound_);
+  while (!block_iter_.Valid()) {
+    if (!block_iter_.status().ok()) {
+      return;
+    }
+
+    ResetDataIter();
+    index_iter_->Prev();
+
+    if (index_iter_->Valid()) {
+      InitDataBlock();
+      block_iter_.SeekToLast();
+    } else {
+      return;
+    }
+  }
+
+  // We could have checked the lower bound here too, but we opt not to for
+  // code simplicity.
+}
+
+InternalIterator* BlockBasedTable::NewIterator(
+    const ReadOptions& read_options, const SliceTransform* prefix_extractor,
+    Arena* arena, bool skip_filters, bool for_compaction) {
+  bool need_upper_bound_check =
+      PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor);
+  const bool kIsNotIndex = false;
+  if (arena == nullptr) {
+    return new BlockBasedTableIterator<DataBlockIter>(
+        this, read_options, rep_->internal_comparator,
+        NewIndexIterator(
+            read_options,
+            need_upper_bound_check &&
+                rep_->index_type == BlockBasedTableOptions::kHashSearch),
+        !skip_filters && !read_options.total_order_seek &&
+            prefix_extractor != nullptr,
+        need_upper_bound_check, prefix_extractor, kIsNotIndex,
+        true /*key_includes_seq*/, for_compaction);
+  } else {
+    auto* mem =
+        arena->AllocateAligned(sizeof(BlockBasedTableIterator<DataBlockIter>));
+    return new (mem) BlockBasedTableIterator<DataBlockIter>(
+        this, read_options, rep_->internal_comparator,
+        NewIndexIterator(read_options, need_upper_bound_check),
+        !skip_filters && !read_options.total_order_seek &&
+            prefix_extractor != nullptr,
+        need_upper_bound_check, prefix_extractor, kIsNotIndex,
+        true /*key_includes_seq*/, for_compaction);
+  }
 }
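
A usage sketch for the arena path above (internal API; signatures as in this diff). Arena placement means the caller must destroy the iterator in place rather than delete it:

    #include "table/block_based_table_reader.h"
    #include "util/arena.h"

    void ArenaScanSketch(rocksdb::BlockBasedTable* table) {
      rocksdb::Arena arena;
      rocksdb::InternalIterator* it = table->NewIterator(
          rocksdb::ReadOptions(), /*prefix_extractor=*/nullptr, &arena,
          /*skip_filters=*/false, /*for_compaction=*/false);
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
        // consume it->key() / it->value()
      }
      // Arena-allocated: run the destructor only; the arena owns the memory.
      it->~InternalIterator();
    }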
 
 InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
@@ -1547,76 +2264,94 @@ InternalIterator* BlockBasedTable::NewRangeTombstoneIterator(
     Cache* block_cache = rep_->table_options.block_cache.get();
     assert(block_cache != nullptr);
     if (block_cache->Ref(rep_->range_del_entry.cache_handle)) {
-      auto iter = rep_->range_del_entry.value->NewIterator(
-          &rep_->internal_comparator, nullptr /* iter */,
-          true /* total_order_seek */, rep_->ioptions.statistics);
+      auto iter = rep_->range_del_entry.value->NewIterator<DataBlockIter>(
+          &rep_->internal_comparator,
+          rep_->internal_comparator.user_comparator());
       iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
                             rep_->range_del_entry.cache_handle);
       return iter;
     }
   }
-  std::string str;
-  rep_->range_del_handle.EncodeTo(&str);
-  // The meta-block exists but isn't in uncompressed block cache (maybe because
-  // it is disabled), so go through the full lookup process.
-  return NewDataBlockIterator(rep_, read_options, Slice(str));
+  // The meta-block exists but isn't in uncompressed block cache (maybe
+  // because it is disabled), so go through the full lookup process.
+  return NewDataBlockIterator<DataBlockIter>(rep_, read_options,
+                                             rep_->range_del_handle);
 }
 
-bool BlockBasedTable::FullFilterKeyMayMatch(const ReadOptions& read_options,
-                                            FilterBlockReader* filter,
-                                            const Slice& internal_key,
-                                            const bool no_io) const {
+bool BlockBasedTable::FullFilterKeyMayMatch(
+    const ReadOptions& read_options, FilterBlockReader* filter,
+    const Slice& internal_key, const bool no_io,
+    const SliceTransform* prefix_extractor) const {
   if (filter == nullptr || filter->IsBlockBased()) {
     return true;
   }
   Slice user_key = ExtractUserKey(internal_key);
   const Slice* const const_ikey_ptr = &internal_key;
+  bool may_match = true;
   if (filter->whole_key_filtering()) {
-    return filter->KeyMayMatch(user_key, kNotValid, no_io, const_ikey_ptr);
-  }
-  if (!read_options.total_order_seek && rep_->ioptions.prefix_extractor &&
-      rep_->table_properties->prefix_extractor_name.compare(
-          rep_->ioptions.prefix_extractor->Name()) == 0 &&
-      rep_->ioptions.prefix_extractor->InDomain(user_key) &&
-      !filter->PrefixMayMatch(
-          rep_->ioptions.prefix_extractor->Transform(user_key), kNotValid,
-          false, const_ikey_ptr)) {
-    return false;
+    may_match = filter->KeyMayMatch(user_key, prefix_extractor, kNotValid,
+                                    no_io, const_ikey_ptr);
+  } else if (!read_options.total_order_seek && prefix_extractor &&
+             rep_->table_properties->prefix_extractor_name.compare(
+                 prefix_extractor->Name()) == 0 &&
+             prefix_extractor->InDomain(user_key) &&
+             !filter->PrefixMayMatch(prefix_extractor->Transform(user_key),
+                                     prefix_extractor, kNotValid, false,
+                                     const_ikey_ptr)) {
+    may_match = false;
+  }
+  if (may_match) {
+    RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_POSITIVE);
   }
-  return true;
+  return may_match;
 }
 
 Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
-                            GetContext* get_context, bool skip_filters) {
+                            GetContext* get_context,
+                            const SliceTransform* prefix_extractor,
+                            bool skip_filters) {
+  assert(key.size() >= 8);  // key must be an internal key
   Status s;
   const bool no_io = read_options.read_tier == kBlockCacheTier;
   CachableEntry<FilterBlockReader> filter_entry;
   if (!skip_filters) {
-    filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier);
+    filter_entry =
+        GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
+                  read_options.read_tier == kBlockCacheTier, get_context);
   }
   FilterBlockReader* filter = filter_entry.value;
 
   // First check the full filter
   // If full filter not useful, Then go into each block
-  if (!FullFilterKeyMayMatch(read_options, filter, key, no_io)) {
+  if (!FullFilterKeyMayMatch(read_options, filter, key, no_io,
+                             prefix_extractor)) {
     RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
   } else {
-    BlockIter iiter_on_stack;
-    auto iiter = NewIndexIterator(read_options, &iiter_on_stack);
-    std::unique_ptr<InternalIterator> iiter_unique_ptr;
+    IndexBlockIter iiter_on_stack;
+    // If the prefix_extractor found in the SST differs from the options,
+    // disable BlockPrefixIndex. Only do this check when index_type is
+    // kHashSearch.
+    bool need_upper_bound_check = false;
+    if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
+      need_upper_bound_check = PrefixExtractorChanged(
+          rep_->table_properties.get(), prefix_extractor);
+    }
+    auto iiter =
+        NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
+                         /* index_entry */ nullptr, get_context);
+    std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr;
     if (iiter != &iiter_on_stack) {
       iiter_unique_ptr.reset(iiter);
     }
 
+    bool matched = false;  // whether the user key matched a key in the SST
     bool done = false;
     for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
-      Slice handle_value = iiter->value();
+      BlockHandle handle = iiter->value();
 
-      BlockHandle handle;
       bool not_exist_in_filter =
           filter != nullptr && filter->IsBlockBased() == true &&
-          handle.DecodeFrom(&handle_value).ok() &&
-          !filter->KeyMayMatch(ExtractUserKey(key), handle.offset(), no_io);
+          !filter->KeyMayMatch(ExtractUserKey(key), prefix_extractor,
+                               handle.offset(), no_io);
 
       if (not_exist_in_filter) {
         // Not found
@@ -1625,14 +2360,16 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
         RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
         break;
       } else {
-        BlockIter biter;
-        NewDataBlockIterator(rep_, read_options, iiter->value(), &biter);
+        DataBlockIter biter;
+        NewDataBlockIterator<DataBlockIter>(
+            rep_, read_options, iiter->value(), &biter, false,
+            true /* key_includes_seq */, get_context);
 
         if (read_options.read_tier == kBlockCacheTier &&
             biter.status().IsIncomplete()) {
           // couldn't get block from block_cache
-          // Update Saver.state to Found because we are only looking for whether
-          // we can guarantee the key is not there when "no_io" is set
+          // Update Saver.state to Found because we are only looking for
+          // whether we can guarantee the key is not there when "no_io" is set
           get_context->MarkKeyMayExist();
           break;
         }
@@ -1641,14 +2378,25 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
           break;
         }
 
+        bool may_exist = biter.SeekForGet(key);
+        if (!may_exist) {
+          // HashSeek cannot find the key in this block, and the iter is not
+          // at the end of the block, i.e. the key cannot be in the following
+          // blocks either. In this case, the seek_key cannot be found, so we
+          // break out of the top-level for-loop.
+          break;
+        }
+
         // Call the *saver function on each entry/block until it returns false
-        for (biter.Seek(key); biter.Valid(); biter.Next()) {
+        for (; biter.Valid(); biter.Next()) {
           ParsedInternalKey parsed_key;
           if (!ParseInternalKey(biter.key(), &parsed_key)) {
             s = Status::Corruption(Slice());
           }
 
-          if (!get_context->SaveValue(parsed_key, biter.value(), &biter)) {
+          if (!get_context->SaveValue(
+                  parsed_key, biter.value(), &matched,
+                  biter.IsValuePinned() ? &biter : nullptr)) {
             done = true;
             break;
           }
@@ -1660,6 +2408,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
         break;
       }
     }
+    if (matched && filter != nullptr && !filter->IsBlockBased()) {
+      RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE);
+    }
     if (s.ok()) {
       s = iiter->status();
     }
@@ -1677,16 +2428,18 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
 Status BlockBasedTable::Prefetch(const Slice* const begin,
                                  const Slice* const end) {
   auto& comparator = rep_->internal_comparator;
+  auto user_comparator = comparator.user_comparator();
   // pre-condition
   if (begin && end && comparator.Compare(*begin, *end) > 0) {
     return Status::InvalidArgument(*begin, *end);
   }
 
-  BlockIter iiter_on_stack;
-  auto iiter = NewIndexIterator(ReadOptions(), &iiter_on_stack);
-  std::unique_ptr<InternalIterator> iiter_unique_ptr;
+  IndexBlockIter iiter_on_stack;
+  auto iiter = NewIndexIterator(ReadOptions(), false, &iiter_on_stack);
+  std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr;
   if (iiter != &iiter_on_stack) {
-    iiter_unique_ptr = std::unique_ptr<InternalIterator>(iiter);
+    iiter_unique_ptr =
+        std::unique_ptr<InternalIteratorBase<BlockHandle>>(iiter);
   }
 
   if (!iiter->status().ok()) {
@@ -1699,9 +2452,13 @@ Status BlockBasedTable::Prefetch(const Slice* const begin,
 
   for (begin ? iiter->Seek(*begin) : iiter->SeekToFirst(); iiter->Valid();
        iiter->Next()) {
-    Slice block_handle = iiter->value();
-
-    if (end && comparator.Compare(iiter->key(), *end) >= 0) {
+    BlockHandle block_handle = iiter->value();
+    const bool is_user_key = rep_->table_properties &&
+                             rep_->table_properties->index_key_is_user_key > 0;
+    if (end &&
+        ((!is_user_key && comparator.Compare(iiter->key(), *end) >= 0) ||
+         (is_user_key &&
+          user_comparator->Compare(iiter->key(), ExtractUserKey(*end)) >= 0))) {
       if (prefetching_boundary_page) {
         break;
       }
@@ -1712,8 +2469,9 @@ Status BlockBasedTable::Prefetch(const Slice* const begin,
     }
 
     // Load the block specified by the block_handle into the block cache
-    BlockIter biter;
-    NewDataBlockIterator(rep_, ReadOptions(), block_handle, &biter);
+    DataBlockIter biter;
+    NewDataBlockIterator<DataBlockIter>(rep_, ReadOptions(), block_handle,
+                                        &biter);
 
     if (!biter.status().ok()) {
       // there was an unexpected error while pre-fetching
@@ -1724,26 +2482,106 @@ Status BlockBasedTable::Prefetch(const Slice* const begin,
   return Status::OK();
 }
 
+Status BlockBasedTable::VerifyChecksum() {
+  Status s;
+  // Check Meta blocks
+  std::unique_ptr<Block> meta;
+  std::unique_ptr<InternalIterator> meta_iter;
+  s = ReadMetaBlock(rep_, nullptr /* prefetch buffer */, &meta, &meta_iter);
+  if (s.ok()) {
+    s = VerifyChecksumInBlocks(meta_iter.get());
+    if (!s.ok()) {
+      return s;
+    }
+  } else {
+    return s;
+  }
+  // Check Data blocks
+  IndexBlockIter iiter_on_stack;
+  InternalIteratorBase<BlockHandle>* iiter =
+      NewIndexIterator(ReadOptions(), false, &iiter_on_stack);
+  std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr;
+  if (iiter != &iiter_on_stack) {
+    iiter_unique_ptr =
+        std::unique_ptr<InternalIteratorBase<BlockHandle>>(iiter);
+  }
+  if (!iiter->status().ok()) {
+    // error opening index iterator
+    return iiter->status();
+  }
+  s = VerifyChecksumInBlocks(iiter);
+  return s;
+}
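
A minimal caller sketch: the public DB::VerifyChecksum() fans out to each table reader's VerifyChecksum() shown above, which re-reads every metaindex and data block through BlockFetcher so each read is checksummed:

    #include "rocksdb/db.h"

    rocksdb::Status VerifyAllTables(rocksdb::DB* db) {
      // Returns the first checksum failure encountered, or OK.
      rocksdb::Status s = db->VerifyChecksum();
      if (!s.ok()) {
        // s.ToString() names the corrupt block/file.
      }
      return s;
    }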
+
+Status BlockBasedTable::VerifyChecksumInBlocks(
+    InternalIteratorBase<BlockHandle>* index_iter) {
+  Status s;
+  for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
+    s = index_iter->status();
+    if (!s.ok()) {
+      break;
+    }
+    BlockHandle handle = index_iter->value();
+    BlockContents contents;
+    Slice dummy_comp_dict;
+    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
+                               rep_->footer, ReadOptions(), handle, &contents,
+                               rep_->ioptions, false /* decompress */,
+                               dummy_comp_dict /*compression dict*/,
+                               rep_->persistent_cache_options);
+    s = block_fetcher.ReadBlockContents();
+    if (!s.ok()) {
+      break;
+    }
+  }
+  return s;
+}
+
+Status BlockBasedTable::VerifyChecksumInBlocks(
+    InternalIteratorBase<Slice>* index_iter) {
+  Status s;
+  for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
+    s = index_iter->status();
+    if (!s.ok()) {
+      break;
+    }
+    BlockHandle handle;
+    Slice input = index_iter->value();
+    s = handle.DecodeFrom(&input);
+    BlockContents contents;
+    Slice dummy_comp_dict;
+    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
+                               rep_->footer, ReadOptions(), handle, &contents,
+                               rep_->ioptions, false /* decompress */,
+                               dummy_comp_dict /*compression dict*/,
+                               rep_->persistent_cache_options);
+    s = block_fetcher.ReadBlockContents();
+    if (!s.ok()) {
+      break;
+    }
+  }
+  return s;
+}
+
 bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
                                       const Slice& key) {
-  std::unique_ptr<InternalIterator> iiter(NewIndexIterator(options));
+  std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter(
+      NewIndexIterator(options));
   iiter->Seek(key);
   assert(iiter->Valid());
   CachableEntry<Block> block;
 
-  BlockHandle handle;
-  Slice input = iiter->value();
-  Status s = handle.DecodeFrom(&input);
-  assert(s.ok());
+  BlockHandle handle = iiter->value();
   Cache* block_cache = rep_->table_options.block_cache.get();
   assert(block_cache != nullptr);
 
   char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
   Slice cache_key =
-      GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
-                  handle, cache_key_storage);
+      GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle,
+                  cache_key_storage);
   Slice ckey;
 
+  Status s;
   s = GetDataBlockFromCache(
       cache_key, ckey, block_cache, nullptr, rep_->ioptions, options, &block,
       rep_->table_options.format_version,
@@ -1758,56 +2596,72 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
   return in_cache;
 }
 
-// REQUIRES: The following fields of rep_ should have already been populated:
-//  1. file
-//  2. index_handle,
-//  3. options
-//  4. internal_comparator
-//  5. index_type
-Status BlockBasedTable::CreateIndexReader(
-    IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter,
-    int level) {
+BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() {
   // Some old versions of block-based tables don't have index type present in
   // table properties. If that's the case we can safely use kBinarySearch.
-  auto index_type_on_file = BlockBasedTableOptions::kBinarySearch;
+  BlockBasedTableOptions::IndexType index_type_on_file =
+      BlockBasedTableOptions::kBinarySearch;
   if (rep_->table_properties) {
     auto& props = rep_->table_properties->user_collected_properties;
     auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
     if (pos != props.end()) {
       index_type_on_file = static_cast<BlockBasedTableOptions::IndexType>(
           DecodeFixed32(pos->second.c_str()));
+      // Update index_type with the type actually found in the table
+      // properties.
+      rep_->index_type = index_type_on_file;
     }
   }
+  return index_type_on_file;
+}
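+
+// For orientation (hedged, builder-side sketch from memory): the property
+// decoded above is written by the table builder as a fixed 32-bit value,
+// roughly
+//
+//   std::string val;
+//   PutFixed32(&val, static_cast<uint32_t>(table_options.index_type));
+//   props[BlockBasedTablePropertyNames::kIndexType] = val;
+//
+// which is why DecodeFixed32(pos->second.c_str()) recovers it here.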
+
+// REQUIRES: The following fields of rep_ should have already been populated:
+//  1. file
+//  2. index_handle,
+//  3. options
+//  4. internal_comparator
+//  5. index_type
+Status BlockBasedTable::CreateIndexReader(
+    FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader,
+    InternalIterator* preloaded_meta_index_iter, int level) {
+  auto index_type_on_file = UpdateIndexType();
 
   auto file = rep_->file.get();
-  auto comparator = &rep_->internal_comparator;
+  const InternalKeyComparator* icomparator = &rep_->internal_comparator;
   const Footer& footer = rep_->footer;
-  if (index_type_on_file == BlockBasedTableOptions::kHashSearch &&
-      rep_->ioptions.prefix_extractor == nullptr) {
-    ROCKS_LOG_WARN(rep_->ioptions.info_log,
-                   "BlockBasedTableOptions::kHashSearch requires "
-                   "options.prefix_extractor to be set."
-                   " Fall back to binary search index.");
-    index_type_on_file = BlockBasedTableOptions::kBinarySearch;
-  }
+
+  // kHashSearch requires non-empty prefix_extractor but bypass checking
+  // prefix_extractor here since we have no access to MutableCFOptions.
+  // Add need_upper_bound_check flag in  BlockBasedTable::NewIndexIterator.
+  // If prefix_extractor does not match prefix_extractor_name from table
+  // properties, turn off Hash Index by setting total_order_seek to true
 
   switch (index_type_on_file) {
     case BlockBasedTableOptions::kTwoLevelIndexSearch: {
       return PartitionIndexReader::Create(
-          this, file, footer, footer.index_handle(), rep_->ioptions, comparator,
-          index_reader, rep_->persistent_cache_options, level);
+          this, file, prefetch_buffer, footer, footer.index_handle(),
+          rep_->ioptions, icomparator, index_reader,
+          rep_->persistent_cache_options, level,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_key_is_user_key == 0,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_value_is_delta_encoded == 0);
     }
     case BlockBasedTableOptions::kBinarySearch: {
       return BinarySearchIndexReader::Create(
-          file, footer, footer.index_handle(), rep_->ioptions, comparator,
-          index_reader, rep_->persistent_cache_options);
+          file, prefetch_buffer, footer, footer.index_handle(), rep_->ioptions,
+          icomparator, index_reader, rep_->persistent_cache_options,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_key_is_user_key == 0,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_value_is_delta_encoded == 0);
     }
     case BlockBasedTableOptions::kHashSearch: {
       std::unique_ptr<Block> meta_guard;
       std::unique_ptr<InternalIterator> meta_iter_guard;
       auto meta_index_iter = preloaded_meta_index_iter;
       if (meta_index_iter == nullptr) {
-        auto s = ReadMetaBlock(rep_, &meta_guard, &meta_iter_guard);
+        auto s =
+            ReadMetaBlock(rep_, prefetch_buffer, &meta_guard, &meta_iter_guard);
         if (!s.ok()) {
           // we simply fall back to binary search in case there is any
           // problem with prefix hash index loading.
@@ -1815,42 +2669,44 @@ Status BlockBasedTable::CreateIndexReader(
                          "Unable to read the metaindex block."
                          " Fall back to binary search index.");
           return BinarySearchIndexReader::Create(
-              file, footer, footer.index_handle(), rep_->ioptions, comparator,
-              index_reader, rep_->persistent_cache_options);
+              file, prefetch_buffer, footer, footer.index_handle(),
+              rep_->ioptions, icomparator, index_reader,
+              rep_->persistent_cache_options,
+              rep_->table_properties == nullptr ||
+                  rep_->table_properties->index_key_is_user_key == 0,
+              rep_->table_properties == nullptr ||
+                  rep_->table_properties->index_value_is_delta_encoded == 0);
         }
         meta_index_iter = meta_iter_guard.get();
       }
 
       return HashIndexReader::Create(
-          rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
-          comparator, footer.index_handle(), meta_index_iter, index_reader,
-          rep_->hash_index_allow_collision, rep_->persistent_cache_options);
+          rep_->internal_prefix_transform.get(), footer, file, prefetch_buffer,
+          rep_->ioptions, icomparator, footer.index_handle(), meta_index_iter,
+          index_reader, rep_->hash_index_allow_collision,
+          rep_->persistent_cache_options,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_key_is_user_key == 0,
+          rep_->table_properties == nullptr ||
+              rep_->table_properties->index_value_is_delta_encoded == 0);
     }
     default: {
       std::string error_message =
-          "Unrecognized index type: " + ToString(rep_->index_type);
+          "Unrecognized index type: " + ToString(index_type_on_file);
       return Status::InvalidArgument(error_message.c_str());
     }
   }
 }
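+
+// Quick reference for the index types dispatched above (descriptive only):
+//   kBinarySearch        - a single sorted index block, binary-searched.
+//   kHashSearch          - a prefix hash built over the index block; falls
+//                          back to binary search if the metaindex block
+//                          cannot be read.
+//   kTwoLevelIndexSearch - a partitioned index whose top level points at
+//                          index partitions (PartitionIndexReader).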
 
 uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) {
-  unique_ptr<InternalIterator> index_iter(NewIndexIterator(ReadOptions()));
+  unique_ptr<InternalIteratorBase<BlockHandle>> index_iter(
+      NewIndexIterator(ReadOptions()));
 
   index_iter->Seek(key);
   uint64_t result;
   if (index_iter->Valid()) {
-    BlockHandle handle;
-    Slice input = index_iter->value();
-    Status s = handle.DecodeFrom(&input);
-    if (s.ok()) {
-      result = handle.offset();
-    } else {
-      // Strange: we can't decode the block handle in the index block.
-      // We'll just return the offset of the metaindex block, which is
-      // close to the whole file size for this case.
-      result = rep_->footer.metaindex_handle().offset();
-    }
+    BlockHandle handle = index_iter->value();
+    result = handle.offset();
   } else {
     // key is past the last key in the file. If table_properties is not
     // available, approximate the offset by returning the offset of the
@@ -1877,7 +2733,7 @@ bool BlockBasedTable::TEST_index_reader_preloaded() const {
 
 Status BlockBasedTable::GetKVPairsFromDataBlocks(
     std::vector<KVPairBlock>* kv_pair_blocks) {
-  std::unique_ptr<InternalIterator> blockhandles_iter(
+  std::unique_ptr<InternalIteratorBase<BlockHandle>> blockhandles_iter(
       NewIndexIterator(ReadOptions()));
 
   Status s = blockhandles_iter->status();
@@ -1895,8 +2751,8 @@ Status BlockBasedTable::GetKVPairsFromDataBlocks(
     }
 
     std::unique_ptr<InternalIterator> datablock_iter;
-    datablock_iter.reset(
-        NewDataBlockIterator(rep_, ReadOptions(), blockhandles_iter->value()));
+    datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
+        rep_, ReadOptions(), blockhandles_iter->value()));
     s = datablock_iter->status();
 
     if (!s.ok()) {
@@ -1925,7 +2781,8 @@ Status BlockBasedTable::GetKVPairsFromDataBlocks(
   return Status::OK();
 }
 
-Status BlockBasedTable::DumpTable(WritableFile* out_file) {
+Status BlockBasedTable::DumpTable(WritableFile* out_file,
+                                  const SliceTransform* prefix_extractor) {
   // Output Footer
   out_file->Append(
       "Footer Details:\n"
@@ -1940,7 +2797,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
       "--------------------------------------\n");
   std::unique_ptr<Block> meta;
   std::unique_ptr<InternalIterator> meta_iter;
-  Status s = ReadMetaBlock(rep_, &meta, &meta_iter);
+  Status s =
+      ReadMetaBlock(rep_, nullptr /* prefetch_buffer */, &meta, &meta_iter);
   if (s.ok()) {
     for (meta_iter->SeekToFirst(); meta_iter->Valid(); meta_iter->Next()) {
       s = meta_iter->status();
@@ -1982,29 +2840,32 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
         "  ");
     out_file->Append(table_properties->ToString("\n  ", ": ").c_str());
     out_file->Append("\n");
-  }
 
-  // Output Filter blocks
-  if (!rep_->filter && !table_properties->filter_policy_name.empty()) {
-    // Support only BloomFilter as off now
-    rocksdb::BlockBasedTableOptions table_options;
-    table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(1));
-    if (table_properties->filter_policy_name.compare(
-            table_options.filter_policy->Name()) == 0) {
-      std::string filter_block_key = kFilterBlockPrefix;
-      filter_block_key.append(table_properties->filter_policy_name);
-      BlockHandle handle;
-      if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) {
-        BlockContents block;
-        if (ReadBlockContents(
-                rep_->file.get(), rep_->footer, ReadOptions(), handle, &block,
-                rep_->ioptions, false /*decompress*/,
-                Slice() /*compression dict*/, rep_->persistent_cache_options)
-                .ok()) {
-          rep_->filter.reset(new BlockBasedFilterBlockReader(
-              rep_->ioptions.prefix_extractor, table_options,
-              table_options.whole_key_filtering, std::move(block),
-              rep_->ioptions.statistics));
+    // Output Filter blocks
+    if (!rep_->filter && !table_properties->filter_policy_name.empty()) {
+      // Only BloomFilter is supported as of now.
+      rocksdb::BlockBasedTableOptions table_options;
+      table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(1));
+      if (table_properties->filter_policy_name.compare(
+              table_options.filter_policy->Name()) == 0) {
+        std::string filter_block_key = kFilterBlockPrefix;
+        filter_block_key.append(table_properties->filter_policy_name);
+        BlockHandle handle;
+        if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) {
+          BlockContents block;
+          Slice dummy_comp_dict;
+          BlockFetcher block_fetcher(
+              rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
+              ReadOptions(), handle, &block, rep_->ioptions,
+              false /*decompress*/, dummy_comp_dict /*compression dict*/,
+              rep_->persistent_cache_options);
+          s = block_fetcher.ReadBlockContents();
+          // Only build the filter reader if the block was actually read;
+          // on failure the contents are not usable.
+          if (s.ok()) {
+            rep_->filter.reset(new BlockBasedFilterBlockReader(
+                prefix_extractor, table_options,
+                table_options.whole_key_filtering, std::move(block),
+                rep_->ioptions.statistics));
+          }
         }
       }
     }
@@ -2061,6 +2922,9 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
 }
 
 void BlockBasedTable::Close() {
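+  // Close() may be called more than once; the closed flag below makes the
+  // cache-entry cleanup idempotent.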
+  if (rep_->closed) {
+    return;
+  }
   rep_->filter_entry.Release(rep_->table_options.block_cache.get());
   rep_->index_entry.Release(rep_->table_options.block_cache.get());
   rep_->range_del_entry.Release(rep_->table_options.block_cache.get());
@@ -2069,7 +2933,7 @@ void BlockBasedTable::Close() {
     char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
     // Get the filter block key
     auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
-                           rep_->footer.metaindex_handle(), cache_key);
+                           rep_->filter_handle, cache_key);
     rep_->table_options.block_cache.get()->Erase(key);
     // Get the index block key
     key = GetCacheKeyFromOffset(rep_->cache_key_prefix,
@@ -2077,14 +2941,14 @@ void BlockBasedTable::Close() {
                                 rep_->dummy_index_reader_offset, cache_key);
     rep_->table_options.block_cache.get()->Erase(key);
   }
+  rep_->closed = true;
 }
 
 Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
   out_file->Append(
       "Index Details:\n"
       "--------------------------------------\n");
-
-  std::unique_ptr<InternalIterator> blockhandles_iter(
+  std::unique_ptr<InternalIteratorBase<BlockHandle>> blockhandles_iter(
       NewIndexIterator(ReadOptions()));
   Status s = blockhandles_iter->status();
   if (!s.ok()) {
@@ -2101,16 +2965,23 @@ Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
       break;
     }
     Slice key = blockhandles_iter->key();
+    Slice user_key;
     InternalKey ikey;
-    ikey.DecodeFrom(key);
+    if (rep_->table_properties &&
+        rep_->table_properties->index_key_is_user_key != 0) {
+      user_key = key;
+    } else {
+      ikey.DecodeFrom(key);
+      user_key = ikey.user_key();
+    }
 
     out_file->Append("  HEX    ");
-    out_file->Append(ikey.user_key().ToString(true).c_str());
+    out_file->Append(user_key.ToString(true).c_str());
     out_file->Append(": ");
     out_file->Append(blockhandles_iter->value().ToString(true).c_str());
     out_file->Append("\n");
 
-    std::string str_key = ikey.user_key().ToString();
+    std::string str_key = user_key.ToString();
     std::string res_key("");
     char cspace = ' ';
     for (size_t i = 0; i < str_key.size(); i++) {
@@ -2126,7 +2997,7 @@ Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
 }
 
 Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
-  std::unique_ptr<InternalIterator> blockhandles_iter(
+  std::unique_ptr<InternalIteratorBase<BlockHandle>> blockhandles_iter(
       NewIndexIterator(ReadOptions()));
   Status s = blockhandles_iter->status();
   if (!s.ok()) {
@@ -2146,9 +3017,7 @@ Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
       break;
     }
 
-    Slice bh_val = blockhandles_iter->value();
-    BlockHandle bh;
-    bh.DecodeFrom(&bh_val);
+    BlockHandle bh = blockhandles_iter->value();
     uint64_t datablock_size = bh.size();
     datablock_size_min = std::min(datablock_size_min, datablock_size);
     datablock_size_max = std::max(datablock_size_max, datablock_size);
@@ -2162,8 +3031,8 @@ Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
     out_file->Append("--------------------------------------\n");
 
     std::unique_ptr<InternalIterator> datablock_iter;
-    datablock_iter.reset(
-        NewDataBlockIterator(rep_, ReadOptions(), blockhandles_iter->value()));
+    datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
+        rep_, ReadOptions(), blockhandles_iter->value()));
     s = datablock_iter->status();
 
     if (!s.ok()) {
@@ -2219,11 +3088,19 @@ void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
   std::string res_key(""), res_value("");
   char cspace = ' ';
   for (size_t i = 0; i < str_key.size(); i++) {
-    res_key.append(&str_key[i], 1);
+    if (str_key[i] == '\0') {
+      res_key.append("\\0", 2);
+    } else {
+      res_key.append(&str_key[i], 1);
+    }
     res_key.append(1, cspace);
   }
   for (size_t i = 0; i < str_value.size(); i++) {
-    res_value.append(&str_value[i], 1);
+    if (str_value[i] == '\0') {
+      res_value.append("\\0", 2);
+    } else {
+      res_value.append(&str_value[i], 1);
+    }
     res_value.append(1, cspace);
   }
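+
+  // Worked example of the escaping above: a key or value with bytes
+  // {'a', '\0', 'b'} is rendered as "a \0 b " -- each byte is followed by a
+  // space, and a NUL byte is printed as the two characters '\' and '0'.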
 
@@ -2236,20 +3113,20 @@ void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
 
 namespace {
 
-void DeleteCachedFilterEntry(const Slice& key, void* value) {
+void DeleteCachedFilterEntry(const Slice& /*key*/, void* value) {
   FilterBlockReader* filter = reinterpret_cast<FilterBlockReader*>(value);
   if (filter->statistics() != nullptr) {
     RecordTick(filter->statistics(), BLOCK_CACHE_FILTER_BYTES_EVICT,
-               filter->size());
+               filter->ApproximateMemoryUsage());
   }
   delete filter;
 }
 
-void DeleteCachedIndexEntry(const Slice& key, void* value) {
+void DeleteCachedIndexEntry(const Slice& /*key*/, void* value) {
   IndexReader* index_reader = reinterpret_cast<IndexReader*>(value);
   if (index_reader->statistics() != nullptr) {
     RecordTick(index_reader->statistics(), BLOCK_CACHE_INDEX_BYTES_EVICT,
-               index_reader->usable_size());
+               index_reader->ApproximateMemoryUsage());
   }
   delete index_reader;
 }
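+
+// For orientation (hedged sketch; the real Insert calls live where the filter
+// and index readers are cached): these deleters are registered with the block
+// cache so that eviction both records the BYTES_EVICT tick and frees the
+// reader, e.g.
+//
+//   block_cache->Insert(key, index_reader,
+//                       index_reader->ApproximateMemoryUsage(),
+//                       &DeleteCachedIndexEntry, &cache_handle);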