#include <utility>
#include <vector>
+#include "cache/cache_reservation_manager.h"
#include "db/blob/blob_file_meta.h"
#include "db/dbformat.h"
#include "db/internal_stats.h"
namespace ROCKSDB_NAMESPACE {
-bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
- if (a->fd.largest_seqno != b->fd.largest_seqno) {
- return a->fd.largest_seqno > b->fd.largest_seqno;
- }
- if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
- return a->fd.smallest_seqno > b->fd.smallest_seqno;
- }
- // Break ties by file number
- return a->fd.GetNumber() > b->fd.GetNumber();
-}
+class VersionBuilder::Rep {
+ class NewestFirstBySeqNo {
+ public:
+ bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
+ assert(lhs);
+ assert(rhs);
-namespace {
-bool BySmallestKey(FileMetaData* a, FileMetaData* b,
- const InternalKeyComparator* cmp) {
- int r = cmp->Compare(a->smallest, b->smallest);
- if (r != 0) {
- return (r < 0);
- }
- // Break ties by file number
- return (a->fd.GetNumber() < b->fd.GetNumber());
-}
-} // namespace
+ if (lhs->fd.largest_seqno != rhs->fd.largest_seqno) {
+ return lhs->fd.largest_seqno > rhs->fd.largest_seqno;
+ }
-class VersionBuilder::Rep {
- private:
- // Helper to sort files_ in v
- // kLevel0 -- NewestFirstBySeqNo
- // kLevelNon0 -- BySmallestKey
- struct FileComparator {
- enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
- const InternalKeyComparator* internal_comparator;
-
- FileComparator() : internal_comparator(nullptr) {}
-
- bool operator()(FileMetaData* f1, FileMetaData* f2) const {
- switch (sort_method) {
- case kLevel0:
- return NewestFirstBySeqNo(f1, f2);
- case kLevelNon0:
- return BySmallestKey(f1, f2, internal_comparator);
+ if (lhs->fd.smallest_seqno != rhs->fd.smallest_seqno) {
+ return lhs->fd.smallest_seqno > rhs->fd.smallest_seqno;
}
- assert(false);
- return false;
+
+ // Break ties by file number
+ return lhs->fd.GetNumber() > rhs->fd.GetNumber();
+ }
+ };
+
+ class BySmallestKey {
+ public:
+ explicit BySmallestKey(const InternalKeyComparator* cmp) : cmp_(cmp) {}
+
+ bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
+ assert(lhs);
+ assert(rhs);
+ assert(cmp_);
+
+ const int r = cmp_->Compare(lhs->smallest, rhs->smallest);
+ if (r != 0) {
+ return (r < 0);
+ }
+
+ // Break ties by file number
+ return (lhs->fd.GetNumber() < rhs->fd.GetNumber());
}
+
+ private:
+ const InternalKeyComparator* cmp_;
};
struct LevelState {
std::unordered_map<uint64_t, FileMetaData*> added_files;
};
+ // A class that represents the accumulated changes (like additional garbage or
+ // newly linked/unlinked SST files) for a given blob file after applying a
+ // series of VersionEdits.
class BlobFileMetaDataDelta {
public:
bool IsEmpty() const {
- return !shared_meta_ && !additional_garbage_count_ &&
- !additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
- newly_unlinked_ssts_.empty();
- }
-
- std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
- return shared_meta_;
+ return !additional_garbage_count_ && !additional_garbage_bytes_ &&
+ newly_linked_ssts_.empty() && newly_unlinked_ssts_.empty();
}
uint64_t GetAdditionalGarbageCount() const {
return newly_unlinked_ssts_;
}
- void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
- assert(!shared_meta_);
- assert(shared_meta);
-
- shared_meta_ = std::move(shared_meta);
- }
-
void AddGarbage(uint64_t count, uint64_t bytes) {
additional_garbage_count_ += count;
additional_garbage_bytes_ += bytes;
}
private:
- std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
uint64_t additional_garbage_count_ = 0;
uint64_t additional_garbage_bytes_ = 0;
std::unordered_set<uint64_t> newly_linked_ssts_;
std::unordered_set<uint64_t> newly_unlinked_ssts_;
};
+ // A class that represents the state of a blob file after applying a series of
+ // VersionEdits. In addition to the resulting state, it also contains the
+ // delta (see BlobFileMetaDataDelta above). The resulting state can be used to
+ // identify obsolete blob files, while the delta makes it possible to
+ // efficiently detect trivial moves.
+ class MutableBlobFileMetaData {
+ public:
+ // To be used for brand new blob files
+ explicit MutableBlobFileMetaData(
+ std::shared_ptr<SharedBlobFileMetaData>&& shared_meta)
+ : shared_meta_(std::move(shared_meta)) {}
+
+ // To be used for pre-existing blob files
+ explicit MutableBlobFileMetaData(
+ const std::shared_ptr<BlobFileMetaData>& meta)
+ : shared_meta_(meta->GetSharedMeta()),
+ linked_ssts_(meta->GetLinkedSsts()),
+ garbage_blob_count_(meta->GetGarbageBlobCount()),
+ garbage_blob_bytes_(meta->GetGarbageBlobBytes()) {}
+
+ const std::shared_ptr<SharedBlobFileMetaData>& GetSharedMeta() const {
+ return shared_meta_;
+ }
+
+ uint64_t GetBlobFileNumber() const {
+ assert(shared_meta_);
+ return shared_meta_->GetBlobFileNumber();
+ }
+
+ bool HasDelta() const { return !delta_.IsEmpty(); }
+
+ const std::unordered_set<uint64_t>& GetLinkedSsts() const {
+ return linked_ssts_;
+ }
+
+ uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; }
+
+ uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; }
+
+ bool AddGarbage(uint64_t count, uint64_t bytes) {
+ assert(shared_meta_);
+
+ if (garbage_blob_count_ + count > shared_meta_->GetTotalBlobCount() ||
+ garbage_blob_bytes_ + bytes > shared_meta_->GetTotalBlobBytes()) {
+ return false;
+ }
+
+ delta_.AddGarbage(count, bytes);
+
+ garbage_blob_count_ += count;
+ garbage_blob_bytes_ += bytes;
+
+ return true;
+ }
+
+ void LinkSst(uint64_t sst_file_number) {
+ delta_.LinkSst(sst_file_number);
+
+ assert(linked_ssts_.find(sst_file_number) == linked_ssts_.end());
+ linked_ssts_.emplace(sst_file_number);
+ }
+
+ void UnlinkSst(uint64_t sst_file_number) {
+ delta_.UnlinkSst(sst_file_number);
+
+ assert(linked_ssts_.find(sst_file_number) != linked_ssts_.end());
+ linked_ssts_.erase(sst_file_number);
+ }
+
+ private:
+ std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
+ // Accumulated changes
+ BlobFileMetaDataDelta delta_;
+ // Resulting state after applying the changes
+ BlobFileMetaData::LinkedSsts linked_ssts_;
+ uint64_t garbage_blob_count_ = 0;
+ uint64_t garbage_blob_bytes_ = 0;
+ };
+
const FileOptions& file_options_;
const ImmutableCFOptions* const ioptions_;
TableCache* table_cache_;
bool has_invalid_levels_;
// Current levels of table files affected by additions/deletions.
std::unordered_map<uint64_t, int> table_file_levels_;
- FileComparator level_zero_cmp_;
- FileComparator level_nonzero_cmp_;
+ // Current compact cursors that should be changed after the last compaction
+ std::unordered_map<int, InternalKey> updated_compact_cursors_;
+ NewestFirstBySeqNo level_zero_cmp_;
+ BySmallestKey level_nonzero_cmp_;
+
+ // Mutable metadata objects for all blob files affected by the series of
+ // version edits.
+ std::map<uint64_t, MutableBlobFileMetaData> mutable_blob_file_metas_;
- // Metadata delta for all blob files affected by the series of version edits.
- std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
+ std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
public:
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
TableCache* table_cache, VersionStorageInfo* base_vstorage,
- VersionSet* version_set)
+ VersionSet* version_set,
+ std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr)
: file_options_(file_options),
ioptions_(ioptions),
table_cache_(table_cache),
base_vstorage_(base_vstorage),
version_set_(version_set),
num_levels_(base_vstorage->num_levels()),
- has_invalid_levels_(false) {
+ has_invalid_levels_(false),
+ level_nonzero_cmp_(base_vstorage_->InternalComparator()),
+ file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr) {
assert(ioptions_);
levels_ = new LevelState[num_levels_];
- level_zero_cmp_.sort_method = FileComparator::kLevel0;
- level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
- level_nonzero_cmp_.internal_comparator =
- base_vstorage_->InternalComparator();
}
~Rep() {
table_cache_->ReleaseHandle(f->table_reader_handle);
f->table_reader_handle = nullptr;
}
- delete f;
- }
- }
- bool IsBlobFileInVersion(uint64_t blob_file_number) const {
- auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
- if (delta_it != blob_file_meta_deltas_.end()) {
- if (delta_it->second.GetSharedMeta()) {
- return true;
+ if (file_metadata_cache_res_mgr_) {
+ Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
+ f->ApproximateMemoryUsage(), false /* increase */);
+ s.PermitUncheckedError();
}
+ delete f;
}
-
- assert(base_vstorage_);
-
- const auto& base_blob_files = base_vstorage_->GetBlobFiles();
-
- auto base_it = base_blob_files.find(blob_file_number);
- if (base_it != base_blob_files.end()) {
- assert(base_it->second);
- assert(base_it->second->GetSharedMeta());
-
- return true;
- }
-
- return false;
}
+ // Mapping used for checking the consistency of links between SST files and
+ // blob files. It is built using the forward links (table file -> blob file),
+ // and is subsequently compared with the inverse mapping stored in the
+ // BlobFileMetaData objects.
using ExpectedLinkedSsts =
std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
(*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
}
- Status CheckConsistencyDetails(VersionStorageInfo* vstorage) {
- // Make sure the files are sorted correctly and that the links between
- // table files and blob files are consistent. The latter is checked using
- // the following mapping, which is built using the forward links
- // (table file -> blob file), and is subsequently compared with the inverse
- // mapping stored in the BlobFileMetaData objects.
- ExpectedLinkedSsts expected_linked_ssts;
+ template <typename Checker>
+ Status CheckConsistencyDetailsForLevel(
+ const VersionStorageInfo* vstorage, int level, Checker checker,
+ const std::string& sync_point,
+ ExpectedLinkedSsts* expected_linked_ssts) const {
+#ifdef NDEBUG
+ (void)sync_point;
+#endif
- for (int level = 0; level < num_levels_; level++) {
- auto& level_files = vstorage->LevelFiles(level);
+ assert(vstorage);
+ assert(level >= 0 && level < num_levels_);
+ assert(expected_linked_ssts);
- if (level_files.empty()) {
- continue;
- }
+ const auto& level_files = vstorage->LevelFiles(level);
+
+ if (level_files.empty()) {
+ return Status::OK();
+ }
+
+ assert(level_files[0]);
+ UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
+ level_files[0]->oldest_blob_file_number,
+ expected_linked_ssts);
+
+ for (size_t i = 1; i < level_files.size(); ++i) {
+ assert(level_files[i]);
+ UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
+ level_files[i]->oldest_blob_file_number,
+ expected_linked_ssts);
+
+ auto lhs = level_files[i - 1];
+ auto rhs = level_files[i];
- assert(level_files[0]);
- UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
- level_files[0]->oldest_blob_file_number,
- &expected_linked_ssts);
- for (size_t i = 1; i < level_files.size(); i++) {
- assert(level_files[i]);
- UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
- level_files[i]->oldest_blob_file_number,
- &expected_linked_ssts);
-
- auto f1 = level_files[i - 1];
- auto f2 = level_files[i];
- if (level == 0) {
#ifndef NDEBUG
- auto pair = std::make_pair(&f1, &f2);
- TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency0", &pair);
+ auto pair = std::make_pair(&lhs, &rhs);
+ TEST_SYNC_POINT_CALLBACK(sync_point, &pair);
#endif
- if (!level_zero_cmp_(f1, f2)) {
- return Status::Corruption("L0 files are not sorted properly");
+
+ const Status s = checker(lhs, rhs);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ return Status::OK();
+ }
+
+ // Make sure table files are sorted correctly and that the links between
+ // table files and blob files are consistent.
+ Status CheckConsistencyDetails(const VersionStorageInfo* vstorage) const {
+ assert(vstorage);
+
+ ExpectedLinkedSsts expected_linked_ssts;
+
+ if (num_levels_ > 0) {
+ // Check L0
+ {
+ auto l0_checker = [this](const FileMetaData* lhs,
+ const FileMetaData* rhs) {
+ assert(lhs);
+ assert(rhs);
+
+ if (!level_zero_cmp_(lhs, rhs)) {
+ std::ostringstream oss;
+ oss << "L0 files are not sorted properly: files #"
+ << lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber();
+
+ return Status::Corruption("VersionBuilder", oss.str());
}
- if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
+ if (rhs->fd.smallest_seqno == rhs->fd.largest_seqno) {
// This is an external file that we ingested
- SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
- if (!(external_file_seqno < f1->fd.largest_seqno ||
+ const SequenceNumber external_file_seqno = rhs->fd.smallest_seqno;
+
+ if (!(external_file_seqno < lhs->fd.largest_seqno ||
external_file_seqno == 0)) {
- return Status::Corruption(
- "L0 file with seqno " +
- NumberToString(f1->fd.smallest_seqno) + " " +
- NumberToString(f1->fd.largest_seqno) +
- " vs. file with global_seqno" +
- NumberToString(external_file_seqno) + " with fileNumber " +
- NumberToString(f1->fd.GetNumber()));
+ std::ostringstream oss;
+ oss << "L0 file #" << lhs->fd.GetNumber() << " with seqno "
+ << lhs->fd.smallest_seqno << ' ' << lhs->fd.largest_seqno
+ << " vs. file #" << rhs->fd.GetNumber()
+ << " with global_seqno " << external_file_seqno;
+
+ return Status::Corruption("VersionBuilder", oss.str());
}
- } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
- return Status::Corruption(
- "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
- " " + NumberToString(f1->fd.largest_seqno) + " " +
- NumberToString(f1->fd.GetNumber()) + " vs. " +
- NumberToString(f2->fd.smallest_seqno) + " " +
- NumberToString(f2->fd.largest_seqno) + " " +
- NumberToString(f2->fd.GetNumber()));
+ } else if (lhs->fd.smallest_seqno <= rhs->fd.smallest_seqno) {
+ std::ostringstream oss;
+ oss << "L0 file #" << lhs->fd.GetNumber() << " with seqno "
+ << lhs->fd.smallest_seqno << ' ' << lhs->fd.largest_seqno
+ << " vs. file #" << rhs->fd.GetNumber() << " with seqno "
+ << rhs->fd.smallest_seqno << ' ' << rhs->fd.largest_seqno;
+
+ return Status::Corruption("VersionBuilder", oss.str());
}
- } else {
-#ifndef NDEBUG
- auto pair = std::make_pair(&f1, &f2);
- TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency1", &pair);
-#endif
- if (!level_nonzero_cmp_(f1, f2)) {
- return Status::Corruption("L" + NumberToString(level) +
- " files are not sorted properly");
+
+ return Status::OK();
+ };
+
+ const Status s = CheckConsistencyDetailsForLevel(
+ vstorage, /* level */ 0, l0_checker,
+ "VersionBuilder::CheckConsistency0", &expected_linked_ssts);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ // Check L1 and up
+ const InternalKeyComparator* const icmp = vstorage->InternalComparator();
+ assert(icmp);
+
+ for (int level = 1; level < num_levels_; ++level) {
+ auto checker = [this, level, icmp](const FileMetaData* lhs,
+ const FileMetaData* rhs) {
+ assert(lhs);
+ assert(rhs);
+
+ if (!level_nonzero_cmp_(lhs, rhs)) {
+ std::ostringstream oss;
+ oss << 'L' << level << " files are not sorted properly: files #"
+ << lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber();
+
+ return Status::Corruption("VersionBuilder", oss.str());
}
- // Make sure there is no overlap in levels > 0
- if (vstorage->InternalComparator()->Compare(f1->largest,
- f2->smallest) >= 0) {
- return Status::Corruption(
- "L" + NumberToString(level) + " have overlapping ranges " +
- (f1->largest).DebugString(true) + " vs. " +
- (f2->smallest).DebugString(true));
+ // Make sure there is no overlap in level
+ if (icmp->Compare(lhs->largest, rhs->smallest) >= 0) {
+ std::ostringstream oss;
+ oss << 'L' << level << " has overlapping ranges: file #"
+ << lhs->fd.GetNumber()
+ << " largest key: " << lhs->largest.DebugString(true)
+ << " vs. file #" << rhs->fd.GetNumber()
+ << " smallest key: " << rhs->smallest.DebugString(true);
+
+ return Status::Corruption("VersionBuilder", oss.str());
}
+
+ return Status::OK();
+ };
+
+ const Status s = CheckConsistencyDetailsForLevel(
+ vstorage, level, checker, "VersionBuilder::CheckConsistency1",
+ &expected_linked_ssts);
+ if (!s.ok()) {
+ return s;
}
}
}
- // Make sure that all blob files in the version have non-garbage data.
+ // Make sure that all blob files in the version have non-garbage data and
+ // the links between them and the table files are consistent.
const auto& blob_files = vstorage->GetBlobFiles();
- for (const auto& pair : blob_files) {
- const uint64_t blob_file_number = pair.first;
- const auto& blob_file_meta = pair.second;
+ for (const auto& blob_file_meta : blob_files) {
assert(blob_file_meta);
+ const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
+
if (blob_file_meta->GetGarbageBlobCount() >=
blob_file_meta->GetTotalBlobCount()) {
std::ostringstream oss;
return ret_s;
}
- Status CheckConsistency(VersionStorageInfo* vstorage) {
+ Status CheckConsistency(const VersionStorageInfo* vstorage) const {
+ assert(vstorage);
+
// Always run consistency checks in debug build
#ifdef NDEBUG
if (!vstorage->force_consistency_checks()) {
return true;
}
+ bool IsBlobFileInVersion(uint64_t blob_file_number) const {
+ auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
+ if (mutable_it != mutable_blob_file_metas_.end()) {
+ return true;
+ }
+
+ assert(base_vstorage_);
+ const auto meta = base_vstorage_->GetBlobFileMetaData(blob_file_number);
+
+ return !!meta;
+ }
+
+ MutableBlobFileMetaData* GetOrCreateMutableBlobFileMetaData(
+ uint64_t blob_file_number) {
+ auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
+ if (mutable_it != mutable_blob_file_metas_.end()) {
+ return &mutable_it->second;
+ }
+
+ assert(base_vstorage_);
+ const auto meta = base_vstorage_->GetBlobFileMetaData(blob_file_number);
+
+ if (meta) {
+ mutable_it = mutable_blob_file_metas_
+ .emplace(blob_file_number, MutableBlobFileMetaData(meta))
+ .first;
+ return &mutable_it->second;
+ }
+
+ return nullptr;
+ }
+
Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
blob_file_addition.GetChecksumMethod(),
blob_file_addition.GetChecksumValue(), deleter);
- blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
- std::move(shared_meta));
+ mutable_blob_file_metas_.emplace(
+ blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
return Status::OK();
}
Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
- if (!IsBlobFileInVersion(blob_file_number)) {
+ MutableBlobFileMetaData* const mutable_meta =
+ GetOrCreateMutableBlobFileMetaData(blob_file_number);
+
+ if (!mutable_meta) {
std::ostringstream oss;
oss << "Blob file #" << blob_file_number << " not found";
return Status::Corruption("VersionBuilder", oss.str());
}
- blob_file_meta_deltas_[blob_file_number].AddGarbage(
- blob_file_garbage.GetGarbageBlobCount(),
- blob_file_garbage.GetGarbageBlobBytes());
+ if (!mutable_meta->AddGarbage(blob_file_garbage.GetGarbageBlobCount(),
+ blob_file_garbage.GetGarbageBlobBytes())) {
+ std::ostringstream oss;
+ oss << "Garbage overflow for blob file #" << blob_file_number;
+ return Status::Corruption("VersionBuilder", oss.str());
+ }
return Status::OK();
}
const uint64_t blob_file_number =
GetOldestBlobFileNumberForTableFile(level, file_number);
- if (blob_file_number != kInvalidBlobFileNumber &&
- IsBlobFileInVersion(blob_file_number)) {
- blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
+ if (blob_file_number != kInvalidBlobFileNumber) {
+ MutableBlobFileMetaData* const mutable_meta =
+ GetOrCreateMutableBlobFileMetaData(blob_file_number);
+ if (mutable_meta) {
+ mutable_meta->UnlinkSst(file_number);
+ }
}
auto& level_state = levels_[level];
FileMetaData* const f = new FileMetaData(meta);
f->refs = 1;
+ if (file_metadata_cache_res_mgr_) {
+ Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
+ f->ApproximateMemoryUsage(), true /* increase */);
+ if (!s.ok()) {
+ delete f;
+ s = Status::MemoryLimit(
+ "Can't allocate " +
+ kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
+ CacheEntryRole::kFileMetadata)] +
+ " due to exceeding the memory limit "
+ "based on "
+ "cache capacity");
+ return s;
+ }
+ }
+
auto& add_files = level_state.added_files;
assert(add_files.find(file_number) == add_files.end());
add_files.emplace(file_number, f);
const uint64_t blob_file_number = f->oldest_blob_file_number;
- if (blob_file_number != kInvalidBlobFileNumber &&
- IsBlobFileInVersion(blob_file_number)) {
- blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
+ if (blob_file_number != kInvalidBlobFileNumber) {
+ MutableBlobFileMetaData* const mutable_meta =
+ GetOrCreateMutableBlobFileMetaData(blob_file_number);
+ if (mutable_meta) {
+ mutable_meta->LinkSst(file_number);
+ }
}
table_file_levels_[file_number] = level;
return Status::OK();
}
+ Status ApplyCompactCursors(int level,
+ const InternalKey& smallest_uncompacted_key) {
+ if (level < 0) {
+ std::ostringstream oss;
+ oss << "Cannot add compact cursor (" << level << ","
+ << smallest_uncompacted_key.Encode().ToString()
+ << " due to invalid level (level = " << level << ")";
+ return Status::Corruption("VersionBuilder", oss.str());
+ }
+ if (level < num_levels_) {
+ // Omit levels (>= num_levels_) when re-open with shrinking num_levels_
+ updated_compact_cursors_[level] = smallest_uncompacted_key;
+ }
+ return Status::OK();
+ }
+
// Apply all of the edits in *edit to the current state.
- Status Apply(VersionEdit* edit) {
+ Status Apply(const VersionEdit* edit) {
{
const Status s = CheckConsistency(base_vstorage_);
if (!s.ok()) {
}
}
+ // Populate compact cursors for round-robin compaction, leave
+ // the cursor to be empty to indicate it is invalid
+ for (const auto& cursor : edit->GetCompactCursors()) {
+ const int level = cursor.first;
+ const InternalKey smallest_uncompacted_key = cursor.second;
+ const Status s = ApplyCompactCursors(level, smallest_uncompacted_key);
+ if (!s.ok()) {
+ return s;
+ }
+ }
return Status::OK();
}
- static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
- const BlobFileMetaData::LinkedSsts& base,
- const std::unordered_set<uint64_t>& newly_linked,
- const std::unordered_set<uint64_t>& newly_unlinked) {
- BlobFileMetaData::LinkedSsts result(base);
+ // Helper function template for merging the blob file metadata from the base
+ // version with the mutable metadata representing the state after applying the
+ // edits. The function objects process_base and process_mutable are
+ // respectively called to handle a base version object when there is no
+ // matching mutable object, and a mutable object when there is no matching
+ // base version object. process_both is called to perform the merge when a
+ // given blob file appears both in the base version and the mutable list. The
+ // helper stops processing objects if a function object returns false. Blob
+ // files with a file number below first_blob_file are not processed.
+ template <typename ProcessBase, typename ProcessMutable, typename ProcessBoth>
+ void MergeBlobFileMetas(uint64_t first_blob_file, ProcessBase process_base,
+ ProcessMutable process_mutable,
+ ProcessBoth process_both) const {
+ assert(base_vstorage_);
+
+ auto base_it = base_vstorage_->GetBlobFileMetaDataLB(first_blob_file);
+ const auto base_it_end = base_vstorage_->GetBlobFiles().end();
+
+ auto mutable_it = mutable_blob_file_metas_.lower_bound(first_blob_file);
+ const auto mutable_it_end = mutable_blob_file_metas_.end();
+
+ while (base_it != base_it_end && mutable_it != mutable_it_end) {
+ const auto& base_meta = *base_it;
+ assert(base_meta);
+
+ const uint64_t base_blob_file_number = base_meta->GetBlobFileNumber();
+ const uint64_t mutable_blob_file_number = mutable_it->first;
+
+ if (base_blob_file_number < mutable_blob_file_number) {
+ if (!process_base(base_meta)) {
+ return;
+ }
+
+ ++base_it;
+ } else if (mutable_blob_file_number < base_blob_file_number) {
+ const auto& mutable_meta = mutable_it->second;
+
+ if (!process_mutable(mutable_meta)) {
+ return;
+ }
+
+ ++mutable_it;
+ } else {
+ assert(base_blob_file_number == mutable_blob_file_number);
- for (uint64_t sst_file_number : newly_unlinked) {
- assert(result.find(sst_file_number) != result.end());
+ const auto& mutable_meta = mutable_it->second;
+
+ if (!process_both(base_meta, mutable_meta)) {
+ return;
+ }
- result.erase(sst_file_number);
+ ++base_it;
+ ++mutable_it;
+ }
}
- for (uint64_t sst_file_number : newly_linked) {
- assert(result.find(sst_file_number) == result.end());
+ while (base_it != base_it_end) {
+ const auto& base_meta = *base_it;
- result.emplace(sst_file_number);
+ if (!process_base(base_meta)) {
+ return;
+ }
+
+ ++base_it;
}
- return result;
+ while (mutable_it != mutable_it_end) {
+ const auto& mutable_meta = mutable_it->second;
+
+ if (!process_mutable(mutable_meta)) {
+ return;
+ }
+
+ ++mutable_it;
+ }
}
- static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
- const BlobFileMetaDataDelta& delta) {
- auto shared_meta = delta.GetSharedMeta();
- assert(shared_meta);
+ // Helper function template for finding the first blob file that has linked
+ // SSTs.
+ template <typename Meta>
+ static bool CheckLinkedSsts(const Meta& meta,
+ uint64_t* min_oldest_blob_file_num) {
+ assert(min_oldest_blob_file_num);
+
+ if (!meta.GetLinkedSsts().empty()) {
+ assert(*min_oldest_blob_file_num == kInvalidBlobFileNumber);
- assert(delta.GetNewlyUnlinkedSsts().empty());
+ *min_oldest_blob_file_num = meta.GetBlobFileNumber();
- auto meta = BlobFileMetaData::Create(
- std::move(shared_meta), delta.GetNewlyLinkedSsts(),
- delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
+ return false;
+ }
- return meta;
+ return true;
}
- static std::shared_ptr<BlobFileMetaData>
- GetOrCreateMetaDataForExistingBlobFile(
- const std::shared_ptr<BlobFileMetaData>& base_meta,
- const BlobFileMetaDataDelta& delta) {
- assert(base_meta);
- assert(!delta.GetSharedMeta());
+ // Find the oldest blob file that has linked SSTs.
+ uint64_t GetMinOldestBlobFileNumber() const {
+ uint64_t min_oldest_blob_file_num = kInvalidBlobFileNumber;
- if (delta.IsEmpty()) {
- return base_meta;
- }
+ auto process_base =
+ [&min_oldest_blob_file_num](
+ const std::shared_ptr<BlobFileMetaData>& base_meta) {
+ assert(base_meta);
- auto shared_meta = base_meta->GetSharedMeta();
- assert(shared_meta);
+ return CheckLinkedSsts(*base_meta, &min_oldest_blob_file_num);
+ };
- auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
- delta.GetNewlyLinkedSsts(),
- delta.GetNewlyUnlinkedSsts());
+ auto process_mutable = [&min_oldest_blob_file_num](
+ const MutableBlobFileMetaData& mutable_meta) {
+ return CheckLinkedSsts(mutable_meta, &min_oldest_blob_file_num);
+ };
+
+ auto process_both = [&min_oldest_blob_file_num](
+ const std::shared_ptr<BlobFileMetaData>& base_meta,
+ const MutableBlobFileMetaData& mutable_meta) {
+#ifndef NDEBUG
+ assert(base_meta);
+ assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
+#else
+ (void)base_meta;
+#endif
+
+ // Look at mutable_meta since it supersedes *base_meta
+ return CheckLinkedSsts(mutable_meta, &min_oldest_blob_file_num);
+ };
- auto meta = BlobFileMetaData::Create(
- std::move(shared_meta), std::move(linked_ssts),
- base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
- base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
+ MergeBlobFileMetas(kInvalidBlobFileNumber, process_base, process_mutable,
+ process_both);
- return meta;
+ return min_oldest_blob_file_num;
+ }
+
+ static std::shared_ptr<BlobFileMetaData> CreateBlobFileMetaData(
+ const MutableBlobFileMetaData& mutable_meta) {
+ return BlobFileMetaData::Create(
+ mutable_meta.GetSharedMeta(), mutable_meta.GetLinkedSsts(),
+ mutable_meta.GetGarbageBlobCount(), mutable_meta.GetGarbageBlobBytes());
}
// Add the blob file specified by meta to *vstorage if it is determined to
- // contain valid data (blobs). We make this decision based on the amount
- // of garbage in the file, and whether the file or any lower-numbered blob
- // files have any linked SSTs. The latter condition is tracked using the
- // flag *found_first_non_empty.
- void AddBlobFileIfNeeded(VersionStorageInfo* vstorage,
- const std::shared_ptr<BlobFileMetaData>& meta,
- bool* found_first_non_empty) const {
+ // contain valid data (blobs).
+ template <typename Meta>
+ static void AddBlobFileIfNeeded(VersionStorageInfo* vstorage, Meta&& meta) {
assert(vstorage);
assert(meta);
- assert(found_first_non_empty);
- if (!meta->GetLinkedSsts().empty()) {
- (*found_first_non_empty) = true;
- } else if (!(*found_first_non_empty) ||
- meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
+ if (meta->GetLinkedSsts().empty() &&
+ meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
return;
}
- vstorage->AddBlobFile(meta);
+ vstorage->AddBlobFile(std::forward<Meta>(meta));
}
// Merge the blob file metadata from the base version with the changes (edits)
// applied, and save the result into *vstorage.
void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
- assert(base_vstorage_);
assert(vstorage);
- bool found_first_non_empty = false;
+ assert(base_vstorage_);
+ vstorage->ReserveBlob(base_vstorage_->GetBlobFiles().size() +
+ mutable_blob_file_metas_.size());
- const auto& base_blob_files = base_vstorage_->GetBlobFiles();
- auto base_it = base_blob_files.begin();
- const auto base_it_end = base_blob_files.end();
+ const uint64_t oldest_blob_file_with_linked_ssts =
+ GetMinOldestBlobFileNumber();
- auto delta_it = blob_file_meta_deltas_.begin();
- const auto delta_it_end = blob_file_meta_deltas_.end();
+ auto process_base =
+ [vstorage](const std::shared_ptr<BlobFileMetaData>& base_meta) {
+ assert(base_meta);
- while (base_it != base_it_end && delta_it != delta_it_end) {
- const uint64_t base_blob_file_number = base_it->first;
- const uint64_t delta_blob_file_number = delta_it->first;
+ AddBlobFileIfNeeded(vstorage, base_meta);
- if (base_blob_file_number < delta_blob_file_number) {
- const auto& base_meta = base_it->second;
+ return true;
+ };
- AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+ auto process_mutable =
+ [vstorage](const MutableBlobFileMetaData& mutable_meta) {
+ AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta));
- ++base_it;
- } else if (delta_blob_file_number < base_blob_file_number) {
- const auto& delta = delta_it->second;
+ return true;
+ };
- auto meta = CreateMetaDataForNewBlobFile(delta);
+ auto process_both = [vstorage](
+ const std::shared_ptr<BlobFileMetaData>& base_meta,
+ const MutableBlobFileMetaData& mutable_meta) {
+ assert(base_meta);
+ assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
- AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+ if (!mutable_meta.HasDelta()) {
+ assert(base_meta->GetGarbageBlobCount() ==
+ mutable_meta.GetGarbageBlobCount());
+ assert(base_meta->GetGarbageBlobBytes() ==
+ mutable_meta.GetGarbageBlobBytes());
+ assert(base_meta->GetLinkedSsts() == mutable_meta.GetLinkedSsts());
- ++delta_it;
- } else {
- assert(base_blob_file_number == delta_blob_file_number);
+ AddBlobFileIfNeeded(vstorage, base_meta);
- const auto& base_meta = base_it->second;
- const auto& delta = delta_it->second;
+ return true;
+ }
- auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
+ AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta));
- AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+ return true;
+ };
- ++base_it;
- ++delta_it;
+ MergeBlobFileMetas(oldest_blob_file_with_linked_ssts, process_base,
+ process_mutable, process_both);
+ }
+
+ void MaybeAddFile(VersionStorageInfo* vstorage, int level,
+ FileMetaData* f) const {
+ const uint64_t file_number = f->fd.GetNumber();
+
+ const auto& level_state = levels_[level];
+
+ const auto& del_files = level_state.deleted_files;
+ const auto del_it = del_files.find(file_number);
+
+ if (del_it != del_files.end()) {
+ // f is to-be-deleted table file
+ vstorage->RemoveCurrentStats(f);
+ } else {
+ const auto& add_files = level_state.added_files;
+ const auto add_it = add_files.find(file_number);
+
+ // Note: if the file appears both in the base version and in the added
+ // list, the added FileMetaData supersedes the one in the base version.
+ if (add_it != add_files.end() && add_it->second != f) {
+ vstorage->RemoveCurrentStats(f);
+ } else {
+ vstorage->AddFile(level, f);
}
}
+ }
- while (base_it != base_it_end) {
- const auto& base_meta = base_it->second;
+ template <typename Cmp>
+ void SaveSSTFilesTo(VersionStorageInfo* vstorage, int level, Cmp cmp) const {
+ // Merge the set of added files with the set of pre-existing files.
+ // Drop any deleted files. Store the result in *vstorage.
+ const auto& base_files = base_vstorage_->LevelFiles(level);
+ const auto& unordered_added_files = levels_[level].added_files;
+ vstorage->Reserve(level, base_files.size() + unordered_added_files.size());
+
+ // Sort added files for the level.
+ std::vector<FileMetaData*> added_files;
+ added_files.reserve(unordered_added_files.size());
+ for (const auto& pair : unordered_added_files) {
+ added_files.push_back(pair.second);
+ }
+ std::sort(added_files.begin(), added_files.end(), cmp);
+
+ auto base_iter = base_files.begin();
+ auto base_end = base_files.end();
+ auto added_iter = added_files.begin();
+ auto added_end = added_files.end();
+ while (added_iter != added_end || base_iter != base_end) {
+ if (base_iter == base_end ||
+ (added_iter != added_end && cmp(*added_iter, *base_iter))) {
+ MaybeAddFile(vstorage, level, *added_iter++);
+ } else {
+ MaybeAddFile(vstorage, level, *base_iter++);
+ }
+ }
+ }
- AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+ void SaveSSTFilesTo(VersionStorageInfo* vstorage) const {
+ assert(vstorage);
- ++base_it;
+ if (!num_levels_) {
+ return;
}
- while (delta_it != delta_it_end) {
- const auto& delta = delta_it->second;
-
- auto meta = CreateMetaDataForNewBlobFile(delta);
+ SaveSSTFilesTo(vstorage, /* level */ 0, level_zero_cmp_);
- AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+ for (int level = 1; level < num_levels_; ++level) {
+ SaveSSTFilesTo(vstorage, level, level_nonzero_cmp_);
+ }
+ }
- ++delta_it;
+ void SaveCompactCursorsTo(VersionStorageInfo* vstorage) const {
+ for (auto iter = updated_compact_cursors_.begin();
+ iter != updated_compact_cursors_.end(); iter++) {
+ vstorage->AddCursorForOneLevel(iter->first, iter->second);
}
}
- // Save the current state in *v.
- Status SaveTo(VersionStorageInfo* vstorage) {
- Status s = CheckConsistency(base_vstorage_);
+ // Save the current state in *vstorage.
+ Status SaveTo(VersionStorageInfo* vstorage) const {
+ Status s;
+
+#ifndef NDEBUG
+ // The same check is done within Apply() so we skip it in release mode.
+ s = CheckConsistency(base_vstorage_);
if (!s.ok()) {
return s;
}
+#endif // NDEBUG
s = CheckConsistency(vstorage);
if (!s.ok()) {
return s;
}
- for (int level = 0; level < num_levels_; level++) {
- const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
- // Merge the set of added files with the set of pre-existing files.
- // Drop any deleted files. Store the result in *v.
- const auto& base_files = base_vstorage_->LevelFiles(level);
- const auto& unordered_added_files = levels_[level].added_files;
- vstorage->Reserve(level,
- base_files.size() + unordered_added_files.size());
-
- // Sort added files for the level.
- std::vector<FileMetaData*> added_files;
- added_files.reserve(unordered_added_files.size());
- for (const auto& pair : unordered_added_files) {
- added_files.push_back(pair.second);
- }
- std::sort(added_files.begin(), added_files.end(), cmp);
-
-#ifndef NDEBUG
- FileMetaData* prev_added_file = nullptr;
- for (const auto& added : added_files) {
- if (level > 0 && prev_added_file != nullptr) {
- assert(base_vstorage_->InternalComparator()->Compare(
- prev_added_file->smallest, added->smallest) <= 0);
- }
- prev_added_file = added;
- }
-#endif
-
- auto base_iter = base_files.begin();
- auto base_end = base_files.end();
- auto added_iter = added_files.begin();
- auto added_end = added_files.end();
- while (added_iter != added_end || base_iter != base_end) {
- if (base_iter == base_end ||
- (added_iter != added_end && cmp(*added_iter, *base_iter))) {
- MaybeAddFile(vstorage, level, *added_iter++);
- } else {
- MaybeAddFile(vstorage, level, *base_iter++);
- }
- }
- }
+ SaveSSTFilesTo(vstorage);
SaveBlobFilesTo(vstorage);
+ SaveCompactCursorsTo(vstorage);
+
s = CheckConsistency(vstorage);
return s;
}
- Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
- bool prefetch_index_and_filter_in_cache,
- bool is_initial_load,
- const SliceTransform* prefix_extractor,
- size_t max_file_size_for_l0_meta_pin) {
+ Status LoadTableHandlers(
+ InternalStats* internal_stats, int max_threads,
+ bool prefetch_index_and_filter_in_cache, bool is_initial_load,
+ const std::shared_ptr<const SliceTransform>& prefix_extractor,
+ size_t max_file_size_for_l0_meta_pin) {
assert(table_cache_ != nullptr);
size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
- size_t max_load = port::kMaxSizet;
+ size_t max_load = std::numeric_limits<size_t>::max();
if (!always_load) {
// If it is initial loading and not set to always loading all the
int level = files_meta[file_idx].second;
statuses[file_idx] = table_cache_->FindTable(
ReadOptions(), file_options_,
- *(base_vstorage_->InternalComparator()), file_meta->fd,
+ *(base_vstorage_->InternalComparator()), *file_meta,
&file_meta->table_reader_handle, prefix_extractor, false /*no_io */,
true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level,
- prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin);
+ prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin,
+ file_meta->temperature);
if (file_meta->table_reader_handle != nullptr) {
// Load table_reader
file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
}
return ret;
}
-
- void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
- const uint64_t file_number = f->fd.GetNumber();
-
- const auto& level_state = levels_[level];
-
- const auto& del_files = level_state.deleted_files;
- const auto del_it = del_files.find(file_number);
-
- if (del_it != del_files.end()) {
- // f is to-be-deleted table file
- vstorage->RemoveCurrentStats(f);
- } else {
- const auto& add_files = level_state.added_files;
- const auto add_it = add_files.find(file_number);
-
- // Note: if the file appears both in the base version and in the added
- // list, the added FileMetaData supersedes the one in the base version.
- if (add_it != add_files.end() && add_it->second != f) {
- vstorage->RemoveCurrentStats(f);
- } else {
- vstorage->AddFile(level, f);
- }
- }
- }
};
-VersionBuilder::VersionBuilder(const FileOptions& file_options,
- const ImmutableCFOptions* ioptions,
- TableCache* table_cache,
- VersionStorageInfo* base_vstorage,
- VersionSet* version_set)
+VersionBuilder::VersionBuilder(
+ const FileOptions& file_options, const ImmutableCFOptions* ioptions,
+ TableCache* table_cache, VersionStorageInfo* base_vstorage,
+ VersionSet* version_set,
+ std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr)
: rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
- version_set)) {}
+ version_set, file_metadata_cache_res_mgr)) {}
VersionBuilder::~VersionBuilder() = default;
return rep_->CheckConsistencyForNumLevels();
}
-Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
+Status VersionBuilder::Apply(const VersionEdit* edit) {
+ return rep_->Apply(edit);
+}
-Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
+Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) const {
return rep_->SaveTo(vstorage);
}
Status VersionBuilder::LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
- const SliceTransform* prefix_extractor,
+ const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin) {
return rep_->LoadTableHandlers(
internal_stats, max_threads, prefetch_index_and_filter_in_cache,
is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
}
+uint64_t VersionBuilder::GetMinOldestBlobFileNumber() const {
+ return rep_->GetMinOldestBlobFileNumber();
+}
+
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->ioptions(),
cfd->table_cache(), cfd->current()->storage_info(),
- cfd->current()->version_set())),
+ cfd->current()->version_set(),
+ cfd->GetFileMetadataCacheReservationManager())),
version_(cfd->current()) {
version_->Ref();
}
ColumnFamilyData* cfd, Version* v)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->ioptions(),
- cfd->table_cache(), v->storage_info(), v->version_set())),
+ cfd->table_cache(), v->storage_info(), v->version_set(),
+ cfd->GetFileMetadataCacheReservationManager())),
version_(v) {
assert(version_ != cfd->current());
}