update ceph source to reef 18.1.2
diff --git a/ceph/src/rocksdb/db/error_handler_fs_test.cc b/ceph/src/rocksdb/db/error_handler_fs_test.cc
index 34c2f4cbb2df5a526d1131649880f7a2d68c306e..153f3b79ef202c88662354167560fa7e40378e6c 100644
@@ -9,9 +9,9 @@
 #ifndef ROCKSDB_LITE
 
 #include "db/db_test_util.h"
+#include "file/sst_file_manager_impl.h"
 #include "port/stack_trace.h"
 #include "rocksdb/io_status.h"
-#include "rocksdb/perf_context.h"
 #include "rocksdb/sst_file_manager.h"
 #if !defined(ROCKSDB_LITE)
 #include "test_util/sync_point.h"
@@ -25,7 +25,7 @@ namespace ROCKSDB_NAMESPACE {
 class DBErrorHandlingFSTest : public DBTestBase {
  public:
   DBErrorHandlingFSTest()
-      : DBTestBase("/db_error_handling_fs_test", /*env_do_fsync=*/true) {
+      : DBTestBase("db_error_handling_fs_test", /*env_do_fsync=*/true) {
     fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
     fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
   }
@@ -66,6 +66,7 @@ class ErrorHandlerFSListener : public EventListener {
   ~ErrorHandlerFSListener() {
     file_creation_error_.PermitUncheckedError();
     bg_error_.PermitUncheckedError();
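+    // As with the statuses above, mark it checked so status-checking builds
+    // do not assert on destruction.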
+    new_bg_error_.PermitUncheckedError();
   }
 
   void OnTableFileCreationStarted(
@@ -89,11 +90,11 @@ class ErrorHandlerFSListener : public EventListener {
     }
   }
 
-  void OnErrorRecoveryCompleted(Status old_bg_error) override {
+  void OnErrorRecoveryEnd(const BackgroundErrorRecoveryInfo& info) override {
     InstrumentedMutexLock l(&mutex_);
     recovery_complete_ = true;
     cv_.SignalAll();
-    old_bg_error.PermitUncheckedError();
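+    // Record the new background error so tests can inspect it via
+    // new_bg_error().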
+    new_bg_error_ = info.new_bg_error;
   }
 
   bool WaitForRecovery(uint64_t /*abs_time_us*/) {
@@ -138,6 +139,8 @@ class ErrorHandlerFSListener : public EventListener {
     file_creation_error_ = io_s;
   }
 
+  Status new_bg_error() { return new_bg_error_; }
+
  private:
   InstrumentedMutex mutex_;
   InstrumentedCondVar cv_;
@@ -148,6 +151,7 @@ class ErrorHandlerFSListener : public EventListener {
   int file_count_;
   IOStatus file_creation_error_;
   Status bg_error_;
+  Status new_bg_error_;
   FaultInjectionTestFS* fault_fs_;
 };
 
@@ -158,6 +162,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
   options.env = fault_env_.get();
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
+  options.statistics = CreateDBStatistics();
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -173,14 +178,73 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
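+  // A plain (non-retryable) flush write error should bump only the generic BG
+  // error tickers; the retryable and auto-resume counters stay at zero.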
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
 
   Reopen(options);
   ASSERT_EQ("val", Get(Key(0)));
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
+// Every NoSpace IOError is handled as a regular BG error, no matter whether
+// the retryable flag is set or not, so auto resume for retryable IO errors is
+// not triggered. It is also mapped as a hard error.
+TEST_F(DBErrorHandlingFSTest, FLushWriteNoSpaceError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 2;
+  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
+  options.statistics = CreateDBStatistics();
+  Status s;
+
+  listener->EnableAutoRecovery(false);
+  DestroyAndReopen(options);
+
+  IOStatus error_msg = IOStatus::NoSpace("Retryable IO Error");
+  error_msg.SetRetryable(true);
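+  // The retryable flag is set on purpose; the ticker checks below verify that
+  // it is ignored for NoSpace errors.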
+
+  ASSERT_OK(Put(Key(1), "val1"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeFinishBuildTable",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, FLushWriteRetryableError) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -188,6 +252,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 0;
+  options.statistics = CreateDBStatistics();
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -202,11 +267,23 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
   ASSERT_OK(s);
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
   Reopen(options);
   ASSERT_EQ("val1", Get(Key(1)));
 
@@ -216,7 +293,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
@@ -230,7 +307,91 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  Reopen(options);
+  ASSERT_EQ("val3", Get(Key(3)));
+
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  Status s;
+
+  listener->EnableAutoRecovery(false);
+  DestroyAndReopen(options);
+
+  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
+  error_msg.SetDataLoss(true);
+  error_msg.SetScope(
+      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
+  error_msg.SetRetryable(false);
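+  // A non-retryable data-loss error scoped to a single file should be mapped
+  // to a soft error that Resume() can clear, as each injection below checks.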
+
+  ASSERT_OK(Put(Key(1), "val1"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeFinishBuildTable",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  Reopen(options);
+  ASSERT_EQ("val1", Get(Key(1)));
+
+  ASSERT_OK(Put(Key(2), "val2"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeSyncTable",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  Reopen(options);
+  ASSERT_EQ("val2", Get(Key(2)));
+
+  ASSERT_OK(Put(Key(3), "val3"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeCloseTableFile",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  Reopen(options);
+  ASSERT_EQ("val3", Get(Key(3)));
+
+  // not file scope, but retryable is set
+  error_msg.SetDataLoss(false);
+  error_msg.SetScope(
+      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFileSystem);
+  error_msg.SetRetryable(true);
+
+  ASSERT_OK(Put(Key(3), "val3"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeCloseTableFile",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
@@ -241,6 +402,96 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
   Destroy(options);
 }
 
+TEST_F(DBErrorHandlingFSTest, FLushWALWriteRetryableError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  Status s;
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
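+  // The error fires at DBImpl::SyncClosedLogs:Start, so the flush below
+  // should surface it as a hard error.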
+
+  listener->EnableAutoRecovery(false);
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncClosedLogs:Start",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
+
+  WriteOptions wo = WriteOptions();
+  wo.disableWAL = false;
+  ASSERT_OK(Put(Key(1), "val1", wo));
+
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  auto cfh = dbfull()->GetColumnFamilyHandle(1);
+  s = dbfull()->DropColumnFamily(cfh);
+
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  ASSERT_EQ("val1", Get(Key(1)));
+  ASSERT_OK(Put(Key(3), "val3", wo));
+  ASSERT_EQ("val3", Get(Key(3)));
+  s = Flush();
+  ASSERT_OK(s);
+  ASSERT_EQ("val3", Get(Key(3)));
+
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, FLushWALAtomicWriteRetryableError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  options.atomic_flush = true;
+  Status s;
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
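+  // Same scenario as FLushWALWriteRetryableError, but with atomic_flush
+  // enabled.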
+
+  listener->EnableAutoRecovery(false);
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncClosedLogs:Start",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
+
+  WriteOptions wo = WriteOptions();
+  wo.disableWAL = false;
+  ASSERT_OK(Put(Key(1), "val1", wo));
+
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  auto cfh = dbfull()->GetColumnFamilyHandle(1);
+  s = dbfull()->DropColumnFamily(cfh);
+
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  ASSERT_EQ("val1", Get(Key(1)));
+  ASSERT_OK(Put(Key(3), "val3", wo));
+  ASSERT_EQ("val3", Get(Key(3)));
+  s = Flush();
+  ASSERT_OK(s);
+  ASSERT_EQ("val3", Get(Key(3)));
+
+  Destroy(options);
+}
+
+// The flush error is injected before we finish the table build
 TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
@@ -249,6 +500,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 0;
+  options.statistics = CreateDBStatistics();
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -271,7 +523,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
   ASSERT_EQ("val1", Get(Key(1)));
   ASSERT_EQ("val2", Get(Key(2)));
   ASSERT_OK(Put(Key(3), "val3", wo));
@@ -279,11 +531,24 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
   s = Flush();
   ASSERT_OK(s);
   ASSERT_EQ("val3", Get(Key(3)));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
 
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError2) {
+// The retryable IO error is injected before we sync the table
+TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError2) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -314,7 +579,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError2) {
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
   ASSERT_EQ("val1", Get(Key(1)));
   ASSERT_EQ("val2", Get(Key(2)));
   ASSERT_OK(Put(Key(3), "val3", wo));
@@ -326,7 +591,8 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError2) {
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError3) {
+// The retryable IO error is injected before we close the table file
+TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError3) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -357,7 +623,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError3) {
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
   ASSERT_EQ("val1", Get(Key(1)));
   ASSERT_EQ("val2", Get(Key(2)));
   ASSERT_OK(Put(Key(3), "val3", wo));
@@ -399,7 +665,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   new_manifest = GetManifestNameFromLiveFiles();
   ASSERT_NE(new_manifest, old_manifest);
@@ -437,12 +703,103 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+
+  new_manifest = GetManifestNameFromLiveFiles();
+  ASSERT_NE(new_manifest, old_manifest);
+
+  Reopen(options);
+  ASSERT_EQ("val", Get(Key(0)));
+  ASSERT_EQ("val", Get(Key(1)));
+  Close();
+}
+
+TEST_F(DBErrorHandlingFSTest, ManifestWriteFileScopeError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  Status s;
+  std::string old_manifest;
+  std::string new_manifest;
+
+  listener->EnableAutoRecovery(false);
+  DestroyAndReopen(options);
+  old_manifest = GetManifestNameFromLiveFiles();
+
+  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
+  error_msg.SetDataLoss(true);
+  error_msg.SetScope(
+      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
+  error_msg.SetRetryable(false);
+
+  ASSERT_OK(Put(Key(0), "val"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put(Key(1), "val"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
   SyncPoint::GetInstance()->ClearAllCallBacks();
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
+
+  new_manifest = GetManifestNameFromLiveFiles();
+  ASSERT_NE(new_manifest, old_manifest);
+
+  Reopen(options);
+  ASSERT_EQ("val", Get(Key(0)));
+  ASSERT_EQ("val", Get(Key(1)));
+  Close();
+}
+
+TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  Status s;
+  std::string old_manifest;
+  std::string new_manifest;
+
+  listener->EnableAutoRecovery(false);
+  DestroyAndReopen(options);
+  old_manifest = GetManifestNameFromLiveFiles();
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
+
+  WriteOptions wo = WriteOptions();
+  wo.disableWAL = true;
+  ASSERT_OK(Put(Key(0), "val", wo));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put(Key(1), "val", wo));
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
 
   new_manifest = GetManifestNameFromLiveFiles();
   ASSERT_NE(new_manifest, old_manifest);
@@ -490,7 +847,7 @@ TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
 
   // A successful Resume() will create a new manifest file
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   new_manifest = GetManifestNameFromLiveFiles();
   ASSERT_NE(new_manifest, old_manifest);
@@ -523,7 +880,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
   ASSERT_OK(Put(Key(0), "val"));
   ASSERT_OK(Put(Key(2), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
       // Wait for flush of 2nd L0 file before starting compaction
@@ -553,7 +910,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
   // This Flush will trigger a compaction, which will fail when appending to
   // the manifest
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   TEST_SYNC_POINT("CompactionManifestWriteError:0");
   // Clear all errors so when the compaction is retried, it will succeed
@@ -564,7 +921,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
 
   s = dbfull()->TEST_WaitForCompact();
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   new_manifest = GetManifestNameFromLiveFiles();
   ASSERT_NE(new_manifest, old_manifest);
@@ -597,7 +954,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
   ASSERT_OK(Put(Key(0), "val"));
   ASSERT_OK(Put(Key(2), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
   listener->EnableAutoRecovery(false);
@@ -623,7 +980,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   TEST_SYNC_POINT("CompactionManifestWriteError:0");
   TEST_SYNC_POINT("CompactionManifestWriteError:1");
@@ -635,7 +992,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
   SyncPoint::GetInstance()->DisableProcessing();
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   new_manifest = GetManifestNameFromLiveFiles();
   ASSERT_NE(new_manifest, old_manifest);
@@ -661,7 +1018,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
   ASSERT_OK(Put(Key(0), "va;"));
   ASSERT_OK(Put(Key(2), "va;"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   listener->OverrideBGError(
       Status(Status::NoSpace(), Status::Severity::kHardError));
@@ -678,18 +1035,18 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   s = dbfull()->TEST_WaitForCompact();
   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
 
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
+TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteRetryableError) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -707,7 +1064,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
   ASSERT_OK(Put(Key(0), "va;"));
   ASSERT_OK(Put(Key(2), "va;"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
   listener->EnableAutoRecovery(false);
@@ -717,20 +1074,73 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::OpenCompactionOutputFile",
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
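+  // Once this compaction round finishes, cancel the remaining background work
+  // so the injected failure is not retried before the checks below.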
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:Finish",
+      [&](void*) { CancelAllBackgroundWork(dbfull()); });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
-  s = dbfull()->TEST_WaitForCompact();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  s = dbfull()->TEST_GetBGError();
+  ASSERT_OK(s);
+  fault_fs_->SetFilesystemActive(true);
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, DISABLED_CompactionWriteFileScopeError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  Status s;
+  DestroyAndReopen(options);
+
+  IOStatus error_msg = IOStatus::IOError("File Scope Data Loss Error");
+  error_msg.SetDataLoss(true);
+  error_msg.SetScope(
+      ROCKSDB_NAMESPACE::IOStatus::IOErrorScope::kIOErrorScopeFile);
+  error_msg.SetRetryable(false);
+
+  ASSERT_OK(Put(Key(0), "va;"));
+  ASSERT_OK(Put(Key(2), "va;"));
+  s = Flush();
+  ASSERT_OK(s);
+
+  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
+  listener->EnableAutoRecovery(false);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::OpenCompactionOutputFile",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:Finish",
+      [&](void*) { CancelAllBackgroundWork(dbfull()); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put(Key(1), "val"));
+  s = Flush();
+  ASSERT_OK(s);
+
+  s = dbfull()->TEST_GetBGError();
+  ASSERT_OK(s);
 
   fault_fs_->SetFilesystemActive(true);
   SyncPoint::GetInstance()->ClearAllCallBacks();
   SyncPoint::GetInstance()->DisableProcessing();
   s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
   Destroy(options);
 }
 
@@ -745,7 +1155,7 @@ TEST_F(DBErrorHandlingFSTest, CorruptionError) {
   ASSERT_OK(Put(Key(0), "va;"));
   ASSERT_OK(Put(Key(2), "va;"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
       {{"DBImpl::FlushMemTable:FlushMemTableFinished",
@@ -759,7 +1169,7 @@ TEST_F(DBErrorHandlingFSTest, CorruptionError) {
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   s = dbfull()->TEST_WaitForCompact();
   ASSERT_EQ(s.severity(),
@@ -767,7 +1177,7 @@ TEST_F(DBErrorHandlingFSTest, CorruptionError) {
 
   fault_fs_->SetFilesystemActive(true);
   s = dbfull()->Resume();
-  ASSERT_NE(s, Status::OK());
+  ASSERT_NOK(s);
   Destroy(options);
 }
 
@@ -782,6 +1192,7 @@ TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
   options.env = fault_env_.get();
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
+  options.statistics = CreateDBStatistics();
   Status s;
 
   listener->EnableAutoRecovery();
@@ -799,7 +1210,19 @@ TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
 
   s = Put(Key(1), "val");
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_EQ(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
 
   Reopen(options);
   ASSERT_EQ("val", Get(Key(0)));
@@ -829,7 +1252,7 @@ TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
   // We should be able to shutdown the database while auto recovery is going
   // on in the background
   Close();
-  DestroyDB(dbname_, options);
+  DestroyDB(dbname_, options).PermitUncheckedError();
 }
 
 TEST_F(DBErrorHandlingFSTest, WALWriteError) {
@@ -859,7 +1282,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   {
@@ -885,6 +1308,10 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) {
     ASSERT_EQ(s, s.NoSpace());
   }
   SyncPoint::GetInstance()->DisableProcessing();
+  // `ClearAllCallBacks()` is needed in addition to `DisableProcessing()` to
+  // drain all callbacks. Otherwise, a pending callback in the background
+  // could re-disable `fault_fs_` after we enable it below.
+  SyncPoint::GetInstance()->ClearAllCallBacks();
   fault_fs_->SetFilesystemActive(true);
   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
   for (auto i = 0; i < 199; ++i) {
@@ -915,7 +1342,6 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
   options.listeners.emplace_back(listener);
   options.paranoid_checks = true;
   options.max_bgerror_resume_count = 0;
-  Status s;
   Random rnd(301);
 
   DestroyAndReopen(options);
@@ -933,7 +1359,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   // For the second batch, the first 2 file Append are successful, then the
@@ -956,8 +1382,8 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
     SyncPoint::GetInstance()->EnableProcessing();
     WriteOptions wopts;
     wopts.sync = true;
-    s = dbfull()->Write(wopts, &batch);
-    ASSERT_EQ(true, s.IsIOError());
+    Status s = dbfull()->Write(wopts, &batch);
+    ASSERT_TRUE(s.IsIOError());
   }
   fault_fs_->SetFilesystemActive(true);
   SyncPoint::GetInstance()->ClearAllCallBacks();
@@ -973,8 +1399,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
   }
 
   // Resume and write a new batch, should be in the WAL
-  s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(dbfull()->Resume());
   {
     WriteBatch batch;
 
@@ -984,7 +1409,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   Reopen(options);
@@ -1010,7 +1435,6 @@ TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
   options.create_if_missing = true;
   options.writable_file_max_buffer_size = 32768;
   options.listeners.emplace_back(listener);
-  Status s;
   Random rnd(301);
 
   listener->EnableAutoRecovery();
@@ -1027,7 +1451,7 @@ TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   {
@@ -1050,10 +1474,14 @@ TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
     SyncPoint::GetInstance()->EnableProcessing();
     WriteOptions wopts;
     wopts.sync = true;
-    s = dbfull()->Write(wopts, &batch);
-    ASSERT_EQ(s, s.NoSpace());
+    Status s = dbfull()->Write(wopts, &batch);
+    ASSERT_TRUE(s.IsNoSpace());
   }
   SyncPoint::GetInstance()->DisableProcessing();
+  // `ClearAllCallBacks()` is needed in addition to `DisableProcessing()` to
+  // drain all callbacks. Otherwise, a pending callback in the background
+  // could re-disable `fault_fs_` after we enable it below.
+  SyncPoint::GetInstance()->ClearAllCallBacks();
   fault_fs_->SetFilesystemActive(true);
   ASSERT_EQ(listener->WaitForRecovery(5000000), true);
 
@@ -1119,9 +1547,8 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
     listener[i]->InjectFileCreationError(fault_fs[i], 3,
                                          IOStatus::NoSpace("Out of space"));
     snprintf(buf, sizeof(buf), "_%d", i);
-    DestroyDB(dbname_ + std::string(buf), options[i]);
-    ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
-              Status::OK());
+    ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
+    ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
     db.emplace_back(dbptr);
   }
 
@@ -1134,8 +1561,8 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
-    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
+    ASSERT_OK(db[i]->Write(wopts, &batch));
+    ASSERT_OK(db[i]->Flush(FlushOptions()));
   }
 
   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
@@ -1149,8 +1576,8 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
-    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
+    ASSERT_OK(db[i]->Write(wopts, &batch));
+    ASSERT_OK(db[i]->Flush(FlushOptions()));
   }
 
   for (auto i = 0; i < kNumDbInstances; ++i) {
@@ -1163,16 +1590,19 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
   for (auto i = 0; i < kNumDbInstances; ++i) {
     std::string prop;
     ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
-    ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
-              Status::OK());
+    ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true));
     EXPECT_TRUE(db[i]->GetProperty(
-        "rocksdb.num-files-at-level" + NumberToString(0), &prop));
+        "rocksdb.num-files-at-level" + std::to_string(0), &prop));
     EXPECT_EQ(atoi(prop.c_str()), 0);
     EXPECT_TRUE(db[i]->GetProperty(
-        "rocksdb.num-files-at-level" + NumberToString(1), &prop));
+        "rocksdb.num-files-at-level" + std::to_string(1), &prop));
     EXPECT_EQ(atoi(prop.c_str()), 1);
   }
 
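+  // Close the SstFileManager so its background thread stops before the DB
+  // instances are destroyed below.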
+  SstFileManagerImpl* sfmImpl =
+      static_cast_with_check<SstFileManagerImpl>(sfm.get());
+  sfmImpl->Close();
+
   for (auto i = 0; i < kNumDbInstances; ++i) {
     char buf[16];
     snprintf(buf, sizeof(buf), "_%d", i);
@@ -1181,7 +1611,7 @@ TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
     if (getenv("KEEP_DB")) {
       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
     } else {
-      Status s = DestroyDB(dbname_ + std::string(buf), options[i]);
+      ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
     }
   }
   options.clear();
@@ -1236,9 +1666,8 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
         break;
     }
     snprintf(buf, sizeof(buf), "_%d", i);
-    DestroyDB(dbname_ + std::string(buf), options[i]);
-    ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
-              Status::OK());
+    ASSERT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
+    ASSERT_OK(DB::Open(options[i], dbname_ + std::string(buf), &dbptr));
     db.emplace_back(dbptr);
   }
 
@@ -1251,8 +1680,8 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
-    ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
+    ASSERT_OK(db[i]->Write(wopts, &batch));
+    ASSERT_OK(db[i]->Flush(FlushOptions()));
   }
 
   def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
@@ -1266,11 +1695,11 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(db[i]->Write(wopts, &batch));
     if (i != 1) {
-      ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
+      ASSERT_OK(db[i]->Flush(FlushOptions()));
     } else {
-      ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
+      ASSERT_TRUE(db[i]->Flush(FlushOptions()).IsNoSpace());
     }
   }
 
@@ -1284,7 +1713,7 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
         ASSERT_EQ(s.severity(), Status::Severity::kHardError);
         break;
       case 2:
-        ASSERT_EQ(s, Status::OK());
+        ASSERT_OK(s);
         break;
     }
     fault_fs[i]->SetFilesystemActive(true);
@@ -1297,17 +1726,20 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
       ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
     }
     if (i == 1) {
-      ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
-                Status::OK());
+      ASSERT_OK(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true));
     }
     EXPECT_TRUE(db[i]->GetProperty(
-        "rocksdb.num-files-at-level" + NumberToString(0), &prop));
+        "rocksdb.num-files-at-level" + std::to_string(0), &prop));
     EXPECT_EQ(atoi(prop.c_str()), 0);
     EXPECT_TRUE(db[i]->GetProperty(
-        "rocksdb.num-files-at-level" + NumberToString(1), &prop));
+        "rocksdb.num-files-at-level" + std::to_string(1), &prop));
     EXPECT_EQ(atoi(prop.c_str()), 1);
   }
 
+  SstFileManagerImpl* sfmImpl =
+      static_cast_with_check<SstFileManagerImpl>(sfm.get());
+  sfmImpl->Close();
+
   for (auto i = 0; i < kNumDbInstances; ++i) {
     char buf[16];
     snprintf(buf, sizeof(buf), "_%d", i);
@@ -1316,7 +1748,7 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
     if (getenv("KEEP_DB")) {
       printf("DB is still at %s%s\n", dbname_.c_str(), buf);
     } else {
-      DestroyDB(dbname_ + std::string(buf), options[i]);
+      EXPECT_OK(DestroyDB(dbname_ + std::string(buf), options[i]));
     }
   }
   options.clear();
@@ -1328,7 +1760,7 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
 // to soft error and trigger auto resume. During auto resume, SwitchMemtable
// is disabled to avoid small SST tables. Writes can still be applied before
 // the bg error is cleaned unless the memtable is full.
-TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
+TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover1) {
   // Activate the FS before the first resume
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
@@ -1338,6 +1770,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 2;
   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
+  options.statistics = CreateDBStatistics();
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -1365,6 +1798,22 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
   ASSERT_EQ("val1", Get(Key(1)));
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
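+  // The initial error plus the two failed auto-resume retries should bump the
+  // BG error tickers to three; only one auto-resume cycle was started.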
+  ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(3, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
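+  // The number of retries per auto-resume cycle is also recorded in a
+  // histogram.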
+  HistogramData autoresume_retry;
+  options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
+                                    &autoresume_retry);
+  ASSERT_GE(autoresume_retry.max, 0);
   ASSERT_OK(Put(Key(2), "val2", wo));
   s = Flush();
+  // Since auto resume fails, the bg error is not cleaned; the flush returns
+  // the bg error set before.
@@ -1373,17 +1822,15 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
   ASSERT_EQ("val2", Get(Key(2)));
 
   // call auto resume
-  s = dbfull()->Resume();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(dbfull()->Resume());
   ASSERT_OK(Put(Key(3), "val3", wo));
-  s = Flush();
   // After resume is successful, the flush should be ok.
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Flush());
   ASSERT_EQ("val3", Get(Key(3)));
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
+TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableErrorAutoRecover2) {
   // Activate the FS before the first resume
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
@@ -1393,6 +1840,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 2;
   options.bgerror_resume_retry_interval = 100000;  // 0.1 second
+  options.statistics = CreateDBStatistics();
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -1411,164 +1859,40 @@ TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
   ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
-  SyncPoint::GetInstance()->DisableProcessing();
-  fault_fs_->SetFilesystemActive(true);
-  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
-  ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_OK(Put(Key(2), "val2", wo));
-  s = Flush();
-  // Since auto resume is successful, the bg error is cleaned, flush will
-  // be successful.
-  ASSERT_OK(s);
-  ASSERT_EQ("val2", Get(Key(2)));
-  Destroy(options);
-}
-
-TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover1) {
-  // Fail the first resume and make the second resume successful
-  std::shared_ptr<ErrorHandlerFSListener> listener(
-      new ErrorHandlerFSListener());
-  Options options = GetDefaultOptions();
-  options.env = fault_env_.get();
-  options.create_if_missing = true;
-  options.listeners.emplace_back(listener);
-  options.max_bgerror_resume_count = 2;
-  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
-  Status s;
-
-  listener->EnableAutoRecovery(false);
-  DestroyAndReopen(options);
-
-  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
-  error_msg.SetRetryable(true);
-
-  ASSERT_OK(Put(Key(1), "val1"));
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"RecoverFromRetryableBGIOError:BeforeWait0",
-        "FLushWritRetryableeErrorAutoRecover1:0"},
-       {"FLushWritRetryableeErrorAutoRecover1:1",
-        "RecoverFromRetryableBGIOError:BeforeWait1"},
-       {"RecoverFromRetryableBGIOError:RecoverSuccess",
-        "FLushWritRetryableeErrorAutoRecover1:2"}});
-  SyncPoint::GetInstance()->SetCallBack(
-      "BuildTable:BeforeFinishBuildTable",
-      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
-  SyncPoint::GetInstance()->EnableProcessing();
-  s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:0");
-  fault_fs_->SetFilesystemActive(true);
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:1");
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:2");
-  SyncPoint::GetInstance()->DisableProcessing();
-
-  ASSERT_EQ("val1", Get(Key(1)));
-  Reopen(options);
-  ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_OK(Put(Key(2), "val2"));
-  s = Flush();
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ("val2", Get(Key(2)));
-
-  Destroy(options);
-}
-
-TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover2) {
-  // Activate the FS before the first resume
-  std::shared_ptr<ErrorHandlerFSListener> listener(
-      new ErrorHandlerFSListener());
-  Options options = GetDefaultOptions();
-  options.env = fault_env_.get();
-  options.create_if_missing = true;
-  options.listeners.emplace_back(listener);
-  options.max_bgerror_resume_count = 2;
-  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
-  Status s;
-
-  listener->EnableAutoRecovery(false);
-  DestroyAndReopen(options);
-
-  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
-  error_msg.SetRetryable(true);
-
-  ASSERT_OK(Put(Key(1), "val1"));
-  SyncPoint::GetInstance()->SetCallBack(
-      "BuildTable:BeforeFinishBuildTable",
-      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
-
-  SyncPoint::GetInstance()->EnableProcessing();
-  s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  SyncPoint::GetInstance()->DisableProcessing();
-  fault_fs_->SetFilesystemActive(true);
-  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
-
-  ASSERT_EQ("val1", Get(Key(1)));
-  Reopen(options);
-  ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_OK(Put(Key(2), "val2"));
-  s = Flush();
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ("val2", Get(Key(2)));
-
-  Destroy(options);
-}
-
-TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover3) {
-  // Fail all the resume and let user to resume
-  std::shared_ptr<ErrorHandlerFSListener> listener(
-      new ErrorHandlerFSListener());
-  Options options = GetDefaultOptions();
-  options.env = fault_env_.get();
-  options.create_if_missing = true;
-  options.listeners.emplace_back(listener);
-  options.max_bgerror_resume_count = 2;
-  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
-  Status s;
-
-  listener->EnableAutoRecovery(false);
-  DestroyAndReopen(options);
-
-  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
-  error_msg.SetRetryable(true);
-
-  ASSERT_OK(Put(Key(1), "val1"));
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"FLushWritRetryableeErrorAutoRecover3:0",
-        "RecoverFromRetryableBGIOError:BeforeStart"},
-       {"RecoverFromRetryableBGIOError:LoopOut",
-        "FLushWritRetryableeErrorAutoRecover3:1"}});
-  SyncPoint::GetInstance()->SetCallBack(
-      "BuildTable:BeforeFinishBuildTable",
-      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
-  SyncPoint::GetInstance()->EnableProcessing();
-  s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:0");
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:1");
-  fault_fs_->SetFilesystemActive(true);
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
-  SyncPoint::GetInstance()->DisableProcessing();
-
-  ASSERT_EQ("val1", Get(Key(1)));
-  // Auto resume fails due to FS does not recover during resume. User call
-  // resume manually here.
-  s = dbfull()->Resume();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
   ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_OK(Put(Key(2), "val2"));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_SUCCESS_COUNT));
+  HistogramData autoresume_retry;
+  options.statistics->histogramData(ERROR_HANDLER_AUTORESUME_RETRY_COUNT,
+                                    &autoresume_retry);
+  ASSERT_GE(autoresume_retry.max, 0);
+  ASSERT_OK(Put(Key(2), "val2", wo));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  // Since auto resume is successful, the bg error is cleaned, flush will
+  // be successful.
+  ASSERT_OK(s);
   ASSERT_EQ("val2", Get(Key(2)));
-
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover4) {
-  // Fail the first resume and does not do resume second time because
-  // the IO error severity is Fatal Error and not Retryable.
+// Auto resume from the flush retryable IO error. Activate the FS before the
+// first resume; the resume is successful.
+TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover1) {
+  // Activate the FS before the first resume
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -1576,7 +1900,7 @@ TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover4) {
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 2;
-  options.bgerror_resume_retry_interval = 10;  // 0.1 second
+  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -1584,55 +1908,33 @@ TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover4) {
 
   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
   error_msg.SetRetryable(true);
-  IOStatus nr_msg = IOStatus::IOError("No Retryable Fatal IO Error");
-  nr_msg.SetRetryable(false);
 
   ASSERT_OK(Put(Key(1), "val1"));
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"RecoverFromRetryableBGIOError:BeforeStart",
-        "FLushWritRetryableeErrorAutoRecover4:0"},
-       {"FLushWritRetryableeErrorAutoRecover4:2",
-        "RecoverFromRetryableBGIOError:RecoverFail0"}});
   SyncPoint::GetInstance()->SetCallBack(
       "BuildTable:BeforeFinishBuildTable",
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
-  SyncPoint::GetInstance()->SetCallBack(
-      "RecoverFromRetryableBGIOError:BeforeResume1",
-      [&](void*) { fault_fs_->SetFilesystemActive(false, nr_msg); });
 
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:0");
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:2");
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
   SyncPoint::GetInstance()->DisableProcessing();
   fault_fs_->SetFilesystemActive(true);
-  // Even the FS is recoverd, due to the Fatal Error in bg_error_ the resume
-  // and flush will all fail.
-  ASSERT_EQ("val1", Get(Key(1)));
-  s = dbfull()->Resume();
-  ASSERT_NE(s, Status::OK());
-  ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_OK(Put(Key(2), "val2"));
-  s = Flush();
-  ASSERT_NE(s, Status::OK());
-  ASSERT_EQ("NOT_FOUND", Get(Key(2)));
+  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
 
+  ASSERT_EQ("val1", Get(Key(1)));
   Reopen(options);
   ASSERT_EQ("val1", Get(Key(1)));
   ASSERT_OK(Put(Key(2), "val2"));
-  s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Flush());
   ASSERT_EQ("val2", Get(Key(2)));
 
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover5) {
-  // During the resume, call DB->CLose, make sure the resume thread exist
-  // before close continues. Due to the shutdown, the resume is not successful
-  // and the FS does not become active, so close status is still IO error
+// Auto resume from the flush retryable IO error, with the retry limit count
+// set. The FS is never activated, so auto resume should fail in the end.
+TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAutoRecover2) {
+  // Fail all the resume and let user to resume
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -1640,7 +1942,7 @@ TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover5) {
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 2;
-  options.bgerror_resume_retry_interval = 10;  // 0.1 second
+  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
   Status s;
 
   listener->EnableAutoRecovery(false);
@@ -1651,37 +1953,39 @@ TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover5) {
 
   ASSERT_OK(Put(Key(1), "val1"));
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"RecoverFromRetryableBGIOError:BeforeStart",
-        "FLushWritRetryableeErrorAutoRecover5:0"}});
+      {{"FLushWritRetryableeErrorAutoRecover2:0",
+        "RecoverFromRetryableBGIOError:BeforeStart"},
+       {"RecoverFromRetryableBGIOError:LoopOut",
+        "FLushWritRetryableeErrorAutoRecover2:1"}});
   SyncPoint::GetInstance()->SetCallBack(
       "BuildTable:BeforeFinishBuildTable",
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover5:0");
-  // The first resume will cause recovery_error and its severity is the
-  // Fatal error
-  s = dbfull()->Close();
-  ASSERT_NE(s, Status::OK());
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:0");
+  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:1");
+  fault_fs_->SetFilesystemActive(true);
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
   SyncPoint::GetInstance()->DisableProcessing();
-  fault_fs_->SetFilesystemActive(true);
 
-  Reopen(options);
-  ASSERT_NE("val1", Get(Key(1)));
+  ASSERT_EQ("val1", Get(Key(1)));
+  // Auto resume fails because the FS does not recover during resume, so the
+  // user calls Resume() manually here.
+  s = dbfull()->Resume();
+  ASSERT_EQ("val1", Get(Key(1)));
+  ASSERT_OK(s);
   ASSERT_OK(Put(Key(2), "val2"));
-  s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Flush());
   ASSERT_EQ("val2", Get(Key(2)));
 
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover6) {
-  // During the resume, call DB->CLose, make sure the resume thread exist
-  // before close continues. Due to the shutdown, the resume is not successful
-  // and the FS does not become active, so close status is still IO error
+// Auto resume from the manifest write retryable IO error and set the retry
+// limit count. Fail the first resume and let the second resume be successful.
+TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();
@@ -1689,54 +1993,51 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover6) {
   options.create_if_missing = true;
   options.listeners.emplace_back(listener);
   options.max_bgerror_resume_count = 2;
-  options.bgerror_resume_retry_interval = 10;  // 0.1 second
+  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
   Status s;
+  std::string old_manifest;
+  std::string new_manifest;
 
   listener->EnableAutoRecovery(false);
   DestroyAndReopen(options);
+  old_manifest = GetManifestNameFromLiveFiles();
 
   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
   error_msg.SetRetryable(true);
 
-  ASSERT_OK(Put(Key(1), "val1"));
+  ASSERT_OK(Put(Key(0), "val"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put(Key(1), "val"));
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-      {{"FLushWritRetryableeErrorAutoRecover6:0",
-        "RecoverFromRetryableBGIOError:BeforeStart"},
-       {"RecoverFromRetryableBGIOError:BeforeWait0",
-        "FLushWritRetryableeErrorAutoRecover6:1"},
-       {"FLushWritRetryableeErrorAutoRecover6:2",
+      {{"RecoverFromRetryableBGIOError:BeforeStart",
+        "ManifestWriteRetryableErrorAutoRecover:0"},
+       {"ManifestWriteRetryableErrorAutoRecover:1",
         "RecoverFromRetryableBGIOError:BeforeWait1"},
-       {"RecoverFromRetryableBGIOError:AfterWait0",
-        "FLushWritRetryableeErrorAutoRecover6:3"}});
+       {"RecoverFromRetryableBGIOError:RecoverSuccess",
+        "ManifestWriteRetryableErrorAutoRecover:2"}});
   SyncPoint::GetInstance()->SetCallBack(
-      "BuildTable:BeforeFinishBuildTable",
+      "VersionSet::LogAndApply:WriteManifest",
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:0");
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:1");
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
   fault_fs_->SetFilesystemActive(true);
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:2");
-  TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:3");
-  // The first resume will cause recovery_error and its severity is the
-  // Fatal error
-  s = dbfull()->Close();
-  ASSERT_EQ(s, Status::OK());
+  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1");
+  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2");
   SyncPoint::GetInstance()->DisableProcessing();
 
-  Reopen(options);
-  ASSERT_EQ("val1", Get(Key(1)));
-  ASSERT_OK(Put(Key(2), "val2"));
-  s = Flush();
-  ASSERT_EQ(s, Status::OK());
-  ASSERT_EQ("val2", Get(Key(2)));
+  new_manifest = GetManifestNameFromLiveFiles();
+  ASSERT_NE(new_manifest, old_manifest);
 
-  Destroy(options);
+  Reopen(options);
+  ASSERT_EQ("val", Get(Key(0)));
+  ASSERT_EQ("val", Get(Key(1)));
+  Close();
 }
 
-TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
+TEST_F(DBErrorHandlingFSTest, ManifestWriteNoWALRetryableErrorAutoRecover) {
   // Fail the first resume and let the second resume be successful
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
@@ -1757,27 +2058,29 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
   IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
   error_msg.SetRetryable(true);
 
-  ASSERT_OK(Put(Key(0), "val"));
+  WriteOptions wo = WriteOptions();
+  wo.disableWAL = true;
+  ASSERT_OK(Put(Key(0), "val", wo));
   ASSERT_OK(Flush());
-  ASSERT_OK(Put(Key(1), "val"));
+  ASSERT_OK(Put(Key(1), "val", wo));
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
       {{"RecoverFromRetryableBGIOError:BeforeStart",
-        "ManifestWriteRetryableErrorAutoRecover:0"},
-       {"ManifestWriteRetryableErrorAutoRecover:1",
+        "ManifestWriteNoWALRetryableErrorAutoRecover:0"},
+       {"ManifestWriteNoWALRetryableErrorAutoRecover:1",
         "RecoverFromRetryableBGIOError:BeforeWait1"},
        {"RecoverFromRetryableBGIOError:RecoverSuccess",
-        "ManifestWriteRetryableErrorAutoRecover:2"}});
+        "ManifestWriteNoWALRetryableErrorAutoRecover:2"}});
   SyncPoint::GetInstance()->SetCallBack(
       "VersionSet::LogAndApply:WriteManifest",
       [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
   SyncPoint::GetInstance()->EnableProcessing();
   s = Flush();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
-  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:0");
   fault_fs_->SetFilesystemActive(true);
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
-  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1");
-  TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2");
+  TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:1");
+  TEST_SYNC_POINT("ManifestWriteNoWALRetryableErrorAutoRecover:2");
   SyncPoint::GetInstance()->DisableProcessing();
 
   new_manifest = GetManifestNameFromLiveFiles();
@@ -1812,8 +2115,7 @@ TEST_F(DBErrorHandlingFSTest,
 
   ASSERT_OK(Put(Key(0), "val"));
   ASSERT_OK(Put(Key(2), "val"));
-  s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(Flush());
 
   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
   listener->EnableAutoRecovery(false);
@@ -1850,7 +2152,7 @@ TEST_F(DBErrorHandlingFSTest,
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:0");
   TEST_SYNC_POINT("CompactionManifestWriteErrorAR:1");
@@ -1900,7 +2202,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
   ASSERT_OK(Put(Key(0), "va;"));
   ASSERT_OK(Put(Key(2), "va;"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
   listener->EnableAutoRecovery(false);
@@ -1925,11 +2227,10 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   s = dbfull()->TEST_WaitForCompact();
-  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
-
+  ASSERT_OK(s);
   TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
   SyncPoint::GetInstance()->ClearAllCallBacks();
   SyncPoint::GetInstance()->DisableProcessing();
@@ -1965,7 +2266,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   // For the second batch, the first 2 file Append are successful, then the
@@ -1978,7 +2279,8 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
       ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
     }
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
-        {{"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"},
+        {{"WALWriteErrorDone", "RecoverFromRetryableBGIOError:BeforeStart"},
+         {"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"},
          {"WALWriteError1:1", "RecoverFromRetryableBGIOError:BeforeResume1"},
          {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError1:2"}});
 
@@ -1994,6 +2296,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
     wopts.sync = true;
     s = dbfull()->Write(wopts, &batch);
     ASSERT_EQ(true, s.IsIOError());
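+    // Signal that the failed write has returned; the WALWriteErrorDone
+    // dependency added above keeps the recovery thread from starting any
+    // earlier.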
+    TEST_SYNC_POINT("WALWriteErrorDone");
 
     TEST_SYNC_POINT("WALWriteError1:0");
     fault_fs_->SetFilesystemActive(true);
@@ -2022,7 +2325,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   Reopen(options);
@@ -2066,7 +2369,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   // For the second batch, the first 2 file Append are successful, then the
@@ -2123,7 +2426,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   Reopen(options);
@@ -2137,6 +2440,246 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
   Close();
 }
 
+// Fail auto resume from a flush retryable error and verify that the
+// OnErrorRecoveryEnd listener callback is called
+TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAbortRecovery) {
+  // Keep the FS inactive so that every resume attempt fails
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 2;
+  options.bgerror_resume_retry_interval = 100000;  // 0.1 second
+  Status s;
+
+  listener->EnableAutoRecovery(false);
+  DestroyAndReopen(options);
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
+
+  ASSERT_OK(Put(Key(1), "val1"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeFinishBuildTable",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
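+  // Both resume attempts run against the still-inactive FS and fail, so the
+  // handler gives up and reports Status::Aborted() to OnErrorRecoveryEnd.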
+  ASSERT_EQ(listener->new_bg_error(), Status::Aborted());
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, FlushReadError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener =
+      std::make_shared<ErrorHandlerFSListener>();
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.statistics = CreateDBStatistics();
+  Status s;
+
+  listener->EnableAutoRecovery(false);
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put(Key(0), "val"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeOutputValidation", [&](void*) {
+        IOStatus st = IOStatus::IOError();
+        st.SetRetryable(true);
+        st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
+        fault_fs_->SetFilesystemActive(false, st);
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeDeleteFile",
+      [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
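+  // The number of resume attempts is timing-dependent, so only lower bounds
+  // are asserted for the auto-resume tickers.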
+  s = dbfull()->TEST_GetBGError();
+  ASSERT_OK(s);
+
+  Reopen(GetDefaultOptions());
+  ASSERT_EQ("val", Get(Key(0)));
+}
+
+TEST_F(DBErrorHandlingFSTest, AtomicFlushReadError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener =
+      std::make_shared<ErrorHandlerFSListener>();
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.statistics = CreateDBStatistics();
+  Status s;
+
+  listener->EnableAutoRecovery(false);
+  options.atomic_flush = true;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(0, Key(0), "val"));
+  ASSERT_OK(Put(1, Key(0), "val"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeOutputValidation", [&](void*) {
+        IOStatus st = IOStatus::IOError();
+        st.SetRetryable(true);
+        st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
+        fault_fs_->SetFilesystemActive(false, st);
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeDeleteFile",
+      [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush({0, 1});
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_COUNT));
+  ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+  s = dbfull()->TEST_GetBGError();
+  ASSERT_OK(s);
+
+  TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
+                              GetDefaultOptions());
+  ASSERT_EQ("val", Get(Key(0)));
+}
+
+TEST_F(DBErrorHandlingFSTest, AtomicFlushNoSpaceError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener =
+      std::make_shared<ErrorHandlerFSListener>();
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.statistics = CreateDBStatistics();
+  Status s;
+
+  listener->EnableAutoRecovery(true);
+  options.atomic_flush = true;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(0, Key(0), "val"));
+  ASSERT_OK(Put(1, Key(0), "val"));
+  SyncPoint::GetInstance()->SetCallBack("BuildTable:create_file", [&](void*) {
+    IOStatus st = IOStatus::NoSpace();
+    fault_fs_->SetFilesystemActive(false, st);
+  });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeDeleteFile",
+      [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush({0, 1});
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  ASSERT_EQ(listener->WaitForRecovery(5000000), true);
+  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_ERROR_COUNT));
+  ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+                   ERROR_HANDLER_BG_IO_ERROR_COUNT));
+  s = dbfull()->TEST_GetBGError();
+  ASSERT_OK(s);
+
+  TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
+                              GetDefaultOptions());
+  ASSERT_EQ("val", Get(Key(0)));
+}
+
+TEST_F(DBErrorHandlingFSTest, CompactionReadRetryableErrorAutoRecover) {
+  // In this test, the FS is set to return errors during the first round of
+  // compaction, so the first compaction fails with a retryable IO error that
+  // is mapped to a soft error. Compaction is then rescheduled; in the second
+  // round the FS is active again and the compaction succeeds, so the test
+  // hits the CompactionJob::FinishCompactionOutputFile1 sync point.
+  std::shared_ptr<ErrorHandlerFSListener> listener =
+      std::make_shared<ErrorHandlerFSListener>();
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.emplace_back(listener);
+  BlockBasedTableOptions table_options;
+  table_options.no_block_cache = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  Status s;
+  std::atomic<bool> fail_first(false);
+  std::atomic<bool> fail_second(true);
+  Random rnd(301);
+  DestroyAndReopen(options);
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
+
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(1024)));
+  }
+  s = Flush();
+  ASSERT_OK(s);
+
+  listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
+  listener->EnableAutoRecovery(false);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"},
+       {"CompactionJob::FinishCompactionOutputFile1",
+        "CompactionWriteRetryableErrorAutoRecover0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:Start",
+      [&](void*) { fault_fs_->SetFilesystemActive(true); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::Run():PausingManualCompaction:2", [&](void*) {
+        if (fail_first.load() && fail_second.load()) {
+          fault_fs_->SetFilesystemActive(false, error_msg);
+          fail_second.store(false);
+        }
+      });
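+  // fail_first flips once the first compaction is scheduled; fail_second
+  // ensures the error is injected exactly once, so the rescheduled
+  // compaction runs against a healthy FS.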
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put(Key(1), "val"));
+  s = Flush();
+  ASSERT_OK(s);
+
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(s);
+  TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+
+  Reopen(GetDefaultOptions());
+}
+
 class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
                                    public testing::WithParamInterface<bool> {};
 
@@ -2185,7 +2728,7 @@ TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
   old_manifest = GetManifestNameFromLiveFiles();
 
   ASSERT_OK(Put(Key(0), "val"));
-  Flush();
+  ASSERT_OK(Flush());
   ASSERT_OK(Put(Key(1), "val"));
   SyncPoint::GetInstance()->SetCallBack(
       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
@@ -2218,7 +2761,7 @@ TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
   ASSERT_OK(Put(Key(0), "va;"));
   ASSERT_OK(Put(Key(2), "va;"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   listener->EnableAutoRecovery(true);
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
@@ -2232,7 +2775,7 @@ TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
 
   ASSERT_OK(Put(Key(1), "val"));
   s = Flush();
-  ASSERT_EQ(s, Status::OK());
+  ASSERT_OK(s);
 
   s = dbfull()->TEST_WaitForCompact();
   ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
@@ -2268,7 +2811,7 @@ TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
 
     WriteOptions wopts;
     wopts.sync = true;
-    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+    ASSERT_OK(dbfull()->Write(wopts, &batch));
   };
 
   {