git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_impl/db_impl_open.cc
update ceph source to reef 18.1.2
index 9d82a0a1e291f2e3d9993c2e9e58036e1c9133a5..40ffa2e85669da22acc5e41860f83855462ba1c0 100644
 #include "db/builder.h"
 #include "db/db_impl/db_impl.h"
 #include "db/error_handler.h"
-#include "db/periodic_work_scheduler.h"
+#include "db/periodic_task_scheduler.h"
 #include "env/composite_env_wrapper.h"
+#include "file/filename.h"
 #include "file/read_write_util.h"
 #include "file/sst_file_manager_impl.h"
 #include "file/writable_file_writer.h"
+#include "logging/logging.h"
 #include "monitoring/persistent_stats_history.h"
 #include "options/options_helper.h"
 #include "rocksdb/table.h"
 #include "util/rate_limiter.h"
 
 namespace ROCKSDB_NAMESPACE {
-Options SanitizeOptions(const std::string& dbname, const Options& src) {
-  auto db_options = SanitizeOptions(dbname, DBOptions(src));
+Options SanitizeOptions(const std::string& dbname, const Options& src,
+                        bool read_only, Status* logger_creation_s) {
+  auto db_options =
+      SanitizeOptions(dbname, DBOptions(src), read_only, logger_creation_s);
   ImmutableDBOptions immutable_db_options(db_options);
   auto cf_options =
       SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
   return Options(db_options, cf_options);
 }
 
-DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
+DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
+                          bool read_only, Status* logger_creation_s) {
   DBOptions result(src);
 
   if (result.env == nullptr) {
@@ -50,11 +55,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
                              &result.max_open_files);
   }
 
-  if (result.info_log == nullptr) {
+  if (result.info_log == nullptr && !read_only) {
     Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
     if (!s.ok()) {
       // No place suitable for logging
       result.info_log = nullptr;
+      if (logger_creation_s) {
+        *logger_creation_s = s;
+      }
     }
   }
 
@@ -109,16 +117,28 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.recycle_log_file_num = 0;
   }
 
-  if (result.wal_dir.empty()) {
+  if (result.db_paths.size() == 0) {
+    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+  } else if (result.wal_dir.empty()) {
     // Use dbname as default
     result.wal_dir = dbname;
   }
-  if (result.wal_dir.back() == '/') {
-    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
+  if (!result.wal_dir.empty()) {
+    // If there is a wal_dir already set, check to see if the wal_dir is the
+    // same as the dbname AND the same as the db_path[0] (which must exist from
+    // a few lines ago). If the wal_dir matches both of these values, then clear
+    // the wal_dir value, which will make wal_dir == dbname.  Most likely this
+    // condition was the result of reading an old options file where we forced
+    // wal_dir to be set (to dbname).
+    auto npath = NormalizePath(dbname + "/");
+    if (npath == NormalizePath(result.wal_dir + "/") &&
+        npath == NormalizePath(result.db_paths[0].path + "/")) {
+      result.wal_dir.clear();
+    }
   }
 
-  if (result.db_paths.size() == 0) {
-    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
+    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
   }
 
   if (result.use_direct_reads && result.compaction_readahead_size == 0) {
@@ -126,10 +146,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.compaction_readahead_size = 1024 * 1024 * 2;
   }
 
-  if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
-    result.new_table_reader_for_compaction_inputs = true;
-  }
-
   // Force flush on DB open if 2PC is enabled, since with 2PC we have no
   // guarantee that consecutive log files have consecutive sequence id, which
   // make recovery complicated.
@@ -139,7 +155,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
 
 #ifndef ROCKSDB_LITE
   ImmutableDBOptions immutable_db_options(result);
-  if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
+  if (!immutable_db_options.IsWalDirSameAsDBPath()) {
     // Either the WAL dir and db_paths[0]/db_name are not the same, or we
     // cannot tell for sure. In either case, assume they're different and
     // explicitly cleanup the trash log files (bypass DeleteScheduler)
@@ -147,12 +163,17 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     // DeleteScheduler::CleanupDirectory on the same dir later, it will be
     // safe
     std::vector<std::string> filenames;
-    result.env->GetChildren(result.wal_dir, &filenames).PermitUncheckedError();
+    IOOptions io_opts;
+    io_opts.do_not_recurse = true;
+    auto wal_dir = immutable_db_options.GetWalDir();
+    Status s = immutable_db_options.fs->GetChildren(
+        wal_dir, io_opts, &filenames, /*IODebugContext*=*/nullptr);
+    s.PermitUncheckedError();  //**TODO: What to do on error?
     for (std::string& filename : filenames) {
       if (filename.find(".log.trash", filename.length() -
                                           std::string(".log.trash").length()) !=
           std::string::npos) {
-        std::string trash_file = result.wal_dir + "/" + filename;
+        std::string trash_file = wal_dir + "/" + filename;
         result.env->DeleteFile(trash_file).PermitUncheckedError();
       }
     }
@@ -163,7 +184,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
   // was not used)
   auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
   for (size_t i = 0; i < result.db_paths.size(); i++) {
-    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
+    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
+        .PermitUncheckedError();
   }
 
   // Create a default SstFileManager for purposes of tracking compaction size
@@ -173,7 +195,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
         NewSstFileManager(result.env, result.info_log));
     result.sst_file_manager = sst_file_manager;
   }
-#endif
+#endif  // !ROCKSDB_LITE
+
+  // Supported wal compression types
+  if (!StreamingCompressionTypeSupported(result.wal_compression)) {
+    result.wal_compression = kNoCompression;
+    ROCKS_LOG_WARN(result.info_log,
+                   "wal_compression is disabled since only zstd is supported");
+  }
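For reference, the only setting this sanitization leaves enabled is zstd WAL compression; a small sketch of the corresponding option, assuming the build links zstd:

    #include "rocksdb/options.h"

    rocksdb::Options MakeWalCompressedOptions() {
      rocksdb::Options options;
      // Anything other than zstd is reset to kNoCompression by the check
      // above, with a warning in the info log.
      options.wal_compression = rocksdb::kZSTD;
      return options;
    }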
 
   if (!result.paranoid_checks) {
     result.skip_checking_sst_file_sizes_on_db_open = true;
@@ -241,7 +270,8 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
   if (db_options.unordered_write &&
       !db_options.allow_concurrent_memtable_write) {
     return Status::InvalidArgument(
-        "unordered_write is incompatible with !allow_concurrent_memtable_write");
+        "unordered_write is incompatible with "
+        "!allow_concurrent_memtable_write");
   }
 
   if (db_options.unordered_write && db_options.enable_pipelined_write) {
@@ -260,6 +290,12 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
         "atomic_flush is currently incompatible with best-efforts recovery");
   }
 
+  if (db_options.use_direct_io_for_flush_and_compaction &&
+      0 == db_options.writable_file_max_buffer_size) {
+    return Status::InvalidArgument(
+        "writes in direct IO require writable_file_max_buffer_size > 0");
+  }
+
   return Status::OK();
 }
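The new validation above rejects a combination that previously failed later in less obvious ways. A hedged sketch of options DB::Open would now refuse (path and values illustrative):

    #include "rocksdb/db.h"

    rocksdb::Status TryOpenWithDirectIO(const std::string& path) {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.use_direct_io_for_flush_and_compaction = true;
      // Direct-IO writes need a buffered writer, so a zero buffer size is now
      // rejected up front with Status::InvalidArgument.
      options.writable_file_max_buffer_size = 0;
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, path, &db);
      delete db;   // safe; Open leaves db unset on failure
      return s;    // expected: s.IsInvalidArgument() == true
    }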
 
@@ -281,23 +317,29 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
   const std::string manifest = DescriptorFileName(dbname_, 1);
   {
+    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
+      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
+    }
     std::unique_ptr<FSWritableFile> file;
     FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
     s = NewWritableFile(fs_.get(), manifest, &file, file_options);
     if (!s.ok()) {
       return s;
     }
+    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
     file->SetPreallocationBlockSize(
         immutable_db_options_.manifest_preallocation_size);
     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-        std::move(file), manifest, file_options, env_, io_tracer_,
-        nullptr /* stats */, immutable_db_options_.listeners));
+        std::move(file), manifest, file_options, immutable_db_options_.clock,
+        io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
+        nullptr, tmp_set.Contains(FileType::kDescriptorFile),
+        tmp_set.Contains(FileType::kDescriptorFile)));
     log::Writer log(std::move(file_writer), 0, false);
     std::string record;
     new_db.EncodeTo(&record);
     s = log.AddRecord(record);
     if (s.ok()) {
-      s = SyncManifest(env_, &immutable_db_options_, log.file());
+      s = SyncManifest(&immutable_db_options_, log.file());
     }
   }
   if (s.ok()) {
@@ -308,7 +350,7 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
           manifest.substr(manifest.find_last_of("/\\") + 1));
     }
   } else {
-    fs_->DeleteFile(manifest, IOOptions(), nullptr);
+    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
   }
   return s;
 }
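The tmp_set lookups above come from a DB option; a minimal sketch of opting file types into checksum handoff, assuming the FileTypeSet initializer-list constructor available in this RocksDB version:

    #include "rocksdb/options.h"
    #include "rocksdb/types.h"

    rocksdb::Options MakeChecksumHandoffOptions() {
      rocksdb::Options options;
      // NewDB() above enables handoff for the MANIFEST writer when
      // kDescriptorFile is in this set; CreateWAL() does the same for WALs.
      options.checksum_handoff_file_types = rocksdb::FileTypeSet(
          {rocksdb::FileType::kDescriptorFile, rocksdb::FileType::kWalFile});
      return options;
    }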
@@ -365,7 +407,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
 Status DBImpl::Recover(
     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
     bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
-    uint64_t* recovered_seq) {
+    uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
   mutex_.AssertHeld();
 
   bool is_new_db = false;
@@ -394,7 +436,10 @@ Status DBImpl::Recover(
       s = env_->FileExists(current_fname);
     } else {
       s = Status::NotFound();
-      Status io_s = env_->GetChildren(dbname_, &files_in_dbname);
+      IOOptions io_opts;
+      io_opts.do_not_recurse = true;
+      Status io_s = immutable_db_options_.fs->GetChildren(
+          dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
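Directory listings now go through the FileSystem with an IOOptions hint; a standalone sketch of the same call pattern (do_not_recurse is best-effort, and file systems may ignore it):

    #include <string>
    #include <vector>
    #include "rocksdb/file_system.h"

    rocksdb::IOStatus ListDbDir(const std::string& dbname,
                                std::vector<std::string>* out) {
      rocksdb::IOOptions io_opts;
      io_opts.do_not_recurse = true;  // hint only; a flat listing is enough
      return rocksdb::FileSystem::Default()->GetChildren(dbname, io_opts, out,
                                                         /*dbg=*/nullptr);
    }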
       if (!io_s.ok()) {
         s = io_s;
         files_in_dbname.clear();
@@ -403,11 +448,14 @@ Status DBImpl::Recover(
         uint64_t number = 0;
         FileType type = kWalFile;  // initialize
         if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
-          // Found MANIFEST (descriptor log), thus best-efforts recovery does
-          // not have to treat the db as empty.
-          s = Status::OK();
-          manifest_path = dbname_ + "/" + file;
-          break;
+          uint64_t bytes;
+          s = env_->GetFileSize(DescriptorFileName(dbname_, number), &bytes);
+          if (s.ok() && bytes != 0) {
+            // Found non-empty MANIFEST (descriptor log), thus best-efforts
+            // recovery does not have to treat the db as empty.
+            manifest_path = dbname_ + "/" + file;
+            break;
+          }
         }
       }
     }
@@ -457,7 +505,10 @@ Status DBImpl::Recover(
     }
   } else if (immutable_db_options_.best_efforts_recovery) {
     assert(files_in_dbname.empty());
-    Status s = env_->GetChildren(dbname_, &files_in_dbname);
+    IOOptions io_opts;
+    io_opts.do_not_recurse = true;
+    Status s = immutable_db_options_.fs->GetChildren(
+        dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
     if (s.IsNotFound()) {
       return Status::InvalidArgument(dbname_,
                                      "does not exist (open for read only)");
@@ -484,15 +535,17 @@ Status DBImpl::Recover(
   if (!s.ok()) {
     return s;
   }
-  s = SetDBId();
+  s = SetupDBId(read_only, recovery_ctx);
+  ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
   if (s.ok() && !read_only) {
-    s = DeleteUnreferencedSstFiles();
+    s = DeleteUnreferencedSstFiles(recovery_ctx);
   }
 
   if (immutable_db_options_.paranoid_checks && s.ok()) {
     s = CheckConsistency();
   }
   if (s.ok() && !read_only) {
+    // TODO: share file descriptors (FSDirectory) with SetDirectories above
     std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
     for (auto cfd : *versions_->GetColumnFamilySet()) {
       s = cfd->AddDirectories(&created_dirs);
@@ -501,10 +554,6 @@ Status DBImpl::Recover(
       }
     }
   }
-  // DB mutex is already held
-  if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
-    s = InitPersistStatsColumnFamily();
-  }
 
   std::vector<std::string> files_in_wal_dir;
   if (s.ok()) {
@@ -521,10 +570,6 @@ Status DBImpl::Recover(
     default_cf_handle_ = new ColumnFamilyHandleImpl(
         versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
     default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
-    // TODO(Zhongyi): handle single_column_family_mode_ when
-    // persistent_stats is enabled
-    single_column_family_mode_ =
-        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
 
     // Recover from all newer log files than the ones named in the
     // descriptor (new log files may have been added by the previous
@@ -533,12 +578,15 @@ Status DBImpl::Recover(
     // Note that prev_log_number() is no longer used, but we pay
     // attention to it in case we are recovering a database
     // produced by an older version of rocksdb.
+    auto wal_dir = immutable_db_options_.GetWalDir();
     if (!immutable_db_options_.best_efforts_recovery) {
-      s = env_->GetChildren(immutable_db_options_.wal_dir, &files_in_wal_dir);
+      IOOptions io_opts;
+      io_opts.do_not_recurse = true;
+      s = immutable_db_options_.fs->GetChildren(
+          wal_dir, io_opts, &files_in_wal_dir, /*IODebugContext*=*/nullptr);
     }
     if (s.IsNotFound()) {
-      return Status::InvalidArgument("wal_dir not found",
-                                     immutable_db_options_.wal_dir);
+      return Status::InvalidArgument("wal_dir not found", wal_dir);
     } else if (!s.ok()) {
       return s;
     }
@@ -554,8 +602,7 @@ Status DBImpl::Recover(
               "existing log file: ",
               file);
         } else {
-          wal_files[number] =
-              LogFileName(immutable_db_options_.wal_dir, number);
+          wal_files[number] = LogFileName(wal_dir, number);
         }
       }
     }
@@ -575,7 +622,10 @@ Status DBImpl::Recover(
       WalNumber max_wal_number =
           versions_->GetWalSet().GetWals().rbegin()->first;
       edit.DeleteWalsBefore(max_wal_number + 1);
-      s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
+      assert(recovery_ctx != nullptr);
+      assert(versions_->GetColumnFamilySet() != nullptr);
+      recovery_ctx->UpdateVersionEdits(
+          versions_->GetColumnFamilySet()->GetDefault(), edit);
     }
     if (!s.ok()) {
       return s;
@@ -611,8 +661,8 @@ Status DBImpl::Recover(
       std::sort(wals.begin(), wals.end());
 
       bool corrupted_wal_found = false;
-      s = RecoverLogFiles(wals, &next_sequence, read_only,
-                          &corrupted_wal_found);
+      s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
+                          recovery_ctx);
       if (corrupted_wal_found && recovered_seq != nullptr) {
         *recovered_seq = next_sequence;
       }
@@ -635,13 +685,16 @@ Status DBImpl::Recover(
     if (s.ok()) {
       const std::string normalized_dbname = NormalizePath(dbname_);
       const std::string normalized_wal_dir =
-          NormalizePath(immutable_db_options_.wal_dir);
+          NormalizePath(immutable_db_options_.GetWalDir());
       if (immutable_db_options_.best_efforts_recovery) {
         filenames = std::move(files_in_dbname);
       } else if (normalized_dbname == normalized_wal_dir) {
         filenames = std::move(files_in_wal_dir);
       } else {
-        s = env_->GetChildren(GetName(), &filenames);
+        IOOptions io_opts;
+        io_opts.do_not_recurse = true;
+        s = immutable_db_options_.fs->GetChildren(
+            GetName(), io_opts, &filenames, /*IODebugContext*=*/nullptr);
       }
     }
     if (s.ok()) {
@@ -654,6 +707,12 @@ Status DBImpl::Recover(
         }
       }
       versions_->options_file_number_ = options_file_number;
+      uint64_t options_file_size = 0;
+      if (options_file_number > 0) {
+        s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
+                              &options_file_size);
+      }
+      versions_->options_file_size_ = options_file_size;
     }
   }
   return s;
@@ -721,11 +780,11 @@ Status DBImpl::PersistentStatsProcessFormatVersion() {
     WriteBatch batch;
     if (s.ok()) {
       s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
-                    ToString(kStatsCFCurrentFormatVersion));
+                    std::to_string(kStatsCFCurrentFormatVersion));
     }
     if (s.ok()) {
       s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
-                    ToString(kStatsCFCompatibleFormatVersion));
+                    std::to_string(kStatsCFCompatibleFormatVersion));
     }
     if (s.ok()) {
       WriteOptions wo;
@@ -766,10 +825,155 @@ Status DBImpl::InitPersistStatsColumnFamily() {
   return s;
 }
 
+Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
+  mutex_.AssertHeld();
+  assert(versions_->descriptor_log_ == nullptr);
+  Status s = versions_->LogAndApply(
+      recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_,
+      recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
+  if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
+    mutex_.Unlock();
+    for (const auto& fname : recovery_ctx.files_to_delete_) {
+      s = env_->DeleteFile(fname);
+      if (!s.ok()) {
+        break;
+      }
+    }
+    mutex_.Lock();
+  }
+  return s;
+}
+
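A sketch of the RecoveryContext this function consumes. The struct itself lives in db_impl.h and is not part of this diff, so the member list below is inferred from the usage here and should be read as an assumption:

    // Inferred shape (assumption): accumulates version edits produced while
    // recovering, so they are written to the MANIFEST in one LogAndApply call
    // only after recovery as a whole has succeeded.
    struct RecoveryContext {
      autovector<ColumnFamilyData*> cfds_;
      autovector<const MutableCFOptions*> cf_opts_;
      autovector<autovector<VersionEdit*>> edit_lists_;
      // Obsolete files found during recovery, deleted after LogAndApply.
      std::vector<std::string> files_to_delete_;  // container type is a guess

      void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit);
    };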
+void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.wal_filter == nullptr) {
+    return;
+  }
+  assert(immutable_db_options_.wal_filter != nullptr);
+  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
+
+  std::map<std::string, uint32_t> cf_name_id_map;
+  std::map<uint32_t, uint64_t> cf_lognumber_map;
+  assert(versions_);
+  assert(versions_->GetColumnFamilySet());
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    assert(cfd);
+    cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
+    cf_lognumber_map.insert(std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
+  }
+
+  wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
+#endif  // !ROCKSDB_LITE
+}
+
+bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
+                                                const std::string& wal_fname,
+                                                log::Reader::Reporter& reporter,
+                                                Status& status,
+                                                bool& stop_replay,
+                                                WriteBatch& batch) {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.wal_filter == nullptr) {
+    return true;
+  }
+  assert(immutable_db_options_.wal_filter != nullptr);
+  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
+
+  WriteBatch new_batch;
+  bool batch_changed = false;
+
+  bool process_current_record = true;
+
+  WalFilter::WalProcessingOption wal_processing_option =
+      wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
+                                &batch_changed);
+
+  switch (wal_processing_option) {
+    case WalFilter::WalProcessingOption::kContinueProcessing:
+      // do nothing, proceed normally
+      break;
+    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
+      // skip current record
+      process_current_record = false;
+      break;
+    case WalFilter::WalProcessingOption::kStopReplay:
+      // skip current record and stop replay
+      process_current_record = false;
+      stop_replay = true;
+      break;
+    case WalFilter::WalProcessingOption::kCorruptedRecord: {
+      status = Status::Corruption("Corruption reported by Wal Filter ",
+                                  wal_filter.Name());
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        process_current_record = false;
+        reporter.Corruption(batch.GetDataSize(), status);
+      }
+      break;
+    }
+    default: {
+      // Logical error that should not happen. If RocksDB used exceptions,
+      // this would just be `throw std::logic_error`.
+      assert(false);
+      status = Status::NotSupported(
+          "Unknown WalProcessingOption returned by Wal Filter ",
+          wal_filter.Name());
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        // Ignore the error with current record processing.
+        stop_replay = true;
+      }
+      break;
+    }
+  }
+
+  if (!process_current_record) {
+    return false;
+  }
+
+  if (batch_changed) {
+    // Make sure that the count in the new batch is
+    // within the original count.
+    int new_count = WriteBatchInternal::Count(&new_batch);
+    int original_count = WriteBatchInternal::Count(&batch);
+    if (new_count > original_count) {
+      ROCKS_LOG_FATAL(
+          immutable_db_options_.info_log,
+          "Recovering log #%" PRIu64
+          " mode %d log filter %s returned "
+          "more records (%d) than original (%d) which is not allowed. "
+          "Aborting recovery.",
+          wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
+          wal_filter.Name(), new_count, original_count);
+      status = Status::NotSupported(
+          "More than original # of records "
+          "returned by Wal Filter ",
+          wal_filter.Name());
+      return false;
+    }
+    // Set the same sequence number in the new_batch
+    // as the original batch.
+    WriteBatchInternal::SetSequence(&new_batch,
+                                    WriteBatchInternal::Sequence(&batch));
+    batch = new_batch;
+  }
+  return true;
+#else   // !ROCKSDB_LITE
+  (void)wal_number;
+  (void)wal_fname;
+  (void)reporter;
+  (void)status;
+  (void)stop_replay;
+  (void)batch;
+  return true;
+#endif  // ROCKSDB_LITE
+}
+
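For context, the user-side counterpart of the helper above is DBOptions::wal_filter. A minimal sketch of a filter that stops replay after a fixed number of records; the class name and cut-off policy are purely illustrative:

    #include <cstddef>
    #include <string>
    #include "rocksdb/wal_filter.h"

    // Hypothetical example filter: replay the first N records, then stop.
    class StopAfterNRecords : public rocksdb::WalFilter {
     public:
      explicit StopAfterNRecords(size_t n) : remaining_(n) {}

      const char* Name() const override { return "StopAfterNRecords"; }

      WalProcessingOption LogRecordFound(unsigned long long /*log_number*/,
                                         const std::string& /*log_file_name*/,
                                         const rocksdb::WriteBatch& /*batch*/,
                                         rocksdb::WriteBatch* /*new_batch*/,
                                         bool* batch_changed) override {
        *batch_changed = false;
        if (remaining_ == 0) {
          return WalProcessingOption::kStopReplay;  // sets stop_replay above
        }
        --remaining_;
        return WalProcessingOption::kContinueProcessing;
      }

     private:
      size_t remaining_;
    };
    // Installed via DBOptions::wal_filter (raw pointer owned by the caller)
    // before DB::Open; only available in non-LITE builds.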
 // REQUIRES: wal_numbers are sorted in ascending order
 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
                                SequenceNumber* next_sequence, bool read_only,
-                               bool* corrupted_wal_found) {
+                               bool* corrupted_wal_found,
+                               RecoveryContext* recovery_ctx) {
   struct LogReporter : public log::Reader::Reporter {
     Env* env;
     Logger* info_log;
@@ -807,26 +1011,19 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     stream.EndArray();
   }
 
-#ifndef ROCKSDB_LITE
-  if (immutable_db_options_.wal_filter != nullptr) {
-    std::map<std::string, uint32_t> cf_name_id_map;
-    std::map<uint32_t, uint64_t> cf_lognumber_map;
-    for (auto cfd : *versions_->GetColumnFamilySet()) {
-      cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
-      cf_lognumber_map.insert(
-          std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
-    }
-
-    immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
-                                                               cf_name_id_map);
-  }
-#endif
+  // No-op for immutable_db_options_.wal_filter == nullptr.
+  InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
 
   bool stop_replay_by_wal_filter = false;
   bool stop_replay_for_corruption = false;
   bool flushed = false;
   uint64_t corrupted_wal_number = kMaxSequenceNumber;
   uint64_t min_wal_number = MinLogNumberToKeep();
+  if (!allow_2pc()) {
+    // In non-2pc mode, we skip WALs that do not back unflushed data.
+    min_wal_number =
+        std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
+  }
   for (auto wal_number : wal_numbers) {
     if (wal_number < min_wal_number) {
       ROCKS_LOG_INFO(immutable_db_options_.info_log,
@@ -840,7 +1037,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     // update the file number allocation counter in VersionSet.
     versions_->MarkFileNumberUsed(wal_number);
     // Open the log file
-    std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
+    std::string fname =
+        LogFileName(immutable_db_options_.GetWalDir(), wal_number);
 
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
                    "Recovering log #%" PRIu64 " mode %d", wal_number,
@@ -861,9 +1059,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     std::unique_ptr<SequentialFileReader> file_reader;
     {
       std::unique_ptr<FSSequentialFile> file;
-      status = fs_->NewSequentialFile(fname,
-                                      fs_->OptimizeForLogRead(file_options_),
-                                      &file, nullptr);
+      status = fs_->NewSequentialFile(
+          fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
       if (!status.ok()) {
         MaybeIgnoreError(&status);
         if (!status.ok()) {
@@ -902,13 +1099,14 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     // Read all the records and add to a memtable
     std::string scratch;
     Slice record;
-    WriteBatch batch;
 
     TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
                              /*arg=*/nullptr);
+    uint64_t record_checksum;
     while (!stop_replay_by_wal_filter &&
            reader.ReadRecord(&record, &scratch,
-                             immutable_db_options_.wal_recovery_mode) &&
+                             immutable_db_options_.wal_recovery_mode,
+                             &record_checksum) &&
            status.ok()) {
       if (record.size() < WriteBatchInternal::kHeader) {
         reporter.Corruption(record.size(),
@@ -916,10 +1114,25 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
         continue;
       }
 
+      // We create a new batch and initialize with a valid prot_info_ to store
+      // the data checksums
+      WriteBatch batch;
+
       status = WriteBatchInternal::SetContents(&batch, record);
       if (!status.ok()) {
         return status;
       }
+      TEST_SYNC_POINT_CALLBACK(
+          "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch);
+      TEST_SYNC_POINT_CALLBACK(
+          "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
+          &record_checksum);
+      status = WriteBatchInternal::UpdateProtectionInfo(
+          &batch, 8 /* bytes_per_key */, &record_checksum);
+      if (!status.ok()) {
+        return status;
+      }
+
       SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
 
       if (immutable_db_options_.wal_recovery_mode ==
@@ -937,83 +1150,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
         }
       }
 
-#ifndef ROCKSDB_LITE
-      if (immutable_db_options_.wal_filter != nullptr) {
-        WriteBatch new_batch;
-        bool batch_changed = false;
-
-        WalFilter::WalProcessingOption wal_processing_option =
-            immutable_db_options_.wal_filter->LogRecordFound(
-                wal_number, fname, batch, &new_batch, &batch_changed);
-
-        switch (wal_processing_option) {
-          case WalFilter::WalProcessingOption::kContinueProcessing:
-            // do nothing, proceeed normally
-            break;
-          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
-            // skip current record
-            continue;
-          case WalFilter::WalProcessingOption::kStopReplay:
-            // skip current record and stop replay
-            stop_replay_by_wal_filter = true;
-            continue;
-          case WalFilter::WalProcessingOption::kCorruptedRecord: {
-            status =
-                Status::Corruption("Corruption reported by Wal Filter ",
-                                   immutable_db_options_.wal_filter->Name());
-            MaybeIgnoreError(&status);
-            if (!status.ok()) {
-              reporter.Corruption(record.size(), status);
-              continue;
-            }
-            break;
-          }
-          default: {
-            assert(false);  // unhandled case
-            status = Status::NotSupported(
-                "Unknown WalProcessingOption returned"
-                " by Wal Filter ",
-                immutable_db_options_.wal_filter->Name());
-            MaybeIgnoreError(&status);
-            if (!status.ok()) {
-              return status;
-            } else {
-              // Ignore the error with current record processing.
-              continue;
-            }
-          }
-        }
-
-        if (batch_changed) {
-          // Make sure that the count in the new batch is
-          // within the orignal count.
-          int new_count = WriteBatchInternal::Count(&new_batch);
-          int original_count = WriteBatchInternal::Count(&batch);
-          if (new_count > original_count) {
-            ROCKS_LOG_FATAL(
-                immutable_db_options_.info_log,
-                "Recovering log #%" PRIu64
-                " mode %d log filter %s returned "
-                "more records (%d) than original (%d) which is not allowed. "
-                "Aborting recovery.",
-                wal_number,
-                static_cast<int>(immutable_db_options_.wal_recovery_mode),
-                immutable_db_options_.wal_filter->Name(), new_count,
-                original_count);
-            status = Status::NotSupported(
-                "More than original # of records "
-                "returned by Wal Filter ",
-                immutable_db_options_.wal_filter->Name());
-            return status;
-          }
-          // Set the same sequence number in the new_batch
-          // as the original batch.
-          WriteBatchInternal::SetSequence(&new_batch,
-                                          WriteBatchInternal::Sequence(&batch));
-          batch = new_batch;
-        }
+      // For the default case of wal_filter == nullptr, this is a no-op and
+      // always returns true.
+      if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
+                                              status, stop_replay_by_wal_filter,
+                                              batch)) {
+        continue;
       }
-#endif  // ROCKSDB_LITE
 
       // If column family was not found, it might mean that the WAL write
       // batch references to the column family that was dropped after the
@@ -1128,11 +1271,29 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
        immutable_db_options_.wal_recovery_mode ==
            WALRecoveryMode::kTolerateCorruptedTailRecords)) {
     for (auto cfd : *versions_->GetColumnFamilySet()) {
-      if (cfd->GetLogNumber() > corrupted_wal_number) {
+      // One special case that causes cfd->GetLogNumber() > corrupted_wal_number
+      // while the CF is still consistent: if a new column family is created
+      // during the flush and the WAL sync fails at the same time, the new CF
+      // points to the new WAL but the old WAL is corrupted. Since the new CF is
+      // empty, it is still consistent. We add the check of CF SST file size to
+      // avoid this false positive alert.
+
+      // Note that the check of (cfd->GetLiveSstFilesSize() > 0) may lead to
+      // ignoring a very rare inconsistency caused by data cancellation: one CF
+      // is empty due to KV deletion, but those operations are in the WAL. If
+      // the WAL is corrupted, the status of this CF might not be consistent
+      // with others. However, the consistency check will be bypassed due to the
+      // empty CF.
+      // TODO: a better and complete implementation is needed to ensure a strict
+      // consistency check in WAL recovery, including handling the tailing
+      // issues.
+      if (cfd->GetLogNumber() > corrupted_wal_number &&
+          cfd->GetLiveSstFilesSize() > 0) {
         ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                         "Column family inconsistency: SST file contains data"
                         " beyond the point of corruption.");
-        return Status::Corruption("SST file is ahead of WALs");
+        return Status::Corruption("SST file is ahead of WALs in CF " +
+                                  cfd->GetName());
       }
     }
   }
@@ -1199,34 +1360,42 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
       // VersionSet::next_file_number_ always to be strictly greater than any
       // log number
       versions_->MarkFileNumberUsed(max_wal_number + 1);
+      assert(recovery_ctx != nullptr);
 
-      autovector<ColumnFamilyData*> cfds;
-      autovector<const MutableCFOptions*> cf_opts;
-      autovector<autovector<VersionEdit*>> edit_lists;
       for (auto* cfd : *versions_->GetColumnFamilySet()) {
-        cfds.push_back(cfd);
-        cf_opts.push_back(cfd->GetLatestMutableCFOptions());
         auto iter = version_edits.find(cfd->GetID());
         assert(iter != version_edits.end());
-        edit_lists.push_back({&iter->second});
+        recovery_ctx->UpdateVersionEdits(cfd, iter->second);
       }
 
-      std::unique_ptr<VersionEdit> wal_deletion;
-      if (immutable_db_options_.track_and_verify_wals_in_manifest) {
-        wal_deletion.reset(new VersionEdit);
-        wal_deletion->DeleteWalsBefore(max_wal_number + 1);
-        edit_lists.back().push_back(wal_deletion.get());
+      if (flushed) {
+        VersionEdit wal_deletion;
+        if (immutable_db_options_.track_and_verify_wals_in_manifest) {
+          wal_deletion.DeleteWalsBefore(max_wal_number + 1);
+        }
+        if (!allow_2pc()) {
+          // In non-2pc mode, flushing the memtables of the column families
+          // means we can advance min_log_number_to_keep.
+          wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
+        }
+        assert(versions_->GetColumnFamilySet() != nullptr);
+        recovery_ctx->UpdateVersionEdits(
+            versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
       }
-
-      // write MANIFEST with update
-      status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
-                                      directories_.GetDbDir(),
-                                      /*new_descriptor_log=*/true);
     }
   }
 
-  if (status.ok() && data_seen && !flushed) {
-    status = RestoreAliveLogFiles(wal_numbers);
+  if (status.ok()) {
+    if (data_seen && !flushed) {
+      status = RestoreAliveLogFiles(wal_numbers);
+    } else if (!wal_numbers.empty()) {  // If there's no data in the WAL, or we
+                                        // flushed all the data, still
+      // truncate the log file. If the process goes into a crash loop before
+      // the file is deleted, the preallocated space will never get freed.
+      const bool truncate = !read_only;
+      GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
+          .PermitUncheckedError();
+    }
   }
 
   event_logger_.Log() << "job" << job_id << "event"
@@ -1235,6 +1404,42 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
   return status;
 }
 
+Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
+                                          LogFileNumberSize* log_ptr) {
+  LogFileNumberSize log(wal_number);
+  std::string fname =
+      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
+  Status s;
+  // This gets the apparent size of the WALs, not including preallocated space.
+  s = env_->GetFileSize(fname, &log.size);
+  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLogSizeAndMaybeTruncate:0", /*arg=*/&s);
+  if (s.ok() && truncate) {
+    std::unique_ptr<FSWritableFile> last_log;
+    Status truncate_status = fs_->ReopenWritableFile(
+        fname,
+        fs_->OptimizeForLogWrite(
+            file_options_,
+            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
+        &last_log, nullptr);
+    if (truncate_status.ok()) {
+      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
+    }
+    if (truncate_status.ok()) {
+      truncate_status = last_log->Close(IOOptions(), nullptr);
+    }
+    // Not a critical error if fail to truncate.
+    if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
+      ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
+                     truncate_status.ToString().c_str());
+    }
+  }
+  if (log_ptr) {
+    *log_ptr = log;
+  }
+  return s;
+}
+
 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
   if (wal_numbers.empty()) {
     return Status::OK();
@@ -1242,50 +1447,29 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
   Status s;
   mutex_.AssertHeld();
   assert(immutable_db_options_.avoid_flush_during_recovery);
-  if (two_write_queues_) {
-    log_write_mutex_.Lock();
-  }
   // Mark these as alive so they'll be considered for deletion later by
   // FindObsoleteFiles()
   total_log_size_ = 0;
   log_empty_ = false;
+  uint64_t min_wal_with_unflushed_data =
+      versions_->MinLogNumberWithUnflushedData();
   for (auto wal_number : wal_numbers) {
-    LogFileNumberSize log(wal_number);
-    std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
-    // This gets the appear size of the wals, not including preallocated space.
-    s = env_->GetFileSize(fname, &log.size);
-    if (!s.ok()) {
-      break;
+    if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
+      // In non-2pc mode, the WAL files not backing unflushed data are not
+      // alive, thus should not be added to the alive_log_files_.
+      continue;
     }
-    total_log_size_ += log.size;
-    alive_log_files_.push_back(log);
     // We preallocate space for WALs, but after a crash and restart that
     // preallocated space is not needed anymore. It is likely only the last
     // log has such preallocated space, so we only truncate for the last log.
-    if (wal_number == wal_numbers.back()) {
-      std::unique_ptr<FSWritableFile> last_log;
-      Status truncate_status = fs_->ReopenWritableFile(
-          fname,
-          fs_->OptimizeForLogWrite(
-              file_options_,
-              BuildDBOptions(immutable_db_options_, mutable_db_options_)),
-          &last_log, nullptr);
-      if (truncate_status.ok()) {
-        truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
-      }
-      if (truncate_status.ok()) {
-        truncate_status = last_log->Close(IOOptions(), nullptr);
-      }
-      // Not a critical error if fail to truncate.
-      if (!truncate_status.ok()) {
-        ROCKS_LOG_WARN(immutable_db_options_.info_log,
-                       "Failed to truncate log #%" PRIu64 ": %s", wal_number,
-                       truncate_status.ToString().c_str());
-      }
+    LogFileNumberSize log;
+    s = GetLogSizeAndMaybeTruncate(
+        wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
+    if (!s.ok()) {
+      break;
     }
-  }
-  if (two_write_queues_) {
-    log_write_mutex_.Unlock();
+    total_log_size_ += log.size;
+    alive_log_files_.push_back(log);
   }
   return s;
 }
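RestoreAliveLogFiles only runs when recovery kept data in the WALs instead of flushing it, as the assert above requires. A one-option sketch of the setting involved:

    #include "rocksdb/options.h"

    rocksdb::Options MakeNoFlushRecoveryOptions() {
      rocksdb::Options options;
      // Keep recovered data in memtables backed by the original WALs instead
      // of flushing to L0 during DB::Open; those WALs are then re-registered
      // as alive by RestoreAliveLogFiles above.
      options.avoid_flush_during_recovery = true;
      return options;
    }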
@@ -1293,7 +1477,13 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                            MemTable* mem, VersionEdit* edit) {
   mutex_.AssertHeld();
-  const uint64_t start_micros = env_->NowMicros();
+  assert(cfd);
+  assert(cfd->imm());
+  // The immutable memtable list must be empty.
+  assert(std::numeric_limits<uint64_t>::max() ==
+         cfd->imm()->GetEarliestMemTableID());
+
+  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
 
   FileMetaData meta;
   std::vector<BlobFileAddition> blob_file_additions;
@@ -1321,7 +1511,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
         cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
 
     int64_t _current_time = 0;
-    env_->GetCurrentTime(&_current_time)
+    immutable_db_options_.clock->GetCurrentTime(&_current_time)
         .PermitUncheckedError();  // ignore error
     const uint64_t current_time = static_cast<uint64_t>(_current_time);
     meta.oldest_ancester_time = current_time;
@@ -1340,27 +1530,36 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
       std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
           range_del_iters;
       auto range_del_iter =
-          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+          // This is called during recovery, where a live memtable is flushed
+          // directly. In this case, no fragmented tombstone list is cached in
+          // this memtable yet.
+          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
+                                         false /* immutable_memtable */);
       if (range_del_iter != nullptr) {
         range_del_iters.emplace_back(range_del_iter);
       }
 
       IOStatus io_s;
-      s = BuildTable(
-          dbname_, versions_.get(), immutable_db_options_, *cfd->ioptions(),
-          mutable_cf_options, file_options_for_compaction_, cfd->table_cache(),
-          iter.get(), std::move(range_del_iters), &meta, &blob_file_additions,
-          cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
-          cfd->GetID(), cfd->GetName(), snapshot_seqs,
-          earliest_write_conflict_snapshot, snapshot_checker,
+      TableBuilderOptions tboptions(
+          *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
+          cfd->int_tbl_prop_collector_factories(),
           GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
-          mutable_cf_options.sample_for_compression,
-          mutable_cf_options.compression_opts, paranoid_file_checks,
-          cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
-          io_tracer_, &event_logger_, job_id, Env::IO_HIGH,
-          nullptr /* table_properties */, -1 /* level */, current_time,
-          0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
-          db_id_, db_session_id_);
+          mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
+          0 /* level */, false /* is_bottommost */,
+          TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
+          0 /* file_creation_time */, db_id_, db_session_id_,
+          0 /* target_file_size */, meta.fd.GetNumber());
+      SeqnoToTimeMapping empty_seqno_time_mapping;
+      s = BuildTable(
+          dbname_, versions_.get(), immutable_db_options_, tboptions,
+          file_options_for_compaction_, cfd->table_cache(), iter.get(),
+          std::move(range_del_iters), &meta, &blob_file_additions,
+          snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
+          snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
+          io_tracer_, BlobFileCreationReason::kRecovery,
+          empty_seqno_time_mapping, &event_logger_, job_id, Env::IO_HIGH,
+          nullptr /* table_properties */, write_hint,
+          nullptr /*full_history_ts_low*/, &blob_callback_);
       LogFlush(immutable_db_options_.info_log);
       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                       "[%s] [WriteLevel0TableForRecovery]"
@@ -1368,6 +1567,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                       cfd->GetName().c_str(), meta.fd.GetNumber(),
                       meta.fd.GetFileSize(), s.ToString().c_str());
       mutex_.Lock();
+
+      // TODO(AR) is this ok?
+      if (!io_s.ok() && s.ok()) {
+        s = io_s;
+      }
     }
   }
   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@@ -1382,15 +1586,18 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
     edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
                   meta.fd.smallest_seqno, meta.fd.largest_seqno,
-                  meta.marked_for_compaction, meta.oldest_blob_file_number,
-                  meta.oldest_ancester_time, meta.file_creation_time,
-                  meta.file_checksum, meta.file_checksum_func_name);
+                  meta.marked_for_compaction, meta.temperature,
+                  meta.oldest_blob_file_number, meta.oldest_ancester_time,
+                  meta.file_creation_time, meta.file_checksum,
+                  meta.file_checksum_func_name, meta.unique_id);
 
-    edit->SetBlobFileAdditions(std::move(blob_file_additions));
+    for (const auto& blob : blob_file_additions) {
+      edit->AddBlobFile(blob);
+    }
   }
 
   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
-  stats.micros = env_->NowMicros() - start_micros;
+  stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
 
   if (has_output) {
     stats.bytes_written = meta.fd.GetFileSize();
@@ -1399,14 +1606,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
 
   const auto& blobs = edit->GetBlobFileAdditions();
   for (const auto& blob : blobs) {
-    stats.bytes_written += blob.GetTotalBlobBytes();
+    stats.bytes_written_blob += blob.GetTotalBlobBytes();
   }
 
-  stats.num_output_files += static_cast<int>(blobs.size());
+  stats.num_output_files_blob = static_cast<int>(blobs.size());
 
   cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
-  cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
-                                    stats.bytes_written);
+  cfd->internal_stats()->AddCFStats(
+      InternalStats::BYTES_FLUSHED,
+      stats.bytes_written + stats.bytes_written_blob);
   RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
   return s;
 }
@@ -1448,6 +1656,72 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                       !kSeqPerBatch, kBatchPerTxn);
 }
 
+// TODO: Implement the trimming in flush code path.
+// TODO: Perform trimming before inserting into memtable during recovery.
+// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta
+// info, and handle only these files to reduce io.
+Status DB::OpenAndTrimHistory(
+    const DBOptions& db_options, const std::string& dbname,
+    const std::vector<ColumnFamilyDescriptor>& column_families,
+    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+    std::string trim_ts) {
+  assert(dbptr != nullptr);
+  assert(handles != nullptr);
+  auto validate_options = [&db_options] {
+    if (db_options.avoid_flush_during_recovery) {
+      return Status::InvalidArgument(
+          "avoid_flush_during_recovery incompatible with "
+          "OpenAndTrimHistory");
+    }
+    return Status::OK();
+  };
+  auto s = validate_options();
+  if (!s.ok()) {
+    return s;
+  }
+
+  DB* db = nullptr;
+  s = DB::Open(db_options, dbname, column_families, handles, &db);
+  if (!s.ok()) {
+    return s;
+  }
+  assert(db);
+  CompactRangeOptions options;
+  options.bottommost_level_compaction =
+      BottommostLevelCompaction::kForceOptimized;
+  auto db_impl = static_cast_with_check<DBImpl>(db);
+  for (auto handle : *handles) {
+    assert(handle != nullptr);
+    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
+    auto cfd = cfh->cfd();
+    assert(cfd != nullptr);
+    // Only compact column families with timestamp enabled
+    if (cfd->user_comparator() != nullptr &&
+        cfd->user_comparator()->timestamp_size() > 0) {
+      s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
+                                        trim_ts);
+      if (!s.ok()) {
+        break;
+      }
+    }
+  }
+  auto clean_op = [&handles, &db] {
+    for (auto handle : *handles) {
+      auto temp_s = db->DestroyColumnFamilyHandle(handle);
+      assert(temp_s.ok());
+    }
+    handles->clear();
+    delete db;
+  };
+  if (!s.ok()) {
+    clean_op();
+    return s;
+  }
+
+  *dbptr = db;
+  return s;
+}
+
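A usage sketch for the new DB::OpenAndTrimHistory entry point. It assumes the column families use a user-defined-timestamp comparator (the built-in 64-bit one here) and that trim_ts is encoded in that comparator's timestamp format; both details are assumptions not shown in this diff:

    #include <string>
    #include <vector>
    #include "rocksdb/comparator.h"
    #include "rocksdb/db.h"

    rocksdb::Status OpenTrimmed(
        const std::string& dbname, std::string trim_ts, rocksdb::DB** db,
        std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
      rocksdb::DBOptions db_options;
      // Must stay false, or OpenAndTrimHistory returns InvalidArgument (see
      // validate_options above).
      db_options.avoid_flush_during_recovery = false;
      rocksdb::ColumnFamilyOptions cf_options;
      cf_options.comparator = rocksdb::BytewiseComparatorWithU64Ts();
      std::vector<rocksdb::ColumnFamilyDescriptor> cfs{
          {rocksdb::kDefaultColumnFamilyName, cf_options}};
      return rocksdb::DB::OpenAndTrimHistory(db_options, dbname, cfs, handles,
                                             db, std::move(trim_ts));
    }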
 IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
                            size_t preallocate_block_size,
                            log::Writer** new_log) {
@@ -1458,15 +1732,14 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
       BuildDBOptions(immutable_db_options_, mutable_db_options_);
   FileOptions opt_file_options =
       fs_->OptimizeForLogWrite(file_options_, db_options);
-  std::string log_fname =
-      LogFileName(immutable_db_options_.wal_dir, log_file_num);
+  std::string wal_dir = immutable_db_options_.GetWalDir();
+  std::string log_fname = LogFileName(wal_dir, log_file_num);
 
   if (recycle_log_number) {
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
                    "reusing log %" PRIu64 " from recycle list\n",
                    recycle_log_number);
-    std::string old_log_fname =
-        LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
+    std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
     TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
     TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
     io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
@@ -1480,12 +1753,17 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
     lfile->SetPreallocationBlockSize(preallocate_block_size);
 
     const auto& listeners = immutable_db_options_.listeners;
+    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-        std::move(lfile), log_fname, opt_file_options, env_, io_tracer_,
-        nullptr /* stats */, listeners));
+        std::move(lfile), log_fname, opt_file_options,
+        immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
+        nullptr, tmp_set.Contains(FileType::kWalFile),
+        tmp_set.Contains(FileType::kWalFile)));
     *new_log = new log::Writer(std::move(file_writer), log_file_num,
                                immutable_db_options_.recycle_log_file_num > 0,
-                               immutable_db_options_.manual_wal_flush);
+                               immutable_db_options_.manual_wal_flush,
+                               immutable_db_options_.wal_compression);
+    io_s = (*new_log)->AddCompressionTypeRecord();
   }
   return io_s;
 }
@@ -1505,6 +1783,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   }
 
   *dbptr = nullptr;
+  assert(handles);
   handles->clear();
 
   size_t max_write_buffer_size = 0;
@@ -1514,7 +1793,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   }
 
   DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
-  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
+  if (!impl->immutable_db_options_.info_log) {
+    s = impl->init_logger_creation_s_;
+    delete impl;
+    return s;
+  } else {
+    assert(impl->init_logger_creation_s_.ok());
+  }
+  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
   if (s.ok()) {
     std::vector<std::string> paths;
     for (auto& db_path : impl->immutable_db_options_.db_paths) {
@@ -1546,12 +1832,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     return s;
   }
 
-  impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
-
+  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
+  RecoveryContext recovery_ctx;
   impl->mutex_.Lock();
+
   // Handles create_if_missing, error_if_exists
   uint64_t recovered_seq(kMaxSequenceNumber);
-  s = impl->Recover(column_families, false, false, false, &recovered_seq);
+  s = impl->Recover(column_families, false, false, false, &recovered_seq,
+                    &recovery_ctx);
   if (s.ok()) {
     uint64_t new_log_number = impl->versions_->NewFileNumber();
     log::Writer* new_log = nullptr;
@@ -1563,57 +1851,13 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       InstrumentedMutexLock wl(&impl->log_write_mutex_);
       impl->logfile_number_ = new_log_number;
       assert(new_log != nullptr);
+      assert(impl->logs_.empty());
       impl->logs_.emplace_back(new_log_number, new_log);
     }
 
     if (s.ok()) {
-      // set column family handles
-      for (auto cf : column_families) {
-        auto cfd =
-            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
-        if (cfd != nullptr) {
-          handles->push_back(
-              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
-          impl->NewThreadStatusCfInfo(cfd);
-        } else {
-          if (db_options.create_missing_column_families) {
-            // missing column family, create it
-            ColumnFamilyHandle* handle;
-            impl->mutex_.Unlock();
-            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
-            impl->mutex_.Lock();
-            if (s.ok()) {
-              handles->push_back(handle);
-            } else {
-              break;
-            }
-          } else {
-            s = Status::InvalidArgument("Column family not found", cf.name);
-            break;
-          }
-        }
-      }
-    }
-    if (s.ok()) {
-      SuperVersionContext sv_context(/* create_superversion */ true);
-      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-        impl->InstallSuperVersionAndScheduleWork(
-            cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
-      }
-      sv_context.Clean();
-      if (impl->two_write_queues_) {
-        impl->log_write_mutex_.Lock();
-      }
       impl->alive_log_files_.push_back(
           DBImpl::LogFileNumberSize(impl->logfile_number_));
-      if (impl->two_write_queues_) {
-        impl->log_write_mutex_.Unlock();
-      }
-
-      impl->DeleteObsoleteFiles();
-      s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
-    }
-    if (s.ok()) {
       // In WritePrepared there could be gap in sequence numbers. This breaks
       // the trick we use in kPointInTimeRecovery which assumes the first seq in
       // the log right after the corrupted log is one larger than the last seq
@@ -1629,10 +1873,16 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
         WriteOptions write_options;
         uint64_t log_used, log_size;
         log::Writer* log_writer = impl->logs_.back().writer;
-        s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
+        LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back();
+
+        assert(log_writer->get_log_number() == log_file_number_size.number);
+        impl->mutex_.AssertHeld();
+        s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
+                             Env::IO_TOTAL, log_file_number_size);
         if (s.ok()) {
           // Need to fsync, otherwise it might get lost after a power reset.
           s = impl->FlushWAL(false);
+          TEST_SYNC_POINT_CALLBACK("DBImpl::Open::BeforeSyncWAL", /*arg=*/&s);
           if (s.ok()) {
             s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
           }
@@ -1640,25 +1890,60 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       }
     }
   }
+  if (s.ok()) {
+    s = impl->LogAndApplyForRecovery(recovery_ctx);
+  }
+
   if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
-    // try to read format version
-    s = impl->PersistentStatsProcessFormatVersion();
+    impl->mutex_.AssertHeld();
+    s = impl->InitPersistStatsColumnFamily();
   }
 
   if (s.ok()) {
-    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
-        auto* vstorage = cfd->current()->storage_info();
-        for (int i = 1; i < vstorage->num_levels(); ++i) {
-          int num_files = vstorage->NumLevelFiles(i);
-          if (num_files > 0) {
-            s = Status::InvalidArgument(
-                "Not all files are at level 0. Cannot "
-                "open with FIFO compaction style.");
+    // set column family handles
+    for (auto cf : column_families) {
+      auto cfd =
+          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+      if (cfd != nullptr) {
+        handles->push_back(
+            new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+        impl->NewThreadStatusCfInfo(cfd);
+      } else {
+        if (db_options.create_missing_column_families) {
+          // missing column family, create it
+          ColumnFamilyHandle* handle = nullptr;
+          impl->mutex_.Unlock();
+          s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
+          impl->mutex_.Lock();
+          if (s.ok()) {
+            handles->push_back(handle);
+          } else {
             break;
           }
+        } else {
+          s = Status::InvalidArgument("Column family not found", cf.name);
+          break;
         }
       }
+    }
+  }
+
+  if (s.ok()) {
+    SuperVersionContext sv_context(/* create_superversion */ true);
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+      impl->InstallSuperVersionAndScheduleWork(
+          cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
+    }
+    sv_context.Clean();
+  }
+
+  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
+    // try to read format version
+    s = impl->PersistentStatsProcessFormatVersion();
+  }
+
+  if (s.ok()) {
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
       if (!cfd->mem()->IsSnapshotSupported()) {
         impl->is_snapshot_supported_ = false;
       }
@@ -1684,6 +1969,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
 
     *dbptr = impl;
     impl->opened_successfully_ = true;
+    impl->DeleteObsoleteFiles();
+    TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
     impl->MaybeScheduleFlushOrCompaction();
   } else {
     persist_options_status.PermitUncheckedError();
@@ -1708,19 +1995,24 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     // vast majority of all files), we'll pass the size to SstFileManager.
     // For all other files SstFileManager will query the size from filesystem.
 
-    std::vector<LiveFileMetaData> metadata;
-
-    impl->mutex_.Lock();
-    impl->versions_->GetLiveFilesMetaData(&metadata);
-    impl->mutex_.Unlock();
+    std::vector<ColumnFamilyMetaData> metadata;
+    impl->GetAllColumnFamilyMetaData(&metadata);
 
     std::unordered_map<std::string, uint64_t> known_file_sizes;
     for (const auto& md : metadata) {
-      std::string name = md.name;
-      if (!name.empty() && name[0] == '/') {
-        name = name.substr(1);
+      for (const auto& lmd : md.levels) {
+        for (const auto& fmd : lmd.files) {
+          known_file_sizes[fmd.relative_filename] = fmd.size;
+        }
+      }
+      for (const auto& bmd : md.blob_files) {
+        std::string name = bmd.blob_file_name;
+        // The BlobMetaData.blob_file_name may start with "/".
+        if (!name.empty() && name[0] == '/') {
+          name = name.substr(1);
+        }
+        known_file_sizes[name] = bmd.blob_file_size;
       }
-      known_file_sizes[name] = md.size;
     }
 
     std::vector<std::string> paths;
@@ -1733,23 +2025,25 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     // Remove duplicate paths.
     std::sort(paths.begin(), paths.end());
     paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
+    IOOptions io_opts;
+    io_opts.do_not_recurse = true;
     for (auto& path : paths) {
       std::vector<std::string> existing_files;
-      // TODO: Check for errors here?
-      impl->immutable_db_options_.env->GetChildren(path, &existing_files)
-          .PermitUncheckedError();
+      impl->immutable_db_options_.fs
+          ->GetChildren(path, io_opts, &existing_files,
+                        /*IODebugContext*=*/nullptr)
+          .PermitUncheckedError();  //**TODO: What to do on error?
       for (auto& file_name : existing_files) {
         uint64_t file_number;
         FileType file_type;
         std::string file_path = path + "/" + file_name;
         if (ParseFileName(file_name, &file_number, &file_type) &&
-            file_type == kTableFile) {
+            (file_type == kTableFile || file_type == kBlobFile)) {
           // TODO: Check for errors from OnAddFile?
           if (known_file_sizes.count(file_name)) {
             // We're assuming that each sst file name exists in at most one of
             // the paths.
-            sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
-                           /* compaction */ false)
+            sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
                 .PermitUncheckedError();
           } else {
             sfm->OnAddFile(file_path).PermitUncheckedError();
@@ -1773,22 +2067,33 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
                      impl);
     LogFlush(impl->immutable_db_options_.info_log);
-    assert(impl->TEST_WALBufferIsEmpty());
-    // If the assert above fails then we need to FlushWAL before returning
-    // control back to the user.
-    if (!persist_options_status.ok()) {
+    if (!impl->WALBufferIsEmpty()) {
+      s = impl->FlushWAL(false);
+      if (s.ok()) {
+        // Sync is needed otherwise WAL buffered data might get lost after a
+        // power reset.
+        log::Writer* log_writer = impl->logs_.back().writer;
+        s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
+      }
+    }
+    if (s.ok() && !persist_options_status.ok()) {
       s = Status::IOError(
           "DB::Open() failed --- Unable to persist Options file",
           persist_options_status.ToString());
     }
-  } else {
+  }
+  if (!s.ok()) {
     ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
-                   "Persisting Option File error: %s",
-                   persist_options_status.ToString().c_str());
+                   "DB::Open() failed: %s", s.ToString().c_str());
   }
   if (s.ok()) {
-    impl->StartPeriodicWorkScheduler();
-  } else {
+    s = impl->StartPeriodicTaskScheduler();
+  }
+
+  if (s.ok()) {
+    s = impl->RegisterRecordSeqnoTimeWorker();
+  }
+  if (!s.ok()) {
     for (auto* h : *handles) {
       delete h;
     }