#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
+#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
-#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
+#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
class DBFlushTest : public DBTestBase {
public:
- DBFlushTest() : DBTestBase("/db_flush_test") {}
+ DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {}
};
class DBFlushDirectIOTest : public DBFlushTest,
ASSERT_OK(Put("bar", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
// If the issue is hit we will wait here forever.
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
ASSERT_EQ(2, TotalTableFiles());
#endif // ROCKSDB_LITE
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
- Put("key", "value");
+ ASSERT_OK(Put("key", "value"));
auto* cfd =
- reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
+ static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
FlushOptions flush_options;
flush_options.wait = false;
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true);
// Now the background job will do the flush; wait for it.
- dbfull()->TEST_WaitForFlushMemTable();
+ // Returns the IO error happend during flush.
+ ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
- Put("key", "value");
+ ASSERT_OK(Put("key", "value"));
FlushOptions flush_options;
flush_options.wait = false;
TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
// Now the background job will do the flush; wait for it.
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
Destroy(options);
}
ASSERT_OK(Put("key", "val"));
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put("key", "val"));
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(4, num_flushes);
ASSERT_EQ(1, num_compactions);
}
// mode.
fault_injection_env->SetFilesystemActive(false);
ASSERT_OK(db_->ContinueBackgroundWork());
- dbfull()->TEST_WaitForFlushMemTable();
+ // We ingested the error to env, so the returned status is not OK.
+ ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
uint64_t num_bg_errors;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
mutex->Lock();
- auto* cfd =
- reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
- ->cfd();
+ auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
+ db->DefaultColumnFamily())
+ ->cfd();
ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
mutex->Unlock();
}
}
#endif // !ROCKSDB_LITE
+TEST_F(DBFlushTest, FlushWithBlob) {
+ constexpr uint64_t min_blob_size = 10;
+
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.min_blob_size = min_blob_size;
+ options.disable_auto_compactions = true;
+ options.env = env_;
+
+ Reopen(options);
+
+ constexpr char short_value[] = "short";
+ static_assert(sizeof(short_value) - 1 < min_blob_size,
+ "short_value too long");
+
+ constexpr char long_value[] = "long_value";
+ static_assert(sizeof(long_value) - 1 >= min_blob_size,
+ "long_value too short");
+
+ ASSERT_OK(Put("key1", short_value));
+ ASSERT_OK(Put("key2", long_value));
+
+ ASSERT_OK(Flush());
+
+ ASSERT_EQ(Get("key1"), short_value);
+ ASSERT_EQ(Get("key2"), long_value);
+
+ VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+ assert(versions);
+
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ assert(cfd);
+
+ Version* const current = cfd->current();
+ assert(current);
+
+ const VersionStorageInfo* const storage_info = current->storage_info();
+ assert(storage_info);
+
+ const auto& l0_files = storage_info->LevelFiles(0);
+ ASSERT_EQ(l0_files.size(), 1);
+
+ const FileMetaData* const table_file = l0_files[0];
+ assert(table_file);
+
+ const auto& blob_files = storage_info->GetBlobFiles();
+ ASSERT_EQ(blob_files.size(), 1);
+
+ const auto& blob_file = blob_files.begin()->second;
+ assert(blob_file);
+
+ ASSERT_EQ(table_file->smallest.user_key(), "key1");
+ ASSERT_EQ(table_file->largest.user_key(), "key2");
+ ASSERT_EQ(table_file->fd.smallest_seqno, 1);
+ ASSERT_EQ(table_file->fd.largest_seqno, 2);
+ ASSERT_EQ(table_file->oldest_blob_file_number,
+ blob_file->GetBlobFileNumber());
+
+ ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
+
+#ifndef ROCKSDB_LITE
+ const InternalStats* const internal_stats = cfd->internal_stats();
+ assert(internal_stats);
+
+ const uint64_t expected_bytes =
+ table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
+
+ const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+ ASSERT_FALSE(compaction_stats.empty());
+ ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
+ ASSERT_EQ(compaction_stats[0].num_output_files, 2);
+
+ const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
+ ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
+#endif // ROCKSDB_LITE
+}
+
+class DBFlushTestBlobError : public DBFlushTest,
+ public testing::WithParamInterface<std::string> {
+ public:
+ DBFlushTestBlobError()
+ : fault_injection_env_(env_), sync_point_(GetParam()) {}
+ ~DBFlushTestBlobError() { Close(); }
+
+ FaultInjectionTestEnv fault_injection_env_;
+ std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
+ ::testing::ValuesIn(std::vector<std::string>{
+ "BlobFileBuilder::WriteBlobToFile:AddRecord",
+ "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+
+TEST_P(DBFlushTestBlobError, FlushError) {
+ Options options;
+ options.enable_blob_files = true;
+ options.disable_auto_compactions = true;
+ options.env = &fault_injection_env_;
+
+ Reopen(options);
+
+ ASSERT_OK(Put("key", "blob"));
+
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+ fault_injection_env_.SetFilesystemActive(false,
+ Status::IOError(sync_point_));
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
+ fault_injection_env_.SetFilesystemActive(true);
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_NOK(Flush());
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+ assert(versions);
+
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ assert(cfd);
+
+ Version* const current = cfd->current();
+ assert(current);
+
+ const VersionStorageInfo* const storage_info = current->storage_info();
+ assert(storage_info);
+
+ const auto& l0_files = storage_info->LevelFiles(0);
+ ASSERT_TRUE(l0_files.empty());
+
+ const auto& blob_files = storage_info->GetBlobFiles();
+ ASSERT_TRUE(blob_files.empty());
+
+ // Make sure the files generated by the failed job have been deleted
+ std::vector<std::string> files;
+ ASSERT_OK(env_->GetChildren(dbname_, &files));
+ for (const auto& file : files) {
+ uint64_t number = 0;
+ FileType type = kTableFile;
+
+ if (!ParseFileName(file, &number, &type)) {
+ continue;
+ }
+
+ ASSERT_NE(type, kTableFile);
+ ASSERT_NE(type, kBlobFile);
+ }
+
+#ifndef ROCKSDB_LITE
+ const InternalStats* const internal_stats = cfd->internal_stats();
+ assert(internal_stats);
+
+ const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+ ASSERT_FALSE(compaction_stats.empty());
+
+ if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
+ ASSERT_EQ(compaction_stats[0].bytes_written, 0);
+ ASSERT_EQ(compaction_stats[0].num_output_files, 0);
+ } else {
+ // SST file writing succeeded; blob file writing failed (during Finish)
+ ASSERT_GT(compaction_stats[0].bytes_written, 0);
+ ASSERT_EQ(compaction_stats[0].num_output_files, 1);
+ }
+
+ const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
+ ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
+ compaction_stats[0].bytes_written);
+#endif // ROCKSDB_LITE
+}
+
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
TEST_SYNC_POINT(
"DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
if (options.atomic_flush) {
- for (size_t i = 0; i != num_cfs - 1; ++i) {
+ for (size_t i = 0; i + 1 != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
}
} else {
- for (size_t i = 0; i != num_cfs - 1; ++i) {
+ for (size_t i = 0; i + 1 != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
for (auto* cfh : handles_) {
- dbfull()->TEST_WaitForFlushMemTable(cfh);
+ // Returns the IO error happend during flush.
+ ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh));
}
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);