]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/version_builder.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / version_builder.cc
index 4694218a1db8a939e98673ae6712db1df9c607df..49c35cf9f2db3e482ba8ccc6d4726eb069af98b1 100644 (file)
 #include <cinttypes>
 #include <functional>
 #include <map>
+#include <memory>
 #include <set>
+#include <sstream>
 #include <thread>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
+#include "db/blob/blob_file_meta.h"
 #include "db/dbformat.h"
 #include "db/internal_stats.h"
 #include "db/table_cache.h"
@@ -83,33 +86,122 @@ class VersionBuilder::Rep {
     std::unordered_map<uint64_t, FileMetaData*> added_files;
   };
 
+  class BlobFileMetaDataDelta {
+   public:
+    bool IsEmpty() const {
+      return !shared_meta_ && !additional_garbage_count_ &&
+             !additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
+             newly_unlinked_ssts_.empty();
+    }
+
+    std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
+      return shared_meta_;
+    }
+
+    uint64_t GetAdditionalGarbageCount() const {
+      return additional_garbage_count_;
+    }
+
+    uint64_t GetAdditionalGarbageBytes() const {
+      return additional_garbage_bytes_;
+    }
+
+    const std::unordered_set<uint64_t>& GetNewlyLinkedSsts() const {
+      return newly_linked_ssts_;
+    }
+
+    const std::unordered_set<uint64_t>& GetNewlyUnlinkedSsts() const {
+      return newly_unlinked_ssts_;
+    }
+
+    void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
+      assert(!shared_meta_);
+      assert(shared_meta);
+
+      shared_meta_ = std::move(shared_meta);
+    }
+
+    void AddGarbage(uint64_t count, uint64_t bytes) {
+      additional_garbage_count_ += count;
+      additional_garbage_bytes_ += bytes;
+    }
+
+    void LinkSst(uint64_t sst_file_number) {
+      assert(newly_linked_ssts_.find(sst_file_number) ==
+             newly_linked_ssts_.end());
+
+      // Reconcile with newly unlinked SSTs on the fly. (Note: an SST can be
+      // linked to and unlinked from the same blob file in the case of a trivial
+      // move.)
+      auto it = newly_unlinked_ssts_.find(sst_file_number);
+
+      if (it != newly_unlinked_ssts_.end()) {
+        newly_unlinked_ssts_.erase(it);
+      } else {
+        newly_linked_ssts_.emplace(sst_file_number);
+      }
+    }
+
+    void UnlinkSst(uint64_t sst_file_number) {
+      assert(newly_unlinked_ssts_.find(sst_file_number) ==
+             newly_unlinked_ssts_.end());
+
+      // Reconcile with newly linked SSTs on the fly. (Note: an SST can be
+      // linked to and unlinked from the same blob file in the case of a trivial
+      // move.)
+      auto it = newly_linked_ssts_.find(sst_file_number);
+
+      if (it != newly_linked_ssts_.end()) {
+        newly_linked_ssts_.erase(it);
+      } else {
+        newly_unlinked_ssts_.emplace(sst_file_number);
+      }
+    }
+
+   private:
+    std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
+    uint64_t additional_garbage_count_ = 0;
+    uint64_t additional_garbage_bytes_ = 0;
+    std::unordered_set<uint64_t> newly_linked_ssts_;
+    std::unordered_set<uint64_t> newly_unlinked_ssts_;
+  };
+
   const FileOptions& file_options_;
-  Logger* info_log_;
+  const ImmutableCFOptions* const ioptions_;
   TableCache* table_cache_;
   VersionStorageInfo* base_vstorage_;
+  VersionSet* version_set_;
   int num_levels_;
   LevelState* levels_;
-  // Store states of levels larger than num_levels_. We do this instead of
+  // Store sizes of levels larger than num_levels_. We do this instead of
   // storing them in levels_ to avoid regression in case there are no files
   // on invalid levels. The version is not consistent if in the end the files
   // on invalid levels don't cancel out.
-  std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
+  std::unordered_map<int, size_t> invalid_level_sizes_;
   // Whether there are invalid new files or invalid deletion on levels larger
   // than num_levels_.
   bool has_invalid_levels_;
+  // Current levels of table files affected by additions/deletions.
+  std::unordered_map<uint64_t, int> table_file_levels_;
   FileComparator level_zero_cmp_;
   FileComparator level_nonzero_cmp_;
 
+  // Metadata delta for all blob files affected by the series of version edits.
+  std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
+
  public:
-  Rep(const FileOptions& file_options, Logger* info_log,
-      TableCache* table_cache,
-      VersionStorageInfo* base_vstorage)
+  Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
+      TableCache* table_cache, VersionStorageInfo* base_vstorage,
+      VersionSet* version_set)
       : file_options_(file_options),
-        info_log_(info_log),
+        ioptions_(ioptions),
         table_cache_(table_cache),
         base_vstorage_(base_vstorage),
+        version_set_(version_set),
         num_levels_(base_vstorage->num_levels()),
         has_invalid_levels_(false) {
+    assert(ioptions_);
+
     levels_ = new LevelState[num_levels_];
     level_zero_cmp_.sort_method = FileComparator::kLevel0;
     level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
@@ -140,27 +232,77 @@ class VersionBuilder::Rep {
     }
   }
 
-  Status CheckConsistency(VersionStorageInfo* vstorage) {
-#ifdef NDEBUG
-    if (!vstorage->force_consistency_checks()) {
-      // Dont run consistency checks in release mode except if
-      // explicitly asked to
-      return Status::OK();
+  bool IsBlobFileInVersion(uint64_t blob_file_number) const {
+    auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
+    if (delta_it != blob_file_meta_deltas_.end()) {
+      if (delta_it->second.GetSharedMeta()) {
+        return true;
+      }
     }
-#endif
-    // make sure the files are sorted correctly
+
+    assert(base_vstorage_);
+
+    const auto& base_blob_files = base_vstorage_->GetBlobFiles();
+
+    auto base_it = base_blob_files.find(blob_file_number);
+    if (base_it != base_blob_files.end()) {
+      assert(base_it->second);
+      assert(base_it->second->GetSharedMeta());
+
+      return true;
+    }
+
+    return false;
+  }
+
+  using ExpectedLinkedSsts =
+      std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
+
+  static void UpdateExpectedLinkedSsts(
+      uint64_t table_file_number, uint64_t blob_file_number,
+      ExpectedLinkedSsts* expected_linked_ssts) {
+    assert(expected_linked_ssts);
+
+    if (blob_file_number == kInvalidBlobFileNumber) {
+      return;
+    }
+
+    (*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
+  }
+
+  Status CheckConsistencyDetails(VersionStorageInfo* vstorage) {
+    // Make sure the files are sorted correctly and that the links between
+    // table files and blob files are consistent. The latter is checked using
+    // the following mapping, which is built using the forward links
+    // (table file -> blob file), and is subsequently compared with the inverse
+    // mapping stored in the BlobFileMetaData objects.
+    ExpectedLinkedSsts expected_linked_ssts;
+
     for (int level = 0; level < num_levels_; level++) {
       auto& level_files = vstorage->LevelFiles(level);
+
+      if (level_files.empty()) {
+        continue;
+      }
+
+      assert(level_files[0]);
+      UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
+                               level_files[0]->oldest_blob_file_number,
+                               &expected_linked_ssts);
       for (size_t i = 1; i < level_files.size(); i++) {
+        assert(level_files[i]);
+        UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
+                                 level_files[i]->oldest_blob_file_number,
+                                 &expected_linked_ssts);
+
         auto f1 = level_files[i - 1];
         auto f2 = level_files[i];
+        if (level == 0) {
 #ifndef NDEBUG
-        auto pair = std::make_pair(&f1, &f2);
-        TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
+          auto pair = std::make_pair(&f1, &f2);
+          TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency0", &pair);
 #endif
-        if (level == 0) {
           if (!level_zero_cmp_(f1, f2)) {
-            fprintf(stderr, "L0 files are not sorted properly");
             return Status::Corruption("L0 files are not sorted properly");
           }
 
@@ -169,11 +311,6 @@ class VersionBuilder::Rep {
             SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
             if (!(external_file_seqno < f1->fd.largest_seqno ||
                   external_file_seqno == 0)) {
-              fprintf(stderr,
-                      "L0 file with seqno %" PRIu64 " %" PRIu64
-                      " vs. file with global_seqno %" PRIu64 "\n",
-                      f1->fd.smallest_seqno, f1->fd.largest_seqno,
-                      external_file_seqno);
               return Status::Corruption(
                   "L0 file with seqno " +
                   NumberToString(f1->fd.smallest_seqno) + " " +
@@ -183,11 +320,6 @@ class VersionBuilder::Rep {
                   NumberToString(f1->fd.GetNumber()));
             }
           } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
-            fprintf(stderr,
-                    "L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
-                    " %" PRIu64 "\n",
-                    f1->fd.smallest_seqno, f1->fd.largest_seqno,
-                    f2->fd.smallest_seqno, f2->fd.largest_seqno);
             return Status::Corruption(
                 "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
                 " " + NumberToString(f1->fd.largest_seqno) + " " +
@@ -197,8 +329,11 @@ class VersionBuilder::Rep {
                 NumberToString(f2->fd.GetNumber()));
           }
         } else {
+#ifndef NDEBUG
+          auto pair = std::make_pair(&f1, &f2);
+          TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency1", &pair);
+#endif
           if (!level_nonzero_cmp_(f1, f2)) {
-            fprintf(stderr, "L%d files are not sorted properly", level);
             return Status::Corruption("L" + NumberToString(level) +
                                       " files are not sorted properly");
           }
@@ -206,9 +341,6 @@ class VersionBuilder::Rep {
           // Make sure there is no overlap in levels > 0
           if (vstorage->InternalComparator()->Compare(f1->largest,
                                                       f2->smallest) >= 0) {
-            fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
-                    (f1->largest).DebugString(true).c_str(),
-                    (f2->smallest).DebugString(true).c_str());
             return Status::Corruption(
                 "L" + NumberToString(level) + " have overlapping ranges " +
                 (f1->largest).DebugString(true) + " vs. " +
@@ -217,123 +349,483 @@ class VersionBuilder::Rep {
         }
       }
     }
-    return Status::OK();
+
+    // Make sure that all blob files in the version have non-garbage data.
+    const auto& blob_files = vstorage->GetBlobFiles();
+    for (const auto& pair : blob_files) {
+      const uint64_t blob_file_number = pair.first;
+      const auto& blob_file_meta = pair.second;
+      assert(blob_file_meta);
+
+      if (blob_file_meta->GetGarbageBlobCount() >=
+          blob_file_meta->GetTotalBlobCount()) {
+        std::ostringstream oss;
+        oss << "Blob file #" << blob_file_number
+            << " consists entirely of garbage";
+
+        return Status::Corruption("VersionBuilder", oss.str());
+      }
+
+      if (blob_file_meta->GetLinkedSsts() !=
+          expected_linked_ssts[blob_file_number]) {
+        std::ostringstream oss;
+        oss << "Links are inconsistent between table files and blob file #"
+            << blob_file_number;
+
+        return Status::Corruption("VersionBuilder", oss.str());
+      }
+    }
+
+    Status ret_s;
+    TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
+                             &ret_s);
+    return ret_s;
   }
 
-  Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
-                                    int level) {
+  Status CheckConsistency(VersionStorageInfo* vstorage) {
+    // Always run consistency checks in debug build
 #ifdef NDEBUG
-    if (!base_vstorage_->force_consistency_checks()) {
-      // Dont run consistency checks in release mode except if
-      // explicitly asked to
+    if (!vstorage->force_consistency_checks()) {
       return Status::OK();
     }
 #endif
-    // a file to be deleted better exist in the previous version
-    bool found = false;
-    for (int l = 0; !found && l < num_levels_; l++) {
-      const std::vector<FileMetaData*>& base_files =
-          base_vstorage_->LevelFiles(l);
-      for (size_t i = 0; i < base_files.size(); i++) {
-        FileMetaData* f = base_files[i];
-        if (f->fd.GetNumber() == number) {
-          found = true;
-          break;
-        }
+    Status s = CheckConsistencyDetails(vstorage);
+    if (s.IsCorruption() && s.getState()) {
+      // Make it clear the error is due to force_consistency_checks = 1 or
+      // debug build
+#ifdef NDEBUG
+      auto prefix = "force_consistency_checks";
+#else
+      auto prefix = "force_consistency_checks(DEBUG)";
+#endif
+      s = Status::Corruption(prefix, s.getState());
+    } else {
+      // was only expecting corruption with message, or OK
+      assert(s.ok());
+    }
+    return s;
+  }
+
+  bool CheckConsistencyForNumLevels() const {
+    // Make sure there are no files on or beyond num_levels().
+    if (has_invalid_levels_) {
+      return false;
+    }
+
+    for (const auto& pair : invalid_level_sizes_) {
+      const size_t level_size = pair.second;
+      if (level_size != 0) {
+        return false;
       }
     }
-    // if the file did not exist in the previous version, then it
-    // is possibly moved from lower level to higher level in current
-    // version
-    for (int l = level + 1; !found && l < num_levels_; l++) {
-      auto& level_added = levels_[l].added_files;
-      auto got = level_added.find(number);
-      if (got != level_added.end()) {
-        found = true;
-        break;
+
+    return true;
+  }
+
+  Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
+    const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
+
+    if (IsBlobFileInVersion(blob_file_number)) {
+      std::ostringstream oss;
+      oss << "Blob file #" << blob_file_number << " already added";
+
+      return Status::Corruption("VersionBuilder", oss.str());
+    }
+
+    // Note: we use C++11 for now but in C++14, this could be done in a more
+    // elegant way using generalized lambda capture.
+    VersionSet* const vs = version_set_;
+    const ImmutableCFOptions* const ioptions = ioptions_;
+
+    auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
+      if (vs) {
+        assert(ioptions);
+        assert(!ioptions->cf_paths.empty());
+        assert(shared_meta);
+
+        vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
+                                ioptions->cf_paths.front().path);
       }
+
+      delete shared_meta;
+    };
+
+    auto shared_meta = SharedBlobFileMetaData::Create(
+        blob_file_number, blob_file_addition.GetTotalBlobCount(),
+        blob_file_addition.GetTotalBlobBytes(),
+        blob_file_addition.GetChecksumMethod(),
+        blob_file_addition.GetChecksumValue(), deleter);
+
+    blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
+        std::move(shared_meta));
+
+    return Status::OK();
+  }
+
+  Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
+    const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
+
+    if (!IsBlobFileInVersion(blob_file_number)) {
+      std::ostringstream oss;
+      oss << "Blob file #" << blob_file_number << " not found";
+
+      return Status::Corruption("VersionBuilder", oss.str());
+    }
+
+    blob_file_meta_deltas_[blob_file_number].AddGarbage(
+        blob_file_garbage.GetGarbageBlobCount(),
+        blob_file_garbage.GetGarbageBlobBytes());
+
+    return Status::OK();
+  }
+
+  int GetCurrentLevelForTableFile(uint64_t file_number) const {
+    auto it = table_file_levels_.find(file_number);
+    if (it != table_file_levels_.end()) {
+      return it->second;
+    }
+
+    assert(base_vstorage_);
+    return base_vstorage_->GetFileLocation(file_number).GetLevel();
+  }
+
+  uint64_t GetOldestBlobFileNumberForTableFile(int level,
+                                               uint64_t file_number) const {
+    assert(level < num_levels_);
+
+    const auto& added_files = levels_[level].added_files;
+
+    auto it = added_files.find(file_number);
+    if (it != added_files.end()) {
+      const FileMetaData* const meta = it->second;
+      assert(meta);
+
+      return meta->oldest_blob_file_number;
     }
 
-    // maybe this file was added in a previous edit that was Applied
-    if (!found) {
-      auto& level_added = levels_[level].added_files;
-      auto got = level_added.find(number);
-      if (got != level_added.end()) {
-        found = true;
+    assert(base_vstorage_);
+    const FileMetaData* const meta =
+        base_vstorage_->GetFileMetaDataByNumber(file_number);
+    assert(meta);
+
+    return meta->oldest_blob_file_number;
+  }
+
+  Status ApplyFileDeletion(int level, uint64_t file_number) {
+    assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
+
+    const int current_level = GetCurrentLevelForTableFile(file_number);
+
+    if (level != current_level) {
+      if (level >= num_levels_) {
+        has_invalid_levels_ = true;
       }
+
+      std::ostringstream oss;
+      oss << "Cannot delete table file #" << file_number << " from level "
+          << level << " since it is ";
+      if (current_level ==
+          VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
+        oss << "not in the LSM tree";
+      } else {
+        oss << "on level " << current_level;
+      }
+
+      return Status::Corruption("VersionBuilder", oss.str());
     }
-    if (!found) {
-      fprintf(stderr, "not found %" PRIu64 "\n", number);
-      return Status::Corruption("not found " + NumberToString(number));
+
+    if (level >= num_levels_) {
+      assert(invalid_level_sizes_[level] > 0);
+      --invalid_level_sizes_[level];
+
+      table_file_levels_[file_number] =
+          VersionStorageInfo::FileLocation::Invalid().GetLevel();
+
+      return Status::OK();
     }
+
+    const uint64_t blob_file_number =
+        GetOldestBlobFileNumberForTableFile(level, file_number);
+
+    if (blob_file_number != kInvalidBlobFileNumber &&
+        IsBlobFileInVersion(blob_file_number)) {
+      blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
+    }
+
+    auto& level_state = levels_[level];
+
+    auto& add_files = level_state.added_files;
+    auto add_it = add_files.find(file_number);
+    if (add_it != add_files.end()) {
+      UnrefFile(add_it->second);
+      add_files.erase(add_it);
+    }
+
+    auto& del_files = level_state.deleted_files;
+    assert(del_files.find(file_number) == del_files.end());
+    del_files.emplace(file_number);
+
+    table_file_levels_[file_number] =
+        VersionStorageInfo::FileLocation::Invalid().GetLevel();
+
     return Status::OK();
   }
 
-  bool CheckConsistencyForNumLevels() {
-    // Make sure there are no files on or beyond num_levels().
-    if (has_invalid_levels_) {
-      return false;
-    }
-    for (auto& level : invalid_levels_) {
-      if (level.second.size() > 0) {
-        return false;
+  Status ApplyFileAddition(int level, const FileMetaData& meta) {
+    assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
+
+    const uint64_t file_number = meta.fd.GetNumber();
+
+    const int current_level = GetCurrentLevelForTableFile(file_number);
+
+    if (current_level !=
+        VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
+      if (level >= num_levels_) {
+        has_invalid_levels_ = true;
       }
+
+      std::ostringstream oss;
+      oss << "Cannot add table file #" << file_number << " to level " << level
+          << " since it is already in the LSM tree on level " << current_level;
+      return Status::Corruption("VersionBuilder", oss.str());
     }
-    return true;
+
+    if (level >= num_levels_) {
+      ++invalid_level_sizes_[level];
+      table_file_levels_[file_number] = level;
+
+      return Status::OK();
+    }
+
+    auto& level_state = levels_[level];
+
+    auto& del_files = level_state.deleted_files;
+    auto del_it = del_files.find(file_number);
+    if (del_it != del_files.end()) {
+      del_files.erase(del_it);
+    }
+
+    FileMetaData* const f = new FileMetaData(meta);
+    f->refs = 1;
+
+    auto& add_files = level_state.added_files;
+    assert(add_files.find(file_number) == add_files.end());
+    add_files.emplace(file_number, f);
+
+    const uint64_t blob_file_number = f->oldest_blob_file_number;
+
+    if (blob_file_number != kInvalidBlobFileNumber &&
+        IsBlobFileInVersion(blob_file_number)) {
+      blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
+    }
+
+    table_file_levels_[file_number] = level;
+
+    return Status::OK();
   }
 
   // Apply all of the edits in *edit to the current state.
   Status Apply(VersionEdit* edit) {
-    Status s = CheckConsistency(base_vstorage_);
-    if (!s.ok()) {
-      return s;
+    {
+      const Status s = CheckConsistency(base_vstorage_);
+      if (!s.ok()) {
+        return s;
+      }
     }
 
-    // Delete files
-    const auto& del = edit->GetDeletedFiles();
-    for (const auto& del_file : del) {
-      const auto level = del_file.first;
-      const auto number = del_file.second;
-      if (level < num_levels_) {
-        levels_[level].deleted_files.insert(number);
-        CheckConsistencyForDeletes(edit, number, level);
+    // Note: we process the blob file related changes first because the
+    // table file addition/deletion logic depends on the blob files
+    // already being there.
 
-        auto exising = levels_[level].added_files.find(number);
-        if (exising != levels_[level].added_files.end()) {
-          UnrefFile(exising->second);
-          levels_[level].added_files.erase(exising);
-        }
-      } else {
-        if (invalid_levels_[level].erase(number) == 0) {
-          // Deleting an non-existing file on invalid level.
-          has_invalid_levels_ = true;
-        }
+    // Add new blob files
+    for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
+      const Status s = ApplyBlobFileAddition(blob_file_addition);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    // Increase the amount of garbage for blob files affected by GC
+    for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
+      const Status s = ApplyBlobFileGarbage(blob_file_garbage);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    // Delete table files
+    for (const auto& deleted_file : edit->GetDeletedFiles()) {
+      const int level = deleted_file.first;
+      const uint64_t file_number = deleted_file.second;
+
+      const Status s = ApplyFileDeletion(level, file_number);
+      if (!s.ok()) {
+        return s;
       }
     }
 
-    // Add new files
+    // Add new table files
     for (const auto& new_file : edit->GetNewFiles()) {
       const int level = new_file.first;
-      if (level < num_levels_) {
-        FileMetaData* f = new FileMetaData(new_file.second);
-        f->refs = 1;
-
-        assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
-               levels_[level].added_files.end());
-        levels_[level].deleted_files.erase(f->fd.GetNumber());
-        levels_[level].added_files[f->fd.GetNumber()] = f;
+      const FileMetaData& meta = new_file.second;
+
+      const Status s = ApplyFileAddition(level, meta);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    return Status::OK();
+  }
+
+  static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
+      const BlobFileMetaData::LinkedSsts& base,
+      const std::unordered_set<uint64_t>& newly_linked,
+      const std::unordered_set<uint64_t>& newly_unlinked) {
+    BlobFileMetaData::LinkedSsts result(base);
+
+    for (uint64_t sst_file_number : newly_unlinked) {
+      assert(result.find(sst_file_number) != result.end());
+
+      result.erase(sst_file_number);
+    }
+
+    for (uint64_t sst_file_number : newly_linked) {
+      assert(result.find(sst_file_number) == result.end());
+
+      result.emplace(sst_file_number);
+    }
+
+    return result;
+  }
+
+  static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
+      const BlobFileMetaDataDelta& delta) {
+    auto shared_meta = delta.GetSharedMeta();
+    assert(shared_meta);
+
+    assert(delta.GetNewlyUnlinkedSsts().empty());
+
+    auto meta = BlobFileMetaData::Create(
+        std::move(shared_meta), delta.GetNewlyLinkedSsts(),
+        delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
+
+    return meta;
+  }
+
+  static std::shared_ptr<BlobFileMetaData>
+  GetOrCreateMetaDataForExistingBlobFile(
+      const std::shared_ptr<BlobFileMetaData>& base_meta,
+      const BlobFileMetaDataDelta& delta) {
+    assert(base_meta);
+    assert(!delta.GetSharedMeta());
+
+    if (delta.IsEmpty()) {
+      return base_meta;
+    }
+
+    auto shared_meta = base_meta->GetSharedMeta();
+    assert(shared_meta);
+
+    auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
+                                             delta.GetNewlyLinkedSsts(),
+                                             delta.GetNewlyUnlinkedSsts());
+
+    auto meta = BlobFileMetaData::Create(
+        std::move(shared_meta), std::move(linked_ssts),
+        base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
+        base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
+
+    return meta;
+  }
+
+  // Add the blob file specified by meta to *vstorage if it is determined to
+  // contain valid data (blobs). We make this decision based on the amount
+  // of garbage in the file, and whether the file or any lower-numbered blob
+  // files have any linked SSTs. The latter condition is tracked using the
+  // flag *found_first_non_empty.
+  void AddBlobFileIfNeeded(VersionStorageInfo* vstorage,
+                           const std::shared_ptr<BlobFileMetaData>& meta,
+                           bool* found_first_non_empty) const {
+    assert(vstorage);
+    assert(meta);
+    assert(found_first_non_empty);
+
+    if (!meta->GetLinkedSsts().empty()) {
+      (*found_first_non_empty) = true;
+    } else if (!(*found_first_non_empty) ||
+               meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
+      return;
+    }
+
+    vstorage->AddBlobFile(meta);
+  }
+
+  // Merge the blob file metadata from the base version with the changes (edits)
+  // applied, and save the result into *vstorage.
+  void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
+    assert(base_vstorage_);
+    assert(vstorage);
+
+    bool found_first_non_empty = false;
+
+    const auto& base_blob_files = base_vstorage_->GetBlobFiles();
+    auto base_it = base_blob_files.begin();
+    const auto base_it_end = base_blob_files.end();
+
+    auto delta_it = blob_file_meta_deltas_.begin();
+    const auto delta_it_end = blob_file_meta_deltas_.end();
+
+    while (base_it != base_it_end && delta_it != delta_it_end) {
+      const uint64_t base_blob_file_number = base_it->first;
+      const uint64_t delta_blob_file_number = delta_it->first;
+
+      if (base_blob_file_number < delta_blob_file_number) {
+        const auto& base_meta = base_it->second;
+
+        AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+
+        ++base_it;
+      } else if (delta_blob_file_number < base_blob_file_number) {
+        const auto& delta = delta_it->second;
+
+        auto meta = CreateMetaDataForNewBlobFile(delta);
+
+        AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+
+        ++delta_it;
       } else {
-        uint64_t number = new_file.second.fd.GetNumber();
-        auto& lvls = invalid_levels_[level];
-        if (lvls.count(number) == 0) {
-          lvls.insert(number);
-        } else {
-          // Creating an already existing file on invalid level.
-          has_invalid_levels_ = true;
-        }
+        assert(base_blob_file_number == delta_blob_file_number);
+
+        const auto& base_meta = base_it->second;
+        const auto& delta = delta_it->second;
+
+        auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
+
+        AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+
+        ++base_it;
+        ++delta_it;
       }
     }
-    return s;
+
+    while (base_it != base_it_end) {
+      const auto& base_meta = base_it->second;
+
+      AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+
+      ++base_it;
+    }
+
+    while (delta_it != delta_it_end) {
+      const auto& delta = delta_it->second;
+
+      auto meta = CreateMetaDataForNewBlobFile(delta);
+
+      AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+
+      ++delta_it;
+    }
   }
 
   // Save the current state in *v.
@@ -390,6 +882,8 @@ class VersionBuilder::Rep {
       }
     }
 
+    SaveBlobFilesTo(vstorage);
+
     s = CheckConsistency(vstorage);
     return s;
   }
@@ -397,7 +891,8 @@ class VersionBuilder::Rep {
   Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
                            bool prefetch_index_and_filter_in_cache,
                            bool is_initial_load,
-                           const SliceTransform* prefix_extractor) {
+                           const SliceTransform* prefix_extractor,
+                           size_t max_file_size_for_l0_meta_pin) {
     assert(table_cache_ != nullptr);
 
     size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
@@ -405,7 +900,7 @@ class VersionBuilder::Rep {
     size_t max_load = port::kMaxSizet;
 
     if (!always_load) {
-      // If it is initial loading and not set to always laoding all the
+      // If it is initial loading and not set to always loading all the
       // files, we only load up to kInitialLoadLimit files, to limit the
       // time reopening the DB.
       const size_t kInitialLoadLimit = 16;
@@ -462,11 +957,12 @@ class VersionBuilder::Rep {
         auto* file_meta = files_meta[file_idx].first;
         int level = files_meta[file_idx].second;
         statuses[file_idx] = table_cache_->FindTable(
-            file_options_, *(base_vstorage_->InternalComparator()),
-            file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
-            false /*no_io */, true /* record_read_stats */,
+            ReadOptions(), file_options_,
+            *(base_vstorage_->InternalComparator()), file_meta->fd,
+            &file_meta->table_reader_handle, prefix_extractor, false /*no_io */,
+            true /* record_read_stats */,
             internal_stats->GetFileReadHist(level), false, level,
-            prefetch_index_and_filter_in_cache);
+            prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin);
         if (file_meta->table_reader_handle != nullptr) {
           // Load table_reader
           file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
@@ -483,40 +979,52 @@ class VersionBuilder::Rep {
     for (auto& t : threads) {
       t.join();
     }
+    Status ret;
     for (const auto& s : statuses) {
       if (!s.ok()) {
-        return s;
+        if (ret.ok()) {
+          ret = s;
+        }
       }
     }
-    return Status::OK();
+    return ret;
   }
 
   void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
-    if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
+    const uint64_t file_number = f->fd.GetNumber();
+
+    const auto& level_state = levels_[level];
+
+    const auto& del_files = level_state.deleted_files;
+    const auto del_it = del_files.find(file_number);
+
+    if (del_it != del_files.end()) {
       // f is to-be-deleted table file
       vstorage->RemoveCurrentStats(f);
     } else {
-      vstorage->AddFile(level, f, info_log_);
+      const auto& add_files = level_state.added_files;
+      const auto add_it = add_files.find(file_number);
+
+      // Note: if the file appears both in the base version and in the added
+      // list, the added FileMetaData supersedes the one in the base version.
+      if (add_it != add_files.end() && add_it->second != f) {
+        vstorage->RemoveCurrentStats(f);
+      } else {
+        vstorage->AddFile(level, f);
+      }
     }
   }
 };
 
 VersionBuilder::VersionBuilder(const FileOptions& file_options,
+                               const ImmutableCFOptions* ioptions,
                                TableCache* table_cache,
                                VersionStorageInfo* base_vstorage,
-                               Logger* info_log)
-    : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
+                               VersionSet* version_set)
+    : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
+                   version_set)) {}
 
-VersionBuilder::~VersionBuilder() { delete rep_; }
-
-Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
-  return rep_->CheckConsistency(vstorage);
-}
-
-Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
-                                                  uint64_t number, int level) {
-  return rep_->CheckConsistencyForDeletes(edit, number, level);
-}
+VersionBuilder::~VersionBuilder() = default;
 
 bool VersionBuilder::CheckConsistencyForNumLevels() {
   return rep_->CheckConsistencyForNumLevels();
@@ -531,15 +1039,34 @@ Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
 Status VersionBuilder::LoadTableHandlers(
     InternalStats* internal_stats, int max_threads,
     bool prefetch_index_and_filter_in_cache, bool is_initial_load,
-    const SliceTransform* prefix_extractor) {
-  return rep_->LoadTableHandlers(internal_stats, max_threads,
-                                 prefetch_index_and_filter_in_cache,
-                                 is_initial_load, prefix_extractor);
+    const SliceTransform* prefix_extractor,
+    size_t max_file_size_for_l0_meta_pin) {
+  return rep_->LoadTableHandlers(
+      internal_stats, max_threads, prefetch_index_and_filter_in_cache,
+      is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
+}
+
+BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
+    ColumnFamilyData* cfd)
+    : version_builder_(new VersionBuilder(
+          cfd->current()->version_set()->file_options(), cfd->ioptions(),
+          cfd->table_cache(), cfd->current()->storage_info(),
+          cfd->current()->version_set())),
+      version_(cfd->current()) {
+  version_->Ref();
+}
+
+BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
+    ColumnFamilyData* cfd, Version* v)
+    : version_builder_(new VersionBuilder(
+          cfd->current()->version_set()->file_options(), cfd->ioptions(),
+          cfd->table_cache(), v->storage_info(), v->version_set())),
+      version_(v) {
+  assert(version_ != cfd->current());
 }
 
-void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
-                                  FileMetaData* f) {
-  rep_->MaybeAddFile(vstorage, level, f);
+BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
+  version_->Unref();
 }
 
 }  // namespace ROCKSDB_NAMESPACE