update ceph source to reef 18.1.2
diff --git a/ceph/src/rocksdb/db/db_sst_test.cc b/ceph/src/rocksdb/db/db_sst_test.cc
index 3ccb57baa5ba8042979362117cc90cbf1f223673..7f031444a4caa6a5579c20de8b2b1c042cd49dc1 100644
--- a/ceph/src/rocksdb/db/db_sst_test.cc
+++ b/ceph/src/rocksdb/db/db_sst_test.cc
@@ -8,17 +8,20 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/db_test_util.h"
+#include "env/mock_env.h"
 #include "file/sst_file_manager_impl.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
+#include "rocksdb/cache.h"
 #include "rocksdb/sst_file_manager.h"
+#include "rocksdb/table.h"
 #include "util/random.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 class DBSSTTest : public DBTestBase {
  public:
-  DBSSTTest() : DBTestBase("/db_sst_test", /*env_do_fsync=*/true) {}
+  DBSSTTest() : DBTestBase("db_sst_test", /*env_do_fsync=*/true) {}
 };
 
 #ifndef ROCKSDB_LITE
@@ -98,7 +101,7 @@ TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
   for (int i = 0; i < 10; ++i) {
     GenerateNewFile(&rnd, &key_id, false);
   }
-  Flush();
+  ASSERT_OK(Flush());
   Close();
   int const num_files = GetSstFileCount(dbname_);
   ASSERT_GT(num_files, 0);
@@ -170,7 +173,7 @@ TEST_F(DBSSTTest, DontDeleteMovedFile) {
     ASSERT_OK(Flush());
   }
   // this should execute both L0->L1 and L1->(move)->L2 compactions
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("0,0,1", FilesPerLevel(0));
 
   // If the moved file is actually deleted (the move-safeguard in
@@ -218,7 +221,7 @@ TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
     ASSERT_OK(Flush());
   }
   // this should execute both L0->L1 and L1->(move)->L2 compactions
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("0,0,1", FilesPerLevel(0));
 
   test::SleepingBackgroundTask blocking_thread;
@@ -264,9 +267,9 @@ TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
   // finish the flush!
   blocking_thread.WakeUp();
   blocking_thread.WaitUntilDone();
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   // File just flushed is too big for L0 and L1 so gets moved to L2.
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0));
 
   metadata.clear();
@@ -278,6 +281,58 @@ TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
   listener->VerifyMatchedCount(1);
 }
 
+// Test that producing an empty .sst file does not write it out to
+// disk, and that the DeleteFile() env method is not called for
+// removing the non-existing file later.
+TEST_F(DBSSTTest, DeleteFileNotCalledForNotCreatedSSTFile) {
+  Options options = CurrentOptions();
+  options.env = env_;
+
+  OnFileDeletionListener* listener = new OnFileDeletionListener();
+  options.listeners.emplace_back(listener);
+
+  Reopen(options);
+
+  // Flush the empty database.
+  ASSERT_OK(Flush());
+  ASSERT_EQ("", FilesPerLevel(0));
+
+  // We expect no .sst files.
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(metadata.size(), 0U);
+
+  // We expect no file deletions.
+  listener->VerifyMatchedCount(0);
+}
+
+// Test that producing a non-empty .sst file does write it out to
+// disk, and that the DeleteFile() env method is not called for removing
+// the file later.
+TEST_F(DBSSTTest, DeleteFileNotCalledForCreatedSSTFile) {
+  Options options = CurrentOptions();
+  options.env = env_;
+
+  OnFileDeletionListener* listener = new OnFileDeletionListener();
+  options.listeners.emplace_back(listener);
+
+  Reopen(options);
+
+  ASSERT_OK(Put("pika", "choo"));
+
+  // Flush the non-empty database.
+  ASSERT_OK(Flush());
+  ASSERT_EQ("1", FilesPerLevel(0));
+
+  // We expect 1 .sst file.
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(metadata.size(), 1U);
+
+  // We expect no file deletions.
+  listener->VerifyMatchedCount(0);
+}
+
 TEST_F(DBSSTTest, DBWithSstFileManager) {
   std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
   auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
@@ -302,17 +357,18 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
   for (int i = 0; i < 25; i++) {
     GenerateNewRandomFile(&rnd);
     ASSERT_OK(Flush());
-    dbfull()->TEST_WaitForFlushMemTable();
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
     // Verify that we are tracking all sst files in dbname_
     std::unordered_map<std::string, uint64_t> files_in_db;
-    ASSERT_OK(GetAllSSTFiles(&files_in_db));
+    ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
     ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
   }
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   std::unordered_map<std::string, uint64_t> files_in_db;
-  ASSERT_OK(GetAllSSTFiles(&files_in_db));
+  ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
   // Verify that we are tracking all sst files in dbname_
   ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
   // Verify the total files size
@@ -346,7 +402,272 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
-TEST_F(DBSSTTest, RateLimitedDelete) {
+TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+  int files_added = 0;
+  int files_deleted = 0;
+  int files_moved = 0;
+  int files_scheduled_to_delete = 0;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (file_path->find(".blob") != std::string::npos) {
+          files_added++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (file_path->find(".blob") != std::string::npos) {
+          files_deleted++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+        assert(arg);
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (file_path->find(".blob") != std::string::npos) {
+          ++files_scheduled_to_delete;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.enable_blob_files = true;
+  options.blob_file_size = 32;  // create one blob per file
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put("Key_" + std::to_string(i), "Value_" + std::to_string(i)));
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+    // Verify that we are tracking all sst and blob files in dbname_
+    std::unordered_map<std::string, uint64_t> files_in_db;
+    ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+    ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+    ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+  }
+
+  std::vector<uint64_t> blob_files = GetBlobFileNumbers();
+  ASSERT_EQ(files_added, blob_files.size());
+  // No blob file is obsoleted.
+  ASSERT_EQ(files_deleted, 0);
+  ASSERT_EQ(files_scheduled_to_delete, 0);
+  // No files were moved.
+  ASSERT_EQ(files_moved, 0);
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  std::unordered_map<std::string, uint64_t> files_in_db;
+  ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+  ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+
+  // Verify that we are tracking all sst and blob files in dbname_
+  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+  // Verify the total files size
+  uint64_t total_files_size = 0;
+  for (auto& file_to_size : files_in_db) {
+    total_files_size += file_to_size.second;
+  }
+  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+  Close();
+
+  Reopen(options);
+  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+
+  // Verify that we track all the files again after the DB is closed and opened.
+  Close();
+
+  sst_file_manager.reset(NewSstFileManager(env_));
+  options.sst_file_manager = sst_file_manager;
+  sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+  Reopen(options);
+
+  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+
+  // Destroying the DB will remove all the blob files from the SST file
+  // manager, and blob file deletion will go through ScheduleFileDeletion.
+  ASSERT_EQ(files_deleted, 0);
+  ASSERT_EQ(files_scheduled_to_delete, 0);
+  Close();
+  ASSERT_OK(DestroyDB(dbname_, options));
+  ASSERT_EQ(files_deleted, blob_files.size());
+  ASSERT_EQ(files_scheduled_to_delete, blob_files.size());
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.enable_blob_files = true;
+  options.blob_file_size = 32;  // create one blob per file
+  options.disable_auto_compactions = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  int files_added = 0;
+  int files_deleted = 0;
+  int files_moved = 0;
+  int files_scheduled_to_delete = 0;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (file_path->find(".blob") != std::string::npos) {
+          files_added++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (file_path->find(".blob") != std::string::npos) {
+          files_deleted++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+        assert(arg);
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (file_path->find(".blob") != std::string::npos) {
+          ++files_scheduled_to_delete;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+
+  ASSERT_OK(Put(first_key, first_value));
+  ASSERT_OK(Put(second_key, second_value));
+  ASSERT_OK(Flush());
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "third_value";
+  constexpr char fourth_key[] = "fourth_key";
+  constexpr char fourth_value[] = "fourth_value";
+  constexpr char fifth_key[] = "fifth_key";
+  constexpr char fifth_value[] = "fifth_value";
+
+  ASSERT_OK(Put(third_key, third_value));
+  ASSERT_OK(Put(fourth_key, fourth_value));
+  ASSERT_OK(Put(fifth_key, fifth_value));
+  ASSERT_OK(Flush());
+
+  const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
+
+  ASSERT_EQ(original_blob_files.size(), 5);
+  ASSERT_EQ(files_added, 5);
+  ASSERT_EQ(files_deleted, 0);
+  ASSERT_EQ(files_scheduled_to_delete, 0);
+  ASSERT_EQ(files_moved, 0);
+  {
+    // Verify that we are tracking all sst and blob files in dbname_
+    std::unordered_map<std::string, uint64_t> files_in_db;
+    ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+    ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+    ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+  }
+
+  const size_t cutoff_index = static_cast<size_t>(
+      options.blob_garbage_collection_age_cutoff * original_blob_files.size());
+
+  size_t expected_number_of_files = original_blob_files.size();
+  // Note: turning off enable_blob_files before the compaction results in
+  // garbage collected values getting inlined.
+  ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
+  expected_number_of_files -= cutoff_index;
+  files_added = 0;
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  sfm->WaitForEmptyTrash();
+
+  ASSERT_EQ(Get(first_key), first_value);
+  ASSERT_EQ(Get(second_key), second_value);
+  ASSERT_EQ(Get(third_key), third_value);
+  ASSERT_EQ(Get(fourth_key), fourth_value);
+  ASSERT_EQ(Get(fifth_key), fifth_value);
+
+  const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
+
+  ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
+  // No new file is added.
+  ASSERT_EQ(files_added, 0);
+  ASSERT_EQ(files_deleted, cutoff_index);
+  ASSERT_EQ(files_scheduled_to_delete, cutoff_index);
+  ASSERT_EQ(files_moved, 0);
+
+  // Original blob files below the cutoff should be gone; original blob files
+  // at or above the cutoff should still be there.
+  for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
+    ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
+  }
+
+  {
+    // Verify that we are tracking all sst and blob files in dbname_
+    std::unordered_map<std::string, uint64_t> files_in_db;
+    ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+    ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+    ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+  }
+
+  Close();
+  ASSERT_OK(DestroyDB(dbname_, options));
+  sfm->WaitForEmptyTrash();
+  ASSERT_EQ(files_deleted, 5);
+  ASSERT_EQ(files_scheduled_to_delete, 5);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+class DBSSTTestRateLimit : public DBSSTTest,
+                           public ::testing::WithParamInterface<bool> {
+ public:
+  DBSSTTestRateLimit() : DBSSTTest() {}
+  ~DBSSTTestRateLimit() override {}
+};
+
+TEST_P(DBSSTTestRateLimit, RateLimitedDelete) {
   Destroy(last_options_);
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
       {"DBSSTTest::RateLimitedDelete:1",
@@ -374,13 +695,25 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
         *abs_time_us = Env::Default()->NowMicros();
       });
 
+  // Disable PeriodicTaskScheduler as it also has TimedWait, which could update
+  // the simulated sleep time
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", [&](void* arg) {
+        bool* disable_scheduler = static_cast<bool*>(arg);
+        *disable_scheduler = true;
+      });
+
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
+  bool different_wal_dir = GetParam();
   Options options = CurrentOptions();
   SetTimeElapseOnlySleepOnReopen(&options);
   options.disable_auto_compactions = true;
   options.env = env_;
   options.statistics = CreateDBStatistics();
+  if (different_wal_dir) {
+    options.wal_dir = alternative_wal_dir_;
+  }
 
   int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
   Status s;
@@ -392,8 +725,10 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
   sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
 
   WriteOptions wo;
-  wo.disableWAL = true;
-  ASSERT_OK(TryReopen(options));
+  if (!different_wal_dir) {
+    wo.disableWAL = true;
+  }
+  Reopen(options);
   // Create 4 files in L0
   for (char v = 'a'; v <= 'd'; v++) {
     ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
@@ -437,6 +772,9 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+INSTANTIATE_TEST_CASE_P(RateLimitedDelete, DBSSTTestRateLimit,
+                        ::testing::Bool());
+
 TEST_F(DBSSTTest, RateLimitedWALDelete) {
   Destroy(last_options_);
 
@@ -489,10 +827,11 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
 }
 
 class DBWALTestWithParam
-    : public DBSSTTest,
+    : public DBTestBase,
       public testing::WithParamInterface<std::tuple<std::string, bool>> {
  public:
-  DBWALTestWithParam() {
+  explicit DBWALTestWithParam()
+      : DBTestBase("db_wal_test_with_params", /*env_do_fsync=*/true) {
     wal_dir_ = std::get<0>(GetParam());
     wal_dir_same_as_dbname_ = std::get<1>(GetParam());
   }
@@ -505,8 +844,8 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
   class MyEnv : public EnvWrapper {
    public:
     MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
-
-    Status DeleteFile(const std::string& fname) {
+    const char* Name() const override { return "MyEnv"; }
+    Status DeleteFile(const std::string& fname) override {
       if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
         return Status::OK();
       }
@@ -540,10 +879,17 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
   auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
   sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
 
-  ASSERT_OK(TryReopen(options));
+  Reopen(options);
 
   // Create 4 files in L0
   for (char v = 'a'; v <= 'd'; v++) {
+    if (v == 'c') {
+      // Maximize the chance that the last log file will be preserved in trash
+      // before restarting the DB.
+      // We have to set this on the 2nd to last file for it to delay deletion
+      // on the last file. (Quirk of DeleteScheduler::BackgroundEmptyTrash())
+      options.sst_file_manager->SetDeleteRateBytesPerSecond(1);
+    }
     ASSERT_OK(Put("Key2", DummyString(1024, v)));
     ASSERT_OK(Put("Key3", DummyString(1024, v)));
     ASSERT_OK(Put("Key4", DummyString(1024, v)));
@@ -562,11 +908,11 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
   if (!wal_dir_same_as_dbname_) {
     // Forcibly create some trash log files
     std::unique_ptr<WritableFile> result;
-    env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
-                         EnvOptions());
+    ASSERT_OK(env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
+                                   EnvOptions()));
     result.reset();
   }
-  env->GetChildren(options.wal_dir, &filenames);
+  ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
   for (const std::string& fname : filenames) {
     if (fname.find(".log.trash") != std::string::npos) {
       trash_log_count++;
@@ -575,11 +921,11 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
   ASSERT_GE(trash_log_count, 1);
 
   env->set_fake_log_delete(false);
-  ASSERT_OK(TryReopen(options));
+  Reopen(options);
 
   filenames.clear();
   trash_log_count = 0;
-  env->GetChildren(options.wal_dir, &filenames);
+  ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
   for (const std::string& fname : filenames) {
     if (fname.find(".log.trash") != std::string::npos) {
       trash_log_count++;
@@ -603,20 +949,19 @@ TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
   Destroy(last_options_);
 
   // Add some trash files to the db directory so the DB can clean them up
-  env_->CreateDirIfMissing(dbname_);
+  ASSERT_OK(env_->CreateDirIfMissing(dbname_));
   ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash"));
   ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash"));
   ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash"));
 
   // Reopen the DB and verify that it deletes existing trash files
-  ASSERT_OK(TryReopen(options));
+  Reopen(options);
   sfm->WaitForEmptyTrash();
   ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
   ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
   ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
 }
 
-
 // Create a DB with 2 db_paths, and generate multiple files in the 2
 // db_paths using CompactRangeOptions, make sure that files that were
 // deleted from first db_path were deleted using DeleteScheduler and
@@ -654,7 +999,7 @@ TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
 
   // Create 4 files in L0
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo));
+    ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A'), wo));
     ASSERT_OK(Flush());
   }
   // We created 4 sst files in L0
@@ -670,7 +1015,7 @@ TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
 
   // Create 4 files in L0
   for (int i = 4; i < 8; i++) {
-    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo));
+    ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'B'), wo));
     ASSERT_OK(Flush());
   }
   ASSERT_EQ("4,1", FilesPerLevel(0));
@@ -716,7 +1061,7 @@ TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
 
   // Create 4 files in L0
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
+    ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A')));
     ASSERT_OK(Flush());
   }
   // We created 4 sst files in L0
@@ -728,7 +1073,7 @@ TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
   int num_sst_files = 0;
   int num_wal_files = 0;
   std::vector<std::string> db_files;
-  env_->GetChildren(dbname_, &db_files);
+  ASSERT_OK(env_->GetChildren(dbname_, &db_files));
   for (std::string f : db_files) {
     if (f.substr(f.find_last_of(".") + 1) == "sst") {
       num_sst_files++;
@@ -742,7 +1087,9 @@ TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
   auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
 
   sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
-  sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
+  // Set an extra high trash ratio to prevent immediate/non-rate limited
+  // deletions
+  sfm->delete_scheduler()->SetMaxTrashDBRatio(1000.0);
   ASSERT_OK(DestroyDB(dbname_, options));
   sfm->WaitForEmptyTrash();
   ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
@@ -767,7 +1114,7 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
 
   uint64_t first_file_size = 0;
   std::unordered_map<std::string, uint64_t> files_in_db;
-  ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size));
+  ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &first_file_size));
   ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
 
   // Set the maximum allowed space usage to the current total size
@@ -778,6 +1125,68 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
   ASSERT_NOK(Flush());
 }
 
+TEST_F(DBSSTTest, DBWithMaxSpaceAllowedWithBlobFiles) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+
+  // Generate a file containing keys.
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
+  }
+  ASSERT_OK(Flush());
+
+  uint64_t files_size = 0;
+  uint64_t total_files_size = 0;
+  std::unordered_map<std::string, uint64_t> files_in_db;
+
+  ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db, &files_size));
+  // Make sure blob files are considered by SstFileManager in size limits.
+  ASSERT_GT(files_size, 0);
+  total_files_size = files_size;
+  ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &files_size));
+  total_files_size += files_size;
+  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+
+  // Set the maximum allowed space usage to the current total size.
+  sfm->SetMaxAllowedSpaceUsage(total_files_size + 1);
+
+  bool max_allowed_space_reached = false;
+  bool delete_blob_file = false;
+  // Sync point called after blob file is closed and max allowed space is
+  // checked.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached",
+      [&](void* /*arg*/) { max_allowed_space_reached = true; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable::AfterDeleteFile",
+      [&](void* /*arg*/) { delete_blob_file = true; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      {
+          "BuildTable::AfterDeleteFile",
+          "DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1",
+      },
+  });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put("key1", "val1"));
+  // This flush will fail
+  ASSERT_NOK(Flush());
+  ASSERT_TRUE(max_allowed_space_reached);
+
+  TEST_SYNC_POINT("DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1");
+  ASSERT_TRUE(delete_blob_file);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBSSTTest, CancellingCompactionsWorks) {
   std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
   auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
@@ -808,7 +1217,7 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
   ASSERT_OK(Flush());
   uint64_t total_file_size = 0;
   std::unordered_map<std::string, uint64_t> files_in_db;
-  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
+  ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
   // Set the maximum allowed space usage to the current total size
   sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
 
@@ -817,14 +1226,16 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
     ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
   }
   ASSERT_OK(Flush());
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   // Because we set a callback in CancelledCompaction, we actually
   // let the compaction run
   ASSERT_GT(completed_compactions, 0);
   ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
   // Make sure the stat is bumped
-  ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0);
+  ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
+                COMPACTION_CANCELLED),
+            0);
   ASSERT_EQ(0,
             dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                 FILES_MARKED_TRASH));
@@ -856,7 +1267,7 @@ TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
   ASSERT_OK(Flush());
   uint64_t total_file_size = 0;
   std::unordered_map<std::string, uint64_t> files_in_db;
-  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
+  ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
   // Set the maximum allowed space usage to the current total size
   sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
 
@@ -867,10 +1278,12 @@ TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
   ASSERT_OK(Flush());
 
   // OK, now trigger a manual compaction
-  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsCompactionTooLarge());
 
   // Wait for manual compaction to get scheduled and finish
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
   // Make sure the stat is bumped
@@ -880,10 +1293,13 @@ TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
 
   // Now make sure CompactFiles also gets cancelled
   auto l0_files = collector->GetFlushedFiles();
-  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
+  ASSERT_TRUE(
+      dbfull()
+          ->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0)
+          .IsCompactionTooLarge());
 
   // Wait for manual compaction to get scheduled and finish
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                 COMPACTION_CANCELLED),
@@ -898,8 +1314,9 @@ TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
       "CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; });
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
-  dbfull()->TEST_WaitForCompact(true);
+  ASSERT_OK(dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
+                                   l0_files, 0));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 
   ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
   ASSERT_GT(completed_compactions, 0);
@@ -967,7 +1384,7 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
     ASSERT_TRUE(bg_error_set);
     uint64_t total_sst_files_size = 0;
     std::unordered_map<std::string, uint64_t> files_in_db;
-    ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size));
+    ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_sst_files_size));
     ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
   }
@@ -1003,7 +1420,7 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
     CompactRangeOptions compact_options;
     compact_options.change_level = true;
     compact_options.target_level = 2;
-    db_->CompactRange(compact_options, nullptr, nullptr);
+    ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
 
     // Create 12 Files in L0
     for (int i = 0; i < 12; i++) {
@@ -1032,6 +1449,78 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
   }
 }
 
+TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
+  for (CacheEntryRoleOptions::Decision charge_table_reader :
+       {CacheEntryRoleOptions::Decision::kEnabled,
+        CacheEntryRoleOptions::Decision::kDisabled}) {
+    // Open DB with infinite max open files
+    //  - First iteration use 1 thread to open files
+    //  - Second iteration use 5 threads to open files
+    for (int iter = 0; iter < 2; iter++) {
+      Options options;
+      options.create_if_missing = true;
+      options.write_buffer_size = 100000;
+      options.disable_auto_compactions = true;
+      options.max_open_files = -1;
+
+      BlockBasedTableOptions table_options;
+      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+      if (iter == 0) {
+        options.max_file_opening_threads = 1;
+      } else {
+        options.max_file_opening_threads = 5;
+      }
+
+      DestroyAndReopen(options);
+
+      // Create 5 Files in L0 (then move them to L2)
+      for (int i = 0; i < 5; i++) {
+        std::string k = "L2_" + Key(i);
+        ASSERT_OK(Put(k, k + std::string(1000, 'a')));
+        ASSERT_OK(Flush()) << i;
+      }
+      CompactRangeOptions compact_options;
+      compact_options.change_level = true;
+      compact_options.target_level = 2;
+      ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
+
+      // Create 5 Files in L0
+      for (int i = 0; i < 5; i++) {
+        std::string k = "L0_" + Key(i);
+        ASSERT_OK(Put(k, k + std::string(1000, 'a')));
+        ASSERT_OK(Flush());
+      }
+      Close();
+
+      table_options.cache_usage_options.options_overrides.insert(
+          {CacheEntryRole::kBlockBasedTableReader,
+           {/*.charged = */ charge_table_reader}});
+      table_options.block_cache =
+          NewLRUCache(1024 /* capacity */, 0 /* num_shard_bits */,
+                      true /* strict_capacity_limit */);
+      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+      // Reopening the DB will try to load all existing files, conditionally
+      // subject to memory limit
+      Status s = TryReopen(options);
+
+      if (charge_table_reader == CacheEntryRoleOptions::Decision::kEnabled) {
+        EXPECT_TRUE(s.IsMemoryLimit());
+        EXPECT_TRUE(s.ToString().find(
+                        kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
+                            CacheEntryRole::kBlockBasedTableReader)]) !=
+                    std::string::npos);
+        EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
+                    std::string::npos);
+
+      } else {
+        EXPECT_TRUE(s.ok());
+        ASSERT_EQ("5,0,5", FilesPerLevel(0));
+      }
+    }
+  }
+}
+
 TEST_F(DBSSTTest, GetTotalSstFilesSize) {
   // We don't propagate oldest-key-time table property on compaction and
   // just write 0 as default value. This affect the exact table size, since
@@ -1052,10 +1541,10 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
   // Generate 5 files in L0
   for (int i = 0; i < 5; i++) {
     for (int j = 0; j < 10; j++) {
-      std::string val = "val_file_" + ToString(i);
+      std::string val = "val_file_" + std::to_string(i);
       ASSERT_OK(Put(Key(j), val));
     }
-    Flush();
+    ASSERT_OK(Flush());
   }
   ASSERT_EQ("5", FilesPerLevel(0));
 
@@ -1079,6 +1568,7 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
 
   // hold current version
   std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
+  ASSERT_OK(iter1->status());
 
   // Compact 5 files into 1 file in L0
   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
@@ -1102,12 +1592,13 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
 
   // hold current version
   std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
+  ASSERT_OK(iter2->status());
 
   // Delete all keys and compact, this will delete all live files
   for (int i = 0; i < 10; i++) {
     ASSERT_OK(Delete(Key(i)));
   }
-  Flush();
+  ASSERT_OK(Flush());
   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ("", FilesPerLevel(0));
 
@@ -1121,6 +1612,7 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
   // Total SST files = 6 (5 original files + compacted file)
   ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
 
+  ASSERT_OK(iter1->status());
   iter1.reset();
   ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                        &total_sst_files_size));
@@ -1128,6 +1620,7 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
   // Total SST files = 1 (compacted file)
   ASSERT_EQ(total_sst_files_size, 1 * single_file_size);
 
+  ASSERT_OK(iter2->status());
   iter2.reset();
   ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                        &total_sst_files_size));
@@ -1138,6 +1631,45 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_F(DBSSTTest, OpenDBWithoutGetFileSizeInvocations) {
+  Options options = CurrentOptions();
+  std::unique_ptr<MockEnv> env{MockEnv::Create(Env::Default())};
+  options.env = env.get();
+  options.disable_auto_compactions = true;
+  options.compression = kNoCompression;
+  options.enable_blob_files = true;
+  options.blob_file_size = 32;  // create one blob per file
+  options.skip_checking_sst_file_sizes_on_db_open = true;
+
+  DestroyAndReopen(options);
+  // Generate 5 files in L0
+  for (int i = 0; i < 5; i++) {
+    for (int j = 0; j < 10; j++) {
+      std::string val = "val_file_" + std::to_string(i);
+      ASSERT_OK(Put(Key(j), val));
+    }
+    ASSERT_OK(Flush());
+  }
+  Close();
+
+  bool is_get_file_size_called = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "MockFileSystem::GetFileSize:CheckFileType", [&](void* arg) {
+        std::string* filename = reinterpret_cast<std::string*>(arg);
+        if (filename->find(".blob") != std::string::npos) {
+          is_get_file_size_called = true;
+        }
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  Reopen(options);
+  ASSERT_FALSE(is_get_file_size_called);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  Destroy(options);
+}
+
 TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
@@ -1146,7 +1678,7 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
   // Generate 5 files in L0
   for (int i = 0; i < 5; i++) {
     ASSERT_OK(Put(Key(i), "val"));
-    Flush();
+    ASSERT_OK(Flush());
   }
   ASSERT_EQ("5", FilesPerLevel(0));
 
@@ -1171,6 +1703,7 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
 
   // hold current version
   std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
+  ASSERT_OK(iter1->status());
 
   // Compaction will do trivial move from L0 to L1
   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
@@ -1194,12 +1727,13 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
 
   // hold current version
   std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
+  ASSERT_OK(iter2->status());
 
   // Delete all keys and compact, this will delete all live files
   for (int i = 0; i < 5; i++) {
     ASSERT_OK(Delete(Key(i)));
   }
-  Flush();
+  ASSERT_OK(Flush());
   ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ("", FilesPerLevel(0));
 
@@ -1213,7 +1747,9 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
   // Total SST files = 5 (used in 2 version)
   ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
 
+  ASSERT_OK(iter1->status());
   iter1.reset();
+  ASSERT_OK(iter2->status());
   iter2.reset();
 
   ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
@@ -1223,6 +1759,103 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
   ASSERT_EQ(total_sst_files_size, 0);
 }
 
+// This tests whether blob files are recorded by the SST File Manager when the
+// compaction job creates/deletes them, and in the case of atomic flush.
+TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.disable_auto_compactions = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+  options.atomic_flush = true;
+
+  int files_added = 0;
+  int files_deleted = 0;
+  int files_scheduled_to_delete = 0;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (EndsWith(*file_path, ".blob")) {
+          files_added++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (EndsWith(*file_path, ".blob")) {
+          files_deleted++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+        assert(arg);
+        const std::string* const file_path =
+            static_cast<const std::string*>(arg);
+        if (EndsWith(*file_path, ".blob")) {
+          ++files_scheduled_to_delete;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  ASSERT_OK(Put("key_1", "value_1"));
+  ASSERT_OK(Put("key_2", "value_2"));
+  ASSERT_OK(Put("key_3", "value_3"));
+  ASSERT_OK(Put("key_4", "value_4"));
+  ASSERT_OK(Flush());
+
+  // Overwrite will create the garbage data.
+  ASSERT_OK(Put("key_3", "new_value_3"));
+  ASSERT_OK(Put("key_4", "new_value_4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(files_added, 3);
+  ASSERT_EQ(files_deleted, 0);
+  ASSERT_EQ(files_scheduled_to_delete, 0);
+  files_added = 0;
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+  // Compaction job will create a new file and delete the older files.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_EQ(files_added, 1);
+  ASSERT_EQ(files_scheduled_to_delete, 1);
+
+  sfm->WaitForEmptyTrash();
+
+  ASSERT_EQ(files_deleted, 1);
+
+  Close();
+  ASSERT_OK(DestroyDB(dbname_, options));
+
+  ASSERT_EQ(files_scheduled_to_delete, 4);
+
+  sfm->WaitForEmptyTrash();
+
+  ASSERT_EQ(files_deleted, 4);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 #endif  // ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE
@@ -1230,5 +1863,6 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
 int main(int argc, char** argv) {
   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
+  RegisterCustomObjects(argc, argv);
   return RUN_ALL_TESTS();
 }