]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/version_set.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / version_set.cc
index 5ae301a2d4b0dc8a278c607ae19de21bdd5aea80..434d791e74d27ffb10c7a4dd511ba6b8f022c01c 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 #include <inttypes.h>
 #include <stdio.h>
 #include <algorithm>
-#include <climits>
 #include <map>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
+#include <list>
 #include "db/compaction.h"
 #include "db/internal_stats.h"
 #include "db/log_reader.h"
@@ -32,6 +32,7 @@
 #include "db/pinned_iterators_manager.h"
 #include "db/table_cache.h"
 #include "db/version_builder.h"
+#include "monitoring/file_read_sample.h"
 #include "monitoring/perf_context_imp.h"
 #include "rocksdb/env.h"
 #include "rocksdb/merge_operator.h"
@@ -62,20 +63,39 @@ int FindFileInRange(const InternalKeyComparator& icmp,
     const Slice& key,
     uint32_t left,
     uint32_t right) {
-  while (left < right) {
-    uint32_t mid = (left + right) / 2;
-    const FdWithKeyRange& f = file_level.files[mid];
-    if (icmp.InternalKeyComparator::Compare(f.largest_key, key) < 0) {
-      // Key at "mid.largest" is < "target".  Therefore all
-      // files at or before "mid" are uninteresting.
-      left = mid + 1;
-    } else {
-      // Key at "mid.largest" is >= "target".  Therefore all files
-      // after "mid" are uninteresting.
-      right = mid;
+  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
+    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
+  };
+  const auto &b = file_level.files;
+  return static_cast<int>(std::lower_bound(b + left,
+                                           b + right, key, cmp) - b);
+}
+
+Status OverlapWithIterator(const Comparator* ucmp,
+    const Slice& smallest_user_key,
+    const Slice& largest_user_key,
+    InternalIterator* iter,
+    bool* overlap) {
+  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
+                          kValueTypeForSeek);
+  iter->Seek(range_start.Encode());
+  if (!iter->status().ok()) {
+    return iter->status();
+  }
+
+  *overlap = false;
+  if (iter->Valid()) {
+    ParsedInternalKey seek_result;
+    if (!ParseInternalKey(iter->key(), &seek_result)) {
+      return Status::Corruption("DB have corrupted keys");
+    }
+
+    if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
+      *overlap = true;
     }
   }
-  return right;
+
+  return iter->status();
 }
 
 // Class to help choose the next file to search for the particular key.
@@ -102,11 +122,15 @@ class FilePicker {
 #endif
         level_files_brief_(file_levels),
         is_hit_file_last_in_level_(false),
+        curr_file_level_(nullptr),
         user_key_(user_key),
         ikey_(ikey),
         file_indexer_(file_indexer),
         user_comparator_(user_comparator),
         internal_comparator_(internal_comparator) {
+#ifdef NDEBUG
+    (void)files;
+#endif
     // Setup member variables to search first level.
     search_ended_ = !PrepareNextLevel();
     if (!search_ended_) {
@@ -120,7 +144,7 @@ class FilePicker {
     }
   }
 
-  int GetCurrentLevel() { return returned_file_level_; }
+  int GetCurrentLevel() const { return curr_level_; }
 
   FdWithKeyRange* GetNextFile() {
     while (!search_ended_) {  // Loops over different levels.
@@ -134,8 +158,8 @@ class FilePicker {
 
         // Do key range filtering of files or/and fractional cascading if:
         // (1) not all the files are in level 0, or
-        // (2) there are more than 3 Level 0 files
-        // If there are only 3 or less level 0 files in the system, we skip
+        // (2) there are more than 3 current level files
+        // If there are only 3 or less current level files in the system, we skip
         // the key range filtering. In this case, more likely, the system is
         // highly tuned to minimize number of tables queried by each query,
         // so it is unlikely that key range filtering is more efficient than
@@ -327,11 +351,11 @@ Version::~Version() {
       assert(f->refs > 0);
       f->refs--;
       if (f->refs <= 0) {
-        if (f->table_reader_handle) {
-          cfd_->table_cache()->EraseHandle(f->fd, f->table_reader_handle);
-          f->table_reader_handle = nullptr;
-        }
-        vset_->obsolete_files_.push_back(f);
+        assert(cfd_ != nullptr);
+        uint32_t path_id = f->fd.GetPathId();
+        assert(path_id < cfd_->ioptions()->cf_paths.size());
+        vset_->obsolete_files_.push_back(
+            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
       }
     }
   }
@@ -368,6 +392,7 @@ void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
 
     FdWithKeyRange& f = file_level->files[i];
     f.fd = files[i]->fd;
+    f.file_metadata = files[i];
     f.smallest_key = Slice(mem, smallest_size);
     f.largest_key = Slice(mem + smallest_size, largest_size);
   }
@@ -411,9 +436,9 @@ bool SomeFileOverlapsRange(
   // Binary search over file list
   uint32_t index = 0;
   if (smallest_user_key != nullptr) {
-    // Find the earliest possible internal key for smallest_user_key
+    // Find the leftmost possible internal key for smallest_user_key
     InternalKey small;
-    small.SetMaxPossibleForUserKey(*smallest_user_key);
+    small.SetMinPossibleForUserKey(*smallest_user_key);
     index = FindFile(icmp, file_level, small.Encode());
   }
 
@@ -426,118 +451,245 @@ bool SomeFileOverlapsRange(
 }
 
 namespace {
-
-// An internal iterator.  For a given version/level pair, yields
-// information about the files in the level.  For a given entry, key()
-// is the largest key that occurs in the file, and value() is an
-// 16-byte value containing the file number and file size, both
-// encoded using EncodeFixed64.
-class LevelFileNumIterator : public InternalIterator {
+class LevelIterator final : public InternalIterator {
  public:
-  LevelFileNumIterator(const InternalKeyComparator& icmp,
-                       const LevelFilesBrief* flevel)
-      : icmp_(icmp),
+  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
+                const EnvOptions& env_options,
+                const InternalKeyComparator& icomparator,
+                const LevelFilesBrief* flevel,
+                const SliceTransform* prefix_extractor, bool should_sample,
+                HistogramImpl* file_read_hist, bool for_compaction,
+                bool skip_filters, int level, RangeDelAggregator* range_del_agg)
+      : table_cache_(table_cache),
+        read_options_(read_options),
+        env_options_(env_options),
+        icomparator_(icomparator),
         flevel_(flevel),
-        index_(static_cast<uint32_t>(flevel->num_files)),
-        current_value_(0, 0, 0) {  // Marks as invalid
-  }
-  virtual bool Valid() const override { return index_ < flevel_->num_files; }
-  virtual void Seek(const Slice& target) override {
-    index_ = FindFile(icmp_, *flevel_, target);
-  }
-  virtual void SeekForPrev(const Slice& target) override {
-    SeekForPrevImpl(target, &icmp_);
+        prefix_extractor_(prefix_extractor),
+        file_read_hist_(file_read_hist),
+        should_sample_(should_sample),
+        for_compaction_(for_compaction),
+        skip_filters_(skip_filters),
+        file_index_(flevel_->num_files),
+        level_(level),
+        range_del_agg_(range_del_agg),
+        pinned_iters_mgr_(nullptr) {
+    // Empty level is not supported.
+    assert(flevel_ != nullptr && flevel_->num_files > 0);
   }
 
-  virtual void SeekToFirst() override { index_ = 0; }
-  virtual void SeekToLast() override {
-    index_ = (flevel_->num_files == 0)
-                 ? 0
-                 : static_cast<uint32_t>(flevel_->num_files) - 1;
-  }
-  virtual void Next() override {
+  virtual ~LevelIterator() { delete file_iter_.Set(nullptr); }
+
+  virtual void Seek(const Slice& target) override;
+  virtual void SeekForPrev(const Slice& target) override;
+  virtual void SeekToFirst() override;
+  virtual void SeekToLast() override;
+  virtual void Next() override;
+  virtual void Prev() override;
+
+  virtual bool Valid() const override { return file_iter_.Valid(); }
+  virtual Slice key() const override {
     assert(Valid());
-    index_++;
+    return file_iter_.key();
   }
-  virtual void Prev() override {
+  virtual Slice value() const override {
     assert(Valid());
-    if (index_ == 0) {
-      index_ = static_cast<uint32_t>(flevel_->num_files);  // Marks as invalid
-    } else {
-      index_--;
+    return file_iter_.value();
+  }
+  virtual Status status() const override {
+    return file_iter_.iter() ? file_iter_.status() : Status::OK();
+  }
+  virtual void SetPinnedItersMgr(
+      PinnedIteratorsManager* pinned_iters_mgr) override {
+    pinned_iters_mgr_ = pinned_iters_mgr;
+    if (file_iter_.iter()) {
+      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
     }
   }
-  Slice key() const override {
-    assert(Valid());
-    return flevel_->files[index_].largest_key;
+  virtual bool IsKeyPinned() const override {
+    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+           file_iter_.iter() && file_iter_.IsKeyPinned();
   }
-  Slice value() const override {
-    assert(Valid());
-
-    auto file_meta = flevel_->files[index_];
-    current_value_ = file_meta.fd;
-    return Slice(reinterpret_cast<const char*>(&current_value_),
-                 sizeof(FileDescriptor));
+  virtual bool IsValuePinned() const override {
+    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+           file_iter_.iter() && file_iter_.IsValuePinned();
   }
-  virtual Status status() const override { return Status::OK(); }
 
  private:
-  const InternalKeyComparator icmp_;
-  const LevelFilesBrief* flevel_;
-  uint32_t index_;
-  mutable FileDescriptor current_value_;
-};
+  void SkipEmptyFileForward();
+  void SkipEmptyFileBackward();
+  void SetFileIterator(InternalIterator* iter);
+  void InitFileIterator(size_t new_file_index);
 
-class LevelFileIteratorState : public TwoLevelIteratorState {
- public:
-  // @param skip_filters Disables loading/accessing the filter block
-  LevelFileIteratorState(TableCache* table_cache,
-                         const ReadOptions& read_options,
-                         const EnvOptions& env_options,
-                         const InternalKeyComparator& icomparator,
-                         HistogramImpl* file_read_hist, bool for_compaction,
-                         bool prefix_enabled, bool skip_filters, int level,
-                         RangeDelAggregator* range_del_agg)
-      : TwoLevelIteratorState(prefix_enabled),
-        table_cache_(table_cache),
-        read_options_(read_options),
-        env_options_(env_options),
-        icomparator_(icomparator),
-        file_read_hist_(file_read_hist),
-        for_compaction_(for_compaction),
-        skip_filters_(skip_filters),
-        level_(level),
-        range_del_agg_(range_del_agg) {}
+  const Slice& file_smallest_key(size_t file_index) {
+    assert(file_index < flevel_->num_files);
+    return flevel_->files[file_index].smallest_key;
+  }
 
-  InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
-    if (meta_handle.size() != sizeof(FileDescriptor)) {
-      return NewErrorInternalIterator(
-          Status::Corruption("FileReader invoked with unexpected value"));
-    }
-    const FileDescriptor* fd =
-        reinterpret_cast<const FileDescriptor*>(meta_handle.data());
-    return table_cache_->NewIterator(
-        read_options_, env_options_, icomparator_, *fd, range_del_agg_,
-        nullptr /* don't need reference to table */, file_read_hist_,
-        for_compaction_, nullptr /* arena */, skip_filters_, level_);
+  bool KeyReachedUpperBound(const Slice& internal_key) {
+    return read_options_.iterate_upper_bound != nullptr &&
+           icomparator_.user_comparator()->Compare(
+               ExtractUserKey(internal_key),
+               *read_options_.iterate_upper_bound) >= 0;
   }
 
-  bool PrefixMayMatch(const Slice& internal_key) override {
-    return true;
+  InternalIterator* NewFileIterator() {
+    assert(file_index_ < flevel_->num_files);
+    auto file_meta = flevel_->files[file_index_];
+    if (should_sample_) {
+      sample_file_read_inc(file_meta.file_metadata);
+    }
+
+    return table_cache_->NewIterator(
+        read_options_, env_options_, icomparator_, *file_meta.file_metadata,
+        range_del_agg_, prefix_extractor_,
+        nullptr /* don't need reference to table */,
+        file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_,
+        level_);
   }
 
- private:
   TableCache* table_cache_;
   const ReadOptions read_options_;
   const EnvOptions& env_options_;
   const InternalKeyComparator& icomparator_;
+  const LevelFilesBrief* flevel_;
+  mutable FileDescriptor current_value_;
+  const SliceTransform* prefix_extractor_;
+
   HistogramImpl* file_read_hist_;
+  bool should_sample_;
   bool for_compaction_;
   bool skip_filters_;
+  size_t file_index_;
   int level_;
   RangeDelAggregator* range_del_agg_;
+  IteratorWrapper file_iter_;  // May be nullptr
+  PinnedIteratorsManager* pinned_iters_mgr_;
 };
 
+void LevelIterator::Seek(const Slice& target) {
+  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+
+  InitFileIterator(new_file_index);
+  if (file_iter_.iter() != nullptr) {
+    file_iter_.Seek(target);
+  }
+  SkipEmptyFileForward();
+}
+
+void LevelIterator::SeekForPrev(const Slice& target) {
+  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+  if (new_file_index >= flevel_->num_files) {
+    new_file_index = flevel_->num_files - 1;
+  }
+
+  InitFileIterator(new_file_index);
+  if (file_iter_.iter() != nullptr) {
+    file_iter_.SeekForPrev(target);
+    SkipEmptyFileBackward();
+  }
+}
+
+void LevelIterator::SeekToFirst() {
+  InitFileIterator(0);
+  if (file_iter_.iter() != nullptr) {
+    file_iter_.SeekToFirst();
+  }
+  SkipEmptyFileForward();
+}
+
+void LevelIterator::SeekToLast() {
+  InitFileIterator(flevel_->num_files - 1);
+  if (file_iter_.iter() != nullptr) {
+    file_iter_.SeekToLast();
+  }
+  SkipEmptyFileBackward();
+}
+
+void LevelIterator::Next() {
+  assert(Valid());
+  file_iter_.Next();
+  SkipEmptyFileForward();
+}
+
+void LevelIterator::Prev() {
+  assert(Valid());
+  file_iter_.Prev();
+  SkipEmptyFileBackward();
+}
+
+void LevelIterator::SkipEmptyFileForward() {
+  while (file_iter_.iter() == nullptr ||
+         (!file_iter_.Valid() && file_iter_.status().ok() &&
+          !file_iter_.iter()->IsOutOfBound())) {
+    // Move to next file
+    if (file_index_ >= flevel_->num_files - 1) {
+      // Already at the last file
+      SetFileIterator(nullptr);
+      return;
+    }
+    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
+      SetFileIterator(nullptr);
+      return;
+    }
+    InitFileIterator(file_index_ + 1);
+    if (file_iter_.iter() != nullptr) {
+      file_iter_.SeekToFirst();
+    }
+  }
+}
+
+void LevelIterator::SkipEmptyFileBackward() {
+  while (file_iter_.iter() == nullptr ||
+         (!file_iter_.Valid() && file_iter_.status().ok())) {
+    // Move to previous file
+    if (file_index_ == 0) {
+      // Already the first file
+      SetFileIterator(nullptr);
+      return;
+    }
+    InitFileIterator(file_index_ - 1);
+    if (file_iter_.iter() != nullptr) {
+      file_iter_.SeekToLast();
+    }
+  }
+}
+
+void LevelIterator::SetFileIterator(InternalIterator* iter) {
+  if (pinned_iters_mgr_ && iter) {
+    iter->SetPinnedItersMgr(pinned_iters_mgr_);
+  }
+
+  InternalIterator* old_iter = file_iter_.Set(iter);
+  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+    pinned_iters_mgr_->PinIterator(old_iter);
+  } else {
+    delete old_iter;
+  }
+}
+
+void LevelIterator::InitFileIterator(size_t new_file_index) {
+  if (new_file_index >= flevel_->num_files) {
+    file_index_ = new_file_index;
+    SetFileIterator(nullptr);
+    return;
+  } else {
+    // If the file iterator shows incomplete, we try it again if users seek
+    // to the same file, as this time we may go to a different data block
+    // which is cached in block cache.
+    //
+    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
+        new_file_index == file_index_) {
+      // file_iter_ is already constructed with this iterator, so
+      // no need to change anything
+    } else {
+      file_index_ = new_file_index;
+      InternalIterator* iter = NewFileIterator();
+      SetFileIterator(iter);
+    }
+  }
+}
+
 // A wrapper of version builder which references the current version in
 // constructor and unref it in the destructor.
 // Both of the constructor and destructor need to be called inside DB Mutex.
@@ -568,8 +720,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
   auto table_cache = cfd_->table_cache();
   auto ioptions = cfd_->ioptions();
   Status s = table_cache->GetTableProperties(
-      vset_->env_options_, cfd_->internal_comparator(), file_meta->fd,
-      tp, true /* no io */);
+      env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
+      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
   if (s.ok()) {
     return s;
   }
@@ -583,15 +735,15 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
   // 2. Table is not present in table cache, we'll read the table properties
   // directly from the properties block in the file.
   std::unique_ptr<RandomAccessFile> file;
+  std::string file_name;
   if (fname != nullptr) {
-    s = ioptions->env->NewRandomAccessFile(
-        *fname, &file, vset_->env_options_);
+    file_name = *fname;
   } else {
-    s = ioptions->env->NewRandomAccessFile(
-        TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
-                      file_meta->fd.GetPathId()),
-        &file, vset_->env_options_);
+    file_name =
+      TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
+                    file_meta->fd.GetPathId());
   }
+  s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
   if (!s.ok()) {
     return s;
   }
@@ -600,10 +752,11 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
   // By setting the magic number to kInvalidTableMagicNumber, we can by
   // pass the magic number check in the footer.
   std::unique_ptr<RandomAccessFileReader> file_reader(
-      new RandomAccessFileReader(std::move(file)));
+      new RandomAccessFileReader(std::move(file), file_name));
   s = ReadTableProperties(
       file_reader.get(), file_meta->fd.GetFileSize(),
-      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions, &raw_table_properties);
+      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
+      &raw_table_properties, false /* compression_type_missing */);
   if (!s.ok()) {
     return s;
   }
@@ -629,7 +782,7 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                          int level) {
   for (const auto& file_meta : storage_info_.files_[level]) {
     auto fname =
-        TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
+        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                       file_meta->fd.GetPathId());
     // 1. If the table is already present in table cache, load table
     // properties from there.
@@ -657,7 +810,7 @@ Status Version::GetPropertiesOfTablesInRange(
                                          false);
       for (const auto& file_meta : files) {
         auto fname =
-            TableFileName(vset_->db_options_->db_paths,
+            TableFileName(cfd_->ioptions()->cf_paths,
                           file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
         if (props->count(fname) == 0) {
           // 1. If the table is already present in table cache, load table
@@ -703,8 +856,8 @@ size_t Version::GetMemoryUsageByTableReaders() {
   for (auto& file_level : storage_info_.level_files_brief_) {
     for (size_t i = 0; i < file_level.num_files; i++) {
       total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
-          vset_->env_options_, cfd_->internal_comparator(),
-          file_level.files[i].fd);
+          env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
+          mutable_cf_options_.prefix_extractor.get());
     }
   }
   return total_usage;
@@ -729,21 +882,22 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
     for (const auto& file : vstorage->LevelFiles(level)) {
       uint32_t path_id = file->fd.GetPathId();
       std::string file_path;
-      if (path_id < ioptions->db_paths.size()) {
-        file_path = ioptions->db_paths[path_id].path;
+      if (path_id < ioptions->cf_paths.size()) {
+        file_path = ioptions->cf_paths[path_id].path;
       } else {
-        assert(!ioptions->db_paths.empty());
-        file_path = ioptions->db_paths.back().path;
+        assert(!ioptions->cf_paths.empty());
+        file_path = ioptions->cf_paths.back().path;
       }
-      files.emplace_back(
+      files.emplace_back(SstFileMetaData{
           MakeTableFileName("", file->fd.GetNumber()),
           file_path,
-          file->fd.GetFileSize(),
-          file->smallest_seqno,
-          file->largest_seqno,
+          static_cast<size_t>(file->fd.GetFileSize()),
+          file->fd.smallest_seqno,
+          file->fd.largest_seqno,
           file->smallest.user_key().ToString(),
           file->largest.user_key().ToString(),
-          file->being_compacted);
+          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
+          file->being_compacted});
       level_size += file->fd.GetFileSize();
     }
     cf_meta->levels.emplace_back(
@@ -752,6 +906,15 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
   }
 }
 
+uint64_t Version::GetSstFilesSize() {
+  uint64_t sst_files_size = 0;
+  for (int level = 0; level < storage_info_.num_levels_; level++) {
+    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
+      sst_files_size += file_meta->fd.GetFileSize();
+    }
+  }
+  return sst_files_size;
+}
 
 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
   // Estimation will be inaccurate when:
@@ -826,36 +989,97 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
     return;
   }
 
+  bool should_sample = should_sample_file_read();
+
   auto* arena = merge_iter_builder->GetArena();
   if (level == 0) {
     // Merge all level zero files together since they may overlap
     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
       const auto& file = storage_info_.LevelFilesBrief(0).files[i];
       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
-          read_options, soptions, cfd_->internal_comparator(), file.fd,
-          range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
-          false, arena, false /* skip_filters */, 0 /* level */));
+          read_options, soptions, cfd_->internal_comparator(), *file.file_metadata,
+          range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
+          cfd_->internal_stats()->GetFileReadHist(0), false, arena,
+          false /* skip_filters */, 0 /* level */));
     }
-  } else {
+    if (should_sample) {
+      // Count ones for every L0 files. This is done per iterator creation
+      // rather than Seek(), while files in other levels are recored per seek.
+      // If users execute one range query per iterator, there may be some
+      // discrepancy here.
+      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
+        sample_file_read_inc(meta);
+      }
+    }
+  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
     // For levels > 0, we can use a concatenating iterator that sequentially
     // walks through the non-overlapping files in the level, opening them
     // lazily.
-    auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
-    auto* state = new (mem)
-        LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
-                               cfd_->internal_comparator(),
-                               cfd_->internal_stats()->GetFileReadHist(level),
-                               false /* for_compaction */,
-                               cfd_->ioptions()->prefix_extractor != nullptr,
-                               IsFilterSkipped(level), level, range_del_agg);
-    mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
-    auto* first_level_iter = new (mem) LevelFileNumIterator(
-        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
-    merge_iter_builder->AddIterator(
-        NewTwoLevelIterator(state, first_level_iter, arena, false));
+    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
+    merge_iter_builder->AddIterator(new (mem) LevelIterator(
+        cfd_->table_cache(), read_options, soptions,
+        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
+        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
+        cfd_->internal_stats()->GetFileReadHist(level),
+        false /* for_compaction */, IsFilterSkipped(level), level,
+        range_del_agg));
   }
 }
 
+Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
+                                         const EnvOptions& env_options,
+                                         const Slice& smallest_user_key,
+                                         const Slice& largest_user_key,
+                                         int level, bool* overlap) {
+  assert(storage_info_.finalized_);
+
+  auto icmp = cfd_->internal_comparator();
+  auto ucmp = icmp.user_comparator();
+
+  Arena arena;
+  Status status;
+  RangeDelAggregator range_del_agg(icmp, {}, false);
+
+  *overlap = false;
+
+  if (level == 0) {
+    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
+      const auto file = &storage_info_.LevelFilesBrief(0).files[i];
+      if (AfterFile(ucmp, &smallest_user_key, file) ||
+          BeforeFile(ucmp, &largest_user_key, file)) {
+        continue;
+      }
+      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
+          read_options, env_options, cfd_->internal_comparator(), *file->file_metadata,
+          &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
+          cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
+          false /* skip_filters */, 0 /* level */));
+      status = OverlapWithIterator(
+          ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
+      if (!status.ok() || *overlap) {
+        break;
+      }
+    }
+  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
+    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
+    ScopedArenaIterator iter(new (mem) LevelIterator(
+        cfd_->table_cache(), read_options, env_options,
+        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
+        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
+        cfd_->internal_stats()->GetFileReadHist(level),
+        false /* for_compaction */, IsFilterSkipped(level), level,
+        &range_del_agg));
+    status = OverlapWithIterator(
+        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
+  }
+
+  if (status.ok() && *overlap == false &&
+      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
+    *overlap = true;
+  }
+  return status;
+}
+
 VersionStorageInfo::VersionStorageInfo(
     const InternalKeyComparator* internal_comparator,
     const Comparator* user_comparator, int levels,
@@ -897,10 +1121,13 @@ VersionStorageInfo::VersionStorageInfo(
     current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
     current_num_deletions_ = ref_vstorage->current_num_deletions_;
     current_num_samples_ = ref_vstorage->current_num_samples_;
+    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
   }
 }
 
 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
+                 const EnvOptions& env_opt,
+                 const MutableCFOptions mutable_cf_options,
                  uint64_t version_number)
     : env_(vset->env_),
       cfd_(column_family_data),
@@ -924,13 +1151,16 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
       next_(this),
       prev_(this),
       refs_(0),
+      env_options_(env_opt),
+      mutable_cf_options_(mutable_cf_options),
       version_number_(version_number) {}
 
 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                   PinnableSlice* value, Status* status,
                   MergeContext* merge_context,
                   RangeDelAggregator* range_del_agg, bool* value_found,
-                  bool* key_exists, SequenceNumber* seq) {
+                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
+                  bool* is_blob) {
   Slice ikey = k.internal_key();
   Slice user_key = k.user_key();
 
@@ -946,7 +1176,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       user_comparator(), merge_operator_, info_log_, db_statistics_,
       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
       value, value_found, merge_context, range_del_agg, this->env_, seq,
-      merge_operator_ ? &pinned_iters_mgr : nullptr);
+      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
 
   // Pin blocks that we read to hold merge operands
   if (merge_operator_) {
@@ -958,9 +1188,15 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
       user_comparator(), internal_comparator());
   FdWithKeyRange* f = fp.GetNextFile();
+
   while (f != nullptr) {
+    if (get_context.sample()) {
+      sample_file_read_inc(f->file_metadata);
+    }
+
     *status = table_cache_->Get(
-        read_options, *internal_comparator(), f->fd, ikey, &get_context,
+        read_options, *internal_comparator(), *f->file_metadata, ikey,
+        &get_context, mutable_cf_options_.prefix_extractor.get(),
         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                         fp.IsHitFileLastInLevel()),
@@ -970,10 +1206,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       return;
     }
 
+    // report the counters before returning
+    if (get_context.State() != GetContext::kNotFound &&
+        get_context.State() != GetContext::kMerge &&
+        db_statistics_ != nullptr) {
+      get_context.ReportCounters();
+    }
     switch (get_context.State()) {
       case GetContext::kNotFound:
         // Keep searching in other files
         break;
+      case GetContext::kMerge:
+        break;
       case GetContext::kFound:
         if (fp.GetHitFileLevel() == 0) {
           RecordTick(db_statistics_, GET_HIT_L0);
@@ -990,12 +1234,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       case GetContext::kCorrupt:
         *status = Status::Corruption("corrupted key for ", user_key);
         return;
-      case GetContext::kMerge:
-        break;
+      case GetContext::kBlobIndex:
+        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+        *status = Status::NotSupported(
+            "Encounter unexpected blob index. Please open DB with "
+            "rocksdb::blob_db::BlobDB instead.");
+        return;
     }
     f = fp.GetNextFile();
   }
 
+  if (db_statistics_ != nullptr) {
+    get_context.ReportCounters();
+  }
   if (GetContext::kMerge == get_context.State()) {
     if (!merge_operator_) {
       *status =  Status::InvalidArgument(
@@ -1007,7 +1258,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
     std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
     *status = MergeHelper::TimedFullMerge(
         merge_operator_, user_key, nullptr, merge_context->GetOperands(),
-        str_value, info_log_, db_statistics_, env_);
+        str_value, info_log_, db_statistics_, env_,
+        nullptr /* result_operand */, true);
     if (LIKELY(value != nullptr)) {
       value->PinSelf();
     }
@@ -1045,6 +1297,7 @@ void Version::PrepareApply(
   storage_info_.GenerateFileIndexer();
   storage_info_.GenerateLevelFilesBrief();
   storage_info_.GenerateLevel0NonOverlapping();
+  storage_info_.GenerateBottommostFiles();
 }
 
 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
@@ -1119,8 +1372,11 @@ void Version::UpdateAccumulatedStats(bool update_stats) {
           storage_info_.UpdateAccumulatedStats(file_meta);
           // when option "max_open_files" is -1, all the file metadata has
           // already been read, so MaybeInitializeFileMetaData() won't incur
-          // any I/O cost.
-          if (vset_->db_options_->max_open_files == -1) {
+          // any I/O cost. "max_open_files=-1" means that the table cache passed
+          // to the VersionSet and then to the ColumnFamilySet has a size of
+          // TableCache::kInfiniteCapacity
+          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
+              TableCache::kInfiniteCapacity) {
             continue;
           }
           if (++init_count >= kMaxInitCount) {
@@ -1185,6 +1441,14 @@ int VersionStorageInfo::MaxInputLevel() const {
   return 0;
 }
 
+int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
+  if (allow_ingest_behind) {
+    assert(num_levels() > 1);
+    return num_levels() - 2;
+  }
+  return num_levels() - 1;
+}
+
 void VersionStorageInfo::EstimateCompactionBytesNeeded(
     const MutableCFOptions& mutable_cf_options) {
   // Only implemented for level-based compaction
@@ -1204,15 +1468,18 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded(
   // accumulated bytes.
 
   uint64_t bytes_compact_to_next_level = 0;
+  uint64_t level_size = 0;
+  for (auto* f : files_[0]) {
+    level_size += f->fd.GetFileSize();
+  }
   // Level 0
   bool level0_compact_triggered = false;
-  if (static_cast<int>(files_[0].size()) >
-      mutable_cf_options.level0_file_num_compaction_trigger) {
+  if (static_cast<int>(files_[0].size()) >=
+          mutable_cf_options.level0_file_num_compaction_trigger ||
+      level_size >= mutable_cf_options.max_bytes_for_level_base) {
     level0_compact_triggered = true;
-    for (auto* f : files_[0]) {
-      bytes_compact_to_next_level += f->fd.GetFileSize();
-    }
-    estimated_compaction_needed_bytes_ = bytes_compact_to_next_level;
+    estimated_compaction_needed_bytes_ = level_size;
+    bytes_compact_to_next_level = level_size;
   } else {
     estimated_compaction_needed_bytes_ = 0;
   }
@@ -1220,7 +1487,7 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded(
   // Level 1 and up.
   uint64_t bytes_next_level = 0;
   for (int level = base_level(); level <= MaxInputLevel(); level++) {
-    uint64_t level_size = 0;
+    level_size = 0;
     if (bytes_next_level > 0) {
 #ifndef NDEBUG
       uint64_t level_size2 = 0;
@@ -1267,6 +1534,33 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded(
   }
 }
 
+namespace {
+uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
+                                 const MutableCFOptions& mutable_cf_options,
+                                 const std::vector<FileMetaData*>& files) {
+  uint32_t ttl_expired_files_count = 0;
+
+  int64_t _current_time;
+  auto status = ioptions.env->GetCurrentTime(&_current_time);
+  if (status.ok()) {
+    const uint64_t current_time = static_cast<uint64_t>(_current_time);
+    for (auto f : files) {
+      if (!f->being_compacted && f->fd.table_reader != nullptr &&
+          f->fd.table_reader->GetTableProperties() != nullptr) {
+        auto creation_time =
+            f->fd.table_reader->GetTableProperties()->creation_time;
+        if (creation_time > 0 &&
+            creation_time < (current_time -
+                             mutable_cf_options.compaction_options_fifo.ttl)) {
+          ttl_expired_files_count++;
+        }
+      }
+    }
+  }
+  return ttl_expired_files_count;
+}
+}  // anonymous namespace
+
 void VersionStorageInfo::ComputeCompactionScore(
     const ImmutableCFOptions& immutable_cf_options,
     const MutableCFOptions& mutable_cf_options) {
@@ -1304,9 +1598,21 @@ void VersionStorageInfo::ComputeCompactionScore(
       }
 
       if (compaction_style_ == kCompactionStyleFIFO) {
-        score =
-            static_cast<double>(total_size) /
-            immutable_cf_options.compaction_options_fifo.max_table_files_size;
+        score = static_cast<double>(total_size) /
+                mutable_cf_options.compaction_options_fifo.max_table_files_size;
+        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
+          score = std::max(
+              static_cast<double>(num_sorted_runs) /
+                  mutable_cf_options.level0_file_num_compaction_trigger,
+              score);
+        }
+        if (mutable_cf_options.compaction_options_fifo.ttl > 0) {
+          score = std::max(
+              static_cast<double>(GetExpiredTtlFilesCount(
+                  immutable_cf_options, mutable_cf_options, files_[level])),
+              score);
+        }
+
       } else {
         score = static_cast<double>(num_sorted_runs) /
                 mutable_cf_options.level0_file_num_compaction_trigger;
@@ -1349,6 +1655,10 @@ void VersionStorageInfo::ComputeCompactionScore(
     }
   }
   ComputeFilesMarkedForCompaction();
+  ComputeBottommostFilesMarkedForCompaction();
+  if (mutable_cf_options.ttl > 0) {
+    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
+  }
   EstimateCompactionBytesNeeded(mutable_cf_options);
 }
 
@@ -1375,6 +1685,33 @@ void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
   }
 }
 
+void VersionStorageInfo::ComputeExpiredTtlFiles(
+    const ImmutableCFOptions& ioptions, const uint64_t ttl) {
+  assert(ttl > 0);
+
+  expired_ttl_files_.clear();
+
+  int64_t _current_time;
+  auto status = ioptions.env->GetCurrentTime(&_current_time);
+  if (!status.ok()) {
+    return;
+  }
+  const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+  for (int level = 0; level < num_levels() - 1; level++) {
+    for (auto f : files_[level]) {
+      if (!f->being_compacted && f->fd.table_reader != nullptr &&
+          f->fd.table_reader->GetTableProperties() != nullptr) {
+        auto creation_time =
+            f->fd.table_reader->GetTableProperties()->creation_time;
+        if (creation_time > 0 && creation_time < (current_time - ttl)) {
+          expired_ttl_files_.emplace_back(level, f);
+        }
+      }
+    }
+  }
+}
+
 namespace {
 
 // used to sort files by size
@@ -1411,6 +1748,8 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
     }
     assert(false);
   }
+#else
+  (void)info_log;
 #endif
   f->refs++;
   level_files->push_back(f);
@@ -1424,6 +1763,7 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
 // 4. GenerateFileIndexer();
 // 5. GenerateLevelFilesBrief();
 // 6. GenerateLevel0NonOverlapping();
+// 7. GenerateBottommostFiles();
 void VersionStorageInfo::SetFinalized() {
   finalized_ = true;
 #ifndef NDEBUG
@@ -1515,7 +1855,8 @@ void SortFileByOverlappingRatio(
 
 void VersionStorageInfo::UpdateFilesByCompactionPri(
     CompactionPri compaction_pri) {
-  if (compaction_style_ == kCompactionStyleFIFO ||
+  if (compaction_style_ == kCompactionStyleNone ||
+      compaction_style_ == kCompactionStyleFIFO ||
       compaction_style_ == kCompactionStyleUniversal) {
     // don't need this
     return;
@@ -1546,13 +1887,15 @@ void VersionStorageInfo::UpdateFilesByCompactionPri(
       case kOldestLargestSeqFirst:
         std::sort(temp.begin(), temp.end(),
                   [](const Fsize& f1, const Fsize& f2) -> bool {
-                    return f1.file->largest_seqno < f2.file->largest_seqno;
+                    return f1.file->fd.largest_seqno <
+                           f2.file->fd.largest_seqno;
                   });
         break;
       case kOldestSmallestSeqFirst:
         std::sort(temp.begin(), temp.end(),
                   [](const Fsize& f1, const Fsize& f2) -> bool {
-                    return f1.file->smallest_seqno < f2.file->smallest_seqno;
+                    return f1.file->fd.smallest_seqno <
+                           f2.file->fd.smallest_seqno;
                   });
         break;
       case kMinOverlappingRatio:
@@ -1600,6 +1943,58 @@ void VersionStorageInfo::GenerateLevel0NonOverlapping() {
   }
 }
 
+void VersionStorageInfo::GenerateBottommostFiles() {
+  assert(!finalized_);
+  assert(bottommost_files_.empty());
+  for (size_t level = 0; level < level_files_brief_.size(); ++level) {
+    for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
+         ++file_idx) {
+      const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
+      int l0_file_idx;
+      if (level == 0) {
+        l0_file_idx = static_cast<int>(file_idx);
+      } else {
+        l0_file_idx = -1;
+      }
+      if (!RangeMightExistAfterSortedRun(f.smallest_key, f.largest_key,
+                                         static_cast<int>(level),
+                                         l0_file_idx)) {
+        bottommost_files_.emplace_back(static_cast<int>(level),
+                                       f.file_metadata);
+      }
+    }
+  }
+}
+
+void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
+  assert(seqnum >= oldest_snapshot_seqnum_);
+  oldest_snapshot_seqnum_ = seqnum;
+  if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
+    ComputeBottommostFilesMarkedForCompaction();
+  }
+}
+
+void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
+  bottommost_files_marked_for_compaction_.clear();
+  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
+  for (auto& level_and_file : bottommost_files_) {
+    if (!level_and_file.second->being_compacted &&
+        level_and_file.second->fd.largest_seqno != 0 &&
+        level_and_file.second->num_deletions > 1) {
+      // largest_seqno might be nonzero due to containing the final key in an
+      // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
+      // ensures the file really contains deleted or overwritten keys.
+      if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
+        bottommost_files_marked_for_compaction_.push_back(level_and_file);
+      } else {
+        bottommost_files_mark_threshold_ =
+            std::min(bottommost_files_mark_threshold_,
+                     level_and_file.second->fd.largest_seqno);
+      }
+    }
+  }
+}
+
 void Version::Ref() {
   ++refs_;
 }
@@ -1633,13 +2028,29 @@ bool VersionStorageInfo::OverlapInLevel(int level,
 void VersionStorageInfo::GetOverlappingInputs(
     int level, const InternalKey* begin, const InternalKey* end,
     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
-    bool expand_range) const {
+    bool expand_range, InternalKey** next_smallest) const {
   if (level >= num_non_empty_levels_) {
     // this level is empty, no overlapping inputs
     return;
   }
 
   inputs->clear();
+  if (file_index) {
+    *file_index = -1;
+  }
+  const Comparator* user_cmp = user_comparator_;
+  if (level > 0) {
+    GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
+                                          file_index, false, next_smallest);
+    return;
+  }
+
+  if (next_smallest) {
+    // next_smallest key only makes sense for non-level 0, where files are
+    // non-overlapping
+    *next_smallest = nullptr;
+  }
+
   Slice user_begin, user_end;
   if (begin != nullptr) {
     user_begin = begin->user_key();
@@ -1647,43 +2058,52 @@ void VersionStorageInfo::GetOverlappingInputs(
   if (end != nullptr) {
     user_end = end->user_key();
   }
-  if (file_index) {
-    *file_index = -1;
-  }
-  const Comparator* user_cmp = user_comparator_;
-  if (begin != nullptr && end != nullptr && level > 0) {
-    GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
-                                          hint_index, file_index);
-    return;
-  }
 
-  for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
-    FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
-    const Slice file_start = ExtractUserKey(f->smallest_key);
-    const Slice file_limit = ExtractUserKey(f->largest_key);
-    if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
-      // "f" is completely before specified range; skip it
-    } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
-      // "f" is completely after specified range; skip it
-    } else {
-      inputs->push_back(files_[level][i-1]);
-      if (level == 0 && expand_range) {
-        // Level-0 files may overlap each other.  So check if the newly
-        // added file has expanded the range.  If so, restart search.
-        if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
-          user_begin = file_start;
-          inputs->clear();
-          i = 0;
-        } else if (end != nullptr
-            && user_cmp->Compare(file_limit, user_end) > 0) {
-          user_end = file_limit;
-          inputs->clear();
-          i = 0;
+  // index stores the file index need to check.
+  std::list<size_t> index;
+  for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
+    index.emplace_back(i);
+  }
+
+  while (!index.empty()) {
+    bool found_overlapping_file = false;
+    auto iter = index.begin();
+    while (iter != index.end()) {
+      FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
+      const Slice file_start = ExtractUserKey(f->smallest_key);
+      const Slice file_limit = ExtractUserKey(f->largest_key);
+      if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
+        // "f" is completely before specified range; skip it
+        iter++;
+      } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
+        // "f" is completely after specified range; skip it
+        iter++;
+      } else {
+        // if overlap
+        inputs->emplace_back(files_[level][*iter]);
+        found_overlapping_file = true;
+        // record the first file index.
+        if (file_index && *file_index == -1) {
+          *file_index = static_cast<int>(*iter);
+        }
+        // the related file is overlap, erase to avoid checking again.
+        iter = index.erase(iter);
+        if (expand_range) {
+          if (begin != nullptr &&
+              user_cmp->Compare(file_start, user_begin) < 0) {
+            user_begin = file_start;
+          }
+          if (end != nullptr &&
+              user_cmp->Compare(file_limit, user_end) > 0) {
+            user_end = file_limit;
+          }
         }
-      } else if (file_index) {
-        *file_index = static_cast<int>(i) - 1;
       }
     }
+    // if all the files left are not overlap, break
+    if (!found_overlapping_file) {
+      break;
+    }
   }
 }
 
@@ -1696,29 +2116,84 @@ void VersionStorageInfo::GetOverlappingInputs(
 void VersionStorageInfo::GetCleanInputsWithinInterval(
     int level, const InternalKey* begin, const InternalKey* end,
     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
-  if (level >= num_non_empty_levels_) {
+  inputs->clear();
+  if (file_index) {
+    *file_index = -1;
+  }
+  if (level >= num_non_empty_levels_ || level == 0 ||
+      level_files_brief_[level].num_files == 0) {
     // this level is empty, no inputs within range
+    // also don't support clean input interval within L0
     return;
   }
 
-  inputs->clear();
-  Slice user_begin, user_end;
-  if (begin != nullptr) {
-    user_begin = begin->user_key();
+  const auto& level_files = level_files_brief_[level];
+  if (begin == nullptr) {
+    begin = &level_files.files[0].file_metadata->smallest;
   }
-  if (end != nullptr) {
-    user_end = end->user_key();
+  if (end == nullptr) {
+    end = &level_files.files[level_files.num_files - 1].file_metadata->largest;
   }
-  if (file_index) {
-    *file_index = -1;
+
+  GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
+                                        hint_index, file_index,
+                                        true /* within_interval */);
+}
+
+namespace {
+
+const uint64_t kRangeTombstoneSentinel =
+    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
+
+// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
+// null which provides the property that a==null indicates a key that is less
+// than any key and b==null indicates a key that is greater than any key. Note
+// that the comparison is performed primarily on the user-key portion of the
+// key. If the user-keys compare equal, an additional test is made to sort
+// range tombstone sentinel keys before other keys with the same user-key. The
+// result is that 2 user-keys will compare equal if they differ purely on
+// their sequence number and value, but the range tombstone sentinel for that
+// user-key will compare not equal. This is necessary because the range
+// tombstone sentinel key is set as the largest key for an sstable even though
+// that key never appears in the database. We don't want adjacent sstables to
+// be considered overlapping if they are separated by the range tombstone
+// sentinel.
+int sstableKeyCompare(const Comparator* user_cmp,
+                      const InternalKey& a, const InternalKey& b) {
+  auto c = user_cmp->Compare(a.user_key(), b.user_key());
+  if (c != 0) {
+    return c;
+  }
+  auto a_footer = ExtractInternalKeyFooter(a.Encode());
+  auto b_footer = ExtractInternalKeyFooter(b.Encode());
+  if (a_footer == kRangeTombstoneSentinel) {
+    if (b_footer != kRangeTombstoneSentinel) {
+      return -1;
+    }
+  } else if (b_footer == kRangeTombstoneSentinel) {
+    return 1;
   }
-  if (begin != nullptr && end != nullptr && level > 0) {
-    GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
-                                          hint_index, file_index,
-                                          true /* within_interval */);
+  return 0;
+}
+
+int sstableKeyCompare(const Comparator* user_cmp,
+                      const InternalKey* a, const InternalKey& b) {
+  if (a == nullptr) {
+    return -1;
   }
+  return sstableKeyCompare(user_cmp, *a, b);
 }
 
+int sstableKeyCompare(const Comparator* user_cmp,
+                      const InternalKey& a, const InternalKey* b) {
+  if (b == nullptr) {
+    return -1;
+  }
+  return sstableKeyCompare(user_cmp, a, *b);
+}
+
+} // namespace
+
 // Store in "*inputs" all files in "level" that overlap [begin,end]
 // Employ binary search to find at least one file that overlaps the
 // specified range. From that file, iterate backwards and
@@ -1727,15 +2202,15 @@ void VersionStorageInfo::GetCleanInputsWithinInterval(
 // within range [begin, end]. "clean" means there is a boudnary
 // between the files in "*inputs" and the surrounding files
 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
-    int level, const Slice& user_begin, const Slice& user_end,
+    int level, const InternalKey* begin, const InternalKey* end,
     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
-    bool within_interval) const {
+    bool within_interval, InternalKey** next_smallest) const {
   assert(level > 0);
   int min = 0;
   int mid = 0;
   int max = static_cast<int>(files_[level].size()) - 1;
   bool foundOverlap = false;
-  const Comparator* user_cmp = user_comparator_;
+  auto user_cmp = user_comparator_;
 
   // if the caller already knows the index of a file that has overlap,
   // then we can skip the binary search.
@@ -1747,15 +2222,15 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
   while (!foundOverlap && min <= max) {
     mid = (min + max)/2;
     FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
-    const Slice file_start = ExtractUserKey(f->smallest_key);
-    const Slice file_limit = ExtractUserKey(f->largest_key);
-    if ((!within_interval && user_cmp->Compare(file_limit, user_begin) < 0) ||
-        (within_interval && user_cmp->Compare(file_start, user_begin) < 0)) {
+    auto& smallest = f->file_metadata->smallest;
+    auto& largest = f->file_metadata->largest;
+    if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
+        (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
       min = mid + 1;
     } else if ((!within_interval &&
-                user_cmp->Compare(user_end, file_start) < 0) ||
+                sstableKeyCompare(user_cmp, smallest, end) > 0) ||
                (within_interval &&
-                user_cmp->Compare(user_end, file_limit) < 0)) {
+                sstableKeyCompare(user_cmp, largest, end) > 0)) {
       max = mid - 1;
     } else {
       foundOverlap = true;
@@ -1765,6 +2240,9 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
 
   // If there were no overlapping files, return immediately.
   if (!foundOverlap) {
+    if (next_smallest) {
+      next_smallest = nullptr;
+    }
     return;
   }
   // returns the index where an overlap is found
@@ -1774,17 +2252,26 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
 
   int start_index, end_index;
   if (within_interval) {
-    ExtendFileRangeWithinInterval(level, user_begin, user_end, mid, &start_index,
-                                  &end_index);
+    ExtendFileRangeWithinInterval(level, begin, end, mid,
+                                  &start_index, &end_index);
   } else {
-    ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid,
+    ExtendFileRangeOverlappingInterval(level, begin, end, mid,
                                        &start_index, &end_index);
+    assert(end_index >= start_index);
   }
-  assert(end_index >= start_index);
   // insert overlapping files into vector
   for (int i = start_index; i <= end_index; i++) {
     inputs->push_back(files_[level][i]);
   }
+
+  if (next_smallest != nullptr) {
+    // Provide the next key outside the range covered by inputs
+    if (++end_index < static_cast<int>(files_[level].size())) {
+      **next_smallest = files_[level][end_index]->smallest;
+    } else {
+      *next_smallest = nullptr;
+    }
+  }
 }
 
 // Store in *start_index and *end_index the range of all files in
@@ -1794,33 +2281,41 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
 // and forward to find all overlapping files.
 // Use FileLevel in searching, make it faster
 void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
-    int level, const Slice& user_begin, const Slice& user_end,
+    int level, const InternalKey* begin, const InternalKey* end,
     unsigned int mid_index, int* start_index, int* end_index) const {
-  const Comparator* user_cmp = user_comparator_;
+  auto user_cmp = user_comparator_;
   const FdWithKeyRange* files = level_files_brief_[level].files;
 #ifndef NDEBUG
   {
     // assert that the file at mid_index overlaps with the range
     assert(mid_index < level_files_brief_[level].num_files);
     const FdWithKeyRange* f = &files[mid_index];
-    const Slice fstart = ExtractUserKey(f->smallest_key);
-    const Slice flimit = ExtractUserKey(f->largest_key);
-    if (user_cmp->Compare(fstart, user_begin) >= 0) {
-      assert(user_cmp->Compare(fstart, user_end) <= 0);
+    auto& smallest = f->file_metadata->smallest;
+    auto& largest = f->file_metadata->largest;
+    if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
+      assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
     } else {
-      assert(user_cmp->Compare(flimit, user_begin) >= 0);
+      // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
+      //         begin ? begin->DebugString().c_str() : "(null)",
+      //         end ? end->DebugString().c_str() : "(null)",
+      //         smallest->DebugString().c_str(),
+      //         largest->DebugString().c_str(),
+      //         sstableKeyCompare(user_cmp, smallest, begin),
+      //         sstableKeyCompare(user_cmp, largest, begin));
+      assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
     }
   }
 #endif
   *start_index = mid_index + 1;
   *end_index = mid_index;
-  int count __attribute__((unused)) = 0;
+  int count __attribute__((__unused__));
+  count = 0;
 
   // check backwards from 'mid' to lower indices
   for (int i = mid_index; i >= 0 ; i--) {
     const FdWithKeyRange* f = &files[i];
-    const Slice file_limit = ExtractUserKey(f->largest_key);
-    if (user_cmp->Compare(file_limit, user_begin) >= 0) {
+    auto& largest = f->file_metadata->largest;
+    if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
       *start_index = i;
       assert((count++, true));
     } else {
@@ -1831,8 +2326,8 @@ void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
   for (unsigned int i = mid_index+1;
        i < level_files_brief_[level].num_files; i++) {
     const FdWithKeyRange* f = &files[i];
-    const Slice file_start = ExtractUserKey(f->smallest_key);
-    if (user_cmp->Compare(file_start, user_end) <= 0) {
+    auto& smallest = f->file_metadata->smallest;
+    if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
       assert((count++, true));
       *end_index = i;
     } else {
@@ -1850,39 +2345,36 @@ void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
 // the clean range required.
 // Use FileLevel in searching, make it faster
 void VersionStorageInfo::ExtendFileRangeWithinInterval(
-    int level, const Slice& user_begin, const Slice& user_end,
+    int level, const InternalKey* begin, const InternalKey* end,
     unsigned int mid_index, int* start_index, int* end_index) const {
   assert(level != 0);
-  const Comparator* user_cmp = user_comparator_;
+  auto* user_cmp = user_comparator_;
   const FdWithKeyRange* files = level_files_brief_[level].files;
 #ifndef NDEBUG
   {
     // assert that the file at mid_index is within the range
     assert(mid_index < level_files_brief_[level].num_files);
     const FdWithKeyRange* f = &files[mid_index];
-    const Slice fstart = ExtractUserKey(f->smallest_key);
-    const Slice flimit = ExtractUserKey(f->largest_key);
-    assert(user_cmp->Compare(fstart, user_begin) >= 0 &&
-           user_cmp->Compare(flimit, user_end) <= 0);
+    auto& smallest = f->file_metadata->smallest;
+    auto& largest = f->file_metadata->largest;
+    assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
+           sstableKeyCompare(user_cmp, largest, end) <= 0);
   }
 #endif
-  ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid_index,
+  ExtendFileRangeOverlappingInterval(level, begin, end, mid_index,
                                      start_index, end_index);
   int left = *start_index;
   int right = *end_index;
   // shrink from left to right
   while (left <= right) {
-    const Slice& first_key_in_range = ExtractUserKey(files[left].smallest_key);
-    if (user_cmp->Compare(first_key_in_range, user_begin) < 0) {
+    auto& smallest = files[left].file_metadata->smallest;
+    if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
       left++;
       continue;
     }
     if (left > 0) {  // If not first file
-      const Slice& last_key_before =
-          ExtractUserKey(files[left - 1].largest_key);
-      if (user_cmp->Equal(first_key_in_range, last_key_before)) {
-        // The first user key in range overlaps with the previous file's last
-        // key
+      auto& largest = files[left - 1].file_metadata->largest;
+      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
         left++;
         continue;
       }
@@ -1891,16 +2383,15 @@ void VersionStorageInfo::ExtendFileRangeWithinInterval(
   }
   // shrink from right to left
   while (left <= right) {
-    const Slice last_key_in_range = ExtractUserKey(files[right].largest_key);
-    if (user_cmp->Compare(last_key_in_range, user_end) > 0) {
+    auto& largest = files[right].file_metadata->largest;
+    if (sstableKeyCompare(user_cmp, largest, end) > 0) {
       right--;
       continue;
     }
     if (right < static_cast<int>(level_files_brief_[level].num_files) -
                     1) {  // If not the last file
-      const Slice first_key_after =
-          ExtractUserKey(files[right + 1].smallest_key);
-      if (user_cmp->Equal(last_key_in_range, first_key_after)) {
+      auto& smallest = files[right + 1].file_metadata->smallest;
+      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
         // The last user key in range overlaps with the next file's first key
         right--;
         continue;
@@ -1961,7 +2452,7 @@ const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
     AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
     int ret = snprintf(scratch->buffer + len, sz,
                        "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
-                       f->fd.GetNumber(), f->smallest_seqno, sztxt,
+                       f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
                        static_cast<int>(f->being_compacted));
     if (ret < 0 || ret >= sz)
       break;
@@ -2108,7 +2599,11 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
           level_size = MultiplyCheckOverflow(
               level_size, options.max_bytes_for_level_multiplier);
         }
-        level_max_bytes_[i] = level_size;
+        // Don't set any level below base_bytes_max. Otherwise, the LSM can
+        // assume an hourglass shape where L1+ sizes are smaller than L0. This
+        // causes compaction scoring, which depends on level sizes, to favor L1+
+        // at the expense of L0, which may fill up and stall.
+        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
       }
     }
   }
@@ -2148,6 +2643,36 @@ uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
   return size;
 }
 
+bool VersionStorageInfo::RangeMightExistAfterSortedRun(
+    const Slice& smallest_key, const Slice& largest_key, int last_level,
+    int last_l0_idx) {
+  assert((last_l0_idx != -1) == (last_level == 0));
+  // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
+  // bottommost only if it's the oldest L0 file and there are no files on older
+  // levels. It'd be better to consider it bottommost if there's no overlap in
+  // older levels/files.
+  if (last_level == 0 &&
+      last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
+    return true;
+  }
+
+  // Checks whether there are files living beyond the `last_level`. If lower
+  // levels have files, it checks for overlap between [`smallest_key`,
+  // `largest_key`] and those files. Bottomlevel optimizations can be made if
+  // there are no files in lower levels or if there is no overlap with the files
+  // in the lower levels.
+  for (int level = last_level + 1; level < num_levels(); level++) {
+    // The range is not in the bottommost level if there are files in lower
+    // levels when the `last_level` is 0 or if there are files in lower levels
+    // which overlap with [`smallest_key`, `largest_key`].
+    if (files_[level].size() > 0 &&
+        (last_level == 0 ||
+         OverlapInLevel(level, &smallest_key, &largest_key))) {
+      return true;
+    }
+  }
+  return false;
+}
 
 void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
   for (int level = 0; level < storage_info_.num_levels(); level++) {
@@ -2158,13 +2683,16 @@ void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
   }
 }
 
-std::string Version::DebugString(bool hex) const {
+std::string Version::DebugString(bool hex, bool print_stats) const {
   std::string r;
   for (int level = 0; level < storage_info_.num_levels_; level++) {
     // E.g.,
     //   --- level 1 ---
     //   17:123['a' .. 'd']
     //   20:43['e' .. 'g']
+    //
+    // if print_stats=true:
+    //   17:123['a' .. 'd'](4096)
     r.append("--- level ");
     AppendNumberTo(&r, level);
     r.append(" --- version# ");
@@ -2180,7 +2708,14 @@ std::string Version::DebugString(bool hex) const {
       r.append(files[i]->smallest.DebugString(hex));
       r.append(" .. ");
       r.append(files[i]->largest.DebugString(hex));
-      r.append("]\n");
+      r.append("]");
+      if (print_stats) {
+        r.append("(");
+        r.append(ToString(
+            files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
+        r.append(")");
+      }
+      r.append("\n");
     }
   }
   return r;
@@ -2192,34 +2727,41 @@ struct VersionSet::ManifestWriter {
   bool done;
   InstrumentedCondVar cv;
   ColumnFamilyData* cfd;
+  const MutableCFOptions mutable_cf_options;
   const autovector<VersionEdit*>& edit_list;
 
   explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
+                          const MutableCFOptions& cf_options,
                           const autovector<VersionEdit*>& e)
-      : done(false), cv(mu), cfd(_cfd), edit_list(e) {}
+      : done(false),
+        cv(mu),
+        cfd(_cfd),
+        mutable_cf_options(cf_options),
+        edit_list(e) {}
 };
 
 VersionSet::VersionSet(const std::string& dbname,
-                       const ImmutableDBOptions* db_options,
+                       const ImmutableDBOptions* _db_options,
                        const EnvOptions& storage_options, Cache* table_cache,
                        WriteBufferManager* write_buffer_manager,
                        WriteController* write_controller)
     : column_family_set_(
-          new ColumnFamilySet(dbname, db_options, storage_options, table_cache,
+          new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
                               write_buffer_manager, write_controller)),
-      env_(db_options->env),
+      env_(_db_options->env),
       dbname_(dbname),
-      db_options_(db_options),
+      db_options_(_db_options),
       next_file_number_(2),
       manifest_file_number_(0),  // Filled by Recover()
+      options_file_number_(0),
       pending_manifest_file_number_(0),
       last_sequence_(0),
+      last_allocated_sequence_(0),
+      last_published_sequence_(0),
       prev_log_number_(0),
       current_version_number_(0),
       manifest_file_size_(0),
-      env_options_(storage_options),
-      env_options_compactions_(
-          env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {}
+      env_options_(storage_options) {}
 
 void CloseTables(void* ptr, size_t) {
   TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
@@ -2229,11 +2771,15 @@ void CloseTables(void* ptr, size_t) {
 VersionSet::~VersionSet() {
   // we need to delete column_family_set_ because its destructor depends on
   // VersionSet
-  column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables,
-                                                                false);
+  Cache* table_cache = column_family_set_->get_table_cache();
+  table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
   column_family_set_.reset();
-  for (auto file : obsolete_files_) {
-    delete file;
+  for (auto& file : obsolete_files_) {
+    if (file.metadata->table_reader_handle) {
+      table_cache->Release(file.metadata->table_reader_handle);
+      TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
+    }
+    file.DeleteMetadata();
   }
   obsolete_files_.clear();
 }
@@ -2266,89 +2812,79 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
   v->next_->prev_ = v;
 }
 
-Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
-                               const MutableCFOptions& mutable_cf_options,
-                               const autovector<VersionEdit*>& edit_list,
-                               InstrumentedMutex* mu, Directory* db_directory,
-                               bool new_descriptor_log,
-                               const ColumnFamilyOptions* new_cf_options) {
-  mu->AssertHeld();
-  // num of edits
-  auto num_edits = edit_list.size();
-  if (num_edits == 0) {
-    return Status::OK();
-  } else if (num_edits > 1) {
-#ifndef NDEBUG
-    // no group commits for column family add or drop
-    for (auto& edit : edit_list) {
-      assert(!edit->IsColumnFamilyManipulation());
-    }
-#endif
-  }
-
-  // column_family_data can be nullptr only if this is column_family_add.
-  // in that case, we also need to specify ColumnFamilyOptions
-  if (column_family_data == nullptr) {
-    assert(num_edits == 1);
-    assert(edit_list[0]->is_column_family_add_);
-    assert(new_cf_options != nullptr);
-  }
+Status VersionSet::ProcessManifestWrites(
+    std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
+    Directory* db_directory, bool new_descriptor_log,
+    const ColumnFamilyOptions* new_cf_options) {
+  assert(!writers.empty());
+  ManifestWriter& first_writer = writers.front();
+  ManifestWriter* last_writer = &first_writer;
 
-  // queue our request
-  ManifestWriter w(mu, column_family_data, edit_list);
-  manifest_writers_.push_back(&w);
-  while (!w.done && &w != manifest_writers_.front()) {
-    w.cv.Wait();
-  }
-  if (w.done) {
-    return w.status;
-  }
-  if (column_family_data != nullptr && column_family_data->IsDropped()) {
-    // if column family is dropped by the time we get here, no need to write
-    // anything to the manifest
-    manifest_writers_.pop_front();
-    // Notify new head of write queue
-    if (!manifest_writers_.empty()) {
-      manifest_writers_.front()->cv.Signal();
-    }
-    // we steal this code to also inform about cf-drop
-    return Status::ShutdownInProgress();
-  }
+  assert(!manifest_writers_.empty());
+  assert(manifest_writers_.front() == &first_writer);
 
   autovector<VersionEdit*> batch_edits;
-  Version* v = nullptr;
-  std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr);
-
-  // process all requests in the queue
-  ManifestWriter* last_writer = &w;
-  assert(!manifest_writers_.empty());
-  assert(manifest_writers_.front() == &w);
-  if (w.edit_list.front()->IsColumnFamilyManipulation()) {
-    // no group commits for column family add or drop
-    LogAndApplyCFHelper(w.edit_list.front());
-    batch_edits.push_back(w.edit_list.front());
+  autovector<Version*> versions;
+  autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
+  std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
+
+  if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
+    // No group commits for column family add or drop
+    LogAndApplyCFHelper(first_writer.edit_list.front());
+    batch_edits.push_back(first_writer.edit_list.front());
   } else {
-    v = new Version(column_family_data, this, current_version_number_++);
-    builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
-    auto* builder = builder_guard->version_builder();
-    for (const auto& writer : manifest_writers_) {
-      if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
-          writer->cfd->GetID() != column_family_data->GetID()) {
+    auto it = manifest_writers_.cbegin();
+    while (it != manifest_writers_.cend()) {
+      if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
         // no group commits for column family add or drop
-        // also, group commits across column families are not supported
         break;
       }
-      last_writer = writer;
-      for (const auto& edit : writer->edit_list) {
-        LogAndApplyHelper(column_family_data, builder, v, edit, mu);
-        batch_edits.push_back(edit);
+      last_writer = *(it++);
+      assert(last_writer != nullptr);
+      assert(last_writer->cfd != nullptr);
+      if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) {
+        continue;
+      }
+      // We do a linear search on versions because versions is small.
+      // TODO(yanqin) maybe consider unordered_map
+      Version* version = nullptr;
+      VersionBuilder* builder = nullptr;
+      for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
+        uint32_t cf_id = last_writer->cfd->GetID();
+        if (versions[i]->cfd()->GetID() == cf_id) {
+          version = versions[i];
+          assert(!builder_guards.empty() &&
+                 builder_guards.size() == versions.size());
+          builder = builder_guards[i]->version_builder();
+          TEST_SYNC_POINT_CALLBACK(
+              "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
+          break;
+        }
+      }
+      if (version == nullptr) {
+        version = new Version(last_writer->cfd, this, env_options_,
+                              last_writer->mutable_cf_options,
+                              current_version_number_++);
+        versions.push_back(version);
+        mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
+        builder_guards.emplace_back(
+            new BaseReferencedVersionBuilder(last_writer->cfd));
+        builder = builder_guards.back()->version_builder();
+      }
+      assert(builder != nullptr);  // make checker happy
+      for (const auto& e : last_writer->edit_list) {
+        LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
+        batch_edits.push_back(e);
       }
     }
-    builder->SaveTo(v->storage_info());
+    for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+      assert(!builder_guards.empty() &&
+             builder_guards.size() == versions.size());
+      auto* builder = builder_guards[i]->version_builder();
+      builder->SaveTo(versions[i]->storage_info());
+    }
   }
 
-  // Initialize new descriptor log file if necessary by creating
-  // a temporary file that contains a snapshot of the current version.
   uint64_t new_manifest_file_size = 0;
   Status s;
 
@@ -2363,65 +2899,71 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
   }
 
   if (new_descriptor_log) {
-    // if we're writing out new snapshot make sure to persist max column family
+    // if we are writing out new snapshot make sure to persist max column
+    // family.
     if (column_family_set_->GetMaxColumnFamily() > 0) {
-      w.edit_list.front()->SetMaxColumnFamily(
+      first_writer.edit_list.front()->SetMaxColumnFamily(
           column_family_set_->GetMaxColumnFamily());
     }
   }
 
-  // Unlock during expensive operations. New writes cannot get here
-  // because &w is ensuring that all new writes get queued.
   {
-
+    EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
     mu->Unlock();
 
     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
-    if (!w.edit_list.front()->IsColumnFamilyManipulation() &&
-        db_options_->max_open_files == -1) {
-      // unlimited table cache. Pre-load table handle now.
-      // Need to do it out of the mutex.
-      builder_guard->version_builder()->LoadTableHandlers(
-          column_family_data->internal_stats(),
-          column_family_data->ioptions()->optimize_filters_for_hits,
-          true /* prefetch_index_and_filter_in_cache */);
+    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() &&
+        column_family_set_->get_table_cache()->GetCapacity() ==
+            TableCache::kInfiniteCapacity) {
+      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+        assert(!builder_guards.empty() &&
+               builder_guards.size() == versions.size());
+        assert(!mutable_cf_options_ptrs.empty() &&
+               builder_guards.size() == versions.size());
+        ColumnFamilyData* cfd = versions[i]->cfd_;
+        builder_guards[i]->version_builder()->LoadTableHandlers(
+            cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
+            true /* prefetch_index_and_filter_in_cache */,
+            mutable_cf_options_ptrs[i]->prefix_extractor.get());
+      }
     }
 
     // This is fine because everything inside of this block is serialized --
     // only one thread can be here at the same time
     if (new_descriptor_log) {
-      // create manifest file
+      // create new manifest file
       ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                      pending_manifest_file_number_);
+      std::string descriptor_fname =
+          DescriptorFileName(dbname_, pending_manifest_file_number_);
       unique_ptr<WritableFile> descriptor_file;
-      EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
-      s = NewWritableFile(
-          env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
-          &descriptor_file, opt_env_opts);
+      s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
+                          opt_env_opts);
       if (s.ok()) {
         descriptor_file->SetPreallocationBlockSize(
             db_options_->manifest_preallocation_size);
 
-        unique_ptr<WritableFileWriter> file_writer(
-            new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
+        unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+            std::move(descriptor_file), descriptor_fname, opt_env_opts));
         descriptor_log_.reset(
             new log::Writer(std::move(file_writer), 0, false));
         s = WriteSnapshot(descriptor_log_.get());
       }
     }
 
-    if (!w.edit_list.front()->IsColumnFamilyManipulation()) {
-      // This is cpu-heavy operations, which should be called outside mutex.
-      v->PrepareApply(mutable_cf_options, true);
+    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
+      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+        versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
+      }
     }
 
-    // Write new record to MANIFEST log
+    // Write new records to MANIFEST log
     if (s.ok()) {
       for (auto& e : batch_edits) {
         std::string record;
         if (!e->EncodeTo(&record)) {
-          s = Status::Corruption(
-              "Unable to Encode VersionEdit:" + e->DebugString(true));
+          s = Status::Corruption("Unable to encode VersionEdit:" +
+                                 e->DebugString(true));
           break;
         }
         TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
@@ -2435,7 +2977,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
         s = SyncManifest(env_, db_options_, descriptor_log_->file());
       }
       if (!s.ok()) {
-        ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write: %s\n",
+        ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
                         s.ToString().c_str());
       }
     }
@@ -2452,7 +2994,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
       new_manifest_file_size = descriptor_log_->file()->GetFileSize();
     }
 
-    if (w.edit_list.front()->is_column_family_drop_) {
+    if (first_writer.edit_list.front()->is_column_family_drop_) {
       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
@@ -2463,88 +3005,207 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
     mu->Lock();
   }
 
-  // Append the old mainfest file to the obsolete_manifests_ list to be deleted
+  // Append the old manifest file to the obsolete_manifest_ list to be deleted
   // by PurgeObsoleteFiles later.
   if (s.ok() && new_descriptor_log) {
     obsolete_manifests_.emplace_back(
         DescriptorFileName("", manifest_file_number_));
   }
 
-  // Install the new version
+  // Install the new versions
   if (s.ok()) {
-    if (w.edit_list.front()->is_column_family_add_) {
-      // no group commit on column family add
+    if (first_writer.edit_list.front()->is_column_family_add_) {
       assert(batch_edits.size() == 1);
       assert(new_cf_options != nullptr);
-      CreateColumnFamily(*new_cf_options, w.edit_list.front());
-    } else if (w.edit_list.front()->is_column_family_drop_) {
+      CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
+    } else if (first_writer.edit_list.front()->is_column_family_drop_) {
       assert(batch_edits.size() == 1);
-      column_family_data->SetDropped();
-      if (column_family_data->Unref()) {
-        delete column_family_data;
+      first_writer.cfd->SetDropped();
+      if (first_writer.cfd->Unref()) {
+        delete first_writer.cfd;
       }
     } else {
-      uint64_t max_log_number_in_batch  = 0;
+      // Each version in versions corresponds to a column family.
+      // For each column family, update its log number indicating that logs
+      // with number smaller than this should be ignored.
+      for (const auto version : versions) {
+        uint64_t max_log_number_in_batch = 0;
+        uint32_t cf_id = version->cfd_->GetID();
+        for (const auto& e : batch_edits) {
+          if (e->has_log_number_ && e->column_family_ == cf_id) {
+            max_log_number_in_batch =
+                std::max(max_log_number_in_batch, e->log_number_);
+          }
+        }
+        if (max_log_number_in_batch != 0) {
+          assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
+          version->cfd_->SetLogNumber(max_log_number_in_batch);
+        }
+      }
+
+      uint64_t last_min_log_number_to_keep = 0;
       for (auto& e : batch_edits) {
-        if (e->has_log_number_) {
-          max_log_number_in_batch =
-              std::max(max_log_number_in_batch, e->log_number_);
+        if (e->has_min_log_number_to_keep_) {
+          last_min_log_number_to_keep =
+              std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
         }
       }
-      if (max_log_number_in_batch != 0) {
-        assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
-        column_family_data->SetLogNumber(max_log_number_in_batch);
+
+      if (last_min_log_number_to_keep != 0) {
+        // Should only be set in 2PC mode.
+        MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
       }
-      AppendVersion(column_family_data, v);
-    }
 
+      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+        ColumnFamilyData* cfd = versions[i]->cfd_;
+        AppendVersion(cfd, versions[i]);
+      }
+    }
     manifest_file_number_ = pending_manifest_file_number_;
     manifest_file_size_ = new_manifest_file_size;
-    prev_log_number_ = w.edit_list.front()->prev_log_number_;
+    prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
   } else {
     std::string version_edits;
     for (auto& e : batch_edits) {
-      version_edits = version_edits + "\n" + e->DebugString(true);
+      version_edits += ("\n" + e->DebugString(true));
+    }
+    ROCKS_LOG_ERROR(db_options_->info_log,
+                    "Error in committing version edit to MANIFEST: %s",
+                    version_edits.c_str());
+    for (auto v : versions) {
+      delete v;
     }
-    ROCKS_LOG_ERROR(
-        db_options_->info_log,
-        "[%s] Error in committing version edit to MANIFEST: %s",
-        column_family_data ? column_family_data->GetName().c_str() : "<null>",
-        version_edits.c_str());
-    delete v;
     if (new_descriptor_log) {
-      ROCKS_LOG_INFO(db_options_->info_log, "Deleting manifest %" PRIu64
-                                            " current manifest %" PRIu64 "\n",
+      ROCKS_LOG_INFO(db_options_->info_log,
+                     "Deleting manifest %" PRIu64 " current manifest %" PRIu64
+                     "\n",
                      manifest_file_number_, pending_manifest_file_number_);
       descriptor_log_.reset();
       env_->DeleteFile(
           DescriptorFileName(dbname_, pending_manifest_file_number_));
     }
   }
+
   pending_manifest_file_number_ = 0;
 
   // wake up all the waiting writers
   while (true) {
     ManifestWriter* ready = manifest_writers_.front();
     manifest_writers_.pop_front();
-    if (ready != &w) {
-      ready->status = s;
-      ready->done = true;
+    bool need_signal = true;
+    for (const auto& w : writers) {
+      if (&w == ready) {
+        need_signal = false;
+        break;
+      }
+    }
+    ready->status = s;
+    ready->done = true;
+    if (need_signal) {
       ready->cv.Signal();
     }
-    if (ready == last_writer) break;
+    if (ready == last_writer) {
+      break;
+    }
   }
-  // Notify new head of write queue
   if (!manifest_writers_.empty()) {
     manifest_writers_.front()->cv.Signal();
   }
   return s;
 }
 
+// 'datas' is gramatically incorrect. We still use this notation is to indicate
+// that this variable represents a collection of column_family_data.
+Status VersionSet::LogAndApply(
+    const std::vector<ColumnFamilyData*>& column_family_datas,
+    const std::vector<MutableCFOptions>& mutable_cf_options_list,
+    const std::vector<autovector<VersionEdit*>>& edit_lists,
+    InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
+    const ColumnFamilyOptions* new_cf_options) {
+  mu->AssertHeld();
+  int num_edits = 0;
+  for (const auto& elist : edit_lists) {
+    num_edits += static_cast<int>(elist.size());
+  }
+  if (num_edits == 0) {
+    return Status::OK();
+  } else if (num_edits > 1) {
+#ifndef NDEBUG
+    for (const auto& edit_list : edit_lists) {
+      for (const auto& edit : edit_list) {
+        assert(!edit->IsColumnFamilyManipulation());
+      }
+    }
+#endif /* ! NDEBUG */
+  }
+
+  int num_cfds = static_cast<int>(column_family_datas.size());
+  if (num_cfds == 1 && column_family_datas[0] == nullptr) {
+    assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
+    assert(edit_lists[0][0]->is_column_family_add_);
+    assert(new_cf_options != nullptr);
+  }
+  std::deque<ManifestWriter> writers;
+  if (num_cfds > 0) {
+    assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
+    assert(static_cast<size_t>(num_cfds) == edit_lists.size());
+  }
+  for (int i = 0; i < num_cfds; ++i) {
+    writers.emplace_back(mu, column_family_datas[i], mutable_cf_options_list[i],
+                         edit_lists[i]);
+    manifest_writers_.push_back(&writers[i]);
+  }
+  assert(!writers.empty());
+  ManifestWriter& first_writer = writers.front();
+  while (!first_writer.done && &first_writer != manifest_writers_.front()) {
+    first_writer.cv.Wait();
+  }
+  if (first_writer.done) {
+    // All non-CF-manipulation operations can be grouped together and committed
+    // to MANIFEST. They should all have finished. The status code is stored in
+    // the first manifest writer.
+#ifndef NDEBUG
+    for (const auto& writer : writers) {
+      assert(writer.done);
+    }
+#endif /* !NDEBUG */
+    return first_writer.status;
+  }
+
+  int num_undropped_cfds = 0;
+  for (auto cfd : column_family_datas) {
+    // if cfd == nullptr, it is a column family add.
+    if (cfd == nullptr || !cfd->IsDropped()) {
+      ++num_undropped_cfds;
+    }
+  }
+  if (0 == num_undropped_cfds) {
+    // TODO (yanqin) maybe use a different status code to denote column family
+    // drop other than OK and ShutdownInProgress
+    for (int i = 0; i != num_cfds; ++i) {
+      manifest_writers_.pop_front();
+    }
+    // Notify new head of manifest write queue.
+    if (!manifest_writers_.empty()) {
+      manifest_writers_.front()->cv.Signal();
+    }
+    return Status::OK();
+  }
+
+  return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
+                               new_cf_options);
+}
+
 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
   assert(edit->IsColumnFamilyManipulation());
   edit->SetNextFile(next_file_number_.load());
-  edit->SetLastSequence(last_sequence_);
+  // The log might have data that is not visible to memtbale and hence have not
+  // updated the last_sequence_ yet. It is also possible that the log has is
+  // expecting some new data that is not written yet. Since LastSequence is an
+  // upper bound on the sequence, it is ok to record
+  // last_allocated_sequence_ as the last sequence.
+  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
+                                                      : last_sequence_);
   if (edit->is_column_family_drop_) {
     // if we drop column family, we have to make sure to save max column family,
     // so that we don't reuse existing ID
@@ -2553,8 +3214,11 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
 }
 
 void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
-                                   VersionBuilder* builder, Version* v,
+                                   VersionBuilder* builder, Version* /*v*/,
                                    VersionEdit* edit, InstrumentedMutex* mu) {
+#ifdef NDEBUG
+  (void)cfd;
+#endif
   mu->AssertHeld();
   assert(!edit->IsColumnFamilyManipulation());
 
@@ -2567,11 +3231,144 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
     edit->SetPrevLogNumber(prev_log_number_);
   }
   edit->SetNextFile(next_file_number_.load());
-  edit->SetLastSequence(last_sequence_);
+  // The log might have data that is not visible to memtbale and hence have not
+  // updated the last_sequence_ yet. It is also possible that the log has is
+  // expecting some new data that is not written yet. Since LastSequence is an
+  // upper bound on the sequence, it is ok to record
+  // last_allocated_sequence_ as the last sequence.
+  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
+                                                      : last_sequence_);
 
   builder->Apply(edit);
 }
 
+Status VersionSet::ApplyOneVersionEdit(
+    VersionEdit& edit,
+    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
+    std::unordered_map<int, std::string>& column_families_not_found,
+    std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
+    bool* have_log_number, uint64_t* /* log_number */,
+    bool* have_prev_log_number, uint64_t* previous_log_number,
+    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
+    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
+    uint32_t* max_column_family) {
+  // Not found means that user didn't supply that column
+  // family option AND we encountered column family add
+  // record. Once we encounter column family drop record,
+  // we will delete the column family from
+  // column_families_not_found.
+  bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
+                          column_families_not_found.end());
+  // in builders means that user supplied that column family
+  // option AND that we encountered column family add record
+  bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
+
+  // they can't both be true
+  assert(!(cf_in_not_found && cf_in_builders));
+
+  ColumnFamilyData* cfd = nullptr;
+
+  if (edit.is_column_family_add_) {
+    if (cf_in_builders || cf_in_not_found) {
+      return Status::Corruption(
+          "Manifest adding the same column family twice: " +
+          edit.column_family_name_);
+    }
+    auto cf_options = name_to_options.find(edit.column_family_name_);
+    if (cf_options == name_to_options.end()) {
+      column_families_not_found.insert(
+          {edit.column_family_, edit.column_family_name_});
+    } else {
+      cfd = CreateColumnFamily(cf_options->second, &edit);
+      cfd->set_initialized();
+      builders.insert(
+          {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
+    }
+  } else if (edit.is_column_family_drop_) {
+    if (cf_in_builders) {
+      auto builder = builders.find(edit.column_family_);
+      assert(builder != builders.end());
+      delete builder->second;
+      builders.erase(builder);
+      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+      assert(cfd != nullptr);
+      if (cfd->Unref()) {
+        delete cfd;
+        cfd = nullptr;
+      } else {
+        // who else can have reference to cfd!?
+        assert(false);
+      }
+    } else if (cf_in_not_found) {
+      column_families_not_found.erase(edit.column_family_);
+    } else {
+      return Status::Corruption(
+          "Manifest - dropping non-existing column family");
+    }
+  } else if (!cf_in_not_found) {
+    if (!cf_in_builders) {
+      return Status::Corruption(
+          "Manifest record referencing unknown column family");
+    }
+
+    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+    // this should never happen since cf_in_builders is true
+    assert(cfd != nullptr);
+
+    // if it is not column family add or column family drop,
+    // then it's a file add/delete, which should be forwarded
+    // to builder
+    auto builder = builders.find(edit.column_family_);
+    assert(builder != builders.end());
+    builder->second->version_builder()->Apply(&edit);
+  }
+
+  if (cfd != nullptr) {
+    if (edit.has_log_number_) {
+      if (cfd->GetLogNumber() > edit.log_number_) {
+        ROCKS_LOG_WARN(
+            db_options_->info_log,
+            "MANIFEST corruption detected, but ignored - Log numbers in "
+            "records NOT monotonically increasing");
+      } else {
+        cfd->SetLogNumber(edit.log_number_);
+        *have_log_number = true;
+      }
+    }
+    if (edit.has_comparator_ &&
+        edit.comparator_ != cfd->user_comparator()->Name()) {
+      return Status::InvalidArgument(
+          cfd->user_comparator()->Name(),
+          "does not match existing comparator " + edit.comparator_);
+    }
+  }
+
+  if (edit.has_prev_log_number_) {
+    *previous_log_number = edit.prev_log_number_;
+    *have_prev_log_number = true;
+  }
+
+  if (edit.has_next_file_number_) {
+    *next_file = edit.next_file_number_;
+    *have_next_file = true;
+  }
+
+  if (edit.has_max_column_family_) {
+    *max_column_family = edit.max_column_family_;
+  }
+
+  if (edit.has_min_log_number_to_keep_) {
+    *min_log_number_to_keep =
+        std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
+  }
+
+  if (edit.has_last_sequence_) {
+    *last_sequence = edit.last_sequence_;
+    *have_last_sequence = true;
+  }
+  return Status::OK();
+}
+
 Status VersionSet::Recover(
     const std::vector<ColumnFamilyDescriptor>& column_families,
     bool read_only) {
@@ -2613,12 +3410,12 @@ Status VersionSet::Recover(
   {
     unique_ptr<SequentialFile> manifest_file;
     s = env_->NewSequentialFile(manifest_filename, &manifest_file,
-                                env_options_);
+                                env_->OptimizeForManifestRead(env_options_));
     if (!s.ok()) {
       return s;
     }
     manifest_file_reader.reset(
-        new SequentialFileReader(std::move(manifest_file)));
+        new SequentialFileReader(std::move(manifest_file), manifest_filename));
   }
   uint64_t current_manifest_file_size;
   s = env_->GetFileSize(manifest_filename, &current_manifest_file_size);
@@ -2635,6 +3432,7 @@ Status VersionSet::Recover(
   uint64_t log_number = 0;
   uint64_t previous_log_number = 0;
   uint32_t max_column_family = 0;
+  uint64_t min_log_number_to_keep = 0;
   std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
 
   // add default column family
@@ -2647,15 +3445,20 @@ Status VersionSet::Recover(
   default_cf_edit.SetColumnFamily(0);
   ColumnFamilyData* default_cfd =
       CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
+  // In recovery, nobody else can access it, so it's fine to set it to be
+  // initialized earlier.
+  default_cfd->set_initialized();
   builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
 
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
-    log::Reader reader(NULL, std::move(manifest_file_reader), &reporter,
-                       true /*checksum*/, 0 /*initial_offset*/, 0);
+    log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
+                       true /* checksum */, 0 /* log_number */);
     Slice record;
     std::string scratch;
+    std::vector<VersionEdit> replay_buffer;
+    size_t num_entries_decoded = 0;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
       VersionEdit edit;
       s = edit.DecodeFrom(record);
@@ -2663,122 +3466,44 @@ Status VersionSet::Recover(
         break;
       }
 
-      // Not found means that user didn't supply that column
-      // family option AND we encountered column family add
-      // record. Once we encounter column family drop record,
-      // we will delete the column family from
-      // column_families_not_found.
-      bool cf_in_not_found =
-          column_families_not_found.find(edit.column_family_) !=
-          column_families_not_found.end();
-      // in builders means that user supplied that column family
-      // option AND that we encountered column family add record
-      bool cf_in_builders =
-          builders.find(edit.column_family_) != builders.end();
-
-      // they can't both be true
-      assert(!(cf_in_not_found && cf_in_builders));
-
-      ColumnFamilyData* cfd = nullptr;
-
-      if (edit.is_column_family_add_) {
-        if (cf_in_builders || cf_in_not_found) {
-          s = Status::Corruption(
-              "Manifest adding the same column family twice");
-          break;
-        }
-        auto cf_options = cf_name_to_options.find(edit.column_family_name_);
-        if (cf_options == cf_name_to_options.end()) {
-          column_families_not_found.insert(
-              {edit.column_family_, edit.column_family_name_});
-        } else {
-          cfd = CreateColumnFamily(cf_options->second, &edit);
-          builders.insert(
-              {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
-        }
-      } else if (edit.is_column_family_drop_) {
-        if (cf_in_builders) {
-          auto builder = builders.find(edit.column_family_);
-          assert(builder != builders.end());
-          delete builder->second;
-          builders.erase(builder);
-          cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-          if (cfd->Unref()) {
-            delete cfd;
-            cfd = nullptr;
-          } else {
-            // who else can have reference to cfd!?
-            assert(false);
-          }
-        } else if (cf_in_not_found) {
-          column_families_not_found.erase(edit.column_family_);
-        } else {
-          s = Status::Corruption(
-              "Manifest - dropping non-existing column family");
-          break;
-        }
-      } else if (!cf_in_not_found) {
-        if (!cf_in_builders) {
-          s = Status::Corruption(
-              "Manifest record referencing unknown column family");
-          break;
+      if (edit.is_in_atomic_group_) {
+        if (replay_buffer.empty()) {
+          replay_buffer.resize(edit.remaining_entries_ + 1);
         }
-
-        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-        // this should never happen since cf_in_builders is true
-        assert(cfd != nullptr);
-        if (edit.max_level_ >= cfd->current()->storage_info()->num_levels()) {
-          s = Status::InvalidArgument(
-              "db has more levels than options.num_levels");
-          break;
+        ++num_entries_decoded;
+        if (num_entries_decoded + edit.remaining_entries_ !=
+            static_cast<uint32_t>(replay_buffer.size())) {
+          return Status::Corruption("corrupted atomic group");
         }
-
-        // if it is not column family add or column family drop,
-        // then it's a file add/delete, which should be forwarded
-        // to builder
-        auto builder = builders.find(edit.column_family_);
-        assert(builder != builders.end());
-        builder->second->version_builder()->Apply(&edit);
-      }
-
-      if (cfd != nullptr) {
-        if (edit.has_log_number_) {
-          if (cfd->GetLogNumber() > edit.log_number_) {
-            ROCKS_LOG_WARN(
-                db_options_->info_log,
-                "MANIFEST corruption detected, but ignored - Log numbers in "
-                "records NOT monotonically increasing");
-          } else {
-            cfd->SetLogNumber(edit.log_number_);
-            have_log_number = true;
+        replay_buffer[num_entries_decoded - 1] = std::move(edit);
+        if (num_entries_decoded == replay_buffer.size()) {
+          for (auto& e : replay_buffer) {
+            s = ApplyOneVersionEdit(
+                e, cf_name_to_options, column_families_not_found, builders,
+                &have_log_number, &log_number, &have_prev_log_number,
+                &previous_log_number, &have_next_file, &next_file,
+                &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+                &max_column_family);
+            if (!s.ok()) {
+              break;
+            }
           }
+          replay_buffer.clear();
+          num_entries_decoded = 0;
         }
-        if (edit.has_comparator_ &&
-            edit.comparator_ != cfd->user_comparator()->Name()) {
-          s = Status::InvalidArgument(
-              cfd->user_comparator()->Name(),
-              "does not match existing comparator " + edit.comparator_);
-          break;
+      } else {
+        if (!replay_buffer.empty()) {
+          return Status::Corruption("corrupted atomic group");
         }
+        s = ApplyOneVersionEdit(
+            edit, cf_name_to_options, column_families_not_found, builders,
+            &have_log_number, &log_number, &have_prev_log_number,
+            &previous_log_number, &have_next_file, &next_file,
+            &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+            &max_column_family);
       }
-
-      if (edit.has_prev_log_number_) {
-        previous_log_number = edit.prev_log_number_;
-        have_prev_log_number = true;
-      }
-
-      if (edit.has_next_file_number_) {
-        next_file = edit.next_file_number_;
-        have_next_file = true;
-      }
-
-      if (edit.has_max_column_family_) {
-        max_column_family = edit.max_column_family_;
-      }
-
-      if (edit.has_last_sequence_) {
-        last_sequence = edit.last_sequence_;
-        have_last_sequence = true;
+      if (!s.ok()) {
+        break;
       }
     }
   }
@@ -2798,8 +3523,11 @@ Status VersionSet::Recover(
 
     column_family_set_->UpdateMaxColumnFamily(max_column_family);
 
-    MarkFileNumberUsedDuringRecovery(previous_log_number);
-    MarkFileNumberUsedDuringRecovery(log_number);
+    // When reading DB generated using old release, min_log_number_to_keep=0.
+    // All log files will be scanned for potential prepare entries.
+    MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
+    MarkFileNumberUsed(previous_log_number);
+    MarkFileNumberUsed(log_number);
   }
 
   // there were some column families in the MANIFEST that weren't specified
@@ -2815,24 +3543,44 @@ Status VersionSet::Recover(
         list_of_not_found);
   }
 
+  if (s.ok()) {
+    for (auto cfd : *column_family_set_) {
+      assert(builders.count(cfd->GetID()) > 0);
+      auto* builder = builders[cfd->GetID()]->version_builder();
+      if (!builder->CheckConsistencyForNumLevels()) {
+        s = Status::InvalidArgument(
+            "db has more levels than options.num_levels");
+        break;
+      }
+    }
+  }
+
   if (s.ok()) {
     for (auto cfd : *column_family_set_) {
       if (cfd->IsDropped()) {
         continue;
       }
+      if (read_only) {
+        cfd->table_cache()->SetTablesAreImmortal();
+      }
+      assert(cfd->initialized());
       auto builders_iter = builders.find(cfd->GetID());
       assert(builders_iter != builders.end());
       auto* builder = builders_iter->second->version_builder();
 
-      if (db_options_->max_open_files == -1) {
+      if (GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
+          TableCache::kInfiniteCapacity) {
         // unlimited table cache. Pre-load table handle now.
         // Need to do it out of the mutex.
         builder->LoadTableHandlers(
             cfd->internal_stats(), db_options_->max_file_opening_threads,
-            false /* prefetch_index_and_filter_in_cache */);
+            false /* prefetch_index_and_filter_in_cache */,
+            cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
       }
 
-      Version* v = new Version(cfd, this, current_version_number_++);
+      Version* v = new Version(cfd, this, env_options_,
+                               *cfd->GetLatestMutableCFOptions(),
+                               current_version_number_++);
       builder->SaveTo(v->storage_info());
 
       // Install recovered version
@@ -2843,6 +3591,8 @@ Status VersionSet::Recover(
 
     manifest_file_size_ = current_manifest_file_size;
     next_file_number_.store(next_file + 1);
+    last_allocated_sequence_ = last_sequence;
+    last_published_sequence_ = last_sequence;
     last_sequence_ = last_sequence;
     prev_log_number_ = previous_log_number;
 
@@ -2852,11 +3602,12 @@ Status VersionSet::Recover(
         "manifest_file_number is %lu, next_file_number is %lu, "
         "last_sequence is %lu, log_number is %lu,"
         "prev_log_number is %lu,"
-        "max_column_family is %u\n",
+        "max_column_family is %u,"
+        "min_log_number_to_keep is %lu\n",
         manifest_filename.c_str(), (unsigned long)manifest_file_number_,
         (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
         (unsigned long)log_number, (unsigned long)prev_log_number_,
-        column_family_set_->GetMaxColumnFamily());
+        column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
 
     for (auto cfd : *column_family_set_) {
       if (cfd->IsDropped()) {
@@ -2900,7 +3651,7 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
   if (!s.ok()) {
     return s;
   }
-  file_reader.reset(new SequentialFileReader(std::move(file)));
+  file_reader.reset(new SequentialFileReader(std::move(file), dscname));
   }
 
   std::map<uint32_t, std::string> column_family_names;
@@ -2908,8 +3659,8 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
   column_family_names.insert({0, kDefaultColumnFamilyName});
   VersionSet::LogReporter reporter;
   reporter.status = &s;
-  log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/,
-                     0 /*initial_offset*/, 0);
+  log::Reader reader(nullptr, std::move(file_reader), &reporter,
+                     true /* checksum */, 0 /* log_number */);
   Slice record;
   std::string scratch;
   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -3039,11 +3790,12 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
   Status s;
   {
     unique_ptr<SequentialFile> file;
-    s = options.env->NewSequentialFile(dscname, &file, env_options_);
+    s = options.env->NewSequentialFile(
+        dscname, &file, env_->OptimizeForManifestRead(env_options_));
     if (!s.ok()) {
       return s;
     }
-    file_reader.reset(new SequentialFileReader(std::move(file)));
+    file_reader.reset(new SequentialFileReader(std::move(file), dscname));
   }
 
   bool have_prev_log_number = false;
@@ -3067,8 +3819,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
-    log::Reader reader(NULL, std::move(file_reader), &reporter,
-                       true /*checksum*/, 0 /*initial_offset*/, 0);
+    log::Reader reader(nullptr, std::move(file_reader), &reporter,
+                       true /* checksum */, 0 /* log_number */);
     Slice record;
     std::string scratch;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@@ -3102,6 +3854,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
           break;
         }
         cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
+        cfd->set_initialized();
         builders.insert(
             {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
       } else if (edit.is_column_family_drop_) {
@@ -3142,6 +3895,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
         cfd->SetLogNumber(edit.log_number_);
       }
 
+
       if (edit.has_prev_log_number_) {
         previous_log_number = edit.prev_log_number_;
         have_prev_log_number = true;
@@ -3160,6 +3914,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
       if (edit.has_max_column_family_) {
         column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
       }
+
+      if (edit.has_min_log_number_to_keep_) {
+        MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
+      }
     }
   }
   file_reader.reset();
@@ -3187,7 +3945,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
       assert(builders_iter != builders.end());
       auto builder = builders_iter->second->version_builder();
 
-      Version* v = new Version(cfd, this, current_version_number_++);
+      Version* v = new Version(cfd, this, env_options_,
+                               *cfd->GetLatestMutableCFOptions(),
+                               current_version_number_++);
       builder->SaveTo(v->storage_info());
       v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
 
@@ -3210,29 +3970,40 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
     }
 
     next_file_number_.store(next_file + 1);
+    last_allocated_sequence_ = last_sequence;
+    last_published_sequence_ = last_sequence;
     last_sequence_ = last_sequence;
     prev_log_number_ = previous_log_number;
 
     printf(
         "next_file_number %lu last_sequence "
-        "%lu  prev_log_number %lu max_column_family %u\n",
+        "%lu  prev_log_number %lu max_column_family %u min_log_number_to_keep "
+        "%" PRIu64 "\n",
         (unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
         (unsigned long)previous_log_number,
-        column_family_set_->GetMaxColumnFamily());
+        column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
   }
 
   return s;
 }
 #endif  // ROCKSDB_LITE
 
-void VersionSet::MarkFileNumberUsedDuringRecovery(uint64_t number) {
-  // only called during recovery which is single threaded, so this works because
-  // there can't be concurrent calls
+void VersionSet::MarkFileNumberUsed(uint64_t number) {
+  // only called during recovery and repair which are single threaded, so this
+  // works because there can't be concurrent calls
   if (next_file_number_.load(std::memory_order_relaxed) <= number) {
     next_file_number_.store(number + 1, std::memory_order_relaxed);
   }
 }
 
+// Called only either from ::LogAndApply which is protected by mutex or during
+// recovery which is single-threaded.
+void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
+  if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
+    min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
+  }
+}
+
 Status VersionSet::WriteSnapshot(log::Writer* log) {
   // TODO: Break up into multiple records to reduce memory usage on recovery?
 
@@ -3245,6 +4016,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
     if (cfd->IsDropped()) {
       continue;
     }
+    assert(cfd->initialized());
     {
       // Store column family info
       VersionEdit edit;
@@ -3277,7 +4049,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
              cfd->current()->storage_info()->LevelFiles(level)) {
           edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                        f->fd.GetFileSize(), f->smallest, f->largest,
-                       f->smallest_seqno, f->largest_seqno,
+                       f->fd.smallest_seqno, f->fd.largest_seqno,
                        f->marked_for_compaction);
         }
       }
@@ -3397,8 +4169,9 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
     // approximate offset of "key" within the table.
     TableReader* table_reader_ptr;
     InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
-        ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd,
-        nullptr /* range_del_agg */, &table_reader_ptr);
+        ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
+        *f.file_metadata, nullptr /* range_del_agg */,
+        v->GetMutableCFOptions().prefix_extractor.get(), &table_reader_ptr);
     if (table_reader_ptr != nullptr) {
       result = table_reader_ptr->ApproximateOffsetOf(key);
     }
@@ -3411,6 +4184,9 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   // pre-calculate space requirement
   int64_t total_files = 0;
   for (auto cfd : *column_family_set_) {
+    if (!cfd->initialized()) {
+      continue;
+    }
     Version* dummy_versions = cfd->dummy_versions();
     for (Version* v = dummy_versions->next_; v != dummy_versions;
          v = v->next_) {
@@ -3425,6 +4201,9 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
 
   for (auto cfd : *column_family_set_) {
+    if (!cfd->initialized()) {
+      continue;
+    }
     auto* current = cfd->current();
     bool found_current = false;
     Version* dummy_versions = cfd->dummy_versions();
@@ -3444,14 +4223,17 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
 }
 
 InternalIterator* VersionSet::MakeInputIterator(
-    const Compaction* c, RangeDelAggregator* range_del_agg) {
+    const Compaction* c, RangeDelAggregator* range_del_agg,
+    const EnvOptions& env_options_compactions) {
   auto cfd = c->column_family_data();
   ReadOptions read_options;
   read_options.verify_checksums = true;
   read_options.fill_cache = false;
-  if (c->ShouldFormSubcompactions()) {
-    read_options.total_order_seek = true;
-  }
+  // Compaction iterators shouldn't be confined to a single prefix.
+  // Compactions use Seek() for
+  // (a) concurrent compactions,
+  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
+  read_options.total_order_seek = true;
 
   // Level-0 files have to be merged together.  For other levels,
   // we will make a concatenating iterator per level.
@@ -3467,25 +4249,24 @@ InternalIterator* VersionSet::MakeInputIterator(
         const LevelFilesBrief* flevel = c->input_levels(which);
         for (size_t i = 0; i < flevel->num_files; i++) {
           list[num++] = cfd->table_cache()->NewIterator(
-              read_options, env_options_compactions_,
-              cfd->internal_comparator(), flevel->files[i].fd, range_del_agg,
+              read_options, env_options_compactions, cfd->internal_comparator(),
+              *flevel->files[i].file_metadata, range_del_agg,
+              c->mutable_cf_options()->prefix_extractor.get(),
               nullptr /* table_reader_ptr */,
               nullptr /* no per level latency histogram */,
               true /* for_compaction */, nullptr /* arena */,
-              false /* skip_filters */, (int)which /* level */);
+              false /* skip_filters */, static_cast<int>(which) /* level */);
         }
       } else {
         // Create concatenating iterator for the files from this level
-        list[num++] = NewTwoLevelIterator(
-            new LevelFileIteratorState(
-                cfd->table_cache(), read_options, env_options_compactions_,
-                cfd->internal_comparator(),
-                nullptr /* no per level latency histogram */,
-                true /* for_compaction */, false /* prefix enabled */,
-                false /* skip_filters */, (int)which /* level */,
-                range_del_agg),
-            new LevelFileNumIterator(cfd->internal_comparator(),
-                                     c->input_levels(which)));
+        list[num++] = new LevelIterator(
+            cfd->table_cache(), read_options, env_options_compactions,
+            cfd->internal_comparator(), c->input_levels(which),
+            c->mutable_cf_options()->prefix_extractor.get(),
+            false /* should_sample */,
+            nullptr /* no per level latency histogram */,
+            true /* for_compaction */, false /* skip_filters */,
+            static_cast<int>(which) /* level */, range_del_agg);
       }
     }
   }
@@ -3542,6 +4323,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
       }
     }
   }
+#else
+  (void)c;
 #endif
   return true;     // everything good
 }
@@ -3550,6 +4333,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                       FileMetaData** meta,
                                       ColumnFamilyData** cfd) {
   for (auto cfd_iter : *column_family_set_) {
+    if (!cfd_iter->initialized()) {
+      continue;
+    }
     Version* version = cfd_iter->current();
     const auto* vstorage = version->storage_info();
     for (int level = 0; level < vstorage->num_levels(); level++) {
@@ -3568,7 +4354,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
 
 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
   for (auto cfd : *column_family_set_) {
-    if (cfd->IsDropped()) {
+    if (cfd->IsDropped() || !cfd->initialized()) {
       continue;
     }
     for (int level = 0; level < cfd->NumberLevels(); level++) {
@@ -3577,36 +4363,36 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
         LiveFileMetaData filemetadata;
         filemetadata.column_family_name = cfd->GetName();
         uint32_t path_id = file->fd.GetPathId();
-        if (path_id < db_options_->db_paths.size()) {
-          filemetadata.db_path = db_options_->db_paths[path_id].path;
+        if (path_id < cfd->ioptions()->cf_paths.size()) {
+          filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
         } else {
-          assert(!db_options_->db_paths.empty());
-          filemetadata.db_path = db_options_->db_paths.back().path;
+          assert(!cfd->ioptions()->cf_paths.empty());
+          filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
         }
         filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
         filemetadata.level = level;
-        filemetadata.size = file->fd.GetFileSize();
+        filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
         filemetadata.smallestkey = file->smallest.user_key().ToString();
         filemetadata.largestkey = file->largest.user_key().ToString();
-        filemetadata.smallest_seqno = file->smallest_seqno;
-        filemetadata.largest_seqno = file->largest_seqno;
+        filemetadata.smallest_seqno = file->fd.smallest_seqno;
+        filemetadata.largest_seqno = file->fd.largest_seqno;
         metadata->push_back(filemetadata);
       }
     }
   }
 }
 
-void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
+void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
                                   std::vector<std::string>* manifest_filenames,
                                   uint64_t min_pending_output) {
   assert(manifest_filenames->empty());
   obsolete_manifests_.swap(*manifest_filenames);
-  std::vector<FileMetaData*> pending_files;
-  for (auto f : obsolete_files_) {
-    if (f->fd.GetNumber() < min_pending_output) {
-      files->push_back(f);
+  std::vector<ObsoleteFileInfo> pending_files;
+  for (auto& f : obsolete_files_) {
+    if (f.metadata->fd.GetNumber() < min_pending_output) {
+      files->push_back(std::move(f));
     } else {
-      pending_files.push_back(f);
+      pending_files.push_back(std::move(f));
     }
   }
   obsolete_files_.swap(pending_files);
@@ -3616,7 +4402,9 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
     const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
   assert(edit->is_column_family_add_);
 
-  Version* dummy_versions = new Version(nullptr, this);
+  MutableCFOptions dummy_cf_options;
+  Version* dummy_versions =
+      new Version(nullptr, this, env_options_, dummy_cf_options);
   // Ref() dummy version once so that later we can call Unref() to delete it
   // by avoiding calling "delete" explicitly (~Version is private)
   dummy_versions->Ref();
@@ -3624,7 +4412,9 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
       edit->column_family_name_, edit->column_family_, dummy_versions,
       cf_options);
 
-  Version* v = new Version(new_cfd, this, current_version_number_++);
+  Version* v = new Version(new_cfd, this, env_options_,
+                           *new_cfd->GetLatestMutableCFOptions(),
+                           current_version_number_++);
 
   // Fill level target base information.
   v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),