#include "db/version_builder.h"
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
#include <algorithm>
#include <atomic>
+#include <cinttypes>
#include <functional>
#include <map>
#include <set>
#include "db/version_set.h"
#include "port/port.h"
#include "table/table_reader.h"
+#include "util/string_util.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
if (a->fd.largest_seqno != b->fd.largest_seqno) {
std::unordered_map<uint64_t, FileMetaData*> added_files;
};
- const EnvOptions& env_options_;
+ const FileOptions& file_options_;
Logger* info_log_;
TableCache* table_cache_;
VersionStorageInfo* base_vstorage_;
FileComparator level_nonzero_cmp_;
public:
- Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache,
+ Rep(const FileOptions& file_options, Logger* info_log,
+ TableCache* table_cache,
VersionStorageInfo* base_vstorage)
- : env_options_(env_options),
+ : file_options_(file_options),
info_log_(info_log),
table_cache_(table_cache),
base_vstorage_(base_vstorage),
}
}
- void CheckConsistency(VersionStorageInfo* vstorage) {
+ Status CheckConsistency(VersionStorageInfo* vstorage) {
#ifdef NDEBUG
if (!vstorage->force_consistency_checks()) {
// Dont run consistency checks in release mode except if
// explicitly asked to
- return;
+ return Status::OK();
}
#endif
// make sure the files are sorted correctly
for (size_t i = 1; i < level_files.size(); i++) {
auto f1 = level_files[i - 1];
auto f2 = level_files[i];
+#ifndef NDEBUG
+ auto pair = std::make_pair(&f1, &f2);
+ TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
+#endif
if (level == 0) {
if (!level_zero_cmp_(f1, f2)) {
fprintf(stderr, "L0 files are not sorted properly");
- abort();
+ return Status::Corruption("L0 files are not sorted properly");
}
if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
" vs. file with global_seqno %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
external_file_seqno);
- abort();
+ return Status::Corruption(
+ "L0 file with seqno " +
+ NumberToString(f1->fd.smallest_seqno) + " " +
+ NumberToString(f1->fd.largest_seqno) +
+ " vs. file with global_seqno" +
+ NumberToString(external_file_seqno) + " with fileNumber " +
+ NumberToString(f1->fd.GetNumber()));
}
} else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
fprintf(stderr,
" %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
f2->fd.smallest_seqno, f2->fd.largest_seqno);
- abort();
+ return Status::Corruption(
+ "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
+ " " + NumberToString(f1->fd.largest_seqno) + " " +
+ NumberToString(f1->fd.GetNumber()) + " vs. " +
+ NumberToString(f2->fd.smallest_seqno) + " " +
+ NumberToString(f2->fd.largest_seqno) + " " +
+ NumberToString(f2->fd.GetNumber()));
}
} else {
if (!level_nonzero_cmp_(f1, f2)) {
fprintf(stderr, "L%d files are not sorted properly", level);
- abort();
+ return Status::Corruption("L" + NumberToString(level) +
+ " files are not sorted properly");
}
// Make sure there is no overlap in levels > 0
fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
(f1->largest).DebugString(true).c_str(),
(f2->smallest).DebugString(true).c_str());
- abort();
+ return Status::Corruption(
+ "L" + NumberToString(level) + " have overlapping ranges " +
+ (f1->largest).DebugString(true) + " vs. " +
+ (f2->smallest).DebugString(true));
}
}
}
}
+ return Status::OK();
}
- void CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
- int level) {
+ Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
+ int level) {
#ifdef NDEBUG
if (!base_vstorage_->force_consistency_checks()) {
// Dont run consistency checks in release mode except if
// explicitly asked to
- return;
+ return Status::OK();
}
#endif
// a file to be deleted better exist in the previous version
}
if (!found) {
fprintf(stderr, "not found %" PRIu64 "\n", number);
- abort();
+ return Status::Corruption("not found " + NumberToString(number));
}
+ return Status::OK();
}
bool CheckConsistencyForNumLevels() {
}
// Apply all of the edits in *edit to the current state.
- void Apply(VersionEdit* edit) {
- CheckConsistency(base_vstorage_);
+ Status Apply(VersionEdit* edit) {
+ Status s = CheckConsistency(base_vstorage_);
+ if (!s.ok()) {
+ return s;
+ }
// Delete files
- const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
+ const auto& del = edit->GetDeletedFiles();
for (const auto& del_file : del) {
const auto level = del_file.first;
const auto number = del_file.second;
levels_[level].added_files.erase(exising);
}
} else {
- auto exising = invalid_levels_[level].find(number);
- if (exising != invalid_levels_[level].end()) {
- invalid_levels_[level].erase(exising);
- } else {
+ if (invalid_levels_[level].erase(number) == 0) {
// Deleting an non-existing file on invalid level.
has_invalid_levels_ = true;
}
levels_[level].added_files[f->fd.GetNumber()] = f;
} else {
uint64_t number = new_file.second.fd.GetNumber();
- if (invalid_levels_[level].count(number) == 0) {
- invalid_levels_[level].insert(number);
+ auto& lvls = invalid_levels_[level];
+ if (lvls.count(number) == 0) {
+ lvls.insert(number);
} else {
// Creating an already existing file on invalid level.
has_invalid_levels_ = true;
}
}
}
+ return s;
}
// Save the current state in *v.
- void SaveTo(VersionStorageInfo* vstorage) {
- CheckConsistency(base_vstorage_);
- CheckConsistency(vstorage);
+ Status SaveTo(VersionStorageInfo* vstorage) {
+ Status s = CheckConsistency(base_vstorage_);
+ if (!s.ok()) {
+ return s;
+ }
+
+ s = CheckConsistency(vstorage);
+ if (!s.ok()) {
+ return s;
+ }
for (int level = 0; level < num_levels_; level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
}
}
- CheckConsistency(vstorage);
+ s = CheckConsistency(vstorage);
+ return s;
}
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
statuses[file_idx] = table_cache_->FindTable(
- env_options_, *(base_vstorage_->InternalComparator()),
+ file_options_, *(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
false /*no_io */, true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level,
}
};
-VersionBuilder::VersionBuilder(const EnvOptions& env_options,
+VersionBuilder::VersionBuilder(const FileOptions& file_options,
TableCache* table_cache,
VersionStorageInfo* base_vstorage,
Logger* info_log)
- : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {}
+ : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
VersionBuilder::~VersionBuilder() { delete rep_; }
-void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
- rep_->CheckConsistency(vstorage);
+Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
+ return rep_->CheckConsistency(vstorage);
}
-void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
- uint64_t number, int level) {
- rep_->CheckConsistencyForDeletes(edit, number, level);
+Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
+ uint64_t number, int level) {
+ return rep_->CheckConsistencyForDeletes(edit, number, level);
}
bool VersionBuilder::CheckConsistencyForNumLevels() {
return rep_->CheckConsistencyForNumLevels();
}
-void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
+Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
-void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
- rep_->SaveTo(vstorage);
+Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
+ return rep_->SaveTo(vstorage);
}
Status VersionBuilder::LoadTableHandlers(
rep_->MaybeAddFile(vstorage, level, f);
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE