#include <algorithm>
#include <chrono>
#include <cstdlib>
+#include <iomanip>
#include <map>
#include <memory>
+#include <sstream>
#include <string>
#include <vector>
+#include "db/blob_index.h"
#include "db/db_test_util.h"
+#include "env/composite_env_wrapper.h"
+#include "file/file_util.h"
+#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
+#include "test_util/fault_injection_test_env.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
#include "util/cast_util.h"
-#include "util/fault_injection_test_env.h"
-#include "util/file_util.h"
#include "util/random.h"
-#include "util/sst_file_manager_impl.h"
#include "util/string_util.h"
-#include "util/sync_point.h"
-#include "util/testharness.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/blob_db/blob_db_impl.h"
-#include "utilities/blob_db/blob_index.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
namespace blob_db {
class BlobDBTest : public testing::Test {
public:
const int kMaxBlobSize = 1 << 14;
- struct BlobRecord {
- std::string key;
- std::string value;
- uint64_t expiration = 0;
+ struct BlobIndexVersion {
+ BlobIndexVersion() = default;
+ BlobIndexVersion(std::string _user_key, uint64_t _file_number,
+ uint64_t _expiration, SequenceNumber _sequence,
+ ValueType _type)
+ : user_key(std::move(_user_key)),
+ file_number(_file_number),
+ expiration(_expiration),
+ sequence(_sequence),
+ type(_type) {}
+
+ std::string user_key;
+ uint64_t file_number = kInvalidBlobFileNumber;
+ uint64_t expiration = kNoExpiration;
+ SequenceNumber sequence = 0;
+ ValueType type = kTypeValue;
};
BlobDBTest()
}
}
+ void VerifyBaseDBBlobIndex(
+ const std::map<std::string, BlobIndexVersion> &expected_versions) {
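+ // Compare the blob index entries in the base DB (as reported by
+ // GetAllKeyVersions) against the expected versions. Entries that are not
+ // blob indexes must not reference a blob file or carry an expiration.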
+ const size_t kMaxKeys = 10000;
+ std::vector<KeyVersion> versions;
+ ASSERT_OK(
+ GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
+ ASSERT_EQ(versions.size(), expected_versions.size());
+
+ size_t i = 0;
+ for (const auto &expected_pair : expected_versions) {
+ const BlobIndexVersion &expected_version = expected_pair.second;
+
+ ASSERT_EQ(versions[i].user_key, expected_version.user_key);
+ ASSERT_EQ(versions[i].sequence, expected_version.sequence);
+ ASSERT_EQ(versions[i].type, expected_version.type);
+ if (versions[i].type != kTypeBlobIndex) {
+ ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
+ ASSERT_EQ(kNoExpiration, expected_version.expiration);
+
+ ++i;
+ continue;
+ }
+
+ BlobIndex blob_index;
+ ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
+
+ const uint64_t file_number = !blob_index.IsInlined()
+ ? blob_index.file_number()
+ : kInvalidBlobFileNumber;
+ ASSERT_EQ(file_number, expected_version.file_number);
+
+ const uint64_t expiration =
+ blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
+ ASSERT_EQ(expiration, expected_version.expiration);
+
+ ++i;
+ }
+ }
+
void InsertBlobs() {
WriteOptions wo;
std::string value;
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
- GCStats gc_stats;
- ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
- ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
- ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
- GCStats gc_stats;
- ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
- ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
- ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
VerifyDB(data);
}
-TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- DBImpl *db_impl = static_cast_with_check<DBImpl, DB>(blob_db_->GetBaseDB());
- std::map<std::string, std::string> data;
- for (int i = 0; i < 200; i++) {
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
- // Test for data in SST
- size_t new_keys = 0;
- for (int i = 0; i < 100; i++) {
- if (rnd.Next() % 2 == 1) {
- new_keys++;
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- }
- db_impl->TEST_FlushMemTable(true /*wait*/);
- // Test for data in memtable
- for (int i = 100; i < 200; i++) {
- if (rnd.Next() % 2 == 1) {
- new_keys++;
- PutRandom("key" + ToString(i), &rnd, &data);
- }
- }
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
- ASSERT_EQ(200, gc_stats.blob_count);
- ASSERT_EQ(0, gc_stats.num_keys_expired);
- ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated);
- VerifyDB(data);
-}
-
-TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
- Random rnd(301);
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
-
- SyncPoint::GetInstance()->LoadDependency(
- {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
- "BlobDBImpl::PutUntil:Start"},
- {"BlobDBImpl::PutUntil:Finish",
- "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
- SyncPoint::GetInstance()->EnableProcessing();
-
- auto writer = port::Thread(
- [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
-
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
- ASSERT_EQ(1, gc_stats.blob_count);
- ASSERT_EQ(0, gc_stats.num_keys_expired);
- ASSERT_EQ(1, gc_stats.num_keys_overwritten);
- ASSERT_EQ(0, gc_stats.num_keys_relocated);
- writer.join();
- VerifyDB({{"foo", "v2"}});
-}
-
-TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
- Random rnd(301);
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options, options);
- mock_env_->set_current_time(100);
- ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
- mock_env_->set_current_time(300);
-
- SyncPoint::GetInstance()->LoadDependency(
- {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
- "BlobDBImpl::PutUntil:Start"},
- {"BlobDBImpl::PutUntil:Finish",
- "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
- SyncPoint::GetInstance()->EnableProcessing();
-
- auto writer = port::Thread([this]() {
- ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400));
- });
-
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
- ASSERT_EQ(1, gc_stats.blob_count);
- ASSERT_EQ(1, gc_stats.num_keys_expired);
- ASSERT_EQ(0, gc_stats.num_keys_relocated);
- writer.join();
- VerifyDB({{"foo", "v2"}});
-}
-
-TEST_F(BlobDBTest, NewFileGeneratedFromGCShouldMarkAsImmutable) {
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- ASSERT_OK(Put("foo", "bar"));
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- auto blob_file1 = blob_files[0];
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file1));
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_file1, &gc_stats));
- ASSERT_EQ(1, gc_stats.blob_count);
- ASSERT_EQ(1, gc_stats.num_keys_relocated);
- blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(2, blob_files.size());
- ASSERT_EQ(blob_file1, blob_files[0]);
- ASSERT_TRUE(blob_files[1]->Immutable());
-}
-
-// This test is no longer valid since we now return an error when we go
-// over the configured max_db_size.
-// The test needs to be re-written later in such a way that writes continue
-// after a GC happens.
-TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
- // Use mock env to stop wall clock.
- Options options;
- options.env = mock_env_.get();
- BlobDBOptions bdb_options;
- bdb_options.max_db_size = 100;
- bdb_options.blob_file_size = 100;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- std::string value(100, 'v');
- ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key_with_ttl", value, 60));
- for (int i = 0; i < 10; i++) {
- ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value));
- }
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(11, blob_files.size());
- ASSERT_TRUE(blob_files[0]->HasTTL());
- ASSERT_TRUE(blob_files[0]->Immutable());
- for (int i = 1; i <= 10; i++) {
- ASSERT_FALSE(blob_files[i]->HasTTL());
- if (i < 10) {
- ASSERT_TRUE(blob_files[i]->Immutable());
- }
- }
- blob_db_impl()->TEST_RunGC();
- // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC.
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(1, obsolete_files.size());
- ASSERT_EQ(blob_files[1]->BlobFileNumber(),
- obsolete_files[0]->BlobFileNumber());
-}
-
-TEST_F(BlobDBTest, ReadWhileGC) {
- // run the same test for Get(), MultiGet() and Iterator each.
- for (int i = 0; i < 2; i++) {
- BlobDBOptions bdb_options;
- bdb_options.min_blob_size = 0;
- bdb_options.disable_background_tasks = true;
- Open(bdb_options);
- blob_db_->Put(WriteOptions(), "foo", "bar");
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- std::shared_ptr<BlobFile> bfile = blob_files[0];
- uint64_t bfile_number = bfile->BlobFileNumber();
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
-
- switch (i) {
- case 0:
- SyncPoint::GetInstance()->LoadDependency(
- {{"BlobDBImpl::Get:AfterIndexEntryGet:1",
- "BlobDBTest::ReadWhileGC:1"},
- {"BlobDBTest::ReadWhileGC:2",
- "BlobDBImpl::Get:AfterIndexEntryGet:2"}});
- break;
- case 1:
- SyncPoint::GetInstance()->LoadDependency(
- {{"BlobDBIterator::UpdateBlobValue:Start:1",
- "BlobDBTest::ReadWhileGC:1"},
- {"BlobDBTest::ReadWhileGC:2",
- "BlobDBIterator::UpdateBlobValue:Start:2"}});
- break;
- }
- SyncPoint::GetInstance()->EnableProcessing();
-
- auto reader = port::Thread([this, i]() {
- std::string value;
- std::vector<std::string> values;
- std::vector<Status> statuses;
- switch (i) {
- case 0:
- ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value));
- ASSERT_EQ("bar", value);
- break;
- case 1:
- // VerifyDB use iterator to scan the DB.
- VerifyDB({{"foo", "bar"}});
- break;
- }
- });
-
- TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
- ASSERT_EQ(1, gc_stats.blob_count);
- ASSERT_EQ(1, gc_stats.num_keys_relocated);
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- // The file shouln't be deleted
- blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(2, blob_files.size());
- ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
- auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
- ASSERT_EQ(1, obsolete_files.size());
- ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
- TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
- reader.join();
- SyncPoint::GetInstance()->DisableProcessing();
-
- // The file is deleted this time
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
- ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
- VerifyDB({{"foo", "bar"}});
- Destroy();
- }
-}
-
TEST_F(BlobDBTest, SstFileManager) {
std::shared_ptr<SstFileManager> sst_file_manager(
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 1.0;
Options db_options;
- int files_deleted_directly = 0;
int files_scheduled_to_delete = 0;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::ScheduleFileDeletion",
- [&](void * /*arg*/) { files_scheduled_to_delete++; });
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "DeleteScheduler::DeleteFile",
- [&](void * /*arg*/) { files_deleted_directly++; });
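+ // Count only scheduled deletions of blob files (paths containing ".blob");
+ // other files going through the delete scheduler are ignored to keep the
+ // counts deterministic.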
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
+ assert(arg);
+ const std::string *const file_path =
+ static_cast<const std::string *>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ ++files_scheduled_to_delete;
+ }
+ });
SyncPoint::GetInstance()->EnableProcessing();
db_options.sst_file_manager = sst_file_manager;
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFile> bfile = blob_files[0];
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
blob_db_impl()->TEST_DeleteObsoleteFiles();
// Even if SstFileManager is not set, the DB creates a dummy one.
ASSERT_EQ(1, files_scheduled_to_delete);
- ASSERT_EQ(0, files_deleted_directly);
Destroy();
// Make sure that DestroyBlobDB() also goes through the delete scheduler.
- ASSERT_GE(files_scheduled_to_delete, 2);
- // Due to a timing issue, the WAL may or may not be deleted directly. The
- // blob file is first scheduled, followed by WAL. If the background trash
- // thread does not wake up on time, the WAL file will be directly
- // deleted as the trash size will be > DB size
- ASSERT_LE(files_deleted_directly, 1);
+ ASSERT_EQ(2, files_scheduled_to_delete);
SyncPoint::GetInstance()->DisableProcessing();
sfm->WaitForEmptyTrash();
}
TEST_F(BlobDBTest, SstFileManagerRestart) {
- int files_deleted_directly = 0;
int files_scheduled_to_delete = 0;
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "SstFileManagerImpl::ScheduleFileDeletion",
- [&](void * /*arg*/) { files_scheduled_to_delete++; });
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
- "DeleteScheduler::DeleteFile",
- [&](void * /*arg*/) { files_deleted_directly++; });
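+ // As in the previous test, count only scheduled deletions of blob files.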
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
+ assert(arg);
+ const std::string *const file_path =
+ static_cast<const std::string *>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ ++files_scheduled_to_delete;
+ }
+ });
std::shared_ptr<SstFileManager> sst_file_manager(
Close();
// Create 3 dummy trash files under the blob_dir
- CreateFile(db_options.env, blob_dir + "/000666.blob.trash", "", false);
- CreateFile(db_options.env, blob_dir + "/000888.blob.trash", "", true);
- CreateFile(db_options.env, blob_dir + "/something_not_match.trash", "",
- false);
+ LegacyFileSystemWrapper fs(db_options.env);
+ CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false);
+ CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true);
+ CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false);
// Make sure that reopening the DB rescans the existing trash files
Open(bdb_options, db_options);
- ASSERT_GE(files_scheduled_to_delete, 3);
- // Depending on timing, the WAL file may or may not be directly deleted
- ASSERT_LE(files_deleted_directly, 1);
+ ASSERT_EQ(files_scheduled_to_delete, 2);
sfm->WaitForEmptyTrash();
TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 1.0;
bdb_options.disable_background_tasks = true;
+
// i = when to take snapshot
for (int i = 0; i < 4; i++) {
- for (bool delete_key : {true, false}) {
- const Snapshot *snapshot = nullptr;
- Destroy();
- Open(bdb_options);
- // First file
- ASSERT_OK(Put("key1", "value"));
- if (i == 0) {
- snapshot = blob_db_->GetSnapshot();
- }
- auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(1, blob_files.size());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
- // Second file
- ASSERT_OK(Put("key2", "value"));
- if (i == 1) {
- snapshot = blob_db_->GetSnapshot();
- }
- blob_files = blob_db_impl()->TEST_GetBlobFiles();
- ASSERT_EQ(2, blob_files.size());
- auto bfile = blob_files[1];
- ASSERT_FALSE(bfile->Immutable());
- ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
- // Third file
- ASSERT_OK(Put("key3", "value"));
- if (i == 2) {
- snapshot = blob_db_->GetSnapshot();
- }
- if (delete_key) {
- Delete("key2");
- }
- GCStats gc_stats;
- ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
- ASSERT_TRUE(bfile->Obsolete());
- ASSERT_EQ(1, gc_stats.blob_count);
- if (delete_key) {
- ASSERT_EQ(0, gc_stats.num_keys_relocated);
- } else {
- ASSERT_EQ(1, gc_stats.num_keys_relocated);
- }
- ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
- bfile->GetObsoleteSequence());
- if (i == 3) {
- snapshot = blob_db_->GetSnapshot();
- }
- size_t num_files = delete_key ? 3 : 4;
- ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+ Destroy();
+ Open(bdb_options);
+
+ const Snapshot *snapshot = nullptr;
+
+ // First file
+ ASSERT_OK(Put("key1", "value"));
+ if (i == 0) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+ // Second file
+ ASSERT_OK(Put("key2", "value"));
+ if (i == 1) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ auto bfile = blob_files[1];
+ ASSERT_FALSE(bfile->Immutable());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+
+ // Third file
+ ASSERT_OK(Put("key3", "value"));
+ if (i == 2) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_TRUE(bfile->Obsolete());
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
+ bfile->GetObsoleteSequence());
+
+ Delete("key2");
+ if (i == 3) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+
+ if (i >= 2) {
+ // The snapshot shouldn't see data in bfile
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ } else {
+ // The snapshot will see data in bfile, so the file shouldn't be deleted
+ ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
blob_db_impl()->TEST_DeleteObsoleteFiles();
- if (i == 3) {
- // The snapshot shouldn't see data in bfile
- ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
- blob_db_->ReleaseSnapshot(snapshot);
- } else {
- // The snapshot will see data in bfile, so the file shouldn't be deleted
- ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
- blob_db_->ReleaseSnapshot(snapshot);
- blob_db_impl()->TEST_DeleteObsoleteFiles();
- ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
- }
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
}
}
}
// Path should be relative to db_name, but begin with a slash.
std::string filename = "/blob_dir/000001.blob";
ASSERT_EQ(filename, metadata[0].name);
+ ASSERT_EQ(1, metadata[0].file_number);
ASSERT_EQ("default", metadata[0].column_family_name);
std::vector<std::string> livefile;
uint64_t mfs;
// filtered regardless of snapshot.
const Snapshot *snapshot = blob_db_->GetSnapshot();
// Issue manual compaction to trigger compaction filter.
- ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
- blob_db_->DefaultColumnFamily(), nullptr,
- nullptr));
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
blob_db_->ReleaseSnapshot(snapshot);
// Verify that expired blob indexes are filtered out.
std::vector<KeyVersion> versions;
}
// Test compaction filter should remove any blob index where corresponding
-// blob file has been removed (either by FIFO or garbage collection).
+// blob file has been removed.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
ASSERT_EQ("foo", versions[1].user_key);
VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
- ASSERT_OK(blob_db_->Flush(FlushOptions()));
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
ASSERT_EQ(2, versions.size());
VerifyDB(data_after_compact);
}
+TEST_F(BlobDBTest, GarbageCollection) {
+ constexpr size_t kNumPuts = 1 << 10;
+
+ constexpr uint64_t kExpiration = 1000;
+ constexpr uint64_t kCompactTime = 500;
+
+ constexpr uint64_t kKeySize = 7; // "key" + 4 digits
+
+ constexpr uint64_t kSmallValueSize = 1 << 6;
+ constexpr uint64_t kLargeValueSize = 1 << 8;
+ constexpr uint64_t kMinBlobSize = 1 << 7;
+ static_assert(kSmallValueSize < kMinBlobSize, "");
+ static_assert(kLargeValueSize > kMinBlobSize, "");
+
+ constexpr size_t kBlobsPerFile = 8;
+ constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
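+ // kBlobFileSize is sized so that each non-TTL blob file holds exactly
+ // kBlobsPerFile large records: one file header plus, per record, a record
+ // header and the key/value payload. (Assuming the usual format constants,
+ // BlobLogHeader::kSize == 30 and BlobLogRecord::kHeaderSize == 32, this
+ // comes to 30 + (32 + 7 + 256) * 8 = 2390 bytes; the footer written when a
+ // file is closed is added separately in the size checks below.)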
+ constexpr uint64_t kBlobFileSize =
+ BlobLogHeader::kSize +
+ (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = kMinBlobSize;
+ bdb_options.blob_file_size = kBlobFileSize;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 0.25;
+ bdb_options.disable_background_tasks = true;
+
+ Options options;
+ options.env = mock_env_.get();
+ options.statistics = CreateDBStatistics();
+
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, KeyVersion> blob_value_versions;
+ std::map<std::string, BlobIndexVersion> blob_index_versions;
+
+ Random rnd(301);
+
+ // Add a bunch of large non-TTL values. These will be written to non-TTL
+ // blob files and will be subject to GC.
+ for (size_t i = 0; i < kNumPuts; ++i) {
+ std::ostringstream oss;
+ oss << "key" << std::setw(4) << std::setfill('0') << i;
+
+ const std::string key(oss.str());
+ const std::string value(
+ test::RandomHumanReadableString(&rnd, kLargeValueSize));
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(Put(key, value));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
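+ // With kBlobsPerFile == 8, key i lands in blob file number
+ // i / kBlobsPerFile + 1, i.e. (i >> 3) + 1.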
+ blob_index_versions[key] =
+ BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
+ sequence, kTypeBlobIndex);
+ }
+
+ // Add some small and/or TTL values that will be ignored during GC.
+ // First, add a large TTL value that will be written to its own TTL blob
+ // file.
+ {
+ const std::string key("key2000");
+ const std::string value(
+ test::RandomHumanReadableString(&rnd, kLargeValueSize));
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(PutUntil(key, value, kExpiration));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+ blob_index_versions[key] =
+ BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
+ sequence, kTypeBlobIndex);
+ }
+
+ // Now add a small TTL value (which will be inlined).
+ {
+ const std::string key("key3000");
+ const std::string value(
+ test::RandomHumanReadableString(&rnd, kSmallValueSize));
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(PutUntil(key, value, kExpiration));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+ blob_index_versions[key] = BlobIndexVersion(
+ key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
+ }
+
+ // Finally, add a small non-TTL value (which will be stored as a regular
+ // value).
+ {
+ const std::string key("key4000");
+ const std::string value(
+ test::RandomHumanReadableString(&rnd, kSmallValueSize));
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(Put(key, value));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
+ blob_index_versions[key] = BlobIndexVersion(
+ key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
+ }
+
+ VerifyDB(data);
+ VerifyBaseDB(blob_value_versions);
+ VerifyBaseDBBlobIndex(blob_index_versions);
+
+ // At this point, we should have 128 immutable non-TTL files with file numbers
+ // 1..128.
+ {
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+ for (size_t i = 0; i < kNumBlobFiles; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+ kBlobFileSize + BlobLogFooter::kSize);
+ }
+ }
+
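+ // Advance the mock clock to t=500 and trigger garbage collection through a
+ // manual compaction. The TTL values (expiring at t=1000) are still live.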
+ mock_env_->set_current_time(kCompactTime);
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ // We expect the data to remain the same and the blobs from the files below
+ // the GC cutoff to be moved to new files. Sequence numbers get zeroed out
+ // during the compaction.
+ VerifyDB(data);
+
+ for (auto &pair : blob_value_versions) {
+ KeyVersion &version = pair.second;
+ version.sequence = 0;
+ }
+
+ VerifyBaseDB(blob_value_versions);
+
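+ // With a cutoff of 0.25, the oldest quarter of the non-TTL blob files (file
+ // numbers 1..32) had their blobs relocated. The replacement files are
+ // numbered after the TTL blob file (129), hence the shift by
+ // kNumBlobFiles + 1.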
+ const uint64_t cutoff = static_cast<uint64_t>(
+ bdb_options.garbage_collection_cutoff * kNumBlobFiles);
+ for (auto &pair : blob_index_versions) {
+ BlobIndexVersion &version = pair.second;
+
+ version.sequence = 0;
+
+ if (version.file_number == kInvalidBlobFileNumber) {
+ continue;
+ }
+
+ if (version.file_number > cutoff) {
+ continue;
+ }
+
+ version.file_number += kNumBlobFiles + 1;
+ }
+
+ VerifyBaseDBBlobIndex(blob_index_versions);
+
+ const Statistics *const statistics = options.statistics.get();
+ assert(statistics);
+
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
+ cutoff * kBlobsPerFile);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
+ cutoff * kBlobsPerFile * kLargeValueSize);
+
+ // At this point, we should have 128 immutable non-TTL files with file numbers
+ // 33..128 and 130..161. (129 was taken by the TTL blob file.)
+ {
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+ for (size_t i = 0; i < kNumBlobFiles; ++i) {
+ uint64_t expected_file_number = i + cutoff + 1;
+ if (expected_file_number > kNumBlobFiles) {
+ ++expected_file_number;
+ }
+
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
+ ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+ kBlobFileSize + BlobLogFooter::kSize);
+ }
+ }
+}
+
+TEST_F(BlobDBTest, GarbageCollectionFailure) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 1.0;
+ bdb_options.disable_background_tasks = true;
+
+ Options db_options;
+ db_options.statistics = CreateDBStatistics();
+
+ Open(bdb_options, db_options);
+
+ // Write a couple of valid blobs.
+ Put("foo", "bar");
+ Put("dead", "beef");
+
+ // Write a fake blob reference into the base DB that cannot be parsed.
+ WriteBatch batch;
+ ASSERT_OK(WriteBatchInternal::PutBlobIndex(
+ &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
+ "not a valid blob index"));
+ ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(blob_files.size(), 1);
+ auto blob_file = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
+
+ ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+ .IsCorruption());
+
+ const Statistics *const statistics = db_options.statistics.get();
+ assert(statistics);
+
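+ // The two valid blobs sort before the corrupt index entry, so they are
+ // relocated to a new blob file ("bar" + "beef" = 7 bytes of value data)
+ // before the compaction fails: hence one new file, two relocated keys, and
+ // one failure.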
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
+}
+
// File should be evicted after expiration.
TEST_F(BlobDBTest, EvictExpiredFile) {
BlobDBOptions bdb_options;
}
}
+TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
+ BlobDBOptions bdb_options;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ // Register some dummy blob files.
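+ // The second argument is the sequence number at which each file became
+ // immutable. A blob file can only be marked obsolete once this sequence
+ // number has been flushed (see the compaction scenarios below).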
+ blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
+ blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
+ blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
+ blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
+ blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
+
+ // Initialize the blob <-> SST file mapping. First, add some SST files with
+ // blob file references, then some without.
+ std::vector<LiveFileMetaData> live_files;
+
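+ // SSTs 1..10 reference blob files 1..5 round-robin, so each blob file ends
+ // up linked to exactly two SSTs (e.g. blob file 1 to SSTs 1 and 6).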
+ for (uint64_t i = 1; i <= 10; ++i) {
+ LiveFileMetaData live_file;
+ live_file.file_number = i;
+ live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
+
+ live_files.emplace_back(live_file);
+ }
+
+ for (uint64_t i = 11; i <= 20; ++i) {
+ LiveFileMetaData live_file;
+ live_file.file_number = i;
+
+ live_files.emplace_back(live_file);
+ }
+
+ blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
+
+ // Check that the blob <-> SST mappings have been correctly initialized.
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+
+ ASSERT_EQ(blob_files.size(), 5);
+
+ {
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{false, false, false, false,
+ false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ // Simulate a flush where the SST does not reference any blob files.
+ {
+ FlushJobInfo info{};
+ info.file_number = 21;
+ info.smallest_seqno = 1;
+ info.largest_seqno = 100;
+
+ blob_db_impl()->TEST_ProcessFlushJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{false, false, false, false,
+ false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ // Simulate a flush where the SST references a blob file.
+ {
+ FlushJobInfo info{};
+ info.file_number = 22;
+ info.oldest_blob_file_number = 5;
+ info.smallest_seqno = 101;
+ info.largest_seqno = 200;
+
+ blob_db_impl()->TEST_ProcessFlushJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
+ const std::vector<bool> expected_obsolete{false, false, false, false,
+ false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ // Simulate a compaction. Some inputs and outputs have blob file references,
+ // some don't. There is also a trivial move (which means the SST appears on
+ // both the input and the output list). Blob file 1 loses all its linked SSTs,
+ // and since it got marked immutable at sequence number 200 which has already
+ // been flushed, it can be marked obsolete.
+ {
+ CompactionJobInfo info{};
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
+ info.input_file_infos.emplace_back(
+ CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
+ info.output_file_infos.emplace_back(
+ CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
+
+ blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
+ const std::vector<bool> expected_obsolete{true, false, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 4);
+ for (size_t i = 0; i < 4; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 1);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ }
+
+ // Simulate a failed compaction. No mappings should be updated.
+ {
+ CompactionJobInfo info{};
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
+ info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
+ info.status = Status::Corruption();
+
+ blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
+ const std::vector<bool> expected_obsolete{true, false, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 4);
+ for (size_t i = 0; i < 4; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 1);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ }
+
+ // Simulate another compaction. Blob file 2 loses all its linked SSTs
+ // but since it got marked immutable at sequence number 300 which hasn't
+ // been flushed yet, it cannot be marked obsolete at this point.
+ {
+ CompactionJobInfo info{};
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
+ info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
+
+ blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{true, false, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 4);
+ for (size_t i = 0; i < 4; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 1);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ }
+
+ // Simulate a flush with largest sequence number 300. This will make it
+ // possible to mark blob file 2 obsolete.
+ {
+ FlushJobInfo info{};
+ info.file_number = 26;
+ info.smallest_seqno = 201;
+ info.largest_seqno = 300;
+
+ blob_db_impl()->TEST_ProcessFlushJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{true, true, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 3);
+ for (size_t i = 0; i < 3; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 2);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
+ }
+}
+
TEST_F(BlobDBTest, ShutdownWait) {
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 100;
{"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
});
// Force all tasks to be scheduled immediately.
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TimeQueue::Add:item.end", [&](void *arg) {
std::chrono::steady_clock::time_point *tp =
static_cast<std::chrono::steady_clock::time_point *>(arg);
*tp = std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
});
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
// Sleep 3 ms to increase the chance of a data race.
// We've synced up the code so that EvictExpiredFiles()
}
} // namespace blob_db
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
// A black-box test for the BlobDB wrapper around rocksdb
int main(int argc, char** argv) {