// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
+#include "env/mock_env.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "port/stack_trace.h"
+#include "rocksdb/cache.h"
#include "rocksdb/sst_file_manager.h"
+#include "rocksdb/table.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
class DBSSTTest : public DBTestBase {
public:
- DBSSTTest() : DBTestBase("/db_sst_test", /*env_do_fsync=*/true) {}
+ DBSSTTest() : DBTestBase("db_sst_test", /*env_do_fsync=*/true) {}
};
#ifndef ROCKSDB_LITE
for (int i = 0; i < 10; ++i) {
GenerateNewFile(&rnd, &key_id, false);
}
- Flush();
+ ASSERT_OK(Flush());
Close();
int const num_files = GetSstFileCount(dbname_);
ASSERT_GT(num_files, 0);
ASSERT_OK(Flush());
}
// this should execute both L0->L1 and L1->(move)->L2 compactions
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1", FilesPerLevel(0));
// If the moved file is actually deleted (the move-safeguard in
ASSERT_OK(Flush());
}
// this should execute both L0->L1 and L1->(move)->L2 compactions
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1", FilesPerLevel(0));
test::SleepingBackgroundTask blocking_thread;
// finish the flush!
blocking_thread.WakeUp();
blocking_thread.WaitUntilDone();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// File just flushed is too big for L0 and L1 so gets moved to L2.
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0));
metadata.clear();
listener->VerifyMatchedCount(1);
}
+// Test that producing an empty .sst file does not write it out to
+// disk, and that the DeleteFile() env method is not called for
+// removing the non-existing file later.
+TEST_F(DBSSTTest, DeleteFileNotCalledForNotCreatedSSTFile) {
+ Options options = CurrentOptions();
+ options.env = env_;
+
+ OnFileDeletionListener* listener = new OnFileDeletionListener();
+ options.listeners.emplace_back(listener);
+
+ Reopen(options);
+
+ // Flush the empty database.
+ ASSERT_OK(Flush());
+ ASSERT_EQ("", FilesPerLevel(0));
+
+ // We expect no .sst files.
+ std::vector<LiveFileMetaData> metadata;
+ db_->GetLiveFilesMetaData(&metadata);
+ ASSERT_EQ(metadata.size(), 0U);
+
+ // We expect no file deletions.
+ listener->VerifyMatchedCount(0);
+}
+
+// Test that producing a non-empty .sst file does write it out to
+// disk, and that the DeleteFile() env method is not called for removing
+// the file later.
+TEST_F(DBSSTTest, DeleteFileNotCalledForCreatedSSTFile) {
+ Options options = CurrentOptions();
+ options.env = env_;
+
+ OnFileDeletionListener* listener = new OnFileDeletionListener();
+ options.listeners.emplace_back(listener);
+
+ Reopen(options);
+
+ ASSERT_OK(Put("pika", "choo"));
+
+ // Flush the non-empty database.
+ ASSERT_OK(Flush());
+ ASSERT_EQ("1", FilesPerLevel(0));
+
+ // We expect 1 .sst file.
+ std::vector<LiveFileMetaData> metadata;
+ db_->GetLiveFilesMetaData(&metadata);
+ ASSERT_EQ(metadata.size(), 1U);
+
+ // We expect no file deletions.
+ listener->VerifyMatchedCount(0);
+}
+
TEST_F(DBSSTTest, DBWithSstFileManager) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
for (int i = 0; i < 25; i++) {
GenerateNewRandomFile(&rnd);
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Verify that we are tracking all sst files in dbname_
std::unordered_map<std::string, uint64_t> files_in_db;
- ASSERT_OK(GetAllSSTFiles(&files_in_db));
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
std::unordered_map<std::string, uint64_t> files_in_db;
- ASSERT_OK(GetAllSSTFiles(&files_in_db));
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
// Verify that we are tracking all sst files in dbname_
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
// Verify the total files size
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
-TEST_F(DBSSTTest, RateLimitedDelete) {
+TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
+ std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+ auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+ int files_added = 0;
+ int files_deleted = 0;
+ int files_moved = 0;
+ int files_scheduled_to_delete = 0;
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ files_added++;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ files_deleted++;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+ assert(arg);
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ ++files_scheduled_to_delete;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ Options options = CurrentOptions();
+ options.sst_file_manager = sst_file_manager;
+ options.enable_blob_files = true;
+ options.blob_file_size = 32; // create one blob per file
+ DestroyAndReopen(options);
+ Random rnd(301);
+
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put("Key_" + std::to_string(i), "Value_" + std::to_string(i)));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ // Verify that we are tracking all sst and blob files in dbname_
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+ ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+ }
+
+ std::vector<uint64_t> blob_files = GetBlobFileNumbers();
+ ASSERT_EQ(files_added, blob_files.size());
+ // No blob file is obsoleted.
+ ASSERT_EQ(files_deleted, 0);
+ ASSERT_EQ(files_scheduled_to_delete, 0);
+ // No files were moved.
+ ASSERT_EQ(files_moved, 0);
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+ ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+
+ // Verify that we are tracking all sst and blob files in dbname_
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+ // Verify the total files size
+ uint64_t total_files_size = 0;
+ for (auto& file_to_size : files_in_db) {
+ total_files_size += file_to_size.second;
+ }
+ ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+ Close();
+
+ Reopen(options);
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+ ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+
+ // Verify that we track all the files again after the DB is closed and opened.
+ Close();
+
+ sst_file_manager.reset(NewSstFileManager(env_));
+ options.sst_file_manager = sst_file_manager;
+ sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+ Reopen(options);
+
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+ ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+
+ // Destroy DB and it will remove all the blob files from sst file manager and
+ // blob files deletion will go through ScheduleFileDeletion.
+ ASSERT_EQ(files_deleted, 0);
+ ASSERT_EQ(files_scheduled_to_delete, 0);
+ Close();
+ ASSERT_OK(DestroyDB(dbname_, options));
+ ASSERT_EQ(files_deleted, blob_files.size());
+ ASSERT_EQ(files_scheduled_to_delete, blob_files.size());
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
+ std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+ auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+ Options options = CurrentOptions();
+ options.sst_file_manager = sst_file_manager;
+ options.enable_blob_files = true;
+ options.blob_file_size = 32; // create one blob per file
+ options.disable_auto_compactions = true;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ int files_added = 0;
+ int files_deleted = 0;
+ int files_moved = 0;
+ int files_scheduled_to_delete = 0;
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ files_added++;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ files_deleted++;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+ assert(arg);
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ ++files_scheduled_to_delete;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ DestroyAndReopen(options);
+ Random rnd(301);
+
+ constexpr char first_key[] = "first_key";
+ constexpr char first_value[] = "first_value";
+ constexpr char second_key[] = "second_key";
+ constexpr char second_value[] = "second_value";
+
+ ASSERT_OK(Put(first_key, first_value));
+ ASSERT_OK(Put(second_key, second_value));
+ ASSERT_OK(Flush());
+
+ constexpr char third_key[] = "third_key";
+ constexpr char third_value[] = "third_value";
+ constexpr char fourth_key[] = "fourth_key";
+ constexpr char fourth_value[] = "fourth_value";
+ constexpr char fifth_key[] = "fifth_key";
+ constexpr char fifth_value[] = "fifth_value";
+
+ ASSERT_OK(Put(third_key, third_value));
+ ASSERT_OK(Put(fourth_key, fourth_value));
+ ASSERT_OK(Put(fifth_key, fifth_value));
+ ASSERT_OK(Flush());
+
+ const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
+
+ ASSERT_EQ(original_blob_files.size(), 5);
+ ASSERT_EQ(files_added, 5);
+ ASSERT_EQ(files_deleted, 0);
+ ASSERT_EQ(files_scheduled_to_delete, 0);
+ ASSERT_EQ(files_moved, 0);
+ {
+ // Verify that we are tracking all sst and blob files in dbname_
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+ ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+ }
+
+ const size_t cutoff_index = static_cast<size_t>(
+ options.blob_garbage_collection_age_cutoff * original_blob_files.size());
+
+ size_t expected_number_of_files = original_blob_files.size();
+ // Note: turning off enable_blob_files before the compaction results in
+ // garbage collected values getting inlined.
+ ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
+ expected_number_of_files -= cutoff_index;
+ files_added = 0;
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ sfm->WaitForEmptyTrash();
+
+ ASSERT_EQ(Get(first_key), first_value);
+ ASSERT_EQ(Get(second_key), second_value);
+ ASSERT_EQ(Get(third_key), third_value);
+ ASSERT_EQ(Get(fourth_key), fourth_value);
+ ASSERT_EQ(Get(fifth_key), fifth_value);
+
+ const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
+
+ ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
+ // No new file is added.
+ ASSERT_EQ(files_added, 0);
+ ASSERT_EQ(files_deleted, cutoff_index);
+ ASSERT_EQ(files_scheduled_to_delete, cutoff_index);
+ ASSERT_EQ(files_moved, 0);
+
+ // Original blob files below the cutoff should be gone, original blob files at
+ // or above the cutoff should still be there
+ for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
+ ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
+ }
+
+ {
+ // Verify that we are tracking all sst and blob files in dbname_
+ std::unordered_map<std::string, uint64_t> files_in_db;
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
+ ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
+ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
+ }
+
+ Close();
+ ASSERT_OK(DestroyDB(dbname_, options));
+ sfm->WaitForEmptyTrash();
+ ASSERT_EQ(files_deleted, 5);
+ ASSERT_EQ(files_scheduled_to_delete, 5);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+class DBSSTTestRateLimit : public DBSSTTest,
+ public ::testing::WithParamInterface<bool> {
+ public:
+ DBSSTTestRateLimit() : DBSSTTest() {}
+ ~DBSSTTestRateLimit() override {}
+};
+
+TEST_P(DBSSTTestRateLimit, RateLimitedDelete) {
Destroy(last_options_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"DBSSTTest::RateLimitedDelete:1",
*abs_time_us = Env::Default()->NowMicros();
});
+ // Disable PeriodicTaskScheduler as it also has TimedWait, which could update
+ // the simulated sleep time
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::StartPeriodicTaskScheduler:DisableScheduler", [&](void* arg) {
+ bool* disable_scheduler = static_cast<bool*>(arg);
+ *disable_scheduler = true;
+ });
+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ bool different_wal_dir = GetParam();
Options options = CurrentOptions();
SetTimeElapseOnlySleepOnReopen(&options);
options.disable_auto_compactions = true;
options.env = env_;
options.statistics = CreateDBStatistics();
+ if (different_wal_dir) {
+ options.wal_dir = alternative_wal_dir_;
+ }
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
WriteOptions wo;
- wo.disableWAL = true;
- ASSERT_OK(TryReopen(options));
+ if (!different_wal_dir) {
+ wo.disableWAL = true;
+ }
+ Reopen(options);
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
+INSTANTIATE_TEST_CASE_P(RateLimitedDelete, DBSSTTestRateLimit,
+ ::testing::Bool());
+
TEST_F(DBSSTTest, RateLimitedWALDelete) {
Destroy(last_options_);
}
class DBWALTestWithParam
- : public DBSSTTest,
+ : public DBTestBase,
public testing::WithParamInterface<std::tuple<std::string, bool>> {
public:
- DBWALTestWithParam() {
+ explicit DBWALTestWithParam()
+ : DBTestBase("db_wal_test_with_params", /*env_do_fsync=*/true) {
wal_dir_ = std::get<0>(GetParam());
wal_dir_same_as_dbname_ = std::get<1>(GetParam());
}
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
-
- Status DeleteFile(const std::string& fname) {
+ const char* Name() const override { return "MyEnv"; }
+ Status DeleteFile(const std::string& fname) override {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
- ASSERT_OK(TryReopen(options));
+ Reopen(options);
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
+ if (v == 'c') {
+ // Maximize the chance that the last log file will be preserved in trash
+ // before restarting the DB.
+ // We have to set this on the 2nd to last file for it to delay deletion
+ // on the last file. (Quirk of DeleteScheduler::BackgroundEmptyTrash())
+ options.sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ }
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
if (!wal_dir_same_as_dbname_) {
// Forcibly create some trash log files
std::unique_ptr<WritableFile> result;
- env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
- EnvOptions());
+ ASSERT_OK(env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
+ EnvOptions()));
result.reset();
}
- env->GetChildren(options.wal_dir, &filenames);
+ ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
ASSERT_GE(trash_log_count, 1);
env->set_fake_log_delete(false);
- ASSERT_OK(TryReopen(options));
+ Reopen(options);
filenames.clear();
trash_log_count = 0;
- env->GetChildren(options.wal_dir, &filenames);
+ ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
Destroy(last_options_);
// Add some trash files to the db directory so the DB can clean them up
- env_->CreateDirIfMissing(dbname_);
+ ASSERT_OK(env_->CreateDirIfMissing(dbname_));
ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash"));
ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash"));
ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash"));
// Reopen the DB and verify that it deletes existing trash files
- ASSERT_OK(TryReopen(options));
+ Reopen(options);
sfm->WaitForEmptyTrash();
ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
}
-
// Create a DB with 2 db_paths, and generate multiple files in the 2
// db_paths using CompactRangeOptions, make sure that files that were
// deleted from first db_path were deleted using DeleteScheduler and
// Create 4 files in L0
for (int i = 0; i < 4; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo));
+ ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A'), wo));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
// Create 4 files in L0
for (int i = 4; i < 8; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo));
+ ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'B'), wo));
ASSERT_OK(Flush());
}
ASSERT_EQ("4,1", FilesPerLevel(0));
// Create 4 files in L0
for (int i = 0; i < 4; i++) {
- ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
+ ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A')));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
int num_sst_files = 0;
int num_wal_files = 0;
std::vector<std::string> db_files;
- env_->GetChildren(dbname_, &db_files);
+ ASSERT_OK(env_->GetChildren(dbname_, &db_files));
for (std::string f : db_files) {
if (f.substr(f.find_last_of(".") + 1) == "sst") {
num_sst_files++;
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
- sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
+ // Set an extra high trash ratio to prevent immediate/non-rate limited
+ // deletions
+ sfm->delete_scheduler()->SetMaxTrashDBRatio(1000.0);
ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
uint64_t first_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
- ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size));
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &first_file_size));
ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
// Set the maximum allowed space usage to the current total size
ASSERT_NOK(Flush());
}
+TEST_F(DBSSTTest, DBWithMaxSpaceAllowedWithBlobFiles) {
+ std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+ auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+ Options options = CurrentOptions();
+ options.sst_file_manager = sst_file_manager;
+ options.disable_auto_compactions = true;
+ options.enable_blob_files = true;
+ DestroyAndReopen(options);
+
+ Random rnd(301);
+
+ // Generate a file containing keys.
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
+ }
+ ASSERT_OK(Flush());
+
+ uint64_t files_size = 0;
+ uint64_t total_files_size = 0;
+ std::unordered_map<std::string, uint64_t> files_in_db;
+
+ ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db, &files_size));
+ // Make sure blob files are considered by SstFileManager in size limits.
+ ASSERT_GT(files_size, 0);
+ total_files_size = files_size;
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &files_size));
+ total_files_size += files_size;
+ ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
+
+ // Set the maximum allowed space usage to the current total size.
+ sfm->SetMaxAllowedSpaceUsage(total_files_size + 1);
+
+ bool max_allowed_space_reached = false;
+ bool delete_blob_file = false;
+ // Sync point called after blob file is closed and max allowed space is
+ // checked.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached",
+ [&](void* /*arg*/) { max_allowed_space_reached = true; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable::AfterDeleteFile",
+ [&](void* /*arg*/) { delete_blob_file = true; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+ {
+ "BuildTable::AfterDeleteFile",
+ "DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1",
+ },
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(Put("key1", "val1"));
+ // This flush will fail
+ ASSERT_NOK(Flush());
+ ASSERT_TRUE(max_allowed_space_reached);
+
+ TEST_SYNC_POINT("DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1");
+ ASSERT_TRUE(delete_blob_file);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
TEST_F(DBSSTTest, CancellingCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
- ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
// Set the maximum allowed space usage to the current total size
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
// Because we set a callback in CancelledCompaction, we actually
// let the compaction run
ASSERT_GT(completed_compactions, 0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Make sure the stat is bumped
- ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0);
+ ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
+ COMPACTION_CANCELLED),
+ 0);
ASSERT_EQ(0,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_MARKED_TRASH));
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
- ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
// Set the maximum allowed space usage to the current total size
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
ASSERT_OK(Flush());
// OK, now trigger a manual compaction
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_TRUE(dbfull()
+ ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+ .IsCompactionTooLarge());
// Wait for manual compaction to get scheduled and finish
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Make sure the stat is bumped
// Now make sure CompactFiles also gets cancelled
auto l0_files = collector->GetFlushedFiles();
- dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
+ ASSERT_TRUE(
+ dbfull()
+ ->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0)
+ .IsCompactionTooLarge());
// Wait for manual compaction to get scheduled and finish
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
"CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
+ l0_files, 0));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
ASSERT_GT(completed_compactions, 0);
ASSERT_TRUE(bg_error_set);
uint64_t total_sst_files_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
- ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size));
+ ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_sst_files_size));
ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
- db_->CompactRange(compact_options, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// Create 12 Files in L0
for (int i = 0; i < 12; i++) {
}
}
+TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
+ for (CacheEntryRoleOptions::Decision charge_table_reader :
+ {CacheEntryRoleOptions::Decision::kEnabled,
+ CacheEntryRoleOptions::Decision::kDisabled}) {
+ // Open DB with infinite max open files
+ // - First iteration use 1 thread to open files
+ // - Second iteration use 5 threads to open files
+ for (int iter = 0; iter < 2; iter++) {
+ Options options;
+ options.create_if_missing = true;
+ options.write_buffer_size = 100000;
+ options.disable_auto_compactions = true;
+ options.max_open_files = -1;
+
+ BlockBasedTableOptions table_options;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ if (iter == 0) {
+ options.max_file_opening_threads = 1;
+ } else {
+ options.max_file_opening_threads = 5;
+ }
+
+ DestroyAndReopen(options);
+
+ // Create 5 Files in L0 (then move them to L2)
+ for (int i = 0; i < 5; i++) {
+ std::string k = "L2_" + Key(i);
+ ASSERT_OK(Put(k, k + std::string(1000, 'a')));
+ ASSERT_OK(Flush()) << i;
+ }
+ CompactRangeOptions compact_options;
+ compact_options.change_level = true;
+ compact_options.target_level = 2;
+ ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
+
+ // Create 5 Files in L0
+ for (int i = 0; i < 5; i++) {
+ std::string k = "L0_" + Key(i);
+ ASSERT_OK(Put(k, k + std::string(1000, 'a')));
+ ASSERT_OK(Flush());
+ }
+ Close();
+
+ table_options.cache_usage_options.options_overrides.insert(
+ {CacheEntryRole::kBlockBasedTableReader,
+ {/*.charged = */ charge_table_reader}});
+ table_options.block_cache =
+ NewLRUCache(1024 /* capacity */, 0 /* num_shard_bits */,
+ true /* strict_capacity_limit */);
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ // Reopening the DB will try to load all existing files, conditionally
+ // subject to memory limit
+ Status s = TryReopen(options);
+
+ if (charge_table_reader == CacheEntryRoleOptions::Decision::kEnabled) {
+ EXPECT_TRUE(s.IsMemoryLimit());
+ EXPECT_TRUE(s.ToString().find(
+ kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
+ CacheEntryRole::kBlockBasedTableReader)]) !=
+ std::string::npos);
+ EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
+ std::string::npos);
+
+ } else {
+ EXPECT_TRUE(s.ok());
+ ASSERT_EQ("5,0,5", FilesPerLevel(0));
+ }
+ }
+ }
+}
+
TEST_F(DBSSTTest, GetTotalSstFilesSize) {
// We don't propagate oldest-key-time table property on compaction and
// just write 0 as default value. This affect the exact table size, since
// Generate 5 files in L0
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 10; j++) {
- std::string val = "val_file_" + ToString(i);
+ std::string val = "val_file_" + std::to_string(i);
ASSERT_OK(Put(Key(j), val));
}
- Flush();
+ ASSERT_OK(Flush());
}
ASSERT_EQ("5", FilesPerLevel(0));
// hold current version
std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
+ ASSERT_OK(iter1->status());
// Compact 5 files into 1 file in L0
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// hold current version
std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
+ ASSERT_OK(iter2->status());
// Delete all keys and compact, this will delete all live files
for (int i = 0; i < 10; i++) {
ASSERT_OK(Delete(Key(i)));
}
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel(0));
// Total SST files = 6 (5 original files + compacted file)
ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
+ ASSERT_OK(iter1->status());
iter1.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
// Total SST files = 1 (compacted file)
ASSERT_EQ(total_sst_files_size, 1 * single_file_size);
+ ASSERT_OK(iter2->status());
iter2.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
+TEST_F(DBSSTTest, OpenDBWithoutGetFileSizeInvocations) {
+ Options options = CurrentOptions();
+ std::unique_ptr<MockEnv> env{MockEnv::Create(Env::Default())};
+ options.env = env.get();
+ options.disable_auto_compactions = true;
+ options.compression = kNoCompression;
+ options.enable_blob_files = true;
+ options.blob_file_size = 32; // create one blob per file
+ options.skip_checking_sst_file_sizes_on_db_open = true;
+
+ DestroyAndReopen(options);
+ // Generate 5 files in L0
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 10; j++) {
+ std::string val = "val_file_" + std::to_string(i);
+ ASSERT_OK(Put(Key(j), val));
+ }
+ ASSERT_OK(Flush());
+ }
+ Close();
+
+ bool is_get_file_size_called = false;
+ SyncPoint::GetInstance()->SetCallBack(
+ "MockFileSystem::GetFileSize:CheckFileType", [&](void* arg) {
+ std::string* filename = reinterpret_cast<std::string*>(arg);
+ if (filename->find(".blob") != std::string::npos) {
+ is_get_file_size_called = true;
+ }
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ Reopen(options);
+ ASSERT_FALSE(is_get_file_size_called);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ Destroy(options);
+}
+
TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
// Generate 5 files in L0
for (int i = 0; i < 5; i++) {
ASSERT_OK(Put(Key(i), "val"));
- Flush();
+ ASSERT_OK(Flush());
}
ASSERT_EQ("5", FilesPerLevel(0));
// hold current version
std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
+ // Check the iterator's status right after creation.
+ ASSERT_OK(iter1->status());
// Compaction will do trivial move from L0 to L1
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// hold current version
std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
+ ASSERT_OK(iter2->status());
// Delete all keys and compact, this will delete all live files
for (int i = 0; i < 5; i++) {
ASSERT_OK(Delete(Key(i)));
}
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel(0));
// Total SST files = 5 (used in 2 version)
ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
+ // Verify the iterator status is OK before releasing each iterator (and
+ // thereby the version it pins).
+ ASSERT_OK(iter1->status());
iter1.reset();
+ ASSERT_OK(iter2->status());
iter2.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
ASSERT_EQ(total_sst_files_size, 0);
}
+// This tests whether blob files are recorded by the SST File Manager when the
+// compaction job creates/deletes them, and in the case of atomic flush.
+TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
+ std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+ // Downcast to the impl type so the test can call WaitForEmptyTrash().
+ auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+ Options options = CurrentOptions();
+ options.sst_file_manager = sst_file_manager;
+ options.enable_blob_files = true;
+ options.min_blob_size = 0;
+ options.disable_auto_compactions = true;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+ options.atomic_flush = true;
+
+ // Counters for blob files (".blob") reported to the SstFileManager.
+ int files_added = 0;
+ int files_deleted = 0;
+ int files_scheduled_to_delete = 0;
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnAddFile", [&](void* arg) {
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (EndsWith(*file_path, ".blob")) {
+ files_added++;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (EndsWith(*file_path, ".blob")) {
+ files_deleted++;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
+ assert(arg);
+ const std::string* const file_path =
+ static_cast<const std::string*>(arg);
+ if (EndsWith(*file_path, ".blob")) {
+ ++files_scheduled_to_delete;
+ }
+ });
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ DestroyAndReopen(options);
+ Random rnd(301);
+
+ ASSERT_OK(Put("key_1", "value_1"));
+ ASSERT_OK(Put("key_2", "value_2"));
+ ASSERT_OK(Put("key_3", "value_3"));
+ ASSERT_OK(Put("key_4", "value_4"));
+ ASSERT_OK(Flush());
+
+ // Overwrite will create the garbage data.
+ ASSERT_OK(Put("key_3", "new_value_3"));
+ ASSERT_OK(Put("key_4", "new_value_4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ // Three flushes => three blob files tracked by the SFM; nothing has been
+ // deleted or scheduled for deletion yet.
+ ASSERT_EQ(files_added, 3);
+ ASSERT_EQ(files_deleted, 0);
+ ASSERT_EQ(files_scheduled_to_delete, 0);
+ files_added = 0;
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+ // Compaction job will create a new file and delete the older files.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ASSERT_EQ(files_added, 1);
+ ASSERT_EQ(files_scheduled_to_delete, 1);
+
+ sfm->WaitForEmptyTrash();
+
+ ASSERT_EQ(files_deleted, 1);
+
+ // Destroying the DB schedules the remaining blob files for deletion
+ // through the SFM as well (cumulative counts below).
+ Close();
+ ASSERT_OK(DestroyDB(dbname_, options));
+
+ ASSERT_EQ(files_scheduled_to_delete, 4);
+
+ sfm->WaitForEmptyTrash();
+
+ ASSERT_EQ(files_deleted, 4);
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
+ // NOTE(review): presumably registers custom Env/object factories from the
+ // command line before tests run — confirm against the test harness.
+ RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}