]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/corruption_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / corruption_test.cc
index 846ad5402ed8c8cfe240eb1370c01db4501953a8..8ccac61303c65fd8483700e7ab4dd8bb8c235420 100644 (file)
@@ -7,6 +7,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include "rocksdb/options.h"
 #ifndef ROCKSDB_LITE
 
 #include <fcntl.h>
 #include "db/db_test_util.h"
 #include "db/log_format.h"
 #include "db/version_set.h"
-#include "env/composite_env_wrapper.h"
 #include "file/filename.h"
+#include "port/stack_trace.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/table.h"
+#include "rocksdb/utilities/transaction_db.h"
 #include "rocksdb/write_batch.h"
 #include "table/block_based/block_based_table_builder.h"
 #include "table/meta_blocks.h"
 namespace ROCKSDB_NAMESPACE {
 
 static constexpr int kValueSize = 1000;
-
+namespace {
+// A wrapper that allows injection of errors.
+class ErrorEnv : public EnvWrapper {
+ public:
+  bool writable_file_error_;
+  int num_writable_file_errors_;
+
+  explicit ErrorEnv(Env* _target)
+      : EnvWrapper(_target),
+        writable_file_error_(false),
+        num_writable_file_errors_(0) {}
+  const char* Name() const override { return "ErrorEnv"; }
+
+  virtual Status NewWritableFile(const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result,
+                                 const EnvOptions& soptions) override {
+    result->reset();
+    if (writable_file_error_) {
+      ++num_writable_file_errors_;
+      return Status::IOError(fname, "fake error");
+    }
+    return target()->NewWritableFile(fname, result, soptions);
+  }
+};
+}  // anonymous namespace
 class CorruptionTest : public testing::Test {
  public:
-  test::ErrorEnv env_;
+  std::shared_ptr<Env> env_guard_;
+  ErrorEnv* env_;
   std::string dbname_;
   std::shared_ptr<Cache> tiny_cache_;
   Options options_;
@@ -53,9 +80,14 @@ class CorruptionTest : public testing::Test {
     // set it to 0), test SequenceNumberRecovery will fail, likely because of a
     // bug in recovery code. Keep it 4 for now to make the test passes.
     tiny_cache_ = NewLRUCache(100, 4);
+    Env* base_env = Env::Default();
+    EXPECT_OK(
+        test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
+    EXPECT_NE(base_env, nullptr);
+    env_ = new ErrorEnv(base_env);
     options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
-    options_.env = &env_;
-    dbname_ = test::PerThreadDBPath("corruption_test");
+    options_.env = env_;
+    dbname_ = test::PerThreadDBPath(env_, "corruption_test");
     Status s = DestroyDB(dbname_, options_);
     EXPECT_OK(s);
 
@@ -77,8 +109,11 @@ class CorruptionTest : public testing::Test {
     if (getenv("KEEP_DB")) {
       fprintf(stdout, "db is still at %s\n", dbname_.c_str());
     } else {
-      EXPECT_OK(DestroyDB(dbname_, Options()));
+      Options opts;
+      opts.env = env_->target();
+      EXPECT_OK(DestroyDB(dbname_, opts));
     }
+    delete env_;
   }
 
   void CloseDb() {
@@ -93,7 +128,7 @@ class CorruptionTest : public testing::Test {
     if (opt.env == Options().env) {
       // If env is not overridden, replace it with ErrorEnv.
       // Otherwise, the test already uses a non-default Env.
-      opt.env = &env_;
+      opt.env = env_;
     }
     opt.arena_block_size = 4096;
     BlockBasedTableOptions table_options;
@@ -103,9 +138,7 @@ class CorruptionTest : public testing::Test {
     return DB::Open(opt, dbname_, &db_);
   }
 
-  void Reopen(Options* options = nullptr) {
-    ASSERT_OK(TryReopen(options));
-  }
+  void Reopen(Options* options = nullptr) { ASSERT_OK(TryReopen(options)); }
 
   void RepairDB() {
     delete db_;
@@ -121,7 +154,7 @@ class CorruptionTest : public testing::Test {
         DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
         ASSERT_OK(dbi->TEST_FlushMemTable());
       }
-      //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
+      // if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
       Slice key = Key(i + start, &key_space);
       batch.Clear();
       ASSERT_OK(batch.Put(key, Value(i + start, &value_space)));
@@ -148,8 +181,7 @@ class CorruptionTest : public testing::Test {
       ASSERT_OK(iter->status());
       uint64_t key;
       Slice in(iter->key());
-      if (!ConsumeDecimalNumber(&in, &key) ||
-          !in.empty() ||
+      if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
           key < next_expected) {
         bad_keys++;
         continue;
@@ -165,10 +197,11 @@ class CorruptionTest : public testing::Test {
     iter->status().PermitUncheckedError();
     delete iter;
 
-    fprintf(stderr,
-      "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
-            min_expected, max_expected, correct, bad_keys, bad_values,
-            static_cast<unsigned long long>(missed));
+    fprintf(
+        stderr,
+        "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
+        min_expected, max_expected, correct, bad_keys, bad_values,
+        static_cast<unsigned long long>(missed));
     ASSERT_LE(min_expected, correct);
     ASSERT_GE(max_expected, correct);
   }
@@ -176,14 +209,13 @@ class CorruptionTest : public testing::Test {
   void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
     // Pick file to corrupt
     std::vector<std::string> filenames;
-    ASSERT_OK(env_.GetChildren(dbname_, &filenames));
+    ASSERT_OK(env_->GetChildren(dbname_, &filenames));
     uint64_t number;
     FileType type;
     std::string fname;
     int picked_number = -1;
     for (size_t i = 0; i < filenames.size(); i++) {
-      if (ParseFileName(filenames[i], &number, &type) &&
-          type == filetype &&
+      if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
           static_cast<int>(number) > picked_number) {  // Pick latest file
         fname = dbname_ + "/" + filenames[i];
         picked_number = static_cast<int>(number);
@@ -191,7 +223,7 @@ class CorruptionTest : public testing::Test {
     }
     ASSERT_TRUE(!fname.empty()) << filetype;
 
-    ASSERT_OK(test::CorruptFile(&env_, fname, offset, bytes_to_corrupt));
+    ASSERT_OK(test::CorruptFile(env_, fname, offset, bytes_to_corrupt));
   }
 
   // corrupts exactly one file at level `level`. if no file found at level,
@@ -201,7 +233,7 @@ class CorruptionTest : public testing::Test {
     db_->GetLiveFilesMetaData(&metadata);
     for (const auto& m : metadata) {
       if (m.level == level) {
-        ASSERT_OK(test::CorruptFile(&env_, dbname_ + "/" + m.name, offset,
+        ASSERT_OK(test::CorruptFile(env_, dbname_ + "/" + m.name, offset,
                                     bytes_to_corrupt));
         return;
       }
@@ -209,7 +241,6 @@ class CorruptionTest : public testing::Test {
     FAIL() << "no file found at level";
   }
 
-
   int Property(const std::string& name) {
     std::string property;
     int result;
@@ -242,6 +273,42 @@ class CorruptionTest : public testing::Test {
     }
     return Slice(*storage);
   }
+
+  void GetSortedWalFiles(std::vector<uint64_t>& file_nums) {
+    std::vector<std::string> tmp_files;
+    ASSERT_OK(env_->GetChildren(dbname_, &tmp_files));
+    FileType type = kWalFile;
+    for (const auto& file : tmp_files) {
+      uint64_t number = 0;
+      if (ParseFileName(file, &number, &type) && type == kWalFile) {
+        file_nums.push_back(number);
+      }
+    }
+    std::sort(file_nums.begin(), file_nums.end());
+  }
+
+  void CorruptFileWithTruncation(FileType file, uint64_t number,
+                                 uint64_t bytes_to_truncate = 0) {
+    std::string path;
+    switch (file) {
+      case FileType::kWalFile:
+        path = LogFileName(dbname_, number);
+        break;
+      // TODO: Add other file types as this method is being used for those file
+      // types.
+      default:
+        return;
+    }
+    uint64_t old_size = 0;
+    ASSERT_OK(env_->GetFileSize(path, &old_size));
+    assert(old_size > bytes_to_truncate);
+    uint64_t new_size = old_size - bytes_to_truncate;
+    // If bytes_to_truncate == 0, it will do full truncation.
+    if (bytes_to_truncate == 0) {
+      new_size = 0;
+    }
+    ASSERT_OK(test::TruncateFile(env_, path, new_size));
+  }
 };
 
 TEST_F(CorruptionTest, Recovery) {
@@ -267,15 +334,81 @@ TEST_F(CorruptionTest, Recovery) {
   Check(36, 36);
 }
 
+TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
+  // Repro for bug where WALs following the point-in-time recovery were not
+  // retained leading to the next recovery failing.
+  CloseDb();
+
+  options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+  const std::string test_cf_name = "test_cf";
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
+  cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());
+
+  uint64_t log_num;
+  {
+    options_.create_missing_column_families = true;
+    std::vector<ColumnFamilyHandle*> cfhs;
+    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
+    assert(db_ != nullptr);  // suppress false clang-analyze report
+
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    log_num = file_nums.back();
+    for (auto* cfh : cfhs) {
+      delete cfh;
+    }
+    CloseDb();
+  }
+
+  CorruptFileWithTruncation(FileType::kWalFile, log_num,
+                            /*bytes_to_truncate=*/1);
+
+  {
+    // Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
+    options_.avoid_flush_during_recovery = true;
+    std::vector<ColumnFamilyHandle*> cfhs;
+    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
+    assert(db_ != nullptr);  // suppress false clang-analyze report
+
+    // Flush one but not both CFs and write some data so there's a seqno gap
+    // between the PITR corruption and the next DB session's first WAL.
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
+    ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));
+
+    for (auto* cfh : cfhs) {
+      delete cfh;
+    }
+    CloseDb();
+  }
+
+  // With the bug, this DB open would remove the WALs following the PITR
+  // corruption. Then, the next recovery would fail.
+  for (int i = 0; i < 2; ++i) {
+    std::vector<ColumnFamilyHandle*> cfhs;
+    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
+    assert(db_ != nullptr);  // suppress false clang-analyze report
+
+    for (auto* cfh : cfhs) {
+      delete cfh;
+    }
+    CloseDb();
+  }
+}
+
 TEST_F(CorruptionTest, RecoverWriteError) {
-  env_.writable_file_error_ = true;
+  env_->writable_file_error_ = true;
   Status s = TryReopen();
   ASSERT_TRUE(!s.ok());
 }
 
 TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
   // Do enough writing to force minor compaction
-  env_.writable_file_error_ = true;
+  env_->writable_file_error_ = true;
   const int num =
       static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
   std::string value_storage;
@@ -291,8 +424,8 @@ TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
     ASSERT_TRUE(!failed || !s.ok());
   }
   ASSERT_TRUE(!s.ok());
-  ASSERT_GE(env_.num_writable_file_errors_, 1);
-  env_.writable_file_error_ = false;
+  ASSERT_GE(env_->num_writable_file_errors_, 1);
+  env_->writable_file_error_ = false;
   Reopen();
 }
 
@@ -310,7 +443,7 @@ TEST_F(CorruptionTest, TableFile) {
 
 TEST_F(CorruptionTest, VerifyChecksumReadahead) {
   Options options;
-  SpecialEnv senv(Env::Default());
+  SpecialEnv senv(env_->target());
   options.env = &senv;
   // Disable block cache as we are going to check checksum for
   // the same file twice and measure number of reads.
@@ -417,7 +550,10 @@ TEST_F(CorruptionTest, CorruptedDescriptor) {
   ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
   DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
   ASSERT_OK(dbi->TEST_FlushMemTable());
-  ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(
+      dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
 
   Corrupt(kDescriptorFile, 0, 1000);
   Status s = TryReopen();
@@ -432,6 +568,7 @@ TEST_F(CorruptionTest, CorruptedDescriptor) {
 
 TEST_F(CorruptionTest, CompactionInputError) {
   Options options;
+  options.env = env_;
   Reopen(&options);
   Build(10);
   DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
@@ -452,6 +589,7 @@ TEST_F(CorruptionTest, CompactionInputError) {
 
 TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
   Options options;
+  options.env = env_;
   options.paranoid_checks = true;
   options.write_buffer_size = 131072;
   options.max_write_buffer_number = 2;
@@ -522,22 +660,23 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
   ASSERT_EQ(static_cast<size_t>(1), metadata.size());
   std::string filename = dbname_ + metadata[0].name;
 
-  std::unique_ptr<RandomAccessFile> file;
-  ASSERT_OK(options_.env->NewRandomAccessFile(filename, &file, EnvOptions()));
-  std::unique_ptr<RandomAccessFileReader> file_reader(
-      new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
-                                 filename));
+  FileOptions file_opts;
+  const auto& fs = options_.env->GetFileSystem();
+  std::unique_ptr<RandomAccessFileReader> file_reader;
+  ASSERT_OK(RandomAccessFileReader::Create(fs, filename, file_opts,
+                                           &file_reader, nullptr));
 
   uint64_t file_size;
-  ASSERT_OK(options_.env->GetFileSize(filename, &file_size));
+  ASSERT_OK(
+      fs->GetFileSize(filename, file_opts.io_options, &file_size, nullptr));
 
   BlockHandle range_del_handle;
-  ASSERT_OK(FindMetaBlock(
+  ASSERT_OK(FindMetaBlockInFile(
       file_reader.get(), file_size, kBlockBasedTableMagicNumber,
-      ImmutableCFOptions(options_), kRangeDelBlock, &range_del_handle));
+      ImmutableOptions(options_), kRangeDelBlockName, &range_del_handle));
 
   ASSERT_OK(TryReopen());
-  ASSERT_OK(test::CorruptFile(&env_, filename,
+  ASSERT_OK(test::CorruptFile(env_, filename,
                               static_cast<int>(range_del_handle.offset()), 1));
   ASSERT_TRUE(TryReopen().IsCorruption());
 }
@@ -545,6 +684,7 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
 TEST_F(CorruptionTest, FileSystemStateCorrupted) {
   for (int iter = 0; iter < 2; ++iter) {
     Options options;
+    options.env = env_;
     options.paranoid_checks = true;
     options.create_if_missing = true;
     Reopen(&options);
@@ -561,13 +701,13 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) {
 
     if (iter == 0) {  // corrupt file size
       std::unique_ptr<WritableFile> file;
-      env_.NewWritableFile(filename, &file, EnvOptions());
+      ASSERT_OK(env_->NewWritableFile(filename, &file, EnvOptions()));
       ASSERT_OK(file->Append(Slice("corrupted sst")));
       file.reset();
       Status x = TryReopen(&options);
       ASSERT_TRUE(x.IsCorruption());
     } else {  // delete the file
-      ASSERT_OK(env_.DeleteFile(filename));
+      ASSERT_OK(env_->DeleteFile(filename));
       Status x = TryReopen(&options);
       ASSERT_TRUE(x.IsCorruption());
     }
@@ -583,6 +723,7 @@ static const auto& corruption_modes = {
 
 TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
   Options options;
+  options.env = env_;
   options.check_flush_compaction_key_order = false;
   options.paranoid_file_checks = true;
   options.create_if_missing = true;
@@ -597,7 +738,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
     options.table_factory = mock;
     mock->SetCorruptionMode(mode);
     ASSERT_OK(DB::Open(options, dbname_, &db_));
-    assert(db_ != nullptr);
+    assert(db_ != nullptr);  // suppress false clang-analyze report
     Build(10);
     s = db_->Flush(FlushOptions());
     if (mode == mock::MockTableFactory::kCorruptNone) {
@@ -610,6 +751,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
 
 TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
   Options options;
+  options.env = env_;
   options.paranoid_file_checks = true;
   options.create_if_missing = true;
   options.check_flush_compaction_key_order = false;
@@ -618,17 +760,20 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
     delete db_;
     db_ = nullptr;
     s = DestroyDB(dbname_, options);
+    ASSERT_OK(s);
     std::shared_ptr<mock::MockTableFactory> mock =
         std::make_shared<mock::MockTableFactory>();
     options.table_factory = mock;
     ASSERT_OK(DB::Open(options, dbname_, &db_));
-    assert(db_ != nullptr);
+    assert(db_ != nullptr);  // suppress false clang-analyze report
     Build(100, 2);
     // ASSERT_OK(db_->Flush(FlushOptions()));
     DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
     ASSERT_OK(dbi->TEST_FlushMemTable());
     mock->SetCorruptionMode(mode);
-    s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true);
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    s = dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
     if (mode == mock::MockTableFactory::kCorruptNone) {
       ASSERT_OK(s);
     } else {
@@ -639,6 +784,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
 
 TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
   Options options;
+  options.env = env_;
   options.check_flush_compaction_key_order = false;
   options.paranoid_file_checks = true;
   options.create_if_missing = true;
@@ -648,7 +794,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
     ASSERT_OK(DestroyDB(dbname_, options));
     ASSERT_OK(DB::Open(options, dbname_, &db_));
     std::string start, end;
-    assert(db_ != nullptr);
+    assert(db_ != nullptr);  // suppress false clang-analyze report
     ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                                Key(3, &start), Key(7, &end)));
     auto snap = db_->GetSnapshot();
@@ -663,7 +809,10 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
     } else {
       DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
       ASSERT_OK(dbi->TEST_FlushMemTable());
-      ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+      ASSERT_OK(
+          dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
     }
     db_->ReleaseSnapshot(snap);
   }
@@ -671,6 +820,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
 
 TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
   Options options;
+  options.env = env_;
   options.check_flush_compaction_key_order = false;
   options.paranoid_file_checks = true;
   options.create_if_missing = true;
@@ -679,7 +829,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
     db_ = nullptr;
     ASSERT_OK(DestroyDB(dbname_, options));
     ASSERT_OK(DB::Open(options, dbname_, &db_));
-    assert(db_ != nullptr);
+    assert(db_ != nullptr);  // suppress false clang-analyze report
     Build(10, 0, 0);
     std::string start, end;
     ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
@@ -698,7 +848,10 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
     } else {
       DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
       ASSERT_OK(dbi->TEST_FlushMemTable());
-      ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+      ASSERT_OK(
+          dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
     }
     db_->ReleaseSnapshot(snap);
   }
@@ -706,6 +859,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
 
 TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
   Options options;
+  options.env = env_;
   options.check_flush_compaction_key_order = false;
   options.paranoid_file_checks = true;
   options.create_if_missing = true;
@@ -714,7 +868,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
     db_ = nullptr;
     ASSERT_OK(DestroyDB(dbname_, options));
     ASSERT_OK(DB::Open(options, dbname_, &db_));
-    assert(db_ != nullptr);
+    assert(db_ != nullptr);  // suppress false clang-analyze report
     std::string start, end;
     Build(10);
     ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
@@ -730,7 +884,10 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
     } else {
       DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
       ASSERT_OK(dbi->TEST_FlushMemTable());
-      ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+      ASSERT_OK(
+          dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
     }
     db_->ReleaseSnapshot(snap);
   }
@@ -738,6 +895,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
 
 TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
   Options options;
+  options.env = env_;
   options.create_if_missing = true;
   options.allow_data_in_errors = true;
   auto mode = mock::MockTableFactory::kCorruptKey;
@@ -751,18 +909,22 @@ TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
   options.table_factory = mock;
 
   ASSERT_OK(DB::Open(options, dbname_, &db_));
-  assert(db_ != nullptr);
+  assert(db_ != nullptr);  // suppress false clang-analyze report
   Build(100, 2);
 
   DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
   ASSERT_OK(dbi->TEST_FlushMemTable());
-  Status s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true);
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  Status s =
+      dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
   ASSERT_NOK(s);
   ASSERT_TRUE(s.IsCorruption());
 }
 
 TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
   Options options;
+  options.env = env_;
   options.paranoid_file_checks = false;
   options.create_if_missing = true;
   options.check_flush_compaction_key_order = false;
@@ -773,7 +935,7 @@ TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
       std::make_shared<mock::MockTableFactory>();
   options.table_factory = mock;
   ASSERT_OK(DB::Open(options, dbname_, &db_));
-  assert(db_ != nullptr);
+  assert(db_ != nullptr);  // suppress false clang-analyze report
   mock->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey);
   Build(100, 2);
   DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
@@ -781,11 +943,15 @@ TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
 
   mock->SetCorruptionMode(mock::MockTableFactory::kCorruptNone);
   ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
-  ASSERT_NOK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_NOK(
+      dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
 }
 
 TEST_F(CorruptionTest, FlushKeyOrderCheck) {
   Options options;
+  options.env = env_;
   options.paranoid_file_checks = false;
   options.create_if_missing = true;
   ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
@@ -814,7 +980,6 @@ TEST_F(CorruptionTest, FlushKeyOrderCheck) {
 }
 
 TEST_F(CorruptionTest, DisableKeyOrderCheck) {
-  Options options;
   ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "false"}}));
   DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
 
@@ -828,7 +993,10 @@ TEST_F(CorruptionTest, DisableKeyOrderCheck) {
   ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
   ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
   ASSERT_OK(dbi->TEST_FlushMemTable());
-  ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(
+      dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
 }
@@ -836,7 +1004,7 @@ TEST_F(CorruptionTest, DisableKeyOrderCheck) {
 TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
   CloseDb();
   Options options;
-  options.env = &env_;
+  options.env = env_;
   ASSERT_OK(DestroyDB(dbname_, options));
   options.create_if_missing = true;
   options.file_checksum_gen_factory =
@@ -857,9 +1025,9 @@ TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
   int count{0};
   SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::VerifySstFileChecksum:mismatch", [&](void* arg) {
+      "DBImpl::VerifyFullFileChecksum:mismatch", [&](void* arg) {
         auto* s = reinterpret_cast<Status*>(arg);
-        assert(s);
+        ASSERT_NE(s, nullptr);
         ++count;
         ASSERT_NOK(*s);
       });
@@ -868,10 +1036,543 @@ TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
   ASSERT_EQ(1, count);
 }
 
+class CrashDuringRecoveryWithCorruptionTest
+    : public CorruptionTest,
+      public testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+  explicit CrashDuringRecoveryWithCorruptionTest()
+      : CorruptionTest(),
+        avoid_flush_during_recovery_(std::get<0>(GetParam())),
+        track_and_verify_wals_in_manifest_(std::get<1>(GetParam())) {}
+
+ protected:
+  const bool avoid_flush_during_recovery_;
+  const bool track_and_verify_wals_in_manifest_;
+};
+
+INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
+                        ::testing::Values(std::make_tuple(true, false),
+                                          std::make_tuple(false, false),
+                                          std::make_tuple(true, true),
+                                          std::make_tuple(false, true)));
+
+// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
+// won't flush the data from WAL to L0 for all column families if possible. As a
+// result, not all column families can increase their log_numbers, and
+// min_log_number_to_keep won't change.
+// It may prematurely persist a new MANIFEST even before we can declare the DB
+// is in consistent state after recovery (this is when the new WAL is synced)
+// and advances log_numbers for some column families.
+//
+// If there is power failure before we sync the new WAL, we will end up in
+// a situation in which after persisting the MANIFEST, RocksDB will see some
+// column families' log_numbers larger than the corrupted wal, and
+// "Column family inconsistency: SST file contains data beyond the point of
+// corruption" error will be hit, causing recovery to fail.
+//
+// After adding the fix, only after new WAL is synced, RocksDB persist a new
+// MANIFEST with column families to ensure RocksDB is in consistent state.
+// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
+// synced immediately afterwards. The sequence number of the sentinel
+// WriteBatch will be the next sequence number immediately after the largest
+// sequence number recovered from previous WALs and MANIFEST because of which DB
+// will be in consistent state.
+// If a future recovery starts from the new MANIFEST, then it means the new WAL
+// is successfully synced. Due to the sentinel empty write batch at the
+// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
+// If future recovery starts from the old MANIFEST, it means the writing the new
+// MANIFEST failed. It won't have the "SST ahead of WAL" error.
+//
+// The combination of corrupting a WAL and injecting an error during subsequent
+// re-open exposes the bug of prematurely persisting a new MANIFEST with
+// advanced ColumnFamilyData::log_number.
+TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
+  CloseDb();
+  Options options;
+  options.track_and_verify_wals_in_manifest =
+      track_and_verify_wals_in_manifest_;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  options.avoid_flush_during_recovery = false;
+  options.env = env_;
+  ASSERT_OK(DestroyDB(dbname_, options));
+  options.create_if_missing = true;
+  options.max_write_buffer_number = 8;
+
+  Reopen(&options);
+  Status s;
+  const std::string test_cf_name = "test_cf";
+  ColumnFamilyHandle* cfh = nullptr;
+  s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
+  ASSERT_OK(s);
+  delete cfh;
+  CloseDb();
+
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+  cf_descs.emplace_back(test_cf_name, options);
+  std::vector<ColumnFamilyHandle*> handles;
+
+  // 1. Open and populate the DB. Write and flush default_cf several times to
+  // advance wal number so that some column families have advanced log_number
+  // while other don't.
+  {
+    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+    auto* dbimpl = static_cast_with_check<DBImpl>(db_);
+    assert(dbimpl);
+
+    // Write one key to test_cf.
+    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
+    ASSERT_OK(db_->Flush(FlushOptions(), handles[1]));
+
+    // Write to default_cf and flush this cf several times to advance wal
+    // number. TEST_SwitchMemtable makes sure WALs are not synced and test can
+    // corrupt un-sync WAL.
+    for (int i = 0; i < 2; ++i) {
+      ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
+                         "value" + std::to_string(i)));
+      ASSERT_OK(dbimpl->TEST_SwitchMemtable());
+    }
+
+    for (auto* h : handles) {
+      delete h;
+    }
+    handles.clear();
+    CloseDb();
+  }
+
+  // 2. Corrupt second last un-syned wal file to emulate power reset which
+  // caused the DB to lose the un-synced WAL.
+  {
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    size_t size = file_nums.size();
+    assert(size >= 2);
+    uint64_t log_num = file_nums[size - 2];
+    CorruptFileWithTruncation(FileType::kWalFile, log_num,
+                              /*bytes_to_truncate=*/8);
+  }
+
+  // 3. After first crash reopen the DB which contains corrupted WAL. Default
+  // family has higher log number than corrupted wal number.
+  //
+  // Case1: If avoid_flush_during_recovery = true, RocksDB won't flush the data
+  // from WAL to L0 for all column families (test_cf_name in this case). As a
+  // result, not all column families can increase their log_numbers, and
+  // min_log_number_to_keep won't change.
+  //
+  // Case2: If avoid_flush_during_recovery = false, all column families have
+  // flushed their data from WAL to L0 during recovery, and none of them will
+  // ever need to read the WALs again.
+
+  // 4. Fault is injected to fail the recovery.
+  {
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
+          auto* tmp_s = reinterpret_cast<Status*>(arg);
+          assert(tmp_s);
+          *tmp_s = Status::IOError("Injected");
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    handles.clear();
+    options.avoid_flush_during_recovery = true;
+    s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
+    ASSERT_TRUE(s.IsIOError());
+    ASSERT_EQ("IO error: Injected", s.ToString());
+    for (auto* h : handles) {
+      delete h;
+    }
+    CloseDb();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+
+  // 5. After second crash reopen the db with second corruption. Default family
+  // has higher log number than corrupted wal number.
+  //
+  // Case1: If avoid_flush_during_recovery = true, we persist a new
+  // MANIFEST with advanced log_numbers for some column families only after
+  // syncing the WAL. So during second crash, RocksDB will skip the corrupted
+  // WAL files as they have been moved to different folder. Since newly synced
+  // WAL file's sequence number (sentinel WriteBatch) will be the next
+  // sequence number immediately after the largest sequence number recovered
+  // from previous WALs and MANIFEST, db will be in consistent state and opens
+  // successfully.
+  //
+  // Case2: If avoid_flush_during_recovery = false, the corrupted WAL is below
+  // this number. So during a second crash after persisting the new MANIFEST,
+  // RocksDB will skip the corrupted WAL(s) because they are all below this
+  // bound. Therefore, we won't hit the "column family inconsistency" error
+  // message.
+  {
+    options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
+    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+
+    // Verify that data is not lost.
+    {
+      std::string v;
+      ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
+      ASSERT_EQ("dontcare", v);
+
+      v.clear();
+      ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
+      ASSERT_EQ("value" + std::to_string(0), v);
+
+      // Since  it's corrupting second last wal, below key is not found.
+      v.clear();
+      ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
+                Status::NotFound());
+    }
+
+    for (auto* h : handles) {
+      delete h;
+    }
+    handles.clear();
+    CloseDb();
+  }
+}
+
+// In case of TransactionDB, it enables two-phase-commit. The prepare section of
+// an uncommitted transaction always need to be kept. Even if we perform flush
+// during recovery, we may still need to hold an old WAL. The
+// min_log_number_to_keep won't change, and "Column family inconsistency: SST
+// file contains data beyond the point of corruption" error will be hit, causing
+// recovery to fail.
+//
+// After adding the fix, only after new WAL is synced, RocksDB persist a new
+// MANIFEST with column families to ensure RocksDB is in consistent state.
+// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
+// synced immediately afterwards. The sequence number of the sentinel
+// WriteBatch will be the next sequence number immediately after the largest
+// sequence number recovered from previous WALs and MANIFEST because of which DB
+// will be in consistent state.
+// If a future recovery starts from the new MANIFEST, then it means the new WAL
+// is successfully synced. Due to the sentinel empty write batch at the
+// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
+// If future recovery starts from the old MANIFEST, it means the writing the new
+// MANIFEST failed. It won't have the "SST ahead of WAL" error.
+//
+// The combination of corrupting a WAL and injecting an error during subsequent
+// re-open exposes the bug of prematurely persisting a new MANIFEST with
+// advanced ColumnFamilyData::log_number.
+TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
+  CloseDb();
+  Options options;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  options.track_and_verify_wals_in_manifest =
+      track_and_verify_wals_in_manifest_;
+  options.avoid_flush_during_recovery = false;
+  options.env = env_;
+  ASSERT_OK(DestroyDB(dbname_, options));
+  options.create_if_missing = true;
+  options.max_write_buffer_number = 3;
+  Reopen(&options);
+
+  // Create cf test_cf_name.
+  ColumnFamilyHandle* cfh = nullptr;
+  const std::string test_cf_name = "test_cf";
+  Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
+  ASSERT_OK(s);
+  delete cfh;
+  CloseDb();
+
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+  cf_descs.emplace_back(test_cf_name, options);
+  std::vector<ColumnFamilyHandle*> handles;
+
+  TransactionDB* txn_db = nullptr;
+  TransactionDBOptions txn_db_opts;
+
+  // 1. Open and populate the DB. Write and flush default_cf several times to
+  // advance wal number so that some column families have advanced log_number
+  // while other don't.
+  {
+    ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
+                                  &handles, &txn_db));
+
+    auto* txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
+    // Put cf1
+    ASSERT_OK(txn->Put(handles[1], "foo", "value"));
+    ASSERT_OK(txn->SetName("txn0"));
+    ASSERT_OK(txn->Prepare());
+    ASSERT_OK(txn_db->Flush(FlushOptions()));
+
+    delete txn;
+    txn = nullptr;
+
+    auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
+    assert(dbimpl);
+
+    // Put and flush cf0
+    for (int i = 0; i < 2; ++i) {
+      ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
+                            "value" + std::to_string(i)));
+      ASSERT_OK(dbimpl->TEST_SwitchMemtable());
+    }
+
+    // Put cf1
+    txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
+    ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
+    ASSERT_OK(txn->Commit());
+
+    delete txn;
+    txn = nullptr;
+
+    for (auto* h : handles) {
+      delete h;
+    }
+    handles.clear();
+    delete txn_db;
+  }
+
+  // 2. Corrupt second last wal to emulate power reset which caused the DB to
+  // lose the un-synced WAL.
+  {
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    size_t size = file_nums.size();
+    assert(size >= 2);
+    uint64_t log_num = file_nums[size - 2];
+    CorruptFileWithTruncation(FileType::kWalFile, log_num,
+                              /*bytes_to_truncate=*/8);
+  }
+
+  // 3. After first crash reopen the DB which contains corrupted WAL. Default
+  // family has higher log number than corrupted wal number. There may be old
+  // WAL files that it must not delete because they can contain data of
+  // uncommitted transactions. As a result, min_log_number_to_keep won't change.
+
+  {
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::Open::BeforeSyncWAL", [&](void* arg) {
+          auto* tmp_s = reinterpret_cast<Status*>(arg);
+          assert(tmp_s);
+          *tmp_s = Status::IOError("Injected");
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    handles.clear();
+    s = TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, &handles,
+                            &txn_db);
+    ASSERT_TRUE(s.IsIOError());
+    ASSERT_EQ("IO error: Injected", s.ToString());
+    for (auto* h : handles) {
+      delete h;
+    }
+    CloseDb();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+
+  // 4. Corrupt max_wal_num.
+  {
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    size_t size = file_nums.size();
+    uint64_t log_num = file_nums[size - 1];
+    CorruptFileWithTruncation(FileType::kWalFile, log_num);
+  }
+
+  // 5. After second crash reopen the db with second corruption. Default family
+  // has higher log number than corrupted wal number.
+  // We persist a new MANIFEST with advanced log_numbers for some column
+  // families only after syncing the WAL. So during second crash, RocksDB will
+  // skip the corrupted WAL files as they have been moved to different folder.
+  // Since newly synced WAL file's sequence number (sentinel WriteBatch) will be
+  // the next sequence number immediately after the largest sequence number
+  // recovered from previous WALs and MANIFEST, db will be in consistent state
+  // and opens successfully.
+  {
+    ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
+                                  &handles, &txn_db));
+
+    // Verify that data is not lost.
+    {
+      std::string v;
+      // Key not visible since it's not committed.
+      ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
+                Status::NotFound());
+
+      v.clear();
+      ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
+      ASSERT_EQ("value" + std::to_string(0), v);
+
+      // Last WAL is corrupted which contains two keys below.
+      v.clear();
+      ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
+                Status::NotFound());
+      v.clear();
+      ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
+                Status::NotFound());
+    }
+
+    for (auto* h : handles) {
+      delete h;
+    }
+    delete txn_db;
+  }
+}
+
+// This test is similar to
+// CrashDuringRecoveryWithCorruptionTest.CrashDuringRecovery except it calls
+// flush and corrupts Last WAL. It calls flush to sync some of the WALs and
+// remaining are unsyned one of which is then corrupted to simulate crash.
+//
+// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
+// won't flush the data from WAL to L0 for all column families if possible. As a
+// result, not all column families can increase their log_numbers, and
+// min_log_number_to_keep won't change.
+// It may prematurely persist a new MANIFEST even before we can declare the DB
+// is in consistent state after recovery (this is when the new WAL is synced)
+// and advances log_numbers for some column families.
+//
+// If there is power failure before we sync the new WAL, we will end up in
+// a situation in which after persisting the MANIFEST, RocksDB will see some
+// column families' log_numbers larger than the corrupted wal, and
+// "Column family inconsistency: SST file contains data beyond the point of
+// corruption" error will be hit, causing recovery to fail.
+//
+// After adding the fix, only after new WAL is synced, RocksDB persist a new
+// MANIFEST with column families to ensure RocksDB is in consistent state.
+// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
+// synced immediately afterwards. The sequence number of the sentinel
+// WriteBatch will be the next sequence number immediately after the largest
+// sequence number recovered from previous WALs and MANIFEST because of which DB
+// will be in consistent state.
+// If a future recovery starts from the new MANIFEST, then it means the new WAL
+// is successfully synced. Due to the sentinel empty write batch at the
+// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
+// If future recovery starts from the old MANIFEST, it means the writing the new
+// MANIFEST failed. It won't have the "SST ahead of WAL" error.
+
+// The combination of corrupting a WAL and injecting an error during subsequent
+// re-open exposes the bug of prematurely persisting a new MANIFEST with
+// advanced ColumnFamilyData::log_number.
+TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
+  CloseDb();
+  Options options;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  options.avoid_flush_during_recovery = false;
+  options.env = env_;
+  options.create_if_missing = true;
+
+  ASSERT_OK(DestroyDB(dbname_, options));
+  Reopen(&options);
+
+  ColumnFamilyHandle* cfh = nullptr;
+  const std::string test_cf_name = "test_cf";
+  Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
+  ASSERT_OK(s);
+  delete cfh;
+
+  CloseDb();
+
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+  cf_descs.emplace_back(test_cf_name, options);
+  std::vector<ColumnFamilyHandle*> handles;
+
+  {
+    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+
+    // Write one key to test_cf.
+    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
+
+    // Write to default_cf and flush this cf several times to advance wal
+    // number.
+    for (int i = 0; i < 2; ++i) {
+      ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
+                         "value" + std::to_string(i)));
+      ASSERT_OK(db_->Flush(FlushOptions()));
+    }
+
+    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "dontcare", "dontcare"));
+    for (auto* h : handles) {
+      delete h;
+    }
+    handles.clear();
+    CloseDb();
+  }
+
+  // Corrupt second last un-syned wal file to emulate power reset which
+  // caused the DB to lose the un-synced WAL.
+  {
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    size_t size = file_nums.size();
+    uint64_t log_num = file_nums[size - 1];
+    CorruptFileWithTruncation(FileType::kWalFile, log_num,
+                              /*bytes_to_truncate=*/8);
+  }
+
+  // Fault is injected to fail the recovery.
+  {
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
+          auto* tmp_s = reinterpret_cast<Status*>(arg);
+          assert(tmp_s);
+          *tmp_s = Status::IOError("Injected");
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    handles.clear();
+    options.avoid_flush_during_recovery = true;
+    s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
+    ASSERT_TRUE(s.IsIOError());
+    ASSERT_EQ("IO error: Injected", s.ToString());
+    for (auto* h : handles) {
+      delete h;
+    }
+    CloseDb();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+
+  // Reopen db again
+  {
+    options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
+    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+
+    // Verify that data is not lost.
+    {
+      std::string v;
+      ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
+      ASSERT_EQ("dontcare", v);
+
+      for (int i = 0; i < 2; ++i) {
+        v.clear();
+        ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
+        ASSERT_EQ("value" + std::to_string(i), v);
+      }
+
+      // Since it's corrupting last wal after Flush, below key is not found.
+      v.clear();
+      ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
+                Status::NotFound());
+    }
+
+    for (auto* h : handles) {
+      delete h;
+    }
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
+  RegisterCustomObjects(argc, argv);
   return RUN_ALL_TESTS();
 }