]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/listener_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / listener_test.cc
index 1712a5be89b6a3f09507cc7c88f796255dd6657c..160866bb774f348b1653d6a6841567a69cdb8220 100644 (file)
@@ -10,7 +10,6 @@
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"
 #include "file/filename.h"
-#include "memtable/hash_linklist_rep.h"
 #include "monitoring/statistics.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/compaction_filter.h"
@@ -38,7 +37,7 @@ namespace ROCKSDB_NAMESPACE {
 
 class EventListenerTest : public DBTestBase {
  public:
-  EventListenerTest() : DBTestBase("/listener_test", /*env_do_fsync=*/true) {}
+  EventListenerTest() : DBTestBase("listener_test", /*env_do_fsync=*/true) {}
 
   static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
                              uint64_t size) {
@@ -90,7 +89,7 @@ class TestCompactionListener : public EventListener {
  public:
   explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
 
-  void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
+  void OnCompactionCompleted(DBdb, const CompactionJobInfo& ci) override {
     std::lock_guard<std::mutex> lock(mutex_);
     compacted_dbs_.push_back(db);
     ASSERT_GT(ci.input_files.size(), 0U);
@@ -173,9 +172,9 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
 
   TestCompactionListener* listener = new TestCompactionListener(this);
   options.listeners.emplace_back(listener);
-  std::vector<std::string> cf_names = {
-      "pikachu", "ilya", "muromec", "dobrynia",
-      "nikitich", "alyosha", "popovich"};
+  std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                       "dobrynia", "nikitich", "alyosha",
+                                       "popovich"};
   CreateAndReopenWithCF(cf_names, options);
   ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
 
@@ -192,10 +191,10 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
   ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
   for (int i = 1; i < 8; ++i) {
     ASSERT_OK(Flush(i));
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
                                      nullptr, nullptr));
-    dbfull()->TEST_WaitForCompact();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
   }
 
   ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
@@ -211,8 +210,11 @@ class TestFlushListener : public EventListener {
       : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
     db_closed = false;
   }
-  void OnTableFileCreated(
-      const TableFileCreationInfo& info) override {
+
+  virtual ~TestFlushListener() {
+    prev_fc_info_.status.PermitUncheckedError();  // Ignore the status
+  }
+  void OnTableFileCreated(const TableFileCreationInfo& info) override {
     // remember the info for later checking the FlushJobInfo.
     prev_fc_info_ = info;
     ASSERT_GT(info.db_name.size(), 0U);
@@ -247,8 +249,7 @@ class TestFlushListener : public EventListener {
 #endif  // ROCKSDB_USING_THREAD_STATUS
   }
 
-  void OnFlushCompleted(
-      DB* db, const FlushJobInfo& info) override {
+  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
     flushed_dbs_.push_back(db);
     flushed_column_family_names_.push_back(info.cf_name);
     if (info.triggered_writes_slowdown) {
@@ -271,6 +272,9 @@ class TestFlushListener : public EventListener {
     ASSERT_TRUE(test_);
     if (db == test_->db_) {
       std::vector<std::vector<FileMetaData>> files_by_level;
+      ASSERT_LT(info.cf_id, test_->handles_.size());
+      ASSERT_GE(info.cf_id, 0u);
+      ASSERT_NE(test_->handles_[info.cf_id], nullptr);
       test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
                                              &files_by_level);
 
@@ -311,9 +315,9 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
 #endif  // ROCKSDB_USING_THREAD_STATUS
   TestFlushListener* listener = new TestFlushListener(options.env, this);
   options.listeners.emplace_back(listener);
-  std::vector<std::string> cf_names = {
-      "pikachu", "ilya", "muromec", "dobrynia",
-      "nikitich", "alyosha", "popovich"};
+  std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                       "dobrynia", "nikitich", "alyosha",
+                                       "popovich"};
   options.table_properties_collector_factories.push_back(
       std::make_shared<TestPropertiesCollectorFactory>());
   CreateAndReopenWithCF(cf_names, options);
@@ -333,7 +337,10 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
   ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
   for (int i = 1; i < 8; ++i) {
     ASSERT_OK(Flush(i));
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
     ASSERT_EQ(listener->flushed_dbs_.size(), i);
     ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
   }
@@ -352,32 +359,48 @@ TEST_F(EventListenerTest, MultiCF) {
 #ifdef ROCKSDB_USING_THREAD_STATUS
   options.enable_thread_tracking = true;
 #endif  // ROCKSDB_USING_THREAD_STATUS
-  TestFlushListener* listener = new TestFlushListener(options.env, this);
-  options.listeners.emplace_back(listener);
-  options.table_properties_collector_factories.push_back(
-      std::make_shared<TestPropertiesCollectorFactory>());
-  std::vector<std::string> cf_names = {
-      "pikachu", "ilya", "muromec", "dobrynia",
-      "nikitich", "alyosha", "popovich"};
-  CreateAndReopenWithCF(cf_names, options);
-
-  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
-  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
-  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
-  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
-  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
-  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
-  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
-  for (int i = 1; i < 8; ++i) {
-    ASSERT_OK(Flush(i));
-    ASSERT_EQ(listener->flushed_dbs_.size(), i);
-    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
-  }
+  for (auto atomic_flush : {false, true}) {
+    options.atomic_flush = atomic_flush;
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+    TestFlushListener* listener = new TestFlushListener(options.env, this);
+    options.listeners.emplace_back(listener);
+    options.table_properties_collector_factories.push_back(
+        std::make_shared<TestPropertiesCollectorFactory>());
+    std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                         "dobrynia", "nikitich", "alyosha",
+                                         "popovich"};
+    CreateAndReopenWithCF(cf_names, options);
+
+    ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+    ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+    ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+    ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+    ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+    ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+    ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+    for (int i = 1; i < 8; ++i) {
+      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
+            "EventListenerTest.MultiCF:PreVerifyListener"}});
+      ASSERT_OK(Flush(i));
+      TEST_SYNC_POINT("EventListenerTest.MultiCF:PreVerifyListener");
+      ASSERT_EQ(listener->flushed_dbs_.size(), i);
+      ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+      // make sure callback functions are called in the right order
+      if (i == 7) {
+        for (size_t j = 0; j < cf_names.size(); j++) {
+          ASSERT_EQ(listener->flushed_dbs_[j], db_);
+          ASSERT_EQ(listener->flushed_column_family_names_[j], cf_names[j]);
+        }
+      }
+    }
 
-  // make sure callback functions are called in the right order
-  for (size_t i = 0; i < cf_names.size(); i++) {
-    ASSERT_EQ(listener->flushed_dbs_[i], db_);
-    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+    Close();
   }
 }
 
@@ -396,9 +419,9 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
     listeners.emplace_back(new TestFlushListener(options.env, this));
   }
 
-  std::vector<std::string> cf_names = {
-      "pikachu", "ilya", "muromec", "dobrynia",
-      "nikitich", "alyosha", "popovich"};
+  std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                       "dobrynia", "nikitich", "alyosha",
+                                       "popovich"};
 
   options.create_if_missing = true;
   for (int i = 0; i < kNumListeners; ++i) {
@@ -408,16 +431,16 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
   ColumnFamilyOptions cf_opts(options);
 
   std::vector<DB*> dbs;
-  std::vector<std::vector<ColumnFamilyHandle *>> vec_handles;
+  std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;
 
   for (int d = 0; d < kNumDBs; ++d) {
-    ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
+    ASSERT_OK(DestroyDB(dbname_ + std::to_string(d), options));
     DB* db;
     std::vector<ColumnFamilyHandle*> handles;
-    ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
+    ASSERT_OK(DB::Open(options, dbname_ + std::to_string(d), &db));
     for (size_t c = 0; c < cf_names.size(); ++c) {
       ColumnFamilyHandle* handle;
-      db->CreateColumnFamily(cf_opts, cf_names[c], &handle);
+      ASSERT_OK(db->CreateColumnFamily(cf_opts, cf_names[c], &handle));
       handles.push_back(handle);
     }
 
@@ -427,18 +450,26 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
 
   for (int d = 0; d < kNumDBs; ++d) {
     for (size_t c = 0; c < cf_names.size(); ++c) {
-      ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
-                cf_names[c], cf_names[c]));
+      ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], cf_names[c],
+                            cf_names[c]));
     }
   }
 
   for (size_t c = 0; c < cf_names.size(); ++c) {
     for (int d = 0; d < kNumDBs; ++d) {
       ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
-      static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable();
+      ASSERT_OK(
+          static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable());
     }
   }
 
+  for (int d = 0; d < kNumDBs; ++d) {
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(
+        static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForBackgroundWork());
+  }
+
   for (auto* listener : listeners) {
     int pos = 0;
     for (size_t c = 0; c < cf_names.size(); ++c) {
@@ -450,7 +481,6 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
     }
   }
 
-
   for (auto handles : vec_handles) {
     for (auto h : handles) {
       delete h;
@@ -494,12 +524,16 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
   // keep writing until writes are forced to stop.
   for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
        ++i) {
-    Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
+    ASSERT_OK(
+        Put(1, std::to_string(i), std::string(10000, 'x'), WriteOptions()));
     FlushOptions fo;
     fo.allow_write_stall = true;
-    db_->Flush(fo, handles_[1]);
+    ASSERT_OK(db_->Flush(fo, handles_[1]));
     db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
   }
+  // Ensure background work is fully finished including listener callbacks
+  // before accessing listener state.
+  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
   ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
 }
 
@@ -518,8 +552,8 @@ TEST_F(EventListenerTest, CompactionReasonLevel) {
   Options options;
   options.env = CurrentOptions().env;
   options.create_if_missing = true;
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
 
   TestCompactionReasonListener* listener = new TestCompactionReasonListener();
   options.listeners.emplace_back(listener);
@@ -534,7 +568,7 @@ TEST_F(EventListenerTest, CompactionReasonLevel) {
   for (int i = 0; i < 4; i++) {
     GenerateNewRandomFile(&rnd);
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_EQ(listener->compaction_reasons_.size(), 1);
   ASSERT_EQ(listener->compaction_reasons_[0],
@@ -551,14 +585,14 @@ TEST_F(EventListenerTest, CompactionReasonLevel) {
   }
 
   // Do a trivial move from L0 -> L1
-  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
   options.max_bytes_for_level_base = 1;
   Close();
   listener->compaction_reasons_.clear();
   Reopen(options);
 
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_GT(listener->compaction_reasons_.size(), 1);
 
   for (auto compaction_reason : listener->compaction_reasons_) {
@@ -570,7 +604,7 @@ TEST_F(EventListenerTest, CompactionReasonLevel) {
   listener->compaction_reasons_.clear();
   Reopen(options);
 
-  Put("key", "value");
+  ASSERT_OK(Put("key", "value"));
   CompactRangeOptions cro;
   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
@@ -584,8 +618,8 @@ TEST_F(EventListenerTest, CompactionReasonUniversal) {
   Options options;
   options.env = CurrentOptions().env;
   options.create_if_missing = true;
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
 
   TestCompactionReasonListener* listener = new TestCompactionReasonListener();
   options.listeners.emplace_back(listener);
@@ -604,7 +638,7 @@ TEST_F(EventListenerTest, CompactionReasonUniversal) {
   for (int i = 0; i < 8; i++) {
     GenerateNewRandomFile(&rnd);
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_GT(listener->compaction_reasons_.size(), 0);
   for (auto compaction_reason : listener->compaction_reasons_) {
@@ -622,7 +656,7 @@ TEST_F(EventListenerTest, CompactionReasonUniversal) {
   for (int i = 0; i < 8; i++) {
     GenerateNewRandomFile(&rnd);
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_GT(listener->compaction_reasons_.size(), 0);
   for (auto compaction_reason : listener->compaction_reasons_) {
@@ -634,7 +668,7 @@ TEST_F(EventListenerTest, CompactionReasonUniversal) {
   listener->compaction_reasons_.clear();
   Reopen(options);
 
-  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
   ASSERT_GT(listener->compaction_reasons_.size(), 0);
   for (auto compaction_reason : listener->compaction_reasons_) {
@@ -646,8 +680,8 @@ TEST_F(EventListenerTest, CompactionReasonFIFO) {
   Options options;
   options.env = CurrentOptions().env;
   options.create_if_missing = true;
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
 
   TestCompactionReasonListener* listener = new TestCompactionReasonListener();
   options.listeners.emplace_back(listener);
@@ -663,7 +697,7 @@ TEST_F(EventListenerTest, CompactionReasonFIFO) {
   for (int i = 0; i < 4; i++) {
     GenerateNewRandomFile(&rnd);
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
 
   ASSERT_GT(listener->compaction_reasons_.size(), 0);
   for (auto compaction_reason : listener->compaction_reasons_) {
@@ -676,6 +710,8 @@ class TableFileCreationListener : public EventListener {
   class TestEnv : public EnvWrapper {
    public:
     explicit TestEnv(Env* t) : EnvWrapper(t) {}
+    static const char* kClassName() { return "TestEnv"; }
+    const char* Name() const override { return kClassName(); }
 
     void SetStatus(Status s) { status_ = s; }
 
@@ -753,14 +789,17 @@ class TableFileCreationListener : public EventListener {
     ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
     ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
     if (info.status.ok()) {
-      ASSERT_GT(info.table_properties.data_size, 0U);
-      ASSERT_GT(info.table_properties.raw_key_size, 0U);
-      ASSERT_GT(info.table_properties.raw_value_size, 0U);
-      ASSERT_GT(info.table_properties.num_data_blocks, 0U);
-      ASSERT_GT(info.table_properties.num_entries, 0U);
+      if (info.table_properties.num_range_deletions == 0U) {
+        ASSERT_GT(info.table_properties.data_size, 0U);
+        ASSERT_GT(info.table_properties.raw_key_size, 0U);
+        ASSERT_GT(info.table_properties.raw_value_size, 0U);
+        ASSERT_GT(info.table_properties.num_data_blocks, 0U);
+        ASSERT_GT(info.table_properties.num_entries, 0U);
+      }
     } else {
       if (idx >= 0) {
         failure_[idx]++;
+        last_failure_ = info.status;
       }
     }
   }
@@ -768,6 +807,7 @@ class TableFileCreationListener : public EventListener {
   int started_[2];
   int finished_[2];
   int failure_[2];
+  Status last_failure_;
 };
 
 TEST_F(EventListenerTest, TableFileCreationListenersTest) {
@@ -783,49 +823,78 @@ TEST_F(EventListenerTest, TableFileCreationListenersTest) {
   ASSERT_OK(Put("foo", "aaa"));
   ASSERT_OK(Put("bar", "bbb"));
   ASSERT_OK(Flush());
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
   ASSERT_OK(Put("foo", "aaa1"));
   ASSERT_OK(Put("bar", "bbb1"));
   test_env->SetStatus(Status::NotSupported("not supported"));
   ASSERT_NOK(Flush());
   listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
+  ASSERT_TRUE(listener->last_failure_.IsNotSupported());
   test_env->SetStatus(Status::OK());
 
   Reopen(options);
   ASSERT_OK(Put("foo", "aaa2"));
   ASSERT_OK(Put("bar", "bbb2"));
   ASSERT_OK(Flush());
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
 
   const Slice kRangeStart = "a";
   const Slice kRangeEnd = "z";
-  dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(
+      dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
 
   ASSERT_OK(Put("foo", "aaa3"));
   ASSERT_OK(Put("bar", "bbb3"));
   ASSERT_OK(Flush());
   test_env->SetStatus(Status::NotSupported("not supported"));
-  dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_NOK(
+      dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
+  ASSERT_NOK(dbfull()->TEST_WaitForCompact());
   listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
-  Close();
+  ASSERT_TRUE(listener->last_failure_.IsNotSupported());
+
+  // Reset
+  test_env->SetStatus(Status::OK());
+  DestroyAndReopen(options);
+
+  // Verify that an empty table file that is immediately deleted gives Aborted
+  // status to listener.
+  ASSERT_OK(Put("baz", "z"));
+  ASSERT_OK(SingleDelete("baz"));
+  ASSERT_OK(Flush());
+  listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
+  ASSERT_TRUE(listener->last_failure_.IsAborted());
+
+  // Also in compaction
+  ASSERT_OK(Put("baz", "z"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                             kRangeStart, kRangeEnd));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  listener->CheckAndResetCounters(2, 2, 0, 1, 1, 1);
+  ASSERT_TRUE(listener->last_failure_.IsAborted());
+
+  Close();  // Avoid UAF on listener
 }
 
 class MemTableSealedListener : public EventListener {
-private:
+ private:
   SequenceNumber latest_seq_number_;
-public:
+
+ public:
   MemTableSealedListener() {}
   void OnMemTableSealed(const MemTableInfo& info) override {
     latest_seq_number_ = info.first_seqno;
   }
 
   void OnFlushCompleted(DB* /*db*/,
-    const FlushJobInfo& flush_job_info) override {
+                        const FlushJobInfo& flush_job_info) override {
     ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
   }
 };
@@ -840,8 +909,8 @@ TEST_F(EventListenerTest, MemTableSealedListenerTest) {
 
   for (unsigned int i = 0; i < 10; i++) {
     std::string tag = std::to_string(i);
-    ASSERT_OK(Put("foo"+tag, "aaa"));
-    ASSERT_OK(Put("bar"+tag, "bbb"));
+    ASSERT_OK(Put("foo" + tag, "aaa"));
+    ASSERT_OK(Put("bar" + tag, "bbb"));
 
     ASSERT_OK(Flush());
   }
@@ -912,7 +981,7 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
   options.create_if_missing = true;
   options.env = env_;
   options.listeners.push_back(listener);
-  options.memtable_factory.reset(new SpecialSkipListFactory(1));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
   options.paranoid_checks = true;
   DestroyAndReopen(options);
 
@@ -943,7 +1012,7 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
   options.env = env_;
   options.level0_file_num_compaction_trigger = 2;
   options.listeners.push_back(listener);
-  options.memtable_factory.reset(new SpecialSkipListFactory(2));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));
   options.paranoid_checks = true;
   DestroyAndReopen(options);
 
@@ -988,6 +1057,13 @@ class TestFileOperationListener : public EventListener {
     file_syncs_success_.store(0);
     file_truncates_.store(0);
     file_truncates_success_.store(0);
+    file_seq_reads_.store(0);
+    blob_file_reads_.store(0);
+    blob_file_writes_.store(0);
+    blob_file_flushes_.store(0);
+    blob_file_closes_.store(0);
+    blob_file_syncs_.store(0);
+    blob_file_truncates_.store(0);
   }
 
   void OnFileReadFinish(const FileOperationInfo& info) override {
@@ -995,6 +1071,12 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_reads_success_;
     }
+    if (info.path.find("MANIFEST") != std::string::npos) {
+      ++file_seq_reads_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_reads_;
+    }
     ReportDuration(info);
   }
 
@@ -1003,6 +1085,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_writes_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_writes_;
+    }
     ReportDuration(info);
   }
 
@@ -1011,6 +1096,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_flushes_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_flushes_;
+    }
     ReportDuration(info);
   }
 
@@ -1019,6 +1107,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_closes_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_closes_;
+    }
     ReportDuration(info);
   }
 
@@ -1027,6 +1118,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_syncs_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_syncs_;
+    }
     ReportDuration(info);
   }
 
@@ -1035,6 +1129,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_truncates_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_truncates_;
+    }
     ReportDuration(info);
   }
 
@@ -1052,6 +1149,13 @@ class TestFileOperationListener : public EventListener {
   std::atomic<size_t> file_syncs_success_;
   std::atomic<size_t> file_truncates_;
   std::atomic<size_t> file_truncates_success_;
+  std::atomic<size_t> file_seq_reads_;
+  std::atomic<size_t> blob_file_reads_;
+  std::atomic<size_t> blob_file_writes_;
+  std::atomic<size_t> blob_file_flushes_;
+  std::atomic<size_t> blob_file_closes_;
+  std::atomic<size_t> blob_file_syncs_;
+  std::atomic<size_t> blob_file_truncates_;
 
  private:
   void ReportDuration(const FileOperationInfo& info) const {
@@ -1076,8 +1180,8 @@ TEST_F(EventListenerTest, OnFileOperationTest) {
   }
   DestroyAndReopen(options);
   ASSERT_OK(Put("foo", "aaa"));
-  dbfull()->Flush(FlushOptions());
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->Flush(FlushOptions()));
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   ASSERT_GE(listener->file_writes_.load(),
             listener->file_writes_success_.load());
   ASSERT_GT(listener->file_writes_.load(), 0);
@@ -1101,11 +1205,391 @@ TEST_F(EventListenerTest, OnFileOperationTest) {
   }
 }
 
+TEST_F(EventListenerTest, OnBlobFileOperationTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  TestFileOperationListener* listener = new TestFileOperationListener();
+  options.listeners.emplace_back(listener);
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  ASSERT_GT(listener->blob_file_writes_.load(), 0U);
+  ASSERT_GT(listener->blob_file_flushes_.load(), 0U);
+  Close();
+
+  Reopen(options);
+  ASSERT_GT(listener->blob_file_closes_.load(), 0U);
+  ASSERT_GT(listener->blob_file_syncs_.load(), 0U);
+  if (true == options.use_direct_io_for_flush_and_compaction) {
+    ASSERT_GT(listener->blob_file_truncates_.load(), 0U);
+  }
+}
+
+TEST_F(EventListenerTest, ReadManifestAndWALOnRecovery) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+
+  TestFileOperationListener* listener = new TestFileOperationListener();
+  options.listeners.emplace_back(listener);
+
+  options.use_direct_io_for_flush_and_compaction = false;
+  Status s = TryReopen(options);
+  if (s.IsInvalidArgument()) {
+    options.use_direct_io_for_flush_and_compaction = false;
+  } else {
+    ASSERT_OK(s);
+  }
+  DestroyAndReopen(options);
+  ASSERT_OK(Put("foo", "aaa"));
+  Close();
+
+  size_t seq_reads = listener->file_seq_reads_.load();
+  Reopen(options);
+  ASSERT_GT(listener->file_seq_reads_.load(), seq_reads);
+}
+
+class BlobDBJobLevelEventListenerTest : public EventListener {
+ public:
+  explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)
+      : test_(test), call_count_(0) {}
+
+  const VersionStorageInfo* GetVersionStorageInfo() const {
+    VersionSet* const versions = test_->dbfull()->GetVersionSet();
+    assert(versions);
+
+    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+    EXPECT_NE(cfd, nullptr);
+
+    Version* const current = cfd->current();
+    EXPECT_NE(current, nullptr);
+
+    const VersionStorageInfo* const storage_info = current->storage_info();
+    EXPECT_NE(storage_info, nullptr);
+
+    return storage_info;
+  }
+
+  void CheckBlobFileAdditions(
+      const std::vector<BlobFileAdditionInfo>& blob_file_addition_infos) const {
+    const auto* vstorage = GetVersionStorageInfo();
+
+    EXPECT_FALSE(blob_file_addition_infos.empty());
+
+    for (const auto& blob_file_addition_info : blob_file_addition_infos) {
+      const auto meta = vstorage->GetBlobFileMetaData(
+          blob_file_addition_info.blob_file_number);
+
+      EXPECT_NE(meta, nullptr);
+      EXPECT_EQ(meta->GetBlobFileNumber(),
+                blob_file_addition_info.blob_file_number);
+      EXPECT_EQ(meta->GetTotalBlobBytes(),
+                blob_file_addition_info.total_blob_bytes);
+      EXPECT_EQ(meta->GetTotalBlobCount(),
+                blob_file_addition_info.total_blob_count);
+      EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+    }
+  }
+
+  std::vector<std::string> GetFlushedFiles() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::vector<std::string> result;
+    for (const auto& fname : flushed_files_) {
+      result.push_back(fname);
+    }
+    return result;
+  }
+
+  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+    call_count_++;
+
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      flushed_files_.push_back(info.file_path);
+    }
+
+    EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+    CheckBlobFileAdditions(info.blob_file_addition_infos);
+  }
+
+  void OnCompactionCompleted(DB* /*db*/,
+                             const CompactionJobInfo& info) override {
+    call_count_++;
+
+    EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+    CheckBlobFileAdditions(info.blob_file_addition_infos);
+
+    EXPECT_FALSE(info.blob_file_garbage_infos.empty());
+
+    for (const auto& blob_file_garbage_info : info.blob_file_garbage_infos) {
+      EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+      EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+      EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+      EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+    }
+  }
+
+  EventListenerTest* test_;
+  uint32_t call_count_;
+
+ private:
+  std::vector<std::string> flushed_files_;
+  std::mutex mutex_;
+};
+
+// Test OnFlushCompleted EventListener called for blob files
+TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+
+  options.min_blob_size = 0;
+  BlobDBJobLevelEventListenerTest* blob_event_listener =
+      new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(blob_event_listener);
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(Get("Key1"), "blob_value1");
+  ASSERT_EQ(Get("Key2"), "blob_value2");
+  ASSERT_EQ(Get("Key3"), "blob_value3");
+
+  ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test OnCompactionCompleted EventListener called for blob files
+TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.min_blob_size = 0;
+  BlobDBJobLevelEventListenerTest* blob_event_listener =
+      new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(blob_event_listener);
+
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  blob_event_listener->call_count_ = 0;
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create new blob file during compaction.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  // Make sure, OnCompactionCompleted is called.
+  ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test CompactFiles calls OnCompactionCompleted EventListener for blob files
+// and populate the blob files info.
+TEST_F(EventListenerTest, BlobDBCompactFiles) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  BlobDBJobLevelEventListenerTest* blob_event_listener =
+      new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(blob_event_listener);
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  std::vector<std::string> output_file_names;
+  CompactionJobInfo compaction_job_info;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create new blob file during compaction
+  // which will be populated in output_files_names.
+  ASSERT_OK(dbfull()->CompactFiles(
+      CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1,
+      &output_file_names, &compaction_job_info));
+
+  bool is_blob_in_output = false;
+  for (const auto& file : output_file_names) {
+    if (EndsWith(file, ".blob")) {
+      is_blob_in_output = true;
+    }
+  }
+  ASSERT_TRUE(is_blob_in_output);
+
+  for (const auto& blob_file_addition_info :
+       compaction_job_info.blob_file_addition_infos) {
+    EXPECT_GT(blob_file_addition_info.blob_file_number, 0U);
+    EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U);
+    EXPECT_GT(blob_file_addition_info.total_blob_count, 0U);
+    EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+  }
+
+  for (const auto& blob_file_garbage_info :
+       compaction_job_info.blob_file_garbage_infos) {
+    EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+    EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+    EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+    EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+  }
+}
+
+class BlobDBFileLevelEventListener : public EventListener {
+ public:
+  void OnBlobFileCreationStarted(
+      const BlobFileCreationBriefInfo& info) override {
+    files_started_++;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.cf_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+  }
+
+  void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
+    files_created_++;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.cf_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+    EXPECT_GT(info.total_blob_count, 0U);
+    EXPECT_GT(info.total_blob_bytes, 0U);
+    EXPECT_EQ(info.file_checksum, kUnknownFileChecksum);
+    EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+    EXPECT_TRUE(info.status.ok());
+  }
+
+  void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override {
+    files_deleted_++;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+    EXPECT_TRUE(info.status.ok());
+  }
+
+  void CheckCounters() {
+    EXPECT_EQ(files_started_, files_created_);
+    EXPECT_GT(files_started_, 0U);
+    EXPECT_GT(files_deleted_, 0U);
+    EXPECT_LT(files_deleted_, files_created_);
+  }
+
+ private:
+  std::atomic<uint32_t> files_started_{};
+  std::atomic<uint32_t> files_created_{};
+  std::atomic<uint32_t> files_deleted_{};
+};
+
+TEST_F(EventListenerTest, BlobDBFileTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  BlobDBFileLevelEventListener* blob_event_listener =
+      new BlobDBFileLevelEventListener();
+  options.listeners.emplace_back(blob_event_listener);
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create new blob file during compaction.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  blob_event_listener->CheckCounters();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #endif  // ROCKSDB_LITE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }