]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/version_set_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / version_set_test.cc
index 03e0e26d27987e8e0aef35c5bc258e93d35d72ca..5408857203dac95be467773af55885930828aec7 100644 (file)
@@ -8,9 +8,10 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/version_set.h"
+
 #include "db/db_impl/db_impl.h"
 #include "db/log_writer.h"
-#include "logging/logging.h"
+#include "table/block_based/block_based_table_factory.h"
 #include "table/mock_table.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
@@ -95,7 +96,7 @@ Options GetOptionsWithNumLevels(int num_levels,
   return opt;
 }
 
-class VersionStorageInfoTest : public testing::Test {
+class VersionStorageInfoTestBase : public testing::Test {
  public:
   const Comparator* ucmp_;
   InternalKeyComparator icmp_;
@@ -110,17 +111,19 @@ class VersionStorageInfoTest : public testing::Test {
     return InternalKey(ukey, smallest_seq, kTypeValue);
   }
 
-  VersionStorageInfoTest()
-      : ucmp_(BytewiseComparator()),
+  explicit VersionStorageInfoTestBase(const Comparator* ucmp)
+      : ucmp_(ucmp),
         icmp_(ucmp_),
         logger_(new CountingLogger()),
         options_(GetOptionsWithNumLevels(6, logger_)),
         ioptions_(options_),
         mutable_cf_options_(options_),
-        vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
+        vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel,
+                  /*src_vstorage=*/nullptr,
+                  /*_force_consistency_checks=*/false) {}
 
-  ~VersionStorageInfoTest() override {
-    for (int i = 0; i < vstorage_.num_levels(); i++) {
+  ~VersionStorageInfoTestBase() override {
+    for (int i = 0; i < vstorage_.num_levels(); ++i) {
       for (auto* f : vstorage_.LevelFiles(i)) {
         if (--f->refs == 0) {
           delete f;
@@ -171,6 +174,13 @@ class VersionStorageInfoTest : public testing::Test {
   }
 };
 
+class VersionStorageInfoTest : public VersionStorageInfoTestBase {
+ public:
+  VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {}
+
+  ~VersionStorageInfoTest() override {}
+};
+
 TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
   ioptions_.level_compaction_dynamic_level_bytes = false;
   mutable_cf_options_.max_bytes_for_level_base = 10;
@@ -362,19 +372,19 @@ TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
   Add(2, 3U, "6", "8", 1U);  // Partial overlap with last level
   Add(3, 4U, "1", "9", 1U);  // Contains range of last level
   Add(4, 5U, "4", "5", 1U);  // Inside range of last level
-  Add(4, 5U, "6", "7", 1U);  // Inside range of last level
-  Add(5, 6U, "4", "7", 10U);
+  Add(4, 6U, "6", "7", 1U);  // Inside range of last level
+  Add(5, 7U, "4", "7", 10U);
   ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
 }
 
 TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
   Add(0, 1U, "9", "9", 1U);  // Level 0 is not ordered
-  Add(0, 1U, "5", "6", 1U);  // Ignored because of [5,6] in l1
-  Add(1, 1U, "1", "2", 1U);  // Ignored because of [2,3] in l2
-  Add(1, 2U, "3", "4", 1U);  // Ignored because of [2,3] in l2
-  Add(1, 3U, "5", "6", 1U);
-  Add(2, 4U, "2", "3", 1U);
-  Add(3, 5U, "7", "8", 1U);
+  Add(0, 2U, "5", "6", 1U);  // Ignored because of [5,6] in l1
+  Add(1, 3U, "1", "2", 1U);  // Ignored because of [2,3] in l2
+  Add(1, 4U, "3", "4", 1U);  // Ignored because of [2,3] in l2
+  Add(1, 5U, "5", "6", 1U);
+  Add(2, 6U, "2", "3", 1U);
+  Add(3, 7U, "7", "8", 1U);
   ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
 }
 
@@ -411,6 +421,76 @@ TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
       1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
 }
 
+TEST_F(VersionStorageInfoTest, FileLocationAndMetaDataByNumber) {
+  Add(0, 11U, "1", "2", 5000U);
+  Add(0, 12U, "1", "2", 5000U);
+
+  Add(2, 7U, "1", "2", 8000U);
+
+  ASSERT_EQ(vstorage_.GetFileLocation(11U),
+            VersionStorageInfo::FileLocation(0, 0));
+  ASSERT_NE(vstorage_.GetFileMetaDataByNumber(11U), nullptr);
+
+  ASSERT_EQ(vstorage_.GetFileLocation(12U),
+            VersionStorageInfo::FileLocation(0, 1));
+  ASSERT_NE(vstorage_.GetFileMetaDataByNumber(12U), nullptr);
+
+  ASSERT_EQ(vstorage_.GetFileLocation(7U),
+            VersionStorageInfo::FileLocation(2, 0));
+  ASSERT_NE(vstorage_.GetFileMetaDataByNumber(7U), nullptr);
+
+  ASSERT_FALSE(vstorage_.GetFileLocation(999U).IsValid());
+  ASSERT_EQ(vstorage_.GetFileMetaDataByNumber(999U), nullptr);
+}
+
+class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase {
+ public:
+  VersionStorageInfoTimestampTest()
+      : VersionStorageInfoTestBase(test::ComparatorWithU64Ts()) {}
+  ~VersionStorageInfoTimestampTest() override {}
+  std::string Timestamp(uint64_t ts) const {
+    std::string ret;
+    PutFixed64(&ret, ts);
+    return ret;
+  }
+  std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const {
+    std::string ret;
+    ret.assign(ukey.data(), ukey.size());
+    PutFixed64(&ret, ts);
+    return ret;
+  }
+};
+
+TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) {
+  Add(/*level=*/1, /*file_number=*/1, /*smallest=*/
+      {PackUserKeyAndTimestamp("a", /*ts=*/9), /*s=*/0, kTypeValue},
+      /*largest=*/
+      {PackUserKeyAndTimestamp("a", /*ts=*/8), /*s=*/0, kTypeValue},
+      /*file_size=*/100);
+  Add(/*level=*/1, /*file_number=*/2, /*smallest=*/
+      {PackUserKeyAndTimestamp("a", /*ts=*/5), /*s=*/0, kTypeValue},
+      /*largest=*/
+      {PackUserKeyAndTimestamp("b", /*ts=*/10), /*s=*/0, kTypeValue},
+      /*file_size=*/100);
+  Add(/*level=*/1, /*file_number=*/3, /*smallest=*/
+      {PackUserKeyAndTimestamp("c", /*ts=*/12), /*s=*/0, kTypeValue},
+      /*largest=*/
+      {PackUserKeyAndTimestamp("d", /*ts=*/1), /*s=*/0, kTypeValue},
+      /*file_size=*/100);
+  vstorage_.UpdateNumNonEmptyLevels();
+  vstorage_.GenerateLevelFilesBrief();
+  ASSERT_EQ(
+      "1,2",
+      GetOverlappingFiles(
+          /*level=*/1,
+          {PackUserKeyAndTimestamp("a", /*ts=*/12), /*s=*/0, kTypeValue},
+          {PackUserKeyAndTimestamp("a", /*ts=*/11), /*s=*/0, kTypeValue}));
+  ASSERT_EQ("3",
+            GetOverlappingFiles(
+                /*level=*/1,
+                {PackUserKeyAndTimestamp("c", /*ts=*/15), /*s=*/0, kTypeValue},
+                {PackUserKeyAndTimestamp("c", /*ts=*/2), /*s=*/0, kTypeValue}));
+}
 
 class FindLevelFileTest : public testing::Test {
  public:
@@ -611,40 +691,72 @@ class VersionSetTestBase {
   const static std::string kColumnFamilyName3;
   int num_initial_edits_;
 
-  VersionSetTestBase()
-      : env_(Env::Default()),
-        fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
-        dbname_(test::PerThreadDBPath("version_set_test")),
-        db_options_(),
+  explicit VersionSetTestBase(const std::string& name)
+      : env_(nullptr),
+        dbname_(test::PerThreadDBPath(name)),
+        options_(),
+        db_options_(options_),
+        cf_options_(options_),
+        immutable_cf_options_(db_options_, cf_options_),
         mutable_cf_options_(cf_options_),
         table_cache_(NewLRUCache(50000, 16)),
         write_buffer_manager_(db_options_.db_write_buffer_size),
         shutting_down_(false),
         mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
-    EXPECT_OK(env_->CreateDirIfMissing(dbname_));
+    const char* test_env_uri = getenv("TEST_ENV_URI");
+    if (test_env_uri) {
+      Status s = Env::LoadEnv(test_env_uri, &env_, &env_guard_);
+      EXPECT_OK(s);
+    } else if (getenv("MEM_ENV")) {
+      env_guard_.reset(NewMemEnv(Env::Default()));
+      env_ = env_guard_.get();
+    } else {
+      env_ = Env::Default();
+    }
+    EXPECT_NE(nullptr, env_);
 
+    fs_ = env_->GetFileSystem();
+    EXPECT_OK(fs_->CreateDirIfMissing(dbname_, IOOptions(), nullptr));
+
+    options_.env = env_;
     db_options_.env = env_;
     db_options_.fs = fs_;
-    versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
-                                   table_cache_.get(), &write_buffer_manager_,
-                                   &write_controller_,
-                                   /*block_cache_tracer=*/nullptr)),
-        reactive_versions_ = std::make_shared<ReactiveVersionSet>(
-            dbname_, &db_options_, env_options_, table_cache_.get(),
-            &write_buffer_manager_, &write_controller_);
+    immutable_cf_options_.env = env_;
+    immutable_cf_options_.fs = fs_.get();
+
+    versions_.reset(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    reactive_versions_ = std::make_shared<ReactiveVersionSet>(
+        dbname_, &db_options_, env_options_, table_cache_.get(),
+        &write_buffer_manager_, &write_controller_, nullptr);
     db_options_.db_paths.emplace_back(dbname_,
                                       std::numeric_limits<uint64_t>::max());
   }
 
-  void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
-                       SequenceNumber* last_seqno,
-                       std::unique_ptr<log::Writer>* log_writer) {
+  virtual ~VersionSetTestBase() {
+    if (getenv("KEEP_DB")) {
+      fprintf(stdout, "DB is still at %s\n", dbname_.c_str());
+    } else {
+      Options options;
+      options.env = env_;
+      EXPECT_OK(DestroyDB(dbname_, options));
+    }
+  }
+
+ protected:
+  virtual void PrepareManifest(
+      std::vector<ColumnFamilyDescriptor>* column_families,
+      SequenceNumber* last_seqno, std::unique_ptr<log::Writer>* log_writer) {
     assert(column_families != nullptr);
     assert(last_seqno != nullptr);
     assert(log_writer != nullptr);
     VersionEdit new_db;
     if (db_options_.write_dbid_to_manifest) {
-      DBImpl* impl = new DBImpl(DBOptions(), dbname_);
+      DBOptions tmp_db_options;
+      tmp_db_options.env = env_;
+      std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
       std::string db_id;
       impl->GetDbIdentityFromIdentityFile(&db_id);
       new_db.SetDBId(db_id);
@@ -700,27 +812,73 @@ class VersionSetTestBase {
 
   // Create DB with 3 column families.
   void NewDB() {
-    std::vector<ColumnFamilyDescriptor> column_families;
     SequenceNumber last_seqno;
     std::unique_ptr<log::Writer> log_writer;
     SetIdentityFile(env_, dbname_);
-    PrepareManifest(&column_families, &last_seqno, &log_writer);
+    PrepareManifest(&column_families_, &last_seqno, &log_writer);
     log_writer.reset();
     // Make "CURRENT" file point to the new manifest file.
-    Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
+    Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
     ASSERT_OK(s);
 
-    EXPECT_OK(versions_->Recover(column_families, false));
-    EXPECT_EQ(column_families.size(),
+    EXPECT_OK(versions_->Recover(column_families_, false));
+    EXPECT_EQ(column_families_.size(),
               versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
   }
 
+  void VerifyManifest(std::string* manifest_path) const {
+    assert(manifest_path != nullptr);
+    uint64_t manifest_file_number = 0;
+    Status s = versions_->GetCurrentManifestPath(
+        dbname_, fs_.get(), manifest_path, &manifest_file_number);
+    ASSERT_OK(s);
+    ASSERT_EQ(1, manifest_file_number);
+  }
+
+  Status LogAndApplyToDefaultCF(VersionEdit& edit) {
+    mutex_.Lock();
+    Status s =
+        versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
+                               mutable_cf_options_, &edit, &mutex_);
+    mutex_.Unlock();
+    return s;
+  }
+
+  Status LogAndApplyToDefaultCF(
+      const autovector<std::unique_ptr<VersionEdit>>& edits) {
+    autovector<VersionEdit*> vedits;
+    for (auto& e : edits) {
+      vedits.push_back(e.get());
+    }
+    mutex_.Lock();
+    Status s =
+        versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
+                               mutable_cf_options_, vedits, &mutex_);
+    mutex_.Unlock();
+    return s;
+  }
+
+  void CreateNewManifest() {
+    constexpr FSDirectory* db_directory = nullptr;
+    constexpr bool new_descriptor_log = true;
+    mutex_.Lock();
+    VersionEdit dummy;
+    ASSERT_OK(versions_->LogAndApply(
+        versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
+        &dummy, &mutex_, db_directory, new_descriptor_log));
+    mutex_.Unlock();
+  }
+
+  Env* mem_env_;
   Env* env_;
+  std::shared_ptr<Env> env_guard_;
   std::shared_ptr<FileSystem> fs_;
   const std::string dbname_;
   EnvOptions env_options_;
+  Options options_;
   ImmutableDBOptions db_options_;
   ColumnFamilyOptions cf_options_;
+  ImmutableCFOptions immutable_cf_options_;
   MutableCFOptions mutable_cf_options_;
   std::shared_ptr<Cache> table_cache_;
   WriteController write_controller_;
@@ -730,6 +888,7 @@ class VersionSetTestBase {
   InstrumentedMutex mutex_;
   std::atomic<bool> shutting_down_;
   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
+  std::vector<ColumnFamilyDescriptor> column_families_;
 };
 
 const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
@@ -738,7 +897,7 @@ const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
 
 class VersionSetTest : public VersionSetTestBase, public testing::Test {
  public:
-  VersionSetTest() : VersionSetTestBase() {}
+  VersionSetTest() : VersionSetTestBase("version_set_test") {}
 };
 
 TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
@@ -777,10 +936,742 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
   EXPECT_EQ(kGroupSize - 1, count);
 }
 
+TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
+  // Initialize the database and add a couple of blob files, one with some
+  // garbage in it, and one without any garbage.
+  NewDB();
+
+  assert(versions_);
+  assert(versions_->GetColumnFamilySet());
+
+  ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
+  assert(cfd);
+
+  Version* const version = cfd->current();
+  assert(version);
+
+  VersionStorageInfo* const storage_info = version->storage_info();
+  assert(storage_info);
+
+  {
+    constexpr uint64_t blob_file_number = 123;
+    constexpr uint64_t total_blob_count = 456;
+    constexpr uint64_t total_blob_bytes = 77777777;
+    constexpr char checksum_method[] = "SHA1";
+    constexpr char checksum_value[] =
+        "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
+
+    auto shared_meta = SharedBlobFileMetaData::Create(
+        blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
+        checksum_value);
+
+    constexpr uint64_t garbage_blob_count = 89;
+    constexpr uint64_t garbage_blob_bytes = 1000000;
+
+    auto meta = BlobFileMetaData::Create(
+        std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
+        garbage_blob_count, garbage_blob_bytes);
+
+    storage_info->AddBlobFile(std::move(meta));
+  }
+
+  {
+    constexpr uint64_t blob_file_number = 234;
+    constexpr uint64_t total_blob_count = 555;
+    constexpr uint64_t total_blob_bytes = 66666;
+    constexpr char checksum_method[] = "CRC32";
+    constexpr char checksum_value[] = "3d87ff57";
+
+    auto shared_meta = SharedBlobFileMetaData::Create(
+        blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
+        checksum_value);
+
+    constexpr uint64_t garbage_blob_count = 0;
+    constexpr uint64_t garbage_blob_bytes = 0;
+
+    auto meta = BlobFileMetaData::Create(
+        std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
+        garbage_blob_count, garbage_blob_bytes);
+
+    storage_info->AddBlobFile(std::move(meta));
+  }
+
+  // Force the creation of a new manifest file and make sure metadata for
+  // the blob files is re-persisted.
+  size_t addition_encoded = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlobFileAddition::EncodeTo::CustomFields",
+      [&](void* /* arg */) { ++addition_encoded; });
+
+  size_t garbage_encoded = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlobFileGarbage::EncodeTo::CustomFields",
+      [&](void* /* arg */) { ++garbage_encoded; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateNewManifest();
+
+  ASSERT_EQ(addition_encoded, 2);
+  ASSERT_EQ(garbage_encoded, 1);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(VersionSetTest, AddLiveBlobFiles) {
+  // Initialize the database and add a blob file.
+  NewDB();
+
+  assert(versions_);
+  assert(versions_->GetColumnFamilySet());
+
+  ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
+  assert(cfd);
+
+  Version* const first_version = cfd->current();
+  assert(first_version);
+
+  VersionStorageInfo* const first_storage_info = first_version->storage_info();
+  assert(first_storage_info);
+
+  constexpr uint64_t first_blob_file_number = 234;
+  constexpr uint64_t first_total_blob_count = 555;
+  constexpr uint64_t first_total_blob_bytes = 66666;
+  constexpr char first_checksum_method[] = "CRC32";
+  constexpr char first_checksum_value[] = "3d87ff57";
+
+  auto first_shared_meta = SharedBlobFileMetaData::Create(
+      first_blob_file_number, first_total_blob_count, first_total_blob_bytes,
+      first_checksum_method, first_checksum_value);
+
+  constexpr uint64_t garbage_blob_count = 0;
+  constexpr uint64_t garbage_blob_bytes = 0;
+
+  auto first_meta = BlobFileMetaData::Create(
+      std::move(first_shared_meta), BlobFileMetaData::LinkedSsts(),
+      garbage_blob_count, garbage_blob_bytes);
+
+  first_storage_info->AddBlobFile(first_meta);
+
+  // Reference the version so it stays alive even after the following version
+  // edit.
+  first_version->Ref();
+
+  // Get live files directly from version.
+  std::vector<uint64_t> version_table_files;
+  std::vector<uint64_t> version_blob_files;
+
+  first_version->AddLiveFiles(&version_table_files, &version_blob_files);
+
+  ASSERT_EQ(version_blob_files.size(), 1);
+  ASSERT_EQ(version_blob_files[0], first_blob_file_number);
+
+  // Create a new version containing an additional blob file.
+  versions_->TEST_CreateAndAppendVersion(cfd);
+
+  Version* const second_version = cfd->current();
+  assert(second_version);
+  assert(second_version != first_version);
+
+  VersionStorageInfo* const second_storage_info =
+      second_version->storage_info();
+  assert(second_storage_info);
+
+  constexpr uint64_t second_blob_file_number = 456;
+  constexpr uint64_t second_total_blob_count = 100;
+  constexpr uint64_t second_total_blob_bytes = 2000000;
+  constexpr char second_checksum_method[] = "CRC32B";
+  constexpr char second_checksum_value[] = "6dbdf23a";
+
+  auto second_shared_meta = SharedBlobFileMetaData::Create(
+      second_blob_file_number, second_total_blob_count, second_total_blob_bytes,
+      second_checksum_method, second_checksum_value);
+
+  auto second_meta = BlobFileMetaData::Create(
+      std::move(second_shared_meta), BlobFileMetaData::LinkedSsts(),
+      garbage_blob_count, garbage_blob_bytes);
+
+  second_storage_info->AddBlobFile(std::move(first_meta));
+  second_storage_info->AddBlobFile(std::move(second_meta));
+
+  // Get all live files from version set. Note that the result contains
+  // duplicates.
+  std::vector<uint64_t> all_table_files;
+  std::vector<uint64_t> all_blob_files;
+
+  versions_->AddLiveFiles(&all_table_files, &all_blob_files);
+
+  ASSERT_EQ(all_blob_files.size(), 3);
+  ASSERT_EQ(all_blob_files[0], first_blob_file_number);
+  ASSERT_EQ(all_blob_files[1], first_blob_file_number);
+  ASSERT_EQ(all_blob_files[2], second_blob_file_number);
+
+  // Clean up previous version.
+  first_version->Unref();
+}
+
+TEST_F(VersionSetTest, ObsoleteBlobFile) {
+  // Initialize the database and add a blob file that is entirely garbage
+  // and thus can immediately be marked obsolete.
+  NewDB();
+
+  VersionEdit edit;
+
+  constexpr uint64_t blob_file_number = 234;
+  constexpr uint64_t total_blob_count = 555;
+  constexpr uint64_t total_blob_bytes = 66666;
+  constexpr char checksum_method[] = "CRC32";
+  constexpr char checksum_value[] = "3d87ff57";
+
+  edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
+                   checksum_method, checksum_value);
+
+  edit.AddBlobFileGarbage(blob_file_number, total_blob_count, total_blob_bytes);
+
+  mutex_.Lock();
+  Status s =
+      versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
+                             mutable_cf_options_, &edit, &mutex_);
+  mutex_.Unlock();
+
+  ASSERT_OK(s);
+
+  // Make sure blob files from the pending number range are not returned
+  // as obsolete.
+  {
+    std::vector<ObsoleteFileInfo> table_files;
+    std::vector<ObsoleteBlobFileInfo> blob_files;
+    std::vector<std::string> manifest_files;
+    constexpr uint64_t min_pending_output = blob_file_number;
+
+    versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
+                                min_pending_output);
+
+    ASSERT_TRUE(blob_files.empty());
+  }
+
+  // Make sure the blob file is returned as obsolete if it's not in the pending
+  // range.
+  {
+    std::vector<ObsoleteFileInfo> table_files;
+    std::vector<ObsoleteBlobFileInfo> blob_files;
+    std::vector<std::string> manifest_files;
+    constexpr uint64_t min_pending_output = blob_file_number + 1;
+
+    versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
+                                min_pending_output);
+
+    ASSERT_EQ(blob_files.size(), 1);
+    ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number);
+  }
+
+  // Make sure it's not returned a second time.
+  {
+    std::vector<ObsoleteFileInfo> table_files;
+    std::vector<ObsoleteBlobFileInfo> blob_files;
+    std::vector<std::string> manifest_files;
+    constexpr uint64_t min_pending_output = blob_file_number + 1;
+
+    versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
+                                min_pending_output);
+
+    ASSERT_TRUE(blob_files.empty());
+  }
+}
+
+TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
+  NewDB();
+
+  constexpr uint64_t kNumWals = 5;
+
+  autovector<std::unique_ptr<VersionEdit>> edits;
+  // Add some WALs.
+  for (uint64_t i = 1; i <= kNumWals; i++) {
+    edits.emplace_back(new VersionEdit);
+    // WAL's size equals its log number.
+    edits.back()->AddWal(i, WalMetadata(i));
+  }
+  // Delete the first half of the WALs.
+  edits.emplace_back(new VersionEdit);
+  edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
+
+  autovector<Version*> versions;
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::ProcessManifestWrites:NewVersion",
+      [&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  LogAndApplyToDefaultCF(edits);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  // Since the edits are all WAL edits, no version should be created.
+  ASSERT_EQ(versions.size(), 1);
+  ASSERT_EQ(versions[0], nullptr);
+}
+
+// Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit.
+TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
+  NewDB();
+
+  const std::string kDBId = "db_db";
+  constexpr uint64_t kNumWals = 5;
+
+  autovector<std::unique_ptr<VersionEdit>> edits;
+  // Add some WALs.
+  for (uint64_t i = 1; i <= kNumWals; i++) {
+    edits.emplace_back(new VersionEdit);
+    // WAL's size equals its log number.
+    edits.back()->AddWal(i, WalMetadata(i));
+  }
+  // Delete the first half of the WALs.
+  edits.emplace_back(new VersionEdit);
+  edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
+  edits.emplace_back(new VersionEdit);
+  edits.back()->SetDBId(kDBId);
+
+  autovector<Version*> versions;
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::ProcessManifestWrites:NewVersion",
+      [&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  LogAndApplyToDefaultCF(edits);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  // Since the edits are all WAL edits, no version should be created.
+  ASSERT_EQ(versions.size(), 1);
+  ASSERT_NE(versions[0], nullptr);
+}
+
+TEST_F(VersionSetTest, WalAddition) {
+  NewDB();
+
+  constexpr WalNumber kLogNumber = 10;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  // A WAL is just created.
+  {
+    VersionEdit edit;
+    edit.AddWal(kLogNumber);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
+  }
+
+  // The WAL is synced for several times before closing.
+  {
+    for (uint64_t size_delta = 100; size_delta > 0; size_delta /= 2) {
+      uint64_t size = kSizeInBytes - size_delta;
+      WalMetadata wal(size);
+      VersionEdit edit;
+      edit.AddWal(kLogNumber, wal);
+
+      ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+      const auto& wals = versions_->GetWalSet().GetWals();
+      ASSERT_EQ(wals.size(), 1);
+      ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+      ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
+      ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), size);
+    }
+  }
+
+  // The WAL is closed.
+  {
+    WalMetadata wal(kSizeInBytes);
+    VersionEdit edit;
+    edit.AddWal(kLogNumber, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
+    ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
+  }
+
+  // Recover a new VersionSet.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
+    ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
+  }
+}
+
+TEST_F(VersionSetTest, WalCloseWithoutSync) {
+  NewDB();
+
+  constexpr WalNumber kLogNumber = 10;
+  constexpr uint64_t kSizeInBytes = 111;
+  constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2;
+
+  // A WAL is just created.
+  {
+    VersionEdit edit;
+    edit.AddWal(kLogNumber);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
+  }
+
+  // The WAL is synced before closing.
+  {
+    WalMetadata wal(kSyncedSizeInBytes);
+    VersionEdit edit;
+    edit.AddWal(kLogNumber, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
+    ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
+  }
+
+  // A new WAL with larger log number is created,
+  // implicitly marking the current WAL closed.
+  {
+    VersionEdit edit;
+    edit.AddWal(kLogNumber + 1);
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 2);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
+    ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
+    ASSERT_TRUE(wals.find(kLogNumber + 1) != wals.end());
+    ASSERT_FALSE(wals.at(kLogNumber + 1).HasSyncedSize());
+  }
+
+  // Recover a new VersionSet.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 2);
+    ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
+    ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
+    ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
+  }
+}
+
+TEST_F(VersionSetTest, WalDeletion) {
+  NewDB();
+
+  constexpr WalNumber kClosedLogNumber = 10;
+  constexpr WalNumber kNonClosedLogNumber = 20;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  // Add a non-closed and a closed WAL.
+  {
+    VersionEdit edit;
+    edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes));
+    edit.AddWal(kNonClosedLogNumber);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 2);
+    ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
+    ASSERT_TRUE(wals.find(kClosedLogNumber) != wals.end());
+    ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
+    ASSERT_TRUE(wals.at(kClosedLogNumber).HasSyncedSize());
+    ASSERT_EQ(wals.at(kClosedLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
+  }
+
+  // Delete the closed WAL.
+  {
+    VersionEdit edit;
+    edit.DeleteWalsBefore(kNonClosedLogNumber);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+    const auto& wals = versions_->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
+    ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
+  }
+
+  // Recover a new VersionSet, only the non-closed WAL should show up.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
+    ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
+  }
+
+  // Force the creation of a new MANIFEST file,
+  // only the non-closed WAL should be written to the new MANIFEST.
+  {
+    std::vector<WalAddition> wal_additions;
+    SyncPoint::GetInstance()->SetCallBack(
+        "VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) {
+          VersionEdit* edit = reinterpret_cast<VersionEdit*>(arg);
+          ASSERT_TRUE(edit->IsWalAddition());
+          for (auto& addition : edit->GetWalAdditions()) {
+            wal_additions.push_back(addition);
+          }
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    CreateNewManifest();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+
+    ASSERT_EQ(wal_additions.size(), 1);
+    ASSERT_EQ(wal_additions[0].GetLogNumber(), kNonClosedLogNumber);
+    ASSERT_FALSE(wal_additions[0].GetMetadata().HasSyncedSize());
+  }
+
+  // Recover from the new MANIFEST, only the non-closed WAL should show up.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
+    ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
+  }
+}
+
+TEST_F(VersionSetTest, WalCreateTwice) {
+  NewDB();
+
+  constexpr WalNumber kLogNumber = 10;
+
+  VersionEdit edit;
+  edit.AddWal(kLogNumber);
+
+  ASSERT_OK(LogAndApplyToDefaultCF(edit));
+
+  Status s = LogAndApplyToDefaultCF(edit);
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") !=
+              std::string::npos)
+      << s.ToString();
+}
+
+TEST_F(VersionSetTest, WalCreateAfterClose) {
+  NewDB();
+
+  constexpr WalNumber kLogNumber = 10;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  {
+    // Add a closed WAL.
+    VersionEdit edit;
+    edit.AddWal(kLogNumber);
+    WalMetadata wal(kSizeInBytes);
+    edit.AddWal(kLogNumber, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  {
+    // Create the same WAL again.
+    VersionEdit edit;
+    edit.AddWal(kLogNumber);
+
+    Status s = LogAndApplyToDefaultCF(edit);
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") !=
+                std::string::npos)
+        << s.ToString();
+  }
+}
+
+TEST_F(VersionSetTest, AddWalWithSmallerSize) {
+  NewDB();
+
+  constexpr WalNumber kLogNumber = 10;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  {
+    // Add a closed WAL.
+    VersionEdit edit;
+    WalMetadata wal(kSizeInBytes);
+    edit.AddWal(kLogNumber, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  {
+    // Add the same WAL with smaller synced size.
+    VersionEdit edit;
+    WalMetadata wal(kSizeInBytes / 2);
+    edit.AddWal(kLogNumber, wal);
+
+    Status s = LogAndApplyToDefaultCF(edit);
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(
+        s.ToString().find(
+            "WAL 10 must not have smaller synced size than previous one") !=
+        std::string::npos)
+        << s.ToString();
+  }
+}
+
+TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
+  NewDB();
+
+  constexpr WalNumber kLogNumber0 = 10;
+  constexpr WalNumber kLogNumber1 = 20;
+  constexpr WalNumber kNonExistingNumber = 15;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  {
+    // Add closed WALs.
+    VersionEdit edit;
+    WalMetadata wal(kSizeInBytes);
+    edit.AddWal(kLogNumber0, wal);
+    edit.AddWal(kLogNumber1, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  {
+    // Delete WALs before a non-existing WAL.
+    VersionEdit edit;
+    edit.DeleteWalsBefore(kNonExistingNumber);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  // Recover a new VersionSet, WAL0 is deleted, WAL1 is not.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber1) != wals.end());
+  }
+}
+
+TEST_F(VersionSetTest, DeleteAllWals) {
+  NewDB();
+
+  constexpr WalNumber kMaxLogNumber = 10;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  {
+    // Add a closed WAL.
+    VersionEdit edit;
+    WalMetadata wal(kSizeInBytes);
+    edit.AddWal(kMaxLogNumber, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  {
+    VersionEdit edit;
+    edit.DeleteWalsBefore(kMaxLogNumber + 10);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  // Recover a new VersionSet, all WALs are deleted.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 0);
+  }
+}
+
+TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
+  NewDB();
+
+  constexpr int kAtomicGroupSize = 7;
+  constexpr uint64_t kNumWals = 5;
+  const std::string kDBId = "db_db";
+
+  int remaining = kAtomicGroupSize;
+  autovector<std::unique_ptr<VersionEdit>> edits;
+  // Add 5 WALs.
+  for (uint64_t i = 1; i <= kNumWals; i++) {
+    edits.emplace_back(new VersionEdit);
+    // WAL's size equals its log number.
+    edits.back()->AddWal(i, WalMetadata(i));
+    edits.back()->MarkAtomicGroup(--remaining);
+  }
+  // One edit with the min log number set.
+  edits.emplace_back(new VersionEdit);
+  edits.back()->SetDBId(kDBId);
+  edits.back()->MarkAtomicGroup(--remaining);
+  // Delete the first added 4 WALs.
+  edits.emplace_back(new VersionEdit);
+  edits.back()->DeleteWalsBefore(kNumWals);
+  edits.back()->MarkAtomicGroup(--remaining);
+  ASSERT_EQ(remaining, 0);
+
+  Status s = LogAndApplyToDefaultCF(edits);
+
+  // Recover a new VersionSet, the min log number and the last WAL should be
+  // kept.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    std::string db_id;
+    ASSERT_OK(
+        new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
+
+    ASSERT_EQ(db_id, kDBId);
+
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kNumWals) != wals.end());
+    ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize());
+    ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals);
+  }
+}
+
 class VersionSetAtomicGroupTest : public VersionSetTestBase,
                                   public testing::Test {
  public:
-  VersionSetAtomicGroupTest() : VersionSetTestBase() {}
+  VersionSetAtomicGroupTest()
+      : VersionSetTestBase("version_set_atomic_group_test") {}
 
   void SetUp() override {
     PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
@@ -796,7 +1687,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
       edits_[i].MarkAtomicGroup(--remaining);
       edits_[i].SetLastSequence(last_seqno_++);
     }
-    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
+    ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
   }
 
   void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
@@ -808,7 +1699,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
       edits_[i].MarkAtomicGroup(--remaining);
       edits_[i].SetLastSequence(last_seqno_++);
     }
-    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
+    ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
   }
 
   void SetupCorruptedAtomicGroup(int atomic_group_size) {
@@ -822,7 +1713,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
       }
       edits_[i].SetLastSequence(last_seqno_++);
     }
-    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
+    ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
   }
 
   void SetupIncorrectAtomicGroup(int atomic_group_size) {
@@ -838,7 +1729,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
       }
       edits_[i].SetLastSequence(last_seqno_++);
     }
-    ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
+    ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
   }
 
   void SetupTestSyncPoints() {
@@ -859,6 +1750,10 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
           EXPECT_TRUE(first_in_atomic_group_);
           last_in_atomic_group_ = true;
         });
+    SyncPoint::GetInstance()->SetCallBack(
+        "VersionEditHandlerBase::Iterate:Finish", [&](void* arg) {
+          num_recovered_edits_ = *reinterpret_cast<int*>(arg);
+        });
     SyncPoint::GetInstance()->SetCallBack(
         "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
           num_recovered_edits_ = *reinterpret_cast<int*>(arg);
@@ -1104,7 +1999,7 @@ TEST_F(VersionSetAtomicGroupTest,
   // Write the corrupted edits.
   AddNewEditsToLog(kAtomicGroupSize);
   mu.Lock();
-  EXPECT_OK(
+  EXPECT_NOK(
       reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
   mu.Unlock();
   EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
@@ -1154,7 +2049,7 @@ TEST_F(VersionSetAtomicGroupTest,
                                         &manifest_reader_status));
   AddNewEditsToLog(kAtomicGroupSize);
   mu.Lock();
-  EXPECT_OK(
+  EXPECT_NOK(
       reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
   mu.Unlock();
   EXPECT_EQ(edits_[1].DebugString(),
@@ -1164,7 +2059,8 @@ TEST_F(VersionSetAtomicGroupTest,
 class VersionSetTestDropOneCF : public VersionSetTestBase,
                                 public testing::TestWithParam<std::string> {
  public:
-  VersionSetTestDropOneCF() : VersionSetTestBase() {}
+  VersionSetTestDropOneCF()
+      : VersionSetTestBase("version_set_test_drop_one_cf") {}
 };
 
 // This test simulates the following execution sequence
@@ -1189,7 +2085,7 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
   SequenceNumber last_seqno;
   std::unique_ptr<log::Writer> log_writer;
   PrepareManifest(&column_families, &last_seqno, &log_writer);
-  Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
+  Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
   ASSERT_OK(s);
 
   EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
@@ -1279,6 +2175,712 @@ INSTANTIATE_TEST_CASE_P(
     testing::Values(VersionSetTestBase::kColumnFamilyName1,
                     VersionSetTestBase::kColumnFamilyName2,
                     VersionSetTestBase::kColumnFamilyName3));
+
+class EmptyDefaultCfNewManifest : public VersionSetTestBase,
+                                  public testing::Test {
+ public:
+  EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {}
+  // Emulate DBImpl::NewDB()
+  void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
+                       SequenceNumber* /*last_seqno*/,
+                       std::unique_ptr<log::Writer>* log_writer) override {
+    assert(log_writer != nullptr);
+    VersionEdit new_db;
+    new_db.SetLogNumber(0);
+    std::unique_ptr<WritableFile> file;
+    const std::string manifest_path = DescriptorFileName(dbname_, 1);
+    Status s = env_->NewWritableFile(
+        manifest_path, &file, env_->OptimizeForManifestWrite(env_options_));
+    ASSERT_OK(s);
+    std::unique_ptr<WritableFileWriter> file_writer(
+        new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
+                               manifest_path, env_options_));
+    log_writer->reset(new log::Writer(std::move(file_writer), 0, true));
+    std::string record;
+    ASSERT_TRUE(new_db.EncodeTo(&record));
+    s = (*log_writer)->AddRecord(record);
+    ASSERT_OK(s);
+    // Create new column family
+    VersionEdit new_cf;
+    new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
+    new_cf.SetColumnFamily(1);
+    new_cf.SetLastSequence(2);
+    new_cf.SetNextFile(2);
+    record.clear();
+    ASSERT_TRUE(new_cf.EncodeTo(&record));
+    s = (*log_writer)->AddRecord(record);
+    ASSERT_OK(s);
+  }
+
+ protected:
+  bool write_dbid_to_manifest_ = false;
+  std::unique_ptr<log::Writer> log_writer_;
+};
+
+// Create db, create column family. Cf creation will switch to a new MANIFEST.
+// Then reopen db, trying to recover.
+TEST_F(EmptyDefaultCfNewManifest, Recover) {
+  PrepareManifest(nullptr, nullptr, &log_writer_);
+  log_writer_.reset();
+  Status s =
+      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
+  ASSERT_OK(s);
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
+  column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1,
+                               cf_options_);
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(
+      manifest_path, column_families, false, &db_id, &has_missing_table_file);
+  ASSERT_OK(s);
+  ASSERT_FALSE(has_missing_table_file);
+}
+
+class VersionSetTestEmptyDb
+    : public VersionSetTestBase,
+      public testing::TestWithParam<
+          std::tuple<bool, bool, std::vector<std::string>>> {
+ public:
+  static const std::string kUnknownColumnFamilyName;
+  VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {}
+
+ protected:
+  void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
+                       SequenceNumber* /*last_seqno*/,
+                       std::unique_ptr<log::Writer>* log_writer) override {
+    assert(nullptr != log_writer);
+    VersionEdit new_db;
+    if (db_options_.write_dbid_to_manifest) {
+      DBOptions tmp_db_options;
+      tmp_db_options.env = env_;
+      std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
+      std::string db_id;
+      impl->GetDbIdentityFromIdentityFile(&db_id);
+      new_db.SetDBId(db_id);
+    }
+    const std::string manifest_path = DescriptorFileName(dbname_, 1);
+    std::unique_ptr<WritableFile> file;
+    Status s = env_->NewWritableFile(
+        manifest_path, &file, env_->OptimizeForManifestWrite(env_options_));
+    ASSERT_OK(s);
+    std::unique_ptr<WritableFileWriter> file_writer(
+        new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
+                               manifest_path, env_options_));
+    {
+      log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
+      std::string record;
+      new_db.EncodeTo(&record);
+      s = (*log_writer)->AddRecord(record);
+      ASSERT_OK(s);
+    }
+  }
+
+  std::unique_ptr<log::Writer> log_writer_;
+};
+
+const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown";
+
+TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
+  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
+  PrepareManifest(nullptr, nullptr, &log_writer_);
+  log_writer_.reset();
+  Status s =
+      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
+  ASSERT_OK(s);
+
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+
+  bool read_only = std::get<1>(GetParam());
+  const std::vector<std::string> cf_names = std::get<2>(GetParam());
+
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (const auto& cf_name : cf_names) {
+    column_families.emplace_back(cf_name, cf_options_);
+  }
+
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
+                                           read_only, &db_id,
+                                           &has_missing_table_file);
+  auto iter =
+      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
+  if (iter == cf_names.end()) {
+    ASSERT_TRUE(s.IsInvalidArgument());
+  } else {
+    ASSERT_TRUE(s.IsCorruption());
+  }
+}
+
+TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) {
+  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
+  PrepareManifest(nullptr, nullptr, &log_writer_);
+  // Only a subset of column families in the MANIFEST.
+  VersionEdit new_cf1;
+  new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
+  new_cf1.SetColumnFamily(1);
+  Status s;
+  {
+    std::string record;
+    new_cf1.EncodeTo(&record);
+    s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+  log_writer_.reset();
+  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
+  ASSERT_OK(s);
+
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+
+  bool read_only = std::get<1>(GetParam());
+  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (const auto& cf_name : cf_names) {
+    column_families.emplace_back(cf_name, cf_options_);
+  }
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
+                                           read_only, &db_id,
+                                           &has_missing_table_file);
+  auto iter =
+      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
+  if (iter == cf_names.end()) {
+    ASSERT_TRUE(s.IsInvalidArgument());
+  } else {
+    ASSERT_TRUE(s.IsCorruption());
+  }
+}
+
+TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) {
+  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
+  PrepareManifest(nullptr, nullptr, &log_writer_);
+  // Write all column families but no log_number, next_file_number and
+  // last_sequence.
+  const std::vector<std::string> all_cf_names = {
+      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
+      kColumnFamilyName3};
+  uint32_t cf_id = 1;
+  Status s;
+  for (size_t i = 1; i != all_cf_names.size(); ++i) {
+    VersionEdit new_cf;
+    new_cf.AddColumnFamily(all_cf_names[i]);
+    new_cf.SetColumnFamily(cf_id++);
+    std::string record;
+    ASSERT_TRUE(new_cf.EncodeTo(&record));
+    s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+  log_writer_.reset();
+  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
+  ASSERT_OK(s);
+
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+
+  bool read_only = std::get<1>(GetParam());
+  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (const auto& cf_name : cf_names) {
+    column_families.emplace_back(cf_name, cf_options_);
+  }
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
+                                           read_only, &db_id,
+                                           &has_missing_table_file);
+  auto iter =
+      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
+  if (iter == cf_names.end()) {
+    ASSERT_TRUE(s.IsInvalidArgument());
+  } else {
+    ASSERT_TRUE(s.IsCorruption());
+  }
+}
+
+TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) {
+  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
+  PrepareManifest(nullptr, nullptr, &log_writer_);
+  // Write all column families but no log_number, next_file_number and
+  // last_sequence.
+  const std::vector<std::string> all_cf_names = {
+      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
+      kColumnFamilyName3};
+  uint32_t cf_id = 1;
+  Status s;
+  for (size_t i = 1; i != all_cf_names.size(); ++i) {
+    VersionEdit new_cf;
+    new_cf.AddColumnFamily(all_cf_names[i]);
+    new_cf.SetColumnFamily(cf_id++);
+    std::string record;
+    ASSERT_TRUE(new_cf.EncodeTo(&record));
+    s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+  {
+    VersionEdit tmp_edit;
+    tmp_edit.SetColumnFamily(4);
+    tmp_edit.SetLogNumber(0);
+    tmp_edit.SetNextFile(2);
+    tmp_edit.SetLastSequence(0);
+    std::string record;
+    ASSERT_TRUE(tmp_edit.EncodeTo(&record));
+    s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+  log_writer_.reset();
+  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
+  ASSERT_OK(s);
+
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+
+  bool read_only = std::get<1>(GetParam());
+  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (const auto& cf_name : cf_names) {
+    column_families.emplace_back(cf_name, cf_options_);
+  }
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
+                                           read_only, &db_id,
+                                           &has_missing_table_file);
+  auto iter =
+      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
+  if (iter == cf_names.end()) {
+    ASSERT_TRUE(s.IsInvalidArgument());
+  } else {
+    ASSERT_TRUE(s.IsCorruption());
+  }
+}
+
+TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) {
+  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
+  PrepareManifest(nullptr, nullptr, &log_writer_);
+  // Write all column families but no log_number, next_file_number and
+  // last_sequence.
+  const std::vector<std::string> all_cf_names = {
+      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
+      kColumnFamilyName3};
+  uint32_t cf_id = 1;
+  Status s;
+  for (size_t i = 1; i != all_cf_names.size(); ++i) {
+    VersionEdit new_cf;
+    new_cf.AddColumnFamily(all_cf_names[i]);
+    new_cf.SetColumnFamily(cf_id++);
+    std::string record;
+    ASSERT_TRUE(new_cf.EncodeTo(&record));
+    s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+  {
+    VersionEdit tmp_edit;
+    tmp_edit.SetLogNumber(0);
+    tmp_edit.SetNextFile(2);
+    tmp_edit.SetLastSequence(0);
+    std::string record;
+    ASSERT_TRUE(tmp_edit.EncodeTo(&record));
+    s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+  log_writer_.reset();
+  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
+  ASSERT_OK(s);
+
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+
+  bool read_only = std::get<1>(GetParam());
+  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (const auto& cf_name : cf_names) {
+    column_families.emplace_back(cf_name, cf_options_);
+  }
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
+                                           read_only, &db_id,
+                                           &has_missing_table_file);
+  auto iter =
+      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
+  if (iter == cf_names.end()) {
+    ASSERT_TRUE(s.IsInvalidArgument());
+  } else if (read_only) {
+    ASSERT_OK(s);
+    ASSERT_FALSE(has_missing_table_file);
+  } else if (cf_names.size() == all_cf_names.size()) {
+    ASSERT_OK(s);
+    ASSERT_FALSE(has_missing_table_file);
+  } else if (cf_names.size() < all_cf_names.size()) {
+    ASSERT_TRUE(s.IsInvalidArgument());
+  } else {
+    ASSERT_OK(s);
+    ASSERT_FALSE(has_missing_table_file);
+    ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(
+        kUnknownColumnFamilyName);
+    ASSERT_EQ(nullptr, cfd);
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(
+    BestEffortRecovery, VersionSetTestEmptyDb,
+    testing::Combine(
+        /*write_dbid_to_manifest=*/testing::Bool(),
+        /*read_only=*/testing::Bool(),
+        /*cf_names=*/
+        testing::Values(
+            std::vector<std::string>(),
+            std::vector<std::string>({kDefaultColumnFamilyName}),
+            std::vector<std::string>({VersionSetTestBase::kColumnFamilyName1,
+                                      VersionSetTestBase::kColumnFamilyName2,
+                                      VersionSetTestBase::kColumnFamilyName3}),
+            std::vector<std::string>({kDefaultColumnFamilyName,
+                                      VersionSetTestBase::kColumnFamilyName1}),
+            std::vector<std::string>({kDefaultColumnFamilyName,
+                                      VersionSetTestBase::kColumnFamilyName1,
+                                      VersionSetTestBase::kColumnFamilyName2,
+                                      VersionSetTestBase::kColumnFamilyName3}),
+            std::vector<std::string>(
+                {kDefaultColumnFamilyName,
+                 VersionSetTestBase::kColumnFamilyName1,
+                 VersionSetTestBase::kColumnFamilyName2,
+                 VersionSetTestBase::kColumnFamilyName3,
+                 VersionSetTestEmptyDb::kUnknownColumnFamilyName}))));
+
+class VersionSetTestMissingFiles : public VersionSetTestBase,
+                                   public testing::Test {
+ public:
+  VersionSetTestMissingFiles()
+      : VersionSetTestBase("version_set_test_missing_files"),
+        block_based_table_options_(),
+        table_factory_(std::make_shared<BlockBasedTableFactory>(
+            block_based_table_options_)),
+        internal_comparator_(
+            std::make_shared<InternalKeyComparator>(options_.comparator)) {}
+
+ protected:
+  void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
+                       SequenceNumber* last_seqno,
+                       std::unique_ptr<log::Writer>* log_writer) override {
+    assert(column_families != nullptr);
+    assert(last_seqno != nullptr);
+    assert(log_writer != nullptr);
+    const std::string manifest = DescriptorFileName(dbname_, 1);
+    std::unique_ptr<WritableFile> file;
+    Status s = env_->NewWritableFile(
+        manifest, &file, env_->OptimizeForManifestWrite(env_options_));
+    ASSERT_OK(s);
+    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+        NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
+    log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
+    VersionEdit new_db;
+    if (db_options_.write_dbid_to_manifest) {
+      DBOptions tmp_db_options;
+      tmp_db_options.env = env_;
+      std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
+      std::string db_id;
+      impl->GetDbIdentityFromIdentityFile(&db_id);
+      new_db.SetDBId(db_id);
+    }
+    {
+      std::string record;
+      ASSERT_TRUE(new_db.EncodeTo(&record));
+      s = (*log_writer)->AddRecord(record);
+      ASSERT_OK(s);
+    }
+    const std::vector<std::string> cf_names = {
+        kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
+        kColumnFamilyName3};
+    uint32_t cf_id = 1;  // default cf id is 0
+    cf_options_.table_factory = table_factory_;
+    for (const auto& cf_name : cf_names) {
+      column_families->emplace_back(cf_name, cf_options_);
+      if (cf_name == kDefaultColumnFamilyName) {
+        continue;
+      }
+      VersionEdit new_cf;
+      new_cf.AddColumnFamily(cf_name);
+      new_cf.SetColumnFamily(cf_id);
+      std::string record;
+      ASSERT_TRUE(new_cf.EncodeTo(&record));
+      s = (*log_writer)->AddRecord(record);
+      ASSERT_OK(s);
+
+      VersionEdit cf_files;
+      cf_files.SetColumnFamily(cf_id);
+      cf_files.SetLogNumber(0);
+      record.clear();
+      ASSERT_TRUE(cf_files.EncodeTo(&record));
+      s = (*log_writer)->AddRecord(record);
+      ASSERT_OK(s);
+      ++cf_id;
+    }
+    SequenceNumber seq = 2;
+    {
+      VersionEdit edit;
+      edit.SetNextFile(7);
+      edit.SetLastSequence(seq);
+      std::string record;
+      ASSERT_TRUE(edit.EncodeTo(&record));
+      s = (*log_writer)->AddRecord(record);
+      ASSERT_OK(s);
+    }
+    *last_seqno = seq + 1;
+  }
+
+  struct SstInfo {
+    uint64_t file_number;
+    std::string column_family;
+    std::string key;  // the only key
+    int level = 0;
+    SstInfo(uint64_t file_num, const std::string& cf_name,
+            const std::string& _key)
+        : SstInfo(file_num, cf_name, _key, 0) {}
+    SstInfo(uint64_t file_num, const std::string& cf_name,
+            const std::string& _key, int lvl)
+        : file_number(file_num),
+          column_family(cf_name),
+          key(_key),
+          level(lvl) {}
+  };
+
+  // Create dummy sst, return their metadata. Note that only file name and size
+  // are used.
+  void CreateDummyTableFiles(const std::vector<SstInfo>& file_infos,
+                             std::vector<FileMetaData>* file_metas) {
+    assert(file_metas != nullptr);
+    for (const auto& info : file_infos) {
+      uint64_t file_num = info.file_number;
+      std::string fname = MakeTableFileName(dbname_, file_num);
+      std::unique_ptr<FSWritableFile> file;
+      Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
+      ASSERT_OK(s);
+      std::unique_ptr<WritableFileWriter> fwriter(
+          new WritableFileWriter(std::move(file), fname, FileOptions(), env_));
+      std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
+          int_tbl_prop_collector_factories;
+
+      std::unique_ptr<TableBuilder> builder(table_factory_->NewTableBuilder(
+          TableBuilderOptions(
+              immutable_cf_options_, mutable_cf_options_, *internal_comparator_,
+              &int_tbl_prop_collector_factories, kNoCompression,
+              /*_sample_for_compression=*/0, CompressionOptions(),
+              /*_skip_filters=*/false, info.column_family, info.level),
+          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
+          fwriter.get()));
+      InternalKey ikey(info.key, 0, ValueType::kTypeValue);
+      builder->Add(ikey.Encode(), "value");
+      ASSERT_OK(builder->Finish());
+      fwriter->Flush();
+      uint64_t file_size = 0;
+      s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr);
+      ASSERT_OK(s);
+      ASSERT_NE(0, file_size);
+      FileMetaData meta;
+      meta = FileMetaData(file_num, /*file_path_id=*/0, file_size, ikey, ikey,
+                          0, 0, false, 0, 0, 0, kUnknownFileChecksum,
+                          kUnknownFileChecksumFuncName);
+      file_metas->emplace_back(meta);
+    }
+  }
+
+  // This method updates last_sequence_.
+  void WriteFileAdditionAndDeletionToManifest(
+      uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files,
+      const std::vector<std::pair<int, uint64_t>>& deleted_files) {
+    VersionEdit edit;
+    edit.SetColumnFamily(cf);
+    for (const auto& elem : added_files) {
+      int level = elem.first;
+      edit.AddFile(level, elem.second);
+    }
+    for (const auto& elem : deleted_files) {
+      int level = elem.first;
+      edit.DeleteFile(level, elem.second);
+    }
+    edit.SetLastSequence(last_seqno_);
+    ++last_seqno_;
+    assert(log_writer_.get() != nullptr);
+    std::string record;
+    ASSERT_TRUE(edit.EncodeTo(&record));
+    Status s = log_writer_->AddRecord(record);
+    ASSERT_OK(s);
+  }
+
+  BlockBasedTableOptions block_based_table_options_;
+  std::shared_ptr<TableFactory> table_factory_;
+  std::shared_ptr<InternalKeyComparator> internal_comparator_;
+  std::vector<ColumnFamilyDescriptor> column_families_;
+  SequenceNumber last_seqno_;
+  std::unique_ptr<log::Writer> log_writer_;
+};
+
+TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) {
+  std::vector<SstInfo> existing_files = {
+      SstInfo(100, kDefaultColumnFamilyName, "a"),
+      SstInfo(102, kDefaultColumnFamilyName, "b"),
+      SstInfo(103, kDefaultColumnFamilyName, "c"),
+      SstInfo(107, kDefaultColumnFamilyName, "d"),
+      SstInfo(110, kDefaultColumnFamilyName, "e")};
+  std::vector<FileMetaData> file_metas;
+  CreateDummyTableFiles(existing_files, &file_metas);
+
+  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
+  std::vector<std::pair<int, FileMetaData>> added_files;
+  for (uint64_t file_num = 10; file_num < 15; ++file_num) {
+    std::string smallest_ukey = "a";
+    std::string largest_ukey = "b";
+    InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
+    InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
+    FileMetaData meta =
+        FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12,
+                     smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
+                     kUnknownFileChecksum, kUnknownFileChecksumFuncName);
+    added_files.emplace_back(0, meta);
+  }
+  WriteFileAdditionAndDeletionToManifest(
+      /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
+  std::vector<std::pair<int, uint64_t>> deleted_files;
+  deleted_files.emplace_back(0, 10);
+  WriteFileAdditionAndDeletionToManifest(
+      /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
+  log_writer_.reset();
+  Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
+  ASSERT_OK(s);
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
+                                           /*read_only=*/false, &db_id,
+                                           &has_missing_table_file);
+  ASSERT_OK(s);
+  ASSERT_TRUE(has_missing_table_file);
+  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
+    VersionStorageInfo* vstorage = cfd->current()->storage_info();
+    const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
+    ASSERT_TRUE(files.empty());
+  }
+}
+
+TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) {
+  std::vector<SstInfo> existing_files = {
+      SstInfo(100, kDefaultColumnFamilyName, "a"),
+      SstInfo(102, kDefaultColumnFamilyName, "b"),
+      SstInfo(103, kDefaultColumnFamilyName, "c"),
+      SstInfo(107, kDefaultColumnFamilyName, "d"),
+      SstInfo(110, kDefaultColumnFamilyName, "e")};
+  std::vector<FileMetaData> file_metas;
+  CreateDummyTableFiles(existing_files, &file_metas);
+
+  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
+  std::vector<std::pair<int, FileMetaData>> added_files;
+  for (size_t i = 3; i != 5; ++i) {
+    added_files.emplace_back(0, file_metas[i]);
+  }
+  WriteFileAdditionAndDeletionToManifest(
+      /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
+
+  added_files.clear();
+  for (uint64_t file_num = 120; file_num < 130; ++file_num) {
+    std::string smallest_ukey = "a";
+    std::string largest_ukey = "b";
+    InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
+    InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
+    FileMetaData meta =
+        FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12,
+                     smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
+                     kUnknownFileChecksum, kUnknownFileChecksumFuncName);
+    added_files.emplace_back(0, meta);
+  }
+  WriteFileAdditionAndDeletionToManifest(
+      /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
+  log_writer_.reset();
+  Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
+  ASSERT_OK(s);
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
+                                           /*read_only=*/false, &db_id,
+                                           &has_missing_table_file);
+  ASSERT_OK(s);
+  ASSERT_TRUE(has_missing_table_file);
+  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
+    VersionStorageInfo* vstorage = cfd->current()->storage_info();
+    const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
+    if (cfd->GetName() == kDefaultColumnFamilyName) {
+      ASSERT_EQ(2, files.size());
+      for (const auto* fmeta : files) {
+        if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) {
+          ASSERT_FALSE(true);
+        }
+      }
+    } else {
+      ASSERT_TRUE(files.empty());
+    }
+  }
+}
+
+TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
+  std::vector<SstInfo> existing_files = {
+      SstInfo(100, kDefaultColumnFamilyName, "a"),
+      SstInfo(102, kDefaultColumnFamilyName, "b"),
+      SstInfo(103, kDefaultColumnFamilyName, "c"),
+      SstInfo(107, kDefaultColumnFamilyName, "d"),
+      SstInfo(110, kDefaultColumnFamilyName, "e")};
+  std::vector<FileMetaData> file_metas;
+  CreateDummyTableFiles(existing_files, &file_metas);
+
+  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
+  std::vector<std::pair<int, FileMetaData>> added_files;
+  for (const auto& meta : file_metas) {
+    added_files.emplace_back(0, meta);
+  }
+  WriteFileAdditionAndDeletionToManifest(
+      /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
+  std::vector<std::pair<int, uint64_t>> deleted_files;
+  deleted_files.emplace_back(/*level=*/0, 100);
+  WriteFileAdditionAndDeletionToManifest(
+      /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
+  log_writer_.reset();
+  Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
+  ASSERT_OK(s);
+  std::string manifest_path;
+  VerifyManifest(&manifest_path);
+  std::string db_id;
+  bool has_missing_table_file = false;
+  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
+                                           /*read_only=*/false, &db_id,
+                                           &has_missing_table_file);
+  ASSERT_OK(s);
+  ASSERT_FALSE(has_missing_table_file);
+  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
+    VersionStorageInfo* vstorage = cfd->current()->storage_info();
+    const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
+    if (cfd->GetName() == kDefaultColumnFamilyName) {
+      ASSERT_EQ(existing_files.size() - deleted_files.size(), files.size());
+      bool has_deleted_file = false;
+      for (const auto* fmeta : files) {
+        if (fmeta->fd.GetNumber() == 100) {
+          has_deleted_file = true;
+          break;
+        }
+      }
+      ASSERT_FALSE(has_deleted_file);
+    } else {
+      ASSERT_TRUE(files.empty());
+    }
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {