X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Fdb%2Fdb_flush_test.cc;h=eeaa6912370069ec31bdfadcd3e5758738f2653c;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=bab206d3d0f5fb975b3a52c1069f86af3d41e614;hpb=a71831dadd1e1f3e0fa70405511f65cc33db0498;p=ceph.git diff --git a/ceph/src/rocksdb/db/db_flush_test.cc b/ceph/src/rocksdb/db/db_flush_test.cc index bab206d3d..eeaa69123 100644 --- a/ceph/src/rocksdb/db/db_flush_test.cc +++ b/ceph/src/rocksdb/db/db_flush_test.cc @@ -11,18 +11,19 @@ #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" -#include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/mutexlock.h" +#include "utilities/fault_injection_env.h" namespace ROCKSDB_NAMESPACE { class DBFlushTest : public DBTestBase { public: - DBFlushTest() : DBTestBase("/db_flush_test") {} + DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {} }; class DBFlushDirectIOTest : public DBFlushTest, @@ -62,7 +63,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) { ASSERT_OK(Put("bar", "v")); ASSERT_OK(dbfull()->Flush(no_wait)); // If the issue is hit we will wait here forever. - dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); #ifndef ROCKSDB_LITE ASSERT_EQ(2, TotalTableFiles()); #endif // ROCKSDB_LITE @@ -87,9 +88,9 @@ TEST_F(DBFlushTest, SyncFail) { SyncPoint::GetInstance()->EnableProcessing(); CreateAndReopenWithCF({"pikachu"}, options); - Put("key", "value"); + ASSERT_OK(Put("key", "value")); auto* cfd = - reinterpret_cast(db_->DefaultColumnFamily()) + static_cast_with_check(db_->DefaultColumnFamily()) ->cfd(); FlushOptions flush_options; flush_options.wait = false; @@ -106,7 +107,8 @@ TEST_F(DBFlushTest, SyncFail) { TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); // Now the background job will do the flush; wait for it. - dbfull()->TEST_WaitForFlushMemTable(); + // Returns the IO error happend during flush. + ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable()); #ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. #endif // ROCKSDB_LITE @@ -125,7 +127,7 @@ TEST_F(DBFlushTest, SyncSkip) { SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); - Put("key", "value"); + ASSERT_OK(Put("key", "value")); FlushOptions flush_options; flush_options.wait = false; @@ -135,7 +137,7 @@ TEST_F(DBFlushTest, SyncSkip) { TEST_SYNC_POINT("DBFlushTest::SyncSkip:2"); // Now the background job will do the flush; wait for it. - dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); Destroy(options); } @@ -170,9 +172,9 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { ASSERT_OK(Put("key", "val")); for (int i = 0; i < 4; ++i) { ASSERT_OK(Put("key", "val")); - dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); } - dbfull()->TEST_WaitForCompact(); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ(4, num_flushes); ASSERT_EQ(1, num_compactions); } @@ -305,7 +307,8 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) { // mode. fault_injection_env->SetFilesystemActive(false); ASSERT_OK(db_->ContinueBackgroundWork()); - dbfull()->TEST_WaitForFlushMemTable(); + // We ingested the error to env, so the returned status is not OK. + ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable()); #ifndef ROCKSDB_LITE uint64_t num_bg_errors; ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors, @@ -379,9 +382,9 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { DBImpl* db_impl = static_cast_with_check(db); InstrumentedMutex* mutex = db_impl->mutex(); mutex->Lock(); - auto* cfd = - reinterpret_cast(db->DefaultColumnFamily()) - ->cfd(); + auto* cfd = static_cast_with_check( + db->DefaultColumnFamily()) + ->cfd(); ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber()); mutex->Unlock(); } @@ -443,6 +446,180 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { } #endif // !ROCKSDB_LITE +TEST_F(DBFlushTest, FlushWithBlob) { + constexpr uint64_t min_blob_size = 10; + + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.min_blob_size = min_blob_size; + options.disable_auto_compactions = true; + options.env = env_; + + Reopen(options); + + constexpr char short_value[] = "short"; + static_assert(sizeof(short_value) - 1 < min_blob_size, + "short_value too long"); + + constexpr char long_value[] = "long_value"; + static_assert(sizeof(long_value) - 1 >= min_blob_size, + "long_value too short"); + + ASSERT_OK(Put("key1", short_value)); + ASSERT_OK(Put("key2", long_value)); + + ASSERT_OK(Flush()); + + ASSERT_EQ(Get("key1"), short_value); + ASSERT_EQ(Get("key2"), long_value); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_EQ(l0_files.size(), 1); + + const FileMetaData* const table_file = l0_files[0]; + assert(table_file); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + const auto& blob_file = blob_files.begin()->second; + assert(blob_file); + + ASSERT_EQ(table_file->smallest.user_key(), "key1"); + ASSERT_EQ(table_file->largest.user_key(), "key2"); + ASSERT_EQ(table_file->fd.smallest_seqno, 1); + ASSERT_EQ(table_file->fd.largest_seqno, 2); + ASSERT_EQ(table_file->oldest_blob_file_number, + blob_file->GetBlobFileNumber()); + + ASSERT_EQ(blob_file->GetTotalBlobCount(), 1); + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const uint64_t expected_bytes = + table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); + ASSERT_EQ(compaction_stats[0].num_output_files, 2); + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); +#endif // ROCKSDB_LITE +} + +class DBFlushTestBlobError : public DBFlushTest, + public testing::WithParamInterface { + public: + DBFlushTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} + ~DBFlushTestBlobError() { Close(); } + + FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError, + ::testing::ValuesIn(std::vector{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(DBFlushTestBlobError, FlushError) { + Options options; + options.enable_blob_files = true; + options.disable_auto_compactions = true; + options.env = &fault_injection_env_; + + Reopen(options); + + ASSERT_OK(Put("key", "blob")); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(true); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_NOK(Flush()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_TRUE(l0_files.empty()); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_TRUE(blob_files.empty()); + + // Make sure the files generated by the failed job have been deleted + std::vector files; + ASSERT_OK(env_->GetChildren(dbname_, &files)); + for (const auto& file : files) { + uint64_t number = 0; + FileType type = kTableFile; + + if (!ParseFileName(file, &number, &type)) { + continue; + } + + ASSERT_NE(type, kTableFile); + ASSERT_NE(type, kBlobFile); + } + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + + if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { + ASSERT_EQ(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 0); + } else { + // SST file writing succeeded; blob file writing failed (during Finish) + ASSERT_GT(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 1); + } + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], + compaction_stats[0].bytes_written); +#endif // ROCKSDB_LITE +} + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; @@ -499,13 +676,13 @@ TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) { TEST_SYNC_POINT( "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"); if (options.atomic_flush) { - for (size_t i = 0; i != num_cfs - 1; ++i) { + for (size_t i = 0; i + 1 != num_cfs; ++i) { auto cfh = static_cast(handles_[i]); ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); } } else { - for (size_t i = 0; i != num_cfs - 1; ++i) { + for (size_t i = 0; i + 1 != num_cfs; ++i) { auto cfh = static_cast(handles_[i]); ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty()); @@ -549,7 +726,8 @@ TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) { fault_injection_env->SetFilesystemActive(false); TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2"); for (auto* cfh : handles_) { - dbfull()->TEST_WaitForFlushMemTable(cfh); + // Returns the IO error happend during flush. + ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh)); } for (size_t i = 0; i != num_cfs; ++i) { auto cfh = static_cast(handles_[i]);