// iter.Next()
// NOTE(review): This span is an unapplied unified-diff hunk ('-'/'+' prefixed
// lines with elided context), not compilable C++. The hunk migrates
// ForwardLevelIterator's prefix_extractor from a raw SliceTransform* to a
// reference to shared_ptr<const SliceTransform>, and makes the ctor call
// status_.PermitUncheckedError() so a never-read status_ passes
// ASSERT_STATUS_CHECKED builds.
class ForwardLevelIterator : public InternalIterator {
 public:
-  ForwardLevelIterator(const ColumnFamilyData* const cfd,
-                       const ReadOptions& read_options,
-                       const std::vector<FileMetaData*>& files,
-                       const SliceTransform* prefix_extractor,
-                       bool allow_unprepared_value)
+  ForwardLevelIterator(
+      const ColumnFamilyData* const cfd, const ReadOptions& read_options,
+      const std::vector<FileMetaData*>& files,
+      const std::shared_ptr<const SliceTransform>& prefix_extractor,
+      bool allow_unprepared_value)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor),
-        allow_unprepared_value_(allow_unprepared_value) {}
+        allow_unprepared_value_(allow_unprepared_value) {
+    status_.PermitUncheckedError();  // Allow uninitialized status through
+  }
  // NOTE(review): the statements shown inside this dtor look like Prev()'s
  // body fused here by elided diff context (NotSupported("...Prev()")) —
  // confirm against the full file before relying on this hunk's structure.
  ~ForwardLevelIterator() override {
    // Reset current pointer
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }
-  bool Valid() const override {
-    return valid_;
-  }
+  bool Valid() const override { return valid_; }
  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
  // NOTE(review): member declarations below belong to a later, elided part of
  // the class body (SeekToFirst()'s remainder is cut out above).
  Status status_;
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
-  const SliceTransform* prefix_extractor_;
+  // Kept alive by ForwardIterator::sv_->mutable_cf_options
+  const std::shared_ptr<const SliceTransform>& prefix_extractor_;
  const bool allow_unprepared_value_;
};
// NOTE(review): tail of the ForwardIterator constructor (its header is in an
// elided hunk). The diff adds immutable_status_.PermitUncheckedError() so the
// aggregated immutable-iterator status passes ASSERT_STATUS_CHECKED when never
// consumed, and collapses the destructor to a single line.
  if (sv_) {
    RebuildIterators(false);
  }
-}
-ForwardIterator::~ForwardIterator() {
-  Cleanup(true);
+  // immutable_status_ is a local aggregation of the
+  // status of the immutable Iterators.
+  // We have to PermitUncheckedError in case it is never
+  // used, otherwise it will fail ASSERT_STATUS_CHECKED.
+  immutable_status_.PermitUncheckedError();
}
+ForwardIterator::~ForwardIterator() { Cleanup(true); }
+
// NOTE(review): SVCleanup's interior and the SVCleanupParams struct header are
// elided by the diff; the visible changes are only a closing-brace namespace
// comment and an argument-wrapping reflow in DeferredSVCleanup — no semantic
// change in this hunk.
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
-}
+}  // anonymous namespace
// Used in PinnedIteratorsManager to release pinned SuperVersion
void ForwardIterator::DeferredSVCleanup(void* arg) {
  auto d = reinterpret_cast<SVCleanupParams*>(arg);
-  ForwardIterator::SVCleanup(
-      d->db, d->sv, d->background_purge_on_iterator_cleanup);
+  ForwardIterator::SVCleanup(d->db, d->sv,
+                             d->background_purge_on_iterator_cleanup);
  delete d;
}
// NOTE(review): tails of ForwardIterator::SeekToFirst() and Seek() (headers
// elided). SeekInternal gains a third argument (seek_after_async_io); with
// ReadOptions::async_io enabled, Seek() issues a second pass with it set to
// true to consume the asynchronously prefetched blocks.
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
-  SeekInternal(Slice(), true);
+  SeekInternal(Slice(), true, false);
}
bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
-  SeekInternal(internal_key, false);
+
+  SeekInternal(internal_key, false, false);
+  if (read_options_.async_io) {
+    SeekInternal(internal_key, false, true);
+  }
}
// NOTE(review): SeekInternal hunk. First pass (seek_after_async_io == false)
// issues the seeks; iterators whose status is IsTryAgain() (async read in
// flight) are skipped and re-seeked in the second pass, which also defers
// UpdateCurrent() until then. Interior context (the L0 and level loops'
// headers) is elided by the diff.
+// In case of async_io, SeekInternal is called twice with seek_after_async_io
+// enabled in second call which only does seeking part to retrieve the blocks.
void ForwardIterator::SeekInternal(const Slice& internal_key,
-                                   bool seek_to_first) {
+                                   bool seek_to_first,
+                                   bool seek_after_async_io) {
  assert(mutable_iter_);
  // mutable
-  seek_to_first ? mutable_iter_->SeekToFirst() :
-                  mutable_iter_->Seek(internal_key);
+  if (!seek_after_async_io) {
+    seek_to_first ? mutable_iter_->SeekToFirst()
+                  : mutable_iter_->Seek(internal_key);
+  }
  // immutable
  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
  // if it turns to need to seek immutable often. We probably want to have
  // an option to turn it off.
-  if (seek_to_first || NeedToSeekImmutable(internal_key)) {
-    immutable_status_ = Status::OK();
-    if (has_iter_trimmed_for_upper_bound_ &&
-        (
-            // prev_ is not set yet
-            is_prev_set_ == false ||
-            // We are doing SeekToFirst() and internal_key.size() = 0
-            seek_to_first ||
-            // prev_key_ > internal_key
-            cfd_->internal_comparator().InternalKeyComparator::Compare(
-                prev_key_.GetInternalKey(), internal_key) > 0)) {
-      // Some iterators are trimmed. Need to rebuild.
-      RebuildIterators(true);
-      // Already seeked mutable iter, so seek again
-      seek_to_first ? mutable_iter_->SeekToFirst()
-                    : mutable_iter_->Seek(internal_key);
-    }
-    {
-      auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
-      immutable_min_heap_.swap(tmp);
-    }
-    for (size_t i = 0; i < imm_iters_.size(); i++) {
-      auto* m = imm_iters_[i];
-      seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
-      if (!m->status().ok()) {
-        immutable_status_ = m->status();
-      } else if (m->Valid()) {
-        immutable_min_heap_.push(m);
+  if (seek_to_first || seek_after_async_io ||
+      NeedToSeekImmutable(internal_key)) {
+    if (!seek_after_async_io) {
+      immutable_status_ = Status::OK();
+      if (has_iter_trimmed_for_upper_bound_ &&
+          (
+              // prev_ is not set yet
+              is_prev_set_ == false ||
+              // We are doing SeekToFirst() and internal_key.size() = 0
+              seek_to_first ||
+              // prev_key_ > internal_key
+              cfd_->internal_comparator().InternalKeyComparator::Compare(
+                  prev_key_.GetInternalKey(), internal_key) > 0)) {
+        // Some iterators are trimmed. Need to rebuild.
+        RebuildIterators(true);
+        // Already seeked mutable iter, so seek again
+        seek_to_first ? mutable_iter_->SeekToFirst()
+                      : mutable_iter_->Seek(internal_key);
+      }
+      {
+        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
+        immutable_min_heap_.swap(tmp);
+      }
+      for (size_t i = 0; i < imm_iters_.size(); i++) {
+        auto* m = imm_iters_[i];
+        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
+        if (!m->status().ok()) {
+          immutable_status_ = m->status();
+        } else if (m->Valid()) {
+          immutable_min_heap_.push(m);
+        }
      }
    }
    if (!l0_iters_[i]) {
      continue;
    }
+    if (seek_after_async_io) {
+      if (!l0_iters_[i]->status().IsTryAgain()) {
+        continue;
+      }
+    }
+
    if (seek_to_first) {
      l0_iters_[i]->SeekToFirst();
    } else {
-      // If the target key passes over the larget key, we are sure Next()
+      // If the target key passes over the largest key, we are sure Next()
      // won't go over this file.
-      if (user_comparator_->Compare(target_user_key,
+      if (seek_after_async_io == false &&
+          user_comparator_->Compare(target_user_key,
                                    l0[i]->largest.user_key()) > 0) {
        if (read_options_.iterate_upper_bound != nullptr) {
          has_iter_trimmed_for_upper_bound_ = true;
        l0_iters_[i]->Seek(internal_key);
      }
-      if (!l0_iters_[i]->status().ok()) {
+      if (l0_iters_[i]->status().IsTryAgain()) {
+        assert(!seek_after_async_io);
+        continue;
+      } else if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
      if (level_iters_[level - 1] == nullptr) {
        continue;
      }
+
+      if (seek_after_async_io) {
+        if (!level_iters_[level - 1]->status().IsTryAgain()) {
+          continue;
+        }
+      }
      uint32_t f_idx = 0;
-      if (!seek_to_first) {
+      if (!seek_to_first && !seek_after_async_io) {
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }
      // Seek
-      if (f_idx < level_files.size()) {
-        level_iters_[level - 1]->SetFileIndex(f_idx);
-        seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
-                        level_iters_[level - 1]->Seek(internal_key);
+      if (seek_after_async_io || f_idx < level_files.size()) {
+        if (!seek_after_async_io) {
+          level_iters_[level - 1]->SetFileIndex(f_idx);
+        }
+        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
+                      : level_iters_[level - 1]->Seek(internal_key);
-        if (!level_iters_[level - 1]->status().ok()) {
+        if (level_iters_[level - 1]->status().IsTryAgain()) {
+          assert(!seek_after_async_io);
+          continue;
+        } else if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
    immutable_min_heap_.push(current_);
  }
-  UpdateCurrent();
+  // For async_io, it should be updated when seek_after_async_io is true (in
+  // second call).
+  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
+    UpdateCurrent();
+  }
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}
// NOTE(review): interior of ForwardIterator::Next() (header elided). When the
// SuperVersion changed, the iterator re-seeks to the saved key; under async_io
// the re-seek now runs twice (second pass with seek_after_async_io == true),
// mirroring Seek(). Trailing asserts belong to a later, elided function.
  assert(valid_);
  bool update_prev_key = false;
-  if (sv_ == nullptr ||
-      sv_->version_number != cfd_->GetSuperVersionNumber()) {
+  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());
    } else {
      RenewIterators();
    }
-    SeekInternal(old_key, false);
+
+    SeekInternal(old_key, false, false);
+    if (read_options_.async_io) {
+      SeekInternal(old_key, false, true);
+    }
+
    if (!valid_ || key().compare(old_key) != 0) {
      return;
    }
    update_prev_key = true;
  }
-
  if (update_prev_key) {
    prev_key_.SetInternalKey(current_->key());
    is_prev_set_ = true;
    assert(!current_->Valid());
    assert(!current_->status().ok());
-    assert(current_ != mutable_iter_); // memtable iterator can't fail
+    assert(current_ != mutable_iter_);  // memtable iterator can't fail
    assert(immutable_status_.ok());
    valid_ = false;
// NOTE(review): legacy RocksDB ToString() helper replaced with
// std::to_string; closing brace of the function is elided by the diff.
Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
  assert(prop != nullptr);
  if (prop_name == "rocksdb.iterator.super-version-number") {
-    *prop = ToString(sv_->version_number);
+    *prop = std::to_string(sv_->version_number);
    return Status::OK();
  }
  return Status::InvalidArgument();
// NOTE(review): interior of RebuildIterators() (header elided). Changes:
// NewRangeTombstoneIterator gains an explicit immutable_memtable flag,
// TableCache::NewIterator takes the shared_ptr prefix_extractor directly
// (no .get()), and BuildLevelIterators now receives the SuperVersion.
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
-            read_options_, sv_->current->version_set()->LastSequence()));
+            read_options_, sv_->current->version_set()->LastSequence(),
+            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always return Status::OK().
    Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
-        sv_->mutable_cf_options.prefix_extractor.get(),
+        sv_->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }
-  BuildLevelIterators(vstorage);
+  BuildLevelIterators(vstorage, sv_);
  current_ = nullptr;
  is_prev_set_ = false;
// NOTE(review): interior of RenewIterators() (header elided); parallel changes
// to RebuildIterators(). Note it passes svnew (not sv_) to
// BuildLevelIterators so the new SuperVersion's prefix extractor is used.
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
-            read_options_, sv_->current->version_set()->LastSequence()));
+            read_options_, sv_->current->version_set()->LastSequence(),
+            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always return Status::OK().
    Status temp_s = svnew->imm->AddRangeTombstoneIterators(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
-        svnew->mutable_cf_options.prefix_extractor.get(),
+        svnew->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
    DeleteIterator(l);
  }
  level_iters_.clear();
-  BuildLevelIterators(vstorage_new);
+  BuildLevelIterators(vstorage_new, svnew);
  current_ = nullptr;
  is_prev_set_ = false;
  SVCleanup();
}
// NOTE(review): signature change — BuildLevelIterators now takes the
// SuperVersion explicitly instead of reading member sv_, so RenewIterators can
// pass the new SV before it is installed. Loop interior partially elided.
-void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
+void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
+                                          SuperVersion* sv) {
  level_iters_.reserve(vstorage->num_levels() - 1);
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    const auto& level_files = vstorage->LevelFiles(level);
    } else {
      level_iters_.push_back(new ForwardLevelIterator(
          cfd_, read_options_, level_files,
-          sv_->mutable_cf_options.prefix_extractor.get(),
-          allow_unprepared_value_));
+          sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
    }
  }
}
// NOTE(review): interior of ResetIncompleteIterators() (header and tail
// elided); same prefix_extractor .get() removal as in RebuildIterators().
    l0_iters_[i] = cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files[i], /*range_del_agg=*/nullptr,
-        sv_->mutable_cf_options.prefix_extractor.get(),
+        sv_->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
// NOTE(review): interiors of NeedToSeekImmutable() (headers elided). These
// hunks are whitespace-only clang-format reflows of continuation-line
// indentation; no semantic change.
  }
  Slice prev_key = prev_key_.GetInternalKey();
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
-        prefix_extractor_->Transform(prev_key)) != 0) {
+                             prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
-      prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
+          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }
  return false;
}
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
-      target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
-                                        : current_->key()) > 0) {
+          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
+                                            : current_->key()) > 0) {
    return true;
  }
  return false;
// NOTE(review): lambda parameter renamed key -> k and the lower_bound call
// reflowed (clang-format); behavior unchanged — still a binary search for the
// first file whose largest key is >= internal_key within [left, right).
uint32_t ForwardIterator::FindFileInRange(
    const std::vector<FileMetaData*>& files, const Slice& internal_key,
    uint32_t left, uint32_t right) {
-  auto cmp = [&](const FileMetaData* f, const Slice& key) -> bool {
+  auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
    return cfd_->internal_comparator().InternalKeyComparator::Compare(
-        f->largest.Encode(), key) < 0;
+        f->largest.Encode(), k) < 0;
  };
-  const auto &b = files.begin();
-  return static_cast<uint32_t>(std::lower_bound(b + left,
-      b + right, internal_key, cmp) - b);
+  const auto& b = files.begin();
+  return static_cast<uint32_t>(
+      std::lower_bound(b + left, b + right, internal_key, cmp) - b);
}
void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {