class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
- ~FlushedFileCollector() {}
+ ~FlushedFileCollector() override {}
- virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
env_->time_elapse_only_sleep_ = true;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
+ // Need to disable stats dumping and persisting which also use
+ // RepeatableThread, one of whose member variables is of type
+ // InstrumentedCondVar. The callback for
+ // InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping
+ // and persisting threads and cause time_spent_deleting measurement to become
+ // incorrect.
+ options.stats_dump_period_sec = 0;
+ options.stats_persist_period_sec = 0;
options.env = env_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
+ WriteOptions wo;
+ wo.disableWAL = true;
ASSERT_OK(TryReopen(options));
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
- ASSERT_OK(Put("Key2", DummyString(1024, v)));
- ASSERT_OK(Put("Key3", DummyString(1024, v)));
- ASSERT_OK(Put("Key4", DummyString(1024, v)));
- ASSERT_OK(Put("Key1", DummyString(1024, v)));
- ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key3", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key1", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
// Compaction will move the 4 files in L0 to trash and create 1 L1 file
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ("0,1", FilesPerLevel(0));
uint64_t delete_start_time = env_->NowMicros();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
+TEST_F(DBSSTTest, RateLimitedWALDelete) {
+ Destroy(last_options_);
+
+ std::vector<uint64_t> penalties;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::BackgroundEmptyTrash:Wait",
+ [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
+
+ env_->no_slowdown_ = true;
+ env_->time_elapse_only_sleep_ = true;
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.env = env_;
+
+ int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
+ Status s;
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+ ASSERT_OK(s);
+ options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
+ auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+ sfm->delete_scheduler()->SetMaxTrashDBRatio(2.1);
+
+ ASSERT_OK(TryReopen(options));
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Create 4 files in L0
+ for (char v = 'a'; v <= 'd'; v++) {
+ ASSERT_OK(Put("Key2", DummyString(1024, v)));
+ ASSERT_OK(Put("Key3", DummyString(1024, v)));
+ ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Put("Key1", DummyString(1024, v)));
+ ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Flush());
+ }
+ // We created 4 sst files in L0
+ ASSERT_EQ("4", FilesPerLevel(0));
+
+ // Compaction will move the 4 files in L0 to trash and create 1 L1 file
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+ ASSERT_EQ("0,1", FilesPerLevel(0));
+
+ sfm->WaitForEmptyTrash();
+ ASSERT_EQ(penalties.size(), 8);
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
Options options = CurrentOptions();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile",
[&](void* /*arg*/) { bg_delete_file++; });
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.disable_auto_compactions = true;
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
DestroyAndReopen(options);
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.disableWAL = true;
// Create 4 files in L0
for (int i = 0; i < 4; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
+ ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
// Create 4 files in L0
for (int i = 4; i < 8; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B')));
+ ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo));
ASSERT_OK(Flush());
}
ASSERT_EQ("4,1", FilesPerLevel(0));
// Close DB and destroy it using DeleteScheduler
Close();
+ int num_sst_files = 0;
+ int num_wal_files = 0;
+ std::vector<std::string> db_files;
+ env_->GetChildren(dbname_, &db_files);
+ for (std::string f : db_files) {
+ if (f.substr(f.find_last_of(".") + 1) == "sst") {
+ num_sst_files++;
+ } else if (f.substr(f.find_last_of(".") + 1) == "log") {
+ num_wal_files++;
+ }
+ }
+ ASSERT_GT(num_sst_files, 0);
+ ASSERT_GT(num_wal_files, 0);
+
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
- // We have deleted the 4 sst files in the delete_scheduler
- ASSERT_EQ(bg_delete_file, 4);
+ ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {