#include <cinttypes>
#include <functional>
#include <map>
+#include <memory>
#include <set>
+#include <sstream>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
+#include "db/blob/blob_file_meta.h"
#include "db/dbformat.h"
#include "db/internal_stats.h"
#include "db/table_cache.h"
std::unordered_map<uint64_t, FileMetaData*> added_files;
};
+ class BlobFileMetaDataDelta {
+ public:
+ bool IsEmpty() const {
+ return !shared_meta_ && !additional_garbage_count_ &&
+ !additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
+ newly_unlinked_ssts_.empty();
+ }
+
+ std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
+ return shared_meta_;
+ }
+
+ uint64_t GetAdditionalGarbageCount() const {
+ return additional_garbage_count_;
+ }
+
+ uint64_t GetAdditionalGarbageBytes() const {
+ return additional_garbage_bytes_;
+ }
+
+ const std::unordered_set<uint64_t>& GetNewlyLinkedSsts() const {
+ return newly_linked_ssts_;
+ }
+
+ const std::unordered_set<uint64_t>& GetNewlyUnlinkedSsts() const {
+ return newly_unlinked_ssts_;
+ }
+
+ void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
+ assert(!shared_meta_);
+ assert(shared_meta);
+
+ shared_meta_ = std::move(shared_meta);
+ }
+
+ void AddGarbage(uint64_t count, uint64_t bytes) {
+ additional_garbage_count_ += count;
+ additional_garbage_bytes_ += bytes;
+ }
+
+ void LinkSst(uint64_t sst_file_number) {
+ assert(newly_linked_ssts_.find(sst_file_number) ==
+ newly_linked_ssts_.end());
+
+ // Reconcile with newly unlinked SSTs on the fly. (Note: an SST can be
+ // linked to and unlinked from the same blob file in the case of a trivial
+ // move.)
+ auto it = newly_unlinked_ssts_.find(sst_file_number);
+
+ if (it != newly_unlinked_ssts_.end()) {
+ newly_unlinked_ssts_.erase(it);
+ } else {
+ newly_linked_ssts_.emplace(sst_file_number);
+ }
+ }
+
+ void UnlinkSst(uint64_t sst_file_number) {
+ assert(newly_unlinked_ssts_.find(sst_file_number) ==
+ newly_unlinked_ssts_.end());
+
+ // Reconcile with newly linked SSTs on the fly. (Note: an SST can be
+ // linked to and unlinked from the same blob file in the case of a trivial
+ // move.)
+ auto it = newly_linked_ssts_.find(sst_file_number);
+
+ if (it != newly_linked_ssts_.end()) {
+ newly_linked_ssts_.erase(it);
+ } else {
+ newly_unlinked_ssts_.emplace(sst_file_number);
+ }
+ }
+
+ private:
+ std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
+ uint64_t additional_garbage_count_ = 0;
+ uint64_t additional_garbage_bytes_ = 0;
+ std::unordered_set<uint64_t> newly_linked_ssts_;
+ std::unordered_set<uint64_t> newly_unlinked_ssts_;
+ };
+
const FileOptions& file_options_;
- Logger* info_log_;
+ const ImmutableCFOptions* const ioptions_;
TableCache* table_cache_;
VersionStorageInfo* base_vstorage_;
+ VersionSet* version_set_;
int num_levels_;
LevelState* levels_;
- // Store states of levels larger than num_levels_. We do this instead of
+ // Store sizes of levels larger than num_levels_. We do this instead of
// storing them in levels_ to avoid regression in case there are no files
// on invalid levels. The version is not consistent if in the end the files
// on invalid levels don't cancel out.
- std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
+ std::unordered_map<int, size_t> invalid_level_sizes_;
// Whether there are invalid new files or invalid deletion on levels larger
// than num_levels_.
bool has_invalid_levels_;
+ // Current levels of table files affected by additions/deletions.
+ std::unordered_map<uint64_t, int> table_file_levels_;
FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_;
+ // Metadata delta for all blob files affected by the series of version edits.
+ std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
+
public:
- Rep(const FileOptions& file_options, Logger* info_log,
- TableCache* table_cache,
- VersionStorageInfo* base_vstorage)
+ Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
+ TableCache* table_cache, VersionStorageInfo* base_vstorage,
+ VersionSet* version_set)
: file_options_(file_options),
- info_log_(info_log),
+ ioptions_(ioptions),
table_cache_(table_cache),
base_vstorage_(base_vstorage),
+ version_set_(version_set),
num_levels_(base_vstorage->num_levels()),
has_invalid_levels_(false) {
+ assert(ioptions_);
+
levels_ = new LevelState[num_levels_];
level_zero_cmp_.sort_method = FileComparator::kLevel0;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
}
}
- Status CheckConsistency(VersionStorageInfo* vstorage) {
-#ifdef NDEBUG
- if (!vstorage->force_consistency_checks()) {
- // Dont run consistency checks in release mode except if
- // explicitly asked to
- return Status::OK();
+ bool IsBlobFileInVersion(uint64_t blob_file_number) const {
+ auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
+ if (delta_it != blob_file_meta_deltas_.end()) {
+ if (delta_it->second.GetSharedMeta()) {
+ return true;
+ }
}
-#endif
- // make sure the files are sorted correctly
+
+ assert(base_vstorage_);
+
+ const auto& base_blob_files = base_vstorage_->GetBlobFiles();
+
+ auto base_it = base_blob_files.find(blob_file_number);
+ if (base_it != base_blob_files.end()) {
+ assert(base_it->second);
+ assert(base_it->second->GetSharedMeta());
+
+ return true;
+ }
+
+ return false;
+ }
+
+ using ExpectedLinkedSsts =
+ std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
+
+ static void UpdateExpectedLinkedSsts(
+ uint64_t table_file_number, uint64_t blob_file_number,
+ ExpectedLinkedSsts* expected_linked_ssts) {
+ assert(expected_linked_ssts);
+
+ if (blob_file_number == kInvalidBlobFileNumber) {
+ return;
+ }
+
+ (*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
+ }
+
+ Status CheckConsistencyDetails(VersionStorageInfo* vstorage) {
+ // Make sure the files are sorted correctly and that the links between
+ // table files and blob files are consistent. The latter is checked using
+ // the following mapping, which is built using the forward links
+ // (table file -> blob file), and is subsequently compared with the inverse
+ // mapping stored in the BlobFileMetaData objects.
+ ExpectedLinkedSsts expected_linked_ssts;
+
for (int level = 0; level < num_levels_; level++) {
auto& level_files = vstorage->LevelFiles(level);
+
+ if (level_files.empty()) {
+ continue;
+ }
+
+ assert(level_files[0]);
+ UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
+ level_files[0]->oldest_blob_file_number,
+ &expected_linked_ssts);
for (size_t i = 1; i < level_files.size(); i++) {
+ assert(level_files[i]);
+ UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
+ level_files[i]->oldest_blob_file_number,
+ &expected_linked_ssts);
+
auto f1 = level_files[i - 1];
auto f2 = level_files[i];
+ if (level == 0) {
#ifndef NDEBUG
- auto pair = std::make_pair(&f1, &f2);
- TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
+ auto pair = std::make_pair(&f1, &f2);
+ TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency0", &pair);
#endif
- if (level == 0) {
if (!level_zero_cmp_(f1, f2)) {
- fprintf(stderr, "L0 files are not sorted properly");
return Status::Corruption("L0 files are not sorted properly");
}
SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
if (!(external_file_seqno < f1->fd.largest_seqno ||
external_file_seqno == 0)) {
- fprintf(stderr,
- "L0 file with seqno %" PRIu64 " %" PRIu64
- " vs. file with global_seqno %" PRIu64 "\n",
- f1->fd.smallest_seqno, f1->fd.largest_seqno,
- external_file_seqno);
return Status::Corruption(
"L0 file with seqno " +
NumberToString(f1->fd.smallest_seqno) + " " +
NumberToString(f1->fd.GetNumber()));
}
} else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
- fprintf(stderr,
- "L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
- " %" PRIu64 "\n",
- f1->fd.smallest_seqno, f1->fd.largest_seqno,
- f2->fd.smallest_seqno, f2->fd.largest_seqno);
return Status::Corruption(
"L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
" " + NumberToString(f1->fd.largest_seqno) + " " +
NumberToString(f2->fd.GetNumber()));
}
} else {
+#ifndef NDEBUG
+ auto pair = std::make_pair(&f1, &f2);
+ TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency1", &pair);
+#endif
if (!level_nonzero_cmp_(f1, f2)) {
- fprintf(stderr, "L%d files are not sorted properly", level);
return Status::Corruption("L" + NumberToString(level) +
" files are not sorted properly");
}
// Make sure there is no overlap in levels > 0
if (vstorage->InternalComparator()->Compare(f1->largest,
f2->smallest) >= 0) {
- fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
- (f1->largest).DebugString(true).c_str(),
- (f2->smallest).DebugString(true).c_str());
return Status::Corruption(
"L" + NumberToString(level) + " have overlapping ranges " +
(f1->largest).DebugString(true) + " vs. " +
}
}
}
- return Status::OK();
+
+ // Make sure that all blob files in the version have non-garbage data.
+ const auto& blob_files = vstorage->GetBlobFiles();
+ for (const auto& pair : blob_files) {
+ const uint64_t blob_file_number = pair.first;
+ const auto& blob_file_meta = pair.second;
+ assert(blob_file_meta);
+
+ if (blob_file_meta->GetGarbageBlobCount() >=
+ blob_file_meta->GetTotalBlobCount()) {
+ std::ostringstream oss;
+ oss << "Blob file #" << blob_file_number
+ << " consists entirely of garbage";
+
+ return Status::Corruption("VersionBuilder", oss.str());
+ }
+
+ if (blob_file_meta->GetLinkedSsts() !=
+ expected_linked_ssts[blob_file_number]) {
+ std::ostringstream oss;
+ oss << "Links are inconsistent between table files and blob file #"
+ << blob_file_number;
+
+ return Status::Corruption("VersionBuilder", oss.str());
+ }
+ }
+
+ Status ret_s;
+ TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
+ &ret_s);
+ return ret_s;
}
- Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
- int level) {
+ Status CheckConsistency(VersionStorageInfo* vstorage) {
+ // Always run consistency checks in debug build
#ifdef NDEBUG
- if (!base_vstorage_->force_consistency_checks()) {
- // Dont run consistency checks in release mode except if
- // explicitly asked to
+ if (!vstorage->force_consistency_checks()) {
return Status::OK();
}
#endif
- // a file to be deleted better exist in the previous version
- bool found = false;
- for (int l = 0; !found && l < num_levels_; l++) {
- const std::vector<FileMetaData*>& base_files =
- base_vstorage_->LevelFiles(l);
- for (size_t i = 0; i < base_files.size(); i++) {
- FileMetaData* f = base_files[i];
- if (f->fd.GetNumber() == number) {
- found = true;
- break;
- }
+ Status s = CheckConsistencyDetails(vstorage);
+ if (s.IsCorruption() && s.getState()) {
+ // Make it clear the error is due to force_consistency_checks = 1 or
+ // debug build
+#ifdef NDEBUG
+ auto prefix = "force_consistency_checks";
+#else
+ auto prefix = "force_consistency_checks(DEBUG)";
+#endif
+ s = Status::Corruption(prefix, s.getState());
+ } else {
+ // was only expecting corruption with message, or OK
+ assert(s.ok());
+ }
+ return s;
+ }
+
+ bool CheckConsistencyForNumLevels() const {
+ // Make sure there are no files on or beyond num_levels().
+ if (has_invalid_levels_) {
+ return false;
+ }
+
+ for (const auto& pair : invalid_level_sizes_) {
+ const size_t level_size = pair.second;
+ if (level_size != 0) {
+ return false;
}
}
- // if the file did not exist in the previous version, then it
- // is possibly moved from lower level to higher level in current
- // version
- for (int l = level + 1; !found && l < num_levels_; l++) {
- auto& level_added = levels_[l].added_files;
- auto got = level_added.find(number);
- if (got != level_added.end()) {
- found = true;
- break;
+
+ return true;
+ }
+
+ Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
+ const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
+
+ if (IsBlobFileInVersion(blob_file_number)) {
+ std::ostringstream oss;
+ oss << "Blob file #" << blob_file_number << " already added";
+
+ return Status::Corruption("VersionBuilder", oss.str());
+ }
+
+ // Note: we use C++11 for now but in C++14, this could be done in a more
+ // elegant way using generalized lambda capture.
+ VersionSet* const vs = version_set_;
+ const ImmutableCFOptions* const ioptions = ioptions_;
+
+ auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
+ if (vs) {
+ assert(ioptions);
+ assert(!ioptions->cf_paths.empty());
+ assert(shared_meta);
+
+ vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
+ ioptions->cf_paths.front().path);
}
+
+ delete shared_meta;
+ };
+
+ auto shared_meta = SharedBlobFileMetaData::Create(
+ blob_file_number, blob_file_addition.GetTotalBlobCount(),
+ blob_file_addition.GetTotalBlobBytes(),
+ blob_file_addition.GetChecksumMethod(),
+ blob_file_addition.GetChecksumValue(), deleter);
+
+ blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
+ std::move(shared_meta));
+
+ return Status::OK();
+ }
+
+ Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
+ const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
+
+ if (!IsBlobFileInVersion(blob_file_number)) {
+ std::ostringstream oss;
+ oss << "Blob file #" << blob_file_number << " not found";
+
+ return Status::Corruption("VersionBuilder", oss.str());
+ }
+
+ blob_file_meta_deltas_[blob_file_number].AddGarbage(
+ blob_file_garbage.GetGarbageBlobCount(),
+ blob_file_garbage.GetGarbageBlobBytes());
+
+ return Status::OK();
+ }
+
+ int GetCurrentLevelForTableFile(uint64_t file_number) const {
+ auto it = table_file_levels_.find(file_number);
+ if (it != table_file_levels_.end()) {
+ return it->second;
+ }
+
+ assert(base_vstorage_);
+ return base_vstorage_->GetFileLocation(file_number).GetLevel();
+ }
+
+ uint64_t GetOldestBlobFileNumberForTableFile(int level,
+ uint64_t file_number) const {
+ assert(level < num_levels_);
+
+ const auto& added_files = levels_[level].added_files;
+
+ auto it = added_files.find(file_number);
+ if (it != added_files.end()) {
+ const FileMetaData* const meta = it->second;
+ assert(meta);
+
+ return meta->oldest_blob_file_number;
}
- // maybe this file was added in a previous edit that was Applied
- if (!found) {
- auto& level_added = levels_[level].added_files;
- auto got = level_added.find(number);
- if (got != level_added.end()) {
- found = true;
+ assert(base_vstorage_);
+ const FileMetaData* const meta =
+ base_vstorage_->GetFileMetaDataByNumber(file_number);
+ assert(meta);
+
+ return meta->oldest_blob_file_number;
+ }
+
+ Status ApplyFileDeletion(int level, uint64_t file_number) {
+ assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
+
+ const int current_level = GetCurrentLevelForTableFile(file_number);
+
+ if (level != current_level) {
+ if (level >= num_levels_) {
+ has_invalid_levels_ = true;
}
+
+ std::ostringstream oss;
+ oss << "Cannot delete table file #" << file_number << " from level "
+ << level << " since it is ";
+ if (current_level ==
+ VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
+ oss << "not in the LSM tree";
+ } else {
+ oss << "on level " << current_level;
+ }
+
+ return Status::Corruption("VersionBuilder", oss.str());
}
- if (!found) {
- fprintf(stderr, "not found %" PRIu64 "\n", number);
- return Status::Corruption("not found " + NumberToString(number));
+
+ if (level >= num_levels_) {
+ assert(invalid_level_sizes_[level] > 0);
+ --invalid_level_sizes_[level];
+
+ table_file_levels_[file_number] =
+ VersionStorageInfo::FileLocation::Invalid().GetLevel();
+
+ return Status::OK();
}
+
+ const uint64_t blob_file_number =
+ GetOldestBlobFileNumberForTableFile(level, file_number);
+
+ if (blob_file_number != kInvalidBlobFileNumber &&
+ IsBlobFileInVersion(blob_file_number)) {
+ blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
+ }
+
+ auto& level_state = levels_[level];
+
+ auto& add_files = level_state.added_files;
+ auto add_it = add_files.find(file_number);
+ if (add_it != add_files.end()) {
+ UnrefFile(add_it->second);
+ add_files.erase(add_it);
+ }
+
+ auto& del_files = level_state.deleted_files;
+ assert(del_files.find(file_number) == del_files.end());
+ del_files.emplace(file_number);
+
+ table_file_levels_[file_number] =
+ VersionStorageInfo::FileLocation::Invalid().GetLevel();
+
return Status::OK();
}
- bool CheckConsistencyForNumLevels() {
- // Make sure there are no files on or beyond num_levels().
- if (has_invalid_levels_) {
- return false;
- }
- for (auto& level : invalid_levels_) {
- if (level.second.size() > 0) {
- return false;
+ Status ApplyFileAddition(int level, const FileMetaData& meta) {
+ assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
+
+ const uint64_t file_number = meta.fd.GetNumber();
+
+ const int current_level = GetCurrentLevelForTableFile(file_number);
+
+ if (current_level !=
+ VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
+ if (level >= num_levels_) {
+ has_invalid_levels_ = true;
}
+
+ std::ostringstream oss;
+ oss << "Cannot add table file #" << file_number << " to level " << level
+ << " since it is already in the LSM tree on level " << current_level;
+ return Status::Corruption("VersionBuilder", oss.str());
}
- return true;
+
+ if (level >= num_levels_) {
+ ++invalid_level_sizes_[level];
+ table_file_levels_[file_number] = level;
+
+ return Status::OK();
+ }
+
+ auto& level_state = levels_[level];
+
+ auto& del_files = level_state.deleted_files;
+ auto del_it = del_files.find(file_number);
+ if (del_it != del_files.end()) {
+ del_files.erase(del_it);
+ }
+
+ FileMetaData* const f = new FileMetaData(meta);
+ f->refs = 1;
+
+ auto& add_files = level_state.added_files;
+ assert(add_files.find(file_number) == add_files.end());
+ add_files.emplace(file_number, f);
+
+ const uint64_t blob_file_number = f->oldest_blob_file_number;
+
+ if (blob_file_number != kInvalidBlobFileNumber &&
+ IsBlobFileInVersion(blob_file_number)) {
+ blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
+ }
+
+ table_file_levels_[file_number] = level;
+
+ return Status::OK();
}
// Apply all of the edits in *edit to the current state.
Status Apply(VersionEdit* edit) {
- Status s = CheckConsistency(base_vstorage_);
- if (!s.ok()) {
- return s;
+ {
+ const Status s = CheckConsistency(base_vstorage_);
+ if (!s.ok()) {
+ return s;
+ }
}
- // Delete files
- const auto& del = edit->GetDeletedFiles();
- for (const auto& del_file : del) {
- const auto level = del_file.first;
- const auto number = del_file.second;
- if (level < num_levels_) {
- levels_[level].deleted_files.insert(number);
- CheckConsistencyForDeletes(edit, number, level);
+ // Note: we process the blob file related changes first because the
+ // table file addition/deletion logic depends on the blob files
+ // already being there.
- auto exising = levels_[level].added_files.find(number);
- if (exising != levels_[level].added_files.end()) {
- UnrefFile(exising->second);
- levels_[level].added_files.erase(exising);
- }
- } else {
- if (invalid_levels_[level].erase(number) == 0) {
- // Deleting an non-existing file on invalid level.
- has_invalid_levels_ = true;
- }
+ // Add new blob files
+ for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
+ const Status s = ApplyBlobFileAddition(blob_file_addition);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ // Increase the amount of garbage for blob files affected by GC
+ for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
+ const Status s = ApplyBlobFileGarbage(blob_file_garbage);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ // Delete table files
+ for (const auto& deleted_file : edit->GetDeletedFiles()) {
+ const int level = deleted_file.first;
+ const uint64_t file_number = deleted_file.second;
+
+ const Status s = ApplyFileDeletion(level, file_number);
+ if (!s.ok()) {
+ return s;
}
}
- // Add new files
+ // Add new table files
for (const auto& new_file : edit->GetNewFiles()) {
const int level = new_file.first;
- if (level < num_levels_) {
- FileMetaData* f = new FileMetaData(new_file.second);
- f->refs = 1;
-
- assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
- levels_[level].added_files.end());
- levels_[level].deleted_files.erase(f->fd.GetNumber());
- levels_[level].added_files[f->fd.GetNumber()] = f;
+ const FileMetaData& meta = new_file.second;
+
+ const Status s = ApplyFileAddition(level, meta);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ return Status::OK();
+ }
+
+ static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
+ const BlobFileMetaData::LinkedSsts& base,
+ const std::unordered_set<uint64_t>& newly_linked,
+ const std::unordered_set<uint64_t>& newly_unlinked) {
+ BlobFileMetaData::LinkedSsts result(base);
+
+ for (uint64_t sst_file_number : newly_unlinked) {
+ assert(result.find(sst_file_number) != result.end());
+
+ result.erase(sst_file_number);
+ }
+
+ for (uint64_t sst_file_number : newly_linked) {
+ assert(result.find(sst_file_number) == result.end());
+
+ result.emplace(sst_file_number);
+ }
+
+ return result;
+ }
+
+ static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
+ const BlobFileMetaDataDelta& delta) {
+ auto shared_meta = delta.GetSharedMeta();
+ assert(shared_meta);
+
+ assert(delta.GetNewlyUnlinkedSsts().empty());
+
+ auto meta = BlobFileMetaData::Create(
+ std::move(shared_meta), delta.GetNewlyLinkedSsts(),
+ delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
+
+ return meta;
+ }
+
+ static std::shared_ptr<BlobFileMetaData>
+ GetOrCreateMetaDataForExistingBlobFile(
+ const std::shared_ptr<BlobFileMetaData>& base_meta,
+ const BlobFileMetaDataDelta& delta) {
+ assert(base_meta);
+ assert(!delta.GetSharedMeta());
+
+ if (delta.IsEmpty()) {
+ return base_meta;
+ }
+
+ auto shared_meta = base_meta->GetSharedMeta();
+ assert(shared_meta);
+
+ auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
+ delta.GetNewlyLinkedSsts(),
+ delta.GetNewlyUnlinkedSsts());
+
+ auto meta = BlobFileMetaData::Create(
+ std::move(shared_meta), std::move(linked_ssts),
+ base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
+ base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
+
+ return meta;
+ }
+
+ // Add the blob file specified by meta to *vstorage if it is determined to
+ // contain valid data (blobs). We make this decision based on the amount
+ // of garbage in the file, and whether the file or any lower-numbered blob
+ // files have any linked SSTs. The latter condition is tracked using the
+ // flag *found_first_non_empty.
+ void AddBlobFileIfNeeded(VersionStorageInfo* vstorage,
+ const std::shared_ptr<BlobFileMetaData>& meta,
+ bool* found_first_non_empty) const {
+ assert(vstorage);
+ assert(meta);
+ assert(found_first_non_empty);
+
+ if (!meta->GetLinkedSsts().empty()) {
+ (*found_first_non_empty) = true;
+ } else if (!(*found_first_non_empty) ||
+ meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
+ return;
+ }
+
+ vstorage->AddBlobFile(meta);
+ }
+
+ // Merge the blob file metadata from the base version with the changes (edits)
+ // applied, and save the result into *vstorage.
+ void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
+ assert(base_vstorage_);
+ assert(vstorage);
+
+ bool found_first_non_empty = false;
+
+ const auto& base_blob_files = base_vstorage_->GetBlobFiles();
+ auto base_it = base_blob_files.begin();
+ const auto base_it_end = base_blob_files.end();
+
+ auto delta_it = blob_file_meta_deltas_.begin();
+ const auto delta_it_end = blob_file_meta_deltas_.end();
+
+ while (base_it != base_it_end && delta_it != delta_it_end) {
+ const uint64_t base_blob_file_number = base_it->first;
+ const uint64_t delta_blob_file_number = delta_it->first;
+
+ if (base_blob_file_number < delta_blob_file_number) {
+ const auto& base_meta = base_it->second;
+
+ AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+
+ ++base_it;
+ } else if (delta_blob_file_number < base_blob_file_number) {
+ const auto& delta = delta_it->second;
+
+ auto meta = CreateMetaDataForNewBlobFile(delta);
+
+ AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+
+ ++delta_it;
} else {
- uint64_t number = new_file.second.fd.GetNumber();
- auto& lvls = invalid_levels_[level];
- if (lvls.count(number) == 0) {
- lvls.insert(number);
- } else {
- // Creating an already existing file on invalid level.
- has_invalid_levels_ = true;
- }
+ assert(base_blob_file_number == delta_blob_file_number);
+
+ const auto& base_meta = base_it->second;
+ const auto& delta = delta_it->second;
+
+ auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
+
+ AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+
+ ++base_it;
+ ++delta_it;
}
}
- return s;
+
+ while (base_it != base_it_end) {
+ const auto& base_meta = base_it->second;
+
+ AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
+
+ ++base_it;
+ }
+
+ while (delta_it != delta_it_end) {
+ const auto& delta = delta_it->second;
+
+ auto meta = CreateMetaDataForNewBlobFile(delta);
+
+ AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
+
+ ++delta_it;
+ }
}
// Save the current state in *v.
}
}
+ SaveBlobFilesTo(vstorage);
+
s = CheckConsistency(vstorage);
return s;
}
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
- const SliceTransform* prefix_extractor) {
+ const SliceTransform* prefix_extractor,
+ size_t max_file_size_for_l0_meta_pin) {
assert(table_cache_ != nullptr);
size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
size_t max_load = port::kMaxSizet;
if (!always_load) {
- // If it is initial loading and not set to always laoding all the
+ // If it is initial loading and not set to always loading all the
// files, we only load up to kInitialLoadLimit files, to limit the
// time reopening the DB.
const size_t kInitialLoadLimit = 16;
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
statuses[file_idx] = table_cache_->FindTable(
- file_options_, *(base_vstorage_->InternalComparator()),
- file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
- false /*no_io */, true /* record_read_stats */,
+ ReadOptions(), file_options_,
+ *(base_vstorage_->InternalComparator()), file_meta->fd,
+ &file_meta->table_reader_handle, prefix_extractor, false /*no_io */,
+ true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level,
- prefetch_index_and_filter_in_cache);
+ prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin);
if (file_meta->table_reader_handle != nullptr) {
// Load table_reader
file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
for (auto& t : threads) {
t.join();
}
+ Status ret;
for (const auto& s : statuses) {
if (!s.ok()) {
- return s;
+ if (ret.ok()) {
+ ret = s;
+ }
}
}
- return Status::OK();
+ return ret;
}
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
- if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
+ const uint64_t file_number = f->fd.GetNumber();
+
+ const auto& level_state = levels_[level];
+
+ const auto& del_files = level_state.deleted_files;
+ const auto del_it = del_files.find(file_number);
+
+ if (del_it != del_files.end()) {
// f is to-be-deleted table file
vstorage->RemoveCurrentStats(f);
} else {
- vstorage->AddFile(level, f, info_log_);
+ const auto& add_files = level_state.added_files;
+ const auto add_it = add_files.find(file_number);
+
+ // Note: if the file appears both in the base version and in the added
+ // list, the added FileMetaData supersedes the one in the base version.
+ if (add_it != add_files.end() && add_it->second != f) {
+ vstorage->RemoveCurrentStats(f);
+ } else {
+ vstorage->AddFile(level, f);
+ }
}
}
};
VersionBuilder::VersionBuilder(const FileOptions& file_options,
+ const ImmutableCFOptions* ioptions,
TableCache* table_cache,
VersionStorageInfo* base_vstorage,
- Logger* info_log)
- : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
+ VersionSet* version_set)
+ : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
+ version_set)) {}
-VersionBuilder::~VersionBuilder() { delete rep_; }
-
-Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
- return rep_->CheckConsistency(vstorage);
-}
-
-Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
- uint64_t number, int level) {
- return rep_->CheckConsistencyForDeletes(edit, number, level);
-}
+VersionBuilder::~VersionBuilder() = default;
bool VersionBuilder::CheckConsistencyForNumLevels() {
return rep_->CheckConsistencyForNumLevels();
Status VersionBuilder::LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
- const SliceTransform* prefix_extractor) {
- return rep_->LoadTableHandlers(internal_stats, max_threads,
- prefetch_index_and_filter_in_cache,
- is_initial_load, prefix_extractor);
+ const SliceTransform* prefix_extractor,
+ size_t max_file_size_for_l0_meta_pin) {
+ return rep_->LoadTableHandlers(
+ internal_stats, max_threads, prefetch_index_and_filter_in_cache,
+ is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
+}
+
+BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
+ ColumnFamilyData* cfd)
+ : version_builder_(new VersionBuilder(
+ cfd->current()->version_set()->file_options(), cfd->ioptions(),
+ cfd->table_cache(), cfd->current()->storage_info(),
+ cfd->current()->version_set())),
+ version_(cfd->current()) {
+ version_->Ref();
+}
+
+BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
+ ColumnFamilyData* cfd, Version* v)
+ : version_builder_(new VersionBuilder(
+ cfd->current()->version_set()->file_options(), cfd->ioptions(),
+ cfd->table_cache(), v->storage_info(), v->version_set())),
+ version_(v) {
+ assert(version_ != cfd->current());
}
-void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
- FileMetaData* f) {
- rep_->MaybeAddFile(vstorage, level, f);
+BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
+ version_->Unref();
}
} // namespace ROCKSDB_NAMESPACE