#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "file/filename.h"
-#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
class EventListenerTest : public DBTestBase {
public:
- EventListenerTest() : DBTestBase("/listener_test", /*env_do_fsync=*/true) {}
+ EventListenerTest() : DBTestBase("listener_test", /*env_do_fsync=*/true) {}
static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
uint64_t size) {
public:
explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
- void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
+ void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
std::lock_guard<std::mutex> lock(mutex_);
compacted_dbs_.push_back(db);
ASSERT_GT(ci.input_files.size(), 0U);
TestCompactionListener* listener = new TestCompactionListener(this);
options.listeners.emplace_back(listener);
- std::vector<std::string> cf_names = {
- "pikachu", "ilya", "muromec", "dobrynia",
- "nikitich", "alyosha", "popovich"};
+ std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
+ "dobrynia", "nikitich", "alyosha",
+ "popovich"};
CreateAndReopenWithCF(cf_names, options);
ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
nullptr, nullptr));
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
: slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
db_closed = false;
}
- void OnTableFileCreated(
- const TableFileCreationInfo& info) override {
+
+ virtual ~TestFlushListener() {
+ prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
+ }
+ void OnTableFileCreated(const TableFileCreationInfo& info) override {
// remember the info for later checking the FlushJobInfo.
prev_fc_info_ = info;
ASSERT_GT(info.db_name.size(), 0U);
#endif // ROCKSDB_USING_THREAD_STATUS
}
- void OnFlushCompleted(
- DB* db, const FlushJobInfo& info) override {
+ void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(info.cf_name);
if (info.triggered_writes_slowdown) {
ASSERT_TRUE(test_);
if (db == test_->db_) {
std::vector<std::vector<FileMetaData>> files_by_level;
+ ASSERT_LT(info.cf_id, test_->handles_.size());
+ ASSERT_GE(info.cf_id, 0u);
+ ASSERT_NE(test_->handles_[info.cf_id], nullptr);
test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
&files_by_level);
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
- std::vector<std::string> cf_names = {
- "pikachu", "ilya", "muromec", "dobrynia",
- "nikitich", "alyosha", "popovich"};
+ std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
+ "dobrynia", "nikitich", "alyosha",
+ "popovich"};
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
CreateAndReopenWithCF(cf_names, options);
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
- TestFlushListener* listener = new TestFlushListener(options.env, this);
- options.listeners.emplace_back(listener);
- options.table_properties_collector_factories.push_back(
- std::make_shared<TestPropertiesCollectorFactory>());
- std::vector<std::string> cf_names = {
- "pikachu", "ilya", "muromec", "dobrynia",
- "nikitich", "alyosha", "popovich"};
- CreateAndReopenWithCF(cf_names, options);
-
- ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
- ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
- ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
- ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
- ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
- ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
- ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
- for (int i = 1; i < 8; ++i) {
- ASSERT_OK(Flush(i));
- ASSERT_EQ(listener->flushed_dbs_.size(), i);
- ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
- }
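+  // Run the whole scenario both with and without atomic flush.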
+ for (auto atomic_flush : {false, true}) {
+ options.atomic_flush = atomic_flush;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+ TestFlushListener* listener = new TestFlushListener(options.env, this);
+ options.listeners.emplace_back(listener);
+ options.table_properties_collector_factories.push_back(
+ std::make_shared<TestPropertiesCollectorFactory>());
+ std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
+ "dobrynia", "nikitich", "alyosha",
+ "popovich"};
+ CreateAndReopenWithCF(cf_names, options);
+
+ ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+ ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+ ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+ ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+ ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+ ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+ ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ for (int i = 1; i < 8; ++i) {
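+      // Block at the sync point below until DBImpl has finished issuing all
+      // OnFlushCompleted notifications for this flush.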
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
+ "EventListenerTest.MultiCF:PreVerifyListener"}});
+ ASSERT_OK(Flush(i));
+ TEST_SYNC_POINT("EventListenerTest.MultiCF:PreVerifyListener");
+ ASSERT_EQ(listener->flushed_dbs_.size(), i);
+ ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+ // make sure callback functions are called in the right order
+ if (i == 7) {
+ for (size_t j = 0; j < cf_names.size(); j++) {
+ ASSERT_EQ(listener->flushed_dbs_[j], db_);
+ ASSERT_EQ(listener->flushed_column_family_names_[j], cf_names[j]);
+ }
+ }
+ }
- // make sure callback functions are called in the right order
- for (size_t i = 0; i < cf_names.size(); i++) {
- ASSERT_EQ(listener->flushed_dbs_[i], db_);
- ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ Close();
}
}
listeners.emplace_back(new TestFlushListener(options.env, this));
}
- std::vector<std::string> cf_names = {
- "pikachu", "ilya", "muromec", "dobrynia",
- "nikitich", "alyosha", "popovich"};
+ std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
+ "dobrynia", "nikitich", "alyosha",
+ "popovich"};
options.create_if_missing = true;
for (int i = 0; i < kNumListeners; ++i) {
ColumnFamilyOptions cf_opts(options);
std::vector<DB*> dbs;
- std::vector<std::vector<ColumnFamilyHandle *>> vec_handles;
+ std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;
for (int d = 0; d < kNumDBs; ++d) {
- ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
+ ASSERT_OK(DestroyDB(dbname_ + std::to_string(d), options));
DB* db;
std::vector<ColumnFamilyHandle*> handles;
- ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
+ ASSERT_OK(DB::Open(options, dbname_ + std::to_string(d), &db));
for (size_t c = 0; c < cf_names.size(); ++c) {
ColumnFamilyHandle* handle;
- db->CreateColumnFamily(cf_opts, cf_names[c], &handle);
+ ASSERT_OK(db->CreateColumnFamily(cf_opts, cf_names[c], &handle));
handles.push_back(handle);
}
for (int d = 0; d < kNumDBs; ++d) {
for (size_t c = 0; c < cf_names.size(); ++c) {
- ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
- cf_names[c], cf_names[c]));
+ ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], cf_names[c],
+ cf_names[c]));
}
}
for (size_t c = 0; c < cf_names.size(); ++c) {
for (int d = 0; d < kNumDBs; ++d) {
ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
- static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable();
+ ASSERT_OK(
+ static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable());
}
}
+ for (int d = 0; d < kNumDBs; ++d) {
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(
+ static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForBackgroundWork());
+ }
+
for (auto* listener : listeners) {
int pos = 0;
for (size_t c = 0; c < cf_names.size(); ++c) {
}
}
-
for (auto handles : vec_handles) {
for (auto h : handles) {
delete h;
// keep writing until writes are forced to stop.
for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
++i) {
- Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
+ ASSERT_OK(
+ Put(1, std::to_string(i), std::string(10000, 'x'), WriteOptions()));
FlushOptions fo;
fo.allow_write_stall = true;
- db_->Flush(fo, handles_[1]);
+ ASSERT_OK(db_->Flush(fo, handles_[1]));
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
}
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
}
Options options;
options.env = CurrentOptions().env;
options.create_if_missing = true;
- options.memtable_factory.reset(
- new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+ DBTestBase::kNumKeysByGenerateNewRandomFile));
TestCompactionReasonListener* listener = new TestCompactionReasonListener();
options.listeners.emplace_back(listener);
for (int i = 0; i < 4; i++) {
GenerateNewRandomFile(&rnd);
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(listener->compaction_reasons_.size(), 1);
ASSERT_EQ(listener->compaction_reasons_[0],
}
// Do a trivial move from L0 -> L1
- db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
options.max_bytes_for_level_base = 1;
Close();
listener->compaction_reasons_.clear();
Reopen(options);
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(listener->compaction_reasons_.size(), 1);
for (auto compaction_reason : listener->compaction_reasons_) {
listener->compaction_reasons_.clear();
Reopen(options);
- Put("key", "value");
+ ASSERT_OK(Put("key", "value"));
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
Options options;
options.env = CurrentOptions().env;
options.create_if_missing = true;
- options.memtable_factory.reset(
- new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+ DBTestBase::kNumKeysByGenerateNewRandomFile));
TestCompactionReasonListener* listener = new TestCompactionReasonListener();
options.listeners.emplace_back(listener);
for (int i = 0; i < 8; i++) {
GenerateNewRandomFile(&rnd);
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(listener->compaction_reasons_.size(), 0);
for (auto compaction_reason : listener->compaction_reasons_) {
for (int i = 0; i < 8; i++) {
GenerateNewRandomFile(&rnd);
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(listener->compaction_reasons_.size(), 0);
for (auto compaction_reason : listener->compaction_reasons_) {
listener->compaction_reasons_.clear();
Reopen(options);
- db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_GT(listener->compaction_reasons_.size(), 0);
for (auto compaction_reason : listener->compaction_reasons_) {
Options options;
options.env = CurrentOptions().env;
options.create_if_missing = true;
- options.memtable_factory.reset(
- new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+ DBTestBase::kNumKeysByGenerateNewRandomFile));
TestCompactionReasonListener* listener = new TestCompactionReasonListener();
options.listeners.emplace_back(listener);
for (int i = 0; i < 4; i++) {
GenerateNewRandomFile(&rnd);
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(listener->compaction_reasons_.size(), 0);
for (auto compaction_reason : listener->compaction_reasons_) {
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* t) : EnvWrapper(t) {}
+ static const char* kClassName() { return "TestEnv"; }
+ const char* Name() const override { return kClassName(); }
void SetStatus(Status s) { status_ = s; }
ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
if (info.status.ok()) {
- ASSERT_GT(info.table_properties.data_size, 0U);
- ASSERT_GT(info.table_properties.raw_key_size, 0U);
- ASSERT_GT(info.table_properties.raw_value_size, 0U);
- ASSERT_GT(info.table_properties.num_data_blocks, 0U);
- ASSERT_GT(info.table_properties.num_entries, 0U);
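+    // A file that contains only range tombstones may have no data blocks or
+    // point entries, so only run these checks when no range deletions exist.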
+ if (info.table_properties.num_range_deletions == 0U) {
+ ASSERT_GT(info.table_properties.data_size, 0U);
+ ASSERT_GT(info.table_properties.raw_key_size, 0U);
+ ASSERT_GT(info.table_properties.raw_value_size, 0U);
+ ASSERT_GT(info.table_properties.num_data_blocks, 0U);
+ ASSERT_GT(info.table_properties.num_entries, 0U);
+ }
} else {
if (idx >= 0) {
failure_[idx]++;
+ last_failure_ = info.status;
}
}
}
int started_[2];
int finished_[2];
int failure_[2];
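+  // Status of the most recent failed table file creation.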
+ Status last_failure_;
};
TEST_F(EventListenerTest, TableFileCreationListenersTest) {
ASSERT_OK(Put("foo", "aaa"));
ASSERT_OK(Put("bar", "bbb"));
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
ASSERT_OK(Put("foo", "aaa1"));
ASSERT_OK(Put("bar", "bbb1"));
test_env->SetStatus(Status::NotSupported("not supported"));
ASSERT_NOK(Flush());
listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
+ ASSERT_TRUE(listener->last_failure_.IsNotSupported());
test_env->SetStatus(Status::OK());
Reopen(options);
ASSERT_OK(Put("foo", "aaa2"));
ASSERT_OK(Put("bar", "bbb2"));
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
const Slice kRangeStart = "a";
const Slice kRangeEnd = "z";
- dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(
+ dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
ASSERT_OK(Put("foo", "aaa3"));
ASSERT_OK(Put("bar", "bbb3"));
ASSERT_OK(Flush());
test_env->SetStatus(Status::NotSupported("not supported"));
- dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
- dbfull()->TEST_WaitForCompact();
+ ASSERT_NOK(
+ dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
+ ASSERT_NOK(dbfull()->TEST_WaitForCompact());
listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
- Close();
+ ASSERT_TRUE(listener->last_failure_.IsNotSupported());
+
+ // Reset
+ test_env->SetStatus(Status::OK());
+ DestroyAndReopen(options);
+
+  // Verify that an empty table file that is immediately deleted reports an
+  // Aborted status to the listener.
+ ASSERT_OK(Put("baz", "z"));
+ ASSERT_OK(SingleDelete("baz"));
+ ASSERT_OK(Flush());
+ listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
+ ASSERT_TRUE(listener->last_failure_.IsAborted());
+
+ // Also in compaction
+ ASSERT_OK(Put("baz", "z"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+ kRangeStart, kRangeEnd));
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ listener->CheckAndResetCounters(2, 2, 0, 1, 1, 1);
+ ASSERT_TRUE(listener->last_failure_.IsAborted());
+
+ Close(); // Avoid UAF on listener
}
class MemTableSealedListener : public EventListener {
-private:
+ private:
SequenceNumber latest_seq_number_;
-public:
+
+ public:
MemTableSealedListener() {}
void OnMemTableSealed(const MemTableInfo& info) override {
latest_seq_number_ = info.first_seqno;
}
void OnFlushCompleted(DB* /*db*/,
- const FlushJobInfo& flush_job_info) override {
+ const FlushJobInfo& flush_job_info) override {
ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
}
};
for (unsigned int i = 0; i < 10; i++) {
std::string tag = std::to_string(i);
- ASSERT_OK(Put("foo"+tag, "aaa"));
- ASSERT_OK(Put("bar"+tag, "bbb"));
+ ASSERT_OK(Put("foo" + tag, "aaa"));
+ ASSERT_OK(Put("bar" + tag, "bbb"));
ASSERT_OK(Flush());
}
options.create_if_missing = true;
options.env = env_;
options.listeners.push_back(listener);
- options.memtable_factory.reset(new SpecialSkipListFactory(1));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
options.paranoid_checks = true;
DestroyAndReopen(options);
options.env = env_;
options.level0_file_num_compaction_trigger = 2;
options.listeners.push_back(listener);
- options.memtable_factory.reset(new SpecialSkipListFactory(2));
+ options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));
options.paranoid_checks = true;
DestroyAndReopen(options);
file_syncs_success_.store(0);
file_truncates_.store(0);
file_truncates_success_.store(0);
+ file_seq_reads_.store(0);
+ blob_file_reads_.store(0);
+ blob_file_writes_.store(0);
+ blob_file_flushes_.store(0);
+ blob_file_closes_.store(0);
+ blob_file_syncs_.store(0);
+ blob_file_truncates_.store(0);
}
void OnFileReadFinish(const FileOperationInfo& info) override {
if (info.status.ok()) {
++file_reads_success_;
}
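+    // The MANIFEST is read sequentially during recovery; count those reads
+    // so the recovery test below can verify them.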
+ if (info.path.find("MANIFEST") != std::string::npos) {
+ ++file_seq_reads_;
+ }
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_reads_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_writes_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_writes_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_flushes_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_flushes_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_closes_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_closes_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_syncs_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_syncs_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_truncates_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_truncates_;
+ }
ReportDuration(info);
}
std::atomic<size_t> file_syncs_success_;
std::atomic<size_t> file_truncates_;
std::atomic<size_t> file_truncates_success_;
+ std::atomic<size_t> file_seq_reads_;
+ std::atomic<size_t> blob_file_reads_;
+ std::atomic<size_t> blob_file_writes_;
+ std::atomic<size_t> blob_file_flushes_;
+ std::atomic<size_t> blob_file_closes_;
+ std::atomic<size_t> blob_file_syncs_;
+ std::atomic<size_t> blob_file_truncates_;
private:
void ReportDuration(const FileOperationInfo& info) const {
}
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "aaa"));
- dbfull()->Flush(FlushOptions());
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->Flush(FlushOptions()));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_GE(listener->file_writes_.load(),
listener->file_writes_success_.load());
ASSERT_GT(listener->file_writes_.load(), 0);
}
}
+TEST_F(EventListenerTest, OnBlobFileOperationTest) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.create_if_missing = true;
+ TestFileOperationListener* listener = new TestFileOperationListener();
+ options.listeners.emplace_back(listener);
+ options.disable_auto_compactions = true;
+ options.enable_blob_files = true;
+ options.min_blob_size = 0;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
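+  // Overwrite Key3 and Key4 so the oldest blob file accumulates garbage.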
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ ASSERT_GT(listener->blob_file_writes_.load(), 0U);
+ ASSERT_GT(listener->blob_file_flushes_.load(), 0U);
+ Close();
+
+ Reopen(options);
+ ASSERT_GT(listener->blob_file_closes_.load(), 0U);
+ ASSERT_GT(listener->blob_file_syncs_.load(), 0U);
+  if (options.use_direct_io_for_flush_and_compaction) {
+ ASSERT_GT(listener->blob_file_truncates_.load(), 0U);
+ }
+}
+
+TEST_F(EventListenerTest, ReadManifestAndWALOnRecovery) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.create_if_missing = true;
+
+ TestFileOperationListener* listener = new TestFileOperationListener();
+ options.listeners.emplace_back(listener);
+
+  // Probe whether the environment supports direct I/O for flush/compaction;
+  // fall back to buffered I/O if it does not.
+  options.use_direct_io_for_flush_and_compaction = true;
+  Status s = TryReopen(options);
+  if (s.IsInvalidArgument()) {
+    options.use_direct_io_for_flush_and_compaction = false;
+  } else {
+    ASSERT_OK(s);
+  }
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("foo", "aaa"));
+ Close();
+
+ size_t seq_reads = listener->file_seq_reads_.load();
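+  // Recovery has to read the MANIFEST, so sequential reads should increase.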
+ Reopen(options);
+ ASSERT_GT(listener->file_seq_reads_.load(), seq_reads);
+}
+
+class BlobDBJobLevelEventListenerTest : public EventListener {
+ public:
+ explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)
+ : test_(test), call_count_(0) {}
+
+ const VersionStorageInfo* GetVersionStorageInfo() const {
+ VersionSet* const versions = test_->dbfull()->GetVersionSet();
+ assert(versions);
+
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ EXPECT_NE(cfd, nullptr);
+
+ Version* const current = cfd->current();
+ EXPECT_NE(current, nullptr);
+
+ const VersionStorageInfo* const storage_info = current->storage_info();
+ EXPECT_NE(storage_info, nullptr);
+
+ return storage_info;
+ }
+
+ void CheckBlobFileAdditions(
+ const std::vector<BlobFileAdditionInfo>& blob_file_addition_infos) const {
+ const auto* vstorage = GetVersionStorageInfo();
+
+ EXPECT_FALSE(blob_file_addition_infos.empty());
+
+ for (const auto& blob_file_addition_info : blob_file_addition_infos) {
+ const auto meta = vstorage->GetBlobFileMetaData(
+ blob_file_addition_info.blob_file_number);
+
+ EXPECT_NE(meta, nullptr);
+ EXPECT_EQ(meta->GetBlobFileNumber(),
+ blob_file_addition_info.blob_file_number);
+ EXPECT_EQ(meta->GetTotalBlobBytes(),
+ blob_file_addition_info.total_blob_bytes);
+ EXPECT_EQ(meta->GetTotalBlobCount(),
+ blob_file_addition_info.total_blob_count);
+ EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+ }
+ }
+
+ std::vector<std::string> GetFlushedFiles() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::vector<std::string> result;
+ for (const auto& fname : flushed_files_) {
+ result.push_back(fname);
+ }
+ return result;
+ }
+
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+ call_count_++;
+
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ flushed_files_.push_back(info.file_path);
+ }
+
+ EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+ CheckBlobFileAdditions(info.blob_file_addition_infos);
+ }
+
+ void OnCompactionCompleted(DB* /*db*/,
+ const CompactionJobInfo& info) override {
+ call_count_++;
+
+ EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+ CheckBlobFileAdditions(info.blob_file_addition_infos);
+
+ EXPECT_FALSE(info.blob_file_garbage_infos.empty());
+
+ for (const auto& blob_file_garbage_info : info.blob_file_garbage_infos) {
+ EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+ EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+ }
+ }
+
+ EventListenerTest* test_;
+ uint32_t call_count_;
+
+ private:
+ std::vector<std::string> flushed_files_;
+ std::mutex mutex_;
+};
+
+// Test that OnFlushCompleted is called on the EventListener for blob files
+TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+
+ options.min_blob_size = 0;
+ BlobDBJobLevelEventListenerTest* blob_event_listener =
+ new BlobDBJobLevelEventListenerTest(this);
+ options.listeners.emplace_back(blob_event_listener);
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Flush());
+
+ ASSERT_EQ(Get("Key1"), "blob_value1");
+ ASSERT_EQ(Get("Key2"), "blob_value2");
+ ASSERT_EQ(Get("Key3"), "blob_value3");
+
+ ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test that OnCompactionCompleted is called on the EventListener for blob files
+TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.min_blob_size = 0;
+ BlobDBJobLevelEventListenerTest* blob_event_listener =
+ new BlobDBJobLevelEventListenerTest(this);
+ options.listeners.emplace_back(blob_event_listener);
+
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ blob_event_listener->call_count_ = 0;
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+  // Because of blob_garbage_collection_age_cutoff, this compaction deletes
+  // the oldest blob file and writes a new blob file.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  // Make sure OnCompactionCompleted is called.
+ ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test that CompactFiles calls the OnCompactionCompleted EventListener for
+// blob files and populates the blob file info.
+TEST_F(EventListenerTest, BlobDBCompactFiles) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.min_blob_size = 0;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ BlobDBJobLevelEventListenerTest* blob_event_listener =
+ new BlobDBJobLevelEventListenerTest(this);
+ options.listeners.emplace_back(blob_event_listener);
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ std::vector<std::string> output_file_names;
+ CompactionJobInfo compaction_job_info;
+
+  // Because of blob_garbage_collection_age_cutoff, this compaction deletes
+  // the oldest blob file and writes a new blob file, which is reported in
+  // output_file_names.
+ ASSERT_OK(dbfull()->CompactFiles(
+ CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1,
+ &output_file_names, &compaction_job_info));
+
+ bool is_blob_in_output = false;
+ for (const auto& file : output_file_names) {
+ if (EndsWith(file, ".blob")) {
+ is_blob_in_output = true;
+ }
+ }
+ ASSERT_TRUE(is_blob_in_output);
+
+ for (const auto& blob_file_addition_info :
+ compaction_job_info.blob_file_addition_infos) {
+ EXPECT_GT(blob_file_addition_info.blob_file_number, 0U);
+ EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U);
+ EXPECT_GT(blob_file_addition_info.total_blob_count, 0U);
+ EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+ }
+
+ for (const auto& blob_file_garbage_info :
+ compaction_job_info.blob_file_garbage_infos) {
+ EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+ EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+ }
+}
+
+class BlobDBFileLevelEventListener : public EventListener {
+ public:
+ void OnBlobFileCreationStarted(
+ const BlobFileCreationBriefInfo& info) override {
+ files_started_++;
+ EXPECT_FALSE(info.db_name.empty());
+ EXPECT_FALSE(info.cf_name.empty());
+ EXPECT_FALSE(info.file_path.empty());
+ EXPECT_GT(info.job_id, 0);
+ }
+
+ void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
+ files_created_++;
+ EXPECT_FALSE(info.db_name.empty());
+ EXPECT_FALSE(info.cf_name.empty());
+ EXPECT_FALSE(info.file_path.empty());
+ EXPECT_GT(info.job_id, 0);
+ EXPECT_GT(info.total_blob_count, 0U);
+ EXPECT_GT(info.total_blob_bytes, 0U);
+ EXPECT_EQ(info.file_checksum, kUnknownFileChecksum);
+ EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+ EXPECT_TRUE(info.status.ok());
+ }
+
+ void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override {
+ files_deleted_++;
+ EXPECT_FALSE(info.db_name.empty());
+ EXPECT_FALSE(info.file_path.empty());
+ EXPECT_GT(info.job_id, 0);
+ EXPECT_TRUE(info.status.ok());
+ }
+
+ void CheckCounters() {
+ EXPECT_EQ(files_started_, files_created_);
+ EXPECT_GT(files_started_, 0U);
+ EXPECT_GT(files_deleted_, 0U);
+ EXPECT_LT(files_deleted_, files_created_);
+ }
+
+ private:
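+  // These callbacks may fire concurrently from background threads.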
+ std::atomic<uint32_t> files_started_{};
+ std::atomic<uint32_t> files_created_{};
+ std::atomic<uint32_t> files_deleted_{};
+};
+
+TEST_F(EventListenerTest, BlobDBFileTest) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.min_blob_size = 0;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ BlobDBFileLevelEventListener* blob_event_listener =
+ new BlobDBFileLevelEventListener();
+ options.listeners.emplace_back(blob_event_listener);
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+  // Because of blob_garbage_collection_age_cutoff, this compaction deletes
+  // the oldest blob file and writes a new blob file.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ blob_event_listener->CheckCounters();
+}
+
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}