#include "db/version_set.h"
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
+#include <array>
+#include <cinttypes>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
-#include "db/compaction.h"
+#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
+#include "file/filename.h"
+#include "file/random_access_file_reader.h"
+#include "file/read_write_util.h"
+#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
+#include "monitoring/persistent_stats_history.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
-#include "table/plain_table_factory.h"
+#include "table/multiget_context.h"
+#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
+#include "test_util/sync_point.h"
#include "util/coding.h"
-#include "util/file_reader_writer.h"
-#include "util/filename.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
-#include "util/sync_point.h"
#include "util/user_comparator_wrapper.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
namespace {
return Status::Corruption("DB have corrupted keys");
}
- if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
+ if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
+ 0) {
*overlap = true;
}
}
// Check if key is within a file's range. If the search left bound and
// right bound point to the same file, we are sure the key falls in
// range.
- assert(
- curr_level_ == 0 ||
- curr_index_in_curr_level_ == start_index_in_curr_level_ ||
- user_comparator_->Compare(user_key_,
- ExtractUserKey(f->smallest_key)) <= 0);
-
- int cmp_smallest = user_comparator_->Compare(user_key_,
- ExtractUserKey(f->smallest_key));
+ assert(curr_level_ == 0 ||
+ curr_index_in_curr_level_ == start_index_in_curr_level_ ||
+ user_comparator_->CompareWithoutTimestamp(
+ user_key_, ExtractUserKey(f->smallest_key)) <= 0);
+
+ int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
+ user_key_, ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
- cmp_largest = user_comparator_->Compare(user_key_,
- ExtractUserKey(f->largest_key));
+ cmp_largest = user_comparator_->CompareWithoutTimestamp(
+ user_key_, ExtractUserKey(f->largest_key));
}
// Setup file search bound for the next level based on the
return false;
}
};
+
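+// Similar to FilePicker, but works on a batch of keys. FilePickerMultiGet
+// walks the LSM levels and, within each level, returns one SST file at a
+// time that may contain at least one key from the MultiGet batch, tracking
+// per-key search state in FilePickerContext.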
+class FilePickerMultiGet {
+ private:
+ struct FilePickerContext;
+
+ public:
+ FilePickerMultiGet(MultiGetRange* range,
+ autovector<LevelFilesBrief>* file_levels,
+ unsigned int num_levels, FileIndexer* file_indexer,
+ const Comparator* user_comparator,
+ const InternalKeyComparator* internal_comparator)
+ : num_levels_(num_levels),
+ curr_level_(static_cast<unsigned int>(-1)),
+ returned_file_level_(static_cast<unsigned int>(-1)),
+ hit_file_level_(static_cast<unsigned int>(-1)),
+ range_(range),
+ batch_iter_(range->begin()),
+ batch_iter_prev_(range->begin()),
+ maybe_repeat_key_(false),
+ current_level_range_(*range, range->begin(), range->end()),
+ current_file_range_(*range, range->begin(), range->end()),
+ level_files_brief_(file_levels),
+ is_hit_file_last_in_level_(false),
+ curr_file_level_(nullptr),
+ file_indexer_(file_indexer),
+ user_comparator_(user_comparator),
+ internal_comparator_(internal_comparator) {
+ for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+ fp_ctx_array_[iter.index()] =
+ FilePickerContext(0, FileIndexer::kLevelMaxIndex);
+ }
+
+ // Setup member variables to search first level.
+ search_ended_ = !PrepareNextLevel();
+ if (!search_ended_) {
+ // REVISIT
+ // Prefetch Level 0 table data to avoid cache miss if possible.
+ // As of now, only PlainTableReader and CuckooTableReader do any
+ // prefetching. This may not be necessary anymore once we implement
+ // batching in those table readers
+ for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
+ auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
+ if (r) {
+ for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+ r->Prepare(iter->ikey);
+ }
+ }
+ }
+ }
+ }
+
+ int GetCurrentLevel() const { return curr_level_; }
+
+ // Iterates through files in the current level until it finds a file that
+  // contains at least one key from the MultiGet batch
+ bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
+ size_t* file_index, FdWithKeyRange** fd,
+ bool* is_last_key_in_file) {
+ size_t curr_file_index = *file_index;
+ FdWithKeyRange* f = nullptr;
+ bool file_hit = false;
+ int cmp_largest = -1;
+ if (curr_file_index >= curr_file_level_->num_files) {
+ // In the unlikely case the next key is a duplicate of the current key,
+ // and the current key is the last in the level and the internal key
+ // was not found, we need to skip lookup for the remaining keys and
+ // reset the search bounds
+ if (batch_iter_ != current_level_range_.end()) {
+ ++batch_iter_;
+ for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
+ struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+ fp_ctx.search_left_bound = 0;
+ fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+ }
+ }
+ return false;
+ }
+ // Loops over keys in the MultiGet batch until it finds a file with
+    // at least one of the keys. Then it keeps moving forward until the
+ // last key in the batch that falls in that file
+ while (batch_iter_ != current_level_range_.end() &&
+ (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
+ curr_file_index ||
+ !file_hit)) {
+ struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+ f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
+ Slice& user_key = batch_iter_->ukey;
+
+      // Do key range filtering of files and/or fractional cascading if:
+      // (1) not all the files are in level 0, or
+      // (2) there are more than 3 current level files
+      // If there are only 3 or fewer current level files in the system, we
+ // skip the key range filtering. In this case, more likely, the system
+ // is highly tuned to minimize number of tables queried by each query,
+ // so it is unlikely that key range filtering is more efficient than
+ // querying the files.
+ if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
+        // Check if key is within a file's range. If the search left bound and
+        // right bound point to the same file, we are sure the key falls in
+        // range.
+ assert(curr_level_ == 0 ||
+ fp_ctx.curr_index_in_curr_level ==
+ fp_ctx.start_index_in_curr_level ||
+ user_comparator_->Compare(user_key,
+ ExtractUserKey(f->smallest_key)) <= 0);
+
+ int cmp_smallest = user_comparator_->Compare(
+ user_key, ExtractUserKey(f->smallest_key));
+ if (cmp_smallest >= 0) {
+ cmp_largest = user_comparator_->Compare(
+ user_key, ExtractUserKey(f->largest_key));
+ } else {
+ cmp_largest = -1;
+ }
+
+ // Setup file search bound for the next level based on the
+ // comparison results
+ if (curr_level_ > 0) {
+ file_indexer_->GetNextLevelIndex(
+ curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
+ cmp_largest, &fp_ctx.search_left_bound,
+ &fp_ctx.search_right_bound);
+ }
+ // Key falls out of current file's range
+ if (cmp_smallest < 0 || cmp_largest > 0) {
+ next_file_range->SkipKey(batch_iter_);
+ } else {
+ file_hit = true;
+ }
+ } else {
+ file_hit = true;
+ }
+ if (cmp_largest == 0) {
+ // cmp_largest is 0, which means the next key will not be in this
+        // file, so stop looking further. Also don't increment batch_iter_
+ // as we may have to look for this key in the next file if we don't
+ // find it in this one
+ break;
+ } else {
+ if (curr_level_ == 0) {
+ // We need to look through all files in level 0
+ ++fp_ctx.curr_index_in_curr_level;
+ }
+ ++batch_iter_;
+ }
+ if (!file_hit) {
+ curr_file_index =
+ (batch_iter_ != current_level_range_.end())
+ ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+ : curr_file_level_->num_files;
+ }
+ }
+
+ *fd = f;
+ *file_index = curr_file_index;
+ *is_last_key_in_file = cmp_largest == 0;
+ return file_hit;
+ }
+
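+  // Returns the next file that may contain keys from the current batch, or
+  // nullptr once all levels have been searched. The subset of keys to look
+  // up in the returned file is available through CurrentFileRange().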
+ FdWithKeyRange* GetNextFile() {
+ while (!search_ended_) {
+ // Start searching next level.
+ if (batch_iter_ == current_level_range_.end()) {
+ search_ended_ = !PrepareNextLevel();
+ continue;
+ } else {
+ if (maybe_repeat_key_) {
+ maybe_repeat_key_ = false;
+ // Check if we found the final value for the last key in the
+ // previous lookup range. If we did, then there's no need to look
+ // any further for that key, so advance batch_iter_. Else, keep
+ // batch_iter_ positioned on that key so we look it up again in
+ // the next file
+ // For L0, always advance the key because we will look in the next
+ // file regardless for all keys not found yet
+ if (current_level_range_.CheckKeyDone(batch_iter_) ||
+ curr_level_ == 0) {
+ ++batch_iter_;
+ }
+ }
+ // batch_iter_prev_ will become the start key for the next file
+ // lookup
+ batch_iter_prev_ = batch_iter_;
+ }
+
+ MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
+ current_level_range_.end());
+ size_t curr_file_index =
+ (batch_iter_ != current_level_range_.end())
+ ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+ : curr_file_level_->num_files;
+ FdWithKeyRange* f;
+ bool is_last_key_in_file;
+ if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
+ &is_last_key_in_file)) {
+ search_ended_ = !PrepareNextLevel();
+ } else {
+ MultiGetRange::Iterator upper_key = batch_iter_;
+ if (is_last_key_in_file) {
+ // Since cmp_largest is 0, batch_iter_ still points to the last key
+ // that falls in this file, instead of the next one. Increment
+ // upper_key so we can set the range properly for SST MultiGet
+ ++upper_key;
+ ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
+ maybe_repeat_key_ = true;
+ }
+ // Set the range for this file
+ current_file_range_ =
+ MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
+ returned_file_level_ = curr_level_;
+ hit_file_level_ = curr_level_;
+ is_hit_file_last_in_level_ =
+ curr_file_index == curr_file_level_->num_files - 1;
+ return f;
+ }
+ }
+
+ // Search ended
+ return nullptr;
+ }
+
+ // getter for current file level
+ // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
+ unsigned int GetHitFileLevel() { return hit_file_level_; }
+
+ // Returns true if the most recent "hit file" (i.e., one returned by
+ // GetNextFile()) is at the last index in its level.
+ bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
+
+ const MultiGetRange& CurrentFileRange() { return current_file_range_; }
+
+ private:
+ unsigned int num_levels_;
+ unsigned int curr_level_;
+ unsigned int returned_file_level_;
+ unsigned int hit_file_level_;
+
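+  // Per-key search state: binary search bounds for the next level, and the
+  // current and starting file indices within the current level.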
+ struct FilePickerContext {
+ int32_t search_left_bound;
+ int32_t search_right_bound;
+ unsigned int curr_index_in_curr_level;
+ unsigned int start_index_in_curr_level;
+
+ FilePickerContext(int32_t left, int32_t right)
+ : search_left_bound(left), search_right_bound(right),
+ curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
+
+ FilePickerContext() = default;
+ };
+ std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
+ MultiGetRange* range_;
+ // Iterator to iterate through the keys in a MultiGet batch, that gets reset
+ // at the beginning of each level. Each call to GetNextFile() will position
+ // batch_iter_ at or right after the last key that was found in the returned
+ // SST file
+ MultiGetRange::Iterator batch_iter_;
+ // An iterator that records the previous position of batch_iter_, i.e last
+ // key found in the previous SST file, in order to serve as the start of
+ // the batch key range for the next SST file
+ MultiGetRange::Iterator batch_iter_prev_;
+ bool maybe_repeat_key_;
+ MultiGetRange current_level_range_;
+ MultiGetRange current_file_range_;
+ autovector<LevelFilesBrief>* level_files_brief_;
+ bool search_ended_;
+ bool is_hit_file_last_in_level_;
+ LevelFilesBrief* curr_file_level_;
+ FileIndexer* file_indexer_;
+ const Comparator* user_comparator_;
+ const InternalKeyComparator* internal_comparator_;
+
+ // Setup local variables to search next level.
+ // Returns false if there are no more levels to search.
+ bool PrepareNextLevel() {
+ if (curr_level_ == 0) {
+ MultiGetRange::Iterator mget_iter = current_level_range_.begin();
+ if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
+ curr_file_level_->num_files) {
+ batch_iter_prev_ = current_level_range_.begin();
+ batch_iter_ = current_level_range_.begin();
+ return true;
+ }
+ }
+
+ curr_level_++;
+ // Reset key range to saved value
+ while (curr_level_ < num_levels_) {
+ bool level_contains_keys = false;
+ curr_file_level_ = &(*level_files_brief_)[curr_level_];
+ if (curr_file_level_->num_files == 0) {
+ // When current level is empty, the search bound generated from upper
+ // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
+ // also empty.
+
+ for (auto mget_iter = current_level_range_.begin();
+ mget_iter != current_level_range_.end(); ++mget_iter) {
+ struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+
+ assert(fp_ctx.search_left_bound == 0);
+ assert(fp_ctx.search_right_bound == -1 ||
+ fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
+ // Since current level is empty, it will need to search all files in
+ // the next level
+ fp_ctx.search_left_bound = 0;
+ fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+ }
+ // Skip all subsequent empty levels
+ do {
+ ++curr_level_;
+ } while ((curr_level_ < num_levels_) &&
+ (*level_files_brief_)[curr_level_].num_files == 0);
+ continue;
+ }
+
+ // Some files may overlap each other. We find
+ // all files that overlap user_key and process them in order from
+ // newest to oldest. In the context of merge-operator, this can occur at
+ // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
+ // are always compacted into a single entry).
+ int32_t start_index = -1;
+ current_level_range_ =
+ MultiGetRange(*range_, range_->begin(), range_->end());
+ for (auto mget_iter = current_level_range_.begin();
+ mget_iter != current_level_range_.end(); ++mget_iter) {
+ struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+ if (curr_level_ == 0) {
+ // On Level-0, we read through all files to check for overlap.
+ start_index = 0;
+ level_contains_keys = true;
+ } else {
+ // On Level-n (n>=1), files are sorted. Binary search to find the
+ // earliest file whose largest key >= ikey. Search left bound and
+ // right bound are used to narrow the range.
+ if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
+ if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
+ fp_ctx.search_right_bound =
+ static_cast<int32_t>(curr_file_level_->num_files) - 1;
+ }
+ // `search_right_bound_` is an inclusive upper-bound, but since it
+ // was determined based on user key, it is still possible the lookup
+ // key falls to the right of `search_right_bound_`'s corresponding
+ // file. So, pass a limit one higher, which allows us to detect this
+ // case.
+ Slice& ikey = mget_iter->ikey;
+ start_index = FindFileInRange(
+ *internal_comparator_, *curr_file_level_, ikey,
+ static_cast<uint32_t>(fp_ctx.search_left_bound),
+ static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
+ if (start_index == fp_ctx.search_right_bound + 1) {
+ // `ikey_` comes after `search_right_bound_`. The lookup key does
+ // not exist on this level, so let's skip this level and do a full
+ // binary search on the next level.
+ fp_ctx.search_left_bound = 0;
+ fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+ current_level_range_.SkipKey(mget_iter);
+ continue;
+ } else {
+ level_contains_keys = true;
+ }
+ } else {
+ // search_left_bound > search_right_bound, key does not exist in
+ // this level. Since no comparison is done in this level, it will
+ // need to search all files in the next level.
+ fp_ctx.search_left_bound = 0;
+ fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+ current_level_range_.SkipKey(mget_iter);
+ continue;
+ }
+ }
+ fp_ctx.start_index_in_curr_level = start_index;
+ fp_ctx.curr_index_in_curr_level = start_index;
+ }
+ if (level_contains_keys) {
+ batch_iter_prev_ = current_level_range_.begin();
+ batch_iter_ = current_level_range_.begin();
+ return true;
+ }
+ curr_level_++;
+ }
+ // curr_level_ = num_levels_. So, no more levels to search.
+ return false;
+ }
+};
} // anonymous namespace
VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
const Slice* user_key, const FdWithKeyRange* f) {
// nullptr user_key occurs before all keys and is therefore never after *f
return (user_key != nullptr &&
- ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0);
+ ucmp->CompareWithoutTimestamp(*user_key,
+ ExtractUserKey(f->largest_key)) > 0);
}
static bool BeforeFile(const Comparator* ucmp,
const Slice* user_key, const FdWithKeyRange* f) {
// nullptr user_key occurs after all keys and is therefore never before *f
return (user_key != nullptr &&
- ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0);
+ ucmp->CompareWithoutTimestamp(*user_key,
+ ExtractUserKey(f->smallest_key)) < 0);
}
bool SomeFileOverlapsRange(
class LevelIterator final : public InternalIterator {
public:
- LevelIterator(
- TableCache* table_cache, const ReadOptions& read_options,
- const EnvOptions& env_options, const InternalKeyComparator& icomparator,
- const LevelFilesBrief* flevel, const SliceTransform* prefix_extractor,
- bool should_sample, HistogramImpl* file_read_hist, bool for_compaction,
- bool skip_filters, int level, RangeDelAggregator* range_del_agg,
- const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
- nullptr)
+ LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
+ const FileOptions& file_options,
+ const InternalKeyComparator& icomparator,
+ const LevelFilesBrief* flevel,
+ const SliceTransform* prefix_extractor, bool should_sample,
+ HistogramImpl* file_read_hist, TableReaderCaller caller,
+ bool skip_filters, int level, RangeDelAggregator* range_del_agg,
+ const std::vector<AtomicCompactionUnitBoundary>*
+ compaction_boundaries = nullptr)
: table_cache_(table_cache),
read_options_(read_options),
- env_options_(env_options),
+ file_options_(file_options),
icomparator_(icomparator),
user_comparator_(icomparator.user_comparator()),
flevel_(flevel),
prefix_extractor_(prefix_extractor),
file_read_hist_(file_read_hist),
should_sample_(should_sample),
- for_compaction_(for_compaction),
+ caller_(caller),
skip_filters_(skip_filters),
file_index_(flevel_->num_files),
level_(level),
void SeekForPrev(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
- void Next() override;
+ void Next() final override;
+ bool NextAndGetResult(IterateResult* result) override;
void Prev() override;
bool Valid() const override { return file_iter_.Valid(); }
assert(Valid());
return file_iter_.key();
}
+
Slice value() const override {
assert(Valid());
return file_iter_.value();
}
+
Status status() const override {
return file_iter_.iter() ? file_iter_.status() : Status::OK();
}
+
+ inline bool MayBeOutOfLowerBound() override {
+ assert(Valid());
+ return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
+ }
+
+ inline bool MayBeOutOfUpperBound() override {
+ assert(Valid());
+ return file_iter_.MayBeOutOfUpperBound();
+ }
+
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
if (file_iter_.iter()) {
file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
}
}
+
bool IsKeyPinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_.iter() && file_iter_.IsKeyPinned();
}
+
bool IsValuePinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_.iter() && file_iter_.IsValuePinned();
}
private:
- void SkipEmptyFileForward();
+ // Return true if at least one invalid file is seen and skipped.
+ bool SkipEmptyFileForward();
void SkipEmptyFileBackward();
void SetFileIterator(InternalIterator* iter);
void InitFileIterator(size_t new_file_index);
+  // Called by both Next() and NextAndGetResult(). Force inline.
+ void NextImpl() {
+ assert(Valid());
+ file_iter_.Next();
+ SkipEmptyFileForward();
+ }
+
const Slice& file_smallest_key(size_t file_index) {
assert(file_index < flevel_->num_files);
return flevel_->files[file_index].smallest_key;
bool KeyReachedUpperBound(const Slice& internal_key) {
return read_options_.iterate_upper_bound != nullptr &&
- user_comparator_.Compare(ExtractUserKey(internal_key),
- *read_options_.iterate_upper_bound) >= 0;
+ user_comparator_.CompareWithoutTimestamp(
+ ExtractUserKey(internal_key),
+ *read_options_.iterate_upper_bound) >= 0;
}
InternalIterator* NewFileIterator() {
smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
}
+ CheckMayBeOutOfLowerBound();
return table_cache_->NewIterator(
- read_options_, env_options_, icomparator_, *file_meta.file_metadata,
+ read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, prefix_extractor_,
- nullptr /* don't need reference to table */,
- file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_,
- level_, smallest_compaction_key, largest_compaction_key);
+ nullptr /* don't need reference to table */, file_read_hist_, caller_,
+ /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
+ largest_compaction_key);
+ }
+
+  // Check whether the current file is fully within iterate_lower_bound.
+  //
+  // Note MyRocks may update iterate bounds between seeks. To work around
+  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
+ void CheckMayBeOutOfLowerBound() {
+ if (read_options_.iterate_lower_bound != nullptr &&
+ file_index_ < flevel_->num_files) {
+ may_be_out_of_lower_bound_ =
+ user_comparator_.Compare(
+ ExtractUserKey(file_smallest_key(file_index_)),
+ *read_options_.iterate_lower_bound) < 0;
+ }
}
TableCache* table_cache_;
const ReadOptions read_options_;
- const EnvOptions& env_options_;
+ const FileOptions& file_options_;
const InternalKeyComparator& icomparator_;
const UserComparatorWrapper user_comparator_;
const LevelFilesBrief* flevel_;
mutable FileDescriptor current_value_;
+ // `prefix_extractor_` may be non-null even for total order seek. Checking
+ // this variable is not the right way to identify whether prefix iterator
+ // is used.
const SliceTransform* prefix_extractor_;
HistogramImpl* file_read_hist_;
bool should_sample_;
- bool for_compaction_;
+ TableReaderCaller caller_;
bool skip_filters_;
+ bool may_be_out_of_lower_bound_ = true;
size_t file_index_;
int level_;
RangeDelAggregator* range_del_agg_;
};
void LevelIterator::Seek(const Slice& target) {
- size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+  // Check whether the seek key falls within the current file
+ bool need_to_reseek = true;
+ if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
+ const FdWithKeyRange& cur_file = flevel_->files[file_index_];
+ if (icomparator_.InternalKeyComparator::Compare(
+ target, cur_file.largest_key) <= 0 &&
+ icomparator_.InternalKeyComparator::Compare(
+ target, cur_file.smallest_key) >= 0) {
+ need_to_reseek = false;
+ assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
+ file_index_);
+ }
+ }
+ if (need_to_reseek) {
+ TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
+ size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+ InitFileIterator(new_file_index);
+ }
- InitFileIterator(new_file_index);
if (file_iter_.iter() != nullptr) {
file_iter_.Seek(target);
}
- SkipEmptyFileForward();
+ if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
+ !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
+ file_iter_.iter() != nullptr && file_iter_.Valid()) {
+ // We've skipped the file we initially positioned to. In the prefix
+ // seek case, it is likely that the file is skipped because of
+ // prefix bloom or hash, where more keys are skipped. We then check
+ // the current key and invalidate the iterator if the prefix is
+ // already passed.
+ // When doing prefix iterator seek, when keys for one prefix have
+ // been exhausted, it can jump to any key that is larger. Here we are
+      // enforcing a stricter contract than that, in order to make it easier
+      // for higher layers (merging and DB iterator) to reason about
+      // correctness:
+      // 1. Within the prefix, the result should be accurate.
+      // 2. If keys for the prefix are exhausted, the iterator is either
+      //    positioned at the next key after the prefix, or made invalid.
+ // A side benefit will be that it invalidates the iterator earlier so that
+ // the upper level merging iterator can merge fewer child iterators.
+ Slice target_user_key = ExtractUserKey(target);
+ Slice file_user_key = ExtractUserKey(file_iter_.key());
+ if (prefix_extractor_->InDomain(target_user_key) &&
+ (!prefix_extractor_->InDomain(file_user_key) ||
+ user_comparator_.Compare(
+ prefix_extractor_->Transform(target_user_key),
+ prefix_extractor_->Transform(file_user_key)) != 0)) {
+ SetFileIterator(nullptr);
+ }
+ }
+ CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekForPrev(const Slice& target) {
file_iter_.SeekForPrev(target);
SkipEmptyFileBackward();
}
+ CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekToFirst() {
file_iter_.SeekToFirst();
}
SkipEmptyFileForward();
+ CheckMayBeOutOfLowerBound();
}
void LevelIterator::SeekToLast() {
file_iter_.SeekToLast();
}
SkipEmptyFileBackward();
+ CheckMayBeOutOfLowerBound();
}
-void LevelIterator::Next() {
- assert(Valid());
- file_iter_.Next();
- SkipEmptyFileForward();
+void LevelIterator::Next() { NextImpl(); }
+
+bool LevelIterator::NextAndGetResult(IterateResult* result) {
+ NextImpl();
+ bool is_valid = Valid();
+ if (is_valid) {
+ result->key = key();
+ result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
+ }
+ return is_valid;
}
void LevelIterator::Prev() {
SkipEmptyFileBackward();
}
-void LevelIterator::SkipEmptyFileForward() {
+bool LevelIterator::SkipEmptyFileForward() {
+ bool seen_empty_file = false;
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && file_iter_.status().ok() &&
!file_iter_.iter()->IsOutOfBound())) {
+ seen_empty_file = true;
// Move to next file
if (file_index_ >= flevel_->num_files - 1) {
// Already at the last file
SetFileIterator(nullptr);
- return;
+ break;
}
if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
SetFileIterator(nullptr);
- return;
+ break;
}
InitFileIterator(file_index_ + 1);
if (file_iter_.iter() != nullptr) {
file_iter_.SeekToFirst();
}
}
+ return seen_empty_file;
}
void LevelIterator::SkipEmptyFileBackward() {
public:
explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(
- cfd->current()->version_set()->env_options(), cfd->table_cache(),
+ cfd->current()->version_set()->file_options(), cfd->table_cache(),
cfd->current()->storage_info(), cfd->ioptions()->info_log)),
version_(cfd->current()) {
version_->Ref();
auto table_cache = cfd_->table_cache();
auto ioptions = cfd_->ioptions();
Status s = table_cache->GetTableProperties(
- env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
+ file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
mutable_cf_options_.prefix_extractor.get(), true /* no io */);
if (s.ok()) {
return s;
// 2. Table is not present in table cache, we'll read the table properties
// directly from the properties block in the file.
- std::unique_ptr<RandomAccessFile> file;
+ std::unique_ptr<FSRandomAccessFile> file;
std::string file_name;
if (fname != nullptr) {
file_name = *fname;
TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
}
- s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
+ s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
+ nullptr);
if (!s.ok()) {
return s;
}
new RandomAccessFileReader(
std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
0 /* hist_type */, nullptr /* file_read_hist */,
- nullptr /* rate_limiter */, false /* for_compaction*/,
- ioptions->listeners));
+ nullptr /* rate_limiter */, ioptions->listeners));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
return Status::OK();
}
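+// Appends a human-readable summary of the range tombstones in this version's
+// SST files to *out_str, stopping after max_entries_to_print entries.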
+Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
+ std::string* out_str) {
+ if (max_entries_to_print <= 0) {
+ return Status::OK();
+ }
+ int num_entries_left = max_entries_to_print;
+
+ std::stringstream ss;
+
+ for (int level = 0; level < storage_info_.num_levels_; level++) {
+ for (const auto& file_meta : storage_info_.files_[level]) {
+ auto fname =
+ TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
+ file_meta->fd.GetPathId());
+
+ ss << "=== file : " << fname << " ===\n";
+
+ TableCache* table_cache = cfd_->table_cache();
+ std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
+
+ Status s = table_cache->GetRangeTombstoneIterator(
+ ReadOptions(), cfd_->internal_comparator(), *file_meta,
+ &tombstone_iter);
+ if (!s.ok()) {
+ return s;
+ }
+ if (tombstone_iter) {
+ tombstone_iter->SeekToFirst();
+
+ while (tombstone_iter->Valid() && num_entries_left > 0) {
+ ss << "start: " << tombstone_iter->start_key().ToString(true)
+ << " end: " << tombstone_iter->end_key().ToString(true)
+ << " seq: " << tombstone_iter->seq() << '\n';
+ tombstone_iter->Next();
+ num_entries_left--;
+ }
+ if (num_entries_left <= 0) {
+ break;
+ }
+ }
+ }
+ if (num_entries_left <= 0) {
+ break;
+ }
+ }
+ assert(num_entries_left >= 0);
+ if (num_entries_left <= 0) {
+ ss << "(results may not be complete)\n";
+ }
+
+ *out_str = ss.str();
+ return Status::OK();
+}
+
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
int level) {
for (const auto& file_meta : storage_info_.files_[level]) {
for (auto& file_level : storage_info_.level_files_brief_) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
- env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
+ file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
mutable_cf_options_.prefix_extractor.get());
}
}
assert(!ioptions->cf_paths.empty());
file_path = ioptions->cf_paths.back().path;
}
+ const uint64_t file_number = file->fd.GetNumber();
files.emplace_back(SstFileMetaData{
- MakeTableFileName("", file->fd.GetNumber()),
- file_path,
- static_cast<size_t>(file->fd.GetFileSize()),
- file->fd.smallest_seqno,
- file->fd.largest_seqno,
- file->smallest.user_key().ToString(),
+ MakeTableFileName("", file_number), file_number, file_path,
+ static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
+ file->fd.largest_seqno, file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
- file->being_compacted});
+ file->being_compacted, file->oldest_blob_file_number,
+ file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
+ file->file_checksum, file->file_checksum_func_name});
files.back().num_entries = file->num_entries;
files.back().num_deletions = file->num_deletions;
level_size += file->fd.GetFileSize();
return sst_files_size;
}
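+// Stores the minimum file creation time across all files in this version in
+// *creation_time, or 0 if any file's creation time is unknown.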
+void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
+ uint64_t oldest_time = port::kMaxUint64;
+ for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
+ for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
+ assert(meta->fd.table_reader != nullptr);
+ uint64_t file_creation_time = meta->TryGetFileCreationTime();
+ if (file_creation_time == kUnknownFileCreationTime) {
+ *creation_time = 0;
+ return;
+ }
+ if (file_creation_time < oldest_time) {
+ oldest_time = file_creation_time;
+ }
+ }
+ }
+ *creation_time = oldest_time;
+}
+
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
// Estimation will be inaccurate when:
// (1) there exist merge keys
}
void Version::AddIterators(const ReadOptions& read_options,
- const EnvOptions& soptions,
+ const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
RangeDelAggregator* range_del_agg) {
assert(storage_info_.finalized_);
}
void Version::AddIteratorsForLevel(const ReadOptions& read_options,
- const EnvOptions& soptions,
+ const FileOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
int level,
RangeDelAggregator* range_del_agg) {
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
- read_options, soptions, cfd_->internal_comparator(), *file.file_metadata,
- range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
- cfd_->internal_stats()->GetFileReadHist(0), false, arena,
- false /* skip_filters */, 0 /* level */));
+ read_options, soptions, cfd_->internal_comparator(),
+ *file.file_metadata, range_del_agg,
+ mutable_cf_options_.prefix_extractor.get(), nullptr,
+ cfd_->internal_stats()->GetFileReadHist(0),
+ TableReaderCaller::kUserIterator, arena,
+ /*skip_filters=*/false, /*level=*/0,
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr));
}
if (should_sample) {
// Count ones for every L0 files. This is done per iterator creation
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
- false /* for_compaction */, IsFilterSkipped(level), level,
- range_del_agg));
+ TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
+        range_del_agg, /*compaction_boundaries=*/nullptr));
}
}
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
- const EnvOptions& env_options,
+ const FileOptions& file_options,
const Slice& smallest_user_key,
const Slice& largest_user_key,
int level, bool* overlap) {
continue;
}
ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
- read_options, env_options, cfd_->internal_comparator(), *file->file_metadata,
- &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
- cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
- false /* skip_filters */, 0 /* level */));
+ read_options, file_options, cfd_->internal_comparator(),
+ *file->file_metadata, &range_del_agg,
+ mutable_cf_options_.prefix_extractor.get(), nullptr,
+ cfd_->internal_stats()->GetFileReadHist(0),
+ TableReaderCaller::kUserIterator, &arena,
+ /*skip_filters=*/false, /*level=*/0,
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr));
status = OverlapWithIterator(
ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
if (!status.ok() || *overlap) {
} else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
auto mem = arena.AllocateAligned(sizeof(LevelIterator));
ScopedArenaIterator iter(new (mem) LevelIterator(
- cfd_->table_cache(), read_options, env_options,
+ cfd_->table_cache(), read_options, file_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
- false /* for_compaction */, IsFilterSkipped(level), level,
+ TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
&range_del_agg));
status = OverlapWithIterator(
ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
}
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
- const EnvOptions& env_opt,
+ const FileOptions& file_opt,
const MutableCFOptions mutable_cf_options,
uint64_t version_number)
: env_(vset->env_),
next_(this),
prev_(this),
refs_(0),
- env_options_(env_opt),
+ file_options_(file_opt),
mutable_cf_options_(mutable_cf_options),
version_number_(version_number) {}
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
- bool* is_blob) {
+ bool* is_blob, bool do_merge) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
}
PinnedIteratorsManager pinned_iters_mgr;
+ uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
+ if (vset_ && vset_->block_cache_tracer_ &&
+ vset_->block_cache_tracer_->is_tracing_enabled()) {
+ tracing_get_id = vset_->block_cache_tracer_->NextGetId();
+ }
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
- value, value_found, merge_context, max_covering_tombstone_seq, this->env_,
- seq, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
+ do_merge ? value : nullptr, value_found, merge_context, do_merge,
+ max_covering_tombstone_seq, this->env_, seq,
+ merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
+ tracing_get_id);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
- PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel());
+ PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+ fp.GetHitFileLevel());
return;
case GetContext::kDeleted:
// Use empty error message for speed
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
- "rocksdb::blob_db::BlobDB instead.");
+ "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
return;
}
f = fp.GetNextFile();
}
-
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
if (GetContext::kMerge == get_context.State()) {
+ if (!do_merge) {
+ *status = Status::OK();
+ return;
+ }
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
}
}
-bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
- // Reaching the bottom level implies misses at all upper levels, so we'll
- // skip checking the filters when we predict a hit.
- return cfd_->ioptions()->optimize_filters_for_hits &&
- (level > 0 || is_file_last_in_level) &&
- level == storage_info_.num_non_empty_levels() - 1;
-}
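+// Lookup a batch of keys in SST files level by level, starting from L0.
+// For each file that may contain keys from the batch (as determined by
+// FilePickerMultiGet), the keys' statuses and values in *range are filled
+// in as they are resolved.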
+void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
+ ReadCallback* callback, bool* is_blob) {
+ PinnedIteratorsManager pinned_iters_mgr;
-void VersionStorageInfo::GenerateLevelFilesBrief() {
- level_files_brief_.resize(num_non_empty_levels_);
- for (int level = 0; level < num_non_empty_levels_; level++) {
- DoGenerateLevelFilesBrief(
- &level_files_brief_[level], files_[level], &arena_);
+ // Pin blocks that we read to hold merge operands
+ if (merge_operator_) {
+ pinned_iters_mgr.StartPinning();
}
-}
-
-void Version::PrepareApply(
- const MutableCFOptions& mutable_cf_options,
- bool update_stats) {
- UpdateAccumulatedStats(update_stats);
- storage_info_.UpdateNumNonEmptyLevels();
- storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
- storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
- storage_info_.GenerateFileIndexer();
- storage_info_.GenerateLevelFilesBrief();
- storage_info_.GenerateLevel0NonOverlapping();
- storage_info_.GenerateBottommostFiles();
-}
+ uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
+
+ if (vset_ && vset_->block_cache_tracer_ &&
+ vset_->block_cache_tracer_->is_tracing_enabled()) {
+ tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
+ }
+ // Even though we know the batch size won't be > MAX_BATCH_SIZE,
+ // use autovector in order to avoid unnecessary construction of GetContext
+ // objects, which is expensive
+ autovector<GetContext, 16> get_ctx;
+ for (auto iter = range->begin(); iter != range->end(); ++iter) {
+ assert(iter->s->ok() || iter->s->IsMergeInProgress());
+ get_ctx.emplace_back(
+ user_comparator(), merge_operator_, info_log_, db_statistics_,
+ iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
+ iter->value, nullptr, &(iter->merge_context), true,
+ &iter->max_covering_tombstone_seq, this->env_, nullptr,
+ merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
+ tracing_mget_id);
+ // MergeInProgress status, if set, has been transferred to the get_context
+ // state, so we set status to ok here. From now on, the iter status will
+ // be used for IO errors, and get_context state will be used for any
+ // key level errors
+ *(iter->s) = Status::OK();
+ }
+ int get_ctx_index = 0;
+ for (auto iter = range->begin(); iter != range->end();
+ ++iter, get_ctx_index++) {
+ iter->get_context = &(get_ctx[get_ctx_index]);
+ }
+
+ MultiGetRange file_picker_range(*range, range->begin(), range->end());
+ FilePickerMultiGet fp(
+ &file_picker_range,
+ &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
+ &storage_info_.file_indexer_, user_comparator(), internal_comparator());
+ FdWithKeyRange* f = fp.GetNextFile();
-bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
- if (file_meta->init_stats_from_file ||
- file_meta->compensated_file_size > 0) {
- return false;
- }
- std::shared_ptr<const TableProperties> tp;
- Status s = GetTableProperties(&tp, file_meta);
- file_meta->init_stats_from_file = true;
- if (!s.ok()) {
- ROCKS_LOG_ERROR(vset_->db_options_->info_log,
- "Unable to load table properties for file %" PRIu64
- " --- %s\n",
- file_meta->fd.GetNumber(), s.ToString().c_str());
- return false;
- }
- if (tp.get() == nullptr) return false;
- file_meta->num_entries = tp->num_entries;
- file_meta->num_deletions = tp->num_deletions;
+ while (f != nullptr) {
+ MultiGetRange file_range = fp.CurrentFileRange();
+ bool timer_enabled =
+ GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
+ get_perf_context()->per_level_perf_context_enabled;
+ StopWatchNano timer(env_, timer_enabled /* auto_start */);
+ Status s = table_cache_->MultiGet(
+ read_options, *internal_comparator(), *f->file_metadata, &file_range,
+ mutable_cf_options_.prefix_extractor.get(),
+ cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+ IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+ fp.IsHitFileLastInLevel()),
+ fp.GetCurrentLevel());
+ // TODO: examine the behavior for corrupted key
+ if (timer_enabled) {
+ PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
+ fp.GetCurrentLevel());
+ }
+ if (!s.ok()) {
+ // TODO: Set status for individual keys appropriately
+ for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+ *iter->s = s;
+ file_range.MarkKeyDone(iter);
+ }
+ return;
+ }
+ uint64_t batch_size = 0;
+ for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+ GetContext& get_context = *iter->get_context;
+ Status* status = iter->s;
+ // The Status in the KeyContext takes precedence over GetContext state
+ // Status may be an error if there were any IO errors in the table
+ // reader. We never expect Status to be NotFound(), as that is
+ // determined by get_context
+ assert(!status->IsNotFound());
+ if (!status->ok()) {
+ file_range.MarkKeyDone(iter);
+ continue;
+ }
+
+ if (get_context.sample()) {
+ sample_file_read_inc(f->file_metadata);
+ }
+ batch_size++;
+ // report the counters before returning
+ if (get_context.State() != GetContext::kNotFound &&
+ get_context.State() != GetContext::kMerge &&
+ db_statistics_ != nullptr) {
+ get_context.ReportCounters();
+ } else {
+ if (iter->max_covering_tombstone_seq > 0) {
+ // The remaining files we look at will only contain covered keys, so
+ // we stop here for this key
+ file_picker_range.SkipKey(iter);
+ }
+ }
+ switch (get_context.State()) {
+ case GetContext::kNotFound:
+ // Keep searching in other files
+ break;
+ case GetContext::kMerge:
+ // TODO: update per-level perfcontext user_key_return_count for kMerge
+ break;
+ case GetContext::kFound:
+ if (fp.GetHitFileLevel() == 0) {
+ RecordTick(db_statistics_, GET_HIT_L0);
+ } else if (fp.GetHitFileLevel() == 1) {
+ RecordTick(db_statistics_, GET_HIT_L1);
+ } else if (fp.GetHitFileLevel() >= 2) {
+ RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+ }
+ PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+ fp.GetHitFileLevel());
+ file_range.MarkKeyDone(iter);
+ continue;
+ case GetContext::kDeleted:
+ // Use empty error message for speed
+ *status = Status::NotFound();
+ file_range.MarkKeyDone(iter);
+ continue;
+ case GetContext::kCorrupt:
+ *status =
+ Status::Corruption("corrupted key for ", iter->lkey->user_key());
+ file_range.MarkKeyDone(iter);
+ continue;
+ case GetContext::kBlobIndex:
+ ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+ *status = Status::NotSupported(
+ "Encounter unexpected blob index. Please open DB with "
+ "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
+ file_range.MarkKeyDone(iter);
+ continue;
+ }
+ }
+ RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
+ if (file_picker_range.empty()) {
+ break;
+ }
+ f = fp.GetNextFile();
+ }
+
+ // Process any left over keys
+ for (auto iter = range->begin(); iter != range->end(); ++iter) {
+ GetContext& get_context = *iter->get_context;
+ Status* status = iter->s;
+ Slice user_key = iter->lkey->user_key();
+
+ if (db_statistics_ != nullptr) {
+ get_context.ReportCounters();
+ }
+ if (GetContext::kMerge == get_context.State()) {
+ if (!merge_operator_) {
+ *status = Status::InvalidArgument(
+ "merge_operator is not properly initialized.");
+ range->MarkKeyDone(iter);
+ continue;
+ }
+      // merge_operands are in saver and we hit the beginning of the key
+      // history; do a final merge of nullptr and operands
+ std::string* str_value =
+ iter->value != nullptr ? iter->value->GetSelf() : nullptr;
+ *status = MergeHelper::TimedFullMerge(
+ merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
+ str_value, info_log_, db_statistics_, env_,
+ nullptr /* result_operand */, true);
+ if (LIKELY(iter->value != nullptr)) {
+ iter->value->PinSelf();
+ }
+ } else {
+ range->MarkKeyDone(iter);
+ *status = Status::NotFound(); // Use an empty error message for speed
+ }
+ }
+}
+
+bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
+ // Reaching the bottom level implies misses at all upper levels, so we'll
+ // skip checking the filters when we predict a hit.
+ return cfd_->ioptions()->optimize_filters_for_hits &&
+ (level > 0 || is_file_last_in_level) &&
+ level == storage_info_.num_non_empty_levels() - 1;
+}
+
+void VersionStorageInfo::GenerateLevelFilesBrief() {
+ level_files_brief_.resize(num_non_empty_levels_);
+ for (int level = 0; level < num_non_empty_levels_; level++) {
+ DoGenerateLevelFilesBrief(
+ &level_files_brief_[level], files_[level], &arena_);
+ }
+}
+
+void Version::PrepareApply(
+ const MutableCFOptions& mutable_cf_options,
+ bool update_stats) {
+ UpdateAccumulatedStats(update_stats);
+ storage_info_.UpdateNumNonEmptyLevels();
+ storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
+ storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
+ storage_info_.GenerateFileIndexer();
+ storage_info_.GenerateLevelFilesBrief();
+ storage_info_.GenerateLevel0NonOverlapping();
+ storage_info_.GenerateBottommostFiles();
+}
+
+bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
+ if (file_meta->init_stats_from_file ||
+ file_meta->compensated_file_size > 0) {
+ return false;
+ }
+ std::shared_ptr<const TableProperties> tp;
+ Status s = GetTableProperties(&tp, file_meta);
+ file_meta->init_stats_from_file = true;
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(vset_->db_options_->info_log,
+ "Unable to load table properties for file %" PRIu64
+ " --- %s\n",
+ file_meta->fd.GetNumber(), s.ToString().c_str());
+ return false;
+ }
+ if (tp.get() == nullptr) return false;
+ file_meta->num_entries = tp->num_entries;
+ file_meta->num_deletions = tp->num_deletions;
file_meta->raw_value_size = tp->raw_value_size;
file_meta->raw_key_size = tp->raw_key_size;
}
void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
+ TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
+ nullptr);
+
assert(file_meta->init_stats_from_file);
accumulated_file_size_ += file_meta->fd.GetFileSize();
accumulated_raw_key_size_ += file_meta->raw_key_size;
auto status = ioptions.env->GetCurrentTime(&_current_time);
if (status.ok()) {
const uint64_t current_time = static_cast<uint64_t>(_current_time);
- for (auto f : files) {
- if (!f->being_compacted && f->fd.table_reader != nullptr &&
- f->fd.table_reader->GetTableProperties() != nullptr) {
- auto creation_time =
- f->fd.table_reader->GetTableProperties()->creation_time;
- if (creation_time > 0 &&
- creation_time < (current_time - mutable_cf_options.ttl)) {
+ for (FileMetaData* f : files) {
+ if (!f->being_compacted) {
+ uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
+ if (oldest_ancester_time != 0 &&
+ oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
ttl_expired_files_count++;
}
}
if (mutable_cf_options.ttl > 0) {
ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
}
+ if (mutable_cf_options.periodic_compaction_seconds > 0) {
+ ComputeFilesMarkedForPeriodicCompaction(
+ immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
+ }
EstimateCompactionBytesNeeded(mutable_cf_options);
}
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (int level = 0; level < num_levels() - 1; level++) {
- for (auto f : files_[level]) {
- if (!f->being_compacted && f->fd.table_reader != nullptr &&
- f->fd.table_reader->GetTableProperties() != nullptr) {
- auto creation_time =
- f->fd.table_reader->GetTableProperties()->creation_time;
- if (creation_time > 0 && creation_time < (current_time - ttl)) {
+ for (FileMetaData* f : files_[level]) {
+ if (!f->being_compacted) {
+ uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
+ if (oldest_ancester_time > 0 &&
+ oldest_ancester_time < (current_time - ttl)) {
expired_ttl_files_.emplace_back(level, f);
}
}
}
}
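+// Marks files whose age exceeds periodic_compaction_seconds so they can be
+// picked up for periodic compaction. A file's age is derived from its table
+// properties, falling back to the file's modification time on disk.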
+void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
+ const ImmutableCFOptions& ioptions,
+ const uint64_t periodic_compaction_seconds) {
+ assert(periodic_compaction_seconds > 0);
+
+ files_marked_for_periodic_compaction_.clear();
+
+ int64_t temp_current_time;
+ auto status = ioptions.env->GetCurrentTime(&temp_current_time);
+ if (!status.ok()) {
+ return;
+ }
+ const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
+
+ // If periodic_compaction_seconds is larger than current time, periodic
+ // compaction can't possibly be triggered.
+ if (periodic_compaction_seconds > current_time) {
+ return;
+ }
+
+ const uint64_t allowed_time_limit =
+ current_time - periodic_compaction_seconds;
+
+ for (int level = 0; level < num_levels(); level++) {
+ for (auto f : files_[level]) {
+ if (!f->being_compacted) {
+ // Compute a file's modification time in the following order:
+ // 1. Use file_creation_time table property if it is > 0.
+ // 2. Use creation_time table property if it is > 0.
+ // 3. Use file's mtime metadata if the above two table properties are 0.
+ // Don't consider the file at all if the modification time cannot be
+ // correctly determined based on the above conditions.
+ uint64_t file_modification_time = f->TryGetFileCreationTime();
+ if (file_modification_time == kUnknownFileCreationTime) {
+ file_modification_time = f->TryGetOldestAncesterTime();
+ }
+ if (file_modification_time == kUnknownOldestAncesterTime) {
+ auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
+ f->fd.GetPathId());
+ status = ioptions.env->GetFileModificationTime(
+ file_path, &file_modification_time);
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(ioptions.info_log,
+ "Can't get file modification time: %s: %s",
+ file_path.c_str(), status.ToString().c_str());
+ continue;
+ }
+ }
+ if (file_modification_time > 0 &&
+ file_modification_time < allowed_time_limit) {
+ files_marked_for_periodic_compaction_.emplace_back(level, f);
+ }
+ }
+ }
+ }
+}
+
namespace {
// used to sort files by size
FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
const Slice file_start = ExtractUserKey(f->smallest_key);
const Slice file_limit = ExtractUserKey(f->largest_key);
- if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
+ if (begin != nullptr &&
+ user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
iter++;
} else if (end != nullptr &&
- user_cmp->Compare(file_start, user_end) > 0) {
+ user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
iter++;
} else {
iter = index.erase(iter);
if (expand_range) {
if (begin != nullptr &&
- user_cmp->Compare(file_start, user_begin) < 0) {
+ user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
user_begin = file_start;
}
- if (end != nullptr && user_cmp->Compare(file_limit, user_end) > 0) {
+ if (end != nullptr &&
+ user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
user_end = file_limit;
}
}
return;
}
- const auto& level_files = level_files_brief_[level];
- if (begin == nullptr) {
- begin = &level_files.files[0].file_metadata->smallest;
- }
- if (end == nullptr) {
- end = &level_files.files[level_files.num_files - 1].file_metadata->largest;
- }
-
GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
hint_index, file_index,
true /* within_interval */);
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
bool within_interval, InternalKey** next_smallest) const {
assert(level > 0);
- int min = 0;
- int mid = 0;
- int max = static_cast<int>(files_[level].size()) - 1;
- bool foundOverlap = false;
+
auto user_cmp = user_comparator_;
+ const FdWithKeyRange* files = level_files_brief_[level].files;
+ const int num_files = static_cast<int>(level_files_brief_[level].num_files);
- // if the caller already knows the index of a file that has overlap,
- // then we can skip the binary search.
- if (hint_index != -1) {
- mid = hint_index;
- foundOverlap = true;
- }
-
- while (!foundOverlap && min <= max) {
- mid = (min + max)/2;
- FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
- auto& smallest = f->file_metadata->smallest;
- auto& largest = f->file_metadata->largest;
- if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
- (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
- min = mid + 1;
- } else if ((!within_interval &&
- sstableKeyCompare(user_cmp, smallest, end) > 0) ||
- (within_interval &&
- sstableKeyCompare(user_cmp, largest, end) > 0)) {
- max = mid - 1;
- } else {
- foundOverlap = true;
- break;
+  // Use binary search to find the lower bound and the upper bound of the
+  // range of overlapping files.
+ int start_index = 0;
+ int end_index = num_files;
+
+ if (begin != nullptr) {
+    // If within_interval is true, compare against each file's smallest key,
+    // so std::lower_bound finds files contained in the interval rather than
+    // merely overlapping it.
+ auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
+ const InternalKey* k) {
+ auto& file_key = within_interval ? f.file_metadata->smallest
+ : f.file_metadata->largest;
+ return sstableKeyCompare(user_cmp, file_key, *k) < 0;
+ };
+
+ start_index = static_cast<int>(
+ std::lower_bound(files,
+ files + (hint_index == -1 ? num_files : hint_index),
+ begin, cmp) -
+ files);
+
+ if (start_index > 0 && within_interval) {
+ bool is_overlapping = true;
+ while (is_overlapping && start_index < num_files) {
+ auto& pre_limit = files[start_index - 1].file_metadata->largest;
+ auto& cur_start = files[start_index].file_metadata->smallest;
+ is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
+ start_index += is_overlapping;
+ }
}
}
+ if (end != nullptr) {
+    // If within_interval is true, compare against each file's largest key,
+    // so std::upper_bound finds files contained in the interval rather than
+    // merely overlapping it.
+ auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
+ const FdWithKeyRange& f) {
+ auto& file_key = within_interval ? f.file_metadata->largest
+ : f.file_metadata->smallest;
+ return sstableKeyCompare(user_cmp, *k, file_key) < 0;
+ };
+
+ end_index = static_cast<int>(
+ std::upper_bound(files + start_index, files + num_files, end, cmp) -
+ files);
+
+ if (end_index < num_files && within_interval) {
+ bool is_overlapping = true;
+ while (is_overlapping && end_index > start_index) {
+ auto& next_start = files[end_index].file_metadata->smallest;
+ auto& cur_limit = files[end_index - 1].file_metadata->largest;
+ is_overlapping =
+ sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
+ end_index -= is_overlapping;
+ }
+ }
+ }
+
+ assert(start_index <= end_index);
+
// If there were no overlapping files, return immediately.
- if (!foundOverlap) {
+ if (start_index == end_index) {
if (next_smallest) {
*next_smallest = nullptr;
}
return;
}
+
+ assert(start_index < end_index);
+
// returns the index where an overlap is found
if (file_index) {
- *file_index = mid;
+ *file_index = start_index;
}
- int start_index, end_index;
- if (within_interval) {
- ExtendFileRangeWithinInterval(level, begin, end, mid,
- &start_index, &end_index);
- } else {
- ExtendFileRangeOverlappingInterval(level, begin, end, mid,
- &start_index, &end_index);
- assert(end_index >= start_index);
- }
// insert overlapping files into vector
- for (int i = start_index; i <= end_index; i++) {
+ for (int i = start_index; i < end_index; i++) {
inputs->push_back(files_[level][i]);
}
if (next_smallest != nullptr) {
// Provide the next key outside the range covered by inputs
- if (++end_index < static_cast<int>(files_[level].size())) {
+ if (end_index < static_cast<int>(files_[level].size())) {
**next_smallest = files_[level][end_index]->smallest;
} else {
*next_smallest = nullptr;
}
}
-// Store in *start_index and *end_index the range of all files in
-// "level" that overlap [begin,end]
-// The mid_index specifies the index of at least one file that
-// overlaps the specified range. From that file, iterate backward
-// and forward to find all overlapping files.
-// Use FileLevel in searching, make it faster
-void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
- int level, const InternalKey* begin, const InternalKey* end,
- unsigned int mid_index, int* start_index, int* end_index) const {
- auto user_cmp = user_comparator_;
- const FdWithKeyRange* files = level_files_brief_[level].files;
-#ifndef NDEBUG
- {
- // assert that the file at mid_index overlaps with the range
- assert(mid_index < level_files_brief_[level].num_files);
- const FdWithKeyRange* f = &files[mid_index];
- auto& smallest = f->file_metadata->smallest;
- auto& largest = f->file_metadata->largest;
- if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
- assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
- } else {
- // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
- // begin ? begin->DebugString().c_str() : "(null)",
- // end ? end->DebugString().c_str() : "(null)",
- // smallest->DebugString().c_str(),
- // largest->DebugString().c_str(),
- // sstableKeyCompare(user_cmp, smallest, begin),
- // sstableKeyCompare(user_cmp, largest, begin));
- assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
- }
- }
-#endif
- *start_index = mid_index + 1;
- *end_index = mid_index;
- int count __attribute__((__unused__));
- count = 0;
-
- // check backwards from 'mid' to lower indices
- for (int i = mid_index; i >= 0 ; i--) {
- const FdWithKeyRange* f = &files[i];
- auto& largest = f->file_metadata->largest;
- if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
- *start_index = i;
- assert((count++, true));
- } else {
- break;
- }
- }
- // check forward from 'mid+1' to higher indices
- for (unsigned int i = mid_index+1;
- i < level_files_brief_[level].num_files; i++) {
- const FdWithKeyRange* f = &files[i];
- auto& smallest = f->file_metadata->smallest;
- if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
- assert((count++, true));
- *end_index = i;
- } else {
- break;
- }
- }
- assert(count == *end_index - *start_index + 1);
-}
-
-// Store in *start_index and *end_index the clean range of all files in
-// "level" within [begin,end]
-// The mid_index specifies the index of at least one file within
-// the specified range. From that file, iterate backward
-// and forward to find all overlapping files and then "shrink" to
-// the clean range required.
-// Use FileLevel in searching, make it faster
-void VersionStorageInfo::ExtendFileRangeWithinInterval(
- int level, const InternalKey* begin, const InternalKey* end,
- unsigned int mid_index, int* start_index, int* end_index) const {
- assert(level != 0);
- auto* user_cmp = user_comparator_;
- const FdWithKeyRange* files = level_files_brief_[level].files;
-#ifndef NDEBUG
- {
- // assert that the file at mid_index is within the range
- assert(mid_index < level_files_brief_[level].num_files);
- const FdWithKeyRange* f = &files[mid_index];
- auto& smallest = f->file_metadata->smallest;
- auto& largest = f->file_metadata->largest;
- assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
- sstableKeyCompare(user_cmp, largest, end) <= 0);
- }
-#endif
- ExtendFileRangeOverlappingInterval(level, begin, end, mid_index,
- start_index, end_index);
- int left = *start_index;
- int right = *end_index;
- // shrink from left to right
- while (left <= right) {
- auto& smallest = files[left].file_metadata->smallest;
- if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
- left++;
- continue;
- }
- if (left > 0) { // If not first file
- auto& largest = files[left - 1].file_metadata->largest;
- if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
- left++;
- continue;
- }
- }
- break;
- }
- // shrink from right to left
- while (left <= right) {
- auto& largest = files[right].file_metadata->largest;
- if (sstableKeyCompare(user_cmp, largest, end) > 0) {
- right--;
- continue;
- }
- if (right < static_cast<int>(level_files_brief_[level].num_files) -
- 1) { // If not the last file
- auto& smallest = files[right + 1].file_metadata->smallest;
- if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
- // The last user key in range overlaps with the next file's first key
- right--;
- continue;
- }
- }
- break;
- }
-
- *start_index = left;
- *end_index = right;
-}
-
uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < num_levels());
// base_bytes_min. We set it be base_bytes_min.
base_level_size = base_bytes_min + 1U;
base_level_ = first_non_empty_level;
- ROCKS_LOG_WARN(ioptions.info_log,
+ ROCKS_LOG_INFO(ioptions.info_log,
"More existing levels in DB than needed. "
"max_bytes_for_level_multiplier may not be guaranteed.");
} else {
for (int level = 0; level < storage_info_.num_levels_; level++) {
// E.g.,
// --- level 1 ---
- // 17:123['a' .. 'd']
- // 20:43['e' .. 'g']
+ // 17:123[1 .. 124]['a' .. 'd']
+ // 20:43[124 .. 128]['e' .. 'g']
//
// if print_stats=true:
- // 17:123['a' .. 'd'](4096)
+ // 17:123[1 .. 124]['a' .. 'd'](4096)
r.append("--- level ");
AppendNumberTo(&r, level);
r.append(" --- version# ");
r.push_back(':');
AppendNumberTo(&r, files[i]->fd.GetFileSize());
r.append("[");
+ AppendNumberTo(&r, files[i]->fd.smallest_seqno);
+ r.append(" .. ");
+ AppendNumberTo(&r, files[i]->fd.largest_seqno);
+ r.append("]");
+ r.append("[");
r.append(files[i]->smallest.DebugString(hex));
r.append(" .. ");
r.append(files[i]->largest.DebugString(hex));
r.append("]");
+ if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
+ r.append(" blob_file:");
+ AppendNumberTo(&r, files[i]->oldest_blob_file_number);
+ }
if (print_stats) {
r.append("(");
r.append(ToString(
edit_list(e) {}
};
+Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
+ assert(edit);
+ if (edit->is_in_atomic_group_) {
+ TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
+ if (replay_buffer_.empty()) {
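+      // The first edit of an atomic group records how many edits follow it,
+      // so the group has remaining_entries_ + 1 edits in total.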
+ replay_buffer_.resize(edit->remaining_entries_ + 1);
+ TEST_SYNC_POINT_CALLBACK(
+ "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
+ }
+ read_edits_in_atomic_group_++;
+ if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
+ static_cast<uint32_t>(replay_buffer_.size())) {
+ TEST_SYNC_POINT_CALLBACK(
+ "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
+ return Status::Corruption("corrupted atomic group");
+ }
+ replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
+ if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
+ TEST_SYNC_POINT_CALLBACK(
+ "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
+ return Status::OK();
+ }
+ return Status::OK();
+ }
+
+ // A normal edit.
+ if (!replay_buffer().empty()) {
+ TEST_SYNC_POINT_CALLBACK(
+ "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
+ return Status::Corruption("corrupted atomic group");
+ }
+ return Status::OK();
+}
+
+bool AtomicGroupReadBuffer::IsFull() const {
+ return read_edits_in_atomic_group_ == replay_buffer_.size();
+}
+
+bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
+
+void AtomicGroupReadBuffer::Clear() {
+ read_edits_in_atomic_group_ = 0;
+ replay_buffer_.clear();
+}
+
VersionSet::VersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
- const EnvOptions& storage_options, Cache* table_cache,
+ const FileOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
- WriteController* write_controller)
- : column_family_set_(
- new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
- write_buffer_manager, write_controller)),
+ WriteController* write_controller,
+ BlockCacheTracer* const block_cache_tracer)
+ : column_family_set_(new ColumnFamilySet(
+ dbname, _db_options, storage_options, table_cache,
+ write_buffer_manager, write_controller, block_cache_tracer)),
env_(_db_options->env),
+ fs_(_db_options->fs.get()),
dbname_(dbname),
db_options_(_db_options),
next_file_number_(2),
prev_log_number_(0),
current_version_number_(0),
manifest_file_size_(0),
- env_options_(storage_options) {}
-
-void CloseTables(void* ptr, size_t) {
- TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
- table_reader->Close();
-}
+ file_options_(storage_options),
+ block_cache_tracer_(block_cache_tracer) {}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
Cache* table_cache = column_family_set_->get_table_cache();
- table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
column_family_set_.reset();
for (auto& file : obsolete_files_) {
if (file.metadata->table_reader_handle) {
}
}
if (version == nullptr) {
- version = new Version(last_writer->cfd, this, env_options_,
+ version = new Version(last_writer->cfd, this, file_options_,
last_writer->mutable_cf_options,
current_version_number_++);
versions.push_back(version);
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
- LogAndApplyHelper(last_writer->cfd, builder, e, mu);
+ Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
+ if (!s.ok()) {
+ // free up the allocated memory
+ for (auto v : versions) {
+ delete v;
+ }
+ return s;
+ }
batch_edits.push_back(e);
}
}
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
auto* builder = builder_guards[i]->version_builder();
- builder->SaveTo(versions[i]->storage_info());
+ Status s = builder->SaveTo(versions[i]->storage_info());
+ if (!s.ok()) {
+ // free up the allocated memory
+ for (auto v : versions) {
+ delete v;
+ }
+ return s;
+ }
}
}
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
- pending_manifest_file_number_ = NewFileNumber();
- batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
} else {
pending_manifest_file_number_ = manifest_file_number_;
}
+ // Local cached copy of state variable(s). WriteCurrentStateToManifest()
+ // reads its content after releasing db mutex to avoid race with
+ // SwitchMemtable().
+ std::unordered_map<uint32_t, MutableCFState> curr_state;
if (new_descriptor_log) {
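+    // Allocate a file number for the new MANIFEST and record the bumped
+    // next-file counter in the last edit of this batch.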
+ pending_manifest_file_number_ = NewFileNumber();
+ batch_edits.back()->SetNextFile(next_file_number_.load());
+
// if we are writing out new snapshot make sure to persist max column
// family.
if (column_family_set_->GetMaxColumnFamily() > 0) {
first_writer.edit_list.front()->SetMaxColumnFamily(
column_family_set_->GetMaxColumnFamily());
}
+ for (const auto* cfd : *column_family_set_) {
+ assert(curr_state.find(cfd->GetID()) == curr_state.end());
+ curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
+ }
}
{
- EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
+ FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
assert(!mutable_cf_options_ptrs.empty() &&
builder_guards.size() == versions.size());
ColumnFamilyData* cfd = versions[i]->cfd_;
- builder_guards[i]->version_builder()->LoadTableHandlers(
+ s = builder_guards[i]->version_builder()->LoadTableHandlers(
cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
true /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
mutable_cf_options_ptrs[i]->prefix_extractor.get());
+ if (!s.ok()) {
+ if (db_options_->paranoid_checks) {
+ break;
+ }
+ s = Status::OK();
+ }
}
}
- // This is fine because everything inside of this block is serialized --
- // only one thread can be here at the same time
- if (new_descriptor_log) {
+ if (s.ok() && new_descriptor_log) {
+ // This is fine because everything inside of this block is serialized --
+ // only one thread can be here at the same time
// create new manifest file
ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
pending_manifest_file_number_);
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
- std::unique_ptr<WritableFile> descriptor_file;
- s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
- opt_env_opts);
+ std::unique_ptr<FSWritableFile> descriptor_file;
+ s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
+ opt_file_opts);
if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(descriptor_file), descriptor_fname, opt_env_opts, env_,
+ std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
- s = WriteSnapshot(descriptor_log_.get());
+ s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
}
}
- if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
- for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
- versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
+ if (s.ok()) {
+ if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
+ for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+ versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
+ }
}
- }
- // Write new records to MANIFEST log
- if (s.ok()) {
+ // Write new records to MANIFEST log
#ifndef NDEBUG
size_t idx = 0;
#endif
rocksdb_kill_odds * REDUCE_ODDS2);
#ifndef NDEBUG
if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
- TEST_SYNC_POINT(
- "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0");
+ TEST_SYNC_POINT_CALLBACK(
+ "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
+ nullptr);
TEST_SYNC_POINT(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
}
} else if (first_writer.edit_list.front()->is_column_family_drop_) {
assert(batch_edits.size() == 1);
first_writer.cfd->SetDropped();
- if (first_writer.cfd->Unref()) {
- delete first_writer.cfd;
- }
+ first_writer.cfd->UnrefAndTryDelete();
} else {
// Each version in versions corresponds to a column family.
// For each column family, update its log number indicating that logs
for (auto v : versions) {
delete v;
}
+ // If manifest append failed for whatever reason, the file could be
+ // corrupted. So we need to force the next version update to start a
+ // new manifest file.
+ descriptor_log_.reset();
if (new_descriptor_log) {
ROCKS_LOG_INFO(db_options_->info_log,
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
"\n",
manifest_file_number_, pending_manifest_file_number_);
- descriptor_log_.reset();
env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_));
}
}
}
if (0 == num_undropped_cfds) {
- // TODO (yanqin) maybe use a different status code to denote column family
- // drop other than OK and ShutdownInProgress
for (int i = 0; i != num_cfds; ++i) {
manifest_writers_.pop_front();
}
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
- return Status::ShutdownInProgress();
+ return Status::ColumnFamilyDropped();
}
return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
}
}
-void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
- VersionBuilder* builder, VersionEdit* edit,
- InstrumentedMutex* mu) {
+Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
+ VersionBuilder* builder, VersionEdit* edit,
+ InstrumentedMutex* mu) {
#ifdef NDEBUG
(void)cfd;
#endif
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
- builder->Apply(edit);
+ Status s = builder->Apply(edit);
+
+ return s;
}
Status VersionSet::ApplyOneVersionEditToBuilder(
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
- bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
- uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
- bool* have_last_sequence, SequenceNumber* last_sequence,
- uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
+ VersionEditParams* version_edit_params) {
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
edit.column_family_name_);
}
auto cf_options = name_to_options.find(edit.column_family_name_);
- if (cf_options == name_to_options.end()) {
+    // Implicitly add the persistent_stats column family without requiring
+    // the user to specify it.
+ bool is_persistent_stats_column_family =
+ edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
+ if (cf_options == name_to_options.end() &&
+ !is_persistent_stats_column_family) {
column_families_not_found.insert(
{edit.column_family_, edit.column_family_name_});
} else {
- cfd = CreateColumnFamily(cf_options->second, &edit);
+ // recover persistent_stats CF from a DB that already contains it
+ if (is_persistent_stats_column_family) {
+ ColumnFamilyOptions cfo;
+ OptimizeForPersistentStats(&cfo);
+ cfd = CreateColumnFamily(cfo, &edit);
+ } else {
+ cfd = CreateColumnFamily(cf_options->second, &edit);
+ }
cfd->set_initialized();
builders.insert(std::make_pair(
edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
builders.erase(builder);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr);
- if (cfd->Unref()) {
- delete cfd;
+ if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
} else {
// who else can have reference to cfd!?
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
- builder->second->version_builder()->Apply(&edit);
+ Status s = builder->second->version_builder()->Apply(&edit);
+ if (!s.ok()) {
+ return s;
+ }
}
- return ExtractInfoFromVersionEdit(
- cfd, edit, have_log_number, log_number, have_prev_log_number,
- previous_log_number, have_next_file, next_file, have_last_sequence,
- last_sequence, min_log_number_to_keep, max_column_family);
+ return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
}
Status VersionSet::ExtractInfoFromVersionEdit(
- ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
- uint64_t* log_number, bool* have_prev_log_number,
- uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
- bool* have_last_sequence, SequenceNumber* last_sequence,
- uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
+ ColumnFamilyData* cfd, const VersionEdit& from_edit,
+ VersionEditParams* version_edit_params) {
if (cfd != nullptr) {
- if (edit.has_log_number_) {
- if (cfd->GetLogNumber() > edit.log_number_) {
+ if (from_edit.has_db_id_) {
+ version_edit_params->SetDBId(from_edit.db_id_);
+ }
+ if (from_edit.has_log_number_) {
+ if (cfd->GetLogNumber() > from_edit.log_number_) {
ROCKS_LOG_WARN(
db_options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else {
- cfd->SetLogNumber(edit.log_number_);
- *have_log_number = true;
- *log_number = edit.log_number_;
+ cfd->SetLogNumber(from_edit.log_number_);
+ version_edit_params->SetLogNumber(from_edit.log_number_);
}
}
- if (edit.has_comparator_ &&
- edit.comparator_ != cfd->user_comparator()->Name()) {
+ if (from_edit.has_comparator_ &&
+ from_edit.comparator_ != cfd->user_comparator()->Name()) {
return Status::InvalidArgument(
cfd->user_comparator()->Name(),
- "does not match existing comparator " + edit.comparator_);
+ "does not match existing comparator " + from_edit.comparator_);
}
}
- if (edit.has_prev_log_number_) {
- *previous_log_number = edit.prev_log_number_;
- *have_prev_log_number = true;
+ if (from_edit.has_prev_log_number_) {
+ version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
}
- if (edit.has_next_file_number_) {
- *next_file = edit.next_file_number_;
- *have_next_file = true;
+ if (from_edit.has_next_file_number_) {
+ version_edit_params->SetNextFile(from_edit.next_file_number_);
}
- if (edit.has_max_column_family_) {
- *max_column_family = edit.max_column_family_;
+ if (from_edit.has_max_column_family_) {
+ version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
}
- if (edit.has_min_log_number_to_keep_) {
- *min_log_number_to_keep =
- std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
+ if (from_edit.has_min_log_number_to_keep_) {
+ version_edit_params->min_log_number_to_keep_ =
+ std::max(version_edit_params->min_log_number_to_keep_,
+ from_edit.min_log_number_to_keep_);
}
- if (edit.has_last_sequence_) {
- *last_sequence = edit.last_sequence_;
- *have_last_sequence = true;
+ if (from_edit.has_last_sequence_) {
+ version_edit_params->SetLastSequence(from_edit.last_sequence_);
}
return Status::OK();
}
-Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) {
+Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
+ FileSystem* fs,
+ std::string* manifest_path,
+ uint64_t* manifest_file_number) {
+ assert(fs != nullptr);
assert(manifest_path != nullptr);
+ assert(manifest_file_number != nullptr);
+
std::string fname;
- Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname);
+ Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
if (!s.ok()) {
return s;
}
// remove the trailing '\n'
fname.resize(fname.size() - 1);
FileType type;
- bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type);
+ bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
if (!parse_ok || type != kDescriptorFile) {
return Status::Corruption("CURRENT file corrupted");
}
- *manifest_path = dbname_;
- if (dbname_.back() != '/') {
+ *manifest_path = dbname;
+ if (dbname.back() != '/') {
manifest_path->push_back('/');
}
*manifest_path += fname;
return Status::OK();
}
+Status VersionSet::ReadAndRecover(
+ log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
+ const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
+ std::unordered_map<int, std::string>& column_families_not_found,
+ std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
+ builders,
+ VersionEditParams* version_edit_params, std::string* db_id) {
+ assert(reader != nullptr);
+ assert(read_buffer != nullptr);
+ Status s;
+ Slice record;
+ std::string scratch;
+ size_t recovered_edits = 0;
+ while (reader->ReadRecord(&record, &scratch) && s.ok()) {
+ VersionEdit edit;
+ s = edit.DecodeFrom(record);
+ if (!s.ok()) {
+ break;
+ }
+ if (edit.has_db_id_) {
+ db_id_ = edit.GetDbId();
+ if (db_id != nullptr) {
+ db_id->assign(edit.GetDbId());
+ }
+ }
+ s = read_buffer->AddEdit(&edit);
+ if (!s.ok()) {
+ break;
+ }
+ if (edit.is_in_atomic_group_) {
+ if (read_buffer->IsFull()) {
+ // Apply edits in an atomic group when we have read all edits in the
+ // group.
+ for (auto& e : read_buffer->replay_buffer()) {
+ s = ApplyOneVersionEditToBuilder(e, name_to_options,
+ column_families_not_found, builders,
+ version_edit_params);
+ if (!s.ok()) {
+ break;
+ }
+ recovered_edits++;
+ }
+ if (!s.ok()) {
+ break;
+ }
+ read_buffer->Clear();
+ }
+ } else {
+ // Apply a normal edit immediately.
+ s = ApplyOneVersionEditToBuilder(edit, name_to_options,
+ column_families_not_found, builders,
+ version_edit_params);
+ if (s.ok()) {
+ recovered_edits++;
+ }
+ }
+ }
+ if (!s.ok()) {
+ // Clear the buffer if we fail to decode/apply an edit.
+ read_buffer->Clear();
+ }
+ TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
+ &recovered_edits);
+ return s;
+}
+
Status VersionSet::Recover(
- const std::vector<ColumnFamilyDescriptor>& column_families,
- bool read_only) {
+ const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
+ std::string* db_id) {
std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
- for (auto cf : column_families) {
- cf_name_to_options.insert({cf.name, cf.options});
+ for (const auto& cf : column_families) {
+ cf_name_to_options.emplace(cf.name, cf.options);
}
// keeps track of column families in manifest that were not found in
// column families parameters. if those column families are not dropped
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string manifest_path;
- Status s = GetCurrentManifestPath(&manifest_path);
+ Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+ &manifest_file_number_);
if (!s.ok()) {
return s;
}
std::unique_ptr<SequentialFileReader> manifest_file_reader;
{
- std::unique_ptr<SequentialFile> manifest_file;
- s = env_->NewSequentialFile(manifest_path, &manifest_file,
- env_->OptimizeForManifestRead(env_options_));
+ std::unique_ptr<FSSequentialFile> manifest_file;
+ s = fs_->NewSequentialFile(manifest_path,
+ fs_->OptimizeForManifestRead(file_options_),
+ &manifest_file, nullptr);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
- new SequentialFileReader(std::move(manifest_file), manifest_path));
- }
- uint64_t current_manifest_file_size;
-  s = env_->GetFileSize(manifest_path, &current_manifest_file_size);
- if (!s.ok()) {
- return s;
+ new SequentialFileReader(std::move(manifest_file), manifest_path,
+ db_options_->log_readahead_size));
}
- bool have_log_number = false;
- bool have_prev_log_number = false;
- bool have_next_file = false;
- bool have_last_sequence = false;
- uint64_t next_file = 0;
- uint64_t last_sequence = 0;
- uint64_t log_number = 0;
- uint64_t previous_log_number = 0;
- uint32_t max_column_family = 0;
- uint64_t min_log_number_to_keep = 0;
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(default_cfd))));
-
+ uint64_t current_manifest_file_size = 0;
+ VersionEditParams version_edit_params;
{
VersionSet::LogReporter reporter;
reporter.status = &s;
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
- std::vector<VersionEdit> replay_buffer;
- size_t num_entries_decoded = 0;
- while (reader.ReadRecord(&record, &scratch) && s.ok()) {
- VersionEdit edit;
- s = edit.DecodeFrom(record);
- if (!s.ok()) {
- break;
- }
-
- if (edit.is_in_atomic_group_) {
- if (replay_buffer.empty()) {
- replay_buffer.resize(edit.remaining_entries_ + 1);
- TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
- &edit);
- }
- ++num_entries_decoded;
- if (num_entries_decoded + edit.remaining_entries_ !=
- static_cast<uint32_t>(replay_buffer.size())) {
- TEST_SYNC_POINT_CALLBACK(
- "VersionSet::Recover:IncorrectAtomicGroupSize", &edit);
- s = Status::Corruption("corrupted atomic group");
- break;
- }
- replay_buffer[num_entries_decoded - 1] = std::move(edit);
- if (num_entries_decoded == replay_buffer.size()) {
- TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
- &edit);
- for (auto& e : replay_buffer) {
- s = ApplyOneVersionEditToBuilder(
- e, cf_name_to_options, column_families_not_found, builders,
- &have_log_number, &log_number, &have_prev_log_number,
- &previous_log_number, &have_next_file, &next_file,
- &have_last_sequence, &last_sequence, &min_log_number_to_keep,
- &max_column_family);
- if (!s.ok()) {
- break;
- }
- }
- replay_buffer.clear();
- num_entries_decoded = 0;
- }
- TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
- } else {
- if (!replay_buffer.empty()) {
- TEST_SYNC_POINT_CALLBACK(
- "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit);
- s = Status::Corruption("corrupted atomic group");
- break;
- }
- s = ApplyOneVersionEditToBuilder(
- edit, cf_name_to_options, column_families_not_found, builders,
- &have_log_number, &log_number, &have_prev_log_number,
- &previous_log_number, &have_next_file, &next_file,
- &have_last_sequence, &last_sequence, &min_log_number_to_keep,
- &max_column_family);
- }
- if (!s.ok()) {
- break;
- }
- }
+ AtomicGroupReadBuffer read_buffer;
+ s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
+ column_families_not_found, builders,
+ &version_edit_params, db_id);
+ current_manifest_file_size = reader.GetReadOffset();
+ assert(current_manifest_file_size != 0);
}
if (s.ok()) {
- if (!have_next_file) {
+ if (!version_edit_params.has_next_file_number_) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
- } else if (!have_log_number) {
+ } else if (!version_edit_params.has_log_number_) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
- } else if (!have_last_sequence) {
+ } else if (!version_edit_params.has_last_sequence_) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
- if (!have_prev_log_number) {
- previous_log_number = 0;
+ if (!version_edit_params.has_prev_log_number_) {
+ version_edit_params.SetPrevLogNumber(0);
}
- column_family_set_->UpdateMaxColumnFamily(max_column_family);
+ column_family_set_->UpdateMaxColumnFamily(
+ version_edit_params.max_column_family_);
// When reading DB generated using old release, min_log_number_to_keep=0.
// All log files will be scanned for potential prepare entries.
- MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
- MarkFileNumberUsed(previous_log_number);
- MarkFileNumberUsed(log_number);
+ MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
+ MarkFileNumberUsed(version_edit_params.prev_log_number_);
+ MarkFileNumberUsed(version_edit_params.log_number_);
}
// there were some column families in the MANIFEST that weren't specified
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
- builder->LoadTableHandlers(
+ s = builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ if (!s.ok()) {
+ if (db_options_->paranoid_checks) {
+ return s;
+ }
+ s = Status::OK();
+ }
- Version* v = new Version(cfd, this, env_options_,
+ Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
}
manifest_file_size_ = current_manifest_file_size;
- next_file_number_.store(next_file + 1);
- last_allocated_sequence_ = last_sequence;
- last_published_sequence_ = last_sequence;
- last_sequence_ = last_sequence;
- prev_log_number_ = previous_log_number;
+ next_file_number_.store(version_edit_params.next_file_number_ + 1);
+ last_allocated_sequence_ = version_edit_params.last_sequence_;
+ last_published_sequence_ = version_edit_params.last_sequence_;
+ last_sequence_ = version_edit_params.last_sequence_;
+ prev_log_number_ = version_edit_params.prev_log_number_;
ROCKS_LOG_INFO(
db_options_->info_log,
", last_sequence is %" PRIu64 ", log_number is %" PRIu64
",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
",min_log_number_to_keep is %" PRIu64 "\n",
- manifest_path.c_str(), manifest_file_number_,
- next_file_number_.load(), last_sequence_.load(), log_number,
+ manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
+ last_sequence_.load(), version_edit_params.log_number_,
prev_log_number_, column_family_set_->GetMaxColumnFamily(),
min_log_number_to_keep_2pc());
}
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
- const std::string& dbname, Env* env) {
+ const std::string& dbname,
+ FileSystem* fs) {
  // these are just for performance reasons, not correctness,
// so we're fine using the defaults
- EnvOptions soptions;
+ FileOptions soptions;
// Read "CURRENT" file, which contains a pointer to the current manifest file
- std::string current;
-  Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
+ std::string manifest_path;
+ uint64_t manifest_file_number;
+ Status s =
+ GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
if (!s.ok()) {
return s;
}
- if (current.empty() || current[current.size()-1] != '\n') {
- return Status::Corruption("CURRENT file does not end with newline");
- }
- current.resize(current.size() - 1);
-
- std::string dscname = dbname + "/" + current;
std::unique_ptr<SequentialFileReader> file_reader;
{
- std::unique_ptr<SequentialFile> file;
- s = env->NewSequentialFile(dscname, &file, soptions);
+ std::unique_ptr<FSSequentialFile> file;
+ s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
if (!s.ok()) {
return s;
}
- file_reader.reset(new SequentialFileReader(std::move(file), dscname));
+ file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
}
std::map<uint32_t, std::string> column_family_names;
#ifndef ROCKSDB_LITE
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
const Options* options,
- const EnvOptions& env_options,
+ const FileOptions& file_options,
int new_levels) {
if (new_levels <= 1) {
return Status::InvalidArgument(
options->table_cache_numshardbits));
WriteController wc(options->delayed_write_rate);
WriteBufferManager wb(options->db_write_buffer_size);
- VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc);
+ VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
+ /*block_cache_tracer=*/nullptr);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
}
// we need to allocate an array with the old number of levels size to
- // avoid SIGSEGV in WriteSnapshot()
+  // avoid SIGSEGV in WriteCurrentStateToManifest()
// however, all levels bigger or equal to new_levels will be empty
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[current_levels];
mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}
+// Get the checksum information, including the checksum value and the checksum
+// function name, of all SST files in the VersionSet. Store the information in
+// FileChecksumList, which maps each file number to its checksum info. If the
+// DB is not running, make sure to call VersionSet::Recover() to load the file
+// metadata from the MANIFEST into the VersionSet before calling this function.
+Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
+ // Clean the previously stored checksum information if any.
+ if (checksum_list == nullptr) {
+ return Status::InvalidArgument("checksum_list is nullptr");
+ }
+ checksum_list->reset();
+
+ for (auto cfd : *column_family_set_) {
+ if (cfd->IsDropped() || !cfd->initialized()) {
+ continue;
+ }
+ for (int level = 0; level < cfd->NumberLevels(); level++) {
+ for (const auto& file :
+ cfd->current()->storage_info()->LevelFiles(level)) {
+ checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
+ file->file_checksum,
+ file->file_checksum_func_name);
+ }
+ }
+ }
+ return Status::OK();
+}
+
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
bool verbose, bool hex, bool json) {
// Open the specified manifest file.
std::unique_ptr<SequentialFileReader> file_reader;
Status s;
{
- std::unique_ptr<SequentialFile> file;
- s = options.env->NewSequentialFile(
- dscname, &file, env_->OptimizeForManifestRead(env_options_));
+ std::unique_ptr<FSSequentialFile> file;
+ s = options.file_system->NewSequentialFile(
+ dscname,
+ options.file_system->OptimizeForManifestRead(file_options_), &file,
+ nullptr);
if (!s.ok()) {
return s;
}
- file_reader.reset(new SequentialFileReader(std::move(file), dscname));
+ file_reader.reset(new SequentialFileReader(
+ std::move(file), dscname, db_options_->log_readahead_size));
}
bool have_prev_log_number = false;
comparators.erase(edit.column_family_);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr);
- cfd->Unref();
- delete cfd;
+ cfd->UnrefAndTryDelete();
cfd = nullptr;
} else {
if (!cf_in_builders) {
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
- builder->second->version_builder()->Apply(&edit);
+ s = builder->second->version_builder()->Apply(&edit);
+ if (!s.ok()) {
+ break;
+ }
}
if (cfd != nullptr && edit.has_log_number_) {
assert(builders_iter != builders.end());
auto builder = builders_iter->second->version_builder();
- Version* v = new Version(cfd, this, env_options_,
+ Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
next_file_number_.store(number + 1, std::memory_order_relaxed);
}
}
-
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
}
}
-Status VersionSet::WriteSnapshot(log::Writer* log) {
+Status VersionSet::WriteCurrentStateToManifest(
+ const std::unordered_map<uint32_t, MutableCFState>& curr_state,
+ log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// WARNING: This method doesn't hold a mutex!!
// This is done without DB mutex lock held, but only within single-threaded
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate.
+
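+  // When configured, write the DB ID as the first record of the new MANIFEST
+  // so the ID is always recoverable from the current MANIFEST.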
+ if (db_options_->write_dbid_to_manifest) {
+ VersionEdit edit_for_db_id;
+ assert(!db_id_.empty());
+ edit_for_db_id.SetDBId(db_id_);
+ std::string db_id_record;
+ if (!edit_for_db_id.EncodeTo(&db_id_record)) {
+ return Status::Corruption("Unable to Encode VersionEdit:" +
+ edit_for_db_id.DebugString(true));
+ }
+ Status add_record = log->AddRecord(db_id_record);
+ if (!add_record.ok()) {
+ return add_record;
+ }
+ }
+
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
- f->marked_for_compaction);
+ f->marked_for_compaction, f->oldest_blob_file_number,
+ f->oldest_ancester_time, f->file_creation_time,
+ f->file_checksum, f->file_checksum_func_name);
}
}
- edit.SetLogNumber(cfd->GetLogNumber());
+ const auto iter = curr_state.find(cfd->GetID());
+ assert(iter != curr_state.end());
+ uint64_t log_number = iter->second.log_number;
+ edit.SetLogNumber(log_number);
std::string record;
if (!edit.EncodeTo(&record)) {
return Status::Corruption(
}
}
}
-
return Status::OK();
}
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
// we avoid doing binary search for the keys b and c twice and instead somehow
// maintain state of where they first appear in the files.
-uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
+uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
+ Version* v, const Slice& start,
const Slice& end, int start_level,
- int end_level) {
+ int end_level, TableReaderCaller caller) {
+ const auto& icmp = v->cfd_->internal_comparator();
+
// pre-condition
- assert(v->cfd_->internal_comparator().Compare(start, end) <= 0);
+ assert(icmp.Compare(start, end) <= 0);
- uint64_t size = 0;
+ uint64_t total_full_size = 0;
const auto* vstorage = v->storage_info();
- end_level = end_level == -1
- ? vstorage->num_non_empty_levels()
- : std::min(end_level, vstorage->num_non_empty_levels());
+ const int num_non_empty_levels = vstorage->num_non_empty_levels();
+ end_level = (end_level == -1) ? num_non_empty_levels
+ : std::min(end_level, num_non_empty_levels);
assert(start_level <= end_level);
- for (int level = start_level; level < end_level; level++) {
+  // Outline of the optimization that uses options.files_size_error_margin.
+  // When approximating the total size of the files that store a key range, we
+  // first sum up the sizes of the files that fall fully inside the range.
+  // Then we sum up the sizes of all the files that may intersect with the
+  // range (this includes all files in L0 as well). If total_intersecting_size
+  // is smaller than total_full_size * options.files_size_error_margin, we can
+  // infer that the intersecting files make a sufficiently negligible
+  // contribution to the total size, and we can approximate the storage
+  // required for the keys in the range as just half of
+  // total_intersecting_size. E.g., if files_size_error_margin is 0.1, the
+  // error of the approximation is limited to ~10% of the total size of the
+  // files that fall fully inside the range. This avoids the costly binary
+  // search over the intersecting files that a more precise calculation of the
+  // total size would require.
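+  //
+  // Illustrative numbers (not from any particular workload): with
+  // files_size_error_margin = 0.1, total_full_size = 10 GB and
+  // total_intersecting_size = 500 MB, the threshold is 1 GB, so we skip the
+  // per-file binary search and add 250 MB (half of the intersecting size)
+  // to the estimate.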
+
+ autovector<FdWithKeyRange*, 32> first_files;
+ autovector<FdWithKeyRange*, 16> last_files;
+
+ // scan all the levels
+ for (int level = start_level; level < end_level; ++level) {
const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
- if (!files_brief.num_files) {
+ if (files_brief.num_files == 0) {
// empty level, skip exploration
continue;
}
- if (!level) {
- // level 0 data is sorted order, handle the use case explicitly
- size += ApproximateSizeLevel0(v, files_brief, start, end);
+ if (level == 0) {
+ // level 0 files are not in sorted order, we need to iterate through
+ // the list to compute the total bytes that require scanning,
+ // so handle the case explicitly (similarly to first_files case)
+ for (size_t i = 0; i < files_brief.num_files; i++) {
+ first_files.push_back(&files_brief.files[i]);
+ }
continue;
}
assert(level > 0);
assert(files_brief.num_files > 0);
- // identify the file position for starting key
- const uint64_t idx_start = FindFileInRange(
- v->cfd_->internal_comparator(), files_brief, start,
- /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1));
- assert(idx_start < files_brief.num_files);
-
- // scan all files from the starting position until the ending position
- // inferred from the sorted order
- for (uint64_t i = idx_start; i < files_brief.num_files; i++) {
- uint64_t val;
- val = ApproximateSize(v, files_brief.files[i], end);
- if (!val) {
- // the files after this will not have the range
- break;
- }
+ // identify the file position for start key
+ const int idx_start =
+ FindFileInRange(icmp, files_brief, start, 0,
+ static_cast<uint32_t>(files_brief.num_files - 1));
+ assert(static_cast<size_t>(idx_start) < files_brief.num_files);
- size += val;
+ // identify the file position for end key
+ int idx_end = idx_start;
+ if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
+ idx_end =
+ FindFileInRange(icmp, files_brief, end, idx_start,
+ static_cast<uint32_t>(files_brief.num_files - 1));
+ }
+ assert(idx_end >= idx_start &&
+ static_cast<size_t>(idx_end) < files_brief.num_files);
- if (i == idx_start) {
- // subtract the bytes needed to be scanned to get to the starting
- // key
- val = ApproximateSize(v, files_brief.files[i], start);
- assert(size >= val);
- size -= val;
- }
+ // scan all files from the starting index to the ending index
+ // (inferred from the sorted order)
+
+ // first scan all the intermediate full files (excluding first and last)
+ for (int i = idx_start + 1; i < idx_end; ++i) {
+ uint64_t file_size = files_brief.files[i].fd.GetFileSize();
+ // The entire file falls into the range, so we can just take its size.
+ assert(file_size ==
+ ApproximateSize(v, files_brief.files[i], start, end, caller));
+ total_full_size += file_size;
+ }
+
+ // save the first and the last files (which may be the same file), so we
+ // can scan them later.
+ first_files.push_back(&files_brief.files[idx_start]);
+ if (idx_start != idx_end) {
+ // we need to estimate size for both files, only if they are different
+ last_files.push_back(&files_brief.files[idx_end]);
}
}
- return size;
-}
+ // The sum of all file sizes that intersect the [start, end] keys range.
+ uint64_t total_intersecting_size = 0;
+ for (const auto* file_ptr : first_files) {
+ total_intersecting_size += file_ptr->fd.GetFileSize();
+ }
+ for (const auto* file_ptr : last_files) {
+ total_intersecting_size += file_ptr->fd.GetFileSize();
+ }
-uint64_t VersionSet::ApproximateSizeLevel0(Version* v,
- const LevelFilesBrief& files_brief,
- const Slice& key_start,
- const Slice& key_end) {
- // level 0 files are not in sorted order, we need to iterate through
- // the list to compute the total bytes that require scanning
- uint64_t size = 0;
- for (size_t i = 0; i < files_brief.num_files; i++) {
- const uint64_t start = ApproximateSize(v, files_brief.files[i], key_start);
- const uint64_t end = ApproximateSize(v, files_brief.files[i], key_end);
- assert(end >= start);
- size += end - start;
+  // Now scan the first and last files at each level and estimate their size.
+  // If total_intersecting_size is less than the configured fraction of
+  // total_full_size, we approximate the result to avoid the costly binary
+  // search inside ApproximateSize, using half of the intersecting size below.
+
+ const double margin = options.files_size_error_margin;
+ if (margin > 0 && total_intersecting_size <
+ static_cast<uint64_t>(total_full_size * margin)) {
+ total_full_size += total_intersecting_size / 2;
+ } else {
+ // Estimate for all the first files, at each level
+ for (const auto file_ptr : first_files) {
+ total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
+ }
+
+ // Estimate for all the last files, at each level
+ for (const auto file_ptr : last_files) {
+ // We could use ApproximateSize here, but calling ApproximateOffsetOf
+ // directly is just more efficient.
+ total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
+ }
}
- return size;
+
+ return total_full_size;
}
-uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
- const Slice& key) {
+uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
+ const Slice& key,
+ TableReaderCaller caller) {
// pre-condition
assert(v);
+ const auto& icmp = v->cfd_->internal_comparator();
uint64_t result = 0;
- if (v->cfd_->internal_comparator().Compare(f.largest_key, key) <= 0) {
+ if (icmp.Compare(f.largest_key, key) <= 0) {
// Entire file is before "key", so just add the file size
result = f.fd.GetFileSize();
- } else if (v->cfd_->internal_comparator().Compare(f.smallest_key, key) > 0) {
+ } else if (icmp.Compare(f.smallest_key, key) > 0) {
// Entire file is after "key", so ignore
result = 0;
} else {
// "key" falls in the range for this table. Add the
// approximate offset of "key" within the table.
- TableReader* table_reader_ptr;
- InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
- ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
- *f.file_metadata, nullptr /* range_del_agg */,
- v->GetMutableCFOptions().prefix_extractor.get(), &table_reader_ptr);
- if (table_reader_ptr != nullptr) {
- result = table_reader_ptr->ApproximateOffsetOf(key);
+ TableCache* table_cache = v->cfd_->table_cache();
+ if (table_cache != nullptr) {
+ result = table_cache->ApproximateOffsetOf(
+ key, f.file_metadata->fd, caller, icmp,
+ v->GetMutableCFOptions().prefix_extractor.get());
}
- delete iter;
}
return result;
}
+uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
+ const Slice& start, const Slice& end,
+ TableReaderCaller caller) {
+ // pre-condition
+ assert(v);
+ const auto& icmp = v->cfd_->internal_comparator();
+ assert(icmp.Compare(start, end) <= 0);
+
+ if (icmp.Compare(f.largest_key, start) <= 0 ||
+ icmp.Compare(f.smallest_key, end) > 0) {
+ // Entire file is before or after the start/end keys range
+ return 0;
+ }
+
+ if (icmp.Compare(f.smallest_key, start) >= 0) {
+ // Start of the range is before the file start - approximate by end offset
+ return ApproximateOffsetOf(v, f, end, caller);
+ }
+
+ if (icmp.Compare(f.largest_key, end) < 0) {
+ // End of the range is after the file end - approximate by subtracting
+ // start offset from the file size
+ uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
+ assert(f.fd.GetFileSize() >= start_offset);
+ return f.fd.GetFileSize() - start_offset;
+ }
+
+ // The interval falls entirely in the range for this file.
+ TableCache* table_cache = v->cfd_->table_cache();
+ if (table_cache == nullptr) {
+ return 0;
+ }
+ return table_cache->ApproximateSize(
+ start, end, f.file_metadata->fd, caller, icmp,
+ v->GetMutableCFOptions().prefix_extractor.get());
+}
+
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
// pre-calculate space requirement
int64_t total_files = 0;
InternalIterator* VersionSet::MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg,
- const EnvOptions& env_options_compactions) {
+ const FileOptions& file_options_compactions) {
auto cfd = c->column_family_data();
ReadOptions read_options;
read_options.verify_checksums = true;
const LevelFilesBrief* flevel = c->input_levels(which);
for (size_t i = 0; i < flevel->num_files; i++) {
list[num++] = cfd->table_cache()->NewIterator(
- read_options, env_options_compactions, cfd->internal_comparator(),
+ read_options, file_options_compactions,
+ cfd->internal_comparator(),
*flevel->files[i].file_metadata, range_del_agg,
c->mutable_cf_options()->prefix_extractor.get(),
- nullptr /* table_reader_ptr */,
- nullptr /* no per level latency histogram */,
- true /* for_compaction */, nullptr /* arena */,
- false /* skip_filters */, static_cast<int>(which) /* level */);
+ /*table_reader_ptr=*/nullptr,
+ /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
+ /*arena=*/nullptr,
+ /*skip_filters=*/false, /*level=*/static_cast<int>(which),
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = new LevelIterator(
- cfd->table_cache(), read_options, env_options_compactions,
+ cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), c->input_levels(which),
c->mutable_cf_options()->prefix_extractor.get(),
- false /* should_sample */,
- nullptr /* no per level latency histogram */,
- true /* for_compaction */, false /* skip_filters */,
- static_cast<int>(which) /* level */, range_del_agg,
+ /*should_sample=*/false,
+ /*no per level latency histogram=*/nullptr,
+ TableReaderCaller::kCompaction, /*skip_filters=*/false,
+ /*level=*/static_cast<int>(which), range_del_agg,
c->boundaries(which));
}
}
assert(!cfd->ioptions()->cf_paths.empty());
filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
}
- filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
+ const uint64_t file_number = file->fd.GetNumber();
+ filemetadata.name = MakeTableFileName("", file_number);
+ filemetadata.file_number = file_number;
filemetadata.level = level;
filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.being_compacted = file->being_compacted;
filemetadata.num_entries = file->num_entries;
filemetadata.num_deletions = file->num_deletions;
+ filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
+ filemetadata.file_checksum = file->file_checksum;
+ filemetadata.file_checksum_func_name = file->file_checksum_func_name;
metadata->push_back(filemetadata);
}
}
MutableCFOptions dummy_cf_options;
Version* dummy_versions =
- new Version(nullptr, this, env_options_, dummy_cf_options);
+ new Version(nullptr, this, file_options_, dummy_cf_options);
// Ref() dummy version once so that later we can call Unref() to delete it
// by avoiding calling "delete" explicitly (~Version is private)
dummy_versions->Ref();
edit->column_family_name_, edit->column_family_, dummy_versions,
cf_options);
- Version* v = new Version(new_cfd, this, env_options_,
+ Version* v = new Version(new_cfd, this, file_options_,
*new_cfd->GetLatestMutableCFOptions(),
current_version_number_++);
ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
- const EnvOptions& _env_options,
+ const FileOptions& _file_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
- : VersionSet(dbname, _db_options, _env_options, table_cache,
- write_buffer_manager, write_controller) {}
+ : VersionSet(dbname, _db_options, _file_options, table_cache,
+ write_buffer_manager, write_controller,
+ /*block_cache_tracer=*/nullptr),
+ number_of_edits_to_skip_(0) {}
ReactiveVersionSet::~ReactiveVersionSet() {}
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
-
- bool have_log_number = false;
- bool have_prev_log_number = false;
- bool have_next_file = false;
- bool have_last_sequence = false;
- uint64_t next_file = 0;
- uint64_t last_sequence = 0;
- uint64_t log_number = 0;
- uint64_t previous_log_number = 0;
- uint32_t max_column_family = 0;
- uint64_t min_log_number_to_keep = 0;
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
std::unordered_map<int, std::string> column_families_not_found;
log::Reader* reader = manifest_reader->get();
int retry = 0;
+ VersionEdit version_edit;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
Slice record;
std::string scratch;
- while (s.ok() && reader->ReadRecord(&record, &scratch)) {
- VersionEdit edit;
- s = edit.DecodeFrom(record);
- if (!s.ok()) {
- break;
- }
- s = ApplyOneVersionEditToBuilder(
- edit, cf_name_to_options, column_families_not_found, builders,
- &have_log_number, &log_number, &have_prev_log_number,
- &previous_log_number, &have_next_file, &next_file,
- &have_last_sequence, &last_sequence, &min_log_number_to_keep,
- &max_column_family);
- }
+ s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
+ column_families_not_found, builders, &version_edit);
if (s.ok()) {
- bool enough = have_next_file && have_log_number && have_last_sequence;
+ bool enough = version_edit.has_next_file_number_ &&
+ version_edit.has_log_number_ &&
+ version_edit.has_last_sequence_;
if (enough) {
for (const auto& cf : column_families) {
auto cfd = column_family_set_->GetColumnFamily(cf.name);
}
if (s.ok()) {
- if (!have_prev_log_number) {
- previous_log_number = 0;
+ if (!version_edit.has_prev_log_number_) {
+ version_edit.prev_log_number_ = 0;
}
- column_family_set_->UpdateMaxColumnFamily(max_column_family);
+ column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
- MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
- MarkFileNumberUsed(previous_log_number);
- MarkFileNumberUsed(log_number);
+ MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
+ MarkFileNumberUsed(version_edit.prev_log_number_);
+ MarkFileNumberUsed(version_edit.log_number_);
for (auto cfd : *column_family_set_) {
assert(builders.count(cfd->GetID()) > 0);
assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder();
- Version* v = new Version(cfd, this, env_options_,
+ Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
}
- next_file_number_.store(next_file + 1);
- last_allocated_sequence_ = last_sequence;
- last_published_sequence_ = last_sequence;
- last_sequence_ = last_sequence;
- prev_log_number_ = previous_log_number;
+ next_file_number_.store(version_edit.next_file_number_ + 1);
+ last_allocated_sequence_ = version_edit.last_sequence_;
+ last_published_sequence_ = version_edit.last_sequence_;
+ last_sequence_ = version_edit.last_sequence_;
+ prev_log_number_ = version_edit.prev_log_number_;
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
mu->AssertHeld();
Status s;
- bool have_log_number = false;
- bool have_prev_log_number = false;
- bool have_next_file = false;
- bool have_last_sequence = false;
- uint64_t next_file = 0;
- uint64_t last_sequence = 0;
- uint64_t log_number = 0;
- uint64_t previous_log_number = 0;
- uint32_t max_column_family = 0;
- uint64_t min_log_number_to_keep = 0;
-
+ uint64_t applied_edits = 0;
while (s.ok()) {
Slice record;
std::string scratch;
if (!s.ok()) {
break;
}
- ColumnFamilyData* cfd =
- column_family_set_->GetColumnFamily(edit.column_family_);
- // If we cannot find this column family in our column family set, then it
- // may be a new column family created by the primary after the secondary
- // starts. Ignore it for now.
- if (nullptr == cfd) {
+
+ // Skip the first VersionEdits of each MANIFEST, which are generated by
+ // VersionSet::WriteCurrentStateToManifest.
+ if (number_of_edits_to_skip_ > 0) {
+ ColumnFamilyData* cfd =
+ column_family_set_->GetColumnFamily(edit.column_family_);
+ if (cfd != nullptr && !cfd->IsDropped()) {
+ --number_of_edits_to_skip_;
+ }
continue;
}
- if (active_version_builders_.find(edit.column_family_) ==
- active_version_builders_.end()) {
- std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
- new BaseReferencedVersionBuilder(cfd));
- active_version_builders_.insert(
- std::make_pair(edit.column_family_, std::move(builder_guard)));
- }
- s = ApplyOneVersionEditToBuilder(
- edit, &have_log_number, &log_number, &have_prev_log_number,
- &previous_log_number, &have_next_file, &next_file,
- &have_last_sequence, &last_sequence, &min_log_number_to_keep,
- &max_column_family);
+
+ s = read_buffer_.AddEdit(&edit);
if (!s.ok()) {
break;
}
- auto builder_iter = active_version_builders_.find(edit.column_family_);
- assert(builder_iter != active_version_builders_.end());
- auto builder = builder_iter->second->version_builder();
- assert(builder != nullptr);
- s = builder->LoadTableHandlers(
- cfd->internal_stats(), db_options_->max_file_opening_threads,
- false /* prefetch_index_and_filter_in_cache */,
- false /* is_initial_load */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
- TEST_SYNC_POINT_CALLBACK(
- "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s);
- if (!s.ok() && !s.IsPathNotFound()) {
- break;
- } else if (s.IsPathNotFound()) {
- s = Status::OK();
- } else { // s.ok() == true
- auto version = new Version(cfd, this, env_options_,
- *cfd->GetLatestMutableCFOptions(),
- current_version_number_++);
- builder->SaveTo(version->storage_info());
- version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
- AppendVersion(cfd, version);
- active_version_builders_.erase(builder_iter);
- if (cfds_changed->count(cfd) == 0) {
- cfds_changed->insert(cfd);
+ VersionEdit temp_edit;
+ if (edit.is_in_atomic_group_) {
+ if (read_buffer_.IsFull()) {
+ // Apply edits in an atomic group when we have read all edits in the
+ // group.
+ for (auto& e : read_buffer_.replay_buffer()) {
+ s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
+ if (!s.ok()) {
+ break;
+ }
+ applied_edits++;
+ }
+ if (!s.ok()) {
+ break;
+ }
+ read_buffer_.Clear();
+ }
+ } else {
+ // Apply a normal edit immediately.
+ s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
+ if (s.ok()) {
+ applied_edits++;
}
}
- if (have_next_file) {
- next_file_number_.store(next_file + 1);
- }
- if (have_last_sequence) {
- last_allocated_sequence_ = last_sequence;
- last_published_sequence_ = last_sequence;
- last_sequence_ = last_sequence;
- }
- if (have_prev_log_number) {
- prev_log_number_ = previous_log_number;
- MarkFileNumberUsed(previous_log_number);
- }
- if (have_log_number) {
- MarkFileNumberUsed(log_number);
- }
- column_family_set_->UpdateMaxColumnFamily(max_column_family);
- MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
+ }
+ if (!s.ok()) {
+ // Clear the buffer if we fail to decode/apply an edit.
+ read_buffer_.Clear();
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
// find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
- if (s.ok() && reader->file()->file_name() == old_manifest_path) {
- break;
+ if (s.ok()) {
+ if (reader->file()->file_name() == old_manifest_path) {
+ // Still reading the same MANIFEST, so there is no need to continue
+ // this loop: reaching this point means no more records are available.
+ break;
+ } else {
+ // We have switched to a new MANIFEST whose first records were
+ // generated by VersionSet::WriteCurrentStateToManifest. Since the
+ // secondary instance already finished recovering when it started,
+ // it does not need to process these records again. In fact, if the
+ // secondary replayed them, it could add the same SST files to each
+ // column family a second time, causing the consistency checks done
+ // by VersionBuilder to fail. Therefore, we record the number of
+ // records to skip at the beginning of the new MANIFEST and ignore
+ // them.
+ number_of_edits_to_skip_ = 0;
+ for (auto* cfd : *column_family_set_) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ // Increase number_of_edits_to_skip_ by 2 because
+ // WriteCurrentStateToManifest() writes 2 version edits for each
+ // column family at the beginning of the newly-generated MANIFEST.
+ // TODO(yanqin) remove hard-coded value.
+ if (db_options_->write_dbid_to_manifest) {
+ number_of_edits_to_skip_ += 3;
+ } else {
+ number_of_edits_to_skip_ += 2;
+ }
+ }
+ }
}
}
}
}
}
+ TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
+ &applied_edits);
return s;
}
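+// Apply one VersionEdit from the tailed MANIFEST: route it to the column
+// family's active version builder, fold its bookkeeping fields into
+// *version_edit, attempt to load table handlers, and, on success, install a
+// new Version and record the column family in *cfds_changed. The aggregated
+// counters (next file number, last sequence, log numbers, max column family)
+// are then published to the VersionSet.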
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
- VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
- bool* have_prev_log_number, uint64_t* previous_log_number,
- bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
- SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
- uint32_t* max_column_family) {
- ColumnFamilyData* cfd = nullptr;
- Status status;
+ VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
+ VersionEdit* version_edit) {
+ ColumnFamilyData* cfd =
+ column_family_set_->GetColumnFamily(edit.column_family_);
+
+ // If we cannot find this column family in our column family set, then it
+ // may be a new column family created by the primary after the secondary
+ // starts. It is also possible that the secondary instance opens only a subset
+ // of column families. Ignore it for now.
+ if (nullptr == cfd) {
+ return Status::OK();
+ }
+ if (active_version_builders_.find(edit.column_family_) ==
+ active_version_builders_.end() &&
+ !cfd->IsDropped()) {
+ std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
+ new BaseReferencedVersionBuilder(cfd));
+ active_version_builders_.insert(
+ std::make_pair(edit.column_family_, std::move(builder_guard)));
+ }
+
+ auto builder_iter = active_version_builders_.find(edit.column_family_);
+ assert(builder_iter != active_version_builders_.end());
+ auto builder = builder_iter->second->version_builder();
+ assert(builder != nullptr);
+
if (edit.is_column_family_add_) {
// TODO (yanqin) for now the secondary ignores column families created
// after Open. This also simplifies handling of switching to a new MANIFEST
// and processing the snapshot of the system at the beginning of the
// MANIFEST.
- return Status::OK();
} else if (edit.is_column_family_drop_) {
- cfd = column_family_set_->GetColumnFamily(edit.column_family_);
- // Drop a CF created by primary after secondary starts? Then ignore
- if (cfd == nullptr) {
- return Status::OK();
- }
// Drop the column family by setting it to be 'dropped' without destroying
// the column family handle.
+ // TODO (haoyu) figure out how to handle column family drop for the
+ // secondary instance. (Is it possible that the ref count for cfd is 0 but
+ // the ref count for its versions is higher than 0?)
cfd->SetDropped();
- if (cfd->Unref()) {
- delete cfd;
+ if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
+ active_version_builders_.erase(builder_iter);
} else {
- cfd = column_family_set_->GetColumnFamily(edit.column_family_);
- // Operation on a CF created after Open? Then ignore
- if (cfd == nullptr) {
- return Status::OK();
+ Status s = builder->Apply(&edit);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (cfd != nullptr && !cfd->IsDropped()) {
+ s = builder->LoadTableHandlers(
+ cfd->internal_stats(), db_options_->max_file_opening_threads,
+ false /* prefetch_index_and_filter_in_cache */,
+ false /* is_initial_load */,
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
+ TEST_SYNC_POINT_CALLBACK(
+ "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
+ "AfterLoadTableHandlers",
+ &s);
+
+ if (s.ok()) {
+ auto version = new Version(cfd, this, file_options_,
+ *cfd->GetLatestMutableCFOptions(),
+ current_version_number_++);
+ builder->SaveTo(version->storage_info());
+ version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
+ AppendVersion(cfd, version);
+ active_version_builders_.erase(builder_iter);
+ if (cfds_changed->count(cfd) == 0) {
+ cfds_changed->insert(cfd);
+ }
+ } else if (s.IsPathNotFound()) {
+ s = Status::OK();
}
- auto builder_iter = active_version_builders_.find(edit.column_family_);
- assert(builder_iter != active_version_builders_.end());
- auto builder = builder_iter->second->version_builder();
- assert(builder != nullptr);
- builder->Apply(&edit);
+ // Any other error from LoadTableHandlers is propagated to the caller via
+ // the status returned at the end of this function.
+ }
+
+ if (version_edit->HasNextFile()) {
+ next_file_number_.store(version_edit->next_file_number_ + 1);
+ }
+ if (version_edit->has_last_sequence_) {
+ last_allocated_sequence_ = version_edit->last_sequence_;
+ last_published_sequence_ = version_edit->last_sequence_;
+ last_sequence_ = version_edit->last_sequence_;
}
- return ExtractInfoFromVersionEdit(
- cfd, edit, have_log_number, log_number, have_prev_log_number,
- previous_log_number, have_next_file, next_file, have_last_sequence,
- last_sequence, min_log_number_to_keep, max_column_family);
+ if (version_edit->has_prev_log_number_) {
+ prev_log_number_ = version_edit->prev_log_number_;
+ MarkFileNumberUsed(version_edit->prev_log_number_);
+ }
+ if (version_edit->has_log_number_) {
+ MarkFileNumberUsed(version_edit->log_number_);
+ }
+ column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
+ MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
+ return s;
}
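+// Check whether CURRENT points to a MANIFEST other than the one currently
+// being tailed. If so, open the new file, wrap it in a
+// log::FragmentBufferedReader, and clear active_version_builders_ so that
+// versions are rebuilt from the state snapshot at the head of the new
+// MANIFEST. The do/while on IsPathNotFound() presumably retries while the
+// primary is switching MANIFEST files and CURRENT is transiently unreadable.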
Status ReactiveVersionSet::MaybeSwitchManifest(
Status s;
do {
std::string manifest_path;
- s = GetCurrentManifestPath(&manifest_path);
- std::unique_ptr<SequentialFile> manifest_file;
+ s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
+ &manifest_file_number_);
+ std::unique_ptr<FSSequentialFile> manifest_file;
if (s.ok()) {
if (nullptr == manifest_reader->get() ||
manifest_reader->get()->file()->file_name() != manifest_path) {
TEST_SYNC_POINT(
"ReactiveVersionSet::MaybeSwitchManifest:"
"AfterGetCurrentManifestPath:1");
- s = env_->NewSequentialFile(
- manifest_path, &manifest_file,
- env_->OptimizeForManifestRead(env_options_));
+ s = fs_->NewSequentialFile(manifest_path,
+ env_->OptimizeForManifestRead(file_options_),
+ &manifest_file, nullptr);
} else {
// No need to switch manifest.
break;
std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) {
manifest_file_reader.reset(
- new SequentialFileReader(std::move(manifest_file), manifest_path));
+ new SequentialFileReader(std::move(manifest_file), manifest_path,
+ db_options_->log_readahead_size));
manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter,
true /* checksum */, 0 /* log_number */));
// TODO (yanqin) every time we switch to a new MANIFEST, we clear the
// active_version_builders_ map because we choose to construct the
// versions from scratch, thanks to the first part of each MANIFEST
- // written by VersionSet::WriteSnapshot. This is not necessary, but we
- // choose this at present for the sake of simplicity.
+ // written by VersionSet::WriteCurrentStateToManifest. This is not strictly
+ // necessary, but we choose it at present for the sake of simplicity.
active_version_builders_.clear();
}
} while (s.IsPathNotFound());
return s;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE