// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
-#include <climits>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
+#include <list>
#include "db/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
+#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
const Slice& key,
uint32_t left,
uint32_t right) {
- while (left < right) {
- uint32_t mid = (left + right) / 2;
- const FdWithKeyRange& f = file_level.files[mid];
- if (icmp.InternalKeyComparator::Compare(f.largest_key, key) < 0) {
- // Key at "mid.largest" is < "target". Therefore all
- // files at or before "mid" are uninteresting.
- left = mid + 1;
- } else {
- // Key at "mid.largest" is >= "target". Therefore all files
- // after "mid" are uninteresting.
- right = mid;
+ auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
+ return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
+ };
+ const auto &b = file_level.files;
+ return static_cast<int>(std::lower_bound(b + left,
+ b + right, key, cmp) - b);
+}
+
+Status OverlapWithIterator(const Comparator* ucmp,
+ const Slice& smallest_user_key,
+ const Slice& largest_user_key,
+ InternalIterator* iter,
+ bool* overlap) {
+ InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
+ kValueTypeForSeek);
+ iter->Seek(range_start.Encode());
+ if (!iter->status().ok()) {
+ return iter->status();
+ }
+
+ *overlap = false;
+ if (iter->Valid()) {
+ ParsedInternalKey seek_result;
+ if (!ParseInternalKey(iter->key(), &seek_result)) {
+ return Status::Corruption("DB have corrupted keys");
+ }
+
+ if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
+ *overlap = true;
}
}
- return right;
+
+ return iter->status();
}
// Class to help choose the next file to search for the particular key.
#endif
level_files_brief_(file_levels),
is_hit_file_last_in_level_(false),
+ curr_file_level_(nullptr),
user_key_(user_key),
ikey_(ikey),
file_indexer_(file_indexer),
user_comparator_(user_comparator),
internal_comparator_(internal_comparator) {
+#ifdef NDEBUG
+ (void)files;
+#endif
// Setup member variables to search first level.
search_ended_ = !PrepareNextLevel();
if (!search_ended_) {
}
}
- int GetCurrentLevel() { return returned_file_level_; }
+ int GetCurrentLevel() const { return curr_level_; }
FdWithKeyRange* GetNextFile() {
while (!search_ended_) { // Loops over different levels.
// Do key range filtering of files or/and fractional cascading if:
// (1) not all the files are in level 0, or
- // (2) there are more than 3 Level 0 files
- // If there are only 3 or less level 0 files in the system, we skip
+ // (2) there are more than 3 current level files
+ // If there are only 3 or fewer current level files in the system, we skip
// the key range filtering. In this case, more likely, the system is
// highly tuned to minimize number of tables queried by each query,
// so it is unlikely that key range filtering is more efficient than
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
- if (f->table_reader_handle) {
- cfd_->table_cache()->EraseHandle(f->fd, f->table_reader_handle);
- f->table_reader_handle = nullptr;
- }
- vset_->obsolete_files_.push_back(f);
+ assert(cfd_ != nullptr);
+ uint32_t path_id = f->fd.GetPathId();
+ assert(path_id < cfd_->ioptions()->cf_paths.size());
+ vset_->obsolete_files_.push_back(
+ ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
}
}
}
FdWithKeyRange& f = file_level->files[i];
f.fd = files[i]->fd;
+ f.file_metadata = files[i];
f.smallest_key = Slice(mem, smallest_size);
f.largest_key = Slice(mem + smallest_size, largest_size);
}
// Binary search over file list
uint32_t index = 0;
if (smallest_user_key != nullptr) {
- // Find the earliest possible internal key for smallest_user_key
+ // Find the leftmost possible internal key for smallest_user_key
InternalKey small;
- small.SetMaxPossibleForUserKey(*smallest_user_key);
+ small.SetMinPossibleForUserKey(*smallest_user_key);
index = FindFile(icmp, file_level, small.Encode());
}
}
namespace {
-
-// An internal iterator. For a given version/level pair, yields
-// information about the files in the level. For a given entry, key()
-// is the largest key that occurs in the file, and value() is an
-// 16-byte value containing the file number and file size, both
-// encoded using EncodeFixed64.
-class LevelFileNumIterator : public InternalIterator {
+class LevelIterator final : public InternalIterator {
public:
- LevelFileNumIterator(const InternalKeyComparator& icmp,
- const LevelFilesBrief* flevel)
- : icmp_(icmp),
+ LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
+ const EnvOptions& env_options,
+ const InternalKeyComparator& icomparator,
+ const LevelFilesBrief* flevel,
+ const SliceTransform* prefix_extractor, bool should_sample,
+ HistogramImpl* file_read_hist, bool for_compaction,
+ bool skip_filters, int level, RangeDelAggregator* range_del_agg)
+ : table_cache_(table_cache),
+ read_options_(read_options),
+ env_options_(env_options),
+ icomparator_(icomparator),
flevel_(flevel),
- index_(static_cast<uint32_t>(flevel->num_files)),
- current_value_(0, 0, 0) { // Marks as invalid
- }
- virtual bool Valid() const override { return index_ < flevel_->num_files; }
- virtual void Seek(const Slice& target) override {
- index_ = FindFile(icmp_, *flevel_, target);
- }
- virtual void SeekForPrev(const Slice& target) override {
- SeekForPrevImpl(target, &icmp_);
+ prefix_extractor_(prefix_extractor),
+ file_read_hist_(file_read_hist),
+ should_sample_(should_sample),
+ for_compaction_(for_compaction),
+ skip_filters_(skip_filters),
+ file_index_(flevel_->num_files),
+ level_(level),
+ range_del_agg_(range_del_agg),
+ pinned_iters_mgr_(nullptr) {
+ // Empty level is not supported.
+ assert(flevel_ != nullptr && flevel_->num_files > 0);
}
- virtual void SeekToFirst() override { index_ = 0; }
- virtual void SeekToLast() override {
- index_ = (flevel_->num_files == 0)
- ? 0
- : static_cast<uint32_t>(flevel_->num_files) - 1;
- }
- virtual void Next() override {
+ virtual ~LevelIterator() { delete file_iter_.Set(nullptr); }
+
+ virtual void Seek(const Slice& target) override;
+ virtual void SeekForPrev(const Slice& target) override;
+ virtual void SeekToFirst() override;
+ virtual void SeekToLast() override;
+ virtual void Next() override;
+ virtual void Prev() override;
+
+ virtual bool Valid() const override { return file_iter_.Valid(); }
+ virtual Slice key() const override {
assert(Valid());
- index_++;
+ return file_iter_.key();
}
- virtual void Prev() override {
+ virtual Slice value() const override {
assert(Valid());
- if (index_ == 0) {
- index_ = static_cast<uint32_t>(flevel_->num_files); // Marks as invalid
- } else {
- index_--;
+ return file_iter_.value();
+ }
+ virtual Status status() const override {
+ return file_iter_.iter() ? file_iter_.status() : Status::OK();
+ }
+ virtual void SetPinnedItersMgr(
+ PinnedIteratorsManager* pinned_iters_mgr) override {
+ pinned_iters_mgr_ = pinned_iters_mgr;
+ if (file_iter_.iter()) {
+ file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
}
}
- Slice key() const override {
- assert(Valid());
- return flevel_->files[index_].largest_key;
+ virtual bool IsKeyPinned() const override {
+ return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+ file_iter_.iter() && file_iter_.IsKeyPinned();
}
- Slice value() const override {
- assert(Valid());
-
- auto file_meta = flevel_->files[index_];
- current_value_ = file_meta.fd;
- return Slice(reinterpret_cast<const char*>(¤t_value_),
- sizeof(FileDescriptor));
+ virtual bool IsValuePinned() const override {
+ return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+ file_iter_.iter() && file_iter_.IsValuePinned();
}
- virtual Status status() const override { return Status::OK(); }
private:
- const InternalKeyComparator icmp_;
- const LevelFilesBrief* flevel_;
- uint32_t index_;
- mutable FileDescriptor current_value_;
-};
+ void SkipEmptyFileForward();
+ void SkipEmptyFileBackward();
+ void SetFileIterator(InternalIterator* iter);
+ void InitFileIterator(size_t new_file_index);
-class LevelFileIteratorState : public TwoLevelIteratorState {
- public:
- // @param skip_filters Disables loading/accessing the filter block
- LevelFileIteratorState(TableCache* table_cache,
- const ReadOptions& read_options,
- const EnvOptions& env_options,
- const InternalKeyComparator& icomparator,
- HistogramImpl* file_read_hist, bool for_compaction,
- bool prefix_enabled, bool skip_filters, int level,
- RangeDelAggregator* range_del_agg)
- : TwoLevelIteratorState(prefix_enabled),
- table_cache_(table_cache),
- read_options_(read_options),
- env_options_(env_options),
- icomparator_(icomparator),
- file_read_hist_(file_read_hist),
- for_compaction_(for_compaction),
- skip_filters_(skip_filters),
- level_(level),
- range_del_agg_(range_del_agg) {}
+ const Slice& file_smallest_key(size_t file_index) {
+ assert(file_index < flevel_->num_files);
+ return flevel_->files[file_index].smallest_key;
+ }
- InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
- if (meta_handle.size() != sizeof(FileDescriptor)) {
- return NewErrorInternalIterator(
- Status::Corruption("FileReader invoked with unexpected value"));
- }
- const FileDescriptor* fd =
- reinterpret_cast<const FileDescriptor*>(meta_handle.data());
- return table_cache_->NewIterator(
- read_options_, env_options_, icomparator_, *fd, range_del_agg_,
- nullptr /* don't need reference to table */, file_read_hist_,
- for_compaction_, nullptr /* arena */, skip_filters_, level_);
+ bool KeyReachedUpperBound(const Slice& internal_key) {
+ return read_options_.iterate_upper_bound != nullptr &&
+ icomparator_.user_comparator()->Compare(
+ ExtractUserKey(internal_key),
+ *read_options_.iterate_upper_bound) >= 0;
}
- bool PrefixMayMatch(const Slice& internal_key) override {
- return true;
+ InternalIterator* NewFileIterator() {
+ assert(file_index_ < flevel_->num_files);
+ auto file_meta = flevel_->files[file_index_];
+ if (should_sample_) {
+ sample_file_read_inc(file_meta.file_metadata);
+ }
+
+ return table_cache_->NewIterator(
+ read_options_, env_options_, icomparator_, *file_meta.file_metadata,
+ range_del_agg_, prefix_extractor_,
+ nullptr /* don't need reference to table */,
+ file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_,
+ level_);
}
- private:
TableCache* table_cache_;
const ReadOptions read_options_;
const EnvOptions& env_options_;
const InternalKeyComparator& icomparator_;
+ const LevelFilesBrief* flevel_;
+ mutable FileDescriptor current_value_;
+ const SliceTransform* prefix_extractor_;
+
HistogramImpl* file_read_hist_;
+ bool should_sample_;
bool for_compaction_;
bool skip_filters_;
+ size_t file_index_;
int level_;
RangeDelAggregator* range_del_agg_;
+ IteratorWrapper file_iter_; // May be nullptr
+ PinnedIteratorsManager* pinned_iters_mgr_;
};
+void LevelIterator::Seek(const Slice& target) {
+ size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+
+ InitFileIterator(new_file_index);
+ if (file_iter_.iter() != nullptr) {
+ file_iter_.Seek(target);
+ }
+ SkipEmptyFileForward();
+}
+
+void LevelIterator::SeekForPrev(const Slice& target) {
+ size_t new_file_index = FindFile(icomparator_, *flevel_, target);
+ if (new_file_index >= flevel_->num_files) {
+ new_file_index = flevel_->num_files - 1;
+ }
+
+ InitFileIterator(new_file_index);
+ if (file_iter_.iter() != nullptr) {
+ file_iter_.SeekForPrev(target);
+ SkipEmptyFileBackward();
+ }
+}
+
+void LevelIterator::SeekToFirst() {
+ InitFileIterator(0);
+ if (file_iter_.iter() != nullptr) {
+ file_iter_.SeekToFirst();
+ }
+ SkipEmptyFileForward();
+}
+
+void LevelIterator::SeekToLast() {
+ InitFileIterator(flevel_->num_files - 1);
+ if (file_iter_.iter() != nullptr) {
+ file_iter_.SeekToLast();
+ }
+ SkipEmptyFileBackward();
+}
+
+void LevelIterator::Next() {
+ assert(Valid());
+ file_iter_.Next();
+ SkipEmptyFileForward();
+}
+
+void LevelIterator::Prev() {
+ assert(Valid());
+ file_iter_.Prev();
+ SkipEmptyFileBackward();
+}
+
+void LevelIterator::SkipEmptyFileForward() {
+ while (file_iter_.iter() == nullptr ||
+ (!file_iter_.Valid() && file_iter_.status().ok() &&
+ !file_iter_.iter()->IsOutOfBound())) {
+ // Move to next file
+ if (file_index_ >= flevel_->num_files - 1) {
+ // Already at the last file
+ SetFileIterator(nullptr);
+ return;
+ }
+ if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
+ SetFileIterator(nullptr);
+ return;
+ }
+ InitFileIterator(file_index_ + 1);
+ if (file_iter_.iter() != nullptr) {
+ file_iter_.SeekToFirst();
+ }
+ }
+}
+
+void LevelIterator::SkipEmptyFileBackward() {
+ while (file_iter_.iter() == nullptr ||
+ (!file_iter_.Valid() && file_iter_.status().ok())) {
+ // Move to previous file
+ if (file_index_ == 0) {
+ // Already the first file
+ SetFileIterator(nullptr);
+ return;
+ }
+ InitFileIterator(file_index_ - 1);
+ if (file_iter_.iter() != nullptr) {
+ file_iter_.SeekToLast();
+ }
+ }
+}
+
+void LevelIterator::SetFileIterator(InternalIterator* iter) {
+ if (pinned_iters_mgr_ && iter) {
+ iter->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+
+ InternalIterator* old_iter = file_iter_.Set(iter);
+ if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+ pinned_iters_mgr_->PinIterator(old_iter);
+ } else {
+ delete old_iter;
+ }
+}
+
+void LevelIterator::InitFileIterator(size_t new_file_index) {
+ if (new_file_index >= flevel_->num_files) {
+ file_index_ = new_file_index;
+ SetFileIterator(nullptr);
+ return;
+ } else {
+ // If the file iterator shows incomplete, we try it again if users seek
+ // to the same file, as this time we may go to a different data block
+ // which is cached in block cache.
+ //
+ if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
+ new_file_index == file_index_) {
+ // file_iter_ is already constructed with this iterator, so
+ // no need to change anything
+ } else {
+ file_index_ = new_file_index;
+ InternalIterator* iter = NewFileIterator();
+ SetFileIterator(iter);
+ }
+ }
+}
+
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
// Both of the constructor and destructor need to be called inside DB Mutex.
auto table_cache = cfd_->table_cache();
auto ioptions = cfd_->ioptions();
Status s = table_cache->GetTableProperties(
- vset_->env_options_, cfd_->internal_comparator(), file_meta->fd,
- tp, true /* no io */);
+ env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
+ mutable_cf_options_.prefix_extractor.get(), true /* no io */);
if (s.ok()) {
return s;
}
// 2. Table is not present in table cache, we'll read the table properties
// directly from the properties block in the file.
std::unique_ptr<RandomAccessFile> file;
+ std::string file_name;
if (fname != nullptr) {
- s = ioptions->env->NewRandomAccessFile(
- *fname, &file, vset_->env_options_);
+ file_name = *fname;
} else {
- s = ioptions->env->NewRandomAccessFile(
- TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
- file_meta->fd.GetPathId()),
- &file, vset_->env_options_);
+ file_name =
+ TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
+ file_meta->fd.GetPathId());
}
+ s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
if (!s.ok()) {
return s;
}
// By setting the magic number to kInvalidTableMagicNumber, we can by
// pass the magic number check in the footer.
std::unique_ptr<RandomAccessFileReader> file_reader(
- new RandomAccessFileReader(std::move(file)));
+ new RandomAccessFileReader(std::move(file), file_name));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
- Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions, &raw_table_properties);
+ Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
+ &raw_table_properties, false /* compression_type_missing */);
if (!s.ok()) {
return s;
}
int level) {
for (const auto& file_meta : storage_info_.files_[level]) {
auto fname =
- TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
+ TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
// 1. If the table is already present in table cache, load table
// properties from there.
false);
for (const auto& file_meta : files) {
auto fname =
- TableFileName(vset_->db_options_->db_paths,
+ TableFileName(cfd_->ioptions()->cf_paths,
file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
if (props->count(fname) == 0) {
// 1. If the table is already present in table cache, load table
for (auto& file_level : storage_info_.level_files_brief_) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
- vset_->env_options_, cfd_->internal_comparator(),
- file_level.files[i].fd);
+ env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
+ mutable_cf_options_.prefix_extractor.get());
}
}
return total_usage;
for (const auto& file : vstorage->LevelFiles(level)) {
uint32_t path_id = file->fd.GetPathId();
std::string file_path;
- if (path_id < ioptions->db_paths.size()) {
- file_path = ioptions->db_paths[path_id].path;
+ if (path_id < ioptions->cf_paths.size()) {
+ file_path = ioptions->cf_paths[path_id].path;
} else {
- assert(!ioptions->db_paths.empty());
- file_path = ioptions->db_paths.back().path;
+ assert(!ioptions->cf_paths.empty());
+ file_path = ioptions->cf_paths.back().path;
}
- files.emplace_back(
+ files.emplace_back(SstFileMetaData{
MakeTableFileName("", file->fd.GetNumber()),
file_path,
- file->fd.GetFileSize(),
- file->smallest_seqno,
- file->largest_seqno,
+ static_cast<size_t>(file->fd.GetFileSize()),
+ file->fd.smallest_seqno,
+ file->fd.largest_seqno,
file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
- file->being_compacted);
+ file->stats.num_reads_sampled.load(std::memory_order_relaxed),
+ file->being_compacted});
level_size += file->fd.GetFileSize();
}
cf_meta->levels.emplace_back(
}
}
+uint64_t Version::GetSstFilesSize() {
+ uint64_t sst_files_size = 0;
+ for (int level = 0; level < storage_info_.num_levels_; level++) {
+ for (const auto& file_meta : storage_info_.LevelFiles(level)) {
+ sst_files_size += file_meta->fd.GetFileSize();
+ }
+ }
+ return sst_files_size;
+}
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
// Estimation will be inaccurate when:
return;
}
+ bool should_sample = should_sample_file_read();
+
auto* arena = merge_iter_builder->GetArena();
if (level == 0) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
- read_options, soptions, cfd_->internal_comparator(), file.fd,
- range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
- false, arena, false /* skip_filters */, 0 /* level */));
+ read_options, soptions, cfd_->internal_comparator(), *file.file_metadata,
+ range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
+ cfd_->internal_stats()->GetFileReadHist(0), false, arena,
+ false /* skip_filters */, 0 /* level */));
}
- } else {
+ if (should_sample) {
+ // Count one for each L0 file. This is done per iterator creation
+ // rather than Seek(), while files in other levels are recorded per seek.
+ // If users execute one range query per iterator, there may be some
+ // discrepancy here.
+ for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
+ sample_file_read_inc(meta);
+ }
+ }
+ } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
- auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
- auto* state = new (mem)
- LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
- cfd_->internal_comparator(),
- cfd_->internal_stats()->GetFileReadHist(level),
- false /* for_compaction */,
- cfd_->ioptions()->prefix_extractor != nullptr,
- IsFilterSkipped(level), level, range_del_agg);
- mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
- auto* first_level_iter = new (mem) LevelFileNumIterator(
- cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
- merge_iter_builder->AddIterator(
- NewTwoLevelIterator(state, first_level_iter, arena, false));
+ auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
+ merge_iter_builder->AddIterator(new (mem) LevelIterator(
+ cfd_->table_cache(), read_options, soptions,
+ cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
+ mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
+ cfd_->internal_stats()->GetFileReadHist(level),
+ false /* for_compaction */, IsFilterSkipped(level), level,
+ range_del_agg));
}
}
+Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
+ const EnvOptions& env_options,
+ const Slice& smallest_user_key,
+ const Slice& largest_user_key,
+ int level, bool* overlap) {
+ assert(storage_info_.finalized_);
+
+ auto icmp = cfd_->internal_comparator();
+ auto ucmp = icmp.user_comparator();
+
+ Arena arena;
+ Status status;
+ RangeDelAggregator range_del_agg(icmp, {}, false);
+
+ *overlap = false;
+
+ if (level == 0) {
+ for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
+ const auto file = &storage_info_.LevelFilesBrief(0).files[i];
+ if (AfterFile(ucmp, &smallest_user_key, file) ||
+ BeforeFile(ucmp, &largest_user_key, file)) {
+ continue;
+ }
+ ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
+ read_options, env_options, cfd_->internal_comparator(), *file->file_metadata,
+ &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr,
+ cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
+ false /* skip_filters */, 0 /* level */));
+ status = OverlapWithIterator(
+ ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
+ if (!status.ok() || *overlap) {
+ break;
+ }
+ }
+ } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
+ auto mem = arena.AllocateAligned(sizeof(LevelIterator));
+ ScopedArenaIterator iter(new (mem) LevelIterator(
+ cfd_->table_cache(), read_options, env_options,
+ cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
+ mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
+ cfd_->internal_stats()->GetFileReadHist(level),
+ false /* for_compaction */, IsFilterSkipped(level), level,
+ &range_del_agg));
+ status = OverlapWithIterator(
+ ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
+ }
+
+ if (status.ok() && *overlap == false &&
+ range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
+ *overlap = true;
+ }
+ return status;
+}
+
VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels,
current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
current_num_deletions_ = ref_vstorage->current_num_deletions_;
current_num_samples_ = ref_vstorage->current_num_samples_;
+ oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
}
}
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
+ const EnvOptions& env_opt,
+ const MutableCFOptions mutable_cf_options,
uint64_t version_number)
: env_(vset->env_),
cfd_(column_family_data),
next_(this),
prev_(this),
refs_(0),
+ env_options_(env_opt),
+ mutable_cf_options_(mutable_cf_options),
version_number_(version_number) {}
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, Status* status,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found,
- bool* key_exists, SequenceNumber* seq) {
+ bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
+ bool* is_blob) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, range_del_agg, this->env_, seq,
- merge_operator_ ? &pinned_iters_mgr : nullptr);
+ merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
+
while (f != nullptr) {
+ if (get_context.sample()) {
+ sample_file_read_inc(f->file_metadata);
+ }
+
*status = table_cache_->Get(
- read_options, *internal_comparator(), f->fd, ikey, &get_context,
+ read_options, *internal_comparator(), *f->file_metadata, ikey,
+ &get_context, mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
return;
}
+ // report the counters before returning
+ if (get_context.State() != GetContext::kNotFound &&
+ get_context.State() != GetContext::kMerge &&
+ db_statistics_ != nullptr) {
+ get_context.ReportCounters();
+ }
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
+ case GetContext::kMerge:
+ break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
- case GetContext::kMerge:
- break;
+ case GetContext::kBlobIndex:
+ ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+ *status = Status::NotSupported(
+ "Encounter unexpected blob index. Please open DB with "
+ "rocksdb::blob_db::BlobDB instead.");
+ return;
}
f = fp.GetNextFile();
}
+ if (db_statistics_ != nullptr) {
+ get_context.ReportCounters();
+ }
if (GetContext::kMerge == get_context.State()) {
if (!merge_operator_) {
*status = Status::InvalidArgument(
std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
- str_value, info_log_, db_statistics_, env_);
+ str_value, info_log_, db_statistics_, env_,
+ nullptr /* result_operand */, true);
if (LIKELY(value != nullptr)) {
value->PinSelf();
}
storage_info_.GenerateFileIndexer();
storage_info_.GenerateLevelFilesBrief();
storage_info_.GenerateLevel0NonOverlapping();
+ storage_info_.GenerateBottommostFiles();
}
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
storage_info_.UpdateAccumulatedStats(file_meta);
// when option "max_open_files" is -1, all the file metadata has
// already been read, so MaybeInitializeFileMetaData() won't incur
- // any I/O cost.
- if (vset_->db_options_->max_open_files == -1) {
+ // any I/O cost. "max_open_files=-1" means that the table cache passed
+ // to the VersionSet and then to the ColumnFamilySet has a size of
+ // TableCache::kInfiniteCapacity
+ if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
+ TableCache::kInfiniteCapacity) {
continue;
}
if (++init_count >= kMaxInitCount) {
return 0;
}
+int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
+ if (allow_ingest_behind) {
+ assert(num_levels() > 1);
+ return num_levels() - 2;
+ }
+ return num_levels() - 1;
+}
+
void VersionStorageInfo::EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options) {
// Only implemented for level-based compaction
// accumulated bytes.
uint64_t bytes_compact_to_next_level = 0;
+ uint64_t level_size = 0;
+ for (auto* f : files_[0]) {
+ level_size += f->fd.GetFileSize();
+ }
// Level 0
bool level0_compact_triggered = false;
- if (static_cast<int>(files_[0].size()) >
- mutable_cf_options.level0_file_num_compaction_trigger) {
+ if (static_cast<int>(files_[0].size()) >=
+ mutable_cf_options.level0_file_num_compaction_trigger ||
+ level_size >= mutable_cf_options.max_bytes_for_level_base) {
level0_compact_triggered = true;
- for (auto* f : files_[0]) {
- bytes_compact_to_next_level += f->fd.GetFileSize();
- }
- estimated_compaction_needed_bytes_ = bytes_compact_to_next_level;
+ estimated_compaction_needed_bytes_ = level_size;
+ bytes_compact_to_next_level = level_size;
} else {
estimated_compaction_needed_bytes_ = 0;
}
// Level 1 and up.
uint64_t bytes_next_level = 0;
for (int level = base_level(); level <= MaxInputLevel(); level++) {
- uint64_t level_size = 0;
+ level_size = 0;
if (bytes_next_level > 0) {
#ifndef NDEBUG
uint64_t level_size2 = 0;
}
}
+namespace {
+uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
+ const MutableCFOptions& mutable_cf_options,
+ const std::vector<FileMetaData*>& files) {
+ uint32_t ttl_expired_files_count = 0;
+
+ int64_t _current_time;
+ auto status = ioptions.env->GetCurrentTime(&_current_time);
+ if (status.ok()) {
+ const uint64_t current_time = static_cast<uint64_t>(_current_time);
+ for (auto f : files) {
+ if (!f->being_compacted && f->fd.table_reader != nullptr &&
+ f->fd.table_reader->GetTableProperties() != nullptr) {
+ auto creation_time =
+ f->fd.table_reader->GetTableProperties()->creation_time;
+ if (creation_time > 0 &&
+ creation_time < (current_time -
+ mutable_cf_options.compaction_options_fifo.ttl)) {
+ ttl_expired_files_count++;
+ }
+ }
+ }
+ }
+ return ttl_expired_files_count;
+}
+} // anonymous namespace
+
void VersionStorageInfo::ComputeCompactionScore(
const ImmutableCFOptions& immutable_cf_options,
const MutableCFOptions& mutable_cf_options) {
}
if (compaction_style_ == kCompactionStyleFIFO) {
- score =
- static_cast<double>(total_size) /
- immutable_cf_options.compaction_options_fifo.max_table_files_size;
+ score = static_cast<double>(total_size) /
+ mutable_cf_options.compaction_options_fifo.max_table_files_size;
+ if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
+ score = std::max(
+ static_cast<double>(num_sorted_runs) /
+ mutable_cf_options.level0_file_num_compaction_trigger,
+ score);
+ }
+ if (mutable_cf_options.compaction_options_fifo.ttl > 0) {
+ score = std::max(
+ static_cast<double>(GetExpiredTtlFilesCount(
+ immutable_cf_options, mutable_cf_options, files_[level])),
+ score);
+ }
+
} else {
score = static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger;
}
}
ComputeFilesMarkedForCompaction();
+ ComputeBottommostFilesMarkedForCompaction();
+ if (mutable_cf_options.ttl > 0) {
+ ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
+ }
EstimateCompactionBytesNeeded(mutable_cf_options);
}
}
}
+void VersionStorageInfo::ComputeExpiredTtlFiles(
+ const ImmutableCFOptions& ioptions, const uint64_t ttl) {
+ assert(ttl > 0);
+
+ expired_ttl_files_.clear();
+
+ int64_t _current_time;
+ auto status = ioptions.env->GetCurrentTime(&_current_time);
+ if (!status.ok()) {
+ return;
+ }
+ const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+ for (int level = 0; level < num_levels() - 1; level++) {
+ for (auto f : files_[level]) {
+ if (!f->being_compacted && f->fd.table_reader != nullptr &&
+ f->fd.table_reader->GetTableProperties() != nullptr) {
+ auto creation_time =
+ f->fd.table_reader->GetTableProperties()->creation_time;
+ if (creation_time > 0 && creation_time < (current_time - ttl)) {
+ expired_ttl_files_.emplace_back(level, f);
+ }
+ }
+ }
+ }
+}
+
namespace {
// used to sort files by size
}
assert(false);
}
+#else
+ (void)info_log;
#endif
f->refs++;
level_files->push_back(f);
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping();
+// 7. GenerateBottommostFiles();
void VersionStorageInfo::SetFinalized() {
finalized_ = true;
#ifndef NDEBUG
void VersionStorageInfo::UpdateFilesByCompactionPri(
CompactionPri compaction_pri) {
- if (compaction_style_ == kCompactionStyleFIFO ||
+ if (compaction_style_ == kCompactionStyleNone ||
+ compaction_style_ == kCompactionStyleFIFO ||
compaction_style_ == kCompactionStyleUniversal) {
// don't need this
return;
case kOldestLargestSeqFirst:
std::sort(temp.begin(), temp.end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
- return f1.file->largest_seqno < f2.file->largest_seqno;
+ return f1.file->fd.largest_seqno <
+ f2.file->fd.largest_seqno;
});
break;
case kOldestSmallestSeqFirst:
std::sort(temp.begin(), temp.end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
- return f1.file->smallest_seqno < f2.file->smallest_seqno;
+ return f1.file->fd.smallest_seqno <
+ f2.file->fd.smallest_seqno;
});
break;
case kMinOverlappingRatio:
}
}
+// Fills bottommost_files_ with (level, FileMetaData*) pairs for every file
+// whose key range cannot exist in any sorted run after its own (see
+// RangeMightExistAfterSortedRun). For L0 the file's index within the level
+// is forwarded; -1 is passed for all other levels. Must run before the
+// version is finalized, on an empty bottommost_files_ list (both asserted).
+void VersionStorageInfo::GenerateBottommostFiles() {
+  assert(!finalized_);
+  assert(bottommost_files_.empty());
+  for (size_t level = 0; level < level_files_brief_.size(); ++level) {
+    for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
+         ++file_idx) {
+      const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
+      int l0_file_idx;
+      if (level == 0) {
+        l0_file_idx = static_cast<int>(file_idx);
+      } else {
+        l0_file_idx = -1;
+      }
+      if (!RangeMightExistAfterSortedRun(f.smallest_key, f.largest_key,
+                                         static_cast<int>(level),
+                                         l0_file_idx)) {
+        bottommost_files_.emplace_back(static_cast<int>(level),
+                                       f.file_metadata);
+      }
+    }
+  }
+}
+
+// Advances the tracked oldest-snapshot sequence number (asserted to be
+// monotonically non-decreasing). If it now exceeds the cached
+// bottommost_files_mark_threshold_, some bottommost files may have become
+// eligible for marking, so the marked set is recomputed.
+void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
+  assert(seqnum >= oldest_snapshot_seqnum_);
+  oldest_snapshot_seqnum_ = seqnum;
+  if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
+    ComputeBottommostFilesMarkedForCompaction();
+  }
+}
+
+// Rebuilds bottommost_files_marked_for_compaction_: a bottommost file is
+// marked when it is not already being compacted, has a nonzero
+// largest_seqno, contains more than one deletion, and its largest_seqno is
+// below oldest_snapshot_seqnum_ (no snapshot still pins it). Files blocked
+// only by a snapshot feed bottommost_files_mark_threshold_ -- the smallest
+// largest_seqno whose release would allow new marks; UpdateOldestSnapshot
+// compares against this threshold to decide when to recompute.
+void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
+  bottommost_files_marked_for_compaction_.clear();
+  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
+  for (auto& level_and_file : bottommost_files_) {
+    if (!level_and_file.second->being_compacted &&
+        level_and_file.second->fd.largest_seqno != 0 &&
+        level_and_file.second->num_deletions > 1) {
+      // largest_seqno might be nonzero due to containing the final key in an
+      // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
+      // ensures the file really contains deleted or overwritten keys.
+      if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
+        bottommost_files_marked_for_compaction_.push_back(level_and_file);
+      } else {
+        bottommost_files_mark_threshold_ =
+            std::min(bottommost_files_mark_threshold_,
+                     level_and_file.second->fd.largest_seqno);
+      }
+    }
+  }
+}
+
void Version::Ref() {
++refs_;
}
void VersionStorageInfo::GetOverlappingInputs(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
- bool expand_range) const {
+ bool expand_range, InternalKey** next_smallest) const {
if (level >= num_non_empty_levels_) {
// this level is empty, no overlapping inputs
return;
}
inputs->clear();
+ if (file_index) {
+ *file_index = -1;
+ }
+ const Comparator* user_cmp = user_comparator_;
+ if (level > 0) {
+ GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
+ file_index, false, next_smallest);
+ return;
+ }
+
+ if (next_smallest) {
+ // next_smallest key only makes sense for non-level 0, where files are
+ // non-overlapping
+ *next_smallest = nullptr;
+ }
+
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
if (end != nullptr) {
user_end = end->user_key();
}
- if (file_index) {
- *file_index = -1;
- }
- const Comparator* user_cmp = user_comparator_;
- if (begin != nullptr && end != nullptr && level > 0) {
- GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
- hint_index, file_index);
- return;
- }
- for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
- FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
- const Slice file_start = ExtractUserKey(f->smallest_key);
- const Slice file_limit = ExtractUserKey(f->largest_key);
- if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
- // "f" is completely before specified range; skip it
- } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
- // "f" is completely after specified range; skip it
- } else {
- inputs->push_back(files_[level][i-1]);
- if (level == 0 && expand_range) {
- // Level-0 files may overlap each other. So check if the newly
- // added file has expanded the range. If so, restart search.
- if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
- user_begin = file_start;
- inputs->clear();
- i = 0;
- } else if (end != nullptr
- && user_cmp->Compare(file_limit, user_end) > 0) {
- user_end = file_limit;
- inputs->clear();
- i = 0;
+ // index stores the file index need to check.
+ std::list<size_t> index;
+ for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
+ index.emplace_back(i);
+ }
+
+ while (!index.empty()) {
+ bool found_overlapping_file = false;
+ auto iter = index.begin();
+ while (iter != index.end()) {
+ FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
+ const Slice file_start = ExtractUserKey(f->smallest_key);
+ const Slice file_limit = ExtractUserKey(f->largest_key);
+ if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
+ // "f" is completely before specified range; skip it
+ iter++;
+ } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
+ // "f" is completely after specified range; skip it
+ iter++;
+ } else {
+ // if overlap
+ inputs->emplace_back(files_[level][*iter]);
+ found_overlapping_file = true;
+ // record the first file index.
+ if (file_index && *file_index == -1) {
+ *file_index = static_cast<int>(*iter);
+ }
+ // the related file is overlap, erase to avoid checking again.
+ iter = index.erase(iter);
+ if (expand_range) {
+ if (begin != nullptr &&
+ user_cmp->Compare(file_start, user_begin) < 0) {
+ user_begin = file_start;
+ }
+ if (end != nullptr &&
+ user_cmp->Compare(file_limit, user_end) > 0) {
+ user_end = file_limit;
+ }
}
- } else if (file_index) {
- *file_index = static_cast<int>(i) - 1;
}
}
+ // if all the files left are not overlap, break
+ if (!found_overlapping_file) {
+ break;
+ }
}
}
void VersionStorageInfo::GetCleanInputsWithinInterval(
int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
- if (level >= num_non_empty_levels_) {
+ inputs->clear();
+ if (file_index) {
+ *file_index = -1;
+ }
+ if (level >= num_non_empty_levels_ || level == 0 ||
+ level_files_brief_[level].num_files == 0) {
// this level is empty, no inputs within range
+ // also don't support clean input interval within L0
return;
}
- inputs->clear();
- Slice user_begin, user_end;
- if (begin != nullptr) {
- user_begin = begin->user_key();
+ const auto& level_files = level_files_brief_[level];
+ if (begin == nullptr) {
+ begin = &level_files.files[0].file_metadata->smallest;
}
- if (end != nullptr) {
- user_end = end->user_key();
+ if (end == nullptr) {
+ end = &level_files.files[level_files.num_files - 1].file_metadata->largest;
}
- if (file_index) {
- *file_index = -1;
+
+ GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
+ hint_index, file_index,
+ true /* within_interval */);
+}
+
+namespace {
+
+const uint64_t kRangeTombstoneSentinel =
+    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
+
+// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
+// null which provides the property that a==null indicates a key that is less
+// than any key and b==null indicates a key that is greater than any key. Note
+// that the comparison is performed primarily on the user-key portion of the
+// key. If the user-keys compare equal, an additional test is made to sort
+// range tombstone sentinel keys before other keys with the same user-key. The
+// result is that 2 user-keys will compare equal if they differ purely on
+// their sequence number and value, but the range tombstone sentinel for that
+// user-key will compare not equal. This is necessary because the range
+// tombstone sentinel key is set as the largest key for an sstable even though
+// that key never appears in the database. We don't want adjacent sstables to
+// be considered overlapping if they are separated by the range tombstone
+// sentinel.
+int sstableKeyCompare(const Comparator* user_cmp,
+                      const InternalKey& a, const InternalKey& b) {
+  auto c = user_cmp->Compare(a.user_key(), b.user_key());
+  if (c != 0) {
+    return c;
+  }
+  auto a_footer = ExtractInternalKeyFooter(a.Encode());
+  auto b_footer = ExtractInternalKeyFooter(b.Encode());
+  if (a_footer == kRangeTombstoneSentinel) {
+    if (b_footer != kRangeTombstoneSentinel) {
+      return -1;
+    }
+  } else if (b_footer == kRangeTombstoneSentinel) {
+    return 1;
  }
-  if (begin != nullptr && end != nullptr && level > 0) {
-    GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs,
-                                          hint_index, file_index,
-                                          true /* within_interval */);
+  return 0;
+}
+
+// Overload treating a == nullptr as a key smaller than any key, per the
+// contract documented above.
+int sstableKeyCompare(const Comparator* user_cmp,
+                      const InternalKey* a, const InternalKey& b) {
+  if (a == nullptr) {
+    return -1;
  }
+  return sstableKeyCompare(user_cmp, *a, b);
}
+// Overload treating b == nullptr as a key larger than any key, so any
+// non-null a compares less than it (returns -1).
+int sstableKeyCompare(const Comparator* user_cmp,
+                      const InternalKey& a, const InternalKey* b) {
+  if (b == nullptr) {
+    return -1;
+  }
+  return sstableKeyCompare(user_cmp, a, *b);
+}
+
+} // namespace
+
// Store in "*inputs" all files in "level" that overlap [begin,end]
// Employ binary search to find at least one file that overlaps the
// specified range. From that file, iterate backwards and
// within range [begin, end]. "clean" means there is a boudnary
// between the files in "*inputs" and the surrounding files
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
-    int level, const Slice& user_begin, const Slice& user_end,
+    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
-    bool within_interval) const {
+    bool within_interval, InternalKey** next_smallest) const {
  assert(level > 0);
  int min = 0;
  int mid = 0;
  int max = static_cast<int>(files_[level].size()) - 1;
  bool foundOverlap = false;
-  const Comparator* user_cmp = user_comparator_;
+  auto user_cmp = user_comparator_;
  // if the caller already knows the index of a file that has overlap,
  // then we can skip the binary search.
  while (!foundOverlap && min <= max) {
    mid = (min + max)/2;
    FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
-    const Slice file_start = ExtractUserKey(f->smallest_key);
-    const Slice file_limit = ExtractUserKey(f->largest_key);
-    if ((!within_interval && user_cmp->Compare(file_limit, user_begin) < 0) ||
-        (within_interval && user_cmp->Compare(file_start, user_begin) < 0)) {
+    auto& smallest = f->file_metadata->smallest;
+    auto& largest = f->file_metadata->largest;
+    if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
+        (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
      min = mid + 1;
    } else if ((!within_interval &&
-                user_cmp->Compare(user_end, file_start) < 0) ||
+                sstableKeyCompare(user_cmp, smallest, end) > 0) ||
               (within_interval &&
-                user_cmp->Compare(user_end, file_limit) < 0)) {
+                sstableKeyCompare(user_cmp, largest, end) > 0)) {
      max = mid - 1;
    } else {
      foundOverlap = true;
  // If there were no overlapping files, return immediately.
  if (!foundOverlap) {
+    // NOTE(review): this assigns nullptr to the *local* parameter
+    // `next_smallest`, not to the caller's out-param `*next_smallest`.
+    // On the no-overlap path the caller's InternalKey* is left unwritten;
+    // this likely should be `*next_smallest = nullptr;` -- confirm.
+    if (next_smallest) {
+      next_smallest = nullptr;
+    }
    return;
  }
  // returns the index where an overlap is found
  int start_index, end_index;
  if (within_interval) {
-    ExtendFileRangeWithinInterval(level, user_begin, user_end, mid, &start_index,
-                                  &end_index);
+    ExtendFileRangeWithinInterval(level, begin, end, mid,
+                                  &start_index, &end_index);
  } else {
-    ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid,
+    ExtendFileRangeOverlappingInterval(level, begin, end, mid,
                                       &start_index, &end_index);
+    assert(end_index >= start_index);
  }
-  assert(end_index >= start_index);
  // insert overlapping files into vector
  for (int i = start_index; i <= end_index; i++) {
    inputs->push_back(files_[level][i]);
  }
+
+  if (next_smallest != nullptr) {
+    // Provide the next key outside the range covered by inputs
+    // (writes through the double pointer, so `*next_smallest` must point at
+    // a valid InternalKey supplied by the caller).
+    if (++end_index < static_cast<int>(files_[level].size())) {
+      **next_smallest = files_[level][end_index]->smallest;
+    } else {
+      *next_smallest = nullptr;
+    }
+  }
}
// Store in *start_index and *end_index the range of all files in
// and forward to find all overlapping files.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
- int level, const Slice& user_begin, const Slice& user_end,
+ int level, const InternalKey* begin, const InternalKey* end,
unsigned int mid_index, int* start_index, int* end_index) const {
- const Comparator* user_cmp = user_comparator_;
+ auto user_cmp = user_comparator_;
const FdWithKeyRange* files = level_files_brief_[level].files;
#ifndef NDEBUG
{
// assert that the file at mid_index overlaps with the range
assert(mid_index < level_files_brief_[level].num_files);
const FdWithKeyRange* f = &files[mid_index];
- const Slice fstart = ExtractUserKey(f->smallest_key);
- const Slice flimit = ExtractUserKey(f->largest_key);
- if (user_cmp->Compare(fstart, user_begin) >= 0) {
- assert(user_cmp->Compare(fstart, user_end) <= 0);
+ auto& smallest = f->file_metadata->smallest;
+ auto& largest = f->file_metadata->largest;
+ if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
+ assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
} else {
- assert(user_cmp->Compare(flimit, user_begin) >= 0);
+ // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s - %s\n%d %d\n",
+ // begin ? begin->DebugString().c_str() : "(null)",
+ // end ? end->DebugString().c_str() : "(null)",
+ // smallest->DebugString().c_str(),
+ // largest->DebugString().c_str(),
+ // sstableKeyCompare(user_cmp, smallest, begin),
+ // sstableKeyCompare(user_cmp, largest, begin));
+ assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
}
}
#endif
*start_index = mid_index + 1;
*end_index = mid_index;
- int count __attribute__((unused)) = 0;
+ int count __attribute__((__unused__));
+ count = 0;
// check backwards from 'mid' to lower indices
for (int i = mid_index; i >= 0 ; i--) {
const FdWithKeyRange* f = &files[i];
- const Slice file_limit = ExtractUserKey(f->largest_key);
- if (user_cmp->Compare(file_limit, user_begin) >= 0) {
+ auto& largest = f->file_metadata->largest;
+ if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
*start_index = i;
assert((count++, true));
} else {
for (unsigned int i = mid_index+1;
i < level_files_brief_[level].num_files; i++) {
const FdWithKeyRange* f = &files[i];
- const Slice file_start = ExtractUserKey(f->smallest_key);
- if (user_cmp->Compare(file_start, user_end) <= 0) {
+ auto& smallest = f->file_metadata->smallest;
+ if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
assert((count++, true));
*end_index = i;
} else {
// the clean range required.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendFileRangeWithinInterval(
- int level, const Slice& user_begin, const Slice& user_end,
+ int level, const InternalKey* begin, const InternalKey* end,
unsigned int mid_index, int* start_index, int* end_index) const {
assert(level != 0);
- const Comparator* user_cmp = user_comparator_;
+ auto* user_cmp = user_comparator_;
const FdWithKeyRange* files = level_files_brief_[level].files;
#ifndef NDEBUG
{
// assert that the file at mid_index is within the range
assert(mid_index < level_files_brief_[level].num_files);
const FdWithKeyRange* f = &files[mid_index];
- const Slice fstart = ExtractUserKey(f->smallest_key);
- const Slice flimit = ExtractUserKey(f->largest_key);
- assert(user_cmp->Compare(fstart, user_begin) >= 0 &&
- user_cmp->Compare(flimit, user_end) <= 0);
+ auto& smallest = f->file_metadata->smallest;
+ auto& largest = f->file_metadata->largest;
+ assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
+ sstableKeyCompare(user_cmp, largest, end) <= 0);
}
#endif
- ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid_index,
+ ExtendFileRangeOverlappingInterval(level, begin, end, mid_index,
start_index, end_index);
int left = *start_index;
int right = *end_index;
// shrink from left to right
while (left <= right) {
- const Slice& first_key_in_range = ExtractUserKey(files[left].smallest_key);
- if (user_cmp->Compare(first_key_in_range, user_begin) < 0) {
+ auto& smallest = files[left].file_metadata->smallest;
+ if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
left++;
continue;
}
if (left > 0) { // If not first file
- const Slice& last_key_before =
- ExtractUserKey(files[left - 1].largest_key);
- if (user_cmp->Equal(first_key_in_range, last_key_before)) {
- // The first user key in range overlaps with the previous file's last
- // key
+ auto& largest = files[left - 1].file_metadata->largest;
+ if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
left++;
continue;
}
}
// shrink from right to left
while (left <= right) {
- const Slice last_key_in_range = ExtractUserKey(files[right].largest_key);
- if (user_cmp->Compare(last_key_in_range, user_end) > 0) {
+ auto& largest = files[right].file_metadata->largest;
+ if (sstableKeyCompare(user_cmp, largest, end) > 0) {
right--;
continue;
}
if (right < static_cast<int>(level_files_brief_[level].num_files) -
1) { // If not the last file
- const Slice first_key_after =
- ExtractUserKey(files[right + 1].smallest_key);
- if (user_cmp->Equal(last_key_in_range, first_key_after)) {
+ auto& smallest = files[right + 1].file_metadata->smallest;
+ if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
// The last user key in range overlaps with the next file's first key
right--;
continue;
AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
int ret = snprintf(scratch->buffer + len, sz,
"#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
- f->fd.GetNumber(), f->smallest_seqno, sztxt,
+ f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
static_cast<int>(f->being_compacted));
if (ret < 0 || ret >= sz)
break;
level_size = MultiplyCheckOverflow(
level_size, options.max_bytes_for_level_multiplier);
}
- level_max_bytes_[i] = level_size;
+ // Don't set any level below base_bytes_max. Otherwise, the LSM can
+ // assume an hourglass shape where L1+ sizes are smaller than L0. This
+ // causes compaction scoring, which depends on level sizes, to favor L1+
+ // at the expense of L0, which may fill up and stall.
+ level_max_bytes_[i] = std::max(level_size, base_bytes_max);
}
}
}
return size;
}
+// Returns true when keys in [`smallest_key`, `largest_key`], belonging to
+// the sorted run at `last_level` (for L0, the single file at `last_l0_idx`),
+// might also exist in a level below `last_level`; returns false only when
+// no such data can exist, i.e. the range is bottommost for those keys.
+bool VersionStorageInfo::RangeMightExistAfterSortedRun(
+    const Slice& smallest_key, const Slice& largest_key, int last_level,
+    int last_l0_idx) {
+  assert((last_l0_idx != -1) == (last_level == 0));
+  // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
+  // bottommost only if it's the oldest L0 file and there are no files on older
+  // levels. It'd be better to consider it bottommost if there's no overlap in
+  // older levels/files.
+  if (last_level == 0 &&
+      last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
+    return true;
+  }
+
+  // Checks whether there are files living beyond the `last_level`. If lower
+  // levels have files, it checks for overlap between [`smallest_key`,
+  // `largest_key`] and those files. Bottomlevel optimizations can be made if
+  // there are no files in lower levels or if there is no overlap with the files
+  // in the lower levels.
+  for (int level = last_level + 1; level < num_levels(); level++) {
+    // The range is not in the bottommost level if there are files in lower
+    // levels when the `last_level` is 0 or if there are files in lower levels
+    // which overlap with [`smallest_key`, `largest_key`].
+    if (files_[level].size() > 0 &&
+        (last_level == 0 ||
+         OverlapInLevel(level, &smallest_key, &largest_key))) {
+      return true;
+    }
+  }
+  return false;
+}
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
for (int level = 0; level < storage_info_.num_levels(); level++) {
}
}
-std::string Version::DebugString(bool hex) const {
+std::string Version::DebugString(bool hex, bool print_stats) const {
std::string r;
for (int level = 0; level < storage_info_.num_levels_; level++) {
// E.g.,
// --- level 1 ---
// 17:123['a' .. 'd']
// 20:43['e' .. 'g']
+ //
+ // if print_stats=true:
+ // 17:123['a' .. 'd'](4096)
r.append("--- level ");
AppendNumberTo(&r, level);
r.append(" --- version# ");
r.append(files[i]->smallest.DebugString(hex));
r.append(" .. ");
r.append(files[i]->largest.DebugString(hex));
- r.append("]\n");
+ r.append("]");
+ if (print_stats) {
+ r.append("(");
+ r.append(ToString(
+ files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
+ r.append(")");
+ }
+ r.append("\n");
}
}
return r;
bool done;
InstrumentedCondVar cv;
ColumnFamilyData* cfd;
+ const MutableCFOptions mutable_cf_options;
const autovector<VersionEdit*>& edit_list;
explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
+ const MutableCFOptions& cf_options,
const autovector<VersionEdit*>& e)
- : done(false), cv(mu), cfd(_cfd), edit_list(e) {}
+ : done(false),
+ cv(mu),
+ cfd(_cfd),
+ mutable_cf_options(cf_options),
+ edit_list(e) {}
};
VersionSet::VersionSet(const std::string& dbname,
- const ImmutableDBOptions* db_options,
+ const ImmutableDBOptions* _db_options,
const EnvOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
: column_family_set_(
- new ColumnFamilySet(dbname, db_options, storage_options, table_cache,
+ new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller)),
- env_(db_options->env),
+ env_(_db_options->env),
dbname_(dbname),
- db_options_(db_options),
+ db_options_(_db_options),
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
+ options_file_number_(0),
pending_manifest_file_number_(0),
last_sequence_(0),
+ last_allocated_sequence_(0),
+ last_published_sequence_(0),
prev_log_number_(0),
current_version_number_(0),
manifest_file_size_(0),
- env_options_(storage_options),
- env_options_compactions_(
- env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {}
+ env_options_(storage_options) {}
void CloseTables(void* ptr, size_t) {
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
- column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables,
- false);
+ Cache* table_cache = column_family_set_->get_table_cache();
+ table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
column_family_set_.reset();
- for (auto file : obsolete_files_) {
- delete file;
+ for (auto& file : obsolete_files_) {
+ if (file.metadata->table_reader_handle) {
+ table_cache->Release(file.metadata->table_reader_handle);
+ TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
+ }
+ file.DeleteMetadata();
}
obsolete_files_.clear();
}
v->next_->prev_ = v;
}
-Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
- const MutableCFOptions& mutable_cf_options,
- const autovector<VersionEdit*>& edit_list,
- InstrumentedMutex* mu, Directory* db_directory,
- bool new_descriptor_log,
- const ColumnFamilyOptions* new_cf_options) {
- mu->AssertHeld();
- // num of edits
- auto num_edits = edit_list.size();
- if (num_edits == 0) {
- return Status::OK();
- } else if (num_edits > 1) {
-#ifndef NDEBUG
- // no group commits for column family add or drop
- for (auto& edit : edit_list) {
- assert(!edit->IsColumnFamilyManipulation());
- }
-#endif
- }
-
- // column_family_data can be nullptr only if this is column_family_add.
- // in that case, we also need to specify ColumnFamilyOptions
- if (column_family_data == nullptr) {
- assert(num_edits == 1);
- assert(edit_list[0]->is_column_family_add_);
- assert(new_cf_options != nullptr);
- }
+Status VersionSet::ProcessManifestWrites(
+ std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
+ Directory* db_directory, bool new_descriptor_log,
+ const ColumnFamilyOptions* new_cf_options) {
+ assert(!writers.empty());
+ ManifestWriter& first_writer = writers.front();
+ ManifestWriter* last_writer = &first_writer;
- // queue our request
- ManifestWriter w(mu, column_family_data, edit_list);
- manifest_writers_.push_back(&w);
- while (!w.done && &w != manifest_writers_.front()) {
- w.cv.Wait();
- }
- if (w.done) {
- return w.status;
- }
- if (column_family_data != nullptr && column_family_data->IsDropped()) {
- // if column family is dropped by the time we get here, no need to write
- // anything to the manifest
- manifest_writers_.pop_front();
- // Notify new head of write queue
- if (!manifest_writers_.empty()) {
- manifest_writers_.front()->cv.Signal();
- }
- // we steal this code to also inform about cf-drop
- return Status::ShutdownInProgress();
- }
+ assert(!manifest_writers_.empty());
+ assert(manifest_writers_.front() == &first_writer);
autovector<VersionEdit*> batch_edits;
- Version* v = nullptr;
- std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr);
-
- // process all requests in the queue
- ManifestWriter* last_writer = &w;
- assert(!manifest_writers_.empty());
- assert(manifest_writers_.front() == &w);
- if (w.edit_list.front()->IsColumnFamilyManipulation()) {
- // no group commits for column family add or drop
- LogAndApplyCFHelper(w.edit_list.front());
- batch_edits.push_back(w.edit_list.front());
+ autovector<Version*> versions;
+ autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
+ std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
+
+ if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
+ // No group commits for column family add or drop
+ LogAndApplyCFHelper(first_writer.edit_list.front());
+ batch_edits.push_back(first_writer.edit_list.front());
} else {
- v = new Version(column_family_data, this, current_version_number_++);
- builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
- auto* builder = builder_guard->version_builder();
- for (const auto& writer : manifest_writers_) {
- if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
- writer->cfd->GetID() != column_family_data->GetID()) {
+ auto it = manifest_writers_.cbegin();
+ while (it != manifest_writers_.cend()) {
+ if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
// no group commits for column family add or drop
- // also, group commits across column families are not supported
break;
}
- last_writer = writer;
- for (const auto& edit : writer->edit_list) {
- LogAndApplyHelper(column_family_data, builder, v, edit, mu);
- batch_edits.push_back(edit);
+ last_writer = *(it++);
+ assert(last_writer != nullptr);
+ assert(last_writer->cfd != nullptr);
+ if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) {
+ continue;
+ }
+ // We do a linear search on versions because versions is small.
+ // TODO(yanqin) maybe consider unordered_map
+ Version* version = nullptr;
+ VersionBuilder* builder = nullptr;
+ for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
+ uint32_t cf_id = last_writer->cfd->GetID();
+ if (versions[i]->cfd()->GetID() == cf_id) {
+ version = versions[i];
+ assert(!builder_guards.empty() &&
+ builder_guards.size() == versions.size());
+ builder = builder_guards[i]->version_builder();
+ TEST_SYNC_POINT_CALLBACK(
+ "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
+ break;
+ }
+ }
+ if (version == nullptr) {
+ version = new Version(last_writer->cfd, this, env_options_,
+ last_writer->mutable_cf_options,
+ current_version_number_++);
+ versions.push_back(version);
+ mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
+ builder_guards.emplace_back(
+ new BaseReferencedVersionBuilder(last_writer->cfd));
+ builder = builder_guards.back()->version_builder();
+ }
+ assert(builder != nullptr); // make checker happy
+ for (const auto& e : last_writer->edit_list) {
+ LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
+ batch_edits.push_back(e);
}
}
- builder->SaveTo(v->storage_info());
+ for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+ assert(!builder_guards.empty() &&
+ builder_guards.size() == versions.size());
+ auto* builder = builder_guards[i]->version_builder();
+ builder->SaveTo(versions[i]->storage_info());
+ }
}
- // Initialize new descriptor log file if necessary by creating
- // a temporary file that contains a snapshot of the current version.
uint64_t new_manifest_file_size = 0;
Status s;
}
if (new_descriptor_log) {
- // if we're writing out new snapshot make sure to persist max column family
+ // if we are writing out new snapshot make sure to persist max column
+ // family.
if (column_family_set_->GetMaxColumnFamily() > 0) {
- w.edit_list.front()->SetMaxColumnFamily(
+ first_writer.edit_list.front()->SetMaxColumnFamily(
column_family_set_->GetMaxColumnFamily());
}
}
- // Unlock during expensive operations. New writes cannot get here
- // because &w is ensuring that all new writes get queued.
{
-
+ EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
- if (!w.edit_list.front()->IsColumnFamilyManipulation() &&
- db_options_->max_open_files == -1) {
- // unlimited table cache. Pre-load table handle now.
- // Need to do it out of the mutex.
- builder_guard->version_builder()->LoadTableHandlers(
- column_family_data->internal_stats(),
- column_family_data->ioptions()->optimize_filters_for_hits,
- true /* prefetch_index_and_filter_in_cache */);
+ if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() &&
+ column_family_set_->get_table_cache()->GetCapacity() ==
+ TableCache::kInfiniteCapacity) {
+ for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+ assert(!builder_guards.empty() &&
+ builder_guards.size() == versions.size());
+ assert(!mutable_cf_options_ptrs.empty() &&
+ builder_guards.size() == versions.size());
+ ColumnFamilyData* cfd = versions[i]->cfd_;
+ builder_guards[i]->version_builder()->LoadTableHandlers(
+ cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
+ true /* prefetch_index_and_filter_in_cache */,
+ mutable_cf_options_ptrs[i]->prefix_extractor.get());
+ }
}
// This is fine because everything inside of this block is serialized --
// only one thread can be here at the same time
if (new_descriptor_log) {
- // create manifest file
+ // create new manifest file
ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
pending_manifest_file_number_);
+ std::string descriptor_fname =
+ DescriptorFileName(dbname_, pending_manifest_file_number_);
unique_ptr<WritableFile> descriptor_file;
- EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
- s = NewWritableFile(
- env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
- &descriptor_file, opt_env_opts);
+ s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
+ opt_env_opts);
if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
- unique_ptr<WritableFileWriter> file_writer(
- new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
+ unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(descriptor_file), descriptor_fname, opt_env_opts));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());
}
}
- if (!w.edit_list.front()->IsColumnFamilyManipulation()) {
- // This is cpu-heavy operations, which should be called outside mutex.
- v->PrepareApply(mutable_cf_options, true);
+ if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
+ for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+ versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
+ }
}
- // Write new record to MANIFEST log
+ // Write new records to MANIFEST log
if (s.ok()) {
for (auto& e : batch_edits) {
std::string record;
if (!e->EncodeTo(&record)) {
- s = Status::Corruption(
- "Unable to Encode VersionEdit:" + e->DebugString(true));
+ s = Status::Corruption("Unable to encode VersionEdit:" +
+ e->DebugString(true));
break;
}
TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
s = SyncManifest(env_, db_options_, descriptor_log_->file());
}
if (!s.ok()) {
- ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write: %s\n",
+ ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
}
}
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
}
- if (w.edit_list.front()->is_column_family_drop_) {
+ if (first_writer.edit_list.front()->is_column_family_drop_) {
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
mu->Lock();
}
- // Append the old mainfest file to the obsolete_manifests_ list to be deleted
+  // Append the old manifest file to the obsolete_manifests_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
obsolete_manifests_.emplace_back(
DescriptorFileName("", manifest_file_number_));
}
- // Install the new version
+ // Install the new versions
if (s.ok()) {
- if (w.edit_list.front()->is_column_family_add_) {
- // no group commit on column family add
+ if (first_writer.edit_list.front()->is_column_family_add_) {
assert(batch_edits.size() == 1);
assert(new_cf_options != nullptr);
- CreateColumnFamily(*new_cf_options, w.edit_list.front());
- } else if (w.edit_list.front()->is_column_family_drop_) {
+ CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
+ } else if (first_writer.edit_list.front()->is_column_family_drop_) {
assert(batch_edits.size() == 1);
- column_family_data->SetDropped();
- if (column_family_data->Unref()) {
- delete column_family_data;
+ first_writer.cfd->SetDropped();
+ if (first_writer.cfd->Unref()) {
+ delete first_writer.cfd;
}
} else {
- uint64_t max_log_number_in_batch = 0;
+ // Each version in versions corresponds to a column family.
+ // For each column family, update its log number indicating that logs
+ // with number smaller than this should be ignored.
+ for (const auto version : versions) {
+ uint64_t max_log_number_in_batch = 0;
+ uint32_t cf_id = version->cfd_->GetID();
+ for (const auto& e : batch_edits) {
+ if (e->has_log_number_ && e->column_family_ == cf_id) {
+ max_log_number_in_batch =
+ std::max(max_log_number_in_batch, e->log_number_);
+ }
+ }
+ if (max_log_number_in_batch != 0) {
+ assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
+ version->cfd_->SetLogNumber(max_log_number_in_batch);
+ }
+ }
+
+ uint64_t last_min_log_number_to_keep = 0;
for (auto& e : batch_edits) {
- if (e->has_log_number_) {
- max_log_number_in_batch =
- std::max(max_log_number_in_batch, e->log_number_);
+ if (e->has_min_log_number_to_keep_) {
+ last_min_log_number_to_keep =
+ std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
}
}
- if (max_log_number_in_batch != 0) {
- assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
- column_family_data->SetLogNumber(max_log_number_in_batch);
+
+ if (last_min_log_number_to_keep != 0) {
+ // Should only be set in 2PC mode.
+ MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
}
- AppendVersion(column_family_data, v);
- }
+ for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
+ ColumnFamilyData* cfd = versions[i]->cfd_;
+ AppendVersion(cfd, versions[i]);
+ }
+ }
manifest_file_number_ = pending_manifest_file_number_;
manifest_file_size_ = new_manifest_file_size;
- prev_log_number_ = w.edit_list.front()->prev_log_number_;
+ prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
} else {
std::string version_edits;
for (auto& e : batch_edits) {
- version_edits = version_edits + "\n" + e->DebugString(true);
+ version_edits += ("\n" + e->DebugString(true));
+ }
+ ROCKS_LOG_ERROR(db_options_->info_log,
+ "Error in committing version edit to MANIFEST: %s",
+ version_edits.c_str());
+ for (auto v : versions) {
+ delete v;
}
- ROCKS_LOG_ERROR(
- db_options_->info_log,
- "[%s] Error in committing version edit to MANIFEST: %s",
- column_family_data ? column_family_data->GetName().c_str() : "<null>",
- version_edits.c_str());
- delete v;
if (new_descriptor_log) {
- ROCKS_LOG_INFO(db_options_->info_log, "Deleting manifest %" PRIu64
- " current manifest %" PRIu64 "\n",
+ ROCKS_LOG_INFO(db_options_->info_log,
+ "Deleting manifest %" PRIu64 " current manifest %" PRIu64
+ "\n",
manifest_file_number_, pending_manifest_file_number_);
descriptor_log_.reset();
env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_));
}
}
+
pending_manifest_file_number_ = 0;
// wake up all the waiting writers
while (true) {
ManifestWriter* ready = manifest_writers_.front();
manifest_writers_.pop_front();
- if (ready != &w) {
- ready->status = s;
- ready->done = true;
+ bool need_signal = true;
+ for (const auto& w : writers) {
+ if (&w == ready) {
+ need_signal = false;
+ break;
+ }
+ }
+ ready->status = s;
+ ready->done = true;
+ if (need_signal) {
ready->cv.Signal();
}
- if (ready == last_writer) break;
+ if (ready == last_writer) {
+ break;
+ }
}
- // Notify new head of write queue
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
return s;
}
+// 'datas' is grammatically incorrect. We still use this notation to indicate
+// that this variable represents a collection of column_family_data.
+// Queues one ManifestWriter per column family, waits until this batch reaches
+// the head of the manifest writer queue (or has been group-committed by
+// another thread), and then commits all edit lists to the MANIFEST in a
+// single group via ProcessManifestWrites.
+Status VersionSet::LogAndApply(
+    const std::vector<ColumnFamilyData*>& column_family_datas,
+    const std::vector<MutableCFOptions>& mutable_cf_options_list,
+    const std::vector<autovector<VersionEdit*>>& edit_lists,
+    InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
+    const ColumnFamilyOptions* new_cf_options) {
+  mu->AssertHeld();
+  // Count the total number of edits across all column families; nothing to
+  // do when the batch is empty.
+  int num_edits = 0;
+  for (const auto& elist : edit_lists) {
+    num_edits += static_cast<int>(elist.size());
+  }
+  if (num_edits == 0) {
+    return Status::OK();
+  } else if (num_edits > 1) {
+#ifndef NDEBUG
+    // Column-family manipulations (add/drop) must be committed alone; they
+    // may not be grouped with any other edit.
+    for (const auto& edit_list : edit_lists) {
+      for (const auto& edit : edit_list) {
+        assert(!edit->IsColumnFamilyManipulation());
+      }
+    }
+#endif /* ! NDEBUG */
+  }
+
+  int num_cfds = static_cast<int>(column_family_datas.size());
+  // A single nullptr cfd denotes a column family add: the family does not
+  // exist yet, so its options must arrive via new_cf_options.
+  if (num_cfds == 1 && column_family_datas[0] == nullptr) {
+    assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
+    assert(edit_lists[0][0]->is_column_family_add_);
+    assert(new_cf_options != nullptr);
+  }
+  // std::deque keeps element addresses stable while pointers into it sit on
+  // manifest_writers_.
+  std::deque<ManifestWriter> writers;
+  if (num_cfds > 0) {
+    assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
+    assert(static_cast<size_t>(num_cfds) == edit_lists.size());
+  }
+  for (int i = 0; i < num_cfds; ++i) {
+    writers.emplace_back(mu, column_family_datas[i], mutable_cf_options_list[i],
+                         edit_lists[i]);
+    manifest_writers_.push_back(&writers[i]);
+  }
+  assert(!writers.empty());
+  ManifestWriter& first_writer = writers.front();
+  // Block until our batch is at the head of the queue, unless another thread
+  // already committed it on our behalf (first_writer.done).
+  while (!first_writer.done && &first_writer != manifest_writers_.front()) {
+    first_writer.cv.Wait();
+  }
+  if (first_writer.done) {
+    // All non-CF-manipulation operations can be grouped together and committed
+    // to MANIFEST. They should all have finished. The status code is stored in
+    // the first manifest writer.
+#ifndef NDEBUG
+    for (const auto& writer : writers) {
+      assert(writer.done);
+    }
+#endif /* !NDEBUG */
+    return first_writer.status;
+  }
+
+  int num_undropped_cfds = 0;
+  for (auto cfd : column_family_datas) {
+    // if cfd == nullptr, it is a column family add.
+    if (cfd == nullptr || !cfd->IsDropped()) {
+      ++num_undropped_cfds;
+    }
+  }
+  if (0 == num_undropped_cfds) {
+    // Every column family in the batch has been dropped; dequeue our writers
+    // without writing anything to the MANIFEST.
+    // TODO (yanqin) maybe use a different status code to denote column family
+    // drop other than OK and ShutdownInProgress
+    for (int i = 0; i != num_cfds; ++i) {
+      manifest_writers_.pop_front();
+    }
+    // Notify new head of manifest write queue.
+    if (!manifest_writers_.empty()) {
+      manifest_writers_.front()->cv.Signal();
+    }
+    return Status::OK();
+  }
+
+  return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
+                               new_cf_options);
+}
+
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
assert(edit->IsColumnFamilyManipulation());
edit->SetNextFile(next_file_number_.load());
- edit->SetLastSequence(last_sequence_);
+  // The log might have data that is not visible to the memtable and hence has
+  // not updated last_sequence_ yet. It is also possible that the log is
+  // expecting some new data that is not written yet. Since LastSequence is an
+  // upper bound on the sequence, it is ok to record
+  // last_allocated_sequence_ as the last sequence.
+ edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
+ : last_sequence_);
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID
}
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
- VersionBuilder* builder, Version* v,
+ VersionBuilder* builder, Version* /*v*/,
VersionEdit* edit, InstrumentedMutex* mu) {
+#ifdef NDEBUG
+ (void)cfd;
+#endif
mu->AssertHeld();
assert(!edit->IsColumnFamilyManipulation());
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_.load());
- edit->SetLastSequence(last_sequence_);
+  // The log might have data that is not visible to the memtable and hence has
+  // not updated last_sequence_ yet. It is also possible that the log is
+  // expecting some new data that is not written yet. Since LastSequence is an
+  // upper bound on the sequence, it is ok to record
+  // last_allocated_sequence_ as the last sequence.
+ edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
+ : last_sequence_);
builder->Apply(edit);
}
+// Applies one VersionEdit read from the MANIFEST during recovery.
+//
+// Column family add/drop records create or destroy the corresponding
+// ColumnFamilyData and its version builder; any other record is forwarded to
+// the builder of the column family it belongs to. Metadata carried by the
+// edit (log number, prev log number, next file number, max column family,
+// min log number to keep, last sequence) is folded into the caller-supplied
+// out-parameters so the caller can finalize them after the whole MANIFEST
+// has been replayed. Returns Corruption/InvalidArgument on malformed
+// records, Status::OK() otherwise.
+Status VersionSet::ApplyOneVersionEdit(
+    VersionEdit& edit,
+    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
+    std::unordered_map<int, std::string>& column_families_not_found,
+    std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
+    bool* have_log_number, uint64_t* /* log_number */,
+    bool* have_prev_log_number, uint64_t* previous_log_number,
+    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
+    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
+    uint32_t* max_column_family) {
+  // Not found means that user didn't supply that column
+  // family option AND we encountered column family add
+  // record. Once we encounter column family drop record,
+  // we will delete the column family from
+  // column_families_not_found.
+  bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
+                          column_families_not_found.end());
+  // in builders means that user supplied that column family
+  // option AND that we encountered column family add record
+  bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
+
+  // they can't both be true
+  assert(!(cf_in_not_found && cf_in_builders));
+
+  ColumnFamilyData* cfd = nullptr;
+
+  if (edit.is_column_family_add_) {
+    if (cf_in_builders || cf_in_not_found) {
+      return Status::Corruption(
+          "Manifest adding the same column family twice: " +
+          edit.column_family_name_);
+    }
+    auto cf_options = name_to_options.find(edit.column_family_name_);
+    if (cf_options == name_to_options.end()) {
+      column_families_not_found.insert(
+          {edit.column_family_, edit.column_family_name_});
+    } else {
+      cfd = CreateColumnFamily(cf_options->second, &edit);
+      cfd->set_initialized();
+      builders.insert(
+          {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
+    }
+  } else if (edit.is_column_family_drop_) {
+    if (cf_in_builders) {
+      auto builder = builders.find(edit.column_family_);
+      assert(builder != builders.end());
+      delete builder->second;
+      builders.erase(builder);
+      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+      assert(cfd != nullptr);
+      if (cfd->Unref()) {
+        delete cfd;
+        cfd = nullptr;
+      } else {
+        // who else can have reference to cfd!?
+        assert(false);
+      }
+    } else if (cf_in_not_found) {
+      column_families_not_found.erase(edit.column_family_);
+    } else {
+      return Status::Corruption(
+          "Manifest - dropping non-existing column family");
+    }
+  } else if (!cf_in_not_found) {
+    if (!cf_in_builders) {
+      return Status::Corruption(
+          "Manifest record referencing unknown column family");
+    }
+
+    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+    // this should never happen since cf_in_builders is true
+    assert(cfd != nullptr);
+
+    // if it is not column family add or column family drop,
+    // then it's a file add/delete, which should be forwarded
+    // to builder
+    auto builder = builders.find(edit.column_family_);
+    assert(builder != builders.end());
+    builder->second->version_builder()->Apply(&edit);
+  }
+
+  if (cfd != nullptr) {
+    if (edit.has_log_number_) {
+      if (cfd->GetLogNumber() > edit.log_number_) {
+        ROCKS_LOG_WARN(
+            db_options_->info_log,
+            "MANIFEST corruption detected, but ignored - Log numbers in "
+            "records NOT monotonically increasing");
+      } else {
+        cfd->SetLogNumber(edit.log_number_);
+        *have_log_number = true;
+      }
+    }
+    if (edit.has_comparator_ &&
+        edit.comparator_ != cfd->user_comparator()->Name()) {
+      return Status::InvalidArgument(
+          cfd->user_comparator()->Name(),
+          "does not match existing comparator " + edit.comparator_);
+    }
+  }
+
+  if (edit.has_prev_log_number_) {
+    *previous_log_number = edit.prev_log_number_;
+    *have_prev_log_number = true;
+  }
+
+  if (edit.has_next_file_number_) {
+    *next_file = edit.next_file_number_;
+    *have_next_file = true;
+  }
+
+  if (edit.has_max_column_family_) {
+    *max_column_family = edit.max_column_family_;
+  }
+
+  if (edit.has_min_log_number_to_keep_) {
+    // Advance monotonically; MANIFEST records may carry older values.
+    *min_log_number_to_keep =
+        std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
+  }
+
+  if (edit.has_last_sequence_) {
+    *last_sequence = edit.last_sequence_;
+    *have_last_sequence = true;
+  }
+  return Status::OK();
+}
+
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only) {
{
unique_ptr<SequentialFile> manifest_file;
s = env_->NewSequentialFile(manifest_filename, &manifest_file,
- env_options_);
+ env_->OptimizeForManifestRead(env_options_));
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
- new SequentialFileReader(std::move(manifest_file)));
+ new SequentialFileReader(std::move(manifest_file), manifest_filename));
}
uint64_t current_manifest_file_size;
s = env_->GetFileSize(manifest_filename, ¤t_manifest_file_size);
uint64_t log_number = 0;
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
+ uint64_t min_log_number_to_keep = 0;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
// add default column family
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
+ // In recovery, nobody else can access it, so it's fine to set it to be
+ // initialized earlier.
+ default_cfd->set_initialized();
builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
{
VersionSet::LogReporter reporter;
reporter.status = &s;
- log::Reader reader(NULL, std::move(manifest_file_reader), &reporter,
- true /*checksum*/, 0 /*initial_offset*/, 0);
+ log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
+ true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
+ std::vector<VersionEdit> replay_buffer;
+ size_t num_entries_decoded = 0;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
break;
}
- // Not found means that user didn't supply that column
- // family option AND we encountered column family add
- // record. Once we encounter column family drop record,
- // we will delete the column family from
- // column_families_not_found.
- bool cf_in_not_found =
- column_families_not_found.find(edit.column_family_) !=
- column_families_not_found.end();
- // in builders means that user supplied that column family
- // option AND that we encountered column family add record
- bool cf_in_builders =
- builders.find(edit.column_family_) != builders.end();
-
- // they can't both be true
- assert(!(cf_in_not_found && cf_in_builders));
-
- ColumnFamilyData* cfd = nullptr;
-
- if (edit.is_column_family_add_) {
- if (cf_in_builders || cf_in_not_found) {
- s = Status::Corruption(
- "Manifest adding the same column family twice");
- break;
- }
- auto cf_options = cf_name_to_options.find(edit.column_family_name_);
- if (cf_options == cf_name_to_options.end()) {
- column_families_not_found.insert(
- {edit.column_family_, edit.column_family_name_});
- } else {
- cfd = CreateColumnFamily(cf_options->second, &edit);
- builders.insert(
- {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
- }
- } else if (edit.is_column_family_drop_) {
- if (cf_in_builders) {
- auto builder = builders.find(edit.column_family_);
- assert(builder != builders.end());
- delete builder->second;
- builders.erase(builder);
- cfd = column_family_set_->GetColumnFamily(edit.column_family_);
- if (cfd->Unref()) {
- delete cfd;
- cfd = nullptr;
- } else {
- // who else can have reference to cfd!?
- assert(false);
- }
- } else if (cf_in_not_found) {
- column_families_not_found.erase(edit.column_family_);
- } else {
- s = Status::Corruption(
- "Manifest - dropping non-existing column family");
- break;
- }
- } else if (!cf_in_not_found) {
- if (!cf_in_builders) {
- s = Status::Corruption(
- "Manifest record referencing unknown column family");
- break;
+ if (edit.is_in_atomic_group_) {
+ if (replay_buffer.empty()) {
+ replay_buffer.resize(edit.remaining_entries_ + 1);
}
-
- cfd = column_family_set_->GetColumnFamily(edit.column_family_);
- // this should never happen since cf_in_builders is true
- assert(cfd != nullptr);
- if (edit.max_level_ >= cfd->current()->storage_info()->num_levels()) {
- s = Status::InvalidArgument(
- "db has more levels than options.num_levels");
- break;
+ ++num_entries_decoded;
+ if (num_entries_decoded + edit.remaining_entries_ !=
+ static_cast<uint32_t>(replay_buffer.size())) {
+ return Status::Corruption("corrupted atomic group");
}
-
- // if it is not column family add or column family drop,
- // then it's a file add/delete, which should be forwarded
- // to builder
- auto builder = builders.find(edit.column_family_);
- assert(builder != builders.end());
- builder->second->version_builder()->Apply(&edit);
- }
-
- if (cfd != nullptr) {
- if (edit.has_log_number_) {
- if (cfd->GetLogNumber() > edit.log_number_) {
- ROCKS_LOG_WARN(
- db_options_->info_log,
- "MANIFEST corruption detected, but ignored - Log numbers in "
- "records NOT monotonically increasing");
- } else {
- cfd->SetLogNumber(edit.log_number_);
- have_log_number = true;
+ replay_buffer[num_entries_decoded - 1] = std::move(edit);
+ if (num_entries_decoded == replay_buffer.size()) {
+ for (auto& e : replay_buffer) {
+ s = ApplyOneVersionEdit(
+ e, cf_name_to_options, column_families_not_found, builders,
+ &have_log_number, &log_number, &have_prev_log_number,
+ &previous_log_number, &have_next_file, &next_file,
+ &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+ &max_column_family);
+ if (!s.ok()) {
+ break;
+ }
}
+ replay_buffer.clear();
+ num_entries_decoded = 0;
}
- if (edit.has_comparator_ &&
- edit.comparator_ != cfd->user_comparator()->Name()) {
- s = Status::InvalidArgument(
- cfd->user_comparator()->Name(),
- "does not match existing comparator " + edit.comparator_);
- break;
+ } else {
+ if (!replay_buffer.empty()) {
+ return Status::Corruption("corrupted atomic group");
}
+ s = ApplyOneVersionEdit(
+ edit, cf_name_to_options, column_families_not_found, builders,
+ &have_log_number, &log_number, &have_prev_log_number,
+ &previous_log_number, &have_next_file, &next_file,
+ &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+ &max_column_family);
}
-
- if (edit.has_prev_log_number_) {
- previous_log_number = edit.prev_log_number_;
- have_prev_log_number = true;
- }
-
- if (edit.has_next_file_number_) {
- next_file = edit.next_file_number_;
- have_next_file = true;
- }
-
- if (edit.has_max_column_family_) {
- max_column_family = edit.max_column_family_;
- }
-
- if (edit.has_last_sequence_) {
- last_sequence = edit.last_sequence_;
- have_last_sequence = true;
+ if (!s.ok()) {
+ break;
}
}
}
column_family_set_->UpdateMaxColumnFamily(max_column_family);
- MarkFileNumberUsedDuringRecovery(previous_log_number);
- MarkFileNumberUsedDuringRecovery(log_number);
+ // When reading DB generated using old release, min_log_number_to_keep=0.
+ // All log files will be scanned for potential prepare entries.
+ MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
+ MarkFileNumberUsed(previous_log_number);
+ MarkFileNumberUsed(log_number);
}
// there were some column families in the MANIFEST that weren't specified
list_of_not_found);
}
+ if (s.ok()) {
+ for (auto cfd : *column_family_set_) {
+ assert(builders.count(cfd->GetID()) > 0);
+ auto* builder = builders[cfd->GetID()]->version_builder();
+ if (!builder->CheckConsistencyForNumLevels()) {
+ s = Status::InvalidArgument(
+ "db has more levels than options.num_levels");
+ break;
+ }
+ }
+ }
+
if (s.ok()) {
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
+ if (read_only) {
+ cfd->table_cache()->SetTablesAreImmortal();
+ }
+ assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder();
- if (db_options_->max_open_files == -1) {
+ if (GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
+ TableCache::kInfiniteCapacity) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads,
- false /* prefetch_index_and_filter_in_cache */);
+ false /* prefetch_index_and_filter_in_cache */,
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
}
- Version* v = new Version(cfd, this, current_version_number_++);
+ Version* v = new Version(cfd, this, env_options_,
+ *cfd->GetLatestMutableCFOptions(),
+ current_version_number_++);
builder->SaveTo(v->storage_info());
// Install recovered version
manifest_file_size_ = current_manifest_file_size;
next_file_number_.store(next_file + 1);
+ last_allocated_sequence_ = last_sequence;
+ last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
"manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu,"
- "max_column_family is %u\n",
+ "max_column_family is %u,"
+ "min_log_number_to_keep is %lu\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_,
- column_family_set_->GetMaxColumnFamily());
+ column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
if (!s.ok()) {
return s;
}
- file_reader.reset(new SequentialFileReader(std::move(file)));
+ file_reader.reset(new SequentialFileReader(std::move(file), dscname));
}
std::map<uint32_t, std::string> column_family_names;
column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter;
reporter.status = &s;
- log::Reader reader(NULL, std::move(file_reader), &reporter, true /*checksum*/,
- 0 /*initial_offset*/, 0);
+ log::Reader reader(nullptr, std::move(file_reader), &reporter,
+ true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
Status s;
{
unique_ptr<SequentialFile> file;
- s = options.env->NewSequentialFile(dscname, &file, env_options_);
+ s = options.env->NewSequentialFile(
+ dscname, &file, env_->OptimizeForManifestRead(env_options_));
if (!s.ok()) {
return s;
}
- file_reader.reset(new SequentialFileReader(std::move(file)));
+ file_reader.reset(new SequentialFileReader(std::move(file), dscname));
}
bool have_prev_log_number = false;
{
VersionSet::LogReporter reporter;
reporter.status = &s;
- log::Reader reader(NULL, std::move(file_reader), &reporter,
- true /*checksum*/, 0 /*initial_offset*/, 0);
+ log::Reader reader(nullptr, std::move(file_reader), &reporter,
+ true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
break;
}
cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
+ cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
} else if (edit.is_column_family_drop_) {
cfd->SetLogNumber(edit.log_number_);
}
+
if (edit.has_prev_log_number_) {
previous_log_number = edit.prev_log_number_;
have_prev_log_number = true;
if (edit.has_max_column_family_) {
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
}
+
+ if (edit.has_min_log_number_to_keep_) {
+ MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
+ }
}
}
file_reader.reset();
assert(builders_iter != builders.end());
auto builder = builders_iter->second->version_builder();
- Version* v = new Version(cfd, this, current_version_number_++);
+ Version* v = new Version(cfd, this, env_options_,
+ *cfd->GetLatestMutableCFOptions(),
+ current_version_number_++);
builder->SaveTo(v->storage_info());
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
}
next_file_number_.store(next_file + 1);
+ last_allocated_sequence_ = last_sequence;
+ last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
printf(
"next_file_number %lu last_sequence "
- "%lu prev_log_number %lu max_column_family %u\n",
+ "%lu prev_log_number %lu max_column_family %u min_log_number_to_keep "
+ "%" PRIu64 "\n",
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
(unsigned long)previous_log_number,
- column_family_set_->GetMaxColumnFamily());
+ column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
}
return s;
}
#endif // ROCKSDB_LITE
-void VersionSet::MarkFileNumberUsedDuringRecovery(uint64_t number) {
-  // only called during recovery which is single threaded, so this works because
-  // there can't be concurrent calls
+void VersionSet::MarkFileNumberUsed(uint64_t number) {
+  // only called during recovery and repair which are single threaded, so this
+  // works because there can't be concurrent calls
+  // Bump next_file_number_ past 'number' so that file number is never reused.
  if (next_file_number_.load(std::memory_order_relaxed) <= number) {
    next_file_number_.store(number + 1, std::memory_order_relaxed);
  }
}
+// Called only either from ::LogAndApply which is protected by mutex or during
+// recovery which is single-threaded.
+void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
+  // Advance the 2PC min-log-to-keep marker monotonically; relaxed ordering is
+  // sufficient because callers are serialized (see comment above).
+  if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
+    min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
+  }
+}
+
Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
if (cfd->IsDropped()) {
continue;
}
+ assert(cfd->initialized());
{
// Store column family info
VersionEdit edit;
cfd->current()->storage_info()->LevelFiles(level)) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
- f->smallest_seqno, f->largest_seqno,
+ f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}
}
// approximate offset of "key" within the table.
TableReader* table_reader_ptr;
InternalIterator* iter = v->cfd_->table_cache()->NewIterator(
- ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd,
- nullptr /* range_del_agg */, &table_reader_ptr);
+ ReadOptions(), v->env_options_, v->cfd_->internal_comparator(),
+ *f.file_metadata, nullptr /* range_del_agg */,
+ v->GetMutableCFOptions().prefix_extractor.get(), &table_reader_ptr);
if (table_reader_ptr != nullptr) {
result = table_reader_ptr->ApproximateOffsetOf(key);
}
// pre-calculate space requirement
int64_t total_files = 0;
for (auto cfd : *column_family_set_) {
+ if (!cfd->initialized()) {
+ continue;
+ }
Version* dummy_versions = cfd->dummy_versions();
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
for (auto cfd : *column_family_set_) {
+ if (!cfd->initialized()) {
+ continue;
+ }
auto* current = cfd->current();
bool found_current = false;
Version* dummy_versions = cfd->dummy_versions();
}
InternalIterator* VersionSet::MakeInputIterator(
- const Compaction* c, RangeDelAggregator* range_del_agg) {
+ const Compaction* c, RangeDelAggregator* range_del_agg,
+ const EnvOptions& env_options_compactions) {
auto cfd = c->column_family_data();
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
- if (c->ShouldFormSubcompactions()) {
- read_options.total_order_seek = true;
- }
+ // Compaction iterators shouldn't be confined to a single prefix.
+ // Compactions use Seek() for
+ // (a) concurrent compactions,
+ // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
+ read_options.total_order_seek = true;
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.
const LevelFilesBrief* flevel = c->input_levels(which);
for (size_t i = 0; i < flevel->num_files; i++) {
list[num++] = cfd->table_cache()->NewIterator(
- read_options, env_options_compactions_,
- cfd->internal_comparator(), flevel->files[i].fd, range_del_agg,
+ read_options, env_options_compactions, cfd->internal_comparator(),
+ *flevel->files[i].file_metadata, range_del_agg,
+ c->mutable_cf_options()->prefix_extractor.get(),
nullptr /* table_reader_ptr */,
nullptr /* no per level latency histogram */,
true /* for_compaction */, nullptr /* arena */,
- false /* skip_filters */, (int)which /* level */);
+ false /* skip_filters */, static_cast<int>(which) /* level */);
}
} else {
// Create concatenating iterator for the files from this level
- list[num++] = NewTwoLevelIterator(
- new LevelFileIteratorState(
- cfd->table_cache(), read_options, env_options_compactions_,
- cfd->internal_comparator(),
- nullptr /* no per level latency histogram */,
- true /* for_compaction */, false /* prefix enabled */,
- false /* skip_filters */, (int)which /* level */,
- range_del_agg),
- new LevelFileNumIterator(cfd->internal_comparator(),
- c->input_levels(which)));
+ list[num++] = new LevelIterator(
+ cfd->table_cache(), read_options, env_options_compactions,
+ cfd->internal_comparator(), c->input_levels(which),
+ c->mutable_cf_options()->prefix_extractor.get(),
+ false /* should_sample */,
+ nullptr /* no per level latency histogram */,
+ true /* for_compaction */, false /* skip_filters */,
+ static_cast<int>(which) /* level */, range_del_agg);
}
}
}
}
}
}
+#else
+ (void)c;
#endif
return true; // everything good
}
FileMetaData** meta,
ColumnFamilyData** cfd) {
for (auto cfd_iter : *column_family_set_) {
+ if (!cfd_iter->initialized()) {
+ continue;
+ }
Version* version = cfd_iter->current();
const auto* vstorage = version->storage_info();
for (int level = 0; level < vstorage->num_levels(); level++) {
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : *column_family_set_) {
- if (cfd->IsDropped()) {
+ if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
for (int level = 0; level < cfd->NumberLevels(); level++) {
LiveFileMetaData filemetadata;
filemetadata.column_family_name = cfd->GetName();
uint32_t path_id = file->fd.GetPathId();
- if (path_id < db_options_->db_paths.size()) {
- filemetadata.db_path = db_options_->db_paths[path_id].path;
+ if (path_id < cfd->ioptions()->cf_paths.size()) {
+ filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
} else {
- assert(!db_options_->db_paths.empty());
- filemetadata.db_path = db_options_->db_paths.back().path;
+ assert(!cfd->ioptions()->cf_paths.empty());
+ filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
}
filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
filemetadata.level = level;
- filemetadata.size = file->fd.GetFileSize();
+ filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.largestkey = file->largest.user_key().ToString();
- filemetadata.smallest_seqno = file->smallest_seqno;
- filemetadata.largest_seqno = file->largest_seqno;
+ filemetadata.smallest_seqno = file->fd.smallest_seqno;
+ filemetadata.largest_seqno = file->fd.largest_seqno;
metadata->push_back(filemetadata);
}
}
}
}
-void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
+void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output) {
assert(manifest_filenames->empty());
obsolete_manifests_.swap(*manifest_filenames);
- std::vector<FileMetaData*> pending_files;
- for (auto f : obsolete_files_) {
- if (f->fd.GetNumber() < min_pending_output) {
- files->push_back(f);
+ std::vector<ObsoleteFileInfo> pending_files;
+ for (auto& f : obsolete_files_) {
+ if (f.metadata->fd.GetNumber() < min_pending_output) {
+ files->push_back(std::move(f));
} else {
- pending_files.push_back(f);
+ pending_files.push_back(std::move(f));
}
}
obsolete_files_.swap(pending_files);
const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
assert(edit->is_column_family_add_);
- Version* dummy_versions = new Version(nullptr, this);
+ MutableCFOptions dummy_cf_options;
+ Version* dummy_versions =
+ new Version(nullptr, this, env_options_, dummy_cf_options);
// Ref() dummy version once so that later we can call Unref() to delete it
// by avoiding calling "delete" explicitly (~Version is private)
dummy_versions->Ref();
edit->column_family_name_, edit->column_family_, dummy_versions,
cf_options);
- Version* v = new Version(new_cfd, this, current_version_number_++);
+ Version* v = new Version(new_cfd, this, env_options_,
+ *new_cfd->GetLatestMutableCFOptions(),
+ current_version_number_++);
// Fill level target base information.
v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),