// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "db/forward_iterator.h"
namespace rocksdb {
// Usage:
-// LevelIterator iter;
+// ForwardLevelIterator iter;
// iter.SetFileIndex(file_index);
-// iter.Seek(target);
+// iter.Seek(target); // or iter.SeekToFirst();
// iter.Next()
-class LevelIterator : public InternalIterator {
+class ForwardLevelIterator : public InternalIterator {
public:
- LevelIterator(const ColumnFamilyData* const cfd,
- const ReadOptions& read_options,
- const std::vector<FileMetaData*>& files)
+ ForwardLevelIterator(const ColumnFamilyData* const cfd,
+ const ReadOptions& read_options,
+ const std::vector<FileMetaData*>& files,
+ const SliceTransform* prefix_extractor)
: cfd_(cfd),
read_options_(read_options),
files_(files),
valid_(false),
file_index_(std::numeric_limits<uint32_t>::max()),
file_iter_(nullptr),
- pinned_iters_mgr_(nullptr) {}
+ pinned_iters_mgr_(nullptr),
+ prefix_extractor_(prefix_extractor) {}
- ~LevelIterator() {
+ ~ForwardLevelIterator() {
// Reset current pointer
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(file_iter_);
void SetFileIndex(uint32_t file_index) {
assert(file_index < files_.size());
+ status_ = Status::OK();
if (file_index != file_index_) {
file_index_ = file_index;
Reset();
}
- valid_ = false;
}
void Reset() {
assert(file_index_ < files_.size());
cfd_->internal_comparator(), {} /* snapshots */);
file_iter_ = cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
- files_[file_index_]->fd, read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
- nullptr /* table_reader_ptr */, nullptr, false);
+ *files_[file_index_],
+ read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+ prefix_extractor_, nullptr /* table_reader_ptr */, nullptr, false);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+ valid_ = false;
if (!range_del_agg.IsEmpty()) {
status_ = Status::NotSupported(
"Range tombstones unsupported with ForwardIterator");
- valid_ = false;
}
}
void SeekToLast() override {
- status_ = Status::NotSupported("LevelIterator::SeekToLast()");
+ status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
valid_ = false;
}
void Prev() override {
- status_ = Status::NotSupported("LevelIterator::Prev()");
+ status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
valid_ = false;
}
bool Valid() const override {
return valid_;
}
void SeekToFirst() override {
- SetFileIndex(0);
+ assert(file_iter_ != nullptr);
+ if (!status_.ok()) {
+ assert(!valid_);
+ return;
+ }
file_iter_->SeekToFirst();
valid_ = file_iter_->Valid();
}
void Seek(const Slice& internal_key) override {
assert(file_iter_ != nullptr);
+
+ // This deviates from the usual convention for InternalIterator::Seek() in
+ // that it doesn't discard pre-existing error status. That's because this
+ // Seek() is only supposed to be called immediately after SetFileIndex()
+ // (which discards pre-existing error status), and SetFileIndex() may set
+ // an error status, which we shouldn't discard.
+ if (!status_.ok()) {
+ assert(!valid_);
+ return;
+ }
+
file_iter_->Seek(internal_key);
valid_ = file_iter_->Valid();
}
- void SeekForPrev(const Slice& internal_key) override {
- status_ = Status::NotSupported("LevelIterator::SeekForPrev()");
+ void SeekForPrev(const Slice& /*internal_key*/) override {
+ status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
valid_ = false;
}
void Next() override {
assert(valid_);
file_iter_->Next();
for (;;) {
- if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
- valid_ = !file_iter_->status().IsIncomplete();
+ valid_ = file_iter_->Valid();
+ if (!file_iter_->status().ok()) {
+ assert(!valid_);
+ return;
+ }
+ if (valid_) {
return;
}
if (file_index_ + 1 >= files_.size()) {
return;
}
SetFileIndex(file_index_ + 1);
+ if (!status_.ok()) {
+ assert(!valid_);
+ return;
+ }
file_iter_->SeekToFirst();
}
}
Status status() const override {
if (!status_.ok()) {
return status_;
- } else if (file_iter_ && !file_iter_->status().ok()) {
+ } else if (file_iter_) {
return file_iter_->status();
}
return Status::OK();
Status status_;
InternalIterator* file_iter_;
PinnedIteratorsManager* pinned_iters_mgr_;
+ const SliceTransform* prefix_extractor_;
};
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
: db_(db),
read_options_(read_options),
cfd_(cfd),
- prefix_extractor_(cfd->ioptions()->prefix_extractor),
+ prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
user_comparator_(cfd->user_comparator()),
immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
sv_(current_sv),
Cleanup(true);
}
-namespace {
-// Used in PinnedIteratorsManager to release pinned SuperVersion
-static void ReleaseSuperVersionFunc(void* sv) {
- delete reinterpret_cast<SuperVersion*>(sv);
-}
-} // namespace
-
-void ForwardIterator::SVCleanup() {
- if (sv_ != nullptr && sv_->Unref()) {
+void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
+ bool background_purge_on_iterator_cleanup) {
+ if (sv->Unref()) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
- db_->mutex_.Lock();
- sv_->Cleanup();
- db_->FindObsoleteFiles(&job_context, false, true);
- if (read_options_.background_purge_on_iterator_cleanup) {
- db_->ScheduleBgLogWriterClose(&job_context);
- }
- db_->mutex_.Unlock();
- if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
- pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc);
- } else {
- delete sv_;
- }
+ db->mutex_.Lock();
+ sv->Cleanup();
+ db->FindObsoleteFiles(&job_context, false, true);
+ if (background_purge_on_iterator_cleanup) {
+ db->ScheduleBgLogWriterClose(&job_context);
+ }
+ db->mutex_.Unlock();
+ delete sv;
if (job_context.HaveSomethingToDelete()) {
- db_->PurgeObsoleteFiles(
- job_context, read_options_.background_purge_on_iterator_cleanup);
+ db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
}
job_context.Clean();
}
}
+namespace {
+struct SVCleanupParams {
+ DBImpl* db;
+ SuperVersion* sv;
+ bool background_purge_on_iterator_cleanup;
+};
+}  // namespace
+
+// Used in PinnedIteratorsManager to release pinned SuperVersion
+void ForwardIterator::DeferredSVCleanup(void* arg) {
+ auto d = reinterpret_cast<SVCleanupParams*>(arg);
+ ForwardIterator::SVCleanup(
+ d->db, d->sv, d->background_purge_on_iterator_cleanup);
+ delete d;
+}
+
+void ForwardIterator::SVCleanup() {
+ if (sv_ == nullptr) {
+ return;
+ }
+ if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+ // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
+ // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
+ // The slices may point into some memtables owned by sv_, so we need to keep
+ // sv_ referenced until pinned_iters_mgr_ unpins everything.
+ auto p = new SVCleanupParams{
+ db_, sv_, read_options_.background_purge_on_iterator_cleanup};
+ pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
+ } else {
+ SVCleanup(db_, sv_, read_options_.background_purge_on_iterator_cleanup);
+ }
+}
+
void ForwardIterator::Cleanup(bool release_sv) {
if (mutable_iter_ != nullptr) {
DeleteIterator(mutable_iter_, true /* is_arena */);
}
void ForwardIterator::Seek(const Slice& internal_key) {
- if (IsOverUpperBound(internal_key)) {
- valid_ = false;
- }
if (sv_ == nullptr) {
RebuildIterators(true);
} else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
if (!l0_iters_[i]->status().ok()) {
immutable_status_ = l0_iters_[i]->status();
- } else if (l0_iters_[i]->Valid()) {
- if (!IsOverUpperBound(l0_iters_[i]->key())) {
- immutable_min_heap_.push(l0_iters_[i]);
- } else {
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(l0_iters_[i]);
- l0_iters_[i] = nullptr;
- }
+ } else if (l0_iters_[i]->Valid() &&
+ !IsOverUpperBound(l0_iters_[i]->key())) {
+ immutable_min_heap_.push(l0_iters_[i]);
+ } else {
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(l0_iters_[i]);
+ l0_iters_[i] = nullptr;
}
}
if (!level_iters_[level - 1]->status().ok()) {
immutable_status_ = level_iters_[level - 1]->status();
- } else if (level_iters_[level - 1]->Valid()) {
- if (!IsOverUpperBound(level_iters_[level - 1]->key())) {
- immutable_min_heap_.push(level_iters_[level - 1]);
- } else {
- // Nothing in this level is interesting. Remove.
- has_iter_trimmed_for_upper_bound_ = true;
- DeleteIterator(level_iters_[level - 1]);
- level_iters_[level - 1] = nullptr;
- }
+ } else if (level_iters_[level - 1]->Valid() &&
+ !IsOverUpperBound(level_iters_[level - 1]->key())) {
+ immutable_min_heap_.push(level_iters_[level - 1]);
+ } else {
+ // Nothing in this level is interesting. Remove.
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(level_iters_[level - 1]);
+ level_iters_[level - 1] = nullptr;
}
}
}
}
// Set PinnedIteratorsManager for L1+ levels iterators.
- for (LevelIterator* child_iter : level_iters_) {
+ for (ForwardLevelIterator* child_iter : level_iters_) {
if (child_iter) {
child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
}
RangeDelAggregator range_del_agg(
- InternalKeyComparator(cfd_->internal_comparator()), {} /* snapshots */);
+ cfd_->internal_comparator(), {} /* snapshots */);
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
if (!read_options_.ignore_range_deletions) {
if ((read_options_.iterate_upper_bound != nullptr) &&
cfd_->internal_comparator().user_comparator()->Compare(
l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
- has_iter_trimmed_for_upper_bound_ = true;
+ // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
+ // will never be interested in files with smallest key above
+ // iterate_upper_bound, since iterate_upper_bound can't be changed.
l0_iters_.push_back(nullptr);
continue;
}
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
- read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
- read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
+ read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
+ read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+ sv_->mutable_cf_options.prefix_extractor.get()));
}
BuildLevelIterators(vstorage);
current_ = nullptr;
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
RangeDelAggregator range_del_agg(
- InternalKeyComparator(cfd_->internal_comparator()), {} /* snapshots */);
+ cfd_->internal_comparator(), {} /* snapshots */);
if (!read_options_.ignore_range_deletions) {
std::unique_ptr<InternalIterator> range_del_iter(
svnew->mem->NewRangeTombstoneIterator(read_options_));
range_del_agg.AddTombstones(std::move(range_del_iter));
- sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
- &range_del_agg);
+ svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
+ &range_del_agg);
}
const auto* vstorage = sv_->current->storage_info();
}
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
- l0_files_new[inew]->fd,
- read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
+ *l0_files_new[inew],
+ read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+ svnew->mutable_cf_options.prefix_extractor.get()));
}
for (auto* f : l0_iters_) {
has_iter_trimmed_for_upper_bound_ = true;
}
} else {
- level_iters_.push_back(
- new LevelIterator(cfd_, read_options_, level_files));
+ level_iters_.push_back(new ForwardLevelIterator(
+ cfd_, read_options_, level_files,
+ sv_->mutable_cf_options.prefix_extractor.get()));
}
}
}
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
- l0_files[i]->fd, nullptr /* range_del_agg */);
+ *l0_files[i], nullptr /* range_del_agg */,
+ sv_->mutable_cf_options.prefix_extractor.get());
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}
current_ = mutable_iter_;
}
}
- valid_ = (current_ != nullptr);
+ valid_ = current_ != nullptr && immutable_status_.ok();
if (!status_.ok()) {
status_ = Status::OK();
}
uint32_t ForwardIterator::FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right) {
- while (left < right) {
- uint32_t mid = (left + right) / 2;
- const FileMetaData* f = files[mid];
- if (cfd_->internal_comparator().InternalKeyComparator::Compare(
- f->largest.Encode(), internal_key) < 0) {
- // Key at "mid.largest" is < "target". Therefore all
- // files at or before "mid" are uninteresting.
- left = mid + 1;
- } else {
- // Key at "mid.largest" is >= "target". Therefore all files
- // after "mid" are uninteresting.
- right = mid;
- }
- }
- return right;
+ auto cmp = [&](const FileMetaData* f, const Slice& key) -> bool {
+ return cfd_->internal_comparator().InternalKeyComparator::Compare(
+ f->largest.Encode(), key) < 0;
+ };
+ const auto &b = files.begin();
+ return static_cast<uint32_t>(std::lower_bound(b + left,
+ b + right, internal_key, cmp) - b);
}
void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {