// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
+#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/sst_file_manager.h"
-#include "util/sst_file_manager_impl.h"
+#include "util/random.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
class DBSSTTest : public DBTestBase {
public:
- DBSSTTest() : DBTestBase("/db_sst_test") {}
+ DBSSTTest() : DBTestBase("/db_sst_test", /*env_do_fsync=*/true) {}
};
+#ifndef ROCKSDB_LITE
+// A class which remembers the name of each flushed file.
+class FlushedFileCollector : public EventListener {
+ public:
+ FlushedFileCollector() {}
+ ~FlushedFileCollector() override {}
+
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+ std::lock_guard<std::mutex> lock(mutex_);
+ flushed_files_.push_back(info.file_path);
+ }
+
+ std::vector<std::string> GetFlushedFiles() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::vector<std::string> result;
+ for (auto fname : flushed_files_) {
+ result.push_back(fname);
+ }
+ return result;
+ }
+ void ClearFlushedFiles() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ flushed_files_.clear();
+ }
+
+ private:
+ std::vector<std::string> flushed_files_;
+ std::mutex mutex_;
+};
+#endif // ROCKSDB_LITE
+
TEST_F(DBSSTTest, DontDeletePendingOutputs) {
Options options;
options.env = env_;
int const num_files = GetSstFileCount(dbname_);
ASSERT_GT(num_files, 0);
+ Reopen(options);
+ std::vector<std::string> values;
+ values.reserve(key_id);
+ for (int k = 0; k < key_id; ++k) {
+ values.push_back(Get(Key(k)));
+ }
+ Close();
+
std::vector<std::string> filenames;
- GetSstFiles(dbname_, &filenames);
+ GetSstFiles(env_, dbname_, &filenames);
int num_ldb_files = 0;
for (size_t i = 0; i < filenames.size(); ++i) {
if (i & 1) {
Reopen(options);
for (int k = 0; k < key_id; ++k) {
- ASSERT_NE("NOT_FOUND", Get(Key(k)));
+ ASSERT_EQ(values[k], Get(Key(k)));
}
Destroy(options);
}
// Check that we don't crash when opening DB with
// DBOptions::skip_checking_sst_file_sizes_on_db_open = true.
TEST_F(DBSSTTest, SkipCheckingSSTFileSizesOnDBOpen) {
  // Write one key and flush so at least one SST file exists on disk before
  // the reopen exercises the size-check-skipping path.
  ASSERT_OK(Put("pika", "choo"));
  ASSERT_OK(Flush());

  // Just open the DB with the option set to true and check that we don't crash.
  Options options;
  options.env = env_;
  options.skip_checking_sst_file_sizes_on_db_open = true;
  Reopen(options);

  // Data must still be readable even though file sizes were not verified.
  ASSERT_EQ("choo", Get("pika"));
}
+
#ifndef ROCKSDB_LITE
TEST_F(DBSSTTest, DontDeleteMovedFile) {
// This test triggers move compaction and verifies that the file is not
for (int i = 0; i < 2; ++i) {
// Create 1MB sst file
for (int j = 0; j < 100; ++j) {
- ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+ ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 2; ++i) {
// Create 1MB sst file
for (int j = 0; j < 100; ++j) {
- ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+ ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
}
ASSERT_OK(Flush());
}
// write_buffer_size. The flush will be blocked with block_first_time
// pending_file is protecting all the files created after
for (int j = 0; j < 256; ++j) {
- ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024)));
+ ASSERT_OK(Put(Key(j), rnd.RandomString(10 * 1024)));
}
blocking_thread.WaitUntilSleeping();
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::OnAddFile", [&](void* arg) { files_added++; });
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { files_deleted++; });
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::OnMoveFile", [&](void* arg) { files_moved++; });
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnAddFile", [&](void* /*arg*/) { files_added++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnDeleteFile",
+ [&](void* /*arg*/) { files_deleted++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// Verify that we are tracking all sst files in dbname_
- ASSERT_EQ(sfm->GetTrackedFiles(), GetAllSSTFiles());
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllSSTFiles(&files_in_db));
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- auto files_in_db = GetAllSSTFiles();
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllSSTFiles(&files_in_db));
// Verify that we are tracking all sst files in dbname_
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
// Verify the total files size
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, RateLimitedDelete) {
Destroy(last_options_);
- rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"DBSSTTest::RateLimitedDelete:1",
"DeleteScheduler::BackgroundEmptyTrash"},
});
std::vector<uint64_t> penalties;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
// Turn timed wait into a simulated sleep
uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
- int64_t cur_time = 0;
- env_->GetCurrentTime(&cur_time);
- if (*abs_time_us > static_cast<uint64_t>(cur_time)) {
- env_->addon_time_.fetch_add(*abs_time_us -
- static_cast<uint64_t>(cur_time));
+ uint64_t cur_time = env_->NowMicros();
+ if (*abs_time_us > cur_time) {
+ env_->MockSleepForMicroseconds(*abs_time_us - cur_time);
}
- // Randomly sleep shortly
- env_->addon_time_.fetch_add(
- static_cast<uint64_t>(Random::GetTLSInstance()->Uniform(10)));
+ // Plus an additional short, random amount
+ env_->MockSleepForMicroseconds(Random::GetTLSInstance()->Uniform(10));
- // Set wait until time to before current to force not to sleep.
- int64_t real_cur_time = 0;
- Env::Default()->GetCurrentTime(&real_cur_time);
- *abs_time_us = static_cast<uint64_t>(real_cur_time);
+ // Set wait until time to before (actual) current time to force not
+ // to sleep
+ *abs_time_us = Env::Default()->NowMicros();
});
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- env_->no_slowdown_ = true;
- env_->time_elapse_only_sleep_ = true;
Options options = CurrentOptions();
+ SetTimeElapseOnlySleepOnReopen(&options);
options.disable_auto_compactions = true;
options.env = env_;
+ options.statistics = CreateDBStatistics();
- std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
options.sst_file_manager.reset(
- NewSstFileManager(env_, nullptr, trash_dir, 0, false, &s));
+ NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+ sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
+ WriteOptions wo;
+ wo.disableWAL = true;
ASSERT_OK(TryReopen(options));
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
- ASSERT_OK(Put("Key2", DummyString(1024, v)));
- ASSERT_OK(Put("Key3", DummyString(1024, v)));
- ASSERT_OK(Put("Key4", DummyString(1024, v)));
- ASSERT_OK(Put("Key1", DummyString(1024, v)));
- ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key3", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key1", DummyString(1024, v), wo));
+ ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
// Compaction will move the 4 files in L0 to trash and create 1 L1 file
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ("0,1", FilesPerLevel(0));
uint64_t delete_start_time = env_->NowMicros();
}
ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
ASSERT_LT(time_spent_deleting, expected_penlty * 1.1);
+ ASSERT_EQ(4, options.statistics->getAndResetTickerCount(FILES_MARKED_TRASH));
+ ASSERT_EQ(
+ 0, options.statistics->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBSSTTest, RateLimitedWALDelete) {
+ Destroy(last_options_);
+
+ std::vector<uint64_t> penalties;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::BackgroundEmptyTrash:Wait",
+ [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
+
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.compression = kNoCompression;
+ options.env = env_;
+
+ int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
+ Status s;
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+ ASSERT_OK(s);
+ options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
+ auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+ sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
+ SetTimeElapseOnlySleepOnReopen(&options);
+
+ ASSERT_OK(TryReopen(options));
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Create 4 files in L0
+ for (char v = 'a'; v <= 'd'; v++) {
+ ASSERT_OK(Put("Key2", DummyString(1024, v)));
+ ASSERT_OK(Put("Key3", DummyString(1024, v)));
+ ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Put("Key1", DummyString(1024, v)));
+ ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Flush());
+ }
+ // We created 4 sst files in L0
+ ASSERT_EQ("4", FilesPerLevel(0));
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ // Compaction will move the 4 files in L0 to trash and create 1 L1 file
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+ ASSERT_EQ("0,1", FilesPerLevel(0));
+
+ sfm->WaitForEmptyTrash();
+ ASSERT_EQ(penalties.size(), 8);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Parameterized fixture for WAL-directory tests.
// Param <0>: suffix appended to dbname_ to form the WAL directory ("" means
//            the WAL lives in the DB directory itself).
// Param <1>: whether that WAL directory is the same as the DB directory.
class DBWALTestWithParam
    : public DBSSTTest,
      public testing::WithParamInterface<std::tuple<std::string, bool>> {
 public:
  DBWALTestWithParam() {
    wal_dir_ = std::get<0>(GetParam());
    wal_dir_same_as_dbname_ = std::get<1>(GetParam());
  }

  std::string wal_dir_;
  bool wal_dir_same_as_dbname_;
};
+
+TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
+ class MyEnv : public EnvWrapper {
+ public:
+ MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
+
+ Status DeleteFile(const std::string& fname) {
+ if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
+ return Status::OK();
+ }
+
+ return target()->DeleteFile(fname);
+ }
+
+ void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
+
+ private:
+ bool fake_log_delete;
+ };
+
+ std::unique_ptr<MyEnv> env(new MyEnv(env_));
+ Destroy(last_options_);
+
+ env->set_fake_log_delete(true);
+
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.compression = kNoCompression;
+ options.env = env.get();
+ options.wal_dir = dbname_ + wal_dir_;
+
+ int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
+ Status s;
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+ ASSERT_OK(s);
+ options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
+ auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+ sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
+
+ ASSERT_OK(TryReopen(options));
+
+ // Create 4 files in L0
+ for (char v = 'a'; v <= 'd'; v++) {
+ ASSERT_OK(Put("Key2", DummyString(1024, v)));
+ ASSERT_OK(Put("Key3", DummyString(1024, v)));
+ ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Put("Key1", DummyString(1024, v)));
+ ASSERT_OK(Put("Key4", DummyString(1024, v)));
+ ASSERT_OK(Flush());
+ }
+ // We created 4 sst files in L0
+ ASSERT_EQ("4", FilesPerLevel(0));
+
+ Close();
+
+ options.sst_file_manager.reset();
+ std::vector<std::string> filenames;
+ int trash_log_count = 0;
+ if (!wal_dir_same_as_dbname_) {
+ // Forcibly create some trash log files
+ std::unique_ptr<WritableFile> result;
+ env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
+ EnvOptions());
+ result.reset();
+ }
+ env->GetChildren(options.wal_dir, &filenames);
+ for (const std::string& fname : filenames) {
+ if (fname.find(".log.trash") != std::string::npos) {
+ trash_log_count++;
+ }
+ }
+ ASSERT_GE(trash_log_count, 1);
+
+ env->set_fake_log_delete(false);
+ ASSERT_OK(TryReopen(options));
+
+ filenames.clear();
+ trash_log_count = 0;
+ env->GetChildren(options.wal_dir, &filenames);
+ for (const std::string& fname : filenames) {
+ if (fname.find(".log.trash") != std::string::npos) {
+ trash_log_count++;
+ }
+ }
+ ASSERT_EQ(trash_log_count, 0);
+ Close();
+}
+
+INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
+ ::testing::Values(std::make_tuple("", true),
+ std::make_tuple("_wal_dir", false)));
+
+TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
+ Options options = CurrentOptions();
+
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, "", 1024 * 1024 /* 1 MB/sec */));
+ auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+
+ Destroy(last_options_);
+
+ // Add some trash files to the db directory so the DB can clean them up
+ env_->CreateDirIfMissing(dbname_);
+ ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash"));
+ ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash"));
+ ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash"));
+
+ // Reopen the DB and verify that it deletes existing trash files
+ ASSERT_OK(TryReopen(options));
+ sfm->WaitForEmptyTrash();
+ ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
+ ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
+ ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
+}
+
+
// Create a DB with 2 db_paths, and generate multiple files in the 2
// db_paths using CompactRangeOptions, make sure that files that were
// deleted from first db_path were deleted using DeleteScheduler and
// files in the second path were not.
TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
- int bg_delete_file = 0;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ std::atomic<int> bg_delete_file(0);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
- [&](void* arg) { bg_delete_file++; });
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ [&](void* /*arg*/) { bg_delete_file++; });
+ // The deletion scheduler sometimes skips marking file as trash according to
+ // a heuristic. In that case the deletion will go through the below SyncPoint.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { bg_delete_file++; });
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
options.env = env_;
- std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
Status s;
- options.sst_file_manager.reset(NewSstFileManager(
- env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, "", rate_bytes_per_sec, false, &s,
+ /* max_trash_db_ratio= */ 1.1));
+
ASSERT_OK(s);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
DestroyAndReopen(options);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions wo;
+ wo.disableWAL = true;
// Create 4 files in L0
for (int i = 0; i < 4; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
+ ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
// Create 4 files in L0
for (int i = 4; i < 8; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B')));
+ ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo));
ASSERT_OK(Flush());
}
ASSERT_EQ("4,1", FilesPerLevel(0));
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 8);
+ // Compaction will delete both files and regenerate a file in L1 in second
+ // db path. The deleted files should still be cleaned up via delete scheduler.
compact_options.bottommost_level_compaction =
- BottommostLevelCompaction::kForce;
+ BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
sfm->WaitForEmptyTrash();
- ASSERT_EQ(bg_delete_file, 8);
+ ASSERT_EQ(bg_delete_file, 10);
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
int bg_delete_file = 0;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
- [&](void* arg) { bg_delete_file++; });
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ [&](void* /*arg*/) { bg_delete_file++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Status s;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.env = env_;
+ options.sst_file_manager.reset(
+ NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+ ASSERT_OK(s);
DestroyAndReopen(options);
// Create 4 files in L0
// Close DB and destroy it using DeleteScheduler
Close();
- std::string trash_dir = test::TmpDir(env_) + "/trash";
- int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
- Status s;
- options.sst_file_manager.reset(NewSstFileManager(
- env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
- ASSERT_OK(s);
- ASSERT_OK(DestroyDB(dbname_, options));
+
+ int num_sst_files = 0;
+ int num_wal_files = 0;
+ std::vector<std::string> db_files;
+ env_->GetChildren(dbname_, &db_files);
+ for (std::string f : db_files) {
+ if (f.substr(f.find_last_of(".") + 1) == "sst") {
+ num_sst_files++;
+ } else if (f.substr(f.find_last_of(".") + 1) == "log") {
+ num_wal_files++;
+ }
+ }
+ ASSERT_GT(num_sst_files, 0);
+ ASSERT_GT(num_wal_files, 0);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+
+ sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
+ sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
+ ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
- // We have deleted the 4 sst files in the delete_scheduler
- ASSERT_EQ(bg_delete_file, 4);
+ ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
// Generate a file containing 100 keys.
for (int i = 0; i < 100; i++) {
- ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
+ ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t first_file_size = 0;
- auto files_in_db = GetAllSSTFiles(&first_file_size);
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size));
ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
// Set the maximum allowed space usage to the current total size
ASSERT_NOK(Flush());
}
// Verify that a compaction cancelled by the SstFileManager space limit
// releases its space reservation, and that the relevant tickers are bumped.
TEST_F(DBSSTTest, CancellingCompactionsWorks) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.level0_file_num_compaction_trigger = 2;
  options.statistics = CreateDBStatistics();
  DestroyAndReopen(options);

  int completed_compactions = 0;
  // When the space limit cancels a compaction, lift the limit (0 means
  // unlimited) so a later attempt can run, and check that the cancelled
  // compaction's space reservation was released.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* /*arg*/) {
        sfm->SetMaxAllowedSpaceUsage(0);
        ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
      });
  // Count non-trivial compactions that actually ran to completion.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
      [&](void* /*arg*/) { completed_compactions++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);

  // Generate a file containing 10 keys.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());
  uint64_t total_file_size = 0;
  std::unordered_map<std::string, uint64_t> files_in_db;
  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
  // Set the maximum allowed space usage to the current total size
  sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);

  // Generate another file to trigger compaction.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());
  // NOTE(review): return status intentionally unchecked — the compaction can
  // be cancelled by the space limit before the callback lifts it; confirm a
  // non-OK status is acceptable here before wrapping in ASSERT_OK.
  dbfull()->TEST_WaitForCompact(true);

  // Because we set a callback in CancelledCompaction, we actually
  // let the compaction run
  ASSERT_GT(completed_compactions, 0);
  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  // Make sure the stat is bumped
  ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0);
  ASSERT_EQ(0,
            dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                FILES_MARKED_TRASH));
  ASSERT_EQ(4,
            dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                FILES_DELETED_IMMEDIATELY));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
+
// Verify that manual compactions (CompactRange and CompactFiles) are also
// cancelled by the SstFileManager space limit, release their reservations,
// and bump the COMPACTION_CANCELLED ticker.
TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.statistics = CreateDBStatistics();

  // Record the L0 files produced by flushes so we can CompactFiles() them.
  // Ownership note: listeners holds a shared_ptr, so the raw new is adopted.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  DestroyAndReopen(options);

  Random rnd(301);

  // Generate a file containing 10 keys.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());
  uint64_t total_file_size = 0;
  std::unordered_map<std::string, uint64_t> files_in_db;
  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
  // Set the maximum allowed space usage to the current total size
  sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);

  // Generate another file to trigger compaction.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
  }
  ASSERT_OK(Flush());

  // OK, now trigger a manual compaction
  // NOTE(review): statuses of CompactRange/CompactFiles/TEST_WaitForCompact
  // are intentionally unchecked below — the space limit is expected to cancel
  // these compactions (the COMPACTION_CANCELLED assertions depend on it), so
  // presumably they return non-OK; confirm before adding ASSERT_OK.
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  // Wait for manual compaction to get scheduled and finish
  dbfull()->TEST_WaitForCompact(true);

  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  // Make sure the stat is bumped
  ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                COMPACTION_CANCELLED),
            1);

  // Now make sure CompactFiles also gets cancelled
  auto l0_files = collector->GetFlushedFiles();
  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);

  // Wait for manual compaction to get scheduled and finish
  dbfull()->TEST_WaitForCompact(true);

  ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
                COMPACTION_CANCELLED),
            2);
  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);

  // Now let the flush through and make sure GetCompactionsReservedSize
  // returns to normal
  sfm->SetMaxAllowedSpaceUsage(0);
  int completed_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
  dbfull()->TEST_WaitForCompact(true);

  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
  ASSERT_GT(completed_compactions, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
+
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
// This test will set a maximum allowed space for the DB, then it will
// keep filling the DB until the limit is reached and bg_error_ is set.
// When bg_error_ is set we will verify that the DB size is greater
// than the limit.
- std::vector<int> max_space_limits_mbs = {1, 2, 4, 8, 10};
- decltype(max_space_limits_mbs)::value_type limit_mb_cb;
- bool bg_error_set = false;
- uint64_t total_sst_files_size = 0;
+ std::vector<int> max_space_limits_mbs = {1, 10};
+ std::atomic<bool> bg_error_set(false);
- std::atomic<int> estimate_multiplier(1);
- int reached_max_space_on_flush = 0;
- int reached_max_space_on_compaction = 0;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ std::atomic<int> reached_max_space_on_flush(0);
+ std::atomic<int> reached_max_space_on_compaction(0);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
[&](void* arg) {
Status* bg_error = static_cast<Status*>(arg);
bg_error_set = true;
- GetAllSSTFiles(&total_sst_files_size);
reached_max_space_on_flush++;
- // low limit for size calculated using sst files
- ASSERT_GE(total_sst_files_size, limit_mb_cb * 1024 * 1024);
// clear error to ensure compaction callback is called
*bg_error = Status::OK();
- estimate_multiplier++; // used in the main loop assert
});
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
+ bool* enough_room = static_cast<bool*>(arg);
+ *enough_room = true;
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
- [&](void* arg) {
+ [&](void* /*arg*/) {
bg_error_set = true;
- GetAllSSTFiles(&total_sst_files_size);
reached_max_space_on_compaction++;
});
for (auto limit_mb : max_space_limits_mbs) {
bg_error_set = false;
- total_sst_files_size = 0;
- estimate_multiplier = 1;
- limit_mb_cb = limit_mb;
- rocksdb::SyncPoint::GetInstance()->ClearTrace();
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);
- int keys_written = 0;
- uint64_t estimated_db_size = 0;
+ // It is easy to detect if the test is stuck in a loop. No need for
+ // complex termination logic.
while (true) {
- auto s = Put(RandomString(&rnd, 10), RandomString(&rnd, 50));
+ auto s = Put(rnd.RandomString(10), rnd.RandomString(50));
if (!s.ok()) {
break;
}
- keys_written++;
- // Check the estimated db size vs the db limit just to make sure we
- // dont run into an infinite loop
- estimated_db_size = keys_written * 60; // ~60 bytes per key
- ASSERT_LT(estimated_db_size,
- estimate_multiplier * limit_mb * 1024 * 1024 * 2);
}
ASSERT_TRUE(bg_error_set);
+ uint64_t total_sst_files_size = 0;
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size));
ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
ASSERT_GT(reached_max_space_on_flush, 0);
}
TEST_F(DBSSTTest, GetTotalSstFilesSize) {
+ // We don't propagate oldest-key-time table property on compaction and
+ // just write 0 as default value. This affect the exact table size, since
+ // we encode table properties as varint64. Force time to be 0 to work around
+ // it. Should remove the workaround after we propagate the property on
+ // compaction.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "FlushJob::WriteLevel0Table:oldest_ancester_time", [&](void* arg) {
+ uint64_t* current_time = static_cast<uint64_t*>(arg);
+ *current_time = 0;
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
// Live SST files = 0
// Total SST files = 0
ASSERT_EQ(total_sst_files_size, 0);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
#endif // ROCKSDB_LITE
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
- rocksdb::port::InstallStackTraceHandler();
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}