options.compression = kNoCompression;
DB* db = nullptr;
- DestroyDB(db_name_, options);
+ ASSERT_OK(DestroyDB(db_name_, options));
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// create couple files
// Background compaction starts and waits in BackgroundCallCompaction:0
for (int i = 0; i < kLevel0Trigger * 4; ++i) {
- db->Put(WriteOptions(), ToString(i), "");
- db->Put(WriteOptions(), ToString(100 - i), "");
- db->Flush(FlushOptions());
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), ""));
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(100 - i), ""));
+ ASSERT_OK(db->Flush(FlushOptions()));
}
ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
delete db;
}
+TEST_F(CompactFilesTest, MultipleLevel) {
+ Options options;
+ options.create_if_missing = true;
+ options.level_compaction_dynamic_level_bytes = true;
+ options.num_levels = 6;
+ // Add listener
+ FlushedFileCollector* collector = new FlushedFileCollector();
+ options.listeners.emplace_back(collector);
+
+ DB* db = nullptr;
+ ASSERT_OK(DestroyDB(db_name_, options));
+ Status s = DB::Open(options, db_name_, &db);
+ ASSERT_OK(s);
+ ASSERT_NE(db, nullptr);
+
+ // create a couple of files in L0, L3, L4 and L5
+ for (int i = 5; i > 2; --i) {
+ collector->ClearFlushedFiles();
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(i), ""));
+ ASSERT_OK(db->Flush(FlushOptions()));
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
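+ // The collector now holds exactly the file just flushed; compact it
+ // directly into level i.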
+ auto l0_files = collector->GetFlushedFiles();
+ ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i));
+
+ std::string prop;
+ ASSERT_TRUE(db->GetProperty(
+ "rocksdb.num-files-at-level" + std::to_string(i), &prop));
+ ASSERT_EQ("1", prop);
+ }
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(0), ""));
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ ColumnFamilyMetaData meta;
+ db->GetColumnFamilyMetaData(&meta);
+ // Compact files except the file in L3
+ std::vector<std::string> files;
+ for (int i = 0; i < 6; ++i) {
+ if (i == 3) continue;
+ for (auto& file : meta.levels[i].files) {
+ files.push_back(file.db_path + "/" + file.name);
+ }
+ }
+
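+ // Overlap a concurrent flush with the CompactFiles call below: the
+ // background thread starts writing only after the compaction job has
+ // started, and CompactFiles waits at the CompactFilesImpl:3 sync point
+ // until that flush has finished.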
+ SyncPoint::GetInstance()->LoadDependency({
+ {"CompactionJob::Run():Start", "CompactFilesTest.MultipleLevel:0"},
+ {"CompactFilesTest.MultipleLevel:1", "CompactFilesImpl:3"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ std::thread thread([&] {
+ TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:0");
+ ASSERT_OK(db->Put(WriteOptions(), "bar", "v2"));
+ ASSERT_OK(db->Put(WriteOptions(), "foo", "v2"));
+ ASSERT_OK(db->Flush(FlushOptions()));
+ TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:1");
+ });
+
+ // Compaction cannot move data back up the LSM tree to a lower-numbered
+ // level: one of the input files is from level 5, so the output level has
+ // to be >= 5.
+ for (int invalid_output_level = 0; invalid_output_level < 5;
+ invalid_output_level++) {
+ s = db->CompactFiles(CompactionOptions(), files, invalid_output_level);
+ std::cout << s.ToString() << std::endl;
+ ASSERT_TRUE(s.IsInvalidArgument());
+ }
+
+ ASSERT_OK(db->CompactFiles(CompactionOptions(), files, 5));
+ SyncPoint::GetInstance()->DisableProcessing();
+ thread.join();
+
+ delete db;
+}
+
TEST_F(CompactFilesTest, ObsoleteFiles) {
Options options;
// to trigger compaction more easily
options.listeners.emplace_back(collector);
DB* db = nullptr;
- DestroyDB(db_name_, options);
+ ASSERT_OK(DestroyDB(db_name_, options));
Status s = DB::Open(options, db_name_, &db);
- assert(s.ok());
- assert(db);
+ ASSERT_OK(s);
+ ASSERT_NE(db, nullptr);
// create couple files
for (int i = 1000; i < 2000; ++i) {
- db->Put(WriteOptions(), ToString(i),
- std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+ std::string(kWriteBufferSize / 10, 'a' + (i % 26))));
}
auto l0_files = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
- static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact();
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForCompact());
// verify all compaction input files are deleted
for (auto fname : l0_files) {
options.listeners.emplace_back(collector);
DB* db = nullptr;
- DestroyDB(db_name_, options);
+ ASSERT_OK(DestroyDB(db_name_, options));
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// create couple files
for (int i = 0; i < 500; ++i) {
- db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+ std::string(1000, 'a' + (i % 26))));
}
- static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
auto l0_files_1 = collector->GetFlushedFiles();
collector->ClearFlushedFiles();
for (int i = 0; i < 500; ++i) {
- db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+ std::string(1000, 'a' + (i % 26))));
}
- static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
auto l0_files_2 = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
options.listeners.emplace_back(collector);
DB* db = nullptr;
- DestroyDB(db_name_, options);
+ ASSERT_OK(DestroyDB(db_name_, options));
Status s = DB::Open(options, db_name_, &db);
- assert(s.ok());
+ ASSERT_OK(s);
assert(db);
// Create 5 files.
for (int i = 0; i < 5; ++i) {
- db->Put(WriteOptions(), "key" + ToString(i), "value");
- db->Flush(FlushOptions());
+ ASSERT_OK(db->Put(WriteOptions(), "key" + std::to_string(i), "value"));
+ ASSERT_OK(db->Flush(FlushOptions()));
}
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
auto l0_files = collector->GetFlushedFiles();
EXPECT_EQ(5, l0_files.size());
// In the meantime flush another file.
TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:0");
- db->Put(WriteOptions(), "key5", "value");
- db->Flush(FlushOptions());
+ ASSERT_OK(db->Put(WriteOptions(), "key5", "value"));
+ ASSERT_OK(db->Flush(FlushOptions()));
TEST_SYNC_POINT("CompactFilesTest.CapturingPendingFiles:1");
compaction_thread.join();
// Make sure we can reopen the DB.
s = DB::Open(options, db_name_, &db);
- ASSERT_TRUE(s.ok());
+ ASSERT_OK(s);
assert(db);
delete db;
}
return true;
}
- void SetDB(DB* db) {
- db_ = db;
- }
+ void SetDB(DB* db) { db_ = db; }
const char* Name() const override { return "FilterWithGet"; }
DB* db_;
};
-
std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
Options options;
options.compaction_filter = cf.get();
DB* db = nullptr;
- DestroyDB(db_name_, options);
+ ASSERT_OK(DestroyDB(db_name_, options));
Status s = DB::Open(options, db_name_, &db);
ASSERT_OK(s);
cf->SetDB(db);
// Write one L0 file
- db->Put(WriteOptions(), "K1", "V1");
- db->Flush(FlushOptions());
+ ASSERT_OK(db->Put(WriteOptions(), "K1", "V1"));
+ ASSERT_OK(db->Flush(FlushOptions()));
// Compact all L0 files using CompactFiles
ROCKSDB_NAMESPACE::ColumnFamilyMetaData meta;
db->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), {fname}, 0));
}
-
delete db;
}
}
// Check that passing `CompressionType::kDisableCompressionOption` to
// `CompactFiles` causes it to use the column family compression options.
- for (auto compaction_style :
- {CompactionStyle::kCompactionStyleLevel,
- CompactionStyle::kCompactionStyleUniversal,
- CompactionStyle::kCompactionStyleNone}) {
- DestroyDB(db_name_, Options());
+ for (auto compaction_style : {CompactionStyle::kCompactionStyleLevel,
+ CompactionStyle::kCompactionStyleUniversal,
+ CompactionStyle::kCompactionStyleNone}) {
+ ASSERT_OK(DestroyDB(db_name_, Options()));
Options options;
options.compaction_style = compaction_style;
// L0: Snappy, L1: ZSTD, L2: Snappy
DB* db = nullptr;
ASSERT_OK(DB::Open(options, db_name_, &db));
- db->Put(WriteOptions(), "key", "val");
- db->Flush(FlushOptions());
+ ASSERT_OK(db->Put(WriteOptions(), "key", "val"));
+ ASSERT_OK(db->Flush(FlushOptions()));
+ // Ensure background work is fully finished including listener callbacks
+ // before accessing listener state.
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
auto l0_files = collector->GetFlushedFiles();
ASSERT_EQ(1, l0_files.size());
options.listeners.emplace_back(collector);
DB* db = nullptr;
- DestroyDB(db_name_, options);
+ ASSERT_OK(DestroyDB(db_name_, options));
Status s = DB::Open(options, db_name_, &db);
- assert(s.ok());
+ ASSERT_OK(s);
assert(db);
// create couple files
for (int i = 0; i < 500; ++i) {
- db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+ ASSERT_OK(db->Put(WriteOptions(), std::to_string(i),
+ std::string(1000, 'a' + (i % 26))));
}
- static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable();
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForFlushMemTable());
auto l0_files_1 = collector->GetFlushedFiles();
CompactionOptions co;
co.compression = CompressionType::kLZ4Compression;
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
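+ // Install signal handlers that print a stack trace on a crash, making
+ // test failures easier to debug.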
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}