]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_flush_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_flush_test.cc
index bab206d3d0f5fb975b3a52c1069f86af3d41e614..eeaa6912370069ec31bdfadcd3e5758738f2653c 100644 (file)
 
 #include "db/db_impl/db_impl.h"
 #include "db/db_test_util.h"
+#include "file/filename.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
-#include "test_util/fault_injection_test_env.h"
 #include "test_util/sync_point.h"
 #include "util/cast_util.h"
 #include "util/mutexlock.h"
+#include "utilities/fault_injection_env.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 class DBFlushTest : public DBTestBase {
  public:
-  DBFlushTest() : DBTestBase("/db_flush_test") {}
+  DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {}
 };
 
 class DBFlushDirectIOTest : public DBFlushTest,
@@ -62,7 +63,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
   ASSERT_OK(Put("bar", "v"));
   ASSERT_OK(dbfull()->Flush(no_wait));
   // If the issue is hit we will wait here forever.
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 #ifndef ROCKSDB_LITE
   ASSERT_EQ(2, TotalTableFiles());
 #endif  // ROCKSDB_LITE
@@ -87,9 +88,9 @@ TEST_F(DBFlushTest, SyncFail) {
   SyncPoint::GetInstance()->EnableProcessing();
 
   CreateAndReopenWithCF({"pikachu"}, options);
-  Put("key", "value");
+  ASSERT_OK(Put("key", "value"));
   auto* cfd =
-      reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
+      static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
           ->cfd();
   FlushOptions flush_options;
   flush_options.wait = false;
@@ -106,7 +107,8 @@ TEST_F(DBFlushTest, SyncFail) {
   TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
   fault_injection_env->SetFilesystemActive(true);
   // Now the background job will do the flush; wait for it.
-  dbfull()->TEST_WaitForFlushMemTable();
+  // Returns the IO error happend during flush.
+  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("", FilesPerLevel());  // flush failed.
 #endif                             // ROCKSDB_LITE
@@ -125,7 +127,7 @@ TEST_F(DBFlushTest, SyncSkip) {
   SyncPoint::GetInstance()->EnableProcessing();
 
   Reopen(options);
-  Put("key", "value");
+  ASSERT_OK(Put("key", "value"));
 
   FlushOptions flush_options;
   flush_options.wait = false;
@@ -135,7 +137,7 @@ TEST_F(DBFlushTest, SyncSkip) {
   TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
 
   // Now the background job will do the flush; wait for it.
-  dbfull()->TEST_WaitForFlushMemTable();
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
 
   Destroy(options);
 }
@@ -170,9 +172,9 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
   ASSERT_OK(Put("key", "val"));
   for (int i = 0; i < 4; ++i) {
     ASSERT_OK(Put("key", "val"));
-    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
   }
-  dbfull()->TEST_WaitForCompact();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
   ASSERT_EQ(4, num_flushes);
   ASSERT_EQ(1, num_compactions);
 }
@@ -305,7 +307,8 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
   // mode.
   fault_injection_env->SetFilesystemActive(false);
   ASSERT_OK(db_->ContinueBackgroundWork());
-  dbfull()->TEST_WaitForFlushMemTable();
+  // We ingested the error to env, so the returned status is not OK.
+  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
 #ifndef ROCKSDB_LITE
   uint64_t num_bg_errors;
   ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
@@ -379,9 +382,9 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
       DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
       InstrumentedMutex* mutex = db_impl->mutex();
       mutex->Lock();
-      auto* cfd =
-          reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
-              ->cfd();
+      auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
+                      db->DefaultColumnFamily())
+                      ->cfd();
       ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
       mutex->Unlock();
     }
@@ -443,6 +446,180 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
 }
 #endif  // !ROCKSDB_LITE
 
+TEST_F(DBFlushTest, FlushWithBlob) {
+  constexpr uint64_t min_blob_size = 10;
+
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+  options.disable_auto_compactions = true;
+  options.env = env_;
+
+  Reopen(options);
+
+  constexpr char short_value[] = "short";
+  static_assert(sizeof(short_value) - 1 < min_blob_size,
+                "short_value too long");
+
+  constexpr char long_value[] = "long_value";
+  static_assert(sizeof(long_value) - 1 >= min_blob_size,
+                "long_value too short");
+
+  ASSERT_OK(Put("key1", short_value));
+  ASSERT_OK(Put("key2", long_value));
+
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(Get("key1"), short_value);
+  ASSERT_EQ(Get("key2"), long_value);
+
+  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+  assert(versions);
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  assert(cfd);
+
+  Version* const current = cfd->current();
+  assert(current);
+
+  const VersionStorageInfo* const storage_info = current->storage_info();
+  assert(storage_info);
+
+  const auto& l0_files = storage_info->LevelFiles(0);
+  ASSERT_EQ(l0_files.size(), 1);
+
+  const FileMetaData* const table_file = l0_files[0];
+  assert(table_file);
+
+  const auto& blob_files = storage_info->GetBlobFiles();
+  ASSERT_EQ(blob_files.size(), 1);
+
+  const auto& blob_file = blob_files.begin()->second;
+  assert(blob_file);
+
+  ASSERT_EQ(table_file->smallest.user_key(), "key1");
+  ASSERT_EQ(table_file->largest.user_key(), "key2");
+  ASSERT_EQ(table_file->fd.smallest_seqno, 1);
+  ASSERT_EQ(table_file->fd.largest_seqno, 2);
+  ASSERT_EQ(table_file->oldest_blob_file_number,
+            blob_file->GetBlobFileNumber());
+
+  ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
+
+#ifndef ROCKSDB_LITE
+  const InternalStats* const internal_stats = cfd->internal_stats();
+  assert(internal_stats);
+
+  const uint64_t expected_bytes =
+      table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
+
+  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+  ASSERT_FALSE(compaction_stats.empty());
+  ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
+  ASSERT_EQ(compaction_stats[0].num_output_files, 2);
+
+  const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
+  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
+#endif  // ROCKSDB_LITE
+}
+
+class DBFlushTestBlobError : public DBFlushTest,
+                             public testing::WithParamInterface<std::string> {
+ public:
+  DBFlushTestBlobError()
+      : fault_injection_env_(env_), sync_point_(GetParam()) {}
+  ~DBFlushTestBlobError() { Close(); }
+
+  FaultInjectionTestEnv fault_injection_env_;
+  std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
+                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+
+TEST_P(DBFlushTestBlobError, FlushError) {
+  Options options;
+  options.enable_blob_files = true;
+  options.disable_auto_compactions = true;
+  options.env = &fault_injection_env_;
+
+  Reopen(options);
+
+  ASSERT_OK(Put("key", "blob"));
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+    fault_injection_env_.SetFilesystemActive(false,
+                                             Status::IOError(sync_point_));
+  });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
+        fault_injection_env_.SetFilesystemActive(true);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_NOK(Flush());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+  assert(versions);
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  assert(cfd);
+
+  Version* const current = cfd->current();
+  assert(current);
+
+  const VersionStorageInfo* const storage_info = current->storage_info();
+  assert(storage_info);
+
+  const auto& l0_files = storage_info->LevelFiles(0);
+  ASSERT_TRUE(l0_files.empty());
+
+  const auto& blob_files = storage_info->GetBlobFiles();
+  ASSERT_TRUE(blob_files.empty());
+
+  // Make sure the files generated by the failed job have been deleted
+  std::vector<std::string> files;
+  ASSERT_OK(env_->GetChildren(dbname_, &files));
+  for (const auto& file : files) {
+    uint64_t number = 0;
+    FileType type = kTableFile;
+
+    if (!ParseFileName(file, &number, &type)) {
+      continue;
+    }
+
+    ASSERT_NE(type, kTableFile);
+    ASSERT_NE(type, kBlobFile);
+  }
+
+#ifndef ROCKSDB_LITE
+  const InternalStats* const internal_stats = cfd->internal_stats();
+  assert(internal_stats);
+
+  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+  ASSERT_FALSE(compaction_stats.empty());
+
+  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
+    ASSERT_EQ(compaction_stats[0].bytes_written, 0);
+    ASSERT_EQ(compaction_stats[0].num_output_files, 0);
+  } else {
+    // SST file writing succeeded; blob file writing failed (during Finish)
+    ASSERT_GT(compaction_stats[0].bytes_written, 0);
+    ASSERT_EQ(compaction_stats[0].num_output_files, 1);
+  }
+
+  const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
+  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
+            compaction_stats[0].bytes_written);
+#endif  // ROCKSDB_LITE
+}
+
 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
@@ -499,13 +676,13 @@ TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
   TEST_SYNC_POINT(
       "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
   if (options.atomic_flush) {
-    for (size_t i = 0; i != num_cfs - 1; ++i) {
+    for (size_t i = 0; i + 1 != num_cfs; ++i) {
       auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
       ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
       ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
     }
   } else {
-    for (size_t i = 0; i != num_cfs - 1; ++i) {
+    for (size_t i = 0; i + 1 != num_cfs; ++i) {
       auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
       ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
       ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
@@ -549,7 +726,8 @@ TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
   fault_injection_env->SetFilesystemActive(false);
   TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
   for (auto* cfh : handles_) {
-    dbfull()->TEST_WaitForFlushMemTable(cfh);
+    // Returns the IO error happend during flush.
+    ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh));
   }
   for (size_t i = 0; i != num_cfs; ++i) {
     auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);