#include "db/version_set.h"
#include <stdio.h>
+
#include <algorithm>
#include <array>
#include <cinttypes>
#include <string>
#include <unordered_map>
#include <vector>
+
#include "compaction/compaction.h"
+#include "db/blob/blob_file_cache.h"
+#include "db/blob/blob_file_reader.h"
+#include "db/blob/blob_index.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
+#include "db/version_edit_handler.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
+#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
*overlap = false;
if (iter->Valid()) {
ParsedInternalKey seek_result;
- if (!ParseInternalKey(iter->key(), &seek_result)) {
- return Status::Corruption("DB have corrupted keys");
- }
+ Status s = ParseInternalKey(iter->key(), &seek_result,
+ false /* log_err_key */); // TODO
+ if (!s.ok()) return s;
if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
0) {
range_(range),
batch_iter_(range->begin()),
batch_iter_prev_(range->begin()),
+ upper_key_(range->begin()),
maybe_repeat_key_(false),
current_level_range_(*range, range->begin(), range->end()),
current_file_range_(*range, range->begin(), range->end()),
!file_hit)) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
- Slice& user_key = batch_iter_->ukey;
+ Slice& user_key = batch_iter_->ukey_without_ts;
// Do key range filtering of files or/and fractional cascading if:
// (1) not all the files are in level 0, or
// Check if key is within a file's range. If search left bound and
// right bound point to the same find, we are sure key falls in
// range.
+ int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
+ user_key, false, ExtractUserKey(f->smallest_key), true);
+
assert(curr_level_ == 0 ||
fp_ctx.curr_index_in_curr_level ==
fp_ctx.start_index_in_curr_level ||
- user_comparator_->Compare(user_key,
- ExtractUserKey(f->smallest_key)) <= 0);
+ cmp_smallest <= 0);
- int cmp_smallest = user_comparator_->Compare(
- user_key, ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
- cmp_largest = user_comparator_->Compare(
- user_key, ExtractUserKey(f->largest_key));
+ cmp_largest = user_comparator_->CompareWithoutTimestamp(
+ user_key, false, ExtractUserKey(f->largest_key), true);
} else {
cmp_largest = -1;
}
}
if (cmp_largest == 0) {
// cmp_largest is 0, which means the next key will not be in this
- // file, so stop looking further. Also don't increment megt_iter_
- // as we may have to look for this key in the next file if we don't
- // find it in this one
+        // file, so stop looking further. However, it's possible there are
+ // duplicates in the batch, so find the upper bound for the batch
+ // in this file (upper_key_) by skipping past the duplicates. We
+ // leave batch_iter_ as is since we may have to pick up from there
+ // for the next file, if this file has a merge value rather than
+ // final value
+ upper_key_ = batch_iter_;
+ ++upper_key_;
+ while (upper_key_ != current_level_range_.end() &&
+ user_comparator_->CompareWithoutTimestamp(
+ batch_iter_->ukey_without_ts, false,
+ upper_key_->ukey_without_ts, false) == 0) {
+ ++upper_key_;
+ }
break;
} else {
if (curr_level_ == 0) {
*fd = f;
*file_index = curr_file_index;
*is_last_key_in_file = cmp_largest == 0;
+ if (!*is_last_key_in_file) {
+ // If the largest key in the batch overlapping the file is not the
+      // largest key in the file, upper_key_ would not have been updated so
+ // update it here
+ upper_key_ = batch_iter_;
+ }
return file_hit;
}
// file regardless for all keys not found yet
if (current_level_range_.CheckKeyDone(batch_iter_) ||
curr_level_ == 0) {
- ++batch_iter_;
+ batch_iter_ = upper_key_;
}
}
// batch_iter_prev_ will become the start key for the next file
&is_last_key_in_file)) {
search_ended_ = !PrepareNextLevel();
} else {
- MultiGetRange::Iterator upper_key = batch_iter_;
if (is_last_key_in_file) {
// Since cmp_largest is 0, batch_iter_ still points to the last key
// that falls in this file, instead of the next one. Increment
- // upper_key so we can set the range properly for SST MultiGet
- ++upper_key;
- ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
+ // the file index for all keys between batch_iter_ and upper_key_
+ auto tmp_iter = batch_iter_;
+ while (tmp_iter != upper_key_) {
+ ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
+ ++tmp_iter;
+ }
maybe_repeat_key_ = true;
}
// Set the range for this file
current_file_range_ =
- MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
+ MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
returned_file_level_ = curr_level_;
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
// key found in the previous SST file, in order to serve as the start of
// the batch key range for the next SST file
MultiGetRange::Iterator batch_iter_prev_;
+ MultiGetRange::Iterator upper_key_;
bool maybe_repeat_key_;
MultiGetRange current_level_range_;
MultiGetRange current_file_range_;
if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
curr_file_level_->num_files) {
batch_iter_prev_ = current_level_range_.begin();
- batch_iter_ = current_level_range_.begin();
+ upper_key_ = batch_iter_ = current_level_range_.begin();
return true;
}
}
}
if (level_contains_keys) {
batch_iter_prev_ = current_level_range_.begin();
- batch_iter_ = current_level_range_.begin();
+ upper_key_ = batch_iter_ = current_level_range_.begin();
return true;
}
curr_level_++;
class LevelIterator final : public InternalIterator {
public:
+ // @param read_options Must outlive this iterator.
LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options,
const InternalKeyComparator& icomparator,
HistogramImpl* file_read_hist, TableReaderCaller caller,
bool skip_filters, int level, RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>*
- compaction_boundaries = nullptr)
+ compaction_boundaries = nullptr,
+ bool allow_unprepared_value = false)
: table_cache_(table_cache),
read_options_(read_options),
file_options_(file_options),
should_sample_(should_sample),
caller_(caller),
skip_filters_(skip_filters),
+ allow_unprepared_value_(allow_unprepared_value),
file_index_(flevel_->num_files),
level_(level),
range_del_agg_(range_del_agg),
return file_iter_.iter() ? file_iter_.status() : Status::OK();
}
+  // Delegates PrepareValue() to the iterator of the current file, so a
+  // value deferred via allow_unprepared_value can be materialized on demand.
+  bool PrepareValue() override {
+    return file_iter_.PrepareValue();
+  }
+
inline bool MayBeOutOfLowerBound() override {
assert(Valid());
return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
}
- inline bool MayBeOutOfUpperBound() override {
- assert(Valid());
- return file_iter_.MayBeOutOfUpperBound();
+ inline IterBoundCheck UpperBoundCheckResult() override {
+ if (Valid()) {
+ return file_iter_.UpperBoundCheckResult();
+ } else {
+ return IterBoundCheck::kUnknown;
+ }
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
void SetFileIterator(InternalIterator* iter);
void InitFileIterator(size_t new_file_index);
- // Called by both of Next() and NextAndGetResult(). Force inline.
- void NextImpl() {
- assert(Valid());
- file_iter_.Next();
- SkipEmptyFileForward();
- }
-
const Slice& file_smallest_key(size_t file_index) {
assert(file_index < flevel_->num_files);
return flevel_->files[file_index].smallest_key;
bool KeyReachedUpperBound(const Slice& internal_key) {
return read_options_.iterate_upper_bound != nullptr &&
user_comparator_.CompareWithoutTimestamp(
- ExtractUserKey(internal_key),
- *read_options_.iterate_upper_bound) >= 0;
+ ExtractUserKey(internal_key), /*a_has_ts=*/true,
+ *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
}
InternalIterator* NewFileIterator() {
read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, prefix_extractor_,
nullptr /* don't need reference to table */, file_read_hist_, caller_,
- /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
- largest_compaction_key);
+ /*arena=*/nullptr, skip_filters_, level_,
+ /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
+ largest_compaction_key, allow_unprepared_value_);
}
// Check if current file being fully within iterate_lower_bound.
if (read_options_.iterate_lower_bound != nullptr &&
file_index_ < flevel_->num_files) {
may_be_out_of_lower_bound_ =
- user_comparator_.Compare(
- ExtractUserKey(file_smallest_key(file_index_)),
- *read_options_.iterate_lower_bound) < 0;
+ user_comparator_.CompareWithoutTimestamp(
+ ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
+ *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
}
}
TableCache* table_cache_;
- const ReadOptions read_options_;
+ const ReadOptions& read_options_;
const FileOptions& file_options_;
const InternalKeyComparator& icomparator_;
const UserComparatorWrapper user_comparator_;
bool should_sample_;
TableReaderCaller caller_;
bool skip_filters_;
+ bool allow_unprepared_value_;
bool may_be_out_of_lower_bound_ = true;
size_t file_index_;
int level_;
// next key after the prefix, or make the iterator invalid.
// A side benefit will be that it invalidates the iterator earlier so that
// the upper level merging iterator can merge fewer child iterators.
- Slice target_user_key = ExtractUserKey(target);
- Slice file_user_key = ExtractUserKey(file_iter_.key());
- if (prefix_extractor_->InDomain(target_user_key) &&
- (!prefix_extractor_->InDomain(file_user_key) ||
- user_comparator_.Compare(
- prefix_extractor_->Transform(target_user_key),
- prefix_extractor_->Transform(file_user_key)) != 0)) {
+ size_t ts_sz = user_comparator_.timestamp_size();
+ Slice target_user_key_without_ts =
+ ExtractUserKeyAndStripTimestamp(target, ts_sz);
+ Slice file_user_key_without_ts =
+ ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
+ if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
+ (!prefix_extractor_->InDomain(file_user_key_without_ts) ||
+ user_comparator_.CompareWithoutTimestamp(
+ prefix_extractor_->Transform(target_user_key_without_ts), false,
+ prefix_extractor_->Transform(file_user_key_without_ts),
+ false) != 0)) {
SetFileIterator(nullptr);
}
}
CheckMayBeOutOfLowerBound();
}
-void LevelIterator::Next() { NextImpl(); }
+// Advances the underlying file iterator; if it is exhausted, moves on to
+// subsequent files in the level via SkipEmptyFileForward().
+void LevelIterator::Next() {
+  assert(Valid());
+  file_iter_.Next();
+  SkipEmptyFileForward();
+}
bool LevelIterator::NextAndGetResult(IterateResult* result) {
-  NextImpl();
-  bool is_valid = Valid();
-  if (is_valid) {
-    result->key = key();
-    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
+  assert(Valid());
+  bool is_valid = file_iter_.NextAndGetResult(result);
+  if (!is_valid) {
+    // Current file is exhausted; advance to the next file with entries
+    // (if any) and re-populate the result from the new position.
+    SkipEmptyFileForward();
+    is_valid = Valid();
+    if (is_valid) {
+      result->key = key();
+      result->bound_check_result = file_iter_.UpperBoundCheckResult();
+      // Ideally, we should return the real file_iter_.value_prepared but the
+      // information is not here. It would cause an extra PrepareValue()
+      // for the first key of a file.
+      result->value_prepared = !allow_unprepared_value_;
+    }
  }
  return is_valid;
}
bool seen_empty_file = false;
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok() &&
- !file_iter_.iter()->IsOutOfBound())) {
+ file_iter_.iter()->UpperBoundCheckResult() !=
+ IterBoundCheck::kOutOfBound)) {
seen_empty_file = true;
// Move to next file
if (file_index_ >= flevel_->num_files - 1) {
}
} // anonymous namespace
-// A wrapper of version builder which references the current version in
-// constructor and unref it in the destructor.
-// Both of the constructor and destructor need to be called inside DB Mutex.
-class BaseReferencedVersionBuilder {
- public:
- explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
- : version_builder_(new VersionBuilder(
- cfd->current()->version_set()->file_options(), cfd->table_cache(),
- cfd->current()->storage_info(), cfd->ioptions()->info_log)),
- version_(cfd->current()) {
- version_->Ref();
- }
- ~BaseReferencedVersionBuilder() {
- version_->Unref();
- }
- VersionBuilder* version_builder() { return version_builder_.get(); }
-
- private:
- std::unique_ptr<VersionBuilder> version_builder_;
- Version* version_;
-};
-
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname) const {
// pass the magic number check in the footer.
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
- std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
- 0 /* hist_type */, nullptr /* file_read_hist */,
+ std::move(file), file_name, nullptr /* env */, io_tracer_,
+ nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
nullptr /* rate_limiter */, ioptions->listeners));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
void Version::AddIterators(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
- RangeDelAggregator* range_del_agg) {
+ RangeDelAggregator* range_del_agg,
+ bool allow_unprepared_value) {
assert(storage_info_.finalized_);
for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
- range_del_agg);
+ range_del_agg, allow_unprepared_value);
}
}
const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
int level,
- RangeDelAggregator* range_del_agg) {
+ RangeDelAggregator* range_del_agg,
+ bool allow_unprepared_value) {
assert(storage_info_.finalized_);
if (level >= storage_info_.num_non_empty_levels()) {
// This is an empty level
mutable_cf_options_.prefix_extractor.get(), nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
- /*skip_filters=*/false, /*level=*/0,
+ /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr));
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value));
}
if (should_sample) {
// Count ones for every L0 files. This is done per iterator creation
mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
- range_del_agg, /*largest_compaction_key=*/nullptr));
+ range_del_agg,
+ /*compaction_boundaries=*/nullptr, allow_unprepared_value));
}
}
mutable_cf_options_.prefix_extractor.get(), nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, &arena,
- /*skip_filters=*/false, /*level=*/0,
+ /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr));
+ /*largest_compaction_key=*/nullptr,
+ /*allow_unprepared_value=*/false));
status = OverlapWithIterator(
ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
if (!status.ok() || *overlap) {
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
const FileOptions& file_opt,
const MutableCFOptions mutable_cf_options,
+ const std::shared_ptr<IOTracer>& io_tracer,
uint64_t version_number)
: env_(vset->env_),
cfd_(column_family_data),
db_statistics_((cfd_ == nullptr) ? nullptr
: cfd_->ioptions()->statistics),
table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
+ blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
merge_operator_((cfd_ == nullptr) ? nullptr
: cfd_->ioptions()->merge_operator),
storage_info_(
refs_(0),
file_options_(file_opt),
mutable_cf_options_(mutable_cf_options),
- version_number_(version_number) {}
+ max_file_size_for_l0_meta_pin_(
+ MaxFileSizeForL0MetaPin(mutable_cf_options_)),
+ version_number_(version_number),
+ io_tracer_(io_tracer) {}
+
+// Resolves a blob reference: *value holds an encoded BlobIndex on entry and,
+// on success, is overwritten with the actual blob read from the blob file.
+// Returns Incomplete when read_options forbid disk I/O (kBlockCacheTier),
+// Corruption for a TTL/inlined index or an unknown blob file number, and
+// otherwise propagates the status of the decode/reader-lookup/read steps.
+Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
+                        PinnableSlice* value) const {
+  assert(value);
+
+  if (read_options.read_tier == kBlockCacheTier) {
+    return Status::Incomplete("Cannot read blob: no disk I/O allowed");
+  }
+
+  BlobIndex blob_index;
+
+  {
+    Status s = blob_index.DecodeFrom(*value);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  // Only plain, non-TTL blob references are expected here.
+  if (blob_index.HasTTL() || blob_index.IsInlined()) {
+    return Status::Corruption("Unexpected TTL/inlined blob index");
+  }
+
+  const auto& blob_files = storage_info_.GetBlobFiles();
+
+  const uint64_t blob_file_number = blob_index.file_number();
+
+  const auto it = blob_files.find(blob_file_number);
+  if (it == blob_files.end()) {
+    return Status::Corruption("Invalid blob file number");
+  }
+
+  CacheHandleGuard<BlobFileReader> blob_file_reader;
+
+  {
+    assert(blob_file_cache_);
+    const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number,
+                                                         &blob_file_reader);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  assert(blob_file_reader.GetValue());
+  const Status s = blob_file_reader.GetValue()->GetBlob(
+      read_options, user_key, blob_index.offset(), blob_index.size(),
+      blob_index.compression(), value);
+
+  return s;
+}
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
- PinnableSlice* value, Status* status,
+ PinnableSlice* value, std::string* timestamp, Status* status,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
vset_->block_cache_tracer_->is_tracing_enabled()) {
tracing_get_id = vset_->block_cache_tracer_->NextGetId();
}
+
+ // Note: the old StackableDB-based BlobDB passes in
+ // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
+ // need to provide it here.
+ bool is_blob_index = false;
+ bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
+
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
- do_merge ? value : nullptr, value_found, merge_context, do_merge,
- max_covering_tombstone_seq, this->env_, seq,
- merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
+ do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
+ merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
+ merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
tracing_get_id);
// Pin blocks that we read to hold merge operands
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
- fp.GetCurrentLevel());
+ fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
- fp.GetCurrentLevel());
+ fp.GetHitFileLevel());
}
if (!status->ok()) {
return;
// TODO: update per-level perfcontext user_key_return_count for kMerge
break;
case GetContext::kFound:
+ if (is_blob_index) {
+ if (do_merge && value) {
+ *status = GetBlob(read_options, user_key, value);
+ if (!status->ok()) {
+ if (status->IsIncomplete()) {
+ get_context.MarkKeyMayExist();
+ }
+ return;
+ }
+ }
+ }
+
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
- case GetContext::kBlobIndex:
+ case GetContext::kUnexpectedBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
assert(iter->s->ok() || iter->s->IsMergeInProgress());
get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_,
- iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
- iter->value, nullptr, &(iter->merge_context), true,
- &iter->max_covering_tombstone_seq, this->env_, nullptr,
- merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
- tracing_mget_id);
+ iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
+ iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
+ &(iter->merge_context), true, &iter->max_covering_tombstone_seq,
+ this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr,
+ callback, is_blob, tracing_mget_id);
// MergeInProgress status, if set, has been transferred to the get_context
// state, so we set status to ok here. From now on, the iter status will
// be used for IO errors, and get_context state will be used for any
&storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
+ Status s;
+ uint64_t num_index_read = 0;
+ uint64_t num_filter_read = 0;
+ uint64_t num_data_read = 0;
+ uint64_t num_sst_read = 0;
while (f != nullptr) {
MultiGetRange file_range = fp.CurrentFileRange();
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(env_, timer_enabled /* auto_start */);
- Status s = table_cache_->MultiGet(
+ s = table_cache_->MultiGet(
read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
- fp.GetCurrentLevel());
+ fp.GetHitFileLevel());
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
- fp.GetCurrentLevel());
+ fp.GetHitFileLevel());
}
if (!s.ok()) {
// TODO: Set status for individual keys appropriately
return;
}
uint64_t batch_size = 0;
- for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+ for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
+ ++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
// The Status in the KeyContext takes precedence over GetContext state
sample_file_read_inc(f->file_metadata);
}
batch_size++;
+ num_index_read += get_context.get_context_stats_.num_index_read;
+ num_filter_read += get_context.get_context_stats_.num_filter_read;
+ num_data_read += get_context.get_context_stats_.num_data_read;
+ num_sst_read += get_context.get_context_stats_.num_sst_read;
+
// report the counters before returning
if (get_context.State() != GetContext::kNotFound &&
get_context.State() != GetContext::kMerge &&
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
+ file_range.AddValueSize(iter->value->size());
file_range.MarkKeyDone(iter);
+ if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
+ s = Status::Aborted();
+ break;
+ }
continue;
case GetContext::kDeleted:
// Use empty error message for speed
Status::Corruption("corrupted key for ", iter->lkey->user_key());
file_range.MarkKeyDone(iter);
continue;
- case GetContext::kBlobIndex:
+ case GetContext::kUnexpectedBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
continue;
}
}
+
+ // Report MultiGet stats per level.
+ if (fp.IsHitFileLastInLevel()) {
+ // Dump the stats if this is the last file of this level and reset for
+ // next level.
+ RecordInHistogram(db_statistics_,
+ NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
+ num_index_read + num_filter_read);
+ RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
+ num_data_read);
+ RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
+ num_filter_read = 0;
+ num_index_read = 0;
+ num_data_read = 0;
+ num_sst_read = 0;
+ }
+
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
- if (file_picker_range.empty()) {
+ if (!s.ok() || file_picker_range.empty()) {
break;
}
f = fp.GetNextFile();
}
// Process any left over keys
- for (auto iter = range->begin(); iter != range->end(); ++iter) {
+ for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
Slice user_key = iter->lkey->user_key();
nullptr /* result_operand */, true);
if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf();
+ range->AddValueSize(iter->value->size());
+ range->MarkKeyDone(iter);
+ if (range->GetValueSize() > read_options.value_size_soft_limit) {
+ s = Status::Aborted();
+ break;
+ }
}
} else {
range->MarkKeyDone(iter);
*status = Status::NotFound(); // Use an empty error message for speed
}
}
+
+ for (auto iter = range->begin(); iter != range->end(); ++iter) {
+ range->MarkKeyDone(iter);
+ *(iter->s) = s;
+ }
}
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
void Version::PrepareApply(
const MutableCFOptions& mutable_cf_options,
bool update_stats) {
+ TEST_SYNC_POINT_CALLBACK(
+ "Version::PrepareApply:forced_check",
+ reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
UpdateAccumulatedStats(update_stats);
storage_info_.UpdateNumNonEmptyLevels();
storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
+    // It's possible that a subset of the files in a level may be in a
+ // compaction, due to delete triggered compaction or trivial move.
+ // In that case, the below check may not catch a level being
+ // compacted as it only checks the first file. The worst that can
+ // happen is a scheduled compaction thread will find nothing to do.
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
num_sorted_runs++;
}
// Level-based involves L0->L0 compactions that can lead to oversized
// L0 files. Take into account size as well to avoid later giant
// compactions to the base level.
- score = std::max(
- score, static_cast<double>(total_size) /
- mutable_cf_options.max_bytes_for_level_base);
+ uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
+ if (immutable_cf_options.level_compaction_dynamic_level_bytes &&
+ level_multiplier_ != 0.0) {
+ // Prevent L0 to Lbase fanout from growing larger than
+ // `level_multiplier_`. This prevents us from getting stuck picking
+ // L0 forever even when it is hurting write-amp. That could happen
+ // in dynamic level compaction's write-burst mode where the base
+ // level's target size can grow to be enormous.
+ l0_target_size =
+ std::max(l0_target_size,
+ static_cast<uint64_t>(level_max_bytes_[base_level_] /
+ level_multiplier_));
+ }
+ score =
+ std::max(score, static_cast<double>(total_size) / l0_target_size);
}
}
} else {
}
} // anonymous namespace
-void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
- auto* level_files = &files_[level];
- // Must not overlap
-#ifndef NDEBUG
- if (level > 0 && !level_files->empty() &&
- internal_comparator_->Compare(
- (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
- auto* f2 = (*level_files)[level_files->size() - 1];
- if (info_log != nullptr) {
- Error(info_log, "Adding new file %" PRIu64
- " range (%s, %s) to level %d but overlapping "
- "with existing file %" PRIu64 " %s %s",
- f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
- f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
- f2->smallest.DebugString(true).c_str(),
- f2->largest.DebugString(true).c_str());
- LogFlush(info_log);
- }
- assert(false);
- }
-#else
- (void)info_log;
-#endif
+// Appends `f` to `level`, takes a reference on it, and records its
+// (level, position-in-level) in file_locations_ so the file can later be
+// looked up by file number.
+void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
+  auto& level_files = files_[level];
+  level_files.push_back(f);
+
+  f->refs++;
+
+  const uint64_t file_number = f->fd.GetNumber();
+
+  // Each file number may be added at most once per version.
+  assert(file_locations_.find(file_number) == file_locations_.end());
+  file_locations_.emplace(file_number,
+                          FileLocation(level, level_files.size() - 1));
+}
+
+// Inserts `blob_file_meta` into the version's blob file map, keyed by blob
+// file number. lower_bound is used both as an insertion hint and to assert
+// that the same blob file number is not added twice.
+void VersionStorageInfo::AddBlobFile(
+    std::shared_ptr<BlobFileMetaData> blob_file_meta) {
+  assert(blob_file_meta);
+
+  const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
+
+  auto it = blob_files_.lower_bound(blob_file_number);
+  assert(it == blob_files_.end() || it->first != blob_file_number);
+
+  blob_files_.insert(
+      it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
+}
// Version::PrepareApply() need to be called before calling the function, or
}
uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
- // Estimate the live data size by adding up the size of the last level for all
- // key ranges. Note: Estimate depends on the ordering of files in level 0
- // because files in level 0 can be overlapping.
+ // Estimate the live data size by adding up the size of a maximal set of
+ // sst files with no range overlap in same or higher level. The less
+ // compacted, the more optimistic (smaller) this estimate is. Also,
+ // for multiple sorted runs within a level, file order will matter.
uint64_t size = 0;
auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
return internal_comparator_->Compare(*x, *y) < 0;
};
- // (Ordered) map of largest keys in non-overlapping files
+ // (Ordered) map of largest keys in files being included in size estimate
std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
for (int l = num_levels_ - 1; l >= 0; l--) {
bool found_end = false;
for (auto file : files_[l]) {
- // Find the first file where the largest key is larger than the smallest
- // key of the current file. If this file does not overlap with the
+ // Find the first file already included with largest key is larger than
+ // the smallest key of `file`. If that file does not overlap with the
// current file, none of the files in the map does. If there is
// no potential overlap, we can safely insert the rest of this level
// (if the level is not 0) into the map without checking again because
return false;
}
-void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
- for (int level = 0; level < storage_info_.num_levels(); level++) {
- const std::vector<FileMetaData*>& files = storage_info_.files_[level];
- for (const auto& file : files) {
- live->push_back(file->fd);
+void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
+ std::vector<uint64_t>* live_blob_files) const {
+ assert(live_table_files);
+ assert(live_blob_files);
+
+ for (int level = 0; level < storage_info_.num_levels(); ++level) {
+ const auto& level_files = storage_info_.LevelFiles(level);
+ for (const auto& meta : level_files) {
+ assert(meta);
+
+ live_table_files->emplace_back(meta->fd.GetNumber());
}
}
+
+ const auto& blob_files = storage_info_.GetBlobFiles();
+ for (const auto& pair : blob_files) {
+ const auto& meta = pair.second;
+ assert(meta);
+
+ live_blob_files->emplace_back(meta->GetBlobFileNumber());
+ }
}
std::string Version::DebugString(bool hex, bool print_stats) const {
r.append("\n");
}
}
+
+ const auto& blob_files = storage_info_.GetBlobFiles();
+ if (!blob_files.empty()) {
+ r.append("--- blob files --- version# ");
+ AppendNumberTo(&r, version_number_);
+ r.append(" ---\n");
+ for (const auto& pair : blob_files) {
+ const auto& blob_file_meta = pair.second;
+ assert(blob_file_meta);
+
+ r.append(blob_file_meta->DebugString());
+ r.push_back('\n');
+ }
+ }
+
return r;
}
ColumnFamilyData* cfd;
const MutableCFOptions mutable_cf_options;
const autovector<VersionEdit*>& edit_list;
+ const std::function<void(const Status&)> manifest_write_callback;
-  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
-                          const MutableCFOptions& cf_options,
-                          const autovector<VersionEdit*>& e)
+  // manifest_wcb, if non-empty, is stored and later invoked with the final
+  // status of this writer's MANIFEST write (see ProcessManifestWrites).
+  explicit ManifestWriter(
+      InstrumentedMutex* mu, ColumnFamilyData* _cfd,
+      const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
+      const std::function<void(const Status&)>& manifest_wcb)
      : done(false),
        cv(mu),
        cfd(_cfd),
        mutable_cf_options(cf_options),
-        edit_list(e) {}
+        edit_list(e),
+        manifest_write_callback(manifest_wcb) {}
+  // Mark `status` as checked so a writer whose result was never examined
+  // does not trip unchecked-Status assertions on destruction.
+  ~ManifestWriter() { status.PermitUncheckedError(); }
+
+  // Returns true iff every edit in this batch is a WAL metadata change
+  // (WAL addition/deletion). Such batches do not need to be applied to a
+  // Version (see ProcessManifestWrites).
+  bool IsAllWalEdits() const {
+    bool all_wal_edits = true;
+    for (const auto& e : edit_list) {
+      if (!e->IsWalManipulation()) {
+        all_wal_edits = false;
+        break;
+      }
+    }
+    return all_wal_edits;
+  }
};
Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
const FileOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
- BlockCacheTracer* const block_cache_tracer)
- : column_family_set_(new ColumnFamilySet(
- dbname, _db_options, storage_options, table_cache,
- write_buffer_manager, write_controller, block_cache_tracer)),
+ BlockCacheTracer* const block_cache_tracer,
+ const std::shared_ptr<IOTracer>& io_tracer)
+ : column_family_set_(
+ new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
+ write_buffer_manager, write_controller,
+ block_cache_tracer, io_tracer)),
+ table_cache_(table_cache),
env_(_db_options->env),
- fs_(_db_options->fs.get()),
+ fs_(_db_options->fs, io_tracer),
dbname_(dbname),
db_options_(_db_options),
next_file_number_(2),
current_version_number_(0),
manifest_file_size_(0),
file_options_(storage_options),
- block_cache_tracer_(block_cache_tracer) {}
+ block_cache_tracer_(block_cache_tracer),
+ io_tracer_(io_tracer) {}
VersionSet::~VersionSet() {
  // we need to delete column_family_set_ because its destructor depends on
  // VersionSet
-  Cache* table_cache = column_family_set_->get_table_cache();
  column_family_set_.reset();
  for (auto& file : obsolete_files_) {
    if (file.metadata->table_reader_handle) {
-      table_cache->Release(file.metadata->table_reader_handle);
-      TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
+      table_cache_->Release(file.metadata->table_reader_handle);
+      TableCache::Evict(table_cache_, file.metadata->fd.GetNumber());
    }
    file.DeleteMetadata();
  }
  obsolete_files_.clear();
+  // io_status_ may hold a MANIFEST I/O error nobody read; mark it checked
+  // so destruction does not trip unchecked-Status assertions.
+  io_status_.PermitUncheckedError();
+}
+
+// Restores this VersionSet to a freshly-constructed state: rebuilds the
+// column family set (reusing the existing write buffer manager and write
+// controller) and resets all counters, sequence numbers, and MANIFEST
+// bookkeeping. Used to retry recovery from an older MANIFEST (TryRecover).
+void VersionSet::Reset() {
+  if (column_family_set_) {
+    WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
+    WriteController* wc = column_family_set_->write_controller();
+    column_family_set_.reset(
+        new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_,
+                            wbm, wc, block_cache_tracer_, io_tracer_));
+  }
+  db_id_.clear();
+  // File numbers restart at 2, matching the constructor's initial value.
+  next_file_number_.store(2);
+  min_log_number_to_keep_2pc_.store(0);
+  manifest_file_number_ = 0;
+  options_file_number_ = 0;
+  pending_manifest_file_number_ = 0;
+  last_sequence_.store(0);
+  last_allocated_sequence_.store(0);
+  last_published_sequence_.store(0);
+  prev_log_number_ = 0;
+  descriptor_log_.reset();
+  current_version_number_ = 0;
+  manifest_writers_.clear();
+  manifest_file_size_ = 0;
+  obsolete_files_.clear();
+  obsolete_manifests_.clear();
+  wals_.Reset();
+}
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Status VersionSet::ProcessManifestWrites(
std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
- Directory* db_directory, bool new_descriptor_log,
+ FSDirectory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) {
+ mu->AssertHeld();
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
ManifestWriter* last_writer = &first_writer;
}
}
if (version == nullptr) {
- version = new Version(last_writer->cfd, this, file_options_,
- last_writer->mutable_cf_options,
- current_version_number_++);
- versions.push_back(version);
- mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
- builder_guards.emplace_back(
- new BaseReferencedVersionBuilder(last_writer->cfd));
- builder = builder_guards.back()->version_builder();
- }
- assert(builder != nullptr); // make checker happy
+ // WAL manipulations do not need to be applied to versions.
+ if (!last_writer->IsAllWalEdits()) {
+ version = new Version(last_writer->cfd, this, file_options_,
+ last_writer->mutable_cf_options, io_tracer_,
+ current_version_number_++);
+ versions.push_back(version);
+ mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
+ builder_guards.emplace_back(
+ new BaseReferencedVersionBuilder(last_writer->cfd));
+ builder = builder_guards.back()->version_builder();
+ }
+ assert(last_writer->IsAllWalEdits() || builder);
+ assert(last_writer->IsAllWalEdits() || version);
+ TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
+ version);
+ }
for (const auto& e : last_writer->edit_list) {
if (e->is_in_atomic_group_) {
if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
}
#endif // NDEBUG
- uint64_t new_manifest_file_size = 0;
- Status s;
-
assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
// reads its content after releasing db mutex to avoid race with
// SwitchMemtable().
std::unordered_map<uint32_t, MutableCFState> curr_state;
+ VersionEdit wal_additions;
if (new_descriptor_log) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
assert(curr_state.find(cfd->GetID()) == curr_state.end());
curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
}
+
+ for (const auto& wal : wals_.GetWals()) {
+ wal_additions.AddWal(wal.first, wal.second);
+ }
}
+ uint64_t new_manifest_file_size = 0;
+ Status s;
+ IOStatus io_s;
{
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock();
- TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
+ TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
ColumnFamilyData* cfd = versions[i]->cfd_;
s = builder_guards[i]->version_builder()->LoadTableHandlers(
- cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
+ cfd->internal_stats(), 1 /* max_threads */,
true /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
- mutable_cf_options_ptrs[i]->prefix_extractor.get());
+ mutable_cf_options_ptrs[i]->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
if (!s.ok()) {
if (db_options_->paranoid_checks) {
break;
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<FSWritableFile> descriptor_file;
- s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
- opt_file_opts);
- if (s.ok()) {
+ io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
+ opt_file_opts);
+ if (io_s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
- nullptr, db_options_->listeners));
+ io_tracer_, nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
- s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
+ s = WriteCurrentStateToManifest(curr_state, wal_additions,
+ descriptor_log_.get(), io_s);
+ } else {
+ s = io_s;
}
}
}
++idx;
#endif /* !NDEBUG */
- s = descriptor_log_->AddRecord(record);
- if (!s.ok()) {
+ io_s = descriptor_log_->AddRecord(record);
+ if (!io_s.ok()) {
+ s = io_s;
break;
}
}
if (s.ok()) {
- s = SyncManifest(env_, db_options_, descriptor_log_->file());
+ io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
+ TEST_SYNC_POINT_CALLBACK(
+ "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
- if (!s.ok()) {
+ if (!io_s.ok()) {
+ s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && new_descriptor_log) {
- s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
- db_directory);
+ io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
+ db_directory);
+ if (!io_s.ok()) {
+ s = io_s;
+ }
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
}
mu->Lock();
}
+ if (s.ok()) {
+ // Apply WAL edits, DB mutex must be held.
+ for (auto& e : batch_edits) {
+ if (e->IsWalAddition()) {
+ s = wals_.AddWals(e->GetWalAdditions());
+ } else if (e->IsWalDeletion()) {
+ s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
+ }
+ if (!s.ok()) {
+ break;
+ }
+ }
+ }
+
+ if (!io_s.ok()) {
+ if (io_status_.ok()) {
+ io_status_ = io_s;
+ }
+ } else if (!io_status_.ok()) {
+ io_status_ = io_s;
+ }
+
// Append the old manifest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
ROCKS_LOG_INFO(db_options_->info_log,
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
"\n",
- manifest_file_number_, pending_manifest_file_number_);
- env_->DeleteFile(
+ pending_manifest_file_number_, manifest_file_number_);
+ Status manifest_del_status = env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_));
+ if (!manifest_del_status.ok()) {
+ ROCKS_LOG_WARN(db_options_->info_log,
+ "Failed to delete manifest %" PRIu64 ": %s",
+ pending_manifest_file_number_,
+ manifest_del_status.ToString().c_str());
+ }
}
}
}
ready->status = s;
ready->done = true;
+ if (ready->manifest_write_callback) {
+ (ready->manifest_write_callback)(s);
+ }
if (need_signal) {
ready->cv.Signal();
}
const autovector<ColumnFamilyData*>& column_family_datas,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<autovector<VersionEdit*>>& edit_lists,
- InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
- const ColumnFamilyOptions* new_cf_options) {
+ InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
+ const ColumnFamilyOptions* new_cf_options,
+ const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
mu->AssertHeld();
int num_edits = 0;
for (const auto& elist : edit_lists) {
assert(static_cast<size_t>(num_cfds) == edit_lists.size());
}
for (int i = 0; i < num_cfds; ++i) {
+ const auto wcb =
+ manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
writers.emplace_back(mu, column_family_datas[i],
- *mutable_cf_options_list[i], edit_lists[i]);
+ *mutable_cf_options_list[i], edit_lists[i], wcb);
manifest_writers_.push_back(&writers[i]);
}
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
+ TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
+ nullptr);
while (!first_writer.done && &first_writer != manifest_writers_.front()) {
first_writer.cv.Wait();
}
for (const auto& writer : writers) {
assert(writer.done);
}
+ TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
#endif /* !NDEBUG */
return first_writer.status;
}
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
- Status s = builder->Apply(edit);
-
- return s;
+ // The builder can be nullptr only if edit is WAL manipulation,
+ // because WAL edits do not need to be applied to versions,
+ // we return Status::OK() in this case.
+ assert(builder || edit->IsWalManipulation());
+ return builder ? builder->Apply(edit) : Status::OK();
}
Status VersionSet::ApplyOneVersionEditToBuilder(
return Status::Corruption(
"Manifest - dropping non-existing column family");
}
+ } else if (edit.IsWalAddition()) {
+ Status s = wals_.AddWals(edit.GetWalAdditions());
+ if (!s.ok()) {
+ return s;
+ }
+ } else if (edit.IsWalDeletion()) {
+ Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber());
+ if (!s.ok()) {
+ return s;
+ }
} else if (!cf_in_not_found) {
if (!cf_in_builders) {
return Status::Corruption(
if (dbname.back() != '/') {
manifest_path->push_back('/');
}
- *manifest_path += fname;
+ manifest_path->append(fname);
return Status::OK();
}
Status VersionSet::ReadAndRecover(
- log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
+ log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
- VersionEditParams* version_edit_params, std::string* db_id) {
- assert(reader != nullptr);
+ Status* log_read_status, VersionEditParams* version_edit_params,
+ std::string* db_id) {
assert(read_buffer != nullptr);
+ assert(log_read_status != nullptr);
Status s;
Slice record;
std::string scratch;
size_t recovered_edits = 0;
- while (reader->ReadRecord(&record, &scratch) && s.ok()) {
+ while (s.ok() && reader.ReadRecord(&record, &scratch) &&
+ log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
}
}
}
+ if (!log_read_status->ok()) {
+ s = *log_read_status;
+ }
if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear();
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id) {
- std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
- for (const auto& cf : column_families) {
- cf_name_to_options.emplace(cf.name, cf.options);
- }
- // keeps track of column families in manifest that were not found in
- // column families parameters. if those column families are not dropped
- // by subsequent manifest records, Recover() will return failure status
- std::unordered_map<int, std::string> column_families_not_found;
-
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string manifest_path;
- Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+ Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
&manifest_file_number_);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path,
- db_options_->log_readahead_size));
- }
-
- std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
- builders;
-
- // add default column family
- auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
- if (default_cf_iter == cf_name_to_options.end()) {
- return Status::InvalidArgument("Default column family not specified");
+ db_options_->log_readahead_size, io_tracer_));
}
- VersionEdit default_cf_edit;
- default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
- default_cf_edit.SetColumnFamily(0);
- ColumnFamilyData* default_cfd =
- CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
- // In recovery, nobody else can access it, so it's fine to set it to be
- // initialized earlier.
- default_cfd->set_initialized();
- builders.insert(
- std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
- new BaseReferencedVersionBuilder(default_cfd))));
uint64_t current_manifest_file_size = 0;
- VersionEditParams version_edit_params;
+ uint64_t log_number = 0;
{
VersionSet::LogReporter reporter;
- reporter.status = &s;
+ Status log_read_status;
+ reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
- Slice record;
- std::string scratch;
- AtomicGroupReadBuffer read_buffer;
- s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
- column_families_not_found, builders,
- &version_edit_params, db_id);
- current_manifest_file_size = reader.GetReadOffset();
- assert(current_manifest_file_size != 0);
- }
-
- if (s.ok()) {
- if (!version_edit_params.has_next_file_number_) {
- s = Status::Corruption("no meta-nextfile entry in descriptor");
- } else if (!version_edit_params.has_log_number_) {
- s = Status::Corruption("no meta-lognumber entry in descriptor");
- } else if (!version_edit_params.has_last_sequence_) {
- s = Status::Corruption("no last-sequence-number entry in descriptor");
- }
-
- if (!version_edit_params.has_prev_log_number_) {
- version_edit_params.SetPrevLogNumber(0);
- }
-
- column_family_set_->UpdateMaxColumnFamily(
- version_edit_params.max_column_family_);
-
- // When reading DB generated using old release, min_log_number_to_keep=0.
- // All log files will be scanned for potential prepare entries.
- MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
- MarkFileNumberUsed(version_edit_params.prev_log_number_);
- MarkFileNumberUsed(version_edit_params.log_number_);
- }
-
- // there were some column families in the MANIFEST that weren't specified
- // in the argument. This is OK in read_only mode
- if (read_only == false && !column_families_not_found.empty()) {
- std::string list_of_not_found;
- for (const auto& cf : column_families_not_found) {
- list_of_not_found += ", " + cf.second;
- }
- list_of_not_found = list_of_not_found.substr(2);
- s = Status::InvalidArgument(
- "You have to open all column families. Column families not opened: " +
- list_of_not_found);
- }
-
- if (s.ok()) {
- for (auto cfd : *column_family_set_) {
- assert(builders.count(cfd->GetID()) > 0);
- auto* builder = builders[cfd->GetID()]->version_builder();
- if (!builder->CheckConsistencyForNumLevels()) {
- s = Status::InvalidArgument(
- "db has more levels than options.num_levels");
- break;
- }
+ VersionEditHandler handler(
+ read_only, column_families, const_cast<VersionSet*>(this),
+ /*track_missing_files=*/false,
+ /*no_error_if_table_files_missing=*/false, io_tracer_);
+ handler.Iterate(reader, &log_read_status);
+ s = handler.status();
+ if (s.ok()) {
+ log_number = handler.GetVersionEditParams().log_number_;
+ current_manifest_file_size = reader.GetReadOffset();
+ assert(current_manifest_file_size != 0);
+ handler.GetDbId(db_id);
}
}
if (s.ok()) {
- for (auto cfd : *column_family_set_) {
- if (cfd->IsDropped()) {
- continue;
- }
- if (read_only) {
- cfd->table_cache()->SetTablesAreImmortal();
- }
- assert(cfd->initialized());
- auto builders_iter = builders.find(cfd->GetID());
- assert(builders_iter != builders.end());
- auto builder = builders_iter->second->version_builder();
-
- // unlimited table cache. Pre-load table handle now.
- // Need to do it out of the mutex.
- s = builder->LoadTableHandlers(
- cfd->internal_stats(), db_options_->max_file_opening_threads,
- false /* prefetch_index_and_filter_in_cache */,
- true /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
- if (!s.ok()) {
- if (db_options_->paranoid_checks) {
- return s;
- }
- s = Status::OK();
- }
-
- Version* v = new Version(cfd, this, file_options_,
- *cfd->GetLatestMutableCFOptions(),
- current_version_number_++);
- builder->SaveTo(v->storage_info());
-
- // Install recovered version
- v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
- !(db_options_->skip_stats_update_on_db_open));
- AppendVersion(cfd, v);
- }
-
manifest_file_size_ = current_manifest_file_size;
- next_file_number_.store(version_edit_params.next_file_number_ + 1);
- last_allocated_sequence_ = version_edit_params.last_sequence_;
- last_published_sequence_ = version_edit_params.last_sequence_;
- last_sequence_ = version_edit_params.last_sequence_;
- prev_log_number_ = version_edit_params.prev_log_number_;
-
ROCKS_LOG_INFO(
db_options_->info_log,
"Recovered from manifest file:%s succeeded,"
",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
",min_log_number_to_keep is %" PRIu64 "\n",
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
- last_sequence_.load(), version_edit_params.log_number_,
- prev_log_number_, column_family_set_->GetMaxColumnFamily(),
- min_log_number_to_keep_2pc());
+ last_sequence_.load(), log_number, prev_log_number_,
+ column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
return s;
}
+namespace {
+// Iterates over the MANIFEST files present in the DB directory in
+// descending file-number order, i.e. from newest to oldest.
+class ManifestPicker {
+ public:
+  explicit ManifestPicker(const std::string& dbname,
+                          const std::vector<std::string>& files_in_dbname);
+  // REQUIRES Valid() == true
+  std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
+  bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
+
+ private:
+  // Caller-owned; this picker must not outlive the referenced string.
+  const std::string& dbname_;
+  // MANIFEST file names(s)
+  std::vector<std::string> manifest_files_;
+  std::vector<std::string>::const_iterator manifest_file_iter_;
+};
+
+// Collects the kDescriptorFile entries from files_in_dbname and sorts them
+// by file number, newest first.
+ManifestPicker::ManifestPicker(const std::string& dbname,
+                               const std::vector<std::string>& files_in_dbname)
+    : dbname_(dbname) {
+  // populate manifest files
+  assert(!files_in_dbname.empty());
+  for (const auto& fname : files_in_dbname) {
+    uint64_t file_num = 0;
+    FileType file_type;
+    bool parse_ok = ParseFileName(fname, &file_num, &file_type);
+    if (parse_ok && file_type == kDescriptorFile) {
+      manifest_files_.push_back(fname);
+    }
+  }
+  // seek to first manifest
+  std::sort(manifest_files_.begin(), manifest_files_.end(),
+            [](const std::string& lhs, const std::string& rhs) {
+              uint64_t num1 = 0;
+              uint64_t num2 = 0;
+              FileType type1;
+              FileType type2;
+              bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
+              bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
+#ifndef NDEBUG
+              assert(parse_ok1);
+              assert(parse_ok2);
+#else
+              (void)parse_ok1;
+              (void)parse_ok2;
+#endif
+              // Descending order: larger file number (newer MANIFEST) first.
+              return num1 > num2;
+            });
+  manifest_file_iter_ = manifest_files_.begin();
+}
+
+// Returns the full path of the next (older) MANIFEST and advances the
+// iterator. Optionally reports the parsed file number and the bare file
+// name of the returned MANIFEST.
+std::string ManifestPicker::GetNextManifest(uint64_t* number,
+                                            std::string* file_name) {
+  assert(Valid());
+  std::string ret;
+  if (manifest_file_iter_ != manifest_files_.end()) {
+    ret.assign(dbname_);
+    if (ret.back() != kFilePathSeparator) {
+      ret.push_back(kFilePathSeparator);
+    }
+    ret.append(*manifest_file_iter_);
+    if (number) {
+      FileType type;
+      bool parse = ParseFileName(*manifest_file_iter_, number, &type);
+      assert(type == kDescriptorFile);
+#ifndef NDEBUG
+      assert(parse);
+#else
+      (void)parse;
+#endif
+    }
+    if (file_name) {
+      *file_name = *manifest_file_iter_;
+    }
+    ++manifest_file_iter_;
+  }
+  return ret;
+}
+}  // namespace
+
+// Attempts recovery from the newest MANIFEST listed in files_in_dbname.
+// On failure, resets this VersionSet (see Reset()) and falls back to
+// progressively older MANIFEST files until one succeeds or all have been
+// tried; returns the status of the last attempt.
+Status VersionSet::TryRecover(
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    const std::vector<std::string>& files_in_dbname, std::string* db_id,
+    bool* has_missing_table_file) {
+  ManifestPicker manifest_picker(dbname_, files_in_dbname);
+  if (!manifest_picker.Valid()) {
+    return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
+  }
+  Status s;
+  std::string manifest_path =
+      manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
+  while (!manifest_path.empty()) {
+    s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
+                                  db_id, has_missing_table_file);
+    if (s.ok() || !manifest_picker.Valid()) {
+      break;
+    }
+    // Discard any partially-recovered state before trying an older MANIFEST.
+    Reset();
+    manifest_path =
+        manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
+  }
+  return s;
+}
+
+// Recovers from a single MANIFEST file using a point-in-time edit handler
+// (VersionEditHandlerPointInTime). *has_missing_table_file is set to
+// whether the handler observed table files referenced by the MANIFEST that
+// could not be found. Returns the handler's final status.
+Status VersionSet::TryRecoverFromOneManifest(
+    const std::string& manifest_path,
+    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+    std::string* db_id, bool* has_missing_table_file) {
+  ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
+                 manifest_path.c_str());
+  std::unique_ptr<SequentialFileReader> manifest_file_reader;
+  Status s;
+  {
+    std::unique_ptr<FSSequentialFile> manifest_file;
+    s = fs_->NewSequentialFile(manifest_path,
+                               fs_->OptimizeForManifestRead(file_options_),
+                               &manifest_file, nullptr);
+    if (!s.ok()) {
+      return s;
+    }
+    manifest_file_reader.reset(
+        new SequentialFileReader(std::move(manifest_file), manifest_path,
+                                 db_options_->log_readahead_size, io_tracer_));
+  }
+
+  assert(s.ok());
+  VersionSet::LogReporter reporter;
+  reporter.status = &s;
+  log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
+                     /*checksum=*/true, /*log_num=*/0);
+  VersionEditHandlerPointInTime handler_pit(
+      read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
+
+  handler_pit.Iterate(reader, &s);
+
+  handler_pit.GetDbId(db_id);
+
+  assert(nullptr != has_missing_table_file);
+  *has_missing_table_file = handler_pit.HasMissingFiles();
+
+  return handler_pit.status();
+}
+
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
const std::string& dbname,
FileSystem* fs) {
if (!s.ok()) {
return s;
}
- file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
+ file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
+ nullptr /*IOTracer*/));
}
- std::map<uint32_t, std::string> column_family_names;
- // default column family is always implicitly there
- column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
- Slice record;
- std::string scratch;
- while (reader.ReadRecord(&record, &scratch) && s.ok()) {
- VersionEdit edit;
- s = edit.DecodeFrom(record);
- if (!s.ok()) {
- break;
- }
- if (edit.is_column_family_add_) {
- if (column_family_names.find(edit.column_family_) !=
- column_family_names.end()) {
- s = Status::Corruption("Manifest adding the same column family twice");
- break;
- }
- column_family_names.insert(
- {edit.column_family_, edit.column_family_name_});
- } else if (edit.is_column_family_drop_) {
- if (column_family_names.find(edit.column_family_) ==
- column_family_names.end()) {
- s = Status::Corruption(
- "Manifest - dropping non-existing column family");
- break;
- }
- column_family_names.erase(edit.column_family_);
- }
- }
+ ListColumnFamiliesHandler handler;
+ handler.Iterate(reader, &s);
+
+ assert(column_families);
column_families->clear();
- if (s.ok()) {
- for (const auto& iter : column_family_names) {
+ if (handler.status().ok()) {
+ for (const auto& iter : handler.GetColumnFamilyNames()) {
column_families->push_back(iter.second);
}
}
- return s;
+ return handler.status();
}
#ifndef ROCKSDB_LITE
WriteController wc(options->delayed_write_rate);
WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
- /*block_cache_tracer=*/nullptr);
+ nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
}
if (first_nonempty_level > 0) {
- new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
+ auto& new_last_level = new_files_list[new_levels - 1];
+
+ new_last_level = vstorage->LevelFiles(first_nonempty_level);
+
+ for (size_t i = 0; i < new_last_level.size(); ++i) {
+ const FileMetaData* const meta = new_last_level[i];
+ assert(meta);
+
+ const uint64_t file_number = meta->fd.GetNumber();
+
+ vstorage->file_locations_[file_number] =
+ VersionStorageInfo::FileLocation(new_levels - 1, i);
+ }
}
delete[] vstorage -> files_;
// metadata from Manifest to VersionSet before calling this function.
Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
// Clean the previously stored checksum information if any.
+ Status s;
if (checksum_list == nullptr) {
- return Status::InvalidArgument("checksum_list is nullptr");
+ s = Status::InvalidArgument("checksum_list is nullptr");
+ return s;
}
checksum_list->reset();
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& file :
cfd->current()->storage_info()->LevelFiles(level)) {
- checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
- file->file_checksum,
- file->file_checksum_func_name);
+ s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
+ file->file_checksum,
+ file->file_checksum_func_name);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ if (!s.ok()) {
+ break;
}
}
+ if (!s.ok()) {
+ break;
+ }
}
- return Status::OK();
+ return s;
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Status s;
{
std::unique_ptr<FSSequentialFile> file;
- s = options.file_system->NewSequentialFile(
+ const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
+ s = fs->NewSequentialFile(
dscname,
- options.file_system->OptimizeForManifestRead(file_options_), &file,
+ fs->OptimizeForManifestRead(file_options_), &file,
nullptr);
if (!s.ok()) {
return s;
}
file_reader.reset(new SequentialFileReader(
- std::move(file), dscname, db_options_->log_readahead_size));
+ std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
}
- bool have_prev_log_number = false;
- bool have_next_file = false;
- bool have_last_sequence = false;
- uint64_t next_file = 0;
- uint64_t last_sequence = 0;
- uint64_t previous_log_number = 0;
- int count = 0;
- std::unordered_map<uint32_t, std::string> comparators;
- std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
- builders;
-
- // add default column family
- VersionEdit default_cf_edit;
- default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
- default_cf_edit.SetColumnFamily(0);
- ColumnFamilyData* default_cfd =
- CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
- builders.insert(
- std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
- new BaseReferencedVersionBuilder(default_cfd))));
-
+ std::vector<ColumnFamilyDescriptor> column_families(
+ 1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
+ DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
+ json);
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
- Slice record;
- std::string scratch;
- while (reader.ReadRecord(&record, &scratch) && s.ok()) {
- VersionEdit edit;
- s = edit.DecodeFrom(record);
- if (!s.ok()) {
- break;
- }
-
- // Write out each individual edit
- if (verbose && !json) {
- printf("%s\n", edit.DebugString(hex).c_str());
- } else if (json) {
- printf("%s\n", edit.DebugJSON(count, hex).c_str());
- }
- count++;
-
- bool cf_in_builders =
- builders.find(edit.column_family_) != builders.end();
-
- if (edit.has_comparator_) {
- comparators.insert({edit.column_family_, edit.comparator_});
- }
-
- ColumnFamilyData* cfd = nullptr;
-
- if (edit.is_column_family_add_) {
- if (cf_in_builders) {
- s = Status::Corruption(
- "Manifest adding the same column family twice");
- break;
- }
- cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
- cfd->set_initialized();
- builders.insert(std::make_pair(
- edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
- new BaseReferencedVersionBuilder(cfd))));
- } else if (edit.is_column_family_drop_) {
- if (!cf_in_builders) {
- s = Status::Corruption(
- "Manifest - dropping non-existing column family");
- break;
- }
- auto builder_iter = builders.find(edit.column_family_);
- builders.erase(builder_iter);
- comparators.erase(edit.column_family_);
- cfd = column_family_set_->GetColumnFamily(edit.column_family_);
- assert(cfd != nullptr);
- cfd->UnrefAndTryDelete();
- cfd = nullptr;
- } else {
- if (!cf_in_builders) {
- s = Status::Corruption(
- "Manifest record referencing unknown column family");
- break;
- }
-
- cfd = column_family_set_->GetColumnFamily(edit.column_family_);
- // this should never happen since cf_in_builders is true
- assert(cfd != nullptr);
-
- // if it is not column family add or column family drop,
- // then it's a file add/delete, which should be forwarded
- // to builder
- auto builder = builders.find(edit.column_family_);
- assert(builder != builders.end());
- s = builder->second->version_builder()->Apply(&edit);
- if (!s.ok()) {
- break;
- }
- }
-
- if (cfd != nullptr && edit.has_log_number_) {
- cfd->SetLogNumber(edit.log_number_);
- }
-
-
- if (edit.has_prev_log_number_) {
- previous_log_number = edit.prev_log_number_;
- have_prev_log_number = true;
- }
-
- if (edit.has_next_file_number_) {
- next_file = edit.next_file_number_;
- have_next_file = true;
- }
-
- if (edit.has_last_sequence_) {
- last_sequence = edit.last_sequence_;
- have_last_sequence = true;
- }
-
- if (edit.has_max_column_family_) {
- column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
- }
-
- if (edit.has_min_log_number_to_keep_) {
- MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
- }
- }
+ handler.Iterate(reader, &s);
}
- file_reader.reset();
- if (s.ok()) {
- if (!have_next_file) {
- s = Status::Corruption("no meta-nextfile entry in descriptor");
- printf("no meta-nextfile entry in descriptor");
- } else if (!have_last_sequence) {
- printf("no last-sequence-number entry in descriptor");
- s = Status::Corruption("no last-sequence-number entry in descriptor");
- }
-
- if (!have_prev_log_number) {
- previous_log_number = 0;
- }
- }
-
- if (s.ok()) {
- for (auto cfd : *column_family_set_) {
- if (cfd->IsDropped()) {
- continue;
- }
- auto builders_iter = builders.find(cfd->GetID());
- assert(builders_iter != builders.end());
- auto builder = builders_iter->second->version_builder();
-
- Version* v = new Version(cfd, this, file_options_,
- *cfd->GetLatestMutableCFOptions(),
- current_version_number_++);
- builder->SaveTo(v->storage_info());
- v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
-
- printf("--------------- Column family \"%s\" (ID %" PRIu32
- ") --------------\n",
- cfd->GetName().c_str(), cfd->GetID());
- printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
- auto comparator = comparators.find(cfd->GetID());
- if (comparator != comparators.end()) {
- printf("comparator: %s\n", comparator->second.c_str());
- } else {
- printf("comparator: <NO COMPARATOR>\n");
- }
- printf("%s \n", v->DebugString(hex).c_str());
- delete v;
- }
-
- next_file_number_.store(next_file + 1);
- last_allocated_sequence_ = last_sequence;
- last_published_sequence_ = last_sequence;
- last_sequence_ = last_sequence;
- prev_log_number_ = previous_log_number;
-
- printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
- " prev_log_number %" PRIu64 " max_column_family %" PRIu32
- " min_log_number_to_keep "
- "%" PRIu64 "\n",
- next_file_number_.load(), last_sequence, previous_log_number,
- column_family_set_->GetMaxColumnFamily(),
- min_log_number_to_keep_2pc());
- }
-
- return s;
+ return handler.status();
}
#endif // ROCKSDB_LITE
Status VersionSet::WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
- log::Writer* log) {
+ const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// WARNING: This method doesn't hold a mutex!!
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate.
+ assert(io_s.ok());
if (db_options_->write_dbid_to_manifest) {
VersionEdit edit_for_db_id;
assert(!db_id_.empty());
return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true));
}
- Status add_record = log->AddRecord(db_id_record);
- if (!add_record.ok()) {
- return add_record;
+ io_s = log->AddRecord(db_id_record);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ }
+
+ // Save WALs.
+ if (!wal_additions.GetWalAdditions().empty()) {
+ TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
+ const_cast<VersionEdit*>(&wal_additions));
+ std::string record;
+ if (!wal_additions.EncodeTo(&record)) {
+ return Status::Corruption("Unable to Encode VersionEdit: " +
+ wal_additions.DebugString(true));
+ }
+ io_s = log->AddRecord(record);
+ if (!io_s.ok()) {
+ return io_s;
}
}
for (auto cfd : *column_family_set_) {
+ assert(cfd);
+
if (cfd->IsDropped()) {
continue;
}
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
- Status s = log->AddRecord(record);
- if (!s.ok()) {
- return s;
+ io_s = log->AddRecord(record);
+ if (!io_s.ok()) {
+ return io_s;
}
}
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
+ assert(cfd->current());
+ assert(cfd->current()->storage_info());
+
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& f :
cfd->current()->storage_info()->LevelFiles(level)) {
f->file_checksum, f->file_checksum_func_name);
}
}
+
+ const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
+ for (const auto& pair : blob_files) {
+ const uint64_t blob_file_number = pair.first;
+ const auto& meta = pair.second;
+
+ assert(meta);
+ assert(blob_file_number == meta->GetBlobFileNumber());
+
+ edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
+ meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
+ meta->GetChecksumValue());
+ if (meta->GetGarbageBlobCount() > 0) {
+ edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
+ meta->GetGarbageBlobBytes());
+ }
+ }
+
const auto iter = curr_state.find(cfd->GetID());
assert(iter != curr_state.end());
uint64_t log_number = iter->second.log_number;
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
- Status s = log->AddRecord(record);
- if (!s.ok()) {
- return s;
+ io_s = log->AddRecord(record);
+ if (!io_s.ok()) {
+ return io_s;
}
}
}
static_cast<uint64_t>(total_full_size * margin)) {
total_full_size += total_intersecting_size / 2;
} else {
- // Estimate for all the first files, at each level
+ // Estimate for all the first files (might also be last files), at each
+ // level
for (const auto file_ptr : first_files) {
total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
}
v->GetMutableCFOptions().prefix_extractor.get());
}
-void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
+void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
+ std::vector<uint64_t>* live_blob_files) const {
+ assert(live_table_files);
+ assert(live_blob_files);
+
// pre-calculate space requirement
- int64_t total_files = 0;
+ size_t total_table_files = 0;
+ size_t total_blob_files = 0;
+
+ assert(column_family_set_);
for (auto cfd : *column_family_set_) {
+ assert(cfd);
+
if (!cfd->initialized()) {
continue;
}
- Version* dummy_versions = cfd->dummy_versions();
+
+ Version* const dummy_versions = cfd->dummy_versions();
+ assert(dummy_versions);
+
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
+ assert(v);
+
const auto* vstorage = v->storage_info();
- for (int level = 0; level < vstorage->num_levels(); level++) {
- total_files += vstorage->LevelFiles(level).size();
+ assert(vstorage);
+
+ for (int level = 0; level < vstorage->num_levels(); ++level) {
+ total_table_files += vstorage->LevelFiles(level).size();
}
+
+ total_blob_files += vstorage->GetBlobFiles().size();
}
}
// just one time extension to the right size
- live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
+ live_table_files->reserve(live_table_files->size() + total_table_files);
+ live_blob_files->reserve(live_blob_files->size() + total_blob_files);
+ assert(column_family_set_);
for (auto cfd : *column_family_set_) {
+ assert(cfd);
if (!cfd->initialized()) {
continue;
}
+
auto* current = cfd->current();
bool found_current = false;
- Version* dummy_versions = cfd->dummy_versions();
+
+ Version* const dummy_versions = cfd->dummy_versions();
+ assert(dummy_versions);
+
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
- v->AddLiveFiles(live_list);
+ v->AddLiveFiles(live_table_files, live_blob_files);
if (v == current) {
found_current = true;
}
}
+
if (!found_current && current != nullptr) {
// Should never happen unless it is a bug.
assert(false);
- current->AddLiveFiles(live_list);
+ current->AddLiveFiles(live_table_files, live_blob_files);
}
}
}
InternalIterator* VersionSet::MakeInputIterator(
- const Compaction* c, RangeDelAggregator* range_del_agg,
+ const ReadOptions& read_options, const Compaction* c,
+ RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions) {
auto cfd = c->column_family_data();
- ReadOptions read_options;
- read_options.verify_checksums = true;
- read_options.fill_cache = false;
- // Compaction iterators shouldn't be confined to a single prefix.
- // Compactions use Seek() for
- // (a) concurrent compactions,
- // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
- read_options.total_order_seek = true;
-
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.
// TODO(opt): use concatenating iterator for level-0 if there is no overlap
for (size_t i = 0; i < flevel->num_files; i++) {
list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions,
- cfd->internal_comparator(),
- *flevel->files[i].file_metadata, range_del_agg,
- c->mutable_cf_options()->prefix_extractor.get(),
+ cfd->internal_comparator(), *flevel->files[i].file_metadata,
+ range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
/*table_reader_ptr=*/nullptr,
/*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
/*arena=*/nullptr,
- /*skip_filters=*/false, /*level=*/static_cast<int>(which),
+ /*skip_filters=*/false,
+ /*level=*/static_cast<int>(c->level(which)),
+ MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr);
+ /*largest_compaction_key=*/nullptr,
+ /*allow_unprepared_value=*/false);
}
} else {
// Create concatenating iterator for the files from this level
/*should_sample=*/false,
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
- /*level=*/static_cast<int>(which), range_del_agg,
+ /*level=*/static_cast<int>(c->level(which)), range_del_agg,
c->boundaries(which));
}
}
}
void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
+ std::vector<ObsoleteBlobFileInfo>* blob_files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output) {
+ assert(files);
+ assert(blob_files);
+ assert(manifest_filenames);
+ assert(files->empty());
+ assert(blob_files->empty());
assert(manifest_filenames->empty());
- obsolete_manifests_.swap(*manifest_filenames);
+
std::vector<ObsoleteFileInfo> pending_files;
for (auto& f : obsolete_files_) {
if (f.metadata->fd.GetNumber() < min_pending_output) {
- files->push_back(std::move(f));
+ files->emplace_back(std::move(f));
} else {
- pending_files.push_back(std::move(f));
+ pending_files.emplace_back(std::move(f));
}
}
obsolete_files_.swap(pending_files);
+
+ std::vector<ObsoleteBlobFileInfo> pending_blob_files;
+ for (auto& blob_file : obsolete_blob_files_) {
+ if (blob_file.GetBlobFileNumber() < min_pending_output) {
+ blob_files->emplace_back(std::move(blob_file));
+ } else {
+ pending_blob_files.emplace_back(std::move(blob_file));
+ }
+ }
+ obsolete_blob_files_.swap(pending_blob_files);
+
+ obsolete_manifests_.swap(*manifest_filenames);
}
ColumnFamilyData* VersionSet::CreateColumnFamily(
- const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
+ const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
assert(edit->is_column_family_add_);
MutableCFOptions dummy_cf_options;
Version* dummy_versions =
- new Version(nullptr, this, file_options_, dummy_cf_options);
+ new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
// Ref() dummy version once so that later we can call Unref() to delete it
// by avoiding calling "delete" explicitly (~Version is private)
dummy_versions->Ref();
cf_options);
Version* v = new Version(new_cfd, this, file_options_,
- *new_cfd->GetLatestMutableCFOptions(),
+ *new_cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
// Fill level target base information.
return total_files_size;
}
-ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
- const ImmutableDBOptions* _db_options,
- const FileOptions& _file_options,
- Cache* table_cache,
- WriteBufferManager* write_buffer_manager,
- WriteController* write_controller)
+Status VersionSet::VerifyFileMetadata(const std::string& fpath,
+ const FileMetaData& meta) const {
+ uint64_t fsize = 0;
+ Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
+ if (status.ok()) {
+ if (fsize != meta.fd.GetFileSize()) {
+ status = Status::Corruption("File size mismatch: " + fpath);
+ }
+ }
+ return status;
+}
+
+ReactiveVersionSet::ReactiveVersionSet(
+ const std::string& dbname, const ImmutableDBOptions* _db_options,
+ const FileOptions& _file_options, Cache* table_cache,
+ WriteBufferManager* write_buffer_manager, WriteController* write_controller,
+ const std::shared_ptr<IOTracer>& io_tracer)
: VersionSet(dbname, _db_options, _file_options, table_cache,
write_buffer_manager, write_controller,
- /*block_cache_tracer=*/nullptr),
+ /*block_cache_tracer=*/nullptr, io_tracer),
number_of_edits_to_skip_(0) {}
ReactiveVersionSet::~ReactiveVersionSet() {}
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
- std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
- builders;
+ VersionBuilderMap builders;
std::unordered_map<int, std::string> column_families_not_found;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
- static_cast<LogReporter*>(manifest_reporter->get())->status =
+ static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get();
VersionEdit version_edit;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
- Slice record;
- std::string scratch;
- s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
- column_families_not_found, builders, &version_edit);
+ s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
+ column_families_not_found, builders,
+ manifest_reader_status->get(), &version_edit);
if (s.ok()) {
bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ &&
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
if (!s.ok()) {
enough = false;
if (s.IsPathNotFound()) {
auto* builder = builders_iter->second->version_builder();
Version* v = new Version(cfd, this, file_options_,
- *cfd->GetLatestMutableCFOptions(),
+ *cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
- builder->SaveTo(v->storage_info());
+ s = builder->SaveTo(v->storage_info());
- // Install recovered version
- v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
- !(db_options_->skip_stats_update_on_db_open));
- AppendVersion(cfd, v);
+ if (s.ok()) {
+ // Install recovered version
+ v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
+ !(db_options_->skip_stats_update_on_db_open));
+ AppendVersion(cfd, v);
+ } else {
+ ROCKS_LOG_ERROR(db_options_->info_log,
+ "[%s]: inconsistent version: %s\n",
+ cfd->GetName().c_str(), s.ToString().c_str());
+ delete v;
+ break;
+ }
}
+ }
+ if (s.ok()) {
next_file_number_.store(version_edit.next_file_number_ + 1);
last_allocated_sequence_ = version_edit.last_sequence_;
last_published_sequence_ = version_edit.last_sequence_;
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
if (s.ok()) {
applied_edits++;
+ } else {
+ break;
}
}
}
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
+ // Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
// 2) we have finished reading the current MANIFEST.
// 3) we have encountered an IOError reading the current MANIFEST.
// We need to look for the next MANIFEST and start from there. If we cannot
// find the next MANIFEST, we should exit the loop.
- s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
+ Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
- if (s.ok()) {
+ if (tmp_s.ok()) {
if (reader->file()->file_name() == old_manifest_path) {
// Still processing the same MANIFEST, thus no need to continue this
// loop since no record is available if we have reached here.
number_of_edits_to_skip_ += 2;
}
}
+ s = tmp_s;
}
}
}
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+ MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
TEST_SYNC_POINT_CALLBACK(
"ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
"AfterLoadTableHandlers",
if (s.ok()) {
auto version = new Version(cfd, this, file_options_,
- *cfd->GetLatestMutableCFOptions(),
+ *cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
- builder->SaveTo(version->storage_info());
- version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
- AppendVersion(cfd, version);
- active_version_builders_.erase(builder_iter);
- if (cfds_changed->count(cfd) == 0) {
- cfds_changed->insert(cfd);
+ s = builder->SaveTo(version->storage_info());
+ if (s.ok()) {
+ version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
+ AppendVersion(cfd, version);
+ active_version_builders_.erase(builder_iter);
+ if (cfds_changed->count(cfd) == 0) {
+ cfds_changed->insert(cfd);
+ }
+ } else {
+ delete version;
}
} else if (s.IsPathNotFound()) {
s = Status::OK();
// Some other error has occurred during LoadTableHandlers.
}
- if (version_edit->HasNextFile()) {
- next_file_number_.store(version_edit->next_file_number_ + 1);
- }
- if (version_edit->has_last_sequence_) {
- last_allocated_sequence_ = version_edit->last_sequence_;
- last_published_sequence_ = version_edit->last_sequence_;
- last_sequence_ = version_edit->last_sequence_;
- }
- if (version_edit->has_prev_log_number_) {
- prev_log_number_ = version_edit->prev_log_number_;
- MarkFileNumberUsed(version_edit->prev_log_number_);
- }
- if (version_edit->has_log_number_) {
- MarkFileNumberUsed(version_edit->log_number_);
+ if (s.ok()) {
+ if (version_edit->HasNextFile()) {
+ next_file_number_.store(version_edit->next_file_number_ + 1);
+ }
+ if (version_edit->has_last_sequence_) {
+ last_allocated_sequence_ = version_edit->last_sequence_;
+ last_published_sequence_ = version_edit->last_sequence_;
+ last_sequence_ = version_edit->last_sequence_;
+ }
+ if (version_edit->has_prev_log_number_) {
+ prev_log_number_ = version_edit->prev_log_number_;
+ MarkFileNumberUsed(version_edit->prev_log_number_);
+ }
+ if (version_edit->has_log_number_) {
+ MarkFileNumberUsed(version_edit->log_number_);
+ }
+ column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
+ MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
}
- column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
- MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
return s;
}
Status s;
do {
std::string manifest_path;
- s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+ s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
&manifest_file_number_);
std::unique_ptr<FSSequentialFile> manifest_file;
if (s.ok()) {
}
std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) {
- manifest_file_reader.reset(
- new SequentialFileReader(std::move(manifest_file), manifest_path,
- db_options_->log_readahead_size));
+ manifest_file_reader.reset(new SequentialFileReader(
+ std::move(manifest_file), manifest_path,
+ db_options_->log_readahead_size, io_tracer_));
manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter,
true /* checksum */, 0 /* log_number */));