update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / version_set.cc
index 6c7b77a9005927d6cbe7f3577b94678020c47530..e913a97dd8cca0a49a0f2de0e469a402c9fcc0df 100644
@@ -9,20 +9,17 @@
 
 #include "db/version_set.h"
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
 #include <stdio.h>
 #include <algorithm>
+#include <array>
+#include <cinttypes>
 #include <list>
 #include <map>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
-#include "db/compaction.h"
+#include "compaction/compaction.h"
 #include "db/internal_stats.h"
 #include "db/log_reader.h"
 #include "db/log_writer.h"
 #include "db/pinned_iterators_manager.h"
 #include "db/table_cache.h"
 #include "db/version_builder.h"
+#include "file/filename.h"
+#include "file/random_access_file_reader.h"
+#include "file/read_write_util.h"
+#include "file/writable_file_writer.h"
 #include "monitoring/file_read_sample.h"
 #include "monitoring/perf_context_imp.h"
+#include "monitoring/persistent_stats_history.h"
 #include "rocksdb/env.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/internal_iterator.h"
 #include "table/merging_iterator.h"
 #include "table/meta_blocks.h"
-#include "table/plain_table_factory.h"
+#include "table/multiget_context.h"
+#include "table/plain/plain_table_factory.h"
 #include "table/table_reader.h"
 #include "table/two_level_iterator.h"
+#include "test_util/sync_point.h"
 #include "util/coding.h"
-#include "util/file_reader_writer.h"
-#include "util/filename.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
-#include "util/sync_point.h"
 #include "util/user_comparator_wrapper.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 namespace {
 
@@ -91,7 +92,8 @@ Status OverlapWithIterator(const Comparator* ucmp,
       return Status::Corruption("DB have corrupted keys");
     }
 
-    if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
+    if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
+        0) {
       *overlap = true;
     }
   }
@@ -169,17 +171,16 @@ class FilePicker {
           // Check if key is within a file's range. If search left bound and
           // right bound point to the same file, we are sure key falls in
           // range.
-          assert(
-              curr_level_ == 0 ||
-              curr_index_in_curr_level_ == start_index_in_curr_level_ ||
-              user_comparator_->Compare(user_key_,
-                ExtractUserKey(f->smallest_key)) <= 0);
-
-          int cmp_smallest = user_comparator_->Compare(user_key_,
-              ExtractUserKey(f->smallest_key));
+          assert(curr_level_ == 0 ||
+                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
+                 user_comparator_->CompareWithoutTimestamp(
+                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);
+
+          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
+              user_key_, ExtractUserKey(f->smallest_key));
           if (cmp_smallest >= 0) {
-            cmp_largest = user_comparator_->Compare(user_key_,
-                ExtractUserKey(f->largest_key));
+            cmp_largest = user_comparator_->CompareWithoutTimestamp(
+                user_key_, ExtractUserKey(f->largest_key));
           }
 
           // Setup file search bound for the next level based on the
@@ -345,6 +346,389 @@ class FilePicker {
     return false;
   }
 };
+
+class FilePickerMultiGet {
+ private:
+  struct FilePickerContext;
+
+ public:
+  FilePickerMultiGet(MultiGetRange* range,
+                     autovector<LevelFilesBrief>* file_levels,
+                     unsigned int num_levels, FileIndexer* file_indexer,
+                     const Comparator* user_comparator,
+                     const InternalKeyComparator* internal_comparator)
+      : num_levels_(num_levels),
+        curr_level_(static_cast<unsigned int>(-1)),
+        returned_file_level_(static_cast<unsigned int>(-1)),
+        hit_file_level_(static_cast<unsigned int>(-1)),
+        range_(range),
+        batch_iter_(range->begin()),
+        batch_iter_prev_(range->begin()),
+        maybe_repeat_key_(false),
+        current_level_range_(*range, range->begin(), range->end()),
+        current_file_range_(*range, range->begin(), range->end()),
+        level_files_brief_(file_levels),
+        is_hit_file_last_in_level_(false),
+        curr_file_level_(nullptr),
+        file_indexer_(file_indexer),
+        user_comparator_(user_comparator),
+        internal_comparator_(internal_comparator) {
+    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+      fp_ctx_array_[iter.index()] =
+          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
+    }
+
+    // Setup member variables to search first level.
+    search_ended_ = !PrepareNextLevel();
+    if (!search_ended_) {
+      // REVISIT
+      // Prefetch Level 0 table data to avoid cache miss if possible.
+      // As of now, only PlainTableReader and CuckooTableReader do any
+      // prefetching. This may not be necessary anymore once we implement
+      // batching in those table readers
+      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
+        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
+        if (r) {
+          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+            r->Prepare(iter->ikey);
+          }
+        }
+      }
+    }
+  }
+
+  int GetCurrentLevel() const { return curr_level_; }
+
+  // Iterates through files in the current level until it finds a file that
+  // contains at least one key from the MultiGet batch
+  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
+                                  size_t* file_index, FdWithKeyRange** fd,
+                                  bool* is_last_key_in_file) {
+    size_t curr_file_index = *file_index;
+    FdWithKeyRange* f = nullptr;
+    bool file_hit = false;
+    int cmp_largest = -1;
+    if (curr_file_index >= curr_file_level_->num_files) {
+      // In the unlikely case the next key is a duplicate of the current key,
+      // and the current key is the last in the level and the internal key
+      // was not found, we need to skip lookup for the remaining keys and
+      // reset the search bounds
+      if (batch_iter_ != current_level_range_.end()) {
+        ++batch_iter_;
+        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
+          struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+          fp_ctx.search_left_bound = 0;
+          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+        }
+      }
+      return false;
+    }
+    // Loops over keys in the MultiGet batch until it finds a file with
+    // at least one of the keys. Then it keeps moving forward until the
+    // last key in the batch that falls in that file
+    while (batch_iter_ != current_level_range_.end() &&
+           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
+                curr_file_index ||
+            !file_hit)) {
+      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
+      Slice& user_key = batch_iter_->ukey;
+
+      // Do key range filtering of files and/or fractional cascading if:
+      // (1) not all the files are in level 0, or
+      // (2) there are more than 3 current level files
+      // If there are only 3 or fewer files in the current level, we skip the
+      // key range filtering. In that case the system is more likely highly
+      // tuned to minimize the number of tables queried by each query, so key
+      // range filtering is unlikely to be more efficient than simply
+      // querying the files.
+      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
+        // Check if key is within a file's range. If search left bound and
+        // right bound point to the same file, we are sure key falls in
+        // range.
+        assert(curr_level_ == 0 ||
+               fp_ctx.curr_index_in_curr_level ==
+                   fp_ctx.start_index_in_curr_level ||
+               user_comparator_->Compare(user_key,
+                                         ExtractUserKey(f->smallest_key)) <= 0);
+
+        int cmp_smallest = user_comparator_->Compare(
+            user_key, ExtractUserKey(f->smallest_key));
+        if (cmp_smallest >= 0) {
+          cmp_largest = user_comparator_->Compare(
+              user_key, ExtractUserKey(f->largest_key));
+        } else {
+          cmp_largest = -1;
+        }
+
+        // Setup file search bound for the next level based on the
+        // comparison results
+        if (curr_level_ > 0) {
+          file_indexer_->GetNextLevelIndex(
+              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
+              cmp_largest, &fp_ctx.search_left_bound,
+              &fp_ctx.search_right_bound);
+        }
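+        // Illustrative example (hypothetical keys, not part of the upstream
+        // change): if user_key is "k15" and this file spans ["k10", "k20"],
+        // then cmp_smallest > 0 and cmp_largest < 0, so the key lies inside
+        // the file, and GetNextLevelIndex() can narrow the next level's
+        // search bounds to the files overlapping this file's key range
+        // (fractional cascading).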
+        // Key falls out of current file's range
+        if (cmp_smallest < 0 || cmp_largest > 0) {
+          next_file_range->SkipKey(batch_iter_);
+        } else {
+          file_hit = true;
+        }
+      } else {
+        file_hit = true;
+      }
+      if (cmp_largest == 0) {
+        // cmp_largest is 0, which means the next key will not be in this
+        // file, so stop looking further. Also don't increment batch_iter_
+        // as we may have to look for this key in the next file if we don't
+        // find it in this one
+        break;
+      } else {
+        if (curr_level_ == 0) {
+          // We need to look through all files in level 0
+          ++fp_ctx.curr_index_in_curr_level;
+        }
+        ++batch_iter_;
+      }
+      if (!file_hit) {
+        curr_file_index =
+            (batch_iter_ != current_level_range_.end())
+                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+                : curr_file_level_->num_files;
+      }
+    }
+
+    *fd = f;
+    *file_index = curr_file_index;
+    *is_last_key_in_file = cmp_largest == 0;
+    return file_hit;
+  }
+
+  FdWithKeyRange* GetNextFile() {
+    while (!search_ended_) {
+      // Start searching next level.
+      if (batch_iter_ == current_level_range_.end()) {
+        search_ended_ = !PrepareNextLevel();
+        continue;
+      } else {
+        if (maybe_repeat_key_) {
+          maybe_repeat_key_ = false;
+          // Check if we found the final value for the last key in the
+          // previous lookup range. If we did, then there's no need to look
+          // any further for that key, so advance batch_iter_. Else, keep
+          // batch_iter_ positioned on that key so we look it up again in
+          // the next file
+          // For L0, always advance the key because we will look in the next
+          // file regardless for all keys not found yet
+          if (current_level_range_.CheckKeyDone(batch_iter_) ||
+              curr_level_ == 0) {
+            ++batch_iter_;
+          }
+        }
+        // batch_iter_prev_ will become the start key for the next file
+        // lookup
+        batch_iter_prev_ = batch_iter_;
+      }
+
+      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
+                                    current_level_range_.end());
+      size_t curr_file_index =
+          (batch_iter_ != current_level_range_.end())
+              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+              : curr_file_level_->num_files;
+      FdWithKeyRange* f;
+      bool is_last_key_in_file;
+      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
+                                      &is_last_key_in_file)) {
+        search_ended_ = !PrepareNextLevel();
+      } else {
+        MultiGetRange::Iterator upper_key = batch_iter_;
+        if (is_last_key_in_file) {
+          // Since cmp_largest is 0, batch_iter_ still points to the last key
+          // that falls in this file, instead of the next one. Increment
+          // upper_key so we can set the range properly for SST MultiGet
+          ++upper_key;
+          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
+          maybe_repeat_key_ = true;
+        }
+        // Set the range for this file
+        current_file_range_ =
+            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
+        returned_file_level_ = curr_level_;
+        hit_file_level_ = curr_level_;
+        is_hit_file_last_in_level_ =
+            curr_file_index == curr_file_level_->num_files - 1;
+        return f;
+      }
+    }
+
+    // Search ended
+    return nullptr;
+  }
+
+  // getter for current file level
+  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
+  unsigned int GetHitFileLevel() { return hit_file_level_; }
+
+  // Returns true if the most recent "hit file" (i.e., one returned by
+  // GetNextFile()) is at the last index in its level.
+  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
+
+  const MultiGetRange& CurrentFileRange() { return current_file_range_; }
+
+ private:
+  unsigned int num_levels_;
+  unsigned int curr_level_;
+  unsigned int returned_file_level_;
+  unsigned int hit_file_level_;
+
+  struct FilePickerContext {
+    int32_t search_left_bound;
+    int32_t search_right_bound;
+    unsigned int curr_index_in_curr_level;
+    unsigned int start_index_in_curr_level;
+
+    FilePickerContext(int32_t left, int32_t right)
+        : search_left_bound(left), search_right_bound(right),
+          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
+
+    FilePickerContext() = default;
+  };
+  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
+  MultiGetRange* range_;
+  // Iterator over the keys in a MultiGet batch; it is reset at the beginning
+  // of each level. Each call to GetNextFile() positions batch_iter_ at or
+  // right after the last key that was found in the returned SST file
+  MultiGetRange::Iterator batch_iter_;
+  // An iterator that records the previous position of batch_iter_, i.e. the
+  // last key found in the previous SST file, in order to serve as the start
+  // of the batch key range for the next SST file
+  MultiGetRange::Iterator batch_iter_prev_;
+  bool maybe_repeat_key_;
+  MultiGetRange current_level_range_;
+  MultiGetRange current_file_range_;
+  autovector<LevelFilesBrief>* level_files_brief_;
+  bool search_ended_;
+  bool is_hit_file_last_in_level_;
+  LevelFilesBrief* curr_file_level_;
+  FileIndexer* file_indexer_;
+  const Comparator* user_comparator_;
+  const InternalKeyComparator* internal_comparator_;
+
+  // Setup local variables to search next level.
+  // Returns false if there are no more levels to search.
+  bool PrepareNextLevel() {
+    if (curr_level_ == 0) {
+      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
+      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
+          curr_file_level_->num_files) {
+        batch_iter_prev_ = current_level_range_.begin();
+        batch_iter_ = current_level_range_.begin();
+        return true;
+      }
+    }
+
+    curr_level_++;
+    // Reset key range to saved value
+    while (curr_level_ < num_levels_) {
+      bool level_contains_keys = false;
+      curr_file_level_ = &(*level_files_brief_)[curr_level_];
+      if (curr_file_level_->num_files == 0) {
+        // When the current level is empty, the search bound generated from
+        // the upper level must be [0, -1], or [0, FileIndexer::kLevelMaxIndex]
+        // if the upper level is also empty.
+
+        for (auto mget_iter = current_level_range_.begin();
+             mget_iter != current_level_range_.end(); ++mget_iter) {
+          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+
+          assert(fp_ctx.search_left_bound == 0);
+          assert(fp_ctx.search_right_bound == -1 ||
+                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
+          // Since current level is empty, it will need to search all files in
+          // the next level
+          fp_ctx.search_left_bound = 0;
+          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+        }
+        // Skip all subsequent empty levels
+        do {
+          ++curr_level_;
+        } while ((curr_level_ < num_levels_) &&
+                 (*level_files_brief_)[curr_level_].num_files == 0);
+        continue;
+      }
+
+      // Some files may overlap each other. We find
+      // all files that overlap user_key and process them in order from
+      // newest to oldest. In the context of merge-operator, this can occur at
+      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
+      // are always compacted into a single entry).
+      int32_t start_index = -1;
+      current_level_range_ =
+          MultiGetRange(*range_, range_->begin(), range_->end());
+      for (auto mget_iter = current_level_range_.begin();
+           mget_iter != current_level_range_.end(); ++mget_iter) {
+        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+        if (curr_level_ == 0) {
+          // On Level-0, we read through all files to check for overlap.
+          start_index = 0;
+          level_contains_keys = true;
+        } else {
+          // On Level-n (n>=1), files are sorted. Binary search to find the
+          // earliest file whose largest key >= ikey. Search left bound and
+          // right bound are used to narrow the range.
+          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
+            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
+              fp_ctx.search_right_bound =
+                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
+            }
+            // `search_right_bound_` is an inclusive upper-bound, but since it
+            // was determined based on user key, it is still possible the lookup
+            // key falls to the right of `search_right_bound_`'s corresponding
+            // file. So, pass a limit one higher, which allows us to detect this
+            // case.
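+            // For example (hypothetical): with four files indexed 0..3 in
+            // this level and search_right_bound == 3, the limit passed below
+            // is 4, so a returned start_index of 4 signals that the lookup
+            // key sorts after file 3's largest key.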
+            Slice& ikey = mget_iter->ikey;
+            start_index = FindFileInRange(
+                *internal_comparator_, *curr_file_level_, ikey,
+                static_cast<uint32_t>(fp_ctx.search_left_bound),
+                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
+            if (start_index == fp_ctx.search_right_bound + 1) {
+              // `ikey_` comes after `search_right_bound_`. The lookup key does
+              // not exist on this level, so let's skip this level and do a full
+              // binary search on the next level.
+              fp_ctx.search_left_bound = 0;
+              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+              current_level_range_.SkipKey(mget_iter);
+              continue;
+            } else {
+              level_contains_keys = true;
+            }
+          } else {
+            // search_left_bound > search_right_bound, key does not exist in
+            // this level. Since no comparison is done in this level, it will
+            // need to search all files in the next level.
+            fp_ctx.search_left_bound = 0;
+            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+            current_level_range_.SkipKey(mget_iter);
+            continue;
+          }
+        }
+        fp_ctx.start_index_in_curr_level = start_index;
+        fp_ctx.curr_index_in_curr_level = start_index;
+      }
+      if (level_contains_keys) {
+        batch_iter_prev_ = current_level_range_.begin();
+        batch_iter_ = current_level_range_.begin();
+        return true;
+      }
+      curr_level_++;
+    }
+    // curr_level_ = num_levels_. So, no more levels to search.
+    return false;
+  }
+};
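+// Illustrative driver loop (a sketch of how Version::MultiGet below uses
+// FilePickerMultiGet; the local variable names here are hypothetical):
+//
+//   FilePickerMultiGet fp(&range, &level_files_brief, num_levels,
+//                         &file_indexer, ucmp, icmp);
+//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
+//        f = fp.GetNextFile()) {
+//     MultiGetRange file_range = fp.CurrentFileRange();
+//     // Look up file_range's keys in *f (e.g. via TableCache::MultiGet()),
+//     // then mark finished keys done so later files skip them.
+//   }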
 }  // anonymous namespace
 
 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
@@ -414,14 +798,16 @@ static bool AfterFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
   // nullptr user_key occurs before all keys and is therefore never after *f
   return (user_key != nullptr &&
-          ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0);
+          ucmp->CompareWithoutTimestamp(*user_key,
+                                        ExtractUserKey(f->largest_key)) > 0);
 }
 
 static bool BeforeFile(const Comparator* ucmp,
                        const Slice* user_key, const FdWithKeyRange* f) {
   // nullptr user_key occurs after all keys and is therefore never before *f
   return (user_key != nullptr &&
-          ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0);
+          ucmp->CompareWithoutTimestamp(*user_key,
+                                        ExtractUserKey(f->smallest_key)) < 0);
 }
 
 bool SomeFileOverlapsRange(
@@ -466,24 +852,25 @@ namespace {
 
 class LevelIterator final : public InternalIterator {
  public:
-  LevelIterator(
-      TableCache* table_cache, const ReadOptions& read_options,
-      const EnvOptions& env_options, const InternalKeyComparator& icomparator,
-      const LevelFilesBrief* flevel, const SliceTransform* prefix_extractor,
-      bool should_sample, HistogramImpl* file_read_hist, bool for_compaction,
-      bool skip_filters, int level, RangeDelAggregator* range_del_agg,
-      const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
-          nullptr)
+  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
+                const FileOptions& file_options,
+                const InternalKeyComparator& icomparator,
+                const LevelFilesBrief* flevel,
+                const SliceTransform* prefix_extractor, bool should_sample,
+                HistogramImpl* file_read_hist, TableReaderCaller caller,
+                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
+                const std::vector<AtomicCompactionUnitBoundary>*
+                    compaction_boundaries = nullptr)
       : table_cache_(table_cache),
         read_options_(read_options),
-        env_options_(env_options),
+        file_options_(file_options),
         icomparator_(icomparator),
         user_comparator_(icomparator.user_comparator()),
         flevel_(flevel),
         prefix_extractor_(prefix_extractor),
         file_read_hist_(file_read_hist),
         should_sample_(should_sample),
-        for_compaction_(for_compaction),
+        caller_(caller),
         skip_filters_(skip_filters),
         file_index_(flevel_->num_files),
         level_(level),
@@ -500,7 +887,8 @@ class LevelIterator final : public InternalIterator {
   void SeekForPrev(const Slice& target) override;
   void SeekToFirst() override;
   void SeekToLast() override;
-  void Next() override;
+  void Next() final override;
+  bool NextAndGetResult(IterateResult* result) override;
   void Prev() override;
 
   bool Valid() const override { return file_iter_.Valid(); }
@@ -508,34 +896,57 @@ class LevelIterator final : public InternalIterator {
     assert(Valid());
     return file_iter_.key();
   }
+
   Slice value() const override {
     assert(Valid());
     return file_iter_.value();
   }
+
   Status status() const override {
     return file_iter_.iter() ? file_iter_.status() : Status::OK();
   }
+
+  inline bool MayBeOutOfLowerBound() override {
+    assert(Valid());
+    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
+  }
+
+  inline bool MayBeOutOfUpperBound() override {
+    assert(Valid());
+    return file_iter_.MayBeOutOfUpperBound();
+  }
+
   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
     pinned_iters_mgr_ = pinned_iters_mgr;
     if (file_iter_.iter()) {
       file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
     }
   }
+
   bool IsKeyPinned() const override {
     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
            file_iter_.iter() && file_iter_.IsKeyPinned();
   }
+
   bool IsValuePinned() const override {
     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
            file_iter_.iter() && file_iter_.IsValuePinned();
   }
 
  private:
-  void SkipEmptyFileForward();
+  // Return true if at least one invalid file is seen and skipped.
+  bool SkipEmptyFileForward();
   void SkipEmptyFileBackward();
   void SetFileIterator(InternalIterator* iter);
   void InitFileIterator(size_t new_file_index);
 
+  // Called by both Next() and NextAndGetResult(). Force inline.
+  void NextImpl() {
+    assert(Valid());
+    file_iter_.Next();
+    SkipEmptyFileForward();
+  }
+
   const Slice& file_smallest_key(size_t file_index) {
     assert(file_index < flevel_->num_files);
     return flevel_->files[file_index].smallest_key;
@@ -543,8 +954,9 @@ class LevelIterator final : public InternalIterator {
 
   bool KeyReachedUpperBound(const Slice& internal_key) {
     return read_options_.iterate_upper_bound != nullptr &&
-           user_comparator_.Compare(ExtractUserKey(internal_key),
-                                    *read_options_.iterate_upper_bound) >= 0;
+           user_comparator_.CompareWithoutTimestamp(
+               ExtractUserKey(internal_key),
+               *read_options_.iterate_upper_bound) >= 0;
   }
 
   InternalIterator* NewFileIterator() {
@@ -560,27 +972,46 @@ class LevelIterator final : public InternalIterator {
       smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
       largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
     }
+    CheckMayBeOutOfLowerBound();
     return table_cache_->NewIterator(
-        read_options_, env_options_, icomparator_, *file_meta.file_metadata,
+        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
         range_del_agg_, prefix_extractor_,
-        nullptr /* don't need reference to table */,
-        file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_,
-        level_, smallest_compaction_key, largest_compaction_key);
+        nullptr /* don't need reference to table */, file_read_hist_, caller_,
+        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
+        largest_compaction_key);
+  }
+
+  // Check whether the current file is fully within iterate_lower_bound.
+  //
+  // Note MyRocks may update iterate bounds between seeks. To work around
+  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
+  void CheckMayBeOutOfLowerBound() {
+    if (read_options_.iterate_lower_bound != nullptr &&
+        file_index_ < flevel_->num_files) {
+      may_be_out_of_lower_bound_ =
+          user_comparator_.Compare(
+              ExtractUserKey(file_smallest_key(file_index_)),
+              *read_options_.iterate_lower_bound) < 0;
+    }
   }
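+  // For example (hypothetical bounds): with iterate_lower_bound == "k10" and
+  // a file whose smallest key is "k15", every key in the file is >= the
+  // lower bound, so may_be_out_of_lower_bound_ becomes false and per-key
+  // lower-bound checks can be skipped while iterating this file.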
 
   TableCache* table_cache_;
   const ReadOptions read_options_;
-  const EnvOptions& env_options_;
+  const FileOptions& file_options_;
   const InternalKeyComparator& icomparator_;
   const UserComparatorWrapper user_comparator_;
   const LevelFilesBrief* flevel_;
   mutable FileDescriptor current_value_;
+  // `prefix_extractor_` may be non-null even for total order seek. Checking
+  // this variable is not the right way to identify whether prefix iterator
+  // is used.
   const SliceTransform* prefix_extractor_;
 
   HistogramImpl* file_read_hist_;
   bool should_sample_;
-  bool for_compaction_;
+  TableReaderCaller caller_;
   bool skip_filters_;
+  bool may_be_out_of_lower_bound_ = true;
   size_t file_index_;
   int level_;
   RangeDelAggregator* range_del_agg_;
@@ -593,13 +1024,56 @@ class LevelIterator final : public InternalIterator {
 };
 
 void LevelIterator::Seek(const Slice& target) {
-  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+  // Check whether the seek key falls within the current file
+  bool need_to_reseek = true;
+  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
+    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
+    if (icomparator_.InternalKeyComparator::Compare(
+            target, cur_file.largest_key) <= 0 &&
+        icomparator_.InternalKeyComparator::Compare(
+            target, cur_file.smallest_key) >= 0) {
+      need_to_reseek = false;
+      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
+             file_index_);
+    }
+  }
+  if (need_to_reseek) {
+    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
+    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+    InitFileIterator(new_file_index);
+  }
 
-  InitFileIterator(new_file_index);
   if (file_iter_.iter() != nullptr) {
     file_iter_.Seek(target);
   }
-  SkipEmptyFileForward();
+  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
+      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
+      file_iter_.iter() != nullptr && file_iter_.Valid()) {
+    // We've skipped the file we initially positioned to. In the prefix
+    // seek case, it is likely that the file is skipped because of
+    // prefix bloom or hash, where more keys are skipped. We then check
+    // the current key and invalidate the iterator if the prefix is
+    // already passed.
+    // When doing prefix iterator seek, when keys for one prefix have
+    // been exhausted, it can jump to any key that is larger. Here we are
+    // enforcing a stricter contract than that, in order to make it easier for
+    // higher layers (merging and DB iterator) to reason about correctness:
+    // 1. Within the prefix, the result should be accurate.
+    // 2. If keys for the prefix are exhausted, the iterator is either
+    //    positioned at the next key after the prefix, or made invalid.
+    // A side benefit will be that it invalidates the iterator earlier so that
+    // the upper level merging iterator can merge fewer child iterators.
+    Slice target_user_key = ExtractUserKey(target);
+    Slice file_user_key = ExtractUserKey(file_iter_.key());
+    if (prefix_extractor_->InDomain(target_user_key) &&
+        (!prefix_extractor_->InDomain(file_user_key) ||
+         user_comparator_.Compare(
+             prefix_extractor_->Transform(target_user_key),
+             prefix_extractor_->Transform(file_user_key)) != 0)) {
+      SetFileIterator(nullptr);
+    }
+  }
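+  // For example (hypothetical keys, a fixed-length prefix extractor of 3):
+  // seeking to "app1" but landing on "banana1" after skipping an empty file
+  // yields prefixes "app" vs. "ban"; since they differ, the iterator is
+  // invalidated rather than exposing keys from an unrelated prefix.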
+  CheckMayBeOutOfLowerBound();
 }
 
 void LevelIterator::SeekForPrev(const Slice& target) {
@@ -613,6 +1087,7 @@ void LevelIterator::SeekForPrev(const Slice& target) {
     file_iter_.SeekForPrev(target);
     SkipEmptyFileBackward();
   }
+  CheckMayBeOutOfLowerBound();
 }
 
 void LevelIterator::SeekToFirst() {
@@ -621,6 +1096,7 @@ void LevelIterator::SeekToFirst() {
     file_iter_.SeekToFirst();
   }
   SkipEmptyFileForward();
+  CheckMayBeOutOfLowerBound();
 }
 
 void LevelIterator::SeekToLast() {
@@ -629,12 +1105,19 @@ void LevelIterator::SeekToLast() {
     file_iter_.SeekToLast();
   }
   SkipEmptyFileBackward();
+  CheckMayBeOutOfLowerBound();
 }
 
-void LevelIterator::Next() {
-  assert(Valid());
-  file_iter_.Next();
-  SkipEmptyFileForward();
+void LevelIterator::Next() { NextImpl(); }
+
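+// NextAndGetResult() bundles Next(), Valid(), key() and the upper-bound hint
+// into a single virtual call, which lets callers (e.g. the merging iterator)
+// avoid several virtual dispatches per key advanced.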
+bool LevelIterator::NextAndGetResult(IterateResult* result) {
+  NextImpl();
+  bool is_valid = Valid();
+  if (is_valid) {
+    result->key = key();
+    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
+  }
+  return is_valid;
 }
 
 void LevelIterator::Prev() {
@@ -643,25 +1126,28 @@ void LevelIterator::Prev() {
   SkipEmptyFileBackward();
 }
 
-void LevelIterator::SkipEmptyFileForward() {
+bool LevelIterator::SkipEmptyFileForward() {
+  bool seen_empty_file = false;
   while (file_iter_.iter() == nullptr ||
          (!file_iter_.Valid() && file_iter_.status().ok() &&
           !file_iter_.iter()->IsOutOfBound())) {
+    seen_empty_file = true;
     // Move to next file
     if (file_index_ >= flevel_->num_files - 1) {
       // Already at the last file
       SetFileIterator(nullptr);
-      return;
+      break;
     }
     if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
       SetFileIterator(nullptr);
-      return;
+      break;
     }
     InitFileIterator(file_index_ + 1);
     if (file_iter_.iter() != nullptr) {
       file_iter_.SeekToFirst();
     }
   }
+  return seen_empty_file;
 }
 
 void LevelIterator::SkipEmptyFileBackward() {
@@ -723,7 +1209,7 @@ class BaseReferencedVersionBuilder {
  public:
   explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
       : version_builder_(new VersionBuilder(
-            cfd->current()->version_set()->env_options(), cfd->table_cache(),
+            cfd->current()->version_set()->file_options(), cfd->table_cache(),
             cfd->current()->storage_info(), cfd->ioptions()->info_log)),
         version_(cfd->current()) {
     version_->Ref();
@@ -744,7 +1230,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
   auto table_cache = cfd_->table_cache();
   auto ioptions = cfd_->ioptions();
   Status s = table_cache->GetTableProperties(
-      env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
+      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
       mutable_cf_options_.prefix_extractor.get(), true /* no io */);
   if (s.ok()) {
     return s;
@@ -758,7 +1244,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
 
   // 2. Table is not present in table cache, we'll read the table properties
   // directly from the properties block in the file.
-  std::unique_ptr<RandomAccessFile> file;
+  std::unique_ptr<FSRandomAccessFile> file;
   std::string file_name;
   if (fname != nullptr) {
     file_name = *fname;
@@ -767,7 +1253,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
       TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                     file_meta->fd.GetPathId());
   }
-  s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
+  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
+                                        nullptr);
   if (!s.ok()) {
     return s;
   }
@@ -779,8 +1266,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
       new RandomAccessFileReader(
           std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
           0 /* hist_type */, nullptr /* file_read_hist */,
-          nullptr /* rate_limiter */, false /* for_compaction*/,
-          ioptions->listeners));
+          nullptr /* rate_limiter */, ioptions->listeners));
   s = ReadTableProperties(
       file_reader.get(), file_meta->fd.GetFileSize(),
       Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
@@ -806,6 +1292,60 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
   return Status::OK();
 }
 
+Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
+                                            std::string* out_str) {
+  if (max_entries_to_print <= 0) {
+    return Status::OK();
+  }
+  int num_entries_left = max_entries_to_print;
+
+  std::stringstream ss;
+
+  for (int level = 0; level < storage_info_.num_levels_; level++) {
+    for (const auto& file_meta : storage_info_.files_[level]) {
+      auto fname =
+          TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
+                        file_meta->fd.GetPathId());
+
+      ss << "=== file : " << fname << " ===\n";
+
+      TableCache* table_cache = cfd_->table_cache();
+      std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
+
+      Status s = table_cache->GetRangeTombstoneIterator(
+          ReadOptions(), cfd_->internal_comparator(), *file_meta,
+          &tombstone_iter);
+      if (!s.ok()) {
+        return s;
+      }
+      if (tombstone_iter) {
+        tombstone_iter->SeekToFirst();
+
+        while (tombstone_iter->Valid() && num_entries_left > 0) {
+          ss << "start: " << tombstone_iter->start_key().ToString(true)
+             << " end: " << tombstone_iter->end_key().ToString(true)
+             << " seq: " << tombstone_iter->seq() << '\n';
+          tombstone_iter->Next();
+          num_entries_left--;
+        }
+        if (num_entries_left <= 0) {
+          break;
+        }
+      }
+    }
+    if (num_entries_left <= 0) {
+      break;
+    }
+  }
+  assert(num_entries_left >= 0);
+  if (num_entries_left <= 0) {
+    ss << "(results may not be complete)\n";
+  }
+
+  *out_str = ss.str();
+  return Status::OK();
+}
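+// Example output (hypothetical file name; keys are hex-encoded by
+// Slice::ToString(true)):
+//   === file : /db/cf0/000123.sst ===
+//   start: 6B657931 end: 6B657939 seq: 72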
+
 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                          int level) {
   for (const auto& file_meta : storage_info_.files_[level]) {
@@ -884,7 +1424,7 @@ size_t Version::GetMemoryUsageByTableReaders() {
   for (auto& file_level : storage_info_.level_files_brief_) {
     for (size_t i = 0; i < file_level.num_files; i++) {
       total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
-          env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
+          file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
           mutable_cf_options_.prefix_extractor.get());
     }
   }
@@ -916,16 +1456,16 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
         assert(!ioptions->cf_paths.empty());
         file_path = ioptions->cf_paths.back().path;
       }
+      const uint64_t file_number = file->fd.GetNumber();
       files.emplace_back(SstFileMetaData{
-          MakeTableFileName("", file->fd.GetNumber()),
-          file_path,
-          static_cast<size_t>(file->fd.GetFileSize()),
-          file->fd.smallest_seqno,
-          file->fd.largest_seqno,
-          file->smallest.user_key().ToString(),
+          MakeTableFileName("", file_number), file_number, file_path,
+          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
+          file->fd.largest_seqno, file->smallest.user_key().ToString(),
           file->largest.user_key().ToString(),
           file->stats.num_reads_sampled.load(std::memory_order_relaxed),
-          file->being_compacted});
+          file->being_compacted, file->oldest_blob_file_number,
+          file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
+          file->file_checksum, file->file_checksum_func_name});
       files.back().num_entries = file->num_entries;
       files.back().num_deletions = file->num_deletions;
       level_size += file->fd.GetFileSize();
@@ -946,6 +1486,24 @@ uint64_t Version::GetSstFilesSize() {
   return sst_files_size;
 }
 
+void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
+  uint64_t oldest_time = port::kMaxUint64;
+  for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
+    for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
+      assert(meta->fd.table_reader != nullptr);
+      uint64_t file_creation_time = meta->TryGetFileCreationTime();
+      if (file_creation_time == kUnknownFileCreationTime) {
+        *creation_time = 0;
+        return;
+      }
+      if (file_creation_time < oldest_time) {
+        oldest_time = file_creation_time;
+      }
+    }
+  }
+  *creation_time = oldest_time;
+}
+
 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
   // Estimation will be inaccurate when:
   // (1) there exist merge keys
@@ -994,7 +1552,7 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
 }
 
 void Version::AddIterators(const ReadOptions& read_options,
-                           const EnvOptions& soptions,
+                           const FileOptions& soptions,
                            MergeIteratorBuilder* merge_iter_builder,
                            RangeDelAggregator* range_del_agg) {
   assert(storage_info_.finalized_);
@@ -1006,7 +1564,7 @@ void Version::AddIterators(const ReadOptions& read_options,
 }
 
 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
-                                   const EnvOptions& soptions,
+                                   const FileOptions& soptions,
                                    MergeIteratorBuilder* merge_iter_builder,
                                    int level,
                                    RangeDelAggregator* range_del_agg) {
@@ -1027,10 +1585,14 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
       const auto& file = storage_info_.LevelFilesBrief(0).files[i];
       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
-          read_options, soptions, cfd_->internal_comparator(), *file.file_metadata,
-          range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
-          cfd_->internal_stats()->GetFileReadHist(0), false, arena,
-          false /* skip_filters */, 0 /* level */));
+          read_options, soptions, cfd_->internal_comparator(),
+          *file.file_metadata, range_del_agg,
+          mutable_cf_options_.prefix_extractor.get(), nullptr,
+          cfd_->internal_stats()->GetFileReadHist(0),
+          TableReaderCaller::kUserIterator, arena,
+          /*skip_filters=*/false, /*level=*/0,
+          /*smallest_compaction_key=*/nullptr,
+          /*largest_compaction_key=*/nullptr));
     }
     if (should_sample) {
       // Count ones for every L0 files. This is done per iterator creation
@@ -1051,13 +1613,13 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
         cfd_->internal_stats()->GetFileReadHist(level),
-        false /* for_compaction */, IsFilterSkipped(level), level,
-        range_del_agg));
+        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
+        range_del_agg, /*largest_compaction_key=*/nullptr));
   }
 }
 
 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
-                                         const EnvOptions& env_options,
+                                         const FileOptions& file_options,
                                          const Slice& smallest_user_key,
                                          const Slice& largest_user_key,
                                          int level, bool* overlap) {
@@ -1081,10 +1643,14 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
         continue;
       }
       ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
-          read_options, env_options, cfd_->internal_comparator(), *file->file_metadata,
-          &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
-          cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
-          false /* skip_filters */, 0 /* level */));
+          read_options, file_options, cfd_->internal_comparator(),
+          *file->file_metadata, &range_del_agg,
+          mutable_cf_options_.prefix_extractor.get(), nullptr,
+          cfd_->internal_stats()->GetFileReadHist(0),
+          TableReaderCaller::kUserIterator, &arena,
+          /*skip_filters=*/false, /*level=*/0,
+          /*smallest_compaction_key=*/nullptr,
+          /*largest_compaction_key=*/nullptr));
       status = OverlapWithIterator(
           ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
       if (!status.ok() || *overlap) {
@@ -1094,11 +1660,11 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
     auto mem = arena.AllocateAligned(sizeof(LevelIterator));
     ScopedArenaIterator iter(new (mem) LevelIterator(
-        cfd_->table_cache(), read_options, env_options,
+        cfd_->table_cache(), read_options, file_options,
         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
         cfd_->internal_stats()->GetFileReadHist(level),
-        false /* for_compaction */, IsFilterSkipped(level), level,
+        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
         &range_del_agg));
     status = OverlapWithIterator(
         ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
@@ -1158,7 +1724,7 @@ VersionStorageInfo::VersionStorageInfo(
 }
 
 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
-                 const EnvOptions& env_opt,
+                 const FileOptions& file_opt,
                  const MutableCFOptions mutable_cf_options,
                  uint64_t version_number)
     : env_(vset->env_),
@@ -1183,7 +1749,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
       next_(this),
       prev_(this),
       refs_(0),
-      env_options_(env_opt),
+      file_options_(file_opt),
       mutable_cf_options_(mutable_cf_options),
       version_number_(version_number) {}
 
@@ -1192,7 +1758,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                   bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
-                  bool* is_blob) {
+                  bool* is_blob, bool do_merge) {
   Slice ikey = k.internal_key();
   Slice user_key = k.user_key();
 
@@ -1204,11 +1770,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
   }
 
   PinnedIteratorsManager pinned_iters_mgr;
+  uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
+  if (vset_ && vset_->block_cache_tracer_ &&
+      vset_->block_cache_tracer_->is_tracing_enabled()) {
+    tracing_get_id = vset_->block_cache_tracer_->NextGetId();
+  }
   GetContext get_context(
       user_comparator(), merge_operator_, info_log_, db_statistics_,
       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
-      value, value_found, merge_context, max_covering_tombstone_seq, this->env_,
-      seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
+      do_merge ? value : nullptr, value_found, merge_context, do_merge,
+      max_covering_tombstone_seq, this->env_, seq,
+      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
+      tracing_get_id);
 
   // Pin blocks that we read to hold merge operands
   if (merge_operator_) {
@@ -1272,7 +1845,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
         } else if (fp.GetHitFileLevel() >= 2) {
           RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
         }
-        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel());
+        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+                                  fp.GetHitFileLevel());
         return;
       case GetContext::kDeleted:
         // Use empty error message for speed
@@ -1285,16 +1859,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
         ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
         *status = Status::NotSupported(
             "Encounter unexpected blob index. Please open DB with "
-            "rocksdb::blob_db::BlobDB instead.");
+            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
         return;
     }
     f = fp.GetNextFile();
   }
-
   if (db_statistics_ != nullptr) {
     get_context.ReportCounters();
   }
   if (GetContext::kMerge == get_context.State()) {
+    if (!do_merge) {
+      *status = Status::OK();
+      return;
+    }
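+    // When do_merge is false the final merge is intentionally skipped here:
+    // the collected operands stay in merge_context for the caller to consume
+    // (e.g. an API such as DB::GetMergeOperands).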
     if (!merge_operator_) {
       *status =  Status::InvalidArgument(
           "merge_operator is not properly initialized.");
@@ -1318,53 +1895,234 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
   }
 }
 
-bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
-  // Reaching the bottom level implies misses at all upper levels, so we'll
-  // skip checking the filters when we predict a hit.
-  return cfd_->ioptions()->optimize_filters_for_hits &&
-         (level > 0 || is_file_last_in_level) &&
-         level == storage_info_.num_non_empty_levels() - 1;
-}
+void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
+                       ReadCallback* callback, bool* is_blob) {
+  PinnedIteratorsManager pinned_iters_mgr;
 
-void VersionStorageInfo::GenerateLevelFilesBrief() {
-  level_files_brief_.resize(num_non_empty_levels_);
-  for (int level = 0; level < num_non_empty_levels_; level++) {
-    DoGenerateLevelFilesBrief(
-        &level_files_brief_[level], files_[level], &arena_);
+  // Pin blocks that we read to hold merge operands
+  if (merge_operator_) {
+    pinned_iters_mgr.StartPinning();
   }
-}
-
-void Version::PrepareApply(
-    const MutableCFOptions& mutable_cf_options,
-    bool update_stats) {
-  UpdateAccumulatedStats(update_stats);
-  storage_info_.UpdateNumNonEmptyLevels();
-  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
-  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
-  storage_info_.GenerateFileIndexer();
-  storage_info_.GenerateLevelFilesBrief();
-  storage_info_.GenerateLevel0NonOverlapping();
-  storage_info_.GenerateBottommostFiles();
-}
+  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
+
+  if (vset_ && vset_->block_cache_tracer_ &&
+      vset_->block_cache_tracer_->is_tracing_enabled()) {
+    tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
+  }
+  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
+  // use autovector in order to avoid unnecessary construction of GetContext
+  // objects, which is expensive
+  autovector<GetContext, 16> get_ctx;
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    assert(iter->s->ok() || iter->s->IsMergeInProgress());
+    get_ctx.emplace_back(
+        user_comparator(), merge_operator_, info_log_, db_statistics_,
+        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
+        iter->value, nullptr, &(iter->merge_context), true,
+        &iter->max_covering_tombstone_seq, this->env_, nullptr,
+        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
+        tracing_mget_id);
+    // MergeInProgress status, if set, has been transferred to the get_context
+    // state, so we set status to ok here. From now on, the iter status will
+    // be used for IO errors, and get_context state will be used for any
+    // key level errors
+    *(iter->s) = Status::OK();
+  }
+  int get_ctx_index = 0;
+  for (auto iter = range->begin(); iter != range->end();
+       ++iter, get_ctx_index++) {
+    iter->get_context = &(get_ctx[get_ctx_index]);
+  }
+
+  MultiGetRange file_picker_range(*range, range->begin(), range->end());
+  FilePickerMultiGet fp(
+      &file_picker_range,
+      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
+      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
+  FdWithKeyRange* f = fp.GetNextFile();
 
-bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
-  if (file_meta->init_stats_from_file ||
-      file_meta->compensated_file_size > 0) {
-    return false;
-  }
-  std::shared_ptr<const TableProperties> tp;
-  Status s = GetTableProperties(&tp, file_meta);
-  file_meta->init_stats_from_file = true;
-  if (!s.ok()) {
-    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
-                    "Unable to load table properties for file %" PRIu64
-                    " --- %s\n",
-                    file_meta->fd.GetNumber(), s.ToString().c_str());
-    return false;
-  }
-  if (tp.get() == nullptr) return false;
-  file_meta->num_entries = tp->num_entries;
-  file_meta->num_deletions = tp->num_deletions;
+  while (f != nullptr) {
+    MultiGetRange file_range = fp.CurrentFileRange();
+    bool timer_enabled =
+        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
+        get_perf_context()->per_level_perf_context_enabled;
+    StopWatchNano timer(env_, timer_enabled /* auto_start */);
+    Status s = table_cache_->MultiGet(
+        read_options, *internal_comparator(), *f->file_metadata, &file_range,
+        mutable_cf_options_.prefix_extractor.get(),
+        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                        fp.IsHitFileLastInLevel()),
+        fp.GetCurrentLevel());
+    // TODO: examine the behavior for corrupted key
+    if (timer_enabled) {
+      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
+                                fp.GetCurrentLevel());
+    }
+    if (!s.ok()) {
+      // TODO: Set status for individual keys appropriately
+      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+        *iter->s = s;
+        file_range.MarkKeyDone(iter);
+      }
+      return;
+    }
+    uint64_t batch_size = 0;
+    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+      GetContext& get_context = *iter->get_context;
+      Status* status = iter->s;
+      // The Status in the KeyContext takes precedence over GetContext state.
+      // Status may be an error if there were any IO errors in the table
+      // reader. We never expect Status to be NotFound(), as that is
+      // determined by get_context
+      assert(!status->IsNotFound());
+      if (!status->ok()) {
+        file_range.MarkKeyDone(iter);
+        continue;
+      }
+
+      if (get_context.sample()) {
+        sample_file_read_inc(f->file_metadata);
+      }
+      batch_size++;
+      // report the counters before returning
+      if (get_context.State() != GetContext::kNotFound &&
+          get_context.State() != GetContext::kMerge &&
+          db_statistics_ != nullptr) {
+        get_context.ReportCounters();
+      } else {
+        if (iter->max_covering_tombstone_seq > 0) {
+          // The remaining files we look at will only contain covered keys, so
+          // we stop here for this key
+          file_picker_range.SkipKey(iter);
+        }
+      }
+      switch (get_context.State()) {
+        case GetContext::kNotFound:
+          // Keep searching in other files
+          break;
+        case GetContext::kMerge:
+          // TODO: update per-level perfcontext user_key_return_count for kMerge
+          break;
+        case GetContext::kFound:
+          if (fp.GetHitFileLevel() == 0) {
+            RecordTick(db_statistics_, GET_HIT_L0);
+          } else if (fp.GetHitFileLevel() == 1) {
+            RecordTick(db_statistics_, GET_HIT_L1);
+          } else if (fp.GetHitFileLevel() >= 2) {
+            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+          }
+          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+                                    fp.GetHitFileLevel());
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kDeleted:
+          // Use empty error message for speed
+          *status = Status::NotFound();
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kCorrupt:
+          *status =
+              Status::Corruption("corrupted key for ", iter->lkey->user_key());
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kBlobIndex:
+          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+          *status = Status::NotSupported(
+              "Encounter unexpected blob index. Please open DB with "
+              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
+          file_range.MarkKeyDone(iter);
+          continue;
+      }
+    }
+    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
+    if (file_picker_range.empty()) {
+      break;
+    }
+    f = fp.GetNextFile();
+  }
+
+  // Process any left over keys
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    GetContext& get_context = *iter->get_context;
+    Status* status = iter->s;
+    Slice user_key = iter->lkey->user_key();
+
+    if (db_statistics_ != nullptr) {
+      get_context.ReportCounters();
+    }
+    if (GetContext::kMerge == get_context.State()) {
+      if (!merge_operator_) {
+        *status = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        range->MarkKeyDone(iter);
+        continue;
+      }
+      // merge_operands are in the saver and we have hit the beginning of the
+      // key history, so do a final merge of nullptr and the operands.
+      std::string* str_value =
+          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
+      *status = MergeHelper::TimedFullMerge(
+          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
+          str_value, info_log_, db_statistics_, env_,
+          nullptr /* result_operand */, true);
+      if (LIKELY(iter->value != nullptr)) {
+        iter->value->PinSelf();
+      }
+    } else {
+      range->MarkKeyDone(iter);
+      *status = Status::NotFound();  // Use an empty error message for speed
+    }
+  }
+}
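For keys that finish the file walk still in the kMerge state, no base value was
ever found, so the accumulated operands are folded against a missing base via
TimedFullMerge above. A minimal standalone sketch of that final fold, assuming
a simple append-style operator and hypothetical names rather than the RocksDB
MergeHelper API:

#include <string>
#include <vector>

// Hypothetical illustration: combine merge operands when no base value
// exists, using ',' concatenation as a stand-in for a real MergeOperator.
std::string FinalMergeWithoutBase(const std::vector<std::string>& operands) {
  std::string result;
  for (const auto& op : operands) {
    if (!result.empty()) {
      result += ',';
    }
    result += op;
  }
  return result;
}

In the function above the merged result is then pinned into the caller's
PinnableSlice; keys in any other state at this point are reported as NotFound.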
+
+bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
+  // Reaching the bottom level implies misses at all upper levels, so we'll
+  // skip checking the filters when we predict a hit.
+  return cfd_->ioptions()->optimize_filters_for_hits &&
+         (level > 0 || is_file_last_in_level) &&
+         level == storage_info_.num_non_empty_levels() - 1;
+}
+
+void VersionStorageInfo::GenerateLevelFilesBrief() {
+  level_files_brief_.resize(num_non_empty_levels_);
+  for (int level = 0; level < num_non_empty_levels_; level++) {
+    DoGenerateLevelFilesBrief(
+        &level_files_brief_[level], files_[level], &arena_);
+  }
+}
+
+void Version::PrepareApply(
+    const MutableCFOptions& mutable_cf_options,
+    bool update_stats) {
+  UpdateAccumulatedStats(update_stats);
+  storage_info_.UpdateNumNonEmptyLevels();
+  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
+  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
+  storage_info_.GenerateFileIndexer();
+  storage_info_.GenerateLevelFilesBrief();
+  storage_info_.GenerateLevel0NonOverlapping();
+  storage_info_.GenerateBottommostFiles();
+}
+
+bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
+  if (file_meta->init_stats_from_file ||
+      file_meta->compensated_file_size > 0) {
+    return false;
+  }
+  std::shared_ptr<const TableProperties> tp;
+  Status s = GetTableProperties(&tp, file_meta);
+  file_meta->init_stats_from_file = true;
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
+                    "Unable to load table properties for file %" PRIu64
+                    " --- %s\n",
+                    file_meta->fd.GetNumber(), s.ToString().c_str());
+    return false;
+  }
+  if (tp.get() == nullptr) return false;
+  file_meta->num_entries = tp->num_entries;
+  file_meta->num_deletions = tp->num_deletions;
   file_meta->raw_value_size = tp->raw_value_size;
   file_meta->raw_key_size = tp->raw_key_size;
 
@@ -1372,6 +2130,9 @@ bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
 }
 
 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
+  TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
+                           nullptr);
+
   assert(file_meta->init_stats_from_file);
   accumulated_file_size_ += file_meta->fd.GetFileSize();
   accumulated_raw_key_size_ += file_meta->raw_key_size;
@@ -1591,13 +2352,11 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
   auto status = ioptions.env->GetCurrentTime(&_current_time);
   if (status.ok()) {
     const uint64_t current_time = static_cast<uint64_t>(_current_time);
-    for (auto f : files) {
-      if (!f->being_compacted && f->fd.table_reader != nullptr &&
-          f->fd.table_reader->GetTableProperties() != nullptr) {
-        auto creation_time =
-            f->fd.table_reader->GetTableProperties()->creation_time;
-        if (creation_time > 0 &&
-            creation_time < (current_time - mutable_cf_options.ttl)) {
+    for (FileMetaData* f : files) {
+      if (!f->being_compacted) {
+        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
+        if (oldest_ancester_time != 0 &&
+            oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
           ttl_expired_files_count++;
         }
       }
@@ -1705,6 +2464,10 @@ void VersionStorageInfo::ComputeCompactionScore(
   if (mutable_cf_options.ttl > 0) {
     ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
   }
+  if (mutable_cf_options.periodic_compaction_seconds > 0) {
+    ComputeFilesMarkedForPeriodicCompaction(
+        immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
+  }
   EstimateCompactionBytesNeeded(mutable_cf_options);
 }
 
@@ -1745,12 +2508,11 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
   const uint64_t current_time = static_cast<uint64_t>(_current_time);
 
   for (int level = 0; level < num_levels() - 1; level++) {
-    for (auto f : files_[level]) {
-      if (!f->being_compacted && f->fd.table_reader != nullptr &&
-          f->fd.table_reader->GetTableProperties() != nullptr) {
-        auto creation_time =
-            f->fd.table_reader->GetTableProperties()->creation_time;
-        if (creation_time > 0 && creation_time < (current_time - ttl)) {
+    for (FileMetaData* f : files_[level]) {
+      if (!f->being_compacted) {
+        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
+        if (oldest_ancester_time > 0 &&
+            oldest_ancester_time < (current_time - ttl)) {
           expired_ttl_files_.emplace_back(level, f);
         }
       }
@@ -1758,6 +2520,63 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
   }
 }
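Both TTL checks above compare a file's oldest ancestor time against
current_time - ttl using unsigned arithmetic, so an unknown (zero) timestamp
must be excluded explicitly. A minimal sketch of the same predicate, with
hypothetical parameter names and an extra underflow guard added for clarity:

#include <cstdint>

// Hypothetical TTL test: a file is expired only when its oldest ancestor
// time is known (non-zero) and older than now - ttl.
bool IsTtlExpired(uint64_t oldest_ancestor_time, uint64_t now, uint64_t ttl) {
  if (oldest_ancestor_time == 0 || ttl > now) {
    return false;  // unknown timestamp, or ttl exceeding the current clock
  }
  return oldest_ancestor_time < now - ttl;
}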
 
+void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
+    const ImmutableCFOptions& ioptions,
+    const uint64_t periodic_compaction_seconds) {
+  assert(periodic_compaction_seconds > 0);
+
+  files_marked_for_periodic_compaction_.clear();
+
+  int64_t temp_current_time;
+  auto status = ioptions.env->GetCurrentTime(&temp_current_time);
+  if (!status.ok()) {
+    return;
+  }
+  const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
+
+  // If periodic_compaction_seconds is larger than current time, periodic
+  // compaction can't possibly be triggered.
+  if (periodic_compaction_seconds > current_time) {
+    return;
+  }
+
+  const uint64_t allowed_time_limit =
+      current_time - periodic_compaction_seconds;
+
+  for (int level = 0; level < num_levels(); level++) {
+    for (auto f : files_[level]) {
+      if (!f->being_compacted) {
+        // Compute a file's modification time in the following order:
+        // 1. Use the file_creation_time table property if it is > 0.
+        // 2. Use the creation_time table property if it is > 0.
+        // 3. Use the file's mtime metadata if the above two properties are 0.
+        // Don't consider the file at all if the modification time cannot be
+        // correctly determined based on the above conditions.
+        uint64_t file_modification_time = f->TryGetFileCreationTime();
+        if (file_modification_time == kUnknownFileCreationTime) {
+          file_modification_time = f->TryGetOldestAncesterTime();
+        }
+        if (file_modification_time == kUnknownOldestAncesterTime) {
+          auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
+                                         f->fd.GetPathId());
+          status = ioptions.env->GetFileModificationTime(
+              file_path, &file_modification_time);
+          if (!status.ok()) {
+            ROCKS_LOG_WARN(ioptions.info_log,
+                           "Can't get file modification time: %s: %s",
+                           file_path.c_str(), status.ToString().c_str());
+            continue;
+          }
+        }
+        if (file_modification_time > 0 &&
+            file_modification_time < allowed_time_limit) {
+          files_marked_for_periodic_compaction_.emplace_back(level, f);
+        }
+      }
+    }
+  }
+}
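The periodic-compaction marking above resolves a file's age through a chain of
fallbacks before comparing it to allowed_time_limit. A compact sketch of that
resolution order, using plain parameters in place of the FileMetaData
accessors and the Env::GetFileModificationTime() call:

#include <cstdint>

constexpr uint64_t kUnknownTime = 0;  // hypothetical stand-in for the
                                      // kUnknown* constants used above

// Resolve a file's modification time: file creation time first, then the
// oldest ancestor time, then the filesystem mtime supplied by the caller.
uint64_t ResolveFileModificationTime(uint64_t file_creation_time,
                                     uint64_t oldest_ancestor_time,
                                     uint64_t fs_mtime) {
  if (file_creation_time != kUnknownTime) {
    return file_creation_time;
  }
  if (oldest_ancestor_time != kUnknownTime) {
    return oldest_ancestor_time;
  }
  return fs_mtime;  // may still be zero; the caller then skips the file
}

A file is only marked for periodic compaction when the resolved time is
non-zero and below allowed_time_limit.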
+
 namespace {
 
 // used to sort files by size
@@ -2120,11 +2939,12 @@ void VersionStorageInfo::GetOverlappingInputs(
       FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
       const Slice file_start = ExtractUserKey(f->smallest_key);
       const Slice file_limit = ExtractUserKey(f->largest_key);
-      if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
+      if (begin != nullptr &&
+          user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
         // "f" is completely before specified range; skip it
         iter++;
       } else if (end != nullptr &&
-                 user_cmp->Compare(file_start, user_end) > 0) {
+                 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
         // "f" is completely after specified range; skip it
         iter++;
       } else {
@@ -2139,10 +2959,11 @@ void VersionStorageInfo::GetOverlappingInputs(
         iter = index.erase(iter);
         if (expand_range) {
           if (begin != nullptr &&
-              user_cmp->Compare(file_start, user_begin) < 0) {
+              user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
             user_begin = file_start;
           }
-          if (end != nullptr && user_cmp->Compare(file_limit, user_end) > 0) {
+          if (end != nullptr &&
+              user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
             user_end = file_limit;
           }
         }
@@ -2175,14 +2996,6 @@ void VersionStorageInfo::GetCleanInputsWithinInterval(
     return;
   }
 
-  const auto& level_files = level_files_brief_[level];
-  if (begin == nullptr) {
-    begin = &level_files.files[0].file_metadata->smallest;
-  }
-  if (end == nullptr) {
-    end = &level_files.files[level_files.num_files - 1].file_metadata->largest;
-  }
-
   GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
                                         hint_index, file_index,
                                         true /* within_interval */);
@@ -2200,67 +3013,94 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
     bool within_interval, InternalKey** next_smallest) const {
   assert(level > 0);
-  int min = 0;
-  int mid = 0;
-  int max = static_cast<int>(files_[level].size()) - 1;
-  bool foundOverlap = false;
+
   auto user_cmp = user_comparator_;
+  const FdWithKeyRange* files = level_files_brief_[level].files;
+  const int num_files = static_cast<int>(level_files_brief_[level].num_files);
 
-  // if the caller already knows the index of a file that has overlap,
-  // then we can skip the binary search.
-  if (hint_index != -1) {
-    mid = hint_index;
-    foundOverlap = true;
-  }
-
-  while (!foundOverlap && min <= max) {
-    mid = (min + max)/2;
-    FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
-    auto& smallest = f->file_metadata->smallest;
-    auto& largest = f->file_metadata->largest;
-    if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
-        (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
-      min = mid + 1;
-    } else if ((!within_interval &&
-                sstableKeyCompare(user_cmp, smallest, end) > 0) ||
-               (within_interval &&
-                sstableKeyCompare(user_cmp, largest, end) > 0)) {
-      max = mid - 1;
-    } else {
-      foundOverlap = true;
-      break;
+  // Use binary search to find the lower bound and the upper bound of the
+  // matching file index range.
+  int start_index = 0;
+  int end_index = num_files;
+
+  if (begin != nullptr) {
+    // When within_interval is true, compare begin against each file's
+    // smallest key so that std::lower_bound skips files that start before
+    // begin; otherwise compare against the largest key so that any file
+    // overlapping begin is included.
+    auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
+                                             const InternalKey* k) {
+      auto& file_key = within_interval ? f.file_metadata->smallest
+                                       : f.file_metadata->largest;
+      return sstableKeyCompare(user_cmp, file_key, *k) < 0;
+    };
+
+    start_index = static_cast<int>(
+        std::lower_bound(files,
+                         files + (hint_index == -1 ? num_files : hint_index),
+                         begin, cmp) -
+        files);
+
+    if (start_index > 0 && within_interval) {
+      bool is_overlapping = true;
+      while (is_overlapping && start_index < num_files) {
+        auto& pre_limit = files[start_index - 1].file_metadata->largest;
+        auto& cur_start = files[start_index].file_metadata->smallest;
+        is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
+        start_index += is_overlapping;
+      }
     }
   }
 
+  if (end != nullptr) {
+    // When within_interval is true, compare end against each file's largest
+    // key so that std::upper_bound excludes files that extend past end;
+    // otherwise compare against the smallest key so that any file
+    // overlapping end is included.
+    auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
+                                             const FdWithKeyRange& f) {
+      auto& file_key = within_interval ? f.file_metadata->largest
+                                       : f.file_metadata->smallest;
+      return sstableKeyCompare(user_cmp, *k, file_key) < 0;
+    };
+
+    end_index = static_cast<int>(
+        std::upper_bound(files + start_index, files + num_files, end, cmp) -
+        files);
+
+    if (end_index < num_files && within_interval) {
+      bool is_overlapping = true;
+      while (is_overlapping && end_index > start_index) {
+        auto& next_start = files[end_index].file_metadata->smallest;
+        auto& cur_limit = files[end_index - 1].file_metadata->largest;
+        is_overlapping =
+            sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
+        end_index -= is_overlapping;
+      }
+    }
+  }
+
+  assert(start_index <= end_index);
+
   // If there were no overlapping files, return immediately.
-  if (!foundOverlap) {
+  if (start_index == end_index) {
     if (next_smallest) {
       *next_smallest = nullptr;
     }
     return;
   }
+
+  assert(start_index < end_index);
+
   // returns the index where an overlap is found
   if (file_index) {
-    *file_index = mid;
+    *file_index = start_index;
   }
 
-  int start_index, end_index;
-  if (within_interval) {
-    ExtendFileRangeWithinInterval(level, begin, end, mid,
-                                  &start_index, &end_index);
-  } else {
-    ExtendFileRangeOverlappingInterval(level, begin, end, mid,
-                                       &start_index, &end_index);
-    assert(end_index >= start_index);
-  }
   // insert overlapping files into vector
-  for (int i = start_index; i <= end_index; i++) {
+  for (int i = start_index; i < end_index; i++) {
     inputs->push_back(files_[level][i]);
   }
 
   if (next_smallest != nullptr) {
     // Provide the next key outside the range covered by inputs
-    if (++end_index < static_cast<int>(files_[level].size())) {
+    if (end_index < static_cast<int>(files_[level].size())) {
       **next_smallest = files_[level][end_index]->smallest;
     } else {
       *next_smallest = nullptr;
@@ -2268,136 +3108,6 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
   }
 }
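The rewritten lookup above replaces the hand-rolled binary search and the
Extend* helpers with std::lower_bound and std::upper_bound over the sorted
level files. A standalone sketch of the same technique on plain string
intervals, with a hypothetical Range type in place of FdWithKeyRange; this
shows the overlapping-interval case, while the within-interval case swaps
which endpoint each bound compares, as in the lambdas above:

#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sorted, non-overlapping file ranges: {smallest, largest}.
using Range = std::pair<std::string, std::string>;

// Return the [start, end) index range of files overlapping [begin, end_key].
std::pair<size_t, size_t> OverlappingFiles(const std::vector<Range>& files,
                                           const std::string& begin,
                                           const std::string& end_key) {
  // First file whose largest key is >= begin: files entirely before the
  // query range are skipped.
  auto lo = std::lower_bound(
      files.begin(), files.end(), begin,
      [](const Range& f, const std::string& k) { return f.second < k; });
  // First file whose smallest key is > end_key: files entirely after the
  // query range are excluded.
  auto hi = std::upper_bound(
      lo, files.end(), end_key,
      [](const std::string& k, const Range& f) { return k < f.first; });
  return {static_cast<size_t>(lo - files.begin()),
          static_cast<size_t>(hi - files.begin())};
}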
 
-// Store in *start_index and *end_index the range of all files in
-// "level" that overlap [begin,end]
-// The mid_index specifies the index of at least one file that
-// overlaps the specified range. From that file, iterate backward
-// and forward to find all overlapping files.
-// Use FileLevel in searching, make it faster
-void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
-    int level, const InternalKey* begin, const InternalKey* end,
-    unsigned int mid_index, int* start_index, int* end_index) const {
-  auto user_cmp = user_comparator_;
-  const FdWithKeyRange* files = level_files_brief_[level].files;
-#ifndef NDEBUG
-  {
-    // assert that the file at mid_index overlaps with the range
-    assert(mid_index < level_files_brief_[level].num_files);
-    const FdWithKeyRange* f = &files[mid_index];
-    auto& smallest = f->file_metadata->smallest;
-    auto& largest = f->file_metadata->largest;
-    if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
-      assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
-    } else {
-      // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
-      //         begin ? begin->DebugString().c_str() : "(null)",
-      //         end ? end->DebugString().c_str() : "(null)",
-      //         smallest->DebugString().c_str(),
-      //         largest->DebugString().c_str(),
-      //         sstableKeyCompare(user_cmp, smallest, begin),
-      //         sstableKeyCompare(user_cmp, largest, begin));
-      assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
-    }
-  }
-#endif
-  *start_index = mid_index + 1;
-  *end_index = mid_index;
-  int count __attribute__((__unused__));
-  count = 0;
-
-  // check backwards from 'mid' to lower indices
-  for (int i = mid_index; i >= 0 ; i--) {
-    const FdWithKeyRange* f = &files[i];
-    auto& largest = f->file_metadata->largest;
-    if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
-      *start_index = i;
-      assert((count++, true));
-    } else {
-      break;
-    }
-  }
-  // check forward from 'mid+1' to higher indices
-  for (unsigned int i = mid_index+1;
-       i < level_files_brief_[level].num_files; i++) {
-    const FdWithKeyRange* f = &files[i];
-    auto& smallest = f->file_metadata->smallest;
-    if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
-      assert((count++, true));
-      *end_index = i;
-    } else {
-      break;
-    }
-  }
-  assert(count == *end_index - *start_index + 1);
-}
-
-// Store in *start_index and *end_index the clean range of all files in
-// "level" within [begin,end]
-// The mid_index specifies the index of at least one file within
-// the specified range. From that file, iterate backward
-// and forward to find all overlapping files and then "shrink" to
-// the clean range required.
-// Use FileLevel in searching, make it faster
-void VersionStorageInfo::ExtendFileRangeWithinInterval(
-    int level, const InternalKey* begin, const InternalKey* end,
-    unsigned int mid_index, int* start_index, int* end_index) const {
-  assert(level != 0);
-  auto* user_cmp = user_comparator_;
-  const FdWithKeyRange* files = level_files_brief_[level].files;
-#ifndef NDEBUG
-  {
-    // assert that the file at mid_index is within the range
-    assert(mid_index < level_files_brief_[level].num_files);
-    const FdWithKeyRange* f = &files[mid_index];
-    auto& smallest = f->file_metadata->smallest;
-    auto& largest = f->file_metadata->largest;
-    assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
-           sstableKeyCompare(user_cmp, largest, end) <= 0);
-  }
-#endif
-  ExtendFileRangeOverlappingInterval(level, begin, end, mid_index,
-                                     start_index, end_index);
-  int left = *start_index;
-  int right = *end_index;
-  // shrink from left to right
-  while (left <= right) {
-    auto& smallest = files[left].file_metadata->smallest;
-    if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
-      left++;
-      continue;
-    }
-    if (left > 0) {  // If not first file
-      auto& largest = files[left - 1].file_metadata->largest;
-      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
-        left++;
-        continue;
-      }
-    }
-    break;
-  }
-  // shrink from right to left
-  while (left <= right) {
-    auto& largest = files[right].file_metadata->largest;
-    if (sstableKeyCompare(user_cmp, largest, end) > 0) {
-      right--;
-      continue;
-    }
-    if (right < static_cast<int>(level_files_brief_[level].num_files) -
-                    1) {  // If not the last file
-      auto& smallest = files[right + 1].file_metadata->smallest;
-      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
-        // The last user key in range overlaps with the next file's first key
-        right--;
-        continue;
-      }
-    }
-    break;
-  }
-
-  *start_index = left;
-  *end_index = right;
-}
-
 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
   assert(level >= 0);
   assert(level < num_levels());
@@ -2576,7 +3286,7 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
         // base_bytes_min. We set it be base_bytes_min.
         base_level_size = base_bytes_min + 1U;
         base_level_ = first_non_empty_level;
-        ROCKS_LOG_WARN(ioptions.info_log,
+        ROCKS_LOG_INFO(ioptions.info_log,
                        "More existing levels in DB than needed. "
                        "max_bytes_for_level_multiplier may not be guaranteed.");
       } else {
@@ -2713,11 +3423,11 @@ std::string Version::DebugString(bool hex, bool print_stats) const {
   for (int level = 0; level < storage_info_.num_levels_; level++) {
     // E.g.,
     //   --- level 1 ---
-    //   17:123['a' .. 'd']
-    //   20:43['e' .. 'g']
+    //   17:123[1 .. 124]['a' .. 'd']
+    //   20:43[124 .. 128]['e' .. 'g']
     //
     // if print_stats=true:
-    //   17:123['a' .. 'd'](4096)
+    //   17:123[1 .. 124]['a' .. 'd'](4096)
     r.append("--- level ");
     AppendNumberTo(&r, level);
     r.append(" --- version# ");
@@ -2730,10 +3440,19 @@ std::string Version::DebugString(bool hex, bool print_stats) const {
       r.push_back(':');
       AppendNumberTo(&r, files[i]->fd.GetFileSize());
       r.append("[");
+      AppendNumberTo(&r, files[i]->fd.smallest_seqno);
+      r.append(" .. ");
+      AppendNumberTo(&r, files[i]->fd.largest_seqno);
+      r.append("]");
+      r.append("[");
       r.append(files[i]->smallest.DebugString(hex));
       r.append(" .. ");
       r.append(files[i]->largest.DebugString(hex));
       r.append("]");
+      if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
+        r.append(" blob_file:");
+        AppendNumberTo(&r, files[i]->oldest_blob_file_number);
+      }
       if (print_stats) {
         r.append("(");
         r.append(ToString(
@@ -2765,15 +3484,62 @@ struct VersionSet::ManifestWriter {
         edit_list(e) {}
 };
 
+Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
+  assert(edit);
+  if (edit->is_in_atomic_group_) {
+    TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
+    if (replay_buffer_.empty()) {
+      replay_buffer_.resize(edit->remaining_entries_ + 1);
+      TEST_SYNC_POINT_CALLBACK(
+          "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
+    }
+    read_edits_in_atomic_group_++;
+    if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
+        static_cast<uint32_t>(replay_buffer_.size())) {
+      TEST_SYNC_POINT_CALLBACK(
+          "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
+      return Status::Corruption("corrupted atomic group");
+    }
+    replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
+    if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
+      TEST_SYNC_POINT_CALLBACK(
+          "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
+      return Status::OK();
+    }
+    return Status::OK();
+  }
+
+  // A normal edit.
+  if (!replay_buffer().empty()) {
+    TEST_SYNC_POINT_CALLBACK(
+        "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
+    return Status::Corruption("corrupted atomic group");
+  }
+  return Status::OK();
+}
+
+bool AtomicGroupReadBuffer::IsFull() const {
+  return read_edits_in_atomic_group_ == replay_buffer_.size();
+}
+
+bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
+
+void AtomicGroupReadBuffer::Clear() {
+  read_edits_in_atomic_group_ = 0;
+  replay_buffer_.clear();
+}
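AtomicGroupReadBuffer sizes its replay buffer from the first edit's
remaining_entries_ count and then checks that every subsequent edit is
consistent with that size. A small sketch of the same bookkeeping, with a
hypothetical Edit struct standing in for VersionEdit:

#include <cstdint>
#include <vector>

// Hypothetical stand-in for a manifest edit that is part of an atomic group.
struct Edit {
  bool in_atomic_group = false;
  uint32_t remaining_entries = 0;  // edits still to come after this one
};

class GroupBuffer {
 public:
  // Returns false on a corrupted group (inconsistent sizes or a normal edit
  // arriving in the middle of a group).
  bool Add(const Edit& e) {
    if (!e.in_atomic_group) {
      return buffer_.empty();
    }
    if (buffer_.empty()) {
      buffer_.resize(e.remaining_entries + 1);  // first edit fixes the size
    }
    ++read_;
    if (read_ + e.remaining_entries != buffer_.size()) {
      return false;  // the sizes reported by the edits disagree
    }
    buffer_[read_ - 1] = e;
    return true;
  }
  bool Full() const { return read_ == buffer_.size(); }

 private:
  std::vector<Edit> buffer_;
  size_t read_ = 0;
};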
+
 VersionSet::VersionSet(const std::string& dbname,
                        const ImmutableDBOptions* _db_options,
-                       const EnvOptions& storage_options, Cache* table_cache,
+                       const FileOptions& storage_options, Cache* table_cache,
                        WriteBufferManager* write_buffer_manager,
-                       WriteController* write_controller)
-    : column_family_set_(
-          new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
-                              write_buffer_manager, write_controller)),
+                       WriteController* write_controller,
+                       BlockCacheTracer* const block_cache_tracer)
+    : column_family_set_(new ColumnFamilySet(
+          dbname, _db_options, storage_options, table_cache,
+          write_buffer_manager, write_controller, block_cache_tracer)),
       env_(_db_options->env),
+      fs_(_db_options->fs.get()),
       dbname_(dbname),
       db_options_(_db_options),
       next_file_number_(2),
@@ -2786,18 +3552,13 @@ VersionSet::VersionSet(const std::string& dbname,
       prev_log_number_(0),
       current_version_number_(0),
       manifest_file_size_(0),
-      env_options_(storage_options) {}
-
-void CloseTables(void* ptr, size_t) {
-  TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
-  table_reader->Close();
-}
+      file_options_(storage_options),
+      block_cache_tracer_(block_cache_tracer) {}
 
 VersionSet::~VersionSet() {
   // we need to delete column_family_set_ because its destructor depends on
   // VersionSet
   Cache* table_cache = column_family_set_->get_table_cache();
-  table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
   column_family_set_.reset();
   for (auto& file : obsolete_files_) {
     if (file.metadata->table_reader_handle) {
@@ -2917,7 +3678,7 @@ Status VersionSet::ProcessManifestWrites(
         }
       }
       if (version == nullptr) {
-        version = new Version(last_writer->cfd, this, env_options_,
+        version = new Version(last_writer->cfd, this, file_options_,
                               last_writer->mutable_cf_options,
                               current_version_number_++);
         versions.push_back(version);
@@ -2937,7 +3698,14 @@ Status VersionSet::ProcessManifestWrites(
         } else if (group_start != std::numeric_limits<size_t>::max()) {
           group_start = std::numeric_limits<size_t>::max();
         }
-        LogAndApplyHelper(last_writer->cfd, builder, e, mu);
+        Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
+        if (!s.ok()) {
+          // free up the allocated memory
+          for (auto v : versions) {
+            delete v;
+          }
+          return s;
+        }
         batch_edits.push_back(e);
       }
     }
@@ -2945,7 +3713,14 @@ Status VersionSet::ProcessManifestWrites(
       assert(!builder_guards.empty() &&
              builder_guards.size() == versions.size());
       auto* builder = builder_guards[i]->version_builder();
-      builder->SaveTo(versions[i]->storage_info());
+      Status s = builder->SaveTo(versions[i]->storage_info());
+      if (!s.ok()) {
+        // free up the allocated memory
+        for (auto v : versions) {
+          delete v;
+        }
+        return s;
+      }
     }
   }
 
@@ -2992,24 +3767,33 @@ Status VersionSet::ProcessManifestWrites(
   if (!descriptor_log_ ||
       manifest_file_size_ > db_options_->max_manifest_file_size) {
     TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
-    pending_manifest_file_number_ = NewFileNumber();
-    batch_edits.back()->SetNextFile(next_file_number_.load());
     new_descriptor_log = true;
   } else {
     pending_manifest_file_number_ = manifest_file_number_;
   }
 
+  // Local cached copy of state variable(s). WriteCurrentStateToManifest()
+  // reads its content after releasing db mutex to avoid race with
+  // SwitchMemtable().
+  std::unordered_map<uint32_t, MutableCFState> curr_state;
   if (new_descriptor_log) {
+    pending_manifest_file_number_ = NewFileNumber();
+    batch_edits.back()->SetNextFile(next_file_number_.load());
+
     // if we are writing out new snapshot make sure to persist max column
     // family.
     if (column_family_set_->GetMaxColumnFamily() > 0) {
       first_writer.edit_list.front()->SetMaxColumnFamily(
           column_family_set_->GetMaxColumnFamily());
     }
+    for (const auto* cfd : *column_family_set_) {
+      assert(curr_state.find(cfd->GetID()) == curr_state.end());
+      curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
+    }
   }
 
   {
-    EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
+    FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
     mu->Unlock();
 
     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
@@ -3020,46 +3804,52 @@ Status VersionSet::ProcessManifestWrites(
         assert(!mutable_cf_options_ptrs.empty() &&
                builder_guards.size() == versions.size());
         ColumnFamilyData* cfd = versions[i]->cfd_;
-        builder_guards[i]->version_builder()->LoadTableHandlers(
+        s = builder_guards[i]->version_builder()->LoadTableHandlers(
             cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
             true /* prefetch_index_and_filter_in_cache */,
             false /* is_initial_load */,
             mutable_cf_options_ptrs[i]->prefix_extractor.get());
+        if (!s.ok()) {
+          if (db_options_->paranoid_checks) {
+            break;
+          }
+          s = Status::OK();
+        }
       }
     }
 
-    // This is fine because everything inside of this block is serialized --
-    // only one thread can be here at the same time
-    if (new_descriptor_log) {
+    if (s.ok() && new_descriptor_log) {
+      // This is fine because everything inside of this block is serialized --
+      // only one thread can be here at the same time
       // create new manifest file
       ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                      pending_manifest_file_number_);
       std::string descriptor_fname =
           DescriptorFileName(dbname_, pending_manifest_file_number_);
-      std::unique_ptr<WritableFile> descriptor_file;
-      s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
-                          opt_env_opts);
+      std::unique_ptr<FSWritableFile> descriptor_file;
+      s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
+                          opt_file_opts);
       if (s.ok()) {
         descriptor_file->SetPreallocationBlockSize(
             db_options_->manifest_preallocation_size);
 
         std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-            std::move(descriptor_file), descriptor_fname, opt_env_opts, env_,
+            std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
             nullptr, db_options_->listeners));
         descriptor_log_.reset(
             new log::Writer(std::move(file_writer), 0, false));
-        s = WriteSnapshot(descriptor_log_.get());
+        s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
       }
     }
 
-    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
-      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
-        versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
+    if (s.ok()) {
+      if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
+        for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+          versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
+        }
       }
-    }
 
-    // Write new records to MANIFEST log
-    if (s.ok()) {
+      // Write new records to MANIFEST log
 #ifndef NDEBUG
       size_t idx = 0;
 #endif
@@ -3074,8 +3864,9 @@ Status VersionSet::ProcessManifestWrites(
                          rocksdb_kill_odds * REDUCE_ODDS2);
 #ifndef NDEBUG
         if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
-          TEST_SYNC_POINT(
-              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0");
+          TEST_SYNC_POINT_CALLBACK(
+              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
+              nullptr);
           TEST_SYNC_POINT(
               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
         }
@@ -3135,9 +3926,7 @@ Status VersionSet::ProcessManifestWrites(
     } else if (first_writer.edit_list.front()->is_column_family_drop_) {
       assert(batch_edits.size() == 1);
       first_writer.cfd->SetDropped();
-      if (first_writer.cfd->Unref()) {
-        delete first_writer.cfd;
-      }
+      first_writer.cfd->UnrefAndTryDelete();
     } else {
       // Each version in versions corresponds to a column family.
       // For each column family, update its log number indicating that logs
@@ -3189,12 +3978,15 @@ Status VersionSet::ProcessManifestWrites(
     for (auto v : versions) {
       delete v;
     }
+    // If manifest append failed for whatever reason, the file could be
+    // corrupted. So we need to force the next version update to start a
+    // new manifest file.
+    descriptor_log_.reset();
     if (new_descriptor_log) {
       ROCKS_LOG_INFO(db_options_->info_log,
                      "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                      "\n",
                      manifest_file_number_, pending_manifest_file_number_);
-      descriptor_log_.reset();
       env_->DeleteFile(
           DescriptorFileName(dbname_, pending_manifest_file_number_));
     }
@@ -3294,8 +4086,6 @@ Status VersionSet::LogAndApply(
     }
   }
   if (0 == num_undropped_cfds) {
-    // TODO (yanqin) maybe use a different status code to denote column family
-    // drop other than OK and ShutdownInProgress
     for (int i = 0; i != num_cfds; ++i) {
       manifest_writers_.pop_front();
     }
@@ -3303,7 +4093,7 @@ Status VersionSet::LogAndApply(
     if (!manifest_writers_.empty()) {
       manifest_writers_.front()->cv.Signal();
     }
-    return Status::ShutdownInProgress();
+    return Status::ColumnFamilyDropped();
   }
 
   return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
@@ -3327,9 +4117,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
   }
 }
 
-void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
-                                   VersionBuilder* builder, VersionEdit* edit,
-                                   InstrumentedMutex* mu) {
+Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
+                                     VersionBuilder* builder, VersionEdit* edit,
+                                     InstrumentedMutex* mu) {
 #ifdef NDEBUG
   (void)cfd;
 #endif
@@ -3353,7 +4143,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                       : last_sequence_);
 
-  builder->Apply(edit);
+  Status s = builder->Apply(edit);
+
+  return s;
 }
 
 Status VersionSet::ApplyOneVersionEditToBuilder(
@@ -3362,10 +4154,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
     std::unordered_map<int, std::string>& column_families_not_found,
     std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
         builders,
-    bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
-    uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
-    bool* have_last_sequence, SequenceNumber* last_sequence,
-    uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
+    VersionEditParams* version_edit_params) {
   // Not found means that user didn't supply that column
   // family option AND we encountered column family add
   // record. Once we encounter column family drop record,
@@ -3389,11 +4178,23 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
           edit.column_family_name_);
     }
     auto cf_options = name_to_options.find(edit.column_family_name_);
-    if (cf_options == name_to_options.end()) {
+    // implicitly add the persistent_stats column family without requiring
+    // the user to specify it
+    bool is_persistent_stats_column_family =
+        edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
+    if (cf_options == name_to_options.end() &&
+        !is_persistent_stats_column_family) {
       column_families_not_found.insert(
           {edit.column_family_, edit.column_family_name_});
     } else {
-      cfd = CreateColumnFamily(cf_options->second, &edit);
+      // recover persistent_stats CF from a DB that already contains it
+      if (is_persistent_stats_column_family) {
+        ColumnFamilyOptions cfo;
+        OptimizeForPersistentStats(&cfo);
+        cfd = CreateColumnFamily(cfo, &edit);
+      } else {
+        cfd = CreateColumnFamily(cf_options->second, &edit);
+      }
       cfd->set_initialized();
       builders.insert(std::make_pair(
           edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
@@ -3406,8 +4207,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
       builders.erase(builder);
       cfd = column_family_set_->GetColumnFamily(edit.column_family_);
       assert(cfd != nullptr);
-      if (cfd->Unref()) {
-        delete cfd;
+      if (cfd->UnrefAndTryDelete()) {
         cfd = nullptr;
       } else {
         // who else can have reference to cfd!?
@@ -3434,71 +4234,74 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
     // to builder
     auto builder = builders.find(edit.column_family_);
     assert(builder != builders.end());
-    builder->second->version_builder()->Apply(&edit);
+    Status s = builder->second->version_builder()->Apply(&edit);
+    if (!s.ok()) {
+      return s;
+    }
   }
-  return ExtractInfoFromVersionEdit(
-      cfd, edit, have_log_number, log_number, have_prev_log_number,
-      previous_log_number, have_next_file, next_file, have_last_sequence,
-      last_sequence, min_log_number_to_keep, max_column_family);
+  return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
 }
 
 Status VersionSet::ExtractInfoFromVersionEdit(
-    ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
-    uint64_t* log_number, bool* have_prev_log_number,
-    uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
-    bool* have_last_sequence, SequenceNumber* last_sequence,
-    uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
+    ColumnFamilyData* cfd, const VersionEdit& from_edit,
+    VersionEditParams* version_edit_params) {
   if (cfd != nullptr) {
-    if (edit.has_log_number_) {
-      if (cfd->GetLogNumber() > edit.log_number_) {
+    if (from_edit.has_db_id_) {
+      version_edit_params->SetDBId(from_edit.db_id_);
+    }
+    if (from_edit.has_log_number_) {
+      if (cfd->GetLogNumber() > from_edit.log_number_) {
         ROCKS_LOG_WARN(
             db_options_->info_log,
             "MANIFEST corruption detected, but ignored - Log numbers in "
             "records NOT monotonically increasing");
       } else {
-        cfd->SetLogNumber(edit.log_number_);
-        *have_log_number = true;
-        *log_number = edit.log_number_;
+        cfd->SetLogNumber(from_edit.log_number_);
+        version_edit_params->SetLogNumber(from_edit.log_number_);
       }
     }
-    if (edit.has_comparator_ &&
-        edit.comparator_ != cfd->user_comparator()->Name()) {
+    if (from_edit.has_comparator_ &&
+        from_edit.comparator_ != cfd->user_comparator()->Name()) {
       return Status::InvalidArgument(
           cfd->user_comparator()->Name(),
-          "does not match existing comparator " + edit.comparator_);
+          "does not match existing comparator " + from_edit.comparator_);
     }
   }
 
-  if (edit.has_prev_log_number_) {
-    *previous_log_number = edit.prev_log_number_;
-    *have_prev_log_number = true;
+  if (from_edit.has_prev_log_number_) {
+    version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
   }
 
-  if (edit.has_next_file_number_) {
-    *next_file = edit.next_file_number_;
-    *have_next_file = true;
+  if (from_edit.has_next_file_number_) {
+    version_edit_params->SetNextFile(from_edit.next_file_number_);
   }
 
-  if (edit.has_max_column_family_) {
-    *max_column_family = edit.max_column_family_;
+  if (from_edit.has_max_column_family_) {
+    version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
   }
 
-  if (edit.has_min_log_number_to_keep_) {
-    *min_log_number_to_keep =
-        std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
+  if (from_edit.has_min_log_number_to_keep_) {
+    version_edit_params->min_log_number_to_keep_ =
+        std::max(version_edit_params->min_log_number_to_keep_,
+                 from_edit.min_log_number_to_keep_);
   }
 
-  if (edit.has_last_sequence_) {
-    *last_sequence = edit.last_sequence_;
-    *have_last_sequence = true;
+  if (from_edit.has_last_sequence_) {
+    version_edit_params->SetLastSequence(from_edit.last_sequence_);
   }
   return Status::OK();
 }
 
-Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) {
+Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
+                                          FileSystem* fs,
+                                          std::string* manifest_path,
+                                          uint64_t* manifest_file_number) {
+  assert(fs != nullptr);
   assert(manifest_path != nullptr);
+  assert(manifest_file_number != nullptr);
+
   std::string fname;
-  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname);
+  Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
   if (!s.ok()) {
     return s;
   }
@@ -3508,24 +4311,90 @@ Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) {
   // remove the trailing '\n'
   fname.resize(fname.size() - 1);
   FileType type;
-  bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type);
+  bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
   if (!parse_ok || type != kDescriptorFile) {
     return Status::Corruption("CURRENT file corrupted");
   }
-  *manifest_path = dbname_;
-  if (dbname_.back() != '/') {
+  *manifest_path = dbname;
+  if (dbname.back() != '/') {
     manifest_path->push_back('/');
   }
   *manifest_path += fname;
   return Status::OK();
 }
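GetCurrentManifestPath above reads the one-line CURRENT file, strips the
trailing newline, and joins the manifest file name onto the DB path. A minimal
standalone sketch of that parsing step, assuming the file contents are already
in memory:

#include <string>

// Hypothetical helper: given the raw contents of a CURRENT file and the DB
// directory, produce the manifest path, or return false if the contents are
// malformed (they must be non-empty and newline-terminated).
bool BuildManifestPath(const std::string& dbname, std::string current,
                       std::string* manifest_path) {
  if (current.empty() || current.back() != '\n') {
    return false;
  }
  current.pop_back();  // drop the trailing '\n'
  *manifest_path = dbname;
  if (manifest_path->empty() || manifest_path->back() != '/') {
    manifest_path->push_back('/');
  }
  *manifest_path += current;
  return true;
}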
 
+Status VersionSet::ReadAndRecover(
+    log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
+    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
+    std::unordered_map<int, std::string>& column_families_not_found,
+    std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
+        builders,
+    VersionEditParams* version_edit_params, std::string* db_id) {
+  assert(reader != nullptr);
+  assert(read_buffer != nullptr);
+  Status s;
+  Slice record;
+  std::string scratch;
+  size_t recovered_edits = 0;
+  while (reader->ReadRecord(&record, &scratch) && s.ok()) {
+    VersionEdit edit;
+    s = edit.DecodeFrom(record);
+    if (!s.ok()) {
+      break;
+    }
+    if (edit.has_db_id_) {
+      db_id_ = edit.GetDbId();
+      if (db_id != nullptr) {
+        db_id->assign(edit.GetDbId());
+      }
+    }
+    s = read_buffer->AddEdit(&edit);
+    if (!s.ok()) {
+      break;
+    }
+    if (edit.is_in_atomic_group_) {
+      if (read_buffer->IsFull()) {
+        // Apply edits in an atomic group when we have read all edits in the
+        // group.
+        for (auto& e : read_buffer->replay_buffer()) {
+          s = ApplyOneVersionEditToBuilder(e, name_to_options,
+                                           column_families_not_found, builders,
+                                           version_edit_params);
+          if (!s.ok()) {
+            break;
+          }
+          recovered_edits++;
+        }
+        if (!s.ok()) {
+          break;
+        }
+        read_buffer->Clear();
+      }
+    } else {
+      // Apply a normal edit immediately.
+      s = ApplyOneVersionEditToBuilder(edit, name_to_options,
+                                       column_families_not_found, builders,
+                                       version_edit_params);
+      if (s.ok()) {
+        recovered_edits++;
+      }
+    }
+  }
+  if (!s.ok()) {
+    // Clear the buffer if we fail to decode/apply an edit.
+    read_buffer->Clear();
+  }
+  TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
+                           &recovered_edits);
+  return s;
+}
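ReadAndRecover drives the buffer described above: each decoded edit is either
queued until its atomic group is complete or applied immediately, and a normal
edit arriving mid-group is treated as corruption. A condensed sketch of that
control flow, with the decode and apply steps reduced to comments:

#include <vector>

// Hypothetical replay control flow for manifest edits.
struct ReplayEdit {
  bool in_atomic_group = false;
  bool last_in_group = false;
};

bool Replay(const std::vector<ReplayEdit>& edits) {
  std::vector<ReplayEdit> group;
  for (const ReplayEdit& e : edits) {
    if (e.in_atomic_group) {
      group.push_back(e);
      if (e.last_in_group) {
        // apply every buffered edit of the group here, then clear the buffer
        group.clear();
      }
      continue;
    }
    if (!group.empty()) {
      return false;  // a normal edit may not interrupt an atomic group
    }
    // apply the normal edit here
  }
  return group.empty();  // an unterminated group means corruption
}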
+
 Status VersionSet::Recover(
-    const std::vector<ColumnFamilyDescriptor>& column_families,
-    bool read_only) {
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    std::string* db_id) {
   std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
-  for (auto cf : column_families) {
-    cf_name_to_options.insert({cf.name, cf.options});
+  for (const auto& cf : column_families) {
+    cf_name_to_options.emplace(cf.name, cf.options);
   }
   // keeps track of column families in manifest that were not found in
   // column families parameters. if those column families are not dropped
@@ -3534,7 +4403,8 @@ Status VersionSet::Recover(
 
   // Read "CURRENT" file, which contains a pointer to the current manifest file
   std::string manifest_path;
-  Status s = GetCurrentManifestPath(&manifest_path);
+  Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+                                    &manifest_file_number_);
   if (!s.ok()) {
     return s;
   }
@@ -3544,31 +4414,18 @@ Status VersionSet::Recover(
 
   std::unique_ptr<SequentialFileReader> manifest_file_reader;
   {
-    std::unique_ptr<SequentialFile> manifest_file;
-    s = env_->NewSequentialFile(manifest_path, &manifest_file,
-                                env_->OptimizeForManifestRead(env_options_));
+    std::unique_ptr<FSSequentialFile> manifest_file;
+    s = fs_->NewSequentialFile(manifest_path,
+                               fs_->OptimizeForManifestRead(file_options_),
+                               &manifest_file, nullptr);
     if (!s.ok()) {
       return s;
     }
     manifest_file_reader.reset(
-        new SequentialFileReader(std::move(manifest_file), manifest_path));
-  }
-  uint64_t current_manifest_file_size;
-  s = env_->GetFileSize(manifest_path, &current_manifest_file_size);
-  if (!s.ok()) {
-    return s;
+        new SequentialFileReader(std::move(manifest_file), manifest_path,
+                                 db_options_->log_readahead_size));
   }
 
-  bool have_log_number = false;
-  bool have_prev_log_number = false;
-  bool have_next_file = false;
-  bool have_last_sequence = false;
-  uint64_t next_file = 0;
-  uint64_t last_sequence = 0;
-  uint64_t log_number = 0;
-  uint64_t previous_log_number = 0;
-  uint32_t max_column_family = 0;
-  uint64_t min_log_number_to_keep = 0;
   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
       builders;
 
@@ -3588,7 +4445,8 @@ Status VersionSet::Recover(
   builders.insert(
       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                             new BaseReferencedVersionBuilder(default_cfd))));
-
+  uint64_t current_manifest_file_size = 0;
+  VersionEditParams version_edit_params;
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
@@ -3596,88 +4454,35 @@ Status VersionSet::Recover(
                        true /* checksum */, 0 /* log_number */);
     Slice record;
     std::string scratch;
-    std::vector<VersionEdit> replay_buffer;
-    size_t num_entries_decoded = 0;
-    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
-      VersionEdit edit;
-      s = edit.DecodeFrom(record);
-      if (!s.ok()) {
-        break;
-      }
-
-      if (edit.is_in_atomic_group_) {
-        if (replay_buffer.empty()) {
-          replay_buffer.resize(edit.remaining_entries_ + 1);
-          TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
-                                   &edit);
-        }
-        ++num_entries_decoded;
-        if (num_entries_decoded + edit.remaining_entries_ !=
-            static_cast<uint32_t>(replay_buffer.size())) {
-          TEST_SYNC_POINT_CALLBACK(
-              "VersionSet::Recover:IncorrectAtomicGroupSize", &edit);
-          s = Status::Corruption("corrupted atomic group");
-          break;
-        }
-        replay_buffer[num_entries_decoded - 1] = std::move(edit);
-        if (num_entries_decoded == replay_buffer.size()) {
-          TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
-                                   &edit);
-          for (auto& e : replay_buffer) {
-            s = ApplyOneVersionEditToBuilder(
-                e, cf_name_to_options, column_families_not_found, builders,
-                &have_log_number, &log_number, &have_prev_log_number,
-                &previous_log_number, &have_next_file, &next_file,
-                &have_last_sequence, &last_sequence, &min_log_number_to_keep,
-                &max_column_family);
-            if (!s.ok()) {
-              break;
-            }
-          }
-          replay_buffer.clear();
-          num_entries_decoded = 0;
-        }
-        TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
-      } else {
-        if (!replay_buffer.empty()) {
-          TEST_SYNC_POINT_CALLBACK(
-              "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit);
-          s = Status::Corruption("corrupted atomic group");
-          break;
-        }
-        s = ApplyOneVersionEditToBuilder(
-            edit, cf_name_to_options, column_families_not_found, builders,
-            &have_log_number, &log_number, &have_prev_log_number,
-            &previous_log_number, &have_next_file, &next_file,
-            &have_last_sequence, &last_sequence, &min_log_number_to_keep,
-            &max_column_family);
-      }
-      if (!s.ok()) {
-        break;
-      }
-    }
+    AtomicGroupReadBuffer read_buffer;
+    s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
+                       column_families_not_found, builders,
+                       &version_edit_params, db_id);
+    current_manifest_file_size = reader.GetReadOffset();
+    assert(current_manifest_file_size != 0);
   }
 
   if (s.ok()) {
-    if (!have_next_file) {
+    if (!version_edit_params.has_next_file_number_) {
       s = Status::Corruption("no meta-nextfile entry in descriptor");
-    } else if (!have_log_number) {
+    } else if (!version_edit_params.has_log_number_) {
       s = Status::Corruption("no meta-lognumber entry in descriptor");
-    } else if (!have_last_sequence) {
+    } else if (!version_edit_params.has_last_sequence_) {
       s = Status::Corruption("no last-sequence-number entry in descriptor");
     }
 
-    if (!have_prev_log_number) {
-      previous_log_number = 0;
+    if (!version_edit_params.has_prev_log_number_) {
+      version_edit_params.SetPrevLogNumber(0);
     }
 
-    column_family_set_->UpdateMaxColumnFamily(max_column_family);
+    column_family_set_->UpdateMaxColumnFamily(
+        version_edit_params.max_column_family_);
 
     // When reading DB generated using old release, min_log_number_to_keep=0.
     // All log files will be scanned for potential prepare entries.
-    MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
-    MarkFileNumberUsed(previous_log_number);
-    MarkFileNumberUsed(log_number);
+    MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
+    MarkFileNumberUsed(version_edit_params.prev_log_number_);
+    MarkFileNumberUsed(version_edit_params.log_number_);
   }
 
   // there were some column families in the MANIFEST that weren't specified
@@ -3720,13 +4525,19 @@ Status VersionSet::Recover(
 
       // unlimited table cache. Pre-load table handle now.
       // Need to do it out of the mutex.
-      builder->LoadTableHandlers(
+      s = builder->LoadTableHandlers(
           cfd->internal_stats(), db_options_->max_file_opening_threads,
           false /* prefetch_index_and_filter_in_cache */,
           true /* is_initial_load */,
           cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+      if (!s.ok()) {
+        if (db_options_->paranoid_checks) {
+          return s;
+        }
+        s = Status::OK();
+      }
 
-      Version* v = new Version(cfd, this, env_options_,
+      Version* v = new Version(cfd, this, file_options_,
                                *cfd->GetLatestMutableCFOptions(),
                                current_version_number_++);
       builder->SaveTo(v->storage_info());
@@ -3738,11 +4549,11 @@ Status VersionSet::Recover(
     }
 
     manifest_file_size_ = current_manifest_file_size;
-    next_file_number_.store(next_file + 1);
-    last_allocated_sequence_ = last_sequence;
-    last_published_sequence_ = last_sequence;
-    last_sequence_ = last_sequence;
-    prev_log_number_ = previous_log_number;
+    next_file_number_.store(version_edit_params.next_file_number_ + 1);
+    last_allocated_sequence_ = version_edit_params.last_sequence_;
+    last_published_sequence_ = version_edit_params.last_sequence_;
+    last_sequence_ = version_edit_params.last_sequence_;
+    prev_log_number_ = version_edit_params.prev_log_number_;
 
     ROCKS_LOG_INFO(
         db_options_->info_log,
@@ -3751,8 +4562,8 @@ Status VersionSet::Recover(
         ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
         ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
         ",min_log_number_to_keep is %" PRIu64 "\n",
-        manifest_path.c_str(), manifest_file_number_,
-        next_file_number_.load(), last_sequence_.load(), log_number,
+        manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
+        last_sequence_.load(), version_edit_params.log_number_,
         prev_log_number_, column_family_set_->GetMaxColumnFamily(),
         min_log_number_to_keep_2pc());
 
@@ -3771,31 +4582,28 @@ Status VersionSet::Recover(
 }
 
 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
-                                      const std::string& dbname, Env* env) {
+                                      const std::string& dbname,
+                                      FileSystem* fs) {
   // these are just for performance reasons, not correctness,
   // so we're fine using the defaults
-  EnvOptions soptions;
+  FileOptions soptions;
   // Read "CURRENT" file, which contains a pointer to the current manifest file
-  std::string current;
-  Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
+  std::string manifest_path;
+  uint64_t manifest_file_number;
+  Status s =
+      GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
   if (!s.ok()) {
     return s;
   }
-  if (current.empty() || current[current.size()-1] != '\n') {
-    return Status::Corruption("CURRENT file does not end with newline");
-  }
-  current.resize(current.size() - 1);
-
-  std::string dscname = dbname + "/" + current;
 
   std::unique_ptr<SequentialFileReader> file_reader;
   {
-    std::unique_ptr<SequentialFile> file;
-    s = env->NewSequentialFile(dscname, &file, soptions);
+    std::unique_ptr<FSSequentialFile> file;
+    s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
     if (!s.ok()) {
       return s;
   }
-  file_reader.reset(new SequentialFileReader(std::move(file), dscname));
+  file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
   }
 
   std::map<uint32_t, std::string> column_family_names;
@@ -3845,7 +4653,7 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
 #ifndef ROCKSDB_LITE
 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                         const Options* options,
-                                        const EnvOptions& env_options,
+                                        const FileOptions& file_options,
                                         int new_levels) {
   if (new_levels <= 1) {
     return Status::InvalidArgument(
@@ -3858,7 +4666,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                         options->table_cache_numshardbits));
   WriteController wc(options->delayed_write_rate);
   WriteBufferManager wb(options->db_write_buffer_size);
-  VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc);
+  VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
+                      /*block_cache_tracer=*/nullptr);
   Status status;
 
   std::vector<ColumnFamilyDescriptor> dummy;
@@ -3902,7 +4711,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
   }
 
   // we need to allocate an array with the old number of levels size to
-  // avoid SIGSEGV in WriteSnapshot()
+  // avoid SIGSEGV in WriteCurrentStateToManifest()
   // however, all levels bigger or equal to new_levels will be empty
   std::vector<FileMetaData*>* new_files_list =
       new std::vector<FileMetaData*>[current_levels];
@@ -3927,19 +4736,50 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
       mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
 }
 
+// Get the checksum information including the checksum and checksum function
+// name of all SST files in VersionSet. Store the information in
+// FileChecksumList which contains a map from file number to its checksum info.
+// If DB is not running, make sure call VersionSet::Recover() to load the file
+// metadata from Manifest to VersionSet before calling this function.
+Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
+  // Clean the previously stored checksum information if any.
+  if (checksum_list == nullptr) {
+    return Status::InvalidArgument("checksum_list is nullptr");
+  }
+  checksum_list->reset();
+
+  for (auto cfd : *column_family_set_) {
+    if (cfd->IsDropped() || !cfd->initialized()) {
+      continue;
+    }
+    for (int level = 0; level < cfd->NumberLevels(); level++) {
+      for (const auto& file :
+           cfd->current()->storage_info()->LevelFiles(level)) {
+        checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
+                                             file->file_checksum,
+                                             file->file_checksum_func_name);
+      }
+    }
+  }
+  return Status::OK();
+}
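
Newer RocksDB releases surface this information through a DB-level
GetLiveFilesChecksumInfo() wrapper; the sketch below assumes that wrapper and
the NewFileChecksumList()/GetAllFileChecksums() helpers from
rocksdb/file_checksum.h are available, and that a checksum generator was
configured in the DB options (otherwise the stored values stay at their
"unknown" defaults):

    #include <cstdint>
    #include <memory>
    #include <string>
    #include <vector>
    #include "rocksdb/db.h"
    #include "rocksdb/file_checksum.h"

    rocksdb::Status DumpSstChecksums(rocksdb::DB* db) {
      std::unique_ptr<rocksdb::FileChecksumList> checksum_list(
          rocksdb::NewFileChecksumList());
      rocksdb::Status s = db->GetLiveFilesChecksumInfo(checksum_list.get());
      if (!s.ok()) {
        return s;
      }
      std::vector<uint64_t> file_numbers;
      std::vector<std::string> checksums;
      std::vector<std::string> func_names;
      // file_numbers[i] was checksummed with func_names[i], producing checksums[i].
      return checksum_list->GetAllFileChecksums(&file_numbers, &checksums,
                                                &func_names);
    }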
+
 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
                                 bool verbose, bool hex, bool json) {
   // Open the specified manifest file.
   std::unique_ptr<SequentialFileReader> file_reader;
   Status s;
   {
-    std::unique_ptr<SequentialFile> file;
-    s = options.env->NewSequentialFile(
-        dscname, &file, env_->OptimizeForManifestRead(env_options_));
+    std::unique_ptr<FSSequentialFile> file;
+    s = options.file_system->NewSequentialFile(
+        dscname,
+        options.file_system->OptimizeForManifestRead(file_options_), &file,
+        nullptr);
     if (!s.ok()) {
       return s;
     }
-    file_reader.reset(new SequentialFileReader(std::move(file), dscname));
+    file_reader.reset(new SequentialFileReader(
+        std::move(file), dscname, db_options_->log_readahead_size));
   }
 
   bool have_prev_log_number = false;
@@ -4016,8 +4856,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
         comparators.erase(edit.column_family_);
         cfd = column_family_set_->GetColumnFamily(edit.column_family_);
         assert(cfd != nullptr);
-        cfd->Unref();
-        delete cfd;
+        cfd->UnrefAndTryDelete();
         cfd = nullptr;
       } else {
         if (!cf_in_builders) {
@@ -4035,7 +4874,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
         // to builder
         auto builder = builders.find(edit.column_family_);
         assert(builder != builders.end());
-        builder->second->version_builder()->Apply(&edit);
+        s = builder->second->version_builder()->Apply(&edit);
+        if (!s.ok()) {
+          break;
+        }
       }
 
       if (cfd != nullptr && edit.has_log_number_) {
@@ -4092,7 +4934,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
       assert(builders_iter != builders.end());
       auto builder = builders_iter->second->version_builder();
 
-      Version* v = new Version(cfd, this, env_options_,
+      Version* v = new Version(cfd, this, file_options_,
                                *cfd->GetLatestMutableCFOptions(),
                                current_version_number_++);
       builder->SaveTo(v->storage_info());
@@ -4138,7 +4980,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
     next_file_number_.store(number + 1, std::memory_order_relaxed);
   }
 }
-
 // Called only either from ::LogAndApply which is protected by mutex or during
 // recovery which is single-threaded.
 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
@@ -4147,7 +4988,9 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
   }
 }
 
-Status VersionSet::WriteSnapshot(log::Writer* log) {
+Status VersionSet::WriteCurrentStateToManifest(
+    const std::unordered_map<uint32_t, MutableCFState>& curr_state,
+    log::Writer* log) {
   // TODO: Break up into multiple records to reduce memory usage on recovery?
 
   // WARNING: This method doesn't hold a mutex!!
@@ -4155,6 +4998,22 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
   // This is done without DB mutex lock held, but only within single-threaded
   // LogAndApply. Column family manipulations can only happen within LogAndApply
   // (the same single thread), so we're safe to iterate.
+
+  if (db_options_->write_dbid_to_manifest) {
+    VersionEdit edit_for_db_id;
+    assert(!db_id_.empty());
+    edit_for_db_id.SetDBId(db_id_);
+    std::string db_id_record;
+    if (!edit_for_db_id.EncodeTo(&db_id_record)) {
+      return Status::Corruption("Unable to Encode VersionEdit:" +
+                                edit_for_db_id.DebugString(true));
+    }
+    Status add_record = log->AddRecord(db_id_record);
+    if (!add_record.ok()) {
+      return add_record;
+    }
+  }
+
   for (auto cfd : *column_family_set_) {
     if (cfd->IsDropped()) {
       continue;
@@ -4193,10 +5052,15 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
           edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                        f->fd.GetFileSize(), f->smallest, f->largest,
                        f->fd.smallest_seqno, f->fd.largest_seqno,
-                       f->marked_for_compaction);
+                       f->marked_for_compaction, f->oldest_blob_file_number,
+                       f->oldest_ancester_time, f->file_creation_time,
+                       f->file_checksum, f->file_checksum_func_name);
         }
       }
-      edit.SetLogNumber(cfd->GetLogNumber());
+      const auto iter = curr_state.find(cfd->GetID());
+      assert(iter != curr_state.end());
+      uint64_t log_number = iter->second.log_number;
+      edit.SetLogNumber(log_number);
       std::string record;
       if (!edit.EncodeTo(&record)) {
         return Status::Corruption(
@@ -4208,7 +5072,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
       }
     }
   }
-
   return Status::OK();
 }
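
The DB ID record emitted above only appears when the write_dbid_to_manifest
option is enabled (it defaults to false). A minimal sketch of turning it on at
open time; the path is a placeholder:

    #include <string>
    #include "rocksdb/db.h"

    rocksdb::Status OpenWithDbIdInManifest(const std::string& path,
                                           rocksdb::DB** db) {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Persist the DB ID as the first record of every newly written MANIFEST
      // instead of relying on the IDENTITY file alone.
      options.write_dbid_to_manifest = true;
      return rocksdb::DB::Open(options, path, db);
    }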
 
@@ -4218,111 +5081,198 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
 // we avoid doing binary search for the keys b and c twice and instead somehow
 // maintain state of where they first appear in the files.
-uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
+uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
+                                     Version* v, const Slice& start,
                                      const Slice& end, int start_level,
-                                     int end_level) {
+                                     int end_level, TableReaderCaller caller) {
+  const auto& icmp = v->cfd_->internal_comparator();
+
   // pre-condition
-  assert(v->cfd_->internal_comparator().Compare(start, end) <= 0);
+  assert(icmp.Compare(start, end) <= 0);
 
-  uint64_t size = 0;
+  uint64_t total_full_size = 0;
   const auto* vstorage = v->storage_info();
-  end_level = end_level == -1
-                  ? vstorage->num_non_empty_levels()
-                  : std::min(end_level, vstorage->num_non_empty_levels());
+  const int num_non_empty_levels = vstorage->num_non_empty_levels();
+  end_level = (end_level == -1) ? num_non_empty_levels
+                                : std::min(end_level, num_non_empty_levels);
 
   assert(start_level <= end_level);
 
-  for (int level = start_level; level < end_level; level++) {
+  // Outline of the optimization that uses options.files_size_error_margin.
+  // When approximating the total size of the files used to store a key range,
+  // we first sum up the sizes of the files that fully fall into the range.
+  // Then we sum up the sizes of all the files that may intersect with the range
+  // (this includes all files in L0 as well). Then, if total_intersecting_size
+  // is smaller than total_full_size * options.files_size_error_margin - we can
+  // infer that the intersecting files have a sufficiently negligible
+  // contribution to the total size, and we can approximate the storage required
+  // for the keys in range as just half of the intersecting_files_size.
+  // E.g., if the value of files_size_error_margin is 0.1, then the error of the
+  // approximation is limited to only ~10% of the total size of files that fully
+  // fall into the key range. In such a case, this helps to avoid a costly
+  // process of binary searching the intersecting files that is required only
+  // for a more precise calculation of the total size.
+
+  autovector<FdWithKeyRange*, 32> first_files;
+  autovector<FdWithKeyRange*, 16> last_files;
+
+  // scan all the levels
+  for (int level = start_level; level < end_level; ++level) {
     const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
-    if (!files_brief.num_files) {
+    if (files_brief.num_files == 0) {
       // empty level, skip exploration
       continue;
     }
 
-    if (!level) {
-      // level 0 data is sorted order, handle the use case explicitly
-      size += ApproximateSizeLevel0(v, files_brief, start, end);
+    if (level == 0) {
+      // level 0 files are not in sorted order, we need to iterate through
+      // the list to compute the total bytes that require scanning,
+      // so handle the case explicitly (similarly to first_files case)
+      for (size_t i = 0; i < files_brief.num_files; i++) {
+        first_files.push_back(&files_brief.files[i]);
+      }
       continue;
     }
 
     assert(level > 0);
     assert(files_brief.num_files > 0);
 
-    // identify the file position for starting key
-    const uint64_t idx_start = FindFileInRange(
-        v->cfd_->internal_comparator(), files_brief, start,
-        /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1));
-    assert(idx_start < files_brief.num_files);
-
-    // scan all files from the starting position until the ending position
-    // inferred from the sorted order
-    for (uint64_t i = idx_start; i < files_brief.num_files; i++) {
-      uint64_t val;
-      val = ApproximateSize(v, files_brief.files[i], end);
-      if (!val) {
-        // the files after this will not have the range
-        break;
-      }
+    // identify the file position for start key
+    const int idx_start =
+        FindFileInRange(icmp, files_brief, start, 0,
+                        static_cast<uint32_t>(files_brief.num_files - 1));
+    assert(static_cast<size_t>(idx_start) < files_brief.num_files);
 
-      size += val;
+    // identify the file position for end key
+    int idx_end = idx_start;
+    if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
+      idx_end =
+          FindFileInRange(icmp, files_brief, end, idx_start,
+                          static_cast<uint32_t>(files_brief.num_files - 1));
+    }
+    assert(idx_end >= idx_start &&
+           static_cast<size_t>(idx_end) < files_brief.num_files);
 
-      if (i == idx_start) {
-        // subtract the bytes needed to be scanned to get to the starting
-        // key
-        val = ApproximateSize(v, files_brief.files[i], start);
-        assert(size >= val);
-        size -= val;
-      }
+    // scan all files from the starting index to the ending index
+    // (inferred from the sorted order)
+
+    // first scan all the intermediate full files (excluding first and last)
+    for (int i = idx_start + 1; i < idx_end; ++i) {
+      uint64_t file_size = files_brief.files[i].fd.GetFileSize();
+      // The entire file falls into the range, so we can just take its size.
+      assert(file_size ==
+             ApproximateSize(v, files_brief.files[i], start, end, caller));
+      total_full_size += file_size;
+    }
+
+    // save the first and the last files (which may be the same file), so we
+    // can scan them later.
+    first_files.push_back(&files_brief.files[idx_start]);
+    if (idx_start != idx_end) {
+      // we need to estimate size for both files, only if they are different
+      last_files.push_back(&files_brief.files[idx_end]);
     }
   }
 
-  return size;
-}
+  // The sum of all file sizes that intersect the [start, end] key range.
+  uint64_t total_intersecting_size = 0;
+  for (const auto* file_ptr : first_files) {
+    total_intersecting_size += file_ptr->fd.GetFileSize();
+  }
+  for (const auto* file_ptr : last_files) {
+    total_intersecting_size += file_ptr->fd.GetFileSize();
+  }
 
-uint64_t VersionSet::ApproximateSizeLevel0(Version* v,
-                                           const LevelFilesBrief& files_brief,
-                                           const Slice& key_start,
-                                           const Slice& key_end) {
-  // level 0 files are not in sorted order, we need to iterate through
-  // the list to compute the total bytes that require scanning
-  uint64_t size = 0;
-  for (size_t i = 0; i < files_brief.num_files; i++) {
-    const uint64_t start = ApproximateSize(v, files_brief.files[i], key_start);
-    const uint64_t end = ApproximateSize(v, files_brief.files[i], key_end);
-    assert(end >= start);
-    size += end - start;
+  // Now scan all the first & last files at each level, and estimate their size.
+  // If total_intersecting_size is less than total_full_size *
+  // files_size_error_margin, approximate the result to avoid the costly binary
+  // search inside ApproximateSize, charging half of each intersecting file's
+  // size instead.
+
+  const double margin = options.files_size_error_margin;
+  if (margin > 0 && total_intersecting_size <
+                        static_cast<uint64_t>(total_full_size * margin)) {
+    total_full_size += total_intersecting_size / 2;
+  } else {
+    // Estimate for all the first files, at each level
+    for (const auto file_ptr : first_files) {
+      total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
+    }
+
+    // Estimate for all the last files, at each level
+    for (const auto file_ptr : last_files) {
+      // We could use ApproximateSize here, but calling ApproximateOffsetOf
+      // directly is just more efficient.
+      total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
+    }
   }
-  return size;
+
+  return total_full_size;
 }
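
The SizeApproximationOptions consumed here normally come from callers of
DB::GetApproximateSizes(). A sketch of the calling side, assuming that public
overload is available in this release (some releases return void rather than a
Status, so the return value is ignored here); only the knobs this function
actually reads are set:

    #include <cstdint>
    #include "rocksdb/db.h"

    uint64_t ApproximateRangeBytes(rocksdb::DB* db, const rocksdb::Slice& start,
                                   const rocksdb::Slice& limit) {
      rocksdb::SizeApproximationOptions size_options;
      size_options.include_files = true;
      // Tolerate ~10% error so boundary files can be charged at half their size
      // instead of being binary-searched for exact offsets (see comment above).
      size_options.files_size_error_margin = 0.1;

      rocksdb::Range range(start, limit);
      uint64_t size = 0;
      db->GetApproximateSizes(size_options, db->DefaultColumnFamily(), &range,
                              /*n=*/1, &size);
      return size;
    }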
 
-uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
-                                     const Slice& key) {
+uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
+                                         const Slice& key,
+                                         TableReaderCaller caller) {
   // pre-condition
   assert(v);
+  const auto& icmp = v->cfd_->internal_comparator();
 
   uint64_t result = 0;
-  if (v->cfd_->internal_comparator().Compare(f.largest_key, key) <= 0) {
+  if (icmp.Compare(f.largest_key, key) <= 0) {
     // Entire file is before "key", so just add the file size
     result = f.fd.GetFileSize();
-  } else if (v->cfd_->internal_comparator().Compare(f.smallest_key, key) > 0) {
+  } else if (icmp.Compare(f.smallest_key, key) > 0) {
     // Entire file is after "key", so ignore
     result = 0;
   } else {
     // "key" falls in the range for this table.  Add the
     // approximate offset of "key" within the table.
-    TableReader* table_reader_ptr;
-    InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
-        ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
-        *f.file_metadata, nullptr /* range_del_agg */,
-        v->GetMutableCFOptions().prefix_extractor.get(), &table_reader_ptr);
-    if (table_reader_ptr != nullptr) {
-      result = table_reader_ptr->ApproximateOffsetOf(key);
+    TableCache* table_cache = v->cfd_->table_cache();
+    if (table_cache != nullptr) {
+      result = table_cache->ApproximateOffsetOf(
+          key, f.file_metadata->fd, caller, icmp,
+          v->GetMutableCFOptions().prefix_extractor.get());
     }
-    delete iter;
   }
   return result;
 }
 
+uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
+                                     const Slice& start, const Slice& end,
+                                     TableReaderCaller caller) {
+  // pre-condition
+  assert(v);
+  const auto& icmp = v->cfd_->internal_comparator();
+  assert(icmp.Compare(start, end) <= 0);
+
+  if (icmp.Compare(f.largest_key, start) <= 0 ||
+      icmp.Compare(f.smallest_key, end) > 0) {
+    // Entire file is before or after the start/end keys range
+    return 0;
+  }
+
+  if (icmp.Compare(f.smallest_key, start) >= 0) {
+    // Start of the range is before the file start - approximate by end offset
+    return ApproximateOffsetOf(v, f, end, caller);
+  }
+
+  if (icmp.Compare(f.largest_key, end) < 0) {
+    // End of the range is after the file end - approximate by subtracting
+    // start offset from the file size
+    uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
+    assert(f.fd.GetFileSize() >= start_offset);
+    return f.fd.GetFileSize() - start_offset;
+  }
+
+  // The interval falls entirely in the range for this file.
+  TableCache* table_cache = v->cfd_->table_cache();
+  if (table_cache == nullptr) {
+    return 0;
+  }
+  return table_cache->ApproximateSize(
+      start, end, f.file_metadata->fd, caller, icmp,
+      v->GetMutableCFOptions().prefix_extractor.get());
+}
+
 void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   // pre-calculate space requirement
   int64_t total_files = 0;
@@ -4367,7 +5317,7 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
 
 InternalIterator* VersionSet::MakeInputIterator(
     const Compaction* c, RangeDelAggregator* range_del_agg,
-    const EnvOptions& env_options_compactions) {
+    const FileOptions& file_options_compactions) {
   auto cfd = c->column_family_data();
   ReadOptions read_options;
   read_options.verify_checksums = true;
@@ -4392,24 +5342,27 @@ InternalIterator* VersionSet::MakeInputIterator(
         const LevelFilesBrief* flevel = c->input_levels(which);
         for (size_t i = 0; i < flevel->num_files; i++) {
           list[num++] = cfd->table_cache()->NewIterator(
-              read_options, env_options_compactions, cfd->internal_comparator(),
+              read_options, file_options_compactions,
+              cfd->internal_comparator(),
               *flevel->files[i].file_metadata, range_del_agg,
               c->mutable_cf_options()->prefix_extractor.get(),
-              nullptr /* table_reader_ptr */,
-              nullptr /* no per level latency histogram */,
-              true /* for_compaction */, nullptr /* arena */,
-              false /* skip_filters */, static_cast<int>(which) /* level */);
+              /*table_reader_ptr=*/nullptr,
+              /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
+              /*arena=*/nullptr,
+              /*skip_filters=*/false, /*level=*/static_cast<int>(which),
+              /*smallest_compaction_key=*/nullptr,
+              /*largest_compaction_key=*/nullptr);
         }
       } else {
         // Create concatenating iterator for the files from this level
         list[num++] = new LevelIterator(
-            cfd->table_cache(), read_options, env_options_compactions,
+            cfd->table_cache(), read_options, file_options_compactions,
             cfd->internal_comparator(), c->input_levels(which),
             c->mutable_cf_options()->prefix_extractor.get(),
-            false /* should_sample */,
-            nullptr /* no per level latency histogram */,
-            true /* for_compaction */, false /* skip_filters */,
-            static_cast<int>(which) /* level */, range_del_agg,
+            /*should_sample=*/false,
+            /*no per level latency histogram=*/nullptr,
+            TableReaderCaller::kCompaction, /*skip_filters=*/false,
+            /*level=*/static_cast<int>(which), range_del_agg,
             c->boundaries(which));
       }
     }
@@ -4513,7 +5466,9 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
           assert(!cfd->ioptions()->cf_paths.empty());
           filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
         }
-        filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
+        const uint64_t file_number = file->fd.GetNumber();
+        filemetadata.name = MakeTableFileName("", file_number);
+        filemetadata.file_number = file_number;
         filemetadata.level = level;
         filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
         filemetadata.smallestkey = file->smallest.user_key().ToString();
@@ -4525,6 +5480,9 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
         filemetadata.being_compacted = file->being_compacted;
         filemetadata.num_entries = file->num_entries;
         filemetadata.num_deletions = file->num_deletions;
+        filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
+        filemetadata.file_checksum = file->file_checksum;
+        filemetadata.file_checksum_func_name = file->file_checksum_func_name;
         metadata->push_back(filemetadata);
       }
     }
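
The extra fields populated above (file number, checksum and blob linkage) are
visible to callers of DB::GetLiveFilesMetaData(); a short consumer sketch,
assuming LiveFileMetaData exposes these members in this release:

    #include <iostream>
    #include <vector>
    #include "rocksdb/db.h"
    #include "rocksdb/metadata.h"

    void PrintLiveSstFiles(rocksdb::DB* db) {
      std::vector<rocksdb::LiveFileMetaData> files;
      db->GetLiveFilesMetaData(&files);
      for (const auto& f : files) {
        std::cout << f.column_family_name << " L" << f.level << " " << f.name
                  << " bytes=" << f.size
                  << " checksum_func=" << f.file_checksum_func_name << std::endl;
      }
    }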
@@ -4553,7 +5511,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
 
   MutableCFOptions dummy_cf_options;
   Version* dummy_versions =
-      new Version(nullptr, this, env_options_, dummy_cf_options);
+      new Version(nullptr, this, file_options_, dummy_cf_options);
   // Ref() dummy version once so that later we can call Unref() to delete it
   // by avoiding calling "delete" explicitly (~Version is private)
   dummy_versions->Ref();
@@ -4561,7 +5519,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
       edit->column_family_name_, edit->column_family_, dummy_versions,
       cf_options);
 
-  Version* v = new Version(new_cfd, this, env_options_,
+  Version* v = new Version(new_cfd, this, file_options_,
                            *new_cfd->GetLatestMutableCFOptions(),
                            current_version_number_++);
 
@@ -4605,12 +5563,14 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
 
 ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
                                        const ImmutableDBOptions* _db_options,
-                                       const EnvOptions& _env_options,
+                                       const FileOptions& _file_options,
                                        Cache* table_cache,
                                        WriteBufferManager* write_buffer_manager,
                                        WriteController* write_controller)
-    : VersionSet(dbname, _db_options, _env_options, table_cache,
-                 write_buffer_manager, write_controller) {}
+    : VersionSet(dbname, _db_options, _file_options, table_cache,
+                 write_buffer_manager, write_controller,
+                 /*block_cache_tracer=*/nullptr),
+      number_of_edits_to_skip_(0) {}
 
 ReactiveVersionSet::~ReactiveVersionSet() {}
 
@@ -4641,17 +5601,6 @@ Status ReactiveVersionSet::Recover(
   // In recovery, nobody else can access it, so it's fine to set it to be
   // initialized earlier.
   default_cfd->set_initialized();
-
-  bool have_log_number = false;
-  bool have_prev_log_number = false;
-  bool have_next_file = false;
-  bool have_last_sequence = false;
-  uint64_t next_file = 0;
-  uint64_t last_sequence = 0;
-  uint64_t log_number = 0;
-  uint64_t previous_log_number = 0;
-  uint32_t max_column_family = 0;
-  uint64_t min_log_number_to_keep = 0;
   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
       builders;
   std::unordered_map<int, std::string> column_families_not_found;
@@ -4667,25 +5616,17 @@ Status ReactiveVersionSet::Recover(
   log::Reader* reader = manifest_reader->get();
 
   int retry = 0;
+  VersionEdit version_edit;
   while (s.ok() && retry < 1) {
     assert(reader != nullptr);
     Slice record;
     std::string scratch;
-    while (s.ok() && reader->ReadRecord(&record, &scratch)) {
-      VersionEdit edit;
-      s = edit.DecodeFrom(record);
-      if (!s.ok()) {
-        break;
-      }
-      s = ApplyOneVersionEditToBuilder(
-          edit, cf_name_to_options, column_families_not_found, builders,
-          &have_log_number, &log_number, &have_prev_log_number,
-          &previous_log_number, &have_next_file, &next_file,
-          &have_last_sequence, &last_sequence, &min_log_number_to_keep,
-          &max_column_family);
-    }
+    s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
+                       column_families_not_found, builders, &version_edit);
     if (s.ok()) {
-      bool enough = have_next_file && have_log_number && have_last_sequence;
+      bool enough = version_edit.has_next_file_number_ &&
+                    version_edit.has_log_number_ &&
+                    version_edit.has_last_sequence_;
       if (enough) {
         for (const auto& cf : column_families) {
           auto cfd = column_family_set_->GetColumnFamily(cf.name);
@@ -4727,14 +5668,14 @@ Status ReactiveVersionSet::Recover(
   }
 
   if (s.ok()) {
-    if (!have_prev_log_number) {
-      previous_log_number = 0;
+    if (!version_edit.has_prev_log_number_) {
+      version_edit.prev_log_number_ = 0;
     }
-    column_family_set_->UpdateMaxColumnFamily(max_column_family);
+    column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
 
-    MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
-    MarkFileNumberUsed(previous_log_number);
-    MarkFileNumberUsed(log_number);
+    MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
+    MarkFileNumberUsed(version_edit.prev_log_number_);
+    MarkFileNumberUsed(version_edit.log_number_);
 
     for (auto cfd : *column_family_set_) {
       assert(builders.count(cfd->GetID()) > 0);
@@ -4757,7 +5698,7 @@ Status ReactiveVersionSet::Recover(
       assert(builders_iter != builders.end());
       auto* builder = builders_iter->second->version_builder();
 
-      Version* v = new Version(cfd, this, env_options_,
+      Version* v = new Version(cfd, this, file_options_,
                                *cfd->GetLatestMutableCFOptions(),
                                current_version_number_++);
       builder->SaveTo(v->storage_info());
@@ -4767,11 +5708,11 @@ Status ReactiveVersionSet::Recover(
                       !(db_options_->skip_stats_update_on_db_open));
       AppendVersion(cfd, v);
     }
-    next_file_number_.store(next_file + 1);
-    last_allocated_sequence_ = last_sequence;
-    last_published_sequence_ = last_sequence;
-    last_sequence_ = last_sequence;
-    prev_log_number_ = previous_log_number;
+    next_file_number_.store(version_edit.next_file_number_ + 1);
+    last_allocated_sequence_ = version_edit.last_sequence_;
+    last_published_sequence_ = version_edit.last_sequence_;
+    last_sequence_ = version_edit.last_sequence_;
+    prev_log_number_ = version_edit.prev_log_number_;
     for (auto cfd : *column_family_set_) {
       if (cfd->IsDropped()) {
         continue;
@@ -4793,17 +5734,7 @@ Status ReactiveVersionSet::ReadAndApply(
   mu->AssertHeld();
 
   Status s;
-  bool have_log_number = false;
-  bool have_prev_log_number = false;
-  bool have_next_file = false;
-  bool have_last_sequence = false;
-  uint64_t next_file = 0;
-  uint64_t last_sequence = 0;
-  uint64_t log_number = 0;
-  uint64_t previous_log_number = 0;
-  uint32_t max_column_family = 0;
-  uint64_t min_log_number_to_keep = 0;
-
+  uint64_t applied_edits = 0;
   while (s.ok()) {
     Slice record;
     std::string scratch;
@@ -4815,73 +5746,50 @@ Status ReactiveVersionSet::ReadAndApply(
       if (!s.ok()) {
         break;
       }
-      ColumnFamilyData* cfd =
-          column_family_set_->GetColumnFamily(edit.column_family_);
-      // If we cannot find this column family in our column family set, then it
-      // may be a new column family created by the primary after the secondary
-      // starts. Ignore it for now.
-      if (nullptr == cfd) {
+
+      // Skip the first VersionEdits of each MANIFEST generated by
+      // VersionSet::WriteCurrentStateToManifest.
+      if (number_of_edits_to_skip_ > 0) {
+        ColumnFamilyData* cfd =
+            column_family_set_->GetColumnFamily(edit.column_family_);
+        if (cfd != nullptr && !cfd->IsDropped()) {
+          --number_of_edits_to_skip_;
+        }
         continue;
       }
-      if (active_version_builders_.find(edit.column_family_) ==
-          active_version_builders_.end()) {
-        std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
-            new BaseReferencedVersionBuilder(cfd));
-        active_version_builders_.insert(
-            std::make_pair(edit.column_family_, std::move(builder_guard)));
-      }
-      s = ApplyOneVersionEditToBuilder(
-          edit, &have_log_number, &log_number, &have_prev_log_number,
-          &previous_log_number, &have_next_file, &next_file,
-          &have_last_sequence, &last_sequence, &min_log_number_to_keep,
-          &max_column_family);
+
+      s = read_buffer_.AddEdit(&edit);
       if (!s.ok()) {
         break;
       }
-      auto builder_iter = active_version_builders_.find(edit.column_family_);
-      assert(builder_iter != active_version_builders_.end());
-      auto builder = builder_iter->second->version_builder();
-      assert(builder != nullptr);
-      s = builder->LoadTableHandlers(
-          cfd->internal_stats(), db_options_->max_file_opening_threads,
-          false /* prefetch_index_and_filter_in_cache */,
-          false /* is_initial_load */,
-          cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
-      TEST_SYNC_POINT_CALLBACK(
-          "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s);
-      if (!s.ok() && !s.IsPathNotFound()) {
-        break;
-      } else if (s.IsPathNotFound()) {
-        s = Status::OK();
-      } else {  // s.ok() == true
-        auto version = new Version(cfd, this, env_options_,
-                                   *cfd->GetLatestMutableCFOptions(),
-                                   current_version_number_++);
-        builder->SaveTo(version->storage_info());
-        version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
-        AppendVersion(cfd, version);
-        active_version_builders_.erase(builder_iter);
-        if (cfds_changed->count(cfd) == 0) {
-          cfds_changed->insert(cfd);
+      VersionEdit temp_edit;
+      if (edit.is_in_atomic_group_) {
+        if (read_buffer_.IsFull()) {
+          // Apply edits in an atomic group when we have read all edits in the
+          // group.
+          for (auto& e : read_buffer_.replay_buffer()) {
+            s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
+            if (!s.ok()) {
+              break;
+            }
+            applied_edits++;
+          }
+          if (!s.ok()) {
+            break;
+          }
+          read_buffer_.Clear();
+        }
+      } else {
+        // Apply a normal edit immediately.
+        s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
+        if (s.ok()) {
+          applied_edits++;
         }
       }
-      if (have_next_file) {
-        next_file_number_.store(next_file + 1);
-      }
-      if (have_last_sequence) {
-        last_allocated_sequence_ = last_sequence;
-        last_published_sequence_ = last_sequence;
-        last_sequence_ = last_sequence;
-      }
-      if (have_prev_log_number) {
-        prev_log_number_ = previous_log_number;
-        MarkFileNumberUsed(previous_log_number);
-      }
-      if (have_log_number) {
-        MarkFileNumberUsed(log_number);
-      }
-      column_family_set_->UpdateMaxColumnFamily(max_column_family);
-      MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
+    }
+    if (!s.ok()) {
+      // Clear the buffer if we fail to decode/apply an edit.
+      read_buffer_.Clear();
     }
     // It's possible that:
     // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
@@ -4891,8 +5799,37 @@ Status ReactiveVersionSet::ReadAndApply(
     // find the next MANIFEST, we should exit the loop.
     s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
     reader = manifest_reader->get();
-    if (s.ok() && reader->file()->file_name() == old_manifest_path) {
-      break;
+    if (s.ok()) {
+      if (reader->file()->file_name() == old_manifest_path) {
+        // Still processing the same MANIFEST, thus no need to continue this
+        // loop since no record is available if we have reached here.
+        break;
+      } else {
+        // We have switched to a new MANIFEST whose first records have been
+        // generated by VersionSet::WriteCurrentStateToManifest. Since the
+        // secondary instance has already finished recovering upon start, there
+        // is no need for the secondary to process these records. Actually, if
+        // the secondary were to replay these records, the secondary may end up
+        // adding the same SST files AGAIN to each column family, causing
+        // consistency checks done by VersionBuilder to fail. Therefore, we
+        // record the number of records to skip at the beginning of the new
+        // MANIFEST and ignore them.
+        number_of_edits_to_skip_ = 0;
+        for (auto* cfd : *column_family_set_) {
+          if (cfd->IsDropped()) {
+            continue;
+          }
+          // Increase number_of_edits_to_skip_ by 2 because
+          // WriteCurrentStateToManifest() writes 2 version edits for each
+          // column family at the beginning of the newly-generated MANIFEST
+          // (3 when write_dbid_to_manifest is set, as handled below).
+          // TODO(yanqin) remove hard-coded value.
+          if (db_options_->write_dbid_to_manifest) {
+            number_of_edits_to_skip_ += 3;
+          } else {
+            number_of_edits_to_skip_ += 2;
+          }
+        }
+      }
     }
   }
 
@@ -4910,52 +5847,111 @@ Status ReactiveVersionSet::ReadAndApply(
       }
     }
   }
+  TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
+                           &applied_edits);
   return s;
 }
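
ReadAndApply() runs on secondary instances that tail the primary's MANIFEST. A
minimal sketch of the driving side, using the public DB::OpenAsSecondary() and
TryCatchUpWithPrimary() entry points (paths are placeholders):

    #include <string>
    #include "rocksdb/db.h"

    rocksdb::Status CatchUpOnce(const std::string& primary_path,
                                const std::string& secondary_path) {
      rocksdb::Options options;
      options.max_open_files = -1;  // secondary mode requires all files kept open
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::OpenAsSecondary(options, primary_path,
                                                       secondary_path, &db);
      if (!s.ok()) {
        return s;
      }
      // Internally drives ReactiveVersionSet::ReadAndApply over any MANIFEST
      // records the primary has written since the last call.
      s = db->TryCatchUpWithPrimary();
      delete db;
      return s;
    }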
 
 Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
-    VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
-    bool* have_prev_log_number, uint64_t* previous_log_number,
-    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
-    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
-    uint32_t* max_column_family) {
-  ColumnFamilyData* cfd = nullptr;
-  Status status;
+    VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
+    VersionEdit* version_edit) {
+  ColumnFamilyData* cfd =
+      column_family_set_->GetColumnFamily(edit.column_family_);
+
+  // If we cannot find this column family in our column family set, then it
+  // may be a new column family created by the primary after the secondary
+  // starts. It is also possible that the secondary instance opens only a subset
+  // of column families. Ignore it for now.
+  if (nullptr == cfd) {
+    return Status::OK();
+  }
+  if (active_version_builders_.find(edit.column_family_) ==
+          active_version_builders_.end() &&
+      !cfd->IsDropped()) {
+    std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
+        new BaseReferencedVersionBuilder(cfd));
+    active_version_builders_.insert(
+        std::make_pair(edit.column_family_, std::move(builder_guard)));
+  }
+
+  auto builder_iter = active_version_builders_.find(edit.column_family_);
+  assert(builder_iter != active_version_builders_.end());
+  auto builder = builder_iter->second->version_builder();
+  assert(builder != nullptr);
+
   if (edit.is_column_family_add_) {
     // TODO (yanqin) for now the secondary ignores column families created
     // after Open. This also simplifies handling of switching to a new MANIFEST
     // and processing the snapshot of the system at the beginning of the
     // MANIFEST.
-    return Status::OK();
   } else if (edit.is_column_family_drop_) {
-    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-    // Drop a CF created by primary after secondary starts? Then ignore
-    if (cfd == nullptr) {
-      return Status::OK();
-    }
     // Drop the column family by setting it to be 'dropped' without destroying
     // the column family handle.
+    // TODO (haoyu) figure out how to handle column family drop for
+    // secondary instance. (Is it possible that the ref count for cfd is 0 but
+    // the ref count for its versions is higher than 0?)
     cfd->SetDropped();
-    if (cfd->Unref()) {
-      delete cfd;
+    if (cfd->UnrefAndTryDelete()) {
       cfd = nullptr;
     }
+    active_version_builders_.erase(builder_iter);
   } else {
-    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-    // Operation on a CF created after Open? Then ignore
-    if (cfd == nullptr) {
-      return Status::OK();
+    Status s = builder->Apply(&edit);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
+  if (!s.ok()) {
+    return s;
+  }
+
+  if (cfd != nullptr && !cfd->IsDropped()) {
+    s = builder->LoadTableHandlers(
+        cfd->internal_stats(), db_options_->max_file_opening_threads,
+        false /* prefetch_index_and_filter_in_cache */,
+        false /* is_initial_load */,
+        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+    TEST_SYNC_POINT_CALLBACK(
+        "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
+        "AfterLoadTableHandlers",
+        &s);
+
+    if (s.ok()) {
+      auto version = new Version(cfd, this, file_options_,
+                                 *cfd->GetLatestMutableCFOptions(),
+                                 current_version_number_++);
+      builder->SaveTo(version->storage_info());
+      version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
+      AppendVersion(cfd, version);
+      active_version_builders_.erase(builder_iter);
+      if (cfds_changed->count(cfd) == 0) {
+        cfds_changed->insert(cfd);
+      }
+    } else if (s.IsPathNotFound()) {
+      s = Status::OK();
     }
-    auto builder_iter = active_version_builders_.find(edit.column_family_);
-    assert(builder_iter != active_version_builders_.end());
-    auto builder = builder_iter->second->version_builder();
-    assert(builder != nullptr);
-    builder->Apply(&edit);
+    // Some other error has occurred during LoadTableHandlers.
+  }
+
+  if (version_edit->HasNextFile()) {
+    next_file_number_.store(version_edit->next_file_number_ + 1);
+  }
+  if (version_edit->has_last_sequence_) {
+    last_allocated_sequence_ = version_edit->last_sequence_;
+    last_published_sequence_ = version_edit->last_sequence_;
+    last_sequence_ = version_edit->last_sequence_;
   }
-  return ExtractInfoFromVersionEdit(
-      cfd, edit, have_log_number, log_number, have_prev_log_number,
-      previous_log_number, have_next_file, next_file, have_last_sequence,
-      last_sequence, min_log_number_to_keep, max_column_family);
+  if (version_edit->has_prev_log_number_) {
+    prev_log_number_ = version_edit->prev_log_number_;
+    MarkFileNumberUsed(version_edit->prev_log_number_);
+  }
+  if (version_edit->has_log_number_) {
+    MarkFileNumberUsed(version_edit->log_number_);
+  }
+  column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
+  MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
+  return s;
 }
 
 Status ReactiveVersionSet::MaybeSwitchManifest(
@@ -4965,8 +5961,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
   Status s;
   do {
     std::string manifest_path;
-    s = GetCurrentManifestPath(&manifest_path);
-    std::unique_ptr<SequentialFile> manifest_file;
+    s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+                               &manifest_file_number_);
+    std::unique_ptr<FSSequentialFile> manifest_file;
     if (s.ok()) {
       if (nullptr == manifest_reader->get() ||
           manifest_reader->get()->file()->file_name() != manifest_path) {
@@ -4976,9 +5973,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
         TEST_SYNC_POINT(
             "ReactiveVersionSet::MaybeSwitchManifest:"
             "AfterGetCurrentManifestPath:1");
-        s = env_->NewSequentialFile(
-            manifest_path, &manifest_file,
-            env_->OptimizeForManifestRead(env_options_));
+        s = fs_->NewSequentialFile(manifest_path,
+                                   env_->OptimizeForManifestRead(file_options_),
+                                   &manifest_file, nullptr);
       } else {
         // No need to switch manifest.
         break;
@@ -4987,7 +5984,8 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
     std::unique_ptr<SequentialFileReader> manifest_file_reader;
     if (s.ok()) {
       manifest_file_reader.reset(
-          new SequentialFileReader(std::move(manifest_file), manifest_path));
+          new SequentialFileReader(std::move(manifest_file), manifest_path,
+                                   db_options_->log_readahead_size));
       manifest_reader->reset(new log::FragmentBufferedReader(
           nullptr, std::move(manifest_file_reader), reporter,
           true /* checksum */, 0 /* log_number */));
@@ -4996,12 +5994,12 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
       // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
       // active_version_builders_ map because we choose to construct the
       // versions from scratch, thanks to the first part of each MANIFEST
-      // written by VersionSet::WriteSnapshot. This is not necessary, but we
-      // choose this at present for the sake of simplicity.
+      // written by VersionSet::WriteCurrentStateToManifest. This is not
+      // necessary, but we choose this at present for the sake of simplicity.
       active_version_builders_.clear();
     }
   } while (s.IsPathNotFound());
   return s;
 }
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE