import quincy beta 17.1.0
ceph/src/rocksdb/db/version_set.cc
index e913a97dd8cca0a49a0f2de0e469a402c9fcc0df..cffc5979d5283849be09ef649625cba231057585 100644
@@ -10,6 +10,7 @@
 #include "db/version_set.h"
 
 #include <stdio.h>
+
 #include <algorithm>
 #include <array>
 #include <cinttypes>
 #include <string>
 #include <unordered_map>
 #include <vector>
+
 #include "compaction/compaction.h"
+#include "db/blob/blob_file_cache.h"
+#include "db/blob/blob_file_reader.h"
+#include "db/blob/blob_index.h"
 #include "db/internal_stats.h"
 #include "db/log_reader.h"
 #include "db/log_writer.h"
@@ -29,6 +34,7 @@
 #include "db/pinned_iterators_manager.h"
 #include "db/table_cache.h"
 #include "db/version_builder.h"
+#include "db/version_edit_handler.h"
 #include "file/filename.h"
 #include "file/random_access_file_reader.h"
 #include "file/read_write_util.h"
@@ -49,6 +55,7 @@
 #include "table/table_reader.h"
 #include "table/two_level_iterator.h"
 #include "test_util/sync_point.h"
+#include "util/cast_util.h"
 #include "util/coding.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
@@ -88,9 +95,9 @@ Status OverlapWithIterator(const Comparator* ucmp,
   *overlap = false;
   if (iter->Valid()) {
     ParsedInternalKey seek_result;
-    if (!ParseInternalKey(iter->key(), &seek_result)) {
-      return Status::Corruption("DB have corrupted keys");
-    }
+    Status s = ParseInternalKey(iter->key(), &seek_result,
+                                false /* log_err_key */);  // TODO
+    if (!s.ok()) return s;
 
     if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
         0) {
@@ -364,6 +371,7 @@ class FilePickerMultiGet {
         range_(range),
         batch_iter_(range->begin()),
         batch_iter_prev_(range->begin()),
+        upper_key_(range->begin()),
         maybe_repeat_key_(false),
         current_level_range_(*range, range->begin(), range->end()),
         current_file_range_(*range, range->begin(), range->end()),
@@ -432,7 +440,7 @@ class FilePickerMultiGet {
             !file_hit)) {
       struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
       f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
-      Slice& user_key = batch_iter_->ukey;
+      Slice& user_key = batch_iter_->ukey_without_ts;
 
       // Do key range filtering of files or/and fractional cascading if:
       // (1) not all the files are in level 0, or
@@ -446,17 +454,17 @@ class FilePickerMultiGet {
         // Check if key is within a file's range. If search left bound and
         // right bound point to the same find, we are sure key falls in
         // range.
+        int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
+            user_key, false, ExtractUserKey(f->smallest_key), true);
+
         assert(curr_level_ == 0 ||
                fp_ctx.curr_index_in_curr_level ==
                    fp_ctx.start_index_in_curr_level ||
-               user_comparator_->Compare(user_key,
-                                         ExtractUserKey(f->smallest_key)) <= 0);
+               cmp_smallest <= 0);
 
-        int cmp_smallest = user_comparator_->Compare(
-            user_key, ExtractUserKey(f->smallest_key));
         if (cmp_smallest >= 0) {
-          cmp_largest = user_comparator_->Compare(
-              user_key, ExtractUserKey(f->largest_key));
+          cmp_largest = user_comparator_->CompareWithoutTimestamp(
+              user_key, false, ExtractUserKey(f->largest_key), true);
         } else {
           cmp_largest = -1;
         }
@@ -480,9 +488,20 @@ class FilePickerMultiGet {
       }
       if (cmp_largest == 0) {
         // cmp_largest is 0, which means the next key will not be in this
-        // file, so stop looking further. Also don't increment megt_iter_
-        // as we may have to look for this key in the next file if we don't
-        // find it in this one
+        // file, so stop looking further. However, it's possible there are
+        // duplicates in the batch, so find the upper bound for the batch
+        // in this file (upper_key_) by skipping past the duplicates. We
+        // leave batch_iter_ as is since we may have to pick up from there
+        // for the next file, if this file has a merge value rather than
+        // the final value.
+        upper_key_ = batch_iter_;
+        ++upper_key_;
+        while (upper_key_ != current_level_range_.end() &&
+               user_comparator_->CompareWithoutTimestamp(
+                   batch_iter_->ukey_without_ts, false,
+                   upper_key_->ukey_without_ts, false) == 0) {
+          ++upper_key_;
+        }
         break;
       } else {
         if (curr_level_ == 0) {
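
To make the duplicate handling above concrete, here is a minimal standalone
sketch of the same idea (plain C++, not RocksDB code; all names are
hypothetical):

  #include <cstddef>
  #include <string>
  #include <vector>

  // Given a sorted batch of user keys that may contain duplicates, return the
  // index one past the last duplicate of batch[batch_iter] -- the analogue of
  // upper_key_ in the hunk above.
  size_t UpperKeyPastDuplicates(const std::vector<std::string>& batch,
                                size_t batch_iter) {
    size_t upper_key = batch_iter + 1;
    while (upper_key < batch.size() && batch[upper_key] == batch[batch_iter]) {
      ++upper_key;  // skip duplicates of the current key
    }
    return upper_key;
  }

  // For batch {"a", "b", "b", "c"} and batch_iter == 1 this returns 3, so both
  // "b" entries are covered by the range handed to the current SST file.
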
@@ -502,6 +521,12 @@ class FilePickerMultiGet {
     *fd = f;
     *file_index = curr_file_index;
     *is_last_key_in_file = cmp_largest == 0;
+    if (!*is_last_key_in_file) {
+      // If the largest key in the batch overlapping the file is not the
+      // largest key in the file, upper_key_ would not have been updated,
+      // so update it here.
+      upper_key_ = batch_iter_;
+    }
     return file_hit;
   }
 
@@ -523,7 +548,7 @@ class FilePickerMultiGet {
           // file regardless for all keys not found yet
           if (current_level_range_.CheckKeyDone(batch_iter_) ||
               curr_level_ == 0) {
-            ++batch_iter_;
+            batch_iter_ = upper_key_;
           }
         }
         // batch_iter_prev_ will become the start key for the next file
@@ -543,18 +568,20 @@ class FilePickerMultiGet {
                                       &is_last_key_in_file)) {
         search_ended_ = !PrepareNextLevel();
       } else {
-        MultiGetRange::Iterator upper_key = batch_iter_;
         if (is_last_key_in_file) {
           // Since cmp_largest is 0, batch_iter_ still points to the last key
           // that falls in this file, instead of the next one. Increment
-          // upper_key so we can set the range properly for SST MultiGet
-          ++upper_key;
-          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
+          // the file index for all keys between batch_iter_ and upper_key_
+          auto tmp_iter = batch_iter_;
+          while (tmp_iter != upper_key_) {
+            ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
+            ++tmp_iter;
+          }
           maybe_repeat_key_ = true;
         }
         // Set the range for this file
         current_file_range_ =
-            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
+            MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
         returned_file_level_ = curr_level_;
         hit_file_level_ = curr_level_;
         is_hit_file_last_in_level_ =
@@ -606,6 +633,7 @@ class FilePickerMultiGet {
   // key found in the previous SST file, in order to serve as the start of
   // the batch key range for the next SST file
   MultiGetRange::Iterator batch_iter_prev_;
+  MultiGetRange::Iterator upper_key_;
   bool maybe_repeat_key_;
   MultiGetRange current_level_range_;
   MultiGetRange current_file_range_;
@@ -625,7 +653,7 @@ class FilePickerMultiGet {
       if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
           curr_file_level_->num_files) {
         batch_iter_prev_ = current_level_range_.begin();
-        batch_iter_ = current_level_range_.begin();
+        upper_key_ = batch_iter_ = current_level_range_.begin();
         return true;
       }
     }
@@ -720,7 +748,7 @@ class FilePickerMultiGet {
       }
       if (level_contains_keys) {
         batch_iter_prev_ = current_level_range_.begin();
-        batch_iter_ = current_level_range_.begin();
+        upper_key_ = batch_iter_ = current_level_range_.begin();
         return true;
       }
       curr_level_++;
@@ -852,6 +880,7 @@ namespace {
 
 class LevelIterator final : public InternalIterator {
  public:
+  // @param read_options Must outlive this iterator.
   LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                 const FileOptions& file_options,
                 const InternalKeyComparator& icomparator,
@@ -860,7 +889,8 @@ class LevelIterator final : public InternalIterator {
                 HistogramImpl* file_read_hist, TableReaderCaller caller,
                 bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                 const std::vector<AtomicCompactionUnitBoundary>*
-                    compaction_boundaries = nullptr)
+                    compaction_boundaries = nullptr,
+                bool allow_unprepared_value = false)
       : table_cache_(table_cache),
         read_options_(read_options),
         file_options_(file_options),
@@ -872,6 +902,7 @@ class LevelIterator final : public InternalIterator {
         should_sample_(should_sample),
         caller_(caller),
         skip_filters_(skip_filters),
+        allow_unprepared_value_(allow_unprepared_value),
         file_index_(flevel_->num_files),
         level_(level),
         range_del_agg_(range_del_agg),
@@ -906,14 +937,21 @@ class LevelIterator final : public InternalIterator {
     return file_iter_.iter() ? file_iter_.status() : Status::OK();
   }
 
+  bool PrepareValue() override {
+    return file_iter_.PrepareValue();
+  }
+
   inline bool MayBeOutOfLowerBound() override {
     assert(Valid());
     return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
   }
 
-  inline bool MayBeOutOfUpperBound() override {
-    assert(Valid());
-    return file_iter_.MayBeOutOfUpperBound();
+  inline IterBoundCheck UpperBoundCheckResult() override {
+    if (Valid()) {
+      return file_iter_.UpperBoundCheckResult();
+    } else {
+      return IterBoundCheck::kUnknown;
+    }
   }
 
   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
@@ -940,13 +978,6 @@ class LevelIterator final : public InternalIterator {
   void SetFileIterator(InternalIterator* iter);
   void InitFileIterator(size_t new_file_index);
 
-  // Called by both of Next() and NextAndGetResult(). Force inline.
-  void NextImpl() {
-    assert(Valid());
-    file_iter_.Next();
-    SkipEmptyFileForward();
-  }
-
   const Slice& file_smallest_key(size_t file_index) {
     assert(file_index < flevel_->num_files);
     return flevel_->files[file_index].smallest_key;
@@ -955,8 +986,8 @@ class LevelIterator final : public InternalIterator {
   bool KeyReachedUpperBound(const Slice& internal_key) {
     return read_options_.iterate_upper_bound != nullptr &&
            user_comparator_.CompareWithoutTimestamp(
-               ExtractUserKey(internal_key),
-               *read_options_.iterate_upper_bound) >= 0;
+               ExtractUserKey(internal_key), /*a_has_ts=*/true,
+               *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
   }
 
   InternalIterator* NewFileIterator() {
@@ -977,8 +1008,9 @@ class LevelIterator final : public InternalIterator {
         read_options_, file_options_, icomparator_, *file_meta.file_metadata,
         range_del_agg_, prefix_extractor_,
         nullptr /* don't need reference to table */, file_read_hist_, caller_,
-        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
-        largest_compaction_key);
+        /*arena=*/nullptr, skip_filters_, level_,
+        /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
+        largest_compaction_key, allow_unprepared_value_);
   }
 
   // Check if current file being fully within iterate_lower_bound.
@@ -989,14 +1021,14 @@ class LevelIterator final : public InternalIterator {
     if (read_options_.iterate_lower_bound != nullptr &&
         file_index_ < flevel_->num_files) {
       may_be_out_of_lower_bound_ =
-          user_comparator_.Compare(
-              ExtractUserKey(file_smallest_key(file_index_)),
-              *read_options_.iterate_lower_bound) < 0;
+          user_comparator_.CompareWithoutTimestamp(
+              ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
+              *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
     }
   }
 
   TableCache* table_cache_;
-  const ReadOptions read_options_;
+  const ReadOptions& read_options_;
   const FileOptions& file_options_;
   const InternalKeyComparator& icomparator_;
   const UserComparatorWrapper user_comparator_;
@@ -1011,6 +1043,7 @@ class LevelIterator final : public InternalIterator {
   bool should_sample_;
   TableReaderCaller caller_;
   bool skip_filters_;
+  bool allow_unprepared_value_;
   bool may_be_out_of_lower_bound_ = true;
   size_t file_index_;
   int level_;
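
Because read_options_ is now held by reference (hence the new "@param
read_options Must outlive this iterator" comment), the ReadOptions object
passed to the constructor must stay alive for the whole lifetime of the
LevelIterator. A minimal, self-contained illustration of the hazard this
guards against (plain C++; Options and Iter merely stand in for
rocksdb::ReadOptions and LevelIterator):

  struct Options { bool verify_checksums = true; };

  struct Iter {
    explicit Iter(const Options& o) : options_(o) {}
    const Options& options_;  // held by reference, like read_options_
  };

  Iter* MakeIter() {
    Options local;           // destroyed when MakeIter() returns
    return new Iter(local);  // dangling reference: undefined behavior
  }
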
@@ -1063,13 +1096,17 @@ void LevelIterator::Seek(const Slice& target) {
     //    next key after the prefix, or make the iterator invalid.
     // A side benefit will be that it invalidates the iterator earlier so that
     // the upper level merging iterator can merge fewer child iterators.
-    Slice target_user_key = ExtractUserKey(target);
-    Slice file_user_key = ExtractUserKey(file_iter_.key());
-    if (prefix_extractor_->InDomain(target_user_key) &&
-        (!prefix_extractor_->InDomain(file_user_key) ||
-         user_comparator_.Compare(
-             prefix_extractor_->Transform(target_user_key),
-             prefix_extractor_->Transform(file_user_key)) != 0)) {
+    size_t ts_sz = user_comparator_.timestamp_size();
+    Slice target_user_key_without_ts =
+        ExtractUserKeyAndStripTimestamp(target, ts_sz);
+    Slice file_user_key_without_ts =
+        ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
+    if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
+        (!prefix_extractor_->InDomain(file_user_key_without_ts) ||
+         user_comparator_.CompareWithoutTimestamp(
+             prefix_extractor_->Transform(target_user_key_without_ts), false,
+             prefix_extractor_->Transform(file_user_key_without_ts),
+             false) != 0)) {
       SetFileIterator(nullptr);
     }
   }
@@ -1108,14 +1145,26 @@ void LevelIterator::SeekToLast() {
   CheckMayBeOutOfLowerBound();
 }
 
-void LevelIterator::Next() { NextImpl(); }
+void LevelIterator::Next() {
+  assert(Valid());
+  file_iter_.Next();
+  SkipEmptyFileForward();
+}
 
 bool LevelIterator::NextAndGetResult(IterateResult* result) {
-  NextImpl();
-  bool is_valid = Valid();
-  if (is_valid) {
-    result->key = key();
-    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
+  assert(Valid());
+  bool is_valid = file_iter_.NextAndGetResult(result);
+  if (!is_valid) {
+    SkipEmptyFileForward();
+    is_valid = Valid();
+    if (is_valid) {
+      result->key = key();
+      result->bound_check_result = file_iter_.UpperBoundCheckResult();
+      // Ideally, we should return the real file_iter_.value_prepared, but
+      // that information is not available here. It would cause an extra
+      // PrepareValue() call for the first key of a file.
+      result->value_prepared = !allow_unprepared_value_;
+    }
   }
   return is_valid;
 }
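
With allow_unprepared_value_ set, the iterator may position on a key before
its value (e.g. a blob) has been loaded, and result->value_prepared tells the
caller whether PrepareValue() still has work to do. A rough consumer-side
sketch of the contract implied by this hunk (illustrative only; `it` and
Consume() are placeholders):

  void Drain(InternalIterator* it) {
    // The iterator was created with allow_unprepared_value = true.
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      // PrepareValue() performs the deferred work needed to make value()
      // usable; it returns false and sets it->status() on failure.
      if (!it->PrepareValue()) {
        break;
      }
      Consume(it->key(), it->value());
    }
  }
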
@@ -1130,7 +1179,8 @@ bool LevelIterator::SkipEmptyFileForward() {
   bool seen_empty_file = false;
   while (file_iter_.iter() == nullptr ||
          (!file_iter_.Valid() && file_iter_.status().ok() &&
-          !file_iter_.iter()->IsOutOfBound())) {
+          file_iter_.iter()->UpperBoundCheckResult() !=
+              IterBoundCheck::kOutOfBound)) {
     seen_empty_file = true;
     // Move to next file
     if (file_index_ >= flevel_->num_files - 1) {
@@ -1202,28 +1252,6 @@ void LevelIterator::InitFileIterator(size_t new_file_index) {
 }
 }  // anonymous namespace
 
-// A wrapper of version builder which references the current version in
-// constructor and unref it in the destructor.
-// Both of the constructor and destructor need to be called inside DB Mutex.
-class BaseReferencedVersionBuilder {
- public:
-  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
-      : version_builder_(new VersionBuilder(
-            cfd->current()->version_set()->file_options(), cfd->table_cache(),
-            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
-        version_(cfd->current()) {
-    version_->Ref();
-  }
-  ~BaseReferencedVersionBuilder() {
-    version_->Unref();
-  }
-  VersionBuilder* version_builder() { return version_builder_.get(); }
-
- private:
-  std::unique_ptr<VersionBuilder> version_builder_;
-  Version* version_;
-};
-
 Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                    const FileMetaData* file_meta,
                                    const std::string* fname) const {
@@ -1264,8 +1292,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
   // pass the magic number check in the footer.
   std::unique_ptr<RandomAccessFileReader> file_reader(
       new RandomAccessFileReader(
-          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
-          0 /* hist_type */, nullptr /* file_read_hist */,
+          std::move(file), file_name, nullptr /* env */, io_tracer_,
+          nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
           nullptr /* rate_limiter */, ioptions->listeners));
   s = ReadTableProperties(
       file_reader.get(), file_meta->fd.GetFileSize(),
@@ -1554,12 +1582,13 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
 void Version::AddIterators(const ReadOptions& read_options,
                            const FileOptions& soptions,
                            MergeIteratorBuilder* merge_iter_builder,
-                           RangeDelAggregator* range_del_agg) {
+                           RangeDelAggregator* range_del_agg,
+                           bool allow_unprepared_value) {
   assert(storage_info_.finalized_);
 
   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
     AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
-                         range_del_agg);
+                         range_del_agg, allow_unprepared_value);
   }
 }
 
@@ -1567,7 +1596,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                    const FileOptions& soptions,
                                    MergeIteratorBuilder* merge_iter_builder,
                                    int level,
-                                   RangeDelAggregator* range_del_agg) {
+                                   RangeDelAggregator* range_del_agg,
+                                   bool allow_unprepared_value) {
   assert(storage_info_.finalized_);
   if (level >= storage_info_.num_non_empty_levels()) {
     // This is an empty level
@@ -1590,9 +1620,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
           mutable_cf_options_.prefix_extractor.get(), nullptr,
           cfd_->internal_stats()->GetFileReadHist(0),
           TableReaderCaller::kUserIterator, arena,
-          /*skip_filters=*/false, /*level=*/0,
+          /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
           /*smallest_compaction_key=*/nullptr,
-          /*largest_compaction_key=*/nullptr));
+          /*largest_compaction_key=*/nullptr, allow_unprepared_value));
     }
     if (should_sample) {
       // Count ones for every L0 files. This is done per iterator creation
@@ -1614,7 +1644,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
         cfd_->internal_stats()->GetFileReadHist(level),
         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
-        range_del_agg, /*largest_compaction_key=*/nullptr));
+        range_del_agg,
+        /*compaction_boundaries=*/nullptr, allow_unprepared_value));
   }
 }
 
@@ -1648,9 +1679,10 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
           mutable_cf_options_.prefix_extractor.get(), nullptr,
           cfd_->internal_stats()->GetFileReadHist(0),
           TableReaderCaller::kUserIterator, &arena,
-          /*skip_filters=*/false, /*level=*/0,
+          /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
           /*smallest_compaction_key=*/nullptr,
-          /*largest_compaction_key=*/nullptr));
+          /*largest_compaction_key=*/nullptr,
+          /*allow_unprepared_value=*/false));
       status = OverlapWithIterator(
           ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
       if (!status.ok() || *overlap) {
@@ -1726,6 +1758,7 @@ VersionStorageInfo::VersionStorageInfo(
 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                  const FileOptions& file_opt,
                  const MutableCFOptions mutable_cf_options,
+                 const std::shared_ptr<IOTracer>& io_tracer,
                  uint64_t version_number)
     : env_(vset->env_),
       cfd_(column_family_data),
@@ -1733,6 +1766,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
       db_statistics_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->statistics),
       table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
+      blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
       merge_operator_((cfd_ == nullptr) ? nullptr
                                         : cfd_->ioptions()->merge_operator),
       storage_info_(
@@ -1751,10 +1785,62 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
       refs_(0),
       file_options_(file_opt),
       mutable_cf_options_(mutable_cf_options),
-      version_number_(version_number) {}
+      max_file_size_for_l0_meta_pin_(
+          MaxFileSizeForL0MetaPin(mutable_cf_options_)),
+      version_number_(version_number),
+      io_tracer_(io_tracer) {}
+
+Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
+                        PinnableSlice* value) const {
+  assert(value);
+
+  if (read_options.read_tier == kBlockCacheTier) {
+    return Status::Incomplete("Cannot read blob: no disk I/O allowed");
+  }
+
+  BlobIndex blob_index;
+
+  {
+    Status s = blob_index.DecodeFrom(*value);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  if (blob_index.HasTTL() || blob_index.IsInlined()) {
+    return Status::Corruption("Unexpected TTL/inlined blob index");
+  }
+
+  const auto& blob_files = storage_info_.GetBlobFiles();
+
+  const uint64_t blob_file_number = blob_index.file_number();
+
+  const auto it = blob_files.find(blob_file_number);
+  if (it == blob_files.end()) {
+    return Status::Corruption("Invalid blob file number");
+  }
+
+  CacheHandleGuard<BlobFileReader> blob_file_reader;
+
+  {
+    assert(blob_file_cache_);
+    const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number,
+                                                         &blob_file_reader);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  assert(blob_file_reader.GetValue());
+  const Status s = blob_file_reader.GetValue()->GetBlob(
+      read_options, user_key, blob_index.offset(), blob_index.size(),
+      blob_index.compression(), value);
+
+  return s;
+}
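
Note that in GetBlob() the value slice is both input and output: on entry it
holds the encoded blob index produced by the table lookup, and on success it
is overwritten with the blob payload read from the blob file. A hedged
caller-side sketch (version, read_options and user_key are assumed to exist):

  PinnableSlice value;
  // ... an SST lookup has filled `value` with a blob index and flagged
  // is_blob_index (see the kFound handling further down in this diff) ...
  Status s = version->GetBlob(read_options, user_key, &value);
  if (s.ok()) {
    // `value` now contains the actual blob contents.
  } else if (s.IsIncomplete()) {
    // read_options.read_tier == kBlockCacheTier: blob reads need disk I/O.
  }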
 
 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
-                  PinnableSlice* value, Status* status,
+                  PinnableSlice* value, std::string* timestamp, Status* status,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                   bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
@@ -1775,12 +1861,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       vset_->block_cache_tracer_->is_tracing_enabled()) {
     tracing_get_id = vset_->block_cache_tracer_->NextGetId();
   }
+
+  // Note: the old StackableDB-based BlobDB passes in
+  // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
+  // need to provide it here.
+  bool is_blob_index = false;
+  bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
+
   GetContext get_context(
       user_comparator(), merge_operator_, info_log_, db_statistics_,
       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
-      do_merge ? value : nullptr, value_found, merge_context, do_merge,
-      max_covering_tombstone_seq, this->env_, seq,
-      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
+      do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
+      merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
+      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
       tracing_get_id);
 
   // Pin blocks that we read to hold merge operands
@@ -1814,11 +1907,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                         fp.IsHitFileLastInLevel()),
-        fp.GetCurrentLevel());
+        fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
     // TODO: examine the behavior for corrupted key
     if (timer_enabled) {
       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
-                                fp.GetCurrentLevel());
+                                fp.GetHitFileLevel());
     }
     if (!status->ok()) {
       return;
@@ -1838,6 +1931,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
         // TODO: update per-level perfcontext user_key_return_count for kMerge
         break;
       case GetContext::kFound:
+        if (is_blob_index) {
+          if (do_merge && value) {
+            *status = GetBlob(read_options, user_key, value);
+            if (!status->ok()) {
+              if (status->IsIncomplete()) {
+                get_context.MarkKeyMayExist();
+              }
+              return;
+            }
+          }
+        }
+
         if (fp.GetHitFileLevel() == 0) {
           RecordTick(db_statistics_, GET_HIT_L0);
         } else if (fp.GetHitFileLevel() == 1) {
@@ -1855,7 +1960,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       case GetContext::kCorrupt:
         *status = Status::Corruption("corrupted key for ", user_key);
         return;
-      case GetContext::kBlobIndex:
+      case GetContext::kUnexpectedBlobIndex:
         ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
         *status = Status::NotSupported(
             "Encounter unexpected blob index. Please open DB with "
@@ -1917,11 +2022,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     assert(iter->s->ok() || iter->s->IsMergeInProgress());
     get_ctx.emplace_back(
         user_comparator(), merge_operator_, info_log_, db_statistics_,
-        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
-        iter->value, nullptr, &(iter->merge_context), true,
-        &iter->max_covering_tombstone_seq, this->env_, nullptr,
-        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
-        tracing_mget_id);
+        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
+        iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
+        &(iter->merge_context), true, &iter->max_covering_tombstone_seq,
+        this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr,
+        callback, is_blob, tracing_mget_id);
     // MergeInProgress status, if set, has been transferred to the get_context
     // state, so we set status to ok here. From now on, the iter status will
     // be used for IO errors, and get_context state will be used for any
@@ -1940,6 +2045,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
       &storage_info_.file_indexer_, user_comparator(), internal_comparator());
   FdWithKeyRange* f = fp.GetNextFile();
+  Status s;
+  uint64_t num_index_read = 0;
+  uint64_t num_filter_read = 0;
+  uint64_t num_data_read = 0;
+  uint64_t num_sst_read = 0;
 
   while (f != nullptr) {
     MultiGetRange file_range = fp.CurrentFileRange();
@@ -1947,17 +2057,17 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
         get_perf_context()->per_level_perf_context_enabled;
     StopWatchNano timer(env_, timer_enabled /* auto_start */);
-    Status s = table_cache_->MultiGet(
+    s = table_cache_->MultiGet(
         read_options, *internal_comparator(), *f->file_metadata, &file_range,
         mutable_cf_options_.prefix_extractor.get(),
         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                         fp.IsHitFileLastInLevel()),
-        fp.GetCurrentLevel());
+        fp.GetHitFileLevel());
     // TODO: examine the behavior for corrupted key
     if (timer_enabled) {
       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
-                                fp.GetCurrentLevel());
+                                fp.GetHitFileLevel());
     }
     if (!s.ok()) {
       // TODO: Set status for individual keys appropriately
@@ -1968,7 +2078,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       return;
     }
     uint64_t batch_size = 0;
-    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+    for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
+         ++iter) {
       GetContext& get_context = *iter->get_context;
       Status* status = iter->s;
       // The Status in the KeyContext takes precedence over GetContext state
@@ -1985,6 +2096,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         sample_file_read_inc(f->file_metadata);
       }
       batch_size++;
+      num_index_read += get_context.get_context_stats_.num_index_read;
+      num_filter_read += get_context.get_context_stats_.num_filter_read;
+      num_data_read += get_context.get_context_stats_.num_data_read;
+      num_sst_read += get_context.get_context_stats_.num_sst_read;
+
       // report the counters before returning
       if (get_context.State() != GetContext::kNotFound &&
           get_context.State() != GetContext::kMerge &&
@@ -2014,7 +2130,12 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           }
           PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                     fp.GetHitFileLevel());
+          file_range.AddValueSize(iter->value->size());
           file_range.MarkKeyDone(iter);
+          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
+            s = Status::Aborted();
+            break;
+          }
           continue;
         case GetContext::kDeleted:
           // Use empty error message for speed
@@ -2026,7 +2147,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
               Status::Corruption("corrupted key for ", iter->lkey->user_key());
           file_range.MarkKeyDone(iter);
           continue;
-        case GetContext::kBlobIndex:
+        case GetContext::kUnexpectedBlobIndex:
           ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
           *status = Status::NotSupported(
               "Encounter unexpected blob index. Please open DB with "
@@ -2035,15 +2156,32 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           continue;
       }
     }
+
+    // Report MultiGet stats per level.
+    if (fp.IsHitFileLastInLevel()) {
+      // Dump the stats if this is the last file of this level and reset for
+      // next level.
+      RecordInHistogram(db_statistics_,
+                        NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
+                        num_index_read + num_filter_read);
+      RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
+                        num_data_read);
+      RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
+      num_filter_read = 0;
+      num_index_read = 0;
+      num_data_read = 0;
+      num_sst_read = 0;
+    }
+
     RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
-    if (file_picker_range.empty()) {
+    if (!s.ok() || file_picker_range.empty()) {
       break;
     }
     f = fp.GetNextFile();
   }
 
   // Process any left over keys
-  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+  for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
     GetContext& get_context = *iter->get_context;
     Status* status = iter->s;
     Slice user_key = iter->lkey->user_key();
@@ -2068,12 +2206,23 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           nullptr /* result_operand */, true);
       if (LIKELY(iter->value != nullptr)) {
         iter->value->PinSelf();
+        range->AddValueSize(iter->value->size());
+        range->MarkKeyDone(iter);
+        if (range->GetValueSize() > read_options.value_size_soft_limit) {
+          s = Status::Aborted();
+          break;
+        }
       }
     } else {
       range->MarkKeyDone(iter);
       *status = Status::NotFound();  // Use an empty error message for speed
     }
   }
+
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    range->MarkKeyDone(iter);
+    *(iter->s) = s;
+  }
 }
 
 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
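
The AddValueSize()/GetValueSize() accounting added above lets MultiGet stop
early: once the accumulated size of the values already returned exceeds
read_options.value_size_soft_limit, the remaining keys of the batch are marked
done with Status::Aborted(). A rough user-facing sketch (assuming an open
DB* db and ColumnFamilyHandle* cf; illustrative only):

  ReadOptions read_options;
  read_options.value_size_soft_limit = 1 << 20;  // stop after ~1 MB of values

  std::vector<Slice> keys = /* ... */;
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());
  db->MultiGet(read_options, cf, keys.size(), keys.data(), values.data(),
               statuses.data());

  for (size_t i = 0; i < keys.size(); ++i) {
    if (statuses[i].IsAborted()) {
      // Not looked up: the soft limit was already exceeded by earlier keys.
    }
  }
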
@@ -2095,6 +2244,9 @@ void VersionStorageInfo::GenerateLevelFilesBrief() {
 void Version::PrepareApply(
     const MutableCFOptions& mutable_cf_options,
     bool update_stats) {
+  TEST_SYNC_POINT_CALLBACK(
+      "Version::PrepareApply:forced_check",
+      reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
   UpdateAccumulatedStats(update_stats);
   storage_info_.UpdateNumNonEmptyLevels();
   storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
@@ -2396,6 +2548,11 @@ void VersionStorageInfo::ComputeCompactionScore(
         // compaction score for the whole DB. Adding other levels as if
         // they are L0 files.
         for (int i = 1; i < num_levels(); i++) {
+          // It's possible that a subset of the files in a level may be in a
+          // compaction, due to delete-triggered compaction or trivial move.
+          // In that case, the below check may not catch a level being
+          // compacted as it only checks the first file. The worst that can
+          // happen is a scheduled compaction thread will find nothing to do.
           if (!files_[i].empty() && !files_[i][0]->being_compacted) {
             num_sorted_runs++;
           }
@@ -2425,9 +2582,21 @@ void VersionStorageInfo::ComputeCompactionScore(
           // Level-based involves L0->L0 compactions that can lead to oversized
           // L0 files. Take into account size as well to avoid later giant
           // compactions to the base level.
-          score = std::max(
-              score, static_cast<double>(total_size) /
-                     mutable_cf_options.max_bytes_for_level_base);
+          uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
+          if (immutable_cf_options.level_compaction_dynamic_level_bytes &&
+              level_multiplier_ != 0.0) {
+            // Prevent L0 to Lbase fanout from growing larger than
+            // `level_multiplier_`. This prevents us from getting stuck picking
+            // L0 forever even when it is hurting write-amp. That could happen
+            // in dynamic level compaction's write-burst mode where the base
+            // level's target size can grow to be enormous.
+            l0_target_size =
+                std::max(l0_target_size,
+                         static_cast<uint64_t>(level_max_bytes_[base_level_] /
+                                               level_multiplier_));
+          }
+          score =
+              std::max(score, static_cast<double>(total_size) / l0_target_size);
         }
       }
     } else {
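
With purely illustrative numbers: suppose max_bytes_for_level_base is 256 MB,
level_multiplier_ is 10, and a write burst has pushed the dynamic target of
the base level (level_max_bytes_[base_level_]) to 20 GB. The clamp above
yields l0_target_size = max(256 MB, 20 GB / 10) = 2 GB, so L0 only reaches a
size-based score of 1.0 once it holds about 2 GB, and the resulting L0->Lbase
compaction has a fanout of roughly 10x instead of the ~80x that the unclamped
256 MB target would allow.
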
@@ -2593,31 +2762,30 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
 }
 } // anonymous namespace
 
-void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
-  auto* level_files = &files_[level];
-  // Must not overlap
-#ifndef NDEBUG
-  if (level > 0 && !level_files->empty() &&
-      internal_comparator_->Compare(
-          (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
-    auto* f2 = (*level_files)[level_files->size() - 1];
-    if (info_log != nullptr) {
-      Error(info_log, "Adding new file %" PRIu64
-                      " range (%s, %s) to level %d but overlapping "
-                      "with existing file %" PRIu64 " %s %s",
-            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
-            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
-            f2->smallest.DebugString(true).c_str(),
-            f2->largest.DebugString(true).c_str());
-      LogFlush(info_log);
-    }
-    assert(false);
-  }
-#else
-  (void)info_log;
-#endif
+void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
+  auto& level_files = files_[level];
+  level_files.push_back(f);
+
   f->refs++;
-  level_files->push_back(f);
+
+  const uint64_t file_number = f->fd.GetNumber();
+
+  assert(file_locations_.find(file_number) == file_locations_.end());
+  file_locations_.emplace(file_number,
+                          FileLocation(level, level_files.size() - 1));
+}
+
+void VersionStorageInfo::AddBlobFile(
+    std::shared_ptr<BlobFileMetaData> blob_file_meta) {
+  assert(blob_file_meta);
+
+  const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
+
+  auto it = blob_files_.lower_bound(blob_file_number);
+  assert(it == blob_files_.end() || it->first != blob_file_number);
+
+  blob_files_.insert(
+      it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
 }
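
Besides appending to files_[level], AddFile() now records each table file's
position in file_locations_, so a version can map a file number to its
(level, position) without scanning every level. A hedged lookup sketch (the
diff only shows the map being populated; the accessor and the FileLocation
getters below are assumptions):

  // Hypothetical helper on VersionStorageInfo:
  FileMetaData* LookupFile(uint64_t file_number) const {
    const auto it = file_locations_.find(file_number);
    if (it == file_locations_.end()) {
      return nullptr;  // the file is not part of this version
    }
    const FileLocation& loc = it->second;
    return files_[loc.GetLevel()][loc.GetPosition()];  // getters assumed
  }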
 
 // Version::PrepareApply() need to be called before calling the function, or
@@ -3345,22 +3513,23 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
 }
 
 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
-  // Estimate the live data size by adding up the size of the last level for all
-  // key ranges. Note: Estimate depends on the ordering of files in level 0
-  // because files in level 0 can be overlapping.
+  // Estimate the live data size by adding up the sizes of a maximal set of
+  // sst files with no range overlap in the same or a higher level. The less
+  // compacted the data is, the more optimistic (smaller) this estimate is.
+  // Also, for multiple sorted runs within a level, file order will matter.
   uint64_t size = 0;
 
   auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
     return internal_comparator_->Compare(*x, *y) < 0;
   };
-  // (Ordered) map of largest keys in non-overlapping files
+  // (Ordered) map of largest keys in files being included in size estimate
   std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
 
   for (int l = num_levels_ - 1; l >= 0; l--) {
     bool found_end = false;
     for (auto file : files_[l]) {
-      // Find the first file where the largest key is larger than the smallest
-      // key of the current file. If this file does not overlap with the
+      // Find the first file already included whose largest key is larger than
+      // the smallest key of `file`. If that file does not overlap with the
       // current file, none of the files in the map does. If there is
       // no potential overlap, we can safely insert the rest of this level
       // (if the level is not 0) into the map without checking again because
@@ -3409,13 +3578,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun(
   return false;
 }
 
-void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
-  for (int level = 0; level < storage_info_.num_levels(); level++) {
-    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
-    for (const auto& file : files) {
-      live->push_back(file->fd);
+void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
+                           std::vector<uint64_t>* live_blob_files) const {
+  assert(live_table_files);
+  assert(live_blob_files);
+
+  for (int level = 0; level < storage_info_.num_levels(); ++level) {
+    const auto& level_files = storage_info_.LevelFiles(level);
+    for (const auto& meta : level_files) {
+      assert(meta);
+
+      live_table_files->emplace_back(meta->fd.GetNumber());
     }
   }
+
+  const auto& blob_files = storage_info_.GetBlobFiles();
+  for (const auto& pair : blob_files) {
+    const auto& meta = pair.second;
+    assert(meta);
+
+    live_blob_files->emplace_back(meta->GetBlobFileNumber());
+  }
 }
 
 std::string Version::DebugString(bool hex, bool print_stats) const {
@@ -3462,6 +3645,21 @@ std::string Version::DebugString(bool hex, bool print_stats) const {
       r.append("\n");
     }
   }
+
+  const auto& blob_files = storage_info_.GetBlobFiles();
+  if (!blob_files.empty()) {
+    r.append("--- blob files --- version# ");
+    AppendNumberTo(&r, version_number_);
+    r.append(" ---\n");
+    for (const auto& pair : blob_files) {
+      const auto& blob_file_meta = pair.second;
+      assert(blob_file_meta);
+
+      r.append(blob_file_meta->DebugString());
+      r.push_back('\n');
+    }
+  }
+
   return r;
 }
 
@@ -3473,15 +3671,30 @@ struct VersionSet::ManifestWriter {
   ColumnFamilyData* cfd;
   const MutableCFOptions mutable_cf_options;
   const autovector<VersionEdit*>& edit_list;
+  const std::function<void(const Status&)> manifest_write_callback;
 
-  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
-                          const MutableCFOptions& cf_options,
-                          const autovector<VersionEdit*>& e)
+  explicit ManifestWriter(
+      InstrumentedMutex* mu, ColumnFamilyData* _cfd,
+      const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
+      const std::function<void(const Status&)>& manifest_wcb)
       : done(false),
         cv(mu),
         cfd(_cfd),
         mutable_cf_options(cf_options),
-        edit_list(e) {}
+        edit_list(e),
+        manifest_write_callback(manifest_wcb) {}
+  ~ManifestWriter() { status.PermitUncheckedError(); }
+
+  bool IsAllWalEdits() const {
+    bool all_wal_edits = true;
+    for (const auto& e : edit_list) {
+      if (!e->IsWalManipulation()) {
+        all_wal_edits = false;
+        break;
+      }
+    }
+    return all_wal_edits;
+  }
 };
 
 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
@@ -3534,12 +3747,15 @@ VersionSet::VersionSet(const std::string& dbname,
                        const FileOptions& storage_options, Cache* table_cache,
                        WriteBufferManager* write_buffer_manager,
                        WriteController* write_controller,
-                       BlockCacheTracer* const block_cache_tracer)
-    : column_family_set_(new ColumnFamilySet(
-          dbname, _db_options, storage_options, table_cache,
-          write_buffer_manager, write_controller, block_cache_tracer)),
+                       BlockCacheTracer* const block_cache_tracer,
+                       const std::shared_ptr<IOTracer>& io_tracer)
+    : column_family_set_(
+          new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
+                              write_buffer_manager, write_controller,
+                              block_cache_tracer, io_tracer)),
+      table_cache_(table_cache),
       env_(_db_options->env),
-      fs_(_db_options->fs.get()),
+      fs_(_db_options->fs, io_tracer),
       dbname_(dbname),
       db_options_(_db_options),
       next_file_number_(2),
@@ -3553,21 +3769,49 @@ VersionSet::VersionSet(const std::string& dbname,
       current_version_number_(0),
       manifest_file_size_(0),
       file_options_(storage_options),
-      block_cache_tracer_(block_cache_tracer) {}
+      block_cache_tracer_(block_cache_tracer),
+      io_tracer_(io_tracer) {}
 
 VersionSet::~VersionSet() {
   // we need to delete column_family_set_ because its destructor depends on
   // VersionSet
-  Cache* table_cache = column_family_set_->get_table_cache();
   column_family_set_.reset();
   for (auto& file : obsolete_files_) {
     if (file.metadata->table_reader_handle) {
-      table_cache->Release(file.metadata->table_reader_handle);
-      TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
+      table_cache_->Release(file.metadata->table_reader_handle);
+      TableCache::Evict(table_cache_, file.metadata->fd.GetNumber());
     }
     file.DeleteMetadata();
   }
   obsolete_files_.clear();
+  io_status_.PermitUncheckedError();
+}
+
+void VersionSet::Reset() {
+  if (column_family_set_) {
+    WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
+    WriteController* wc = column_family_set_->write_controller();
+    column_family_set_.reset(
+        new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_,
+                            wbm, wc, block_cache_tracer_, io_tracer_));
+  }
+  db_id_.clear();
+  next_file_number_.store(2);
+  min_log_number_to_keep_2pc_.store(0);
+  manifest_file_number_ = 0;
+  options_file_number_ = 0;
+  pending_manifest_file_number_ = 0;
+  last_sequence_.store(0);
+  last_allocated_sequence_.store(0);
+  last_published_sequence_.store(0);
+  prev_log_number_ = 0;
+  descriptor_log_.reset();
+  current_version_number_ = 0;
+  manifest_writers_.clear();
+  manifest_file_size_ = 0;
+  obsolete_files_.clear();
+  obsolete_manifests_.clear();
+  wals_.Reset();
 }
 
 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
@@ -3600,8 +3844,9 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
 
 Status VersionSet::ProcessManifestWrites(
     std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
-    Directory* db_directory, bool new_descriptor_log,
+    FSDirectory* db_directory, bool new_descriptor_log,
     const ColumnFamilyOptions* new_cf_options) {
+  mu->AssertHeld();
   assert(!writers.empty());
   ManifestWriter& first_writer = writers.front();
   ManifestWriter* last_writer = &first_writer;
@@ -3678,16 +3923,22 @@ Status VersionSet::ProcessManifestWrites(
         }
       }
       if (version == nullptr) {
-        version = new Version(last_writer->cfd, this, file_options_,
-                              last_writer->mutable_cf_options,
-                              current_version_number_++);
-        versions.push_back(version);
-        mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
-        builder_guards.emplace_back(
-            new BaseReferencedVersionBuilder(last_writer->cfd));
-        builder = builder_guards.back()->version_builder();
-      }
-      assert(builder != nullptr);  // make checker happy
+        // WAL manipulations do not need to be applied to versions.
+        if (!last_writer->IsAllWalEdits()) {
+          version = new Version(last_writer->cfd, this, file_options_,
+                                last_writer->mutable_cf_options, io_tracer_,
+                                current_version_number_++);
+          versions.push_back(version);
+          mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
+          builder_guards.emplace_back(
+              new BaseReferencedVersionBuilder(last_writer->cfd));
+          builder = builder_guards.back()->version_builder();
+        }
+        assert(last_writer->IsAllWalEdits() || builder);
+        assert(last_writer->IsAllWalEdits() || version);
+        TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
+                                 version);
+      }
       for (const auto& e : last_writer->edit_list) {
         if (e->is_in_atomic_group_) {
           if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
@@ -3760,9 +4011,6 @@ Status VersionSet::ProcessManifestWrites(
   }
 #endif  // NDEBUG
 
-  uint64_t new_manifest_file_size = 0;
-  Status s;
-
   assert(pending_manifest_file_number_ == 0);
   if (!descriptor_log_ ||
       manifest_file_size_ > db_options_->max_manifest_file_size) {
@@ -3776,6 +4024,7 @@ Status VersionSet::ProcessManifestWrites(
   // reads its content after releasing db mutex to avoid race with
   // SwitchMemtable().
   std::unordered_map<uint32_t, MutableCFState> curr_state;
+  VersionEdit wal_additions;
   if (new_descriptor_log) {
     pending_manifest_file_number_ = NewFileNumber();
     batch_edits.back()->SetNextFile(next_file_number_.load());
@@ -3790,13 +4039,20 @@ Status VersionSet::ProcessManifestWrites(
       assert(curr_state.find(cfd->GetID()) == curr_state.end());
       curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
     }
+
+    for (const auto& wal : wals_.GetWals()) {
+      wal_additions.AddWal(wal.first, wal.second);
+    }
   }
 
+  uint64_t new_manifest_file_size = 0;
+  Status s;
+  IOStatus io_s;
   {
     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
     mu->Unlock();
 
-    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
+    TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
         assert(!builder_guards.empty() &&
@@ -3805,10 +4061,11 @@ Status VersionSet::ProcessManifestWrites(
                builder_guards.size() == versions.size());
         ColumnFamilyData* cfd = versions[i]->cfd_;
         s = builder_guards[i]->version_builder()->LoadTableHandlers(
-            cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
+            cfd->internal_stats(), 1 /* max_threads */,
             true /* prefetch_index_and_filter_in_cache */,
             false /* is_initial_load */,
-            mutable_cf_options_ptrs[i]->prefix_extractor.get());
+            mutable_cf_options_ptrs[i]->prefix_extractor.get(),
+            MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
         if (!s.ok()) {
           if (db_options_->paranoid_checks) {
             break;
@@ -3827,18 +4084,21 @@ Status VersionSet::ProcessManifestWrites(
       std::string descriptor_fname =
           DescriptorFileName(dbname_, pending_manifest_file_number_);
       std::unique_ptr<FSWritableFile> descriptor_file;
-      s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
-                          opt_file_opts);
-      if (s.ok()) {
+      io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
+                             opt_file_opts);
+      if (io_s.ok()) {
         descriptor_file->SetPreallocationBlockSize(
             db_options_->manifest_preallocation_size);
 
         std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
             std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
-            nullptr, db_options_->listeners));
+            io_tracer_, nullptr, db_options_->listeners));
         descriptor_log_.reset(
             new log::Writer(std::move(file_writer), 0, false));
-        s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
+        s = WriteCurrentStateToManifest(curr_state, wal_additions,
+                                        descriptor_log_.get(), io_s);
+      } else {
+        s = io_s;
       }
     }
 
@@ -3872,15 +4132,19 @@ Status VersionSet::ProcessManifestWrites(
         }
         ++idx;
 #endif /* !NDEBUG */
-        s = descriptor_log_->AddRecord(record);
-        if (!s.ok()) {
+        io_s = descriptor_log_->AddRecord(record);
+        if (!io_s.ok()) {
+          s = io_s;
           break;
         }
       }
       if (s.ok()) {
-        s = SyncManifest(env_, db_options_, descriptor_log_->file());
+        io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
+        TEST_SYNC_POINT_CALLBACK(
+            "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
       }
-      if (!s.ok()) {
+      if (!io_s.ok()) {
+        s = io_s;
         ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
                         s.ToString().c_str());
       }
@@ -3889,8 +4153,11 @@ Status VersionSet::ProcessManifestWrites(
     // If we just created a new descriptor file, install it by writing a
     // new CURRENT file that points to it.
     if (s.ok() && new_descriptor_log) {
-      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
-                         db_directory);
+      io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
+                            db_directory);
+      if (!io_s.ok()) {
+        s = io_s;
+      }
       TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
     }
 
@@ -3910,6 +4177,28 @@ Status VersionSet::ProcessManifestWrites(
     mu->Lock();
   }
 
+  if (s.ok()) {
+    // Apply WAL edits, DB mutex must be held.
+    for (auto& e : batch_edits) {
+      if (e->IsWalAddition()) {
+        s = wals_.AddWals(e->GetWalAdditions());
+      } else if (e->IsWalDeletion()) {
+        s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
+      }
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+
+  if (!io_s.ok()) {
+    if (io_status_.ok()) {
+      io_status_ = io_s;
+    }
+  } else if (!io_status_.ok()) {
+    io_status_ = io_s;
+  }
+
   // Append the old manifest file to the obsolete_manifest_ list to be deleted
   // by PurgeObsoleteFiles later.
   if (s.ok() && new_descriptor_log) {
@@ -3986,9 +4275,15 @@ Status VersionSet::ProcessManifestWrites(
       ROCKS_LOG_INFO(db_options_->info_log,
                      "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                      "\n",
-                     manifest_file_number_, pending_manifest_file_number_);
-      env_->DeleteFile(
+                     pending_manifest_file_number_, manifest_file_number_);
+      Status manifest_del_status = env_->DeleteFile(
           DescriptorFileName(dbname_, pending_manifest_file_number_));
+      if (!manifest_del_status.ok()) {
+        ROCKS_LOG_WARN(db_options_->info_log,
+                       "Failed to delete manifest %" PRIu64 ": %s",
+                       pending_manifest_file_number_,
+                       manifest_del_status.ToString().c_str());
+      }
     }
   }
 
@@ -4007,6 +4302,9 @@ Status VersionSet::ProcessManifestWrites(
     }
     ready->status = s;
     ready->done = true;
+    if (ready->manifest_write_callback) {
+      (ready->manifest_write_callback)(s);
+    }
     if (need_signal) {
       ready->cv.Signal();
     }
@@ -4026,8 +4324,9 @@ Status VersionSet::LogAndApply(
     const autovector<ColumnFamilyData*>& column_family_datas,
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
     const autovector<autovector<VersionEdit*>>& edit_lists,
-    InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
-    const ColumnFamilyOptions* new_cf_options) {
+    InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
+    const ColumnFamilyOptions* new_cf_options,
+    const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
   mu->AssertHeld();
   int num_edits = 0;
   for (const auto& elist : edit_lists) {
@@ -4057,12 +4356,16 @@ Status VersionSet::LogAndApply(
     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
   }
   for (int i = 0; i < num_cfds; ++i) {
+    const auto wcb =
+        manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
     writers.emplace_back(mu, column_family_datas[i],
-                         *mutable_cf_options_list[i], edit_lists[i]);
+                         *mutable_cf_options_list[i], edit_lists[i], wcb);
     manifest_writers_.push_back(&writers[i]);
   }
   assert(!writers.empty());
   ManifestWriter& first_writer = writers.front();
+  TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
+                           nullptr);
   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
     first_writer.cv.Wait();
   }
@@ -4074,6 +4377,7 @@ Status VersionSet::LogAndApply(
     for (const auto& writer : writers) {
       assert(writer.done);
     }
+    TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
 #endif /* !NDEBUG */
     return first_writer.status;
   }
@@ -4143,9 +4447,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                       : last_sequence_);
 
-  Status s = builder->Apply(edit);
-
-  return s;
+  // The builder can be nullptr only if the edit is a WAL manipulation.
+  // WAL edits do not need to be applied to versions, so we simply return
+  // Status::OK() in that case.
+  assert(builder || edit->IsWalManipulation());
+  return builder ? builder->Apply(edit) : Status::OK();
 }
 
 Status VersionSet::ApplyOneVersionEditToBuilder(
@@ -4219,6 +4525,16 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
       return Status::Corruption(
           "Manifest - dropping non-existing column family");
     }
+  } else if (edit.IsWalAddition()) {
+    Status s = wals_.AddWals(edit.GetWalAdditions());
+    if (!s.ok()) {
+      return s;
+    }
+  } else if (edit.IsWalDeletion()) {
+    Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber());
+    if (!s.ok()) {
+      return s;
+    }
   } else if (!cf_in_not_found) {
     if (!cf_in_builders) {
       return Status::Corruption(
@@ -4319,24 +4635,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
   if (dbname.back() != '/') {
     manifest_path->push_back('/');
   }
-  *manifest_path += fname;
+  manifest_path->append(fname);
   return Status::OK();
 }
 
 Status VersionSet::ReadAndRecover(
-    log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
+    log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
     const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
     std::unordered_map<int, std::string>& column_families_not_found,
     std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
         builders,
-    VersionEditParams* version_edit_params, std::string* db_id) {
-  assert(reader != nullptr);
+    Status* log_read_status, VersionEditParams* version_edit_params,
+    std::string* db_id) {
   assert(read_buffer != nullptr);
+  assert(log_read_status != nullptr);
   Status s;
   Slice record;
   std::string scratch;
   size_t recovered_edits = 0;
-  while (reader->ReadRecord(&record, &scratch) && s.ok()) {
+  while (s.ok() && reader.ReadRecord(&record, &scratch) &&
+         log_read_status->ok()) {
     VersionEdit edit;
     s = edit.DecodeFrom(record);
     if (!s.ok()) {
@@ -4380,6 +4698,9 @@ Status VersionSet::ReadAndRecover(
       }
     }
   }
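+  // Errors detected by the log::Reader are reported through the LogReporter
+  // into *log_read_status; fold them into the returned status so a truncated
+  // or corrupt MANIFEST is not mistaken for a clean end of file.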
+  if (!log_read_status->ok()) {
+    s = *log_read_status;
+  }
   if (!s.ok()) {
     // Clear the buffer if we fail to decode/apply an edit.
     read_buffer->Clear();
@@ -4392,18 +4713,9 @@ Status VersionSet::ReadAndRecover(
 Status VersionSet::Recover(
     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
     std::string* db_id) {
-  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
-  for (const auto& cf : column_families) {
-    cf_name_to_options.emplace(cf.name, cf.options);
-  }
-  // keeps track of column families in manifest that were not found in
-  // column families parameters. if those column families are not dropped
-  // by subsequent manifest records, Recover() will return failure status
-  std::unordered_map<int, std::string> column_families_not_found;
-
   // Read "CURRENT" file, which contains a pointer to the current manifest file
   std::string manifest_path;
-  Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+  Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
                                     &manifest_file_number_);
   if (!s.ok()) {
     return s;
@@ -4423,138 +4735,32 @@ Status VersionSet::Recover(
     }
     manifest_file_reader.reset(
         new SequentialFileReader(std::move(manifest_file), manifest_path,
-                                 db_options_->log_readahead_size));
-  }
-
-  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
-      builders;
-
-  // add default column family
-  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
-  if (default_cf_iter == cf_name_to_options.end()) {
-    return Status::InvalidArgument("Default column family not specified");
+                                 db_options_->log_readahead_size, io_tracer_));
   }
-  VersionEdit default_cf_edit;
-  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
-  default_cf_edit.SetColumnFamily(0);
-  ColumnFamilyData* default_cfd =
-      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
-  // In recovery, nobody else can access it, so it's fine to set it to be
-  // initialized earlier.
-  default_cfd->set_initialized();
-  builders.insert(
-      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
-                            new BaseReferencedVersionBuilder(default_cfd))));
   uint64_t current_manifest_file_size = 0;
-  VersionEditParams version_edit_params;
+  uint64_t log_number = 0;
   {
     VersionSet::LogReporter reporter;
-    reporter.status = &s;
+    Status log_read_status;
+    reporter.status = &log_read_status;
     log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
                        true /* checksum */, 0 /* log_number */);
-    Slice record;
-    std::string scratch;
-    AtomicGroupReadBuffer read_buffer;
-    s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
-                       column_families_not_found, builders,
-                       &version_edit_params, db_id);
-    current_manifest_file_size = reader.GetReadOffset();
-    assert(current_manifest_file_size != 0);
-  }
-
-  if (s.ok()) {
-    if (!version_edit_params.has_next_file_number_) {
-      s = Status::Corruption("no meta-nextfile entry in descriptor");
-    } else if (!version_edit_params.has_log_number_) {
-      s = Status::Corruption("no meta-lognumber entry in descriptor");
-    } else if (!version_edit_params.has_last_sequence_) {
-      s = Status::Corruption("no last-sequence-number entry in descriptor");
-    }
-
-    if (!version_edit_params.has_prev_log_number_) {
-      version_edit_params.SetPrevLogNumber(0);
-    }
-
-    column_family_set_->UpdateMaxColumnFamily(
-        version_edit_params.max_column_family_);
-
-    // When reading DB generated using old release, min_log_number_to_keep=0.
-    // All log files will be scanned for potential prepare entries.
-    MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
-    MarkFileNumberUsed(version_edit_params.prev_log_number_);
-    MarkFileNumberUsed(version_edit_params.log_number_);
-  }
-
-  // there were some column families in the MANIFEST that weren't specified
-  // in the argument. This is OK in read_only mode
-  if (read_only == false && !column_families_not_found.empty()) {
-    std::string list_of_not_found;
-    for (const auto& cf : column_families_not_found) {
-      list_of_not_found += ", " + cf.second;
-    }
-    list_of_not_found = list_of_not_found.substr(2);
-    s = Status::InvalidArgument(
-        "You have to open all column families. Column families not opened: " +
-        list_of_not_found);
-  }
-
-  if (s.ok()) {
-    for (auto cfd : *column_family_set_) {
-      assert(builders.count(cfd->GetID()) > 0);
-      auto* builder = builders[cfd->GetID()]->version_builder();
-      if (!builder->CheckConsistencyForNumLevels()) {
-        s = Status::InvalidArgument(
-            "db has more levels than options.num_levels");
-        break;
-      }
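+    // handler.Iterate() replays the MANIFEST records: column families are
+    // created, edits are applied, and the recovered Versions are built and
+    // installed into this VersionSet; handler.status() carries the result.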
+    VersionEditHandler handler(
+        read_only, column_families, const_cast<VersionSet*>(this),
+        /*track_missing_files=*/false,
+        /*no_error_if_table_files_missing=*/false, io_tracer_);
+    handler.Iterate(reader, &log_read_status);
+    s = handler.status();
+    if (s.ok()) {
+      log_number = handler.GetVersionEditParams().log_number_;
+      current_manifest_file_size = reader.GetReadOffset();
+      assert(current_manifest_file_size != 0);
+      handler.GetDbId(db_id);
     }
   }
 
   if (s.ok()) {
-    for (auto cfd : *column_family_set_) {
-      if (cfd->IsDropped()) {
-        continue;
-      }
-      if (read_only) {
-        cfd->table_cache()->SetTablesAreImmortal();
-      }
-      assert(cfd->initialized());
-      auto builders_iter = builders.find(cfd->GetID());
-      assert(builders_iter != builders.end());
-      auto builder = builders_iter->second->version_builder();
-
-      // unlimited table cache. Pre-load table handle now.
-      // Need to do it out of the mutex.
-      s = builder->LoadTableHandlers(
-          cfd->internal_stats(), db_options_->max_file_opening_threads,
-          false /* prefetch_index_and_filter_in_cache */,
-          true /* is_initial_load */,
-          cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
-      if (!s.ok()) {
-        if (db_options_->paranoid_checks) {
-          return s;
-        }
-        s = Status::OK();
-      }
-
-      Version* v = new Version(cfd, this, file_options_,
-                               *cfd->GetLatestMutableCFOptions(),
-                               current_version_number_++);
-      builder->SaveTo(v->storage_info());
-
-      // Install recovered version
-      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
-          !(db_options_->skip_stats_update_on_db_open));
-      AppendVersion(cfd, v);
-    }
-
     manifest_file_size_ = current_manifest_file_size;
-    next_file_number_.store(version_edit_params.next_file_number_ + 1);
-    last_allocated_sequence_ = version_edit_params.last_sequence_;
-    last_published_sequence_ = version_edit_params.last_sequence_;
-    last_sequence_ = version_edit_params.last_sequence_;
-    prev_log_number_ = version_edit_params.prev_log_number_;
-
     ROCKS_LOG_INFO(
         db_options_->info_log,
         "Recovered from manifest file:%s succeeded,"
@@ -4563,9 +4769,8 @@ Status VersionSet::Recover(
         ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
         ",min_log_number_to_keep is %" PRIu64 "\n",
         manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
-        last_sequence_.load(), version_edit_params.log_number_,
-        prev_log_number_, column_family_set_->GetMaxColumnFamily(),
-        min_log_number_to_keep_2pc());
+        last_sequence_.load(), log_number, prev_log_number_,
+        column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
 
     for (auto cfd : *column_family_set_) {
       if (cfd->IsDropped()) {
@@ -4581,6 +4786,148 @@ Status VersionSet::Recover(
   return s;
 }
 
+namespace {
+class ManifestPicker {
+ public:
+  explicit ManifestPicker(const std::string& dbname,
+                          const std::vector<std::string>& files_in_dbname);
+  // REQUIRES Valid() == true
+  std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
+  bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
+
+ private:
+  const std::string& dbname_;
+  // MANIFEST file name(s)
+  std::vector<std::string> manifest_files_;
+  std::vector<std::string>::const_iterator manifest_file_iter_;
+};
+
+ManifestPicker::ManifestPicker(const std::string& dbname,
+                               const std::vector<std::string>& files_in_dbname)
+    : dbname_(dbname) {
+  // populate manifest files
+  assert(!files_in_dbname.empty());
+  for (const auto& fname : files_in_dbname) {
+    uint64_t file_num = 0;
+    FileType file_type;
+    bool parse_ok = ParseFileName(fname, &file_num, &file_type);
+    if (parse_ok && file_type == kDescriptorFile) {
+      manifest_files_.push_back(fname);
+    }
+  }
+  // sort by descending file number so the iterator starts at the newest manifest
+  std::sort(manifest_files_.begin(), manifest_files_.end(),
+            [](const std::string& lhs, const std::string& rhs) {
+              uint64_t num1 = 0;
+              uint64_t num2 = 0;
+              FileType type1;
+              FileType type2;
+              bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
+              bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
+#ifndef NDEBUG
+              assert(parse_ok1);
+              assert(parse_ok2);
+#else
+              (void)parse_ok1;
+              (void)parse_ok2;
+#endif
+              return num1 > num2;
+            });
+  manifest_file_iter_ = manifest_files_.begin();
+}
+
+std::string ManifestPicker::GetNextManifest(uint64_t* number,
+                                            std::string* file_name) {
+  assert(Valid());
+  std::string ret;
+  if (manifest_file_iter_ != manifest_files_.end()) {
+    ret.assign(dbname_);
+    if (ret.back() != kFilePathSeparator) {
+      ret.push_back(kFilePathSeparator);
+    }
+    ret.append(*manifest_file_iter_);
+    if (number) {
+      FileType type;
+      bool parse = ParseFileName(*manifest_file_iter_, number, &type);
+      assert(type == kDescriptorFile);
+#ifndef NDEBUG
+      assert(parse);
+#else
+      (void)parse;
+#endif
+    }
+    if (file_name) {
+      *file_name = *manifest_file_iter_;
+    }
+    ++manifest_file_iter_;
+  }
+  return ret;
+}
+}  // namespace
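+// ManifestPicker sorts the descriptor files by descending file number, so
+// GetNextManifest() hands out the newest MANIFEST first.  Illustrative use,
+// a sketch only (`files` and its origin are assumed, e.g. Env::GetChildren):
+//
+//   std::vector<std::string> files;          // directory listing of dbname
+//   ManifestPicker picker(dbname, files);
+//   uint64_t number = 0;
+//   while (picker.Valid()) {
+//     std::string path = picker.GetNextManifest(&number, nullptr);
+//     // attempt recovery from `path`; stop at the first success
+//   }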
+
+Status VersionSet::TryRecover(
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    const std::vector<std::string>& files_in_dbname, std::string* db_id,
+    bool* has_missing_table_file) {
+  ManifestPicker manifest_picker(dbname_, files_in_dbname);
+  if (!manifest_picker.Valid()) {
+    return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
+  }
+  Status s;
+  std::string manifest_path =
+      manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
+  while (!manifest_path.empty()) {
+    s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
+                                  db_id, has_missing_table_file);
+    if (s.ok() || !manifest_picker.Valid()) {
+      break;
+    }
+    Reset();
+    manifest_path =
+        manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
+  }
+  return s;
+}
+
+Status VersionSet::TryRecoverFromOneManifest(
+    const std::string& manifest_path,
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    std::string* db_id, bool* has_missing_table_file) {
+  ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
+                 manifest_path.c_str());
+  std::unique_ptr<SequentialFileReader> manifest_file_reader;
+  Status s;
+  {
+    std::unique_ptr<FSSequentialFile> manifest_file;
+    s = fs_->NewSequentialFile(manifest_path,
+                               fs_->OptimizeForManifestRead(file_options_),
+                               &manifest_file, nullptr);
+    if (!s.ok()) {
+      return s;
+    }
+    manifest_file_reader.reset(
+        new SequentialFileReader(std::move(manifest_file), manifest_path,
+                                 db_options_->log_readahead_size, io_tracer_));
+  }
+
+  assert(s.ok());
+  VersionSet::LogReporter reporter;
+  reporter.status = &s;
+  log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
+                     /*checksum=*/true, /*log_num=*/0);
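+  // VersionEditHandlerPointInTime is intended to recover to the most recent
+  // point in the MANIFEST at which all referenced files exist, so a missing
+  // newest table file degrades to an older consistent state instead of
+  // failing recovery outright.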
+  VersionEditHandlerPointInTime handler_pit(
+      read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
+
+  handler_pit.Iterate(reader, &s);
+
+  handler_pit.GetDbId(db_id);
+
+  assert(nullptr != has_missing_table_file);
+  *has_missing_table_file = handler_pit.HasMissingFiles();
+
+  return handler_pit.status();
+}
+
 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
                                       const std::string& dbname,
                                       FileSystem* fs) {
@@ -4603,51 +4950,27 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
     if (!s.ok()) {
       return s;
   }
-  file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
+  file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
+                                             nullptr /*IOTracer*/));
   }
 
-  std::map<uint32_t, std::string> column_family_names;
-  // default column family is always implicitly there
-  column_family_names.insert({0, kDefaultColumnFamilyName});
   VersionSet::LogReporter reporter;
   reporter.status = &s;
   log::Reader reader(nullptr, std::move(file_reader), &reporter,
                      true /* checksum */, 0 /* log_number */);
-  Slice record;
-  std::string scratch;
-  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
-    VersionEdit edit;
-    s = edit.DecodeFrom(record);
-    if (!s.ok()) {
-      break;
-    }
-    if (edit.is_column_family_add_) {
-      if (column_family_names.find(edit.column_family_) !=
-          column_family_names.end()) {
-        s = Status::Corruption("Manifest adding the same column family twice");
-        break;
-      }
-      column_family_names.insert(
-          {edit.column_family_, edit.column_family_name_});
-    } else if (edit.is_column_family_drop_) {
-      if (column_family_names.find(edit.column_family_) ==
-          column_family_names.end()) {
-        s = Status::Corruption(
-            "Manifest - dropping non-existing column family");
-        break;
-      }
-      column_family_names.erase(edit.column_family_);
-    }
-  }
 
+  ListColumnFamiliesHandler handler;
+  handler.Iterate(reader, &s);
+
+  assert(column_families);
   column_families->clear();
-  if (s.ok()) {
-    for (const auto& iter : column_family_names) {
+  if (handler.status().ok()) {
+    for (const auto& iter : handler.GetColumnFamilyNames()) {
       column_families->push_back(iter.second);
     }
   }
 
-  return s;
+  return handler.status();
 }
 
 #ifndef ROCKSDB_LITE
@@ -4667,7 +4990,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
   WriteController wc(options->delayed_write_rate);
   WriteBufferManager wb(options->db_write_buffer_size);
   VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
-                      /*block_cache_tracer=*/nullptr);
+                      nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
   Status status;
 
   std::vector<ColumnFamilyDescriptor> dummy;
@@ -4720,7 +5043,19 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
   }
 
   if (first_nonempty_level > 0) {
-    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
+    auto& new_last_level = new_files_list[new_levels - 1];
+
+    new_last_level = vstorage->LevelFiles(first_nonempty_level);
+
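+    // The files just moved from first_nonempty_level now live on the new last
+    // level, so their entries in the file-number -> location index must be
+    // updated to point there as well.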
+    for (size_t i = 0; i < new_last_level.size(); ++i) {
+      const FileMetaData* const meta = new_last_level[i];
+      assert(meta);
+
+      const uint64_t file_number = meta->fd.GetNumber();
+
+      vstorage->file_locations_[file_number] =
+          VersionStorageInfo::FileLocation(new_levels - 1, i);
+    }
   }
 
   delete[] vstorage->files_;
@@ -4743,8 +5078,10 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
 // metadata from Manifest to VersionSet before calling this function.
 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
   // Clean the previously stored checksum information if any.
+  Status s;
   if (checksum_list == nullptr) {
-    return Status::InvalidArgument("checksum_list is nullptr");
+    s = Status::InvalidArgument("checksum_list is nullptr");
+    return s;
   }
   checksum_list->reset();
 
@@ -4755,13 +5092,22 @@ Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
     for (int level = 0; level < cfd->NumberLevels(); level++) {
       for (const auto& file :
            cfd->current()->storage_info()->LevelFiles(level)) {
-        checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
-                                             file->file_checksum,
-                                             file->file_checksum_func_name);
+        s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
+                                                 file->file_checksum,
+                                                 file->file_checksum_func_name);
+        if (!s.ok()) {
+          break;
+        }
+      }
+      if (!s.ok()) {
+        break;
       }
     }
+    if (!s.ok()) {
+      break;
+    }
   }
-  return Status::OK();
+  return s;
 }
 
 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
@@ -4771,205 +5117,31 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
   Status s;
   {
     std::unique_ptr<FSSequentialFile> file;
-    s = options.file_system->NewSequentialFile(
+    const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
+    s = fs->NewSequentialFile(
         dscname,
-        options.file_system->OptimizeForManifestRead(file_options_), &file,
+        fs->OptimizeForManifestRead(file_options_), &file,
         nullptr);
     if (!s.ok()) {
       return s;
     }
     file_reader.reset(new SequentialFileReader(
-        std::move(file), dscname, db_options_->log_readahead_size));
+        std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
   }
 
-  bool have_prev_log_number = false;
-  bool have_next_file = false;
-  bool have_last_sequence = false;
-  uint64_t next_file = 0;
-  uint64_t last_sequence = 0;
-  uint64_t previous_log_number = 0;
-  int count = 0;
-  std::unordered_map<uint32_t, std::string> comparators;
-  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
-      builders;
-
-  // add default column family
-  VersionEdit default_cf_edit;
-  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
-  default_cf_edit.SetColumnFamily(0);
-  ColumnFamilyData* default_cfd =
-      CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
-  builders.insert(
-      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
-                            new BaseReferencedVersionBuilder(default_cfd))));
-
+  std::vector<ColumnFamilyDescriptor> column_families(
+      1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
+  DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
+                              json);
   {
     VersionSet::LogReporter reporter;
     reporter.status = &s;
     log::Reader reader(nullptr, std::move(file_reader), &reporter,
                        true /* checksum */, 0 /* log_number */);
-    Slice record;
-    std::string scratch;
-    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
-      VersionEdit edit;
-      s = edit.DecodeFrom(record);
-      if (!s.ok()) {
-        break;
-      }
-
-      // Write out each individual edit
-      if (verbose && !json) {
-        printf("%s\n", edit.DebugString(hex).c_str());
-      } else if (json) {
-        printf("%s\n", edit.DebugJSON(count, hex).c_str());
-      }
-      count++;
-
-      bool cf_in_builders =
-          builders.find(edit.column_family_) != builders.end();
-
-      if (edit.has_comparator_) {
-        comparators.insert({edit.column_family_, edit.comparator_});
-      }
-
-      ColumnFamilyData* cfd = nullptr;
-
-      if (edit.is_column_family_add_) {
-        if (cf_in_builders) {
-          s = Status::Corruption(
-              "Manifest adding the same column family twice");
-          break;
-        }
-        cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
-        cfd->set_initialized();
-        builders.insert(std::make_pair(
-            edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
-                                     new BaseReferencedVersionBuilder(cfd))));
-      } else if (edit.is_column_family_drop_) {
-        if (!cf_in_builders) {
-          s = Status::Corruption(
-              "Manifest - dropping non-existing column family");
-          break;
-        }
-        auto builder_iter = builders.find(edit.column_family_);
-        builders.erase(builder_iter);
-        comparators.erase(edit.column_family_);
-        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-        assert(cfd != nullptr);
-        cfd->UnrefAndTryDelete();
-        cfd = nullptr;
-      } else {
-        if (!cf_in_builders) {
-          s = Status::Corruption(
-              "Manifest record referencing unknown column family");
-          break;
-        }
-
-        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-        // this should never happen since cf_in_builders is true
-        assert(cfd != nullptr);
-
-        // if it is not column family add or column family drop,
-        // then it's a file add/delete, which should be forwarded
-        // to builder
-        auto builder = builders.find(edit.column_family_);
-        assert(builder != builders.end());
-        s = builder->second->version_builder()->Apply(&edit);
-        if (!s.ok()) {
-          break;
-        }
-      }
-
-      if (cfd != nullptr && edit.has_log_number_) {
-        cfd->SetLogNumber(edit.log_number_);
-      }
-
-
-      if (edit.has_prev_log_number_) {
-        previous_log_number = edit.prev_log_number_;
-        have_prev_log_number = true;
-      }
-
-      if (edit.has_next_file_number_) {
-        next_file = edit.next_file_number_;
-        have_next_file = true;
-      }
-
-      if (edit.has_last_sequence_) {
-        last_sequence = edit.last_sequence_;
-        have_last_sequence = true;
-      }
-
-      if (edit.has_max_column_family_) {
-        column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
-      }
-
-      if (edit.has_min_log_number_to_keep_) {
-        MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
-      }
-    }
+    handler.Iterate(reader, &s);
   }
-  file_reader.reset();
 
-  if (s.ok()) {
-    if (!have_next_file) {
-      s = Status::Corruption("no meta-nextfile entry in descriptor");
-      printf("no meta-nextfile entry in descriptor");
-    } else if (!have_last_sequence) {
-      printf("no last-sequence-number entry in descriptor");
-      s = Status::Corruption("no last-sequence-number entry in descriptor");
-    }
-
-    if (!have_prev_log_number) {
-      previous_log_number = 0;
-    }
-  }
-
-  if (s.ok()) {
-    for (auto cfd : *column_family_set_) {
-      if (cfd->IsDropped()) {
-        continue;
-      }
-      auto builders_iter = builders.find(cfd->GetID());
-      assert(builders_iter != builders.end());
-      auto builder = builders_iter->second->version_builder();
-
-      Version* v = new Version(cfd, this, file_options_,
-                               *cfd->GetLatestMutableCFOptions(),
-                               current_version_number_++);
-      builder->SaveTo(v->storage_info());
-      v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
-
-      printf("--------------- Column family \"%s\"  (ID %" PRIu32
-             ") --------------\n",
-             cfd->GetName().c_str(), cfd->GetID());
-      printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
-      auto comparator = comparators.find(cfd->GetID());
-      if (comparator != comparators.end()) {
-        printf("comparator: %s\n", comparator->second.c_str());
-      } else {
-        printf("comparator: <NO COMPARATOR>\n");
-      }
-      printf("%s \n", v->DebugString(hex).c_str());
-      delete v;
-    }
-
-    next_file_number_.store(next_file + 1);
-    last_allocated_sequence_ = last_sequence;
-    last_published_sequence_ = last_sequence;
-    last_sequence_ = last_sequence;
-    prev_log_number_ = previous_log_number;
-
-    printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
-           "  prev_log_number %" PRIu64 " max_column_family %" PRIu32
-           " min_log_number_to_keep "
-           "%" PRIu64 "\n",
-           next_file_number_.load(), last_sequence, previous_log_number,
-           column_family_set_->GetMaxColumnFamily(),
-           min_log_number_to_keep_2pc());
-  }
-
-  return s;
+  return handler.status();
 }
 #endif  // ROCKSDB_LITE
 
@@ -4990,7 +5162,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
 
 Status VersionSet::WriteCurrentStateToManifest(
     const std::unordered_map<uint32_t, MutableCFState>& curr_state,
-    log::Writer* log) {
+    const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
   // TODO: Break up into multiple records to reduce memory usage on recovery?
 
   // WARNING: This method doesn't hold a mutex!!
@@ -4999,6 +5171,7 @@ Status VersionSet::WriteCurrentStateToManifest(
   // LogAndApply. Column family manipulations can only happen within LogAndApply
   // (the same single thread), so we're safe to iterate.
 
+  assert(io_s.ok());
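+  // MANIFEST writes below report I/O failures through io_s, while logical
+  // problems such as failing to encode an edit are returned as a non-I/O
+  // Status.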
   if (db_options_->write_dbid_to_manifest) {
     VersionEdit edit_for_db_id;
     assert(!db_id_.empty());
@@ -5008,13 +5181,30 @@ Status VersionSet::WriteCurrentStateToManifest(
       return Status::Corruption("Unable to Encode VersionEdit:" +
                                 edit_for_db_id.DebugString(true));
     }
-    Status add_record = log->AddRecord(db_id_record);
-    if (!add_record.ok()) {
-      return add_record;
+    io_s = log->AddRecord(db_id_record);
+    if (!io_s.ok()) {
+      return io_s;
+    }
+  }
+
+  // Save WALs.
+  if (!wal_additions.GetWalAdditions().empty()) {
+    TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
+                             const_cast<VersionEdit*>(&wal_additions));
+    std::string record;
+    if (!wal_additions.EncodeTo(&record)) {
+      return Status::Corruption("Unable to Encode VersionEdit: " +
+                                wal_additions.DebugString(true));
+    }
+    io_s = log->AddRecord(record);
+    if (!io_s.ok()) {
+      return io_s;
     }
   }
 
   for (auto cfd : *column_family_set_) {
+    assert(cfd);
+
     if (cfd->IsDropped()) {
       continue;
     }
@@ -5035,9 +5225,9 @@ Status VersionSet::WriteCurrentStateToManifest(
         return Status::Corruption(
             "Unable to Encode VersionEdit:" + edit.DebugString(true));
       }
-      Status s = log->AddRecord(record);
-      if (!s.ok()) {
-        return s;
+      io_s = log->AddRecord(record);
+      if (!io_s.ok()) {
+        return io_s;
       }
     }
 
@@ -5046,6 +5236,9 @@ Status VersionSet::WriteCurrentStateToManifest(
       VersionEdit edit;
       edit.SetColumnFamily(cfd->GetID());
 
+      assert(cfd->current());
+      assert(cfd->current()->storage_info());
+
       for (int level = 0; level < cfd->NumberLevels(); level++) {
         for (const auto& f :
              cfd->current()->storage_info()->LevelFiles(level)) {
@@ -5057,6 +5250,24 @@ Status VersionSet::WriteCurrentStateToManifest(
                        f->file_checksum, f->file_checksum_func_name);
         }
       }
+
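+      // Re-record the live blob files of the current version so the new
+      // MANIFEST carries a complete snapshot: one blob file addition per
+      // file, plus a garbage entry when some of its blobs are obsolete.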
+      const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
+      for (const auto& pair : blob_files) {
+        const uint64_t blob_file_number = pair.first;
+        const auto& meta = pair.second;
+
+        assert(meta);
+        assert(blob_file_number == meta->GetBlobFileNumber());
+
+        edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
+                         meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
+                         meta->GetChecksumValue());
+        if (meta->GetGarbageBlobCount() > 0) {
+          edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
+                                  meta->GetGarbageBlobBytes());
+        }
+      }
+
       const auto iter = curr_state.find(cfd->GetID());
       assert(iter != curr_state.end());
       uint64_t log_number = iter->second.log_number;
@@ -5066,9 +5277,9 @@ Status VersionSet::WriteCurrentStateToManifest(
         return Status::Corruption(
             "Unable to Encode VersionEdit:" + edit.DebugString(true));
       }
-      Status s = log->AddRecord(record);
-      if (!s.ok()) {
-        return s;
+      io_s = log->AddRecord(record);
+      if (!io_s.ok()) {
+        return io_s;
       }
     }
   }
@@ -5193,7 +5404,8 @@ uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
                         static_cast<uint64_t>(total_full_size * margin)) {
     total_full_size += total_intersecting_size / 2;
   } else {
-    // Estimate for all the first files, at each level
+    // Estimate for all the first files (which might also be the last files)
+    // at each level
     for (const auto file_ptr : first_files) {
       total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
     }
@@ -5273,61 +5485,79 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
       v->GetMutableCFOptions().prefix_extractor.get());
 }
 
-void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
+void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
+                              std::vector<uint64_t>* live_blob_files) const {
+  assert(live_table_files);
+  assert(live_blob_files);
+
   // pre-calculate space requirement
-  int64_t total_files = 0;
+  size_t total_table_files = 0;
+  size_t total_blob_files = 0;
+
+  assert(column_family_set_);
   for (auto cfd : *column_family_set_) {
+    assert(cfd);
+
     if (!cfd->initialized()) {
       continue;
     }
-    Version* dummy_versions = cfd->dummy_versions();
+
+    Version* const dummy_versions = cfd->dummy_versions();
+    assert(dummy_versions);
+
     for (Version* v = dummy_versions->next_; v != dummy_versions;
          v = v->next_) {
+      assert(v);
+
       const auto* vstorage = v->storage_info();
-      for (int level = 0; level < vstorage->num_levels(); level++) {
-        total_files += vstorage->LevelFiles(level).size();
+      assert(vstorage);
+
+      for (int level = 0; level < vstorage->num_levels(); ++level) {
+        total_table_files += vstorage->LevelFiles(level).size();
       }
+
+      total_blob_files += vstorage->GetBlobFiles().size();
     }
   }
 
   // just a one-time extension to the right size
-  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
+  live_table_files->reserve(live_table_files->size() + total_table_files);
+  live_blob_files->reserve(live_blob_files->size() + total_blob_files);
 
+  assert(column_family_set_);
   for (auto cfd : *column_family_set_) {
+    assert(cfd);
     if (!cfd->initialized()) {
       continue;
     }
+
     auto* current = cfd->current();
     bool found_current = false;
-    Version* dummy_versions = cfd->dummy_versions();
+
+    Version* const dummy_versions = cfd->dummy_versions();
+    assert(dummy_versions);
+
     for (Version* v = dummy_versions->next_; v != dummy_versions;
          v = v->next_) {
-      v->AddLiveFiles(live_list);
+      v->AddLiveFiles(live_table_files, live_blob_files);
       if (v == current) {
         found_current = true;
       }
     }
+
     if (!found_current && current != nullptr) {
       // Should never happen unless it is a bug.
       assert(false);
-      current->AddLiveFiles(live_list);
+      current->AddLiveFiles(live_table_files, live_blob_files);
     }
   }
 }
 
 InternalIterator* VersionSet::MakeInputIterator(
-    const Compaction* c, RangeDelAggregator* range_del_agg,
+    const ReadOptions& read_options, const Compaction* c,
+    RangeDelAggregator* range_del_agg,
     const FileOptions& file_options_compactions) {
   auto cfd = c->column_family_data();
-  ReadOptions read_options;
-  read_options.verify_checksums = true;
-  read_options.fill_cache = false;
-  // Compaction iterators shouldn't be confined to a single prefix.
-  // Compactions use Seek() for
-  // (a) concurrent compactions,
-  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
-  read_options.total_order_seek = true;
-
   // Level-0 files have to be merged together.  For other levels,
   // we will make a concatenating iterator per level.
   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
@@ -5343,15 +5573,17 @@ InternalIterator* VersionSet::MakeInputIterator(
         for (size_t i = 0; i < flevel->num_files; i++) {
           list[num++] = cfd->table_cache()->NewIterator(
               read_options, file_options_compactions,
-              cfd->internal_comparator(),
-              *flevel->files[i].file_metadata, range_del_agg,
-              c->mutable_cf_options()->prefix_extractor.get(),
+              cfd->internal_comparator(), *flevel->files[i].file_metadata,
+              range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
               /*table_reader_ptr=*/nullptr,
               /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
               /*arena=*/nullptr,
-              /*skip_filters=*/false, /*level=*/static_cast<int>(which),
+              /*skip_filters=*/false,
+              /*level=*/static_cast<int>(c->level(which)),
+              MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
               /*smallest_compaction_key=*/nullptr,
-              /*largest_compaction_key=*/nullptr);
+              /*largest_compaction_key=*/nullptr,
+              /*allow_unprepared_value=*/false);
         }
       } else {
         // Create concatenating iterator for the files from this level
@@ -5362,7 +5594,7 @@ InternalIterator* VersionSet::MakeInputIterator(
             /*should_sample=*/false,
             /*no per level latency histogram=*/nullptr,
             TableReaderCaller::kCompaction, /*skip_filters=*/false,
-            /*level=*/static_cast<int>(which), range_del_agg,
+            /*level=*/static_cast<int>(c->level(which)), range_del_agg,
             c->boundaries(which));
       }
     }
@@ -5490,28 +5722,46 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
 }
 
 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
+                                  std::vector<ObsoleteBlobFileInfo>* blob_files,
                                   std::vector<std::string>* manifest_filenames,
                                   uint64_t min_pending_output) {
+  assert(files);
+  assert(blob_files);
+  assert(manifest_filenames);
+  assert(files->empty());
+  assert(blob_files->empty());
   assert(manifest_filenames->empty());
-  obsolete_manifests_.swap(*manifest_filenames);
+
   std::vector<ObsoleteFileInfo> pending_files;
   for (auto& f : obsolete_files_) {
     if (f.metadata->fd.GetNumber() < min_pending_output) {
-      files->push_back(std::move(f));
+      files->emplace_back(std::move(f));
     } else {
-      pending_files.push_back(std::move(f));
+      pending_files.emplace_back(std::move(f));
     }
   }
   obsolete_files_.swap(pending_files);
+
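+  // Obsolete blob files are filtered the same way as table files: only those
+  // numbered below min_pending_output are returned now; the rest remain
+  // pending.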
+  std::vector<ObsoleteBlobFileInfo> pending_blob_files;
+  for (auto& blob_file : obsolete_blob_files_) {
+    if (blob_file.GetBlobFileNumber() < min_pending_output) {
+      blob_files->emplace_back(std::move(blob_file));
+    } else {
+      pending_blob_files.emplace_back(std::move(blob_file));
+    }
+  }
+  obsolete_blob_files_.swap(pending_blob_files);
+
+  obsolete_manifests_.swap(*manifest_filenames);
 }
 
 ColumnFamilyData* VersionSet::CreateColumnFamily(
-    const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
+    const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
   assert(edit->is_column_family_add_);
 
   MutableCFOptions dummy_cf_options;
   Version* dummy_versions =
-      new Version(nullptr, this, file_options_, dummy_cf_options);
+      new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
   // Ref() dummy version once so that later we can call Unref() to delete it
   // by avoiding calling "delete" explicitly (~Version is private)
   dummy_versions->Ref();
@@ -5520,7 +5770,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
       cf_options);
 
   Version* v = new Version(new_cfd, this, file_options_,
-                           *new_cfd->GetLatestMutableCFOptions(),
+                           *new_cfd->GetLatestMutableCFOptions(), io_tracer_,
                            current_version_number_++);
 
   // Fill level target base information.
@@ -5561,15 +5811,26 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
   return total_files_size;
 }
 
-ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
-                                       const ImmutableDBOptions* _db_options,
-                                       const FileOptions& _file_options,
-                                       Cache* table_cache,
-                                       WriteBufferManager* write_buffer_manager,
-                                       WriteController* write_controller)
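+// Verifies that the file at `fpath` has the size recorded in `meta`;
+// a mismatch is reported as Status::Corruption.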
+Status VersionSet::VerifyFileMetadata(const std::string& fpath,
+                                      const FileMetaData& meta) const {
+  uint64_t fsize = 0;
+  Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
+  if (status.ok()) {
+    if (fsize != meta.fd.GetFileSize()) {
+      status = Status::Corruption("File size mismatch: " + fpath);
+    }
+  }
+  return status;
+}
+
+ReactiveVersionSet::ReactiveVersionSet(
+    const std::string& dbname, const ImmutableDBOptions* _db_options,
+    const FileOptions& _file_options, Cache* table_cache,
+    WriteBufferManager* write_buffer_manager, WriteController* write_controller,
+    const std::shared_ptr<IOTracer>& io_tracer)
     : VersionSet(dbname, _db_options, _file_options, table_cache,
                  write_buffer_manager, write_controller,
-                 /*block_cache_tracer=*/nullptr),
+                 /*block_cache_tracer=*/nullptr, io_tracer),
       number_of_edits_to_skip_(0) {}
 
 ReactiveVersionSet::~ReactiveVersionSet() {}
@@ -5601,8 +5862,7 @@ Status ReactiveVersionSet::Recover(
   // In recovery, nobody else can access it, so it's fine to set it to be
   // initialized earlier.
   default_cfd->set_initialized();
-  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
-      builders;
+  VersionBuilderMap builders;
   std::unordered_map<int, std::string> column_families_not_found;
   builders.insert(
       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
@@ -5610,7 +5870,7 @@ Status ReactiveVersionSet::Recover(
 
   manifest_reader_status->reset(new Status());
   manifest_reporter->reset(new LogReporter());
-  static_cast<LogReporter*>(manifest_reporter->get())->status =
+  static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
       manifest_reader_status->get();
   Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
   log::Reader* reader = manifest_reader->get();
@@ -5619,10 +5879,9 @@ Status ReactiveVersionSet::Recover(
   VersionEdit version_edit;
   while (s.ok() && retry < 1) {
     assert(reader != nullptr);
-    Slice record;
-    std::string scratch;
-    s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
-                       column_families_not_found, builders, &version_edit);
+    s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
+                       column_families_not_found, builders,
+                       manifest_reader_status->get(), &version_edit);
     if (s.ok()) {
       bool enough = version_edit.has_next_file_number_ &&
                     version_edit.has_log_number_ &&
@@ -5649,7 +5908,8 @@ Status ReactiveVersionSet::Recover(
                 cfd->internal_stats(), db_options_->max_file_opening_threads,
                 false /* prefetch_index_and_filter_in_cache */,
                 true /* is_initial_load */,
-                cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+                cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+                MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
             if (!s.ok()) {
               enough = false;
               if (s.IsPathNotFound()) {
@@ -5699,15 +5959,25 @@ Status ReactiveVersionSet::Recover(
       auto* builder = builders_iter->second->version_builder();
 
       Version* v = new Version(cfd, this, file_options_,
-                               *cfd->GetLatestMutableCFOptions(),
+                               *cfd->GetLatestMutableCFOptions(), io_tracer_,
                                current_version_number_++);
-      builder->SaveTo(v->storage_info());
+      s = builder->SaveTo(v->storage_info());
 
-      // Install recovered version
-      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
-                      !(db_options_->skip_stats_update_on_db_open));
-      AppendVersion(cfd, v);
+      if (s.ok()) {
+        // Install recovered version
+        v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
+                        !(db_options_->skip_stats_update_on_db_open));
+        AppendVersion(cfd, v);
+      } else {
+        ROCKS_LOG_ERROR(db_options_->info_log,
+                        "[%s]: inconsistent version: %s\n",
+                        cfd->GetName().c_str(), s.ToString().c_str());
+        delete v;
+        break;
+      }
     }
+  }
+  if (s.ok()) {
     next_file_number_.store(version_edit.next_file_number_ + 1);
     last_allocated_sequence_ = version_edit.last_sequence_;
     last_published_sequence_ = version_edit.last_sequence_;
@@ -5784,6 +6054,8 @@ Status ReactiveVersionSet::ReadAndApply(
         s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
         if (s.ok()) {
           applied_edits++;
+        } else {
+          break;
         }
       }
     }
@@ -5793,13 +6065,14 @@ Status ReactiveVersionSet::ReadAndApply(
     }
     // It's possible that:
     // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
+    //    Or the version(s) rebuilt from tailing the MANIFEST are inconsistent.
     // 2) we have finished reading the current MANIFEST.
     // 3) we have encountered an IOError reading the current MANIFEST.
     // We need to look for the next MANIFEST and start from there. If we cannot
     // find the next MANIFEST, we should exit the loop.
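+    // Keep the result of MaybeSwitchManifest() in tmp_s so that an error from
+    // reading the current MANIFEST is not clobbered; s is only updated once
+    // we have actually moved on to a different MANIFEST file.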
-    s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
+    Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
     reader = manifest_reader->get();
-    if (s.ok()) {
+    if (tmp_s.ok()) {
       if (reader->file()->file_name() == old_manifest_path) {
         // Still processing the same MANIFEST, thus no need to continue this
         // loop since no record is available if we have reached here.
@@ -5829,6 +6102,7 @@ Status ReactiveVersionSet::ReadAndApply(
             number_of_edits_to_skip_ += 2;
           }
         }
+        s = tmp_s;
       }
     }
   }
@@ -5911,7 +6185,8 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
         cfd->internal_stats(), db_options_->max_file_opening_threads,
         false /* prefetch_index_and_filter_in_cache */,
         false /* is_initial_load */,
-        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+        cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+        MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
     TEST_SYNC_POINT_CALLBACK(
         "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
         "AfterLoadTableHandlers",
@@ -5919,14 +6194,18 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
 
     if (s.ok()) {
       auto version = new Version(cfd, this, file_options_,
-                                 *cfd->GetLatestMutableCFOptions(),
+                                 *cfd->GetLatestMutableCFOptions(), io_tracer_,
                                  current_version_number_++);
-      builder->SaveTo(version->storage_info());
-      version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
-      AppendVersion(cfd, version);
-      active_version_builders_.erase(builder_iter);
-      if (cfds_changed->count(cfd) == 0) {
-        cfds_changed->insert(cfd);
+      s = builder->SaveTo(version->storage_info());
+      if (s.ok()) {
+        version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
+        AppendVersion(cfd, version);
+        active_version_builders_.erase(builder_iter);
+        if (cfds_changed->count(cfd) == 0) {
+          cfds_changed->insert(cfd);
+        }
+      } else {
+        delete version;
       }
     } else if (s.IsPathNotFound()) {
       s = Status::OK();
@@ -5934,23 +6213,25 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
     // Some other error has occurred during LoadTableHandlers.
   }
 
-  if (version_edit->HasNextFile()) {
-    next_file_number_.store(version_edit->next_file_number_ + 1);
-  }
-  if (version_edit->has_last_sequence_) {
-    last_allocated_sequence_ = version_edit->last_sequence_;
-    last_published_sequence_ = version_edit->last_sequence_;
-    last_sequence_ = version_edit->last_sequence_;
-  }
-  if (version_edit->has_prev_log_number_) {
-    prev_log_number_ = version_edit->prev_log_number_;
-    MarkFileNumberUsed(version_edit->prev_log_number_);
-  }
-  if (version_edit->has_log_number_) {
-    MarkFileNumberUsed(version_edit->log_number_);
+  if (s.ok()) {
+    if (version_edit->HasNextFile()) {
+      next_file_number_.store(version_edit->next_file_number_ + 1);
+    }
+    if (version_edit->has_last_sequence_) {
+      last_allocated_sequence_ = version_edit->last_sequence_;
+      last_published_sequence_ = version_edit->last_sequence_;
+      last_sequence_ = version_edit->last_sequence_;
+    }
+    if (version_edit->has_prev_log_number_) {
+      prev_log_number_ = version_edit->prev_log_number_;
+      MarkFileNumberUsed(version_edit->prev_log_number_);
+    }
+    if (version_edit->has_log_number_) {
+      MarkFileNumberUsed(version_edit->log_number_);
+    }
+    column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
+    MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
   }
-  column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
-  MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
   return s;
 }
 
@@ -5961,7 +6242,7 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
   Status s;
   do {
     std::string manifest_path;
-    s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+    s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
                                &manifest_file_number_);
     std::unique_ptr<FSSequentialFile> manifest_file;
     if (s.ok()) {
@@ -5983,9 +6264,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest(
     }
     std::unique_ptr<SequentialFileReader> manifest_file_reader;
     if (s.ok()) {
-      manifest_file_reader.reset(
-          new SequentialFileReader(std::move(manifest_file), manifest_path,
-                                   db_options_->log_readahead_size));
+      manifest_file_reader.reset(new SequentialFileReader(
+          std::move(manifest_file), manifest_path,
+          db_options_->log_readahead_size, io_tracer_));
       manifest_reader->reset(new log::FragmentBufferedReader(
           nullptr, std::move(manifest_file_reader), reporter,
           true /* checksum */, 0 /* log_number */));