]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/version_builder.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / version_builder.cc
index 49c35cf9f2db3e482ba8ccc6d4726eb069af98b1..2c65dcf719557ad7179591f969fb0f0c5e95ed8f 100644 (file)
@@ -23,6 +23,7 @@
 #include <utility>
 #include <vector>
 
+#include "cache/cache_reservation_manager.h"
 #include "db/blob/blob_file_meta.h"
 #include "db/dbformat.h"
 #include "db/internal_stats.h"
 
 namespace ROCKSDB_NAMESPACE {
 
-bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
-  if (a->fd.largest_seqno != b->fd.largest_seqno) {
-    return a->fd.largest_seqno > b->fd.largest_seqno;
-  }
-  if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
-    return a->fd.smallest_seqno > b->fd.smallest_seqno;
-  }
-  // Break ties by file number
-  return a->fd.GetNumber() > b->fd.GetNumber();
-}
+class VersionBuilder::Rep {
+  class NewestFirstBySeqNo {
+   public:
+    bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
+      assert(lhs);
+      assert(rhs);
 
-namespace {
-bool BySmallestKey(FileMetaData* a, FileMetaData* b,
-                   const InternalKeyComparator* cmp) {
-  int r = cmp->Compare(a->smallest, b->smallest);
-  if (r != 0) {
-    return (r < 0);
-  }
-  // Break ties by file number
-  return (a->fd.GetNumber() < b->fd.GetNumber());
-}
-}  // namespace
+      if (lhs->fd.largest_seqno != rhs->fd.largest_seqno) {
+        return lhs->fd.largest_seqno > rhs->fd.largest_seqno;
+      }
 
-class VersionBuilder::Rep {
- private:
-  // Helper to sort files_ in v
-  // kLevel0 -- NewestFirstBySeqNo
-  // kLevelNon0 -- BySmallestKey
-  struct FileComparator {
-    enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
-    const InternalKeyComparator* internal_comparator;
-
-    FileComparator() : internal_comparator(nullptr) {}
-
-    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
-      switch (sort_method) {
-        case kLevel0:
-          return NewestFirstBySeqNo(f1, f2);
-        case kLevelNon0:
-          return BySmallestKey(f1, f2, internal_comparator);
+      if (lhs->fd.smallest_seqno != rhs->fd.smallest_seqno) {
+        return lhs->fd.smallest_seqno > rhs->fd.smallest_seqno;
       }
-      assert(false);
-      return false;
+
+      // Break ties by file number
+      return lhs->fd.GetNumber() > rhs->fd.GetNumber();
+    }
+  };
+
+  class BySmallestKey {
+   public:
+    explicit BySmallestKey(const InternalKeyComparator* cmp) : cmp_(cmp) {}
+
+    bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
+      assert(lhs);
+      assert(rhs);
+      assert(cmp_);
+
+      const int r = cmp_->Compare(lhs->smallest, rhs->smallest);
+      if (r != 0) {
+        return (r < 0);
+      }
+
+      // Break ties by file number
+      return (lhs->fd.GetNumber() < rhs->fd.GetNumber());
     }
+
+   private:
+    const InternalKeyComparator* cmp_;
   };
 
   struct LevelState {
@@ -86,16 +83,14 @@ class VersionBuilder::Rep {
     std::unordered_map<uint64_t, FileMetaData*> added_files;
   };
 
+  // A class that represents the accumulated changes (like additional garbage or
+  // newly linked/unlinked SST files) for a given blob file after applying a
+  // series of VersionEdits.
   class BlobFileMetaDataDelta {
    public:
     bool IsEmpty() const {
-      return !shared_meta_ && !additional_garbage_count_ &&
-             !additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
-             newly_unlinked_ssts_.empty();
-    }
-
-    std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
-      return shared_meta_;
+      return !additional_garbage_count_ && !additional_garbage_bytes_ &&
+             newly_linked_ssts_.empty() && newly_unlinked_ssts_.empty();
     }
 
     uint64_t GetAdditionalGarbageCount() const {
@@ -114,13 +109,6 @@ class VersionBuilder::Rep {
       return newly_unlinked_ssts_;
     }
 
-    void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
-      assert(!shared_meta_);
-      assert(shared_meta);
-
-      shared_meta_ = std::move(shared_meta);
-    }
-
     void AddGarbage(uint64_t count, uint64_t bytes) {
       additional_garbage_count_ += count;
       additional_garbage_bytes_ += bytes;
@@ -159,13 +147,91 @@ class VersionBuilder::Rep {
     }
 
    private:
-    std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
     uint64_t additional_garbage_count_ = 0;
     uint64_t additional_garbage_bytes_ = 0;
     std::unordered_set<uint64_t> newly_linked_ssts_;
     std::unordered_set<uint64_t> newly_unlinked_ssts_;
   };
 
+  // A class that represents the state of a blob file after applying a series of
+  // VersionEdits. In addition to the resulting state, it also contains the
+  // delta (see BlobFileMetaDataDelta above). The resulting state can be used to
+  // identify obsolete blob files, while the delta makes it possible to
+  // efficiently detect trivial moves.
+  class MutableBlobFileMetaData {
+   public:
+    // To be used for brand new blob files
+    explicit MutableBlobFileMetaData(
+        std::shared_ptr<SharedBlobFileMetaData>&& shared_meta)
+        : shared_meta_(std::move(shared_meta)) {}
+
+    // To be used for pre-existing blob files
+    explicit MutableBlobFileMetaData(
+        const std::shared_ptr<BlobFileMetaData>& meta)
+        : shared_meta_(meta->GetSharedMeta()),
+          linked_ssts_(meta->GetLinkedSsts()),
+          garbage_blob_count_(meta->GetGarbageBlobCount()),
+          garbage_blob_bytes_(meta->GetGarbageBlobBytes()) {}
+
+    const std::shared_ptr<SharedBlobFileMetaData>& GetSharedMeta() const {
+      return shared_meta_;
+    }
+
+    uint64_t GetBlobFileNumber() const {
+      assert(shared_meta_);
+      return shared_meta_->GetBlobFileNumber();
+    }
+
+    bool HasDelta() const { return !delta_.IsEmpty(); }
+
+    const std::unordered_set<uint64_t>& GetLinkedSsts() const {
+      return linked_ssts_;
+    }
+
+    uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; }
+
+    uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; }
+
+    bool AddGarbage(uint64_t count, uint64_t bytes) {
+      assert(shared_meta_);
+
+      if (garbage_blob_count_ + count > shared_meta_->GetTotalBlobCount() ||
+          garbage_blob_bytes_ + bytes > shared_meta_->GetTotalBlobBytes()) {
+        return false;
+      }
+
+      delta_.AddGarbage(count, bytes);
+
+      garbage_blob_count_ += count;
+      garbage_blob_bytes_ += bytes;
+
+      return true;
+    }
+
+    void LinkSst(uint64_t sst_file_number) {
+      delta_.LinkSst(sst_file_number);
+
+      assert(linked_ssts_.find(sst_file_number) == linked_ssts_.end());
+      linked_ssts_.emplace(sst_file_number);
+    }
+
+    void UnlinkSst(uint64_t sst_file_number) {
+      delta_.UnlinkSst(sst_file_number);
+
+      assert(linked_ssts_.find(sst_file_number) != linked_ssts_.end());
+      linked_ssts_.erase(sst_file_number);
+    }
+
+   private:
+    std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
+    // Accumulated changes
+    BlobFileMetaDataDelta delta_;
+    // Resulting state after applying the changes
+    BlobFileMetaData::LinkedSsts linked_ssts_;
+    uint64_t garbage_blob_count_ = 0;
+    uint64_t garbage_blob_bytes_ = 0;
+  };
+
   const FileOptions& file_options_;
   const ImmutableCFOptions* const ioptions_;
   TableCache* table_cache_;
@@ -183,30 +249,34 @@ class VersionBuilder::Rep {
   bool has_invalid_levels_;
   // Current levels of table files affected by additions/deletions.
   std::unordered_map<uint64_t, int> table_file_levels_;
-  FileComparator level_zero_cmp_;
-  FileComparator level_nonzero_cmp_;
+  // Current compact cursors that should be changed after the last compaction
+  std::unordered_map<int, InternalKey> updated_compact_cursors_;
+  NewestFirstBySeqNo level_zero_cmp_;
+  BySmallestKey level_nonzero_cmp_;
+
+  // Mutable metadata objects for all blob files affected by the series of
+  // version edits.
+  std::map<uint64_t, MutableBlobFileMetaData> mutable_blob_file_metas_;
 
-  // Metadata delta for all blob files affected by the series of version edits.
-  std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
+  std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
 
  public:
   Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
       TableCache* table_cache, VersionStorageInfo* base_vstorage,
-      VersionSet* version_set)
+      VersionSet* version_set,
+      std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr)
       : file_options_(file_options),
         ioptions_(ioptions),
         table_cache_(table_cache),
         base_vstorage_(base_vstorage),
         version_set_(version_set),
         num_levels_(base_vstorage->num_levels()),
-        has_invalid_levels_(false) {
+        has_invalid_levels_(false),
+        level_nonzero_cmp_(base_vstorage_->InternalComparator()),
+        file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr) {
     assert(ioptions_);
 
     levels_ = new LevelState[num_levels_];
-    level_zero_cmp_.sort_method = FileComparator::kLevel0;
-    level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
-    level_nonzero_cmp_.internal_comparator =
-        base_vstorage_->InternalComparator();
   }
 
   ~Rep() {
@@ -228,33 +298,20 @@ class VersionBuilder::Rep {
         table_cache_->ReleaseHandle(f->table_reader_handle);
         f->table_reader_handle = nullptr;
       }
-      delete f;
-    }
-  }
 
-  bool IsBlobFileInVersion(uint64_t blob_file_number) const {
-    auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
-    if (delta_it != blob_file_meta_deltas_.end()) {
-      if (delta_it->second.GetSharedMeta()) {
-        return true;
+      if (file_metadata_cache_res_mgr_) {
+        Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
+            f->ApproximateMemoryUsage(), false /* increase */);
+        s.PermitUncheckedError();
       }
+      delete f;
     }
-
-    assert(base_vstorage_);
-
-    const auto& base_blob_files = base_vstorage_->GetBlobFiles();
-
-    auto base_it = base_blob_files.find(blob_file_number);
-    if (base_it != base_blob_files.end()) {
-      assert(base_it->second);
-      assert(base_it->second->GetSharedMeta());
-
-      return true;
-    }
-
-    return false;
   }
 
+  // Mapping used for checking the consistency of links between SST files and
+  // blob files. It is built using the forward links (table file -> blob file),
+  // and is subsequently compared with the inverse mapping stored in the
+  // BlobFileMetaData objects.
   using ExpectedLinkedSsts =
       std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
 
@@ -270,93 +327,161 @@ class VersionBuilder::Rep {
     (*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
   }
 
-  Status CheckConsistencyDetails(VersionStorageInfo* vstorage) {
-    // Make sure the files are sorted correctly and that the links between
-    // table files and blob files are consistent. The latter is checked using
-    // the following mapping, which is built using the forward links
-    // (table file -> blob file), and is subsequently compared with the inverse
-    // mapping stored in the BlobFileMetaData objects.
-    ExpectedLinkedSsts expected_linked_ssts;
+  template <typename Checker>
+  Status CheckConsistencyDetailsForLevel(
+      const VersionStorageInfo* vstorage, int level, Checker checker,
+      const std::string& sync_point,
+      ExpectedLinkedSsts* expected_linked_ssts) const {
+#ifdef NDEBUG
+    (void)sync_point;
+#endif
 
-    for (int level = 0; level < num_levels_; level++) {
-      auto& level_files = vstorage->LevelFiles(level);
+    assert(vstorage);
+    assert(level >= 0 && level < num_levels_);
+    assert(expected_linked_ssts);
 
-      if (level_files.empty()) {
-        continue;
-      }
+    const auto& level_files = vstorage->LevelFiles(level);
+
+    if (level_files.empty()) {
+      return Status::OK();
+    }
+
+    assert(level_files[0]);
+    UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
+                             level_files[0]->oldest_blob_file_number,
+                             expected_linked_ssts);
+
+    for (size_t i = 1; i < level_files.size(); ++i) {
+      assert(level_files[i]);
+      UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
+                               level_files[i]->oldest_blob_file_number,
+                               expected_linked_ssts);
+
+      auto lhs = level_files[i - 1];
+      auto rhs = level_files[i];
 
-      assert(level_files[0]);
-      UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
-                               level_files[0]->oldest_blob_file_number,
-                               &expected_linked_ssts);
-      for (size_t i = 1; i < level_files.size(); i++) {
-        assert(level_files[i]);
-        UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
-                                 level_files[i]->oldest_blob_file_number,
-                                 &expected_linked_ssts);
-
-        auto f1 = level_files[i - 1];
-        auto f2 = level_files[i];
-        if (level == 0) {
 #ifndef NDEBUG
-          auto pair = std::make_pair(&f1, &f2);
-          TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency0", &pair);
+      auto pair = std::make_pair(&lhs, &rhs);
+      TEST_SYNC_POINT_CALLBACK(sync_point, &pair);
 #endif
-          if (!level_zero_cmp_(f1, f2)) {
-            return Status::Corruption("L0 files are not sorted properly");
+
+      const Status s = checker(lhs, rhs);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    return Status::OK();
+  }
+
+  // Make sure table files are sorted correctly and that the links between
+  // table files and blob files are consistent.
+  Status CheckConsistencyDetails(const VersionStorageInfo* vstorage) const {
+    assert(vstorage);
+
+    ExpectedLinkedSsts expected_linked_ssts;
+
+    if (num_levels_ > 0) {
+      // Check L0
+      {
+        auto l0_checker = [this](const FileMetaData* lhs,
+                                 const FileMetaData* rhs) {
+          assert(lhs);
+          assert(rhs);
+
+          if (!level_zero_cmp_(lhs, rhs)) {
+            std::ostringstream oss;
+            oss << "L0 files are not sorted properly: files #"
+                << lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber();
+
+            return Status::Corruption("VersionBuilder", oss.str());
           }
 
-          if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
+          if (rhs->fd.smallest_seqno == rhs->fd.largest_seqno) {
             // This is an external file that we ingested
-            SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
-            if (!(external_file_seqno < f1->fd.largest_seqno ||
+            const SequenceNumber external_file_seqno = rhs->fd.smallest_seqno;
+
+            if (!(external_file_seqno < lhs->fd.largest_seqno ||
                   external_file_seqno == 0)) {
-              return Status::Corruption(
-                  "L0 file with seqno " +
-                  NumberToString(f1->fd.smallest_seqno) + " " +
-                  NumberToString(f1->fd.largest_seqno) +
-                  " vs. file with global_seqno" +
-                  NumberToString(external_file_seqno) + " with fileNumber " +
-                  NumberToString(f1->fd.GetNumber()));
+              std::ostringstream oss;
+              oss << "L0 file #" << lhs->fd.GetNumber() << " with seqno "
+                  << lhs->fd.smallest_seqno << ' ' << lhs->fd.largest_seqno
+                  << " vs. file #" << rhs->fd.GetNumber()
+                  << " with global_seqno " << external_file_seqno;
+
+              return Status::Corruption("VersionBuilder", oss.str());
             }
-          } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
-            return Status::Corruption(
-                "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
-                " " + NumberToString(f1->fd.largest_seqno) + " " +
-                NumberToString(f1->fd.GetNumber()) + " vs. " +
-                NumberToString(f2->fd.smallest_seqno) + " " +
-                NumberToString(f2->fd.largest_seqno) + " " +
-                NumberToString(f2->fd.GetNumber()));
+          } else if (lhs->fd.smallest_seqno <= rhs->fd.smallest_seqno) {
+            std::ostringstream oss;
+            oss << "L0 file #" << lhs->fd.GetNumber() << " with seqno "
+                << lhs->fd.smallest_seqno << ' ' << lhs->fd.largest_seqno
+                << " vs. file #" << rhs->fd.GetNumber() << " with seqno "
+                << rhs->fd.smallest_seqno << ' ' << rhs->fd.largest_seqno;
+
+            return Status::Corruption("VersionBuilder", oss.str());
           }
-        } else {
-#ifndef NDEBUG
-          auto pair = std::make_pair(&f1, &f2);
-          TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency1", &pair);
-#endif
-          if (!level_nonzero_cmp_(f1, f2)) {
-            return Status::Corruption("L" + NumberToString(level) +
-                                      " files are not sorted properly");
+
+          return Status::OK();
+        };
+
+        const Status s = CheckConsistencyDetailsForLevel(
+            vstorage, /* level */ 0, l0_checker,
+            "VersionBuilder::CheckConsistency0", &expected_linked_ssts);
+        if (!s.ok()) {
+          return s;
+        }
+      }
+
+      // Check L1 and up
+      const InternalKeyComparator* const icmp = vstorage->InternalComparator();
+      assert(icmp);
+
+      for (int level = 1; level < num_levels_; ++level) {
+        auto checker = [this, level, icmp](const FileMetaData* lhs,
+                                           const FileMetaData* rhs) {
+          assert(lhs);
+          assert(rhs);
+
+          if (!level_nonzero_cmp_(lhs, rhs)) {
+            std::ostringstream oss;
+            oss << 'L' << level << " files are not sorted properly: files #"
+                << lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber();
+
+            return Status::Corruption("VersionBuilder", oss.str());
           }
 
-          // Make sure there is no overlap in levels > 0
-          if (vstorage->InternalComparator()->Compare(f1->largest,
-                                                      f2->smallest) >= 0) {
-            return Status::Corruption(
-                "L" + NumberToString(level) + " have overlapping ranges " +
-                (f1->largest).DebugString(true) + " vs. " +
-                (f2->smallest).DebugString(true));
+          // Make sure there is no overlap in level
+          if (icmp->Compare(lhs->largest, rhs->smallest) >= 0) {
+            std::ostringstream oss;
+            oss << 'L' << level << " has overlapping ranges: file #"
+                << lhs->fd.GetNumber()
+                << " largest key: " << lhs->largest.DebugString(true)
+                << " vs. file #" << rhs->fd.GetNumber()
+                << " smallest key: " << rhs->smallest.DebugString(true);
+
+            return Status::Corruption("VersionBuilder", oss.str());
           }
+
+          return Status::OK();
+        };
+
+        const Status s = CheckConsistencyDetailsForLevel(
+            vstorage, level, checker, "VersionBuilder::CheckConsistency1",
+            &expected_linked_ssts);
+        if (!s.ok()) {
+          return s;
         }
       }
     }
 
-    // Make sure that all blob files in the version have non-garbage data.
+    // Make sure that all blob files in the version have non-garbage data and
+    // the links between them and the table files are consistent.
     const auto& blob_files = vstorage->GetBlobFiles();
-    for (const auto& pair : blob_files) {
-      const uint64_t blob_file_number = pair.first;
-      const auto& blob_file_meta = pair.second;
+    for (const auto& blob_file_meta : blob_files) {
       assert(blob_file_meta);
 
+      const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
+
       if (blob_file_meta->GetGarbageBlobCount() >=
           blob_file_meta->GetTotalBlobCount()) {
         std::ostringstream oss;
@@ -382,7 +507,9 @@ class VersionBuilder::Rep {
     return ret_s;
   }
 
-  Status CheckConsistency(VersionStorageInfo* vstorage) {
+  Status CheckConsistency(const VersionStorageInfo* vstorage) const {
+    assert(vstorage);
+
     // Always run consistency checks in debug build
 #ifdef NDEBUG
     if (!vstorage->force_consistency_checks()) {
@@ -422,6 +549,38 @@ class VersionBuilder::Rep {
     return true;
   }
 
+  bool IsBlobFileInVersion(uint64_t blob_file_number) const {
+    auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
+    if (mutable_it != mutable_blob_file_metas_.end()) {
+      return true;
+    }
+
+    assert(base_vstorage_);
+    const auto meta = base_vstorage_->GetBlobFileMetaData(blob_file_number);
+
+    return !!meta;
+  }
+
+  MutableBlobFileMetaData* GetOrCreateMutableBlobFileMetaData(
+      uint64_t blob_file_number) {
+    auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
+    if (mutable_it != mutable_blob_file_metas_.end()) {
+      return &mutable_it->second;
+    }
+
+    assert(base_vstorage_);
+    const auto meta = base_vstorage_->GetBlobFileMetaData(blob_file_number);
+
+    if (meta) {
+      mutable_it = mutable_blob_file_metas_
+                       .emplace(blob_file_number, MutableBlobFileMetaData(meta))
+                       .first;
+      return &mutable_it->second;
+    }
+
+    return nullptr;
+  }
+
   Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
     const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
 
@@ -456,8 +615,8 @@ class VersionBuilder::Rep {
         blob_file_addition.GetChecksumMethod(),
         blob_file_addition.GetChecksumValue(), deleter);
 
-    blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
-        std::move(shared_meta));
+    mutable_blob_file_metas_.emplace(
+        blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
 
     return Status::OK();
   }
@@ -465,16 +624,22 @@ class VersionBuilder::Rep {
   Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
     const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
 
-    if (!IsBlobFileInVersion(blob_file_number)) {
+    MutableBlobFileMetaData* const mutable_meta =
+        GetOrCreateMutableBlobFileMetaData(blob_file_number);
+
+    if (!mutable_meta) {
       std::ostringstream oss;
       oss << "Blob file #" << blob_file_number << " not found";
 
       return Status::Corruption("VersionBuilder", oss.str());
     }
 
-    blob_file_meta_deltas_[blob_file_number].AddGarbage(
-        blob_file_garbage.GetGarbageBlobCount(),
-        blob_file_garbage.GetGarbageBlobBytes());
+    if (!mutable_meta->AddGarbage(blob_file_garbage.GetGarbageBlobCount(),
+                                  blob_file_garbage.GetGarbageBlobBytes())) {
+      std::ostringstream oss;
+      oss << "Garbage overflow for blob file #" << blob_file_number;
+      return Status::Corruption("VersionBuilder", oss.str());
+    }
 
     return Status::OK();
   }
@@ -547,9 +712,12 @@ class VersionBuilder::Rep {
     const uint64_t blob_file_number =
         GetOldestBlobFileNumberForTableFile(level, file_number);
 
-    if (blob_file_number != kInvalidBlobFileNumber &&
-        IsBlobFileInVersion(blob_file_number)) {
-      blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
+    if (blob_file_number != kInvalidBlobFileNumber) {
+      MutableBlobFileMetaData* const mutable_meta =
+          GetOrCreateMutableBlobFileMetaData(blob_file_number);
+      if (mutable_meta) {
+        mutable_meta->UnlinkSst(file_number);
+      }
     }
 
     auto& level_state = levels_[level];
@@ -608,15 +776,34 @@ class VersionBuilder::Rep {
     FileMetaData* const f = new FileMetaData(meta);
     f->refs = 1;
 
+    if (file_metadata_cache_res_mgr_) {
+      Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
+          f->ApproximateMemoryUsage(), true /* increase */);
+      if (!s.ok()) {
+        delete f;
+        s = Status::MemoryLimit(
+            "Can't allocate " +
+            kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
+                CacheEntryRole::kFileMetadata)] +
+            " due to exceeding the memory limit "
+            "based on "
+            "cache capacity");
+        return s;
+      }
+    }
+
     auto& add_files = level_state.added_files;
     assert(add_files.find(file_number) == add_files.end());
     add_files.emplace(file_number, f);
 
     const uint64_t blob_file_number = f->oldest_blob_file_number;
 
-    if (blob_file_number != kInvalidBlobFileNumber &&
-        IsBlobFileInVersion(blob_file_number)) {
-      blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
+    if (blob_file_number != kInvalidBlobFileNumber) {
+      MutableBlobFileMetaData* const mutable_meta =
+          GetOrCreateMutableBlobFileMetaData(blob_file_number);
+      if (mutable_meta) {
+        mutable_meta->LinkSst(file_number);
+      }
     }
 
     table_file_levels_[file_number] = level;
@@ -624,8 +811,24 @@ class VersionBuilder::Rep {
     return Status::OK();
   }
 
+  Status ApplyCompactCursors(int level,
+                             const InternalKey& smallest_uncompacted_key) {
+    if (level < 0) {
+      std::ostringstream oss;
+      oss << "Cannot add compact cursor (" << level << ","
+          << smallest_uncompacted_key.Encode().ToString()
+          << " due to invalid level (level = " << level << ")";
+      return Status::Corruption("VersionBuilder", oss.str());
+    }
+    if (level < num_levels_) {
+      // Omit levels (>= num_levels_) when re-open with shrinking num_levels_
+      updated_compact_cursors_[level] = smallest_uncompacted_key;
+    }
+    return Status::OK();
+  }
+
   // Apply all of the edits in *edit to the current state.
-  Status Apply(VersionEdit* edit) {
+  Status Apply(const VersionEdit* edit) {
     {
       const Status s = CheckConsistency(base_vstorage_);
       if (!s.ok()) {
@@ -675,229 +878,342 @@ class VersionBuilder::Rep {
       }
     }
 
+    // Populate compact cursors for round-robin compaction, leave
+    // the cursor to be empty to indicate it is invalid
+    for (const auto& cursor : edit->GetCompactCursors()) {
+      const int level = cursor.first;
+      const InternalKey smallest_uncompacted_key = cursor.second;
+      const Status s = ApplyCompactCursors(level, smallest_uncompacted_key);
+      if (!s.ok()) {
+        return s;
+      }
+    }
     return Status::OK();
   }
 
-  static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
-      const BlobFileMetaData::LinkedSsts& base,
-      const std::unordered_set<uint64_t>& newly_linked,
-      const std::unordered_set<uint64_t>& newly_unlinked) {
-    BlobFileMetaData::LinkedSsts result(base);
+  // Helper function template for merging the blob file metadata from the base
+  // version with the mutable metadata representing the state after applying the
+  // edits. The function objects process_base and process_mutable are
+  // respectively called to handle a base version object when there is no
+  // matching mutable object, and a mutable object when there is no matching
+  // base version object. process_both is called to perform the merge when a
+  // given blob file appears both in the base version and the mutable list. The
+  // helper stops processing objects if a function object returns false. Blob
+  // files with a file number below first_blob_file are not processed.
+  template <typename ProcessBase, typename ProcessMutable, typename ProcessBoth>
+  void MergeBlobFileMetas(uint64_t first_blob_file, ProcessBase process_base,
+                          ProcessMutable process_mutable,
+                          ProcessBoth process_both) const {
+    assert(base_vstorage_);
+
+    auto base_it = base_vstorage_->GetBlobFileMetaDataLB(first_blob_file);
+    const auto base_it_end = base_vstorage_->GetBlobFiles().end();
+
+    auto mutable_it = mutable_blob_file_metas_.lower_bound(first_blob_file);
+    const auto mutable_it_end = mutable_blob_file_metas_.end();
+
+    while (base_it != base_it_end && mutable_it != mutable_it_end) {
+      const auto& base_meta = *base_it;
+      assert(base_meta);
+
+      const uint64_t base_blob_file_number = base_meta->GetBlobFileNumber();
+      const uint64_t mutable_blob_file_number = mutable_it->first;
+
+      if (base_blob_file_number < mutable_blob_file_number) {
+        if (!process_base(base_meta)) {
+          return;
+        }
+
+        ++base_it;
+      } else if (mutable_blob_file_number < base_blob_file_number) {
+        const auto& mutable_meta = mutable_it->second;
+
+        if (!process_mutable(mutable_meta)) {
+          return;
+        }
+
+        ++mutable_it;
+      } else {
+        assert(base_blob_file_number == mutable_blob_file_number);
 
-    for (uint64_t sst_file_number : newly_unlinked) {
-      assert(result.find(sst_file_number) != result.end());
+        const auto& mutable_meta = mutable_it->second;
+
+        if (!process_both(base_meta, mutable_meta)) {
+          return;
+        }
 
-      result.erase(sst_file_number);
+        ++base_it;
+        ++mutable_it;
+      }
     }
 
-    for (uint64_t sst_file_number : newly_linked) {
-      assert(result.find(sst_file_number) == result.end());
+    while (base_it != base_it_end) {
+      const auto& base_meta = *base_it;
 
-      result.emplace(sst_file_number);
+      if (!process_base(base_meta)) {
+        return;
+      }
+
+      ++base_it;
     }
 
-    return result;
+    while (mutable_it != mutable_it_end) {
+      const auto& mutable_meta = mutable_it->second;
+
+      if (!process_mutable(mutable_meta)) {
+        return;
+      }
+
+      ++mutable_it;
+    }
   }
 
-  static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
-      const BlobFileMetaDataDelta& delta) {
-    auto shared_meta = delta.GetSharedMeta();
-    assert(shared_meta);
+  // Helper function template for finding the first blob file that has linked
+  // SSTs.
+  template <typename Meta>
+  static bool CheckLinkedSsts(const Meta& meta,
+                              uint64_t* min_oldest_blob_file_num) {
+    assert(min_oldest_blob_file_num);
+
+    if (!meta.GetLinkedSsts().empty()) {
+      assert(*min_oldest_blob_file_num == kInvalidBlobFileNumber);
 
-    assert(delta.GetNewlyUnlinkedSsts().empty());
+      *min_oldest_blob_file_num = meta.GetBlobFileNumber();
 
-    auto meta = BlobFileMetaData::Create(
-        std::move(shared_meta), delta.GetNewlyLinkedSsts(),
-        delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
+      return false;
+    }
 
-    return meta;
+    return true;
   }
 
-  static std::shared_ptr<BlobFileMetaData>
-  GetOrCreateMetaDataForExistingBlobFile(
-      const std::shared_ptr<BlobFileMetaData>& base_meta,
-      const BlobFileMetaDataDelta& delta) {
-    assert(base_meta);
-    assert(!delta.GetSharedMeta());
+  // Find the oldest blob file that has linked SSTs.
+  uint64_t GetMinOldestBlobFileNumber() const {
+    uint64_t min_oldest_blob_file_num = kInvalidBlobFileNumber;
 
-    if (delta.IsEmpty()) {
-      return base_meta;
-    }
+    auto process_base =
+        [&min_oldest_blob_file_num](
+            const std::shared_ptr<BlobFileMetaData>& base_meta) {
+          assert(base_meta);
 
-    auto shared_meta = base_meta->GetSharedMeta();
-    assert(shared_meta);
+          return CheckLinkedSsts(*base_meta, &min_oldest_blob_file_num);
+        };
 
-    auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
-                                             delta.GetNewlyLinkedSsts(),
-                                             delta.GetNewlyUnlinkedSsts());
+    auto process_mutable = [&min_oldest_blob_file_num](
+                               const MutableBlobFileMetaData& mutable_meta) {
+      return CheckLinkedSsts(mutable_meta, &min_oldest_blob_file_num);
+    };
+
+    auto process_both = [&min_oldest_blob_file_num](
+                            const std::shared_ptr<BlobFileMetaData>& base_meta,
+                            const MutableBlobFileMetaData& mutable_meta) {
+#ifndef NDEBUG
+      assert(base_meta);
+      assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
+#else
+      (void)base_meta;
+#endif
+
+      // Look at mutable_meta since it supersedes *base_meta
+      return CheckLinkedSsts(mutable_meta, &min_oldest_blob_file_num);
+    };
 
-    auto meta = BlobFileMetaData::Create(
-        std::move(shared_meta), std::move(linked_ssts),
-        base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
-        base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
+    MergeBlobFileMetas(kInvalidBlobFileNumber, process_base, process_mutable,
+                       process_both);
 
-    return meta;
+    return min_oldest_blob_file_num;
+  }
+
+  static std::shared_ptr<BlobFileMetaData> CreateBlobFileMetaData(
+      const MutableBlobFileMetaData& mutable_meta) {
+    return BlobFileMetaData::Create(
+        mutable_meta.GetSharedMeta(), mutable_meta.GetLinkedSsts(),
+        mutable_meta.GetGarbageBlobCount(), mutable_meta.GetGarbageBlobBytes());
   }
 
   // Add the blob file specified by meta to *vstorage if it is determined to
-  // contain valid data (blobs). We make this decision based on the amount
-  // of garbage in the file, and whether the file or any lower-numbered blob
-  // files have any linked SSTs. The latter condition is tracked using the
-  // flag *found_first_non_empty.
-  void AddBlobFileIfNeeded(VersionStorageInfo* vstorage,
-                           const std::shared_ptr<BlobFileMetaData>& meta,
-                           bool* found_first_non_empty) const {
+  // contain valid data (blobs).
+  template <typename Meta>
+  static void AddBlobFileIfNeeded(VersionStorageInfo* vstorage, Meta&& meta) {
     assert(vstorage);
     assert(meta);
-    assert(found_first_non_empty);
 
-    if (!meta->GetLinkedSsts().empty()) {
-      (*found_first_non_empty) = true;
-    } else if (!(*found_first_non_empty) ||
-               meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
+    if (meta->GetLinkedSsts().empty() &&
+        meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
       return;
     }
 
-    vstorage->AddBlobFile(meta);
+    vstorage->AddBlobFile(std::forward<Meta>(meta));
   }
 
   // Merge the blob file metadata from the base version with the changes (edits)
   // applied, and save the result into *vstorage.
   void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
-    assert(base_vstorage_);
     assert(vstorage);
 
-    bool found_first_non_empty = false;
+    assert(base_vstorage_);
+    vstorage->ReserveBlob(base_vstorage_->GetBlobFiles().size() +
+                          mutable_blob_file_metas_.size());
 
-    const auto& base_blob_files = base_vstorage_->GetBlobFiles();
-    auto base_it = base_blob_files.begin();
-    const auto base_it_end = base_blob_files.end();
+    const uint64_t oldest_blob_file_with_linked_ssts =
+        GetMinOldestBlobFileNumber();
 
-    auto delta_it = blob_file_meta_deltas_.begin();
-    const auto delta_it_end = blob_file_meta_deltas_.end();
+    auto process_base =
+        [vstorage](const std::shared_ptr<BlobFileMetaData>& base_meta) {
+          assert(base_meta);
 
-    while (base_it != base_it_end && delta_it != delta_it_end) {
-      const uint64_t base_blob_file_number = base_it->first;
-      const uint64_t delta_blob_file_number = delta_it->first;
+          AddBlobFileIfNeeded(vstorage, base_meta);
 
-      if (base_blob_file_number < delta_blob_file_number) {
-        const auto& base_meta = base_it->second;
+          return true;
+        };
 
-        AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+    auto process_mutable =
+        [vstorage](const MutableBlobFileMetaData& mutable_meta) {
+          AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta));
 
-        ++base_it;
-      } else if (delta_blob_file_number < base_blob_file_number) {
-        const auto& delta = delta_it->second;
+          return true;
+        };
 
-        auto meta = CreateMetaDataForNewBlobFile(delta);
+    auto process_both = [vstorage](
+                            const std::shared_ptr<BlobFileMetaData>& base_meta,
+                            const MutableBlobFileMetaData& mutable_meta) {
+      assert(base_meta);
+      assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
 
-        AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+      if (!mutable_meta.HasDelta()) {
+        assert(base_meta->GetGarbageBlobCount() ==
+               mutable_meta.GetGarbageBlobCount());
+        assert(base_meta->GetGarbageBlobBytes() ==
+               mutable_meta.GetGarbageBlobBytes());
+        assert(base_meta->GetLinkedSsts() == mutable_meta.GetLinkedSsts());
 
-        ++delta_it;
-      } else {
-        assert(base_blob_file_number == delta_blob_file_number);
+        AddBlobFileIfNeeded(vstorage, base_meta);
 
-        const auto& base_meta = base_it->second;
-        const auto& delta = delta_it->second;
+        return true;
+      }
 
-        auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
+      AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta));
 
-        AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+      return true;
+    };
 
-        ++base_it;
-        ++delta_it;
+    MergeBlobFileMetas(oldest_blob_file_with_linked_ssts, process_base,
+                       process_mutable, process_both);
+  }
+
+  void MaybeAddFile(VersionStorageInfo* vstorage, int level,
+                    FileMetaData* f) const {
+    const uint64_t file_number = f->fd.GetNumber();
+
+    const auto& level_state = levels_[level];
+
+    const auto& del_files = level_state.deleted_files;
+    const auto del_it = del_files.find(file_number);
+
+    if (del_it != del_files.end()) {
+      // f is to-be-deleted table file
+      vstorage->RemoveCurrentStats(f);
+    } else {
+      const auto& add_files = level_state.added_files;
+      const auto add_it = add_files.find(file_number);
+
+      // Note: if the file appears both in the base version and in the added
+      // list, the added FileMetaData supersedes the one in the base version.
+      if (add_it != add_files.end() && add_it->second != f) {
+        vstorage->RemoveCurrentStats(f);
+      } else {
+        vstorage->AddFile(level, f);
       }
     }
+  }
 
-    while (base_it != base_it_end) {
-      const auto& base_meta = base_it->second;
+  template <typename Cmp>
+  void SaveSSTFilesTo(VersionStorageInfo* vstorage, int level, Cmp cmp) const {
+    // Merge the set of added files with the set of pre-existing files.
+    // Drop any deleted files.  Store the result in *vstorage.
+    const auto& base_files = base_vstorage_->LevelFiles(level);
+    const auto& unordered_added_files = levels_[level].added_files;
+    vstorage->Reserve(level, base_files.size() + unordered_added_files.size());
+
+    // Sort added files for the level.
+    std::vector<FileMetaData*> added_files;
+    added_files.reserve(unordered_added_files.size());
+    for (const auto& pair : unordered_added_files) {
+      added_files.push_back(pair.second);
+    }
+    std::sort(added_files.begin(), added_files.end(), cmp);
+
+    auto base_iter = base_files.begin();
+    auto base_end = base_files.end();
+    auto added_iter = added_files.begin();
+    auto added_end = added_files.end();
+    while (added_iter != added_end || base_iter != base_end) {
+      if (base_iter == base_end ||
+          (added_iter != added_end && cmp(*added_iter, *base_iter))) {
+        MaybeAddFile(vstorage, level, *added_iter++);
+      } else {
+        MaybeAddFile(vstorage, level, *base_iter++);
+      }
+    }
+  }
 
-      AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+  void SaveSSTFilesTo(VersionStorageInfo* vstorage) const {
+    assert(vstorage);
 
-      ++base_it;
+    if (!num_levels_) {
+      return;
     }
 
-    while (delta_it != delta_it_end) {
-      const auto& delta = delta_it->second;
-
-      auto meta = CreateMetaDataForNewBlobFile(delta);
+    SaveSSTFilesTo(vstorage, /* level */ 0, level_zero_cmp_);
 
-      AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+    for (int level = 1; level < num_levels_; ++level) {
+      SaveSSTFilesTo(vstorage, level, level_nonzero_cmp_);
+    }
+  }
 
-      ++delta_it;
+  void SaveCompactCursorsTo(VersionStorageInfo* vstorage) const {
+    for (auto iter = updated_compact_cursors_.begin();
+         iter != updated_compact_cursors_.end(); iter++) {
+      vstorage->AddCursorForOneLevel(iter->first, iter->second);
     }
   }
 
-  // Save the current state in *v.
-  Status SaveTo(VersionStorageInfo* vstorage) {
-    Status s = CheckConsistency(base_vstorage_);
+  // Save the current state in *vstorage.
+  Status SaveTo(VersionStorageInfo* vstorage) const {
+    Status s;
+
+#ifndef NDEBUG
+    // The same check is done within Apply() so we skip it in release mode.
+    s = CheckConsistency(base_vstorage_);
     if (!s.ok()) {
       return s;
     }
+#endif  // NDEBUG
 
     s = CheckConsistency(vstorage);
     if (!s.ok()) {
       return s;
     }
 
-    for (int level = 0; level < num_levels_; level++) {
-      const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
-      // Merge the set of added files with the set of pre-existing files.
-      // Drop any deleted files.  Store the result in *v.
-      const auto& base_files = base_vstorage_->LevelFiles(level);
-      const auto& unordered_added_files = levels_[level].added_files;
-      vstorage->Reserve(level,
-                        base_files.size() + unordered_added_files.size());
-
-      // Sort added files for the level.
-      std::vector<FileMetaData*> added_files;
-      added_files.reserve(unordered_added_files.size());
-      for (const auto& pair : unordered_added_files) {
-        added_files.push_back(pair.second);
-      }
-      std::sort(added_files.begin(), added_files.end(), cmp);
-
-#ifndef NDEBUG
-      FileMetaData* prev_added_file = nullptr;
-      for (const auto& added : added_files) {
-        if (level > 0 && prev_added_file != nullptr) {
-          assert(base_vstorage_->InternalComparator()->Compare(
-                     prev_added_file->smallest, added->smallest) <= 0);
-        }
-        prev_added_file = added;
-      }
-#endif
-
-      auto base_iter = base_files.begin();
-      auto base_end = base_files.end();
-      auto added_iter = added_files.begin();
-      auto added_end = added_files.end();
-      while (added_iter != added_end || base_iter != base_end) {
-        if (base_iter == base_end ||
-                (added_iter != added_end && cmp(*added_iter, *base_iter))) {
-          MaybeAddFile(vstorage, level, *added_iter++);
-        } else {
-          MaybeAddFile(vstorage, level, *base_iter++);
-        }
-      }
-    }
+    SaveSSTFilesTo(vstorage);
 
     SaveBlobFilesTo(vstorage);
 
+    SaveCompactCursorsTo(vstorage);
+
     s = CheckConsistency(vstorage);
     return s;
   }
 
-  Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
-                           bool prefetch_index_and_filter_in_cache,
-                           bool is_initial_load,
-                           const SliceTransform* prefix_extractor,
-                           size_t max_file_size_for_l0_meta_pin) {
+  Status LoadTableHandlers(
+      InternalStats* internal_stats, int max_threads,
+      bool prefetch_index_and_filter_in_cache, bool is_initial_load,
+      const std::shared_ptr<const SliceTransform>& prefix_extractor,
+      size_t max_file_size_for_l0_meta_pin) {
     assert(table_cache_ != nullptr);
 
     size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
     bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
-    size_t max_load = port::kMaxSizet;
+    size_t max_load = std::numeric_limits<size_t>::max();
 
     if (!always_load) {
       // If it is initial loading and not set to always loading all the
@@ -958,11 +1274,12 @@ class VersionBuilder::Rep {
         int level = files_meta[file_idx].second;
         statuses[file_idx] = table_cache_->FindTable(
             ReadOptions(), file_options_,
-            *(base_vstorage_->InternalComparator()), file_meta->fd,
+            *(base_vstorage_->InternalComparator()), *file_meta,
             &file_meta->table_reader_handle, prefix_extractor, false /*no_io */,
             true /* record_read_stats */,
             internal_stats->GetFileReadHist(level), false, level,
-            prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin);
+            prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin,
+            file_meta->temperature);
         if (file_meta->table_reader_handle != nullptr) {
           // Load table_reader
           file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
@@ -989,40 +1306,15 @@ class VersionBuilder::Rep {
     }
     return ret;
   }
-
-  void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
-    const uint64_t file_number = f->fd.GetNumber();
-
-    const auto& level_state = levels_[level];
-
-    const auto& del_files = level_state.deleted_files;
-    const auto del_it = del_files.find(file_number);
-
-    if (del_it != del_files.end()) {
-      // f is to-be-deleted table file
-      vstorage->RemoveCurrentStats(f);
-    } else {
-      const auto& add_files = level_state.added_files;
-      const auto add_it = add_files.find(file_number);
-
-      // Note: if the file appears both in the base version and in the added
-      // list, the added FileMetaData supersedes the one in the base version.
-      if (add_it != add_files.end() && add_it->second != f) {
-        vstorage->RemoveCurrentStats(f);
-      } else {
-        vstorage->AddFile(level, f);
-      }
-    }
-  }
 };
 
-VersionBuilder::VersionBuilder(const FileOptions& file_options,
-                               const ImmutableCFOptions* ioptions,
-                               TableCache* table_cache,
-                               VersionStorageInfo* base_vstorage,
-                               VersionSet* version_set)
+VersionBuilder::VersionBuilder(
+    const FileOptions& file_options, const ImmutableCFOptions* ioptions,
+    TableCache* table_cache, VersionStorageInfo* base_vstorage,
+    VersionSet* version_set,
+    std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr)
     : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
-                   version_set)) {}
+                   version_set, file_metadata_cache_res_mgr)) {}
 
 VersionBuilder::~VersionBuilder() = default;
 
@@ -1030,28 +1322,35 @@ bool VersionBuilder::CheckConsistencyForNumLevels() {
   return rep_->CheckConsistencyForNumLevels();
 }
 
-Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
+Status VersionBuilder::Apply(const VersionEdit* edit) {
+  return rep_->Apply(edit);
+}
 
-Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
+Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) const {
   return rep_->SaveTo(vstorage);
 }
 
 Status VersionBuilder::LoadTableHandlers(
     InternalStats* internal_stats, int max_threads,
     bool prefetch_index_and_filter_in_cache, bool is_initial_load,
-    const SliceTransform* prefix_extractor,
+    const std::shared_ptr<const SliceTransform>& prefix_extractor,
     size_t max_file_size_for_l0_meta_pin) {
   return rep_->LoadTableHandlers(
       internal_stats, max_threads, prefetch_index_and_filter_in_cache,
       is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
 }
 
+uint64_t VersionBuilder::GetMinOldestBlobFileNumber() const {
+  return rep_->GetMinOldestBlobFileNumber();
+}
+
 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
     ColumnFamilyData* cfd)
     : version_builder_(new VersionBuilder(
           cfd->current()->version_set()->file_options(), cfd->ioptions(),
           cfd->table_cache(), cfd->current()->storage_info(),
-          cfd->current()->version_set())),
+          cfd->current()->version_set(),
+          cfd->GetFileMetadataCacheReservationManager())),
       version_(cfd->current()) {
   version_->Ref();
 }
@@ -1060,7 +1359,8 @@ BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
     ColumnFamilyData* cfd, Version* v)
     : version_builder_(new VersionBuilder(
           cfd->current()->version_set()->file_options(), cfd->ioptions(),
-          cfd->table_cache(), v->storage_info(), v->version_set())),
+          cfd->table_cache(), v->storage_info(), v->version_set(),
+          cfd->GetFileMetadataCacheReservationManager())),
       version_(v) {
   assert(version_ != cfd->current());
 }