X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Fdb%2Fversion_set.cc;h=cffc5979d5283849be09ef649625cba231057585;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=e913a97dd8cca0a49a0f2de0e469a402c9fcc0df;hpb=a71831dadd1e1f3e0fa70405511f65cc33db0498;p=ceph.git diff --git a/ceph/src/rocksdb/db/version_set.cc b/ceph/src/rocksdb/db/version_set.cc index e913a97dd..cffc5979d 100644 --- a/ceph/src/rocksdb/db/version_set.cc +++ b/ceph/src/rocksdb/db/version_set.cc @@ -10,6 +10,7 @@ #include "db/version_set.h" #include + #include #include #include @@ -19,7 +20,11 @@ #include #include #include + #include "compaction/compaction.h" +#include "db/blob/blob_file_cache.h" +#include "db/blob/blob_file_reader.h" +#include "db/blob/blob_index.h" #include "db/internal_stats.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -29,6 +34,7 @@ #include "db/pinned_iterators_manager.h" #include "db/table_cache.h" #include "db/version_builder.h" +#include "db/version_edit_handler.h" #include "file/filename.h" #include "file/random_access_file_reader.h" #include "file/read_write_util.h" @@ -49,6 +55,7 @@ #include "table/table_reader.h" #include "table/two_level_iterator.h" #include "test_util/sync_point.h" +#include "util/cast_util.h" #include "util/coding.h" #include "util/stop_watch.h" #include "util/string_util.h" @@ -88,9 +95,9 @@ Status OverlapWithIterator(const Comparator* ucmp, *overlap = false; if (iter->Valid()) { ParsedInternalKey seek_result; - if (!ParseInternalKey(iter->key(), &seek_result)) { - return Status::Corruption("DB have corrupted keys"); - } + Status s = ParseInternalKey(iter->key(), &seek_result, + false /* log_err_key */); // TODO + if (!s.ok()) return s; if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <= 0) { @@ -364,6 +371,7 @@ class FilePickerMultiGet { range_(range), batch_iter_(range->begin()), batch_iter_prev_(range->begin()), + upper_key_(range->begin()), maybe_repeat_key_(false), current_level_range_(*range, range->begin(), range->end()), current_file_range_(*range, range->begin(), range->end()), @@ -432,7 +440,7 @@ class FilePickerMultiGet { !file_hit)) { struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()]; f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level]; - Slice& user_key = batch_iter_->ukey; + Slice& user_key = batch_iter_->ukey_without_ts; // Do key range filtering of files or/and fractional cascading if: // (1) not all the files are in level 0, or @@ -446,17 +454,17 @@ class FilePickerMultiGet { // Check if key is within a file's range. If search left bound and // right bound point to the same find, we are sure key falls in // range. + int cmp_smallest = user_comparator_->CompareWithoutTimestamp( + user_key, false, ExtractUserKey(f->smallest_key), true); + assert(curr_level_ == 0 || fp_ctx.curr_index_in_curr_level == fp_ctx.start_index_in_curr_level || - user_comparator_->Compare(user_key, - ExtractUserKey(f->smallest_key)) <= 0); + cmp_smallest <= 0); - int cmp_smallest = user_comparator_->Compare( - user_key, ExtractUserKey(f->smallest_key)); if (cmp_smallest >= 0) { - cmp_largest = user_comparator_->Compare( - user_key, ExtractUserKey(f->largest_key)); + cmp_largest = user_comparator_->CompareWithoutTimestamp( + user_key, false, ExtractUserKey(f->largest_key), true); } else { cmp_largest = -1; } @@ -480,9 +488,20 @@ class FilePickerMultiGet { } if (cmp_largest == 0) { // cmp_largest is 0, which means the next key will not be in this - // file, so stop looking further. 
Also don't increment megt_iter_ - // as we may have to look for this key in the next file if we don't - // find it in this one + // file, so stop looking further. However, its possible there are + // duplicates in the batch, so find the upper bound for the batch + // in this file (upper_key_) by skipping past the duplicates. We + // leave batch_iter_ as is since we may have to pick up from there + // for the next file, if this file has a merge value rather than + // final value + upper_key_ = batch_iter_; + ++upper_key_; + while (upper_key_ != current_level_range_.end() && + user_comparator_->CompareWithoutTimestamp( + batch_iter_->ukey_without_ts, false, + upper_key_->ukey_without_ts, false) == 0) { + ++upper_key_; + } break; } else { if (curr_level_ == 0) { @@ -502,6 +521,12 @@ class FilePickerMultiGet { *fd = f; *file_index = curr_file_index; *is_last_key_in_file = cmp_largest == 0; + if (!*is_last_key_in_file) { + // If the largest key in the batch overlapping the file is not the + // largest key in the file, upper_ley_ would not have been updated so + // update it here + upper_key_ = batch_iter_; + } return file_hit; } @@ -523,7 +548,7 @@ class FilePickerMultiGet { // file regardless for all keys not found yet if (current_level_range_.CheckKeyDone(batch_iter_) || curr_level_ == 0) { - ++batch_iter_; + batch_iter_ = upper_key_; } } // batch_iter_prev_ will become the start key for the next file @@ -543,18 +568,20 @@ class FilePickerMultiGet { &is_last_key_in_file)) { search_ended_ = !PrepareNextLevel(); } else { - MultiGetRange::Iterator upper_key = batch_iter_; if (is_last_key_in_file) { // Since cmp_largest is 0, batch_iter_ still points to the last key // that falls in this file, instead of the next one. Increment - // upper_key so we can set the range properly for SST MultiGet - ++upper_key; - ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level); + // the file index for all keys between batch_iter_ and upper_key_ + auto tmp_iter = batch_iter_; + while (tmp_iter != upper_key_) { + ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level); + ++tmp_iter; + } maybe_repeat_key_ = true; } // Set the range for this file current_file_range_ = - MultiGetRange(next_file_range, batch_iter_prev_, upper_key); + MultiGetRange(next_file_range, batch_iter_prev_, upper_key_); returned_file_level_ = curr_level_; hit_file_level_ = curr_level_; is_hit_file_last_in_level_ = @@ -606,6 +633,7 @@ class FilePickerMultiGet { // key found in the previous SST file, in order to serve as the start of // the batch key range for the next SST file MultiGetRange::Iterator batch_iter_prev_; + MultiGetRange::Iterator upper_key_; bool maybe_repeat_key_; MultiGetRange current_level_range_; MultiGetRange current_file_range_; @@ -625,7 +653,7 @@ class FilePickerMultiGet { if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level < curr_file_level_->num_files) { batch_iter_prev_ = current_level_range_.begin(); - batch_iter_ = current_level_range_.begin(); + upper_key_ = batch_iter_ = current_level_range_.begin(); return true; } } @@ -720,7 +748,7 @@ class FilePickerMultiGet { } if (level_contains_keys) { batch_iter_prev_ = current_level_range_.begin(); - batch_iter_ = current_level_range_.begin(); + upper_key_ = batch_iter_ = current_level_range_.begin(); return true; } curr_level_++; @@ -852,6 +880,7 @@ namespace { class LevelIterator final : public InternalIterator { public: + // @param read_options Must outlive this iterator. 
LevelIterator(TableCache* table_cache, const ReadOptions& read_options, const FileOptions& file_options, const InternalKeyComparator& icomparator, @@ -860,7 +889,8 @@ class LevelIterator final : public InternalIterator { HistogramImpl* file_read_hist, TableReaderCaller caller, bool skip_filters, int level, RangeDelAggregator* range_del_agg, const std::vector* - compaction_boundaries = nullptr) + compaction_boundaries = nullptr, + bool allow_unprepared_value = false) : table_cache_(table_cache), read_options_(read_options), file_options_(file_options), @@ -872,6 +902,7 @@ class LevelIterator final : public InternalIterator { should_sample_(should_sample), caller_(caller), skip_filters_(skip_filters), + allow_unprepared_value_(allow_unprepared_value), file_index_(flevel_->num_files), level_(level), range_del_agg_(range_del_agg), @@ -906,14 +937,21 @@ class LevelIterator final : public InternalIterator { return file_iter_.iter() ? file_iter_.status() : Status::OK(); } + bool PrepareValue() override { + return file_iter_.PrepareValue(); + } + inline bool MayBeOutOfLowerBound() override { assert(Valid()); return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound(); } - inline bool MayBeOutOfUpperBound() override { - assert(Valid()); - return file_iter_.MayBeOutOfUpperBound(); + inline IterBoundCheck UpperBoundCheckResult() override { + if (Valid()) { + return file_iter_.UpperBoundCheckResult(); + } else { + return IterBoundCheck::kUnknown; + } } void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { @@ -940,13 +978,6 @@ class LevelIterator final : public InternalIterator { void SetFileIterator(InternalIterator* iter); void InitFileIterator(size_t new_file_index); - // Called by both of Next() and NextAndGetResult(). Force inline. - void NextImpl() { - assert(Valid()); - file_iter_.Next(); - SkipEmptyFileForward(); - } - const Slice& file_smallest_key(size_t file_index) { assert(file_index < flevel_->num_files); return flevel_->files[file_index].smallest_key; @@ -955,8 +986,8 @@ class LevelIterator final : public InternalIterator { bool KeyReachedUpperBound(const Slice& internal_key) { return read_options_.iterate_upper_bound != nullptr && user_comparator_.CompareWithoutTimestamp( - ExtractUserKey(internal_key), - *read_options_.iterate_upper_bound) >= 0; + ExtractUserKey(internal_key), /*a_has_ts=*/true, + *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0; } InternalIterator* NewFileIterator() { @@ -977,8 +1008,9 @@ class LevelIterator final : public InternalIterator { read_options_, file_options_, icomparator_, *file_meta.file_metadata, range_del_agg_, prefix_extractor_, nullptr /* don't need reference to table */, file_read_hist_, caller_, - /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key, - largest_compaction_key); + /*arena=*/nullptr, skip_filters_, level_, + /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key, + largest_compaction_key, allow_unprepared_value_); } // Check if current file being fully within iterate_lower_bound. 
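The LevelIterator hunks above thread a new allow_unprepared_value flag through iterator construction and add a PrepareValue() override: when the flag is set, a key can be returned before its value is materialized, and the caller asks for the value explicitly only if it actually needs it. Below is a minimal standalone sketch of that calling contract; KeyValueIterator and TotalValueBytesForPrefix are hypothetical stand-ins for illustration, not RocksDB's internal classes.

#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical iterator with lazily prepared values, mirroring the
// allow_unprepared_value / PrepareValue() contract sketched in the diff above.
class KeyValueIterator {
 public:
  explicit KeyValueIterator(std::vector<std::pair<std::string, std::string>> kvs)
      : kvs_(std::move(kvs)) {}
  bool Valid() const { return idx_ < kvs_.size(); }
  void Next() {
    ++idx_;
    value_prepared_ = false;  // the next key's value has not been loaded yet
  }
  const std::string& key() const { return kvs_[idx_].first; }
  // Loads the value on demand; a real implementation could fail here
  // (e.g. a blob file read error), hence the bool return.
  bool PrepareValue() {
    value_prepared_ = true;
    return true;
  }
  const std::string& value() const {
    assert(value_prepared_);  // caller must call PrepareValue() first
    return kvs_[idx_].second;
  }

 private:
  std::vector<std::pair<std::string, std::string>> kvs_;
  size_t idx_ = 0;
  bool value_prepared_ = false;
};

// Scan that only materializes values for keys with a given prefix; keys that
// fail the prefix check never trigger a value read.
size_t TotalValueBytesForPrefix(KeyValueIterator& it, const std::string& prefix) {
  size_t bytes = 0;
  for (; it.Valid(); it.Next()) {
    if (it.key().compare(0, prefix.size(), prefix) != 0) {
      continue;  // key-only check, value stays unprepared
    }
    if (!it.PrepareValue()) {
      break;  // treat a failed value load as end of scan in this sketch
    }
    bytes += it.value().size();
  }
  return bytes;
}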
@@ -989,14 +1021,14 @@ class LevelIterator final : public InternalIterator { if (read_options_.iterate_lower_bound != nullptr && file_index_ < flevel_->num_files) { may_be_out_of_lower_bound_ = - user_comparator_.Compare( - ExtractUserKey(file_smallest_key(file_index_)), - *read_options_.iterate_lower_bound) < 0; + user_comparator_.CompareWithoutTimestamp( + ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true, + *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0; } } TableCache* table_cache_; - const ReadOptions read_options_; + const ReadOptions& read_options_; const FileOptions& file_options_; const InternalKeyComparator& icomparator_; const UserComparatorWrapper user_comparator_; @@ -1011,6 +1043,7 @@ class LevelIterator final : public InternalIterator { bool should_sample_; TableReaderCaller caller_; bool skip_filters_; + bool allow_unprepared_value_; bool may_be_out_of_lower_bound_ = true; size_t file_index_; int level_; @@ -1063,13 +1096,17 @@ void LevelIterator::Seek(const Slice& target) { // next key after the prefix, or make the iterator invalid. // A side benefit will be that it invalidates the iterator earlier so that // the upper level merging iterator can merge fewer child iterators. - Slice target_user_key = ExtractUserKey(target); - Slice file_user_key = ExtractUserKey(file_iter_.key()); - if (prefix_extractor_->InDomain(target_user_key) && - (!prefix_extractor_->InDomain(file_user_key) || - user_comparator_.Compare( - prefix_extractor_->Transform(target_user_key), - prefix_extractor_->Transform(file_user_key)) != 0)) { + size_t ts_sz = user_comparator_.timestamp_size(); + Slice target_user_key_without_ts = + ExtractUserKeyAndStripTimestamp(target, ts_sz); + Slice file_user_key_without_ts = + ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz); + if (prefix_extractor_->InDomain(target_user_key_without_ts) && + (!prefix_extractor_->InDomain(file_user_key_without_ts) || + user_comparator_.CompareWithoutTimestamp( + prefix_extractor_->Transform(target_user_key_without_ts), false, + prefix_extractor_->Transform(file_user_key_without_ts), + false) != 0)) { SetFileIterator(nullptr); } } @@ -1108,14 +1145,26 @@ void LevelIterator::SeekToLast() { CheckMayBeOutOfLowerBound(); } -void LevelIterator::Next() { NextImpl(); } +void LevelIterator::Next() { + assert(Valid()); + file_iter_.Next(); + SkipEmptyFileForward(); +} bool LevelIterator::NextAndGetResult(IterateResult* result) { - NextImpl(); - bool is_valid = Valid(); - if (is_valid) { - result->key = key(); - result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + assert(Valid()); + bool is_valid = file_iter_.NextAndGetResult(result); + if (!is_valid) { + SkipEmptyFileForward(); + is_valid = Valid(); + if (is_valid) { + result->key = key(); + result->bound_check_result = file_iter_.UpperBoundCheckResult(); + // Ideally, we should return the real file_iter_.value_prepared but the + // information is not here. It would casue an extra PrepareValue() + // for the first key of a file. 
+ result->value_prepared = !allow_unprepared_value_; + } } return is_valid; } @@ -1130,7 +1179,8 @@ bool LevelIterator::SkipEmptyFileForward() { bool seen_empty_file = false; while (file_iter_.iter() == nullptr || (!file_iter_.Valid() && file_iter_.status().ok() && - !file_iter_.iter()->IsOutOfBound())) { + file_iter_.iter()->UpperBoundCheckResult() != + IterBoundCheck::kOutOfBound)) { seen_empty_file = true; // Move to next file if (file_index_ >= flevel_->num_files - 1) { @@ -1202,28 +1252,6 @@ void LevelIterator::InitFileIterator(size_t new_file_index) { } } // anonymous namespace -// A wrapper of version builder which references the current version in -// constructor and unref it in the destructor. -// Both of the constructor and destructor need to be called inside DB Mutex. -class BaseReferencedVersionBuilder { - public: - explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd) - : version_builder_(new VersionBuilder( - cfd->current()->version_set()->file_options(), cfd->table_cache(), - cfd->current()->storage_info(), cfd->ioptions()->info_log)), - version_(cfd->current()) { - version_->Ref(); - } - ~BaseReferencedVersionBuilder() { - version_->Unref(); - } - VersionBuilder* version_builder() { return version_builder_.get(); } - - private: - std::unique_ptr version_builder_; - Version* version_; -}; - Status Version::GetTableProperties(std::shared_ptr* tp, const FileMetaData* file_meta, const std::string* fname) const { @@ -1264,8 +1292,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, // pass the magic number check in the footer. std::unique_ptr file_reader( new RandomAccessFileReader( - std::move(file), file_name, nullptr /* env */, nullptr /* stats */, - 0 /* hist_type */, nullptr /* file_read_hist */, + std::move(file), file_name, nullptr /* env */, io_tracer_, + nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */, nullptr /* rate_limiter */, ioptions->listeners)); s = ReadTableProperties( file_reader.get(), file_meta->fd.GetFileSize(), @@ -1554,12 +1582,13 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel( void Version::AddIterators(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merge_iter_builder, - RangeDelAggregator* range_del_agg) { + RangeDelAggregator* range_del_agg, + bool allow_unprepared_value) { assert(storage_info_.finalized_); for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level, - range_del_agg); + range_del_agg, allow_unprepared_value); } } @@ -1567,7 +1596,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merge_iter_builder, int level, - RangeDelAggregator* range_del_agg) { + RangeDelAggregator* range_del_agg, + bool allow_unprepared_value) { assert(storage_info_.finalized_); if (level >= storage_info_.num_non_empty_levels()) { // This is an empty level @@ -1590,9 +1620,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, mutable_cf_options_.prefix_extractor.get(), nullptr, cfd_->internal_stats()->GetFileReadHist(0), TableReaderCaller::kUserIterator, arena, - /*skip_filters=*/false, /*level=*/0, + /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr)); + /*largest_compaction_key=*/nullptr, allow_unprepared_value)); } if (should_sample) { // Count ones for every L0 files. 
This is done per iterator creation @@ -1614,7 +1644,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(), cfd_->internal_stats()->GetFileReadHist(level), TableReaderCaller::kUserIterator, IsFilterSkipped(level), level, - range_del_agg, /*largest_compaction_key=*/nullptr)); + range_del_agg, + /*compaction_boundaries=*/nullptr, allow_unprepared_value)); } } @@ -1648,9 +1679,10 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, mutable_cf_options_.prefix_extractor.get(), nullptr, cfd_->internal_stats()->GetFileReadHist(0), TableReaderCaller::kUserIterator, &arena, - /*skip_filters=*/false, /*level=*/0, + /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr)); + /*largest_compaction_key=*/nullptr, + /*allow_unprepared_value=*/false)); status = OverlapWithIterator( ucmp, smallest_user_key, largest_user_key, iter.get(), overlap); if (!status.ok() || *overlap) { @@ -1726,6 +1758,7 @@ VersionStorageInfo::VersionStorageInfo( Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, const FileOptions& file_opt, const MutableCFOptions mutable_cf_options, + const std::shared_ptr& io_tracer, uint64_t version_number) : env_(vset->env_), cfd_(column_family_data), @@ -1733,6 +1766,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->statistics), table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()), + blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr), merge_operator_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator), storage_info_( @@ -1751,10 +1785,62 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, refs_(0), file_options_(file_opt), mutable_cf_options_(mutable_cf_options), - version_number_(version_number) {} + max_file_size_for_l0_meta_pin_( + MaxFileSizeForL0MetaPin(mutable_cf_options_)), + version_number_(version_number), + io_tracer_(io_tracer) {} + +Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, + PinnableSlice* value) const { + assert(value); + + if (read_options.read_tier == kBlockCacheTier) { + return Status::Incomplete("Cannot read blob: no disk I/O allowed"); + } + + BlobIndex blob_index; + + { + Status s = blob_index.DecodeFrom(*value); + if (!s.ok()) { + return s; + } + } + + if (blob_index.HasTTL() || blob_index.IsInlined()) { + return Status::Corruption("Unexpected TTL/inlined blob index"); + } + + const auto& blob_files = storage_info_.GetBlobFiles(); + + const uint64_t blob_file_number = blob_index.file_number(); + + const auto it = blob_files.find(blob_file_number); + if (it == blob_files.end()) { + return Status::Corruption("Invalid blob file number"); + } + + CacheHandleGuard blob_file_reader; + + { + assert(blob_file_cache_); + const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number, + &blob_file_reader); + if (!s.ok()) { + return s; + } + } + + assert(blob_file_reader.GetValue()); + const Status s = blob_file_reader.GetValue()->GetBlob( + read_options, user_key, blob_index.offset(), blob_index.size(), + blob_index.compression(), value); + + return s; +} void Version::Get(const ReadOptions& read_options, const LookupKey& k, - PinnableSlice* value, Status* status, + PinnableSlice* value, std::string* timestamp, Status* status, MergeContext* merge_context, SequenceNumber* 
max_covering_tombstone_seq, bool* value_found, bool* key_exists, SequenceNumber* seq, ReadCallback* callback, @@ -1775,12 +1861,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, vset_->block_cache_tracer_->is_tracing_enabled()) { tracing_get_id = vset_->block_cache_tracer_->NextGetId(); } + + // Note: the old StackableDB-based BlobDB passes in + // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we + // need to provide it here. + bool is_blob_index = false; + bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index; + GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - do_merge ? value : nullptr, value_found, merge_context, do_merge, - max_covering_tombstone_seq, this->env_, seq, - merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, + do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found, + merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq, + merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use, tracing_get_id); // Pin blocks that we read to hold merge operands @@ -1814,11 +1907,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), IsFilterSkipped(static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()), - fp.GetCurrentLevel()); + fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_); // TODO: examine the behavior for corrupted key if (timer_enabled) { PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), - fp.GetCurrentLevel()); + fp.GetHitFileLevel()); } if (!status->ok()) { return; @@ -1838,6 +1931,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, // TODO: update per-level perfcontext user_key_return_count for kMerge break; case GetContext::kFound: + if (is_blob_index) { + if (do_merge && value) { + *status = GetBlob(read_options, user_key, value); + if (!status->ok()) { + if (status->IsIncomplete()) { + get_context.MarkKeyMayExist(); + } + return; + } + } + } + if (fp.GetHitFileLevel() == 0) { RecordTick(db_statistics_, GET_HIT_L0); } else if (fp.GetHitFileLevel() == 1) { @@ -1855,7 +1960,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, case GetContext::kCorrupt: *status = Status::Corruption("corrupted key for ", user_key); return; - case GetContext::kBlobIndex: + case GetContext::kUnexpectedBlobIndex: ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); *status = Status::NotSupported( "Encounter unexpected blob index. Please open DB with " @@ -1917,11 +2022,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, assert(iter->s->ok() || iter->s->IsMergeInProgress()); get_ctx.emplace_back( user_comparator(), merge_operator_, info_log_, db_statistics_, - iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, - iter->value, nullptr, &(iter->merge_context), true, - &iter->max_covering_tombstone_seq, this->env_, nullptr, - merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, - tracing_mget_id); + iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, + iter->ukey_with_ts, iter->value, iter->timestamp, nullptr, + &(iter->merge_context), true, &iter->max_covering_tombstone_seq, + this->env_, nullptr, merge_operator_ ? 
&pinned_iters_mgr : nullptr, + callback, is_blob, tracing_mget_id); // MergeInProgress status, if set, has been transferred to the get_context // state, so we set status to ok here. From now on, the iter status will // be used for IO errors, and get_context state will be used for any @@ -1940,6 +2045,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, user_comparator(), internal_comparator()); FdWithKeyRange* f = fp.GetNextFile(); + Status s; + uint64_t num_index_read = 0; + uint64_t num_filter_read = 0; + uint64_t num_data_read = 0; + uint64_t num_sst_read = 0; while (f != nullptr) { MultiGetRange file_range = fp.CurrentFileRange(); @@ -1947,17 +2057,17 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && get_perf_context()->per_level_perf_context_enabled; StopWatchNano timer(env_, timer_enabled /* auto_start */); - Status s = table_cache_->MultiGet( + s = table_cache_->MultiGet( read_options, *internal_comparator(), *f->file_metadata, &file_range, mutable_cf_options_.prefix_extractor.get(), cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), IsFilterSkipped(static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()), - fp.GetCurrentLevel()); + fp.GetHitFileLevel()); // TODO: examine the behavior for corrupted key if (timer_enabled) { PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), - fp.GetCurrentLevel()); + fp.GetHitFileLevel()); } if (!s.ok()) { // TODO: Set status for individual keys appropriately @@ -1968,7 +2078,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, return; } uint64_t batch_size = 0; - for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) { + for (auto iter = file_range.begin(); s.ok() && iter != file_range.end(); + ++iter) { GetContext& get_context = *iter->get_context; Status* status = iter->s; // The Status in the KeyContext takes precedence over GetContext state @@ -1985,6 +2096,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, sample_file_read_inc(f->file_metadata); } batch_size++; + num_index_read += get_context.get_context_stats_.num_index_read; + num_filter_read += get_context.get_context_stats_.num_filter_read; + num_data_read += get_context.get_context_stats_.num_data_read; + num_sst_read += get_context.get_context_stats_.num_sst_read; + // report the counters before returning if (get_context.State() != GetContext::kNotFound && get_context.State() != GetContext::kMerge && @@ -2014,7 +2130,12 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel()); + file_range.AddValueSize(iter->value->size()); file_range.MarkKeyDone(iter); + if (file_range.GetValueSize() > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } continue; case GetContext::kDeleted: // Use empty error message for speed @@ -2026,7 +2147,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, Status::Corruption("corrupted key for ", iter->lkey->user_key()); file_range.MarkKeyDone(iter); continue; - case GetContext::kBlobIndex: + case GetContext::kUnexpectedBlobIndex: ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); *status = Status::NotSupported( "Encounter unexpected blob index. 
Please open DB with " @@ -2035,15 +2156,32 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, continue; } } + + // Report MultiGet stats per level. + if (fp.IsHitFileLastInLevel()) { + // Dump the stats if this is the last file of this level and reset for + // next level. + RecordInHistogram(db_statistics_, + NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, + num_index_read + num_filter_read); + RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL, + num_data_read); + RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read); + num_filter_read = 0; + num_index_read = 0; + num_data_read = 0; + num_sst_read = 0; + } + RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size); - if (file_picker_range.empty()) { + if (!s.ok() || file_picker_range.empty()) { break; } f = fp.GetNextFile(); } // Process any left over keys - for (auto iter = range->begin(); iter != range->end(); ++iter) { + for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) { GetContext& get_context = *iter->get_context; Status* status = iter->s; Slice user_key = iter->lkey->user_key(); @@ -2068,12 +2206,23 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, nullptr /* result_operand */, true); if (LIKELY(iter->value != nullptr)) { iter->value->PinSelf(); + range->AddValueSize(iter->value->size()); + range->MarkKeyDone(iter); + if (range->GetValueSize() > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } } } else { range->MarkKeyDone(iter); *status = Status::NotFound(); // Use an empty error message for speed } } + + for (auto iter = range->begin(); iter != range->end(); ++iter) { + range->MarkKeyDone(iter); + *(iter->s) = s; + } } bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { @@ -2095,6 +2244,9 @@ void VersionStorageInfo::GenerateLevelFilesBrief() { void Version::PrepareApply( const MutableCFOptions& mutable_cf_options, bool update_stats) { + TEST_SYNC_POINT_CALLBACK( + "Version::PrepareApply:forced_check", + reinterpret_cast(&storage_info_.force_consistency_checks_)); UpdateAccumulatedStats(update_stats); storage_info_.UpdateNumNonEmptyLevels(); storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options); @@ -2396,6 +2548,11 @@ void VersionStorageInfo::ComputeCompactionScore( // compaction score for the whole DB. Adding other levels as if // they are L0 files. for (int i = 1; i < num_levels(); i++) { + // Its possible that a subset of the files in a level may be in a + // compaction, due to delete triggered compaction or trivial move. + // In that case, the below check may not catch a level being + // compacted as it only checks the first file. The worst that can + // happen is a scheduled compaction thread will find nothing to do. if (!files_[i].empty() && !files_[i][0]->being_compacted) { num_sorted_runs++; } @@ -2425,9 +2582,21 @@ void VersionStorageInfo::ComputeCompactionScore( // Level-based involves L0->L0 compactions that can lead to oversized // L0 files. Take into account size as well to avoid later giant // compactions to the base level. - score = std::max( - score, static_cast(total_size) / - mutable_cf_options.max_bytes_for_level_base); + uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base; + if (immutable_cf_options.level_compaction_dynamic_level_bytes && + level_multiplier_ != 0.0) { + // Prevent L0 to Lbase fanout from growing larger than + // `level_multiplier_`. 
This prevents us from getting stuck picking + // L0 forever even when it is hurting write-amp. That could happen + // in dynamic level compaction's write-burst mode where the base + // level's target size can grow to be enormous. + l0_target_size = + std::max(l0_target_size, + static_cast(level_max_bytes_[base_level_] / + level_multiplier_)); + } + score = + std::max(score, static_cast(total_size) / l0_target_size); } } } else { @@ -2593,31 +2762,30 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { } } // anonymous namespace -void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) { - auto* level_files = &files_[level]; - // Must not overlap -#ifndef NDEBUG - if (level > 0 && !level_files->empty() && - internal_comparator_->Compare( - (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) { - auto* f2 = (*level_files)[level_files->size() - 1]; - if (info_log != nullptr) { - Error(info_log, "Adding new file %" PRIu64 - " range (%s, %s) to level %d but overlapping " - "with existing file %" PRIu64 " %s %s", - f->fd.GetNumber(), f->smallest.DebugString(true).c_str(), - f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(), - f2->smallest.DebugString(true).c_str(), - f2->largest.DebugString(true).c_str()); - LogFlush(info_log); - } - assert(false); - } -#else - (void)info_log; -#endif +void VersionStorageInfo::AddFile(int level, FileMetaData* f) { + auto& level_files = files_[level]; + level_files.push_back(f); + f->refs++; - level_files->push_back(f); + + const uint64_t file_number = f->fd.GetNumber(); + + assert(file_locations_.find(file_number) == file_locations_.end()); + file_locations_.emplace(file_number, + FileLocation(level, level_files.size() - 1)); +} + +void VersionStorageInfo::AddBlobFile( + std::shared_ptr blob_file_meta) { + assert(blob_file_meta); + + const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber(); + + auto it = blob_files_.lower_bound(blob_file_number); + assert(it == blob_files_.end() || it->first != blob_file_number); + + blob_files_.insert( + it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta))); } // Version::PrepareApply() need to be called before calling the function, or @@ -3345,22 +3513,23 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions, } uint64_t VersionStorageInfo::EstimateLiveDataSize() const { - // Estimate the live data size by adding up the size of the last level for all - // key ranges. Note: Estimate depends on the ordering of files in level 0 - // because files in level 0 can be overlapping. + // Estimate the live data size by adding up the size of a maximal set of + // sst files with no range overlap in same or higher level. The less + // compacted, the more optimistic (smaller) this estimate is. Also, + // for multiple sorted runs within a level, file order will matter. uint64_t size = 0; auto ikey_lt = [this](InternalKey* x, InternalKey* y) { return internal_comparator_->Compare(*x, *y) < 0; }; - // (Ordered) map of largest keys in non-overlapping files + // (Ordered) map of largest keys in files being included in size estimate std::map ranges(ikey_lt); for (int l = num_levels_ - 1; l >= 0; l--) { bool found_end = false; for (auto file : files_[l]) { - // Find the first file where the largest key is larger than the smallest - // key of the current file. If this file does not overlap with the + // Find the first file already included with largest key is larger than + // the smallest key of `file`. 
If that file does not overlap with the // current file, none of the files in the map does. If there is // no potential overlap, we can safely insert the rest of this level // (if the level is not 0) into the map without checking again because @@ -3409,13 +3578,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun( return false; } -void Version::AddLiveFiles(std::vector* live) { - for (int level = 0; level < storage_info_.num_levels(); level++) { - const std::vector& files = storage_info_.files_[level]; - for (const auto& file : files) { - live->push_back(file->fd); +void Version::AddLiveFiles(std::vector* live_table_files, + std::vector* live_blob_files) const { + assert(live_table_files); + assert(live_blob_files); + + for (int level = 0; level < storage_info_.num_levels(); ++level) { + const auto& level_files = storage_info_.LevelFiles(level); + for (const auto& meta : level_files) { + assert(meta); + + live_table_files->emplace_back(meta->fd.GetNumber()); } } + + const auto& blob_files = storage_info_.GetBlobFiles(); + for (const auto& pair : blob_files) { + const auto& meta = pair.second; + assert(meta); + + live_blob_files->emplace_back(meta->GetBlobFileNumber()); + } } std::string Version::DebugString(bool hex, bool print_stats) const { @@ -3462,6 +3645,21 @@ std::string Version::DebugString(bool hex, bool print_stats) const { r.append("\n"); } } + + const auto& blob_files = storage_info_.GetBlobFiles(); + if (!blob_files.empty()) { + r.append("--- blob files --- version# "); + AppendNumberTo(&r, version_number_); + r.append(" ---\n"); + for (const auto& pair : blob_files) { + const auto& blob_file_meta = pair.second; + assert(blob_file_meta); + + r.append(blob_file_meta->DebugString()); + r.push_back('\n'); + } + } + return r; } @@ -3473,15 +3671,30 @@ struct VersionSet::ManifestWriter { ColumnFamilyData* cfd; const MutableCFOptions mutable_cf_options; const autovector& edit_list; + const std::function manifest_write_callback; - explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, - const MutableCFOptions& cf_options, - const autovector& e) + explicit ManifestWriter( + InstrumentedMutex* mu, ColumnFamilyData* _cfd, + const MutableCFOptions& cf_options, const autovector& e, + const std::function& manifest_wcb) : done(false), cv(mu), cfd(_cfd), mutable_cf_options(cf_options), - edit_list(e) {} + edit_list(e), + manifest_write_callback(manifest_wcb) {} + ~ManifestWriter() { status.PermitUncheckedError(); } + + bool IsAllWalEdits() const { + bool all_wal_edits = true; + for (const auto& e : edit_list) { + if (!e->IsWalManipulation()) { + all_wal_edits = false; + break; + } + } + return all_wal_edits; + } }; Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) { @@ -3534,12 +3747,15 @@ VersionSet::VersionSet(const std::string& dbname, const FileOptions& storage_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller, - BlockCacheTracer* const block_cache_tracer) - : column_family_set_(new ColumnFamilySet( - dbname, _db_options, storage_options, table_cache, - write_buffer_manager, write_controller, block_cache_tracer)), + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer) + : column_family_set_( + new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, + write_buffer_manager, write_controller, + block_cache_tracer, io_tracer)), + table_cache_(table_cache), env_(_db_options->env), - fs_(_db_options->fs.get()), + fs_(_db_options->fs, io_tracer), dbname_(dbname), 
db_options_(_db_options), next_file_number_(2), @@ -3553,21 +3769,49 @@ VersionSet::VersionSet(const std::string& dbname, current_version_number_(0), manifest_file_size_(0), file_options_(storage_options), - block_cache_tracer_(block_cache_tracer) {} + block_cache_tracer_(block_cache_tracer), + io_tracer_(io_tracer) {} VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet - Cache* table_cache = column_family_set_->get_table_cache(); column_family_set_.reset(); for (auto& file : obsolete_files_) { if (file.metadata->table_reader_handle) { - table_cache->Release(file.metadata->table_reader_handle); - TableCache::Evict(table_cache, file.metadata->fd.GetNumber()); + table_cache_->Release(file.metadata->table_reader_handle); + TableCache::Evict(table_cache_, file.metadata->fd.GetNumber()); } file.DeleteMetadata(); } obsolete_files_.clear(); + io_status_.PermitUncheckedError(); +} + +void VersionSet::Reset() { + if (column_family_set_) { + WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); + WriteController* wc = column_family_set_->write_controller(); + column_family_set_.reset( + new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_, + wbm, wc, block_cache_tracer_, io_tracer_)); + } + db_id_.clear(); + next_file_number_.store(2); + min_log_number_to_keep_2pc_.store(0); + manifest_file_number_ = 0; + options_file_number_ = 0; + pending_manifest_file_number_ = 0; + last_sequence_.store(0); + last_allocated_sequence_.store(0); + last_published_sequence_.store(0); + prev_log_number_ = 0; + descriptor_log_.reset(); + current_version_number_ = 0; + manifest_writers_.clear(); + manifest_file_size_ = 0; + obsolete_files_.clear(); + obsolete_manifests_.clear(); + wals_.Reset(); } void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, @@ -3600,8 +3844,9 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::ProcessManifestWrites( std::deque& writers, InstrumentedMutex* mu, - Directory* db_directory, bool new_descriptor_log, + FSDirectory* db_directory, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { + mu->AssertHeld(); assert(!writers.empty()); ManifestWriter& first_writer = writers.front(); ManifestWriter* last_writer = &first_writer; @@ -3678,16 +3923,22 @@ Status VersionSet::ProcessManifestWrites( } } if (version == nullptr) { - version = new Version(last_writer->cfd, this, file_options_, - last_writer->mutable_cf_options, - current_version_number_++); - versions.push_back(version); - mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options); - builder_guards.emplace_back( - new BaseReferencedVersionBuilder(last_writer->cfd)); - builder = builder_guards.back()->version_builder(); - } - assert(builder != nullptr); // make checker happy + // WAL manipulations do not need to be applied to versions. 
+ if (!last_writer->IsAllWalEdits()) { + version = new Version(last_writer->cfd, this, file_options_, + last_writer->mutable_cf_options, io_tracer_, + current_version_number_++); + versions.push_back(version); + mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options); + builder_guards.emplace_back( + new BaseReferencedVersionBuilder(last_writer->cfd)); + builder = builder_guards.back()->version_builder(); + } + assert(last_writer->IsAllWalEdits() || builder); + assert(last_writer->IsAllWalEdits() || version); + TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion", + version); + } for (const auto& e : last_writer->edit_list) { if (e->is_in_atomic_group_) { if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ || @@ -3760,9 +4011,6 @@ Status VersionSet::ProcessManifestWrites( } #endif // NDEBUG - uint64_t new_manifest_file_size = 0; - Status s; - assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || manifest_file_size_ > db_options_->max_manifest_file_size) { @@ -3776,6 +4024,7 @@ Status VersionSet::ProcessManifestWrites( // reads its content after releasing db mutex to avoid race with // SwitchMemtable(). std::unordered_map curr_state; + VersionEdit wal_additions; if (new_descriptor_log) { pending_manifest_file_number_ = NewFileNumber(); batch_edits.back()->SetNextFile(next_file_number_.load()); @@ -3790,13 +4039,20 @@ Status VersionSet::ProcessManifestWrites( assert(curr_state.find(cfd->GetID()) == curr_state.end()); curr_state[cfd->GetID()] = {cfd->GetLogNumber()}; } + + for (const auto& wal : wals_.GetWals()) { + wal_additions.AddWal(wal.first, wal.second); + } } + uint64_t new_manifest_file_size = 0; + Status s; + IOStatus io_s; { FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_); mu->Unlock(); - TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); + TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr); if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) { for (int i = 0; i < static_cast(versions.size()); ++i) { assert(!builder_guards.empty() && @@ -3805,10 +4061,11 @@ Status VersionSet::ProcessManifestWrites( builder_guards.size() == versions.size()); ColumnFamilyData* cfd = versions[i]->cfd_; s = builder_guards[i]->version_builder()->LoadTableHandlers( - cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits, + cfd->internal_stats(), 1 /* max_threads */, true /* prefetch_index_and_filter_in_cache */, false /* is_initial_load */, - mutable_cf_options_ptrs[i]->prefix_extractor.get()); + mutable_cf_options_ptrs[i]->prefix_extractor.get(), + MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i])); if (!s.ok()) { if (db_options_->paranoid_checks) { break; @@ -3827,18 +4084,21 @@ Status VersionSet::ProcessManifestWrites( std::string descriptor_fname = DescriptorFileName(dbname_, pending_manifest_file_number_); std::unique_ptr descriptor_file; - s = NewWritableFile(fs_, descriptor_fname, &descriptor_file, - opt_file_opts); - if (s.ok()) { + io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file, + opt_file_opts); + if (io_s.ok()) { descriptor_file->SetPreallocationBlockSize( db_options_->manifest_preallocation_size); std::unique_ptr file_writer(new WritableFileWriter( std::move(descriptor_file), descriptor_fname, opt_file_opts, env_, - nullptr, db_options_->listeners)); + io_tracer_, nullptr, db_options_->listeners)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); - s = WriteCurrentStateToManifest(curr_state, 
descriptor_log_.get()); + s = WriteCurrentStateToManifest(curr_state, wal_additions, + descriptor_log_.get(), io_s); + } else { + s = io_s; } } @@ -3872,15 +4132,19 @@ Status VersionSet::ProcessManifestWrites( } ++idx; #endif /* !NDEBUG */ - s = descriptor_log_->AddRecord(record); - if (!s.ok()) { + io_s = descriptor_log_->AddRecord(record); + if (!io_s.ok()) { + s = io_s; break; } } if (s.ok()) { - s = SyncManifest(env_, db_options_, descriptor_log_->file()); + io_s = SyncManifest(env_, db_options_, descriptor_log_->file()); + TEST_SYNC_POINT_CALLBACK( + "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s); } - if (!s.ok()) { + if (!io_s.ok()) { + s = io_s; ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", s.ToString().c_str()); } @@ -3889,8 +4153,11 @@ Status VersionSet::ProcessManifestWrites( // If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. if (s.ok() && new_descriptor_log) { - s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, - db_directory); + io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_, + db_directory); + if (!io_s.ok()) { + s = io_s; + } TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); } @@ -3910,6 +4177,28 @@ Status VersionSet::ProcessManifestWrites( mu->Lock(); } + if (s.ok()) { + // Apply WAL edits, DB mutex must be held. + for (auto& e : batch_edits) { + if (e->IsWalAddition()) { + s = wals_.AddWals(e->GetWalAdditions()); + } else if (e->IsWalDeletion()) { + s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber()); + } + if (!s.ok()) { + break; + } + } + } + + if (!io_s.ok()) { + if (io_status_.ok()) { + io_status_ = io_s; + } + } else if (!io_status_.ok()) { + io_status_ = io_s; + } + // Append the old manifest file to the obsolete_manifest_ list to be deleted // by PurgeObsoleteFiles later. if (s.ok() && new_descriptor_log) { @@ -3986,9 +4275,15 @@ Status VersionSet::ProcessManifestWrites( ROCKS_LOG_INFO(db_options_->info_log, "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n", - manifest_file_number_, pending_manifest_file_number_); - env_->DeleteFile( + pending_manifest_file_number_, manifest_file_number_); + Status manifest_del_status = env_->DeleteFile( DescriptorFileName(dbname_, pending_manifest_file_number_)); + if (!manifest_del_status.ok()) { + ROCKS_LOG_WARN(db_options_->info_log, + "Failed to delete manifest %" PRIu64 ": %s", + pending_manifest_file_number_, + manifest_del_status.ToString().c_str()); + } } } @@ -4007,6 +4302,9 @@ Status VersionSet::ProcessManifestWrites( } ready->status = s; ready->done = true; + if (ready->manifest_write_callback) { + (ready->manifest_write_callback)(s); + } if (need_signal) { ready->cv.Signal(); } @@ -4026,8 +4324,9 @@ Status VersionSet::LogAndApply( const autovector& column_family_datas, const autovector& mutable_cf_options_list, const autovector>& edit_lists, - InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, - const ColumnFamilyOptions* new_cf_options) { + InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log, + const ColumnFamilyOptions* new_cf_options, + const std::vector>& manifest_wcbs) { mu->AssertHeld(); int num_edits = 0; for (const auto& elist : edit_lists) { @@ -4057,12 +4356,16 @@ Status VersionSet::LogAndApply( assert(static_cast(num_cfds) == edit_lists.size()); } for (int i = 0; i < num_cfds; ++i) { + const auto wcb = + manifest_wcbs.empty() ? 
[](const Status&) {} : manifest_wcbs[i]; writers.emplace_back(mu, column_family_datas[i], - *mutable_cf_options_list[i], edit_lists[i]); + *mutable_cf_options_list[i], edit_lists[i], wcb); manifest_writers_.push_back(&writers[i]); } assert(!writers.empty()); ManifestWriter& first_writer = writers.front(); + TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting", + nullptr); while (!first_writer.done && &first_writer != manifest_writers_.front()) { first_writer.cv.Wait(); } @@ -4074,6 +4377,7 @@ Status VersionSet::LogAndApply( for (const auto& writer : writers) { assert(writer.done); } + TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu); #endif /* !NDEBUG */ return first_writer.status; } @@ -4143,9 +4447,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ : last_sequence_); - Status s = builder->Apply(edit); - - return s; + // The builder can be nullptr only if edit is WAL manipulation, + // because WAL edits do not need to be applied to versions, + // we return Status::OK() in this case. + assert(builder || edit->IsWalManipulation()); + return builder ? builder->Apply(edit) : Status::OK(); } Status VersionSet::ApplyOneVersionEditToBuilder( @@ -4219,6 +4525,16 @@ Status VersionSet::ApplyOneVersionEditToBuilder( return Status::Corruption( "Manifest - dropping non-existing column family"); } + } else if (edit.IsWalAddition()) { + Status s = wals_.AddWals(edit.GetWalAdditions()); + if (!s.ok()) { + return s; + } + } else if (edit.IsWalDeletion()) { + Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber()); + if (!s.ok()) { + return s; + } } else if (!cf_in_not_found) { if (!cf_in_builders) { return Status::Corruption( @@ -4319,24 +4635,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, if (dbname.back() != '/') { manifest_path->push_back('/'); } - *manifest_path += fname; + manifest_path->append(fname); return Status::OK(); } Status VersionSet::ReadAndRecover( - log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + log::Reader& reader, AtomicGroupReadBuffer* read_buffer, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, std::unordered_map>& builders, - VersionEditParams* version_edit_params, std::string* db_id) { - assert(reader != nullptr); + Status* log_read_status, VersionEditParams* version_edit_params, + std::string* db_id) { assert(read_buffer != nullptr); + assert(log_read_status != nullptr); Status s; Slice record; std::string scratch; size_t recovered_edits = 0; - while (reader->ReadRecord(&record, &scratch) && s.ok()) { + while (s.ok() && reader.ReadRecord(&record, &scratch) && + log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); if (!s.ok()) { @@ -4380,6 +4698,9 @@ Status VersionSet::ReadAndRecover( } } } + if (!log_read_status->ok()) { + s = *log_read_status; + } if (!s.ok()) { // Clear the buffer if we fail to decode/apply an edit. read_buffer->Clear(); @@ -4392,18 +4713,9 @@ Status VersionSet::ReadAndRecover( Status VersionSet::Recover( const std::vector& column_families, bool read_only, std::string* db_id) { - std::unordered_map cf_name_to_options; - for (const auto& cf : column_families) { - cf_name_to_options.emplace(cf.name, cf.options); - } - // keeps track of column families in manifest that were not found in - // column families parameters. 
if those column families are not dropped - // by subsequent manifest records, Recover() will return failure status - std::unordered_map column_families_not_found; - // Read "CURRENT" file, which contains a pointer to the current manifest file std::string manifest_path; - Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path, + Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, &manifest_file_number_); if (!s.ok()) { return s; @@ -4423,138 +4735,32 @@ Status VersionSet::Recover( } manifest_file_reader.reset( new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size)); - } - - std::unordered_map> - builders; - - // add default column family - auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); - if (default_cf_iter == cf_name_to_options.end()) { - return Status::InvalidArgument("Default column family not specified"); + db_options_->log_readahead_size, io_tracer_)); } - VersionEdit default_cf_edit; - default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); - default_cf_edit.SetColumnFamily(0); - ColumnFamilyData* default_cfd = - CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - // In recovery, nobody else can access it, so it's fine to set it to be - // initialized earlier. - default_cfd->set_initialized(); - builders.insert( - std::make_pair(0, std::unique_ptr( - new BaseReferencedVersionBuilder(default_cfd)))); uint64_t current_manifest_file_size = 0; - VersionEditParams version_edit_params; + uint64_t log_number = 0; { VersionSet::LogReporter reporter; - reporter.status = &s; + Status log_read_status; + reporter.status = &log_read_status; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; - AtomicGroupReadBuffer read_buffer; - s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, - column_families_not_found, builders, - &version_edit_params, db_id); - current_manifest_file_size = reader.GetReadOffset(); - assert(current_manifest_file_size != 0); - } - - if (s.ok()) { - if (!version_edit_params.has_next_file_number_) { - s = Status::Corruption("no meta-nextfile entry in descriptor"); - } else if (!version_edit_params.has_log_number_) { - s = Status::Corruption("no meta-lognumber entry in descriptor"); - } else if (!version_edit_params.has_last_sequence_) { - s = Status::Corruption("no last-sequence-number entry in descriptor"); - } - - if (!version_edit_params.has_prev_log_number_) { - version_edit_params.SetPrevLogNumber(0); - } - - column_family_set_->UpdateMaxColumnFamily( - version_edit_params.max_column_family_); - - // When reading DB generated using old release, min_log_number_to_keep=0. - // All log files will be scanned for potential prepare entries. - MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_); - MarkFileNumberUsed(version_edit_params.prev_log_number_); - MarkFileNumberUsed(version_edit_params.log_number_); - } - - // there were some column families in the MANIFEST that weren't specified - // in the argument. This is OK in read_only mode - if (read_only == false && !column_families_not_found.empty()) { - std::string list_of_not_found; - for (const auto& cf : column_families_not_found) { - list_of_not_found += ", " + cf.second; - } - list_of_not_found = list_of_not_found.substr(2); - s = Status::InvalidArgument( - "You have to open all column families. 
Column families not opened: " + - list_of_not_found); - } - - if (s.ok()) { - for (auto cfd : *column_family_set_) { - assert(builders.count(cfd->GetID()) > 0); - auto* builder = builders[cfd->GetID()]->version_builder(); - if (!builder->CheckConsistencyForNumLevels()) { - s = Status::InvalidArgument( - "db has more levels than options.num_levels"); - break; - } + VersionEditHandler handler( + read_only, column_families, const_cast(this), + /*track_missing_files=*/false, + /*no_error_if_table_files_missing=*/false, io_tracer_); + handler.Iterate(reader, &log_read_status); + s = handler.status(); + if (s.ok()) { + log_number = handler.GetVersionEditParams().log_number_; + current_manifest_file_size = reader.GetReadOffset(); + assert(current_manifest_file_size != 0); + handler.GetDbId(db_id); } } if (s.ok()) { - for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - if (read_only) { - cfd->table_cache()->SetTablesAreImmortal(); - } - assert(cfd->initialized()); - auto builders_iter = builders.find(cfd->GetID()); - assert(builders_iter != builders.end()); - auto builder = builders_iter->second->version_builder(); - - // unlimited table cache. Pre-load table handle now. - // Need to do it out of the mutex. - s = builder->LoadTableHandlers( - cfd->internal_stats(), db_options_->max_file_opening_threads, - false /* prefetch_index_and_filter_in_cache */, - true /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); - if (!s.ok()) { - if (db_options_->paranoid_checks) { - return s; - } - s = Status::OK(); - } - - Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), - current_version_number_++); - builder->SaveTo(v->storage_info()); - - // Install recovered version - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), - !(db_options_->skip_stats_update_on_db_open)); - AppendVersion(cfd, v); - } - manifest_file_size_ = current_manifest_file_size; - next_file_number_.store(version_edit_params.next_file_number_ + 1); - last_allocated_sequence_ = version_edit_params.last_sequence_; - last_published_sequence_ = version_edit_params.last_sequence_; - last_sequence_ = version_edit_params.last_sequence_; - prev_log_number_ = version_edit_params.prev_log_number_; - ROCKS_LOG_INFO( db_options_->info_log, "Recovered from manifest file:%s succeeded," @@ -4563,9 +4769,8 @@ Status VersionSet::Recover( ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32 ",min_log_number_to_keep is %" PRIu64 "\n", manifest_path.c_str(), manifest_file_number_, next_file_number_.load(), - last_sequence_.load(), version_edit_params.log_number_, - prev_log_number_, column_family_set_->GetMaxColumnFamily(), - min_log_number_to_keep_2pc()); + last_sequence_.load(), log_number, prev_log_number_, + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -4581,6 +4786,148 @@ Status VersionSet::Recover( return s; } +namespace { +class ManifestPicker { + public: + explicit ManifestPicker(const std::string& dbname, + const std::vector& files_in_dbname); + // REQUIRES Valid() == true + std::string GetNextManifest(uint64_t* file_number, std::string* file_name); + bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); } + + private: + const std::string& dbname_; + // MANIFEST file names(s) + std::vector manifest_files_; + std::vector::const_iterator manifest_file_iter_; +}; + +ManifestPicker::ManifestPicker(const std::string& dbname, + const 
std::vector& files_in_dbname) + : dbname_(dbname) { + // populate manifest files + assert(!files_in_dbname.empty()); + for (const auto& fname : files_in_dbname) { + uint64_t file_num = 0; + FileType file_type; + bool parse_ok = ParseFileName(fname, &file_num, &file_type); + if (parse_ok && file_type == kDescriptorFile) { + manifest_files_.push_back(fname); + } + } + // seek to first manifest + std::sort(manifest_files_.begin(), manifest_files_.end(), + [](const std::string& lhs, const std::string& rhs) { + uint64_t num1 = 0; + uint64_t num2 = 0; + FileType type1; + FileType type2; + bool parse_ok1 = ParseFileName(lhs, &num1, &type1); + bool parse_ok2 = ParseFileName(rhs, &num2, &type2); +#ifndef NDEBUG + assert(parse_ok1); + assert(parse_ok2); +#else + (void)parse_ok1; + (void)parse_ok2; +#endif + return num1 > num2; + }); + manifest_file_iter_ = manifest_files_.begin(); +} + +std::string ManifestPicker::GetNextManifest(uint64_t* number, + std::string* file_name) { + assert(Valid()); + std::string ret; + if (manifest_file_iter_ != manifest_files_.end()) { + ret.assign(dbname_); + if (ret.back() != kFilePathSeparator) { + ret.push_back(kFilePathSeparator); + } + ret.append(*manifest_file_iter_); + if (number) { + FileType type; + bool parse = ParseFileName(*manifest_file_iter_, number, &type); + assert(type == kDescriptorFile); +#ifndef NDEBUG + assert(parse); +#else + (void)parse; +#endif + } + if (file_name) { + *file_name = *manifest_file_iter_; + } + ++manifest_file_iter_; + } + return ret; +} +} // namespace + +Status VersionSet::TryRecover( + const std::vector& column_families, bool read_only, + const std::vector& files_in_dbname, std::string* db_id, + bool* has_missing_table_file) { + ManifestPicker manifest_picker(dbname_, files_in_dbname); + if (!manifest_picker.Valid()) { + return Status::Corruption("Cannot locate MANIFEST file in " + dbname_); + } + Status s; + std::string manifest_path = + manifest_picker.GetNextManifest(&manifest_file_number_, nullptr); + while (!manifest_path.empty()) { + s = TryRecoverFromOneManifest(manifest_path, column_families, read_only, + db_id, has_missing_table_file); + if (s.ok() || !manifest_picker.Valid()) { + break; + } + Reset(); + manifest_path = + manifest_picker.GetNextManifest(&manifest_file_number_, nullptr); + } + return s; +} + +Status VersionSet::TryRecoverFromOneManifest( + const std::string& manifest_path, + const std::vector& column_families, bool read_only, + std::string* db_id, bool* has_missing_table_file) { + ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n", + manifest_path.c_str()); + std::unique_ptr manifest_file_reader; + Status s; + { + std::unique_ptr manifest_file; + s = fs_->NewSequentialFile(manifest_path, + fs_->OptimizeForManifestRead(file_options_), + &manifest_file, nullptr); + if (!s.ok()) { + return s; + } + manifest_file_reader.reset( + new SequentialFileReader(std::move(manifest_file), manifest_path, + db_options_->log_readahead_size, io_tracer_)); + } + + assert(s.ok()); + VersionSet::LogReporter reporter; + reporter.status = &s; + log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, + /*checksum=*/true, /*log_num=*/0); + VersionEditHandlerPointInTime handler_pit( + read_only, column_families, const_cast(this), io_tracer_); + + handler_pit.Iterate(reader, &s); + + handler_pit.GetDbId(db_id); + + assert(nullptr != has_missing_table_file); + *has_missing_table_file = handler_pit.HasMissingFiles(); + + return handler_pit.status(); +} + Status 
VersionSet::ListColumnFamilies(std::vector* column_families, const std::string& dbname, FileSystem* fs) { @@ -4603,51 +4950,27 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, if (!s.ok()) { return s; } - file_reader.reset(new SequentialFileReader(std::move(file), manifest_path)); + file_reader.reset(new SequentialFileReader(std::move(file), manifest_path, + nullptr /*IOTracer*/)); } - std::map column_family_names; - // default column family is always implicitly there - column_family_names.insert({0, kDefaultColumnFamilyName}); VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; - while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - if (edit.is_column_family_add_) { - if (column_family_names.find(edit.column_family_) != - column_family_names.end()) { - s = Status::Corruption("Manifest adding the same column family twice"); - break; - } - column_family_names.insert( - {edit.column_family_, edit.column_family_name_}); - } else if (edit.is_column_family_drop_) { - if (column_family_names.find(edit.column_family_) == - column_family_names.end()) { - s = Status::Corruption( - "Manifest - dropping non-existing column family"); - break; - } - column_family_names.erase(edit.column_family_); - } - } + ListColumnFamiliesHandler handler; + handler.Iterate(reader, &s); + + assert(column_families); column_families->clear(); - if (s.ok()) { - for (const auto& iter : column_family_names) { + if (handler.status().ok()) { + for (const auto& iter : handler.GetColumnFamilyNames()) { column_families->push_back(iter.second); } } - return s; + return handler.status(); } #ifndef ROCKSDB_LITE @@ -4667,7 +4990,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, WriteController wc(options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, - /*block_cache_tracer=*/nullptr); + nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/); Status status; std::vector dummy; @@ -4720,7 +5043,19 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, } if (first_nonempty_level > 0) { - new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level); + auto& new_last_level = new_files_list[new_levels - 1]; + + new_last_level = vstorage->LevelFiles(first_nonempty_level); + + for (size_t i = 0; i < new_last_level.size(); ++i) { + const FileMetaData* const meta = new_last_level[i]; + assert(meta); + + const uint64_t file_number = meta->fd.GetNumber(); + + vstorage->file_locations_[file_number] = + VersionStorageInfo::FileLocation(new_levels - 1, i); + } } delete[] vstorage -> files_; @@ -4743,8 +5078,10 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, // metadata from Manifest to VersionSet before calling this function. Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) { // Clean the previously stored checksum information if any. 
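// ---------------------------------------------------------------------------
// [Editorial sketch -- not part of the diff.] The GetLiveFilesChecksumInfo
// hunk that follows stops discarding the Status returned by
// InsertOneFileChecksum and instead returns the first failure, breaking out
// of the column-family / level / file loops as soon as one insert fails.
// Below is a minimal self-contained illustration of that early-exit pattern;
// every type and function name here is hypothetical, not the RocksDB API.
// ---------------------------------------------------------------------------
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Tiny stand-in for rocksdb::Status, just enough for this example.
class Status {
 public:
  Status() : ok_(true) {}
  static Status OK() { return Status(); }
  static Status Corruption(std::string msg) {
    return Status(false, std::move(msg));
  }
  bool ok() const { return ok_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

struct FileInfo {
  uint64_t number;
  std::string checksum;
};
using Level = std::vector<FileInfo>;
using ColumnFamily = std::vector<Level>;

// Hypothetical sink that can reject an entry (e.g. a duplicate file number).
class ChecksumSink {
 public:
  Status Insert(uint64_t file_number, const std::string& /*checksum*/) {
    for (uint64_t seen : seen_) {
      if (seen == file_number) {
        return Status::Corruption("duplicate file number");
      }
    }
    seen_.push_back(file_number);
    return Status::OK();
  }

 private:
  std::vector<uint64_t> seen_;
};

// Propagate the first failing insert instead of unconditionally returning OK:
// each loop level checks `s` and breaks, mirroring the shape of the hunk below.
Status CollectChecksums(const std::vector<ColumnFamily>& column_families,
                        ChecksumSink* sink) {
  Status s;
  for (const auto& cf : column_families) {
    for (const auto& level : cf) {
      for (const auto& file : level) {
        s = sink->Insert(file.number, file.checksum);
        if (!s.ok()) break;
      }
      if (!s.ok()) break;
    }
    if (!s.ok()) break;
  }
  return s;
}

}  // namespace sketch
// ---------------------------------------------------------------------------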
+ Status s; if (checksum_list == nullptr) { - return Status::InvalidArgument("checksum_list is nullptr"); + s = Status::InvalidArgument("checksum_list is nullptr"); + return s; } checksum_list->reset(); @@ -4755,13 +5092,22 @@ Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) { for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& file : cfd->current()->storage_info()->LevelFiles(level)) { - checksum_list->InsertOneFileChecksum(file->fd.GetNumber(), - file->file_checksum, - file->file_checksum_func_name); + s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(), + file->file_checksum, + file->file_checksum_func_name); + if (!s.ok()) { + break; + } + } + if (!s.ok()) { + break; } } + if (!s.ok()) { + break; + } } - return Status::OK(); + return s; } Status VersionSet::DumpManifest(Options& options, std::string& dscname, @@ -4771,205 +5117,31 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Status s; { std::unique_ptr file; - s = options.file_system->NewSequentialFile( + const std::shared_ptr& fs = options.env->GetFileSystem(); + s = fs->NewSequentialFile( dscname, - options.file_system->OptimizeForManifestRead(file_options_), &file, + fs->OptimizeForManifestRead(file_options_), &file, nullptr); if (!s.ok()) { return s; } file_reader.reset(new SequentialFileReader( - std::move(file), dscname, db_options_->log_readahead_size)); + std::move(file), dscname, db_options_->log_readahead_size, io_tracer_)); } - bool have_prev_log_number = false; - bool have_next_file = false; - bool have_last_sequence = false; - uint64_t next_file = 0; - uint64_t last_sequence = 0; - uint64_t previous_log_number = 0; - int count = 0; - std::unordered_map comparators; - std::unordered_map> - builders; - - // add default column family - VersionEdit default_cf_edit; - default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); - default_cf_edit.SetColumnFamily(0); - ColumnFamilyData* default_cfd = - CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit); - builders.insert( - std::make_pair(0, std::unique_ptr( - new BaseReferencedVersionBuilder(default_cfd)))); - + std::vector column_families( + 1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options)); + DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex, + json); { VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; - while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - - // Write out each individual edit - if (verbose && !json) { - printf("%s\n", edit.DebugString(hex).c_str()); - } else if (json) { - printf("%s\n", edit.DebugJSON(count, hex).c_str()); - } - count++; - - bool cf_in_builders = - builders.find(edit.column_family_) != builders.end(); - - if (edit.has_comparator_) { - comparators.insert({edit.column_family_, edit.comparator_}); - } - - ColumnFamilyData* cfd = nullptr; - - if (edit.is_column_family_add_) { - if (cf_in_builders) { - s = Status::Corruption( - "Manifest adding the same column family twice"); - break; - } - cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit); - cfd->set_initialized(); - builders.insert(std::make_pair( - edit.column_family_, std::unique_ptr( - new BaseReferencedVersionBuilder(cfd)))); - } else if (edit.is_column_family_drop_) { - if (!cf_in_builders) { - s = 
Status::Corruption( - "Manifest - dropping non-existing column family"); - break; - } - auto builder_iter = builders.find(edit.column_family_); - builders.erase(builder_iter); - comparators.erase(edit.column_family_); - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - assert(cfd != nullptr); - cfd->UnrefAndTryDelete(); - cfd = nullptr; - } else { - if (!cf_in_builders) { - s = Status::Corruption( - "Manifest record referencing unknown column family"); - break; - } - - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - // this should never happen since cf_in_builders is true - assert(cfd != nullptr); - - // if it is not column family add or column family drop, - // then it's a file add/delete, which should be forwarded - // to builder - auto builder = builders.find(edit.column_family_); - assert(builder != builders.end()); - s = builder->second->version_builder()->Apply(&edit); - if (!s.ok()) { - break; - } - } - - if (cfd != nullptr && edit.has_log_number_) { - cfd->SetLogNumber(edit.log_number_); - } - - - if (edit.has_prev_log_number_) { - previous_log_number = edit.prev_log_number_; - have_prev_log_number = true; - } - - if (edit.has_next_file_number_) { - next_file = edit.next_file_number_; - have_next_file = true; - } - - if (edit.has_last_sequence_) { - last_sequence = edit.last_sequence_; - have_last_sequence = true; - } - - if (edit.has_max_column_family_) { - column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); - } - - if (edit.has_min_log_number_to_keep_) { - MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_); - } - } + handler.Iterate(reader, &s); } - file_reader.reset(); - if (s.ok()) { - if (!have_next_file) { - s = Status::Corruption("no meta-nextfile entry in descriptor"); - printf("no meta-nextfile entry in descriptor"); - } else if (!have_last_sequence) { - printf("no last-sequence-number entry in descriptor"); - s = Status::Corruption("no last-sequence-number entry in descriptor"); - } - - if (!have_prev_log_number) { - previous_log_number = 0; - } - } - - if (s.ok()) { - for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - auto builders_iter = builders.find(cfd->GetID()); - assert(builders_iter != builders.end()); - auto builder = builders_iter->second->version_builder(); - - Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), - current_version_number_++); - builder->SaveTo(v->storage_info()); - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false); - - printf("--------------- Column family \"%s\" (ID %" PRIu32 - ") --------------\n", - cfd->GetName().c_str(), cfd->GetID()); - printf("log number: %" PRIu64 "\n", cfd->GetLogNumber()); - auto comparator = comparators.find(cfd->GetID()); - if (comparator != comparators.end()) { - printf("comparator: %s\n", comparator->second.c_str()); - } else { - printf("comparator: \n"); - } - printf("%s \n", v->DebugString(hex).c_str()); - delete v; - } - - next_file_number_.store(next_file + 1); - last_allocated_sequence_ = last_sequence; - last_published_sequence_ = last_sequence; - last_sequence_ = last_sequence; - prev_log_number_ = previous_log_number; - - printf("next_file_number %" PRIu64 " last_sequence %" PRIu64 - " prev_log_number %" PRIu64 " max_column_family %" PRIu32 - " min_log_number_to_keep " - "%" PRIu64 "\n", - next_file_number_.load(), last_sequence, previous_log_number, - column_family_set_->GetMaxColumnFamily(), - min_log_number_to_keep_2pc()); - } - - return s; + return handler.status(); } #endif 
// ROCKSDB_LITE @@ -4990,7 +5162,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { Status VersionSet::WriteCurrentStateToManifest( const std::unordered_map& curr_state, - log::Writer* log) { + const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) { // TODO: Break up into multiple records to reduce memory usage on recovery? // WARNING: This method doesn't hold a mutex!! @@ -4999,6 +5171,7 @@ Status VersionSet::WriteCurrentStateToManifest( // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe to iterate. + assert(io_s.ok()); if (db_options_->write_dbid_to_manifest) { VersionEdit edit_for_db_id; assert(!db_id_.empty()); @@ -5008,13 +5181,30 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption("Unable to Encode VersionEdit:" + edit_for_db_id.DebugString(true)); } - Status add_record = log->AddRecord(db_id_record); - if (!add_record.ok()) { - return add_record; + io_s = log->AddRecord(db_id_record); + if (!io_s.ok()) { + return io_s; + } + } + + // Save WALs. + if (!wal_additions.GetWalAdditions().empty()) { + TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal", + const_cast(&wal_additions)); + std::string record; + if (!wal_additions.EncodeTo(&record)) { + return Status::Corruption("Unable to Encode VersionEdit: " + + wal_additions.DebugString(true)); + } + io_s = log->AddRecord(record); + if (!io_s.ok()) { + return io_s; } } for (auto cfd : *column_family_set_) { + assert(cfd); + if (cfd->IsDropped()) { continue; } @@ -5035,9 +5225,9 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption( "Unable to Encode VersionEdit:" + edit.DebugString(true)); } - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; + io_s = log->AddRecord(record); + if (!io_s.ok()) { + return io_s; } } @@ -5046,6 +5236,9 @@ Status VersionSet::WriteCurrentStateToManifest( VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); + assert(cfd->current()); + assert(cfd->current()->storage_info()); + for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) { @@ -5057,6 +5250,24 @@ Status VersionSet::WriteCurrentStateToManifest( f->file_checksum, f->file_checksum_func_name); } } + + const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles(); + for (const auto& pair : blob_files) { + const uint64_t blob_file_number = pair.first; + const auto& meta = pair.second; + + assert(meta); + assert(blob_file_number == meta->GetBlobFileNumber()); + + edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(), + meta->GetTotalBlobBytes(), meta->GetChecksumMethod(), + meta->GetChecksumValue()); + if (meta->GetGarbageBlobCount() > 0) { + edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(), + meta->GetGarbageBlobBytes()); + } + } + const auto iter = curr_state.find(cfd->GetID()); assert(iter != curr_state.end()); uint64_t log_number = iter->second.log_number; @@ -5066,9 +5277,9 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption( "Unable to Encode VersionEdit:" + edit.DebugString(true)); } - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; + io_s = log->AddRecord(record); + if (!io_s.ok()) { + return io_s; } } } @@ -5193,7 +5404,8 @@ uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options, static_cast(total_full_size * margin)) { total_full_size += total_intersecting_size / 2; } else { - // Estimate for 
all the first files, at each level + // Estimate for all the first files (might also be last files), at each + // level for (const auto file_ptr : first_files) { total_full_size += ApproximateSize(v, *file_ptr, start, end, caller); } @@ -5273,61 +5485,79 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f, v->GetMutableCFOptions().prefix_extractor.get()); } -void VersionSet::AddLiveFiles(std::vector* live_list) { +void VersionSet::AddLiveFiles(std::vector* live_table_files, + std::vector* live_blob_files) const { + assert(live_table_files); + assert(live_blob_files); + // pre-calculate space requirement - int64_t total_files = 0; + size_t total_table_files = 0; + size_t total_blob_files = 0; + + assert(column_family_set_); for (auto cfd : *column_family_set_) { + assert(cfd); + if (!cfd->initialized()) { continue; } - Version* dummy_versions = cfd->dummy_versions(); + + Version* const dummy_versions = cfd->dummy_versions(); + assert(dummy_versions); + for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { + assert(v); + const auto* vstorage = v->storage_info(); - for (int level = 0; level < vstorage->num_levels(); level++) { - total_files += vstorage->LevelFiles(level).size(); + assert(vstorage); + + for (int level = 0; level < vstorage->num_levels(); ++level) { + total_table_files += vstorage->LevelFiles(level).size(); } + + total_blob_files += vstorage->GetBlobFiles().size(); } } // just one time extension to the right size - live_list->reserve(live_list->size() + static_cast(total_files)); + live_table_files->reserve(live_table_files->size() + total_table_files); + live_blob_files->reserve(live_blob_files->size() + total_blob_files); + assert(column_family_set_); for (auto cfd : *column_family_set_) { + assert(cfd); if (!cfd->initialized()) { continue; } + auto* current = cfd->current(); bool found_current = false; - Version* dummy_versions = cfd->dummy_versions(); + + Version* const dummy_versions = cfd->dummy_versions(); + assert(dummy_versions); + for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { - v->AddLiveFiles(live_list); + v->AddLiveFiles(live_table_files, live_blob_files); if (v == current) { found_current = true; } } + if (!found_current && current != nullptr) { // Should never happen unless it is a bug. assert(false); - current->AddLiveFiles(live_list); + current->AddLiveFiles(live_table_files, live_blob_files); } } } InternalIterator* VersionSet::MakeInputIterator( - const Compaction* c, RangeDelAggregator* range_del_agg, + const ReadOptions& read_options, const Compaction* c, + RangeDelAggregator* range_del_agg, const FileOptions& file_options_compactions) { auto cfd = c->column_family_data(); - ReadOptions read_options; - read_options.verify_checksums = true; - read_options.fill_cache = false; - // Compaction iterators shouldn't be confined to a single prefix. - // Compactions use Seek() for - // (a) concurrent compactions, - // (b) CompactionFilter::Decision::kRemoveAndSkipUntil. - read_options.total_order_seek = true; - // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. 
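// ---------------------------------------------------------------------------
// [Editorial sketch -- not part of the diff.] The AddLiveFiles hunk above now
// reports SST and blob file numbers through two separate output vectors while
// keeping the existing two-pass structure: one pass over every live version
// to count, a single reserve() per output, then a second pass to append. A
// self-contained sketch of that reserve-then-fill pattern follows; the types
// are hypothetical stand-ins, not the RocksDB ones.
// ---------------------------------------------------------------------------
#include <cstddef>
#include <cstdint>
#include <vector>

namespace sketch {

// Hypothetical per-version file lists (one entry per live Version).
struct VersionFiles {
  std::vector<uint64_t> table_files;
  std::vector<uint64_t> blob_files;
};

void AddLiveFiles(const std::vector<VersionFiles>& versions,
                  std::vector<uint64_t>* live_table_files,
                  std::vector<uint64_t>* live_blob_files) {
  // Pass 1: pre-calculate how much space each output needs so the appends
  // below extend the vectors exactly once.
  size_t total_table_files = 0;
  size_t total_blob_files = 0;
  for (const auto& v : versions) {
    total_table_files += v.table_files.size();
    total_blob_files += v.blob_files.size();
  }
  live_table_files->reserve(live_table_files->size() + total_table_files);
  live_blob_files->reserve(live_blob_files->size() + total_blob_files);

  // Pass 2: append every table and blob file number from every version.
  for (const auto& v : versions) {
    live_table_files->insert(live_table_files->end(), v.table_files.begin(),
                             v.table_files.end());
    live_blob_files->insert(live_blob_files->end(), v.blob_files.begin(),
                            v.blob_files.end());
  }
}

}  // namespace sketch
// ---------------------------------------------------------------------------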
// TODO(opt): use concatenating iterator for level-0 if there is no overlap @@ -5343,15 +5573,17 @@ InternalIterator* VersionSet::MakeInputIterator( for (size_t i = 0; i < flevel->num_files; i++) { list[num++] = cfd->table_cache()->NewIterator( read_options, file_options_compactions, - cfd->internal_comparator(), - *flevel->files[i].file_metadata, range_del_agg, - c->mutable_cf_options()->prefix_extractor.get(), + cfd->internal_comparator(), *flevel->files[i].file_metadata, + range_del_agg, c->mutable_cf_options()->prefix_extractor.get(), /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction, /*arena=*/nullptr, - /*skip_filters=*/false, /*level=*/static_cast(which), + /*skip_filters=*/false, + /*level=*/static_cast(c->level(which)), + MaxFileSizeForL0MetaPin(*c->mutable_cf_options()), /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr); + /*largest_compaction_key=*/nullptr, + /*allow_unprepared_value=*/false); } } else { // Create concatenating iterator for the files from this level @@ -5362,7 +5594,7 @@ InternalIterator* VersionSet::MakeInputIterator( /*should_sample=*/false, /*no per level latency histogram=*/nullptr, TableReaderCaller::kCompaction, /*skip_filters=*/false, - /*level=*/static_cast(which), range_del_agg, + /*level=*/static_cast(c->level(which)), range_del_agg, c->boundaries(which)); } } @@ -5490,28 +5722,46 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { } void VersionSet::GetObsoleteFiles(std::vector* files, + std::vector* blob_files, std::vector* manifest_filenames, uint64_t min_pending_output) { + assert(files); + assert(blob_files); + assert(manifest_filenames); + assert(files->empty()); + assert(blob_files->empty()); assert(manifest_filenames->empty()); - obsolete_manifests_.swap(*manifest_filenames); + std::vector pending_files; for (auto& f : obsolete_files_) { if (f.metadata->fd.GetNumber() < min_pending_output) { - files->push_back(std::move(f)); + files->emplace_back(std::move(f)); } else { - pending_files.push_back(std::move(f)); + pending_files.emplace_back(std::move(f)); } } obsolete_files_.swap(pending_files); + + std::vector pending_blob_files; + for (auto& blob_file : obsolete_blob_files_) { + if (blob_file.GetBlobFileNumber() < min_pending_output) { + blob_files->emplace_back(std::move(blob_file)); + } else { + pending_blob_files.emplace_back(std::move(blob_file)); + } + } + obsolete_blob_files_.swap(pending_blob_files); + + obsolete_manifests_.swap(*manifest_filenames); } ColumnFamilyData* VersionSet::CreateColumnFamily( - const ColumnFamilyOptions& cf_options, VersionEdit* edit) { + const ColumnFamilyOptions& cf_options, const VersionEdit* edit) { assert(edit->is_column_family_add_); MutableCFOptions dummy_cf_options; Version* dummy_versions = - new Version(nullptr, this, file_options_, dummy_cf_options); + new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_); // Ref() dummy version once so that later we can call Unref() to delete it // by avoiding calling "delete" explicitly (~Version is private) dummy_versions->Ref(); @@ -5520,7 +5770,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( cf_options); Version* v = new Version(new_cfd, this, file_options_, - *new_cfd->GetLatestMutableCFOptions(), + *new_cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); // Fill level target base information. 
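// ---------------------------------------------------------------------------
// [Editorial sketch -- not part of the diff.] In the GetObsoleteFiles hunk a
// little further back, obsolete blob files now go through the same filter as
// obsolete table files: entries whose number is below min_pending_output are
// handed to the caller for deletion, and the rest are kept queued via a swap.
// A stripped-down sketch of that partition-and-swap pattern, using a plain
// uint64_t in place of the file metadata (hypothetical names throughout):
// ---------------------------------------------------------------------------
#include <cstdint>
#include <vector>

namespace sketch {

// Move every queued number below `min_pending_output` into `*out`; numbers at
// or above the threshold stay queued for a later call.
void TakeObsoleteBelow(uint64_t min_pending_output,
                       std::vector<uint64_t>* queue,
                       std::vector<uint64_t>* out) {
  std::vector<uint64_t> still_pending;
  for (uint64_t number : *queue) {
    if (number < min_pending_output) {
      out->push_back(number);
    } else {
      still_pending.push_back(number);
    }
  }
  queue->swap(still_pending);
}

}  // namespace sketch
//
// Usage: with queue = {3, 9, 5, 12} and min_pending_output = 8, `out` receives
// {3, 5} and the queue keeps {9, 12} for a later pass.
// ---------------------------------------------------------------------------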
@@ -5561,15 +5811,26 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) { return total_files_size; } -ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, - const ImmutableDBOptions* _db_options, - const FileOptions& _file_options, - Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller) +Status VersionSet::VerifyFileMetadata(const std::string& fpath, + const FileMetaData& meta) const { + uint64_t fsize = 0; + Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr); + if (status.ok()) { + if (fsize != meta.fd.GetFileSize()) { + status = Status::Corruption("File size mismatch: " + fpath); + } + } + return status; +} + +ReactiveVersionSet::ReactiveVersionSet( + const std::string& dbname, const ImmutableDBOptions* _db_options, + const FileOptions& _file_options, Cache* table_cache, + WriteBufferManager* write_buffer_manager, WriteController* write_controller, + const std::shared_ptr& io_tracer) : VersionSet(dbname, _db_options, _file_options, table_cache, write_buffer_manager, write_controller, - /*block_cache_tracer=*/nullptr), + /*block_cache_tracer=*/nullptr, io_tracer), number_of_edits_to_skip_(0) {} ReactiveVersionSet::~ReactiveVersionSet() {} @@ -5601,8 +5862,7 @@ Status ReactiveVersionSet::Recover( // In recovery, nobody else can access it, so it's fine to set it to be // initialized earlier. default_cfd->set_initialized(); - std::unordered_map> - builders; + VersionBuilderMap builders; std::unordered_map column_families_not_found; builders.insert( std::make_pair(0, std::unique_ptr( @@ -5610,7 +5870,7 @@ Status ReactiveVersionSet::Recover( manifest_reader_status->reset(new Status()); manifest_reporter->reset(new LogReporter()); - static_cast(manifest_reporter->get())->status = + static_cast_with_check(manifest_reporter->get())->status = manifest_reader_status->get(); Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); log::Reader* reader = manifest_reader->get(); @@ -5619,10 +5879,9 @@ Status ReactiveVersionSet::Recover( VersionEdit version_edit; while (s.ok() && retry < 1) { assert(reader != nullptr); - Slice record; - std::string scratch; - s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options, - column_families_not_found, builders, &version_edit); + s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options, + column_families_not_found, builders, + manifest_reader_status->get(), &version_edit); if (s.ok()) { bool enough = version_edit.has_next_file_number_ && version_edit.has_log_number_ && @@ -5649,7 +5908,8 @@ Status ReactiveVersionSet::Recover( cfd->internal_stats(), db_options_->max_file_opening_threads, false /* prefetch_index_and_filter_in_cache */, true /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + cfd->GetLatestMutableCFOptions()->prefix_extractor.get(), + MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions())); if (!s.ok()) { enough = false; if (s.IsPathNotFound()) { @@ -5699,15 +5959,25 @@ Status ReactiveVersionSet::Recover( auto* builder = builders_iter->second->version_builder(); Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); - builder->SaveTo(v->storage_info()); + s = builder->SaveTo(v->storage_info()); - // Install recovered version - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), - !(db_options_->skip_stats_update_on_db_open)); - AppendVersion(cfd, v); + if (s.ok()) { + // Install 
recovered version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(db_options_->skip_stats_update_on_db_open)); + AppendVersion(cfd, v); + } else { + ROCKS_LOG_ERROR(db_options_->info_log, + "[%s]: inconsistent version: %s\n", + cfd->GetName().c_str(), s.ToString().c_str()); + delete v; + break; + } } + } + if (s.ok()) { next_file_number_.store(version_edit.next_file_number_ + 1); last_allocated_sequence_ = version_edit.last_sequence_; last_published_sequence_ = version_edit.last_sequence_; @@ -5784,6 +6054,8 @@ Status ReactiveVersionSet::ReadAndApply( s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit); if (s.ok()) { applied_edits++; + } else { + break; } } } @@ -5793,13 +6065,14 @@ Status ReactiveVersionSet::ReadAndApply( } // It's possible that: // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted. + // Or the version(s) rebuilt from tailing the MANIFEST is inconsistent. // 2) we have finished reading the current MANIFEST. // 3) we have encountered an IOError reading the current MANIFEST. // We need to look for the next MANIFEST and start from there. If we cannot // find the next MANIFEST, we should exit the loop. - s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); + Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); reader = manifest_reader->get(); - if (s.ok()) { + if (tmp_s.ok()) { if (reader->file()->file_name() == old_manifest_path) { // Still processing the same MANIFEST, thus no need to continue this // loop since no record is available if we have reached here. @@ -5829,6 +6102,7 @@ Status ReactiveVersionSet::ReadAndApply( number_of_edits_to_skip_ += 2; } } + s = tmp_s; } } } @@ -5911,7 +6185,8 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( cfd->internal_stats(), db_options_->max_file_opening_threads, false /* prefetch_index_and_filter_in_cache */, false /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + cfd->GetLatestMutableCFOptions()->prefix_extractor.get(), + MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions())); TEST_SYNC_POINT_CALLBACK( "ReactiveVersionSet::ApplyOneVersionEditToBuilder:" "AfterLoadTableHandlers", @@ -5919,14 +6194,18 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( if (s.ok()) { auto version = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), + *cfd->GetLatestMutableCFOptions(), io_tracer_, current_version_number_++); - builder->SaveTo(version->storage_info()); - version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); - AppendVersion(cfd, version); - active_version_builders_.erase(builder_iter); - if (cfds_changed->count(cfd) == 0) { - cfds_changed->insert(cfd); + s = builder->SaveTo(version->storage_info()); + if (s.ok()) { + version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); + AppendVersion(cfd, version); + active_version_builders_.erase(builder_iter); + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + } else { + delete version; } } else if (s.IsPathNotFound()) { s = Status::OK(); @@ -5934,23 +6213,25 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( // Some other error has occurred during LoadTableHandlers. 
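// ---------------------------------------------------------------------------
// [Editorial sketch -- not part of the diff.] In the ReactiveVersionSet hunks
// above, VersionBuilder::SaveTo now returns a Status that is checked before
// the freshly allocated Version is installed; on failure the Version is
// deleted instead of being appended. A minimal install-or-discard sketch with
// hypothetical stand-in types (a bool return stands in for the Status the
// diff actually checks):
// ---------------------------------------------------------------------------
#include <vector>

namespace sketch {

// Hypothetical version object; a raw pointer is enough for the sketch.
struct Version {
  std::vector<int> files;
};

// Hypothetical builder. Returns false (standing in for a non-OK Status) when
// the staged state would produce an inconsistent version.
struct Builder {
  std::vector<int> staged;
  bool SaveTo(Version* v) const {
    for (int f : staged) {
      if (f < 0) return false;  // e.g. a file referenced at an invalid level
    }
    v->files = staged;
    return true;
  }
};

// Install the rebuilt version only if SaveTo succeeded; otherwise free it so
// a failed rebuild neither leaks nor ends up on the installed-version list.
bool InstallRebuiltVersion(const Builder& builder,
                           std::vector<Version*>* installed) {
  Version* v = new Version();
  if (builder.SaveTo(v)) {
    installed->push_back(v);
    return true;
  }
  delete v;
  return false;
}

}  // namespace sketch
// ---------------------------------------------------------------------------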
} - if (version_edit->HasNextFile()) { - next_file_number_.store(version_edit->next_file_number_ + 1); - } - if (version_edit->has_last_sequence_) { - last_allocated_sequence_ = version_edit->last_sequence_; - last_published_sequence_ = version_edit->last_sequence_; - last_sequence_ = version_edit->last_sequence_; - } - if (version_edit->has_prev_log_number_) { - prev_log_number_ = version_edit->prev_log_number_; - MarkFileNumberUsed(version_edit->prev_log_number_); - } - if (version_edit->has_log_number_) { - MarkFileNumberUsed(version_edit->log_number_); + if (s.ok()) { + if (version_edit->HasNextFile()) { + next_file_number_.store(version_edit->next_file_number_ + 1); + } + if (version_edit->has_last_sequence_) { + last_allocated_sequence_ = version_edit->last_sequence_; + last_published_sequence_ = version_edit->last_sequence_; + last_sequence_ = version_edit->last_sequence_; + } + if (version_edit->has_prev_log_number_) { + prev_log_number_ = version_edit->prev_log_number_; + MarkFileNumberUsed(version_edit->prev_log_number_); + } + if (version_edit->has_log_number_) { + MarkFileNumberUsed(version_edit->log_number_); + } + column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_); + MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_); } - column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_); - MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_); return s; } @@ -5961,7 +6242,7 @@ Status ReactiveVersionSet::MaybeSwitchManifest( Status s; do { std::string manifest_path; - s = GetCurrentManifestPath(dbname_, fs_, &manifest_path, + s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, &manifest_file_number_); std::unique_ptr manifest_file; if (s.ok()) { @@ -5983,9 +6264,9 @@ Status ReactiveVersionSet::MaybeSwitchManifest( } std::unique_ptr manifest_file_reader; if (s.ok()) { - manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_path, - db_options_->log_readahead_size)); + manifest_file_reader.reset(new SequentialFileReader( + std::move(manifest_file), manifest_path, + db_options_->log_readahead_size, io_tracer_)); manifest_reader->reset(new log::FragmentBufferedReader( nullptr, std::move(manifest_file_reader), reporter, true /* checksum */, 0 /* log_number */));
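// ---------------------------------------------------------------------------
// [Editorial sketch -- not part of the diff.] The final
// ApplyOneVersionEditToBuilder hunk above wraps the bookkeeping updates (next
// file number, last sequence, prev/log numbers, max column family, min log
// number to keep) in an `if (s.ok())` so that a failed edit leaves the
// VersionSet counters untouched. A minimal commit-only-on-success sketch; the
// struct fields and function names below are hypothetical:
// ---------------------------------------------------------------------------
#include <cstdint>

namespace sketch {

// Hypothetical decoded edit with optional bookkeeping fields.
struct Edit {
  bool has_next_file_number = false;
  uint64_t next_file_number = 0;
  bool has_last_sequence = false;
  uint64_t last_sequence = 0;
  bool corrupt = false;  // stands in for a SaveTo()/LoadTableHandlers() failure
};

// Shared counters that must only move forward when the edit fully applied.
struct Bookkeeping {
  uint64_t next_file_number = 0;
  uint64_t last_sequence = 0;
};

bool ApplyOneEdit(const Edit& edit, Bookkeeping* book) {
  bool ok = !edit.corrupt;  // in the diff this is the accumulated Status `s`
  if (ok) {
    // Commit the counters only after every earlier step succeeded.
    if (edit.has_next_file_number) {
      book->next_file_number = edit.next_file_number + 1;
    }
    if (edit.has_last_sequence) {
      book->last_sequence = edit.last_sequence;
    }
  }
  return ok;
}

}  // namespace sketch
// ---------------------------------------------------------------------------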