git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compact_files_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compact_files_test.cc
index 048ed6e26f172cedcea98e383f37e0fcac1b2c84..ef38946f7e23c6c66ec052a1c458fd64f23a771f 100644 (file)
@@ -77,7 +77,7 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
   options.compression = kNoCompression;
 
   DB* db = nullptr;
-  DestroyDB(db_name_, options);
+  ASSERT_OK(DestroyDB(db_name_, options));
   Status s = DB::Open(options, db_name_, &db);
   assert(s.ok());
   assert(db);
@@ -91,9 +91,9 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
   // create couple files
   // Background compaction starts and waits in BackgroundCallCompaction:0
   for (int i = 0; i < kLevel0Trigger * 4; ++i) {
-    db->Put(WriteOptions(), ToString(i), "");
-    db->Put(WriteOptions(), ToString(100 - i), "");
-    db->Flush(FlushOptions());
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), ""));
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(100 - i), ""));
+    ASSERT_OK(db->Flush(FlushOptions()));
   }
 
   ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
@@ -118,6 +118,81 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
   delete db;
 }
 
+TEST_F(CompactFilesTest, MultipleLevel) {
+  Options options;
+  options.create_if_missing = true;
+  options.level_compaction_dynamic_level_bytes = true;
+  options.num_levels = 6;
+  // Add listener
+  FlushedFileCollector* collector = new FlushedFileCollector();
+  options.listeners.emplace_back(collector);
+
+  DB* db = nullptr;
+  ASSERT_OK(DestroyDB(db_name_, options));
+  Status s = DB::Open(options, db_name_, &db);
+  ASSERT_OK(s);
+  ASSERT_NE(db, nullptr);
+
+  // create couple files in L0, L3, L4 and L5
+  for (int i = 5; i > 2; --i) {
+    collector->ClearFlushedFiles();
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), ""));
+    ASSERT_OK(db->Flush(FlushOptions()));
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
+    auto l0_files = collector->GetFlushedFiles();
+    ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i));
+
+    std::string prop;
+    ASSERT_TRUE(db->GetProperty(
+        "rocksdb.num-files-at-level" + std::to_string(i), &prop));
+    ASSERT_EQ("1", prop);
+  }
+  ASSERT_OK(db->Put(WriteOptions(), std::to_string(0), ""));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  ColumnFamilyMetaData meta;
+  db->GetColumnFamilyMetaData(&meta);
+  // Compact files except the file in L3
+  std::vector<std::string> files;
+  for (int i = 0; i < 6; ++i) {
+    if (i == 3) continue;
+    for (auto& file : meta.levels[i].files) {
+      files.push_back(file.db_path + "/" + file.name);
+    }
+  }
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"CompactionJob::Run():Start", "CompactFilesTest.MultipleLevel:0"},
+      {"CompactFilesTest.MultipleLevel:1", "CompactFilesImpl:3"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::thread thread([&] {
+    TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:0");
+    ASSERT_OK(db->Put(WriteOptions(), "bar", "v2"));
+    ASSERT_OK(db->Put(WriteOptions(), "foo", "v2"));
+    ASSERT_OK(db->Flush(FlushOptions()));
+    TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:1");
+  });
+
+  // Compaction cannot move up the data to higher level
+  // here we have input file from level 5, so the output level has to be >= 5
+  for (int invalid_output_level = 0; invalid_output_level < 5;
+       invalid_output_level++) {
+    s = db->CompactFiles(CompactionOptions(), files, invalid_output_level);
+    std::cout << s.ToString() << std::endl;
+    ASSERT_TRUE(s.IsInvalidArgument());
+  }
+
+  ASSERT_OK(db->CompactFiles(CompactionOptions(), files, 5));
+  SyncPoint::GetInstance()->DisableProcessing();
+  thread.join();
+
+  delete db;
+}
+
 TEST_F(CompactFilesTest, ObsoleteFiles) {
   Options options;
   // to trigger compaction more easily
@@ -136,20 +211,20 @@ TEST_F(CompactFilesTest, ObsoleteFiles) {
   options.listeners.emplace_back(collector);
 
   DB* db = nullptr;
-  DestroyDB(db_name_, options);
+  ASSERT_OK(DestroyDB(db_name_, options));
   Status s = DB::Open(options, db_name_, &db);
-  assert(s.ok());
-  assert(db);
+  ASSERT_OK(s);
+  ASSERT_NE(db, nullptr);
 
   // create couple files
   for (int i = 1000; i < 2000; ++i) {
-    db->Put(WriteOptions(), ToString(i),
-            std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+                      std::string(kWriteBufferSize / 10, 'a' + (i % 26))));
   }
 
   auto l0_files = collector->GetFlushedFiles();
   ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
-  static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact();
+  ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact());
 
   // verify all compaction input files are deleted
   for (auto fname : l0_files) {
@@ -175,22 +250,24 @@ TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
   options.listeners.emplace_back(collector);
 
   DB* db = nullptr;
-  DestroyDB(db_name_, options);
+  ASSERT_OK(DestroyDB(db_name_, options));
   Status s = DB::Open(options, db_name_, &db);
   assert(s.ok());
   assert(db);
 
   // create couple files
   for (int i = 0; i < 500; ++i) {
-    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+                      std::string(1000, 'a' + (i % 26))));
   }
-  static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
+  ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
   auto l0_files_1 = collector->GetFlushedFiles();
   collector->ClearFlushedFiles();
   for (int i = 0; i < 500; ++i) {
-    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+                      std::string(1000, 'a' + (i % 26))));
   }
-  static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
+  ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
   auto l0_files_2 = collector->GetFlushedFiles();
   ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
   ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
@@ -211,17 +288,20 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
   options.listeners.emplace_back(collector);
 
   DB* db = nullptr;
-  DestroyDB(db_name_, options);
+  ASSERT_OK(DestroyDB(db_name_, options));
   Status s = DB::Open(options, db_name_, &db);
-  assert(s.ok());
+  ASSERT_OK(s);
   assert(db);
 
   // Create 5 files.
   for (int i = 0; i < 5; ++i) {
-    db->Put(WriteOptions(), "key" + ToString(i), "value");
-    db->Flush(FlushOptions());
+    ASSERT_OK(db->Put(WriteOptions(), "key" + std::to_string(i), "value"));
+    ASSERT_OK(db->Flush(FlushOptions()));
   }
 
+  // Ensure background work is fully finished including listener callbacks
+  // before accessing listener state.
+  ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
   auto l0_files = collector->GetFlushedFiles();
   EXPECT_EQ(5, l0_files.size());
 
@@ -237,8 +317,8 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
 
   // In the meantime flush another file.
   TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
-  db->Put(WriteOptions(), "key5", "value");
-  db->Flush(FlushOptions());
+  ASSERT_OK(db->Put(WriteOptions(), "key5", "value"));
+  ASSERT_OK(db->Flush(FlushOptions()));
   TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
 
   compaction_thread.join();
@@ -249,7 +329,7 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
 
   // Make sure we can reopen the DB.
   s = DB::Open(options, db_name_, &db);
-  ASSERT_TRUE(s.ok());
+  ASSERT_OK(s);
   assert(db);
   delete db;
 }
@@ -268,9 +348,7 @@ TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
       return true;
     }
 
-    void SetDB(DB* db) {
-      db_ = db;
-    }
+    void SetDB(DB* db) { db_ = db; }
 
     const char* Name() const override { return "FilterWithGet"; }
 
@@ -278,7 +356,6 @@ TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
     DB* db_;
   };
 
-
   std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
 
   Options options;
@@ -286,15 +363,15 @@ TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
   options.compaction_filter = cf.get();
 
   DB* db = nullptr;
-  DestroyDB(db_name_, options);
+  ASSERT_OK(DestroyDB(db_name_, options));
   Status s = DB::Open(options, db_name_, &db);
   ASSERT_OK(s);
 
   cf->SetDB(db);
 
   // Write one L0 file
-  db->Put(WriteOptions(), "K1", "V1");
-  db->Flush(FlushOptions());
+  ASSERT_OK(db->Put(WriteOptions(), "K1", "V1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
 
   // Compact all L0 files using CompactFiles
   ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
@@ -305,7 +382,6 @@ TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
         db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
   }
 
-
   delete db;
 }
 
@@ -320,11 +396,10 @@ TEST_F(CompactFilesTest, SentinelCompressionType) {
   }
   // Check that passing `CompressionType::kDisableCompressionOption` to
   // `CompactFiles` causes it to use the column family compression options.
-  for (auto compaction_style :
-       {CompactionStyle::kCompactionStyleLevel,
-        CompactionStyle::kCompactionStyleUniversal,
-        CompactionStyle::kCompactionStyleNone}) {
-    DestroyDB(db_name_, Options());
+  for (auto compaction_style : {CompactionStyle::kCompactionStyleLevel,
+                                CompactionStyle::kCompactionStyleUniversal,
+                                CompactionStyle::kCompactionStyleNone}) {
+    ASSERT_OK(DestroyDB(db_name_, Options()));
     Options options;
     options.compaction_style = compaction_style;
     // L0: Snappy, L1: ZSTD, L2: Snappy
@@ -337,9 +412,12 @@ TEST_F(CompactFilesTest, SentinelCompressionType) {
     DB* db = nullptr;
     ASSERT_OK(DB::Open(options, db_name_, &db));
 
-    db->Put(WriteOptions(), "key", "val");
-    db->Flush(FlushOptions());
+    ASSERT_OK(db->Put(WriteOptions(), "key", "val"));
+    ASSERT_OK(db->Flush(FlushOptions()));
 
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
     auto l0_files = collector->GetFlushedFiles();
     ASSERT_EQ(1, l0_files.size());
 
@@ -375,16 +453,17 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) {
   options.listeners.emplace_back(collector);
 
   DB* db = nullptr;
-  DestroyDB(db_name_, options);
+  ASSERT_OK(DestroyDB(db_name_, options));
   Status s = DB::Open(options, db_name_, &db);
-  assert(s.ok());
+  ASSERT_OK(s);
   assert(db);
 
   // create couple files
   for (int i = 0; i < 500; ++i) {
-    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+    ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+                      std::string(1000, 'a' + (i % 26))));
   }
-  static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
+  ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
   auto l0_files_1 = collector->GetFlushedFiles();
   CompactionOptions co;
   co.compression = CompressionType::kLZ4Compression;
@@ -406,6 +485,7 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) {
 }  // namespace ROCKSDB_NAMESPACE
 
int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();  // print stack trace on crash
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}